diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index c13155f27..8565ca036 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -355,12 +355,8 @@ In this example, 3 operations are chained: When these 3 steps are successful, the final future (`startFuture`) is succeeded. However, if one of the steps fails, the final future is failed. -This example uses: - -* {@link io.vertx.core.Future#compose(java.util.function.Function)}: when the current future completes, +This example uses {@link io.vertx.core.Future#compose(java.util.function.Function)}: when the current future completes, run the given function, that returns a future. When this returned future completes, it completes the composition. -* {@link io.vertx.core.Future#compose(io.vertx.core.Handler,io.vertx.core.Future)}: when the current future -completes, run the given handler that completes the given `future` (next). In this second case, the {@link io.vertx.core.Handler} should complete the `next` future to report its success or failure. diff --git a/src/main/java/examples/CoreExamples.java b/src/main/java/examples/CoreExamples.java index 3a745d314..ea9299e2e 100644 --- a/src/main/java/examples/CoreExamples.java +++ b/src/main/java/examples/CoreExamples.java @@ -71,10 +71,10 @@ public class CoreExamples { } public void example7(Vertx vertx) { - vertx.executeBlocking(future -> { + vertx.executeBlocking(promise -> { // Call some blocking API that takes a significant amount of time to return String result = someAPI.blockingMethod("hello"); - future.complete(result); + promise.complete(result); }, res -> { System.out.println("The result is: " + res.result()); }); @@ -82,10 +82,10 @@ public class CoreExamples { public void workerExecutor1(Vertx vertx) { WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool"); - executor.executeBlocking(future -> { + executor.executeBlocking(promise -> { // Call some blocking API that takes a significant amount of time to return String result = someAPI.blockingMethod("hello"); - future.complete(result); + promise.complete(result); }, res -> { System.out.println("The result is: " + res.result()); }); @@ -116,11 +116,9 @@ public class CoreExamples { } public void exampleFutureAll1(HttpServer httpServer, NetServer netServer) { - Future httpServerFuture = Future.future(); - httpServer.listen(httpServerFuture); + Future httpServerFuture = Future.future(promise -> httpServer.listen(promise)); - Future netServerFuture = Future.future(); - netServer.listen(netServerFuture); + Future netServerFuture = Future.future(promise -> netServer.listen(promise)); CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> { if (ar.succeeded()) { @@ -166,22 +164,18 @@ public class CoreExamples { public void exampleFuture6(Vertx vertx) { FileSystem fs = vertx.fileSystem(); - Future startFuture = Future.future(); - Future fut1 = Future.future(); - fs.createFile("/foo", fut1); + Future fut1 = Future.future(promise -> fs.createFile("/foo", promise)); - fut1.compose(v -> { + Future startFuture = fut1 + .compose(v -> { // When the file is created (fut1), execute this: - Future fut2 = Future.future(); - fs.writeFile("/foo", Buffer.buffer(), fut2); - return fut2; - }).compose(v -> { - // When the file is written (fut2), execute this: - fs.move("/foo", "/bar", startFuture); - }, - // mark startFuture it as failed if any step fails. - startFuture); + return Future.future(promise -> fs.writeFile("/foo", Buffer.buffer(), promise)); + }) + .compose(v -> { + // When the file is written (fut2), execute this: + return Future.future(promise -> fs.move("/foo", "/bar", promise)); + }); } public void example7_1(Vertx vertx) { @@ -200,9 +194,9 @@ public class CoreExamples { public void multiThreadedWorkerVerticleAlternative2(Vertx vertx, String someresult) { vertx.eventBus().consumer("foo", msg -> { - vertx.executeBlocking(fut -> { + vertx.executeBlocking(promise -> { // Invoke blocking code with received message data - fut.complete(someresult); + promise.complete(someresult); }, false, ar -> { // ordered == false // Handle result, e.g. reply to the message }); diff --git a/src/main/java/examples/HTTPExamples.java b/src/main/java/examples/HTTPExamples.java index ed3f6617a..2372e5e57 100644 --- a/src/main/java/examples/HTTPExamples.java +++ b/src/main/java/examples/HTTPExamples.java @@ -15,6 +15,7 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.AsyncFile; @@ -739,16 +740,16 @@ public class HTTPExamples { public void exampleAsynchronousHandshake(HttpServer server) { server.websocketHandler(websocket -> { - Future fut = Future.future(); - websocket.setHandshake(fut); + Promise promise = Promise.promise(); + websocket.setHandshake(promise); authenticate(websocket, ar -> { if (ar.succeeded()) { // Terminate the handshake with the status code 101 (Switching Protocol) // Reject the handshake with 401 (Unauthorized) - fut.complete(ar.succeeded() ? 101 : 401); + promise.complete(ar.succeeded() ? 101 : 401); } else { // Will send a 500 error - fut.fail(ar.cause()); + promise.fail(ar.cause()); } }); }); diff --git a/src/main/java/io/vertx/core/AbstractVerticle.java b/src/main/java/io/vertx/core/AbstractVerticle.java index dbd58e98b..af67d0979 100644 --- a/src/main/java/io/vertx/core/AbstractVerticle.java +++ b/src/main/java/io/vertx/core/AbstractVerticle.java @@ -21,10 +21,10 @@ import java.util.List; *

* Instead of implementing {@link io.vertx.core.Verticle} directly, it is often simpler to just extend this class. *

- * In the simplest case, just override the {@link #start} method. If you have verticle clean-up to do you can - * optionally override the {@link #stop} method too. + * In the simplest case, just override the {@link #start(Promise)} method. If you have verticle clean-up to do you can + * optionally override the {@link #stop(Promise)} method too. *

If your verticle does extra start-up or clean-up that takes some time (e.g. it deploys other verticles) then - * you should override the asynchronous {@link #start(Future) start} and {@link #stop(Future) stop} methods. + * you should override the asynchronous {@link #start(Promise) start} and {@link #stop(Promise) stop} methods. *

* This class also maintains references to the {@link io.vertx.core.Vertx} and {@link io.vertx.core.Context} * instances of the verticle for easy access.

@@ -102,7 +102,7 @@ public abstract class AbstractVerticle implements Verticle { * @throws Exception */ @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { start(); startFuture.complete(); } @@ -116,7 +116,7 @@ public abstract class AbstractVerticle implements Verticle { * @throws Exception */ @Override - public void stop(Future stopFuture) throws Exception { + public void stop(Promise stopFuture) throws Exception { stop(); stopFuture.complete(); } diff --git a/src/main/java/io/vertx/core/CompositeFuture.java b/src/main/java/io/vertx/core/CompositeFuture.java index 990efb53a..ace5dbcca 100644 --- a/src/main/java/io/vertx/core/CompositeFuture.java +++ b/src/main/java/io/vertx/core/CompositeFuture.java @@ -181,20 +181,6 @@ public interface CompositeFuture extends Future { @Override CompositeFuture setHandler(Handler> handler); - /** - * Set this instance as result. Any handler will be called, if there is one, and the future will be marked as completed. - */ - @Override - void complete(); - - /** - * Try to set this instance as result. When it happens, any handler will be called, if there is one, and the future will be marked as completed. - * - * @return false when the future is already completed - */ - @Override - boolean tryComplete(); - /** * Returns a cause of a wrapped future * diff --git a/src/main/java/io/vertx/core/Context.java b/src/main/java/io/vertx/core/Context.java index 714779bde..2c6875b89 100644 --- a/src/main/java/io/vertx/core/Context.java +++ b/src/main/java/io/vertx/core/Context.java @@ -108,7 +108,7 @@ public interface Context { * (e.g. on the original event loop of the caller). *

* A {@code Future} instance is passed into {@code blockingCodeHandler}. When the blocking code successfully completes, - * the handler should call the {@link Future#complete} or {@link Future#complete(Object)} method, or the {@link Future#fail} + * the handler should call the {@link Promise#complete} or {@link Promise#complete(Object)} method, or the {@link Promise#fail} * method if it failed. *

* The blocking code should block for a reasonable amount of time (i.e no more than a few seconds). Long blocking operations @@ -127,7 +127,7 @@ public interface Context { * guarantees * @param the type of the result */ - void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler); + void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler); /** * Invoke {@link #executeBlocking(Handler, boolean, Handler)} with order = true. @@ -135,7 +135,7 @@ public interface Context { * @param resultHandler handler that will be called when the blocking code is complete * @param the type of the result */ - void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler); + void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler); /** * If the context is associated with a Verticle deployment, this returns the deployment ID of that deployment. diff --git a/src/main/java/io/vertx/core/Future.java b/src/main/java/io/vertx/core/Future.java index 70d8d8b43..edcac763b 100644 --- a/src/main/java/io/vertx/core/Future.java +++ b/src/main/java/io/vertx/core/Future.java @@ -11,7 +11,6 @@ package io.vertx.core; -import io.vertx.codegen.annotations.CacheReturn; import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.VertxGen; @@ -26,7 +25,7 @@ import java.util.function.Function; * @author Tim Fox */ @VertxGen -public interface Future extends AsyncResult, Handler> { +public interface Future extends AsyncResult { /** * Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned. @@ -35,20 +34,10 @@ public interface Future extends AsyncResult, Handler> { * @param the result type * @return the future. */ - static Future future(Handler> handler) { - Future fut = future(); - handler.handle(fut); - return fut; - } - - /** - * Create a future that hasn't completed yet - * - * @param the result type - * @return the future - */ - static Future future() { - return factory.future(); + static Future future(Handler> handler) { + Promise promise = Promise.promise(); + handler.handle(promise); + return promise.future(); } /** @@ -121,63 +110,6 @@ public interface Future extends AsyncResult, Handler> { */ Handler> getHandler(); - /** - * Set the result. Any handler will be called, if there is one, and the future will be marked as completed. - * - * @param result the result - */ - void complete(T result); - - /** - * Set a null result. Any handler will be called, if there is one, and the future will be marked as completed. - */ - void complete(); - - /** - * Set the failure. Any handler will be called, if there is one, and the future will be marked as completed. - * - * @param cause the failure cause - */ - void fail(Throwable cause); - - /** - * Try to set the failure. When it happens, any handler will be called, if there is one, and the future will be marked as completed. - * - * @param failureMessage the failure message - */ - void fail(String failureMessage); - - /** - * Set the failure. Any handler will be called, if there is one, and the future will be marked as completed. - * - * @param result the result - * @return false when the future is already completed - */ - boolean tryComplete(T result); - - /** - * Try to set the result. When it happens, any handler will be called, if there is one, and the future will be marked as completed. - * - * @return false when the future is already completed - */ - boolean tryComplete(); - - /** - * Try to set the failure. When it happens, any handler will be called, if there is one, and the future will be marked as completed. - * - * @param cause the failure cause - * @return false when the future is already completed - */ - boolean tryFail(Throwable cause); - - /** - * Try to set the failure. When it happens, any handler will be called, if there is one, and the future will be marked as completed. - * - * @param failureMessage the failure message - * @return false when the future is already completed - */ - boolean tryFail(String failureMessage); - /** * The result of the operation. This will be null if the operation failed. * @@ -210,39 +142,6 @@ public interface Future extends AsyncResult, Handler> { @Override boolean failed(); - /** - * Compose this future with a provided {@code next} future.

- * - * When this (the one on which {@code compose} is called) future succeeds, the {@code handler} will be called with - * the completed value, this handler should complete the next future.

- * - * If the {@code handler} throws an exception, the returned future will be failed with this exception.

- * - * When this future fails, the failure will be propagated to the {@code next} future and the {@code handler} - * will not be called. - * - * @param handler the handler - * @param next the next future - * @return the next future, used for chaining - */ - default Future compose(Handler handler, Future next) { - setHandler(ar -> { - if (ar.succeeded()) { - try { - handler.handle(ar.result()); - } catch (Throwable err) { - if (next.isComplete()) { - throw err; - } - next.fail(err); - } - } else { - next.fail(ar.cause()); - } - }); - return next; - } - /** * Compose this future with a {@code mapper} function.

* @@ -262,7 +161,7 @@ public interface Future extends AsyncResult, Handler> { if (mapper == null) { throw new NullPointerException(); } - Future ret = Future.future(); + Promise ret = Promise.promise(); setHandler(ar -> { if (ar.succeeded()) { Future apply; @@ -277,9 +176,8 @@ public interface Future extends AsyncResult, Handler> { ret.fail(ar.cause()); } }); - return ret; + return ret.future(); } - /** * Apply a {@code mapper} function on this future.

* @@ -298,7 +196,7 @@ public interface Future extends AsyncResult, Handler> { if (mapper == null) { throw new NullPointerException(); } - Future ret = Future.future(); + Promise ret = Promise.promise(); setHandler(ar -> { if (ar.succeeded()) { U mapped; @@ -313,7 +211,7 @@ public interface Future extends AsyncResult, Handler> { ret.fail(ar.cause()); } }); - return ret; + return ret.future(); } /** @@ -327,7 +225,7 @@ public interface Future extends AsyncResult, Handler> { * @return the mapped future */ default Future map(V value) { - Future ret = Future.future(); + Promise ret = Promise.promise(); setHandler(ar -> { if (ar.succeeded()) { ret.complete(value); @@ -335,7 +233,7 @@ public interface Future extends AsyncResult, Handler> { ret.fail(ar.cause()); } }); - return ret; + return ret.future(); } /** @@ -354,15 +252,6 @@ public interface Future extends AsyncResult, Handler> { return (Future) AsyncResult.super.mapEmpty(); } - /** - * Succeed or fail this future with the {@link AsyncResult} event. - * - * @param asyncResult the async result to handle - */ - @GenIgnore - @Override - void handle(AsyncResult asyncResult); - /** * Handles a failure of this Future by returning the result of another Future. * If the mapper fails, then the returned future will be failed with this failure. @@ -374,7 +263,7 @@ public interface Future extends AsyncResult, Handler> { if (mapper == null) { throw new NullPointerException(); } - Future ret = Future.future(); + Promise ret = Promise.promise(); setHandler(ar -> { if (ar.succeeded()) { ret.complete(result()); @@ -389,9 +278,8 @@ public interface Future extends AsyncResult, Handler> { mapped.setHandler(ret); } }); - return ret; + return ret.future(); } - /** * Apply a {@code mapper} function on this future.

* @@ -410,7 +298,7 @@ public interface Future extends AsyncResult, Handler> { if (mapper == null) { throw new NullPointerException(); } - Future ret = Future.future(); + Promise ret = Promise.promise(); setHandler(ar -> { if (ar.succeeded()) { ret.complete(result()); @@ -425,7 +313,7 @@ public interface Future extends AsyncResult, Handler> { ret.complete(value); } }); - return ret; + return ret.future(); } /** @@ -439,7 +327,7 @@ public interface Future extends AsyncResult, Handler> { * @return the mapped future */ default Future otherwise(T value) { - Future ret = Future.future(); + Promise ret = Promise.promise(); setHandler(ar -> { if (ar.succeeded()) { ret.complete(result()); @@ -447,7 +335,7 @@ public interface Future extends AsyncResult, Handler> { ret.complete(value); } }); - return ret; + return ret.future(); } /** diff --git a/src/main/java/io/vertx/core/Promise.java b/src/main/java/io/vertx/core/Promise.java new file mode 100644 index 000000000..9cab2735d --- /dev/null +++ b/src/main/java/io/vertx/core/Promise.java @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core; + +import io.vertx.codegen.annotations.CacheReturn; +import io.vertx.codegen.annotations.GenIgnore; +import io.vertx.codegen.annotations.VertxGen; + +import static io.vertx.core.Future.factory; + +/** + * Represents the writable side of an action that may, or may not, have occurred yet. + *

+ * The {@link #future()} method returns the {@link Future} associated with a promise, the future + * can be used for getting notified of the promise completion and retrieve its value. + *

+ * A promise extends {@code Handler>} so it can be used as a callback. + * + * @author Julien Viet + */ +@VertxGen +public interface Promise extends Handler> { + + /** + * Create a succeeded promise with a {@code null} result + * + * @param the result type + * @return the promise + */ + static Promise succeededPromise() { + return factory.succeededPromise(); + } + + /** + * Created a succeeded promise with the specified {@code result}. + * + * @param result the result + * @param the result type + * @return the promise + */ + static Promise succeededPromise(T result) { + return factory.succeededPromise(result); + } + + /** + * Create a failed promise with the specified failure {@code cause}. + * + * @param cause the failure cause as a Throwable + * @param the result type + * @return the promise + */ + static Promise failedPromise(Throwable cause) { + return factory.failedPromise(cause); + } + + /** + * Create a failed promise with the specified {@code failureMessage}. + * + * @param failureMessage the failure message + * @param the result type + * @return the promise + */ + static Promise failedPromise(String failureMessage) { + return factory.failurePromise(failureMessage); + } + + /** + * Create a promise that hasn't completed yet + * + * @param the result type + * @return the promise + */ + static Promise promise() { + return factory.promise(); + } + + /** + * Succeed or fail this promise with the {@link AsyncResult} event. + * + * @param asyncResult the async result to handle + */ + @GenIgnore + @Override + void handle(AsyncResult asyncResult); + + /** + * Set the result. Any handler will be called, if there is one, and the promise will be marked as completed. + *

+ * Any handler set on the associated promise will be called. + * + * @param result the result + * @throws IllegalStateException when the promise is already completed + */ + void complete(T result); + + /** + * Calls {@code complete(null)} + * + * @throws IllegalStateException when the promise is already completed + */ + void complete(); + + /** + * Set the failure. Any handler will be called, if there is one, and the future will be marked as completed. + * + * @param cause the failure cause + * @throws IllegalStateException when the promise is already completed + */ + void fail(Throwable cause); + + /** + * Calls {@link #fail(Throwable)} with the {@code message}. + * + * @param message the failure message + * @throws IllegalStateException when the promise is already completed + */ + void fail(String message); + + /** + * Like {@link #complete(Object)} but returns {@code false} when the promise is already completed instead of throwing + * an {@link IllegalStateException}, it returns {@code true} otherwise. + * + * @param result the result + * @return {@code false} when the future is already completed + */ + boolean tryComplete(T result); + + /** + * Calls {@code tryComplete(null)}. + * + * @return {@code false} when the future is already completed + */ + boolean tryComplete(); + + /** + * Like {@link #fail(Throwable)} but returns {@code false} when the promise is already completed instead of throwing + * an {@link IllegalStateException}, it returns {@code true} otherwise. + * + * @param cause the failure cause + * @return {@code false} when the future is already completed + */ + boolean tryFail(Throwable cause); + + /** + * Calls {@link #fail(Throwable)} with the {@code message}. + * + * @param message the failure message + * @return false when the future is already completed + */ + boolean tryFail(String message); + + /** + * @return the {@link Future} associated with this promise, it can be used to be aware of the promise completion + */ + @CacheReturn + Future future(); + +} diff --git a/src/main/java/io/vertx/core/Verticle.java b/src/main/java/io/vertx/core/Verticle.java index 52cc32c01..8148704d9 100644 --- a/src/main/java/io/vertx/core/Verticle.java +++ b/src/main/java/io/vertx/core/Verticle.java @@ -26,6 +26,7 @@ package io.vertx.core; * * @author Tim Fox */ +@SuppressWarnings( "deprecation" ) public interface Verticle { /** @@ -50,22 +51,22 @@ public interface Verticle { *

* Vert.x calls this method when deploying the instance. You do not call it yourself. *

- * A future is passed into the method, and when deployment is complete the verticle should either call - * {@link io.vertx.core.Future#complete} or {@link io.vertx.core.Future#fail} the future. + * A promise is passed into the method, and when deployment is complete the verticle should either call + * {@link io.vertx.core.Promise#complete} or {@link io.vertx.core.Promise#fail} the future. * - * @param startFuture the future + * @param startPromise the future */ - void start(Future startFuture) throws Exception; + void start(Promise startPromise) throws Exception; /** * Stop the verticle instance. *

* Vert.x calls this method when un-deploying the instance. You do not call it yourself. *

- * A future is passed into the method, and when un-deployment is complete the verticle should either call - * {@link io.vertx.core.Future#complete} or {@link io.vertx.core.Future#fail} the future. + * A promise is passed into the method, and when un-deployment is complete the verticle should either call + * {@link io.vertx.core.Promise#complete} or {@link io.vertx.core.Promise#fail} the future. * - * @param stopFuture the future + * @param stopPromise the future */ - void stop(Future stopFuture) throws Exception; + void stop(Promise stopPromise) throws Exception; } diff --git a/src/main/java/io/vertx/core/Vertx.java b/src/main/java/io/vertx/core/Vertx.java index c1329f975..720ee13f3 100644 --- a/src/main/java/io/vertx/core/Vertx.java +++ b/src/main/java/io/vertx/core/Vertx.java @@ -503,7 +503,7 @@ public interface Vertx extends Measured { * (e.g. on the original event loop of the caller). *

* A {@code Future} instance is passed into {@code blockingCodeHandler}. When the blocking code successfully completes, - * the handler should call the {@link Future#complete} or {@link Future#complete(Object)} method, or the {@link Future#fail} + * the handler should call the {@link Promise#complete} or {@link Promise#complete(Object)} method, or the {@link Promise#fail} * method if it failed. *

* In the {@code blockingCodeHandler} the current context remains the original context and therefore any task @@ -525,12 +525,12 @@ public interface Vertx extends Measured { * guarantees * @param the type of the result */ - void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler); + void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler); /** * Like {@link #executeBlocking(Handler, boolean, Handler)} called with ordered = true. */ - void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler); + void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler); /** * Return the Netty EventLoopGroup used by Vert.x diff --git a/src/main/java/io/vertx/core/WorkerExecutor.java b/src/main/java/io/vertx/core/WorkerExecutor.java index ad3f5eb9e..a289dcc47 100644 --- a/src/main/java/io/vertx/core/WorkerExecutor.java +++ b/src/main/java/io/vertx/core/WorkerExecutor.java @@ -35,7 +35,7 @@ public interface WorkerExecutor extends Measured { * (i.e. on the original event loop of the caller). *

* A {@code Future} instance is passed into {@code blockingCodeHandler}. When the blocking code successfully completes, - * the handler should call the {@link Future#complete} or {@link Future#complete(Object)} method, or the {@link Future#fail} + * the handler should call the {@link Promise#complete} or {@link Promise#complete(Object)} method, or the {@link Promise#fail} * method if it failed. *

* In the {@code blockingCodeHandler} the current context remains the original context and therefore any task @@ -48,12 +48,12 @@ public interface WorkerExecutor extends Measured { * guarantees * @param the type of the result */ - void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler); + void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler); /** * Like {@link #executeBlocking(Handler, boolean, Handler)} called with ordered = true. */ - default void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler) { + default void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler) { executeBlocking(blockingCodeHandler, true, resultHandler); } diff --git a/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java b/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java index d88f6d259..70b91ae7a 100644 --- a/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java +++ b/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java @@ -236,17 +236,19 @@ public final class DnsClientImpl implements DnsClient { private class Query { final DatagramDnsQuery msg; - final Future> fut; + final Promise> promise; final String name; final DnsRecordType[] types; long timerID; public Query(String name, DnsRecordType[] types, Handler>> handler) { + Promise> promise = Promise.promise(); + promise.future().setHandler(handler); this.msg = new DatagramDnsQuery(null, dnsServer, ThreadLocalRandom.current().nextInt()).setRecursionDesired(options.isRecursionDesired()); for (DnsRecordType type: types) { msg.addRecord(DnsSection.QUESTION, new DefaultDnsQuestion(name, type, DnsRecord.CLASS_IN)); } - this.fut = Future.>future().setHandler(handler); + this.promise = promise; this.types = types; this.name = name; } @@ -256,7 +258,7 @@ public final class DnsClientImpl implements DnsClient { if (timerID >= 0) { vertx.cancelTimer(timerID); } - fut.tryFail(cause); + promise.tryFail(cause); } void handle(DnsResponse msg) { @@ -279,7 +281,7 @@ public final class DnsClientImpl implements DnsClient { Collections.sort((List) records); } actualCtx.executeFromIO(v -> { - fut.tryComplete(records); + promise.tryComplete(records); }); } else { actualCtx.executeFromIO(new DnsException(code), this::fail); diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index a08ef56bc..26de71507 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -17,6 +17,7 @@ import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.*; import io.vertx.core.impl.ContextInternal; @@ -460,7 +461,7 @@ public class EventBusImpl implements EventBus, MetricsProvider { message.setReplyAddress(replyAddress); HandlerRegistration registration = new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, src); ReplyHandler handler = new ReplyHandler<>(registration, timeout); - handler.result.setHandler(replyHandler); + handler.result.future().setHandler(replyHandler); registration.handler(handler); return handler; } else { @@ -470,13 +471,13 @@ public class EventBusImpl implements EventBus, MetricsProvider { public class ReplyHandler implements Handler> { - final Future> result; + final Promise> result; final HandlerRegistration registration; final long timeoutID; public Object trace; ReplyHandler(HandlerRegistration registration, long timeout) { - this.result = Future.future(); + this.result = Promise.promise(); this.registration = registration; this.timeoutID = vertx.setTimer(timeout, id -> { fail(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + registration.address + ", repliedAddress: " + registration.repliedAddress)); diff --git a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java index d5ba3130c..bcf7ed8e4 100644 --- a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java +++ b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java @@ -11,6 +11,22 @@ package io.vertx.core.file.impl; +import io.netty.buffer.ByteBuf; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.file.AsyncFile; +import io.vertx.core.file.FileSystemException; +import io.vertx.core.file.OpenOptions; +import io.vertx.core.impl.Arguments; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +import io.vertx.core.streams.impl.InboundBuffer; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; @@ -25,21 +41,6 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.buffer.ByteBuf; -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.file.AsyncFile; -import io.vertx.core.file.FileSystemException; -import io.vertx.core.file.OpenOptions; -import io.vertx.core.impl.Arguments; -import io.vertx.core.impl.ContextInternal; -import io.vertx.core.impl.VertxInternal; -import io.vertx.core.impl.logging.Logger; -import io.vertx.core.impl.logging.LoggerFactory; -import io.vertx.core.streams.impl.InboundBuffer; - /** * * This class is optimised for performance when used on the same event loop that is was passed to the handler with. @@ -398,7 +399,7 @@ public class AsyncFileImpl implements AsyncFile { private synchronized void doFlush(Handler> handler) { checkClosed(); - context.executeBlockingInternal((Future fut) -> { + context.executeBlockingInternal((Promise fut) -> { try { ch.force(false); fut.complete(); diff --git a/src/main/java/io/vertx/core/file/impl/FileSystemImpl.java b/src/main/java/io/vertx/core/file/impl/FileSystemImpl.java index b51fc253c..622166ae2 100644 --- a/src/main/java/io/vertx/core/file/impl/FileSystemImpl.java +++ b/src/main/java/io/vertx/core/file/impl/FileSystemImpl.java @@ -12,8 +12,8 @@ package io.vertx.core.file.impl; import io.vertx.core.AsyncResult; -import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.AsyncFile; import io.vertx.core.file.CopyOptions; @@ -951,7 +951,7 @@ public class FileSystemImpl implements FileSystem { }; } - protected abstract class BlockingAction implements Handler> { + protected abstract class BlockingAction implements Handler> { private final Handler> handler; protected final ContextInternal context; @@ -968,7 +968,7 @@ public class FileSystemImpl implements FileSystem { } @Override - public void handle(Future fut) { + public void handle(Promise fut) { try { T result = perform(); fut.complete(result); diff --git a/src/main/java/io/vertx/core/http/ServerWebSocket.java b/src/main/java/io/vertx/core/http/ServerWebSocket.java index a7dfa96d9..84ec3da44 100644 --- a/src/main/java/io/vertx/core/http/ServerWebSocket.java +++ b/src/main/java/io/vertx/core/http/ServerWebSocket.java @@ -19,6 +19,7 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.streams.ReadStream; @@ -165,7 +166,7 @@ public interface ServerWebSocket extends WebSocketBase { * * The provided future might be completed by the WebSocket itself, e.g calling the {@link #close()} method * will try to accept the handshake and close the WebSocket afterward. Thus it is advised to try to complete - * the {@code future} with {@link Future#tryComplete} or {@link Future#tryFail}. + * the {@code future} with {@link Promise#tryComplete} or {@link Promise#tryFail}. *

* This method should be called from the WebSocket handler to explicitly set an asynchronous handshake. *

@@ -174,6 +175,13 @@ public interface ServerWebSocket extends WebSocketBase { * @param future the future to complete with * @throws IllegalStateException when the WebSocket has already an asynchronous result */ + void setHandshake(Promise future); + + /** + * @deprecated instead use {@link #setHandshake(Promise)} + */ + @GenIgnore + @Deprecated void setHandshake(Future future); /** diff --git a/src/main/java/io/vertx/core/http/impl/FileStreamChannel.java b/src/main/java/io/vertx/core/http/impl/FileStreamChannel.java index 26240fddf..aa4e545d5 100644 --- a/src/main/java/io/vertx/core/http/impl/FileStreamChannel.java +++ b/src/main/java/io/vertx/core/http/impl/FileStreamChannel.java @@ -34,6 +34,7 @@ import io.netty.handler.stream.ChunkedWriteHandler; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import java.io.RandomAccessFile; import java.net.SocketAddress; @@ -56,7 +57,7 @@ class FileStreamChannel extends AbstractChannel { private final VertxHttp2Stream stream; FileStreamChannel( - Future result, + Promise result, VertxHttp2Stream stream, long offset, long length) { diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index b5e41734c..8b5e77256 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -48,7 +48,6 @@ import java.net.URI; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.stream.Collectors; import static io.vertx.core.http.HttpHeaders.*; @@ -194,7 +193,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme private final int id; private final Http1xClientConnection conn; private final ContextInternal context; - private final Future fut; + private final Promise promise; private final InboundBuffer queue; private HttpClientRequestImpl request; private HttpClientResponseImpl response; @@ -208,9 +207,11 @@ class Http1xClientConnection extends Http1xConnectionBase impleme StreamImpl(ContextInternal context, Http1xClientConnection conn, int id, Handler> handler) { this.context = context; this.conn = conn; - this.fut = Future.future().setHandler(handler); + this.promise = Promise.promise(); this.id = id; this.queue = new InboundBuffer<>(context, 5); + + promise.future().setHandler(handler); } private void append(StreamImpl s) { @@ -517,12 +518,12 @@ class Http1xClientConnection extends Http1xConnectionBase impleme void handleException(Throwable cause) { HttpClientRequestImpl request; HttpClientResponseImpl response; - Future fut; + Promise promise; boolean requestEnded; synchronized (conn) { request = this.request; response = this.response; - fut = this.fut; + promise = this.promise; requestEnded = this.requestEnded; } if (request != null) { @@ -535,7 +536,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme response.handleException(cause); } } else { - fut.tryFail(cause); + promise.tryFail(cause); } } } @@ -663,7 +664,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme checkLifecycle(); } if (next != null) { - next.fut.complete(next); + next.promise.complete(next); } } @@ -848,7 +849,7 @@ class Http1xClientConnection extends Http1xConnectionBase impleme } requestInProgress = stream; } - stream.context.dispatch(Future.succeededFuture(stream), stream.fut); + stream.context.dispatch(Future.succeededFuture(stream), stream.promise); } private void recycle() { diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java index 63a9409d1..9a61324ca 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java @@ -24,6 +24,7 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.*; import io.vertx.core.impl.ContextInternal; @@ -204,7 +205,7 @@ public class Http2ServerConnection extends Http2ConnectionBase { private final String uri; private final String contentEncoding; private Http2ServerResponseImpl response; - private final Future completionHandler; + private final Promise completionHandler; public Push(Http2Stream stream, ContextInternal context, @@ -214,10 +215,12 @@ public class Http2ServerConnection extends Http2ConnectionBase { boolean writable, Handler> completionHandler) { super(Http2ServerConnection.this, context, stream, writable); + Promise promise = Promise.promise(); + promise.future().setHandler(completionHandler); this.method = method; this.uri = uri; this.contentEncoding = contentEncoding; - this.completionHandler = Future.future().setHandler(completionHandler); + this.completionHandler = promise; } @Override diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java b/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java index 945c7b4b8..8956ee0ed 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java @@ -26,6 +26,7 @@ import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpMethod; @@ -614,8 +615,8 @@ public class Http2ServerResponseImpl implements HttpServerResponse { } checkSendHeaders(false); - Future result = Future.future(); - result.setHandler(ar -> { + Promise result = Promise.promise(); + result.future().setHandler(ar -> { if (ar.succeeded()) { bytesWritten += ar.result(); end(); diff --git a/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java b/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java index 0ed0d1a00..ef30ef384 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java +++ b/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java @@ -20,6 +20,7 @@ import io.netty.handler.timeout.IdleStateHandler; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpVersion; import io.vertx.core.http.impl.pool.ConnectResult; @@ -93,18 +94,19 @@ class HttpChannelConnector implements ConnectionProvider { @Override public void connect(ConnectionListener listener, ContextInternal context, Handler>> handler) { - Future> future = Future.>future().setHandler(handler); + Promise> promise = Promise.promise(); + promise.future().setHandler(handler); try { - doConnect(listener, context, future); + doConnect(listener, context, promise); } catch(Exception e) { - future.tryFail(e); + promise.tryFail(e); } } private void doConnect( ConnectionListener listener, ContextInternal context, - Future> future) { + Promise> future) { boolean domainSocket = server.path() != null; boolean useAlpn = options.isUseAlpn(); @@ -200,7 +202,7 @@ class HttpChannelConnector implements ConnectionProvider { boolean ssl, ContextInternal context, Channel ch, long weight, - Future> future) { + Promise> future) { boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade(); VertxHandler clientHandler = VertxHandler.create(context, chctx -> { Http1xClientConnection conn = new Http1xClientConnection(listener, upgrade ? HttpVersion.HTTP_1_1 : version, client, endpointMetric, chctx, ssl, server, context, metrics); @@ -229,7 +231,7 @@ class HttpChannelConnector implements ConnectionProvider { private void http2Connected(ConnectionListener listener, ContextInternal context, Channel ch, - Future> future) { + Promise> future) { try { VertxHttp2ConnectionHandler clientHandler = Http2ClientConnection.createHttp2ConnectionHandler(client, endpointMetric, listener, context, null, (conn, concurrency) -> { future.complete(new ConnectResult<>(conn, concurrency, http2Weight)); @@ -241,7 +243,7 @@ class HttpChannelConnector implements ConnectionProvider { } } - private void connectFailed(Channel ch, ConnectionListener listener, Throwable t, Future> future) { + private void connectFailed(Channel ch, ConnectionListener listener, Throwable t, Promise> future) { if (ch != null) { try { ch.close(); diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java index b338a6024..15c1831ea 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java @@ -53,7 +53,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http static final Logger log = LoggerFactory.getLogger(HttpClientRequestImpl.class); private final VertxInternal vertx; - private final Future respFut; + private final Promise responsePromise; private boolean chunked; private String hostHeader; private String rawMethod; @@ -84,7 +84,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http this.chunked = false; this.vertx = vertx; this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY; - this.respFut = Future.future(); + this.responsePromise = Promise.promise(); } @Override @@ -100,7 +100,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } } handler.handle(t); - respFut.tryFail(t); + responsePromise.tryFail(t); } @Override @@ -119,7 +119,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http if (handler != null) { checkComplete(); } - respFut.setHandler(handler); + responsePromise.future().setHandler(handler); return this; } @@ -388,7 +388,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } private void handleNextRequest(HttpClientRequestImpl next, long timeoutMs) { - next.handler(respFut.getHandler()); + next.handler(responsePromise.future().getHandler()); next.exceptionHandler(exceptionHandler()); exceptionHandler(null); next.pushHandler = pushHandler; @@ -400,8 +400,9 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http if (headers != null && next.headers == null) { next.headers().addAll(headers); } - Future fut = Future.future(); - fut.setHandler(ar -> { + Promise promise = Promise.promise(); + Future future = promise.future(); + future.setHandler(ar -> { if (ar.succeeded()) { if (timeoutMs > 0) { next.setTimeout(timeoutMs); @@ -412,19 +413,19 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } }); if (exceptionOccurred != null) { - fut.fail(exceptionOccurred); + promise.fail(exceptionOccurred); } else if (completed) { - fut.complete(); + promise.complete(); } else { exceptionHandler(err -> { - if (!fut.isComplete()) { - fut.fail(err); + if (!future.isComplete()) { + promise.fail(err); } }); completionHandler = v -> { - if (!fut.isComplete()) { - fut.complete(); + if (!future.isComplete()) { + promise.complete(); } }; } @@ -451,7 +452,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http continueHandler.handle(null); } } else { - respFut.complete(resp); + responsePromise.complete(resp); } } } @@ -736,7 +737,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http } private void checkResponseHandler() { - if (stream == null && !connecting && respFut.getHandler() == null) { + if (stream == null && !connecting && responsePromise.future().getHandler() == null) { throw new IllegalStateException("You must set a response handler before connecting to the server"); } } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java index 183f3327a..5bbee5d3a 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java @@ -14,9 +14,9 @@ package io.vertx.core.http.impl; import io.netty.handler.codec.http2.Http2Stream; import io.vertx.codegen.annotations.Nullable; import io.vertx.core.AsyncResult; -import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.*; import io.vertx.core.net.SocketAddress; @@ -30,7 +30,7 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase { private final Http2ClientConnection.Http2ClientStream stream; private final String rawMethod; private final MultiMap headers; - private final Future respHandler; + private final Promise responsePromise; public HttpClientRequestPushPromise( Http2ClientConnection conn, @@ -48,7 +48,7 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase { this.stream = new Http2ClientConnection.Http2ClientStream(conn, conn.getContext(), this, stream, false); this.rawMethod = rawMethod; this.headers = headers; - this.respHandler = Future.future(); + this.responsePromise = Promise.promise(); } Http2ClientConnection.Http2ClientStream getStream() { @@ -57,7 +57,7 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase { @Override protected void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs) { - respHandler.complete(resp); + responsePromise.complete(resp); } @Override @@ -71,7 +71,7 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase { @Override public synchronized HttpClientRequest handler(Handler> handler) { - respHandler.setHandler(handler); + responsePromise.future().setHandler(handler); return this; } diff --git a/src/main/java/io/vertx/core/http/impl/ServerWebSocketImpl.java b/src/main/java/io/vertx/core/http/impl/ServerWebSocketImpl.java index 4d58250bb..2be84bd18 100644 --- a/src/main/java/io/vertx/core/http/impl/ServerWebSocketImpl.java +++ b/src/main/java/io/vertx/core/http/impl/ServerWebSocketImpl.java @@ -20,10 +20,10 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.http.ServerWebSocket; import io.vertx.core.http.WebSocketFrame; import io.vertx.core.impl.ContextInternal; -import io.vertx.core.impl.VertxInternal; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; @@ -54,7 +54,7 @@ public class ServerWebSocketImpl extends WebSocketImplBase private final MultiMap headers; private HttpServerRequestImpl request; private Integer status; - private Future handshakeFuture; + private Promise handshakePromise; ServerWebSocketImpl(ContextInternal context, Http1xServerConnection conn, @@ -130,10 +130,10 @@ public class ServerWebSocketImpl extends WebSocketImplBase synchronized (conn) { checkClosed(); if (status == null) { - if (handshakeFuture == null) { + if (handshakePromise == null) { tryHandshake(101); } else { - handshakeFuture.tryComplete(101); + handshakePromise.tryComplete(101); } } } @@ -195,8 +195,8 @@ public class ServerWebSocketImpl extends WebSocketImplBase Boolean tryHandshake(int sc) { synchronized (conn) { - if (status == null && handshakeFuture == null) { - setHandshake(Future.succeededFuture(sc)); + if (status == null && handshakePromise == null) { + setHandshake(Promise.succeededPromise(sc)); } return status == null ? null : status == sc; } @@ -204,16 +204,21 @@ public class ServerWebSocketImpl extends WebSocketImplBase @Override public void setHandshake(Future future) { - if (future == null) { + setHandshake((Promise) future); + } + + @Override + public void setHandshake(Promise promise) { + if (promise == null) { throw new NullPointerException(); } synchronized (conn) { - if (handshakeFuture != null) { + if (handshakePromise != null) { throw new IllegalStateException(); } - handshakeFuture = future; + handshakePromise = promise; } - future.setHandler(ar -> { + promise.future().setHandler(ar -> { if (ar.succeeded()) { handleHandshake(ar.result()); } else { diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java index 5bbff21bb..37c3c9785 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2NetSocket.java @@ -21,6 +21,7 @@ import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.StreamPriority; import io.vertx.core.http.StreamResetException; @@ -269,8 +270,8 @@ class VertxHttp2NetSocket extends VertxHttp2Strea long contentLength = Math.min(length, file.length() - offset); - Future result = Future.future(); - result.setHandler(ar -> { + Promise result = Promise.promise(); + result.future().setHandler(ar -> { if (resultHandler != null) { resultCtx.runOnContext(v -> { resultHandler.handle(Future.succeededFuture()); diff --git a/src/main/java/io/vertx/core/impl/AbstractContext.java b/src/main/java/io/vertx/core/impl/AbstractContext.java index 0435853b5..edf19abe5 100644 --- a/src/main/java/io/vertx/core/impl/AbstractContext.java +++ b/src/main/java/io/vertx/core/impl/AbstractContext.java @@ -11,8 +11,8 @@ package io.vertx.core.impl; import io.vertx.core.AsyncResult; -import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Starter; import io.vertx.core.impl.launcher.VertxCommandLauncher; @@ -108,7 +108,7 @@ abstract class AbstractContext implements ContextInternal { } @Override - public final void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler) { + public final void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler) { executeBlocking(blockingCodeHandler, true, resultHandler); } diff --git a/src/main/java/io/vertx/core/impl/CloseHooks.java b/src/main/java/io/vertx/core/impl/CloseHooks.java index 5dcf329b4..401bb8ed3 100644 --- a/src/main/java/io/vertx/core/impl/CloseHooks.java +++ b/src/main/java/io/vertx/core/impl/CloseHooks.java @@ -15,6 +15,7 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Closeable; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.impl.logging.Logger; import java.util.HashSet; @@ -86,8 +87,8 @@ class CloseHooks { AtomicInteger count = new AtomicInteger(); AtomicBoolean failed = new AtomicBoolean(); for (Closeable hook : copy) { - Future a = Future.future(); - a.setHandler(ar -> { + Promise promise = Promise.promise(); + promise.future().setHandler(ar -> { if (ar.failed()) { if (failed.compareAndSet(false, true)) { // Only report one failure @@ -101,10 +102,10 @@ class CloseHooks { } }); try { - hook.close(a); + hook.close(promise); } catch (Throwable t) { log.warn("Failed to run close hooks", t); - a.tryFail(t); + promise.tryFail(t); } } } else { diff --git a/src/main/java/io/vertx/core/impl/CompositeFutureImpl.java b/src/main/java/io/vertx/core/impl/CompositeFutureImpl.java index 4aaa30068..b3a1490b1 100644 --- a/src/main/java/io/vertx/core/impl/CompositeFutureImpl.java +++ b/src/main/java/io/vertx/core/impl/CompositeFutureImpl.java @@ -21,7 +21,7 @@ import java.util.function.Function; /** * @author Julien Viet */ -public class CompositeFutureImpl implements CompositeFuture, Handler> { +public class CompositeFutureImpl implements CompositeFuture { private static final Handler> NO_HANDLER = c -> {}; @@ -216,66 +216,6 @@ public class CompositeFutureImpl implements CompositeFuture, Handler> handler = setCompleted(null); - if (handler != null) { - handler.handle(this); - return true; - } else { - return false; - } - } - - @Override - public boolean tryComplete() { - return tryComplete(this); - } - - @Override - public boolean tryFail(Throwable cause) { - Handler> handler = setCompleted(cause); - if (handler != null) { - handler.handle(this); - return true; - } else { - return false; - } - } - - @Override - public boolean tryFail(String failureMessage) { - return tryFail(new NoStackTraceThrowable(failureMessage)); - } - private Handler> setCompleted(Throwable cause) { synchronized (this) { if (completed) { @@ -286,13 +226,4 @@ public class CompositeFutureImpl implements CompositeFuture, Handler asyncResult) { - if (asyncResult.succeeded()) { - complete(this); - } else { - fail(asyncResult.cause()); - } - } } diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index 913c30553..d3e0893ab 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -18,6 +18,7 @@ import io.vertx.core.Closeable; import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.core.impl.logging.Logger; @@ -143,21 +144,21 @@ abstract class ContextImpl extends AbstractContext { } @Override - public void executeBlockingInternal(Handler> action, Handler> resultHandler) { + public void executeBlockingInternal(Handler> action, Handler> resultHandler) { executeBlocking(this, action, resultHandler, internalBlockingPool, internalOrderedTasks); } @Override - public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler) { + public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler) { executeBlocking(this, blockingCodeHandler, resultHandler, workerPool, ordered ? orderedTasks : null); } @Override - public void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler) { + public void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler) { executeBlocking(this, blockingCodeHandler, resultHandler, workerPool, queue); } - static void executeBlocking(ContextInternal context, Handler> blockingCodeHandler, + static void executeBlocking(ContextInternal context, Handler> blockingCodeHandler, Handler> resultHandler, WorkerPool workerPool, TaskQueue queue) { PoolMetrics metrics = workerPool.metrics(); @@ -168,7 +169,8 @@ abstract class ContextImpl extends AbstractContext { if (metrics != null) { execMetric = metrics.begin(queueMetric); } - Future res = Future.future(); + Promise res = Promise.promise(); + Future fut = res.future(); context.dispatch(res, f -> { try { blockingCodeHandler.handle(res); @@ -177,10 +179,10 @@ abstract class ContextImpl extends AbstractContext { } }); if (metrics != null) { - metrics.end(execMetric, res.succeeded()); + metrics.end(execMetric, fut.succeeded()); } if (resultHandler != null) { - res.setHandler(ar -> context.runOnContext(v -> resultHandler.handle(ar))); + fut.setHandler(ar -> context.runOnContext(v -> resultHandler.handle(ar))); } }; Executor exec = workerPool.executor(); @@ -276,17 +278,17 @@ abstract class ContextImpl extends AbstractContext { return delegate.tracer(); } - public final void executeBlockingInternal(Handler> action, Handler> resultHandler) { + public final void executeBlockingInternal(Handler> action, Handler> resultHandler) { ContextImpl.executeBlocking(this, action, resultHandler, delegate.internalBlockingPool, delegate.internalOrderedTasks); } @Override - public final void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler) { + public final void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler) { ContextImpl.executeBlocking(this, blockingCodeHandler, resultHandler, delegate.workerPool, ordered ? delegate.orderedTasks : null); } @Override - public final void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler) { + public final void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler) { ContextImpl.executeBlocking(this, blockingCodeHandler, resultHandler, delegate.workerPool, queue); } diff --git a/src/main/java/io/vertx/core/impl/ContextInternal.java b/src/main/java/io/vertx/core/impl/ContextInternal.java index 031357007..b85583d70 100644 --- a/src/main/java/io/vertx/core/impl/ContextInternal.java +++ b/src/main/java/io/vertx/core/impl/ContextInternal.java @@ -14,8 +14,8 @@ package io.vertx.core.impl; import io.netty.channel.EventLoop; import io.vertx.core.AsyncResult; import io.vertx.core.Context; -import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.spi.tracing.VertxTracer; @@ -42,12 +42,12 @@ public interface ContextInternal extends Context { * Like {@link #executeBlocking(Handler, boolean, Handler)} but uses the {@code queue} to order the tasks instead * of the internal queue of this context. */ - void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler); + void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler); /** * Execute an internal task on the internal blocking ordered executor. */ - void executeBlockingInternal(Handler> action, Handler> resultHandler); + void executeBlockingInternal(Handler> action, Handler> resultHandler); /** * @return the deployment associated with this context or {@code null} diff --git a/src/main/java/io/vertx/core/impl/DeploymentManager.java b/src/main/java/io/vertx/core/impl/DeploymentManager.java index 0230eef6c..72e06e725 100644 --- a/src/main/java/io/vertx/core/impl/DeploymentManager.java +++ b/src/main/java/io/vertx/core/impl/DeploymentManager.java @@ -16,6 +16,7 @@ import io.vertx.core.Context; import io.vertx.core.DeploymentOptions; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.ServiceHelper; import io.vertx.core.Verticle; import io.vertx.core.json.JsonObject; @@ -155,21 +156,21 @@ public class DeploymentManager { Handler> completionHandler) { if (iter.hasNext()) { VerticleFactory verticleFactory = iter.next(); - Future fut = Future.future(); + Promise promise = Promise.promise(); if (verticleFactory.requiresResolve()) { try { - verticleFactory.resolve(identifier, options, cl, fut); + verticleFactory.resolve(identifier, options, cl, promise); } catch (Exception e) { try { - fut.fail(e); + promise.fail(e); } catch (Exception ignore) { // Too late } } } else { - fut.complete(identifier); + promise.complete(identifier); } - fut.setHandler(ar -> { + promise.future().setHandler(ar -> { Throwable err; if (ar.succeeded()) { String resolvedName = ar.result(); @@ -482,8 +483,9 @@ public class DeploymentManager { context.runOnContext(v -> { try { verticle.init(vertx, context); - Future startFuture = Future.future(); - verticle.start(startFuture); + Promise startPromise = Promise.promise(); + Future startFuture = startPromise.future(); + verticle.start(startPromise); startFuture.setHandler(ar -> { if (ar.succeeded()) { if (parent != null) { @@ -623,7 +625,8 @@ public class DeploymentManager { for (VerticleHolder verticleHolder: verticles) { ContextImpl context = verticleHolder.context; context.runOnContext(v -> { - Future stopFuture = Future.future(); + Promise stopPromise = Promise.promise(); + Future stopFuture = stopPromise.future(); AtomicBoolean failureReported = new AtomicBoolean(); stopFuture.setHandler(ar -> { deployments.remove(deploymentID); @@ -645,9 +648,9 @@ public class DeploymentManager { }); }); try { - verticleHolder.verticle.stop(stopFuture); + verticleHolder.verticle.stop(stopPromise); } catch (Throwable t) { - stopFuture.fail(t); + stopPromise.fail(t); } }); } diff --git a/src/main/java/io/vertx/core/impl/FailedFuture.java b/src/main/java/io/vertx/core/impl/FailedFuture.java index 2a57745ec..71f21a68f 100644 --- a/src/main/java/io/vertx/core/impl/FailedFuture.java +++ b/src/main/java/io/vertx/core/impl/FailedFuture.java @@ -14,11 +14,12 @@ package io.vertx.core.impl; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; /** * @author Julien Viet */ -public class FailedFuture implements Future { +public class FailedFuture implements Future, Promise { private final Throwable cause; @@ -119,6 +120,11 @@ public class FailedFuture implements Future { throw new IllegalStateException("Result is already complete: failed"); } + @Override + public Future future() { + return this; + } + @Override public String toString() { return "Future{cause=" + cause.getMessage() + "}"; diff --git a/src/main/java/io/vertx/core/impl/FutureFactoryImpl.java b/src/main/java/io/vertx/core/impl/FutureFactoryImpl.java index 34f6fc77f..3342c7687 100644 --- a/src/main/java/io/vertx/core/impl/FutureFactoryImpl.java +++ b/src/main/java/io/vertx/core/impl/FutureFactoryImpl.java @@ -12,6 +12,7 @@ package io.vertx.core.impl; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.spi.FutureFactory; /** @@ -21,6 +22,33 @@ public class FutureFactoryImpl implements FutureFactory { private static final SucceededFuture EMPTY = new SucceededFuture<>(null); + @Override + public Promise promise() { + return new FutureImpl<>(); + } + + @Override + public Promise succeededPromise() { + @SuppressWarnings("unchecked") + Promise promise = EMPTY; + return promise; + } + + @Override + public Promise succeededPromise(T result) { + return new SucceededFuture<>(result); + } + + @Override + public Promise failedPromise(Throwable t) { + return new FailedFuture<>(t); + } + + @Override + public Promise failurePromise(String failureMessage) { + return new FailedFuture<>(failureMessage); + } + @Override public Future future() { return new FutureImpl<>(); diff --git a/src/main/java/io/vertx/core/impl/FutureImpl.java b/src/main/java/io/vertx/core/impl/FutureImpl.java index 4906b9e6e..cb704d1f8 100644 --- a/src/main/java/io/vertx/core/impl/FutureImpl.java +++ b/src/main/java/io/vertx/core/impl/FutureImpl.java @@ -14,8 +14,9 @@ package io.vertx.core.impl; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; -class FutureImpl implements Future { +class FutureImpl implements Promise, Future { private boolean failed; private boolean succeeded; @@ -177,6 +178,11 @@ class FutureImpl implements Future { return tryFail(new NoStackTraceThrowable(failureMessage)); } + @Override + public Future future() { + return this; + } + @Override public String toString() { synchronized (this) { diff --git a/src/main/java/io/vertx/core/impl/SucceededFuture.java b/src/main/java/io/vertx/core/impl/SucceededFuture.java index 17d66b548..f6ca995c2 100644 --- a/src/main/java/io/vertx/core/impl/SucceededFuture.java +++ b/src/main/java/io/vertx/core/impl/SucceededFuture.java @@ -14,11 +14,12 @@ package io.vertx.core.impl; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; /** * @author Julien Viet */ -class SucceededFuture implements Future { +class SucceededFuture implements Future, Promise { private final T result; @@ -111,6 +112,11 @@ class SucceededFuture implements Future { throw new IllegalStateException("Result is already complete: succeeded"); } + @Override + public Future future() { + return this; + } + @Override public String toString() { return "Future{result=" + result + "}"; diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java index 4f0a3b7d3..20f693aed 100644 --- a/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -555,16 +555,16 @@ public class VertxImpl implements VertxInternal, MetricsProvider { closeHooks.run(ar -> { deploymentManager.undeployAll(ar1 -> { HAManager haManager = haManager(); - Future haFuture = Future.future(); + Promise haPromise = Promise.promise(); if (haManager != null) { this.executeBlocking(fut -> { haManager.stop(); fut.complete(); - }, false, haFuture); + }, false, haPromise); } else { - haFuture.complete(); + haPromise.complete(); } - haFuture.setHandler(ar2 -> { + haPromise.future().setHandler(ar2 -> { addressResolver.close(ar3 -> { eventBus.close(ar4 -> { closeClusterManager(ar5 -> { @@ -707,7 +707,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider { @Override public void undeploy(String deploymentID, Handler> completionHandler) { HAManager haManager = haManager(); - Future haFuture = Future.future(); + Promise haFuture = Promise.promise(); if (haManager != null && haManager.isEnabled()) { this.executeBlocking(fut -> { haManager.removeFromHA(deploymentID); @@ -716,10 +716,10 @@ public class VertxImpl implements VertxInternal, MetricsProvider { } else { haFuture.complete(); } - haFuture.compose(v -> { - Future deploymentFuture = Future.future(); + haFuture.future().compose(v -> { + Promise deploymentFuture = Promise.promise(); deploymentManager.undeployVerticle(deploymentID, deploymentFuture); - return deploymentFuture; + return deploymentFuture.future(); }).setHandler(completionHandler); } @@ -744,21 +744,21 @@ public class VertxImpl implements VertxInternal, MetricsProvider { } @Override - public void executeBlockingInternal(Handler> blockingCodeHandler, Handler> resultHandler) { + public void executeBlockingInternal(Handler> blockingCodeHandler, Handler> resultHandler) { ContextInternal context = getOrCreateContext(); context.executeBlockingInternal(blockingCodeHandler, resultHandler); } @Override - public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, + public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> asyncResultHandler) { ContextInternal context = getOrCreateContext(); context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler); } @Override - public void executeBlocking(Handler> blockingCodeHandler, + public void executeBlocking(Handler> blockingCodeHandler, Handler> asyncResultHandler) { executeBlocking(blockingCodeHandler, true, asyncResultHandler); } diff --git a/src/main/java/io/vertx/core/impl/VertxInternal.java b/src/main/java/io/vertx/core/impl/VertxInternal.java index d59e4333e..5d535cb8c 100644 --- a/src/main/java/io/vertx/core/impl/VertxInternal.java +++ b/src/main/java/io/vertx/core/impl/VertxInternal.java @@ -100,7 +100,7 @@ public interface VertxInternal extends Vertx { /** * Like {@link #executeBlocking(Handler, Handler)} but using the internal worker thread pool. */ - void executeBlockingInternal(Handler> blockingCodeHandler, Handler> resultHandler); + void executeBlockingInternal(Handler> blockingCodeHandler, Handler> resultHandler); ClusterManager getClusterManager(); diff --git a/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java b/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java index c18fdb18a..c29f7ed57 100644 --- a/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java +++ b/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java @@ -51,7 +51,7 @@ class WorkerExecutorImpl implements MetricsProvider, WorkerExecutorInternal { return pool; } - public synchronized void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> asyncResultHandler) { + public synchronized void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> asyncResultHandler) { if (closed) { throw new IllegalStateException("Worker executor closed"); } diff --git a/src/main/java/io/vertx/core/spi/FutureFactory.java b/src/main/java/io/vertx/core/spi/FutureFactory.java index 1ce2484a6..1042f544d 100644 --- a/src/main/java/io/vertx/core/spi/FutureFactory.java +++ b/src/main/java/io/vertx/core/spi/FutureFactory.java @@ -12,12 +12,23 @@ package io.vertx.core.spi; import io.vertx.core.Future; +import io.vertx.core.Promise; /** * @author Tim Fox */ public interface FutureFactory { + Promise promise(); + + Promise succeededPromise(); + + Promise succeededPromise(T result); + + Promise failedPromise(Throwable t); + + Promise failurePromise(String failureMessage); + Future future(); Future succeededFuture(); diff --git a/src/main/java/io/vertx/core/spi/VerticleFactory.java b/src/main/java/io/vertx/core/spi/VerticleFactory.java index 4eb37a9bc..d7a115ece 100644 --- a/src/main/java/io/vertx/core/spi/VerticleFactory.java +++ b/src/main/java/io/vertx/core/spi/VerticleFactory.java @@ -13,6 +13,7 @@ package io.vertx.core.spi; import io.vertx.core.DeploymentOptions; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.Verticle; import io.vertx.core.Vertx; @@ -80,7 +81,7 @@ public interface VerticleFactory { * @param classLoader The classloader * @param resolution A future which will receive the result of the resolution. */ - default void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Future resolution) { + default void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Promise resolution) { resolution.complete(identifier); } diff --git a/src/main/java/io/vertx/core/streams/impl/PipeImpl.java b/src/main/java/io/vertx/core/streams/impl/PipeImpl.java index 5df462e76..5edd68f44 100644 --- a/src/main/java/io/vertx/core/streams/impl/PipeImpl.java +++ b/src/main/java/io/vertx/core/streams/impl/PipeImpl.java @@ -13,6 +13,7 @@ package io.vertx.core.streams.impl; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.VertxException; import io.vertx.core.streams.Pipe; import io.vertx.core.streams.ReadStream; @@ -22,7 +23,7 @@ public class PipeImpl implements Pipe { private static final Handler> NULL_HANDLER = ar -> {}; - private final Future result; + private final Promise result; private final ReadStream src; private boolean endOnSuccess = true; private boolean endOnFailure = true; @@ -30,7 +31,7 @@ public class PipeImpl implements Pipe { public PipeImpl(ReadStream src) { this.src = src; - this.result = Future.future(); + this.result = Promise.promise(); // Set handlers now src.endHandler(result::tryComplete); @@ -86,7 +87,7 @@ public class PipeImpl implements Pipe { }); ws.exceptionHandler(err -> result.tryFail(new WriteException(err))); src.resume(); - result.setHandler(ar -> { + result.future().setHandler(ar -> { try { src.handler(null); } catch (Exception ignore) { @@ -132,7 +133,7 @@ public class PipeImpl implements Pipe { dst.drainHandler(null); dst.exceptionHandler(null); } - if (result.isComplete()) { + if (result.future().isComplete()) { return; } } diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java index a36c84da0..9631630c5 100644 --- a/src/test/java/io/vertx/core/ContextTest.java +++ b/src/test/java/io/vertx/core/ContextTest.java @@ -148,7 +148,7 @@ public class ContextTest extends VertxTestBase { Context context = vertx.getOrCreateContext(); context.runOnContext(v -> { Thread expected = Thread.currentThread(); - context.executeBlocking(Future::complete, r -> { + context.executeBlocking(Promise::complete, r -> { assertSame(expected, Thread.currentThread()); testComplete(); }); @@ -307,7 +307,7 @@ public class ContextTest extends VertxTestBase { ctx.runOnContext(v -> { vertx.deployVerticle(new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { context.runOnContext(startFuture::complete); } }, ar -> { @@ -515,7 +515,7 @@ public class ContextTest extends VertxTestBase { CountDownLatch latch4 = new CountDownLatch(1); duplicated.runOnContext(v -> { - vertx.executeBlocking(Future::complete, res -> { + vertx.executeBlocking(Promise::complete, res -> { assertSame(duplicated, Vertx.currentContext()); latch4.countDown(); }); diff --git a/src/test/java/io/vertx/core/DeploymentTest.java b/src/test/java/io/vertx/core/DeploymentTest.java index 1f0db87ac..3a2cb5615 100644 --- a/src/test/java/io/vertx/core/DeploymentTest.java +++ b/src/test/java/io/vertx/core/DeploymentTest.java @@ -741,7 +741,7 @@ public class DeploymentTest extends VertxTestBase { AtomicInteger childUndeployed = new AtomicInteger(); vertx.deployVerticle(new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { vertx.deployVerticle(new AbstractVerticle() { @Override public void start() throws Exception { @@ -769,7 +769,7 @@ public class DeploymentTest extends VertxTestBase { AtomicInteger childUndeployed = new AtomicInteger(); vertx.deployVerticle(new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { CountDownLatch latch = new CountDownLatch(1); vertx.deployVerticle(new AbstractVerticle() { @Override @@ -798,10 +798,10 @@ public class DeploymentTest extends VertxTestBase { AtomicInteger parentFailed = new AtomicInteger(); vertx.deployVerticle(new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { vertx.deployVerticle(new AbstractVerticle() { @Override - public void start(Future fut) throws Exception { + public void start(Promise fut) throws Exception { vertx.setTimer(100, id -> { fut.complete(); }); @@ -947,11 +947,11 @@ public class DeploymentTest extends VertxTestBase { public void testChildUndeployedDirectly() throws Exception { Verticle parent = new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { Verticle child = new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { startFuture.complete(); // Undeploy it directly @@ -964,11 +964,6 @@ public class DeploymentTest extends VertxTestBase { })); } - - @Override - public void stop(Future stopFuture) throws Exception { - super.stop(stopFuture); - } }; vertx.deployVerticle(parent, onSuccess(depID -> { @@ -1133,7 +1128,7 @@ public class DeploymentTest extends VertxTestBase { public static class ParentVerticle extends AbstractVerticle { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { vertx.deployVerticle("java:" + ChildVerticle.class.getName(), ar -> { if (ar.succeeded()) { startFuture.complete(null); @@ -1249,7 +1244,7 @@ public class DeploymentTest extends VertxTestBase { public void testFailedVerticleStopNotCalled() { Verticle verticleChild = new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { startFuture.fail("wibble"); } @Override @@ -1260,7 +1255,7 @@ public class DeploymentTest extends VertxTestBase { }; Verticle verticleParent = new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { vertx.deployVerticle(verticleChild, onFailure(v -> { startFuture.complete(); })); @@ -1411,7 +1406,7 @@ public class DeploymentTest extends VertxTestBase { }; Verticle v = new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { this.context.addCloseHook(closeable); startFuture.fail("Fail to deploy."); } @@ -1436,7 +1431,7 @@ public class DeploymentTest extends VertxTestBase { vertx.deployVerticle(() -> { Verticle v = new AbstractVerticle() { @Override - public void start(final Future startFuture) throws Exception { + public void start(final Promise startFuture) throws Exception { startFuture.fail("Fail to deploy."); } }; @@ -1580,23 +1575,23 @@ public class DeploymentTest extends VertxTestBase { public class MyAsyncVerticle extends AbstractVerticle { - private final Consumer> startConsumer; - private final Consumer> stopConsumer; + private final Consumer> startConsumer; + private final Consumer> stopConsumer; - public MyAsyncVerticle(Consumer> startConsumer, Consumer> stopConsumer) { + public MyAsyncVerticle(Consumer> startConsumer, Consumer> stopConsumer) { this.startConsumer = startConsumer; this.stopConsumer = stopConsumer; } @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { if (startConsumer != null) { startConsumer.accept(startFuture); } } @Override - public void stop(Future stopFuture) throws Exception { + public void stop(Promise stopFuture) throws Exception { if (stopConsumer != null) { stopConsumer.accept(stopFuture); } diff --git a/src/test/java/io/vertx/core/FutureTest.java b/src/test/java/io/vertx/core/FutureTest.java index 00c3bfecb..3c3e4a024 100644 --- a/src/test/java/io/vertx/core/FutureTest.java +++ b/src/test/java/io/vertx/core/FutureTest.java @@ -11,10 +11,6 @@ package io.vertx.core; -import io.vertx.core.AsyncResult; -import io.vertx.core.CompositeFuture; -import io.vertx.core.Future; -import io.vertx.core.Handler; import io.vertx.core.impl.NoStackTraceThrowable; import io.vertx.test.core.VertxTestBase; import org.junit.Test; @@ -39,13 +35,13 @@ public class FutureTest extends VertxTestBase { @Test public void testCreateWithHandler() { AtomicInteger count = new AtomicInteger(); - AtomicReference> ref = new AtomicReference<>(); - Future f2 = Future.future(f1 -> { - assertFalse(f1.isComplete()); + AtomicReference> ref = new AtomicReference<>(); + Future f2 = Future.future(p1 -> { + assertFalse(p1.future().isComplete()); count.incrementAndGet(); - ref.set(f1); + ref.set(p1); }); - assertSame(f2, ref.get()); + assertSame(f2, ref.get().future()); assertEquals(1, count.get()); new Checker<>(f2).assertNotCompleted(); ref.set(null); @@ -55,7 +51,7 @@ public class FutureTest extends VertxTestBase { ref.set(f1); f1.complete("the-value"); }); - assertSame(f2, ref.get()); + assertSame(f2, ref.get().future()); assertEquals(1, count.get()); new Checker<>(f2).assertSucceeded("the-value"); ref.set(null); @@ -66,7 +62,7 @@ public class FutureTest extends VertxTestBase { ref.set(f1); f1.fail(cause); }); - assertSame(f2, ref.get()); + assertSame(f2, ref.get().future()); assertEquals(1, count.get()); new Checker<>(f2).assertFailed(cause); try { @@ -98,41 +94,41 @@ public class FutureTest extends VertxTestBase { } @Test - public void testSetResultOnCompletedFuture() { - ArrayList> futures = new ArrayList<>(); - futures.add(Future.succeededFuture()); - futures.add(Future.succeededFuture()); - futures.add(Future.succeededFuture(new Object())); - futures.add(Future.succeededFuture(new Object())); - futures.add(Future.failedFuture(new Exception())); - futures.add(Future.failedFuture(new Exception())); - for (Future future : futures) { + public void testSetResultOnCompletedPromise() { + ArrayList> promises = new ArrayList<>(); + promises.add(Promise.succeededPromise()); + promises.add(Promise.succeededPromise()); + promises.add(Promise.succeededPromise(new Object())); + promises.add(Promise.succeededPromise(new Object())); + promises.add(Promise.failedPromise(new Exception())); + promises.add(Promise.failedPromise(new Exception())); + for (Promise promise : promises) { try { - future.complete(new Object()); + promise.complete(new Object()); fail(); } catch (IllegalStateException ignore) { } - assertFalse(future.tryComplete(new Object())); + assertFalse(promise.tryComplete(new Object())); try { - future.complete(null); + promise.complete(null); fail(); } catch (IllegalStateException ignore) { } - assertFalse(future.tryComplete(null)); + assertFalse(promise.tryComplete(null)); try { - future.fail(new Exception()); + promise.fail(new Exception()); fail(); } catch (IllegalStateException ignore) { } - assertFalse(future.tryFail(new Exception())); + assertFalse(promise.tryFail(new Exception())); } } @Test public void testCallSetHandlerBeforeCompletion() { AtomicBoolean called = new AtomicBoolean(); - Future future = Future.future(); - future.setHandler(result -> { + Promise promise = Promise.promise(); + promise.future().setHandler(result -> { assertTrue(result.succeeded()); assertFalse(result.failed()); assertEquals(null, result.result()); @@ -140,12 +136,12 @@ public class FutureTest extends VertxTestBase { called.set(true); }); assertFalse(called.get()); - future.complete(null); + promise.complete(null); assertTrue(called.get()); called.set(false); Object foo = new Object(); - future = Future.future(); - future.setHandler(result -> { + promise = Promise.promise(); + promise.future().setHandler(result -> { called.set(true); assertTrue(result.succeeded()); assertFalse(result.failed()); @@ -153,12 +149,12 @@ public class FutureTest extends VertxTestBase { assertEquals(null, result.cause()); }); assertFalse(called.get()); - future.complete(foo); + promise.complete(foo); assertTrue(called.get()); called.set(false); Exception cause = new Exception(); - future = Future.future(); - future.setHandler(result -> { + promise = Promise.promise(); + promise.future().setHandler(result -> { called.set(true); assertFalse(result.succeeded()); assertTrue(result.failed()); @@ -166,7 +162,7 @@ public class FutureTest extends VertxTestBase { assertEquals(cause, result.cause()); }); assertFalse(called.get()); - future.fail(cause); + promise.fail(cause); assertTrue(called.get()); } @@ -208,14 +204,12 @@ public class FutureTest extends VertxTestBase { @Test public void testResolveFutureToHandler() { - Consumer>> consumer = handler -> { - handler.handle(Future.succeededFuture("the-result")); - }; - Future fut = Future.future(); - consumer.accept(fut); - assertTrue(fut.isComplete()); - assertTrue(fut.succeeded()); - assertEquals("the-result", fut.result()); + Consumer>> consumer = handler -> handler.handle(Future.succeededFuture("the-result")); + Promise promise = Promise.promise(); + consumer.accept(promise); + assertTrue(promise.future().isComplete()); + assertTrue(promise.future().succeeded()); + assertEquals("the-result", promise.future().result()); } @Test @@ -224,11 +218,11 @@ public class FutureTest extends VertxTestBase { Consumer>> consumer = handler -> { handler.handle(Future.failedFuture(cause)); }; - Future fut = Future.future(); - consumer.accept(fut); - assertTrue(fut.isComplete()); - assertTrue(fut.failed()); - assertEquals(cause, fut.cause()); + Promise promise = Promise.promise(); + consumer.accept(promise); + assertTrue(promise.future().isComplete()); + assertTrue(promise.future().failed()); + assertEquals(cause, promise.future().cause()); } @@ -242,39 +236,13 @@ public class FutureTest extends VertxTestBase { @Test public void testFailureFutureWithNullFailure() { - Future future = Future.future(); - future.fail((Throwable)null); - Checker checker = new Checker<>(future); + Promise promise = Promise.promise(); + promise.fail((Throwable)null); + Checker checker = new Checker<>(promise.future()); NoStackTraceThrowable failure = (NoStackTraceThrowable) checker.assertFailed(); assertNull(failure.getMessage()); } - @Test - public void testCompositeComplete() { - CompositeFuture composite = CompositeFuture.all(Future.future(), Future.future()); - Checker checker = new Checker<>(composite); - composite.complete(composite); - checker.assertSucceeded(composite); - composite = CompositeFuture.all(Future.future(), Future.future()); - checker = new Checker<>(composite); - composite.complete(); - checker.assertSucceeded(composite); - } - - @Test - public void testCompositeFail() { - Throwable cause = new Throwable(); - Future f1 = Future.future(); - Future f2 = Future.future(); - CompositeFuture composite = CompositeFuture.all(f1, f2); - Checker checker = new Checker<>(composite); - composite.fail(cause); - checker.assertFailed(cause); - f1.complete(); - f2.complete(); - checker.assertFailed(cause); - } - @Test public void testAllSucceeded() { testAllSucceeded(CompositeFuture::all); @@ -286,18 +254,20 @@ public class FutureTest extends VertxTestBase { } private void testAllSucceeded(BiFunction, Future, CompositeFuture> all) { - Future f1 = Future.future(); - Future f2 = Future.future(); + Promise p1 = Promise.promise(); + Future f1 = p1.future(); + Promise p2 = Promise.promise(); + Future f2 = p2.future(); CompositeFuture composite = all.apply(f1, f2); Checker checker = new Checker<>(composite); checker.assertNotCompleted(); assertEquals(null, composite.resultAt(0)); assertEquals(null, composite.resultAt(1)); - f1.complete("something"); + p1.complete("something"); checker.assertNotCompleted(); assertEquals("something", composite.resultAt(0)); assertEquals(null, composite.resultAt(1)); - f2.complete(3); + p2.complete(3); checker.assertSucceeded(composite); assertEquals("something", composite.resultAt(0)); assertEquals(3, (int)composite.resultAt(1)); @@ -320,13 +290,15 @@ public class FutureTest extends VertxTestBase { } private void testAllFailed(BiFunction, Future, CompositeFuture> all) { - Future f1 = Future.future(); - Future f2 = Future.future(); + Promise p1 = Promise.promise(); + Future f1 = p1.future(); + Promise p2 = Promise.promise(); + Future f2 = p2.future(); CompositeFuture composite = all.apply(f1, f2); Checker checker = new Checker<>(composite); - f1.complete("s"); + p1.complete("s"); Exception cause = new Exception(); - f2.fail(cause); + p2.fail(cause); checker.assertFailed(cause); assertEquals("s", composite.resultAt(0)); assertEquals(null, composite.resultAt(1)); @@ -378,16 +350,18 @@ public class FutureTest extends VertxTestBase { } private void testAnySucceeded1(BiFunction, Future, CompositeFuture> any) { - Future f1 = Future.future(); - Future f2 = Future.future(); + Promise p1 = Promise.promise(); + Future f1 = p1.future(); + Promise p2 = Promise.promise(); + Future f2 = p2.future(); CompositeFuture composite = any.apply(f1, f2); Checker checker = new Checker<>(composite); checker.assertNotCompleted(); assertEquals(null, composite.resultAt(0)); assertEquals(null, composite.resultAt(1)); - f1.complete("something"); + p1.complete("something"); checker.assertSucceeded(composite); - f2.complete(3); + p2.complete(3); checker.assertSucceeded(composite); } @@ -408,13 +382,15 @@ public class FutureTest extends VertxTestBase { } private void testAnySucceeded2(BiFunction, Future, CompositeFuture> any) { - Future f1 = Future.future(); - Future f2 = Future.future(); + Promise p1 = Promise.promise(); + Future f1 = p1.future(); + Promise p2 = Promise.promise(); + Future f2 = p2.future(); CompositeFuture composite = any.apply(f1, f2); Checker checker = new Checker<>(composite); - f1.fail("failure"); + p1.fail("failure"); checker.assertNotCompleted(); - f2.complete(3); + p2.complete(3); checker.assertSucceeded(composite); } @@ -429,14 +405,16 @@ public class FutureTest extends VertxTestBase { } private void testAnyFailed(BiFunction, Future, CompositeFuture> any) { - Future f1 = Future.future(); - Future f2 = Future.future(); + Promise p1 = Promise.promise(); + Future f1 = p1.future(); + Promise p2 = Promise.promise(); + Future f2 = p2.future(); CompositeFuture composite = any.apply(f1, f2); Checker checker = new Checker<>(composite); - f1.fail("failure"); + p1.fail("failure"); checker.assertNotCompleted(); Throwable cause = new Exception(); - f2.fail(cause); + p2.fail(cause); checker.assertFailed(cause); } @@ -455,7 +433,7 @@ public class FutureTest extends VertxTestBase { } CompositeFuture composite = CompositeFuture.any(list); Checker checker = new Checker<>(composite); - checker.assertFailed(); + assertNotNull(checker.assertFailed()); for (int i = 0;i < size;i++) { list.clear(); for (int j = 0;j < size;j++) { @@ -485,14 +463,16 @@ public class FutureTest extends VertxTestBase { } private void testJoinSucceeded(BiFunction, Future, CompositeFuture> join) { - Future f1 = Future.future(); - Future f2 = Future.future(); + Promise p1 = Promise.promise(); + Future f1 = p1.future(); + Promise p2 = Promise.promise(); + Future f2 = p2.future(); CompositeFuture composite = join.apply(f1, f2); Checker checker = new Checker<>(composite); checker.assertNotCompleted(); - f1.complete("foo"); + p1.complete("foo"); checker.assertNotCompleted(); - f2.complete(); + p2.complete(); checker.assertSucceeded(composite); } @@ -507,15 +487,17 @@ public class FutureTest extends VertxTestBase { } private void testJoinFailed1(BiFunction, Future, CompositeFuture> join) { - Future f1 = Future.future(); - Future f2 = Future.future(); + Promise p1 = Promise.promise(); + Future f1 = p1.future(); + Promise p2 = Promise.promise(); + Future f2 = p2.future(); CompositeFuture composite = join.apply(f1, f2); Checker checker = new Checker<>(composite); checker.assertNotCompleted(); - f1.complete("foo"); + p1.complete("foo"); checker.assertNotCompleted(); Throwable cause = new Throwable(); - f2.fail(cause); + p2.fail(cause); assertSame(checker.assertFailed(), cause); } @@ -530,15 +512,17 @@ public class FutureTest extends VertxTestBase { } private void testJoinFailed2(BiFunction, Future, CompositeFuture> join) { - Future f1 = Future.future(); - Future f2 = Future.future(); + Promise p1 = Promise.promise(); + Future f1 = p1.future(); + Promise p2 = Promise.promise(); + Future f2 = p2.future(); CompositeFuture composite = join.apply(f1, f2); Checker checker = new Checker<>(composite); checker.assertNotCompleted(); Throwable cause = new Throwable(); - f1.fail(cause); + p1.fail(cause); checker.assertNotCompleted(); - f2.complete(10); + p2.complete(10); assertSame(cause, checker.assertFailed()); } @@ -553,16 +537,18 @@ public class FutureTest extends VertxTestBase { } private void testJoinFailed3(BiFunction, Future, CompositeFuture> join) { - Future f1 = Future.future(); - Future f2 = Future.future(); + Promise p1 = Promise.promise(); + Future f1 = p1.future(); + Promise p2 = Promise.promise(); + Future f2 = p2.future(); CompositeFuture composite = join.apply(f1, f2); Checker checker = new Checker<>(composite); checker.assertNotCompleted(); Throwable cause1 = new Throwable(); - f1.fail(cause1); + p1.fail(cause1); checker.assertNotCompleted(); Throwable cause2 = new Throwable(); - f2.fail(cause2); + p2.fail(cause2); assertSame(cause1, checker.assertFailed()); } @@ -574,38 +560,34 @@ public class FutureTest extends VertxTestBase { @Test public void testCompositeFutureToList() { - Future f1 = Future.future(); - Future f2 = Future.future(); + Promise p1 = Promise.promise(); + Future f1 = p1.future(); + Promise p2 = Promise.promise(); + Future f2 = p2.future(); CompositeFuture composite = CompositeFuture.all(f1, f2); assertEquals(Arrays.asList(null, null), composite.list()); - f1.complete("foo"); + p1.complete("foo"); assertEquals(Arrays.asList("foo", null), composite.list()); - f2.complete(4); + p2.complete(4); assertEquals(Arrays.asList("foo", 4), composite.list()); } @Test public void testComposeSuccessToSuccess() { - Future f1 = Future.future(); - Future f2 = Future.future(); - Checker checker = new Checker<>(f2); - f1.compose(string -> f2.complete(string.length()), f2); - checker.assertNotCompleted(); - f1.complete("abcdef"); - checker.assertSucceeded(6); - AtomicReference ref = new AtomicReference<>(); - Future c = Future.future(); - Future f3 = Future.future(); + Promise p = Promise.promise(); + Future c = p.future(); + Promise p3 = Promise.promise(); + Future f3 = p3.future(); Future f4 = f3.compose(string -> { ref.set(string); return c; }); - checker = new Checker<>(f4); - f3.complete("abcdef"); + Checker checker = new Checker<>(f4); + p3.complete("abcdef"); checker.assertNotCompleted(); assertEquals("abcdef", ref.get()); - c.complete(6); + p.complete(6); checker.assertSucceeded(6); } @@ -613,15 +595,17 @@ public class FutureTest extends VertxTestBase { public void testComposeSuccessToFailure() { Throwable cause = new Throwable(); AtomicReference ref = new AtomicReference<>(); - Future c = Future.future(); - Future f3 = Future.future(); + Promise p = Promise.promise(); + Future c = p.future(); + Promise p3 = Promise.promise(); + Future f3 = p3.future(); Future f4 = f3.compose(string -> { ref.set(string); return c; }); Checker checker = new Checker<>(f4); - f3.complete("abcdef"); - c.fail(cause); + p3.complete("abcdef"); + p.fail(cause); checker.assertFailed(cause); } @@ -629,62 +613,31 @@ public class FutureTest extends VertxTestBase { public void testComposeFailure() { Exception cause = new Exception(); - Future f1 = Future.future(); - Future f2 = Future.future(); - Checker checker = new Checker<>(f2); - f1.compose(string -> f2.complete(string.length()), f2); - f1.fail(cause); - checker.assertFailed(cause); - - Future f3 = Future.future(); + Promise p3 = Promise.promise(); + Future f3 = p3.future(); Future f4 = f3.compose(string -> Future.succeededFuture(string.length())); - checker = new Checker<>(f4); - f3.fail(cause); + Checker checker = new Checker<>(f4); + p3.fail(cause); checker.assertFailed(cause); } @Test public void testComposeFails() { RuntimeException cause = new RuntimeException(); - - Future f1 = Future.future(); - Future f2 = Future.future(); - Checker checker = new Checker<>(f2); - f1.compose(string -> { throw cause; }, f2); - f1.complete("foo"); - checker.assertFailed(cause); - - Future f3 = Future.future(); + Promise p3 = Promise.promise(); + Future f3 = p3.future(); Future f4 = f3.compose(string -> { throw cause; }); - checker = new Checker<>(f4); - f3.complete("foo"); + Checker checker = new Checker<>(f4); + p3.complete("foo"); checker.assertFailed(cause); } - @Test - public void testComposeFailsAfterCompletion() { - Future f1 = Future.future(); - Future f2 = Future.future(); - Checker checker = new Checker<>(f2); - RuntimeException cause = new RuntimeException(); - f1.compose(string -> { - f2.complete(46); - throw cause; - }, f2); - try { - f1.complete("foo"); - fail(); - } catch (Exception e) { - assertEquals(cause, e); - } - checker.assertSucceeded(46); - } - @Test public void testComposeWithNullFunction() { - Future fut = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); try { - fut.compose((Function>) null); + f.compose((Function>) null); fail(); } catch (NullPointerException ignore) { } @@ -692,47 +645,51 @@ public class FutureTest extends VertxTestBase { @Test public void testMapSuccess() { - Future fut = Future.future(); - Future mapped = fut.map(Object::toString); + Promise p = Promise.promise(); + Future f = p.future(); + Future mapped = f.map(Object::toString); Checker checker = new Checker<>(mapped); checker.assertNotCompleted(); - fut.complete(3); + p.complete(3); checker.assertSucceeded("3"); } @Test public void testMapFailure() { Throwable cause = new Throwable(); - Future fut = Future.future(); - Future mapped = fut.map(Object::toString); + Promise p = Promise.promise(); + Future f = p.future(); + Future mapped = f.map(Object::toString); Checker checker = new Checker<>(mapped); checker.assertNotCompleted(); - fut.fail(cause); + p.fail(cause); checker.assertFailed(cause); } @Test public void testMapFails() { RuntimeException cause = new RuntimeException(); - Future fut = Future.future(); - Future mapped = fut.map(i -> { + Promise p = Promise.promise(); + Future f = p.future(); + Future mapped = f.map(i -> { throw cause; }); Checker checker = new Checker<>(mapped); - fut.fail(cause); + p.fail(cause); checker.assertFailed(cause); } @Test public void testMapWithNullFunction() { - Future fut = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); try { - fut.map((Function) null); + f.map((Function) null); fail(); } catch (NullPointerException ignore) { } try { - asyncResult(fut).map((Function) null); + asyncResult(f).map((Function) null); fail(); } catch (NullPointerException ignore) { } @@ -740,25 +697,27 @@ public class FutureTest extends VertxTestBase { @Test public void testMapEmpty() { - Future fut = Future.future(); - Future mapped = fut.mapEmpty(); + Promise p = Promise.promise(); + Future f = p.future(); + Future mapped = f.mapEmpty(); Checker checker = new Checker<>(mapped); checker.assertNotCompleted(); - fut.complete(3); + p.complete(3); checker.assertSucceeded(null); } @Test public void testRecoverSuccessWithSuccess() { AtomicBoolean called = new AtomicBoolean(); - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); Future r = f.recover(t -> { called.set(true); throw new AssertionError(); }); Checker checker = new Checker<>(r); checker.assertNotCompleted(); - f.complete("yeah"); + p.complete("yeah"); assertTrue(r.succeeded()); checker.assertSucceeded("yeah"); assertFalse(called.get()); @@ -766,43 +725,47 @@ public class FutureTest extends VertxTestBase { @Test public void testRecoverFailureWithSuccess() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); Future r = f.recover(t -> Future.succeededFuture(t.getMessage())); Checker checker = new Checker<>(r); checker.assertNotCompleted(); - f.fail("recovered"); + p.fail("recovered"); checker.assertSucceeded("recovered"); } @Test public void testRecoverFailureWithFailure() { Throwable cause = new Throwable(); - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); Future r = f.recover(t -> Future.failedFuture(cause)); Checker checker = new Checker<>(r); checker.assertNotCompleted(); - f.fail("recovered"); + p.fail("recovered"); checker.assertFailed(cause); } @Test public void testRecoverFailureFails() { RuntimeException cause = new RuntimeException("throw"); - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); Future r = f.recover(t -> { throw cause; }); Checker checker = new Checker<>(r); checker.assertNotCompleted(); - f.fail("recovered"); + p.fail("recovered"); checker.assertFailed(cause); } @Test public void testRecoverWithNullFunction() { - Future fut = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); try { - fut.recover(null); + f.recover(null); fail(); } catch (NullPointerException ignore) { } @@ -811,14 +774,15 @@ public class FutureTest extends VertxTestBase { @Test public void testOtherwiseSuccessWithSuccess() { AtomicBoolean called = new AtomicBoolean(); - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); Future r = f.otherwise(t -> { called.set(true); throw new AssertionError(); }); Checker checker = new Checker<>(r); checker.assertNotCompleted(); - f.complete("yeah"); + p.complete("yeah"); assertTrue(r.succeeded()); checker.assertSucceeded("yeah"); assertFalse(called.get()); @@ -826,24 +790,26 @@ public class FutureTest extends VertxTestBase { @Test public void testOtherwiseFailureWithSuccess() { - Future f = Future.future(); - Future r = f.otherwise(t -> t.getMessage()); + Promise p = Promise.promise(); + Future f = p.future(); + Future r = f.otherwise(Throwable::getMessage); Checker checker = new Checker<>(r); checker.assertNotCompleted(); - f.fail("recovered"); + p.fail("recovered"); checker.assertSucceeded("recovered"); } @Test public void testOtherwiseFails() { RuntimeException cause = new RuntimeException("throw"); - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); Future r = f.otherwise(t -> { throw cause; }); Checker checker = new Checker<>(r); checker.assertNotCompleted(); - f.fail("recovered"); + p.fail("recovered"); checker.assertFailed(cause); } @@ -1000,7 +966,8 @@ public class FutureTest extends VertxTestBase { @Test public void testUncompletedAsyncResultMap() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); AsyncResult map1 = res.map(String::length); AsyncResult map2 = res.map(17); @@ -1012,11 +979,12 @@ public class FutureTest extends VertxTestBase { @Test public void testSucceededAsyncResultMap() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); AsyncResult map1 = res.map(String::length); AsyncResult map2 = res.map(17); - f.complete("foobar"); + p.complete("foobar"); assertEquals(6, (int)map1.result()); assertNull(map1.cause()); assertEquals(17, (int)map2.result()); @@ -1025,12 +993,13 @@ public class FutureTest extends VertxTestBase { @Test public void testFailedAsyncResultMap() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); AsyncResult map1 = res.map(String::length); AsyncResult map2 = res.map(17); Throwable cause = new Throwable(); - f.fail(cause); + p.fail(cause); assertNull(map1.result()); assertSame(cause, map1.cause()); assertNull(map2.result()); @@ -1039,53 +1008,59 @@ public class FutureTest extends VertxTestBase { @Test public void testAsyncResultMapEmpty() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); AsyncResult map = res.mapEmpty(); - f.complete("foobar"); + p.complete("foobar"); assertNull(null, map.result()); assertNull(map.cause()); } @Test public void testSucceededFutureRecover() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); Future r = f.recover(t -> Future.succeededFuture(t.getMessage())); - f.complete("yeah"); + p.complete("yeah"); assertTrue(r.succeeded()); assertEquals(r.result(), "yeah"); } @Test public void testFailedFutureRecover() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); Future r = f.recover(t -> Future.succeededFuture(t.getMessage())); - f.fail("recovered"); + p.fail("recovered"); assertTrue(r.succeeded()); assertEquals(r.result(), "recovered"); } @Test public void testFailedMapperFutureRecover() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); Future r = f.recover(t -> { throw new RuntimeException("throw"); }); - f.fail("recovered"); + p.fail("recovered"); assertTrue(r.failed()); assertEquals(r.cause().getMessage(), "throw"); } @Test public void testUncompletedAsyncResultOtherwise() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); testUncompletedAsyncResultOtherwise(res); } @Test public void testUncompletedFutureOtherwise() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); testUncompletedAsyncResultOtherwise(f); } @@ -1099,14 +1074,16 @@ public class FutureTest extends VertxTestBase { @Test public void testUncompletedAsyncResultOtherwiseApplyFunction() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); testUncompletedOtherwiseApplyFunction(res); } @Test public void testUncompletedFutureOtherwiseApplyFunction() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); testUncompletedOtherwiseApplyFunction(f); } @@ -1120,20 +1097,22 @@ public class FutureTest extends VertxTestBase { @Test public void testSucceededAsyncResultOtherwise() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); - testSucceededOtherwise(res, f); + testSucceededOtherwise(res, p); } @Test public void testSucceededFutureOtherwise() { - Future f = Future.future(); - testSucceededOtherwise(f, f); + Promise p = Promise.promise(); + Future f = p.future(); + testSucceededOtherwise(f, p); } - private void testSucceededOtherwise(AsyncResult res, Future f) { + private void testSucceededOtherwise(AsyncResult res, Promise p) { AsyncResult ar = res.otherwise(Throwable::getMessage); - f.complete("foobar"); + p.complete("foobar"); assertTrue(ar.succeeded()); assertFalse(ar.failed()); assertEquals("foobar", ar.result()); @@ -1142,20 +1121,22 @@ public class FutureTest extends VertxTestBase { @Test public void testSucceededAsyncResultOtherwiseApplyFunction() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); - testSucceededOtherwiseApplyFunction(res, f); + testSucceededOtherwiseApplyFunction(res, p); } @Test public void testSucceededFutureOtherwiseApplyFunction() { - Future f = Future.future(); - testSucceededOtherwiseApplyFunction(f, f); + Promise p = Promise.promise(); + Future f = p.future(); + testSucceededOtherwiseApplyFunction(f, p); } - private void testSucceededOtherwiseApplyFunction(AsyncResult res, Future f) { + private void testSucceededOtherwiseApplyFunction(AsyncResult res, Promise p) { AsyncResult ar = res.otherwise("whatever"); - f.complete("foobar"); + p.complete("foobar"); assertTrue(ar.succeeded()); assertFalse(ar.failed()); assertEquals("foobar", ar.result()); @@ -1164,21 +1145,23 @@ public class FutureTest extends VertxTestBase { @Test public void testFailedAsyncResultOtherwise() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); - testFailedOtherwise(res, f); + testFailedOtherwise(res, p); } @Test public void testFailedFutureOtherwise() { - Future f = Future.future(); - testFailedOtherwise(f, f); + Promise p = Promise.promise(); + Future f = p.future(); + testFailedOtherwise(f, p); } - private void testFailedOtherwise(AsyncResult res, Future f) { + private void testFailedOtherwise(AsyncResult res, Promise p) { AsyncResult map1 = res.otherwise("something-else"); Throwable cause = new Throwable("the-failure"); - f.fail(cause); + p.fail(cause); assertTrue(map1.succeeded()); assertFalse(map1.failed()); assertEquals("something-else", map1.result()); @@ -1187,21 +1170,23 @@ public class FutureTest extends VertxTestBase { @Test public void testFailedAsyncResultOtherwiseApplyFunction() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); - testFailedOtherwiseApplyFunction(res, f); + testFailedOtherwiseApplyFunction(res, p); } @Test public void testFailedFutureOtherwiseApplyFunction() { - Future f = Future.future(); - testFailedOtherwiseApplyFunction(f, f); + Promise p = Promise.promise(); + Future f = p.future(); + testFailedOtherwiseApplyFunction(f, p); } - private void testFailedOtherwiseApplyFunction(AsyncResult res, Future f) { + private void testFailedOtherwiseApplyFunction(AsyncResult res, Promise p) { AsyncResult map1 = res.otherwise(Throwable::getMessage); Throwable cause = new Throwable("the-failure"); - f.fail(cause); + p.fail(cause); assertTrue(map1.succeeded()); assertFalse(map1.failed()); assertEquals("the-failure", map1.result()); @@ -1210,7 +1195,8 @@ public class FutureTest extends VertxTestBase { @Test public void testOtherwiseWithNullFunction() { - Future fut = Future.future(); + Promise p = Promise.promise(); + Future fut = p.future(); try { fut.otherwise((Function) null); fail(); @@ -1225,54 +1211,60 @@ public class FutureTest extends VertxTestBase { @Test public void testAsyncResultOtherwiseEmpty() { - Future f = Future.future(); + Promise p = Promise.promise(); + Future f = p.future(); AsyncResult res = asyncResult(f); - testOtherwiseEmpty(res, f); + testOtherwiseEmpty(res, p); } @Test public void testFutureOtherwiseEmpty() { - Future f = Future.future(); - testOtherwiseEmpty(f, f); + Promise p = Promise.promise(); + Future f = p.future(); + testOtherwiseEmpty(f, p); } @Test public void testToString() { - assertEquals("Future{unresolved}", Future.future().toString()); + assertEquals("Future{unresolved}", Promise.promise().future().toString()); assertEquals("Future{result=abc}", Future.succeededFuture("abc").toString()); assertEquals("Future{cause=It's like that, and that's the way it is}", Future.failedFuture("It's like that, and that's the way it is").toString()); - Future f = Future.future(); - f.complete("abc"); + Promise p = Promise.promise(); + Future f = p.future(); + p.complete("abc"); assertEquals("Future{result=abc}", f.toString()); - f = Future.future(); - f.fail("abc"); + p = Promise.promise(); + f = p.future(); + p.fail("abc"); assertEquals("Future{cause=abc}", f.toString()); } @Test public void testReleaseHandlerAfterCompletion() throws Exception { - Future f = Future.future(); + Promise promise = Promise.promise(); + Future f = promise.future(); Field handlerField = f.getClass().getDeclaredField("handler"); handlerField.setAccessible(true); f.setHandler(ar -> {}); - f.complete(); + promise.complete(); assertNull(handlerField.get(f)); f.setHandler(ar -> {}); assertNull(handlerField.get(f)); - f = Future.future(); + promise = Promise.promise(); + f = promise.future(); f.setHandler(ar -> {}); - f.fail("abc"); + promise.fail("abc"); assertNull(handlerField.get(f)); f.setHandler(ar -> {}); assertNull(handlerField.get(f)); } - private void testOtherwiseEmpty(AsyncResult res, Future f) { + private void testOtherwiseEmpty(AsyncResult res, Promise p) { AsyncResult otherwise = res.otherwiseEmpty(); Throwable cause = new Throwable("the-failure"); - f.fail(cause); + p.fail(cause); assertTrue(otherwise.succeeded()); assertFalse(otherwise.failed()); assertEquals(null, otherwise.result()); diff --git a/src/test/java/io/vertx/core/NamedWorkerPoolTest.java b/src/test/java/io/vertx/core/NamedWorkerPoolTest.java index 9bebee3d6..50a69d1e5 100644 --- a/src/test/java/io/vertx/core/NamedWorkerPoolTest.java +++ b/src/test/java/io/vertx/core/NamedWorkerPoolTest.java @@ -53,7 +53,7 @@ public class NamedWorkerPoolTest extends VertxTestBase { .setMaxWorkerExecuteTime(maxWorkerExecuteTime); vertx.deployVerticle(new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { vertx.executeBlocking(fut -> { try { SECONDS.sleep(5); diff --git a/src/test/java/io/vertx/core/VerticleFactoryTest.java b/src/test/java/io/vertx/core/VerticleFactoryTest.java index f96a2e237..62d8ca399 100644 --- a/src/test/java/io/vertx/core/VerticleFactoryTest.java +++ b/src/test/java/io/vertx/core/VerticleFactoryTest.java @@ -256,7 +256,7 @@ public class VerticleFactoryTest extends VertxTestBase { return true; } @Override - public void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Future resolution) { + public void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Promise resolution) { vertx.runOnContext(v -> { // Async resolution resolution.complete("whatever"); @@ -585,7 +585,7 @@ public class VerticleFactoryTest extends VertxTestBase { } @Override - public void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Future resolution) { + public void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Promise resolution) { if (failInResolve) { resolution.fail(new IOException("whatever")); } else { diff --git a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTestBase.java b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTestBase.java index 370cf9d12..1aae669bc 100644 --- a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTestBase.java +++ b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTestBase.java @@ -220,7 +220,7 @@ public class ClusteredEventBusTestBase extends EventBusTestBase { boolean unregisterCalled; @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { EventBus eventBus = getVertx().eventBus(); MessageConsumer consumer = eventBus.consumer("whatever"); consumer.handler(m -> { diff --git a/src/test/java/io/vertx/core/file/FileSystemTest.java b/src/test/java/io/vertx/core/file/FileSystemTest.java index 7cbe8ece5..e2ae33313 100644 --- a/src/test/java/io/vertx/core/file/FileSystemTest.java +++ b/src/test/java/io/vertx/core/file/FileSystemTest.java @@ -16,6 +16,7 @@ import io.netty.buffer.Unpooled; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.impl.AsyncFileImpl; @@ -1238,7 +1239,7 @@ public class FileSystemTest extends VertxTestBase { @Override Future handle(ReadStream stream) { - Future fut = Future.future(); + Promise fut = Promise.promise(); assert flowing.getAndSet(false); stream.pause(); Vertx.currentContext().owner().setTimer(1, id -> { @@ -1246,7 +1247,7 @@ public class FileSystemTest extends VertxTestBase { stream.resume(); fut.complete(); }); - return fut; + return fut.future(); } }, @@ -1259,14 +1260,14 @@ public class FileSystemTest extends VertxTestBase { } @Override Future handle(ReadStream stream) { - Future fut = Future.future(); + Promise fut = Promise.promise(); assert fetching.getAndSet(false); Vertx.currentContext().owner().setTimer(1, id -> { assert !fetching.getAndSet(true); stream.fetch(1); fut.complete(); }); - return fut; + return fut.future(); } }; diff --git a/src/test/java/io/vertx/core/http/Http2ClientTest.java b/src/test/java/io/vertx/core/http/Http2ClientTest.java index 58b1680b3..be241465a 100644 --- a/src/test/java/io/vertx/core/http/Http2ClientTest.java +++ b/src/test/java/io/vertx/core/http/Http2ClientTest.java @@ -35,6 +35,7 @@ import io.vertx.core.DeploymentOptions; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.Verticle; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -79,9 +80,9 @@ public class Http2ClientTest extends Http2TestBase { io.vertx.core.http.Http2Settings updatedSettings = TestUtils.randomHttp2Settings(); updatedSettings.setHeaderTableSize(initialSettings.getHeaderTableSize()); // Otherwise it raise "invalid max dynamic table size" in Netty AtomicInteger count = new AtomicInteger(); - Future end = Future.future(); + Promise end = Promise.promise(); server.requestHandler(req -> { - end.setHandler(v -> { + end.future().setHandler(v -> { req.response().end(); }); }).connectionHandler(conn -> { @@ -463,7 +464,7 @@ public class Http2ClientTest extends Http2TestBase { public void testClientResponsePauseResume() throws Exception { String content = TestUtils.randomAlphaString(1024); Buffer expected = Buffer.buffer(); - Future whenFull = Future.future(); + Promise whenFull = Promise.promise(); AtomicBoolean drain = new AtomicBoolean(); server.requestHandler(req -> { HttpServerResponse resp = req.response(); @@ -493,7 +494,7 @@ public class Http2ClientTest extends Http2TestBase { Buffer received = Buffer.buffer(); resp.pause(); resp.handler(buff -> { - if (whenFull.isComplete()) { + if (whenFull.future().isComplete()) { assertSame(ctx, Vertx.currentContext()); } else { assertOnIOContext(ctx); @@ -504,7 +505,7 @@ public class Http2ClientTest extends Http2TestBase { assertEquals(expected.toString().length(), received.toString().length()); testComplete(); }); - whenFull.setHandler(v -> { + whenFull.future().setHandler(v -> { resp.resume(); }); })); @@ -640,9 +641,9 @@ public class Http2ClientTest extends Http2TestBase { public void testServerResetClientStreamDuringResponse() throws Exception { waitFor(2); String chunk = TestUtils.randomAlphaString(1024); - Future doReset = Future.future(); + Promise doReset = Promise.promise(); server.requestHandler(req -> { - doReset.setHandler(onSuccess(v -> { + doReset.future().setHandler(onSuccess(v -> { req.response().reset(8); })); req.response().setChunked(true).write(Buffer.buffer(chunk)); @@ -669,7 +670,7 @@ public class Http2ClientTest extends Http2TestBase { @Test public void testClientResetServerStreamDuringRequest() throws Exception { - Future bufReceived = Future.future(); + Promise bufReceived = Promise.promise(); server.requestHandler(req -> { req.handler(buf -> { bufReceived.complete(); @@ -695,7 +696,7 @@ public class Http2ClientTest extends Http2TestBase { HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { fail(); }).setChunked(true).write(Buffer.buffer("hello")); - bufReceived.setHandler(ar -> { + bufReceived.future().setHandler(ar -> { req.reset(10); }); await(); diff --git a/src/test/java/io/vertx/core/http/Http2ServerTest.java b/src/test/java/io/vertx/core/http/Http2ServerTest.java index 546851766..d6b38f087 100644 --- a/src/test/java/io/vertx/core/http/Http2ServerTest.java +++ b/src/test/java/io/vertx/core/http/Http2ServerTest.java @@ -49,6 +49,7 @@ import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.impl.Http1xOrH2CHandler; @@ -806,7 +807,7 @@ public class Http2ServerTest extends Http2TestBase { Context ctx = vertx.getOrCreateContext(); String content = TestUtils.randomAlphaString(1024); StringBuilder expected = new StringBuilder(); - Future whenFull = Future.future(); + Promise whenFull = Promise.promise(); AtomicBoolean drain = new AtomicBoolean(); server.requestHandler(req -> { WriteStream stream = streamProvider.apply(req); @@ -859,7 +860,7 @@ public class Http2ServerTest extends Http2TestBase { } } }); - whenFull.setHandler(ar -> { + whenFull.future().setHandler(ar -> { request.context.executor().execute(() -> { try { request.decoder.flowController().consumeBytes(request.connection.stream(id), toAck.intValue()); @@ -964,7 +965,7 @@ public class Http2ServerTest extends Http2TestBase { @Test public void testClientResetServerStream() throws Exception { Context ctx = vertx.getOrCreateContext(); - Future bufReceived = Future.future(); + Promise bufReceived = Promise.promise(); AtomicInteger resetCount = new AtomicInteger(); server.requestHandler(req -> { req.handler(buf -> { @@ -994,7 +995,7 @@ public class Http2ServerTest extends Http2TestBase { Http2ConnectionEncoder encoder = request.encoder; encoder.writeHeaders(request.context, id, GET("/"), 0, false, request.context.newPromise()); encoder.writeData(request.context, id, Buffer.buffer("hello").getByteBuf(), 0, false, request.context.newPromise()); - bufReceived.setHandler(ar -> { + bufReceived.future().setHandler(ar -> { encoder.writeRstStream(request.context, id, 10, request.context.newPromise()); request.context.flush(); }); @@ -1468,7 +1469,7 @@ public class Http2ServerTest extends Http2TestBase { @Test public void testStreamError() throws Exception { waitFor(2); - Future when = Future.future(); + Promise when = Promise.promise(); Context ctx = vertx.getOrCreateContext(); server.requestHandler(req -> { AtomicInteger reqErrors = new AtomicInteger(); @@ -1501,7 +1502,7 @@ public class Http2ServerTest extends Http2TestBase { Http2ConnectionEncoder encoder = request.encoder; encoder.writeHeaders(request.context, id, GET("/"), 0, false, request.context.newPromise()); request.context.flush(); - when.setHandler(ar -> { + when.future().setHandler(ar -> { // Send a corrupted frame on purpose to check we get the corresponding error in the request exception handler // the error is : greater padding value 0c -> 1F // ChannelFuture a = encoder.frameWriter().writeData(request.context, id, Buffer.buffer("hello").getByteBuf(), 12, false, request.context.newPromise()); @@ -1522,7 +1523,7 @@ public class Http2ServerTest extends Http2TestBase { public void testPromiseStreamError() throws Exception { Context ctx = vertx.getOrCreateContext(); waitFor(2); - Future when = Future.future(); + Promise when = Promise.promise(); server.requestHandler(req -> { req.response().push(HttpMethod.GET, "/wibble", ar -> { assertTrue(ar.succeeded()); @@ -1552,7 +1553,7 @@ public class Http2ServerTest extends Http2TestBase { request.decoder.frameListener(new Http2EventAdapter() { @Override public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception { - when.setHandler(ar -> { + when.future().setHandler(ar -> { Http2ConnectionEncoder encoder = request.encoder; encoder.frameWriter().writeHeaders(request.context, promisedStreamId, GET("/"), 0, false, request.context.newPromise()); request.context.flush(); @@ -1572,7 +1573,7 @@ public class Http2ServerTest extends Http2TestBase { public void testConnectionDecodeError() throws Exception { Context ctx = vertx.getOrCreateContext(); waitFor(3); - Future when = Future.future(); + Promise when = Promise.promise(); server.requestHandler(req -> { AtomicInteger reqFailures = new AtomicInteger(); AtomicInteger respFailures = new AtomicInteger(); @@ -1612,7 +1613,7 @@ public class Http2ServerTest extends Http2TestBase { ChannelFuture fut = client.connect(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, request -> { int id = request.nextStreamId(); Http2ConnectionEncoder encoder = request.encoder; - when.setHandler(ar -> { + when.future().setHandler(ar -> { // Send a stream ID that does not exists encoder.frameWriter().writeRstStream(request.context, 10, 0, request.context.newPromise()); request.context.flush(); @@ -1859,7 +1860,7 @@ public class Http2ServerTest extends Http2TestBase { @Test public void testClientSendGoAwayNoError() throws Exception { - Future abc = Future.future(); + Promise abc = Promise.promise(); Context ctx = vertx.getOrCreateContext(); Handler requestHandler = req -> { HttpConnection conn = req.connection(); @@ -1892,7 +1893,7 @@ public class Http2ServerTest extends Http2TestBase { int id = request.nextStreamId(); encoder.writeHeaders(request.context, id, GET("/"), 0, true, request.context.newPromise()); request.context.flush(); - abc.setHandler(ar -> { + abc.future().setHandler(ar -> { encoder.writeGoAway(request.context, id, 0, Unpooled.EMPTY_BUFFER, request.context.newPromise()); request.context.flush(); }); @@ -1903,7 +1904,7 @@ public class Http2ServerTest extends Http2TestBase { @Test public void testClientSendGoAwayInternalError() throws Exception { - Future abc = Future.future(); + Promise abc = Promise.promise(); Context ctx = vertx.getOrCreateContext(); Handler requestHandler = req -> { HttpConnection conn = req.connection(); @@ -1931,7 +1932,7 @@ public class Http2ServerTest extends Http2TestBase { int id = request.nextStreamId(); encoder.writeHeaders(request.context, id, GET("/"), 0, true, request.context.newPromise()); request.context.flush(); - abc.setHandler(ar -> { + abc.future().setHandler(ar -> { encoder.writeGoAway(request.context, id, 3, Unpooled.EMPTY_BUFFER, request.context.newPromise()); request.context.flush(); }); @@ -2602,20 +2603,23 @@ public class Http2ServerTest extends Http2TestBase { setUseAlpn(false). setSsl(false). setInitialSettings(new io.vertx.core.http.Http2Settings().setMaxConcurrentStreams(10000))); - doRequest(method, expected, conn -> clientConnectionCount.incrementAndGet(), - Future.future().setHandler(onSuccess(resp -> { - assertEquals(HttpVersion.HTTP_2, resp.version()); - // assertEquals(20000, req.connection().remoteSettings().getMaxConcurrentStreams()); - assertEquals(1, serverConnectionCount.get()); - assertEquals(1, clientConnectionCount.get()); - doRequest(method, expected, null, Future.future().setHandler(onSuccess(resp2 -> { - testComplete(); - }))); - }))); + Promise p1 = Promise.promise(); + p1.future().setHandler(onSuccess(resp -> { + assertEquals(HttpVersion.HTTP_2, resp.version()); + // assertEquals(20000, req.connection().remoteSettings().getMaxConcurrentStreams()); + assertEquals(1, serverConnectionCount.get()); + assertEquals(1, clientConnectionCount.get()); + Promise p2 = Promise.promise(); + p2.future().setHandler(onSuccess(resp2 -> { + testComplete(); + })); + doRequest(method, expected, null, p2); + })); + doRequest(method, expected, conn -> clientConnectionCount.incrementAndGet(), p1); await(); } - private void doRequest(HttpMethod method, Buffer expected, Handler connHandler, Future fut) { + private void doRequest(HttpMethod method, Buffer expected, Handler connHandler, Promise fut) { HttpClientRequest req = client.request(method, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", onSuccess(resp -> { assertEquals(HttpVersion.HTTP_2, resp.version()); // assertEquals(20000, req.connection().remoteSettings().getMaxConcurrentStreams()); diff --git a/src/test/java/io/vertx/core/http/HttpServerCloseHookTest.java b/src/test/java/io/vertx/core/http/HttpServerCloseHookTest.java index 224837be9..e771f11e0 100644 --- a/src/test/java/io/vertx/core/http/HttpServerCloseHookTest.java +++ b/src/test/java/io/vertx/core/http/HttpServerCloseHookTest.java @@ -12,7 +12,7 @@ package io.vertx.core.http; import io.vertx.core.AbstractVerticle; -import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.PfxOptions; import io.vertx.test.core.VertxTestBase; @@ -34,7 +34,7 @@ public class HttpServerCloseHookTest extends VertxTestBase { private static class TestVerticle extends AbstractVerticle { @Override - public void start(Future startFuture) { + public void start(Promise startFuture) { HttpServerOptions invalidOptions = new HttpServerOptions() .setSsl(true) .setPfxTrustOptions(new PfxOptions().setValue(Buffer.buffer("boom"))); diff --git a/src/test/java/io/vertx/core/http/HttpTest.java b/src/test/java/io/vertx/core/http/HttpTest.java index 8d1c11e02..6470a696c 100644 --- a/src/test/java/io/vertx/core/http/HttpTest.java +++ b/src/test/java/io/vertx/core/http/HttpTest.java @@ -2137,13 +2137,13 @@ public abstract class HttpTest extends HttpTestBase { await(); } - private void pausingServer(Consumer> consumer) { - Future resumeFuture = Future.future(); + private void pausingServer(Consumer> consumer) { + Promise resumeFuture = Promise.promise(); server.requestHandler(req -> { req.response().setChunked(true); req.pause(); Context ctx = vertx.getOrCreateContext(); - resumeFuture.setHandler(v1 -> { + resumeFuture.future().setHandler(v1 -> { ctx.runOnContext(v2 -> { req.resume(); }); @@ -2170,7 +2170,7 @@ public abstract class HttpTest extends HttpTestBase { private void drainingServer(Consumer> consumer) { - Future resumeFuture = Future.future(); + Promise resumeFuture = Promise.promise(); server.requestHandler(req -> { req.response().setChunked(true); @@ -2194,7 +2194,7 @@ public abstract class HttpTest extends HttpTestBase { }); }); - server.listen(testAddress, onSuccess(s -> consumer.accept(resumeFuture))); + server.listen(testAddress, onSuccess(s -> consumer.accept(resumeFuture.future()))); } @Test @@ -4014,14 +4014,14 @@ public abstract class HttpTest extends HttpTestBase { }); startServer(server2); client.redirectHandler(resp -> { - Future fut = Future.future(); + Promise fut = Promise.promise(); vertx.setTimer(25, id -> { HttpClientRequest req = client.getAbs(scheme + "://localhost:" + port + "/custom"); req.putHeader("foo", "foo_another"); req.setHost("localhost:" + port); fut.complete(req); }); - return fut; + return fut.future(); }); client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", onSuccess(resp -> { assertEquals(scheme + "://localhost:" + port + "/custom", resp.request().absoluteURI()); diff --git a/src/test/java/io/vertx/core/http/WebSocketTest.java b/src/test/java/io/vertx/core/http/WebSocketTest.java index fd933d013..8eeb65058 100644 --- a/src/test/java/io/vertx/core/http/WebSocketTest.java +++ b/src/test/java/io/vertx/core/http/WebSocketTest.java @@ -22,6 +22,7 @@ import io.vertx.core.Context; import io.vertx.core.DeploymentOptions; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; @@ -1285,7 +1286,7 @@ public class WebSocketTest extends VertxTestBase { public void testAsyncAccept() { AtomicBoolean resolved = new AtomicBoolean(); server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT)).websocketHandler(ws -> { - Future fut = Future.future(); + Promise fut = Promise.promise(); ws.setHandshake(fut); try { ws.accept(); @@ -1316,11 +1317,11 @@ public class WebSocketTest extends VertxTestBase { @Test public void testCloseAsyncPending() { server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT)).websocketHandler(ws -> { - Future fut = Future.future(); - ws.setHandshake(fut); + Promise promise = Promise.promise(); + ws.setHandshake(promise); ws.close(); - assertTrue(fut.isComplete()); - assertEquals(101, (int)fut.result()); + assertTrue(promise.future().isComplete()); + assertEquals(101, (int)promise.future().result()); }); server.listen(onSuccess(s -> { client.webSocket(DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/some/path", onSuccess(ws -> { @@ -2877,7 +2878,7 @@ public class WebSocketTest extends VertxTestBase { @Test public void testDrainServerWebSocket() { - Future resume = Future.future(); + Promise resume = Promise.promise(); server = vertx.createHttpServer() .websocketHandler(ws -> { fillQueue(ws, v1 -> { @@ -2889,7 +2890,7 @@ public class WebSocketTest extends VertxTestBase { }).listen(DEFAULT_HTTP_PORT, onSuccess(v1 -> { client.webSocket(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/someuri", onSuccess(ws -> { ws.pause(); - resume.setHandler(onSuccess(v2 -> { + resume.future().setHandler(onSuccess(v2 -> { ws.resume(); })); })); @@ -2899,11 +2900,11 @@ public class WebSocketTest extends VertxTestBase { @Test public void testDrainClientWebSocket() { - Future resume = Future.future(); + Promise resume = Promise.promise(); server = vertx.createHttpServer() .websocketHandler(ws -> { ws.pause(); - resume.setHandler(onSuccess(v2 -> { + resume.future().setHandler(onSuccess(v2 -> { ws.resume(); })); }).listen(DEFAULT_HTTP_PORT, onSuccess(v1 -> { diff --git a/src/test/java/io/vertx/core/net/ConnectionPoolTest.java b/src/test/java/io/vertx/core/net/ConnectionPoolTest.java index 133893169..47a9fa7f8 100644 --- a/src/test/java/io/vertx/core/net/ConnectionPoolTest.java +++ b/src/test/java/io/vertx/core/net/ConnectionPoolTest.java @@ -567,7 +567,10 @@ public class ConnectionPoolTest extends VertxTestBase { @Override public void connect(ConnectionListener listener, ContextInternal context, Handler>> handler) { int i = ThreadLocalRandom.current().nextInt(100); - FakeConnection conn = new FakeConnection(context, listener, Future.>future().setHandler(handler)); + Promise> promise = Promise.promise(); + Future> future = promise.future(); + future.setHandler(handler); + FakeConnection conn = new FakeConnection(context, listener, promise); if (i < 10) { conn.fail(new Exception("Could not connect")); } else { @@ -844,13 +847,13 @@ public class ConnectionPoolTest extends VertxTestBase { private final ContextInternal context; private final ConnectionListener listener; - private final Future> future; + private final Promise> future; private long inflight; private long concurrency = 1; private int status = DISCONNECTED; - FakeConnection(ContextInternal context, ConnectionListener listener, Future> future) { + FakeConnection(ContextInternal context, ConnectionListener listener, Promise> future) { this.context = context; this.listener = listener; this.future = future; @@ -955,7 +958,9 @@ public class ConnectionPoolTest extends VertxTestBase { @Override public void connect(ConnectionListener listener, ContextInternal context, Handler>> handler) { - pendingRequests.add(new FakeConnection(context, listener, Future.>future().setHandler(handler))); + Promise> promise = Promise.promise(); + promise.future().setHandler(handler); + pendingRequests.add(new FakeConnection(context, listener, promise)); } } } diff --git a/src/test/java/io/vertx/core/net/NetTest.java b/src/test/java/io/vertx/core/net/NetTest.java index 113ec0701..7a3770e35 100755 --- a/src/test/java/io/vertx/core/net/NetTest.java +++ b/src/test/java/io/vertx/core/net/NetTest.java @@ -3091,7 +3091,7 @@ public class NetTest extends VertxTestBase { String expected = TestUtils.randomAlphaString(2000); vertx.deployVerticle(new AbstractVerticle() { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { NetServer server = vertx.createNetServer(); server.connectHandler(so -> { Buffer received = Buffer.buffer(); diff --git a/src/test/java/io/vertx/core/shareddata/AsyncMapTest.java b/src/test/java/io/vertx/core/shareddata/AsyncMapTest.java index 53906e753..2b664b729 100644 --- a/src/test/java/io/vertx/core/shareddata/AsyncMapTest.java +++ b/src/test/java/io/vertx/core/shareddata/AsyncMapTest.java @@ -13,6 +13,7 @@ package io.vertx.core.shareddata; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonArray; @@ -684,11 +685,11 @@ public abstract class AsyncMapTest extends VertxTestBase { protected void loadData(Map map, BiConsumer> test) { List futures = new ArrayList<>(map.size()); map.forEach((key, value) -> { - Future future = Future.future(); + Promise future = Promise.promise(); getVertx().sharedData().getAsyncMap("foo", onSuccess(asyncMap -> { asyncMap.put(key, value, future); })); - futures.add(future); + futures.add(future.future()); }); CompositeFuture.all(futures).setHandler(onSuccess(cf -> { Vertx v = getVertx(); diff --git a/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java b/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java index 88565f284..556bb383e 100644 --- a/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java +++ b/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java @@ -861,7 +861,7 @@ public class MetricsTest extends VertxTestBase { assertThat(metrics.getPoolSize(), is(getOptions().getInternalBlockingPoolSize())); assertThat(metrics.numberOfIdleThreads(), is(getOptions().getWorkerPoolSize())); - Handler> job = getSomeDumbTask(); + Handler> job = getSomeDumbTask(); AtomicInteger counter = new AtomicInteger(); AtomicBoolean hadWaitingQueue = new AtomicBoolean(); @@ -977,7 +977,7 @@ public class MetricsTest extends VertxTestBase { CountDownLatch latch = new CountDownLatch(1); Verticle worker = new AbstractVerticle() { @Override - public void start(Future done) throws Exception { + public void start(Promise done) throws Exception { vertx.eventBus().localConsumer("message", d -> { msg.incrementAndGet(); try { @@ -1043,7 +1043,7 @@ public class MetricsTest extends VertxTestBase { assertThat(metrics.getPoolSize(), is(10)); assertThat(metrics.numberOfIdleThreads(), is(10)); - Handler> job = getSomeDumbTask(); + Handler> job = getSomeDumbTask(); AtomicInteger counter = new AtomicInteger(); AtomicBoolean hadWaitingQueue = new AtomicBoolean(); @@ -1106,7 +1106,7 @@ public class MetricsTest extends VertxTestBase { assertTrue(metrics2.isClosed()); } - private Handler> getSomeDumbTask() { + private Handler> getSomeDumbTask() { return (future) -> { try { Thread.sleep(50); diff --git a/src/test/java/io/vertx/core/streams/WriteStreamTest.java b/src/test/java/io/vertx/core/streams/WriteStreamTest.java index 5233231f2..53b16a744 100644 --- a/src/test/java/io/vertx/core/streams/WriteStreamTest.java +++ b/src/test/java/io/vertx/core/streams/WriteStreamTest.java @@ -14,6 +14,7 @@ import io.vertx.codegen.annotations.Nullable; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.test.core.AsyncTestBase; import org.junit.Test; @@ -38,21 +39,21 @@ public class WriteStreamTest extends AsyncTestBase { static class EndWithItemStreamAsync extends StreamBase { AtomicInteger writeCount = new AtomicInteger(); - Future writeFut = Future.future(); + Promise writeFut = Promise.promise(); AtomicInteger endCount = new AtomicInteger(); - Future endFut = Future.future(); + Promise endFut = Promise.promise(); AtomicInteger resolvedCount = new AtomicInteger(); - Future resolvedFut = Future.future(); + Promise resolvedFut = Promise.promise(); @Override public StreamBase write(Object data, Handler> handler) { writeCount.incrementAndGet(); - writeFut.setHandler(handler); + writeFut.future().setHandler(handler); return this; } @Override public void end(Handler> handler) { endCount.incrementAndGet(); - endFut.setHandler(handler); + endFut.future().setHandler(handler); } public void end(Object item) { end(item, ar -> { @@ -80,8 +81,8 @@ public class WriteStreamTest extends AsyncTestBase { assertEquals(1, src.writeCount.get()); assertEquals(1, src.endCount.get()); assertEquals(1, src.resolvedCount.get()); - assertTrue(src.resolvedFut.succeeded()); - assertNull(src.resolvedFut.result()); + assertTrue(src.resolvedFut.future().succeeded()); + assertNull(src.resolvedFut.future().result()); src = new EndWithItemStreamAsync(); src.end(item); @@ -89,8 +90,8 @@ public class WriteStreamTest extends AsyncTestBase { assertEquals(1, src.writeCount.get()); assertEquals(0, src.endCount.get()); assertEquals(1, src.resolvedCount.get()); - assertTrue(src.resolvedFut.failed()); - assertSame(cause, src.resolvedFut.cause()); + assertTrue(src.resolvedFut.future().failed()); + assertSame(cause, src.resolvedFut.future().cause()); src = new EndWithItemStreamAsync(); src.end(item); @@ -99,8 +100,8 @@ public class WriteStreamTest extends AsyncTestBase { assertEquals(1, src.writeCount.get()); assertEquals(1, src.endCount.get()); assertEquals(1, src.resolvedCount.get()); - assertTrue(src.resolvedFut.failed()); - assertSame(cause, src.resolvedFut.cause()); + assertTrue(src.resolvedFut.future().failed()); + assertSame(cause, src.resolvedFut.future().cause()); } static class EndStreamSync extends StreamBase { diff --git a/src/test/java/io/vertx/test/verticles/FaultToleranceVerticle.java b/src/test/java/io/vertx/test/verticles/FaultToleranceVerticle.java index cb2e8226e..1414834ab 100644 --- a/src/test/java/io/vertx/test/verticles/FaultToleranceVerticle.java +++ b/src/test/java/io/vertx/test/verticles/FaultToleranceVerticle.java @@ -14,6 +14,7 @@ package io.vertx.test.verticles; import io.vertx.core.AbstractVerticle; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.ReplyException; import io.vertx.core.eventbus.ReplyFailure; @@ -41,12 +42,12 @@ public class FaultToleranceVerticle extends AbstractVerticle { numAddresses = config.getInteger("addressesCount"); List registrationFutures = new ArrayList<>(numAddresses); for (int i = 0; i < numAddresses; i++) { - Future registrationFuture = Future.future(); - registrationFutures.add(registrationFuture); + Promise registrationFuture = Promise.promise(); + registrationFutures.add(registrationFuture.future()); vertx.eventBus().consumer(createAddress(id, i), msg -> msg.reply("pong")).completionHandler(registrationFuture); } - Future registrationFuture = Future.future(); - registrationFutures.add(registrationFuture); + Promise registrationFuture = Promise.promise(); + registrationFutures.add(registrationFuture.future()); vertx.eventBus().consumer("ping", this::ping).completionHandler(registrationFuture); CompositeFuture.all(registrationFutures).setHandler(ar -> { if (ar.succeeded()) { diff --git a/src/test/java/io/vertx/test/verticles/SimpleServer.java b/src/test/java/io/vertx/test/verticles/SimpleServer.java index b0f61ce79..24eff2735 100644 --- a/src/test/java/io/vertx/test/verticles/SimpleServer.java +++ b/src/test/java/io/vertx/test/verticles/SimpleServer.java @@ -12,7 +12,7 @@ package io.vertx.test.verticles; import io.vertx.core.AbstractVerticle; -import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; @@ -22,7 +22,7 @@ import io.vertx.core.http.HttpServerOptions; public class SimpleServer extends AbstractVerticle { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080)); server.requestHandler(req -> req.response().end()); server.listen(res -> { diff --git a/src/test/java/io/vertx/test/verticles/TestVerticle2.java b/src/test/java/io/vertx/test/verticles/TestVerticle2.java index e4402b09e..d61649f5a 100644 --- a/src/test/java/io/vertx/test/verticles/TestVerticle2.java +++ b/src/test/java/io/vertx/test/verticles/TestVerticle2.java @@ -13,7 +13,7 @@ package io.vertx.test.verticles; import io.vertx.core.AbstractVerticle; import io.vertx.core.Context; -import io.vertx.core.Future; +import io.vertx.core.Promise; import java.util.HashSet; import java.util.Set; @@ -38,9 +38,9 @@ public class TestVerticle2 extends AbstractVerticle { } @Override - public void stop(Future stopFuture) throws Exception { + public void stop(Promise stopPromise) throws Exception { vertx.eventBus().send("tvstopped", "stopped", reply -> { - stopFuture.complete(null); + stopPromise.complete(null); }); } } diff --git a/src/test/java/io/vertx/test/verticles/sourceverticle/SourceVerticle.java b/src/test/java/io/vertx/test/verticles/sourceverticle/SourceVerticle.java index 094ed21da..2175e5d65 100644 --- a/src/test/java/io/vertx/test/verticles/sourceverticle/SourceVerticle.java +++ b/src/test/java/io/vertx/test/verticles/sourceverticle/SourceVerticle.java @@ -13,7 +13,7 @@ package io.vertx.test.verticles.sourceverticle; import io.vertx.core.AbstractVerticle; import io.vertx.core.DeploymentOptions; -import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.test.verticles.sourceverticle.somepackage.OtherSourceVerticle; /** @@ -23,7 +23,7 @@ public class SourceVerticle extends AbstractVerticle { @Override - public void start(Future startFuture) throws Exception { + public void start(Promise startFuture) throws Exception { vertx.deployVerticle("java:" + OtherSourceVerticle.class.getName().replace('.', '/') + ".java", new DeploymentOptions(), ar -> { if (ar.succeeded()) { startFuture.complete((Void) null);