Move sender metrics/tracing concerns to OutboundDeliveryContext

This commit is contained in:
Julien Viet
2019-10-18 10:18:06 +02:00
parent 79c13877b9
commit 05067ca881
7 changed files with 128 additions and 129 deletions

View File

@@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
/**
* A local event bus implementation
@@ -106,16 +105,16 @@ public class EventBusImpl implements EventBus, MetricsProvider {
@Override
public EventBus send(String address, Object message, DeliveryOptions options) {
MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName(), null);
sendOrPubInternal(msg, options, null);
MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName());
sendOrPubInternal(msg, options, null, null);
return this;
}
@Override
public <T> Future<Message<T>> request(String address, Object message, DeliveryOptions options) {
MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName(), null);
MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName());
ReplyHandler<T> handler = createReplyHandler(msg, true, options);
sendOrPubInternal(msg, options, handler);
sendOrPubInternal(msg, options, handler, null);
return handler.result.future();
}
@@ -152,7 +151,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
@Override
public EventBus publish(String address, Object message, DeliveryOptions options) {
sendOrPubInternal(createMessage(false, true, address, options.getHeaders(), message, options.getCodecName(), null), options, null);
sendOrPubInternal(createMessage(false, true, address, options.getHeaders(), message, options.getCodecName()), options, null, null);
return this;
}
@@ -232,11 +231,11 @@ public class EventBusImpl implements EventBus, MetricsProvider {
return metrics;
}
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Promise<Void> writeHandler) {
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, src, this, writeHandler);
MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, src, this);
return msg;
}
@@ -321,16 +320,10 @@ public class EventBusImpl implements EventBus, MetricsProvider {
// Guarantees the order when there is no current context in clustered mode
ctx = sendNoContext;
}
send(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage));
sendOrPubInternal(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage, null));
}
}
private void send(OutboundDeliveryContext ctx) {
ctx.iter = sendInterceptors.iterator();
ctx.bus = this;
ctx.next();
}
protected <T> void sendReply(OutboundDeliveryContext<T> sendContext, MessageImpl replierMessage) {
sendOrPub(sendContext);
}
@@ -339,21 +332,6 @@ public class EventBusImpl implements EventBus, MetricsProvider {
sendLocally(sendContext);
}
protected final Object messageSent(OutboundDeliveryContext<?> sendContext, boolean local, boolean remote) {
MessageImpl msg = sendContext.message;
if (metrics != null) {
MessageImpl message = msg;
metrics.messageSent(message.address(), !message.send, local, remote);
}
VertxTracer tracer = sendContext.ctx.tracer();
if (tracer != null && msg.src) {
BiConsumer<String, String> biConsumer = (String key, String val) -> msg.headers().set(key, val);
return tracer.sendRequest(sendContext.ctx, msg, msg.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
} else {
return null;
}
}
protected void callCompletionHandlerAsync(Handler<AsyncResult<Void>> completionHandler) {
if (completionHandler != null) {
vertx.runOnContext(v -> {
@@ -363,29 +341,11 @@ public class EventBusImpl implements EventBus, MetricsProvider {
}
private <T> void sendLocally(OutboundDeliveryContext<T> sendContext) {
Object trace = messageSent(sendContext, true, false);
ReplyException failure = deliverMessageLocally(sendContext.message);
if (failure != null) {
// no handlers
VertxTracer tracer = sendContext.ctx.tracer();
if (sendContext.replyHandler != null) {
sendContext.replyHandler.trace = trace;
sendContext.replyHandler.fail(failure);
} else {
if (tracer != null && sendContext.message.src) {
tracer.receiveResponse(sendContext.ctx, null, trace, failure, TagExtractor.empty());
}
}
sendContext.written(failure);
} else {
failure = null;
VertxTracer tracer = sendContext.ctx.tracer();
if (tracer != null && sendContext.message.src) {
if (sendContext.replyHandler == null) {
tracer.receiveResponse(sendContext.ctx, null, trace, null, TagExtractor.empty());
} else {
sendContext.replyHandler.trace = trace;
}
}
sendContext.written(null);
}
}
@@ -404,7 +364,6 @@ public class EventBusImpl implements EventBus, MetricsProvider {
}
if (holder != null) {
deliverToHandler(msg, holder);
msg.written(null);
} else {
// RACY issue !!!!!
}
@@ -416,16 +375,13 @@ public class EventBusImpl implements EventBus, MetricsProvider {
for (HandlerHolder holder: handlers) {
deliverToHandler(msg, holder);
}
msg.written(null);
}
return null;
} else {
if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), 0);
}
ReplyException failure = new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
msg.written(failure);
return failure;
return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
}
}
@@ -498,15 +454,28 @@ public class EventBusImpl implements EventBus, MetricsProvider {
}
}
public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler) {
checkStarted();
public <T> OutboundDeliveryContext<T> newSendContext(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler, Promise<Void> writePromise) {
ContextInternal ctx = vertx.getContext();
if (ctx == null) {
// Guarantees the order when there is no current context in clustered mode
ctx = sendNoContext;
}
send(new OutboundDeliveryContext<>(ctx, message, options, handler));
return new OutboundDeliveryContext<>(ctx, message, options, handler, writePromise);
}
public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
checkStarted();
senderCtx.iter = sendInterceptors.iterator();
senderCtx.bus = this;
senderCtx.metrics = metrics;
senderCtx.next();
}
public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler, Promise<Void> writePromise) {
checkStarted();
sendOrPubInternal(newSendContext(message, options, handler, writePromise));
}
private void unregisterAll() {

View File

@@ -11,11 +11,8 @@
package io.vertx.core.eventbus.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.*;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.impl.logging.Logger;
@@ -40,7 +37,6 @@ public class MessageImpl<U, V> implements Message<V> {
protected U sentBody;
protected V receivedBody;
protected boolean send;
protected Promise<Void> write;
public MessageImpl(boolean src, EventBusImpl bus) {
this.bus = bus;
@@ -49,8 +45,7 @@ public class MessageImpl<U, V> implements Message<V> {
public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec,
boolean send, boolean src, EventBusImpl bus,
Promise<Void> write) {
boolean send, boolean src, EventBusImpl bus) {
this.messageCodec = messageCodec;
this.address = address;
this.replyAddress = replyAddress;
@@ -59,7 +54,6 @@ public class MessageImpl<U, V> implements Message<V> {
this.send = send;
this.bus = bus;
this.src = src;
this.write = write;
}
protected MessageImpl(MessageImpl<U, V> other, boolean src) {
@@ -80,7 +74,6 @@ public class MessageImpl<U, V> implements Message<V> {
this.receivedBody = messageCodec.transform(other.sentBody);
}
this.send = other.send;
this.write = other.write;
}
public MessageImpl<U, V> copyBeforeReceive(boolean src) {
@@ -117,7 +110,7 @@ public class MessageImpl<U, V> implements Message<V> {
@Override
public void reply(Object message, DeliveryOptions options) {
if (replyAddress != null) {
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName(), null);
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName());
bus.sendReply(reply, this, options, null);
}
}
@@ -125,7 +118,7 @@ public class MessageImpl<U, V> implements Message<V> {
@Override
public <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options) {
if (replyAddress != null) {
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName(), null);
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName());
EventBusImpl.ReplyHandler<R> handler = bus.createReplyHandler(reply, reply.src, options);
bus.sendReply(reply, this, options, handler);
return handler.result.future();
@@ -143,20 +136,6 @@ public class MessageImpl<U, V> implements Message<V> {
this.replyAddress = replyAddress;
}
public void written(Throwable failure) {
if (write != null) {
if (failure == null) {
write.complete();
} else {
write.fail(failure);
}
}
}
public Promise<Void> write() {
return write;
}
public MessageCodec<U, V> codec() {
return messageCodec;
}

View File

@@ -17,7 +17,6 @@ import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.*;
import io.vertx.core.impl.VertxInternal;
import java.util.ArrayDeque;
import java.util.Queue;
@@ -34,7 +33,7 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
private final EventBusImpl bus;
private final boolean send;
private final String address;
private final Queue<MessageImpl<T, ?>> pending = new ArrayDeque<>();
private final Queue<OutboundDeliveryContext<T>> pending = new ArrayDeque<>();
private final MessageConsumer<Integer> creditConsumer;
private DeliveryOptions options;
private int maxSize = DEFAULT_WRITE_QUEUE_MAX_SIZE;
@@ -99,18 +98,19 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
}
private void write(T data, Promise<Void> handler) {
MessageImpl msg = bus.createMessage(send, true, address, options.getHeaders(), data, options.getCodecName(), handler);
MessageImpl msg = bus.createMessage(send, true, address, options.getHeaders(), data, options.getCodecName());
OutboundDeliveryContext<T> sendCtx = bus.newSendContext(msg, options, null, handler);
if (send) {
synchronized (this) {
if (credits > 0) {
credits--;
} else {
pending.add(msg);
pending.add(sendCtx);
return;
}
}
}
bus.sendOrPubInternal(msg, options, null);
bus.sendOrPubInternal(msg, options, null, handler);
}
@Override
@@ -180,12 +180,12 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
private synchronized void doReceiveCredit(int credit) {
credits += credit;
while (credits > 0) {
MessageImpl<T, ?> msg = pending.poll();
if (msg == null) {
OutboundDeliveryContext<T> sendContext = pending.poll();
if (sendContext == null) {
break;
} else {
credits--;
bus.sendOrPubInternal(msg, options, null);
bus.sendOrPubInternal(sendContext);
}
}
checkDrained();

View File

@@ -10,35 +10,90 @@
*/
package io.vertx.core.eventbus.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryContext;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.Iterator;
import java.util.function.BiConsumer;
public class OutboundDeliveryContext<T> implements DeliveryContext<T> {
public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<AsyncResult<Void>> {
public final ContextInternal ctx;
public final MessageImpl message;
public final DeliveryOptions options;
public final EventBusImpl.ReplyHandler<T> replyHandler;
private final MessageImpl replierMessage;
private final Promise<Void> writePromise;
private Object trace;
Iterator<Handler<DeliveryContext>> iter;
EventBusImpl bus;
EventBusMetrics metrics;
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler<T> replyHandler) {
this(ctx, message, options, replyHandler, null);
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler<T> replyHandler, Promise<Void> writePromise) {
this(ctx, message, options, replyHandler, null, writePromise);
}
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler<T> replyHandler, MessageImpl replierMessage) {
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler<T> replyHandler, MessageImpl replierMessage, Promise<Void> writePromise) {
this.ctx = ctx;
this.message = message;
this.options = options;
this.replierMessage = replierMessage;
this.replyHandler = replyHandler;
this.writePromise = writePromise;
}
@Override
public void handle(AsyncResult<Void> event) {
written(event.cause());
}
public void written(Throwable failure) {
// Metrics
if (metrics != null) {
boolean remote = (message instanceof ClusteredMessage) && ((ClusteredMessage<?, ?>)message).isToWire();
metrics.messageSent(message.address(), !message.send, !remote, remote);
}
// Tracing
VertxTracer tracer = ctx.tracer();
if (tracer != null) {
if (message.src) {
if (replyHandler != null) {
replyHandler.trace = trace;
} else {
tracer.receiveResponse(ctx, null, trace, failure, TagExtractor.empty());
}
}
}
// Fail fast reply handler
if (failure instanceof ReplyException) {
if (replyHandler != null) {
replyHandler.fail((ReplyException) failure);
}
}
// Notify promise finally
if (writePromise != null) {
if (failure == null) {
writePromise.complete();
} else {
writePromise.fail(failure);
}
}
}
@Override
@@ -60,6 +115,11 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T> {
EventBusImpl.log.error("Failure in interceptor", t);
}
} else {
VertxTracer tracer = ctx.tracer();
if (tracer != null && message.src) {
BiConsumer<String, String> biConsumer = (String key, String val) -> message.headers().set(key, val);
trace = tracer.sendRequest(ctx, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
}
if (replierMessage == null) {
bus.sendOrPub(this);
} else {

View File

@@ -32,8 +32,6 @@ import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import java.io.Serializable;
import java.util.Objects;
@@ -176,11 +174,11 @@ public class ClusteredEventBus extends EventBusImpl {
}
@Override
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Promise<Void> writeHandler) {
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, src, this, writeHandler);
ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, src, this);
return msg;
}
@@ -237,7 +235,7 @@ public class ClusteredEventBus extends EventBusImpl {
}
} else {
log.error("Failed to send message", asyncResult.cause());
sendContext.message.written(asyncResult.cause());
sendContext.written(asyncResult.cause());
}
}
@@ -354,17 +352,6 @@ public class ClusteredEventBus extends EventBusImpl {
}
private void sendRemote(OutboundDeliveryContext<?> sendContext, ServerID theServerID, MessageImpl message) {
Object trace = messageSent(sendContext, false, true);
// SAME CODE THAN IN PARENT!!!!
VertxTracer tracer = sendContext.ctx.tracer();
if (tracer != null && sendContext.message.src) {
if (sendContext.replyHandler == null) {
tracer.receiveResponse(sendContext.ctx, null, trace, null, TagExtractor.empty());
} else {
sendContext.replyHandler.trace = trace;
}
}
// We need to deal with the fact that connecting can take some time and is async, and we cannot
// block to wait for it. So we add any sends to a pending list if not connected yet.
// Once we connect we send them.
@@ -383,7 +370,7 @@ public class ClusteredEventBus extends EventBusImpl {
holder.connect();
}
}
holder.writeMessage((ClusteredMessage) message);
holder.writeMessage(sendContext);
}
private void removeSub(String subName, ClusterNodeInfo node, Handler<AsyncResult<Void>> completionHandler) {

View File

@@ -12,10 +12,7 @@
package io.vertx.core.eventbus.impl.clustered;
import io.netty.util.CharsetUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.impl.CodecManager;
@@ -43,14 +40,15 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
private int bodyPos;
private int headersPos;
private boolean fromWire;
private boolean toWire;
public ClusteredMessage(boolean src, EventBusImpl bus) {
super(src, bus);
}
public ClusteredMessage(ServerID sender, String address, String replyAddress, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec, boolean send, boolean src, EventBusImpl bus, Promise<Void> writeHandler) {
super(address, replyAddress, headers, sentBody, messageCodec, send, src, bus, writeHandler);
MessageCodec<U, V> messageCodec, boolean send, boolean src, EventBusImpl bus) {
super(address, replyAddress, headers, sentBody, messageCodec, send, src, bus);
this.sender = sender;
}
@@ -100,6 +98,7 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
}
public Buffer encodeToWire() {
toWire = true;
int length = 1024; // TODO make this configurable
Buffer buffer = Buffer.buffer(length);
buffer.appendInt(0);
@@ -245,6 +244,10 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
return fromWire;
}
public boolean isToWire() {
return toWire;
}
protected boolean isLocal() {
return !isFromWire();
}

