Make OutboundDeliveryContext not anymore an inner class of EventBus

This commit is contained in:
Julien Viet
2019-10-18 09:55:24 +02:00
parent d6869caa0b
commit 79c13877b9
3 changed files with 90 additions and 65 deletions

View File

@@ -44,7 +44,7 @@ import java.util.function.BiConsumer;
*/
public class EventBusImpl implements EventBus, MetricsProvider {
private static final Logger log = LoggerFactory.getLogger(EventBusImpl.class);
static final Logger log = LoggerFactory.getLogger(EventBusImpl.class);
private final List<Handler<DeliveryContext>> sendInterceptors = new CopyOnWriteArrayList<>();
private final List<Handler<DeliveryContext>> receiveInterceptors = new CopyOnWriteArrayList<>();
@@ -321,10 +321,16 @@ public class EventBusImpl implements EventBus, MetricsProvider {
// Guarantees the order when there is no current context in clustered mode
ctx = sendNoContext;
}
new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage).next();
send(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage));
}
}
private void send(OutboundDeliveryContext ctx) {
ctx.iter = sendInterceptors.iterator();
ctx.bus = this;
ctx.next();
}
protected <T> void sendReply(OutboundDeliveryContext<T> sendContext, MessageImpl replierMessage) {
sendOrPub(sendContext);
}
@@ -500,68 +506,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
// Guarantees the order when there is no current context in clustered mode
ctx = sendNoContext;
}
OutboundDeliveryContext<T> sendContext = new OutboundDeliveryContext<>(ctx, message, options, handler);
sendContext.next();
}
protected class OutboundDeliveryContext<T> implements DeliveryContext<T> {
public final ContextInternal ctx;
public final MessageImpl message;
public final DeliveryOptions options;
public final Iterator<Handler<DeliveryContext>> iter;
public final ReplyHandler<T> replyHandler;
private final MessageImpl replierMessage;
private OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler) {
this(ctx, message, options, replyHandler, null);
}
private OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler, MessageImpl replierMessage) {
this.ctx = ctx;
this.message = message;
this.options = options;
this.iter = sendInterceptors.iterator();
this.replierMessage = replierMessage;
this.replyHandler = replyHandler;
}
@Override
public Message<T> message() {
return message;
}
@Override
public void next() {
if (iter.hasNext()) {
Handler<DeliveryContext> handler = iter.next();
try {
if (handler != null) {
handler.handle(this);
} else {
next();
}
} catch (Throwable t) {
log.error("Failure in interceptor", t);
}
} else {
if (replierMessage == null) {
sendOrPub(this);
} else {
sendReply(this, replierMessage);
}
}
}
@Override
public boolean send() {
return message.isSend();
}
@Override
public Object body() {
return message.sentBody;
}
send(new OutboundDeliveryContext<>(ctx, message, options, handler));
}
private void unregisterAll() {

View File

@@ -0,0 +1,80 @@
/*
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.eventbus.impl;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.DeliveryContext;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.impl.ContextInternal;
import java.util.Iterator;
public class OutboundDeliveryContext<T> implements DeliveryContext<T> {
public final ContextInternal ctx;
public final MessageImpl message;
public final DeliveryOptions options;
public final EventBusImpl.ReplyHandler<T> replyHandler;
private final MessageImpl replierMessage;
Iterator<Handler<DeliveryContext>> iter;
EventBusImpl bus;
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler<T> replyHandler) {
this(ctx, message, options, replyHandler, null);
}
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler<T> replyHandler, MessageImpl replierMessage) {
this.ctx = ctx;
this.message = message;
this.options = options;
this.replierMessage = replierMessage;
this.replyHandler = replyHandler;
}
@Override
public Message<T> message() {
return message;
}
@Override
public void next() {
if (iter.hasNext()) {
Handler<DeliveryContext> handler = iter.next();
try {
if (handler != null) {
handler.handle(this);
} else {
next();
}
} catch (Throwable t) {
EventBusImpl.log.error("Failure in interceptor", t);
}
} else {
if (replierMessage == null) {
bus.sendOrPub(this);
} else {
bus.sendReply(this, replierMessage);
}
}
}
@Override
public boolean send() {
return message.isSend();
}
@Override
public Object body() {
return message.sentBody;
}
}

View File

@@ -19,6 +19,7 @@ import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.HAManager;
import io.vertx.core.impl.VertxInternal;
@@ -35,7 +36,6 @@ import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import java.io.Serializable;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;