Multipart decoder rework (#2193)

* MultiPartDecoder implementation update (first pass)

* Fix TCK tests

* fix checkstyle

* update internal doc

* improve indentation

* Fix issue with distinguishing a concurrent onError signal from a concurrent onComplete.

* Incorporate review feedback

* add upstream = CANCELED to cleanup()
This commit is contained in:
Romain Grecourt
2020-07-28 13:30:46 -07:00
committed by GitHub
parent 11b0a77587
commit b19f866327
7 changed files with 1233 additions and 463 deletions


@@ -0,0 +1,489 @@
# io.helidon.media.multipart.MultiPartDecoder
This document provides additional details about the implementation of `MultiPartDecoder`.
## Design considerations
A reactive `Processor` should assume it is used concurrently, yet it must deliver signals to downstream in
a total order. A few other considerations stem from the reactive specification.
When an error occurs it must be routed downstream. This `Processor` may have more than one downstream over time, so
a given error may need to be signalled to several subscribers.
A `Subscriber` may cancel its `Subscription`. This should translate into a cancellation of the upstream
subscription at the appropriate time: the inner `Subscriber` should allow the outer `Subscriber` to make
progress; the outer `Subscriber` should not cancel the upstream subscription while the inner `Subscriber` may
still need to interact with upstream to make progress.
A `Subscriber` may issue bad requests. This, too, should translate into a cancellation of the upstream
subscription at the appropriate time: the inner `Subscriber` should allow the outer `Subscriber` to make
progress; the outer `Subscriber` should not generate errors that can be seen by the inner `Subscriber`.
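Both rules rest on the same demand accounting. A minimal standalone sketch of the scheme (modeled on the `partsRequested` handling in the diff below, simplified and not verbatim source): a bad request poisons the counter to `-1`, and valid additions saturate at `Long.MAX_VALUE` rather than overflow.

```java
import java.util.concurrent.atomic.AtomicLong;

// Demand counter: -1 is a sentinel meaning "cancelled or bad request seen".
class PartsDemand {
    private final AtomicLong partsRequested = new AtomicLong();

    /** Returns the demand observed before this request was applied. */
    long request(long n) {
        return n <= 0
                // bad request: poison the counter so the error can be reported later
                ? partsRequested.getAndSet(-1)
                // valid request: add, but keep -1 poisoned and saturate at MAX_VALUE
                : partsRequested.getAndUpdate(v -> Long.MAX_VALUE - v > n
                        ? v + n : v < 0 ? v : Long.MAX_VALUE);
    }

    long current() {
        return partsRequested.get();
    }
}
```

In the real processor, a previous value of zero returned from the update is the cue to call `drain()`, since demand just transitioned away from zero.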
Resources pinned by this `Processor` should be released as soon as they are no longer needed and it is
practical to do so: subsequent requests or cancellations may occur at an arbitrary time in the
future.
Whenever this `Processor` is known to have entered a terminal state (including cancellation or a bad request),
it must release all resources.
Since we are essentially dealing with `DataChunk`, we need to keep track of who owns each `DataChunk`, that is,
whose responsibility it is to release it (important when a `DataChunk` is backed by Netty buffers).
In this implementation all interactions with upstream, the parser, or any of the `Subscriber`s happen
in `drainBoth()`, which is guaranteed to be executed by at most one thread at a time, with appropriate memory
fences between any two invocations of `drainBoth()`. This allows much of the state to be kept in
non-thread-safe data structures. Additionally, the operation of the `Processor` can be understood
by observing the `drainBoth()` method alone; the rest is just machinery that causes `drainBoth()` to make further
state transitions.
The state is described by:
- `error`: errors that need to be signalled to both the inner and the outer `Subscriber` (produced by the parser or upstream)
- `cancelled`: cancellations signalled by the outer `Subscriber`
- `parser`: a helper object that captures parser state across multiple `DataChunk`
- `iterator`: the parser iterator that holds `ParserEvent`s and is used to transition the parser state
- `partsRequested`: the outer `Subscriber`'s demand for MIME parts
- the inner `Subscriber`'s demand for `DataChunk` (exposed by `DataChunkPublisher` through its API)
Whenever any of these change, `drain()` is called to enter `drainBoth()`, or to record the demand for another pass if
a thread already inside `drainBoth()` is detected.
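The `drain()` mechanism is the classic queue-drain pattern. A minimal standalone sketch (assuming the counter starts at zero, i.e. already initialized; the real class folds the initialization bits described below into the same counter):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Only one thread at a time runs drainBoth(); concurrent callers of drain()
// just bump the counter, which the draining thread notices and re-runs the loop.
class DrainSketch {
    private final AtomicInteger contenders = new AtomicInteger();
    int passes; // touched only inside drainBoth(), hence effectively single-threaded

    void drain() {
        // A non-zero previous value means some thread is already in drainLoop();
        // our increment alone asks it for another pass.
        if (contenders.getAndIncrement() != 0) {
            return;
        }
        drainLoop();
    }

    private void drainLoop() {
        // Subtracting the count we served doubles as a memory fence between
        // passes; we exit only when no unserved drain() calls remain.
        for (int c = 1; c > 0; c = contenders.addAndGet(-c)) {
            drainBoth();
        }
    }

    private void drainBoth() {
        passes++;
    }
}
```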
Additionally, special care is taken when dealing with:
- `upstream`: to interact with upstream
- `downstream`: outer `Subscriber`
- `bodyPartPublisher`: a special `Publisher` that interacts with inner `Subscriber`
At a high level, `drainBoth()` operates like a flat map of a stream of `DataChunk` into a stream of
`ParserEvent`s: `[DataChunk]` -> `[[ParserEvent]]` -> `[ParserEvent]`, which is then fanned out into a stream
of streams of `DataChunk`: `[ParserEvent]` -> `[ReadableBodyPart]`, which is essentially
`[ParserEvent]` -> `[[DataChunk]]`. In fact, if not for resource management and the `Processor` interface,
it could have been constructed as a composition of existing reactive components.
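As a non-reactive illustration of that shape only (ignoring demand and resource management), with plain strings standing in for `DataChunk` and `ParserEvent`:

```java
import java.util.List;
import java.util.stream.Collectors;

// [chunk] -> [[event]] -> [event]: each chunk parses into several events,
// which are flattened into a single stream.
class FlatMapShape {
    static List<String> events(List<String> chunks) {
        return chunks.stream()
                .flatMap(chunk -> chunk.chars().mapToObj(c -> "event:" + (char) c))
                .collect(Collectors.toList());
    }
}
```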
The explanation here may appear to be in the reverse order of what `drainBoth()` does, but it may be easier to
see it in this order, from high-level goals to low-level details:
- `DataChunk` are requested from upstream one at a time
- this way we do not retain too many `DataChunk`, and flattening `[[ParserEvent]]` is trivial
- this is ensured by the inner and outer `Subscriber` detecting when the demand changes from zero
- additionally, the demand of the outer `Subscriber` can become zero only after the next part is done; this means
  that the outer `Subscriber` is essentially unable to cause an upstream request until after the inner
  `Subscriber` is done
- no `DataChunk` are requested, and no errors are signalled, while the parser iterator is able to
  produce more `ParserEvent`s
- all `onError` events are totally ordered after all possible `onNext` that can be emitted without
requesting more `DataChunk` from upstream
- parser iterator does not produce more events, unless there is evidence of demand from inner or
outer `Subscriber`
- outer `Subscriber` demand is ignored while there is a `bodyPartPublisher` responsible for dealing with
the demand of an inner `Subscriber`
- cancellation or error state of inner `Subscriber` appears to `drainBoth()` as a demand for infinite number
of `DataChunk`; this way we can make progress to the end of the MIME part, and serve the demand of the outer
`Subscriber` if any
- the inner `Subscriber`'s demand is witnessed by the inner `Subscriber` calling `drain()`, and by observing that
  `bodyPartPublisher` is unable to satisfy the demand
- the parser iterator is not asked for more events while there is a `bodyPartPublisher` and it satisfies
  the inner `Subscriber`'s demand for `DataChunk` with the `DataChunk` already given to it
## DataChunkPublisher
The inner `Subscriber` is dealt with using `DataChunkPublisher`. Essentially, it is a flat map
`[[DataChunk]]` -> `[DataChunk]`: given iterators of `BufferEntry`, one at a time, it emits `DataChunk` one at a
time. In fact, if not for resource management, it could have been constructed using existing reactive
components.
The design is very simple:
- keep track of change of demand and cancellations of inner `Subscriber`
- expose methods to allow total order of signals emitted by `drainBoth()`
- when cancelled, or when a bad request is received, appear as unlimited unsatisfied demand, and merely discard
  all `DataChunk` that are received
- rely on `drainBoth()` not attempting to deliver `onError` before the previous iterator of `BufferEntry` is
  emptied; this simplifies resource management
## Initialization
Both `MultiPartDecoder` and `DataChunkPublisher` share a similar approach: each has an atomic counter that:
- is initialized to a value indicating the uninitialized state, a value that can never occur naturally during the
  `Publisher` lifetime
- can be transitioned into the "subscribed" state once and only once in its lifetime
- is finally transitioned into the initialized state only after `onSubscribe` has returned.
This ensures that no more than one `Subscriber` is associated with the `Publisher` and enforces the rule that
all on* signals are delivered only after `onSubscribe` and none during `onSubscribe`. Concurrent cases are commonly
omitted, but here we do take care of them, hence this looks a little more complex than other implementations.
`DataChunkPublisher` is pretty much done at that stage. `MultiPartDecoder` needs a bit more explanation, as it
has two ends that need initializing:
- upstream signalling `onSubscribe`, potentially immediately followed by `onError` or `onComplete` for an
  empty upstream
- the downstream outer `Subscriber` being attached by `subscribe()`
The `contenders` atomic counter synchronizes all of these.
The partial order of possible events is:
```
uninitialized
| |
.-------------------------------' `----------------------------.
| |
V V
subscribe(...) --> halfInit(UPSTREAM_INIT) --> deferredInit() deferredInit() <-- halfInit(DOWNSTREAM_INIT) <-- onSubscribe(...)
| | | |
V | onError / onComplete | V
subscribe(...) --> !halfInit(UPSTREAM_INIT) | | | !halfInit(DOWNSTREAM_INIT) <-- onSubscribe(...)
| | | request | |
V | | | | V
subscribe(...) --> !halfInit(UPSTREAM_INIT) `--. | | .--' !halfInit(DOWNSTREAM_INIT) <-- onSubscribe(...)
| | | | | |
V V V V V V
... atomic update of contenders ...
|
V
contenders >= 0
|
V
initialized
```
`halfInit()` ensures that, for each of `UPSTREAM_INIT` / `DOWNSTREAM_INIT`, one and only one invocation returns true;
any subsequent invocation with the same argument returns false.
Of all the atomic updates of the contenders counter, only the updates by `deferredInit()` can turn the value
non-negative. All other updates observe that they are "locked out" from entering `drainBoth()`. The second
`deferredInit()` can witness whether any of `onError`/`onComplete`/`request` happened, and enter `drainBoth()` on their
behalf.
The uninitialized state is represented by `Integer.MIN_VALUE` (`0b1000` in the abbreviated four-bit notation used
here). Each end attempts to transition to half-initialized for its end, unless it is already initialized. It is safe
to enter `drainBoth()` only after both ends have initialized, so the number of ends that have finished initializing
is tracked in the fourth bit: each end adds `SUBSCRIPTION_LOCK` (`0b0001`). Before the second of these additions
the counter necessarily appears as `0b1111`, and adding `SUBSCRIPTION_LOCK` wraps all the high
bits to zero, leaving exactly zero, unless there were already attempts to enter `drainBoth()`: by the outer `Subscriber`
requesting parts as part of `onSubscribe`, or by upstream delivering `onError` or `onComplete`, which are
allowed to occur without requests from downstream.
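The arithmetic can be checked in isolation. This hypothetical helper mirrors the constants from `MultiPartDecoder`; `pendingDrains` stands in for any early attempts to enter `drainBoth()` (each such attempt increments the counter by one):

```java
// Four-bit view: Integer.MIN_VALUE is 0b1000...0 and the masks are the next bits down.
class ContendersBits {
    static final int DOWNSTREAM_INIT = Integer.MIN_VALUE >>> 1;   // 0b0100 (abbreviated)
    static final int UPSTREAM_INIT = Integer.MIN_VALUE >>> 2;     // 0b0010
    static final int SUBSCRIPTION_LOCK = Integer.MIN_VALUE >>> 3; // 0b0001

    static int initSequence(int pendingDrains) {
        int contenders = Integer.MIN_VALUE;  // uninitialized:                0b1000
        contenders |= UPSTREAM_INIT;         // halfInit in subscribe(...):   0b1010
        contenders |= DOWNSTREAM_INIT;       // halfInit in onSubscribe(...): 0b1110
        contenders += SUBSCRIPTION_LOCK;     // first deferredInit():         0b1111
        contenders += pendingDrains;         // early request()/onError()/onComplete()
        contenders += SUBSCRIPTION_LOCK;     // second deferredInit(): high bits wrap away
        return contenders;
    }
}
```

With no early activity the counter lands on exactly `0`; with early drain attempts it lands on their count, a positive number, which is what lets the second `deferredInit()` enter the drain loop on their behalf.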
## Normal flow
```
upstream outer Subscriber bodyPartPublisher inner Subscriber
initialized request
| |
V V
upstream.request(1)
|
.--------------'
|
V
onNext --> parserIterator = parser.parseIterator
|
...
|
V
parserIterator.next() == END_HEADERS
|
V
bodyPartPublisher = new DataChunkPublisher
|
V
onNext ----------------------> onSubscribe ---------------> request
| |
.--------------------------------+--------------------------'
|
V
parserIterator.next() == BODY
|
V
enter bodyPartPublisher.drain() ----------> onNext
|
V
onNext
|
...
|
.--------------------------------'
|
V
return !bodyPartPublisher.drain() // effectively wait for request request
|
.-----------------------------------------------------------------'
|
V
enter bodyPartPublisher.drain() ----------> onNext
|
.--------------------------------'
|
V
return bodyPartPublisher.drain()
|
V
parserIterator.next() == BODY
|
V
enter bodyPartPublisher.drain() ----------> onNext
|
.--------------------------------'
|
...
|
V
return bodyPartPublisher.drain()
|
V
!parserIterator.hasNext
|
V
upstream.request(1)
|
.---------------'
|
V
onNext --> parserIterator = parser.parseIterator
|
...
|
V
parserIterator.next() == END_PART
|
V
bodyPartPublisher.complete() --------> onComplete
|
.--------------------------------'
|
V
partsRequested == 0 // essentially wait for a request for more parts
request
|
V
partsRequested > 0
|
V
parserIterator.hasNext
|
...
|
V
!parserIterator.hasNext
|
V
partsRequested > 0
|
V
upstream.request(1)
|
.---------------'
|
V
onNext --> parserIterator = parser.parseIterator
| |
... ...
| |
V |
onComplete --------+
|
...
|
V
!parserIterator.hasNext
|
V
onComplete
```
## Errors and cancellations
Inner `Subscriber` makes a bad request:
(This state is not visible to anyone but `bodyPartPublisher` - the inner `Subscription`
appears as the one with the forever unsatisfied demand)
```
bodyPartPublisher inner Subscriber
... request(0 or negative)
| |
+---------------------'
|
...
|
V
return bodyPartPublisher.drain() // always returns true without interacting with inner Subscriber
|
...
|
V
parserIterator.next() == END_PART
|
V
bodyPartPublisher.complete --> onError
|
...
```
Inner `Subscriber` cancels:
(This state is not visible to anyone but `bodyPartPublisher` - the inner `Subscription`
appears as the one with the forever unsatisfied demand)
```
bodyPartPublisher inner Subscriber
... cancel
| |
+---------------------'
|
...
|
V
return bodyPartPublisher.drain() // always returns true without interacting with inner Subscriber
|
...
|
V
parserIterator.next() == END_PART
|
V
bodyPartPublisher.complete
|
...
```
Outer `Subscriber` cancels:
(it is difficult to depict the absence of events signalled to downstream exactly)
```
upstream outer Subscriber
cancel
|
...
|
V
bodyPartPublisher == null
|
V
upstream.cancel
|
V
cleanup
|
V
... cancelled
| |
V V
onNext ----> parserIterator = parser.parseIterator // may throw
| |
V |
onError / V
onComplete --> upstream.cancel
|
V
cleanup
```
Outer `Subscriber` makes a bad request:
(As soon as the current part is done, report `onError`, and appear to upstream
as a cancelled `Subscription` after that)
```
upstream outer Subscriber
request(0 or negative)
|
...
|
V
bodyPartPublisher == null
|
V
upstream.cancel
|
V
onError
|
V
cleanup
|
V
... cancelled
| |
V V
onNext ----> parserIterator = parser.parseIterator // may throw
| |
V |
onError / V
onComplete --> upstream.cancel
|
V
cleanup
```
Upstream reports `onError`:
```
upstream outer Subscriber bodyPartPublisher inner Subscriber
... ...
| |
V |
onError --------+
|
...
|
V
!parserIterator.hasNext
| |
| V
| bodyPartPublisher != null ---> onError
|
V
onError
|
V
upstream.cancel
|
V
cancelled
```
Parser throws:
(As soon as the next parser event is requested. Report to inner and outer `Subscriber`,
appear to upstream as a cancelled `Subscription`)
```
upstream outer Subscriber bodyPartPublisher
...
|
V
parser or parserIterator throws
|
V
parserIterator = EMPTY_ITER
|
V
!parserIterator.hasNext
| |
| V
| bodyPartPublisher != null ---> onError
|
V
upstream.cancel
|
V
onError
|
V
cleanup
|
V
... cancelled
| |
V V
onNext ----> parserIterator = parser.parseIterator // may throw
| |
V |
onError / V
onComplete --> upstream.cancel
|
V
cleanup
```


@@ -19,6 +19,7 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -58,10 +59,9 @@ final class MimeParser {
END_HEADERS,
/**
* This event is issued for each part chunk parsed. The event
* It may be generated more than once for each part.
* This event is used by the Iterator to pass the whole body.
*/
CONTENT,
BODY,
/**
* This event is issued when the content for a part is complete.
@@ -74,21 +74,6 @@ final class MimeParser {
* only once.
*/
END_MESSAGE,
/**
* This event is issued when there is not enough data in the buffer to
* continue parsing. If issued after:
* <ul>
* <li>{@link #START_MESSAGE} - the parser did not detect the end of
* the preamble</li>
* <li>{@link #HEADER} - the parser
* did not detect the blank line that separates the part headers and the
* part body</li>
* <li>{@link #CONTENT} - the parser did not
* detect the next starting boundary or closing boundary</li>
* </ul>
*/
DATA_REQUIRED
}
/**
@@ -111,21 +96,11 @@ final class MimeParser {
}
/**
* Get this event as a {@link ContentEvent}.
*
* @return ContentEvent
* Get this event as a {@link BodyEvent}.
* @return HeaderEvent
*/
ContentEvent asContentEvent() {
return (ContentEvent) this;
}
/**
* Get this event as a {@link DataRequiredEvent}.
*
* @return DataRequiredEvent
*/
DataRequiredEvent asDataRequiredEvent() {
return (DataRequiredEvent) this;
BodyEvent asBodyEvent() {
return (BodyEvent) this;
}
}
@@ -199,23 +174,23 @@ final class MimeParser {
}
/**
* The event class for {@link EventType#CONTENT}.
* The event class for {@link EventType#BODY}.
*/
static final class ContentEvent extends ParserEvent {
static final class BodyEvent extends ParserEvent {
private final VirtualBuffer.BufferEntry bufferEntry;
private final List<VirtualBuffer.BufferEntry> buffers;
ContentEvent(VirtualBuffer.BufferEntry data) {
this.bufferEntry = data;
BodyEvent(List<VirtualBuffer.BufferEntry> data) {
this.buffers = data;
}
VirtualBuffer.BufferEntry content() {
return bufferEntry;
List<VirtualBuffer.BufferEntry> body() {
return buffers;
}
@Override
EventType type() {
return EventType.CONTENT;
return EventType.BODY;
}
}
@@ -247,43 +222,6 @@ final class MimeParser {
}
}
/**
* The event class for {@link EventType#DATA_REQUIRED}.
*/
static final class DataRequiredEvent extends ParserEvent {
private final boolean content;
private DataRequiredEvent(boolean content) {
this.content = content;
}
/**
* Indicate if the required data is for the body content of a part.
* @return {@code true} if for body content, {@code false} otherwise
*/
boolean isContent() {
return content;
}
@Override
EventType type() {
return EventType.DATA_REQUIRED;
}
}
/**
* Callback interface to the parser.
*/
interface EventProcessor {
/**
* Process a parser event.
* @param event generated event
*/
void process(ParserEvent event);
}
/**
* MIME Parsing exception.
*/
@@ -391,17 +329,11 @@ final class MimeParser {
*/
private boolean closed;
/**
* The event listener.
*/
private final EventProcessor listener;
/**
* Parses the MIME content.
*/
MimeParser(String boundary, EventProcessor eventListener) {
MimeParser(String boundary) {
bndbytes = getBytes("--" + boundary);
listener = eventListener;
bl = bndbytes.length;
gss = new int[bl];
buf = new VirtualBuffer();
@@ -445,11 +377,10 @@ final class MimeParser {
* or {@code START_MESSAGE}
*/
void close() throws ParsingException {
cleanup();
switch (state) {
case START_MESSAGE:
case END_MESSAGE:
closed = true;
buf.clear();
break;
case DATA_REQUIRED:
switch (resumeState) {
@@ -468,130 +399,159 @@ final class MimeParser {
}
}
/**
* Like close(), but just releases resources and does not throw.
*/
void cleanup() {
closed = true;
buf.clear();
}
/**
* Advances parsing.
* @throws ParsingException if an error occurs during parsing
*/
void parse() throws ParsingException {
try {
while (true) {
switch (state) {
case START_MESSAGE:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.START_MESSAGE);
}
state = STATE.SKIP_PREAMBLE;
listener.process(START_MESSAGE_EVENT);
break;
Iterator<ParserEvent> parseIterator() {
return new Iterator<>() {
case SKIP_PREAMBLE:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.SKIP_PREAMBLE);
}
skipPreamble();
if (bndStart == -1) {
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
}
state = STATE.DATA_REQUIRED;
resumeState = STATE.SKIP_PREAMBLE;
listener.process(new DataRequiredEvent(false));
return;
}
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "Skipped the preamble. position={0}", position);
}
state = STATE.START_PART;
break;
private ParserEvent nextEvent;
private boolean done;
// fall through
case START_PART:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.START_PART);
}
state = STATE.HEADERS;
listener.process(START_PART_EVENT);
break;
@Override
public ParserEvent next() {
if (!hasNext()) {
throw new IllegalStateException("Read past end of stream");
}
ParserEvent ne = nextEvent;
nextEvent = null;
done = ne == END_MESSAGE_EVENT;
return ne;
}
case HEADERS:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.HEADERS);
}
String headerLine = readHeaderLine();
if (headerLine == null) {
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
}
state = STATE.DATA_REQUIRED;
resumeState = STATE.HEADERS;
listener.process(new DataRequiredEvent(false));
return;
}
if (!headerLine.isEmpty()) {
Hdr header = new Hdr(headerLine);
listener.process(new HeaderEvent(header.name(), header.value()));
break;
}
state = STATE.BODY;
bol = true;
listener.process(END_HEADERS_EVENT);
break;
@Override
public boolean hasNext() {
if (nextEvent != null) {
return true;
}
case BODY:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.BODY);
}
List<VirtualBuffer.BufferEntry> bodyContent = readBody();
if (bndStart == -1 || bodyContent.isEmpty()) {
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
}
state = STATE.DATA_REQUIRED;
resumeState = STATE.BODY;
if (bodyContent.isEmpty()) {
listener.process(new DataRequiredEvent(true));
return;
}
} else {
bol = false;
}
for (VirtualBuffer.BufferEntry content : bodyContent) {
listener.process(new ContentEvent(content));
}
break;
try {
while (true) {
switch (state) {
case START_MESSAGE:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.START_MESSAGE);
}
state = STATE.SKIP_PREAMBLE;
nextEvent = START_MESSAGE_EVENT;
return true;
case END_PART:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.END_PART);
}
if (done) {
state = STATE.END_MESSAGE;
} else {
state = STATE.START_PART;
}
listener.process(END_PART_EVENT);
break;
case SKIP_PREAMBLE:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.SKIP_PREAMBLE);
}
skipPreamble();
if (bndStart == -1) {
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
}
state = STATE.DATA_REQUIRED;
resumeState = STATE.SKIP_PREAMBLE;
return false;
}
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "Skipped the preamble. position={0}", position);
}
state = STATE.START_PART;
break;
case END_MESSAGE:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.END_MESSAGE);
// fall through
case START_PART:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.START_PART);
}
state = STATE.HEADERS;
nextEvent = START_PART_EVENT;
return true;
case HEADERS:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.HEADERS);
}
String headerLine = readHeaderLine();
if (headerLine == null) {
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
}
state = STATE.DATA_REQUIRED;
resumeState = STATE.HEADERS;
return false;
}
if (!headerLine.isEmpty()) {
Hdr header = new Hdr(headerLine);
nextEvent = new HeaderEvent(header.name(), header.value());
return true;
}
state = STATE.BODY;
bol = true;
nextEvent = END_HEADERS_EVENT;
return true;
case BODY:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.BODY);
}
List<VirtualBuffer.BufferEntry> bodyContent = readBody();
if (bndStart == -1 || bodyContent.isEmpty()) {
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
}
state = STATE.DATA_REQUIRED;
resumeState = STATE.BODY;
if (bodyContent.isEmpty()) {
return false;
}
} else {
bol = false;
}
nextEvent = new BodyEvent(bodyContent);
return true;
case END_PART:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.END_PART);
}
if (MimeParser.this.done) {
state = STATE.END_MESSAGE;
} else {
state = STATE.START_PART;
}
nextEvent = END_PART_EVENT;
return true;
case END_MESSAGE:
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.log(Level.FINER, "state={0}", STATE.END_MESSAGE);
}
if (done) {
return false;
}
nextEvent = END_MESSAGE_EVENT;
return true;
case DATA_REQUIRED:
return false;
default:
// nothing to do
}
listener.process(END_MESSAGE_EVENT);
return;
case DATA_REQUIRED:
listener.process(new DataRequiredEvent(resumeState == STATE.BODY));
return;
default:
// nothing to do
}
} catch (ParsingException ex) {
throw ex;
} catch (Throwable ex) {
throw new ParsingException(ex);
}
}
} catch (ParsingException ex) {
throw ex;
} catch (Throwable ex) {
throw new ParsingException(ex);
}
};
}
/**
@@ -740,6 +700,7 @@ final class MimeParser {
* from the buffer
*/
private String readHeaderLine() {
// FIXME: what about multi-line headers?
int bufLen = buf.length();
// need more data to progress
// need at least one blank line to read (no headers)


@@ -18,16 +18,17 @@ package io.helidon.media.multipart;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.BufferedEmittingPublisher;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.SubscriptionHelper;
import io.helidon.media.common.MessageBodyReadableContent;
import io.helidon.media.common.MessageBodyReaderContext;
@@ -35,20 +36,29 @@ import io.helidon.media.multipart.VirtualBuffer.BufferEntry;
/**
* Reactive processor that decodes HTTP payload as a stream of {@link BodyPart}.
*
* This implementation is documented in {@code /docs-internal/multipartdecoder.md}.
*/
public class MultiPartDecoder implements Processor<DataChunk, ReadableBodyPart> {
private Subscription upstream;
private static final int DOWNSTREAM_INIT = Integer.MIN_VALUE >>> 1;
private static final int UPSTREAM_INIT = Integer.MIN_VALUE >>> 2;
private static final int SUBSCRIPTION_LOCK = Integer.MIN_VALUE >>> 3;
private static final Iterator<BufferEntry> EMPTY_BUFFER_ENTRY_ITERATOR = new EmptyIterator<>();
private static final Iterator<MimeParser.ParserEvent> EMPTY_PARSER_ITERATOR = new EmptyIterator<>();
private volatile Subscription upstream;
private Subscriber<? super ReadableBodyPart> downstream;
private BufferedEmittingPublisher<ReadableBodyPart> emitter;
private ReadableBodyPart.Builder bodyPartBuilder;
private ReadableBodyPartHeaders.Builder bodyPartHeaderBuilder;
private BufferedEmittingPublisher<DataChunk> bodyPartPublisher;
private final CompletableFuture<BufferedEmittingPublisher<ReadableBodyPart>> initFuture;
private final LinkedList<ReadableBodyPart> bodyParts;
private DataChunkPublisher bodyPartPublisher;
private Iterator<MimeParser.ParserEvent> parserIterator = EMPTY_PARSER_ITERATOR;
private volatile Throwable error;
private boolean cancelled;
private AtomicInteger contenders = new AtomicInteger(Integer.MIN_VALUE);
private AtomicLong partsRequested = new AtomicLong();
private final HashMap<Integer, DataChunk> chunksByIds;
private final MimeParser parser;
private final ParserEventProcessor parserEventProcessor;
private final MessageBodyReaderContext context;
/**
@@ -61,10 +71,7 @@ public class MultiPartDecoder implements Processor<DataChunk, ReadableBodyPart>
Objects.requireNonNull(boundary, "boundary cannot be null!");
Objects.requireNonNull(context, "context cannot be null!");
this.context = context;
parserEventProcessor = new ParserEventProcessor();
parser = new MimeParser(boundary, parserEventProcessor);
initFuture = new CompletableFuture<>();
bodyParts = new LinkedList<>();
parser = new MimeParser(boundary);
chunksByIds = new HashMap<>();
}
@@ -82,18 +89,47 @@ public class MultiPartDecoder implements Processor<DataChunk, ReadableBodyPart>
@Override
public void subscribe(Subscriber<? super ReadableBodyPart> subscriber) {
Objects.requireNonNull(subscriber);
if (emitter != null || downstream != null) {
subscriber.onSubscribe(SubscriptionHelper.CANCELED);
subscriber.onError(new IllegalStateException("Only one Subscriber allowed"));
if (!halfInit(UPSTREAM_INIT)) {
Multi.<ReadableBodyPart>error(new IllegalStateException("Only one Subscriber allowed"))
.subscribe(subscriber);
return;
}
this.downstream = subscriber;
// contenders < 0, so any part request will be deferred
// drain() will not request anything from upstream until the second deferredInit() invocation witnesses
// that upstream is set
downstream.onSubscribe(new Subscription() {
@Override
public void request(long n) {
long curr = n <= 0
? partsRequested.getAndSet(-1)
: partsRequested.getAndUpdate(v -> Long.MAX_VALUE - v > n
? v + n : v < 0 ? v : Long.MAX_VALUE);
if (curr == 0) {
drain();
}
}
@Override
public void cancel() {
cancelled = true;
if (partsRequested.getAndSet(-1) == 0) {
drain();
}
}
});
deferredInit();
}
@Override
public void onSubscribe(Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
if (!halfInit(DOWNSTREAM_INIT)) {
SubscriptionHelper.validate(upstream, subscription);
return;
}
this.upstream = subscription;
deferredInit();
}
@@ -106,89 +142,223 @@ public class MultiPartDecoder implements Processor<DataChunk, ReadableBodyPart>
int id = parser.offer(byteBuffers[i]);
// record the chunk using the id of the last buffer
if (i == byteBuffers.length - 1) {
// drain() cannot be invoked concurrently, it is safe to use HashMap
chunksByIds.put(id, chunk);
}
}
parser.parse();
parserIterator = parser.parseIterator();
drain();
} catch (MimeParser.ParsingException ex) {
emitter.fail(ex);
chunk.release();
releaseChunks();
}
// submit parsed parts
while (!bodyParts.isEmpty()) {
if (emitter.isCancelled()) {
return;
}
emitter.emit(bodyParts.poll());
}
// complete the parts publisher
if (parserEventProcessor.isCompleted()) {
emitter.complete();
// parts are delivered sequentially
// we potentially drop the last part if not requested
emitter.clearBuffer(this::drainPart);
releaseChunks();
}
// request more data to detect the next part
// if not in the middle of a part content
// or if the part content subscriber needs more
if (upstream != SubscriptionHelper.CANCELED
&& emitter.hasRequests()
&& parserEventProcessor.isDataRequired()
&& (!parserEventProcessor.isContentDataRequired() || bodyPartPublisher.hasRequests())) {
upstream.request(1);
drain(ex);
}
}
@Override
public void onError(Throwable throwable) {
Objects.requireNonNull(throwable);
initFuture.whenComplete((e, t) -> e.fail(throwable));
error = throwable;
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
drain();
}
}
@Override
public void onComplete() {
initFuture.whenComplete((e, t) -> {
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
try {
parser.close();
} catch (MimeParser.ParsingException ex) {
emitter.fail(ex);
releaseChunks();
}
}
});
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
drain();
}
}
private boolean halfInit(int mask) {
// Attempts to set the given init mask, if contenders is in the right state for that, and
// reports whether the contenders was in a state where that part of init needed completing.
int c = contenders.getAndUpdate(v -> v < 0 ? v | mask : v);
return c < 0 && (c & mask) == 0;
}
private void deferredInit() {
if (upstream != null && downstream != null) {
emitter = BufferedEmittingPublisher.create();
emitter.onRequest(this::onPartRequest);
emitter.onEmit(this::drainPart);
//emitter.onCancel(this::onPartCancel);
emitter.subscribe(downstream);
initFuture.complete(emitter);
downstream = null;
// deferredInit is invoked twice: onSubscribe and subscribe
// after onSubscribe and subscribe three top bits are set
// adding SUBSCRIPTION_LOCK for the first time sets the fourth bit
// adding SUBSCRIPTION_LOCK for the second time clears all top bits
// making the contenders counter 0,
// unless there were onError, onComplete, request for parts or downstream cancellation
if (contenders.addAndGet(SUBSCRIPTION_LOCK) > 0) {
drainLoop();
}
}
private void onPartRequest(long requested, long total) {
// require more raw chunks to decode if the decoding has not
// yet started or if more data is required to make progress
if (!parserEventProcessor.isStarted() || parserEventProcessor.isDataRequired()) {
upstream.request(1);
}
private long partsRequested() {
// Returns a negative number, if we are serving outer Subscriber, and we can tell it may
// not issue more requests for parts: either cancelled, or a bad request was observed.
// Otherwise returns a positive number, if serving inner Subscriber, or actual partsRequested.
return bodyPartPublisher != null ? 1 : partsRequested.get();
}
private void onPartCancel() {
emitter.clearBuffer(this::drainPart);
}
private void cleanup() {
// drop the reference to parserIterator, but keep it safe to use for any later invocations
parserIterator = EMPTY_PARSER_ITERATOR;
error = null;
upstream = SubscriptionHelper.CANCELED;
downstream = null; // after cleanup no uses of downstream are reachable
cancelled = true; // after cleanup the processor appears as cancelled
bodyPartHeaderBuilder = null;
bodyPartBuilder = null;
partsRequested.set(-1);
releaseChunks();
parser.cleanup();
}
/**
* Drain the upstream data if the contenders value is positive.
*/
protected void drain() {
// We do not serve the next part until the last chunk of the previous part has been consumed
// (sent to inner Subscriber).
// Signals to outer Subscriber are serialized with the signals to the inner Subscriber.
// drain() is a loop that retrieves ParserEvents one by one, and transitions to the next state,
// unless waiting on the inner or outer subscriber.
// There are three ways to enter drain():
// 1. We are not processing a part and an outer Subscriber has unsatisfied demand for parts
// 2. We are processing a part and an inner Subscriber has unsatisfied demand for chunks of a part
// 3. Upstream is delivering a DataChunk to satisfy the request from outer or inner Subscriber
if (contenders.getAndIncrement() != 0) {
return;
}
drainLoop();
}
/**
* Drain the upstream data in a loop while the contenders value is positive.
*/
protected void drainLoop() {
for (int c = 1; c > 0; c = contenders.addAndGet(-c)) {
drainBoth();
}
}
/**
* Drain the upstream data and signal the given error.
*
* @param th the error to signal
*/
protected void drain(Throwable th) {
error = th;
drain();
}
/**
* Drain upstream (raw) data and decoded downstream data.
*/
protected void drainBoth() {
if (bodyPartPublisher != null && !bodyPartPublisher.drain()) {
return;
}
try {
// Proceed to drain parserIterator only if parts or body part chunks were requested,
// i.e. bodyPartPublisher != null || partsRequested > 0.
// If bodyPartPublisher != null, then we are here because the inner Subscriber has unsatisfied demand.
long requested = partsRequested();
while (requested >= 0 && parserIterator.hasNext()) {
// It is safe to consume the next ParserEvent only when the right Subscriber is ready to receive onNext,
// i.e. partsRequested > 0
if (requested == 0) {
// This means there was an attempt to deliver onError or onComplete from upstream,
// which are allowed to be issued without a request from the outer Subscriber:
// - partsRequested > 0 for valid requests
// - partsRequested < 0 for cancellation or an invalid request
// We wait until demand has been manifested and parserIterator has been drained.
return;
}
MimeParser.ParserEvent event = parserIterator.next();
switch (event.type()) {
case START_PART:
bodyPartHeaderBuilder = ReadableBodyPartHeaders.builder();
bodyPartBuilder = ReadableBodyPart.builder();
break;
case HEADER:
MimeParser.HeaderEvent headerEvent = event.asHeaderEvent();
bodyPartHeaderBuilder.header(headerEvent.name(), headerEvent.value());
break;
case END_HEADERS:
bodyPartPublisher = new DataChunkPublisher();
downstream.onNext(createPart());
bodyPartHeaderBuilder = null;
bodyPartBuilder = null;
// exit the parser iterator loop
// the parser events processing will resume upon inner Subscriber demand
return;
case BODY:
Iterator<BufferEntry> bodyIterator = event.asBodyEvent().body().iterator();
bodyPartPublisher.nextIterator(bodyIterator);
if (!bodyPartPublisher.drain()) {
// the body was not fully drained, exit the parser iterator loop
// the parser events processing will resume upon inner Subscriber demand
return;
}
break;
case END_PART:
bodyPartPublisher.complete(null);
bodyPartPublisher = null;
requested = partsRequested.updateAndGet(v -> v == Long.MAX_VALUE || v < 0 ? v : v - 1);
break;
default:
}
}
// we allow requested <= 0 to reach here, because we want to allow delivery of termination signals
// without requests or cancellations, but ultimately need to make sure we do not request from
// upstream, unless actual demand is observed (requested > 0)
if (requested < 0) {
if (cancelled) {
upstream.cancel();
cleanup();
return;
}
// now is the right time to convert a bad request into an error
// bodyPartPublisher is null, so this error gets delivered only to outer Subscriber
error = new IllegalArgumentException("Expecting only positive requests for parts");
}
// ordering the delivery of errors after the delivery of all signals that precede it
// in the order of events emitted by the parser
if (upstream == SubscriptionHelper.CANCELED || error != null) {
if (error != null) {
if (bodyPartPublisher != null) {
bodyPartPublisher.complete(error);
bodyPartPublisher = null;
}
upstream.cancel();
downstream.onError(error);
} else {
// parser will throw, if we attempt to close it before it finished parsing the message
// and onError will be delivered
parser.close();
downstream.onComplete();
}
cleanup();
return;
}
// parserIterator is drained, drop the reference to it, but keep it safe for any later invocations
parserIterator = EMPTY_PARSER_ITERATOR;
if (requested > 0) {
upstream.request(1);
}
} catch (MimeParser.ParsingException ex) {
// make sure we do not interact with the parser through the iterator, and do not re-enter
// the iterator draining loop
parserIterator = EMPTY_PARSER_ITERATOR;
drain(ex);
}
}
private void releaseChunks() {
@@ -200,28 +370,6 @@ public class MultiPartDecoder implements Processor<DataChunk, ReadableBodyPart>
}
}
private void drainPart(ReadableBodyPart part) {
part.content().subscribe(new Subscriber<DataChunk>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(DataChunk item) {
item.release();
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
}
private ReadableBodyPart createPart() {
ReadableBodyPartHeaders headers = bodyPartHeaderBuilder.build();
@@ -247,6 +395,7 @@ public class MultiPartDecoder implements Processor<DataChunk, ReadableBodyPart>
throw new IllegalStateException("Parent chunk not found, id=" + id);
}
ByteBuffer[] originalBuffers = chunk.data();
// FIXME: the current resource management is not implemented properly and needs to be fixed
boolean release = data.limit() == originalBuffers[originalBuffers.length - 1].limit();
if (release) {
chunksByIds.remove(id);
@@ -254,78 +403,142 @@ public class MultiPartDecoder implements Processor<DataChunk, ReadableBodyPart>
return new BodyPartChunk(data, release ? chunk : null);
}
private final class ParserEventProcessor implements MimeParser.EventProcessor {
private MimeParser.ParserEvent lastEvent = null;
@Override
public void process(MimeParser.ParserEvent event) {
MimeParser.EventType eventType = event.type();
switch (eventType) {
case START_PART:
bodyPartPublisher = BufferedEmittingPublisher.create();
bodyPartHeaderBuilder = ReadableBodyPartHeaders.builder();
bodyPartBuilder = ReadableBodyPart.builder();
break;
case HEADER:
MimeParser.HeaderEvent headerEvent = event.asHeaderEvent();
bodyPartHeaderBuilder.header(headerEvent.name(), headerEvent.value());
break;
case END_HEADERS:
bodyParts.add(createPart());
break;
case CONTENT:
bodyPartPublisher.emit(createPartChunk(event.asContentEvent().content()));
break;
case END_PART:
bodyPartPublisher.complete();
bodyPartPublisher = null;
bodyPartHeaderBuilder = null;
bodyPartBuilder = null;
break;
default:
// nothing to do
}
lastEvent = event;
}
/**
 * Inner publisher that publishes the body part as {@link DataChunk}.
 */
protected final class DataChunkPublisher implements Publisher<DataChunk> {
private final AtomicLong chunksRequested = new AtomicLong(Long.MIN_VALUE + 1);
private Iterator<BufferEntry> bufferEntryIterator = EMPTY_BUFFER_ENTRY_ITERATOR;
private boolean cancelled;
private Subscriber<? super DataChunk> subscriber;
@Override
public void subscribe(Subscriber<? super DataChunk> sub) {
if (!chunksRequested.compareAndSet(Long.MIN_VALUE + 1, Long.MIN_VALUE)) {
Multi.<DataChunk>error(new IllegalStateException("Only one Subscriber allowed"))
.subscribe(sub);
return;
}
subscriber = sub;
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
// Illegal n makes chunksRequested negative, which interacts with drain() to drain the
// entire bufferEntryIterator and signal onError
long curr = n <= 0
? chunksRequested.getAndSet(-1)
: chunksRequested.getAndUpdate(v -> Long.MAX_VALUE - v > n
? v + n : v < 0 ? v == Long.MIN_VALUE ? n : v : Long.MAX_VALUE);
if (curr == 0) {
MultiPartDecoder.this.drain();
}
}
@Override
public void cancel() {
cancelled = true;
// Ensure the part chunks are drained to make the next part available
if (chunksRequested.getAndSet(-1) == 0) {
MultiPartDecoder.this.drain();
}
}
});
if (chunksRequested.compareAndSet(Long.MIN_VALUE, 0)) {
return;
}
MultiPartDecoder.this.drain();
}
/**
 * Indicate if the parser has received any data.
 *
 * @return {@code true} if the parser has been offered data,
 * {@code false} otherwise
 */
boolean isStarted() {
return lastEvent != null;
}
/**
 * Set the next buffer entry iterator.
 *
 * @param iterator the iterator to set
 */
void nextIterator(Iterator<BufferEntry> iterator) {
// This is invoked only when the previous bufferEntryIterator has been consumed fully,
// and chunksRequested > 0, so no one is calling drain() concurrently.
// chunksRequested is modified atomically, so any future invocation of drain() will observe
// the normal store of bufferEntryIterator (bufferEntryIterator and all of its content is published safely)
bufferEntryIterator = iterator;
}
/**
 * Indicate if the parser has reached the end of the message.
 *
 * @return {@code true} if completed, {@code false} otherwise
 */
boolean isCompleted() {
return lastEvent.type() == MimeParser.EventType.END_MESSAGE;
}
/**
 * Complete the publisher.
 *
 * @param th throwable, if not {@code null} signals {@code onError}, otherwise signals {@code onComplete}
 */
void complete(Throwable th) {
if (chunksRequested.get() < 0) {
if (cancelled) {
subscriber = null;
return;
}
th = new IllegalArgumentException("Expecting only positive requests");
}
cancelled = true;
chunksRequested.set(-1);
// bufferEntryIterator is drained because complete() is invoked only by drain(), which proceeds past
// state == BODY only when drain() returned true
if (th != null) {
subscriber.onError(th);
} else {
subscriber.onComplete();
}
subscriber = null;
}
/**
 * Indicate if the parser requires more data to make progress.
 *
 * @return {@code true} if more data is required, {@code false}
 * otherwise
 */
boolean isDataRequired() {
return lastEvent.type() == MimeParser.EventType.DATA_REQUIRED;
}
/**
 * Drain the current buffer entry iterator according to the current request count.
 *
 * @return {@code true} if the iterator was fully drained, {@code false} otherwise
 */
boolean drain() {
long requested = chunksRequested.get();
long chunksEmitted = 0;
// requested < 0 behaves like cancel, i.e. drain bufferEntryIterator
while (chunksEmitted < requested && bufferEntryIterator.hasNext()) {
do {
DataChunk chunk = createPartChunk(bufferEntryIterator.next());
subscriber.onNext(chunk);
chunk.release();
chunksEmitted++;
} while (chunksEmitted < requested && bufferEntryIterator.hasNext());
long ce = chunksEmitted;
requested = chunksRequested.updateAndGet(v -> v == Long.MAX_VALUE || v < 0 ? v : v - ce);
chunksEmitted = 0;
}
if (requested < 0) {
while (bufferEntryIterator.hasNext()) {
createPartChunk(bufferEntryIterator.next()).release();
}
}
if (requested != 0) {
// bufferEntryIterator is drained, drop the reference
bufferEntryIterator = EMPTY_BUFFER_ENTRY_ITERATOR;
return true;
}
return false;
}
}
/**
 * Indicate if more content data is required.
 *
 * @return {@code true} if more content data is required, {@code false}
 * otherwise
 */
boolean isContentDataRequired() {
return isDataRequired() && lastEvent.asDataRequiredEvent().isContent();
}
private static final class EmptyIterator<T> implements Iterator<T> {
@Override
public boolean hasNext() {
return false;
}
@Override
public T next() {
throw new IllegalStateException("Read beyond EOF");
}
}
}
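The `contenders` counter driving `drain()`/`drainLoop()` above is an instance of a common lock-free serialization idiom: any thread may signal work, but at most one thread runs the loop, and it keeps looping until every concurrent signal has been accounted for. A minimal standalone sketch of that idiom, with invented names (`SerializedDrain`, `offer`) that are not part of the Helidon code:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch (not the Helidon implementation): a "contenders" counter
// guarantees at most one thread runs the drain loop at a time, while concurrent
// callers only record that more work arrived.
final class SerializedDrain {
    private final AtomicInteger contenders = new AtomicInteger();
    private final ConcurrentLinkedQueue<Runnable> work = new ConcurrentLinkedQueue<>();

    void offer(Runnable task) {
        work.add(task);
        drain();
    }

    void drain() {
        // A non-zero previous value means another thread is inside drainLoop();
        // it will observe this increment and take another pass.
        if (contenders.getAndIncrement() != 0) {
            return;
        }
        drainLoop();
    }

    private void drainLoop() {
        // Subtract the number of signals observed at the start of this pass; exit
        // only when the counter drops to zero, i.e. no new signal arrived meanwhile.
        for (int c = 1; c > 0; c = contenders.addAndGet(-c)) {
            Runnable task;
            while ((task = work.poll()) != null) {
                task.run();
            }
        }
    }
}
```

The same shape appears in `drain()` and `drainLoop()` of the decoder, where the "work" is consuming parser events instead of a queue of tasks.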


@@ -16,6 +16,7 @@
package io.helidon.media.multipart;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ArrayList;
@@ -382,26 +383,6 @@ public class MimeParserTest {
+ "this-is-the-2nd-slice-of-the-body")));
}
@Test
public void testBoundaryAcrossChunksDataRequired() {
String boundary = "boundary";
final byte[] chunk1 = ("--" + boundary + "\n"
+ "Content-Id: part1\n"
+ "\n"
+ "this-is-the-body-of-part1\n"
+ "--" + boundary.substring(0, 3)).getBytes();
ParserEventProcessor processor = new ParserEventProcessor();
MimeParser parser = new MimeParser(boundary, processor);
parser.offer(ByteBuffer.wrap(chunk1));
parser.parse();
assertThat(processor.partContent, is(notNullValue()));
assertThat(new String(processor.partContent), is(equalTo("this-is-the-body-of-")));
assertThat(processor.lastEvent, is(notNullValue()));
assertThat(processor.lastEvent.type(), is(equalTo(MimeParser.EventType.DATA_REQUIRED)));
}
@Test
public void testBoundaryAcrossChunks() {
String boundary = "boundary";
@@ -637,11 +618,9 @@ public class MimeParserTest {
@Test
public void testParserClosed() {
try {
ParserEventProcessor processor = new ParserEventProcessor();
MimeParser parser = new MimeParser("boundary", processor);
MimeParser parser = new MimeParser("boundary");
parser.close();
parser.offer(ByteBuffer.wrap("foo".getBytes()));
parser.parse();
fail("exception should be thrown");
} catch (MimeParser.ParsingException ex) {
assertThat(ex.getMessage(), is(equalTo("Parser is closed")));
@@ -673,9 +652,9 @@ public class MimeParserTest {
*
* @param boundary boundary string
* @param data for the chunks to parse
* @return test parser event processor
* @return parser result
*/
static ParserEventProcessor parse(String boundary, byte[] data) {
static ParserResult parse(String boundary, byte[] data) {
return parse(boundary, List.of(data));
}
@@ -684,67 +663,78 @@ public class MimeParserTest {
*
* @param boundary boundary string
* @param data for the chunks to parse
* @return test parser event processor
* @return parser result
*/
static ParserEventProcessor parse(String boundary, List<byte[]> data) {
ParserEventProcessor processor = new ParserEventProcessor();
MimeParser parser = new MimeParser(boundary, processor);
for (byte[] bytes : data) {
parser.offer(ByteBuffer.wrap(bytes));
parser.parse();
}
parser.close();
return processor;
}
/**
* Test parser event processor.
*/
static final class ParserEventProcessor implements MimeParser.EventProcessor {
static ParserResult parse(String boundary, List<byte[]> data) {
MimeParser parser = new MimeParser(boundary);
List<MimePart> parts = new LinkedList<>();
Map<String, List<String>> partHeaders = new HashMap<>();
byte[] partContent = null;
MimeParser.ParserEvent lastEvent = null;
@Override
public void process(MimeParser.ParserEvent event) {
switch (event.type()) {
case START_PART:
partHeaders = new HashMap<>();
partContent = null;
break;
case HEADER:
MimeParser.HeaderEvent headerEvent = event.asHeaderEvent();
String name = headerEvent.name();
String value = headerEvent.value();
assertThat(name, notNullValue());
assertThat(name.length(), not(equalTo(0)));
assertThat(value, notNullValue());
List<String> values = partHeaders.get(name);
if (values == null) {
values = new ArrayList<>();
partHeaders.put(name, values);
}
values.add(value);
break;
case CONTENT:
ByteBuffer content = event.asContentEvent().content().buffer();
assertThat(content, is(notNullValue()));
if (partContent == null) {
partContent = Utils.toByteArray(content);
} else {
partContent = concat(partContent, Utils.toByteArray(content));
}
break;
case END_PART:
parts.add(new MimePart(partHeaders, partContent));
break;
for (byte[] bytes : data) {
parser.offer(ByteBuffer.wrap(bytes));
Iterator<MimeParser.ParserEvent> it = parser.parseIterator();
while(it.hasNext()) {
MimeParser.ParserEvent event = it.next();
switch (event.type()) {
case START_PART:
partHeaders = new HashMap<>();
partContent = null;
break;
case HEADER:
MimeParser.HeaderEvent headerEvent = event.asHeaderEvent();
String name = headerEvent.name();
String value = headerEvent.value();
assertThat(name, notNullValue());
assertThat(name.length(), not(equalTo(0)));
assertThat(value, notNullValue());
List<String> values = partHeaders.get(name);
if (values == null) {
values = new ArrayList<>();
partHeaders.put(name, values);
}
values.add(value);
break;
case BODY:
for (VirtualBuffer.BufferEntry content : event.asBodyEvent().body()) {
ByteBuffer buffer = content.buffer();
assertThat(buffer, is(notNullValue()));
if (partContent == null) {
partContent = Utils.toByteArray(buffer);
} else {
partContent = concat(partContent, Utils.toByteArray(buffer));
}
}
break;
case END_PART:
parts.add(new MimePart(partHeaders, partContent));
break;
}
lastEvent = event;
}
}
parser.close();
return new ParserResult(parts, partHeaders, partContent, lastEvent);
}
/**
* Parser result.
*/
static final class ParserResult {
final List<MimePart> parts;
final Map<String, List<String>> partHeaders;
final byte[] partContent;
final MimeParser.ParserEvent lastEvent;
ParserResult(List<MimePart> parts,
Map<String, List<String>> partHeaders,
byte[] partContent,
MimeParser.ParserEvent lastEvent) {
this.parts = parts;
this.partHeaders = partHeaders;
this.partContent = partContent;
this.lastEvent = lastEvent;
}
}
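The demand bookkeeping in `DataChunkPublisher.request(long)` above packs several Reactive Streams rules into one atomic update: demand saturates at `Long.MAX_VALUE` (effectively unbounded, rule 3.17) and a non-positive request is latched as a negative sentinel so it can later surface as `onError` (rule 3.9). A simplified sketch of just that accounting, with an invented class name and without the `Long.MIN_VALUE` subscribe-in-progress sentinel the real field also encodes:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch (not the Helidon implementation) of saturating,
// bad-request-aware demand accounting.
final class Demand {
    private final AtomicLong requested = new AtomicLong();

    /** Returns the demand seen before this request; -1 once a bad request was observed. */
    long add(long n) {
        if (n <= 0) {
            // Non-positive requests are errors; latch the sentinel.
            return requested.getAndSet(-1);
        }
        return requested.getAndUpdate(v ->
                v < 0 ? v                            // cancelled or bad request: sticky
                : Long.MAX_VALUE - v > n ? v + n     // plain addition, no overflow
                : Long.MAX_VALUE);                   // saturate instead of overflowing
    }

    long current() {
        return requested.get();
    }
}
```

As in the decoder, a caller that sees the previous value transition from 0 knows it is responsible for (re)entering the drain loop.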


@@ -86,7 +86,7 @@ public class MultiPartDecoderSubsWhiteBoxTckTest extends FlowSubscriberWhiteboxV
}
};
Multi.create(decoder).forEach(part -> {});
Multi.create(decoder).forEach(part -> part.content().forEach(chunk -> {}));
return decoder;
}
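The TCK change above consumes each part's content so the decoder can progress, the same requirement `drainPart(...)` satisfies for abandoned parts: unconsumed content must still be drained and each chunk released. A self-contained sketch of that "drain and release" subscriber, with an invented class name and release callback (the real code subscribes an anonymous subscriber that calls `DataChunk.release()`):

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Illustrative sketch (not the Helidon class): consume an entire stream while
// releasing every item, so upstream can move on and buffers are returned.
final class DrainSubscriber<T> implements Flow.Subscriber<T> {
    private final Consumer<T> release;

    DrainSubscriber(Consumer<T> release) {
        this.release = release;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Unbounded request: we intend to consume and discard everything.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        // Return the item's underlying buffers (e.g. DataChunk::release).
        release.accept(item);
    }

    @Override
    public void onError(Throwable throwable) {
        // Nothing to do: the part is being abandoned either way.
    }

    @Override
    public void onComplete() {
    }
}
```

With the decoder's types this would look like `part.content().subscribe(new DrainSubscriber<>(DataChunk::release))`, mirroring what `drainPart` does.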


@@ -61,7 +61,10 @@ public class MultiPartDecoderTckTest extends FlowPublisherVerification<ReadableB
public Flow.Publisher<ReadableBodyPart> createFlowPublisher(final long l) {
MultiPartDecoder decoder = MultiPartDecoder.create("boundary", MEDIA_CONTEXT.readerContext());
upstream(l).subscribe(decoder);
return decoder;
return Multi.create(decoder).map(part -> {
part.content().forEach(chunk -> {});
return part;
});
}
@Override


@@ -17,11 +17,13 @@ package io.helidon.media.multipart;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -53,6 +55,8 @@ public class MultiPartDecoderTest {
+ "--" + boundary + "--").getBytes();
final CountDownLatch latch = new CountDownLatch(2);
final CompletableFuture<Void> testDone = new CompletableFuture<>();
Consumer<BodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"),
@@ -60,16 +64,26 @@ public class MultiPartDecoderTest {
DataChunkSubscriber subscriber = new DataChunkSubscriber();
part.content().subscribe(subscriber);
subscriber.content().thenAccept(body -> {
assertThat(body, is(equalTo("body 1")));
latch.countDown();
if (latch.getCount() == 0) {
testDone.complete(null);
}
}).exceptionally((ex) -> {
testDone.completeExceptionally(ex);
return null;
});
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(
SUBSCRIBER_TYPE.INFINITE, consumer);
partsPublisher(boundary, chunk1).subscribe(testSubscriber);
waitOnLatch(latch);
assertThat(testSubscriber.error, is(nullValue()));
assertThat(testSubscriber.complete, is(equalTo(true)));
testDone.orTimeout(5, TimeUnit.SECONDS).join();
try {
boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
assertThat(b, is(equalTo(true)));
} catch(CompletionException error) {
assertThat(error, is(nullValue()));
}
}
@Test
@@ -108,9 +122,13 @@ public class MultiPartDecoderTest {
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.INFINITE, consumer);
partsPublisher(boundary, chunk1).subscribe(testSubscriber);
try {
boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
assertThat(b, is(equalTo(true)));
} catch(CompletionException error) {
assertThat(error, is(nullValue()));
}
waitOnLatch(latch);
assertThat(testSubscriber.error, is(nullValue()));
assertThat(testSubscriber.complete, is(equalTo(true)));
}
@Test
@@ -138,9 +156,53 @@ public class MultiPartDecoderTest {
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.INFINITE, consumer);
partsPublisher(boundary, List.of(chunk1, chunk2)).subscribe(testSubscriber);
try {
boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
assertThat(b, is(equalTo(true)));
} catch(CompletionException error) {
assertThat(error, is(nullValue()));
}
waitOnLatch(latch);
}
@Test
public void testContentAcrossChunksAsyncRequest() {
String boundary = "boundary";
final byte[] chunk1 = ("--" + boundary + "\n"
+ "Content-Id: part1\n"
+ "\n"
+ "thi").getBytes();
final byte[] chunk11 = ("s-is-the-1st-slice-of-the-body\n").getBytes();
final byte[] chunk12 = ("t").getBytes();
final byte[] chunk2 = ("his-is-the-2nd-slice-of-the-body\n"
+ "--" + boundary + "--").getBytes();
final CountDownLatch latch = new CountDownLatch(2);
Consumer<BodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
DataChunkSubscriber subscriber = new DataChunkSubscriber();
part.content().subscribe(subscriber);
subscriber.content().thenAccept(body -> {
assertThat(body, is(equalTo(
"this-is-the-1st-slice-of-the-body\n"
+ "this-is-the-2nd-slice-of-the-body")));
}).exceptionally((ex) -> {
System.out.println("UH-OH... " + ex);
return null;
}).thenAccept((_i) -> {
latch.countDown();
});
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.ONE_BY_ONE, consumer);
partsPublisher(boundary, List.of(chunk1, chunk11, chunk12, chunk2)).subscribe(testSubscriber);
try {
boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
assertThat(b, is(equalTo(true)));
} catch(CompletionException error) {
assertThat(error, is(nullValue()));
}
waitOnLatch(latch);
assertThat(testSubscriber.error, is(nullValue()));
assertThat(testSubscriber.complete, is(equalTo(true)));
}
@Test
@@ -170,9 +232,13 @@ public class MultiPartDecoderTest {
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.INFINITE, consumer);
partsPublisher(boundary, List.of(chunk1, chunk2, chunk3, chunk4, chunk5)).subscribe(testSubscriber);
try {
boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
assertThat(b, is(equalTo(true)));
} catch(CompletionException error) {
assertThat(error, is(nullValue()));
}
waitOnLatch(latch);
assertThat(testSubscriber.error, is(nullValue()));
assertThat(testSubscriber.complete, is(equalTo(true)));
}
@Test
@@ -211,9 +277,13 @@ public class MultiPartDecoderTest {
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.ONE_BY_ONE, consumer);
partsPublisher(boundary, chunk1).subscribe(testSubscriber);
try {
boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
assertThat(b, is(equalTo(true)));
} catch(CompletionException error) {
assertThat(error, is(nullValue()));
}
waitOnLatch(latch);
assertThat(testSubscriber.error, is(nullValue()));
assertThat(testSubscriber.complete, is(equalTo(true)));
}
@Test
@@ -257,12 +327,34 @@ public class MultiPartDecoderTest {
+ "\n"
+ "<foo>bar</foo>\n").getBytes();
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.CANCEL_AFTER_ONE, null);
DataChunkSubscriber s1 = new DataChunkSubscriber();
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.ONE_BY_ONE, p -> p.content().subscribe(s1));
partsPublisher(boundary, chunk1).subscribe(testSubscriber);
assertThat(testSubscriber.complete, is(equalTo(false)));
assertThat(testSubscriber.error, is(notNullValue()));
assertThat(testSubscriber.error.getClass(), is(equalTo(MimeParser.ParsingException.class)));
assertThat(testSubscriber.error.getMessage(), is(equalTo("No closing MIME boundary")));
try {
s1.future.orTimeout(100, TimeUnit.MILLISECONDS).join();
throw new IllegalStateException("Should have terminated exceptionally");
} catch(CompletionException e) {
Throwable error = e.getCause();
assertThat(error.getClass(), is(equalTo(MimeParser.ParsingException.class)));
assertThat(error.getMessage(), is(equalTo("No closing MIME boundary")));
}
// CANCEL_AFTER_ONE emits cancel as soon as the first part is arrived.
// Once testSubscriber notified it is cancelled, no signals are guaranteed to arrive to it, so one cannot
// expect error to be signalled to it.
//
// One should expect the error to arrive to the inner subscriber, but here it is not set at all - the
// inner subscriber does not request the body, so the absence of any content after what has been published
// is not guaranteed. (Subscribers are required to eventually either issue a request, or cancel)
try {
testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
throw new IllegalStateException("Not expecting to terminate normally");
} catch(CompletionException e) {
Throwable error = e.getCause();
assertThat(error, is(notNullValue()));
assertThat(error.getClass(), is(equalTo(MimeParser.ParsingException.class)));
assertThat(error.getMessage(), is(equalTo("No closing MIME boundary")));
}
}
@Test
@@ -312,8 +404,14 @@ public class MultiPartDecoderTest {
partsPublisher(boundary, List.of(chunk1, chunk2, chunk3, chunk4)).subscribe(testSubscriber);
waitOnLatchNegative(latch, "the 2nd part should not be processed");
assertThat(latch.getCount(), is(equalTo(1L)));
assertThat(testSubscriber.error, is(nullValue()));
assertThat(testSubscriber.complete, is(equalTo(false)));
try {
testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
throw new IllegalStateException("Not expecting to make progress, unless the part is consumed");
} catch(CompletionException e) {
Throwable error = e.getCause();
// This is the expected outcome - the testSubcriber is not making progress
assertThat(error.getClass(), is(equalTo(TimeoutException.class)));
}
}
@Test
@@ -322,9 +420,14 @@ public class MultiPartDecoderTest {
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.INFINITE, null);
decoder.subscribe(testSubscriber);
Multi.<DataChunk>error(new IllegalStateException("oops")).subscribe(decoder);
assertThat(testSubscriber.complete, is(equalTo(false)));
assertThat(testSubscriber.error, is(notNullValue()));
assertThat(testSubscriber.error.getMessage(), is(equalTo("oops")));
try {
testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
throw new IllegalStateException("Normal termination is not expected");
} catch(CompletionException e) {
Throwable error = e.getCause();
assertThat(error, is(notNullValue()));
assertThat(error.getMessage(), is(equalTo("oops")));
}
}
@Test
@@ -355,9 +458,9 @@ public class MultiPartDecoderTest {
private final SUBSCRIBER_TYPE subscriberType;
private final Consumer<BodyPart> consumer;
private Subscription subcription;
private Throwable error;
private boolean complete;
private Subscription subscription;
public CompletableFuture<Boolean> complete = new CompletableFuture<>();
public CompletableFuture<Void> cancelled = new CompletableFuture<>();
BodyPartSubscriber(SUBSCRIBER_TYPE subscriberType, Consumer<BodyPart> consumer) {
this.subscriberType = subscriberType;
@@ -366,7 +469,7 @@ public class MultiPartDecoderTest {
@Override
public void onSubscribe(Subscription subscription) {
this.subcription = subscription;
this.subscription = subscription;
if (subscriberType == SUBSCRIBER_TYPE.INFINITE) {
subscription.request(Long.MAX_VALUE);
} else {
@@ -381,20 +484,21 @@ public class MultiPartDecoderTest {
}
consumer.accept(item);
if (subscriberType == SUBSCRIBER_TYPE.ONE_BY_ONE) {
subcription.request(1);
subscription.request(1);
} else if (subscriberType == SUBSCRIBER_TYPE.CANCEL_AFTER_ONE) {
subcription.cancel();
subscription.cancel();
cancelled.complete(null);
}
}
@Override
public void onError(Throwable ex) {
error = ex;
complete.completeExceptionally(ex);
}
@Override
public void onComplete() {
complete = true;
complete.complete(true);
}
}
@@ -482,16 +586,26 @@ public class MultiPartDecoderTest {
static class DataChunkSubscriber implements Subscriber<DataChunk> {
private final StringBuilder sb = new StringBuilder();
private final CompletableFuture<String> future = new CompletableFuture<>();
public final CompletableFuture<String> future = new CompletableFuture<>();
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(DataChunk item) {
sb.append(item.bytes());
sb.append(new String(item.bytes()));
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10);
} catch(Exception e) {
}
subscription.request(1);
return 0;
});
}
@Override