mirror of
https://github.com/jlengrand/quarkus.git
synced 2026-03-10 08:41:22 +00:00
Merge pull request #5505 from stuartwdouglas/5419
Opentracing without Servlet
This commit is contained in:
@@ -5,7 +5,6 @@ import java.io.InputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -26,7 +25,9 @@ import org.jboss.resteasy.spi.NotImplementedYetException;
|
||||
import org.jboss.resteasy.spi.ResteasyAsynchronousContext;
|
||||
import org.jboss.resteasy.spi.ResteasyAsynchronousResponse;
|
||||
|
||||
import io.quarkus.arc.ManagedContext;
|
||||
import io.vertx.core.Context;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
|
||||
/**
|
||||
* Abstraction for an inbound http request on the server, or a response from a server to a client
|
||||
@@ -43,18 +44,21 @@ public final class VertxHttpRequest extends BaseHttpRequest {
|
||||
private String httpMethod;
|
||||
private LazyHostSupplier remoteHost;
|
||||
private InputStream inputStream;
|
||||
private Map<String, Object> attributes;
|
||||
private VertxHttpResponse response;
|
||||
private VertxExecutionContext executionContext;
|
||||
private final RoutingContext routingContext;
|
||||
private final Context context;
|
||||
private final ManagedContext requestContext;
|
||||
private final ManagedContext.ContextState requestContextState;
|
||||
|
||||
public VertxHttpRequest(Context context,
|
||||
RoutingContext routingContext,
|
||||
ResteasyHttpHeaders httpHeaders,
|
||||
ResteasyUriInfo uri,
|
||||
String httpMethod,
|
||||
LazyHostSupplier remoteHost,
|
||||
SynchronousDispatcher dispatcher,
|
||||
VertxHttpResponse response) {
|
||||
VertxHttpResponse response, ManagedContext requestContext) {
|
||||
super(uri);
|
||||
this.context = context;
|
||||
this.response = response;
|
||||
@@ -62,6 +66,9 @@ public final class VertxHttpRequest extends BaseHttpRequest {
|
||||
this.httpMethod = httpMethod;
|
||||
this.remoteHost = remoteHost;
|
||||
this.executionContext = new VertxExecutionContext(this, response, dispatcher);
|
||||
this.requestContext = requestContext;
|
||||
this.requestContextState = requestContext.getState();
|
||||
this.routingContext = routingContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -76,7 +83,7 @@ public final class VertxHttpRequest extends BaseHttpRequest {
|
||||
|
||||
@Override
|
||||
public Enumeration<String> getAttributeNames() {
|
||||
final Map<String, Object> attributes = this.attributes;
|
||||
final Map<String, Object> attributes = routingContext.data();
|
||||
if (attributes == null) {
|
||||
return Collections.emptyEnumeration();
|
||||
} else {
|
||||
@@ -104,22 +111,17 @@ public final class VertxHttpRequest extends BaseHttpRequest {
|
||||
|
||||
@Override
|
||||
public Object getAttribute(String attribute) {
|
||||
return attributes != null ? attributes.get(attribute) : null;
|
||||
return routingContext.get(attribute);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAttribute(String name, Object value) {
|
||||
if (attributes == null) {
|
||||
attributes = new HashMap<String, Object>();
|
||||
}
|
||||
attributes.put(name, value);
|
||||
routingContext.put(name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAttribute(String name) {
|
||||
if (attributes != null) {
|
||||
attributes.remove(name);
|
||||
}
|
||||
routingContext.remove(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -241,11 +243,12 @@ public final class VertxHttpRequest extends BaseHttpRequest {
|
||||
@Override
|
||||
public void complete() {
|
||||
synchronized (responseLock) {
|
||||
if (done)
|
||||
return;
|
||||
if (cancelled)
|
||||
if (done || cancelled) {
|
||||
return;
|
||||
}
|
||||
done = true;
|
||||
requestContext.activate(requestContextState);
|
||||
requestContext.terminate();
|
||||
vertxFlush();
|
||||
}
|
||||
}
|
||||
@@ -258,7 +261,12 @@ public final class VertxHttpRequest extends BaseHttpRequest {
|
||||
if (cancelled)
|
||||
return false;
|
||||
done = true;
|
||||
return internalResume(entity, t -> vertxFlush());
|
||||
requestContext.activate(requestContextState);
|
||||
try {
|
||||
return internalResume(entity, t -> vertxFlush());
|
||||
} finally {
|
||||
requestContext.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -270,7 +278,12 @@ public final class VertxHttpRequest extends BaseHttpRequest {
|
||||
if (cancelled)
|
||||
return false;
|
||||
done = true;
|
||||
return internalResume(ex, t -> vertxFlush());
|
||||
requestContext.activate(requestContextState);
|
||||
try {
|
||||
return internalResume(ex, t -> vertxFlush());
|
||||
} finally {
|
||||
requestContext.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -285,7 +298,12 @@ public final class VertxHttpRequest extends BaseHttpRequest {
|
||||
}
|
||||
done = true;
|
||||
cancelled = true;
|
||||
return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush());
|
||||
requestContext.activate(requestContextState);
|
||||
try {
|
||||
return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush());
|
||||
} finally {
|
||||
requestContext.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -298,10 +316,15 @@ public final class VertxHttpRequest extends BaseHttpRequest {
|
||||
return false;
|
||||
done = true;
|
||||
cancelled = true;
|
||||
return internalResume(
|
||||
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
|
||||
.build(),
|
||||
t -> vertxFlush());
|
||||
requestContext.activate(requestContextState);
|
||||
try {
|
||||
return internalResume(
|
||||
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
|
||||
.build(),
|
||||
t -> vertxFlush());
|
||||
} finally {
|
||||
requestContext.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -322,10 +345,15 @@ public final class VertxHttpRequest extends BaseHttpRequest {
|
||||
return false;
|
||||
done = true;
|
||||
cancelled = true;
|
||||
return internalResume(
|
||||
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
|
||||
.build(),
|
||||
t -> vertxFlush());
|
||||
requestContext.activate(requestContextState);
|
||||
try {
|
||||
return internalResume(
|
||||
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
|
||||
.build(),
|
||||
t -> vertxFlush());
|
||||
} finally {
|
||||
requestContext.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import org.jboss.resteasy.spi.ResteasyDeployment;
|
||||
import io.quarkus.arc.ManagedContext;
|
||||
import io.quarkus.arc.runtime.BeanContainer;
|
||||
import io.quarkus.security.identity.CurrentIdentityAssociation;
|
||||
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
|
||||
import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser;
|
||||
import io.vertx.core.Context;
|
||||
import io.vertx.core.Handler;
|
||||
@@ -38,6 +39,7 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
|
||||
protected final BufferAllocator allocator;
|
||||
protected final BeanContainer beanContainer;
|
||||
protected final CurrentIdentityAssociation association;
|
||||
protected final CurrentVertxRequest currentVertxRequest;
|
||||
|
||||
public VertxRequestHandler(Vertx vertx,
|
||||
BeanContainer beanContainer,
|
||||
@@ -52,6 +54,7 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
|
||||
this.allocator = allocator;
|
||||
Instance<CurrentIdentityAssociation> association = CDI.current().select(CurrentIdentityAssociation.class);
|
||||
this.association = association.isResolvable() ? association.get() : null;
|
||||
currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -67,26 +70,22 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
|
||||
}
|
||||
|
||||
vertx.executeBlocking(event -> {
|
||||
dispatchRequestContext(request, is, new VertxBlockingOutput(request.request()));
|
||||
dispatch(request, is, new VertxBlockingOutput(request.request()));
|
||||
}, false, event -> {
|
||||
if (event.failed()) {
|
||||
request.fail(event.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void dispatchRequestContext(RoutingContext request, InputStream is, VertxOutput output) {
|
||||
private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput output) {
|
||||
ManagedContext requestContext = beanContainer.requestContext();
|
||||
requestContext.activate();
|
||||
QuarkusHttpUser user = (QuarkusHttpUser) request.user();
|
||||
QuarkusHttpUser user = (QuarkusHttpUser) routingContext.user();
|
||||
if (user != null && association != null) {
|
||||
association.setIdentity(user.getSecurityIdentity());
|
||||
}
|
||||
try {
|
||||
dispatch(request, is, output);
|
||||
} finally {
|
||||
requestContext.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput output) {
|
||||
currentVertxRequest.setCurrent(routingContext);
|
||||
try {
|
||||
Context ctx = vertx.getOrCreateContext();
|
||||
HttpServerRequest request = routingContext.request();
|
||||
@@ -99,8 +98,9 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
|
||||
// using a supplier to make the remote Address resolution lazy: often it's not needed and it's not very cheap to create.
|
||||
LazyHostSupplier hostSupplier = new LazyHostSupplier(request);
|
||||
|
||||
VertxHttpRequest vertxRequest = new VertxHttpRequest(ctx, headers, uriInfo, request.rawMethod(), hostSupplier,
|
||||
dispatcher.getDispatcher(), vertxResponse);
|
||||
VertxHttpRequest vertxRequest = new VertxHttpRequest(ctx, routingContext, headers, uriInfo, request.rawMethod(),
|
||||
hostSupplier,
|
||||
dispatcher.getDispatcher(), vertxResponse, requestContext);
|
||||
vertxRequest.setInputStream(is);
|
||||
try {
|
||||
ResteasyContext.pushContext(SecurityContext.class, new QuarkusResteasySecurityContext(request));
|
||||
@@ -115,15 +115,35 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
|
||||
routingContext.fail(ex);
|
||||
}
|
||||
|
||||
if (!vertxRequest.getAsyncContext().isSuspended()) {
|
||||
boolean suspended = vertxRequest.getAsyncContext().isSuspended();
|
||||
boolean requestContextActive = requestContext.isActive();
|
||||
if (requestContextActive) {
|
||||
//it is possible that there was an async response, that then finished in the same thread
|
||||
//the async response will have terminated the request context in this case
|
||||
currentVertxRequest.initialInvocationComplete(suspended);
|
||||
}
|
||||
if (!suspended) {
|
||||
try {
|
||||
vertxResponse.finish();
|
||||
} catch (IOException e) {
|
||||
log.error("Unexpected failure", e);
|
||||
} finally {
|
||||
if (requestContextActive) {
|
||||
requestContext.terminate();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
//we need the request context to stick around
|
||||
requestContext.deactivate();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
routingContext.fail(t);
|
||||
try {
|
||||
routingContext.fail(t);
|
||||
} finally {
|
||||
if (requestContext.isActive()) {
|
||||
requestContext.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,10 +22,6 @@
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-resteasy-deployment</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-undertow-deployment</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-arc-deployment</artifactId>
|
||||
|
||||
@@ -8,12 +8,14 @@ import javax.servlet.DispatcherType;
|
||||
import io.opentracing.contrib.interceptors.OpenTracingInterceptor;
|
||||
import io.opentracing.contrib.jaxrs2.server.SpanFinishingFilter;
|
||||
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
|
||||
import io.quarkus.deployment.Capabilities;
|
||||
import io.quarkus.deployment.annotations.BuildProducer;
|
||||
import io.quarkus.deployment.annotations.BuildStep;
|
||||
import io.quarkus.deployment.builditem.FeatureBuildItem;
|
||||
import io.quarkus.deployment.builditem.nativeimage.ReflectiveMethodBuildItem;
|
||||
import io.quarkus.resteasy.common.spi.ResteasyJaxrsProviderBuildItem;
|
||||
import io.quarkus.smallrye.opentracing.runtime.QuarkusSmallRyeTracingDynamicFeature;
|
||||
import io.quarkus.smallrye.opentracing.runtime.QuarkusSmallRyeTracingStandaloneVertxDynamicFeature;
|
||||
import io.quarkus.smallrye.opentracing.runtime.TracerProducer;
|
||||
import io.quarkus.undertow.deployment.FilterBuildItem;
|
||||
|
||||
@@ -33,21 +35,28 @@ public class SmallRyeOpenTracingProcessor {
|
||||
@BuildStep
|
||||
void setupFilter(BuildProducer<ResteasyJaxrsProviderBuildItem> providers,
|
||||
BuildProducer<FilterBuildItem> filterProducer,
|
||||
BuildProducer<FeatureBuildItem> feature) {
|
||||
BuildProducer<FeatureBuildItem> feature,
|
||||
Capabilities capabilities) {
|
||||
|
||||
feature.produce(new FeatureBuildItem(FeatureBuildItem.SMALLRYE_OPENTRACING));
|
||||
|
||||
providers.produce(new ResteasyJaxrsProviderBuildItem(QuarkusSmallRyeTracingDynamicFeature.class.getName()));
|
||||
|
||||
FilterBuildItem filterInfo = FilterBuildItem.builder("tracingFilter", SpanFinishingFilter.class.getName())
|
||||
.setAsyncSupported(true)
|
||||
.addFilterUrlMapping("*", DispatcherType.FORWARD)
|
||||
.addFilterUrlMapping("*", DispatcherType.INCLUDE)
|
||||
.addFilterUrlMapping("*", DispatcherType.REQUEST)
|
||||
.addFilterUrlMapping("*", DispatcherType.ASYNC)
|
||||
.addFilterUrlMapping("*", DispatcherType.ERROR)
|
||||
.build();
|
||||
filterProducer.produce(filterInfo);
|
||||
if (capabilities.isCapabilityPresent(Capabilities.SERVLET)) {
|
||||
FilterBuildItem filterInfo = FilterBuildItem.builder("tracingFilter", SpanFinishingFilter.class.getName())
|
||||
.setAsyncSupported(true)
|
||||
.addFilterUrlMapping("*", DispatcherType.FORWARD)
|
||||
.addFilterUrlMapping("*", DispatcherType.INCLUDE)
|
||||
.addFilterUrlMapping("*", DispatcherType.REQUEST)
|
||||
.addFilterUrlMapping("*", DispatcherType.ASYNC)
|
||||
.addFilterUrlMapping("*", DispatcherType.ERROR)
|
||||
.build();
|
||||
filterProducer.produce(filterInfo);
|
||||
} else {
|
||||
//otherwise we know we have RESTeasy on vert.x
|
||||
providers.produce(
|
||||
new ResteasyJaxrsProviderBuildItem(QuarkusSmallRyeTracingStandaloneVertxDynamicFeature.class.getName()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,16 +1,15 @@
|
||||
package io.quarkus.smallrye.opentracing.deployment;
|
||||
|
||||
import javax.servlet.ServletContextEvent;
|
||||
import javax.servlet.ServletContextListener;
|
||||
import javax.servlet.annotation.WebListener;
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.enterprise.event.Observes;
|
||||
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
|
||||
@WebListener
|
||||
public class TracerRegistrar implements ServletContextListener {
|
||||
@ApplicationScoped
|
||||
public class TracerRegistrar {
|
||||
|
||||
@Override
|
||||
public void contextInitialized(ServletContextEvent sce) {
|
||||
public void start(@Observes StartupEvent start) {
|
||||
GlobalTracer.register(TracingTest.mockTracer);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package io.quarkus.smallrye.opentracing.deployment;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.awaitility.Awaitility;
|
||||
import org.jboss.shrinkwrap.api.ShrinkWrap;
|
||||
@@ -46,13 +48,15 @@ public class TracingTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleServerRequest() {
|
||||
public void testSingleServerRequest() throws InterruptedException {
|
||||
try {
|
||||
RestAssured.defaultParser = Parser.TEXT;
|
||||
RestAssured.when().get("/hello")
|
||||
.then()
|
||||
.statusCode(200);
|
||||
Assertions.assertEquals(1, mockTracer.finishedSpans().size());
|
||||
//inherently racy, tracer is completed after response is sent back to the client
|
||||
Awaitility.await().atMost(5, TimeUnit.SECONDS)
|
||||
.until(() -> mockTracer.finishedSpans().size() == 1);
|
||||
Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello",
|
||||
mockTracer.finishedSpans().get(0).operationName());
|
||||
} finally {
|
||||
@@ -61,12 +65,15 @@ public class TracingTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCDI() {
|
||||
public void testCDI() throws InterruptedException {
|
||||
try {
|
||||
RestAssured.defaultParser = Parser.TEXT;
|
||||
RestAssured.when().get("/cdi")
|
||||
.then()
|
||||
.statusCode(200);
|
||||
//inherently racy, tracer is completed after response is sent back to the client
|
||||
Awaitility.await().atMost(5, TimeUnit.SECONDS)
|
||||
.until(() -> mockTracer.finishedSpans().size() == 2);
|
||||
Assertions.assertEquals(2, mockTracer.finishedSpans().size());
|
||||
Assertions.assertEquals("io.quarkus.smallrye.opentracing.deployment.Service.foo",
|
||||
mockTracer.finishedSpans().get(0).operationName());
|
||||
@@ -78,18 +85,24 @@ public class TracingTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMPRestClient() {
|
||||
public void testMPRestClient() throws InterruptedException {
|
||||
try {
|
||||
RestAssured.defaultParser = Parser.TEXT;
|
||||
RestAssured.when().get("/restClient")
|
||||
.then()
|
||||
.statusCode(200);
|
||||
//inherently racy, tracer is completed after response is sent back to the client
|
||||
Awaitility.await().atMost(5, TimeUnit.SECONDS)
|
||||
.until(() -> mockTracer.finishedSpans().size() == 3);
|
||||
Assertions.assertEquals(3, mockTracer.finishedSpans().size());
|
||||
Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello",
|
||||
mockTracer.finishedSpans().get(0).operationName());
|
||||
Assertions.assertEquals("GET", mockTracer.finishedSpans().get(1).operationName());
|
||||
Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.restClient",
|
||||
mockTracer.finishedSpans().get(2).operationName());
|
||||
//these can come in any order, as the 'hello' span is finished after the request is sent back.
|
||||
//this means the client might have already dealt with the response before the hello span is finished
|
||||
//in practice this means that the spans might be in any order, as they are ordered by the time
|
||||
//they are completed rather than the time they are started
|
||||
Set<String> results = mockTracer.finishedSpans().stream().map(MockSpan::operationName).collect(Collectors.toSet());
|
||||
Assertions.assertTrue(results.contains("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello"));
|
||||
Assertions.assertTrue(results.contains("GET"));
|
||||
Assertions.assertTrue(results.contains("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.restClient"));
|
||||
} finally {
|
||||
RestAssured.reset();
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-undertow</artifactId>
|
||||
<artifactId>quarkus-resteasy</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
package io.quarkus.smallrye.opentracing.runtime;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.enterprise.inject.spi.CDI;
|
||||
import javax.ws.rs.container.ContainerRequestContext;
|
||||
import javax.ws.rs.container.ContainerRequestFilter;
|
||||
import javax.ws.rs.container.DynamicFeature;
|
||||
import javax.ws.rs.container.ResourceInfo;
|
||||
import javax.ws.rs.core.FeatureContext;
|
||||
import javax.ws.rs.ext.Provider;
|
||||
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.contrib.jaxrs2.internal.SpanWrapper;
|
||||
import io.opentracing.tag.Tags;
|
||||
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
|
||||
@Provider
|
||||
public class QuarkusSmallRyeTracingStandaloneVertxDynamicFeature implements DynamicFeature {
|
||||
|
||||
@Override
|
||||
public void configure(ResourceInfo resourceInfo, FeatureContext context) {
|
||||
context.register(StandaloneFilter.class);
|
||||
}
|
||||
|
||||
public static class StandaloneFilter implements ContainerRequestFilter {
|
||||
|
||||
volatile CurrentVertxRequest currentVertxRequest;
|
||||
|
||||
CurrentVertxRequest request() {
|
||||
if (currentVertxRequest == null) {
|
||||
currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get();
|
||||
}
|
||||
return currentVertxRequest;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void filter(ContainerRequestContext requestContext) throws IOException {
|
||||
request().addRequestDoneListener(new CurrentVertxRequest.Listener() {
|
||||
@Override
|
||||
public void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync) {
|
||||
SpanWrapper wrapper = routingContext.get(SpanWrapper.PROPERTY_NAME);
|
||||
if (wrapper != null) {
|
||||
wrapper.getScope().close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void responseComplete(RoutingContext routingContext) {
|
||||
SpanWrapper wrapper = routingContext.get(SpanWrapper.PROPERTY_NAME);
|
||||
if (wrapper == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Tags.HTTP_STATUS.set(wrapper.get(), routingContext.response().getStatusCode());
|
||||
if (routingContext.failure() != null) {
|
||||
addExceptionLogs(wrapper.get(), routingContext.failure());
|
||||
}
|
||||
wrapper.finish();
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private static void addExceptionLogs(Span span, Throwable throwable) {
|
||||
Tags.ERROR.set(span, true);
|
||||
if (throwable != null) {
|
||||
Map<String, Object> errorLogs = new HashMap<>(2);
|
||||
errorLogs.put("event", Tags.ERROR.getKey());
|
||||
errorLogs.put("error.object", throwable);
|
||||
span.log(errorLogs);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -37,6 +37,7 @@ import io.quarkus.runtime.RuntimeValue;
|
||||
import io.quarkus.runtime.ShutdownContext;
|
||||
import io.quarkus.runtime.annotations.Recorder;
|
||||
import io.quarkus.runtime.configuration.MemorySize;
|
||||
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
|
||||
import io.quarkus.vertx.http.runtime.HttpConfiguration;
|
||||
import io.undertow.httpcore.BufferAllocator;
|
||||
import io.undertow.httpcore.StatusCodes;
|
||||
@@ -448,6 +449,7 @@ public class UndertowDeploymentRecorder {
|
||||
return new ServletExtension() {
|
||||
@Override
|
||||
public void handleDeployment(DeploymentInfo deploymentInfo, ServletContext servletContext) {
|
||||
CurrentVertxRequest currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get();
|
||||
deploymentInfo.addThreadSetupAction(new ThreadSetupHandler() {
|
||||
@Override
|
||||
public <T, C> ThreadSetupHandler.Action<T, C> create(Action<T, C> action) {
|
||||
@@ -458,6 +460,7 @@ public class UndertowDeploymentRecorder {
|
||||
if (exchange == null) {
|
||||
return action.call(exchange, context);
|
||||
}
|
||||
boolean vertxFirst = false;
|
||||
ManagedContext requestContext = beanContainer.requestContext();
|
||||
if (requestContext.isActive()) {
|
||||
return action.call(exchange, context);
|
||||
@@ -466,11 +469,19 @@ public class UndertowDeploymentRecorder {
|
||||
.getAttachment(REQUEST_CONTEXT);
|
||||
try {
|
||||
requestContext.activate(existingRequestContext);
|
||||
vertxFirst = existingRequestContext == null;
|
||||
|
||||
VertxHttpExchange delegate = (VertxHttpExchange) exchange.getDelegate();
|
||||
RoutingContext rc = (RoutingContext) delegate.getContext();
|
||||
currentVertxRequest.setCurrent(rc);
|
||||
return action.call(exchange, context);
|
||||
} finally {
|
||||
ServletRequestContext src = exchange
|
||||
.getAttachment(ServletRequestContext.ATTACHMENT_KEY);
|
||||
HttpServletRequestImpl req = src.getOriginalRequest();
|
||||
if (vertxFirst) {
|
||||
currentVertxRequest.initialInvocationComplete(req.isAsyncStarted());
|
||||
}
|
||||
if (req.isAsyncStarted()) {
|
||||
exchange.putAttachment(REQUEST_CONTEXT, requestContext.getState());
|
||||
requestContext.deactivate();
|
||||
|
||||
@@ -27,6 +27,7 @@ import io.quarkus.runtime.LaunchMode;
|
||||
import io.quarkus.runtime.RuntimeValue;
|
||||
import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem;
|
||||
import io.quarkus.vertx.core.deployment.InternalWebVertxBuildItem;
|
||||
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
|
||||
import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig;
|
||||
import io.quarkus.vertx.http.runtime.HttpConfiguration;
|
||||
import io.quarkus.vertx.http.runtime.RouterProducer;
|
||||
@@ -51,7 +52,11 @@ class VertxHttpProcessor {
|
||||
|
||||
@BuildStep
|
||||
AdditionalBeanBuildItem additionalBeans() {
|
||||
return AdditionalBeanBuildItem.unremovableOf(RouterProducer.class);
|
||||
return AdditionalBeanBuildItem.builder()
|
||||
.setUnremovable()
|
||||
.addBeanClass(RouterProducer.class)
|
||||
.addBeanClass(CurrentVertxRequest.class)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,80 @@
|
||||
package io.quarkus.vertx.http.runtime;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.enterprise.context.RequestScoped;
|
||||
import javax.enterprise.inject.Produces;
|
||||
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
|
||||
@RequestScoped
|
||||
public class CurrentVertxRequest {
|
||||
|
||||
private final Logger log = Logger.getLogger(CurrentVertxRequest.class);
|
||||
|
||||
public RoutingContext current;
|
||||
private List<Listener> doneListeners;
|
||||
|
||||
@Produces
|
||||
@RequestScoped
|
||||
public RoutingContext getCurrent() {
|
||||
return current;
|
||||
}
|
||||
|
||||
public CurrentVertxRequest setCurrent(RoutingContext current) {
|
||||
this.current = current;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void addRequestDoneListener(Listener doneListener) {
|
||||
if (doneListeners == null) {
|
||||
doneListeners = new ArrayList<>();
|
||||
}
|
||||
doneListeners.add(doneListener);
|
||||
}
|
||||
|
||||
public void initialInvocationComplete(boolean goingAsync) {
|
||||
|
||||
if (current == null) {
|
||||
return;
|
||||
}
|
||||
if (doneListeners != null) {
|
||||
for (Listener i : doneListeners) {
|
||||
try {
|
||||
i.initialInvocationComplete(current, goingAsync);
|
||||
} catch (Throwable t) {
|
||||
log.errorf(t, "Failed to process invocation listener %s", i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
void done() {
|
||||
if (current == null) {
|
||||
return;
|
||||
}
|
||||
if (doneListeners != null) {
|
||||
for (Listener i : doneListeners) {
|
||||
try {
|
||||
i.responseComplete(current);
|
||||
} catch (Throwable t) {
|
||||
log.errorf(t, "Failed to process done listener %s", i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public interface Listener {
|
||||
|
||||
void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync);
|
||||
|
||||
void responseComplete(RoutingContext routingContext);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user