HttpClient improvements - this closes #3014, this closes #2982

This commit is contained in:
Julien Viet
2019-07-02 09:40:12 +02:00
parent 9b85c1cc79
commit 515ca71eb9
15 changed files with 351 additions and 338 deletions

View File

@@ -23,7 +23,7 @@ public class StreamResetException extends VertxException {
private final long code;
public StreamResetException(long code) {
super("Stream reset: " + code);
super("Stream reset: " + code, true);
this.code = code;
}

View File

@@ -148,21 +148,10 @@ class ConnectionManager {
}
if (endpoint.pool.getConnection(ar -> {
if (ar.succeeded()) {
HttpClientConnection conn = ar.result();
if (metrics != null) {
metrics.dequeueRequest(endpoint.metric, metric);
}
handler.handle(Future.succeededFuture(conn));
} else {
if (metrics != null) {
metrics.dequeueRequest(endpoint.metric, metric);
}
handler.handle(Future.failedFuture(ar.cause()));
if (metrics != null) {
metrics.dequeueRequest(endpoint.metric, metric);
}
handler.handle(ar);
})) {
break;
}

View File

@@ -191,11 +191,11 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
private final Promise<HttpClientStream> fut;
private final InboundBuffer<Object> queue;
private HttpClientRequestImpl request;
private Handler<Void> continueHandler;
private HttpClientResponseImpl response;
private boolean requestEnded;
private boolean responseEnded;
private boolean reset;
private MultiMap trailers;
private StreamImpl next;
private long bytesWritten;
private long bytesRead;
@@ -245,12 +245,13 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
@Override
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler<AsyncResult<Void>> handler) {
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler<Void> contHandler, Handler<AsyncResult<Void>> handler) {
HttpRequest request = createRequest(method, rawMethod, uri, headers);
prepareRequestHeaders(request, hostHeader, chunked);
if (buf != null) {
bytesWritten += buf.readableBytes();
}
continueHandler = contHandler;
sendRequest(request, buf, end, handler);
if (conn.responseInProgress == null) {
conn.responseInProgress = this;
@@ -357,22 +358,26 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
@Override
public void reset(long code) {
public void reset(Throwable cause) {
synchronized (conn) {
if (!reset) {
reset = true;
if (conn.requestInProgress == this) {
if (request == null) {
conn.requestInProgress = null;
conn.recycle();
} else {
conn.close();
}
} else if (!responseEnded) {
if (reset) {
return;
}
reset = true;
if (conn.requestInProgress == this) {
if (request == null) {
// Is that possible in practice ???
conn.handleRequestEnd(true);
} else {
conn.close();
}
} else if (!responseEnded) {
conn.close();
} else {
// ????
}
}
handleException(cause);
}
@Override
@@ -463,12 +468,12 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
}
}
queue.handler(buf -> {
if (buf == InboundBuffer.END_SENTINEL) {
queue.handler(item -> {
if (item instanceof MultiMap) {
conn.reportBytesRead(bytesRead);
response.handleEnd(trailers);
response.handleEnd((MultiMap) item);
} else {
response.handleChunk((Buffer) buf);
response.handleChunk((Buffer) item);
}
});
queue.drainHandler(v -> {
@@ -482,16 +487,10 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
private boolean endResponse(LastHttpContent trailer) {
synchronized (conn) {
if (conn.metrics != null) {
HttpClientRequestBase req = request;
if (req.exceptionOccurred != null) {
conn.metrics.requestReset(metric);
} else {
conn.metrics.responseEnd(metric, response);
}
conn.metrics.responseEnd(metric, response);
}
trailers = new HeadersAdaptor(trailer.trailingHeaders());
}
queue.write(InboundBuffer.END_SENTINEL);
queue.write(new HeadersAdaptor(trailer.trailingHeaders()));
synchronized (conn) {
responseEnded = true;
conn.close |= !conn.options.isKeepAlive();
@@ -512,7 +511,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
requestEnded = this.requestEnded;
}
if (request != null) {
if (response == null || response.statusCode() == 100) {
if (response == null) {
request.handleException(cause);
} else {
if (!requestEnded) {
@@ -582,27 +581,25 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
private void handleResponseBegin(HttpResponse resp) {
StreamImpl stream;
HttpClientResponseImpl response;
HttpClientRequestImpl request;
Exception err;
synchronized (this) {
stream = responseInProgress;
request = stream.request;
HttpClientResponseImpl r = null;
Exception t = null;
try {
r = stream.beginResponse(resp);
} catch (Exception e) {
t = e;
if (resp.status().code() == 100) {
Handler<Void> handler;
synchronized (this) {
StreamImpl stream = responseInProgress;
handler = stream.continueHandler;
}
if (handler != null) {
handler.handle(null);
}
response = r;
err = t;
}
if (response != null) {
request.handleResponse(response);
} else {
request.handleException(err);
StreamImpl stream;
HttpClientResponseImpl response;
HttpClientRequestImpl request;
synchronized (this) {
stream = responseInProgress;
request = stream.request;
response = stream.beginResponse(resp);
}
request.handleResponse(response);
}
}
@@ -622,8 +619,8 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
StreamImpl stream;
synchronized (this) {
stream = responseInProgress;
// We don't signal response end for a 100-continue response as a real response will follow
if (stream.response.statusCode() == 100) {
if (stream.response == null) {
// 100-continue
return;
}
responseInProgress = stream.next;
@@ -640,7 +637,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
requestInProgress = next;
}
if (recycle) {
checkLifecycle();
recycle();
}
if (next != null) {
next.fut.complete(next);

View File

@@ -95,13 +95,12 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
super.onStreamClosed(nettyStream);
}
void upgradeStream(HttpClientRequestImpl req, Object metric, Handler<AsyncResult<HttpClientStream>> completionHandler) {
void upgradeStream(Object metric, Handler<AsyncResult<HttpClientStream>> completionHandler) {
Future<HttpClientStream> fut;
synchronized (this) {
try {
Http2ClientStream stream = createStream(handler.connection().stream(1));
stream.metric = metric;
stream.beginRequest(req);
fut = Future.succeededFuture(stream);
} catch (Exception e) {
fut = Future.failedFuture(e);
@@ -203,6 +202,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
private HttpClientRequestBase request;
private HttpClientResponseImpl response;
private Handler<Void> continueHandler;
private boolean requestEnded;
private boolean responseEnded;
private Object metric;
@@ -244,11 +244,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
@Override
void handleEnd(MultiMap trailers) {
if (conn.metrics != null) {
if (request.exceptionOccurred != null) {
conn.metrics.requestReset(metric);
} else {
conn.metrics.responseEnd(metric, response);
}
conn.metrics.responseEnd(metric, response);
}
responseEnded = true;
// Should use a shared immutable object for CaseInsensitiveHeaders ?
@@ -320,9 +316,10 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
}
void handleHeaders(Http2Headers headers, StreamPriority streamPriority, boolean end) {
if(streamPriority != null)
if(streamPriority != null) {
priority(streamPriority);
if (response == null || response.statusCode() == 100) {
}
if (response == null) {
int status;
String statusMessage;
try {
@@ -333,9 +330,13 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
writeReset(0x01 /* PROTOCOL_ERROR */);
return;
}
if (status == 100) {
if (continueHandler != null) {
continueHandler.handle(null);
}
return;
}
headers.remove(":status");
response = new HttpClientResponseImpl(
request,
HttpVersion.HTTP_2,
@@ -376,7 +377,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
}
@Override
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end, StreamPriority priority, Handler<AsyncResult<Void>> handler) {
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end, StreamPriority priority, Handler<Void> contHandler, Handler<AsyncResult<Void>> handler) {
Http2Headers h = new DefaultHttp2Headers();
h.method(method != HttpMethod.OTHER ? method.name() : rawMethod);
if (method == HttpMethod.CONNECT) {
@@ -399,6 +400,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
if (conn.client.getOptions().isTryUseCompression() && h.get(HttpHeaderNames.ACCEPT_ENCODING) == null) {
h.set(HttpHeaderNames.ACCEPT_ENCODING, DEFLATE_GZIP);
}
continueHandler = contHandler;
if (conn.metrics != null) {
metric = conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request);
}
@@ -458,11 +460,14 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
}
@Override
public void reset(long code) {
public void reset(Throwable cause) {
long code = cause instanceof StreamResetException ? ((StreamResetException)cause).getCode() : 0;
if (request == null) {
// Not sure this is possible in practice
writeReset(code);
} else {
if (!(requestEnded && responseEnded)) {
handleException(cause);
requestEnded = true;
responseEnded = true;
writeReset(code);

View File

@@ -104,6 +104,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
ByteBuf buf,
boolean end,
StreamPriority priority,
Handler<Void> continueHandler,
Handler<AsyncResult<Void>> handler) {
ChannelPipeline pipeline = conn.channel().pipeline();
HttpClientCodec httpCodec = pipeline.get(HttpClientCodec.class);
@@ -138,10 +139,12 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
// Now we need to upgrade this to an HTTP2
ConnectionListener<HttpClientConnection> listener = conn.listener();
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = Http2ClientConnection.createHttp2ConnectionHandler(client, conn.endpointMetric(), listener, conn.getContext(), current.metric(), (conn, concurrency) -> {
conn.upgradeStream(request, stream.metric(), ar -> {
conn.upgradeStream(stream.metric(), ar -> {
UpgradingStream.this.conn.closeHandler(null);
UpgradingStream.this.conn.exceptionHandler(null);
if (ar.succeeded()) {
HttpClientStream upgradedStream = ar.result();
upgradedStream.beginRequest(request);
current = conn;
conn.closeHandler(closeHandler);
conn.exceptionHandler(exceptionHandler);
@@ -163,7 +166,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536);
pipeline.addAfter("codec", null, new UpgradeRequestHandler());
pipeline.addAfter("codec", null, upgradeHandler);
stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, priority, handler);
stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, priority, continueHandler, handler);
}
@Override
@@ -217,8 +220,8 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
}
@Override
public void reset(long code) {
stream.reset(code);
public void reset(Throwable cause) {
stream.reset(cause);
}
@Override

View File

@@ -11,15 +11,16 @@
package io.vertx.core.http.impl;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.core.Promise;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import java.util.concurrent.TimeoutException;
@@ -45,7 +46,7 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
private long lastDataReceived;
protected Throwable exceptionOccurred;
private boolean paused;
private HttpClientResponseImpl response;
private HttpClientResponse response;
HttpClientRequestBase(HttpClientImpl client, boolean ssl, HttpMethod method, SocketAddress server, String host, int port, String uri) {
this.client = client;
@@ -59,8 +60,8 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
this.ssl = ssl;
}
protected abstract void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs);
protected abstract void checkComplete();
protected void checkEnded() {
}
protected String hostHeader() {
if ((port == 80 && !ssl) || (port == 443 && ssl)) {
@@ -99,7 +100,7 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
@Override
public synchronized HttpClientRequest exceptionHandler(Handler<Throwable> handler) {
if (handler != null) {
checkComplete();
checkEnded();
this.exceptionHandler = handler;
} else {
this.exceptionHandler = null;
@@ -113,7 +114,7 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
@Override
public synchronized HttpClientRequest setTimeout(long timeoutMs) {
cancelOutstandingTimeoutTimer();
cancelTimeout();
currentTimeoutMs = timeoutMs;
currentTimeoutTimerId = client.getVertx().setTimer(timeoutMs, id -> handleTimeout(timeoutMs));
return this;
@@ -122,7 +123,7 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
public void handleException(Throwable t) {
Handler<Throwable> handler;
synchronized (this) {
cancelOutstandingTimeoutTimer();
cancelTimeout();
exceptionOccurred = t;
if (exceptionHandler != null) {
handler = exceptionHandler;
@@ -133,7 +134,7 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
handler.handle(t);
}
void handleResponse(HttpClientResponseImpl resp) {
void handleResponse(HttpClientResponse resp) {
synchronized (this) {
response = resp;
}
@@ -141,38 +142,30 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
}
private void checkHandleResponse() {
HttpClientResponseImpl resp;
long timeoutMS;
HttpClientResponse resp;
synchronized (this) {
if (response != null) {
if (paused) {
return;
}
timeoutMS = cancelTimeout();
resp = response;
response = null;
} else {
return;
}
}
doHandleResponse(resp);
}
private synchronized void doHandleResponse(HttpClientResponseImpl resp) {
long timeoutMS;
synchronized (this) {
// If an exception occurred (e.g. a timeout fired) we won't receive the response.
if (exceptionOccurred != null) {
return;
}
timeoutMS = cancelOutstandingTimeoutTimer();
}
try {
doHandleResponse(resp, timeoutMS);
handleResponse(resp, timeoutMS);
} catch (Throwable t) {
handleException(t);
}
}
private long cancelOutstandingTimeoutTimer() {
abstract void handleResponse(HttpClientResponse resp, long timeoutMs);
private synchronized long cancelTimeout() {
long ret;
if ((ret = currentTimeoutTimerId) != -1) {
client.getVertx().cancelTimer(currentTimeoutTimerId);
@@ -184,31 +177,25 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
}
private void handleTimeout(long timeoutMs) {
if (lastDataReceived == 0) {
timeout(timeoutMs);
} else {
long now = System.currentTimeMillis();
long timeSinceLastData = now - lastDataReceived;
if (timeSinceLastData >= timeoutMs) {
timeout(timeoutMs);
} else {
// reschedule
lastDataReceived = 0;
setTimeout(timeoutMs - timeSinceLastData);
synchronized (this) {
if (lastDataReceived > 0) {
long now = System.currentTimeMillis();
long timeSinceLastData = now - lastDataReceived;
if (timeSinceLastData < timeoutMs) {
// reschedule
lastDataReceived = 0;
setTimeout(timeoutMs - timeSinceLastData);
return;
}
}
}
}
private void timeout(long timeoutMs) {
String msg = "The timeout period of " + timeoutMs + "ms has been exceeded while executing " + method + " " + uri + " for server " + server;
// Use a stack-less exception
handleException(new TimeoutException(msg) {
reset(new TimeoutException(msg) {
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
});
reset(0);
}
synchronized void dataReceived() {
@@ -217,6 +204,13 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
}
}
@Override
public boolean reset(long code) {
return reset(new StreamResetException(code));
}
abstract boolean reset(Throwable cause);
@Override
public HttpClientRequest pause() {
paused = true;

View File

@@ -53,8 +53,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
static final Logger log = LoggerFactory.getLogger(HttpClientRequestImpl.class);
private final VertxInternal vertx;
private Handler<HttpClientResponse> respHandler;
private Handler<Void> endHandler;
private boolean chunked;
private String hostHeader;
private String rawMethod;
@@ -62,9 +60,11 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
private Handler<Void> drainHandler;
private Handler<HttpClientRequest> pushHandler;
private Handler<HttpConnection> connectionHandler;
private boolean completed;
private Handler<Void> completionHandler;
private Long reset;
private Handler<Throwable> exceptionHandler;
private Promise<Void> endPromise = Promise.promise();
private Future<Void> endFuture = endPromise.future();
private boolean ended;
private Throwable reset;
private ByteBuf pendingChunks;
private List<Handler<AsyncResult<Void>>> pendingHandlers;
private int pendingMaxSize = -1;
@@ -73,8 +73,8 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
private StreamPriority priority;
private HttpClientStream stream;
private boolean connecting;
// completed => drainHandler = null
private Handler<HttpClientResponse> respHandler;
private Handler<Void> endHandler;
HttpClientRequestImpl(HttpClientImpl client, boolean ssl, HttpMethod method, SocketAddress server,
String host, int port,
@@ -86,20 +86,14 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
@Override
public int streamId() {
HttpClientStream s;
synchronized (this) {
if ((s = stream) == null) {
return -1;
}
}
return s.id();
public synchronized int streamId() {
return stream == null ? -1 : stream.id();
}
@Override
public synchronized HttpClientRequest handler(Handler<HttpClientResponse> handler) {
if (handler != null) {
checkComplete();
checkEnded();
}
respHandler = handler;
return this;
@@ -108,7 +102,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
@Override
public HttpClientRequest setFollowRedirects(boolean followRedirects) {
synchronized (this) {
checkComplete();
checkEnded();
if (followRedirects) {
this.followRedirects = client.getOptions().getMaxRedirects() - 1;
} else {
@@ -122,7 +116,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
public HttpClientRequest endHandler(Handler<Void> handler) {
synchronized (this) {
if (handler != null) {
checkComplete();
checkEnded();
}
endHandler = handler;
return this;
@@ -132,7 +126,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
@Override
public HttpClientRequestImpl setChunked(boolean chunked) {
synchronized (this) {
checkComplete();
checkEnded();
if (stream != null) {
throw new IllegalStateException("Cannot set chunked after data has been written on request");
}
@@ -181,14 +175,14 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
@Override
public synchronized HttpClientRequest putHeader(String name, String value) {
checkComplete();
checkEnded();
headers().set(name, value);
return this;
}
@Override
public synchronized HttpClientRequest putHeader(String name, Iterable<String> values) {
checkComplete();
checkEnded();
headers().set(name, values);
return this;
}
@@ -197,7 +191,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
public HttpClientRequest setWriteQueueMaxSize(int maxSize) {
HttpClientStream s;
synchronized (this) {
checkComplete();
checkEnded();
if ((s = stream) == null) {
pendingMaxSize = maxSize;
return this;
@@ -211,7 +205,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
public boolean writeQueueFull() {
HttpClientStream s;
synchronized (this) {
checkComplete();
checkEnded();
if ((s = stream) == null) {
// Should actually check with max queue size and not always blindly return false
return false;
@@ -224,7 +218,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
public HttpClientRequest drainHandler(Handler<Void> handler) {
synchronized (this) {
if (handler != null) {
checkComplete();
checkEnded();
drainHandler = handler;
HttpClientStream s;
if ((s = stream) == null) {
@@ -247,7 +241,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
@Override
public synchronized HttpClientRequest continueHandler(Handler<Void> handler) {
if (handler != null) {
checkComplete();
checkEnded();
}
this.continueHandler = handler;
return this;
@@ -260,7 +254,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
@Override
public synchronized HttpClientRequest sendHead(Handler<HttpVersion> headersHandler) {
checkComplete();
checkEnded();
checkResponseHandler();
if (stream != null) {
throw new IllegalStateException("Head already written");
@@ -272,14 +266,14 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
@Override
public synchronized HttpClientRequest putHeader(CharSequence name, CharSequence value) {
checkComplete();
checkEnded();
headers().set(name, value);
return this;
}
@Override
public synchronized HttpClientRequest putHeader(CharSequence name, Iterable<CharSequence> values) {
checkComplete();
checkEnded();
headers().set(name, values);
return this;
}
@@ -291,34 +285,25 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
@Override
public boolean reset(long code) {
boolean reset(Throwable cause) {
HttpClientStream s;
synchronized (this) {
if (reset != null) {
return false;
}
reset = code;
if (tryComplete()) {
if (completionHandler != null) {
completionHandler.handle(null);
}
}
reset = cause;
s = stream;
}
if (s != null) {
s.reset(code);
s.reset(cause);
} else {
handleException(cause);
}
return true;
}
private boolean tryComplete() {
if (!completed) {
completed = true;
drainHandler = null;
return true;
} else {
return false;
}
private void tryComplete() {
endPromise.tryComplete();
}
@Override
@@ -342,7 +327,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
public synchronized HttpClientRequest writeCustomFrame(int type, int flags, Buffer payload) {
HttpClientStream s;
synchronized (this) {
checkComplete();
checkEnded();
if ((s = stream) == null) {
throw new IllegalStateException("Not yet connected");
}
@@ -354,7 +339,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
void handleDrained() {
Handler<Void> handler;
synchronized (this) {
if ((handler = drainHandler) == null) {
if ((handler = drainHandler) == null || endFuture.isComplete()) {
return;
}
}
@@ -378,9 +363,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
if (headers != null && next.headers == null) {
next.headers().addAll(headers);
}
Promise<Void> promise = Promise.promise();
Future<Void> future = promise.future();
future.setHandler(ar -> {
endFuture.setHandler(ar -> {
if (ar.succeeded()) {
if (timeoutMs > 0) {
next.setTimeout(timeoutMs);
@@ -390,26 +373,15 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
next.handleException(ar.cause());
}
});
if (exceptionOccurred != null) {
promise.fail(exceptionOccurred);
}
else if (completed) {
promise.complete();
} else {
exceptionHandler(err -> {
if (!future.isComplete()) {
promise.fail(err);
}
});
completionHandler = v -> {
if (!future.isComplete()) {
promise.complete();
}
};
}
}
protected void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs) {
@Override
public void handleException(Throwable t) {
super.handleException(t);
endPromise.tryFail(t);
}
void handleResponse(HttpClientResponse resp, long timeoutMs) {
if (reset == null) {
int statusCode = resp.statusCode();
if (followRedirects > 0 && statusCode >= 300 && statusCode < 400) {
@@ -425,17 +397,11 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
return;
}
}
if (statusCode == 100) {
if (continueHandler != null) {
continueHandler.handle(null);
}
} else {
if (respHandler != null) {
respHandler.handle(resp);
}
if (endHandler != null) {
endHandler.handle(null);
}
if (respHandler != null) {
respHandler.handle(resp);
}
if (endHandler != null) {
endHandler.handle(null);
}
}
}
@@ -499,8 +465,8 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
// No need to synchronize as the thread is the same that set exceptionOccurred to true
// exceptionOccurred=true getting the connection => it's a TimeoutException
if (exceptionOccurred != null || reset != null) {
stream.reset(0);
if (reset != null) {
stream.reset(reset);
} else {
ctx.executeFromIO(v -> {
connected(headersHandler, stream);
@@ -527,35 +493,25 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
stream.doSetWriteQueueMaxSize(pendingMaxSize);
}
ByteBuf pending = null;
Handler<AsyncResult<Void>> handler = null;
if (pendingChunks != null) {
List<Handler<AsyncResult<Void>>> handlers = pendingHandlers;
ByteBuf pending = pendingChunks;
pendingChunks = null;
pendingHandlers = null;
Handler<AsyncResult<Void>> handler;
pending = pendingChunks;
pendingChunks = null;
if (handlers != null) {
handler = ar -> {
handlers.forEach(h -> h.handle(ar));
};
} else {
handler = null;
}
if (completed) {
// we also need to write the head so optimize this and write all out in once
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true, priority, handler);
stream.endRequest();
} else {
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false, priority, handler);
}
} else {
if (completed) {
// we also need to write the head so optimize this and write all out in once
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true, priority, null);
stream.endRequest();
} else {
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false, priority, null);
}
}
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, ended, priority, continueHandler, handler);
if (ended) {
// we also need to write the head so optimize this and write all out in once
stream.endRequest();
tryComplete();
}
this.connecting = false;
this.stream = stream;
}
@@ -564,10 +520,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
}
private boolean contentLengthSet() {
return headers != null && headers().contains(CONTENT_LENGTH);
}
@Override
public void end(String chunk) {
end(chunk, (Handler<AsyncResult<Void>>) null);
@@ -644,26 +596,28 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
return this;
}
private void write(ByteBuf buff, boolean end, Handler<AsyncResult<Void>> h) {
private boolean requiresContentLength() {
return !chunked && (headers == null || !headers.contains(CONTENT_LENGTH));
}
private void write(ByteBuf buff, boolean end, Handler<AsyncResult<Void>> completionHandler) {
if (buff == null && !end) {
return;
}
HttpClientStream s;
synchronized (this) {
checkComplete();
checkEnded();
checkResponseHandler();
if (end) {
if (buff != null && !chunked && !contentLengthSet()) {
if (buff != null && requiresContentLength()) {
headers().set(CONTENT_LENGTH, String.valueOf(buff.readableBytes()));
}
} else {
if (!chunked && !contentLengthSet()) {
throw new IllegalStateException("You must set the Content-Length header to be the total size of the message "
+ "body BEFORE sending any data if you are not using HTTP chunked encoding.");
}
} else if (requiresContentLength()) {
throw new IllegalStateException("You must set the Content-Length header to be the total size of the message "
+ "body BEFORE sending any data if you are not using HTTP chunked encoding.");
}
if (buff == null && !end) {
// nothing to write to the connection just return
return;
}
if ((s = stream) == null) {
ended |= end;
if (stream == null) {
if (buff != null) {
if (pendingChunks == null) {
pendingChunks = buff;
@@ -678,39 +632,27 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
pending.addComponent(true, buff);
}
if (h != null) {
if (completionHandler != null) {
if (pendingHandlers == null) {
pendingHandlers = new ArrayList<>();
}
pendingHandlers.add(h);
}
}
if (end) {
tryComplete();
if (completionHandler != null) {
completionHandler.handle(null);
pendingHandlers.add(completionHandler);
}
}
connect(null);
return;
}
s = stream;
}
s.writeBuffer(buff, end, h);
s.writeBuffer(buff, end, completionHandler);
if (end) {
Handler<Void> handler;
synchronized (this) {
tryComplete();
s.endRequest();
if ((handler = completionHandler) == null) {
return;
}
}
handler.handle(null);
s.endRequest();
tryComplete();
}
}
protected void checkComplete() {
if (completed) {
protected void checkEnded() {
if (ended) {
throw new IllegalStateException("Request already complete");
}
}

View File

@@ -54,7 +54,7 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase {
}
@Override
protected void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs) {
void handleResponse(HttpClientResponse resp, long timeoutMs) {
Handler<HttpClientResponse> handler;
synchronized (this) {
if ((handler = respHandler) == null) {
@@ -64,10 +64,6 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase {
handler.handle(resp);
}
@Override
protected void checkComplete() {
}
@Override
public synchronized HttpClientRequest handler(Handler<HttpClientResponse> handler) {
respHandler = handler;
@@ -85,11 +81,9 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase {
}
@Override
public boolean reset(long code) {
synchronized (conn) {
stream.reset(code);
return true;
}
boolean reset(Throwable cause) {
stream.reset(cause);
return true;
}
@Override

View File

@@ -92,12 +92,12 @@ public class HttpClientResponseImpl implements HttpClientResponse {
@Override
public String getHeader(String headerName) {
return headers().get(headerName);
return headers.get(headerName);
}
@Override
public String getHeader(CharSequence headerName) {
return headers().get(headerName);
return headers.get(headerName);
}
@Override
@@ -136,12 +136,12 @@ public class HttpClientResponseImpl implements HttpClientResponse {
}
@Override
public HttpClientResponse handler(Handler<Buffer> handle) {
public HttpClientResponse handler(Handler<Buffer> handler) {
synchronized (conn) {
if (handle != null) {
if (handler != null) {
checkEnded();
}
dataHandler = handle;
dataHandler = handler;
return this;
}
}
@@ -222,9 +222,9 @@ public class HttpClientResponseImpl implements HttpClientResponse {
}
void handleChunk(Buffer data) {
request.dataReceived();
Handler<Buffer> handler;
synchronized (conn) {
request.dataReceived();
handler = dataHandler;
}
if (handler != null) {

View File

@@ -43,7 +43,7 @@ interface HttpClientStream {
HttpConnection connection();
Context getContext();
void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler<AsyncResult<Void>> handler);
void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler<Void> contHandler, Handler<AsyncResult<Void>> handler);
void writeBuffer(ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler);
void writeFrame(int type, int flags, ByteBuf payload);
@@ -52,7 +52,7 @@ interface HttpClientStream {
void doPause();
void doFetch(long amount);
void reset(long code);
void reset(Throwable cause);
void beginRequest(HttpClientRequestImpl req);
void endRequest();

View File

@@ -2883,12 +2883,8 @@ public class Http1xTest extends HttpTest {
client = vertx.createHttpClient(createBaseClientOptions().setMaxPoolSize(1).setPipelining(true).setKeepAlive(true));
AtomicInteger connCount = new AtomicInteger();
client.connectionHandler(conn -> connCount.incrementAndGet());
HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/first", resp -> {
fail();
});
// Force connect
req.sendHead(v -> {});
req.reset();
HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/first", resp -> fail());
req.reset(0);
CountDownLatch respLatch = new CountDownLatch(2);
client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/second", resp -> {
assertEquals(200, resp.statusCode());
@@ -3058,9 +3054,7 @@ public class Http1xTest extends HttpTest {
// There might be a race between the request write and the request reset
// so we do it on the context thread to avoid it
vertx.runOnContext(v -> {
HttpClientRequest post = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
fail();
});
HttpClientRequest post = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> fail());
post.setChunked(true).write(TestUtils.randomBuffer(1024));
assertTrue(post.reset());
client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
@@ -4475,12 +4469,13 @@ public class Http1xTest extends HttpTest {
}).setChunked(true);
CheckingSender sender = new CheckingSender(vertx.getOrCreateContext(), req);
AtomicBoolean connected = new AtomicBoolean();
AtomicBoolean done = new AtomicBoolean();
req.exceptionHandler(err -> {
assertTrue(connected.get());
Throwable failure = sender.close();
if (failure != null) {
fail(failure);
} else if (err == ConnectionBase.CLOSED_EXCEPTION) {
} else if (done.compareAndSet(false, true)) {
testComplete();
}
});

View File

@@ -700,6 +700,7 @@ public class Http2ClientTest extends Http2TestBase {
@Test
public void testClientResetServerStreamDuringRequest() throws Exception {
waitFor(2);
Promise<Void> bufReceived = Promise.promise();
server.requestHandler(req -> {
req.handler(buf -> {
@@ -719,13 +720,14 @@ public class Http2ClientTest extends Http2TestBase {
});
req.response().closeHandler(v -> {
assertEquals(10L, reset.get());
testComplete();
complete();
});
});
startServer();
HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
fail();
}).setChunked(true).write(Buffer.buffer("hello"));
HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> fail())
.exceptionHandler(err -> complete())
.setChunked(true);
req.write(Buffer.buffer("hello"));
bufReceived.future().setHandler(ar -> {
req.reset(10);
});
@@ -751,13 +753,12 @@ public class Http2ClientTest extends Http2TestBase {
req.response().setChunked(true).write(Buffer.buffer("some-data"));
});
startServer();
HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath");
req.handler(resp -> {
resp.exceptionHandler(this::fail);
req.reset(10);
assertIllegalStateException(() -> req.write(Buffer.buffer()));
assertIllegalStateException(req::end);
}).end(Buffer.buffer("hello"));
client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath",
resp -> {
resp.request().reset(10);
assertIllegalStateException(() -> resp.request().write(Buffer.buffer()));
assertIllegalStateException(resp.request()::end);
}).end(Buffer.buffer("hello"));
await();
}

View File

@@ -247,28 +247,24 @@ public class Http2Test extends HttpTest {
@Test
public void testResetClientRequestNotYetSent() throws Exception {
waitFor(2);
server.close();
server = vertx.createHttpServer(createBaseServerOptions().setInitialSettings(new Http2Settings().setMaxConcurrentStreams(1)));
AtomicInteger numReq = new AtomicInteger();
server.requestHandler(req -> {
assertEquals(0, numReq.getAndIncrement());
req.response().end();
complete();
fail();
});
startServer(testAddress);
// There might be a race between the request write and the request reset
// so we do it on the context thread to avoid it
vertx.runOnContext(v -> {
HttpClientRequest post = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
fail();
HttpClientRequest post = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> fail());
post.exceptionHandler(err -> {
if (err instanceof StreamResetException) {
complete();
}
});
post.setChunked(true).write(TestUtils.randomBuffer(1024));
assertTrue(post.reset());
client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
assertEquals(1, numReq.get());
complete();
}).end();
});
await();
}

View File

@@ -1255,11 +1255,14 @@ public abstract class HttpTest extends HttpTestBase {
}
@Test
public void testClientExceptionHandlerCalledWhenFailingToConnect() throws Exception {
client.request(HttpMethod.GET, testAddress, 9998, "255.255.255.255", DEFAULT_TEST_URI, resp -> fail("Connect should not be called")).
exceptionHandler(error -> testComplete()).
endHandler(done -> fail()).
end();
public void testClientExceptionHandlerCalledWhenFailingToConnect() {
waitFor(1);
client.request(HttpMethod.GET, testAddress, 9998, "255.255.255.255", DEFAULT_TEST_URI, resp -> {
fail();
}).exceptionHandler(error -> {
complete();
})
.end();
await();
}
@@ -1446,6 +1449,27 @@ public abstract class HttpTest extends HttpTestBase {
await();
}
@Test
public void testClientRequestExceptionHandlerCalledWhenRequestEnded() throws Exception {
waitFor(2);
server.requestHandler(req -> {
req.connection().close();
});
startServer(testAddress);
HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp ->
fail()
);
req.exceptionHandler(err -> complete());
req.end();
try {
req.exceptionHandler(err -> fail());
fail();
} catch (Exception e) {
complete();
}
await();
}
@Test
public void testDefaultStatus() {
testStatusCode(-1, null);
@@ -2326,19 +2350,17 @@ public abstract class HttpTest extends HttpTestBase {
@Test
public void testConnectInvalidPort() {
client.request(HttpMethod.GET, 9998, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> fail("Connect should not be called")).
exceptionHandler(t -> testComplete()).
end();
client.request(HttpMethod.GET, 9998, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> fail())
.exceptionHandler(t -> complete())
.end();
await();
}
@Test
public void testConnectInvalidHost() {
client.request(HttpMethod.GET, 9998, "255.255.255.255", DEFAULT_TEST_URI, resp -> fail("Connect should not be called")).
exceptionHandler(t -> testComplete()).
end();
client.request(HttpMethod.GET, 9998, "255.255.255.255", DEFAULT_TEST_URI, resp -> fail())
.exceptionHandler(t -> complete())
.end();
await();
}
@@ -3217,6 +3239,7 @@ public abstract class HttpTest extends HttpTestBase {
@Test
public void testResponseDataTimeout() {
waitFor(2);
Buffer expected = TestUtils.randomBuffer(1000);
server.requestHandler(req -> {
req.response().setChunked(true).write(expected);
@@ -3224,6 +3247,14 @@ public abstract class HttpTest extends HttpTestBase {
server.listen(testAddress, onSuccess(s -> {
Buffer received = Buffer.buffer();
HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
AtomicInteger count = new AtomicInteger();
resp.exceptionHandler(t -> {
if (count.getAndIncrement() == 0) {
assertTrue(t instanceof TimeoutException);
assertEquals(expected, received);
complete();
}
});
resp.request().setTimeout(500);
resp.handler(received::appendBuffer);
});
@@ -3232,7 +3263,7 @@ public abstract class HttpTest extends HttpTestBase {
if (count.getAndIncrement() == 0) {
assertTrue(t instanceof TimeoutException);
assertEquals(expected, received);
testComplete();
complete();
}
});
req.sendHead();
@@ -5261,26 +5292,90 @@ public abstract class HttpTest extends HttpTestBase {
await();
}
/*
@Test
public void testReset() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
public void testResetClientRequestBeforeActualSend() throws Exception {
server.requestHandler(req -> {
req.exceptionHandler(err -> {
System.out.println("GOT ERR");
});
req.endHandler(v -> {
System.out.println("GOT END");
latch.countDown();
});
});
startServer();
HttpClientRequest req = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> {});
req.end();
awaitLatch(latch);
req.reset();
startServer(testAddress);
Context ctx = vertx.getOrCreateContext();
ctx.runOnContext(v -> {
HttpClientRequest req = client.request(
HttpMethod.GET,
testAddress,
new RequestOptions()
.setPort(DEFAULT_HTTP_PORT)
.setHost(DEFAULT_HTTP_HOST)
.setURI(DEFAULT_TEST_URI), resp -> {
fail();
});
req.exceptionHandler(err -> {
if (err instanceof StreamResetException) {
assertTrue(err instanceof StreamResetException);
testComplete();
}
});
req.sendHead(version -> fail());
req.reset();
});
await();
}
@Test
public void testResetClientRequestInProgress() throws Exception {
waitFor(1);
server.requestHandler(req -> {
});
startServer(testAddress);
Context ctx = vertx.getOrCreateContext();
ctx.runOnContext(v -> {
HttpClientRequest req = client.request(
HttpMethod.GET,
testAddress,
new RequestOptions()
.setPort(DEFAULT_HTTP_PORT)
.setHost(DEFAULT_HTTP_HOST)
.setURI(DEFAULT_TEST_URI), resp -> fail());
req.exceptionHandler(err -> {
if (err instanceof StreamResetException) {
assertTrue(err instanceof StreamResetException);
complete();
}
});
req.sendHead(version -> {
req.reset(0);
});
});
await();
}
@Test
public void testResetClientRequestAwaitingResponse() throws Exception {
CompletableFuture<Void> fut = new CompletableFuture<>();
server.requestHandler(req -> {
fut.complete(null);
});
startServer(testAddress);
Context ctx = vertx.getOrCreateContext();
ctx.runOnContext(v -> {
HttpClientRequest req = client.request(
HttpMethod.GET,
testAddress,
new RequestOptions()
.setPort(DEFAULT_HTTP_PORT)
.setHost(DEFAULT_HTTP_HOST)
.setURI(DEFAULT_TEST_URI), resp -> fail());
req.exceptionHandler(err -> {
if (err instanceof StreamResetException) {
testComplete();
}
});
req.end();
fut.thenAccept(v2 -> {
ctx.runOnContext(v3 -> {
req.reset(0);
});
});
});
await();
}
*/
}

View File

@@ -15,6 +15,7 @@ import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.util.NetUtil;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.Http2Settings;
import io.vertx.core.net.*;
@@ -30,6 +31,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;
import static org.junit.Assert.assertTrue;