diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index 329c0a9b3..dca7c7fb2 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -270,21 +270,6 @@ abstract class ContextImpl extends AbstractContext { return delegate.tracer(); } - @Override - public final Future executeBlockingInternal(Handler> action) { - return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks); - } - - @Override - public Future<@Nullable T> executeBlocking(Handler> blockingCodeHandler, boolean ordered) { - return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? delegate.orderedTasks : null); - } - - @Override - public Future executeBlocking(Handler> blockingCodeHandler, TaskQueue queue) { - return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); - } - @Override public final void schedule(T argument, Handler task) { delegate.schedule(argument, task); diff --git a/src/main/java/io/vertx/core/impl/EventLoopContext.java b/src/main/java/io/vertx/core/impl/EventLoopContext.java index 2dc0d993c..0a8fd8787 100644 --- a/src/main/java/io/vertx/core/impl/EventLoopContext.java +++ b/src/main/java/io/vertx/core/impl/EventLoopContext.java @@ -12,7 +12,10 @@ package io.vertx.core.impl; import io.netty.channel.EventLoop; +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.spi.tracing.VertxTracer; @@ -87,6 +90,8 @@ public class EventLoopContext extends ContextImpl { static class Duplicated extends ContextImpl.Duplicated { + private TaskQueue orderedTasks; + Duplicated(EventLoopContext delegate, ContextInternal other) { super(delegate, other); } @@ -101,6 +106,32 @@ public class EventLoopContext extends ContextImpl { nettyEventLoop().execute(() -> emit(task)); } + @Override + public final Future executeBlockingInternal(Handler> action) { + return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks); + } + + @Override + public Future<@Nullable T> executeBlocking(Handler> blockingCodeHandler, boolean ordered) { + TaskQueue queue; + if (ordered) { + queue = null; + } else { + synchronized (this) { + if (orderedTasks == null) { + orderedTasks = new TaskQueue(); + } + queue = orderedTasks; + } + } + return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); + } + + @Override + public Future executeBlocking(Handler> blockingCodeHandler, TaskQueue queue) { + return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); + } + @Override public void dispatchFromIO(T argument, Handler task) { if (THREAD_CHECKS) { diff --git a/src/main/java/io/vertx/core/impl/WorkerContext.java b/src/main/java/io/vertx/core/impl/WorkerContext.java index 4b3dcbf74..9a2e71f0e 100644 --- a/src/main/java/io/vertx/core/impl/WorkerContext.java +++ b/src/main/java/io/vertx/core/impl/WorkerContext.java @@ -11,7 +11,10 @@ package io.vertx.core.impl; +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.spi.metrics.PoolMetrics; import io.vertx.core.spi.tracing.VertxTracer; @@ -29,12 +32,12 @@ class WorkerContext extends ContextImpl { @Override void execute(T argument, Handler task) { - execute(this, argument, task); + execute(this, orderedTasks, argument, task); } @Override public void execute(Runnable task) { - execute(this, task); + execute(this, orderedTasks, task); } @Override @@ -49,7 +52,7 @@ class WorkerContext extends ContextImpl { if (THREAD_CHECKS) { checkEventLoopThread(); } - execute(this, argument, task); + execute(argument, task); } @Override @@ -67,10 +70,10 @@ class WorkerContext extends ContextImpl { } } - private void execute(ContextInternal ctx, Runnable task) { + private void execute(ContextInternal ctx, TaskQueue queue, Runnable task) { PoolMetrics metrics = workerPool.metrics(); Object queueMetric = metrics != null ? metrics.submitted() : null; - orderedTasks.execute(() -> { + queue.execute(() -> { Object execMetric = null; if (metrics != null) { execMetric = metrics.begin(queueMetric); @@ -85,11 +88,11 @@ class WorkerContext extends ContextImpl { }, workerPool.executor()); } - private void execute(ContextInternal ctx, T value, Handler task) { + private void execute(ContextInternal ctx, TaskQueue queue, T value, Handler task) { Objects.requireNonNull(task, "Task handler must not be null"); PoolMetrics metrics = workerPool.metrics(); Object queueMetric = metrics != null ? metrics.submitted() : null; - orderedTasks.execute(() -> { + queue.execute(() -> { Object execMetric = null; if (metrics != null) { execMetric = metrics.begin(queueMetric); @@ -128,18 +131,35 @@ class WorkerContext extends ContextImpl { static class Duplicated extends ContextImpl.Duplicated { + final TaskQueue orderedTasks = new TaskQueue(); + Duplicated(WorkerContext delegate, ContextInternal other) { super(delegate, other); } @Override void execute(T argument, Handler task) { - delegate.execute(this, argument, task); + delegate.execute(this, orderedTasks, argument, task); } @Override public void execute(Runnable task) { - delegate.execute(this, task); + delegate.execute(this, orderedTasks, task); + } + + @Override + public final Future executeBlockingInternal(Handler> action) { + return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks); + } + + @Override + public Future<@Nullable T> executeBlocking(Handler> blockingCodeHandler, boolean ordered) { + return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? orderedTasks : null); + } + + @Override + public Future executeBlocking(Handler> blockingCodeHandler, TaskQueue queue) { + return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); } @Override diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java index 0240a979f..7ba3f0900 100644 --- a/src/test/java/io/vertx/core/ContextTest.java +++ b/src/test/java/io/vertx/core/ContextTest.java @@ -779,6 +779,65 @@ public class ContextTest extends VertxTestBase { assertNull(ctx.getLocal("key")); } + @Test + public void testDuplicateWorkerConcurrency() throws Exception { + ContextInternal ctx = createWorkerContext(); + ContextInternal dup1 = ctx.duplicate(); + ContextInternal dup2 = ctx.duplicate(); + CyclicBarrier barrier = new CyclicBarrier(3); + dup1.runOnContext(v -> { + assertTrue(Context.isOnWorkerThread()); + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + fail(e); + } + }); + dup2.runOnContext(v -> { + assertTrue(Context.isOnWorkerThread()); + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + fail(e); + } + }); + barrier.await(10, TimeUnit.SECONDS); + } + + @Test + public void testDuplicateEventLoopExecuteBlocking() throws Exception { + testDuplicateExecuteBlocking((ContextInternal) vertx.getOrCreateContext()); + } + + @Test + public void testDuplicateWorkerExecuteBlocking() throws Exception { + testDuplicateExecuteBlocking(createWorkerContext()); + } + + private void testDuplicateExecuteBlocking(ContextInternal ctx) throws Exception { + ContextInternal dup1 = ctx.duplicate(); + ContextInternal dup2 = ctx.duplicate(); + CyclicBarrier barrier = new CyclicBarrier(3); + dup1.executeBlocking(p -> { + assertTrue(Context.isOnWorkerThread()); + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + fail(e); + } + p.complete(); + }); + dup2.executeBlocking(p -> { + assertTrue(Context.isOnWorkerThread()); + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + fail(e); + } + p.complete(); + }); + barrier.await(10, TimeUnit.SECONDS); + } @Test public void testReentrantDispatch() { ClassLoader cl = new URLClassLoader(new URL[0]);