mirror of
https://github.com/jlengrand/vert.x.git
synced 2026-03-10 08:51:19 +00:00
HTTP event handler should not be called when holding a lock and signal end of stream using a sentinel. Fixes #2915 - Fixes #2916
This commit is contained in:
@@ -193,12 +193,12 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
|
||||
private final int id;
|
||||
private final Http1xClientConnection conn;
|
||||
private final Future<HttpClientStream> fut;
|
||||
private final InboundBuffer<Object> queue;
|
||||
private HttpClientRequestImpl request;
|
||||
private HttpClientResponseImpl response;
|
||||
private boolean requestEnded;
|
||||
private boolean responseEnded;
|
||||
private boolean reset;
|
||||
private InboundBuffer<Buffer> queue;
|
||||
private MultiMap trailers;
|
||||
private StreamImpl next;
|
||||
|
||||
@@ -345,12 +345,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
|
||||
|
||||
@Override
|
||||
public void doFetch(long amount) {
|
||||
synchronized (this) {
|
||||
if (queue.fetch(amount) || !responseEnded) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
response.handleEnd(trailers);
|
||||
queue.fetch(amount);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -460,10 +455,11 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
|
||||
}
|
||||
}
|
||||
}
|
||||
queue.handler(buf -> response.handleChunk(buf));
|
||||
queue.emptyHandler(v -> {
|
||||
if (responseEnded) {
|
||||
queue.handler(buf -> {
|
||||
if (buf == InboundBuffer.END_SENTINEL) {
|
||||
response.handleEnd(trailers);
|
||||
} else {
|
||||
response.handleChunk((Buffer) buf);
|
||||
}
|
||||
});
|
||||
queue.drainHandler(v -> {
|
||||
@@ -486,9 +482,11 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
|
||||
}
|
||||
}
|
||||
trailers = new HeadersAdaptor(trailer.trailingHeaders());
|
||||
if (queue.isEmpty() && !queue.isPaused()) {
|
||||
response.handleEnd(trailers);
|
||||
}
|
||||
conn.close |= !conn.options.isKeepAlive();
|
||||
conn.doResume();
|
||||
}
|
||||
queue.write(InboundBuffer.END_SENTINEL);
|
||||
synchronized (conn) {
|
||||
responseEnded = true;
|
||||
conn.close |= !conn.options.isKeepAlive();
|
||||
conn.doResume();
|
||||
|
||||
@@ -111,7 +111,7 @@ public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocket
|
||||
return metrics;
|
||||
}
|
||||
|
||||
public synchronized void handleMessage(Object msg) {
|
||||
public void handleMessage(Object msg) {
|
||||
if (msg instanceof HttpRequest) {
|
||||
DefaultHttpRequest request = (DefaultHttpRequest) msg;
|
||||
if (request.decoderResult() != DecoderResult.SUCCESS) {
|
||||
@@ -119,15 +119,17 @@ public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocket
|
||||
return;
|
||||
}
|
||||
HttpServerRequestImpl req = new HttpServerRequestImpl(this, request);
|
||||
requestInProgress = req;
|
||||
if (responseInProgress == null) {
|
||||
synchronized (this) {
|
||||
requestInProgress = req;
|
||||
if (responseInProgress != null) {
|
||||
// Deferred until the current response completion
|
||||
req.pause();
|
||||
responseInProgress.enqueue(req);
|
||||
return;
|
||||
}
|
||||
responseInProgress = requestInProgress;
|
||||
req.handleBegin();
|
||||
} else {
|
||||
// Deferred until the current response completion
|
||||
req.pause();
|
||||
responseInProgress.appendRequest(req);
|
||||
}
|
||||
req.handleBegin(requestHandler);
|
||||
} else if (msg == LastHttpContent.EMPTY_LAST_CONTENT) {
|
||||
handleEnd();
|
||||
} else if (msg instanceof HttpContent) {
|
||||
@@ -144,10 +146,14 @@ public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocket
|
||||
return;
|
||||
}
|
||||
Buffer buffer = Buffer.buffer(VertxHandler.safeBuffer(content.content(), chctx.alloc()));
|
||||
if (METRICS_ENABLED) {
|
||||
reportBytesRead(buffer);
|
||||
HttpServerRequestImpl request;
|
||||
synchronized (this) {
|
||||
if (METRICS_ENABLED) {
|
||||
reportBytesRead(buffer);
|
||||
}
|
||||
request = requestInProgress;
|
||||
}
|
||||
requestInProgress.handleContent(buffer);
|
||||
request.handleContent(buffer);
|
||||
//TODO chunk trailers
|
||||
if (content instanceof LastHttpContent) {
|
||||
handleEnd();
|
||||
@@ -155,11 +161,14 @@ public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocket
|
||||
}
|
||||
|
||||
private void handleEnd() {
|
||||
if (METRICS_ENABLED) {
|
||||
reportRequestComplete();
|
||||
HttpServerRequestImpl request;
|
||||
synchronized (this) {
|
||||
if (METRICS_ENABLED) {
|
||||
reportRequestComplete();
|
||||
}
|
||||
request = requestInProgress;
|
||||
requestInProgress = null;
|
||||
}
|
||||
HttpServerRequestImpl request = requestInProgress;
|
||||
requestInProgress = null;
|
||||
request.handleEnd();
|
||||
}
|
||||
|
||||
@@ -169,16 +178,19 @@ public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocket
|
||||
}
|
||||
HttpServerRequestImpl request = responseInProgress;
|
||||
responseInProgress = null;
|
||||
HttpServerRequestImpl next = request.nextRequest();
|
||||
HttpServerRequestImpl next = request.next();
|
||||
if (next != null) {
|
||||
// Handle pipelined request
|
||||
handleNext(next);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleNext(HttpServerRequestImpl request) {
|
||||
responseInProgress = request;
|
||||
getContext().runOnContext(v -> responseInProgress.handlePipelined());
|
||||
private void handleNext(HttpServerRequestImpl next) {
|
||||
responseInProgress = next;
|
||||
getContext().runOnContext(v -> {
|
||||
next.resume();
|
||||
next.handleBegin(requestHandler);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -171,34 +171,38 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
|
||||
}
|
||||
|
||||
void handleEnd(MultiMap trailers) {
|
||||
streamEnded = true;
|
||||
ended = true;
|
||||
conn.reportBytesRead(bytesRead);
|
||||
if (postRequestDecoder != null) {
|
||||
try {
|
||||
postRequestDecoder.offer(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
while (postRequestDecoder.hasNext()) {
|
||||
InterfaceHttpData data = postRequestDecoder.next();
|
||||
if (data instanceof Attribute) {
|
||||
Attribute attr = (Attribute) data;
|
||||
try {
|
||||
formAttributes().add(attr.getName(), attr.getValue());
|
||||
} catch (Exception e) {
|
||||
// Will never happen, anyway handle it somehow just in case
|
||||
handleException(e);
|
||||
Handler<Void> handler;
|
||||
synchronized (conn) {
|
||||
streamEnded = true;
|
||||
ended = true;
|
||||
conn.reportBytesRead(bytesRead);
|
||||
if (postRequestDecoder != null) {
|
||||
try {
|
||||
postRequestDecoder.offer(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
while (postRequestDecoder.hasNext()) {
|
||||
InterfaceHttpData data = postRequestDecoder.next();
|
||||
if (data instanceof Attribute) {
|
||||
Attribute attr = (Attribute) data;
|
||||
try {
|
||||
formAttributes().add(attr.getName(), attr.getValue());
|
||||
} catch (Exception e) {
|
||||
// Will never happen, anyway handle it somehow just in case
|
||||
handleException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (HttpPostRequestDecoder.EndOfDataDecoderException e) {
|
||||
// ignore this as it is expected
|
||||
} catch (Exception e) {
|
||||
handleException(e);
|
||||
} finally {
|
||||
postRequestDecoder.destroy();
|
||||
}
|
||||
} catch (HttpPostRequestDecoder.EndOfDataDecoderException e) {
|
||||
// ignore this as it is expected
|
||||
} catch (Exception e) {
|
||||
handleException(e);
|
||||
} finally {
|
||||
postRequestDecoder.destroy();
|
||||
}
|
||||
handler = endHandler;
|
||||
}
|
||||
if (endHandler != null) {
|
||||
endHandler.handle(null);
|
||||
if (handler != null) {
|
||||
handler.handle(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -225,15 +225,17 @@ public class HttpClientResponseImpl implements HttpClientResponse {
|
||||
}
|
||||
|
||||
void handleChunk(Buffer data) {
|
||||
Handler<Buffer> handler;
|
||||
synchronized (conn) {
|
||||
request.dataReceived();
|
||||
bytesRead += data.length();
|
||||
if (dataHandler != null) {
|
||||
try {
|
||||
dataHandler.handle(data);
|
||||
} catch (Throwable t) {
|
||||
handleException(t);
|
||||
}
|
||||
handler = dataHandler;
|
||||
}
|
||||
if (handler != null) {
|
||||
try {
|
||||
handler.handle(data);
|
||||
} catch (Throwable t) {
|
||||
handleException(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,6 @@ import java.net.URISyntaxException;
|
||||
import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED;
|
||||
import static io.netty.handler.codec.http.HttpHeaderValues.MULTIPART_FORM_DATA;
|
||||
import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED;
|
||||
import static io.vertx.core.http.impl.HttpUtils.SC_SWITCHING_PROTOCOLS;
|
||||
|
||||
/**
|
||||
* This class is optimised for performance when used on the same event loop that is was passed to the handler with.
|
||||
@@ -85,8 +84,7 @@ public class HttpServerRequestImpl implements HttpServerRequest {
|
||||
private HttpPostRequestDecoder decoder;
|
||||
private boolean ended;
|
||||
private long bytesRead;
|
||||
|
||||
private InboundBuffer<Buffer> pending;
|
||||
private InboundBuffer<Object> pending;
|
||||
|
||||
HttpServerRequestImpl(Http1xServerConnection conn, HttpRequest request) {
|
||||
this.conn = conn;
|
||||
@@ -109,37 +107,38 @@ public class HttpServerRequestImpl implements HttpServerRequest {
|
||||
}
|
||||
}
|
||||
|
||||
private InboundBuffer<Buffer> pendingQueue() {
|
||||
private InboundBuffer<Object> pendingQueue() {
|
||||
if (pending == null) {
|
||||
pending = new InboundBuffer<>(conn.getContext(), 8);
|
||||
pending.drainHandler(v -> conn.doResume());
|
||||
pending.emptyHandler(v -> {
|
||||
if (ended) {
|
||||
doEnd();
|
||||
pending.handler(buffer -> {
|
||||
if (buffer == InboundBuffer.END_SENTINEL) {
|
||||
onEnd();
|
||||
} else {
|
||||
onData((Buffer) buffer);
|
||||
}
|
||||
});
|
||||
pending.handler(this::handleData);
|
||||
}
|
||||
return pending;
|
||||
}
|
||||
|
||||
private void enqueueData(Buffer chunk) {
|
||||
// We queue requests if paused or a request is in progress to prevent responses being written in the wrong order
|
||||
if (!pendingQueue().write(chunk)) {
|
||||
// We only pause when we are actively called by the connection
|
||||
conn.doPause();
|
||||
}
|
||||
}
|
||||
|
||||
void handleContent(Buffer buffer) {
|
||||
if (pending != null) {
|
||||
enqueueData(buffer);
|
||||
InboundBuffer<Object> queue;
|
||||
synchronized (conn) {
|
||||
queue = pending;
|
||||
}
|
||||
if (queue != null) {
|
||||
// We queue requests if paused or a request is in progress to prevent responses being written in the wrong order
|
||||
if (!queue.write(buffer)) {
|
||||
// We only pause when we are actively called by the connection
|
||||
conn.doPause();
|
||||
}
|
||||
} else {
|
||||
handleData(buffer);
|
||||
onData(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
void handleBegin() {
|
||||
void handleBegin(Handler<HttpServerRequest> handler) {
|
||||
if (Metrics.METRICS_ENABLED) {
|
||||
reportRequestBegin();
|
||||
}
|
||||
@@ -147,33 +146,29 @@ public class HttpServerRequestImpl implements HttpServerRequest {
|
||||
if (conn.handle100ContinueAutomatically) {
|
||||
check100();
|
||||
}
|
||||
conn.requestHandler.handle(this);
|
||||
handler.handle(this);
|
||||
}
|
||||
|
||||
void appendRequest(HttpServerRequestImpl next) {
|
||||
/**
|
||||
* Enqueue a pipelined request.
|
||||
*
|
||||
* @param request the enqueued request
|
||||
*/
|
||||
void enqueue(HttpServerRequestImpl request) {
|
||||
HttpServerRequestImpl current = this;
|
||||
while (current.next != null) {
|
||||
current = current.next;
|
||||
}
|
||||
current.next = next;
|
||||
current.next = request;
|
||||
}
|
||||
|
||||
HttpServerRequestImpl nextRequest() {
|
||||
/**
|
||||
* @return the next request following this one
|
||||
*/
|
||||
HttpServerRequestImpl next() {
|
||||
return next;
|
||||
}
|
||||
|
||||
void handlePipelined() {
|
||||
boolean end = ended;
|
||||
ended = false;
|
||||
handleBegin();
|
||||
if (pending != null && pending.isPaused()) {
|
||||
pending.resume();
|
||||
}
|
||||
if (end) {
|
||||
handleEnd();
|
||||
}
|
||||
}
|
||||
|
||||
private void reportRequestBegin() {
|
||||
if (conn.metrics != null) {
|
||||
metric = conn.metrics.requestBegin(conn.metric(), this);
|
||||
@@ -326,9 +321,7 @@ public class HttpServerRequestImpl implements HttpServerRequest {
|
||||
@Override
|
||||
public HttpServerRequest pause() {
|
||||
synchronized (conn) {
|
||||
if (!isEnded()) {
|
||||
pendingQueue().pause();
|
||||
}
|
||||
pendingQueue().pause();
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -336,15 +329,7 @@ public class HttpServerRequestImpl implements HttpServerRequest {
|
||||
@Override
|
||||
public HttpServerRequest fetch(long amount) {
|
||||
synchronized (conn) {
|
||||
if (!isEnded()) {
|
||||
if (ended) {
|
||||
if (!pending.fetch(amount)) {
|
||||
doEnd();
|
||||
}
|
||||
} else if (pending != null) {
|
||||
pending.fetch(amount);
|
||||
}
|
||||
}
|
||||
pendingQueue().fetch(amount);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -500,7 +485,8 @@ public class HttpServerRequestImpl implements HttpServerRequest {
|
||||
return conn;
|
||||
}
|
||||
|
||||
private void handleData(Buffer data) {
|
||||
private void onData(Buffer data) {
|
||||
Handler<Buffer> handler;
|
||||
synchronized (conn) {
|
||||
bytesRead += data.length();
|
||||
if (decoder != null) {
|
||||
@@ -510,28 +496,37 @@ public class HttpServerRequestImpl implements HttpServerRequest {
|
||||
handleException(e);
|
||||
}
|
||||
}
|
||||
if (dataHandler != null) {
|
||||
dataHandler.handle(data);
|
||||
}
|
||||
handler = dataHandler;
|
||||
}
|
||||
if (handler != null) {
|
||||
handler.handle(data);
|
||||
}
|
||||
}
|
||||
|
||||
void handleEnd() {
|
||||
InboundBuffer<Object> queue;
|
||||
synchronized (conn) {
|
||||
ended = true;
|
||||
if (isEnded()) {
|
||||
doEnd();
|
||||
}
|
||||
queue = pending;
|
||||
}
|
||||
if (queue != null) {
|
||||
queue.write(InboundBuffer.END_SENTINEL);
|
||||
} else {
|
||||
onEnd();
|
||||
}
|
||||
}
|
||||
|
||||
private void doEnd() {
|
||||
if (decoder != null) {
|
||||
endDecode();
|
||||
private void onEnd() {
|
||||
Handler<Void> handler;
|
||||
synchronized (conn) {
|
||||
if (decoder != null) {
|
||||
endDecode();
|
||||
}
|
||||
handler = endHandler;
|
||||
}
|
||||
// If there have been uploads then we let the last one call the end handler once any fileuploads are complete
|
||||
if (endHandler != null) {
|
||||
endHandler.handle(null);
|
||||
if (handler != null) {
|
||||
handler.handle(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -38,12 +38,11 @@ final class NettyFileUpload implements FileUpload, ReadStream<Buffer> {
|
||||
private Charset charset;
|
||||
private boolean completed;
|
||||
private long maxSize = -1;
|
||||
|
||||
private final HttpServerRequest request;
|
||||
private final InboundBuffer<Buffer> pending;
|
||||
private boolean ended;
|
||||
private final InboundBuffer<Object> pending;
|
||||
private Handler<Void> endHandler;
|
||||
private Handler<Throwable> exceptionHandler;
|
||||
private Handler<Buffer> dataHandler;
|
||||
|
||||
NettyFileUpload(Context context, HttpServerRequest request, String name, String filename, String contentType, String contentTransferEncoding, Charset charset) {
|
||||
this.name = name;
|
||||
@@ -51,11 +50,22 @@ final class NettyFileUpload implements FileUpload, ReadStream<Buffer> {
|
||||
this.contentType = contentType;
|
||||
this.contentTransferEncoding = contentTransferEncoding;
|
||||
this.charset = charset;
|
||||
|
||||
this.request = request;
|
||||
this.pending = new InboundBuffer<Buffer>(context)
|
||||
this.pending = new InboundBuffer<>(context)
|
||||
.drainHandler(v -> request.resume())
|
||||
.emptyHandler(v -> checkComplete());
|
||||
.handler(buff -> {
|
||||
if (buff == InboundBuffer.END_SENTINEL) {
|
||||
Handler<Void> handler = endHandler();
|
||||
if (handler != null) {
|
||||
handler.handle(null);
|
||||
}
|
||||
} else {
|
||||
Handler<Buffer> handler = handler();
|
||||
if (handler != null) {
|
||||
handler.handle((Buffer) buff);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -64,9 +74,13 @@ final class NettyFileUpload implements FileUpload, ReadStream<Buffer> {
|
||||
return this;
|
||||
}
|
||||
|
||||
private Handler<Buffer> handler() {
|
||||
return dataHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NettyFileUpload handler(Handler<Buffer> handler) {
|
||||
pending.handler(handler);
|
||||
public synchronized NettyFileUpload handler(Handler<Buffer> handler) {
|
||||
dataHandler = handler;
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -84,10 +98,13 @@ final class NettyFileUpload implements FileUpload, ReadStream<Buffer> {
|
||||
@Override
|
||||
public NettyFileUpload fetch(long amount) {
|
||||
pending.fetch(amount);
|
||||
checkComplete();
|
||||
return this;
|
||||
}
|
||||
|
||||
private synchronized Handler<Void> endHandler() {
|
||||
return endHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized NettyFileUpload endHandler(Handler<Void> handler) {
|
||||
endHandler = handler;
|
||||
@@ -103,29 +120,7 @@ final class NettyFileUpload implements FileUpload, ReadStream<Buffer> {
|
||||
}
|
||||
|
||||
private void end() {
|
||||
synchronized (this) {
|
||||
ended = true;
|
||||
}
|
||||
checkComplete();
|
||||
}
|
||||
|
||||
private void checkComplete() {
|
||||
synchronized (this) {
|
||||
if (!pending.isEmpty() || pending.isPaused() || !ended) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
notifyEndHandler();
|
||||
}
|
||||
|
||||
private void notifyEndHandler() {
|
||||
Handler<Void> handler;
|
||||
synchronized (this) {
|
||||
handler = endHandler;
|
||||
}
|
||||
if (handler != null) {
|
||||
handler.handle(null);
|
||||
}
|
||||
pending.write(InboundBuffer.END_SENTINEL);
|
||||
}
|
||||
|
||||
public void handleException(Throwable err) {
|
||||
|
||||
@@ -36,7 +36,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
|
||||
protected final ChannelHandlerContext handlerContext;
|
||||
protected final Http2Stream stream;
|
||||
|
||||
private final InboundBuffer<Buffer> pending;
|
||||
private final InboundBuffer<Object> pending;
|
||||
private int pendingBytes;
|
||||
private MultiMap trailers;
|
||||
private boolean writable;
|
||||
@@ -58,13 +58,14 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
|
||||
conn.handler.consume(stream, numBytes);
|
||||
});
|
||||
|
||||
pending.handler(this::handleData);
|
||||
pending.exceptionHandler(context.exceptionHandler());
|
||||
pending.emptyHandler(v -> {
|
||||
if (trailers != null) {
|
||||
pending.handler(buff -> {
|
||||
if (buff == InboundBuffer.END_SENTINEL) {
|
||||
handleEnd(trailers);
|
||||
} else {
|
||||
handleData((Buffer) buff);
|
||||
}
|
||||
});
|
||||
pending.exceptionHandler(context.exceptionHandler());
|
||||
|
||||
pending.resume();
|
||||
}
|
||||
@@ -95,10 +96,8 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
|
||||
void onEnd(MultiMap map) {
|
||||
synchronized (conn) {
|
||||
trailers = map;
|
||||
if (pending.isEmpty() && !pending.isPaused()) {
|
||||
handleEnd(trailers);
|
||||
}
|
||||
}
|
||||
pending.write(InboundBuffer.END_SENTINEL);
|
||||
}
|
||||
|
||||
int id() {
|
||||
@@ -110,11 +109,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
|
||||
}
|
||||
|
||||
public void doFetch(long amount) {
|
||||
if (!pending.fetch(amount)) {
|
||||
if (trailers != null) {
|
||||
handleEnd(trailers);
|
||||
}
|
||||
}
|
||||
pending.fetch(amount);
|
||||
}
|
||||
|
||||
boolean isNotWritable() {
|
||||
|
||||
@@ -73,6 +73,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
|
||||
private Handler<Void> drainHandler;
|
||||
private InboundBuffer<Object> pending;
|
||||
private MessageConsumer registration;
|
||||
private Handler<Object> messageHandler;
|
||||
private boolean closed;
|
||||
|
||||
public NetSocketImpl(VertxInternal vertx, ChannelHandlerContext channel, ContextInternal context,
|
||||
@@ -87,10 +88,22 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
|
||||
this.writeHandlerID = "__vertx.net." + UUID.randomUUID().toString();
|
||||
this.remoteAddress = remoteAddress;
|
||||
this.metrics = metrics;
|
||||
this.messageHandler = NULL_MSG_HANDLER;
|
||||
pending = new InboundBuffer<>(context);
|
||||
pending.drainHandler(v -> doResume());
|
||||
pending.handler(NULL_MSG_HANDLER);
|
||||
pending.emptyHandler(v -> checkEnd());
|
||||
pending.handler(obj -> {
|
||||
if (obj == InboundBuffer.END_SENTINEL) {
|
||||
Handler<Void> handler = endHandler();
|
||||
if (handler != null) {
|
||||
handler.handle(null);
|
||||
}
|
||||
} else {
|
||||
Handler<Object> handler = messageHandler();
|
||||
if (handler != null) {
|
||||
handler.handle(obj);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
synchronized void registerEventBusHandler() {
|
||||
@@ -175,13 +188,13 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
|
||||
return this;
|
||||
}
|
||||
|
||||
private synchronized Handler<Object> messageHandler() {
|
||||
return messageHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized NetSocketInternal messageHandler(Handler<Object> handler) {
|
||||
if (handler != null) {
|
||||
pending.handler(handler);
|
||||
} else {
|
||||
pending.handler(NULL_MSG_HANDLER);
|
||||
}
|
||||
messageHandler = handler;
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -193,9 +206,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
|
||||
|
||||
@Override
|
||||
public NetSocket fetch(long amount) {
|
||||
if (!pending.fetch(amount)) {
|
||||
checkEnd();
|
||||
}
|
||||
pending.fetch(amount);
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -215,6 +226,10 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
|
||||
return isNotWritable();
|
||||
}
|
||||
|
||||
private synchronized Handler<Void> endHandler() {
|
||||
return endHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized NetSocket endHandler(Handler<Void> endHandler) {
|
||||
this.endHandler = endHandler;
|
||||
@@ -349,25 +364,14 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
|
||||
consumer = registration;
|
||||
registration = null;
|
||||
}
|
||||
checkEnd();
|
||||
pending.write(InboundBuffer.END_SENTINEL);
|
||||
super.handleClosed();
|
||||
if (consumer != null) {
|
||||
consumer.unregister();
|
||||
}
|
||||
}
|
||||
|
||||
private void checkEnd() {
|
||||
Handler<Void> handler;
|
||||
synchronized (this) {
|
||||
if (!closed || pending.isPaused() || (handler = endHandler) == null) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
handler.handle(null);
|
||||
}
|
||||
|
||||
public synchronized void handleMessage(Object msg) {
|
||||
checkContext();
|
||||
public void handleMessage(Object msg) {
|
||||
if (!pending.write(msg)) {
|
||||
doPause();
|
||||
}
|
||||
|
||||
@@ -65,6 +65,11 @@ import java.util.Objects;
|
||||
*/
|
||||
public class InboundBuffer<E> {
|
||||
|
||||
/**
|
||||
* A reusable sentinel for signaling the end of a stream.
|
||||
*/
|
||||
public static final Object END_SENTINEL = new Object();
|
||||
|
||||
private final Context context;
|
||||
private final ArrayDeque<E> pending;
|
||||
private final long highWaterMark;
|
||||
|
||||
@@ -1341,6 +1341,39 @@ public class Http1xTest extends HttpTest {
|
||||
await();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPipeliningPauseRequest() throws Exception {
|
||||
int n = 10;
|
||||
client.close();
|
||||
client = vertx.createHttpClient(new HttpClientOptions().setKeepAlive(true).setPipelining(true).setMaxPoolSize(1));
|
||||
server.requestHandler(req -> {
|
||||
AtomicBoolean paused = new AtomicBoolean();
|
||||
paused.set(true);
|
||||
req.pause();
|
||||
req.bodyHandler(buff -> {
|
||||
assertFalse(paused.get());
|
||||
req.response().end();
|
||||
});
|
||||
vertx.setTimer(30, id -> {
|
||||
paused.set(false);
|
||||
req.resume();
|
||||
});
|
||||
});
|
||||
startServer();
|
||||
AtomicInteger remaining = new AtomicInteger(n);
|
||||
for (int i = 0;i < n;i++) {
|
||||
HttpClientRequest req = client.put(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
|
||||
resp.endHandler(v -> {
|
||||
if (remaining.decrementAndGet() == 0) {
|
||||
testComplete();
|
||||
}
|
||||
});
|
||||
});
|
||||
req.end(TestUtils.randomAlphaString(16));
|
||||
}
|
||||
await();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerPipeliningConnectionConcurrency() throws Exception {
|
||||
int n = 5;
|
||||
|
||||
@@ -121,7 +121,12 @@ public abstract class HttpMetricsTestBase extends HttpTestBase {
|
||||
AsyncTestBase.assertWaitUntil(() -> metrics.endpoints().isEmpty());
|
||||
assertEquals(null, metrics.connectionCount("localhost:8080"));
|
||||
AsyncTestBase.assertWaitUntil(() -> !serverMetric.get().socket.connected.get());
|
||||
AsyncTestBase.assertWaitUntil(() -> contentLength == serverMetric.get().socket.bytesRead.get());
|
||||
try {
|
||||
AsyncTestBase.assertWaitUntil(() -> contentLength == serverMetric.get().socket.bytesRead.get());
|
||||
} catch (Exception e) {
|
||||
System.out.println(contentLength + " == " + serverMetric.get().socket.bytesRead.get());
|
||||
throw e;
|
||||
}
|
||||
AsyncTestBase.assertWaitUntil(() -> contentLength == serverMetric.get().socket.bytesWritten.get());
|
||||
AsyncTestBase.assertWaitUntil(() -> !clientMetric.get().socket.connected.get());
|
||||
assertEquals(contentLength, clientMetric.get().socket.bytesRead.get());
|
||||
|
||||
@@ -4276,7 +4276,93 @@ public abstract class HttpTest extends HttpTestBase {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerResponseCloseHandlerNotHoldingLock() throws Exception {
|
||||
public void testEventHandlersNotHoldingLock() throws Exception {
|
||||
waitFor(2);
|
||||
server.requestHandler(req -> {
|
||||
HttpConnection conn = req.connection();
|
||||
switch (req.path()) {
|
||||
case "/0":
|
||||
req.handler(chunk -> {
|
||||
assertFalse(Thread.holdsLock(conn));
|
||||
});
|
||||
req.endHandler(v -> {
|
||||
assertFalse(Thread.holdsLock(conn));
|
||||
req.response().end(TestUtils.randomAlphaString(256));
|
||||
});
|
||||
break;
|
||||
case "/1":
|
||||
AtomicBoolean paused = new AtomicBoolean();
|
||||
req.pause();
|
||||
paused.set(true);
|
||||
vertx.runOnContext(v -> {
|
||||
paused.set(false);
|
||||
req.resume();
|
||||
});
|
||||
req.handler(chunk -> {
|
||||
assertFalse(Thread.holdsLock(conn));
|
||||
assertFalse(paused.get());
|
||||
paused.set(true);
|
||||
req.pause();
|
||||
vertx.runOnContext(v -> {
|
||||
paused.set(false);
|
||||
req.resume();
|
||||
});
|
||||
});
|
||||
req.endHandler(v -> {
|
||||
assertFalse(Thread.holdsLock(conn));
|
||||
assertFalse(paused.get());
|
||||
req.response().end(TestUtils.randomAlphaString(256));
|
||||
});
|
||||
break;
|
||||
}
|
||||
});
|
||||
startServer();
|
||||
for (int i = 0;i < 2;i++) {
|
||||
client.post(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/" + i, resp -> {
|
||||
assertEquals(200, resp.statusCode());
|
||||
HttpConnection conn = resp.request().connection();
|
||||
switch (resp.request().path()) {
|
||||
case "/0":
|
||||
resp.handler(chunk -> {
|
||||
assertFalse(Thread.holdsLock(conn));
|
||||
});
|
||||
resp.endHandler(v -> {
|
||||
assertFalse(Thread.holdsLock(conn));
|
||||
complete();
|
||||
});
|
||||
break;
|
||||
case "/1":
|
||||
AtomicBoolean paused = new AtomicBoolean();
|
||||
resp.pause();
|
||||
paused.set(true);
|
||||
vertx.runOnContext(v -> {
|
||||
paused.set(false);
|
||||
resp.resume();
|
||||
});
|
||||
resp.handler(chunk -> {
|
||||
assertFalse(Thread.holdsLock(conn));
|
||||
assertFalse(paused.get());
|
||||
paused.set(true);
|
||||
resp.pause();
|
||||
vertx.runOnContext(v -> {
|
||||
paused.set(false);
|
||||
resp.resume();
|
||||
});
|
||||
});
|
||||
resp.endHandler(v -> {
|
||||
assertFalse(Thread.holdsLock(conn));
|
||||
assertFalse(paused.get());
|
||||
complete();
|
||||
});
|
||||
break;
|
||||
}
|
||||
}).end(TestUtils.randomAlphaString(256));
|
||||
}
|
||||
await();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventHandlersNotHoldingLockOnClose() throws Exception {
|
||||
waitFor(7);
|
||||
server.requestHandler(req -> {
|
||||
HttpConnection conn = req.connection();
|
||||
|
||||
@@ -928,33 +928,6 @@ public class WebSocketTest extends VertxTestBase {
|
||||
await();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteFromConnectHandlerFromAnotherThread() {
|
||||
Buffer expected = Buffer.buffer("AAA");
|
||||
server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT));
|
||||
server.websocketHandler(ws -> {
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
ws.writeFrame(WebSocketFrame.binaryFrame(expected, true));
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
while (t.getState() != Thread.State.BLOCKED) {
|
||||
Thread.yield();
|
||||
}
|
||||
});
|
||||
server.listen(onSuccess(server -> {
|
||||
client.websocket(DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/", ws -> {
|
||||
ws.handler(buff -> {
|
||||
assertEquals(buff, expected);
|
||||
testComplete();
|
||||
});
|
||||
});
|
||||
}));
|
||||
await();
|
||||
}
|
||||
|
||||
@Test
|
||||
// Test normal negotiation of websocket compression
|
||||
public void testNormalWSDeflateFrameCompressionNegotiation() throws Exception {
|
||||
|
||||
@@ -262,7 +262,8 @@ public class MetricsTest extends VertxTestBase {
|
||||
fail("Should not receive message");
|
||||
});
|
||||
consumer.completionHandler(ar -> {
|
||||
assertEquals(Collections.emptyList(), metrics.getRegistrations());
|
||||
List<HandlerMetric> registrations = metrics.getRegistrations();
|
||||
assertEquals(Collections.emptyList(), registrations);
|
||||
testComplete();
|
||||
});
|
||||
consumer.unregister();
|
||||
|
||||
Reference in New Issue
Block a user