Merge pull request #3158 from vietj/event-bus-iso

Event bus improvements
This commit is contained in:
Julien Viet
2019-10-23 20:05:01 +02:00
committed by GitHub
16 changed files with 833 additions and 751 deletions

View File

@@ -17,7 +17,6 @@ import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.metrics.Measured;
/**
@@ -70,15 +69,15 @@ public interface EventBus extends Measured {
* @return a reference to this, so the API can be used fluently
*/
@Fluent
<T> EventBus request(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);
default <T> EventBus request(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler) {
return request(address, message, new DeliveryOptions(), replyHandler);
}
/**
* Like {@link #request(String, Object, Handler)} but returns a {@code Future} of the asynchronous result
*/
default <T> Future<Message<T>> request(String address, Object message) {
Promise<Message<T>> promise = Promise.promise();
request(address, message, promise);
return promise.future();
return request(address, message, new DeliveryOptions());
}
/**
@@ -91,16 +90,16 @@ public interface EventBus extends Measured {
* @return a reference to this, so the API can be used fluently
*/
@Fluent
<T> EventBus request(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler);
default <T> EventBus request(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
Future<Message<T>> reply = request(address, message, options);
reply.setHandler(replyHandler);
return this;
}
/**
* Like {@link #request(String, Object, DeliveryOptions, Handler)} but returns a {@code Future} of the asynchronous result
*/
default <T> Future<Message<T>> request(String address, Object message, DeliveryOptions options) {
Promise<Message<T>> promise = Promise.promise();
request(address, message, options, promise);
return promise.future();
}
<T> Future<Message<T>> request(String address, Object message, DeliveryOptions options);
/**
* Publish a message.<p>

View File

@@ -18,7 +18,6 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
/**
* Represents a message that is received from the event bus in a handler.
@@ -78,7 +77,9 @@ public interface Message<T> {
*
* @param message the message to reply with.
*/
void reply(Object message);
default void reply(Object message) {
reply(message, new DeliveryOptions());
}
/**
* Link {@link #reply(Object)} but allows you to specify delivery options for the reply.
@@ -99,15 +100,15 @@ public interface Message<T> {
* @param message the message to reply with.
* @param replyHandler the reply handler for the reply.
*/
<R> void replyAndRequest(Object message, Handler<AsyncResult<Message<R>>> replyHandler);
default <R> void replyAndRequest(Object message, Handler<AsyncResult<Message<R>>> replyHandler) {
replyAndRequest(message, new DeliveryOptions(), replyHandler);
}
/**
* Like {@link #replyAndRequest(Object, Handler)} but returns a {@code Future} of the asynchronous result
*/
default <R> Future<Message<R>> replyAndRequest(Object message) {
Promise<Message<R>> promise = Promise.promise();
replyAndRequest(message, promise);
return promise.future();
return replyAndRequest(message, new DeliveryOptions());
}
/**
@@ -118,16 +119,14 @@ public interface Message<T> {
* @param options delivery options
* @param replyHandler reply handler will be called when any reply from the recipient is received
*/
<R> void replyAndRequest(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler);
default <R> void replyAndRequest(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
this.<R>replyAndRequest(message, options).setHandler(replyHandler);
}
/**
* Like {@link #replyAndRequest(Object, DeliveryOptions, Handler)} but returns a {@code Future} of the asynchronous result
*/
default <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options) {
Promise<Message<R>> promise = Promise.promise();
replyAndRequest(message, options, promise);
return promise.future();
}
<R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options);
/**
* Signal to the sender that processing of this message failed.
@@ -138,6 +137,8 @@ public interface Message<T> {
* @param failureCode A failure code to pass back to the sender
* @param message A message to pass back to the sender
*/
void fail(int failureCode, String message);
default void fail(int failureCode, String message) {
reply(new ReplyException(ReplyFailure.RECIPIENT_FAILURE, failureCode, message));
}
}

View File

