Refactor HttpServer initialization in a channel initializer

This commit is contained in:
Julien Viet
2019-07-10 10:06:44 +02:00
parent 44a0a9e9d3
commit 713c1a834d
11 changed files with 514 additions and 312 deletions

View File

@@ -67,7 +67,7 @@ import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED;
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocketImpl> implements HttpConnection {
public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocketImpl> implements HttpServerConnection {
private static final Logger log = LoggerFactory.getLogger(Http1xServerConnection.class);
@@ -80,8 +80,8 @@ public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocket
private HttpServerRequestImpl requestInProgress;
private HttpServerRequestImpl responseInProgress;
private boolean channelPaused;
private Handler<HttpServerRequest> requestHandler;
final Handler<HttpServerRequest> requestHandler;
final HttpServerMetrics metrics;
final boolean handle100ContinueAutomatically;
final HttpServerOptions options;
@@ -92,16 +92,19 @@ public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocket
ChannelHandlerContext channel,
ContextInternal context,
String serverOrigin,
HttpHandlers handlers,
HttpServerMetrics metrics) {
super(vertx, channel, context);
this.requestHandler = requestHandler(handlers);
this.serverOrigin = serverOrigin;
this.options = options;
this.sslHelper = sslHelper;
this.metrics = metrics;
this.handle100ContinueAutomatically = options.isHandle100ContinueAutomatically();
exceptionHandler(handlers.exceptionHandler);
}
@Override
public HttpServerConnection handler(Handler<HttpServerRequest> handler) {
requestHandler = handler;
return this;
}
@Override
@@ -509,23 +512,4 @@ public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocket
return -1;
}
}
private static Handler<HttpServerRequest> requestHandler(HttpHandlers handler) {
if (handler.connectionHandler != null) {
class Adapter implements Handler<HttpServerRequest> {
private boolean isFirst = true;
@Override
public void handle(HttpServerRequest request) {
if (isFirst) {
isFirst = false;
handler.connectionHandler.handle(request.connection());
}
handler.requestHandler.handle(request);
}
}
return new Adapter();
} else {
return handler.requestHandler;
}
}
}

View File

@@ -8,8 +8,10 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http2.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.vertx.core.Handler;
import io.vertx.core.net.impl.HandlerHolder;
import io.vertx.core.net.impl.HandlerManager;
import io.vertx.core.net.impl.VertxHandler;
import java.util.Iterator;
@@ -21,13 +23,13 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
class Http1xUpgradeToH2CHandler extends ChannelInboundHandlerAdapter {
private final HttpServerImpl server;
private final HandlerManager<HttpHandlers> httpHandlerMgr;
private final HttpServerChannelInitializer initializer;
private final HandlerHolder<? extends Handler<HttpServerConnection>> holder;
private VertxHttp2ConnectionHandler<Http2ServerConnection> handler;
Http1xUpgradeToH2CHandler(HttpServerImpl server, HandlerManager<HttpHandlers> httpHandlerMgr) {
this.server = server;
this.httpHandlerMgr = httpHandlerMgr;
Http1xUpgradeToH2CHandler(HttpServerChannelInitializer initializer, HandlerHolder<? extends Handler<HttpServerConnection>> holder) {
this.initializer = initializer;
this.holder = holder;
}
@Override
@@ -61,8 +63,7 @@ class Http1xUpgradeToH2CHandler extends ChannelInboundHandlerAdapter {
if (settingsHeader != null) {
Http2Settings settings = HttpUtils.decodeSettings(settingsHeader);
if (settings != null) {
HandlerHolder<HttpHandlers> reqHandler = httpHandlerMgr.chooseHandler(ctx.channel().eventLoop());
if (reqHandler != null && reqHandler.context.isEventLoopContext()) {
if (holder != null && holder.context.isEventLoopContext()) {
ChannelPipeline pipeline = ctx.pipeline();
DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, SWITCHING_PROTOCOLS, Unpooled.EMPTY_BUFFER, false);
res.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE);
@@ -70,8 +71,7 @@ class Http1xUpgradeToH2CHandler extends ChannelInboundHandlerAdapter {
res.headers().add(HttpHeaderNames.CONTENT_LENGTH, HttpHeaderValues.ZERO);
ctx.writeAndFlush(res);
pipeline.remove("httpEncoder");
pipeline.remove("handler");
handler = server.buildHttp2ConnectionHandler(reqHandler);
handler = initializer.buildHttp2ConnectionHandler(holder.context, holder.handler);
pipeline.addLast("handler", handler);
handler.serverUpgrade(ctx, settings, request);
DefaultHttp2Headers headers = new DefaultHttp2Headers();
@@ -95,6 +95,7 @@ class Http1xUpgradeToH2CHandler extends ChannelInboundHandlerAdapter {
ctx.writeAndFlush(res);
}
} else {
initializer.configureHttp1(ctx.pipeline(), holder);
ctx.fireChannelRead(msg);
ctx.pipeline().remove(this);
}
@@ -116,7 +117,7 @@ class Http1xUpgradeToH2CHandler extends ChannelInboundHandlerAdapter {
pipeline.remove(handler.getKey());
}
}
server.configureHttp2(pipeline);
initializer.configureHttp2(pipeline);
}
} else {
// We might have left over buffer sent when removing the HTTP decoder that needs to be propagated to the HTTP handler
@@ -125,4 +126,13 @@ class Http1xUpgradeToH2CHandler extends ChannelInboundHandlerAdapter {
}
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) {
ctx.close();
} else {
ctx.fireUserEventTriggered(evt);
}
}
}

