Update to support multi.

Signed-off-by: Tomas Langer <tomas.langer@oracle.com>
This commit is contained in:
Tomas Langer
2020-07-01 18:04:12 +02:00
committed by Santiago Pericasgeertsen
parent c11ee85a0d
commit 196c91690f
30 changed files with 2384 additions and 527 deletions

View File

@@ -106,13 +106,20 @@ public final class LogConfig {
logConfigStream = new BufferedInputStream(Files.newInputStream(path));
source = "file: " + path.toAbsolutePath();
} else {
// second look for classpath (only the first one)
// second look for classpath (only the first one in production classpath)
InputStream resourceStream = LogConfig.class.getResourceAsStream("/" + LOGGING_FILE);
if (resourceStream != null) {
logConfigStream = new BufferedInputStream(resourceStream);
source = "classpath: /" + LOGGING_FILE;
} else {
return source;
// once more for the current classloader
resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(LOGGING_FILE);
if (resourceStream != null) {
logConfigStream = new BufferedInputStream(resourceStream);
source = "context classpath: /" + LOGGING_FILE;
} else {
return source;
}
}
}

View File

@@ -27,7 +27,7 @@ import java.util.function.LongConsumer;
* user callbacks.
* @param <T> the element type of the sequence
*/
final class MultiTappedPublisher<T> implements Multi<T> {
public final class MultiTappedPublisher<T> implements Multi<T> {
private final Multi<T> source;
@@ -59,6 +59,20 @@ final class MultiTappedPublisher<T> implements Multi<T> {
this.onCancelCallback = onCancelCallback;
}
private MultiTappedPublisher(Builder<T> builder) {
this(builder.source,
builder.onSubscribeCallback,
builder.onNextCallback,
builder.onErrorCallback,
builder.onCompleteCallback,
builder.onRequestCallback,
builder.onCancelCallback);
}
public static <T> Builder<T> builder(Multi<T> source) {
return new Builder<>(source);
}
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "subscriber is null");
@@ -280,4 +294,58 @@ final class MultiTappedPublisher<T> implements Multi<T> {
}
}
}
public static class Builder<T> implements io.helidon.common.Builder<MultiTappedPublisher<T>> {
private final Multi<T> source;
private Consumer<? super Flow.Subscription> onSubscribeCallback;
private Consumer<? super T> onNextCallback;
private Runnable onCompleteCallback;
private LongConsumer onRequestCallback;
private Runnable onCancelCallback;
private Consumer<? super Throwable> onErrorCallback;
private Builder(Multi<T> source) {
this.source = source;
}
@Override
public MultiTappedPublisher<T> build() {
return new MultiTappedPublisher<>(this);
}
Builder<T> onSubscribeCallback(Consumer<? super Flow.Subscription> onSubscribeCallback) {
this.onSubscribeCallback = onSubscribeCallback;
return this;
}
public Builder<T> onSubscribeCallback(Runnable onSubscribeCallback) {
this.onSubscribeCallback = subscription -> onSubscribeCallback.run();
return this;
}
public Builder<T> onNextCallback(Consumer<? super T> onNextCallback) {
this.onNextCallback = onNextCallback;
return this;
}
public Builder<T> onCompleteCallback(Runnable onCompleteCallback) {
this.onCompleteCallback = onCompleteCallback;
return this;
}
public Builder<T> onRequestCallback(LongConsumer onRequestCallback) {
this.onRequestCallback = onRequestCallback;
return this;
}
public Builder<T> onCancelCallback(Runnable onCancelCallback) {
this.onCancelCallback = onCancelCallback;
return this;
}
public Builder<T> onErrorCallback(Consumer<? super Throwable> onErrorCallback) {
this.onErrorCallback = onErrorCallback;
return this;
}
}
}

View File

@@ -9,7 +9,10 @@ existing APIs).
The FT requires executor service (or more) to handle some of the features provided. To be able to
configure FT for the whole application, a set of static methods exists on class `FaultTolerance`.
- `FaultTolerance.config(Config)` - use a Helidon config instance to configure defaults
- `FaultTolerance.config(Config)` - use a Helidon config instance to configure defaults
- `FaultTolerance.executor(Supplier)` - Helidon wide executor service for async operation
- `FaultTolerance.scheduledExecutor(Supplier)` - Helidon wide scheduled executor service for scheduling retries,
timeouts and similar
# Asynchronous
Provides an asynchronous execution for a blocking operation. As Helidon SE network stack is using a
@@ -24,9 +27,8 @@ Limits the number of parallel calls to a single resource.
Configuration:
- parallel execution limit
- executor service
- queue
- number of queued records
- executor service - needed to process enqueued records
# Circuit Breaker
Defines a circuit breaker policy to an individual method or a class.
@@ -38,7 +40,8 @@ Half-open: In half-open state, trial executions of the service are allowed. By d
Circuit state transitions will reset the circuit breaker's records.
Configuration:
- fail on `<? extends Throwable>` - these are failures
- scheduled executor service
- apply on `<? extends Throwable>` - these are failures
- skip on `<? extends Throwable>` - these are not failures
- delay (duration) - how long before transitioning from open to half-open
- volume threshold - rolling window size
@@ -56,7 +59,7 @@ May provide a context of execution with information such as
Configuration:
- fallback method/handler
- applyOn `<? extends Throwable>` - these are failures
- apply On `<? extends Throwable>` - these are failures
- skip on `<? extends Throwable>` - these are not failures
# Retry
@@ -67,11 +70,17 @@ Configuration:
- delay between retries (duration)
- overall maximal duration
- jitter (randomize delays a bit - duration) - a jitter of 200 ms will randomly add between -200 and 200 milliseconds to each retry delay.
- retry on `<? extends Throwable>` - these are failures
- abort on `<? extends Throwable>` - these will immediately abort retries
- apply on `<? extends Throwable>` - these are failures
- skip on `<? extends Throwable>` - these will immediately abort retries
# Timeout
Can be simply replaced with `Single.timeout`
Configuration:
- overall timeout (duration)
TODO:
- native image support
- features (experimental)
- documentation

View File

