Add support for stream dependency and priority weight

Signed-off-by: Michal Michalowski <michal.michalowski@openet.com>
This commit is contained in:
Michal Michalowski
2018-10-25 17:09:52 +02:00
parent 699ea2fde2
commit 8d4d078e13
17 changed files with 279 additions and 17 deletions

View File

@@ -387,4 +387,15 @@ public interface HttpClientRequest extends WriteStream<Buffer>, ReadStream<HttpC
default HttpClientRequest writeCustomFrame(HttpFrame frame) {
return writeCustomFrame(frame.type(), frame.flags(), frame.payload());
}
/**
* Sets the priority weight of the associated stream
* @param weight The weight priority weight or this requet's stream
*/
void setWeight(short weight);
/**
* Sets the dependecy of associated stream
* @param The identifier of the HTTP/2 stream this request's stream depends on
*/
void setStreamDependency(int streamDependency);
}

View File

@@ -153,4 +153,15 @@ public interface HttpClientResponse extends ReadStream<Buffer> {
*/
@CacheReturn
HttpClientRequest request();
/**
* @return The identifier of the HTTP/2 stream this request's stream depends on
*/
int getStreamDependency();
/**
* @return The weight priority weight or this requet's stream
*/
short getWeight();
}

View File

@@ -11,6 +11,7 @@
package io.vertx.core.http;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.codegen.annotations.*;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
@@ -328,4 +329,18 @@ public interface HttpServerRequest extends ReadStream<Buffer> {
@CacheReturn
HttpConnection connection();
/**
* @return The identifier of the HTTP/2 stream this request's stream depends on
*/
default int getStreamDependency() {
return 0;
}
/**
* @return The weight priority weight or this requet's stream
*/
default short getWeight() {
return Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
}
}

View File

@@ -472,4 +472,15 @@ public interface HttpServerResponse extends WriteStream<Buffer> {
default HttpServerResponse writeCustomFrame(HttpFrame frame) {
return writeCustomFrame(frame.type(), frame.flags(), frame.payload());
}
/**
* Sets the priority weight of the associated stream
* @param weight The weight priority weight or this requet's stream
*/
void setWeight(short weight);
/**
* Sets the dependecy of associated stream
* @param The identifier of the HTTP/2 stream this request's stream depends on
*/
void setStreamDependency(int streamDependency);
}

View File

