From 48561de730ae4d4b0d2b6d7f2e6f007417b38081 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 12 Dec 2019 10:49:59 +0100 Subject: [PATCH] CompletionStage interop methods Signed-off-by: Julien Ponge --- src/main/java/io/vertx/core/Future.java | 114 ++++++++++++------ .../java/io/vertx/core/impl/ContextImpl.java | 6 +- 2 files changed, 82 insertions(+), 38 deletions(-) diff --git a/src/main/java/io/vertx/core/Future.java b/src/main/java/io/vertx/core/Future.java index 68454e69b..1dc89c8a6 100644 --- a/src/main/java/io/vertx/core/Future.java +++ b/src/main/java/io/vertx/core/Future.java @@ -17,6 +17,8 @@ import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.impl.ContextInternal; import io.vertx.core.spi.FutureFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Function; /** @@ -32,14 +34,14 @@ public interface Future extends AsyncResult { * Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned. * * @param handler the handler - * @param the result type + * @param the result type * @return the future. */ static Future future(Handler> handler) { Promise promise = Promise.promise(); try { handler.handle(promise); - } catch (Throwable e){ + } catch (Throwable e) { promise.tryFail(e); } return promise.future(); @@ -48,8 +50,8 @@ public interface Future extends AsyncResult { /** * Create a succeeded future with a null result * - * @param the result type - * @return the future + * @param the result type + * @return the future */ static Future succeededFuture() { return factory.succeededFuture(); @@ -58,9 +60,9 @@ public interface Future extends AsyncResult { /** * Created a succeeded future with the specified result. * - * @param result the result - * @param the result type - * @return the future + * @param result the result + * @param the result type + * @return the future */ static Future succeededFuture(T result) { if (result == null) { @@ -73,9 +75,9 @@ public interface Future extends AsyncResult { /** * Create a failed future with the specified failure cause. * - * @param t the failure cause as a Throwable - * @param the result type - * @return the future + * @param t the failure cause as a Throwable + * @param the result type + * @return the future */ static Future failedFuture(Throwable t) { return factory.failedFuture(t); @@ -84,9 +86,9 @@ public interface Future extends AsyncResult { /** * Create a failed future with the specified failure message. * - * @param failureMessage the failure message - * @param the result type - * @return the future + * @param failureMessage the failure message + * @param the result type + * @return the future */ static Future failedFuture(String failureMessage) { return factory.failureFuture(failureMessage); @@ -112,6 +114,7 @@ public interface Future extends AsyncResult { /** * Add a handler to be notified of the result. *
+ * * @param handler the handler that will be called with the result * @return a reference to this, so it can be used fluently */ @@ -121,6 +124,7 @@ public interface Future extends AsyncResult { /** * Add a handler to be notified of the succeeded result. *
+ * * @param handler the handler that will be called with the succeeded result * @return a reference to this, so it can be used fluently */ @@ -136,6 +140,7 @@ public interface Future extends AsyncResult { /** * Add a handler to be notified of the failed result. *
+ * * @param handler the handler that will be called with the failed result * @return a reference to this, so it can be used fluently */ @@ -194,13 +199,13 @@ public interface Future extends AsyncResult { /** * Compose this future with a {@code mapper} function.

- * + *

* When this future (the one on which {@code compose} is called) succeeds, the {@code mapper} will be called with * the completed value and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.

- * + *

* If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- * + *

* When this future fails, the failure will be propagated to the returned future and the {@code mapper} * will not be called. * @@ -220,15 +225,15 @@ public interface Future extends AsyncResult { /** * Compose this future with a {@code successMapper} and {@code failureMapper} functions.

- * + *

* When this future (the one on which {@code compose} is called) succeeds, the {@code successMapper} will be called with * the completed value and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.

- * + *

* When this future (the one on which {@code compose} is called) fails, the {@code failureMapper} will be called with * the failure and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.

- * + *

* If any mapper function throws an exception, the returned future will be failed with this exception.

* * @param successMapper the function mapping the success @@ -275,12 +280,12 @@ public interface Future extends AsyncResult { /** * Apply a {@code mapper} function on this future.

- * + *

* When this future succeeds, the {@code mapper} will be called with the completed value and this mapper * returns a value. This value will complete the future returned by this method call.

- * + *

* If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- * + *

* When this future fails, the failure will be propagated to the returned future and the {@code mapper} * will not be called. * @@ -317,9 +322,9 @@ public interface Future extends AsyncResult { /** * Map the result of a future to a specific {@code value}.

- * + *

* When this future succeeds, this {@code value} will complete the future returned by this method call.

- * + *

* When this future fails, the failure will be propagated to the returned future. * * @param value the value that eventually completes the mapped future @@ -345,11 +350,11 @@ public interface Future extends AsyncResult { /** * Map the result of a future to {@code null}.

- * + *

* This is a conveniency for {@code future.map((T) null)} or {@code future.map((Void) null)}.

- * + *

* When this future succeeds, {@code null} will complete the future returned by this method call.

- * + *

* When this future fails, the failure will be propagated to the returned future. * * @return the mapped future @@ -396,12 +401,12 @@ public interface Future extends AsyncResult { /** * Apply a {@code mapper} function on this future.

- * + *

* When this future fails, the {@code mapper} will be called with the completed value and this mapper * returns a value. This value will complete the future returned by this method call.

- * + *

* If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- * + *

* When this future succeeds, the result will be propagated to the returned future and the {@code mapper} * will not be called. * @@ -438,9 +443,9 @@ public interface Future extends AsyncResult { /** * Map the failure of a future to a specific {@code value}.

- * + *

* When this future fails, this {@code value} will complete the future returned by this method call.

- * + *

* When this future succeeds, the result will be propagated to the returned future. * * @param value the value that eventually completes the mapped future @@ -466,11 +471,11 @@ public interface Future extends AsyncResult { /** * Map the failure of a future to {@code null}.

- * + *

* This is a convenience for {@code future.otherwise((T) null)}.

- * + *

* When this future fails, the {@code null} value will complete the future returned by this method call.

- * + *

* When this future succeeds, the result will be propagated to the returned future. * * @return the mapped future @@ -479,6 +484,45 @@ public interface Future extends AsyncResult { return (Future) AsyncResult.super.otherwiseEmpty(); } + @GenIgnore(GenIgnore.PERMITTED_TYPE) + default CompletionStage toCompletionStage() { + CompletableFuture completableFuture = new CompletableFuture<>(); + this.setHandler(ar -> { + if (ar.succeeded()) { + completableFuture.complete(ar.result()); + } else { + completableFuture.completeExceptionally(ar.cause()); + } + }); + return completableFuture; + } + + @GenIgnore(GenIgnore.PERMITTED_TYPE) + static Future from(CompletionStage completionStage) { + Promise promise = Promise.promise(); + completionStage.whenComplete((value, err) -> { + if (err != null) { + promise.fail(err); + } else { + promise.complete(value); + } + }); + return promise.future(); + } + + @GenIgnore(GenIgnore.PERMITTED_TYPE) + static Future from(CompletionStage completionStage, Context context) { + Promise promise = ((ContextInternal) context).promise(); + completionStage.whenComplete((value, err) -> { + if (err != null) { + promise.fail(err); + } else { + promise.complete(value); + } + }); + return promise.future(); + } + @GenIgnore FutureFactory factory = ServiceHelper.loadFactory(FutureFactory.class); diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index 7e0be2457..e2ea25764 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -13,7 +13,6 @@ package io.vertx.core.impl; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import io.vertx.codegen.annotations.Nullable; import io.vertx.core.*; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; @@ -30,10 +29,11 @@ import java.util.concurrent.RejectedExecutionException; * @author Tim Fox */ abstract class ContextImpl extends AbstractContext { - + /** * Execute the {@code task} disabling the thread-local association for the duration * of the execution. {@link Vertx#currentContext()} will return {@code null}, + * * @param task the task to execute * @throws IllegalStateException if the current thread is not a Vertx thread */ @@ -154,7 +154,7 @@ abstract class ContextImpl extends AbstractContext { } static Future executeBlocking(ContextInternal context, Handler> blockingCodeHandler, - WorkerPool workerPool, TaskQueue queue) { + WorkerPool workerPool, TaskQueue queue) { PoolMetrics metrics = workerPool.metrics(); Object queueMetric = metrics != null ? metrics.submitted() : null; Promise promise = context.promise();