View File

@@ -39,13 +39,13 @@ import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class Http2ServerConnection extends Http2ConnectionBase {
public class Http2ServerConnection extends Http2ConnectionBase implements HttpServerConnection {
private final HttpServerOptions options;
private final String serverOrigin;
private final Handler<HttpServerRequest> requestHandler;
private final HttpServerMetrics metrics;
private Handler<HttpServerRequest> requestHandler;
private Long maxConcurrentStreams;
private int concurrentStreams;
private final ArrayDeque<Push> pendingPushes = new ArrayDeque<>(8);
@@ -55,16 +55,20 @@ public class Http2ServerConnection extends Http2ConnectionBase {
String serverOrigin,
VertxHttp2ConnectionHandler connHandler,
HttpServerOptions options,
Handler<HttpServerRequest> requestHandler,
HttpServerMetrics metrics) {
super(context, connHandler);
this.options = options;
this.serverOrigin = serverOrigin;
this.requestHandler = requestHandler;
this.metrics = metrics;
}
@Override
public HttpServerConnection handler(Handler<HttpServerRequest> handler) {
requestHandler = handler;
return this;
}
public HttpServerMetrics metrics() {
return metrics;
}

View File

@@ -11,11 +11,19 @@
package io.vertx.core.http.impl;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFrameServerExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.net.impl.ConnectionBase;
import java.util.ArrayList;
import java.util.Objects;
/**
@@ -23,7 +31,7 @@ import java.util.Objects;
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class HttpHandlers {
public class HttpHandlers implements Handler<HttpServerConnection> {
final HttpServerImpl server;
final Handler<HttpServerRequest> requestHandler;
@@ -44,6 +52,47 @@ public class HttpHandlers {
this.exceptionHandler = exceptionHandler;
}
@Override
public void handle(HttpServerConnection conn) {
server.connectionMap.put(conn.channel(), (ConnectionBase) conn);
conn.channel().closeFuture().addListener(fut -> {
server.connectionMap.remove(conn.channel());
});
Handler<HttpServerRequest> requestHandler = this.requestHandler;
if (HttpServerImpl.DISABLE_WEBSOCKETS) {
// As a performance optimisation you can set a system property to disable websockets altogether which avoids
// some casting and a header check
} else {
if (conn instanceof Http1xServerConnection) {
requestHandler = new WebSocketRequestHandler(server.metrics, this);
Http1xServerConnection c = (Http1xServerConnection) conn;
initializeWebsocketExtensions(c.channelHandlerContext().pipeline());
}
}
conn.exceptionHandler(exceptionHandler);
conn.handler(requestHandler);
if (connectionHandler != null) {
connectionHandler.handle(conn);
}
}
private void initializeWebsocketExtensions(ChannelPipeline pipeline) {
ArrayList<WebSocketServerExtensionHandshaker> extensionHandshakers = new ArrayList<>();
if (server.options.getPerFrameWebsocketCompressionSupported()) {
extensionHandshakers.add(new DeflateFrameServerExtensionHandshaker(server.options.getWebsocketCompressionLevel()));
}
if (server.options.getPerMessageWebsocketCompressionSupported()) {
extensionHandshakers.add(new PerMessageDeflateServerExtensionHandshaker(server.options.getWebsocketCompressionLevel(),
ZlibCodecFactory.isSupportingWindowSizeAndMemLevel(), PerMessageDeflateServerExtensionHandshaker.MAX_WINDOW_SIZE,
server.options.getWebsocketAllowServerNoContext(), server.options.getWebsocketPreferredClientNoContext()));
}
if (!extensionHandshakers.isEmpty()) {
WebSocketServerExtensionHandler extensionHandler = new WebSocketServerExtensionHandler(
extensionHandshakers.toArray(new WebSocketServerExtensionHandshaker[extensionHandshakers.size()]));
pipeline.addLast("websocketExtensionHandler", extensionHandler);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@@ -0,0 +1,264 @@
/*
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.http.impl;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SniHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.impl.cgbystrom.FlashPolicyHandler;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.impl.HandlerHolder;
import io.vertx.core.net.impl.SSLHelper;
import io.vertx.core.net.impl.SslHandshakeCompletionHandler;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import java.util.function.Function;
/**
* A channel initializer that will takes care of configuring a blank channel for HTTP
* to Vert.x {@link io.vertx.core.http.HttpServerRequest}.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/public class HttpServerChannelInitializer extends ChannelInitializer<Channel> {
private final VertxInternal vertx;
private final SSLHelper sslHelper;
private final HttpServerOptions options;
private final String serverOrigin;
private final HttpServerMetrics metrics;
private final boolean logEnabled;
private final boolean disableH2C;
private final Function<EventLoop, HandlerHolder<? extends Handler<HttpServerConnection>>> connectionHandler;
private final Function<EventLoop, HandlerHolder<? extends Handler<Throwable>>> errorHandler;
public HttpServerChannelInitializer(VertxInternal vertx,
SSLHelper sslHelper,
HttpServerOptions options,
String serverOrigin,
HttpServerMetrics metrics,
boolean disableH2C,
Function<EventLoop, HandlerHolder<? extends Handler<HttpServerConnection>>> connectionHandler,
Function<EventLoop, HandlerHolder<? extends Handler<Throwable>>> errorHandler) {
this.vertx = vertx;
this.sslHelper = sslHelper;
this.options = options;
this.serverOrigin = serverOrigin;
this.metrics = metrics;
this.logEnabled = options.getLogActivity();
this.disableH2C = disableH2C;
this.connectionHandler = connectionHandler;
this.errorHandler = errorHandler;
}
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslHelper.isSSL()) {
ch.pipeline().addFirst("handshaker", new SslHandshakeCompletionHandler(ar -> {
if (ar.succeeded()) {
if (options.isUseAlpn()) {
SslHandler sslHandler = pipeline.get(SslHandler.class);
String protocol = sslHandler.applicationProtocol();
if ("h2".equals(protocol)) {
handleHttp2(ch);
} else {
handleHttp1(ch);
}
} else {
handleHttp1(ch);
}
} else {
handleException(ch, ar.cause());
}
}));
if (options.isSni()) {
SniHandler sniHandler = new SniHandler(sslHelper.serverNameMapper(vertx));
pipeline.addFirst(sniHandler);
} else {
SslHandler handler = new SslHandler(sslHelper.createEngine(vertx));
pipeline.addFirst("ssl", handler);
}
} else {
if (disableH2C) {
handleHttp1(ch);
} else {
IdleStateHandler idle;
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", idle = new IdleStateHandler(0, 0, options.getIdleTimeout(), options.getIdleTimeoutUnit()));
} else {
idle = null;
}
// Handler that detects whether the HTTP/2 connection preface or just process the request
// with the HTTP 1.x pipeline to support H2C with prior knowledge, i.e a client that connects
// and uses HTTP/2 in clear text directly without an HTTP upgrade.
pipeline.addLast(new Http1xOrH2CHandler() {
@Override
protected void configure(ChannelHandlerContext ctx, boolean h2c) {
if (idle != null) {
// It will be re-added but this way we don't need to pay attention to order
pipeline.remove(idle);
}
if (h2c) {
handleHttp2(ctx.channel());
} else {
handleHttp1(ch);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) {
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
handleException(ch, cause);
}
});
}
}
}
private void handleException(Channel ch, Throwable cause) {
HandlerHolder<? extends Handler<Throwable>> holder = errorHandler.apply(ch.eventLoop());
if (holder != null) {
holder.context.executeFromIO(cause, holder.handler);
}
}
private void handleHttp1(Channel ch) {
HandlerHolder<? extends Handler<HttpServerConnection>> holder = connectionHandler.apply(ch.eventLoop());
if (holder == null) {
sendServiceUnavailable(ch);
return;
}
configureHttp1OrH2C(ch.pipeline(), holder);
}
private void sendServiceUnavailable(Channel ch) {
ch.writeAndFlush(
Unpooled.copiedBuffer("HTTP/1.1 503 Service Unavailable\r\n" +
"Content-Length:0\r\n" +
"\r\n", StandardCharsets.ISO_8859_1))
.addListener(ChannelFutureListener.CLOSE);
}
private void handleHttp2(Channel ch) {
HandlerHolder<? extends Handler<HttpServerConnection>> holder = connectionHandler.apply(ch.eventLoop());
if (holder == null) {
ch.close();
return;
}
VertxHttp2ConnectionHandler<Http2ServerConnection> handler = buildHttp2ConnectionHandler(holder.context, holder.handler);
ch.pipeline().addLast("handler", handler);
configureHttp2(ch.pipeline());
}
void configureHttp2(ChannelPipeline pipeline) {
if (options.getIdleTimeout() > 0) {
pipeline.addBefore("handler", "idle", new IdleStateHandler(0, 0, options.getIdleTimeout(), options.getIdleTimeoutUnit()));
}
}
VertxHttp2ConnectionHandler<Http2ServerConnection> buildHttp2ConnectionHandler(ContextInternal ctx, Handler<HttpServerConnection> handler_) {
VertxHttp2ConnectionHandler<Http2ServerConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ServerConnection>()
.server(true)
.useCompression(options.isCompressionSupported())
.useDecompression(options.isDecompressionSupported())
.compressionLevel(options.getCompressionLevel())
.initialSettings(options.getInitialSettings())
.connectionFactory(connHandler -> new Http2ServerConnection(ctx, serverOrigin, connHandler, options, metrics))
.logEnabled(logEnabled)
.build();
handler.addHandler(conn -> {
if (metrics != null) {
conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName()));
}
if (options.getHttp2ConnectionWindowSize() > 0) {
conn.setWindowSize(options.getHttp2ConnectionWindowSize());
}
ctx.executeFromIO(conn, handler_);
});
return handler;
}
private void configureHttp1OrH2C(ChannelPipeline pipeline, HandlerHolder<? extends Handler<HttpServerConnection>> holder) {
if (logEnabled) {
pipeline.addLast("logging", new LoggingHandler());
}
if (HttpServerImpl.USE_FLASH_POLICY_HANDLER) {
pipeline.addLast("flashpolicy", new FlashPolicyHandler());
}
pipeline.addLast("httpDecoder", new VertxHttpRequestDecoder(options));
pipeline.addLast("httpEncoder", new VertxHttpResponseEncoder());
if (options.isDecompressionSupported()) {
pipeline.addLast("inflater", new HttpContentDecompressor(false));
}
if (options.isCompressionSupported()) {
pipeline.addLast("deflater", new HttpChunkContentCompressor(options.getCompressionLevel()));
}
if (sslHelper.isSSL() || options.isCompressionSupported()) {
// only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used.
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support
}
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", new IdleStateHandler(0, 0, options.getIdleTimeout(), options.getIdleTimeoutUnit()));
}
if (disableH2C) {
configureHttp1(pipeline, holder);
} else {
pipeline.addLast("h2c", new Http1xUpgradeToH2CHandler(this, holder));
}
}
void configureHttp1(ChannelPipeline pipeline, HandlerHolder<? extends Handler<HttpServerConnection>> holder) {
VertxHandler<Http1xServerConnection> handler = VertxHandler.create(holder.context, chctx -> {
Http1xServerConnection conn = new Http1xServerConnection(holder.context.owner(),
sslHelper,
options,
chctx,
holder.context,
serverOrigin,
metrics);
return conn;
});
pipeline.addLast("handler", handler);
Http1xServerConnection conn = handler.getConnection();
if (metrics != null) {
holder.context.executeFromIO(v -> conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName())));
}
holder.context.executeFromIO(conn, holder.handler);
}
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.http.impl;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServerRequest;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public interface HttpServerConnection extends HttpConnection {
Channel channel();
ChannelHandlerContext channelHandlerContext();
@Fluent
HttpServerConnection handler(Handler<HttpServerRequest> handler);
}

View File

@@ -12,29 +12,14 @@
package io.vertx.core.http.impl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFrameServerExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SniHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.cgbystrom.FlashPolicyHandler;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
@@ -48,8 +33,6 @@ import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.core.streams.ReadStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -65,25 +48,25 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
static final Logger log = LoggerFactory.getLogger(HttpServerImpl.class);
private static final Handler<Throwable> DEFAULT_EXCEPTION_HANDLER = t -> log.trace("Connection failure", t);
private static final String FLASH_POLICY_HANDLER_PROP_NAME = "vertx.flashPolicyHandler";
private static final boolean USE_FLASH_POLICY_HANDLER = Boolean.getBoolean(FLASH_POLICY_HANDLER_PROP_NAME);
private static final String DISABLE_WEBSOCKETS_PROP_NAME = "vertx.disableWebsockets";
private static final boolean DISABLE_WEBSOCKETS = Boolean.getBoolean(DISABLE_WEBSOCKETS_PROP_NAME);
private static final String DISABLE_H2C_PROP_NAME = "vertx.disableH2c";
private final boolean DISABLE_H2C = Boolean.getBoolean(DISABLE_H2C_PROP_NAME);
private final HttpServerOptions options;
private final VertxInternal vertx;
private static final String FLASH_POLICY_HANDLER_PROP_NAME = "vertx.flashPolicyHandler";
private static final String DISABLE_WEBSOCKETS_PROP_NAME = "vertx.disableWebsockets";
private static final String DISABLE_H2C_PROP_NAME = "vertx.disableH2c";
static final boolean USE_FLASH_POLICY_HANDLER = Boolean.getBoolean(FLASH_POLICY_HANDLER_PROP_NAME);
static final boolean DISABLE_WEBSOCKETS = Boolean.getBoolean(DISABLE_WEBSOCKETS_PROP_NAME);
final HttpServerOptions options;
final VertxInternal vertx;
private final SSLHelper sslHelper;
private final ContextInternal creatingContext;
private final Map<Channel, ConnectionBase> connectionMap = new ConcurrentHashMap<>();
private final boolean disableH2c = Boolean.getBoolean(DISABLE_H2C_PROP_NAME);
final Map<Channel, ConnectionBase> connectionMap = new ConcurrentHashMap<>();
private final VertxEventLoopGroup availableWorkers = new VertxEventLoopGroup();
private final HandlerManager<HttpHandlers> httpHandlerMgr = new HandlerManager<>(availableWorkers);
private final HttpStreamHandler<ServerWebSocket> wsStream = new HttpStreamHandler<>();
private final HttpStreamHandler<HttpServerRequest> requestStream = new HttpStreamHandler<>();
private Handler<HttpConnection> connectionHandler;
private final String subProtocols;
private String serverOrigin;
private ChannelGroup serverChannelGroup;
private volatile boolean listening;
@@ -92,8 +75,7 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
private HttpServerImpl actualServer;
private volatile int actualPort;
private ContextInternal listenContext;
private HttpServerMetrics metrics;
private boolean logEnabled;
HttpServerMetrics metrics;
private Handler<Throwable> exceptionHandler;
public HttpServerImpl(VertxInternal vertx, HttpServerOptions options) {
@@ -104,8 +86,6 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
creatingContext.addCloseHook(this);
}
this.sslHelper = new SSLHelper(options, options.getKeyCertOptions(), options.getTrustOptions());
this.subProtocols = options.getWebsocketSubProtocols();
this.logEnabled = options.getLogActivity();
}
@Override
@@ -200,6 +180,36 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
return promise.future();
}
private ChannelHandler childHandler(SocketAddress address, String serverOrigin) {
VertxMetrics vertxMetrics = vertx.metricsSPI();
this.metrics = vertxMetrics != null ? vertxMetrics.createHttpServerMetrics(options, address) : null;
return new HttpServerChannelInitializer(
vertx,
sslHelper,
options,
serverOrigin,
metrics,
disableH2c,
httpHandlerMgr::chooseHandler,
eventLoop -> {
HandlerHolder<HttpHandlers> holder = httpHandlerMgr.chooseHandler(eventLoop);
if (holder != null && holder.handler.exceptionHandler != null) {
return new HandlerHolder<>(holder.context, holder.handler.exceptionHandler);
} else {
return null;
}
}) {
@Override
protected void initChannel(Channel ch) {
if (!requestStream.accept() || !wsStream.accept()) {
ch.close();
} else {
super.initChannel(ch);
}
}
};
}
public synchronized HttpServer listen(SocketAddress address, Handler<AsyncResult<HttpServer>> listenHandler) {
if (requestStream.handler() == null && wsStream.handler() == null) {
throw new IllegalStateException("Set request or websocket handler first");
@@ -211,7 +221,6 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
listening = true;
String host = address.host() != null ? address.host() : "localhost";
int port = address.port();
serverOrigin = (options.isSsl() ? "https" : "http") + "://" + host + ":" + port;
List<HttpVersion> applicationProtocols = options.getAlpnVersions();
if (listenContext.isWorkerContext()) {
applicationProtocols = applicationProtocols.stream().filter(v -> v != HttpVersion.HTTP_2).collect(Collectors.toList());
@@ -227,86 +236,8 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers);
applyConnectionOptions(address.path() != null, bootstrap);
sslHelper.validate(vertx);
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
if (!requestStream.accept() || !wsStream.accept()) {
ch.close();
return;
}
ChannelPipeline pipeline = ch.pipeline();
if (sslHelper.isSSL()) {
ch.pipeline().addFirst("handshaker", new SslHandshakeCompletionHandler(ar -> {
if (ar.succeeded()) {
if (options.isUseAlpn()) {
SslHandler sslHandler = pipeline.get(SslHandler.class);
String protocol = sslHandler.applicationProtocol();
if ("h2".equals(protocol)) {
handleHttp2(ch);
} else {
handleHttp1(ch);
}
} else {
handleHttp1(ch);
}
} else {
HandlerHolder<HttpHandlers> handler = httpHandlerMgr.chooseHandler(ch.eventLoop());
handler.context.executeFromIO(ar.cause(), handler.handler.exceptionHandler);
}
}));
if (options.isSni()) {
SniHandler sniHandler = new SniHandler(sslHelper.serverNameMapper(vertx));
pipeline.addFirst(sniHandler);
} else {
SslHandler handler = new SslHandler(sslHelper.createEngine(vertx));
pipeline.addFirst("ssl", handler);
}
} else {
if (DISABLE_H2C) {
handleHttp1(ch);
} else {
IdleStateHandler idle;
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", idle = new IdleStateHandler(0, 0, options.getIdleTimeout(), options.getIdleTimeoutUnit()));
} else {
idle = null;
}
// Handler that detects whether the HTTP/2 connection preface or just process the request
// with the HTTP 1.x pipeline to support H2C with prior knowledge, i.e a client that connects
// and uses HTTP/2 in clear text directly without an HTTP upgrade.
pipeline.addLast(new Http1xOrH2CHandler() {
@Override
protected void configure(ChannelHandlerContext ctx, boolean h2c) {
if (idle != null) {
// It will be re-added but this way we don't need to pay attention to order
pipeline.remove(idle);
}
if (h2c) {
handleHttp2(ctx.channel());
} else {
handleHttp1(ch);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) {
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
HandlerHolder<HttpHandlers> handler = httpHandlerMgr.chooseHandler(ctx.channel().eventLoop());
handler.context.executeFromIO(v -> handler.handler.exceptionHandler.handle(cause));
}
});
}
}
}
});
String serverOrigin = (options.isSsl() ? "https" : "http") + "://" + host + ":" + port;
bootstrap.childHandler(childHandler(address, serverOrigin));
addHandlers(this, listenContext);
try {
bindFuture = AsyncResolveConnectHelper.doBind(vertx, address, bootstrap);
@@ -321,8 +252,6 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
HttpServerImpl.this.actualPort = address.port();
}
serverChannelGroup.add(serverChannel);
VertxMetrics metrics = vertx.metricsSPI();
this.metrics = metrics != null ? metrics.createHttpServerMetrics(options, address) : null;
}
});
} catch (final Throwable t) {
@@ -358,6 +287,10 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
listenContext.runOnContext((v) -> listenHandler.handle(res));
} else if (future.failed()) {
listening = false;
if (metrics != null) {
metrics.close();
metrics = null;
}
// No handler - log so user can see failure
log.error(future.cause());
}
@@ -366,149 +299,6 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
return this;
}
VertxHttp2ConnectionHandler<Http2ServerConnection> buildHttp2ConnectionHandler(HandlerHolder<HttpHandlers> holder) {
VertxHttp2ConnectionHandler<Http2ServerConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ServerConnection>()
.server(true)
.useCompression(options.isCompressionSupported())
.useDecompression(options.isDecompressionSupported())
.compressionLevel(options.getCompressionLevel())
.initialSettings(options.getInitialSettings())
.connectionFactory(connHandler -> new Http2ServerConnection(holder.context, serverOrigin, connHandler, options, holder.handler.requestHandler, metrics))
.logEnabled(logEnabled)
.build();
handler.addHandler(conn -> {
connectionMap.put(conn.channel(), conn);
if (metrics != null) {
conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName()));
}
if (options.getHttp2ConnectionWindowSize() > 0) {
conn.setWindowSize(options.getHttp2ConnectionWindowSize());
}
if (holder.handler.connectionHandler != null) {
holder.context.executeFromIO(v -> {
holder.handler.connectionHandler.handle(conn);
});
}
});
handler.removeHandler(conn -> {
connectionMap.remove(conn.channel());
});
return handler;
}
private void configureHttp1(ChannelPipeline pipeline, HandlerHolder<HttpHandlers> holder) {
if (logEnabled) {
pipeline.addLast("logging", new LoggingHandler());
}
if (USE_FLASH_POLICY_HANDLER) {
pipeline.addLast("flashpolicy", new FlashPolicyHandler());
}
pipeline.addLast("httpDecoder", new VertxHttpRequestDecoder(options));
pipeline.addLast("httpEncoder", new VertxHttpResponseEncoder());
if (options.isDecompressionSupported()) {
pipeline.addLast("inflater", new HttpContentDecompressor(false));
}
if (options.isCompressionSupported()) {
pipeline.addLast("deflater", new HttpChunkContentCompressor(options.getCompressionLevel()));
}
if (sslHelper.isSSL() || options.isCompressionSupported()) {
// only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used.
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support
}
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", new IdleStateHandler(0, 0, options.getIdleTimeout(), options.getIdleTimeoutUnit()));
}
if (!DISABLE_H2C) {
pipeline.addLast("h2c", new Http1xUpgradeToH2CHandler(this, httpHandlerMgr));
}
if (DISABLE_WEBSOCKETS) {
// As a performance optimisation you can set a system property to disable websockets altogether which avoids
// some casting and a header check
} else {
holder = new HandlerHolder<>(holder.context, new HttpHandlers(
this,
new WebSocketRequestHandler(metrics, holder.handler),
holder.handler.wsHandler,
holder.handler.connectionHandler,
holder.handler.exceptionHandler));
initializeWebsocketExtensions (pipeline);
}
HandlerHolder<HttpHandlers> holder2 = holder;
VertxHandler<Http1xServerConnection> handler = VertxHandler.create(holder2.context, chctx -> {
Http1xServerConnection conn = new Http1xServerConnection(holder2.context.owner(),
sslHelper,
options,
chctx,
holder2.context,
serverOrigin,
holder2.handler,
metrics);
if (metrics != null) {
holder2.context.executeFromIO(v -> conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName())));
}
return conn;
});
handler.addHandler(conn -> {
connectionMap.put(pipeline.channel(), conn);
});
handler.removeHandler(conn -> {
connectionMap.remove(pipeline.channel());
});
pipeline.addLast("handler", handler);
}
private void initializeWebsocketExtensions(ChannelPipeline pipeline) {
ArrayList<WebSocketServerExtensionHandshaker> extensionHandshakers = new ArrayList<>();
if (options.getPerFrameWebsocketCompressionSupported()) {
extensionHandshakers.add(new DeflateFrameServerExtensionHandshaker(options.getWebsocketCompressionLevel()));
}
if (options.getPerMessageWebsocketCompressionSupported()) {
extensionHandshakers.add(new PerMessageDeflateServerExtensionHandshaker(options.getWebsocketCompressionLevel(),
ZlibCodecFactory.isSupportingWindowSizeAndMemLevel(), PerMessageDeflateServerExtensionHandshaker.MAX_WINDOW_SIZE,
options.getWebsocketAllowServerNoContext(), options.getWebsocketPreferredClientNoContext()));
}
if (!extensionHandshakers.isEmpty()) {
WebSocketServerExtensionHandler extensionHandler = new WebSocketServerExtensionHandler(
extensionHandshakers.toArray(new WebSocketServerExtensionHandshaker[extensionHandshakers.size()]));
pipeline.addLast("websocketExtensionHandler", extensionHandler);
}
}
private void handleHttp1(Channel ch) {
HandlerHolder<HttpHandlers> holder = httpHandlerMgr.chooseHandler(ch.eventLoop());
if (holder == null) {
sendServiceUnavailable(ch);
return;
}
configureHttp1(ch.pipeline(), holder);
}
private void sendServiceUnavailable(Channel ch) {
ch.writeAndFlush(
Unpooled.copiedBuffer("HTTP/1.1 503 Service Unavailable\r\n" +
"Content-Length:0\r\n" +
"\r\n", StandardCharsets.ISO_8859_1))
.addListener(ChannelFutureListener.CLOSE);
}
private void handleHttp2(Channel ch) {
HandlerHolder<HttpHandlers> holder = httpHandlerMgr.chooseHandler(ch.eventLoop());
if (holder == null) {
ch.close();
return;
}
VertxHttp2ConnectionHandler<Http2ServerConnection> handler = buildHttp2ConnectionHandler(holder);
ch.pipeline().addLast("handler", handler);
configureHttp2(ch.pipeline());
}
void configureHttp2(ChannelPipeline pipeline) {
if (options.getIdleTimeout() > 0) {
pipeline.addBefore("handler", "idle", new IdleStateHandler(0, 0, options.getIdleTimeout(), options.getIdleTimeoutUnit()));
}
}
/**
* Internal method that closes all servers when Vert.x is closing
*/
@@ -675,7 +465,7 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
In practice synchronized overhead should be close to zero assuming most access is from the same thread due
to biased locks
*/
private class HttpStreamHandler<C extends ReadStream<Buffer>> implements ReadStream<C> {
class HttpStreamHandler<C extends ReadStream<Buffer>> implements ReadStream<C> {
private Handler<C> handler;
private long demand = Long.MAX_VALUE;

View File

@@ -231,16 +231,18 @@ public class HttpServerHandlerBenchmark extends BenchmarkBase {
.add(HEADER_CONTENT_LENGTH, HELLO_WORLD_LENGTH);
response.end(HELLO_WORLD_BUFFER);
};
HandlerHolder<HttpHandlers> holder = new HandlerHolder<>(context, new HttpHandlers(null, app, null, null, null));
VertxHandler<Http1xServerConnection> handler = VertxHandler.create(holder.context, chctx -> new Http1xServerConnection(
holder.context.owner(),
null,
new HttpServerOptions(),
chctx,
holder.context,
"localhost",
holder.handler,
null));
VertxHandler<Http1xServerConnection> handler = VertxHandler.create(context, chctx -> {
Http1xServerConnection conn = new Http1xServerConnection(
context.owner(),
null,
new HttpServerOptions(),
chctx,
context,
"localhost",
null);
conn.handler(app);
return conn;
});
vertxChannel.pipeline().addLast("handler", handler);
nettyChannel = new EmbeddedChannel(new HttpRequestDecoder(

View File

@@ -2937,14 +2937,16 @@ public class Http1xTest extends HttpTest {
@Test
public void testServerExceptionHandler() throws Exception {
Context serverCtx = vertx.getOrCreateContext();
server.exceptionHandler(err -> {
assertSame(serverCtx, Vertx.currentContext());
assertTrue(err instanceof TooLongFrameException);
testComplete();
});
server.requestHandler(req -> {
fail();
});
startServer(testAddress);
startServer(testAddress, serverCtx);
HttpClientRequest req = client.request(HttpMethod.POST, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> {
});
req.putHeader("the_header", TestUtils.randomAlphaString(10000));

View File

@@ -11,6 +11,8 @@
package io.vertx.core.http;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.impl.HttpServerImpl;
import io.vertx.core.net.NetClientOptions;
@@ -40,13 +42,17 @@ public class HttpConnectionEarlyResetTest extends VertxTestBase {
public void setUp() throws Exception {
super.setUp();
CountDownLatch listenLatch = new CountDownLatch(1);
Context ctx = vertx.getOrCreateContext();
httpServer = vertx.createHttpServer()
.requestHandler(request -> {})
.exceptionHandler(t -> {
assertSame(ctx, Vertx.currentContext());
caught.set(t);
resetLatch.countDown();
})
.listen(8080, onSuccess(server -> listenLatch.countDown()));
});
ctx.runOnContext(v -> {
httpServer.listen(8080, onSuccess(server -> listenLatch.countDown()));
});
awaitLatch(listenLatch);
}

View File

@@ -0,0 +1,59 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.http;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ServerChannel;
import io.vertx.core.http.impl.HttpServerChannelInitializer;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.impl.HandlerHolder;
import io.vertx.core.net.impl.SSLHelper;
import io.vertx.core.net.impl.transport.Transport;
import io.vertx.test.core.VertxTestBase;
import org.junit.Test;
public class HttpServerChannelInitializerTest extends VertxTestBase {
@Test
public void testHttpServer() throws Exception {
VertxInternal vertx = (VertxInternal) this.vertx;
Transport transport = vertx.transport();
ChannelFactory<? extends ServerChannel> factory = transport.serverChannelFactory(false);
ServerBootstrap bs = new ServerBootstrap();
bs.group(vertx.getAcceptorEventLoopGroup(), this.vertx.nettyEventLoopGroup());
bs.channelFactory(factory);
bs.childHandler(new HttpServerChannelInitializer(
vertx,
new SSLHelper(new HttpServerOptions(), null, null),
new HttpServerOptions(),
"http://localhost:8080",
null,
false,
eventLoop -> new HandlerHolder<>(vertx.createEventLoopContext(eventLoop, null, null), conn -> {
conn.handler(req -> {
req.response().end("Hello World");
});
}),
eventLoop -> new HandlerHolder<>(vertx.createEventLoopContext(eventLoop, null, null), this::fail))
);
ChannelFuture bind = bs.bind(HttpTest.DEFAULT_HTTP_HOST, HttpTest.DEFAULT_HTTP_PORT);
bind.sync();
HttpClient client = this.vertx.createHttpClient();
client.get(HttpTest.DEFAULT_HTTP_PORT, HttpTest.DEFAULT_HTTP_HOST, "/", resp -> {
testComplete();
}).exceptionHandler(this::fail).end();
await();
}
}