New implementation of PublisherInputStream that improves performance and fixes race conditions (#1695)

* New implementation of PublisherInputStream, now renamed to DataChunkInputStream. This new implementation fixes two important problems: (1) it adds the ability to read into byte arrays, not just one byte at a time, for better performance, and (2) it avoids a race condition when accessing the data chunks held in `CompletableFuture`s. The race condition caused certain tests to hang if a thread raced and updated the value of the old `processed` variable. There is also a new internal document, docs-internal/datachunkinputstream.md, that describes the new implementation.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Fixed copyright header.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
This commit is contained in:
Santiago Pericasgeertsen
2020-04-28 12:56:48 -04:00
committed by GitHub
parent f5403fc02c
commit 906111bcef
6 changed files with 423 additions and 226 deletions


@@ -0,0 +1,102 @@
# io.helidon.media.common.DataChunkInputStream
This document provides additional details about the implementation of `DataChunkInputStream`.
## Implementation and Proof of Correctness
The input stream implementation is not thread-safe: concurrent access is not allowed, and
even invocations of `read(...)` must be synchronized by out-of-band means for
any stream state updates to be visible across threads.
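A minimal sketch of such out-of-band synchronization, using only JDK types (the lock, class, and method names are illustrative, and a plain `ByteArrayInputStream` stands in for `DataChunkInputStream`):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// All readers take the same lock, so each read observes the stream state
// left behind by the previous read, regardless of which thread performed it.
public class SynchronizedReads {
    static final Object LOCK = new Object();

    static int lockedRead(InputStream in, byte[] buf) throws IOException {
        synchronized (LOCK) {  // establishes happens-before between successive reads
            return in.read(buf, 0, buf.length);
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("abc".getBytes());
        byte[] buf = new byte[2];
        int n = lockedRead(in, buf);  // may be called from any thread
        System.out.println(n + ":" + new String(buf, 0, n));
    }
}
```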
The following assumptions are made about the operation of the stream:
- `Subscription.request` is invoked only after one chunk has been consumed
- The number of chunks requested is always 1
- The publisher fully conforms to `Flow.Publisher` in the Reactive Streams Specification [I]
  with respect to:
  - the total order of `onNext`/`onComplete`/`onError`
  - strictly heeding backpressure (not calling `onNext` until more chunks are requested)
  - the relaxed ordering of calls to `request`, allowing calls after `onComplete`/`onError`
Given the assumption that the number of chunks requested is at most 1, the requests are totally
ordered with `onSubscribe`/`onNext` by construction. This affords the following safety guarantees:
1. The only place where `next` is assigned is in `onNext`, before the next chunk is published
2. Initially `next` and `current` are identical; first `request(1)` is called on subscription
3. All subsequent calls to `request(1)` happen after the publishing of the chunk is observed
by `read(...)`
4. It follows from (3) and (1) that one and only one assignment to `next` happens before
the chunk is observed by `read(...)` --provided the Publisher observes backpressure
5. Such a `next` is never lost, because it is copied into `current` before `request(1)`;
therefore a new assignment to `next` in `onNext` never loses the reference to a future
with an unobserved chunk --provided the Publisher observes backpressure
6. The publishing of the chunk by `onNext` synchronizes-with the observation of the
chunk by a `read(...)`: (1) and (5) ensure the `current` observed by `read(...)` is the same
as `next` at the time `onNext` is invoked, so `onNext` completes the same future as accessed
by `read(...)`. Moreover, the store to `next` by `onNext` and the load of `next` by
`read(...)` are in a happens-before relationship due to this synchronizes-with edge,
the program order in `onNext`, and the program order in `read(...)` (and out-of-band
synchronization between multiple reads)
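The guarantees above can be modeled with plain JDK futures. This sketch (where `String` stands in for `DataChunk`, and the class and method names are illustrative, not Helidon API) mirrors the `current`/`next` handoff between `onNext` and `read(...)`:

```java
import java.util.concurrent.CompletableFuture;

// Minimal single-producer/single-consumer model of the current/next handoff.
public class FutureHandoff {
    private CompletableFuture<String> current = new CompletableFuture<>();
    private CompletableFuture<String> next = current;  // initially identical

    // Producer side, modeling onNext: publish into the future the reader holds.
    void onNext(String chunk) {
        CompletableFuture<String> prev = next;
        next = new CompletableFuture<>();  // (A) new future for the chunk after this one
        prev.complete(chunk);              // (B) prev is the reader's current
    }

    // Consumer side, modeling a read that consumes the chunk entirely.
    String read() {
        String chunk = current.join();     // (C) synchronizes-with (B)
        current = next;                    // (D) adopt the future assigned in (A)
        return chunk;                      // the caller would now request(1)
    }

    public static void main(String[] args) {
        FutureHandoff h = new FutureHandoff();
        h.onNext("chunk-0");
        System.out.println(h.read());
        h.onNext("chunk-1");
        System.out.println(h.read());
    }
}
```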
A conforming Publisher establishes a total order of `onNext` calls and therefore a total
order of assignments to `next` and completions of the futures:
- `onSubscribe`: assert `current == next`
- `request(1)`
- `onNext`: assert `current == next`
- `prev = next`
- `next = new Future` (A)
- `prev.complete(chunk)` (B): assert `prev == this.current`
- `read(...)`
- `current.get()` (C): (C) synchronizes-with (B): any read is blocked until (B)
- `read(...)` (same or subsequent read)
- `current.get()`: synchronizes-with (B)
- chunk is seen to be consumed entirely: release the chunk, and request next:
- `current = next`: (D): (A) happens-before (D), no further `onNext` intervenes
invariant: `current` never references a released chunk as seen by `close(...)`,
assuming `read(...)` and `close(...)` are totally ordered --either by
program order, or through out-of-band synchronization
- `request(1)`: assert a conforming Publisher does not invoke onNext before this
- `onNext`: assert `current == next`: a conforming `Publisher` does not invoke `onNext` before
`request(1)`
- `prev = next`
- `next = new Future` (E)
- `prev.complete(chunk)` (F): assert `prev == current`
- `read(...)`
- `current.get()`: (G): (G) synchronizes-with (F): any read after (D) is blocked until (F)
- `onComplete` / `onError`: assert: `next` has not been completed: stream is either empty
(no `onNext` will ever be called), or an `onNext` assigned a new uncompleted future to `next`
- `next.complete(...)`: (H): assert conforming `Publisher` ensures `next` assignments
by `onNext` are visible here by totally ordering `onNext` / `onComplete` / `onError`
- `read(...)`: assert eventually `current == next`: either initially, or after some read
that consumed the chunk in its entirety and requested the new chunk
- `current.get()`: (I): (I) synchronizes-with (H)
- signal EOF
- `close(...)`:
- assert `current` never references a released chunk; it either eventually references a chunk
that has been produced by `onNext` and has not been consumed fully by `read(...)`, or a null
produced by `onComplete` / `onError`
- assert if `next != current`, `next` will never produce a new chunk: this is the case
if and only if `onNext` has occurred, but `read(...)` has not consumed the chunk in its
entirety, hence has not requested any new chunks
- `current.whenComplete(release)`
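The `close(...)` discipline of attaching the release action to the pending future can be sketched with JDK types alone (class and field names are illustrative; a `String` assignment stands in for `chunk.release()`):

```java
import java.util.concurrent.CompletableFuture;

// Attaching the release action to the pending future releases the chunk
// whether it has already arrived or arrives only after close().
public class CloseSketch {
    static String released = null;

    // Matches the BiConsumer shape expected by whenComplete.
    static void release(String chunk, Throwable th) {
        if (chunk != null) {
            released = chunk;  // stands in for chunk.release()
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> current = new CompletableFuture<>();
        current.whenComplete(CloseSketch::release);  // close() runs before the chunk arrives
        current.complete("late-chunk");              // onNext arrives after close
        System.out.println(released);
    }
}
```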
## References
[I] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#specification


@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -122,7 +122,7 @@ public final class ContentReaders {
* @return a input stream content reader
*/
public static Reader<InputStream> inputStreamReader() {
return (publisher, clazz) -> CompletableFuture.completedFuture(new PublisherInputStream(publisher));
return (publisher, clazz) -> CompletableFuture.completedFuture(new DataChunkInputStream(publisher));
}
/**


@@ -0,0 +1,183 @@
/*
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.media.common;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.Flow;
/**
* Provides a bridge between a reactive {@link Flow.Publisher} in Helidon and an {@link InputStream}
* in Jersey. It subscribes to a Helidon publisher of data chunks and makes the data available to
* Jersey using the blocking {@link InputStream} API.
*
 * The implementation is documented in {@code docs-internal/datachunkinputstream.md}.
*/
public class DataChunkInputStream extends InputStream {
private static final Logger LOGGER = Logger.getLogger(DataChunkInputStream.class.getName());
private final Flow.Publisher<DataChunk> originalPublisher;
private CompletableFuture<DataChunk> current = new CompletableFuture<>();
private CompletableFuture<DataChunk> next = current;
private volatile Flow.Subscription subscription;
private byte[] oneByte;
/**
 * This does not strictly need to be an AtomicBoolean - accesses are not thread-safe anyway,
 * and are meant to be single-threaded. It remains an AtomicBoolean just in case there is
 * still some use case where the full memory fence on compareAndSet provides the
 * "out-of-band synchronization" necessary for a total ordering of read(...) and close(...).
*/
private final AtomicBoolean subscribed = new AtomicBoolean(false);
/**
* Stores publisher for later subscription.
*
* @param originalPublisher The original publisher.
*/
public DataChunkInputStream(Flow.Publisher<DataChunk> originalPublisher) {
this.originalPublisher = originalPublisher;
}
/**
* Releases a data chunk.
*
* @param chunk The chunk.
 * @param th A throwable (unused; present to match the {@code whenComplete} signature).
*/
private static void releaseChunk(DataChunk chunk, Throwable th) {
if (chunk != null && !chunk.isReleased()) {
LOGGER.finest(() -> "Releasing chunk: " + chunk.id());
chunk.release();
}
}
// -- InputStream ---------------------------------------------------------
//
// Following methods are executed by Jersey/Helidon threads
// ------------------------------------------------------------------------
@Override
public void close() {
// Assert: if current != next, next cannot ever be resolved with a chunk that needs releasing
current.whenComplete(DataChunkInputStream::releaseChunk);
current = null;
}
@Override
public int read() throws IOException {
if (oneByte == null) {
oneByte = new byte[1];
}
// Assert: Chunks are always non-empty, so r is either 1 or negative (EOF)
int r = read(oneByte, 0, 1);
if (r < 0) {
return r;
}
return oneByte[0] & 0xFF;
}
@Override
public int read(byte[] buf, int off, int len) throws IOException {
if (subscribed.compareAndSet(false, true)) {
originalPublisher.subscribe(new DataChunkSubscriber()); // subscribe for first time
}
if (current == null) {
throw new IOException("The input stream has been closed");
}
try {
DataChunk chunk = current.get(); // block until data is available
if (chunk == null) {
return -1;
}
ByteBuffer currentBuffer = chunk.data();
if (currentBuffer.position() == 0) {
LOGGER.finest(() -> "Reading chunk ID: " + chunk.id());
}
// If there is anything to read, then read as much as fits into buf
int rem = currentBuffer.remaining();
if (len > rem) {
len = rem;
}
currentBuffer.get(buf, off, len);
// Chunk is consumed entirely - release the chunk, and prefetch a new chunk; do not
// wait for it to arrive - the next read may have to wait less.
//
// Assert: it is safe to request new chunks eagerly - there is no mechanism
// to push back unconsumed data, so we can assume we own all the chunks,
// consumed and unconsumed.
if (len == rem) {
releaseChunk(chunk, null);
current = next;
subscription.request(1);
}
return len;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
// -- DataChunkSubscriber -------------------------------------------------
//
// Following methods are executed by Netty IO threads (except first chunk)
// ------------------------------------------------------------------------
private class DataChunkSubscriber implements Flow.Subscriber<DataChunk> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
DataChunkInputStream.this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(DataChunk item) {
LOGGER.finest(() -> "Processing chunk: " + item.id());
CompletableFuture<DataChunk> prev = next;
next = new CompletableFuture<>();
prev.complete(item);
}
@Override
public void onError(Throwable throwable) {
next.completeExceptionally(throwable);
}
@Override
public void onComplete() {
next.complete(null);
}
}
}


@@ -1,140 +0,0 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.media.common;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.Flow;
/**
* An {@link Flow.Subscriber subscriber} that can subscribe to a source of {@code ByteBuffer} data chunks and then make
* them available for consumption via standard blocking {@link InputStream} API.
*/
public class PublisherInputStream extends InputStream implements Flow.Publisher<DataChunk> {
private static final Logger LOGGER = Logger.getLogger(PublisherInputStream.class.getName());
private final Flow.Publisher<DataChunk> originalPublisher;
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile CompletableFuture<DataChunk> processed = new CompletableFuture<>();
private volatile Flow.Subscription subscription;
/**
* Wraps the supplied publisher and adds a blocking {@link InputStream} based nature.
* It is illegal to subscribe to the returned publisher.
*
* @param originalPublisher the original publisher to wrap
*/
public PublisherInputStream(Flow.Publisher<DataChunk> originalPublisher) {
this.originalPublisher = originalPublisher;
}
private final AtomicBoolean subscribed = new AtomicBoolean(false);
private static void releaseChunk(DataChunk chunk) {
if (chunk != null && !chunk.isReleased()) {
LOGGER.finest(() -> "Releasing chunk: " + chunk.id());
chunk.release();
}
}
@Override
public int read() throws IOException {
if (subscribed.compareAndSet(false, true)) {
// do the subscribe for the first time
subscribe();
}
try {
while (true) {
DataChunk chunk = processed.get(); // block until a processing data are available
ByteBuffer currentBuffer = chunk != null && !chunk.isReleased() ? chunk.data() : null;
if (currentBuffer != null && currentBuffer.position() == 0) {
LOGGER.finest(() -> "Reading chunk ID: " + chunk.id());
}
if (currentBuffer != null && currentBuffer.remaining() > 0) {
// if there is anything to read, then read one byte...
return currentBuffer.get() & 0xFF;
} else if (!closed.get()) {
// reinitialize the processed buffer future and request more data
processed = new CompletableFuture<>();
releaseChunk(chunk);
subscription.request(1);
} else {
LOGGER.finest(() -> "Ending stream: " + Optional.ofNullable(chunk).map(DataChunk::id).orElse(null));
// else we have read all the data already and the data inflow has completed
releaseChunk(chunk);
return -1;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
@Override
public void subscribe(Flow.Subscriber<? super DataChunk> subscriber) {
subscriber.onError(new UnsupportedOperationException("Subscribing on this publisher is not allowed!"));
}
private void subscribe() {
originalPublisher.subscribe(new Flow.Subscriber<DataChunk>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
PublisherInputStream.this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(DataChunk item) {
LOGGER.finest(() -> "Processing chunk: " + item.id());
processed.complete(item);
}
@Override
public void onError(Throwable throwable) {
closed.set(true);
if (!processed.completeExceptionally(throwable)) {
// best effort exception propagation
processed = new CompletableFuture<>();
processed.completeExceptionally(throwable);
}
}
@Override
public void onComplete() {
closed.set(true);
processed.complete(null); // if not already completed, then complete
}
});
}
}


@@ -0,0 +1,136 @@
/*
* Copyright (c) 2018, 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.media.common;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.Flow;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.OriginThreadPublisher;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Tests {@link DataChunkInputStream}.
*/
public class DataChunkInputStreamTest {
@Test
public void differentThreads() throws Exception {
List<String> test_data = Arrays.asList("test0", "test1", "test2", "test3");
List<String> result = new ArrayList<>();
OriginThreadPublisher<String, String> pub = new OriginThreadPublisher<String, String>() {
};
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<?> submitFuture = executorService.submit(() -> {
for (int i = 0; i < test_data.size(); i++) {
pub.submit(test_data.get(i));
sleep();
}
pub.complete();
});
Future<?> receiveFuture = executorService.submit(() -> {
DataChunkInputStream chunkInputStream = new DataChunkInputStream(Multi.from(pub)
.map(s -> DataChunk.create(s.getBytes())));
byte[] bytes = new byte[test_data.get(0).length()];
for (int i = 0; i < test_data.size(); i++) {
try {
chunkInputStream.read(bytes, 0, bytes.length);
String token = new String(bytes);
System.out.println(">>> " + token);
result.add(token);
} catch (IOException e) {
fail(e);
}
}
});
submitFuture.get(500, TimeUnit.MILLISECONDS);
receiveFuture.get(500, TimeUnit.MILLISECONDS);
assertEquals(test_data, result);
}
@Test
public void chunkWith0xFFValue() {
final byte[] bytes = new byte[]{
0, 1, 2, 3, 4, 5, 6, (byte) 0xFF, 7, 8, 9, 10
};
InputStream is = new DataChunkInputStream(
new DataChunkPublisher(
new DataChunk[]{DataChunk.create(bytes)}));
try {
byte[] readBytes = new byte[bytes.length];
is.read(readBytes);
if (!Arrays.equals(bytes, readBytes)) {
fail("expected: " + Arrays.toString(bytes)
+ ", actual: " + Arrays.toString(readBytes));
}
} catch (IOException ex) {
fail(ex);
}
}
static class DataChunkPublisher implements Flow.Publisher<DataChunk> {
private final DataChunk[] chunks;
private int delivered = 0;
public DataChunkPublisher(DataChunk[] chunks) {
this.chunks = chunks;
}
@Override
public void subscribe(Flow.Subscriber<? super DataChunk> subscriber) {
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
for (; n > 0 && delivered < chunks.length; delivered++, n--) {
subscriber.onNext(chunks[delivered]);
}
if (delivered == chunks.length) {
subscriber.onComplete();
}
}
@Override
public void cancel() {
}
});
}
}
static void sleep() {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


@@ -1,84 +0,0 @@
/*
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.media.common;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.Flow;
import io.helidon.common.reactive.Flow.Publisher;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
* Tests {@link io.helidon.media.common.PublisherInputStream}.
*/
public class PublisherInputStreamTest {
@Test
public void chunkWith0xFFValue() {
final byte[] bytes = new byte[]{
0, 1, 2, 3, 4, 5, 6, (byte) 0xFF, 7, 8, 9, 10
};
InputStream is = new PublisherInputStream(
new DataChunkPublisher(
new DataChunk[]{DataChunk.create(bytes)}));
try {
byte[] readBytes = new byte[bytes.length];
is.read(readBytes);
if (!Arrays.equals(bytes, readBytes)) {
Assertions.fail("expected: " + Arrays.toString(bytes)
+ ", actual: " + Arrays.toString(readBytes));
}
} catch (IOException ex) {
Assertions.fail(ex);
}
}
static class DataChunkPublisher implements Publisher<DataChunk> {
private final DataChunk[] chunks;
private volatile int delivered = 0;
public DataChunkPublisher(DataChunk[] chunks) {
this.chunks = chunks;
}
@Override
public void subscribe(Flow.Subscriber<? super DataChunk> subscriber) {
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
if(n > 0){
for(; delivered < n && delivered < chunks.length ; delivered ++){
subscriber.onNext(chunks[delivered]);
}
if(delivered == chunks.length){
subscriber.onComplete();
}
}
}
@Override
public void cancel() {
}
});
}
}
}