Delay the end handler notification when an stream end handler is called - see #2852

This commit is contained in:
Julien Viet
2019-01-09 10:43:41 +01:00
parent 90455f85b2
commit 24434e9ecf
17 changed files with 229 additions and 76 deletions

View File

@@ -69,6 +69,7 @@ public class AsyncFileImpl implements AsyncFile {
private int lwm = maxWrites / 2;
private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
private InboundBuffer<Buffer> queue;
private Handler<Buffer> handler;
private Handler<Void> endHandler;
private long readPos;
private long readLength = Long.MAX_VALUE;
@@ -102,7 +103,13 @@ public class AsyncFileImpl implements AsyncFile {
}
this.context = context;
this.queue = new InboundBuffer<>(context, 0);
queue.handler(buff -> {
if (buff.length() > 0) {
handleBuffer(buff);
} else {
handleEnd();
}
});
queue.drainHandler(v -> {
doRead();
});
@@ -235,7 +242,7 @@ public class AsyncFileImpl implements AsyncFile {
if (closed) {
return this;
}
queue.handler(handler);
this.handler = handler;
if (handler != null) {
doRead();
} else {
@@ -346,15 +353,11 @@ public class AsyncFileImpl implements AsyncFile {
doRead(buff, 0, bb, readPos, ar -> {
if (ar.succeeded()) {
Buffer buffer = ar.result();
if (buffer.length() == 0) {
// Empty buffer represents end of file
handleEnd();
} else {
readPos += buffer.length();
readLength -= buffer.length();
if (queue.write(buffer)) {
doRead(bb);
}
readPos += buffer.length();
readLength -= buffer.length();
// Empty buffer represents end of file
if (queue.write(buffer) && buffer.length() > 0) {
doRead(bb);
}
} else {
handleException(ar.cause());
@@ -362,8 +365,15 @@ public class AsyncFileImpl implements AsyncFile {
});
}
private synchronized void handleBuffer(Buffer buff) {
if (handler != null) {
checkContext();
handler.handle(buff);
}
}
private synchronized void handleEnd() {
queue.handler(null);
handler = null;
if (endHandler != null) {
checkContext();
endHandler.handle(null);

View File

@@ -348,12 +348,9 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC
@Override
public void doFetch(long amount) {
queue.fetch(amount);
}
@Override
public void doResume() {
queue.resume();
if (!queue.fetch(amount)) {
response.handleEnd(trailers);
}
}
@Override
@@ -489,7 +486,7 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC
}
}
trailers = new HeadersAdaptor(trailer.trailingHeaders());
if (queue.isEmpty()) {
if (queue.isEmpty() && !queue.isPaused()) {
response.handleEnd(trailers);
}
responseEnded = true;

View File

@@ -225,19 +225,22 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
@Override
public HttpServerRequest pause() {
doPause();
synchronized (conn) {
checkEnded();
doPause();
}
return this;
}
@Override
public HttpServerRequest resume() {
doResume();
return this;
return fetch(Long.MAX_VALUE);
}
@Override
public HttpServerRequest fetch(long amount) {
synchronized (conn) {
checkEnded();
doFetch(amount);
}
return this;

View File

@@ -215,11 +215,6 @@ public class Http2UpgradedClientConnection implements HttpClientConnection {
stream.doPause();
}
@Override
public void doResume() {
stream.doResume();
}
@Override
public void doFetch(long amount) {
stream.doFetch(amount);

View File

@@ -132,26 +132,41 @@ public class HttpClientResponseImpl implements HttpClientResponse {
}
}
private void checkEnded() {
if (trailers != null) {
throw new IllegalStateException();
}
}
@Override
public HttpClientResponse handler(Handler<Buffer> dataHandler) {
public HttpClientResponse handler(Handler<Buffer> handle) {
synchronized (conn) {
this.dataHandler = dataHandler;
if (handle != null) {
checkEnded();
}
dataHandler = handle;
return this;
}
}
@Override
public HttpClientResponse endHandler(Handler<Void> endHandler) {
public HttpClientResponse endHandler(Handler<Void> handler) {
synchronized (conn) {
this.endHandler = endHandler;
if (handler != null) {
checkEnded();
}
endHandler = handler;
return this;
}
}
@Override
public HttpClientResponse exceptionHandler(Handler<Throwable> exceptionHandler) {
public HttpClientResponse exceptionHandler(Handler<Throwable> handler) {
synchronized (conn) {
this.exceptionHandler = exceptionHandler;
if (handler != null) {
checkEnded();
}
exceptionHandler = handler;
return this;
}
}
@@ -164,8 +179,7 @@ public class HttpClientResponseImpl implements HttpClientResponse {
@Override
public HttpClientResponse resume() {
stream.doResume();
return this;
return fetch(Long.MAX_VALUE);
}
@Override
@@ -175,16 +189,24 @@ public class HttpClientResponseImpl implements HttpClientResponse {
}
@Override
public HttpClientResponse bodyHandler(final Handler<Buffer> bodyHandler) {
BodyHandler handler = new BodyHandler();
handler(handler);
endHandler(v -> handler.notifyHandler(bodyHandler));
public HttpClientResponse bodyHandler(final Handler<Buffer> handler) {
if (handler != null) {
BodyHandler bodyHandler = new BodyHandler();
handler(bodyHandler);
endHandler(v -> bodyHandler.notifyHandler(handler));
} else {
handler(null);
endHandler(null);
}
return this;
}
@Override
public HttpClientResponse customFrameHandler(Handler<HttpFrame> handler) {
synchronized (conn) {
if (endHandler != null) {
checkEnded();
}
customFrameHandler = handler;
return this;
}
@@ -217,16 +239,19 @@ public class HttpClientResponseImpl implements HttpClientResponse {
}
void handleEnd(MultiMap trailers) {
Handler<Void> handler;
synchronized (conn) {
stream.reportBytesRead(bytesRead);
bytesRead = 0;
this.trailers = trailers;
if (endHandler != null) {
try {
endHandler.handle(null);
} catch (Throwable t) {
handleException(t);
}
handler = endHandler;
endHandler = null;
}
if (handler != null) {
try {
handler.handle(null);
} catch (Throwable t) {
handleException(t);
}
}
}
@@ -277,6 +302,9 @@ public class HttpClientResponseImpl implements HttpClientResponse {
@Override
public HttpClientResponse streamPriorityHandler(Handler<StreamPriority> handler) {
synchronized (conn) {
if (handler != null) {
checkEnded();
}
priorityHandler = handler;
}
return this;

View File

@@ -49,7 +49,6 @@ interface HttpClientStream {
void doSetWriteQueueMaxSize(int size);
boolean isNotWritable();
void doPause();
void doResume();
void doFetch(long amount);
void reset(long code);

View File

@@ -161,7 +161,7 @@ public class HttpServerRequestImpl implements HttpServerRequest {
boolean end = ended;
ended = false;
handleBegin();
if (pending != null && pending.size() > 0) {
if (pending != null && pending.isPaused()) {
pending.resume();
}
if (end) {
@@ -340,7 +340,13 @@ public class HttpServerRequestImpl implements HttpServerRequest {
public HttpServerRequest resume() {
synchronized (conn) {
if (!isEnded()) {
pendingQueue().resume();
if (ended) {
if (!pending.resume()) {
doEnd();
}
} else if (pending != null) {
pending.resume();
}
}
return this;
}
@@ -478,7 +484,7 @@ public class HttpServerRequestImpl implements HttpServerRequest {
@Override
public boolean isEnded() {
synchronized (conn) {
return ended && (pending == null || pending.isEmpty());
return ended && (pending == null || (!pending.isPaused() && pending.isEmpty()));
}
}

View File

@@ -151,8 +151,7 @@ class VertxHttp2NetSocket<C extends Http2ConnectionBase> extends VertxHttp2Strea
@Override
public NetSocket resume() {
doResume();
return this;
return fetch(Long.MAX_VALUE);
}
@Override

View File

@@ -36,7 +36,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
protected final ChannelHandlerContext handlerContext;
protected final Http2Stream stream;
private InboundBuffer<Buffer> pending;
private final InboundBuffer<Buffer> pending;
private int pendingBytes;
private MultiMap trailers;
private boolean writable;
@@ -95,7 +95,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
void onEnd(MultiMap map) {
synchronized (conn) {
trailers = map;
if (pending.isEmpty()) {
if (pending.isEmpty() && !pending.isPaused()) {
handleEnd(trailers);
}
}
@@ -109,12 +109,12 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
pending.pause();
}
public void doResume() {
pending.resume();
}
public void doFetch(long amount) {
pending.fetch(amount);
if (!pending.fetch(amount)) {
if (trailers != null) {
handleEnd(trailers);
}
}
}
boolean isNotWritable() {

View File

@@ -21,6 +21,7 @@ import io.vertx.core.http.WebSocketBase;
import io.vertx.core.http.WebSocketFrame;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.streams.impl.InboundBuffer;
@@ -444,7 +445,7 @@ public abstract class WebSocketImplBase<S extends WebSocketBase> implements WebS
Handler<Void> endHandler;
Handler<Void> closeHandler;
synchronized (conn) {
endHandler = this.endHandler;
endHandler = pending.isPaused() ? null : this.endHandler;
closeHandler = this.closeHandler;
closed = true;
binaryHandlerRegistration = null;
@@ -548,8 +549,17 @@ public abstract class WebSocketImplBase<S extends WebSocketBase> implements WebS
@Override
public S resume() {
if (!isClosed()) {
pending.resume();
synchronized (this) {
if (isClosed()) {
Handler<Void> handler = endHandler;
endHandler = null;
if (handler != null) {
ContextInternal ctx = conn.getContext();
ctx.runOnContext(v -> handler.handle(null));
}
} else {
pending.resume();
}
}
return (S) this;
}

View File

@@ -193,14 +193,15 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
@Override
public NetSocket fetch(long amount) {
pending.fetch(amount);
if (!pending.fetch(amount)) {
checkEnd();
}
return this;
}
@Override
public synchronized NetSocket resume() {
pending.resume();
return this;
return fetch(Long.MAX_VALUE);
}
@Override
@@ -358,7 +359,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
private void checkEnd() {
Handler<Void> handler;
synchronized (this) {
if (!closed || pending.size() > 0 || (handler = endHandler) == null) {
if (!closed || pending.isPaused() || (handler = endHandler) == null) {
return;
}
}

View File

@@ -251,9 +251,9 @@ public class InboundBuffer<E> {
* <p/>
* This method can be called from any thread.
*
* @return a reference to this, so the API can be used fluently
* @return {@code true} when the buffer will be drained
*/
public InboundBuffer<E> fetch(long amount) {
public boolean fetch(long amount) {
if (amount < 0L) {
throw new IllegalArgumentException();
}
@@ -263,12 +263,12 @@ public class InboundBuffer<E> {
demand = Long.MAX_VALUE;
}
if (emitting || (pending.isEmpty() && !overflow)) {
return this;
return false;
}
emitting = true;
}
context.runOnContext(v -> drain());
return this;
return true;
}
/**
@@ -313,9 +313,9 @@ public class InboundBuffer<E> {
* <p/>
* This method can be called from any thread.
*
* @return a reference to this, so the API can be used fluently
* @return {@code true} when the buffer will be drained
*/
public InboundBuffer<E> resume() {
public boolean resume() {
return fetch(Long.MAX_VALUE);
}

View File

@@ -1748,6 +1748,28 @@ public class FileSystemTest extends VertxTestBase {
await();
}
@Test
public void testPausedEnd() throws Exception {
String fileName = "some-file.dat";
createFile(fileName, new byte[0]);
AtomicBoolean paused = new AtomicBoolean(false);
vertx.fileSystem().open(testDir + pathSep + fileName, new OpenOptions(), onSuccess(file -> {
Buffer buffer = Buffer.buffer();
paused.set(true);
file.pause();
vertx.setTimer(100, id -> {
paused.set(false);
file.resume();
});
file.endHandler(v -> {
assertFalse(paused.get());
testComplete();
});
file.handler(buffer::appendBuffer);
}));
await();
}
private Handler<AsyncResult<Void>> createHandler(boolean shouldPass, Handler<Void> afterOK) {
return ar -> {
if (ar.failed()) {

View File

@@ -14,7 +14,6 @@ package io.vertx.core.http;
import io.netty.handler.codec.TooLongFrameException;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.HttpClientRequestImpl;
import io.vertx.core.http.impl.HttpServerImpl;
import io.vertx.core.http.impl.HttpUtils;
import io.vertx.core.impl.ConcurrentHashSet;
@@ -2741,9 +2740,9 @@ public class Http1xTest extends HttpTest {
client = vertx.createHttpClient(new HttpClientOptions().setMaxPoolSize(1).setPipelining(true).setKeepAlive(true));
AtomicInteger connCount = new AtomicInteger();
client.connectionHandler(conn -> connCount.incrementAndGet());
HttpClientRequestImpl req = (HttpClientRequestImpl) client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/first", resp -> {
HttpClientRequest req = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/first", onSuccess(resp -> {
fail();
});
}));
req.reset(0);
CountDownLatch respLatch = new CountDownLatch(2);
client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/second", onSuccess(resp -> {

View File

@@ -2688,13 +2688,23 @@ public abstract class HttpTest extends HttpTestBase {
}
@Test
public void testPausedHttpServerRequestDuringLastChunkEndsTheRequest() throws Exception {
public void testHttpServerRequestPausedDuringLastChunk() throws Exception {
server.requestHandler(req -> {
AtomicBoolean ended = new AtomicBoolean();
AtomicBoolean paused = new AtomicBoolean();
req.handler(buff -> {
assertEquals("small", buff.toString());
req.pause();
paused.set(true);
vertx.setTimer(20, id -> {
assertFalse(ended.get());
paused.set(false);
req.resume();
});
});
req.endHandler(v -> {
assertFalse(paused.get());
ended.set(true);
req.response().end();
});
});
@@ -2707,6 +2717,36 @@ public abstract class HttpTest extends HttpTestBase {
await();
}
@Test
public void testHttpClientResponsePausedDuringLastChunk() throws Exception {
server.requestHandler(req -> {
req.response().end("small");
});
startServer();
client.close();
client = vertx.createHttpClient(createBaseClientOptions().setMaxPoolSize(1));
client.getNow(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/someuri", onSuccess(resp -> {
AtomicBoolean ended = new AtomicBoolean();
AtomicBoolean paused = new AtomicBoolean();
resp.handler(buff -> {
assertEquals("small", buff.toString());
resp.pause();
paused.set(true);
vertx.setTimer(20, id -> {
assertFalse(ended.get());
paused.set(false);
resp.resume();
});
});
resp.endHandler(v -> {
assertFalse(paused.get());
ended.set(true);
complete();
});
}));
await();
}
@Test
public void testFormUploadSmallFile() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(100), false);

View File

@@ -2694,4 +2694,26 @@ public class WebSocketTest extends VertxTestBase {
}));
await();
}
}
@Test
public void testPausedDuringLastChunk() {
server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT))
.websocketHandler(ws -> {
AtomicBoolean paused = new AtomicBoolean(true);
ws.pause();
ws.closeHandler(v -> {
paused.set(false);
ws.resume();
});
ws.endHandler(v -> {
assertFalse(paused.get());
testComplete();
});
})
.listen(onSuccess(v -> {
client.websocket(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/someuri", ws -> {
ws.close();
});
}));
await();
}}

View File

@@ -3383,6 +3383,28 @@ public class NetTest extends VertxTestBase {
awaitLatch(latch);
}
@Test
public void testPausedDuringLastChunk() throws Exception {
server.connectHandler(so -> {
AtomicBoolean paused = new AtomicBoolean();
paused.set(true);
so.pause();
so.closeHandler(v -> {
paused.set(false);
so.resume();
});
so.endHandler(v -> {
assertFalse(paused.get());
testComplete();
});
});
startServer();
client.connect(1234, "localhost", onSuccess(so -> {
so.close();
}));
await();
}
protected void startServer() throws Exception {
startServer(testAddress, vertx.getOrCreateContext());
}