diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index ed2936b7f..93278a12d 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensio import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFrameClientExtensionHandshaker; import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateClientExtensionHandshaker; import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker; +import io.netty.util.concurrent.FutureListener; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.*; @@ -324,21 +325,15 @@ class Http1xClientConnection extends Http1xConnectionBase impleme request = new AssembledHttpRequest(request, buf); } } - beginRequest(request, handler == null ? null : context.promise(handler)); + writeHead(request, handler == null ? null : context.promise(handler)); } - private void beginRequest(HttpRequest request, Handler> handler) { + private void writeHead(HttpRequest request, Handler> handler) { EventLoop eventLoop = conn.context.nettyEventLoop(); if (eventLoop.inEventLoop()) { conn.beginRequest(this, request, handler); } else { - eventLoop.execute(() -> conn.beginRequest(this, request, handler)); - } - } - - void handleChunk(Buffer buff) { - if (!queue.write(buff)) { - conn.doPause(); + eventLoop.execute(() -> writeHead(request, handler)); } } @@ -357,9 +352,18 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } else { msg = new DefaultHttpContent(buff); } - conn.writeToChannel(msg, handler == null ? null : context.promise(handler)); - if (end) { - endRequest(); + writeBuffer(msg, handler == null ? null : context.promise(handler)); + } + + private void writeBuffer(HttpContent content, FutureListener listener) { + EventLoop eventLoop = conn.context.nettyEventLoop(); + if (eventLoop.inEventLoop()) { + conn.writeToChannel(content, listener); + if (content instanceof LastHttpContent) { + conn.endRequest(this); + } + } else { + eventLoop.execute(() -> writeBuffer(content, listener)); } } @@ -399,29 +403,16 @@ class Http1xClientConnection extends Http1xConnectionBase impleme handleException(cause); EventLoop eventLoop = conn.context.nettyEventLoop(); if (eventLoop.inEventLoop()) { - doReset(); + reset(); } else { - eventLoop.execute(this::doReset); + eventLoop.execute(this::reset); } } - private void doReset() { + private void reset() { conn.resetRequest(this); } - private void endRequest() { - EventLoop eventLoop = conn.context.nettyEventLoop(); - if (eventLoop.inEventLoop()) { - doEndRequest(); - } else { - eventLoop.execute(this::doEndRequest); - } - } - - private void doEndRequest() { - conn.endRequest(this); - } - @Override public StreamPriority priority() { return null; @@ -451,6 +442,12 @@ class Http1xClientConnection extends Http1xConnectionBase impleme request.handleResponse(response); } + void handleChunk(Buffer buff) { + if (!queue.write(buff)) { + conn.doPause(); + } + } + void handleEnd(LastHttpContent trailer) { queue.write(new HeadersAdaptor(trailer.trailingHeaders())); } diff --git a/src/test/java/io/vertx/core/http/HttpTest.java b/src/test/java/io/vertx/core/http/HttpTest.java index 34bf50f97..ac24e6072 100644 --- a/src/test/java/io/vertx/core/http/HttpTest.java +++ b/src/test/java/io/vertx/core/http/HttpTest.java @@ -22,13 +22,15 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.dns.AddressResolverOptions; import io.vertx.core.file.AsyncFile; import io.vertx.core.http.impl.HeadersAdaptor; +import io.vertx.core.http.impl.HttpClientImpl; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.net.*; import io.vertx.core.streams.Pump; import io.vertx.test.core.Repeat; import io.vertx.test.core.TestUtils; +import io.vertx.test.fakestream.FakeStream; import io.vertx.test.netty.TestLoggerFactory; import org.junit.Assume; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -5113,35 +5115,6 @@ public abstract class HttpTest extends HttpTestBase { await(); } - // This test check that ending an HttpClientRequest will not hold a lock when sending Netty messages - // holding suck lock might deadlock when the ChannelOutboundBuffer is full and becomes drained - // doing an HttpClientRequest reentrant during the drain - @Repeat(times = 30) - @Test - public void testClientRequestEndDeadlock() throws Exception { - server.requestHandler(req -> req.endHandler(v -> req.response().end())); - startServer(testAddress); - Context ctx = vertx.getOrCreateContext(); - ctx.runOnContext(v1 -> { - HttpClientRequest request = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, onSuccess(resp -> { - resp.endHandler(v2 -> { - testComplete(); - }); - })) - .setChunked(true); - new Thread(() -> { - Buffer s = randomBuffer(256); - while (!request.writeQueueFull()) { - request.write(s); - } - ctx.runOnContext(v2 -> { - request.end(); - }); - }).start(); - }); - await(); - } - @Test public void testServerResponseWriteSuccess() throws Exception { testServerResponseWriteSuccess((resp, handler) -> resp.write(TestUtils.randomBuffer(1024), handler)); @@ -5646,4 +5619,37 @@ public abstract class HttpTest extends HttpTestBase { })); await(); } + + @Test + public void testClientRequestWithLargeBodyInSmallChunks() throws Exception { + StringBuilder sb = new StringBuilder(); + FakeStream src = new FakeStream<>(); + src.pause(); + int numChunks = 1024; + int chunkLength = 1024; + for (int i = 0;i < numChunks;i++) { + String chunk = randomAlphaString(chunkLength); + sb.append(chunk); + src.write(Buffer.buffer(chunk)); + } + src.end(); + String expected = sb.toString(); + waitFor(2); + server.requestHandler(req -> { + req.bodyHandler(body -> { + assertEquals(HttpMethod.PUT, req.method()); + assertEquals(Buffer.buffer(expected), body); + complete(); + req.response().end(); + }); + }); + startServer(); + HttpClientRequest stream = client.put(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, onSuccess(resp -> { + assertEquals(200, resp.statusCode()); + complete(); + })); + stream.putHeader(HttpHeaders.CONTENT_LENGTH, "" + numChunks * chunkLength); + src.pipeTo(stream); + await(); + } }