diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/Awaitable.java b/common/reactive/src/main/java/io/helidon/common/reactive/Awaitable.java new file mode 100644 index 000000000..b44fc3df2 --- /dev/null +++ b/common/reactive/src/main/java/io/helidon/common/reactive/Awaitable.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.reactive; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Makes intentional blocking when waiting for {@link CompletableFuture} more convenient with {@link Awaitable#await()} + * and {@link Awaitable#await(long, java.util.concurrent.TimeUnit)} methods. + * + * @param payload type + */ +public interface Awaitable { + + /** + * Returns a {@link java.util.concurrent.CompletableFuture} maintaining the same + * completion properties as this stage. If this stage is already a + * CompletableFuture, this method may return this stage itself. + * Otherwise, invocation of this method may be equivalent in + * effect to {@code thenApply(x -> x)}, but returning an instance + * of type {@code CompletableFuture}. + * + * @return the CompletableFuture + */ + CompletableFuture toCompletableFuture(); + + /** + * Block until {@link CompletableFuture} is completed, throws only unchecked exceptions. + * + * @return T payload type + * @throws java.util.concurrent.CancellationException if the computation was cancelled + * @throws java.util.concurrent.CompletionException if this future completed + */ + default T await() { + return this.toCompletableFuture().join(); + } + + /** + * Block until {@link CompletableFuture} is completed, throws only unchecked exceptions. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return the result value + * @throws java.util.concurrent.CancellationException if this future was cancelled + * @throws java.util.concurrent.CompletionException if this future completed exceptionally, + * was interrupted while waiting or the wait timed out + */ + default T await(long timeout, TimeUnit unit) { + try { + return this.toCompletableFuture().get(timeout, unit); + } catch (ExecutionException e) { + throw new CompletionException(e.getCause()); + } catch (InterruptedException | TimeoutException e) { + throw new CompletionException(e); + } + } +} diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/CompletionAwaitable.java b/common/reactive/src/main/java/io/helidon/common/reactive/CompletionAwaitable.java new file mode 100644 index 000000000..7a7cb7a55 --- /dev/null +++ b/common/reactive/src/main/java/io/helidon/common/reactive/CompletionAwaitable.java @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.reactive; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * {@link CompletionStage} wrapper enriched with {@link io.helidon.common.reactive.Awaitable}. + * + * @param payload type + */ +public class CompletionAwaitable implements CompletionStage, Awaitable { + + private Supplier> originalStage; + + CompletionAwaitable(Supplier> originalStage) { + this.originalStage = originalStage; + } + + CompletionAwaitable() { + } + + void setOriginalStage(final Supplier> originalStage) { + this.originalStage = originalStage; + } + + @Override + public CompletionAwaitable thenApply(final Function fn) { + CompletionStage completionStage = originalStage.get().thenApply(fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenApplyAsync(final Function fn) { + CompletionStage completionStage = originalStage.get().thenApplyAsync(fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenApplyAsync(final Function fn, final Executor executor) { + CompletionStage completionStage = originalStage.get().thenApplyAsync(fn, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenAccept(final Consumer action) { + CompletionStage completionStage = originalStage.get().thenAccept(action); + return new CompletionAwaitable(() -> completionStage); + + } + + @Override + public CompletionAwaitable thenAcceptAsync(final Consumer action) { + CompletionStage completionStage = originalStage.get().thenAcceptAsync(action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenAcceptAsync(final Consumer action, final Executor executor) { + CompletionStage completionStage = originalStage.get().thenAcceptAsync(action, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenRun(final Runnable action) { + CompletionStage completionStage = originalStage.get().thenRun(action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenRunAsync(final Runnable action) { + CompletionStage completionStage = originalStage.get().thenRunAsync(action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenRunAsync(final Runnable action, final Executor executor) { + CompletionStage completionStage = originalStage.get().thenRunAsync(action, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenCombine(final CompletionStage other, + final BiFunction fn) { + CompletionStage completionStage = originalStage.get().thenCombine(other, fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenCombineAsync(final CompletionStage other, + final BiFunction fn) { + CompletionStage completionStage = originalStage.get().thenCombineAsync(other, fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenCombineAsync(final CompletionStage other, + final BiFunction fn, + final Executor executor) { + CompletionStage completionStage = originalStage.get().thenCombineAsync(other, fn, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenAcceptBoth(final CompletionStage other, + final BiConsumer action) { + CompletionStage completionStage = originalStage.get().thenAcceptBoth(other, action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenAcceptBothAsync(final CompletionStage other, + final BiConsumer action) { + CompletionStage completionStage = originalStage.get().thenAcceptBothAsync(other, action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenAcceptBothAsync(final CompletionStage other, + final BiConsumer action, + final Executor executor) { + CompletionStage completionStage = originalStage.get().thenAcceptBothAsync(other, action, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable runAfterBoth(final CompletionStage other, + final Runnable action) { + CompletionStage completionStage = originalStage.get().runAfterBoth(other, action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable runAfterBothAsync(final CompletionStage other, + final Runnable action) { + CompletionStage completionStage = originalStage.get().runAfterBothAsync(other, action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable runAfterBothAsync(final CompletionStage other, + final Runnable action, + final Executor executor) { + CompletionStage completionStage = originalStage.get().runAfterBothAsync(other, action, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable applyToEither(final CompletionStage other, + final Function fn) { + CompletionStage completionStage = originalStage.get().applyToEither(other, fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable applyToEitherAsync(final CompletionStage other, + final Function fn) { + CompletionStage completionStage = originalStage.get().applyToEitherAsync(other, fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable applyToEitherAsync(final CompletionStage other, + final Function fn, + final Executor executor) { + CompletionStage completionStage = originalStage.get().applyToEitherAsync(other, fn, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable acceptEither(final CompletionStage other, + final Consumer action) { + CompletionStage completionStage = originalStage.get().acceptEither(other, action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable acceptEitherAsync(final CompletionStage other, + final Consumer action) { + CompletionStage completionStage = originalStage.get().acceptEitherAsync(other, action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable acceptEitherAsync(final CompletionStage other, + final Consumer action, + final Executor executor) { + CompletionStage completionStage = originalStage.get().acceptEitherAsync(other, action, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable runAfterEither(final CompletionStage other, + final Runnable action) { + CompletionStage completionStage = originalStage.get().runAfterEither(other, action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable runAfterEitherAsync(final CompletionStage other, + final Runnable action) { + CompletionStage completionStage = originalStage.get().runAfterEitherAsync(other, action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable runAfterEitherAsync(final CompletionStage other, + final Runnable action, final Executor executor) { + CompletionStage completionStage = originalStage.get().runAfterEitherAsync(other, action, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenCompose(final Function> fn) { + CompletionStage completionStage = originalStage.get().thenCompose(fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenComposeAsync(final Function> fn) { + CompletionStage completionStage = originalStage.get().thenComposeAsync(fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable thenComposeAsync(final Function> fn, + final Executor executor) { + CompletionStage completionStage = originalStage.get().thenComposeAsync(fn, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable handle(final BiFunction fn) { + CompletionStage completionStage = originalStage.get().handle(fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable handleAsync(final BiFunction fn) { + CompletionStage completionStage = originalStage.get().handleAsync(fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable handleAsync(final BiFunction fn, + final Executor executor) { + CompletionStage completionStage = originalStage.get().handleAsync(fn, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable whenComplete(final BiConsumer action) { + CompletionStage completionStage = originalStage.get().whenComplete(action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable whenCompleteAsync(final BiConsumer action) { + CompletionStage completionStage = originalStage.get().whenCompleteAsync(action); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable whenCompleteAsync(final BiConsumer action, + final Executor executor) { + CompletionStage completionStage = originalStage.get().whenCompleteAsync(action, executor); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletionAwaitable exceptionally(final Function fn) { + CompletionStage completionStage = originalStage.get().exceptionally(fn); + return new CompletionAwaitable(() -> completionStage); + } + + @Override + public CompletableFuture toCompletableFuture() { + return originalStage.get().toCompletableFuture(); + } +} diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/CompletionSingle.java b/common/reactive/src/main/java/io/helidon/common/reactive/CompletionSingle.java new file mode 100644 index 000000000..c07f1574f --- /dev/null +++ b/common/reactive/src/main/java/io/helidon/common/reactive/CompletionSingle.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.common.reactive; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Single as CompletionStage. + * + * @param payload type + */ +public abstract class CompletionSingle extends CompletionAwaitable implements Single { + + private final AtomicReference> stageReference = new AtomicReference<>(); + private final CompletableFuture cancelFuture = new CompletableFuture<>(); + + protected CompletionSingle() { + setOriginalStage(this::getLazyStage); + } + + private CompletableFuture getLazyStage() { + stageReference.compareAndSet(null, this.toNullableStage()); + return stageReference.get(); + } + + private CompletableFuture toNullableStage() { + SingleToFuture subscriber = new SingleToFuture<>(true); + this.subscribe(subscriber); + return subscriber; + } + + @Override + public Single onCancel(final Runnable onCancel) { + cancelFuture.thenRun(onCancel); + return this; + } + + @Override + public Single cancel() { + Single single = Single.super.cancel(); + this.cancelFuture.complete(null); + return single; + } + +} diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java b/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java index 28e0b92e3..6d7b96af5 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.Flow; @@ -152,6 +153,18 @@ public interface Multi extends Subscribable { return new MultiFromPublisher<>(source); } + /** + * Create a {@link Multi} instance wrapped around the given {@link Single}. + * + * @param item type + * @param single source {@link Single} publisher + * @return Multi + * @throws NullPointerException if source is {@code null} + */ + static Multi from(Single single) { + return from((Publisher) single); + } + /** * Create a {@link Multi} instance that publishes the given {@link Stream}. *

@@ -842,10 +855,23 @@ public interface Multi extends Subscribable { * Terminal stage, invokes provided consumer for every item in the stream. * * @param consumer consumer to be invoked for each item + * @return Single completed when the stream terminates */ - default void forEach(Consumer consumer) { - FunctionalSubscriber subscriber = new FunctionalSubscriber<>(consumer, null, null, null); + default Single forEach(Consumer consumer) { + CompletableFuture future = new CompletableFuture<>(); + Single single = Single.from(future, true); + FunctionalSubscriber subscriber = new FunctionalSubscriber<>(consumer, + future::completeExceptionally, + () -> future.complete(null), + subscription -> { + subscription.request(Long.MAX_VALUE); + single.onCancel(subscription::cancel); + } + ); + this.subscribe(subscriber); + + return single; } } diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiCollectPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiCollectPublisher.java index abc7ff0c4..c1a628e89 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiCollectPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiCollectPublisher.java @@ -26,7 +26,7 @@ import java.util.function.Supplier; * @param the element type of the upstream * @param the collection type */ -final class MultiCollectPublisher implements Single { +final class MultiCollectPublisher extends CompletionSingle { private final Multi source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiCollectorPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiCollectorPublisher.java index 9598dede3..7bcbd4230 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiCollectorPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiCollectorPublisher.java @@ -35,7 +35,7 @@ import java.util.stream.Collector; * @param the collection type * @param the result type */ -final class MultiCollectorPublisher implements Single { +final class MultiCollectorPublisher extends CompletionSingle { private final Multi source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiFirstPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFirstPublisher.java index 47ca69a91..4940af069 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiFirstPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFirstPublisher.java @@ -17,7 +17,7 @@ package io.helidon.common.reactive; import java.util.concurrent.Flow; -final class MultiFirstPublisher implements Single { +final class MultiFirstPublisher extends CompletionSingle { private final Multi source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduce.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduce.java index 487be271d..564236c27 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduce.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduce.java @@ -26,7 +26,7 @@ import java.util.function.BiFunction; * the result as a Single. * @param the element type of the source and result */ -final class MultiReduce implements Single { +final class MultiReduce extends CompletionSingle { private final Multi source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduceFull.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduceFull.java index 04c94eaae..a2138eb45 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduceFull.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduceFull.java @@ -27,7 +27,7 @@ import java.util.function.Supplier; * @param the source value type * @param the accumulator and result type */ -final class MultiReduceFull implements Single { +final class MultiReduceFull extends CompletionSingle { private final Multi source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/Single.java b/common/reactive/src/main/java/io/helidon/common/reactive/Single.java index 92a77f86a..a6d5494bb 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/Single.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/Single.java @@ -27,6 +27,7 @@ import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.BiPredicate; import java.util.function.Consumer; @@ -42,7 +43,7 @@ import io.helidon.common.mapper.Mapper; * @param item type * @see Multi */ -public interface Single extends Subscribable { +public interface Single extends Subscribable, CompletionStage, Awaitable { // -------------------------------------------------------------------------------------------------------- // Factory (source-like) methods @@ -130,6 +131,18 @@ public interface Single extends Subscribable { return new SingleFromPublisher<>(source); } + /** + * Create a {@link Single} instance that publishes the first and only item received from the given {@link Single}. + * + * @param item type + * @param single source {@link Single} publisher + * @return Single + * @throws NullPointerException if source is {@code null} + */ + static Single from(Single single) { + return from((Publisher) single); + } + /** * Create a {@link Single} instance that publishes the given item to its subscriber(s). * @@ -541,7 +554,7 @@ public interface Single extends Subscribable { */ default CompletionStage toStage() { try { - SingleToFuture subscriber = new SingleToFuture<>(); + SingleToFuture subscriber = new SingleToFuture<>(false); this.subscribe(subscriber); return subscriber; } catch (Throwable ex) { @@ -550,4 +563,139 @@ public interface Single extends Subscribable { return future; } } + + /** + * Cancel upstream. + * + * @return new {@link Single} for eventually received single value. + */ + default Single cancel() { + CompletableFuture future = new CompletableFuture<>(); + FunctionalSubscriber subscriber = new FunctionalSubscriber<>(future::complete, + future::completeExceptionally, + () -> future.complete(null), + Flow.Subscription::cancel + ); + this.subscribe(subscriber); + return Single.from(future); + } + + @Override + CompletionAwaitable thenApply(Function fn); + + @Override + CompletionAwaitable thenApplyAsync(Function fn); + + @Override + CompletionAwaitable thenApplyAsync(Function fn, Executor executor); + + @Override + CompletionAwaitable thenAccept(Consumer action); + + @Override + CompletionAwaitable thenAcceptAsync(Consumer action); + + @Override + CompletionAwaitable thenAcceptAsync(Consumer action, Executor executor); + + @Override + CompletionAwaitable thenRun(Runnable action); + + @Override + CompletionAwaitable thenRunAsync(Runnable action); + + @Override + CompletionAwaitable thenRunAsync(Runnable action, Executor executor); + + @Override + CompletionAwaitable thenCombine(CompletionStage other, + BiFunction fn); + + @Override + CompletionAwaitable thenCombineAsync(CompletionStage other, + BiFunction fn); + + @Override + CompletionAwaitable thenCombineAsync(CompletionStage other, + BiFunction fn, Executor executor); + + @Override + CompletionAwaitable thenAcceptBoth(CompletionStage other, + BiConsumer action); + + @Override + CompletionAwaitable thenAcceptBothAsync(CompletionStage other, + BiConsumer action); + + @Override + CompletionAwaitable thenAcceptBothAsync(CompletionStage other, + BiConsumer action, Executor executor); + + @Override + CompletionAwaitable runAfterBoth(CompletionStage other, Runnable action); + + @Override + CompletionAwaitable runAfterBothAsync(CompletionStage other, Runnable action); + + @Override + CompletionAwaitable runAfterBothAsync(CompletionStage other, Runnable action, Executor executor); + + @Override + CompletionAwaitable applyToEither(CompletionStage other, Function fn); + + @Override + CompletionAwaitable applyToEitherAsync(CompletionStage other, Function fn); + + @Override + CompletionAwaitable applyToEitherAsync(CompletionStage other, Function fn, + Executor executor); + + @Override + CompletionAwaitable acceptEither(CompletionStage other, Consumer action); + + @Override + CompletionAwaitable acceptEitherAsync(CompletionStage other, Consumer action); + + @Override + CompletionAwaitable acceptEitherAsync(CompletionStage other, Consumer action, + Executor executor); + + @Override + CompletionAwaitable runAfterEither(CompletionStage other, Runnable action); + + @Override + CompletionAwaitable runAfterEitherAsync(CompletionStage other, Runnable action); + + @Override + CompletionAwaitable runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor); + + @Override + CompletionAwaitable thenCompose(Function> fn); + + @Override + CompletionAwaitable thenComposeAsync(Function> fn); + + @Override + CompletionAwaitable thenComposeAsync(Function> fn, Executor executor); + + @Override + CompletionAwaitable handle(BiFunction fn); + + @Override + CompletionAwaitable handleAsync(BiFunction fn); + + @Override + CompletionAwaitable handleAsync(BiFunction fn, Executor executor); + + @Override + CompletionAwaitable whenComplete(BiConsumer action); + + @Override + CompletionAwaitable whenCompleteAsync(BiConsumer action); + + @Override + CompletionAwaitable whenCompleteAsync(BiConsumer action, Executor executor); + + @Override + CompletionAwaitable exceptionally(Function fn); } diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleDefaultIfEmpty.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleDefaultIfEmpty.java index fb1383a78..162eaede2 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleDefaultIfEmpty.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleDefaultIfEmpty.java @@ -23,7 +23,7 @@ import java.util.concurrent.Flow; * Signal an item if the source is empty. * @param the element type */ -final class SingleDefaultIfEmpty implements Single { +final class SingleDefaultIfEmpty extends CompletionSingle { private final Single source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleDefer.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleDefer.java index 8eef2b77d..20c0b684d 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleDefer.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleDefer.java @@ -25,7 +25,7 @@ import java.util.function.Supplier; * Create a Single for each incoming subscriber via a supplier callback. * @param the element type of the sequence */ -final class SingleDefer implements Single { +final class SingleDefer extends CompletionSingle { private final Supplier> supplier; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleEmpty.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleEmpty.java index a083ab2a6..0b3e32929 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleEmpty.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleEmpty.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ import java.util.concurrent.Flow; * invoking {@link java.util.concurrent.Flow.Subscriber#onComplete() } during * {@link java.util.concurrent.Flow.Publisher#subscribe(java.util.concurrent.Flow.Subscriber)}. */ -final class SingleEmpty implements Single { +final class SingleEmpty extends CompletionSingle { /** * Singleton instance. diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleError.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleError.java index 5ea8509c8..9200c59fc 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleError.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleError.java @@ -26,7 +26,7 @@ import java.util.concurrent.Flow.Subscriber; * * @param item type */ -final class SingleError implements Single { +final class SingleError extends CompletionSingle { private final Throwable error; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleFlatMapSingle.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleFlatMapSingle.java index e1ed9b96e..2cd47659d 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleFlatMapSingle.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleFlatMapSingle.java @@ -27,7 +27,7 @@ import java.util.function.Function; * @param the upstream value type * @param the result value type */ -final class SingleFlatMapSingle implements Single { +final class SingleFlatMapSingle extends CompletionSingle { private final Single source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromCompletionStage.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromCompletionStage.java index ae1162ee5..aa143be07 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromCompletionStage.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromCompletionStage.java @@ -22,7 +22,7 @@ import java.util.concurrent.Flow; * Signal the outcome of the give CompletionStage. * @param the element type of the source and result */ -final class SingleFromCompletionStage implements Single { +final class SingleFromCompletionStage extends CompletionSingle { private final CompletionStage source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromPublisher.java index fd1f4f382..31c002328 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromPublisher.java @@ -23,7 +23,7 @@ import java.util.concurrent.Flow; * an error otherwise. * @param the element type of the flow */ -final class SingleFromPublisher implements Single { +final class SingleFromPublisher extends CompletionSingle { private final Flow.Publisher source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleJust.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleJust.java index 601095fb3..08e70f3b9 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleJust.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleJust.java @@ -23,7 +23,7 @@ import java.util.concurrent.Flow.Subscriber; * * @param item type */ -final class SingleJust implements Single { +final class SingleJust extends CompletionSingle { private final T value; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleMapperPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleMapperPublisher.java index 67789b3ac..236951919 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleMapperPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleMapperPublisher.java @@ -25,7 +25,7 @@ import io.helidon.common.mapper.Mapper; * @param the upstream value type * @param the result value type */ -final class SingleMapperPublisher implements Single { +final class SingleMapperPublisher extends CompletionSingle { private final Flow.Publisher source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleNever.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleNever.java index 20127150e..ccf305605 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleNever.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleNever.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ import java.util.concurrent.Flow.Subscriber; * {@link Subscriber#onComplete()} or * {@link Subscriber#onError(java.lang.Throwable)}. */ -final class SingleNever implements Single { +final class SingleNever extends CompletionSingle { /** * Singleton instance. diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleObserveOn.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleObserveOn.java index 4a1c90110..c9bc8ea6c 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleObserveOn.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleObserveOn.java @@ -23,7 +23,7 @@ import java.util.concurrent.Flow; * Re-emit the item or terminal signals on the given executor's thread. * @param the element type */ -final class SingleObserveOn implements Single { +final class SingleObserveOn extends CompletionSingle { private final Single source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResume.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResume.java index 9c7279df8..9e2b6d00f 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResume.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResume.java @@ -24,7 +24,7 @@ import java.util.function.Function; * If the upstream fails, generate a fallback success item via a function. * @param the element type of the sequence */ -final class SingleOnErrorResume implements Single { +final class SingleOnErrorResume extends CompletionSingle { private final Single source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResumeWith.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResumeWith.java index 65d300a0d..63d292665 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResumeWith.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResumeWith.java @@ -25,7 +25,7 @@ import java.util.function.Function; * If the upstream fails, generate a fallback Single and emit its signals. * @param the element type of the source and fallback */ -final class SingleOnErrorResumeWith implements Single { +final class SingleOnErrorResumeWith extends CompletionSingle { private final Single source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleRetry.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleRetry.java index 39d615a9e..96e2ff632 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleRetry.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleRetry.java @@ -29,7 +29,7 @@ import java.util.function.BiPredicate; * @param the element type of the source * @param the signal type of the publisher indicating when to retry */ -final class SingleRetry implements Single { +final class SingleRetry extends CompletionSingle { private final Single source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleSwitchIfEmpty.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleSwitchIfEmpty.java index 80abd94a3..940b930e0 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleSwitchIfEmpty.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleSwitchIfEmpty.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; * Switch to another Single if the main is empty. * @param the element type */ -final class SingleSwitchIfEmpty implements Single { +final class SingleSwitchIfEmpty extends CompletionSingle { private final Single source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleTakeUntilPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleTakeUntilPublisher.java index 2cc970977..dfe537f69 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleTakeUntilPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleTakeUntilPublisher.java @@ -25,7 +25,7 @@ import java.util.concurrent.Flow; * @param the upstream and output value type * @param the other sequence indicating when the main sequence should stop */ -final class SingleTakeUntilPublisher implements Single { +final class SingleTakeUntilPublisher extends CompletionSingle { private final Single source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleTappedPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleTappedPublisher.java index 96e19e284..e097b181b 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleTappedPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleTappedPublisher.java @@ -31,7 +31,7 @@ import io.helidon.common.reactive.MultiTappedPublisher.RunnableChain; * user callbacks. * @param the element type of the sequence */ -final class SingleTappedPublisher implements Single { +final class SingleTappedPublisher extends CompletionSingle { private final Single source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleTimeout.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleTimeout.java index 6ba99ec50..afab5d7e4 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleTimeout.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleTimeout.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; * or completes within a specified time window. * @param the element type */ -final class SingleTimeout implements Single { +final class SingleTimeout extends CompletionSingle { private final Single source; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleTimer.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleTimer.java index de01bfbe1..4a6702fcc 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleTimer.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleTimer.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; /** * Signal 0L and complete after the specified time. */ -final class SingleTimer implements Single { +final class SingleTimer extends CompletionSingle { private final long time; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleToFuture.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleToFuture.java index ab6de5397..1fc02c929 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleToFuture.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleToFuture.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,11 @@ import java.util.concurrent.atomic.AtomicReference; final class SingleToFuture extends CompletableFuture implements Subscriber { private final AtomicReference ref = new AtomicReference<>(); + private final boolean completeWithoutValue; + + SingleToFuture(boolean completeWithoutValue) { + this.completeWithoutValue = completeWithoutValue; + } @Override public boolean cancel(boolean mayInterruptIfRunning) { @@ -71,7 +76,11 @@ final class SingleToFuture extends CompletableFuture implements Subscriber @Override public void onComplete() { if (ref.getAndSet(null) != null) { - super.completeExceptionally(new IllegalStateException("Completed without value")); + if (completeWithoutValue) { + super.complete(null); + } else { + super.completeExceptionally(new IllegalStateException("Completed without value")); + } } } diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/AwaitTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/AwaitTest.java new file mode 100644 index 000000000..c3c2b2e25 --- /dev/null +++ b/common/reactive/src/test/java/io/helidon/common/reactive/AwaitTest.java @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.common.reactive; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.testng.Assert.assertThrows; + +import org.junit.jupiter.api.Test; + +public class AwaitTest { + + private static final long EXPECTED_SUM = 10L; + private static final long SAFE_WAIT_MILLIS = 200L; + + @Test + void forEachAwait() { + AtomicLong sum = new AtomicLong(); + testMulti() + .forEach(sum::addAndGet) + .await(); + assertThat(sum.get(), equalTo(EXPECTED_SUM)); + } + + @Test + void forEachAwaitChain() { + AtomicLong sum = new AtomicLong(); + AtomicLong completedTimes = new AtomicLong(); + testMulti() + .forEach(sum::addAndGet) + .whenComplete((aVoid, throwable) -> completedTimes.incrementAndGet()) + .whenComplete((aVoid, throwable) -> completedTimes.incrementAndGet()) + .whenComplete((aVoid, throwable) -> completedTimes.incrementAndGet()) + .thenRun(completedTimes::incrementAndGet) + .thenAccept(aVoid -> completedTimes.incrementAndGet()) + .await(); + assertThat(sum.get(), equalTo(EXPECTED_SUM)); + assertThat(completedTimes.get(), is(5L)); + } + + @Test + void forEachWhenComplete() throws InterruptedException, ExecutionException, TimeoutException { + AtomicLong sum = new AtomicLong(); + CompletableFuture completeFuture = new CompletableFuture<>(); + testMulti() + .forEach(sum::addAndGet) + + .whenComplete((aVoid, throwable) -> Optional.ofNullable(throwable) + .ifPresentOrElse(completeFuture::completeExceptionally, () -> completeFuture.complete(null))); + completeFuture.get(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS); + assertThat(sum.get(), equalTo(EXPECTED_SUM)); + } + + @Test + void forEachAwaitTimeout() { + AtomicLong sum = new AtomicLong(); + testMulti() + .forEach(sum::addAndGet) + .await(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS); + assertThat(sum.get(), equalTo(EXPECTED_SUM)); + } + + @Test + void forEachCancel() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + CompletableFuture cancelled = new CompletableFuture<>(); + Single single = testMulti() + .onCancel(() -> cancelled.complete(null)) + .forEach(l -> latch.countDown()); + + single.cancel(); + // Wait for 1 item out of 5 + latch.await(50, TimeUnit.MILLISECONDS); + // Expect cancel eventually(100 millis had to be enough) + cancelled.get(100, TimeUnit.MILLISECONDS); + } + + @Test + void forEachAwaitTimeoutNegative() { + assertThrows(CompletionException.class, () -> testMulti() + .forEach(TestConsumer.noop()) + .await(10, TimeUnit.MILLISECONDS)); + } + + @Test + void singleAwait() { + assertThat(testSingle().await(), equalTo(EXPECTED_SUM)); + } + + @Test + void singleAwaitTimeout() { + assertThat(testSingle().await(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS), equalTo(EXPECTED_SUM)); + } + + @Test + void singleAwaitTimeoutNegative() { + assertThrows(CompletionException.class, () -> testSingle().await(10, TimeUnit.MILLISECONDS)); + } + + /** + * Return stream of 5 long numbers 0,1,2,3,4 emitted in interval of 20 millis, + * whole stream should be finished shortly after 100 millis. + * + * @return {@link io.helidon.common.reactive.Multi} + */ + private Multi testMulti() { + return Multi.interval(20, TimeUnit.MILLISECONDS, Executors.newSingleThreadScheduledExecutor()) + .limit(5); + } + + private Single testSingle() { + return testMulti().reduce(Long::sum); + } +} diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/SingleTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/SingleTest.java index 5df7f48ca..fd868f102 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/SingleTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/SingleTest.java @@ -15,6 +15,7 @@ */ package io.helidon.common.reactive; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; @@ -289,7 +290,7 @@ public class SingleTest { @Test public void testBadSingleToFuture() throws InterruptedException, TimeoutException { - Single single = new Single() { + Single single = new CompletionSingle() { @Override public void subscribe(Subscriber subscriber) { throw new IllegalStateException("foo!"); @@ -313,7 +314,7 @@ public class SingleTest { @Test public void testToFutureDoubleOnError() throws InterruptedException, TimeoutException { - Single single = new Single() { + Single single = new CompletionSingle() { @Override public void subscribe(Subscriber subscriber) { subscriber.onSubscribe(new Subscription() { @@ -338,7 +339,7 @@ public class SingleTest { @Test public void testToFutureDoubleOnNext() throws InterruptedException, ExecutionException { - Single single = new Single() { + Single single = new CompletionSingle() { @Override public void subscribe(Subscriber subscriber) { subscriber.onSubscribe(new Subscription() { @@ -386,7 +387,7 @@ public class SingleTest { public void testToFutureDoubleOnSubscribe() throws InterruptedException, ExecutionException { TestSubscription subscription1 = new TestSubscription(); TestSubscription subscription2 = new TestSubscription(); - Single single = new Single() { + Single single = new CompletionSingle() { @Override public void subscribe(Subscriber subscriber) { subscriber.onSubscribe(subscription1); diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/TestConsumer.java b/common/reactive/src/test/java/io/helidon/common/reactive/TestConsumer.java index 018b4d762..d473be6dd 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/TestConsumer.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/TestConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,4 +28,8 @@ class TestConsumer implements Consumer { public void accept(T t) { item = t; } + + static Consumer noop(){ + return t -> {}; + } } diff --git a/media/common/src/main/java/io/helidon/media/common/MessageBodyReaderContext.java b/media/common/src/main/java/io/helidon/media/common/MessageBodyReaderContext.java index 5ada119ba..869d2eb50 100644 --- a/media/common/src/main/java/io/helidon/media/common/MessageBodyReaderContext.java +++ b/media/common/src/main/java/io/helidon/media/common/MessageBodyReaderContext.java @@ -31,6 +31,7 @@ import io.helidon.common.GenericType; import io.helidon.common.http.DataChunk; import io.helidon.common.http.MediaType; import io.helidon.common.http.ReadOnlyParameters; +import io.helidon.common.reactive.CompletionSingle; import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; @@ -396,7 +397,7 @@ public final class MessageBodyReaderContext extends MessageBodyContext implement * Single from future. * @param item type */ - private static final class SingleFromCompletionStage implements Single { + private static final class SingleFromCompletionStage extends CompletionSingle { private final CompletionStage future; private Subscriber subscriber;