From 39593e163fe664a7ce3669a25a79ee246465d0f7 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Wed, 8 Jan 2020 15:08:13 +0100 Subject: [PATCH] Connection should not process messages when in closed state - fixes #3249 - fixes #3245 --- .../http/impl/Http1xClientConnection.java | 1 - .../core/http/impl/Http2ConnectionBase.java | 8 --- .../http/impl/Http2ServerResponseImpl.java | 13 +--- .../vertx/core/net/impl/ConnectionBase.java | 70 ++++++++++++++----- .../io/vertx/core/net/impl/NetSocketImpl.java | 8 --- .../io/vertx/core/net/impl/VertxHandler.java | 5 +- 6 files changed, 58 insertions(+), 47 deletions(-) diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 3088fdc9b..deef8ca71 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -718,7 +718,6 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } else { ws = (WebSocketImpl) wsRes.result(); ws.registerHandler(vertx.eventBus()); - } getContext().executeFromIO(wsRes, res -> { if (res.succeeded()) { diff --git a/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java b/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java index 34b172006..bf7116c61 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java @@ -83,7 +83,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL private Handler goAwayHandler; private Handler shutdownHandler; private Handler pingHandler; - private boolean closed; private boolean goneAway; private int windowSize; private long maxConcurrentStreams; @@ -108,9 +107,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL @Override public void handleClosed() { - synchronized (this) { - closed = true; - } super.handleClosed(); } @@ -124,10 +120,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL super.handleIdle(); } - synchronized boolean isClosed() { - return closed; - } - synchronized void onConnectionError(Throwable cause) { ArrayList copy; synchronized (this) { diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java b/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java index 6563210da..83641412d 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java @@ -25,11 +25,9 @@ import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; -import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.AsyncFile; import io.vertx.core.file.FileSystem; -import io.vertx.core.file.FileSystemException; import io.vertx.core.file.OpenOptions; import io.vertx.core.http.Cookie; import io.vertx.core.http.HttpHeaders; @@ -41,13 +39,11 @@ import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.core.net.NetSocket; import io.vertx.core.net.impl.ConnectionBase; -import io.vertx.core.streams.WriteStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.file.NoSuchFileException; import java.util.Map; import static io.vertx.core.http.HttpHeaders.SET_COOKIE; @@ -651,7 +647,9 @@ public class Http2ServerResponseImpl implements HttpServerResponse { @Override public boolean closed() { - return conn.isClosed(); + synchronized (conn) { + return closed; + } } @Override @@ -689,11 +687,6 @@ public class Http2ServerResponseImpl implements HttpServerResponse { @Override public void reset(long code) { - /* - if (!handleEnded(true)) { - throw new IllegalStateException("Response has already been written"); - } - */ checkValid(); stream.writeReset(code); ctx.flush(); diff --git a/src/main/java/io/vertx/core/net/impl/ConnectionBase.java b/src/main/java/io/vertx/core/net/impl/ConnectionBase.java index 1b3205c26..baa394ba6 100644 --- a/src/main/java/io/vertx/core/net/impl/ConnectionBase.java +++ b/src/main/java/io/vertx/core/net/impl/ConnectionBase.java @@ -15,6 +15,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedFile; +import io.netty.util.concurrent.EventExecutor; import io.vertx.core.*; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; @@ -66,6 +67,7 @@ public abstract class ConnectionBase { // State accessed exclusively from the event loop thread private boolean read; private boolean needsFlush; + private boolean closed; protected ConnectionBase(VertxInternal vertx, ChannelHandlerContext chctx, ContextInternal context) { this.vertx = vertx; @@ -104,8 +106,9 @@ public abstract class ConnectionBase { /** * This method is exclusively called by {@code VertxHandler} to signal read on the event-loop thread. */ - final void setRead() { + final boolean setRead() { read = true; + return !closed; } /** @@ -124,6 +127,45 @@ public abstract class ConnectionBase { } } + /** + * This method is exclusively called on the event-loop thread + * + * @param promise the promise receiving the completion event + */ + private void writeFlush(ChannelPromise promise) { + if (needsFlush) { + needsFlush = false; + chctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise); + } else { + promise.setSuccess(); + } + } + + /** + * This method is exclusively called on the event-loop thread + * + * @param handler the handler receiving the completion event + */ + private void writeClose(Handler> handler) { + if (closed) { + if (handler != null) { + handler.handle(Future.succeededFuture()); + } + return; + } + closed = true; + // make sure everything is flushed out on close + ChannelPromise promise = chctx + .newPromise() + .addListener((ChannelFutureListener) f -> { + ChannelFuture closeFut = chctx.channel().close(); + if (handler != null) { + closeFut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler)); + } + }); + writeFlush(promise); + } + /** * Provide a promise that will call the {@code handler} upon completion. * When the {@code handler} is {@code null} {@link #voidPromise} is returned. @@ -192,14 +234,9 @@ public abstract class ConnectionBase { */ public final void flush(ChannelPromise promise) { if (chctx.executor().inEventLoop()) { - if (needsFlush) { - needsFlush = false; - chctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise); - } else { - promise.setSuccess(); - } + writeFlush(promise); } else { - chctx.executor().execute(() -> flush(promise)); + chctx.executor().execute(() -> writeFlush(promise)); } } @@ -219,16 +256,12 @@ public abstract class ConnectionBase { * Close the connection and notifies the {@code handler} */ public void close(Handler> handler) { - // make sure everything is flushed out on close - ChannelPromise promise = chctx - .newPromise() - .addListener((ChannelFutureListener) f -> { - ChannelFuture closeFut = chctx.channel().close(); - if (handler != null) { - closeFut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler)); - } - }); - flush(promise); + EventExecutor exec = chctx.executor(); + if (exec.inEventLoop()) { + writeClose(handler); + } else { + exec.execute(() -> writeClose(handler)); + } } public synchronized ConnectionBase closeHandler(Handler handler) { @@ -307,6 +340,7 @@ public abstract class ConnectionBase { } protected void handleClosed() { + closed = true; Handler handler; synchronized (this) { NetworkMetrics metrics = metrics(); diff --git a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java index 8f9d05ccf..16284a33b 100644 --- a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java @@ -72,7 +72,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { private InboundBuffer pending; private MessageConsumer registration; private Handler messageHandler; - private boolean closed; public NetSocketImpl(VertxInternal vertx, ChannelHandlerContext channel, ContextInternal context, SSLHelper helper, TCPMetrics metrics) { @@ -121,9 +120,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { @Override public synchronized NetSocketInternal writeMessage(Object message) { - if (closed) { - throw new IllegalStateException("Socket is closed"); - } writeToChannel(message); return this; } @@ -346,10 +342,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { protected void handleClosed() { MessageConsumer consumer; synchronized (this) { - if (closed) { - return; - } - closed = true; consumer = registration; registration = null; } diff --git a/src/main/java/io/vertx/core/net/impl/VertxHandler.java b/src/main/java/io/vertx/core/net/impl/VertxHandler.java index a9656e6d0..6dd972993 100644 --- a/src/main/java/io/vertx/core/net/impl/VertxHandler.java +++ b/src/main/java/io/vertx/core/net/impl/VertxHandler.java @@ -169,8 +169,9 @@ public final class VertxHandler extends ChannelDuplexH @Override public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exception { - conn.setRead(); - context.executeFromIO(msg, messageHandler); + if (conn.setRead()) { + context.executeFromIO(msg, messageHandler); + } } @Override