From 24434e9ecfd51aab97c1bcacf652f9358badd0d2 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Wed, 9 Jan 2019 10:43:41 +0100 Subject: [PATCH] Delay the end handler notification when an stream end handler is called - see #2852 --- .../vertx/core/file/impl/AsyncFileImpl.java | 34 ++++++---- .../http/impl/Http1xClientConnection.java | 11 ++-- .../http/impl/Http2ServerRequestImpl.java | 9 ++- .../impl/Http2UpgradedClientConnection.java | 5 -- .../http/impl/HttpClientResponseImpl.java | 64 +++++++++++++------ .../core/http/impl/HttpClientStream.java | 1 - .../core/http/impl/HttpServerRequestImpl.java | 12 +++- .../core/http/impl/VertxHttp2NetSocket.java | 3 +- .../core/http/impl/VertxHttp2Stream.java | 14 ++-- .../core/http/impl/WebSocketImplBase.java | 16 ++++- .../io/vertx/core/net/impl/NetSocketImpl.java | 9 +-- .../core/streams/impl/InboundBuffer.java | 12 ++-- .../io/vertx/core/file/FileSystemTest.java | 22 +++++++ .../java/io/vertx/core/http/Http1xTest.java | 5 +- .../java/io/vertx/core/http/HttpTest.java | 42 +++++++++++- .../io/vertx/core/http/WebSocketTest.java | 24 ++++++- src/test/java/io/vertx/core/net/NetTest.java | 22 +++++++ 17 files changed, 229 insertions(+), 76 deletions(-) diff --git a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java index 46893817a..7cbe761a5 100644 --- a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java +++ b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java @@ -69,6 +69,7 @@ public class AsyncFileImpl implements AsyncFile { private int lwm = maxWrites / 2; private int readBufferSize = DEFAULT_READ_BUFFER_SIZE; private InboundBuffer queue; + private Handler handler; private Handler endHandler; private long readPos; private long readLength = Long.MAX_VALUE; @@ -102,7 +103,13 @@ public class AsyncFileImpl implements AsyncFile { } this.context = context; this.queue = new InboundBuffer<>(context, 0); - + queue.handler(buff -> { + if (buff.length() > 0) { + handleBuffer(buff); + } else { + handleEnd(); + } + }); queue.drainHandler(v -> { doRead(); }); @@ -235,7 +242,7 @@ public class AsyncFileImpl implements AsyncFile { if (closed) { return this; } - queue.handler(handler); + this.handler = handler; if (handler != null) { doRead(); } else { @@ -346,15 +353,11 @@ public class AsyncFileImpl implements AsyncFile { doRead(buff, 0, bb, readPos, ar -> { if (ar.succeeded()) { Buffer buffer = ar.result(); - if (buffer.length() == 0) { - // Empty buffer represents end of file - handleEnd(); - } else { - readPos += buffer.length(); - readLength -= buffer.length(); - if (queue.write(buffer)) { - doRead(bb); - } + readPos += buffer.length(); + readLength -= buffer.length(); + // Empty buffer represents end of file + if (queue.write(buffer) && buffer.length() > 0) { + doRead(bb); } } else { handleException(ar.cause()); @@ -362,8 +365,15 @@ public class AsyncFileImpl implements AsyncFile { }); } + private synchronized void handleBuffer(Buffer buff) { + if (handler != null) { + checkContext(); + handler.handle(buff); + } + } + private synchronized void handleEnd() { - queue.handler(null); + handler = null; if (endHandler != null) { checkContext(); endHandler.handle(null); diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 46bde3950..9579f410a 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -348,12 +348,9 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC @Override public void doFetch(long amount) { - queue.fetch(amount); - } - - @Override - public void doResume() { - queue.resume(); + if (!queue.fetch(amount)) { + response.handleEnd(trailers); + } } @Override @@ -489,7 +486,7 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC } } trailers = new HeadersAdaptor(trailer.trailingHeaders()); - if (queue.isEmpty()) { + if (queue.isEmpty() && !queue.isPaused()) { response.handleEnd(trailers); } responseEnded = true; diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java b/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java index 9db7167ad..0c3935e8d 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java @@ -225,19 +225,22 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream dataHandler) { + public HttpClientResponse handler(Handler handle) { synchronized (conn) { - this.dataHandler = dataHandler; + if (handle != null) { + checkEnded(); + } + dataHandler = handle; return this; } } @Override - public HttpClientResponse endHandler(Handler endHandler) { + public HttpClientResponse endHandler(Handler handler) { synchronized (conn) { - this.endHandler = endHandler; + if (handler != null) { + checkEnded(); + } + endHandler = handler; return this; } } @Override - public HttpClientResponse exceptionHandler(Handler exceptionHandler) { + public HttpClientResponse exceptionHandler(Handler handler) { synchronized (conn) { - this.exceptionHandler = exceptionHandler; + if (handler != null) { + checkEnded(); + } + exceptionHandler = handler; return this; } } @@ -164,8 +179,7 @@ public class HttpClientResponseImpl implements HttpClientResponse { @Override public HttpClientResponse resume() { - stream.doResume(); - return this; + return fetch(Long.MAX_VALUE); } @Override @@ -175,16 +189,24 @@ public class HttpClientResponseImpl implements HttpClientResponse { } @Override - public HttpClientResponse bodyHandler(final Handler bodyHandler) { - BodyHandler handler = new BodyHandler(); - handler(handler); - endHandler(v -> handler.notifyHandler(bodyHandler)); + public HttpClientResponse bodyHandler(final Handler handler) { + if (handler != null) { + BodyHandler bodyHandler = new BodyHandler(); + handler(bodyHandler); + endHandler(v -> bodyHandler.notifyHandler(handler)); + } else { + handler(null); + endHandler(null); + } return this; } @Override public HttpClientResponse customFrameHandler(Handler handler) { synchronized (conn) { + if (endHandler != null) { + checkEnded(); + } customFrameHandler = handler; return this; } @@ -217,16 +239,19 @@ public class HttpClientResponseImpl implements HttpClientResponse { } void handleEnd(MultiMap trailers) { + Handler handler; synchronized (conn) { stream.reportBytesRead(bytesRead); bytesRead = 0; this.trailers = trailers; - if (endHandler != null) { - try { - endHandler.handle(null); - } catch (Throwable t) { - handleException(t); - } + handler = endHandler; + endHandler = null; + } + if (handler != null) { + try { + handler.handle(null); + } catch (Throwable t) { + handleException(t); } } } @@ -277,6 +302,9 @@ public class HttpClientResponseImpl implements HttpClientResponse { @Override public HttpClientResponse streamPriorityHandler(Handler handler) { synchronized (conn) { + if (handler != null) { + checkEnded(); + } priorityHandler = handler; } return this; diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java index b32488a61..bfc94c1ab 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java @@ -49,7 +49,6 @@ interface HttpClientStream { void doSetWriteQueueMaxSize(int size); boolean isNotWritable(); void doPause(); - void doResume(); void doFetch(long amount); void reset(long code); diff --git a/src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java index 806b39d91..8d17f991d 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java @@ -161,7 +161,7 @@ public class HttpServerRequestImpl implements HttpServerRequest { boolean end = ended; ended = false; handleBegin(); - if (pending != null && pending.size() > 0) { + if (pending != null && pending.isPaused()) { pending.resume(); } if (end) { @@ -340,7 +340,13 @@ public class HttpServerRequestImpl implements HttpServerRequest { public HttpServerRequest resume() { synchronized (conn) { if (!isEnded()) { - pendingQueue().resume(); + if (ended) { + if (!pending.resume()) { + doEnd(); + } + } else if (pending != null) { + pending.resume(); + } } return this; } @@ -478,7 +484,7 @@ public class HttpServerRequestImpl implements HttpServerRequest { @Override public boolean isEnded() { synchronized (conn) { - return ended && (pending == null || pending.isEmpty()); + return ended && (pending == null || (!pending.isPaused() && pending.isEmpty())); } } diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java index 1cf422bd7..494f80bfc 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java @@ -151,8 +151,7 @@ class VertxHttp2NetSocket extends VertxHttp2Strea @Override public NetSocket resume() { - doResume(); - return this; + return fetch(Long.MAX_VALUE); } @Override diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java index 240d5be7b..149e24361 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java @@ -36,7 +36,7 @@ abstract class VertxHttp2Stream { protected final ChannelHandlerContext handlerContext; protected final Http2Stream stream; - private InboundBuffer pending; + private final InboundBuffer pending; private int pendingBytes; private MultiMap trailers; private boolean writable; @@ -95,7 +95,7 @@ abstract class VertxHttp2Stream { void onEnd(MultiMap map) { synchronized (conn) { trailers = map; - if (pending.isEmpty()) { + if (pending.isEmpty() && !pending.isPaused()) { handleEnd(trailers); } } @@ -109,12 +109,12 @@ abstract class VertxHttp2Stream { pending.pause(); } - public void doResume() { - pending.resume(); - } - public void doFetch(long amount) { - pending.fetch(amount); + if (!pending.fetch(amount)) { + if (trailers != null) { + handleEnd(trailers); + } + } } boolean isNotWritable() { diff --git a/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java b/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java index 62aaae37b..21ac3c5c8 100644 --- a/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java +++ b/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java @@ -21,6 +21,7 @@ import io.vertx.core.http.WebSocketBase; import io.vertx.core.http.WebSocketFrame; import io.vertx.core.http.impl.ws.WebSocketFrameImpl; import io.vertx.core.http.impl.ws.WebSocketFrameInternal; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.core.net.SocketAddress; import io.vertx.core.streams.impl.InboundBuffer; @@ -444,7 +445,7 @@ public abstract class WebSocketImplBase implements WebS Handler endHandler; Handler closeHandler; synchronized (conn) { - endHandler = this.endHandler; + endHandler = pending.isPaused() ? null : this.endHandler; closeHandler = this.closeHandler; closed = true; binaryHandlerRegistration = null; @@ -548,8 +549,17 @@ public abstract class WebSocketImplBase implements WebS @Override public S resume() { - if (!isClosed()) { - pending.resume(); + synchronized (this) { + if (isClosed()) { + Handler handler = endHandler; + endHandler = null; + if (handler != null) { + ContextInternal ctx = conn.getContext(); + ctx.runOnContext(v -> handler.handle(null)); + } + } else { + pending.resume(); + } } return (S) this; } diff --git a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java index e4834bd26..ed3a9f241 100644 --- a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java @@ -193,14 +193,15 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { @Override public NetSocket fetch(long amount) { - pending.fetch(amount); + if (!pending.fetch(amount)) { + checkEnd(); + } return this; } @Override public synchronized NetSocket resume() { - pending.resume(); - return this; + return fetch(Long.MAX_VALUE); } @Override @@ -358,7 +359,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { private void checkEnd() { Handler handler; synchronized (this) { - if (!closed || pending.size() > 0 || (handler = endHandler) == null) { + if (!closed || pending.isPaused() || (handler = endHandler) == null) { return; } } diff --git a/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java b/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java index fadd6a7c4..55aa53920 100644 --- a/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java +++ b/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java @@ -251,9 +251,9 @@ public class InboundBuffer { *

