From f1fd42688347fac61fb39292b175cccfc0e4ec64 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Mon, 18 Nov 2019 15:10:40 +1100 Subject: [PATCH 1/2] Add the notion of a current vert.x request This allows the Vert.x routing context to be injected --- .../runtime/standalone/VertxHttpRequest.java | 80 +++++++++++++------ .../standalone/VertxRequestHandler.java | 50 ++++++++---- .../runtime/UndertowDeploymentRecorder.java | 11 +++ .../http/deployment/VertxHttpProcessor.java | 7 +- .../http/runtime/CurrentVertxRequest.java | 80 +++++++++++++++++++ 5 files changed, 186 insertions(+), 42 deletions(-) create mode 100644 extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java diff --git a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java index eee6889ec..1560a717b 100644 --- a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java +++ b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java @@ -5,7 +5,6 @@ import java.io.InputStream; import java.util.Collections; import java.util.Date; import java.util.Enumeration; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -26,7 +25,9 @@ import org.jboss.resteasy.spi.NotImplementedYetException; import org.jboss.resteasy.spi.ResteasyAsynchronousContext; import org.jboss.resteasy.spi.ResteasyAsynchronousResponse; +import io.quarkus.arc.ManagedContext; import io.vertx.core.Context; +import io.vertx.ext.web.RoutingContext; /** * Abstraction for an inbound http request on the server, or a response from a server to a client @@ -43,18 +44,21 @@ public final class VertxHttpRequest extends BaseHttpRequest { private String httpMethod; private LazyHostSupplier remoteHost; private InputStream inputStream; - private Map attributes; private VertxHttpResponse response; private VertxExecutionContext executionContext; + private final RoutingContext routingContext; private final Context context; + private final ManagedContext requestContext; + private final ManagedContext.ContextState requestContextState; public VertxHttpRequest(Context context, + RoutingContext routingContext, ResteasyHttpHeaders httpHeaders, ResteasyUriInfo uri, String httpMethod, LazyHostSupplier remoteHost, SynchronousDispatcher dispatcher, - VertxHttpResponse response) { + VertxHttpResponse response, ManagedContext requestContext) { super(uri); this.context = context; this.response = response; @@ -62,6 +66,9 @@ public final class VertxHttpRequest extends BaseHttpRequest { this.httpMethod = httpMethod; this.remoteHost = remoteHost; this.executionContext = new VertxExecutionContext(this, response, dispatcher); + this.requestContext = requestContext; + this.requestContextState = requestContext.getState(); + this.routingContext = routingContext; } @Override @@ -76,7 +83,7 @@ public final class VertxHttpRequest extends BaseHttpRequest { @Override public Enumeration getAttributeNames() { - final Map attributes = this.attributes; + final Map attributes = routingContext.data(); if (attributes == null) { return Collections.emptyEnumeration(); } else { @@ -104,22 +111,17 @@ public final class VertxHttpRequest extends BaseHttpRequest { @Override public Object getAttribute(String attribute) { - return attributes != null ? attributes.get(attribute) : null; + return routingContext.get(attribute); } @Override public void setAttribute(String name, Object value) { - if (attributes == null) { - attributes = new HashMap(); - } - attributes.put(name, value); + routingContext.put(name, value); } @Override public void removeAttribute(String name) { - if (attributes != null) { - attributes.remove(name); - } + routingContext.remove(name); } @Override @@ -241,11 +243,12 @@ public final class VertxHttpRequest extends BaseHttpRequest { @Override public void complete() { synchronized (responseLock) { - if (done) - return; - if (cancelled) + if (done || cancelled) { return; + } done = true; + requestContext.activate(requestContextState); + requestContext.terminate(); vertxFlush(); } } @@ -258,7 +261,12 @@ public final class VertxHttpRequest extends BaseHttpRequest { if (cancelled) return false; done = true; - return internalResume(entity, t -> vertxFlush()); + requestContext.activate(requestContextState); + try { + return internalResume(entity, t -> vertxFlush()); + } finally { + requestContext.terminate(); + } } } @@ -270,7 +278,12 @@ public final class VertxHttpRequest extends BaseHttpRequest { if (cancelled) return false; done = true; - return internalResume(ex, t -> vertxFlush()); + requestContext.activate(requestContextState); + try { + return internalResume(ex, t -> vertxFlush()); + } finally { + requestContext.terminate(); + } } } @@ -285,7 +298,12 @@ public final class VertxHttpRequest extends BaseHttpRequest { } done = true; cancelled = true; - return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush()); + requestContext.activate(requestContextState); + try { + return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush()); + } finally { + requestContext.terminate(); + } } } @@ -298,10 +316,15 @@ public final class VertxHttpRequest extends BaseHttpRequest { return false; done = true; cancelled = true; - return internalResume( - Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter) - .build(), - t -> vertxFlush()); + requestContext.activate(requestContextState); + try { + return internalResume( + Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter) + .build(), + t -> vertxFlush()); + } finally { + requestContext.terminate(); + } } } @@ -322,10 +345,15 @@ public final class VertxHttpRequest extends BaseHttpRequest { return false; done = true; cancelled = true; - return internalResume( - Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter) - .build(), - t -> vertxFlush()); + requestContext.activate(requestContextState); + try { + return internalResume( + Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter) + .build(), + t -> vertxFlush()); + } finally { + requestContext.terminate(); + } } } diff --git a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java index 154150ba9..d57c8a456 100644 --- a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java +++ b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java @@ -18,6 +18,7 @@ import org.jboss.resteasy.spi.ResteasyDeployment; import io.quarkus.arc.ManagedContext; import io.quarkus.arc.runtime.BeanContainer; import io.quarkus.security.identity.CurrentIdentityAssociation; +import io.quarkus.vertx.http.runtime.CurrentVertxRequest; import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser; import io.vertx.core.Context; import io.vertx.core.Handler; @@ -38,6 +39,7 @@ public class VertxRequestHandler implements Handler { protected final BufferAllocator allocator; protected final BeanContainer beanContainer; protected final CurrentIdentityAssociation association; + protected final CurrentVertxRequest currentVertxRequest; public VertxRequestHandler(Vertx vertx, BeanContainer beanContainer, @@ -52,6 +54,7 @@ public class VertxRequestHandler implements Handler { this.allocator = allocator; Instance association = CDI.current().select(CurrentIdentityAssociation.class); this.association = association.isResolvable() ? association.get() : null; + currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get(); } @Override @@ -67,26 +70,22 @@ public class VertxRequestHandler implements Handler { } vertx.executeBlocking(event -> { - dispatchRequestContext(request, is, new VertxBlockingOutput(request.request())); + dispatch(request, is, new VertxBlockingOutput(request.request())); }, false, event -> { + if (event.failed()) { + request.fail(event.cause()); + } }); } - private void dispatchRequestContext(RoutingContext request, InputStream is, VertxOutput output) { + private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput output) { ManagedContext requestContext = beanContainer.requestContext(); requestContext.activate(); - QuarkusHttpUser user = (QuarkusHttpUser) request.user(); + QuarkusHttpUser user = (QuarkusHttpUser) routingContext.user(); if (user != null && association != null) { association.setIdentity(user.getSecurityIdentity()); } - try { - dispatch(request, is, output); - } finally { - requestContext.terminate(); - } - } - - private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput output) { + currentVertxRequest.setCurrent(routingContext); try { Context ctx = vertx.getOrCreateContext(); HttpServerRequest request = routingContext.request(); @@ -99,8 +98,9 @@ public class VertxRequestHandler implements Handler { // using a supplier to make the remote Address resolution lazy: often it's not needed and it's not very cheap to create. LazyHostSupplier hostSupplier = new LazyHostSupplier(request); - VertxHttpRequest vertxRequest = new VertxHttpRequest(ctx, headers, uriInfo, request.rawMethod(), hostSupplier, - dispatcher.getDispatcher(), vertxResponse); + VertxHttpRequest vertxRequest = new VertxHttpRequest(ctx, routingContext, headers, uriInfo, request.rawMethod(), + hostSupplier, + dispatcher.getDispatcher(), vertxResponse, requestContext); vertxRequest.setInputStream(is); try { ResteasyContext.pushContext(SecurityContext.class, new QuarkusResteasySecurityContext(request)); @@ -115,15 +115,35 @@ public class VertxRequestHandler implements Handler { routingContext.fail(ex); } - if (!vertxRequest.getAsyncContext().isSuspended()) { + boolean suspended = vertxRequest.getAsyncContext().isSuspended(); + boolean requestContextActive = requestContext.isActive(); + if (requestContextActive) { + //it is possible that there was an async response, that then finished in the same thread + //the async response will have terminated the request context in this case + currentVertxRequest.initialInvocationComplete(suspended); + } + if (!suspended) { try { vertxResponse.finish(); } catch (IOException e) { log.error("Unexpected failure", e); + } finally { + if (requestContextActive) { + requestContext.terminate(); + } } + } else { + //we need the request context to stick around + requestContext.deactivate(); } } catch (Throwable t) { - routingContext.fail(t); + try { + routingContext.fail(t); + } finally { + if (requestContext.isActive()) { + requestContext.terminate(); + } + } } } } diff --git a/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java b/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java index 594b8b118..1537a5b69 100644 --- a/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java +++ b/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java @@ -37,6 +37,7 @@ import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; import io.quarkus.runtime.configuration.MemorySize; +import io.quarkus.vertx.http.runtime.CurrentVertxRequest; import io.quarkus.vertx.http.runtime.HttpConfiguration; import io.undertow.httpcore.BufferAllocator; import io.undertow.httpcore.StatusCodes; @@ -448,6 +449,7 @@ public class UndertowDeploymentRecorder { return new ServletExtension() { @Override public void handleDeployment(DeploymentInfo deploymentInfo, ServletContext servletContext) { + CurrentVertxRequest currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get(); deploymentInfo.addThreadSetupAction(new ThreadSetupHandler() { @Override public ThreadSetupHandler.Action create(Action action) { @@ -458,6 +460,7 @@ public class UndertowDeploymentRecorder { if (exchange == null) { return action.call(exchange, context); } + boolean vertxFirst = false; ManagedContext requestContext = beanContainer.requestContext(); if (requestContext.isActive()) { return action.call(exchange, context); @@ -466,11 +469,19 @@ public class UndertowDeploymentRecorder { .getAttachment(REQUEST_CONTEXT); try { requestContext.activate(existingRequestContext); + vertxFirst = existingRequestContext == null; + + VertxHttpExchange delegate = (VertxHttpExchange) exchange.getDelegate(); + RoutingContext rc = (RoutingContext) delegate.getContext(); + currentVertxRequest.setCurrent(rc); return action.call(exchange, context); } finally { ServletRequestContext src = exchange .getAttachment(ServletRequestContext.ATTACHMENT_KEY); HttpServletRequestImpl req = src.getOriginalRequest(); + if (vertxFirst) { + currentVertxRequest.initialInvocationComplete(req.isAsyncStarted()); + } if (req.isAsyncStarted()) { exchange.putAttachment(REQUEST_CONTEXT, requestContext.getState()); requestContext.deactivate(); diff --git a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/VertxHttpProcessor.java b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/VertxHttpProcessor.java index 3171a927e..10529a8c8 100644 --- a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/VertxHttpProcessor.java +++ b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/VertxHttpProcessor.java @@ -27,6 +27,7 @@ import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.RuntimeValue; import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem; import io.quarkus.vertx.core.deployment.InternalWebVertxBuildItem; +import io.quarkus.vertx.http.runtime.CurrentVertxRequest; import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig; import io.quarkus.vertx.http.runtime.HttpConfiguration; import io.quarkus.vertx.http.runtime.RouterProducer; @@ -51,7 +52,11 @@ class VertxHttpProcessor { @BuildStep AdditionalBeanBuildItem additionalBeans() { - return AdditionalBeanBuildItem.unremovableOf(RouterProducer.class); + return AdditionalBeanBuildItem.builder() + .setUnremovable() + .addBeanClass(RouterProducer.class) + .addBeanClass(CurrentVertxRequest.class) + .build(); } /** diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java new file mode 100644 index 000000000..e7dd3617c --- /dev/null +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java @@ -0,0 +1,80 @@ +package io.quarkus.vertx.http.runtime; + +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.PreDestroy; +import javax.enterprise.context.RequestScoped; +import javax.enterprise.inject.Produces; + +import org.jboss.logging.Logger; + +import io.vertx.ext.web.RoutingContext; + +@RequestScoped +public class CurrentVertxRequest { + + private final Logger log = Logger.getLogger(CurrentVertxRequest.class); + + public RoutingContext current; + private List doneListeners; + + @Produces + @RequestScoped + public RoutingContext getCurrent() { + return current; + } + + public CurrentVertxRequest setCurrent(RoutingContext current) { + this.current = current; + return this; + } + + public void addRequestDoneListener(Listener doneListener) { + if (doneListeners == null) { + doneListeners = new ArrayList<>(); + } + doneListeners.add(doneListener); + } + + public void initialInvocationComplete(boolean goingAsync) { + + if (current == null) { + return; + } + if (doneListeners != null) { + for (Listener i : doneListeners) { + try { + i.initialInvocationComplete(current, goingAsync); + } catch (Throwable t) { + log.errorf(t, "Failed to process invocation listener %s", i); + } + } + } + } + + @PreDestroy + void done() { + if (current == null) { + return; + } + if (doneListeners != null) { + for (Listener i : doneListeners) { + try { + i.responseComplete(current); + } catch (Throwable t) { + log.errorf(t, "Failed to process done listener %s", i); + } + } + } + } + + public interface Listener { + + void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync); + + void responseComplete(RoutingContext routingContext); + + } + +} From 507ee79be7c1c4774ed39e61b07f0c2dc0b237d5 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Mon, 18 Nov 2019 15:11:10 +1100 Subject: [PATCH 2/2] Remove Undertow dependency from Opentracing Fixes #5419 --- .../smallrye-opentracing/deployment/pom.xml | 4 - .../SmallRyeOpenTracingProcessor.java | 29 ++++--- .../deployment/TracerRegistrar.java | 13 ++- .../opentracing/deployment/TracingTest.java | 31 +++++--- .../smallrye-opentracing/runtime/pom.xml | 2 +- ...eTracingStandaloneVertxDynamicFeature.java | 79 +++++++++++++++++++ 6 files changed, 127 insertions(+), 31 deletions(-) create mode 100644 extensions/smallrye-opentracing/runtime/src/main/java/io/quarkus/smallrye/opentracing/runtime/QuarkusSmallRyeTracingStandaloneVertxDynamicFeature.java diff --git a/extensions/smallrye-opentracing/deployment/pom.xml b/extensions/smallrye-opentracing/deployment/pom.xml index f96c0923a..704196b32 100644 --- a/extensions/smallrye-opentracing/deployment/pom.xml +++ b/extensions/smallrye-opentracing/deployment/pom.xml @@ -22,10 +22,6 @@ io.quarkus quarkus-resteasy-deployment - - io.quarkus - quarkus-undertow-deployment - io.quarkus quarkus-arc-deployment diff --git a/extensions/smallrye-opentracing/deployment/src/main/java/io/quarkus/smallrye/opentracing/deployment/SmallRyeOpenTracingProcessor.java b/extensions/smallrye-opentracing/deployment/src/main/java/io/quarkus/smallrye/opentracing/deployment/SmallRyeOpenTracingProcessor.java index 23ff2fd62..26306b4c2 100644 --- a/extensions/smallrye-opentracing/deployment/src/main/java/io/quarkus/smallrye/opentracing/deployment/SmallRyeOpenTracingProcessor.java +++ b/extensions/smallrye-opentracing/deployment/src/main/java/io/quarkus/smallrye/opentracing/deployment/SmallRyeOpenTracingProcessor.java @@ -8,12 +8,14 @@ import javax.servlet.DispatcherType; import io.opentracing.contrib.interceptors.OpenTracingInterceptor; import io.opentracing.contrib.jaxrs2.server.SpanFinishingFilter; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.deployment.Capabilities; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveMethodBuildItem; import io.quarkus.resteasy.common.spi.ResteasyJaxrsProviderBuildItem; import io.quarkus.smallrye.opentracing.runtime.QuarkusSmallRyeTracingDynamicFeature; +import io.quarkus.smallrye.opentracing.runtime.QuarkusSmallRyeTracingStandaloneVertxDynamicFeature; import io.quarkus.smallrye.opentracing.runtime.TracerProducer; import io.quarkus.undertow.deployment.FilterBuildItem; @@ -33,21 +35,28 @@ public class SmallRyeOpenTracingProcessor { @BuildStep void setupFilter(BuildProducer providers, BuildProducer filterProducer, - BuildProducer feature) { + BuildProducer feature, + Capabilities capabilities) { feature.produce(new FeatureBuildItem(FeatureBuildItem.SMALLRYE_OPENTRACING)); providers.produce(new ResteasyJaxrsProviderBuildItem(QuarkusSmallRyeTracingDynamicFeature.class.getName())); - FilterBuildItem filterInfo = FilterBuildItem.builder("tracingFilter", SpanFinishingFilter.class.getName()) - .setAsyncSupported(true) - .addFilterUrlMapping("*", DispatcherType.FORWARD) - .addFilterUrlMapping("*", DispatcherType.INCLUDE) - .addFilterUrlMapping("*", DispatcherType.REQUEST) - .addFilterUrlMapping("*", DispatcherType.ASYNC) - .addFilterUrlMapping("*", DispatcherType.ERROR) - .build(); - filterProducer.produce(filterInfo); + if (capabilities.isCapabilityPresent(Capabilities.SERVLET)) { + FilterBuildItem filterInfo = FilterBuildItem.builder("tracingFilter", SpanFinishingFilter.class.getName()) + .setAsyncSupported(true) + .addFilterUrlMapping("*", DispatcherType.FORWARD) + .addFilterUrlMapping("*", DispatcherType.INCLUDE) + .addFilterUrlMapping("*", DispatcherType.REQUEST) + .addFilterUrlMapping("*", DispatcherType.ASYNC) + .addFilterUrlMapping("*", DispatcherType.ERROR) + .build(); + filterProducer.produce(filterInfo); + } else { + //otherwise we know we have RESTeasy on vert.x + providers.produce( + new ResteasyJaxrsProviderBuildItem(QuarkusSmallRyeTracingStandaloneVertxDynamicFeature.class.getName())); + } } } diff --git a/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracerRegistrar.java b/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracerRegistrar.java index 3e86ea8b8..2b8705eec 100644 --- a/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracerRegistrar.java +++ b/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracerRegistrar.java @@ -1,16 +1,15 @@ package io.quarkus.smallrye.opentracing.deployment; -import javax.servlet.ServletContextEvent; -import javax.servlet.ServletContextListener; -import javax.servlet.annotation.WebListener; +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.event.Observes; import io.opentracing.util.GlobalTracer; +import io.quarkus.runtime.StartupEvent; -@WebListener -public class TracerRegistrar implements ServletContextListener { +@ApplicationScoped +public class TracerRegistrar { - @Override - public void contextInitialized(ServletContextEvent sce) { + public void start(@Observes StartupEvent start) { GlobalTracer.register(TracingTest.mockTracer); } } diff --git a/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracingTest.java b/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracingTest.java index 9ab1d26d7..1fb556310 100644 --- a/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracingTest.java +++ b/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracingTest.java @@ -1,7 +1,9 @@ package io.quarkus.smallrye.opentracing.deployment; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.awaitility.Awaitility; import org.jboss.shrinkwrap.api.ShrinkWrap; @@ -46,13 +48,15 @@ public class TracingTest { } @Test - public void testSingleServerRequest() { + public void testSingleServerRequest() throws InterruptedException { try { RestAssured.defaultParser = Parser.TEXT; RestAssured.when().get("/hello") .then() .statusCode(200); - Assertions.assertEquals(1, mockTracer.finishedSpans().size()); + //inherently racy, tracer is completed after response is sent back to the client + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> mockTracer.finishedSpans().size() == 1); Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello", mockTracer.finishedSpans().get(0).operationName()); } finally { @@ -61,12 +65,15 @@ public class TracingTest { } @Test - public void testCDI() { + public void testCDI() throws InterruptedException { try { RestAssured.defaultParser = Parser.TEXT; RestAssured.when().get("/cdi") .then() .statusCode(200); + //inherently racy, tracer is completed after response is sent back to the client + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> mockTracer.finishedSpans().size() == 2); Assertions.assertEquals(2, mockTracer.finishedSpans().size()); Assertions.assertEquals("io.quarkus.smallrye.opentracing.deployment.Service.foo", mockTracer.finishedSpans().get(0).operationName()); @@ -78,18 +85,24 @@ public class TracingTest { } @Test - public void testMPRestClient() { + public void testMPRestClient() throws InterruptedException { try { RestAssured.defaultParser = Parser.TEXT; RestAssured.when().get("/restClient") .then() .statusCode(200); + //inherently racy, tracer is completed after response is sent back to the client + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> mockTracer.finishedSpans().size() == 3); Assertions.assertEquals(3, mockTracer.finishedSpans().size()); - Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello", - mockTracer.finishedSpans().get(0).operationName()); - Assertions.assertEquals("GET", mockTracer.finishedSpans().get(1).operationName()); - Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.restClient", - mockTracer.finishedSpans().get(2).operationName()); + //these can come in any order, as the 'hello' span is finished after the request is sent back. + //this means the client might have already dealt with the response before the hello span is finished + //in practice this means that the spans might be in any order, as they are ordered by the time + //they are completed rather than the time they are started + Set results = mockTracer.finishedSpans().stream().map(MockSpan::operationName).collect(Collectors.toSet()); + Assertions.assertTrue(results.contains("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello")); + Assertions.assertTrue(results.contains("GET")); + Assertions.assertTrue(results.contains("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.restClient")); } finally { RestAssured.reset(); } diff --git a/extensions/smallrye-opentracing/runtime/pom.xml b/extensions/smallrye-opentracing/runtime/pom.xml index f68c44a41..3b4b099c8 100644 --- a/extensions/smallrye-opentracing/runtime/pom.xml +++ b/extensions/smallrye-opentracing/runtime/pom.xml @@ -46,7 +46,7 @@ io.quarkus - quarkus-undertow + quarkus-resteasy io.quarkus diff --git a/extensions/smallrye-opentracing/runtime/src/main/java/io/quarkus/smallrye/opentracing/runtime/QuarkusSmallRyeTracingStandaloneVertxDynamicFeature.java b/extensions/smallrye-opentracing/runtime/src/main/java/io/quarkus/smallrye/opentracing/runtime/QuarkusSmallRyeTracingStandaloneVertxDynamicFeature.java new file mode 100644 index 000000000..2ce0f0b01 --- /dev/null +++ b/extensions/smallrye-opentracing/runtime/src/main/java/io/quarkus/smallrye/opentracing/runtime/QuarkusSmallRyeTracingStandaloneVertxDynamicFeature.java @@ -0,0 +1,79 @@ +package io.quarkus.smallrye.opentracing.runtime; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import javax.enterprise.inject.spi.CDI; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.DynamicFeature; +import javax.ws.rs.container.ResourceInfo; +import javax.ws.rs.core.FeatureContext; +import javax.ws.rs.ext.Provider; + +import io.opentracing.Span; +import io.opentracing.contrib.jaxrs2.internal.SpanWrapper; +import io.opentracing.tag.Tags; +import io.quarkus.vertx.http.runtime.CurrentVertxRequest; +import io.vertx.ext.web.RoutingContext; + +@Provider +public class QuarkusSmallRyeTracingStandaloneVertxDynamicFeature implements DynamicFeature { + + @Override + public void configure(ResourceInfo resourceInfo, FeatureContext context) { + context.register(StandaloneFilter.class); + } + + public static class StandaloneFilter implements ContainerRequestFilter { + + volatile CurrentVertxRequest currentVertxRequest; + + CurrentVertxRequest request() { + if (currentVertxRequest == null) { + currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get(); + } + return currentVertxRequest; + } + + @Override + public void filter(ContainerRequestContext requestContext) throws IOException { + request().addRequestDoneListener(new CurrentVertxRequest.Listener() { + @Override + public void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync) { + SpanWrapper wrapper = routingContext.get(SpanWrapper.PROPERTY_NAME); + if (wrapper != null) { + wrapper.getScope().close(); + } + } + + @Override + public void responseComplete(RoutingContext routingContext) { + SpanWrapper wrapper = routingContext.get(SpanWrapper.PROPERTY_NAME); + if (wrapper == null) { + return; + } + + Tags.HTTP_STATUS.set(wrapper.get(), routingContext.response().getStatusCode()); + if (routingContext.failure() != null) { + addExceptionLogs(wrapper.get(), routingContext.failure()); + } + wrapper.finish(); + + } + }); + + } + + private static void addExceptionLogs(Span span, Throwable throwable) { + Tags.ERROR.set(span, true); + if (throwable != null) { + Map errorLogs = new HashMap<>(2); + errorLogs.put("event", Tags.ERROR.getKey()); + errorLogs.put("error.object", throwable); + span.log(errorLogs); + } + } + } +}