From 48751f950a4158c54265f0da0584487ccd1bd024 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 7 Oct 2019 11:43:37 +0200 Subject: [PATCH] Improve Netty promise/future listener adapter and use it when possible --- .../datagram/impl/DatagramSocketImpl.java | 6 +- .../http/impl/Http1xClientConnection.java | 13 ++-- .../core/http/impl/Http1xConnectionBase.java | 6 +- .../core/http/impl/Http2ClientConnection.java | 5 +- .../core/http/impl/Http2ConnectionBase.java | 3 +- .../http/impl/Http2ServerResponseImpl.java | 3 - .../impl/Http2UpgradedClientConnection.java | 4 +- .../core/http/impl/HttpClientStream.java | 5 +- .../http/impl/HttpServerResponseImpl.java | 22 +++--- .../impl/VertxHttp2ConnectionHandler.java | 31 +++----- .../core/http/impl/VertxHttp2NetSocket.java | 3 +- .../core/http/impl/VertxHttp2Stream.java | 10 ++- .../core/http/impl/WebSocketImplBase.java | 3 +- .../io/vertx/core/impl/AddressResolver.java | 13 +--- .../impl/ChannelFutureListenerAdapter.java | 41 ----------- .../vertx/core/net/impl/ConnectionBase.java | 16 ++--- .../core/net/impl/FutureListenerAdapter.java | 70 +++++++++++++++++++ .../io/vertx/core/net/impl/NetSocketImpl.java | 2 +- 18 files changed, 136 insertions(+), 120 deletions(-) delete mode 100644 src/main/java/io/vertx/core/net/impl/ChannelFutureListenerAdapter.java create mode 100644 src/main/java/io/vertx/core/net/impl/FutureListenerAdapter.java diff --git a/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java b/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java index cb58dec58..422a4e443 100644 --- a/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java +++ b/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java @@ -31,7 +31,7 @@ import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.core.net.SocketAddress; import io.vertx.core.net.impl.ConnectionBase; -import io.vertx.core.net.impl.ChannelFutureListenerAdapter; +import io.vertx.core.net.impl.FutureListenerAdapter; import io.vertx.core.net.impl.SocketAddressImpl; import io.vertx.core.net.impl.VertxHandler; import io.vertx.core.net.impl.transport.Transport; @@ -213,7 +213,7 @@ public class DatagramSocketImpl implements DatagramSocket, MetricsProvider { @SuppressWarnings("unchecked") final void addListener(ChannelFuture future, Handler> handler) { if (handler != null) { - future.addListener(new ChannelFutureListenerAdapter<>(context, null, handler)); + future.addListener(FutureListenerAdapter.toVoid(context, handler)); } } @@ -310,7 +310,7 @@ public class DatagramSocketImpl implements DatagramSocket, MetricsProvider { channel.flush(); ChannelFuture future = channel.close(); if (handler != null) { - future.addListener(new ChannelFutureListenerAdapter<>(context, null, handler)); + future.addListener(FutureListenerAdapter.toVoid(context, handler)); } } diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index b4fb00fb4..9cc393276 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -25,6 +25,7 @@ import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFram import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateClientExtensionHandshaker; import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.FutureListener; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.*; @@ -37,6 +38,7 @@ import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.net.NetSocket; import io.vertx.core.net.SocketAddress; import io.vertx.core.net.impl.ConnectionBase; +import io.vertx.core.net.impl.FutureListenerAdapter; import io.vertx.core.net.impl.NetSocketImpl; import io.vertx.core.net.impl.VertxHandler; import io.vertx.core.spi.metrics.HttpClientMetrics; @@ -298,8 +300,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } } - private void sendRequest( - HttpRequest request, ByteBuf buf, boolean end, Handler> handler) { + private void sendRequest(HttpRequest request, ByteBuf buf, boolean end, Handler> handler) { if (end) { if (buf != null) { request = new AssembledFullHttpRequest(request, buf); @@ -311,7 +312,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme request = new AssembledHttpRequest(request, buf); } } - conn.writeToChannel(request, conn.toPromise(handler)); + conn.writeToChannel(request, conn.toPromise(toFutureListener(handler))); } private boolean handleChunk(Buffer buff) { @@ -335,7 +336,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme msg = new DefaultHttpContent(buff); } bytesWritten += msg.content().readableBytes(); - conn.writeToChannel(msg, conn.toPromise(handler)); + conn.writeToChannel(msg, conn.toPromise(toFutureListener(handler))); } @Override @@ -543,6 +544,10 @@ class Http1xClientConnection extends Http1xConnectionBase impleme promise.tryFail(cause); } } + + private FutureListener toFutureListener(Handler> handler) { + return handler == null ? null : FutureListenerAdapter.toVoid(context, handler); + } } private void checkLifecycle() { diff --git a/src/main/java/io/vertx/core/http/impl/Http1xConnectionBase.java b/src/main/java/io/vertx/core/http/impl/Http1xConnectionBase.java index 4066d72b7..174772183 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xConnectionBase.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xConnectionBase.java @@ -37,8 +37,8 @@ import io.vertx.core.http.impl.ws.WebSocketFrameImpl; import io.vertx.core.http.impl.ws.WebSocketFrameInternal; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; -import io.vertx.core.net.impl.ChannelFutureListenerAdapter; import io.vertx.core.net.impl.ConnectionBase; +import io.vertx.core.net.impl.FutureListenerAdapter; import static io.vertx.core.net.impl.VertxHandler.safeBuffer; @@ -156,12 +156,12 @@ abstract class Http1xConnectionBase> extends Conn fut.addListener((ChannelFutureListener) f -> { ChannelFuture closeFut = chctx.channel().close(); if (handler != null) { - closeFut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler)); + closeFut.addListener(FutureListenerAdapter.toVoid(context, handler)); } }); } else { if (handler != null) { - fut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler)); + fut.addListener(FutureListenerAdapter.toVoid(context, handler)); } } }); diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java index 4086b1352..e821ec949 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; +import io.netty.util.concurrent.FutureListener; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.*; @@ -428,12 +429,12 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon } @Override - public void writeBuffer(ByteBuf buf, boolean end, Handler> handler) { + public void writeBuffer(ByteBuf buf, boolean end, Handler> listener) { if (buf == null && end) { buf = Unpooled.EMPTY_BUFFER; } if (buf != null) { - writeData(buf, end, handler); + writeData(buf, end, listener); } if (end) { handlerContext.flush(); diff --git a/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java b/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java index 358687c6f..9577ad42c 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java @@ -38,6 +38,7 @@ import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.core.net.NetSocket; import io.vertx.core.net.impl.ConnectionBase; +import io.vertx.core.net.impl.FutureListenerAdapter; import java.util.ArrayDeque; import java.util.ArrayList; @@ -408,7 +409,7 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL @Override public Future close() { Promise promise = Promise.promise(); - ChannelPromise channelPromise = toPromise(promise); + ChannelPromise channelPromise = chctx.newPromise().addListener(FutureListenerAdapter.toVoid(promise)); flush(channelPromise); channelPromise.addListener((ChannelFutureListener) future -> shutdown(0L)); return promise.future(); diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java b/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java index eda4a7034..2395a0baf 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java @@ -29,7 +29,6 @@ import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.AsyncFile; import io.vertx.core.file.FileSystem; -import io.vertx.core.file.FileSystemException; import io.vertx.core.file.OpenOptions; import io.vertx.core.http.Cookie; import io.vertx.core.http.HttpHeaders; @@ -41,13 +40,11 @@ import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.net.NetSocket; import io.vertx.core.net.impl.ConnectionBase; -import io.vertx.core.streams.WriteStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.file.NoSuchFileException; import java.util.Map; import static io.vertx.core.http.HttpHeaders.SET_COOKIE; diff --git a/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java index c37695f96..f79221511 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2UpgradedClientConnection.java @@ -109,7 +109,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { boolean end, StreamPriority priority, Handler continueHandler, - Handler> handler) { + Handler> listener) { ChannelPipeline pipeline = conn.channel().pipeline(); HttpClientCodec httpCodec = pipeline.get(HttpClientCodec.class); class UpgradeRequestHandler extends ChannelInboundHandlerAdapter { @@ -170,7 +170,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection { HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536); pipeline.addAfter("codec", null, new UpgradeRequestHandler()); pipeline.addAfter("codec", null, upgradeHandler); - stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, priority, continueHandler, handler); + stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, priority, continueHandler, listener); } @Override diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java index 4a4aa372d..44fbe930b 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java @@ -12,6 +12,7 @@ package io.vertx.core.http.impl; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.FutureListener; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.MultiMap; @@ -43,8 +44,8 @@ public interface HttpClientStream { HttpConnection connection(); ContextInternal getContext(); - void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler contHandler, Handler> handler); - void writeBuffer(ByteBuf buf, boolean end, Handler> handler); + void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler contHandler, Handler> listener); + void writeBuffer(ByteBuf buf, boolean end, Handler> listener); void writeFrame(int type, int flags, ByteBuf payload); void doSetWriteQueueMaxSize(int size); diff --git a/src/main/java/io/vertx/core/http/impl/HttpServerResponseImpl.java b/src/main/java/io/vertx/core/http/impl/HttpServerResponseImpl.java index c66a8d35d..73f1e6b8d 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpServerResponseImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpServerResponseImpl.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.concurrent.FutureListener; import io.vertx.codegen.annotations.Nullable; import io.vertx.core.AsyncResult; import io.vertx.core.Future; @@ -40,6 +41,7 @@ import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.net.NetSocket; import io.vertx.core.net.impl.ConnectionBase; +import io.vertx.core.net.impl.FutureListenerAdapter; import io.vertx.core.spi.metrics.Metrics; import java.io.File; @@ -311,7 +313,7 @@ public class HttpServerResponseImpl implements HttpServerResponse { @Override public void write(Buffer chunk, Handler> handler) { - write(chunk.getByteBuf(), conn.toPromise(handler)); + write(chunk.getByteBuf(), handler); } @Override @@ -323,7 +325,7 @@ public class HttpServerResponseImpl implements HttpServerResponse { @Override public void write(String chunk, String enc, Handler> handler) { - write(Buffer.buffer(chunk, enc).getByteBuf(), conn.toPromise(handler)); + write(Buffer.buffer(chunk, enc).getByteBuf(), handler); } @Override @@ -335,7 +337,7 @@ public class HttpServerResponseImpl implements HttpServerResponse { @Override public void write(String chunk, Handler> handler) { - write(Buffer.buffer(chunk).getByteBuf(), conn.toPromise(handler)); + write(Buffer.buffer(chunk).getByteBuf(), handler); } @Override @@ -373,10 +375,6 @@ public class HttpServerResponseImpl implements HttpServerResponse { @Override public void end(Buffer chunk, Handler> handler) { - end(chunk, conn.toPromise(handler)); - } - - private void end(Buffer chunk, ChannelPromise promise) { synchronized (conn) { if (written) { throw new IllegalStateException(RESPONSE_WRITTEN); @@ -392,7 +390,7 @@ public class HttpServerResponseImpl implements HttpServerResponse { } else { msg = new AssembledLastHttpContent(data, trailingHeaders); } - conn.writeToChannel(msg, promise); + conn.writeToChannel(msg, conn.toPromise(toFutureListener(handler))); written = true; conn.responseComplete(); if (bodyEndHandler != null) { @@ -677,7 +675,7 @@ public class HttpServerResponseImpl implements HttpServerResponse { } } - private HttpServerResponseImpl write(ByteBuf chunk, ChannelPromise promise) { + private HttpServerResponseImpl write(ByteBuf chunk, Handler> handler) { synchronized (conn) { if (written) { throw new IllegalStateException("Response has already been written"); @@ -695,7 +693,7 @@ public class HttpServerResponseImpl implements HttpServerResponse { } else { msg = new DefaultHttpContent(chunk); } - conn.writeToChannel(msg, promise); + conn.writeToChannel(msg, conn.toPromise(toFutureListener(handler))); return this; } } @@ -769,4 +767,8 @@ public class HttpServerResponseImpl implements HttpServerResponse { public @Nullable Cookie removeCookie(String name, boolean invalidate) { return CookieImpl.removeCookie(cookies(), name, invalidate); } + + private FutureListener toFutureListener(Handler> handler) { + return handler == null ? null : FutureListenerAdapter.toVoid(context, handler); + } } diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java index e5d2c9195..6f60f1a89 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java @@ -20,6 +20,7 @@ import io.netty.handler.codec.http2.*; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.FutureListener; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; @@ -186,39 +187,24 @@ class VertxHttp2ConnectionHandler extends Http2Co // - void writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive, Handler> handler) { + void writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive, FutureListener listener) { EventExecutor executor = chctx.executor(); - ChannelPromise promise = createPromise(handler); if (executor.inEventLoop()) { - _writeHeaders(stream, headers, end, streamDependency, weight, exclusive, promise); + _writeHeaders(stream, headers, end, streamDependency, weight, exclusive, listener); } else { executor.execute(() -> { - _writeHeaders(stream, headers, end, streamDependency, weight, exclusive, promise); + _writeHeaders(stream, headers, end, streamDependency, weight, exclusive, listener); }); } } - private void _writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive, ChannelPromise promise) { + private void _writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive, FutureListener listener) { + ChannelPromise promise = listener == null ? chctx.voidPromise() : chctx.newPromise().addListener(listener); encoder().writeHeaders(chctx, stream.id(), headers, streamDependency, weight, exclusive, 0, end, promise); } - private ChannelPromise createPromise(Handler> handler) { - ChannelPromise promise = chctx.newPromise(); - if (handler != null) { - promise.addListener((future) -> { - if(future.isSuccess()) { - handler.handle(Future.succeededFuture()); - } else { - handler.handle(Future.failedFuture(future.cause())); - } - }); - } - return promise; - } - - void writeData(Http2Stream stream, ByteBuf chunk, boolean end, Handler> handler) { + void writeData(Http2Stream stream, ByteBuf chunk, boolean end, FutureListener promise) { EventExecutor executor = chctx.executor(); - ChannelPromise promise = createPromise(handler); if (executor.inEventLoop()) { _writeData(stream, chunk, end, promise); } else { @@ -228,7 +214,8 @@ class VertxHttp2ConnectionHandler extends Http2Co } } - private void _writeData(Http2Stream stream, ByteBuf chunk, boolean end, ChannelPromise promise) { + private void _writeData(Http2Stream stream, ByteBuf chunk, boolean end, FutureListener listener) { + ChannelPromise promise = listener == null ? chctx.voidPromise() : chctx.newPromise().addListener(listener); encoder().writeData(chctx, stream.id(), chunk, 0, end, promise); Http2RemoteFlowController controller = encoder().flowController(); if (!controller.isWritable(stream) || end) { diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java index 8a342a401..5e310f7df 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java @@ -28,6 +28,7 @@ import io.vertx.core.http.StreamResetException; import io.vertx.core.impl.ContextInternal; import io.vertx.core.net.NetSocket; import io.vertx.core.net.SocketAddress; +import io.vertx.core.net.impl.FutureListenerAdapter; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; @@ -237,7 +238,7 @@ class VertxHttp2NetSocket extends VertxHttp2Strea @Override public void write(Buffer message, Handler> handler) { - conn.handler.writeData(stream, message.getByteBuf(), false, handler); + conn.handler.writeData(stream, message.getByteBuf(), false, FutureListenerAdapter.toVoid(context, handler)); } @Override diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java index 525e3f41a..97c891bda 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java @@ -16,6 +16,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.EmptyHttp2Headers; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; +import io.netty.util.concurrent.FutureListener; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.MultiMap; @@ -23,6 +24,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.StreamPriority; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; +import io.vertx.core.net.impl.FutureListenerAdapter; import io.vertx.core.streams.impl.InboundBuffer; /** @@ -138,7 +140,7 @@ abstract class VertxHttp2Stream { } void writeHeaders(Http2Headers headers, boolean end, Handler> handler) { - conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(), handler); + conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(), toFutureListener(handler)); } private void writePriorityFrame(StreamPriority priority) { @@ -151,7 +153,7 @@ abstract class VertxHttp2Stream { void writeData(ByteBuf chunk, boolean end, Handler> handler) { bytesWritten += chunk.readableBytes(); - conn.handler.writeData(stream, chunk, end, handler); + conn.handler.writeData(stream, chunk, end, toFutureListener(handler)); } void writeReset(long code) { @@ -198,4 +200,8 @@ abstract class VertxHttp2Stream { } abstract void handlePriorityChange(StreamPriority streamPriority); + + public FutureListener toFutureListener(Handler> handler) { + return handler == null ? null : FutureListenerAdapter.toVoid(context, handler); + } } diff --git a/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java b/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java index d82ebb625..e8af2a5fd 100644 --- a/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java +++ b/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java @@ -28,6 +28,7 @@ import io.vertx.core.http.impl.ws.WebSocketFrameImpl; import io.vertx.core.http.impl.ws.WebSocketFrameInternal; import io.vertx.core.impl.ContextInternal; import io.vertx.core.net.SocketAddress; +import io.vertx.core.net.impl.FutureListenerAdapter; import io.vertx.core.streams.impl.InboundBuffer; import javax.net.ssl.SSLPeerUnverifiedException; @@ -365,7 +366,7 @@ public abstract class WebSocketImplBase implements WebS synchronized (conn) { checkClosed(); conn.reportBytesWritten(((WebSocketFrameInternal)frame).length()); - conn.writeToChannel(conn.encodeFrame((WebSocketFrameImpl) frame), conn.toPromise(handler)); + conn.writeToChannel(conn.encodeFrame((WebSocketFrameImpl) frame), conn.toPromise(FutureListenerAdapter.toVoid(handler))); } return (S) this; } diff --git a/src/main/java/io/vertx/core/impl/AddressResolver.java b/src/main/java/io/vertx/core/impl/AddressResolver.java index bedfaa218..50bd20105 100644 --- a/src/main/java/io/vertx/core/impl/AddressResolver.java +++ b/src/main/java/io/vertx/core/impl/AddressResolver.java @@ -13,13 +13,13 @@ package io.vertx.core.impl; import io.netty.resolver.AddressResolverGroup; import io.vertx.core.AsyncResult; -import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.dns.AddressResolverOptions; import io.vertx.core.impl.launcher.commands.ExecUtils; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.core.net.impl.FutureListenerAdapter; import io.vertx.core.spi.resolver.ResolverProvider; import java.io.File; @@ -81,16 +81,7 @@ public class AddressResolver { ContextInternal callback = (ContextInternal) vertx.getOrCreateContext(); io.netty.resolver.AddressResolver resolver = resolverGroup.getResolver(callback.nettyEventLoop()); io.netty.util.concurrent.Future fut = resolver.resolve(InetSocketAddress.createUnresolved(hostname, 0)); - fut.addListener(a -> { - callback.runOnContext(v -> { - if (a.isSuccess()) { - InetSocketAddress address = fut.getNow(); - resultHandler.handle(Future.succeededFuture(address.getAddress())); - } else { - resultHandler.handle(Future.failedFuture(a.cause())); - } - }); - }); + fut.addListener(FutureListenerAdapter.toValue(callback, InetSocketAddress::getAddress, resultHandler)); } AddressResolverGroup nettyAddressResolverGroup() { diff --git a/src/main/java/io/vertx/core/net/impl/ChannelFutureListenerAdapter.java b/src/main/java/io/vertx/core/net/impl/ChannelFutureListenerAdapter.java deleted file mode 100644 index adda91731..000000000 --- a/src/main/java/io/vertx/core/net/impl/ChannelFutureListenerAdapter.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 - * which is available at https://www.apache.org/licenses/LICENSE-2.0. - * - * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 - */ - -package io.vertx.core.net.impl; - -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.impl.ContextInternal; - -/** - * @author Norman Maurer - */ -public final class ChannelFutureListenerAdapter implements ChannelFutureListener { - - private final Handler> handler; - private final T result; - private final ContextInternal context; - - public ChannelFutureListenerAdapter(ContextInternal context, T result, Handler> handler) { - this.handler = handler; - this.result = result; - this.context = context; - } - - @Override - public void operationComplete(ChannelFuture future) { - Future res = future.isSuccess() ? Future.succeededFuture(result) : Future.failedFuture(future.cause()); - context.executeFromIO(res, handler); - } -} diff --git a/src/main/java/io/vertx/core/net/impl/ConnectionBase.java b/src/main/java/io/vertx/core/net/impl/ConnectionBase.java index 808c5d069..7680ac9f7 100644 --- a/src/main/java/io/vertx/core/net/impl/ConnectionBase.java +++ b/src/main/java/io/vertx/core/net/impl/ConnectionBase.java @@ -15,6 +15,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedFile; +import io.netty.util.concurrent.FutureListener; import io.vertx.core.*; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; @@ -129,20 +130,13 @@ public abstract class ConnectionBase { * @param handler the handler * @return a promise */ - public final ChannelPromise toPromise(Handler> handler) { + public final ChannelPromise toPromise(FutureListener handler) { return handler == null ? voidPromise : wrap(handler); } - private ChannelPromise wrap(Handler> handler) { + private ChannelPromise wrap(FutureListener handler) { ChannelPromise promise = chctx.newPromise(); - promise.addListener((fut) -> { - if (fut.isSuccess()) { - handler.handle(Future.succeededFuture()); - } else { - handler.handle(Future.failedFuture(fut.cause())); - } - } - ); + promise.addListener(handler); return promise; } @@ -225,7 +219,7 @@ public abstract class ConnectionBase { .addListener((ChannelFutureListener) f -> { ChannelFuture closeFut = chctx.channel().close(); if (handler != null) { - closeFut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler)); + closeFut.addListener(FutureListenerAdapter.toVoid(context, handler)); } }); flush(promise); diff --git a/src/main/java/io/vertx/core/net/impl/FutureListenerAdapter.java b/src/main/java/io/vertx/core/net/impl/FutureListenerAdapter.java new file mode 100644 index 000000000..78d5eecb4 --- /dev/null +++ b/src/main/java/io/vertx/core/net/impl/FutureListenerAdapter.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.net.impl; + +import io.netty.util.concurrent.FutureListener; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.impl.ContextInternal; + +import java.util.function.Function; + +/** + * @author Norman Maurer + */ +public final class FutureListenerAdapter { + + public static FutureListener toValue(ContextInternal context, Function adapter, Handler> handler) { + if (handler != null) { + return future -> { + Future res = future.isSuccess() ? Future.succeededFuture(adapter.apply(future.getNow())) : Future.failedFuture(future.cause()); + context.executeFromIO(res, handler); + }; + } else { + return null; + } + } + + public static FutureListener toValue(Function adapter, Handler> handler) { + if (handler != null) { + return future -> { + Future res = future.isSuccess() ? Future.succeededFuture(adapter.apply(future.getNow())) : Future.failedFuture(future.cause()); + handler.handle(res); + }; + } else { + return null; + } + } + + public static FutureListener toVoid(ContextInternal context, Handler> handler) { + if (handler != null) { + return future -> { + Future res = future.isSuccess() ? Future.succeededFuture() : Future.failedFuture(future.cause()); + context.executeFromIO(res, handler); + }; + } else { + return null; + } + } + + public static FutureListener toVoid(Handler> handler) { + if (handler != null) { + return future -> { + Future res = future.isSuccess() ? Future.succeededFuture() : Future.failedFuture(future.cause()); + handler.handle(res); + }; + } else { + return null; + } + } +} diff --git a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java index 831245798..07847071b 100644 --- a/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java @@ -131,7 +131,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal { @Override public NetSocketInternal writeMessage(Object message, Handler> handler) { - writeToChannel(message, toPromise(handler)); + writeToChannel(message, toPromise(FutureListenerAdapter.toVoid(handler))); return this; }