Introduce specific classes to split behavior/state between MessageConsumer implementation and ReplyHandler implementations delegating to HandlerRegistration

This commit is contained in:
Julien Viet
2019-10-19 15:37:37 +02:00
parent 157f59dcf4
commit 474b35eb48
9 changed files with 469 additions and 404 deletions

View File

@@ -27,8 +27,6 @@ import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -115,7 +113,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
ReplyHandler<T> handler = createReplyHandler(msg, true, options);
sendOrPubInternal(msg, options, handler, null);
return handler.result.future();
return handler.result();
}
@Override
@@ -159,7 +157,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
public <T> MessageConsumer<T> consumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new HandlerRegistration<>(vertx, vertx.getOrCreateContext(), metrics, this, address, null, false, false);
return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, false);
}
@Override
@@ -174,7 +172,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
public <T> MessageConsumer<T> localConsumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new HandlerRegistration<>(vertx, vertx.getOrCreateContext(), metrics, this, address, null, true, false);
return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, true);
}
@Override
@@ -239,17 +237,18 @@ public class EventBusImpl implements EventBus, MetricsProvider {
return msg;
}
protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(registration.getHandler(), "handler");
protected <T> HandlerHolder<T> addRegistration(String address,
HandlerRegistration<T> registration,
boolean replyHandler,
boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
// Objects.requireNonNull(registration.getHandler(), "handler");
LocalRegistrationResult<T> result = addLocalRegistration(address, registration, replyHandler, localOnly);
addRegistration(result.newAddress, address, replyHandler, localOnly, registration::setResult);
addRegistration(result.newAddress, result.holder, completionHandler);
return result.holder;
}
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
protected <T> void addRegistration(boolean newAddress, HandlerHolder<T> holder, Handler<AsyncResult<Void>> completionHandler) {
completionHandler.handle(Future.succeededFuture());
}
@@ -395,59 +394,11 @@ public class EventBusImpl implements EventBus, MetricsProvider {
long timeout = options.getSendTimeout();
String replyAddress = generateReplyAddress();
message.setReplyAddress(replyAddress);
HandlerRegistration<T> registration = new HandlerRegistration<>(vertx, vertx.getOrCreateContext(), metrics, this, replyAddress, message.address, true, src);
ReplyHandler<T> handler = new ReplyHandler<>(registration, timeout);
registration.handler(handler);
ReplyHandler<T> handler = new ReplyHandler<>(this, vertx.getOrCreateContext(), replyAddress, message.address, src, timeout);
handler.register();
return handler;
}
public class ReplyHandler<T> implements Handler<Message<T>> {
final ContextInternal context;
final Promise<Message<T>> result;
final HandlerRegistration<T> registration;
final long timeoutID;
public Object trace;
ReplyHandler(HandlerRegistration<T> registration, long timeout) {
this.context = registration.context;
this.result = Promise.promise();
this.registration = registration;
this.timeoutID = vertx.setTimer(timeout, id -> {
fail(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + registration.address + ", repliedAddress: " + registration.repliedAddress));
});
}
private void trace(Object reply, Throwable failure) {
ContextInternal ctx = registration.context;
VertxTracer tracer = ctx.tracer();
if (tracer != null && registration.src) {
tracer.receiveResponse(ctx, reply, trace, failure, TagExtractor.empty());
}
}
void fail(ReplyException failure) {
registration.unregister();
if (metrics != null) {
metrics.replyFailure(registration.repliedAddress, failure.failureType());
}
trace(null, failure);
result.tryFail(failure);
}
@Override
public void handle(Message<T> reply) {
vertx.cancelTimer(timeoutID);
if (reply.body() instanceof ReplyException) {
// This is kind of clunky - but hey-ho
fail((ReplyException) reply.body());
} else {
trace(reply, null);
result.complete(reply);
}
}
}
public <T> OutboundDeliveryContext<T> newSendContext(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler, Promise<Void> writePromise) {
ContextInternal ctx = vertx.getContext();
@@ -476,7 +427,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
// Unregister all handlers explicitly - don't rely on context hooks
for (ConcurrentCyclicSequence<HandlerHolder> handlers: handlerMap.values()) {
for (HandlerHolder holder: handlers) {
holder.getHandler().unregister();
holder.getHandler().unregister(ar -> {});
}
}
}
@@ -490,11 +441,11 @@ public class EventBusImpl implements EventBus, MetricsProvider {
// before it was received
try {
if (!holder.isRemoved()) {
holder.getHandler().handle(copied);
holder.getHandler().receive(copied);
}
} finally {
if (holder.isReplyHandler()) {
holder.getHandler().unregister();
holder.getHandler().unregister(ar -> {});
}
}
});

