From 196c91690f1f8743784c999908f234b10249324c Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Wed, 1 Jul 2020 18:04:12 +0200 Subject: [PATCH] Update to support multi. Signed-off-by: Tomas Langer --- .../java/io/helidon/common/LogConfig.java | 11 +- .../common/reactive/MultiTappedPublisher.java | 70 ++- fault-tolerance/DESIGN.md | 23 +- fault-tolerance/pom.xml | 4 + .../java/io/helidon/faulttolerance/Async.java | 105 +++- .../io/helidon/faulttolerance/AsyncImpl.java | 76 +++ .../io/helidon/faulttolerance/Bulkhead.java | 165 +++--- .../faulttolerance/BulkheadException.java | 6 +- .../helidon/faulttolerance/BulkheadImpl.java | 111 ++++ .../faulttolerance/CircuitBreaker.java | 329 ++++++----- .../faulttolerance/CircuitBreakerImpl.java | 180 ++++++ .../CircuitBreakerOpenException.java | 3 + .../helidon/faulttolerance/DelayedTask.java | 147 +++++ .../helidon/faulttolerance/ErrorChecker.java | 46 ++ .../io/helidon/faulttolerance/Fallback.java | 177 ++++-- .../helidon/faulttolerance/FallbackImpl.java | 81 +++ .../faulttolerance/FaultTolerance.java | 53 +- .../io/helidon/faulttolerance/Handler.java | 4 +- .../java/io/helidon/faulttolerance/Retry.java | 535 ++++++++++++------ .../io/helidon/faulttolerance/RetryImpl.java | 158 ++++++ .../io/helidon/faulttolerance/Timeout.java | 7 + .../helidon/faulttolerance/TypedHandler.java | 4 +- .../src/main/java/module-info.java | 1 + .../io/helidon/faulttolerance/AsyncTest.java | 5 + .../helidon/faulttolerance/BulkheadTest.java | 102 +++- .../faulttolerance/CircuitBreakerTest.java | 51 +- .../faulttolerance/DelayRetryPolicyTest.java | 69 +++ .../faulttolerance/JitterRetryPolicyTest.java | 146 +++++ .../io/helidon/faulttolerance/RetryTest.java | 210 ++++++- .../src/test/resources/logging.properties | 32 ++ 30 files changed, 2384 insertions(+), 527 deletions(-) create mode 100644 fault-tolerance/src/main/java/io/helidon/faulttolerance/AsyncImpl.java create mode 100644 fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadImpl.java create mode 100644 fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerImpl.java create mode 100644 fault-tolerance/src/main/java/io/helidon/faulttolerance/DelayedTask.java create mode 100644 fault-tolerance/src/main/java/io/helidon/faulttolerance/ErrorChecker.java create mode 100644 fault-tolerance/src/main/java/io/helidon/faulttolerance/FallbackImpl.java create mode 100644 fault-tolerance/src/main/java/io/helidon/faulttolerance/RetryImpl.java create mode 100644 fault-tolerance/src/test/java/io/helidon/faulttolerance/DelayRetryPolicyTest.java create mode 100644 fault-tolerance/src/test/java/io/helidon/faulttolerance/JitterRetryPolicyTest.java create mode 100644 fault-tolerance/src/test/resources/logging.properties diff --git a/common/common/src/main/java/io/helidon/common/LogConfig.java b/common/common/src/main/java/io/helidon/common/LogConfig.java index 6251a2893..f88bf4800 100644 --- a/common/common/src/main/java/io/helidon/common/LogConfig.java +++ b/common/common/src/main/java/io/helidon/common/LogConfig.java @@ -106,13 +106,20 @@ public final class LogConfig { logConfigStream = new BufferedInputStream(Files.newInputStream(path)); source = "file: " + path.toAbsolutePath(); } else { - // second look for classpath (only the first one) + // second look for classpath (only the first one in production classpath) InputStream resourceStream = LogConfig.class.getResourceAsStream("/" + LOGGING_FILE); if (resourceStream != null) { logConfigStream = new BufferedInputStream(resourceStream); source = "classpath: /" + LOGGING_FILE; } else { - return source; + // once more for the current classloader + resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(LOGGING_FILE); + if (resourceStream != null) { + logConfigStream = new BufferedInputStream(resourceStream); + source = "context classpath: /" + LOGGING_FILE; + } else { + return source; + } } } diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiTappedPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiTappedPublisher.java index e396cd5c4..29d370ab9 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiTappedPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiTappedPublisher.java @@ -27,7 +27,7 @@ import java.util.function.LongConsumer; * user callbacks. * @param the element type of the sequence */ -final class MultiTappedPublisher implements Multi { +public final class MultiTappedPublisher implements Multi { private final Multi source; @@ -59,6 +59,20 @@ final class MultiTappedPublisher implements Multi { this.onCancelCallback = onCancelCallback; } + private MultiTappedPublisher(Builder builder) { + this(builder.source, + builder.onSubscribeCallback, + builder.onNextCallback, + builder.onErrorCallback, + builder.onCompleteCallback, + builder.onRequestCallback, + builder.onCancelCallback); + } + + public static Builder builder(Multi source) { + return new Builder<>(source); + } + @Override public void subscribe(Flow.Subscriber subscriber) { Objects.requireNonNull(subscriber, "subscriber is null"); @@ -280,4 +294,58 @@ final class MultiTappedPublisher implements Multi { } } } + + public static class Builder implements io.helidon.common.Builder> { + private final Multi source; + private Consumer onSubscribeCallback; + private Consumer onNextCallback; + private Runnable onCompleteCallback; + private LongConsumer onRequestCallback; + private Runnable onCancelCallback; + private Consumer onErrorCallback; + + private Builder(Multi source) { + this.source = source; + } + + @Override + public MultiTappedPublisher build() { + return new MultiTappedPublisher<>(this); + } + + Builder onSubscribeCallback(Consumer onSubscribeCallback) { + this.onSubscribeCallback = onSubscribeCallback; + return this; + } + + public Builder onSubscribeCallback(Runnable onSubscribeCallback) { + this.onSubscribeCallback = subscription -> onSubscribeCallback.run(); + return this; + } + + public Builder onNextCallback(Consumer onNextCallback) { + this.onNextCallback = onNextCallback; + return this; + } + + public Builder onCompleteCallback(Runnable onCompleteCallback) { + this.onCompleteCallback = onCompleteCallback; + return this; + } + + public Builder onRequestCallback(LongConsumer onRequestCallback) { + this.onRequestCallback = onRequestCallback; + return this; + } + + public Builder onCancelCallback(Runnable onCancelCallback) { + this.onCancelCallback = onCancelCallback; + return this; + } + + public Builder onErrorCallback(Consumer onErrorCallback) { + this.onErrorCallback = onErrorCallback; + return this; + } + } } diff --git a/fault-tolerance/DESIGN.md b/fault-tolerance/DESIGN.md index 8c0262c7e..a8d5b38ad 100644 --- a/fault-tolerance/DESIGN.md +++ b/fault-tolerance/DESIGN.md @@ -9,7 +9,10 @@ existing APIs). The FT requires executor service (or more) to handle some of the features provided. To be able to configure FT for the whole application, a set of static methods exists on class `FaultTolerance`. -- `FaultTolerance.config(Config)` - use a Helidon config instance to configure defaults +- `FaultTolerance.config(Config)` - use a Helidon config instance to configure defaults +- `FaultTolerance.executor(Supplier)` - Helidon wide executor service for async operation +- `FaultTolerance.scheduledExecutor(Supplier)` - Helidon wide scheduled executor service for scheduling retries, + timeouts and similar # Asynchronous Provides an asynchronous execution for a blocking operation. As Helidon SE network stack is using a @@ -24,9 +27,8 @@ Limits the number of parallel calls to a single resource. Configuration: - parallel execution limit -- executor service -- queue - number of queued records +- executor service - needed to process enqueued records # Circuit Breaker Defines a circuit breaker policy to an individual method or a class. @@ -38,7 +40,8 @@ Half-open: In half-open state, trial executions of the service are allowed. By d Circuit state transitions will reset the circuit breaker's records. Configuration: -- fail on `` - these are failures +- scheduled executor service +- apply on `` - these are failures - skip on `` - these are not failures - delay (duration) - how long before transitioning from open to half-open - volume threshold - rolling window size @@ -56,7 +59,7 @@ May provide a context of execution with information such as Configuration: - fallback method/handler -- applyOn `` - these are failures +- apply On `` - these are failures - skip on `` - these are not failures # Retry @@ -67,11 +70,17 @@ Configuration: - delay between retries (duration) - overall maximal duration - jitter (randomize delays a bit - duration) - a jitter of 200 ms will randomly add between -200 and 200 milliseconds to each retry delay. -- retry on `` - these are failures -- abort on `` - these will immediately abort retries +- apply on `` - these are failures +- skip on `` - these will immediately abort retries # Timeout Can be simply replaced with `Single.timeout` Configuration: - overall timeout (duration) + +TODO: + - native image support + - features (experimental) + - documentation + diff --git a/fault-tolerance/pom.xml b/fault-tolerance/pom.xml index 4383369e0..6a8f1c816 100644 --- a/fault-tolerance/pom.xml +++ b/fault-tolerance/pom.xml @@ -49,5 +49,9 @@ junit-jupiter-api test + + io.helidon.config + helidon-config-testing + \ No newline at end of file diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Async.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Async.java index 6ce8a96de..fa731d24c 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Async.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Async.java @@ -16,45 +16,68 @@ package io.helidon.faulttolerance; -import java.util.concurrent.CompletableFuture; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import io.helidon.common.LazyValue; import io.helidon.common.reactive.Single; -public class Async { - private final LazyValue executor; +/** + * Support for asynchronous execution of synchronous (blocking) calls. + * While this does not miraculously make a synchronous call non-blocking, it at least moves + * the blocking calls to a separate executor service, degrading the service gracefully. + *

+ * Example of usage: + *

+ *     // async instance with default executor service
+ *     Async async = Async.create();
+ *
+ *     // call a method with no parameters
+ *     Single<String> result = async.invoke(this::slowSync);
+ *
+ *     // call a method with parameters
+ *     async.invoke(() -> processRequest(request))
+ *      .thenApply(response::send);
+ *
+ *     // use async to obtain a Multi (from a method returning List of strings)
+ *     Multi<String> stringMulti = async.invoke(this::syncList)
+ *                 .flatMap(Multi::create);
+ * 
+ */ +public interface Async { + /** + * Invoke a synchronous operation asynchronously. + * This method never throws an exception. Any exception is returned via the {@link io.helidon.common.reactive.Single} + * result. + * + * @param supplier supplier of value (may be a method reference) + * @param type of returned value + * @return a Single that is a "promise" of the future result + */ + Single invoke(Supplier supplier); - public Async(Builder builder) { - this.executor = LazyValue.create(builder.executor); + /** + * Async with default executor service. + * + * @return a default async instance + */ + static Async create() { + return AsyncImpl.DefaultAsyncInstance.instance(); } - public static Async create() { - return builder().build(); - } - - public Single invoke(Supplier supplier) { - CompletableFuture future = new CompletableFuture<>(); - - executor.get() - .submit(() -> { - try { - T result = supplier.get(); - future.complete(result); - } catch (Exception e) { - future.completeExceptionally(e); - } - }); - - return Single.create(future); - } - - public static Builder builder() { + /** + * A new builder to build a customized {@link io.helidon.faulttolerance.Async} instance. + * @return a new builder + */ + static Builder builder() { return new Builder(); } - public static class Builder implements io.helidon.common.Builder { + /** + * Fluent API Builder for {@link io.helidon.faulttolerance.Async}. + */ + class Builder implements io.helidon.common.Builder { private LazyValue executor = FaultTolerance.executor(); private Builder() { @@ -62,12 +85,34 @@ public class Async { @Override public Async build() { - return new Async(this); + return new AsyncImpl(this); } - public Builder executor(Supplier executor) { - this.executor = LazyValue.create(executor); + /** + * Configure executor service to use for executing tasks asynchronously. + * + * @param executor executor service supplier + * @return updated builder instance + */ + public Builder executor(Supplier executor) { + this.executor = LazyValue.create(Objects.requireNonNull(executor)); return this; } + + /** + * Configure executor service to use for executing tasks asynchronously. + * + * @param executor executor service + * @return updated builder instance + */ + public Builder executor(ExecutorService executor) { + this.executor = LazyValue.create(Objects.requireNonNull(executor)); + return this; + } + + LazyValue executor() { + return executor; + } } + } diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/AsyncImpl.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/AsyncImpl.java new file mode 100644 index 000000000..33ee960b2 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/AsyncImpl.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; + +import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Single; + +class AsyncImpl implements Async { + private final LazyValue executor; + + AsyncImpl(Builder builder) { + this.executor = LazyValue.create(builder.executor()); + } + + @Override + public Single invoke(Supplier supplier) { + CompletableFuture future = new CompletableFuture<>(); + AsyncTask task = new AsyncTask<>(supplier, future); + + try { + executor.get().submit(task); + } catch (Throwable e) { + // rejected execution and other executor related issues + return Single.error(e); + } + + return Single.create(future); + } + + private static class AsyncTask implements Runnable { + private final Supplier supplier; + private final CompletableFuture future; + + private AsyncTask(Supplier supplier, CompletableFuture future) { + this.supplier = supplier; + this.future = future; + } + + @Override + public void run() { + try { + T result = supplier.get(); + future.complete(result); + } catch (Throwable e) { + future.completeExceptionally(e); + } + } + } + + static final class DefaultAsyncInstance { + private static final Async INSTANCE = Async.builder() + .build(); + + static Async instance() { + return INSTANCE; + } + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Bulkhead.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Bulkhead.java index 4faf8c9a5..400b38e54 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Bulkhead.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Bulkhead.java @@ -16,120 +16,113 @@ package io.helidon.faulttolerance; -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; +import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import io.helidon.common.LazyValue; -import io.helidon.common.reactive.Single; -public class Bulkhead implements Handler { - private final Queue> queue; - private final Semaphore inProgress; - - private Bulkhead(Builder builder) { - this.inProgress = new Semaphore(builder.limit, true); - if (builder.queueLength == 0) { - queue = new NoQueue(); - } else { - this.queue = new LinkedBlockingQueue<>(builder.queueLength); - } - } - - public static Builder builder() { +/** + * Bulkhead protects a resource that cannot serve unlimited parallel + * requests. + *

+ * When the limit of parallel execution is reached, requests are enqueued + * until the queue length is reached. Once both the limit and queue are full, + * additional attempts to invoke will end with a failed response with + * {@link io.helidon.faulttolerance.BulkheadException}. + */ +public interface Bulkhead extends Handler { + /** + * A new builder for {@link io.helidon.faulttolerance.Bulkhead}. + * + * @return a new builder + */ + static Builder builder() { return new Builder(); } - @SuppressWarnings("unchecked") - @Override - public Single invoke(Supplier> supplier) { - if (inProgress.tryAcquire()) { - CompletionStage result = supplier.get(); + /** + * Fluent API builder for {@link io.helidon.faulttolerance.Bulkhead}. + */ + class Builder implements io.helidon.common.Builder { + private static final int DEFAULT_LIMIT = 10; + private static final int DEFAULT_QUEUE_LENGTH = 10; - result.handle((it, throwable) -> { - // we still have an acquired semaphore - Enqueued polled = queue.poll(); - while (polled != null) { - invokeEnqueued((Enqueued) polled); - polled = queue.poll(); - } - inProgress.release(); - return null; - }); - - return Single.create(result); - } else { - Enqueued enqueued = new Enqueued<>(supplier); - if (!queue.offer(enqueued)) { - return Single.error(new BulkheadException("Bulkhead queue is full")); - } - return Single.create(enqueued.future()); - } - } - - private void invokeEnqueued(Enqueued enqueued) { - CompletableFuture future = enqueued.future(); - CompletionStage completionStage = enqueued.originalStage(); - completionStage.thenAccept(future::complete); - completionStage.exceptionally(throwable -> { - future.completeExceptionally(throwable); - return null; - }); - } - - private static class Enqueued { - private LazyValue> resultFuture = LazyValue.create(CompletableFuture::new); - private Supplier> supplier; - - private Enqueued(Supplier> supplier) { - this.supplier = supplier; - } - - private CompletableFuture future() { - return resultFuture.get(); - } - - private CompletionStage originalStage() { - return supplier.get(); - } - } - - public static class Builder implements io.helidon.common.Builder { - private int limit; - private int queueLength = 10; + private LazyValue executor = FaultTolerance.executor(); + private int limit = DEFAULT_LIMIT; + private int queueLength = DEFAULT_QUEUE_LENGTH; + private String name = "Bulkhead-" + System.identityHashCode(this); private Builder() { } @Override public Bulkhead build() { - return new Bulkhead(this); + return new BulkheadImpl(this); } + /** + * Configure executor service to use for executing tasks asynchronously. + * + * @param executor executor service supplier + * @return updated builder instance + */ + public Builder executor(Supplier executor) { + this.executor = LazyValue.create(Objects.requireNonNull(executor)); + return this; + } + + /** + * Maximal number of parallel requests going through this bulkhead. + * When the limit is reached, additional requests are enqueued. + * + * @param limit maximal number of parallel calls, defaults is {@value DEFAULT_LIMIT} + * @return updated builder instance + */ public Builder limit(int limit) { this.limit = limit; return this; } + /** + * Maximal number of enqueued requests waiting for processing. + * When the limit is reached, additional attempts to invoke + * a request will receive a {@link io.helidon.faulttolerance.BulkheadException}. + * + * @param queueLength length of queue + * @return updated builder instance + */ public Builder queueLength(int queueLength) { this.queueLength = queueLength; return this; } - } - private static class NoQueue extends ArrayDeque> { - @Override - public boolean offer(Enqueued enqueued) { - return false; + /** + * Name is useful for debugging and in exception handling. + * + * @param name name of this bulkhead + * @return updated builder instance + */ + public Builder name(String name) { + this.name = name; + return this; } - @Override - public Enqueued poll() { - return null; + int limit() { + return limit; + } + + int queueLength() { + return queueLength; + } + + LazyValue executor() { + return executor; + } + + String name() { + return name; } } + } diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadException.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadException.java index b2615ed06..9128fb025 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadException.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadException.java @@ -16,8 +16,12 @@ package io.helidon.faulttolerance; +/** + * Failure because of {@link io.helidon.faulttolerance.Bulkhead} issues, most likely that the bulkhead does + * not allow any more queued tasks. + */ public class BulkheadException extends RuntimeException { - public BulkheadException(String message) { + BulkheadException(String message) { super(message); } } diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadImpl.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadImpl.java new file mode 100644 index 000000000..69be76370 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadImpl.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Flow; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.function.Supplier; +import java.util.logging.Logger; + +import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Multi; +import io.helidon.common.reactive.Single; + +class BulkheadImpl implements Bulkhead { + private static final Logger LOGGER = Logger.getLogger(BulkheadImpl.class.getName()); + + private final LazyValue executor; + private final Queue> queue; + private final Semaphore inProgress; + private final String name; + + BulkheadImpl(Bulkhead.Builder builder) { + this.executor = builder.executor(); + this.inProgress = new Semaphore(builder.limit(), true); + this.name = builder.name(); + + if (builder.queueLength() == 0) { + queue = new NoQueue(); + } else { + this.queue = new LinkedBlockingQueue<>(builder.queueLength()); + } + } + + @Override + public Single invoke(Supplier> supplier) { + return invokeTask(DelayedTask.createSingle(supplier)); + } + + @Override + public Multi invokeMulti(Supplier> supplier) { + return invokeTask(DelayedTask.createMulti(supplier)); + } + + // this method must be called while NOT holding a permit + private R invokeTask(DelayedTask task) { + if (inProgress.tryAcquire()) { + LOGGER.finest(() -> name + " invoke immediate: " + task); + // free permit, we can invoke + execute(task); + return task.result(); + } else { + // no free permit, let's try to enqueue + if (queue.offer(task)) { + LOGGER.finest(() -> name + " enqueue: " + task); + return task.result(); + } else { + LOGGER.finest(() -> name + " reject: " + task); + return task.error(new BulkheadException("Bulkhead queue \"" + name + "\" is full")); + } + } + } + + // this method must be called while holding a permit + private void execute(DelayedTask task) { + task.execute() + .thenRun(() -> { + LOGGER.finest(() -> name + " finished execution: " + task); + DelayedTask polled = queue.poll(); + if (polled != null) { + LOGGER.finest(() -> name + " invoke in executor: " + polled); + // chain executions from queue until all are executed + executor.get().submit(() -> execute(polled)); + } else { + LOGGER.finest(() -> name + " permit released after: " + task); + // nothing in the queue, release permit + inProgress.release(); + } + }); + } + + private static class NoQueue extends ArrayDeque> { + @Override + public boolean offer(DelayedTask delayedTask) { + return false; + } + + @Override + public DelayedTask poll() { + return null; + } + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreaker.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreaker.java index d18d1f59d..101cda95c 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreaker.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreaker.java @@ -17,157 +17,77 @@ package io.helidon.faulttolerance; import java.time.Duration; -import java.util.concurrent.CompletionStage; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import io.helidon.common.LazyValue; -import io.helidon.common.reactive.Single; -import static io.helidon.faulttolerance.ResultWindow.Result.FAILURE; -import static io.helidon.faulttolerance.ResultWindow.Result.SUCCESS; - -public class CircuitBreaker implements Handler { - /* - Configuration options +/** + * CircuitBreaker protects a potentially failing endpoint from overloading and the application + * from spending resources on such a failing endpoints. + *

+ * In case too many errors are detected, the circuit opens and all new requests fail with a + * {@link io.helidon.faulttolerance.CircuitBreakerOpenException} for a period of time. + * After this period, attempts are made to check if the service is up again - if so, the circuit closes + * and requests can process as usual again. + */ +public interface CircuitBreaker extends Handler { + /** + * Builder to customize configuration of the breaker. + * @return a new builder */ - private final LazyValue executor; - // how long to transition from open to half-open - private final long delayMillis; - // how many successful calls will close a half-open breaker - private final int successThreshold; - - /* - Runtime - */ - private final AtomicReference state = new AtomicReference<>(State.CLOSED); - // rolling window for counting errors to (maybe) open the breaker - private final ResultWindow results; - // to close from half-open - private final AtomicInteger successCounter = new AtomicInteger(); - private final AtomicBoolean halfOpenInProgress = new AtomicBoolean(); - private final AtomicReference> schedule = new AtomicReference<>(); - - private CircuitBreaker(Builder builder) { - this.delayMillis = builder.delay.toMillis(); - this.successThreshold = builder.successThreshold; - this.results = new ResultWindow(builder.volume, builder.ratio); - this.executor = builder.executor(); - } - - public static Builder builder() { + static Builder builder() { return new Builder(); } - @Override - public Single invoke(Supplier> supplier) { - if (state.get() == State.CLOSED) { - // run it! - CompletionStage result = supplier.get(); - result.handle((it, exception) -> { - if (exception == null) { - // success - results.update(SUCCESS); - } else { - results.update(FAILURE); - if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) { - results.reset(); - // if we successfully switch to open, we need to schedule switch to half-open - scheduleHalf(); - } - } + /** + * Current breaker state. + * As the state may change within nanoseconds, this is for information only. + * @return current breaker state + */ + State state(); - return it; - }); - return Single.create(result); - } else if (state.get() == State.OPEN) { - // fail it! - return Single.error(new CircuitBreakerOpenException("CircuitBreaker is open")); - } else { - // half-open - if (halfOpenInProgress.compareAndSet(false, true)) { - CompletionStage result = supplier.get(); - result.handle((it, exception) -> { - if (exception == null) { - // success - int successes = successCounter.incrementAndGet(); - if (successes >= successThreshold) { - // transition to closed - successCounter.set(0); - state.compareAndSet(State.HALF_OPEN, State.CLOSED); - halfOpenInProgress.set(false); - } - halfOpenInProgress.set(false); - } else { - // failure - successCounter.set(0); - state.set(State.OPEN); - halfOpenInProgress.set(false); - // if we successfully switch to open, we need to schedule switch to half-open - scheduleHalf(); - } + /** + * Set state of this circuit breaker. + * Note that all usual processes to re-close or open the circuit are in progress. + *

    + *
  • If set to {@link State#OPEN}, a timer will set it to half open in a + * while
  • + *
  • If set to {@link State#HALF_OPEN}, it may close after first successful request
  • + *
  • If set to {@link State#CLOSED}, it may open again if requests fail
  • + *
+ * So a subsequent call to {@link #state()} may yield a different state than configured here + * @param newState state to configure + */ + void state(State newState); - return it; - }); - return Single.create(result); - } else { - return Single - .error(new CircuitBreakerOpenException("CircuitBreaker is half open, parallel execution in progress")); - } - } + /** + * {@link io.helidon.faulttolerance.CircuitBreaker} states. + */ + enum State { + /** + * Circuit is closed and requests are processed. + */ + CLOSED, + /** + * Circuit is half open and some test requests are processed, others fail with + * {@link io.helidon.faulttolerance.CircuitBreakerOpenException}. + */ + HALF_OPEN, + /** + * Circuit is open and all requests fail with {@link io.helidon.faulttolerance.CircuitBreakerOpenException}. + */ + OPEN } - private void scheduleHalf() { - schedule.set(executor.get() - .schedule(() -> { - state.compareAndSet(State.OPEN, State.HALF_OPEN); - schedule.set(null); - return true; - }, delayMillis, TimeUnit.MILLISECONDS)); - } - - public State state() { - return state.get(); - } - - public void state(State newState) { - if (newState == State.CLOSED) { - if (state.get() == State.CLOSED) { - // fine - resetCounters(); - return; - } - - ScheduledFuture future = schedule.getAndSet(null); - if (future != null) { - future.cancel(false); - } - resetCounters(); - state.set(State.CLOSED); - } else if (newState == State.OPEN) { - state.set(State.OPEN); - ScheduledFuture future = schedule.getAndSet(null); - if (future != null) { - future.cancel(false); - } - resetCounters(); - } else { - // half open - resetCounters(); - } - } - - private void resetCounters() { - results.reset(); - successCounter.set(0); - } - - public static class Builder implements io.helidon.common.Builder { + /** + * Fluent API builder for {@link io.helidon.faulttolerance.CircuitBreaker}. + */ + class Builder implements io.helidon.common.Builder { + private final Set> skipOn = new HashSet<>(); + private final Set> applyOn = new HashSet<>(); // how long to transition from open to half-open private Duration delay = Duration.ofSeconds(5); // how many percents of failures will open the breaker @@ -183,38 +103,153 @@ public class CircuitBreaker implements Handler { @Override public CircuitBreaker build() { - return new CircuitBreaker(this); + return new CircuitBreakerImpl(this); } + /** + * How long to wait before transitioning from open to half-open state. + * + * @param delay to wait + * @return updated builder instance + */ public Builder delay(Duration delay) { this.delay = delay; return this; } - public Builder ratio(int ratio) { + /** + * How many failures out of 100 will trigger the circuit to open. + * This is adapted to the {@link #volume(int)} used to handle the window of requests. + *

If errorRatio is 40, and volume is 10, 4 failed requests will open the circuit. + * + * @param ratio percent of failure that trigger the circuit to open + * @return updated builder instance + * @see #volume(int) + */ + public Builder errorRatio(int ratio) { this.ratio = ratio; return this; } + /** + * How many successful calls will close a half-open circuit. + * Nevertheless the first failed call will open the circuit again. + * + * @param successThreshold number of calls + * @return updated builder instance + */ public Builder successThreshold(int successThreshold) { this.successThreshold = successThreshold; return this; } + /** + * Rolling window size used to calculate ratio of failed requests. + * + * @param volume how big a window is used to calculate error errorRatio + * @return updated builder instance + * @see #errorRatio(int) + */ public Builder volume(int volume) { this.volume = volume; return this; } + /** + * These throwables will be considered failures, and all other will not. + *

+ * Cannot be combined with {@link #skipOn}. + * + * @param classes to consider failures to calculate failure ratio + * @return updated builder instance + */ + public Builder applyOn(Class... classes) { + applyOn.clear(); + Arrays.stream(classes) + .forEach(this::addApplyOn); + + return this; + } + + /** + * Add a throwable to be considered a failure. + * + * @param clazz to consider failure to calculate failure ratio + * @return updated builder instance + * @see #applyOn + */ + public Builder addApplyOn(Class clazz) { + this.applyOn.add(clazz); + return this; + } + + /** + * These throwables will not be considered failures, all other will. + *

+ * Cannot be combined with {@link #applyOn}. + * + * @param classes to consider successful + * @return updated builder instance + */ + public Builder skipOn(Class... classes) { + skipOn.clear(); + Arrays.stream(classes) + .forEach(this::addSkipOn); + + return this; + } + + /** + * This throwable will not be considered failure. + *

+ * + * @param clazz to consider successful + * @return updated builder instance + */ + public Builder addSkipOn(Class clazz) { + this.skipOn.add(clazz); + return this; + } + + /** + * Executor service to schedule future tasks. + * By default uses an executor configured on + * {@link io.helidon.faulttolerance.FaultTolerance#scheduledExecutor(java.util.function.Supplier)}. + * + * @param scheduledExecutor executor to use + * @return updated builder instance + */ + public Builder executor(ScheduledExecutorService scheduledExecutor) { + this.executor = LazyValue.create(scheduledExecutor); + return this; + } + LazyValue executor() { return executor; } - } - public enum State { - CLOSED, - HALF_OPEN, - OPEN - } + Set> skipOn() { + return skipOn; + } + Set> applyOn() { + return applyOn; + } + + Duration delay() { + return delay; + } + + int errorRatio() { + return ratio; + } + + int successThreshold() { + return successThreshold; + } + + int volume() { + return volume; + } + } } diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerImpl.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerImpl.java new file mode 100644 index 000000000..1a2f226fe --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerImpl.java @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Multi; +import io.helidon.common.reactive.Single; + +import static io.helidon.faulttolerance.ResultWindow.Result.FAILURE; +import static io.helidon.faulttolerance.ResultWindow.Result.SUCCESS; + +public class CircuitBreakerImpl implements CircuitBreaker { + /* + Configuration options + */ + private final LazyValue executor; + // how long to transition from open to half-open + private final long delayMillis; + // how many successful calls will close a half-open breaker + private final int successThreshold; + + /* + Runtime + */ + private final AtomicReference state = new AtomicReference<>(State.CLOSED); + // rolling window for counting errors to (maybe) open the breaker + private final ResultWindow results; + // to close from half-open + private final AtomicInteger successCounter = new AtomicInteger(); + private final AtomicBoolean halfOpenInProgress = new AtomicBoolean(); + private final AtomicReference> schedule = new AtomicReference<>(); + private final ErrorChecker errorChecker; + + CircuitBreakerImpl(CircuitBreaker.Builder builder) { + this.delayMillis = builder.delay().toMillis(); + this.successThreshold = builder.successThreshold(); + this.results = new ResultWindow(builder.volume(), builder.errorRatio()); + this.executor = builder.executor(); + this.errorChecker = ErrorChecker.create(builder.skipOn(), builder.applyOn()); + } + + @Override + public Multi invokeMulti(Supplier> supplier) { + return invokeTask(DelayedTask.createMulti(supplier)); + } + + @Override + public Single invoke(Supplier> supplier) { + return invokeTask(DelayedTask.createSingle(supplier)); + } + + private U invokeTask(DelayedTask task) { + if (state.get() == State.CLOSED) { + // run it! + CompletionStage completion = task.execute(); + + completion.handle((it, throwable) -> { + Throwable exception = FaultTolerance.cause(throwable); + if (exception == null || errorChecker.shouldSkip(exception)) { + // success + results.update(SUCCESS); + } else { + results.update(FAILURE); + if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) { + results.reset(); + // if we successfully switch to open, we need to schedule switch to half-open + scheduleHalf(); + } + } + + return it; + }); + return task.result(); + } else if (state.get() == State.OPEN) { + // fail it! + return task.error(new CircuitBreakerOpenException("CircuitBreaker is open")); + } else { + // half-open + if (halfOpenInProgress.compareAndSet(false, true)) { + CompletionStage result = task.execute(); + result.handle((it, throwable) -> { + Throwable exception = FaultTolerance.cause(throwable); + if (exception == null || errorChecker.shouldSkip(exception)) { + // success + int successes = successCounter.incrementAndGet(); + if (successes >= successThreshold) { + // transition to closed + successCounter.set(0); + state.compareAndSet(State.HALF_OPEN, State.CLOSED); + halfOpenInProgress.set(false); + } + halfOpenInProgress.set(false); + } else { + // failure + successCounter.set(0); + state.set(State.OPEN); + halfOpenInProgress.set(false); + // if we successfully switch to open, we need to schedule switch to half-open + scheduleHalf(); + } + + return it; + }); + return task.result(); + } else { + return task + .error(new CircuitBreakerOpenException("CircuitBreaker is half open, parallel execution in progress")); + } + } + } + + private void scheduleHalf() { + schedule.set(executor.get() + .schedule(() -> { + state.compareAndSet(State.OPEN, State.HALF_OPEN); + schedule.set(null); + return true; + }, delayMillis, TimeUnit.MILLISECONDS)); + } + + public State state() { + return state.get(); + } + + public void state(State newState) { + if (newState == State.CLOSED) { + if (state.get() == State.CLOSED) { + // fine + resetCounters(); + return; + } + + ScheduledFuture future = schedule.getAndSet(null); + if (future != null) { + future.cancel(false); + } + resetCounters(); + state.set(State.CLOSED); + } else if (newState == State.OPEN) { + state.set(State.OPEN); + ScheduledFuture future = schedule.getAndSet(null); + if (future != null) { + future.cancel(false); + } + resetCounters(); + } else { + // half open + resetCounters(); + } + } + + private void resetCounters() { + results.reset(); + successCounter.set(0); + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerOpenException.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerOpenException.java index 130de528c..d88ab656e 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerOpenException.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerOpenException.java @@ -16,6 +16,9 @@ package io.helidon.faulttolerance; +/** + * Failure because {@link io.helidon.faulttolerance.CircuitBreaker} is open and does not accept requests. + */ public class CircuitBreakerOpenException extends RuntimeException { public CircuitBreakerOpenException(String message) { super(message); diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/DelayedTask.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/DelayedTask.java new file mode 100644 index 000000000..56bba7847 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/DelayedTask.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; + +import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Multi; +import io.helidon.common.reactive.MultiTappedPublisher; +import io.helidon.common.reactive.Single; + +interface DelayedTask { + // the result completes when the call fully completes (regardless of errors) + CompletionStage execute(); + + // get the (new) result + T result(); + + // create an error result + T error(Throwable throwable); + + // cannot retry or fallback when data was already sent (only useful for multi) + default boolean hadData() { + return false; + } + + static DelayedTask> createMulti(Supplier> supplier) { + return new DelayedTask<>() { + final AtomicBoolean completed = new AtomicBoolean(); + final AtomicBoolean hasData = new AtomicBoolean(); + final LazyValue> completionMarker = LazyValue.create(CompletableFuture::new); + final LazyValue>> publisherFuture = LazyValue.create(CompletableFuture::new); + final LazyValue> multi = LazyValue.create(() -> { + return MultiTappedPublisher.builder(Multi.create(publisherFuture.get()).flatMap(Function.identity())) + .onCancelCallback(() -> failMarker(new CancellationException("Multi was cancelled"))) + .onCompleteCallback(this::completeMarker) + .onErrorCallback(this::failMarker) + .onNextCallback(it -> hasData.set(true)) + .build(); + }); + + @Override + public CompletionStage execute() { + publisherFuture.get().complete(supplier.get()); + + return completionMarker.get(); + } + + @Override + public Multi result() { + return multi.get(); + } + + @Override + public Multi error(Throwable throwable) { + return Multi.error(throwable); + } + + @Override + public String toString() { + return "multi:" + System.identityHashCode(this); + } + + @Override + public boolean hadData() { + return hasData.get(); + } + + private void failMarker(Throwable throwable) { + if (completed.compareAndSet(false, true)) { + completionMarker.get().completeExceptionally(throwable); + } + } + + private void completeMarker() { + if (completed.compareAndSet(false, true)) { + completionMarker.get().complete(null); + } + } + }; + } + + static DelayedTask> createSingle(Supplier> supplier) { + return new DelayedTask<>() { + // future we returned as a result of invoke command + final LazyValue> resultFuture = LazyValue.create(CompletableFuture::new); + + @Override + public CompletionStage execute() { + CompletionStage result = null; + try { + result = supplier.get(); + } catch (Exception e) { + return CompletableFuture.failedStage(e); + } + CompletableFuture future = resultFuture.get(); + + result.handle((it, throwable) -> { + if (throwable == null) { + future.complete(it); + } else { + future.completeExceptionally(throwable); + } + + return null; + }); + + return result.thenRun(() -> {}); + } + + @Override + public Single result() { + return Single.create(resultFuture.get()); + } + + @Override + public Single error(Throwable throwable) { + return Single.error(throwable); + } + + @Override + public String toString() { + return "single:" + System.identityHashCode(this); + } + }; + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/ErrorChecker.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/ErrorChecker.java new file mode 100644 index 000000000..90e024d4e --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/ErrorChecker.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.Set; + +@FunctionalInterface +interface ErrorChecker { + boolean shouldSkip(Throwable throwable); + + static ErrorChecker create(Set> skipOnSet, Set> applyOnSet) { + Set> skipOn = Set.copyOf(skipOnSet); + Set> applyOn = Set.copyOf(applyOnSet); + + if (skipOn.isEmpty()) { + if (applyOn.isEmpty()) { + return throwable -> false; + } else { + return throwable -> !applyOn.contains(throwable.getClass()); + } + } else { + if (applyOn.isEmpty()) { + return throwable -> skipOn.contains(throwable.getClass()); + } else { + throw new IllegalArgumentException("You have defined both skip and apply set of exception classes. " + + "This cannot be correctly handled; skipOn: " + skipOn + + " applyOn: " + applyOn); + } + + } + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java index 1e8edf971..469ad28d8 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java @@ -16,62 +16,171 @@ package io.helidon.faulttolerance; -import java.util.Objects; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; import java.util.function.Function; -import java.util.function.Supplier; +import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; -public class Fallback implements TypedHandler { - private final Function> fallback; - - private Fallback(Builder builder) { - this.fallback = builder.fallback; - } - - public static Builder builder() { +/** + * Fallback allows the user to execute an alternative supplier of results in case the usual one fails. + *

+ * In case you call the {@link #invokeMulti(java.util.function.Supplier)} method, the following restriction applies: + *

    + *
  • In case at least one record was sent (one {@code onNext} was called), the fallback will not trigger.
  • + *
+ * You may provide fallback for both a {@link io.helidon.common.reactive.Multi} and a {@link io.helidon.common.reactive.Single}. + * If none is provided, the method is executed without fallback. + * @param type of the values returned + */ +public interface Fallback extends TypedHandler { + /** + * A builder to customize {@link Fallback}. + * + * @param type of the values returned by the failing method + * @return a new builder + */ + static Builder builder() { return new Builder<>(); } - @Override - public Single invoke(Supplier> supplier) { - CompletableFuture future = new CompletableFuture<>(); - - supplier.get() - .thenAccept(future::complete) - .exceptionally(throwable -> { - Throwable cause = FaultTolerance.getCause(throwable); - fallback.apply(cause) - .thenAccept(future::complete) - .exceptionally(t2 -> { - Throwable cause2 = FaultTolerance.getCause(t2); - cause2.addSuppressed(throwable); - future.completeExceptionally(cause2); - return null; - }); - return null; - }); - - return Single.create(future); + /** + * Create a fallback for a {@link Single} or {@link java.util.concurrent.CompletionStage}. + * + * @param fallback fallback supplier to obtain the alternative result + * @param type of the result + * @return a new fallback + */ + static Fallback create(Function> fallback) { + Builder builder = builder(); + return builder.fallback(fallback).build(); } - public static class Builder implements io.helidon.common.Builder> { - private Function> fallback; + /** + * Create a fallback for a {@link Multi} or {@link java.util.concurrent.Flow.Publisher}. + * + * @param fallback fallback supplier to obtain the alternative result + * @param type of the result + * @return a new fallback + */ + static Fallback createMulti(Function> fallback) { + Builder builder = builder(); + return builder.fallback(fallback).build(); + } + + /** + * Fluent API builder for {@link io.helidon.faulttolerance.Fallback}. + * + * @param type of the values returned + */ + class Builder implements io.helidon.common.Builder> { + private final Set> applyOn = new HashSet<>(); + private final Set> skipOn = new HashSet<>(); + + private Function> fallback = CompletableFuture::failedFuture; + private Function> fallbackMulti = Multi::error; private Builder() { } @Override public Fallback build() { - Objects.requireNonNull(fallback, "Fallback method must be specified"); - return new Fallback<>(this); + return new FallbackImpl<>(this); } + /** + * Configure a fallback for a {@link Single} or {@link java.util.concurrent.CompletionStage}. + * + * @param fallback fallback supplier to obtain the alternative result + * @return updated builder instance + */ public Builder fallback(Function> fallback) { this.fallback = fallback; return this; } + + /** + * Configure a fallback for a {@link Multi} or {@link java.util.concurrent.Flow.Publisher}. + * + * @param fallback fallback supplier to obtain the alternative result + * @return updated builder instance + */ + public Builder fallbackMulti(Function> fallback) { + this.fallbackMulti = fallback; + return this; + } + + /** + * Apply fallback on these throwable classes. + * Cannot be combined with {@link #skipOn(Class[])}. + * + * @param classes classes to fallback on + * @return updated builder instance + */ + public Builder applyOn(Class... classes) { + applyOn.clear(); + Arrays.stream(classes) + .forEach(this::addApplyOn); + + return this; + } + + /** + * Apply fallback on this throwable class. + * + * @param clazz class to fallback on + * @return updated builder instance + */ + public Builder addApplyOn(Class clazz) { + this.applyOn.add(clazz); + return this; + } + + /** + * Do not apply fallback on these throwable classes. + * Cannot be combined with {@link #applyOn(Class[])}. + * + * @param classes classes not to fallback on + * @return updated builder instance + */ + public Builder skipOn(Class... classes) { + skipOn.clear(); + Arrays.stream(classes) + .forEach(this::addSkipOn); + + return this; + } + + /** + * Do not apply fallback on this throwable class. + * + * @param clazz class not to fallback on + * @return updated builder instance + */ + public Builder addSkipOn(Class clazz) { + this.skipOn.add(clazz); + return this; + } + + Set> applyOn() { + return applyOn; + } + + Set> skipOn() { + return skipOn; + } + + Function> fallback() { + return fallback; + } + + Function> fallbackMulti() { + return fallbackMulti; + } } } diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/FallbackImpl.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/FallbackImpl.java new file mode 100644 index 000000000..593e14230 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/FallbackImpl.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import java.util.function.Function; +import java.util.function.Supplier; + +import io.helidon.common.reactive.Multi; +import io.helidon.common.reactive.Single; + +class FallbackImpl implements Fallback { + private final Function> fallback; + private final Function> fallbackMulti; + private final ErrorChecker errorChecker; + + FallbackImpl(Fallback.Builder builder) { + this.fallback = builder.fallback(); + this.fallbackMulti = builder.fallbackMulti(); + this.errorChecker = ErrorChecker.create(builder.skipOn(), builder.applyOn()); + } + + @Override + public Multi invokeMulti(Supplier> supplier) { + DelayedTask> delayedTask = DelayedTask.createMulti(supplier); + + delayedTask.execute(); + + return delayedTask.result() + .onErrorResumeWith(throwable -> { + Throwable cause = FaultTolerance.cause(throwable); + if (delayedTask.hadData() || errorChecker.shouldSkip(cause)) { + return Multi.error(cause); + } else { + return Multi.create(fallbackMulti.apply(cause)); + } + }); + } + + @Override + public Single invoke(Supplier> supplier) { + CompletableFuture future = new CompletableFuture<>(); + + supplier.get() + .thenAccept(future::complete) + .exceptionally(throwable -> { + Throwable cause = FaultTolerance.cause(throwable); + if (errorChecker.shouldSkip(cause)) { + future.completeExceptionally(cause); + } else { + fallback.apply(cause) + .thenAccept(future::complete) + .exceptionally(t2 -> { + Throwable cause2 = FaultTolerance.cause(t2); + cause2.addSuppressed(throwable); + future.completeExceptionally(cause2); + return null; + }); + } + return null; + }); + + return Single.create(future); + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java index 9d4c56e85..f410bac52 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Flow; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -31,6 +32,7 @@ import java.util.function.Supplier; import io.helidon.common.LazyValue; import io.helidon.common.configurable.ScheduledThreadPoolSupplier; import io.helidon.common.configurable.ThreadPoolSupplier; +import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; import io.helidon.config.Config; @@ -116,12 +118,12 @@ public final class FaultTolerance { return new Builder(); } - static Throwable getCause(Throwable throwable) { + static Throwable cause(Throwable throwable) { if (throwable instanceof CompletionException) { - return getCause(throwable.getCause()); + return cause(throwable.getCause()); } if (throwable instanceof ExecutionException) { - return getCause(throwable.getCause()); + return cause(throwable.getCause()); } return throwable; } @@ -168,7 +170,7 @@ public final class FaultTolerance { @Override public void add(Handler ft) { - fts.add(ft::invoke); + fts.add(new TypedWrapper(ft)); } public TypedBuilder addFallback(Fallback fallback) { @@ -179,7 +181,7 @@ public final class FaultTolerance { private TypedBuilder builder(Builder builder) { builder.fts .forEach(it -> { - fts.add(it::invoke); + fts.add(new TypedWrapper(it)); }); return this; } @@ -191,6 +193,18 @@ public final class FaultTolerance { this.validFts = new LinkedList<>(validFts); } + @Override + public Multi invokeMulti(Supplier> supplier) { + Supplier> next = supplier; + + for (TypedHandler validFt : validFts) { + final var finalNext = next; + next = () -> validFt.invokeMulti(finalNext); + } + + return Multi.create(next.get()); + } + @Override public Single invoke(Supplier> supplier) { Supplier> next = supplier; @@ -203,6 +217,24 @@ public final class FaultTolerance { return Single.create(next.get()); } } + + private class TypedWrapper implements TypedHandler { + private final Handler handler; + + private TypedWrapper(Handler handler) { + this.handler = handler; + } + + @Override + public Single invoke(Supplier> supplier) { + return handler.invoke(supplier); + } + + @Override + public Multi invokeMulti(Supplier> supplier) { + return handler.invokeMulti(supplier); + } + } } public static class Builder extends BaseBuilder implements io.helidon.common.Builder { @@ -234,6 +266,17 @@ public final class FaultTolerance { this.validFts = new LinkedList<>(validFts); } + @Override + public Multi invokeMulti(Supplier> supplier) { + Supplier> next = supplier; + + for (Handler validFt : validFts) { + final var finalNext = next; + next = () -> validFt.invokeMulti(finalNext); + } + + return Multi.create(next.get()); + } @Override public Single invoke(Supplier> supplier) { Supplier> next = supplier; diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Handler.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Handler.java index 6edca9fca..a38279cec 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Handler.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Handler.java @@ -17,11 +17,13 @@ package io.helidon.faulttolerance; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; import java.util.function.Supplier; +import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; -@FunctionalInterface public interface Handler { Single invoke(Supplier> supplier); + Multi invokeMulti(Supplier> supplier); } diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Retry.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Retry.java index 720cea3f3..f34d01248 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Retry.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Retry.java @@ -19,155 +19,47 @@ package io.helidon.faulttolerance; import java.time.Duration; import java.util.Arrays; import java.util.HashSet; +import java.util.Optional; import java.util.Random; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import io.helidon.common.LazyValue; -import io.helidon.common.reactive.Single; -public class Retry implements Handler { - private final int calls; - private final long delayMillis; - private final long maxTimeNanos; - private final int jitterMillis; - private final LazyValue scheduledExecutor; - private final Random random = new Random(); - - protected Retry(Builder builder) { - this.calls = builder.calls; - this.delayMillis = builder.delay.toMillis(); - this.maxTimeNanos = builder.maxTime.toNanos(); - long jitter = builder.jitter.toMillis() * 2; - this.jitterMillis = (jitter > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) jitter; - this.scheduledExecutor = builder.scheduledExecutor; - } - - public static Builder builder() { +/** + * Retry supports retry policies to be applied on an execution of asynchronous tasks. + *

+ * In case you call the {@link #invokeMulti(java.util.function.Supplier)} method, the following restriction applies: + *

    + *
  • In case at least one record was sent (one {@code onNext} was called), the retry will not trigger.
  • + *
+ */ +public interface Retry extends Handler { + /** + * A new builder to customize {@code Retry} configuration. + * + * @return a new builder + */ + static Builder builder() { return new Builder(); } - @Override - public Single invoke(Supplier> supplier) { - CompletableFuture future = new CompletableFuture<>(); + /** + * Fluent API builder for {@link io.helidon.faulttolerance.Retry}. + */ + class Builder implements io.helidon.common.Builder { + private final Set> applyOn = new HashSet<>(); + private final Set> skipOn = new HashSet<>(); - new Retrier<>(future, supplier, this) - .retry(); + private RetryPolicy retryPolicy = JitterRetryPolicy.builder() + .calls(4) + .delay(Duration.ofMillis(200)) + .jitter(Duration.ofMillis(50)) + .build(); - return Single.create(future); - } - protected boolean abort(Throwable throwable) { - return false; - } - - private long nextDelay() { - long delay = delayMillis; - int jitterRandom = random.nextInt(jitterMillis) - jitterMillis; - delay = delay + jitterRandom; - delay = Math.max(0, delay); - - return delay; - } - - private class Retrier { - private final AtomicInteger count = new AtomicInteger(); - private final AtomicReference lastThrowable = new AtomicReference<>(); - private final Supplier> supplier; - private final CompletableFuture future; - private final Retry retry; - private final long started = System.nanoTime(); - - private Retrier(CompletableFuture future, - Supplier> supplier, - Retry retry) { - this.future = future; - this.supplier = supplier; - this.retry = retry; - } - - private void retry() { - int currentCount = count.incrementAndGet(); - - CompletionStage stage = null; - try { - stage = supplier.get(); - } catch (Throwable e) { - stage = CompletableFuture.failedStage(e); - } - - stage.handle((it, throwable) -> { - if (throwable == null) { - future.complete(it); - } else { - Throwable current = FaultTolerance.getCause(throwable); - Throwable before = lastThrowable.get(); - if (before != null) { - current.addSuppressed(before); - } - - long now = System.nanoTime(); - if (currentCount >= retry.calls || retry.abort(current)) { - // this is final execution - future.completeExceptionally(current); - } else if (now - started > maxTimeNanos) { - TimeoutException timedOut = new TimeoutException("Retries timed out"); - timedOut.addSuppressed(current); - future.completeExceptionally(timedOut); - } else { - lastThrowable.set(current); - // schedule next retry - scheduledExecutor.get().schedule(this::retry, nextDelay(), TimeUnit.MILLISECONDS); - } - } - return null; - }); - } - } - - private static class RetryWithAbortOn extends Retry { - private final Set> abortOn; - - private RetryWithAbortOn(Builder builder) { - super(builder); - this.abortOn = Set.copyOf(builder.abortOn); - } - - @Override - protected boolean abort(Throwable throwable) { - return abortOn.contains(throwable.getClass()); - } - } - - private static class RetryWithRetryOn extends Retry { - private final Set> retryOn; - - private RetryWithRetryOn(Builder builder) { - super(builder); - this.retryOn = Set.copyOf(builder.retryOn); - } - - @Override - protected boolean abort(Throwable throwable) { - return !retryOn.contains(throwable.getClass()); - } - } - - public static class Builder implements io.helidon.common.Builder { - private final Set> retryOn = new HashSet<>(); - private final Set> abortOn = new HashSet<>(); - - private int calls = 3; - private Duration delay = Duration.ofMillis(200); - private Duration maxTime = Duration.ofSeconds(1); - private Duration jitter = Duration.ofMillis(50); + private Duration overallTimeout = Duration.ofSeconds(1); private LazyValue scheduledExecutor = FaultTolerance.scheduledExecutor(); private Builder() { @@ -175,78 +67,359 @@ public class Retry implements Handler { @Override public Retry build() { - calls = Math.max(1, calls); - if (retryOn.isEmpty()) { - if (abortOn.isEmpty()) { - return new Retry(this); - } else { - return new RetryWithAbortOn(this); - } - } else { - if (abortOn.isEmpty()) { - return new RetryWithRetryOn(this); - } else { - throw new IllegalArgumentException("You have defined both retryOn and abortOn exceptions. " - + "This cannot be correctly handled; abortOn: " + abortOn - + " retryOn: " + retryOn); - } - } + return new RetryImpl(this); } /** - * Total number of calls (first + retries). - * @param calls how many times to call the method + * Configure a retry policy to use to calculate delays between retries. + * Defaults to a {@link io.helidon.faulttolerance.Retry.JitterRetryPolicy} + * with 4 calls (initial call + 3 retries), delay of 200 millis and a jitter of 50 millis. + * + * @param policy retry policy * @return updated builder instance */ - public Builder calls(int calls) { - this.calls = calls; + public Builder retryPolicy(RetryPolicy policy) { + this.retryPolicy = policy; return this; } - public Builder delay(Duration delay) { - this.delay = delay; - return this; - } - - public Builder maxTime(Duration maxTime) { - this.maxTime = maxTime; - return this; - } - - public Builder jitter(Duration jitter) { - this.jitter = jitter; - return this; - } - - public Builder retryOn(Class... classes) { - retryOn.clear(); + /** + * These throwables will be considered failures, and all other will not. + *

+ * Cannot be combined with {@link #skipOn}. + * + * @param classes to consider failures and trigger a retry + * @return updated builder instance + */ + public Builder applyOn(Class... classes) { + applyOn.clear(); Arrays.stream(classes) - .forEach(this::addRetryOn); + .forEach(this::addApplyOn); return this; } - public Builder addRetryOn(Class clazz) { - this.retryOn.add(clazz); + /** + * Add a throwable to be considered a failure. + * + * @param clazz to consider failure and trigger a retry + * @return updated builder instance + * @see #applyOn + */ + public Builder addApplyOn(Class clazz) { + this.applyOn.add(clazz); return this; } - public Builder abortOn(Class... classes) { - abortOn.clear(); + /** + * These throwables will not be considered retriable, all other will. + *

+ * Cannot be combined with {@link #applyOn}. + * + * @param classes to skip retries + * @return updated builder instance + */ + public Builder skipOn(Class... classes) { + skipOn.clear(); Arrays.stream(classes) - .forEach(this::addAbortOn); + .forEach(this::addSkipOn); return this; } - public Builder addAbortOn(Class clazz) { - this.abortOn.add(clazz); + /** + * This throwable will not be considered retriable. + *

+ * + * @param clazz to to skip retries + * @return updated builder instance + */ + public Builder addSkipOn(Class clazz) { + this.skipOn.add(clazz); return this; } + /** + * Executor service to schedule retries. + * By default uses an executor configured on + * {@link io.helidon.faulttolerance.FaultTolerance#scheduledExecutor(java.util.function.Supplier)}. + * + * @param scheduledExecutor executor to use + * @return updated builder instance + */ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) { this.scheduledExecutor = LazyValue.create(scheduledExecutor); return this; } + + /** + * Overall timeout. + * When overall timeout is reached, execution terminates (even if the retry policy + * was not exhausted). + * + * @param overallTimeout an overall timeout + * @return updated builder instance + */ + public Builder overallTimeout(Duration overallTimeout) { + this.overallTimeout = overallTimeout; + return this; + } + + Set> applyOn() { + return applyOn; + } + + Set> skipOn() { + return skipOn; + } + + RetryPolicy retryPolicy() { + return retryPolicy; + } + + Duration overallTimeout() { + return overallTimeout; + } + + LazyValue scheduledExecutor() { + return scheduledExecutor; + } + } + + /** + * Retry policy to handle delays between retries. + * The implementation must not save state, as a single instance + * will be used by multiple threads and executions in parallel. + */ + interface RetryPolicy { + /** + * Return next delay in milliseconds, or an empty optional to finish retries. + * + * @param firstCallMillis milliseconds recorded before the first call using {@link System#currentTimeMillis()} + * @param lastDelay last delay that was used (0 for the first failed call) + * @param call call index (0 for the first failed call) + * @return how long to wait before trying again, or empty to notify this is the end of retries + */ + Optional nextDelayMillis(long firstCallMillis, long lastDelay, int call); + } + + /** + * A retry policy that prolongs the delays between retries by a defined factor. + *

+ * Consider the following setup: + *

    + *
  • {@code calls = 4}
  • + *
  • {@code delayMillis = 100}
  • + *
  • {@code factor = 2.0}
  • + *
+ * The following delays will be used for each call: + * + *
    + *
  • Initial call - always immediate (not handled by retry policy)
  • + *
  • First retry - 100 millis
  • + *
  • Second retry - 200 millis (previous delay * factor)
  • + *
  • Third retry - 400 millis (previous delay * factor)
  • + *
+ */ + class DelayingRetryPolicy implements RetryPolicy { + private final int calls; + private final long delayMillis; + private final double delayFactor; + + private DelayingRetryPolicy(Builder builder) { + this.calls = builder.calls; + this.delayMillis = builder.delay.toMillis(); + this.delayFactor = builder.delayFactor; + } + + /** + * A builder to customize configuration of {@link io.helidon.faulttolerance.Retry.DelayingRetryPolicy}. + * + * @return a new builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Create a retry policy with no delays and with the specified number of calls. + * + * @param calls number of calls to execute (retries + initial call) + * @return a no delay retry policy + */ + public static DelayingRetryPolicy noDelay(int calls) { + return builder() + .delay(Duration.ZERO) + .delayFactor(0) + .calls(calls) + .build(); + } + + @Override + public Optional nextDelayMillis(long firstCallMillis, long lastDelay, int call) { + if (call >= calls) { + return Optional.empty(); + } + + if (call == 0) { + return Optional.of(delayMillis); + } + + return Optional.of((long) (lastDelay * delayFactor)); + } + + /** + * Fluent API builder for {@link io.helidon.faulttolerance.Retry.DelayingRetryPolicy}. + */ + public static class Builder implements io.helidon.common.Builder { + private int calls = 3; + private double delayFactor = 2; + private Duration delay = Duration.ofMillis(200); + + @Override + public DelayingRetryPolicy build() { + return new DelayingRetryPolicy(this); + } + + /** + * Total number of calls (first + retries). + * + * @param calls how many times to call the method + * @return updated builder instance + */ + public Builder calls(int calls) { + this.calls = calls; + return this; + } + + /** + * Base delay between the invocations. + * + * @param delay delay between the invocations + * @return updated builder instance + */ + public Builder delay(Duration delay) { + this.delay = delay; + return this; + } + + /** + * A delay multiplication factor. + * + * @param delayFactor a delay multiplication factor + * @return updated builder instance + */ + public Builder delayFactor(double delayFactor) { + this.delayFactor = delayFactor; + return this; + } + } + } + + /** + * A retry policy that randomizes delays between execution using a "jitter" time. + *

+ * Consider the following setup: + *

    + *
  • {@code calls = 4}
  • + *
  • {@code delayMillis = 100}
  • + *
  • {@code jitter = 50}
  • + *
+ * The following delays will be used for each call: + * + *
    + *
  • Initial call - always immediate (not handled by retry policy)
  • + *
  • First retry: 50 - 150 millis (delay +- Random.nextInt(jitter)
  • + *
  • Second retry: 50 - 150 millis
  • + *
  • Third retry: 50 - 150 millis
  • + *
+ */ + class JitterRetryPolicy implements RetryPolicy { + private final int calls; + private final long delayMillis; + private final Supplier randomJitter; + + private JitterRetryPolicy(Builder builder) { + this.calls = builder.calls; + this.delayMillis = builder.delay.toMillis(); + long jitter = builder.jitter.toMillis(); + int jitterMillis = (jitter > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) jitter; + if (jitterMillis == 0) { + randomJitter = () -> 0; + } else { + Random random = new Random(); + // need a number [-jitterMillis,+jitterMillis] + randomJitter = () -> random.nextInt(jitterMillis * 2) - jitterMillis; + } + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public Optional nextDelayMillis(long firstCallNanos, long lastDelay, int call) { + if (call >= calls) { + return Optional.empty(); + } + + long delay = delayMillis; + int jitterRandom = randomJitter.get(); + delay = delay + jitterRandom; + delay = Math.max(0, delay); + + return Optional.of(delay); + } + + /** + * Fluent API builder for {@link io.helidon.faulttolerance.Retry.JitterRetryPolicy}. + */ + public static class Builder implements io.helidon.common.Builder { + private int calls = 3; + private Duration delay = Duration.ofMillis(200); + private Duration jitter = Duration.ofMillis(50); + + private Builder() { + } + + @Override + public JitterRetryPolicy build() { + return new JitterRetryPolicy(this); + } + + /** + * Total number of calls (first + retries). + * @param calls how many times to call the method + * @return updated builder instance + */ + public Builder calls(int calls) { + this.calls = calls; + return this; + } + + /** + * Base delay between the invocations. + * + * @param delay delay between the invocations + * @return updated builder instance + */ + public Builder delay(Duration delay) { + this.delay = delay; + return this; + } + + /** + * Random part of the delay. + * A number between {@code [-jitter,+jitter]} is applied to delay each time + * delay is calculated. + * + * @param jitter jitter duration + * @return updated builder instance + */ + public Builder jitter(Duration jitter) { + this.jitter = jitter; + return this; + } + } } } diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/RetryImpl.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/RetryImpl.java new file mode 100644 index 000000000..7f8f4001d --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/RetryImpl.java @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Multi; +import io.helidon.common.reactive.Single; + +class RetryImpl implements Retry { + private final LazyValue scheduledExecutor; + private final ErrorChecker errorChecker; + private final long maxTimeNanos; + private final Retry.RetryPolicy retryPolicy; + + RetryImpl(Retry.Builder builder) { + this.scheduledExecutor = builder.scheduledExecutor(); + this.errorChecker = ErrorChecker.create(builder.skipOn(), builder.applyOn()); + this.maxTimeNanos = builder.overallTimeout().toNanos(); + this.retryPolicy = builder.retryPolicy(); + } + + @Override + public Multi invokeMulti(Supplier> supplier) { + return retryMulti(new RetryContext<>(supplier)); + } + + @Override + public Single invoke(Supplier> supplier) { + return retrySingle(new RetryContext<>(supplier)); + } + + private Single retrySingle(RetryContext> context) { + long delay = 0; + if (context.count.get() != 0) { + Optional maybeDelay = retryPolicy.nextDelayMillis(context.startedMillis, + context.lastDelay.get(), + context.count.getAndIncrement()); + if (maybeDelay.isEmpty()) { + return Single.error(context.throwable()); + } + delay = maybeDelay.get(); + } + + long nanos = System.nanoTime() - context.startedNanos; + if (nanos > maxTimeNanos) { + return Single.error(new TimeoutException("Execution took too long. Already executing: " + + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms, must timeout after: " + + TimeUnit.NANOSECONDS.toMillis(maxTimeNanos) + " ms.")); + } + + DelayedTask> task = DelayedTask.createSingle(context.supplier); + if (delay == 0) { + task.execute(); + } else { + scheduledExecutor.get().schedule(task::execute, delay, TimeUnit.MILLISECONDS); + } + + return task.result() + .onErrorResumeWithSingle(throwable -> { + Throwable cause = FaultTolerance.cause(throwable); + context.thrown.add(cause); + if (errorChecker.shouldSkip(cause)) { + return Single.error(context.throwable()); + } + return retrySingle(context); + }); + } + + private Multi retryMulti(RetryContext> context) { + + long delay = 0; + if (context.count.get() != 0) { + Optional maybeDelay = retryPolicy.nextDelayMillis(context.startedMillis, + context.lastDelay.get(), + context.count.getAndIncrement()); + if (maybeDelay.isEmpty()) { + return Multi.error(context.throwable()); + } + delay = maybeDelay.get(); + } + + long nanos = System.nanoTime() - context.startedNanos; + if (nanos > maxTimeNanos) { + return Multi.error(new TimeoutException("Execution took too long. Already executing: " + + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms, must timeout after: " + + TimeUnit.NANOSECONDS.toMillis(maxTimeNanos) + " ms.")); + } + + DelayedTask> task = DelayedTask.createMulti(context.supplier); + if (delay == 0) { + task.execute(); + } else { + scheduledExecutor.get().schedule(task::execute, delay, TimeUnit.MILLISECONDS); + } + + return task.result() + .onErrorResumeWith(throwable -> { + Throwable cause = FaultTolerance.cause(throwable); + context.thrown.add(cause); + if (task.hadData() || errorChecker.shouldSkip(cause)) { + return Multi.error(context.throwable()); + } + return retryMulti(context); + }); + } + + private static class RetryContext { + // retry runtime + private final long startedMillis = System.currentTimeMillis(); + private final long startedNanos = System.nanoTime(); + private final AtomicInteger count = new AtomicInteger(); + private final List thrown = new LinkedList<>(); + private final AtomicLong lastDelay = new AtomicLong(); + + private final Supplier supplier; + + RetryContext(Supplier supplier) { + this.supplier = supplier; + } + + Throwable throwable() { + if (thrown.isEmpty()) { + return new IllegalStateException("Exception list is empty"); + } + Throwable last = thrown.get(thrown.size() - 1); + for (int i = 0; i < thrown.size() - 1; i++) { + last.addSuppressed(thrown.get(i)); + } + return last; + } + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Timeout.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Timeout.java index 5944b46a7..65fd16d53 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Timeout.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Timeout.java @@ -18,11 +18,13 @@ package io.helidon.faulttolerance; import java.time.Duration; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; public class Timeout implements Handler { @@ -38,6 +40,11 @@ public class Timeout implements Handler { return new Builder(); } + @Override + public Multi invokeMulti(Supplier> supplier) { + return null; + } + @Override public Single invoke(Supplier> supplier) { return Single.create(supplier.get()) diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/TypedHandler.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/TypedHandler.java index 761441f35..2aa037d6d 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/TypedHandler.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/TypedHandler.java @@ -17,11 +17,13 @@ package io.helidon.faulttolerance; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; import java.util.function.Supplier; +import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; -@FunctionalInterface public interface TypedHandler { Single invoke(Supplier> supplier); + Multi invokeMulti(Supplier> supplier); } diff --git a/fault-tolerance/src/main/java/module-info.java b/fault-tolerance/src/main/java/module-info.java index ff9fb62b7..3d8974c85 100644 --- a/fault-tolerance/src/main/java/module-info.java +++ b/fault-tolerance/src/main/java/module-info.java @@ -1,4 +1,5 @@ module io.helidon.faulttolerance { requires io.helidon.config; requires io.helidon.common.configurable; + requires java.logging; } \ No newline at end of file diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java index 231026842..9cd756b62 100644 --- a/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java @@ -16,6 +16,7 @@ package io.helidon.faulttolerance; +import java.util.List; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -63,6 +64,10 @@ class AsyncTest { assertThat(cause, instanceOf(MyException.class)); } + private List syncList() { + return List.of("hi", "there"); + } + private String syncError() { throw new MyException(); } diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/BulkheadTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/BulkheadTest.java index 0587f93d3..895d95e19 100644 --- a/fault-tolerance/src/test/java/io/helidon/faulttolerance/BulkheadTest.java +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/BulkheadTest.java @@ -16,17 +16,24 @@ package io.helidon.faulttolerance; +import java.util.List; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; +import io.helidon.common.LogConfig; +import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -34,11 +41,50 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; class BulkheadTest { + @BeforeAll + static void setupTest() { + LogConfig.configureRuntime(); + } + + @Test + void testBulkheadQueue() throws InterruptedException { + Bulkhead bulkhead = Bulkhead.builder() + .limit(1) + .queueLength(1000) + .build(); + + Request inProgress = new Request(0); + Single inProgressResult = bulkhead.invoke(inProgress::invoke); + + Request[] aLotRequests = new Request[999]; + Single[] aLotResults = new Single[999]; + for (int i = 0; i < aLotRequests.length; i++) { + Request req = new Request(i); + aLotRequests[i] = req; + aLotResults[i] = bulkhead.invoke(req::invoke); + } + + for (Request req : aLotRequests) { + req.releaseCdl.countDown(); + } + + inProgress.releaseCdl.countDown(); + if (inProgress.invokedCdl.await(1, TimeUnit.SECONDS)) { + for (Single result : aLotResults) { + result.await(1, TimeUnit.SECONDS); + } + } else { + fail("Should have invoked the first"); + } + } + @Test void testBulkhead() throws InterruptedException { + String name = "unit:testBulkhead"; Bulkhead bulkhead = Bulkhead.builder() .limit(1) .queueLength(1) + .name(name) .build(); Request inProgress = new Request(0); @@ -79,11 +125,11 @@ class BulkheadTest { assertThat(cause, notNullValue()); assertThat(cause, instanceOf(BulkheadException.class)); - assertThat(cause.getMessage(), is("Bulkhead queue is full")); + assertThat(cause.getMessage(), is("Bulkhead queue \"" + name + "\" is full")); } @Test - void testBulkheadWithError() throws InterruptedException { + void testBulkheadWithError() { Bulkhead bulkhead = Bulkhead.builder() .limit(1) .queueLength(1) @@ -99,7 +145,59 @@ class BulkheadTest { inProgress.releaseCdl.countDown(); FaultToleranceTest.completionException(result, IllegalStateException.class); + } + @Test + void testBulkheadWithMulti() { + Bulkhead bulkhead = Bulkhead.builder() + .limit(1) + .queueLength(1) + .build(); + Single result = bulkhead.invoke(() -> Single.error(new IllegalStateException())); + FaultToleranceTest.completionException(result, IllegalStateException.class); + + MultiRequest inProgress = new MultiRequest(0, 5); + Multi multi = bulkhead.invokeMulti(inProgress::invoke); + + // queued + result = bulkhead.invoke(() -> Single.error(new IllegalStateException())); + inProgress.releaseCdl.countDown(); + List allInts = multi.collectList() + .await(1, TimeUnit.SECONDS); + assertThat(allInts, contains(0, 1, 2, 3, 4)); + + FaultToleranceTest.completionException(result, IllegalStateException.class); + } + + private static class MultiRequest { + private final CountDownLatch releaseCdl = new CountDownLatch(1); + private final CountDownLatch invokedCdl = new CountDownLatch(1); + private final AtomicBoolean invoked = new AtomicBoolean(); + private final AtomicBoolean indexed = new AtomicBoolean(); + + private final int index; + private int count; + + MultiRequest(int index, int count) { + this.index = index; + this.count = count; + } + + Flow.Publisher invoke() { + invoked.set(true); + invokedCdl.countDown(); + return FaultTolerance.async(this::index) + .flatMap(it -> Multi.create(IntStream.range(it, it + count).boxed())); + } + + private int index() { + try { + releaseCdl.await(); + } catch (InterruptedException e) { + } + indexed.set(true); + return index; + } } private static class Request { diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/CircuitBreakerTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/CircuitBreakerTest.java index d68653a30..f2469dca9 100644 --- a/fault-tolerance/src/test/java/io/helidon/faulttolerance/CircuitBreakerTest.java +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/CircuitBreakerTest.java @@ -17,16 +17,19 @@ package io.helidon.faulttolerance; import java.time.Duration; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -37,7 +40,7 @@ class CircuitBreakerTest { void testCircuitBreaker() throws InterruptedException { CircuitBreaker breaker = CircuitBreaker.builder() .volume(10) - .ratio(20) + .errorRatio(20) .delay(Duration.ofMillis(200)) .successThreshold(2) .build(); @@ -48,19 +51,21 @@ class CircuitBreakerTest { bad(breaker); good(breaker); + goodMulti(breaker); // should open the breaker bad(breaker); breakerOpen(breaker); + breakerOpenMulti(breaker); assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); // need to wait until half open int count = 0; - while(count++ < 10) { + while (count++ < 10) { Thread.sleep(50); - if(breaker.state() == CircuitBreaker.State.HALF_OPEN) { + if (breaker.state() == CircuitBreaker.State.HALF_OPEN) { break; } } @@ -80,15 +85,15 @@ class CircuitBreakerTest { // need to wait until half open count = 0; - while(count++ < 10) { + while (count++ < 10) { Thread.sleep(50); - if(breaker.state() == CircuitBreaker.State.HALF_OPEN) { + if (breaker.state() == CircuitBreaker.State.HALF_OPEN) { break; } } good(breaker); - bad(breaker); + badMulti(breaker); assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); } @@ -104,6 +109,18 @@ class CircuitBreakerTest { assertThat(cause, instanceOf(CircuitBreakerOpenException.class)); } + private void breakerOpenMulti(CircuitBreaker breaker) { + Multi good = Multi.just(0, 1, 2); + Multi result = breaker.invokeMulti(() -> good); + CompletionException exception = assertThrows(CompletionException.class, + () -> result.collectList().await(1, TimeUnit.SECONDS)); + + Throwable cause = exception.getCause(); + + assertThat(cause, notNullValue()); + assertThat(cause, instanceOf(CircuitBreakerOpenException.class)); + } + private void bad(CircuitBreaker breaker) { Failing failing = new Failing(new IllegalStateException("Fail")); Single failedResult = breaker.invoke(failing::invoke); @@ -116,12 +133,34 @@ class CircuitBreakerTest { } + private void badMulti(CircuitBreaker breaker) { + Multi failing = Multi.error(new IllegalStateException("Fail")); + Multi failedResult = breaker.invokeMulti(() -> failing); + + CompletionException exception = assertThrows(CompletionException.class, + () -> failedResult.collectList().await(1, TimeUnit.SECONDS)); + + Throwable cause = exception.getCause(); + + assertThat(cause, notNullValue()); + assertThat(cause, instanceOf(IllegalStateException.class)); + + } + private void good(CircuitBreaker breaker) { Request good = new Request(); Single result = breaker.invoke(good::invoke); result.await(1, TimeUnit.SECONDS); } + private void goodMulti(CircuitBreaker breaker) { + Multi good = Multi.just(0, 1, 2); + Multi result = breaker.invokeMulti(() -> good); + List list = result.collectList().await(1, TimeUnit.SECONDS); + + assertThat(list, contains(0, 1, 2)); + } + private static class Failing { private final Exception exception; diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/DelayRetryPolicyTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/DelayRetryPolicyTest.java new file mode 100644 index 000000000..31e2347a1 --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/DelayRetryPolicyTest.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.time.Duration; +import java.util.Optional; + +import org.junit.jupiter.api.Test; + +import static io.helidon.config.testing.OptionalMatcher.empty; +import static io.helidon.config.testing.OptionalMatcher.value; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class DelayRetryPolicyTest { + @Test + void testDelay() { + Retry.DelayingRetryPolicy policy = Retry.DelayingRetryPolicy.builder() + .delay(Duration.ofMillis(100)) + .calls(3) + .delayFactor(3) + .build(); + + long firstCall = System.nanoTime(); + + Optional aLong = policy.nextDelayMillis(firstCall, 0, 0); + assertThat(aLong, value(is(100L))); + aLong = policy.nextDelayMillis(firstCall, 100, 1); + assertThat(aLong, value(is(300L))); + aLong = policy.nextDelayMillis(firstCall, 100, 2); // should just apply factor on last delay + assertThat(aLong, value(is(300L))); + aLong = policy.nextDelayMillis(firstCall, 100, 3); // limit of calls + assertThat(aLong, is(empty())); + } + + @Test + void testNoDelay() { + Retry.DelayingRetryPolicy policy = Retry.DelayingRetryPolicy.builder() + .delay(Duration.ZERO) + .calls(3) + .delayFactor(3) + .build(); + + long firstCall = System.nanoTime(); + + Optional aLong = policy.nextDelayMillis(firstCall, 0, 0); + assertThat(aLong, value(is(0L))); + aLong = policy.nextDelayMillis(firstCall, 0, 1); + assertThat(aLong, value(is(0L))); + aLong = policy.nextDelayMillis(firstCall, 0, 2); // should just apply factor on last delay + assertThat(aLong, value(is(0L))); + aLong = policy.nextDelayMillis(firstCall, 100, 3); // limit of calls + assertThat(aLong, is(empty())); + } +} diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/JitterRetryPolicyTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/JitterRetryPolicyTest.java new file mode 100644 index 000000000..10257de8b --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/JitterRetryPolicyTest.java @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.time.Duration; +import java.util.Optional; + +import io.helidon.faulttolerance.Retry.JitterRetryPolicy; + +import org.junit.jupiter.api.Test; + +import static io.helidon.config.testing.OptionalMatcher.empty; +import static io.helidon.config.testing.OptionalMatcher.present; +import static io.helidon.config.testing.OptionalMatcher.value; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; + +public class JitterRetryPolicyTest { + @Test + void testFixedDelay() { + JitterRetryPolicy policy = JitterRetryPolicy.builder() + .jitter(Duration.ZERO) + .delay(Duration.ofMillis(100)) + .calls(3) + .build(); + + long firstCall = System.nanoTime(); + Optional aLong = policy.nextDelayMillis(firstCall, 0, 0); + assertThat(aLong, value(is(100L))); + aLong = policy.nextDelayMillis(firstCall, aLong.get(), 1); + assertThat(aLong, value(is(100L))); + } + + @Test + void testRepeats() { + JitterRetryPolicy policy = JitterRetryPolicy.builder() + .jitter(Duration.ZERO) + .delay(Duration.ofMillis(100)) + .calls(3) + .build(); + + for (int i = 0; i < 100; i++) { + long firstCall = System.nanoTime(); + // running in cycle to ensure we do not store state + assertThat(policy.nextDelayMillis(firstCall, 0, 1), value(is(100L))); + assertThat(policy.nextDelayMillis(firstCall, 0, 2), value(is(100L))); + assertThat(policy.nextDelayMillis(firstCall, 0, 3), is(empty())); + assertThat(policy.nextDelayMillis(firstCall, 0, 1), value(is(100L))); + assertThat(policy.nextDelayMillis(firstCall, 0, 3), is(empty())); + } + } + + @Test + void testRandomDelay() { + JitterRetryPolicy policy = JitterRetryPolicy.builder() + .jitter(Duration.ofMillis(50)) + .delay(Duration.ofMillis(100)) + .calls(3) + .build(); + + long firstCall = System.nanoTime(); + + boolean hadNegative = false; + boolean hadPositive = false; + for (int i = 0; i < 10000; i++) { + Optional aLong = policy.nextDelayMillis(firstCall, 0, 0); + assertThat(aLong, present()); + long value = aLong.get(); + assertThat(value, is(both(greaterThan(49L)).and(lessThan(151L)))); + if (value < 100) { + hadNegative = true; + } else if (value > 100) { + hadPositive = true; + } + } + + assertThat("In 10000 tries we should get at least one negative jitter", hadNegative, is(true)); + assertThat("In 10000 tries we should get at least one positive jitter", hadPositive, is(true)); + } + + @Test + void testNoDelayJitter() { + JitterRetryPolicy policy = JitterRetryPolicy.builder() + .jitter(Duration.ofMillis(50)) + .delay(Duration.ZERO) + .calls(3) + .build(); + + long firstCall = System.nanoTime(); + + boolean hadPositive = false; + boolean hadZero = false; + for (int i = 0; i < 10000; i++) { + Optional aLong = policy.nextDelayMillis(firstCall, 0, 0); + assertThat(aLong, present()); + long value = aLong.get(); + assertThat(value, is(both(greaterThan(-1L)).and(lessThan(50L)))); + if (value == 0) { + hadZero = true; + } else if (value > 0) { + hadPositive = true; + } + } + + assertThat("In 10000 tries we should get at least one negative jitter", hadZero, is(true)); + assertThat("In 10000 tries we should get at least one positive jitter", hadPositive, is(true)); + } + + @Test + void testNoDelay() { + JitterRetryPolicy policy = JitterRetryPolicy.builder() + .jitter(Duration.ZERO) + .delay(Duration.ZERO) + .calls(3) + .build(); + + + long firstCall = System.nanoTime(); + + Optional aLong = policy.nextDelayMillis(firstCall, 0, 0); + assertThat(aLong, value(is(0L))); + aLong = policy.nextDelayMillis(firstCall, 0, 1); + assertThat(aLong, value(is(0L))); + aLong = policy.nextDelayMillis(firstCall, 0, 2); // should just apply factor on last delay + assertThat(aLong, value(is(0L))); + aLong = policy.nextDelayMillis(firstCall, 100, 3); // limit of calls + assertThat(aLong, is(empty())); + } +} diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/RetryTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/RetryTest.java index 644b92fc5..6c2d2f4ac 100644 --- a/fault-tolerance/src/test/java/io/helidon/faulttolerance/RetryTest.java +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/RetryTest.java @@ -17,27 +17,38 @@ package io.helidon.faulttolerance; import java.time.Duration; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; import org.junit.jupiter.api.Test; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.junit.jupiter.api.Assertions.assertThrows; class RetryTest { @Test void testRetry() { Retry retry = Retry.builder() - .calls(3) - .delay(Duration.ofMillis(50)) - .maxTime(Duration.ofMillis(500)) - .jitter(Duration.ofMillis(50)) + .retryPolicy(Retry.JitterRetryPolicy.builder() + .calls(3) + .delay(Duration.ofMillis(50)) + .jitter(Duration.ofMillis(50)) + .build()) + .overallTimeout(Duration.ofMillis(500)) .build(); Request req = new Request(3, new TerminalException(), new RetryException()); @@ -54,11 +65,13 @@ class RetryTest { @Test void testRetryOn() { Retry retry = Retry.builder() - .calls(3) - .delay(Duration.ofMillis(100)) - .maxTime(Duration.ofMillis(500)) - .jitter(Duration.ofMillis(50)) - .addRetryOn(RetryException.class) + .retryPolicy(Retry.JitterRetryPolicy.builder() + .calls(3) + .delay(Duration.ofMillis(100)) + .jitter(Duration.ofMillis(50)) + .build()) + .overallTimeout(Duration.ofMillis(500)) + .addApplyOn(RetryException.class) .build(); Request req = new Request(3, new RetryException(), new RetryException()); @@ -80,11 +93,13 @@ class RetryTest { @Test void testAbortOn() { Retry retry = Retry.builder() - .calls(3) - .delay(Duration.ofMillis(100)) - .maxTime(Duration.ofMillis(500)) - .jitter(Duration.ofMillis(50)) - .addAbortOn(TerminalException.class) + .retryPolicy(Retry.JitterRetryPolicy.builder() + .calls(3) + .delay(Duration.ofMillis(100)) + .jitter(Duration.ofMillis(50)) + .build()) + .overallTimeout(Duration.ofMillis(500)) + .addSkipOn(TerminalException.class) .build(); Request req = new Request(3, new RetryException(), new RetryException()); @@ -106,29 +121,179 @@ class RetryTest { @Test void testTimeout() { Retry retry = Retry.builder() - .calls(3) - .delay(Duration.ofMillis(50)) - .maxTime(Duration.ofMillis(45)) - .jitter(Duration.ofMillis(1)) + .retryPolicy(Retry.JitterRetryPolicy.builder() + .calls(3) + .delay(Duration.ofMillis(100)) + .jitter(Duration.ZERO) + .build()) + .overallTimeout(Duration.ofMillis(50)) .build(); Request req = new Request(3, new RetryException(), new RetryException()); Single result = retry.invoke(req::invoke); FaultToleranceTest.completionException(result, TimeoutException.class); - assertThat(req.call.get(), is(2)); + assertThat("Should have been called once", req.call.get(), is(1)); } @Test void testBadConfiguration() { Retry.Builder builder = Retry.builder() - .retryOn(RetryException.class) - .abortOn(TerminalException.class); + .applyOn(RetryException.class) + .skipOn(TerminalException.class); assertThrows(IllegalArgumentException.class, builder::build); } + @Test + void testMultiRetriesNoFailure() throws InterruptedException { + Retry retry = Retry.builder() + .retryPolicy(Retry.DelayingRetryPolicy.noDelay(3)) + .build(); - private static class Request { + Multi multi = retry.invokeMulti(() -> Multi.just(0, 1, 2)); + + TestSubscriber ts = new TestSubscriber(); + multi.subscribe(ts); + ts.request(100); + + ts.cdl.await(1, TimeUnit.SECONDS); + + assertThat("Should be completed", ts.completed.get(), is(true)); + assertThat("Should not be failed", ts.failed.get(), is(false)); + assertThat(ts.values, contains(0, 1, 2)); + } + + @Test + void testMultiRetries() throws InterruptedException { + Retry retry = Retry.builder() + .retryPolicy(Retry.DelayingRetryPolicy.noDelay(3)) + .build(); + + AtomicInteger count = new AtomicInteger(); + + Multi multi = retry.invokeMulti(() -> { + if (count.getAndIncrement() < 2) { + return Multi.error(new RetryException()); + } else { + return Multi.just(0, 1, 2); + } + }); + + TestSubscriber ts = new TestSubscriber(); + multi.subscribe(ts); + ts.request(100); + + ts.cdl.await(1, TimeUnit.SECONDS); + + assertThat("Should be completed", ts.completed.get(), is(true)); + assertThat("Should not be failed", ts.failed.get(), is(false)); + assertThat(ts.values, contains(0, 1, 2)); + } + + @Test + void testMultiRetriesRead() throws InterruptedException { + Retry retry = Retry.builder() + .retryPolicy(Retry.DelayingRetryPolicy.noDelay(3)) + .build(); + + AtomicInteger count = new AtomicInteger(); + + TestSubscriber ts = new TestSubscriber(); + + Multi multi = retry.invokeMulti(() -> { + if (count.getAndIncrement() == 0) { + //return new PartialPublisher(); + return Multi.concat(Multi.just(0), Multi.error(new RetryException())); + } else { + TestSubscriber it = ts; + return Multi.just(0, 1, 2); + } + }); + + +// multi.onError(Throwable::printStackTrace) +// .forEach(System.out::println); +// +// multi = Multi.concat(Multi.just(0), Multi.error(new RetryException())); + + multi.subscribe(ts); + ts.request(2); + + ts.cdl.await(1, TimeUnit.SECONDS); + + assertThat("Should be failed", ts.failed.get(), is(true)); + assertThat(ts.throwable.get(), instanceOf(RetryException.class)); + assertThat("Should not be completed", ts.completed.get(), is(false)); + + } + + private static class PartialPublisher implements Flow.Publisher { + @Override + public void subscribe(Flow.Subscriber subscriber) { + subscriber.onSubscribe(new Flow.Subscription() { + @Override + public void request(long n) { + subscriber.onNext(1); + subscriber.onError(new RetryException()); + } + + @Override + public void cancel() { + + } + }); + } + } + + private static class TestSubscriber implements Flow.Subscriber { + private final AtomicBoolean failed = new AtomicBoolean(); + private final AtomicReference throwable = new AtomicReference<>(); + private final AtomicBoolean completed = new AtomicBoolean(); + private final List values = new LinkedList<>(); + private final AtomicBoolean finished = new AtomicBoolean(); + private final CountDownLatch cdl = new CountDownLatch(1); + + private Flow.Subscription subscription; + + void request(long n) { + subscription.request(n); + } + + void cancel() { + subscription.cancel(); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + } + + @Override + public void onNext(Integer item) { + values.add(item); + } + + @Override + public void onError(Throwable throwable) { + failed.set(true); + this.throwable.set(throwable); + finish(); + } + + @Override + public void onComplete() { + this.completed.set(true); + finish(); + } + + private void finish() { + if (finished.compareAndSet(false, true)) { + cdl.countDown(); + } + } + } + + private static class Request { private final AtomicInteger call = new AtomicInteger(); private final int failures; private final RuntimeException first; @@ -161,6 +326,5 @@ class RetryTest { } private static class TerminalException extends RuntimeException { - } } \ No newline at end of file diff --git a/fault-tolerance/src/test/resources/logging.properties b/fault-tolerance/src/test/resources/logging.properties new file mode 100644 index 000000000..c1ece269d --- /dev/null +++ b/fault-tolerance/src/test/resources/logging.properties @@ -0,0 +1,32 @@ +# +# Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Example Logging Configuration File +# For more information see $JAVA_HOME/jre/lib/logging.properties + +# Send messages to the console +handlers=io.helidon.common.HelidonConsoleHandler + +# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread +java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n + +# Global logging level. Can be overridden by specific loggers +.level=WARNING + +io.helidon.level=INFO +io.helidon.faulttolerance.level=FINEST + +