diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 00d47003d..4ce84dd34 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -250,7 +250,7 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC return conn.context; } - public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end) { + public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority streamPriority) { HttpRequest request = createRequest(method, rawMethod, uri, headers); prepareRequestHeaders(request, hostHeader, chunked); sendRequest(request, buf, end); diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java index 8d641ca28..e3eba5380 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java @@ -135,8 +135,9 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon @Override public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception { - Http2ClientStream stream = (Http2ClientStream) streams.get(streamId); - stream.setStreamPriority(new StreamPriority(streamDependency, weight, exclusive)); + VertxHttp2Stream stream = streams.get(streamId); + if(stream != null) + stream.setStreamPriority(new StreamPriority(streamDependency, weight, exclusive)); onHeadersRead(ctx, streamId, headers, padding, endOfStream); } @@ -332,7 +333,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon } @Override - public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end) { + public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end, StreamPriority streamPriority) { Http2Headers h = new DefaultHttp2Headers(); h.method(method != HttpMethod.OTHER ? method.name() : rawMethod); if (method == HttpMethod.CONNECT) { @@ -358,6 +359,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon if (conn.metrics != null) { request.metric(conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request)); } + setStreamPriority(streamPriority); writeHeaders(h, end && content == null); if (content != null) { writeBuffer(content, end); @@ -378,6 +380,14 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon handlerContext.flush(); } } + + + + @Override + public void updateStreamPriority(StreamPriority streamPriority) { + setStreamPriority(streamPriority); + writePriorityFrame(); + } @Override public void writeFrame(int type, int flags, ByteBuf payload) { @@ -447,7 +457,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon return conn.toNetSocket(this); } } - + public static VertxHttp2ConnectionHandler createHttp2ConnectionHandler( HttpClientImpl client, Object queueMetric, diff --git a/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java index fe9986d75..f69320010 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java @@ -102,7 +102,8 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { String hostHeader, boolean chunked, ByteBuf buf, - boolean end) { + boolean end, + StreamPriority streamPriority) { ChannelPipeline pipeline = conn.channel().pipeline(); HttpClientCodec httpCodec = pipeline.get(HttpClientCodec.class); class UpgradeRequestHandler extends ChannelInboundHandlerAdapter { @@ -161,7 +162,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536); pipeline.addAfter("codec", null, new UpgradeRequestHandler()); pipeline.addAfter("codec", null, upgradeHandler); - stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end); + stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, streamPriority); } @Override diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java index 96ce20475..40d2ed529 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java @@ -71,7 +71,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http private HttpClientStream stream; private boolean connecting; - private StreamPriority streamPriority; // This filed is used to hold the steamPriority if it is set before the stream is created + private StreamPriority streamPriority = StreamPriority.DEFAULT; // This filed is used to hold the steamPriority if it is set before the stream is created // completed => drainHandler = null @@ -513,10 +513,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http private void connected(Handler headersHandler, HttpClientStream stream) { synchronized (this) { this.stream = stream; - if(streamPriority != null) { - stream.setStreamPriority(streamPriority); - } - stream.beginRequest(this); // If anything was written or the request ended before we got the connection, then @@ -532,20 +528,20 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http if (completed) { // we also need to write the head so optimize this and write all out in once - stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true); + stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true, streamPriority); stream.reportBytesWritten(written); stream.endRequest(); } else { - stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false); + stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false, streamPriority); } } else { if (completed) { // we also need to write the head so optimize this and write all out in once - stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true); + stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true, streamPriority); stream.reportBytesWritten(written); stream.endRequest(); } else { - stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false); + stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false, streamPriority); } } this.connecting = false; @@ -686,8 +682,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http if(!streamPriority.equals(getStreamPriority())) { this.streamPriority = streamPriority; if(stream != null) { - stream.setStreamPriority(streamPriority); - stream.writePriorityFrame(); + stream.updateStreamPriority(streamPriority); } } } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java index f5a70dd99..b472123b3 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java @@ -39,7 +39,7 @@ interface HttpClientStream { HttpConnection connection(); Context getContext(); - void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end); + void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority streamPriority); void writeBuffer(ByteBuf buf, boolean end); void writeFrame(int type, int flags, ByteBuf payload); default void writePriorityFrame() {} @@ -62,5 +62,5 @@ interface HttpClientStream { default StreamPriority getStreamPriority() { return null; } - default void setStreamPriority(StreamPriority streamPriority) { } + default void updateStreamPriority(StreamPriority streamPriority) { } } \ No newline at end of file