Reproducer

This commit is contained in:
Julien Viet
2020-02-17 20:50:01 +01:00
parent 5b91d39d7a
commit acee43c53e
4 changed files with 143 additions and 9 deletions

View File

@@ -11,11 +11,14 @@
package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.*;
import io.netty.util.concurrent.EventExecutor;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
@@ -32,6 +35,8 @@ import io.vertx.core.net.SocketAddress;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
/**
* An HTTP/2 connection in clear text that upgraded from an HTTP/1 upgrade.
@@ -85,6 +90,8 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
private Promise<NetSocket> netSocketPromise;
private Http1xClientConnection conn;
private HttpClientStream stream;
private long pendingSize = 0;
private List<Object> pending = new ArrayList<>();
UpgradingStream(HttpClientStream stream, HttpClientRequestImpl request, Promise<NetSocket> netSocketPromise, Http1xClientConnection conn) {
this.conn = conn;
@@ -113,6 +120,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
Handler<AsyncResult<Void>> listener) {
ChannelPipeline pipeline = conn.channel().pipeline();
HttpClientCodec httpCodec = pipeline.get(HttpClientCodec.class);
class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
@@ -130,13 +138,15 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
pipeline.remove(this);
HttpResponse resp = (HttpResponse) msg;
if (resp.status() != HttpResponseStatus.SWITCHING_PROTOCOLS) {
// Insert the cloe headers to let the HTTP/1 stream close the connection
// Insert the close headers to let the HTTP/1 stream close the connection
resp.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
}
}
super.channelRead(ctx, msg);
}
}
VertxHttp2ClientUpgradeCodec upgradeCodec = new VertxHttp2ClientUpgradeCodec(client.getOptions().getInitialSettings()) {
@Override
public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception {
@@ -167,10 +177,73 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
handler.clientUpgrade(ctx);
}
};
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536);
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536) {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (pending != null) {
// Buffer all messages received from the server until the HTTP request is fully sent.
//
// Explanation:
//
// It is necessary that the client only starts to process the response when the request
// has been fully sent because the current HTTP2 implementation will not be able to process
// the server preface until the client preface has been sent.
//
// Adding the VertxHttp2ConnectionHandler to the pipeline has two effects:
// - it is required to process the server preface
// - it will send the request preface to the server
//
// As we are adding this handler to the pipeline when we receive the 101 response from the server
// this might send the client preface before the initial HTTP request (doing the upgrade) is fully sent
// resulting in corrupting the protocol (the server might interpret it as an corrupted connection preface).
//
// Therefore we must buffer all pending messages until the request is fully sent.
int maxContent = maxContentLength();
boolean lower = pendingSize < maxContent;
if (msg instanceof ByteBufHolder) {
pendingSize += ((ByteBufHolder)msg).content().readableBytes();
} else if (msg instanceof ByteBuf) {
pendingSize += ((ByteBuf)msg).readableBytes();
}
if (pendingSize >= maxContent) {
if (lower) {
pending.clear();
ctx.fireExceptionCaught(new TooLongFrameException("Max content exceeded " + maxContentLength() + " bytes."));
}
return;
}
pending.add(msg);
} else {
super.channelRead(ctx, msg);
}
}
};
pipeline.addAfter("codec", null, new UpgradeRequestHandler());
pipeline.addAfter("codec", null, upgradeHandler);
stream.writeHead(method, uri, headers, authority, chunked, buf, end, priority, listener);
Runnable task = () -> {
stream.writeHead(method, uri, headers, authority, chunked, buf, end, priority, listener);
if (end) {
end();
}
};
EventExecutor exec = conn.channelHandlerContext().executor();
if (exec.inEventLoop()) {
task.run();
} else {
exec.execute(task);
}
}
private void end() {
// Deliver pending messages to the handler
List<Object> messages = pending;
pending = null;
ChannelHandlerContext context = conn.channelHandlerContext().pipeline().context("codec");
for (Object message : messages) {
context.fireChannelRead(message);
}
}
@Override
@@ -195,7 +268,15 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
@Override
public void writeBuffer(ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler) {
stream.writeBuffer(buf, end, handler);
EventExecutor exec = conn.channelHandlerContext().executor();
if (exec.inEventLoop()) {
stream.writeBuffer(buf, end, handler);
if (end) {
end();
}
} else {
exec.execute(() -> writeBuffer(buf, end, handler));
}
}
@Override

View File

@@ -537,12 +537,12 @@ public class HttpClientImpl implements HttpClient, MetricsProvider {
@Override
public HttpClientRequest request(SocketAddress serverAddress, RequestOptions options) {
return createRequest(options.getMethod(), serverAddress, options.getHost(), options.getPort(), options.isSsl(), options.getURI(), null);
return createRequest(options.getMethod(), serverAddress, getHost(options.getHost()), getPort(options.getPort()), options.isSsl(), options.getURI(), null);
}
@Override
public HttpClientRequest request(RequestOptions options) {
return createRequest(options.getMethod(), null, options.getHost(), options.getPort(), options.isSsl(), options.getURI(), options.getHeaders());
return createRequest(options.getMethod(), null, getHost(options.getHost()), getPort(options.getPort()), options.isSsl(), options.getURI(), options.getHeaders());
}
@Override

View File

@@ -1617,7 +1617,7 @@ public class Http2ClientTest extends Http2TestBase {
}
private void testIdleTimeout(HttpServerOptions serverOptions, HttpClientOptions clientOptions) throws Exception {
waitFor(4);
waitFor(3);
server.close();
server = vertx.createHttpServer(serverOptions);
server.requestHandler(req -> {

View File

@@ -12,14 +12,13 @@
package io.vertx.core.http;
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.Http2ServerConnection;
import io.vertx.core.net.OpenSSLEngineOptions;
import io.vertx.test.core.AsyncTestBase;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.TestUtils;
import io.vertx.test.tls.Cert;
import org.junit.Test;
@@ -782,4 +781,58 @@ public class Http2Test extends HttpTest {
await();
}
@Test
public void testClearTextUpgradeWithBody() throws Exception {
server.close();
server = vertx.createHttpServer().requestHandler(req -> {
req.bodyHandler(body -> req.response().end(body));
});
startServer(testAddress);
client.close();
client = vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2));
client.connectionHandler(conn -> {
conn.goAwayHandler(ga -> {
assertEquals(0, ga.getErrorCode());
});
});
Buffer payload = Buffer.buffer("some-data");
HttpClientRequest req = client.request(testAddress, new RequestOptions().setSsl(false));
req.setHandler(onSuccess(response -> {
response.body(onSuccess(body -> {
assertEquals(Buffer.buffer().appendBuffer(payload).appendBuffer(payload), body);
testComplete();
}));
}));
req.putHeader("Content-Length", "" + payload.length() * 2);
req.exceptionHandler(this::fail);
req.write(payload);
Thread.sleep(1000);
req.end(payload);
await();
}
@Test
public void testClearTextUpgradeWithBodyTooLongFrameResponse() throws Exception {
server.close();
Buffer buffer = TestUtils.randomBuffer(1024);
server = vertx.createHttpServer().requestHandler(req -> {
req.response().setChunked(true);
vertx.setPeriodic(1, id -> {
req.response().write(buffer);
});
});
startServer(testAddress);
client.close();
client = vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2));
HttpClientRequest req = client.request(testAddress, new RequestOptions().setSsl(false));
req.setHandler(onFailure(err -> {}));
req.setChunked(true);
req.exceptionHandler(err -> {
if (err instanceof TooLongFrameException) {
testComplete();
}
});
req.sendHead();
await();
}
}