Connection should not process messages when in closed state - fixes #3249 - fixes #3245

This commit is contained in:
Julien Viet
2020-01-08 15:08:13 +01:00
parent 7b94a8d534
commit 39593e163f
6 changed files with 58 additions and 47 deletions

View File

@@ -718,7 +718,6 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
} else {
ws = (WebSocketImpl) wsRes.result();
ws.registerHandler(vertx.eventBus());
}
getContext().executeFromIO(wsRes, res -> {
if (res.succeeded()) {

View File

@@ -83,7 +83,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL
private Handler<GoAway> goAwayHandler;
private Handler<Void> shutdownHandler;
private Handler<Buffer> pingHandler;
private boolean closed;
private boolean goneAway;
private int windowSize;
private long maxConcurrentStreams;
@@ -108,9 +107,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL
@Override
public void handleClosed() {
synchronized (this) {
closed = true;
}
super.handleClosed();
}
@@ -124,10 +120,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL
super.handleIdle();
}
synchronized boolean isClosed() {
return closed;
}
synchronized void onConnectionError(Throwable cause) {
ArrayList<VertxHttp2Stream> copy;
synchronized (this) {

View File

@@ -25,11 +25,9 @@ import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.FileSystem;
import io.vertx.core.file.FileSystemException;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpHeaders;
@@ -41,13 +39,11 @@ import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.streams.WriteStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.NoSuchFileException;
import java.util.Map;
import static io.vertx.core.http.HttpHeaders.SET_COOKIE;
@@ -651,7 +647,9 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
@Override
public boolean closed() {
return conn.isClosed();
synchronized (conn) {
return closed;
}
}
@Override
@@ -689,11 +687,6 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
@Override
public void reset(long code) {
/*
if (!handleEnded(true)) {
throw new IllegalStateException("Response has already been written");
}
*/
checkValid();
stream.writeReset(code);
ctx.flush();

View File

@@ -15,6 +15,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.concurrent.EventExecutor;
import io.vertx.core.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
@@ -66,6 +67,7 @@ public abstract class ConnectionBase {
// State accessed exclusively from the event loop thread
private boolean read;
private boolean needsFlush;
private boolean closed;
protected ConnectionBase(VertxInternal vertx, ChannelHandlerContext chctx, ContextInternal context) {
this.vertx = vertx;
@@ -104,8 +106,9 @@ public abstract class ConnectionBase {
/**
* This method is exclusively called by {@code VertxHandler} to signal read on the event-loop thread.
*/
final void setRead() {
final boolean setRead() {
read = true;
return !closed;
}
/**
@@ -124,6 +127,45 @@ public abstract class ConnectionBase {
}
}
/**
* This method is exclusively called on the event-loop thread
*
* @param promise the promise receiving the completion event
*/
private void writeFlush(ChannelPromise promise) {
if (needsFlush) {
needsFlush = false;
chctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
} else {
promise.setSuccess();
}
}
/**
* This method is exclusively called on the event-loop thread
*
* @param handler the handler receiving the completion event
*/
private void writeClose(Handler<AsyncResult<Void>> handler) {
if (closed) {
if (handler != null) {
handler.handle(Future.succeededFuture());
}
return;
}
closed = true;
// make sure everything is flushed out on close
ChannelPromise promise = chctx
.newPromise()
.addListener((ChannelFutureListener) f -> {
ChannelFuture closeFut = chctx.channel().close();
if (handler != null) {
closeFut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler));
}
});
writeFlush(promise);
}
/**
* Provide a promise that will call the {@code handler} upon completion.
* When the {@code handler} is {@code null} {@link #voidPromise} is returned.
@@ -192,14 +234,9 @@ public abstract class ConnectionBase {
*/
public final void flush(ChannelPromise promise) {
if (chctx.executor().inEventLoop()) {
if (needsFlush) {
needsFlush = false;
chctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
} else {
promise.setSuccess();
}
writeFlush(promise);
} else {
chctx.executor().execute(() -> flush(promise));
chctx.executor().execute(() -> writeFlush(promise));
}
}
@@ -219,16 +256,12 @@ public abstract class ConnectionBase {
* Close the connection and notifies the {@code handler}
*/
public void close(Handler<AsyncResult<Void>> handler) {
// make sure everything is flushed out on close
ChannelPromise promise = chctx
.newPromise()
.addListener((ChannelFutureListener) f -> {
ChannelFuture closeFut = chctx.channel().close();
if (handler != null) {
closeFut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler));
}
});
flush(promise);
EventExecutor exec = chctx.executor();
if (exec.inEventLoop()) {
writeClose(handler);
} else {
exec.execute(() -> writeClose(handler));
}
}
public synchronized ConnectionBase closeHandler(Handler<Void> handler) {
@@ -307,6 +340,7 @@ public abstract class ConnectionBase {
}
protected void handleClosed() {
closed = true;
Handler<Void> handler;
synchronized (this) {
NetworkMetrics metrics = metrics();

View File

@@ -72,7 +72,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
private InboundBuffer<Object> pending;
private MessageConsumer registration;
private Handler<Object> messageHandler;
private boolean closed;
public NetSocketImpl(VertxInternal vertx, ChannelHandlerContext channel, ContextInternal context,
SSLHelper helper, TCPMetrics metrics) {
@@ -121,9 +120,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
@Override
public synchronized NetSocketInternal writeMessage(Object message) {
if (closed) {
throw new IllegalStateException("Socket is closed");
}
writeToChannel(message);
return this;
}
@@ -346,10 +342,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
protected void handleClosed() {
MessageConsumer consumer;
synchronized (this) {
if (closed) {
return;
}
closed = true;
consumer = registration;
registration = null;
}

View File

@@ -169,8 +169,9 @@ public final class VertxHandler<C extends ConnectionBase> extends ChannelDuplexH
@Override
public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exception {
conn.setRead();
context.executeFromIO(msg, messageHandler);
if (conn.setRead()) {
context.executeFromIO(msg, messageHandler);
}
}
@Override