EventBus field can be set at message creation time

This commit is contained in:
Julien Viet
2019-01-22 14:01:58 +01:00
parent 7ec6c36c08
commit 579bf006ce
4 changed files with 7 additions and 12 deletions

View File

@@ -384,7 +384,6 @@ public class EventBusImpl implements EventBus, MetricsProvider {
}
protected <T> boolean deliverMessageLocally(MessageImpl msg) {
msg.setBus(this);
ConcurrentCyclicSequence<HandlerHolder> handlers = handlerMap.get(msg.address());
if (handlers != null) {
if (msg.isSend()) {

View File

@@ -30,7 +30,7 @@ public class MessageImpl<U, V> implements Message<V> {
private static final Logger log = LoggerFactory.getLogger(MessageImpl.class);
protected MessageCodec<U, V> messageCodec;
protected EventBusImpl bus;
protected final EventBusImpl bus;
protected String address;
protected String replyAddress;
protected MultiMap headers;
@@ -38,7 +38,8 @@ public class MessageImpl<U, V> implements Message<V> {
protected V receivedBody;
protected boolean send;
public MessageImpl() {
public MessageImpl(EventBusImpl bus) {
this.bus = bus;
}
public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody,
@@ -146,14 +147,8 @@ public class MessageImpl<U, V> implements Message<V> {
return messageCodec;
}
public void setBus(EventBusImpl bus) {
this.bus = bus;
}
protected <R> void sendReply(MessageImpl msg, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
if (bus != null) {
bus.sendReply(msg, this, options, replyHandler);
}
bus.sendReply(msg, this, options, replyHandler);
}
protected boolean isLocal() {

View File

@@ -297,7 +297,7 @@ public class ClusteredEventBus extends EventBusImpl {
size = buff.getInt(0);
parser.fixedSizeMode(size);
} else {
ClusteredMessage received = new ClusteredMessage();
ClusteredMessage received = new ClusteredMessage(ClusteredEventBus.this);
received.readFromWire(buff, codecManager);
if (metrics != null) {
metrics.messageRead(received.address(), buff.length());

View File

@@ -41,7 +41,8 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
private int headersPos;
private boolean fromWire;
public ClusteredMessage() {
public ClusteredMessage(EventBusImpl bus) {
super(bus);
}
public ClusteredMessage(ServerID sender, String address, String replyAddress, MultiMap headers, U sentBody,