@@ -18,7 +18,6 @@ import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
@@ -28,15 +27,12 @@ import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
/**
* A local event bus implementation
@@ -45,7 +41,7 @@ import java.util.function.BiConsumer;
*/
public class EventBusImpl implements EventBus, MetricsProvider {
private static final Logger log = LoggerFactory.getLogger(EventBusImpl.class);
static final Logger log = LoggerFactory.getLogger(EventBusImpl.class);
private final List<Handler<DeliveryContext>> sendInterceptors = new CopyOnWriteArrayList<>();
private final List<Handler<DeliveryContext>> receiveInterceptors = new CopyOnWriteArrayList<>();
@@ -102,23 +98,22 @@ public class EventBusImpl implements EventBus, MetricsProvider {
@Override
public EventBus send(String address, Object message) {
return request(address, message, new DeliveryOptions(), null);
}
@Override
public <T> EventBus request(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler) {
return request(address, message, new DeliveryOptions(), replyHandler);
return send(address, message, new DeliveryOptions());
}
@Override
public EventBus send(String address, Object message, DeliveryOptions options) {
return request(address, message, options, null);
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
sendOrPubInternal(msg, options, null, null);
return this;
}
@Override
public <T> EventBus request(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
sendOrPubInternal(createMessage(true, true, address, options.getHeaders(), message, options.getCodecName(), null), options, replyHandler);
return this;
public <T> Future<Message<T>> request(String address, Object message, DeliveryOptions options) {
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
ReplyHandler<T> handler = createReplyHandler(msg, true, options);
sendOrPubInternal(msg, options, handler, null);
return handler.result();
}
@Override
@@ -154,7 +149,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
@Override
public EventBus publish(String address, Object message, DeliveryOptions options) {
sendOrPubInternal(createMessage(false, true, address, options.getHeaders(), message, options.getCodecName(), null), options, null);
sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null, null);
return this;
}
@@ -162,7 +157,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
public <T> MessageConsumer<T> consumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new HandlerRegistration<>(vertx, metrics, this, address, null, false, false);
return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, false);
}
@Override
@@ -177,7 +172,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
public <T> MessageConsumer<T> localConsumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new HandlerRegistration<>(vertx, metrics, this, address, null, true, false);
return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, true);
}
@Override
@@ -234,25 +229,26 @@ public class EventBusImpl implements EventBus, MetricsProvider {
return metrics;
}
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Handler<AsyncResult<Void>> writeHandler) {
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, src, this, writeHandler);
MessageImpl msg = new MessageImpl(address, headers, body, codec, send, this);
return msg;
}
protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(registration.getHandler(), "handler");
protected <T> HandlerHolder<T> addRegistration(String address,
HandlerRegistration<T> registration,
boolean replyHandler,
boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
// Objects.requireNonNull(registration.getHandler(), "handler");
LocalRegistrationResult<T> result = addLocalRegistration(address, registration, replyHandler, localOnly);
addRegistration(result.newAddress, address, replyHandler, localOnly, registration::setResult);
addRegistration(result.newAddress, result.holder, completionHandler);
return result.holder;
}
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
protected <T> void addRegistration(boolean newAddress, HandlerHolder<T> holder, Handler<AsyncResult<Void>> completionHandler) {
completionHandler.handle(Future.succeededFuture());
}
@@ -269,10 +265,9 @@ public class EventBusImpl implements EventBus, MetricsProvider {
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(address, "address");
Context context = vertx.getOrCreateContext();
registration.setHandlerContext(context);
Context context = registration.context;
HandlerHolder<T> holder = new HandlerHolder<>(registration, replyHandler, localOnly, context);
HandlerHolder<T> holder = new HandlerHolder<>(registration, address, replyHandler, localOnly, context);
ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.merge(
@@ -291,7 +286,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
protected <T> void removeRegistration(HandlerHolder<T> holder, Handler<AsyncResult<Void>> completionHandler) {
boolean last = removeLocalRegistration(holder);
removeRegistration(last ? holder : null, holder.getHandler().address(), completionHandler);
removeRegistration(last ? holder : null, holder.address, completionHandler);
}
protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, String address,
@@ -300,8 +295,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
}
private <T> boolean removeLocalRegistration(HandlerHolder<T> holder) {
String address = holder.getHandler().address();
boolean last = handlerMap.compute(address, (key, val) -> {
boolean last = handlerMap.compute(holder.address, (key, val) -> {
if (val == null) {
return null;
}
@@ -309,13 +303,12 @@ public class EventBusImpl implements EventBus, MetricsProvider {
return next.size() == 0 ? null : next;
}) == null;
if (holder.setRemoved() && holder.getContext().deploymentID() != null) {
holder.getContext().removeCloseHook(new HandlerEntry<>(address, holder.getHandler()));
holder.getContext().removeCloseHook(new HandlerEntry<>(holder.address, holder.getHandler()));
}
return last;
}
protected <T> void sendReply(MessageImpl replyMessage, MessageImpl replierMessage, DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
protected <T> void sendReply(MessageImpl replyMessage, DeliveryOptions options, ReplyHandler<T> replyHandler) {
if (replyMessage.address() == null) {
throw new IllegalStateException("address not specified");
} else {
@@ -324,34 +317,14 @@ public class EventBusImpl implements EventBus, MetricsProvider {
// Guarantees the order when there is no current context in clustered mode
ctx = sendNoContext;
}
ReplyHandler<T> handler = createReplyHandler(replyMessage, replierMessage.src, options, replyHandler);
new OutboundDeliveryContext<>(ctx, replyMessage, options, handler, replierMessage).next();
sendOrPubInternal(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, null));
}
}
protected <T> void sendReply(OutboundDeliveryContext<T> sendContext, MessageImpl replierMessage) {
sendOrPub(sendContext);
}
protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
sendLocally(sendContext);
}
protected final Object messageSent(OutboundDeliveryContext<?> sendContext, boolean local, boolean remote) {
MessageImpl msg = sendContext.message;
if (metrics != null) {
MessageImpl message = msg;
metrics.messageSent(message.address(), !message.send, local, remote);
}
VertxTracer tracer = sendContext.ctx.tracer();
if (tracer != null && msg.src) {
BiConsumer<String, String> biConsumer = (String key, String val) -> msg.headers().set(key, val);
return tracer.sendRequest(sendContext.ctx, msg, msg.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
} else {
return null;
}
}
protected void callCompletionHandlerAsync(Handler<AsyncResult<Void>> completionHandler) {
if (completionHandler != null) {
vertx.runOnContext(v -> {
@@ -361,29 +334,11 @@ public class EventBusImpl implements EventBus, MetricsProvider {
}
private <T> void sendLocally(OutboundDeliveryContext<T> sendContext) {
Object trace = messageSent(sendContext, true, false);
ReplyException failure = deliverMessageLocally(sendContext.message);
if (failure != null) {
// no handlers
VertxTracer tracer = sendContext.ctx.tracer();
if (sendContext.replyHandler != null) {
sendContext.replyHandler.trace = trace;
sendContext.replyHandler.fail(failure);
} else {
if (tracer != null && sendContext.message.src) {
tracer.receiveResponse(sendContext.ctx, null, trace, failure, TagExtractor.empty());
}
}
sendContext.written(failure);
} else {
failure = null;
VertxTracer tracer = sendContext.ctx.tracer();
if (tracer != null && sendContext.message.src) {
if (sendContext.replyHandler == null) {
tracer.receiveResponse(sendContext.ctx, null, trace, null, TagExtractor.empty());
} else {
sendContext.replyHandler.trace = trace;
}
}
sendContext.written(null);
}
}
@@ -402,10 +357,6 @@ public class EventBusImpl implements EventBus, MetricsProvider {
}
if (holder != null) {
deliverToHandler(msg, holder);
Handler<AsyncResult<Void>> handler = msg.writeHandler;
if (handler != null) {
handler.handle(Future.succeededFuture());
}
} else {
// RACY issue !!!!!
}
@@ -417,22 +368,13 @@ public class EventBusImpl implements EventBus, MetricsProvider {
for (HandlerHolder holder: handlers) {
deliverToHandler(msg, holder);
}
Handler<AsyncResult<Void>> handler = msg.writeHandler;
if (handler != null) {
handler.handle(Future.succeededFuture());
}
}
return null;
} else {
if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), 0);
}
ReplyException failure = new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
Handler<AsyncResult<Void>> handler = msg.writeHandler;
if (handler != null) {
handler.handle(Future.failedFuture(failure));
}
return failure;
return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
}
}
@@ -446,169 +388,64 @@ public class EventBusImpl implements EventBus, MetricsProvider {
return "__vertx.reply." + Long.toString(replySequence.incrementAndGet());
}
private <T> ReplyHandler<T> createReplyHandler(MessageImpl message,
<T> ReplyHandler<T> createReplyHandler(MessageImpl message,
boolean src,
DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
if (replyHandler != null) {
long timeout = options.getSendTimeout();
String replyAddress = generateReplyAddress();
message.setReplyAddress(replyAddress);
HandlerRegistration<T> registration = new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, src);
ReplyHandler<T> handler = new ReplyHandler<>(registration, timeout);
handler.result.future().setHandler(replyHandler);
registration.handler(handler);
return handler;
} else {
return null;
}
DeliveryOptions options) {
long timeout = options.getSendTimeout();
String replyAddress = generateReplyAddress();
message.setReplyAddress(replyAddress);
ReplyHandler<T> handler = new ReplyHandler<>(this, vertx.getOrCreateContext(), replyAddress, message.address, src, timeout);
handler.register();
return handler;
}
public class ReplyHandler<T> implements Handler<Message<T>> {
final Promise<Message<T>> result;
final HandlerRegistration<T> registration;
final long timeoutID;
public Object trace;
ReplyHandler(HandlerRegistration<T> registration, long timeout) {
this.result = Promise.promise();
this.registration = registration;
this.timeoutID = vertx.setTimer(timeout, id -> {
fail(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + registration.address + ", repliedAddress: " + registration.repliedAddress));
});
}
private void trace(Object reply, Throwable failure) {
ContextInternal ctx = registration.handlerContext();
VertxTracer tracer = ctx.tracer();
if (tracer != null && registration.src) {
tracer.receiveResponse(ctx, reply, trace, failure, TagExtractor.empty());
}
}
void fail(ReplyException failure) {
registration.unregister();
if (metrics != null) {
metrics.replyFailure(registration.repliedAddress, failure.failureType());
}
trace(null, failure);
result.tryFail(failure);
}
@Override
public void handle(Message<T> reply) {
vertx.cancelTimer(timeoutID);
if (reply.body() instanceof ReplyException) {
// This is kind of clunky - but hey-ho
fail((ReplyException) reply.body());
} else {
trace(reply, null);
result.complete(reply);
}
}
}
public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
checkStarted();
ReplyHandler<T> handler = createReplyHandler(message, true, options, replyHandler);
public <T> OutboundDeliveryContext<T> newSendContext(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler, Promise<Void> writePromise) {
ContextInternal ctx = vertx.getContext();
if (ctx == null) {
// Guarantees the order when there is no current context in clustered mode
ctx = sendNoContext;
}
OutboundDeliveryContext<T> sendContext = new OutboundDeliveryContext<>(ctx, message, options, handler);
sendContext.next();
return new OutboundDeliveryContext<>(ctx, message, options, handler, writePromise);
}
protected class OutboundDeliveryContext<T> implements DeliveryContext<T> {
public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
checkStarted();
senderCtx.iter = sendInterceptors.iterator();
senderCtx.bus = this;
senderCtx.metrics = metrics;
senderCtx.next();
}
public final ContextInternal ctx;
public final MessageImpl message;
public final DeliveryOptions options;
public final Iterator<Handler<DeliveryContext>> iter;
public final ReplyHandler<T> replyHandler;
private final MessageImpl replierMessage;
private OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler) {
this(ctx, message, options, replyHandler, null);
}
private OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler, MessageImpl replierMessage) {
this.ctx = ctx;
this.message = message;
this.options = options;
this.iter = sendInterceptors.iterator();
this.replierMessage = replierMessage;
this.replyHandler = replyHandler;
}
@Override
public Message<T> message() {
return message;
}
@Override
public void next() {
if (iter.hasNext()) {
Handler<DeliveryContext> handler = iter.next();
try {
if (handler != null) {
handler.handle(this);
} else {
next();
}
} catch (Throwable t) {
log.error("Failure in interceptor", t);
}
} else {
if (replierMessage == null) {
sendOrPub(this);
} else {
sendReply(this, replierMessage);
}
}
}
@Override
public boolean send() {
return message.isSend();
}
@Override
public Object body() {
return message.sentBody;
}
public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler, Promise<Void> writePromise) {
checkStarted();
sendOrPubInternal(newSendContext(message, options, handler, writePromise));
}
private void unregisterAll() {
// Unregister all handlers explicitly - don't rely on context hooks
for (ConcurrentCyclicSequence<HandlerHolder> handlers: handlerMap.values()) {
for (HandlerHolder holder: handlers) {
holder.getHandler().unregister();
holder.getHandler().unregister(ar -> {});
}
}
}
private <T> void deliverToHandler(MessageImpl msg, HandlerHolder<T> holder) {
// Each handler gets a fresh copy
MessageImpl copied = msg.copyBeforeReceive(holder.getHandler().src);
if (metrics != null) {
metrics.scheduleMessage(holder.getHandler().getMetric(), msg.isLocal());
}
MessageImpl copied = msg.copyBeforeReceive();
holder.getContext().runOnContext((v) -> {
// Need to check handler is still there - the handler might have been removed after the message were sent but
// before it was received
try {
if (!holder.isRemoved()) {
holder.getHandler().handle(copied);
holder.getHandler().receive(copied);
}
} finally {
if (holder.isReplyHandler()) {
holder.getHandler().unregister();
holder.getHandler().unregister(ar -> {});
}
}
});

View File

@@ -12,23 +12,27 @@
package io.vertx.core.eventbus.impl;
import io.vertx.core.Context;
import io.vertx.core.spi.metrics.EventBusMetrics;
/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class HandlerHolder<T> {
private final Context context;
private final HandlerRegistration<T> handler;
private final boolean replyHandler;
private final boolean localOnly;
public final Context context;
public final String address;
public final HandlerRegistration<T> handler;
public final boolean replyHandler;
public final boolean localOnly;
private boolean removed;
public HandlerHolder(HandlerRegistration<T> handler, boolean replyHandler, boolean localOnly,
public HandlerHolder(HandlerRegistration<T> handler,
String address,
boolean replyHandler,
boolean localOnly,
Context context) {
this.context = context;
this.handler = handler;
this.address = address;
this.replyHandler = replyHandler;
this.localOnly = localOnly;
}

View File

@@ -8,355 +8,96 @@
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.eventbus.impl;
import io.vertx.core.*;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryContext;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.streams.ReadStream;
import java.util.*;
import java.util.Iterator;
/*
* This class is optimised for performance when used on the same event loop it was created on.
* However it can be used safely from other threads.
*
* The internal state is protected using the synchronized keyword. If always used on the same event loop, then
* we benefit from biased locking which makes the overhead of synchronized near zero.
*/
public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Message<T>> {
abstract class HandlerRegistration<T> {
private static final Logger log = LoggerFactory.getLogger(HandlerRegistration.class);
public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
private final Vertx vertx;
private final EventBusMetrics metrics;
private final EventBusImpl eventBus;
final String address;
final String repliedAddress;
private final boolean localOnly;
protected final boolean src;
public final ContextInternal context;
public final EventBusImpl bus;
public final String address;
public final boolean src;
private HandlerHolder<T> registered;
private Handler<Message<T>> handler;
private ContextInternal handlerContext;
private AsyncResult<Void> result;
private Handler<AsyncResult<Void>> completionHandler;
private Handler<Void> endHandler;
private Handler<Message<T>> discardHandler;
private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
private final Queue<Message<T>> pending = new ArrayDeque<>(8);
private long demand = Long.MAX_VALUE;
private Object metric;
public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address,
String repliedAddress, boolean localOnly, boolean src) {
this.vertx = vertx;
this.metrics = metrics;
this.eventBus = eventBus;
this.address = address;
this.repliedAddress = repliedAddress;
this.localOnly = localOnly;
HandlerRegistration(ContextInternal context,
EventBusImpl bus,
String address,
boolean src) {
this.context = context;
this.bus = bus;
this.src = src;
this.address = address;
}
@Override
public MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages) {
Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
List<Message<T>> discarded;
Handler<Message<T>> discardHandler;
synchronized (this) {
this.maxBufferedMessages = maxBufferedMessages;
int overflow = pending.size() - maxBufferedMessages;
if (overflow <= 0) {
return this;
}
discardHandler = this.discardHandler;
if (discardHandler == null) {
while (pending.size() > maxBufferedMessages) {
pending.poll();
}
return this;
}
discarded = new ArrayList<>(overflow);
while (pending.size() > maxBufferedMessages) {
discarded.add(pending.poll());
}
final void receive(MessageImpl<?, T> msg) {
if (bus.metrics != null) {
bus.metrics.scheduleMessage(metric, msg.isLocal());
}
for (Message<T> msg : discarded) {
discardHandler.handle(msg);
doReceive(msg);
}
protected abstract void doReceive(Message<T> msg);
protected abstract void doUnregister();
synchronized Future<Void> register(String repliedAddress, boolean localOnly) {
if (registered != null) {
throw new IllegalStateException();
}
return this;
}
@Override
public synchronized int getMaxBufferedMessages() {
return maxBufferedMessages;
}
@Override
public String address() {
return address;
}
@Override
public synchronized void completionHandler(Handler<AsyncResult<Void>> completionHandler) {
Objects.requireNonNull(completionHandler);
if (result != null) {
AsyncResult<Void> value = result;
vertx.runOnContext(v -> completionHandler.handle(value));
} else {
this.completionHandler = completionHandler;
Promise<Void> p = Promise.promise();
Future<Void> fut = p.future();
registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, p);
if (bus.metrics != null) {
metric = bus.metrics.handlerRegistered(address, repliedAddress);
}
return fut;
}
@Override
public Future<Void> unregister() {
Promise<Void> promise = Promise.promise();
doUnregister(promise);
return promise.future();
}
@Override
public void unregister(Handler<AsyncResult<Void>> completionHandler) {
doUnregister(completionHandler);
}
private void doUnregister(Handler<AsyncResult<Void>> doneHandler) {
synchronized (this) {
if (handler == null) {
callHandlerAsync(Future.succeededFuture(), doneHandler);
return;
}
handler = null;
if (endHandler != null) {
Handler<Void> theEndHandler = endHandler;
Handler<AsyncResult<Void>> handler = doneHandler;
doneHandler = ar -> {
theEndHandler.handle(null);
if (handler != null) {
handler.handle(ar);
}
};
}
if (pending.size() > 0 && discardHandler != null) {
Deque<Message<T>> discarded = new ArrayDeque<>(pending);
Handler<Message<T>> handler = discardHandler;
handlerContext.runOnContext(v -> {
Message<T> msg;
while ((msg = discarded.poll()) != null) {
handler.handle(msg);
}
});
}
pending.clear();
discardHandler = null;
eventBus.removeRegistration(registered, doneHandler);
registered = null;
if (result == null) {
result = Future.failedFuture("Consumer unregistered before registration completed");
callHandlerAsync(result, completionHandler);
} else {
EventBusMetrics metrics = eventBus.metrics;
if (metrics != null) {
metrics.handlerUnregistered(metric);
}
}
}
}
private void callHandlerAsync(AsyncResult<Void> result, Handler<AsyncResult<Void>> completionHandler) {
if (completionHandler != null) {
vertx.runOnContext(v -> completionHandler.handle(result));
}
}
synchronized void setHandlerContext(Context context) {
handlerContext = (ContextInternal) context;
}
public synchronized void setResult(AsyncResult<Void> result) {
if (this.result != null) {
return;
}
this.result = result;
if (result.failed()) {
log.error("Failed to propagate registration for handler " + handler + " and address " + address);
} else {
if (metrics != null) {
metric = metrics.handlerRegistered(address, repliedAddress);
}
callHandlerAsync(result, completionHandler);
}
}
@Override
public void handle(Message<T> message) {
Handler<Message<T>> theHandler;
ContextInternal ctx;
synchronized (this) {
if (registered == null) {
return;
} else if (demand == 0L) {
if (pending.size() < maxBufferedMessages) {
pending.add(message);
} else {
if (discardHandler != null) {
discardHandler.handle(message);
} else {
log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address);
}
}
return;
} else {
if (pending.size() > 0) {
pending.add(message);
message = pending.poll();
}
if (demand != Long.MAX_VALUE) {
demand--;
}
theHandler = handler;
}
ctx = handlerContext;
}
deliver(theHandler, message, ctx);
}
private void deliver(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
// Handle the message outside the sync block
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714
String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}
InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl<?, T>) message, theHandler, context);
deliveryCtx.context.dispatch(v -> {
deliveryCtx.next();
});
checkNextTick();
}
ContextInternal handlerContext() {
return handlerContext;
}
private synchronized void checkNextTick() {
// Check if there are more pending messages in the queue that can be processed next time around
if (!pending.isEmpty() && demand > 0L) {
handlerContext.runOnContext(v -> {
Message<T> message;
Handler<Message<T>> theHandler;
ContextInternal ctx;
synchronized (HandlerRegistration.this) {
if (demand == 0L || (message = pending.poll()) == null) {
return;
}
if (demand != Long.MAX_VALUE) {
demand--;
}
theHandler = handler;
ctx = handlerContext;
}
deliver(theHandler, message, ctx);
});
}
}
/*
* Internal API for testing purposes.
*/
public synchronized void discardHandler(Handler<Message<T>> handler) {
this.discardHandler = handler;
}
@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
if (h != null) {
synchronized (this) {
handler = h;
if (registered == null) {
registered = eventBus.addRegistration(address, this, repliedAddress != null, localOnly);
}
}
return this;
}
this.unregister();
return this;
}
@Override
public ReadStream<T> bodyStream() {
return new BodyReadStream<>(this);
}
@Override
public synchronized boolean isRegistered() {
return registered != null;
}
@Override
public synchronized MessageConsumer<T> pause() {
demand = 0L;
return this;
}
@Override
public MessageConsumer<T> resume() {
return fetch(Long.MAX_VALUE);
}
@Override
public synchronized MessageConsumer<T> fetch(long amount) {
if (amount < 0) {
throw new IllegalArgumentException();
public void unregister(Handler<AsyncResult<Void>> completionHandler) {
doUnregister();
synchronized (this) {
if (registered != null) {
bus.removeRegistration(registered, completionHandler);
registered = null;
if (bus.metrics != null) {
bus.metrics.handlerUnregistered(metric);
metric = null;
}
} else {
if (completionHandler != null) {
context.owner().runOnContext(v -> completionHandler.handle(Future.succeededFuture()));
}
}
}
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (demand > 0L) {
checkNextTick();
}
return this;
}
@Override
public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
if (endHandler != null) {
// We should use the HandlerHolder context to properly do this (needs small refactoring)
Context endCtx = vertx.getOrCreateContext();
this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
} else {
this.endHandler = null;
}
return this;
void dispatch(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl<?, T>) message, theHandler, context);
deliveryCtx.dispatch();
}
@Override
public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
return this;
}
public Handler<Message<T>> getHandler() {
return handler;
}
public Object getMetric() {
return metric;
}
protected class InboundDeliveryContext implements DeliveryContext<T> {
private class InboundDeliveryContext implements DeliveryContext<T> {
private final MessageImpl<?, T> message;
private final Iterator<Handler<DeliveryContext>> iter;
@@ -366,8 +107,14 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
private InboundDeliveryContext(MessageImpl<?, T> message, Handler<Message<T>> handler, ContextInternal context) {
this.message = message;
this.handler = handler;
this.iter = eventBus.receiveInterceptors();
this.context = message.src ? context : context.duplicate();
this.iter = message.bus.receiveInterceptors();
this.context = context;
}
void dispatch() {
context.dispatch(v -> {
next();
});
}
@Override
@@ -397,25 +144,28 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
local = false;
}
}
Object m = metric;
try {
if (metrics != null) {
metrics.beginHandleMessage(metric, local);
if (bus.metrics != null) {
bus.metrics.beginHandleMessage(m, local);
}
VertxTracer tracer = handlerContext.tracer();
VertxTracer tracer = context.tracer();
if (tracer != null && !src) {
Object trace = tracer.receiveRequest(context, message, message.isSend() ? "send" : "publish", message.headers, MessageTagExtractor.INSTANCE);
message.trace = tracer.receiveRequest(context, message, message.isSend() ? "send" : "publish", message.headers, MessageTagExtractor.INSTANCE);
handler.handle(message);
tracer.sendResponse(context, null, trace, null, TagExtractor.empty());
if (message.replyAddress == null) {
tracer.sendResponse(context, null, message.trace, null, TagExtractor.empty());
}
} else {
handler.handle(message);
}
if (metrics != null) {
metrics.endHandleMessage(metric, null);
if (bus.metrics != null) {
bus.metrics.endHandleMessage(m, null);
}
} catch (Exception e) {
log.error("Failed to handleMessage. address: " + message.address(), e);
if (metrics != null) {
metrics.endHandleMessage(metric, e);
if (bus.metrics != null) {
bus.metrics.endHandleMessage(m, e);
}
context.reportException(e);
}
@@ -432,5 +182,4 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
return message.receivedBody;
}
}
}

View File

@@ -0,0 +1,284 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.eventbus.impl;
import io.vertx.core.*;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import java.util.*;
/*
* This class is optimised for performance when used on the same event loop it was created on.
* However it can be used safely from other threads.
*
* The internal state is protected using the synchronized keyword. If always used on the same event loop, then
* we benefit from biased locking which makes the overhead of synchronized near zero.
*/
public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements MessageConsumer<T> {
private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class);
private static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
private final Vertx vertx;
private final ContextInternal context;
private final EventBusImpl eventBus;
private final String address;
private final boolean localOnly;
private Handler<Message<T>> handler;
private Handler<AsyncResult<Void>> completionHandler;
private Handler<Void> endHandler;
private Handler<Message<T>> discardHandler;
private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
private Queue<Message<T>> pending = new ArrayDeque<>(8);
private long demand = Long.MAX_VALUE;
private Future<Void> result;
MessageConsumerImpl(Vertx vertx, ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly) {
super(context, eventBus, address, false);
this.vertx = vertx;
this.context = context;
this.eventBus = eventBus;
this.address = address;
this.localOnly = localOnly;
}
@Override
public MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages) {
Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
List<Message<T>> discarded;
Handler<Message<T>> discardHandler;
synchronized (this) {
this.maxBufferedMessages = maxBufferedMessages;
int overflow = pending.size() - maxBufferedMessages;
if (overflow <= 0) {
return this;
}
discardHandler = this.discardHandler;
if (discardHandler == null) {
while (pending.size() > maxBufferedMessages) {
pending.poll();
}
return this;
}
discarded = new ArrayList<>(overflow);
while (pending.size() > maxBufferedMessages) {
discarded.add(pending.poll());
}
}
for (Message<T> msg : discarded) {
discardHandler.handle(msg);
}
return this;
}
@Override
public synchronized int getMaxBufferedMessages() {
return maxBufferedMessages;
}
@Override
public String address() {
return address;
}
@Override
public synchronized void completionHandler(Handler<AsyncResult<Void>> handler) {
Objects.requireNonNull(handler);
completionHandler = handler;
checkCompletionHandler();
}
@Override
public Future<Void> unregister() {
// Todo when we support multiple listeners per future
Promise<Void> promise = Promise.promise();
unregister(promise);
return promise.future();
}
protected synchronized void doUnregister() {
handler = null;
if (endHandler != null) {
endHandler.handle(null);
}
if (pending.size() > 0 && discardHandler != null) {
Queue<Message<T>> discarded = pending;
Handler<Message<T>> handler = discardHandler;
pending = new ArrayDeque<>();
context.runOnContext(v -> {
Message<T> msg;
while ((msg = discarded.poll()) != null) {
handler.handle(msg);
}
});
}
result = Future.failedFuture("blah");
checkCompletionHandler();
discardHandler = null;
result = null;
}
protected void doReceive(Message<T> message) {
Handler<Message<T>> theHandler;
synchronized (this) {
if (!isRegistered()) {
return;
} else if (demand == 0L) {
if (pending.size() < maxBufferedMessages) {
pending.add(message);
} else {
if (discardHandler != null) {
discardHandler.handle(message);
} else {
log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address);
}
}
return;
} else {
if (pending.size() > 0) {
pending.add(message);
message = pending.poll();
}
if (demand != Long.MAX_VALUE) {
demand--;
}
theHandler = handler;
}
}
deliver(theHandler, message);
}
private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
// Handle the message outside the sync block
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714
String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}
dispatch(theHandler, message, context.duplicate());
checkNextTick();
}
private synchronized void checkNextTick() {
// Check if there are more pending messages in the queue that can be processed next time around
if (!pending.isEmpty() && demand > 0L) {
context.runOnContext(v -> {
Message<T> message;
Handler<Message<T>> theHandler;
synchronized (MessageConsumerImpl.this) {
if (demand == 0L || (message = pending.poll()) == null) {
return;
}
if (demand != Long.MAX_VALUE) {
demand--;
}
theHandler = handler;
}
deliver(theHandler, message);
});
}
}
/*
* Internal API for testing purposes.
*/
public synchronized void discardHandler(Handler<Message<T>> handler) {
this.discardHandler = handler;
}
@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
if (h != null) {
synchronized (this) {
handler = h;
if (result == null) {
result = register(null, localOnly);
result.setHandler(ar -> checkCompletionHandler());
}
}
} else {
unregister();
}
return this;
}
private synchronized void checkCompletionHandler() {
Handler<AsyncResult<Void>> completionHandler = this.completionHandler;
Future<Void> result = this.result;
if (completionHandler != null && result != null && result.isComplete()) {
this.completionHandler = null;
this.result = null;
context.runOnContext(v -> {
completionHandler.handle(result);
});
}
}
@Override
public ReadStream<T> bodyStream() {
return new BodyReadStream<>(this);
}
@Override
public synchronized MessageConsumer<T> pause() {
demand = 0L;
return this;
}
@Override
public MessageConsumer<T> resume() {
return fetch(Long.MAX_VALUE);
}
@Override
public synchronized MessageConsumer<T> fetch(long amount) {
if (amount < 0) {
throw new IllegalArgumentException();
}
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (demand > 0L) {
checkNextTick();
}
return this;
}
@Override
public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
if (endHandler != null) {
// We should use the HandlerHolder context to properly do this (needs small refactoring)
Context endCtx = vertx.getOrCreateContext();
this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
} else {
this.endHandler = null;
}
return this;
}
@Override
public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
return this;
}
public synchronized Handler<Message<T>> getHandler() {
return handler;
}
}

