Work on threading model, plus more threading checks and improvements

This commit is contained in:
purplefox
2014-06-20 15:44:10 +01:00
parent 17d05d0fb8
commit 8bf0bfa4cc
31 changed files with 354 additions and 381 deletions

1
.gitignore vendored
View File

@@ -19,3 +19,4 @@ ScratchTest.java
test-results
test-tmp
*.class
ScratchPad.java

View File

@@ -77,6 +77,9 @@
<io.netty.leakDetectionLevel>PARANOID</io.netty.leakDetectionLevel>
</systemPropertyVariables>
<argLine>-server -Xms128m -Xmx1024m -XX:NewRatio=2</argLine>
<excludes>
<exclude>**/ScratchPad.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>

View File

@@ -15,7 +15,6 @@
*/
package org.vertx.java.core.datagram.impl;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.vertx.java.core.AsyncResult;
@@ -42,17 +41,7 @@ final class DatagramChannelFutureListener<T> implements ChannelFutureListener {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
Channel ch = future.channel();
if (context.isOnCorrectWorker(ch.eventLoop())) {
try {
vertx.setContext(context);
notifyHandler(future);
} catch (Throwable t) {
context.reportException(t);
}
} else {
context.execute(() -> notifyHandler(future));
}
context.execute(() -> notifyHandler(future), true);
}
private void notifyHandler(ChannelFuture future) {

View File

@@ -47,22 +47,7 @@ final class DatagramServerHandler extends VertxHandler<DatagramSocketImpl> {
@SuppressWarnings("unchecked")
@Override
protected void channelRead(final DatagramSocketImpl server, final ContextImpl context, ChannelHandlerContext chctx, final Object msg) throws Exception {
if (context.isOnCorrectWorker(chctx.channel().eventLoop())) {
try {
vertx.setContext(context);
server.handleMessage((org.vertx.java.core.datagram.DatagramPacket) msg);
} catch (Throwable t) {
context.reportException(t);
}
} else {
context.execute(() -> {
try {
server.handleMessage((org.vertx.java.core.datagram.DatagramPacket) msg);
} catch (Throwable t) {
context.reportException(t);
}
});
}
context.execute(() -> server.handleMessage((org.vertx.java.core.datagram.DatagramPacket) msg), true);
}
@Override

View File

@@ -288,15 +288,6 @@ public class DatagramSocketImpl extends ConnectionBase
private void notifyException(final Handler<AsyncResult<DatagramSocket>> handler, final Throwable cause) {
if (context.isOnCorrectWorker(channel().eventLoop())) {
try {
vertx.setContext(context);
handler.handle(new FutureResultImpl<>(cause));
} catch (Throwable t) {
context.reportException(t);
}
} else {
context.execute(() -> handler.handle(new FutureResultImpl<>(cause)));
}
context.execute(() -> handler.handle(new FutureResultImpl<>(cause)), true);
}
}

View File

@@ -211,7 +211,7 @@ public final class DnsClientImpl implements DnsClient {
} catch (final UnknownHostException e) {
// Should never happen as we work with ip addresses as input
// anyway just in case notify the handler
actualCtx.execute(() -> handler.handle(new FutureResultImpl<>(e)));
actualCtx.execute(() -> handler.handle(new FutureResultImpl<>(e)), false);
}
return this;
}
@@ -275,26 +275,13 @@ public final class DnsClientImpl implements DnsClient {
if (r.complete()) {
return;
}
if (actualCtx.isOnCorrectWorker(loop)) {
try {
vertx.setContext(actualCtx);
if (result instanceof Throwable) {
r.setFailure((Throwable) result);
} else {
r.setResult(result);
}
} catch (Throwable t) {
actualCtx.reportException(t);
actualCtx.execute(() -> {
if (result instanceof Throwable) {
r.setFailure((Throwable) result);
} else {
r.setResult(result);
}
} else {
actualCtx.execute(() -> {
if (result instanceof Throwable) {
r.setFailure((Throwable) result);
} else {
r.setResult(result);
}
});
}
}, true);
}
private static final class HandlerAdapter<T> implements Handler<AsyncResult<List<T>>> {

View File

@@ -916,7 +916,7 @@ public class EventBusImpl implements EventBus {
unregisterHandler(msg.address, holder.handler);
}
}
});
}, false);
}
private void checkStarted() {

View File

@@ -341,14 +341,14 @@ public class AsyncFileImpl implements AsyncFile {
context.execute(() -> {
writesOutstanding -= buff.limit();
handler.handle(new FutureResultImpl<Void>().setResult(null));
});
}, false);
}
}
public void failed(Throwable exc, Object attachment) {
if (exc instanceof Exception) {
Exception e = (Exception) exc;
context.execute(() -> handler.handle(new FutureResultImpl<Void>().setResult(null)));
context.execute(() -> handler.handle(new FutureResultImpl<Void>().setResult(null)), false);
} else {
log.error("Error occurred", exc);
}
@@ -369,7 +369,7 @@ public class AsyncFileImpl implements AsyncFile {
buff.flip();
writeBuff.setBytes(offset, buff);
result.setResult(writeBuff).setHandler(handler);
});
}, false);
}
public void completed(Integer bytesRead, Object attachment) {
@@ -388,11 +388,7 @@ public class AsyncFileImpl implements AsyncFile {
}
public void failed(Throwable t, Object attachment) {
context.execute(new Runnable() {
public void run() {
result.setFailure(t).setHandler(handler);
}
});
context.execute(() -> result.setFailure(t).setHandler(handler), false);
}
});
}

View File

