Move event bus metrics unregistration to HandlerRegistration instead of HandlerHolder - fixes #2900

This commit is contained in:
Julien Viet
2019-04-08 15:28:32 +02:00
parent 6d33b93623
commit a392dc8ea6
5 changed files with 27 additions and 8 deletions

View File

@@ -276,7 +276,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
}
registration.setHandlerContext(context);
HandlerHolder<T> holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context);
HandlerHolder<T> holder = new HandlerHolder<>(registration, replyHandler, localOnly, context);
ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.merge(

View File

@@ -19,16 +19,14 @@ import io.vertx.core.spi.metrics.EventBusMetrics;
*/
public class HandlerHolder<T> {
private final EventBusMetrics metrics;
private final Context context;
private final HandlerRegistration<T> handler;
private final boolean replyHandler;
private final boolean localOnly;
private boolean removed;
public HandlerHolder(EventBusMetrics metrics, HandlerRegistration<T> handler, boolean replyHandler, boolean localOnly,
public HandlerHolder(HandlerRegistration<T> handler, boolean replyHandler, boolean localOnly,
Context context) {
this.metrics = metrics;
this.context = context;
this.handler = handler;
this.replyHandler = replyHandler;
@@ -36,7 +34,7 @@ public class HandlerHolder<T> {
}
// We use a synchronized block to protect removed as it can be unregistered from a different thread
public boolean setRemoved() {
boolean setRemoved() {
boolean unregistered = false;
synchronized (this) {
if (!removed) {
@@ -44,9 +42,6 @@ public class HandlerHolder<T> {
unregistered = true;
}
}
if (metrics != null && unregistered) {
metrics.handlerUnregistered(handler.getMetric());
}
return unregistered;
}

View File

@@ -163,6 +163,11 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
if (result == null) {
result = Future.failedFuture("Consumer unregistered before registration completed");
callHandlerAsync(result, completionHandler);
} else {
EventBusMetrics metrics = eventBus.metrics;
if (metrics != null) {
metrics.handlerUnregistered(metric);
}
}
}
if (discardHandler != null && discarded != null) {

View File

@@ -254,6 +254,22 @@ public class MetricsTest extends VertxTestBase {
await();
}
@Test
public void testClusterUnregistration() {
startNodes(1);
FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(vertices[0].eventBus());
MessageConsumer<Object> consumer = vertices[0].eventBus().consumer(ADDRESS1, ar -> {
fail("Should not receive message");
});
consumer.completionHandler(ar -> {
assertTrue(ar.failed());
assertEquals(Collections.emptyList(), metrics.getRegistrations());
testComplete();
});
consumer.unregister();
await();
}
@Test
public void testHandlerProcessMessage() {
testHandlerProcessMessage(vertx, vertx, 1);

View File

@@ -80,6 +80,9 @@ public class FakeEventBusMetrics extends FakeMetricsBase implements EventBusMetr
}
public void handlerUnregistered(HandlerMetric handler) {
if (handler == null) {
throw new NullPointerException("Must not be null");
}
registrations.remove(handler);
}