From 474b35eb4822566f464e1a96eabdd76c5f675527 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Sat, 19 Oct 2019 15:37:37 +0200 Subject: [PATCH] Introduce specific classes to split behavior/state between MessageConsumer implementation and ReplyHandler implementations delegating to HandlerRegistration --- .../core/eventbus/impl/EventBusImpl.java | 81 +--- .../core/eventbus/impl/HandlerHolder.java | 10 +- .../eventbus/impl/HandlerRegistration.java | 388 ++++-------------- .../eventbus/impl/MessageConsumerImpl.java | 284 +++++++++++++ .../vertx/core/eventbus/impl/MessageImpl.java | 4 +- .../impl/OutboundDeliveryContext.java | 4 +- .../core/eventbus/impl/ReplyHandler.java | 80 ++++ .../impl/clustered/ClusteredEventBus.java | 10 +- .../core/eventbus/LocalEventBusTest.java | 12 +- 9 files changed, 469 insertions(+), 404 deletions(-) create mode 100644 src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java create mode 100644 src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 994a4a1c4..27621f63c 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -27,8 +27,6 @@ import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.spi.metrics.EventBusMetrics; import io.vertx.core.spi.metrics.MetricsProvider; import io.vertx.core.spi.metrics.VertxMetrics; -import io.vertx.core.spi.tracing.TagExtractor; -import io.vertx.core.spi.tracing.VertxTracer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -115,7 +113,7 @@ public class EventBusImpl implements EventBus, MetricsProvider { MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName()); ReplyHandler handler = createReplyHandler(msg, true, options); sendOrPubInternal(msg, options, handler, null); - return handler.result.future(); + return handler.result(); } @Override @@ -159,7 +157,7 @@ public class EventBusImpl implements EventBus, MetricsProvider { public MessageConsumer consumer(String address) { checkStarted(); Objects.requireNonNull(address, "address"); - return new HandlerRegistration<>(vertx, vertx.getOrCreateContext(), metrics, this, address, null, false, false); + return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, false); } @Override @@ -174,7 +172,7 @@ public class EventBusImpl implements EventBus, MetricsProvider { public MessageConsumer localConsumer(String address) { checkStarted(); Objects.requireNonNull(address, "address"); - return new HandlerRegistration<>(vertx, vertx.getOrCreateContext(), metrics, this, address, null, true, false); + return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, true); } @Override @@ -239,17 +237,18 @@ public class EventBusImpl implements EventBus, MetricsProvider { return msg; } - protected HandlerHolder addRegistration(String address, HandlerRegistration registration, - boolean replyHandler, boolean localOnly) { - Objects.requireNonNull(registration.getHandler(), "handler"); + protected HandlerHolder addRegistration(String address, + HandlerRegistration registration, + boolean replyHandler, + boolean localOnly, + Handler> completionHandler) { +// Objects.requireNonNull(registration.getHandler(), "handler"); LocalRegistrationResult result = addLocalRegistration(address, registration, replyHandler, localOnly); - addRegistration(result.newAddress, address, replyHandler, localOnly, registration::setResult); + addRegistration(result.newAddress, result.holder, completionHandler); return result.holder; } - protected void addRegistration(boolean newAddress, String address, - boolean replyHandler, boolean localOnly, - Handler> completionHandler) { + protected void addRegistration(boolean newAddress, HandlerHolder holder, Handler> completionHandler) { completionHandler.handle(Future.succeededFuture()); } @@ -395,59 +394,11 @@ public class EventBusImpl implements EventBus, MetricsProvider { long timeout = options.getSendTimeout(); String replyAddress = generateReplyAddress(); message.setReplyAddress(replyAddress); - HandlerRegistration registration = new HandlerRegistration<>(vertx, vertx.getOrCreateContext(), metrics, this, replyAddress, message.address, true, src); - ReplyHandler handler = new ReplyHandler<>(registration, timeout); - registration.handler(handler); + ReplyHandler handler = new ReplyHandler<>(this, vertx.getOrCreateContext(), replyAddress, message.address, src, timeout); + handler.register(); return handler; } - public class ReplyHandler implements Handler> { - - final ContextInternal context; - final Promise> result; - final HandlerRegistration registration; - final long timeoutID; - public Object trace; - - ReplyHandler(HandlerRegistration registration, long timeout) { - this.context = registration.context; - this.result = Promise.promise(); - this.registration = registration; - this.timeoutID = vertx.setTimer(timeout, id -> { - fail(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + registration.address + ", repliedAddress: " + registration.repliedAddress)); - }); - } - - private void trace(Object reply, Throwable failure) { - ContextInternal ctx = registration.context; - VertxTracer tracer = ctx.tracer(); - if (tracer != null && registration.src) { - tracer.receiveResponse(ctx, reply, trace, failure, TagExtractor.empty()); - } - } - - void fail(ReplyException failure) { - registration.unregister(); - if (metrics != null) { - metrics.replyFailure(registration.repliedAddress, failure.failureType()); - } - trace(null, failure); - result.tryFail(failure); - } - - @Override - public void handle(Message reply) { - vertx.cancelTimer(timeoutID); - if (reply.body() instanceof ReplyException) { - // This is kind of clunky - but hey-ho - fail((ReplyException) reply.body()); - } else { - trace(reply, null); - result.complete(reply); - } - } - } - public OutboundDeliveryContext newSendContext(MessageImpl message, DeliveryOptions options, ReplyHandler handler, Promise writePromise) { ContextInternal ctx = vertx.getContext(); @@ -476,7 +427,7 @@ public class EventBusImpl implements EventBus, MetricsProvider { // Unregister all handlers explicitly - don't rely on context hooks for (ConcurrentCyclicSequence handlers: handlerMap.values()) { for (HandlerHolder holder: handlers) { - holder.getHandler().unregister(); + holder.getHandler().unregister(ar -> {}); } } } @@ -490,11 +441,11 @@ public class EventBusImpl implements EventBus, MetricsProvider { // before it was received try { if (!holder.isRemoved()) { - holder.getHandler().handle(copied); + holder.getHandler().receive(copied); } } finally { if (holder.isReplyHandler()) { - holder.getHandler().unregister(); + holder.getHandler().unregister(ar -> {}); } } }); diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java index 0475b9478..7e01b16cc 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java @@ -18,11 +18,11 @@ import io.vertx.core.Context; */ public class HandlerHolder { - private final Context context; - final String address; - private final HandlerRegistration handler; - private final boolean replyHandler; - private final boolean localOnly; + public final Context context; + public final String address; + public final HandlerRegistration handler; + public final boolean replyHandler; + public final boolean localOnly; private boolean removed; public HandlerHolder(HandlerRegistration handler, diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index a2e3fe391..400ab959e 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -8,348 +8,96 @@ * * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 */ - package io.vertx.core.eventbus.impl; -import io.vertx.core.*; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.eventbus.DeliveryContext; import io.vertx.core.eventbus.Message; -import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.eventbus.impl.clustered.ClusteredMessage; -import io.vertx.core.impl.Arguments; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; -import io.vertx.core.spi.metrics.EventBusMetrics; import io.vertx.core.spi.tracing.TagExtractor; import io.vertx.core.spi.tracing.VertxTracer; -import io.vertx.core.streams.ReadStream; -import java.util.*; +import java.util.Iterator; -/* - * This class is optimised for performance when used on the same event loop it was created on. - * However it can be used safely from other threads. - * - * The internal state is protected using the synchronized keyword. If always used on the same event loop, then - * we benefit from biased locking which makes the overhead of synchronized near zero. - */ -public class HandlerRegistration implements MessageConsumer, Handler> { +abstract class HandlerRegistration { private static final Logger log = LoggerFactory.getLogger(HandlerRegistration.class); - public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000; - - private final Vertx vertx; - private final EventBusMetrics metrics; - private final EventBusImpl eventBus; - final ContextInternal context; - final String address; - final String repliedAddress; - private final boolean localOnly; - protected final boolean src; + public final ContextInternal context; + public final EventBusImpl bus; + public final String address; + public final boolean src; private HandlerHolder registered; - private Handler> handler; - private AsyncResult result; - private Handler> completionHandler; - private Handler endHandler; - private Handler> discardHandler; - private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES; - private final Queue> pending = new ArrayDeque<>(8); - private long demand = Long.MAX_VALUE; private Object metric; - public HandlerRegistration(Vertx vertx, ContextInternal context, EventBusMetrics metrics, EventBusImpl eventBus, String address, - String repliedAddress, boolean localOnly, boolean src) { - this.vertx = vertx; - this.metrics = metrics; - this.eventBus = eventBus; - this.address = address; - this.repliedAddress = repliedAddress; - this.localOnly = localOnly; - this.src = src; + HandlerRegistration(ContextInternal context, + EventBusImpl bus, + String address, + boolean src) { this.context = context; + this.bus = bus; + this.src = src; + this.address = address; } - @Override - public MessageConsumer setMaxBufferedMessages(int maxBufferedMessages) { - Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative"); - List> discarded; - Handler> discardHandler; - synchronized (this) { - this.maxBufferedMessages = maxBufferedMessages; - int overflow = pending.size() - maxBufferedMessages; - if (overflow <= 0) { - return this; - } - discardHandler = this.discardHandler; - if (discardHandler == null) { - while (pending.size() > maxBufferedMessages) { - pending.poll(); - } - return this; - } - discarded = new ArrayList<>(overflow); - while (pending.size() > maxBufferedMessages) { - discarded.add(pending.poll()); - } + final void receive(MessageImpl msg) { + if (bus.metrics != null) { + bus.metrics.scheduleMessage(metric, msg.isLocal()); } - for (Message msg : discarded) { - discardHandler.handle(msg); + doReceive(msg); + } + + protected abstract void doReceive(Message msg); + + protected abstract void doUnregister(); + + synchronized Future register(String repliedAddress, boolean localOnly) { + if (registered != null) { + throw new IllegalStateException(); } - return this; - } - - @Override - public synchronized int getMaxBufferedMessages() { - return maxBufferedMessages; - } - - @Override - public String address() { - return address; - } - - @Override - public synchronized void completionHandler(Handler> completionHandler) { - Objects.requireNonNull(completionHandler); - if (result != null) { - AsyncResult value = result; - vertx.runOnContext(v -> completionHandler.handle(value)); - } else { - this.completionHandler = completionHandler; + Promise p = Promise.promise(); + Future fut = p.future(); + registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, p); + if (bus.metrics != null) { + metric = bus.metrics.handlerRegistered(address, repliedAddress); } + return fut; } - @Override - public Future unregister() { - // Todo when we support multiple listeners per future - Promise promise = Promise.promise(); - doUnregister(promise); - return promise.future(); - } - - @Override - public void unregister(Handler> completionHandler) { - doUnregister(completionHandler); - } - - private void doUnregister(Handler> doneHandler) { - synchronized (this) { - if (handler == null) { - callHandlerAsync(Future.succeededFuture(), doneHandler); - return; - } - handler = null; - if (endHandler != null) { - Handler theEndHandler = endHandler; - Handler> handler = doneHandler; - doneHandler = ar -> { - theEndHandler.handle(null); - if (handler != null) { - handler.handle(ar); - } - }; - } - if (pending.size() > 0 && discardHandler != null) { - Deque> discarded = new ArrayDeque<>(pending); - Handler> handler = discardHandler; - context.runOnContext(v -> { - Message msg; - while ((msg = discarded.poll()) != null) { - handler.handle(msg); - } - }); - } - pending.clear(); - discardHandler = null; - eventBus.removeRegistration(registered, doneHandler); - registered = null; - if (result == null) { - result = Future.failedFuture("Consumer unregistered before registration completed"); - callHandlerAsync(result, completionHandler); - } else { - EventBusMetrics metrics = eventBus.metrics; - if (metrics != null) { - metrics.handlerUnregistered(metric); - } - } - } - } - - private void callHandlerAsync(AsyncResult result, Handler> completionHandler) { - if (completionHandler != null) { - vertx.runOnContext(v -> completionHandler.handle(result)); - } - } - - public synchronized void setResult(AsyncResult result) { - if (this.result != null) { - return; - } - this.result = result; - if (result.failed()) { - log.error("Failed to propagate registration for handler " + handler + " and address " + address); - } else { - if (metrics != null) { - metric = metrics.handlerRegistered(address, repliedAddress); - } - callHandlerAsync(result, completionHandler); - } - } - - @Override - public void handle(Message message) { - if (metrics != null) { - metrics.scheduleMessage(metric, ((MessageImpl)message).isLocal()); - } - Handler> theHandler; - ContextInternal ctx; - synchronized (this) { - if (registered == null) { - return; - } else if (demand == 0L) { - if (pending.size() < maxBufferedMessages) { - pending.add(message); - } else { - if (discardHandler != null) { - discardHandler.handle(message); - } else { - log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address); - } - } - return; - } else { - if (pending.size() > 0) { - pending.add(message); - message = pending.poll(); - } - if (demand != Long.MAX_VALUE) { - demand--; - } - theHandler = handler; - } - ctx = context; - } - deliver(theHandler, message, ctx); - } - - private void deliver(Handler> theHandler, Message message, ContextInternal context) { - // Handle the message outside the sync block - // https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714 - String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME); - if (creditsAddress != null) { - eventBus.send(creditsAddress, 1); - } - InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl) message, theHandler, context); - deliveryCtx.context.dispatch(v -> { - deliveryCtx.next(); - }); - checkNextTick(); - } - - private synchronized void checkNextTick() { - // Check if there are more pending messages in the queue that can be processed next time around - if (!pending.isEmpty() && demand > 0L) { - context.runOnContext(v -> { - Message message; - Handler> theHandler; - ContextInternal ctx; - synchronized (HandlerRegistration.this) { - if (demand == 0L || (message = pending.poll()) == null) { - return; - } - if (demand != Long.MAX_VALUE) { - demand--; - } - theHandler = handler; - ctx = context; - } - deliver(theHandler, message, ctx); - }); - } - } - - /* - * Internal API for testing purposes. - */ - public synchronized void discardHandler(Handler> handler) { - this.discardHandler = handler; - } - - @Override - public synchronized MessageConsumer handler(Handler> h) { - if (h != null) { - synchronized (this) { - handler = h; - if (registered == null) { - registered = eventBus.addRegistration(address, this, repliedAddress != null, localOnly); - } - } - return this; - } - this.unregister(); - return this; - } - - @Override - public ReadStream bodyStream() { - return new BodyReadStream<>(this); - } - - @Override public synchronized boolean isRegistered() { return registered != null; } - @Override - public synchronized MessageConsumer pause() { - demand = 0L; - return this; - } - - @Override - public MessageConsumer resume() { - return fetch(Long.MAX_VALUE); - } - - @Override - public synchronized MessageConsumer fetch(long amount) { - if (amount < 0) { - throw new IllegalArgumentException(); + public void unregister(Handler> completionHandler) { + doUnregister(); + synchronized (this) { + if (registered != null) { + bus.removeRegistration(registered, completionHandler); + registered = null; + if (bus.metrics != null) { + bus.metrics.handlerUnregistered(metric); + metric = null; + } + } else { + if (completionHandler != null) { + context.owner().runOnContext(v -> completionHandler.handle(Future.succeededFuture())); + } + } } - demand += amount; - if (demand < 0L) { - demand = Long.MAX_VALUE; - } - if (demand > 0L) { - checkNextTick(); - } - return this; } - @Override - public synchronized MessageConsumer endHandler(Handler endHandler) { - if (endHandler != null) { - // We should use the HandlerHolder context to properly do this (needs small refactoring) - Context endCtx = vertx.getOrCreateContext(); - this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null)); - } else { - this.endHandler = null; - } - return this; + void dispatch(Handler> theHandler, Message message, ContextInternal context) { + InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl) message, theHandler, context); + deliveryCtx.dispatch(); } - @Override - public synchronized MessageConsumer exceptionHandler(Handler handler) { - return this; - } - - public Handler> getHandler() { - return handler; - } - - protected class InboundDeliveryContext implements DeliveryContext { + private class InboundDeliveryContext implements DeliveryContext { private final MessageImpl message; private final Iterator> iter; @@ -359,10 +107,14 @@ public class HandlerRegistration implements MessageConsumer, Handler message, Handler> handler, ContextInternal context) { this.message = message; this.handler = handler; - this.iter = eventBus.receiveInterceptors(); + this.iter = message.bus.receiveInterceptors(); + this.context = context; + } - // Temporary workaround - this.context = handler instanceof EventBusImpl.ReplyHandler ? ((EventBusImpl.ReplyHandler)handler).context : context.duplicate(); + void dispatch() { + context.dispatch(v -> { + next(); + }); } @Override @@ -392,11 +144,12 @@ public class HandlerRegistration implements MessageConsumer, Handler implements MessageConsumer, Handler implements MessageConsumer, Handler extends HandlerRegistration implements MessageConsumer { + + private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class); + + private static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000; + + private final Vertx vertx; + private final ContextInternal context; + private final EventBusImpl eventBus; + private final String address; + private final boolean localOnly; + private Handler> handler; + private Handler> completionHandler; + private Handler endHandler; + private Handler> discardHandler; + private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES; + private Queue> pending = new ArrayDeque<>(8); + private long demand = Long.MAX_VALUE; + private Future result; + + MessageConsumerImpl(Vertx vertx, ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly) { + super(context, eventBus, address, false); + this.vertx = vertx; + this.context = context; + this.eventBus = eventBus; + this.address = address; + this.localOnly = localOnly; + } + + @Override + public MessageConsumer setMaxBufferedMessages(int maxBufferedMessages) { + Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative"); + List> discarded; + Handler> discardHandler; + synchronized (this) { + this.maxBufferedMessages = maxBufferedMessages; + int overflow = pending.size() - maxBufferedMessages; + if (overflow <= 0) { + return this; + } + discardHandler = this.discardHandler; + if (discardHandler == null) { + while (pending.size() > maxBufferedMessages) { + pending.poll(); + } + return this; + } + discarded = new ArrayList<>(overflow); + while (pending.size() > maxBufferedMessages) { + discarded.add(pending.poll()); + } + } + for (Message msg : discarded) { + discardHandler.handle(msg); + } + return this; + } + + @Override + public synchronized int getMaxBufferedMessages() { + return maxBufferedMessages; + } + + @Override + public String address() { + return address; + } + + @Override + public synchronized void completionHandler(Handler> handler) { + Objects.requireNonNull(handler); + completionHandler = handler; + checkCompletionHandler(); + } + + @Override + public Future unregister() { + // Todo when we support multiple listeners per future + Promise promise = Promise.promise(); + unregister(promise); + return promise.future(); + } + + protected synchronized void doUnregister() { + handler = null; + if (endHandler != null) { + endHandler.handle(null); + } + if (pending.size() > 0 && discardHandler != null) { + Queue> discarded = pending; + Handler> handler = discardHandler; + pending = new ArrayDeque<>(); + context.runOnContext(v -> { + Message msg; + while ((msg = discarded.poll()) != null) { + handler.handle(msg); + } + }); + } + result = Future.failedFuture("blah"); + checkCompletionHandler(); + discardHandler = null; + result = null; + } + + protected void doReceive(Message message) { + Handler> theHandler; + synchronized (this) { + if (!isRegistered()) { + return; + } else if (demand == 0L) { + if (pending.size() < maxBufferedMessages) { + pending.add(message); + } else { + if (discardHandler != null) { + discardHandler.handle(message); + } else { + log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address); + } + } + return; + } else { + if (pending.size() > 0) { + pending.add(message); + message = pending.poll(); + } + if (demand != Long.MAX_VALUE) { + demand--; + } + theHandler = handler; + } + } + deliver(theHandler, message); + } + + private void deliver(Handler> theHandler, Message message) { + // Handle the message outside the sync block + // https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714 + String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME); + if (creditsAddress != null) { + eventBus.send(creditsAddress, 1); + } + dispatch(theHandler, message, context.duplicate()); + checkNextTick(); + } + + private synchronized void checkNextTick() { + // Check if there are more pending messages in the queue that can be processed next time around + if (!pending.isEmpty() && demand > 0L) { + context.runOnContext(v -> { + Message message; + Handler> theHandler; + synchronized (MessageConsumerImpl.this) { + if (demand == 0L || (message = pending.poll()) == null) { + return; + } + if (demand != Long.MAX_VALUE) { + demand--; + } + theHandler = handler; + } + deliver(theHandler, message); + }); + } + } + + /* + * Internal API for testing purposes. + */ + public synchronized void discardHandler(Handler> handler) { + this.discardHandler = handler; + } + + @Override + public synchronized MessageConsumer handler(Handler> h) { + if (h != null) { + synchronized (this) { + handler = h; + if (result == null) { + result = register(null, localOnly); + result.setHandler(ar -> checkCompletionHandler()); + } + } + } else { + unregister(); + } + return this; + } + + private synchronized void checkCompletionHandler() { + Handler> completionHandler = this.completionHandler; + Future result = this.result; + if (completionHandler != null && result != null && result.isComplete()) { + this.completionHandler = null; + this.result = null; + context.runOnContext(v -> { + completionHandler.handle(result); + }); + } + } + + @Override + public ReadStream bodyStream() { + return new BodyReadStream<>(this); + } + + @Override + public synchronized MessageConsumer pause() { + demand = 0L; + return this; + } + + @Override + public MessageConsumer resume() { + return fetch(Long.MAX_VALUE); + } + + @Override + public synchronized MessageConsumer fetch(long amount) { + if (amount < 0) { + throw new IllegalArgumentException(); + } + demand += amount; + if (demand < 0L) { + demand = Long.MAX_VALUE; + } + if (demand > 0L) { + checkNextTick(); + } + return this; + } + + @Override + public synchronized MessageConsumer endHandler(Handler endHandler) { + if (endHandler != null) { + // We should use the HandlerHolder context to properly do this (needs small refactoring) + Context endCtx = vertx.getOrCreateContext(); + this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null)); + } else { + this.endHandler = null; + } + return this; + } + + @Override + public synchronized MessageConsumer exceptionHandler(Handler handler) { + return this; + } + + public synchronized Handler> getHandler() { + return handler; + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java index df3bc98e8..a6bb060a5 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java @@ -115,9 +115,9 @@ public class MessageImpl implements Message { public Future> replyAndRequest(Object message, DeliveryOptions options) { if (replyAddress != null) { MessageImpl reply = createReply(message, options); - EventBusImpl.ReplyHandler handler = bus.createReplyHandler(reply, false, options); + ReplyHandler handler = bus.createReplyHandler(reply, false, options); bus.sendReply(reply, options, handler); - return handler.result.future(); + return handler.result(); } else { throw new IllegalStateException(); } diff --git a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java index bcb49e22c..8d84fe7b5 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java +++ b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java @@ -31,7 +31,7 @@ public class OutboundDeliveryContext implements DeliveryContext, Handler replyHandler; + public final ReplyHandler replyHandler; private final Promise writePromise; private boolean src; @@ -39,7 +39,7 @@ public class OutboundDeliveryContext implements DeliveryContext, Handler replyHandler, Promise writePromise) { + OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler replyHandler, Promise writePromise) { this.ctx = ctx; this.message = message; this.options = options; diff --git a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java new file mode 100644 index 000000000..019eaa8d4 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java @@ -0,0 +1,80 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.ReplyException; +import io.vertx.core.eventbus.ReplyFailure; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.spi.tracing.TagExtractor; +import io.vertx.core.spi.tracing.VertxTracer; + +class ReplyHandler extends HandlerRegistration implements Handler> { + + private final EventBusImpl eventBus; + private final ContextInternal context; + private final Promise> result; + private final long timeoutID; + private final boolean src; + private final String repliedAddress; + Object trace; + + ReplyHandler(EventBusImpl eventBus, ContextInternal context, String address, String repliedAddress, boolean src, long timeout) { + super(context, eventBus, address, src); + this.eventBus = eventBus; + this.context = context; + this.result = Promise.promise(); + this.src = src; + this.repliedAddress = repliedAddress; + this.timeoutID = eventBus.vertx.setTimer(timeout, id -> { + fail(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress)); + }); + } + + private void trace(Object reply, Throwable failure) { + VertxTracer tracer = context.tracer(); + if (tracer != null && src) { + tracer.receiveResponse(context, reply, trace, failure, TagExtractor.empty()); + } + } + + Future> result() { + return result.future(); + } + + void fail(ReplyException failure) { + unregister(ar -> {}); + if (eventBus.metrics != null) { + eventBus.metrics.replyFailure(repliedAddress, failure.failureType()); + } + trace(null, failure); + result.tryFail(failure); + } + + + @Override + protected void doReceive(Message reply) { + dispatch(this, reply, context); + } + + @Override + protected void doUnregister() { + } + + void register() { + register(repliedAddress, true); + } + + @Override + public void handle(Message reply) { + eventBus.vertx.cancelTimer(timeoutID); + if (reply.body() instanceof ReplyException) { + // This is kind of clunky - but hey-ho + fail((ReplyException) reply.body()); + } else { + trace(reply, null); + result.complete(reply); + } + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index 8453448a2..f209afcfe 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -183,13 +183,11 @@ public class ClusteredEventBus extends EventBusImpl { } @Override - protected void addRegistration(boolean newAddress, String address, - boolean replyHandler, boolean localOnly, - Handler> completionHandler) { - if (newAddress && subs != null && !replyHandler && !localOnly) { + protected void addRegistration(boolean newAddress, HandlerHolder holder, Handler> completionHandler) { + if (newAddress && subs != null && !holder.replyHandler && !holder.localOnly) { // Propagate the information - subs.add(address, nodeInfo, completionHandler); - ownSubs.add(address); + subs.add(holder.address, nodeInfo, completionHandler); + ownSubs.add(holder.address); } else { completionHandler.handle(Future.succeededFuture()); } diff --git a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java index 1e2eb6179..fbb4519b4 100644 --- a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java @@ -12,7 +12,7 @@ package io.vertx.core.eventbus; import io.vertx.core.*; -import io.vertx.core.eventbus.impl.HandlerRegistration; +import io.vertx.core.eventbus.impl.MessageConsumerImpl; import io.vertx.core.http.CaseInsensitiveHeaders; import io.vertx.core.impl.ConcurrentHashSet; import io.vertx.core.impl.ContextInternal; @@ -1043,7 +1043,7 @@ public class LocalEventBusTest extends EventBusTestBase { }; MessageConsumer reg = eb.consumer(ADDRESS1).setMaxBufferedMessages(10); ReadStream controller = register.apply(reg, handler); - ((HandlerRegistration) reg).discardHandler(msg -> { + ((MessageConsumerImpl) reg).discardHandler(msg -> { assertEquals(data[10], msg.body()); expected.addAll(Arrays.asList(data).subList(0, 10)); controller.resume(); @@ -1072,7 +1072,7 @@ public class LocalEventBusTest extends EventBusTestBase { } List received = Collections.synchronizedList(new ArrayList<>()); CountDownLatch receiveLatch = new CountDownLatch(4); - HandlerRegistration consumer = (HandlerRegistration) eb.consumer(ADDRESS1).setMaxBufferedMessages(5); + MessageConsumerImpl consumer = (MessageConsumerImpl) eb.consumer(ADDRESS1).setMaxBufferedMessages(5); streamSupplier.apply(consumer, e -> { received.add(e); receiveLatch.countDown(); @@ -1105,7 +1105,7 @@ public class LocalEventBusTest extends EventBusTestBase { // Let enough time of the 20 messages to go in the consumer pending queue vertx.setTimer(20, v -> { AtomicInteger count = new AtomicInteger(1); - ((HandlerRegistration)consumer).discardHandler(discarded -> { + ((MessageConsumerImpl)consumer).discardHandler(discarded -> { int val = discarded.body(); assertEquals(count.getAndIncrement(), val); if (val == 9) { @@ -1149,7 +1149,7 @@ public class LocalEventBusTest extends EventBusTestBase { }; MessageConsumer reg = eb.consumer(ADDRESS1).setMaxBufferedMessages(10); ReadStream controller = register.apply(reg, handler); - ((HandlerRegistration) reg).discardHandler(msg -> { + ((MessageConsumerImpl) reg).discardHandler(msg -> { assertEquals(data[10], msg.body()); expected.addAll(Arrays.asList(data).subList(0, 10)); controller.resume(); @@ -1409,7 +1409,7 @@ public class LocalEventBusTest extends EventBusTestBase { Context ctx = Vertx.currentContext(); ctx.runOnContext(v -> { consumer.resume(); - ((HandlerRegistration) consumer).discardHandler(discarded -> { + ((MessageConsumerImpl) consumer).discardHandler(discarded -> { assertEquals("val1", discarded.body()); testComplete(); });