Improve Netty promise/future listener adapter and use it when possible

This commit is contained in:
Julien Viet
2019-10-07 11:43:37 +02:00
parent 707ad8c1bb
commit 48751f950a
18 changed files with 136 additions and 120 deletions

View File

@@ -31,7 +31,7 @@ import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.ChannelFutureListenerAdapter;
import io.vertx.core.net.impl.FutureListenerAdapter;
import io.vertx.core.net.impl.SocketAddressImpl;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.net.impl.transport.Transport;
@@ -213,7 +213,7 @@ public class DatagramSocketImpl implements DatagramSocket, MetricsProvider {
@SuppressWarnings("unchecked")
final void addListener(ChannelFuture future, Handler<AsyncResult<Void>> handler) {
if (handler != null) {
future.addListener(new ChannelFutureListenerAdapter<>(context, null, handler));
future.addListener(FutureListenerAdapter.toVoid(context, handler));
}
}
@@ -310,7 +310,7 @@ public class DatagramSocketImpl implements DatagramSocket, MetricsProvider {
channel.flush();
ChannelFuture future = channel.close();
if (handler != null) {
future.addListener(new ChannelFutureListenerAdapter<>(context, null, handler));
future.addListener(FutureListenerAdapter.toVoid(context, handler));
}
}

View File

@@ -25,6 +25,7 @@ import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFram
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateClientExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
@@ -37,6 +38,7 @@ import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.FutureListenerAdapter;
import io.vertx.core.net.impl.NetSocketImpl;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.HttpClientMetrics;
@@ -298,8 +300,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
}
private void sendRequest(
HttpRequest request, ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler) {
private void sendRequest(HttpRequest request, ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler) {
if (end) {
if (buf != null) {
request = new AssembledFullHttpRequest(request, buf);
@@ -311,7 +312,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
request = new AssembledHttpRequest(request, buf);
}
}
conn.writeToChannel(request, conn.toPromise(handler));
conn.writeToChannel(request, conn.toPromise(toFutureListener(handler)));
}
private boolean handleChunk(Buffer buff) {
@@ -335,7 +336,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
msg = new DefaultHttpContent(buff);
}
bytesWritten += msg.content().readableBytes();
conn.writeToChannel(msg, conn.toPromise(handler));
conn.writeToChannel(msg, conn.toPromise(toFutureListener(handler)));
}
@Override
@@ -543,6 +544,10 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
promise.tryFail(cause);
}
}
private FutureListener<Void> toFutureListener(Handler<AsyncResult<Void>> handler) {
return handler == null ? null : FutureListenerAdapter.toVoid(context, handler);
}
}
private void checkLifecycle() {

View File

@@ -37,8 +37,8 @@ import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.impl.ChannelFutureListenerAdapter;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.FutureListenerAdapter;
import static io.vertx.core.net.impl.VertxHandler.safeBuffer;
@@ -156,12 +156,12 @@ abstract class Http1xConnectionBase<S extends WebSocketImplBase<S>> extends Conn
fut.addListener((ChannelFutureListener) f -> {
ChannelFuture closeFut = chctx.channel().close();
if (handler != null) {
closeFut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler));
closeFut.addListener(FutureListenerAdapter.toVoid(context, handler));
}
});
} else {
if (handler != null) {
fut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler));
fut.addListener(FutureListenerAdapter.toVoid(context, handler));
}
}
});

View File

@@ -22,6 +22,7 @@ import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
@@ -428,12 +429,12 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
}
@Override
public void writeBuffer(ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler) {
public void writeBuffer(ByteBuf buf, boolean end, Handler<AsyncResult<Void>> listener) {
if (buf == null && end) {
buf = Unpooled.EMPTY_BUFFER;
}
if (buf != null) {
writeData(buf, end, handler);
writeData(buf, end, listener);
}
if (end) {
handlerContext.flush();

View File

@@ -38,6 +38,7 @@ import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.FutureListenerAdapter;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -408,7 +409,7 @@ abstract class Http2ConnectionBase extends ConnectionBase implements Http2FrameL
@Override
public Future<Void> close() {
Promise<Void> promise = Promise.promise();
ChannelPromise channelPromise = toPromise(promise);
ChannelPromise channelPromise = chctx.newPromise().addListener(FutureListenerAdapter.toVoid(promise));
flush(channelPromise);
channelPromise.addListener((ChannelFutureListener) future -> shutdown(0L));
return promise.future();

View File

@@ -29,7 +29,6 @@ import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.FileSystem;
import io.vertx.core.file.FileSystemException;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpHeaders;
@@ -41,13 +40,11 @@ import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.streams.WriteStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.NoSuchFileException;
import java.util.Map;
import static io.vertx.core.http.HttpHeaders.SET_COOKIE;

View File

@@ -109,7 +109,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
boolean end,
StreamPriority priority,
Handler<Void> continueHandler,
Handler<AsyncResult<Void>> handler) {
Handler<AsyncResult<Void>> listener) {
ChannelPipeline pipeline = conn.channel().pipeline();
HttpClientCodec httpCodec = pipeline.get(HttpClientCodec.class);
class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {
@@ -170,7 +170,7 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536);
pipeline.addAfter("codec", null, new UpgradeRequestHandler());
pipeline.addAfter("codec", null, upgradeHandler);
stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, priority, continueHandler, handler);
stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, priority, continueHandler, listener);
}
@Override

View File

@@ -12,6 +12,7 @@
package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
@@ -43,8 +44,8 @@ public interface HttpClientStream {
HttpConnection connection();
ContextInternal getContext();
void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler<Void> contHandler, Handler<AsyncResult<Void>> handler);
void writeBuffer(ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler);
void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler<Void> contHandler, Handler<AsyncResult<Void>> listener);
void writeBuffer(ByteBuf buf, boolean end, Handler<AsyncResult<Void>> listener);
void writeFrame(int type, int flags, ByteBuf payload);
void doSetWriteQueueMaxSize(int size);

