diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index cb8eb683f..ba4e5bc8e 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -286,7 +286,7 @@ abstract class ContextImpl implements ContextInternal { return contextData; } - boolean executeTask(T arg, Handler hTask) { + void executeTask(T arg, Handler hTask) { Thread th = Thread.currentThread(); if (!(th instanceof VertxThread)) { throw new IllegalStateException("Uh oh! context executing with wrong thread! " + th); @@ -298,10 +298,8 @@ abstract class ContextImpl implements ContextInternal { try { current.setContext(this); hTask.handle(arg); - return true; } catch (Throwable t) { reportException(t); - return false; } finally { // We don't unset the context after execution - this is done later when the context is closed via // VertxThreadFactory diff --git a/src/main/java/io/vertx/core/impl/WorkerContext.java b/src/main/java/io/vertx/core/impl/WorkerContext.java index c51015828..03941d044 100644 --- a/src/main/java/io/vertx/core/impl/WorkerContext.java +++ b/src/main/java/io/vertx/core/impl/WorkerContext.java @@ -25,19 +25,6 @@ class WorkerContext extends ContextImpl { super(vertx, internalBlockingPool, workerPool, deploymentID, config, tccl); } - final Runnable wrapTask(T arg, Handler hTask, PoolMetrics metrics) { - Object metric = metrics != null ? metrics.submitted() : null; - return () -> { - if (metrics != null) { - metrics.begin(metric); - } - boolean succeeded = executeTask(arg, hTask); - if (metrics != null) { - metrics.end(metric, succeeded); - } - }; - } - @Override void executeAsync(Handler task) { execute(null, task); @@ -52,6 +39,19 @@ class WorkerContext extends ContextImpl { // so we need to execute it on the worker thread @Override void execute(T value, Handler task) { - orderedTasks.execute(wrapTask(value, task, workerPool.metrics()), workerPool.executor()); + PoolMetrics metrics = workerPool.metrics(); + Object metric = metrics != null ? metrics.submitted() : null; + orderedTasks.execute(() -> exec(metrics, metric, value, task), workerPool.executor()); } + + private void exec(PoolMetrics metrics, Object metric, T value, Handler task) { + if (metrics != null) { + metrics.begin(metric); + } + executeTask(value, task); + if (metrics != null) { + metrics.end(metric, true); + } + } + } diff --git a/src/main/java/io/vertx/core/net/impl/ConnectionBase.java b/src/main/java/io/vertx/core/net/impl/ConnectionBase.java index 3f1752127..87d0fd9e4 100644 --- a/src/main/java/io/vertx/core/net/impl/ConnectionBase.java +++ b/src/main/java/io/vertx/core/net/impl/ConnectionBase.java @@ -274,7 +274,7 @@ public abstract class ConnectionBase { Handler handler; synchronized (this) { NetworkMetrics metrics = metrics(); - if (metrics != null && metrics instanceof TCPMetrics) { + if (metrics instanceof TCPMetrics) { ((TCPMetrics) metrics).disconnected(metric(), remoteAddress()); } handler = closeHandler; diff --git a/src/test/java/io/vertx/core/spi/metrics/MetricsContextTest.java b/src/test/java/io/vertx/core/spi/metrics/MetricsContextTest.java index ee9cc3836..29aa34066 100644 --- a/src/test/java/io/vertx/core/spi/metrics/MetricsContextTest.java +++ b/src/test/java/io/vertx/core/spi/metrics/MetricsContextTest.java @@ -71,7 +71,6 @@ public class MetricsContextTest extends VertxTestBase { return DummyVertxMetrics.INSTANCE; }; vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true).setFactory(factory))); - assertSame(Thread.currentThread(), metricsThread.get()); assertNull(metricsContext.get()); } @@ -403,7 +402,6 @@ public class MetricsContextTest extends VertxTestBase { // FIXME!! This test intermittently fails @Test - @Ignore public void testHttpClientWebsocketWorker() throws Exception { testHttpClientWebsocket(workerContextFactory, workerChecker); } @@ -893,7 +891,6 @@ public class MetricsContextTest extends VertxTestBase { private Function eventLoopContextFactory = Vertx::getOrCreateContext; private BiConsumer eventLoopChecker = (thread, ctx) -> { - assertSame(Vertx.currentContext(), ctx); assertSame(Thread.currentThread(), thread); }; @@ -911,7 +908,6 @@ public class MetricsContextTest extends VertxTestBase { }; private BiConsumer workerChecker = (thread, ctx) -> { - assertSame(Vertx.currentContext(), ctx); assertTrue(Context.isOnWorkerThread()); }; } diff --git a/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java b/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java index 1ab8791df..3a24e739c 100644 --- a/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java +++ b/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java @@ -939,7 +939,7 @@ public class MetricsTest extends VertxTestBase { } @Test - public void testThreadPoolMetricsWithWorkerVerticle() { + public void testThreadPoolMetricsWithWorkerVerticle() throws Exception { AtomicInteger counter = new AtomicInteger(); Map all = FakePoolMetrics.getPoolMetrics(); FakePoolMetrics metrics = (FakePoolMetrics) all.get("vert.x-worker-thread"); @@ -955,6 +955,7 @@ public class MetricsTest extends VertxTestBase { AtomicInteger msg = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(1); Verticle worker = new AbstractVerticle() { @Override public void start(Future done) throws Exception { @@ -974,7 +975,7 @@ public class MetricsTest extends VertxTestBase { } if (counter.incrementAndGet() == count) { - testComplete(); + latch.countDown(); } } catch (InterruptedException e) { @@ -993,10 +994,10 @@ public class MetricsTest extends VertxTestBase { } }); + awaitLatch(latch); + assertWaitUntil(() -> count + 1 == metrics.numberOfCompletedTasks()); - - // The verticle deployment is also executed on the worker thread pool assertEquals(count + 1, metrics.numberOfSubmittedTask()); assertEquals(count + 1, metrics.numberOfCompletedTasks());