Implementation of MP FT 2.1.1 using FT SE (#2348)

* Replacing FailSafe and Hystrix by our own implementation of FT primitives. Some minor changes to our first version of these primitive operations was necessary to be fully compatible with MP and pass all the TCKs.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
This commit is contained in:
Santiago Pericasgeertsen
2020-09-14 13:20:46 -04:00
committed by GitHub
parent ec0a12600d
commit 74956be772
63 changed files with 1705 additions and 2820 deletions

View File

@@ -18,6 +18,7 @@ package io.helidon.faulttolerance;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
@@ -35,14 +36,16 @@ class AsyncImpl implements Async {
CompletableFuture<T> future = new CompletableFuture<>();
AsyncTask<T> task = new AsyncTask<>(supplier, future);
Future<?> taskFuture;
try {
executor.get().submit(task);
taskFuture = executor.get().submit(task);
} catch (Throwable e) {
// rejected execution and other executor related issues
return Single.error(e);
}
return Single.create(future);
Single<T> single = Single.create(future, true);
return single.onCancel(() -> taskFuture.cancel(false)); // cancel task
}
private static class AsyncTask<T> implements Runnable {

View File

@@ -26,6 +26,14 @@ final class AtomicCycle {
this.maxIndex = maxIndex + 1;
}
int get() {
return atomicInteger.get();
}
void set(int n) {
atomicInteger.set(n);
}
int incrementAndGet() {
return atomicInteger.accumulateAndGet(maxIndex, (current, max) -> (current + 1) % max);
}

View File

@@ -125,4 +125,41 @@ public interface Bulkhead extends FtHandler {
}
}
interface Stats {
/**
* Number of concurrent executions at this time.
*
* @return concurrent executions.
*/
long concurrentExecutions();
/**
* Number of calls accepted on the bulkhead.
*
* @return calls accepted.
*/
long callsAccepted();
/**
* Number of calls rejected on the bulkhead.
*
* @return calls rejected.
*/
long callsRejected();
/**
* Size of waiting queue at this time.
*
* @return size of waiting queue.
*/
long waitingQueueSize();
}
/**
* Provides access to internal stats for this bulkhead.
*
* @return internal stats.
*/
Stats stats();
}

View File