View File

@@ -17,6 +17,7 @@ import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import io.vertx.core.eventbus.impl.codecs.PingMessageCodec;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
@@ -46,7 +47,7 @@ class ConnectionHolder {
private final Vertx vertx;
private final EventBusMetrics metrics;
private Queue<ClusteredMessage> pending;
private Queue<OutboundDeliveryContext<?>> pending;
private NetSocket socket;
private boolean connected;
private long timeoutID = -1;
@@ -78,13 +79,13 @@ class ConnectionHolder {
}
// TODO optimise this (contention on monitor)
synchronized void writeMessage(ClusteredMessage message) {
synchronized void writeMessage(OutboundDeliveryContext<?> ctx) {
if (connected) {
Buffer data = message.encodeToWire();
Buffer data = ((ClusteredMessage)ctx.message).encodeToWire();
if (metrics != null) {
metrics.messageWritten(message.address(), data.length());
metrics.messageWritten(ctx.message.address(), data.length());
}
socket.write(data, message.write());
socket.write(data, ctx);
} else {
if (pending == null) {
if (log.isDebugEnabled()) {
@@ -92,7 +93,7 @@ class ConnectionHolder {
}
pending = new ArrayDeque<>();
}
pending.add(message);
pending.add(ctx);
}
}
@@ -108,7 +109,7 @@ class ConnectionHolder {
vertx.cancelTimer(pingTimeoutID);
}
synchronized (this) {
ClusteredMessage<?, ?> msg;
OutboundDeliveryContext<?> msg;
if (pending != null) {
while ((msg = pending.poll()) != null) {
msg.written(cause);
@@ -138,7 +139,7 @@ class ConnectionHolder {
close();
});
ClusteredMessage pingMessage =
new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, null, new PingMessageCodec(), true, true, eventBus, null);
new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, null, new PingMessageCodec(), true, true, eventBus);
Buffer data = pingMessage.encodeToWire();
socket.write(data);
});
@@ -162,12 +163,12 @@ class ConnectionHolder {
if (log.isDebugEnabled()) {
log.debug("Draining the queue for server " + serverID);
}
for (ClusteredMessage message : pending) {
Buffer data = message.encodeToWire();
for (OutboundDeliveryContext<?> ctx : pending) {
Buffer data = ((ClusteredMessage<?, ?>)ctx.message).encodeToWire();
if (metrics != null) {
metrics.messageWritten(message.address(), data.length());
metrics.messageWritten(ctx.message.address(), data.length());
}
socket.write(data, message.write());
socket.write(data, ctx);
}
}
pending = null;