mirror of
https://github.com/jlengrand/vert.x.git
synced 2026-03-10 08:51:19 +00:00
Improve event bus tracing so that the response is traced when it is sent
This commit is contained in:
@@ -398,9 +398,11 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
|
||||
}
|
||||
VertxTracer tracer = HandlerRegistration.this.context.tracer();
|
||||
if (tracer != null && !src) {
|
||||
Object trace = tracer.receiveRequest(context, message, message.isSend() ? "send" : "publish", message.headers, MessageTagExtractor.INSTANCE);
|
||||
message.trace = tracer.receiveRequest(context, message, message.isSend() ? "send" : "publish", message.headers, MessageTagExtractor.INSTANCE);
|
||||
handler.handle(message);
|
||||
tracer.sendResponse(context, null, trace, null, TagExtractor.empty());
|
||||
if (message.replyAddress == null) {
|
||||
tracer.sendResponse(context, null, message.trace, null, TagExtractor.empty());
|
||||
}
|
||||
} else {
|
||||
handler.handle(message);
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ public class MessageImpl<U, V> implements Message<V> {
|
||||
protected U sentBody;
|
||||
protected V receivedBody;
|
||||
protected boolean send;
|
||||
protected Object trace;
|
||||
|
||||
public MessageImpl(boolean src, EventBusImpl bus) {
|
||||
this.bus = bus;
|
||||
@@ -127,7 +128,9 @@ public class MessageImpl<U, V> implements Message<V> {
|
||||
}
|
||||
|
||||
protected MessageImpl createReply(Object message, DeliveryOptions options) {
|
||||
return bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName());
|
||||
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName());
|
||||
reply.trace = trace;
|
||||
return reply;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -34,8 +34,6 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
|
||||
public final EventBusImpl.ReplyHandler<T> replyHandler;
|
||||
private final Promise<Void> writePromise;
|
||||
|
||||
private Object trace;
|
||||
|
||||
Iterator<Handler<DeliveryContext>> iter;
|
||||
EventBusImpl bus;
|
||||
EventBusMetrics metrics;
|
||||
@@ -66,9 +64,9 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
|
||||
if (tracer != null) {
|
||||
if (message.src) {
|
||||
if (replyHandler != null) {
|
||||
replyHandler.trace = trace;
|
||||
replyHandler.trace = message.trace;
|
||||
} else {
|
||||
tracer.receiveResponse(ctx, null, trace, failure, TagExtractor.empty());
|
||||
tracer.receiveResponse(ctx, null, message.trace, failure, TagExtractor.empty());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -110,9 +108,14 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
|
||||
}
|
||||
} else {
|
||||
VertxTracer tracer = ctx.tracer();
|
||||
if (tracer != null && message.src) {
|
||||
BiConsumer<String, String> biConsumer = (String key, String val) -> message.headers().set(key, val);
|
||||
trace = tracer.sendRequest(ctx, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
|
||||
if (tracer != null) {
|
||||
if (message.src) {
|
||||
BiConsumer<String, String> biConsumer = (String key, String val) -> message.headers().set(key, val);
|
||||
message.trace = tracer.sendRequest(ctx, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
|
||||
} else if (message.trace != null) {
|
||||
// Handle failure here
|
||||
tracer.sendResponse(ctx, null, message.trace, null, TagExtractor.empty());
|
||||
}
|
||||
}
|
||||
bus.sendOrPub(this);
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import io.vertx.core.impl.ContextInternal;
|
||||
import io.vertx.test.core.Repeat;
|
||||
import io.vertx.test.core.TestUtils;
|
||||
import io.vertx.test.core.VertxTestBase;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.*;
|
||||
@@ -244,9 +245,9 @@ public abstract class EventBusTracerTestBase extends VertxTestBase {
|
||||
vertx2.eventBus().request("the_address", "msg", new DeliveryOptions().setSendTimeout(100), onFailure(failure -> {
|
||||
}));
|
||||
});
|
||||
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 4);
|
||||
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 3);
|
||||
assertEquals(Arrays.asList("sendRequest[the_address]", "receiveResponse[TIMEOUT]"), ebTracer.sendEvents);
|
||||
assertEquals(Arrays.asList("receiveRequest[the_address]", "sendResponse[]"), ebTracer.receiveEvents);
|
||||
assertEquals(Arrays.asList("receiveRequest[the_address]"), ebTracer.receiveEvents);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user