diff --git a/docs-internal/datachunkinputstream.md b/docs-internal/datachunkinputstream.md new file mode 100644 index 000000000..18c71ace1 --- /dev/null +++ b/docs-internal/datachunkinputstream.md @@ -0,0 +1,102 @@ +# io.helidon.media.common.DataChunkInputStream + +This document provides additional details about the implementation of `DataChunkInputStream`. + +## Implementation and Proof of Correctness + +The input stream implementation is not thread-safe: concurrent accesses should not be +allowed, and even invocations of `read(...)` should be synchronized by out-of-band means for +any stream state updates to be visible across threads. + +The following assumptions are made about the operation of the stream: + +- `Subscription.request` is invoked only after one chunk has been consumed + +- The number of chunks requested is always 1 + +- Publishers fully conform to `Flow.Publisher` in the Reactive Streams Specification [I] +with respect to: + - total order of `onNext`/`onComplete`/`onError` + - strictly heeding backpressure (not calling `onNext` until more chunks are requested) + - relaxed ordering of calls to `request`, allowing calls after `onComplete`/`onError` + + Given the assumption that the number of chunks requested is at most 1, the requests are totally + ordered with `onSubscribe`/`onNext` by construction. This affords the following safety guarantees: + + 1. The only place where `next` is assigned is in `onNext`, before the next chunk is published + + 2. Initially `next` and `current` are identical; first `request(1)` is called on subscription + + 3. All subsequent calls to `request(1)` happen after the publishing of the chunk is observed + by `read(...)` + + 4. It follows from (3) and (1) that one and only one assignment to `next` happens before + observing the chunk by `read(...)`, provided the Publisher observes backpressure + + 5.
Such `next` is never lost, because it is copied into `current` before `request(1)`, + therefore a new assignment of `next` in `onNext` never loses the reference to a future + with an unobserved chunk, provided the Publisher observes backpressure + + 6. The publishing of the chunk by `onNext` synchronizes-with the observation of the + chunk by a `read(...)`: (1) and (5) ensure `current` observed by `read(...)` is the same + `next` at the time `onNext` is invoked, so `onNext` completes the same future as accessed + by `read(...)`. Moreover, the store to `next` by `onNext` and the load of `next` by + `read(...)` are in a happens-before relationship due to this synchronizes-with edge, + the program order in `onNext`, and the program order in `read(...)` (and out-of-band + synchronization between multiple reads) + + A conforming Publisher establishes a total order of `onNext`, and therefore a total order of + assignments to `next` and `Future.complete`: + + - `onSubscribe`: assert `current == next` + - `request(1)` + + - `onNext`: assert `current == next` + - `prev = next` + - `next = new Future` (A) + - `prev.complete(chunk)` (B): assert `prev == this.current` + + - `read(...)` + - `current.get()` (C): (C) synchronizes-with (B): any read is blocked until (B) + + - `read(...)` (same or subsequent read) + - `current.get()`: synchronizes-with (B) + - chunk is seen to be consumed entirely: release the chunk, and request the next: + - `current = next`: (D): (A) happens-before (D), no further `onNext` intervenes + invariant: `current` never references a released chunk as seen by `close(...)`, + assuming `read(...)` and `close(...)` are totally ordered, either by + program order, or through out-of-band synchronization + - `request(1)`: assert a conforming Publisher does not invoke `onNext` before this + + - `onNext`: assert `current == next`: a conforming `Publisher` does not invoke `onNext` before + `request(1)` + - `prev = next` + - `next = new Future` (E) + - `prev.complete(chunk)` (F):
assert `prev == current` + + - `read(...)` + - `current.get()`: (G): (G) synchronizes-with (F): any read after (D) is blocked until (F) + + + - `onComplete` / `onError`: assert `next` has not been completed: stream is either empty + (no `onNext` will ever be called), or an `onNext` assigned a new uncompleted future to `next` + - `next.complete(...)`: (H): assert a conforming `Publisher` ensures `next` assignments + by `onNext` are visible here by totally ordering `onNext` / `onComplete` / `onError` + + - `read(...)`: assert eventually `current == next`: either initially, or after some read + that consumed the chunk in its entirety and requested the new chunk + - `current.get()`: (I): (I) synchronizes-with (H) + - signal EOF + + - `close(...)`: + - assert `current` never references a released chunk; it either eventually references a chunk + that has been produced by `onNext` and has not been consumed fully by `read(...)`, or a null + produced by `onComplete` / `onError` + - assert if `next != current`, `next` will never produce a new chunk: this is the case + if and only if `onNext` has occurred, but `read(...)` has not consumed the chunk in its + entirety, hence has not requested any new chunks + - `current.whenComplete(release)` + + ## References + + [I] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#specification diff --git a/media/common/src/main/java/io/helidon/media/common/ContentReaders.java b/media/common/src/main/java/io/helidon/media/common/ContentReaders.java index 187617573..da8053898 100644 --- a/media/common/src/main/java/io/helidon/media/common/ContentReaders.java +++ b/media/common/src/main/java/io/helidon/media/common/ContentReaders.java @@ -122,7 +122,7 @@ public final class ContentReaders { * @return a input stream content reader */ public static Reader inputStreamReader() { - return (publisher, clazz) -> CompletableFuture.completedFuture(new PublisherInputStream(publisher)); + return (publisher, clazz) ->
CompletableFuture.completedFuture(new DataChunkInputStream(publisher)); } /** diff --git a/media/common/src/main/java/io/helidon/media/common/DataChunkInputStream.java b/media/common/src/main/java/io/helidon/media/common/DataChunkInputStream.java new file mode 100644 index 000000000..52bcb0161 --- /dev/null +++ b/media/common/src/main/java/io/helidon/media/common/DataChunkInputStream.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.media.common; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +import io.helidon.common.http.DataChunk; + +/** + * Provides a bridge between a reactive {@link Flow.Publisher} in Helidon and an {@link InputStream} + * in Jersey. It subscribes to a Helidon publisher of data chunks and makes the data available to + * Jersey using the blocking {@link InputStream} API. + * + * This implementation is documented in {@code /docs-internal/datachunkinputstream.md}.
+ */ +public class DataChunkInputStream extends InputStream { + private static final Logger LOGGER = Logger.getLogger(DataChunkInputStream.class.getName()); + + private final Flow.Publisher<DataChunk> originalPublisher; + private CompletableFuture<DataChunk> current = new CompletableFuture<>(); + private CompletableFuture<DataChunk> next = current; + private volatile Flow.Subscription subscription; + private byte[] oneByte; + + /** + * This does not strictly need to be an AtomicBoolean: accesses are not thread-safe anyway, and + * are meant to be single-threaded. It remains an AtomicBoolean just in case there is still some + * use case where the full memory fence on compareAndSet introduces the + * "out-of-band synchronization" necessary for total ordering of read(...) and close(...). + */ + private final AtomicBoolean subscribed = new AtomicBoolean(false); + + /** + * Stores the publisher for later subscription. + * + * @param originalPublisher The original publisher. + */ + public DataChunkInputStream(Flow.Publisher<DataChunk> originalPublisher) { + this.originalPublisher = originalPublisher; + } + + /** + * Releases a data chunk. + * + * @param chunk The chunk. + * @param th A throwable.
+ */ + private static void releaseChunk(DataChunk chunk, Throwable th) { + if (chunk != null && !chunk.isReleased()) { + LOGGER.finest(() -> "Releasing chunk: " + chunk.id()); + chunk.release(); + } + } + + // -- InputStream --------------------------------------------------------- + // + // Following methods are executed by Jersey/Helidon threads + // ------------------------------------------------------------------------ + + @Override + public void close() { + // Assert: if current != next, next cannot ever be resolved with a chunk that needs releasing + current.whenComplete(DataChunkInputStream::releaseChunk); + current = null; + } + + @Override + public int read() throws IOException { + if (oneByte == null) { + oneByte = new byte[1]; + } + // Assert: Chunks are always non-empty, so r is either 1 or negative (EOF) + int r = read(oneByte, 0, 1); + if (r < 0) { + return r; + } + + return oneByte[0] & 0xFF; + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + if (subscribed.compareAndSet(false, true)) { + originalPublisher.subscribe(new DataChunkSubscriber()); // subscribe for first time + } + + if (current == null) { + throw new IOException("The input stream has been closed"); + } + + try { + DataChunk chunk = current.get(); // block until data is available + if (chunk == null) { + return -1; + } + + ByteBuffer currentBuffer = chunk.data(); + + if (currentBuffer.position() == 0) { + LOGGER.finest(() -> "Reading chunk ID: " + chunk.id()); + } + + // If there is anything to read, then read as much as fits into buf + int rem = currentBuffer.remaining(); + if (len > rem) { + len = rem; + } + currentBuffer.get(buf, off, len); + + // Chunk is consumed entirely - release the chunk, and prefetch a new chunk; do not + // wait for it to arrive - the next read may have to wait less. 
+ // + // Assert: it is safe to request new chunks eagerly - there is no mechanism + // to push back unconsumed data, so we can assume we own all the chunks, + // consumed and unconsumed. + if (len == rem) { + releaseChunk(chunk, null); + current = next; + subscription.request(1); + } + + return len; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } + + // -- DataChunkSubscriber ------------------------------------------------- + // + // Following methods are executed by Netty IO threads (except first chunk) + // ------------------------------------------------------------------------ + + private class DataChunkSubscriber implements Flow.Subscriber<DataChunk> { + + @Override + public void onSubscribe(Flow.Subscription subscription) { + DataChunkInputStream.this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(DataChunk item) { + LOGGER.finest(() -> "Processing chunk: " + item.id()); + CompletableFuture<DataChunk> prev = next; + next = new CompletableFuture<>(); + prev.complete(item); + } + + @Override + public void onError(Throwable throwable) { + next.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + next.complete(null); + } + } +} diff --git a/media/common/src/main/java/io/helidon/media/common/InputStreamBodyReader.java b/media/common/src/main/java/io/helidon/media/common/InputStreamBodyReader.java index 088e93e6e..3cdc999cf 100644 --- a/media/common/src/main/java/io/helidon/media/common/InputStreamBodyReader.java +++ b/media/common/src/main/java/io/helidon/media/common/InputStreamBodyReader.java @@ -43,7 +43,7 @@ public class InputStreamBodyReader implements MessageBodyReader { public Single read(Publisher publisher, GenericType type, MessageBodyReaderContext context) { - return (Single) Single.just(new PublisherInputStream(publisher)); + return (Single) Single.just(new
DataChunkInputStream(publisher)); } /** diff --git a/media/common/src/main/java/io/helidon/media/common/PublisherInputStream.java b/media/common/src/main/java/io/helidon/media/common/PublisherInputStream.java deleted file mode 100644 index c29e0d01b..000000000 --- a/media/common/src/main/java/io/helidon/media/common/PublisherInputStream.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.media.common; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Logger; - -import io.helidon.common.http.DataChunk; - -/** - * An {@link Flow.Subscriber subscriber} that can subscribe to a source of {@code ByteBuffer} data chunks and then make - * them available for consumption via standard blocking {@link InputStream} API. 
- */ -public class PublisherInputStream extends InputStream implements Flow.Publisher { - - private static final Logger LOGGER = Logger.getLogger(PublisherInputStream.class.getName()); - - private final Flow.Publisher originalPublisher; - private final AtomicBoolean closed = new AtomicBoolean(false); - private volatile CompletableFuture processed = new CompletableFuture<>(); - private volatile Flow.Subscription subscription; - /** - * Wraps the supplied publisher and adds a blocking {@link InputStream} based nature. - * It is illegal to subscribe to the returned publisher. - * - * @param originalPublisher the original publisher to wrap - */ - public PublisherInputStream(Flow.Publisher originalPublisher) { - this.originalPublisher = originalPublisher; - } - private final AtomicBoolean subscribed = new AtomicBoolean(false); - - private static void releaseChunk(DataChunk chunk) { - if (chunk != null && !chunk.isReleased()) { - LOGGER.finest(() -> "Releasing chunk: " + chunk.id()); - chunk.release(); - } - } - - @Override - public int read() throws IOException { - - if (subscribed.compareAndSet(false, true)) { - // do the subscribe for the first time - subscribe(); - } - - try { - while (true) { - - DataChunk chunk = processed.get(); // block until a processing data are available - ByteBuffer currentBuffer = chunk != null && !chunk.isReleased() ? chunk.data() : null; - - if (currentBuffer != null && currentBuffer.position() == 0) { - LOGGER.finest(() -> "Reading chunk ID: " + chunk.id()); - } - - if (currentBuffer != null && currentBuffer.remaining() > 0) { - // if there is anything to read, then read one byte... 
- return currentBuffer.get() & 0xFF; - } else if (!closed.get()) { - // reinitialize the processed buffer future and request more data - processed = new CompletableFuture<>(); - - releaseChunk(chunk); - subscription.request(1); - } else { - LOGGER.finest(() -> "Ending stream: " + Optional.ofNullable(chunk).map(DataChunk::id).orElse(null)); - // else we have read all the data already and the data inflow has completed - releaseChunk(chunk); - return -1; - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); - } - } - - @Override - public void subscribe(Flow.Subscriber subscriber) { - subscriber.onError(new UnsupportedOperationException("Subscribing on this publisher is not allowed!")); - } - - private void subscribe() { - originalPublisher.subscribe(new Flow.Subscriber() { - - @Override - public void onSubscribe(Flow.Subscription subscription) { - PublisherInputStream.this.subscription = subscription; - subscription.request(1); - } - - @Override - public void onNext(DataChunk item) { - LOGGER.finest(() -> "Processing chunk: " + item.id()); - processed.complete(item); - } - - @Override - public void onError(Throwable throwable) { - closed.set(true); - if (!processed.completeExceptionally(throwable)) { - // best effort exception propagation - processed = new CompletableFuture<>(); - processed.completeExceptionally(throwable); - } - } - - @Override - public void onComplete() { - closed.set(true); - processed.complete(null); // if not already completed, then complete - } - }); - } -} diff --git a/media/common/src/test/java/io/helidon/media/common/PublisherInputStreamTest.java b/media/common/src/test/java/io/helidon/media/common/DataChunkInputStreamTest.java similarity index 92% rename from media/common/src/test/java/io/helidon/media/common/PublisherInputStreamTest.java rename to 
media/common/src/test/java/io/helidon/media/common/DataChunkInputStreamTest.java index 1260e8567..e94a1ad05 100644 --- a/media/common/src/test/java/io/helidon/media/common/PublisherInputStreamTest.java +++ b/media/common/src/test/java/io/helidon/media/common/DataChunkInputStreamTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2020 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,16 +28,16 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; /** - * Tests {@link io.helidon.media.common.PublisherInputStream}. + * Tests {@link DataChunkInputStream}. */ -public class PublisherInputStreamTest { +public class DataChunkInputStreamTest { @Test public void chunkWith0xFFValue() { final byte[] bytes = new byte[]{ 0, 1, 2, 3, 4, 5, 6, (byte) 0xFF, 7, 8, 9, 10 }; - InputStream is = new PublisherInputStream( + InputStream is = new DataChunkInputStream( new DataChunkPublisher( new DataChunk[]{DataChunk.create(bytes)})); try {
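As an appendix to this change: the future-chaining technique that `DataChunkInputStream` uses (labels (A)-(D) in the proof above) can be illustrated with JDK-only types. This is a hedged sketch, not the Helidon implementation: `ChunkInputStream` is a hypothetical name, `byte[]` stands in for `DataChunk`, chunk releasing is omitted, and chunks are assumed non-empty, as in the real class.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/** Illustrative stand-in for DataChunkInputStream: byte[] chunks, no release bookkeeping. */
class ChunkInputStream extends InputStream implements Flow.Subscriber<byte[]> {
    private CompletableFuture<byte[]> current = new CompletableFuture<>();
    private CompletableFuture<byte[]> next = current; // invariant (2): current == next initially
    private Flow.Subscription subscription;
    private byte[] chunk; // the partially consumed chunk, if any
    private int pos;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // first request(1) on subscription
    }

    @Override
    public void onNext(byte[] item) {
        CompletableFuture<byte[]> prev = next;
        next = new CompletableFuture<>(); // (A): assign next before publishing
        prev.complete(item);              // (B): publish the chunk to the pending read
    }

    @Override
    public void onError(Throwable t) {
        next.completeExceptionally(t);
    }

    @Override
    public void onComplete() {
        next.complete(null); // null signals EOF
    }

    @Override
    public int read() throws IOException {
        try {
            if (chunk != null && pos == chunk.length) { // chunk consumed entirely:
                current = next;                         // (D): advance to the chained future
                subscription.request(1);                // only now request one more chunk
                chunk = null;
            }
            if (chunk == null) {
                chunk = current.get();                  // (C): block until (B), or EOF
                pos = 0;
                if (chunk == null) {
                    return -1;                          // EOF is sticky: current stays completed
                }
            }
            return chunk[pos++] & 0xFF;                 // assumes non-empty chunks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
    }
}
```

With a `java.util.concurrent.SubmissionPublisher<byte[]>` as the conforming publisher, subscribing this stream and draining it yields the submitted bytes in order, which is the same blocking bridge the real class provides for Jersey.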