Move HTTP metrics in the HTTP stream classes instead of request/response classes

This commit is contained in:
Julien Viet
2019-06-16 23:15:42 +02:00
parent d9cf8c016e
commit 3bc24dc0f8
13 changed files with 216 additions and 180 deletions

View File

@@ -203,6 +203,9 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
private MultiMap trailers;
private StreamImpl next;
private Object trace;
private long bytesWritten;
private long bytesRead;
private Object metric;
StreamImpl(ContextInternal context, Http1xClientConnection conn, int id, Handler<AsyncResult<HttpClientStream>> handler) {
this.context = context;
@@ -222,21 +225,16 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
c.next = s;
}
@Override
public void reportBytesWritten(long numberOfBytes) {
conn.reportBytesWritten(numberOfBytes);
}
@Override
public void reportBytesRead(long numberOfBytes) {
conn.reportBytesRead(numberOfBytes);
}
@Override
public int id() {
return id;
}
@Override
public Object metric() {
return metric;
}
@Override
public HttpVersion version() {
return conn.version;
@@ -263,6 +261,9 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
conn.responseInProgress.append(this);
}
next = null;
if (buf != null) {
bytesWritten += buf.readableBytes();
}
}
private HttpRequest createRequest(HttpMethod method, String rawMethod, String uri, MultiMap headers) {
@@ -313,6 +314,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
private boolean handleChunk(Buffer buff) {
bytesRead += buff.length();
return queue.write(buff);
}
@@ -321,7 +323,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
if (buff == null && !end) {
return;
}
HttpObject msg;
HttpContent msg;
if (end) {
if (buff != null && buff.isReadable()) {
msg = new DefaultLastHttpContent(buff, false);
@@ -331,6 +333,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
} else {
msg = new DefaultHttpContent(buff);
}
bytesWritten += msg.content().readableBytes();
conn.writeToChannel(msg, conn.toPromise(handler));
}
@@ -389,8 +392,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
request = req;
if (conn.metrics != null) {
Object reqMetric = conn.metrics.requestBegin(conn.endpointMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request);
request.metric(reqMetric);
metric = conn.metrics.requestBegin(conn.endpointMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request);
}
VertxTracer tracer = context.tracer();
if (tracer != null) {
@@ -415,10 +417,11 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
requestEnded = true;
if (conn.metrics != null) {
conn.metrics.requestEnd(request.metric());
conn.metrics.requestEnd(metric);
}
doRecycle = responseEnded;
}
conn.reportBytesWritten(bytesWritten);
conn.handleRequestEnd(doRecycle);
}
@@ -450,7 +453,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
response = new HttpClientResponseImpl(request, version, this, resp.status().code(), resp.status().reasonPhrase(), new HeadersAdaptor(resp.headers()));
if (conn.metrics != null) {
conn.metrics.responseBegin(request.metric(), response);
conn.metrics.responseBegin(metric, response);
}
if (resp.status().code() != 100 && request.method() != io.vertx.core.http.HttpMethod.CONNECT) {
// See https://tools.ietf.org/html/rfc7230#section-6.3
@@ -476,6 +479,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
queue.handler(buf -> {
if (buf == InboundBuffer.END_SENTINEL) {
conn.reportBytesRead(bytesRead);
response.handleEnd(trailers);
} else {
response.handleChunk((Buffer) buf);
@@ -493,11 +497,10 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
synchronized (conn) {
HttpClientRequestImpl req = request;
if (conn.metrics != null) {
Object reqMetric = req.metric();
if (req.exceptionOccurred != null) {
conn.metrics.requestReset(reqMetric);
conn.metrics.requestReset(metric);
} else {
conn.metrics.responseEnd(reqMetric, response);
conn.metrics.responseEnd(metric, response);
}
}
VertxTracer tracer = context.tracer();
@@ -797,7 +800,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
ws = this.ws;
for (StreamImpl r = responseInProgress;r != null;r = r.next) {
if (metrics != null) {
metrics.requestReset(r.request.metric());
metrics.requestReset(r.metric);
}
if (tracer != null) {
tracer.receiveResponse(r.context, null, r.trace, ConnectionBase.CLOSED_EXCEPTION, TagExtractor.empty());

View File

@@ -44,10 +44,10 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
private final ConnectionListener<HttpClientConnection> listener;
private final HttpClientImpl client;
final HttpClientMetrics metrics;
final Object queueMetric;
private final HttpClientMetrics metrics;
private final Object queueMetric;
public Http2ClientConnection(ConnectionListener<HttpClientConnection> listener,
Http2ClientConnection(ConnectionListener<HttpClientConnection> listener,
Object queueMetric,
HttpClientImpl client,
ContextInternal context,
@@ -97,11 +97,12 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
super.onStreamClosed(nettyStream);
}
void upgradeStream(HttpClientRequestImpl req, ContextInternal context, Handler<AsyncResult<HttpClientStream>> completionHandler) {
void upgradeStream(HttpClientRequestImpl req, Object metric, ContextInternal context, Handler<AsyncResult<HttpClientStream>> completionHandler) {
Future<HttpClientStream> fut;
synchronized (this) {
try {
Http2ClientStream stream = createStream(context, handler.connection().stream(1));
stream.metric = metric;
stream.beginRequest(req);
fut = Future.succeededFuture(stream);
} catch (Exception e) {
@@ -189,7 +190,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
}
HttpClientRequestPushPromise pushReq = new HttpClientRequestPushPromise(this, promisedStream, client, isSsl(), method, rawMethod, uri, host, port, headersMap);
if (metrics != null) {
pushReq.metric(metrics.responsePushed(queueMetric, metric(), localAddress(), remoteAddress(), pushReq));
pushReq.getStream().metric = metrics.responsePushed(queueMetric, metric(), localAddress(), remoteAddress(), pushReq);
}
streams.put(promisedStreamId, pushReq.getStream());
stream.context.dispatch(pushReq, pushHandler);
@@ -206,6 +207,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
private boolean requestEnded;
private boolean responseEnded;
private Object trace;
private Object metric;
Http2ClientStream(Http2ClientConnection conn, ContextInternal context, Http2Stream stream, boolean writable) {
super(conn, context, stream, writable);
@@ -236,13 +238,18 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
return super.id();
}
@Override
public Object metric() {
return metric;
}
@Override
void handleEnd(MultiMap trailers) {
if (conn.metrics != null) {
if (request.exceptionOccurred != null) {
conn.metrics.requestReset(request.metric());
conn.metrics.requestReset(metric);
} else {
conn.metrics.responseEnd(request.metric(), response);
conn.metrics.responseEnd(metric, response);
}
}
responseEnded = true;
@@ -266,7 +273,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
}
responseEnded = true;
if (conn.metrics != null) {
conn.metrics.requestReset(request.metric());
conn.metrics.requestReset(metric);
}
}
handleException(new StreamResetException(errorCode));
@@ -274,6 +281,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
@Override
void handleClose() {
super.handleClose();
VertxTracer tracer = context.tracer();
if (tracer != null) {
Throwable failure;
@@ -295,7 +303,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
if (!responseEnded) {
responseEnded = true;
if (conn.metrics != null) {
conn.metrics.requestReset(request.metric());
conn.metrics.requestReset(metric);
}
handleException(CLOSED_EXCEPTION);
}
@@ -350,7 +358,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
new Http2HeadersAdaptor(headers)
);
if (conn.metrics != null) {
conn.metrics.responseBegin(request.metric(), response);
conn.metrics.responseBegin(metric, response);
}
request.handleResponse(response);
if (end) {
@@ -405,7 +413,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
h.set(HttpHeaderNames.ACCEPT_ENCODING, DEFLATE_GZIP);
}
if (conn.metrics != null) {
request.metric(conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request));
metric = conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request);
}
priority(priority);
if (content != null) {
@@ -435,16 +443,6 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
super.writeFrame(type, flags, payload);
}
@Override
public void reportBytesWritten(long numberOfBytes) {
conn.reportBytesWritten(numberOfBytes);
}
@Override
public void reportBytesRead(long numberOfBytes) {
conn.reportBytesRead(numberOfBytes);
}
@Override
public ContextInternal getContext() {
return context;
@@ -472,7 +470,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
@Override
public void endRequest() {
if (conn.metrics != null) {
conn.metrics.requestEnd(request.metric());
conn.metrics.requestEnd(metric);
}
requestEnded = true;
}
@@ -487,7 +485,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
responseEnded = true;
writeReset(code);
if (conn.metrics != null) {
conn.metrics.requestReset(request.metric());
conn.metrics.requestReset(metric);
}
}
}

View File

@@ -114,6 +114,7 @@ public class Http2ServerConnection extends Http2ConnectionBase {
return;
}
Http2ServerRequestImpl req = createRequest(streamId, headers, endOfStream);
req.registerMetrics();
req.priority(new StreamPriority()
.setDependency(streamDependency)
.setWeight(weight)
@@ -199,12 +200,8 @@ public class Http2ServerConnection extends Http2ConnectionBase {
super.updateSettings(settingsUpdate, completionHandler);
}
private class Push extends VertxHttp2Stream<Http2ServerConnection> {
private class Push extends Http2ServerStream {
private final HttpMethod method;
private final String uri;
private final String contentEncoding;
private Http2ServerResponseImpl response;
private final Promise<HttpServerResponse> completionHandler;
public Push(Http2Stream stream,
@@ -214,12 +211,9 @@ public class Http2ServerConnection extends Http2ConnectionBase {
String uri,
boolean writable,
Handler<AsyncResult<HttpServerResponse>> completionHandler) {
super(Http2ServerConnection.this, context, stream, writable);
super(Http2ServerConnection.this, context, stream, contentEncoding, method, uri, writable);
Promise<HttpServerResponse> promise = Promise.promise();
promise.future().setHandler(completionHandler);
this.method = method;
this.uri = uri;
this.contentEncoding = contentEncoding;
promise.future().setHandler(ar -> context.dispatch(ar, completionHandler));
this.completionHandler = promise;
}
@@ -244,14 +238,8 @@ public class Http2ServerConnection extends Http2ConnectionBase {
@Override
void handleReset(long errorCode) {
Http2ServerResponseImpl response;
synchronized (conn) {
response = this.response;
}
if (response != null) {
if (!completionHandler.tryFail(new StreamResetException(errorCode))) {
response.handleReset(errorCode);
} else {
context.dispatch(Future.failedFuture(new StreamResetException(errorCode)), completionHandler);
}
}
@@ -264,6 +252,7 @@ public class Http2ServerConnection extends Http2ConnectionBase {
@Override
void handleClose() {
super.handleClose();
if (pendingPushes.remove(this)) {
completionHandler.fail("Push reset by client");
} else {
@@ -280,13 +269,8 @@ public class Http2ServerConnection extends Http2ConnectionBase {
}
void complete() {
synchronized (Http2ServerConnection.this) {
response = new Http2ServerResponseImpl(Http2ServerConnection.this, this, method, true, contentEncoding, null);
if (METRICS_ENABLED && metrics != null) {
response.metric(metrics.responsePushed(conn.metric(), method, uri, response));
}
completionHandler.complete(response);
}
registerMetrics();
completionHandler.complete(response);
}
}
}

View File

@@ -58,20 +58,15 @@ import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnection> implements HttpServerRequest {
public class Http2ServerRequestImpl extends Http2ServerStream implements HttpServerRequest {
private static final Logger log = LoggerFactory.getLogger(HttpServerRequestImpl.class);
private final String serverOrigin;
private final Http2ServerResponseImpl response;
private final MultiMap headersMap;
private final String rawMethod;
private final String scheme;
private final String host;
private final String uri;
private String path;
private String query;
private HttpMethod method;
private MultiMap params;
private String absoluteURI;
private MultiMap attributes;
@@ -81,7 +76,6 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
private Handler<Void> endHandler;
private boolean streamEnded;
private boolean ended;
private long bytesRead;
private Handler<HttpServerFileUpload> uploadHandler;
private HttpPostRequestDecoder postRequestDecoder;
@@ -92,32 +86,21 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
private Handler<StreamPriority> streamPriorityHandler;
public Http2ServerRequestImpl(Http2ServerConnection conn, ContextInternal context, Http2Stream stream, HttpServerMetrics metrics,
String serverOrigin, Http2Headers headers, String contentEncoding, boolean writable, boolean streamEnded) {
super(conn, context, stream, writable);
String serverOrigin, Http2Headers headers, String contentEncoding, boolean writable, boolean streamEnded) {
super(conn, context, stream, headers, contentEncoding, serverOrigin, writable);
this.serverOrigin = serverOrigin;
this.streamEnded = streamEnded;
this.uri = headers.get(":path") != null ? headers.get(":path").toString() : null;
this.scheme = headers.get(":scheme") != null ? headers.get(":scheme").toString() : null;
this.rawMethod = headers.get(":method") != null ? headers.get(":method").toString() : null;
this.host = headers.get(":authority") != null ? headers.get(":authority").toString() : null;
String scheme = headers.get(":scheme") != null ? headers.get(":scheme").toString() : null;
headers.remove(":method");
headers.remove(":scheme");
headers.remove(":path");
headers.remove(":authority");
headersMap = new Http2HeadersAdaptor(headers);
Http2HeadersAdaptor headersMap = new Http2HeadersAdaptor(headers);
String host = host();
if (host == null) {
int idx = serverOrigin.indexOf("://");
host = serverOrigin.substring(idx + 3);
}
this.response = new Http2ServerResponseImpl(conn, this, method(), false, contentEncoding, host);
if (METRICS_ENABLED && metrics != null) {
response.metric(metrics.requestBegin(conn.metric(), this));
}
this.serverOrigin = serverOrigin;
this.streamEnded = streamEnded;
this.scheme = scheme;
this.headersMap = headersMap;
}
void dispatch(Handler<HttpServerRequest> handler) {
@@ -167,6 +150,7 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
@Override
void handleClose() {
super.handleClose();
boolean notify;
Throwable failure;
synchronized (conn) {
@@ -195,7 +179,6 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
}
void handleData(Buffer data) {
bytesRead += data.length();
if (postRequestDecoder != null) {
try {
postRequestDecoder.offer(new DefaultHttpContent(data.getByteBuf()));
@@ -213,7 +196,6 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
synchronized (conn) {
streamEnded = true;
ended = true;
conn.reportBytesRead(bytesRead);
if (postRequestDecoder != null) {
try {
postRequestDecoder.offer(LastHttpContent.EMPTY_LAST_CONTENT);
@@ -321,21 +303,6 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
return HttpVersion.HTTP_2;
}
@Override
public HttpMethod method() {
synchronized (conn) {
if (method == null) {
method = HttpUtils.toVertxMethod(rawMethod);
}
return method;
}
}
@Override
public String rawMethod() {
return rawMethod;
}
@Override
public boolean isSSL() {
return conn.isSsl();
@@ -374,12 +341,9 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
@Override
public long bytesRead() {
synchronized (conn) {
return bytesRead;
}
return super.bytesRead();
}
@Override
public Http2ServerResponseImpl response() {
return response;
@@ -437,7 +401,7 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
@Override
public String absoluteURI() {
if (method() == HttpMethod.CONNECT) {
if (method == HttpMethod.CONNECT) {
return null;
}
synchronized (conn) {

View File

@@ -53,14 +53,13 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
private static final Logger log = LoggerFactory.getLogger(Http2ServerResponseImpl.class);
private final VertxHttp2Stream stream;
private final Http2ServerStream stream;
private final ChannelHandlerContext ctx;
private final Http2ServerConnection conn;
private final boolean head;
private final boolean push;
private final String host;
private Http2Headers headers = new DefaultHttp2Headers();
private Object metric;
private Http2HeadersAdaptor headersMap;
private Http2Headers trailers;
private Http2HeadersAdaptor trailedMap;
@@ -76,11 +75,10 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
private Handler<Void> bodyEndHandler;
private Handler<Void> closeHandler;
private Handler<Void> endHandler;
private long bytesWritten;
private NetSocket netSocket;
public Http2ServerResponseImpl(Http2ServerConnection conn,
VertxHttp2Stream stream,
Http2ServerStream stream,
HttpMethod method,
boolean push,
String contentEncoding,
@@ -97,8 +95,8 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
}
}
void metric(Object metric) {
this.metric = metric;
boolean isPush() {
return push;
}
void handleReset(long code) {
@@ -122,15 +120,6 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
synchronized (conn) {
boolean failed = !ended;
closed = true;
if (METRICS_ENABLED && metric != null) {
// Null in case of push response : handle this case
conn.reportBytesWritten(bytesWritten);
if (failed) {
conn.metrics().requestReset(metric);
} else {
conn.metrics().responseEnd(metric, this);
}
}
exceptionHandler = failed ? this.exceptionHandler : null;
endHandler = failed ? this.endHandler : null;
closeHandler = this.closeHandler;
@@ -460,12 +449,9 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
headersEndHandler.handle(null);
}
sanitizeHeaders();
if (Metrics.METRICS_ENABLED && metric != null) {
conn.metrics().responseBegin(metric, this);
}
headWritten = true;
headers.status(Integer.toString(status.code())); // Could be optimized for usual case ?
stream.writeHeaders(headers, end, null);
stream.writeHead(headers, end, null);
if (end) {
ctx.flush();
}
@@ -486,7 +472,6 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
boolean hasBody = false;
if (chunk != null) {
hasBody = true;
bytesWritten += chunk.readableBytes();
} else {
chunk = Unpooled.EMPTY_BUFFER;
}
@@ -618,7 +603,6 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
Promise<Long> result = Promise.promise();
result.future().setHandler(ar -> {
if (ar.succeeded()) {
bytesWritten += ar.result();
end();
}
if (resultHandler != null) {
@@ -686,9 +670,7 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
@Override
public long bytesWritten() {
synchronized (conn) {
return bytesWritten;
}
return stream.bytesWritten();
}
@Override

View File

@@ -0,0 +1,120 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.http.impl;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import io.vertx.core.spi.metrics.Metrics;
import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED;
abstract class Http2ServerStream extends VertxHttp2Stream<Http2ServerConnection> {
protected final String rawMethod;
protected final HttpMethod method;
protected final String uri;
protected final String contentEncoding;
protected final String host;
protected final Http2ServerResponseImpl response;
private Object metric;
Http2ServerStream(Http2ServerConnection conn,
ContextInternal context,
Http2Stream stream,
String contentEncoding,
HttpMethod method,
String uri,
boolean writable) {
super(conn, context, stream, writable);
this.method = method;
this.rawMethod = method.name();
this.contentEncoding = contentEncoding;
this.uri = uri;
this.host = null;
this.response = new Http2ServerResponseImpl(conn, this, method, true, contentEncoding, null);
}
Http2ServerStream(Http2ServerConnection conn, ContextInternal context, Http2Stream stream, Http2Headers headers, String contentEncoding, String serverOrigin, boolean writable) {
super(conn, context, stream, writable);
String host = headers.get(":authority") != null ? headers.get(":authority").toString() : null;
if (host == null) {
int idx = serverOrigin.indexOf("://");
host = serverOrigin.substring(idx + 3);
}
this.host = host;
this.contentEncoding = contentEncoding;
this.uri = headers.get(":path") != null ? headers.get(":path").toString() : null;
this.rawMethod = headers.get(":method") != null ? headers.get(":method").toString() : null;
this.method = HttpUtils.toVertxMethod(rawMethod);
this.response = new Http2ServerResponseImpl(conn, this, method, false, contentEncoding, host);
}
void registerMetrics() {
if (METRICS_ENABLED) {
HttpServerMetrics metrics = conn.metrics();
if (metrics != null) {
if (response.isPush()) {
metric = metrics.responsePushed(conn.metric(), method(), uri, response);
} else {
metric = metrics.requestBegin(conn.metric(), (HttpServerRequest) this);
}
}
}
}
void writeHead(Http2Headers headers, boolean end, Handler<AsyncResult<Void>> handler) {
if (Metrics.METRICS_ENABLED && metric != null) {
conn.metrics().responseBegin(metric, response);
}
writeHeaders(headers, end, handler);
}
@Override
void handleInterestedOpsChanged() {
if (response != null) {
response.writabilityChanged();
}
}
public HttpMethod method() {
return method;
}
public String rawMethod() {
return rawMethod;
}
@Override
void handleClose() {
super.handleClose();
if (METRICS_ENABLED) {
HttpServerMetrics metrics = conn.metrics();
if (metrics != null) {
// Null in case of push response : handle this case
boolean failed = !response.ended();
if (failed) {
metrics.requestReset(metric);
} else {
metrics.responseEnd(metric, response);
}
}
}
}
}

View File

@@ -142,7 +142,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
// Now we need to upgrade this to an HTTP2
ConnectionListener<HttpClientConnection> listener = conn.listener();
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = Http2ClientConnection.createHttp2ConnectionHandler(client, conn.endpointMetric(), listener, conn.getContext(), current.metric(), (conn, concurrency) -> {
conn.upgradeStream(request, stream.getContext(), ar -> {
conn.upgradeStream(request, stream.metric(), stream.getContext(), ar -> {
UpgradingStream.this.conn.closeHandler(null);
UpgradingStream.this.conn.exceptionHandler(null);
if (ar.succeeded()) {
@@ -175,6 +175,11 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
return 1;
}
@Override
public Object metric() {
return stream.metric();
}
@Override
public HttpVersion version() {
return HttpVersion.HTTP_2;
@@ -195,16 +200,6 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
stream.writeFrame(type, flags, payload);
}
@Override
public void reportBytesWritten(long numberOfBytes) {
stream.reportBytesWritten(numberOfBytes);
}
@Override
public void reportBytesRead(long numberOfBytes) {
stream.reportBytesRead(numberOfBytes);
}
@Override
public void doSetWriteQueueMaxSize(int size) {
stream.doSetWriteQueueMaxSize(size);

View File

@@ -35,7 +35,6 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
private long currentTimeoutMs;
private long lastDataReceived;
protected Throwable exceptionOccurred;
private Object metric;
HttpClientRequestBase(HttpClientImpl client, boolean ssl, HttpMethod method, SocketAddress server, String host, int port, String uri) {
this.client = client;
@@ -49,14 +48,6 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
this.ssl = ssl;
}
Object metric() {
return metric;
}
void metric(Object metric) {
this.metric = metric;
}
protected abstract void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs);
protected abstract void checkComplete();

View File

@@ -69,7 +69,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
private List<Handler<AsyncResult<Void>>> pendingHandlers;
private int pendingMaxSize = -1;
private int followRedirects;
private long written;
private VertxHttpHeaders headers;
private StreamPriority priority;
public HttpClientStream stream;
@@ -140,7 +139,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
public HttpClientRequestImpl setChunked(boolean chunked) {
synchronized (this) {
checkComplete();
if (written > 0) {
if (stream != null) {
throw new IllegalStateException("Cannot set chunked after data has been written on request");
}
// HTTP 1.0 does not support chunking so we ignore this if HTTP 1.0
@@ -393,7 +392,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
exceptionHandler(null);
next.pushHandler = pushHandler;
next.followRedirects = followRedirects - 1;
next.written = written;
if (next.hostHeader == null) {
next.hostHeader = hostHeader;
}
@@ -556,7 +554,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
if (completed) {
// we also need to write the head so optimize this and write all out in once
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true, priority, handler);
stream.reportBytesWritten(written);
stream.endRequest();
} else {
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false, priority, handler);
@@ -565,7 +562,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
if (completed) {
// we also need to write the head so optimize this and write all out in once
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true, priority, null);
stream.reportBytesWritten(written);
stream.endRequest();
} else {
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false, priority, null);
@@ -678,9 +674,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
// nothing to write to the connection just return
return;
}
if (buff != null) {
written += buff.readableBytes();
}
if ((s = stream) == null) {
if (buff != null) {
if (pendingChunks == null) {
@@ -714,9 +707,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
}
s.writeBuffer(buff, end, h);
if (end) {
s.reportBytesWritten(written); // MUST BE READ UNDER SYNCHRONIZATION
}
if (end) {
Handler<Void> handler;
synchronized (this) {

View File

@@ -51,9 +51,6 @@ public class HttpClientResponseImpl implements HttpClientResponse {
private Handler<StreamPriority> priorityHandler;
private NetSocket netSocket;
// Track for metrics
private long bytesRead;
// Cache these for performance
private MultiMap headers;
private MultiMap trailers;
@@ -229,7 +226,6 @@ public class HttpClientResponseImpl implements HttpClientResponse {
Handler<Buffer> handler;
synchronized (conn) {
request.dataReceived();
bytesRead += data.length();
handler = dataHandler;
}
if (handler != null) {
@@ -244,8 +240,6 @@ public class HttpClientResponseImpl implements HttpClientResponse {
void handleEnd(MultiMap trailers) {
Handler<Void> handler;
synchronized (conn) {
stream.reportBytesRead(bytesRead);
bytesRead = 0;
this.trailers = trailers;
handler = endHandler;
endHandler = null;

View File

@@ -33,6 +33,8 @@ public interface HttpClientStream {
*/
int id();
Object metric();
/**
* @return the stream version or null if it's not yet determined
*/
@@ -45,9 +47,6 @@ public interface HttpClientStream {
void writeBuffer(ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler);
void writeFrame(int type, int flags, ByteBuf payload);
void reportBytesWritten(long numberOfBytes);
void reportBytesRead(long numberOfBytes);
void doSetWriteQueueMaxSize(int size);
boolean isNotWritable();
void doPause();

View File

@@ -91,6 +91,7 @@ class VertxHttp2NetSocket<C extends Http2ConnectionBase> extends VertxHttp2Strea
@Override
void handleClose() {
super.handleClose();
Handler<Void> handler = closeHandler();
if (handler != null) {
handler.handle(null);

View File

@@ -43,6 +43,8 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
private MultiMap trailers;
private boolean writable;
private StreamPriority priority;
private long bytesRead;
private long bytesWritten;
VertxHttp2Stream(C conn, ContextInternal context, Http2Stream stream, boolean writable) {
this.conn = conn;
@@ -62,9 +64,12 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
pending.handler(buff -> {
if (buff == InboundBuffer.END_SENTINEL) {
conn.reportBytesRead(bytesRead);
handleEnd(trailers);
} else {
handleData((Buffer) buff);
Buffer data = (Buffer) buff;
bytesRead += data.length();
handleData(data);
}
});
pending.exceptionHandler(context.exceptionHandler());
@@ -106,6 +111,14 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
return stream.id();
}
long bytesWritten() {
return bytesWritten;
}
long bytesRead() {
return bytesRead;
}
public void doPause() {
pending.pause();
}
@@ -133,10 +146,11 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
}
void writeData(ByteBuf chunk, boolean end) {
conn.handler.writeData(stream, chunk, end, null);
writeData(chunk, end, null);
}
void writeData(ByteBuf chunk, boolean end, Handler<AsyncResult<Void>> handler) {
bytesWritten += chunk.readableBytes();
conn.handler.writeData(stream, chunk, end, handler);
}
@@ -163,6 +177,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
}
void handleClose() {
conn.reportBytesWritten(bytesWritten);
}
synchronized void priority(StreamPriority streamPriority) {