@@ -17,6 +17,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
@@ -132,6 +133,14 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
listener.onRecycle(expired);
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
stream.setStreamDependency(streamDependency);
stream.setWeight(weight);
onHeadersRead(ctx, streamId, headers, padding, endOfStream);
}
@Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
@@ -175,6 +184,8 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
private HttpClientResponseImpl response;
private boolean requestEnded;
private boolean responseEnded;
private int streamDependency = 0;
private short weight = Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
Http2ClientStream(Http2ClientConnection conn, Http2Stream stream, boolean writable) {
super(conn, stream, writable);
@@ -195,6 +206,24 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
return super.id();
}
@Override
public int getStreamDependency() {
return streamDependency;
}
public void setStreamDependency(int streamDependency) {
this.streamDependency = streamDependency;
}
@Override
public short getWeight() {
return weight;
}
public void setWeight(short weight) {
this.weight = weight;
}
@Override
void handleEnd(MultiMap trailers) {
if (conn.metrics != null) {
@@ -315,7 +344,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
}
@Override
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end) {
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end, int streamDependency, short weight) {
Http2Headers h = new DefaultHttp2Headers();
h.method(method != HttpMethod.OTHER ? method.name() : rawMethod);
if (method == HttpMethod.CONNECT) {
@@ -341,7 +370,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
if (conn.metrics != null) {
request.metric(conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request));
}
writeHeaders(h, end && content == null);
writeHeaders(h, end && content == null, streamDependency, weight);
if (content != null) {
writeBuffer(content, end);
} else {
@@ -349,6 +378,11 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
}
}
@Override
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end) {
writeHead(method, rawMethod, uri, headers, hostHeader, chunked, content, end, 0, Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT);
}
@Override
public void writeBuffer(ByteBuf buf, boolean end) {
if (buf == null && end) {

View File

@@ -27,6 +27,7 @@ import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.Http2ClientConnection.Http2ClientStream;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.metrics.HttpServerMetrics;
@@ -107,7 +108,7 @@ public class Http2ServerConnection extends Http2ConnectionBase {
@Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endOfStream) {
Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) {
VertxHttp2Stream stream = streams.get(streamId);
if (stream == null) {
if (isMalformedRequest(headers)) {
@@ -115,6 +116,8 @@ public class Http2ServerConnection extends Http2ConnectionBase {
return;
}
Http2ServerRequestImpl req = createRequest(streamId, headers);
req.setStreamDependency(streamDependency);
req.setWeight(weight);
stream = req;
CharSequence value = headers.get(HttpHeaderNames.EXPECT);
if (options.isHandle100ContinueAutomatically() &&
@@ -141,6 +144,12 @@ public class Http2ServerConnection extends Http2ConnectionBase {
}
}
@Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endOfStream) {
onHeadersRead(ctx, streamId, headers, 0, Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream);
}
@Override
public synchronized void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
Long v = settings.maxConcurrentStreams();

View File

@@ -20,6 +20,7 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.vertx.codegen.annotations.Nullable;
@@ -79,6 +80,9 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
private Handler<Throwable> exceptionHandler;
private Handler<HttpFrame> customFrameHandler;
private int streamDependency = 0;
private short weight = Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
public Http2ServerRequestImpl(Http2ServerConnection conn, Http2Stream stream, HttpServerMetrics metrics,
String serverOrigin, Http2Headers headers, String contentEncoding, boolean writable) {
@@ -516,4 +520,24 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
public HttpConnection connection() {
return conn;
}
@Override
public int getStreamDependency() {
return streamDependency;
}
public void setStreamDependency(int streamDependency) {
this.streamDependency = streamDependency;
}
@Override
public short getWeight() {
return weight;
}
public void setWeight(short weight) {
this.weight = weight;
}
}

View File

@@ -20,6 +20,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpStatusClass;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
@@ -78,6 +79,8 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
private int numPush;
private boolean inHandler;
private NetSocket netSocket;
private short weight = Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
private int streamDependecy = 0;
public Http2ServerResponseImpl(Http2ServerConnection conn,
VertxHttp2Stream stream,
@@ -322,7 +325,7 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
public HttpServerResponse writeContinue() {
synchronized (conn) {
checkHeadWritten();
stream.writeHeaders(new DefaultHttp2Headers().status("100"), false);
stream.writeHeaders(new DefaultHttp2Headers().status("100"), false, streamDependecy, weight);
ctx.flush();
return this;
}
@@ -415,7 +418,7 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
}
headWritten = true;
headers.status(Integer.toString(status.code())); // Could be optimized for usual case ?
stream.writeHeaders(headers, end);
stream.writeHeaders(headers, end, streamDependecy, weight);
if (end) {
ctx.flush();
}
@@ -446,7 +449,7 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
stream.writeData(chunk, end && trailers == null);
}
if (end && trailers != null) {
stream.writeHeaders(trailers, true);
stream.writeHeaders(trailers, true, streamDependecy, weight);
}
bodyEndHandler = this.bodyEndHandler;
}
@@ -709,4 +712,14 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
public HttpServerResponse push(HttpMethod method, String path, Handler<AsyncResult<HttpServerResponse>> handler) {
return push(method, host, path, handler);
}
@Override
public void setStreamDependency(int streamDependency) {
this.streamDependecy = streamDependency;
}
@Override
public void setWeight(short weight) {
this.weight = weight;
}
}

View File

@@ -11,6 +11,7 @@
package io.vertx.core.http.impl;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
@@ -44,6 +45,8 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
private Object metric;
private boolean paused;
private HttpClientResponseImpl response;
protected int streamDependency = 0;
protected short weight = Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
HttpClientRequestBase(HttpClientImpl client, boolean ssl, HttpMethod method, String host, int port, String uri) {
this.client = client;
@@ -250,4 +253,14 @@ public abstract class HttpClientRequestBase implements HttpClientRequest {
}
return this;
}
@Override
public void setWeight(short weight) {
this.weight = weight;
}
@Override
public void setStreamDependency(int streamDependency) {
this.streamDependency = streamDependency;
}
}

View File

@@ -525,20 +525,20 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
if (completed) {
// we also need to write the head so optimize this and write all out in once
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true, streamDependency, weight);
stream.reportBytesWritten(written);
stream.endRequest();
} else {
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false, streamDependency, weight);
}
} else {
if (completed) {
// we also need to write the head so optimize this and write all out in once
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true, streamDependency, weight);
stream.reportBytesWritten(written);
stream.endRequest();
} else {
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false, streamDependency, weight);
}
}
this.connecting = false;

