diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 8b5e77256..cc2866f30 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -203,6 +203,9 @@ class Http1xClientConnection extends Http1xConnectionBase impleme private MultiMap trailers; private StreamImpl next; private Object trace; + private long bytesWritten; + private long bytesRead; + private Object metric; StreamImpl(ContextInternal context, Http1xClientConnection conn, int id, Handler> handler) { this.context = context; @@ -222,21 +225,16 @@ class Http1xClientConnection extends Http1xConnectionBase impleme c.next = s; } - @Override - public void reportBytesWritten(long numberOfBytes) { - conn.reportBytesWritten(numberOfBytes); - } - - @Override - public void reportBytesRead(long numberOfBytes) { - conn.reportBytesRead(numberOfBytes); - } - @Override public int id() { return id; } + @Override + public Object metric() { + return metric; + } + @Override public HttpVersion version() { return conn.version; @@ -263,6 +261,9 @@ class Http1xClientConnection extends Http1xConnectionBase impleme conn.responseInProgress.append(this); } next = null; + if (buf != null) { + bytesWritten += buf.readableBytes(); + } } private HttpRequest createRequest(HttpMethod method, String rawMethod, String uri, MultiMap headers) { @@ -313,6 +314,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } private boolean handleChunk(Buffer buff) { + bytesRead += buff.length(); return queue.write(buff); } @@ -321,7 +323,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme if (buff == null && !end) { return; } - HttpObject msg; + HttpContent msg; if (end) { if (buff != null && buff.isReadable()) { msg = new DefaultLastHttpContent(buff, false); @@ -331,6 +333,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } else { msg = new DefaultHttpContent(buff); } + bytesWritten += msg.content().readableBytes(); conn.writeToChannel(msg, conn.toPromise(handler)); } @@ -389,8 +392,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } request = req; if (conn.metrics != null) { - Object reqMetric = conn.metrics.requestBegin(conn.endpointMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request); - request.metric(reqMetric); + metric = conn.metrics.requestBegin(conn.endpointMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request); } VertxTracer tracer = context.tracer(); if (tracer != null) { @@ -415,10 +417,11 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } requestEnded = true; if (conn.metrics != null) { - conn.metrics.requestEnd(request.metric()); + conn.metrics.requestEnd(metric); } doRecycle = responseEnded; } + conn.reportBytesWritten(bytesWritten); conn.handleRequestEnd(doRecycle); } @@ -450,7 +453,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } response = new HttpClientResponseImpl(request, version, this, resp.status().code(), resp.status().reasonPhrase(), new HeadersAdaptor(resp.headers())); if (conn.metrics != null) { - conn.metrics.responseBegin(request.metric(), response); + conn.metrics.responseBegin(metric, response); } if (resp.status().code() != 100 && request.method() != io.vertx.core.http.HttpMethod.CONNECT) { // See https://tools.ietf.org/html/rfc7230#section-6.3 @@ -476,6 +479,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } queue.handler(buf -> { if (buf == InboundBuffer.END_SENTINEL) { + conn.reportBytesRead(bytesRead); response.handleEnd(trailers); } else { response.handleChunk((Buffer) buf); @@ -493,11 +497,10 @@ class Http1xClientConnection extends Http1xConnectionBase impleme synchronized (conn) { HttpClientRequestImpl req = request; if (conn.metrics != null) { - Object reqMetric = req.metric(); if (req.exceptionOccurred != null) { - conn.metrics.requestReset(reqMetric); + conn.metrics.requestReset(metric); } else { - conn.metrics.responseEnd(reqMetric, response); + conn.metrics.responseEnd(metric, response); } } VertxTracer tracer = context.tracer(); @@ -797,7 +800,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme ws = this.ws; for (StreamImpl r = responseInProgress;r != null;r = r.next) { if (metrics != null) { - metrics.requestReset(r.request.metric()); + metrics.requestReset(r.metric); } if (tracer != null) { tracer.receiveResponse(r.context, null, r.trace, ConnectionBase.CLOSED_EXCEPTION, TagExtractor.empty()); diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java index b65248ae6..0e98824e7 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java @@ -44,10 +44,10 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon private final ConnectionListener listener; private final HttpClientImpl client; - final HttpClientMetrics metrics; - final Object queueMetric; + private final HttpClientMetrics metrics; + private final Object queueMetric; - public Http2ClientConnection(ConnectionListener listener, + Http2ClientConnection(ConnectionListener listener, Object queueMetric, HttpClientImpl client, ContextInternal context, @@ -97,11 +97,12 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon super.onStreamClosed(nettyStream); } - void upgradeStream(HttpClientRequestImpl req, ContextInternal context, Handler> completionHandler) { + void upgradeStream(HttpClientRequestImpl req, Object metric, ContextInternal context, Handler> completionHandler) { Future fut; synchronized (this) { try { Http2ClientStream stream = createStream(context, handler.connection().stream(1)); + stream.metric = metric; stream.beginRequest(req); fut = Future.succeededFuture(stream); } catch (Exception e) { @@ -189,7 +190,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon } HttpClientRequestPushPromise pushReq = new HttpClientRequestPushPromise(this, promisedStream, client, isSsl(), method, rawMethod, uri, host, port, headersMap); if (metrics != null) { - pushReq.metric(metrics.responsePushed(queueMetric, metric(), localAddress(), remoteAddress(), pushReq)); + pushReq.getStream().metric = metrics.responsePushed(queueMetric, metric(), localAddress(), remoteAddress(), pushReq); } streams.put(promisedStreamId, pushReq.getStream()); stream.context.dispatch(pushReq, pushHandler); @@ -206,6 +207,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon private boolean requestEnded; private boolean responseEnded; private Object trace; + private Object metric; Http2ClientStream(Http2ClientConnection conn, ContextInternal context, Http2Stream stream, boolean writable) { super(conn, context, stream, writable); @@ -236,13 +238,18 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon return super.id(); } + @Override + public Object metric() { + return metric; + } + @Override void handleEnd(MultiMap trailers) { if (conn.metrics != null) { if (request.exceptionOccurred != null) { - conn.metrics.requestReset(request.metric()); + conn.metrics.requestReset(metric); } else { - conn.metrics.responseEnd(request.metric(), response); + conn.metrics.responseEnd(metric, response); } } responseEnded = true; @@ -266,7 +273,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon } responseEnded = true; if (conn.metrics != null) { - conn.metrics.requestReset(request.metric()); + conn.metrics.requestReset(metric); } } handleException(new StreamResetException(errorCode)); @@ -274,6 +281,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon @Override void handleClose() { + super.handleClose(); VertxTracer tracer = context.tracer(); if (tracer != null) { Throwable failure; @@ -295,7 +303,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon if (!responseEnded) { responseEnded = true; if (conn.metrics != null) { - conn.metrics.requestReset(request.metric()); + conn.metrics.requestReset(metric); } handleException(CLOSED_EXCEPTION); } @@ -350,7 +358,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon new Http2HeadersAdaptor(headers) ); if (conn.metrics != null) { - conn.metrics.responseBegin(request.metric(), response); + conn.metrics.responseBegin(metric, response); } request.handleResponse(response); if (end) { @@ -405,7 +413,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon h.set(HttpHeaderNames.ACCEPT_ENCODING, DEFLATE_GZIP); } if (conn.metrics != null) { - request.metric(conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request)); + metric = conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request); } priority(priority); if (content != null) { @@ -435,16 +443,6 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon super.writeFrame(type, flags, payload); } - @Override - public void reportBytesWritten(long numberOfBytes) { - conn.reportBytesWritten(numberOfBytes); - } - - @Override - public void reportBytesRead(long numberOfBytes) { - conn.reportBytesRead(numberOfBytes); - } - @Override public ContextInternal getContext() { return context; @@ -472,7 +470,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon @Override public void endRequest() { if (conn.metrics != null) { - conn.metrics.requestEnd(request.metric()); + conn.metrics.requestEnd(metric); } requestEnded = true; } @@ -487,7 +485,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon responseEnded = true; writeReset(code); if (conn.metrics != null) { - conn.metrics.requestReset(request.metric()); + conn.metrics.requestReset(metric); } } } diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java index 9a61324ca..c23251a44 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java @@ -114,6 +114,7 @@ public class Http2ServerConnection extends Http2ConnectionBase { return; } Http2ServerRequestImpl req = createRequest(streamId, headers, endOfStream); + req.registerMetrics(); req.priority(new StreamPriority() .setDependency(streamDependency) .setWeight(weight) @@ -199,12 +200,8 @@ public class Http2ServerConnection extends Http2ConnectionBase { super.updateSettings(settingsUpdate, completionHandler); } - private class Push extends VertxHttp2Stream { + private class Push extends Http2ServerStream { - private final HttpMethod method; - private final String uri; - private final String contentEncoding; - private Http2ServerResponseImpl response; private final Promise completionHandler; public Push(Http2Stream stream, @@ -214,12 +211,9 @@ public class Http2ServerConnection extends Http2ConnectionBase { String uri, boolean writable, Handler> completionHandler) { - super(Http2ServerConnection.this, context, stream, writable); + super(Http2ServerConnection.this, context, stream, contentEncoding, method, uri, writable); Promise promise = Promise.promise(); - promise.future().setHandler(completionHandler); - this.method = method; - this.uri = uri; - this.contentEncoding = contentEncoding; + promise.future().setHandler(ar -> context.dispatch(ar, completionHandler)); this.completionHandler = promise; } @@ -244,14 +238,8 @@ public class Http2ServerConnection extends Http2ConnectionBase { @Override void handleReset(long errorCode) { - Http2ServerResponseImpl response; - synchronized (conn) { - response = this.response; - } - if (response != null) { + if (!completionHandler.tryFail(new StreamResetException(errorCode))) { response.handleReset(errorCode); - } else { - context.dispatch(Future.failedFuture(new StreamResetException(errorCode)), completionHandler); } } @@ -264,6 +252,7 @@ public class Http2ServerConnection extends Http2ConnectionBase { @Override void handleClose() { + super.handleClose(); if (pendingPushes.remove(this)) { completionHandler.fail("Push reset by client"); } else { @@ -280,13 +269,8 @@ public class Http2ServerConnection extends Http2ConnectionBase { } void complete() { - synchronized (Http2ServerConnection.this) { - response = new Http2ServerResponseImpl(Http2ServerConnection.this, this, method, true, contentEncoding, null); - if (METRICS_ENABLED && metrics != null) { - response.metric(metrics.responsePushed(conn.metric(), method, uri, response)); - } - completionHandler.complete(response); - } + registerMetrics(); + completionHandler.complete(response); } } } diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java b/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java index e996a9168..18f92e150 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java @@ -58,20 +58,15 @@ import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED; /** * @author Julien Viet */ -public class Http2ServerRequestImpl extends VertxHttp2Stream implements HttpServerRequest { +public class Http2ServerRequestImpl extends Http2ServerStream implements HttpServerRequest { private static final Logger log = LoggerFactory.getLogger(HttpServerRequestImpl.class); private final String serverOrigin; - private final Http2ServerResponseImpl response; private final MultiMap headersMap; - private final String rawMethod; private final String scheme; - private final String host; - private final String uri; private String path; private String query; - private HttpMethod method; private MultiMap params; private String absoluteURI; private MultiMap attributes; @@ -81,7 +76,6 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream endHandler; private boolean streamEnded; private boolean ended; - private long bytesRead; private Handler uploadHandler; private HttpPostRequestDecoder postRequestDecoder; @@ -92,32 +86,21 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream streamPriorityHandler; public Http2ServerRequestImpl(Http2ServerConnection conn, ContextInternal context, Http2Stream stream, HttpServerMetrics metrics, - String serverOrigin, Http2Headers headers, String contentEncoding, boolean writable, boolean streamEnded) { - super(conn, context, stream, writable); + String serverOrigin, Http2Headers headers, String contentEncoding, boolean writable, boolean streamEnded) { + super(conn, context, stream, headers, contentEncoding, serverOrigin, writable); - this.serverOrigin = serverOrigin; - this.streamEnded = streamEnded; - this.uri = headers.get(":path") != null ? headers.get(":path").toString() : null; - this.scheme = headers.get(":scheme") != null ? headers.get(":scheme").toString() : null; - this.rawMethod = headers.get(":method") != null ? headers.get(":method").toString() : null; - this.host = headers.get(":authority") != null ? headers.get(":authority").toString() : null; + String scheme = headers.get(":scheme") != null ? headers.get(":scheme").toString() : null; headers.remove(":method"); headers.remove(":scheme"); headers.remove(":path"); headers.remove(":authority"); - headersMap = new Http2HeadersAdaptor(headers); + Http2HeadersAdaptor headersMap = new Http2HeadersAdaptor(headers); - - String host = host(); - if (host == null) { - int idx = serverOrigin.indexOf("://"); - host = serverOrigin.substring(idx + 3); - } - this.response = new Http2ServerResponseImpl(conn, this, method(), false, contentEncoding, host); - if (METRICS_ENABLED && metrics != null) { - response.metric(metrics.requestBegin(conn.metric(), this)); - } + this.serverOrigin = serverOrigin; + this.streamEnded = streamEnded; + this.scheme = scheme; + this.headersMap = headersMap; } void dispatch(Handler handler) { @@ -167,6 +150,7 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream bodyEndHandler; private Handler closeHandler; private Handler endHandler; - private long bytesWritten; private NetSocket netSocket; public Http2ServerResponseImpl(Http2ServerConnection conn, - VertxHttp2Stream stream, + Http2ServerStream stream, HttpMethod method, boolean push, String contentEncoding, @@ -97,8 +95,8 @@ public class Http2ServerResponseImpl implements HttpServerResponse { } } - void metric(Object metric) { - this.metric = metric; + boolean isPush() { + return push; } void handleReset(long code) { @@ -122,15 +120,6 @@ public class Http2ServerResponseImpl implements HttpServerResponse { synchronized (conn) { boolean failed = !ended; closed = true; - if (METRICS_ENABLED && metric != null) { - // Null in case of push response : handle this case - conn.reportBytesWritten(bytesWritten); - if (failed) { - conn.metrics().requestReset(metric); - } else { - conn.metrics().responseEnd(metric, this); - } - } exceptionHandler = failed ? this.exceptionHandler : null; endHandler = failed ? this.endHandler : null; closeHandler = this.closeHandler; @@ -460,12 +449,9 @@ public class Http2ServerResponseImpl implements HttpServerResponse { headersEndHandler.handle(null); } sanitizeHeaders(); - if (Metrics.METRICS_ENABLED && metric != null) { - conn.metrics().responseBegin(metric, this); - } headWritten = true; headers.status(Integer.toString(status.code())); // Could be optimized for usual case ? - stream.writeHeaders(headers, end, null); + stream.writeHead(headers, end, null); if (end) { ctx.flush(); } @@ -486,7 +472,6 @@ public class Http2ServerResponseImpl implements HttpServerResponse { boolean hasBody = false; if (chunk != null) { hasBody = true; - bytesWritten += chunk.readableBytes(); } else { chunk = Unpooled.EMPTY_BUFFER; } @@ -618,7 +603,6 @@ public class Http2ServerResponseImpl implements HttpServerResponse { Promise result = Promise.promise(); result.future().setHandler(ar -> { if (ar.succeeded()) { - bytesWritten += ar.result(); end(); } if (resultHandler != null) { @@ -686,9 +670,7 @@ public class Http2ServerResponseImpl implements HttpServerResponse { @Override public long bytesWritten() { - synchronized (conn) { - return bytesWritten; - } + return stream.bytesWritten(); } @Override diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java b/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java new file mode 100644 index 000000000..6c1fa2543 --- /dev/null +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.http.impl; + +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2Stream; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.spi.metrics.HttpServerMetrics; +import io.vertx.core.spi.metrics.Metrics; + +import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED; + +abstract class Http2ServerStream extends VertxHttp2Stream { + + protected final String rawMethod; + protected final HttpMethod method; + protected final String uri; + protected final String contentEncoding; + protected final String host; + protected final Http2ServerResponseImpl response; + private Object metric; + + Http2ServerStream(Http2ServerConnection conn, + ContextInternal context, + Http2Stream stream, + String contentEncoding, + HttpMethod method, + String uri, + boolean writable) { + super(conn, context, stream, writable); + + this.method = method; + this.rawMethod = method.name(); + this.contentEncoding = contentEncoding; + this.uri = uri; + this.host = null; + this.response = new Http2ServerResponseImpl(conn, this, method, true, contentEncoding, null); + } + + Http2ServerStream(Http2ServerConnection conn, ContextInternal context, Http2Stream stream, Http2Headers headers, String contentEncoding, String serverOrigin, boolean writable) { + super(conn, context, stream, writable); + + String host = headers.get(":authority") != null ? headers.get(":authority").toString() : null; + if (host == null) { + int idx = serverOrigin.indexOf("://"); + host = serverOrigin.substring(idx + 3); + } + + this.host = host; + this.contentEncoding = contentEncoding; + this.uri = headers.get(":path") != null ? headers.get(":path").toString() : null; + this.rawMethod = headers.get(":method") != null ? headers.get(":method").toString() : null; + this.method = HttpUtils.toVertxMethod(rawMethod); + this.response = new Http2ServerResponseImpl(conn, this, method, false, contentEncoding, host); + } + + void registerMetrics() { + if (METRICS_ENABLED) { + HttpServerMetrics metrics = conn.metrics(); + if (metrics != null) { + if (response.isPush()) { + metric = metrics.responsePushed(conn.metric(), method(), uri, response); + } else { + metric = metrics.requestBegin(conn.metric(), (HttpServerRequest) this); + } + } + } + } + + void writeHead(Http2Headers headers, boolean end, Handler> handler) { + if (Metrics.METRICS_ENABLED && metric != null) { + conn.metrics().responseBegin(metric, response); + } + writeHeaders(headers, end, handler); + } + + @Override + void handleInterestedOpsChanged() { + if (response != null) { + response.writabilityChanged(); + } + } + + public HttpMethod method() { + return method; + } + + public String rawMethod() { + return rawMethod; + } + + @Override + void handleClose() { + super.handleClose(); + if (METRICS_ENABLED) { + HttpServerMetrics metrics = conn.metrics(); + if (metrics != null) { + // Null in case of push response : handle this case + boolean failed = !response.ended(); + if (failed) { + metrics.requestReset(metric); + } else { + metrics.responseEnd(metric, response); + } + } + } + } +} diff --git a/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java index 297581fe4..b6a37df80 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java @@ -142,7 +142,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { // Now we need to upgrade this to an HTTP2 ConnectionListener listener = conn.listener(); VertxHttp2ConnectionHandler handler = Http2ClientConnection.createHttp2ConnectionHandler(client, conn.endpointMetric(), listener, conn.getContext(), current.metric(), (conn, concurrency) -> { - conn.upgradeStream(request, stream.getContext(), ar -> { + conn.upgradeStream(request, stream.metric(), stream.getContext(), ar -> { UpgradingStream.this.conn.closeHandler(null); UpgradingStream.this.conn.exceptionHandler(null); if (ar.succeeded()) { @@ -175,6 +175,11 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { return 1; } + @Override + public Object metric() { + return stream.metric(); + } + @Override public HttpVersion version() { return HttpVersion.HTTP_2; @@ -195,16 +200,6 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { stream.writeFrame(type, flags, payload); } - @Override - public void reportBytesWritten(long numberOfBytes) { - stream.reportBytesWritten(numberOfBytes); - } - - @Override - public void reportBytesRead(long numberOfBytes) { - stream.reportBytesRead(numberOfBytes); - } - @Override public void doSetWriteQueueMaxSize(int size) { stream.doSetWriteQueueMaxSize(size); diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java index b402108ea..03ce2e619 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java @@ -35,7 +35,6 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { private long currentTimeoutMs; private long lastDataReceived; protected Throwable exceptionOccurred; - private Object metric; HttpClientRequestBase(HttpClientImpl client, boolean ssl, HttpMethod method, SocketAddress server, String host, int port, String uri) { this.client = client; @@ -49,14 +48,6 @@ public abstract class HttpClientRequestBase implements HttpClientRequest { this.ssl = ssl; } - Object metric() { - return metric; - } - - void metric(Object metric) { - this.metric = metric; - } - protected abstract void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs); protected abstract void checkComplete(); diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java index 15c1831ea..ba2233172 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java @@ -69,7 +69,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http private List>> pendingHandlers; private int pendingMaxSize = -1; private int followRedirects; - private long written; private VertxHttpHeaders headers; private StreamPriority priority; public HttpClientStream stream; @@ -140,7 +139,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http public HttpClientRequestImpl setChunked(boolean chunked) { synchronized (this) { checkComplete(); - if (written > 0) { + if (stream != null) { throw new IllegalStateException("Cannot set chunked after data has been written on request"); } // HTTP 1.0 does not support chunking so we ignore this if HTTP 1.0 @@ -393,7 +392,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http exceptionHandler(null); next.pushHandler = pushHandler; next.followRedirects = followRedirects - 1; - next.written = written; if (next.hostHeader == null) { next.hostHeader = hostHeader; } @@ -556,7 +554,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http if (completed) { // we also need to write the head so optimize this and write all out in once stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true, priority, handler); - stream.reportBytesWritten(written); stream.endRequest(); } else { stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false, priority, handler); @@ -565,7 +562,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http if (completed) { // we also need to write the head so optimize this and write all out in once stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true, priority, null); - stream.reportBytesWritten(written); stream.endRequest(); } else { stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false, priority, null); @@ -678,9 +674,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http // nothing to write to the connection just return return; } - if (buff != null) { - written += buff.readableBytes(); - } if ((s = stream) == null) { if (buff != null) { if (pendingChunks == null) { @@ -714,9 +707,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } } s.writeBuffer(buff, end, h); - if (end) { - s.reportBytesWritten(written); // MUST BE READ UNDER SYNCHRONIZATION - } if (end) { Handler handler; synchronized (this) { diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java index 08596b0a4..3cc58008c 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java @@ -51,9 +51,6 @@ public class HttpClientResponseImpl implements HttpClientResponse { private Handler priorityHandler; private NetSocket netSocket; - // Track for metrics - private long bytesRead; - // Cache these for performance private MultiMap headers; private MultiMap trailers; @@ -229,7 +226,6 @@ public class HttpClientResponseImpl implements HttpClientResponse { Handler handler; synchronized (conn) { request.dataReceived(); - bytesRead += data.length(); handler = dataHandler; } if (handler != null) { @@ -244,8 +240,6 @@ public class HttpClientResponseImpl implements HttpClientResponse { void handleEnd(MultiMap trailers) { Handler handler; synchronized (conn) { - stream.reportBytesRead(bytesRead); - bytesRead = 0; this.trailers = trailers; handler = endHandler; endHandler = null; diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java index f8a321522..eced24611 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java @@ -33,6 +33,8 @@ public interface HttpClientStream { */ int id(); + Object metric(); + /** * @return the stream version or null if it's not yet determined */ @@ -45,9 +47,6 @@ public interface HttpClientStream { void writeBuffer(ByteBuf buf, boolean end, Handler> handler); void writeFrame(int type, int flags, ByteBuf payload); - void reportBytesWritten(long numberOfBytes); - void reportBytesRead(long numberOfBytes); - void doSetWriteQueueMaxSize(int size); boolean isNotWritable(); void doPause(); diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java index 37c3c9785..ea5f2d4e3 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java @@ -91,6 +91,7 @@ class VertxHttp2NetSocket extends VertxHttp2Strea @Override void handleClose() { + super.handleClose(); Handler handler = closeHandler(); if (handler != null) { handler.handle(null); diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java index a53af770f..eaad99502 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java @@ -43,6 +43,8 @@ abstract class VertxHttp2Stream { private MultiMap trailers; private boolean writable; private StreamPriority priority; + private long bytesRead; + private long bytesWritten; VertxHttp2Stream(C conn, ContextInternal context, Http2Stream stream, boolean writable) { this.conn = conn; @@ -62,9 +64,12 @@ abstract class VertxHttp2Stream { pending.handler(buff -> { if (buff == InboundBuffer.END_SENTINEL) { + conn.reportBytesRead(bytesRead); handleEnd(trailers); } else { - handleData((Buffer) buff); + Buffer data = (Buffer) buff; + bytesRead += data.length(); + handleData(data); } }); pending.exceptionHandler(context.exceptionHandler()); @@ -106,6 +111,14 @@ abstract class VertxHttp2Stream { return stream.id(); } + long bytesWritten() { + return bytesWritten; + } + + long bytesRead() { + return bytesRead; + } + public void doPause() { pending.pause(); } @@ -133,10 +146,11 @@ abstract class VertxHttp2Stream { } void writeData(ByteBuf chunk, boolean end) { - conn.handler.writeData(stream, chunk, end, null); + writeData(chunk, end, null); } void writeData(ByteBuf chunk, boolean end, Handler> handler) { + bytesWritten += chunk.readableBytes(); conn.handler.writeData(stream, chunk, end, handler); } @@ -163,6 +177,7 @@ abstract class VertxHttp2Stream { } void handleClose() { + conn.reportBytesWritten(bytesWritten); } synchronized void priority(StreamPriority streamPriority) {