@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.logging.Logger;
@@ -38,6 +39,10 @@ class BulkheadImpl implements Bulkhead {
private final Semaphore inProgress;
private final String name;
private final AtomicLong concurrentExecutions = new AtomicLong(0L);
private final AtomicLong callsAccepted = new AtomicLong(0L);
private final AtomicLong callsRejected = new AtomicLong(0L);
BulkheadImpl(Bulkhead.Builder builder) {
this.executor = builder.executor();
this.inProgress = new Semaphore(builder.limit(), true);
@@ -60,10 +65,36 @@ class BulkheadImpl implements Bulkhead {
return invokeTask(DelayedTask.createMulti(supplier));
}
@Override
public Stats stats() {
return new Stats() {
@Override
public long concurrentExecutions() {
return concurrentExecutions.get();
}
@Override
public long callsAccepted() {
return callsAccepted.get();
}
@Override
public long callsRejected() {
return callsRejected.get();
}
@Override
public long waitingQueueSize() {
return queue.size();
}
};
}
// this method must be called while NOT holding a permit
private <R> R invokeTask(DelayedTask<R> task) {
if (inProgress.tryAcquire()) {
LOGGER.finest(() -> name + " invoke immediate: " + task);
// free permit, we can invoke
execute(task);
return task.result();
@@ -71,9 +102,15 @@ class BulkheadImpl implements Bulkhead {
// no free permit, let's try to enqueue
if (queue.offer(task)) {
LOGGER.finest(() -> name + " enqueue: " + task);
return task.result();
R result = task.result();
if (result instanceof Single<?>) {
Single<Object> single = (Single<Object>) result;
return (R) single.onCancel(() -> queue.remove(task));
}
return result;
} else {
LOGGER.finest(() -> name + " reject: " + task);
callsRejected.incrementAndGet();
return task.error(new BulkheadException("Bulkhead queue \"" + name + "\" is full"));
}
}
@@ -81,8 +118,12 @@ class BulkheadImpl implements Bulkhead {
// this method must be called while holding a permit
private void execute(DelayedTask<?> task) {
callsAccepted.incrementAndGet();
concurrentExecutions.incrementAndGet();
task.execute()
.handle((it, throwable) -> {
concurrentExecutions.decrementAndGet();
// we do not care about execution, but let's record it in debug
LOGGER.finest(() -> name + " finished execution: " + task
+ " (" + (throwable == null ? "success" : "failure") + ")");

View File

@@ -77,21 +77,18 @@ class CircuitBreakerImpl implements CircuitBreaker {
if (state.get() == State.CLOSED) {
// run it!
CompletionStage<Void> completion = task.execute();
completion.handle((it, throwable) -> {
Throwable exception = FaultTolerance.cause(throwable);
if (exception == null || errorChecker.shouldSkip(exception)) {
// success
results.update(SUCCESS);
} else {
results.update(FAILURE);
if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) {
results.reset();
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
}
if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) {
results.reset();
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
return it;
});
return task.result();
@@ -111,18 +108,15 @@ class CircuitBreakerImpl implements CircuitBreaker {
// transition to closed
successCounter.set(0);
state.compareAndSet(State.HALF_OPEN, State.CLOSED);
halfOpenInProgress.set(false);
}
halfOpenInProgress.set(false);
} else {
// failure
successCounter.set(0);
state.set(State.OPEN);
halfOpenInProgress.set(false);
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
halfOpenInProgress.set(false);
return it;
});
return task.result();

View File

@@ -133,7 +133,7 @@ interface DelayedTask<T> {
@Override
public Single<T> result() {
return Single.create(resultFuture.get());
return Single.create(resultFuture.get(), true);
}
@Override

View File

@@ -22,25 +22,24 @@ import java.util.Set;
interface ErrorChecker {
boolean shouldSkip(Throwable throwable);
/**
* Returns ErrorChecker that skips if throwable is in skipOnSet or if applyOnSet
* is not empty and throwable is not in it. Note that if applyOnSet is empty, then
* it is equivalent to it containing {@code Throwable.class}. Sets are copied
* because they are mutable.
*
* @param skipOnSet set of throwables to skip logic on.
* @param applyOnSet set of throwables to apply logic on.
* @return An error checker.
*/
static ErrorChecker create(Set<Class<? extends Throwable>> skipOnSet, Set<Class<? extends Throwable>> applyOnSet) {
Set<Class<? extends Throwable>> skipOn = Set.copyOf(skipOnSet);
Set<Class<? extends Throwable>> applyOn = Set.copyOf(applyOnSet);
return throwable -> containsThrowable(skipOn, throwable)
|| !applyOn.isEmpty() && !containsThrowable(applyOn, throwable);
}
if (skipOn.isEmpty()) {
if (applyOn.isEmpty()) {
return throwable -> false;
} else {
return throwable -> !applyOn.contains(throwable.getClass());
}
} else {
if (applyOn.isEmpty()) {
return throwable -> skipOn.contains(throwable.getClass());
} else {
throw new IllegalArgumentException("You have defined both skip and apply set of exception classes. "
+ "This cannot be correctly handled; skipOn: " + skipOn
+ " applyOn: " + applyOn);
}
}
private static boolean containsThrowable(Set<Class<? extends Throwable>> set, Throwable throwable) {
return set.stream().anyMatch(t -> t.isAssignableFrom(throwable.getClass()));
}
}

View File

@@ -76,6 +76,6 @@ class FallbackImpl<T> implements Fallback<T> {
return null;
});
return Single.create(future);
return Single.create(future, true);
}
}

View File

@@ -121,6 +121,16 @@ public final class FaultTolerance {
return new Builder();
}
/**
* A typed builder to configure a customized sequence of fault tolerance handlers.
*
* @param <T> type of result
* @return a new builder
*/
public static <T> TypedBuilder<T> typedBuilder() {
return new TypedBuilder<>();
}
static Config config() {
return CONFIG.get();
}
@@ -266,7 +276,17 @@ public final class FaultTolerance {
next = () -> validFt.invoke(finalNext);
}
return Single.create(next.get());
return Single.create(next.get(), true);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (int i = validFts.size() - 1; i >= 0; i--) {
sb.append(validFts.get(i).toString());
sb.append("\n");
}
return sb.toString();
}
}
@@ -286,6 +306,11 @@ public final class FaultTolerance {
public Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
return handler.invokeMulti(supplier);
}
@Override
public String toString() {
return handler.getClass().getSimpleName();
}
}
}
@@ -350,7 +375,7 @@ public final class FaultTolerance {
next = () -> validFt.invoke(finalNext);
}
return Single.create(next.get());
return Single.create(next.get(), true);
}
}
}

