mirror of
https://github.com/jlengrand/vert.x.git
synced 2026-03-10 08:51:19 +00:00
Use Netty future/promise in AsyncResolveConnectHelper to remove unecessary concurrency - fixes #3142
This commit is contained in:
@@ -17,6 +17,7 @@ import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.group.ChannelGroup;
|
||||
import io.netty.channel.group.ChannelGroupFuture;
|
||||
import io.netty.channel.group.DefaultChannelGroup;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||
import io.vertx.core.*;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
@@ -71,7 +72,7 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
|
||||
|
||||
private ChannelGroup serverChannelGroup;
|
||||
private volatile boolean listening;
|
||||
private AsyncResolveConnectHelper bindFuture;
|
||||
private io.netty.util.concurrent.Future<Channel> bindFuture;
|
||||
private ServerID id;
|
||||
private HttpServerImpl actualServer;
|
||||
private volatile int actualPort;
|
||||
@@ -233,13 +234,13 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
|
||||
addHandlers(this, listenContext);
|
||||
try {
|
||||
bindFuture = AsyncResolveConnectHelper.doBind(vertx, address, bootstrap);
|
||||
bindFuture.addListener(res -> {
|
||||
if (res.failed()) {
|
||||
bindFuture.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Channel>>) res -> {
|
||||
if (!res.isSuccess()) {
|
||||
synchronized (sharedHttpServers) {
|
||||
sharedHttpServers.remove(id);
|
||||
}
|
||||
} else {
|
||||
Channel serverChannel = res.result();
|
||||
Channel serverChannel = res.getNow();
|
||||
if (serverChannel.localAddress() instanceof InetSocketAddress) {
|
||||
HttpServerImpl.this.actualPort = ((InetSocketAddress)serverChannel.localAddress()).getPort();
|
||||
} else {
|
||||
@@ -269,17 +270,17 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {
|
||||
VertxMetrics metrics = vertx.metricsSPI();
|
||||
this.metrics = metrics != null ? metrics.createHttpServerMetrics(options, address) : null;
|
||||
}
|
||||
actualServer.bindFuture.addListener(future -> {
|
||||
actualServer.bindFuture.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Channel>>) future -> {
|
||||
if (listenHandler != null) {
|
||||
final AsyncResult<HttpServer> res;
|
||||
if (future.succeeded()) {
|
||||
if (future.isSuccess()) {
|
||||
res = Future.succeededFuture(HttpServerImpl.this);
|
||||
} else {
|
||||
res = Future.failedFuture(future.cause());
|
||||
listening = false;
|
||||
}
|
||||
listenContext.runOnContext((v) -> listenHandler.handle(res));
|
||||
} else if (future.failed()) {
|
||||
} else if (!future.isSuccess()) {
|
||||
listening = false;
|
||||
if (metrics != null) {
|
||||
metrics.close();
|
||||
|
||||
@@ -14,67 +14,36 @@ package io.vertx.core.net.impl;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.vertx.core.AsyncResult;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Handler;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.vertx.core.impl.VertxInternal;
|
||||
import io.vertx.core.net.SocketAddress;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author <a href="http://tfox.org">Tim Fox</a>
|
||||
*/
|
||||
public class AsyncResolveConnectHelper {
|
||||
|
||||
private List<Handler<AsyncResult<Channel>>> handlers = new ArrayList<>();
|
||||
private ChannelFuture future;
|
||||
private AsyncResult<Channel> result;
|
||||
|
||||
public synchronized void addListener(Handler<AsyncResult<Channel>> handler) {
|
||||
if (result != null) {
|
||||
if (future != null) {
|
||||
future.addListener(v -> handler.handle(result));
|
||||
} else {
|
||||
handler.handle(result);
|
||||
}
|
||||
} else {
|
||||
handlers.add(handler);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void handle(ChannelFuture cf, AsyncResult<Channel> res) {
|
||||
if (result == null) {
|
||||
for (Handler<AsyncResult<Channel>> handler: handlers) {
|
||||
handler.handle(res);
|
||||
}
|
||||
future = cf;
|
||||
result = res;
|
||||
} else {
|
||||
throw new IllegalStateException("Already complete!");
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkPort(int port) {
|
||||
if (port < 0 || port > 65535) {
|
||||
throw new IllegalArgumentException("Invalid port " + port);
|
||||
}
|
||||
}
|
||||
|
||||
public static AsyncResolveConnectHelper doBind(VertxInternal vertx, SocketAddress socketAddress,
|
||||
ServerBootstrap bootstrap) {
|
||||
AsyncResolveConnectHelper asyncResolveConnectHelper = new AsyncResolveConnectHelper();
|
||||
public static io.netty.util.concurrent.Future<Channel> doBind(VertxInternal vertx,
|
||||
SocketAddress socketAddress,
|
||||
ServerBootstrap bootstrap) {
|
||||
Promise<Channel> promise = vertx.getAcceptorEventLoopGroup().next().newPromise();
|
||||
bootstrap.channelFactory(vertx.transport().serverChannelFactory(socketAddress.path() != null));
|
||||
if (socketAddress.path() != null) {
|
||||
java.net.SocketAddress converted = vertx.transport().convert(socketAddress, true);
|
||||
ChannelFuture future = bootstrap.bind(converted);
|
||||
future.addListener(f -> {
|
||||
if (f.isSuccess()) {
|
||||
asyncResolveConnectHelper.handle(future, Future.succeededFuture(future.channel()));
|
||||
promise.setSuccess(future.channel());
|
||||
} else {
|
||||
asyncResolveConnectHelper.handle(future, Future.failedFuture(f.cause()));
|
||||
promise.setFailure(f.cause());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
@@ -86,16 +55,16 @@ public class AsyncResolveConnectHelper {
|
||||
ChannelFuture future = bootstrap.bind(t);
|
||||
future.addListener(f -> {
|
||||
if (f.isSuccess()) {
|
||||
asyncResolveConnectHelper.handle(future, Future.succeededFuture(future.channel()));
|
||||
promise.setSuccess(future.channel());
|
||||
} else {
|
||||
asyncResolveConnectHelper.handle(future, Future.failedFuture(f.cause()));
|
||||
promise.setFailure(f.cause());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
asyncResolveConnectHelper.handle(null, Future.failedFuture(res.cause()));
|
||||
promise.setFailure(res.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
return asyncResolveConnectHelper;
|
||||
return promise;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ import io.netty.handler.ssl.SniHandler;
|
||||
import io.netty.handler.ssl.SslHandler;
|
||||
import io.netty.handler.stream.ChunkedWriteHandler;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||
import io.vertx.core.*;
|
||||
import io.vertx.core.impl.ContextInternal;
|
||||
@@ -72,7 +73,7 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer {
|
||||
private Handler<NetSocket> registeredHandler;
|
||||
private volatile ServerID id;
|
||||
private NetServerImpl actualServer;
|
||||
private AsyncResolveConnectHelper bindFuture;
|
||||
private io.netty.util.concurrent.Future<Channel> bindFuture;
|
||||
private volatile int actualPort;
|
||||
private ContextInternal listenContext;
|
||||
private TCPMetrics metrics;
|
||||
@@ -186,7 +187,7 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer {
|
||||
|
||||
bootstrap.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
protected void initChannel(Channel ch) {
|
||||
if (!accept()) {
|
||||
ch.close();
|
||||
return;
|
||||
@@ -229,9 +230,9 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer {
|
||||
|
||||
try {
|
||||
bindFuture = AsyncResolveConnectHelper.doBind(vertx, socketAddress, bootstrap);
|
||||
bindFuture.addListener(res -> {
|
||||
if (res.succeeded()) {
|
||||
Channel ch = res.result();
|
||||
bindFuture.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Channel>>) res -> {
|
||||
if (res.isSuccess()) {
|
||||
Channel ch = res.getNow();
|
||||
log.trace("Net server listening on " + (hostOrPath) + ":" + ch.localAddress());
|
||||
// Update port to actual port - wildcard port 0 might have been used
|
||||
if (NetServerImpl.this.actualPort != -1) {
|
||||
@@ -278,10 +279,10 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer {
|
||||
}
|
||||
|
||||
// just add it to the future so it gets notified once the bind is complete
|
||||
actualServer.bindFuture.addListener(res -> {
|
||||
actualServer.bindFuture.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Channel>>) res -> {
|
||||
if (listenHandler != null) {
|
||||
AsyncResult<Void> ares;
|
||||
if (res.succeeded()) {
|
||||
if (res.isSuccess()) {
|
||||
ares = Future.succeededFuture();
|
||||
} else {
|
||||
listening = false;
|
||||
@@ -291,14 +292,13 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer {
|
||||
// Netty will call future handler immediately with calling thread
|
||||
// which might be a non Vert.x thread (if running embedded)
|
||||
listenContext.runOnContext(v -> listenHandler.handle(ares));
|
||||
} else if (res.failed()) {
|
||||
} else if (!res.isSuccess()) {
|
||||
// No handler - log so user can see failure
|
||||
log.error("Failed to listen", res.cause());
|
||||
listening = false;
|
||||
}
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
public synchronized void close() {
|
||||
@@ -424,7 +424,7 @@ public class NetServerImpl implements Closeable, MetricsProvider, NetServer {
|
||||
return !listening;
|
||||
}
|
||||
|
||||
public synchronized int actualPort() {
|
||||
public int actualPort() {
|
||||
return actualPort;
|
||||
}
|
||||
|
||||
|
||||
101
src/test/java/io/vertx/core/SharedServersConcurrencyTest.java
Normal file
101
src/test/java/io/vertx/core/SharedServersConcurrencyTest.java
Normal file
@@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
*/
|
||||
|
||||
package io.vertx.core;
|
||||
|
||||
import io.vertx.core.http.HttpServerOptions;
|
||||
import io.vertx.core.net.NetServerOptions;
|
||||
import io.vertx.test.core.Repeat;
|
||||
import io.vertx.test.core.VertxTestBase;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Orginial reproducer from https://github.com/LarsKrogJensen
|
||||
*/
|
||||
public class SharedServersConcurrencyTest extends VertxTestBase {
|
||||
|
||||
@Test
|
||||
@Repeat(times = 100)
|
||||
public void testConcurrency() {
|
||||
deployVerticle(vertx, new MonitorVerticle())
|
||||
.compose(__ -> deployVerticle(vertx, new RestVerticle()))
|
||||
.compose(__ -> deployVerticle(vertx, new ApiVerticle()))
|
||||
.setHandler(onSuccess(__ -> testComplete()));
|
||||
await();
|
||||
}
|
||||
|
||||
private static class ApiVerticle extends AbstractVerticle {
|
||||
@Override
|
||||
public void start(Promise<Void> startPromise) {
|
||||
vertx.deployVerticle(() -> new NetVerticle(), new DeploymentOptions().setInstances(32), ar -> {
|
||||
if (ar.succeeded()) {
|
||||
startPromise.complete();
|
||||
} else {
|
||||
startPromise.fail(ar.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static class NetVerticle extends AbstractVerticle {
|
||||
@Override
|
||||
public void start(Promise<Void> startPromise) {
|
||||
vertx.createNetServer(new NetServerOptions().setPort(20152))
|
||||
.connectHandler(netSocket -> {
|
||||
})
|
||||
.listen(ar -> {
|
||||
if (ar.succeeded()) {
|
||||
startPromise.complete();
|
||||
} else {
|
||||
startPromise.fail(ar.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static class RestVerticle extends AbstractVerticle {
|
||||
@Override
|
||||
public void start(Promise<Void> startPromise) {
|
||||
vertx.createHttpServer(new HttpServerOptions())
|
||||
.requestHandler(req -> {
|
||||
})
|
||||
.listen(15152, ar -> {
|
||||
if (ar.succeeded()) {
|
||||
System.out.println("REST listening on port: " + ar.result().actualPort());
|
||||
startPromise.complete();
|
||||
} else {
|
||||
startPromise.fail(ar.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static class MonitorVerticle extends AbstractVerticle {
|
||||
@Override
|
||||
public void start(Promise<Void> startPromise) {
|
||||
vertx.createHttpServer(new HttpServerOptions())
|
||||
.requestHandler(req -> {
|
||||
})
|
||||
.listen(16152, ar -> {
|
||||
if (ar.succeeded()) {
|
||||
System.out.println("Monitor listening on port: " + ar.result().actualPort());
|
||||
startPromise.complete();
|
||||
} else {
|
||||
startPromise.fail(ar.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static Future<String> deployVerticle(Vertx vertx, Verticle verticle) {
|
||||
return Future.future(promise -> vertx.deployVerticle(verticle, promise));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user