Make sure haInfo sync map is not invoked on an event-loop

Most often, code in HAManager is invoked from cluster manager threads.
Some parts though need to be called from a worker thread.

Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
This commit is contained in:
Thomas Segismont
2018-08-02 18:22:00 +02:00
parent 81d530c0fb
commit 749ff3c472
3 changed files with 140 additions and 119 deletions

View File

@@ -116,9 +116,9 @@ public class ClusteredEventBus extends EventBusImpl {
// Get the HA manager, it has been constructed but it's not yet initialized
HAManager haManager = vertx.haManager();
setClusterViewChangedHandler(haManager);
clusterManager.<String, ClusterNodeInfo>getAsyncMultiMap(SUBS_MAP_NAME, ar2 -> {
if (ar2.succeeded()) {
subs = ar2.result();
clusterManager.<String, ClusterNodeInfo>getAsyncMultiMap(SUBS_MAP_NAME, ar1 -> {
if (ar1.succeeded()) {
subs = ar1.result();
server = vertx.createNetServer(getServerOptions());
server.connectHandler(getServerHandler());
@@ -128,25 +128,23 @@ public class ClusteredEventBus extends EventBusImpl {
String serverHost = getClusterPublicHost(options);
serverID = new ServerID(serverPort, serverHost);
nodeInfo = new ClusterNodeInfo(clusterManager.getNodeID(), serverID);
haManager.addDataToAHAInfo(SERVER_ID_HA_KEY, new JsonObject().put("host", serverID.host).put("port", serverID.port));
if (resultHandler != null) {
started = true;
resultHandler.handle(Future.succeededFuture());
}
vertx.executeBlocking(fut -> {
haManager.addDataToAHAInfo(SERVER_ID_HA_KEY, new JsonObject().put("host", serverID.host).put("port", serverID.port));
fut.complete();
}, false, ar2 -> {
if (ar2.succeeded()) {
started = true;
resultHandler.handle(Future.succeededFuture());
} else {
resultHandler.handle(Future.failedFuture(ar2.cause()));
}
});
} else {
if (resultHandler != null) {
resultHandler.handle(Future.failedFuture(asyncResult.cause()));
} else {
log.error(asyncResult.cause());
}
resultHandler.handle(Future.failedFuture(asyncResult.cause()));
}
});
} else {
if (resultHandler != null) {
resultHandler.handle(Future.failedFuture(ar2.cause()));
} else {
log.error(ar2.cause());
}
resultHandler.handle(Future.failedFuture(ar1.cause()));
}
});
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
@@ -22,20 +22,15 @@ import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static java.util.concurrent.TimeUnit.*;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
*
@@ -103,7 +98,6 @@ public class HAManager {
private static final Logger log = LoggerFactory.getLogger(HAManager.class);
private static final String CLUSTER_MAP_NAME = "__vertx.haInfo";
private static final long QUORUM_CHECK_PERIOD = 1000;
private final VertxInternal vertx;
@@ -125,16 +119,16 @@ public class HAManager {
private volatile boolean killed;
private Consumer<Set<String>> clusterViewChangedHandler;
public HAManager(VertxInternal vertx, DeploymentManager deploymentManager,
ClusterManager clusterManager, int quorumSize, String group, boolean enabled) {
public HAManager(VertxInternal vertx, DeploymentManager deploymentManager, ClusterManager clusterManager,
Map<String, String> clusterMap, int quorumSize, String group, boolean enabled) {
this.vertx = vertx;
this.deploymentManager = deploymentManager;
this.clusterManager = clusterManager;
this.clusterMap = clusterMap;
this.quorumSize = enabled ? quorumSize : 0;
this.group = enabled ? group : "__DISABLED__";
this.enabled = enabled;
this.haInfo = new JsonObject().put("verticles", new JsonArray()).put("group", this.group);
this.clusterMap = clusterManager.getSyncMap(CLUSTER_MAP_NAME);
this.nodeID = clusterManager.getNodeID();
}
@@ -204,7 +198,6 @@ public class HAManager {
public void stop() {
if (!stopped) {
if (clusterManager.isActive()) {
clusterMap.remove(nodeID);
}
vertx.cancelTimer(quorumTimerID);
@@ -271,16 +264,23 @@ public class HAManager {
private void doDeployVerticle(final String verticleName, DeploymentOptions deploymentOptions,
final Handler<AsyncResult<String>> doneHandler) {
final Handler<AsyncResult<String>> wrappedHandler = asyncResult -> {
if (asyncResult.succeeded()) {
// Tell the other nodes of the cluster about the verticle for HA purposes
addToHA(asyncResult.result(), verticleName, deploymentOptions);
}
if (doneHandler != null) {
doneHandler.handle(asyncResult);
} else if (asyncResult.failed()) {
log.error("Failed to deploy verticle", asyncResult.cause());
}
final Handler<AsyncResult<String>> wrappedHandler = ar1 -> {
vertx.<String>executeBlocking(fut -> {
if (ar1.succeeded()) {
// Tell the other nodes of the cluster about the verticle for HA purposes
String deploymentID = ar1.result();
addToHA(deploymentID, verticleName, deploymentOptions);
fut.complete(deploymentID);
} else {
fut.fail(ar1.cause());
}
}, false, ar2 -> {
if (doneHandler != null) {
doneHandler.handle(ar2);
} else if (ar2.failed()) {
log.error("Failed to deploy verticle", ar2.cause());
}
});
};
deploymentManager.deployVerticle(verticleName, deploymentOptions, wrappedHandler);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
@@ -16,17 +16,8 @@ import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.*;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.ServiceHelper;
import io.vertx.core.TimeoutStream;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.datagram.impl.DatagramSocketImpl;
@@ -58,9 +49,9 @@ import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.NetClientImpl;
import io.vertx.core.net.impl.NetServerImpl;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.net.impl.transport.Transport;
import io.vertx.core.shareddata.SharedData;
import io.vertx.core.shareddata.impl.SharedDataImpl;
import io.vertx.core.net.impl.transport.Transport;
import io.vertx.core.spi.VerticleFactory;
import io.vertx.core.spi.VertxMetricsFactory;
import io.vertx.core.spi.cluster.ClusterManager;
@@ -73,18 +64,8 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -97,6 +78,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private static final Logger log = LoggerFactory.getLogger(VertxImpl.class);
private static final String CLUSTER_MAP_NAME = "__vertx.haInfo";
private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio";
private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50);
@@ -115,7 +97,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
static void clusteredVertx(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
VertxImpl vertx = new VertxImpl(options);
vertx.initClustered(options, resultHandler);
vertx.joinCluster(options, resultHandler);
}
private final FileSystem fileSystem = getFileSystem();
@@ -204,28 +186,57 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
eventBus.start(ar -> {});
}
private void initClustered(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
private void joinCluster(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
clusterManager.setVertx(this);
clusterManager.join(ar1 -> {
if (ar1.failed()) {
log.error("Failed to join cluster", ar1.cause());
resultHandler.handle(Future.failedFuture(ar1.cause()));
clusterManager.join(ar -> {
if (ar.succeeded()) {
createHaManager(options, resultHandler);
} else {
haManager = new HAManager(this, deploymentManager, clusterManager, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled());
eventBus.start(ar2 -> {
AsyncResult<Vertx> res;
if (ar2.succeeded()) {
// Init the manager (i.e register listener and check the quorum)
// after the event bus has been fully started and updated its state
// it will have also set the clustered changed view handler on the ha manager
haManager.init();
res = Future.succeededFuture(this);
} else {
log.error("Failed to start event bus", ar2.cause());
res = Future.failedFuture(ar2.cause());
}
resultHandler.handle(res);
});
log.error("Failed to join cluster", ar.cause());
resultHandler.handle(Future.failedFuture(ar.cause()));
}
});
}
private void createHaManager(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
this.<Map<String, String>>executeBlocking(fut -> {
fut.complete(clusterManager.getSyncMap(CLUSTER_MAP_NAME));
}, false, ar -> {
if (ar.succeeded()) {
Map<String, String> clusterMap = ar.result();
haManager = new HAManager(this, deploymentManager, clusterManager, clusterMap, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled());
startEventBus(resultHandler);
} else {
log.error("Failed to start HAManager", ar.cause());
resultHandler.handle(Future.failedFuture(ar.cause()));
}
});
}
private void startEventBus(Handler<AsyncResult<Vertx>> resultHandler) {
eventBus.start(ar -> {
if (ar.succeeded()) {
initializeHaManager(resultHandler);
} else {
log.error("Failed to start event bus", ar.cause());
resultHandler.handle(Future.failedFuture(ar.cause()));
}
});
}
private void initializeHaManager(Handler<AsyncResult<Vertx>> resultHandler) {
this.executeBlocking(fut -> {
// Init the manager (i.e register listener and check the quorum)
// after the event bus has been fully started and updated its state
// it will have also set the clustered changed view handler on the ha manager
haManager.init();
fut.complete();
}, false, ar -> {
if (ar.succeeded()) {
resultHandler.handle(Future.succeededFuture(this));
} else {
log.error("Failed to initialize HAManager", ar.cause());
resultHandler.handle(Future.failedFuture(ar.cause()));
}
});
}
@@ -525,40 +536,44 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
closeHooks.run(ar -> {
deploymentManager.undeployAll(ar1 -> {
if (haManager() != null) {
haManager().stop();
}
addressResolver.close(ar2 -> {
eventBus.close(ar3 -> {
closeClusterManager(ar4 -> {
// Copy set to prevent ConcurrentModificationException
Set<HttpServer> httpServers = new HashSet<>(sharedHttpServers.values());
Set<NetServerImpl> netServers = new HashSet<>(sharedNetServers.values());
sharedHttpServers.clear();
sharedNetServers.clear();
this.executeBlocking(fut -> {
if (haManager() != null) {
haManager().stop();
}
fut.complete();
}, false, ar2 -> {
addressResolver.close(ar3 -> {
eventBus.close(ar4 -> {
closeClusterManager(ar5 -> {
// Copy set to prevent ConcurrentModificationException
Set<HttpServer> httpServers = new HashSet<>(sharedHttpServers.values());
Set<NetServerImpl> netServers = new HashSet<>(sharedNetServers.values());
sharedHttpServers.clear();
sharedNetServers.clear();
int serverCount = httpServers.size() + netServers.size();
int serverCount = httpServers.size() + netServers.size();
AtomicInteger serverCloseCount = new AtomicInteger();
AtomicInteger serverCloseCount = new AtomicInteger();
Handler<AsyncResult<Void>> serverCloseHandler = res -> {
if (res.failed()) {
log.error("Failure in shutting down server", res.cause());
Handler<AsyncResult<Void>> serverCloseHandler = res -> {
if (res.failed()) {
log.error("Failure in shutting down server", res.cause());
}
if (serverCloseCount.incrementAndGet() == serverCount) {
deleteCacheDirAndShutdown(completionHandler);
}
};
for (HttpServer server : httpServers) {
server.close(serverCloseHandler);
}
if (serverCloseCount.incrementAndGet() == serverCount) {
for (NetServerImpl server : netServers) {
server.close(serverCloseHandler);
}
if (serverCount == 0) {
deleteCacheDirAndShutdown(completionHandler);
}
};
for (HttpServer server : httpServers) {
server.close(serverCloseHandler);
}
for (NetServerImpl server : netServers) {
server.close(serverCloseHandler);
}
if (serverCount == 0) {
deleteCacheDirAndShutdown(completionHandler);
}
});
});
});
});
@@ -668,10 +683,18 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
@Override
public void undeploy(String deploymentID, Handler<AsyncResult<Void>> completionHandler) {
if (haManager() != null && haManager().isEnabled()) {
haManager().removeFromHA(deploymentID);
}
deploymentManager.undeployVerticle(deploymentID, completionHandler);
this.executeBlocking(fut -> {
if (haManager() != null && haManager().isEnabled()) {
haManager().removeFromHA(deploymentID);
}
fut.complete();
}, false, ar -> {
if (ar.succeeded()) {
deploymentManager.undeployVerticle(deploymentID, completionHandler);
} else {
completionHandler.handle(Future.failedFuture(ar.cause()));
}
});
}
@Override