From 8bf0bfa4cceec4be6eb99486c5d7d03e0e6cc262 Mon Sep 17 00:00:00 2001 From: purplefox Date: Fri, 20 Jun 2014 15:44:10 +0100 Subject: [PATCH] Work on threading model, plus more threading checks and improvements --- .gitignore | 1 + vertx-core/pom.xml | 3 + .../impl/DatagramChannelFutureListener.java | 13 +- .../datagram/impl/DatagramServerHandler.java | 17 +-- .../datagram/impl/DatagramSocketImpl.java | 11 +- .../java/core/dns/impl/DnsClientImpl.java | 27 ++--- .../java/core/eventbus/impl/EventBusImpl.java | 2 +- .../java/core/file/impl/AsyncFileImpl.java | 12 +- .../java/core/http/HttpServerRequest.java | 1 - .../java/core/http/impl/ClientConnection.java | 8 +- .../java/core/http/impl/HttpClientImpl.java | 112 +++++++++--------- .../java/core/http/impl/HttpServerImpl.java | 46 +++---- .../java/core/http/impl/VertxHttpHandler.java | 20 +--- .../vertx/java/core/impl/BlockingAction.java | 2 +- .../org/vertx/java/core/impl/ContextImpl.java | 92 ++++++++------ .../org/vertx/java/core/impl/ContextTask.java | 10 ++ .../java/core/impl/EventLoopContext.java | 39 +++++- .../core/impl/MultiThreadedWorkerContext.java | 9 +- .../org/vertx/java/core/impl/VertxImpl.java | 36 +++--- .../vertx/java/core/impl/WorkerContext.java | 15 ++- .../vertx/java/core/net/NetServerOptions.java | 2 - .../java/core/net/impl/ConnectionBase.java | 2 +- .../java/core/net/impl/NetClientImpl.java | 86 +++++++------- .../java/core/net/impl/NetServerImpl.java | 39 +++--- .../java/core/net/impl/NetSocketImpl.java | 20 +--- .../vertx/java/core/net/impl/SSLHelper.java | 2 +- .../java/core/net/impl/VertxHandler.java | 25 ++-- .../java/core/net/impl/VertxNetHandler.java | 21 +--- .../vertx/java/tests/core/DatagramTest.java | 4 - .../org/vertx/java/tests/core/HttpTest.java | 46 ++++--- .../org/vertx/java/tests/core/NetTest.java | 12 +- 31 files changed, 354 insertions(+), 381 deletions(-) create mode 100644 vertx-core/src/main/java/org/vertx/java/core/impl/ContextTask.java diff --git a/.gitignore b/.gitignore index b70684827..a3629d59a 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ ScratchTest.java test-results test-tmp *.class +ScratchPad.java diff --git a/vertx-core/pom.xml b/vertx-core/pom.xml index 62238f182..f517ac891 100644 --- a/vertx-core/pom.xml +++ b/vertx-core/pom.xml @@ -77,6 +77,9 @@ PARANOID -server -Xms128m -Xmx1024m -XX:NewRatio=2 + + **/ScratchPad.java + diff --git a/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramChannelFutureListener.java b/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramChannelFutureListener.java index 10c7aef04..f2574457b 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramChannelFutureListener.java +++ b/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramChannelFutureListener.java @@ -15,7 +15,6 @@ */ package org.vertx.java.core.datagram.impl; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.vertx.java.core.AsyncResult; @@ -42,17 +41,7 @@ final class DatagramChannelFutureListener implements ChannelFutureListener { @Override public void operationComplete(final ChannelFuture future) throws Exception { - Channel ch = future.channel(); - if (context.isOnCorrectWorker(ch.eventLoop())) { - try { - vertx.setContext(context); - notifyHandler(future); - } catch (Throwable t) { - context.reportException(t); - } - } else { - context.execute(() -> notifyHandler(future)); - } + context.execute(() -> notifyHandler(future), true); } private void notifyHandler(ChannelFuture future) { diff --git a/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramServerHandler.java b/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramServerHandler.java index b85887819..d495676d0 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramServerHandler.java +++ b/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramServerHandler.java @@ -47,22 +47,7 @@ final class DatagramServerHandler extends VertxHandler { @SuppressWarnings("unchecked") @Override protected void channelRead(final DatagramSocketImpl server, final ContextImpl context, ChannelHandlerContext chctx, final Object msg) throws Exception { - if (context.isOnCorrectWorker(chctx.channel().eventLoop())) { - try { - vertx.setContext(context); - server.handleMessage((org.vertx.java.core.datagram.DatagramPacket) msg); - } catch (Throwable t) { - context.reportException(t); - } - } else { - context.execute(() -> { - try { - server.handleMessage((org.vertx.java.core.datagram.DatagramPacket) msg); - } catch (Throwable t) { - context.reportException(t); - } - }); - } + context.execute(() -> server.handleMessage((org.vertx.java.core.datagram.DatagramPacket) msg), true); } @Override diff --git a/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramSocketImpl.java b/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramSocketImpl.java index 8c3960f04..0842094b2 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramSocketImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/datagram/impl/DatagramSocketImpl.java @@ -288,15 +288,6 @@ public class DatagramSocketImpl extends ConnectionBase private void notifyException(final Handler> handler, final Throwable cause) { - if (context.isOnCorrectWorker(channel().eventLoop())) { - try { - vertx.setContext(context); - handler.handle(new FutureResultImpl<>(cause)); - } catch (Throwable t) { - context.reportException(t); - } - } else { - context.execute(() -> handler.handle(new FutureResultImpl<>(cause))); - } + context.execute(() -> handler.handle(new FutureResultImpl<>(cause)), true); } } diff --git a/vertx-core/src/main/java/org/vertx/java/core/dns/impl/DnsClientImpl.java b/vertx-core/src/main/java/org/vertx/java/core/dns/impl/DnsClientImpl.java index 20317004b..1a07707f2 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/dns/impl/DnsClientImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/dns/impl/DnsClientImpl.java @@ -211,7 +211,7 @@ public final class DnsClientImpl implements DnsClient { } catch (final UnknownHostException e) { // Should never happen as we work with ip addresses as input // anyway just in case notify the handler - actualCtx.execute(() -> handler.handle(new FutureResultImpl<>(e))); + actualCtx.execute(() -> handler.handle(new FutureResultImpl<>(e)), false); } return this; } @@ -275,26 +275,13 @@ public final class DnsClientImpl implements DnsClient { if (r.complete()) { return; } - if (actualCtx.isOnCorrectWorker(loop)) { - try { - vertx.setContext(actualCtx); - if (result instanceof Throwable) { - r.setFailure((Throwable) result); - } else { - r.setResult(result); - } - } catch (Throwable t) { - actualCtx.reportException(t); + actualCtx.execute(() -> { + if (result instanceof Throwable) { + r.setFailure((Throwable) result); + } else { + r.setResult(result); } - } else { - actualCtx.execute(() -> { - if (result instanceof Throwable) { - r.setFailure((Throwable) result); - } else { - r.setResult(result); - } - }); - } + }, true); } private static final class HandlerAdapter implements Handler>> { diff --git a/vertx-core/src/main/java/org/vertx/java/core/eventbus/impl/EventBusImpl.java b/vertx-core/src/main/java/org/vertx/java/core/eventbus/impl/EventBusImpl.java index 7070acb3c..2210efe45 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/eventbus/impl/EventBusImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/eventbus/impl/EventBusImpl.java @@ -916,7 +916,7 @@ public class EventBusImpl implements EventBus { unregisterHandler(msg.address, holder.handler); } } - }); + }, false); } private void checkStarted() { diff --git a/vertx-core/src/main/java/org/vertx/java/core/file/impl/AsyncFileImpl.java b/vertx-core/src/main/java/org/vertx/java/core/file/impl/AsyncFileImpl.java index debca25a5..7421dab70 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/file/impl/AsyncFileImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/file/impl/AsyncFileImpl.java @@ -341,14 +341,14 @@ public class AsyncFileImpl implements AsyncFile { context.execute(() -> { writesOutstanding -= buff.limit(); handler.handle(new FutureResultImpl().setResult(null)); - }); + }, false); } } public void failed(Throwable exc, Object attachment) { if (exc instanceof Exception) { Exception e = (Exception) exc; - context.execute(() -> handler.handle(new FutureResultImpl().setResult(null))); + context.execute(() -> handler.handle(new FutureResultImpl().setResult(null)), false); } else { log.error("Error occurred", exc); } @@ -369,7 +369,7 @@ public class AsyncFileImpl implements AsyncFile { buff.flip(); writeBuff.setBytes(offset, buff); result.setResult(writeBuff).setHandler(handler); - }); + }, false); } public void completed(Integer bytesRead, Object attachment) { @@ -388,11 +388,7 @@ public class AsyncFileImpl implements AsyncFile { } public void failed(Throwable t, Object attachment) { - context.execute(new Runnable() { - public void run() { - result.setFailure(t).setHandler(handler); - } - }); + context.execute(() -> result.setFailure(t).setHandler(handler), false); } }); } diff --git a/vertx-core/src/main/java/org/vertx/java/core/http/HttpServerRequest.java b/vertx-core/src/main/java/org/vertx/java/core/http/HttpServerRequest.java index f07b6dcd9..a79f89e26 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/http/HttpServerRequest.java +++ b/vertx-core/src/main/java/org/vertx/java/core/http/HttpServerRequest.java @@ -25,7 +25,6 @@ import org.vertx.java.core.streams.ReadStream; import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.cert.X509Certificate; -import java.net.URI; /** * Represents a server-side HTTP request.

diff --git a/vertx-core/src/main/java/org/vertx/java/core/http/impl/ClientConnection.java b/vertx-core/src/main/java/org/vertx/java/core/http/impl/ClientConnection.java index efc6ba49f..0ed51593e 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/http/impl/ClientConnection.java +++ b/vertx-core/src/main/java/org/vertx/java/core/http/impl/ClientConnection.java @@ -149,18 +149,18 @@ class ClientConnection extends ConnectionBase { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - context.execute(ctx.channel().eventLoop(), () -> { + context.execute(() -> { // if still handshaking this means we not got any response back from the server and so need to notify the client // about it as otherwise the client would never been notified. if (handshaking) { handleException(new WebSocketHandshakeException("Connection closed while handshake in process")); } - }); + }, true); } @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { - context.execute(ctx.channel().eventLoop(), () -> { + context.execute(() -> { if (handshaker != null && handshaking) { if (msg instanceof HttpResponse) { HttpResponse resp = (HttpResponse) msg; @@ -197,7 +197,7 @@ class ClientConnection extends ConnectionBase { } else { buffered.add(msg); } - }); + }, true); } private void handleException(WebSocketHandshakeException e) { diff --git a/vertx-core/src/main/java/org/vertx/java/core/http/impl/HttpClientImpl.java b/vertx-core/src/main/java/org/vertx/java/core/http/impl/HttpClientImpl.java index 46cf143b5..67ec66780 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/http/impl/HttpClientImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/http/impl/HttpClientImpl.java @@ -35,7 +35,6 @@ import org.vertx.java.core.impl.VertxInternal; import org.vertx.java.core.net.NetSocket; import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator; import org.vertx.java.core.net.impl.SSLHelper; -import org.vertx.java.core.net.impl.VertxEventLoopGroup; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLHandshakeException; @@ -50,9 +49,8 @@ public class HttpClientImpl implements HttpClient { private final VertxInternal vertx; private final HttpClientOptions options; private final Map connectionMap = new ConcurrentHashMap<>(); - private final ContextImpl actualCtx; + private final ContextImpl creatingContext; private final ConnectionManager pool; - private Bootstrap bootstrap; private Handler exceptionHandler; private final Closeable closeHook; private boolean closed; @@ -62,16 +60,18 @@ public class HttpClientImpl implements HttpClient { this.vertx = vertx; this.options = new HttpClientOptions(options); this.sslHelper = new SSLHelper(options); - actualCtx = vertx.getOrCreateContext(); + this.creatingContext = vertx.getContext(); closeHook = doneHandler -> { HttpClientImpl.this.close(); doneHandler.handle(new FutureResultImpl<>((Void)null)); }; - actualCtx.addCloseHook(closeHook); + if (creatingContext != null) { + creatingContext.addCloseHook(closeHook); + } pool = new ConnectionManager(vertx) { protected void connect(String host, int port, Handler connectHandler, Handler connectErrorHandler, ContextImpl context, ConnectionLifeCycleListener listener) { - internalConnect(port, host, connectHandler, connectErrorHandler, listener); + internalConnect(context, port, host, connectHandler, connectErrorHandler, listener); } }; pool.setKeepAlive(options.isKeepAlive()); @@ -89,13 +89,14 @@ public class HttpClientImpl implements HttpClient { @Override public HttpClient connectWebsocket(WebSocketConnectOptions wsOptions, Handler wsConnect) { checkClosed(); + ContextImpl context = vertx.getOrCreateContext(); getConnection(wsOptions.getPort(), wsOptions.getHost(), conn -> { if (!conn.isClosed()) { conn.toWebSocket(wsOptions, wsOptions.getMaxWebsocketFrameSize(), wsConnect); } else { connectWebsocket(wsOptions, wsConnect); } - }, exceptionHandler, actualCtx); + }, exceptionHandler, context); return this; } @@ -167,7 +168,9 @@ public class HttpClientImpl implements HttpClient { for (ClientConnection conn : connectionMap.values()) { conn.close(); } - actualCtx.removeCloseHook(closeHook); + if (creatingContext != null) { + creatingContext.removeCloseHook(closeHook); + } closed = true; } @@ -221,38 +224,34 @@ public class HttpClientImpl implements HttpClient { bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); } - private void internalConnect(int port, String host, Handler connectHandler, Handler connectErrorHandler, - ConnectionLifeCycleListener listener) { - if (bootstrap == null) { - VertxEventLoopGroup pool = new VertxEventLoopGroup(); - pool.addWorker(actualCtx.getEventLoop()); - bootstrap = new Bootstrap(); - bootstrap.group(pool); - bootstrap.channel(NioSocketChannel.class); - sslHelper.checkSSL(vertx); - bootstrap.handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - if (options.isSsl()) { - SSLEngine engine = sslHelper.getSslContext().createSSLEngine(host, port); - if (options.isVerifyHost()) { - SSLParameters sslParameters = engine.getSSLParameters(); - sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); - engine.setSSLParameters(sslParameters); - } - engine.setUseClientMode(true); //We are on the client side of the connection - pipeline.addLast("ssl", new SslHandler(engine)); + private void internalConnect(ContextImpl context, int port, String host, Handler connectHandler, Handler connectErrorHandler, + ConnectionLifeCycleListener listener) { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(context.getEventLoop()); + bootstrap.channel(NioSocketChannel.class); + sslHelper.checkSSL(vertx); + bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + if (options.isSsl()) { + SSLEngine engine = sslHelper.getSslContext().createSSLEngine(host, port); + if (options.isVerifyHost()) { + SSLParameters sslParameters = engine.getSSLParameters(); + sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); + engine.setSSLParameters(sslParameters); } - - pipeline.addLast("codec", new HttpClientCodec(4096, 8192, 8192, false, false)); - if (options.isTryUseCompression()) { - pipeline.addLast("inflater", new HttpContentDecompressor(true)); - } - pipeline.addLast("handler", new ClientHandler()); + engine.setUseClientMode(true); //We are on the client side of the connection + pipeline.addLast("ssl", new SslHandler(engine)); } - }); - } + + pipeline.addLast("codec", new HttpClientCodec(4096, 8192, 8192, false, false)); + if (options.isTryUseCompression()) { + pipeline.addLast("inflater", new HttpContentDecompressor(true)); + } + pipeline.addLast("handler", new ClientHandler(context)); + } + }); applyConnectionOptions(bootstrap); ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); future.addListener(new ChannelFutureListener() { @@ -267,17 +266,17 @@ public class HttpClientImpl implements HttpClient { Future fut = sslHandler.handshakeFuture(); fut.addListener(future -> { if (future.isSuccess()) { - connected(port, host, ch, connectHandler, listener); + connected(context, port, host, ch, connectHandler, listener); } else { - connectionFailed(ch, connectErrorHandler, new SSLHandshakeException("Failed to create SSL connection"), + connectionFailed(context, ch, connectErrorHandler, new SSLHandshakeException("Failed to create SSL connection"), listener); } }); } else { - connected(port, host, ch, connectHandler, listener); + connected(context, port, host, ch, connectHandler, listener); } } else { - connectionFailed(ch, connectErrorHandler, channelFuture.cause(), listener); + connectionFailed(context, ch, connectErrorHandler, channelFuture.cause(), listener); } } }); @@ -285,7 +284,8 @@ public class HttpClientImpl implements HttpClient { private HttpClientRequest doRequest(String method, RequestOptions options, Handler responseHandler) { checkClosed(); - HttpClientRequest req = new HttpClientRequestImpl(this, method, options, responseHandler, actualCtx); + ContextImpl context = vertx.getOrCreateContext(); + HttpClientRequest req = new HttpClientRequestImpl(this, method, options, responseHandler, context); if (options.getHeaders() != null) { req.headers().set(options.getHeaders()); } @@ -298,13 +298,13 @@ public class HttpClientImpl implements HttpClient { } } - private void connected(int port, String host, Channel ch, Handler connectHandler, ConnectionLifeCycleListener listener) { - actualCtx.execute(ch.eventLoop(), () -> createConn(port, host, ch, connectHandler, listener)); + private void connected(ContextImpl context, int port, String host, Channel ch, Handler connectHandler, ConnectionLifeCycleListener listener) { + context.execute(() -> createConn(context, port, host, ch, connectHandler, listener), true); } - private void createConn(int port, String host, Channel ch, Handler connectHandler, ConnectionLifeCycleListener listener) { + private void createConn(ContextImpl context, int port, String host, Channel ch, Handler connectHandler, ConnectionLifeCycleListener listener) { ClientConnection conn = new ClientConnection(vertx, HttpClientImpl.this, ch, - options.isSsl(), host, port, actualCtx, listener); + options.isSsl(), host, port, context, listener); conn.closeHandler(v -> { // The connection has been closed - tell the pool about it, this allows the pool to create more // connections. Note the pool doesn't actually remove the connection, when the next person to get a connection @@ -315,14 +315,14 @@ public class HttpClientImpl implements HttpClient { connectHandler.handle(conn); } - private void connectionFailed(Channel ch, Handler connectionExceptionHandler, + private void connectionFailed(ContextImpl context, Channel ch, Handler connectionExceptionHandler, Throwable t, ConnectionLifeCycleListener listener) { // If no specific exception handler is provided, fall back to the HttpClient's exception handler. // If that doesn't exist just log it Handler exHandler = - connectionExceptionHandler == null ? (exceptionHandler == null ? actualCtx::reportException : exceptionHandler ): connectionExceptionHandler; + connectionExceptionHandler == null ? (exceptionHandler == null ? context::reportException : exceptionHandler ): connectionExceptionHandler; - actualCtx.execute(ch.eventLoop(), () -> { + context.execute(() -> { listener.connectionClosed(null); try { ch.close(); @@ -331,9 +331,9 @@ public class HttpClientImpl implements HttpClient { if (exHandler != null) { exHandler.handle(t); } else { - actualCtx.reportException(t); + context.reportException(t); } - }); + }, true); } private Handler connectHandler(Handler responseHandler) { @@ -384,7 +384,7 @@ public class HttpClientImpl implements HttpClient { public NetSocket netSocket() { if (!resumed) { resumed = true; - vertx.getContext().execute(socket::resume); // resume the socket now as the user had the chance to register a dataHandler + vertx.getContext().execute(socket::resume, false); // resume the socket now as the user had the chance to register a dataHandler } return socket; } @@ -428,14 +428,16 @@ public class HttpClientImpl implements HttpClient { private class ClientHandler extends VertxHttpHandler { private boolean closeFrameSent; + private ContextImpl context; - public ClientHandler() { + public ClientHandler(ContextImpl context) { super(vertx, HttpClientImpl.this.connectionMap); + this.context = context; } @Override protected ContextImpl getContext(ClientConnection connection) { - return actualCtx; + return context; } @Override diff --git a/vertx-core/src/main/java/org/vertx/java/core/http/impl/HttpServerImpl.java b/vertx-core/src/main/java/org/vertx/java/core/http/impl/HttpServerImpl.java index 31987c665..74944aa3b 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/http/impl/HttpServerImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/http/impl/HttpServerImpl.java @@ -72,7 +72,7 @@ public class HttpServerImpl implements HttpServer, Closeable { private final HttpServerOptions options; private final VertxInternal vertx; private final SSLHelper sslHelper; - private final ContextImpl actualCtx; + private final ContextImpl creatingContext; private final Map connectionMap = new ConcurrentHashMap<>(); private final VertxEventLoopGroup availableWorkers = new VertxEventLoopGroup(); private Handler requestHandler; @@ -86,12 +86,15 @@ public class HttpServerImpl implements HttpServer, Closeable { private HttpServerImpl actualServer; private HandlerManager reqHandlerManager = new HandlerManager<>(availableWorkers); private HandlerManager wsHandlerManager = new HandlerManager<>(availableWorkers); + private ContextImpl listenContext; public HttpServerImpl(VertxInternal vertx, HttpServerOptions options) { this.options = new HttpServerOptions(options); this.vertx = vertx; - actualCtx = vertx.getOrCreateContext(); - actualCtx.addCloseHook(this); + this.creatingContext = vertx.getContext(); + if (creatingContext != null) { + creatingContext.addCloseHook(this); + } sslHelper = new SSLHelper(options); } @@ -148,14 +151,14 @@ public class HttpServerImpl implements HttpServer, Closeable { bootstrap.option(ChannelOption.SO_BACKLOG, options.getAcceptBacklog()); } - public HttpServer listen(final Handler> listenHandler) { if (requestHandler == null && wsHandler == null) { throw new IllegalStateException("Set request or websocket handler first"); } if (listening) { - throw new IllegalStateException("Listen already called"); + throw new IllegalStateException("Already listening"); } + listenContext = vertx.getOrCreateContext(); listening = true; synchronized (vertx.sharedHttpServers()) { @@ -208,7 +211,7 @@ public class HttpServerImpl implements HttpServer, Closeable { } }); - addHandlers(this); + addHandlers(this, listenContext); try { bindFuture = bootstrap.bind(new InetSocketAddress(InetAddress.getByName(options.getHost()), options.getPort())); Channel serverChannel = bindFuture.channel(); @@ -227,7 +230,7 @@ public class HttpServerImpl implements HttpServer, Closeable { vertx.runOnContext(v -> listenHandler.handle(new FutureResultImpl<>(t))); } else { // No handler - log so user can see failure - actualCtx.reportException(t); + listenContext.reportException(t); } listening = false; return this; @@ -237,7 +240,7 @@ public class HttpServerImpl implements HttpServer, Closeable { } else { // Server already exists with that host/port - we will use that actualServer = shared; - addHandlers(actualServer); + addHandlers(actualServer, listenContext); } actualServer.bindFuture.addListener(new ChannelFutureListener() { @Override @@ -250,11 +253,11 @@ public class HttpServerImpl implements HttpServer, Closeable { res = new FutureResultImpl<>(future.cause()); listening = false; } - actualCtx.execute(future.channel().eventLoop(), () -> listenHandler.handle(res)); + listenContext.execute(() -> listenHandler.handle(res), true); } else if (!future.isSuccess()) { listening = false; // No handler - log so user can see failure - actualCtx.reportException(future.cause()); + listenContext.reportException(future.cause()); } } }); @@ -262,12 +265,12 @@ public class HttpServerImpl implements HttpServer, Closeable { return this; } - private void addHandlers(HttpServerImpl server) { + private void addHandlers(HttpServerImpl server, ContextImpl context) { if (requestHandler != null) { - server.reqHandlerManager.addHandler(requestHandler, actualCtx); + server.reqHandlerManager.addHandler(requestHandler, context); } if (wsHandler != null) { - server.wsHandlerManager.addHandler(wsHandler, actualCtx); + server.wsHandlerManager.addHandler(wsHandler, context); } } @@ -278,8 +281,9 @@ public class HttpServerImpl implements HttpServer, Closeable { @Override public void close(final Handler> done) { + ContextImpl context = vertx.getOrCreateContext(); if (!listening) { - executeCloseDone(actualCtx, done, null); + executeCloseDone(context, done, null); return; } listening = false; @@ -289,28 +293,30 @@ public class HttpServerImpl implements HttpServer, Closeable { if (actualServer != null) { if (requestHandler != null) { - actualServer.reqHandlerManager.removeHandler(requestHandler, actualCtx); + actualServer.reqHandlerManager.removeHandler(requestHandler, listenContext); } if (wsHandler != null) { - actualServer.wsHandlerManager.removeHandler(wsHandler, actualCtx); + actualServer.wsHandlerManager.removeHandler(wsHandler, listenContext); } if (actualServer.reqHandlerManager.hasHandlers() || actualServer.wsHandlerManager.hasHandlers()) { // The actual server still has handlers so we don't actually close it if (done != null) { - executeCloseDone(actualCtx, done, null); + executeCloseDone(context, done, null); } } else { // No Handlers left so close the actual server // The done handler needs to be executed on the context that calls close, NOT the context // of the actual server - actualServer.actualClose(actualCtx, done); + actualServer.actualClose(context, done); } } } requestHandler = null; wsHandler = null; - actualCtx.removeCloseHook(this); + if (creatingContext != null) { + creatingContext.removeCloseHook(this); + } } SSLHelper getSslHelper() { @@ -351,7 +357,7 @@ public class HttpServerImpl implements HttpServer, Closeable { private void executeCloseDone(final ContextImpl closeContext, final Handler> done, final Exception e) { if (done != null) { - closeContext.execute(() -> done.handle(new FutureResultImpl<>(e))); + closeContext.execute(() -> done.handle(new FutureResultImpl<>(e)), false); } } diff --git a/vertx-core/src/main/java/org/vertx/java/core/http/impl/VertxHttpHandler.java b/vertx-core/src/main/java/org/vertx/java/core/http/impl/VertxHttpHandler.java index c9026eff1..b71dcc87d 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/http/impl/VertxHttpHandler.java +++ b/vertx-core/src/main/java/org/vertx/java/core/http/impl/VertxHttpHandler.java @@ -56,26 +56,12 @@ public abstract class VertxHttpHandler extends VertxHa // we are reading from the channel Channel ch = chctx.channel(); // We need to do this since it's possible the server is being used from a worker context - if (context.isOnCorrectWorker(ch.eventLoop())) { - try { - vertx.setContext(context); - doMessageReceived(connection, chctx, msg); - } catch (Throwable t) { - context.reportException(t); - } - } else { - context.execute(() -> { - try { - doMessageReceived(connection, chctx, msg); - } catch (Throwable t) { - context.reportException(t); - } - }); - } + context.execute(() -> doMessageReceived(connection, chctx, msg), true); + } else { try { doMessageReceived(connection, chctx, msg); - } catch (Throwable t) { + } catch (Throwable t) { chctx.pipeline().fireExceptionCaught(t); } } diff --git a/vertx-core/src/main/java/org/vertx/java/core/impl/BlockingAction.java b/vertx-core/src/main/java/org/vertx/java/core/impl/BlockingAction.java index 525803c07..7ac82ff1a 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/impl/BlockingAction.java +++ b/vertx-core/src/main/java/org/vertx/java/core/impl/BlockingAction.java @@ -51,7 +51,7 @@ public abstract class BlockingAction { res.setFailure(e); } if (handler != null) { - context.execute(() -> res.setHandler(handler)); + context.execute(() -> res.setHandler(handler), false); } }); } diff --git a/vertx-core/src/main/java/org/vertx/java/core/impl/ContextImpl.java b/vertx-core/src/main/java/org/vertx/java/core/impl/ContextImpl.java index 8244961f6..4387495ab 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/impl/ContextImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/impl/ContextImpl.java @@ -126,31 +126,22 @@ public abstract class ContextImpl implements Context { } } - public abstract void execute(Runnable handler); + public abstract void doExecute(ContextTask task); - public abstract boolean isOnCorrectWorker(EventLoop worker); + public abstract boolean isEventLoopContext(); - // FIXME - make sure this is right and get rid of worker param - public void execute(EventLoop worker, Runnable handler) { - boolean correctThread; - Thread thread = Thread.currentThread(); - if (thread instanceof VertxThread) { - VertxThread vthread = (VertxThread)thread; - Context ctx = vthread.getContext(); - correctThread = ctx == this; + public void execute(ContextTask task, boolean expectRightThread) { + if (isOnCorrectContextThread(expectRightThread)) { + wrapTask(task, false, true).run(); } else { - correctThread = false; - } - if (correctThread) { - wrapTask(handler).run(); - } else { - //System.out.println("Wrong thread, will execute on correct one: " + Thread.currentThread()); - execute(handler); + doExecute(task); } } + protected abstract boolean isOnCorrectContextThread(boolean expectRightThread); + public void runOnContext(final Handler task) { - execute(() -> task.handle(null)); + execute(() -> task.handle(null), false); } public EventLoop getEventLoop() { @@ -159,8 +150,8 @@ public abstract class ContextImpl implements Context { // This executes the task in the worker pool using the ordered executor of the context // It's used e.g. from BlockingActions - protected void executeOnOrderedWorkerExec(final Runnable task) { - orderedBgExec.execute(wrapTask(task)); + protected void executeOnOrderedWorkerExec(ContextTask task) { + orderedBgExec.execute(wrapTask(task, false, false)); } public void close() { @@ -172,26 +163,49 @@ public abstract class ContextImpl implements Context { vertx.setContext(null); } - protected Runnable wrapTask(final Runnable task) { - return () -> { - Thread currentThread = Thread.currentThread(); - String threadName = currentThread.getName(); - try { - vertx.setContext(ContextImpl.this); - task.run(); - } catch (Throwable t) { - reportException(t); - } finally { - if (!threadName.equals(currentThread.getName())) { - currentThread.setName(threadName); + protected void setThread(Thread thread) { + } + + protected Runnable wrapTask(ContextTask task, boolean checkThreadName, boolean setThread) { + checkThreadName = true; + if (checkThreadName) { + return () -> { + Thread currentThread = Thread.currentThread(); + if (setThread) { + setThread(currentThread); } - } - if (closed) { - // We allow tasks to be run after the context is closed but we make sure we unset the context afterwards - // to avoid any leaks - unsetContext(); - } - }; + String threadName = currentThread.getName(); + try { + vertx.setContext(ContextImpl.this); + task.run(); + } catch (Throwable t) { + reportException(t); + } finally { + if (!threadName.equals(currentThread.getName())) { + currentThread.setName(threadName); + } + } + if (closed) { + // We allow tasks to be run after the context is closed but we make sure we unset the context afterwards + // to avoid any leaks + unsetContext(); + } + }; + } else { + return () -> { + try { + vertx.setContext(ContextImpl.this); + task.run(); + } catch (Throwable t) { + reportException(t); + } + if (closed) { + // We allow tasks to be run after the context is closed but we make sure we unset the context afterwards + // to avoid any leaks + unsetContext(); + } + }; + } } } diff --git a/vertx-core/src/main/java/org/vertx/java/core/impl/ContextTask.java b/vertx-core/src/main/java/org/vertx/java/core/impl/ContextTask.java new file mode 100644 index 000000000..feb723d63 --- /dev/null +++ b/vertx-core/src/main/java/org/vertx/java/core/impl/ContextTask.java @@ -0,0 +1,10 @@ +package org.vertx.java.core.impl; + +/** + * @author Tim Fox + */ +@FunctionalInterface +public interface ContextTask { + + public void run() throws Exception; +} diff --git a/vertx-core/src/main/java/org/vertx/java/core/impl/EventLoopContext.java b/vertx-core/src/main/java/org/vertx/java/core/impl/EventLoopContext.java index 2a1b1b94f..08e6e513b 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/impl/EventLoopContext.java +++ b/vertx-core/src/main/java/org/vertx/java/core/impl/EventLoopContext.java @@ -16,7 +16,6 @@ package org.vertx.java.core.impl; -import io.netty.channel.EventLoop; import org.vertx.java.core.logging.Logger; import org.vertx.java.core.logging.impl.LoggerFactory; @@ -29,16 +28,46 @@ public class EventLoopContext extends ContextImpl { private static final Logger log = LoggerFactory.getLogger(EventLoopContext.class); + private Thread contextThread; + public EventLoopContext(VertxInternal vertx, Executor bgExec) { super(vertx, bgExec); } - public void execute(Runnable task) { - getEventLoop().execute(wrapTask(task)); + public void doExecute(ContextTask task) { + getEventLoop().execute(wrapTask(task, false, true)); } - public boolean isOnCorrectWorker(EventLoop worker) { - return getEventLoop() == worker; + @Override + public boolean isEventLoopContext() { + return true; + } + + + @Override + protected void setThread(Thread thread) { + // Sanity check - make sure Netty is really delivering events on the correct thread + if (this.contextThread == null) { + this.contextThread = thread; + } else if (this.contextThread != thread) { + //log.warn("Uh oh! Event loop context executing with wrong thread! Expected " + this.thread + " got " + thread); + throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + this.contextThread + " got " + thread); + } + } + + @Override + protected boolean isOnCorrectContextThread(boolean expectRightThread) { + Thread current = Thread.currentThread(); + boolean correct = current == contextThread; + if (expectRightThread) { + if (!(current instanceof VertxThread)) { + log.warn("Expected to be on Vert.x thread, but actually on: " + current); + } else if (!correct && contextThread != null) { + log.warn("Event delivered on unexpected thread " + current + " expected: " + contextThread); + new Exception().printStackTrace(); + } + } + return correct; } } diff --git a/vertx-core/src/main/java/org/vertx/java/core/impl/MultiThreadedWorkerContext.java b/vertx-core/src/main/java/org/vertx/java/core/impl/MultiThreadedWorkerContext.java index 69c3543cd..a936350a8 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/impl/MultiThreadedWorkerContext.java +++ b/vertx-core/src/main/java/org/vertx/java/core/impl/MultiThreadedWorkerContext.java @@ -27,7 +27,12 @@ public class MultiThreadedWorkerContext extends WorkerContext { this.bgExec = bgExec; } - public void execute(Runnable task) { - bgExec.execute(wrapTask(task)); + public void execute(ContextTask task) { + bgExec.execute(wrapTask(task, false, true)); + } + + @Override + protected boolean isOnCorrectContextThread(boolean expectRightThread) { + return false; } } diff --git a/vertx-core/src/main/java/org/vertx/java/core/impl/VertxImpl.java b/vertx-core/src/main/java/org/vertx/java/core/impl/VertxImpl.java index 569c23643..35a9cd0a7 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/impl/VertxImpl.java @@ -95,7 +95,7 @@ public class VertxImpl implements VertxInternal { this(0, hostname, null); } - public VertxImpl(int port, String hostname, final Handler> resultHandler) { + public VertxImpl(int port, String hostname, Handler> resultHandler) { ClusterManagerFactory factory; String clusterManagerFactoryClassName = System.getProperty("vertx.clusterManagerFactory"); if (clusterManagerFactoryClassName != null) { @@ -115,7 +115,7 @@ public class VertxImpl implements VertxInternal { } this.clusterManager = factory.createClusterManager(this); this.clusterManager.join(); - final Vertx inst = this; + Vertx inst = this; this.eventBus = new EventBusImpl(this, port, hostname, clusterManager, res -> { if (resultHandler != null) { if (res.succeeded()) { @@ -189,15 +189,15 @@ public class VertxImpl implements VertxInternal { return false; } - public long setPeriodic(long delay, final Handler handler) { + public long setPeriodic(long delay, Handler handler) { return scheduleTimeout(getOrCreateContext(), handler, delay, true); } - public long setTimer(long delay, final Handler handler) { + public long setTimer(long delay, Handler handler) { return scheduleTimeout(getOrCreateContext(), handler, delay, false); } - public void runOnContext(final Handler task) { + public void runOnContext(Handler task) { ContextImpl context = getOrCreateContext(); context.runOnContext(task); } @@ -260,22 +260,14 @@ public class VertxImpl implements VertxInternal { return new DnsClientImpl(this, dnsServers); } - private long scheduleTimeout(final ContextImpl context, final Handler handler, long delay, boolean periodic) { + private long scheduleTimeout(ContextImpl context, Handler handler, long delay, boolean periodic) { if (delay < 1) { throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms"); } long timerId = timeoutCounter.getAndIncrement(); - final InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, context); - final Runnable wrapped = context.wrapTask(task); - - final Runnable toRun; - final EventLoop el = context.getEventLoop(); - if (context instanceof EventLoopContext) { - toRun = wrapped; - } else { - // On worker context - toRun = () -> context.execute(wrapped); - } + InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, context); + Runnable toRun = () -> context.execute(task, false); + EventLoop el = context.getEventLoop(); Future future; if (periodic) { future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS); @@ -440,8 +432,8 @@ public class VertxImpl implements VertxInternal { } @Override - public void executeBlocking(final Action action, final Handler> resultHandler) { - final ContextImpl context = getOrCreateContext(); + public void executeBlocking(Action action, Handler> resultHandler) { + ContextImpl context = getOrCreateContext(); context.executeOnOrderedWorkerExec(() -> { FutureResultImpl res = new FutureResultImpl<>(); try { @@ -451,7 +443,7 @@ public class VertxImpl implements VertxInternal { res.setFailure(e); } if (resultHandler != null) { - context.execute(() -> res.setHandler(resultHandler)); + context.execute(() -> res.setHandler(resultHandler), false); } }); } @@ -460,7 +452,7 @@ public class VertxImpl implements VertxInternal { return clusterManager; } - private class InternalTimerHandler implements Runnable, Closeable { + private class InternalTimerHandler implements ContextTask, Closeable { final Handler handler; final boolean periodic; final long timerID; @@ -480,7 +472,7 @@ public class VertxImpl implements VertxInternal { this.periodic = periodic; } - public void run() { + public void run() throws Exception { if (!cancelled) { try { handler.handle(timerID); diff --git a/vertx-core/src/main/java/org/vertx/java/core/impl/WorkerContext.java b/vertx-core/src/main/java/org/vertx/java/core/impl/WorkerContext.java index 36c5f351e..99ad9528b 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/impl/WorkerContext.java +++ b/vertx-core/src/main/java/org/vertx/java/core/impl/WorkerContext.java @@ -16,8 +16,6 @@ package org.vertx.java.core.impl; -import io.netty.channel.EventLoop; - import java.util.concurrent.Executor; /** @@ -29,11 +27,18 @@ public class WorkerContext extends ContextImpl { super(vertx, orderedBgExec); } - public void execute(Runnable task) { - executeOnOrderedWorkerExec(wrapTask(task)); + public void doExecute(ContextTask task) { + executeOnOrderedWorkerExec(task); } - public boolean isOnCorrectWorker(EventLoop worker) { + @Override + public boolean isEventLoopContext() { return false; } + + @Override + protected boolean isOnCorrectContextThread(boolean expectRightThread) { + return false; + } + } diff --git a/vertx-core/src/main/java/org/vertx/java/core/net/NetServerOptions.java b/vertx-core/src/main/java/org/vertx/java/core/net/NetServerOptions.java index 46b9bffac..7dafd90b0 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/net/NetServerOptions.java +++ b/vertx-core/src/main/java/org/vertx/java/core/net/NetServerOptions.java @@ -16,8 +16,6 @@ package org.vertx.java.core.net; -import org.vertx.java.core.Handler; - /** * @author Tim Fox */ diff --git a/vertx-core/src/main/java/org/vertx/java/core/net/impl/ConnectionBase.java b/vertx-core/src/main/java/org/vertx/java/core/net/impl/ConnectionBase.java index bf34a5b3f..7e9d6cc05 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/net/impl/ConnectionBase.java +++ b/vertx-core/src/main/java/org/vertx/java/core/net/impl/ConnectionBase.java @@ -154,7 +154,7 @@ public abstract class ConnectionBase { } else { doneHandler.handle(new FutureResultImpl<>(channelFuture.cause())); } - }); + }, true); } else if (!channelFuture.isSuccess()) { handleException(channelFuture.cause()); } diff --git a/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetClientImpl.java b/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetClientImpl.java index 3fcd41622..3784699a4 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetClientImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetClientImpl.java @@ -48,22 +48,23 @@ public class NetClientImpl implements NetClient { private final VertxInternal vertx; private final NetClientOptions options; - private final ContextImpl actualCtx; private final SSLHelper sslHelper; private final Map socketMap = new ConcurrentHashMap<>(); private final Closeable closeHook; - private Bootstrap bootstrap; + private final ContextImpl creatingContext; public NetClientImpl(VertxInternal vertx, NetClientOptions options) { this.vertx = vertx; this.options = new NetClientOptions(options); this.sslHelper = new SSLHelper(options); - actualCtx = vertx.getOrCreateContext(); this.closeHook = doneHandler -> { NetClientImpl.this.close(); doneHandler.handle(new FutureResultImpl<>((Void)null)); }; - actualCtx.addCloseHook(closeHook); + creatingContext = vertx.getContext(); + if (creatingContext != null) { + creatingContext.addCloseHook(closeHook); + } } @Override @@ -83,7 +84,9 @@ public class NetClientImpl implements NetClient { for (NetSocket sock : socketMap.values()) { sock.close(); } - actualCtx.removeCloseHook(closeHook); + if (creatingContext != null) { + creatingContext.removeCloseHook(closeHook); + } } private void applyConnectionOptions(Bootstrap bootstrap) { @@ -106,28 +109,27 @@ public class NetClientImpl implements NetClient { private void connect(final int port, final String host, final Handler> connectHandler, final int remainingAttempts) { - if (bootstrap == null) { - sslHelper.checkSSL(vertx); - - bootstrap = new Bootstrap(); - bootstrap.group(actualCtx.getEventLoop()); - bootstrap.channel(NioSocketChannel.class); - bootstrap.handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - if (sslHelper.isSSL()) { - SslHandler sslHandler = sslHelper.createSslHandler(vertx, true); - pipeline.addLast("ssl", sslHandler); - } - if (sslHelper.isSSL()) { - // only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used. - pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support - } - pipeline.addLast("handler", new VertxNetHandler(vertx, socketMap)); + ContextImpl context = vertx.getOrCreateContext(); + sslHelper.checkSSL(vertx); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(context.getEventLoop()); + bootstrap.channel(NioSocketChannel.class); + bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + if (sslHelper.isSSL()) { + SslHandler sslHandler = sslHelper.createSslHandler(vertx, true); + pipeline.addLast("ssl", sslHandler); } - }); - } + if (sslHelper.isSSL()) { + // only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used. + pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support + } + pipeline.addLast("handler", new VertxNetHandler(vertx, socketMap)); + } + }); + applyConnectionOptions(bootstrap); ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); future.addListener(new ChannelFutureListener() { @@ -146,48 +148,46 @@ public class NetClientImpl implements NetClient { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { - connected(ch, connectHandler); + connected(context, ch, connectHandler); } else { - failed(ch, future.cause(), connectHandler); + failed(context, ch, future.cause(), connectHandler); } } }); } else { - connected(ch, connectHandler); + connected(context, ch, connectHandler); } } else { if (remainingAttempts > 0 || remainingAttempts == -1) { - actualCtx.execute(ch.eventLoop(), () -> { + context.execute(() -> { log.debug("Failed to create connection. Will retry in " + options.getReconnectInterval() + " milliseconds"); //Set a timer to retry connection - vertx.setTimer(options.getReconnectInterval(), new Handler() { - public void handle(Long timerID) { - connect(port, host, connectHandler, remainingAttempts == -1 ? remainingAttempts : remainingAttempts - - 1); - } + vertx.setTimer(options.getReconnectInterval(), tid -> { + connect(port, host, connectHandler, remainingAttempts == -1 ? remainingAttempts : remainingAttempts + - 1); }); - }); + }, true); } else { - failed(ch, channelFuture.cause(), connectHandler); + failed(context, ch, channelFuture.cause(), connectHandler); } } } }); } - private void connected(final Channel ch, final Handler> connectHandler) { - actualCtx.execute(ch.eventLoop(), () -> doConnected(ch, connectHandler)); + private void connected(ContextImpl context, Channel ch, Handler> connectHandler) { + context.execute(() -> doConnected(context, ch, connectHandler), true); } - private void doConnected(Channel ch, final Handler> connectHandler) { - NetSocketImpl sock = new NetSocketImpl(vertx, ch, actualCtx, sslHelper, true); + private void doConnected(ContextImpl context, Channel ch, final Handler> connectHandler) { + NetSocketImpl sock = new NetSocketImpl(vertx, ch, context, sslHelper, true); socketMap.put(ch, sock); connectHandler.handle(new FutureResultImpl(sock)); } - private void failed(Channel ch, final Throwable t, final Handler> connectHandler) { + private void failed(ContextImpl context, Channel ch, Throwable t, Handler> connectHandler) { ch.close(); - actualCtx.execute(ch.eventLoop(), () -> doFailed(connectHandler, t)); + context.execute(() -> doFailed(connectHandler, t), true); } private static void doFailed(Handler> connectHandler, Throwable t) { diff --git a/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetServerImpl.java b/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetServerImpl.java index 987f2e312..b25ba4550 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetServerImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetServerImpl.java @@ -56,7 +56,7 @@ public class NetServerImpl implements NetServer, Closeable { private final VertxInternal vertx; private final NetServerOptions options; - private final ContextImpl actualCtx; + private final ContextImpl creatingContext; private final SSLHelper sslHelper; private final Map socketMap = new ConcurrentHashMap<>(); private final VertxEventLoopGroup availableWorkers = new VertxEventLoopGroup(); @@ -70,13 +70,16 @@ public class NetServerImpl implements NetServer, Closeable { private int actualPort; private Queue bindListeners = new ConcurrentLinkedQueue<>(); private boolean listenersRun; + private ContextImpl listenContext; public NetServerImpl(VertxInternal vertx, NetServerOptions options) { this.vertx = vertx; this.options = new NetServerOptions(options); this.sslHelper = new SSLHelper(options); - actualCtx = vertx.getOrCreateContext(); - actualCtx.addCloseHook(this); + this.creatingContext = vertx.getContext(); + if (creatingContext != null) { + creatingContext.addCloseHook(this); + } } @Override @@ -108,6 +111,8 @@ public class NetServerImpl implements NetServer, Closeable { } listening = true; + listenContext = vertx.getOrCreateContext(); + synchronized (vertx.sharedNetServers()) { this.actualPort = options.getPort(); // Will be updated on bind for a wildcard port id = new ServerID(options.getPort(), options.getHost()); @@ -139,7 +144,7 @@ public class NetServerImpl implements NetServer, Closeable { applyConnectionOptions(bootstrap); if (connectHandler != null) { - handlerManager.addHandler(connectHandler, actualCtx); + handlerManager.addHandler(connectHandler, listenContext); } try { @@ -168,7 +173,7 @@ public class NetServerImpl implements NetServer, Closeable { vertx.runOnContext(v -> listenHandler.handle(new FutureResultImpl<>(t))); } else { // No handler - log so user can see failure - actualCtx.reportException(t); + listenContext.reportException(t); } listening = false; return this; @@ -182,8 +187,7 @@ public class NetServerImpl implements NetServer, Closeable { actualServer = shared; this.actualPort = shared.actualPort(); if (connectHandler != null) { - // Share the event loop thread to also serve the NetServer's network traffic. - actualServer.handlerManager.addHandler(connectHandler, actualCtx); + actualServer.handlerManager.addHandler(connectHandler, listenContext); } } @@ -197,10 +201,10 @@ public class NetServerImpl implements NetServer, Closeable { listening = false; res = new FutureResultImpl<>(actualServer.bindFuture.cause()); } - actualCtx.execute(actualServer.bindFuture.channel().eventLoop(), () -> listenHandler.handle(res)); + listenContext.execute(() -> listenHandler.handle(res), true); } else if (!actualServer.bindFuture.isSuccess()) { // No handler - log so user can see failure - actualCtx.reportException(actualServer.bindFuture.cause()); + listenContext.reportException(actualServer.bindFuture.cause()); listening = false; } }); @@ -252,9 +256,10 @@ public class NetServerImpl implements NetServer, Closeable { @Override public void close(final Handler> done) { + ContextImpl context = vertx.getOrCreateContext(); if (!listening) { if (done != null) { - executeCloseDone(actualCtx, done, null); + executeCloseDone(context, done, null); } return; } @@ -262,22 +267,24 @@ public class NetServerImpl implements NetServer, Closeable { synchronized (vertx.sharedNetServers()) { if (actualServer != null) { - actualServer.handlerManager.removeHandler(connectHandler, actualCtx); + actualServer.handlerManager.removeHandler(connectHandler, listenContext); if (actualServer.handlerManager.hasHandlers()) { // The actual server still has handlers so we don't actually close it if (done != null) { - executeCloseDone(actualCtx, done, null); + executeCloseDone(context, done, null); } } else { // No Handlers left so close the actual server // The done handler needs to be executed on the context that calls close, NOT the context // of the actual server - actualServer.actualClose(actualCtx, done); + actualServer.actualClose(context, done); } } } - actualCtx.removeCloseHook(this); + if (creatingContext != null) { + creatingContext.removeCloseHook(this); + } } @Override @@ -306,7 +313,7 @@ public class NetServerImpl implements NetServer, Closeable { private void executeCloseDone(final ContextImpl closeContext, final Handler> done, final Exception e) { if (done != null) { - closeContext.execute(() -> done.handle(new FutureResultImpl<>(e))); + closeContext.execute(() -> done.handle(new FutureResultImpl<>(e)), false); } } @@ -353,7 +360,7 @@ public class NetServerImpl implements NetServer, Closeable { } private void connected(final Channel ch, final HandlerHolder handler) { - handler.context.execute(ch.eventLoop(), () -> doConnected(ch, handler)); + handler.context.execute(() -> doConnected(ch, handler), true); } private void doConnected(Channel ch, HandlerHolder handler) { diff --git a/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetSocketImpl.java b/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetSocketImpl.java index 40bb3c25f..0f2160812 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetSocketImpl.java +++ b/vertx-core/src/main/java/org/vertx/java/core/net/impl/NetSocketImpl.java @@ -300,27 +300,13 @@ public class NetSocketImpl extends ConnectionBase implements NetSocket { sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { @Override public void operationComplete(final Future future) throws Exception { - if (context.isOnCorrectWorker(channel.eventLoop())) { + context.execute(() -> { if (future.isSuccess()) { - try { - vertx.setContext(context); - handler.handle(null); - } catch (Throwable t) { - context.reportException(t); - } + handler.handle(null); } else { context.reportException(future.cause()); } - - } else { - context.execute(() -> { - if (future.isSuccess()) { - handler.handle(null); - } else { - context.reportException(future.cause()); - } - }); - } + }, true); } }); return this; diff --git a/vertx-core/src/main/java/org/vertx/java/core/net/impl/SSLHelper.java b/vertx-core/src/main/java/org/vertx/java/core/net/impl/SSLHelper.java index f4bf9c68d..99af1f913 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/net/impl/SSLHelper.java +++ b/vertx-core/src/main/java/org/vertx/java/core/net/impl/SSLHelper.java @@ -83,7 +83,7 @@ public class SSLHelper { } public void checkSSL(VertxInternal vertx) { - if (ssl) { + if (ssl && sslContext == null) { sslContext = createContext(vertx, keyStorePath, keyStorePassword, trustStorePath, trustStorePassword, trustAll); } } diff --git a/vertx-core/src/main/java/org/vertx/java/core/net/impl/VertxHandler.java b/vertx-core/src/main/java/org/vertx/java/core/net/impl/VertxHandler.java index 23cdfc835..629f6a4d9 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/net/impl/VertxHandler.java +++ b/vertx-core/src/main/java/org/vertx/java/core/net/impl/VertxHandler.java @@ -69,16 +69,7 @@ public abstract class VertxHandler extends ChannelDupl if (conn != null) { conn.setWritable(ctx.channel().isWritable()); ContextImpl context = getContext(conn); - if (context.isOnCorrectWorker(ch.eventLoop())) { - try { - vertx.setContext(context); - conn.handleInterestedOpsChanged(); - } catch (Throwable t) { - context.reportException(t); - } - } else { - context.execute(() -> conn.handleInterestedOpsChanged()); - } + context.execute(conn::handleInterestedOpsChanged, true); } } @@ -89,7 +80,7 @@ public abstract class VertxHandler extends ChannelDupl final C connection = connectionMap.get(ch); if (connection != null) { ContextImpl context = getContext(connection); - context.execute(ch.eventLoop(), () ->{ + context.execute(() ->{ try { if (ch.isOpen()) { ch.close(); @@ -97,7 +88,7 @@ public abstract class VertxHandler extends ChannelDupl } catch (Throwable ignore) { } connection.handleException(t); - }); + }, true); } else { // Ignore - any exceptions before a channel exists will be passed manually via the failed(...) method // Any exceptions after a channel is closed can be ignored @@ -110,7 +101,7 @@ public abstract class VertxHandler extends ChannelDupl final C connection = connectionMap.remove(ch); if (connection != null) { ContextImpl context = getContext(connection); - context.execute(ch.eventLoop(), () -> connection.handleClosed()); + context.execute(() -> connection.handleClosed(), true); } } @@ -120,8 +111,8 @@ public abstract class VertxHandler extends ChannelDupl if (conn != null) { ContextImpl context = getContext(conn); // Only mark end read if its not a WorkerVerticle - if (context.isOnCorrectWorker(ctx.channel().eventLoop())) { - conn.endReadAndFlush(); + if (context.isEventLoopContext()) { + context.execute(conn::endReadAndFlush, true); } } } @@ -137,8 +128,8 @@ public abstract class VertxHandler extends ChannelDupl // Only mark start read if we are on the correct worker. This is needed as while we are in read this may will // delay flushes, which is a problem when we are no on the correct worker. This is mainly a problem as // WorkerVerticle may block. - if (context.isOnCorrectWorker(chctx.channel().eventLoop())) { - connection.startRead(); + if (context.isEventLoopContext()) { + context.execute(connection::startRead, true); } } else { context = null; diff --git a/vertx-core/src/main/java/org/vertx/java/core/net/impl/VertxNetHandler.java b/vertx-core/src/main/java/org/vertx/java/core/net/impl/VertxNetHandler.java index 083af53b0..a4422487d 100644 --- a/vertx-core/src/main/java/org/vertx/java/core/net/impl/VertxNetHandler.java +++ b/vertx-core/src/main/java/org/vertx/java/core/net/impl/VertxNetHandler.java @@ -41,26 +41,7 @@ public class VertxNetHandler extends VertxHandler { final ByteBuf buf = (ByteBuf) msg; Channel ch = chctx.channel(); // We need to do this since it's possible the server is being used from a worker context - if (context.isOnCorrectWorker(ch.eventLoop())) { - try { - vertx.setContext(context); - try { - sock.handleDataReceived(new Buffer(buf)); - } catch (Throwable t) { - context.reportException(t); - } - } catch (Throwable t) { - context.reportException(t); - } - } else { - context.execute(() -> { - try { - sock.handleDataReceived(new Buffer(buf)); - } catch (Throwable t) { - context.reportException(t); - } - }); - } + context.execute(() -> sock.handleDataReceived(new Buffer(buf)), true); } else { // just discard } diff --git a/vertx-core/src/test/java/org/vertx/java/tests/core/DatagramTest.java b/vertx-core/src/test/java/org/vertx/java/tests/core/DatagramTest.java index feb618ae8..c71e1de06 100644 --- a/vertx-core/src/test/java/org/vertx/java/tests/core/DatagramTest.java +++ b/vertx-core/src/test/java/org/vertx/java/tests/core/DatagramTest.java @@ -22,13 +22,9 @@ import org.vertx.java.core.buffer.Buffer; import org.vertx.java.core.datagram.DatagramSocket; import org.vertx.java.core.datagram.DatagramSocketOptions; import org.vertx.java.core.datagram.InternetProtocolFamily; -import org.vertx.java.core.net.impl.SocketDefaults; -import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; -import java.net.SocketException; -import java.util.Enumeration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/vertx-core/src/test/java/org/vertx/java/tests/core/HttpTest.java b/vertx-core/src/test/java/org/vertx/java/tests/core/HttpTest.java index f550b2b02..8ac65b3c7 100644 --- a/vertx-core/src/test/java/org/vertx/java/tests/core/HttpTest.java +++ b/vertx-core/src/test/java/org/vertx/java/tests/core/HttpTest.java @@ -1542,7 +1542,7 @@ public class HttpTest extends HttpTestBase { } @Test - public void testPipeliningOrder() { + public void testPipeliningOrder() throws Exception { client.close(); client = vertx.createHttpClient(new HttpClientOptions().setKeepAlive(true).setPipelining(true).setMaxPoolSize(1)); int requests = 100; @@ -1555,8 +1555,8 @@ public class HttpTest extends HttpTestBase { req.response().setChunked(true); req.bodyHandler(buff -> { assertEquals("This is content " + theCount, buff.toString()); - //We write the response back after a random time to increase the chances of responses written in the - //wrong order if we didn't implement pipelining correctly + // We write the response back after a random time to increase the chances of responses written in the + // wrong order if we didn't implement pipelining correctly vertx.setTimer(1 + (long) (10 * Math.random()), id -> { req.response().headers().set("count", String.valueOf(theCount)); req.response().write(buff); @@ -1565,26 +1565,32 @@ public class HttpTest extends HttpTestBase { }); }); + + CountDownLatch latch = new CountDownLatch(requests); + AtomicInteger cnt = new AtomicInteger(0); + server.listen(onSuccess(s -> { - for (int count = 0; count < requests; count++) { - int theCount = count; - HttpClientRequest req = client.post(new RequestOptions().setPort(DEFAULT_HTTP_PORT).setRequestURI(DEFAULT_TEST_URI), resp -> { - assertEquals(theCount, Integer.parseInt(resp.headers().get("count"))); - resp.bodyHandler(buff -> { - assertEquals("This is content " + theCount, buff.toString()); - if (theCount == requests - 1) { - testComplete(); - } + vertx.setTimer(500, id -> { + for (int count = 0; count < requests; count++) { + int theCount = count; + HttpClientRequest req = client.post(new RequestOptions().setPort(DEFAULT_HTTP_PORT).setRequestURI(DEFAULT_TEST_URI), resp -> { + assertEquals(theCount, Integer.parseInt(resp.headers().get("count"))); + resp.bodyHandler(buff -> { + assertEquals("This is content " + theCount, buff.toString()); + latch.countDown(); + }); }); - }); - req.setChunked(true); - req.headers().set("count", String.valueOf(count)); - req.write("This is content " + count); - req.end(); - } + req.setChunked(true); + req.headers().set("count", String.valueOf(count)); + req.write("This is content " + count); + req.end(); + } + }); + })); - await(); + awaitLatch(latch); + } @Test @@ -1607,6 +1613,7 @@ public class HttpTest extends HttpTestBase { HttpServer[] servers = new HttpServer[numServers]; CountDownLatch startServerLatch = new CountDownLatch(numServers); Set connectedServers = new ConcurrentHashSet<>(); + AtomicInteger res = new AtomicInteger(0); for (int i = 0; i < numServers; i++) { HttpServer server = vertx.createHttpServer(new HttpServerOptions().setHost(DEFAULT_HTTP_HOST).setPort(DEFAULT_HTTP_PORT)); server.requestHandler(req -> { @@ -1623,6 +1630,7 @@ public class HttpTest extends HttpTestBase { awaitLatch(startServerLatch); CountDownLatch reqLatch = new CountDownLatch(requests); + AtomicInteger reqs = new AtomicInteger(0); for (int count = 0; count < requests; count++) { client.getNow(new RequestOptions().setPort(DEFAULT_HTTP_PORT).setRequestURI(DEFAULT_TEST_URI), resp -> { assertEquals(200, resp.statusCode()); diff --git a/vertx-core/src/test/java/org/vertx/java/tests/core/NetTest.java b/vertx-core/src/test/java/org/vertx/java/tests/core/NetTest.java index bff5b17eb..fb85f5c48 100644 --- a/vertx-core/src/test/java/org/vertx/java/tests/core/NetTest.java +++ b/vertx-core/src/test/java/org/vertx/java/tests/core/NetTest.java @@ -24,7 +24,6 @@ import org.vertx.java.core.AsyncResultHandler; import org.vertx.java.core.Handler; import org.vertx.java.core.buffer.Buffer; import org.vertx.java.core.eventbus.Message; -import org.vertx.java.core.http.RequestOptions; import org.vertx.java.core.impl.ConcurrentHashSet; import org.vertx.java.core.net.*; import org.vertx.java.core.net.impl.SocketDefaults; @@ -1030,13 +1029,17 @@ public class NetTest extends VertxTestBase { testComplete(); } }); + // Send some data to the client to trigger the sendfile + sock.write("foo"); }); server.listen(ar -> { assertTrue(ar.succeeded()); client.connect(1234, ar2 -> { assertTrue(ar2.succeeded()); NetSocket sock = ar2.result(); - sock.sendFile(file.getAbsolutePath()); + sock.dataHandler(buf -> { + sock.sendFile(file.getAbsolutePath()); + }); }); }); @@ -1051,7 +1054,9 @@ public class NetTest extends VertxTestBase { Buffer expected = new Buffer(content); Buffer received = new Buffer(); server.connectHandler(sock -> { - sock.sendFile(file.getAbsolutePath()); + sock.dataHandler(buf -> { + sock.sendFile(file.getAbsolutePath()); + }); }); server.listen(ar -> { assertTrue(ar.succeeded()); @@ -1065,6 +1070,7 @@ public class NetTest extends VertxTestBase { testComplete(); } }); + sock.write("foo"); }); });