Move the http client message handling Http1xClientConnection

This commit is contained in:
Julien Viet
2018-08-23 11:19:23 +02:00
parent 31a3d466af
commit 894446a2e1
5 changed files with 72 additions and 67 deletions

View File

@@ -12,10 +12,7 @@
package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.*;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.http.*;
@@ -77,6 +74,7 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC
private WebSocketClientHandshaker handshaker;
private WebSocketImpl ws;
private boolean closeFrameSent;
private StreamImpl requestInProgress; // The request being sent
private StreamImpl responseInProgress; // The request waiting for a response
@@ -520,7 +518,61 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC
}
}
void handleMessage(HttpObject obj) {
private Throwable validateMessage(Object msg) {
if (msg instanceof HttpObject) {
HttpObject obj = (HttpObject) msg;
DecoderResult result = obj.decoderResult();
if (result.isFailure()) {
return result.cause();
} else if (obj instanceof HttpResponse) {
io.netty.handler.codec.http.HttpVersion version = ((HttpResponse) obj).protocolVersion();
if (version != io.netty.handler.codec.http.HttpVersion.HTTP_1_0 && version != io.netty.handler.codec.http.HttpVersion.HTTP_1_1) {
return new IllegalStateException("Unsupported HTTP version: " + version);
}
}
}
return null;
}
protected void handleMessage(Object msg) {
Throwable error = validateMessage(msg);
if (error != null) {
fail(error);
} else if (msg instanceof HttpObject) {
HttpObject obj = (HttpObject) msg;
handleHttpMessage(obj);
} else if (msg instanceof WebSocketFrameInternal) {
WebSocketFrameInternal frame = (WebSocketFrameInternal) msg;
switch (frame.type()) {
case BINARY:
case CONTINUATION:
case TEXT:
case PONG:
handleWsFrame(frame);
break;
case PING:
// Echo back the content of the PING frame as PONG frame as specified in RFC 6455 Section 5.5.2
chctx.writeAndFlush(new PongWebSocketFrame(frame.getBinaryData().copy()));
break;
case CLOSE:
handleWsFrame(frame);
if (!closeFrameSent) {
// Echo back close frame and close the connection once it was written.
// This is specified in the WebSockets RFC 6455 Section 5.4.1
CloseWebSocketFrame closeFrame = new CloseWebSocketFrame(frame.closeStatusCode(), frame.closeReason());
chctx.writeAndFlush(closeFrame).addListener(ChannelFutureListener.CLOSE);
closeFrameSent = true;
}
break;
default:
throw new IllegalStateException("Invalid type: " + frame.type());
}
} else {
throw new IllegalStateException("Invalid object " + msg);
}
}
void handleHttpMessage(HttpObject obj) {
if (obj instanceof HttpResponse) {
handleResponseBegin((HttpResponse) obj);
} else if (obj instanceof HttpContent) {

View File

@@ -11,16 +11,9 @@
package io.vertx.core.http.impl;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.pool.ConnectionListener;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.metrics.HttpClientMetrics;
@@ -28,7 +21,7 @@ import io.vertx.core.spi.metrics.HttpClientMetrics;
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
class Http1xClientHandler extends VertxHttpHandler<Http1xClientConnection> {
private boolean closeFrameSent;
private ContextInternal context;
private ChannelHandlerContext chctx;
private final HttpVersion version;
@@ -89,58 +82,8 @@ class Http1xClientHandler extends VertxHttpHandler<Http1xClientConnection> {
super.channelInactive(chctx);
}
private Throwable validateMessage(Object msg) {
if (msg instanceof HttpObject) {
HttpObject obj = (HttpObject) msg;
DecoderResult result = obj.decoderResult();
if (result.isFailure()) {
return result.cause();
} else if (obj instanceof HttpResponse) {
io.netty.handler.codec.http.HttpVersion version = ((HttpResponse) obj).protocolVersion();
if (version != io.netty.handler.codec.http.HttpVersion.HTTP_1_0 && version != io.netty.handler.codec.http.HttpVersion.HTTP_1_1) {
return new IllegalStateException("Unsupported HTTP version: " + version);
}
}
}
return null;
}
@Override
protected void handleMessage(Http1xClientConnection conn, Object msg) {
Throwable error = validateMessage(msg);
if (error != null) {
fail(error);
} else if (msg instanceof HttpObject) {
HttpObject obj = (HttpObject) msg;
conn.handleMessage(obj);
} else if (msg instanceof WebSocketFrameInternal) {
WebSocketFrameInternal frame = (WebSocketFrameInternal) msg;
switch (frame.type()) {
case BINARY:
case CONTINUATION:
case TEXT:
case PONG:
conn.handleWsFrame(frame);
break;
case PING:
// Echo back the content of the PING frame as PONG frame as specified in RFC 6455 Section 5.5.2
chctx.writeAndFlush(new PongWebSocketFrame(frame.getBinaryData().copy()));
break;
case CLOSE:
conn.handleWsFrame(frame);
if (!closeFrameSent) {
// Echo back close frame and close the connection once it was written.
// This is specified in the WebSockets RFC 6455 Section 5.4.1
CloseWebSocketFrame closeFrame = new CloseWebSocketFrame(frame.closeStatusCode(), frame.closeReason());
chctx.writeAndFlush(closeFrame).addListener(ChannelFutureListener.CLOSE);
closeFrameSent = true;
}
break;
default:
throw new IllegalStateException("Invalid type: " + frame.type());
}
} else {
throw new IllegalStateException("Invalid object " + msg);
}
conn.handleMessage(msg);
}
}

View File

@@ -518,10 +518,10 @@ public class Http1xServerConnection extends Http1xConnectionBase implements Http
ChannelPromise fut = chctx.newPromise();
writeToChannel(resp, fut);
fut.addListener(res -> {
handler().fail(result.cause());
fail(result.cause());
});
} else {
handler().fail(result.cause());
fail(result.cause());
}
}

View File

@@ -68,6 +68,16 @@ public abstract class ConnectionBase {
this.voidPromise = new VoidChannelPromise(chctx.channel(), false);
}
/**
* Fail the connection, the {@code error} will be sent to the pipeline and the connection will
* stop processing any further message.
*
* @param error the {@code Throwable} to propagate
*/
public void fail(Throwable error) {
handler().fail(error);
}
/**
* Encode to message before writing to the channel
*

View File

@@ -59,7 +59,7 @@ public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDupl
*
* @param error the {@code Throwable} to propagate
*/
public void fail(Throwable error) {
void fail(Throwable error) {
messageHandler = NULL_HANDLER;
conn.chctx.pipeline().fireExceptionCaught(error);
}