Simplify Context / InternalContext implementation and make some cleanup, also properly handle block thread checker when it comes to nested dispatch which might happen sometimes

This commit is contained in:
Julien Viet
2019-03-09 11:24:55 +01:00
parent 8ab6b7b3f7
commit 9ac41801bb
9 changed files with 71 additions and 125 deletions

View File

@@ -15,7 +15,7 @@ import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxThread;
import io.vertx.core.json.JsonObject;
import java.util.List;
@@ -66,7 +66,8 @@ public interface Context {
* @return true if current thread is a worker thread, false otherwise
*/
static boolean isOnWorkerThread() {
return ContextInternal.isOnWorkerThread();
Thread t = Thread.currentThread();
return t instanceof VertxThread && ((VertxThread) t).isWorker();
}
/**
@@ -78,7 +79,8 @@ public interface Context {
* @return true if current thread is a worker thread, false otherwise
*/
static boolean isOnEventLoopThread() {
return ContextInternal.isOnEventLoopThread();
Thread t = Thread.currentThread();
return t instanceof VertxThread && !((VertxThread) t).isWorker();
}
/**
@@ -87,7 +89,7 @@ public interface Context {
* @return true if current thread is a Vert.x thread, false otherwise
*/
static boolean isOnVertxThread() {
return ContextInternal.isOnVertxThread();
return Thread.currentThread() instanceof VertxThread;
}
/**

View File

@@ -15,7 +15,6 @@ import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Starter;
import io.vertx.core.impl.launcher.VertxCommandLauncher;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
@@ -43,15 +42,6 @@ abstract class AbstractContext implements ContextInternal {
return !isEventLoopContext();
}
static boolean isOnVertxThread(boolean worker) {
Thread t = Thread.currentThread();
if (t instanceof VertxThread) {
VertxThread vt = (VertxThread) t;
return vt.isWorker() == worker;
}
return false;
}
// This is called to execute code where the origin is IO (from Netty probably).
// In such a case we should already be on an event loop thread (as Netty manages the event loops)
// but check this anyway, then execute directly

View File

@@ -13,12 +13,12 @@ package io.vertx.core.impl;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
@@ -35,6 +35,27 @@ import java.util.concurrent.RejectedExecutionException;
*/
abstract class ContextImpl extends AbstractContext {
/**
* Execute the {@code task} disabling the thread-local association for the duration
* of the execution. {@link Vertx#currentContext()} will return {@code null},
* @param task the task to execute
* @throws IllegalStateException if the current thread is not a Vertx thread
*/
static void executeIsolated(Handler<Void> task) {
Thread currentThread = Thread.currentThread();
if (currentThread instanceof VertxThread) {
VertxThread vertxThread = (VertxThread) currentThread;
ContextInternal prev = vertxThread.beginDispatch(null);
try {
task.handle(null);
} finally {
vertxThread.endDispatch(prev);
}
} else {
task.handle(null);
}
}
private static EventLoop getEventLoop(VertxInternal vertx) {
EventLoopGroup group = vertx.getEventLoopGroup();
if (group != null) {
@@ -101,8 +122,6 @@ abstract class ContextImpl extends AbstractContext {
public void runCloseHooks(Handler<AsyncResult<Void>> completionHandler) {
closeHooks.run(completionHandler);
// Now remove context references from threads
VertxThreadFactory.unsetContext(this);
}
@Override
@@ -145,25 +164,18 @@ abstract class ContextImpl extends AbstractContext {
Object queueMetric = metrics != null ? metrics.submitted() : null;
try {
Runnable command = () -> {
VertxThread current = (VertxThread) Thread.currentThread();
Object execMetric = null;
if (metrics != null) {
execMetric = metrics.begin(queueMetric);
}
if (!DISABLE_TIMINGS) {
current.executeStart();
}
Future<T> res = Future.future();
try {
ContextInternal.setContext(context);
blockingCodeHandler.handle(res);
} catch (Throwable e) {
res.tryFail(e);
} finally {
if (!DISABLE_TIMINGS) {
current.executeEnd();
context.dispatch(res, f -> {
try {
blockingCodeHandler.handle(res);
} catch (Throwable e) {
res.tryFail(e);
}
}
});
if (metrics != null) {
metrics.end(execMetric, res.succeeded());
}

View File

@@ -30,28 +30,6 @@ import java.util.concurrent.ConcurrentMap;
*/
public interface ContextInternal extends Context {
static ContextInternal setContext(ContextInternal context) {
Thread current = Thread.currentThread();
if (current instanceof VertxThread) {
return ((VertxThread)current).setContext(context);
} else {
throw new IllegalStateException("Attempt to setContext on non Vert.x thread " + Thread.currentThread());
}
}
static boolean isOnWorkerThread() {
return ContextImpl.isOnVertxThread(true);
}
static boolean isOnEventLoopThread() {
return ContextImpl.isOnVertxThread(false);
}
static boolean isOnVertxThread() {
Thread t = Thread.currentThread();
return (t instanceof VertxThread);
}
/**
* Return the Netty EventLoop used by this Context. This can be used to integrate
* a Netty Server with a Vert.x runtime, specially the Context part.

View File

@@ -347,14 +347,10 @@ public class HAManager {
if (System.currentTimeMillis() - start > 10000) {
log.warn("Timed out waiting for group information to appear");
} else if (!stopped) {
ContextInternal context = vertx.getContext();
try {
// Remove any context we have here (from the timer) otherwise will screw things up when verticles are deployed
ContextInternal.setContext(null);
// Remove any context we have here (from the timer) otherwise will screw things up when verticles are deployed
ContextImpl.executeIsolated(v -> {
checkQuorumWhenAdded(nodeID, start);
} finally {
ContextInternal.setContext(context);
}
});
}
fut.complete();
}, null);
@@ -410,13 +406,9 @@ public class HAManager {
private void addToHADeployList(final String verticleName, final DeploymentOptions deploymentOptions,
final Handler<AsyncResult<String>> doneHandler) {
toDeployOnQuorum.add(() -> {
ContextInternal ctx = vertx.getContext();
try {
ContextInternal.setContext(null);
ContextImpl.executeIsolated(v -> {
deployVerticle(verticleName, deploymentOptions, doneHandler);
} finally {
ContextInternal.setContext(ctx);
}
});
});
}
@@ -438,9 +430,7 @@ public class HAManager {
Deployment dep = deploymentManager.getDeployment(deploymentID);
if (dep != null) {
if (dep.deploymentOptions().isHa()) {
ContextInternal ctx = vertx.getContext();
try {
ContextInternal.setContext(null);
ContextImpl.executeIsolated(v -> {
deploymentManager.undeployVerticle(deploymentID, result -> {
if (result.succeeded()) {
log.info("Successfully undeployed HA deployment " + deploymentID + "-" + dep.verticleIdentifier() + " as there is no quorum");
@@ -455,9 +445,7 @@ public class HAManager {
log.error("Failed to undeploy deployment on lost quorum", result.cause());
}
});
} finally {
ContextInternal.setContext(ctx);
}
});
}
}
}
@@ -547,13 +535,8 @@ public class HAManager {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> err = new AtomicReference<>();
// Now deploy this verticle on this node
ContextInternal ctx = vertx.getContext();
if (ctx != null) {
// We could be on main thread in which case we don't want to overwrite tccl
ContextInternal.setContext(null);
}
JsonObject options = failedVerticle.getJsonObject("options");
try {
ContextImpl.executeIsolated(v -> {
JsonObject options = failedVerticle.getJsonObject("options");
doDeployVerticle(verticleName, new DeploymentOptions(options), result -> {
if (result.succeeded()) {
log.info("Successfully redeployed verticle " + verticleName + " after failover");
@@ -567,11 +550,7 @@ public class HAManager {
throw new VertxException(t);
}
});
} finally {
if (ctx != null) {
ContextInternal.setContext(ctx);
}
}
});
try {
if (!latch.await(120, TimeUnit.SECONDS)) {
throw new VertxException("Timed out waiting for redeploy on failover");

View File

@@ -505,7 +505,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
public static Context context() {
Thread current = Thread.currentThread();
if (current instanceof VertxThread) {
return ((VertxThread) current).getContext();
return ((VertxThread) current).context();
}
return null;
}

View File

@@ -48,25 +48,23 @@ public final class VertxThread extends FastThreadLocalThread {
this.maxExecTimeUnit = maxExecTimeUnit;
}
ContextInternal setContext(ContextInternal next) {
ContextInternal prev = context;
context = next;
if (!DISABLE_TCCL) {
setContextClassLoader(next != null ? next.classLoader() : null);
}
return prev;
}
ContextInternal getContext() {
/**
* @return the current context of this thread, this method must be called from the current thread
*/
ContextInternal context() {
return context;
}
public final void executeStart() {
execStart = System.nanoTime();
private void executeStart() {
if (context == null) {
execStart = System.nanoTime();
}
}
public final void executeEnd() {
execStart = 0;
private void executeEnd() {
if (context == null) {
execStart = 0;
}
}
public long startTime() {
@@ -112,16 +110,17 @@ public final class VertxThread extends FastThreadLocalThread {
* This is a low level interface that should not be used, instead {@link ContextInternal#dispatch(Object, io.vertx.core.Handler)}
* shall be used.
*
* @param context the previous context thread to restore, might be {@code null}
* @param prev the previous context thread to restore, might be {@code null}
*/
public void endDispatch(ContextInternal context) {
public void endDispatch(ContextInternal prev) {
// We don't unset the context after execution - this is done later when the context is closed via
// VertxThreadFactory
this.context = context;
context = prev;
if (!DISABLE_TCCL) {
setContextClassLoader(context != null ? context.classLoader() : null);
setContextClassLoader(prev != null ? prev.classLoader() : null);
}
if (!ContextImpl.DISABLE_TIMINGS) {
executeEnd();
}
}}
}
}

View File

@@ -11,10 +11,6 @@
package io.vertx.core.impl;
import io.vertx.core.VertxOptions;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,15 +20,6 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class VertxThreadFactory implements ThreadFactory {
// We store all threads in a weak map - we retain this so we can unset context from threads when
// context is undeployed
private static final Object FOO = new Object();
private static Map<VertxThread, Object> weakMap = new WeakHashMap<>();
private static synchronized void addToMap(VertxThread thread) {
weakMap.put(thread, FOO);
}
private final String prefix;
private final AtomicInteger threadCount = new AtomicInteger(0);
private final BlockedThreadChecker checker;
@@ -48,14 +35,6 @@ public class VertxThreadFactory implements ThreadFactory {
this.maxExecTimeUnit = maxExecTimeUnit;
}
public static synchronized void unsetContext(ContextImpl ctx) {
for (VertxThread thread: weakMap.keySet()) {
if (thread.getContext() == ctx) {
thread.setContext(null);
}
}
}
public Thread newThread(Runnable runnable) {
VertxThread t = new VertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecTime, maxExecTimeUnit);
// Vert.x threads are NOT daemons - we want them to prevent JVM exit so embededd user doesn't
@@ -63,7 +42,6 @@ public class VertxThreadFactory implements ThreadFactory {
if (checker != null) {
checker.registerThread(t);
}
addToMap(t);
// I know the default is false anyway, but just to be explicit- Vert.x threads are NOT daemons
// we want to prevent the JVM from exiting until Vert.x instances are closed
t.setDaemon(false);

View File

@@ -549,10 +549,18 @@ public class ContextTest extends VertxTestBase {
assertSame(ctx, Vertx.currentContext());
assertSame(cl, Thread.currentThread().getContextClassLoader());
int[] called = new int[1];
VertxThread thread = VertxThread.current();
long start = thread.startTime();
ctx.dispatch(v2 -> {
called[0]++;
assertSame(cl, Thread.currentThread().getContextClassLoader());
try {
Thread.sleep(2);
} catch (InterruptedException e) {
fail(e);
}
});
assertEquals(start, thread.startTime());
assertEquals(1, called[0]);
assertSame(ctx, Vertx.currentContext());
assertSame(cl, Thread.currentThread().getContextClassLoader());