Improve Http1xTest#testContexts that will now overrun the client connection with buffers

This commit is contained in:
Julien Viet
2019-12-06 00:50:42 +01:00
parent 73f46c0097
commit ab1464c8df

View File

@@ -42,6 +42,7 @@ import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -2062,13 +2063,17 @@ public class Http1xTest extends HttpTest {
await();
}
private void fill(Buffer buffer, WriteStream<Buffer> ws, Runnable done) {
private void fill(Buffer buffer, WriteStream<Buffer> ws, LongConsumer done) {
fill(0, buffer, ws, done);
}
private void fill(long amount, Buffer buffer, WriteStream<Buffer> ws, LongConsumer done) {
if (ws.writeQueueFull()) {
done.run();
done.accept(amount);
} else {
ws.write(buffer);
vertx.runOnContext(v -> {
fill(buffer, ws, done);
vertx.setTimer(1, v -> {
fill(amount + buffer.length(), buffer, ws, done);
});
}
}
@@ -2082,39 +2087,40 @@ public class Http1xTest extends HttpTest {
Context serverCtx = vertx.getOrCreateContext();
CountDownLatch latch = new CountDownLatch(1);
List<HttpServerRequest> requests = Collections.synchronizedList(new ArrayList<>());
Map<String, CompletionStage<Void>> requestResumeMap = new HashMap<>();
Map<String, CompletionStage<Long>> requestResumeMap = new HashMap<>();
Map<String, CompletionStage<Void>> responseResumeMap = new HashMap<>();
serverCtx.runOnContext(v1 -> {
server.requestHandler(req -> {
req.pause();
requestResumeMap.get(req.path()).thenAccept(v -> {
req.resume();
});
assertSameEventLoop(serverCtx, Vertx.currentContext());
Buffer body = Buffer.buffer();
requestResumeMap.get(req.path()).thenAccept(amount -> {
req.resume();
req.endHandler(v2 -> {
assertSameEventLoop(serverCtx, Vertx.currentContext());
assertEquals((long)amount, body.length());
requests.add(req);
if (requests.size() == numReqs) {
requests.forEach(req_ ->{
HttpServerResponse resp = req_.response();
CompletableFuture<Void> cf = new CompletableFuture<>();
responseResumeMap.put(req_.path(), cf);
resp.setChunked(true);
fill(data, resp, sent -> {
cf.complete(null);
resp.drainHandler(v -> {
assertSameEventLoop(serverCtx, Vertx.currentContext());
resp.end();
});
});
});
}
});
});
req.handler(chunk -> {
assertSameEventLoop(serverCtx, Vertx.currentContext());
body.appendBuffer(chunk);
});
req.endHandler(v2 -> {
assertSameEventLoop(serverCtx, Vertx.currentContext());
requests.add(req);
if (requests.size() == numReqs) {
requests.forEach(req_ ->{
HttpServerResponse resp = req_.response();
CompletableFuture<Void> cf = new CompletableFuture<>();
responseResumeMap.put(req_.path(), cf);
resp.setChunked(true);
fill(data, resp, () -> {
cf.complete(null);
resp.drainHandler(v -> {
assertSameEventLoop(serverCtx, Vertx.currentContext());
resp.end();
});
});
});
}
});
});
server.listen(testAddress, onSuccess(s -> {
assertSameEventLoop(serverCtx, Vertx.currentContext());
@@ -2135,7 +2141,7 @@ public class Http1xTest extends HttpTest {
});
waitUntil(() -> client != null);
for (int i = 0; i < numReqs; i++) {
CompletableFuture<Void> cf = new CompletableFuture<>();
CompletableFuture<Long> cf = new CompletableFuture<>();
String path = "/" + i;
requestResumeMap.put(path, cf);
Context requestCtx = vertx.getOrCreateContext();
@@ -2166,9 +2172,7 @@ public class Http1xTest extends HttpTest {
});
req.sendHead(version -> {
assertSameEventLoop(requestCtx, Vertx.currentContext());
fill(data, req, () -> {
cf.complete(null);
});
fill(data, req, cf::complete);
});
});
}