From d717e7556bc7511276ca8bc1fe49b93ced9093ca Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 16 Apr 2019 18:20:24 +0200 Subject: [PATCH] HTTP event handler should not be called when holding a lock and signal end of stream using a sentinel. Fixes #2915 - Fixes #2916 --- .../http/impl/Http1xClientConnection.java | 24 ++-- .../http/impl/Http1xServerConnection.java | 50 +++++--- .../http/impl/Http2ServerRequestImpl.java | 50 ++++---- .../http/impl/HttpClientResponseImpl.java | 14 ++- .../core/http/impl/HttpServerRequestImpl.java | 115 +++++++++--------- .../vertx/core/http/impl/NettyFileUpload.java | 59 ++++----- .../core/http/impl/VertxHttp2Stream.java | 21 ++-- .../io/vertx/core/net/impl/NetSocketImpl.java | 50 ++++---- .../core/streams/impl/InboundBuffer.java | 5 + .../java/io/vertx/core/http/Http1xTest.java | 33 +++++ .../vertx/core/http/HttpMetricsTestBase.java | 7 +- .../java/io/vertx/core/http/HttpTest.java | 88 +++++++++++++- .../io/vertx/core/http/WebSocketTest.java | 27 ---- .../vertx/core/spi/metrics/MetricsTest.java | 3 +- 14 files changed, 327 insertions(+), 219 deletions(-) diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 97049ae36..bbf70ccb4 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -193,12 +193,12 @@ class Http1xClientConnection extends Http1xConnectionBase impleme private final int id; private final Http1xClientConnection conn; private final Future fut; + private final InboundBuffer queue; private HttpClientRequestImpl request; private HttpClientResponseImpl response; private boolean requestEnded; private boolean responseEnded; private boolean reset; - private InboundBuffer queue; private MultiMap trailers; private StreamImpl next; @@ -345,12 +345,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme @Override public void doFetch(long amount) { - synchronized (this) { - if (queue.fetch(amount) || !responseEnded) { - return; - } - } - response.handleEnd(trailers); + queue.fetch(amount); } @Override @@ -460,10 +455,11 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } } } - queue.handler(buf -> response.handleChunk(buf)); - queue.emptyHandler(v -> { - if (responseEnded) { + queue.handler(buf -> { + if (buf == InboundBuffer.END_SENTINEL) { response.handleEnd(trailers); + } else { + response.handleChunk((Buffer) buf); } }); queue.drainHandler(v -> { @@ -486,9 +482,11 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } } trailers = new HeadersAdaptor(trailer.trailingHeaders()); - if (queue.isEmpty() && !queue.isPaused()) { - response.handleEnd(trailers); - } + conn.close |= !conn.options.isKeepAlive(); + conn.doResume(); + } + queue.write(InboundBuffer.END_SENTINEL); + synchronized (conn) { responseEnded = true; conn.close |= !conn.options.isKeepAlive(); conn.doResume(); diff --git a/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java index 9cda5b3ae..99bb46d64 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java @@ -111,7 +111,7 @@ public class Http1xServerConnection extends Http1xConnectionBase responseInProgress.handlePipelined()); + private void handleNext(HttpServerRequestImpl next) { + responseInProgress = next; + getContext().runOnContext(v -> { + next.resume(); + next.handleBegin(requestHandler); + }); } @Override diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java b/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java index 6ce99e478..3a1a4b027 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java @@ -171,34 +171,38 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream handler; + synchronized (conn) { + streamEnded = true; + ended = true; + conn.reportBytesRead(bytesRead); + if (postRequestDecoder != null) { + try { + postRequestDecoder.offer(LastHttpContent.EMPTY_LAST_CONTENT); + while (postRequestDecoder.hasNext()) { + InterfaceHttpData data = postRequestDecoder.next(); + if (data instanceof Attribute) { + Attribute attr = (Attribute) data; + try { + formAttributes().add(attr.getName(), attr.getValue()); + } catch (Exception e) { + // Will never happen, anyway handle it somehow just in case + handleException(e); + } } } + } catch (HttpPostRequestDecoder.EndOfDataDecoderException e) { + // ignore this as it is expected + } catch (Exception e) { + handleException(e); + } finally { + postRequestDecoder.destroy(); } - } catch (HttpPostRequestDecoder.EndOfDataDecoderException e) { - // ignore this as it is expected - } catch (Exception e) { - handleException(e); - } finally { - postRequestDecoder.destroy(); } + handler = endHandler; } - if (endHandler != null) { - endHandler.handle(null); + if (handler != null) { + handler.handle(null); } } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java index d6dafafab..19b90721a 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java @@ -225,15 +225,17 @@ public class HttpClientResponseImpl implements HttpClientResponse { } void handleChunk(Buffer data) { + Handler handler; synchronized (conn) { request.dataReceived(); bytesRead += data.length(); - if (dataHandler != null) { - try { - dataHandler.handle(data); - } catch (Throwable t) { - handleException(t); - } + handler = dataHandler; + } + if (handler != null) { + try { + handler.handle(data); + } catch (Throwable t) { + handleException(t); } } } diff --git a/src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java index 8a5a03169..9eaa01c8c 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java @@ -40,7 +40,6 @@ import java.net.URISyntaxException; import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED; import static io.netty.handler.codec.http.HttpHeaderValues.MULTIPART_FORM_DATA; import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED; -import static io.vertx.core.http.impl.HttpUtils.SC_SWITCHING_PROTOCOLS; /** * This class is optimised for performance when used on the same event loop that is was passed to the handler with. @@ -85,8 +84,7 @@ public class HttpServerRequestImpl implements HttpServerRequest { private HttpPostRequestDecoder decoder; private boolean ended; private long bytesRead; - - private InboundBuffer pending; + private InboundBuffer pending; HttpServerRequestImpl(Http1xServerConnection conn, HttpRequest request) { this.conn = conn; @@ -109,37 +107,38 @@ public class HttpServerRequestImpl implements HttpServerRequest { } } - private InboundBuffer pendingQueue() { + private InboundBuffer pendingQueue() { if (pending == null) { pending = new InboundBuffer<>(conn.getContext(), 8); pending.drainHandler(v -> conn.doResume()); - pending.emptyHandler(v -> { - if (ended) { - doEnd(); + pending.handler(buffer -> { + if (buffer == InboundBuffer.END_SENTINEL) { + onEnd(); + } else { + onData((Buffer) buffer); } }); - pending.handler(this::handleData); } return pending; } - private void enqueueData(Buffer chunk) { - // We queue requests if paused or a request is in progress to prevent responses being written in the wrong order - if (!pendingQueue().write(chunk)) { - // We only pause when we are actively called by the connection - conn.doPause(); - } - } - void handleContent(Buffer buffer) { - if (pending != null) { - enqueueData(buffer); + InboundBuffer queue; + synchronized (conn) { + queue = pending; + } + if (queue != null) { + // We queue requests if paused or a request is in progress to prevent responses being written in the wrong order + if (!queue.write(buffer)) { + // We only pause when we are actively called by the connection + conn.doPause(); + } } else { - handleData(buffer); + onData(buffer); } } - void handleBegin() { + void handleBegin(Handler handler) { if (Metrics.METRICS_ENABLED) { reportRequestBegin(); } @@ -147,33 +146,29 @@ public class HttpServerRequestImpl implements HttpServerRequest { if (conn.handle100ContinueAutomatically) { check100(); } - conn.requestHandler.handle(this); + handler.handle(this); } - void appendRequest(HttpServerRequestImpl next) { + /** + * Enqueue a pipelined request. + * + * @param request the enqueued request + */ + void enqueue(HttpServerRequestImpl request) { HttpServerRequestImpl current = this; while (current.next != null) { current = current.next; } - current.next = next; + current.next = request; } - HttpServerRequestImpl nextRequest() { + /** + * @return the next request following this one + */ + HttpServerRequestImpl next() { return next; } - void handlePipelined() { - boolean end = ended; - ended = false; - handleBegin(); - if (pending != null && pending.isPaused()) { - pending.resume(); - } - if (end) { - handleEnd(); - } - } - private void reportRequestBegin() { if (conn.metrics != null) { metric = conn.metrics.requestBegin(conn.metric(), this); @@ -326,9 +321,7 @@ public class HttpServerRequestImpl implements HttpServerRequest { @Override public HttpServerRequest pause() { synchronized (conn) { - if (!isEnded()) { - pendingQueue().pause(); - } + pendingQueue().pause(); return this; } } @@ -336,15 +329,7 @@ public class HttpServerRequestImpl implements HttpServerRequest { @Override public HttpServerRequest fetch(long amount) { synchronized (conn) { - if (!isEnded()) { - if (ended) { - if (!pending.fetch(amount)) { - doEnd(); - } - } else if (pending != null) { - pending.fetch(amount); - } - } + pendingQueue().fetch(amount); return this; } } @@ -500,7 +485,8 @@ public class HttpServerRequestImpl implements HttpServerRequest { return conn; } - private void handleData(Buffer data) { + private void onData(Buffer data) { + Handler handler; synchronized (conn) { bytesRead += data.length(); if (decoder != null) { @@ -510,28 +496,37 @@ public class HttpServerRequestImpl implements HttpServerRequest { handleException(e); } } - if (dataHandler != null) { - dataHandler.handle(data); - } + handler = dataHandler; + } + if (handler != null) { + handler.handle(data); } } void handleEnd() { + InboundBuffer queue; synchronized (conn) { ended = true; - if (isEnded()) { - doEnd(); - } + queue = pending; + } + if (queue != null) { + queue.write(InboundBuffer.END_SENTINEL); + } else { + onEnd(); } } - private void doEnd() { - if (decoder != null) { - endDecode(); + private void onEnd() { + Handler handler; + synchronized (conn) { + if (decoder != null) { + endDecode(); + } + handler = endHandler; } // If there have been uploads then we let the last one call the end handler once any fileuploads are complete - if (endHandler != null) { - endHandler.handle(null); + if (handler != null) { + handler.handle(null); } } diff --git a/src/main/java/io/vertx/core/http/impl/NettyFileUpload.java b/src/main/java/io/vertx/core/http/impl/NettyFileUpload.java index a2fe892d2..201639938 100644 --- a/src/main/java/io/vertx/core/http/impl/NettyFileUpload.java +++ b/src/main/java/io/vertx/core/http/impl/NettyFileUpload.java @@ -38,12 +38,11 @@ final class NettyFileUpload implements FileUpload, ReadStream { private Charset charset; private boolean completed; private long maxSize = -1; - private final HttpServerRequest request; - private final InboundBuffer pending; - private boolean ended; + private final InboundBuffer pending; private Handler endHandler; private Handler exceptionHandler; + private Handler dataHandler; NettyFileUpload(Context context, HttpServerRequest request, String name, String filename, String contentType, String contentTransferEncoding, Charset charset) { this.name = name; @@ -51,11 +50,22 @@ final class NettyFileUpload implements FileUpload, ReadStream { this.contentType = contentType; this.contentTransferEncoding = contentTransferEncoding; this.charset = charset; - this.request = request; - this.pending = new InboundBuffer(context) + this.pending = new InboundBuffer<>(context) .drainHandler(v -> request.resume()) - .emptyHandler(v -> checkComplete()); + .handler(buff -> { + if (buff == InboundBuffer.END_SENTINEL) { + Handler handler = endHandler(); + if (handler != null) { + handler.handle(null); + } + } else { + Handler handler = handler(); + if (handler != null) { + handler.handle((Buffer) buff); + } + } + }); } @Override @@ -64,9 +74,13 @@ final class NettyFileUpload implements FileUpload, ReadStream { return this; } + private Handler handler() { + return dataHandler; + } + @Override - public NettyFileUpload handler(Handler handler) { - pending.handler(handler); + public synchronized NettyFileUpload handler(Handler handler) { + dataHandler = handler; return this; } @@ -84,10 +98,13 @@ final class NettyFileUpload implements FileUpload, ReadStream { @Override public NettyFileUpload fetch(long amount) { pending.fetch(amount); - checkComplete(); return this; } + private synchronized Handler endHandler() { + return endHandler; + } + @Override public synchronized NettyFileUpload endHandler(Handler handler) { endHandler = handler; @@ -103,29 +120,7 @@ final class NettyFileUpload implements FileUpload, ReadStream { } private void end() { - synchronized (this) { - ended = true; - } - checkComplete(); - } - - private void checkComplete() { - synchronized (this) { - if (!pending.isEmpty() || pending.isPaused() || !ended) { - return; - } - } - notifyEndHandler(); - } - - private void notifyEndHandler() { - Handler handler; - synchronized (this) { - handler = endHandler; - } - if (handler != null) { - handler.handle(null); - } + pending.write(InboundBuffer.END_SENTINEL); } public void handleException(Throwable err) { diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java index 149e24361..403333570 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java @@ -36,7 +36,7 @@ abstract class VertxHttp2Stream { protected final ChannelHandlerContext handlerContext; protected final Http2Stream stream; - private final InboundBuffer pending; + private final InboundBuffer pending; private int pendingBytes; private MultiMap trailers; private boolean writable; @@ -58,13 +58,14 @@ abstract class VertxHttp2Stream { conn.handler.consume(stream, numBytes); }); - pending.handler(this::handleData); - pending.exceptionHandler(context.exceptionHandler()); - pending.emptyHandler(v -> { - if (trailers != null) { + pending.handler(buff -> { + if (buff == InboundBuffer.END_SENTINEL) { handleEnd(trailers); + } else { + handleData((Buffer) buff); } }); + pending.exceptionHandler(context.exceptionHandler()); pending.resume(); } @@ -95,10 +96,8 @@ abstract class VertxHttp2Stream { void onEnd(MultiMap map) { synchronized (conn) { trailers = map; - if (pending.isEmpty() && !pending.isPaused()) { - handleEnd(trailers); - } } + pending.write(InboundBuffer.END_SENTINEL); } int id() { @@ -110,11 +109,7 @@ abstract class VertxHttp2Stream { } public void doFetch(long amount) { - if (!pending.fetch(amount)) { - if (trailers != null) { - handleEnd(trailers); - } - } + pending.fetch(amount); } boolean isNotWritable() { diff --git a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java index ed3a9f241..1207117f5 100644 --- a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java @@ -73,6 +73,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { private Handler drainHandler; private InboundBuffer pending; private MessageConsumer registration; + private Handler messageHandler; private boolean closed; public NetSocketImpl(VertxInternal vertx, ChannelHandlerContext channel, ContextInternal context, @@ -87,10 +88,22 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { this.writeHandlerID = "__vertx.net." + UUID.randomUUID().toString(); this.remoteAddress = remoteAddress; this.metrics = metrics; + this.messageHandler = NULL_MSG_HANDLER; pending = new InboundBuffer<>(context); pending.drainHandler(v -> doResume()); - pending.handler(NULL_MSG_HANDLER); - pending.emptyHandler(v -> checkEnd()); + pending.handler(obj -> { + if (obj == InboundBuffer.END_SENTINEL) { + Handler handler = endHandler(); + if (handler != null) { + handler.handle(null); + } + } else { + Handler handler = messageHandler(); + if (handler != null) { + handler.handle(obj); + } + } + }); } synchronized void registerEventBusHandler() { @@ -175,13 +188,13 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { return this; } + private synchronized Handler messageHandler() { + return messageHandler; + } + @Override public synchronized NetSocketInternal messageHandler(Handler handler) { - if (handler != null) { - pending.handler(handler); - } else { - pending.handler(NULL_MSG_HANDLER); - } + messageHandler = handler; return this; } @@ -193,9 +206,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { @Override public NetSocket fetch(long amount) { - if (!pending.fetch(amount)) { - checkEnd(); - } + pending.fetch(amount); return this; } @@ -215,6 +226,10 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { return isNotWritable(); } + private synchronized Handler endHandler() { + return endHandler; + } + @Override public synchronized NetSocket endHandler(Handler endHandler) { this.endHandler = endHandler; @@ -349,25 +364,14 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { consumer = registration; registration = null; } - checkEnd(); + pending.write(InboundBuffer.END_SENTINEL); super.handleClosed(); if (consumer != null) { consumer.unregister(); } } - private void checkEnd() { - Handler handler; - synchronized (this) { - if (!closed || pending.isPaused() || (handler = endHandler) == null) { - return; - } - } - handler.handle(null); - } - - public synchronized void handleMessage(Object msg) { - checkContext(); + public void handleMessage(Object msg) { if (!pending.write(msg)) { doPause(); } diff --git a/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java b/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java index 55aa53920..18f2f4f6e 100644 --- a/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java +++ b/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java @@ -65,6 +65,11 @@ import java.util.Objects; */ public class InboundBuffer { + /** + * A reusable sentinel for signaling the end of a stream. + */ + public static final Object END_SENTINEL = new Object(); + private final Context context; private final ArrayDeque pending; private final long highWaterMark; diff --git a/src/test/java/io/vertx/core/http/Http1xTest.java b/src/test/java/io/vertx/core/http/Http1xTest.java index e390eac20..9a6cf35d5 100644 --- a/src/test/java/io/vertx/core/http/Http1xTest.java +++ b/src/test/java/io/vertx/core/http/Http1xTest.java @@ -1341,6 +1341,39 @@ public class Http1xTest extends HttpTest { await(); } + @Test + public void testPipeliningPauseRequest() throws Exception { + int n = 10; + client.close(); + client = vertx.createHttpClient(new HttpClientOptions().setKeepAlive(true).setPipelining(true).setMaxPoolSize(1)); + server.requestHandler(req -> { + AtomicBoolean paused = new AtomicBoolean(); + paused.set(true); + req.pause(); + req.bodyHandler(buff -> { + assertFalse(paused.get()); + req.response().end(); + }); + vertx.setTimer(30, id -> { + paused.set(false); + req.resume(); + }); + }); + startServer(); + AtomicInteger remaining = new AtomicInteger(n); + for (int i = 0;i < n;i++) { + HttpClientRequest req = client.put(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> { + resp.endHandler(v -> { + if (remaining.decrementAndGet() == 0) { + testComplete(); + } + }); + }); + req.end(TestUtils.randomAlphaString(16)); + } + await(); + } + @Test public void testServerPipeliningConnectionConcurrency() throws Exception { int n = 5; diff --git a/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java b/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java index 5cf05a102..e4c76e679 100644 --- a/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java +++ b/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java @@ -121,7 +121,12 @@ public abstract class HttpMetricsTestBase extends HttpTestBase { AsyncTestBase.assertWaitUntil(() -> metrics.endpoints().isEmpty()); assertEquals(null, metrics.connectionCount("localhost:8080")); AsyncTestBase.assertWaitUntil(() -> !serverMetric.get().socket.connected.get()); - AsyncTestBase.assertWaitUntil(() -> contentLength == serverMetric.get().socket.bytesRead.get()); + try { + AsyncTestBase.assertWaitUntil(() -> contentLength == serverMetric.get().socket.bytesRead.get()); + } catch (Exception e) { + System.out.println(contentLength + " == " + serverMetric.get().socket.bytesRead.get()); + throw e; + } AsyncTestBase.assertWaitUntil(() -> contentLength == serverMetric.get().socket.bytesWritten.get()); AsyncTestBase.assertWaitUntil(() -> !clientMetric.get().socket.connected.get()); assertEquals(contentLength, clientMetric.get().socket.bytesRead.get()); diff --git a/src/test/java/io/vertx/core/http/HttpTest.java b/src/test/java/io/vertx/core/http/HttpTest.java index 886ecd4a5..685f9b452 100644 --- a/src/test/java/io/vertx/core/http/HttpTest.java +++ b/src/test/java/io/vertx/core/http/HttpTest.java @@ -4276,7 +4276,93 @@ public abstract class HttpTest extends HttpTestBase { } @Test - public void testServerResponseCloseHandlerNotHoldingLock() throws Exception { + public void testEventHandlersNotHoldingLock() throws Exception { + waitFor(2); + server.requestHandler(req -> { + HttpConnection conn = req.connection(); + switch (req.path()) { + case "/0": + req.handler(chunk -> { + assertFalse(Thread.holdsLock(conn)); + }); + req.endHandler(v -> { + assertFalse(Thread.holdsLock(conn)); + req.response().end(TestUtils.randomAlphaString(256)); + }); + break; + case "/1": + AtomicBoolean paused = new AtomicBoolean(); + req.pause(); + paused.set(true); + vertx.runOnContext(v -> { + paused.set(false); + req.resume(); + }); + req.handler(chunk -> { + assertFalse(Thread.holdsLock(conn)); + assertFalse(paused.get()); + paused.set(true); + req.pause(); + vertx.runOnContext(v -> { + paused.set(false); + req.resume(); + }); + }); + req.endHandler(v -> { + assertFalse(Thread.holdsLock(conn)); + assertFalse(paused.get()); + req.response().end(TestUtils.randomAlphaString(256)); + }); + break; + } + }); + startServer(); + for (int i = 0;i < 2;i++) { + client.post(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/" + i, resp -> { + assertEquals(200, resp.statusCode()); + HttpConnection conn = resp.request().connection(); + switch (resp.request().path()) { + case "/0": + resp.handler(chunk -> { + assertFalse(Thread.holdsLock(conn)); + }); + resp.endHandler(v -> { + assertFalse(Thread.holdsLock(conn)); + complete(); + }); + break; + case "/1": + AtomicBoolean paused = new AtomicBoolean(); + resp.pause(); + paused.set(true); + vertx.runOnContext(v -> { + paused.set(false); + resp.resume(); + }); + resp.handler(chunk -> { + assertFalse(Thread.holdsLock(conn)); + assertFalse(paused.get()); + paused.set(true); + resp.pause(); + vertx.runOnContext(v -> { + paused.set(false); + resp.resume(); + }); + }); + resp.endHandler(v -> { + assertFalse(Thread.holdsLock(conn)); + assertFalse(paused.get()); + complete(); + }); + break; + } + }).end(TestUtils.randomAlphaString(256)); + } + await(); + } + + @Test + public void testEventHandlersNotHoldingLockOnClose() throws Exception { waitFor(7); server.requestHandler(req -> { HttpConnection conn = req.connection(); diff --git a/src/test/java/io/vertx/core/http/WebSocketTest.java b/src/test/java/io/vertx/core/http/WebSocketTest.java index dedbbffa2..fd9f2d327 100644 --- a/src/test/java/io/vertx/core/http/WebSocketTest.java +++ b/src/test/java/io/vertx/core/http/WebSocketTest.java @@ -928,33 +928,6 @@ public class WebSocketTest extends VertxTestBase { await(); } - @Test - public void testWriteFromConnectHandlerFromAnotherThread() { - Buffer expected = Buffer.buffer("AAA"); - server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT)); - server.websocketHandler(ws -> { - Thread t = new Thread() { - @Override - public void run() { - ws.writeFrame(WebSocketFrame.binaryFrame(expected, true)); - } - }; - t.start(); - while (t.getState() != Thread.State.BLOCKED) { - Thread.yield(); - } - }); - server.listen(onSuccess(server -> { - client.websocket(DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/", ws -> { - ws.handler(buff -> { - assertEquals(buff, expected); - testComplete(); - }); - }); - })); - await(); - } - @Test // Test normal negotiation of websocket compression public void testNormalWSDeflateFrameCompressionNegotiation() throws Exception { diff --git a/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java b/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java index 27633aa78..7c531f29f 100644 --- a/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java +++ b/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java @@ -262,7 +262,8 @@ public class MetricsTest extends VertxTestBase { fail("Should not receive message"); }); consumer.completionHandler(ar -> { - assertEquals(Collections.emptyList(), metrics.getRegistrations()); + List registrations = metrics.getRegistrations(); + assertEquals(Collections.emptyList(), registrations); testComplete(); }); consumer.unregister();