Duplicate contexts should maintain their own ordered task queue instead of relying on the original context queue, so that worker tasks are serialised for the duplicate context and not for all duplicate contexts of the same original context. This allow for better concurrency processing in HTTP worker servers. This fixes #3217

This commit is contained in:
Julien Viet
2019-12-03 23:12:13 +01:00
parent abb2de2160
commit ea2c1b71a5
4 changed files with 119 additions and 24 deletions

View File

@@ -270,21 +270,6 @@ abstract class ContextImpl extends AbstractContext {
return delegate.tracer();
}
@Override
public final <T> Future<T> executeBlockingInternal(Handler<Promise<T>> action) {
return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks);
}
@Override
public <T> Future<@Nullable T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered) {
return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? delegate.orderedTasks : null);
}
@Override
public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, TaskQueue queue) {
return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue);
}
@Override
public final <T> void schedule(T argument, Handler<T> task) {
delegate.schedule(argument, task);

View File

@@ -12,7 +12,10 @@
package io.vertx.core.impl;
import io.netty.channel.EventLoop;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.spi.tracing.VertxTracer;
@@ -87,6 +90,8 @@ public class EventLoopContext extends ContextImpl {
static class Duplicated extends ContextImpl.Duplicated<EventLoopContext> {
private TaskQueue orderedTasks;
Duplicated(EventLoopContext delegate, ContextInternal other) {
super(delegate, other);
}
@@ -101,6 +106,32 @@ public class EventLoopContext extends ContextImpl {
nettyEventLoop().execute(() -> emit(task));
}
@Override
public final <T> Future<T> executeBlockingInternal(Handler<Promise<T>> action) {
return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks);
}
@Override
public <T> Future<@Nullable T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered) {
TaskQueue queue;
if (ordered) {
queue = null;
} else {
synchronized (this) {
if (orderedTasks == null) {
orderedTasks = new TaskQueue();
}
queue = orderedTasks;
}
}
return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue);
}
@Override
public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, TaskQueue queue) {
return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue);
}
@Override
public <T> void dispatchFromIO(T argument, Handler<T> task) {
if (THREAD_CHECKS) {

View File

@@ -11,7 +11,10 @@
package io.vertx.core.impl;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.tracing.VertxTracer;
@@ -29,12 +32,12 @@ class WorkerContext extends ContextImpl {
@Override
<T> void execute(T argument, Handler<T> task) {
execute(this, argument, task);
execute(this, orderedTasks, argument, task);
}
@Override
public void execute(Runnable task) {
execute(this, task);
execute(this, orderedTasks, task);
}
@Override
@@ -49,7 +52,7 @@ class WorkerContext extends ContextImpl {
if (THREAD_CHECKS) {
checkEventLoopThread();
}
execute(this, argument, task);
execute(argument, task);
}
@Override
@@ -67,10 +70,10 @@ class WorkerContext extends ContextImpl {
}
}
private <T> void execute(ContextInternal ctx, Runnable task) {
private <T> void execute(ContextInternal ctx, TaskQueue queue, Runnable task) {
PoolMetrics metrics = workerPool.metrics();
Object queueMetric = metrics != null ? metrics.submitted() : null;
orderedTasks.execute(() -> {
queue.execute(() -> {
Object execMetric = null;
if (metrics != null) {
execMetric = metrics.begin(queueMetric);
@@ -85,11 +88,11 @@ class WorkerContext extends ContextImpl {
}, workerPool.executor());
}
private <T> void execute(ContextInternal ctx, T value, Handler<T> task) {
private <T> void execute(ContextInternal ctx, TaskQueue queue, T value, Handler<T> task) {
Objects.requireNonNull(task, "Task handler must not be null");
PoolMetrics metrics = workerPool.metrics();
Object queueMetric = metrics != null ? metrics.submitted() : null;
orderedTasks.execute(() -> {
queue.execute(() -> {
Object execMetric = null;
if (metrics != null) {
execMetric = metrics.begin(queueMetric);
@@ -128,18 +131,35 @@ class WorkerContext extends ContextImpl {
static class Duplicated extends ContextImpl.Duplicated<WorkerContext> {
final TaskQueue orderedTasks = new TaskQueue();
Duplicated(WorkerContext delegate, ContextInternal other) {
super(delegate, other);
}
@Override
<T> void execute(T argument, Handler<T> task) {
delegate.execute(this, argument, task);
delegate.execute(this, orderedTasks, argument, task);
}
@Override
public void execute(Runnable task) {
delegate.execute(this, task);
delegate.execute(this, orderedTasks, task);
}
@Override
public final <T> Future<T> executeBlockingInternal(Handler<Promise<T>> action) {
return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks);
}
@Override
public <T> Future<@Nullable T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered) {
return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? orderedTasks : null);
}
@Override
public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, TaskQueue queue) {
return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue);
}
@Override

View File

@@ -779,6 +779,65 @@ public class ContextTest extends VertxTestBase {
assertNull(ctx.getLocal("key"));
}
@Test
public void testDuplicateWorkerConcurrency() throws Exception {
ContextInternal ctx = createWorkerContext();
ContextInternal dup1 = ctx.duplicate();
ContextInternal dup2 = ctx.duplicate();
CyclicBarrier barrier = new CyclicBarrier(3);
dup1.runOnContext(v -> {
assertTrue(Context.isOnWorkerThread());
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
fail(e);
}
});
dup2.runOnContext(v -> {
assertTrue(Context.isOnWorkerThread());
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
fail(e);
}
});
barrier.await(10, TimeUnit.SECONDS);
}
@Test
public void testDuplicateEventLoopExecuteBlocking() throws Exception {
testDuplicateExecuteBlocking((ContextInternal) vertx.getOrCreateContext());
}
@Test
public void testDuplicateWorkerExecuteBlocking() throws Exception {
testDuplicateExecuteBlocking(createWorkerContext());
}
private void testDuplicateExecuteBlocking(ContextInternal ctx) throws Exception {
ContextInternal dup1 = ctx.duplicate();
ContextInternal dup2 = ctx.duplicate();
CyclicBarrier barrier = new CyclicBarrier(3);
dup1.executeBlocking(p -> {
assertTrue(Context.isOnWorkerThread());
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
fail(e);
}
p.complete();
});
dup2.executeBlocking(p -> {
assertTrue(Context.isOnWorkerThread());
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
fail(e);
}
p.complete();
});
barrier.await(10, TimeUnit.SECONDS);
}
@Test
public void testReentrantDispatch() {
ClassLoader cl = new URLClassLoader(new URL[0]);