diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index d55178f6f..02e0c8da4 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -116,9 +116,9 @@ public class ClusteredEventBus extends EventBusImpl { // Get the HA manager, it has been constructed but it's not yet initialized HAManager haManager = vertx.haManager(); setClusterViewChangedHandler(haManager); - clusterManager.getAsyncMultiMap(SUBS_MAP_NAME, ar2 -> { - if (ar2.succeeded()) { - subs = ar2.result(); + clusterManager.getAsyncMultiMap(SUBS_MAP_NAME, ar1 -> { + if (ar1.succeeded()) { + subs = ar1.result(); server = vertx.createNetServer(getServerOptions()); server.connectHandler(getServerHandler()); @@ -128,25 +128,23 @@ public class ClusteredEventBus extends EventBusImpl { String serverHost = getClusterPublicHost(options); serverID = new ServerID(serverPort, serverHost); nodeInfo = new ClusterNodeInfo(clusterManager.getNodeID(), serverID); - haManager.addDataToAHAInfo(SERVER_ID_HA_KEY, new JsonObject().put("host", serverID.host).put("port", serverID.port)); - if (resultHandler != null) { - started = true; - resultHandler.handle(Future.succeededFuture()); - } + vertx.executeBlocking(fut -> { + haManager.addDataToAHAInfo(SERVER_ID_HA_KEY, new JsonObject().put("host", serverID.host).put("port", serverID.port)); + fut.complete(); + }, false, ar2 -> { + if (ar2.succeeded()) { + started = true; + resultHandler.handle(Future.succeededFuture()); + } else { + resultHandler.handle(Future.failedFuture(ar2.cause())); + } + }); } else { - if (resultHandler != null) { - resultHandler.handle(Future.failedFuture(asyncResult.cause())); - } else { - log.error(asyncResult.cause()); - } + resultHandler.handle(Future.failedFuture(asyncResult.cause())); } }); } else { - if (resultHandler != null) { - resultHandler.handle(Future.failedFuture(ar2.cause())); - } else { - log.error(ar2.cause()); - } + resultHandler.handle(Future.failedFuture(ar1.cause())); } }); } diff --git a/src/main/java/io/vertx/core/impl/HAManager.java b/src/main/java/io/vertx/core/impl/HAManager.java index 97c4c7eda..7edfc7b65 100644 --- a/src/main/java/io/vertx/core/impl/HAManager.java +++ b/src/main/java/io/vertx/core/impl/HAManager.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation + * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License 2.0 which is available at @@ -22,20 +22,15 @@ import io.vertx.core.logging.LoggerFactory; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.NodeListener; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import static java.util.concurrent.TimeUnit.*; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.NANOSECONDS; /** * @@ -103,7 +98,6 @@ public class HAManager { private static final Logger log = LoggerFactory.getLogger(HAManager.class); - private static final String CLUSTER_MAP_NAME = "__vertx.haInfo"; private static final long QUORUM_CHECK_PERIOD = 1000; private final VertxInternal vertx; @@ -125,16 +119,16 @@ public class HAManager { private volatile boolean killed; private Consumer> clusterViewChangedHandler; - public HAManager(VertxInternal vertx, DeploymentManager deploymentManager, - ClusterManager clusterManager, int quorumSize, String group, boolean enabled) { + public HAManager(VertxInternal vertx, DeploymentManager deploymentManager, ClusterManager clusterManager, + Map clusterMap, int quorumSize, String group, boolean enabled) { this.vertx = vertx; this.deploymentManager = deploymentManager; this.clusterManager = clusterManager; + this.clusterMap = clusterMap; this.quorumSize = enabled ? quorumSize : 0; this.group = enabled ? group : "__DISABLED__"; this.enabled = enabled; this.haInfo = new JsonObject().put("verticles", new JsonArray()).put("group", this.group); - this.clusterMap = clusterManager.getSyncMap(CLUSTER_MAP_NAME); this.nodeID = clusterManager.getNodeID(); } @@ -204,7 +198,6 @@ public class HAManager { public void stop() { if (!stopped) { if (clusterManager.isActive()) { - clusterMap.remove(nodeID); } vertx.cancelTimer(quorumTimerID); @@ -271,16 +264,23 @@ public class HAManager { private void doDeployVerticle(final String verticleName, DeploymentOptions deploymentOptions, final Handler> doneHandler) { - final Handler> wrappedHandler = asyncResult -> { - if (asyncResult.succeeded()) { - // Tell the other nodes of the cluster about the verticle for HA purposes - addToHA(asyncResult.result(), verticleName, deploymentOptions); - } - if (doneHandler != null) { - doneHandler.handle(asyncResult); - } else if (asyncResult.failed()) { - log.error("Failed to deploy verticle", asyncResult.cause()); - } + final Handler> wrappedHandler = ar1 -> { + vertx.executeBlocking(fut -> { + if (ar1.succeeded()) { + // Tell the other nodes of the cluster about the verticle for HA purposes + String deploymentID = ar1.result(); + addToHA(deploymentID, verticleName, deploymentOptions); + fut.complete(deploymentID); + } else { + fut.fail(ar1.cause()); + } + }, false, ar2 -> { + if (doneHandler != null) { + doneHandler.handle(ar2); + } else if (ar2.failed()) { + log.error("Failed to deploy verticle", ar2.cause()); + } + }); }; deploymentManager.deployVerticle(verticleName, deploymentOptions, wrappedHandler); } diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java index c02f282c0..e2202aab9 100644 --- a/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation + * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License 2.0 which is available at @@ -16,17 +16,8 @@ import io.netty.channel.EventLoopGroup; import io.netty.resolver.AddressResolverGroup; import io.netty.util.ResourceLeakDetector; import io.netty.util.concurrent.GenericFutureListener; -import io.vertx.core.AsyncResult; -import io.vertx.core.Closeable; -import io.vertx.core.Context; -import io.vertx.core.DeploymentOptions; +import io.vertx.core.*; import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.ServiceHelper; -import io.vertx.core.TimeoutStream; -import io.vertx.core.Verticle; -import io.vertx.core.Vertx; -import io.vertx.core.VertxOptions; import io.vertx.core.datagram.DatagramSocket; import io.vertx.core.datagram.DatagramSocketOptions; import io.vertx.core.datagram.impl.DatagramSocketImpl; @@ -58,9 +49,9 @@ import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.impl.NetClientImpl; import io.vertx.core.net.impl.NetServerImpl; import io.vertx.core.net.impl.ServerID; +import io.vertx.core.net.impl.transport.Transport; import io.vertx.core.shareddata.SharedData; import io.vertx.core.shareddata.impl.SharedDataImpl; -import io.vertx.core.net.impl.transport.Transport; import io.vertx.core.spi.VerticleFactory; import io.vertx.core.spi.VertxMetricsFactory; import io.vertx.core.spi.cluster.ClusterManager; @@ -73,18 +64,8 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -97,6 +78,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider { private static final Logger log = LoggerFactory.getLogger(VertxImpl.class); + private static final String CLUSTER_MAP_NAME = "__vertx.haInfo"; private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio"; private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50); @@ -115,7 +97,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider { static void clusteredVertx(VertxOptions options, Handler> resultHandler) { VertxImpl vertx = new VertxImpl(options); - vertx.initClustered(options, resultHandler); + vertx.joinCluster(options, resultHandler); } private final FileSystem fileSystem = getFileSystem(); @@ -204,28 +186,57 @@ public class VertxImpl implements VertxInternal, MetricsProvider { eventBus.start(ar -> {}); } - private void initClustered(VertxOptions options, Handler> resultHandler) { + private void joinCluster(VertxOptions options, Handler> resultHandler) { clusterManager.setVertx(this); - clusterManager.join(ar1 -> { - if (ar1.failed()) { - log.error("Failed to join cluster", ar1.cause()); - resultHandler.handle(Future.failedFuture(ar1.cause())); + clusterManager.join(ar -> { + if (ar.succeeded()) { + createHaManager(options, resultHandler); } else { - haManager = new HAManager(this, deploymentManager, clusterManager, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled()); - eventBus.start(ar2 -> { - AsyncResult res; - if (ar2.succeeded()) { - // Init the manager (i.e register listener and check the quorum) - // after the event bus has been fully started and updated its state - // it will have also set the clustered changed view handler on the ha manager - haManager.init(); - res = Future.succeededFuture(this); - } else { - log.error("Failed to start event bus", ar2.cause()); - res = Future.failedFuture(ar2.cause()); - } - resultHandler.handle(res); - }); + log.error("Failed to join cluster", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void createHaManager(VertxOptions options, Handler> resultHandler) { + this.>executeBlocking(fut -> { + fut.complete(clusterManager.getSyncMap(CLUSTER_MAP_NAME)); + }, false, ar -> { + if (ar.succeeded()) { + Map clusterMap = ar.result(); + haManager = new HAManager(this, deploymentManager, clusterManager, clusterMap, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled()); + startEventBus(resultHandler); + } else { + log.error("Failed to start HAManager", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void startEventBus(Handler> resultHandler) { + eventBus.start(ar -> { + if (ar.succeeded()) { + initializeHaManager(resultHandler); + } else { + log.error("Failed to start event bus", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void initializeHaManager(Handler> resultHandler) { + this.executeBlocking(fut -> { + // Init the manager (i.e register listener and check the quorum) + // after the event bus has been fully started and updated its state + // it will have also set the clustered changed view handler on the ha manager + haManager.init(); + fut.complete(); + }, false, ar -> { + if (ar.succeeded()) { + resultHandler.handle(Future.succeededFuture(this)); + } else { + log.error("Failed to initialize HAManager", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); } }); } @@ -525,40 +536,44 @@ public class VertxImpl implements VertxInternal, MetricsProvider { closeHooks.run(ar -> { deploymentManager.undeployAll(ar1 -> { - if (haManager() != null) { - haManager().stop(); - } - addressResolver.close(ar2 -> { - eventBus.close(ar3 -> { - closeClusterManager(ar4 -> { - // Copy set to prevent ConcurrentModificationException - Set httpServers = new HashSet<>(sharedHttpServers.values()); - Set netServers = new HashSet<>(sharedNetServers.values()); - sharedHttpServers.clear(); - sharedNetServers.clear(); + this.executeBlocking(fut -> { + if (haManager() != null) { + haManager().stop(); + } + fut.complete(); + }, false, ar2 -> { + addressResolver.close(ar3 -> { + eventBus.close(ar4 -> { + closeClusterManager(ar5 -> { + // Copy set to prevent ConcurrentModificationException + Set httpServers = new HashSet<>(sharedHttpServers.values()); + Set netServers = new HashSet<>(sharedNetServers.values()); + sharedHttpServers.clear(); + sharedNetServers.clear(); - int serverCount = httpServers.size() + netServers.size(); + int serverCount = httpServers.size() + netServers.size(); - AtomicInteger serverCloseCount = new AtomicInteger(); + AtomicInteger serverCloseCount = new AtomicInteger(); - Handler> serverCloseHandler = res -> { - if (res.failed()) { - log.error("Failure in shutting down server", res.cause()); + Handler> serverCloseHandler = res -> { + if (res.failed()) { + log.error("Failure in shutting down server", res.cause()); + } + if (serverCloseCount.incrementAndGet() == serverCount) { + deleteCacheDirAndShutdown(completionHandler); + } + }; + + for (HttpServer server : httpServers) { + server.close(serverCloseHandler); } - if (serverCloseCount.incrementAndGet() == serverCount) { + for (NetServerImpl server : netServers) { + server.close(serverCloseHandler); + } + if (serverCount == 0) { deleteCacheDirAndShutdown(completionHandler); } - }; - - for (HttpServer server : httpServers) { - server.close(serverCloseHandler); - } - for (NetServerImpl server : netServers) { - server.close(serverCloseHandler); - } - if (serverCount == 0) { - deleteCacheDirAndShutdown(completionHandler); - } + }); }); }); }); @@ -668,10 +683,18 @@ public class VertxImpl implements VertxInternal, MetricsProvider { @Override public void undeploy(String deploymentID, Handler> completionHandler) { - if (haManager() != null && haManager().isEnabled()) { - haManager().removeFromHA(deploymentID); - } - deploymentManager.undeployVerticle(deploymentID, completionHandler); + this.executeBlocking(fut -> { + if (haManager() != null && haManager().isEnabled()) { + haManager().removeFromHA(deploymentID); + } + fut.complete(); + }, false, ar -> { + if (ar.succeeded()) { + deploymentManager.undeployVerticle(deploymentID, completionHandler); + } else { + completionHandler.handle(Future.failedFuture(ar.cause())); + } + }); } @Override