mirror of
https://github.com/jlengrand/vert.x.git
synced 2026-03-10 08:51:19 +00:00
Properly redeliver accumulated messages in the pipeline during a client NetSocket upgrade
This commit is contained in:
@@ -67,6 +67,10 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(Http1xClientConnection.class);
|
||||
|
||||
private static final Handler<Object> INVALID_MSG_HANDLER = msg -> {
|
||||
throw new IllegalStateException("Invalid object " + msg);
|
||||
};
|
||||
|
||||
private final ConnectionListener<HttpClientConnection> listener;
|
||||
private final HttpClientImpl client;
|
||||
private final HttpClientOptions options;
|
||||
@@ -78,7 +82,8 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
|
||||
|
||||
private StreamImpl head;
|
||||
private StreamImpl requestInProgress;
|
||||
|
||||
|
||||
private Handler<Object> invalidMessageHandler = INVALID_MSG_HANDLER;
|
||||
private boolean close;
|
||||
private Promise<NetSocket> netSocketPromise;
|
||||
private int keepAliveTimeout;
|
||||
@@ -509,7 +514,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
|
||||
} else if (msg instanceof WebSocketFrame) {
|
||||
handleWsFrame((WebSocketFrame) msg);
|
||||
} else {
|
||||
throw new IllegalStateException("Invalid object " + msg);
|
||||
invalidMessageHandler.handle(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -568,6 +573,11 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
|
||||
if (inflater != null) {
|
||||
pipeline.remove(inflater);
|
||||
}
|
||||
|
||||
// removing this codec might fire pending buffers in the HTTP decoder
|
||||
// this happens when the channel reads the HTTP response and the following data in a single buffer
|
||||
Deque<Object> pending = new ArrayDeque<>();
|
||||
invalidMessageHandler = pending::add;
|
||||
pipeline.remove("codec");
|
||||
|
||||
// replace the old handler with one that handle plain sockets
|
||||
@@ -583,11 +593,13 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
|
||||
socket.metric(metric());
|
||||
pipeline.replace("handler", "handler", VertxHandler.create(socket));
|
||||
|
||||
// removing this codec might fire pending buffers in the HTTP decoder
|
||||
// this happens when the channel reads the HTTP response and the following data in a single buffer
|
||||
|
||||
// Handle back response
|
||||
promise.complete(socket);
|
||||
|
||||
// Redeliver pending messages
|
||||
for (Object msg : pending) {
|
||||
pipeline.fireChannelRead(msg);
|
||||
}
|
||||
} else {
|
||||
promise.fail("Server responded with " + response.statusCode() + " code instead of 200");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user