View File

@@ -22,6 +22,7 @@ import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.FutureListener;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
@@ -40,6 +41,7 @@ import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.FutureListenerAdapter;
import io.vertx.core.spi.metrics.Metrics;
import java.io.File;
@@ -311,7 +313,7 @@ public class HttpServerResponseImpl implements HttpServerResponse {
@Override
public void write(Buffer chunk, Handler<AsyncResult<Void>> handler) {
write(chunk.getByteBuf(), conn.toPromise(handler));
write(chunk.getByteBuf(), handler);
}
@Override
@@ -323,7 +325,7 @@ public class HttpServerResponseImpl implements HttpServerResponse {
@Override
public void write(String chunk, String enc, Handler<AsyncResult<Void>> handler) {
write(Buffer.buffer(chunk, enc).getByteBuf(), conn.toPromise(handler));
write(Buffer.buffer(chunk, enc).getByteBuf(), handler);
}
@Override
@@ -335,7 +337,7 @@ public class HttpServerResponseImpl implements HttpServerResponse {
@Override
public void write(String chunk, Handler<AsyncResult<Void>> handler) {
write(Buffer.buffer(chunk).getByteBuf(), conn.toPromise(handler));
write(Buffer.buffer(chunk).getByteBuf(), handler);
}
@Override
@@ -373,10 +375,6 @@ public class HttpServerResponseImpl implements HttpServerResponse {
@Override
public void end(Buffer chunk, Handler<AsyncResult<Void>> handler) {
end(chunk, conn.toPromise(handler));
}
private void end(Buffer chunk, ChannelPromise promise) {
synchronized (conn) {
if (written) {
throw new IllegalStateException(RESPONSE_WRITTEN);
@@ -392,7 +390,7 @@ public class HttpServerResponseImpl implements HttpServerResponse {
} else {
msg = new AssembledLastHttpContent(data, trailingHeaders);
}
conn.writeToChannel(msg, promise);
conn.writeToChannel(msg, conn.toPromise(toFutureListener(handler)));
written = true;
conn.responseComplete();
if (bodyEndHandler != null) {
@@ -677,7 +675,7 @@ public class HttpServerResponseImpl implements HttpServerResponse {
}
}
private HttpServerResponseImpl write(ByteBuf chunk, ChannelPromise promise) {
private HttpServerResponseImpl write(ByteBuf chunk, Handler<AsyncResult<Void>> handler) {
synchronized (conn) {
if (written) {
throw new IllegalStateException("Response has already been written");
@@ -695,7 +693,7 @@ public class HttpServerResponseImpl implements HttpServerResponse {
} else {
msg = new DefaultHttpContent(chunk);
}
conn.writeToChannel(msg, promise);
conn.writeToChannel(msg, conn.toPromise(toFutureListener(handler)));
return this;
}
}
@@ -769,4 +767,8 @@ public class HttpServerResponseImpl implements HttpServerResponse {
public @Nullable Cookie removeCookie(String name, boolean invalidate) {
return CookieImpl.removeCookie(cookies(), name, invalidate);
}
private FutureListener<Void> toFutureListener(Handler<AsyncResult<Void>> handler) {
return handler == null ? null : FutureListenerAdapter.toVoid(context, handler);
}
}

View File

@@ -20,6 +20,7 @@ import io.netty.handler.codec.http2.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
@@ -186,39 +187,24 @@ class VertxHttp2ConnectionHandler<C extends Http2ConnectionBase> extends Http2Co
//
void writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive, Handler<AsyncResult<Void>> handler) {
void writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive, FutureListener<Void> listener) {
EventExecutor executor = chctx.executor();
ChannelPromise promise = createPromise(handler);
if (executor.inEventLoop()) {
_writeHeaders(stream, headers, end, streamDependency, weight, exclusive, promise);
_writeHeaders(stream, headers, end, streamDependency, weight, exclusive, listener);
} else {
executor.execute(() -> {
_writeHeaders(stream, headers, end, streamDependency, weight, exclusive, promise);
_writeHeaders(stream, headers, end, streamDependency, weight, exclusive, listener);
});
}
}
private void _writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive, ChannelPromise promise) {
private void _writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive, FutureListener<Void> listener) {
ChannelPromise promise = listener == null ? chctx.voidPromise() : chctx.newPromise().addListener(listener);
encoder().writeHeaders(chctx, stream.id(), headers, streamDependency, weight, exclusive, 0, end, promise);
}
private ChannelPromise createPromise(Handler<AsyncResult<Void>> handler) {
ChannelPromise promise = chctx.newPromise();
if (handler != null) {
promise.addListener((future) -> {
if(future.isSuccess()) {
handler.handle(Future.succeededFuture());
} else {
handler.handle(Future.failedFuture(future.cause()));
}
});
}
return promise;
}
void writeData(Http2Stream stream, ByteBuf chunk, boolean end, Handler<AsyncResult<Void>> handler) {
void writeData(Http2Stream stream, ByteBuf chunk, boolean end, FutureListener<Void> promise) {
EventExecutor executor = chctx.executor();
ChannelPromise promise = createPromise(handler);
if (executor.inEventLoop()) {
_writeData(stream, chunk, end, promise);
} else {
@@ -228,7 +214,8 @@ class VertxHttp2ConnectionHandler<C extends Http2ConnectionBase> extends Http2Co
}
}
private void _writeData(Http2Stream stream, ByteBuf chunk, boolean end, ChannelPromise promise) {
private void _writeData(Http2Stream stream, ByteBuf chunk, boolean end, FutureListener<Void> listener) {
ChannelPromise promise = listener == null ? chctx.voidPromise() : chctx.newPromise().addListener(listener);
encoder().writeData(chctx, stream.id(), chunk, 0, end, promise);
Http2RemoteFlowController controller = encoder().flowController();
if (!controller.isWritable(stream) || end) {

View File

@@ -28,6 +28,7 @@ import io.vertx.core.http.StreamResetException;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.FutureListenerAdapter;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
@@ -237,7 +238,7 @@ class VertxHttp2NetSocket<C extends Http2ConnectionBase> extends VertxHttp2Strea
@Override
public void write(Buffer message, Handler<AsyncResult<Void>> handler) {
conn.handler.writeData(stream, message.getByteBuf(), false, handler);
conn.handler.writeData(stream, message.getByteBuf(), false, FutureListenerAdapter.toVoid(context, handler));
}
@Override

View File

@@ -16,6 +16,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.EmptyHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
@@ -23,6 +24,7 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.impl.FutureListenerAdapter;
import io.vertx.core.streams.impl.InboundBuffer;
/**
@@ -138,7 +140,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
}
void writeHeaders(Http2Headers headers, boolean end, Handler<AsyncResult<Void>> handler) {
conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(), handler);
conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(), toFutureListener(handler));
}
private void writePriorityFrame(StreamPriority priority) {
@@ -151,7 +153,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
void writeData(ByteBuf chunk, boolean end, Handler<AsyncResult<Void>> handler) {
bytesWritten += chunk.readableBytes();
conn.handler.writeData(stream, chunk, end, handler);
conn.handler.writeData(stream, chunk, end, toFutureListener(handler));
}
void writeReset(long code) {
@@ -198,4 +200,8 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
}
abstract void handlePriorityChange(StreamPriority streamPriority);
public FutureListener<Void> toFutureListener(Handler<AsyncResult<Void>> handler) {
return handler == null ? null : FutureListenerAdapter.toVoid(context, handler);
}
}

View File

@@ -28,6 +28,7 @@ import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.FutureListenerAdapter;
import io.vertx.core.streams.impl.InboundBuffer;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -365,7 +366,7 @@ public abstract class WebSocketImplBase<S extends WebSocketBase> implements WebS
synchronized (conn) {
checkClosed();
conn.reportBytesWritten(((WebSocketFrameInternal)frame).length());
conn.writeToChannel(conn.encodeFrame((WebSocketFrameImpl) frame), conn.toPromise(handler));
conn.writeToChannel(conn.encodeFrame((WebSocketFrameImpl) frame), conn.toPromise(FutureListenerAdapter.toVoid(handler)));
}
return (S) this;
}

View File

@@ -13,13 +13,13 @@ package io.vertx.core.impl;
import io.netty.resolver.AddressResolverGroup;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.dns.AddressResolverOptions;
import io.vertx.core.impl.launcher.commands.ExecUtils;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.impl.FutureListenerAdapter;
import io.vertx.core.spi.resolver.ResolverProvider;
import java.io.File;
@@ -81,16 +81,7 @@ public class AddressResolver {
ContextInternal callback = (ContextInternal) vertx.getOrCreateContext();
io.netty.resolver.AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(callback.nettyEventLoop());
io.netty.util.concurrent.Future<InetSocketAddress> fut = resolver.resolve(InetSocketAddress.createUnresolved(hostname, 0));
fut.addListener(a -> {
callback.runOnContext(v -> {
if (a.isSuccess()) {
InetSocketAddress address = fut.getNow();
resultHandler.handle(Future.succeededFuture(address.getAddress()));
} else {
resultHandler.handle(Future.failedFuture(a.cause()));
}
});
});
fut.addListener(FutureListenerAdapter.toValue(callback, InetSocketAddress::getAddress, resultHandler));
}
AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {

View File

@@ -1,41 +0,0 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.net.impl;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal;
/**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/
public final class ChannelFutureListenerAdapter<T> implements ChannelFutureListener {
private final Handler<AsyncResult<T>> handler;
private final T result;
private final ContextInternal context;
public ChannelFutureListenerAdapter(ContextInternal context, T result, Handler<AsyncResult<T>> handler) {
this.handler = handler;
this.result = result;
this.context = context;
}
@Override
public void operationComplete(ChannelFuture future) {
Future<T> res = future.isSuccess() ? Future.succeededFuture(result) : Future.failedFuture(future.cause());
context.executeFromIO(res, handler);
}
}

View File

@@ -15,6 +15,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
@@ -129,20 +130,13 @@ public abstract class ConnectionBase {
* @param handler the handler
* @return a promise
*/
public final ChannelPromise toPromise(Handler<AsyncResult<Void>> handler) {
public final ChannelPromise toPromise(FutureListener<Void> handler) {
return handler == null ? voidPromise : wrap(handler);
}
private ChannelPromise wrap(Handler<AsyncResult<Void>> handler) {
private ChannelPromise wrap(FutureListener<Void> handler) {
ChannelPromise promise = chctx.newPromise();
promise.addListener((fut) -> {
if (fut.isSuccess()) {
handler.handle(Future.succeededFuture());
} else {
handler.handle(Future.failedFuture(fut.cause()));
}
}
);
promise.addListener(handler);
return promise;
}
@@ -225,7 +219,7 @@ public abstract class ConnectionBase {
.addListener((ChannelFutureListener) f -> {
ChannelFuture closeFut = chctx.channel().close();
if (handler != null) {
closeFut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler));
closeFut.addListener(FutureListenerAdapter.toVoid(context, handler));
}
});
flush(promise);

View File

@@ -0,0 +1,70 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.net.impl;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal;
import java.util.function.Function;
/**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/
public final class FutureListenerAdapter {
public static <U, T> FutureListener<U> toValue(ContextInternal context, Function<U, T> adapter, Handler<AsyncResult<T>> handler) {
if (handler != null) {
return future -> {
Future<T> res = future.isSuccess() ? Future.succeededFuture(adapter.apply(future.getNow())) : Future.failedFuture(future.cause());
context.executeFromIO(res, handler);
};
} else {
return null;
}
}
public static <U, T> FutureListener<U> toValue(Function<U, T> adapter, Handler<AsyncResult<T>> handler) {
if (handler != null) {
return future -> {
Future<T> res = future.isSuccess() ? Future.succeededFuture(adapter.apply(future.getNow())) : Future.failedFuture(future.cause());
handler.handle(res);
};
} else {
return null;
}
}
public static FutureListener<Void> toVoid(ContextInternal context, Handler<AsyncResult<Void>> handler) {
if (handler != null) {
return future -> {
Future<Void> res = future.isSuccess() ? Future.succeededFuture() : Future.failedFuture(future.cause());
context.executeFromIO(res, handler);
};
} else {
return null;
}
}
public static FutureListener<Void> toVoid(Handler<AsyncResult<Void>> handler) {
if (handler != null) {
return future -> {
Future<Void> res = future.isSuccess() ? Future.succeededFuture() : Future.failedFuture(future.cause());
handler.handle(res);
};
} else {
return null;
}
}
}

View File

@@ -131,7 +131,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
@Override
public NetSocketInternal writeMessage(Object message, Handler<AsyncResult<Void>> handler) {
writeToChannel(message, toPromise(handler));
writeToChannel(message, toPromise(FutureListenerAdapter.toVoid(handler)));
return this;
}