View File

@@ -18,11 +18,11 @@ import io.vertx.core.Context;
*/
public class HandlerHolder<T> {
private final Context context;
final String address;
private final HandlerRegistration<T> handler;
private final boolean replyHandler;
private final boolean localOnly;
public final Context context;
public final String address;
public final HandlerRegistration<T> handler;
public final boolean replyHandler;
public final boolean localOnly;
private boolean removed;
public HandlerHolder(HandlerRegistration<T> handler,

View File

@@ -8,348 +8,96 @@
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.eventbus.impl;
import io.vertx.core.*;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryContext;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.streams.ReadStream;
import java.util.*;
import java.util.Iterator;
/*
* This class is optimised for performance when used on the same event loop it was created on.
* However it can be used safely from other threads.
*
* The internal state is protected using the synchronized keyword. If always used on the same event loop, then
* we benefit from biased locking which makes the overhead of synchronized near zero.
*/
public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Message<T>> {
abstract class HandlerRegistration<T> {
private static final Logger log = LoggerFactory.getLogger(HandlerRegistration.class);
public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
private final Vertx vertx;
private final EventBusMetrics metrics;
private final EventBusImpl eventBus;
final ContextInternal context;
final String address;
final String repliedAddress;
private final boolean localOnly;
protected final boolean src;
public final ContextInternal context;
public final EventBusImpl bus;
public final String address;
public final boolean src;
private HandlerHolder<T> registered;
private Handler<Message<T>> handler;
private AsyncResult<Void> result;
private Handler<AsyncResult<Void>> completionHandler;
private Handler<Void> endHandler;
private Handler<Message<T>> discardHandler;
private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
private final Queue<Message<T>> pending = new ArrayDeque<>(8);
private long demand = Long.MAX_VALUE;
private Object metric;
public HandlerRegistration(Vertx vertx, ContextInternal context, EventBusMetrics metrics, EventBusImpl eventBus, String address,
String repliedAddress, boolean localOnly, boolean src) {
this.vertx = vertx;
this.metrics = metrics;
this.eventBus = eventBus;
this.address = address;
this.repliedAddress = repliedAddress;
this.localOnly = localOnly;
this.src = src;
HandlerRegistration(ContextInternal context,
EventBusImpl bus,
String address,
boolean src) {
this.context = context;
this.bus = bus;
this.src = src;
this.address = address;
}
@Override
public MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages) {
Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
List<Message<T>> discarded;
Handler<Message<T>> discardHandler;
synchronized (this) {
this.maxBufferedMessages = maxBufferedMessages;
int overflow = pending.size() - maxBufferedMessages;
if (overflow <= 0) {
return this;
}
discardHandler = this.discardHandler;
if (discardHandler == null) {
while (pending.size() > maxBufferedMessages) {
pending.poll();
}
return this;
}
discarded = new ArrayList<>(overflow);
while (pending.size() > maxBufferedMessages) {
discarded.add(pending.poll());
}
final void receive(MessageImpl<?, T> msg) {
if (bus.metrics != null) {
bus.metrics.scheduleMessage(metric, msg.isLocal());
}
for (Message<T> msg : discarded) {
discardHandler.handle(msg);
doReceive(msg);
}
protected abstract void doReceive(Message<T> msg);
protected abstract void doUnregister();
synchronized Future<Void> register(String repliedAddress, boolean localOnly) {
if (registered != null) {
throw new IllegalStateException();
}
return this;
}
@Override
public synchronized int getMaxBufferedMessages() {
return maxBufferedMessages;
}
@Override
public String address() {
return address;
}
@Override
public synchronized void completionHandler(Handler<AsyncResult<Void>> completionHandler) {
Objects.requireNonNull(completionHandler);
if (result != null) {
AsyncResult<Void> value = result;
vertx.runOnContext(v -> completionHandler.handle(value));
} else {
this.completionHandler = completionHandler;
Promise<Void> p = Promise.promise();
Future<Void> fut = p.future();
registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, p);
if (bus.metrics != null) {
metric = bus.metrics.handlerRegistered(address, repliedAddress);
}
return fut;
}
@Override
public Future<Void> unregister() {
// Todo when we support multiple listeners per future
Promise<Void> promise = Promise.promise();
doUnregister(promise);
return promise.future();
}
@Override
public void unregister(Handler<AsyncResult<Void>> completionHandler) {
doUnregister(completionHandler);
}
private void doUnregister(Handler<AsyncResult<Void>> doneHandler) {
synchronized (this) {
if (handler == null) {
callHandlerAsync(Future.succeededFuture(), doneHandler);
return;
}
handler = null;
if (endHandler != null) {
Handler<Void> theEndHandler = endHandler;
Handler<AsyncResult<Void>> handler = doneHandler;
doneHandler = ar -> {
theEndHandler.handle(null);
if (handler != null) {
handler.handle(ar);
}
};
}
if (pending.size() > 0 && discardHandler != null) {
Deque<Message<T>> discarded = new ArrayDeque<>(pending);
Handler<Message<T>> handler = discardHandler;
context.runOnContext(v -> {
Message<T> msg;
while ((msg = discarded.poll()) != null) {
handler.handle(msg);
}
});
}
pending.clear();
discardHandler = null;
eventBus.removeRegistration(registered, doneHandler);
registered = null;
if (result == null) {
result = Future.failedFuture("Consumer unregistered before registration completed");
callHandlerAsync(result, completionHandler);
} else {
EventBusMetrics metrics = eventBus.metrics;
if (metrics != null) {
metrics.handlerUnregistered(metric);
}
}
}
}
private void callHandlerAsync(AsyncResult<Void> result, Handler<AsyncResult<Void>> completionHandler) {
if (completionHandler != null) {
vertx.runOnContext(v -> completionHandler.handle(result));
}
}
public synchronized void setResult(AsyncResult<Void> result) {
if (this.result != null) {
return;
}
this.result = result;
if (result.failed()) {
log.error("Failed to propagate registration for handler " + handler + " and address " + address);
} else {
if (metrics != null) {
metric = metrics.handlerRegistered(address, repliedAddress);
}
callHandlerAsync(result, completionHandler);
}
}
@Override
public void handle(Message<T> message) {
if (metrics != null) {
metrics.scheduleMessage(metric, ((MessageImpl)message).isLocal());
}
Handler<Message<T>> theHandler;
ContextInternal ctx;
synchronized (this) {
if (registered == null) {
return;
} else if (demand == 0L) {
if (pending.size() < maxBufferedMessages) {
pending.add(message);
} else {
if (discardHandler != null) {
discardHandler.handle(message);
} else {
log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address);
}
}
return;
} else {
if (pending.size() > 0) {
pending.add(message);
message = pending.poll();
}
if (demand != Long.MAX_VALUE) {
demand--;
}
theHandler = handler;
}
ctx = context;
}
deliver(theHandler, message, ctx);
}
private void deliver(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
// Handle the message outside the sync block
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714
String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}
InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl<?, T>) message, theHandler, context);
deliveryCtx.context.dispatch(v -> {
deliveryCtx.next();
});
checkNextTick();
}
private synchronized void checkNextTick() {
// Check if there are more pending messages in the queue that can be processed next time around
if (!pending.isEmpty() && demand > 0L) {
context.runOnContext(v -> {
Message<T> message;
Handler<Message<T>> theHandler;
ContextInternal ctx;
synchronized (HandlerRegistration.this) {
if (demand == 0L || (message = pending.poll()) == null) {
return;
}
if (demand != Long.MAX_VALUE) {
demand--;
}
theHandler = handler;
ctx = context;
}
deliver(theHandler, message, ctx);
});
}
}
/*
* Internal API for testing purposes.
*/
public synchronized void discardHandler(Handler<Message<T>> handler) {
this.discardHandler = handler;
}
@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
if (h != null) {
synchronized (this) {
handler = h;
if (registered == null) {
registered = eventBus.addRegistration(address, this, repliedAddress != null, localOnly);
}
}
return this;
}
this.unregister();
return this;
}
@Override
public ReadStream<T> bodyStream() {
return new BodyReadStream<>(this);
}
@Override
public synchronized boolean isRegistered() {
return registered != null;
}
@Override
public synchronized MessageConsumer<T> pause() {
demand = 0L;
return this;
}
@Override
public MessageConsumer<T> resume() {
return fetch(Long.MAX_VALUE);
}
@Override
public synchronized MessageConsumer<T> fetch(long amount) {
if (amount < 0) {
throw new IllegalArgumentException();
public void unregister(Handler<AsyncResult<Void>> completionHandler) {
doUnregister();
synchronized (this) {
if (registered != null) {
bus.removeRegistration(registered, completionHandler);
registered = null;
if (bus.metrics != null) {
bus.metrics.handlerUnregistered(metric);
metric = null;
}
} else {
if (completionHandler != null) {
context.owner().runOnContext(v -> completionHandler.handle(Future.succeededFuture()));
}
}
}
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (demand > 0L) {
checkNextTick();
}
return this;
}
@Override
public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
if (endHandler != null) {
// We should use the HandlerHolder context to properly do this (needs small refactoring)
Context endCtx = vertx.getOrCreateContext();
this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
} else {
this.endHandler = null;
}
return this;
void dispatch(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl<?, T>) message, theHandler, context);
deliveryCtx.dispatch();
}
@Override
public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
return this;
}
public Handler<Message<T>> getHandler() {
return handler;
}
protected class InboundDeliveryContext implements DeliveryContext<T> {
private class InboundDeliveryContext implements DeliveryContext<T> {
private final MessageImpl<?, T> message;
private final Iterator<Handler<DeliveryContext>> iter;
@@ -359,10 +107,14 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
private InboundDeliveryContext(MessageImpl<?, T> message, Handler<Message<T>> handler, ContextInternal context) {
this.message = message;
this.handler = handler;
this.iter = eventBus.receiveInterceptors();
this.iter = message.bus.receiveInterceptors();
this.context = context;
}
// Temporary workaround
this.context = handler instanceof EventBusImpl.ReplyHandler ? ((EventBusImpl.ReplyHandler)handler).context : context.duplicate();
void dispatch() {
context.dispatch(v -> {
next();
});
}
@Override
@@ -392,11 +144,12 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
local = false;
}
}
Object m = metric;
try {
if (metrics != null) {
metrics.beginHandleMessage(metric, local);
if (bus.metrics != null) {
bus.metrics.beginHandleMessage(m, local);
}
VertxTracer tracer = HandlerRegistration.this.context.tracer();
VertxTracer tracer = context.tracer();
if (tracer != null && !src) {
message.trace = tracer.receiveRequest(context, message, message.isSend() ? "send" : "publish", message.headers, MessageTagExtractor.INSTANCE);
handler.handle(message);
@@ -406,13 +159,13 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
} else {
handler.handle(message);
}
if (metrics != null) {
metrics.endHandleMessage(metric, null);
if (bus.metrics != null) {
bus.metrics.endHandleMessage(m, null);
}
} catch (Exception e) {
log.error("Failed to handleMessage. address: " + message.address(), e);
if (metrics != null) {
metrics.endHandleMessage(metric, e);
if (bus.metrics != null) {
bus.metrics.endHandleMessage(m, e);
}
context.reportException(e);
}
@@ -429,5 +182,4 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
return message.receivedBody;
}
}
}

View File

@@ -0,0 +1,284 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.eventbus.impl;
import io.vertx.core.*;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import java.util.*;
/*
* This class is optimised for performance when used on the same event loop it was created on.
* However it can be used safely from other threads.
*
* The internal state is protected using the synchronized keyword. If always used on the same event loop, then
* we benefit from biased locking which makes the overhead of synchronized near zero.
*/
public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements MessageConsumer<T> {
private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class);
private static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
private final Vertx vertx;
private final ContextInternal context;
private final EventBusImpl eventBus;
private final String address;
private final boolean localOnly;
private Handler<Message<T>> handler;
private Handler<AsyncResult<Void>> completionHandler;
private Handler<Void> endHandler;
private Handler<Message<T>> discardHandler;
private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
private Queue<Message<T>> pending = new ArrayDeque<>(8);
private long demand = Long.MAX_VALUE;
private Future<Void> result;
MessageConsumerImpl(Vertx vertx, ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly) {
super(context, eventBus, address, false);
this.vertx = vertx;
this.context = context;
this.eventBus = eventBus;
this.address = address;
this.localOnly = localOnly;
}
@Override
public MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages) {
Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
List<Message<T>> discarded;
Handler<Message<T>> discardHandler;
synchronized (this) {
this.maxBufferedMessages = maxBufferedMessages;
int overflow = pending.size() - maxBufferedMessages;
if (overflow <= 0) {
return this;
}
discardHandler = this.discardHandler;
if (discardHandler == null) {
while (pending.size() > maxBufferedMessages) {
pending.poll();
}
return this;
}
discarded = new ArrayList<>(overflow);
while (pending.size() > maxBufferedMessages) {
discarded.add(pending.poll());
}
}
for (Message<T> msg : discarded) {
discardHandler.handle(msg);
}
return this;
}
@Override
public synchronized int getMaxBufferedMessages() {
return maxBufferedMessages;
}
@Override
public String address() {
return address;
}
@Override
public synchronized void completionHandler(Handler<AsyncResult<Void>> handler) {
Objects.requireNonNull(handler);
completionHandler = handler;
checkCompletionHandler();
}
@Override
public Future<Void> unregister() {
// Todo when we support multiple listeners per future
Promise<Void> promise = Promise.promise();
unregister(promise);
return promise.future();
}
protected synchronized void doUnregister() {
handler = null;
if (endHandler != null) {
endHandler.handle(null);
}
if (pending.size() > 0 && discardHandler != null) {
Queue<Message<T>> discarded = pending;
Handler<Message<T>> handler = discardHandler;
pending = new ArrayDeque<>();
context.runOnContext(v -> {
Message<T> msg;
while ((msg = discarded.poll()) != null) {
handler.handle(msg);
}
});
}
result = Future.failedFuture("blah");
checkCompletionHandler();
discardHandler = null;
result = null;
}
protected void doReceive(Message<T> message) {
Handler<Message<T>> theHandler;
synchronized (this) {
if (!isRegistered()) {
return;
} else if (demand == 0L) {
if (pending.size() < maxBufferedMessages) {
pending.add(message);
} else {
if (discardHandler != null) {
discardHandler.handle(message);
} else {
log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address);
}
}
return;
} else {
if (pending.size() > 0) {
pending.add(message);
message = pending.poll();
}
if (demand != Long.MAX_VALUE) {
demand--;
}
theHandler = handler;
}
}
deliver(theHandler, message);
}
private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
// Handle the message outside the sync block
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714
String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}
dispatch(theHandler, message, context.duplicate());
checkNextTick();
}
private synchronized void checkNextTick() {
// Check if there are more pending messages in the queue that can be processed next time around
if (!pending.isEmpty() && demand > 0L) {
context.runOnContext(v -> {
Message<T> message;
Handler<Message<T>> theHandler;
synchronized (MessageConsumerImpl.this) {
if (demand == 0L || (message = pending.poll()) == null) {
return;
}
if (demand != Long.MAX_VALUE) {
demand--;
}
theHandler = handler;
}
deliver(theHandler, message);
});
}
}
/*
* Internal API for testing purposes.
*/
public synchronized void discardHandler(Handler<Message<T>> handler) {
this.discardHandler = handler;
}
@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
if (h != null) {
synchronized (this) {
handler = h;
if (result == null) {
result = register(null, localOnly);
result.setHandler(ar -> checkCompletionHandler());
}
}
} else {
unregister();
}
return this;
}
private synchronized void checkCompletionHandler() {
Handler<AsyncResult<Void>> completionHandler = this.completionHandler;
Future<Void> result = this.result;
if (completionHandler != null && result != null && result.isComplete()) {
this.completionHandler = null;
this.result = null;
context.runOnContext(v -> {
completionHandler.handle(result);
});
}
}
@Override
public ReadStream<T> bodyStream() {
return new BodyReadStream<>(this);
}
@Override
public synchronized MessageConsumer<T> pause() {
demand = 0L;
return this;
}
@Override
public MessageConsumer<T> resume() {
return fetch(Long.MAX_VALUE);
}
@Override
public synchronized MessageConsumer<T> fetch(long amount) {
if (amount < 0) {
throw new IllegalArgumentException();
}
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (demand > 0L) {
checkNextTick();
}
return this;
}
@Override
public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
if (endHandler != null) {
// We should use the HandlerHolder context to properly do this (needs small refactoring)
Context endCtx = vertx.getOrCreateContext();
this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
} else {
this.endHandler = null;
}
return this;
}
@Override
public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
return this;
}
public synchronized Handler<Message<T>> getHandler() {
return handler;
}
}

