From 3b685e70b3635a2ae5830efd52dbabe673a7cdcd Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Wed, 8 Jan 2020 14:02:11 +0100 Subject: [PATCH] Connection should not process messages when in closed state - see #3249 --- .../core/http/impl/Http2ConnectionBase.java | 8 --- .../http/impl/Http2ServerResponseImpl.java | 8 ++- .../vertx/core/net/impl/ConnectionBase.java | 71 ++++++++++++++----- .../io/vertx/core/net/impl/NetSocketImpl.java | 13 +--- .../io/vertx/core/net/impl/VertxHandler.java | 3 +- 5 files changed, 61 insertions(+), 42 deletions(-) diff --git a/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java b/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java index 7bc3f5788..5296b063a 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java @@ -87,7 +87,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL private Handler goAwayHandler; private Handler shutdownHandler; private Handler pingHandler; - private boolean closed; private boolean goneAway; private int windowSize; private long maxConcurrentStreams; @@ -113,9 +112,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL @Override public void handleClosed() { - synchronized (this) { - closed = true; - } super.handleClosed(); } @@ -129,10 +125,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL super.handleIdle(); } - synchronized boolean isClosed() { - return closed; - } - synchronized void onConnectionError(Throwable cause) { ArrayList streams = new ArrayList<>(); try { diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java b/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java index a42dfbb9c..d66c1eed6 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java @@ -59,6 +59,7 @@ public class Http2ServerResponseImpl implements HttpServerResponse { private boolean chunked; private boolean headWritten; private boolean ended; + private boolean closed; private Map cookies; private HttpResponseStatus status = HttpResponseStatus.OK; private String statusMessage; // Not really used but we keep the message for the getStatusMessage() @@ -106,6 +107,7 @@ public class Http2ServerResponseImpl implements HttpServerResponse { Handler endHandler; Handler closeHandler; synchronized (conn) { + closed = true; boolean failed = !ended; exceptionHandler = failed ? this.exceptionHandler : null; endHandler = failed ? this.endHandler : null; @@ -619,8 +621,10 @@ public class Http2ServerResponseImpl implements HttpServerResponse { } @Override - public boolean closed() { - return conn.isClosed(); + public synchronized boolean closed() { + synchronized (conn) { + return closed; + } } @Override diff --git a/src/main/java/io/vertx/core/net/impl/ConnectionBase.java b/src/main/java/io/vertx/core/net/impl/ConnectionBase.java index 02d36d1af..ceaab2ddd 100644 --- a/src/main/java/io/vertx/core/net/impl/ConnectionBase.java +++ b/src/main/java/io/vertx/core/net/impl/ConnectionBase.java @@ -15,6 +15,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedFile; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.FutureListener; import io.vertx.core.*; import io.vertx.core.impl.ContextInternal; @@ -70,6 +71,7 @@ public abstract class ConnectionBase { // State accessed exclusively from the event loop thread private boolean read; private boolean needsFlush; + private boolean closed; protected ConnectionBase(VertxInternal vertx, ChannelHandlerContext chctx, ContextInternal context) { this.vertx = vertx; @@ -106,10 +108,13 @@ public abstract class ConnectionBase { } /** - * This method is exclusively called by {@code VertxHandler} to signal read on the event-loop thread. + * This method is exclusively called by {@code VertxHandler} to read a message on the event-loop thread. */ - final void setRead() { + final void read(Object msg) { read = true; + if (!closed) { + handleMessage(msg); + } } /** @@ -131,6 +136,40 @@ public abstract class ConnectionBase { } } + /** + * This method is exclusively called on the event-loop thread + * + * @param promise the promise receiving the completion event + */ + private void writeFlush(ChannelPromise promise) { + if (needsFlush) { + needsFlush = false; + chctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise); + } else { + promise.setSuccess(); + } + } + + /** + * This method is exclusively called on the event-loop thread + * + * @param promise the promise receiving the completion event + */ + private void writeClose(PromiseInternal promise) { + if (closed) { + promise.complete(); + return; + } + closed = true; + // Make sure everything is flushed out on close + ChannelPromise channelPromise = chctx + .newPromise() + .addListener((ChannelFutureListener) f -> { + chctx.channel().close().addListener(promise); + }); + writeFlush(channelPromise); + } + protected void reportsBytesWritten(Object msg) { } @@ -187,15 +226,11 @@ public abstract class ConnectionBase { * @param promise the promise resolved when flush occurred */ public final void flush(ChannelPromise promise) { - if (chctx.executor().inEventLoop()) { - if (needsFlush) { - needsFlush = false; - chctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise); - } else { - promise.setSuccess(); - } + EventExecutor exec = chctx.executor(); + if (exec.inEventLoop()) { + writeFlush(promise); } else { - chctx.executor().execute(() -> flush(promise)); + exec.execute(() -> writeFlush(promise)); } } @@ -209,13 +244,12 @@ public abstract class ConnectionBase { */ public Future close() { PromiseInternal promise = context.promise(); - // make sure everything is flushed out on close - ChannelPromise channelPromise = chctx - .newPromise() - .addListener((ChannelFutureListener) f -> { - chctx.channel().close().addListener(promise); - }); - flush(channelPromise); + EventExecutor exec = chctx.executor(); + if (exec.inEventLoop()) { + writeClose(promise); + } else { + exec.execute(() -> writeClose(promise)); + } return promise.future(); } @@ -301,6 +335,7 @@ public abstract class ConnectionBase { } protected void handleClosed() { + closed = true; NetworkMetrics metrics = metrics(); if (metrics instanceof TCPMetrics) { ((TCPMetrics) metrics).disconnected(metric(), remoteAddress()); @@ -465,6 +500,6 @@ public abstract class ConnectionBase { return address; } - public void handleMessage(Object msg) { + protected void handleMessage(Object msg) { } } diff --git a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java index 2fde75f86..2bcee29b2 100644 --- a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java @@ -17,7 +17,6 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; -import io.netty.channel.ChannelPromise; import io.netty.handler.ssl.SniHandler; import io.netty.handler.ssl.SslHandler; import io.netty.util.CharsetUtil; @@ -74,7 +73,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { private Handler drainHandler; private MessageConsumer registration; private Handler messageHandler; - private boolean closed; public NetSocketImpl(VertxInternal vertx, ChannelHandlerContext channel, ContextInternal context, SSLHelper helper, TCPMetrics metrics) { @@ -123,9 +121,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { @Override public synchronized Future writeMessage(Object message) { - if (closed) { - throw new IllegalStateException("Socket is closed"); - } Promise promise = context.promise(); writeMessage(message, promise); return promise.future(); @@ -133,9 +128,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { @Override public NetSocketInternal writeMessage(Object message, Handler> handler) { - if (closed) { - throw new IllegalStateException("Socket is closed"); - } writeToChannel(message, handler == null ? null : context.promise(handler)); return this; } @@ -364,13 +356,10 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { protected void handleClosed() { MessageConsumer consumer; synchronized (this) { - if (closed) { - return; - } - closed = true; consumer = registration; registration = null; } + // Should be done with dispatch.... pending.write(InboundBuffer.END_SENTINEL); super.handleClosed(); if (consumer != null) { diff --git a/src/main/java/io/vertx/core/net/impl/VertxHandler.java b/src/main/java/io/vertx/core/net/impl/VertxHandler.java index 30d317423..829519f95 100644 --- a/src/main/java/io/vertx/core/net/impl/VertxHandler.java +++ b/src/main/java/io/vertx/core/net/impl/VertxHandler.java @@ -153,8 +153,7 @@ public final class VertxHandler extends ChannelDuplexH @Override public void channelRead(ChannelHandlerContext chctx, Object msg) { - conn.setRead(); - conn.handleMessage(msg); + conn.read(msg); } @Override