Connection should not process messages when in closed state - see #3249

This commit is contained in:
Julien Viet
2020-01-08 14:02:11 +01:00
parent 5c1354f3bb
commit 3b685e70b3
5 changed files with 61 additions and 42 deletions

View File

@@ -87,7 +87,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL
private Handler<GoAway> goAwayHandler;
private Handler<Void> shutdownHandler;
private Handler<Buffer> pingHandler;
private boolean closed;
private boolean goneAway;
private int windowSize;
private long maxConcurrentStreams;
@@ -113,9 +112,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL
@Override
public void handleClosed() {
synchronized (this) {
closed = true;
}
super.handleClosed();
}
@@ -129,10 +125,6 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL
super.handleIdle();
}
synchronized boolean isClosed() {
return closed;
}
synchronized void onConnectionError(Throwable cause) {
ArrayList<VertxHttp2Stream> streams = new ArrayList<>();
try {

View File

@@ -59,6 +59,7 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
private boolean chunked;
private boolean headWritten;
private boolean ended;
private boolean closed;
private Map<String, ServerCookie> cookies;
private HttpResponseStatus status = HttpResponseStatus.OK;
private String statusMessage; // Not really used but we keep the message for the getStatusMessage()
@@ -106,6 +107,7 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
Handler<Void> endHandler;
Handler<Void> closeHandler;
synchronized (conn) {
closed = true;
boolean failed = !ended;
exceptionHandler = failed ? this.exceptionHandler : null;
endHandler = failed ? this.endHandler : null;
@@ -619,8 +621,10 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
}
@Override
public boolean closed() {
return conn.isClosed();
public synchronized boolean closed() {
synchronized (conn) {
return closed;
}
}
@Override

View File

@@ -15,6 +15,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.*;
import io.vertx.core.impl.ContextInternal;
@@ -70,6 +71,7 @@ public abstract class ConnectionBase {
// State accessed exclusively from the event loop thread
private boolean read;
private boolean needsFlush;
private boolean closed;
protected ConnectionBase(VertxInternal vertx, ChannelHandlerContext chctx, ContextInternal context) {
this.vertx = vertx;
@@ -106,10 +108,13 @@ public abstract class ConnectionBase {
}
/**
* This method is exclusively called by {@code VertxHandler} to signal read on the event-loop thread.
* This method is exclusively called by {@code VertxHandler} to read a message on the event-loop thread.
*/
final void setRead() {
final void read(Object msg) {
read = true;
if (!closed) {
handleMessage(msg);
}
}
/**
@@ -131,6 +136,40 @@ public abstract class ConnectionBase {
}
}
/**
* This method is exclusively called on the event-loop thread
*
* @param promise the promise receiving the completion event
*/
private void writeFlush(ChannelPromise promise) {
if (needsFlush) {
needsFlush = false;
chctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
} else {
promise.setSuccess();
}
}
/**
* This method is exclusively called on the event-loop thread
*
* @param promise the promise receiving the completion event
*/
private void writeClose(PromiseInternal<Void> promise) {
if (closed) {
promise.complete();
return;
}
closed = true;
// Make sure everything is flushed out on close
ChannelPromise channelPromise = chctx
.newPromise()
.addListener((ChannelFutureListener) f -> {
chctx.channel().close().addListener(promise);
});
writeFlush(channelPromise);
}
protected void reportsBytesWritten(Object msg) {
}
@@ -187,15 +226,11 @@ public abstract class ConnectionBase {
* @param promise the promise resolved when flush occurred
*/
public final void flush(ChannelPromise promise) {
if (chctx.executor().inEventLoop()) {
if (needsFlush) {
needsFlush = false;
chctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
} else {
promise.setSuccess();
}
EventExecutor exec = chctx.executor();
if (exec.inEventLoop()) {
writeFlush(promise);
} else {
chctx.executor().execute(() -> flush(promise));
exec.execute(() -> writeFlush(promise));
}
}
@@ -209,13 +244,12 @@ public abstract class ConnectionBase {
*/
public Future<Void> close() {
PromiseInternal<Void> promise = context.promise();
// make sure everything is flushed out on close
ChannelPromise channelPromise = chctx
.newPromise()
.addListener((ChannelFutureListener) f -> {
chctx.channel().close().addListener(promise);
});
flush(channelPromise);
EventExecutor exec = chctx.executor();
if (exec.inEventLoop()) {
writeClose(promise);
} else {
exec.execute(() -> writeClose(promise));
}
return promise.future();
}
@@ -301,6 +335,7 @@ public abstract class ConnectionBase {
}
protected void handleClosed() {
closed = true;
NetworkMetrics metrics = metrics();
if (metrics instanceof TCPMetrics) {
((TCPMetrics) metrics).disconnected(metric(), remoteAddress());
@@ -465,6 +500,6 @@ public abstract class ConnectionBase {
return address;
}
public void handleMessage(Object msg) {
protected void handleMessage(Object msg) {
}
}

View File

@@ -17,7 +17,6 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.handler.ssl.SniHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
@@ -74,7 +73,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
private Handler<Void> drainHandler;
private MessageConsumer registration;
private Handler<Object> messageHandler;
private boolean closed;
public NetSocketImpl(VertxInternal vertx, ChannelHandlerContext channel, ContextInternal context,
SSLHelper helper, TCPMetrics metrics) {
@@ -123,9 +121,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
@Override
public synchronized Future<Void> writeMessage(Object message) {
if (closed) {
throw new IllegalStateException("Socket is closed");
}
Promise<Void> promise = context.promise();
writeMessage(message, promise);
return promise.future();
@@ -133,9 +128,6 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
@Override
public NetSocketInternal writeMessage(Object message, Handler<AsyncResult<Void>> handler) {
if (closed) {
throw new IllegalStateException("Socket is closed");
}
writeToChannel(message, handler == null ? null : context.promise(handler));
return this;
}
@@ -364,13 +356,10 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
protected void handleClosed() {
MessageConsumer consumer;
synchronized (this) {
if (closed) {
return;
}
closed = true;
consumer = registration;
registration = null;
}
// Should be done with dispatch....
pending.write(InboundBuffer.END_SENTINEL);
super.handleClosed();
if (consumer != null) {

View File

@@ -153,8 +153,7 @@ public final class VertxHandler<C extends ConnectionBase> extends ChannelDuplexH
@Override
public void channelRead(ChannelHandlerContext chctx, Object msg) {
conn.setRead();
conn.handleMessage(msg);
conn.read(msg);
}
@Override