diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 7d1d15b48..994a4a1c4 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -105,14 +105,14 @@ public class EventBusImpl implements EventBus, MetricsProvider { @Override public EventBus send(String address, Object message, DeliveryOptions options) { - MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName()); + MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName()); sendOrPubInternal(msg, options, null, null); return this; } @Override public Future> request(String address, Object message, DeliveryOptions options) { - MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName()); + MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName()); ReplyHandler handler = createReplyHandler(msg, true, options); sendOrPubInternal(msg, options, handler, null); return handler.result.future(); @@ -151,7 +151,7 @@ public class EventBusImpl implements EventBus, MetricsProvider { @Override public EventBus publish(String address, Object message, DeliveryOptions options) { - sendOrPubInternal(createMessage(false, true, address, options.getHeaders(), message, options.getCodecName()), options, null, null); + sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null, null); return this; } @@ -231,11 +231,11 @@ public class EventBusImpl implements EventBus, MetricsProvider { return metrics; } - public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName) { + public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) { Objects.requireNonNull(address, "no null address accepted"); MessageCodec codec = codecManager.lookupCodec(body, codecName); @SuppressWarnings("unchecked") - MessageImpl msg = new MessageImpl(address, headers, body, codec, send, src, this); + MessageImpl msg = new MessageImpl(address, headers, body, codec, send, this); return msg; } @@ -483,7 +483,7 @@ public class EventBusImpl implements EventBus, MetricsProvider { private void deliverToHandler(MessageImpl msg, HandlerHolder holder) { // Each handler gets a fresh copy - MessageImpl copied = msg.copyBeforeReceive(holder.getHandler().src); + MessageImpl copied = msg.copyBeforeReceive(); holder.getContext().runOnContext((v) -> { // Need to check handler is still there - the handler might have been removed after the message were sent but diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java index cc9d8c886..df3bc98e8 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java @@ -30,7 +30,6 @@ public class MessageImpl implements Message { protected MessageCodec messageCodec; protected final EventBusImpl bus; - public final boolean src; protected String address; protected String replyAddress; protected MultiMap headers; @@ -39,29 +38,26 @@ public class MessageImpl implements Message { protected boolean send; protected Object trace; - public MessageImpl(boolean src, EventBusImpl bus) { + public MessageImpl(EventBusImpl bus) { this.bus = bus; - this.src = src; } public MessageImpl(String address, MultiMap headers, U sentBody, MessageCodec messageCodec, - boolean send, boolean src, EventBusImpl bus) { + boolean send, EventBusImpl bus) { this.messageCodec = messageCodec; this.address = address; this.headers = headers; this.sentBody = sentBody; this.send = send; this.bus = bus; - this.src = src; } - protected MessageImpl(MessageImpl other, boolean src) { + protected MessageImpl(MessageImpl other) { this.bus = other.bus; this.address = other.address; this.replyAddress = other.replyAddress; this.messageCodec = other.messageCodec; - this.src = src; if (other.headers != null) { List> entries = other.headers.entries(); this.headers = new CaseInsensitiveHeaders(); @@ -76,8 +72,8 @@ public class MessageImpl implements Message { this.send = other.send; } - public MessageImpl copyBeforeReceive(boolean src) { - return new MessageImpl<>(this, src); + public MessageImpl copyBeforeReceive() { + return new MessageImpl<>(this); } @Override @@ -119,7 +115,7 @@ public class MessageImpl implements Message { public Future> replyAndRequest(Object message, DeliveryOptions options) { if (replyAddress != null) { MessageImpl reply = createReply(message, options); - EventBusImpl.ReplyHandler handler = bus.createReplyHandler(reply, reply.src, options); + EventBusImpl.ReplyHandler handler = bus.createReplyHandler(reply, false, options); bus.sendReply(reply, options, handler); return handler.result.future(); } else { @@ -128,7 +124,7 @@ public class MessageImpl implements Message { } protected MessageImpl createReply(Object message, DeliveryOptions options) { - MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName()); + MessageImpl reply = bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName()); reply.trace = trace; return reply; } diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java index 3951ba4b3..44ca2ff74 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java @@ -98,7 +98,7 @@ public class MessageProducerImpl implements MessageProducer { } private void write(T data, Promise handler) { - MessageImpl msg = bus.createMessage(send, true, address, options.getHeaders(), data, options.getCodecName()); + MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), data, options.getCodecName()); OutboundDeliveryContext sendCtx = bus.newSendContext(msg, options, null, handler); if (send) { synchronized (this) { diff --git a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java index 4bf6b820c..bcb49e22c 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java +++ b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java @@ -33,6 +33,7 @@ public class OutboundDeliveryContext implements DeliveryContext, Handler replyHandler; private final Promise writePromise; + private boolean src; Iterator> iter; EventBusImpl bus; @@ -62,7 +63,7 @@ public class OutboundDeliveryContext implements DeliveryContext, Handler implements DeliveryContext, Handler biConsumer = (String key, String val) -> message.headers().set(key, val); message.trace = tracer.sendRequest(ctx, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE); - } else if (message.trace != null) { + } else { // Handle failure here tracer.sendResponse(ctx, null, message.trace, null, TagExtractor.empty()); } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index 3ab3f17be..8453448a2 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -174,11 +174,11 @@ public class ClusteredEventBus extends EventBusImpl { } @Override - public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName) { + public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) { Objects.requireNonNull(address, "no null address accepted"); MessageCodec codec = codecManager.lookupCodec(body, codecName); @SuppressWarnings("unchecked") - ClusteredMessage msg = new ClusteredMessage(serverID, address, headers, body, codec, send, src, this); + ClusteredMessage msg = new ClusteredMessage(serverID, address, headers, body, codec, send, this); return msg; } @@ -298,7 +298,7 @@ public class ClusteredEventBus extends EventBusImpl { size = buff.getInt(0); parser.fixedSizeMode(size); } else { - ClusteredMessage received = new ClusteredMessage(false, ClusteredEventBus.this); + ClusteredMessage received = new ClusteredMessage(ClusteredEventBus.this); received.readFromWire(buff, codecManager); if (metrics != null) { metrics.messageRead(received.address(), buff.length()); diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java index 5d4d07541..880f465f3 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java @@ -44,18 +44,18 @@ public class ClusteredMessage extends MessageImpl { private boolean fromWire; private boolean toWire; - public ClusteredMessage(boolean src, EventBusImpl bus) { - super(src, bus); + public ClusteredMessage(EventBusImpl bus) { + super(bus); } public ClusteredMessage(ServerID sender, String address, MultiMap headers, U sentBody, - MessageCodec messageCodec, boolean send, boolean src, EventBusImpl bus) { - super(address, headers, sentBody, messageCodec, send, src, bus); + MessageCodec messageCodec, boolean send, EventBusImpl bus) { + super(address, headers, sentBody, messageCodec, send, bus); this.sender = sender; } - protected ClusteredMessage(ClusteredMessage other, boolean src) { - super(other, src); + protected ClusteredMessage(ClusteredMessage other) { + super(other); this.sender = other.sender; if (other.sentBody == null) { this.wireBuffer = other.wireBuffer; @@ -72,8 +72,8 @@ public class ClusteredMessage extends MessageImpl { return reply; } - public ClusteredMessage copyBeforeReceive(boolean src) { - return new ClusteredMessage<>(this, src); + public ClusteredMessage copyBeforeReceive() { + return new ClusteredMessage<>(this); } @Override diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java index 5584cf914..c3fff7dc7 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java @@ -136,7 +136,7 @@ class ConnectionHolder { close(); }); ClusteredMessage pingMessage = - new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, new PingMessageCodec(), true, true, eventBus); + new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, new PingMessageCodec(), true, eventBus); Buffer data = pingMessage.encodeToWire(); socket.write(data); });