diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index f0aac5ef3..3f68e3aa9 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -44,7 +44,7 @@ import java.util.function.BiConsumer; */ public class EventBusImpl implements EventBus, MetricsProvider { - private static final Logger log = LoggerFactory.getLogger(EventBusImpl.class); + static final Logger log = LoggerFactory.getLogger(EventBusImpl.class); private final List> sendInterceptors = new CopyOnWriteArrayList<>(); private final List> receiveInterceptors = new CopyOnWriteArrayList<>(); @@ -321,10 +321,16 @@ public class EventBusImpl implements EventBus, MetricsProvider { // Guarantees the order when there is no current context in clustered mode ctx = sendNoContext; } - new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage).next(); + send(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage)); } } + private void send(OutboundDeliveryContext ctx) { + ctx.iter = sendInterceptors.iterator(); + ctx.bus = this; + ctx.next(); + } + protected void sendReply(OutboundDeliveryContext sendContext, MessageImpl replierMessage) { sendOrPub(sendContext); } @@ -500,68 +506,7 @@ public class EventBusImpl implements EventBus, MetricsProvider { // Guarantees the order when there is no current context in clustered mode ctx = sendNoContext; } - OutboundDeliveryContext sendContext = new OutboundDeliveryContext<>(ctx, message, options, handler); - sendContext.next(); - } - - protected class OutboundDeliveryContext implements DeliveryContext { - - public final ContextInternal ctx; - public final MessageImpl message; - public final DeliveryOptions options; - public final Iterator> iter; - public final ReplyHandler replyHandler; - private final MessageImpl replierMessage; - - private OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler replyHandler) { - this(ctx, message, options, replyHandler, null); - } - - private OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler replyHandler, MessageImpl replierMessage) { - this.ctx = ctx; - this.message = message; - this.options = options; - this.iter = sendInterceptors.iterator(); - this.replierMessage = replierMessage; - this.replyHandler = replyHandler; - } - - @Override - public Message message() { - return message; - } - - @Override - public void next() { - if (iter.hasNext()) { - Handler handler = iter.next(); - try { - if (handler != null) { - handler.handle(this); - } else { - next(); - } - } catch (Throwable t) { - log.error("Failure in interceptor", t); - } - } else { - if (replierMessage == null) { - sendOrPub(this); - } else { - sendReply(this, replierMessage); - } - } - } - - @Override - public boolean send() { - return message.isSend(); - } - - @Override - public Object body() { - return message.sentBody; - } + send(new OutboundDeliveryContext<>(ctx, message, options, handler)); } private void unregisterAll() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java new file mode 100644 index 000000000..f89a36f1f --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Handler; +import io.vertx.core.eventbus.DeliveryContext; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.Message; +import io.vertx.core.impl.ContextInternal; + +import java.util.Iterator; + +public class OutboundDeliveryContext implements DeliveryContext { + + public final ContextInternal ctx; + public final MessageImpl message; + public final DeliveryOptions options; + public final EventBusImpl.ReplyHandler replyHandler; + private final MessageImpl replierMessage; + + Iterator> iter; + EventBusImpl bus; + + OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler replyHandler) { + this(ctx, message, options, replyHandler, null); + } + + OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler replyHandler, MessageImpl replierMessage) { + this.ctx = ctx; + this.message = message; + this.options = options; + this.replierMessage = replierMessage; + this.replyHandler = replyHandler; + } + + @Override + public Message message() { + return message; + } + + @Override + public void next() { + if (iter.hasNext()) { + Handler handler = iter.next(); + try { + if (handler != null) { + handler.handle(this); + } else { + next(); + } + } catch (Throwable t) { + EventBusImpl.log.error("Failure in interceptor", t); + } + } else { + if (replierMessage == null) { + bus.sendOrPub(this); + } else { + bus.sendReply(this, replierMessage); + } + } + } + + @Override + public boolean send() { + return message.isSend(); + } + + @Override + public Object body() { + return message.sentBody; + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index f94748eba..1447c836e 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -19,6 +19,7 @@ import io.vertx.core.eventbus.impl.CodecManager; import io.vertx.core.eventbus.impl.EventBusImpl; import io.vertx.core.eventbus.impl.HandlerHolder; import io.vertx.core.eventbus.impl.MessageImpl; +import io.vertx.core.eventbus.impl.OutboundDeliveryContext; import io.vertx.core.impl.ConcurrentHashSet; import io.vertx.core.impl.HAManager; import io.vertx.core.impl.VertxInternal; @@ -35,7 +36,6 @@ import io.vertx.core.spi.tracing.TagExtractor; import io.vertx.core.spi.tracing.VertxTracer; import java.io.Serializable; -import java.util.Collections; import java.util.Objects; import java.util.Set; import java.util.UUID;