mirror of
https://github.com/jlengrand/vert.x.git
synced 2026-03-10 08:51:19 +00:00
Use promise first in EventBus instead of async result handler
This commit is contained in:
@@ -17,7 +17,6 @@ import io.vertx.codegen.annotations.VertxGen;
|
||||
import io.vertx.core.AsyncResult;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.Promise;
|
||||
import io.vertx.core.metrics.Measured;
|
||||
|
||||
/**
|
||||
@@ -70,15 +69,15 @@ public interface EventBus extends Measured {
|
||||
* @return a reference to this, so the API can be used fluently
|
||||
*/
|
||||
@Fluent
|
||||
<T> EventBus request(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);
|
||||
default <T> EventBus request(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler) {
|
||||
return request(address, message, new DeliveryOptions(), replyHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Like {@link #request(String, Object, Handler)} but returns a {@code Future} of the asynchronous result
|
||||
*/
|
||||
default <T> Future<Message<T>> request(String address, Object message) {
|
||||
Promise<Message<T>> promise = Promise.promise();
|
||||
request(address, message, promise);
|
||||
return promise.future();
|
||||
return request(address, message, new DeliveryOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -91,16 +90,16 @@ public interface EventBus extends Measured {
|
||||
* @return a reference to this, so the API can be used fluently
|
||||
*/
|
||||
@Fluent
|
||||
<T> EventBus request(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler);
|
||||
default <T> EventBus request(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
|
||||
Future<Message<T>> reply = request(address, message, options);
|
||||
reply.setHandler(replyHandler);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Like {@link #request(String, Object, DeliveryOptions, Handler)} but returns a {@code Future} of the asynchronous result
|
||||
*/
|
||||
default <T> Future<Message<T>> request(String address, Object message, DeliveryOptions options) {
|
||||
Promise<Message<T>> promise = Promise.promise();
|
||||
request(address, message, options, promise);
|
||||
return promise.future();
|
||||
}
|
||||
<T> Future<Message<T>> request(String address, Object message, DeliveryOptions options);
|
||||
|
||||
/**
|
||||
* Publish a message.<p>
|
||||
|
||||
@@ -18,7 +18,6 @@ import io.vertx.core.AsyncResult;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.MultiMap;
|
||||
import io.vertx.core.Promise;
|
||||
|
||||
/**
|
||||
* Represents a message that is received from the event bus in a handler.
|
||||
@@ -78,7 +77,9 @@ public interface Message<T> {
|
||||
*
|
||||
* @param message the message to reply with.
|
||||
*/
|
||||
void reply(Object message);
|
||||
default void reply(Object message) {
|
||||
reply(message, new DeliveryOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
* Link {@link #reply(Object)} but allows you to specify delivery options for the reply.
|
||||
@@ -99,15 +100,15 @@ public interface Message<T> {
|
||||
* @param message the message to reply with.
|
||||
* @param replyHandler the reply handler for the reply.
|
||||
*/
|
||||
<R> void replyAndRequest(Object message, Handler<AsyncResult<Message<R>>> replyHandler);
|
||||
default <R> void replyAndRequest(Object message, Handler<AsyncResult<Message<R>>> replyHandler) {
|
||||
replyAndRequest(message, new DeliveryOptions(), replyHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Like {@link #replyAndRequest(Object, Handler)} but returns a {@code Future} of the asynchronous result
|
||||
*/
|
||||
default <R> Future<Message<R>> replyAndRequest(Object message) {
|
||||
Promise<Message<R>> promise = Promise.promise();
|
||||
replyAndRequest(message, promise);
|
||||
return promise.future();
|
||||
return replyAndRequest(message, new DeliveryOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -118,16 +119,14 @@ public interface Message<T> {
|
||||
* @param options delivery options
|
||||
* @param replyHandler reply handler will be called when any reply from the recipient is received
|
||||
*/
|
||||
<R> void replyAndRequest(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler);
|
||||
default <R> void replyAndRequest(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
|
||||
this.<R>replyAndRequest(message, options).setHandler(replyHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Like {@link #replyAndRequest(Object, DeliveryOptions, Handler)} but returns a {@code Future} of the asynchronous result
|
||||
*/
|
||||
default <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options) {
|
||||
Promise<Message<R>> promise = Promise.promise();
|
||||
replyAndRequest(message, options, promise);
|
||||
return promise.future();
|
||||
}
|
||||
<R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options);
|
||||
|
||||
/**
|
||||
* Signal to the sender that processing of this message failed.
|
||||
@@ -138,6 +137,8 @@ public interface Message<T> {
|
||||
* @param failureCode A failure code to pass back to the sender
|
||||
* @param message A message to pass back to the sender
|
||||
*/
|
||||
void fail(int failureCode, String message);
|
||||
default void fail(int failureCode, String message) {
|
||||
reply(new ReplyException(ReplyFailure.RECIPIENT_FAILURE, failureCode, message));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -101,23 +101,22 @@ public class EventBusImpl implements EventBus, MetricsProvider {
|
||||
|
||||
@Override
|
||||
public EventBus send(String address, Object message) {
|
||||
return request(address, message, new DeliveryOptions(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> EventBus request(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler) {
|
||||
return request(address, message, new DeliveryOptions(), replyHandler);
|
||||
return send(address, message, new DeliveryOptions());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventBus send(String address, Object message, DeliveryOptions options) {
|
||||
return request(address, message, options, null);
|
||||
MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName(), null);
|
||||
sendOrPubInternal(msg, options, null);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> EventBus request(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
|
||||
sendOrPubInternal(createMessage(true, true, address, options.getHeaders(), message, options.getCodecName(), null), options, replyHandler);
|
||||
return this;
|
||||
public <T> Future<Message<T>> request(String address, Object message, DeliveryOptions options) {
|
||||
MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName(), null);
|
||||
ReplyHandler<T> handler = createReplyHandler(msg, true, options);
|
||||
sendOrPubInternal(msg, options, handler);
|
||||
return handler.result.future();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -233,7 +232,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Handler<AsyncResult<Void>> writeHandler) {
|
||||
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Promise<Void> writeHandler) {
|
||||
Objects.requireNonNull(address, "no null address accepted");
|
||||
MessageCodec codec = codecManager.lookupCodec(body, codecName);
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -313,7 +312,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
|
||||
}
|
||||
|
||||
protected <T> void sendReply(MessageImpl replyMessage, MessageImpl replierMessage, DeliveryOptions options,
|
||||
Handler<AsyncResult<Message<T>>> replyHandler) {
|
||||
ReplyHandler<T> replyHandler) {
|
||||
if (replyMessage.address() == null) {
|
||||
throw new IllegalStateException("address not specified");
|
||||
} else {
|
||||
@@ -322,8 +321,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
|
||||
// Guarantees the order when there is no current context in clustered mode
|
||||
ctx = sendNoContext;
|
||||
}
|
||||
ReplyHandler<T> handler = replyHandler != null ? createReplyHandler(replyMessage, replierMessage.src, options, replyHandler) : null;
|
||||
new OutboundDeliveryContext<>(ctx, replyMessage, options, handler, replierMessage).next();
|
||||
new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage).next();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -400,10 +398,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
|
||||
}
|
||||
if (holder != null) {
|
||||
deliverToHandler(msg, holder);
|
||||
Handler<AsyncResult<Void>> handler = msg.writeHandler;
|
||||
if (handler != null) {
|
||||
handler.handle(Future.succeededFuture());
|
||||
}
|
||||
msg.written(null);
|
||||
} else {
|
||||
// RACY issue !!!!!
|
||||
}
|
||||
@@ -415,10 +410,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
|
||||
for (HandlerHolder holder: handlers) {
|
||||
deliverToHandler(msg, holder);
|
||||
}
|
||||
Handler<AsyncResult<Void>> handler = msg.writeHandler;
|
||||
if (handler != null) {
|
||||
handler.handle(Future.succeededFuture());
|
||||
}
|
||||
msg.written(null);
|
||||
}
|
||||
return null;
|
||||
} else {
|
||||
@@ -426,10 +418,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
|
||||
metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), 0);
|
||||
}
|
||||
ReplyException failure = new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
|
||||
Handler<AsyncResult<Void>> handler = msg.writeHandler;
|
||||
if (handler != null) {
|
||||
handler.handle(Future.failedFuture(failure));
|
||||
}
|
||||
msg.written(failure);
|
||||
return failure;
|
||||
}
|
||||
}
|
||||
@@ -444,28 +433,28 @@ public class EventBusImpl implements EventBus, MetricsProvider {
|
||||
return "__vertx.reply." + Long.toString(replySequence.incrementAndGet());
|
||||
}
|
||||
|
||||
private <T> ReplyHandler<T> createReplyHandler(MessageImpl message,
|
||||
<T> ReplyHandler<T> createReplyHandler(MessageImpl message,
|
||||
boolean src,
|
||||
DeliveryOptions options,
|
||||
Handler<AsyncResult<Message<T>>> replyHandler) {
|
||||
DeliveryOptions options) {
|
||||
long timeout = options.getSendTimeout();
|
||||
String replyAddress = generateReplyAddress();
|
||||
message.setReplyAddress(replyAddress);
|
||||
HandlerRegistration<T> registration = new HandlerRegistration<>(vertx, vertx.getOrCreateContext(), metrics, this, replyAddress, message.address, true, src);
|
||||
ReplyHandler<T> handler = new ReplyHandler<>(registration, timeout);
|
||||
handler.result.future().setHandler(replyHandler);
|
||||
registration.handler(handler);
|
||||
return handler;
|
||||
}
|
||||
|
||||
public class ReplyHandler<T> implements Handler<Message<T>> {
|
||||
|
||||
final ContextInternal context;
|
||||
final Promise<Message<T>> result;
|
||||
final HandlerRegistration<T> registration;
|
||||
final long timeoutID;
|
||||
public Object trace;
|
||||
|
||||
ReplyHandler(HandlerRegistration<T> registration, long timeout) {
|
||||
this.context = registration.context;
|
||||
this.result = Promise.promise();
|
||||
this.registration = registration;
|
||||
this.timeoutID = vertx.setTimer(timeout, id -> {
|
||||
@@ -504,9 +493,8 @@ public class EventBusImpl implements EventBus, MetricsProvider {
|
||||
}
|
||||
|
||||
public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
|
||||
Handler<AsyncResult<Message<T>>> replyHandler) {
|
||||
ReplyHandler<T> handler) {
|
||||
checkStarted();
|
||||
ReplyHandler<T> handler = replyHandler != null ? createReplyHandler(message, true, options, replyHandler) : null;
|
||||
ContextInternal ctx = vertx.getContext();
|
||||
if (ctx == null) {
|
||||
// Guarantees the order when there is no current context in clustered mode
|
||||
|
||||
@@ -123,6 +123,7 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
|
||||
|
||||
@Override
|
||||
public Future<Void> unregister() {
|
||||
// Todo when we support multiple listeners per future
|
||||
Promise<Void> promise = Promise.promise();
|
||||
doUnregister(promise);
|
||||
return promise.future();
|
||||
@@ -359,7 +360,9 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
|
||||
this.message = message;
|
||||
this.handler = handler;
|
||||
this.iter = eventBus.receiveInterceptors();
|
||||
this.context = message.src ? context : context.duplicate();
|
||||
|
||||
// Temporary workaround
|
||||
this.context = handler instanceof EventBusImpl.ReplyHandler ? ((EventBusImpl.ReplyHandler)handler).context : context.duplicate();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -12,8 +12,10 @@
|
||||
package io.vertx.core.eventbus.impl;
|
||||
|
||||
import io.vertx.core.AsyncResult;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.MultiMap;
|
||||
import io.vertx.core.Promise;
|
||||
import io.vertx.core.eventbus.*;
|
||||
import io.vertx.core.http.CaseInsensitiveHeaders;
|
||||
import io.vertx.core.impl.logging.Logger;
|
||||
@@ -38,7 +40,7 @@ public class MessageImpl<U, V> implements Message<V> {
|
||||
protected U sentBody;
|
||||
protected V receivedBody;
|
||||
protected boolean send;
|
||||
protected Handler<AsyncResult<Void>> writeHandler;
|
||||
protected Promise<Void> write;
|
||||
|
||||
public MessageImpl(boolean src, EventBusImpl bus) {
|
||||
this.bus = bus;
|
||||
@@ -48,7 +50,7 @@ public class MessageImpl<U, V> implements Message<V> {
|
||||
public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody,
|
||||
MessageCodec<U, V> messageCodec,
|
||||
boolean send, boolean src, EventBusImpl bus,
|
||||
Handler<AsyncResult<Void>> writeHandler) {
|
||||
Promise<Void> write) {
|
||||
this.messageCodec = messageCodec;
|
||||
this.address = address;
|
||||
this.replyAddress = replyAddress;
|
||||
@@ -57,7 +59,7 @@ public class MessageImpl<U, V> implements Message<V> {
|
||||
this.send = send;
|
||||
this.bus = bus;
|
||||
this.src = src;
|
||||
this.writeHandler = writeHandler;
|
||||
this.write = write;
|
||||
}
|
||||
|
||||
protected MessageImpl(MessageImpl<U, V> other, boolean src) {
|
||||
@@ -78,7 +80,7 @@ public class MessageImpl<U, V> implements Message<V> {
|
||||
this.receivedBody = messageCodec.transform(other.sentBody);
|
||||
}
|
||||
this.send = other.send;
|
||||
this.writeHandler = other.writeHandler;
|
||||
this.write = other.write;
|
||||
}
|
||||
|
||||
public MessageImpl<U, V> copyBeforeReceive(boolean src) {
|
||||
@@ -112,31 +114,23 @@ public class MessageImpl<U, V> implements Message<V> {
|
||||
return replyAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fail(int failureCode, String message) {
|
||||
reply(new ReplyException(ReplyFailure.RECIPIENT_FAILURE, failureCode, message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reply(Object message) {
|
||||
replyAndRequest(message, new DeliveryOptions(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> void replyAndRequest(Object message, Handler<AsyncResult<Message<R>>> replyHandler) {
|
||||
replyAndRequest(message, new DeliveryOptions(), replyHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reply(Object message, DeliveryOptions options) {
|
||||
replyAndRequest(message, options, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> void replyAndRequest(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
|
||||
if (replyAddress != null) {
|
||||
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName(), null);
|
||||
bus.sendReply(reply, this, options, replyHandler);
|
||||
bus.sendReply(reply, this, options, null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options) {
|
||||
if (replyAddress != null) {
|
||||
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName(), null);
|
||||
EventBusImpl.ReplyHandler<R> handler = bus.createReplyHandler(reply, reply.src, options);
|
||||
bus.sendReply(reply, this, options, handler);
|
||||
return handler.result.future();
|
||||
} else {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,8 +143,18 @@ public class MessageImpl<U, V> implements Message<V> {
|
||||
this.replyAddress = replyAddress;
|
||||
}
|
||||
|
||||
public Handler<AsyncResult<Void>> writeHandler() {
|
||||
return writeHandler;
|
||||
public void written(Throwable failure) {
|
||||
if (write != null) {
|
||||
if (failure == null) {
|
||||
write.complete();
|
||||
} else {
|
||||
write.fail(failure);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Promise<Void> write() {
|
||||
return write;
|
||||
}
|
||||
|
||||
public MessageCodec<U, V> codec() {
|
||||
|
||||
@@ -17,6 +17,7 @@ import io.vertx.core.Handler;
|
||||
import io.vertx.core.Promise;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.eventbus.*;
|
||||
import io.vertx.core.impl.VertxInternal;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
@@ -89,13 +90,27 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
|
||||
|
||||
@Override
|
||||
public void write(T data, Handler<AsyncResult<Void>> handler) {
|
||||
if (send) {
|
||||
doSend(data, null, handler);
|
||||
} else {
|
||||
MessageImpl msg = bus.createMessage(false, true, address, options.getHeaders(), data, options.getCodecName(), handler);
|
||||
msg.writeHandler = handler;
|
||||
bus.sendOrPubInternal(msg, options, null);
|
||||
Promise<Void> promise = null;
|
||||
if (handler != null) {
|
||||
promise = Promise.promise();
|
||||
promise.future().setHandler(handler);
|
||||
}
|
||||
write(data, promise);
|
||||
}
|
||||
|
||||
private void write(T data, Promise<Void> handler) {
|
||||
MessageImpl msg = bus.createMessage(send, true, address, options.getHeaders(), data, options.getCodecName(), handler);
|
||||
if (send) {
|
||||
synchronized (this) {
|
||||
if (credits > 0) {
|
||||
credits--;
|
||||
} else {
|
||||
pending.add(msg);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
bus.sendOrPubInternal(msg, options, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -162,16 +177,6 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
|
||||
super.finalize();
|
||||
}
|
||||
|
||||
private synchronized <R> void doSend(T data, Handler<AsyncResult<Message<R>>> replyHandler, Handler<AsyncResult<Void>> handler) {
|
||||
MessageImpl msg = bus.createMessage(true, true, address, options.getHeaders(), data, options.getCodecName(), handler);
|
||||
if (credits > 0) {
|
||||
credits--;
|
||||
bus.sendOrPubInternal(msg, options, replyHandler);
|
||||
} else {
|
||||
pending.add(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void doReceiveCredit(int credit) {
|
||||
credits += credit;
|
||||
while (credits > 0) {
|
||||
|
||||
@@ -176,7 +176,7 @@ public class ClusteredEventBus extends EventBusImpl {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Handler<AsyncResult<Void>> writeHandler) {
|
||||
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Promise<Void> writeHandler) {
|
||||
Objects.requireNonNull(address, "no null address accepted");
|
||||
MessageCodec codec = codecManager.lookupCodec(body, codecName);
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -237,10 +237,7 @@ public class ClusteredEventBus extends EventBusImpl {
|
||||
}
|
||||
} else {
|
||||
log.error("Failed to send message", asyncResult.cause());
|
||||
Handler<AsyncResult<Void>> handler = sendContext.message.writeHandler();
|
||||
if (handler != null) {
|
||||
handler.handle(asyncResult.mapEmpty());
|
||||
}
|
||||
sendContext.message.written(asyncResult.cause());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ import io.netty.util.CharsetUtil;
|
||||
import io.vertx.core.AsyncResult;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.MultiMap;
|
||||
import io.vertx.core.Promise;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.eventbus.MessageCodec;
|
||||
import io.vertx.core.eventbus.impl.CodecManager;
|
||||
@@ -48,7 +49,7 @@ public class ClusteredMessage<U, V> extends MessageImpl<U, V> {
|
||||
}
|
||||
|
||||
public ClusteredMessage(ServerID sender, String address, String replyAddress, MultiMap headers, U sentBody,
|
||||
MessageCodec<U, V> messageCodec, boolean send, boolean src, EventBusImpl bus, Handler<AsyncResult<Void>> writeHandler) {
|
||||
MessageCodec<U, V> messageCodec, boolean send, boolean src, EventBusImpl bus, Promise<Void> writeHandler) {
|
||||
super(address, replyAddress, headers, sentBody, messageCodec, send, src, bus, writeHandler);
|
||||
this.sender = sender;
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ class ConnectionHolder {
|
||||
if (metrics != null) {
|
||||
metrics.messageWritten(message.address(), data.length());
|
||||
}
|
||||
socket.write(data, message.writeHandler());
|
||||
socket.write(data, message.write());
|
||||
} else {
|
||||
if (pending == null) {
|
||||
if (log.isDebugEnabled()) {
|
||||
@@ -110,12 +110,8 @@ class ConnectionHolder {
|
||||
synchronized (this) {
|
||||
ClusteredMessage<?, ?> msg;
|
||||
if (pending != null) {
|
||||
Future<Void> failure = Future.failedFuture(cause);
|
||||
while ((msg = pending.poll()) != null) {
|
||||
Handler<AsyncResult<Void>> handler = msg.writeHandler();
|
||||
if (handler != null) {
|
||||
handler.handle(failure);
|
||||
}
|
||||
msg.written(cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -171,7 +167,7 @@ class ConnectionHolder {
|
||||
if (metrics != null) {
|
||||
metrics.messageWritten(message.address(), data.length());
|
||||
}
|
||||
socket.write(data, message.writeHandler());
|
||||
socket.write(data, message.write());
|
||||
}
|
||||
}
|
||||
pending = null;
|
||||
|
||||
@@ -124,20 +124,20 @@ public abstract class EventBusTracerTestBase extends VertxTestBase {
|
||||
public void testEventBusSend() throws Exception {
|
||||
EventBusTracer ebTracer = new EventBusTracer();
|
||||
tracer = ebTracer;
|
||||
Context receiveCtx = vertx2.getOrCreateContext();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
receiveCtx.runOnContext(v -> {
|
||||
vertx2.runOnContext(v -> {
|
||||
Context ctx = vertx2.getOrCreateContext();
|
||||
vertx2.eventBus().consumer("the_address", msg -> {
|
||||
assertNotSame(Vertx.currentContext(), receiveCtx);
|
||||
assertSameEventLoop(receiveCtx, Vertx.currentContext());
|
||||
assertNotSame(Vertx.currentContext(), ctx);
|
||||
assertSameEventLoop(ctx, Vertx.currentContext());
|
||||
assertEquals("msg", msg.body());
|
||||
});
|
||||
latch.countDown();
|
||||
});
|
||||
awaitLatch(latch);
|
||||
Context sendCtx = vertx1.getOrCreateContext();
|
||||
sendCtx.runOnContext(v -> {
|
||||
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) sendCtx).localContextData();
|
||||
vertx1.runOnContext(v -> {
|
||||
Context ctx = vertx1.getOrCreateContext();
|
||||
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) ctx).localContextData();
|
||||
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
|
||||
vertx1.eventBus().send("the_address", "msg");
|
||||
});
|
||||
@@ -162,20 +162,30 @@ public abstract class EventBusTracerTestBase extends VertxTestBase {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventBusRequestReply() {
|
||||
public void testEventBusRequestReply() throws Exception {
|
||||
EventBusTracer ebTracer = new EventBusTracer();
|
||||
tracer = ebTracer;
|
||||
vertx2.eventBus().consumer("the_address", msg -> {
|
||||
assertEquals("msg_1", msg.body());
|
||||
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
|
||||
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
|
||||
msg.reply("msg_2");
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
vertx2.runOnContext(v -> {
|
||||
Context ctx = vertx2.getOrCreateContext();
|
||||
vertx2.eventBus().consumer("the_address", msg -> {
|
||||
assertNotSame(ctx, vertx2.getOrCreateContext());
|
||||
assertSameEventLoop(ctx, vertx2.getOrCreateContext());
|
||||
assertEquals("msg_1", msg.body());
|
||||
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
|
||||
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
|
||||
msg.reply("msg_2");
|
||||
});
|
||||
latch.countDown();
|
||||
});
|
||||
Context ctx = vertx1.getOrCreateContext();
|
||||
ctx.runOnContext(v -> {
|
||||
awaitLatch(latch);
|
||||
vertx1.runOnContext(v -> {
|
||||
Context ctx = vertx1.getOrCreateContext();
|
||||
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) ctx).localContextData();
|
||||
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
|
||||
vertx1.eventBus().request("the_address", "msg_1", onSuccess(reply -> {
|
||||
assertSame(ctx, vertx1.getOrCreateContext());
|
||||
assertSameEventLoop(ctx, vertx1.getOrCreateContext());
|
||||
}));
|
||||
});
|
||||
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 4);
|
||||
@@ -240,21 +250,29 @@ public abstract class EventBusTracerTestBase extends VertxTestBase {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventBusRequestReplyReply() {
|
||||
public void testEventBusRequestReplyReply() throws Exception {
|
||||
EventBusTracer ebTracer = new EventBusTracer();
|
||||
tracer = ebTracer;
|
||||
vertx2.eventBus().consumer("the_address", msg -> {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
vertx2.runOnContext(v -> {
|
||||
Context ctx = vertx2.getOrCreateContext();
|
||||
assertEquals("msg_1", msg.body());
|
||||
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
|
||||
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
|
||||
msg.replyAndRequest("msg_2", reply -> {
|
||||
assertNotSame(ctx, vertx2.getOrCreateContext());
|
||||
assertSameEventLoop(ctx, vertx2.getOrCreateContext());
|
||||
vertx2.eventBus().consumer("the_address", msg -> {
|
||||
Context consumerCtx = vertx2.getOrCreateContext();
|
||||
assertNotSame(ctx, consumerCtx);
|
||||
assertSameEventLoop(ctx, consumerCtx);
|
||||
assertEquals("msg_1", msg.body());
|
||||
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
|
||||
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
|
||||
msg.replyAndRequest("msg_2", reply -> {
|
||||
assertSame(consumerCtx, vertx2.getOrCreateContext());
|
||||
assertSameEventLoop(consumerCtx, vertx2.getOrCreateContext());
|
||||
});
|
||||
});
|
||||
latch.countDown();
|
||||
});
|
||||
Context ctx = vertx1.getOrCreateContext();
|
||||
ctx.runOnContext(v -> {
|
||||
awaitLatch(latch);
|
||||
vertx1.runOnContext(v -> {
|
||||
Context ctx = vertx1.getOrCreateContext();
|
||||
ConcurrentMap<Object, Object> tracerMap = ((ContextInternal) ctx).localContextData();
|
||||
tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
|
||||
vertx1.eventBus().request("the_address", "msg_1", onSuccess(reply -> {
|
||||
|
||||
Reference in New Issue
Block a user