From 48561de730ae4d4b0d2b6d7f2e6f007417b38081 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 12 Dec 2019 10:49:59 +0100 Subject: [PATCH 01/13] CompletionStage interop methods Signed-off-by: Julien Ponge --- src/main/java/io/vertx/core/Future.java | 114 ++++++++++++------ .../java/io/vertx/core/impl/ContextImpl.java | 6 +- 2 files changed, 82 insertions(+), 38 deletions(-) diff --git a/src/main/java/io/vertx/core/Future.java b/src/main/java/io/vertx/core/Future.java index 68454e69b..1dc89c8a6 100644 --- a/src/main/java/io/vertx/core/Future.java +++ b/src/main/java/io/vertx/core/Future.java @@ -17,6 +17,8 @@ import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.impl.ContextInternal; import io.vertx.core.spi.FutureFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Function; /** @@ -32,14 +34,14 @@ public interface Future extends AsyncResult { * Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned. * * @param handler the handler - * @param the result type + * @param the result type * @return the future. */ static Future future(Handler> handler) { Promise promise = Promise.promise(); try { handler.handle(promise); - } catch (Throwable e){ + } catch (Throwable e) { promise.tryFail(e); } return promise.future(); @@ -48,8 +50,8 @@ public interface Future extends AsyncResult { /** * Create a succeeded future with a null result * - * @param the result type - * @return the future + * @param the result type + * @return the future */ static Future succeededFuture() { return factory.succeededFuture(); @@ -58,9 +60,9 @@ public interface Future extends AsyncResult { /** * Created a succeeded future with the specified result. * - * @param result the result - * @param the result type - * @return the future + * @param result the result + * @param the result type + * @return the future */ static Future succeededFuture(T result) { if (result == null) { @@ -73,9 +75,9 @@ public interface Future extends AsyncResult { /** * Create a failed future with the specified failure cause. * - * @param t the failure cause as a Throwable - * @param the result type - * @return the future + * @param t the failure cause as a Throwable + * @param the result type + * @return the future */ static Future failedFuture(Throwable t) { return factory.failedFuture(t); @@ -84,9 +86,9 @@ public interface Future extends AsyncResult { /** * Create a failed future with the specified failure message. * - * @param failureMessage the failure message - * @param the result type - * @return the future + * @param failureMessage the failure message + * @param the result type + * @return the future */ static Future failedFuture(String failureMessage) { return factory.failureFuture(failureMessage); @@ -112,6 +114,7 @@ public interface Future extends AsyncResult { /** * Add a handler to be notified of the result. *
+ * * @param handler the handler that will be called with the result * @return a reference to this, so it can be used fluently */ @@ -121,6 +124,7 @@ public interface Future extends AsyncResult { /** * Add a handler to be notified of the succeeded result. *
+ * * @param handler the handler that will be called with the succeeded result * @return a reference to this, so it can be used fluently */ @@ -136,6 +140,7 @@ public interface Future extends AsyncResult { /** * Add a handler to be notified of the failed result. *
+ * * @param handler the handler that will be called with the failed result * @return a reference to this, so it can be used fluently */ @@ -194,13 +199,13 @@ public interface Future extends AsyncResult { /** * Compose this future with a {@code mapper} function.

- * + *

* When this future (the one on which {@code compose} is called) succeeds, the {@code mapper} will be called with * the completed value and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.

- * + *

* If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- * + *

* When this future fails, the failure will be propagated to the returned future and the {@code mapper} * will not be called. * @@ -220,15 +225,15 @@ public interface Future extends AsyncResult { /** * Compose this future with a {@code successMapper} and {@code failureMapper} functions.

- * + *

* When this future (the one on which {@code compose} is called) succeeds, the {@code successMapper} will be called with * the completed value and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.

- * + *

* When this future (the one on which {@code compose} is called) fails, the {@code failureMapper} will be called with * the failure and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.

- * + *

* If any mapper function throws an exception, the returned future will be failed with this exception.

* * @param successMapper the function mapping the success @@ -275,12 +280,12 @@ public interface Future extends AsyncResult { /** * Apply a {@code mapper} function on this future.

- * + *

* When this future succeeds, the {@code mapper} will be called with the completed value and this mapper * returns a value. This value will complete the future returned by this method call.

- * + *

* If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- * + *

* When this future fails, the failure will be propagated to the returned future and the {@code mapper} * will not be called. * @@ -317,9 +322,9 @@ public interface Future extends AsyncResult { /** * Map the result of a future to a specific {@code value}.

- * + *

* When this future succeeds, this {@code value} will complete the future returned by this method call.

- * + *

* When this future fails, the failure will be propagated to the returned future. * * @param value the value that eventually completes the mapped future @@ -345,11 +350,11 @@ public interface Future extends AsyncResult { /** * Map the result of a future to {@code null}.

- * + *

* This is a conveniency for {@code future.map((T) null)} or {@code future.map((Void) null)}.

- * + *

* When this future succeeds, {@code null} will complete the future returned by this method call.

- * + *

* When this future fails, the failure will be propagated to the returned future. * * @return the mapped future @@ -396,12 +401,12 @@ public interface Future extends AsyncResult { /** * Apply a {@code mapper} function on this future.

- * + *

* When this future fails, the {@code mapper} will be called with the completed value and this mapper * returns a value. This value will complete the future returned by this method call.

- * + *

* If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- * + *

* When this future succeeds, the result will be propagated to the returned future and the {@code mapper} * will not be called. * @@ -438,9 +443,9 @@ public interface Future extends AsyncResult { /** * Map the failure of a future to a specific {@code value}.

- * + *

* When this future fails, this {@code value} will complete the future returned by this method call.

- * + *

* When this future succeeds, the result will be propagated to the returned future. * * @param value the value that eventually completes the mapped future @@ -466,11 +471,11 @@ public interface Future extends AsyncResult { /** * Map the failure of a future to {@code null}.

- * + *

* This is a convenience for {@code future.otherwise((T) null)}.

- * + *

* When this future fails, the {@code null} value will complete the future returned by this method call.

- * + *

* When this future succeeds, the result will be propagated to the returned future. * * @return the mapped future @@ -479,6 +484,45 @@ public interface Future extends AsyncResult { return (Future) AsyncResult.super.otherwiseEmpty(); } + @GenIgnore(GenIgnore.PERMITTED_TYPE) + default CompletionStage toCompletionStage() { + CompletableFuture completableFuture = new CompletableFuture<>(); + this.setHandler(ar -> { + if (ar.succeeded()) { + completableFuture.complete(ar.result()); + } else { + completableFuture.completeExceptionally(ar.cause()); + } + }); + return completableFuture; + } + + @GenIgnore(GenIgnore.PERMITTED_TYPE) + static Future from(CompletionStage completionStage) { + Promise promise = Promise.promise(); + completionStage.whenComplete((value, err) -> { + if (err != null) { + promise.fail(err); + } else { + promise.complete(value); + } + }); + return promise.future(); + } + + @GenIgnore(GenIgnore.PERMITTED_TYPE) + static Future from(CompletionStage completionStage, Context context) { + Promise promise = ((ContextInternal) context).promise(); + completionStage.whenComplete((value, err) -> { + if (err != null) { + promise.fail(err); + } else { + promise.complete(value); + } + }); + return promise.future(); + } + @GenIgnore FutureFactory factory = ServiceHelper.loadFactory(FutureFactory.class); diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index 7e0be2457..e2ea25764 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -13,7 +13,6 @@ package io.vertx.core.impl; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import io.vertx.codegen.annotations.Nullable; import io.vertx.core.*; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; @@ -30,10 +29,11 @@ import java.util.concurrent.RejectedExecutionException; * @author Tim Fox */ abstract class ContextImpl extends AbstractContext { - + /** * Execute the {@code task} disabling the thread-local association for the duration * of the execution. {@link Vertx#currentContext()} will return {@code null}, + * * @param task the task to execute * @throws IllegalStateException if the current thread is not a Vertx thread */ @@ -154,7 +154,7 @@ abstract class ContextImpl extends AbstractContext { } static Future executeBlocking(ContextInternal context, Handler> blockingCodeHandler, - WorkerPool workerPool, TaskQueue queue) { + WorkerPool workerPool, TaskQueue queue) { PoolMetrics metrics = workerPool.metrics(); Object queueMetric = metrics != null ? metrics.submitted() : null; Promise promise = context.promise(); From 81f35814da87938c4681399214a3365b4ba73d19 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 12 Dec 2019 11:18:34 +0100 Subject: [PATCH 02/13] CompletionStage trampolining test Signed-off-by: Julien Ponge --- src/test/java/io/vertx/core/FutureTest.java | 183 +++++++++++++++----- 1 file changed, 142 insertions(+), 41 deletions(-) diff --git a/src/test/java/io/vertx/core/FutureTest.java b/src/test/java/io/vertx/core/FutureTest.java index b7cbc7d7a..79bd817c6 100644 --- a/src/test/java/io/vertx/core/FutureTest.java +++ b/src/test/java/io/vertx/core/FutureTest.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -31,6 +32,8 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import static java.util.concurrent.TimeUnit.SECONDS; + /** * @author Julien Viet */ @@ -197,7 +200,7 @@ public class FutureTest extends VertxTestBase { @Test public void testCreateFailedWithNullFailure() { - Future future = Future.failedFuture((Throwable)null); + Future future = Future.failedFuture((Throwable) null); Checker checker = new Checker<>(future); NoStackTraceThrowable failure = (NoStackTraceThrowable) checker.assertFailed(); assertNull(failure.getMessage()); @@ -206,7 +209,7 @@ public class FutureTest extends VertxTestBase { @Test public void testFailureFutureWithNullFailure() { Promise promise = Promise.promise(); - promise.fail((Throwable)null); + promise.fail((Throwable) null); Checker checker = new Checker<>(promise.future()); NoStackTraceThrowable failure = (NoStackTraceThrowable) checker.assertFailed(); assertNull(failure.getMessage()); @@ -239,7 +242,7 @@ public class FutureTest extends VertxTestBase { p2.complete(3); checker.assertSucceeded(composite); assertEquals("something", composite.resultAt(0)); - assertEquals(3, (int)composite.resultAt(1)); + assertEquals(3, (int) composite.resultAt(1)); } @Test @@ -283,22 +286,22 @@ public class FutureTest extends VertxTestBase { private void testAllLargeList(int size) { List list = new ArrayList<>(); - for (int i = 0;i < size;i++) { + for (int i = 0; i < size; i++) { list.add(Future.succeededFuture()); } CompositeFuture composite = CompositeFuture.all(list); Checker checker = new Checker<>(composite); checker.assertSucceeded(composite); - for (int i = 0;i < size;i++) { + for (int i = 0; i < size; i++) { list.clear(); Throwable cause = new Exception(); - for (int j = 0;j < size;j++) { + for (int j = 0; j < size; j++) { list.add(i == j ? Future.failedFuture(cause) : Future.succeededFuture()); } composite = CompositeFuture.all(list); checker = new Checker<>(composite); checker.assertFailed(cause); - for (int j = 0;j < size;j++) { + for (int j = 0; j < size; j++) { if (i == j) { assertTrue(composite.failed(j)); } else { @@ -397,21 +400,21 @@ public class FutureTest extends VertxTestBase { private void testAnyLargeList(int size) { List list = new ArrayList<>(); - for (int i = 0;i < size;i++) { + for (int i = 0; i < size; i++) { list.add(Future.failedFuture(new Exception())); } CompositeFuture composite = CompositeFuture.any(list); Checker checker = new Checker<>(composite); assertNotNull(checker.assertFailed()); - for (int i = 0;i < size;i++) { + for (int i = 0; i < size; i++) { list.clear(); - for (int j = 0;j < size;j++) { + for (int j = 0; j < size; j++) { list.add(i == j ? Future.succeededFuture() : Future.failedFuture(new RuntimeException())); } composite = CompositeFuture.any(list); checker = new Checker<>(composite); checker.assertSucceeded(composite); - for (int j = 0;j < size;j++) { + for (int j = 0; j < size; j++) { if (i == j) { assertTrue(composite.succeeded(j)); } else { @@ -552,7 +555,7 @@ public class FutureTest extends VertxTestBase { ref.set(string); return c; }); - Checker checker = new Checker<>(f4); + Checker checker = new Checker<>(f4); p3.complete("abcdef"); checker.assertNotCompleted(); assertEquals("abcdef", ref.get()); @@ -595,7 +598,9 @@ public class FutureTest extends VertxTestBase { RuntimeException cause = new RuntimeException(); Promise p3 = Promise.promise(); Future f3 = p3.future(); - Future f4 = f3.compose(string -> { throw cause; }); + Future f4 = f3.compose(string -> { + throw cause; + }); Checker checker = new Checker<>(f4); p3.complete("foo"); checker.assertFailed(cause); @@ -786,22 +791,58 @@ public class FutureTest extends VertxTestBase { public void testDefaultCompleter() { AsyncResult succeededAsyncResult = new AsyncResult() { Object result = new Object(); - public Object result() { return result; } - public Throwable cause() { throw new UnsupportedOperationException(); } - public boolean succeeded() { return true; } - public boolean failed() { throw new UnsupportedOperationException(); } - public AsyncResult map(Function mapper) { throw new UnsupportedOperationException(); } - public AsyncResult map(V value) { throw new UnsupportedOperationException(); } + + public Object result() { + return result; + } + + public Throwable cause() { + throw new UnsupportedOperationException(); + } + + public boolean succeeded() { + return true; + } + + public boolean failed() { + throw new UnsupportedOperationException(); + } + + public AsyncResult map(Function mapper) { + throw new UnsupportedOperationException(); + } + + public AsyncResult map(V value) { + throw new UnsupportedOperationException(); + } }; AsyncResult failedAsyncResult = new AsyncResult() { Throwable cause = new Throwable(); - public Object result() { throw new UnsupportedOperationException(); } - public Throwable cause() { return cause; } - public boolean succeeded() { return false; } - public boolean failed() { throw new UnsupportedOperationException(); } - public AsyncResult map(Function mapper) { throw new UnsupportedOperationException(); } - public AsyncResult map(V value) { throw new UnsupportedOperationException(); } + + public Object result() { + throw new UnsupportedOperationException(); + } + + public Throwable cause() { + return cause; + } + + public boolean succeeded() { + return false; + } + + public boolean failed() { + throw new UnsupportedOperationException(); + } + + public AsyncResult map(Function mapper) { + throw new UnsupportedOperationException(); + } + + public AsyncResult map(V value) { + throw new UnsupportedOperationException(); + } }; class DefaultCompleterTestFuture implements Future { @@ -809,30 +850,43 @@ public class FutureTest extends VertxTestBase { boolean failed; T result; Throwable cause; - public boolean isComplete() { throw new UnsupportedOperationException(); } - public Future onComplete(Handler> handler) { throw new UnsupportedOperationException(); } - public Handler> getHandler() { throw new UnsupportedOperationException(); } + + public boolean isComplete() { + throw new UnsupportedOperationException(); + } + + public Future onComplete(Handler> handler) { + throw new UnsupportedOperationException(); + } + + public Handler> getHandler() { + throw new UnsupportedOperationException(); + } public void complete(T result) { if (!tryComplete(result)) { throw new IllegalStateException(); } } + public void complete() { if (!tryComplete()) { throw new IllegalStateException(); } } + public void fail(Throwable cause) { if (!tryFail(cause)) { throw new IllegalStateException(); } } + public void fail(String failureMessage) { if (!tryFail(failureMessage)) { throw new IllegalStateException(); } } + public boolean tryComplete(T result) { if (succeeded || failed) { return false; @@ -841,7 +895,11 @@ public class FutureTest extends VertxTestBase { this.result = result; return true; } - public boolean tryComplete() { throw new UnsupportedOperationException(); } + + public boolean tryComplete() { + throw new UnsupportedOperationException(); + } + public boolean tryFail(Throwable cause) { if (succeeded || failed) { return false; @@ -850,11 +908,27 @@ public class FutureTest extends VertxTestBase { this.cause = cause; return true; } - public boolean tryFail(String failureMessage) { throw new UnsupportedOperationException(); } - public T result() { throw new UnsupportedOperationException(); } - public Throwable cause() { throw new UnsupportedOperationException(); } - public boolean succeeded() { throw new UnsupportedOperationException(); } - public boolean failed() { throw new UnsupportedOperationException(); } + + public boolean tryFail(String failureMessage) { + throw new UnsupportedOperationException(); + } + + public T result() { + throw new UnsupportedOperationException(); + } + + public Throwable cause() { + throw new UnsupportedOperationException(); + } + + public boolean succeeded() { + throw new UnsupportedOperationException(); + } + + public boolean failed() { + throw new UnsupportedOperationException(); + } + public void handle(AsyncResult asyncResult) { if (asyncResult.succeeded()) { complete(asyncResult.result()); @@ -954,9 +1028,9 @@ public class FutureTest extends VertxTestBase { AsyncResult map1 = res.map(String::length); AsyncResult map2 = res.map(17); p.complete("foobar"); - assertEquals(6, (int)map1.result()); + assertEquals(6, (int) map1.result()); assertNull(map1.cause()); - assertEquals(17, (int)map2.result()); + assertEquals(17, (int) map2.result()); assertNull(map2.cause()); } @@ -1216,17 +1290,21 @@ public class FutureTest extends VertxTestBase { Future f = promise.future(); Field handlerField = f.getClass().getDeclaredField("handler"); handlerField.setAccessible(true); - f.setHandler(ar -> {}); + f.setHandler(ar -> { + }); promise.complete(); assertNull(handlerField.get(f)); - f.setHandler(ar -> {}); + f.setHandler(ar -> { + }); assertNull(handlerField.get(f)); promise = Promise.promise(); f = promise.future(); - f.setHandler(ar -> {}); + f.setHandler(ar -> { + }); promise.fail("abc"); assertNull(handlerField.get(f)); - f.setHandler(ar -> {}); + f.setHandler(ar -> { + }); assertNull(handlerField.get(f)); } @@ -1255,7 +1333,7 @@ public class FutureTest extends VertxTestBase { ctx.runOnContext(v -> { latch.complete(Thread.currentThread()); }); - Thread elThread = latch.get(10, TimeUnit.SECONDS); + Thread elThread = latch.get(10, SECONDS); // CountDownLatch latch1 = new CountDownLatch(1); @@ -1424,4 +1502,27 @@ public class FutureTest extends VertxTestBase { promise.fail(failure); await(); } + + @Test + public void testToCompletionStageTrampolining() { + waitFor(2); + Thread thread = Thread.currentThread(); + Future success = Future.succeededFuture("Yo"); + success.toCompletionStage() + .thenAccept(s -> { + assertEquals("Yo", s); + assertSame(thread, Thread.currentThread()); + complete(); + }); + Future failed = Future.failedFuture(new RuntimeException("Woops")); + failed.toCompletionStage() + .whenComplete((s, err) -> { + assertNull(s); + assertTrue(err instanceof RuntimeException); + assertEquals("Woops", err.getMessage()); + assertSame(thread, Thread.currentThread()); + complete(); + }); + await(5, SECONDS); + } } From 9cdd0598002481795b610c157345feaa761e0ea7 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 12 Dec 2019 11:27:35 +0100 Subject: [PATCH 03/13] CompletionStage delayed completion test Signed-off-by: Julien Ponge --- src/test/java/io/vertx/core/FutureTest.java | 42 +++++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/src/test/java/io/vertx/core/FutureTest.java b/src/test/java/io/vertx/core/FutureTest.java index 79bd817c6..ab02430ee 100644 --- a/src/test/java/io/vertx/core/FutureTest.java +++ b/src/test/java/io/vertx/core/FutureTest.java @@ -1506,23 +1506,51 @@ public class FutureTest extends VertxTestBase { @Test public void testToCompletionStageTrampolining() { waitFor(2); - Thread thread = Thread.currentThread(); + Thread mainThread = Thread.currentThread(); Future success = Future.succeededFuture("Yo"); success.toCompletionStage() - .thenAccept(s -> { - assertEquals("Yo", s); - assertSame(thread, Thread.currentThread()); + .thenAccept(str -> { + assertEquals("Yo", str); + assertSame(mainThread, Thread.currentThread()); complete(); }); Future failed = Future.failedFuture(new RuntimeException("Woops")); failed.toCompletionStage() - .whenComplete((s, err) -> { - assertNull(s); + .whenComplete((str, err) -> { + assertNull(str); assertTrue(err instanceof RuntimeException); assertEquals("Woops", err.getMessage()); - assertSame(thread, Thread.currentThread()); + assertSame(mainThread, Thread.currentThread()); complete(); }); await(5, SECONDS); } + + @Test + public void testToCompletionStageDelayedCompletion() { + waitFor(2); + Thread mainThread = Thread.currentThread(); + Promise willSucceed = Promise.promise(); + Promise willFail = Promise.promise(); + + willSucceed.future().toCompletionStage().whenComplete((str, err) -> { + assertEquals("Yo", str); + assertNull(err); + assertNotSame(mainThread, Thread.currentThread()); + complete(); + }); + + willFail.future().toCompletionStage().whenComplete((str, err) -> { + assertNull(str); + assertTrue(err instanceof RuntimeException); + assertEquals("Woops", err.getMessage()); + assertNotSame(mainThread, Thread.currentThread()); + complete(); + }); + + disableThreadChecks(); + new Thread(() -> willSucceed.complete("Yo")).start(); + new Thread(() -> willFail.fail(new RuntimeException("Woops"))).start(); + await(5, SECONDS); + } } From 7ed8f95af9139d713c3d12cce2008c59f4389efa Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 12 Dec 2019 15:39:41 +0100 Subject: [PATCH 04/13] Future from CompletionStage trampolining Signed-off-by: Julien Ponge --- src/test/java/io/vertx/core/FutureTest.java | 42 +++++++++++++++++++-- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/vertx/core/FutureTest.java b/src/test/java/io/vertx/core/FutureTest.java index ab02430ee..c914869fd 100644 --- a/src/test/java/io/vertx/core/FutureTest.java +++ b/src/test/java/io/vertx/core/FutureTest.java @@ -21,10 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -1553,4 +1550,41 @@ public class FutureTest extends VertxTestBase { new Thread(() -> willFail.fail(new RuntimeException("Woops"))).start(); await(5, SECONDS); } + + @Test + public void testFromCompletionStageTrampolining() { + waitFor(2); + disableThreadChecks(); + + AtomicReference successSupplierThread = new AtomicReference<>(); + CompletableFuture willSucceed = new CompletableFuture<>(); + + AtomicReference failureSupplierThread = new AtomicReference<>(); + CompletableFuture willFail = new CompletableFuture<>(); + + Future.from(willSucceed).onSuccess(str -> { + assertEquals("Ok", str); + assertSame(successSupplierThread.get(), Thread.currentThread()); + complete(); + }); + + Future.from(willFail).onFailure(err -> { + assertTrue(err instanceof RuntimeException); + assertEquals("Woops", err.getMessage()); + assertSame(failureSupplierThread.get(), Thread.currentThread()); + complete(); + }); + + ForkJoinPool fjp = ForkJoinPool.commonPool(); + fjp.execute(() -> { + successSupplierThread.set(Thread.currentThread()); + willSucceed.complete("Ok"); + }); + fjp.execute(() -> { + failureSupplierThread.set(Thread.currentThread()); + willFail.completeExceptionally(new RuntimeException("Woops")); + }); + + await(5, SECONDS); + } } From 0711b2b45e79b1c86c3eb11e6a60af5bca93be55 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 12 Dec 2019 15:47:06 +0100 Subject: [PATCH 05/13] Future from CompletionStage dispatch to Vert.x context Signed-off-by: Julien Ponge --- src/test/java/io/vertx/core/FutureTest.java | 41 +++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/test/java/io/vertx/core/FutureTest.java b/src/test/java/io/vertx/core/FutureTest.java index c914869fd..999fd48bb 100644 --- a/src/test/java/io/vertx/core/FutureTest.java +++ b/src/test/java/io/vertx/core/FutureTest.java @@ -1587,4 +1587,45 @@ public class FutureTest extends VertxTestBase { await(5, SECONDS); } + + @Test + public void testFromCompletionStageWithContext() { + waitFor(2); + Context context = vertx.getOrCreateContext(); + + AtomicReference successSupplierThread = new AtomicReference<>(); + CompletableFuture willSucceed = new CompletableFuture<>(); + + AtomicReference failureSupplierThread = new AtomicReference<>(); + CompletableFuture willFail = new CompletableFuture<>(); + + Future.from(willSucceed, context).onSuccess(str -> { + assertEquals("Ok", str); + assertNotSame(successSupplierThread.get(), Thread.currentThread()); + assertEquals(context, vertx.getOrCreateContext()); + assertTrue(Thread.currentThread().getName().startsWith("vert.x-eventloop-thread")); + complete(); + }); + + Future.from(willFail, context).onFailure(err -> { + assertTrue(err instanceof RuntimeException); + assertEquals("Woops", err.getMessage()); + assertNotSame(failureSupplierThread.get(), Thread.currentThread()); + assertEquals(context, vertx.getOrCreateContext()); + assertTrue(Thread.currentThread().getName().startsWith("vert.x-eventloop-thread")); + complete(); + }); + + ForkJoinPool fjp = ForkJoinPool.commonPool(); + fjp.execute(() -> { + successSupplierThread.set(Thread.currentThread()); + willSucceed.complete("Ok"); + }); + fjp.execute(() -> { + failureSupplierThread.set(Thread.currentThread()); + willFail.completeExceptionally(new RuntimeException("Woops")); + }); + + await(5, SECONDS); + } } From 260fed69b10c882349465da9df32ed2f21e5c94c Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 12 Dec 2019 16:44:03 +0100 Subject: [PATCH 06/13] Javadocs Signed-off-by: Julien Ponge --- src/main/java/io/vertx/core/Future.java | 26 +++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/main/java/io/vertx/core/Future.java b/src/main/java/io/vertx/core/Future.java index 1dc89c8a6..e72ae86bd 100644 --- a/src/main/java/io/vertx/core/Future.java +++ b/src/main/java/io/vertx/core/Future.java @@ -484,6 +484,13 @@ public interface Future extends AsyncResult { return (Future) AsyncResult.super.otherwiseEmpty(); } + /** + * Bridges this Vert.x future to a {@link CompletionStage} instance. + *

+ * The {@link CompletionStage} handling methods will be called from the thread that resolves this future. + * + * @return a {@link CompletionStage} that completes when this future resolves + */ @GenIgnore(GenIgnore.PERMITTED_TYPE) default CompletionStage toCompletionStage() { CompletableFuture completableFuture = new CompletableFuture<>(); @@ -497,6 +504,15 @@ public interface Future extends AsyncResult { return completableFuture; } + /** + * Bridges a {@link CompletionStage} object to a Vert.x future instance. + *

+ * The Vert.x future handling methods will be called from the thread that completes {@code completionStage}. + * + * @param completionStage a completion stage + * @param the result type + * @return a Vert.x future that resolves when {@code completionStage} resolves + */ @GenIgnore(GenIgnore.PERMITTED_TYPE) static Future from(CompletionStage completionStage) { Promise promise = Promise.promise(); @@ -510,6 +526,16 @@ public interface Future extends AsyncResult { return promise.future(); } + /** + * Bridges a {@link CompletionStage} object to a Vert.x future instance. + *

+ * The Vert.x future handling methods will be called from a thread attached to {@code context}. + * + * @param completionStage a completion stage + * @param context a Vert.x context to dispatch to + * @param the result type + * @return a Vert.x future that resolves when {@code completionStage} resolves + */ @GenIgnore(GenIgnore.PERMITTED_TYPE) static Future from(CompletionStage completionStage, Context context) { Promise promise = ((ContextInternal) context).promise(); From a698e86cfa6990f359c80dfd4b3f854eaa9c3e9e Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 12 Dec 2019 17:17:45 +0100 Subject: [PATCH 07/13] Reformating Signed-off-by: Julien Ponge --- src/main/java/io/vertx/core/Future.java | 73 ++++---- .../java/io/vertx/core/impl/ContextImpl.java | 4 +- src/test/java/io/vertx/core/FutureTest.java | 157 +++++------------- 3 files changed, 78 insertions(+), 156 deletions(-) diff --git a/src/main/java/io/vertx/core/Future.java b/src/main/java/io/vertx/core/Future.java index e72ae86bd..4b6f89067 100644 --- a/src/main/java/io/vertx/core/Future.java +++ b/src/main/java/io/vertx/core/Future.java @@ -34,14 +34,14 @@ public interface Future extends AsyncResult { * Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned. * * @param handler the handler - * @param the result type + * @param the result type * @return the future. */ static Future future(Handler> handler) { Promise promise = Promise.promise(); try { handler.handle(promise); - } catch (Throwable e) { + } catch (Throwable e){ promise.tryFail(e); } return promise.future(); @@ -50,8 +50,8 @@ public interface Future extends AsyncResult { /** * Create a succeeded future with a null result * - * @param the result type - * @return the future + * @param the result type + * @return the future */ static Future succeededFuture() { return factory.succeededFuture(); @@ -60,9 +60,9 @@ public interface Future extends AsyncResult { /** * Created a succeeded future with the specified result. * - * @param result the result - * @param the result type - * @return the future + * @param result the result + * @param the result type + * @return the future */ static Future succeededFuture(T result) { if (result == null) { @@ -75,9 +75,9 @@ public interface Future extends AsyncResult { /** * Create a failed future with the specified failure cause. * - * @param t the failure cause as a Throwable - * @param the result type - * @return the future + * @param t the failure cause as a Throwable + * @param the result type + * @return the future */ static Future failedFuture(Throwable t) { return factory.failedFuture(t); @@ -86,9 +86,9 @@ public interface Future extends AsyncResult { /** * Create a failed future with the specified failure message. * - * @param failureMessage the failure message - * @param the result type - * @return the future + * @param failureMessage the failure message + * @param the result type + * @return the future */ static Future failedFuture(String failureMessage) { return factory.failureFuture(failureMessage); @@ -114,7 +114,6 @@ public interface Future extends AsyncResult { /** * Add a handler to be notified of the result. *
- * * @param handler the handler that will be called with the result * @return a reference to this, so it can be used fluently */ @@ -124,7 +123,6 @@ public interface Future extends AsyncResult { /** * Add a handler to be notified of the succeeded result. *
- * * @param handler the handler that will be called with the succeeded result * @return a reference to this, so it can be used fluently */ @@ -140,7 +138,6 @@ public interface Future extends AsyncResult { /** * Add a handler to be notified of the failed result. *
- * * @param handler the handler that will be called with the failed result * @return a reference to this, so it can be used fluently */ @@ -199,13 +196,13 @@ public interface Future extends AsyncResult { /** * Compose this future with a {@code mapper} function.

- *

+ * * When this future (the one on which {@code compose} is called) succeeds, the {@code mapper} will be called with * the completed value and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.

- *

+ * * If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- *

+ * * When this future fails, the failure will be propagated to the returned future and the {@code mapper} * will not be called. * @@ -225,15 +222,15 @@ public interface Future extends AsyncResult { /** * Compose this future with a {@code successMapper} and {@code failureMapper} functions.

- *

+ * * When this future (the one on which {@code compose} is called) succeeds, the {@code successMapper} will be called with * the completed value and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.

- *

+ * * When this future (the one on which {@code compose} is called) fails, the {@code failureMapper} will be called with * the failure and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.

- *

+ * * If any mapper function throws an exception, the returned future will be failed with this exception.

* * @param successMapper the function mapping the success @@ -280,12 +277,12 @@ public interface Future extends AsyncResult { /** * Apply a {@code mapper} function on this future.

- *

+ * * When this future succeeds, the {@code mapper} will be called with the completed value and this mapper * returns a value. This value will complete the future returned by this method call.

- *

+ * * If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- *

+ * * When this future fails, the failure will be propagated to the returned future and the {@code mapper} * will not be called. * @@ -322,9 +319,9 @@ public interface Future extends AsyncResult { /** * Map the result of a future to a specific {@code value}.

- *

+ * * When this future succeeds, this {@code value} will complete the future returned by this method call.

- *

+ * * When this future fails, the failure will be propagated to the returned future. * * @param value the value that eventually completes the mapped future @@ -350,11 +347,11 @@ public interface Future extends AsyncResult { /** * Map the result of a future to {@code null}.

- *

+ * * This is a conveniency for {@code future.map((T) null)} or {@code future.map((Void) null)}.

- *

+ * * When this future succeeds, {@code null} will complete the future returned by this method call.

- *

+ * * When this future fails, the failure will be propagated to the returned future. * * @return the mapped future @@ -401,12 +398,12 @@ public interface Future extends AsyncResult { /** * Apply a {@code mapper} function on this future.

- *

+ * * When this future fails, the {@code mapper} will be called with the completed value and this mapper * returns a value. This value will complete the future returned by this method call.

- *

+ * * If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- *

+ * * When this future succeeds, the result will be propagated to the returned future and the {@code mapper} * will not be called. * @@ -443,9 +440,9 @@ public interface Future extends AsyncResult { /** * Map the failure of a future to a specific {@code value}.

- *

+ * * When this future fails, this {@code value} will complete the future returned by this method call.

- *

+ * * When this future succeeds, the result will be propagated to the returned future. * * @param value the value that eventually completes the mapped future @@ -471,11 +468,11 @@ public interface Future extends AsyncResult { /** * Map the failure of a future to {@code null}.

- *

+ * * This is a convenience for {@code future.otherwise((T) null)}.

- *

+ * * When this future fails, the {@code null} value will complete the future returned by this method call.

- *

+ * * When this future succeeds, the result will be propagated to the returned future. * * @return the mapped future diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index e2ea25764..3cc82cc44 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -13,6 +13,7 @@ package io.vertx.core.impl; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.vertx.codegen.annotations.Nullable; import io.vertx.core.*; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; @@ -33,7 +34,6 @@ abstract class ContextImpl extends AbstractContext { /** * Execute the {@code task} disabling the thread-local association for the duration * of the execution. {@link Vertx#currentContext()} will return {@code null}, - * * @param task the task to execute * @throws IllegalStateException if the current thread is not a Vertx thread */ @@ -154,7 +154,7 @@ abstract class ContextImpl extends AbstractContext { } static Future executeBlocking(ContextInternal context, Handler> blockingCodeHandler, - WorkerPool workerPool, TaskQueue queue) { + WorkerPool workerPool, TaskQueue queue) { PoolMetrics metrics = workerPool.metrics(); Object queueMetric = metrics != null ? metrics.submitted() : null; Promise promise = context.promise(); diff --git a/src/test/java/io/vertx/core/FutureTest.java b/src/test/java/io/vertx/core/FutureTest.java index 999fd48bb..6e41bc63e 100644 --- a/src/test/java/io/vertx/core/FutureTest.java +++ b/src/test/java/io/vertx/core/FutureTest.java @@ -197,7 +197,7 @@ public class FutureTest extends VertxTestBase { @Test public void testCreateFailedWithNullFailure() { - Future future = Future.failedFuture((Throwable) null); + Future future = Future.failedFuture((Throwable)null); Checker checker = new Checker<>(future); NoStackTraceThrowable failure = (NoStackTraceThrowable) checker.assertFailed(); assertNull(failure.getMessage()); @@ -206,7 +206,7 @@ public class FutureTest extends VertxTestBase { @Test public void testFailureFutureWithNullFailure() { Promise promise = Promise.promise(); - promise.fail((Throwable) null); + promise.fail((Throwable)null); Checker checker = new Checker<>(promise.future()); NoStackTraceThrowable failure = (NoStackTraceThrowable) checker.assertFailed(); assertNull(failure.getMessage()); @@ -239,7 +239,7 @@ public class FutureTest extends VertxTestBase { p2.complete(3); checker.assertSucceeded(composite); assertEquals("something", composite.resultAt(0)); - assertEquals(3, (int) composite.resultAt(1)); + assertEquals(3, (int)composite.resultAt(1)); } @Test @@ -283,22 +283,22 @@ public class FutureTest extends VertxTestBase { private void testAllLargeList(int size) { List list = new ArrayList<>(); - for (int i = 0; i < size; i++) { + for (int i = 0;i < size;i++) { list.add(Future.succeededFuture()); } CompositeFuture composite = CompositeFuture.all(list); Checker checker = new Checker<>(composite); checker.assertSucceeded(composite); - for (int i = 0; i < size; i++) { + for (int i = 0;i < size;i++) { list.clear(); Throwable cause = new Exception(); - for (int j = 0; j < size; j++) { + for (int j = 0;j < size;j++) { list.add(i == j ? Future.failedFuture(cause) : Future.succeededFuture()); } composite = CompositeFuture.all(list); checker = new Checker<>(composite); checker.assertFailed(cause); - for (int j = 0; j < size; j++) { + for (int j = 0;j < size;j++) { if (i == j) { assertTrue(composite.failed(j)); } else { @@ -397,21 +397,21 @@ public class FutureTest extends VertxTestBase { private void testAnyLargeList(int size) { List list = new ArrayList<>(); - for (int i = 0; i < size; i++) { + for (int i = 0;i < size;i++) { list.add(Future.failedFuture(new Exception())); } CompositeFuture composite = CompositeFuture.any(list); Checker checker = new Checker<>(composite); assertNotNull(checker.assertFailed()); - for (int i = 0; i < size; i++) { + for (int i = 0;i < size;i++) { list.clear(); - for (int j = 0; j < size; j++) { + for (int j = 0;j < size;j++) { list.add(i == j ? Future.succeededFuture() : Future.failedFuture(new RuntimeException())); } composite = CompositeFuture.any(list); checker = new Checker<>(composite); checker.assertSucceeded(composite); - for (int j = 0; j < size; j++) { + for (int j = 0;j < size;j++) { if (i == j) { assertTrue(composite.succeeded(j)); } else { @@ -552,7 +552,7 @@ public class FutureTest extends VertxTestBase { ref.set(string); return c; }); - Checker checker = new Checker<>(f4); + Checker checker = new Checker<>(f4); p3.complete("abcdef"); checker.assertNotCompleted(); assertEquals("abcdef", ref.get()); @@ -595,9 +595,7 @@ public class FutureTest extends VertxTestBase { RuntimeException cause = new RuntimeException(); Promise p3 = Promise.promise(); Future f3 = p3.future(); - Future f4 = f3.compose(string -> { - throw cause; - }); + Future f4 = f3.compose(string -> { throw cause; }); Checker checker = new Checker<>(f4); p3.complete("foo"); checker.assertFailed(cause); @@ -788,58 +786,22 @@ public class FutureTest extends VertxTestBase { public void testDefaultCompleter() { AsyncResult succeededAsyncResult = new AsyncResult() { Object result = new Object(); - - public Object result() { - return result; - } - - public Throwable cause() { - throw new UnsupportedOperationException(); - } - - public boolean succeeded() { - return true; - } - - public boolean failed() { - throw new UnsupportedOperationException(); - } - - public AsyncResult map(Function mapper) { - throw new UnsupportedOperationException(); - } - - public AsyncResult map(V value) { - throw new UnsupportedOperationException(); - } + public Object result() { return result; } + public Throwable cause() { throw new UnsupportedOperationException(); } + public boolean succeeded() { return true; } + public boolean failed() { throw new UnsupportedOperationException(); } + public AsyncResult map(Function mapper) { throw new UnsupportedOperationException(); } + public AsyncResult map(V value) { throw new UnsupportedOperationException(); } }; AsyncResult failedAsyncResult = new AsyncResult() { Throwable cause = new Throwable(); - - public Object result() { - throw new UnsupportedOperationException(); - } - - public Throwable cause() { - return cause; - } - - public boolean succeeded() { - return false; - } - - public boolean failed() { - throw new UnsupportedOperationException(); - } - - public AsyncResult map(Function mapper) { - throw new UnsupportedOperationException(); - } - - public AsyncResult map(V value) { - throw new UnsupportedOperationException(); - } + public Object result() { throw new UnsupportedOperationException(); } + public Throwable cause() { return cause; } + public boolean succeeded() { return false; } + public boolean failed() { throw new UnsupportedOperationException(); } + public AsyncResult map(Function mapper) { throw new UnsupportedOperationException(); } + public AsyncResult map(V value) { throw new UnsupportedOperationException(); } }; class DefaultCompleterTestFuture implements Future { @@ -847,43 +809,30 @@ public class FutureTest extends VertxTestBase { boolean failed; T result; Throwable cause; - - public boolean isComplete() { - throw new UnsupportedOperationException(); - } - - public Future onComplete(Handler> handler) { - throw new UnsupportedOperationException(); - } - - public Handler> getHandler() { - throw new UnsupportedOperationException(); - } + public boolean isComplete() { throw new UnsupportedOperationException(); } + public Future onComplete(Handler> handler) { throw new UnsupportedOperationException(); } + public Handler> getHandler() { throw new UnsupportedOperationException(); } public void complete(T result) { if (!tryComplete(result)) { throw new IllegalStateException(); } } - public void complete() { if (!tryComplete()) { throw new IllegalStateException(); } } - public void fail(Throwable cause) { if (!tryFail(cause)) { throw new IllegalStateException(); } } - public void fail(String failureMessage) { if (!tryFail(failureMessage)) { throw new IllegalStateException(); } } - public boolean tryComplete(T result) { if (succeeded || failed) { return false; @@ -892,11 +841,7 @@ public class FutureTest extends VertxTestBase { this.result = result; return true; } - - public boolean tryComplete() { - throw new UnsupportedOperationException(); - } - + public boolean tryComplete() { throw new UnsupportedOperationException(); } public boolean tryFail(Throwable cause) { if (succeeded || failed) { return false; @@ -905,27 +850,11 @@ public class FutureTest extends VertxTestBase { this.cause = cause; return true; } - - public boolean tryFail(String failureMessage) { - throw new UnsupportedOperationException(); - } - - public T result() { - throw new UnsupportedOperationException(); - } - - public Throwable cause() { - throw new UnsupportedOperationException(); - } - - public boolean succeeded() { - throw new UnsupportedOperationException(); - } - - public boolean failed() { - throw new UnsupportedOperationException(); - } - + public boolean tryFail(String failureMessage) { throw new UnsupportedOperationException(); } + public T result() { throw new UnsupportedOperationException(); } + public Throwable cause() { throw new UnsupportedOperationException(); } + public boolean succeeded() { throw new UnsupportedOperationException(); } + public boolean failed() { throw new UnsupportedOperationException(); } public void handle(AsyncResult asyncResult) { if (asyncResult.succeeded()) { complete(asyncResult.result()); @@ -1025,9 +954,9 @@ public class FutureTest extends VertxTestBase { AsyncResult map1 = res.map(String::length); AsyncResult map2 = res.map(17); p.complete("foobar"); - assertEquals(6, (int) map1.result()); + assertEquals(6, (int)map1.result()); assertNull(map1.cause()); - assertEquals(17, (int) map2.result()); + assertEquals(17, (int)map2.result()); assertNull(map2.cause()); } @@ -1287,21 +1216,17 @@ public class FutureTest extends VertxTestBase { Future f = promise.future(); Field handlerField = f.getClass().getDeclaredField("handler"); handlerField.setAccessible(true); - f.setHandler(ar -> { - }); + f.setHandler(ar -> {}); promise.complete(); assertNull(handlerField.get(f)); - f.setHandler(ar -> { - }); + f.setHandler(ar -> {}); assertNull(handlerField.get(f)); promise = Promise.promise(); f = promise.future(); - f.setHandler(ar -> { - }); + f.setHandler(ar -> {}); promise.fail("abc"); assertNull(handlerField.get(f)); - f.setHandler(ar -> { - }); + f.setHandler(ar -> {}); assertNull(handlerField.get(f)); } @@ -1330,7 +1255,7 @@ public class FutureTest extends VertxTestBase { ctx.runOnContext(v -> { latch.complete(Thread.currentThread()); }); - Thread elThread = latch.get(10, SECONDS); + Thread elThread = latch.get(10, TimeUnit.SECONDS); // CountDownLatch latch1 = new CountDownLatch(1); From 3be4d274e5f1752d65eca27937e61f055d0a41d9 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 12 Dec 2019 17:19:25 +0100 Subject: [PATCH 08/13] Update src/main/java/io/vertx/core/Future.java Co-Authored-By: Thomas Segismont --- src/main/java/io/vertx/core/Future.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/vertx/core/Future.java b/src/main/java/io/vertx/core/Future.java index 4b6f89067..88c57de93 100644 --- a/src/main/java/io/vertx/core/Future.java +++ b/src/main/java/io/vertx/core/Future.java @@ -526,7 +526,7 @@ public interface Future extends AsyncResult { /** * Bridges a {@link CompletionStage} object to a Vert.x future instance. *

- * The Vert.x future handling methods will be called from a thread attached to {@code context}. + * The Vert.x future handling methods will be called on the provided {@code context}. * * @param completionStage a completion stage * @param context a Vert.x context to dispatch to From edcdf3b05b5be02d8ba5aa3d2523b720d86590f3 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 16 Dec 2019 12:33:24 +0100 Subject: [PATCH 09/13] Documentation + examples of the CS interop Signed-off-by: Julien Ponge --- src/main/asciidoc/index.adoc | 25 ++++++++++ .../CompletionStageInteropExamples.java | 48 +++++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 src/main/java/examples/CompletionStageInteropExamples.java diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index decd3988c..fb6faaf5b 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -361,6 +361,31 @@ run the given function, that returns a future. When this returned future complet In this second case, the {@link io.vertx.core.Handler} should complete the `next` future to report its success or failure. +=== CompletionStage interoperability + +The Vert.x `Future` API offers compatibility _from_ and _to_ `CompletionStage` which is the JDK interface for composable +asynchronous operations. + +We can go from a Vert.x `Future` to a `CompletionStage` using the {@link io.vertx.core.Future#toCompletionStage} method, as in: + +[source,$lang] +---- +{@link examples.CompletionStageInteropExamples#toCS} +---- + +We can conversely go from a `CompletionStage` to Vert.x `Future` using {@link io.vertx.core.Future#from}. +There are 2 variants: + +. the first variant takes just a `CompletionStage` and calls the `Future` methods from the thread that resolves the `CompletionStage` instance, and +. the second variant takes an extra {@link io.vertx.core.Context} parameter to call the `Future` methods on a Vert.x context. + +Here is an example of going from a `CompletionStage` to a Vert.x `Future` and dispatching on a context: + +[source,$lang] +---- +{@link examples.CompletionStageInteropExamples#fromCS} +---- + == Verticles Vert.x comes with a simple, scalable, _actor-like_ deployment and concurrency model out of the box that diff --git a/src/main/java/examples/CompletionStageInteropExamples.java b/src/main/java/examples/CompletionStageInteropExamples.java new file mode 100644 index 000000000..836f8f116 --- /dev/null +++ b/src/main/java/examples/CompletionStageInteropExamples.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package examples; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; + +import java.util.concurrent.CompletionStage; + +/** + * Examples of the Future / CompletionStage interoperability. + * + * @author Julien Ponge + */ +public class CompletionStageInteropExamples { + + public void toCS(Vertx vertx) { + Future future = vertx.createDnsClient().lookup("vertx.io"); + future.toCompletionStage().whenComplete((ip, err) -> { + if (err != null) { + System.err.println("Could not resolve vertx.io"); + err.printStackTrace(); + } else { + System.out.println("vertx.io => " + ip); + } + }); + } + + public void fromCS(Vertx vertx, CompletionStage completionStage) { + Future.from(completionStage, vertx.getOrCreateContext()) + .onSuccess(str -> { + System.out.println("We have a result: " + str); + }) + .onFailure(err -> { + System.err.println("We have a problem"); + err.printStackTrace(); + }); + } +} From 5d61c83a3ba61045c2343e7209b18f3414602fdd Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 16 Dec 2019 18:24:12 +0100 Subject: [PATCH 10/13] Flag which "from" method to prefer Signed-off-by: Julien Ponge --- src/main/asciidoc/index.adoc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index fb6faaf5b..d66497f24 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -379,6 +379,9 @@ There are 2 variants: . the first variant takes just a `CompletionStage` and calls the `Future` methods from the thread that resolves the `CompletionStage` instance, and . the second variant takes an extra {@link io.vertx.core.Context} parameter to call the `Future` methods on a Vert.x context. +IMPORTANT: In most cases the variant with a `CompletionStage` and a `Context` is the one you will want to use to respect the Vert.x threading model, +since Vert.x `Future` are more likely to be used with Vert.x code, libraries and clients. + Here is an example of going from a `CompletionStage` to a Vert.x `Future` and dispatching on a context: [source,$lang] From b03a57e7fff7160270b2ac44084708e34ded4eb4 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 16 Dec 2019 18:24:31 +0100 Subject: [PATCH 11/13] Improved "from" example with a flatMap Signed-off-by: Julien Ponge --- .../java/examples/CompletionStageInteropExamples.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/main/java/examples/CompletionStageInteropExamples.java b/src/main/java/examples/CompletionStageInteropExamples.java index 836f8f116..c22a0699b 100644 --- a/src/main/java/examples/CompletionStageInteropExamples.java +++ b/src/main/java/examples/CompletionStageInteropExamples.java @@ -14,6 +14,7 @@ package examples; import io.vertx.core.Future; import io.vertx.core.Vertx; +import java.util.UUID; import java.util.concurrent.CompletionStage; /** @@ -35,8 +36,16 @@ public class CompletionStageInteropExamples { }); } + private Future storeInDb(String key, String value) { + return Future.succeededFuture("Yo"); + } + public void fromCS(Vertx vertx, CompletionStage completionStage) { Future.from(completionStage, vertx.getOrCreateContext()) + .flatMap(str -> { + String key = UUID.randomUUID().toString(); + return storeInDb(key, str); + }) .onSuccess(str -> { System.out.println("We have a result: " + str); }) From f9547d2e0711482015fd4d38f0daf85f6daa5670 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 7 Jan 2020 10:29:08 +0100 Subject: [PATCH 12/13] Just use await with default timeouts Signed-off-by: Julien Ponge --- src/test/java/io/vertx/core/FutureTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/vertx/core/FutureTest.java b/src/test/java/io/vertx/core/FutureTest.java index 6e41bc63e..e6021351d 100644 --- a/src/test/java/io/vertx/core/FutureTest.java +++ b/src/test/java/io/vertx/core/FutureTest.java @@ -1445,7 +1445,7 @@ public class FutureTest extends VertxTestBase { assertSame(mainThread, Thread.currentThread()); complete(); }); - await(5, SECONDS); + await(); } @Test @@ -1473,7 +1473,7 @@ public class FutureTest extends VertxTestBase { disableThreadChecks(); new Thread(() -> willSucceed.complete("Yo")).start(); new Thread(() -> willFail.fail(new RuntimeException("Woops"))).start(); - await(5, SECONDS); + await(); } @Test @@ -1510,7 +1510,7 @@ public class FutureTest extends VertxTestBase { willFail.completeExceptionally(new RuntimeException("Woops")); }); - await(5, SECONDS); + await(); } @Test @@ -1551,6 +1551,6 @@ public class FutureTest extends VertxTestBase { willFail.completeExceptionally(new RuntimeException("Woops")); }); - await(5, SECONDS); + await(); } } From 9c6fcc41c454b56b19b58b9a9e11cab845fbfa4a Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 7 Jan 2020 10:33:50 +0100 Subject: [PATCH 13/13] Rename from to fromCompletionStage Signed-off-by: Julien Ponge --- src/main/asciidoc/index.adoc | 2 +- .../java/examples/CompletionStageInteropExamples.java | 2 +- src/main/java/io/vertx/core/Future.java | 4 ++-- src/test/java/io/vertx/core/FutureTest.java | 10 ++++------ 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index d66497f24..d99a09ce9 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -373,7 +373,7 @@ We can go from a Vert.x `Future` to a `CompletionStage` using the {@link io.vert {@link examples.CompletionStageInteropExamples#toCS} ---- -We can conversely go from a `CompletionStage` to Vert.x `Future` using {@link io.vertx.core.Future#from}. +We can conversely go from a `CompletionStage` to Vert.x `Future` using {@link io.vertx.core.Future#fromCompletionStage}. There are 2 variants: . the first variant takes just a `CompletionStage` and calls the `Future` methods from the thread that resolves the `CompletionStage` instance, and diff --git a/src/main/java/examples/CompletionStageInteropExamples.java b/src/main/java/examples/CompletionStageInteropExamples.java index c22a0699b..47e483d23 100644 --- a/src/main/java/examples/CompletionStageInteropExamples.java +++ b/src/main/java/examples/CompletionStageInteropExamples.java @@ -41,7 +41,7 @@ public class CompletionStageInteropExamples { } public void fromCS(Vertx vertx, CompletionStage completionStage) { - Future.from(completionStage, vertx.getOrCreateContext()) + Future.fromCompletionStage(completionStage, vertx.getOrCreateContext()) .flatMap(str -> { String key = UUID.randomUUID().toString(); return storeInDb(key, str); diff --git a/src/main/java/io/vertx/core/Future.java b/src/main/java/io/vertx/core/Future.java index 88c57de93..6be7a8fbd 100644 --- a/src/main/java/io/vertx/core/Future.java +++ b/src/main/java/io/vertx/core/Future.java @@ -511,7 +511,7 @@ public interface Future extends AsyncResult { * @return a Vert.x future that resolves when {@code completionStage} resolves */ @GenIgnore(GenIgnore.PERMITTED_TYPE) - static Future from(CompletionStage completionStage) { + static Future fromCompletionStage(CompletionStage completionStage) { Promise promise = Promise.promise(); completionStage.whenComplete((value, err) -> { if (err != null) { @@ -534,7 +534,7 @@ public interface Future extends AsyncResult { * @return a Vert.x future that resolves when {@code completionStage} resolves */ @GenIgnore(GenIgnore.PERMITTED_TYPE) - static Future from(CompletionStage completionStage, Context context) { + static Future fromCompletionStage(CompletionStage completionStage, Context context) { Promise promise = ((ContextInternal) context).promise(); completionStage.whenComplete((value, err) -> { if (err != null) { diff --git a/src/test/java/io/vertx/core/FutureTest.java b/src/test/java/io/vertx/core/FutureTest.java index e6021351d..c97608827 100644 --- a/src/test/java/io/vertx/core/FutureTest.java +++ b/src/test/java/io/vertx/core/FutureTest.java @@ -29,8 +29,6 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; -import static java.util.concurrent.TimeUnit.SECONDS; - /** * @author Julien Viet */ @@ -1487,13 +1485,13 @@ public class FutureTest extends VertxTestBase { AtomicReference failureSupplierThread = new AtomicReference<>(); CompletableFuture willFail = new CompletableFuture<>(); - Future.from(willSucceed).onSuccess(str -> { + Future.fromCompletionStage(willSucceed).onSuccess(str -> { assertEquals("Ok", str); assertSame(successSupplierThread.get(), Thread.currentThread()); complete(); }); - Future.from(willFail).onFailure(err -> { + Future.fromCompletionStage(willFail).onFailure(err -> { assertTrue(err instanceof RuntimeException); assertEquals("Woops", err.getMessage()); assertSame(failureSupplierThread.get(), Thread.currentThread()); @@ -1524,7 +1522,7 @@ public class FutureTest extends VertxTestBase { AtomicReference failureSupplierThread = new AtomicReference<>(); CompletableFuture willFail = new CompletableFuture<>(); - Future.from(willSucceed, context).onSuccess(str -> { + Future.fromCompletionStage(willSucceed, context).onSuccess(str -> { assertEquals("Ok", str); assertNotSame(successSupplierThread.get(), Thread.currentThread()); assertEquals(context, vertx.getOrCreateContext()); @@ -1532,7 +1530,7 @@ public class FutureTest extends VertxTestBase { complete(); }); - Future.from(willFail, context).onFailure(err -> { + Future.fromCompletionStage(willFail, context).onFailure(err -> { assertTrue(err instanceof RuntimeException); assertEquals("Woops", err.getMessage()); assertNotSame(failureSupplierThread.get(), Thread.currentThread());