View File

@@ -29,6 +29,7 @@ final class ResultWindow {
private final AtomicInteger currentSum = new AtomicInteger();
private final AtomicCycle index;
private final AtomicInteger[] results;
private final AtomicInteger totalResults = new AtomicInteger();
private final int thresholdSum;
ResultWindow(int size, int ratio) {
@@ -44,17 +45,18 @@ final class ResultWindow {
}
void update(Result resultEnum) {
// update total number of results
totalResults.incrementAndGet();
// success is zero, failure is 1
int result = resultEnum.ordinal();
AtomicInteger mine = results[index.incrementAndGet()];
int origValue = mine.getAndSet(result);
if (origValue == result) {
// no change
return;
}
if (origValue == 1) {
currentSum.decrementAndGet();
} else {
@@ -62,15 +64,22 @@ final class ResultWindow {
}
}
/**
* Open if we have seen enough results and we are at or over the threshold.
*
* @return outcome of test.
*/
boolean shouldOpen() {
return currentSum.get() >= thresholdSum;
return totalResults.get() >= results.length && currentSum.get() >= thresholdSum;
}
void reset() {
// "soft" reset - send in success equal to window size
for (int i = 0; i < results.length; i++) {
update(Result.SUCCESS);
results[i].set(Result.SUCCESS.ordinal());
}
currentSum.set(0);
index.set(results.length - 1);
totalResults.set(0);
}
// order is significant, do not change

View File

@@ -58,7 +58,6 @@ public interface Retry extends FtHandler {
.jitter(Duration.ofMillis(50))
.build();
private Duration overallTimeout = Duration.ofSeconds(1);
private LazyValue<? extends ScheduledExecutorService> scheduledExecutor = FaultTolerance.scheduledExecutor();
@@ -422,4 +421,12 @@ public interface Retry extends FtHandler {
}
}
}
/**
* Number of times a method called has been retried. This is a monotonically
* increasing counter over the lifetime of the handler.
*
* @return number ot times a method is retried.
*/
long retryCounter();
}

View File

@@ -37,6 +37,7 @@ class RetryImpl implements Retry {
private final ErrorChecker errorChecker;
private final long maxTimeNanos;
private final Retry.RetryPolicy retryPolicy;
private final AtomicLong retryCounter = new AtomicLong(0L);
RetryImpl(Retry.Builder builder) {
this.scheduledExecutor = builder.scheduledExecutor();
@@ -75,6 +76,10 @@ class RetryImpl implements Retry {
+ TimeUnit.NANOSECONDS.toMillis(maxTimeNanos) + " ms."));
}
if (currentCallIndex > 0) {
retryCounter.getAndIncrement();
}
DelayedTask<Single<T>> task = DelayedTask.createSingle(context.supplier);
if (delay == 0) {
task.execute();
@@ -94,7 +99,6 @@ class RetryImpl implements Retry {
}
private <T> Multi<T> retryMulti(RetryContext<? extends Flow.Publisher<T>> context) {
long delay = 0;
int currentCallIndex = context.count.getAndIncrement();
if (currentCallIndex != 0) {
@@ -114,6 +118,10 @@ class RetryImpl implements Retry {
+ TimeUnit.NANOSECONDS.toMillis(maxTimeNanos) + " ms."));
}
if (currentCallIndex > 0) {
retryCounter.getAndIncrement();
}
DelayedTask<Multi<T>> task = DelayedTask.createMulti(context.supplier);
if (delay == 0) {
task.execute();
@@ -132,6 +140,11 @@ class RetryImpl implements Retry {
});
}
@Override
public long retryCounter() {
return retryCounter.get();
}
private static class RetryContext<U> {
// retry runtime
private final long startedMillis = System.currentTimeMillis();

View File

@@ -52,6 +52,7 @@ public interface Timeout extends FtHandler {
class Builder implements io.helidon.common.Builder<Timeout> {
private Duration timeout = Duration.ofSeconds(10);
private LazyValue<? extends ScheduledExecutorService> executor = FaultTolerance.scheduledExecutor();
private boolean currentThread = false;
private Builder() {
}
@@ -72,6 +73,18 @@ public interface Timeout extends FtHandler {
return this;
}
/**
* Flag to indicate that code must be executed in current thread instead
* of in an executor's thread. This flag is {@code false} by default.
*
* @param currentThread setting for this timeout
* @return updated builder instance
*/
public Builder currentThread(boolean currentThread) {
this.currentThread = currentThread;
return this;
}
/**
* Executor service to schedule the timeout.
*
@@ -90,5 +103,9 @@ public interface Timeout extends FtHandler {
LazyValue<? extends ScheduledExecutorService> executor() {
return executor;
}
boolean currentThread() {
return currentThread;
}
}
}

View File

@@ -16,10 +16,14 @@
package io.helidon.faulttolerance;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import io.helidon.common.LazyValue;
@@ -27,23 +31,78 @@ import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
class TimeoutImpl implements Timeout {
private static final long MONITOR_THREAD_TIMEOUT = 100L;
private final long timeoutMillis;
private final LazyValue<? extends ScheduledExecutorService> executor;
private final boolean currentThread;
TimeoutImpl(Timeout.Builder builder) {
this.timeoutMillis = builder.timeout().toMillis();
this.executor = builder.executor();
this.currentThread = builder.currentThread();
}
@Override
public <T> Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
if (currentThread) {
throw new UnsupportedOperationException("Unsupported currentThread flag with Multi");
}
return Multi.create(supplier.get())
.timeout(timeoutMillis, TimeUnit.MILLISECONDS, executor.get());
}
@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
return Single.create(supplier.get())
.timeout(timeoutMillis, TimeUnit.MILLISECONDS, executor.get());
if (!currentThread) {
return Single.create(supplier.get(), true)
.timeout(timeoutMillis, TimeUnit.MILLISECONDS, executor.get());
} else {
Thread thisThread = Thread.currentThread();
CompletableFuture<Void> monitorStarted = new CompletableFuture<>();
AtomicBoolean callReturned = new AtomicBoolean(false);
// Startup monitor thread that can interrupt current thread after timeout
CompletableFuture<T> future = new CompletableFuture<>();
Timeout.builder()
.executor(executor.get()) // propagate executor
.currentThread(false)
.timeout(Duration.ofMillis(timeoutMillis))
.build()
.invoke(() -> {
monitorStarted.complete(null);
return Single.never();
})
.exceptionally(it -> {
if (callReturned.compareAndSet(false, true)) {
future.completeExceptionally(new TimeoutException("Method interrupted by timeout"));
thisThread.interrupt();
}
return null;
});
// Ensure monitor thread has started
try {
monitorStarted.get(MONITOR_THREAD_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (Exception e) {
return Single.error(new IllegalStateException("Timeout monitor thread failed to start"));
}
// Run invocation in current thread
Single<T> single = Single.create(supplier.get(), true);
callReturned.set(true);
single.whenComplete((o, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(o);
}
});
// Clear interrupted flag here -- required for uninterruptible busy loops
Thread.interrupted();
return Single.create(future, true);
}
}
}

View File

@@ -47,14 +47,14 @@ class CircuitBreakerTest {
good(breaker);
good(breaker);
bad(breaker);
good(breaker);
goodMulti(breaker);
// should open the breaker
good(breaker);
good(breaker);
good(breaker);
bad(breaker);
bad(breaker); // should open - window complete
breakerOpen(breaker);
breakerOpenMulti(breaker);
@@ -77,23 +77,19 @@ class CircuitBreakerTest {
assertThat(breaker.state(), is(CircuitBreaker.State.CLOSED));
// should open the breaker
bad(breaker);
bad(breaker);
assertThat(breaker.state(), is(CircuitBreaker.State.OPEN));
// need to wait until half open
count = 0;
while (count++ < 10) {
Thread.sleep(50);
if (breaker.state() == CircuitBreaker.State.HALF_OPEN) {
break;
}
}
good(breaker);
badMulti(breaker);
good(breaker);
bad(breaker);
good(breaker);
goodMulti(breaker);
good(breaker);
good(breaker);
good(breaker);
bad(breaker);
bad(breaker); // should open - window complete
breakerOpen(breaker);
breakerOpenMulti(breaker);
assertThat(breaker.state(), is(CircuitBreaker.State.OPEN));
}

View File

@@ -22,24 +22,66 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
class ResultWindowTest {
@Test
void test() {
ResultWindow window = new ResultWindow(10, 10);
void testNotOpenBeforeCompleteWindow() {
ResultWindow window = new ResultWindow(5, 20);
assertThat("Empty should not open", window.shouldOpen(), is(false));
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.FAILURE);
assertThat("Should not open before complete window", window.shouldOpen(), is(false));
}
@Test
void testOpenAfterCompleteWindow1() {
ResultWindow window = new ResultWindow(5, 20);
assertThat("Empty should not open", window.shouldOpen(), is(false));
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.SUCCESS);
window.update(ResultWindow.Result.SUCCESS);
window.update(ResultWindow.Result.SUCCESS);
assertThat("Should open after complete window > 20%", window.shouldOpen(), is(true));
}
@Test
void testOpenAfterCompleteWindow2() {
ResultWindow window = new ResultWindow(5, 20);
assertThat("Empty should not open", window.shouldOpen(), is(false));
window.update(ResultWindow.Result.SUCCESS);
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.SUCCESS);
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.SUCCESS);
assertThat("Should open after complete window > 20%", window.shouldOpen(), is(true));
}
@Test
void testOpenAfterCompleteWindow3() {
ResultWindow window = new ResultWindow(5, 20);
assertThat("Empty should not open", window.shouldOpen(), is(false));
window.update(ResultWindow.Result.SUCCESS);
window.update(ResultWindow.Result.SUCCESS);
window.update(ResultWindow.Result.SUCCESS);
assertThat("Only success should not open", window.shouldOpen(), is(false));
window.update(ResultWindow.Result.SUCCESS);
window.update(ResultWindow.Result.FAILURE);
assertThat("Should open on first failure (10% of 10 size)", window.shouldOpen(), is(true));
//now cycle through window and replace all with success
for (int i = 0; i < 10; i++) {
window.update(ResultWindow.Result.SUCCESS);
}
assertThat("All success should not open", window.shouldOpen(), is(false));
window.update(ResultWindow.Result.FAILURE);
assertThat("Should open on first failure (10% of 10 size)", window.shouldOpen(), is(true));
assertThat("Should open after complete window > 20%", window.shouldOpen(), is(true));
}
@Test
void testOpenAfterCompleteWindowReset() {
ResultWindow window = new ResultWindow(5, 20);
assertThat("Empty should not open", window.shouldOpen(), is(false));
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.FAILURE);
window.update(ResultWindow.Result.FAILURE);
assertThat("Should open after complete window > 20%", window.shouldOpen(), is(true));
window.reset();
assertThat("Should not open after reset", window.shouldOpen(), is(false));
assertThat("Empty should not open", window.shouldOpen(), is(false));
}
}

View File

@@ -139,15 +139,6 @@ class RetryTest {
assertThat("Should have been called twice", req.call.get(), isOneOf(1, 2));
}
@Test
void testBadConfiguration() {
Retry.Builder builder = Retry.builder()
.applyOn(RetryException.class)
.skipOn(TerminalException.class);
assertThrows(IllegalArgumentException.class, builder::build);
}
@Test
void testMultiRetriesNoFailure() throws InterruptedException {
Retry retry = Retry.builder()