Simplify HttpClientStream API

Signed-off-by: Michal Michalowski <michal.michalowski@openet.com>
This commit is contained in:
Michal Michalowski
2018-11-15 11:59:18 +00:00
parent e15e8cac5e
commit 3f2b70f051
5 changed files with 26 additions and 20 deletions

View File

@@ -250,7 +250,7 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC
return conn.context;
}
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end) {
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority streamPriority) {
HttpRequest request = createRequest(method, rawMethod, uri, headers);
prepareRequestHeaders(request, hostHeader, chunked);
sendRequest(request, buf, end);

View File

@@ -135,8 +135,9 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
@Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
stream.setStreamPriority(new StreamPriority(streamDependency, weight, exclusive));
VertxHttp2Stream stream = streams.get(streamId);
if(stream != null)
stream.setStreamPriority(new StreamPriority(streamDependency, weight, exclusive));
onHeadersRead(ctx, streamId, headers, padding, endOfStream);
}
@@ -332,7 +333,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
}
@Override
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end) {
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end, StreamPriority streamPriority) {
Http2Headers h = new DefaultHttp2Headers();
h.method(method != HttpMethod.OTHER ? method.name() : rawMethod);
if (method == HttpMethod.CONNECT) {
@@ -358,6 +359,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
if (conn.metrics != null) {
request.metric(conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request));
}
setStreamPriority(streamPriority);
writeHeaders(h, end && content == null);
if (content != null) {
writeBuffer(content, end);
@@ -378,6 +380,14 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
handlerContext.flush();
}
}
@Override
public void updateStreamPriority(StreamPriority streamPriority) {
setStreamPriority(streamPriority);
writePriorityFrame();
}
@Override
public void writeFrame(int type, int flags, ByteBuf payload) {
@@ -447,7 +457,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
return conn.toNetSocket(this);
}
}
public static VertxHttp2ConnectionHandler<Http2ClientConnection> createHttp2ConnectionHandler(
HttpClientImpl client,
Object queueMetric,

View File

@@ -102,7 +102,8 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
String hostHeader,
boolean chunked,
ByteBuf buf,
boolean end) {
boolean end,
StreamPriority streamPriority) {
ChannelPipeline pipeline = conn.channel().pipeline();
HttpClientCodec httpCodec = pipeline.get(HttpClientCodec.class);
class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {
@@ -161,7 +162,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536);
pipeline.addAfter("codec", null, new UpgradeRequestHandler());
pipeline.addAfter("codec", null, upgradeHandler);
stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end);
stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, streamPriority);
}
@Override

View File

@@ -71,7 +71,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
private HttpClientStream stream;
private boolean connecting;
private StreamPriority streamPriority; // This filed is used to hold the steamPriority if it is set before the stream is created
private StreamPriority streamPriority = StreamPriority.DEFAULT; // This filed is used to hold the steamPriority if it is set before the stream is created
// completed => drainHandler = null
@@ -513,10 +513,6 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
private void connected(Handler<HttpVersion> headersHandler, HttpClientStream stream) {
synchronized (this) {
this.stream = stream;
if(streamPriority != null) {
stream.setStreamPriority(streamPriority);
}
stream.beginRequest(this);
// If anything was written or the request ended before we got the connection, then
@@ -532,20 +528,20 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
if (completed) {
// we also need to write the head so optimize this and write all out in once
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, true, streamPriority);
stream.reportBytesWritten(written);
stream.endRequest();
} else {
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, false, streamPriority);
}
} else {
if (completed) {
// we also need to write the head so optimize this and write all out in once
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, true, streamPriority);
stream.reportBytesWritten(written);
stream.endRequest();
} else {
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false);
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, null, false, streamPriority);
}
}
this.connecting = false;
@@ -686,8 +682,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
if(!streamPriority.equals(getStreamPriority())) {
this.streamPriority = streamPriority;
if(stream != null) {
stream.setStreamPriority(streamPriority);
stream.writePriorityFrame();
stream.updateStreamPriority(streamPriority);
}
}
}

View File

@@ -39,7 +39,7 @@ interface HttpClientStream {
HttpConnection connection();
Context getContext();
void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end);
void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority streamPriority);
void writeBuffer(ByteBuf buf, boolean end);
void writeFrame(int type, int flags, ByteBuf payload);
default void writePriorityFrame() {}
@@ -62,5 +62,5 @@ interface HttpClientStream {
default StreamPriority getStreamPriority() {
return null;
}
default void setStreamPriority(StreamPriority streamPriority) { }
default void updateStreamPriority(StreamPriority streamPriority) { }
}