* This method can be called from any thread. * - * @return a reference to this, so the API can be used fluently + * @return {@code true} when the buffer will be drained */ - public InboundBuffer fetch(long amount) { + public boolean fetch(long amount) { if (amount < 0L) { throw new IllegalArgumentException(); } @@ -263,12 +263,12 @@ public class InboundBuffer { demand = Long.MAX_VALUE; } if (emitting || (pending.isEmpty() && !overflow)) { - return this; + return false; } emitting = true; } context.runOnContext(v -> drain()); - return this; + return true; } /** @@ -313,9 +313,9 @@ public class InboundBuffer { *

* This method can be called from any thread. * - * @return a reference to this, so the API can be used fluently + * @return {@code true} when the buffer will be drained */ - public InboundBuffer resume() { + public boolean resume() { return fetch(Long.MAX_VALUE); } diff --git a/src/test/java/io/vertx/core/file/FileSystemTest.java b/src/test/java/io/vertx/core/file/FileSystemTest.java index d4daaf9eb..7cbe8ece5 100644 --- a/src/test/java/io/vertx/core/file/FileSystemTest.java +++ b/src/test/java/io/vertx/core/file/FileSystemTest.java @@ -1748,6 +1748,28 @@ public class FileSystemTest extends VertxTestBase { await(); } + @Test + public void testPausedEnd() throws Exception { + String fileName = "some-file.dat"; + createFile(fileName, new byte[0]); + AtomicBoolean paused = new AtomicBoolean(false); + vertx.fileSystem().open(testDir + pathSep + fileName, new OpenOptions(), onSuccess(file -> { + Buffer buffer = Buffer.buffer(); + paused.set(true); + file.pause(); + vertx.setTimer(100, id -> { + paused.set(false); + file.resume(); + }); + file.endHandler(v -> { + assertFalse(paused.get()); + testComplete(); + }); + file.handler(buffer::appendBuffer); + })); + await(); + } + private Handler> createHandler(boolean shouldPass, Handler afterOK) { return ar -> { if (ar.failed()) { diff --git a/src/test/java/io/vertx/core/http/Http1xTest.java b/src/test/java/io/vertx/core/http/Http1xTest.java index 230432431..8d8c07666 100644 --- a/src/test/java/io/vertx/core/http/Http1xTest.java +++ b/src/test/java/io/vertx/core/http/Http1xTest.java @@ -14,7 +14,6 @@ package io.vertx.core.http; import io.netty.handler.codec.TooLongFrameException; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.impl.HttpClientRequestImpl; import io.vertx.core.http.impl.HttpServerImpl; import io.vertx.core.http.impl.HttpUtils; import io.vertx.core.impl.ConcurrentHashSet; @@ -2741,9 +2740,9 @@ public class Http1xTest extends HttpTest { client = vertx.createHttpClient(new HttpClientOptions().setMaxPoolSize(1).setPipelining(true).setKeepAlive(true)); AtomicInteger connCount = new AtomicInteger(); client.connectionHandler(conn -> connCount.incrementAndGet()); - HttpClientRequestImpl req = (HttpClientRequestImpl) client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/first", resp -> { + HttpClientRequest req = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/first", onSuccess(resp -> { fail(); - }); + })); req.reset(0); CountDownLatch respLatch = new CountDownLatch(2); client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/second", onSuccess(resp -> { diff --git a/src/test/java/io/vertx/core/http/HttpTest.java b/src/test/java/io/vertx/core/http/HttpTest.java index 97d5f665f..153cd4d0c 100644 --- a/src/test/java/io/vertx/core/http/HttpTest.java +++ b/src/test/java/io/vertx/core/http/HttpTest.java @@ -2688,13 +2688,23 @@ public abstract class HttpTest extends HttpTestBase { } @Test - public void testPausedHttpServerRequestDuringLastChunkEndsTheRequest() throws Exception { + public void testHttpServerRequestPausedDuringLastChunk() throws Exception { server.requestHandler(req -> { + AtomicBoolean ended = new AtomicBoolean(); + AtomicBoolean paused = new AtomicBoolean(); req.handler(buff -> { assertEquals("small", buff.toString()); req.pause(); + paused.set(true); + vertx.setTimer(20, id -> { + assertFalse(ended.get()); + paused.set(false); + req.resume(); + }); }); req.endHandler(v -> { + assertFalse(paused.get()); + ended.set(true); req.response().end(); }); }); @@ -2707,6 +2717,36 @@ public abstract class HttpTest extends HttpTestBase { await(); } + @Test + public void testHttpClientResponsePausedDuringLastChunk() throws Exception { + server.requestHandler(req -> { + req.response().end("small"); + }); + startServer(); + client.close(); + client = vertx.createHttpClient(createBaseClientOptions().setMaxPoolSize(1)); + client.getNow(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/someuri", onSuccess(resp -> { + AtomicBoolean ended = new AtomicBoolean(); + AtomicBoolean paused = new AtomicBoolean(); + resp.handler(buff -> { + assertEquals("small", buff.toString()); + resp.pause(); + paused.set(true); + vertx.setTimer(20, id -> { + assertFalse(ended.get()); + paused.set(false); + resp.resume(); + }); + }); + resp.endHandler(v -> { + assertFalse(paused.get()); + ended.set(true); + complete(); + }); + })); + await(); + } + @Test public void testFormUploadSmallFile() throws Exception { testFormUploadFile(TestUtils.randomAlphaString(100), false); diff --git a/src/test/java/io/vertx/core/http/WebSocketTest.java b/src/test/java/io/vertx/core/http/WebSocketTest.java index 3a44a2073..afdde5c78 100644 --- a/src/test/java/io/vertx/core/http/WebSocketTest.java +++ b/src/test/java/io/vertx/core/http/WebSocketTest.java @@ -2694,4 +2694,26 @@ public class WebSocketTest extends VertxTestBase { })); await(); } -} + + @Test + public void testPausedDuringLastChunk() { + server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT)) + .websocketHandler(ws -> { + AtomicBoolean paused = new AtomicBoolean(true); + ws.pause(); + ws.closeHandler(v -> { + paused.set(false); + ws.resume(); + }); + ws.endHandler(v -> { + assertFalse(paused.get()); + testComplete(); + }); + }) + .listen(onSuccess(v -> { + client.websocket(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/someuri", ws -> { + ws.close(); + }); + })); + await(); + }} diff --git a/src/test/java/io/vertx/core/net/NetTest.java b/src/test/java/io/vertx/core/net/NetTest.java index 846a17f38..6f060277b 100755 --- a/src/test/java/io/vertx/core/net/NetTest.java +++ b/src/test/java/io/vertx/core/net/NetTest.java @@ -3383,6 +3383,28 @@ public class NetTest extends VertxTestBase { awaitLatch(latch); } + @Test + public void testPausedDuringLastChunk() throws Exception { + server.connectHandler(so -> { + AtomicBoolean paused = new AtomicBoolean(); + paused.set(true); + so.pause(); + so.closeHandler(v -> { + paused.set(false); + so.resume(); + }); + so.endHandler(v -> { + assertFalse(paused.get()); + testComplete(); + }); + }); + startServer(); + client.connect(1234, "localhost", onSuccess(so -> { + so.close(); + })); + await(); + } + protected void startServer() throws Exception { startServer(testAddress, vertx.getOrCreateContext()); }