Rework the HttpClientRequest#sendHead method so it will perform the callback when the header has been sent using HttpClientConnection#writeHead method and also report when writing the headers is a failure. For this purpose the sendHead method callback is changed to Handler<AsyncResult<HttpVersion>> instead of Handler<HttpVersion>. Fixes #3213 - fixes #3214

This commit is contained in:
Julien Viet
2019-12-02 10:42:09 +01:00
parent 223418d0fd
commit 7cfbc84aa8
7 changed files with 33 additions and 25 deletions

View File

@@ -239,15 +239,14 @@ public interface HttpClientRequest extends WriteStream<Buffer>, Future<HttpClien
* @return a reference to this, so the API can be used fluently
* @throws java.lang.IllegalStateException when no response handler is set
*/
@Fluent
HttpClientRequest sendHead();
Future<HttpVersion> sendHead();
/**
* Like {@link #sendHead()} but with an handler after headers have been sent. The handler will be called with
* the {@link HttpVersion} if it can be determined or null otherwise.<p>
*/
@Fluent
HttpClientRequest sendHead(Handler<HttpVersion> completionHandler);
HttpClientRequest sendHead(Handler<AsyncResult<HttpVersion>> completionHandler);
/**
* Same as {@link #end(Buffer)} but writes a String in UTF-8 encoding

View File

@@ -264,12 +264,14 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
@Override
public HttpClientRequest sendHead() {
return sendHead(null);
public Future<HttpVersion> sendHead() {
Promise<HttpVersion> promise = context.promise();
sendHead(promise);
return promise.future();
}
@Override
public synchronized HttpClientRequest sendHead(Handler<HttpVersion> headersHandler) {
public synchronized HttpClientRequest sendHead(Handler<AsyncResult<HttpVersion>> headersHandler) {
checkEnded();
checkResponseHandler();
if (stream != null) {
@@ -413,7 +415,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
return hostHeader != null ? hostHeader : super.hostHeader();
}
private synchronized void connect(Handler<HttpVersion> headersHandler) {
private synchronized void connect(Handler<AsyncResult<HttpVersion>> headersHandler) {
if (!connecting) {
if (method == HttpMethod.OTHER && rawMethod == null) {
@@ -457,7 +459,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
}
private void connected(Handler<HttpVersion> headersHandler, HttpClientStream stream) {
private void connected(Handler<AsyncResult<HttpVersion>> headersHandler, HttpClientStream stream) {
synchronized (this) {
this.stream = stream;
@@ -481,17 +483,22 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
};
}
}
if (headersHandler != null) {
Handler<AsyncResult<Void>> others = handler;
handler = ar -> {
if (others != null) {
others.handle(ar);
}
headersHandler.handle(ar.map(stream.version()));
};
}
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, ended, priority, handler);
if (ended) {
// we also need to write the head so optimize this and write all out in once
tryComplete();
}
this.connecting = false;
this.stream = stream;
}
if (headersHandler != null) {
headersHandler.handle(stream.version());
}
}
@Override

View File

@@ -196,12 +196,12 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase {
}
@Override
public HttpClientRequest sendHead() {
public Future<HttpVersion> sendHead() {
throw new IllegalStateException();
}
@Override
public HttpClientRequest sendHead(Handler<HttpVersion> completionHandler) {
public HttpClientRequest sendHead(Handler<AsyncResult<HttpVersion>> completionHandler) {
throw new IllegalStateException();
}

View File

@@ -4429,9 +4429,10 @@ public class Http1xTest extends HttpTest {
startServer(testAddress);
client.close();
client = vertx.createHttpClient(createBaseClientOptions().setPipelining(true).setMaxPoolSize(1).setKeepAlive(true));
HttpClientRequest req1 = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/", resp -> {
HttpClientRequest req = client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/", resp -> {
complete();
}).sendHead();
});
req.sendHead();
client.request(HttpMethod.GET, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/", resp -> {
complete();
}).end();
@@ -4441,7 +4442,7 @@ public class Http1xTest extends HttpTest {
// Need to wait a little so requests 2 and 3 are appended to the first request
Thread.sleep(300);
// This will end request 1 and make requests 2 and 3 progress
req1.end();
req.end();
await();
}

View File

@@ -1460,11 +1460,11 @@ public class Http2ClientTest extends Http2TestBase {
testComplete();
});
}));
req.sendHead(version -> {
req.sendHead(onSuccess(version -> {
assertEquals(0, status.getAndIncrement());
assertSame(HttpVersion.HTTP_2, version);
req.end();
});
}));
await();
}
@@ -1497,11 +1497,11 @@ public class Http2ClientTest extends Http2TestBase {
testComplete();
});
}));
req.sendHead(version -> {
req.sendHead(onSuccess(version -> {
assertSame(HttpVersion.HTTP_2, version);
req.writeCustomFrame(10, 253, expectedSend);
req.end();
});
}));
await();
}

View File

@@ -119,7 +119,9 @@ public class Http2Test extends HttpTest {
HttpClientRequest req = client.request(HttpMethod.GET, testAddress, 8080, "localhost", "/somepath", onSuccess(resp -> {
assertEquals(200, resp.statusCode());
testComplete();
})).setChunked(true).sendHead();
}))
.setChunked(true);
req.sendHead();
awaitLatch(latch2); // The next write won't be buffered
req.write("hello ");
req.end("world");

View File

@@ -96,7 +96,6 @@ public abstract class HttpTest extends HttpTestBase {
server.listen(testAddress, onSuccess(server -> {
HttpClientRequest req = client.request(HttpMethod.PUT, testAddress, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, noOpHandler());
assertTrue(req.setChunked(true) == req);
assertTrue(req.sendHead() == req);
testComplete();
}));
@@ -4088,8 +4087,8 @@ public abstract class HttpTest extends HttpTestBase {
public void write(String chunk, Handler<AsyncResult<Void>> handler) { throw new UnsupportedOperationException(); }
public void write(String chunk, String enc, Handler<AsyncResult<Void>> handler) { throw new UnsupportedOperationException(); }
public HttpClientRequest continueHandler(@Nullable Handler<Void> handler) { throw new UnsupportedOperationException(); }
public HttpClientRequest sendHead() { throw new UnsupportedOperationException(); }
public HttpClientRequest sendHead(Handler<HttpVersion> completionHandler) { throw new UnsupportedOperationException(); }
public Future<HttpVersion> sendHead() { throw new UnsupportedOperationException(); }
public HttpClientRequest sendHead(Handler<AsyncResult<HttpVersion>> completionHandler) { throw new UnsupportedOperationException(); }
public Future<Void> end(String chunk) { throw new UnsupportedOperationException(); }
public Future<Void> end(String chunk, String enc) { throw new UnsupportedOperationException(); }
public void end(String chunk, Handler<AsyncResult<Void>> handler) { throw new UnsupportedOperationException(); }