From 05067ca881497380de1ab9438ee5fece27a11359 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Fri, 18 Oct 2019 10:18:06 +0200 Subject: [PATCH] Move sender metrics/tracing concerns to OutboundDeliveryContext --- .../core/eventbus/impl/EventBusImpl.java | 87 ++++++------------- .../vertx/core/eventbus/impl/MessageImpl.java | 27 +----- .../eventbus/impl/MessageProducerImpl.java | 16 ++-- .../impl/OutboundDeliveryContext.java | 68 ++++++++++++++- .../impl/clustered/ClusteredEventBus.java | 21 +---- .../impl/clustered/ClusteredMessage.java | 13 +-- .../impl/clustered/ConnectionHolder.java | 25 +++--- 7 files changed, 128 insertions(+), 129 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 3f68e3aa9..3b1304309 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; /** * A local event bus implementation @@ -106,16 +105,16 @@ public class EventBusImpl implements EventBus, MetricsProvider { @Override public EventBus send(String address, Object message, DeliveryOptions options) { - MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName(), null); - sendOrPubInternal(msg, options, null); + MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName()); + sendOrPubInternal(msg, options, null, null); return this; } @Override public Future> request(String address, Object message, DeliveryOptions options) { - MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName(), null); + MessageImpl msg = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName()); ReplyHandler handler = createReplyHandler(msg, true, options); - sendOrPubInternal(msg, options, handler); + sendOrPubInternal(msg, options, handler, null); return handler.result.future(); } @@ -152,7 +151,7 @@ public class EventBusImpl implements EventBus, MetricsProvider { @Override public EventBus publish(String address, Object message, DeliveryOptions options) { - sendOrPubInternal(createMessage(false, true, address, options.getHeaders(), message, options.getCodecName(), null), options, null); + sendOrPubInternal(createMessage(false, true, address, options.getHeaders(), message, options.getCodecName()), options, null, null); return this; } @@ -232,11 +231,11 @@ public class EventBusImpl implements EventBus, MetricsProvider { return metrics; } - public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Promise writeHandler) { + public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName) { Objects.requireNonNull(address, "no null address accepted"); MessageCodec codec = codecManager.lookupCodec(body, codecName); @SuppressWarnings("unchecked") - MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, src, this, writeHandler); + MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, src, this); return msg; } @@ -321,16 +320,10 @@ public class EventBusImpl implements EventBus, MetricsProvider { // Guarantees the order when there is no current context in clustered mode ctx = sendNoContext; } - send(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage)); + sendOrPubInternal(new OutboundDeliveryContext<>(ctx, replyMessage, options, replyHandler, replierMessage, null)); } } - private void send(OutboundDeliveryContext ctx) { - ctx.iter = sendInterceptors.iterator(); - ctx.bus = this; - ctx.next(); - } - protected void sendReply(OutboundDeliveryContext sendContext, MessageImpl replierMessage) { sendOrPub(sendContext); } @@ -339,21 +332,6 @@ public class EventBusImpl implements EventBus, MetricsProvider { sendLocally(sendContext); } - protected final Object messageSent(OutboundDeliveryContext sendContext, boolean local, boolean remote) { - MessageImpl msg = sendContext.message; - if (metrics != null) { - MessageImpl message = msg; - metrics.messageSent(message.address(), !message.send, local, remote); - } - VertxTracer tracer = sendContext.ctx.tracer(); - if (tracer != null && msg.src) { - BiConsumer biConsumer = (String key, String val) -> msg.headers().set(key, val); - return tracer.sendRequest(sendContext.ctx, msg, msg.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE); - } else { - return null; - } - } - protected void callCompletionHandlerAsync(Handler> completionHandler) { if (completionHandler != null) { vertx.runOnContext(v -> { @@ -363,29 +341,11 @@ public class EventBusImpl implements EventBus, MetricsProvider { } private void sendLocally(OutboundDeliveryContext sendContext) { - Object trace = messageSent(sendContext, true, false); ReplyException failure = deliverMessageLocally(sendContext.message); if (failure != null) { - // no handlers - VertxTracer tracer = sendContext.ctx.tracer(); - if (sendContext.replyHandler != null) { - sendContext.replyHandler.trace = trace; - sendContext.replyHandler.fail(failure); - } else { - if (tracer != null && sendContext.message.src) { - tracer.receiveResponse(sendContext.ctx, null, trace, failure, TagExtractor.empty()); - } - } + sendContext.written(failure); } else { - failure = null; - VertxTracer tracer = sendContext.ctx.tracer(); - if (tracer != null && sendContext.message.src) { - if (sendContext.replyHandler == null) { - tracer.receiveResponse(sendContext.ctx, null, trace, null, TagExtractor.empty()); - } else { - sendContext.replyHandler.trace = trace; - } - } + sendContext.written(null); } } @@ -404,7 +364,6 @@ public class EventBusImpl implements EventBus, MetricsProvider { } if (holder != null) { deliverToHandler(msg, holder); - msg.written(null); } else { // RACY issue !!!!! } @@ -416,16 +375,13 @@ public class EventBusImpl implements EventBus, MetricsProvider { for (HandlerHolder holder: handlers) { deliverToHandler(msg, holder); } - msg.written(null); } return null; } else { if (metrics != null) { metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), 0); } - ReplyException failure = new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address); - msg.written(failure); - return failure; + return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address); } } @@ -498,15 +454,28 @@ public class EventBusImpl implements EventBus, MetricsProvider { } } - public void sendOrPubInternal(MessageImpl message, DeliveryOptions options, - ReplyHandler handler) { - checkStarted(); + public OutboundDeliveryContext newSendContext(MessageImpl message, DeliveryOptions options, + ReplyHandler handler, Promise writePromise) { ContextInternal ctx = vertx.getContext(); if (ctx == null) { // Guarantees the order when there is no current context in clustered mode ctx = sendNoContext; } - send(new OutboundDeliveryContext<>(ctx, message, options, handler)); + return new OutboundDeliveryContext<>(ctx, message, options, handler, writePromise); + } + + public void sendOrPubInternal(OutboundDeliveryContext senderCtx) { + checkStarted(); + senderCtx.iter = sendInterceptors.iterator(); + senderCtx.bus = this; + senderCtx.metrics = metrics; + senderCtx.next(); + } + + public void sendOrPubInternal(MessageImpl message, DeliveryOptions options, + ReplyHandler handler, Promise writePromise) { + checkStarted(); + sendOrPubInternal(newSendContext(message, options, handler, writePromise)); } private void unregisterAll() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java index ae5f723bd..8c85e5c36 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java @@ -11,11 +11,8 @@ package io.vertx.core.eventbus.impl; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; -import io.vertx.core.Handler; import io.vertx.core.MultiMap; -import io.vertx.core.Promise; import io.vertx.core.eventbus.*; import io.vertx.core.http.CaseInsensitiveHeaders; import io.vertx.core.impl.logging.Logger; @@ -40,7 +37,6 @@ public class MessageImpl implements Message { protected U sentBody; protected V receivedBody; protected boolean send; - protected Promise write; public MessageImpl(boolean src, EventBusImpl bus) { this.bus = bus; @@ -49,8 +45,7 @@ public class MessageImpl implements Message { public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody, MessageCodec messageCodec, - boolean send, boolean src, EventBusImpl bus, - Promise write) { + boolean send, boolean src, EventBusImpl bus) { this.messageCodec = messageCodec; this.address = address; this.replyAddress = replyAddress; @@ -59,7 +54,6 @@ public class MessageImpl implements Message { this.send = send; this.bus = bus; this.src = src; - this.write = write; } protected MessageImpl(MessageImpl other, boolean src) { @@ -80,7 +74,6 @@ public class MessageImpl implements Message { this.receivedBody = messageCodec.transform(other.sentBody); } this.send = other.send; - this.write = other.write; } public MessageImpl copyBeforeReceive(boolean src) { @@ -117,7 +110,7 @@ public class MessageImpl implements Message { @Override public void reply(Object message, DeliveryOptions options) { if (replyAddress != null) { - MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName(), null); + MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName()); bus.sendReply(reply, this, options, null); } } @@ -125,7 +118,7 @@ public class MessageImpl implements Message { @Override public Future> replyAndRequest(Object message, DeliveryOptions options) { if (replyAddress != null) { - MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName(), null); + MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName()); EventBusImpl.ReplyHandler handler = bus.createReplyHandler(reply, reply.src, options); bus.sendReply(reply, this, options, handler); return handler.result.future(); @@ -143,20 +136,6 @@ public class MessageImpl implements Message { this.replyAddress = replyAddress; } - public void written(Throwable failure) { - if (write != null) { - if (failure == null) { - write.complete(); - } else { - write.fail(failure); - } - } - } - - public Promise write() { - return write; - } - public MessageCodec codec() { return messageCodec; } diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java index ebfbf0b26..3951ba4b3 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java @@ -17,7 +17,6 @@ import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.*; -import io.vertx.core.impl.VertxInternal; import java.util.ArrayDeque; import java.util.Queue; @@ -34,7 +33,7 @@ public class MessageProducerImpl implements MessageProducer { private final EventBusImpl bus; private final boolean send; private final String address; - private final Queue> pending = new ArrayDeque<>(); + private final Queue> pending = new ArrayDeque<>(); private final MessageConsumer creditConsumer; private DeliveryOptions options; private int maxSize = DEFAULT_WRITE_QUEUE_MAX_SIZE; @@ -99,18 +98,19 @@ public class MessageProducerImpl implements MessageProducer { } private void write(T data, Promise handler) { - MessageImpl msg = bus.createMessage(send, true, address, options.getHeaders(), data, options.getCodecName(), handler); + MessageImpl msg = bus.createMessage(send, true, address, options.getHeaders(), data, options.getCodecName()); + OutboundDeliveryContext sendCtx = bus.newSendContext(msg, options, null, handler); if (send) { synchronized (this) { if (credits > 0) { credits--; } else { - pending.add(msg); + pending.add(sendCtx); return; } } } - bus.sendOrPubInternal(msg, options, null); + bus.sendOrPubInternal(msg, options, null, handler); } @Override @@ -180,12 +180,12 @@ public class MessageProducerImpl implements MessageProducer { private synchronized void doReceiveCredit(int credit) { credits += credit; while (credits > 0) { - MessageImpl msg = pending.poll(); - if (msg == null) { + OutboundDeliveryContext sendContext = pending.poll(); + if (sendContext == null) { break; } else { credits--; - bus.sendOrPubInternal(msg, options, null); + bus.sendOrPubInternal(sendContext); } } checkDrained(); diff --git a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java index f89a36f1f..c34aa5231 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java +++ b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java @@ -10,35 +10,90 @@ */ package io.vertx.core.eventbus.impl; +import io.vertx.core.AsyncResult; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.eventbus.DeliveryContext; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.ReplyException; +import io.vertx.core.eventbus.impl.clustered.ClusteredMessage; import io.vertx.core.impl.ContextInternal; +import io.vertx.core.spi.metrics.EventBusMetrics; +import io.vertx.core.spi.tracing.TagExtractor; +import io.vertx.core.spi.tracing.VertxTracer; import java.util.Iterator; +import java.util.function.BiConsumer; -public class OutboundDeliveryContext implements DeliveryContext { +public class OutboundDeliveryContext implements DeliveryContext, Handler> { public final ContextInternal ctx; public final MessageImpl message; public final DeliveryOptions options; public final EventBusImpl.ReplyHandler replyHandler; private final MessageImpl replierMessage; + private final Promise writePromise; + + private Object trace; Iterator> iter; EventBusImpl bus; + EventBusMetrics metrics; - OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler replyHandler) { - this(ctx, message, options, replyHandler, null); + OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler replyHandler, Promise writePromise) { + this(ctx, message, options, replyHandler, null, writePromise); } - OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler replyHandler, MessageImpl replierMessage) { + OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, EventBusImpl.ReplyHandler replyHandler, MessageImpl replierMessage, Promise writePromise) { this.ctx = ctx; this.message = message; this.options = options; this.replierMessage = replierMessage; this.replyHandler = replyHandler; + this.writePromise = writePromise; + } + + @Override + public void handle(AsyncResult event) { + written(event.cause()); + } + + public void written(Throwable failure) { + + // Metrics + if (metrics != null) { + boolean remote = (message instanceof ClusteredMessage) && ((ClusteredMessage)message).isToWire(); + metrics.messageSent(message.address(), !message.send, !remote, remote); + } + + // Tracing + VertxTracer tracer = ctx.tracer(); + if (tracer != null) { + if (message.src) { + if (replyHandler != null) { + replyHandler.trace = trace; + } else { + tracer.receiveResponse(ctx, null, trace, failure, TagExtractor.empty()); + } + } + } + + // Fail fast reply handler + if (failure instanceof ReplyException) { + if (replyHandler != null) { + replyHandler.fail((ReplyException) failure); + } + } + + // Notify promise finally + if (writePromise != null) { + if (failure == null) { + writePromise.complete(); + } else { + writePromise.fail(failure); + } + } } @Override @@ -60,6 +115,11 @@ public class OutboundDeliveryContext implements DeliveryContext { EventBusImpl.log.error("Failure in interceptor", t); } } else { + VertxTracer tracer = ctx.tracer(); + if (tracer != null && message.src) { + BiConsumer biConsumer = (String key, String val) -> message.headers().set(key, val); + trace = tracer.sendRequest(ctx, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE); + } if (replierMessage == null) { bus.sendOrPub(this); } else { diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index 1447c836e..402c85442 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -32,8 +32,6 @@ import io.vertx.core.parsetools.RecordParser; import io.vertx.core.spi.cluster.AsyncMultiMap; import io.vertx.core.spi.cluster.ChoosableIterable; import io.vertx.core.spi.cluster.ClusterManager; -import io.vertx.core.spi.tracing.TagExtractor; -import io.vertx.core.spi.tracing.VertxTracer; import java.io.Serializable; import java.util.Objects; @@ -176,11 +174,11 @@ public class ClusteredEventBus extends EventBusImpl { } @Override - public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Promise writeHandler) { + public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName) { Objects.requireNonNull(address, "no null address accepted"); MessageCodec codec = codecManager.lookupCodec(body, codecName); @SuppressWarnings("unchecked") - ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, src, this, writeHandler); + ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, src, this); return msg; } @@ -237,7 +235,7 @@ public class ClusteredEventBus extends EventBusImpl { } } else { log.error("Failed to send message", asyncResult.cause()); - sendContext.message.written(asyncResult.cause()); + sendContext.written(asyncResult.cause()); } } @@ -354,17 +352,6 @@ public class ClusteredEventBus extends EventBusImpl { } private void sendRemote(OutboundDeliveryContext sendContext, ServerID theServerID, MessageImpl message) { - Object trace = messageSent(sendContext, false, true); - - // SAME CODE THAN IN PARENT!!!! - VertxTracer tracer = sendContext.ctx.tracer(); - if (tracer != null && sendContext.message.src) { - if (sendContext.replyHandler == null) { - tracer.receiveResponse(sendContext.ctx, null, trace, null, TagExtractor.empty()); - } else { - sendContext.replyHandler.trace = trace; - } - } // We need to deal with the fact that connecting can take some time and is async, and we cannot // block to wait for it. So we add any sends to a pending list if not connected yet. // Once we connect we send them. @@ -383,7 +370,7 @@ public class ClusteredEventBus extends EventBusImpl { holder.connect(); } } - holder.writeMessage((ClusteredMessage) message); + holder.writeMessage(sendContext); } private void removeSub(String subName, ClusterNodeInfo node, Handler> completionHandler) { diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java index 802bfbf5a..67fb94ce8 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java @@ -12,10 +12,7 @@ package io.vertx.core.eventbus.impl.clustered; import io.netty.util.CharsetUtil; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; import io.vertx.core.MultiMap; -import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.eventbus.impl.CodecManager; @@ -43,14 +40,15 @@ public class ClusteredMessage extends MessageImpl { private int bodyPos; private int headersPos; private boolean fromWire; + private boolean toWire; public ClusteredMessage(boolean src, EventBusImpl bus) { super(src, bus); } public ClusteredMessage(ServerID sender, String address, String replyAddress, MultiMap headers, U sentBody, - MessageCodec messageCodec, boolean send, boolean src, EventBusImpl bus, Promise writeHandler) { - super(address, replyAddress, headers, sentBody, messageCodec, send, src, bus, writeHandler); + MessageCodec messageCodec, boolean send, boolean src, EventBusImpl bus) { + super(address, replyAddress, headers, sentBody, messageCodec, send, src, bus); this.sender = sender; } @@ -100,6 +98,7 @@ public class ClusteredMessage extends MessageImpl { } public Buffer encodeToWire() { + toWire = true; int length = 1024; // TODO make this configurable Buffer buffer = Buffer.buffer(length); buffer.appendInt(0); @@ -245,6 +244,10 @@ public class ClusteredMessage extends MessageImpl { return fromWire; } + public boolean isToWire() { + return toWire; + } + protected boolean isLocal() { return !isFromWire(); } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java index 288644afd..83a74818a 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java @@ -17,6 +17,7 @@ import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.EventBusOptions; +import io.vertx.core.eventbus.impl.OutboundDeliveryContext; import io.vertx.core.eventbus.impl.codecs.PingMessageCodec; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; @@ -46,7 +47,7 @@ class ConnectionHolder { private final Vertx vertx; private final EventBusMetrics metrics; - private Queue pending; + private Queue> pending; private NetSocket socket; private boolean connected; private long timeoutID = -1; @@ -78,13 +79,13 @@ class ConnectionHolder { } // TODO optimise this (contention on monitor) - synchronized void writeMessage(ClusteredMessage message) { + synchronized void writeMessage(OutboundDeliveryContext ctx) { if (connected) { - Buffer data = message.encodeToWire(); + Buffer data = ((ClusteredMessage)ctx.message).encodeToWire(); if (metrics != null) { - metrics.messageWritten(message.address(), data.length()); + metrics.messageWritten(ctx.message.address(), data.length()); } - socket.write(data, message.write()); + socket.write(data, ctx); } else { if (pending == null) { if (log.isDebugEnabled()) { @@ -92,7 +93,7 @@ class ConnectionHolder { } pending = new ArrayDeque<>(); } - pending.add(message); + pending.add(ctx); } } @@ -108,7 +109,7 @@ class ConnectionHolder { vertx.cancelTimer(pingTimeoutID); } synchronized (this) { - ClusteredMessage msg; + OutboundDeliveryContext msg; if (pending != null) { while ((msg = pending.poll()) != null) { msg.written(cause); @@ -138,7 +139,7 @@ class ConnectionHolder { close(); }); ClusteredMessage pingMessage = - new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, null, new PingMessageCodec(), true, true, eventBus, null); + new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, null, new PingMessageCodec(), true, true, eventBus); Buffer data = pingMessage.encodeToWire(); socket.write(data); }); @@ -162,12 +163,12 @@ class ConnectionHolder { if (log.isDebugEnabled()) { log.debug("Draining the queue for server " + serverID); } - for (ClusteredMessage message : pending) { - Buffer data = message.encodeToWire(); + for (OutboundDeliveryContext ctx : pending) { + Buffer data = ((ClusteredMessage)ctx.message).encodeToWire(); if (metrics != null) { - metrics.messageWritten(message.address(), data.length()); + metrics.messageWritten(ctx.message.address(), data.length()); } - socket.write(data, message.write()); + socket.write(data, ctx); } } pending = null;