Multi await feature for intentional blocking (#1664)

* Multi await feature for intentional blocking
* Single implementing CompletionStage
This commit is contained in:
Daniel Kec
2020-05-13 18:31:57 +02:00
committed by GitHub
parent ba18d37b5b
commit 573cd6e602
34 changed files with 804 additions and 38 deletions

View File

@@ -0,0 +1,75 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Makes intentional blocking when waiting for {@link CompletableFuture} more convenient with {@link Awaitable#await()}
* and {@link Awaitable#await(long, java.util.concurrent.TimeUnit)} methods.
*
* @param <T> payload type
*/
public interface Awaitable<T> {
/**
* Returns a {@link java.util.concurrent.CompletableFuture} maintaining the same
* completion properties as this stage. If this stage is already a
* CompletableFuture, this method may return this stage itself.
* Otherwise, invocation of this method may be equivalent in
* effect to {@code thenApply(x -> x)}, but returning an instance
* of type {@code CompletableFuture}.
*
* @return the CompletableFuture
*/
CompletableFuture<T> toCompletableFuture();
/**
* Block until {@link CompletableFuture} is completed, throws only unchecked exceptions.
*
* @return T payload type
* @throws java.util.concurrent.CancellationException if the computation was cancelled
* @throws java.util.concurrent.CompletionException if this future completed
*/
default T await() {
return this.toCompletableFuture().join();
}
/**
* Block until {@link CompletableFuture} is completed, throws only unchecked exceptions.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the result value
* @throws java.util.concurrent.CancellationException if this future was cancelled
* @throws java.util.concurrent.CompletionException if this future completed exceptionally,
* was interrupted while waiting or the wait timed out
*/
default T await(long timeout, TimeUnit unit) {
try {
return this.toCompletableFuture().get(timeout, unit);
} catch (ExecutionException e) {
throw new CompletionException(e.getCause());
} catch (InterruptedException | TimeoutException e) {
throw new CompletionException(e);
}
}
}

View File

@@ -0,0 +1,301 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* {@link CompletionStage} wrapper enriched with {@link io.helidon.common.reactive.Awaitable}.
*
* @param <T> payload type
*/
public class CompletionAwaitable<T> implements CompletionStage<T>, Awaitable<T> {
private Supplier<CompletionStage<T>> originalStage;
CompletionAwaitable(Supplier<CompletionStage<T>> originalStage) {
this.originalStage = originalStage;
}
CompletionAwaitable() {
}
void setOriginalStage(final Supplier<CompletionStage<T>> originalStage) {
this.originalStage = originalStage;
}
@Override
public <U> CompletionAwaitable<U> thenApply(final Function<? super T, ? extends U> fn) {
CompletionStage<U> completionStage = originalStage.get().thenApply(fn);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> thenApplyAsync(final Function<? super T, ? extends U> fn) {
CompletionStage<U> completionStage = originalStage.get().thenApplyAsync(fn);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> thenApplyAsync(final Function<? super T, ? extends U> fn, final Executor executor) {
CompletionStage<U> completionStage = originalStage.get().thenApplyAsync(fn, executor);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> thenAccept(final Consumer<? super T> action) {
CompletionStage<Void> completionStage = originalStage.get().thenAccept(action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> thenAcceptAsync(final Consumer<? super T> action) {
CompletionStage<Void> completionStage = originalStage.get().thenAcceptAsync(action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> thenAcceptAsync(final Consumer<? super T> action, final Executor executor) {
CompletionStage<Void> completionStage = originalStage.get().thenAcceptAsync(action, executor);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> thenRun(final Runnable action) {
CompletionStage<Void> completionStage = originalStage.get().thenRun(action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> thenRunAsync(final Runnable action) {
CompletionStage<Void> completionStage = originalStage.get().thenRunAsync(action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> thenRunAsync(final Runnable action, final Executor executor) {
CompletionStage<Void> completionStage = originalStage.get().thenRunAsync(action, executor);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public <U, V> CompletionAwaitable<V> thenCombine(final CompletionStage<? extends U> other,
final BiFunction<? super T, ? super U, ? extends V> fn) {
CompletionStage<V> completionStage = originalStage.get().thenCombine(other, fn);
return new CompletionAwaitable<V>(() -> completionStage);
}
@Override
public <U, V> CompletionAwaitable<V> thenCombineAsync(final CompletionStage<? extends U> other,
final BiFunction<? super T, ? super U, ? extends V> fn) {
CompletionStage<V> completionStage = originalStage.get().thenCombineAsync(other, fn);
return new CompletionAwaitable<V>(() -> completionStage);
}
@Override
public <U, V> CompletionAwaitable<V> thenCombineAsync(final CompletionStage<? extends U> other,
final BiFunction<? super T, ? super U, ? extends V> fn,
final Executor executor) {
CompletionStage<V> completionStage = originalStage.get().thenCombineAsync(other, fn, executor);
return new CompletionAwaitable<V>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<Void> thenAcceptBoth(final CompletionStage<? extends U> other,
final BiConsumer<? super T, ? super U> action) {
CompletionStage<Void> completionStage = originalStage.get().thenAcceptBoth(other, action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<Void> thenAcceptBothAsync(final CompletionStage<? extends U> other,
final BiConsumer<? super T, ? super U> action) {
CompletionStage<Void> completionStage = originalStage.get().thenAcceptBothAsync(other, action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<Void> thenAcceptBothAsync(final CompletionStage<? extends U> other,
final BiConsumer<? super T, ? super U> action,
final Executor executor) {
CompletionStage<Void> completionStage = originalStage.get().thenAcceptBothAsync(other, action, executor);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> runAfterBoth(final CompletionStage<?> other,
final Runnable action) {
CompletionStage<Void> completionStage = originalStage.get().runAfterBoth(other, action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> runAfterBothAsync(final CompletionStage<?> other,
final Runnable action) {
CompletionStage<Void> completionStage = originalStage.get().runAfterBothAsync(other, action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> runAfterBothAsync(final CompletionStage<?> other,
final Runnable action,
final Executor executor) {
CompletionStage<Void> completionStage = originalStage.get().runAfterBothAsync(other, action, executor);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> applyToEither(final CompletionStage<? extends T> other,
final Function<? super T, U> fn) {
CompletionStage<U> completionStage = originalStage.get().applyToEither(other, fn);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> applyToEitherAsync(final CompletionStage<? extends T> other,
final Function<? super T, U> fn) {
CompletionStage<U> completionStage = originalStage.get().applyToEitherAsync(other, fn);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> applyToEitherAsync(final CompletionStage<? extends T> other,
final Function<? super T, U> fn,
final Executor executor) {
CompletionStage<U> completionStage = originalStage.get().applyToEitherAsync(other, fn, executor);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> acceptEither(final CompletionStage<? extends T> other,
final Consumer<? super T> action) {
CompletionStage<Void> completionStage = originalStage.get().acceptEither(other, action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> acceptEitherAsync(final CompletionStage<? extends T> other,
final Consumer<? super T> action) {
CompletionStage<Void> completionStage = originalStage.get().acceptEitherAsync(other, action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> acceptEitherAsync(final CompletionStage<? extends T> other,
final Consumer<? super T> action,
final Executor executor) {
CompletionStage<Void> completionStage = originalStage.get().acceptEitherAsync(other, action, executor);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> runAfterEither(final CompletionStage<?> other,
final Runnable action) {
CompletionStage<Void> completionStage = originalStage.get().runAfterEither(other, action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> runAfterEitherAsync(final CompletionStage<?> other,
final Runnable action) {
CompletionStage<Void> completionStage = originalStage.get().runAfterEitherAsync(other, action);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public CompletionAwaitable<Void> runAfterEitherAsync(final CompletionStage<?> other,
final Runnable action, final Executor executor) {
CompletionStage<Void> completionStage = originalStage.get().runAfterEitherAsync(other, action, executor);
return new CompletionAwaitable<Void>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> thenCompose(final Function<? super T, ? extends CompletionStage<U>> fn) {
CompletionStage<U> completionStage = originalStage.get().thenCompose(fn);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> thenComposeAsync(final Function<? super T, ? extends CompletionStage<U>> fn) {
CompletionStage<U> completionStage = originalStage.get().thenComposeAsync(fn);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> thenComposeAsync(final Function<? super T, ? extends CompletionStage<U>> fn,
final Executor executor) {
CompletionStage<U> completionStage = originalStage.get().thenComposeAsync(fn, executor);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> handle(final BiFunction<? super T, Throwable, ? extends U> fn) {
CompletionStage<U> completionStage = originalStage.get().handle(fn);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> handleAsync(final BiFunction<? super T, Throwable, ? extends U> fn) {
CompletionStage<U> completionStage = originalStage.get().handleAsync(fn);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public <U> CompletionAwaitable<U> handleAsync(final BiFunction<? super T, Throwable, ? extends U> fn,
final Executor executor) {
CompletionStage<U> completionStage = originalStage.get().handleAsync(fn, executor);
return new CompletionAwaitable<U>(() -> completionStage);
}
@Override
public CompletionAwaitable<T> whenComplete(final BiConsumer<? super T, ? super Throwable> action) {
CompletionStage<T> completionStage = originalStage.get().whenComplete(action);
return new CompletionAwaitable<T>(() -> completionStage);
}
@Override
public CompletionAwaitable<T> whenCompleteAsync(final BiConsumer<? super T, ? super Throwable> action) {
CompletionStage<T> completionStage = originalStage.get().whenCompleteAsync(action);
return new CompletionAwaitable<T>(() -> completionStage);
}
@Override
public CompletionAwaitable<T> whenCompleteAsync(final BiConsumer<? super T, ? super Throwable> action,
final Executor executor) {
CompletionStage<T> completionStage = originalStage.get().whenCompleteAsync(action, executor);
return new CompletionAwaitable<T>(() -> completionStage);
}
@Override
public CompletionAwaitable<T> exceptionally(final Function<Throwable, ? extends T> fn) {
CompletionStage<T> completionStage = originalStage.get().exceptionally(fn);
return new CompletionAwaitable<T>(() -> completionStage);
}
@Override
public CompletableFuture<T> toCompletableFuture() {
return originalStage.get().toCompletableFuture();
}
}

View File

@@ -0,0 +1,61 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
/**
* Single as CompletionStage.
*
* @param <T> payload type
*/
public abstract class CompletionSingle<T> extends CompletionAwaitable<T> implements Single<T> {
private final AtomicReference<CompletableFuture<T>> stageReference = new AtomicReference<>();
private final CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
protected CompletionSingle() {
setOriginalStage(this::getLazyStage);
}
private CompletableFuture<T> getLazyStage() {
stageReference.compareAndSet(null, this.toNullableStage());
return stageReference.get();
}
private CompletableFuture<T> toNullableStage() {
SingleToFuture<T> subscriber = new SingleToFuture<>(true);
this.subscribe(subscriber);
return subscriber;
}
@Override
public Single<T> onCancel(final Runnable onCancel) {
cancelFuture.thenRun(onCancel);
return this;
}
@Override
public Single<T> cancel() {
Single<T> single = Single.super.cancel();
this.cancelFuture.complete(null);
return single;
}
}

View File

@@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
@@ -152,6 +153,18 @@ public interface Multi<T> extends Subscribable<T> {
return new MultiFromPublisher<>(source);
}
/**
* Create a {@link Multi} instance wrapped around the given {@link Single}.
*
* @param <T> item type
* @param single source {@link Single} publisher
* @return Multi
* @throws NullPointerException if source is {@code null}
*/
static <T> Multi<T> from(Single<T> single) {
return from((Publisher<T>) single);
}
/**
* Create a {@link Multi} instance that publishes the given {@link Stream}.
* <p>
@@ -842,10 +855,23 @@ public interface Multi<T> extends Subscribable<T> {
* Terminal stage, invokes provided consumer for every item in the stream.
*
* @param consumer consumer to be invoked for each item
* @return Single completed when the stream terminates
*/
default void forEach(Consumer<? super T> consumer) {
FunctionalSubscriber<T> subscriber = new FunctionalSubscriber<>(consumer, null, null, null);
default Single<Void> forEach(Consumer<? super T> consumer) {
CompletableFuture<Void> future = new CompletableFuture<>();
Single<Void> single = Single.from(future, true);
FunctionalSubscriber<T> subscriber = new FunctionalSubscriber<>(consumer,
future::completeExceptionally,
() -> future.complete(null),
subscription -> {
subscription.request(Long.MAX_VALUE);
single.onCancel(subscription::cancel);
}
);
this.subscribe(subscriber);
return single;
}
}

View File

@@ -26,7 +26,7 @@ import java.util.function.Supplier;
* @param <T> the element type of the upstream
* @param <U> the collection type
*/
final class MultiCollectPublisher<T, U> implements Single<U> {
final class MultiCollectPublisher<T, U> extends CompletionSingle<U> {
private final Multi<T> source;

View File

@@ -35,7 +35,7 @@ import java.util.stream.Collector;
* @param <A> the collection type
* @param <R> the result type
*/
final class MultiCollectorPublisher<T, A, R> implements Single<R> {
final class MultiCollectorPublisher<T, A, R> extends CompletionSingle<R> {
private final Multi<T> source;

View File

@@ -17,7 +17,7 @@ package io.helidon.common.reactive;
import java.util.concurrent.Flow;
final class MultiFirstPublisher<T> implements Single<T> {
final class MultiFirstPublisher<T> extends CompletionSingle<T> {
private final Multi<T> source;

View File

@@ -26,7 +26,7 @@ import java.util.function.BiFunction;
* the result as a Single.
* @param <T> the element type of the source and result
*/
final class MultiReduce<T> implements Single<T> {
final class MultiReduce<T> extends CompletionSingle<T> {
private final Multi<T> source;

View File

@@ -27,7 +27,7 @@ import java.util.function.Supplier;
* @param <T> the source value type
* @param <R> the accumulator and result type
*/
final class MultiReduceFull<T, R> implements Single<R> {
final class MultiReduceFull<T, R> extends CompletionSingle<R> {
private final Multi<T> source;

View File

@@ -27,6 +27,7 @@ import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
@@ -42,7 +43,7 @@ import io.helidon.common.mapper.Mapper;
* @param <T> item type
* @see Multi
*/
public interface Single<T> extends Subscribable<T> {
public interface Single<T> extends Subscribable<T>, CompletionStage<T>, Awaitable<T> {
// --------------------------------------------------------------------------------------------------------
// Factory (source-like) methods
@@ -130,6 +131,18 @@ public interface Single<T> extends Subscribable<T> {
return new SingleFromPublisher<>(source);
}
/**
* Create a {@link Single} instance that publishes the first and only item received from the given {@link Single}.
*
* @param <T> item type
* @param single source {@link Single} publisher
* @return Single
* @throws NullPointerException if source is {@code null}
*/
static <T> Single<T> from(Single<T> single) {
return from((Publisher<T>) single);
}
/**
* Create a {@link Single} instance that publishes the given item to its subscriber(s).
*
@@ -541,7 +554,7 @@ public interface Single<T> extends Subscribable<T> {
*/
default CompletionStage<T> toStage() {
try {
SingleToFuture<T> subscriber = new SingleToFuture<>();
SingleToFuture<T> subscriber = new SingleToFuture<>(false);
this.subscribe(subscriber);
return subscriber;
} catch (Throwable ex) {
@@ -550,4 +563,139 @@ public interface Single<T> extends Subscribable<T> {
return future;
}
}
/**
* Cancel upstream.
*
* @return new {@link Single} for eventually received single value.
*/
default Single<T> cancel() {
CompletableFuture<T> future = new CompletableFuture<>();
FunctionalSubscriber<T> subscriber = new FunctionalSubscriber<>(future::complete,
future::completeExceptionally,
() -> future.complete(null),
Flow.Subscription::cancel
);
this.subscribe(subscriber);
return Single.from(future);
}
@Override
<U> CompletionAwaitable<U> thenApply(Function<? super T, ? extends U> fn);
@Override
<U> CompletionAwaitable<U> thenApplyAsync(Function<? super T, ? extends U> fn);
@Override
<U> CompletionAwaitable<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor);
@Override
CompletionAwaitable<Void> thenAccept(Consumer<? super T> action);
@Override
CompletionAwaitable<Void> thenAcceptAsync(Consumer<? super T> action);
@Override
CompletionAwaitable<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
@Override
CompletionAwaitable<Void> thenRun(Runnable action);
@Override
CompletionAwaitable<Void> thenRunAsync(Runnable action);
@Override
CompletionAwaitable<Void> thenRunAsync(Runnable action, Executor executor);
@Override
<U, V> CompletionAwaitable<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn);
@Override
<U, V> CompletionAwaitable<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn);
@Override
<U, V> CompletionAwaitable<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn, Executor executor);
@Override
<U> CompletionAwaitable<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
@Override
<U> CompletionAwaitable<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
@Override
<U> CompletionAwaitable<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);
@Override
CompletionAwaitable<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
@Override
CompletionAwaitable<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
@Override
CompletionAwaitable<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);
@Override
<U> CompletionAwaitable<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
@Override
<U> CompletionAwaitable<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
@Override
<U> CompletionAwaitable<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor);
@Override
CompletionAwaitable<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
@Override
CompletionAwaitable<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
@Override
CompletionAwaitable<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor);
@Override
CompletionAwaitable<Void> runAfterEither(CompletionStage<?> other, Runnable action);
@Override
CompletionAwaitable<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
@Override
CompletionAwaitable<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);
@Override
<U> CompletionAwaitable<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
@Override
<U> CompletionAwaitable<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
@Override
<U> CompletionAwaitable<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);
@Override
<U> CompletionAwaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
@Override
<U> CompletionAwaitable<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
@Override
<U> CompletionAwaitable<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
@Override
CompletionAwaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
@Override
CompletionAwaitable<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
@Override
CompletionAwaitable<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
@Override
CompletionAwaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
}

View File

@@ -23,7 +23,7 @@ import java.util.concurrent.Flow;
* Signal an item if the source is empty.
* @param <T> the element type
*/
final class SingleDefaultIfEmpty<T> implements Single<T> {
final class SingleDefaultIfEmpty<T> extends CompletionSingle<T> {
private final Single<T> source;

View File

@@ -25,7 +25,7 @@ import java.util.function.Supplier;
* Create a Single for each incoming subscriber via a supplier callback.
* @param <T> the element type of the sequence
*/
final class SingleDefer<T> implements Single<T> {
final class SingleDefer<T> extends CompletionSingle<T> {
private final Supplier<? extends Single<? extends T>> supplier;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ import java.util.concurrent.Flow;
* invoking {@link java.util.concurrent.Flow.Subscriber#onComplete() } during
* {@link java.util.concurrent.Flow.Publisher#subscribe(java.util.concurrent.Flow.Subscriber)}.
*/
final class SingleEmpty implements Single<Object> {
final class SingleEmpty extends CompletionSingle<Object> {
/**
* Singleton instance.

View File

@@ -26,7 +26,7 @@ import java.util.concurrent.Flow.Subscriber;
*
* @param <T> item type
*/
final class SingleError<T> implements Single<T> {
final class SingleError<T> extends CompletionSingle<T> {
private final Throwable error;

View File

@@ -27,7 +27,7 @@ import java.util.function.Function;
* @param <T> the upstream value type
* @param <R> the result value type
*/
final class SingleFlatMapSingle<T, R> implements Single<R> {
final class SingleFlatMapSingle<T, R> extends CompletionSingle<R> {
private final Single<T> source;

View File

@@ -22,7 +22,7 @@ import java.util.concurrent.Flow;
* Signal the outcome of the give CompletionStage.
* @param <T> the element type of the source and result
*/
final class SingleFromCompletionStage<T> implements Single<T> {
final class SingleFromCompletionStage<T> extends CompletionSingle<T> {
private final CompletionStage<T> source;

View File

@@ -23,7 +23,7 @@ import java.util.concurrent.Flow;
* an error otherwise.
* @param <T> the element type of the flow
*/
final class SingleFromPublisher<T> implements Single<T> {
final class SingleFromPublisher<T> extends CompletionSingle<T> {
private final Flow.Publisher<T> source;

View File

@@ -23,7 +23,7 @@ import java.util.concurrent.Flow.Subscriber;
*
* @param <T> item type
*/
final class SingleJust<T> implements Single<T> {
final class SingleJust<T> extends CompletionSingle<T> {
private final T value;

View File

@@ -25,7 +25,7 @@ import io.helidon.common.mapper.Mapper;
* @param <T> the upstream value type
* @param <R> the result value type
*/
final class SingleMapperPublisher<T, R> implements Single<R> {
final class SingleMapperPublisher<T, R> extends CompletionSingle<R> {
private final Flow.Publisher<T> source;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ import java.util.concurrent.Flow.Subscriber;
* {@link Subscriber#onComplete()} or
* {@link Subscriber#onError(java.lang.Throwable)}.
*/
final class SingleNever implements Single<Object> {
final class SingleNever extends CompletionSingle<Object> {
/**
* Singleton instance.

View File

@@ -23,7 +23,7 @@ import java.util.concurrent.Flow;
* Re-emit the item or terminal signals on the given executor's thread.
* @param <T> the element type
*/
final class SingleObserveOn<T> implements Single<T> {
final class SingleObserveOn<T> extends CompletionSingle<T> {
private final Single<T> source;

View File

@@ -24,7 +24,7 @@ import java.util.function.Function;
* If the upstream fails, generate a fallback success item via a function.
* @param <T> the element type of the sequence
*/
final class SingleOnErrorResume<T> implements Single<T> {
final class SingleOnErrorResume<T> extends CompletionSingle<T> {
private final Single<T> source;

View File

@@ -25,7 +25,7 @@ import java.util.function.Function;
* If the upstream fails, generate a fallback Single and emit its signals.
* @param <T> the element type of the source and fallback
*/
final class SingleOnErrorResumeWith<T> implements Single<T> {
final class SingleOnErrorResumeWith<T> extends CompletionSingle<T> {
private final Single<T> source;

View File

@@ -29,7 +29,7 @@ import java.util.function.BiPredicate;
* @param <T> the element type of the source
* @param <U> the signal type of the publisher indicating when to retry
*/
final class SingleRetry<T, U> implements Single<T> {
final class SingleRetry<T, U> extends CompletionSingle<T> {
private final Single<T> source;

View File

@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
* Switch to another Single if the main is empty.
* @param <T> the element type
*/
final class SingleSwitchIfEmpty<T> implements Single<T> {
final class SingleSwitchIfEmpty<T> extends CompletionSingle<T> {
private final Single<T> source;

View File

@@ -25,7 +25,7 @@ import java.util.concurrent.Flow;
* @param <T> the upstream and output value type
* @param <U> the other sequence indicating when the main sequence should stop
*/
final class SingleTakeUntilPublisher<T, U> implements Single<T> {
final class SingleTakeUntilPublisher<T, U> extends CompletionSingle<T> {
private final Single<T> source;

View File

@@ -31,7 +31,7 @@ import io.helidon.common.reactive.MultiTappedPublisher.RunnableChain;
* user callbacks.
* @param <T> the element type of the sequence
*/
final class SingleTappedPublisher<T> implements Single<T> {
final class SingleTappedPublisher<T> extends CompletionSingle<T> {
private final Single<T> source;

View File

@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
* or completes within a specified time window.
* @param <T> the element type
*/
final class SingleTimeout<T> implements Single<T> {
final class SingleTimeout<T> extends CompletionSingle<T> {
private final Single<T> source;

View File

@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
/**
* Signal 0L and complete after the specified time.
*/
final class SingleTimer implements Single<Long> {
final class SingleTimer extends CompletionSingle<Long> {
private final long time;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,11 @@ import java.util.concurrent.atomic.AtomicReference;
final class SingleToFuture<T> extends CompletableFuture<T> implements Subscriber<T> {
private final AtomicReference<Subscription> ref = new AtomicReference<>();
private final boolean completeWithoutValue;
SingleToFuture(boolean completeWithoutValue) {
this.completeWithoutValue = completeWithoutValue;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
@@ -71,7 +76,11 @@ final class SingleToFuture<T> extends CompletableFuture<T> implements Subscriber
@Override
public void onComplete() {
if (ref.getAndSet(null) != null) {
super.completeExceptionally(new IllegalStateException("Completed without value"));
if (completeWithoutValue) {
super.complete(null);
} else {
super.completeExceptionally(new IllegalStateException("Completed without value"));
}
}
}

View File

@@ -0,0 +1,140 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.testng.Assert.assertThrows;
import org.junit.jupiter.api.Test;
public class AwaitTest {
private static final long EXPECTED_SUM = 10L;
private static final long SAFE_WAIT_MILLIS = 200L;
@Test
void forEachAwait() {
AtomicLong sum = new AtomicLong();
testMulti()
.forEach(sum::addAndGet)
.await();
assertThat(sum.get(), equalTo(EXPECTED_SUM));
}
@Test
void forEachAwaitChain() {
AtomicLong sum = new AtomicLong();
AtomicLong completedTimes = new AtomicLong();
testMulti()
.forEach(sum::addAndGet)
.whenComplete((aVoid, throwable) -> completedTimes.incrementAndGet())
.whenComplete((aVoid, throwable) -> completedTimes.incrementAndGet())
.whenComplete((aVoid, throwable) -> completedTimes.incrementAndGet())
.thenRun(completedTimes::incrementAndGet)
.thenAccept(aVoid -> completedTimes.incrementAndGet())
.await();
assertThat(sum.get(), equalTo(EXPECTED_SUM));
assertThat(completedTimes.get(), is(5L));
}
@Test
void forEachWhenComplete() throws InterruptedException, ExecutionException, TimeoutException {
AtomicLong sum = new AtomicLong();
CompletableFuture<Void> completeFuture = new CompletableFuture<>();
testMulti()
.forEach(sum::addAndGet)
.whenComplete((aVoid, throwable) -> Optional.ofNullable(throwable)
.ifPresentOrElse(completeFuture::completeExceptionally, () -> completeFuture.complete(null)));
completeFuture.get(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
assertThat(sum.get(), equalTo(EXPECTED_SUM));
}
@Test
void forEachAwaitTimeout() {
AtomicLong sum = new AtomicLong();
testMulti()
.forEach(sum::addAndGet)
.await(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
assertThat(sum.get(), equalTo(EXPECTED_SUM));
}
@Test
void forEachCancel() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
CompletableFuture<Void> cancelled = new CompletableFuture<>();
Single<Void> single = testMulti()
.onCancel(() -> cancelled.complete(null))
.forEach(l -> latch.countDown());
single.cancel();
// Wait for 1 item out of 5
latch.await(50, TimeUnit.MILLISECONDS);
// Expect cancel eventually(100 millis had to be enough)
cancelled.get(100, TimeUnit.MILLISECONDS);
}
@Test
void forEachAwaitTimeoutNegative() {
assertThrows(CompletionException.class, () -> testMulti()
.forEach(TestConsumer.noop())
.await(10, TimeUnit.MILLISECONDS));
}
@Test
void singleAwait() {
assertThat(testSingle().await(), equalTo(EXPECTED_SUM));
}
@Test
void singleAwaitTimeout() {
assertThat(testSingle().await(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS), equalTo(EXPECTED_SUM));
}
@Test
void singleAwaitTimeoutNegative() {
assertThrows(CompletionException.class, () -> testSingle().await(10, TimeUnit.MILLISECONDS));
}
/**
* Return stream of 5 long numbers 0,1,2,3,4 emitted in interval of 20 millis,
* whole stream should be finished shortly after 100 millis.
*
* @return {@link io.helidon.common.reactive.Multi<Long>}
*/
private Multi<Long> testMulti() {
return Multi.interval(20, TimeUnit.MILLISECONDS, Executors.newSingleThreadScheduledExecutor())
.limit(5);
}
private Single<Long> testSingle() {
return testMulti().reduce(Long::sum);
}
}

View File

@@ -15,6 +15,7 @@
*/
package io.helidon.common.reactive;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
@@ -289,7 +290,7 @@ public class SingleTest {
@Test
public void testBadSingleToFuture() throws InterruptedException, TimeoutException {
Single<String> single = new Single<String>() {
Single<String> single = new CompletionSingle<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
throw new IllegalStateException("foo!");
@@ -313,7 +314,7 @@ public class SingleTest {
@Test
public void testToFutureDoubleOnError() throws InterruptedException, TimeoutException {
Single<String> single = new Single<String>() {
Single<String> single = new CompletionSingle<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new Subscription() {
@@ -338,7 +339,7 @@ public class SingleTest {
@Test
public void testToFutureDoubleOnNext() throws InterruptedException, ExecutionException {
Single<String> single = new Single<String>() {
Single<String> single = new CompletionSingle<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new Subscription() {
@@ -386,7 +387,7 @@ public class SingleTest {
public void testToFutureDoubleOnSubscribe() throws InterruptedException, ExecutionException {
TestSubscription subscription1 = new TestSubscription();
TestSubscription subscription2 = new TestSubscription();
Single<String> single = new Single<String>() {
Single<String> single = new CompletionSingle<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(subscription1);

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,4 +28,8 @@ class TestConsumer<T> implements Consumer<T> {
public void accept(T t) {
item = t;
}
static <T> Consumer<T> noop(){
return t -> {};
}
}

View File

@@ -31,6 +31,7 @@ import io.helidon.common.GenericType;
import io.helidon.common.http.DataChunk;
import io.helidon.common.http.MediaType;
import io.helidon.common.http.ReadOnlyParameters;
import io.helidon.common.reactive.CompletionSingle;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
@@ -396,7 +397,7 @@ public final class MessageBodyReaderContext extends MessageBodyContext implement
* Single from future.
* @param <T> item type
*/
private static final class SingleFromCompletionStage<T> implements Single<T> {
private static final class SingleFromCompletionStage<T> extends CompletionSingle<T> {
private final CompletionStage<? extends T> future;
private Subscriber<? super T> subscriber;