mirror of
https://github.com/jlengrand/vert.x.git
synced 2026-03-10 08:51:19 +00:00
HttpClient connection handler - fixes #2636
This commit is contained in:
@@ -1391,6 +1391,14 @@ public interface HttpClient extends Measured {
|
||||
ReadStream<WebSocket> websocketStream(String requestURI, MultiMap headers, WebsocketVersion version,
|
||||
String subProtocols);
|
||||
|
||||
/**
|
||||
* Set a connection handler for the client. This handler is called when a new connection is established.
|
||||
*
|
||||
* @return a reference to this, so the API can be used fluently
|
||||
*/
|
||||
@Fluent
|
||||
HttpClient connectionHandler(Handler<HttpConnection> handler);
|
||||
|
||||
/**
|
||||
* Set a redirect handler for the http client.
|
||||
* <p>
|
||||
|
||||
@@ -18,16 +18,7 @@ import io.vertx.core.Handler;
|
||||
import io.vertx.core.MultiMap;
|
||||
import io.vertx.core.VertxException;
|
||||
import io.vertx.core.*;
|
||||
import io.vertx.core.http.HttpClient;
|
||||
import io.vertx.core.http.HttpClientOptions;
|
||||
import io.vertx.core.http.HttpClientRequest;
|
||||
import io.vertx.core.http.HttpClientResponse;
|
||||
import io.vertx.core.http.HttpHeaders;
|
||||
import io.vertx.core.http.HttpMethod;
|
||||
import io.vertx.core.http.HttpVersion;
|
||||
import io.vertx.core.http.RequestOptions;
|
||||
import io.vertx.core.http.WebSocket;
|
||||
import io.vertx.core.http.WebsocketVersion;
|
||||
import io.vertx.core.http.*;
|
||||
import io.vertx.core.impl.ContextInternal;
|
||||
import io.vertx.core.impl.VertxInternal;
|
||||
import io.vertx.core.logging.Logger;
|
||||
@@ -44,7 +35,6 @@ import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
@@ -116,6 +106,7 @@ public class HttpClientImpl implements HttpClient, MetricsProvider {
|
||||
private final boolean keepAlive;
|
||||
private final boolean pipelining;
|
||||
private volatile boolean closed;
|
||||
private volatile Handler<HttpConnection> connectionHandler;
|
||||
private volatile Function<HttpClientResponse, Future<HttpClientRequest>> redirectHandler = DEFAULT_HANDLER;
|
||||
|
||||
public HttpClientImpl(VertxInternal vertx, HttpClientOptions options) {
|
||||
@@ -926,6 +917,16 @@ public class HttpClientImpl implements HttpClient, MetricsProvider {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpClient connectionHandler(Handler<HttpConnection> handler) {
|
||||
connectionHandler = handler;
|
||||
return this;
|
||||
}
|
||||
|
||||
Handler<HttpConnection> connectionHandler() {
|
||||
return connectionHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpClient redirectHandler(Function<HttpClientResponse, Future<HttpClientRequest>> handler) {
|
||||
if (handler == null) {
|
||||
|
||||
@@ -459,7 +459,21 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
|
||||
}
|
||||
|
||||
// Capture some stuff
|
||||
Handler<HttpConnection> initializer = connectionHandler;
|
||||
Handler<HttpConnection> h1 = connectionHandler;
|
||||
Handler<HttpConnection> h2 = client.connectionHandler();
|
||||
Handler<HttpConnection> initializer;
|
||||
if (h1 != null) {
|
||||
if (h2 != null) {
|
||||
initializer = conn -> {
|
||||
h1.handle(conn);
|
||||
h2.handle(conn);
|
||||
};
|
||||
} else {
|
||||
initializer = h1;
|
||||
}
|
||||
} else {
|
||||
initializer = h2;
|
||||
}
|
||||
ContextInternal connectCtx = vertx.getOrCreateContext();
|
||||
|
||||
// We defer actual connection until the first part of body is written or end is called
|
||||
|
||||
@@ -3394,8 +3394,22 @@ public abstract class HttpTest extends HttpTestBase {
|
||||
await();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientLocalConnectionHandler() throws Exception {
|
||||
testClientConnectionHandler(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientGlobalConnectionHandler() throws Exception {
|
||||
testClientConnectionHandler(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientConnectionHandler() throws Exception {
|
||||
testClientConnectionHandler(true, true);
|
||||
}
|
||||
|
||||
private void testClientConnectionHandler(boolean local, boolean global) throws Exception {
|
||||
server.requestHandler(req -> {
|
||||
req.response().end();
|
||||
});
|
||||
@@ -3403,13 +3417,17 @@ public abstract class HttpTest extends HttpTestBase {
|
||||
server.listen(onSuccess(s -> listenLatch.countDown()));
|
||||
awaitLatch(listenLatch);
|
||||
AtomicInteger status = new AtomicInteger();
|
||||
Handler<HttpConnection> handler = conn -> status.getAndIncrement();
|
||||
if (global) {
|
||||
client.connectionHandler(handler);
|
||||
}
|
||||
HttpClientRequest req = client.post(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", resp -> {
|
||||
assertEquals(1, status.getAndIncrement());
|
||||
assertEquals((local ? 1 : 0) + (global ? 1 : 0), status.getAndIncrement());
|
||||
testComplete();
|
||||
});
|
||||
req.connectionHandler(conn -> {
|
||||
assertEquals(0, status.getAndIncrement());
|
||||
});
|
||||
if (local) {
|
||||
req.connectionHandler(handler);
|
||||
}
|
||||
req.end();
|
||||
await();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user