View File

@@ -115,9 +115,9 @@ public class MessageImpl<U, V> implements Message<V> {
public <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options) {
if (replyAddress != null) {
MessageImpl reply = createReply(message, options);
EventBusImpl.ReplyHandler<R> handler = bus.createReplyHandler(reply, false, options);
ReplyHandler<R> handler = bus.createReplyHandler(reply, false, options);
bus.sendReply(reply, options, handler);
return handler.result.future();
return handler.result();
} else {
throw new IllegalStateException();
}

View File

@@ -31,7 +31,7 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
public final ContextInternal ctx;
public final MessageImpl message;
public final DeliveryOptions options;
public final EventBusImpl.ReplyHandler<T> replyHandler;
public final ReplyHandler<T> replyHandler;
private final Promise<Void> writePromise;
private boolean src;
@@ -39,7 +39,7 @@ public class OutboundDeliveryContext<T> implements DeliveryContext<T>, Handler<A
EventBusImpl bus;
EventBusMetrics metrics;
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler<T> replyHandler, Promise<Void> writePromise) {
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler, Promise<Void> writePromise) {
this.ctx = ctx;
this.message = message;
this.options = options;

View File

@@ -0,0 +1,80 @@
package io.vertx.core.eventbus.impl;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
class ReplyHandler<T> extends HandlerRegistration<T> implements Handler<Message<T>> {
private final EventBusImpl eventBus;
private final ContextInternal context;
private final Promise<Message<T>> result;
private final long timeoutID;
private final boolean src;
private final String repliedAddress;
Object trace;
ReplyHandler(EventBusImpl eventBus, ContextInternal context, String address, String repliedAddress, boolean src, long timeout) {
super(context, eventBus, address, src);
this.eventBus = eventBus;
this.context = context;
this.result = Promise.promise();
this.src = src;
this.repliedAddress = repliedAddress;
this.timeoutID = eventBus.vertx.setTimer(timeout, id -> {
fail(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress));
});
}
private void trace(Object reply, Throwable failure) {
VertxTracer tracer = context.tracer();
if (tracer != null && src) {
tracer.receiveResponse(context, reply, trace, failure, TagExtractor.empty());
}
}
Future<Message<T>> result() {
return result.future();
}
void fail(ReplyException failure) {
unregister(ar -> {});
if (eventBus.metrics != null) {
eventBus.metrics.replyFailure(repliedAddress, failure.failureType());
}
trace(null, failure);
result.tryFail(failure);
}
@Override
protected void doReceive(Message<T> reply) {
dispatch(this, reply, context);
}
@Override
protected void doUnregister() {
}
void register() {
register(repliedAddress, true);
}
@Override
public void handle(Message<T> reply) {
eventBus.vertx.cancelTimer(timeoutID);
if (reply.body() instanceof ReplyException) {
// This is kind of clunky - but hey-ho
fail((ReplyException) reply.body());
} else {
trace(reply, null);
result.complete(reply);
}
}
}

