diff --git a/common/configurable/src/main/java/io/helidon/common/configurable/ScheduledThreadPoolSupplier.java b/common/configurable/src/main/java/io/helidon/common/configurable/ScheduledThreadPoolSupplier.java index f4a7b5680..3b5dc4e6c 100644 --- a/common/configurable/src/main/java/io/helidon/common/configurable/ScheduledThreadPoolSupplier.java +++ b/common/configurable/src/main/java/io/helidon/common/configurable/ScheduledThreadPoolSupplier.java @@ -16,7 +16,6 @@ package io.helidon.common.configurable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -32,7 +31,7 @@ import io.helidon.config.Config; * Supplier of a custom scheduled thread pool. * The returned thread pool supports {@link io.helidon.common.context.Context} propagation. */ -public final class ScheduledThreadPoolSupplier implements Supplier { +public final class ScheduledThreadPoolSupplier implements Supplier { private static final int EXECUTOR_DEFAULT_CORE_POOL_SIZE = 16; private static final boolean EXECUTOR_DEFAULT_IS_DAEMON = true; private static final String EXECUTOR_DEFAULT_THREAD_NAME_PREFIX = "helidon-"; diff --git a/fault-tolerance/DESIGN.md b/fault-tolerance/DESIGN.md new file mode 100644 index 000000000..8c0262c7e --- /dev/null +++ b/fault-tolerance/DESIGN.md @@ -0,0 +1,77 @@ +Fault Tolerance +--- + +Fault tolerance (FT) covers a wide area of features. The following document +describes each FT feature and its API for Helidon SE (or approach to use to achieve such a feature using +existing APIs). + +# Common API +The FT requires executor service (or more) to handle some of the features provided. To be able to +configure FT for the whole application, a set of static methods exists on class `FaultTolerance`. + +- `FaultTolerance.config(Config)` - use a Helidon config instance to configure defaults + +# Asynchronous +Provides an asynchronous execution for a blocking operation. As Helidon SE network stack is using a +non blocking reactive API, applications cannot block the threads. You can use this API to execute a blocking +operation in a separate executor service and obtain its result as a Helidon reactive `Single`. + +Configuration: +- executor service + +# Bulkhead +Limits the number of parallel calls to a single resource. + +Configuration: +- parallel execution limit +- executor service +- queue +- number of queued records + +# Circuit Breaker +Defines a circuit breaker policy to an individual method or a class. +A circuit breaker aims to prevent further damage by not executing functionality that is doomed to fail. After a failure situation has been detected, circuit breakers prevent methods from being executed and instead throw exceptions immediately. After a certain delay or wait time, the functionality is attempted to be executed again. +A circuit breaker can be in one of the following states: +Closed: In normal operation, the circuit is closed. If a failure occurs, the Circuit Breaker records the event. In closed state the requestVolumeThreshold and failureRatio parameters may be configured in order to specify the conditions under which the breaker will transition the circuit to open. If the failure conditions are met, the circuit will be opened. +Open: When the circuit is open, calls to the service operating under the circuit breaker will fail immediately. A delay may be configured for the circuit breaker. After the specified delay, the circuit transitions to half-open state. +Half-open: In half-open state, trial executions of the service are allowed. By default one trial call to the service is permitted. If the call fails, the circuit will return to open state. The successThreshold parameter allows the configuration of the number of trial executions that must succeed before the circuit can be closed. After the specified number of successful executions, the circuit will be closed. If a failure occurs before the successThreshold is reached the circuit will transition to open. +Circuit state transitions will reset the circuit breaker's records. + +Configuration: +- fail on `` - these are failures +- skip on `` - these are not failures +- delay (duration) - how long before transitioning from open to half-open +- volume threshold - rolling window size +- ratio (percentage) - how many failures will trigger this to open +- success threshold - how many successful calls will close a half-open breaker + +# Fallback +What to call when this fails. +Could be replaced with `onException` of `CompletionStage` + +May provide a context of execution with information such as + - `Class` + - some description of the method called (not reflection - we want to avoid reflection at all costs) + - parameter values posted to original method + +Configuration: +- fallback method/handler +- applyOn `` - these are failures +- skip on `` - these are not failures + +# Retry +Retry execution. + +Configuration: +- maximal number of retries +- delay between retries (duration) +- overall maximal duration +- jitter (randomize delays a bit - duration) - a jitter of 200 ms will randomly add between -200 and 200 milliseconds to each retry delay. +- retry on `` - these are failures +- abort on `` - these will immediately abort retries + +# Timeout +Can be simply replaced with `Single.timeout` + +Configuration: +- overall timeout (duration) diff --git a/fault-tolerance/pom.xml b/fault-tolerance/pom.xml new file mode 100644 index 000000000..4383369e0 --- /dev/null +++ b/fault-tolerance/pom.xml @@ -0,0 +1,53 @@ + + + + + 4.0.0 + + io.helidon + helidon-project + 2.0.1-SNAPSHOT + + + io.helidon.fault-tolerance + helidon-fault-tolerance + Helidon Fault Tolerance + + + + io.helidon.config + helidon-config + + + io.helidon.common + helidon-common-configurable + + + + org.hamcrest + hamcrest-all + test + + + org.junit.jupiter + junit-jupiter-api + test + + + \ No newline at end of file diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Async.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Async.java new file mode 100644 index 000000000..6ce8a96de --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Async.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; + +import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Single; + +public class Async { + private final LazyValue executor; + + public Async(Builder builder) { + this.executor = LazyValue.create(builder.executor); + } + + public static Async create() { + return builder().build(); + } + + public Single invoke(Supplier supplier) { + CompletableFuture future = new CompletableFuture<>(); + + executor.get() + .submit(() -> { + try { + T result = supplier.get(); + future.complete(result); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + + return Single.create(future); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder implements io.helidon.common.Builder { + private LazyValue executor = FaultTolerance.executor(); + + private Builder() { + } + + @Override + public Async build() { + return new Async(this); + } + + public Builder executor(Supplier executor) { + this.executor = LazyValue.create(executor); + return this; + } + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/AtomicCycle.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/AtomicCycle.java new file mode 100644 index 000000000..5b7961160 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/AtomicCycle.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.atomic.AtomicInteger; + +final class AtomicCycle { + private final AtomicInteger atomicInteger = new AtomicInteger(-1); + private final int maxIndex; + + AtomicCycle(int maxIndex) { + this.maxIndex = maxIndex; + } + + int incrementAndGet() { + int currentIndex; + int nextIndex; + do { + currentIndex = atomicInteger.get(); + nextIndex = (currentIndex == maxIndex) ? 0 : currentIndex + 1; + } while (!atomicInteger.compareAndSet(currentIndex, nextIndex)); + + return nextIndex; + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Bulkhead.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Bulkhead.java new file mode 100644 index 000000000..4faf8c9a5 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Bulkhead.java @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.function.Supplier; + +import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Single; + +public class Bulkhead implements Handler { + private final Queue> queue; + private final Semaphore inProgress; + + private Bulkhead(Builder builder) { + this.inProgress = new Semaphore(builder.limit, true); + if (builder.queueLength == 0) { + queue = new NoQueue(); + } else { + this.queue = new LinkedBlockingQueue<>(builder.queueLength); + } + } + + public static Builder builder() { + return new Builder(); + } + + @SuppressWarnings("unchecked") + @Override + public Single invoke(Supplier> supplier) { + if (inProgress.tryAcquire()) { + CompletionStage result = supplier.get(); + + result.handle((it, throwable) -> { + // we still have an acquired semaphore + Enqueued polled = queue.poll(); + while (polled != null) { + invokeEnqueued((Enqueued) polled); + polled = queue.poll(); + } + inProgress.release(); + return null; + }); + + return Single.create(result); + } else { + Enqueued enqueued = new Enqueued<>(supplier); + if (!queue.offer(enqueued)) { + return Single.error(new BulkheadException("Bulkhead queue is full")); + } + return Single.create(enqueued.future()); + } + } + + private void invokeEnqueued(Enqueued enqueued) { + CompletableFuture future = enqueued.future(); + CompletionStage completionStage = enqueued.originalStage(); + completionStage.thenAccept(future::complete); + completionStage.exceptionally(throwable -> { + future.completeExceptionally(throwable); + return null; + }); + } + + private static class Enqueued { + private LazyValue> resultFuture = LazyValue.create(CompletableFuture::new); + private Supplier> supplier; + + private Enqueued(Supplier> supplier) { + this.supplier = supplier; + } + + private CompletableFuture future() { + return resultFuture.get(); + } + + private CompletionStage originalStage() { + return supplier.get(); + } + } + + public static class Builder implements io.helidon.common.Builder { + private int limit; + private int queueLength = 10; + + private Builder() { + } + + @Override + public Bulkhead build() { + return new Bulkhead(this); + } + + public Builder limit(int limit) { + this.limit = limit; + return this; + } + + public Builder queueLength(int queueLength) { + this.queueLength = queueLength; + return this; + } + } + + private static class NoQueue extends ArrayDeque> { + @Override + public boolean offer(Enqueued enqueued) { + return false; + } + + @Override + public Enqueued poll() { + return null; + } + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadException.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadException.java new file mode 100644 index 000000000..b2615ed06 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadException.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +public class BulkheadException extends RuntimeException { + public BulkheadException(String message) { + super(message); + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreaker.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreaker.java new file mode 100644 index 000000000..d18d1f59d --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreaker.java @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Single; + +import static io.helidon.faulttolerance.ResultWindow.Result.FAILURE; +import static io.helidon.faulttolerance.ResultWindow.Result.SUCCESS; + +public class CircuitBreaker implements Handler { + /* + Configuration options + */ + private final LazyValue executor; + // how long to transition from open to half-open + private final long delayMillis; + // how many successful calls will close a half-open breaker + private final int successThreshold; + + /* + Runtime + */ + private final AtomicReference state = new AtomicReference<>(State.CLOSED); + // rolling window for counting errors to (maybe) open the breaker + private final ResultWindow results; + // to close from half-open + private final AtomicInteger successCounter = new AtomicInteger(); + private final AtomicBoolean halfOpenInProgress = new AtomicBoolean(); + private final AtomicReference> schedule = new AtomicReference<>(); + + private CircuitBreaker(Builder builder) { + this.delayMillis = builder.delay.toMillis(); + this.successThreshold = builder.successThreshold; + this.results = new ResultWindow(builder.volume, builder.ratio); + this.executor = builder.executor(); + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public Single invoke(Supplier> supplier) { + if (state.get() == State.CLOSED) { + // run it! + CompletionStage result = supplier.get(); + result.handle((it, exception) -> { + if (exception == null) { + // success + results.update(SUCCESS); + } else { + results.update(FAILURE); + if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) { + results.reset(); + // if we successfully switch to open, we need to schedule switch to half-open + scheduleHalf(); + } + } + + return it; + }); + return Single.create(result); + } else if (state.get() == State.OPEN) { + // fail it! + return Single.error(new CircuitBreakerOpenException("CircuitBreaker is open")); + } else { + // half-open + if (halfOpenInProgress.compareAndSet(false, true)) { + CompletionStage result = supplier.get(); + result.handle((it, exception) -> { + if (exception == null) { + // success + int successes = successCounter.incrementAndGet(); + if (successes >= successThreshold) { + // transition to closed + successCounter.set(0); + state.compareAndSet(State.HALF_OPEN, State.CLOSED); + halfOpenInProgress.set(false); + } + halfOpenInProgress.set(false); + } else { + // failure + successCounter.set(0); + state.set(State.OPEN); + halfOpenInProgress.set(false); + // if we successfully switch to open, we need to schedule switch to half-open + scheduleHalf(); + } + + return it; + }); + return Single.create(result); + } else { + return Single + .error(new CircuitBreakerOpenException("CircuitBreaker is half open, parallel execution in progress")); + } + } + } + + private void scheduleHalf() { + schedule.set(executor.get() + .schedule(() -> { + state.compareAndSet(State.OPEN, State.HALF_OPEN); + schedule.set(null); + return true; + }, delayMillis, TimeUnit.MILLISECONDS)); + } + + public State state() { + return state.get(); + } + + public void state(State newState) { + if (newState == State.CLOSED) { + if (state.get() == State.CLOSED) { + // fine + resetCounters(); + return; + } + + ScheduledFuture future = schedule.getAndSet(null); + if (future != null) { + future.cancel(false); + } + resetCounters(); + state.set(State.CLOSED); + } else if (newState == State.OPEN) { + state.set(State.OPEN); + ScheduledFuture future = schedule.getAndSet(null); + if (future != null) { + future.cancel(false); + } + resetCounters(); + } else { + // half open + resetCounters(); + } + } + + private void resetCounters() { + results.reset(); + successCounter.set(0); + } + + public static class Builder implements io.helidon.common.Builder { + // how long to transition from open to half-open + private Duration delay = Duration.ofSeconds(5); + // how many percents of failures will open the breaker + private int ratio = 60; + // how many successful calls will close a half-open breaker + private int successThreshold = 1; + // rolling window size to + private int volume = 10; + private LazyValue executor = FaultTolerance.scheduledExecutor(); + + private Builder() { + } + + @Override + public CircuitBreaker build() { + return new CircuitBreaker(this); + } + + public Builder delay(Duration delay) { + this.delay = delay; + return this; + } + + public Builder ratio(int ratio) { + this.ratio = ratio; + return this; + } + + public Builder successThreshold(int successThreshold) { + this.successThreshold = successThreshold; + return this; + } + + public Builder volume(int volume) { + this.volume = volume; + return this; + } + + LazyValue executor() { + return executor; + } + } + + public enum State { + CLOSED, + HALF_OPEN, + OPEN + } + +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerOpenException.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerOpenException.java new file mode 100644 index 000000000..130de528c --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/CircuitBreakerOpenException.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +public class CircuitBreakerOpenException extends RuntimeException { + public CircuitBreakerOpenException(String message) { + super(message); + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java new file mode 100644 index 000000000..1e8edf971 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; +import java.util.function.Supplier; + +import io.helidon.common.reactive.Single; + +public class Fallback implements TypedHandler { + private final Function> fallback; + + private Fallback(Builder builder) { + this.fallback = builder.fallback; + } + + public static Builder builder() { + return new Builder<>(); + } + + @Override + public Single invoke(Supplier> supplier) { + CompletableFuture future = new CompletableFuture<>(); + + supplier.get() + .thenAccept(future::complete) + .exceptionally(throwable -> { + Throwable cause = FaultTolerance.getCause(throwable); + fallback.apply(cause) + .thenAccept(future::complete) + .exceptionally(t2 -> { + Throwable cause2 = FaultTolerance.getCause(t2); + cause2.addSuppressed(throwable); + future.completeExceptionally(cause2); + return null; + }); + return null; + }); + + return Single.create(future); + } + + public static class Builder implements io.helidon.common.Builder> { + private Function> fallback; + + private Builder() { + } + + @Override + public Fallback build() { + Objects.requireNonNull(fallback, "Fallback method must be specified"); + return new Fallback<>(this); + } + + public Builder fallback(Function> fallback) { + this.fallback = fallback; + return this; + } + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java new file mode 100644 index 000000000..9d4c56e85 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java @@ -0,0 +1,250 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; + +import io.helidon.common.LazyValue; +import io.helidon.common.configurable.ScheduledThreadPoolSupplier; +import io.helidon.common.configurable.ThreadPoolSupplier; +import io.helidon.common.reactive.Single; +import io.helidon.config.Config; + +/** + * Access to fault tolerance features. + */ +public final class FaultTolerance { + private static final AtomicReference> SCHEDULED_EXECUTOR = + new AtomicReference<>(); + private static final AtomicReference> EXECUTOR = new AtomicReference<>(); + private static final AtomicReference CONFIG = new AtomicReference<>(Config.empty()); + + static { + SCHEDULED_EXECUTOR.set(LazyValue.create(ScheduledThreadPoolSupplier.builder() + .threadNamePrefix("ft-schedule-") + .config(CONFIG.get().get("scheduled-executor")) + .build())); + + EXECUTOR.set(LazyValue.create(ThreadPoolSupplier.builder() + .threadNamePrefix("ft-") + .config(CONFIG.get().get("executor")) + .build())); + } + + private FaultTolerance() { + } + + public static void config(Config config) { + CONFIG.set(config); + + SCHEDULED_EXECUTOR.set(LazyValue.create(ScheduledThreadPoolSupplier.create(CONFIG.get().get("scheduled-executor")))); + EXECUTOR.set(LazyValue.create(ThreadPoolSupplier.create(CONFIG.get().get("executor")))); + } + + public static void executor(Supplier executor) { + EXECUTOR.set(LazyValue.create(executor::get)); + } + + public static void scheduledExecutor(Supplier executor) { + SCHEDULED_EXECUTOR.set(LazyValue.create(executor)); + } + + static LazyValue executor() { + return EXECUTOR.get(); + } + + static LazyValue scheduledExecutor() { + return SCHEDULED_EXECUTOR.get(); + } + + public static Single async(Supplier syncSupplier) { + return Async.builder() + .executor(EXECUTOR.get()) + .build() + .invoke(syncSupplier); + } + + public static Single fallback(Supplier> primary, + Function> fallback) { + return Fallback.builder() + .fallback(fallback) + .build() + .invoke(primary); + } + + @SuppressWarnings("unchecked") + public static Single retry(Supplier> command) { + return Retry.builder() + .build() + .invoke(command); + } + + public static Single timeout(Duration timeout, + Supplier> command) { + Timeout ft = Timeout.builder() + .timeout(timeout) + .build(); + + return ft.invoke(command); + } + + public static Builder builder() { + return new Builder(); + } + + static Throwable getCause(Throwable throwable) { + if (throwable instanceof CompletionException) { + return getCause(throwable.getCause()); + } + if (throwable instanceof ExecutionException) { + return getCause(throwable.getCause()); + } + return throwable; + } + + abstract static class BaseBuilder> { + @SuppressWarnings("unchecked") + private B me() { + return (B) this; + } + + public B addBulkhead(Bulkhead bulkhead) { + add(bulkhead); + return me(); + } + + public B addBreaker(CircuitBreaker breaker) { + add(breaker); + return me(); + } + + public B addTimeout(Timeout timeout) { + add(timeout); + return me(); + } + + public B addRetry(Retry retry) { + add(retry); + return me(); + } + + public abstract void add(Handler ft); + } + + public static class TypedBuilder extends BaseBuilder> implements io.helidon.common.Builder> { + private final List> fts = new LinkedList<>(); + + private TypedBuilder() { + } + + @Override + public TypedHandler build() { + return new TypedHandlerImpl(fts); + } + + @Override + public void add(Handler ft) { + fts.add(ft::invoke); + } + + public TypedBuilder addFallback(Fallback fallback) { + fts.add(fallback); + return this; + } + + private TypedBuilder builder(Builder builder) { + builder.fts + .forEach(it -> { + fts.add(it::invoke); + }); + return this; + } + + private static class TypedHandlerImpl implements TypedHandler { + private final List> validFts; + + private TypedHandlerImpl(List> validFts) { + this.validFts = new LinkedList<>(validFts); + } + + @Override + public Single invoke(Supplier> supplier) { + Supplier> next = supplier; + + for (TypedHandler validFt : validFts) { + final var finalNext = next; + next = () -> validFt.invoke(finalNext); + } + + return Single.create(next.get()); + } + } + } + + public static class Builder extends BaseBuilder implements io.helidon.common.Builder { + private final List fts = new LinkedList<>(); + + private Builder() { + } + + @Override + public Handler build() { + return new HandlerImpl(fts); + } + + public TypedBuilder addFallback(Fallback fallback) { + return new TypedBuilder() + .builder(this) + .addFallback(fallback); + } + + @Override + public void add(Handler ft) { + fts.add(ft); + } + + private static class HandlerImpl implements Handler { + private final List validFts; + + private HandlerImpl(List validFts) { + this.validFts = new LinkedList<>(validFts); + } + + @Override + public Single invoke(Supplier> supplier) { + Supplier> next = supplier; + + for (Handler validFt : validFts) { + final var finalNext = next; + next = () -> validFt.invoke(finalNext); + } + + return Single.create(next.get()); + } + } + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Handler.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Handler.java new file mode 100644 index 000000000..6edca9fca --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Handler.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; + +import io.helidon.common.reactive.Single; + +@FunctionalInterface +public interface Handler { + Single invoke(Supplier> supplier); +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/ResultWindow.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/ResultWindow.java new file mode 100644 index 000000000..77decdac1 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/ResultWindow.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Floating window of results. + * The status is eventually consistent - there may be inconsistencies under heavier load (the sum may be off-beat + * from the actual results due to parallel execution). + * This should not be a significant issue, as the calculations work on a state (that may change anyway when checking + * whether to open the circuit). + */ +final class ResultWindow { + private final AtomicInteger currentSum = new AtomicInteger(); + private final AtomicCycle index; + private final AtomicInteger[] results; + private final int thresholdSum; + + ResultWindow(int size, int ratio) { + results = new AtomicInteger[size]; + for (int i = 0; i < size; i++) { + results[i] = new AtomicInteger(); + } + index = new AtomicCycle(size - 1); + // calculate the sum needed to open the breaker + int threshold = (size * ratio) / 100; + thresholdSum = threshold == 0 ? 1 : threshold; + + } + + void update(Result resultEnum) { + // success is zero, failure is 1 + int result = resultEnum.ordinal(); + + AtomicInteger mine = results[index.incrementAndGet()]; + int origValue = mine.getAndSet(result); + + if (origValue == result) { + // no change + return; + } + + if (origValue == 1) { + currentSum.decrementAndGet(); + } else { + currentSum.incrementAndGet(); + } + } + + boolean shouldOpen() { + return currentSum.get() >= thresholdSum; + } + + void reset() { + // "soft" reset - send in success equal to window size + for (int i = 0; i < results.length; i++) { + update(Result.SUCCESS); + } + } + + // order is significant, do not change + enum Result { + SUCCESS, + FAILURE + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Retry.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Retry.java new file mode 100644 index 000000000..720cea3f3 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Retry.java @@ -0,0 +1,252 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Single; + +public class Retry implements Handler { + private final int calls; + private final long delayMillis; + private final long maxTimeNanos; + private final int jitterMillis; + private final LazyValue scheduledExecutor; + private final Random random = new Random(); + + protected Retry(Builder builder) { + this.calls = builder.calls; + this.delayMillis = builder.delay.toMillis(); + this.maxTimeNanos = builder.maxTime.toNanos(); + long jitter = builder.jitter.toMillis() * 2; + this.jitterMillis = (jitter > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) jitter; + this.scheduledExecutor = builder.scheduledExecutor; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public Single invoke(Supplier> supplier) { + CompletableFuture future = new CompletableFuture<>(); + + new Retrier<>(future, supplier, this) + .retry(); + + return Single.create(future); + } + + protected boolean abort(Throwable throwable) { + return false; + } + + private long nextDelay() { + long delay = delayMillis; + int jitterRandom = random.nextInt(jitterMillis) - jitterMillis; + delay = delay + jitterRandom; + delay = Math.max(0, delay); + + return delay; + } + + private class Retrier { + private final AtomicInteger count = new AtomicInteger(); + private final AtomicReference lastThrowable = new AtomicReference<>(); + private final Supplier> supplier; + private final CompletableFuture future; + private final Retry retry; + private final long started = System.nanoTime(); + + private Retrier(CompletableFuture future, + Supplier> supplier, + Retry retry) { + this.future = future; + this.supplier = supplier; + this.retry = retry; + } + + private void retry() { + int currentCount = count.incrementAndGet(); + + CompletionStage stage = null; + try { + stage = supplier.get(); + } catch (Throwable e) { + stage = CompletableFuture.failedStage(e); + } + + stage.handle((it, throwable) -> { + if (throwable == null) { + future.complete(it); + } else { + Throwable current = FaultTolerance.getCause(throwable); + Throwable before = lastThrowable.get(); + if (before != null) { + current.addSuppressed(before); + } + + long now = System.nanoTime(); + if (currentCount >= retry.calls || retry.abort(current)) { + // this is final execution + future.completeExceptionally(current); + } else if (now - started > maxTimeNanos) { + TimeoutException timedOut = new TimeoutException("Retries timed out"); + timedOut.addSuppressed(current); + future.completeExceptionally(timedOut); + } else { + lastThrowable.set(current); + // schedule next retry + scheduledExecutor.get().schedule(this::retry, nextDelay(), TimeUnit.MILLISECONDS); + } + } + return null; + }); + } + } + + private static class RetryWithAbortOn extends Retry { + private final Set> abortOn; + + private RetryWithAbortOn(Builder builder) { + super(builder); + this.abortOn = Set.copyOf(builder.abortOn); + } + + @Override + protected boolean abort(Throwable throwable) { + return abortOn.contains(throwable.getClass()); + } + } + + private static class RetryWithRetryOn extends Retry { + private final Set> retryOn; + + private RetryWithRetryOn(Builder builder) { + super(builder); + this.retryOn = Set.copyOf(builder.retryOn); + } + + @Override + protected boolean abort(Throwable throwable) { + return !retryOn.contains(throwable.getClass()); + } + } + + public static class Builder implements io.helidon.common.Builder { + private final Set> retryOn = new HashSet<>(); + private final Set> abortOn = new HashSet<>(); + + private int calls = 3; + private Duration delay = Duration.ofMillis(200); + private Duration maxTime = Duration.ofSeconds(1); + private Duration jitter = Duration.ofMillis(50); + private LazyValue scheduledExecutor = FaultTolerance.scheduledExecutor(); + + private Builder() { + } + + @Override + public Retry build() { + calls = Math.max(1, calls); + if (retryOn.isEmpty()) { + if (abortOn.isEmpty()) { + return new Retry(this); + } else { + return new RetryWithAbortOn(this); + } + } else { + if (abortOn.isEmpty()) { + return new RetryWithRetryOn(this); + } else { + throw new IllegalArgumentException("You have defined both retryOn and abortOn exceptions. " + + "This cannot be correctly handled; abortOn: " + abortOn + + " retryOn: " + retryOn); + } + } + } + + /** + * Total number of calls (first + retries). + * @param calls how many times to call the method + * @return updated builder instance + */ + public Builder calls(int calls) { + this.calls = calls; + return this; + } + + public Builder delay(Duration delay) { + this.delay = delay; + return this; + } + + public Builder maxTime(Duration maxTime) { + this.maxTime = maxTime; + return this; + } + + public Builder jitter(Duration jitter) { + this.jitter = jitter; + return this; + } + + public Builder retryOn(Class... classes) { + retryOn.clear(); + Arrays.stream(classes) + .forEach(this::addRetryOn); + + return this; + } + + public Builder addRetryOn(Class clazz) { + this.retryOn.add(clazz); + return this; + } + + public Builder abortOn(Class... classes) { + abortOn.clear(); + Arrays.stream(classes) + .forEach(this::addAbortOn); + + return this; + } + + public Builder addAbortOn(Class clazz) { + this.abortOn.add(clazz); + return this; + } + + public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) { + this.scheduledExecutor = LazyValue.create(scheduledExecutor); + return this; + } + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Timeout.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Timeout.java new file mode 100644 index 000000000..5944b46a7 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Timeout.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import io.helidon.common.LazyValue; +import io.helidon.common.reactive.Single; + +public class Timeout implements Handler { + private final Duration timeout; + private final LazyValue executor; + + private Timeout(Builder builder) { + this.timeout = builder.timeout; + this.executor = builder.executor;; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public Single invoke(Supplier> supplier) { + return Single.create(supplier.get()) + .timeout(timeout.toMillis(), TimeUnit.MILLISECONDS, executor.get()); + } + + public static class Builder implements io.helidon.common.Builder { + private Duration timeout = Duration.ofSeconds(10); + private LazyValue executor = FaultTolerance.scheduledExecutor();; + + private Builder() { + } + + @Override + public Timeout build() { + return new Timeout(this); + } + + public Builder timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public Builder executor(ScheduledExecutorService executor) { + this.executor = LazyValue.create(executor); + return this; + } + } +} diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/TypedHandler.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/TypedHandler.java new file mode 100644 index 000000000..761441f35 --- /dev/null +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/TypedHandler.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; + +import io.helidon.common.reactive.Single; + +@FunctionalInterface +public interface TypedHandler { + Single invoke(Supplier> supplier); +} diff --git a/fault-tolerance/src/main/java/module-info.java b/fault-tolerance/src/main/java/module-info.java new file mode 100644 index 000000000..ff9fb62b7 --- /dev/null +++ b/fault-tolerance/src/main/java/module-info.java @@ -0,0 +1,4 @@ +module io.helidon.faulttolerance { + requires io.helidon.config; + requires io.helidon.common.configurable; +} \ No newline at end of file diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java new file mode 100644 index 000000000..231026842 --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.helidon.common.reactive.Single; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class AsyncTest { + private final AtomicInteger syncCounter = new AtomicInteger(); + + @BeforeEach + void reset() { + syncCounter.set(0); + } + + @Test + void testAsync() { + Thread result = Async.create() + .invoke(this::sync) + .await(1, TimeUnit.SECONDS); + + assertThat(result, is(not(Thread.currentThread()))); + assertThat(syncCounter.get(), is(1)); + } + + @Test + void testAsyncError() { + Single result = Async.create() + .invoke(this::syncError); + + CompletionException exception = assertThrows(CompletionException.class, () -> result.await(1, TimeUnit.SECONDS)); + + Throwable cause = exception.getCause(); + + assertThat(cause, notNullValue()); + assertThat(cause, instanceOf(MyException.class)); + } + + private String syncError() { + throw new MyException(); + } + + private Thread sync() { + syncCounter.incrementAndGet(); + return Thread.currentThread(); + } + + private static class MyException extends RuntimeException { + } +} \ No newline at end of file diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/AtomicCycleTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/AtomicCycleTest.java new file mode 100644 index 000000000..d2ffb75af --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/AtomicCycleTest.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +class AtomicCycleTest { + @Test + void test() { + AtomicCycle cycle = new AtomicCycle(2); + + assertThat(cycle.incrementAndGet(), is(0)); + assertThat(cycle.incrementAndGet(), is(1)); + assertThat(cycle.incrementAndGet(), is(2)); + assertThat(cycle.incrementAndGet(), is(0)); + assertThat(cycle.incrementAndGet(), is(1)); + assertThat(cycle.incrementAndGet(), is(2)); + assertThat(cycle.incrementAndGet(), is(0)); + } +} \ No newline at end of file diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/BulkheadTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/BulkheadTest.java new file mode 100644 index 000000000..0587f93d3 --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/BulkheadTest.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.helidon.common.reactive.Single; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +class BulkheadTest { + @Test + void testBulkhead() throws InterruptedException { + Bulkhead bulkhead = Bulkhead.builder() + .limit(1) + .queueLength(1) + .build(); + + Request inProgress = new Request(0); + Request enqueued = new Request(1); + Request rejected = new Request(2); + + Single inProgressResult = bulkhead.invoke(inProgress::invoke); + Single enqueuedResult = bulkhead.invoke(enqueued::invoke); + Single rejectedResult = bulkhead.invoke(rejected::invoke); + + if (!inProgress.invokedCdl.await(1, TimeUnit.SECONDS)) { + fail("Invoke method of inProgress was not called"); + } + assertThat(inProgress.invoked.get(), is(true)); + assertThat(inProgress.indexed.get(), is(false)); + assertThat(enqueued.invoked.get(), is(false)); + assertThat(enqueued.indexed.get(), is(false)); + assertThat(rejected.invoked.get(), is(false)); + assertThat(rejected.indexed.get(), is(false)); + + inProgress.releaseCdl.countDown(); + if (!enqueued.invokedCdl.await(1, TimeUnit.SECONDS)) { + fail("Invoke method of enqueued was not called"); + } + assertThat(inProgress.indexed.get(), is(true)); + assertThat(enqueued.invoked.get(), is(true)); + assertThat(enqueued.indexed.get(), is(false)); + assertThat(rejected.invoked.get(), is(false)); + assertThat(rejected.indexed.get(), is(false)); + + enqueued.releaseCdl.countDown(); + + assertThat(inProgressResult.await(1, TimeUnit.SECONDS), is(0)); + assertThat(enqueuedResult.await(1, TimeUnit.SECONDS), is(1)); + CompletionException completionException = assertThrows(CompletionException.class, + () -> rejectedResult.await(1, TimeUnit.SECONDS)); + Throwable cause = completionException.getCause(); + + assertThat(cause, notNullValue()); + assertThat(cause, instanceOf(BulkheadException.class)); + assertThat(cause.getMessage(), is("Bulkhead queue is full")); + } + + @Test + void testBulkheadWithError() throws InterruptedException { + Bulkhead bulkhead = Bulkhead.builder() + .limit(1) + .queueLength(1) + .build(); + Single result = bulkhead.invoke(() -> Single.error(new IllegalStateException())); + FaultToleranceTest.completionException(result, IllegalStateException.class); + + Request inProgress = new Request(0); + bulkhead.invoke(inProgress::invoke); + + // queued + result = bulkhead.invoke(() -> Single.error(new IllegalStateException())); + inProgress.releaseCdl.countDown(); + + FaultToleranceTest.completionException(result, IllegalStateException.class); + + } + + private static class Request { + private final CountDownLatch releaseCdl = new CountDownLatch(1); + private final CountDownLatch invokedCdl = new CountDownLatch(1); + private final AtomicBoolean invoked = new AtomicBoolean(); + private final AtomicBoolean indexed = new AtomicBoolean(); + + private final int index; + + Request(int index) { + this.index = index; + } + + CompletionStage invoke() { + invoked.set(true); + invokedCdl.countDown(); + return FaultTolerance.async(this::index); + } + + private int index() { + try { + releaseCdl.await(); + } catch (InterruptedException e) { + } + indexed.set(true); + return index; + } + } +} diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/CircuitBreakerTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/CircuitBreakerTest.java new file mode 100644 index 000000000..d68653a30 --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/CircuitBreakerTest.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import io.helidon.common.reactive.Single; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class CircuitBreakerTest { + @Test + void testCircuitBreaker() throws InterruptedException { + CircuitBreaker breaker = CircuitBreaker.builder() + .volume(10) + .ratio(20) + .delay(Duration.ofMillis(200)) + .successThreshold(2) + .build(); + + good(breaker); + good(breaker); + + bad(breaker); + + good(breaker); + + // should open the breaker + bad(breaker); + + breakerOpen(breaker); + + assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); + + // need to wait until half open + int count = 0; + while(count++ < 10) { + Thread.sleep(50); + if(breaker.state() == CircuitBreaker.State.HALF_OPEN) { + break; + } + } + + assertThat(breaker.state(), is(CircuitBreaker.State.HALF_OPEN)); + + good(breaker); + good(breaker); + + assertThat(breaker.state(), is(CircuitBreaker.State.CLOSED)); + + // should open the breaker + bad(breaker); + bad(breaker); + + assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); + + // need to wait until half open + count = 0; + while(count++ < 10) { + Thread.sleep(50); + if(breaker.state() == CircuitBreaker.State.HALF_OPEN) { + break; + } + } + + good(breaker); + bad(breaker); + + assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); + } + + private void breakerOpen(CircuitBreaker breaker) { + Request good = new Request(); + Single result = breaker.invoke(good::invoke); + CompletionException exception = assertThrows(CompletionException.class, () -> result.await(1, TimeUnit.SECONDS)); + + Throwable cause = exception.getCause(); + + assertThat(cause, notNullValue()); + assertThat(cause, instanceOf(CircuitBreakerOpenException.class)); + } + + private void bad(CircuitBreaker breaker) { + Failing failing = new Failing(new IllegalStateException("Fail")); + Single failedResult = breaker.invoke(failing::invoke); + CompletionException exception = assertThrows(CompletionException.class, () -> failedResult.await(1, TimeUnit.SECONDS)); + + Throwable cause = exception.getCause(); + + assertThat(cause, notNullValue()); + assertThat(cause, instanceOf(IllegalStateException.class)); + + } + + private void good(CircuitBreaker breaker) { + Request good = new Request(); + Single result = breaker.invoke(good::invoke); + result.await(1, TimeUnit.SECONDS); + } + + private static class Failing { + private final Exception exception; + + Failing(Exception exception) { + this.exception = exception; + } + + CompletionStage invoke() { + return CompletableFuture.failedFuture(exception); + } + } + + private static class Request { + Request() { + } + + CompletionStage invoke() { + return CompletableFuture.completedFuture(1); + } + } +} diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/FallbackTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/FallbackTest.java new file mode 100644 index 000000000..f2ca4dae9 --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/FallbackTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.util.Arrays; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.helidon.common.reactive.Single; +import io.helidon.config.ConfigException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +class FallbackTest { + private final AtomicInteger primaryCounter = new AtomicInteger(); + private final AtomicInteger fallbackCounter = new AtomicInteger(); + + @BeforeEach + void reset() { + primaryCounter.set(0); + fallbackCounter.set(0); + } + + @Test + void testFallback() { + String result = FaultTolerance.fallback(this::primary, this::fallback) + .await(1, TimeUnit.SECONDS); + + assertThat(result, is("fallback")); + assertThat(primaryCounter.get(), is(1)); + assertThat(fallbackCounter.get(), is(1)); + } + + @Test + void testFallbackFails() { + Single result = FaultTolerance.fallback(this::primary, this::fallbackFail); + ConfigException exception = FaultToleranceTest.completionException(result, ConfigException.class); + Throwable[] suppressed = exception.getSuppressed(); + assertThat("Should have a suppressed exception: " + Arrays.toString(suppressed), suppressed.length, is(1)); + assertThat(suppressed[0], instanceOf(CompletionException.class)); + Throwable suppressedCause = suppressed[0].getCause(); + assertThat(suppressedCause, instanceOf(IllegalArgumentException.class)); + } + + private Single primary() { + primaryCounter.incrementAndGet(); + return Single.error(new IllegalArgumentException("Intentional failure")); + } + + private Single fallback(Throwable throwable) { + fallbackCounter.incrementAndGet(); + return Single.just("fallback"); + } + + private Single fallbackFail(Throwable throwable) { + fallbackCounter.incrementAndGet(); + return Single.error(new ConfigException("Intentional failure")); + } +} \ No newline at end of file diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/FaultToleranceTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/FaultToleranceTest.java new file mode 100644 index 000000000..31e6eb0a1 --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/FaultToleranceTest.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.helidon.common.reactive.Single; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class FaultToleranceTest { + + @Test + void testCustomCombination() { + CircuitBreaker breaker = CircuitBreaker.builder() + .build(); + + Bulkhead bulkhead = Bulkhead.builder() + .limit(1) + .queueLength(0) + .build(); + + TypedHandler faultTolerance = FaultTolerance.builder() + .addBreaker(breaker) + .addBulkhead(bulkhead) + .addTimeout(Timeout.builder().timeout(Duration.ofMillis(100)).build()) + .addFallback(Fallback.builder() + .fallback(this::fallback) + .build()) + .build(); + + Single result = faultTolerance.invoke(this::primary); + assertThat(result.await(1, TimeUnit.SECONDS), is(MyException.class.getName())); + + breaker.state(CircuitBreaker.State.OPEN); + result = faultTolerance.invoke(this::primary); + assertThat(result.await(1, TimeUnit.SECONDS), is(CircuitBreakerOpenException.class.getName())); + + breaker.state(CircuitBreaker.State.CLOSED); + + Manual m = new Manual(); + Single manualResult = bulkhead.invoke(m::call); + + result = faultTolerance.invoke(this::primary); + assertThat(result.await(1, TimeUnit.SECONDS), is(BulkheadException.class.getName())); + + m.future.complete("result"); + manualResult.await(1, TimeUnit.SECONDS); + + m = new Manual(); + result = faultTolerance.invoke(m::call); + assertThat(result.await(1, TimeUnit.SECONDS), is(TimeoutException.class.getName())); + + m.future.complete("hu"); + } + + private Single primary() { + return Single.error(new MyException()); + } + + private Single fallback(Throwable throwable) { + return Single.just(throwable.getClass().getName()); + } + + static T completionException(Single result, Class expected) { + CompletionException completionException = assertThrows(CompletionException.class, + () -> result.await(1, TimeUnit.SECONDS)); + Throwable cause = completionException.getCause(); + assertThat(cause, notNullValue()); + assertThat(cause, instanceOf(expected)); + + return expected.cast(cause); + } + + private class Manual { + private final CompletableFuture future = new CompletableFuture<>(); + + private CompletionStage call() { + return future; + } + } + + private static class MyException extends RuntimeException { + + } + +} diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/ResultWindowTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/ResultWindowTest.java new file mode 100644 index 000000000..b27a5b18e --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/ResultWindowTest.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +class ResultWindowTest { + @Test + void test() { + ResultWindow window = new ResultWindow(10, 10); + assertThat("Empty should not open", window.shouldOpen(), is(false)); + window.update(ResultWindow.Result.SUCCESS); + window.update(ResultWindow.Result.SUCCESS); + window.update(ResultWindow.Result.SUCCESS); + assertThat("Only success should not open", window.shouldOpen(), is(false)); + window.update(ResultWindow.Result.FAILURE); + assertThat("Should open on first failure (10% of 10 size)", window.shouldOpen(), is(true)); + //now cycle through window and replace all with success + for (int i = 0; i < 10; i++) { + window.update(ResultWindow.Result.SUCCESS); + } + assertThat("All success should not open", window.shouldOpen(), is(false)); + window.update(ResultWindow.Result.FAILURE); + assertThat("Should open on first failure (10% of 10 size)", window.shouldOpen(), is(true)); + window.reset(); + assertThat("Should not open after reset", window.shouldOpen(), is(false)); + } +} \ No newline at end of file diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/RetryTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/RetryTest.java new file mode 100644 index 000000000..644b92fc5 --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/RetryTest.java @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import io.helidon.common.reactive.Single; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class RetryTest { + @Test + void testRetry() { + Retry retry = Retry.builder() + .calls(3) + .delay(Duration.ofMillis(50)) + .maxTime(Duration.ofMillis(500)) + .jitter(Duration.ofMillis(50)) + .build(); + + Request req = new Request(3, new TerminalException(), new RetryException()); + Single result = retry.invoke(req::invoke); + FaultToleranceTest.completionException(result, TerminalException.class); + assertThat(req.call.get(), is(3)); + + req = new Request(2, new TerminalException(), new RetryException()); + result = retry.invoke(req::invoke); + int count = result.await(1, TimeUnit.SECONDS); + assertThat(count, is(3)); + } + + @Test + void testRetryOn() { + Retry retry = Retry.builder() + .calls(3) + .delay(Duration.ofMillis(100)) + .maxTime(Duration.ofMillis(500)) + .jitter(Duration.ofMillis(50)) + .addRetryOn(RetryException.class) + .build(); + + Request req = new Request(3, new RetryException(), new RetryException()); + Single result = retry.invoke(req::invoke); + FaultToleranceTest.completionException(result, RetryException.class); + assertThat(req.call.get(), is(3)); + + req = new Request(2, new RetryException(), new TerminalException()); + result = retry.invoke(req::invoke); + FaultToleranceTest.completionException(result, TerminalException.class); + assertThat(req.call.get(), is(2)); + + req = new Request(2, new RetryException(), new RetryException()); + result = retry.invoke(req::invoke); + int count = result.await(1, TimeUnit.SECONDS); + assertThat(count, is(3)); + } + + @Test + void testAbortOn() { + Retry retry = Retry.builder() + .calls(3) + .delay(Duration.ofMillis(100)) + .maxTime(Duration.ofMillis(500)) + .jitter(Duration.ofMillis(50)) + .addAbortOn(TerminalException.class) + .build(); + + Request req = new Request(3, new RetryException(), new RetryException()); + Single result = retry.invoke(req::invoke); + FaultToleranceTest.completionException(result, RetryException.class); + assertThat(req.call.get(), is(3)); + + req = new Request(2, new RetryException(), new TerminalException()); + result = retry.invoke(req::invoke); + FaultToleranceTest.completionException(result, TerminalException.class); + assertThat(req.call.get(), is(2)); + + req = new Request(2, new RetryException(), new RetryException()); + result = retry.invoke(req::invoke); + int count = result.await(1, TimeUnit.SECONDS); + assertThat(count, is(3)); + } + + @Test + void testTimeout() { + Retry retry = Retry.builder() + .calls(3) + .delay(Duration.ofMillis(50)) + .maxTime(Duration.ofMillis(45)) + .jitter(Duration.ofMillis(1)) + .build(); + + Request req = new Request(3, new RetryException(), new RetryException()); + Single result = retry.invoke(req::invoke); + FaultToleranceTest.completionException(result, TimeoutException.class); + assertThat(req.call.get(), is(2)); + } + + @Test + void testBadConfiguration() { + Retry.Builder builder = Retry.builder() + .retryOn(RetryException.class) + .abortOn(TerminalException.class); + + assertThrows(IllegalArgumentException.class, builder::build); + } + + + private static class Request { + private final AtomicInteger call = new AtomicInteger(); + private final int failures; + private final RuntimeException first; + private final RuntimeException second; + + private Request(int failures, RuntimeException first, RuntimeException second) { + this.failures = failures; + this.first = first; + this.second = second; + } + + CompletionStage invoke() { + //failures 1 + // call + int now = call.incrementAndGet(); + if (now <= failures) { + if (now == 1) { + throw first; + } else if (now == 2) { + throw second; + } else { + throw first; + } + } + return Single.just(now); + } + } + + private static class RetryException extends RuntimeException { + } + + private static class TerminalException extends RuntimeException { + + } +} \ No newline at end of file diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/TimeoutTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/TimeoutTest.java new file mode 100644 index 000000000..2f11d1486 --- /dev/null +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/TimeoutTest.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.faulttolerance; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeoutException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class TimeoutTest { + + @BeforeEach + void reset() { + } + + @Test + void testAsync() { + CompletionException exc = assertThrows(CompletionException.class, + () -> FaultTolerance.timeout(Duration.ofSeconds(1), this::timeOut) + .await()); + + Throwable cause = exc.getCause(); + assertThat(cause, notNullValue()); + assertThat(cause, instanceOf(TimeoutException.class)); + } + + private CompletionStage timeOut() { + // never completing + return new CompletableFuture<>(); + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 00607cbcf..14520b46b 100644 --- a/pom.xml +++ b/pom.xml @@ -173,6 +173,7 @@ integrations dbclient messaging + fault-tolerance