* New implementation of PublisherInputStream, now renamed to DataChunkInputStream. This new implementation fixes a couple of important problems: (1) It implements the ability to read into byte arrays, not just one byte at a time for better performance and (2) It changes the implementation to avoid a race condition when accessing the data chunks held in CompletableFuture's. The race condition resulted in certain tests to hang if a thread raced and updated the value of the old processed variable. There is also a new internal document docs-internal/datachunkinputstream.md that describes the new implementation. Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com> * Fixed copyright header. Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
io.helidon.media.common.DataChunkInputStream
This document provides additional details about the implementation of DataChunkInputStream.
Implementation and Proof of Correctness
The input stream implementation is not thread-safe: concurrent accesses should not be
allowed, and even invocations of read(...) should be synchronized by out-of-band means for
any stream state updates to be visible across threads.
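One possible form of such out-of-band synchronization is a shared lock around every read. The sketch below is illustrative only: `SerializedReader` and `drainWithTwoThreads` are hypothetical names that are not part of Helidon, and a `ByteArrayInputStream` stands in for the real stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical wrapper: one lock orders every read(...) on a stream that is not thread-safe. */
class SerializedReader {
    private final InputStream in;
    private final Object lock = new Object();

    SerializedReader(InputStream in) {
        this.in = in;
    }

    int read(byte[] buf) throws IOException {
        synchronized (lock) {   // unlock/lock pairs make one read's state updates visible to the next
            return in.read(buf);
        }
    }

    /** Two threads drain the same stream through the wrapper; returns the total bytes read. */
    static int drainWithTwoThreads(InputStream source) {
        SerializedReader reader = new SerializedReader(source);
        AtomicInteger total = new AtomicInteger();
        Runnable drain = () -> {
            byte[] buf = new byte[16];
            try {
                for (int n = reader.read(buf); n >= 0; n = reader.read(buf)) {
                    total.addAndGet(n);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
        Thread first = new Thread(drain);
        Thread second = new Thread(drain);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(drainWithTwoThreads(new ByteArrayInputStream(new byte[1000])));
    }
}
```

Any other mechanism that establishes a happens-before edge between consecutive reads (for example handing the stream from one thread to another through a queue) would serve the same purpose.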
The following assumptions are made about the operation of the stream:
- Subscription.request is invoked only after one chunk has been consumed
- The number of chunks requested is always 1
- The Publisher fully conforms to Flow.Publisher in the Reactive Streams Specification [I] with respect to:
  - total order of onNext/onComplete/onError
  - strictly heeding backpressure (not calling onNext until more chunks were requested)
  - relaxed ordering of calls to request, allowing calls after onComplete/onError
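The request discipline in these assumptions can be sketched with the JDK's own `SubmissionPublisher`. This is a minimal illustration, not the stream's code: `OneAtATimeSubscriber` and `OneAtATimeDemo` are hypothetical names, `String` stands in for a data chunk, and the chunk is "consumed" inside `onNext` for brevity, whereas the stream consumes it in `read(...)`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical subscriber: at most one request is outstanding at any time. */
class OneAtATimeSubscriber implements Flow.Subscriber<String> {
    final List<String> consumed = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);              // the first request is issued on subscription
    }

    @Override
    public void onNext(String chunk) {
        consumed.add(chunk);       // stand-in for read(...) consuming the chunk
        subscription.request(1);   // request exactly one more, only after consumption
    }

    @Override
    public void onError(Throwable t) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }
}

class OneAtATimeDemo {
    /** Publishes three chunks and returns them in the order they were consumed. */
    static List<String> run() {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String chunk : List.of("a", "b", "c")) {
                publisher.submit(chunk);
            }
        }   // close() signals onComplete after the buffered chunks are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onComplete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(subscriber.consumed);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the subscriber never has more than one request outstanding, a publisher that heeds backpressure cannot deliver a second chunk before the first has been consumed.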
Given the assumption that at most one chunk is requested at a time, the requests are totally
ordered with onSubscribe/onNext by construction. This affords the following safety guarantees:
1. The only place where next is assigned is in onNext, before the next chunk is published
2. Initially next and current are identical; the first request(1) is called on subscription
3. All subsequent calls to request(1) happen after the publishing of the chunk is observed by read(...)
4. It follows from (3) and (1) that one and only one assignment to next happens before observing the chunk by read(...), provided the Publisher observes backpressure
5. Such next is never lost, because it is copied into current before request(1); therefore a new assignment of next in onNext never loses the reference to a future with an unobserved chunk, provided the Publisher observes backpressure
6. The publishing of the chunk by onNext synchronizes-with the observation of the chunk by a read(...): (1) and (5) ensure current observed by read(...) is the same next at the time onNext is invoked, so onNext completes the same future as accessed by read(...). Moreover, the store to next by onNext and the load of next by read(...) are in a happens-before relationship due to this synchronizes-with edge, the program order in onNext, and the program order in read(...) (and out-of-band synchronization between multiple reads)
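The hand-off between the two futures can be sketched as a small, simplified stream. This is not the actual `DataChunkInputStream` source: `TwoFutureInputStream` and `TwoFutureDemo` are hypothetical names, `byte[]` stands in for a data chunk, and chunk release and `close(...)` are left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/** Simplified stand-in for the stream: byte[] chunks, no release logic. */
class TwoFutureInputStream extends InputStream implements Flow.Subscriber<byte[]> {
    private CompletableFuture<byte[]> current = new CompletableFuture<>();
    private CompletableFuture<byte[]> next = current;   // initially identical
    private volatile Flow.Subscription subscription;
    private int position;                                // offset within the current chunk

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);                                    // first request, on subscription
    }

    @Override
    public void onNext(byte[] chunk) {
        CompletableFuture<byte[]> prev = next;
        next = new CompletableFuture<>();                // the only assignment to next
        prev.complete(chunk);                            // publishes the chunk to read(...)
    }

    @Override
    public void onError(Throwable t) {
        next.completeExceptionally(t);
    }

    @Override
    public void onComplete() {
        next.complete(null);                             // null marks end of stream
    }

    @Override
    public int read() throws IOException {
        byte[] one = new byte[1];
        int n;
        do {
            n = read(one, 0, 1);                         // skip over empty chunks
        } while (n == 0);
        return n < 0 ? -1 : one[0] & 0xFF;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        byte[] chunk;
        try {
            chunk = current.get();                       // blocks until the future is completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
        if (chunk == null) {
            return -1;
        }
        int count = Math.min(len, chunk.length - position);
        System.arraycopy(chunk, position, buf, off, count);
        position += count;
        if (position == chunk.length) {                  // chunk consumed entirely
            position = 0;
            current = next;                              // copy next before requesting more
            subscription.request(1);                     // request only after observing the chunk
        }
        return count;
    }
}

class TwoFutureDemo {
    /** Feeds three chunks through SubmissionPublisher and reads them back as one string. */
    static String run() {
        TwoFutureInputStream in = new TwoFutureInputStream();
        try (SubmissionPublisher<byte[]> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(in);
            for (String s : List.of("Hello, ", "chunked ", "world")) {
                publisher.submit(s.getBytes(StandardCharsets.UTF_8));
            }
        }
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the reader thread learns about a new `next` only after `current.get()` returns, which is the synchronizes-with edge the guarantees rely on.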
A conforming Publisher establishes a total order of onNext calls and, therefore, a total order of
assignments to next and calls to Future.complete:
1. onSubscribe: assert current == next
   - request(1)
2. onNext: assert current == next
   - prev = next
   - next = new Future (A)
   - prev.complete(chunk) (B): assert prev == this.current
3. read(...)
   - current.get() (C): (C) synchronizes-with (B): any read is blocked until (B)
4. read(...) (same or subsequent read)
   - current.get(): synchronizes-with (B)
   - chunk is seen to be consumed entirely: release the chunk, and request next:
     - current = next: (D): (A) happens-before (D), no further onNext intervenes
       - invariant: current never references a released chunk as seen by close(...), assuming read(...) and close(...) are totally ordered, either by program order or through out-of-band synchronization
     - request(1): assert a conforming Publisher does not invoke onNext before this
5. onNext: assert current == next: a conforming Publisher does not invoke onNext before request(1)
   - prev = next
   - next = new Future (E)
   - prev.complete(chunk) (F): assert prev == current
6. read(...)
   - current.get(): (G): (G) synchronizes-with (F): any read after (D) is blocked until (F)
7. onComplete/onError: assert next has not been completed: the stream is either empty (no onNext will ever be called), or an onNext assigned a new uncompleted future to next
   - next.complete(...): (H): assert a conforming Publisher ensures next assignments by onNext are visible here by totally ordering onNext/onComplete/onError
8. read(...): assert eventually current == next: either initially, or after some read that consumed the chunk in its entirety and requested the new chunk
   - current.get(): (I): (I) synchronizes-with (H)
   - signal EOF

- close(...):
  - assert current never references a released chunk; it either eventually references a chunk that has been produced by onNext and has not been consumed fully by read(...), or a null produced by onComplete/onError
  - assert if next != current, next will never produce a new chunk: this is the case if and only if onNext has occurred, but read(...) has not consumed the chunk in its entirety, hence has not requested any new chunks
  - current.whenComplete(release)
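The sequence above can be replayed on a single thread, with the asserts turned into runtime checks. The sketch is a simplification under stated assumptions: `HandOffTrace` is a hypothetical name, `String` stands in for a chunk, every read consumes its chunk entirely (so the partial-read case is skipped), and `request(1)` is modelled as a counter rather than a call to a real `Subscription`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Single-threaded replay of the trace; String stands in for a chunk. */
class HandOffTrace {
    CompletableFuture<String> current = new CompletableFuture<>();
    CompletableFuture<String> next = current;
    int requested;                                   // counts request(1) calls
    final List<String> released = new ArrayList<>();

    void onSubscribe() {
        check(current == next, "onSubscribe: current == next");
        requested++;                                 // request(1)
    }

    void onNext(String chunk) {
        check(current == next, "onNext: current == next");
        CompletableFuture<String> prev = next;
        next = new CompletableFuture<>();            // (A) / (E)
        prev.complete(chunk);                        // (B) / (F)
        check(prev == current, "onNext: prev == current");
    }

    void onComplete() {
        check(!next.isDone(), "onComplete: next has not been completed");
        next.complete(null);                         // (H)
    }

    /** Consumes the whole current chunk; returns null at end of stream. */
    String read() {
        String chunk = current.join();               // (C) / (G) / (I)
        if (chunk != null) {
            released.add(chunk);                     // release the fully consumed chunk
            current = next;                          // (D)
            requested++;                             // request(1)
        }
        return chunk;
    }

    void close() {
        current.whenComplete((chunk, error) -> {     // releases an unconsumed chunk, if there is one
            if (chunk != null) {
                released.add(chunk);
            }
        });
    }

    static void check(boolean condition, String what) {
        if (!condition) {
            throw new IllegalStateException("violated: " + what);
        }
    }

    /** Replays the whole sequence and returns the released chunks in order. */
    static List<String> replay() {
        HandOffTrace t = new HandOffTrace();
        t.onSubscribe();
        t.onNext("chunk-1");
        check("chunk-1".equals(t.read()), "first read sees chunk-1");
        t.onNext("chunk-2");
        check("chunk-2".equals(t.read()), "second read sees chunk-2");
        t.onComplete();
        check(t.read() == null, "read after onComplete signals EOF");
        t.close();
        check(t.requested == 3, "one request on subscription plus one per consumed chunk");
        return t.released;
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

Since everything runs on one thread, the replay only exercises the ordering of assignments and completions; it says nothing about visibility across threads, which the synchronizes-with argument covers.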
References
[I] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#specification