From ea2c1b71a5d2b146abc5674396cb56633be0207a Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 3 Dec 2019 23:12:13 +0100 Subject: [PATCH] Duplicate contexts should maintain their own ordered task queue instead of relying on the original context queue, so that worker tasks are serialised for the duplicate context and not for all duplicate contexts of the same original context. This allow for better concurrency processing in HTTP worker servers. This fixes #3217 --- .../java/io/vertx/core/impl/ContextImpl.java | 15 ----- .../io/vertx/core/impl/EventLoopContext.java | 31 ++++++++++ .../io/vertx/core/impl/WorkerContext.java | 38 +++++++++--- src/test/java/io/vertx/core/ContextTest.java | 59 +++++++++++++++++++ 4 files changed, 119 insertions(+), 24 deletions(-) diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index 329c0a9b3..dca7c7fb2 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -270,21 +270,6 @@ abstract class ContextImpl extends AbstractContext { return delegate.tracer(); } - @Override - public final Future executeBlockingInternal(Handler> action) { - return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks); - } - - @Override - public Future<@Nullable T> executeBlocking(Handler> blockingCodeHandler, boolean ordered) { - return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? delegate.orderedTasks : null); - } - - @Override - public Future executeBlocking(Handler> blockingCodeHandler, TaskQueue queue) { - return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); - } - @Override public final void schedule(T argument, Handler task) { delegate.schedule(argument, task); diff --git a/src/main/java/io/vertx/core/impl/EventLoopContext.java b/src/main/java/io/vertx/core/impl/EventLoopContext.java index 2dc0d993c..0a8fd8787 100644 --- a/src/main/java/io/vertx/core/impl/EventLoopContext.java +++ b/src/main/java/io/vertx/core/impl/EventLoopContext.java @@ -12,7 +12,10 @@ package io.vertx.core.impl; import io.netty.channel.EventLoop; +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.spi.tracing.VertxTracer; @@ -87,6 +90,8 @@ public class EventLoopContext extends ContextImpl { static class Duplicated extends ContextImpl.Duplicated { + private TaskQueue orderedTasks; + Duplicated(EventLoopContext delegate, ContextInternal other) { super(delegate, other); } @@ -101,6 +106,32 @@ public class EventLoopContext extends ContextImpl { nettyEventLoop().execute(() -> emit(task)); } + @Override + public final Future executeBlockingInternal(Handler> action) { + return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks); + } + + @Override + public Future<@Nullable T> executeBlocking(Handler> blockingCodeHandler, boolean ordered) { + TaskQueue queue; + if (ordered) { + queue = null; + } else { + synchronized (this) { + if (orderedTasks == null) { + orderedTasks = new TaskQueue(); + } + queue = orderedTasks; + } + } + return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); + } + + @Override + public Future executeBlocking(Handler> blockingCodeHandler, TaskQueue queue) { + return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); + } + @Override public void dispatchFromIO(T argument, Handler task) { if (THREAD_CHECKS) { diff --git a/src/main/java/io/vertx/core/impl/WorkerContext.java b/src/main/java/io/vertx/core/impl/WorkerContext.java index 4b3dcbf74..9a2e71f0e 100644 --- a/src/main/java/io/vertx/core/impl/WorkerContext.java +++ b/src/main/java/io/vertx/core/impl/WorkerContext.java @@ -11,7 +11,10 @@ package io.vertx.core.impl; +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.spi.metrics.PoolMetrics; import io.vertx.core.spi.tracing.VertxTracer; @@ -29,12 +32,12 @@ class WorkerContext extends ContextImpl { @Override void execute(T argument, Handler task) { - execute(this, argument, task); + execute(this, orderedTasks, argument, task); } @Override public void execute(Runnable task) { - execute(this, task); + execute(this, orderedTasks, task); } @Override @@ -49,7 +52,7 @@ class WorkerContext extends ContextImpl { if (THREAD_CHECKS) { checkEventLoopThread(); } - execute(this, argument, task); + execute(argument, task); } @Override @@ -67,10 +70,10 @@ class WorkerContext extends ContextImpl { } } - private void execute(ContextInternal ctx, Runnable task) { + private void execute(ContextInternal ctx, TaskQueue queue, Runnable task) { PoolMetrics metrics = workerPool.metrics(); Object queueMetric = metrics != null ? metrics.submitted() : null; - orderedTasks.execute(() -> { + queue.execute(() -> { Object execMetric = null; if (metrics != null) { execMetric = metrics.begin(queueMetric); @@ -85,11 +88,11 @@ class WorkerContext extends ContextImpl { }, workerPool.executor()); } - private void execute(ContextInternal ctx, T value, Handler task) { + private void execute(ContextInternal ctx, TaskQueue queue, T value, Handler task) { Objects.requireNonNull(task, "Task handler must not be null"); PoolMetrics metrics = workerPool.metrics(); Object queueMetric = metrics != null ? metrics.submitted() : null; - orderedTasks.execute(() -> { + queue.execute(() -> { Object execMetric = null; if (metrics != null) { execMetric = metrics.begin(queueMetric); @@ -128,18 +131,35 @@ class WorkerContext extends ContextImpl { static class Duplicated extends ContextImpl.Duplicated { + final TaskQueue orderedTasks = new TaskQueue(); + Duplicated(WorkerContext delegate, ContextInternal other) { super(delegate, other); } @Override void execute(T argument, Handler task) { - delegate.execute(this, argument, task); + delegate.execute(this, orderedTasks, argument, task); } @Override public void execute(Runnable task) { - delegate.execute(this, task); + delegate.execute(this, orderedTasks, task); + } + + @Override + public final Future executeBlockingInternal(Handler> action) { + return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks); + } + + @Override + public Future<@Nullable T> executeBlocking(Handler> blockingCodeHandler, boolean ordered) { + return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? orderedTasks : null); + } + + @Override + public Future executeBlocking(Handler> blockingCodeHandler, TaskQueue queue) { + return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); } @Override diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java index 0240a979f..7ba3f0900 100644 --- a/src/test/java/io/vertx/core/ContextTest.java +++ b/src/test/java/io/vertx/core/ContextTest.java @@ -779,6 +779,65 @@ public class ContextTest extends VertxTestBase { assertNull(ctx.getLocal("key")); } + @Test + public void testDuplicateWorkerConcurrency() throws Exception { + ContextInternal ctx = createWorkerContext(); + ContextInternal dup1 = ctx.duplicate(); + ContextInternal dup2 = ctx.duplicate(); + CyclicBarrier barrier = new CyclicBarrier(3); + dup1.runOnContext(v -> { + assertTrue(Context.isOnWorkerThread()); + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + fail(e); + } + }); + dup2.runOnContext(v -> { + assertTrue(Context.isOnWorkerThread()); + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + fail(e); + } + }); + barrier.await(10, TimeUnit.SECONDS); + } + + @Test + public void testDuplicateEventLoopExecuteBlocking() throws Exception { + testDuplicateExecuteBlocking((ContextInternal) vertx.getOrCreateContext()); + } + + @Test + public void testDuplicateWorkerExecuteBlocking() throws Exception { + testDuplicateExecuteBlocking(createWorkerContext()); + } + + private void testDuplicateExecuteBlocking(ContextInternal ctx) throws Exception { + ContextInternal dup1 = ctx.duplicate(); + ContextInternal dup2 = ctx.duplicate(); + CyclicBarrier barrier = new CyclicBarrier(3); + dup1.executeBlocking(p -> { + assertTrue(Context.isOnWorkerThread()); + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + fail(e); + } + p.complete(); + }); + dup2.executeBlocking(p -> { + assertTrue(Context.isOnWorkerThread()); + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + fail(e); + } + p.complete(); + }); + barrier.await(10, TimeUnit.SECONDS); + } @Test public void testReentrantDispatch() { ClassLoader cl = new URLClassLoader(new URL[0]);