diff --git a/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java b/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java index 671d759c1..4fa448c80 100644 --- a/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java +++ b/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java @@ -25,7 +25,6 @@ import io.vertx.codegen.annotations.Nullable; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.datagram.DatagramSocket; import io.vertx.core.datagram.DatagramSocketOptions; @@ -428,7 +427,7 @@ public class DatagramSocketImpl implements DatagramSocket, MetricsProvider { } private void notifyException(final Handler> handler, final Throwable cause) { - context.executeFromIO(v -> handler.handle(Future.failedFuture(cause))); + context.emitFromIO(v -> handler.handle(Future.failedFuture(cause))); } @Override diff --git a/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java b/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java index d2939bc4b..6c086fb9a 100644 --- a/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java +++ b/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java @@ -347,7 +347,7 @@ public final class DnsClientImpl implements DnsClient { }); channel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { - actualCtx.executeFromIO(future.cause(), this::fail); + actualCtx.emitFromIO(future.cause(), this::fail); } }); } diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 9a5a1c0b4..0f999f86c 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -736,7 +736,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme ws.registerHandler(vertx.eventBus()); } - getContext().executeFromIO(wsRes, res -> { + getContext().emitFromIO(wsRes, res -> { if (res.succeeded()) { log.debug("WebSocket handshake complete"); if (metrics != null) { diff --git a/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java b/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java index a9436cfcc..ebda426da 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java +++ b/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java @@ -207,7 +207,7 @@ class HttpChannelConnector implements ConnectionProvider { VertxHandler clientHandler = VertxHandler.create(context, chctx -> { Http1xClientConnection conn = new Http1xClientConnection(listener, upgrade ? HttpVersion.HTTP_1_1 : version, client, endpointMetric, chctx, ssl, server, context, metrics); if (metrics != null) { - context.executeFromIO(v -> { + context.emitFromIO(v -> { Object socketMetric = metrics.connected(conn.remoteAddress(), conn.remoteName()); conn.metric(socketMetric); metrics.endpointConnected(endpointMetric, socketMetric); diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java index a085bf48d..21268bc4c 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java @@ -449,7 +449,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http HttpClientStream stream = ar1.result(); ContextInternal ctx = stream.getContext(); if (stream.id() == 1 && initializer != null) { - ctx.executeFromIO(v -> { + ctx.emitFromIO(v -> { initializer.handle(stream.connection()); }); } diff --git a/src/main/java/io/vertx/core/http/impl/HttpServerChannelInitializer.java b/src/main/java/io/vertx/core/http/impl/HttpServerChannelInitializer.java index 52682b150..5c3408ca6 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpServerChannelInitializer.java +++ b/src/main/java/io/vertx/core/http/impl/HttpServerChannelInitializer.java @@ -37,7 +37,6 @@ import io.vertx.core.net.impl.VertxHandler; import io.vertx.core.spi.metrics.HttpServerMetrics; import java.nio.charset.StandardCharsets; -import java.util.function.BiConsumer; import java.util.function.Function; /** @@ -151,7 +150,7 @@ import java.util.function.Function; private void handleException(Channel ch, Throwable cause) { HandlerHolder> holder = errorHandler.apply(ch.eventLoop()); if (holder != null) { - holder.context.executeFromIO(cause, holder.handler); + holder.context.emitFromIO(cause, holder.handler); } } @@ -206,7 +205,7 @@ import java.util.function.Function; if (options.getHttp2ConnectionWindowSize() > 0) { conn.setWindowSize(options.getHttp2ConnectionWindowSize()); } - ctx.executeFromIO(conn, handler_); + ctx.emitFromIO(conn, handler_); }); return handler; } @@ -254,9 +253,9 @@ import java.util.function.Function; pipeline.addLast("handler", handler); Http1xServerConnection conn = handler.getConnection(); if (metrics != null) { - holder.context.executeFromIO(v -> conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName()))); + holder.context.emitFromIO(v -> conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName()))); } - holder.context.executeFromIO(conn, holder.handler); + holder.context.emitFromIO(conn, holder.handler); } diff --git a/src/main/java/io/vertx/core/http/impl/HttpServerResponseImpl.java b/src/main/java/io/vertx/core/http/impl/HttpServerResponseImpl.java index 0d98f62ea..8095f8098 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpServerResponseImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpServerResponseImpl.java @@ -563,7 +563,7 @@ public class HttpServerResponseImpl implements HttpServerResponse { } else { res = Future.failedFuture(future.cause()); } - ctx.executeFromIO(v -> resultHandler.handle(res)); + ctx.emitFromIO(v -> resultHandler.handle(res)); } // signal body end handler @@ -572,7 +572,7 @@ public class HttpServerResponseImpl implements HttpServerResponse { handler = bodyEndHandler; } if (handler != null) { - context.executeFromIO(v -> { + context.emitFromIO(v -> { handler.handle(null); }); } diff --git a/src/main/java/io/vertx/core/http/impl/pool/Pool.java b/src/main/java/io/vertx/core/http/impl/pool/Pool.java index 640afea06..610f70141 100644 --- a/src/main/java/io/vertx/core/http/impl/pool/Pool.java +++ b/src/main/java/io/vertx/core/http/impl/pool/Pool.java @@ -38,7 +38,7 @@ import java.util.function.LongSupplier; * * * A connection is delivered to a {@link Waiter} on the pool's context event loop thread, the waiter must take care of - * calling {@link io.vertx.core.impl.ContextInternal#executeFromIO} if necessary. + * calling {@link io.vertx.core.impl.ContextInternal#emitFromIO} if necessary. *

* Calls to the pool are synchronized on the pool to avoid race conditions and maintain its invariants. This pool can * be called from different threads safely (although it is not encouraged for performance reasons, we benefit from biased diff --git a/src/main/java/io/vertx/core/impl/AbstractContext.java b/src/main/java/io/vertx/core/impl/AbstractContext.java index 557692dff..7a27f2d29 100644 --- a/src/main/java/io/vertx/core/impl/AbstractContext.java +++ b/src/main/java/io/vertx/core/impl/AbstractContext.java @@ -12,7 +12,6 @@ package io.vertx.core.impl; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.FastThreadLocalThread; -import io.vertx.codegen.annotations.Nullable; import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Future; @@ -80,7 +79,13 @@ abstract class AbstractContext implements ContextInternal { } }; - abstract void executeAsync(T value, Handler task); + /** + * Emit the {@code event} asynchronously. + * + * @param event the event for the {@code handler} + * @param handler the task to execute with the {@code event} argument + */ + abstract void emitAsync(E event, Handler handler); @Override public abstract boolean isEventLoopContext(); @@ -94,8 +99,8 @@ abstract class AbstractContext implements ContextInternal { // In such a case we should already be on an event loop thread (as Netty manages the event loops) // but check this anyway, then execute directly @Override - public final void executeFromIO(Handler task) { - executeFromIO(null, task); + public final void emitFromIO(Handler handler) { + emitFromIO(null, handler); } @Override @@ -104,8 +109,8 @@ abstract class AbstractContext implements ContextInternal { } @Override - public final void dispatch(Handler task) { - dispatch(null, task); + public final void dispatch(Handler handler) { + dispatch(null, handler); } public final ContextInternal beginDispatch() { @@ -172,10 +177,10 @@ abstract class AbstractContext implements ContextInternal { } @Override - public final void dispatch(T arg, Handler task) { + public final void dispatch(T event, Handler handler) { ContextInternal prev = beginDispatch(); try { - task.handle(arg); + handler.handle(event); } catch (Throwable t) { reportException(t); } finally { @@ -194,9 +199,9 @@ abstract class AbstractContext implements ContextInternal { // Run the task asynchronously on this same context @Override - public final void runOnContext(Handler task) { + public final void runOnContext(Handler handler) { try { - executeAsync(null, task); + emitAsync(null, handler); } catch (RejectedExecutionException ignore) { // Pool is already shut down } diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index f05845159..771f1f2c9 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -35,7 +35,7 @@ abstract class ContextImpl extends AbstractContext { static FutureListener toListenerFuture(ContextInternal context, Handler> handler) { return future -> { Future res = future.isSuccess() ? Future.succeededFuture() : Future.failedFuture(future.cause()); - context.executeFromIO(res, handler); + context.emitFromIO(res, handler); }; } diff --git a/src/main/java/io/vertx/core/impl/ContextInternal.java b/src/main/java/io/vertx/core/impl/ContextInternal.java index 89e41a845..ff364d7bc 100644 --- a/src/main/java/io/vertx/core/impl/ContextInternal.java +++ b/src/main/java/io/vertx/core/impl/ContextInternal.java @@ -12,7 +12,6 @@ package io.vertx.core.impl; import io.netty.channel.EventLoop; -import io.netty.util.concurrent.FutureListener; import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Future; @@ -22,7 +21,6 @@ import io.vertx.core.Vertx; import io.vertx.core.spi.tracing.VertxTracer; import java.util.concurrent.ConcurrentMap; -import java.util.function.Function; /** * This interface provides an api for vert.x core internal use only @@ -102,33 +100,33 @@ public interface ContextInternal extends Context { VertxInternal owner(); /** - * Like {@link #executeFromIO(Object, Handler)} but with no argument. + * @see #emit(Object, Handler) */ - void executeFromIO(Handler task); + void emitFromIO(Handler handler); /** - * Execute the context task and switch on this context if necessary, this also associates the + * Emit the {@code event} to the {@code handler} and switch on this context if necessary, this also associates the * current thread with the current context so {@link Vertx#currentContext()} returns this context.

* * The caller thread is assumed to be the event loop thread of this context.

* - * Any exception thrown from the {@literal task} will be reported on this context. + * Any exception thrown from the {@literal handler} will be reported on this context. * - * @param value the argument for the {@code task} - * @param task the task to execute with the {@code value} argument + * @param event the event for the {@code handler} + * @param handler the handler to execute with the {@code event} argument */ - void executeFromIO(T value, Handler task); + void emitFromIO(T event, Handler handler); /** - * Execute the context task and switch on this context if necessary, this also associates the + * Emit the {@code event} to the {@code handler} and switch on this context if necessary, this also associates the * current thread with the current context so {@link Vertx#currentContext()} returns this context.

* - * Any exception thrown from the {@literal task} will be reported on this context. + * Any exception thrown from the {@literal handler} will be reported on this context. * - * @param value the argument for the {@code task} - * @param task the task to execute with the {@code value} argument + * @param event the event for the {@code handler} + * @param handler the handler to execute with the {@code event} argument */ - void execute(T value, Handler task); + void emit(E event, Handler handler); /** * @see #schedule(Object, Handler) @@ -148,20 +146,20 @@ public interface ContextInternal extends Context { /** * @see #dispatch(Object, Handler) */ - void dispatch(Handler task); + void dispatch(Handler handler); /** - * Dispatch a task on this context. The task is executed directly by the caller thread which must be a + * Dispatch a {@code event} to the {@code handler} on this context. The handler is executed directly by the caller thread which must be a * {@link VertxThread}. *

- * The task execution is monitored by the blocked thread checker. + * The handler execution is monitored by the blocked thread checker. *

* This context is thread-local associated during the task execution. * - * @param arg the task argument - * @param task the task to execute + * @param event the event for the {@code handler} + * @param handler the handler to execute with the {@code event} argument */ - void dispatch(T arg, Handler task); + void dispatch(E event, Handler handler); /** * Begin the dispatch of a task on this context. diff --git a/src/main/java/io/vertx/core/impl/EventLoopContext.java b/src/main/java/io/vertx/core/impl/EventLoopContext.java index fe5c25d4b..74469ba62 100644 --- a/src/main/java/io/vertx/core/impl/EventLoopContext.java +++ b/src/main/java/io/vertx/core/impl/EventLoopContext.java @@ -13,7 +13,6 @@ package io.vertx.core.impl; import io.netty.channel.EventLoop; import io.vertx.core.Handler; -import io.vertx.core.Vertx; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.spi.tracing.VertxTracer; @@ -36,8 +35,8 @@ public class EventLoopContext extends ContextImpl { } @Override - void executeAsync(T value, Handler task) { - nettyEventLoop().execute(() -> dispatch(value, task)); + void emitAsync(T event, Handler handler) { + nettyEventLoop().execute(() -> dispatch(event, handler)); } @Override @@ -46,16 +45,16 @@ public class EventLoopContext extends ContextImpl { } @Override - public void executeFromIO(T value, Handler task) { + public void emitFromIO(T event, Handler handler) { if (THREAD_CHECKS) { checkEventLoopThread(); } - dispatch(value, task); + dispatch(event, handler); } @Override - public void execute(T value, Handler task) { - execute(this, value, task); + public void emit(T event, Handler handler) { + execute(this, event, handler); } private static void execute(AbstractContext ctx, T value, Handler task) { @@ -64,10 +63,10 @@ public class EventLoopContext extends ContextImpl { if (AbstractContext.context() == ctx) { ctx.dispatch(value, task); } else { - ctx.executeFromIO(value, task); + ctx.emitFromIO(value, task); } } else { - ctx.executeAsync(value, task); + ctx.emitAsync(value, task); } } @@ -88,21 +87,21 @@ public class EventLoopContext extends ContextImpl { } @Override - void executeAsync(T value, Handler task) { - nettyEventLoop().execute(() -> dispatch(value, task)); + void emitAsync(T event, Handler handler) { + nettyEventLoop().execute(() -> dispatch(event, handler)); } @Override - public void executeFromIO(T value, Handler task) { + public void emitFromIO(T event, Handler handler) { if (THREAD_CHECKS) { checkEventLoopThread(); } - dispatch(value, task); + dispatch(event, handler); } @Override - public void execute(T value, Handler task) { - EventLoopContext.execute(this, value, task); + public void emit(T event, Handler handler) { + EventLoopContext.execute(this, event, handler); } @Override diff --git a/src/main/java/io/vertx/core/impl/FutureImpl.java b/src/main/java/io/vertx/core/impl/FutureImpl.java index 391144d4d..fc8302386 100644 --- a/src/main/java/io/vertx/core/impl/FutureImpl.java +++ b/src/main/java/io/vertx/core/impl/FutureImpl.java @@ -122,7 +122,7 @@ class FutureImpl implements PromiseInternal, Future { private void doDispatch(Handler> handler) { if (context != null) { - context.execute(this, handler); + context.emit(this, handler); } else { handler.handle(this); } diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java index 1e7453e5e..66a9f9c2b 100644 --- a/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -947,7 +947,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider { @Override public void run() { - context.executeFromIO(this); + context.emitFromIO(this); } public void handle(Void v) { diff --git a/src/main/java/io/vertx/core/impl/WorkerContext.java b/src/main/java/io/vertx/core/impl/WorkerContext.java index ee673e115..c1de98480 100644 --- a/src/main/java/io/vertx/core/impl/WorkerContext.java +++ b/src/main/java/io/vertx/core/impl/WorkerContext.java @@ -12,7 +12,6 @@ package io.vertx.core.impl; import io.vertx.core.Handler; -import io.vertx.core.Vertx; import io.vertx.core.spi.metrics.PoolMetrics; import io.vertx.core.spi.tracing.VertxTracer; @@ -27,8 +26,8 @@ class WorkerContext extends ContextImpl { } @Override - void executeAsync(T value, Handler task) { - executeAsync(this, value, task); + void emitAsync(T event, Handler handler) { + executeAsync(this, event, handler); } @Override @@ -39,25 +38,25 @@ class WorkerContext extends ContextImpl { // In the case of a worker context, the IO will always be provided on an event loop thread, not a worker thread // so we need to execute it on the worker thread @Override - public void executeFromIO(T value, Handler task) { + public void emitFromIO(T event, Handler handler) { if (THREAD_CHECKS) { checkEventLoopThread(); } - executeAsync(this, value ,task); + executeAsync(this, event, handler); } @Override - public void execute(T value, Handler task) { - execute(this, value, task); + public void emit(T event, Handler handler) { + execute(this, event, handler); } private static void execute(AbstractContext ctx, T value, Handler task) { if (AbstractContext.context() == ctx) { ctx.dispatch(value, task); } else if (ctx.nettyEventLoop().inEventLoop()) { - ctx.executeFromIO(value, task); + ctx.emitFromIO(value, task); } else { - ctx.executeAsync(value, task); + ctx.emitAsync(value, task); } } @@ -108,18 +107,18 @@ class WorkerContext extends ContextImpl { } @Override - void executeAsync(T value, Handler task) { - executeFromIO(value, task); + void emitAsync(T event, Handler handler) { + emitFromIO(event, handler); } @Override - public void executeFromIO(T value, Handler task) { - delegate.executeAsync(this, value, task); + public void emitFromIO(T event, Handler handler) { + delegate.executeAsync(this, event, handler); } @Override - public void execute(T value, Handler task) { - delegate.execute(this, value, task); + public void emit(T event, Handler handler) { + delegate.execute(this, event, handler); } @Override diff --git a/src/main/java/io/vertx/core/net/impl/NetClientImpl.java b/src/main/java/io/vertx/core/net/impl/NetClientImpl.java index beef47931..1bf62513f 100644 --- a/src/main/java/io/vertx/core/net/impl/NetClientImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetClientImpl.java @@ -218,7 +218,7 @@ public class NetClientImpl implements MetricsProvider, NetClient { // FileNotFoundException for domain sockets boolean connectError = cause instanceof ConnectException || cause instanceof FileNotFoundException; if (connectError && (remainingAttempts > 0 || remainingAttempts == -1)) { - context.executeFromIO(v -> { + context.emitFromIO(v -> { log.debug("Failed to create connection. Will retry in " + options.getReconnectInterval() + " milliseconds"); //Set a timer to retry connection vertx.setTimer(options.getReconnectInterval(), tid -> @@ -239,7 +239,7 @@ public class NetClientImpl implements MetricsProvider, NetClient { VertxHandler handler = VertxHandler.create(context, ctx -> new NetSocketImpl(vertx, ctx, remoteAddress, context, sslHelper, metrics)); handler.addHandler(sock -> { socketMap.put(ch, sock); - context.executeFromIO(v -> { + context.emitFromIO(v -> { if (metrics != null) { sock.metric(metrics.connected(sock.remoteAddress(), sock.remoteName())); } @@ -257,7 +257,7 @@ public class NetClientImpl implements MetricsProvider, NetClient { if (ch != null) { ch.close(); } - context.executeFromIO(th, connectHandler::tryFail); + context.emitFromIO(th, connectHandler::tryFail); } @Override diff --git a/src/main/java/io/vertx/core/net/impl/NetServerImpl.java b/src/main/java/io/vertx/core/net/impl/NetServerImpl.java index 6a4e7f0b0..0b6912676 100644 --- a/src/main/java/io/vertx/core/net/impl/NetServerImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetServerImpl.java @@ -225,7 +225,7 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer { } else { Handler exceptionHandler = handler.handler.exceptionHandler; if (exceptionHandler != null) { - handler.context.executeFromIO(v -> { + handler.context.emitFromIO(v -> { exceptionHandler.handle(ar.cause()); }); } else { @@ -451,7 +451,7 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer { VertxHandler nh = VertxHandler.create(handler.context, ctx -> new NetSocketImpl(vertx, ctx, handler.context, sslHelper, metrics)); nh.addHandler(conn -> { socketMap.put(ch, conn); - handler.context.executeFromIO(v -> { + handler.context.emitFromIO(v -> { if (metrics != null) { conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName())); } diff --git a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java index 0670b8521..06a909295 100644 --- a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java @@ -320,7 +320,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { if (sslHandler == null) { chctx.pipeline().addFirst("handshaker", new SslHandshakeCompletionHandler(ar -> { if (handler != null) { - context.executeFromIO(ar.mapEmpty(), handler); + context.emitFromIO(ar.mapEmpty(), handler); } })); if (remoteAddress != null) { diff --git a/src/main/java/io/vertx/core/net/impl/VertxHandler.java b/src/main/java/io/vertx/core/net/impl/VertxHandler.java index 26b871b75..474c211f4 100644 --- a/src/main/java/io/vertx/core/net/impl/VertxHandler.java +++ b/src/main/java/io/vertx/core/net/impl/VertxHandler.java @@ -140,7 +140,7 @@ public final class VertxHandler extends ChannelDuplexH // Don't remove the connection at this point, or the handleClosed won't be called when channelInactive is called! C connection = getConnection(); if (connection != null) { - context.executeFromIO(v -> { + context.emitFromIO(v -> { try { if (ch.isOpen()) { ch.close(); diff --git a/src/test/benchmarks/io/vertx/benchmarks/ContextBenchmark.java b/src/test/benchmarks/io/vertx/benchmarks/ContextBenchmark.java index 9ee05aa5e..2ad582630 100644 --- a/src/test/benchmarks/io/vertx/benchmarks/ContextBenchmark.java +++ b/src/test/benchmarks/io/vertx/benchmarks/ContextBenchmark.java @@ -59,12 +59,12 @@ public class ContextBenchmark extends BenchmarkBase { @Benchmark public void executeFromIO(BaselineState state) { - state.context.executeFromIO(state.task); + state.context.emitFromIO(state.task); } @Benchmark @Fork(jvmArgsAppend = { "-Dvertx.threadChecks=false", "-Dvertx.disableContextTimings=true", "-Dvertx.disableTCCL=true" }) public void executeFromIONoChecks(BaselineState state) { - state.context.executeFromIO(state.task); + state.context.emitFromIO(state.task); } } diff --git a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java index 7bd000208..9ff3b12f6 100644 --- a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java +++ b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java @@ -29,16 +29,16 @@ public class BenchmarkContext extends ContextImpl { } @Override - void executeAsync(T value, Handler task) { - executeFromIO(value, task); + void emitAsync(T event, Handler handler) { + emitFromIO(event, handler); } @Override - public void executeFromIO(T value, Handler task) { + public void emitFromIO(T event, Handler handler) { if (THREAD_CHECKS) { checkEventLoopThread(); } - dispatch(value, task); + dispatch(event, handler); } @Override @@ -47,7 +47,7 @@ public class BenchmarkContext extends ContextImpl { } @Override - public void execute(T value, Handler task) { + public void emit(T event, Handler handler) { throw new UnsupportedOperationException(); } diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java index 0e3bef7da..acc8082ea 100644 --- a/src/test/java/io/vertx/core/ContextTest.java +++ b/src/test/java/io/vertx/core/ContextTest.java @@ -187,7 +187,7 @@ public class ContextTest extends VertxTestBase { // Check from other thread try { - eventLoopContext.executeFromIO(v -> fail()); + eventLoopContext.emitFromIO(v -> fail()); fail(); } catch (IllegalStateException expected) { } @@ -198,7 +198,7 @@ public class ContextTest extends VertxTestBase { assertNull(Vertx.currentContext()); Thread vertxThread = Thread.currentThread(); AtomicBoolean nested = new AtomicBoolean(true); - eventLoopContext.executeFromIO(v -> { + eventLoopContext.emitFromIO(v -> { assertTrue(nested.get()); assertSame(eventLoopContext, Vertx.currentContext()); assertSame(vertxThread, Thread.currentThread()); @@ -215,7 +215,7 @@ public class ContextTest extends VertxTestBase { workerContext.nettyEventLoop().execute(() -> { assertNull(Vertx.currentContext()); workerContext.nettyEventLoop().execute(() -> { - workerContext.executeFromIO(v -> { + workerContext.emitFromIO(v -> { assertSame(workerContext, Vertx.currentContext()); assertTrue(Context.isOnWorkerThread()); testComplete(); @@ -439,7 +439,7 @@ public class ContextTest extends VertxTestBase { ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext(); AtomicBoolean called = new AtomicBoolean(); try { - ctx.executeFromIO(v -> { + ctx.emitFromIO(v -> { called.set(true); }); fail(); @@ -455,7 +455,7 @@ public class ContextTest extends VertxTestBase { ContextInternal ctx = createWorkerContext(); AtomicBoolean called = new AtomicBoolean(); try { - ctx.executeFromIO(v -> { + ctx.emitFromIO(v -> { called.set(true); }); fail(); @@ -469,7 +469,7 @@ public class ContextTest extends VertxTestBase { public void testEventLoopContextExecuteFromAnyThread() { ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext(); Object expected = new Object(); - ctx.execute(expected, event -> { + ctx.emit(expected, event -> { assertSame(ctx, Vertx.currentContext()); assertTrue(Context.isOnEventLoopThread()); assertSame(expected, event); @@ -485,7 +485,7 @@ public class ContextTest extends VertxTestBase { ctx.runOnContext(v -> { ThreadLocal local = new ThreadLocal<>(); local.set(true); - ctx.execute(expected, event -> { + ctx.emit(expected, event -> { assertSame(ctx, Vertx.currentContext()); assertTrue(Context.isOnEventLoopThread()); assertEquals(true, local.get()); @@ -505,7 +505,7 @@ public class ContextTest extends VertxTestBase { Thread t = Thread.currentThread(); ThreadLocal local = new ThreadLocal<>(); local.set(true); - ctx.execute(expected, event -> { + ctx.emit(expected, event -> { assertSame(t, Thread.currentThread()); assertSame(ctx, Vertx.currentContext()); assertTrue(Context.isOnEventLoopThread()); @@ -525,7 +525,7 @@ public class ContextTest extends VertxTestBase { Object expected = new Object(); any.runOnContext(v -> { Thread thread = Thread.currentThread(); - ctx.execute(expected, event -> { + ctx.emit(expected, event -> { assertNotSame(thread, Thread.currentThread()); assertSame(ctx, Vertx.currentContext()); assertTrue(Context.isOnEventLoopThread()); @@ -543,7 +543,7 @@ public class ContextTest extends VertxTestBase { Object expected = new Object(); any.nettyEventLoop().execute(() -> { Thread thread = Thread.currentThread(); - ctx.execute(expected, event -> { + ctx.emit(expected, event -> { assertNotSame(thread, Thread.currentThread()); assertSame(ctx, Vertx.currentContext()); assertTrue(Context.isOnEventLoopThread()); @@ -560,7 +560,7 @@ public class ContextTest extends VertxTestBase { RuntimeException failure = new RuntimeException(); AtomicReference caught = new AtomicReference<>(); ctx.exceptionHandler(caught::set); - ctx.execute(new Object(), event -> { + ctx.emit(new Object(), event -> { throw failure; }); assertWaitUntil(() -> caught.get() == failure); @@ -570,7 +570,7 @@ public class ContextTest extends VertxTestBase { public void testWorkerContextExecuteFromAnyThread() { ContextInternal ctx = createWorkerContext(); Object expected = new Object(); - ctx.execute(expected, event -> { + ctx.emit(expected, event -> { assertSame(ctx, Vertx.currentContext()); assertTrue(Context.isOnWorkerThread()); assertSame(expected, event); @@ -586,7 +586,7 @@ public class ContextTest extends VertxTestBase { ctx.runOnContext(v -> { ThreadLocal local = new ThreadLocal<>(); local.set(true); - ctx.execute(expected, event -> { + ctx.emit(expected, event -> { assertSame(ctx, Vertx.currentContext()); assertTrue(Context.isOnWorkerThread()); assertEquals(true, local.get()); @@ -606,7 +606,7 @@ public class ContextTest extends VertxTestBase { Thread t = Thread.currentThread(); ThreadLocal local = new ThreadLocal<>(); local.set(true); - ctx.execute(expected, event -> { + ctx.emit(expected, event -> { assertNotSame(t, Thread.currentThread()); assertSame(ctx, Vertx.currentContext()); assertTrue(Context.isOnWorkerThread()); @@ -626,7 +626,7 @@ public class ContextTest extends VertxTestBase { Object expected = new Object(); any.runOnContext(v -> { Thread thread = Thread.currentThread(); - ctx.execute(expected, event -> { + ctx.emit(expected, event -> { assertNotSame(thread, Thread.currentThread()); assertSame(ctx, Vertx.currentContext()); assertTrue(Context.isOnWorkerThread()); @@ -650,7 +650,7 @@ public class ContextTest extends VertxTestBase { Object expected = new Object(); any.nettyEventLoop().execute(() -> { Thread thread = Thread.currentThread(); - ctx.execute(expected, event -> { + ctx.emit(expected, event -> { assertNotSame(thread, Thread.currentThread()); assertSame(ctx, Vertx.currentContext()); assertTrue(Context.isOnWorkerThread()); @@ -667,7 +667,7 @@ public class ContextTest extends VertxTestBase { RuntimeException failure = new RuntimeException(); AtomicReference caught = new AtomicReference<>(); ctx.exceptionHandler(caught::set); - ctx.execute(new Object(), event -> { + ctx.emit(new Object(), event -> { throw failure; }); assertWaitUntil(() -> caught.get() == failure); diff --git a/src/test/java/io/vertx/core/EventLoopGroupTest.java b/src/test/java/io/vertx/core/EventLoopGroupTest.java index 3eb62f42f..70e1a2270 100644 --- a/src/test/java/io/vertx/core/EventLoopGroupTest.java +++ b/src/test/java/io/vertx/core/EventLoopGroupTest.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; @@ -66,14 +65,14 @@ public class EventLoopGroupTest extends VertxTestBase { @Override protected void initChannel(SocketChannel ch) throws Exception { assertSame(contextThread.get(), Thread.currentThread()); - context.executeFromIO(v -> { + context.emitFromIO(v -> { assertSame(contextThread.get(), Thread.currentThread()); assertSame(context, Vertx.currentContext()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { assertSame(contextThread.get(), Thread.currentThread()); - context.executeFromIO(v -> { + context.emitFromIO(v -> { assertSame(contextThread.get(), Thread.currentThread()); assertSame(context, Vertx.currentContext()); }); @@ -83,7 +82,7 @@ public class EventLoopGroupTest extends VertxTestBase { ByteBuf buf = (ByteBuf) msg; assertEquals("hello", buf.toString(StandardCharsets.UTF_8)); assertSame(contextThread.get(), Thread.currentThread()); - context.executeFromIO(v -> { + context.emitFromIO(v -> { assertSame(contextThread.get(), Thread.currentThread()); assertSame(context, Vertx.currentContext()); }); @@ -91,7 +90,7 @@ public class EventLoopGroupTest extends VertxTestBase { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { assertSame(contextThread.get(), Thread.currentThread()); - context.executeFromIO(v -> { + context.emitFromIO(v -> { assertSame(contextThread.get(), Thread.currentThread()); assertSame(context, Vertx.currentContext()); ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); @@ -100,7 +99,7 @@ public class EventLoopGroupTest extends VertxTestBase { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { assertSame(contextThread.get(), Thread.currentThread()); - context.executeFromIO(v -> { + context.emitFromIO(v -> { assertSame(contextThread.get(), Thread.currentThread()); assertSame(context, Vertx.currentContext()); testComplete();