mirror of
https://github.com/jlengrand/vert.x.git
synced 2026-03-10 08:51:19 +00:00
Rework file upload so that NettyFileUpload is a ReadStream<Buffer>
This commit is contained in:
@@ -17,9 +17,8 @@ import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.file.AsyncFile;
|
||||
import io.vertx.core.file.OpenOptions;
|
||||
import io.vertx.core.http.HttpServerFileUpload;
|
||||
import io.vertx.core.http.HttpServerRequest;
|
||||
import io.vertx.core.streams.Pump;
|
||||
import io.vertx.core.streams.impl.InboundBuffer;
|
||||
import io.vertx.core.streams.ReadStream;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
@@ -34,7 +33,7 @@ import java.nio.charset.Charset;
|
||||
*/
|
||||
class HttpServerFileUploadImpl implements HttpServerFileUpload {
|
||||
|
||||
private final HttpServerRequest req;
|
||||
private final ReadStream<Buffer> stream;
|
||||
private final Context context;
|
||||
private final String name;
|
||||
private final String filename;
|
||||
@@ -42,33 +41,78 @@ class HttpServerFileUploadImpl implements HttpServerFileUpload {
|
||||
private final String contentTransferEncoding;
|
||||
private final Charset charset;
|
||||
|
||||
private Handler<Buffer> dataHandler;
|
||||
private Handler<Void> endHandler;
|
||||
private AsyncFile file;
|
||||
private Handler<Throwable> exceptionHandler;
|
||||
|
||||
private long size;
|
||||
private InboundBuffer<Buffer> pending;
|
||||
private boolean ended;
|
||||
private boolean completed;
|
||||
private boolean lazyCalculateSize;
|
||||
|
||||
HttpServerFileUploadImpl(Context context, HttpServerRequest req, String name, String filename, String contentType,
|
||||
HttpServerFileUploadImpl(Context context, ReadStream<Buffer> stream, String name, String filename, String contentType,
|
||||
String contentTransferEncoding,
|
||||
Charset charset, long size) {
|
||||
this.context = context;
|
||||
this.req = req;
|
||||
this.stream = stream;
|
||||
this.name = name;
|
||||
this.filename = filename;
|
||||
this.contentType = contentType;
|
||||
this.contentTransferEncoding = contentTransferEncoding;
|
||||
this.charset = charset;
|
||||
this.size = size;
|
||||
this.pending = new InboundBuffer<Buffer>(context)
|
||||
.drainHandler(v -> req.resume())
|
||||
.emptyHandler(v -> checkComplete());
|
||||
if (size == 0) {
|
||||
lazyCalculateSize = true;
|
||||
}
|
||||
|
||||
stream.handler(this::handleData);
|
||||
stream.endHandler(this::handleEnd);
|
||||
}
|
||||
|
||||
private void handleData(Buffer data) {
|
||||
Handler<Buffer> h;
|
||||
synchronized (HttpServerFileUploadImpl.this) {
|
||||
h = dataHandler;
|
||||
if (lazyCalculateSize) {
|
||||
this.size += data.length();
|
||||
}
|
||||
}
|
||||
if (h != null) {
|
||||
h.handle(data);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleEnd(Void v) {
|
||||
AsyncFile toClose;
|
||||
synchronized (this) {
|
||||
lazyCalculateSize = false;
|
||||
toClose = file;
|
||||
}
|
||||
if (toClose != null) {
|
||||
toClose.close(ar -> {
|
||||
if (ar.failed()) {
|
||||
notifyExceptionHandler(ar.cause());
|
||||
}
|
||||
notifyEndHandler();
|
||||
});
|
||||
} else {
|
||||
notifyEndHandler();
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyEndHandler() {
|
||||
Handler<Void> handler;
|
||||
synchronized (this) {
|
||||
handler = endHandler;
|
||||
}
|
||||
if (handler != null) {
|
||||
handler.handle(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyExceptionHandler(Throwable cause) {
|
||||
if (exceptionHandler != null) {
|
||||
exceptionHandler.handle(cause);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -102,27 +146,26 @@ class HttpServerFileUploadImpl implements HttpServerFileUpload {
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpServerFileUpload handler(Handler<Buffer> handler) {
|
||||
pending.handler(handler);
|
||||
public synchronized HttpServerFileUpload handler(Handler<Buffer> handler) {
|
||||
dataHandler = handler;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpServerFileUpload pause() {
|
||||
pending.pause();
|
||||
stream.pause();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpServerFileUpload fetch(long amount) {
|
||||
pending.resume();
|
||||
stream.fetch(amount);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpServerFileUpload resume() {
|
||||
pending.resume();
|
||||
checkComplete();
|
||||
stream.resume();
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -159,67 +202,6 @@ class HttpServerFileUploadImpl implements HttpServerFileUpload {
|
||||
return !lazyCalculateSize;
|
||||
}
|
||||
|
||||
synchronized void receiveData(Buffer data) {
|
||||
if (data.length() != 0) {
|
||||
// Can sometimes receive zero length packets from Netty!
|
||||
if (lazyCalculateSize) {
|
||||
size += data.length();
|
||||
}
|
||||
doReceiveData(data);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void doReceiveData(Buffer data) {
|
||||
if (!pending.write(data)) {
|
||||
req.pause();
|
||||
}
|
||||
}
|
||||
|
||||
void end() {
|
||||
synchronized (this) {
|
||||
ended = true;
|
||||
}
|
||||
checkComplete();
|
||||
}
|
||||
|
||||
private void checkComplete() {
|
||||
AsyncFile toClose;
|
||||
synchronized (this) {
|
||||
if (!pending.isEmpty() || pending.isPaused() || !ended || completed) {
|
||||
return;
|
||||
}
|
||||
completed = true;
|
||||
lazyCalculateSize = false;
|
||||
toClose = file;
|
||||
}
|
||||
if (toClose != null) {
|
||||
toClose.close(ar -> {
|
||||
if (ar.failed()) {
|
||||
notifyExceptionHandler(ar.cause());
|
||||
}
|
||||
notifyEndHandler();
|
||||
});
|
||||
} else {
|
||||
notifyEndHandler();
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyEndHandler() {
|
||||
Handler<Void> handler;
|
||||
synchronized (this) {
|
||||
handler = endHandler;
|
||||
}
|
||||
if (handler != null) {
|
||||
handler.handle(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyExceptionHandler(Throwable cause) {
|
||||
if (exceptionHandler != null) {
|
||||
exceptionHandler.handle(cause);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized AsyncFile file() {
|
||||
return file;
|
||||
|
||||
@@ -14,7 +14,12 @@ package io.vertx.core.http.impl;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.handler.codec.http.multipart.FileUpload;
|
||||
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
|
||||
import io.vertx.core.Context;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.http.HttpServerRequest;
|
||||
import io.vertx.core.streams.ReadStream;
|
||||
import io.vertx.core.streams.impl.InboundBuffer;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
@@ -24,9 +29,8 @@ import java.nio.charset.Charset;
|
||||
/**
|
||||
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
|
||||
*/
|
||||
final class NettyFileUpload implements FileUpload {
|
||||
final class NettyFileUpload implements FileUpload, ReadStream<Buffer> {
|
||||
|
||||
private final HttpServerFileUploadImpl upload;
|
||||
private final String name;
|
||||
private String contentType;
|
||||
private String filename;
|
||||
@@ -35,28 +39,108 @@ final class NettyFileUpload implements FileUpload {
|
||||
private boolean completed;
|
||||
private long maxSize = -1;
|
||||
|
||||
NettyFileUpload(HttpServerFileUploadImpl upload, String name, String filename, String contentType, String contentTransferEncoding, Charset charset) {
|
||||
this.upload = upload;
|
||||
private final HttpServerRequest request;
|
||||
private final InboundBuffer<Buffer> pending;
|
||||
private boolean ended;
|
||||
private Handler<Void> endHandler;
|
||||
private Handler<Throwable> exceptionHandler;
|
||||
|
||||
NettyFileUpload(Context context, HttpServerRequest request, String name, String filename, String contentType, String contentTransferEncoding, Charset charset) {
|
||||
this.name = name;
|
||||
this.filename = filename;
|
||||
this.contentType = contentType;
|
||||
this.contentTransferEncoding = contentTransferEncoding;
|
||||
this.charset = charset;
|
||||
|
||||
this.request = request;
|
||||
this.pending = new InboundBuffer<Buffer>(context)
|
||||
.drainHandler(v -> request.resume())
|
||||
.emptyHandler(v -> checkComplete());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized NettyFileUpload exceptionHandler(Handler<Throwable> handler) {
|
||||
exceptionHandler = handler;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NettyFileUpload handler(Handler<Buffer> handler) {
|
||||
pending.handler(handler);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NettyFileUpload pause() {
|
||||
pending.pause();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NettyFileUpload resume() {
|
||||
return fetch(Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NettyFileUpload fetch(long amount) {
|
||||
pending.fetch(amount);
|
||||
checkComplete();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized NettyFileUpload endHandler(Handler<Void> handler) {
|
||||
endHandler = handler;
|
||||
return this;
|
||||
}
|
||||
|
||||
private void receiveData(Buffer data) {
|
||||
if (data.length() != 0) {
|
||||
if (!pending.write(data)) {
|
||||
request.pause();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void end() {
|
||||
synchronized (this) {
|
||||
ended = true;
|
||||
}
|
||||
checkComplete();
|
||||
}
|
||||
|
||||
private void checkComplete() {
|
||||
synchronized (this) {
|
||||
if (!pending.isEmpty() || pending.isPaused() || !ended) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
notifyEndHandler();
|
||||
}
|
||||
|
||||
private void notifyEndHandler() {
|
||||
Handler<Void> handler;
|
||||
synchronized (this) {
|
||||
handler = endHandler;
|
||||
}
|
||||
if (handler != null) {
|
||||
handler.handle(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setContent(ByteBuf channelBuffer) throws IOException {
|
||||
completed = true;
|
||||
upload.receiveData(Buffer.buffer(channelBuffer));
|
||||
upload.end();
|
||||
receiveData(Buffer.buffer(channelBuffer));
|
||||
end();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addContent(ByteBuf channelBuffer, boolean last) throws IOException {
|
||||
upload.receiveData(Buffer.buffer(channelBuffer));
|
||||
receiveData(Buffer.buffer(channelBuffer));
|
||||
if (last) {
|
||||
completed = true;
|
||||
upload.end();
|
||||
end();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -41,10 +41,10 @@ class NettyFileUploadDataFactory extends DefaultHttpDataFactory {
|
||||
|
||||
@Override
|
||||
public FileUpload createFileUpload(HttpRequest httpRequest, String name, String filename, String contentType, String contentTransferEncoding, Charset charset, long size) {
|
||||
HttpServerFileUploadImpl upload = new HttpServerFileUploadImpl(context, request, name, filename, contentType, contentTransferEncoding, charset,
|
||||
size);
|
||||
NettyFileUpload nettyUpload = new NettyFileUpload(upload, name, filename, contentType,
|
||||
NettyFileUpload nettyUpload = new NettyFileUpload(context, request, name, filename, contentType,
|
||||
contentTransferEncoding, charset);
|
||||
HttpServerFileUploadImpl upload = new HttpServerFileUploadImpl(context, nettyUpload, name, filename, contentType, contentTransferEncoding, charset,
|
||||
size);
|
||||
Handler<HttpServerFileUpload> uploadHandler = lazyUploadHandler.get();
|
||||
if (uploadHandler != null) {
|
||||
uploadHandler.handle(upload);
|
||||
|
||||
@@ -2769,7 +2769,7 @@ public abstract class HttpTest extends HttpTestBase {
|
||||
|
||||
@Test
|
||||
public void testFormUploadEmptyFile() throws Exception {
|
||||
testFormUploadFile("", false);
|
||||
testFormUploadFile("", false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -2789,7 +2789,7 @@ public abstract class HttpTest extends HttpTestBase {
|
||||
|
||||
@Test
|
||||
public void testFormUploadEmptyFileStreamToDisk() throws Exception {
|
||||
testFormUploadFile("", true);
|
||||
testFormUploadFile("", true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user