CompletionStage interop methods

Signed-off-by: Julien Ponge <julien.ponge@gmail.com>
This commit is contained in:
Julien Ponge
2019-12-12 10:49:59 +01:00
parent 14193583ba
commit 48561de730
2 changed files with 82 additions and 38 deletions

View File

@@ -17,6 +17,8 @@ import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.FutureFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
/**
@@ -32,14 +34,14 @@ public interface Future<T> extends AsyncResult<T> {
* Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned.
*
* @param handler the handler
* @param <T> the result type
* @param <T> the result type
* @return the future.
*/
static <T> Future<T> future(Handler<Promise<T>> handler) {
Promise<T> promise = Promise.promise();
try {
handler.handle(promise);
} catch (Throwable e){
} catch (Throwable e) {
promise.tryFail(e);
}
return promise.future();
@@ -48,8 +50,8 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Create a succeeded future with a null result
*
* @param <T> the result type
* @return the future
* @param <T> the result type
* @return the future
*/
static <T> Future<T> succeededFuture() {
return factory.succeededFuture();
@@ -58,9 +60,9 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Created a succeeded future with the specified result.
*
* @param result the result
* @param <T> the result type
* @return the future
* @param result the result
* @param <T> the result type
* @return the future
*/
static <T> Future<T> succeededFuture(T result) {
if (result == null) {
@@ -73,9 +75,9 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Create a failed future with the specified failure cause.
*
* @param t the failure cause as a Throwable
* @param <T> the result type
* @return the future
* @param t the failure cause as a Throwable
* @param <T> the result type
* @return the future
*/
static <T> Future<T> failedFuture(Throwable t) {
return factory.failedFuture(t);
@@ -84,9 +86,9 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Create a failed future with the specified failure message.
*
* @param failureMessage the failure message
* @param <T> the result type
* @return the future
* @param failureMessage the failure message
* @param <T> the result type
* @return the future
*/
static <T> Future<T> failedFuture(String failureMessage) {
return factory.failureFuture(failureMessage);
@@ -112,6 +114,7 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Add a handler to be notified of the result.
* <br/>
*
* @param handler the handler that will be called with the result
* @return a reference to this, so it can be used fluently
*/
@@ -121,6 +124,7 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Add a handler to be notified of the succeeded result.
* <br/>
*
* @param handler the handler that will be called with the succeeded result
* @return a reference to this, so it can be used fluently
*/
@@ -136,6 +140,7 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Add a handler to be notified of the failed result.
* <br/>
*
* @param handler the handler that will be called with the failed result
* @return a reference to this, so it can be used fluently
*/
@@ -194,13 +199,13 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Compose this future with a {@code mapper} function.<p>
*
* <p>
* When this future (the one on which {@code compose} is called) succeeds, the {@code mapper} will be called with
* the completed value and this mapper returns another future object. This returned future completion will complete
* the future returned by this method call.<p>
*
* <p>
* If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
*
* <p>
* When this future fails, the failure will be propagated to the returned future and the {@code mapper}
* will not be called.
*
@@ -220,15 +225,15 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Compose this future with a {@code successMapper} and {@code failureMapper} functions.<p>
*
* <p>
* When this future (the one on which {@code compose} is called) succeeds, the {@code successMapper} will be called with
* the completed value and this mapper returns another future object. This returned future completion will complete
* the future returned by this method call.<p>
*
* <p>
* When this future (the one on which {@code compose} is called) fails, the {@code failureMapper} will be called with
* the failure and this mapper returns another future object. This returned future completion will complete
* the future returned by this method call.<p>
*
* <p>
* If any mapper function throws an exception, the returned future will be failed with this exception.<p>
*
* @param successMapper the function mapping the success
@@ -275,12 +280,12 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Apply a {@code mapper} function on this future.<p>
*
* <p>
* When this future succeeds, the {@code mapper} will be called with the completed value and this mapper
* returns a value. This value will complete the future returned by this method call.<p>
*
* <p>
* If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
*
* <p>
* When this future fails, the failure will be propagated to the returned future and the {@code mapper}
* will not be called.
*
@@ -317,9 +322,9 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Map the result of a future to a specific {@code value}.<p>
*
* <p>
* When this future succeeds, this {@code value} will complete the future returned by this method call.<p>
*
* <p>
* When this future fails, the failure will be propagated to the returned future.
*
* @param value the value that eventually completes the mapped future
@@ -345,11 +350,11 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Map the result of a future to {@code null}.<p>
*
* <p>
* This is a conveniency for {@code future.map((T) null)} or {@code future.map((Void) null)}.<p>
*
* <p>
* When this future succeeds, {@code null} will complete the future returned by this method call.<p>
*
* <p>
* When this future fails, the failure will be propagated to the returned future.
*
* @return the mapped future
@@ -396,12 +401,12 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Apply a {@code mapper} function on this future.<p>
*
* <p>
* When this future fails, the {@code mapper} will be called with the completed value and this mapper
* returns a value. This value will complete the future returned by this method call.<p>
*
* <p>
* If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
*
* <p>
* When this future succeeds, the result will be propagated to the returned future and the {@code mapper}
* will not be called.
*
@@ -438,9 +443,9 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Map the failure of a future to a specific {@code value}.<p>
*
* <p>
* When this future fails, this {@code value} will complete the future returned by this method call.<p>
*
* <p>
* When this future succeeds, the result will be propagated to the returned future.
*
* @param value the value that eventually completes the mapped future
@@ -466,11 +471,11 @@ public interface Future<T> extends AsyncResult<T> {
/**
* Map the failure of a future to {@code null}.<p>
*
* <p>
* This is a convenience for {@code future.otherwise((T) null)}.<p>
*
* <p>
* When this future fails, the {@code null} value will complete the future returned by this method call.<p>
*
* <p>
* When this future succeeds, the result will be propagated to the returned future.
*
* @return the mapped future
@@ -479,6 +484,45 @@ public interface Future<T> extends AsyncResult<T> {
return (Future<T>) AsyncResult.super.otherwiseEmpty();
}
@GenIgnore(GenIgnore.PERMITTED_TYPE)
default CompletionStage<T> toCompletionStage() {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
this.setHandler(ar -> {
if (ar.succeeded()) {
completableFuture.complete(ar.result());
} else {
completableFuture.completeExceptionally(ar.cause());
}
});
return completableFuture;
}
@GenIgnore(GenIgnore.PERMITTED_TYPE)
static <T> Future<T> from(CompletionStage<T> completionStage) {
Promise<T> promise = Promise.promise();
completionStage.whenComplete((value, err) -> {
if (err != null) {
promise.fail(err);
} else {
promise.complete(value);
}
});
return promise.future();
}
@GenIgnore(GenIgnore.PERMITTED_TYPE)
static <T> Future<T> from(CompletionStage<T> completionStage, Context context) {
Promise<T> promise = ((ContextInternal) context).promise();
completionStage.whenComplete((value, err) -> {
if (err != null) {
promise.fail(err);
} else {
promise.complete(value);
}
});
return promise.future();
}
@GenIgnore
FutureFactory factory = ServiceHelper.loadFactory(FutureFactory.class);

View File

@@ -13,7 +13,6 @@ package io.vertx.core.impl;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.*;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
@@ -30,10 +29,11 @@ import java.util.concurrent.RejectedExecutionException;
* @author <a href="http://tfox.org">Tim Fox</a>
*/
abstract class ContextImpl extends AbstractContext {
/**
* Execute the {@code task} disabling the thread-local association for the duration
* of the execution. {@link Vertx#currentContext()} will return {@code null},
*
* @param task the task to execute
* @throws IllegalStateException if the current thread is not a Vertx thread
*/
@@ -154,7 +154,7 @@ abstract class ContextImpl extends AbstractContext {
}
static <T> Future<T> executeBlocking(ContextInternal context, Handler<Promise<T>> blockingCodeHandler,
WorkerPool workerPool, TaskQueue queue) {
WorkerPool workerPool, TaskQueue queue) {
PoolMetrics metrics = workerPool.metrics();
Object queueMetric = metrics != null ? metrics.submitted() : null;
Promise<T> promise = context.promise();