New implementation of PublisherInputStream that improves performance and fixes race conditions (#1690)

* New implementation of PublisherInputStream, now renamed to SubscriberInputStream since it really implements a subscriber, not a publisher. This new implementation fixes two important problems: (1) it adds the ability to read into byte arrays, not just one byte at a time, for better performance, and (2) it changes the implementation to avoid a race condition when accessing the data chunks held in CompletableFutures. The race condition caused certain tests to hang when a thread raced and updated the value of the old 'processed' variable.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Created inner class to implement subscriber and renamed class to prevent confusion with other public APIs. Also renamed and re-formatted internal document describing implementation.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Fixed copyright year.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
Santiago Pericasgeertsen
2020-04-27 14:16:05 -04:00
committed by GitHub
parent c434278951
commit 60abc974f3
6 changed files with 291 additions and 146 deletions


@@ -0,0 +1,102 @@
# io.helidon.media.common.DataChunkInputStream
This document provides additional details about the implementation of `DataChunkInputStream`.
## Implementation and Proof of Correctness
The input stream implementation is not thread-safe: concurrent accesses should not be
allowed, and even invocations of `read(...)` should be synchronized by out-of-band means for
any stream state updates to be visible across threads.
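A hypothetical wrapper (not part of Helidon) sketches what such out-of-band synchronization could look like when two threads must share one stream — all reads funnel through a common monitor:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical wrapper (not Helidon code): if two threads must share one
// DataChunkInputStream, every read(...) has to go through a common lock so the
// stream-state writes made by one reader become visible to the next reader.
public class SynchronizedReads {
    private final InputStream in;

    public SynchronizedReads(InputStream in) {
        this.in = in;
    }

    // The shared monitor supplies the "out-of-band" happens-before edges this
    // document relies on; without it, reads from different threads race.
    public synchronized int read(byte[] buf) throws IOException {
        return in.read(buf);
    }

    public static void main(String[] args) throws IOException {
        SynchronizedReads r = new SynchronizedReads(
                new ByteArrayInputStream(new byte[]{1, 2, 3}));
        byte[] buf = new byte[3];
        int n = r.read(buf); // safe to call from any thread
        assert n == 3;
    }
}
```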
The following assumptions are made about the operation of the stream:
- `Subscription.request` is invoked only after one chunk has been consumed
- The number of chunks requested is always 1
- The publisher fully conforms to `Flow.Publisher` in the Reactive Streams Specification [I]
with respect to:
- total order of `onNext`/`onComplete`/`onError`
  - strictly heeding backpressure (not calling `onNext` until more chunks are requested)
  - relaxed ordering of calls to `request`, allowing calls after `onComplete`/`onError`
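These assumptions can be made concrete with a hypothetical minimal publisher (JDK `Flow` with `ByteBuffer` chunks standing in for `DataChunk`) that emits at most one chunk per `request(1)` and tolerates requests arriving after completion:

```java
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher satisfying the assumptions above: at most one onNext
// per request(1), a total order of onNext/onComplete, and requests arriving
// after completion are silently ignored.
public class OneChunkPerRequestPublisher implements Flow.Publisher<ByteBuffer> {
    private final List<ByteBuffer> chunks;

    public OneChunkPerRequestPublisher(List<ByteBuffer> chunks) {
        this.chunks = chunks;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        Iterator<ByteBuffer> it = chunks.iterator();
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return; // relaxed ordering: late requests are harmless
                }
                if (it.hasNext()) {
                    subscriber.onNext(it.next()); // strictly one chunk per request
                } else {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}
```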
Given the assumption that the number of chunks requested is at most 1, the requests are totally
ordered with `onSubscribe`/`onNext` by construction. This affords the following safety guarantees:
1. The only place where `next` is assigned is in `onNext`, before the next chunk is published
2. Initially `next` and `current` are identical; first `request(1)` is called on subscription
3. All subsequent calls to `request(1)` happen after the publishing of the chunk is observed
by `read(...)`
4. It follows from (3) and (1) that one and only one assignment to `next` happens before
observing the chunk by `read(...)` --provided the Publisher observes backpressure
5. Such `next` is never lost, because it is copied into `current` before `request(1)`,
therefore a new assignment of `next` in `onNext` never loses the reference to a future
with an unobserved chunk --provided the Publisher observes backpressure
6. The publishing of the chunk by `onNext` synchronizes-with the observation of the
chunk by a `read(...)`: (1) and (5) ensure `current` observed by `read(...)` is the same
`next` at the time `onNext` is invoked, so `onNext` completes the same future as accessed
by `read(...)`. Moreover, the store to `next` by `onNext` and load of `next` by
`read(...)` are in happens-before relationship due to this synchronizes-with edge,
   the program order in `onNext`, and program order in `read(...)` (and out-of-band
   synchronization between multiple reads)
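A minimal sketch of this two-future handoff, using plain `CompletableFuture<String>` with `String` chunks standing in for `DataChunk`; the labels mirror the trace that follows:

```java
import java.util.concurrent.CompletableFuture;

// Simplified sketch (not Helidon code) of the current/next handoff: onNext
// completes the future the reader may be blocked on (B), and the reader
// advances current to next (D) once the chunk is fully consumed.
public class FutureHandoff {
    private CompletableFuture<String> current = new CompletableFuture<>();
    private CompletableFuture<String> next = current; // invariant at onSubscribe

    // Producer side (corresponds to onNext): publish one chunk.
    void onNext(String chunk) {
        CompletableFuture<String> prev = next;
        next = new CompletableFuture<>(); // (A)
        prev.complete(chunk);             // (B): prev == current at this point
    }

    // Consumer side (corresponds to read): block for the chunk, then advance.
    String read() throws Exception {
        String chunk = current.get();     // (C): synchronizes-with (B)
        current = next;                   // (D): (A) happens-before (D)
        return chunk;
    }

    public static void main(String[] args) throws Exception {
        FutureHandoff h = new FutureHandoff();
        h.onNext("hello");
        assert "hello".equals(h.read());
        h.onNext("world");
        assert "world".equals(h.read());
    }
}
```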
A conforming Publisher establishes total order of `onNext`, therefore, a total order of
assignments to `next` and `Future.complete`:
- `onSubscribe`: assert `current == next`
- `request(1)`
- `onNext`: assert `current == next`
- `prev = next`
- `next = new Future` (A)
- `prev.complete(chunk)` (B): assert `prev == this.current`
- `read(...)`
- `current.get()` (C): (C) synchronizes-with (B): any read is blocked until (B)
- `read(...)` (same or subsequent read)
- `current.get()`: synchronizes-with (B)
- chunk is seen to be consumed entirely: release the chunk, and request next:
- `current = next`: (D): (A) happens-before (D), no further `onNext` intervenes
invariant: `current` never references a released chunk as seen by `close(...)`,
assuming `read(...)` and `close(...)` are totally ordered --either by
     program order, or through out-of-band synchronization
- `request(1)`: assert a conforming Publisher does not invoke onNext before this
- `onNext`: assert `current == next`: a conforming `Publisher` does not invoke `onNext` before
`request(1)`
- `prev = next`
- `next = new Future` (E)
- `prev.complete(chunk)` (F): assert `prev == current`
- `read(...)`
- `current.get()`: (G): (G) synchronizes-with (F): any read after (D) is blocked until (F)
- `onComplete` / `onError`: assert: `next` has not been completed: stream is either empty
(no `onNext` will ever be called), or an `onNext` assigned a new uncompleted future to `next`
- `next.complete(...)`: (H): assert conforming `Publisher` ensures `next` assignments
by `onNext` are visible here by totally ordering `onNext` / `onComplete` / `onError`
- `read(...)`: assert eventually `current == next`: either initially, or after some read
that consumed the chunk in its entirety and requested the new chunk
- `current.get()`: (I): (I) synchronizes-with (H)
- signal EOF
- `close(...)`:
- assert `current` never references a released chunk; it either eventually references a chunk
that has been produced by `onNext` and has not been consumed fully by `read(...)`, or a null
produced by `onComplete` / `onError`
- assert if `next != current`, `next` will never produce a new chunk: this is the case
if and only if `onNext` has occurred, but `read(...)` has not consumed the chunk in its
entirety, hence has not requested any new chunks
- `current.whenComplete(release)`
## References
[I] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#specification
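On the consumer side the stream is used like any blocking `InputStream`. This sketch drives the usual read loop with a `ByteArrayInputStream` stand-in so it runs without Helidon on the classpath; the loop is identical for `DataChunkInputStream`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Standard blocking read loop; -1 from read(...) signals onComplete/onError
// (EOF). A ByteArrayInputStream stands in for DataChunkInputStream here.
public class ReadLoop {
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = readAll(new ByteArrayInputStream("chunked".getBytes()));
        assert new String(data).equals("chunked");
    }
}
```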


@@ -122,7 +122,7 @@ public final class ContentReaders {
* @return a input stream content reader
*/
public static Reader<InputStream> inputStreamReader() {
return (publisher, clazz) -> CompletableFuture.completedFuture(new PublisherInputStream(publisher));
return (publisher, clazz) -> CompletableFuture.completedFuture(new DataChunkInputStream(publisher));
}
/**


@@ -0,0 +1,183 @@
/*
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.media.common;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import io.helidon.common.http.DataChunk;
/**
* Provides a bridge between a reactive {@link Flow.Publisher} in Helidon and an {@link InputStream}
* in Jersey. It subscribes to a Helidon publisher of data chunks and makes the data available to
* Jersey using the blocking {@link InputStream} API.
*
* This implementation is documented in {@code /docs-internal/datachunkinputstream.md}.
*/
public class DataChunkInputStream extends InputStream {
private static final Logger LOGGER = Logger.getLogger(DataChunkInputStream.class.getName());
private final Flow.Publisher<DataChunk> originalPublisher;
private CompletableFuture<DataChunk> current = new CompletableFuture<>();
private CompletableFuture<DataChunk> next = current;
private volatile Flow.Subscription subscription;
private byte[] oneByte;
/**
* This really doesn't need to be an AtomicBoolean - the stream is not thread-safe anyway, so
* accesses are meant to be single-threaded. It remains an AtomicBoolean just in case there is
* some use-case where the full memory fence on compareAndSet introduces the
* "out-of-band synchronization" necessary for total ordering of read(...) and close(...).
*/
private final AtomicBoolean subscribed = new AtomicBoolean(false);
/**
* Stores publisher for later subscription.
*
* @param originalPublisher The original publisher.
*/
public DataChunkInputStream(Flow.Publisher<DataChunk> originalPublisher) {
this.originalPublisher = originalPublisher;
}
/**
* Releases a data chunk.
*
* @param chunk The chunk.
* @param th A throwable (ignored; declared so this method can be used as a {@code BiConsumer} with {@code whenComplete}).
*/
private static void releaseChunk(DataChunk chunk, Throwable th) {
if (chunk != null && !chunk.isReleased()) {
LOGGER.finest(() -> "Releasing chunk: " + chunk.id());
chunk.release();
}
}
// -- InputStream ---------------------------------------------------------
//
// Following methods are executed by Jersey/Helidon threads
// ------------------------------------------------------------------------
@Override
public void close() {
// Assert: if current != next, next cannot ever be resolved with a chunk that needs releasing
current.whenComplete(DataChunkInputStream::releaseChunk);
current = null;
}
@Override
public int read() throws IOException {
if (oneByte == null) {
oneByte = new byte[1];
}
// Assert: Chunks are always non-empty, so r is either 1 or negative (EOF)
int r = read(oneByte, 0, 1);
if (r < 0) {
return r;
}
return oneByte[0] & 0xFF;
}
@Override
public int read(byte[] buf, int off, int len) throws IOException {
if (subscribed.compareAndSet(false, true)) {
originalPublisher.subscribe(new DataChunkSubscriber()); // subscribe for first time
}
if (current == null) {
throw new IOException("The input stream has been closed");
}
try {
DataChunk chunk = current.get(); // block until data is available
if (chunk == null) {
return -1;
}
ByteBuffer currentBuffer = chunk.data();
if (currentBuffer.position() == 0) {
LOGGER.finest(() -> "Reading chunk ID: " + chunk.id());
}
// If there is anything to read, then read as much as fits into buf
int rem = currentBuffer.remaining();
if (len > rem) {
len = rem;
}
currentBuffer.get(buf, off, len);
// Chunk is consumed entirely - release the chunk, and prefetch a new chunk; do not
// wait for it to arrive - the next read may have to wait less.
//
// Assert: it is safe to request new chunks eagerly - there is no mechanism
// to push back unconsumed data, so we can assume we own all the chunks,
// consumed and unconsumed.
if (len == rem) {
releaseChunk(chunk, null);
current = next;
subscription.request(1);
}
return len;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
// -- DataChunkSubscriber -------------------------------------------------
//
// Following methods are executed by Netty IO threads (except first chunk)
// ------------------------------------------------------------------------
private class DataChunkSubscriber implements Flow.Subscriber<DataChunk> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
DataChunkInputStream.this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(DataChunk item) {
LOGGER.finest(() -> "Processing chunk: " + item.id());
CompletableFuture<DataChunk> prev = next;
next = new CompletableFuture<>();
prev.complete(item);
}
@Override
public void onError(Throwable throwable) {
next.completeExceptionally(throwable);
}
@Override
public void onComplete() {
next.complete(null);
}
}
}


@@ -43,7 +43,7 @@ public class InputStreamBodyReader implements MessageBodyReader<InputStream> {
public <U extends InputStream> Single<U> read(Publisher<DataChunk> publisher, GenericType<U> type,
MessageBodyReaderContext context) {
return (Single<U>) Single.just(new PublisherInputStream(publisher));
return (Single<U>) Single.just(new DataChunkInputStream(publisher));
}
/**


@@ -1,140 +0,0 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.media.common;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import io.helidon.common.http.DataChunk;
/**
* An {@link Flow.Subscriber subscriber} that can subscribe to a source of {@code ByteBuffer} data chunks and then make
* them available for consumption via standard blocking {@link InputStream} API.
*/
public class PublisherInputStream extends InputStream implements Flow.Publisher<DataChunk> {
private static final Logger LOGGER = Logger.getLogger(PublisherInputStream.class.getName());
private final Flow.Publisher<DataChunk> originalPublisher;
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile CompletableFuture<DataChunk> processed = new CompletableFuture<>();
private volatile Flow.Subscription subscription;
/**
* Wraps the supplied publisher and adds a blocking {@link InputStream} based nature.
* It is illegal to subscribe to the returned publisher.
*
* @param originalPublisher the original publisher to wrap
*/
public PublisherInputStream(Flow.Publisher<DataChunk> originalPublisher) {
this.originalPublisher = originalPublisher;
}
private final AtomicBoolean subscribed = new AtomicBoolean(false);
private static void releaseChunk(DataChunk chunk) {
if (chunk != null && !chunk.isReleased()) {
LOGGER.finest(() -> "Releasing chunk: " + chunk.id());
chunk.release();
}
}
@Override
public int read() throws IOException {
if (subscribed.compareAndSet(false, true)) {
// do the subscribe for the first time
subscribe();
}
try {
while (true) {
DataChunk chunk = processed.get(); // block until a processing data are available
ByteBuffer currentBuffer = chunk != null && !chunk.isReleased() ? chunk.data() : null;
if (currentBuffer != null && currentBuffer.position() == 0) {
LOGGER.finest(() -> "Reading chunk ID: " + chunk.id());
}
if (currentBuffer != null && currentBuffer.remaining() > 0) {
// if there is anything to read, then read one byte...
return currentBuffer.get() & 0xFF;
} else if (!closed.get()) {
// reinitialize the processed buffer future and request more data
processed = new CompletableFuture<>();
releaseChunk(chunk);
subscription.request(1);
} else {
LOGGER.finest(() -> "Ending stream: " + Optional.ofNullable(chunk).map(DataChunk::id).orElse(null));
// else we have read all the data already and the data inflow has completed
releaseChunk(chunk);
return -1;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
@Override
public void subscribe(Flow.Subscriber<? super DataChunk> subscriber) {
subscriber.onError(new UnsupportedOperationException("Subscribing on this publisher is not allowed!"));
}
private void subscribe() {
originalPublisher.subscribe(new Flow.Subscriber<DataChunk>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
PublisherInputStream.this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(DataChunk item) {
LOGGER.finest(() -> "Processing chunk: " + item.id());
processed.complete(item);
}
@Override
public void onError(Throwable throwable) {
closed.set(true);
if (!processed.completeExceptionally(throwable)) {
// best effort exception propagation
processed = new CompletableFuture<>();
processed.completeExceptionally(throwable);
}
}
@Override
public void onComplete() {
closed.set(true);
processed.complete(null); // if not already completed, then complete
}
});
}
}


@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,16 +28,16 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
* Tests {@link io.helidon.media.common.PublisherInputStream}.
* Tests {@link DataChunkInputStream}.
*/
public class PublisherInputStreamTest {
public class DataChunkInputStreamTest {
@Test
public void chunkWith0xFFValue() {
final byte[] bytes = new byte[]{
0, 1, 2, 3, 4, 5, 6, (byte) 0xFF, 7, 8, 9, 10
};
InputStream is = new PublisherInputStream(
InputStream is = new DataChunkInputStream(
new DataChunkPublisher(
new DataChunk[]{DataChunk.create(bytes)}));
try {