Make sure that all HTTP/1 client connection writes are serialized without reordering due to back-pressure

This commit is contained in:
Julien Viet
2019-12-05 12:58:33 +01:00
parent 24368ac06e
commit d663a5c7fb
2 changed files with 61 additions and 58 deletions

View File

@@ -24,6 +24,7 @@ import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensio
import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFrameClientExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateClientExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
@@ -324,21 +325,15 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
request = new AssembledHttpRequest(request, buf);
}
}
beginRequest(request, handler == null ? null : context.promise(handler));
writeHead(request, handler == null ? null : context.promise(handler));
}
private void beginRequest(HttpRequest request, Handler<AsyncResult<Void>> handler) {
private void writeHead(HttpRequest request, Handler<AsyncResult<Void>> handler) {
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
conn.beginRequest(this, request, handler);
} else {
eventLoop.execute(() -> conn.beginRequest(this, request, handler));
}
}
void handleChunk(Buffer buff) {
if (!queue.write(buff)) {
conn.doPause();
eventLoop.execute(() -> writeHead(request, handler));
}
}
@@ -357,9 +352,18 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
} else {
msg = new DefaultHttpContent(buff);
}
conn.writeToChannel(msg, handler == null ? null : context.promise(handler));
if (end) {
endRequest();
writeBuffer(msg, handler == null ? null : context.promise(handler));
}
private void writeBuffer(HttpContent content, FutureListener<Void> listener) {
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
conn.writeToChannel(content, listener);
if (content instanceof LastHttpContent) {
conn.endRequest(this);
}
} else {
eventLoop.execute(() -> writeBuffer(content, listener));
}
}
@@ -399,29 +403,16 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
handleException(cause);
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
doReset();
reset();
} else {
eventLoop.execute(this::doReset);
eventLoop.execute(this::reset);
}
}
private void doReset() {
private void reset() {
conn.resetRequest(this);
}
private void endRequest() {
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
doEndRequest();
} else {
eventLoop.execute(this::doEndRequest);
}
}
private void doEndRequest() {
conn.endRequest(this);
}
@Override
public StreamPriority priority() {
return null;
@@ -451,6 +442,12 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
request.handleResponse(response);
}
void handleChunk(Buffer buff) {
if (!queue.write(buff)) {
conn.doPause();
}
}
void handleEnd(LastHttpContent trailer) {
queue.write(new HeadersAdaptor(trailer.trailingHeaders()));
}

View File

@@ -22,13 +22,15 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.dns.AddressResolverOptions;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.http.impl.HeadersAdaptor;
import io.vertx.core.http.impl.HttpClientImpl;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.*;
import io.vertx.core.streams.Pump;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakestream.FakeStream;
import io.vertx.test.netty.TestLoggerFactory;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -5113,35 +5115,6 @@ public abstract class HttpTest extends HttpTestBase {
await();
}
// This test checks that ending an HttpClientRequest will not hold a lock when sending Netty messages
// holding such a lock might deadlock when the ChannelOutboundBuffer is full and becomes drained
// causing a reentrant HttpClientRequest write during the drain
@Repeat(times = 30)
@Test
public void testClientRequestEndDeadlock() throws Exception {
server.requestHandler(req -> req.endHandler(v -> req.response().end()));
startServer(testAddress);
Context ctx = vertx.getOrCreateContext();
ctx.runOnContext(v1 -> {
HttpClientRequest request = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, onSuccess(resp -> {
resp.endHandler(v2 -> {
testComplete();
});
}))
.setChunked(true);
new Thread(() -> {
Buffer s = randomBuffer(256);
while (!request.writeQueueFull()) {
request.write(s);
}
ctx.runOnContext(v2 -> {
request.end();
});
}).start();
});
await();
}
@Test
public void testServerResponseWriteSuccess() throws Exception {
testServerResponseWriteSuccess((resp, handler) -> resp.write(TestUtils.randomBuffer(1024), handler));
@@ -5646,4 +5619,37 @@ public abstract class HttpTest extends HttpTestBase {
}));
await();
}
@Test
public void testClientRequestWithLargeBodyInSmallChunks() throws Exception {
StringBuilder sb = new StringBuilder();
FakeStream<Buffer> src = new FakeStream<>();
src.pause();
int numChunks = 1024;
int chunkLength = 1024;
for (int i = 0;i < numChunks;i++) {
String chunk = randomAlphaString(chunkLength);
sb.append(chunk);
src.write(Buffer.buffer(chunk));
}
src.end();
String expected = sb.toString();
waitFor(2);
server.requestHandler(req -> {
req.bodyHandler(body -> {
assertEquals(HttpMethod.PUT, req.method());
assertEquals(Buffer.buffer(expected), body);
complete();
req.response().end();
});
});
startServer();
HttpClientRequest stream = client.put(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, onSuccess(resp -> {
assertEquals(200, resp.statusCode());
complete();
}));
stream.putHeader(HttpHeaders.CONTENT_LENGTH, "" + numChunks * chunkLength);
src.pipeTo(stream);
await();
}
}