From 515ca71eb9bff221482dac8a2b52f71185630223 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 2 Jul 2019 09:40:12 +0200 Subject: [PATCH] HttpClient improvements - this closes #3014, this closes #2982 --- .../vertx/core/http/StreamResetException.java | 2 +- .../core/http/impl/ConnectionManager.java | 17 +- .../http/impl/Http1xClientConnection.java | 93 ++++---- .../core/http/impl/Http2ClientConnection.java | 31 +-- .../impl/Http2UpgradedClientConnection.java | 11 +- .../core/http/impl/HttpClientRequestBase.java | 76 +++--- .../core/http/impl/HttpClientRequestImpl.java | 216 +++++++----------- .../impl/HttpClientRequestPushPromise.java | 14 +- .../http/impl/HttpClientResponseImpl.java | 12 +- .../core/http/impl/HttpClientStream.java | 4 +- .../java/io/vertx/core/http/Http1xTest.java | 15 +- .../io/vertx/core/http/Http2ClientTest.java | 23 +- .../java/io/vertx/core/http/Http2Test.java | 16 +- .../java/io/vertx/core/http/HttpTest.java | 157 ++++++++++--- .../java/io/vertx/test/core/TestUtils.java | 2 + 15 files changed, 351 insertions(+), 338 deletions(-) diff --git a/src/main/java/io/vertx/core/http/StreamResetException.java b/src/main/java/io/vertx/core/http/StreamResetException.java index a06b587c5..095bc0e37 100644 --- a/src/main/java/io/vertx/core/http/StreamResetException.java +++ b/src/main/java/io/vertx/core/http/StreamResetException.java @@ -23,7 +23,7 @@ public class StreamResetException extends VertxException { private final long code; public StreamResetException(long code) { - super("Stream reset: " + code); + super("Stream reset: " + code, true); this.code = code; } diff --git a/src/main/java/io/vertx/core/http/impl/ConnectionManager.java b/src/main/java/io/vertx/core/http/impl/ConnectionManager.java index 812f7852d..110a85d1a 100644 --- a/src/main/java/io/vertx/core/http/impl/ConnectionManager.java +++ b/src/main/java/io/vertx/core/http/impl/ConnectionManager.java @@ -148,21 +148,10 @@ class ConnectionManager { } if (endpoint.pool.getConnection(ar -> { - if (ar.succeeded()) { - - HttpClientConnection conn = ar.result(); - - if (metrics != null) { - metrics.dequeueRequest(endpoint.metric, metric); - } - - handler.handle(Future.succeededFuture(conn)); - } else { - if (metrics != null) { - metrics.dequeueRequest(endpoint.metric, metric); - } - handler.handle(Future.failedFuture(ar.cause())); + if (metrics != null) { + metrics.dequeueRequest(endpoint.metric, metric); } + handler.handle(ar); })) { break; } diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index edd317e86..c31219060 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -191,11 +191,11 @@ class Http1xClientConnection extends Http1xConnectionBase impleme private final Promise fut; private final InboundBuffer queue; private HttpClientRequestImpl request; + private Handler continueHandler; private HttpClientResponseImpl response; private boolean requestEnded; private boolean responseEnded; private boolean reset; - private MultiMap trailers; private StreamImpl next; private long bytesWritten; private long bytesRead; @@ -245,12 +245,13 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } @Override - public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler> handler) { + public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler contHandler, Handler> handler) { HttpRequest request = createRequest(method, rawMethod, uri, headers); prepareRequestHeaders(request, hostHeader, chunked); if (buf != null) { bytesWritten += buf.readableBytes(); } + continueHandler = contHandler; sendRequest(request, buf, end, handler); if (conn.responseInProgress == null) { conn.responseInProgress = this; @@ -357,22 +358,26 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } @Override - public void reset(long code) { + public void reset(Throwable cause) { synchronized (conn) { - if (!reset) { - reset = true; - if (conn.requestInProgress == this) { - if (request == null) { - conn.requestInProgress = null; - conn.recycle(); - } else { - conn.close(); - } - } else if (!responseEnded) { + if (reset) { + return; + } + reset = true; + if (conn.requestInProgress == this) { + if (request == null) { + // Is that possible in practice ??? + conn.handleRequestEnd(true); + } else { conn.close(); } + } else if (!responseEnded) { + conn.close(); + } else { + // ???? } } + handleException(cause); } @Override @@ -463,12 +468,12 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } } } - queue.handler(buf -> { - if (buf == InboundBuffer.END_SENTINEL) { + queue.handler(item -> { + if (item instanceof MultiMap) { conn.reportBytesRead(bytesRead); - response.handleEnd(trailers); + response.handleEnd((MultiMap) item); } else { - response.handleChunk((Buffer) buf); + response.handleChunk((Buffer) item); } }); queue.drainHandler(v -> { @@ -482,16 +487,10 @@ class Http1xClientConnection extends Http1xConnectionBase impleme private boolean endResponse(LastHttpContent trailer) { synchronized (conn) { if (conn.metrics != null) { - HttpClientRequestBase req = request; - if (req.exceptionOccurred != null) { - conn.metrics.requestReset(metric); - } else { - conn.metrics.responseEnd(metric, response); - } + conn.metrics.responseEnd(metric, response); } - trailers = new HeadersAdaptor(trailer.trailingHeaders()); } - queue.write(InboundBuffer.END_SENTINEL); + queue.write(new HeadersAdaptor(trailer.trailingHeaders())); synchronized (conn) { responseEnded = true; conn.close |= !conn.options.isKeepAlive(); @@ -512,7 +511,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme requestEnded = this.requestEnded; } if (request != null) { - if (response == null || response.statusCode() == 100) { + if (response == null) { request.handleException(cause); } else { if (!requestEnded) { @@ -582,27 +581,25 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } private void handleResponseBegin(HttpResponse resp) { - StreamImpl stream; - HttpClientResponseImpl response; - HttpClientRequestImpl request; - Exception err; - synchronized (this) { - stream = responseInProgress; - request = stream.request; - HttpClientResponseImpl r = null; - Exception t = null; - try { - r = stream.beginResponse(resp); - } catch (Exception e) { - t = e; + if (resp.status().code() == 100) { + Handler handler; + synchronized (this) { + StreamImpl stream = responseInProgress; + handler = stream.continueHandler; + } + if (handler != null) { + handler.handle(null); } - response = r; - err = t; - } - if (response != null) { - request.handleResponse(response); } else { - request.handleException(err); + StreamImpl stream; + HttpClientResponseImpl response; + HttpClientRequestImpl request; + synchronized (this) { + stream = responseInProgress; + request = stream.request; + response = stream.beginResponse(resp); + } + request.handleResponse(response); } } @@ -622,8 +619,8 @@ class Http1xClientConnection extends Http1xConnectionBase impleme StreamImpl stream; synchronized (this) { stream = responseInProgress; - // We don't signal response end for a 100-continue response as a real response will follow - if (stream.response.statusCode() == 100) { + if (stream.response == null) { + // 100-continue return; } responseInProgress = stream.next; @@ -640,7 +637,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme requestInProgress = next; } if (recycle) { - checkLifecycle(); + recycle(); } if (next != null) { next.fut.complete(next); diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java index 7b2466ba1..dbd777cf3 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java @@ -95,13 +95,12 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon super.onStreamClosed(nettyStream); } - void upgradeStream(HttpClientRequestImpl req, Object metric, Handler> completionHandler) { + void upgradeStream(Object metric, Handler> completionHandler) { Future fut; synchronized (this) { try { Http2ClientStream stream = createStream(handler.connection().stream(1)); stream.metric = metric; - stream.beginRequest(req); fut = Future.succeededFuture(stream); } catch (Exception e) { fut = Future.failedFuture(e); @@ -203,6 +202,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon private HttpClientRequestBase request; private HttpClientResponseImpl response; + private Handler continueHandler; private boolean requestEnded; private boolean responseEnded; private Object metric; @@ -244,11 +244,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon @Override void handleEnd(MultiMap trailers) { if (conn.metrics != null) { - if (request.exceptionOccurred != null) { - conn.metrics.requestReset(metric); - } else { - conn.metrics.responseEnd(metric, response); - } + conn.metrics.responseEnd(metric, response); } responseEnded = true; // Should use a shared immutable object for CaseInsensitiveHeaders ? @@ -320,9 +316,10 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon } void handleHeaders(Http2Headers headers, StreamPriority streamPriority, boolean end) { - if(streamPriority != null) + if(streamPriority != null) { priority(streamPriority); - if (response == null || response.statusCode() == 100) { + } + if (response == null) { int status; String statusMessage; try { @@ -333,9 +330,13 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon writeReset(0x01 /* PROTOCOL_ERROR */); return; } - + if (status == 100) { + if (continueHandler != null) { + continueHandler.handle(null); + } + return; + } headers.remove(":status"); - response = new HttpClientResponseImpl( request, HttpVersion.HTTP_2, @@ -376,7 +377,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon } @Override - public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end, StreamPriority priority, Handler> handler) { + public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end, StreamPriority priority, Handler contHandler, Handler> handler) { Http2Headers h = new DefaultHttp2Headers(); h.method(method != HttpMethod.OTHER ? method.name() : rawMethod); if (method == HttpMethod.CONNECT) { @@ -399,6 +400,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon if (conn.client.getOptions().isTryUseCompression() && h.get(HttpHeaderNames.ACCEPT_ENCODING) == null) { h.set(HttpHeaderNames.ACCEPT_ENCODING, DEFLATE_GZIP); } + continueHandler = contHandler; if (conn.metrics != null) { metric = conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request); } @@ -458,11 +460,14 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon } @Override - public void reset(long code) { + public void reset(Throwable cause) { + long code = cause instanceof StreamResetException ? ((StreamResetException)cause).getCode() : 0; if (request == null) { + // Not sure this is possible in practice writeReset(code); } else { if (!(requestEnded && responseEnded)) { + handleException(cause); requestEnded = true; responseEnded = true; writeReset(code); diff --git a/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java index a3e28e415..2b29068ff 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java @@ -104,6 +104,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { ByteBuf buf, boolean end, StreamPriority priority, + Handler continueHandler, Handler> handler) { ChannelPipeline pipeline = conn.channel().pipeline(); HttpClientCodec httpCodec = pipeline.get(HttpClientCodec.class); @@ -138,10 +139,12 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { // Now we need to upgrade this to an HTTP2 ConnectionListener listener = conn.listener(); VertxHttp2ConnectionHandler handler = Http2ClientConnection.createHttp2ConnectionHandler(client, conn.endpointMetric(), listener, conn.getContext(), current.metric(), (conn, concurrency) -> { - conn.upgradeStream(request, stream.metric(), ar -> { + conn.upgradeStream(stream.metric(), ar -> { UpgradingStream.this.conn.closeHandler(null); UpgradingStream.this.conn.exceptionHandler(null); if (ar.succeeded()) { + HttpClientStream upgradedStream = ar.result(); + upgradedStream.beginRequest(request); current = conn; conn.closeHandler(closeHandler); conn.exceptionHandler(exceptionHandler); @@ -163,7 +166,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536); pipeline.addAfter("codec", null, new UpgradeRequestHandler()); pipeline.addAfter("codec", null, upgradeHandler); - stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, priority, handler); + stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, priority, continueHandler, handler); } @Override @@ -217,8 +220,8 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { } @Override - public void reset(long code) { - stream.reset(code); + public void reset(Throwable cause) { + stream.reset(cause); } @Override diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java index 0c5d2fbca..fe3491da4 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java @@ -11,15 +11,16 @@ package io.vertx.core.http.impl; -import io.netty.handler.codec.http2.Http2CodecUtil; +import io.vertx.core.Promise; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpClientResponse; import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.StreamResetException; import io.vertx.core.net.SocketAddress; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; -import io.vertx.core.streams.ReadStream; import java.util.concurrent.TimeoutException; @@ -45,7 +46,7 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { private long lastDataReceived; protected Throwable exceptionOccurred; private boolean paused; - private HttpClientResponseImpl response; + private HttpClientResponse response; HttpClientRequestBase(HttpClientImpl client, boolean ssl, HttpMethod method, SocketAddress server, String host, int port, String uri) { this.client = client; @@ -59,8 +60,8 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { this.ssl = ssl; } - protected abstract void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs); - protected abstract void checkComplete(); + protected void checkEnded() { + } protected String hostHeader() { if ((port == 80 && !ssl) || (port == 443 && ssl)) { @@ -99,7 +100,7 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { @Override public synchronized HttpClientRequest exceptionHandler(Handler handler) { if (handler != null) { - checkComplete(); + checkEnded(); this.exceptionHandler = handler; } else { this.exceptionHandler = null; @@ -113,7 +114,7 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { @Override public synchronized HttpClientRequest setTimeout(long timeoutMs) { - cancelOutstandingTimeoutTimer(); + cancelTimeout(); currentTimeoutMs = timeoutMs; currentTimeoutTimerId = client.getVertx().setTimer(timeoutMs, id -> handleTimeout(timeoutMs)); return this; @@ -122,7 +123,7 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { public void handleException(Throwable t) { Handler handler; synchronized (this) { - cancelOutstandingTimeoutTimer(); + cancelTimeout(); exceptionOccurred = t; if (exceptionHandler != null) { handler = exceptionHandler; @@ -133,7 +134,7 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { handler.handle(t); } - void handleResponse(HttpClientResponseImpl resp) { + void handleResponse(HttpClientResponse resp) { synchronized (this) { response = resp; } @@ -141,38 +142,30 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { } private void checkHandleResponse() { - HttpClientResponseImpl resp; + long timeoutMS; + HttpClientResponse resp; synchronized (this) { if (response != null) { if (paused) { return; } + timeoutMS = cancelTimeout(); resp = response; response = null; } else { return; } } - doHandleResponse(resp); - } - - private synchronized void doHandleResponse(HttpClientResponseImpl resp) { - long timeoutMS; - synchronized (this) { - // If an exception occurred (e.g. a timeout fired) we won't receive the response. - if (exceptionOccurred != null) { - return; - } - timeoutMS = cancelOutstandingTimeoutTimer(); - } try { - doHandleResponse(resp, timeoutMS); + handleResponse(resp, timeoutMS); } catch (Throwable t) { handleException(t); } } - private long cancelOutstandingTimeoutTimer() { + abstract void handleResponse(HttpClientResponse resp, long timeoutMs); + + private synchronized long cancelTimeout() { long ret; if ((ret = currentTimeoutTimerId) != -1) { client.getVertx().cancelTimer(currentTimeoutTimerId); @@ -184,31 +177,25 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { } private void handleTimeout(long timeoutMs) { - if (lastDataReceived == 0) { - timeout(timeoutMs); - } else { - long now = System.currentTimeMillis(); - long timeSinceLastData = now - lastDataReceived; - if (timeSinceLastData >= timeoutMs) { - timeout(timeoutMs); - } else { - // reschedule - lastDataReceived = 0; - setTimeout(timeoutMs - timeSinceLastData); + synchronized (this) { + if (lastDataReceived > 0) { + long now = System.currentTimeMillis(); + long timeSinceLastData = now - lastDataReceived; + if (timeSinceLastData < timeoutMs) { + // reschedule + lastDataReceived = 0; + setTimeout(timeoutMs - timeSinceLastData); + return; + } } } - } - - private void timeout(long timeoutMs) { String msg = "The timeout period of " + timeoutMs + "ms has been exceeded while executing " + method + " " + uri + " for server " + server; - // Use a stack-less exception - handleException(new TimeoutException(msg) { + reset(new TimeoutException(msg) { @Override public synchronized Throwable fillInStackTrace() { return this; } }); - reset(0); } synchronized void dataReceived() { @@ -217,6 +204,13 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { } } + @Override + public boolean reset(long code) { + return reset(new StreamResetException(code)); + } + + abstract boolean reset(Throwable cause); + @Override public HttpClientRequest pause() { paused = true; diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java index aef6dcad0..25180cb47 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java @@ -53,8 +53,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http static final Logger log = LoggerFactory.getLogger(HttpClientRequestImpl.class); private final VertxInternal vertx; - private Handler respHandler; - private Handler endHandler; private boolean chunked; private String hostHeader; private String rawMethod; @@ -62,9 +60,11 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http private Handler drainHandler; private Handler pushHandler; private Handler connectionHandler; - private boolean completed; - private Handler completionHandler; - private Long reset; + private Handler exceptionHandler; + private Promise endPromise = Promise.promise(); + private Future endFuture = endPromise.future(); + private boolean ended; + private Throwable reset; private ByteBuf pendingChunks; private List>> pendingHandlers; private int pendingMaxSize = -1; @@ -73,8 +73,8 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http private StreamPriority priority; private HttpClientStream stream; private boolean connecting; - - // completed => drainHandler = null + private Handler respHandler; + private Handler endHandler; HttpClientRequestImpl(HttpClientImpl client, boolean ssl, HttpMethod method, SocketAddress server, String host, int port, @@ -86,20 +86,14 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } @Override - public int streamId() { - HttpClientStream s; - synchronized (this) { - if ((s = stream) == null) { - return -1; - } - } - return s.id(); + public synchronized int streamId() { + return stream == null ? -1 : stream.id(); } @Override public synchronized HttpClientRequest handler(Handler handler) { if (handler != null) { - checkComplete(); + checkEnded(); } respHandler = handler; return this; @@ -108,7 +102,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http @Override public HttpClientRequest setFollowRedirects(boolean followRedirects) { synchronized (this) { - checkComplete(); + checkEnded(); if (followRedirects) { this.followRedirects = client.getOptions().getMaxRedirects() - 1; } else { @@ -122,7 +116,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http public HttpClientRequest endHandler(Handler handler) { synchronized (this) { if (handler != null) { - checkComplete(); + checkEnded(); } endHandler = handler; return this; @@ -132,7 +126,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http @Override public HttpClientRequestImpl setChunked(boolean chunked) { synchronized (this) { - checkComplete(); + checkEnded(); if (stream != null) { throw new IllegalStateException("Cannot set chunked after data has been written on request"); } @@ -181,14 +175,14 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http @Override public synchronized HttpClientRequest putHeader(String name, String value) { - checkComplete(); + checkEnded(); headers().set(name, value); return this; } @Override public synchronized HttpClientRequest putHeader(String name, Iterable values) { - checkComplete(); + checkEnded(); headers().set(name, values); return this; } @@ -197,7 +191,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http public HttpClientRequest setWriteQueueMaxSize(int maxSize) { HttpClientStream s; synchronized (this) { - checkComplete(); + checkEnded(); if ((s = stream) == null) { pendingMaxSize = maxSize; return this; @@ -211,7 +205,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http public boolean writeQueueFull() { HttpClientStream s; synchronized (this) { - checkComplete(); + checkEnded(); if ((s = stream) == null) { // Should actually check with max queue size and not always blindly return false return false; @@ -224,7 +218,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http public HttpClientRequest drainHandler(Handler handler) { synchronized (this) { if (handler != null) { - checkComplete(); + checkEnded(); drainHandler = handler; HttpClientStream s; if ((s = stream) == null) { @@ -247,7 +241,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http @Override public synchronized HttpClientRequest continueHandler(Handler handler) { if (handler != null) { - checkComplete(); + checkEnded(); } this.continueHandler = handler; return this; @@ -260,7 +254,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http @Override public synchronized HttpClientRequest sendHead(Handler headersHandler) { - checkComplete(); + checkEnded(); checkResponseHandler(); if (stream != null) { throw new IllegalStateException("Head already written"); @@ -272,14 +266,14 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http @Override public synchronized HttpClientRequest putHeader(CharSequence name, CharSequence value) { - checkComplete(); + checkEnded(); headers().set(name, value); return this; } @Override public synchronized HttpClientRequest putHeader(CharSequence name, Iterable values) { - checkComplete(); + checkEnded(); headers().set(name, values); return this; } @@ -291,34 +285,25 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } @Override - public boolean reset(long code) { + boolean reset(Throwable cause) { HttpClientStream s; synchronized (this) { if (reset != null) { return false; } - reset = code; - if (tryComplete()) { - if (completionHandler != null) { - completionHandler.handle(null); - } - } + reset = cause; s = stream; } if (s != null) { - s.reset(code); + s.reset(cause); + } else { + handleException(cause); } return true; } - private boolean tryComplete() { - if (!completed) { - completed = true; - drainHandler = null; - return true; - } else { - return false; - } + private void tryComplete() { + endPromise.tryComplete(); } @Override @@ -342,7 +327,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http public synchronized HttpClientRequest writeCustomFrame(int type, int flags, Buffer payload) { HttpClientStream s; synchronized (this) { - checkComplete(); + checkEnded(); if ((s = stream) == null) { throw new IllegalStateException("Not yet connected"); } @@ -354,7 +339,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http void handleDrained() { Handler handler; synchronized (this) { - if ((handler = drainHandler) == null) { + if ((handler = drainHandler) == null || endFuture.isComplete()) { return; } } @@ -378,9 +363,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http if (headers != null && next.headers == null) { next.headers().addAll(headers); } - Promise promise = Promise.promise(); - Future future = promise.future(); - future.setHandler(ar -> { + endFuture.setHandler(ar -> { if (ar.succeeded()) { if (timeoutMs > 0) { next.setTimeout(timeoutMs); @@ -390,26 +373,15 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http next.handleException(ar.cause()); } }); - if (exceptionOccurred != null) { - promise.fail(exceptionOccurred); - } - else if (completed) { - promise.complete(); - } else { - exceptionHandler(err -> { - if (!future.isComplete()) { - promise.fail(err); - } - }); - completionHandler = v -> { - if (!future.isComplete()) { - promise.complete(); - } - }; - } } - protected void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs) { + @Override + public void handleException(Throwable t) { + super.handleException(t); + endPromise.tryFail(t); + } + + void handleResponse(HttpClientResponse resp, long timeoutMs) { if (reset == null) { int statusCode = resp.statusCode(); if (followRedirects > 0 && statusCode >= 300 && statusCode < 400) { @@ -425,17 +397,11 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http return; } } - if (statusCode == 100) { - if (continueHandler != null) { - continueHandler.handle(null); - } - } else { - if (respHandler != null) { - respHandler.handle(resp); - } - if (endHandler != null) { - endHandler.handle(null); - } + if (respHandler != null) { + respHandler.handle(resp); + } + if (endHandler != null) { + endHandler.handle(null); } } } @@ -499,8 +465,8 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } // No need to synchronize as the thread is the same that set exceptionOccurred to true // exceptionOccurred=true getting the connection => it's a TimeoutException - if (exceptionOccurred != null || reset != null) { - stream.reset(0); + if (reset != null) { + stream.reset(reset); } else { ctx.executeFromIO(v -> { connected(headersHandler, stream); @@ -527,35 +493,25 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http stream.doSetWriteQueueMaxSize(pendingMaxSize); } + ByteBuf pending = null; + Handler> handler = null; if (pendingChunks != null) { List>> handlers = pendingHandlers; - ByteBuf pending = pendingChunks; - pendingChunks = null; pendingHandlers = null; - Handler> handler; + pending = pendingChunks; + pendingChunks = null; if (handlers != null) { handler = ar -> { handlers.forEach(h -> h.handle(ar)); }; - } else { - handler = null; - } - if (completed) { - // we also need to write the head so optimize this and write all out in once - stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true, priority, handler); - stream.endRequest(); - } else { - stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false, priority, handler); - } - } else { - if (completed) { - // we also need to write the head so optimize this and write all out in once - stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true, priority, null); - stream.endRequest(); - } else { - stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false, priority, null); } } + stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, ended, priority, continueHandler, handler); + if (ended) { + // we also need to write the head so optimize this and write all out in once + stream.endRequest(); + tryComplete(); + } this.connecting = false; this.stream = stream; } @@ -564,10 +520,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } } - private boolean contentLengthSet() { - return headers != null && headers().contains(CONTENT_LENGTH); - } - @Override public void end(String chunk) { end(chunk, (Handler>) null); @@ -644,26 +596,28 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http return this; } - private void write(ByteBuf buff, boolean end, Handler> h) { + private boolean requiresContentLength() { + return !chunked && (headers == null || !headers.contains(CONTENT_LENGTH)); + } + + private void write(ByteBuf buff, boolean end, Handler> completionHandler) { + if (buff == null && !end) { + return; + } HttpClientStream s; synchronized (this) { - checkComplete(); + checkEnded(); checkResponseHandler(); if (end) { - if (buff != null && !chunked && !contentLengthSet()) { + if (buff != null && requiresContentLength()) { headers().set(CONTENT_LENGTH, String.valueOf(buff.readableBytes())); } - } else { - if (!chunked && !contentLengthSet()) { - throw new IllegalStateException("You must set the Content-Length header to be the total size of the message " - + "body BEFORE sending any data if you are not using HTTP chunked encoding."); - } + } else if (requiresContentLength()) { + throw new IllegalStateException("You must set the Content-Length header to be the total size of the message " + + "body BEFORE sending any data if you are not using HTTP chunked encoding."); } - if (buff == null && !end) { - // nothing to write to the connection just return - return; - } - if ((s = stream) == null) { + ended |= end; + if (stream == null) { if (buff != null) { if (pendingChunks == null) { pendingChunks = buff; @@ -678,39 +632,27 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } pending.addComponent(true, buff); } - if (h != null) { + if (completionHandler != null) { if (pendingHandlers == null) { pendingHandlers = new ArrayList<>(); } - pendingHandlers.add(h); - } - } - if (end) { - tryComplete(); - if (completionHandler != null) { - completionHandler.handle(null); + pendingHandlers.add(completionHandler); } } connect(null); return; } + s = stream; } - s.writeBuffer(buff, end, h); + s.writeBuffer(buff, end, completionHandler); if (end) { - Handler handler; - synchronized (this) { - tryComplete(); - s.endRequest(); - if ((handler = completionHandler) == null) { - return; - } - } - handler.handle(null); + s.endRequest(); + tryComplete(); } } - protected void checkComplete() { - if (completed) { + protected void checkEnded() { + if (ended) { throw new IllegalStateException("Request already complete"); } } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java index 3fa2528e7..d5d6991fa 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java @@ -54,7 +54,7 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase { } @Override - protected void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs) { + void handleResponse(HttpClientResponse resp, long timeoutMs) { Handler handler; synchronized (this) { if ((handler = respHandler) == null) { @@ -64,10 +64,6 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase { handler.handle(resp); } - @Override - protected void checkComplete() { - } - @Override public synchronized HttpClientRequest handler(Handler handler) { respHandler = handler; @@ -85,11 +81,9 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase { } @Override - public boolean reset(long code) { - synchronized (conn) { - stream.reset(code); - return true; - } + boolean reset(Throwable cause) { + stream.reset(cause); + return true; } @Override diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java index 88d903e2f..145e61d20 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java @@ -92,12 +92,12 @@ public class HttpClientResponseImpl implements HttpClientResponse { @Override public String getHeader(String headerName) { - return headers().get(headerName); + return headers.get(headerName); } @Override public String getHeader(CharSequence headerName) { - return headers().get(headerName); + return headers.get(headerName); } @Override @@ -136,12 +136,12 @@ public class HttpClientResponseImpl implements HttpClientResponse { } @Override - public HttpClientResponse handler(Handler handle) { + public HttpClientResponse handler(Handler handler) { synchronized (conn) { - if (handle != null) { + if (handler != null) { checkEnded(); } - dataHandler = handle; + dataHandler = handler; return this; } } @@ -222,9 +222,9 @@ public class HttpClientResponseImpl implements HttpClientResponse { } void handleChunk(Buffer data) { + request.dataReceived(); Handler handler; synchronized (conn) { - request.dataReceived(); handler = dataHandler; } if (handler != null) { diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java index 9828123af..0ce6c31b5 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java @@ -43,7 +43,7 @@ interface HttpClientStream { HttpConnection connection(); Context getContext(); - void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler> handler); + void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler contHandler, Handler> handler); void writeBuffer(ByteBuf buf, boolean end, Handler> handler); void writeFrame(int type, int flags, ByteBuf payload); @@ -52,7 +52,7 @@ interface HttpClientStream { void doPause(); void doFetch(long amount); - void reset(long code); + void reset(Throwable cause); void beginRequest(HttpClientRequestImpl req); void endRequest(); diff --git a/src/test/java/io/vertx/core/http/Http1xTest.java b/src/test/java/io/vertx/core/http/Http1xTest.java index a96484a32..a669628f9 100644 --- a/src/test/java/io/vertx/core/http/Http1xTest.java +++ b/src/test/java/io/vertx/core/http/Http1xTest.java @@ -2883,12 +2883,8 @@ public class Http1xTest extends HttpTest { client = vertx.createHttpClient(createBaseClientOptions().setMaxPoolSize(1).setPipelining(true).setKeepAlive(true)); AtomicInteger connCount = new AtomicInteger(); client.connectionHandler(conn -> connCount.incrementAndGet()); - HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/first", resp -> { - fail(); - }); - // Force connect - req.sendHead(v -> {}); - req.reset(); + HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/first", resp -> fail()); + req.reset(0); CountDownLatch respLatch = new CountDownLatch(2); client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/second", resp -> { assertEquals(200, resp.statusCode()); @@ -3058,9 +3054,7 @@ public class Http1xTest extends HttpTest { // There might be a race between the request write and the request reset // so we do it on the context thread to avoid it vertx.runOnContext(v -> { - HttpClientRequest post = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> { - fail(); - }); + HttpClientRequest post = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> fail()); post.setChunked(true).write(TestUtils.randomBuffer(1024)); assertTrue(post.reset()); client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> { @@ -4475,12 +4469,13 @@ public class Http1xTest extends HttpTest { }).setChunked(true); CheckingSender sender = new CheckingSender(vertx.getOrCreateContext(), req); AtomicBoolean connected = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); req.exceptionHandler(err -> { assertTrue(connected.get()); Throwable failure = sender.close(); if (failure != null) { fail(failure); - } else if (err == ConnectionBase.CLOSED_EXCEPTION) { + } else if (done.compareAndSet(false, true)) { testComplete(); } }); diff --git a/src/test/java/io/vertx/core/http/Http2ClientTest.java b/src/test/java/io/vertx/core/http/Http2ClientTest.java index 81d3c2f3e..1dd707f6a 100644 --- a/src/test/java/io/vertx/core/http/Http2ClientTest.java +++ b/src/test/java/io/vertx/core/http/Http2ClientTest.java @@ -700,6 +700,7 @@ public class Http2ClientTest extends Http2TestBase { @Test public void testClientResetServerStreamDuringRequest() throws Exception { + waitFor(2); Promise bufReceived = Promise.promise(); server.requestHandler(req -> { req.handler(buf -> { @@ -719,13 +720,14 @@ public class Http2ClientTest extends Http2TestBase { }); req.response().closeHandler(v -> { assertEquals(10L, reset.get()); - testComplete(); + complete(); }); }); startServer(); - HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { - fail(); - }).setChunked(true).write(Buffer.buffer("hello")); + HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> fail()) + .exceptionHandler(err -> complete()) + .setChunked(true); + req.write(Buffer.buffer("hello")); bufReceived.future().setHandler(ar -> { req.reset(10); }); @@ -751,13 +753,12 @@ public class Http2ClientTest extends Http2TestBase { req.response().setChunked(true).write(Buffer.buffer("some-data")); }); startServer(); - HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath"); - req.handler(resp -> { - resp.exceptionHandler(this::fail); - req.reset(10); - assertIllegalStateException(() -> req.write(Buffer.buffer())); - assertIllegalStateException(req::end); - }).end(Buffer.buffer("hello")); + client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", + resp -> { + resp.request().reset(10); + assertIllegalStateException(() -> resp.request().write(Buffer.buffer())); + assertIllegalStateException(resp.request()::end); + }).end(Buffer.buffer("hello")); await(); } diff --git a/src/test/java/io/vertx/core/http/Http2Test.java b/src/test/java/io/vertx/core/http/Http2Test.java index 1d73603f2..35cd63541 100644 --- a/src/test/java/io/vertx/core/http/Http2Test.java +++ b/src/test/java/io/vertx/core/http/Http2Test.java @@ -247,28 +247,24 @@ public class Http2Test extends HttpTest { @Test public void testResetClientRequestNotYetSent() throws Exception { - waitFor(2); server.close(); server = vertx.createHttpServer(createBaseServerOptions().setInitialSettings(new Http2Settings().setMaxConcurrentStreams(1))); AtomicInteger numReq = new AtomicInteger(); server.requestHandler(req -> { - assertEquals(0, numReq.getAndIncrement()); - req.response().end(); - complete(); + fail(); }); startServer(testAddress); // There might be a race between the request write and the request reset // so we do it on the context thread to avoid it vertx.runOnContext(v -> { - HttpClientRequest post = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> { - fail(); + HttpClientRequest post = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> fail()); + post.exceptionHandler(err -> { + if (err instanceof StreamResetException) { + complete(); + } }); post.setChunked(true).write(TestUtils.randomBuffer(1024)); assertTrue(post.reset()); - client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> { - assertEquals(1, numReq.get()); - complete(); - }).end(); }); await(); } diff --git a/src/test/java/io/vertx/core/http/HttpTest.java b/src/test/java/io/vertx/core/http/HttpTest.java index 548ea8f8a..9b67b55aa 100644 --- a/src/test/java/io/vertx/core/http/HttpTest.java +++ b/src/test/java/io/vertx/core/http/HttpTest.java @@ -1255,11 +1255,14 @@ public abstract class HttpTest extends HttpTestBase { } @Test - public void testClientExceptionHandlerCalledWhenFailingToConnect() throws Exception { - client.request(HttpMethod.GET, testAddress, 9998, "255.255.255.255", DEFAULT_TEST_URI, resp -> fail("Connect should not be called")). - exceptionHandler(error -> testComplete()). - endHandler(done -> fail()). - end(); + public void testClientExceptionHandlerCalledWhenFailingToConnect() { + waitFor(1); + client.request(HttpMethod.GET, testAddress, 9998, "255.255.255.255", DEFAULT_TEST_URI, resp -> { + fail(); + }).exceptionHandler(error -> { + complete(); + }) + .end(); await(); } @@ -1446,6 +1449,27 @@ public abstract class HttpTest extends HttpTestBase { await(); } + @Test + public void testClientRequestExceptionHandlerCalledWhenRequestEnded() throws Exception { + waitFor(2); + server.requestHandler(req -> { + req.connection().close(); + }); + startServer(testAddress); + HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> + fail() + ); + req.exceptionHandler(err -> complete()); + req.end(); + try { + req.exceptionHandler(err -> fail()); + fail(); + } catch (Exception e) { + complete(); + } + await(); + } + @Test public void testDefaultStatus() { testStatusCode(-1, null); @@ -2326,19 +2350,17 @@ public abstract class HttpTest extends HttpTestBase { @Test public void testConnectInvalidPort() { - client.request(HttpMethod.GET, 9998, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> fail("Connect should not be called")). - exceptionHandler(t -> testComplete()). - end(); - + client.request(HttpMethod.GET, 9998, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> fail()) + .exceptionHandler(t -> complete()) + .end(); await(); } @Test public void testConnectInvalidHost() { - client.request(HttpMethod.GET, 9998, "255.255.255.255", DEFAULT_TEST_URI, resp -> fail("Connect should not be called")). - exceptionHandler(t -> testComplete()). - end(); - + client.request(HttpMethod.GET, 9998, "255.255.255.255", DEFAULT_TEST_URI, resp -> fail()) + .exceptionHandler(t -> complete()) + .end(); await(); } @@ -3217,6 +3239,7 @@ public abstract class HttpTest extends HttpTestBase { @Test public void testResponseDataTimeout() { + waitFor(2); Buffer expected = TestUtils.randomBuffer(1000); server.requestHandler(req -> { req.response().setChunked(true).write(expected); @@ -3224,6 +3247,14 @@ public abstract class HttpTest extends HttpTestBase { server.listen(testAddress, onSuccess(s -> { Buffer received = Buffer.buffer(); HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> { + AtomicInteger count = new AtomicInteger(); + resp.exceptionHandler(t -> { + if (count.getAndIncrement() == 0) { + assertTrue(t instanceof TimeoutException); + assertEquals(expected, received); + complete(); + } + }); resp.request().setTimeout(500); resp.handler(received::appendBuffer); }); @@ -3232,7 +3263,7 @@ public abstract class HttpTest extends HttpTestBase { if (count.getAndIncrement() == 0) { assertTrue(t instanceof TimeoutException); assertEquals(expected, received); - testComplete(); + complete(); } }); req.sendHead(); @@ -5261,26 +5292,90 @@ public abstract class HttpTest extends HttpTestBase { await(); } - /* @Test - public void testReset() throws Exception { - CountDownLatch latch = new CountDownLatch(1); + public void testResetClientRequestBeforeActualSend() throws Exception { server.requestHandler(req -> { - req.exceptionHandler(err -> { - System.out.println("GOT ERR"); - }); - req.endHandler(v -> { - System.out.println("GOT END"); - latch.countDown(); - }); }); - startServer(); - HttpClientRequest req = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> {}); - req.end(); - awaitLatch(latch); - req.reset(); - + startServer(testAddress); + Context ctx = vertx.getOrCreateContext(); + ctx.runOnContext(v -> { + HttpClientRequest req = client.request( + HttpMethod.GET, + testAddress, + new RequestOptions() + .setPort(DEFAULT_HTTP_PORT) + .setHost(DEFAULT_HTTP_HOST) + .setURI(DEFAULT_TEST_URI), resp -> { + fail(); + }); + req.exceptionHandler(err -> { + if (err instanceof StreamResetException) { + assertTrue(err instanceof StreamResetException); + testComplete(); + } + }); + req.sendHead(version -> fail()); + req.reset(); + }); + await(); + } + + @Test + public void testResetClientRequestInProgress() throws Exception { + waitFor(1); + server.requestHandler(req -> { + }); + startServer(testAddress); + Context ctx = vertx.getOrCreateContext(); + ctx.runOnContext(v -> { + HttpClientRequest req = client.request( + HttpMethod.GET, + testAddress, + new RequestOptions() + .setPort(DEFAULT_HTTP_PORT) + .setHost(DEFAULT_HTTP_HOST) + .setURI(DEFAULT_TEST_URI), resp -> fail()); + req.exceptionHandler(err -> { + if (err instanceof StreamResetException) { + assertTrue(err instanceof StreamResetException); + complete(); + } + }); + req.sendHead(version -> { + req.reset(0); + }); + }); + await(); + } + + @Test + public void testResetClientRequestAwaitingResponse() throws Exception { + CompletableFuture fut = new CompletableFuture<>(); + server.requestHandler(req -> { + fut.complete(null); + }); + startServer(testAddress); + Context ctx = vertx.getOrCreateContext(); + ctx.runOnContext(v -> { + HttpClientRequest req = client.request( + HttpMethod.GET, + testAddress, + new RequestOptions() + .setPort(DEFAULT_HTTP_PORT) + .setHost(DEFAULT_HTTP_HOST) + .setURI(DEFAULT_TEST_URI), resp -> fail()); + req.exceptionHandler(err -> { + if (err instanceof StreamResetException) { + testComplete(); + } + }); + req.end(); + fut.thenAccept(v2 -> { + ctx.runOnContext(v3 -> { + req.reset(0); + }); + }); + }); await(); } -*/ } diff --git a/src/test/java/io/vertx/test/core/TestUtils.java b/src/test/java/io/vertx/test/core/TestUtils.java index a4bc7fbc1..a9472342a 100644 --- a/src/test/java/io/vertx/test/core/TestUtils.java +++ b/src/test/java/io/vertx/test/core/TestUtils.java @@ -15,6 +15,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.util.NetUtil; import io.netty.util.internal.logging.InternalLoggerFactory; +import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.Http2Settings; import io.vertx.core.net.*; @@ -30,6 +31,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Random; import java.util.Set; +import java.util.function.Supplier; import java.util.zip.GZIPOutputStream; import static org.junit.Assert.assertTrue;