CompositeFuture now delegates handler management to a Promise<CompositeFuture> in order to benefit from multiple handler support built-in Promise.

This commit is contained in:
Julien Viet
2020-01-09 16:06:35 +01:00
parent 950b1b96be
commit 9b4e105d5e
2 changed files with 68 additions and 62 deletions

View File

@@ -15,6 +15,7 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import java.util.function.Function;
@@ -23,35 +24,31 @@ import java.util.function.Function;
*/
public class CompositeFutureImpl implements CompositeFuture {
private static final Handler<AsyncResult<CompositeFuture>> NO_HANDLER = c -> {};
public static CompositeFuture all(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length;
for (int i = 0; i < len; i++) {
results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
for (Future<?> result : results) {
result.setHandler(ar -> {
if (ar.succeeded()) {
synchronized (composite) {
composite.count++;
if (!composite.isComplete() && composite.count == len) {
handler = composite.setCompleted(null);
if (composite.isComplete() || composite.count < len) {
return;
}
}
composite.succeed();
} else {
synchronized (composite) {
if (!composite.isComplete()) {
handler = composite.setCompleted(ar.cause());
if (composite.isComplete()) {
return;
}
}
}
if (handler != null) {
handler.handle(composite);
composite.fail(ar.cause());
}
});
}
if (len == 0) {
composite.setCompleted(null);
composite.succeed();
}
return composite;
}
@@ -59,99 +56,85 @@ public class CompositeFutureImpl implements CompositeFuture {
public static CompositeFuture any(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length;
for (int i = 0;i < len;i++) {
results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
for (Future<?> result : results) {
result.setHandler(ar -> {
if (ar.succeeded()) {
synchronized (composite) {
if (!composite.isComplete()) {
handler = composite.setCompleted(null);
if (composite.isComplete()) {
return;
}
}
composite.succeed();
} else {
synchronized (composite) {
composite.count++;
if (!composite.isComplete() && composite.count == len) {
handler = composite.setCompleted(ar.cause());
if (composite.isComplete() || composite.count < len) {
return;
}
}
}
if (handler != null) {
handler.handle(composite);
composite.fail(ar.cause());
}
});
}
if (results.length == 0) {
composite.setCompleted(null);
composite.succeed();
}
return composite;
}
private static final Function<CompositeFuture, Throwable> ALL = cf -> {
private static final Function<CompositeFuture, Object> ALL = cf -> {
int size = cf.size();
for (int i = 0;i < size;i++) {
if (!cf.succeeded(i)) {
return cf.cause(i);
}
}
return null;
return cf;
};
public static CompositeFuture join(Future<?>... results) {
return join(ALL, results);
}
private static CompositeFuture join(Function<CompositeFuture, Throwable> pred, Future<?>... results) {
private static CompositeFuture join(Function<CompositeFuture, Object> pred, Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length;
for (int i = 0; i < len; i++) {
results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
for (Future<?> result : results) {
result.setHandler(ar -> {
synchronized (composite) {
composite.count++;
if (!composite.isComplete() && composite.count == len) {
// Take decision here
Throwable failure = pred.apply(composite);
handler = composite.setCompleted(failure);
if (composite.isComplete() || composite.count < len) {
return;
}
}
if (handler != null) {
handler.handle(composite);
}
composite.complete(pred.apply(composite));
});
}
if (len == 0) {
composite.setCompleted(null);
composite.succeed();
}
return composite;
}
private final Future[] results;
private int count;
private boolean completed;
private Throwable cause;
private Handler<AsyncResult<CompositeFuture>> handler;
private Object result;
private Promise<CompositeFuture> promise;
private CompositeFutureImpl(Future<?>... results) {
this.results = results;
this.promise = Promise.promise();
}
@Override
public CompositeFuture onComplete(Handler<AsyncResult<CompositeFuture>> handler) {
boolean call;
synchronized (this) {
this.handler = handler;
call = completed;
}
if (call) {
handler.handle(this);
}
promise.future().onComplete(handler);
return this;
}
@Override
public synchronized Handler<AsyncResult<CompositeFuture>> getHandler() {
return this.handler;
return promise.future().getHandler();
}
@Override
@@ -193,37 +176,41 @@ public class CompositeFutureImpl implements CompositeFuture {
@Override
public synchronized boolean isComplete() {
return completed;
return result != null;
}
@Override
public synchronized boolean succeeded() {
return completed && cause == null;
return result == this;
}
@Override
public synchronized boolean failed() {
return completed && cause != null;
return result instanceof Throwable;
}
@Override
public synchronized Throwable cause() {
return completed && cause != null ? cause : null;
return result instanceof Throwable ? (Throwable) result : null;
}
@Override
public synchronized CompositeFuture result() {
return completed && cause == null ? this : null;
return result == this ? this : null;
}
private Handler<AsyncResult<CompositeFuture>> setCompleted(Throwable cause) {
private void succeed() {
complete(this);
}
private void fail(Throwable t) {
complete(t);
}
private void complete(Object result) {
synchronized (this) {
if (completed) {
return null;
}
this.completed = true;
this.cause = cause;
return handler != null ? handler : NO_HANDLER;
this.result = result;
}
promise.handle(this);
}
}

View File

@@ -539,6 +539,25 @@ public class FutureTest extends VertxTestBase {
assertEquals(Arrays.asList("foo", 4), composite.list());
}
@Test
public void testCompositeFutureMulti() {
Promise<String> p1 = Promise.promise();
Future<String> f1 = p1.future();
Promise<Integer> p2 = Promise.promise();
Future<Integer> f2 = p2.future();
CompositeFuture composite = CompositeFuture.all(f1, f2);
AtomicInteger count = new AtomicInteger();
composite.onComplete(ar -> {
count.compareAndSet(0, 1);
});
composite.onComplete(ar -> {
count.compareAndSet(1, 2);
});
p1.complete("foo");
p2.complete(4);
assertEquals(2, count.get());
}
@Test
public void testComposeSuccessToSuccess() {
AtomicReference<String> ref = new AtomicReference<>();