Remove src field from MessageImpl as it can be avoided

This commit is contained in:
Julien Viet
2019-10-19 13:38:10 +02:00
parent 64ee11b0a1
commit 157f59dcf4
7 changed files with 31 additions and 33 deletions

View File

@@ -105,14 +105,14 @@ public class EventBusImpl implements EventBus, MetricsProvider {
@Override
public EventBus send(String address, Object message, DeliveryOptions options) {
MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName());
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
sendOrPubInternal(msg, options, null, null);
return this;
}
@Override
public <T> Future<Message<T>> request(String address, Object message, DeliveryOptions options) {
MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName());
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
ReplyHandler<T> handler = createReplyHandler(msg, true, options);
sendOrPubInternal(msg, options, handler, null);
return handler.result.future();
@@ -151,7 +151,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
@Override
public EventBus publish(String address, Object message, DeliveryOptions options) {
sendOrPubInternal(createMessage(false, true, address, options.getHeaders(), message, options.getCodecName()), options, null, null);
sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null, null);
return this;
}
@@ -231,11 +231,11 @@ public class EventBusImpl implements EventBus, MetricsProvider {
return metrics;
}
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName) {
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
MessageImpl msg = new MessageImpl(address, headers, body, codec, send, src, this);
MessageImpl msg = new MessageImpl(address, headers, body, codec, send, this);
return msg;
}
@@ -483,7 +483,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
private <T> void deliverToHandler(MessageImpl msg, HandlerHolder<T> holder) {
// Each handler gets a fresh copy
MessageImpl copied = msg.copyBeforeReceive(holder.getHandler().src);
MessageImpl copied = msg.copyBeforeReceive();
holder.getContext().runOnContext((v) -> {
// Need to check handler is still there - the handler might have been removed after the message were sent but

View File

@@ -30,7 +30,6 @@ public class MessageImpl<U, V> implements Message<V> {
protected MessageCodec<U, V> messageCodec;
protected final EventBusImpl bus;
public final boolean src;
protected String address;
protected String replyAddress;
protected MultiMap headers;
@@ -39,29 +38,26 @@ public class MessageImpl<U, V> implements Message<V> {
protected boolean send;
protected Object trace;
public MessageImpl(boolean src, EventBusImpl bus) {
public MessageImpl(EventBusImpl bus) {
this.bus = bus;
this.src = src;
}
public MessageImpl(String address, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec,
boolean send, boolean src, EventBusImpl bus) {
boolean send, EventBusImpl bus) {
this.messageCodec = messageCodec;
this.address = address;
this.headers = headers;
this.sentBody = sentBody;
this.send = send;
this.bus = bus;
this.src = src;
}
protected MessageImpl(MessageImpl<U, V> other, boolean src) {
protected MessageImpl(MessageImpl<U, V> other) {
this.bus = other.bus;
this.address = other.address;
this.replyAddress = other.replyAddress;
this.messageCodec = other.messageCodec;
this.src = src;
if (other.headers != null) {
List<Map.Entry<String, String>> entries = other.headers.entries();
this.headers = new CaseInsensitiveHeaders();
@@ -76,8 +72,8 @@ public class MessageImpl<U, V> implements Message<V> {
this.send = other.send;
}
public MessageImpl<U, V> copyBeforeReceive(boolean src) {
return new MessageImpl<>(this, src);
public MessageImpl<U, V> copyBeforeReceive() {
return new MessageImpl<>(this);
}
@Override
@@ -119,7 +115,7 @@ public class MessageImpl<U, V> implements Message<V> {
public <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options) {
if (replyAddress != null) {
MessageImpl reply = createReply(message, options);
EventBusImpl.ReplyHandler<R> handler = bus.createReplyHandler(reply, reply.src, options);
EventBusImpl.ReplyHandler<R> handler = bus.createReplyHandler(reply, false, options);
bus.sendReply(reply, options, handler);
return handler.result.future();
} else {
@@ -128,7 +124,7 @@ public class MessageImpl<U, V> implements Message<V> {
}
protected MessageImpl createReply(Object message, DeliveryOptions options) {
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName());
MessageImpl reply = bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName());
reply.trace = trace;
return reply;
}

View File

@@ -98,7 +98,7 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
}
private void write(T data, Promise<Void> handler) {
MessageImpl msg = bus.createMessage(send, true, address, options.getHeaders(), data, options.getCodecName());
MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), data, options.getCodecName());
OutboundDeliveryContext<T> sendCtx = bus.newSendContext(msg, options, null, handler);
if (send) {
synchronized (this) {

View File

@@ -33,6 +33,7 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
public final DeliveryOptions options;
public final EventBusImpl.ReplyHandler<T> replyHandler;
private final Promise<Void> writePromise;
private boolean src;
Iterator<Handler<DeliveryContext>> iter;
EventBusImpl bus;
@@ -62,7 +63,7 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
// Tracing
VertxTracer tracer = ctx.tracer();
if (tracer != null) {
if (message.src) {
if (src) {
if (replyHandler != null) {
replyHandler.trace = message.trace;
} else {
@@ -109,10 +110,11 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
} else {
VertxTracer tracer = ctx.tracer();
if (tracer != null) {
if (message.src) {
if (message.trace == null) {
src = true;
BiConsumer<String, String> biConsumer = (String key, String val) -> message.headers().set(key, val);
message.trace = tracer.sendRequest(ctx, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
} else if (message.trace != null) {
} else {
// Handle failure here
tracer.sendResponse(ctx, null, message.trace, null, TagExtractor.empty());
}

View File

@@ -174,11 +174,11 @@ public class ClusteredEventBus extends EventBusImpl {
}
@Override
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName) {
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
ClusteredMessage msg = new ClusteredMessage(serverID, address, headers, body, codec, send, src, this);
ClusteredMessage msg = new ClusteredMessage(serverID, address, headers, body, codec, send, this);
return msg;
}
@@ -298,7 +298,7 @@ public class ClusteredEventBus extends EventBusImpl {
size = buff.getInt(0);
parser.fixedSizeMode(size);
} else {
ClusteredMessage received = new ClusteredMessage(false, ClusteredEventBus.this);
ClusteredMessage received = new ClusteredMessage(ClusteredEventBus.this);
received.readFromWire(buff, codecManager);
if (metrics != null) {
metrics.messageRead(received.address(), buff.length());

View File

@@ -44,18 +44,18 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
private boolean fromWire;
private boolean toWire;
public ClusteredMessage(boolean src, EventBusImpl bus) {
super(src, bus);
public ClusteredMessage(EventBusImpl bus) {
super(bus);
}
public ClusteredMessage(ServerID sender, String address, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec, boolean send, boolean src, EventBusImpl bus) {
super(address, headers, sentBody, messageCodec, send, src, bus);
MessageCodec<U, V> messageCodec, boolean send, EventBusImpl bus) {
super(address, headers, sentBody, messageCodec, send, bus);
this.sender = sender;
}
protected ClusteredMessage(ClusteredMessage<U, V> other, boolean src) {
super(other, src);
protected ClusteredMessage(ClusteredMessage<U, V> other) {
super(other);
this.sender = other.sender;
if (other.sentBody == null) {
this.wireBuffer = other.wireBuffer;
@@ -72,8 +72,8 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
return reply;
}
public ClusteredMessage<U, V> copyBeforeReceive(boolean src) {
return new ClusteredMessage<>(this, src);
public ClusteredMessage<U, V> copyBeforeReceive() {
return new ClusteredMessage<>(this);
}
@Override

View File

@@ -136,7 +136,7 @@ class ConnectionHolder {
close();
});
ClusteredMessage pingMessage =
new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, new PingMessageCodec(), true, true, eventBus);
new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, new PingMessageCodec(), true, eventBus);
Buffer data = pingMessage.encodeToWire();
socket.write(data);
});