Rename context methods executeFromIO to emitFromIO and execute to emit

This commit is contained in:
Julien Viet
2019-11-05 15:50:21 +01:00
parent faa02df46d
commit 78d129f3f5
23 changed files with 112 additions and 114 deletions

View File

@@ -25,7 +25,6 @@ import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
@@ -428,7 +427,7 @@ public class DatagramSocketImpl implements DatagramSocket, MetricsProvider {
}
private void notifyException(final Handler<AsyncResult<Void>> handler, final Throwable cause) {
context.executeFromIO(v -> handler.handle(Future.failedFuture(cause)));
context.emitFromIO(v -> handler.handle(Future.failedFuture(cause)));
}
@Override

View File

@@ -347,7 +347,7 @@ public final class DnsClientImpl implements DnsClient {
});
channel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
actualCtx.executeFromIO(future.cause(), this::fail);
actualCtx.emitFromIO(future.cause(), this::fail);
}
});
}

View File

@@ -736,7 +736,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
ws.registerHandler(vertx.eventBus());
}
getContext().executeFromIO(wsRes, res -> {
getContext().emitFromIO(wsRes, res -> {
if (res.succeeded()) {
log.debug("WebSocket handshake complete");
if (metrics != null) {

View File

@@ -207,7 +207,7 @@ class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {
VertxHandler<Http1xClientConnection> clientHandler = VertxHandler.create(context, chctx -> {
Http1xClientConnection conn = new Http1xClientConnection(listener, upgrade ? HttpVersion.HTTP_1_1 : version, client, endpointMetric, chctx, ssl, server, context, metrics);
if (metrics != null) {
context.executeFromIO(v -> {
context.emitFromIO(v -> {
Object socketMetric = metrics.connected(conn.remoteAddress(), conn.remoteName());
conn.metric(socketMetric);
metrics.endpointConnected(endpointMetric, socketMetric);

View File

@@ -449,7 +449,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
HttpClientStream stream = ar1.result();
ContextInternal ctx = stream.getContext();
if (stream.id() == 1 && initializer != null) {
ctx.executeFromIO(v -> {
ctx.emitFromIO(v -> {
initializer.handle(stream.connection());
});
}

View File

@@ -37,7 +37,6 @@ import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import java.util.function.Function;
/**
@@ -151,7 +150,7 @@ import java.util.function.Function;
private void handleException(Channel ch, Throwable cause) {
HandlerHolder<? extends Handler<Throwable>> holder = errorHandler.apply(ch.eventLoop());
if (holder != null) {
holder.context.executeFromIO(cause, holder.handler);
holder.context.emitFromIO(cause, holder.handler);
}
}
@@ -206,7 +205,7 @@ import java.util.function.Function;
if (options.getHttp2ConnectionWindowSize() > 0) {
conn.setWindowSize(options.getHttp2ConnectionWindowSize());
}
ctx.executeFromIO(conn, handler_);
ctx.emitFromIO(conn, handler_);
});
return handler;
}
@@ -254,9 +253,9 @@ import java.util.function.Function;
pipeline.addLast("handler", handler);
Http1xServerConnection conn = handler.getConnection();
if (metrics != null) {
holder.context.executeFromIO(v -> conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName())));
holder.context.emitFromIO(v -> conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName())));
}
holder.context.executeFromIO(conn, holder.handler);
holder.context.emitFromIO(conn, holder.handler);
}

View File

@@ -563,7 +563,7 @@ public class HttpServerResponseImpl implements HttpServerResponse {
} else {
res = Future.failedFuture(future.cause());
}
ctx.executeFromIO(v -> resultHandler.handle(res));
ctx.emitFromIO(v -> resultHandler.handle(res));
}
// signal body end handler
@@ -572,7 +572,7 @@ public class HttpServerResponseImpl implements HttpServerResponse {
handler = bodyEndHandler;
}
if (handler != null) {
context.executeFromIO(v -> {
context.emitFromIO(v -> {
handler.handle(null);
});
}

View File

@@ -38,7 +38,7 @@ import java.util.function.LongSupplier;
* </ul>
*
* A connection is delivered to a {@link Waiter} on the pool's context event loop thread, the waiter must take care of
* calling {@link io.vertx.core.impl.ContextInternal#executeFromIO} if necessary.
* calling {@link io.vertx.core.impl.ContextInternal#emitFromIO} if necessary.
* <p/>
* Calls to the pool are synchronized on the pool to avoid race conditions and maintain its invariants. This pool can
* be called from different threads safely (although it is not encouraged for performance reasons, we benefit from biased

View File

@@ -12,7 +12,6 @@ package io.vertx.core.impl;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
@@ -80,7 +79,13 @@ abstract class AbstractContext implements ContextInternal {
}
};
abstract <T> void executeAsync(T value, Handler<T> task);
/**
* Emit the {@code event} asynchronously.
*
* @param event the event for the {@code handler}
* @param handler the task to execute with the {@code event} argument
*/
abstract <E> void emitAsync(E event, Handler<E> handler);
@Override
public abstract boolean isEventLoopContext();
@@ -94,8 +99,8 @@ abstract class AbstractContext implements ContextInternal {
// In such a case we should already be on an event loop thread (as Netty manages the event loops)
// but check this anyway, then execute directly
@Override
public final void executeFromIO(Handler<Void> task) {
executeFromIO(null, task);
public final void emitFromIO(Handler<Void> handler) {
emitFromIO(null, handler);
}
@Override
@@ -104,8 +109,8 @@ abstract class AbstractContext implements ContextInternal {
}
@Override
public final void dispatch(Handler<Void> task) {
dispatch(null, task);
public final void dispatch(Handler<Void> handler) {
dispatch(null, handler);
}
public final ContextInternal beginDispatch() {
@@ -172,10 +177,10 @@ abstract class AbstractContext implements ContextInternal {
}
@Override
public final <T> void dispatch(T arg, Handler<T> task) {
public final <T> void dispatch(T event, Handler<T> handler) {
ContextInternal prev = beginDispatch();
try {
task.handle(arg);
handler.handle(event);
} catch (Throwable t) {
reportException(t);
} finally {
@@ -194,9 +199,9 @@ abstract class AbstractContext implements ContextInternal {
// Run the task asynchronously on this same context
@Override
public final void runOnContext(Handler<Void> task) {
public final void runOnContext(Handler<Void> handler) {
try {
executeAsync(null, task);
emitAsync(null, handler);
} catch (RejectedExecutionException ignore) {
// Pool is already shut down
}

View File

@@ -35,7 +35,7 @@ abstract class ContextImpl extends AbstractContext {
static FutureListener<Void> toListenerFuture(ContextInternal context, Handler<AsyncResult<Void>> handler) {
return future -> {
Future<Void> res = future.isSuccess() ? Future.succeededFuture() : Future.failedFuture(future.cause());
context.executeFromIO(res, handler);
context.emitFromIO(res, handler);
};
}

View File

@@ -12,7 +12,6 @@
package io.vertx.core.impl;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
@@ -22,7 +21,6 @@ import io.vertx.core.Vertx;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
/**
* This interface provides an api for vert.x core internal use only
@@ -102,33 +100,33 @@ public interface ContextInternal extends Context {
VertxInternal owner();
/**
* Like {@link #executeFromIO(Object, Handler)} but with no argument.
* @see #emit(Object, Handler)
*/
void executeFromIO(Handler<Void> task);
void emitFromIO(Handler<Void> handler);
/**
* Execute the context task and switch on this context if necessary, this also associates the
* Emit the {@code event} to the {@code handler} and switch on this context if necessary, this also associates the
* current thread with the current context so {@link Vertx#currentContext()} returns this context.<p/>
*
* The caller thread is assumed to be the event loop thread of this context.<p/>
*
* Any exception thrown from the {@literal task} will be reported on this context.
* Any exception thrown from the {@literal handler} will be reported on this context.
*
* @param value the argument for the {@code task}
* @param task the task to execute with the {@code value} argument
* @param event the event for the {@code handler}
* @param handler the handler to execute with the {@code event} argument
*/
<T> void executeFromIO(T value, Handler<T> task);
<T> void emitFromIO(T event, Handler<T> handler);
/**
* Execute the context task and switch on this context if necessary, this also associates the
* Emit the {@code event} to the {@code handler} and switch on this context if necessary, this also associates the
* current thread with the current context so {@link Vertx#currentContext()} returns this context.<p/>
*
* Any exception thrown from the {@literal task} will be reported on this context.
* Any exception thrown from the {@literal handler} will be reported on this context.
*
* @param value the argument for the {@code task}
* @param task the task to execute with the {@code value} argument
* @param event the event for the {@code handler}
* @param handler the handler to execute with the {@code event} argument
*/
<T> void execute(T value, Handler<T> task);
<E> void emit(E event, Handler<E> handler);
/**
* @see #schedule(Object, Handler)
@@ -148,20 +146,20 @@ public interface ContextInternal extends Context {
/**
* @see #dispatch(Object, Handler)
*/
void dispatch(Handler<Void> task);
void dispatch(Handler<Void> handler);
/**
* Dispatch a task on this context. The task is executed directly by the caller thread which must be a
* Dispatch a {@code event} to the {@code handler} on this context. The handler is executed directly by the caller thread which must be a
* {@link VertxThread}.
* <p>
* The task execution is monitored by the blocked thread checker.
* The handler execution is monitored by the blocked thread checker.
* <p>
* This context is thread-local associated during the task execution.
*
* @param arg the task argument
* @param task the task to execute
* @param event the event for the {@code handler}
* @param handler the handler to execute with the {@code event} argument
*/
<T> void dispatch(T arg, Handler<T> task);
<E> void dispatch(E event, Handler<E> handler);
/**
* Begin the dispatch of a task on this context.

View File

@@ -13,7 +13,6 @@ package io.vertx.core.impl;
import io.netty.channel.EventLoop;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.spi.tracing.VertxTracer;
@@ -36,8 +35,8 @@ public class EventLoopContext extends ContextImpl {
}
@Override
<T> void executeAsync(T value, Handler<T> task) {
nettyEventLoop().execute(() -> dispatch(value, task));
<T> void emitAsync(T event, Handler<T> handler) {
nettyEventLoop().execute(() -> dispatch(event, handler));
}
@Override
@@ -46,16 +45,16 @@ public class EventLoopContext extends ContextImpl {
}
@Override
public <T> void executeFromIO(T value, Handler<T> task) {
public <T> void emitFromIO(T event, Handler<T> handler) {
if (THREAD_CHECKS) {
checkEventLoopThread();
}
dispatch(value, task);
dispatch(event, handler);
}
@Override
public <T> void execute(T value, Handler<T> task) {
execute(this, value, task);
public <T> void emit(T event, Handler<T> handler) {
execute(this, event, handler);
}
private static <T> void execute(AbstractContext ctx, T value, Handler<T> task) {
@@ -64,10 +63,10 @@ public class EventLoopContext extends ContextImpl {
if (AbstractContext.context() == ctx) {
ctx.dispatch(value, task);
} else {
ctx.executeFromIO(value, task);
ctx.emitFromIO(value, task);
}
} else {
ctx.executeAsync(value, task);
ctx.emitAsync(value, task);
}
}
@@ -88,21 +87,21 @@ public class EventLoopContext extends ContextImpl {
}
@Override
<T> void executeAsync(T value, Handler<T> task) {
nettyEventLoop().execute(() -> dispatch(value, task));
<T> void emitAsync(T event, Handler<T> handler) {
nettyEventLoop().execute(() -> dispatch(event, handler));
}
@Override
public <T> void executeFromIO(T value, Handler<T> task) {
public <T> void emitFromIO(T event, Handler<T> handler) {
if (THREAD_CHECKS) {
checkEventLoopThread();
}
dispatch(value, task);
dispatch(event, handler);
}
@Override
public <T> void execute(T value, Handler<T> task) {
EventLoopContext.execute(this, value, task);
public <T> void emit(T event, Handler<T> handler) {
EventLoopContext.execute(this, event, handler);
}
@Override

View File

@@ -122,7 +122,7 @@ class FutureImpl<T> implements PromiseInternal<T>, Future<T> {
private void doDispatch(Handler<AsyncResult<T>> handler) {
if (context != null) {
context.execute(this, handler);
context.emit(this, handler);
} else {
handler.handle(this);
}

View File

@@ -947,7 +947,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
@Override
public void run() {
context.executeFromIO(this);
context.emitFromIO(this);
}
public void handle(Void v) {

View File

@@ -12,7 +12,6 @@
package io.vertx.core.impl;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.tracing.VertxTracer;
@@ -27,8 +26,8 @@ class WorkerContext extends ContextImpl {
}
@Override
<T> void executeAsync(T value, Handler<T> task) {
executeAsync(this, value, task);
<T> void emitAsync(T event, Handler<T> handler) {
executeAsync(this, event, handler);
}
@Override
@@ -39,25 +38,25 @@ class WorkerContext extends ContextImpl {
// In the case of a worker context, the IO will always be provided on an event loop thread, not a worker thread
// so we need to execute it on the worker thread
@Override
public <T> void executeFromIO(T value, Handler<T> task) {
public <T> void emitFromIO(T event, Handler<T> handler) {
if (THREAD_CHECKS) {
checkEventLoopThread();
}
executeAsync(this, value ,task);
executeAsync(this, event, handler);
}
@Override
public <T> void execute(T value, Handler<T> task) {
execute(this, value, task);
public <T> void emit(T event, Handler<T> handler) {
execute(this, event, handler);
}
private static <T> void execute(AbstractContext ctx, T value, Handler<T> task) {
if (AbstractContext.context() == ctx) {
ctx.dispatch(value, task);
} else if (ctx.nettyEventLoop().inEventLoop()) {
ctx.executeFromIO(value, task);
ctx.emitFromIO(value, task);
} else {
ctx.executeAsync(value, task);
ctx.emitAsync(value, task);
}
}
@@ -108,18 +107,18 @@ class WorkerContext extends ContextImpl {
}
@Override
<T> void executeAsync(T value, Handler<T> task) {
executeFromIO(value, task);
<T> void emitAsync(T event, Handler<T> handler) {
emitFromIO(event, handler);
}
@Override
public <T> void executeFromIO(T value, Handler<T> task) {
delegate.executeAsync(this, value, task);
public <T> void emitFromIO(T event, Handler<T> handler) {
delegate.executeAsync(this, event, handler);
}
@Override
public <T> void execute(T value, Handler<T> task) {
delegate.execute(this, value, task);
public <T> void emit(T event, Handler<T> handler) {
delegate.execute(this, event, handler);
}
@Override

View File

@@ -218,7 +218,7 @@ public class NetClientImpl implements MetricsProvider, NetClient {
// FileNotFoundException for domain sockets
boolean connectError = cause instanceof ConnectException || cause instanceof FileNotFoundException;
if (connectError && (remainingAttempts > 0 || remainingAttempts == -1)) {
context.executeFromIO(v -> {
context.emitFromIO(v -> {
log.debug("Failed to create connection. Will retry in " + options.getReconnectInterval() + " milliseconds");
//Set a timer to retry connection
vertx.setTimer(options.getReconnectInterval(), tid ->
@@ -239,7 +239,7 @@ public class NetClientImpl implements MetricsProvider, NetClient {
VertxHandler<NetSocketImpl> handler = VertxHandler.create(context, ctx -> new NetSocketImpl(vertx, ctx, remoteAddress, context, sslHelper, metrics));
handler.addHandler(sock -> {
socketMap.put(ch, sock);
context.executeFromIO(v -> {
context.emitFromIO(v -> {
if (metrics != null) {
sock.metric(metrics.connected(sock.remoteAddress(), sock.remoteName()));
}
@@ -257,7 +257,7 @@ public class NetClientImpl implements MetricsProvider, NetClient {
if (ch != null) {
ch.close();
}
context.executeFromIO(th, connectHandler::tryFail);
context.emitFromIO(th, connectHandler::tryFail);
}
@Override

View File

@@ -225,7 +225,7 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer {
} else {
Handler<Throwable> exceptionHandler = handler.handler.exceptionHandler;
if (exceptionHandler != null) {
handler.context.executeFromIO(v -> {
handler.context.emitFromIO(v -> {
exceptionHandler.handle(ar.cause());
});
} else {
@@ -451,7 +451,7 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer {
VertxHandler<NetSocketImpl> nh = VertxHandler.create(handler.context, ctx -> new NetSocketImpl(vertx, ctx, handler.context, sslHelper, metrics));
nh.addHandler(conn -> {
socketMap.put(ch, conn);
handler.context.executeFromIO(v -> {
handler.context.emitFromIO(v -> {
if (metrics != null) {
conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName()));
}

View File

@@ -320,7 +320,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
if (sslHandler == null) {
chctx.pipeline().addFirst("handshaker", new SslHandshakeCompletionHandler(ar -> {
if (handler != null) {
context.executeFromIO(ar.mapEmpty(), handler);
context.emitFromIO(ar.mapEmpty(), handler);
}
}));
if (remoteAddress != null) {

View File

@@ -140,7 +140,7 @@ public final class VertxHandler<C extends ConnectionBase> extends ChannelDuplexH
// Don't remove the connection at this point, or the handleClosed won't be called when channelInactive is called!
C connection = getConnection();
if (connection != null) {
context.executeFromIO(v -> {
context.emitFromIO(v -> {
try {
if (ch.isOpen()) {
ch.close();

View File

@@ -59,12 +59,12 @@ public class ContextBenchmark extends BenchmarkBase {
@Benchmark
public void executeFromIO(BaselineState state) {
state.context.executeFromIO(state.task);
state.context.emitFromIO(state.task);
}
@Benchmark
@Fork(jvmArgsAppend = { "-Dvertx.threadChecks=false", "-Dvertx.disableContextTimings=true", "-Dvertx.disableTCCL=true" })
public void executeFromIONoChecks(BaselineState state) {
state.context.executeFromIO(state.task);
state.context.emitFromIO(state.task);
}
}

View File

@@ -29,16 +29,16 @@ public class BenchmarkContext extends ContextImpl {
}
@Override
<T> void executeAsync(T value, Handler<T> task) {
executeFromIO(value, task);
<T> void emitAsync(T event, Handler<T> handler) {
emitFromIO(event, handler);
}
@Override
public <T> void executeFromIO(T value, Handler<T> task) {
public <T> void emitFromIO(T event, Handler<T> handler) {
if (THREAD_CHECKS) {
checkEventLoopThread();
}
dispatch(value, task);
dispatch(event, handler);
}
@Override
@@ -47,7 +47,7 @@ public class BenchmarkContext extends ContextImpl {
}
@Override
public <T> void execute(T value, Handler<T> task) {
public <T> void emit(T event, Handler<T> handler) {
throw new UnsupportedOperationException();
}

View File

@@ -187,7 +187,7 @@ public class ContextTest extends VertxTestBase {
// Check from other thread
try {
eventLoopContext.executeFromIO(v -> fail());
eventLoopContext.emitFromIO(v -> fail());
fail();
} catch (IllegalStateException expected) {
}
@@ -198,7 +198,7 @@ public class ContextTest extends VertxTestBase {
assertNull(Vertx.currentContext());
Thread vertxThread = Thread.currentThread();
AtomicBoolean nested = new AtomicBoolean(true);
eventLoopContext.executeFromIO(v -> {
eventLoopContext.emitFromIO(v -> {
assertTrue(nested.get());
assertSame(eventLoopContext, Vertx.currentContext());
assertSame(vertxThread, Thread.currentThread());
@@ -215,7 +215,7 @@ public class ContextTest extends VertxTestBase {
workerContext.nettyEventLoop().execute(() -> {
assertNull(Vertx.currentContext());
workerContext.nettyEventLoop().execute(() -> {
workerContext.executeFromIO(v -> {
workerContext.emitFromIO(v -> {
assertSame(workerContext, Vertx.currentContext());
assertTrue(Context.isOnWorkerThread());
testComplete();
@@ -439,7 +439,7 @@ public class ContextTest extends VertxTestBase {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
AtomicBoolean called = new AtomicBoolean();
try {
ctx.executeFromIO(v -> {
ctx.emitFromIO(v -> {
called.set(true);
});
fail();
@@ -455,7 +455,7 @@ public class ContextTest extends VertxTestBase {
ContextInternal ctx = createWorkerContext();
AtomicBoolean called = new AtomicBoolean();
try {
ctx.executeFromIO(v -> {
ctx.emitFromIO(v -> {
called.set(true);
});
fail();
@@ -469,7 +469,7 @@ public class ContextTest extends VertxTestBase {
public void testEventLoopContextExecuteFromAnyThread() {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
Object expected = new Object();
ctx.execute(expected, event -> {
ctx.emit(expected, event -> {
assertSame(ctx, Vertx.currentContext());
assertTrue(Context.isOnEventLoopThread());
assertSame(expected, event);
@@ -485,7 +485,7 @@ public class ContextTest extends VertxTestBase {
ctx.runOnContext(v -> {
ThreadLocal<Boolean> local = new ThreadLocal<>();
local.set(true);
ctx.execute(expected, event -> {
ctx.emit(expected, event -> {
assertSame(ctx, Vertx.currentContext());
assertTrue(Context.isOnEventLoopThread());
assertEquals(true, local.get());
@@ -505,7 +505,7 @@ public class ContextTest extends VertxTestBase {
Thread t = Thread.currentThread();
ThreadLocal<Boolean> local = new ThreadLocal<>();
local.set(true);
ctx.execute(expected, event -> {
ctx.emit(expected, event -> {
assertSame(t, Thread.currentThread());
assertSame(ctx, Vertx.currentContext());
assertTrue(Context.isOnEventLoopThread());
@@ -525,7 +525,7 @@ public class ContextTest extends VertxTestBase {
Object expected = new Object();
any.runOnContext(v -> {
Thread thread = Thread.currentThread();
ctx.execute(expected, event -> {
ctx.emit(expected, event -> {
assertNotSame(thread, Thread.currentThread());
assertSame(ctx, Vertx.currentContext());
assertTrue(Context.isOnEventLoopThread());
@@ -543,7 +543,7 @@ public class ContextTest extends VertxTestBase {
Object expected = new Object();
any.nettyEventLoop().execute(() -> {
Thread thread = Thread.currentThread();
ctx.execute(expected, event -> {
ctx.emit(expected, event -> {
assertNotSame(thread, Thread.currentThread());
assertSame(ctx, Vertx.currentContext());
assertTrue(Context.isOnEventLoopThread());
@@ -560,7 +560,7 @@ public class ContextTest extends VertxTestBase {
RuntimeException failure = new RuntimeException();
AtomicReference<Throwable> caught = new AtomicReference<>();
ctx.exceptionHandler(caught::set);
ctx.execute(new Object(), event -> {
ctx.emit(new Object(), event -> {
throw failure;
});
assertWaitUntil(() -> caught.get() == failure);
@@ -570,7 +570,7 @@ public class ContextTest extends VertxTestBase {
public void testWorkerContextExecuteFromAnyThread() {
ContextInternal ctx = createWorkerContext();
Object expected = new Object();
ctx.execute(expected, event -> {
ctx.emit(expected, event -> {
assertSame(ctx, Vertx.currentContext());
assertTrue(Context.isOnWorkerThread());
assertSame(expected, event);
@@ -586,7 +586,7 @@ public class ContextTest extends VertxTestBase {
ctx.runOnContext(v -> {
ThreadLocal<Boolean> local = new ThreadLocal<>();
local.set(true);
ctx.execute(expected, event -> {
ctx.emit(expected, event -> {
assertSame(ctx, Vertx.currentContext());
assertTrue(Context.isOnWorkerThread());
assertEquals(true, local.get());
@@ -606,7 +606,7 @@ public class ContextTest extends VertxTestBase {
Thread t = Thread.currentThread();
ThreadLocal<Boolean> local = new ThreadLocal<>();
local.set(true);
ctx.execute(expected, event -> {
ctx.emit(expected, event -> {
assertNotSame(t, Thread.currentThread());
assertSame(ctx, Vertx.currentContext());
assertTrue(Context.isOnWorkerThread());
@@ -626,7 +626,7 @@ public class ContextTest extends VertxTestBase {
Object expected = new Object();
any.runOnContext(v -> {
Thread thread = Thread.currentThread();
ctx.execute(expected, event -> {
ctx.emit(expected, event -> {
assertNotSame(thread, Thread.currentThread());
assertSame(ctx, Vertx.currentContext());
assertTrue(Context.isOnWorkerThread());
@@ -650,7 +650,7 @@ public class ContextTest extends VertxTestBase {
Object expected = new Object();
any.nettyEventLoop().execute(() -> {
Thread thread = Thread.currentThread();
ctx.execute(expected, event -> {
ctx.emit(expected, event -> {
assertNotSame(thread, Thread.currentThread());
assertSame(ctx, Vertx.currentContext());
assertTrue(Context.isOnWorkerThread());
@@ -667,7 +667,7 @@ public class ContextTest extends VertxTestBase {
RuntimeException failure = new RuntimeException();
AtomicReference<Throwable> caught = new AtomicReference<>();
ctx.exceptionHandler(caught::set);
ctx.execute(new Object(), event -> {
ctx.emit(new Object(), event -> {
throw failure;
});
assertWaitUntil(() -> caught.get() == failure);

View File

@@ -22,7 +22,6 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
@@ -66,14 +65,14 @@ public class EventLoopGroupTest extends VertxTestBase {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
assertSame(contextThread.get(), Thread.currentThread());
context.executeFromIO(v -> {
context.emitFromIO(v -> {
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
assertSame(contextThread.get(), Thread.currentThread());
context.executeFromIO(v -> {
context.emitFromIO(v -> {
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
});
@@ -83,7 +82,7 @@ public class EventLoopGroupTest extends VertxTestBase {
ByteBuf buf = (ByteBuf) msg;
assertEquals("hello", buf.toString(StandardCharsets.UTF_8));
assertSame(contextThread.get(), Thread.currentThread());
context.executeFromIO(v -> {
context.emitFromIO(v -> {
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
});
@@ -91,7 +90,7 @@ public class EventLoopGroupTest extends VertxTestBase {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
assertSame(contextThread.get(), Thread.currentThread());
context.executeFromIO(v -> {
context.emitFromIO(v -> {
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
@@ -100,7 +99,7 @@ public class EventLoopGroupTest extends VertxTestBase {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
assertSame(contextThread.get(), Thread.currentThread());
context.executeFromIO(v -> {
context.emitFromIO(v -> {
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
testComplete();