Tracer SPI

This commit is contained in:
Julien Viet
2019-01-26 10:47:52 +01:00
parent f679110824
commit 82b94ad9c0
25 changed files with 1271 additions and 166 deletions

View File

@@ -1931,6 +1931,24 @@ Set the ALPN usage.
+++
|===
[[TracingOptions]]
== TracingOptions
++++
Vert.x tracing base configuration, this class can be extended by provider implementations to configure
those specific implementations.
++++
'''
[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[enabled]]`@enabled`|`Boolean`|+++
Set whether tracing will be enabled on the Vert.x instance.
+++
|===
[[VertxOptions]]
== VertxOptions
@@ -2022,6 +2040,7 @@ Set wether to prefer the native transport to the JDK transport.
|[[quorumSize]]`@quorumSize`|`Number (int)`|+++
Set the quorum size to be used when HA is enabled.
+++
|[[tracingOptions]]`@tracingOptions`|`link:dataobjects.html#TracingOptions[TracingOptions]`|-
|[[warningExceptionTime]]`@warningExceptionTime`|`Number (long)`|+++
Set the threshold value above this, the blocked warning contains a stack trace. in link.
The default value of link is

View File

@@ -129,6 +129,11 @@ import java.time.format.DateTimeFormatter;
obj.setQuorumSize(((Number)member.getValue()).intValue());
}
break;
case "tracingOptions":
if (member.getValue() instanceof JsonObject) {
obj.setTracingOptions(new io.vertx.core.tracing.TracingOptions((JsonObject)member.getValue()));
}
break;
case "warningExceptionTime":
if (member.getValue() instanceof Number) {
obj.setWarningExceptionTime(((Number)member.getValue()).longValue());
@@ -196,6 +201,9 @@ import java.time.format.DateTimeFormatter;
}
json.put("preferNativeTransport", obj.getPreferNativeTransport());
json.put("quorumSize", obj.getQuorumSize());
if (obj.getTracingOptions() != null) {
json.put("tracingOptions", obj.getTracingOptions().toJson());
}
json.put("warningExceptionTime", obj.getWarningExceptionTime());
if (obj.getWarningExceptionTimeUnit() != null) {
json.put("warningExceptionTimeUnit", obj.getWarningExceptionTimeUnit().name());

View File

@@ -0,0 +1,33 @@
package io.vertx.core.tracing;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
/**
* Converter for {@link io.vertx.core.tracing.TracingOptions}.
* NOTE: This class has been automatically generated from the {@link io.vertx.core.tracing.TracingOptions} original class using Vert.x codegen.
*/
class TracingOptionsConverter {
static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, TracingOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "enabled":
if (member.getValue() instanceof Boolean) {
obj.setEnabled((Boolean)member.getValue());
}
break;
}
}
}
static void toJson(TracingOptions obj, JsonObject json) {
toJson(obj, json.getMap());
}
static void toJson(TracingOptions obj, java.util.Map<String, Object> json) {
json.put("enabled", obj.isEnabled());
}
}

View File

@@ -204,6 +204,33 @@ public interface Context {
*/
boolean remove(String key);
/**
* Get some local data from the context.
*
* @param key the key of the data
* @param <T> the type of the data
* @return the data
*/
<T> T getLocal(String key);
/**
* Put some local data in the context.
* <p>
* This can be used to share data between different handlers that share a context
*
* @param key the key of the data
* @param value the data
*/
void putLocal(String key, Object value);
/**
* Remove some local data from the context.
*
* @param key the key to remove
* @return true if removed successfully, false otherwise
*/
boolean removeLocal(String key);
/**
* @return The Vertx instance that created the context
*/

View File

@@ -19,6 +19,7 @@ import io.vertx.core.impl.cpu.CpuCoreSensor;
import io.vertx.core.json.JsonObject;
import io.vertx.core.metrics.MetricsOptions;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.tracing.TracingOptions;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -156,6 +157,7 @@ public class VertxOptions {
private int quorumSize = DEFAULT_QUORUM_SIZE;
private String haGroup = DEFAULT_HA_GROUP;
private MetricsOptions metricsOptions = new MetricsOptions();
private TracingOptions tracingOptions = new TracingOptions();
private FileSystemOptions fileSystemOptions = new FileSystemOptions();
private long warningExceptionTime = DEFAULT_WARNING_EXCEPTION_TIME;
private EventBusOptions eventBusOptions = new EventBusOptions();
@@ -197,6 +199,7 @@ public class VertxOptions {
this.maxWorkerExecuteTimeUnit = other.maxWorkerExecuteTimeUnit;
this.warningExceptionTimeUnit = other.warningExceptionTimeUnit;
this.blockedThreadCheckIntervalUnit = other.blockedThreadCheckIntervalUnit;
this.tracingOptions = other.tracingOptions != null ? other.tracingOptions.copy() : null;
}
/**
@@ -805,6 +808,15 @@ public class VertxOptions {
return this;
}
public TracingOptions getTracingOptions() {
return tracingOptions;
}
public VertxOptions setTracingOptions(TracingOptions tracingOptions) {
this.tracingOptions = tracingOptions;
return this;
}
public JsonObject toJson() {
JsonObject json = new JsonObject();
VertxOptionsConverter.toJson(this, json);

View File

@@ -0,0 +1,160 @@
/*
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Starter;
import io.vertx.core.impl.launcher.VertxCommandLauncher;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
/**
* A context implementation that does not hold any specific state.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
* @author <a href="http://tfox.org">Tim Fox</a>
*/
abstract class AbstractContext implements ContextInternal {
private static final String THREAD_CHECKS_PROP_NAME = "vertx.threadChecks";
private static final boolean THREAD_CHECKS = Boolean.getBoolean(THREAD_CHECKS_PROP_NAME);
abstract void executeAsync(Handler<Void> task);
abstract <T> void execute(T value, Handler<T> task);
@Override
public abstract boolean isEventLoopContext();
@Override
public boolean isWorkerContext() {
return !isEventLoopContext();
}
static boolean isOnVertxThread(boolean worker) {
Thread t = Thread.currentThread();
if (t instanceof VertxThread) {
VertxThread vt = (VertxThread) t;
return vt.isWorker() == worker;
}
return false;
}
// This is called to execute code where the origin is IO (from Netty probably).
// In such a case we should already be on an event loop thread (as Netty manages the event loops)
// but check this anyway, then execute directly
@Override
public final void executeFromIO(Handler<Void> task) {
executeFromIO(null, task);
}
@Override
public final void schedule(Handler<Void> task) {
schedule(null, task);
}
@Override
public final void dispatch(Handler<Void> task) {
dispatch(null, task);
}
@Override
public final <T> void dispatch(T arg, Handler<T> task) {
VertxThread currentThread = ContextInternal.beginDispatch(this);
try {
task.handle(arg);
} catch (Throwable t) {
reportException(t);
} finally {
ContextInternal.endDispatch(currentThread);
}
}
@Override
public final <T> void executeFromIO(T value, Handler<T> task) {
if (THREAD_CHECKS) {
checkEventLoopThread();
}
execute(value, task);
}
private void checkEventLoopThread() {
Thread current = Thread.currentThread();
if (!(current instanceof VertxThread)) {
throw new IllegalStateException("Expected to be on Vert.x thread, but actually on: " + current);
} else if (((VertxThread) current).isWorker()) {
throw new IllegalStateException("Event delivered on unexpected worker thread " + current);
}
}
// Run the task asynchronously on this same context
@Override
public final void runOnContext(Handler<Void> task) {
try {
executeAsync(task);
} catch (RejectedExecutionException ignore) {
// Pool is already shut down
}
}
@Override
public final List<String> processArgs() {
// As we are maintaining the launcher and starter class, choose the right one.
List<String> processArgument = VertxCommandLauncher.getProcessArguments();
return processArgument != null ? processArgument : Starter.PROCESS_ARGS;
}
@Override
public final <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(blockingCodeHandler, true, resultHandler);
}
@Override
public final ContextInternal duplicate() {
return duplicate(null);
}
@SuppressWarnings("unchecked")
@Override
public final <T> T get(String key) {
return (T) contextData().get(key);
}
@Override
public final void put(String key, Object value) {
contextData().put(key, value);
}
@Override
public final boolean remove(String key) {
return contextData().remove(key) != null;
}
@SuppressWarnings("unchecked")
@Override
public final <T> T getLocal(String key) {
return (T) localContextData().get(key);
}
@Override
public final void putLocal(String key, Object value) {
localContextData().put(key, value);
}
@Override
public final boolean removeLocal(String key) {
return localContextData().remove(key) != null;
}
}

View File

@@ -13,19 +13,18 @@ package io.vertx.core.impl;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Starter;
import io.vertx.core.impl.launcher.VertxCommandLauncher;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@@ -34,7 +33,7 @@ import java.util.concurrent.RejectedExecutionException;
/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
abstract class ContextImpl implements ContextInternal {
abstract class ContextImpl extends AbstractContext {
private static EventLoop getEventLoop(VertxInternal vertx) {
EventLoopGroup group = vertx.getEventLoopGroup();
@@ -47,34 +46,35 @@ abstract class ContextImpl implements ContextInternal {
private static final Logger log = LoggerFactory.getLogger(ContextImpl.class);
private static final String THREAD_CHECKS_PROP_NAME = "vertx.threadChecks";
private static final String DISABLE_TIMINGS_PROP_NAME = "vertx.disableContextTimings";
private static final boolean THREAD_CHECKS = Boolean.getBoolean(THREAD_CHECKS_PROP_NAME);
static final boolean DISABLE_TIMINGS = Boolean.getBoolean(DISABLE_TIMINGS_PROP_NAME);
protected final VertxInternal owner;
protected final VertxTracer<?, ?> tracer;
protected final JsonObject config;
private final Deployment deployment;
private final CloseHooks closeHooks;
private final ClassLoader tccl;
private final EventLoop eventLoop;
private ConcurrentMap<Object, Object> contextData;
private ConcurrentMap<Object, Object> data;
private ConcurrentMap<Object, Object> localData;
private volatile Handler<Throwable> exceptionHandler;
private final WorkerPool internalBlockingPool;
private final TaskQueue internalOrderedTasks;
final TaskQueue internalOrderedTasks;
final WorkerPool internalBlockingPool;
final WorkerPool workerPool;
final TaskQueue orderedTasks;
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment,
protected ContextImpl(VertxInternal vertx, VertxTracer<?, ?> tracer, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment,
ClassLoader tccl) {
this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deployment, tccl);
this(vertx, tracer, getEventLoop(vertx), internalBlockingPool, workerPool, deployment, tccl);
}
protected ContextImpl(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment,
protected ContextImpl(VertxInternal vertx, VertxTracer<?, ?> tracer, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment,
ClassLoader tccl) {
if (VertxThread.DISABLE_TCCL && tccl != ClassLoader.getSystemClassLoader()) {
log.warn("You have disabled TCCL checks but you have a custom TCCL to set.");
}
this.tracer = tracer;
this.deployment = deployment;
this.config = deployment != null ? deployment.config() : new JsonObject();
this.eventLoop = eventLoop;
@@ -105,100 +105,6 @@ abstract class ContextImpl implements ContextInternal {
VertxThreadFactory.unsetContext(this);
}
abstract void executeAsync(Handler<Void> task);
abstract <T> void execute(T value, Handler<T> task);
@Override
public abstract boolean isEventLoopContext();
@Override
@SuppressWarnings("unchecked")
public <T> T get(String key) {
return (T) contextData().get(key);
}
@Override
public void put(String key, Object value) {
contextData().put(key, value);
}
@Override
public boolean remove(String key) {
return contextData().remove(key) != null;
}
@Override
public boolean isWorkerContext() {
return !isEventLoopContext();
}
static boolean isOnVertxThread(boolean worker) {
Thread t = Thread.currentThread();
if (t instanceof VertxThread) {
VertxThread vt = (VertxThread) t;
return vt.isWorker() == worker;
}
return false;
}
// This is called to execute code where the origin is IO (from Netty probably).
// In such a case we should already be on an event loop thread (as Netty manages the event loops)
// but check this anyway, then execute directly
@Override
public final void executeFromIO(Handler<Void> task) {
executeFromIO(null, task);
}
@Override
public final void schedule(Handler<Void> task) {
schedule(null, task);
}
@Override
public final void dispatch(Handler<Void> task) {
dispatch(null, task);
}
@Override
public final <T> void dispatch(T arg, Handler<T> task) {
VertxThread currentThread = ContextInternal.beginDispatch(this);
try {
task.handle(arg);
} catch (Throwable t) {
reportException(t);
} finally {
ContextInternal.endDispatch(currentThread);
}
}
@Override
public final <T> void executeFromIO(T value, Handler<T> task) {
if (THREAD_CHECKS) {
checkEventLoopThread();
}
execute(value, task);
}
private void checkEventLoopThread() {
Thread current = Thread.currentThread();
if (!(current instanceof VertxThread)) {
throw new IllegalStateException("Expected to be on Vert.x thread, but actually on: " + current);
} else if (((VertxThread) current).isWorker()) {
throw new IllegalStateException("Event delivered on unexpected worker thread " + current);
}
}
// Run the task asynchronously on this same context
@Override
public void runOnContext(Handler<Void> task) {
try {
executeAsync(task);
} catch (RejectedExecutionException ignore) {
// Pool is already shut down
}
}
@Override
public String deploymentID() {
return deployment != null ? deployment.deploymentID() : null;
@@ -209,13 +115,6 @@ abstract class ContextImpl implements ContextInternal {
return config;
}
@Override
public List<String> processArgs() {
// As we are maintaining the launcher and starter class, choose the right one.
List<String> processArgument = VertxCommandLauncher.getProcessArguments();
return processArgument != null ? processArgument : Starter.PROCESS_ARGS;
}
public EventLoop nettyEventLoop() {
return eventLoop;
}
@@ -226,25 +125,20 @@ abstract class ContextImpl implements ContextInternal {
@Override
public <T> void executeBlockingInternal(Handler<Future<T>> action, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(action, resultHandler, internalBlockingPool, internalOrderedTasks);
executeBlocking(this, action, resultHandler, internalBlockingPool, internalOrderedTasks);
}
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(blockingCodeHandler, resultHandler, workerPool, ordered ? orderedTasks : null);
}
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(blockingCodeHandler, true, resultHandler);
executeBlocking(this, blockingCodeHandler, resultHandler, workerPool, ordered ? orderedTasks : null);
}
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(blockingCodeHandler, resultHandler, workerPool, queue);
executeBlocking(this, blockingCodeHandler, resultHandler, workerPool, queue);
}
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
static <T> void executeBlocking(ContextInternal context, Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
WorkerPool workerPool, TaskQueue queue) {
PoolMetrics metrics = workerPool.metrics();
@@ -261,7 +155,7 @@ abstract class ContextImpl implements ContextInternal {
}
Future<T> res = Future.future();
try {
ContextInternal.setContext(this);
ContextInternal.setContext(context);
blockingCodeHandler.handle(res);
} catch (Throwable e) {
res.tryFail(e);
@@ -274,7 +168,7 @@ abstract class ContextImpl implements ContextInternal {
metrics.end(execMetric, res.succeeded());
}
if (resultHandler != null) {
res.setHandler(ar -> runOnContext(v -> resultHandler.handle(ar)));
res.setHandler(ar -> context.runOnContext(v -> resultHandler.handle(ar)));
}
};
Executor exec = workerPool.executor();
@@ -292,6 +186,11 @@ abstract class ContextImpl implements ContextInternal {
}
}
@Override
public VertxTracer tracer() {
return tracer;
}
@Override
public ClassLoader classLoader() {
return tccl;
@@ -299,14 +198,22 @@ abstract class ContextImpl implements ContextInternal {
@Override
public synchronized ConcurrentMap<Object, Object> contextData() {
if (contextData == null) {
contextData = new ConcurrentHashMap<>();
if (data == null) {
data = new ConcurrentHashMap<>();
}
return contextData;
return data;
}
@Override
public synchronized ConcurrentMap<Object, Object> localContextData() {
if (localData == null) {
localData = new ConcurrentHashMap<>();
}
return localData;
}
public void reportException(Throwable t) {
Handler<Throwable> handler = this.exceptionHandler;
Handler<Throwable> handler = exceptionHandler;
if (handler == null) {
handler = owner.exceptionHandler();
}
@@ -340,4 +247,120 @@ abstract class ContextImpl implements ContextInternal {
}
return deployment.deploymentOptions().getInstances();
}
static abstract class Duplicated<C extends ContextImpl> extends AbstractContext {
protected final C delegate;
private final ContextInternal other;
private ConcurrentMap<Object, Object> localData;
public Duplicated(C delegate, ContextInternal other) {
this.delegate = delegate;
this.other = other;
}
@Override
public VertxTracer tracer() {
return delegate.tracer();
}
public final <T> void executeBlockingInternal(Handler<Future<T>> action, Handler<AsyncResult<T>> resultHandler) {
ContextImpl.executeBlocking(this, action, resultHandler, delegate.internalBlockingPool, delegate.internalOrderedTasks);
}
@Override
public final <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
ContextImpl.executeBlocking(this, blockingCodeHandler, resultHandler, delegate.workerPool, ordered ? delegate.orderedTasks : null);
}
@Override
public final <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler) {
ContextImpl.executeBlocking(this, blockingCodeHandler, resultHandler, delegate.workerPool, queue);
}
@Override
public final <T> void schedule(T value, Handler<T> task) {
delegate.schedule(value, task);
}
@Override
public final String deploymentID() {
return delegate.deploymentID();
}
@Override
public final JsonObject config() {
return delegate.config();
}
@Override
public final int getInstanceCount() {
return delegate.getInstanceCount();
}
@Override
public final Context exceptionHandler(Handler<Throwable> handler) {
delegate.exceptionHandler(handler);
return this;
}
@Override
public final Handler<Throwable> exceptionHandler() {
return delegate.exceptionHandler();
}
@Override
public final void addCloseHook(Closeable hook) {
delegate.addCloseHook(hook);
}
@Override
public final void removeCloseHook(Closeable hook) {
delegate.removeCloseHook(hook);
}
@Override
public final EventLoop nettyEventLoop() {
return delegate.nettyEventLoop();
}
@Override
public final Deployment getDeployment() {
return delegate.getDeployment();
}
@Override
public final VertxInternal owner() {
return delegate.owner();
}
@Override
public final ClassLoader classLoader() {
return delegate.classLoader();
}
@Override
public final void reportException(Throwable t) {
delegate.reportException(t);
}
@Override
public final ConcurrentMap<Object, Object> contextData() {
return delegate.contextData();
}
@Override
public final ConcurrentMap<Object, Object> localContextData() {
if (other == null) {
synchronized (this) {
if (localData == null) {
localData = new ConcurrentHashMap<>();
}
return localData;
}
} else {
return other.localContextData();
}
}
}
}

View File

@@ -17,6 +17,7 @@ import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.concurrent.ConcurrentMap;
@@ -69,10 +70,10 @@ public interface ContextInternal extends Context {
}
}
static void setContext(ContextInternal context) {
static ContextInternal setContext(ContextInternal context) {
Thread current = Thread.currentThread();
if (current instanceof VertxThread) {
((VertxThread)current).setContext(context);
return ((VertxThread)current).setContext(context);
} else {
throw new IllegalStateException("Attempt to setContext on non Vert.x thread " + Thread.currentThread());
}
@@ -185,9 +186,43 @@ public interface ContextInternal extends Context {
*/
ConcurrentMap<Object, Object> contextData();
/**
* @return the {@link ConcurrentMap} used to store local context data
*/
ConcurrentMap<Object, Object> localContextData();
/**
* @return the classloader associated with this context
*/
ClassLoader classLoader();
/**
* @return the tracer for this context
*/
VertxTracer tracer();
/**
* Returns a context which shares the whole behavior of this context but not the {@link #localContextData()} which
* remains private to the context:
* <ul>
* <li>same concurrency</li>
* <li>same exception handler</li>
* <li>same context context</li>
* <li>same deployment</li>
* <li>same config</li>
* <li>same classloader</li>
* </ul>
* <p>
* The duplicated context will have its own private local context data.
*
* @return a context whose behavior will is equivalent to this context but with new private
*/
ContextInternal duplicate();
/**
* Like {@link #duplicate()} but the duplicated context local data will adopt the local data of the specified
* {@code context} argument.
*/
ContextInternal duplicate(ContextInternal context);
}

View File

@@ -15,6 +15,9 @@ import io.netty.channel.EventLoop;
import io.vertx.core.Handler;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.concurrent.ConcurrentMap;
/**
* @author <a href="http://tfox.org">Tim Fox</a>
@@ -23,14 +26,14 @@ public class EventLoopContext extends ContextImpl {
private static final Logger log = LoggerFactory.getLogger(EventLoopContext.class);
EventLoopContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment,
ClassLoader tccl) {
super(vertx, internalBlockingPool, workerPool, deployment, tccl);
EventLoopContext(VertxInternal vertx, VertxTracer<?, ?> tracer, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment,
ClassLoader tccl) {
super(vertx, tracer, internalBlockingPool, workerPool, deployment, tccl);
}
public EventLoopContext(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment,
public EventLoopContext(VertxInternal vertx, VertxTracer<?, ?> tracer, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment,
ClassLoader tccl) {
super(vertx, eventLoop, internalBlockingPool, workerPool, deployment, tccl);
super(vertx, tracer, eventLoop, internalBlockingPool, workerPool, deployment, tccl);
}
void executeAsync(Handler<Void> task) {
@@ -52,4 +55,34 @@ public class EventLoopContext extends ContextImpl {
return true;
}
@Override
public ContextInternal duplicate(ContextInternal in) {
return new Duplicated(this, in);
}
static class Duplicated extends ContextImpl.Duplicated<EventLoopContext> {
Duplicated(EventLoopContext delegate, ContextInternal other) {
super(delegate, other);
}
void executeAsync(Handler<Void> task) {
nettyEventLoop().execute(() -> dispatch(null, task));
}
@Override
<T> void execute(T value, Handler<T> task) {
dispatch(value, task);
}
@Override
public boolean isEventLoopContext() {
return true;
}
@Override
public ContextInternal duplicate(ContextInternal context) {
return new Duplicated(delegate, context);
}
}
}

View File

@@ -39,7 +39,6 @@ import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.impl.HttpClientImpl;
import io.vertx.core.http.impl.HttpServerImpl;
import io.vertx.core.impl.resolver.DnsResolverProvider;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
@@ -52,6 +51,7 @@ import io.vertx.core.net.impl.ServerID;
import io.vertx.core.net.impl.transport.Transport;
import io.vertx.core.shareddata.SharedData;
import io.vertx.core.shareddata.impl.SharedDataImpl;
import io.vertx.core.spi.VertxTracerFactory;
import io.vertx.core.spi.VerticleFactory;
import io.vertx.core.spi.VertxMetricsFactory;
import io.vertx.core.spi.cluster.ClusterManager;
@@ -59,6 +59,7 @@ import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.core.spi.tracing.VertxTracer;
import java.io.File;
import java.io.IOException;
@@ -134,6 +135,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private final TimeUnit defaultWorkerMaxExecTimeUnit;
private final CloseHooks closeHooks;
private final Transport transport;
final VertxTracer tracer;
private VertxImpl(VertxOptions options, Transport transport) {
// Sanity check
@@ -169,6 +171,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
this.addressResolverOptions = options.getAddressResolverOptions();
this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions());
this.deploymentManager = new DeploymentManager(this);
this.tracer = initializeTracer(options);
if (options.isClustered()) {
this.clusterManager = getClusterManager(options);
this.eventBus = new ClusteredEventBus(this, options, clusterManager);
@@ -394,7 +397,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
@Override
public EventLoopContext createEventLoopContext(Deployment deployment, WorkerPool workerPool, ClassLoader tccl) {
return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deployment, tccl);
return new EventLoopContext(this, tracer, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deployment, tccl);
}
@Override
@@ -402,7 +405,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
if (workerPool == null) {
workerPool = this.workerPool;
}
return new WorkerContext(this, internalBlockingPool, workerPool, deployment, tccl);
return new WorkerContext(this, tracer, internalBlockingPool, workerPool, deployment, tccl);
}
@Override
@@ -448,6 +451,24 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
return null;
}
private VertxTracer initializeTracer(VertxOptions options) {
if (options.getTracingOptions() != null && options.getTracingOptions().isEnabled()) {
VertxTracerFactory factory = options.getTracingOptions().getFactory();
if (factory == null) {
factory = ServiceHelper.loadFactoryOrNull(VertxTracerFactory.class);
if (factory == null) {
log.warn("Metrics has been set to enabled but no TracerFactory found on classpath");
}
}
if (factory != null) {
VertxTracer tracer = factory.tracer(options.getTracingOptions());
Objects.requireNonNull(tracer, "The tracer instance created from " + factory + " cannot be null");
return tracer;
}
}
return null;
}
private ClusterManager getClusterManager(VertxOptions options) {
ClusterManager mgr = options.getClusterManager();
if (mgr == null) {
@@ -839,6 +860,9 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
if (metrics != null) {
metrics.close();
}
if (tracer != null) {
tracer.close();
}
checker.close();

View File

@@ -16,7 +16,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import io.vertx.core.*;
import io.vertx.core.http.impl.HttpServerImpl;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.impl.NetServerImpl;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.net.impl.transport.Transport;
@@ -128,4 +127,5 @@ public interface VertxInternal extends Vertx {
void addCloseHook(Closeable hook);
void removeCloseHook(Closeable hook);
}

View File

@@ -29,7 +29,7 @@ public final class VertxThread extends FastThreadLocalThread {
private long execStart;
private ContextInternal context;
VertxThread(Runnable target, String name, boolean worker, long maxExecTime, TimeUnit maxExecTimeUnit) {
public VertxThread(Runnable target, String name, boolean worker, long maxExecTime, TimeUnit maxExecTimeUnit) {
super(target, name);
this.worker = worker;
this.maxExecTime = maxExecTime;

View File

@@ -13,15 +13,16 @@ package io.vertx.core.impl;
import io.vertx.core.Handler;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.tracing.VertxTracer;
/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
class WorkerContext extends ContextImpl {
WorkerContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment,
ClassLoader tccl) {
super(vertx, internalBlockingPool, workerPool, deployment, tccl);
WorkerContext(VertxInternal vertx, VertxTracer<?, ?> tracer, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment,
ClassLoader tccl) {
super(vertx, tracer, internalBlockingPool, workerPool, deployment, tccl);
}
@Override
@@ -38,6 +39,10 @@ class WorkerContext extends ContextImpl {
// so we need to execute it on the worker thread
@Override
<T> void execute(T value, Handler<T> task) {
execute(this, value ,task);
}
private <T> void execute(ContextInternal ctx, T value, Handler<T> task) {
PoolMetrics metrics = workerPool.metrics();
Object metric = metrics != null ? metrics.submitted() : null;
orderedTasks.execute(() -> {
@@ -45,7 +50,7 @@ class WorkerContext extends ContextImpl {
metrics.begin(metric);
}
try {
dispatch(value, task);
ctx.dispatch(value, task);
} finally {
if (metrics != null) {
metrics.end(metric, true);
@@ -71,4 +76,34 @@ class WorkerContext extends ContextImpl {
}
}, workerPool.executor());
}
public ContextInternal duplicate(ContextInternal in) {
return new Duplicated(this, in);
}
static class Duplicated extends ContextImpl.Duplicated<WorkerContext> {
Duplicated(WorkerContext delegate, ContextInternal other) {
super(delegate, other);
}
void executeAsync(Handler<Void> task) {
execute(null, task);
}
@Override
<T> void execute(T value, Handler<T> task) {
delegate.execute(this, value, task);
}
@Override
public boolean isEventLoopContext() {
return false;
}
@Override
public ContextInternal duplicate(ContextInternal context) {
return new Duplicated(delegate, context);
}
}
}

View File

@@ -56,7 +56,7 @@ class WorkerExecutorImpl implements MetricsProvider, WorkerExecutorInternal {
throw new IllegalStateException("Worker executor closed");
}
ContextImpl context = (ContextImpl) ctx.owner().getOrCreateContext();
context.executeBlocking(blockingCodeHandler, asyncResultHandler, pool, ordered ? context.orderedTasks : null);
ContextImpl.executeBlocking(context, blockingCodeHandler, asyncResultHandler, pool, ordered ? context.orderedTasks : null);
}
@Override

View File

@@ -0,0 +1,57 @@
/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.spi;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingOptions;
/**
* A factory for the plug-able tracing SPI.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public interface VertxTracerFactory {
/**
* Create a new {@link VertxTracer} object.<p/>
*
* No specific thread and context can be expected when this method is called.
*
* @param options the metrics configuration option
* @return the tracing implementation
*/
VertxTracer tracer(TracingOptions options);
/**
* Create an empty tracing options.
* Providers can override this method to provide a custom tracing options subclass that exposes custom configuration.
* It is used by the {@link io.vertx.core.Launcher} class when creating new options when building a CLI Vert.x.
*
* @return new tracing options
*/
default TracingOptions newOptions() {
return new TracingOptions();
}
/**
* Create tracing options from the provided {@code jsonObject}.
* Providers can override this method to provide a custom tracing options subclass that exposes custom configuration.
* It is used by the {@link io.vertx.core.Launcher} class when creating new options when building a CLI Vert.x.
*
* @param jsonObject json provided by the user
* @return new tracing options
*/
default TracingOptions newOptions(JsonObject jsonObject) {
return new TracingOptions(jsonObject);
}
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.spi.tracing;
import io.vertx.core.Context;
import java.util.Map;
import java.util.function.BiConsumer;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public interface VertxTracer<I, O> {
/**
* Signal a request has been received and will be processed.
*
* @param context the context data attached to the request
* @param request the request object
* @param operation the request operation
* @param headers a read-only view of the request headers
* @param tags the request tags
* @return the request trace
*/
default I receiveRequest(Context context, Object request, String operation, Iterable<Map.Entry<String, String>> headers, Iterable<Map.Entry<String, String>> tags) {
return null;
}
/**
* Signal the response is sent.
*
* @param context the context data attached to the request
* @param response the response sent
* @param payload the payload returned by {@link #receiveRequest}
* @param failure the failure when not {@code null}
* @param tags the response tags
*/
default void sendResponse(Context context, Object response, I payload, Throwable failure, Iterable<Map.Entry<String, String>> tags) {
}
/**
* Signal a request is sent.
*
* @param context the context data attached to the request
* @param request the request object
* @param operation the request operation
* @param headers a write only-view of the request headers
* @param tags the request tags
* @return the request trace
*/
default O sendRequest(Context context, Object request, String operation, BiConsumer<String, String> headers, Iterable<Map.Entry<String, String>> tags) {
return null;
}
/**
* Signal a response has been received.
*
* @param context the context data attached to the request
* @param response the response sent
* @param payload the payload returned by {@link #sendRequest}
* @param failure the failure when not {@code null}
* @param tags the response tags
*/
default void receiveResponse(Context context, Object response, O payload, Throwable failure, Iterable<Map.Entry<String, String>> tags) {
}
/**
* Close the tracer.
*/
default void close() {
}
}

View File

@@ -0,0 +1,129 @@
/*
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.tracing;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.VertxTracerFactory;
/**
* Vert.x tracing base configuration, this class can be extended by provider implementations to configure
* those specific implementations.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
@DataObject(generateConverter = true, publicConverter = false)
public class TracingOptions {
/**
* The default value of tracing enabled false
*/
public static final boolean DEFAULT_TRACING_ENABLED = false;
private boolean enabled;
private JsonObject json; // Keep a copy of the original json, so we don't lose info when building options subclasses
private VertxTracerFactory factory;
/**
* Default constructor
*/
public TracingOptions() {
enabled = DEFAULT_TRACING_ENABLED;
}
/**
* Copy constructor
*
* @param other The other {@link TracingOptions} to copy when creating this
*/
public TracingOptions(TracingOptions other) {
enabled = other.isEnabled();
factory = other.factory;
}
/**
* Create an instance from a {@link JsonObject}
*
* @param json the JsonObject to create it from
*/
public TracingOptions(JsonObject json) {
this();
TracingOptionsConverter.fromJson(json, this);
this.json = json.copy();
}
/**
* Will tracing be enabled on the Vert.x instance?
*
* @return true if enabled, false if not.
*/
public boolean isEnabled() {
return enabled;
}
/**
* Set whether tracing will be enabled on the Vert.x instance.
*
* @param enable true if tracing enabled, or false if not.
* @return a reference to this, so the API can be used fluently
*/
public TracingOptions setEnabled(boolean enable) {
this.enabled = enable;
return this;
}
/**
* Get the tracer factory to be used when tracing are enabled.
* <p>
* If the tracer factory has been programmatically set here, then that will be used when tracing are enabled
* for creating the {@link io.vertx.core.spi.tracing.VertxTracer} instance.
* <p>
* Otherwise Vert.x attempts to locate a tracer factory implementation on the classpath.
*
* @return the tracer factory
*/
public VertxTracerFactory getFactory() {
return factory;
}
/**
* Programmatically set the tracer factory to be used when tracing are enabled.
* <p>
* Only valid if {@link TracingOptions#isEnabled} = true.
* <p>
* Normally Vert.x will look on the classpath for a tracer factory implementation, but if you want to set one
* programmatically you can use this method.
*
* @param factory the tracer factory
* @return a reference to this, so the API can be used fluently
*/
public TracingOptions setFactory(VertxTracerFactory factory) {
this.factory = factory;
return this;
}
public TracingOptions copy() {
return new TracingOptions(this);
}
public JsonObject toJson() {
return json != null ? json.copy() : new JsonObject();
}
@Override
public String toString() {
return "TracingOptions{" +
"enabled=" + enabled +
", json=" + json +
'}';
}
}

View File

@@ -222,7 +222,7 @@ public class HttpServerHandlerBenchmark extends BenchmarkBase {
ContextInternal context = new EventLoopContext(vertx, vertxChannel.eventLoop(), null, null, null, Thread.currentThread().getContextClassLoader());
ContextInternal context = new EventLoopContext(vertx, null, vertxChannel.eventLoop(), null, null, null, Thread.currentThread().getContextClassLoader());
Handler<HttpServerRequest> app = request -> {
HttpServerResponse response = request.response();
MultiMap headers = response.headers();

View File

@@ -14,6 +14,8 @@ package io.vertx.core.impl;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.concurrent.ConcurrentMap;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
@@ -25,7 +27,7 @@ public class BenchmarkContext extends ContextImpl {
}
public BenchmarkContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) {
super(vertx, internalBlockingPool, workerPool, deployment, tccl);
super(vertx, null, internalBlockingPool, workerPool, deployment, tccl);
}
@Override
@@ -47,4 +49,9 @@ public class BenchmarkContext extends ContextImpl {
public boolean isEventLoopContext() {
return false;
}
@Override
public ContextInternal duplicate(ContextInternal in) {
throw new UnsupportedOperationException();
}
}

View File

@@ -14,6 +14,7 @@ package io.vertx.core;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.TaskQueue;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.VertxThread;
import io.vertx.core.impl.WorkerPool;
import io.vertx.test.core.VertxTestBase;
import org.junit.Test;
@@ -28,6 +29,24 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ContextTest extends VertxTestBase {
private ExecutorService workerExecutor;
private ContextInternal createWorkerContext() {
return ((VertxInternal) vertx).createWorkerContext(null, new WorkerPool(workerExecutor, null), Thread.currentThread().getContextClassLoader());
}
@Override
public void setUp() throws Exception {
workerExecutor = Executors.newSingleThreadExecutor(r -> new VertxThread(r, "vert.x-worker-thread", true, 10, TimeUnit.SECONDS));
super.setUp();
}
@Override
protected void tearDown() throws Exception {
workerExecutor.shutdown();
super.tearDown();
}
@Test
public void testRunOnContext() throws Exception {
vertx.runOnContext(v -> {
@@ -189,22 +208,13 @@ public class ContextTest extends VertxTestBase {
}
@Test
public void testWorkerExecuteFromIo() throws Exception {
AtomicReference<ContextInternal> workerContext = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws Exception {
workerContext.set((ContextInternal) context);
latch.countDown();
}
}, new DeploymentOptions().setWorker(true));
awaitLatch(latch);
workerContext.get().nettyEventLoop().execute(() -> {
public void testWorkerExecuteFromIo() {
ContextInternal workerContext = createWorkerContext();
workerContext.nettyEventLoop().execute(() -> {
assertNull(Vertx.currentContext());
workerContext.get().nettyEventLoop().execute(() -> {
workerContext.get().executeFromIO(v -> {
assertSame(workerContext.get(), Vertx.currentContext());
workerContext.nettyEventLoop().execute(() -> {
workerContext.executeFromIO(v -> {
assertSame(workerContext, Vertx.currentContext());
assertTrue(Context.isOnWorkerThread());
testComplete();
});
@@ -409,8 +419,7 @@ public class ContextTest extends VertxTestBase {
@Test
public void testExecuteFromIOWorkerFromNonVertxThread() {
assertEquals("true", System.getProperty("vertx.threadChecks"));
ExecutorService a = Executors.newSingleThreadExecutor();
ContextInternal ctx = ((VertxInternal) vertx).createWorkerContext(null, new WorkerPool(a, null), Thread.currentThread().getContextClassLoader());
ContextInternal ctx = createWorkerContext();
AtomicBoolean called = new AtomicBoolean();
try {
ctx.executeFromIO(v -> {
@@ -432,4 +441,100 @@ public class ContextTest extends VertxTestBase {
ctx.reportException(expected);
assertSame(expected, err.get());
}
@Test
public void testDuplicate() throws Exception {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
ContextInternal duplicate = ctx.duplicate();
checkDuplicate(ctx, duplicate);
}
@Test
public void testDuplicateWorker() throws Exception {
ContextInternal ctx = createWorkerContext();
ContextInternal duplicate = ctx.duplicate();
checkDuplicate(ctx, duplicate);
}
@Test
public void testDuplicateTwice() throws Exception {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
ContextInternal duplicated = ctx.duplicate().duplicate();
checkDuplicate(ctx, duplicated);
}
@Test
public void testDuplicateWith() throws Exception {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
ContextInternal other = (ContextInternal) vertx.getOrCreateContext();
ContextInternal duplicated = ctx.duplicate(other);
checkDuplicate(ctx, duplicated);
checkDuplicateWith(other, duplicated);
}
private void checkDuplicate(ContextInternal ctx, ContextInternal duplicated) throws Exception {
assertSame(ctx.nettyEventLoop(), duplicated.nettyEventLoop());
assertSame(ctx.getDeployment(), duplicated.getDeployment());
assertSame(ctx.classLoader(), duplicated.classLoader());
assertSame(ctx.owner(), duplicated.owner());
Object shared = new Object();
Object local = new Object();
ctx.put("key", shared);
ctx.putLocal("key", local);
assertSame(shared, duplicated.get("key"));
assertNull(duplicated.getLocal("key"));
assertTrue(duplicated.remove("key"));
assertNull(ctx.get("key"));
CountDownLatch latch1 = new CountDownLatch(1);
duplicated.runOnContext(v -> {
assertSame(Vertx.currentContext(), duplicated);
latch1.countDown();
});
awaitLatch(latch1);
CountDownLatch latch2 = new CountDownLatch(1);
Throwable failure = new Throwable();
ctx.exceptionHandler(err -> {
assertSame(failure, err);
latch2.countDown();
});
duplicated.reportException(failure);
awaitLatch(latch2);
CountDownLatch latch3 = new CountDownLatch(1);
duplicated.runOnContext(v -> {
vertx.setTimer(10, id -> {
assertSame(duplicated, Vertx.currentContext());
latch3.countDown();
});
});
awaitLatch(latch3);
CountDownLatch latch4 = new CountDownLatch(1);
duplicated.runOnContext(v -> {
vertx.executeBlocking(Future::complete, res -> {
assertSame(duplicated, Vertx.currentContext());
latch4.countDown();
});
});
awaitLatch(latch4);
}
@Test
public void testDuplicateWithTwice() throws Exception {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
ContextInternal other = (ContextInternal) vertx.getOrCreateContext();
ContextInternal duplicated = ctx.duplicate().duplicate(other);
checkDuplicate(ctx, duplicated);
checkDuplicateWith(other, duplicated);
}
private void checkDuplicateWith(ContextInternal ctx, ContextInternal duplicated) {
Object val = new Object();
ctx.putLocal("key", val);
assertSame(val, duplicated.getLocal("key"));
duplicated.removeLocal("key");
assertNull(ctx.getLocal("key"));
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.spi.tracing;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.faketracer.FakeTracer;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
public class TracerTest extends VertxTestBase {
private FakeTracer tracer = new FakeTracer();
@Override
protected VertxTracer getTracer() {
return tracer;
}
@Test
public void testClose() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
assertEquals(0, tracer.closeCount());
vertx.close(ar -> latch.countDown());
awaitLatch(latch);
assertEquals(1, tracer.closeCount());
}
}

View File

@@ -26,11 +26,12 @@ import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.TCPSSLOptions;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingOptions;
import io.vertx.test.fakecluster.FakeClusterManager;
import org.junit.Rule;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
@@ -74,9 +75,17 @@ public class VertxTestBase extends AsyncTestBase {
}
}
protected VertxTracer getTracer() {
return null;
}
protected VertxOptions getOptions() {
VertxOptions options = new VertxOptions();
options.setPreferNativeTransport(USE_NATIVE_TRANSPORT);
VertxTracer tracer = getTracer();
if (tracer != null) {
options.setTracingOptions(new TracingOptions().setEnabled(true).setFactory(opts -> tracer));
}
return options;
}
@@ -243,7 +252,7 @@ public class VertxTestBase extends AsyncTestBase {
for (StackTraceElement elt : Thread.currentThread().getStackTrace()) {
String className = elt.getClassName();
String methodName = elt.getMethodName();
if (className.equals("io.vertx.core.impl.ContextImpl") && methodName.equals("dispatch")) {
if (className.equals("io.vertx.core.impl.AbstractContext") && methodName.equals("dispatch")) {
return;
}
}

View File

@@ -0,0 +1,164 @@
/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.test.faketracer;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
/**
* @author Pavol Loffay
*/
public class FakeTracer implements VertxTracer<Span, Span> {
private static final String ACTIVE_SCOPE_KEY = "active.scope";
private AtomicInteger idGenerator = new AtomicInteger(0);
List<Span> finishedSpans = new CopyOnWriteArrayList<>();
private AtomicInteger closeCount = new AtomicInteger();
int nextId() {
return idGenerator.getAndIncrement();
}
public Span newTrace(String operation) {
return new Span(this, nextId(), nextId(), nextId(), operation);
}
public Span newTrace() {
return new Span(this, nextId(), nextId(), nextId(), null);
}
public Span activeSpan() {
return activeSpan(Vertx.currentContext());
}
public Span activeSpan(Context data) {
Scope scope = data.getLocal(ACTIVE_SCOPE_KEY);
return scope != null ? scope.wrapped : null;
}
public Scope activate(Span span) {
return activate(Vertx.currentContext(), span);
}
public Scope activate(Context context, Span span) {
Scope toRestore = context.getLocal(ACTIVE_SCOPE_KEY);
Scope active = new Scope(this, span, toRestore);
context.putLocal(ACTIVE_SCOPE_KEY, active);
return active;
}
public void encode(Span span, BiConsumer<String, String> headers) {
headers.accept("span-trace-id", "" + span.traceId);
headers.accept("span-parent-id", "" + span.parentId);
headers.accept("span-id", "" + span.id);
}
public Span decode(String operation, Iterable<Map.Entry<String, String>> headers) {
String traceId = null;
String spanId = null;
String spanParentId = null;
for (Map.Entry<String, String> header : headers) {
switch (header.getKey()) {
case "span-trace-id":
traceId = header.getValue();
break;
case "span-id":
spanId = header.getValue();
break;
case "span-parent-id":
spanParentId = header.getValue();
break;
}
}
if (traceId != null && spanId != null && spanParentId != null) {
return new Span(this, Integer.parseInt(traceId), Integer.parseInt(spanParentId),
Integer.parseInt(spanId), operation);
}
return null;
}
private Span getServerSpan(String operation, Iterable<Map.Entry<String, String>> headers) {
Span parent = decode(operation, headers);
if (parent != null) {
return parent.createChild(operation);
} else {
return newTrace(operation);
}
}
@Override
public Span receiveRequest(Context context, Object inbound, String operation, Iterable<Map.Entry<String, String>> headers, Iterable<Map.Entry<String, String>> tags) {
Span serverSpan = getServerSpan(operation, headers);
serverSpan.addTag("span_kind", "server");
addTags(serverSpan, tags);
// Create scope
return activate(context, serverSpan).span();
}
@Override
public void sendResponse(Context context, Object response, Span span, Throwable failure, Iterable<Map.Entry<String, String>> tags) {
if (span != null) {
addTags(span, tags);
span.finish();
}
}
@Override
public Span sendRequest(Context context, Object outbound, String operation, BiConsumer<String, String> headers, Iterable<Map.Entry<String, String>> tags) {
Span span = activeSpan(context);
if (span == null) {
span = newTrace(operation);
} else {
span = span.createChild(operation);
}
span.addTag("span_kind", "client");
addTags(span, tags);
encode(span, headers);
return span;
}
@Override
public void receiveResponse(Context context, Object response, Span span, Throwable failure, Iterable<Map.Entry<String, String>> tags) {
if (span != null) {
addTags(span, tags);
span.finish();
}
}
private void addTags(Span span, Iterable<Map.Entry<String, String>> tags) {
for (Map.Entry<String, String> tag : tags) {
span.addTag(tag.getKey(), tag.getValue());
}
}
public List<Span> getFinishedSpans() {
return Collections.unmodifiableList(finishedSpans);
}
@Override
public void close() {
closeCount.incrementAndGet();
}
public int closeCount() {
return closeCount.get();
}
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.test.faketracer;
/**
* @author Pavol Loffay
*/
public class Scope {
Span wrapped;
private Scope toRestore;
private FakeTracer tracer;
public Scope(FakeTracer tracer, Span span, Scope toRestore) {
this.tracer = tracer;
this.wrapped = span;
this.toRestore = toRestore;
}
public Span span() {
return wrapped;
}
public void close() {
// tracer.currentSpan.set(toRestore);
}
}

View File

@@ -0,0 +1,74 @@
/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.test.faketracer;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Pavol Loffay
*/
public class Span {
public final int traceId;
public final int parentId;
public final int id;
public final String operation;
final FakeTracer tracer;
private AtomicBoolean finished = new AtomicBoolean();
private final Map<String, String> tags = new ConcurrentHashMap<>();
Span(FakeTracer tracer, int traceId, int parentId, int id, String operation) {
this.tracer = tracer;
this.traceId = traceId;
this.parentId = parentId;
this.id = id;
this.operation = operation;
}
public Map<String, String> getTags() {
return Collections.unmodifiableMap(tags);
}
public Span createChild() {
return createChild(null);
}
public Span createChild(String operation) {
return new Span(tracer, traceId, id, tracer.nextId(), operation);
}
public void addTag(String key, String value) {
if (value != null) {
tags.put(key, value);
}
}
public void finish() {
if (finished.getAndSet(true)) {
throw new IllegalStateException("Finishing already finished span!");
}
tracer.finishedSpans.add(this);
}
@Override
public boolean equals(Object obj) {
Span span = (Span) obj;
return span.traceId == traceId && span.parentId == parentId && span.id == id;
}
@Override
public String toString() {
return "Span[traceId=" + traceId + ",parentId=" + parentId + ",id=" + id + "]";
}
}