diff --git a/src/main/java/io/vertx/core/http/impl/HttpServerFileUploadImpl.java b/src/main/java/io/vertx/core/http/impl/HttpServerFileUploadImpl.java index aa8125317..6ce902066 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpServerFileUploadImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpServerFileUploadImpl.java @@ -17,9 +17,8 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.file.AsyncFile; import io.vertx.core.file.OpenOptions; import io.vertx.core.http.HttpServerFileUpload; -import io.vertx.core.http.HttpServerRequest; import io.vertx.core.streams.Pump; -import io.vertx.core.streams.impl.InboundBuffer; +import io.vertx.core.streams.ReadStream; import java.nio.charset.Charset; @@ -34,7 +33,7 @@ import java.nio.charset.Charset; */ class HttpServerFileUploadImpl implements HttpServerFileUpload { - private final HttpServerRequest req; + private final ReadStream stream; private final Context context; private final String name; private final String filename; @@ -42,33 +41,78 @@ class HttpServerFileUploadImpl implements HttpServerFileUpload { private final String contentTransferEncoding; private final Charset charset; + private Handler dataHandler; private Handler endHandler; private AsyncFile file; private Handler exceptionHandler; private long size; - private InboundBuffer pending; - private boolean ended; - private boolean completed; private boolean lazyCalculateSize; - HttpServerFileUploadImpl(Context context, HttpServerRequest req, String name, String filename, String contentType, + HttpServerFileUploadImpl(Context context, ReadStream stream, String name, String filename, String contentType, String contentTransferEncoding, Charset charset, long size) { this.context = context; - this.req = req; + this.stream = stream; this.name = name; this.filename = filename; this.contentType = contentType; this.contentTransferEncoding = contentTransferEncoding; this.charset = charset; this.size = size; - this.pending = new InboundBuffer(context) - .drainHandler(v -> req.resume()) - .emptyHandler(v -> checkComplete()); if (size == 0) { lazyCalculateSize = true; } + + stream.handler(this::handleData); + stream.endHandler(this::handleEnd); + } + + private void handleData(Buffer data) { + Handler h; + synchronized (HttpServerFileUploadImpl.this) { + h = dataHandler; + if (lazyCalculateSize) { + this.size += data.length(); + } + } + if (h != null) { + h.handle(data); + } + } + + private void handleEnd(Void v) { + AsyncFile toClose; + synchronized (this) { + lazyCalculateSize = false; + toClose = file; + } + if (toClose != null) { + toClose.close(ar -> { + if (ar.failed()) { + notifyExceptionHandler(ar.cause()); + } + notifyEndHandler(); + }); + } else { + notifyEndHandler(); + } + } + + private void notifyEndHandler() { + Handler handler; + synchronized (this) { + handler = endHandler; + } + if (handler != null) { + handler.handle(null); + } + } + + private void notifyExceptionHandler(Throwable cause) { + if (exceptionHandler != null) { + exceptionHandler.handle(cause); + } } @Override @@ -102,27 +146,26 @@ class HttpServerFileUploadImpl implements HttpServerFileUpload { } @Override - public HttpServerFileUpload handler(Handler handler) { - pending.handler(handler); + public synchronized HttpServerFileUpload handler(Handler handler) { + dataHandler = handler; return this; } @Override public HttpServerFileUpload pause() { - pending.pause(); + stream.pause(); return this; } @Override public HttpServerFileUpload fetch(long amount) { - pending.resume(); + stream.fetch(amount); return this; } @Override public HttpServerFileUpload resume() { - pending.resume(); - checkComplete(); + stream.resume(); return this; } @@ -159,67 +202,6 @@ class HttpServerFileUploadImpl implements HttpServerFileUpload { return !lazyCalculateSize; } - synchronized void receiveData(Buffer data) { - if (data.length() != 0) { - // Can sometimes receive zero length packets from Netty! - if (lazyCalculateSize) { - size += data.length(); - } - doReceiveData(data); - } - } - - private synchronized void doReceiveData(Buffer data) { - if (!pending.write(data)) { - req.pause(); - } - } - - void end() { - synchronized (this) { - ended = true; - } - checkComplete(); - } - - private void checkComplete() { - AsyncFile toClose; - synchronized (this) { - if (!pending.isEmpty() || pending.isPaused() || !ended || completed) { - return; - } - completed = true; - lazyCalculateSize = false; - toClose = file; - } - if (toClose != null) { - toClose.close(ar -> { - if (ar.failed()) { - notifyExceptionHandler(ar.cause()); - } - notifyEndHandler(); - }); - } else { - notifyEndHandler(); - } - } - - private void notifyEndHandler() { - Handler handler; - synchronized (this) { - handler = endHandler; - } - if (handler != null) { - handler.handle(null); - } - } - - private void notifyExceptionHandler(Throwable cause) { - if (exceptionHandler != null) { - exceptionHandler.handle(cause); - } - } - @Override public synchronized AsyncFile file() { return file; diff --git a/src/main/java/io/vertx/core/http/impl/NettyFileUpload.java b/src/main/java/io/vertx/core/http/impl/NettyFileUpload.java index 7c83c5661..9b35279b8 100644 --- a/src/main/java/io/vertx/core/http/impl/NettyFileUpload.java +++ b/src/main/java/io/vertx/core/http/impl/NettyFileUpload.java @@ -14,7 +14,12 @@ package io.vertx.core.http.impl; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.multipart.FileUpload; import io.netty.handler.codec.http.multipart.InterfaceHttpData; +import io.vertx.core.Context; +import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.impl.InboundBuffer; import java.io.File; import java.io.IOException; @@ -24,9 +29,8 @@ import java.nio.charset.Charset; /** * @author Julien Viet */ -final class NettyFileUpload implements FileUpload { +final class NettyFileUpload implements FileUpload, ReadStream { - private final HttpServerFileUploadImpl upload; private final String name; private String contentType; private String filename; @@ -35,28 +39,108 @@ final class NettyFileUpload implements FileUpload { private boolean completed; private long maxSize = -1; - NettyFileUpload(HttpServerFileUploadImpl upload, String name, String filename, String contentType, String contentTransferEncoding, Charset charset) { - this.upload = upload; + private final HttpServerRequest request; + private final InboundBuffer pending; + private boolean ended; + private Handler endHandler; + private Handler exceptionHandler; + + NettyFileUpload(Context context, HttpServerRequest request, String name, String filename, String contentType, String contentTransferEncoding, Charset charset) { this.name = name; this.filename = filename; this.contentType = contentType; this.contentTransferEncoding = contentTransferEncoding; this.charset = charset; + + this.request = request; + this.pending = new InboundBuffer(context) + .drainHandler(v -> request.resume()) + .emptyHandler(v -> checkComplete()); + } + + @Override + public synchronized NettyFileUpload exceptionHandler(Handler handler) { + exceptionHandler = handler; + return this; + } + + @Override + public NettyFileUpload handler(Handler handler) { + pending.handler(handler); + return this; + } + + @Override + public NettyFileUpload pause() { + pending.pause(); + return this; + } + + @Override + public NettyFileUpload resume() { + return fetch(Long.MAX_VALUE); + } + + @Override + public NettyFileUpload fetch(long amount) { + pending.fetch(amount); + checkComplete(); + return this; + } + + @Override + public synchronized NettyFileUpload endHandler(Handler handler) { + endHandler = handler; + return this; + } + + private void receiveData(Buffer data) { + if (data.length() != 0) { + if (!pending.write(data)) { + request.pause(); + } + } + } + + private void end() { + synchronized (this) { + ended = true; + } + checkComplete(); + } + + private void checkComplete() { + synchronized (this) { + if (!pending.isEmpty() || pending.isPaused() || !ended) { + return; + } + } + notifyEndHandler(); + } + + private void notifyEndHandler() { + Handler handler; + synchronized (this) { + handler = endHandler; + } + if (handler != null) { + handler.handle(null); + } } @Override public void setContent(ByteBuf channelBuffer) throws IOException { completed = true; - upload.receiveData(Buffer.buffer(channelBuffer)); - upload.end(); + receiveData(Buffer.buffer(channelBuffer)); + end(); } @Override public void addContent(ByteBuf channelBuffer, boolean last) throws IOException { - upload.receiveData(Buffer.buffer(channelBuffer)); + receiveData(Buffer.buffer(channelBuffer)); if (last) { completed = true; - upload.end(); + end(); } } diff --git a/src/main/java/io/vertx/core/http/impl/NettyFileUploadDataFactory.java b/src/main/java/io/vertx/core/http/impl/NettyFileUploadDataFactory.java index 10d467743..560997696 100644 --- a/src/main/java/io/vertx/core/http/impl/NettyFileUploadDataFactory.java +++ b/src/main/java/io/vertx/core/http/impl/NettyFileUploadDataFactory.java @@ -41,10 +41,10 @@ class NettyFileUploadDataFactory extends DefaultHttpDataFactory { @Override public FileUpload createFileUpload(HttpRequest httpRequest, String name, String filename, String contentType, String contentTransferEncoding, Charset charset, long size) { - HttpServerFileUploadImpl upload = new HttpServerFileUploadImpl(context, request, name, filename, contentType, contentTransferEncoding, charset, - size); - NettyFileUpload nettyUpload = new NettyFileUpload(upload, name, filename, contentType, + NettyFileUpload nettyUpload = new NettyFileUpload(context, request, name, filename, contentType, contentTransferEncoding, charset); + HttpServerFileUploadImpl upload = new HttpServerFileUploadImpl(context, nettyUpload, name, filename, contentType, contentTransferEncoding, charset, + size); Handler uploadHandler = lazyUploadHandler.get(); if (uploadHandler != null) { uploadHandler.handle(upload); diff --git a/src/test/java/io/vertx/core/http/HttpTest.java b/src/test/java/io/vertx/core/http/HttpTest.java index c10966081..7fa324596 100644 --- a/src/test/java/io/vertx/core/http/HttpTest.java +++ b/src/test/java/io/vertx/core/http/HttpTest.java @@ -2769,7 +2769,7 @@ public abstract class HttpTest extends HttpTestBase { @Test public void testFormUploadEmptyFile() throws Exception { - testFormUploadFile("", false); + testFormUploadFile("", false, false); } @Test @@ -2789,7 +2789,7 @@ public abstract class HttpTest extends HttpTestBase { @Test public void testFormUploadEmptyFileStreamToDisk() throws Exception { - testFormUploadFile("", true); + testFormUploadFile("", true, false); } @Test