View File

@@ -189,6 +189,16 @@ public class HttpClientResponseImpl implements HttpClientResponse {
return this;
}
}
@Override
public int getStreamDependency() {
return stream.getStreamDependency();
}
@Override
public short getWeight() {
return stream.getWeight();
}
void handleUnknownFrame(HttpFrame frame) {
synchronized (conn) {

View File

@@ -12,6 +12,7 @@
package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpConnection;
@@ -38,6 +39,9 @@ interface HttpClientStream {
HttpConnection connection();
Context getContext();
default void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, int streamDependency, short weight) {
writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end);
}
void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end);
void writeBuffer(ByteBuf buf, boolean end);
void writeFrame(int type, int flags, ByteBuf payload);
@@ -56,4 +60,12 @@ interface HttpClientStream {
void endRequest();
NetSocket createNetSocket();
default int getStreamDependency() {
return 0;
}
default short getWeight() {
return Http2CodecUtil.CONNECTION_STREAM_ID;
}
}

View File

@@ -666,4 +666,14 @@ public class HttpServerResponseImpl implements HttpServerResponse {
public HttpServerResponse writeCustomFrame(int type, int flags, Buffer payload) {
return this;
}
@Override
public void setStreamDependency(int streamDependency) {
// NO-OP
}
@Override
public void setWeight(short weight) {
// NO-OP
}
}

View File

