Rework context worker task execution

This commit is contained in:
Julien Viet
2018-12-07 10:10:21 +01:00
parent eba17c9681
commit 1400a982ad
5 changed files with 21 additions and 26 deletions

View File

@@ -286,7 +286,7 @@ abstract class ContextImpl implements ContextInternal {
return contextData;
}
<T> boolean executeTask(T arg, Handler<T> hTask) {
<T> void executeTask(T arg, Handler<T> hTask) {
Thread th = Thread.currentThread();
if (!(th instanceof VertxThread)) {
throw new IllegalStateException("Uh oh! context executing with wrong thread! " + th);
@@ -298,10 +298,8 @@ abstract class ContextImpl implements ContextInternal {
try {
current.setContext(this);
hTask.handle(arg);
return true;
} catch (Throwable t) {
reportException(t);
return false;
} finally {
// We don't unset the context after execution - this is done later when the context is closed via
// VertxThreadFactory

View File

@@ -25,19 +25,6 @@ class WorkerContext extends ContextImpl {
super(vertx, internalBlockingPool, workerPool, deploymentID, config, tccl);
}
final <T> Runnable wrapTask(T arg, Handler<T> hTask, PoolMetrics metrics) {
Object metric = metrics != null ? metrics.submitted() : null;
return () -> {
if (metrics != null) {
metrics.begin(metric);
}
boolean succeeded = executeTask(arg, hTask);
if (metrics != null) {
metrics.end(metric, succeeded);
}
};
}
@Override
void executeAsync(Handler<Void> task) {
execute(null, task);
@@ -52,6 +39,19 @@ class WorkerContext extends ContextImpl {
// so we need to execute it on the worker thread
@Override
<T> void execute(T value, Handler<T> task) {
orderedTasks.execute(wrapTask(value, task, workerPool.metrics()), workerPool.executor());
PoolMetrics metrics = workerPool.metrics();
Object metric = metrics != null ? metrics.submitted() : null;
orderedTasks.execute(() -> exec(metrics, metric, value, task), workerPool.executor());
}
private <T> void exec(PoolMetrics metrics, Object metric, T value, Handler<T> task) {
if (metrics != null) {
metrics.begin(metric);
}
executeTask(value, task);
if (metrics != null) {
metrics.end(metric, true);
}
}
}

View File

@@ -274,7 +274,7 @@ public abstract class ConnectionBase {
Handler<Void> handler;
synchronized (this) {
NetworkMetrics metrics = metrics();
if (metrics != null && metrics instanceof TCPMetrics) {
if (metrics instanceof TCPMetrics) {
((TCPMetrics) metrics).disconnected(metric(), remoteAddress());
}
handler = closeHandler;

View File

@@ -71,7 +71,6 @@ public class MetricsContextTest extends VertxTestBase {
return DummyVertxMetrics.INSTANCE;
};
vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true).setFactory(factory)));
assertSame(Thread.currentThread(), metricsThread.get());
assertNull(metricsContext.get());
}
@@ -403,7 +402,6 @@ public class MetricsContextTest extends VertxTestBase {
// FIXME!! This test intermittently fails
@Test
@Ignore
public void testHttpClientWebsocketWorker() throws Exception {
testHttpClientWebsocket(workerContextFactory, workerChecker);
}
@@ -893,7 +891,6 @@ public class MetricsContextTest extends VertxTestBase {
private Function<Vertx, Context> eventLoopContextFactory = Vertx::getOrCreateContext;
private BiConsumer<Thread, Context> eventLoopChecker = (thread, ctx) -> {
assertSame(Vertx.currentContext(), ctx);
assertSame(Thread.currentThread(), thread);
};
@@ -911,7 +908,6 @@ public class MetricsContextTest extends VertxTestBase {
};
private BiConsumer<Thread, Context> workerChecker = (thread, ctx) -> {
assertSame(Vertx.currentContext(), ctx);
assertTrue(Context.isOnWorkerThread());
};
}

View File

@@ -939,7 +939,7 @@ public class MetricsTest extends VertxTestBase {
}
@Test
public void testThreadPoolMetricsWithWorkerVerticle() {
public void testThreadPoolMetricsWithWorkerVerticle() throws Exception {
AtomicInteger counter = new AtomicInteger();
Map<String, PoolMetrics> all = FakePoolMetrics.getPoolMetrics();
FakePoolMetrics metrics = (FakePoolMetrics) all.get("vert.x-worker-thread");
@@ -955,6 +955,7 @@ public class MetricsTest extends VertxTestBase {
AtomicInteger msg = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
Verticle worker = new AbstractVerticle() {
@Override
public void start(Future<Void> done) throws Exception {
@@ -974,7 +975,7 @@ public class MetricsTest extends VertxTestBase {
}
if (counter.incrementAndGet() == count) {
testComplete();
latch.countDown();
}
} catch (InterruptedException e) {
@@ -993,10 +994,10 @@ public class MetricsTest extends VertxTestBase {
}
});
awaitLatch(latch);
assertWaitUntil(() -> count + 1 == metrics.numberOfCompletedTasks());
// The verticle deployment is also executed on the worker thread pool
assertEquals(count + 1, metrics.numberOfSubmittedTask());
assertEquals(count + 1, metrics.numberOfCompletedTasks());