From a655b9b4c4ec3ec065f6c8f0c038823b5c16b60c Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Wed, 9 Oct 2019 10:34:33 +0200 Subject: [PATCH] Obtain context in HttpClientRequest when the request is created and not at connect time --- .../core/http/impl/HttpClientRequestImpl.java | 7 +- .../java/io/vertx/core/http/Http1xTest.java | 67 +++++++++---------- .../io/vertx/core/http/Http2ClientTest.java | 56 ++++++++-------- 3 files changed, 64 insertions(+), 66 deletions(-) diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java index 6743c6787..b33552a54 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java @@ -54,6 +54,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http static final Logger log = LoggerFactory.getLogger(HttpClientRequestImpl.class); private final VertxInternal vertx; + private final ContextInternal context; private boolean chunked; private String hostHeader; private String rawMethod; @@ -81,6 +82,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http super(client, ssl, method, server, host, port, relativeURI); this.chunked = false; this.vertx = vertx; + this.context = vertx.getOrCreateContext(); this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY; } @@ -437,15 +439,12 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } else { initializer = h2; } - ContextInternal connectCtx = vertx.getOrCreateContext(); - - // We defer actual connection until the first part of body is written or end is called // This gives the user an opportunity to set an exception handler before connecting so // they can capture any exceptions on connection connecting = true; - client.getConnectionForRequest(connectCtx, peerAddress, ssl, server, ar1 -> { + client.getConnectionForRequest(context, peerAddress, ssl, server, ar1 -> { if (ar1.succeeded()) { HttpClientStream stream = ar1.result(); ContextInternal ctx = stream.getContext(); diff --git a/src/test/java/io/vertx/core/http/Http1xTest.java b/src/test/java/io/vertx/core/http/Http1xTest.java index eb2c028e9..bf6242327 100644 --- a/src/test/java/io/vertx/core/http/Http1xTest.java +++ b/src/test/java/io/vertx/core/http/Http1xTest.java @@ -2072,17 +2072,17 @@ public class Http1xTest extends HttpTest { }); startServer(testAddress); Context clientCtx = vertx.getOrCreateContext(); - HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, onFailure(err -> { - assertSameEventLoop(clientCtx, Vertx.currentContext()); - complete(); - })); clientCtx.runOnContext(v -> { + HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, onFailure(err -> { + assertSameEventLoop(clientCtx, Vertx.currentContext()); + complete(); + })); + req.exceptionHandler(err -> { + assertSameEventLoop(clientCtx, Vertx.currentContext()); + complete(); + }); req.sendHead(); }); - req.exceptionHandler(err -> { - assertSameEventLoop(clientCtx, Vertx.currentContext()); - complete(); - }); await(); } @@ -2159,32 +2159,31 @@ public class Http1xTest extends HttpTest { CompletableFuture cf = new CompletableFuture<>(); String path = "/" + val; requestResumeMap.put(path, cf); - HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, path, onSuccess(resp -> { - assertSameEventLoop(clientCtx, Vertx.currentContext()); - assertEquals(200, resp.statusCode()); - contexts.add(Vertx.currentContext()); - threads.add(Thread.currentThread()); - resp.pause(); - responseResumeMap.get(path).thenAccept(v -> resp.resume()); - resp.handler(chunk -> { - assertSameEventLoop(clientCtx, Vertx.currentContext()); - }); - resp.exceptionHandler(this::fail); - resp.endHandler(v -> { - assertSameEventLoop(clientCtx, Vertx.currentContext()); - if (cnt.incrementAndGet() == numReqs) { - assertEquals(numReqs, contexts.size()); - assertEquals(1, threads.size()); - latch2.countDown(); - } - }); - })).setChunked(true).exceptionHandler(this::fail); - CountDownLatch drainLatch = new CountDownLatch(1); - req.drainHandler(v -> { - assertSameEventLoop(clientCtx, Vertx.currentContext()); - drainLatch.countDown(); - }); clientCtx.runOnContext(v -> { + HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, path, onSuccess(resp -> { + assertSameEventLoop(clientCtx, Vertx.currentContext()); + assertEquals(200, resp.statusCode()); + contexts.add(Vertx.currentContext()); + threads.add(Thread.currentThread()); + resp.pause(); + responseResumeMap.get(path).thenAccept(v2 -> resp.resume()); + resp.handler(chunk -> { + assertSameEventLoop(clientCtx, Vertx.currentContext()); + }); + resp.exceptionHandler(this::fail); + resp.endHandler(v2 -> { + assertSameEventLoop(clientCtx, Vertx.currentContext()); + if (cnt.incrementAndGet() == numReqs) { + assertEquals(numReqs, contexts.size()); + assertEquals(1, threads.size()); + latch2.countDown(); + } + }); + })).setChunked(true).exceptionHandler(this::fail); + req.drainHandler(v2 -> { + assertSameEventLoop(clientCtx, Vertx.currentContext()); + req.end(); + }); req.sendHead(version -> { assertSameEventLoop(clientCtx, Vertx.currentContext()); fill(data, req, () -> { @@ -2192,8 +2191,6 @@ public class Http1xTest extends HttpTest { }); }); }); - awaitLatch(drainLatch); - req.end(); } awaitLatch(latch2, 40, TimeUnit.SECONDS); // Close should be in own context diff --git a/src/test/java/io/vertx/core/http/Http2ClientTest.java b/src/test/java/io/vertx/core/http/Http2ClientTest.java index 561946931..b9e6632ea 100644 --- a/src/test/java/io/vertx/core/http/Http2ClientTest.java +++ b/src/test/java/io/vertx/core/http/Http2ClientTest.java @@ -428,34 +428,36 @@ public class Http2ClientTest extends Http2TestBase { }); }); startServer(); - HttpClientRequest req = client.post(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { - testComplete(); - }).setChunked(true).exceptionHandler(err -> { - fail(); - }); - AtomicInteger sent = new AtomicInteger(); - AtomicInteger count = new AtomicInteger(); - AtomicInteger drained = new AtomicInteger(); - vertx.setPeriodic(1, timerID -> { - Context ctx = vertx.getOrCreateContext(); - if (req.writeQueueFull()) { - assertTrue(paused.get()); - assertEquals(1, numPause.get()); - req.drainHandler(v -> { - assertOnIOContext(ctx); - assertEquals(0, drained.getAndIncrement()); + Context ctx = vertx.getOrCreateContext(); + ctx.runOnContext(v -> { + HttpClientRequest req = client.post(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { + testComplete(); + }).setChunked(true).exceptionHandler(err -> { + fail(); + }); + AtomicInteger sent = new AtomicInteger(); + AtomicInteger count = new AtomicInteger(); + AtomicInteger drained = new AtomicInteger(); + vertx.setPeriodic(1, timerID -> { + if (req.writeQueueFull()) { + assertTrue(paused.get()); assertEquals(1, numPause.get()); - assertFalse(paused.get()); - req.end(); - }); - vertx.cancelTimer(timerID); - done.complete(null); - } else { - count.incrementAndGet(); - expected.appendString(chunk); - req.write(chunk); - sent.addAndGet(chunk.length()); - } + req.drainHandler(v2 -> { + assertOnIOContext(ctx); + assertEquals(0, drained.getAndIncrement()); + assertEquals(1, numPause.get()); + assertFalse(paused.get()); + req.end(); + }); + vertx.cancelTimer(timerID); + done.complete(null); + } else { + count.incrementAndGet(); + expected.appendString(chunk); + req.write(chunk); + sent.addAndGet(chunk.length()); + } + }); }); await(); }