@@ -188,19 +188,19 @@ class VertxHttp2ConnectionHandler<C extends Http2ConnectionBase> extends Http2Co
//
void writeHeaders(Http2Stream stream, Http2Headers headers, boolean end) {
void writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight) {
EventExecutor executor = chctx.executor();
if (executor.inEventLoop()) {
_writeHeaders(stream, headers, end);
_writeHeaders(stream, headers, end, streamDependency, weight);
} else {
executor.execute(() -> {
_writeHeaders(stream, headers, end);
_writeHeaders(stream, headers, end, streamDependency, weight);
});
}
}
private void _writeHeaders(Http2Stream stream, Http2Headers headers, boolean end) {
encoder().writeHeaders(chctx, stream.id(), headers, 0, end, chctx.newPromise());
private void _writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight) {
encoder().writeHeaders(chctx, stream.id(), headers, streamDependency, weight, false, 0, end, chctx.newPromise());
}
void writeData(Http2Stream stream, ByteBuf chunk, boolean end) {

View File

@@ -124,8 +124,8 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
conn.handler.writeFrame(stream, (byte) type, (short) flags, payload);
}
void writeHeaders(Http2Headers headers, boolean end) {
conn.handler.writeHeaders(stream, headers, end);
void writeHeaders(Http2Headers headers, boolean end, int streamDependency, short weight) {
conn.handler.writeHeaders(stream, headers, end, streamDependency, weight);
}
void writeData(ByteBuf chunk, boolean end) {

View File

@@ -12,6 +12,7 @@
package io.vertx.core.http;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.Http2ServerConnection;
@@ -442,4 +443,88 @@ public class Http2Test extends HttpTest {
});
await();
}
@Test
public void testStreamWeightAndDependency() throws Exception {
int requestStreamDependency = 56;
short requestStreamWeight = 43;
int responseStreamDependency = 98;
short responseStreamWeight = 55;
waitFor(2);
server.requestHandler(req -> {
assertEquals(requestStreamWeight, req.getWeight());
assertEquals(requestStreamDependency, req.getStreamDependency());
req.response().setWeight(responseStreamWeight);
req.response().setStreamDependency(responseStreamDependency);
req.response().end();
complete();
});
startServer();
client = vertx.createHttpClient(createBaseClientOptions().setHttp2KeepAliveTimeout(3).setPoolCleanerPeriod(1));
HttpClientRequest request = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
assertEquals(responseStreamWeight, resp.getWeight());
assertEquals(responseStreamDependency, resp.getStreamDependency());
complete();
});
request.setWeight(requestStreamWeight);
request.setStreamDependency(requestStreamDependency);
request.end();
await();
}
@Test
public void testDefaultStreamWeightAndDependency() throws Exception {
int defaultStreamDependency = 0;
short defaultStreamWeight = Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
waitFor(2);
server.requestHandler(req -> {
assertEquals(defaultStreamWeight, req.getWeight());
assertEquals(defaultStreamDependency, req.getStreamDependency());
req.response().end();
complete();
});
startServer();
client = vertx.createHttpClient(createBaseClientOptions().setHttp2KeepAliveTimeout(3).setPoolCleanerPeriod(1));
HttpClientRequest request = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
assertEquals(defaultStreamWeight, resp.getWeight());
assertEquals(defaultStreamDependency, resp.getStreamDependency());
complete();
});
request.end();
await();
}
@Test
public void testStreamWeightAndDependencyPushPromise() throws Exception {
int pushStreamDependency = 456;
short pushStreamWeight = 14;
waitFor(4);
server.requestHandler(req -> {
req.response().push(HttpMethod.GET, "/pushpath", ar -> {
assertTrue(ar.succeeded());
HttpServerResponse pushedResp = ar.result();
pushedResp.setWeight((short)14);
pushedResp.setStreamDependency(456);
pushedResp.end();
});
req.response().end();
complete();
});
startServer();
client = vertx.createHttpClient(createBaseClientOptions().setHttp2KeepAliveTimeout(3).setPoolCleanerPeriod(1));
HttpClientRequest request = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
complete();
});
request.pushHandler(pushReq -> {
complete();
pushReq.handler(pushResp -> {
assertEquals(pushStreamDependency, pushResp.getStreamDependency());
assertEquals(pushStreamWeight, pushResp.getWeight());
complete();
});
});
request.end();
await();
}
}

View File

@@ -4015,6 +4015,8 @@ public abstract class HttpTest extends HttpTestBase {
public HttpClientRequest connectionHandler(@Nullable Handler<HttpConnection> handler) { throw new UnsupportedOperationException(); }
public HttpClientRequest writeCustomFrame(int type, int flags, Buffer payload) { throw new UnsupportedOperationException(); }
public boolean writeQueueFull() { throw new UnsupportedOperationException(); }
public void setWeight(short weight) {}
public void setStreamDependency(int streamDependency) {}
}
HttpClientRequest req = new MockReq();
class MockResp implements HttpClientResponse {
@@ -4037,6 +4039,8 @@ public abstract class HttpTest extends HttpTestBase {
public HttpClientResponse customFrameHandler(Handler<HttpFrame> handler) { throw new UnsupportedOperationException(); }
public NetSocket netSocket() { throw new UnsupportedOperationException(); }
public HttpClientRequest request() { return req; }
@Override public short getWeight() { return 0; }
@Override public int getStreamDependency() { return -1; }
}
MockResp resp = new MockResp();
Function<HttpClientResponse, Future<HttpClientRequest>> handler = client.redirectHandler();