Move the replier message info on the replied message itself

This commit is contained in:
Julien Viet
2019-10-18 13:37:36 +02:00
parent a06aa2a296
commit c6dd46d10c
5 changed files with 33 additions and 31 deletions

View File

@@ -320,14 +320,10 @@ public class EventBusImpl implements EventBus, MetricsProvider {
// Guarantees the order when there is no current context in clustered mode
ctx = sendNoContext;
}
sendOrPubInternal(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage, null));
sendOrPubInternal(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, null));
}
}
protected <T> void sendReply(OutboundDeliveryContext<T> sendContext, MessageImpl replierMessage) {
sendOrPub(sendContext);
}
protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
sendLocally(sendContext);
}

View File

@@ -109,7 +109,7 @@ public class MessageImpl<U, V> implements Message<V> {
@Override
public void reply(Object message, DeliveryOptions options) {
if (replyAddress != null) {
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName());
MessageImpl reply = createReply(message, options);
bus.sendReply(reply, this, options, null);
}
}
@@ -117,7 +117,7 @@ public class MessageImpl<U, V> implements Message<V> {
@Override
public <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options) {
if (replyAddress != null) {
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName());
MessageImpl reply = createReply(message, options);
EventBusImpl.ReplyHandler<R> handler = bus.createReplyHandler(reply, reply.src, options);
bus.sendReply(reply, this, options, handler);
return handler.result.future();
@@ -126,6 +126,10 @@ public class MessageImpl<U, V> implements Message<V> {
}
}
protected MessageImpl createReply(Object message, DeliveryOptions options) {
return bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName());
}
@Override
public boolean isSend() {
return send;

View File

@@ -32,7 +32,6 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
public final MessageImpl message;
public final DeliveryOptions options;
public final EventBusImpl.ReplyHandler<T> replyHandler;
private final MessageImpl replierMessage;
private final Promise<Void> writePromise;
private Object trace;
@@ -42,14 +41,9 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
EventBusMetrics metrics;
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler<T> replyHandler, Promise<Void> writePromise) {
this(ctx, message, options, replyHandler, null, writePromise);
}
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler<T> replyHandler, MessageImpl replierMessage, Promise<Void> writePromise) {
this.ctx = ctx;
this.message = message;
this.options = options;
this.replierMessage = replierMessage;
this.replyHandler = replyHandler;
this.writePromise = writePromise;
}
@@ -120,11 +114,7 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
BiConsumer<String, String> biConsumer = (String key, String val) -> message.headers().set(key, val);
trace = tracer.sendRequest(ctx, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
}
if (replierMessage == null) {
bus.sendOrPub(this);
} else {
bus.sendReply(this, replierMessage);
}
bus.sendOrPub(this);
}
}

View File

@@ -206,22 +206,21 @@ public class ClusteredEventBus extends EventBusImpl {
}
}
@Override
protected <T> void sendReply(OutboundDeliveryContext<T> sendContext, MessageImpl replierMessage) {
clusteredSendReply(((ClusteredMessage) replierMessage).getSender(), sendContext);
}
@Override
protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
if (sendContext.options.isLocalOnly()) {
super.sendOrPub(sendContext);
} else if (Vertx.currentContext() != sendContext.ctx) {
// Current event-loop might be null when sending from non vertx thread
sendContext.ctx.runOnContext(v -> {
subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
});
if (((ClusteredMessage) sendContext.message).getRepliedTo() != null) {
clusteredSendReply(((ClusteredMessage) sendContext.message).getRepliedTo(), sendContext);
} else {
subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
if (sendContext.options.isLocalOnly()) {
super.sendOrPub(sendContext);
} else if (Vertx.currentContext() != sendContext.ctx) {
// Current event-loop might be null when sending from non vertx thread
sendContext.ctx.runOnContext(v -> {
subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
});
} else {
subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
}
}
}

View File

@@ -14,6 +14,7 @@ package io.vertx.core.eventbus.impl.clustered;
import io.netty.util.CharsetUtil;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.EventBusImpl;
@@ -36,6 +37,7 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
private static final byte WIRE_PROTOCOL_VERSION = 1;
private ServerID sender;
private ServerID repliedTo;
private Buffer wireBuffer;
private int bodyPos;
private int headersPos;
@@ -63,6 +65,13 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
this.fromWire = other.fromWire;
}
@Override
protected MessageImpl createReply(Object message, DeliveryOptions options) {
ClusteredMessage reply = (ClusteredMessage) super.createReply(message, options);
reply.repliedTo = sender;
return reply;
}
public ClusteredMessage<U, V> copyBeforeReceive(boolean src) {
return new ClusteredMessage<>(this, src);
}
@@ -240,6 +249,10 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
return sender;
}
ServerID getRepliedTo() {
return repliedTo;
}
public boolean isFromWire() {
return fromWire;
}