WebClient request id added and logging messages updated (#2257)

WebClient request id added and logging messages updated

Signed-off-by: David Kral <david.k.kral@oracle.com>
This commit is contained in:
David Král
2020-08-31 08:21:45 +00:00
committed by GitHub
parent c20e2614b5
commit 38f2e60f7f
8 changed files with 102 additions and 19 deletions

View File

@@ -49,6 +49,7 @@ import static io.helidon.webclient.WebClientRequestBuilderImpl.COMPLETED;
import static io.helidon.webclient.WebClientRequestBuilderImpl.IN_USE;
import static io.helidon.webclient.WebClientRequestBuilderImpl.RECEIVED;
import static io.helidon.webclient.WebClientRequestBuilderImpl.REQUEST;
import static io.helidon.webclient.WebClientRequestBuilderImpl.REQUEST_ID;
import static io.helidon.webclient.WebClientRequestBuilderImpl.RESPONSE;
import static io.helidon.webclient.WebClientRequestBuilderImpl.RESULT;
@@ -69,6 +70,7 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
private HttpResponsePublisher publisher;
private ResponseCloser responseCloser;
private long requestId;
/**
* Creates new instance.
@@ -87,10 +89,13 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws IOException {
if (msg instanceof HttpResponse) {
ctx.channel().config().setAutoRead(false);
Channel channel = ctx.channel();
channel.config().setAutoRead(false);
HttpResponse response = (HttpResponse) msg;
WebClientRequestImpl clientRequest = ctx.channel().attr(REQUEST).get();
this.requestId = channel.attr(REQUEST_ID).get();
WebClientRequestImpl clientRequest = channel.attr(REQUEST).get();
RequestConfiguration requestConfiguration = clientRequest.configuration();
LOGGER.finest(() -> "(client reqID: " + requestId + ") Initial http response message received.");
this.publisher = new HttpResponsePublisher(ctx);
this.responseCloser = new ResponseCloser(ctx);
@@ -111,7 +116,7 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
// we got a response, we can safely complete the future
// all errors are now fed only to the publisher
WebClientResponse clientResponse = responseBuilder.build();
ctx.channel().attr(RESPONSE).set(clientResponse);
channel.attr(RESPONSE).set(clientResponse);
for (HttpInterceptor interceptor : HTTP_INTERCEPTORS) {
if (interceptor.shouldIntercept(response.status(), requestConfiguration)) {
@@ -119,7 +124,7 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
if (continueAfter) {
responseCloser.close().thenAccept(future -> LOGGER.finest(() -> "Response closed due to redirection"));
}
interceptor.handleInterception(response, clientRequest, ctx.channel().attr(RESULT).get());
interceptor.handleInterception(response, clientRequest, channel.attr(RESULT).get());
if (continueAfter) {
return;
}
@@ -134,7 +139,7 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
clientResponse.headers(),
clientResponse.status());
ctx.channel().attr(SERVICE_RESPONSE).set(clientServiceResponse);
channel.attr(SERVICE_RESPONSE).set(clientServiceResponse);
List<WebClientService> services = requestConfiguration.services();
CompletionStage<WebClientServiceResponse> csr = CompletableFuture.completedFuture(clientServiceResponse);
@@ -143,8 +148,8 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
csr = csr.thenCompose(clientSerResponse -> service.response(clientRequest, clientSerResponse));
}
CompletableFuture<WebClientServiceResponse> responseReceived = ctx.channel().attr(RECEIVED).get();
CompletableFuture<WebClientResponse> responseFuture = ctx.channel().attr(RESULT).get();
CompletableFuture<WebClientServiceResponse> responseReceived = channel.attr(RECEIVED).get();
CompletableFuture<WebClientResponse> responseFuture = channel.attr(RESULT).get();
csr.whenComplete((clientSerResponse, throwable) -> {
if (throwable != null) {
responseReceived.completeExceptionally(throwable);
@@ -176,6 +181,7 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
}
if (msg instanceof LastHttpContent) {
LOGGER.finest(() -> "(client reqID: " + requestId + ") Last http content received.");
responseCloser.close();
}
}
@@ -286,6 +292,7 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
*/
Single<Void> close() {
if (closed.compareAndSet(false, true)) {
LOGGER.finest(() -> "(client reqID: " + requestId + ") Closing the response from the server.");
Channel channel = ctx.channel();
WebClientServiceResponse clientServiceResponse = channel.attr(SERVICE_RESPONSE).get();
CompletableFuture<WebClientServiceResponse> requestComplete = channel.attr(COMPLETED).get();
@@ -297,7 +304,8 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
ctx.close()
.addListener(future -> {
if (future.isSuccess()) {
LOGGER.finest(() -> "Response from the server has been closed.");
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Response from the server has been closed.");
cf.complete(null);
} else {
LOGGER.log(Level.SEVERE,
@@ -307,6 +315,7 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
}
});
} else {
LOGGER.finest(() -> "(client reqID: " + requestId + ") Returning channel to the cache.");
channel.attr(IN_USE).get().set(false);
cf.complete(null);
channel.read();

View File

@@ -37,8 +37,9 @@ class RedirectInterceptor implements HttpInterceptor {
WebClientRequestImpl clientRequest,
CompletableFuture<WebClientResponse> responseFuture) {
if (httpResponse.headers().contains(Http.Header.LOCATION)) {
long requestId = clientRequest.configuration().requestId();
String newUri = httpResponse.headers().get(Http.Header.LOCATION);
LOGGER.fine(() -> "Redirecting to " + newUri);
LOGGER.finest(() -> "(client reqID: " + requestId + ") Redirecting to " + newUri);
WebClientRequestBuilder requestBuilder = WebClientRequestBuilderImpl
.create(clientRequest);
if (URI.create(newUri).getHost() == null) {

View File

@@ -28,6 +28,7 @@ import io.helidon.webclient.spi.WebClientService;
class RequestConfiguration extends WebClientConfiguration {
private final URI requestURI;
private final long requestId;
private final WebClientServiceRequest clientServiceRequest;
private final List<WebClientService> services;
@@ -36,6 +37,7 @@ class RequestConfiguration extends WebClientConfiguration {
requestURI = builder.requestURI;
clientServiceRequest = builder.clientServiceRequest;
services = builder.services;
requestId = builder.requestId;
}
URI requestURI() {
@@ -50,6 +52,10 @@ class RequestConfiguration extends WebClientConfiguration {
return services;
}
long requestId() {
return requestId;
}
static Builder builder(URI requestURI) {
return new Builder(requestURI);
}
@@ -57,6 +63,7 @@ class RequestConfiguration extends WebClientConfiguration {
static final class Builder extends WebClientConfiguration.Builder<Builder, RequestConfiguration> {
private final URI requestURI;
private long requestId = -1;
private WebClientServiceRequest clientServiceRequest;
private List<WebClientService> services = new ArrayList<>();
@@ -74,6 +81,11 @@ class RequestConfiguration extends WebClientConfiguration {
return this;
}
Builder requestId(long requestId) {
this.requestId = requestId;
return this;
}
@Override
public RequestConfiguration build() {
return new RequestConfiguration(this);

View File

@@ -33,6 +33,7 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import static io.helidon.webclient.WebClientRequestBuilderImpl.REQUEST;
import static io.helidon.webclient.WebClientRequestBuilderImpl.REQUEST_ID;
/**
* Subscriber which handles entity sending.
@@ -46,6 +47,7 @@ class RequestContentSubscriber implements Flow.Subscriber<DataChunk> {
private final CompletableFuture<WebClientServiceRequest> sent;
private final DefaultHttpRequest request;
private final Channel channel;
private final long requestId;
private volatile Flow.Subscription subscription;
private volatile DataChunk firstDataChunk;
@@ -59,13 +61,14 @@ class RequestContentSubscriber implements Flow.Subscriber<DataChunk> {
this.channel = channel;
this.responseFuture = responseFuture;
this.sent = sent;
this.requestId = channel.attr(REQUEST_ID).get();
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
LOGGER.finest(() -> "Writing sending request and its content to the server.");
LOGGER.finest(() -> "(client reqID: " + requestId + ") Writing sending request and its content to the server.");
}
@Override
@@ -103,7 +106,8 @@ class RequestContentSubscriber implements Flow.Subscriber<DataChunk> {
@Override
public void onComplete() {
if (lengthOptimization) {
LOGGER.finest(() -> "Message body contains only one data chunk. Setting chunked encoding to false.");
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Message body contains only one data chunk. Setting chunked encoding to false.");
HttpUtil.setTransferEncodingChunked(request, false);
if (firstDataChunk != null) {
HttpUtil.setContentLength(request, firstDataChunk.remaining());
@@ -113,26 +117,28 @@ class RequestContentSubscriber implements Flow.Subscriber<DataChunk> {
sendData(firstDataChunk);
}
}
LOGGER.finest(() -> "Sending last http content");
LOGGER.finest(() -> "(client reqID: " + requestId + ") Sending last http content");
channel.writeAndFlush(LAST_HTTP_CONTENT)
.addListener(completeOnFailureListener("An exception occurred when writing last http content."))
.addListener(completeOnFailureListener("(client reqID: " + requestId + ") "
+ "An exception occurred when writing last http content."))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
WebClientRequestImpl clientRequest = channel.attr(REQUEST).get();
WebClientServiceRequest serviceRequest = clientRequest.configuration().clientServiceRequest();
sent.complete(serviceRequest);
LOGGER.finest(() -> "(client reqID: " + requestId + ") Request sent");
}
private void sendData(DataChunk data) {
LOGGER.finest(() -> "Sending data chunk");
LOGGER.finest(() -> "(client reqID: " + requestId + ") Sending data chunk");
DefaultHttpContent httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer(data.data()));
channel.writeAndFlush(httpContent)
.addListener(future -> {
data.release();
subscription.request(1);
LOGGER.finest(() -> "Data chunk sent with result: " + future.isSuccess());
LOGGER.finest(() -> "(client reqID: " + requestId + ") Data chunk sent with result: " + future.isSuccess());
})
.addListener(completeOnFailureListener("Failure when sending a content!"))
.addListener(completeOnFailureListener("(client reqID: " + requestId + ") Failure when sending a content!"))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}

View File

@@ -257,6 +257,14 @@ public interface WebClientRequestBuilder {
*/
WebClientRequestBuilder keepAlive(boolean keepAlive);
/**
* Set new request id. This id is used in logging messages.
*
* @param requestId new request id
* @return updated builder instance
*/
WebClientRequestBuilder requestId(long requestId);
/**
* Performs prepared request and transforms response to requested type.
*

View File

@@ -94,6 +94,7 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
static final AttributeKey<AtomicBoolean> IN_USE = AttributeKey.valueOf("inUse");
static final AttributeKey<WebClientResponse> RESPONSE = AttributeKey.valueOf("response");
static final AttributeKey<ConnectionIdent> CONNECTION_IDENT = AttributeKey.valueOf("connectionIdent");
static final AttributeKey<Long> REQUEST_ID = AttributeKey.valueOf("requestID");
private static final AtomicLong REQUEST_NUMBER = new AtomicLong(0);
private static final String DEFAULT_TRANSPORT_PROTOCOL = "http";
@@ -128,6 +129,7 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
private Duration readTimeout;
private Duration connectTimeout;
private boolean keepAlive;
private Long requestId;
private WebClientRequestBuilderImpl(LazyValue<NioEventLoopGroup> eventGroup,
WebClientConfiguration configuration,
@@ -147,7 +149,8 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
this.services = configuration.clientServices();
this.readerContext = MessageBodyReaderContext.create(configuration.readerContext());
this.writerContext = MessageBodyWriterContext.create(configuration.writerContext(), headers);
Context.Builder contextBuilder = Context.builder().id("webclient-" + REQUEST_NUMBER.incrementAndGet());
this.requestId = null;
Context.Builder contextBuilder = Context.builder().id("webclient-" + requestId);
configuration.context().ifPresentOrElse(contextBuilder::parent,
() -> Contexts.context().ifPresent(contextBuilder::parent));
this.context = contextBuilder.build();
@@ -365,6 +368,12 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
return this;
}
@Override
public WebClientRequestBuilder requestId(long requestId) {
this.requestId = requestId;
return this;
}
@Override
public <T> Single<T> request(Class<T> responseType) {
return request(GenericType.create(responseType));
@@ -425,6 +434,10 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
return writerContext;
}
long requestId() {
return requestId;
}
Http.RequestMethod method() {
return method;
}
@@ -481,6 +494,10 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity) {
this.uri = prepareFinalURI();
if (requestId == null) {
requestId = REQUEST_NUMBER.incrementAndGet();
}
// LOGGER.finest(() -> "(client reqID: " + requestId + ") Request final URI: " + uri);
CompletableFuture<WebClientServiceRequest> sent = new CompletableFuture<>();
CompletableFuture<WebClientServiceResponse> responseReceived = new CompletableFuture<>();
CompletableFuture<WebClientServiceResponse> complete = new CompletableFuture<>();
@@ -498,6 +515,7 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
}
return Single.create(rcs.thenCompose(serviceRequest -> {
requestId = serviceRequest.requestId();
HttpHeaders headers = toNettyHttpHeaders();
DefaultHttpRequest request = new DefaultHttpRequest(toNettyHttpVersion(httpVersion),
toNettyMethod(method),
@@ -517,6 +535,7 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
.context(context)
.proxy(proxy)
.keepAlive(keepAlive)
.requestId(requestId)
.build();
WebClientRequestImpl clientRequest = new WebClientRequestImpl(this);
@@ -535,12 +554,13 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
: bootstrap.connect(uri.getHost(), uri.getPort());
channelFuture.addListener((ChannelFutureListener) future -> {
LOGGER.finest(() -> "ChannelFuture hashcode -> " + channelFuture.hashCode());
LOGGER.finest(() -> "Channel hashcode -> " + channelFuture.channel().hashCode());
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Channel hashcode -> " + channelFuture.channel().hashCode());
channelFuture.channel().attr(REQUEST).set(clientRequest);
channelFuture.channel().attr(RECEIVED).set(responseReceived);
channelFuture.channel().attr(COMPLETED).set(complete);
channelFuture.channel().attr(RESULT).set(result);
channelFuture.channel().attr(REQUEST_ID).set(requestId);
Throwable cause = future.cause();
if (null == cause) {
RequestContentSubscriber requestContentSubscriber = new RequestContentSubscriber(request,

View File

@@ -40,6 +40,20 @@ public interface WebClientServiceRequest extends HttpRequest {
*/
Context context();
/**
* Request id which will be used in logging messages.
*
* @return current request id
*/
long requestId();
/**
* Set new request id. This id is used in logging messages.
*
* @param requestId new request id
*/
void requestId(long requestId);
/**
* Completes when the request part of this request is done (e.g. we have sent all headers and bytes).
*

View File

@@ -43,6 +43,8 @@ class WebClientServiceRequestImpl implements WebClientServiceRequest {
private final Single<WebClientServiceResponse> responseReceived;
private final Single<WebClientServiceResponse> complete;
private long requestId;
WebClientServiceRequestImpl(WebClientRequestBuilderImpl requestBuilder,
Single<WebClientServiceRequest> sent,
Single<WebClientServiceResponse> responseReceived,
@@ -58,6 +60,7 @@ class WebClientServiceRequestImpl implements WebClientServiceRequest {
this.path = requestBuilder.path();
this.fragment = requestBuilder.fragment();
this.parameters = new HashMap<>(requestBuilder.properties());
this.requestId = requestBuilder.requestId();
this.sent = sent;
this.complete = complete;
}
@@ -72,6 +75,16 @@ class WebClientServiceRequestImpl implements WebClientServiceRequest {
return context;
}
@Override
public long requestId() {
return requestId;
}
@Override
public void requestId(long requestId) {
this.requestId = requestId;
}
@Override
public Single<WebClientServiceRequest> whenSent() {
return sent;