Use pipe instead of pump for file upload streaming to disk as it will handle terminations and errors - fixes #2865

This commit is contained in:
Julien Viet
2019-03-02 09:37:08 +01:00
parent 3598209020
commit ba47fac59a
8 changed files with 145 additions and 66 deletions

View File

@@ -162,6 +162,11 @@ public interface AsyncFile extends ReadStream<Buffer>, WriteStream<Buffer> {
@Fluent
AsyncFile setWritePos(long writePos);
/**
* @return the current write position the file is at
*/
long getWritePos();
/**
* Sets the buffer size that will be used to read the data from the file. Changing this value will impact how much
* the data will be read at a time from the file system.

View File

@@ -305,6 +305,11 @@ public class AsyncFileImpl implements AsyncFile {
return this;
}
@Override
public synchronized long getWritePos() {
return writePos;
}
private synchronized void checkDrained() {
if (drainHandler != null && writesOutstanding <= lwm) {
Handler<Void> handler = drainHandler;

View File

@@ -86,7 +86,7 @@ public interface HttpServerFileUpload extends ReadStream<Buffer> {
long size();
/**
* @return true if the size of the upload can be retrieved via {@link #size()}.
* @return {@code true} if the size of the upload can be retrieved via {@link #size()}.
*/
boolean isSizeAvailable();

View File

@@ -122,12 +122,19 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream<Http2ServerConnecti
private void notifyException(Throwable failure) {
Handler<Throwable> handler;
InterfaceHttpData upload = null;
synchronized (conn) {
handler = exceptionHandler;
if (postRequestDecoder != null) {
upload = postRequestDecoder.currentPartialHttpData();
}
}
if (handler != null) {
handler.handle(failure);
}
if (upload instanceof NettyFileUpload) {
((NettyFileUpload)upload).handleException(failure);
}
}
@Override

View File

@@ -17,7 +17,7 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpServerFileUpload;
import io.vertx.core.streams.Pump;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import java.nio.charset.Charset;
@@ -60,48 +60,27 @@ class HttpServerFileUploadImpl implements HttpServerFileUpload {
this.contentTransferEncoding = contentTransferEncoding;
this.charset = charset;
this.size = size;
if (size == 0) {
lazyCalculateSize = true;
}
this.lazyCalculateSize = size == 0;
stream.handler(this::handleData);
stream.endHandler(this::handleEnd);
stream.endHandler(v -> this.handleEnd());
}
private void handleData(Buffer data) {
Handler<Buffer> h;
synchronized (HttpServerFileUploadImpl.this) {
h = dataHandler;
if (lazyCalculateSize) {
this.size += data.length();
}
size += data.length();
}
if (h != null) {
h.handle(data);
}
}
private void handleEnd(Void v) {
AsyncFile toClose;
synchronized (this) {
lazyCalculateSize = false;
toClose = file;
}
if (toClose != null) {
toClose.close(ar -> {
if (ar.failed()) {
notifyExceptionHandler(ar.cause());
}
notifyEndHandler();
});
} else {
notifyEndHandler();
}
}
private void notifyEndHandler() {
private void handleEnd() {
Handler<Void> handler;
synchronized (this) {
lazyCalculateSize = false;
handler = endHandler;
}
if (handler != null) {
@@ -183,14 +162,24 @@ class HttpServerFileUploadImpl implements HttpServerFileUpload {
@Override
public HttpServerFileUpload streamToFileSystem(String filename) {
pause();
Pipe<Buffer> pipe = stream.pipe().endOnComplete(false);
context.owner().fileSystem().open(filename, new OpenOptions(), ar -> {
if (ar.succeeded()) {
file = ar.result();
Pump p = Pump.pump(HttpServerFileUploadImpl.this, ar.result());
p.start();
resume();
pipe.to(file, ar2 -> {
file.close(ar3 -> {
Throwable failure = ar2.failed() ? ar2.cause() : ar3.failed() ? ar3.cause() : null;
if (failure != null) {
notifyExceptionHandler(failure);
}
synchronized (HttpServerFileUploadImpl.this) {
size = file.getWritePos();
}
handleEnd();
});
});
} else {
pipe.close();
notifyExceptionHandler(ar.cause());
}
});

View File

@@ -561,9 +561,13 @@ public class HttpServerRequestImpl implements HttpServerRequest {
void handleException(Throwable t) {
Handler<Throwable> handler = null;
HttpServerResponseImpl resp = null;
InterfaceHttpData upload = null;
synchronized (conn) {
if (!isEnded()) {
handler = exceptionHandler;
if (decoder != null) {
upload = decoder.currentPartialHttpData();
}
}
if (!response.ended()) {
if (METRICS_ENABLED) {
@@ -575,6 +579,9 @@ public class HttpServerRequestImpl implements HttpServerRequest {
if (resp != null) {
resp.handleException(t);
}
if (upload instanceof NettyFileUpload) {
((NettyFileUpload)upload).handleException(t);
}
if (handler != null) {
handler.handle(t);
}

View File

@@ -128,6 +128,16 @@ final class NettyFileUpload implements FileUpload, ReadStream<Buffer> {
}
}
public void handleException(Throwable err) {
Handler<Throwable> handler;
synchronized (this) {
handler = exceptionHandler;
}
if (handler != null) {
handler.handle(err);
}
}
@Override
public void setContent(ByteBuf channelBuffer) throws IOException {
completed = true;

View File

@@ -2774,17 +2774,17 @@ public abstract class HttpTest extends HttpTestBase {
@Test
public void testFormUploadSmallFile() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(100), false);
testFormUploadFile(TestUtils.randomAlphaString(100), false, false);
}
@Test
public void testFormUploadMediumFile() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(20000), false);
testFormUploadFile(TestUtils.randomAlphaString(20000), false, false);
}
@Test
public void testFormUploadLargeFile() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(4 * 1024 * 1024), false);
testFormUploadFile(TestUtils.randomAlphaString(4 * 1024 * 1024), false, false);
}
@Test
@@ -2794,20 +2794,60 @@ public abstract class HttpTest extends HttpTestBase {
@Test
public void testFormUploadSmallFileStreamToDisk() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(100), true);
testFormUploadFile(TestUtils.randomAlphaString(100), true, false);
}
@Test
public void testFormUploadMediumFileStreamToDisk() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(20 * 1024), true);
testFormUploadFile(TestUtils.randomAlphaString(20 * 1024), true, false);
}
@Test
public void testFormUploadLargeFileStreamToDisk() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(4 * 1024 * 1024), true);
testFormUploadFile(TestUtils.randomAlphaString(4 * 1024 * 1024), true, false);
}
private void testFormUploadFile(String contentStr, boolean streamToDisk) throws Exception {
@Test
public void testBrokenFormUploadEmptyFile() throws Exception {
testFormUploadFile("", true, true);
}
@Test
public void testBrokenFormUploadSmallFile() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(100), true, true);
}
@Test
public void testBrokenFormUploadMediumFile() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(20 * 1024), true, true);
}
@Test
public void testBrokenFormUploadLargeFile() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(4 * 1024 * 1024), true, true);
}
@Test
public void testBrokenFormUploadEmptyFileStreamToDisk() throws Exception {
testFormUploadFile("", true, true);
}
@Test
public void testBrokenFormUploadSmallFileStreamToDisk() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(100), true, true);
}
@Test
public void testBrokenFormUploadMediumFileStreamToDisk() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(20 * 1024), true, true);
}
@Test
public void testBrokenFormUploadLargeFileStreamToDisk() throws Exception {
testFormUploadFile(TestUtils.randomAlphaString(4 * 1024 * 1024), true, true);
}
private void testFormUploadFile(String contentStr, boolean streamToDisk, boolean abortClient) throws Exception {
waitFor(2);
@@ -2839,16 +2879,23 @@ public abstract class HttpTest extends HttpTestBase {
uploadedFileName = new File(testDir, UUID.randomUUID().toString()).getPath();
upload.streamToFileSystem(uploadedFileName);
}
AtomicInteger failures = new AtomicInteger();
upload.exceptionHandler(err -> failures.incrementAndGet());
upload.endHandler(v -> {
if (streamToDisk) {
Buffer uploaded = vertx.fileSystem().readFileBlocking(uploadedFileName);
assertEquals(content.length(), uploaded.length());
assertEquals(content, uploaded);
if (abortClient) {
assertEquals(1, failures.get());
} else {
assertEquals(content, tot);
assertEquals(0, failures.get());
if (streamToDisk) {
Buffer uploaded = vertx.fileSystem().readFileBlocking(uploadedFileName);
assertEquals(content.length(), uploaded.length());
assertEquals(content, uploaded);
} else {
assertEquals(content, tot);
}
assertTrue(upload.isSizeAvailable());
assertEquals(content.length(), upload.size());
}
assertTrue(upload.isSizeAvailable());
assertEquals(content.length(), upload.size());
AsyncFile file = upload.file();
if (streamToDisk) {
assertNotNull(file);
@@ -2873,30 +2920,39 @@ public abstract class HttpTest extends HttpTestBase {
});
server.listen(onSuccess(s -> {
HttpClientRequest req = client.request(HttpMethod.POST, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/form", onSuccess(resp -> {
// assert the response
assertEquals(200, resp.statusCode());
resp.bodyHandler(body -> {
assertEquals(0, body.length());
});
assertEquals(0, attributeCount.get());
HttpClientRequest req = client.request(HttpMethod.POST, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/form", ar -> {
assertEquals(ar.failed(), abortClient);
if (ar.succeeded()) {
HttpClientResponse resp = ar.result();
// assert the response
assertEquals(200, resp.statusCode());
resp.bodyHandler(body -> {
assertEquals(0, body.length());
});
assertEquals(0, attributeCount.get());
}
complete();
}));
});
String boundary = "dLV9Wyq26L_-JQxk6ferf-RT153LhOO";
Buffer buffer = Buffer.buffer();
String body =
"--" + boundary + "\r\n" +
"Content-Disposition: form-data; name=\"file\"; filename=\"tmp-0.txt\"\r\n" +
"Content-Type: image/gif\r\n" +
"\r\n" +
contentStr + "\r\n" +
"--" + boundary + "--\r\n";
buffer.appendString(body);
req.headers().set("content-length", String.valueOf(buffer.length()));
String epi = "\r\n" +
"--" + boundary + "--\r\n";
String pro = "--" + boundary + "\r\n" +
"Content-Disposition: form-data; name=\"file\"; filename=\"tmp-0.txt\"\r\n" +
"Content-Type: image/gif\r\n" +
"\r\n";
req.headers().set("content-length", "" + (pro + contentStr + epi).length());
req.headers().set("content-type", "multipart/form-data; boundary=" + boundary);
req.write(buffer).end();
if (abortClient) {
req.connectionHandler(conn -> {
vertx.setTimer(100, id -> {
conn.close();
});
});
req.write(pro + contentStr.substring(0, contentStr.length() / 2));
} else {
req.end(pro + contentStr + epi);
}
}));
await();