Handle the websocket upgrade in WebSocketRequestHandler and remove the WebSocketServerHandler subclassing Http1xServerConnection

This commit is contained in:
Julien Viet
2018-08-25 10:00:00 +02:00
parent 13da168022
commit 99afbbced7
7 changed files with 120 additions and 106 deletions

View File

@@ -9,7 +9,7 @@
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -82,7 +82,7 @@ Set the list of DNS server addresses, an address is the IP of the dns server, f
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -117,7 +117,7 @@ Sets whether or not the current link is required.
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -236,7 +236,7 @@ Set whether Netty pooled buffers are enabled
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -262,7 +262,7 @@ Whether an existing file, empty directory, or link should be replaced. Defaults
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -314,7 +314,7 @@ Set the value of traffic class
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -347,7 +347,7 @@ Set the send timeout.
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -402,7 +402,7 @@ Set the maximum number of worker threads to be used by the Vert.x instance.
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -431,7 +431,7 @@ Set whether or not recursion is desired
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -580,7 +580,7 @@ Set whether Netty pooled buffers are enabled
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -596,7 +596,7 @@ Set whether Netty pooled buffers are enabled
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -622,7 +622,7 @@ Set the last stream id.
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -654,7 +654,7 @@ Set the HTTP/2 setting
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -897,7 +897,7 @@ Set the websocket compression server no context option
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1104,7 +1104,7 @@ Set the websocket subprotocols supported by the server.
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1136,7 +1136,7 @@ Set the websocket subprotocols supported by the server.
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1160,7 +1160,7 @@ Set the key store as a buffer
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1177,7 +1177,7 @@ Set whether metrics will be enabled on the Vert.x instance.
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1306,7 +1306,7 @@ Set whether Netty pooled buffers are enabled
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1425,7 +1425,7 @@ Set whether Netty pooled buffers are enabled
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1459,7 +1459,7 @@ Set the value of traffic class
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1506,7 +1506,7 @@ Set whether the file is to be opened for writing
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1526,7 +1526,7 @@ Set whether session cache is enabled in open SSL session server context
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1645,7 +1645,7 @@ Sets whether or not this option can receive a value.
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1711,7 +1711,7 @@ Set all the keys as a list of buffer
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1749,7 +1749,7 @@ Add a certificate value
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1772,7 +1772,7 @@ Set the store as a buffer
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1803,7 +1803,7 @@ Set proxy username.
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1829,7 +1829,7 @@ Set the request relative URI
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
@@ -1931,7 +1931,7 @@ Set whether Netty pooled buffers are enabled
++++
'''
[cols=">25%,75%"]
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description

View File

@@ -302,10 +302,10 @@ public class Http1xServerConnection extends Http1xConnectionBase implements Http
if (ws != null) {
return ws;
}
if (!(request.request instanceof FullHttpRequest)) {
if (!(request.getRequest() instanceof FullHttpRequest)) {
throw new IllegalStateException();
}
FullHttpRequest nettyReq = (FullHttpRequest) request.request;
FullHttpRequest nettyReq = (FullHttpRequest) request.getRequest();
WebSocketServerHandshaker handshaker = createHandshaker(nettyReq);
if (handshaker == null) {
throw new IllegalStateException("Can't upgrade this request");

View File

@@ -421,11 +421,9 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
if (!DISABLE_H2C) {
pipeline.addLast("h2c", new Http1xUpgradeToH2CHandler(this, httpHandlerMgr));
}
Http1xServerHandler handler;
if (DISABLE_WEBSOCKETS) {
// As a performance optimisation you can set a system property to disable websockets altogether which avoids
// some casting and a header check
handler = new Http1xServerHandler(sslHelper, options, serverOrigin, holder, metrics);
} else {
holder = new HandlerHolder<>(holder.context, new HttpHandlers(
new WebSocketRequestHandler(metrics, holder.handler),
@@ -433,8 +431,8 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
holder.handler.connectionHandler,
holder.handler.exceptionHandler));
initializeWebsocketExtensions (pipeline);
handler = new WebSocketServerHandler(sslHelper, options, serverOrigin, holder, metrics);
}
Http1xServerHandler handler = new Http1xServerHandler(sslHelper, options, serverOrigin, holder, metrics);
handler.addHandler(conn -> {
connectionMap.put(pipeline.channel(), conn);
});

View File

@@ -56,8 +56,8 @@ public class HttpServerRequestImpl implements HttpServerRequest {
private static final Logger log = LoggerFactory.getLogger(HttpServerRequestImpl.class);
private final Http1xServerConnection conn;
final DefaultHttpRequest request;
private DefaultHttpRequest request;
private io.vertx.core.http.HttpVersion version;
private io.vertx.core.http.HttpMethod method;
private String rawMethod;
@@ -92,6 +92,18 @@ public class HttpServerRequestImpl implements HttpServerRequest {
this.request = request;
}
DefaultHttpRequest getRequest() {
synchronized (conn) {
return request;
}
}
void setRequest(DefaultHttpRequest request) {
synchronized (conn) {
this.request = request;
}
}
private Queue<Buffer> pendingQueue() {
if (pending == null) {
pending = Queue.queue(conn.getContext(), 8);

View File

@@ -10,7 +10,11 @@
*/
package io.vertx.core.http.impl;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.spi.metrics.HttpServerMetrics;
@@ -35,19 +39,61 @@ public class WebSocketRequestHandler implements Handler<HttpServerRequest> {
@Override
public void handle(HttpServerRequest req) {
if (req.headers().contains(io.vertx.core.http.HttpHeaders.UPGRADE, io.vertx.core.http.HttpHeaders.WEBSOCKET, true)) {
ServerWebSocketImpl ws = ((Http1xServerConnection)req.connection()).createWebSocket((HttpServerRequestImpl) req);
if (METRICS_ENABLED && metrics != null) {
ws.setMetric(metrics.connected(((Http1xServerConnection)req.connection()).metric(), ws));
}
if (handlers.wsHandler != null) {
handlers.wsHandler.handle(ws);
if (!ws.isRejected()) {
ws.connectNow();
} else {
req.response().setStatusCode(ws.getRejectedStatus().code()).end();
handle((HttpServerRequestImpl) req);
} else {
handlers.requestHandler.handle(req);
}
}
/**
* Handle the request when a websocket upgrade header is present.
*/
private void handle(HttpServerRequestImpl req) {
Buffer body = Buffer.buffer();
boolean[] failed = new boolean[1];
req.handler(buff -> {
if (!failed[0]) {
body.appendBuffer(buff);
if (body.length() > 8192) {
failed[0] = true;
// Request Entity Too Large
HttpServerResponseImpl resp = req.response();
resp.setStatusCode(413).end();
resp.close();
}
}
});
req.endHandler(v -> {
if (!failed[0]) {
handle(req, body);
}
});
}
/**
* Handle the request once we have the full body.
*/
private void handle(HttpServerRequestImpl req, Buffer body) {
DefaultHttpRequest nettyReq = req.getRequest();
nettyReq = new DefaultFullHttpRequest(
nettyReq.protocolVersion(),
nettyReq.method(),
nettyReq.uri(),
body.getByteBuf(),
nettyReq.headers(),
EmptyHttpHeaders.INSTANCE
);
req.setRequest(nettyReq);
ServerWebSocketImpl ws = ((Http1xServerConnection)req.connection()).createWebSocket(req);
if (METRICS_ENABLED && metrics != null) {
ws.setMetric(metrics.connected(((Http1xServerConnection)req.connection()).metric(), ws));
}
if (handlers.wsHandler != null) {
handlers.wsHandler.handle(ws);
if (!ws.isRejected()) {
ws.connectNow();
} else {
handlers.requestHandler.handle(req);
req.response().setStatusCode(ws.getRejectedStatus().code()).end();
}
} else {
handlers.requestHandler.handle(req);

View File

@@ -1,59 +0,0 @@
/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.http.impl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.impl.HandlerHolder;
import io.vertx.core.net.impl.SSLHelper;
import io.vertx.core.spi.metrics.HttpServerMetrics;
public class WebSocketServerHandler extends Http1xServerHandler {
private FullHttpRequest wsRequest;
WebSocketServerHandler(SSLHelper sslHelper, HttpServerOptions options, String serverOrigin, HandlerHolder<HttpHandlers> holder, HttpServerMetrics metrics) {
super(sslHelper, options, serverOrigin, holder, metrics);
}
@Override
public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
final HttpRequest request = (HttpRequest) msg;
if (HttpServerImpl.log.isTraceEnabled()) {
HttpServerImpl.log.trace("Server received request: " + request.uri());
}
if (request.headers().contains(io.vertx.core.http.HttpHeaders.UPGRADE, io.vertx.core.http.HttpHeaders.WEBSOCKET, true)) {
if (wsRequest == null) {
if (request instanceof FullHttpRequest) {
super.channelRead(chctx, request);
} else {
wsRequest = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri());
wsRequest.headers().set(request.headers());
}
return;
}
}
} else if (msg instanceof HttpContent) {
if (wsRequest != null) {
wsRequest.content().writeBytes(((HttpContent) msg).content());
if (msg instanceof LastHttpContent) {
FullHttpRequest req = wsRequest;
wsRequest = null;
super.channelRead(chctx, req);
return;
}
}
}
super.channelRead(chctx, msg);
}
}

View File

@@ -1186,6 +1186,23 @@ public class WebsocketTest extends VertxTestBase {
await();
}
@Test
public void testRequestEntityTooLarge() {
String path = "/some/path";
server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT)).websocketHandler(ws -> fail());
server.listen(onSuccess(ar -> {
client.get(DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTPS_HOST, path, resp -> {
assertEquals(413, resp.statusCode());
resp.request().connection().closeHandler(v -> {
testComplete();
});
}).putHeader("Upgrade", "Websocket")
.putHeader("Connection", "Upgrade")
.end(TestUtils.randomBuffer(8192 + 1));
}));
await();
}
@Test
public void testWriteMessageHybi00() {
testWriteMessage(256, WebsocketVersion.V00);