diff --git a/docs-internal/multipartdecoder.md b/docs-internal/multipartdecoder.md
new file mode 100644
index 000000000..bd88fbf01
--- /dev/null
+++ b/docs-internal/multipartdecoder.md
@@ -0,0 +1,489 @@
+# io.helidon.media.multipart.MultiPartDecoder
+
+This document provides additional details about the implementation of `MultiPartDecoder`.
+
+## Design considerations
+
+A reactive `Processor` must assume it is used concurrently, and yet deliver signals downstream in
+a total order. A few other considerations stem from the reactive specification.
+
+When an error occurs it must be routed downstream. This `Processor` may have more than one downstream over time,
+so a given error may need to be signalled to many subscribers.
+
+A `Subscriber` may cancel its `Subscription`. This should translate into a cancellation of the upstream
+subscription at the appropriate time: the inner `Subscriber` should allow the outer `Subscriber` to make
+progress; the outer `Subscriber` should not cancel the upstream subscription while the inner `Subscriber` may
+still need to interact with upstream to make progress.
+
+A `Subscriber` may issue bad requests. This, too, should translate into a cancellation of the upstream
+subscription at the appropriate time: the inner `Subscriber` should allow the outer `Subscriber` to make
+progress; the outer `Subscriber` should not generate errors that can be seen by the inner `Subscriber`.
+
+Resources pinned by this `Processor` should be released as soon as they are no longer needed and it is
+practical to do so: subsequent requests or cancellations may occur at an arbitrary time in the
+future.
+
+Whenever this `Processor` is known to have entered a terminal state (including cancellation or bad request),
+it must release any resources it holds.
+
+Since we are essentially dealing with `DataChunk`, we need to keep track of who owns each `DataChunk` - that is,
+whose responsibility it is to release it (important when a `DataChunk` is backed by Netty buffers).
+
+In this implementation all interactions with upstream, the parser, or any of the `Subscriber`s are done
+in `drainBoth()`, which is guaranteed to execute single-threaded, with appropriate memory
+fences between any two invocations of `drainBoth()`. This allows much of the state to be kept in
+non-thread-safe data structures. Additionally, the operation of the `Processor` can be understood
+by observing the `drainBoth()` method alone. Everything else is just a way to cause `drainBoth()` to make
+further state transitions.
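+
+The serialization just described follows a common drain-loop pattern. The sketch below is illustrative
+only (names like `DrainLoop` are made up, and the real implementation folds this counter into the
+initialization bits of `contenders`), but it shows the bare mechanism:
+
+```java
+// Minimal sketch of a serialized drain loop, assuming a dedicated counter.
+import java.util.concurrent.atomic.AtomicInteger;
+
+class DrainLoop {
+    private final AtomicInteger contenders = new AtomicInteger(0);
+
+    // Called by any thread that changed state and wants drainBoth() to run.
+    void drain() {
+        if (contenders.getAndIncrement() == 0) {
+            drainLoop();   // we won the right to drain
+        }
+        // otherwise: the thread inside drainLoop() observes the incremented
+        // counter and re-runs drainBoth() on our behalf
+    }
+
+    private void drainLoop() {
+        do {
+            drainBoth();   // single-threaded: safe to touch non-thread-safe state
+        } while (contenders.decrementAndGet() > 0);
+    }
+
+    protected void drainBoth() {
+        // state transitions happen here
+    }
+}
+```
+
+The atomic counter provides the memory fences: every write made inside one `drainBoth()` invocation
+happens-before the next invocation, whichever thread performs it.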
+
+The state is described by:
+- error: for errors that need to be signalled to both inner and outer `Subscriber` (produced by the parser or upstream)
+- cancelled: for cancellations signalled by outer `Subscriber`
+- parser: a helper object to capture parser state across multiple `DataChunk`
+- iterator: the parser iterator that holds `ParserEvent`s and is used to advance the parser state
+- partsRequested: for outer `Subscriber` to indicate demand for MIME parts
+- demand: the number of `DataChunk`s requested by the inner `Subscriber` (exposed by `DataChunkPublisher` through its API)
+
+Whenever any of these change, `drain()` is called to enter `drainBoth()`, or, if a thread is detected
+already inside `drainBoth()`, to demand that it runs again.
+
+Additionally, special care is taken when dealing with:
+- `upstream`: to interact with upstream
+- `downstream`: outer `Subscriber`
+- `bodyPartPublisher`: a special `Publisher` that interacts with inner `Subscriber`
+
+At a high level, `drainBoth()` operates like a flat map of a stream of `DataChunk` into a stream of
+`ParserEvent`s: `[DataChunk]` -> `[[ParserEvent]]` -> `[ParserEvent]`, which is then fanned out into a stream
+of streams of `DataChunk`: `[ParserEvent]` -> `[ReadableBodyPart]`, which is essentially
+`[ParserEvent]` -> `[[DataChunk]]`. In fact, if not for resource management and the `Processor` interface,
+it could have been constructed as a composition of existing reactive components.
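+
+The shape of that composition can be illustrated with `java.util.stream` (an analogy only - the real
+code is reactive, pull-based, and manages `DataChunk` ownership; the strings stand in for the real types):
+
+```java
+import java.util.List;
+import java.util.stream.Collectors;
+
+class FlatMapShape {
+    public static void main(String[] args) {
+        // [DataChunk] -> [[ParserEvent]] -> [ParserEvent]
+        List<String> chunks = List.of("chunk1", "chunk2");
+        List<String> events = chunks.stream()
+                .map(c -> List.of(c + ":HEADER", c + ":BODY")) // parse each chunk
+                .flatMap(List::stream)                         // flatten
+                .collect(Collectors.toList());
+        System.out.println(events);
+        // [chunk1:HEADER, chunk1:BODY, chunk2:HEADER, chunk2:BODY]
+    }
+}
+```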
+
+The explanation here may appear to be in reverse order to what `drainBoth()` is doing, but it is easier to
+follow in this order, from high-level goals to low-level details:
+
+- `DataChunk` are requested from upstream one at a time
+  - this way we do not retain too many `DataChunk`, and flattening `[[ParserEvent]]` is trivial
+  - this is ensured by the inner and outer `Subscriber` detecting when their demand changes from zero
+  - additionally, the demand of the outer `Subscriber` can become zero only after the next part is done; this means
+    that the outer `Subscriber` is essentially unable to issue an upstream request until after the inner
+    `Subscriber` is done
+- `DataChunk` are not requested, nor are any errors signalled, while the parser iterator is able to
+  produce more `ParserEvent`s
+ - all `onError` events are totally ordered after all possible `onNext` that can be emitted without
+ requesting more `DataChunk` from upstream
+- parser iterator does not produce more events, unless there is evidence of demand from inner or
+ outer `Subscriber`
+ - outer `Subscriber` demand is ignored while there is a `bodyPartPublisher` responsible for dealing with
+ the demand of an inner `Subscriber`
+ - cancellation or error state of inner `Subscriber` appears to `drainBoth()` as a demand for infinite number
+ of `DataChunk`; this way we can make progress to the end of the MIME part, and serve the demand of the outer
+ `Subscriber` if any
+  - inner `Subscriber` demand is witnessed by the inner `Subscriber` calling `drain()`, and `drainBoth()` then
+    observing that `bodyPartPublisher` is unable to satisfy the demand
+- the parser iterator is not asked for more events while there is a `bodyPartPublisher` and it can satisfy
+  the inner `Subscriber`'s demand for `DataChunk` with the `DataChunk` already given to it
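+
+The demand bookkeeping that makes this work can be sketched as follows (a simplified model of the
+`partsRequested` logic; names are illustrative): a negative value is a sticky "cancelled or bad
+request" marker, additions saturate at `Long.MAX_VALUE`, and only a change of demand away from zero
+obliges the caller to invoke `drain()`:
+
+```java
+import java.util.concurrent.atomic.AtomicLong;
+
+class Demand {
+    private final AtomicLong requested = new AtomicLong();
+
+    /** Returns true when the caller should invoke drain(): demand changed from zero. */
+    boolean request(long n) {
+        long prev = n <= 0
+                ? requested.getAndSet(-1)                  // bad request: sticky negative
+                : requested.getAndUpdate(v -> v < 0 ? v    // stay cancelled
+                        : Long.MAX_VALUE - v > n ? v + n   // normal addition
+                        : Long.MAX_VALUE);                 // saturate, avoid overflow
+        return prev == 0;
+    }
+
+    long get() {
+        return requested.get();
+    }
+}
+```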
+
+## DataChunkPublisher
+
+The inner `Subscriber` is dealt with using `DataChunkPublisher`. Essentially, it is a flat map
+`[[DataChunk]]` -> `[DataChunk]`: given iterators of `BufferEntry`, one at a time, it emits `DataChunk` one at a
+time. In fact, if not for resource management, it could have been constructed from existing reactive
+components.
+
+The design is very simple:
+- keep track of change of demand and cancellations of inner `Subscriber`
+- expose methods to allow total order of signals emitted by `drainBoth()`
+- when cancelled, or when a bad request is received, appear to have unlimited unsatisfied demand, and merely discard
+  all `DataChunk` that are received
+- rely on `drainBoth()` not attempting to deliver `onError` before the previous iterator of `BufferEntry` is
+  emptied; this simplifies resource management
+
+## Initialization
+
+Both `MultiPartDecoder` and `DataChunkPublisher` share a similar approach: each has an atomic counter that:
+- is initialized to a value indicating the uninitialized state, one that can never occur naturally during the
+`Publisher` lifetime
+- can be transitioned into the "subscribed" state once and only once in its lifetime
+- is finally transitioned into the initialized state only after `onSubscribe` has returned.
+
+This ensures that no more than one `Subscriber` is associated with the `Publisher`, and enforces the rule that
+all `on*` signals are delivered only after `onSubscribe` has returned and none during `onSubscribe`. Concurrent
+cases are commonly overlooked, but here we do take care of them - hence this looks a little more complex than usual.
+
+`DataChunkPublisher` is pretty much done at that stage. `MultiPartDecoder` needs a bit more explanation, as it
+has two ends that need initializing:
+- upstream signalling `onSubscribe`, potentially immediately followed by `onError` or `onComplete` for
+ empty upstream
+- downstream outer `Subscriber` being attached by `subscribe()`
+
+The `contenders` atomic counter is used to synchronize all of these.
+
+The partial order of possible events is:
+
+```
+ uninitialized
+ | |
+ .-------------------------------' `----------------------------.
+ | |
+ V V
+subscribe(...) --> halfInit(UPSTREAM_INIT) --> deferredInit() deferredInit() <-- halfInit(DOWNSTREAM_INIT) <-- onSubscribe(...)
+ | | | |
+ V | onError / onComplete | V
+subscribe(...) --> !halfInit(UPSTREAM_INIT) | | | !halfInit(DOWNSTREAM_INIT) <-- onSubscribe(...)
+ | | | request | |
+ V | | | | V
+subscribe(...) --> !halfInit(UPSTREAM_INIT) `--. | | .--' !halfInit(DOWNSTREAM_INIT) <-- onSubscribe(...)
+ | | | | | |
+ V V V V V V
+ ... atomic update of contenders ...
+ |
+ V
+ contenders >= 0
+ |
+ V
+ initialized
+```
+
+`halfInit()` ensures that one and only one invocation with a given argument (`UPSTREAM_INIT` / `DOWNSTREAM_INIT`)
+returns true; all subsequent invocations with the same argument return false.
+
+Of all the atomic updates of the contenders counter, only the updates performed by `deferredInit()` are able to
+turn the value into a non-negative number. All other updates observe that they are "locked out" from entering
+`drainBoth()`. The second `deferredInit()` can witness whether any of `onError`/`onComplete`/`request` happened,
+and enter `drainBoth()` on their behalf.
+
+The uninitialized state is represented by `Integer.MIN_VALUE` (`0b1000` in the top four bits). Each end attempts
+to transition to half-initialized for its end, unless it is already initialized. It is safe to enter `drainBoth()`
+only after both ends have initialized, so the number of ends that have initialized is tracked in
+the fourth bit from the top: each end adds `SUBSCRIPTION_LOCK` (`0b0001`). Before the second of these additions
+the counter necessarily appears as `0b1111`, and adding `SUBSCRIPTION_LOCK` overflows, clearing all the high
+bits and leaving zero, unless there were already attempts to enter `drainBoth()` by the outer `Subscriber`
+requesting parts during `onSubscribe`, or by upstream delivering `onError` or `onComplete`, which are
+allowed to occur without requests from downstream.
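+
+The arithmetic can be checked in isolation (the constants are copied from `MultiPartDecoder`; the class
+name is illustrative):
+
+```java
+class ContendersBits {
+    static final int DOWNSTREAM_INIT = Integer.MIN_VALUE >>> 1;   // 0b0100 in the top bits
+    static final int UPSTREAM_INIT = Integer.MIN_VALUE >>> 2;     // 0b0010 in the top bits
+    static final int SUBSCRIPTION_LOCK = Integer.MIN_VALUE >>> 3; // 0b0001 in the top bits
+
+    public static void main(String[] args) {
+        int c = Integer.MIN_VALUE;  // uninitialized: 0b1000 in the top bits
+        c |= UPSTREAM_INIT;         // subscribe(): half-init one end
+        c |= DOWNSTREAM_INIT;       // onSubscribe(): half-init the other end
+        c += SUBSCRIPTION_LOCK;     // first deferredInit(): top bits now 0b1111
+        c += SUBSCRIPTION_LOCK;     // second deferredInit(): overflow clears the top bits
+        System.out.println(c);      // 0: no pending signals, safe to enter drainBoth()
+    }
+}
+```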
+
+## Normal flow
+
+```
+upstream outer Subscriber bodyPartPublisher inner Subscriber
+
+ initialized request
+ | |
+ V V
+ upstream.request(1)
+ |
+ .--------------'
+ |
+ V
+ onNext --> parserIterator = parser.parseIterator
+ |
+ ...
+ |
+ V
+ parserIterator.next() == END_HEADERS
+ |
+ V
+ bodyPartPublisher = new DataChunkPublisher
+ |
+ V
+ onNext ----------------------> onSubscribe ---------------> request
+ | |
+ .--------------------------------+--------------------------'
+ |
+ V
+ parserIterator.next() == BODY
+ |
+ V
+ enter bodyPartPublisher.drain() ----------> onNext
+ |
+ V
+ onNext
+ |
+ ...
+ |
+ .--------------------------------'
+ |
+ V
+ return !bodyPartPublisher.drain() // effectively wait for request request
+ |
+ .-----------------------------------------------------------------'
+ |
+ V
+ enter bodyPartPublisher.drain() ----------> onNext
+ |
+ .--------------------------------'
+ |
+ V
+ return bodyPartPublisher.drain()
+ |
+ V
+ parserIterator.next() == BODY
+ |
+ V
+ enter bodyPartPublisher.drain() ----------> onNext
+ |
+ .--------------------------------'
+ |
+ ...
+ |
+ V
+ return bodyPartPublisher.drain()
+ |
+ V
+ !parserIterator.hasNext
+ |
+ V
+ upstream.request(1)
+ |
+ .---------------'
+ |
+ V
+ onNext --> parserIterator = parser.parseIterator
+ |
+ ...
+ |
+ V
+ parserIterator.next() == END_PART
+ |
+ V
+ bodyPartPublisher.complete() --------> onComplete
+ |
+ .--------------------------------'
+ |
+ V
+                    partsRequested == 0 // essentially wait for a request for more parts
+
+ request
+ |
+ V
+ partsRequested > 0
+ |
+ V
+ parserIterator.hasNext
+ |
+ ...
+ |
+ V
+ !parserIterator.hasNext
+ |
+ V
+ partsRequested > 0
+ |
+ V
+ upstream.request(1)
+ |
+ .---------------'
+ |
+ V
+ onNext --> parserIterator = parser.parseIterator
+ | |
+ ... ...
+ | |
+ V |
+ onComplete --------+
+ |
+ ...
+ |
+ V
+ !parserIterator.hasNext
+ |
+ V
+ onComplete
+```
+
+## Errors and cancellations
+
+Inner `Subscriber` makes a bad request:
+(This state is not visible to anyone but `bodyPartPublisher` - the inner `Subscription`
+appears as one with forever unsatisfied demand.)
+
+```
+bodyPartPublisher inner Subscriber
+
+ ... request(0 or negative)
+ | |
+ +---------------------'
+ |
+ ...
+ |
+ V
+ return bodyPartPublisher.drain() // always returns true without interacting with inner Subscriber
+ |
+ ...
+ |
+ V
+ parserIterator.next() == END_PART
+ |
+ V
+ bodyPartPublisher.complete --> onError
+ |
+ ...
+```
+
+Inner `Subscriber` cancels:
+(This state is not visible to anyone but `bodyPartPublisher` - the inner `Subscription`
+appears as one with forever unsatisfied demand.)
+
+```
+bodyPartPublisher inner Subscriber
+
+ ... cancel
+ | |
+ +---------------------'
+ |
+ ...
+ |
+ V
+ return bodyPartPublisher.drain() // always returns true without interacting with inner Subscriber
+ |
+ ...
+ |
+ V
+ parserIterator.next() == END_PART
+ |
+ V
+ bodyPartPublisher.complete
+ |
+ ...
+```
+
+Outer `Subscriber` cancels:
+(it is difficult to depict the absence of events signalled to downstream exactly)
+
+```
+upstream outer Subscriber
+
+ cancel
+ |
+ ...
+ |
+ V
+ bodyPartPublisher == null
+ |
+ V
+ upstream.cancel
+ |
+ V
+ cleanup
+ |
+ V
+ ... cancelled
+ | |
+ V V
+ onNext ----> parserIterator = parser.parseIterator // may throw
+ | |
+ V |
+ onError / V
+onComplete --> upstream.cancel
+ |
+ V
+ cleanup
+```
+
+Outer `Subscriber` makes a bad request:
+(As soon as the current part is done, report `onError`, and appear to upstream
+as a cancelled `Subscription` after that)
+
+```
+upstream outer Subscriber
+
+ request(0 or negative)
+ |
+ ...
+ |
+ V
+ bodyPartPublisher == null
+ |
+ V
+ upstream.cancel
+ |
+ V
+ onError
+ |
+ V
+ cleanup
+ |
+ V
+ ... cancelled
+ | |
+ V V
+ onNext ----> parserIterator = parser.parseIterator // may throw
+ | |
+ V |
+ onError / V
+onComplete --> upstream.cancel
+ |
+ V
+ cleanup
+```
+
+Upstream reports `onError`:
+
+```
+upstream outer Subscriber bodyPartPublisher inner Subscriber
+
+ ... ...
+ | |
+ V |
+ onError --------+
+ |
+ ...
+ |
+ V
+ !parserIterator.hasNext
+ | |
+ | V
+ | bodyPartPublisher != null ---> onError
+ |
+ V
+ onError
+ |
+ V
+ upstream.cancel
+ |
+ V
+ cancelled
+```
+
+Parser throws:
+(As soon as the next parser event is requested. Report to inner and outer `Subscriber`,
+appear to upstream as a cancelled `Subscription`)
+
+```
+upstream outer Subscriber bodyPartPublisher
+
+ ...
+ |
+ V
+ parser or parserIterator throws
+ |
+ V
+ parserIterator = EMPTY_ITER
+ |
+ V
+ !parserIterator.hasNext
+ | |
+ | V
+ | bodyPartPublisher != null ---> onError
+ |
+ V
+ upstream.cancel
+ |
+ V
+ onError
+ |
+ V
+ cleanup
+ |
+ V
+ ... cancelled
+ | |
+ V V
+ onNext ----> parserIterator = parser.parseIterator // may throw
+ | |
+ V |
+ onError / V
+onComplete --> upstream.cancel
+ |
+ V
+ cleanup
+```
\ No newline at end of file
diff --git a/media/multipart/src/main/java/io/helidon/media/multipart/MimeParser.java b/media/multipart/src/main/java/io/helidon/media/multipart/MimeParser.java
index 66563a22f..2ec008032 100644
--- a/media/multipart/src/main/java/io/helidon/media/multipart/MimeParser.java
+++ b/media/multipart/src/main/java/io/helidon/media/multipart/MimeParser.java
@@ -19,6 +19,7 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -58,10 +59,9 @@ final class MimeParser {
END_HEADERS,
/**
- * This event is issued for each part chunk parsed. The event
- * It may be generated more than once for each part.
+     * This event carries the body content parsed so far. It may be
+     * generated more than once for each part.
*/
- CONTENT,
+ BODY,
/**
* This event is issued when the content for a part is complete.
@@ -74,21 +74,6 @@ final class MimeParser {
* only once.
*/
END_MESSAGE,
-
- /**
- * This event is issued when there is not enough data in the buffer to
- * continue parsing. If issued after:
- *
- * - {@link #START_MESSAGE} - the parser did not detect the end of
- * the preamble
- * - {@link #HEADER} - the parser
- * did not detect the blank line that separates the part headers and the
- * part body
- * - {@link #CONTENT} - the parser did not
- * detect the next starting boundary or closing boundary
- *
- */
- DATA_REQUIRED
}
/**
@@ -111,21 +96,11 @@ final class MimeParser {
}
/**
- * Get this event as a {@link ContentEvent}.
- *
- * @return ContentEvent
+ * Get this event as a {@link BodyEvent}.
+     * @return BodyEvent
*/
- ContentEvent asContentEvent() {
- return (ContentEvent) this;
- }
-
- /**
- * Get this event as a {@link DataRequiredEvent}.
- *
- * @return DataRequiredEvent
- */
- DataRequiredEvent asDataRequiredEvent() {
- return (DataRequiredEvent) this;
+ BodyEvent asBodyEvent() {
+ return (BodyEvent) this;
}
}
@@ -199,23 +174,23 @@ final class MimeParser {
}
/**
- * The event class for {@link EventType#CONTENT}.
+ * The event class for {@link EventType#BODY}.
*/
- static final class ContentEvent extends ParserEvent {
+ static final class BodyEvent extends ParserEvent {
- private final VirtualBuffer.BufferEntry bufferEntry;
+        private final List<VirtualBuffer.BufferEntry> buffers;
- ContentEvent(VirtualBuffer.BufferEntry data) {
- this.bufferEntry = data;
+        BodyEvent(List<VirtualBuffer.BufferEntry> data) {
+ this.buffers = data;
}
- VirtualBuffer.BufferEntry content() {
- return bufferEntry;
+        List<VirtualBuffer.BufferEntry> body() {
+ return buffers;
}
@Override
EventType type() {
- return EventType.CONTENT;
+ return EventType.BODY;
}
}
@@ -247,43 +222,6 @@ final class MimeParser {
}
}
- /**
- * The event class for {@link EventType#DATA_REQUIRED}.
- */
- static final class DataRequiredEvent extends ParserEvent {
-
- private final boolean content;
-
- private DataRequiredEvent(boolean content) {
- this.content = content;
- }
-
- /**
- * Indicate if the required data is for the body content of a part.
- * @return {@code true} if for body content, {@code false} otherwise
- */
- boolean isContent() {
- return content;
- }
-
- @Override
- EventType type() {
- return EventType.DATA_REQUIRED;
- }
- }
-
- /**
- * Callback interface to the parser.
- */
- interface EventProcessor {
-
- /**
- * Process a parser event.
- * @param event generated event
- */
- void process(ParserEvent event);
- }
-
/**
* MIME Parsing exception.
*/
@@ -391,17 +329,11 @@ final class MimeParser {
*/
private boolean closed;
- /**
- * The event listener.
- */
- private final EventProcessor listener;
-
/**
* Parses the MIME content.
*/
- MimeParser(String boundary, EventProcessor eventListener) {
+ MimeParser(String boundary) {
bndbytes = getBytes("--" + boundary);
- listener = eventListener;
bl = bndbytes.length;
gss = new int[bl];
buf = new VirtualBuffer();
@@ -445,11 +377,10 @@ final class MimeParser {
* or {@code START_MESSAGE}
*/
void close() throws ParsingException {
+ cleanup();
switch (state) {
case START_MESSAGE:
case END_MESSAGE:
- closed = true;
- buf.clear();
break;
case DATA_REQUIRED:
switch (resumeState) {
@@ -468,130 +399,159 @@ final class MimeParser {
}
}
+ /**
+ * Like close(), but just releases resources and does not throw.
+ */
+ void cleanup() {
+ closed = true;
+ buf.clear();
+ }
+
/**
* Advances parsing.
* @throws ParsingException if an error occurs during parsing
*/
- void parse() throws ParsingException {
- try {
- while (true) {
- switch (state) {
- case START_MESSAGE:
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.log(Level.FINER, "state={0}", STATE.START_MESSAGE);
- }
- state = STATE.SKIP_PREAMBLE;
- listener.process(START_MESSAGE_EVENT);
- break;
+    Iterator<ParserEvent> parseIterator() {
+ return new Iterator<>() {
- case SKIP_PREAMBLE:
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.log(Level.FINER, "state={0}", STATE.SKIP_PREAMBLE);
- }
- skipPreamble();
- if (bndStart == -1) {
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
- }
- state = STATE.DATA_REQUIRED;
- resumeState = STATE.SKIP_PREAMBLE;
- listener.process(new DataRequiredEvent(false));
- return;
- }
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.log(Level.FINE, "Skipped the preamble. position={0}", position);
- }
- state = STATE.START_PART;
- break;
+ private ParserEvent nextEvent;
+ private boolean done;
- // fall through
- case START_PART:
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.log(Level.FINER, "state={0}", STATE.START_PART);
- }
- state = STATE.HEADERS;
- listener.process(START_PART_EVENT);
- break;
+ @Override
+ public ParserEvent next() {
+ if (!hasNext()) {
+ throw new IllegalStateException("Read past end of stream");
+ }
+ ParserEvent ne = nextEvent;
+ nextEvent = null;
+ done = ne == END_MESSAGE_EVENT;
+ return ne;
+ }
- case HEADERS:
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.log(Level.FINER, "state={0}", STATE.HEADERS);
- }
- String headerLine = readHeaderLine();
- if (headerLine == null) {
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
- }
- state = STATE.DATA_REQUIRED;
- resumeState = STATE.HEADERS;
- listener.process(new DataRequiredEvent(false));
- return;
- }
- if (!headerLine.isEmpty()) {
- Hdr header = new Hdr(headerLine);
- listener.process(new HeaderEvent(header.name(), header.value()));
- break;
- }
- state = STATE.BODY;
- bol = true;
- listener.process(END_HEADERS_EVENT);
- break;
+ @Override
+ public boolean hasNext() {
+ if (nextEvent != null) {
+ return true;
+ }
- case BODY:
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.log(Level.FINER, "state={0}", STATE.BODY);
- }
- List bodyContent = readBody();
- if (bndStart == -1 || bodyContent.isEmpty()) {
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
- }
- state = STATE.DATA_REQUIRED;
- resumeState = STATE.BODY;
- if (bodyContent.isEmpty()) {
- listener.process(new DataRequiredEvent(true));
- return;
- }
- } else {
- bol = false;
- }
- for (VirtualBuffer.BufferEntry content : bodyContent) {
- listener.process(new ContentEvent(content));
- }
- break;
+ try {
+ while (true) {
+ switch (state) {
+ case START_MESSAGE:
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.log(Level.FINER, "state={0}", STATE.START_MESSAGE);
+ }
+ state = STATE.SKIP_PREAMBLE;
+ nextEvent = START_MESSAGE_EVENT;
+ return true;
- case END_PART:
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.log(Level.FINER, "state={0}", STATE.END_PART);
- }
- if (done) {
- state = STATE.END_MESSAGE;
- } else {
- state = STATE.START_PART;
- }
- listener.process(END_PART_EVENT);
- break;
+ case SKIP_PREAMBLE:
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.log(Level.FINER, "state={0}", STATE.SKIP_PREAMBLE);
+ }
+ skipPreamble();
+ if (bndStart == -1) {
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
+ }
+ state = STATE.DATA_REQUIRED;
+ resumeState = STATE.SKIP_PREAMBLE;
+ return false;
+ }
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.log(Level.FINE, "Skipped the preamble. position={0}", position);
+ }
+ state = STATE.START_PART;
+ break;
- case END_MESSAGE:
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.log(Level.FINER, "state={0}", STATE.END_MESSAGE);
+ // fall through
+ case START_PART:
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.log(Level.FINER, "state={0}", STATE.START_PART);
+ }
+ state = STATE.HEADERS;
+ nextEvent = START_PART_EVENT;
+ return true;
+
+ case HEADERS:
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.log(Level.FINER, "state={0}", STATE.HEADERS);
+ }
+ String headerLine = readHeaderLine();
+ if (headerLine == null) {
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
+ }
+ state = STATE.DATA_REQUIRED;
+ resumeState = STATE.HEADERS;
+ return false;
+ }
+ if (!headerLine.isEmpty()) {
+ Hdr header = new Hdr(headerLine);
+ nextEvent = new HeaderEvent(header.name(), header.value());
+ return true;
+ }
+ state = STATE.BODY;
+ bol = true;
+ nextEvent = END_HEADERS_EVENT;
+ return true;
+
+ case BODY:
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.log(Level.FINER, "state={0}", STATE.BODY);
+ }
+                            List<VirtualBuffer.BufferEntry> bodyContent = readBody();
+ if (bndStart == -1 || bodyContent.isEmpty()) {
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.log(Level.FINER, "state={0}", STATE.DATA_REQUIRED);
+ }
+ state = STATE.DATA_REQUIRED;
+ resumeState = STATE.BODY;
+ if (bodyContent.isEmpty()) {
+ return false;
+ }
+ } else {
+ bol = false;
+ }
+ nextEvent = new BodyEvent(bodyContent);
+ return true;
+
+ case END_PART:
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.log(Level.FINER, "state={0}", STATE.END_PART);
+ }
+ if (MimeParser.this.done) {
+ state = STATE.END_MESSAGE;
+ } else {
+ state = STATE.START_PART;
+ }
+ nextEvent = END_PART_EVENT;
+ return true;
+
+ case END_MESSAGE:
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.log(Level.FINER, "state={0}", STATE.END_MESSAGE);
+ }
+ if (done) {
+ return false;
+ }
+ nextEvent = END_MESSAGE_EVENT;
+ return true;
+
+ case DATA_REQUIRED:
+ return false;
+
+ default:
+ // nothing to do
}
- listener.process(END_MESSAGE_EVENT);
- return;
-
- case DATA_REQUIRED:
- listener.process(new DataRequiredEvent(resumeState == STATE.BODY));
- return;
-
- default:
- // nothing to do
+ }
+ } catch (ParsingException ex) {
+ throw ex;
+ } catch (Throwable ex) {
+ throw new ParsingException(ex);
}
}
- } catch (ParsingException ex) {
- throw ex;
- } catch (Throwable ex) {
- throw new ParsingException(ex);
- }
+ };
}
/**
@@ -740,6 +700,7 @@ final class MimeParser {
* from the buffer
*/
private String readHeaderLine() {
+ // FIXME: what about multi-line headers?
int bufLen = buf.length();
// need more data to progress
// need at least one blank line to read (no headers)
diff --git a/media/multipart/src/main/java/io/helidon/media/multipart/MultiPartDecoder.java b/media/multipart/src/main/java/io/helidon/media/multipart/MultiPartDecoder.java
index 0860668a6..8e75d9241 100644
--- a/media/multipart/src/main/java/io/helidon/media/multipart/MultiPartDecoder.java
+++ b/media/multipart/src/main/java/io/helidon/media/multipart/MultiPartDecoder.java
@@ -18,16 +18,17 @@ package io.helidon.media.multipart;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Processor;
+import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import io.helidon.common.http.DataChunk;
-import io.helidon.common.reactive.BufferedEmittingPublisher;
+import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.SubscriptionHelper;
import io.helidon.media.common.MessageBodyReadableContent;
import io.helidon.media.common.MessageBodyReaderContext;
@@ -35,20 +36,29 @@ import io.helidon.media.multipart.VirtualBuffer.BufferEntry;
/**
* Reactive processor that decodes HTTP payload as a stream of {@link BodyPart}.
+ *
+ * This implementation is documented in {@code /docs-internal/multipartdecoder.md}.
*/
 public class MultiPartDecoder implements Processor<DataChunk, ReadableBodyPart> {
- private Subscription upstream;
+ private static final int DOWNSTREAM_INIT = Integer.MIN_VALUE >>> 1;
+ private static final int UPSTREAM_INIT = Integer.MIN_VALUE >>> 2;
+ private static final int SUBSCRIPTION_LOCK = Integer.MIN_VALUE >>> 3;
+    private static final Iterator<BufferEntry> EMPTY_BUFFER_ENTRY_ITERATOR = new EmptyIterator<>();
+    private static final Iterator<MimeParser.ParserEvent> EMPTY_PARSER_ITERATOR = new EmptyIterator<>();
+
+ private volatile Subscription upstream;
     private Subscriber<? super ReadableBodyPart> downstream;
- private BufferedEmittingPublisher emitter;
private ReadableBodyPart.Builder bodyPartBuilder;
private ReadableBodyPartHeaders.Builder bodyPartHeaderBuilder;
- private BufferedEmittingPublisher bodyPartPublisher;
- private final CompletableFuture> initFuture;
- private final LinkedList bodyParts;
+ private DataChunkPublisher bodyPartPublisher;
+    private Iterator<MimeParser.ParserEvent> parserIterator = EMPTY_PARSER_ITERATOR;
+ private volatile Throwable error;
+ private boolean cancelled;
+ private AtomicInteger contenders = new AtomicInteger(Integer.MIN_VALUE);
+ private AtomicLong partsRequested = new AtomicLong();
     private final HashMap<Integer, DataChunk> chunksByIds;
private final MimeParser parser;
- private final ParserEventProcessor parserEventProcessor;
private final MessageBodyReaderContext context;
/**
@@ -61,10 +71,7 @@ public class MultiPartDecoder implements Processor
Objects.requireNonNull(boundary, "boundary cannot be null!");
Objects.requireNonNull(context, "context cannot be null!");
this.context = context;
- parserEventProcessor = new ParserEventProcessor();
- parser = new MimeParser(boundary, parserEventProcessor);
- initFuture = new CompletableFuture<>();
- bodyParts = new LinkedList<>();
+ parser = new MimeParser(boundary);
chunksByIds = new HashMap<>();
}
@@ -82,18 +89,47 @@ public class MultiPartDecoder implements Processor
@Override
     public void subscribe(Subscriber<? super ReadableBodyPart> subscriber) {
Objects.requireNonNull(subscriber);
- if (emitter != null || downstream != null) {
- subscriber.onSubscribe(SubscriptionHelper.CANCELED);
- subscriber.onError(new IllegalStateException("Only one Subscriber allowed"));
+ if (!halfInit(UPSTREAM_INIT)) {
+ Multi.error(new IllegalStateException("Only one Subscriber allowed"))
+ .subscribe(subscriber);
return;
}
this.downstream = subscriber;
+
+ // contenders < 0, so any part request will be deferred
+ // drain() will not request anything from upstream until the second deferredInit() invocation witnesses
+ // that upstream is set
+ downstream.onSubscribe(new Subscription() {
+
+ @Override
+ public void request(long n) {
+ long curr = n <= 0
+ ? partsRequested.getAndSet(-1)
+ : partsRequested.getAndUpdate(v -> Long.MAX_VALUE - v > n
+ ? v + n : v < 0 ? v : Long.MAX_VALUE);
+ if (curr == 0) {
+ drain();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ cancelled = true;
+ if (partsRequested.getAndSet(-1) == 0) {
+ drain();
+ }
+ }
+ });
+
deferredInit();
}
@Override
public void onSubscribe(Subscription subscription) {
- SubscriptionHelper.validate(upstream, subscription);
+ if (!halfInit(DOWNSTREAM_INIT)) {
+ SubscriptionHelper.validate(upstream, subscription);
+ return;
+ }
this.upstream = subscription;
deferredInit();
}
@@ -106,89 +142,223 @@ public class MultiPartDecoder implements Processor
int id = parser.offer(byteBuffers[i]);
// record the chunk using the id of the last buffer
if (i == byteBuffers.length - 1) {
+                    // drain() cannot be invoked concurrently, so it is safe to use HashMap
chunksByIds.put(id, chunk);
}
}
- parser.parse();
+ parserIterator = parser.parseIterator();
+ drain();
} catch (MimeParser.ParsingException ex) {
- emitter.fail(ex);
- chunk.release();
- releaseChunks();
- }
-
- // submit parsed parts
- while (!bodyParts.isEmpty()) {
- if (emitter.isCancelled()) {
- return;
- }
- emitter.emit(bodyParts.poll());
- }
-
- // complete the parts publisher
- if (parserEventProcessor.isCompleted()) {
- emitter.complete();
- // parts are delivered sequentially
- // we potentially drop the last part if not requested
- emitter.clearBuffer(this::drainPart);
- releaseChunks();
- }
-
- // request more data to detect the next part
- // if not in the middle of a part content
- // or if the part content subscriber needs more
- if (upstream != SubscriptionHelper.CANCELED
- && emitter.hasRequests()
- && parserEventProcessor.isDataRequired()
- && (!parserEventProcessor.isContentDataRequired() || bodyPartPublisher.hasRequests())) {
-
- upstream.request(1);
+ drain(ex);
}
}
@Override
public void onError(Throwable throwable) {
Objects.requireNonNull(throwable);
- initFuture.whenComplete((e, t) -> e.fail(throwable));
+ error = throwable;
+ if (upstream != SubscriptionHelper.CANCELED) {
+ upstream = SubscriptionHelper.CANCELED;
+ drain();
+ }
}
@Override
public void onComplete() {
- initFuture.whenComplete((e, t) -> {
- if (upstream != SubscriptionHelper.CANCELED) {
- upstream = SubscriptionHelper.CANCELED;
- try {
- parser.close();
- } catch (MimeParser.ParsingException ex) {
- emitter.fail(ex);
- releaseChunks();
- }
- }
- });
+ if (upstream != SubscriptionHelper.CANCELED) {
+ upstream = SubscriptionHelper.CANCELED;
+ drain();
+ }
+ }
+
+ private boolean halfInit(int mask) {
+ // Attempts to set the given init mask if the contenders counter is in the right state for that,
+ // and reports whether that part of init still needed completing.
+ int c = contenders.getAndUpdate(v -> v < 0 ? v | mask : v);
+ return c < 0 && (c & mask) == 0;
}
private void deferredInit() {
- if (upstream != null && downstream != null) {
- emitter = BufferedEmittingPublisher.create();
- emitter.onRequest(this::onPartRequest);
- emitter.onEmit(this::drainPart);
- //emitter.onCancel(this::onPartCancel);
- emitter.subscribe(downstream);
- initFuture.complete(emitter);
- downstream = null;
+ // deferredInit is invoked twice: onSubscribe and subscribe
+ // after onSubscribe and subscribe three top bits are set
+ // adding SUBSCRIPTION_LOCK for the first time sets the fourth bit
+ // adding SUBSCRIPTION_LOCK for the second time clears all top bits,
+ // making the contenders counter 0,
+ // unless there was an onError, an onComplete, a request for parts, or a downstream cancellation
+ if (contenders.addAndGet(SUBSCRIPTION_LOCK) > 0) {
+ drainLoop();
}
}
- private void onPartRequest(long requested, long total) {
- // require more raw chunks to decode if the decoding has not
- // yet started or if more data is required to make progress
- if (!parserEventProcessor.isStarted() || parserEventProcessor.isDataRequired()) {
- upstream.request(1);
- }
+ private long partsRequested() {
+ // Returns a negative number if we are serving the outer Subscriber and we can tell it may
+ // not issue more requests for parts: either cancelled, or a bad request was observed.
+ // Otherwise returns 1 when serving the inner Subscriber, or the actual partsRequested value.
+ return bodyPartPublisher != null ? 1 : partsRequested.get();
}
- private void onPartCancel() {
- emitter.clearBuffer(this::drainPart);
+ private void cleanup() {
+ // drop the reference to parserIterator, but keep it safe for any later invocation of drain()
+ parserIterator = EMPTY_PARSER_ITERATOR;
+ error = null;
+ upstream = SubscriptionHelper.CANCELED;
+ downstream = null; // after cleanup no uses of downstream are reachable
+ cancelled = true; // after cleanup the processor appears as cancelled
+ bodyPartHeaderBuilder = null;
+ bodyPartBuilder = null;
+ partsRequested.set(-1);
releaseChunks();
+ parser.cleanup();
+ }
+
+ /**
+ * Drain the upstream data if the contenders value is positive.
+ */
+ protected void drain() {
+ // We do not serve the next part until the last chunk of the previous part has been consumed
+ // (sent to inner Subscriber).
+ // Signals to outer Subscriber are serialized with the signals to the inner Subscriber.
+
+ // drain() is a loop that retrieves ParserEvents one by one, and transitions to the next state,
+ // unless waiting on the inner or outer subscriber.
+
+ // There are three ways to enter drain():
+ // 1. We are not processing a part and an outer Subscriber has unsatisfied demand for parts
+ // 2. We are processing a part and an inner Subscriber has unsatisfied demand for chunks of a part
+ // 3. Upstream is delivering a DataChunk to satisfy the request from outer or inner Subscriber
+ if (contenders.getAndIncrement() != 0) {
+ return;
+ }
+ drainLoop();
+ }
+
+ /**
+ * Drain the upstream data in a loop while the contenders value is positive.
+ */
+ protected void drainLoop() {
+ for (int c = 1; c > 0; c = contenders.addAndGet(-c)) {
+ drainBoth();
+ }
+ }
+
+ /**
+ * Drain the upstream data and signal the given error.
+ *
+ * @param th the error to signal
+ */
+ protected void drain(Throwable th) {
+ error = th;
+ drain();
+ }
+
+ /**
+ * Drain upstream (raw) data and decoded downstream data.
+ */
+ protected void drainBoth() {
+ if (bodyPartPublisher != null && !bodyPartPublisher.drain()) {
+ return;
+ }
+
+ try {
+ // Proceed to drain parserIterator only if parts or body part chunks were requested,
+ // i.e. bodyPartPublisher != null (inner demand) or partsRequested > 0 (outer demand)
+ // if bodyPartPublisher != null, then we are here when inner Subscriber has unsatisfied demand
+ long requested = partsRequested();
+ while (requested >= 0 && parserIterator.hasNext()) {
+ // It is safe to consume the next ParserEvent only when the right Subscriber is ready
+ // to receive onNext, i.e. partsRequested > 0
+ if (requested == 0) {
+ // This means there was an attempt to deliver onError or onComplete from upstream
+ // which are allowed to be issued without request from outer Subscriber.
+ // - partsRequested > 0 for valid requests
+ // - partsRequested < 0 cancellation or invalid request
+ // we wait until demand has been manifested and parserIterator is drained
+ return;
+ }
+
+ MimeParser.ParserEvent event = parserIterator.next();
+ switch (event.type()) {
+ case START_PART:
+ bodyPartHeaderBuilder = ReadableBodyPartHeaders.builder();
+ bodyPartBuilder = ReadableBodyPart.builder();
+ break;
+ case HEADER:
+ MimeParser.HeaderEvent headerEvent = event.asHeaderEvent();
+ bodyPartHeaderBuilder.header(headerEvent.name(), headerEvent.value());
+ break;
+ case END_HEADERS:
+ bodyPartPublisher = new DataChunkPublisher();
+ downstream.onNext(createPart());
+ bodyPartHeaderBuilder = null;
+ bodyPartBuilder = null;
+ // exit the parser iterator loop
+ // the parser events processing will resume upon inner Subscriber demand
+ return;
+ case BODY:
+ Iterator<VirtualBuffer.BufferEntry> bodyIterator = event.asBodyEvent().body().iterator();
+ bodyPartPublisher.nextIterator(bodyIterator);
+ if (!bodyPartPublisher.drain()) {
+ // the body was not fully drained, exit the parser iterator loop
+ // the parser events processing will resume upon inner Subscriber demand
+ return;
+ }
+ break;
+ case END_PART:
+ bodyPartPublisher.complete(null);
+ bodyPartPublisher = null;
+ requested = partsRequested.updateAndGet(v -> v == Long.MAX_VALUE || v < 0 ? v : v - 1);
+ break;
+ default:
+ }
+ }
+
+ // we allow requested <= 0 to reach here, because we want to allow delivery of termination signals
+ // without requests or cancellations, but ultimately need to make sure we do not request from
+ // upstream, unless actual demand is observed (requested > 0)
+ if (requested < 0) {
+ if (cancelled) {
+ upstream.cancel();
+ cleanup();
+ return;
+ }
+ // now is the right time to convert a bad request into an error
+ // bodyPartPublisher is null, so this error gets delivered only to outer Subscriber
+ error = new IllegalArgumentException("Expecting only positive requests for parts");
+ }
+
+ // ordering the delivery of errors after the delivery of all signals that precede it
+ // in the order of events emitted by the parser
+ if (upstream == SubscriptionHelper.CANCELED || error != null) {
+ if (error != null) {
+ if (bodyPartPublisher != null) {
+ bodyPartPublisher.complete(error);
+ bodyPartPublisher = null;
+ }
+ upstream.cancel();
+ downstream.onError(error);
+ } else {
+ // the parser will throw if we attempt to close it before it has finished parsing
+ // the message, and onError will be delivered
+ parser.close();
+ downstream.onComplete();
+ }
+ cleanup();
+ return;
+ }
+
+ // parserIterator is drained, drop the reference to it, but keep it safe for any later invocations
+ parserIterator = EMPTY_PARSER_ITERATOR;
+
+ if (requested > 0) {
+ upstream.request(1);
+ }
+
+ } catch (MimeParser.ParsingException ex) {
+ // make sure we do not interact with the parser through the iterator, and do not re-enter
+ // the iterator draining loop
+ parserIterator = EMPTY_PARSER_ITERATOR;
+ drain(ex);
+ }
}
private void releaseChunks() {
@@ -200,28 +370,6 @@ public class MultiPartDecoder implements Processor<DataChunk, ReadableBodyPart> {
}
}
- private void drainPart(ReadableBodyPart part) {
- part.content().subscribe(new Subscriber<DataChunk>() {
- @Override
- public void onSubscribe(Subscription subscription) {
- subscription.request(Long.MAX_VALUE);
- }
-
- @Override
- public void onNext(DataChunk item) {
- item.release();
- }
-
- @Override
- public void onError(Throwable throwable) {
- }
-
- @Override
- public void onComplete() {
- }
- });
- }
-
private ReadableBodyPart createPart() {
ReadableBodyPartHeaders headers = bodyPartHeaderBuilder.build();
@@ -247,6 +395,7 @@ public class MultiPartDecoder implements Processor<DataChunk, ReadableBodyPart> {
throw new IllegalStateException("Parent chunk not found, id=" + id);
}
ByteBuffer[] originalBuffers = chunk.data();
+ // FIXME: the current resource management is not implemented properly and needs to be fixed
boolean release = data.limit() == originalBuffers[originalBuffers.length - 1].limit();
if (release) {
chunksByIds.remove(id);
@@ -254,78 +403,142 @@ public class MultiPartDecoder implements Processor
return new BodyPartChunk(data, release ? chunk : null);
}
- private final class ParserEventProcessor implements MimeParser.EventProcessor {
+ /**
+ * Inner publisher that publishes the body part as {@link DataChunk}.
+ */
+ protected final class DataChunkPublisher implements Publisher<DataChunk> {
- private MimeParser.ParserEvent lastEvent = null;
+ private final AtomicLong chunksRequested = new AtomicLong(Long.MIN_VALUE + 1);
+ private Iterator<VirtualBuffer.BufferEntry> bufferEntryIterator = EMPTY_BUFFER_ENTRY_ITERATOR;
+ private boolean cancelled;
+ private Subscriber<? super DataChunk> subscriber;
@Override
- public void process(MimeParser.ParserEvent event) {
- MimeParser.EventType eventType = event.type();
- switch (eventType) {
- case START_PART:
- bodyPartPublisher = BufferedEmittingPublisher.create();
- bodyPartHeaderBuilder = ReadableBodyPartHeaders.builder();
- bodyPartBuilder = ReadableBodyPart.builder();
- break;
- case HEADER:
- MimeParser.HeaderEvent headerEvent = event.asHeaderEvent();
- bodyPartHeaderBuilder.header(headerEvent.name(), headerEvent.value());
- break;
- case END_HEADERS:
- bodyParts.add(createPart());
- break;
- case CONTENT:
- bodyPartPublisher.emit(createPartChunk(event.asContentEvent().content()));
- break;
- case END_PART:
- bodyPartPublisher.complete();
- bodyPartPublisher = null;
- bodyPartHeaderBuilder = null;
- bodyPartBuilder = null;
- break;
- default:
- // nothing to do
+ public void subscribe(Subscriber<? super DataChunk> sub) {
+ if (!chunksRequested.compareAndSet(Long.MIN_VALUE + 1, Long.MIN_VALUE)) {
+ Multi.<DataChunk>error(new IllegalStateException("Only one Subscriber allowed"))
+ .subscribe(sub);
+ return;
}
- lastEvent = event;
+
+ subscriber = sub;
+ sub.onSubscribe(new Subscription() {
+
+ @Override
+ public void request(long n) {
+ // Illegal n makes chunksRequested negative, which interacts with drain() to drain the
+ // entire bufferEntryIterator, and signal onError
+ long curr = n <= 0
+ ? chunksRequested.getAndSet(-1)
+ : chunksRequested.getAndUpdate(v -> Long.MAX_VALUE - v > n
+ ? v + n : v < 0 ? v == Long.MIN_VALUE ? n : v : Long.MAX_VALUE);
+ if (curr == 0) {
+ MultiPartDecoder.this.drain();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ cancelled = true;
+ // Ensure the part chunks are drained to make the next part available
+ if (chunksRequested.getAndSet(-1) == 0) {
+ MultiPartDecoder.this.drain();
+ }
+ }
+ });
+
+ if (chunksRequested.compareAndSet(Long.MIN_VALUE, 0)) {
+ return;
+ }
+ MultiPartDecoder.this.drain();
}
/**
- * Indicate if the parser has received any data.
- *
- * @return {@code true} if the parser has been offered data,
- * {@code false} otherwise
+ * Set the next buffer entry iterator.
+ * @param iterator the iterator to set
*/
- boolean isStarted() {
- return lastEvent != null;
+ void nextIterator(Iterator<VirtualBuffer.BufferEntry> iterator) {
+ // This is invoked only when the previous bufferEntryIterator has been consumed fully,
+ // and chunksRequested > 0, so no one is calling drain() concurrently
+ // chunksRequested is modified atomically, so any future invocation of drain() will observe
+ // the plain store to bufferEntryIterator (the iterator and all of its content are published safely)
+ bufferEntryIterator = iterator;
}
/**
- * Indicate if the parser has reached the end of the message.
+ * Complete the publisher.
*
- * @return {@code true} if completed, {@code false} otherwise
+ * @param th throwable, if not {@code null} signals {@code onError}, otherwise signals {@code onComplete}
*/
- boolean isCompleted() {
- return lastEvent.type() == MimeParser.EventType.END_MESSAGE;
+ void complete(Throwable th) {
+ if (chunksRequested.get() < 0) {
+ if (cancelled) {
+ subscriber = null;
+ return;
+ }
+ th = new IllegalArgumentException("Expecting only positive requests");
+ }
+ cancelled = true;
+ chunksRequested.set(-1);
+
+ // bufferEntryIterator is drained because complete() is invoked only by drainBoth(), which
+ // proceeds past a BODY event only when drain() returned true
+ if (th != null) {
+ subscriber.onError(th);
+ } else {
+ subscriber.onComplete();
+ }
+ subscriber = null;
}
/**
- * Indicate if the parser requires more data to make progress.
+ * Drain the current buffer entry iterator according to the current request count.
*
- * @return {@code true} if more data is required, {@code false}
- * otherwise
+ * @return {@code true} if the iterator was fully drained, {@code false} otherwise
*/
- boolean isDataRequired() {
- return lastEvent.type() == MimeParser.EventType.DATA_REQUIRED;
+ boolean drain() {
+ long requested = chunksRequested.get();
+ long chunksEmitted = 0;
+
+ // requested < 0 behaves like cancel, i.e. drain bufferEntryIterator
+ while (chunksEmitted < requested && bufferEntryIterator.hasNext()) {
+ do {
+ DataChunk chunk = createPartChunk(bufferEntryIterator.next());
+ subscriber.onNext(chunk);
+ chunk.release();
+ chunksEmitted++;
+ } while (chunksEmitted < requested && bufferEntryIterator.hasNext());
+
+ long ce = chunksEmitted;
+ requested = chunksRequested.updateAndGet(v -> v == Long.MAX_VALUE || v < 0 ? v : v - ce);
+ chunksEmitted = 0;
+ }
+
+ if (requested < 0) {
+ while (bufferEntryIterator.hasNext()) {
+ createPartChunk(bufferEntryIterator.next()).release();
+ }
+ }
+
+ if (requested != 0) {
+ // bufferEntryIterator is drained, drop the reference
+ bufferEntryIterator = EMPTY_BUFFER_ENTRY_ITERATOR;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ private static final class EmptyIterator<T> implements Iterator<T> {
+
+ @Override
+ public boolean hasNext() {
+ return false;
}
- /**
- * Indicate if more content data is required.
- *
- * @return {@code true} if more content data is required, {@code false}
- * otherwise
- */
- boolean isContentDataRequired() {
- return isDataRequired() && lastEvent.asDataRequiredEvent().isContent();
+ @Override
+ public T next() {
+ throw new IllegalStateException("Read beyond EOF");
}
}
}
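The contenders counter and `drain()`/`drainLoop()` in the hunk above implement a standard queue-drain serialization pattern. A minimal standalone sketch of the same idea (the `DrainExample` class and its methods are illustrative, not Helidon code):

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch (not Helidon code) of the queue-drain pattern behind the
// "contenders" counter: many threads may call drain(), but only the caller
// that moves the counter from 0 to 1 enters drainLoop(); the others merely
// record that more draining is needed and return immediately.
class DrainExample {
    private final AtomicInteger contenders = new AtomicInteger();
    private final ConcurrentLinkedQueue<Runnable> work = new ConcurrentLinkedQueue<>();

    void offer(Runnable task) {
        work.add(task);
        drain();
    }

    void drain() {
        // The 0 -> 1 transition elects this caller as the draining thread.
        if (contenders.getAndIncrement() != 0) {
            return;
        }
        drainLoop();
    }

    private void drainLoop() {
        // Subtract the contenders served so far; if new ones arrived while we
        // were draining, the counter is still positive and we go around again.
        for (int c = 1; c > 0; c = contenders.addAndGet(-c)) {
            Runnable task;
            while ((task = work.poll()) != null) {
                task.run();
            }
        }
    }
}
```

This mirrors how signals from upstream, the outer Subscriber, and the inner Subscriber all funnel through one serialized drain pass in the decoder.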
diff --git a/media/multipart/src/test/java/io/helidon/media/multipart/MimeParserTest.java b/media/multipart/src/test/java/io/helidon/media/multipart/MimeParserTest.java
index d4c0bf9c3..564a9acf6 100644
--- a/media/multipart/src/test/java/io/helidon/media/multipart/MimeParserTest.java
+++ b/media/multipart/src/test/java/io/helidon/media/multipart/MimeParserTest.java
@@ -16,6 +16,7 @@
package io.helidon.media.multipart;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ArrayList;
@@ -382,26 +383,6 @@ public class MimeParserTest {
+ "this-is-the-2nd-slice-of-the-body")));
}
- @Test
- public void testBoundaryAcrossChunksDataRequired() {
- String boundary = "boundary";
- final byte[] chunk1 = ("--" + boundary + "\n"
- + "Content-Id: part1\n"
- + "\n"
- + "this-is-the-body-of-part1\n"
- + "--" + boundary.substring(0, 3)).getBytes();
-
- ParserEventProcessor processor = new ParserEventProcessor();
- MimeParser parser = new MimeParser(boundary, processor);
- parser.offer(ByteBuffer.wrap(chunk1));
- parser.parse();
-
- assertThat(processor.partContent, is(notNullValue()));
- assertThat(new String(processor.partContent), is(equalTo("this-is-the-body-of-")));
- assertThat(processor.lastEvent, is(notNullValue()));
- assertThat(processor.lastEvent.type(), is(equalTo(MimeParser.EventType.DATA_REQUIRED)));
- }
-
@Test
public void testBoundaryAcrossChunks() {
String boundary = "boundary";
@@ -637,11 +618,9 @@ public class MimeParserTest {
@Test
public void testParserClosed() {
try {
- ParserEventProcessor processor = new ParserEventProcessor();
- MimeParser parser = new MimeParser("boundary", processor);
+ MimeParser parser = new MimeParser("boundary");
parser.close();
parser.offer(ByteBuffer.wrap("foo".getBytes()));
- parser.parse();
fail("exception should be thrown");
} catch (MimeParser.ParsingException ex) {
assertThat(ex.getMessage(), is(equalTo("Parser is closed")));
@@ -673,9 +652,9 @@ public class MimeParserTest {
*
* @param boundary boundary string
* @param data for the chunks to parse
- * @return test parser event processor
+ * @return parser result
*/
- static ParserEventProcessor parse(String boundary, byte[] data) {
+ static ParserResult parse(String boundary, byte[] data) {
return parse(boundary, List.of(data));
}
@@ -684,67 +663,78 @@ public class MimeParserTest {
*
* @param boundary boundary string
* @param data for the chunks to parse
- * @return test parser event processor
+ * @return parser result
*/
- static ParserEventProcessor parse(String boundary, List<byte[]> data) {
- ParserEventProcessor processor = new ParserEventProcessor();
- MimeParser parser = new MimeParser(boundary, processor);
- for (byte[] bytes : data) {
- parser.offer(ByteBuffer.wrap(bytes));
- parser.parse();
- }
- parser.close();
- return processor;
- }
-
- /**
- * Test parser event processor.
- */
- static final class ParserEventProcessor implements MimeParser.EventProcessor {
-
+ static ParserResult parse(String boundary, List<byte[]> data) {
+ MimeParser parser = new MimeParser(boundary);
List<MimePart> parts = new LinkedList<>();
Map<String, List<String>> partHeaders = new HashMap<>();
byte[] partContent = null;
MimeParser.ParserEvent lastEvent = null;
-
- @Override
- public void process(MimeParser.ParserEvent event) {
- switch (event.type()) {
- case START_PART:
- partHeaders = new HashMap<>();
- partContent = null;
- break;
-
- case HEADER:
- MimeParser.HeaderEvent headerEvent = event.asHeaderEvent();
- String name = headerEvent.name();
- String value = headerEvent.value();
- assertThat(name, notNullValue());
- assertThat(name.length(), not(equalTo(0)));
- assertThat(value, notNullValue());
- List<String> values = partHeaders.get(name);
- if (values == null) {
- values = new ArrayList<>();
- partHeaders.put(name, values);
- }
- values.add(value);
- break;
-
- case CONTENT:
- ByteBuffer content = event.asContentEvent().content().buffer();
- assertThat(content, is(notNullValue()));
- if (partContent == null) {
- partContent = Utils.toByteArray(content);
- } else {
- partContent = concat(partContent, Utils.toByteArray(content));
- }
- break;
-
- case END_PART:
- parts.add(new MimePart(partHeaders, partContent));
- break;
+ for (byte[] bytes : data) {
+ parser.offer(ByteBuffer.wrap(bytes));
+ Iterator<MimeParser.ParserEvent> it = parser.parseIterator();
+ while (it.hasNext()) {
+ MimeParser.ParserEvent event = it.next();
+ switch (event.type()) {
+ case START_PART:
+ partHeaders = new HashMap<>();
+ partContent = null;
+ break;
+ case HEADER:
+ MimeParser.HeaderEvent headerEvent = event.asHeaderEvent();
+ String name = headerEvent.name();
+ String value = headerEvent.value();
+ assertThat(name, notNullValue());
+ assertThat(name.length(), not(equalTo(0)));
+ assertThat(value, notNullValue());
+ List<String> values = partHeaders.get(name);
+ if (values == null) {
+ values = new ArrayList<>();
+ partHeaders.put(name, values);
+ }
+ values.add(value);
+ break;
+ case BODY:
+ for (VirtualBuffer.BufferEntry content : event.asBodyEvent().body()) {
+ ByteBuffer buffer = content.buffer();
+ assertThat(buffer, is(notNullValue()));
+ if (partContent == null) {
+ partContent = Utils.toByteArray(buffer);
+ } else {
+ partContent = concat(partContent, Utils.toByteArray(buffer));
+ }
+ }
+ break;
+ case END_PART:
+ parts.add(new MimePart(partHeaders, partContent));
+ break;
+ }
+ lastEvent = event;
}
- lastEvent = event;
+ }
+ parser.close();
+ return new ParserResult(parts, partHeaders, partContent, lastEvent);
+ }
+
+ /**
+ * Parser result.
+ */
+ static final class ParserResult {
+
+ final List<MimePart> parts;
+ final Map<String, List<String>> partHeaders;
+ final byte[] partContent;
+ final MimeParser.ParserEvent lastEvent;
+
+ ParserResult(List<MimePart> parts,
+ Map<String, List<String>> partHeaders,
+ byte[] partContent,
+ MimeParser.ParserEvent lastEvent) {
+ this.parts = parts;
+ this.partHeaders = partHeaders;
+ this.partContent = partContent;
+ this.lastEvent = lastEvent;
}
}
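The request arithmetic exercised by these tests (and by `chunksRequested` in the decoder diff above) follows the Reactive Streams demand rules: requests saturate at `Long.MAX_VALUE` ("effectively unbounded"), and negative values are sticky, standing for cancellation or an invalid non-positive request. A hypothetical sketch of just that bookkeeping (the `Demand` class is not part of the codebase):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch (not Helidon code) of Reactive Streams demand accounting:
// requests saturate at Long.MAX_VALUE, and negative values are sticky,
// standing for cancellation or an invalid (non-positive) request.
final class Demand {
    private final AtomicLong requested = new AtomicLong();

    // Returns the previous value; a non-positive n poisons the counter.
    long add(long n) {
        if (n <= 0) {
            return requested.getAndSet(-1);
        }
        return requested.getAndUpdate(v ->
                v < 0 ? v                                      // stay cancelled
                : (Long.MAX_VALUE - v > n ? v + n              // plain addition
                                          : Long.MAX_VALUE));  // saturate
    }

    // Subtracts emitted items, leaving MAX_VALUE and negative values untouched,
    // in the same spirit as the updateAndGet lambdas in the decoder diff.
    long consume(long emitted) {
        return requested.updateAndGet(v ->
                v == Long.MAX_VALUE || v < 0 ? v : v - emitted);
    }
}
```

With this accounting, a bad request can be detected later at a convenient point in the drain and converted into an `onError`, which is exactly what the decoder does.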
diff --git a/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderSubsWhiteBoxTckTest.java b/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderSubsWhiteBoxTckTest.java
index 3321799f7..c2f22c81b 100644
--- a/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderSubsWhiteBoxTckTest.java
+++ b/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderSubsWhiteBoxTckTest.java
@@ -86,7 +86,7 @@ public class MultiPartDecoderSubsWhiteBoxTckTest extends FlowSubscriberWhiteboxV
}
};
- Multi.create(decoder).forEach(part -> {});
+ Multi.create(decoder).forEach(part -> part.content().forEach(chunk -> {}));
return decoder;
}
diff --git a/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTckTest.java b/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTckTest.java
index 8b61bc8cc..aee272865 100644
--- a/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTckTest.java
+++ b/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTckTest.java
@@ -61,7 +61,10 @@ public class MultiPartDecoderTckTest extends FlowPublisherVerification<ReadableBodyPart> {
public Publisher<ReadableBodyPart> createFlowPublisher(final long l) {
MultiPartDecoder decoder = MultiPartDecoder.create("boundary", MEDIA_CONTEXT.readerContext());
upstream(l).subscribe(decoder);
- return decoder;
+ return Multi.create(decoder).map(part -> {
+ part.content().forEach(chunk -> {});
+ return part;
+ });
}
@Override
diff --git a/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTest.java b/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTest.java
index bc0cfe171..7e6b4d4d0 100644
--- a/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTest.java
+++ b/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTest.java
@@ -17,11 +17,13 @@ package io.helidon.media.multipart;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -53,6 +55,8 @@ public class MultiPartDecoderTest {
+ "--" + boundary + "--").getBytes();
final CountDownLatch latch = new CountDownLatch(2);
+ final CompletableFuture<Void> testDone = new CompletableFuture<>();
+
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"),
@@ -60,16 +64,26 @@ public class MultiPartDecoderTest {
DataChunkSubscriber subscriber = new DataChunkSubscriber();
part.content().subscribe(subscriber);
subscriber.content().thenAccept(body -> {
- latch.countDown();
assertThat(body, is(equalTo("body 1")));
+ latch.countDown();
+ if (latch.getCount() == 0) {
+ testDone.complete(null);
+ }
+ }).exceptionally((ex) -> {
+ testDone.completeExceptionally(ex);
+ return null;
});
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(
SUBSCRIBER_TYPE.INFINITE, consumer);
partsPublisher(boundary, chunk1).subscribe(testSubscriber);
- waitOnLatch(latch);
- assertThat(testSubscriber.error, is(nullValue()));
- assertThat(testSubscriber.complete, is(equalTo(true)));
+ testDone.orTimeout(5, TimeUnit.SECONDS).join();
+ try {
+ boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
+ assertThat(b, is(equalTo(true)));
+ } catch(CompletionException error) {
+ assertThat(error, is(nullValue()));
+ }
}
@Test
@@ -108,9 +122,13 @@ public class MultiPartDecoderTest {
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.INFINITE, consumer);
partsPublisher(boundary, chunk1).subscribe(testSubscriber);
+ try {
+ boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
+ assertThat(b, is(equalTo(true)));
+ } catch(CompletionException error) {
+ assertThat(error, is(nullValue()));
+ }
waitOnLatch(latch);
- assertThat(testSubscriber.error, is(nullValue()));
- assertThat(testSubscriber.complete, is(equalTo(true)));
}
@Test
@@ -138,9 +156,53 @@ public class MultiPartDecoderTest {
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.INFINITE, consumer);
partsPublisher(boundary, List.of(chunk1, chunk2)).subscribe(testSubscriber);
+ try {
+ boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
+ assertThat(b, is(equalTo(true)));
+ } catch(CompletionException error) {
+ assertThat(error, is(nullValue()));
+ }
+ waitOnLatch(latch);
+ }
+
+ @Test
+ public void testContentAcrossChunksAsyncRequest() {
+ String boundary = "boundary";
+ final byte[] chunk1 = ("--" + boundary + "\n"
+ + "Content-Id: part1\n"
+ + "\n"
+ + "thi").getBytes();
+ final byte[] chunk11 = ("s-is-the-1st-slice-of-the-body\n").getBytes();
+ final byte[] chunk12 = ("t").getBytes();
+ final byte[] chunk2 = ("his-is-the-2nd-slice-of-the-body\n"
+ + "--" + boundary + "--").getBytes();
+
+ final CountDownLatch latch = new CountDownLatch(2);
+ Consumer<ReadableBodyPart> consumer = (part) -> {
+ latch.countDown();
+ assertThat(part.headers().values("Content-Id"), hasItems("part1"));
+ DataChunkSubscriber subscriber = new DataChunkSubscriber();
+ part.content().subscribe(subscriber);
+ subscriber.content().thenAccept(body -> {
+ assertThat(body, is(equalTo(
+ "this-is-the-1st-slice-of-the-body\n"
+ + "this-is-the-2nd-slice-of-the-body")));
+ }).exceptionally((ex) -> {
+ System.out.println("UH-OH... " + ex);
+ return null;
+ }).thenAccept((_i) -> {
+ latch.countDown();
+ });
+ };
+ BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.ONE_BY_ONE, consumer);
+ partsPublisher(boundary, List.of(chunk1, chunk11, chunk12, chunk2)).subscribe(testSubscriber);
+ try {
+ boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
+ assertThat(b, is(equalTo(true)));
+ } catch(CompletionException error) {
+ assertThat(error, is(nullValue()));
+ }
waitOnLatch(latch);
- assertThat(testSubscriber.error, is(nullValue()));
- assertThat(testSubscriber.complete, is(equalTo(true)));
}
@Test
@@ -170,9 +232,13 @@ public class MultiPartDecoderTest {
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.INFINITE, consumer);
partsPublisher(boundary, List.of(chunk1, chunk2, chunk3, chunk4, chunk5)).subscribe(testSubscriber);
+ try {
+ boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
+ assertThat(b, is(equalTo(true)));
+ } catch(CompletionException error) {
+ assertThat(error, is(nullValue()));
+ }
waitOnLatch(latch);
- assertThat(testSubscriber.error, is(nullValue()));
- assertThat(testSubscriber.complete, is(equalTo(true)));
}
@Test
@@ -211,9 +277,13 @@ public class MultiPartDecoderTest {
};
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.ONE_BY_ONE, consumer);
partsPublisher(boundary, chunk1).subscribe(testSubscriber);
+ try {
+ boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
+ assertThat(b, is(equalTo(true)));
+ } catch(CompletionException error) {
+ assertThat(error, is(nullValue()));
+ }
waitOnLatch(latch);
- assertThat(testSubscriber.error, is(nullValue()));
- assertThat(testSubscriber.complete, is(equalTo(true)));
}
@Test
@@ -257,12 +327,34 @@ public class MultiPartDecoderTest {
+ "\n"
+ "bar\n").getBytes();
- BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.CANCEL_AFTER_ONE, null);
+ DataChunkSubscriber s1 = new DataChunkSubscriber();
+ BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.ONE_BY_ONE, p -> p.content().subscribe(s1));
partsPublisher(boundary, chunk1).subscribe(testSubscriber);
- assertThat(testSubscriber.complete, is(equalTo(false)));
- assertThat(testSubscriber.error, is(notNullValue()));
- assertThat(testSubscriber.error.getClass(), is(equalTo(MimeParser.ParsingException.class)));
- assertThat(testSubscriber.error.getMessage(), is(equalTo("No closing MIME boundary")));
+ try {
+ s1.future.orTimeout(100, TimeUnit.MILLISECONDS).join();
+ throw new IllegalStateException("Should have terminated exceptionally");
+ } catch(CompletionException e) {
+ Throwable error = e.getCause();
+ assertThat(error.getClass(), is(equalTo(MimeParser.ParsingException.class)));
+ assertThat(error.getMessage(), is(equalTo("No closing MIME boundary")));
+ }
+
+ // CANCEL_AFTER_ONE emits cancel as soon as the first part arrives.
+ // Once testSubscriber has signalled cancellation, no further signals are guaranteed to reach it,
+ // so one cannot expect the error to be signalled to it.
+ //
+ // One should expect the error to reach the inner subscriber, but here it is not delivered at all - the
+ // inner subscriber does not request the body, so the absence of any content after what has been published
+ // is not guaranteed. (Subscribers are required to eventually either issue a request or cancel.)
+ try {
+ testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
+ throw new IllegalStateException("Not expecting to terminate normally");
+ } catch(CompletionException e) {
+ Throwable error = e.getCause();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getClass(), is(equalTo(MimeParser.ParsingException.class)));
+ assertThat(error.getMessage(), is(equalTo("No closing MIME boundary")));
+ }
}
@Test
@@ -312,8 +404,14 @@ public class MultiPartDecoderTest {
partsPublisher(boundary, List.of(chunk1, chunk2, chunk3, chunk4)).subscribe(testSubscriber);
waitOnLatchNegative(latch, "the 2nd part should not be processed");
assertThat(latch.getCount(), is(equalTo(1L)));
- assertThat(testSubscriber.error, is(nullValue()));
- assertThat(testSubscriber.complete, is(equalTo(false)));
+ try {
+ testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
+ throw new IllegalStateException("Not expecting to make progress, unless the part is consumed");
+ } catch(CompletionException e) {
+ Throwable error = e.getCause();
+ // This is the expected outcome - the testSubscriber is not making progress
+ assertThat(error.getClass(), is(equalTo(TimeoutException.class)));
+ }
}
@Test
@@ -322,9 +420,14 @@ public class MultiPartDecoderTest {
BodyPartSubscriber testSubscriber = new BodyPartSubscriber(SUBSCRIBER_TYPE.INFINITE, null);
decoder.subscribe(testSubscriber);
Multi.error(new IllegalStateException("oops")).subscribe(decoder);
- assertThat(testSubscriber.complete, is(equalTo(false)));
- assertThat(testSubscriber.error, is(notNullValue()));
- assertThat(testSubscriber.error.getMessage(), is(equalTo("oops")));
+ try {
+ testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
+ throw new IllegalStateException("Normal termination is not expected");
+ } catch(CompletionException e) {
+ Throwable error = e.getCause();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getMessage(), is(equalTo("oops")));
+ }
}
@Test
@@ -355,9 +458,9 @@ public class MultiPartDecoderTest {
private final SUBSCRIBER_TYPE subscriberType;
private final Consumer<ReadableBodyPart> consumer;
- private Subscription subcription;
- private Throwable error;
- private boolean complete;
+ private Subscription subscription;
+ public CompletableFuture<Boolean> complete = new CompletableFuture<>();
+ public CompletableFuture<Void> cancelled = new CompletableFuture<>();
BodyPartSubscriber(SUBSCRIBER_TYPE subscriberType, Consumer consumer) {
this.subscriberType = subscriberType;
@@ -366,7 +469,7 @@ public class MultiPartDecoderTest {
@Override
public void onSubscribe(Subscription subscription) {
- this.subcription = subscription;
+ this.subscription = subscription;
if (subscriberType == SUBSCRIBER_TYPE.INFINITE) {
subscription.request(Long.MAX_VALUE);
} else {
@@ -381,20 +484,21 @@ public class MultiPartDecoderTest {
}
consumer.accept(item);
if (subscriberType == SUBSCRIBER_TYPE.ONE_BY_ONE) {
- subcription.request(1);
+ subscription.request(1);
} else if (subscriberType == SUBSCRIBER_TYPE.CANCEL_AFTER_ONE) {
- subcription.cancel();
+ subscription.cancel();
+ cancelled.complete(null);
}
}
@Override
public void onError(Throwable ex) {
- error = ex;
+ complete.completeExceptionally(ex);
}
@Override
public void onComplete() {
- complete = true;
+ complete.complete(true);
}
}
@@ -482,16 +586,26 @@ public class MultiPartDecoderTest {
static class DataChunkSubscriber implements Subscriber<DataChunk> {
private final StringBuilder sb = new StringBuilder();
- private final CompletableFuture<String> future = new CompletableFuture<>();
+ public final CompletableFuture<String> future = new CompletableFuture<>();
+ private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
- subscription.request(Long.MAX_VALUE);
+ this.subscription = subscription;
+ subscription.request(1);
}
@Override
public void onNext(DataChunk item) {
- sb.append(item.bytes());
+ sb.append(new String(item.bytes()));
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ Thread.sleep(10);
+ } catch(Exception e) {
+ }
+ subscription.request(1);
+ return 0;
+ });
}
@Override