Obtain context in HttpClientRequest when the request is created and not at connect time

This commit is contained in:
Julien Viet
2019-10-09 10:34:33 +02:00
parent 48751f950a
commit a655b9b4c4
3 changed files with 64 additions and 66 deletions

View File

@@ -54,6 +54,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
static final Logger log = LoggerFactory.getLogger(HttpClientRequestImpl.class);
private final VertxInternal vertx;
private final ContextInternal context;
private boolean chunked;
private String hostHeader;
private String rawMethod;
@@ -81,6 +82,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
super(client, ssl, method, server, host, port, relativeURI);
this.chunked = false;
this.vertx = vertx;
this.context = vertx.getOrCreateContext();
this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY;
}
@@ -437,15 +439,12 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
} else {
initializer = h2;
}
ContextInternal connectCtx = vertx.getOrCreateContext();
// We defer actual connection until the first part of body is written or end is called
// This gives the user an opportunity to set an exception handler before connecting so
// they can capture any exceptions on connection
connecting = true;
client.getConnectionForRequest(connectCtx, peerAddress, ssl, server, ar1 -> {
client.getConnectionForRequest(context, peerAddress, ssl, server, ar1 -> {
if (ar1.succeeded()) {
HttpClientStream stream = ar1.result();
ContextInternal ctx = stream.getContext();

View File

@@ -2072,17 +2072,17 @@ public class Http1xTest extends HttpTest {
});
startServer(testAddress);
Context clientCtx = vertx.getOrCreateContext();
HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, onFailure(err -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
complete();
}));
clientCtx.runOnContext(v -> {
HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, onFailure(err -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
complete();
}));
req.exceptionHandler(err -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
complete();
});
req.sendHead();
});
req.exceptionHandler(err -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
complete();
});
await();
}
@@ -2159,32 +2159,31 @@ public class Http1xTest extends HttpTest {
CompletableFuture<Void> cf = new CompletableFuture<>();
String path = "/" + val;
requestResumeMap.put(path, cf);
HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, path, onSuccess(resp -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
assertEquals(200, resp.statusCode());
contexts.add(Vertx.currentContext());
threads.add(Thread.currentThread());
resp.pause();
responseResumeMap.get(path).thenAccept(v -> resp.resume());
resp.handler(chunk -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
});
resp.exceptionHandler(this::fail);
resp.endHandler(v -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
if (cnt.incrementAndGet() == numReqs) {
assertEquals(numReqs, contexts.size());
assertEquals(1, threads.size());
latch2.countDown();
}
});
})).setChunked(true).exceptionHandler(this::fail);
CountDownLatch drainLatch = new CountDownLatch(1);
req.drainHandler(v -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
drainLatch.countDown();
});
clientCtx.runOnContext(v -> {
HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, path, onSuccess(resp -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
assertEquals(200, resp.statusCode());
contexts.add(Vertx.currentContext());
threads.add(Thread.currentThread());
resp.pause();
responseResumeMap.get(path).thenAccept(v2 -> resp.resume());
resp.handler(chunk -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
});
resp.exceptionHandler(this::fail);
resp.endHandler(v2 -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
if (cnt.incrementAndGet() == numReqs) {
assertEquals(numReqs, contexts.size());
assertEquals(1, threads.size());
latch2.countDown();
}
});
})).setChunked(true).exceptionHandler(this::fail);
req.drainHandler(v2 -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
req.end();
});
req.sendHead(version -> {
assertSameEventLoop(clientCtx, Vertx.currentContext());
fill(data, req, () -> {
@@ -2192,8 +2191,6 @@ public class Http1xTest extends HttpTest {
});
});
});
awaitLatch(drainLatch);
req.end();
}
awaitLatch(latch2, 40, TimeUnit.SECONDS);
// Close should be in own context

View File

@@ -428,34 +428,36 @@ public class Http2ClientTest extends Http2TestBase {
});
});
startServer();
HttpClientRequest req = client.post(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
testComplete();
}).setChunked(true).exceptionHandler(err -> {
fail();
});
AtomicInteger sent = new AtomicInteger();
AtomicInteger count = new AtomicInteger();
AtomicInteger drained = new AtomicInteger();
vertx.setPeriodic(1, timerID -> {
Context ctx = vertx.getOrCreateContext();
if (req.writeQueueFull()) {
assertTrue(paused.get());
assertEquals(1, numPause.get());
req.drainHandler(v -> {
assertOnIOContext(ctx);
assertEquals(0, drained.getAndIncrement());
Context ctx = vertx.getOrCreateContext();
ctx.runOnContext(v -> {
HttpClientRequest req = client.post(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
testComplete();
}).setChunked(true).exceptionHandler(err -> {
fail();
});
AtomicInteger sent = new AtomicInteger();
AtomicInteger count = new AtomicInteger();
AtomicInteger drained = new AtomicInteger();
vertx.setPeriodic(1, timerID -> {
if (req.writeQueueFull()) {
assertTrue(paused.get());
assertEquals(1, numPause.get());
assertFalse(paused.get());
req.end();
});
vertx.cancelTimer(timerID);
done.complete(null);
} else {
count.incrementAndGet();
expected.appendString(chunk);
req.write(chunk);
sent.addAndGet(chunk.length());
}
req.drainHandler(v2 -> {
assertOnIOContext(ctx);
assertEquals(0, drained.getAndIncrement());
assertEquals(1, numPause.get());
assertFalse(paused.get());
req.end();
});
vertx.cancelTimer(timerID);
done.complete(null);
} else {
count.incrementAndGet();
expected.appendString(chunk);
req.write(chunk);
sent.addAndGet(chunk.length());
}
});
});
await();
}