View File

@@ -183,13 +183,11 @@ public class ClusteredEventBus extends EventBusImpl {
}
@Override
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !replyHandler && !localOnly) {
protected <T> void addRegistration(boolean newAddress, HandlerHolder<T> holder, Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !holder.replyHandler && !holder.localOnly) {
// Propagate the information
subs.add(address, nodeInfo, completionHandler);
ownSubs.add(address);
subs.add(holder.address, nodeInfo, completionHandler);
ownSubs.add(holder.address);
} else {
completionHandler.handle(Future.succeededFuture());
}

View File

@@ -12,7 +12,7 @@
package io.vertx.core.eventbus;
import io.vertx.core.*;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.eventbus.impl.MessageConsumerImpl;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.ContextInternal;
@@ -1043,7 +1043,7 @@ public class LocalEventBusTest extends EventBusTestBase {
};
MessageConsumer<String> reg = eb.<String>consumer(ADDRESS1).setMaxBufferedMessages(10);
ReadStream<?> controller = register.apply(reg, handler);
((HandlerRegistration<String>) reg).discardHandler(msg -> {
((MessageConsumerImpl<String>) reg).discardHandler(msg -> {
assertEquals(data[10], msg.body());
expected.addAll(Arrays.asList(data).subList(0, 10));
controller.resume();
@@ -1072,7 +1072,7 @@ public class LocalEventBusTest extends EventBusTestBase {
}
List<String> received = Collections.synchronizedList(new ArrayList<>());
CountDownLatch receiveLatch = new CountDownLatch(4);
HandlerRegistration<String> consumer = (HandlerRegistration<String>) eb.<String>consumer(ADDRESS1).setMaxBufferedMessages(5);
MessageConsumerImpl<String> consumer = (MessageConsumerImpl<String>) eb.<String>consumer(ADDRESS1).setMaxBufferedMessages(5);
streamSupplier.apply(consumer, e -> {
received.add(e);
receiveLatch.countDown();
@@ -1105,7 +1105,7 @@ public class LocalEventBusTest extends EventBusTestBase {
// Let enough time of the 20 messages to go in the consumer pending queue
vertx.setTimer(20, v -> {
AtomicInteger count = new AtomicInteger(1);
((HandlerRegistration<Integer>)consumer).discardHandler(discarded -> {
((MessageConsumerImpl<Integer>)consumer).discardHandler(discarded -> {
int val = discarded.body();
assertEquals(count.getAndIncrement(), val);
if (val == 9) {
@@ -1149,7 +1149,7 @@ public class LocalEventBusTest extends EventBusTestBase {
};
MessageConsumer<String> reg = eb.<String>consumer(ADDRESS1).setMaxBufferedMessages(10);
ReadStream<?> controller = register.apply(reg, handler);
((HandlerRegistration<String>) reg).discardHandler(msg -> {
((MessageConsumerImpl<String>) reg).discardHandler(msg -> {
assertEquals(data[10], msg.body());
expected.addAll(Arrays.asList(data).subList(0, 10));
controller.resume();
@@ -1409,7 +1409,7 @@ public class LocalEventBusTest extends EventBusTestBase {
Context ctx = Vertx.currentContext();
ctx.runOnContext(v -> {
consumer.resume();
((HandlerRegistration<?>) consumer).discardHandler(discarded -> {
((MessageConsumerImpl<?>) consumer).discardHandler(discarded -> {
assertEquals("val1", discarded.body());
testComplete();
});