diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index decd3988c..d99a09ce9 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -361,6 +361,34 @@ run the given function, that returns a future. When this returned future complet In this second case, the {@link io.vertx.core.Handler} should complete the `next` future to report its success or failure. +=== CompletionStage interoperability + +The Vert.x `Future` API offers compatibility _from_ and _to_ `CompletionStage` which is the JDK interface for composable +asynchronous operations. + +We can go from a Vert.x `Future` to a `CompletionStage` using the {@link io.vertx.core.Future#toCompletionStage} method, as in: + +[source,$lang] +---- +{@link examples.CompletionStageInteropExamples#toCS} +---- + +We can conversely go from a `CompletionStage` to Vert.x `Future` using {@link io.vertx.core.Future#fromCompletionStage}. +There are 2 variants: + +. the first variant takes just a `CompletionStage` and calls the `Future` methods from the thread that resolves the `CompletionStage` instance, and +. the second variant takes an extra {@link io.vertx.core.Context} parameter to call the `Future` methods on a Vert.x context. + +IMPORTANT: In most cases the variant with a `CompletionStage` and a `Context` is the one you will want to use to respect the Vert.x threading model, +since Vert.x `Future` are more likely to be used with Vert.x code, libraries and clients. + +Here is an example of going from a `CompletionStage` to a Vert.x `Future` and dispatching on a context: + +[source,$lang] +---- +{@link examples.CompletionStageInteropExamples#fromCS} +---- + == Verticles Vert.x comes with a simple, scalable, _actor-like_ deployment and concurrency model out of the box that diff --git a/src/main/java/examples/CompletionStageInteropExamples.java b/src/main/java/examples/CompletionStageInteropExamples.java new file mode 100644 index 000000000..47e483d23 --- /dev/null +++ b/src/main/java/examples/CompletionStageInteropExamples.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package examples; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; + +import java.util.UUID; +import java.util.concurrent.CompletionStage; + +/** + * Examples of the Future / CompletionStage interoperability. + * + * @author Julien Ponge + */ +public class CompletionStageInteropExamples { + + public void toCS(Vertx vertx) { + Future future = vertx.createDnsClient().lookup("vertx.io"); + future.toCompletionStage().whenComplete((ip, err) -> { + if (err != null) { + System.err.println("Could not resolve vertx.io"); + err.printStackTrace(); + } else { + System.out.println("vertx.io => " + ip); + } + }); + } + + private Future storeInDb(String key, String value) { + return Future.succeededFuture("Yo"); + } + + public void fromCS(Vertx vertx, CompletionStage completionStage) { + Future.fromCompletionStage(completionStage, vertx.getOrCreateContext()) + .flatMap(str -> { + String key = UUID.randomUUID().toString(); + return storeInDb(key, str); + }) + .onSuccess(str -> { + System.out.println("We have a result: " + str); + }) + .onFailure(err -> { + System.err.println("We have a problem"); + err.printStackTrace(); + }); + } +} diff --git a/src/main/java/io/vertx/core/Future.java b/src/main/java/io/vertx/core/Future.java index 68454e69b..6be7a8fbd 100644 --- a/src/main/java/io/vertx/core/Future.java +++ b/src/main/java/io/vertx/core/Future.java @@ -17,6 +17,8 @@ import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.impl.ContextInternal; import io.vertx.core.spi.FutureFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Function; /** @@ -479,6 +481,71 @@ public interface Future extends AsyncResult { return (Future) AsyncResult.super.otherwiseEmpty(); } + /** + * Bridges this Vert.x future to a {@link CompletionStage} instance. + *

+ * The {@link CompletionStage} handling methods will be called from the thread that resolves this future. + * + * @return a {@link CompletionStage} that completes when this future resolves + */ + @GenIgnore(GenIgnore.PERMITTED_TYPE) + default CompletionStage toCompletionStage() { + CompletableFuture completableFuture = new CompletableFuture<>(); + this.setHandler(ar -> { + if (ar.succeeded()) { + completableFuture.complete(ar.result()); + } else { + completableFuture.completeExceptionally(ar.cause()); + } + }); + return completableFuture; + } + + /** + * Bridges a {@link CompletionStage} object to a Vert.x future instance. + *

+ * The Vert.x future handling methods will be called from the thread that completes {@code completionStage}. + * + * @param completionStage a completion stage + * @param the result type + * @return a Vert.x future that resolves when {@code completionStage} resolves + */ + @GenIgnore(GenIgnore.PERMITTED_TYPE) + static Future fromCompletionStage(CompletionStage completionStage) { + Promise promise = Promise.promise(); + completionStage.whenComplete((value, err) -> { + if (err != null) { + promise.fail(err); + } else { + promise.complete(value); + } + }); + return promise.future(); + } + + /** + * Bridges a {@link CompletionStage} object to a Vert.x future instance. + *

+ * The Vert.x future handling methods will be called on the provided {@code context}. + * + * @param completionStage a completion stage + * @param context a Vert.x context to dispatch to + * @param the result type + * @return a Vert.x future that resolves when {@code completionStage} resolves + */ + @GenIgnore(GenIgnore.PERMITTED_TYPE) + static Future fromCompletionStage(CompletionStage completionStage, Context context) { + Promise promise = ((ContextInternal) context).promise(); + completionStage.whenComplete((value, err) -> { + if (err != null) { + promise.fail(err); + } else { + promise.complete(value); + } + }); + return promise.future(); + } + @GenIgnore FutureFactory factory = ServiceHelper.loadFactory(FutureFactory.class); diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index 025e0a513..d10462c39 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -29,7 +29,7 @@ import java.util.concurrent.RejectedExecutionException; * @author Tim Fox */ abstract class ContextImpl extends AbstractContext { - + /** * Execute the {@code task} disabling the thread-local association for the duration * of the execution. {@link Vertx#currentContext()} will return {@code null}, diff --git a/src/test/java/io/vertx/core/FutureTest.java b/src/test/java/io/vertx/core/FutureTest.java index b7cbc7d7a..c97608827 100644 --- a/src/test/java/io/vertx/core/FutureTest.java +++ b/src/test/java/io/vertx/core/FutureTest.java @@ -21,9 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -1424,4 +1422,133 @@ public class FutureTest extends VertxTestBase { promise.fail(failure); await(); } + + @Test + public void testToCompletionStageTrampolining() { + waitFor(2); + Thread mainThread = Thread.currentThread(); + Future success = Future.succeededFuture("Yo"); + success.toCompletionStage() + .thenAccept(str -> { + assertEquals("Yo", str); + assertSame(mainThread, Thread.currentThread()); + complete(); + }); + Future failed = Future.failedFuture(new RuntimeException("Woops")); + failed.toCompletionStage() + .whenComplete((str, err) -> { + assertNull(str); + assertTrue(err instanceof RuntimeException); + assertEquals("Woops", err.getMessage()); + assertSame(mainThread, Thread.currentThread()); + complete(); + }); + await(); + } + + @Test + public void testToCompletionStageDelayedCompletion() { + waitFor(2); + Thread mainThread = Thread.currentThread(); + Promise willSucceed = Promise.promise(); + Promise willFail = Promise.promise(); + + willSucceed.future().toCompletionStage().whenComplete((str, err) -> { + assertEquals("Yo", str); + assertNull(err); + assertNotSame(mainThread, Thread.currentThread()); + complete(); + }); + + willFail.future().toCompletionStage().whenComplete((str, err) -> { + assertNull(str); + assertTrue(err instanceof RuntimeException); + assertEquals("Woops", err.getMessage()); + assertNotSame(mainThread, Thread.currentThread()); + complete(); + }); + + disableThreadChecks(); + new Thread(() -> willSucceed.complete("Yo")).start(); + new Thread(() -> willFail.fail(new RuntimeException("Woops"))).start(); + await(); + } + + @Test + public void testFromCompletionStageTrampolining() { + waitFor(2); + disableThreadChecks(); + + AtomicReference successSupplierThread = new AtomicReference<>(); + CompletableFuture willSucceed = new CompletableFuture<>(); + + AtomicReference failureSupplierThread = new AtomicReference<>(); + CompletableFuture willFail = new CompletableFuture<>(); + + Future.fromCompletionStage(willSucceed).onSuccess(str -> { + assertEquals("Ok", str); + assertSame(successSupplierThread.get(), Thread.currentThread()); + complete(); + }); + + Future.fromCompletionStage(willFail).onFailure(err -> { + assertTrue(err instanceof RuntimeException); + assertEquals("Woops", err.getMessage()); + assertSame(failureSupplierThread.get(), Thread.currentThread()); + complete(); + }); + + ForkJoinPool fjp = ForkJoinPool.commonPool(); + fjp.execute(() -> { + successSupplierThread.set(Thread.currentThread()); + willSucceed.complete("Ok"); + }); + fjp.execute(() -> { + failureSupplierThread.set(Thread.currentThread()); + willFail.completeExceptionally(new RuntimeException("Woops")); + }); + + await(); + } + + @Test + public void testFromCompletionStageWithContext() { + waitFor(2); + Context context = vertx.getOrCreateContext(); + + AtomicReference successSupplierThread = new AtomicReference<>(); + CompletableFuture willSucceed = new CompletableFuture<>(); + + AtomicReference failureSupplierThread = new AtomicReference<>(); + CompletableFuture willFail = new CompletableFuture<>(); + + Future.fromCompletionStage(willSucceed, context).onSuccess(str -> { + assertEquals("Ok", str); + assertNotSame(successSupplierThread.get(), Thread.currentThread()); + assertEquals(context, vertx.getOrCreateContext()); + assertTrue(Thread.currentThread().getName().startsWith("vert.x-eventloop-thread")); + complete(); + }); + + Future.fromCompletionStage(willFail, context).onFailure(err -> { + assertTrue(err instanceof RuntimeException); + assertEquals("Woops", err.getMessage()); + assertNotSame(failureSupplierThread.get(), Thread.currentThread()); + assertEquals(context, vertx.getOrCreateContext()); + assertTrue(Thread.currentThread().getName().startsWith("vert.x-eventloop-thread")); + complete(); + }); + + ForkJoinPool fjp = ForkJoinPool.commonPool(); + fjp.execute(() -> { + successSupplierThread.set(Thread.currentThread()); + willSucceed.complete("Ok"); + }); + fjp.execute(() -> { + failureSupplierThread.set(Thread.currentThread()); + willFail.completeExceptionally(new RuntimeException("Woops")); + }); + + await(); + } }