Merge pull request #2727 from vietj/Openet-Labs-master

HTTP/2 stream priority implementation - fixes #2685
This commit is contained in:
Julien Viet
2018-11-20 18:24:48 +01:00
committed by GitHub
28 changed files with 1138 additions and 37 deletions

View File

@@ -1835,6 +1835,29 @@ Set the request relative URI
+++
|===
[[StreamPriority]]
== StreamPriority
++++
This class represents HTTP/2 stream priority defined in RFC 7540 clause 5.3
++++
'''
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[dependency]]`@dependency`|`Number (int)`|+++
Set the priority dependency value.
+++
|[[exclusive]]`@exclusive`|`Boolean`|+++
Set the priority exclusive value.
+++
|[[weight]]`@weight`|`Number (short)`|+++
Set the priority weight.
+++
|===
[[TCPSSLOptions]]
== TCPSSLOptions

View File

@@ -384,4 +384,21 @@ public interface HttpClientRequest extends WriteStream<Buffer>, ReadStream<HttpC
default HttpClientRequest writeCustomFrame(HttpFrame frame) {
return writeCustomFrame(frame.type(), frame.flags(), frame.payload());
}
/**
* Sets the priority of the associated stream.
* <p/>
* This is not implemented for HTTP/1.x.
*
* @param streamPriority the priority of this request's stream
*/
@Fluent
default HttpClientRequest setStreamPriority(StreamPriority streamPriority) {
return this;
}
/**
* @return the priority of the associated HTTP/2 stream for HTTP/2 otherwise {@code null}
*/
StreamPriority getStreamPriority();
}

View File

@@ -150,4 +150,14 @@ public interface HttpClientResponse extends ReadStream<Buffer> {
*/
@CacheReturn
HttpClientRequest request();
/**
* Set an handler for stream priority changes.
* <p/>
* This is not implemented for HTTP/1.x.
*
* @param handler the handler to be called when the stream priority changes
*/
@Fluent
HttpClientResponse streamPriorityHandler(Handler<StreamPriority> handler);
}

View File

@@ -11,6 +11,7 @@
package io.vertx.core.http;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.codegen.annotations.*;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
@@ -328,4 +329,20 @@ public interface HttpServerRequest extends ReadStream<Buffer> {
@CacheReturn
HttpConnection connection();
/**
* @return the priority of the associated HTTP/2 stream for HTTP/2 otherwise {@code null}
*/
default StreamPriority streamPriority() {
return null;
}
/**
* Set an handler for stream priority changes
* <p>
* This is not implemented for HTTP/1.x.
*
* @param handler the handler to be called when stream priority changes
*/
@Fluent
HttpServerRequest streamPriorityHandler(Handler<StreamPriority> handler);
}

View File

@@ -469,4 +469,16 @@ public interface HttpServerResponse extends WriteStream<Buffer> {
default HttpServerResponse writeCustomFrame(HttpFrame frame) {
return writeCustomFrame(frame.type(), frame.flags(), frame.payload());
}
/**
* Sets the priority of the associated stream
* <p/>
* This is not implemented for HTTP/1.x.
*
* @param streamPriority the priority for this request's stream
*/
@Fluent
default HttpServerResponse setStreamPriority(StreamPriority streamPriority) {
return this;
}
}

View File

@@ -0,0 +1,142 @@
/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.http;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.json.JsonObject;
/**
* This class represents HTTP/2 stream priority defined in RFC 7540 clause 5.3
*/
@DataObject
public class StreamPriority {
public static final int DEFAULT_DEPENDENCY = 0;
public static final short DEFAULT_WEIGHT = Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
public static final boolean DEFAULT_EXCLUSIVE = false;
private short weight;
private int dependency;
private boolean exclusive;
public StreamPriority() {
weight = DEFAULT_WEIGHT;
dependency = DEFAULT_DEPENDENCY;
exclusive = DEFAULT_EXCLUSIVE;
}
public StreamPriority(JsonObject json) {
this.weight = json.getInteger("weight", (int)DEFAULT_WEIGHT).shortValue();
this.dependency = json.getInteger("dependency", DEFAULT_DEPENDENCY);
this.exclusive = json.getBoolean("exclusive", DEFAULT_EXCLUSIVE);
}
public StreamPriority(StreamPriority other) {
this.weight = other.weight;
this.dependency = other.dependency;
this.exclusive = other.exclusive;
}
/**
* @return An integer value between {@code 1} and {@code 256} representing a priority weight
* for the stream.
*/
public short getWeight() {
return weight;
}
/**
* Set the priority weight.
*
* @param weight the new value
* @return a reference to this, so the API can be used fluently
*/
public StreamPriority setWeight(short weight) {
this.weight = weight;
return this;
}
/**
* @return A stream identifier for the stream that this stream depends on.
*/
public int getDependency() {
return dependency;
}
/**
* Set the priority dependency value.
*
* @param dependency the new value
* @return a reference to this, so the API can be used fluently
*/
public StreamPriority setDependency(int dependency) {
this.dependency = dependency;
return this;
}
/**
* @return A flag indicating that the stream dependency is exclusive.
*/
public boolean isExclusive() {
return exclusive;
}
/**
* Set the priority exclusive value.
*
* @param exclusive the new value
* @return a reference to this, so the API can be used fluently
*/
public StreamPriority setExclusive(boolean exclusive) {
this.exclusive = exclusive;
return this;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (exclusive ? 1231 : 1237);
result = prime * result + dependency;
result = prime * result + weight;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
StreamPriority other = (StreamPriority) obj;
if (exclusive != other.exclusive) return false;
if (dependency != other.dependency) return false;
if (weight != other.weight) return false;
return true;
}
public JsonObject toJson() {
JsonObject json = new JsonObject();
json.put("weight", weight);
json.put("dependency", dependency);
json.put("exclusive", exclusive);
return json;
}
@Override
public String toString() {
return "StreamPriority [weight=" + weight + ", dependency=" + dependency + ", exclusive=" + exclusive + "]";
}
}

View File

@@ -250,7 +250,7 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC
return conn.context;
}
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end) {
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority) {
HttpRequest request = createRequest(method, rawMethod, uri, headers);
prepareRequestHeaders(request, hostHeader, chunked);
sendRequest(request, buf, end);
@@ -421,6 +421,15 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC
}
}
@Override
public StreamPriority priority() {
return null;
}
@Override
public void updatePriority(StreamPriority streamPriority) {
}
private HttpClientResponseImpl beginResponse(HttpResponse resp) {
HttpVersion version;
if (resp.protocolVersion() == io.netty.handler.codec.http.HttpVersion.HTTP_1_0) {

View File

@@ -132,12 +132,26 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
listener.onRecycle(expired);
}
@Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
if (stream != null) {
StreamPriority streamPriority = new StreamPriority()
.setDependency(streamDependency)
.setWeight(weight)
.setExclusive(exclusive);
context.executeFromIO(v -> {
stream.handleHeaders(headers, streamPriority, endOfStream);
});
}
}
@Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
if (stream != null) {
context.executeFromIO(v -> {
stream.handleHeaders(headers, endOfStream);
stream.handleHeaders(headers, null, endOfStream);
});
}
}
@@ -185,6 +199,16 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
this.request = request;
}
@Override
public StreamPriority priority() {
return super.priority();
}
@Override
public void updatePriority(StreamPriority streamPriority) {
super.updatePriority(streamPriority);
}
@Override
public HttpVersion version() {
return HttpVersion.HTTP_2;
@@ -263,7 +287,18 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
response.handleUnknownFrame(new HttpFrameImpl(type, flags, buff));
}
void handleHeaders(Http2Headers headers, boolean end) {
@Override
void handlePriorityChange(StreamPriority streamPriority) {
if(streamPriority != null && !streamPriority.equals(priority())) {
priority(streamPriority);
response.handlePriorityChange(streamPriority);
}
}
void handleHeaders(Http2Headers headers, StreamPriority streamPriority, boolean end) {
if(streamPriority != null)
priority(streamPriority);
if (response == null || response.statusCode() == 100) {
int status;
String statusMessage;
@@ -315,7 +350,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
}
@Override
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end) {
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end, StreamPriority priority) {
Http2Headers h = new DefaultHttp2Headers();
h.method(method != HttpMethod.OTHER ? method.name() : rawMethod);
if (method == HttpMethod.CONNECT) {
@@ -341,6 +376,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
if (conn.metrics != null) {
request.metric(conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request));
}
priority(priority);
writeHeaders(h, end && content == null);
if (content != null) {
writeBuffer(content, end);
@@ -361,7 +397,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
handlerContext.flush();
}
}
@Override
public void writeFrame(int type, int flags, ByteBuf payload) {
super.writeFrame(type, flags, payload);
@@ -430,7 +466,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
return conn.toNetSocket(this);
}
}
public static VertxHttp2ConnectionHandler<Http2ClientConnection> createHttp2ConnectionHandler(
HttpClientImpl client,
Object queueMetric,

View File

@@ -24,13 +24,13 @@ import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.GoAway;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.NetSocket;
@@ -193,8 +193,18 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL
// Http2FrameListener
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) {
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) {
VertxHttp2Stream stream;
synchronized (this) {
stream = streams.get(streamId);
}
if (stream != null) {
StreamPriority streamPriority = new StreamPriority()
.setDependency(streamDependency)
.setWeight(weight)
.setExclusive(exclusive);
context.executeFromIO(v -> stream.handlePriorityChange(streamPriority));
}
}
@Override

View File

@@ -101,13 +101,12 @@ public class Http2ServerConnection extends Http2ConnectionBase {
Http2Stream stream = handler.connection().stream(streamId);
String contentEncoding = options.isCompressionSupported() ? HttpUtils.determineContentEncoding(headers) : null;
boolean writable = handler.encoder().flowController().isWritable(stream);
Http2ServerRequestImpl request = new Http2ServerRequestImpl(this, stream, metrics, serverOrigin, headers, contentEncoding, writable);
return request;
return new Http2ServerRequestImpl(this, stream, metrics, serverOrigin, headers, contentEncoding, writable);
}
@Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endOfStream) {
Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) {
VertxHttp2Stream stream = streams.get(streamId);
if (stream == null) {
if (isMalformedRequest(headers)) {
@@ -115,6 +114,12 @@ public class Http2ServerConnection extends Http2ConnectionBase {
return;
}
Http2ServerRequestImpl req = createRequest(streamId, headers);
req.priority(new StreamPriority()
.setDependency(streamDependency)
.setWeight(weight)
.setExclusive(exclusive)
);
stream = req;
CharSequence value = headers.get(HttpHeaderNames.EXPECT);
if (options.isHandle100ContinueAutomatically() &&
@@ -141,6 +146,12 @@ public class Http2ServerConnection extends Http2ConnectionBase {
}
}
@Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endOfStream) {
onHeadersRead(ctx, streamId, headers, 0, Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream);
}
@Override
public synchronized void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
Long v = settings.maxConcurrentStreams();
@@ -150,7 +161,7 @@ public class Http2ServerConnection extends Http2ConnectionBase {
super.onSettingsRead(ctx, settings);
}
synchronized void sendPush(int streamId, String host, HttpMethod method, MultiMap headers, String path, Handler<AsyncResult<HttpServerResponse>> completionHandler) {
synchronized void sendPush(int streamId, String host, HttpMethod method, MultiMap headers, String path, StreamPriority streamPriority, Handler<AsyncResult<HttpServerResponse>> completionHandler) {
Http2Headers headers_ = new DefaultHttp2Headers();
if (method == HttpMethod.OTHER) {
throw new IllegalArgumentException("Cannot push HttpMethod.OTHER");
@@ -175,6 +186,7 @@ public class Http2ServerConnection extends Http2ConnectionBase {
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
boolean writable = handler.encoder().flowController().isWritable(promisedStream);
Push push = new Push(promisedStream, contentEncoding, method, path, writable, completionHandler);
push.priority(streamPriority);
streams.put(promisedStreamId, push);
if (maxConcurrentStreams == null || concurrentStreams < maxConcurrentStreams) {
concurrentStreams++;
@@ -238,6 +250,10 @@ public class Http2ServerConnection extends Http2ConnectionBase {
void handleData(Buffer buf) {
}
@Override
void handlePriorityChange(StreamPriority streamPriority) {
}
@Override
void handleInterestedOpsChanged() {
if (response != null) {

View File

@@ -20,6 +20,7 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.vertx.codegen.annotations.Nullable;
@@ -33,6 +34,7 @@ import io.vertx.core.http.HttpServerFileUpload;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.http.HttpFrame;
import io.vertx.core.logging.Logger;
@@ -80,6 +82,8 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
private Handler<Throwable> exceptionHandler;
private Handler<HttpFrame> customFrameHandler;
private Handler<StreamPriority> streamPriorityHandler;
public Http2ServerRequestImpl(Http2ServerConnection conn, Http2Stream stream, HttpServerMetrics metrics,
String serverOrigin, Http2Headers headers, String contentEncoding, boolean writable) {
super(conn, stream, writable);
@@ -516,4 +520,36 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
public HttpConnection connection() {
return conn;
}
@Override
public HttpServerRequest streamPriorityHandler(Handler<StreamPriority> handler) {
synchronized (conn) {
streamPriorityHandler = handler;
}
return this;
}
@Override
void handlePriorityChange(StreamPriority streamPriority) {
Handler<StreamPriority> handler;
boolean priorityChanged = false;
synchronized (conn) {
handler = streamPriorityHandler;
if (streamPriority != null && !streamPriority.equals(streamPriority())) {
priority(streamPriority);
priorityChanged = true;
}
}
if (handler != null && priorityChanged) {
handler.handle(streamPriority);
}
}
@Override
public StreamPriority streamPriority() {
return priority();
}
}

View File

@@ -20,6 +20,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpStatusClass;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
@@ -27,9 +28,11 @@ import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
@@ -696,7 +699,7 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
throw new IllegalStateException("A push response cannot promise another push");
}
checkEnded();
conn.sendPush(stream.id(), host, method, headers, path, handler);
conn.sendPush(stream.id(), host, method, headers, path, stream.priority(), handler);
if (!inHandler) {
ctx.flush();
}
@@ -709,4 +712,10 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
public HttpServerResponse push(HttpMethod method, String path, Handler<AsyncResult<HttpServerResponse>> handler) {
return push(method, host, path, handler);
}
@Override
public HttpServerResponse setStreamPriority(StreamPriority priority) {
stream.updatePriority(priority);
return this;
}
}

View File

@@ -102,7 +102,8 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
String hostHeader,
boolean chunked,
ByteBuf buf,
boolean end) {
boolean end,
StreamPriority priority) {
ChannelPipeline pipeline = conn.channel().pipeline();
HttpClientCodec httpCodec = pipeline.get(HttpClientCodec.class);
class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {
@@ -161,7 +162,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536);
pipeline.addAfter("codec", null, new UpgradeRequestHandler());
pipeline.addAfter("codec", null, upgradeHandler);
stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end);
stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, priority);
}
@Override
@@ -244,6 +245,16 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
public NetSocket createNetSocket() {
return stream.createNetSocket();
}
@Override
public StreamPriority priority() {
return stream.priority();
}
@Override
public void updatePriority(StreamPriority streamPriority) {
stream.updatePriority(streamPriority);
}
}
@Override

View File

@@ -11,6 +11,7 @@
package io.vertx.core.http.impl;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
@@ -250,4 +251,5 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
}
return this;
}
}

View File

@@ -22,6 +22,7 @@ import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.http.impl.headers.VertxHttpHeaders;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
@@ -66,7 +67,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
private int followRedirects;
private long written;
private VertxHttpHeaders headers;
private StreamPriority priority;
private HttpClientStream stream;
private boolean connecting;
@@ -77,6 +78,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
super(client, ssl, method, host, port, relativeURI);
this.chunked = false;
this.vertx = vertx;
this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY;
}
@Override
@@ -525,20 +527,20 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
if (completed) {
// we also need to write the head so optimize this and write all out in once
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true, priority);
stream.reportBytesWritten(written);
stream.endRequest();
} else {
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false, priority);
}
} else {
if (completed) {
// we also need to write the head so optimize this and write all out in once
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true, priority);
stream.reportBytesWritten(written);
stream.endRequest();
} else {
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false, priority);
}
}
this.connecting = false;
@@ -672,4 +674,22 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
synchronized Handler<HttpClientRequest> pushHandler() {
return pushHandler;
}
@Override
public synchronized HttpClientRequest setStreamPriority(StreamPriority priority) {
synchronized (this) {
if (stream != null) {
stream.updatePriority(priority);
} else {
this.priority = priority;
}
}
return this;
}
@Override
public synchronized StreamPriority getStreamPriority() {
HttpClientStream s = stream;
return s != null ? s.priority() : priority;
}
}

View File

@@ -16,11 +16,7 @@ import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.*;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
@@ -239,6 +235,11 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase {
throw new IllegalStateException();
}
@Override
public StreamPriority getStreamPriority() {
return stream.priority();
}
@Override
public HttpClientRequest writeCustomFrame(int type, int flags, Buffer payload) {
throw new UnsupportedOperationException("Cannot write frame with HTTP/1.x ");

View File

@@ -19,7 +19,6 @@ import io.vertx.core.http.*;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.streams.ReadStream;
import java.util.ArrayList;
import java.util.List;
@@ -48,6 +47,7 @@ public class HttpClientResponseImpl implements HttpClientResponse {
private Handler<HttpFrame> customFrameHandler;
private Handler<Void> endHandler;
private Handler<Throwable> exceptionHandler;
private Handler<StreamPriority> priorityHandler;
private NetSocket netSocket;
// Track for metrics
@@ -273,4 +273,22 @@ public class HttpClientResponseImpl implements HttpClientResponse {
body = null;
}
}
@Override
public HttpClientResponse streamPriorityHandler(Handler<StreamPriority> handler) {
synchronized (conn) {
priorityHandler = handler;
}
return this;
}
void handlePriorityChange(StreamPriority streamPriority) {
Handler<StreamPriority> handler;
synchronized (conn) {
if ((handler = priorityHandler) == null) {
return;
}
}
handler.handle(streamPriority);
}
}

View File

@@ -17,6 +17,7 @@ import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.net.NetSocket;
/**
@@ -38,7 +39,7 @@ interface HttpClientStream {
HttpConnection connection();
Context getContext();
void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end);
void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority);
void writeBuffer(ByteBuf buf, boolean end);
void writeFrame(int type, int flags, ByteBuf payload);
@@ -56,4 +57,7 @@ interface HttpClientStream {
void endRequest();
NetSocket createNetSocket();
StreamPriority priority();
void updatePriority(StreamPriority streamPriority);
}

View File

@@ -603,4 +603,8 @@ public class HttpServerRequestImpl implements HttpServerRequest {
return QueryStringDecoder.decodeComponent(str, CharsetUtil.UTF_8);
}
@Override
public HttpServerRequest streamPriorityHandler(Handler<StreamPriority> handler) {
return this;
}
}

View File

@@ -671,4 +671,5 @@ public class HttpServerResponseImpl implements HttpServerResponse {
public HttpServerResponse writeCustomFrame(int type, int flags, Buffer payload) {
return this;
}
}

View File

@@ -25,6 +25,7 @@ import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.StreamPriority;
import java.net.URI;
import java.net.URISyntaxException;
@@ -45,6 +46,24 @@ import static io.vertx.core.http.Http2Settings.*;
*/
public final class HttpUtils {
static final StreamPriority DEFAULT_STREAM_PRIORITY = new StreamPriority() {
@Override
public StreamPriority setWeight(short weight) {
throw new UnsupportedOperationException("Unmodifiable stream priority");
}
@Override
public StreamPriority setDependency(int dependency) {
throw new UnsupportedOperationException("Unmodifiable stream priority");
}
@Override
public StreamPriority setExclusive(boolean exclusive) {
throw new UnsupportedOperationException("Unmodifiable stream priority");
}
};
private HttpUtils() {
}

View File

@@ -192,19 +192,19 @@ class VertxHttp2ConnectionHandler<C extends Http2ConnectionBase> extends Http2Co
//
void writeHeaders(Http2Stream stream, Http2Headers headers, boolean end) {
void writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive) {
EventExecutor executor = chctx.executor();
if (executor.inEventLoop()) {
_writeHeaders(stream, headers, end);
_writeHeaders(stream, headers, end, streamDependency, weight, exclusive);
} else {
executor.execute(() -> {
_writeHeaders(stream, headers, end);
_writeHeaders(stream, headers, end, streamDependency, weight, exclusive);
});
}
}
private void _writeHeaders(Http2Stream stream, Http2Headers headers, boolean end) {
encoder().writeHeaders(chctx, stream.id(), headers, 0, end, chctx.newPromise());
private void _writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive) {
encoder().writeHeaders(chctx, stream.id(), headers, streamDependency, weight, exclusive, 0, end, chctx.newPromise());
}
void writeData(Http2Stream stream, ByteBuf chunk, boolean end, Handler<AsyncResult<Void>> handler) {
@@ -455,4 +455,19 @@ class VertxHttp2ConnectionHandler<C extends Http2ConnectionBase> extends Http2Co
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) throws Http2Exception {
throw new UnsupportedOperationException();
}
private void _writePriority(Http2Stream stream, int streamDependency, short weight, boolean exclusive) {
encoder().writePriority(chctx, stream.id(), streamDependency, weight, exclusive, chctx.newPromise());
}
void writePriority(Http2Stream stream, int streamDependency, short weight, boolean exclusive) {
EventExecutor executor = chctx.executor();
if (executor.inEventLoop()) {
_writePriority(stream, streamDependency, weight, exclusive);
} else {
executor.execute(() -> {
_writePriority(stream, streamDependency, weight, exclusive);
});
}
}
}

View File

@@ -22,6 +22,7 @@ import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
@@ -102,7 +103,11 @@ class VertxHttp2NetSocket<C extends Http2ConnectionBase> extends VertxHttp2Strea
}
}
// NetSocket impl
@Override
void handlePriorityChange(StreamPriority streamPriority) {
}
// NetSocket impl
@Override
public NetSocket exceptionHandler(Handler<Throwable> handler) {

View File

@@ -18,6 +18,7 @@ import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.streams.impl.InboundBuffer;
@@ -39,6 +40,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
private int pendingBytes;
private MultiMap trailers;
private boolean writable;
private StreamPriority priority;
VertxHttp2Stream(C conn, Http2Stream stream, boolean writable) {
this.conn = conn;
@@ -48,6 +50,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
this.context = conn.getContext();
this.writable = writable;
this.pending = new InboundBuffer<>(context, 5);
this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY;
pending.drainHandler(v -> {
int numBytes = pendingBytes;
@@ -125,7 +128,11 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
}
void writeHeaders(Http2Headers headers, boolean end) {
conn.handler.writeHeaders(stream, headers, end);
conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive());
}
private void writePriorityFrame(StreamPriority priority) {
conn.handler.writePriority(stream, priority.getDependency(), priority.getWeight(), priority.isExclusive());
}
void writeData(ByteBuf chunk, boolean end) {
@@ -156,4 +163,23 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
void handleClose() {
}
synchronized void priority(StreamPriority streamPriority) {
this.priority = streamPriority;
}
synchronized StreamPriority priority() {
return priority;
}
synchronized void updatePriority(StreamPriority priority) {
if (!this.priority.equals(priority)) {
this.priority = priority;
if (stream.isHeadersSent()) {
writePriorityFrame(priority);
}
}
}
abstract void handlePriorityChange(StreamPriority streamPriority);
}

View File

@@ -1554,8 +1554,6 @@ public class Http2ClientTest extends Http2TestBase {
assertSame(((HttpClientConnection)conn).channel(), ((HttpClientConnection)resp2.request().connection()).channel());
testComplete();
}).exceptionHandler(this::fail).end();
}).connectionHandler(conn -> {
System.out.println("CONNECTED " + conn);
}).exceptionHandler(this::fail).end();
await();
} finally {
@@ -1934,4 +1932,174 @@ public class Http2ClientTest extends Http2TestBase {
await();
}
@Test
public void testStreamPriority() throws Exception {
StreamPriority requestStreamPriority = new StreamPriority().setDependency(123).setWeight((short)45).setExclusive(true);
StreamPriority responseStreamPriority = new StreamPriority().setDependency(153).setWeight((short)75).setExclusive(false);
waitFor(2);
ServerBootstrap bootstrap = createH2Server((decoder, encoder) -> new Http2EventAdapter() {
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
vertx.runOnContext(v -> {
assertEquals(requestStreamPriority.getDependency(), streamDependency);
assertEquals(requestStreamPriority.getWeight(), weight);
assertEquals(requestStreamPriority.isExclusive(), exclusive);
encoder.writeHeaders(ctx, streamId, new DefaultHttp2Headers().status("200"), responseStreamPriority.getDependency(), responseStreamPriority.getWeight(), responseStreamPriority.isExclusive(), 0, true, ctx.newPromise());
ctx.flush();
if(endStream)
complete();
});
}
});
ChannelFuture s = bootstrap.bind(DEFAULT_HTTPS_HOST, DEFAULT_HTTPS_PORT).sync();
try {
HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath").setStreamPriority(requestStreamPriority);
req.handler(resp -> {
assertEquals(responseStreamPriority, resp.request().getStreamPriority());
Context ctx = vertx.getOrCreateContext();
assertOnIOContext(ctx);
resp.endHandler(v -> {
complete();
});
}).end();
await();
} finally {
s.channel().close().sync();
}
}
@Test
public void testStreamPriorityChange() throws Exception {
StreamPriority requestStreamPriority = new StreamPriority().setDependency(123).setWeight((short)45).setExclusive(true);
StreamPriority requestStreamPriority2 = new StreamPriority().setDependency(223).setWeight((short)145).setExclusive(false);
StreamPriority responseStreamPriority = new StreamPriority().setDependency(153).setWeight((short)75).setExclusive(false);
StreamPriority responseStreamPriority2 = new StreamPriority().setDependency(253).setWeight((short)175).setExclusive(true);
waitFor(5);
ServerBootstrap bootstrap = createH2Server((decoder, encoder) -> new Http2EventAdapter() {
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
vertx.runOnContext(v -> {
assertEquals(requestStreamPriority.getDependency(), streamDependency);
assertEquals(requestStreamPriority.getWeight(), weight);
assertEquals(requestStreamPriority.isExclusive(), exclusive);
assertFalse(endStream);
complete();
});
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) throws Http2Exception {
vertx.runOnContext(v -> {
assertEquals(requestStreamPriority2.getDependency(), streamDependency);
assertEquals(requestStreamPriority2.getWeight(), weight);
assertEquals(requestStreamPriority2.isExclusive(), exclusive);
complete();
});
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
if(endOfStream) {
encoder.writeHeaders(ctx, streamId, new DefaultHttp2Headers().status("200"), responseStreamPriority.getDependency(), responseStreamPriority.getWeight(), responseStreamPriority.isExclusive(), 0, false, ctx.newPromise());
ctx.flush();
encoder.writePriority(ctx, streamId, responseStreamPriority2.getDependency(), responseStreamPriority2.getWeight(), responseStreamPriority2.isExclusive(), ctx.newPromise());
ctx.flush();
encoder.writeData(ctx, streamId, Buffer.buffer("hello").getByteBuf(), 0, true, ctx.newPromise());
ctx.flush();
vertx.runOnContext(v -> {
complete();
});
}
return super.onDataRead(ctx, streamId, data, padding, endOfStream);
}
});
ChannelFuture s = bootstrap.bind(DEFAULT_HTTPS_HOST, DEFAULT_HTTPS_PORT).sync();
try {
HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath").setStreamPriority(requestStreamPriority);
req.handler(resp -> {
assertEquals(responseStreamPriority, resp.request().getStreamPriority());
Context ctx = vertx.getOrCreateContext();
assertOnIOContext(ctx);
resp.streamPriorityHandler(streamPriority -> {
assertEquals(responseStreamPriority2, streamPriority);
assertEquals(responseStreamPriority2, resp.request().getStreamPriority());
complete();
});
resp.endHandler(v -> {
assertEquals(responseStreamPriority2, resp.request().getStreamPriority());
complete();
});
}).sendHead(h -> {
req.setStreamPriority(requestStreamPriority2);
req.end();
});
await();
} finally {
s.channel().close().sync();
}
}
@Test
public void testStreamPriorityNoChange() throws Exception {
StreamPriority requestStreamPriority = new StreamPriority().setDependency(123).setWeight((short)45).setExclusive(true);
StreamPriority responseStreamPriority = new StreamPriority().setDependency(153).setWeight((short)75).setExclusive(false);
waitFor(3);
ServerBootstrap bootstrap = createH2Server((decoder, encoder) -> new Http2EventAdapter() {
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
vertx.runOnContext(v -> {
assertEquals(requestStreamPriority.getDependency(), streamDependency);
assertEquals(requestStreamPriority.getWeight(), weight);
assertEquals(requestStreamPriority.isExclusive(), exclusive);
assertFalse(endStream);
complete();
});
encoder.writeHeaders(ctx, streamId, new DefaultHttp2Headers().status("200"), responseStreamPriority.getDependency(), responseStreamPriority.getWeight(), responseStreamPriority.isExclusive(), 0, false, ctx.newPromise());
ctx.flush();
encoder.writePriority(ctx, streamId, responseStreamPriority.getDependency(), responseStreamPriority.getWeight(), responseStreamPriority.isExclusive(), ctx.newPromise());
ctx.flush();
encoder.writeData(ctx, streamId, Buffer.buffer("hello").getByteBuf(), 0, true, ctx.newPromise());
ctx.flush();
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) throws Http2Exception {
fail("Priority frame shoudl not be sent");
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
if(endOfStream) {
vertx.runOnContext(v -> {
complete();
});
}
return super.onDataRead(ctx, streamId, data, padding, endOfStream);
}
});
ChannelFuture s = bootstrap.bind(DEFAULT_HTTPS_HOST, DEFAULT_HTTPS_PORT).sync();
try {
HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath").setStreamPriority(requestStreamPriority);
req.handler(resp -> {
assertEquals(responseStreamPriority, resp.request().getStreamPriority());
Context ctx = vertx.getOrCreateContext();
assertOnIOContext(ctx);
resp.streamPriorityHandler(streamPriority -> {
fail("Stream priority handler shoudl not be called");
});
resp.endHandler(v -> {
assertEquals(responseStreamPriority, resp.request().getStreamPriority());
complete();
});
}).sendHead(h -> {
req.setStreamPriority(requestStreamPriority);
req.end();
});
await();
} finally {
s.channel().close().sync();
}
}
}

View File

@@ -3074,4 +3074,205 @@ public class Http2ServerTest extends Http2TestBase {
assertEquals(Http1xOrH2CHandler.HTTP_2_PREFACE, res.toString(StandardCharsets.UTF_8));
assertNull(ch.pipeline().get(TestHttp1xOrH2CHandler.class));
}
@Test
public void testStreamPriority() throws Exception {
StreamPriority requestStreamPriority = new StreamPriority().setDependency(123).setWeight((short)45).setExclusive(true);
StreamPriority responseStreamPriority = new StreamPriority().setDependency(153).setWeight((short)75).setExclusive(false);
waitFor(3);
server.requestHandler(req -> {
HttpServerResponse resp = req.response();
assertEquals(requestStreamPriority, req.streamPriority());
resp.setStatusCode(200);
resp.setStreamPriority(responseStreamPriority);
resp.end("data");
complete();
});
startServer();
TestClient client = new TestClient();
ChannelFuture fut = client.connect(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, request -> {
int id = request.nextStreamId();
request.encoder.writeHeaders(request.context, id, GET("/"), requestStreamPriority.getDependency(), requestStreamPriority.getWeight(), requestStreamPriority.isExclusive(), 0, true, request.context.newPromise());
request.context.flush();
request.decoder.frameListener(new Http2FrameAdapter() {
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding,
boolean endStream) throws Http2Exception {
super.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream);
vertx.runOnContext(v -> {
assertEquals(id, streamId);
assertEquals(responseStreamPriority.getDependency(), streamDependency);
assertEquals(responseStreamPriority.getWeight(), weight);
assertEquals(responseStreamPriority.isExclusive(), exclusive);
complete();
});
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) throws Http2Exception {
vertx.runOnContext(v -> {
fail("Priority frame should not be sent");
});
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
if(endOfStream) {
vertx.runOnContext(v -> {
complete();
});
}
return super.onDataRead(ctx, streamId, data, padding, endOfStream);
}
});
});
fut.sync();
await();
}
@Test
public void testStreamPriorityChange() throws Exception {
StreamPriority requestStreamPriority = new StreamPriority().setDependency(123).setWeight((short) 45).setExclusive(true);
StreamPriority requestStreamPriority2 = new StreamPriority().setDependency(223).setWeight((short) 145).setExclusive(false);
StreamPriority responseStreamPriority = new StreamPriority().setDependency(153).setWeight((short) 75).setExclusive(false);
StreamPriority responseStreamPriority2 = new StreamPriority().setDependency(253).setWeight((short) 175).setExclusive(true);
waitFor(5);
server.requestHandler(req -> {
HttpServerResponse resp = req.response();
assertEquals(requestStreamPriority, req.streamPriority());
req.bodyHandler(b -> {
assertEquals(requestStreamPriority2, req.streamPriority());
resp.setStatusCode(200);
resp.setStreamPriority(responseStreamPriority);
resp.write("hello");
resp.setStreamPriority(responseStreamPriority2);
resp.end("world");
complete();
});
req.streamPriorityHandler(streamPriority -> {
assertEquals(requestStreamPriority2, streamPriority);
assertEquals(requestStreamPriority2, req.streamPriority());
complete();
});
});
startServer();
TestClient client = new TestClient();
ChannelFuture fut = client.connect(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, request -> {
int id = request.nextStreamId();
request.encoder.writeHeaders(request.context, id, GET("/"), requestStreamPriority.getDependency(), requestStreamPriority.getWeight(), requestStreamPriority.isExclusive(), 0, false, request.context.newPromise());
request.context.flush();
request.encoder.writePriority(request.context, id, requestStreamPriority2.getDependency(), requestStreamPriority2.getWeight(), requestStreamPriority2.isExclusive(), request.context.newPromise());
request.context.flush();
request.encoder.writeData(request.context, id, Buffer.buffer("hello").getByteBuf(), 0, true, request.context.newPromise());
request.context.flush();
request.decoder.frameListener(new Http2FrameAdapter() {
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding,
boolean endStream) throws Http2Exception {
super.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream);
vertx.runOnContext(v -> {
assertEquals(id, streamId);
assertEquals(responseStreamPriority.getDependency(), streamDependency);
assertEquals(responseStreamPriority.getWeight(), weight);
assertEquals(responseStreamPriority.isExclusive(), exclusive);
complete();
});
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) throws Http2Exception {
vertx.runOnContext(v -> {
assertEquals(id, streamId);
assertEquals(responseStreamPriority2.getDependency(), streamDependency);
assertEquals(responseStreamPriority2.getWeight(), weight);
assertEquals(responseStreamPriority2.isExclusive(), exclusive);
complete();
});
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
if(endOfStream) {
vertx.runOnContext(v -> {
complete();
});
}
return super.onDataRead(ctx, streamId, data, padding, endOfStream);
}
});
});
fut.sync();
await();
}
@Test
public void testStreamPriorityNoChange() throws Exception {
StreamPriority requestStreamPriority = new StreamPriority().setDependency(123).setWeight((short)45).setExclusive(true);
StreamPriority responseStreamPriority = new StreamPriority().setDependency(153).setWeight((short)75).setExclusive(false);
waitFor(3);
server.requestHandler(req -> {
HttpServerResponse resp = req.response();
assertEquals(requestStreamPriority, req.streamPriority());
req.bodyHandler(b -> {
assertEquals(requestStreamPriority, req.streamPriority());
resp.setStatusCode(200);
resp.setStreamPriority(responseStreamPriority);
resp.write("hello");
resp.setStreamPriority(responseStreamPriority);
resp.end("world");
complete();
});
req.streamPriorityHandler(streamPriority -> {
fail("Stream priority handler should not be called");
});
});
startServer();
TestClient client = new TestClient();
ChannelFuture fut = client.connect(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, request -> {
int id = request.nextStreamId();
request.encoder.writeHeaders(request.context, id, GET("/"), requestStreamPriority.getDependency(), requestStreamPriority.getWeight(), requestStreamPriority.isExclusive(), 0, false, request.context.newPromise());
request.context.flush();
request.encoder.writePriority(request.context, id, requestStreamPriority.getDependency(), requestStreamPriority.getWeight(), requestStreamPriority.isExclusive(), request.context.newPromise());
request.context.flush();
request.encoder.writeData(request.context, id, Buffer.buffer("hello").getByteBuf(), 0, true, request.context.newPromise());
request.context.flush();
request.decoder.frameListener(new Http2FrameAdapter() {
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding,
boolean endStream) throws Http2Exception {
super.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream);
vertx.runOnContext(v -> {
assertEquals(id, streamId);
assertEquals(responseStreamPriority.getDependency(), streamDependency);
assertEquals(responseStreamPriority.getWeight(), weight);
assertEquals(responseStreamPriority.isExclusive(), exclusive);
complete();
});
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) throws Http2Exception {
vertx.runOnContext(v -> {
fail("Priority frame should not be sent");
});
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
if(endOfStream) {
vertx.runOnContext(v -> {
complete();
});
}
return super.onDataRead(ctx, streamId, data, padding, endOfStream);
}
});
});
fut.sync();
await();
}
}

View File

@@ -12,6 +12,7 @@
package io.vertx.core.http;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.Http2ServerConnection;
@@ -442,4 +443,269 @@ public class Http2Test extends HttpTest {
});
await();
}
@Test
public void testStreamWeightAndDependency() throws Exception {
int requestStreamDependency = 56;
short requestStreamWeight = 43;
int responseStreamDependency = 98;
short responseStreamWeight = 55;
waitFor(2);
server.requestHandler(req -> {
assertEquals(requestStreamWeight, req.streamPriority().getWeight());
assertEquals(requestStreamDependency, req.streamPriority().getDependency());
req.response().setStreamPriority(new StreamPriority()
.setDependency(responseStreamDependency)
.setWeight(responseStreamWeight)
.setExclusive(false));
req.response().end();
complete();
});
startServer();
client = vertx.createHttpClient(createBaseClientOptions().setHttp2KeepAliveTimeout(3).setPoolCleanerPeriod(1));
HttpClientRequest request = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
assertEquals(responseStreamWeight, resp.request().getStreamPriority().getWeight());
assertEquals(responseStreamDependency, resp.request().getStreamPriority().getDependency());
complete();
});
request.setStreamPriority(new StreamPriority()
.setDependency(requestStreamDependency)
.setWeight(requestStreamWeight)
.setExclusive(false));
request.end();
await();
}
@Test
public void testStreamWeightAndDependencyChange() throws Exception {
int requestStreamDependency = 56;
short requestStreamWeight = 43;
int requestStreamDependency2 = 157;
short requestStreamWeight2 = 143;
int responseStreamDependency = 98;
short responseStreamWeight = 55;
int responseStreamDependency2 = 198;
short responseStreamWeight2 = 155;
waitFor(4);
server.requestHandler(req -> {
req.streamPriorityHandler( sp -> {
assertEquals(requestStreamWeight2, sp.getWeight());
assertEquals(requestStreamDependency2, sp.getDependency());
assertEquals(requestStreamWeight2, req.streamPriority().getWeight());
assertEquals(requestStreamDependency2, req.streamPriority().getDependency());
complete();
});
assertEquals(requestStreamWeight, req.streamPriority().getWeight());
assertEquals(requestStreamDependency, req.streamPriority().getDependency());
req.response().setStreamPriority(new StreamPriority()
.setDependency(responseStreamDependency)
.setWeight(responseStreamWeight)
.setExclusive(false));
req.response().write("hello");
req.response().setStreamPriority(new StreamPriority()
.setDependency(responseStreamDependency2)
.setWeight(responseStreamWeight2)
.setExclusive(false));
req.response().drainHandler(h -> {});
req.response().end("world");
complete();
});
startServer();
client = vertx.createHttpClient(createBaseClientOptions().setHttp2KeepAliveTimeout(3).setPoolCleanerPeriod(1));
HttpClientRequest request = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
assertEquals(responseStreamWeight, resp.request().getStreamPriority().getWeight());
assertEquals(responseStreamDependency, resp.request().getStreamPriority().getDependency());
resp.streamPriorityHandler( sp -> {
assertEquals(responseStreamWeight2, sp.getWeight());
assertEquals(responseStreamDependency2, sp.getDependency());
assertEquals(responseStreamWeight2, resp.request().getStreamPriority().getWeight());
assertEquals(responseStreamDependency2, resp.request().getStreamPriority().getDependency());
complete();
});
complete();
});
request.setStreamPriority(new StreamPriority()
.setDependency(requestStreamDependency)
.setWeight(requestStreamWeight)
.setExclusive(false));
request.sendHead(h -> {
request.setStreamPriority(new StreamPriority()
.setDependency(requestStreamDependency2)
.setWeight(requestStreamWeight2)
.setExclusive(false));
request.end();
});
await();
}
@Test
public void testStreamWeightAndDependencyNoChange() throws Exception {
int requestStreamDependency = 56;
short requestStreamWeight = 43;
int responseStreamDependency = 98;
short responseStreamWeight = 55;
waitFor(2);
server.requestHandler(req -> {
req.streamPriorityHandler( sp -> {
fail("Stream priority handler shoudl not be called");
});
assertEquals(requestStreamWeight, req.streamPriority().getWeight());
assertEquals(requestStreamDependency, req.streamPriority().getDependency());
req.response().setStreamPriority(new StreamPriority()
.setDependency(responseStreamDependency)
.setWeight(responseStreamWeight)
.setExclusive(false));
req.response().write("hello");
req.response().setStreamPriority(new StreamPriority()
.setDependency(responseStreamDependency)
.setWeight(responseStreamWeight)
.setExclusive(false));
req.response().drainHandler(h -> {});
req.response().end("world");
complete();
});
startServer();
client = vertx.createHttpClient(createBaseClientOptions().setHttp2KeepAliveTimeout(3).setPoolCleanerPeriod(1));
HttpClientRequest request = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
assertEquals(responseStreamWeight, resp.request().getStreamPriority().getWeight());
assertEquals(responseStreamDependency, resp.request().getStreamPriority().getDependency());
resp.streamPriorityHandler( sp -> {
fail("Stream priority handler shoudl not be called");
complete();
});
complete();
});
request.setStreamPriority(new StreamPriority()
.setDependency(requestStreamDependency)
.setWeight(requestStreamWeight)
.setExclusive(false));
request.sendHead(h -> {
request.setStreamPriority(new StreamPriority()
.setDependency(requestStreamDependency)
.setWeight(requestStreamWeight)
.setExclusive(false));
request.end();
});
await();
}
@Test
public void testStreamWeightAndDependencyInheritance() throws Exception {
int requestStreamDependency = 86;
short requestStreamWeight = 53;
waitFor(2);
server.requestHandler(req -> {
assertEquals(requestStreamWeight, req.streamPriority().getWeight());
assertEquals(requestStreamDependency, req.streamPriority().getDependency());
req.response().end();
complete();
});
startServer();
client = vertx.createHttpClient(createBaseClientOptions().setHttp2KeepAliveTimeout(3).setPoolCleanerPeriod(1));
HttpClientRequest request = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
assertEquals(requestStreamWeight, resp.request().getStreamPriority().getWeight());
assertEquals(requestStreamDependency, resp.request().getStreamPriority().getDependency());
complete();
});
request.setStreamPriority(new StreamPriority()
.setDependency(requestStreamDependency)
.setWeight(requestStreamWeight)
.setExclusive(false));
request.end();
await();
}
@Test
public void testDefaultStreamWeightAndDependency() throws Exception {
int defaultStreamDependency = 0;
short defaultStreamWeight = Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
waitFor(2);
server.requestHandler(req -> {
assertEquals(defaultStreamWeight, req.streamPriority().getWeight());
assertEquals(defaultStreamDependency, req.streamPriority().getDependency());
req.response().end();
complete();
});
startServer();
client = vertx.createHttpClient(createBaseClientOptions().setHttp2KeepAliveTimeout(3).setPoolCleanerPeriod(1));
HttpClientRequest request = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
assertEquals(defaultStreamWeight, resp.request().getStreamPriority().getWeight());
assertEquals(defaultStreamDependency, resp.request().getStreamPriority().getDependency());
complete();
});
request.end();
await();
}
@Test
public void testStreamWeightAndDependencyPushPromise() throws Exception {
int pushStreamDependency = 456;
short pushStreamWeight = 14;
waitFor(4);
server.requestHandler(req -> {
req.response().push(HttpMethod.GET, "/pushpath", ar -> {
assertTrue(ar.succeeded());
HttpServerResponse pushedResp = ar.result();
pushedResp.setStreamPriority(new StreamPriority()
.setDependency(pushStreamDependency)
.setWeight(pushStreamWeight)
.setExclusive(false));
pushedResp.end();
});
req.response().end();
complete();
});
startServer();
client = vertx.createHttpClient(createBaseClientOptions().setHttp2KeepAliveTimeout(3).setPoolCleanerPeriod(1));
HttpClientRequest request = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
complete();
});
request.pushHandler(pushReq -> {
complete();
pushReq.handler(pushResp -> {
assertEquals(pushStreamDependency, pushResp.request().getStreamPriority().getDependency());
assertEquals(pushStreamWeight, pushResp.request().getStreamPriority().getWeight());
complete();
});
});
request.end();
await();
}
@Test
public void testStreamWeightAndDependencyInheritancePushPromise() throws Exception {
int reqStreamDependency = 556;
short reqStreamWeight = 84;
waitFor(4);
server.requestHandler(req -> {
req.response().push(HttpMethod.GET, "/pushpath", ar -> {
assertTrue(ar.succeeded());
HttpServerResponse pushedResp = ar.result();
pushedResp.end();
});
req.response().end();
complete();
});
startServer();
client = vertx.createHttpClient(createBaseClientOptions().setHttp2KeepAliveTimeout(3).setPoolCleanerPeriod(1));
HttpClientRequest request = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
complete();
});
request.pushHandler(pushReq -> {
complete();
pushReq.handler(pushResp -> {
assertEquals(reqStreamDependency, pushResp.request().getStreamPriority().getDependency());
assertEquals(reqStreamWeight, pushResp.request().getStreamPriority().getWeight());
complete();
});
});
request.setStreamPriority(new StreamPriority()
.setDependency(reqStreamDependency)
.setWeight(reqStreamWeight)
.setExclusive(false));
request.end();
await();
}
}

View File

@@ -4015,6 +4015,8 @@ public abstract class HttpTest extends HttpTestBase {
public HttpClientRequest connectionHandler(@Nullable Handler<HttpConnection> handler) { throw new UnsupportedOperationException(); }
public HttpClientRequest writeCustomFrame(int type, int flags, Buffer payload) { throw new UnsupportedOperationException(); }
public boolean writeQueueFull() { throw new UnsupportedOperationException(); }
@Override public HttpClientRequest setStreamPriority(StreamPriority streamPriority) { return this; }
@Override public StreamPriority getStreamPriority() { return null; }
}
HttpClientRequest req = new MockReq();
class MockResp implements HttpClientResponse {
@@ -4037,6 +4039,7 @@ public abstract class HttpTest extends HttpTestBase {
public HttpClientResponse customFrameHandler(Handler<HttpFrame> handler) { throw new UnsupportedOperationException(); }
public NetSocket netSocket() { throw new UnsupportedOperationException(); }
public HttpClientRequest request() { return req; }
public HttpClientResponse streamPriorityHandler(Handler<StreamPriority> handler) { return this; }
}
MockResp resp = new MockResp();
Function<HttpClientResponse, Future<HttpClientRequest>> handler = client.redirectHandler();