Make sure that the HttpServer connection handler is always called on the event-loop context (even for worker contexts)

This commit is contained in:
Julien Viet
2019-12-03 23:48:30 +01:00
parent 4f23ce260d
commit abb2de2160
6 changed files with 49 additions and 19 deletions

View File

@@ -62,6 +62,8 @@ public interface HttpServer extends Measured {
/**
* Set a connection handler for the server.
* <br/>
* The handler will always be called on the event-loop thread.
*
* @return a reference to this, so the API can be used fluently
*/

View File

@@ -21,6 +21,7 @@ import io.vertx.core.Handler;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.impl.ConnectionBase;
import java.util.ArrayList;
@@ -72,7 +73,16 @@ public class HttpHandlers implements Handler<HttpServerConnection> {
conn.exceptionHandler(exceptionHandler);
conn.handler(requestHandler);
if (connectionHandler != null) {
connectionHandler.handle(conn);
// We hand roll event-loop execution in case of a worker context
ContextInternal ctx = conn.getContext();
ContextInternal prev = ctx.beginEmission();
try {
connectionHandler.handle(conn);
} catch (Exception e) {
ctx.reportException(e);
} finally {
ctx.endEmission(prev);
}
}
}

View File

@@ -205,7 +205,7 @@ import java.util.function.Function;
if (options.getHttp2ConnectionWindowSize() > 0) {
conn.setWindowSize(options.getHttp2ConnectionWindowSize());
}
ctx.dispatchFromIO(conn, handler_);
handler_.handle(conn);
});
return handler;
}
@@ -255,9 +255,6 @@ import java.util.function.Function;
if (metrics != null) {
conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName()));
}
holder.context.dispatchFromIO(conn, holder.handler);
holder.handler.handle(conn);
}
}

View File

@@ -16,17 +16,19 @@ import io.vertx.codegen.annotations.Fluent;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.impl.ContextInternal;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public interface HttpServerConnection extends HttpConnection {
ContextInternal getContext();
Channel channel();
ChannelHandlerContext channelHandlerContext();
@Fluent
HttpServerConnection handler(Handler<HttpServerRequest> handler);
}

View File

@@ -17,6 +17,7 @@ import io.vertx.core.impl.ContextInternal;
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class HandlerHolder<T> {
public final ContextInternal context;
public final T handler;

View File

@@ -3316,7 +3316,11 @@ public abstract class HttpTest extends HttpTestBase {
@Test
public void testWorkerServer() throws Exception {
int numReq = 5; // 5 == the HTTP/1 pool max size
waitFor(numReq);
CyclicBarrier barrier = new CyclicBarrier(numReq);
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger connCount = new AtomicInteger();
vertx.deployVerticle(() -> new AbstractVerticle() {
@Override
public void start(Promise<Void> startPromise) {
@@ -3325,25 +3329,39 @@ public abstract class HttpTest extends HttpTestBase {
Context current = Vertx.currentContext();
assertTrue(current.isWorkerContext());
assertSameEventLoop(context, current);
try {
barrier.await(20, TimeUnit.SECONDS);
} catch (Exception e) {
fail(e);
}
req.response().end("pong");
}).listen()
}).connectionHandler(conn -> {
Context current = Vertx.currentContext();
assertTrue(Context.isOnEventLoopThread());
assertTrue(current.isWorkerContext());
assertSame(context, current);
connCount.incrementAndGet(); // No complete here as we may have 1 or 5 connections depending on the protocol
}).listen()
.<Void>mapEmpty()
.onComplete(startPromise);
}
}, new DeploymentOptions().setWorker(true), onSuccess(id -> latch.countDown()));
awaitLatch(latch);
client.request(
HttpMethod.GET,
testAddress,
new RequestOptions()
.setPort(DEFAULT_HTTP_PORT)
.setHost(DEFAULT_HTTP_HOST)
.setURI(DEFAULT_TEST_URI),
onSuccess(resp -> {
testComplete();
})
).end();
for (int i = 0;i < numReq;i++) {
client.request(
HttpMethod.GET,
testAddress,
new RequestOptions()
.setPort(DEFAULT_HTTP_PORT)
.setHost(DEFAULT_HTTP_HOST)
.setURI(DEFAULT_TEST_URI),
onSuccess(resp -> {
complete();
})
).end();
}
await();
assertTrue(connCount.get() > 0);
}
@Test