Merge pull request #3226 from jponge/cs-interop

CompletionStage interoperability
This commit is contained in:
Julien Ponge
2020-01-07 11:59:15 +01:00
committed by GitHub
5 changed files with 283 additions and 4 deletions

View File

@@ -361,6 +361,34 @@ run the given function, that returns a future. When this returned future complet
In this second case, the {@link io.vertx.core.Handler} should complete the `next` future to report its success or
failure.
=== CompletionStage interoperability
The Vert.x `Future` API offers compatibility _from_ and _to_ `CompletionStage` which is the JDK interface for composable
asynchronous operations.
We can go from a Vert.x `Future` to a `CompletionStage` using the {@link io.vertx.core.Future#toCompletionStage} method, as in:
[source,$lang]
----
{@link examples.CompletionStageInteropExamples#toCS}
----
We can conversely go from a `CompletionStage` to Vert.x `Future` using {@link io.vertx.core.Future#fromCompletionStage}.
There are 2 variants:
. the first variant takes just a `CompletionStage` and calls the `Future` methods from the thread that resolves the `CompletionStage` instance, and
. the second variant takes an extra {@link io.vertx.core.Context} parameter to call the `Future` methods on a Vert.x context.
IMPORTANT: In most cases the variant with a `CompletionStage` and a `Context` is the one you will want to use to respect the Vert.x threading model,
since Vert.x `Future` are more likely to be used with Vert.x code, libraries and clients.
Here is an example of going from a `CompletionStage` to a Vert.x `Future` and dispatching on a context:
[source,$lang]
----
{@link examples.CompletionStageInteropExamples#fromCS}
----
== Verticles
Vert.x comes with a simple, scalable, _actor-like_ deployment and concurrency model out of the box that

View File

@@ -0,0 +1,57 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package examples;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
/**
* Examples of the Future / CompletionStage interoperability.
*
* @author <a href="https://julien.ponge.org/">Julien Ponge</a>
*/
public class CompletionStageInteropExamples {
public void toCS(Vertx vertx) {
Future<String> future = vertx.createDnsClient().lookup("vertx.io");
future.toCompletionStage().whenComplete((ip, err) -> {
if (err != null) {
System.err.println("Could not resolve vertx.io");
err.printStackTrace();
} else {
System.out.println("vertx.io => " + ip);
}
});
}
private Future<String> storeInDb(String key, String value) {
return Future.succeededFuture("Yo");
}
public void fromCS(Vertx vertx, CompletionStage<String> completionStage) {
Future.fromCompletionStage(completionStage, vertx.getOrCreateContext())
.flatMap(str -> {
String key = UUID.randomUUID().toString();
return storeInDb(key, str);
})
.onSuccess(str -> {
System.out.println("We have a result: " + str);
})
.onFailure(err -> {
System.err.println("We have a problem");
err.printStackTrace();
});
}
}

View File

@@ -17,6 +17,8 @@ import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.FutureFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
/**
@@ -479,6 +481,71 @@ public interface Future<T> extends AsyncResult<T> {
return (Future<T>) AsyncResult.super.otherwiseEmpty();
}
/**
* Bridges this Vert.x future to a {@link CompletionStage} instance.
* <p>
* The {@link CompletionStage} handling methods will be called from the thread that resolves this future.
*
* @return a {@link CompletionStage} that completes when this future resolves
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
default CompletionStage<T> toCompletionStage() {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
this.setHandler(ar -> {
if (ar.succeeded()) {
completableFuture.complete(ar.result());
} else {
completableFuture.completeExceptionally(ar.cause());
}
});
return completableFuture;
}
/**
* Bridges a {@link CompletionStage} object to a Vert.x future instance.
* <p>
* The Vert.x future handling methods will be called from the thread that completes {@code completionStage}.
*
* @param completionStage a completion stage
* @param <T> the result type
* @return a Vert.x future that resolves when {@code completionStage} resolves
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage) {
Promise<T> promise = Promise.promise();
completionStage.whenComplete((value, err) -> {
if (err != null) {
promise.fail(err);
} else {
promise.complete(value);
}
});
return promise.future();
}
/**
* Bridges a {@link CompletionStage} object to a Vert.x future instance.
* <p>
* The Vert.x future handling methods will be called on the provided {@code context}.
*
* @param completionStage a completion stage
* @param context a Vert.x context to dispatch to
* @param <T> the result type
* @return a Vert.x future that resolves when {@code completionStage} resolves
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage, Context context) {
Promise<T> promise = ((ContextInternal) context).promise();
completionStage.whenComplete((value, err) -> {
if (err != null) {
promise.fail(err);
} else {
promise.complete(value);
}
});
return promise.future();
}
@GenIgnore
FutureFactory factory = ServiceHelper.loadFactory(FutureFactory.class);

View File

@@ -29,7 +29,7 @@ import java.util.concurrent.RejectedExecutionException;
* @author <a href="http://tfox.org">Tim Fox</a>
*/
abstract class ContextImpl extends AbstractContext {
/**
* Execute the {@code task} disabling the thread-local association for the duration
* of the execution. {@link Vertx#currentContext()} will return {@code null},

View File

@@ -21,9 +21,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -1424,4 +1422,133 @@ public class FutureTest extends VertxTestBase {
promise.fail(failure);
await();
}
@Test
public void testToCompletionStageTrampolining() {
waitFor(2);
Thread mainThread = Thread.currentThread();
Future<String> success = Future.succeededFuture("Yo");
success.toCompletionStage()
.thenAccept(str -> {
assertEquals("Yo", str);
assertSame(mainThread, Thread.currentThread());
complete();
});
Future<String> failed = Future.failedFuture(new RuntimeException("Woops"));
failed.toCompletionStage()
.whenComplete((str, err) -> {
assertNull(str);
assertTrue(err instanceof RuntimeException);
assertEquals("Woops", err.getMessage());
assertSame(mainThread, Thread.currentThread());
complete();
});
await();
}
@Test
public void testToCompletionStageDelayedCompletion() {
waitFor(2);
Thread mainThread = Thread.currentThread();
Promise<String> willSucceed = Promise.promise();
Promise<String> willFail = Promise.promise();
willSucceed.future().toCompletionStage().whenComplete((str, err) -> {
assertEquals("Yo", str);
assertNull(err);
assertNotSame(mainThread, Thread.currentThread());
complete();
});
willFail.future().toCompletionStage().whenComplete((str, err) -> {
assertNull(str);
assertTrue(err instanceof RuntimeException);
assertEquals("Woops", err.getMessage());
assertNotSame(mainThread, Thread.currentThread());
complete();
});
disableThreadChecks();
new Thread(() -> willSucceed.complete("Yo")).start();
new Thread(() -> willFail.fail(new RuntimeException("Woops"))).start();
await();
}
@Test
public void testFromCompletionStageTrampolining() {
waitFor(2);
disableThreadChecks();
AtomicReference<Thread> successSupplierThread = new AtomicReference<>();
CompletableFuture<String> willSucceed = new CompletableFuture<>();
AtomicReference<Thread> failureSupplierThread = new AtomicReference<>();
CompletableFuture<String> willFail = new CompletableFuture<>();
Future.fromCompletionStage(willSucceed).onSuccess(str -> {
assertEquals("Ok", str);
assertSame(successSupplierThread.get(), Thread.currentThread());
complete();
});
Future.fromCompletionStage(willFail).onFailure(err -> {
assertTrue(err instanceof RuntimeException);
assertEquals("Woops", err.getMessage());
assertSame(failureSupplierThread.get(), Thread.currentThread());
complete();
});
ForkJoinPool fjp = ForkJoinPool.commonPool();
fjp.execute(() -> {
successSupplierThread.set(Thread.currentThread());
willSucceed.complete("Ok");
});
fjp.execute(() -> {
failureSupplierThread.set(Thread.currentThread());
willFail.completeExceptionally(new RuntimeException("Woops"));
});
await();
}
@Test
public void testFromCompletionStageWithContext() {
waitFor(2);
Context context = vertx.getOrCreateContext();
AtomicReference<Thread> successSupplierThread = new AtomicReference<>();
CompletableFuture<String> willSucceed = new CompletableFuture<>();
AtomicReference<Thread> failureSupplierThread = new AtomicReference<>();
CompletableFuture<String> willFail = new CompletableFuture<>();
Future.fromCompletionStage(willSucceed, context).onSuccess(str -> {
assertEquals("Ok", str);
assertNotSame(successSupplierThread.get(), Thread.currentThread());
assertEquals(context, vertx.getOrCreateContext());
assertTrue(Thread.currentThread().getName().startsWith("vert.x-eventloop-thread"));
complete();
});
Future.fromCompletionStage(willFail, context).onFailure(err -> {
assertTrue(err instanceof RuntimeException);
assertEquals("Woops", err.getMessage());
assertNotSame(failureSupplierThread.get(), Thread.currentThread());
assertEquals(context, vertx.getOrCreateContext());
assertTrue(Thread.currentThread().getName().startsWith("vert.x-eventloop-thread"));
complete();
});
ForkJoinPool fjp = ForkJoinPool.commonPool();
fjp.execute(() -> {
successSupplierThread.set(Thread.currentThread());
willSucceed.complete("Ok");
});
fjp.execute(() -> {
failureSupplierThread.set(Thread.currentThread());
willFail.completeExceptionally(new RuntimeException("Woops"));
});
await();
}
}