mirror of
https://github.com/jlengrand/helidon.git
synced 2026-03-10 08:21:17 +00:00
FaultTolerance for SE.
Signed-off-by: Tomas Langer <tomas.langer@oracle.com>
This commit is contained in:
committed by
Santiago Pericasgeertsen
parent
8a68523d83
commit
c11ee85a0d
@@ -16,7 +16,6 @@
|
||||
|
||||
package io.helidon.common.configurable;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
@@ -32,7 +31,7 @@ import io.helidon.config.Config;
|
||||
* Supplier of a custom scheduled thread pool.
|
||||
* The returned thread pool supports {@link io.helidon.common.context.Context} propagation.
|
||||
*/
|
||||
public final class ScheduledThreadPoolSupplier implements Supplier<ExecutorService> {
|
||||
public final class ScheduledThreadPoolSupplier implements Supplier<ScheduledExecutorService> {
|
||||
private static final int EXECUTOR_DEFAULT_CORE_POOL_SIZE = 16;
|
||||
private static final boolean EXECUTOR_DEFAULT_IS_DAEMON = true;
|
||||
private static final String EXECUTOR_DEFAULT_THREAD_NAME_PREFIX = "helidon-";
|
||||
|
||||
77
fault-tolerance/DESIGN.md
Normal file
77
fault-tolerance/DESIGN.md
Normal file
@@ -0,0 +1,77 @@
|
||||
Fault Tolerance
|
||||
---
|
||||
|
||||
Fault tolerance (FT) covers a wide area of features. The following document
|
||||
describes each FT feature and its API for Helidon SE (or approach to use to achieve such a feature using
|
||||
existing APIs).
|
||||
|
||||
# Common API
|
||||
The FT requires executor service (or more) to handle some of the features provided. To be able to
|
||||
configure FT for the whole application, a set of static methods exists on class `FaultTolerance`.
|
||||
|
||||
- `FaultTolerance.config(Config)` - use a Helidon config instance to configure defaults
|
||||
|
||||
# Asynchronous
|
||||
Provides an asynchronous execution for a blocking operation. As Helidon SE network stack is using a
|
||||
non blocking reactive API, applications cannot block the threads. You can use this API to execute a blocking
|
||||
operation in a separate executor service and obtain its result as a Helidon reactive `Single`.
|
||||
|
||||
Configuration:
|
||||
- executor service
|
||||
|
||||
# Bulkhead
|
||||
Limits the number of parallel calls to a single resource.
|
||||
|
||||
Configuration:
|
||||
- parallel execution limit
|
||||
- executor service
|
||||
- queue
|
||||
- number of queued records
|
||||
|
||||
# Circuit Breaker
|
||||
Defines a circuit breaker policy to an individual method or a class.
|
||||
A circuit breaker aims to prevent further damage by not executing functionality that is doomed to fail. After a failure situation has been detected, circuit breakers prevent methods from being executed and instead throw exceptions immediately. After a certain delay or wait time, the functionality is attempted to be executed again.
|
||||
A circuit breaker can be in one of the following states:
|
||||
Closed: In normal operation, the circuit is closed. If a failure occurs, the Circuit Breaker records the event. In closed state the requestVolumeThreshold and failureRatio parameters may be configured in order to specify the conditions under which the breaker will transition the circuit to open. If the failure conditions are met, the circuit will be opened.
|
||||
Open: When the circuit is open, calls to the service operating under the circuit breaker will fail immediately. A delay may be configured for the circuit breaker. After the specified delay, the circuit transitions to half-open state.
|
||||
Half-open: In half-open state, trial executions of the service are allowed. By default one trial call to the service is permitted. If the call fails, the circuit will return to open state. The successThreshold parameter allows the configuration of the number of trial executions that must succeed before the circuit can be closed. After the specified number of successful executions, the circuit will be closed. If a failure occurs before the successThreshold is reached the circuit will transition to open.
|
||||
Circuit state transitions will reset the circuit breaker's records.
|
||||
|
||||
Configuration:
|
||||
- fail on `<? extends Throwable>` - these are failures
|
||||
- skip on `<? extends Throwable>` - these are not failures
|
||||
- delay (duration) - how long before transitioning from open to half-open
|
||||
- volume threshold - rolling window size
|
||||
- ratio (percentage) - how many failures will trigger this to open
|
||||
- success threshold - how many successful calls will close a half-open breaker
|
||||
|
||||
# Fallback
|
||||
What to call when this fails.
|
||||
Could be replaced with `onException` of `CompletionStage`
|
||||
|
||||
May provide a context of execution with information such as
|
||||
- `Class`
|
||||
- some description of the method called (not reflection - we want to avoid reflection at all costs)
|
||||
- parameter values posted to original method
|
||||
|
||||
Configuration:
|
||||
- fallback method/handler
|
||||
- applyOn `<? extends Throwable>` - these are failures
|
||||
- skip on `<? extends Throwable>` - these are not failures
|
||||
|
||||
# Retry
|
||||
Retry execution.
|
||||
|
||||
Configuration:
|
||||
- maximal number of retries
|
||||
- delay between retries (duration)
|
||||
- overall maximal duration
|
||||
- jitter (randomize delays a bit - duration) - a jitter of 200 ms will randomly add between -200 and 200 milliseconds to each retry delay.
|
||||
- retry on `<? extends Throwable>` - these are failures
|
||||
- abort on `<? extends Throwable>` - these will immediately abort retries
|
||||
|
||||
# Timeout
|
||||
Can be simply replaced with `Single.timeout`
|
||||
|
||||
Configuration:
|
||||
- overall timeout (duration)
|
||||
53
fault-tolerance/pom.xml
Normal file
53
fault-tolerance/pom.xml
Normal file
@@ -0,0 +1,53 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
~
|
||||
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||
~ you may not use this file except in compliance with the License.
|
||||
~ You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>io.helidon</groupId>
|
||||
<artifactId>helidon-project</artifactId>
|
||||
<version>2.0.1-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>io.helidon.fault-tolerance</groupId>
|
||||
<artifactId>helidon-fault-tolerance</artifactId>
|
||||
<name>Helidon Fault Tolerance</name>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.helidon.config</groupId>
|
||||
<artifactId>helidon-config</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.helidon.common</groupId>
|
||||
<artifactId>helidon-common-configurable</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import io.helidon.common.LazyValue;
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
public class Async {
|
||||
private final LazyValue<? extends ExecutorService> executor;
|
||||
|
||||
public Async(Builder builder) {
|
||||
this.executor = LazyValue.create(builder.executor);
|
||||
}
|
||||
|
||||
public static Async create() {
|
||||
return builder().build();
|
||||
}
|
||||
|
||||
public <T> Single<T> invoke(Supplier<T> supplier) {
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
|
||||
executor.get()
|
||||
.submit(() -> {
|
||||
try {
|
||||
T result = supplier.get();
|
||||
future.complete(result);
|
||||
} catch (Exception e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
});
|
||||
|
||||
return Single.create(future);
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static class Builder implements io.helidon.common.Builder<Async> {
|
||||
private LazyValue<? extends ExecutorService> executor = FaultTolerance.executor();
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Async build() {
|
||||
return new Async(this);
|
||||
}
|
||||
|
||||
public Builder executor(Supplier<ExecutorService> executor) {
|
||||
this.executor = LazyValue.create(executor);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
final class AtomicCycle {
|
||||
private final AtomicInteger atomicInteger = new AtomicInteger(-1);
|
||||
private final int maxIndex;
|
||||
|
||||
AtomicCycle(int maxIndex) {
|
||||
this.maxIndex = maxIndex;
|
||||
}
|
||||
|
||||
int incrementAndGet() {
|
||||
int currentIndex;
|
||||
int nextIndex;
|
||||
do {
|
||||
currentIndex = atomicInteger.get();
|
||||
nextIndex = (currentIndex == maxIndex) ? 0 : currentIndex + 1;
|
||||
} while (!atomicInteger.compareAndSet(currentIndex, nextIndex));
|
||||
|
||||
return nextIndex;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,135 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import io.helidon.common.LazyValue;
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
public class Bulkhead implements Handler {
|
||||
private final Queue<Enqueued<?>> queue;
|
||||
private final Semaphore inProgress;
|
||||
|
||||
private Bulkhead(Builder builder) {
|
||||
this.inProgress = new Semaphore(builder.limit, true);
|
||||
if (builder.queueLength == 0) {
|
||||
queue = new NoQueue();
|
||||
} else {
|
||||
this.queue = new LinkedBlockingQueue<>(builder.queueLength);
|
||||
}
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
|
||||
if (inProgress.tryAcquire()) {
|
||||
CompletionStage<T> result = supplier.get();
|
||||
|
||||
result.handle((it, throwable) -> {
|
||||
// we still have an acquired semaphore
|
||||
Enqueued<?> polled = queue.poll();
|
||||
while (polled != null) {
|
||||
invokeEnqueued((Enqueued<Object>) polled);
|
||||
polled = queue.poll();
|
||||
}
|
||||
inProgress.release();
|
||||
return null;
|
||||
});
|
||||
|
||||
return Single.create(result);
|
||||
} else {
|
||||
Enqueued<T> enqueued = new Enqueued<>(supplier);
|
||||
if (!queue.offer(enqueued)) {
|
||||
return Single.error(new BulkheadException("Bulkhead queue is full"));
|
||||
}
|
||||
return Single.create(enqueued.future());
|
||||
}
|
||||
}
|
||||
|
||||
private void invokeEnqueued(Enqueued<Object> enqueued) {
|
||||
CompletableFuture<Object> future = enqueued.future();
|
||||
CompletionStage<Object> completionStage = enqueued.originalStage();
|
||||
completionStage.thenAccept(future::complete);
|
||||
completionStage.exceptionally(throwable -> {
|
||||
future.completeExceptionally(throwable);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private static class Enqueued<T> {
|
||||
private LazyValue<CompletableFuture<T>> resultFuture = LazyValue.create(CompletableFuture::new);
|
||||
private Supplier<? extends CompletionStage<T>> supplier;
|
||||
|
||||
private Enqueued(Supplier<? extends CompletionStage<T>> supplier) {
|
||||
this.supplier = supplier;
|
||||
}
|
||||
|
||||
private CompletableFuture<T> future() {
|
||||
return resultFuture.get();
|
||||
}
|
||||
|
||||
private CompletionStage<T> originalStage() {
|
||||
return supplier.get();
|
||||
}
|
||||
}
|
||||
|
||||
public static class Builder implements io.helidon.common.Builder<Bulkhead> {
|
||||
private int limit;
|
||||
private int queueLength = 10;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bulkhead build() {
|
||||
return new Bulkhead(this);
|
||||
}
|
||||
|
||||
public Builder limit(int limit) {
|
||||
this.limit = limit;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder queueLength(int queueLength) {
|
||||
this.queueLength = queueLength;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
private static class NoQueue extends ArrayDeque<Enqueued<?>> {
|
||||
@Override
|
||||
public boolean offer(Enqueued<?> enqueued) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Enqueued<?> poll() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
public class BulkheadException extends RuntimeException {
|
||||
public BulkheadException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,220 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import io.helidon.common.LazyValue;
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
import static io.helidon.faulttolerance.ResultWindow.Result.FAILURE;
|
||||
import static io.helidon.faulttolerance.ResultWindow.Result.SUCCESS;
|
||||
|
||||
public class CircuitBreaker implements Handler {
|
||||
/*
|
||||
Configuration options
|
||||
*/
|
||||
private final LazyValue<? extends ScheduledExecutorService> executor;
|
||||
// how long to transition from open to half-open
|
||||
private final long delayMillis;
|
||||
// how many successful calls will close a half-open breaker
|
||||
private final int successThreshold;
|
||||
|
||||
/*
|
||||
Runtime
|
||||
*/
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
|
||||
// rolling window for counting errors to (maybe) open the breaker
|
||||
private final ResultWindow results;
|
||||
// to close from half-open
|
||||
private final AtomicInteger successCounter = new AtomicInteger();
|
||||
private final AtomicBoolean halfOpenInProgress = new AtomicBoolean();
|
||||
private final AtomicReference<ScheduledFuture<Boolean>> schedule = new AtomicReference<>();
|
||||
|
||||
private CircuitBreaker(Builder builder) {
|
||||
this.delayMillis = builder.delay.toMillis();
|
||||
this.successThreshold = builder.successThreshold;
|
||||
this.results = new ResultWindow(builder.volume, builder.ratio);
|
||||
this.executor = builder.executor();
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
|
||||
if (state.get() == State.CLOSED) {
|
||||
// run it!
|
||||
CompletionStage<T> result = supplier.get();
|
||||
result.handle((it, exception) -> {
|
||||
if (exception == null) {
|
||||
// success
|
||||
results.update(SUCCESS);
|
||||
} else {
|
||||
results.update(FAILURE);
|
||||
if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) {
|
||||
results.reset();
|
||||
// if we successfully switch to open, we need to schedule switch to half-open
|
||||
scheduleHalf();
|
||||
}
|
||||
}
|
||||
|
||||
return it;
|
||||
});
|
||||
return Single.create(result);
|
||||
} else if (state.get() == State.OPEN) {
|
||||
// fail it!
|
||||
return Single.error(new CircuitBreakerOpenException("CircuitBreaker is open"));
|
||||
} else {
|
||||
// half-open
|
||||
if (halfOpenInProgress.compareAndSet(false, true)) {
|
||||
CompletionStage<T> result = supplier.get();
|
||||
result.handle((it, exception) -> {
|
||||
if (exception == null) {
|
||||
// success
|
||||
int successes = successCounter.incrementAndGet();
|
||||
if (successes >= successThreshold) {
|
||||
// transition to closed
|
||||
successCounter.set(0);
|
||||
state.compareAndSet(State.HALF_OPEN, State.CLOSED);
|
||||
halfOpenInProgress.set(false);
|
||||
}
|
||||
halfOpenInProgress.set(false);
|
||||
} else {
|
||||
// failure
|
||||
successCounter.set(0);
|
||||
state.set(State.OPEN);
|
||||
halfOpenInProgress.set(false);
|
||||
// if we successfully switch to open, we need to schedule switch to half-open
|
||||
scheduleHalf();
|
||||
}
|
||||
|
||||
return it;
|
||||
});
|
||||
return Single.create(result);
|
||||
} else {
|
||||
return Single
|
||||
.error(new CircuitBreakerOpenException("CircuitBreaker is half open, parallel execution in progress"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduleHalf() {
|
||||
schedule.set(executor.get()
|
||||
.schedule(() -> {
|
||||
state.compareAndSet(State.OPEN, State.HALF_OPEN);
|
||||
schedule.set(null);
|
||||
return true;
|
||||
}, delayMillis, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
public State state() {
|
||||
return state.get();
|
||||
}
|
||||
|
||||
public void state(State newState) {
|
||||
if (newState == State.CLOSED) {
|
||||
if (state.get() == State.CLOSED) {
|
||||
// fine
|
||||
resetCounters();
|
||||
return;
|
||||
}
|
||||
|
||||
ScheduledFuture<Boolean> future = schedule.getAndSet(null);
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
}
|
||||
resetCounters();
|
||||
state.set(State.CLOSED);
|
||||
} else if (newState == State.OPEN) {
|
||||
state.set(State.OPEN);
|
||||
ScheduledFuture<Boolean> future = schedule.getAndSet(null);
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
}
|
||||
resetCounters();
|
||||
} else {
|
||||
// half open
|
||||
resetCounters();
|
||||
}
|
||||
}
|
||||
|
||||
private void resetCounters() {
|
||||
results.reset();
|
||||
successCounter.set(0);
|
||||
}
|
||||
|
||||
public static class Builder implements io.helidon.common.Builder<CircuitBreaker> {
|
||||
// how long to transition from open to half-open
|
||||
private Duration delay = Duration.ofSeconds(5);
|
||||
// how many percents of failures will open the breaker
|
||||
private int ratio = 60;
|
||||
// how many successful calls will close a half-open breaker
|
||||
private int successThreshold = 1;
|
||||
// rolling window size to
|
||||
private int volume = 10;
|
||||
private LazyValue<? extends ScheduledExecutorService> executor = FaultTolerance.scheduledExecutor();
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public CircuitBreaker build() {
|
||||
return new CircuitBreaker(this);
|
||||
}
|
||||
|
||||
public Builder delay(Duration delay) {
|
||||
this.delay = delay;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder ratio(int ratio) {
|
||||
this.ratio = ratio;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder successThreshold(int successThreshold) {
|
||||
this.successThreshold = successThreshold;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder volume(int volume) {
|
||||
this.volume = volume;
|
||||
return this;
|
||||
}
|
||||
|
||||
LazyValue<? extends ScheduledExecutorService> executor() {
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
|
||||
public enum State {
|
||||
CLOSED,
|
||||
HALF_OPEN,
|
||||
OPEN
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
public class CircuitBreakerOpenException extends RuntimeException {
|
||||
public CircuitBreakerOpenException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
public class Fallback<T> implements TypedHandler<T> {
|
||||
private final Function<Throwable, ? extends CompletionStage<T>> fallback;
|
||||
|
||||
private Fallback(Builder<T> builder) {
|
||||
this.fallback = builder.fallback;
|
||||
}
|
||||
|
||||
public static <T> Builder<T> builder() {
|
||||
return new Builder<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
|
||||
supplier.get()
|
||||
.thenAccept(future::complete)
|
||||
.exceptionally(throwable -> {
|
||||
Throwable cause = FaultTolerance.getCause(throwable);
|
||||
fallback.apply(cause)
|
||||
.thenAccept(future::complete)
|
||||
.exceptionally(t2 -> {
|
||||
Throwable cause2 = FaultTolerance.getCause(t2);
|
||||
cause2.addSuppressed(throwable);
|
||||
future.completeExceptionally(cause2);
|
||||
return null;
|
||||
});
|
||||
return null;
|
||||
});
|
||||
|
||||
return Single.create(future);
|
||||
}
|
||||
|
||||
public static class Builder<T> implements io.helidon.common.Builder<Fallback<T>> {
|
||||
private Function<Throwable, ? extends CompletionStage<T>> fallback;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Fallback<T> build() {
|
||||
Objects.requireNonNull(fallback, "Fallback method must be specified");
|
||||
return new Fallback<>(this);
|
||||
}
|
||||
|
||||
public Builder<T> fallback(Function<Throwable, ? extends CompletionStage<T>> fallback) {
|
||||
this.fallback = fallback;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,250 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import io.helidon.common.LazyValue;
|
||||
import io.helidon.common.configurable.ScheduledThreadPoolSupplier;
|
||||
import io.helidon.common.configurable.ThreadPoolSupplier;
|
||||
import io.helidon.common.reactive.Single;
|
||||
import io.helidon.config.Config;
|
||||
|
||||
/**
|
||||
* Access to fault tolerance features.
|
||||
*/
|
||||
public final class FaultTolerance {
|
||||
private static final AtomicReference<LazyValue<? extends ScheduledExecutorService>> SCHEDULED_EXECUTOR =
|
||||
new AtomicReference<>();
|
||||
private static final AtomicReference<LazyValue<ExecutorService>> EXECUTOR = new AtomicReference<>();
|
||||
private static final AtomicReference<Config> CONFIG = new AtomicReference<>(Config.empty());
|
||||
|
||||
static {
|
||||
SCHEDULED_EXECUTOR.set(LazyValue.create(ScheduledThreadPoolSupplier.builder()
|
||||
.threadNamePrefix("ft-schedule-")
|
||||
.config(CONFIG.get().get("scheduled-executor"))
|
||||
.build()));
|
||||
|
||||
EXECUTOR.set(LazyValue.create(ThreadPoolSupplier.builder()
|
||||
.threadNamePrefix("ft-")
|
||||
.config(CONFIG.get().get("executor"))
|
||||
.build()));
|
||||
}
|
||||
|
||||
private FaultTolerance() {
|
||||
}
|
||||
|
||||
public static void config(Config config) {
|
||||
CONFIG.set(config);
|
||||
|
||||
SCHEDULED_EXECUTOR.set(LazyValue.create(ScheduledThreadPoolSupplier.create(CONFIG.get().get("scheduled-executor"))));
|
||||
EXECUTOR.set(LazyValue.create(ThreadPoolSupplier.create(CONFIG.get().get("executor"))));
|
||||
}
|
||||
|
||||
public static void executor(Supplier<? extends ExecutorService> executor) {
|
||||
EXECUTOR.set(LazyValue.create(executor::get));
|
||||
}
|
||||
|
||||
public static void scheduledExecutor(Supplier<? extends ScheduledExecutorService> executor) {
|
||||
SCHEDULED_EXECUTOR.set(LazyValue.create(executor));
|
||||
}
|
||||
|
||||
static LazyValue<? extends ExecutorService> executor() {
|
||||
return EXECUTOR.get();
|
||||
}
|
||||
|
||||
static LazyValue<? extends ScheduledExecutorService> scheduledExecutor() {
|
||||
return SCHEDULED_EXECUTOR.get();
|
||||
}
|
||||
|
||||
public static <T> Single<T> async(Supplier<T> syncSupplier) {
|
||||
return Async.builder()
|
||||
.executor(EXECUTOR.get())
|
||||
.build()
|
||||
.invoke(syncSupplier);
|
||||
}
|
||||
|
||||
public static <T> Single<T> fallback(Supplier<? extends CompletionStage<T>> primary,
|
||||
Function<Throwable, ? extends CompletionStage<T>> fallback) {
|
||||
return Fallback.<T>builder()
|
||||
.fallback(fallback)
|
||||
.build()
|
||||
.invoke(primary);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> Single<T> retry(Supplier<? extends CompletionStage<T>> command) {
|
||||
return Retry.builder()
|
||||
.build()
|
||||
.invoke(command);
|
||||
}
|
||||
|
||||
public static <T> Single<T> timeout(Duration timeout,
|
||||
Supplier<? extends CompletionStage<T>> command) {
|
||||
Timeout ft = Timeout.builder()
|
||||
.timeout(timeout)
|
||||
.build();
|
||||
|
||||
return ft.invoke(command);
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
static Throwable getCause(Throwable throwable) {
|
||||
if (throwable instanceof CompletionException) {
|
||||
return getCause(throwable.getCause());
|
||||
}
|
||||
if (throwable instanceof ExecutionException) {
|
||||
return getCause(throwable.getCause());
|
||||
}
|
||||
return throwable;
|
||||
}
|
||||
|
||||
abstract static class BaseBuilder<B extends BaseBuilder<B>> {
|
||||
@SuppressWarnings("unchecked")
|
||||
private B me() {
|
||||
return (B) this;
|
||||
}
|
||||
|
||||
public B addBulkhead(Bulkhead bulkhead) {
|
||||
add(bulkhead);
|
||||
return me();
|
||||
}
|
||||
|
||||
public B addBreaker(CircuitBreaker breaker) {
|
||||
add(breaker);
|
||||
return me();
|
||||
}
|
||||
|
||||
public B addTimeout(Timeout timeout) {
|
||||
add(timeout);
|
||||
return me();
|
||||
}
|
||||
|
||||
public B addRetry(Retry retry) {
|
||||
add(retry);
|
||||
return me();
|
||||
}
|
||||
|
||||
public abstract void add(Handler ft);
|
||||
}
|
||||
|
||||
public static class TypedBuilder<T> extends BaseBuilder<TypedBuilder<T>> implements io.helidon.common.Builder<TypedHandler<T>> {
|
||||
private final List<TypedHandler<T>> fts = new LinkedList<>();
|
||||
|
||||
private TypedBuilder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TypedHandler<T> build() {
|
||||
return new TypedHandlerImpl<T>(fts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(Handler ft) {
|
||||
fts.add(ft::invoke);
|
||||
}
|
||||
|
||||
public TypedBuilder<T> addFallback(Fallback<T> fallback) {
|
||||
fts.add(fallback);
|
||||
return this;
|
||||
}
|
||||
|
||||
private TypedBuilder<T> builder(Builder builder) {
|
||||
builder.fts
|
||||
.forEach(it -> {
|
||||
fts.add(it::invoke);
|
||||
});
|
||||
return this;
|
||||
}
|
||||
|
||||
private static class TypedHandlerImpl<T> implements TypedHandler<T> {
|
||||
private final List<TypedHandler<T>> validFts;
|
||||
|
||||
private TypedHandlerImpl(List<TypedHandler<T>> validFts) {
|
||||
this.validFts = new LinkedList<>(validFts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
|
||||
Supplier<? extends CompletionStage<T>> next = supplier;
|
||||
|
||||
for (TypedHandler<T> validFt : validFts) {
|
||||
final var finalNext = next;
|
||||
next = () -> validFt.invoke(finalNext);
|
||||
}
|
||||
|
||||
return Single.create(next.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class Builder extends BaseBuilder<Builder> implements io.helidon.common.Builder<Handler> {
|
||||
private final List<Handler> fts = new LinkedList<>();
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Handler build() {
|
||||
return new HandlerImpl(fts);
|
||||
}
|
||||
|
||||
public <U> TypedBuilder<U> addFallback(Fallback<U> fallback) {
|
||||
return new TypedBuilder<U>()
|
||||
.builder(this)
|
||||
.addFallback(fallback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(Handler ft) {
|
||||
fts.add(ft);
|
||||
}
|
||||
|
||||
private static class HandlerImpl implements Handler {
|
||||
private final List<Handler> validFts;
|
||||
|
||||
private HandlerImpl(List<Handler> validFts) {
|
||||
this.validFts = new LinkedList<>(validFts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
|
||||
Supplier<? extends CompletionStage<T>> next = supplier;
|
||||
|
||||
for (Handler validFt : validFts) {
|
||||
final var finalNext = next;
|
||||
next = () -> validFt.invoke(finalNext);
|
||||
}
|
||||
|
||||
return Single.create(next.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface Handler {
|
||||
<T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier);
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Floating window of results.
|
||||
* The status is eventually consistent - there may be inconsistencies under heavier load (the sum may be off-beat
|
||||
* from the actual results due to parallel execution).
|
||||
* This should not be a significant issue, as the calculations work on a state (that may change anyway when checking
|
||||
* whether to open the circuit).
|
||||
*/
|
||||
final class ResultWindow {
|
||||
private final AtomicInteger currentSum = new AtomicInteger();
|
||||
private final AtomicCycle index;
|
||||
private final AtomicInteger[] results;
|
||||
private final int thresholdSum;
|
||||
|
||||
ResultWindow(int size, int ratio) {
|
||||
results = new AtomicInteger[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
results[i] = new AtomicInteger();
|
||||
}
|
||||
index = new AtomicCycle(size - 1);
|
||||
// calculate the sum needed to open the breaker
|
||||
int threshold = (size * ratio) / 100;
|
||||
thresholdSum = threshold == 0 ? 1 : threshold;
|
||||
|
||||
}
|
||||
|
||||
void update(Result resultEnum) {
|
||||
// success is zero, failure is 1
|
||||
int result = resultEnum.ordinal();
|
||||
|
||||
AtomicInteger mine = results[index.incrementAndGet()];
|
||||
int origValue = mine.getAndSet(result);
|
||||
|
||||
if (origValue == result) {
|
||||
// no change
|
||||
return;
|
||||
}
|
||||
|
||||
if (origValue == 1) {
|
||||
currentSum.decrementAndGet();
|
||||
} else {
|
||||
currentSum.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
boolean shouldOpen() {
|
||||
return currentSum.get() >= thresholdSum;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
// "soft" reset - send in success equal to window size
|
||||
for (int i = 0; i < results.length; i++) {
|
||||
update(Result.SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
// order is significant, do not change
|
||||
enum Result {
|
||||
SUCCESS,
|
||||
FAILURE
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,252 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import io.helidon.common.LazyValue;
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
public class Retry implements Handler {
|
||||
private final int calls;
|
||||
private final long delayMillis;
|
||||
private final long maxTimeNanos;
|
||||
private final int jitterMillis;
|
||||
private final LazyValue<? extends ScheduledExecutorService> scheduledExecutor;
|
||||
private final Random random = new Random();
|
||||
|
||||
protected Retry(Builder builder) {
|
||||
this.calls = builder.calls;
|
||||
this.delayMillis = builder.delay.toMillis();
|
||||
this.maxTimeNanos = builder.maxTime.toNanos();
|
||||
long jitter = builder.jitter.toMillis() * 2;
|
||||
this.jitterMillis = (jitter > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) jitter;
|
||||
this.scheduledExecutor = builder.scheduledExecutor;
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
|
||||
new Retrier<>(future, supplier, this)
|
||||
.retry();
|
||||
|
||||
return Single.create(future);
|
||||
}
|
||||
|
||||
protected boolean abort(Throwable throwable) {
|
||||
return false;
|
||||
}
|
||||
|
||||
private long nextDelay() {
|
||||
long delay = delayMillis;
|
||||
int jitterRandom = random.nextInt(jitterMillis) - jitterMillis;
|
||||
delay = delay + jitterRandom;
|
||||
delay = Math.max(0, delay);
|
||||
|
||||
return delay;
|
||||
}
|
||||
|
||||
private class Retrier<T> {
|
||||
private final AtomicInteger count = new AtomicInteger();
|
||||
private final AtomicReference<Throwable> lastThrowable = new AtomicReference<>();
|
||||
private final Supplier<? extends CompletionStage<T>> supplier;
|
||||
private final CompletableFuture<T> future;
|
||||
private final Retry retry;
|
||||
private final long started = System.nanoTime();
|
||||
|
||||
private Retrier(CompletableFuture<T> future,
|
||||
Supplier<? extends CompletionStage<T>> supplier,
|
||||
Retry retry) {
|
||||
this.future = future;
|
||||
this.supplier = supplier;
|
||||
this.retry = retry;
|
||||
}
|
||||
|
||||
private void retry() {
|
||||
int currentCount = count.incrementAndGet();
|
||||
|
||||
CompletionStage<T> stage = null;
|
||||
try {
|
||||
stage = supplier.get();
|
||||
} catch (Throwable e) {
|
||||
stage = CompletableFuture.failedStage(e);
|
||||
}
|
||||
|
||||
stage.handle((it, throwable) -> {
|
||||
if (throwable == null) {
|
||||
future.complete(it);
|
||||
} else {
|
||||
Throwable current = FaultTolerance.getCause(throwable);
|
||||
Throwable before = lastThrowable.get();
|
||||
if (before != null) {
|
||||
current.addSuppressed(before);
|
||||
}
|
||||
|
||||
long now = System.nanoTime();
|
||||
if (currentCount >= retry.calls || retry.abort(current)) {
|
||||
// this is final execution
|
||||
future.completeExceptionally(current);
|
||||
} else if (now - started > maxTimeNanos) {
|
||||
TimeoutException timedOut = new TimeoutException("Retries timed out");
|
||||
timedOut.addSuppressed(current);
|
||||
future.completeExceptionally(timedOut);
|
||||
} else {
|
||||
lastThrowable.set(current);
|
||||
// schedule next retry
|
||||
scheduledExecutor.get().schedule(this::retry, nextDelay(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static class RetryWithAbortOn extends Retry {
|
||||
private final Set<Class<? extends Throwable>> abortOn;
|
||||
|
||||
private RetryWithAbortOn(Builder builder) {
|
||||
super(builder);
|
||||
this.abortOn = Set.copyOf(builder.abortOn);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(Throwable throwable) {
|
||||
return abortOn.contains(throwable.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
private static class RetryWithRetryOn extends Retry {
|
||||
private final Set<Class<? extends Throwable>> retryOn;
|
||||
|
||||
private RetryWithRetryOn(Builder builder) {
|
||||
super(builder);
|
||||
this.retryOn = Set.copyOf(builder.retryOn);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(Throwable throwable) {
|
||||
return !retryOn.contains(throwable.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
public static class Builder implements io.helidon.common.Builder<Retry> {
|
||||
private final Set<Class<? extends Throwable>> retryOn = new HashSet<>();
|
||||
private final Set<Class<? extends Throwable>> abortOn = new HashSet<>();
|
||||
|
||||
private int calls = 3;
|
||||
private Duration delay = Duration.ofMillis(200);
|
||||
private Duration maxTime = Duration.ofSeconds(1);
|
||||
private Duration jitter = Duration.ofMillis(50);
|
||||
private LazyValue<? extends ScheduledExecutorService> scheduledExecutor = FaultTolerance.scheduledExecutor();
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Retry build() {
|
||||
calls = Math.max(1, calls);
|
||||
if (retryOn.isEmpty()) {
|
||||
if (abortOn.isEmpty()) {
|
||||
return new Retry(this);
|
||||
} else {
|
||||
return new RetryWithAbortOn(this);
|
||||
}
|
||||
} else {
|
||||
if (abortOn.isEmpty()) {
|
||||
return new RetryWithRetryOn(this);
|
||||
} else {
|
||||
throw new IllegalArgumentException("You have defined both retryOn and abortOn exceptions. "
|
||||
+ "This cannot be correctly handled; abortOn: " + abortOn
|
||||
+ " retryOn: " + retryOn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Total number of calls (first + retries).
|
||||
* @param calls how many times to call the method
|
||||
* @return updated builder instance
|
||||
*/
|
||||
public Builder calls(int calls) {
|
||||
this.calls = calls;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder delay(Duration delay) {
|
||||
this.delay = delay;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder maxTime(Duration maxTime) {
|
||||
this.maxTime = maxTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder jitter(Duration jitter) {
|
||||
this.jitter = jitter;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder retryOn(Class<? extends Throwable>... classes) {
|
||||
retryOn.clear();
|
||||
Arrays.stream(classes)
|
||||
.forEach(this::addRetryOn);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder addRetryOn(Class<? extends Throwable> clazz) {
|
||||
this.retryOn.add(clazz);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder abortOn(Class<? extends Throwable>... classes) {
|
||||
abortOn.clear();
|
||||
Arrays.stream(classes)
|
||||
.forEach(this::addAbortOn);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder addAbortOn(Class<? extends Throwable> clazz) {
|
||||
this.abortOn.add(clazz);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
|
||||
this.scheduledExecutor = LazyValue.create(scheduledExecutor);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import io.helidon.common.LazyValue;
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
public class Timeout implements Handler {
|
||||
private final Duration timeout;
|
||||
private final LazyValue<? extends ScheduledExecutorService> executor;
|
||||
|
||||
private Timeout(Builder builder) {
|
||||
this.timeout = builder.timeout;
|
||||
this.executor = builder.executor;;
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
|
||||
return Single.create(supplier.get())
|
||||
.timeout(timeout.toMillis(), TimeUnit.MILLISECONDS, executor.get());
|
||||
}
|
||||
|
||||
public static class Builder implements io.helidon.common.Builder<Timeout> {
|
||||
private Duration timeout = Duration.ofSeconds(10);
|
||||
private LazyValue<? extends ScheduledExecutorService> executor = FaultTolerance.scheduledExecutor();;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Timeout build() {
|
||||
return new Timeout(this);
|
||||
}
|
||||
|
||||
public Builder timeout(Duration timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder executor(ScheduledExecutorService executor) {
|
||||
this.executor = LazyValue.create(executor);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface TypedHandler<T> {
|
||||
Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier);
|
||||
}
|
||||
4
fault-tolerance/src/main/java/module-info.java
Normal file
4
fault-tolerance/src/main/java/module-info.java
Normal file
@@ -0,0 +1,4 @@
|
||||
module io.helidon.faulttolerance {
|
||||
requires io.helidon.config;
|
||||
requires io.helidon.common.configurable;
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
class AsyncTest {
|
||||
private final AtomicInteger syncCounter = new AtomicInteger();
|
||||
|
||||
@BeforeEach
|
||||
void reset() {
|
||||
syncCounter.set(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAsync() {
|
||||
Thread result = Async.create()
|
||||
.invoke(this::sync)
|
||||
.await(1, TimeUnit.SECONDS);
|
||||
|
||||
assertThat(result, is(not(Thread.currentThread())));
|
||||
assertThat(syncCounter.get(), is(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAsyncError() {
|
||||
Single<String> result = Async.create()
|
||||
.invoke(this::syncError);
|
||||
|
||||
CompletionException exception = assertThrows(CompletionException.class, () -> result.await(1, TimeUnit.SECONDS));
|
||||
|
||||
Throwable cause = exception.getCause();
|
||||
|
||||
assertThat(cause, notNullValue());
|
||||
assertThat(cause, instanceOf(MyException.class));
|
||||
}
|
||||
|
||||
private String syncError() {
|
||||
throw new MyException();
|
||||
}
|
||||
|
||||
private Thread sync() {
|
||||
syncCounter.incrementAndGet();
|
||||
return Thread.currentThread();
|
||||
}
|
||||
|
||||
private static class MyException extends RuntimeException {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
class AtomicCycleTest {
|
||||
@Test
|
||||
void test() {
|
||||
AtomicCycle cycle = new AtomicCycle(2);
|
||||
|
||||
assertThat(cycle.incrementAndGet(), is(0));
|
||||
assertThat(cycle.incrementAndGet(), is(1));
|
||||
assertThat(cycle.incrementAndGet(), is(2));
|
||||
assertThat(cycle.incrementAndGet(), is(0));
|
||||
assertThat(cycle.incrementAndGet(), is(1));
|
||||
assertThat(cycle.incrementAndGet(), is(2));
|
||||
assertThat(cycle.incrementAndGet(), is(0));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,132 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
class BulkheadTest {
|
||||
@Test
|
||||
void testBulkhead() throws InterruptedException {
|
||||
Bulkhead bulkhead = Bulkhead.builder()
|
||||
.limit(1)
|
||||
.queueLength(1)
|
||||
.build();
|
||||
|
||||
Request inProgress = new Request(0);
|
||||
Request enqueued = new Request(1);
|
||||
Request rejected = new Request(2);
|
||||
|
||||
Single<Integer> inProgressResult = bulkhead.invoke(inProgress::invoke);
|
||||
Single<Integer> enqueuedResult = bulkhead.invoke(enqueued::invoke);
|
||||
Single<Integer> rejectedResult = bulkhead.invoke(rejected::invoke);
|
||||
|
||||
if (!inProgress.invokedCdl.await(1, TimeUnit.SECONDS)) {
|
||||
fail("Invoke method of inProgress was not called");
|
||||
}
|
||||
assertThat(inProgress.invoked.get(), is(true));
|
||||
assertThat(inProgress.indexed.get(), is(false));
|
||||
assertThat(enqueued.invoked.get(), is(false));
|
||||
assertThat(enqueued.indexed.get(), is(false));
|
||||
assertThat(rejected.invoked.get(), is(false));
|
||||
assertThat(rejected.indexed.get(), is(false));
|
||||
|
||||
inProgress.releaseCdl.countDown();
|
||||
if (!enqueued.invokedCdl.await(1, TimeUnit.SECONDS)) {
|
||||
fail("Invoke method of enqueued was not called");
|
||||
}
|
||||
assertThat(inProgress.indexed.get(), is(true));
|
||||
assertThat(enqueued.invoked.get(), is(true));
|
||||
assertThat(enqueued.indexed.get(), is(false));
|
||||
assertThat(rejected.invoked.get(), is(false));
|
||||
assertThat(rejected.indexed.get(), is(false));
|
||||
|
||||
enqueued.releaseCdl.countDown();
|
||||
|
||||
assertThat(inProgressResult.await(1, TimeUnit.SECONDS), is(0));
|
||||
assertThat(enqueuedResult.await(1, TimeUnit.SECONDS), is(1));
|
||||
CompletionException completionException = assertThrows(CompletionException.class,
|
||||
() -> rejectedResult.await(1, TimeUnit.SECONDS));
|
||||
Throwable cause = completionException.getCause();
|
||||
|
||||
assertThat(cause, notNullValue());
|
||||
assertThat(cause, instanceOf(BulkheadException.class));
|
||||
assertThat(cause.getMessage(), is("Bulkhead queue is full"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testBulkheadWithError() throws InterruptedException {
|
||||
Bulkhead bulkhead = Bulkhead.builder()
|
||||
.limit(1)
|
||||
.queueLength(1)
|
||||
.build();
|
||||
Single<Object> result = bulkhead.invoke(() -> Single.error(new IllegalStateException()));
|
||||
FaultToleranceTest.completionException(result, IllegalStateException.class);
|
||||
|
||||
Request inProgress = new Request(0);
|
||||
bulkhead.invoke(inProgress::invoke);
|
||||
|
||||
// queued
|
||||
result = bulkhead.invoke(() -> Single.error(new IllegalStateException()));
|
||||
inProgress.releaseCdl.countDown();
|
||||
|
||||
FaultToleranceTest.completionException(result, IllegalStateException.class);
|
||||
|
||||
}
|
||||
|
||||
private static class Request {
|
||||
private final CountDownLatch releaseCdl = new CountDownLatch(1);
|
||||
private final CountDownLatch invokedCdl = new CountDownLatch(1);
|
||||
private final AtomicBoolean invoked = new AtomicBoolean();
|
||||
private final AtomicBoolean indexed = new AtomicBoolean();
|
||||
|
||||
private final int index;
|
||||
|
||||
Request(int index) {
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
CompletionStage<Integer> invoke() {
|
||||
invoked.set(true);
|
||||
invokedCdl.countDown();
|
||||
return FaultTolerance.async(this::index);
|
||||
}
|
||||
|
||||
private int index() {
|
||||
try {
|
||||
releaseCdl.await();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
indexed.set(true);
|
||||
return index;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,145 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
class CircuitBreakerTest {
|
||||
@Test
|
||||
void testCircuitBreaker() throws InterruptedException {
|
||||
CircuitBreaker breaker = CircuitBreaker.builder()
|
||||
.volume(10)
|
||||
.ratio(20)
|
||||
.delay(Duration.ofMillis(200))
|
||||
.successThreshold(2)
|
||||
.build();
|
||||
|
||||
good(breaker);
|
||||
good(breaker);
|
||||
|
||||
bad(breaker);
|
||||
|
||||
good(breaker);
|
||||
|
||||
// should open the breaker
|
||||
bad(breaker);
|
||||
|
||||
breakerOpen(breaker);
|
||||
|
||||
assertThat(breaker.state(), is(CircuitBreaker.State.OPEN));
|
||||
|
||||
// need to wait until half open
|
||||
int count = 0;
|
||||
while(count++ < 10) {
|
||||
Thread.sleep(50);
|
||||
if(breaker.state() == CircuitBreaker.State.HALF_OPEN) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assertThat(breaker.state(), is(CircuitBreaker.State.HALF_OPEN));
|
||||
|
||||
good(breaker);
|
||||
good(breaker);
|
||||
|
||||
assertThat(breaker.state(), is(CircuitBreaker.State.CLOSED));
|
||||
|
||||
// should open the breaker
|
||||
bad(breaker);
|
||||
bad(breaker);
|
||||
|
||||
assertThat(breaker.state(), is(CircuitBreaker.State.OPEN));
|
||||
|
||||
// need to wait until half open
|
||||
count = 0;
|
||||
while(count++ < 10) {
|
||||
Thread.sleep(50);
|
||||
if(breaker.state() == CircuitBreaker.State.HALF_OPEN) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
good(breaker);
|
||||
bad(breaker);
|
||||
|
||||
assertThat(breaker.state(), is(CircuitBreaker.State.OPEN));
|
||||
}
|
||||
|
||||
private void breakerOpen(CircuitBreaker breaker) {
|
||||
Request good = new Request();
|
||||
Single<Integer> result = breaker.invoke(good::invoke);
|
||||
CompletionException exception = assertThrows(CompletionException.class, () -> result.await(1, TimeUnit.SECONDS));
|
||||
|
||||
Throwable cause = exception.getCause();
|
||||
|
||||
assertThat(cause, notNullValue());
|
||||
assertThat(cause, instanceOf(CircuitBreakerOpenException.class));
|
||||
}
|
||||
|
||||
private void bad(CircuitBreaker breaker) {
|
||||
Failing failing = new Failing(new IllegalStateException("Fail"));
|
||||
Single<Integer> failedResult = breaker.invoke(failing::invoke);
|
||||
CompletionException exception = assertThrows(CompletionException.class, () -> failedResult.await(1, TimeUnit.SECONDS));
|
||||
|
||||
Throwable cause = exception.getCause();
|
||||
|
||||
assertThat(cause, notNullValue());
|
||||
assertThat(cause, instanceOf(IllegalStateException.class));
|
||||
|
||||
}
|
||||
|
||||
private void good(CircuitBreaker breaker) {
|
||||
Request good = new Request();
|
||||
Single<Integer> result = breaker.invoke(good::invoke);
|
||||
result.await(1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private static class Failing {
|
||||
private final Exception exception;
|
||||
|
||||
Failing(Exception exception) {
|
||||
this.exception = exception;
|
||||
}
|
||||
|
||||
CompletionStage<Integer> invoke() {
|
||||
return CompletableFuture.failedFuture(exception);
|
||||
}
|
||||
}
|
||||
|
||||
private static class Request {
|
||||
Request() {
|
||||
}
|
||||
|
||||
CompletionStage<Integer> invoke() {
|
||||
return CompletableFuture.completedFuture(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,79 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import io.helidon.common.reactive.Single;
|
||||
import io.helidon.config.ConfigException;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
class FallbackTest {
|
||||
private final AtomicInteger primaryCounter = new AtomicInteger();
|
||||
private final AtomicInteger fallbackCounter = new AtomicInteger();
|
||||
|
||||
@BeforeEach
|
||||
void reset() {
|
||||
primaryCounter.set(0);
|
||||
fallbackCounter.set(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFallback() {
|
||||
String result = FaultTolerance.fallback(this::primary, this::fallback)
|
||||
.await(1, TimeUnit.SECONDS);
|
||||
|
||||
assertThat(result, is("fallback"));
|
||||
assertThat(primaryCounter.get(), is(1));
|
||||
assertThat(fallbackCounter.get(), is(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFallbackFails() {
|
||||
Single<String> result = FaultTolerance.fallback(this::primary, this::fallbackFail);
|
||||
ConfigException exception = FaultToleranceTest.completionException(result, ConfigException.class);
|
||||
Throwable[] suppressed = exception.getSuppressed();
|
||||
assertThat("Should have a suppressed exception: " + Arrays.toString(suppressed), suppressed.length, is(1));
|
||||
assertThat(suppressed[0], instanceOf(CompletionException.class));
|
||||
Throwable suppressedCause = suppressed[0].getCause();
|
||||
assertThat(suppressedCause, instanceOf(IllegalArgumentException.class));
|
||||
}
|
||||
|
||||
private Single<String> primary() {
|
||||
primaryCounter.incrementAndGet();
|
||||
return Single.error(new IllegalArgumentException("Intentional failure"));
|
||||
}
|
||||
|
||||
private Single<String> fallback(Throwable throwable) {
|
||||
fallbackCounter.incrementAndGet();
|
||||
return Single.just("fallback");
|
||||
}
|
||||
|
||||
private Single<String> fallbackFail(Throwable throwable) {
|
||||
fallbackCounter.incrementAndGet();
|
||||
return Single.error(new ConfigException("Intentional failure"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class FaultToleranceTest {
|
||||
|
||||
@Test
|
||||
void testCustomCombination() {
|
||||
CircuitBreaker breaker = CircuitBreaker.builder()
|
||||
.build();
|
||||
|
||||
Bulkhead bulkhead = Bulkhead.builder()
|
||||
.limit(1)
|
||||
.queueLength(0)
|
||||
.build();
|
||||
|
||||
TypedHandler<String> faultTolerance = FaultTolerance.builder()
|
||||
.addBreaker(breaker)
|
||||
.addBulkhead(bulkhead)
|
||||
.addTimeout(Timeout.builder().timeout(Duration.ofMillis(100)).build())
|
||||
.addFallback(Fallback.<String>builder()
|
||||
.fallback(this::fallback)
|
||||
.build())
|
||||
.build();
|
||||
|
||||
Single<String> result = faultTolerance.invoke(this::primary);
|
||||
assertThat(result.await(1, TimeUnit.SECONDS), is(MyException.class.getName()));
|
||||
|
||||
breaker.state(CircuitBreaker.State.OPEN);
|
||||
result = faultTolerance.invoke(this::primary);
|
||||
assertThat(result.await(1, TimeUnit.SECONDS), is(CircuitBreakerOpenException.class.getName()));
|
||||
|
||||
breaker.state(CircuitBreaker.State.CLOSED);
|
||||
|
||||
Manual m = new Manual();
|
||||
Single<String> manualResult = bulkhead.invoke(m::call);
|
||||
|
||||
result = faultTolerance.invoke(this::primary);
|
||||
assertThat(result.await(1, TimeUnit.SECONDS), is(BulkheadException.class.getName()));
|
||||
|
||||
m.future.complete("result");
|
||||
manualResult.await(1, TimeUnit.SECONDS);
|
||||
|
||||
m = new Manual();
|
||||
result = faultTolerance.invoke(m::call);
|
||||
assertThat(result.await(1, TimeUnit.SECONDS), is(TimeoutException.class.getName()));
|
||||
|
||||
m.future.complete("hu");
|
||||
}
|
||||
|
||||
private Single<String> primary() {
|
||||
return Single.error(new MyException());
|
||||
}
|
||||
|
||||
private Single<String> fallback(Throwable throwable) {
|
||||
return Single.just(throwable.getClass().getName());
|
||||
}
|
||||
|
||||
static <T extends Throwable> T completionException(Single<?> result, Class<T> expected) {
|
||||
CompletionException completionException = assertThrows(CompletionException.class,
|
||||
() -> result.await(1, TimeUnit.SECONDS));
|
||||
Throwable cause = completionException.getCause();
|
||||
assertThat(cause, notNullValue());
|
||||
assertThat(cause, instanceOf(expected));
|
||||
|
||||
return expected.cast(cause);
|
||||
}
|
||||
|
||||
private class Manual {
|
||||
private final CompletableFuture<String> future = new CompletableFuture<>();
|
||||
|
||||
private CompletionStage<String> call() {
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
||||
private static class MyException extends RuntimeException {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
class ResultWindowTest {
|
||||
@Test
|
||||
void test() {
|
||||
ResultWindow window = new ResultWindow(10, 10);
|
||||
assertThat("Empty should not open", window.shouldOpen(), is(false));
|
||||
window.update(ResultWindow.Result.SUCCESS);
|
||||
window.update(ResultWindow.Result.SUCCESS);
|
||||
window.update(ResultWindow.Result.SUCCESS);
|
||||
assertThat("Only success should not open", window.shouldOpen(), is(false));
|
||||
window.update(ResultWindow.Result.FAILURE);
|
||||
assertThat("Should open on first failure (10% of 10 size)", window.shouldOpen(), is(true));
|
||||
//now cycle through window and replace all with success
|
||||
for (int i = 0; i < 10; i++) {
|
||||
window.update(ResultWindow.Result.SUCCESS);
|
||||
}
|
||||
assertThat("All success should not open", window.shouldOpen(), is(false));
|
||||
window.update(ResultWindow.Result.FAILURE);
|
||||
assertThat("Should open on first failure (10% of 10 size)", window.shouldOpen(), is(true));
|
||||
window.reset();
|
||||
assertThat("Should not open after reset", window.shouldOpen(), is(false));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,166 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import io.helidon.common.reactive.Single;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
class RetryTest {
|
||||
@Test
|
||||
void testRetry() {
|
||||
Retry retry = Retry.builder()
|
||||
.calls(3)
|
||||
.delay(Duration.ofMillis(50))
|
||||
.maxTime(Duration.ofMillis(500))
|
||||
.jitter(Duration.ofMillis(50))
|
||||
.build();
|
||||
|
||||
Request req = new Request(3, new TerminalException(), new RetryException());
|
||||
Single<Integer> result = retry.invoke(req::invoke);
|
||||
FaultToleranceTest.completionException(result, TerminalException.class);
|
||||
assertThat(req.call.get(), is(3));
|
||||
|
||||
req = new Request(2, new TerminalException(), new RetryException());
|
||||
result = retry.invoke(req::invoke);
|
||||
int count = result.await(1, TimeUnit.SECONDS);
|
||||
assertThat(count, is(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRetryOn() {
|
||||
Retry retry = Retry.builder()
|
||||
.calls(3)
|
||||
.delay(Duration.ofMillis(100))
|
||||
.maxTime(Duration.ofMillis(500))
|
||||
.jitter(Duration.ofMillis(50))
|
||||
.addRetryOn(RetryException.class)
|
||||
.build();
|
||||
|
||||
Request req = new Request(3, new RetryException(), new RetryException());
|
||||
Single<Integer> result = retry.invoke(req::invoke);
|
||||
FaultToleranceTest.completionException(result, RetryException.class);
|
||||
assertThat(req.call.get(), is(3));
|
||||
|
||||
req = new Request(2, new RetryException(), new TerminalException());
|
||||
result = retry.invoke(req::invoke);
|
||||
FaultToleranceTest.completionException(result, TerminalException.class);
|
||||
assertThat(req.call.get(), is(2));
|
||||
|
||||
req = new Request(2, new RetryException(), new RetryException());
|
||||
result = retry.invoke(req::invoke);
|
||||
int count = result.await(1, TimeUnit.SECONDS);
|
||||
assertThat(count, is(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAbortOn() {
|
||||
Retry retry = Retry.builder()
|
||||
.calls(3)
|
||||
.delay(Duration.ofMillis(100))
|
||||
.maxTime(Duration.ofMillis(500))
|
||||
.jitter(Duration.ofMillis(50))
|
||||
.addAbortOn(TerminalException.class)
|
||||
.build();
|
||||
|
||||
Request req = new Request(3, new RetryException(), new RetryException());
|
||||
Single<Integer> result = retry.invoke(req::invoke);
|
||||
FaultToleranceTest.completionException(result, RetryException.class);
|
||||
assertThat(req.call.get(), is(3));
|
||||
|
||||
req = new Request(2, new RetryException(), new TerminalException());
|
||||
result = retry.invoke(req::invoke);
|
||||
FaultToleranceTest.completionException(result, TerminalException.class);
|
||||
assertThat(req.call.get(), is(2));
|
||||
|
||||
req = new Request(2, new RetryException(), new RetryException());
|
||||
result = retry.invoke(req::invoke);
|
||||
int count = result.await(1, TimeUnit.SECONDS);
|
||||
assertThat(count, is(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testTimeout() {
|
||||
Retry retry = Retry.builder()
|
||||
.calls(3)
|
||||
.delay(Duration.ofMillis(50))
|
||||
.maxTime(Duration.ofMillis(45))
|
||||
.jitter(Duration.ofMillis(1))
|
||||
.build();
|
||||
|
||||
Request req = new Request(3, new RetryException(), new RetryException());
|
||||
Single<Integer> result = retry.invoke(req::invoke);
|
||||
FaultToleranceTest.completionException(result, TimeoutException.class);
|
||||
assertThat(req.call.get(), is(2));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testBadConfiguration() {
|
||||
Retry.Builder builder = Retry.builder()
|
||||
.retryOn(RetryException.class)
|
||||
.abortOn(TerminalException.class);
|
||||
|
||||
assertThrows(IllegalArgumentException.class, builder::build);
|
||||
}
|
||||
|
||||
|
||||
private static class Request {
|
||||
private final AtomicInteger call = new AtomicInteger();
|
||||
private final int failures;
|
||||
private final RuntimeException first;
|
||||
private final RuntimeException second;
|
||||
|
||||
private Request(int failures, RuntimeException first, RuntimeException second) {
|
||||
this.failures = failures;
|
||||
this.first = first;
|
||||
this.second = second;
|
||||
}
|
||||
|
||||
CompletionStage<Integer> invoke() {
|
||||
//failures 1
|
||||
// call
|
||||
int now = call.incrementAndGet();
|
||||
if (now <= failures) {
|
||||
if (now == 1) {
|
||||
throw first;
|
||||
} else if (now == 2) {
|
||||
throw second;
|
||||
} else {
|
||||
throw first;
|
||||
}
|
||||
}
|
||||
return Single.just(now);
|
||||
}
|
||||
}
|
||||
|
||||
private static class RetryException extends RuntimeException {
|
||||
}
|
||||
|
||||
private static class TerminalException extends RuntimeException {
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.faulttolerance;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
class TimeoutTest {
|
||||
|
||||
@BeforeEach
|
||||
void reset() {
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAsync() {
|
||||
CompletionException exc = assertThrows(CompletionException.class,
|
||||
() -> FaultTolerance.timeout(Duration.ofSeconds(1), this::timeOut)
|
||||
.await());
|
||||
|
||||
Throwable cause = exc.getCause();
|
||||
assertThat(cause, notNullValue());
|
||||
assertThat(cause, instanceOf(TimeoutException.class));
|
||||
}
|
||||
|
||||
private CompletionStage<String> timeOut() {
|
||||
// never completing
|
||||
return new CompletableFuture<>();
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user