@@ -25,7 +25,6 @@ import org.vertx.java.core.streams.ReadStream;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.net.URI;
/**
* Represents a server-side HTTP request.<p>

View File

@@ -149,18 +149,18 @@ class ClientConnection extends ConnectionBase {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
context.execute(ctx.channel().eventLoop(), () -> {
context.execute(() -> {
// if still handshaking this means we not got any response back from the server and so need to notify the client
// about it as otherwise the client would never been notified.
if (handshaking) {
handleException(new WebSocketHandshakeException("Connection closed while handshake in process"));
}
});
}, true);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
context.execute(ctx.channel().eventLoop(), () -> {
context.execute(() -> {
if (handshaker != null && handshaking) {
if (msg instanceof HttpResponse) {
HttpResponse resp = (HttpResponse) msg;
@@ -197,7 +197,7 @@ class ClientConnection extends ConnectionBase {
} else {
buffered.add(msg);
}
});
}, true);
}
private void handleException(WebSocketHandshakeException e) {

View File

@@ -35,7 +35,6 @@ import org.vertx.java.core.impl.VertxInternal;
import org.vertx.java.core.net.NetSocket;
import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator;
import org.vertx.java.core.net.impl.SSLHelper;
import org.vertx.java.core.net.impl.VertxEventLoopGroup;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLHandshakeException;
@@ -50,9 +49,8 @@ public class HttpClientImpl implements HttpClient {
private final VertxInternal vertx;
private final HttpClientOptions options;
private final Map<Channel, ClientConnection> connectionMap = new ConcurrentHashMap<>();
private final ContextImpl actualCtx;
private final ContextImpl creatingContext;
private final ConnectionManager pool;
private Bootstrap bootstrap;
private Handler<Throwable> exceptionHandler;
private final Closeable closeHook;
private boolean closed;
@@ -62,16 +60,18 @@ public class HttpClientImpl implements HttpClient {
this.vertx = vertx;
this.options = new HttpClientOptions(options);
this.sslHelper = new SSLHelper(options);
actualCtx = vertx.getOrCreateContext();
this.creatingContext = vertx.getContext();
closeHook = doneHandler -> {
HttpClientImpl.this.close();
doneHandler.handle(new FutureResultImpl<>((Void)null));
};
actualCtx.addCloseHook(closeHook);
if (creatingContext != null) {
creatingContext.addCloseHook(closeHook);
}
pool = new ConnectionManager(vertx) {
protected void connect(String host, int port, Handler<ClientConnection> connectHandler, Handler<Throwable> connectErrorHandler, ContextImpl context,
ConnectionLifeCycleListener listener) {
internalConnect(port, host, connectHandler, connectErrorHandler, listener);
internalConnect(context, port, host, connectHandler, connectErrorHandler, listener);
}
};
pool.setKeepAlive(options.isKeepAlive());
@@ -89,13 +89,14 @@ public class HttpClientImpl implements HttpClient {
@Override
public HttpClient connectWebsocket(WebSocketConnectOptions wsOptions, Handler<WebSocket> wsConnect) {
checkClosed();
ContextImpl context = vertx.getOrCreateContext();
getConnection(wsOptions.getPort(), wsOptions.getHost(), conn -> {
if (!conn.isClosed()) {
conn.toWebSocket(wsOptions, wsOptions.getMaxWebsocketFrameSize(), wsConnect);
} else {
connectWebsocket(wsOptions, wsConnect);
}
}, exceptionHandler, actualCtx);
}, exceptionHandler, context);
return this;
}
@@ -167,7 +168,9 @@ public class HttpClientImpl implements HttpClient {
for (ClientConnection conn : connectionMap.values()) {
conn.close();
}
actualCtx.removeCloseHook(closeHook);
if (creatingContext != null) {
creatingContext.removeCloseHook(closeHook);
}
closed = true;
}
@@ -221,38 +224,34 @@ public class HttpClientImpl implements HttpClient {
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
}
private void internalConnect(int port, String host, Handler<ClientConnection> connectHandler, Handler<Throwable> connectErrorHandler,
ConnectionLifeCycleListener listener) {
if (bootstrap == null) {
VertxEventLoopGroup pool = new VertxEventLoopGroup();
pool.addWorker(actualCtx.getEventLoop());
bootstrap = new Bootstrap();
bootstrap.group(pool);
bootstrap.channel(NioSocketChannel.class);
sslHelper.checkSSL(vertx);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (options.isSsl()) {
SSLEngine engine = sslHelper.getSslContext().createSSLEngine(host, port);
if (options.isVerifyHost()) {
SSLParameters sslParameters = engine.getSSLParameters();
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
engine.setSSLParameters(sslParameters);
}
engine.setUseClientMode(true); //We are on the client side of the connection
pipeline.addLast("ssl", new SslHandler(engine));
private void internalConnect(ContextImpl context, int port, String host, Handler<ClientConnection> connectHandler, Handler<Throwable> connectErrorHandler,
ConnectionLifeCycleListener listener) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(context.getEventLoop());
bootstrap.channel(NioSocketChannel.class);
sslHelper.checkSSL(vertx);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (options.isSsl()) {
SSLEngine engine = sslHelper.getSslContext().createSSLEngine(host, port);
if (options.isVerifyHost()) {
SSLParameters sslParameters = engine.getSSLParameters();
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
engine.setSSLParameters(sslParameters);
}
pipeline.addLast("codec", new HttpClientCodec(4096, 8192, 8192, false, false));
if (options.isTryUseCompression()) {
pipeline.addLast("inflater", new HttpContentDecompressor(true));
}
pipeline.addLast("handler", new ClientHandler());
engine.setUseClientMode(true); //We are on the client side of the connection
pipeline.addLast("ssl", new SslHandler(engine));
}
});
}
pipeline.addLast("codec", new HttpClientCodec(4096, 8192, 8192, false, false));
if (options.isTryUseCompression()) {
pipeline.addLast("inflater", new HttpContentDecompressor(true));
}
pipeline.addLast("handler", new ClientHandler(context));
}
});
applyConnectionOptions(bootstrap);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
future.addListener(new ChannelFutureListener() {
@@ -267,17 +266,17 @@ public class HttpClientImpl implements HttpClient {
Future<Channel> fut = sslHandler.handshakeFuture();
fut.addListener(future -> {
if (future.isSuccess()) {
connected(port, host, ch, connectHandler, listener);
connected(context, port, host, ch, connectHandler, listener);
} else {
connectionFailed(ch, connectErrorHandler, new SSLHandshakeException("Failed to create SSL connection"),
connectionFailed(context, ch, connectErrorHandler, new SSLHandshakeException("Failed to create SSL connection"),
listener);
}
});
} else {
connected(port, host, ch, connectHandler, listener);
connected(context, port, host, ch, connectHandler, listener);
}
} else {
connectionFailed(ch, connectErrorHandler, channelFuture.cause(), listener);
connectionFailed(context, ch, connectErrorHandler, channelFuture.cause(), listener);
}
}
});
@@ -285,7 +284,8 @@ public class HttpClientImpl implements HttpClient {
private HttpClientRequest doRequest(String method, RequestOptions options, Handler<HttpClientResponse> responseHandler) {
checkClosed();
HttpClientRequest req = new HttpClientRequestImpl(this, method, options, responseHandler, actualCtx);
ContextImpl context = vertx.getOrCreateContext();
HttpClientRequest req = new HttpClientRequestImpl(this, method, options, responseHandler, context);
if (options.getHeaders() != null) {
req.headers().set(options.getHeaders());
}
@@ -298,13 +298,13 @@ public class HttpClientImpl implements HttpClient {
}
}
private void connected(int port, String host, Channel ch, Handler<ClientConnection> connectHandler, ConnectionLifeCycleListener listener) {
actualCtx.execute(ch.eventLoop(), () -> createConn(port, host, ch, connectHandler, listener));
private void connected(ContextImpl context, int port, String host, Channel ch, Handler<ClientConnection> connectHandler, ConnectionLifeCycleListener listener) {
context.execute(() -> createConn(context, port, host, ch, connectHandler, listener), true);
}
private void createConn(int port, String host, Channel ch, Handler<ClientConnection> connectHandler, ConnectionLifeCycleListener listener) {
private void createConn(ContextImpl context, int port, String host, Channel ch, Handler<ClientConnection> connectHandler, ConnectionLifeCycleListener listener) {
ClientConnection conn = new ClientConnection(vertx, HttpClientImpl.this, ch,
options.isSsl(), host, port, actualCtx, listener);
options.isSsl(), host, port, context, listener);
conn.closeHandler(v -> {
// The connection has been closed - tell the pool about it, this allows the pool to create more
// connections. Note the pool doesn't actually remove the connection, when the next person to get a connection
@@ -315,14 +315,14 @@ public class HttpClientImpl implements HttpClient {
connectHandler.handle(conn);
}
private void connectionFailed(Channel ch, Handler<Throwable> connectionExceptionHandler,
private void connectionFailed(ContextImpl context, Channel ch, Handler<Throwable> connectionExceptionHandler,
Throwable t, ConnectionLifeCycleListener listener) {
// If no specific exception handler is provided, fall back to the HttpClient's exception handler.
// If that doesn't exist just log it
Handler<Throwable> exHandler =
connectionExceptionHandler == null ? (exceptionHandler == null ? actualCtx::reportException : exceptionHandler ): connectionExceptionHandler;
connectionExceptionHandler == null ? (exceptionHandler == null ? context::reportException : exceptionHandler ): connectionExceptionHandler;
actualCtx.execute(ch.eventLoop(), () -> {
context.execute(() -> {
listener.connectionClosed(null);
try {
ch.close();
@@ -331,9 +331,9 @@ public class HttpClientImpl implements HttpClient {
if (exHandler != null) {
exHandler.handle(t);
} else {
actualCtx.reportException(t);
context.reportException(t);
}
});
}, true);
}
private Handler<HttpClientResponse> connectHandler(Handler<HttpClientResponse> responseHandler) {
@@ -384,7 +384,7 @@ public class HttpClientImpl implements HttpClient {
public NetSocket netSocket() {
if (!resumed) {
resumed = true;
vertx.getContext().execute(socket::resume); // resume the socket now as the user had the chance to register a dataHandler
vertx.getContext().execute(socket::resume, false); // resume the socket now as the user had the chance to register a dataHandler
}
return socket;
}
@@ -428,14 +428,16 @@ public class HttpClientImpl implements HttpClient {
private class ClientHandler extends VertxHttpHandler<ClientConnection> {
private boolean closeFrameSent;
private ContextImpl context;
public ClientHandler() {
public ClientHandler(ContextImpl context) {
super(vertx, HttpClientImpl.this.connectionMap);
this.context = context;
}
@Override
protected ContextImpl getContext(ClientConnection connection) {
return actualCtx;
return context;
}
@Override

View File

@@ -72,7 +72,7 @@ public class HttpServerImpl implements HttpServer, Closeable {
private final HttpServerOptions options;
private final VertxInternal vertx;
private final SSLHelper sslHelper;
private final ContextImpl actualCtx;
private final ContextImpl creatingContext;
private final Map<Channel, ServerConnection> connectionMap = new ConcurrentHashMap<>();
private final VertxEventLoopGroup availableWorkers = new VertxEventLoopGroup();
private Handler<HttpServerRequest> requestHandler;
@@ -86,12 +86,15 @@ public class HttpServerImpl implements HttpServer, Closeable {
private HttpServerImpl actualServer;
private HandlerManager<HttpServerRequest> reqHandlerManager = new HandlerManager<>(availableWorkers);
private HandlerManager<ServerWebSocket> wsHandlerManager = new HandlerManager<>(availableWorkers);
private ContextImpl listenContext;
public HttpServerImpl(VertxInternal vertx, HttpServerOptions options) {
this.options = new HttpServerOptions(options);
this.vertx = vertx;
actualCtx = vertx.getOrCreateContext();
actualCtx.addCloseHook(this);
this.creatingContext = vertx.getContext();
if (creatingContext != null) {
creatingContext.addCloseHook(this);
}
sslHelper = new SSLHelper(options);
}
@@ -148,14 +151,14 @@ public class HttpServerImpl implements HttpServer, Closeable {
bootstrap.option(ChannelOption.SO_BACKLOG, options.getAcceptBacklog());
}
public HttpServer listen(final Handler<AsyncResult<HttpServer>> listenHandler) {
if (requestHandler == null && wsHandler == null) {
throw new IllegalStateException("Set request or websocket handler first");
}
if (listening) {
throw new IllegalStateException("Listen already called");
throw new IllegalStateException("Already listening");
}
listenContext = vertx.getOrCreateContext();
listening = true;
synchronized (vertx.sharedHttpServers()) {
@@ -208,7 +211,7 @@ public class HttpServerImpl implements HttpServer, Closeable {
}
});
addHandlers(this);
addHandlers(this, listenContext);
try {
bindFuture = bootstrap.bind(new InetSocketAddress(InetAddress.getByName(options.getHost()), options.getPort()));
Channel serverChannel = bindFuture.channel();
@@ -227,7 +230,7 @@ public class HttpServerImpl implements HttpServer, Closeable {
vertx.runOnContext(v -> listenHandler.handle(new FutureResultImpl<>(t)));
} else {
// No handler - log so user can see failure
actualCtx.reportException(t);
listenContext.reportException(t);
}
listening = false;
return this;
@@ -237,7 +240,7 @@ public class HttpServerImpl implements HttpServer, Closeable {
} else {
// Server already exists with that host/port - we will use that
actualServer = shared;
addHandlers(actualServer);
addHandlers(actualServer, listenContext);
}
actualServer.bindFuture.addListener(new ChannelFutureListener() {
@Override
@@ -250,11 +253,11 @@ public class HttpServerImpl implements HttpServer, Closeable {
res = new FutureResultImpl<>(future.cause());
listening = false;
}
actualCtx.execute(future.channel().eventLoop(), () -> listenHandler.handle(res));
listenContext.execute(() -> listenHandler.handle(res), true);
} else if (!future.isSuccess()) {
listening = false;
// No handler - log so user can see failure
actualCtx.reportException(future.cause());
listenContext.reportException(future.cause());
}
}
});
@@ -262,12 +265,12 @@ public class HttpServerImpl implements HttpServer, Closeable {
return this;
}
private void addHandlers(HttpServerImpl server) {
private void addHandlers(HttpServerImpl server, ContextImpl context) {
if (requestHandler != null) {
server.reqHandlerManager.addHandler(requestHandler, actualCtx);
server.reqHandlerManager.addHandler(requestHandler, context);
}
if (wsHandler != null) {
server.wsHandlerManager.addHandler(wsHandler, actualCtx);
server.wsHandlerManager.addHandler(wsHandler, context);
}
}
@@ -278,8 +281,9 @@ public class HttpServerImpl implements HttpServer, Closeable {
@Override
public void close(final Handler<AsyncResult<Void>> done) {
ContextImpl context = vertx.getOrCreateContext();
if (!listening) {
executeCloseDone(actualCtx, done, null);
executeCloseDone(context, done, null);
return;
}
listening = false;
@@ -289,28 +293,30 @@ public class HttpServerImpl implements HttpServer, Closeable {
if (actualServer != null) {
if (requestHandler != null) {
actualServer.reqHandlerManager.removeHandler(requestHandler, actualCtx);
actualServer.reqHandlerManager.removeHandler(requestHandler, listenContext);
}
if (wsHandler != null) {
actualServer.wsHandlerManager.removeHandler(wsHandler, actualCtx);
actualServer.wsHandlerManager.removeHandler(wsHandler, listenContext);
}
if (actualServer.reqHandlerManager.hasHandlers() || actualServer.wsHandlerManager.hasHandlers()) {
// The actual server still has handlers so we don't actually close it
if (done != null) {
executeCloseDone(actualCtx, done, null);
executeCloseDone(context, done, null);
}
} else {
// No Handlers left so close the actual server
// The done handler needs to be executed on the context that calls close, NOT the context
// of the actual server
actualServer.actualClose(actualCtx, done);
actualServer.actualClose(context, done);
}
}
}
requestHandler = null;
wsHandler = null;
actualCtx.removeCloseHook(this);
if (creatingContext != null) {
creatingContext.removeCloseHook(this);
}
}
SSLHelper getSslHelper() {
@@ -351,7 +357,7 @@ public class HttpServerImpl implements HttpServer, Closeable {
private void executeCloseDone(final ContextImpl closeContext, final Handler<AsyncResult<Void>> done, final Exception e) {
if (done != null) {
closeContext.execute(() -> done.handle(new FutureResultImpl<>(e)));
closeContext.execute(() -> done.handle(new FutureResultImpl<>(e)), false);
}
}

View File

@@ -56,26 +56,12 @@ public abstract class VertxHttpHandler<C extends ConnectionBase> extends VertxHa
// we are reading from the channel
Channel ch = chctx.channel();
// We need to do this since it's possible the server is being used from a worker context
if (context.isOnCorrectWorker(ch.eventLoop())) {
try {
vertx.setContext(context);
doMessageReceived(connection, chctx, msg);
} catch (Throwable t) {
context.reportException(t);
}
} else {
context.execute(() -> {
try {
doMessageReceived(connection, chctx, msg);
} catch (Throwable t) {
context.reportException(t);
}
});
}
context.execute(() -> doMessageReceived(connection, chctx, msg), true);
} else {
try {
doMessageReceived(connection, chctx, msg);
} catch (Throwable t) {
} catch (Throwable t) {
chctx.pipeline().fireExceptionCaught(t);
}
}

View File

@@ -51,7 +51,7 @@ public abstract class BlockingAction<T> {
res.setFailure(e);
}
if (handler != null) {
context.execute(() -> res.setHandler(handler));
context.execute(() -> res.setHandler(handler), false);
}
});
}

View File

@@ -126,31 +126,22 @@ public abstract class ContextImpl implements Context {
}
}
public abstract void execute(Runnable handler);
public abstract void doExecute(ContextTask task);
public abstract boolean isOnCorrectWorker(EventLoop worker);
public abstract boolean isEventLoopContext();
// FIXME - make sure this is right and get rid of worker param
public void execute(EventLoop worker, Runnable handler) {
boolean correctThread;
Thread thread = Thread.currentThread();
if (thread instanceof VertxThread) {
VertxThread vthread = (VertxThread)thread;
Context ctx = vthread.getContext();
correctThread = ctx == this;
public void execute(ContextTask task, boolean expectRightThread) {
if (isOnCorrectContextThread(expectRightThread)) {
wrapTask(task, false, true).run();
} else {
correctThread = false;
}
if (correctThread) {
wrapTask(handler).run();
} else {
//System.out.println("Wrong thread, will execute on correct one: " + Thread.currentThread());
execute(handler);
doExecute(task);
}
}
protected abstract boolean isOnCorrectContextThread(boolean expectRightThread);
public void runOnContext(final Handler<Void> task) {
execute(() -> task.handle(null));
execute(() -> task.handle(null), false);
}
public EventLoop getEventLoop() {
@@ -159,8 +150,8 @@ public abstract class ContextImpl implements Context {
// This executes the task in the worker pool using the ordered executor of the context
// It's used e.g. from BlockingActions
protected void executeOnOrderedWorkerExec(final Runnable task) {
orderedBgExec.execute(wrapTask(task));
protected void executeOnOrderedWorkerExec(ContextTask task) {
orderedBgExec.execute(wrapTask(task, false, false));
}
public void close() {
@@ -172,26 +163,49 @@ public abstract class ContextImpl implements Context {
vertx.setContext(null);
}
protected Runnable wrapTask(final Runnable task) {
return () -> {
Thread currentThread = Thread.currentThread();
String threadName = currentThread.getName();
try {
vertx.setContext(ContextImpl.this);
task.run();
} catch (Throwable t) {
reportException(t);
} finally {
if (!threadName.equals(currentThread.getName())) {
currentThread.setName(threadName);
protected void setThread(Thread thread) {
}
protected Runnable wrapTask(ContextTask task, boolean checkThreadName, boolean setThread) {
checkThreadName = true;
if (checkThreadName) {
return () -> {
Thread currentThread = Thread.currentThread();
if (setThread) {
setThread(currentThread);
}
}
if (closed) {
// We allow tasks to be run after the context is closed but we make sure we unset the context afterwards
// to avoid any leaks
unsetContext();
}
};
String threadName = currentThread.getName();
try {
vertx.setContext(ContextImpl.this);
task.run();
} catch (Throwable t) {
reportException(t);
} finally {
if (!threadName.equals(currentThread.getName())) {
currentThread.setName(threadName);
}
}
if (closed) {
// We allow tasks to be run after the context is closed but we make sure we unset the context afterwards
// to avoid any leaks
unsetContext();
}
};
} else {
return () -> {
try {
vertx.setContext(ContextImpl.this);
task.run();
} catch (Throwable t) {
reportException(t);
}
if (closed) {
// We allow tasks to be run after the context is closed but we make sure we unset the context afterwards
// to avoid any leaks
unsetContext();
}
};
}
}
}

View File

@@ -0,0 +1,10 @@
package org.vertx.java.core.impl;
/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
@FunctionalInterface
public interface ContextTask {
public void run() throws Exception;
}

View File

@@ -16,7 +16,6 @@
package org.vertx.java.core.impl;
import io.netty.channel.EventLoop;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
@@ -29,16 +28,46 @@ public class EventLoopContext extends ContextImpl {
private static final Logger log = LoggerFactory.getLogger(EventLoopContext.class);
private Thread contextThread;
public EventLoopContext(VertxInternal vertx, Executor bgExec) {
super(vertx, bgExec);
}
public void execute(Runnable task) {
getEventLoop().execute(wrapTask(task));
public void doExecute(ContextTask task) {
getEventLoop().execute(wrapTask(task, false, true));
}
public boolean isOnCorrectWorker(EventLoop worker) {
return getEventLoop() == worker;
@Override
public boolean isEventLoopContext() {
return true;
}
@Override
protected void setThread(Thread thread) {
// Sanity check - make sure Netty is really delivering events on the correct thread
if (this.contextThread == null) {
this.contextThread = thread;
} else if (this.contextThread != thread) {
//log.warn("Uh oh! Event loop context executing with wrong thread! Expected " + this.thread + " got " + thread);
throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + this.contextThread + " got " + thread);
}
}
@Override
protected boolean isOnCorrectContextThread(boolean expectRightThread) {
Thread current = Thread.currentThread();
boolean correct = current == contextThread;
if (expectRightThread) {
if (!(current instanceof VertxThread)) {
log.warn("Expected to be on Vert.x thread, but actually on: " + current);
} else if (!correct && contextThread != null) {
log.warn("Event delivered on unexpected thread " + current + " expected: " + contextThread);
new Exception().printStackTrace();
}
}
return correct;
}
}

View File

@@ -27,7 +27,12 @@ public class MultiThreadedWorkerContext extends WorkerContext {
this.bgExec = bgExec;
}
public void execute(Runnable task) {
bgExec.execute(wrapTask(task));
public void execute(ContextTask task) {
bgExec.execute(wrapTask(task, false, true));
}
@Override
protected boolean isOnCorrectContextThread(boolean expectRightThread) {
return false;
}
}

View File

@@ -95,7 +95,7 @@ public class VertxImpl implements VertxInternal {
this(0, hostname, null);
}
public VertxImpl(int port, String hostname, final Handler<AsyncResult<Vertx>> resultHandler) {
public VertxImpl(int port, String hostname, Handler<AsyncResult<Vertx>> resultHandler) {
ClusterManagerFactory factory;
String clusterManagerFactoryClassName = System.getProperty("vertx.clusterManagerFactory");
if (clusterManagerFactoryClassName != null) {
@@ -115,7 +115,7 @@ public class VertxImpl implements VertxInternal {
}
this.clusterManager = factory.createClusterManager(this);
this.clusterManager.join();
final Vertx inst = this;
Vertx inst = this;
this.eventBus = new EventBusImpl(this, port, hostname, clusterManager, res -> {
if (resultHandler != null) {
if (res.succeeded()) {
@@ -189,15 +189,15 @@ public class VertxImpl implements VertxInternal {
return false;
}
public long setPeriodic(long delay, final Handler<Long> handler) {
public long setPeriodic(long delay, Handler<Long> handler) {
return scheduleTimeout(getOrCreateContext(), handler, delay, true);
}
public long setTimer(long delay, final Handler<Long> handler) {
public long setTimer(long delay, Handler<Long> handler) {
return scheduleTimeout(getOrCreateContext(), handler, delay, false);
}
public void runOnContext(final Handler<Void> task) {
public void runOnContext(Handler<Void> task) {
ContextImpl context = getOrCreateContext();
context.runOnContext(task);
}
@@ -260,22 +260,14 @@ public class VertxImpl implements VertxInternal {
return new DnsClientImpl(this, dnsServers);
}
private long scheduleTimeout(final ContextImpl context, final Handler<Long> handler, long delay, boolean periodic) {
private long scheduleTimeout(ContextImpl context, Handler<Long> handler, long delay, boolean periodic) {
if (delay < 1) {
throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms");
}
long timerId = timeoutCounter.getAndIncrement();
final InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, context);
final Runnable wrapped = context.wrapTask(task);
final Runnable toRun;
final EventLoop el = context.getEventLoop();
if (context instanceof EventLoopContext) {
toRun = wrapped;
} else {
// On worker context
toRun = () -> context.execute(wrapped);
}
InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, context);
Runnable toRun = () -> context.execute(task, false);
EventLoop el = context.getEventLoop();
Future<?> future;
if (periodic) {
future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS);
@@ -440,8 +432,8 @@ public class VertxImpl implements VertxInternal {
}
@Override
public <T> void executeBlocking(final Action<T> action, final Handler<AsyncResult<T>> resultHandler) {
final ContextImpl context = getOrCreateContext();
public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) {
ContextImpl context = getOrCreateContext();
context.executeOnOrderedWorkerExec(() -> {
FutureResultImpl<T> res = new FutureResultImpl<>();
try {
@@ -451,7 +443,7 @@ public class VertxImpl implements VertxInternal {
res.setFailure(e);
}
if (resultHandler != null) {
context.execute(() -> res.setHandler(resultHandler));
context.execute(() -> res.setHandler(resultHandler), false);
}
});
}
@@ -460,7 +452,7 @@ public class VertxImpl implements VertxInternal {
return clusterManager;
}
private class InternalTimerHandler implements Runnable, Closeable {
private class InternalTimerHandler implements ContextTask, Closeable {
final Handler<Long> handler;
final boolean periodic;
final long timerID;
@@ -480,7 +472,7 @@ public class VertxImpl implements VertxInternal {
this.periodic = periodic;
}
public void run() {
public void run() throws Exception {
if (!cancelled) {
try {
handler.handle(timerID);

View File

@@ -16,8 +16,6 @@
package org.vertx.java.core.impl;
import io.netty.channel.EventLoop;
import java.util.concurrent.Executor;
/**
@@ -29,11 +27,18 @@ public class WorkerContext extends ContextImpl {
super(vertx, orderedBgExec);
}
public void execute(Runnable task) {
executeOnOrderedWorkerExec(wrapTask(task));
public void doExecute(ContextTask task) {
executeOnOrderedWorkerExec(task);
}
public boolean isOnCorrectWorker(EventLoop worker) {
@Override
public boolean isEventLoopContext() {
return false;
}
@Override
protected boolean isOnCorrectContextThread(boolean expectRightThread) {
return false;
}
}

View File

@@ -16,8 +16,6 @@
package org.vertx.java.core.net;
import org.vertx.java.core.Handler;
/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/

View File

@@ -154,7 +154,7 @@ public abstract class ConnectionBase {
} else {
doneHandler.handle(new FutureResultImpl<>(channelFuture.cause()));
}
});
}, true);
} else if (!channelFuture.isSuccess()) {
handleException(channelFuture.cause());
}

View File

@@ -48,22 +48,23 @@ public class NetClientImpl implements NetClient {
private final VertxInternal vertx;
private final NetClientOptions options;
private final ContextImpl actualCtx;
private final SSLHelper sslHelper;
private final Map<Channel, NetSocketImpl> socketMap = new ConcurrentHashMap<>();
private final Closeable closeHook;
private Bootstrap bootstrap;
private final ContextImpl creatingContext;
public NetClientImpl(VertxInternal vertx, NetClientOptions options) {
this.vertx = vertx;
this.options = new NetClientOptions(options);
this.sslHelper = new SSLHelper(options);
actualCtx = vertx.getOrCreateContext();
this.closeHook = doneHandler -> {
NetClientImpl.this.close();
doneHandler.handle(new FutureResultImpl<>((Void)null));
};
actualCtx.addCloseHook(closeHook);
creatingContext = vertx.getContext();
if (creatingContext != null) {
creatingContext.addCloseHook(closeHook);
}
}
@Override
@@ -83,7 +84,9 @@ public class NetClientImpl implements NetClient {
for (NetSocket sock : socketMap.values()) {
sock.close();
}
actualCtx.removeCloseHook(closeHook);
if (creatingContext != null) {
creatingContext.removeCloseHook(closeHook);
}
}
private void applyConnectionOptions(Bootstrap bootstrap) {
@@ -106,28 +109,27 @@ public class NetClientImpl implements NetClient {
private void connect(final int port, final String host, final Handler<AsyncResult<NetSocket>> connectHandler,
final int remainingAttempts) {
if (bootstrap == null) {
sslHelper.checkSSL(vertx);
bootstrap = new Bootstrap();
bootstrap.group(actualCtx.getEventLoop());
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslHelper.isSSL()) {
SslHandler sslHandler = sslHelper.createSslHandler(vertx, true);
pipeline.addLast("ssl", sslHandler);
}
if (sslHelper.isSSL()) {
// only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used.
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support
}
pipeline.addLast("handler", new VertxNetHandler(vertx, socketMap));
ContextImpl context = vertx.getOrCreateContext();
sslHelper.checkSSL(vertx);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(context.getEventLoop());
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslHelper.isSSL()) {
SslHandler sslHandler = sslHelper.createSslHandler(vertx, true);
pipeline.addLast("ssl", sslHandler);
}
});
}
if (sslHelper.isSSL()) {
// only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used.
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support
}
pipeline.addLast("handler", new VertxNetHandler(vertx, socketMap));
}
});
applyConnectionOptions(bootstrap);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
future.addListener(new ChannelFutureListener() {
@@ -146,48 +148,46 @@ public class NetClientImpl implements NetClient {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
connected(ch, connectHandler);
connected(context, ch, connectHandler);
} else {
failed(ch, future.cause(), connectHandler);
failed(context, ch, future.cause(), connectHandler);
}
}
});
} else {
connected(ch, connectHandler);
connected(context, ch, connectHandler);
}
} else {
if (remainingAttempts > 0 || remainingAttempts == -1) {
actualCtx.execute(ch.eventLoop(), () -> {
context.execute(() -> {
log.debug("Failed to create connection. Will retry in " + options.getReconnectInterval() + " milliseconds");
//Set a timer to retry connection
vertx.setTimer(options.getReconnectInterval(), new Handler<Long>() {
public void handle(Long timerID) {
connect(port, host, connectHandler, remainingAttempts == -1 ? remainingAttempts : remainingAttempts
- 1);
}
vertx.setTimer(options.getReconnectInterval(), tid -> {
connect(port, host, connectHandler, remainingAttempts == -1 ? remainingAttempts : remainingAttempts
- 1);
});
});
}, true);
} else {
failed(ch, channelFuture.cause(), connectHandler);
failed(context, ch, channelFuture.cause(), connectHandler);
}
}
}
});
}
private void connected(final Channel ch, final Handler<AsyncResult<NetSocket>> connectHandler) {
actualCtx.execute(ch.eventLoop(), () -> doConnected(ch, connectHandler));
private void connected(ContextImpl context, Channel ch, Handler<AsyncResult<NetSocket>> connectHandler) {
context.execute(() -> doConnected(context, ch, connectHandler), true);
}
private void doConnected(Channel ch, final Handler<AsyncResult<NetSocket>> connectHandler) {
NetSocketImpl sock = new NetSocketImpl(vertx, ch, actualCtx, sslHelper, true);
private void doConnected(ContextImpl context, Channel ch, final Handler<AsyncResult<NetSocket>> connectHandler) {
NetSocketImpl sock = new NetSocketImpl(vertx, ch, context, sslHelper, true);
socketMap.put(ch, sock);
connectHandler.handle(new FutureResultImpl<NetSocket>(sock));
}
private void failed(Channel ch, final Throwable t, final Handler<AsyncResult<NetSocket>> connectHandler) {
private void failed(ContextImpl context, Channel ch, Throwable t, Handler<AsyncResult<NetSocket>> connectHandler) {
ch.close();
actualCtx.execute(ch.eventLoop(), () -> doFailed(connectHandler, t));
context.execute(() -> doFailed(connectHandler, t), true);
}
private static void doFailed(Handler<AsyncResult<NetSocket>> connectHandler, Throwable t) {

View File

@@ -56,7 +56,7 @@ public class NetServerImpl implements NetServer, Closeable {
private final VertxInternal vertx;
private final NetServerOptions options;
private final ContextImpl actualCtx;
private final ContextImpl creatingContext;
private final SSLHelper sslHelper;
private final Map<Channel, NetSocketImpl> socketMap = new ConcurrentHashMap<>();
private final VertxEventLoopGroup availableWorkers = new VertxEventLoopGroup();
@@ -70,13 +70,16 @@ public class NetServerImpl implements NetServer, Closeable {
private int actualPort;
private Queue<Runnable> bindListeners = new ConcurrentLinkedQueue<>();
private boolean listenersRun;
private ContextImpl listenContext;
public NetServerImpl(VertxInternal vertx, NetServerOptions options) {
this.vertx = vertx;
this.options = new NetServerOptions(options);
this.sslHelper = new SSLHelper(options);
actualCtx = vertx.getOrCreateContext();
actualCtx.addCloseHook(this);
this.creatingContext = vertx.getContext();
if (creatingContext != null) {
creatingContext.addCloseHook(this);
}
}
@Override
@@ -108,6 +111,8 @@ public class NetServerImpl implements NetServer, Closeable {
}
listening = true;
listenContext = vertx.getOrCreateContext();
synchronized (vertx.sharedNetServers()) {
this.actualPort = options.getPort(); // Will be updated on bind for a wildcard port
id = new ServerID(options.getPort(), options.getHost());
@@ -139,7 +144,7 @@ public class NetServerImpl implements NetServer, Closeable {
applyConnectionOptions(bootstrap);
if (connectHandler != null) {
handlerManager.addHandler(connectHandler, actualCtx);
handlerManager.addHandler(connectHandler, listenContext);
}
try {
@@ -168,7 +173,7 @@ public class NetServerImpl implements NetServer, Closeable {
vertx.runOnContext(v -> listenHandler.handle(new FutureResultImpl<>(t)));
} else {
// No handler - log so user can see failure
actualCtx.reportException(t);
listenContext.reportException(t);
}
listening = false;
return this;
@@ -182,8 +187,7 @@ public class NetServerImpl implements NetServer, Closeable {
actualServer = shared;
this.actualPort = shared.actualPort();
if (connectHandler != null) {
// Share the event loop thread to also serve the NetServer's network traffic.
actualServer.handlerManager.addHandler(connectHandler, actualCtx);
actualServer.handlerManager.addHandler(connectHandler, listenContext);
}
}
@@ -197,10 +201,10 @@ public class NetServerImpl implements NetServer, Closeable {
listening = false;
res = new FutureResultImpl<>(actualServer.bindFuture.cause());
}
actualCtx.execute(actualServer.bindFuture.channel().eventLoop(), () -> listenHandler.handle(res));
listenContext.execute(() -> listenHandler.handle(res), true);
} else if (!actualServer.bindFuture.isSuccess()) {
// No handler - log so user can see failure
actualCtx.reportException(actualServer.bindFuture.cause());
listenContext.reportException(actualServer.bindFuture.cause());
listening = false;
}
});
@@ -252,9 +256,10 @@ public class NetServerImpl implements NetServer, Closeable {
@Override
public void close(final Handler<AsyncResult<Void>> done) {
ContextImpl context = vertx.getOrCreateContext();
if (!listening) {
if (done != null) {
executeCloseDone(actualCtx, done, null);
executeCloseDone(context, done, null);
}
return;
}
@@ -262,22 +267,24 @@ public class NetServerImpl implements NetServer, Closeable {
synchronized (vertx.sharedNetServers()) {
if (actualServer != null) {
actualServer.handlerManager.removeHandler(connectHandler, actualCtx);
actualServer.handlerManager.removeHandler(connectHandler, listenContext);
if (actualServer.handlerManager.hasHandlers()) {
// The actual server still has handlers so we don't actually close it
if (done != null) {
executeCloseDone(actualCtx, done, null);
executeCloseDone(context, done, null);
}
} else {
// No Handlers left so close the actual server
// The done handler needs to be executed on the context that calls close, NOT the context
// of the actual server
actualServer.actualClose(actualCtx, done);
actualServer.actualClose(context, done);
}
}
}
actualCtx.removeCloseHook(this);
if (creatingContext != null) {
creatingContext.removeCloseHook(this);
}
}
@Override
@@ -306,7 +313,7 @@ public class NetServerImpl implements NetServer, Closeable {
private void executeCloseDone(final ContextImpl closeContext, final Handler<AsyncResult<Void>> done, final Exception e) {
if (done != null) {
closeContext.execute(() -> done.handle(new FutureResultImpl<>(e)));
closeContext.execute(() -> done.handle(new FutureResultImpl<>(e)), false);
}
}
@@ -353,7 +360,7 @@ public class NetServerImpl implements NetServer, Closeable {
}
private void connected(final Channel ch, final HandlerHolder<NetSocket> handler) {
handler.context.execute(ch.eventLoop(), () -> doConnected(ch, handler));
handler.context.execute(() -> doConnected(ch, handler), true);
}
private void doConnected(Channel ch, HandlerHolder<NetSocket> handler) {

View File

@@ -300,27 +300,13 @@ public class NetSocketImpl extends ConnectionBase implements NetSocket {
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(final Future<Channel> future) throws Exception {
if (context.isOnCorrectWorker(channel.eventLoop())) {
context.execute(() -> {
if (future.isSuccess()) {
try {
vertx.setContext(context);
handler.handle(null);
} catch (Throwable t) {
context.reportException(t);
}
handler.handle(null);
} else {
context.reportException(future.cause());
}
} else {
context.execute(() -> {
if (future.isSuccess()) {
handler.handle(null);
} else {
context.reportException(future.cause());
}
});
}
}, true);
}
});
return this;

View File

@@ -83,7 +83,7 @@ public class SSLHelper {
}
public void checkSSL(VertxInternal vertx) {
if (ssl) {
if (ssl && sslContext == null) {
sslContext = createContext(vertx, keyStorePath, keyStorePassword, trustStorePath, trustStorePassword, trustAll);
}
}

View File

@@ -69,16 +69,7 @@ public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDupl
if (conn != null) {
conn.setWritable(ctx.channel().isWritable());
ContextImpl context = getContext(conn);
if (context.isOnCorrectWorker(ch.eventLoop())) {
try {
vertx.setContext(context);
conn.handleInterestedOpsChanged();
} catch (Throwable t) {
context.reportException(t);
}
} else {
context.execute(() -> conn.handleInterestedOpsChanged());
}
context.execute(conn::handleInterestedOpsChanged, true);
}
}
@@ -89,7 +80,7 @@ public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDupl
final C connection = connectionMap.get(ch);
if (connection != null) {
ContextImpl context = getContext(connection);
context.execute(ch.eventLoop(), () ->{
context.execute(() ->{
try {
if (ch.isOpen()) {
ch.close();
@@ -97,7 +88,7 @@ public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDupl
} catch (Throwable ignore) {
}
connection.handleException(t);
});
}, true);
} else {
// Ignore - any exceptions before a channel exists will be passed manually via the failed(...) method
// Any exceptions after a channel is closed can be ignored
@@ -110,7 +101,7 @@ public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDupl
final C connection = connectionMap.remove(ch);
if (connection != null) {
ContextImpl context = getContext(connection);
context.execute(ch.eventLoop(), () -> connection.handleClosed());
context.execute(() -> connection.handleClosed(), true);
}
}
@@ -120,8 +111,8 @@ public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDupl
if (conn != null) {
ContextImpl context = getContext(conn);
// Only mark end read if its not a WorkerVerticle
if (context.isOnCorrectWorker(ctx.channel().eventLoop())) {
conn.endReadAndFlush();
if (context.isEventLoopContext()) {
context.execute(conn::endReadAndFlush, true);
}
}
}
@@ -137,8 +128,8 @@ public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDupl
// Only mark start read if we are on the correct worker. This is needed as while we are in read this may will
// delay flushes, which is a problem when we are no on the correct worker. This is mainly a problem as
// WorkerVerticle may block.
if (context.isOnCorrectWorker(chctx.channel().eventLoop())) {
connection.startRead();
if (context.isEventLoopContext()) {
context.execute(connection::startRead, true);
}
} else {
context = null;

View File

@@ -41,26 +41,7 @@ public class VertxNetHandler extends VertxHandler<NetSocketImpl> {
final ByteBuf buf = (ByteBuf) msg;
Channel ch = chctx.channel();
// We need to do this since it's possible the server is being used from a worker context
if (context.isOnCorrectWorker(ch.eventLoop())) {
try {
vertx.setContext(context);
try {
sock.handleDataReceived(new Buffer(buf));
} catch (Throwable t) {
context.reportException(t);
}
} catch (Throwable t) {
context.reportException(t);
}
} else {
context.execute(() -> {
try {
sock.handleDataReceived(new Buffer(buf));
} catch (Throwable t) {
context.reportException(t);
}
});
}
context.execute(() -> sock.handleDataReceived(new Buffer(buf)), true);
} else {
// just discard
}

View File

@@ -22,13 +22,9 @@ import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.datagram.DatagramSocket;
import org.vertx.java.core.datagram.DatagramSocketOptions;
import org.vertx.java.core.datagram.InternetProtocolFamily;
import org.vertx.java.core.net.impl.SocketDefaults;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

View File

@@ -1542,7 +1542,7 @@ public class HttpTest extends HttpTestBase {
}
@Test
public void testPipeliningOrder() {
public void testPipeliningOrder() throws Exception {
client.close();
client = vertx.createHttpClient(new HttpClientOptions().setKeepAlive(true).setPipelining(true).setMaxPoolSize(1));
int requests = 100;
@@ -1555,8 +1555,8 @@ public class HttpTest extends HttpTestBase {
req.response().setChunked(true);
req.bodyHandler(buff -> {
assertEquals("This is content " + theCount, buff.toString());
//We write the response back after a random time to increase the chances of responses written in the
//wrong order if we didn't implement pipelining correctly
// We write the response back after a random time to increase the chances of responses written in the
// wrong order if we didn't implement pipelining correctly
vertx.setTimer(1 + (long) (10 * Math.random()), id -> {
req.response().headers().set("count", String.valueOf(theCount));
req.response().write(buff);
@@ -1565,26 +1565,32 @@ public class HttpTest extends HttpTestBase {
});
});
CountDownLatch latch = new CountDownLatch(requests);
AtomicInteger cnt = new AtomicInteger(0);
server.listen(onSuccess(s -> {
for (int count = 0; count < requests; count++) {
int theCount = count;
HttpClientRequest req = client.post(new RequestOptions().setPort(DEFAULT_HTTP_PORT).setRequestURI(DEFAULT_TEST_URI), resp -> {
assertEquals(theCount, Integer.parseInt(resp.headers().get("count")));
resp.bodyHandler(buff -> {
assertEquals("This is content " + theCount, buff.toString());
if (theCount == requests - 1) {
testComplete();
}
vertx.setTimer(500, id -> {
for (int count = 0; count < requests; count++) {
int theCount = count;
HttpClientRequest req = client.post(new RequestOptions().setPort(DEFAULT_HTTP_PORT).setRequestURI(DEFAULT_TEST_URI), resp -> {
assertEquals(theCount, Integer.parseInt(resp.headers().get("count")));
resp.bodyHandler(buff -> {
assertEquals("This is content " + theCount, buff.toString());
latch.countDown();
});
});
});
req.setChunked(true);
req.headers().set("count", String.valueOf(count));
req.write("This is content " + count);
req.end();
}
req.setChunked(true);
req.headers().set("count", String.valueOf(count));
req.write("This is content " + count);
req.end();
}
});
}));
await();
awaitLatch(latch);
}
@Test
@@ -1607,6 +1613,7 @@ public class HttpTest extends HttpTestBase {
HttpServer[] servers = new HttpServer[numServers];
CountDownLatch startServerLatch = new CountDownLatch(numServers);
Set<HttpServer> connectedServers = new ConcurrentHashSet<>();
AtomicInteger res = new AtomicInteger(0);
for (int i = 0; i < numServers; i++) {
HttpServer server = vertx.createHttpServer(new HttpServerOptions().setHost(DEFAULT_HTTP_HOST).setPort(DEFAULT_HTTP_PORT));
server.requestHandler(req -> {
@@ -1623,6 +1630,7 @@ public class HttpTest extends HttpTestBase {
awaitLatch(startServerLatch);
CountDownLatch reqLatch = new CountDownLatch(requests);
AtomicInteger reqs = new AtomicInteger(0);
for (int count = 0; count < requests; count++) {
client.getNow(new RequestOptions().setPort(DEFAULT_HTTP_PORT).setRequestURI(DEFAULT_TEST_URI), resp -> {
assertEquals(200, resp.statusCode());

View File

@@ -24,7 +24,6 @@ import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.http.RequestOptions;
import org.vertx.java.core.impl.ConcurrentHashSet;
import org.vertx.java.core.net.*;
import org.vertx.java.core.net.impl.SocketDefaults;
@@ -1030,13 +1029,17 @@ public class NetTest extends VertxTestBase {
testComplete();
}
});
// Send some data to the client to trigger the sendfile
sock.write("foo");
});
server.listen(ar -> {
assertTrue(ar.succeeded());
client.connect(1234, ar2 -> {
assertTrue(ar2.succeeded());
NetSocket sock = ar2.result();
sock.sendFile(file.getAbsolutePath());
sock.dataHandler(buf -> {
sock.sendFile(file.getAbsolutePath());
});
});
});
@@ -1051,7 +1054,9 @@ public class NetTest extends VertxTestBase {
Buffer expected = new Buffer(content);
Buffer received = new Buffer();
server.connectHandler(sock -> {
sock.sendFile(file.getAbsolutePath());
sock.dataHandler(buf -> {
sock.sendFile(file.getAbsolutePath());
});
});
server.listen(ar -> {
assertTrue(ar.succeeded());
@@ -1065,6 +1070,7 @@ public class NetTest extends VertxTestBase {
testComplete();
}
});
sock.write("foo");
});
});