@@ -49,5 +49,9 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.config</groupId>
<artifactId>helidon-config-testing</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -16,45 +16,68 @@
package io.helidon.faulttolerance;
import java.util.concurrent.CompletableFuture;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
import io.helidon.common.reactive.Single;
public class Async {
private final LazyValue<? extends ExecutorService> executor;
/**
* Support for asynchronous execution of synchronous (blocking) calls.
* While this does not miraculously make a synchronous call non-blocking, it at least moves
* the blocking calls to a separate executor service, degrading the service gracefully.
* <p>
* Example of usage:
* <pre>
* // async instance with default executor service
* Async async = Async.create();
*
* // call a method with no parameters
* Single&lt;String> result = async.invoke(this::slowSync);
*
* // call a method with parameters
* async.invoke(() -> processRequest(request))
* .thenApply(response::send);
*
* // use async to obtain a Multi (from a method returning List of strings)
* Multi&lt;String> stringMulti = async.invoke(this::syncList)
* .flatMap(Multi::create);
* </pre>
*/
public interface Async {
/**
* Invoke a synchronous operation asynchronously.
* This method never throws an exception. Any exception is returned via the {@link io.helidon.common.reactive.Single}
* result.
*
* @param supplier supplier of value (may be a method reference)
* @param <T> type of returned value
* @return a Single that is a "promise" of the future result
*/
<T> Single<T> invoke(Supplier<T> supplier);
public Async(Builder builder) {
this.executor = LazyValue.create(builder.executor);
/**
* Async with default executor service.
*
* @return a default async instance
*/
static Async create() {
return AsyncImpl.DefaultAsyncInstance.instance();
}
public static Async create() {
return builder().build();
}
public <T> Single<T> invoke(Supplier<T> supplier) {
CompletableFuture<T> future = new CompletableFuture<>();
executor.get()
.submit(() -> {
try {
T result = supplier.get();
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return Single.create(future);
}
public static Builder builder() {
/**
* A new builder to build a customized {@link io.helidon.faulttolerance.Async} instance.
* @return a new builder
*/
static Builder builder() {
return new Builder();
}
public static class Builder implements io.helidon.common.Builder<Async> {
/**
* Fluent API Builder for {@link io.helidon.faulttolerance.Async}.
*/
class Builder implements io.helidon.common.Builder<Async> {
private LazyValue<? extends ExecutorService> executor = FaultTolerance.executor();
private Builder() {
@@ -62,12 +85,34 @@ public class Async {
@Override
public Async build() {
return new Async(this);
return new AsyncImpl(this);
}
public Builder executor(Supplier<ExecutorService> executor) {
this.executor = LazyValue.create(executor);
/**
* Configure executor service to use for executing tasks asynchronously.
*
* @param executor executor service supplier
* @return updated builder instance
*/
public Builder executor(Supplier<? extends ExecutorService> executor) {
this.executor = LazyValue.create(Objects.requireNonNull(executor));
return this;
}
/**
* Configure executor service to use for executing tasks asynchronously.
*
* @param executor executor service
* @return updated builder instance
*/
public Builder executor(ExecutorService executor) {
this.executor = LazyValue.create(Objects.requireNonNull(executor));
return this;
}
LazyValue<? extends ExecutorService> executor() {
return executor;
}
}
}

View File

@@ -0,0 +1,76 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.faulttolerance;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
import io.helidon.common.reactive.Single;
class AsyncImpl implements Async {
private final LazyValue<? extends ExecutorService> executor;
AsyncImpl(Builder builder) {
this.executor = LazyValue.create(builder.executor());
}
@Override
public <T> Single<T> invoke(Supplier<T> supplier) {
CompletableFuture<T> future = new CompletableFuture<>();
AsyncTask<T> task = new AsyncTask<>(supplier, future);
try {
executor.get().submit(task);
} catch (Throwable e) {
// rejected execution and other executor related issues
return Single.error(e);
}
return Single.create(future);
}
private static class AsyncTask<T> implements Runnable {
private final Supplier<T> supplier;
private final CompletableFuture<T> future;
private AsyncTask(Supplier<T> supplier, CompletableFuture<T> future) {
this.supplier = supplier;
this.future = future;
}
@Override
public void run() {
try {
T result = supplier.get();
future.complete(result);
} catch (Throwable e) {
future.completeExceptionally(e);
}
}
}
static final class DefaultAsyncInstance {
private static final Async INSTANCE = Async.builder()
.build();
static Async instance() {
return INSTANCE;
}
}
}

View File

@@ -16,120 +16,113 @@
package io.helidon.faulttolerance;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
import io.helidon.common.reactive.Single;
public class Bulkhead implements Handler {
private final Queue<Enqueued<?>> queue;
private final Semaphore inProgress;
private Bulkhead(Builder builder) {
this.inProgress = new Semaphore(builder.limit, true);
if (builder.queueLength == 0) {
queue = new NoQueue();
} else {
this.queue = new LinkedBlockingQueue<>(builder.queueLength);
}
}
public static Builder builder() {
/**
* Bulkhead protects a resource that cannot serve unlimited parallel
* requests.
* <p>
* When the limit of parallel execution is reached, requests are enqueued
* until the queue length is reached. Once both the limit and queue are full,
* additional attempts to invoke will end with a failed response with
* {@link io.helidon.faulttolerance.BulkheadException}.
*/
public interface Bulkhead extends Handler {
/**
* A new builder for {@link io.helidon.faulttolerance.Bulkhead}.
*
* @return a new builder
*/
static Builder builder() {
return new Builder();
}
@SuppressWarnings("unchecked")
@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
if (inProgress.tryAcquire()) {
CompletionStage<T> result = supplier.get();
/**
* Fluent API builder for {@link io.helidon.faulttolerance.Bulkhead}.
*/
class Builder implements io.helidon.common.Builder<Bulkhead> {
private static final int DEFAULT_LIMIT = 10;
private static final int DEFAULT_QUEUE_LENGTH = 10;
result.handle((it, throwable) -> {
// we still have an acquired semaphore
Enqueued<?> polled = queue.poll();
while (polled != null) {
invokeEnqueued((Enqueued<Object>) polled);
polled = queue.poll();
}
inProgress.release();
return null;
});
return Single.create(result);
} else {
Enqueued<T> enqueued = new Enqueued<>(supplier);
if (!queue.offer(enqueued)) {
return Single.error(new BulkheadException("Bulkhead queue is full"));
}
return Single.create(enqueued.future());
}
}
private void invokeEnqueued(Enqueued<Object> enqueued) {
CompletableFuture<Object> future = enqueued.future();
CompletionStage<Object> completionStage = enqueued.originalStage();
completionStage.thenAccept(future::complete);
completionStage.exceptionally(throwable -> {
future.completeExceptionally(throwable);
return null;
});
}
private static class Enqueued<T> {
private LazyValue<CompletableFuture<T>> resultFuture = LazyValue.create(CompletableFuture::new);
private Supplier<? extends CompletionStage<T>> supplier;
private Enqueued(Supplier<? extends CompletionStage<T>> supplier) {
this.supplier = supplier;
}
private CompletableFuture<T> future() {
return resultFuture.get();
}
private CompletionStage<T> originalStage() {
return supplier.get();
}
}
public static class Builder implements io.helidon.common.Builder<Bulkhead> {
private int limit;
private int queueLength = 10;
private LazyValue<? extends ExecutorService> executor = FaultTolerance.executor();
private int limit = DEFAULT_LIMIT;
private int queueLength = DEFAULT_QUEUE_LENGTH;
private String name = "Bulkhead-" + System.identityHashCode(this);
private Builder() {
}
@Override
public Bulkhead build() {
return new Bulkhead(this);
return new BulkheadImpl(this);
}
/**
* Configure executor service to use for executing tasks asynchronously.
*
* @param executor executor service supplier
* @return updated builder instance
*/
public Builder executor(Supplier<? extends ExecutorService> executor) {
this.executor = LazyValue.create(Objects.requireNonNull(executor));
return this;
}
/**
* Maximal number of parallel requests going through this bulkhead.
* When the limit is reached, additional requests are enqueued.
*
* @param limit maximal number of parallel calls, defaults is {@value DEFAULT_LIMIT}
* @return updated builder instance
*/
public Builder limit(int limit) {
this.limit = limit;
return this;
}
/**
* Maximal number of enqueued requests waiting for processing.
* When the limit is reached, additional attempts to invoke
* a request will receive a {@link io.helidon.faulttolerance.BulkheadException}.
*
* @param queueLength length of queue
* @return updated builder instance
*/
public Builder queueLength(int queueLength) {
this.queueLength = queueLength;
return this;
}
}
private static class NoQueue extends ArrayDeque<Enqueued<?>> {
@Override
public boolean offer(Enqueued<?> enqueued) {
return false;
/**
* Name is useful for debugging and in exception handling.
*
* @param name name of this bulkhead
* @return updated builder instance
*/
public Builder name(String name) {
this.name = name;
return this;
}
@Override
public Enqueued<?> poll() {
return null;
int limit() {
return limit;
}
int queueLength() {
return queueLength;
}
LazyValue<? extends ExecutorService> executor() {
return executor;
}
String name() {
return name;
}
}
}

View File

@@ -16,8 +16,12 @@
package io.helidon.faulttolerance;
/**
* Failure because of {@link io.helidon.faulttolerance.Bulkhead} issues, most likely that the bulkhead does
* not allow any more queued tasks.
*/
public class BulkheadException extends RuntimeException {
public BulkheadException(String message) {
BulkheadException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,111 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.faulttolerance;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import java.util.logging.Logger;
import io.helidon.common.LazyValue;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
class BulkheadImpl implements Bulkhead {
private static final Logger LOGGER = Logger.getLogger(BulkheadImpl.class.getName());
private final LazyValue<? extends ExecutorService> executor;
private final Queue<DelayedTask<?>> queue;
private final Semaphore inProgress;
private final String name;
BulkheadImpl(Bulkhead.Builder builder) {
this.executor = builder.executor();
this.inProgress = new Semaphore(builder.limit(), true);
this.name = builder.name();
if (builder.queueLength() == 0) {
queue = new NoQueue();
} else {
this.queue = new LinkedBlockingQueue<>(builder.queueLength());
}
}
@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
return invokeTask(DelayedTask.createSingle(supplier));
}
@Override
public <T> Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
return invokeTask(DelayedTask.createMulti(supplier));
}
// this method must be called while NOT holding a permit
private <R> R invokeTask(DelayedTask<R> task) {
if (inProgress.tryAcquire()) {
LOGGER.finest(() -> name + " invoke immediate: " + task);
// free permit, we can invoke
execute(task);
return task.result();
} else {
// no free permit, let's try to enqueue
if (queue.offer(task)) {
LOGGER.finest(() -> name + " enqueue: " + task);
return task.result();
} else {
LOGGER.finest(() -> name + " reject: " + task);
return task.error(new BulkheadException("Bulkhead queue \"" + name + "\" is full"));
}
}
}
// this method must be called while holding a permit
private void execute(DelayedTask<?> task) {
task.execute()
.thenRun(() -> {
LOGGER.finest(() -> name + " finished execution: " + task);
DelayedTask<?> polled = queue.poll();
if (polled != null) {
LOGGER.finest(() -> name + " invoke in executor: " + polled);
// chain executions from queue until all are executed
executor.get().submit(() -> execute(polled));
} else {
LOGGER.finest(() -> name + " permit released after: " + task);
// nothing in the queue, release permit
inProgress.release();
}
});
}
private static class NoQueue extends ArrayDeque<DelayedTask<?>> {
@Override
public boolean offer(DelayedTask delayedTask) {
return false;
}
@Override
public DelayedTask poll() {
return null;
}
}
}

View File

@@ -17,157 +17,77 @@
package io.helidon.faulttolerance;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
import io.helidon.common.reactive.Single;
import static io.helidon.faulttolerance.ResultWindow.Result.FAILURE;
import static io.helidon.faulttolerance.ResultWindow.Result.SUCCESS;
public class CircuitBreaker implements Handler {
/*
Configuration options
/**
* CircuitBreaker protects a potentially failing endpoint from overloading and the application
* from spending resources on such a failing endpoints.
* <p>
* In case too many errors are detected, the circuit opens and all new requests fail with a
* {@link io.helidon.faulttolerance.CircuitBreakerOpenException} for a period of time.
* After this period, attempts are made to check if the service is up again - if so, the circuit closes
* and requests can process as usual again.
*/
public interface CircuitBreaker extends Handler {
/**
* Builder to customize configuration of the breaker.
* @return a new builder
*/
private final LazyValue<? extends ScheduledExecutorService> executor;
// how long to transition from open to half-open
private final long delayMillis;
// how many successful calls will close a half-open breaker
private final int successThreshold;
/*
Runtime
*/
private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
// rolling window for counting errors to (maybe) open the breaker
private final ResultWindow results;
// to close from half-open
private final AtomicInteger successCounter = new AtomicInteger();
private final AtomicBoolean halfOpenInProgress = new AtomicBoolean();
private final AtomicReference<ScheduledFuture<Boolean>> schedule = new AtomicReference<>();
private CircuitBreaker(Builder builder) {
this.delayMillis = builder.delay.toMillis();
this.successThreshold = builder.successThreshold;
this.results = new ResultWindow(builder.volume, builder.ratio);
this.executor = builder.executor();
}
public static Builder builder() {
static Builder builder() {
return new Builder();
}
@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
if (state.get() == State.CLOSED) {
// run it!
CompletionStage<T> result = supplier.get();
result.handle((it, exception) -> {
if (exception == null) {
// success
results.update(SUCCESS);
} else {
results.update(FAILURE);
if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) {
results.reset();
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
}
/**
* Current breaker state.
* As the state may change within nanoseconds, this is for information only.
* @return current breaker state
*/
State state();
return it;
});
return Single.create(result);
} else if (state.get() == State.OPEN) {
// fail it!
return Single.error(new CircuitBreakerOpenException("CircuitBreaker is open"));
} else {
// half-open
if (halfOpenInProgress.compareAndSet(false, true)) {
CompletionStage<T> result = supplier.get();
result.handle((it, exception) -> {
if (exception == null) {
// success
int successes = successCounter.incrementAndGet();
if (successes >= successThreshold) {
// transition to closed
successCounter.set(0);
state.compareAndSet(State.HALF_OPEN, State.CLOSED);
halfOpenInProgress.set(false);
}
halfOpenInProgress.set(false);
} else {
// failure
successCounter.set(0);
state.set(State.OPEN);
halfOpenInProgress.set(false);
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
/**
* Set state of this circuit breaker.
* Note that all usual processes to re-close or open the circuit are in progress.
* <ul>
* <li>If set to {@link State#OPEN}, a timer will set it to half open in a
* while</li>
* <li>If set to {@link State#HALF_OPEN}, it may close after first successful request</li>
* <li>If set to {@link State#CLOSED}, it may open again if requests fail</li>
* </ul>
* So a subsequent call to {@link #state()} may yield a different state than configured here
* @param newState state to configure
*/
void state(State newState);
return it;
});
return Single.create(result);
} else {
return Single
.error(new CircuitBreakerOpenException("CircuitBreaker is half open, parallel execution in progress"));
}
}
/**
* {@link io.helidon.faulttolerance.CircuitBreaker} states.
*/
enum State {
/**
* Circuit is closed and requests are processed.
*/
CLOSED,
/**
* Circuit is half open and some test requests are processed, others fail with
* {@link io.helidon.faulttolerance.CircuitBreakerOpenException}.
*/
HALF_OPEN,
/**
* Circuit is open and all requests fail with {@link io.helidon.faulttolerance.CircuitBreakerOpenException}.
*/
OPEN
}
private void scheduleHalf() {
schedule.set(executor.get()
.schedule(() -> {
state.compareAndSet(State.OPEN, State.HALF_OPEN);
schedule.set(null);
return true;
}, delayMillis, TimeUnit.MILLISECONDS));
}
public State state() {
return state.get();
}
public void state(State newState) {
if (newState == State.CLOSED) {
if (state.get() == State.CLOSED) {
// fine
resetCounters();
return;
}
ScheduledFuture<Boolean> future = schedule.getAndSet(null);
if (future != null) {
future.cancel(false);
}
resetCounters();
state.set(State.CLOSED);
} else if (newState == State.OPEN) {
state.set(State.OPEN);
ScheduledFuture<Boolean> future = schedule.getAndSet(null);
if (future != null) {
future.cancel(false);
}
resetCounters();
} else {
// half open
resetCounters();
}
}
private void resetCounters() {
results.reset();
successCounter.set(0);
}
public static class Builder implements io.helidon.common.Builder<CircuitBreaker> {
/**
* Fluent API builder for {@link io.helidon.faulttolerance.CircuitBreaker}.
*/
class Builder implements io.helidon.common.Builder<CircuitBreaker> {
private final Set<Class<? extends Throwable>> skipOn = new HashSet<>();
private final Set<Class<? extends Throwable>> applyOn = new HashSet<>();
// how long to transition from open to half-open
private Duration delay = Duration.ofSeconds(5);
// how many percents of failures will open the breaker
@@ -183,38 +103,153 @@ public class CircuitBreaker implements Handler {
@Override
public CircuitBreaker build() {
return new CircuitBreaker(this);
return new CircuitBreakerImpl(this);
}
/**
* How long to wait before transitioning from open to half-open state.
*
* @param delay to wait
* @return updated builder instance
*/
public Builder delay(Duration delay) {
this.delay = delay;
return this;
}
public Builder ratio(int ratio) {
/**
* How many failures out of 100 will trigger the circuit to open.
* This is adapted to the {@link #volume(int)} used to handle the window of requests.
* <p>If errorRatio is 40, and volume is 10, 4 failed requests will open the circuit.
*
* @param ratio percent of failure that trigger the circuit to open
* @return updated builder instance
* @see #volume(int)
*/
public Builder errorRatio(int ratio) {
this.ratio = ratio;
return this;
}
/**
* How many successful calls will close a half-open circuit.
* Nevertheless the first failed call will open the circuit again.
*
* @param successThreshold number of calls
* @return updated builder instance
*/
public Builder successThreshold(int successThreshold) {
this.successThreshold = successThreshold;
return this;
}
/**
* Rolling window size used to calculate ratio of failed requests.
*
* @param volume how big a window is used to calculate error errorRatio
* @return updated builder instance
* @see #errorRatio(int)
*/
public Builder volume(int volume) {
this.volume = volume;
return this;
}
/**
* These throwables will be considered failures, and all other will not.
* <p>
* Cannot be combined with {@link #skipOn}.
*
* @param classes to consider failures to calculate failure ratio
* @return updated builder instance
*/
public Builder applyOn(Class<? extends Throwable>... classes) {
applyOn.clear();
Arrays.stream(classes)
.forEach(this::addApplyOn);
return this;
}
/**
* Add a throwable to be considered a failure.
*
* @param clazz to consider failure to calculate failure ratio
* @return updated builder instance
* @see #applyOn
*/
public Builder addApplyOn(Class<? extends Throwable> clazz) {
this.applyOn.add(clazz);
return this;
}
/**
* These throwables will not be considered failures, all other will.
* <p>
* Cannot be combined with {@link #applyOn}.
*
* @param classes to consider successful
* @return updated builder instance
*/
public Builder skipOn(Class<? extends Throwable>... classes) {
skipOn.clear();
Arrays.stream(classes)
.forEach(this::addSkipOn);
return this;
}
/**
* This throwable will not be considered failure.
* <p>
*
* @param clazz to consider successful
* @return updated builder instance
*/
public Builder addSkipOn(Class<? extends Throwable> clazz) {
this.skipOn.add(clazz);
return this;
}
/**
* Executor service to schedule future tasks.
* By default uses an executor configured on
* {@link io.helidon.faulttolerance.FaultTolerance#scheduledExecutor(java.util.function.Supplier)}.
*
* @param scheduledExecutor executor to use
* @return updated builder instance
*/
public Builder executor(ScheduledExecutorService scheduledExecutor) {
this.executor = LazyValue.create(scheduledExecutor);
return this;
}
LazyValue<? extends ScheduledExecutorService> executor() {
return executor;
}
}
public enum State {
CLOSED,
HALF_OPEN,
OPEN
}
Set<Class<? extends Throwable>> skipOn() {
return skipOn;
}
Set<Class<? extends Throwable>> applyOn() {
return applyOn;
}
Duration delay() {
return delay;
}
int errorRatio() {
return ratio;
}
int successThreshold() {
return successThreshold;
}
int volume() {
return volume;
}
}
}

View File

@@ -0,0 +1,180 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.faulttolerance;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import static io.helidon.faulttolerance.ResultWindow.Result.FAILURE;
import static io.helidon.faulttolerance.ResultWindow.Result.SUCCESS;
public class CircuitBreakerImpl implements CircuitBreaker {
/*
Configuration options
*/
private final LazyValue<? extends ScheduledExecutorService> executor;
// how long to transition from open to half-open
private final long delayMillis;
// how many successful calls will close a half-open breaker
private final int successThreshold;
/*
Runtime
*/
private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
// rolling window for counting errors to (maybe) open the breaker
private final ResultWindow results;
// to close from half-open
private final AtomicInteger successCounter = new AtomicInteger();
private final AtomicBoolean halfOpenInProgress = new AtomicBoolean();
private final AtomicReference<ScheduledFuture<Boolean>> schedule = new AtomicReference<>();
private final ErrorChecker errorChecker;
CircuitBreakerImpl(CircuitBreaker.Builder builder) {
this.delayMillis = builder.delay().toMillis();
this.successThreshold = builder.successThreshold();
this.results = new ResultWindow(builder.volume(), builder.errorRatio());
this.executor = builder.executor();
this.errorChecker = ErrorChecker.create(builder.skipOn(), builder.applyOn());
}
@Override
public <T> Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
return invokeTask(DelayedTask.createMulti(supplier));
}
@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
return invokeTask(DelayedTask.createSingle(supplier));
}
private <U> U invokeTask(DelayedTask<U> task) {
if (state.get() == State.CLOSED) {
// run it!
CompletionStage<Void> completion = task.execute();
completion.handle((it, throwable) -> {
Throwable exception = FaultTolerance.cause(throwable);
if (exception == null || errorChecker.shouldSkip(exception)) {
// success
results.update(SUCCESS);
} else {
results.update(FAILURE);
if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) {
results.reset();
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
}
return it;
});
return task.result();
} else if (state.get() == State.OPEN) {
// fail it!
return task.error(new CircuitBreakerOpenException("CircuitBreaker is open"));
} else {
// half-open
if (halfOpenInProgress.compareAndSet(false, true)) {
CompletionStage<Void> result = task.execute();
result.handle((it, throwable) -> {
Throwable exception = FaultTolerance.cause(throwable);
if (exception == null || errorChecker.shouldSkip(exception)) {
// success
int successes = successCounter.incrementAndGet();
if (successes >= successThreshold) {
// transition to closed
successCounter.set(0);
state.compareAndSet(State.HALF_OPEN, State.CLOSED);
halfOpenInProgress.set(false);
}
halfOpenInProgress.set(false);
} else {
// failure
successCounter.set(0);
state.set(State.OPEN);
halfOpenInProgress.set(false);
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
return it;
});
return task.result();
} else {
return task
.error(new CircuitBreakerOpenException("CircuitBreaker is half open, parallel execution in progress"));
}
}
}
private void scheduleHalf() {
schedule.set(executor.get()
.schedule(() -> {
state.compareAndSet(State.OPEN, State.HALF_OPEN);
schedule.set(null);
return true;
}, delayMillis, TimeUnit.MILLISECONDS));
}
public State state() {
return state.get();
}
public void state(State newState) {
if (newState == State.CLOSED) {
if (state.get() == State.CLOSED) {
// fine
resetCounters();
return;
}
ScheduledFuture<Boolean> future = schedule.getAndSet(null);
if (future != null) {
future.cancel(false);
}
resetCounters();
state.set(State.CLOSED);
} else if (newState == State.OPEN) {
state.set(State.OPEN);
ScheduledFuture<Boolean> future = schedule.getAndSet(null);
if (future != null) {
future.cancel(false);
}
resetCounters();
} else {
// half open
resetCounters();
}
}
private void resetCounters() {
results.reset();
successCounter.set(0);
}
}

View File

@@ -16,6 +16,9 @@
package io.helidon.faulttolerance;
/**
* Failure because {@link io.helidon.faulttolerance.CircuitBreaker} is open and does not accept requests.
*/
public class CircuitBreakerOpenException extends RuntimeException {
public CircuitBreakerOpenException(String message) {
super(message);

View File

@@ -0,0 +1,147 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.faulttolerance;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.MultiTappedPublisher;
import io.helidon.common.reactive.Single;
interface DelayedTask<T> {
// the result completes when the call fully completes (regardless of errors)
CompletionStage<Void> execute();
// get the (new) result
T result();
// create an error result
T error(Throwable throwable);
// cannot retry or fallback when data was already sent (only useful for multi)
default boolean hadData() {
return false;
}
static <T> DelayedTask<Multi<T>> createMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
return new DelayedTask<>() {
final AtomicBoolean completed = new AtomicBoolean();
final AtomicBoolean hasData = new AtomicBoolean();
final LazyValue<CompletableFuture<Void>> completionMarker = LazyValue.create(CompletableFuture::new);
final LazyValue<CompletableFuture<Flow.Publisher<T>>> publisherFuture = LazyValue.create(CompletableFuture::new);
final LazyValue<Multi<T>> multi = LazyValue.create(() -> {
return MultiTappedPublisher.builder(Multi.create(publisherFuture.get()).flatMap(Function.identity()))
.onCancelCallback(() -> failMarker(new CancellationException("Multi was cancelled")))
.onCompleteCallback(this::completeMarker)
.onErrorCallback(this::failMarker)
.onNextCallback(it -> hasData.set(true))
.build();
});
@Override
public CompletionStage<Void> execute() {
publisherFuture.get().complete(supplier.get());
return completionMarker.get();
}
@Override
public Multi<T> result() {
return multi.get();
}
@Override
public Multi<T> error(Throwable throwable) {
return Multi.error(throwable);
}
@Override
public String toString() {
return "multi:" + System.identityHashCode(this);
}
@Override
public boolean hadData() {
return hasData.get();
}
private void failMarker(Throwable throwable) {
if (completed.compareAndSet(false, true)) {
completionMarker.get().completeExceptionally(throwable);
}
}
private void completeMarker() {
if (completed.compareAndSet(false, true)) {
completionMarker.get().complete(null);
}
}
};
}
static <T> DelayedTask<Single<T>> createSingle(Supplier<? extends CompletionStage<T>> supplier) {
return new DelayedTask<>() {
// future we returned as a result of invoke command
final LazyValue<CompletableFuture<T>> resultFuture = LazyValue.create(CompletableFuture::new);
@Override
public CompletionStage<Void> execute() {
CompletionStage<T> result = null;
try {
result = supplier.get();
} catch (Exception e) {
return CompletableFuture.failedStage(e);
}
CompletableFuture<T> future = resultFuture.get();
result.handle((it, throwable) -> {
if (throwable == null) {
future.complete(it);
} else {
future.completeExceptionally(throwable);
}
return null;
});
return result.thenRun(() -> {});
}
@Override
public Single<T> result() {
return Single.create(resultFuture.get());
}
@Override
public Single<T> error(Throwable throwable) {
return Single.error(throwable);
}
@Override
public String toString() {
return "single:" + System.identityHashCode(this);
}
};
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.faulttolerance;
import java.util.Set;
@FunctionalInterface
interface ErrorChecker {
boolean shouldSkip(Throwable throwable);
static ErrorChecker create(Set<Class<? extends Throwable>> skipOnSet, Set<Class<? extends Throwable>> applyOnSet) {
Set<Class<? extends Throwable>> skipOn = Set.copyOf(skipOnSet);
Set<Class<? extends Throwable>> applyOn = Set.copyOf(applyOnSet);
if (skipOn.isEmpty()) {
if (applyOn.isEmpty()) {
return throwable -> false;
} else {
return throwable -> !applyOn.contains(throwable.getClass());
}
} else {
if (applyOn.isEmpty()) {
return throwable -> skipOn.contains(throwable.getClass());
} else {
throw new IllegalArgumentException("You have defined both skip and apply set of exception classes. "
+ "This cannot be correctly handled; skipOn: " + skipOn
+ " applyOn: " + applyOn);
}
}
}
}

View File

@@ -16,62 +16,171 @@
package io.helidon.faulttolerance;
import java.util.Objects;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.function.Supplier;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
public class Fallback<T> implements TypedHandler<T> {
private final Function<Throwable, ? extends CompletionStage<T>> fallback;
private Fallback(Builder<T> builder) {
this.fallback = builder.fallback;
}
public static <T> Builder<T> builder() {
/**
* Fallback allows the user to execute an alternative supplier of results in case the usual one fails.
* <p>
* In case you call the {@link #invokeMulti(java.util.function.Supplier)} method, the following restriction applies:
* <ul>
* <li>In case at least one record was sent (one {@code onNext} was called), the fallback will not trigger.</li>
* </ul>
* You may provide fallback for both a {@link io.helidon.common.reactive.Multi} and a {@link io.helidon.common.reactive.Single}.
* If none is provided, the method is executed without fallback.
* @param <T> type of the values returned
*/
public interface Fallback<T> extends TypedHandler<T> {
/**
* A builder to customize {@link Fallback}.
*
* @param <T> type of the values returned by the failing method
* @return a new builder
*/
static <T> Builder<T> builder() {
return new Builder<>();
}
@Override
public Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
CompletableFuture<T> future = new CompletableFuture<>();
supplier.get()
.thenAccept(future::complete)
.exceptionally(throwable -> {
Throwable cause = FaultTolerance.getCause(throwable);
fallback.apply(cause)
.thenAccept(future::complete)
.exceptionally(t2 -> {
Throwable cause2 = FaultTolerance.getCause(t2);
cause2.addSuppressed(throwable);
future.completeExceptionally(cause2);
return null;
});
return null;
});
return Single.create(future);
/**
* Create a fallback for a {@link Single} or {@link java.util.concurrent.CompletionStage}.
*
* @param fallback fallback supplier to obtain the alternative result
* @param <T> type of the result
* @return a new fallback
*/
static <T> Fallback<T> create(Function<Throwable, ? extends CompletionStage<T>> fallback) {
Builder<T> builder = builder();
return builder.fallback(fallback).build();
}
public static class Builder<T> implements io.helidon.common.Builder<Fallback<T>> {
private Function<Throwable, ? extends CompletionStage<T>> fallback;
/**
* Create a fallback for a {@link Multi} or {@link java.util.concurrent.Flow.Publisher}.
*
* @param fallback fallback supplier to obtain the alternative result
* @param <T> type of the result
* @return a new fallback
*/
static <T> Fallback<T> createMulti(Function<Throwable, ? extends CompletionStage<T>> fallback) {
Builder<T> builder = builder();
return builder.fallback(fallback).build();
}
/**
* Fluent API builder for {@link io.helidon.faulttolerance.Fallback}.
*
* @param <T> type of the values returned
*/
class Builder<T> implements io.helidon.common.Builder<Fallback<T>> {
private final Set<Class<? extends Throwable>> applyOn = new HashSet<>();
private final Set<Class<? extends Throwable>> skipOn = new HashSet<>();
private Function<Throwable, ? extends CompletionStage<T>> fallback = CompletableFuture::failedFuture;
private Function<Throwable, ? extends Flow.Publisher<T>> fallbackMulti = Multi::error;
private Builder() {
}
@Override
public Fallback<T> build() {
Objects.requireNonNull(fallback, "Fallback method must be specified");
return new Fallback<>(this);
return new FallbackImpl<>(this);
}
/**
* Configure a fallback for a {@link Single} or {@link java.util.concurrent.CompletionStage}.
*
* @param fallback fallback supplier to obtain the alternative result
* @return updated builder instance
*/
public Builder<T> fallback(Function<Throwable, ? extends CompletionStage<T>> fallback) {
this.fallback = fallback;
return this;
}
/**
* Configure a fallback for a {@link Multi} or {@link java.util.concurrent.Flow.Publisher}.
*
* @param fallback fallback supplier to obtain the alternative result
* @return updated builder instance
*/
public Builder<T> fallbackMulti(Function<Throwable, ? extends Flow.Publisher<T>> fallback) {
this.fallbackMulti = fallback;
return this;
}
/**
* Apply fallback on these throwable classes.
* Cannot be combined with {@link #skipOn(Class[])}.
*
* @param classes classes to fallback on
* @return updated builder instance
*/
public Builder<T> applyOn(Class<? extends Throwable>... classes) {
applyOn.clear();
Arrays.stream(classes)
.forEach(this::addApplyOn);
return this;
}
/**
* Apply fallback on this throwable class.
*
* @param clazz class to fallback on
* @return updated builder instance
*/
public Builder<T> addApplyOn(Class<? extends Throwable> clazz) {
this.applyOn.add(clazz);
return this;
}
/**
* Do not apply fallback on these throwable classes.
* Cannot be combined with {@link #applyOn(Class[])}.
*
* @param classes classes not to fallback on
* @return updated builder instance
*/
public Builder<T> skipOn(Class<? extends Throwable>... classes) {
skipOn.clear();
Arrays.stream(classes)
.forEach(this::addSkipOn);
return this;
}
/**
* Do not apply fallback on this throwable class.
*
* @param clazz class not to fallback on
* @return updated builder instance
*/
public Builder<T> addSkipOn(Class<? extends Throwable> clazz) {
this.skipOn.add(clazz);
return this;
}
Set<Class<? extends Throwable>> applyOn() {
return applyOn;
}
Set<Class<? extends Throwable>> skipOn() {
return skipOn;
}
Function<Throwable, ? extends CompletionStage<T>> fallback() {
return fallback;
}
Function<Throwable, ? extends Flow.Publisher<T>> fallbackMulti() {
return fallbackMulti;
}
}
}

View File

@@ -0,0 +1,81 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.faulttolerance;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.function.Supplier;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
class FallbackImpl<T> implements Fallback<T> {
private final Function<Throwable, ? extends CompletionStage<T>> fallback;
private final Function<Throwable, ? extends Flow.Publisher<T>> fallbackMulti;
private final ErrorChecker errorChecker;
FallbackImpl(Fallback.Builder<T> builder) {
this.fallback = builder.fallback();
this.fallbackMulti = builder.fallbackMulti();
this.errorChecker = ErrorChecker.create(builder.skipOn(), builder.applyOn());
}
@Override
public Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
DelayedTask<Multi<T>> delayedTask = DelayedTask.createMulti(supplier);
delayedTask.execute();
return delayedTask.result()
.onErrorResumeWith(throwable -> {
Throwable cause = FaultTolerance.cause(throwable);
if (delayedTask.hadData() || errorChecker.shouldSkip(cause)) {
return Multi.error(cause);
} else {
return Multi.create(fallbackMulti.apply(cause));
}
});
}
@Override
public Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
CompletableFuture<T> future = new CompletableFuture<>();
supplier.get()
.thenAccept(future::complete)
.exceptionally(throwable -> {
Throwable cause = FaultTolerance.cause(throwable);
if (errorChecker.shouldSkip(cause)) {
future.completeExceptionally(cause);
} else {
fallback.apply(cause)
.thenAccept(future::complete)
.exceptionally(t2 -> {
Throwable cause2 = FaultTolerance.cause(t2);
cause2.addSuppressed(throwable);
future.completeExceptionally(cause2);
return null;
});
}
return null;
});
return Single.create(future);
}
}

View File

@@ -23,6 +23,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -31,6 +32,7 @@ import java.util.function.Supplier;
import io.helidon.common.LazyValue;
import io.helidon.common.configurable.ScheduledThreadPoolSupplier;
import io.helidon.common.configurable.ThreadPoolSupplier;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import io.helidon.config.Config;
@@ -116,12 +118,12 @@ public final class FaultTolerance {
return new Builder();
}
static Throwable getCause(Throwable throwable) {
static Throwable cause(Throwable throwable) {
if (throwable instanceof CompletionException) {
return getCause(throwable.getCause());
return cause(throwable.getCause());
}
if (throwable instanceof ExecutionException) {
return getCause(throwable.getCause());
return cause(throwable.getCause());
}
return throwable;
}
@@ -168,7 +170,7 @@ public final class FaultTolerance {
@Override
public void add(Handler ft) {
fts.add(ft::invoke);
fts.add(new TypedWrapper(ft));
}
public TypedBuilder<T> addFallback(Fallback<T> fallback) {
@@ -179,7 +181,7 @@ public final class FaultTolerance {
private TypedBuilder<T> builder(Builder builder) {
builder.fts
.forEach(it -> {
fts.add(it::invoke);
fts.add(new TypedWrapper(it));
});
return this;
}
@@ -191,6 +193,18 @@ public final class FaultTolerance {
this.validFts = new LinkedList<>(validFts);
}
@Override
public Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
Supplier<? extends Flow.Publisher<T>> next = supplier;
for (TypedHandler<T> validFt : validFts) {
final var finalNext = next;
next = () -> validFt.invokeMulti(finalNext);
}
return Multi.create(next.get());
}
@Override
public Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
Supplier<? extends CompletionStage<T>> next = supplier;
@@ -203,6 +217,24 @@ public final class FaultTolerance {
return Single.create(next.get());
}
}
private class TypedWrapper implements TypedHandler<T> {
private final Handler handler;
private TypedWrapper(Handler handler) {
this.handler = handler;
}
@Override
public Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
return handler.invoke(supplier);
}
@Override
public Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
return handler.invokeMulti(supplier);
}
}
}
public static class Builder extends BaseBuilder<Builder> implements io.helidon.common.Builder<Handler> {
@@ -234,6 +266,17 @@ public final class FaultTolerance {
this.validFts = new LinkedList<>(validFts);
}
@Override
public <T> Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
Supplier<? extends Flow.Publisher<T>> next = supplier;
for (Handler validFt : validFts) {
final var finalNext = next;
next = () -> validFt.invokeMulti(finalNext);
}
return Multi.create(next.get());
}
@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
Supplier<? extends CompletionStage<T>> next = supplier;

View File

@@ -17,11 +17,13 @@
package io.helidon.faulttolerance;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
@FunctionalInterface
public interface Handler {
<T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier);
<T> Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier);
}

View File

@@ -19,155 +19,47 @@ package io.helidon.faulttolerance;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
import io.helidon.common.reactive.Single;
public class Retry implements Handler {
private final int calls;
private final long delayMillis;
private final long maxTimeNanos;
private final int jitterMillis;
private final LazyValue<? extends ScheduledExecutorService> scheduledExecutor;
private final Random random = new Random();
protected Retry(Builder builder) {
this.calls = builder.calls;
this.delayMillis = builder.delay.toMillis();
this.maxTimeNanos = builder.maxTime.toNanos();
long jitter = builder.jitter.toMillis() * 2;
this.jitterMillis = (jitter > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) jitter;
this.scheduledExecutor = builder.scheduledExecutor;
}
public static Builder builder() {
/**
* Retry supports retry policies to be applied on an execution of asynchronous tasks.
* <p>
* In case you call the {@link #invokeMulti(java.util.function.Supplier)} method, the following restriction applies:
* <ul>
* <li>In case at least one record was sent (one {@code onNext} was called), the retry will not trigger.</li>
* </ul>
*/
public interface Retry extends Handler {
/**
* A new builder to customize {@code Retry} configuration.
*
* @return a new builder
*/
static Builder builder() {
return new Builder();
}
@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
CompletableFuture<T> future = new CompletableFuture<>();
/**
* Fluent API builder for {@link io.helidon.faulttolerance.Retry}.
*/
class Builder implements io.helidon.common.Builder<Retry> {
private final Set<Class<? extends Throwable>> applyOn = new HashSet<>();
private final Set<Class<? extends Throwable>> skipOn = new HashSet<>();
new Retrier<>(future, supplier, this)
.retry();
private RetryPolicy retryPolicy = JitterRetryPolicy.builder()
.calls(4)
.delay(Duration.ofMillis(200))
.jitter(Duration.ofMillis(50))
.build();
return Single.create(future);
}
protected boolean abort(Throwable throwable) {
return false;
}
private long nextDelay() {
long delay = delayMillis;
int jitterRandom = random.nextInt(jitterMillis) - jitterMillis;
delay = delay + jitterRandom;
delay = Math.max(0, delay);
return delay;
}
private class Retrier<T> {
private final AtomicInteger count = new AtomicInteger();
private final AtomicReference<Throwable> lastThrowable = new AtomicReference<>();
private final Supplier<? extends CompletionStage<T>> supplier;
private final CompletableFuture<T> future;
private final Retry retry;
private final long started = System.nanoTime();
private Retrier(CompletableFuture<T> future,
Supplier<? extends CompletionStage<T>> supplier,
Retry retry) {
this.future = future;
this.supplier = supplier;
this.retry = retry;
}
private void retry() {
int currentCount = count.incrementAndGet();
CompletionStage<T> stage = null;
try {
stage = supplier.get();
} catch (Throwable e) {
stage = CompletableFuture.failedStage(e);
}
stage.handle((it, throwable) -> {
if (throwable == null) {
future.complete(it);
} else {
Throwable current = FaultTolerance.getCause(throwable);
Throwable before = lastThrowable.get();
if (before != null) {
current.addSuppressed(before);
}
long now = System.nanoTime();
if (currentCount >= retry.calls || retry.abort(current)) {
// this is final execution
future.completeExceptionally(current);
} else if (now - started > maxTimeNanos) {
TimeoutException timedOut = new TimeoutException("Retries timed out");
timedOut.addSuppressed(current);
future.completeExceptionally(timedOut);
} else {
lastThrowable.set(current);
// schedule next retry
scheduledExecutor.get().schedule(this::retry, nextDelay(), TimeUnit.MILLISECONDS);
}
}
return null;
});
}
}
private static class RetryWithAbortOn extends Retry {
private final Set<Class<? extends Throwable>> abortOn;
private RetryWithAbortOn(Builder builder) {
super(builder);
this.abortOn = Set.copyOf(builder.abortOn);
}
@Override
protected boolean abort(Throwable throwable) {
return abortOn.contains(throwable.getClass());
}
}
private static class RetryWithRetryOn extends Retry {
private final Set<Class<? extends Throwable>> retryOn;
private RetryWithRetryOn(Builder builder) {
super(builder);
this.retryOn = Set.copyOf(builder.retryOn);
}
@Override
protected boolean abort(Throwable throwable) {
return !retryOn.contains(throwable.getClass());
}
}
public static class Builder implements io.helidon.common.Builder<Retry> {
private final Set<Class<? extends Throwable>> retryOn = new HashSet<>();
private final Set<Class<? extends Throwable>> abortOn = new HashSet<>();
private int calls = 3;
private Duration delay = Duration.ofMillis(200);
private Duration maxTime = Duration.ofSeconds(1);
private Duration jitter = Duration.ofMillis(50);
private Duration overallTimeout = Duration.ofSeconds(1);
private LazyValue<? extends ScheduledExecutorService> scheduledExecutor = FaultTolerance.scheduledExecutor();
private Builder() {
@@ -175,78 +67,359 @@ public class Retry implements Handler {
@Override
public Retry build() {
calls = Math.max(1, calls);
if (retryOn.isEmpty()) {
if (abortOn.isEmpty()) {
return new Retry(this);
} else {
return new RetryWithAbortOn(this);
}
} else {
if (abortOn.isEmpty()) {
return new RetryWithRetryOn(this);
} else {
throw new IllegalArgumentException("You have defined both retryOn and abortOn exceptions. "
+ "This cannot be correctly handled; abortOn: " + abortOn
+ " retryOn: " + retryOn);
}
}
return new RetryImpl(this);
}
/**
* Total number of calls (first + retries).
* @param calls how many times to call the method
* Configure a retry policy to use to calculate delays between retries.
* Defaults to a {@link io.helidon.faulttolerance.Retry.JitterRetryPolicy}
* with 4 calls (initial call + 3 retries), delay of 200 millis and a jitter of 50 millis.
*
* @param policy retry policy
* @return updated builder instance
*/
public Builder calls(int calls) {
this.calls = calls;
public Builder retryPolicy(RetryPolicy policy) {
this.retryPolicy = policy;
return this;
}
public Builder delay(Duration delay) {
this.delay = delay;
return this;
}
public Builder maxTime(Duration maxTime) {
this.maxTime = maxTime;
return this;
}
public Builder jitter(Duration jitter) {
this.jitter = jitter;
return this;
}
public Builder retryOn(Class<? extends Throwable>... classes) {
retryOn.clear();
/**
* These throwables will be considered failures, and all other will not.
* <p>
* Cannot be combined with {@link #skipOn}.
*
* @param classes to consider failures and trigger a retry
* @return updated builder instance
*/
public Builder applyOn(Class<? extends Throwable>... classes) {
applyOn.clear();
Arrays.stream(classes)
.forEach(this::addRetryOn);
.forEach(this::addApplyOn);
return this;
}
public Builder addRetryOn(Class<? extends Throwable> clazz) {
this.retryOn.add(clazz);
/**
* Add a throwable to be considered a failure.
*
* @param clazz to consider failure and trigger a retry
* @return updated builder instance
* @see #applyOn
*/
public Builder addApplyOn(Class<? extends Throwable> clazz) {
this.applyOn.add(clazz);
return this;
}
public Builder abortOn(Class<? extends Throwable>... classes) {
abortOn.clear();
/**
* These throwables will not be considered retriable, all other will.
* <p>
* Cannot be combined with {@link #applyOn}.
*
* @param classes to skip retries
* @return updated builder instance
*/
public Builder skipOn(Class<? extends Throwable>... classes) {
skipOn.clear();
Arrays.stream(classes)
.forEach(this::addAbortOn);
.forEach(this::addSkipOn);
return this;
}
public Builder addAbortOn(Class<? extends Throwable> clazz) {
this.abortOn.add(clazz);
/**
* This throwable will not be considered retriable.
* <p>
*
* @param clazz to to skip retries
* @return updated builder instance
*/
public Builder addSkipOn(Class<? extends Throwable> clazz) {
this.skipOn.add(clazz);
return this;
}
/**
* Executor service to schedule retries.
* By default uses an executor configured on
* {@link io.helidon.faulttolerance.FaultTolerance#scheduledExecutor(java.util.function.Supplier)}.
*
* @param scheduledExecutor executor to use
* @return updated builder instance
*/
public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
this.scheduledExecutor = LazyValue.create(scheduledExecutor);
return this;
}
/**
* Overall timeout.
* When overall timeout is reached, execution terminates (even if the retry policy
* was not exhausted).
*
* @param overallTimeout an overall timeout
* @return updated builder instance
*/
public Builder overallTimeout(Duration overallTimeout) {
this.overallTimeout = overallTimeout;
return this;
}
Set<Class<? extends Throwable>> applyOn() {
return applyOn;
}
Set<Class<? extends Throwable>> skipOn() {
return skipOn;
}
RetryPolicy retryPolicy() {
return retryPolicy;
}
Duration overallTimeout() {
return overallTimeout;
}
LazyValue<? extends ScheduledExecutorService> scheduledExecutor() {
return scheduledExecutor;
}
}
/**
* Retry policy to handle delays between retries.
* The implementation must not save state, as a single instance
* will be used by multiple threads and executions in parallel.
*/
interface RetryPolicy {
/**
* Return next delay in milliseconds, or an empty optional to finish retries.
*
* @param firstCallMillis milliseconds recorded before the first call using {@link System#currentTimeMillis()}
* @param lastDelay last delay that was used (0 for the first failed call)
* @param call call index (0 for the first failed call)
* @return how long to wait before trying again, or empty to notify this is the end of retries
*/
Optional<Long> nextDelayMillis(long firstCallMillis, long lastDelay, int call);
}
/**
* A retry policy that prolongs the delays between retries by a defined factor.
* <p>
* Consider the following setup:
* <ul>
* <li>{@code calls = 4}</li>
* <li>{@code delayMillis = 100}</li>
* <li>{@code factor = 2.0}</li>
* </ul>
* The following delays will be used for each call:
*
* <ul>
* <li>Initial call - always immediate (not handled by retry policy)</li>
* <li>First retry - 100 millis</li>
* <li>Second retry - 200 millis (previous delay * factor)</li>
* <li>Third retry - 400 millis (previous delay * factor)</li>
* </ul>
*/
class DelayingRetryPolicy implements RetryPolicy {
private final int calls;
private final long delayMillis;
private final double delayFactor;
private DelayingRetryPolicy(Builder builder) {
this.calls = builder.calls;
this.delayMillis = builder.delay.toMillis();
this.delayFactor = builder.delayFactor;
}
/**
* A builder to customize configuration of {@link io.helidon.faulttolerance.Retry.DelayingRetryPolicy}.
*
* @return a new builder
*/
public static Builder builder() {
return new Builder();
}
/**
* Create a retry policy with no delays and with the specified number of calls.
*
* @param calls number of calls to execute (retries + initial call)
* @return a no delay retry policy
*/
public static DelayingRetryPolicy noDelay(int calls) {
return builder()
.delay(Duration.ZERO)
.delayFactor(0)
.calls(calls)
.build();
}
@Override
public Optional<Long> nextDelayMillis(long firstCallMillis, long lastDelay, int call) {
if (call >= calls) {
return Optional.empty();
}
if (call == 0) {
return Optional.of(delayMillis);
}
return Optional.of((long) (lastDelay * delayFactor));
}
/**
* Fluent API builder for {@link io.helidon.faulttolerance.Retry.DelayingRetryPolicy}.
*/
public static class Builder implements io.helidon.common.Builder<DelayingRetryPolicy> {
private int calls = 3;
private double delayFactor = 2;
private Duration delay = Duration.ofMillis(200);
@Override
public DelayingRetryPolicy build() {
return new DelayingRetryPolicy(this);
}
/**
* Total number of calls (first + retries).
*
* @param calls how many times to call the method
* @return updated builder instance
*/
public Builder calls(int calls) {
this.calls = calls;
return this;
}
/**
* Base delay between the invocations.
*
* @param delay delay between the invocations
* @return updated builder instance
*/
public Builder delay(Duration delay) {
this.delay = delay;
return this;
}
/**
* A delay multiplication factor.
*
* @param delayFactor a delay multiplication factor
* @return updated builder instance
*/
public Builder delayFactor(double delayFactor) {
this.delayFactor = delayFactor;
return this;
}
}
}
/**
* A retry policy that randomizes delays between execution using a "jitter" time.
* <p>
* Consider the following setup:
* <ul>
* <li>{@code calls = 4}</li>
* <li>{@code delayMillis = 100}</li>
* <li>{@code jitter = 50}</li>
* </ul>
* The following delays will be used for each call:
*
* <ul>
* <li>Initial call - always immediate (not handled by retry policy)</li>
* <li>First retry: 50 - 150 millis (delay +- Random.nextInt(jitter)</li>
* <li>Second retry: 50 - 150 millis</li>
* <li>Third retry: 50 - 150 millis</li>
* </ul>
*/
class JitterRetryPolicy implements RetryPolicy {
private final int calls;
private final long delayMillis;
private final Supplier<Integer> randomJitter;
private JitterRetryPolicy(Builder builder) {
this.calls = builder.calls;
this.delayMillis = builder.delay.toMillis();
long jitter = builder.jitter.toMillis();
int jitterMillis = (jitter > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) jitter;
if (jitterMillis == 0) {
randomJitter = () -> 0;
} else {
Random random = new Random();
// need a number [-jitterMillis,+jitterMillis]
randomJitter = () -> random.nextInt(jitterMillis * 2) - jitterMillis;
}
}
public static Builder builder() {
return new Builder();
}
@Override
public Optional<Long> nextDelayMillis(long firstCallNanos, long lastDelay, int call) {
if (call >= calls) {
return Optional.empty();
}
long delay = delayMillis;
int jitterRandom = randomJitter.get();
delay = delay + jitterRandom;
delay = Math.max(0, delay);
return Optional.of(delay);
}
/**
* Fluent API builder for {@link io.helidon.faulttolerance.Retry.JitterRetryPolicy}.
*/
public static class Builder implements io.helidon.common.Builder<JitterRetryPolicy> {
private int calls = 3;
private Duration delay = Duration.ofMillis(200);
private Duration jitter = Duration.ofMillis(50);
private Builder() {
}
@Override
public JitterRetryPolicy build() {
return new JitterRetryPolicy(this);
}
/**
* Total number of calls (first + retries).
* @param calls how many times to call the method
* @return updated builder instance
*/
public Builder calls(int calls) {
this.calls = calls;
return this;
}
/**
* Base delay between the invocations.
*
* @param delay delay between the invocations
* @return updated builder instance
*/
public Builder delay(Duration delay) {
this.delay = delay;
return this;
}
/**
* Random part of the delay.
* A number between {@code [-jitter,+jitter]} is applied to delay each time
* delay is calculated.
*
* @param jitter jitter duration
* @return updated builder instance
*/
public Builder jitter(Duration jitter) {
this.jitter = jitter;
return this;
}
}
}
}

View File

@@ -0,0 +1,158 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.faulttolerance;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
class RetryImpl implements Retry {
private final LazyValue<? extends ScheduledExecutorService> scheduledExecutor;
private final ErrorChecker errorChecker;
private final long maxTimeNanos;
private final Retry.RetryPolicy retryPolicy;
RetryImpl(Retry.Builder builder) {
this.scheduledExecutor = builder.scheduledExecutor();
this.errorChecker = ErrorChecker.create(builder.skipOn(), builder.applyOn());
this.maxTimeNanos = builder.overallTimeout().toNanos();
this.retryPolicy = builder.retryPolicy();
}
@Override
public <T> Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
return retryMulti(new RetryContext<>(supplier));
}
@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
return retrySingle(new RetryContext<>(supplier));
}
private <T> Single<T> retrySingle(RetryContext<? extends CompletionStage<T>> context) {
long delay = 0;
if (context.count.get() != 0) {
Optional<Long> maybeDelay = retryPolicy.nextDelayMillis(context.startedMillis,
context.lastDelay.get(),
context.count.getAndIncrement());
if (maybeDelay.isEmpty()) {
return Single.error(context.throwable());
}
delay = maybeDelay.get();
}
long nanos = System.nanoTime() - context.startedNanos;
if (nanos > maxTimeNanos) {
return Single.error(new TimeoutException("Execution took too long. Already executing: "
+ TimeUnit.NANOSECONDS.toMillis(nanos) + " ms, must timeout after: "
+ TimeUnit.NANOSECONDS.toMillis(maxTimeNanos) + " ms."));
}
DelayedTask<Single<T>> task = DelayedTask.createSingle(context.supplier);
if (delay == 0) {
task.execute();
} else {
scheduledExecutor.get().schedule(task::execute, delay, TimeUnit.MILLISECONDS);
}
return task.result()
.onErrorResumeWithSingle(throwable -> {
Throwable cause = FaultTolerance.cause(throwable);
context.thrown.add(cause);
if (errorChecker.shouldSkip(cause)) {
return Single.error(context.throwable());
}
return retrySingle(context);
});
}
private <T> Multi<T> retryMulti(RetryContext<? extends Flow.Publisher<T>> context) {
long delay = 0;
if (context.count.get() != 0) {
Optional<Long> maybeDelay = retryPolicy.nextDelayMillis(context.startedMillis,
context.lastDelay.get(),
context.count.getAndIncrement());
if (maybeDelay.isEmpty()) {
return Multi.error(context.throwable());
}
delay = maybeDelay.get();
}
long nanos = System.nanoTime() - context.startedNanos;
if (nanos > maxTimeNanos) {
return Multi.error(new TimeoutException("Execution took too long. Already executing: "
+ TimeUnit.NANOSECONDS.toMillis(nanos) + " ms, must timeout after: "
+ TimeUnit.NANOSECONDS.toMillis(maxTimeNanos) + " ms."));
}
DelayedTask<Multi<T>> task = DelayedTask.createMulti(context.supplier);
if (delay == 0) {
task.execute();
} else {
scheduledExecutor.get().schedule(task::execute, delay, TimeUnit.MILLISECONDS);
}
return task.result()
.onErrorResumeWith(throwable -> {
Throwable cause = FaultTolerance.cause(throwable);
context.thrown.add(cause);
if (task.hadData() || errorChecker.shouldSkip(cause)) {
return Multi.error(context.throwable());
}
return retryMulti(context);
});
}
private static class RetryContext<U> {
// retry runtime
private final long startedMillis = System.currentTimeMillis();
private final long startedNanos = System.nanoTime();
private final AtomicInteger count = new AtomicInteger();
private final List<Throwable> thrown = new LinkedList<>();
private final AtomicLong lastDelay = new AtomicLong();
private final Supplier<U> supplier;
RetryContext(Supplier<U> supplier) {
this.supplier = supplier;
}
Throwable throwable() {
if (thrown.isEmpty()) {
return new IllegalStateException("Exception list is empty");
}
Throwable last = thrown.get(thrown.size() - 1);
for (int i = 0; i < thrown.size() - 1; i++) {
last.addSuppressed(thrown.get(i));
}
return last;
}
}
}

View File

@@ -18,11 +18,13 @@ package io.helidon.faulttolerance;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
public class Timeout implements Handler {
@@ -38,6 +40,11 @@ public class Timeout implements Handler {
return new Builder();
}
@Override
public <T> Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
return null;
}
@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
return Single.create(supplier.get())

View File

@@ -17,11 +17,13 @@
package io.helidon.faulttolerance;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
@FunctionalInterface
public interface TypedHandler<T> {
Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier);
Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier);
}

View File

@@ -1,4 +1,5 @@
module io.helidon.faulttolerance {
requires io.helidon.config;
requires io.helidon.common.configurable;
requires java.logging;
}

View File

@@ -16,6 +16,7 @@
package io.helidon.faulttolerance;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -63,6 +64,10 @@ class AsyncTest {
assertThat(cause, instanceOf(MyException.class));
}
private List<String> syncList() {
return List.of("hi", "there");
}
private String syncError() {
throw new MyException();
}

View File

@@ -16,17 +16,24 @@
package io.helidon.faulttolerance;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import io.helidon.common.LogConfig;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -34,11 +41,50 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
class BulkheadTest {
@BeforeAll
static void setupTest() {
LogConfig.configureRuntime();
}
@Test
void testBulkheadQueue() throws InterruptedException {
Bulkhead bulkhead = Bulkhead.builder()
.limit(1)
.queueLength(1000)
.build();
Request inProgress = new Request(0);
Single<Integer> inProgressResult = bulkhead.invoke(inProgress::invoke);
Request[] aLotRequests = new Request[999];
Single[] aLotResults = new Single[999];
for (int i = 0; i < aLotRequests.length; i++) {
Request req = new Request(i);
aLotRequests[i] = req;
aLotResults[i] = bulkhead.invoke(req::invoke);
}
for (Request req : aLotRequests) {
req.releaseCdl.countDown();
}
inProgress.releaseCdl.countDown();
if (inProgress.invokedCdl.await(1, TimeUnit.SECONDS)) {
for (Single result : aLotResults) {
result.await(1, TimeUnit.SECONDS);
}
} else {
fail("Should have invoked the first");
}
}
@Test
void testBulkhead() throws InterruptedException {
String name = "unit:testBulkhead";
Bulkhead bulkhead = Bulkhead.builder()
.limit(1)
.queueLength(1)
.name(name)
.build();
Request inProgress = new Request(0);
@@ -79,11 +125,11 @@ class BulkheadTest {
assertThat(cause, notNullValue());
assertThat(cause, instanceOf(BulkheadException.class));
assertThat(cause.getMessage(), is("Bulkhead queue is full"));
assertThat(cause.getMessage(), is("Bulkhead queue \"" + name + "\" is full"));
}
@Test
void testBulkheadWithError() throws InterruptedException {
void testBulkheadWithError() {
Bulkhead bulkhead = Bulkhead.builder()
.limit(1)
.queueLength(1)
@@ -99,7 +145,59 @@ class BulkheadTest {
inProgress.releaseCdl.countDown();
FaultToleranceTest.completionException(result, IllegalStateException.class);
}
@Test
void testBulkheadWithMulti() {
Bulkhead bulkhead = Bulkhead.builder()
.limit(1)
.queueLength(1)
.build();
Single<Object> result = bulkhead.invoke(() -> Single.error(new IllegalStateException()));
FaultToleranceTest.completionException(result, IllegalStateException.class);
MultiRequest inProgress = new MultiRequest(0, 5);
Multi<Integer> multi = bulkhead.invokeMulti(inProgress::invoke);
// queued
result = bulkhead.invoke(() -> Single.error(new IllegalStateException()));
inProgress.releaseCdl.countDown();
List<Integer> allInts = multi.collectList()
.await(1, TimeUnit.SECONDS);
assertThat(allInts, contains(0, 1, 2, 3, 4));
FaultToleranceTest.completionException(result, IllegalStateException.class);
}
private static class MultiRequest {
private final CountDownLatch releaseCdl = new CountDownLatch(1);
private final CountDownLatch invokedCdl = new CountDownLatch(1);
private final AtomicBoolean invoked = new AtomicBoolean();
private final AtomicBoolean indexed = new AtomicBoolean();
private final int index;
private int count;
MultiRequest(int index, int count) {
this.index = index;
this.count = count;
}
Flow.Publisher<Integer> invoke() {
invoked.set(true);
invokedCdl.countDown();
return FaultTolerance.async(this::index)
.flatMap(it -> Multi.create(IntStream.range(it, it + count).boxed()));
}
private int index() {
try {
releaseCdl.await();
} catch (InterruptedException e) {
}
indexed.set(true);
return index;
}
}
private static class Request {

View File

@@ -17,16 +17,19 @@
package io.helidon.faulttolerance;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -37,7 +40,7 @@ class CircuitBreakerTest {
void testCircuitBreaker() throws InterruptedException {
CircuitBreaker breaker = CircuitBreaker.builder()
.volume(10)
.ratio(20)
.errorRatio(20)
.delay(Duration.ofMillis(200))
.successThreshold(2)
.build();
@@ -48,19 +51,21 @@ class CircuitBreakerTest {
bad(breaker);
good(breaker);
goodMulti(breaker);
// should open the breaker
bad(breaker);
breakerOpen(breaker);
breakerOpenMulti(breaker);
assertThat(breaker.state(), is(CircuitBreaker.State.OPEN));
// need to wait until half open
int count = 0;
while(count++ < 10) {
while (count++ < 10) {
Thread.sleep(50);
if(breaker.state() == CircuitBreaker.State.HALF_OPEN) {
if (breaker.state() == CircuitBreaker.State.HALF_OPEN) {
break;
}
}
@@ -80,15 +85,15 @@ class CircuitBreakerTest {
// need to wait until half open
count = 0;
while(count++ < 10) {
while (count++ < 10) {
Thread.sleep(50);
if(breaker.state() == CircuitBreaker.State.HALF_OPEN) {
if (breaker.state() == CircuitBreaker.State.HALF_OPEN) {
break;
}
}
good(breaker);
bad(breaker);
badMulti(breaker);
assertThat(breaker.state(), is(CircuitBreaker.State.OPEN));
}
@@ -104,6 +109,18 @@ class CircuitBreakerTest {
assertThat(cause, instanceOf(CircuitBreakerOpenException.class));
}
private void breakerOpenMulti(CircuitBreaker breaker) {
Multi<Integer> good = Multi.just(0, 1, 2);
Multi<Integer> result = breaker.invokeMulti(() -> good);
CompletionException exception = assertThrows(CompletionException.class,
() -> result.collectList().await(1, TimeUnit.SECONDS));
Throwable cause = exception.getCause();
assertThat(cause, notNullValue());
assertThat(cause, instanceOf(CircuitBreakerOpenException.class));
}
private void bad(CircuitBreaker breaker) {
Failing failing = new Failing(new IllegalStateException("Fail"));
Single<Integer> failedResult = breaker.invoke(failing::invoke);
@@ -116,12 +133,34 @@ class CircuitBreakerTest {
}
private void badMulti(CircuitBreaker breaker) {
Multi<Integer> failing = Multi.error(new IllegalStateException("Fail"));
Multi<Integer> failedResult = breaker.invokeMulti(() -> failing);
CompletionException exception = assertThrows(CompletionException.class,
() -> failedResult.collectList().await(1, TimeUnit.SECONDS));
Throwable cause = exception.getCause();
assertThat(cause, notNullValue());
assertThat(cause, instanceOf(IllegalStateException.class));
}
private void good(CircuitBreaker breaker) {
Request good = new Request();
Single<Integer> result = breaker.invoke(good::invoke);
result.await(1, TimeUnit.SECONDS);
}
private void goodMulti(CircuitBreaker breaker) {
Multi<Integer> good = Multi.just(0, 1, 2);
Multi<Integer> result = breaker.invokeMulti(() -> good);
List<Integer> list = result.collectList().await(1, TimeUnit.SECONDS);
assertThat(list, contains(0, 1, 2));
}
private static class Failing {
private final Exception exception;

View File

@@ -0,0 +1,69 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.faulttolerance;
import java.time.Duration;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import static io.helidon.config.testing.OptionalMatcher.empty;
import static io.helidon.config.testing.OptionalMatcher.value;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
public class DelayRetryPolicyTest {
@Test
void testDelay() {
Retry.DelayingRetryPolicy policy = Retry.DelayingRetryPolicy.builder()
.delay(Duration.ofMillis(100))
.calls(3)
.delayFactor(3)
.build();
long firstCall = System.nanoTime();
Optional<Long> aLong = policy.nextDelayMillis(firstCall, 0, 0);
assertThat(aLong, value(is(100L)));
aLong = policy.nextDelayMillis(firstCall, 100, 1);
assertThat(aLong, value(is(300L)));
aLong = policy.nextDelayMillis(firstCall, 100, 2); // should just apply factor on last delay
assertThat(aLong, value(is(300L)));
aLong = policy.nextDelayMillis(firstCall, 100, 3); // limit of calls
assertThat(aLong, is(empty()));
}
@Test
void testNoDelay() {
Retry.DelayingRetryPolicy policy = Retry.DelayingRetryPolicy.builder()
.delay(Duration.ZERO)
.calls(3)
.delayFactor(3)
.build();
long firstCall = System.nanoTime();
Optional<Long> aLong = policy.nextDelayMillis(firstCall, 0, 0);
assertThat(aLong, value(is(0L)));
aLong = policy.nextDelayMillis(firstCall, 0, 1);
assertThat(aLong, value(is(0L)));
aLong = policy.nextDelayMillis(firstCall, 0, 2); // should just apply factor on last delay
assertThat(aLong, value(is(0L)));
aLong = policy.nextDelayMillis(firstCall, 100, 3); // limit of calls
assertThat(aLong, is(empty()));
}
}

View File

@@ -0,0 +1,146 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.faulttolerance;
import java.time.Duration;
import java.util.Optional;
import io.helidon.faulttolerance.Retry.JitterRetryPolicy;
import org.junit.jupiter.api.Test;
import static io.helidon.config.testing.OptionalMatcher.empty;
import static io.helidon.config.testing.OptionalMatcher.present;
import static io.helidon.config.testing.OptionalMatcher.value;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
public class JitterRetryPolicyTest {
@Test
void testFixedDelay() {
JitterRetryPolicy policy = JitterRetryPolicy.builder()
.jitter(Duration.ZERO)
.delay(Duration.ofMillis(100))
.calls(3)
.build();
long firstCall = System.nanoTime();
Optional<Long> aLong = policy.nextDelayMillis(firstCall, 0, 0);
assertThat(aLong, value(is(100L)));
aLong = policy.nextDelayMillis(firstCall, aLong.get(), 1);
assertThat(aLong, value(is(100L)));
}
@Test
void testRepeats() {
JitterRetryPolicy policy = JitterRetryPolicy.builder()
.jitter(Duration.ZERO)
.delay(Duration.ofMillis(100))
.calls(3)
.build();
for (int i = 0; i < 100; i++) {
long firstCall = System.nanoTime();
// running in cycle to ensure we do not store state
assertThat(policy.nextDelayMillis(firstCall, 0, 1), value(is(100L)));
assertThat(policy.nextDelayMillis(firstCall, 0, 2), value(is(100L)));
assertThat(policy.nextDelayMillis(firstCall, 0, 3), is(empty()));
assertThat(policy.nextDelayMillis(firstCall, 0, 1), value(is(100L)));
assertThat(policy.nextDelayMillis(firstCall, 0, 3), is(empty()));
}
}
@Test
void testRandomDelay() {
JitterRetryPolicy policy = JitterRetryPolicy.builder()
.jitter(Duration.ofMillis(50))
.delay(Duration.ofMillis(100))
.calls(3)
.build();
long firstCall = System.nanoTime();
boolean hadNegative = false;
boolean hadPositive = false;
for (int i = 0; i < 10000; i++) {
Optional<Long> aLong = policy.nextDelayMillis(firstCall, 0, 0);
assertThat(aLong, present());
long value = aLong.get();
assertThat(value, is(both(greaterThan(49L)).and(lessThan(151L))));
if (value < 100) {
hadNegative = true;
} else if (value > 100) {
hadPositive = true;
}
}
assertThat("In 10000 tries we should get at least one negative jitter", hadNegative, is(true));
assertThat("In 10000 tries we should get at least one positive jitter", hadPositive, is(true));
}
@Test
void testNoDelayJitter() {
JitterRetryPolicy policy = JitterRetryPolicy.builder()
.jitter(Duration.ofMillis(50))
.delay(Duration.ZERO)
.calls(3)
.build();
long firstCall = System.nanoTime();
boolean hadPositive = false;
boolean hadZero = false;
for (int i = 0; i < 10000; i++) {
Optional<Long> aLong = policy.nextDelayMillis(firstCall, 0, 0);
assertThat(aLong, present());
long value = aLong.get();
assertThat(value, is(both(greaterThan(-1L)).and(lessThan(50L))));
if (value == 0) {
hadZero = true;
} else if (value > 0) {
hadPositive = true;
}
}
assertThat("In 10000 tries we should get at least one negative jitter", hadZero, is(true));
assertThat("In 10000 tries we should get at least one positive jitter", hadPositive, is(true));
}
@Test
void testNoDelay() {
JitterRetryPolicy policy = JitterRetryPolicy.builder()
.jitter(Duration.ZERO)
.delay(Duration.ZERO)
.calls(3)
.build();
long firstCall = System.nanoTime();
Optional<Long> aLong = policy.nextDelayMillis(firstCall, 0, 0);
assertThat(aLong, value(is(0L)));
aLong = policy.nextDelayMillis(firstCall, 0, 1);
assertThat(aLong, value(is(0L)));
aLong = policy.nextDelayMillis(firstCall, 0, 2); // should just apply factor on last delay
assertThat(aLong, value(is(0L)));
aLong = policy.nextDelayMillis(firstCall, 100, 3); // limit of calls
assertThat(aLong, is(empty()));
}
}

View File

@@ -17,27 +17,38 @@
package io.helidon.faulttolerance;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertThrows;
class RetryTest {
@Test
void testRetry() {
Retry retry = Retry.builder()
.calls(3)
.delay(Duration.ofMillis(50))
.maxTime(Duration.ofMillis(500))
.jitter(Duration.ofMillis(50))
.retryPolicy(Retry.JitterRetryPolicy.builder()
.calls(3)
.delay(Duration.ofMillis(50))
.jitter(Duration.ofMillis(50))
.build())
.overallTimeout(Duration.ofMillis(500))
.build();
Request req = new Request(3, new TerminalException(), new RetryException());
@@ -54,11 +65,13 @@ class RetryTest {
@Test
void testRetryOn() {
Retry retry = Retry.builder()
.calls(3)
.delay(Duration.ofMillis(100))
.maxTime(Duration.ofMillis(500))
.jitter(Duration.ofMillis(50))
.addRetryOn(RetryException.class)
.retryPolicy(Retry.JitterRetryPolicy.builder()
.calls(3)
.delay(Duration.ofMillis(100))
.jitter(Duration.ofMillis(50))
.build())
.overallTimeout(Duration.ofMillis(500))
.addApplyOn(RetryException.class)
.build();
Request req = new Request(3, new RetryException(), new RetryException());
@@ -80,11 +93,13 @@ class RetryTest {
@Test
void testAbortOn() {
Retry retry = Retry.builder()
.calls(3)
.delay(Duration.ofMillis(100))
.maxTime(Duration.ofMillis(500))
.jitter(Duration.ofMillis(50))
.addAbortOn(TerminalException.class)
.retryPolicy(Retry.JitterRetryPolicy.builder()
.calls(3)
.delay(Duration.ofMillis(100))
.jitter(Duration.ofMillis(50))
.build())
.overallTimeout(Duration.ofMillis(500))
.addSkipOn(TerminalException.class)
.build();
Request req = new Request(3, new RetryException(), new RetryException());
@@ -106,29 +121,179 @@ class RetryTest {
@Test
void testTimeout() {
Retry retry = Retry.builder()
.calls(3)
.delay(Duration.ofMillis(50))
.maxTime(Duration.ofMillis(45))
.jitter(Duration.ofMillis(1))
.retryPolicy(Retry.JitterRetryPolicy.builder()
.calls(3)
.delay(Duration.ofMillis(100))
.jitter(Duration.ZERO)
.build())
.overallTimeout(Duration.ofMillis(50))
.build();
Request req = new Request(3, new RetryException(), new RetryException());
Single<Integer> result = retry.invoke(req::invoke);
FaultToleranceTest.completionException(result, TimeoutException.class);
assertThat(req.call.get(), is(2));
assertThat("Should have been called once", req.call.get(), is(1));
}
@Test
void testBadConfiguration() {
Retry.Builder builder = Retry.builder()
.retryOn(RetryException.class)
.abortOn(TerminalException.class);
.applyOn(RetryException.class)
.skipOn(TerminalException.class);
assertThrows(IllegalArgumentException.class, builder::build);
}
@Test
void testMultiRetriesNoFailure() throws InterruptedException {
Retry retry = Retry.builder()
.retryPolicy(Retry.DelayingRetryPolicy.noDelay(3))
.build();
private static class Request {
Multi<Integer> multi = retry.invokeMulti(() -> Multi.just(0, 1, 2));
TestSubscriber ts = new TestSubscriber();
multi.subscribe(ts);
ts.request(100);
ts.cdl.await(1, TimeUnit.SECONDS);
assertThat("Should be completed", ts.completed.get(), is(true));
assertThat("Should not be failed", ts.failed.get(), is(false));
assertThat(ts.values, contains(0, 1, 2));
}
@Test
void testMultiRetries() throws InterruptedException {
Retry retry = Retry.builder()
.retryPolicy(Retry.DelayingRetryPolicy.noDelay(3))
.build();
AtomicInteger count = new AtomicInteger();
Multi<Integer> multi = retry.invokeMulti(() -> {
if (count.getAndIncrement() < 2) {
return Multi.error(new RetryException());
} else {
return Multi.just(0, 1, 2);
}
});
TestSubscriber ts = new TestSubscriber();
multi.subscribe(ts);
ts.request(100);
ts.cdl.await(1, TimeUnit.SECONDS);
assertThat("Should be completed", ts.completed.get(), is(true));
assertThat("Should not be failed", ts.failed.get(), is(false));
assertThat(ts.values, contains(0, 1, 2));
}
@Test
void testMultiRetriesRead() throws InterruptedException {
Retry retry = Retry.builder()
.retryPolicy(Retry.DelayingRetryPolicy.noDelay(3))
.build();
AtomicInteger count = new AtomicInteger();
TestSubscriber ts = new TestSubscriber();
Multi<Integer> multi = retry.invokeMulti(() -> {
if (count.getAndIncrement() == 0) {
//return new PartialPublisher();
return Multi.concat(Multi.just(0), Multi.error(new RetryException()));
} else {
TestSubscriber it = ts;
return Multi.just(0, 1, 2);
}
});
// multi.onError(Throwable::printStackTrace)
// .forEach(System.out::println);
//
// multi = Multi.concat(Multi.just(0), Multi.error(new RetryException()));
multi.subscribe(ts);
ts.request(2);
ts.cdl.await(1, TimeUnit.SECONDS);
assertThat("Should be failed", ts.failed.get(), is(true));
assertThat(ts.throwable.get(), instanceOf(RetryException.class));
assertThat("Should not be completed", ts.completed.get(), is(false));
}
private static class PartialPublisher implements Flow.Publisher<Integer> {
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
subscriber.onNext(1);
subscriber.onError(new RetryException());
}
@Override
public void cancel() {
}
});
}
}
private static class TestSubscriber implements Flow.Subscriber<Integer> {
private final AtomicBoolean failed = new AtomicBoolean();
private final AtomicReference<Throwable> throwable = new AtomicReference<>();
private final AtomicBoolean completed = new AtomicBoolean();
private final List<Integer> values = new LinkedList<>();
private final AtomicBoolean finished = new AtomicBoolean();
private final CountDownLatch cdl = new CountDownLatch(1);
private Flow.Subscription subscription;
void request(long n) {
subscription.request(n);
}
void cancel() {
subscription.cancel();
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
}
@Override
public void onNext(Integer item) {
values.add(item);
}
@Override
public void onError(Throwable throwable) {
failed.set(true);
this.throwable.set(throwable);
finish();
}
@Override
public void onComplete() {
this.completed.set(true);
finish();
}
private void finish() {
if (finished.compareAndSet(false, true)) {
cdl.countDown();
}
}
}
private static class Request {
private final AtomicInteger call = new AtomicInteger();
private final int failures;
private final RuntimeException first;
@@ -161,6 +326,5 @@ class RetryTest {
}
private static class TerminalException extends RuntimeException {
}
}

View File

@@ -0,0 +1,32 @@
#
# Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Example Logging Configuration File
# For more information see $JAVA_HOME/jre/lib/logging.properties
# Send messages to the console
handlers=io.helidon.common.HelidonConsoleHandler
# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n
# Global logging level. Can be overridden by specific loggers
.level=WARNING
io.helidon.level=INFO
io.helidon.faulttolerance.level=FINEST