From 9c87fdfda063e977ed238690ac9c739cfb4f550d Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 15 Oct 2019 10:54:24 +0200 Subject: [PATCH] Use Netty future/promise in AsyncResolveConnectHelper to remove unecessary concurrency - fixes #3142 --- .../vertx/core/http/impl/HttpServerImpl.java | 15 +-- .../net/impl/AsyncResolveConnectHelper.java | 53 ++------- .../io/vertx/core/net/impl/NetServerImpl.java | 20 ++-- .../core/SharedServersConcurrencyTest.java | 101 ++++++++++++++++++ 4 files changed, 130 insertions(+), 59 deletions(-) create mode 100644 src/test/java/io/vertx/core/SharedServersConcurrencyTest.java diff --git a/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java b/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java index b5939e018..4428519ba 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java @@ -17,6 +17,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GlobalEventExecutor; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; @@ -71,7 +72,7 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider { private ChannelGroup serverChannelGroup; private volatile boolean listening; - private AsyncResolveConnectHelper bindFuture; + private io.netty.util.concurrent.Future bindFuture; private ServerID id; private HttpServerImpl actualServer; private volatile int actualPort; @@ -243,13 +244,13 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider { addHandlers(this, listenContext); try { bindFuture = AsyncResolveConnectHelper.doBind(vertx, address, bootstrap); - bindFuture.addListener(res -> { - if (res.failed()) { + bindFuture.addListener((GenericFutureListener>) res -> { + if (!res.isSuccess()) { synchronized (sharedHttpServers) { sharedHttpServers.remove(id); } } else { - Channel serverChannel = res.result(); + Channel serverChannel = res.getNow(); if (serverChannel.localAddress() instanceof InetSocketAddress) { HttpServerImpl.this.actualPort = ((InetSocketAddress)serverChannel.localAddress()).getPort(); } else { @@ -279,17 +280,17 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider { VertxMetrics metrics = vertx.metricsSPI(); this.metrics = metrics != null ? metrics.createHttpServerMetrics(options, address) : null; } - actualServer.bindFuture.addListener(future -> { + actualServer.bindFuture.addListener((GenericFutureListener>) future -> { if (listenHandler != null) { final AsyncResult res; - if (future.succeeded()) { + if (future.isSuccess()) { res = Future.succeededFuture(HttpServerImpl.this); } else { res = Future.failedFuture(future.cause()); listening = false; } listenContext.runOnContext((v) -> listenHandler.handle(res)); - } else if (future.failed()) { + } else if (!future.isSuccess()) { listening = false; if (metrics != null) { metrics.close(); diff --git a/src/main/java/io/vertx/core/net/impl/AsyncResolveConnectHelper.java b/src/main/java/io/vertx/core/net/impl/AsyncResolveConnectHelper.java index 6ec25cafb..a89f5253f 100644 --- a/src/main/java/io/vertx/core/net/impl/AsyncResolveConnectHelper.java +++ b/src/main/java/io/vertx/core/net/impl/AsyncResolveConnectHelper.java @@ -14,67 +14,36 @@ package io.vertx.core.net.impl; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; +import io.netty.util.concurrent.Promise; import io.vertx.core.impl.VertxInternal; import io.vertx.core.net.SocketAddress; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; /** * @author Tim Fox */ public class AsyncResolveConnectHelper { - private List>> handlers = new ArrayList<>(); - private ChannelFuture future; - private AsyncResult result; - - public synchronized void addListener(Handler> handler) { - if (result != null) { - if (future != null) { - future.addListener(v -> handler.handle(result)); - } else { - handler.handle(result); - } - } else { - handlers.add(handler); - } - } - - private synchronized void handle(ChannelFuture cf, AsyncResult res) { - if (result == null) { - for (Handler> handler: handlers) { - handler.handle(res); - } - future = cf; - result = res; - } else { - throw new IllegalStateException("Already complete!"); - } - } - private static void checkPort(int port) { if (port < 0 || port > 65535) { throw new IllegalArgumentException("Invalid port " + port); } } - public static AsyncResolveConnectHelper doBind(VertxInternal vertx, SocketAddress socketAddress, - ServerBootstrap bootstrap) { - AsyncResolveConnectHelper asyncResolveConnectHelper = new AsyncResolveConnectHelper(); + public static io.netty.util.concurrent.Future doBind(VertxInternal vertx, + SocketAddress socketAddress, + ServerBootstrap bootstrap) { + Promise promise = vertx.getAcceptorEventLoopGroup().next().newPromise(); bootstrap.channelFactory(vertx.transport().serverChannelFactory(socketAddress.path() != null)); if (socketAddress.path() != null) { java.net.SocketAddress converted = vertx.transport().convert(socketAddress, true); ChannelFuture future = bootstrap.bind(converted); future.addListener(f -> { if (f.isSuccess()) { - asyncResolveConnectHelper.handle(future, Future.succeededFuture(future.channel())); + promise.setSuccess(future.channel()); } else { - asyncResolveConnectHelper.handle(future, Future.failedFuture(f.cause())); + promise.setFailure(f.cause()); } }); } else { @@ -86,16 +55,16 @@ public class AsyncResolveConnectHelper { ChannelFuture future = bootstrap.bind(t); future.addListener(f -> { if (f.isSuccess()) { - asyncResolveConnectHelper.handle(future, Future.succeededFuture(future.channel())); + promise.setSuccess(future.channel()); } else { - asyncResolveConnectHelper.handle(future, Future.failedFuture(f.cause())); + promise.setFailure(f.cause()); } }); } else { - asyncResolveConnectHelper.handle(null, Future.failedFuture(res.cause())); + promise.setFailure(res.cause()); } }); } - return asyncResolveConnectHelper; + return promise; } } diff --git a/src/main/java/io/vertx/core/net/impl/NetServerImpl.java b/src/main/java/io/vertx/core/net/impl/NetServerImpl.java index fc60ff163..af8fd3c39 100644 --- a/src/main/java/io/vertx/core/net/impl/NetServerImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetServerImpl.java @@ -23,6 +23,7 @@ import io.netty.handler.ssl.SniHandler; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GlobalEventExecutor; import io.vertx.core.*; import io.vertx.core.impl.ContextInternal; @@ -71,7 +72,7 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer { private Handler registeredHandler; private volatile ServerID id; private NetServerImpl actualServer; - private AsyncResolveConnectHelper bindFuture; + private io.netty.util.concurrent.Future bindFuture; private volatile int actualPort; private ContextInternal listenContext; private TCPMetrics metrics; @@ -182,7 +183,7 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer { bootstrap.childHandler(new ChannelInitializer() { @Override - protected void initChannel(Channel ch) throws Exception { + protected void initChannel(Channel ch) { if (!accept()) { ch.close(); return; @@ -225,9 +226,9 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer { try { bindFuture = AsyncResolveConnectHelper.doBind(vertx, socketAddress, bootstrap); - bindFuture.addListener(res -> { - if (res.succeeded()) { - Channel ch = res.result(); + bindFuture.addListener((GenericFutureListener>) res -> { + if (res.isSuccess()) { + Channel ch = res.getNow(); log.trace("Net server listening on " + (hostOrPath) + ":" + ch.localAddress()); // Update port to actual port - wildcard port 0 might have been used if (NetServerImpl.this.actualPort != -1) { @@ -274,10 +275,10 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer { } // just add it to the future so it gets notified once the bind is complete - actualServer.bindFuture.addListener(res -> { + actualServer.bindFuture.addListener((GenericFutureListener>) res -> { if (listenHandler != null) { AsyncResult ares; - if (res.succeeded()) { + if (res.isSuccess()) { ares = Future.succeededFuture(); } else { listening = false; @@ -287,14 +288,13 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer { // Netty will call future handler immediately with calling thread // which might be a non Vert.x thread (if running embedded) listenContext.runOnContext(v -> listenHandler.handle(ares)); - } else if (res.failed()) { + } else if (!res.isSuccess()) { // No handler - log so user can see failure log.error("Failed to listen", res.cause()); listening = false; } }); } - return; } public Future close() { @@ -429,7 +429,7 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer { return !listening; } - public synchronized int actualPort() { + public int actualPort() { return actualPort; } diff --git a/src/test/java/io/vertx/core/SharedServersConcurrencyTest.java b/src/test/java/io/vertx/core/SharedServersConcurrencyTest.java new file mode 100644 index 000000000..a48bb2b79 --- /dev/null +++ b/src/test/java/io/vertx/core/SharedServersConcurrencyTest.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core; + +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.net.NetServerOptions; +import io.vertx.test.core.Repeat; +import io.vertx.test.core.VertxTestBase; +import org.junit.Test; + +/** + * Orginial reproducer from https://github.com/LarsKrogJensen + */ +public class SharedServersConcurrencyTest extends VertxTestBase { + + @Test + @Repeat(times = 100) + public void testConcurrency() { + deployVerticle(vertx, new MonitorVerticle()) + .compose(__ -> deployVerticle(vertx, new RestVerticle())) + .compose(__ -> deployVerticle(vertx, new ApiVerticle())) + .setHandler(onSuccess(__ -> testComplete())); + await(); + } + + private static class ApiVerticle extends AbstractVerticle { + @Override + public void start(Promise startPromise) { + vertx.deployVerticle(() -> new NetVerticle(), new DeploymentOptions().setInstances(32), ar -> { + if (ar.succeeded()) { + startPromise.complete(); + } else { + startPromise.fail(ar.cause()); + } + }); + } + } + + private static class NetVerticle extends AbstractVerticle { + @Override + public void start(Promise startPromise) { + vertx.createNetServer(new NetServerOptions().setPort(20152)) + .connectHandler(netSocket -> { + }) + .listen(ar -> { + if (ar.succeeded()) { + startPromise.complete(); + } else { + startPromise.fail(ar.cause()); + } + }); + } + } + + private static class RestVerticle extends AbstractVerticle { + @Override + public void start(Promise startPromise) { + vertx.createHttpServer(new HttpServerOptions()) + .requestHandler(req -> { + }) + .listen(15152, ar -> { + if (ar.succeeded()) { + System.out.println("REST listening on port: " + ar.result().actualPort()); + startPromise.complete(); + } else { + startPromise.fail(ar.cause()); + } + }); + } + } + + private static class MonitorVerticle extends AbstractVerticle { + @Override + public void start(Promise startPromise) { + vertx.createHttpServer(new HttpServerOptions()) + .requestHandler(req -> { + }) + .listen(16152, ar -> { + if (ar.succeeded()) { + System.out.println("Monitor listening on port: " + ar.result().actualPort()); + startPromise.complete(); + } else { + startPromise.fail(ar.cause()); + } + }); + } + } + + private static Future deployVerticle(Vertx vertx, Verticle verticle) { + return Future.future(promise -> vertx.deployVerticle(verticle, promise)); + } +}