From 82b94ad9c0636296bd5dc3ced536be7067599abd Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Sat, 26 Jan 2019 10:47:52 +0100 Subject: [PATCH] Tracer SPI --- src/main/asciidoc/dataobjects.adoc | 19 ++ .../io/vertx/core/VertxOptionsConverter.java | 8 + .../core/tracing/TracingOptionsConverter.java | 33 +++ src/main/java/io/vertx/core/Context.java | 27 ++ src/main/java/io/vertx/core/VertxOptions.java | 12 + .../io/vertx/core/impl/AbstractContext.java | 160 ++++++++++ .../java/io/vertx/core/impl/ContextImpl.java | 279 ++++++++++-------- .../io/vertx/core/impl/ContextInternal.java | 39 ++- .../io/vertx/core/impl/EventLoopContext.java | 43 ++- .../java/io/vertx/core/impl/VertxImpl.java | 30 +- .../io/vertx/core/impl/VertxInternal.java | 2 +- .../java/io/vertx/core/impl/VertxThread.java | 2 +- .../io/vertx/core/impl/WorkerContext.java | 43 ++- .../vertx/core/impl/WorkerExecutorImpl.java | 2 +- .../io/vertx/core/spi/VertxTracerFactory.java | 57 ++++ .../vertx/core/spi/tracing/VertxTracer.java | 80 +++++ .../io/vertx/core/tracing/TracingOptions.java | 129 ++++++++ .../HttpServerHandlerBenchmark.java | 2 +- .../io/vertx/core/impl/BenchmarkContext.java | 9 +- src/test/java/io/vertx/core/ContextTest.java | 139 +++++++-- .../io/vertx/core/spi/tracing/TracerTest.java | 36 +++ .../io/vertx/test/core/VertxTestBase.java | 13 +- .../io/vertx/test/faketracer/FakeTracer.java | 164 ++++++++++ .../java/io/vertx/test/faketracer/Scope.java | 35 +++ .../java/io/vertx/test/faketracer/Span.java | 74 +++++ 25 files changed, 1271 insertions(+), 166 deletions(-) create mode 100644 src/main/generated/io/vertx/core/tracing/TracingOptionsConverter.java create mode 100644 src/main/java/io/vertx/core/impl/AbstractContext.java create mode 100644 src/main/java/io/vertx/core/spi/VertxTracerFactory.java create mode 100644 src/main/java/io/vertx/core/spi/tracing/VertxTracer.java create mode 100644 src/main/java/io/vertx/core/tracing/TracingOptions.java create mode 100644 src/test/java/io/vertx/core/spi/tracing/TracerTest.java create mode 100644 src/test/java/io/vertx/test/faketracer/FakeTracer.java create mode 100644 src/test/java/io/vertx/test/faketracer/Scope.java create mode 100644 src/test/java/io/vertx/test/faketracer/Span.java diff --git a/src/main/asciidoc/dataobjects.adoc b/src/main/asciidoc/dataobjects.adoc index 09f0d44bb..0c3752df7 100644 --- a/src/main/asciidoc/dataobjects.adoc +++ b/src/main/asciidoc/dataobjects.adoc @@ -1931,6 +1931,24 @@ Set the ALPN usage. +++ |=== +[[TracingOptions]] +== TracingOptions + +++++ + Vert.x tracing base configuration, this class can be extended by provider implementations to configure + those specific implementations. +++++ +''' + +[cols=">25%,25%,50%"] +[frame="topbot"] +|=== +^|Name | Type ^| Description +|[[enabled]]`@enabled`|`Boolean`|+++ +Set whether tracing will be enabled on the Vert.x instance. ++++ +|=== + [[VertxOptions]] == VertxOptions @@ -2022,6 +2040,7 @@ Set wether to prefer the native transport to the JDK transport. |[[quorumSize]]`@quorumSize`|`Number (int)`|+++ Set the quorum size to be used when HA is enabled. +++ +|[[tracingOptions]]`@tracingOptions`|`link:dataobjects.html#TracingOptions[TracingOptions]`|- |[[warningExceptionTime]]`@warningExceptionTime`|`Number (long)`|+++ Set the threshold value above this, the blocked warning contains a stack trace. in link. The default value of link is diff --git a/src/main/generated/io/vertx/core/VertxOptionsConverter.java b/src/main/generated/io/vertx/core/VertxOptionsConverter.java index 5c6dbfa66..0a8a122ee 100644 --- a/src/main/generated/io/vertx/core/VertxOptionsConverter.java +++ b/src/main/generated/io/vertx/core/VertxOptionsConverter.java @@ -129,6 +129,11 @@ import java.time.format.DateTimeFormatter; obj.setQuorumSize(((Number)member.getValue()).intValue()); } break; + case "tracingOptions": + if (member.getValue() instanceof JsonObject) { + obj.setTracingOptions(new io.vertx.core.tracing.TracingOptions((JsonObject)member.getValue())); + } + break; case "warningExceptionTime": if (member.getValue() instanceof Number) { obj.setWarningExceptionTime(((Number)member.getValue()).longValue()); @@ -196,6 +201,9 @@ import java.time.format.DateTimeFormatter; } json.put("preferNativeTransport", obj.getPreferNativeTransport()); json.put("quorumSize", obj.getQuorumSize()); + if (obj.getTracingOptions() != null) { + json.put("tracingOptions", obj.getTracingOptions().toJson()); + } json.put("warningExceptionTime", obj.getWarningExceptionTime()); if (obj.getWarningExceptionTimeUnit() != null) { json.put("warningExceptionTimeUnit", obj.getWarningExceptionTimeUnit().name()); diff --git a/src/main/generated/io/vertx/core/tracing/TracingOptionsConverter.java b/src/main/generated/io/vertx/core/tracing/TracingOptionsConverter.java new file mode 100644 index 000000000..6fea52604 --- /dev/null +++ b/src/main/generated/io/vertx/core/tracing/TracingOptionsConverter.java @@ -0,0 +1,33 @@ +package io.vertx.core.tracing; + +import io.vertx.core.json.JsonObject; +import io.vertx.core.json.JsonArray; +import java.time.Instant; +import java.time.format.DateTimeFormatter; + +/** + * Converter for {@link io.vertx.core.tracing.TracingOptions}. + * NOTE: This class has been automatically generated from the {@link io.vertx.core.tracing.TracingOptions} original class using Vert.x codegen. + */ + class TracingOptionsConverter { + + static void fromJson(Iterable> json, TracingOptions obj) { + for (java.util.Map.Entry member : json) { + switch (member.getKey()) { + case "enabled": + if (member.getValue() instanceof Boolean) { + obj.setEnabled((Boolean)member.getValue()); + } + break; + } + } + } + + static void toJson(TracingOptions obj, JsonObject json) { + toJson(obj, json.getMap()); + } + + static void toJson(TracingOptions obj, java.util.Map json) { + json.put("enabled", obj.isEnabled()); + } +} diff --git a/src/main/java/io/vertx/core/Context.java b/src/main/java/io/vertx/core/Context.java index 53e8c9c6f..2ab7780e6 100644 --- a/src/main/java/io/vertx/core/Context.java +++ b/src/main/java/io/vertx/core/Context.java @@ -204,6 +204,33 @@ public interface Context { */ boolean remove(String key); + /** + * Get some local data from the context. + * + * @param key the key of the data + * @param the type of the data + * @return the data + */ + T getLocal(String key); + + /** + * Put some local data in the context. + *

+ * This can be used to share data between different handlers that share a context + * + * @param key the key of the data + * @param value the data + */ + void putLocal(String key, Object value); + + /** + * Remove some local data from the context. + * + * @param key the key to remove + * @return true if removed successfully, false otherwise + */ + boolean removeLocal(String key); + /** * @return The Vertx instance that created the context */ diff --git a/src/main/java/io/vertx/core/VertxOptions.java b/src/main/java/io/vertx/core/VertxOptions.java index adb995101..479ab162c 100644 --- a/src/main/java/io/vertx/core/VertxOptions.java +++ b/src/main/java/io/vertx/core/VertxOptions.java @@ -19,6 +19,7 @@ import io.vertx.core.impl.cpu.CpuCoreSensor; import io.vertx.core.json.JsonObject; import io.vertx.core.metrics.MetricsOptions; import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.core.tracing.TracingOptions; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -156,6 +157,7 @@ public class VertxOptions { private int quorumSize = DEFAULT_QUORUM_SIZE; private String haGroup = DEFAULT_HA_GROUP; private MetricsOptions metricsOptions = new MetricsOptions(); + private TracingOptions tracingOptions = new TracingOptions(); private FileSystemOptions fileSystemOptions = new FileSystemOptions(); private long warningExceptionTime = DEFAULT_WARNING_EXCEPTION_TIME; private EventBusOptions eventBusOptions = new EventBusOptions(); @@ -197,6 +199,7 @@ public class VertxOptions { this.maxWorkerExecuteTimeUnit = other.maxWorkerExecuteTimeUnit; this.warningExceptionTimeUnit = other.warningExceptionTimeUnit; this.blockedThreadCheckIntervalUnit = other.blockedThreadCheckIntervalUnit; + this.tracingOptions = other.tracingOptions != null ? other.tracingOptions.copy() : null; } /** @@ -805,6 +808,15 @@ public class VertxOptions { return this; } + public TracingOptions getTracingOptions() { + return tracingOptions; + } + + public VertxOptions setTracingOptions(TracingOptions tracingOptions) { + this.tracingOptions = tracingOptions; + return this; + } + public JsonObject toJson() { JsonObject json = new JsonObject(); VertxOptionsConverter.toJson(this, json); diff --git a/src/main/java/io/vertx/core/impl/AbstractContext.java b/src/main/java/io/vertx/core/impl/AbstractContext.java new file mode 100644 index 000000000..4e735d953 --- /dev/null +++ b/src/main/java/io/vertx/core/impl/AbstractContext.java @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.impl; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Starter; +import io.vertx.core.impl.launcher.VertxCommandLauncher; +import io.vertx.core.spi.tracing.VertxTracer; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; + +/** + * A context implementation that does not hold any specific state. + * + * @author Julien Viet + * @author Tim Fox + */ +abstract class AbstractContext implements ContextInternal { + + private static final String THREAD_CHECKS_PROP_NAME = "vertx.threadChecks"; + private static final boolean THREAD_CHECKS = Boolean.getBoolean(THREAD_CHECKS_PROP_NAME); + + abstract void executeAsync(Handler task); + + abstract void execute(T value, Handler task); + + @Override + public abstract boolean isEventLoopContext(); + + @Override + public boolean isWorkerContext() { + return !isEventLoopContext(); + } + + static boolean isOnVertxThread(boolean worker) { + Thread t = Thread.currentThread(); + if (t instanceof VertxThread) { + VertxThread vt = (VertxThread) t; + return vt.isWorker() == worker; + } + return false; + } + + // This is called to execute code where the origin is IO (from Netty probably). + // In such a case we should already be on an event loop thread (as Netty manages the event loops) + // but check this anyway, then execute directly + @Override + public final void executeFromIO(Handler task) { + executeFromIO(null, task); + } + + @Override + public final void schedule(Handler task) { + schedule(null, task); + } + + @Override + public final void dispatch(Handler task) { + dispatch(null, task); + } + + @Override + public final void dispatch(T arg, Handler task) { + VertxThread currentThread = ContextInternal.beginDispatch(this); + try { + task.handle(arg); + } catch (Throwable t) { + reportException(t); + } finally { + ContextInternal.endDispatch(currentThread); + } + } + + @Override + public final void executeFromIO(T value, Handler task) { + if (THREAD_CHECKS) { + checkEventLoopThread(); + } + execute(value, task); + } + + private void checkEventLoopThread() { + Thread current = Thread.currentThread(); + if (!(current instanceof VertxThread)) { + throw new IllegalStateException("Expected to be on Vert.x thread, but actually on: " + current); + } else if (((VertxThread) current).isWorker()) { + throw new IllegalStateException("Event delivered on unexpected worker thread " + current); + } + } + + // Run the task asynchronously on this same context + @Override + public final void runOnContext(Handler task) { + try { + executeAsync(task); + } catch (RejectedExecutionException ignore) { + // Pool is already shut down + } + } + + @Override + public final List processArgs() { + // As we are maintaining the launcher and starter class, choose the right one. + List processArgument = VertxCommandLauncher.getProcessArguments(); + return processArgument != null ? processArgument : Starter.PROCESS_ARGS; + } + + @Override + public final void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler) { + executeBlocking(blockingCodeHandler, true, resultHandler); + } + + @Override + public final ContextInternal duplicate() { + return duplicate(null); + } + + @SuppressWarnings("unchecked") + @Override + public final T get(String key) { + return (T) contextData().get(key); + } + + @Override + public final void put(String key, Object value) { + contextData().put(key, value); + } + + @Override + public final boolean remove(String key) { + return contextData().remove(key) != null; + } + + @SuppressWarnings("unchecked") + @Override + public final T getLocal(String key) { + return (T) localContextData().get(key); + } + + @Override + public final void putLocal(String key, Object value) { + localContextData().put(key, value); + } + + @Override + public final boolean removeLocal(String key) { + return localContextData().remove(key) != null; + } +} diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index 079297ef0..1c8cdb231 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -13,19 +13,18 @@ package io.vertx.core.impl; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.vertx.codegen.annotations.Nullable; import io.vertx.core.AsyncResult; import io.vertx.core.Closeable; import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.Starter; -import io.vertx.core.impl.launcher.VertxCommandLauncher; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.core.spi.metrics.PoolMetrics; +import io.vertx.core.spi.tracing.VertxTracer; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -34,7 +33,7 @@ import java.util.concurrent.RejectedExecutionException; /** * @author Tim Fox */ -abstract class ContextImpl implements ContextInternal { +abstract class ContextImpl extends AbstractContext { private static EventLoop getEventLoop(VertxInternal vertx) { EventLoopGroup group = vertx.getEventLoopGroup(); @@ -47,34 +46,35 @@ abstract class ContextImpl implements ContextInternal { private static final Logger log = LoggerFactory.getLogger(ContextImpl.class); - private static final String THREAD_CHECKS_PROP_NAME = "vertx.threadChecks"; private static final String DISABLE_TIMINGS_PROP_NAME = "vertx.disableContextTimings"; - private static final boolean THREAD_CHECKS = Boolean.getBoolean(THREAD_CHECKS_PROP_NAME); static final boolean DISABLE_TIMINGS = Boolean.getBoolean(DISABLE_TIMINGS_PROP_NAME); protected final VertxInternal owner; + protected final VertxTracer tracer; protected final JsonObject config; private final Deployment deployment; private final CloseHooks closeHooks; private final ClassLoader tccl; private final EventLoop eventLoop; - private ConcurrentMap contextData; + private ConcurrentMap data; + private ConcurrentMap localData; private volatile Handler exceptionHandler; - private final WorkerPool internalBlockingPool; - private final TaskQueue internalOrderedTasks; + final TaskQueue internalOrderedTasks; + final WorkerPool internalBlockingPool; final WorkerPool workerPool; final TaskQueue orderedTasks; - protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, + protected ContextImpl(VertxInternal vertx, VertxTracer tracer, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) { - this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deployment, tccl); + this(vertx, tracer, getEventLoop(vertx), internalBlockingPool, workerPool, deployment, tccl); } - protected ContextImpl(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, + protected ContextImpl(VertxInternal vertx, VertxTracer tracer, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) { if (VertxThread.DISABLE_TCCL && tccl != ClassLoader.getSystemClassLoader()) { log.warn("You have disabled TCCL checks but you have a custom TCCL to set."); } + this.tracer = tracer; this.deployment = deployment; this.config = deployment != null ? deployment.config() : new JsonObject(); this.eventLoop = eventLoop; @@ -105,100 +105,6 @@ abstract class ContextImpl implements ContextInternal { VertxThreadFactory.unsetContext(this); } - abstract void executeAsync(Handler task); - - abstract void execute(T value, Handler task); - - @Override - public abstract boolean isEventLoopContext(); - - @Override - @SuppressWarnings("unchecked") - public T get(String key) { - return (T) contextData().get(key); - } - - @Override - public void put(String key, Object value) { - contextData().put(key, value); - } - - @Override - public boolean remove(String key) { - return contextData().remove(key) != null; - } - - @Override - public boolean isWorkerContext() { - return !isEventLoopContext(); - } - - static boolean isOnVertxThread(boolean worker) { - Thread t = Thread.currentThread(); - if (t instanceof VertxThread) { - VertxThread vt = (VertxThread) t; - return vt.isWorker() == worker; - } - return false; - } - - // This is called to execute code where the origin is IO (from Netty probably). - // In such a case we should already be on an event loop thread (as Netty manages the event loops) - // but check this anyway, then execute directly - @Override - public final void executeFromIO(Handler task) { - executeFromIO(null, task); - } - - @Override - public final void schedule(Handler task) { - schedule(null, task); - } - - @Override - public final void dispatch(Handler task) { - dispatch(null, task); - } - - @Override - public final void dispatch(T arg, Handler task) { - VertxThread currentThread = ContextInternal.beginDispatch(this); - try { - task.handle(arg); - } catch (Throwable t) { - reportException(t); - } finally { - ContextInternal.endDispatch(currentThread); - } - } - - @Override - public final void executeFromIO(T value, Handler task) { - if (THREAD_CHECKS) { - checkEventLoopThread(); - } - execute(value, task); - } - - private void checkEventLoopThread() { - Thread current = Thread.currentThread(); - if (!(current instanceof VertxThread)) { - throw new IllegalStateException("Expected to be on Vert.x thread, but actually on: " + current); - } else if (((VertxThread) current).isWorker()) { - throw new IllegalStateException("Event delivered on unexpected worker thread " + current); - } - } - - // Run the task asynchronously on this same context - @Override - public void runOnContext(Handler task) { - try { - executeAsync(task); - } catch (RejectedExecutionException ignore) { - // Pool is already shut down - } - } - @Override public String deploymentID() { return deployment != null ? deployment.deploymentID() : null; @@ -209,13 +115,6 @@ abstract class ContextImpl implements ContextInternal { return config; } - @Override - public List processArgs() { - // As we are maintaining the launcher and starter class, choose the right one. - List processArgument = VertxCommandLauncher.getProcessArguments(); - return processArgument != null ? processArgument : Starter.PROCESS_ARGS; - } - public EventLoop nettyEventLoop() { return eventLoop; } @@ -226,25 +125,20 @@ abstract class ContextImpl implements ContextInternal { @Override public void executeBlockingInternal(Handler> action, Handler> resultHandler) { - executeBlocking(action, resultHandler, internalBlockingPool, internalOrderedTasks); + executeBlocking(this, action, resultHandler, internalBlockingPool, internalOrderedTasks); } @Override public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler) { - executeBlocking(blockingCodeHandler, resultHandler, workerPool, ordered ? orderedTasks : null); - } - - @Override - public void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler) { - executeBlocking(blockingCodeHandler, true, resultHandler); + executeBlocking(this, blockingCodeHandler, resultHandler, workerPool, ordered ? orderedTasks : null); } @Override public void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler) { - executeBlocking(blockingCodeHandler, resultHandler, workerPool, queue); + executeBlocking(this, blockingCodeHandler, resultHandler, workerPool, queue); } - void executeBlocking(Handler> blockingCodeHandler, + static void executeBlocking(ContextInternal context, Handler> blockingCodeHandler, Handler> resultHandler, WorkerPool workerPool, TaskQueue queue) { PoolMetrics metrics = workerPool.metrics(); @@ -261,7 +155,7 @@ abstract class ContextImpl implements ContextInternal { } Future res = Future.future(); try { - ContextInternal.setContext(this); + ContextInternal.setContext(context); blockingCodeHandler.handle(res); } catch (Throwable e) { res.tryFail(e); @@ -274,7 +168,7 @@ abstract class ContextImpl implements ContextInternal { metrics.end(execMetric, res.succeeded()); } if (resultHandler != null) { - res.setHandler(ar -> runOnContext(v -> resultHandler.handle(ar))); + res.setHandler(ar -> context.runOnContext(v -> resultHandler.handle(ar))); } }; Executor exec = workerPool.executor(); @@ -292,6 +186,11 @@ abstract class ContextImpl implements ContextInternal { } } + @Override + public VertxTracer tracer() { + return tracer; + } + @Override public ClassLoader classLoader() { return tccl; @@ -299,14 +198,22 @@ abstract class ContextImpl implements ContextInternal { @Override public synchronized ConcurrentMap contextData() { - if (contextData == null) { - contextData = new ConcurrentHashMap<>(); + if (data == null) { + data = new ConcurrentHashMap<>(); } - return contextData; + return data; + } + + @Override + public synchronized ConcurrentMap localContextData() { + if (localData == null) { + localData = new ConcurrentHashMap<>(); + } + return localData; } public void reportException(Throwable t) { - Handler handler = this.exceptionHandler; + Handler handler = exceptionHandler; if (handler == null) { handler = owner.exceptionHandler(); } @@ -340,4 +247,120 @@ abstract class ContextImpl implements ContextInternal { } return deployment.deploymentOptions().getInstances(); } + + static abstract class Duplicated extends AbstractContext { + + protected final C delegate; + private final ContextInternal other; + private ConcurrentMap localData; + + public Duplicated(C delegate, ContextInternal other) { + this.delegate = delegate; + this.other = other; + } + + @Override + public VertxTracer tracer() { + return delegate.tracer(); + } + + public final void executeBlockingInternal(Handler> action, Handler> resultHandler) { + ContextImpl.executeBlocking(this, action, resultHandler, delegate.internalBlockingPool, delegate.internalOrderedTasks); + } + + @Override + public final void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler) { + ContextImpl.executeBlocking(this, blockingCodeHandler, resultHandler, delegate.workerPool, ordered ? delegate.orderedTasks : null); + } + + @Override + public final void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler) { + ContextImpl.executeBlocking(this, blockingCodeHandler, resultHandler, delegate.workerPool, queue); + } + + @Override + public final void schedule(T value, Handler task) { + delegate.schedule(value, task); + } + + @Override + public final String deploymentID() { + return delegate.deploymentID(); + } + + @Override + public final JsonObject config() { + return delegate.config(); + } + + @Override + public final int getInstanceCount() { + return delegate.getInstanceCount(); + } + + @Override + public final Context exceptionHandler(Handler handler) { + delegate.exceptionHandler(handler); + return this; + } + + @Override + public final Handler exceptionHandler() { + return delegate.exceptionHandler(); + } + + @Override + public final void addCloseHook(Closeable hook) { + delegate.addCloseHook(hook); + } + + @Override + public final void removeCloseHook(Closeable hook) { + delegate.removeCloseHook(hook); + } + + @Override + public final EventLoop nettyEventLoop() { + return delegate.nettyEventLoop(); + } + + @Override + public final Deployment getDeployment() { + return delegate.getDeployment(); + } + + @Override + public final VertxInternal owner() { + return delegate.owner(); + } + + @Override + public final ClassLoader classLoader() { + return delegate.classLoader(); + } + + @Override + public final void reportException(Throwable t) { + delegate.reportException(t); + } + + @Override + public final ConcurrentMap contextData() { + return delegate.contextData(); + } + + @Override + public final ConcurrentMap localContextData() { + if (other == null) { + synchronized (this) { + if (localData == null) { + localData = new ConcurrentHashMap<>(); + } + return localData; + } + } else { + return other.localContextData(); + } + } + } } diff --git a/src/main/java/io/vertx/core/impl/ContextInternal.java b/src/main/java/io/vertx/core/impl/ContextInternal.java index 0ce74b119..20f7827d9 100644 --- a/src/main/java/io/vertx/core/impl/ContextInternal.java +++ b/src/main/java/io/vertx/core/impl/ContextInternal.java @@ -17,6 +17,7 @@ import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.core.spi.tracing.VertxTracer; import java.util.concurrent.ConcurrentMap; @@ -69,10 +70,10 @@ public interface ContextInternal extends Context { } } - static void setContext(ContextInternal context) { + static ContextInternal setContext(ContextInternal context) { Thread current = Thread.currentThread(); if (current instanceof VertxThread) { - ((VertxThread)current).setContext(context); + return ((VertxThread)current).setContext(context); } else { throw new IllegalStateException("Attempt to setContext on non Vert.x thread " + Thread.currentThread()); } @@ -185,9 +186,43 @@ public interface ContextInternal extends Context { */ ConcurrentMap contextData(); + /** + * @return the {@link ConcurrentMap} used to store local context data + */ + ConcurrentMap localContextData(); + /** * @return the classloader associated with this context */ ClassLoader classLoader(); + /** + * @return the tracer for this context + */ + VertxTracer tracer(); + + /** + * Returns a context which shares the whole behavior of this context but not the {@link #localContextData()} which + * remains private to the context: + *

    + *
  • same concurrency
  • + *
  • same exception handler
  • + *
  • same context context
  • + *
  • same deployment
  • + *
  • same config
  • + *
  • same classloader
  • + *
+ *

+ * The duplicated context will have its own private local context data. + * + * @return a context whose behavior will is equivalent to this context but with new private + */ + ContextInternal duplicate(); + + /** + * Like {@link #duplicate()} but the duplicated context local data will adopt the local data of the specified + * {@code context} argument. + */ + ContextInternal duplicate(ContextInternal context); + } diff --git a/src/main/java/io/vertx/core/impl/EventLoopContext.java b/src/main/java/io/vertx/core/impl/EventLoopContext.java index 58629015b..ce839d96e 100644 --- a/src/main/java/io/vertx/core/impl/EventLoopContext.java +++ b/src/main/java/io/vertx/core/impl/EventLoopContext.java @@ -15,6 +15,9 @@ import io.netty.channel.EventLoop; import io.vertx.core.Handler; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; +import io.vertx.core.spi.tracing.VertxTracer; + +import java.util.concurrent.ConcurrentMap; /** * @author Tim Fox @@ -23,14 +26,14 @@ public class EventLoopContext extends ContextImpl { private static final Logger log = LoggerFactory.getLogger(EventLoopContext.class); - EventLoopContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, - ClassLoader tccl) { - super(vertx, internalBlockingPool, workerPool, deployment, tccl); + EventLoopContext(VertxInternal vertx, VertxTracer tracer, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, + ClassLoader tccl) { + super(vertx, tracer, internalBlockingPool, workerPool, deployment, tccl); } - public EventLoopContext(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, + public EventLoopContext(VertxInternal vertx, VertxTracer tracer, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) { - super(vertx, eventLoop, internalBlockingPool, workerPool, deployment, tccl); + super(vertx, tracer, eventLoop, internalBlockingPool, workerPool, deployment, tccl); } void executeAsync(Handler task) { @@ -52,4 +55,34 @@ public class EventLoopContext extends ContextImpl { return true; } + @Override + public ContextInternal duplicate(ContextInternal in) { + return new Duplicated(this, in); + } + + static class Duplicated extends ContextImpl.Duplicated { + + Duplicated(EventLoopContext delegate, ContextInternal other) { + super(delegate, other); + } + + void executeAsync(Handler task) { + nettyEventLoop().execute(() -> dispatch(null, task)); + } + + @Override + void execute(T value, Handler task) { + dispatch(value, task); + } + + @Override + public boolean isEventLoopContext() { + return true; + } + + @Override + public ContextInternal duplicate(ContextInternal context) { + return new Duplicated(delegate, context); + } + } } diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java index e396888f0..f73f103c3 100644 --- a/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -39,7 +39,6 @@ import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.impl.HttpClientImpl; import io.vertx.core.http.impl.HttpServerImpl; import io.vertx.core.impl.resolver.DnsResolverProvider; -import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.core.net.NetClient; @@ -52,6 +51,7 @@ import io.vertx.core.net.impl.ServerID; import io.vertx.core.net.impl.transport.Transport; import io.vertx.core.shareddata.SharedData; import io.vertx.core.shareddata.impl.SharedDataImpl; +import io.vertx.core.spi.VertxTracerFactory; import io.vertx.core.spi.VerticleFactory; import io.vertx.core.spi.VertxMetricsFactory; import io.vertx.core.spi.cluster.ClusterManager; @@ -59,6 +59,7 @@ import io.vertx.core.spi.metrics.Metrics; import io.vertx.core.spi.metrics.MetricsProvider; import io.vertx.core.spi.metrics.PoolMetrics; import io.vertx.core.spi.metrics.VertxMetrics; +import io.vertx.core.spi.tracing.VertxTracer; import java.io.File; import java.io.IOException; @@ -134,6 +135,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider { private final TimeUnit defaultWorkerMaxExecTimeUnit; private final CloseHooks closeHooks; private final Transport transport; + final VertxTracer tracer; private VertxImpl(VertxOptions options, Transport transport) { // Sanity check @@ -169,6 +171,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider { this.addressResolverOptions = options.getAddressResolverOptions(); this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions()); this.deploymentManager = new DeploymentManager(this); + this.tracer = initializeTracer(options); if (options.isClustered()) { this.clusterManager = getClusterManager(options); this.eventBus = new ClusteredEventBus(this, options, clusterManager); @@ -394,7 +397,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider { @Override public EventLoopContext createEventLoopContext(Deployment deployment, WorkerPool workerPool, ClassLoader tccl) { - return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deployment, tccl); + return new EventLoopContext(this, tracer, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deployment, tccl); } @Override @@ -402,7 +405,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider { if (workerPool == null) { workerPool = this.workerPool; } - return new WorkerContext(this, internalBlockingPool, workerPool, deployment, tccl); + return new WorkerContext(this, tracer, internalBlockingPool, workerPool, deployment, tccl); } @Override @@ -448,6 +451,24 @@ public class VertxImpl implements VertxInternal, MetricsProvider { return null; } + private VertxTracer initializeTracer(VertxOptions options) { + if (options.getTracingOptions() != null && options.getTracingOptions().isEnabled()) { + VertxTracerFactory factory = options.getTracingOptions().getFactory(); + if (factory == null) { + factory = ServiceHelper.loadFactoryOrNull(VertxTracerFactory.class); + if (factory == null) { + log.warn("Metrics has been set to enabled but no TracerFactory found on classpath"); + } + } + if (factory != null) { + VertxTracer tracer = factory.tracer(options.getTracingOptions()); + Objects.requireNonNull(tracer, "The tracer instance created from " + factory + " cannot be null"); + return tracer; + } + } + return null; + } + private ClusterManager getClusterManager(VertxOptions options) { ClusterManager mgr = options.getClusterManager(); if (mgr == null) { @@ -839,6 +860,9 @@ public class VertxImpl implements VertxInternal, MetricsProvider { if (metrics != null) { metrics.close(); } + if (tracer != null) { + tracer.close(); + } checker.close(); diff --git a/src/main/java/io/vertx/core/impl/VertxInternal.java b/src/main/java/io/vertx/core/impl/VertxInternal.java index 5ed3a2145..d59e4333e 100644 --- a/src/main/java/io/vertx/core/impl/VertxInternal.java +++ b/src/main/java/io/vertx/core/impl/VertxInternal.java @@ -16,7 +16,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.resolver.AddressResolverGroup; import io.vertx.core.*; import io.vertx.core.http.impl.HttpServerImpl; -import io.vertx.core.json.JsonObject; import io.vertx.core.net.impl.NetServerImpl; import io.vertx.core.net.impl.ServerID; import io.vertx.core.net.impl.transport.Transport; @@ -128,4 +127,5 @@ public interface VertxInternal extends Vertx { void addCloseHook(Closeable hook); void removeCloseHook(Closeable hook); + } diff --git a/src/main/java/io/vertx/core/impl/VertxThread.java b/src/main/java/io/vertx/core/impl/VertxThread.java index ea3e65407..60c81e7a6 100644 --- a/src/main/java/io/vertx/core/impl/VertxThread.java +++ b/src/main/java/io/vertx/core/impl/VertxThread.java @@ -29,7 +29,7 @@ public final class VertxThread extends FastThreadLocalThread { private long execStart; private ContextInternal context; - VertxThread(Runnable target, String name, boolean worker, long maxExecTime, TimeUnit maxExecTimeUnit) { + public VertxThread(Runnable target, String name, boolean worker, long maxExecTime, TimeUnit maxExecTimeUnit) { super(target, name); this.worker = worker; this.maxExecTime = maxExecTime; diff --git a/src/main/java/io/vertx/core/impl/WorkerContext.java b/src/main/java/io/vertx/core/impl/WorkerContext.java index 986ebebb8..f932390dd 100644 --- a/src/main/java/io/vertx/core/impl/WorkerContext.java +++ b/src/main/java/io/vertx/core/impl/WorkerContext.java @@ -13,15 +13,16 @@ package io.vertx.core.impl; import io.vertx.core.Handler; import io.vertx.core.spi.metrics.PoolMetrics; +import io.vertx.core.spi.tracing.VertxTracer; /** * @author Tim Fox */ class WorkerContext extends ContextImpl { - WorkerContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, - ClassLoader tccl) { - super(vertx, internalBlockingPool, workerPool, deployment, tccl); + WorkerContext(VertxInternal vertx, VertxTracer tracer, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, + ClassLoader tccl) { + super(vertx, tracer, internalBlockingPool, workerPool, deployment, tccl); } @Override @@ -38,6 +39,10 @@ class WorkerContext extends ContextImpl { // so we need to execute it on the worker thread @Override void execute(T value, Handler task) { + execute(this, value ,task); + } + + private void execute(ContextInternal ctx, T value, Handler task) { PoolMetrics metrics = workerPool.metrics(); Object metric = metrics != null ? metrics.submitted() : null; orderedTasks.execute(() -> { @@ -45,7 +50,7 @@ class WorkerContext extends ContextImpl { metrics.begin(metric); } try { - dispatch(value, task); + ctx.dispatch(value, task); } finally { if (metrics != null) { metrics.end(metric, true); @@ -71,4 +76,34 @@ class WorkerContext extends ContextImpl { } }, workerPool.executor()); } + + public ContextInternal duplicate(ContextInternal in) { + return new Duplicated(this, in); + } + + static class Duplicated extends ContextImpl.Duplicated { + + Duplicated(WorkerContext delegate, ContextInternal other) { + super(delegate, other); + } + + void executeAsync(Handler task) { + execute(null, task); + } + + @Override + void execute(T value, Handler task) { + delegate.execute(this, value, task); + } + + @Override + public boolean isEventLoopContext() { + return false; + } + + @Override + public ContextInternal duplicate(ContextInternal context) { + return new Duplicated(delegate, context); + } + } } diff --git a/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java b/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java index 2d2f42cf1..c18fdb18a 100644 --- a/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java +++ b/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java @@ -56,7 +56,7 @@ class WorkerExecutorImpl implements MetricsProvider, WorkerExecutorInternal { throw new IllegalStateException("Worker executor closed"); } ContextImpl context = (ContextImpl) ctx.owner().getOrCreateContext(); - context.executeBlocking(blockingCodeHandler, asyncResultHandler, pool, ordered ? context.orderedTasks : null); + ContextImpl.executeBlocking(context, blockingCodeHandler, asyncResultHandler, pool, ordered ? context.orderedTasks : null); } @Override diff --git a/src/main/java/io/vertx/core/spi/VertxTracerFactory.java b/src/main/java/io/vertx/core/spi/VertxTracerFactory.java new file mode 100644 index 000000000..13cf78f26 --- /dev/null +++ b/src/main/java/io/vertx/core/spi/VertxTracerFactory.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.spi; + +import io.vertx.core.json.JsonObject; +import io.vertx.core.spi.tracing.VertxTracer; +import io.vertx.core.tracing.TracingOptions; + +/** + * A factory for the plug-able tracing SPI. + * + * @author Julien Viet + */ +public interface VertxTracerFactory { + + /** + * Create a new {@link VertxTracer} object.

+ * + * No specific thread and context can be expected when this method is called. + * + * @param options the metrics configuration option + * @return the tracing implementation + */ + VertxTracer tracer(TracingOptions options); + + /** + * Create an empty tracing options. + * Providers can override this method to provide a custom tracing options subclass that exposes custom configuration. + * It is used by the {@link io.vertx.core.Launcher} class when creating new options when building a CLI Vert.x. + * + * @return new tracing options + */ + default TracingOptions newOptions() { + return new TracingOptions(); + } + + /** + * Create tracing options from the provided {@code jsonObject}. + * Providers can override this method to provide a custom tracing options subclass that exposes custom configuration. + * It is used by the {@link io.vertx.core.Launcher} class when creating new options when building a CLI Vert.x. + * + * @param jsonObject json provided by the user + * @return new tracing options + */ + default TracingOptions newOptions(JsonObject jsonObject) { + return new TracingOptions(jsonObject); + } +} diff --git a/src/main/java/io/vertx/core/spi/tracing/VertxTracer.java b/src/main/java/io/vertx/core/spi/tracing/VertxTracer.java new file mode 100644 index 000000000..962cf4250 --- /dev/null +++ b/src/main/java/io/vertx/core/spi/tracing/VertxTracer.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.spi.tracing; + +import io.vertx.core.Context; + +import java.util.Map; +import java.util.function.BiConsumer; + +/** + * @author Julien Viet + */ +public interface VertxTracer { + + /** + * Signal a request has been received and will be processed. + * + * @param context the context data attached to the request + * @param request the request object + * @param operation the request operation + * @param headers a read-only view of the request headers + * @param tags the request tags + * @return the request trace + */ + default I receiveRequest(Context context, Object request, String operation, Iterable> headers, Iterable> tags) { + return null; + } + + /** + * Signal the response is sent. + * + * @param context the context data attached to the request + * @param response the response sent + * @param payload the payload returned by {@link #receiveRequest} + * @param failure the failure when not {@code null} + * @param tags the response tags + */ + default void sendResponse(Context context, Object response, I payload, Throwable failure, Iterable> tags) { + } + + /** + * Signal a request is sent. + * + * @param context the context data attached to the request + * @param request the request object + * @param operation the request operation + * @param headers a write only-view of the request headers + * @param tags the request tags + * @return the request trace + */ + default O sendRequest(Context context, Object request, String operation, BiConsumer headers, Iterable> tags) { + return null; + } + + /** + * Signal a response has been received. + * + * @param context the context data attached to the request + * @param response the response sent + * @param payload the payload returned by {@link #sendRequest} + * @param failure the failure when not {@code null} + * @param tags the response tags + */ + default void receiveResponse(Context context, Object response, O payload, Throwable failure, Iterable> tags) { + } + + /** + * Close the tracer. + */ + default void close() { + } +} diff --git a/src/main/java/io/vertx/core/tracing/TracingOptions.java b/src/main/java/io/vertx/core/tracing/TracingOptions.java new file mode 100644 index 000000000..93bb44342 --- /dev/null +++ b/src/main/java/io/vertx/core/tracing/TracingOptions.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.tracing; + +import io.vertx.codegen.annotations.DataObject; +import io.vertx.core.json.JsonObject; +import io.vertx.core.spi.VertxTracerFactory; + +/** + * Vert.x tracing base configuration, this class can be extended by provider implementations to configure + * those specific implementations. + * + * @author Julien Viet + */ +@DataObject(generateConverter = true, publicConverter = false) +public class TracingOptions { + + /** + * The default value of tracing enabled false + */ + public static final boolean DEFAULT_TRACING_ENABLED = false; + + private boolean enabled; + private JsonObject json; // Keep a copy of the original json, so we don't lose info when building options subclasses + private VertxTracerFactory factory; + + /** + * Default constructor + */ + public TracingOptions() { + enabled = DEFAULT_TRACING_ENABLED; + } + + /** + * Copy constructor + * + * @param other The other {@link TracingOptions} to copy when creating this + */ + public TracingOptions(TracingOptions other) { + enabled = other.isEnabled(); + factory = other.factory; + } + + /** + * Create an instance from a {@link JsonObject} + * + * @param json the JsonObject to create it from + */ + public TracingOptions(JsonObject json) { + this(); + TracingOptionsConverter.fromJson(json, this); + this.json = json.copy(); + } + + /** + * Will tracing be enabled on the Vert.x instance? + * + * @return true if enabled, false if not. + */ + public boolean isEnabled() { + return enabled; + } + + /** + * Set whether tracing will be enabled on the Vert.x instance. + * + * @param enable true if tracing enabled, or false if not. + * @return a reference to this, so the API can be used fluently + */ + public TracingOptions setEnabled(boolean enable) { + this.enabled = enable; + return this; + } + + /** + * Get the tracer factory to be used when tracing are enabled. + *

+ * If the tracer factory has been programmatically set here, then that will be used when tracing are enabled + * for creating the {@link io.vertx.core.spi.tracing.VertxTracer} instance. + *

+ * Otherwise Vert.x attempts to locate a tracer factory implementation on the classpath. + * + * @return the tracer factory + */ + public VertxTracerFactory getFactory() { + return factory; + } + + /** + * Programmatically set the tracer factory to be used when tracing are enabled. + *

+ * Only valid if {@link TracingOptions#isEnabled} = true. + *

+ * Normally Vert.x will look on the classpath for a tracer factory implementation, but if you want to set one + * programmatically you can use this method. + * + * @param factory the tracer factory + * @return a reference to this, so the API can be used fluently + */ + public TracingOptions setFactory(VertxTracerFactory factory) { + this.factory = factory; + return this; + } + + public TracingOptions copy() { + return new TracingOptions(this); + } + + public JsonObject toJson() { + return json != null ? json.copy() : new JsonObject(); + } + + @Override + public String toString() { + return "TracingOptions{" + + "enabled=" + enabled + + ", json=" + json + + '}'; + } +} diff --git a/src/test/benchmarks/io/vertx/benchmarks/HttpServerHandlerBenchmark.java b/src/test/benchmarks/io/vertx/benchmarks/HttpServerHandlerBenchmark.java index 2ecb582d2..154e22162 100644 --- a/src/test/benchmarks/io/vertx/benchmarks/HttpServerHandlerBenchmark.java +++ b/src/test/benchmarks/io/vertx/benchmarks/HttpServerHandlerBenchmark.java @@ -222,7 +222,7 @@ public class HttpServerHandlerBenchmark extends BenchmarkBase { - ContextInternal context = new EventLoopContext(vertx, vertxChannel.eventLoop(), null, null, null, Thread.currentThread().getContextClassLoader()); + ContextInternal context = new EventLoopContext(vertx, null, vertxChannel.eventLoop(), null, null, null, Thread.currentThread().getContextClassLoader()); Handler app = request -> { HttpServerResponse response = request.response(); MultiMap headers = response.headers(); diff --git a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java index 53c96e32c..e770f60d6 100644 --- a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java +++ b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java @@ -14,6 +14,8 @@ package io.vertx.core.impl; import io.vertx.core.Handler; import io.vertx.core.Vertx; +import java.util.concurrent.ConcurrentMap; + /** * @author Julien Viet */ @@ -25,7 +27,7 @@ public class BenchmarkContext extends ContextImpl { } public BenchmarkContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) { - super(vertx, internalBlockingPool, workerPool, deployment, tccl); + super(vertx, null, internalBlockingPool, workerPool, deployment, tccl); } @Override @@ -47,4 +49,9 @@ public class BenchmarkContext extends ContextImpl { public boolean isEventLoopContext() { return false; } + + @Override + public ContextInternal duplicate(ContextInternal in) { + throw new UnsupportedOperationException(); + } } diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java index 4c926e6b8..6b7cd3a25 100644 --- a/src/test/java/io/vertx/core/ContextTest.java +++ b/src/test/java/io/vertx/core/ContextTest.java @@ -14,6 +14,7 @@ package io.vertx.core; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.TaskQueue; import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.VertxThread; import io.vertx.core.impl.WorkerPool; import io.vertx.test.core.VertxTestBase; import org.junit.Test; @@ -28,6 +29,24 @@ import java.util.concurrent.atomic.AtomicReference; */ public class ContextTest extends VertxTestBase { + private ExecutorService workerExecutor; + + private ContextInternal createWorkerContext() { + return ((VertxInternal) vertx).createWorkerContext(null, new WorkerPool(workerExecutor, null), Thread.currentThread().getContextClassLoader()); + } + + @Override + public void setUp() throws Exception { + workerExecutor = Executors.newSingleThreadExecutor(r -> new VertxThread(r, "vert.x-worker-thread", true, 10, TimeUnit.SECONDS)); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + workerExecutor.shutdown(); + super.tearDown(); + } + @Test public void testRunOnContext() throws Exception { vertx.runOnContext(v -> { @@ -189,22 +208,13 @@ public class ContextTest extends VertxTestBase { } @Test - public void testWorkerExecuteFromIo() throws Exception { - AtomicReference workerContext = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - vertx.deployVerticle(new AbstractVerticle() { - @Override - public void start() throws Exception { - workerContext.set((ContextInternal) context); - latch.countDown(); - } - }, new DeploymentOptions().setWorker(true)); - awaitLatch(latch); - workerContext.get().nettyEventLoop().execute(() -> { + public void testWorkerExecuteFromIo() { + ContextInternal workerContext = createWorkerContext(); + workerContext.nettyEventLoop().execute(() -> { assertNull(Vertx.currentContext()); - workerContext.get().nettyEventLoop().execute(() -> { - workerContext.get().executeFromIO(v -> { - assertSame(workerContext.get(), Vertx.currentContext()); + workerContext.nettyEventLoop().execute(() -> { + workerContext.executeFromIO(v -> { + assertSame(workerContext, Vertx.currentContext()); assertTrue(Context.isOnWorkerThread()); testComplete(); }); @@ -409,8 +419,7 @@ public class ContextTest extends VertxTestBase { @Test public void testExecuteFromIOWorkerFromNonVertxThread() { assertEquals("true", System.getProperty("vertx.threadChecks")); - ExecutorService a = Executors.newSingleThreadExecutor(); - ContextInternal ctx = ((VertxInternal) vertx).createWorkerContext(null, new WorkerPool(a, null), Thread.currentThread().getContextClassLoader()); + ContextInternal ctx = createWorkerContext(); AtomicBoolean called = new AtomicBoolean(); try { ctx.executeFromIO(v -> { @@ -432,4 +441,100 @@ public class ContextTest extends VertxTestBase { ctx.reportException(expected); assertSame(expected, err.get()); } + + @Test + public void testDuplicate() throws Exception { + ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicate = ctx.duplicate(); + checkDuplicate(ctx, duplicate); + } + + @Test + public void testDuplicateWorker() throws Exception { + ContextInternal ctx = createWorkerContext(); + ContextInternal duplicate = ctx.duplicate(); + checkDuplicate(ctx, duplicate); + } + + @Test + public void testDuplicateTwice() throws Exception { + ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = ctx.duplicate().duplicate(); + checkDuplicate(ctx, duplicated); + } + + @Test + public void testDuplicateWith() throws Exception { + ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal other = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = ctx.duplicate(other); + checkDuplicate(ctx, duplicated); + checkDuplicateWith(other, duplicated); + } + + private void checkDuplicate(ContextInternal ctx, ContextInternal duplicated) throws Exception { + assertSame(ctx.nettyEventLoop(), duplicated.nettyEventLoop()); + assertSame(ctx.getDeployment(), duplicated.getDeployment()); + assertSame(ctx.classLoader(), duplicated.classLoader()); + assertSame(ctx.owner(), duplicated.owner()); + Object shared = new Object(); + Object local = new Object(); + ctx.put("key", shared); + ctx.putLocal("key", local); + assertSame(shared, duplicated.get("key")); + assertNull(duplicated.getLocal("key")); + assertTrue(duplicated.remove("key")); + assertNull(ctx.get("key")); + + CountDownLatch latch1 = new CountDownLatch(1); + duplicated.runOnContext(v -> { + assertSame(Vertx.currentContext(), duplicated); + latch1.countDown(); + }); + awaitLatch(latch1); + + CountDownLatch latch2 = new CountDownLatch(1); + Throwable failure = new Throwable(); + ctx.exceptionHandler(err -> { + assertSame(failure, err); + latch2.countDown(); + }); + duplicated.reportException(failure); + awaitLatch(latch2); + + CountDownLatch latch3 = new CountDownLatch(1); + duplicated.runOnContext(v -> { + vertx.setTimer(10, id -> { + assertSame(duplicated, Vertx.currentContext()); + latch3.countDown(); + }); + }); + awaitLatch(latch3); + + CountDownLatch latch4 = new CountDownLatch(1); + duplicated.runOnContext(v -> { + vertx.executeBlocking(Future::complete, res -> { + assertSame(duplicated, Vertx.currentContext()); + latch4.countDown(); + }); + }); + awaitLatch(latch4); + } + + @Test + public void testDuplicateWithTwice() throws Exception { + ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal other = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = ctx.duplicate().duplicate(other); + checkDuplicate(ctx, duplicated); + checkDuplicateWith(other, duplicated); + } + + private void checkDuplicateWith(ContextInternal ctx, ContextInternal duplicated) { + Object val = new Object(); + ctx.putLocal("key", val); + assertSame(val, duplicated.getLocal("key")); + duplicated.removeLocal("key"); + assertNull(ctx.getLocal("key")); + } } diff --git a/src/test/java/io/vertx/core/spi/tracing/TracerTest.java b/src/test/java/io/vertx/core/spi/tracing/TracerTest.java new file mode 100644 index 000000000..e3ab4d203 --- /dev/null +++ b/src/test/java/io/vertx/core/spi/tracing/TracerTest.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.spi.tracing; + +import io.vertx.test.core.VertxTestBase; +import io.vertx.test.faketracer.FakeTracer; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; + +public class TracerTest extends VertxTestBase { + + private FakeTracer tracer = new FakeTracer(); + + @Override + protected VertxTracer getTracer() { + return tracer; + } + + @Test + public void testClose() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + assertEquals(0, tracer.closeCount()); + vertx.close(ar -> latch.countDown()); + awaitLatch(latch); + assertEquals(1, tracer.closeCount()); + } +} diff --git a/src/test/java/io/vertx/test/core/VertxTestBase.java b/src/test/java/io/vertx/test/core/VertxTestBase.java index 72cb1536f..f21d03619 100644 --- a/src/test/java/io/vertx/test/core/VertxTestBase.java +++ b/src/test/java/io/vertx/test/core/VertxTestBase.java @@ -26,11 +26,12 @@ import io.vertx.core.net.PemKeyCertOptions; import io.vertx.core.net.PfxOptions; import io.vertx.core.net.TCPSSLOptions; import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.core.spi.tracing.VertxTracer; +import io.vertx.core.tracing.TracingOptions; import io.vertx.test.fakecluster.FakeClusterManager; import org.junit.Rule; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocketFactory; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Collections; @@ -74,9 +75,17 @@ public class VertxTestBase extends AsyncTestBase { } } + protected VertxTracer getTracer() { + return null; + } + protected VertxOptions getOptions() { VertxOptions options = new VertxOptions(); options.setPreferNativeTransport(USE_NATIVE_TRANSPORT); + VertxTracer tracer = getTracer(); + if (tracer != null) { + options.setTracingOptions(new TracingOptions().setEnabled(true).setFactory(opts -> tracer)); + } return options; } @@ -243,7 +252,7 @@ public class VertxTestBase extends AsyncTestBase { for (StackTraceElement elt : Thread.currentThread().getStackTrace()) { String className = elt.getClassName(); String methodName = elt.getMethodName(); - if (className.equals("io.vertx.core.impl.ContextImpl") && methodName.equals("dispatch")) { + if (className.equals("io.vertx.core.impl.AbstractContext") && methodName.equals("dispatch")) { return; } } diff --git a/src/test/java/io/vertx/test/faketracer/FakeTracer.java b/src/test/java/io/vertx/test/faketracer/FakeTracer.java new file mode 100644 index 000000000..9d83ea0d4 --- /dev/null +++ b/src/test/java/io/vertx/test/faketracer/FakeTracer.java @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.test.faketracer; + +import io.vertx.core.Context; +import io.vertx.core.Vertx; +import io.vertx.core.spi.tracing.VertxTracer; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +/** + * @author Pavol Loffay + */ +public class FakeTracer implements VertxTracer { + + private static final String ACTIVE_SCOPE_KEY = "active.scope"; + + private AtomicInteger idGenerator = new AtomicInteger(0); + List finishedSpans = new CopyOnWriteArrayList<>(); + private AtomicInteger closeCount = new AtomicInteger(); + + int nextId() { + return idGenerator.getAndIncrement(); + } + + public Span newTrace(String operation) { + return new Span(this, nextId(), nextId(), nextId(), operation); + } + + public Span newTrace() { + return new Span(this, nextId(), nextId(), nextId(), null); + } + + public Span activeSpan() { + return activeSpan(Vertx.currentContext()); + } + + public Span activeSpan(Context data) { + Scope scope = data.getLocal(ACTIVE_SCOPE_KEY); + return scope != null ? scope.wrapped : null; + } + + public Scope activate(Span span) { + return activate(Vertx.currentContext(), span); + } + + public Scope activate(Context context, Span span) { + Scope toRestore = context.getLocal(ACTIVE_SCOPE_KEY); + Scope active = new Scope(this, span, toRestore); + context.putLocal(ACTIVE_SCOPE_KEY, active); + return active; + } + + public void encode(Span span, BiConsumer headers) { + headers.accept("span-trace-id", "" + span.traceId); + headers.accept("span-parent-id", "" + span.parentId); + headers.accept("span-id", "" + span.id); + } + + public Span decode(String operation, Iterable> headers) { + String traceId = null; + String spanId = null; + String spanParentId = null; + for (Map.Entry header : headers) { + switch (header.getKey()) { + case "span-trace-id": + traceId = header.getValue(); + break; + case "span-id": + spanId = header.getValue(); + break; + case "span-parent-id": + spanParentId = header.getValue(); + break; + } + } + if (traceId != null && spanId != null && spanParentId != null) { + return new Span(this, Integer.parseInt(traceId), Integer.parseInt(spanParentId), + Integer.parseInt(spanId), operation); + } + return null; + } + + private Span getServerSpan(String operation, Iterable> headers) { + Span parent = decode(operation, headers); + if (parent != null) { + return parent.createChild(operation); + } else { + return newTrace(operation); + } + } + + @Override + public Span receiveRequest(Context context, Object inbound, String operation, Iterable> headers, Iterable> tags) { + Span serverSpan = getServerSpan(operation, headers); + serverSpan.addTag("span_kind", "server"); + addTags(serverSpan, tags); + // Create scope + return activate(context, serverSpan).span(); + } + + @Override + public void sendResponse(Context context, Object response, Span span, Throwable failure, Iterable> tags) { + if (span != null) { + addTags(span, tags); + span.finish(); + } + } + + @Override + public Span sendRequest(Context context, Object outbound, String operation, BiConsumer headers, Iterable> tags) { + Span span = activeSpan(context); + if (span == null) { + span = newTrace(operation); + } else { + span = span.createChild(operation); + } + span.addTag("span_kind", "client"); + addTags(span, tags); + encode(span, headers); + return span; + } + + @Override + public void receiveResponse(Context context, Object response, Span span, Throwable failure, Iterable> tags) { + if (span != null) { + addTags(span, tags); + span.finish(); + } + } + + private void addTags(Span span, Iterable> tags) { + for (Map.Entry tag : tags) { + span.addTag(tag.getKey(), tag.getValue()); + } + } + + public List getFinishedSpans() { + return Collections.unmodifiableList(finishedSpans); + } + + @Override + public void close() { + closeCount.incrementAndGet(); + } + + public int closeCount() { + return closeCount.get(); + } +} diff --git a/src/test/java/io/vertx/test/faketracer/Scope.java b/src/test/java/io/vertx/test/faketracer/Scope.java new file mode 100644 index 000000000..0631932fe --- /dev/null +++ b/src/test/java/io/vertx/test/faketracer/Scope.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.test.faketracer; + +/** + * @author Pavol Loffay + */ +public class Scope { + Span wrapped; + private Scope toRestore; + private FakeTracer tracer; + + public Scope(FakeTracer tracer, Span span, Scope toRestore) { + this.tracer = tracer; + this.wrapped = span; + this.toRestore = toRestore; + } + + public Span span() { + return wrapped; + } + + public void close() { + // tracer.currentSpan.set(toRestore); + } +} diff --git a/src/test/java/io/vertx/test/faketracer/Span.java b/src/test/java/io/vertx/test/faketracer/Span.java new file mode 100644 index 000000000..6a942df84 --- /dev/null +++ b/src/test/java/io/vertx/test/faketracer/Span.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.test.faketracer; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author Pavol Loffay + */ +public class Span { + public final int traceId; + public final int parentId; + public final int id; + public final String operation; + final FakeTracer tracer; + private AtomicBoolean finished = new AtomicBoolean(); + private final Map tags = new ConcurrentHashMap<>(); + + Span(FakeTracer tracer, int traceId, int parentId, int id, String operation) { + this.tracer = tracer; + this.traceId = traceId; + this.parentId = parentId; + this.id = id; + this.operation = operation; + } + + public Map getTags() { + return Collections.unmodifiableMap(tags); + } + + public Span createChild() { + return createChild(null); + } + + public Span createChild(String operation) { + return new Span(tracer, traceId, id, tracer.nextId(), operation); + } + + public void addTag(String key, String value) { + if (value != null) { + tags.put(key, value); + } + } + + public void finish() { + if (finished.getAndSet(true)) { + throw new IllegalStateException("Finishing already finished span!"); + } + tracer.finishedSpans.add(this); + } + + @Override + public boolean equals(Object obj) { + Span span = (Span) obj; + return span.traceId == traceId && span.parentId == parentId && span.id == id; + } + + @Override + public String toString() { + return "Span[traceId=" + traceId + ",parentId=" + parentId + ",id=" + id + "]"; + } +}