View File

@@ -11,8 +11,7 @@
package io.vertx.core.eventbus.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.*;
import io.vertx.core.http.CaseInsensitiveHeaders;
@@ -31,41 +30,34 @@ public class MessageImpl<U, V> implements Message<V> {
protected MessageCodec<U, V> messageCodec;
protected final EventBusImpl bus;
public final boolean src;
protected String address;
protected String replyAddress;
protected MultiMap headers;
protected U sentBody;
protected V receivedBody;
protected boolean send;
protected Handler<AsyncResult<Void>> writeHandler;
protected Object trace;
public MessageImpl(boolean src, EventBusImpl bus) {
public MessageImpl(EventBusImpl bus) {
this.bus = bus;
this.src = src;
}
public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody,
public MessageImpl(String address, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec,
boolean send, boolean src, EventBusImpl bus,
Handler<AsyncResult<Void>> writeHandler) {
boolean send, EventBusImpl bus) {
this.messageCodec = messageCodec;
this.address = address;
this.replyAddress = replyAddress;
this.headers = headers;
this.sentBody = sentBody;
this.send = send;
this.bus = bus;
this.src = src;
this.writeHandler = writeHandler;
}
protected MessageImpl(MessageImpl<U, V> other, boolean src) {
protected MessageImpl(MessageImpl<U, V> other) {
this.bus = other.bus;
this.address = other.address;
this.replyAddress = other.replyAddress;
this.messageCodec = other.messageCodec;
this.src = src;
if (other.headers != null) {
List<Map.Entry<String, String>> entries = other.headers.entries();
this.headers = new CaseInsensitiveHeaders();
@@ -78,11 +70,10 @@ public class MessageImpl<U, V> implements Message<V> {
this.receivedBody = messageCodec.transform(other.sentBody);
}
this.send = other.send;
this.writeHandler = other.writeHandler;
}
public MessageImpl<U, V> copyBeforeReceive(boolean src) {
return new MessageImpl<>(this, src);
public MessageImpl<U, V> copyBeforeReceive() {
return new MessageImpl<>(this);
}
@Override
@@ -112,34 +103,32 @@ public class MessageImpl<U, V> implements Message<V> {
return replyAddress;
}
@Override
public void fail(int failureCode, String message) {
reply(new ReplyException(ReplyFailure.RECIPIENT_FAILURE, failureCode, message));
}
@Override
public void reply(Object message) {
replyAndRequest(message, new DeliveryOptions(), null);
}
@Override
public <R> void replyAndRequest(Object message, Handler<AsyncResult<Message<R>>> replyHandler) {
replyAndRequest(message, new DeliveryOptions(), replyHandler);
}
@Override
public void reply(Object message, DeliveryOptions options) {
replyAndRequest(message, options, null);
if (replyAddress != null) {
MessageImpl reply = createReply(message, options);
bus.sendReply(reply, options, null);
}
}
@Override
public <R> void replyAndRequest(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
public <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options) {
if (replyAddress != null) {
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName(), null);
bus.sendReply(reply, this, options, replyHandler);
MessageImpl reply = createReply(message, options);
ReplyHandler<R> handler = bus.createReplyHandler(reply, false, options);
bus.sendReply(reply, options, handler);
return handler.result();
} else {
throw new IllegalStateException();
}
}
protected MessageImpl createReply(Object message, DeliveryOptions options) {
MessageImpl reply = bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName());
reply.trace = trace;
return reply;
}
@Override
public boolean isSend() {
return send;
@@ -149,10 +138,6 @@ public class MessageImpl<U, V> implements Message<V> {
this.replyAddress = replyAddress;
}
public Handler<AsyncResult<Void>> writeHandler() {
return writeHandler;
}
public MessageCodec<U, V> codec() {
return messageCodec;
}

View File

@@ -33,7 +33,7 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
private final EventBusImpl bus;
private final boolean send;
private final String address;
private final Queue<MessageImpl<T, ?>> pending = new ArrayDeque<>();
private final Queue<OutboundDeliveryContext<T>> pending = new ArrayDeque<>();
private final MessageConsumer<Integer> creditConsumer;
private DeliveryOptions options;
private int maxSize = DEFAULT_WRITE_QUEUE_MAX_SIZE;
@@ -89,13 +89,28 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
@Override
public void write(T data, Handler<AsyncResult<Void>> handler) {
if (send) {
doSend(data, null, handler);
} else {
MessageImpl msg = bus.createMessage(false, true, address, options.getHeaders(), data, options.getCodecName(), handler);
msg.writeHandler = handler;
bus.sendOrPubInternal(msg, options, null);
Promise<Void> promise = null;
if (handler != null) {
promise = Promise.promise();
promise.future().setHandler(handler);
}
write(data, promise);
}
private void write(T data, Promise<Void> handler) {
MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), data, options.getCodecName());
OutboundDeliveryContext<T> sendCtx = bus.newSendContext(msg, options, null, handler);
if (send) {
synchronized (this) {
if (credits > 0) {
credits--;
} else {
pending.add(sendCtx);
return;
}
}
}
bus.sendOrPubInternal(msg, options, null, handler);
}
@Override
@@ -162,25 +177,15 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
super.finalize();
}
private synchronized <R> void doSend(T data, Handler<AsyncResult<Message<R>>> replyHandler, Handler<AsyncResult<Void>> handler) {
MessageImpl msg = bus.createMessage(true, true, address, options.getHeaders(), data, options.getCodecName(), handler);
if (credits > 0) {
credits--;
bus.sendOrPubInternal(msg, options, replyHandler);
} else {
pending.add(msg);
}
}
private synchronized void doReceiveCredit(int credit) {
credits += credit;
while (credits > 0) {
MessageImpl<T, ?> msg = pending.poll();
if (msg == null) {
OutboundDeliveryContext<T> sendContext = pending.poll();
if (sendContext == null) {
break;
} else {
credits--;
bus.sendOrPubInternal(msg, options, null);
bus.sendOrPubInternal(sendContext);
}
}
checkDrained();

View File

@@ -0,0 +1,135 @@
/*
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.eventbus.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryContext;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.Iterator;
import java.util.function.BiConsumer;
public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<AsyncResult<Void>> {
public final ContextInternal ctx;
public final MessageImpl message;
public final DeliveryOptions options;
public final ReplyHandler<T> replyHandler;
private final Promise<Void> writePromise;
private boolean src;
Iterator<Handler<DeliveryContext>> iter;
EventBusImpl bus;
EventBusMetrics metrics;
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler, Promise<Void> writePromise) {
this.ctx = ctx;
this.message = message;
this.options = options;
this.replyHandler = replyHandler;
this.writePromise = writePromise;
}
@Override
public void handle(AsyncResult<Void> event) {
written(event.cause());
}
public void written(Throwable failure) {
// Metrics
if (metrics != null) {
boolean remote = (message instanceof ClusteredMessage) && ((ClusteredMessage<?, ?>)message).isToWire();
metrics.messageSent(message.address(), !message.send, !remote, remote);
}
// Tracing
VertxTracer tracer = ctx.tracer();
if (tracer != null) {
if (src) {
if (replyHandler != null) {
replyHandler.trace = message.trace;
} else {
tracer.receiveResponse(ctx, null, message.trace, failure, TagExtractor.empty());
}
}
}
// Fail fast reply handler
if (failure instanceof ReplyException) {
if (replyHandler != null) {
replyHandler.fail((ReplyException) failure);
}
}
// Notify promise finally
if (writePromise != null) {
if (failure == null) {
writePromise.complete();
} else {
writePromise.fail(failure);
}
}
}
@Override
public Message<T> message() {
return message;
}
@Override
public void next() {
if (iter.hasNext()) {
Handler<DeliveryContext> handler = iter.next();
try {
if (handler != null) {
handler.handle(this);
} else {
next();
}
} catch (Throwable t) {
EventBusImpl.log.error("Failure in interceptor", t);
}
} else {
VertxTracer tracer = ctx.tracer();
if (tracer != null) {
if (message.trace == null) {
src = true;
BiConsumer<String, String> biConsumer = (String key, String val) -> message.headers().set(key, val);
message.trace = tracer.sendRequest(ctx, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
} else {
// Handle failure here
tracer.sendResponse(ctx, null, message.trace, null, TagExtractor.empty());
}
}
bus.sendOrPub(this);
}
}
@Override
public boolean send() {
return message.isSend();
}
@Override
public Object body() {
return message.sentBody;
}
}

View File

@@ -0,0 +1,80 @@
package io.vertx.core.eventbus.impl;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
class ReplyHandler<T> extends HandlerRegistration<T> implements Handler<Message<T>> {
private final EventBusImpl eventBus;
private final ContextInternal context;
private final Promise<Message<T>> result;
private final long timeoutID;
private final boolean src;
private final String repliedAddress;
Object trace;
ReplyHandler(EventBusImpl eventBus, ContextInternal context, String address, String repliedAddress, boolean src, long timeout) {
super(context, eventBus, address, src);
this.eventBus = eventBus;
this.context = context;
this.result = Promise.promise();
this.src = src;
this.repliedAddress = repliedAddress;
this.timeoutID = eventBus.vertx.setTimer(timeout, id -> {
fail(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress));
});
}
private void trace(Object reply, Throwable failure) {
VertxTracer tracer = context.tracer();
if (tracer != null && src) {
tracer.receiveResponse(context, reply, trace, failure, TagExtractor.empty());
}
}
Future<Message<T>> result() {
return result.future();
}
void fail(ReplyException failure) {
unregister(ar -> {});
if (eventBus.metrics != null) {
eventBus.metrics.replyFailure(repliedAddress, failure.failureType());
}
trace(null, failure);
result.tryFail(failure);
}
@Override
protected void doReceive(Message<T> reply) {
dispatch(this, reply, context);
}
@Override
protected void doUnregister() {
}
void register() {
register(repliedAddress, true);
}
@Override
public void handle(Message<T> reply) {
eventBus.vertx.cancelTimer(timeoutID);
if (reply.body() instanceof ReplyException) {
// This is kind of clunky - but hey-ho
fail((ReplyException) reply.body());
} else {
trace(reply, null);
result.complete(reply);
}
}
}

View File

@@ -19,6 +19,7 @@ import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.HAManager;
import io.vertx.core.impl.VertxInternal;
@@ -31,11 +32,8 @@ import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import java.io.Serializable;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -176,22 +174,20 @@ public class ClusteredEventBus extends EventBusImpl {
}
@Override
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Handler<AsyncResult<Void>> writeHandler) {
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, src, this, writeHandler);
ClusteredMessage msg = new ClusteredMessage(serverID, address, headers, body, codec, send, this);
return msg;
}
@Override
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !replyHandler && !localOnly) {
protected <T> void addRegistration(boolean newAddress, HandlerHolder<T> holder, Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !holder.replyHandler && !holder.localOnly) {
// Propagate the information
subs.add(address, nodeInfo, completionHandler);
ownSubs.add(address);
subs.add(holder.address, nodeInfo, completionHandler);
ownSubs.add(holder.address);
} else {
completionHandler.handle(Future.succeededFuture());
}
@@ -208,22 +204,21 @@ public class ClusteredEventBus extends EventBusImpl {
}
}
@Override
protected <T> void sendReply(OutboundDeliveryContext<T> sendContext, MessageImpl replierMessage) {
clusteredSendReply(((ClusteredMessage) replierMessage).getSender(), sendContext);
}
@Override
protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
if (sendContext.options.isLocalOnly()) {
super.sendOrPub(sendContext);
} else if (Vertx.currentContext() != sendContext.ctx) {
// Current event-loop might be null when sending from non vertx thread
sendContext.ctx.runOnContext(v -> {
subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
});
if (((ClusteredMessage) sendContext.message).getRepliedTo() != null) {
clusteredSendReply(((ClusteredMessage) sendContext.message).getRepliedTo(), sendContext);
} else {
subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
if (sendContext.options.isLocalOnly()) {
super.sendOrPub(sendContext);
} else if (Vertx.currentContext() != sendContext.ctx) {
// Current event-loop might be null when sending from non vertx thread
sendContext.ctx.runOnContext(v -> {
subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
});
} else {
subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
}
}
}
@@ -237,10 +232,7 @@ public class ClusteredEventBus extends EventBusImpl {
}
} else {
log.error("Failed to send message", asyncResult.cause());
Handler<AsyncResult<Void>> handler = sendContext.message.writeHandler();
if (handler != null) {
handler.handle(asyncResult.mapEmpty());
}
sendContext.written(asyncResult.cause());
}
}
@@ -304,7 +296,7 @@ public class ClusteredEventBus extends EventBusImpl {
size = buff.getInt(0);
parser.fixedSizeMode(size);
} else {
ClusteredMessage received = new ClusteredMessage(false, ClusteredEventBus.this);
ClusteredMessage received = new ClusteredMessage(ClusteredEventBus.this);
received.readFromWire(buff, codecManager);
if (metrics != null) {
metrics.messageRead(received.address(), buff.length());
@@ -357,17 +349,6 @@ public class ClusteredEventBus extends EventBusImpl {
}
private void sendRemote(OutboundDeliveryContext<?> sendContext, ServerID theServerID, MessageImpl message) {
Object trace = messageSent(sendContext, false, true);
// SAME CODE THAN IN PARENT!!!!
VertxTracer tracer = sendContext.ctx.tracer();
if (tracer != null && sendContext.message.src) {
if (sendContext.replyHandler == null) {
tracer.receiveResponse(sendContext.ctx, null, trace, null, TagExtractor.empty());
} else {
sendContext.replyHandler.trace = trace;
}
}
// We need to deal with the fact that connecting can take some time and is async, and we cannot
// block to wait for it. So we add any sends to a pending list if not connected yet.
// Once we connect we send them.
@@ -386,7 +367,7 @@ public class ClusteredEventBus extends EventBusImpl {
holder.connect();
}
}
holder.writeMessage((ClusteredMessage) message);
holder.writeMessage(sendContext);
}
private void removeSub(String subName, ClusterNodeInfo node, Handler<AsyncResult<Void>> completionHandler) {

View File

@@ -12,10 +12,9 @@
package io.vertx.core.eventbus.impl.clustered;
import io.netty.util.CharsetUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.EventBusImpl;
@@ -38,23 +37,25 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
private static final byte WIRE_PROTOCOL_VERSION = 1;
private ServerID sender;
private ServerID repliedTo;
private Buffer wireBuffer;
private int bodyPos;
private int headersPos;
private boolean fromWire;
private boolean toWire;
public ClusteredMessage(boolean src, EventBusImpl bus) {
super(src, bus);
public ClusteredMessage(EventBusImpl bus) {
super(bus);
}
public ClusteredMessage(ServerID sender, String address, String replyAddress, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec, boolean send, boolean src, EventBusImpl bus, Handler<AsyncResult<Void>> writeHandler) {
super(address, replyAddress, headers, sentBody, messageCodec, send, src, bus, writeHandler);
public ClusteredMessage(ServerID sender, String address, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec, boolean send, EventBusImpl bus) {
super(address, headers, sentBody, messageCodec, send, bus);
this.sender = sender;
}
protected ClusteredMessage(ClusteredMessage<U, V> other, boolean src) {
super(other, src);
protected ClusteredMessage(ClusteredMessage<U, V> other) {
super(other);
this.sender = other.sender;
if (other.sentBody == null) {
this.wireBuffer = other.wireBuffer;
@@ -64,8 +65,15 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
this.fromWire = other.fromWire;
}
public ClusteredMessage<U, V> copyBeforeReceive(boolean src) {
return new ClusteredMessage<>(this, src);
@Override
protected MessageImpl createReply(Object message, DeliveryOptions options) {
ClusteredMessage reply = (ClusteredMessage) super.createReply(message, options);
reply.repliedTo = sender;
return reply;
}
public ClusteredMessage<U, V> copyBeforeReceive() {
return new ClusteredMessage<>(this);
}
@Override
@@ -99,6 +107,7 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
}
public Buffer encodeToWire() {
toWire = true;
int length = 1024; // TODO make this configurable
Buffer buffer = Buffer.buffer(length);
buffer.appendInt(0);
@@ -240,10 +249,18 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
return sender;
}
ServerID getRepliedTo() {
return repliedTo;
}
public boolean isFromWire() {
return fromWire;
}
public boolean isToWire() {
return toWire;
}
protected boolean isLocal() {
return !isFromWire();
}

View File

@@ -11,12 +11,10 @@
package io.vertx.core.eventbus.impl.clustered;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import io.vertx.core.eventbus.impl.codecs.PingMessageCodec;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
@@ -46,7 +44,7 @@ class ConnectionHolder {
private final Vertx vertx;
private final EventBusMetrics metrics;
private Queue<ClusteredMessage> pending;
private Queue<OutboundDeliveryContext<?>> pending;
private NetSocket socket;
private boolean connected;
private long timeoutID = -1;
@@ -78,13 +76,13 @@ class ConnectionHolder {
}
// TODO optimise this (contention on monitor)
synchronized void writeMessage(ClusteredMessage message) {
synchronized void writeMessage(OutboundDeliveryContext<?> ctx) {
if (connected) {
Buffer data = message.encodeToWire();
Buffer data = ((ClusteredMessage)ctx.message).encodeToWire();
if (metrics != null) {
metrics.messageWritten(message.address(), data.length());
metrics.messageWritten(ctx.message.address(), data.length());
}
socket.write(data, message.writeHandler());
socket.write(data, ctx);
} else {
if (pending == null) {
if (log.isDebugEnabled()) {
@@ -92,7 +90,7 @@ class ConnectionHolder {
}
pending = new ArrayDeque<>();
}
pending.add(message);
pending.add(ctx);
}
}
@@ -108,14 +106,10 @@ class ConnectionHolder {
vertx.cancelTimer(pingTimeoutID);
}
synchronized (this) {
ClusteredMessage<?, ?> msg;
OutboundDeliveryContext<?> msg;
if (pending != null) {
Future<Void> failure = Future.failedFuture(cause);
while ((msg = pending.poll()) != null) {
Handler<AsyncResult<Void>> handler = msg.writeHandler();
if (handler != null) {
handler.handle(failure);
}
msg.written(cause);
}
}
}
@@ -142,7 +136,7 @@ class ConnectionHolder {
close();
});
ClusteredMessage pingMessage =
new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, null, new PingMessageCodec(), true, true, eventBus, null);
new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, new PingMessageCodec(), true, eventBus);
Buffer data = pingMessage.encodeToWire();
socket.write(data);
});
@@ -166,12 +160,12 @@ class ConnectionHolder {
if (log.isDebugEnabled()) {
log.debug("Draining the queue for server " + serverID);
}
for (ClusteredMessage message : pending) {
Buffer data = message.encodeToWire();
for (OutboundDeliveryContext<?> ctx : pending) {
Buffer data = ((ClusteredMessage<?, ?>)ctx.message).encodeToWire();
if (metrics != null) {
metrics.messageWritten(message.address(), data.length());
metrics.messageWritten(ctx.message.address(), data.length());
}
socket.write(data, message.writeHandler());
socket.write(data, ctx);
}
}
pending = null;

View File

@@ -12,7 +12,7 @@
package io.vertx.core.eventbus;
import io.vertx.core.*;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.eventbus.impl.MessageConsumerImpl;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.ContextInternal;
@@ -1043,7 +1043,7 @@ public class LocalEventBusTest extends EventBusTestBase {
};
MessageConsumer<String> reg = eb.<String>consumer(ADDRESS1).setMaxBufferedMessages(10);
ReadStream<?> controller = register.apply(reg, handler);
((HandlerRegistration<String>) reg).discardHandler(msg -> {
((MessageConsumerImpl<String>) reg).discardHandler(msg -> {
assertEquals(data[10], msg.body());
expected.addAll(Arrays.asList(data).subList(0, 10));
controller.resume();
@@ -1072,7 +1072,7 @@ public class LocalEventBusTest extends EventBusTestBase {
}
List<String> received = Collections.synchronizedList(new ArrayList<>());
CountDownLatch receiveLatch = new CountDownLatch(4);
HandlerRegistration<String> consumer = (HandlerRegistration<String>) eb.<String>consumer(ADDRESS1).setMaxBufferedMessages(5);
MessageConsumerImpl<String> consumer = (MessageConsumerImpl<String>) eb.<String>consumer(ADDRESS1).setMaxBufferedMessages(5);
streamSupplier.apply(consumer, e -> {
received.add(e);
receiveLatch.countDown();
@@ -1105,7 +1105,7 @@ public class LocalEventBusTest extends EventBusTestBase {
// Let enough time of the 20 messages to go in the consumer pending queue
vertx.setTimer(20, v -> {
AtomicInteger count = new AtomicInteger(1);
((HandlerRegistration<Integer>)consumer).discardHandler(discarded -> {
((MessageConsumerImpl<Integer>)consumer).discardHandler(discarded -> {
int val = discarded.body();
assertEquals(count.getAndIncrement(), val);
if (val == 9) {
@@ -1149,7 +1149,7 @@ public class LocalEventBusTest extends EventBusTestBase {
};
MessageConsumer<String> reg = eb.<String>consumer(ADDRESS1).setMaxBufferedMessages(10);
ReadStream<?> controller = register.apply(reg, handler);
((HandlerRegistration<String>) reg).discardHandler(msg -> {
((MessageConsumerImpl<String>) reg).discardHandler(msg -> {
assertEquals(data[10], msg.body());
expected.addAll(Arrays.asList(data).subList(0, 10));
controller.resume();
@@ -1409,7 +1409,7 @@ public class LocalEventBusTest extends EventBusTestBase {
Context ctx = Vertx.currentContext();
ctx.runOnContext(v -> {
consumer.resume();
((HandlerRegistration<?>) consumer).discardHandler(discarded -> {
((MessageConsumerImpl<?>) consumer).discardHandler(discarded -> {
assertEquals("val1", discarded.body());
testComplete();
});

View File

@@ -295,7 +295,6 @@ public class MetricsTest extends VertxTestBase {
private void testHandlerProcessMessage(Vertx from, Vertx to, int expectedLocalCount) {
FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(to.eventBus());
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
to.runOnContext(v -> {
to.eventBus().consumer(ADDRESS1, msg -> {
HandlerMetric registration = assertRegistration(metrics);
@@ -311,11 +310,6 @@ public class MetricsTest extends VertxTestBase {
}).completionHandler(onSuccess(v2 -> {
to.runOnContext(v3 -> {
latch1.countDown();
try {
awaitLatch(latch2);
} catch (InterruptedException e) {
fail(e);
}
});
}));
});
@@ -338,8 +332,6 @@ public class MetricsTest extends VertxTestBase {
testComplete();
});
assertWaitUntil(() -> registration.scheduleCount.get() == 1);
assertEquals(0, registration.beginCount.get());
latch2.countDown();
await();
}

View File

@@ -19,6 +19,7 @@ import io.vertx.core.impl.ContextInternal;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import org.junit.Ignore;
import org.junit.Test;
import java.util.*;
@@ -124,20 +125,20 @@ public abstract class EventBusTracerTestBase extends VertxTestBase {
public void testEventBusSend() throws Exception {
EventBusTracer ebTracer = new EventBusTracer();
tracer = ebTracer;
Context receiveCtx = vertx2.getOrCreateContext();
CountDownLatch latch = new CountDownLatch(1);
receiveCtx.runOnContext(v -> {
vertx2.runOnContext(v -> {
Context ctx = vertx2.getOrCreateContext();
vertx2.eventBus().consumer("the_address", msg -> {
assertNotSame(Vertx.currentContext(), receiveCtx);
assertSameEventLoop(receiveCtx, Vertx.currentContext());
assertNotSame(Vertx.currentContext(), ctx);
assertSameEventLoop(ctx, Vertx.currentContext());
assertEquals("msg", msg.body());
});
latch.countDown();
});
awaitLatch(latch);
Context sendCtx = vertx1.getOrCreateContext();
sendCtx.runOnContext(v -> {
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) sendCtx).localContextData();
vertx1.runOnContext(v -> {
Context ctx = vertx1.getOrCreateContext();
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) ctx).localContextData();
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
vertx1.eventBus().send("the_address", "msg");
});
@@ -162,20 +163,30 @@ public abstract class EventBusTracerTestBase extends VertxTestBase {
}
@Test
public void testEventBusRequestReply() {
public void testEventBusRequestReply() throws Exception {
EventBusTracer ebTracer = new EventBusTracer();
tracer = ebTracer;
vertx2.eventBus().consumer("the_address", msg -> {
assertEquals("msg_1", msg.body());
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
msg.reply("msg_2");
CountDownLatch latch = new CountDownLatch(1);
vertx2.runOnContext(v -> {
Context ctx = vertx2.getOrCreateContext();
vertx2.eventBus().consumer("the_address", msg -> {
assertNotSame(ctx, vertx2.getOrCreateContext());
assertSameEventLoop(ctx, vertx2.getOrCreateContext());
assertEquals("msg_1", msg.body());
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
msg.reply("msg_2");
});
latch.countDown();
});
Context ctx = vertx1.getOrCreateContext();
ctx.runOnContext(v -> {
awaitLatch(latch);
vertx1.runOnContext(v -> {
Context ctx = vertx1.getOrCreateContext();
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) ctx).localContextData();
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
vertx1.eventBus().request("the_address", "msg_1", onSuccess(reply -> {
assertSame(ctx, vertx1.getOrCreateContext());
assertSameEventLoop(ctx, vertx1.getOrCreateContext());
}));
});
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 4);
@@ -234,27 +245,35 @@ public abstract class EventBusTracerTestBase extends VertxTestBase {
vertx2.eventBus().request("the_address", "msg", new DeliveryOptions().setSendTimeout(100), onFailure(failure -> {
}));
});
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 4);
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 3);
assertEquals(Arrays.asList("sendRequest[the_address]", "receiveResponse[TIMEOUT]"), ebTracer.sendEvents);
assertEquals(Arrays.asList("receiveRequest[the_address]", "sendResponse[]"), ebTracer.receiveEvents);
assertEquals(Arrays.asList("receiveRequest[the_address]"), ebTracer.receiveEvents);
}
@Test
public void testEventBusRequestReplyReply() {
public void testEventBusRequestReplyReply() throws Exception {
EventBusTracer ebTracer = new EventBusTracer();
tracer = ebTracer;
vertx2.eventBus().consumer("the_address", msg -> {
CountDownLatch latch = new CountDownLatch(1);
vertx2.runOnContext(v -> {
Context ctx = vertx2.getOrCreateContext();
assertEquals("msg_1", msg.body());
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
msg.replyAndRequest("msg_2", reply -> {
assertNotSame(ctx, vertx2.getOrCreateContext());
assertSameEventLoop(ctx, vertx2.getOrCreateContext());
vertx2.eventBus().consumer("the_address", msg -> {
Context consumerCtx = vertx2.getOrCreateContext();
assertNotSame(ctx, consumerCtx);
assertSameEventLoop(ctx, consumerCtx);
assertEquals("msg_1", msg.body());
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
msg.replyAndRequest("msg_2", reply -> {
assertSame(consumerCtx, vertx2.getOrCreateContext());
assertSameEventLoop(consumerCtx, vertx2.getOrCreateContext());
});
});
latch.countDown();
});
Context ctx = vertx1.getOrCreateContext();
ctx.runOnContext(v -> {
awaitLatch(latch);
vertx1.runOnContext(v -> {
Context ctx = vertx1.getOrCreateContext();
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) ctx).localContextData();
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
vertx1.eventBus().request("the_address", "msg_1", onSuccess(reply -> {