From c6dd46d10c9b2f37da41af41e788c331a4572a16 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Fri, 18 Oct 2019 13:37:36 +0200 Subject: [PATCH] Move the replier message info on the replied message itself --- .../core/eventbus/impl/EventBusImpl.java | 6 +---- .../vertx/core/eventbus/impl/MessageImpl.java | 8 ++++-- .../impl/OutboundDeliveryContext.java | 12 +-------- .../impl/clustered/ClusteredEventBus.java | 25 +++++++++---------- .../impl/clustered/ClusteredMessage.java | 13 ++++++++++ 5 files changed, 33 insertions(+), 31 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 90001bd64..497734dfd 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -320,14 +320,10 @@ public class EventBusImpl implements EventBus, MetricsProvider { // Guarantees the order when there is no current context in clustered mode ctx = sendNoContext; } - sendOrPubInternal(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage, null)); + sendOrPubInternal(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, null)); } } - protected void sendReply(OutboundDeliveryContext sendContext, MessageImpl replierMessage) { - sendOrPub(sendContext); - } - protected void sendOrPub(OutboundDeliveryContext sendContext) { sendLocally(sendContext); } diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java index cc3bc9766..198c94364 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java @@ -109,7 +109,7 @@ public class MessageImpl implements Message { @Override public void reply(Object message, DeliveryOptions options) { if (replyAddress != null) { - MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName()); + MessageImpl reply = createReply(message, options); bus.sendReply(reply, this, options, null); } } @@ -117,7 +117,7 @@ public class MessageImpl implements Message { @Override public Future> replyAndRequest(Object message, DeliveryOptions options) { if (replyAddress != null) { - MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName()); + MessageImpl reply = createReply(message, options); EventBusImpl.ReplyHandler handler = bus.createReplyHandler(reply, reply.src, options); bus.sendReply(reply, this, options, handler); return handler.result.future(); @@ -126,6 +126,10 @@ public class MessageImpl implements Message { } } + protected MessageImpl createReply(Object message, DeliveryOptions options) { + return bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName()); + } + @Override public boolean isSend() { return send; diff --git a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java index c34aa5231..5bac8ed81 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java +++ b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java @@ -32,7 +32,6 @@ public class OutboundDeliveryContext implements DeliveryContext, Handler replyHandler; - private final MessageImpl replierMessage; private final Promise writePromise; private Object trace; @@ -42,14 +41,9 @@ public class OutboundDeliveryContext implements DeliveryContext, Handler replyHandler, Promise writePromise) { - this(ctx, message, options, replyHandler, null, writePromise); - } - - OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler replyHandler, MessageImpl replierMessage, Promise writePromise) { this.ctx = ctx; this.message = message; this.options = options; - this.replierMessage = replierMessage; this.replyHandler = replyHandler; this.writePromise = writePromise; } @@ -120,11 +114,7 @@ public class OutboundDeliveryContext implements DeliveryContext, Handler biConsumer = (String key, String val) -> message.headers().set(key, val); trace = tracer.sendRequest(ctx, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE); } - if (replierMessage == null) { - bus.sendOrPub(this); - } else { - bus.sendReply(this, replierMessage); - } + bus.sendOrPub(this); } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index ef7991d7e..3ab3f17be 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -206,22 +206,21 @@ public class ClusteredEventBus extends EventBusImpl { } } - @Override - protected void sendReply(OutboundDeliveryContext sendContext, MessageImpl replierMessage) { - clusteredSendReply(((ClusteredMessage) replierMessage).getSender(), sendContext); - } - @Override protected void sendOrPub(OutboundDeliveryContext sendContext) { - if (sendContext.options.isLocalOnly()) { - super.sendOrPub(sendContext); - } else if (Vertx.currentContext() != sendContext.ctx) { - // Current event-loop might be null when sending from non vertx thread - sendContext.ctx.runOnContext(v -> { - subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext)); - }); + if (((ClusteredMessage) sendContext.message).getRepliedTo() != null) { + clusteredSendReply(((ClusteredMessage) sendContext.message).getRepliedTo(), sendContext); } else { - subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext)); + if (sendContext.options.isLocalOnly()) { + super.sendOrPub(sendContext); + } else if (Vertx.currentContext() != sendContext.ctx) { + // Current event-loop might be null when sending from non vertx thread + sendContext.ctx.runOnContext(v -> { + subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext)); + }); + } else { + subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext)); + } } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java index 6dd26d4df..5d4d07541 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java @@ -14,6 +14,7 @@ package io.vertx.core.eventbus.impl.clustered; import io.netty.util.CharsetUtil; import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.eventbus.impl.CodecManager; import io.vertx.core.eventbus.impl.EventBusImpl; @@ -36,6 +37,7 @@ public class ClusteredMessage extends MessageImpl { private static final byte WIRE_PROTOCOL_VERSION = 1; private ServerID sender; + private ServerID repliedTo; private Buffer wireBuffer; private int bodyPos; private int headersPos; @@ -63,6 +65,13 @@ public class ClusteredMessage extends MessageImpl { this.fromWire = other.fromWire; } + @Override + protected MessageImpl createReply(Object message, DeliveryOptions options) { + ClusteredMessage reply = (ClusteredMessage) super.createReply(message, options); + reply.repliedTo = sender; + return reply; + } + public ClusteredMessage copyBeforeReceive(boolean src) { return new ClusteredMessage<>(this, src); } @@ -240,6 +249,10 @@ public class ClusteredMessage extends MessageImpl { return sender; } + ServerID getRepliedTo() { + return repliedTo; + } + public boolean isFromWire() { return fromWire; }