This commit is contained in:
Julien Viet
2019-05-26 21:03:40 +02:00
parent d4e4c7ae0f
commit 651da8a679
62 changed files with 836 additions and 779 deletions

View File

@@ -355,12 +355,8 @@ In this example, 3 operations are chained:
When these 3 steps are successful, the final future (`startFuture`) is succeeded. However, if one
of the steps fails, the final future is failed.
This example uses:
* {@link io.vertx.core.Future#compose(java.util.function.Function)}: when the current future completes,
This example uses {@link io.vertx.core.Future#compose(java.util.function.Function)}: when the current future completes,
run the given function, that returns a future. When this returned future completes, it completes the composition.
* {@link io.vertx.core.Future#compose(io.vertx.core.Handler,io.vertx.core.Future)}: when the current future
completes, run the given handler that completes the given `future` (next).
In this second case, the {@link io.vertx.core.Handler} should complete the `next` future to report its success or
failure.

View File

@@ -71,10 +71,10 @@ public class CoreExamples {
}
public void example7(Vertx vertx) {
vertx.executeBlocking(future -> {
vertx.executeBlocking(promise -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
promise.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});
@@ -82,10 +82,10 @@ public class CoreExamples {
public void workerExecutor1(Vertx vertx) {
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(future -> {
executor.executeBlocking(promise -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
promise.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});
@@ -116,11 +116,9 @@ public class CoreExamples {
}
public void exampleFutureAll1(HttpServer httpServer, NetServer netServer) {
Future<HttpServer> httpServerFuture = Future.future();
httpServer.listen(httpServerFuture);
Future<HttpServer> httpServerFuture = Future.future(promise -> httpServer.listen(promise));
Future<NetServer> netServerFuture = Future.future();
netServer.listen(netServerFuture);
Future<NetServer> netServerFuture = Future.future(promise -> netServer.listen(promise));
CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
if (ar.succeeded()) {
@@ -166,22 +164,18 @@ public class CoreExamples {
public void exampleFuture6(Vertx vertx) {
FileSystem fs = vertx.fileSystem();
Future<Void> startFuture = Future.future();
Future<Void> fut1 = Future.future();
fs.createFile("/foo", fut1);
Future<Void> fut1 = Future.future(promise -> fs.createFile("/foo", promise));
fut1.compose(v -> {
Future<Void> startFuture = fut1
.compose(v -> {
// When the file is created (fut1), execute this:
Future<Void> fut2 = Future.future();
fs.writeFile("/foo", Buffer.buffer(), fut2);
return fut2;
}).compose(v -> {
// When the file is written (fut2), execute this:
fs.move("/foo", "/bar", startFuture);
},
// mark startFuture it as failed if any step fails.
startFuture);
return Future.<Void>future(promise -> fs.writeFile("/foo", Buffer.buffer(), promise));
})
.compose(v -> {
// When the file is written (fut2), execute this:
return Future.future(promise -> fs.move("/foo", "/bar", promise));
});
}
public void example7_1(Vertx vertx) {
@@ -200,9 +194,9 @@ public class CoreExamples {
public void multiThreadedWorkerVerticleAlternative2(Vertx vertx, String someresult) {
vertx.eventBus().consumer("foo", msg -> {
vertx.executeBlocking(fut -> {
vertx.executeBlocking(promise -> {
// Invoke blocking code with received message data
fut.complete(someresult);
promise.complete(someresult);
}, false, ar -> { // ordered == false
// Handle result, e.g. reply to the message
});

View File

@@ -15,6 +15,7 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
@@ -739,16 +740,16 @@ public class HTTPExamples {
public void exampleAsynchronousHandshake(HttpServer server) {
server.websocketHandler(websocket -> {
Future<Integer> fut = Future.future();
websocket.setHandshake(fut);
Promise<Integer> promise = Promise.promise();
websocket.setHandshake(promise);
authenticate(websocket, ar -> {
if (ar.succeeded()) {
// Terminate the handshake with the status code 101 (Switching Protocol)
// Reject the handshake with 401 (Unauthorized)
fut.complete(ar.succeeded() ? 101 : 401);
promise.complete(ar.succeeded() ? 101 : 401);
} else {
// Will send a 500 error
fut.fail(ar.cause());
promise.fail(ar.cause());
}
});
});

View File

@@ -21,10 +21,10 @@ import java.util.List;
* <p>
* Instead of implementing {@link io.vertx.core.Verticle} directly, it is often simpler to just extend this class.
* <p>
* In the simplest case, just override the {@link #start} method. If you have verticle clean-up to do you can
* optionally override the {@link #stop} method too.
* In the simplest case, just override the {@link #start(Promise)} method. If you have verticle clean-up to do you can
* optionally override the {@link #stop(Promise)} method too.
* <p>If your verticle does extra start-up or clean-up that takes some time (e.g. it deploys other verticles) then
* you should override the asynchronous {@link #start(Future) start} and {@link #stop(Future) stop} methods.
* you should override the asynchronous {@link #start(Promise) start} and {@link #stop(Promise) stop} methods.
* <p>
* This class also maintains references to the {@link io.vertx.core.Vertx} and {@link io.vertx.core.Context}
* instances of the verticle for easy access.<p>
@@ -102,7 +102,7 @@ public abstract class AbstractVerticle implements Verticle {
* @throws Exception
*/
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
start();
startFuture.complete();
}
@@ -116,7 +116,7 @@ public abstract class AbstractVerticle implements Verticle {
* @throws Exception
*/
@Override
public void stop(Future<Void> stopFuture) throws Exception {
public void stop(Promise<Void> stopFuture) throws Exception {
stop();
stopFuture.complete();
}

View File

@@ -181,20 +181,6 @@ public interface CompositeFuture extends Future<CompositeFuture> {
@Override
CompositeFuture setHandler(Handler<AsyncResult<CompositeFuture>> handler);
/**
* Set this instance as result. Any handler will be called, if there is one, and the future will be marked as completed.
*/
@Override
void complete();
/**
* Try to set this instance as result. When it happens, any handler will be called, if there is one, and the future will be marked as completed.
*
* @return false when the future is already completed
*/
@Override
boolean tryComplete();
/**
* Returns a cause of a wrapped future
*

View File

@@ -108,7 +108,7 @@ public interface Context {
* (e.g. on the original event loop of the caller).
* <p>
* A {@code Future} instance is passed into {@code blockingCodeHandler}. When the blocking code successfully completes,
* the handler should call the {@link Future#complete} or {@link Future#complete(Object)} method, or the {@link Future#fail}
* the handler should call the {@link Promise#complete} or {@link Promise#complete(Object)} method, or the {@link Promise#fail}
* method if it failed.
* <p>
* The blocking code should block for a reasonable amount of time (i.e no more than a few seconds). Long blocking operations
@@ -127,7 +127,7 @@ public interface Context {
* guarantees
* @param <T> the type of the result
*/
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<@Nullable T>> resultHandler);
<T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<@Nullable T>> resultHandler);
/**
* Invoke {@link #executeBlocking(Handler, boolean, Handler)} with order = true.
@@ -135,7 +135,7 @@ public interface Context {
* @param resultHandler handler that will be called when the blocking code is complete
* @param <T> the type of the result
*/
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<@Nullable T>> resultHandler);
<T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, Handler<AsyncResult<@Nullable T>> resultHandler);
/**
* If the context is associated with a Verticle deployment, this returns the deployment ID of that deployment.

View File

@@ -11,7 +11,6 @@
package io.vertx.core;
import io.vertx.codegen.annotations.CacheReturn;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
@@ -26,7 +25,7 @@ import java.util.function.Function;
* @author <a href="http://tfox.org">Tim Fox</a>
*/
@VertxGen
public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
public interface Future<T> extends AsyncResult<T> {
/**
* Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned.
@@ -35,20 +34,10 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
* @param <T> the result type
* @return the future.
*/
static <T> Future<T> future(Handler<Future<T>> handler) {
Future<T> fut = future();
handler.handle(fut);
return fut;
}
/**
* Create a future that hasn't completed yet
*
* @param <T> the result type
* @return the future
*/
static <T> Future<T> future() {
return factory.future();
static <T> Future<T> future(Handler<Promise<T>> handler) {
Promise<T> promise = Promise.promise();
handler.handle(promise);
return promise.future();
}
/**
@@ -121,63 +110,6 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
*/
Handler<AsyncResult<T>> getHandler();
/**
* Set the result. Any handler will be called, if there is one, and the future will be marked as completed.
*
* @param result the result
*/
void complete(T result);
/**
* Set a null result. Any handler will be called, if there is one, and the future will be marked as completed.
*/
void complete();
/**
* Set the failure. Any handler will be called, if there is one, and the future will be marked as completed.
*
* @param cause the failure cause
*/
void fail(Throwable cause);
/**
* Try to set the failure. When it happens, any handler will be called, if there is one, and the future will be marked as completed.
*
* @param failureMessage the failure message
*/
void fail(String failureMessage);
/**
* Set the failure. Any handler will be called, if there is one, and the future will be marked as completed.
*
* @param result the result
* @return false when the future is already completed
*/
boolean tryComplete(T result);
/**
* Try to set the result. When it happens, any handler will be called, if there is one, and the future will be marked as completed.
*
* @return false when the future is already completed
*/
boolean tryComplete();
/**
* Try to set the failure. When it happens, any handler will be called, if there is one, and the future will be marked as completed.
*
* @param cause the failure cause
* @return false when the future is already completed
*/
boolean tryFail(Throwable cause);
/**
* Try to set the failure. When it happens, any handler will be called, if there is one, and the future will be marked as completed.
*
* @param failureMessage the failure message
* @return false when the future is already completed
*/
boolean tryFail(String failureMessage);
/**
* The result of the operation. This will be null if the operation failed.
*
@@ -210,39 +142,6 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
@Override
boolean failed();
/**
* Compose this future with a provided {@code next} future.<p>
*
* When this (the one on which {@code compose} is called) future succeeds, the {@code handler} will be called with
* the completed value, this handler should complete the next future.<p>
*
* If the {@code handler} throws an exception, the returned future will be failed with this exception.<p>
*
* When this future fails, the failure will be propagated to the {@code next} future and the {@code handler}
* will not be called.
*
* @param handler the handler
* @param next the next future
* @return the next future, used for chaining
*/
default <U> Future<U> compose(Handler<T> handler, Future<U> next) {
setHandler(ar -> {
if (ar.succeeded()) {
try {
handler.handle(ar.result());
} catch (Throwable err) {
if (next.isComplete()) {
throw err;
}
next.fail(err);
}
} else {
next.fail(ar.cause());
}
});
return next;
}
/**
* Compose this future with a {@code mapper} function.<p>
*
@@ -262,7 +161,7 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
if (mapper == null) {
throw new NullPointerException();
}
Future<U> ret = Future.future();
Promise<U> ret = Promise.promise();
setHandler(ar -> {
if (ar.succeeded()) {
Future<U> apply;
@@ -277,9 +176,8 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
ret.fail(ar.cause());
}
});
return ret;
return ret.future();
}
/**
* Apply a {@code mapper} function on this future.<p>
*
@@ -298,7 +196,7 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
if (mapper == null) {
throw new NullPointerException();
}
Future<U> ret = Future.future();
Promise<U> ret = Promise.promise();
setHandler(ar -> {
if (ar.succeeded()) {
U mapped;
@@ -313,7 +211,7 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
ret.fail(ar.cause());
}
});
return ret;
return ret.future();
}
/**
@@ -327,7 +225,7 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
* @return the mapped future
*/
default <V> Future<V> map(V value) {
Future<V> ret = Future.future();
Promise<V> ret = Promise.promise();
setHandler(ar -> {
if (ar.succeeded()) {
ret.complete(value);
@@ -335,7 +233,7 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
ret.fail(ar.cause());
}
});
return ret;
return ret.future();
}
/**
@@ -354,15 +252,6 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
return (Future<V>) AsyncResult.super.mapEmpty();
}
/**
* Succeed or fail this future with the {@link AsyncResult} event.
*
* @param asyncResult the async result to handle
*/
@GenIgnore
@Override
void handle(AsyncResult<T> asyncResult);
/**
* Handles a failure of this Future by returning the result of another Future.
* If the mapper fails, then the returned future will be failed with this failure.
@@ -374,7 +263,7 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
if (mapper == null) {
throw new NullPointerException();
}
Future<T> ret = Future.future();
Promise<T> ret = Promise.promise();
setHandler(ar -> {
if (ar.succeeded()) {
ret.complete(result());
@@ -389,9 +278,8 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
mapped.setHandler(ret);
}
});
return ret;
return ret.future();
}
/**
* Apply a {@code mapper} function on this future.<p>
*
@@ -410,7 +298,7 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
if (mapper == null) {
throw new NullPointerException();
}
Future<T> ret = Future.future();
Promise<T> ret = Promise.promise();
setHandler(ar -> {
if (ar.succeeded()) {
ret.complete(result());
@@ -425,7 +313,7 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
ret.complete(value);
}
});
return ret;
return ret.future();
}
/**
@@ -439,7 +327,7 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
* @return the mapped future
*/
default Future<T> otherwise(T value) {
Future<T> ret = Future.future();
Promise<T> ret = Promise.promise();
setHandler(ar -> {
if (ar.succeeded()) {
ret.complete(result());
@@ -447,7 +335,7 @@ public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
ret.complete(value);
}
});
return ret;
return ret.future();
}
/**

View File

@@ -0,0 +1,166 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core;
import io.vertx.codegen.annotations.CacheReturn;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import static io.vertx.core.Future.factory;
/**
* Represents the writable side of an action that may, or may not, have occurred yet.
* <p>
* The {@link #future()} method returns the {@link Future} associated with a promise, the future
* can be used for getting notified of the promise completion and retrieve its value.
* <p>
* A promise extends {@code Handler<AsyncResult<T>>} so it can be used as a callback.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
@VertxGen
public interface Promise<T> extends Handler<AsyncResult<T>> {
/**
* Create a succeeded promise with a {@code null} result
*
* @param <T> the result type
* @return the promise
*/
static <T> Promise<T> succeededPromise() {
return factory.succeededPromise();
}
/**
* Created a succeeded promise with the specified {@code result}.
*
* @param result the result
* @param <T> the result type
* @return the promise
*/
static <T> Promise<T> succeededPromise(T result) {
return factory.succeededPromise(result);
}
/**
* Create a failed promise with the specified failure {@code cause}.
*
* @param cause the failure cause as a Throwable
* @param <T> the result type
* @return the promise
*/
static <T> Promise<T> failedPromise(Throwable cause) {
return factory.failedPromise(cause);
}
/**
* Create a failed promise with the specified {@code failureMessage}.
*
* @param failureMessage the failure message
* @param <T> the result type
* @return the promise
*/
static <T> Promise<T> failedPromise(String failureMessage) {
return factory.failurePromise(failureMessage);
}
/**
* Create a promise that hasn't completed yet
*
* @param <T> the result type
* @return the promise
*/
static <T> Promise<T> promise() {
return factory.promise();
}
/**
* Succeed or fail this promise with the {@link AsyncResult} event.
*
* @param asyncResult the async result to handle
*/
@GenIgnore
@Override
void handle(AsyncResult<T> asyncResult);
/**
* Set the result. Any handler will be called, if there is one, and the promise will be marked as completed.
* <p/>
* Any handler set on the associated promise will be called.
*
* @param result the result
* @throws IllegalStateException when the promise is already completed
*/
void complete(T result);
/**
* Calls {@code complete(null)}
*
* @throws IllegalStateException when the promise is already completed
*/
void complete();
/**
* Set the failure. Any handler will be called, if there is one, and the future will be marked as completed.
*
* @param cause the failure cause
* @throws IllegalStateException when the promise is already completed
*/
void fail(Throwable cause);
/**
* Calls {@link #fail(Throwable)} with the {@code message}.
*
* @param message the failure message
* @throws IllegalStateException when the promise is already completed
*/
void fail(String message);
/**
* Like {@link #complete(Object)} but returns {@code false} when the promise is already completed instead of throwing
* an {@link IllegalStateException}, it returns {@code true} otherwise.
*
* @param result the result
* @return {@code false} when the future is already completed
*/
boolean tryComplete(T result);
/**
* Calls {@code tryComplete(null)}.
*
* @return {@code false} when the future is already completed
*/
boolean tryComplete();
/**
* Like {@link #fail(Throwable)} but returns {@code false} when the promise is already completed instead of throwing
* an {@link IllegalStateException}, it returns {@code true} otherwise.
*
* @param cause the failure cause
* @return {@code false} when the future is already completed
*/
boolean tryFail(Throwable cause);
/**
* Calls {@link #fail(Throwable)} with the {@code message}.
*
* @param message the failure message
* @return false when the future is already completed
*/
boolean tryFail(String message);
/**
* @return the {@link Future} associated with this promise, it can be used to be aware of the promise completion
*/
@CacheReturn
Future<T> future();
}

View File

@@ -26,6 +26,7 @@ package io.vertx.core;
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
@SuppressWarnings( "deprecation" )
public interface Verticle {
/**
@@ -50,22 +51,22 @@ public interface Verticle {
* <p>
* Vert.x calls this method when deploying the instance. You do not call it yourself.
* <p>
* A future is passed into the method, and when deployment is complete the verticle should either call
* {@link io.vertx.core.Future#complete} or {@link io.vertx.core.Future#fail} the future.
* A promise is passed into the method, and when deployment is complete the verticle should either call
* {@link io.vertx.core.Promise#complete} or {@link io.vertx.core.Promise#fail} the future.
*
* @param startFuture the future
* @param startPromise the future
*/
void start(Future<Void> startFuture) throws Exception;
void start(Promise<Void> startPromise) throws Exception;
/**
* Stop the verticle instance.
* <p>
* Vert.x calls this method when un-deploying the instance. You do not call it yourself.
* <p>
* A future is passed into the method, and when un-deployment is complete the verticle should either call
* {@link io.vertx.core.Future#complete} or {@link io.vertx.core.Future#fail} the future.
* A promise is passed into the method, and when un-deployment is complete the verticle should either call
* {@link io.vertx.core.Promise#complete} or {@link io.vertx.core.Promise#fail} the future.
*
* @param stopFuture the future
* @param stopPromise the future
*/
void stop(Future<Void> stopFuture) throws Exception;
void stop(Promise<Void> stopPromise) throws Exception;
}

View File

@@ -503,7 +503,7 @@ public interface Vertx extends Measured {
* (e.g. on the original event loop of the caller).
* <p>
* A {@code Future} instance is passed into {@code blockingCodeHandler}. When the blocking code successfully completes,
* the handler should call the {@link Future#complete} or {@link Future#complete(Object)} method, or the {@link Future#fail}
* the handler should call the {@link Promise#complete} or {@link Promise#complete(Object)} method, or the {@link Promise#fail}
* method if it failed.
* <p>
* In the {@code blockingCodeHandler} the current context remains the original context and therefore any task
@@ -525,12 +525,12 @@ public interface Vertx extends Measured {
* guarantees
* @param <T> the type of the result
*/
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<@Nullable T>> resultHandler);
<T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<@Nullable T>> resultHandler);
/**
* Like {@link #executeBlocking(Handler, boolean, Handler)} called with ordered = true.
*/
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<@Nullable T>> resultHandler);
<T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, Handler<AsyncResult<@Nullable T>> resultHandler);
/**
* Return the Netty EventLoopGroup used by Vert.x

View File

@@ -35,7 +35,7 @@ public interface WorkerExecutor extends Measured {
* (i.e. on the original event loop of the caller).
* <p>
* A {@code Future} instance is passed into {@code blockingCodeHandler}. When the blocking code successfully completes,
* the handler should call the {@link Future#complete} or {@link Future#complete(Object)} method, or the {@link Future#fail}
* the handler should call the {@link Promise#complete} or {@link Promise#complete(Object)} method, or the {@link Promise#fail}
* method if it failed.
* <p>
* In the {@code blockingCodeHandler} the current context remains the original context and therefore any task
@@ -48,12 +48,12 @@ public interface WorkerExecutor extends Measured {
* guarantees
* @param <T> the type of the result
*/
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<@Nullable T>> resultHandler);
<T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<@Nullable T>> resultHandler);
/**
* Like {@link #executeBlocking(Handler, boolean, Handler)} called with ordered = true.
*/
default <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<@Nullable T>> resultHandler) {
default <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, Handler<AsyncResult<@Nullable T>> resultHandler) {
executeBlocking(blockingCodeHandler, true, resultHandler);
}

View File

@@ -236,17 +236,19 @@ public final class DnsClientImpl implements DnsClient {
private class Query<T> {
final DatagramDnsQuery msg;
final Future<List<T>> fut;
final Promise<List<T>> promise;
final String name;
final DnsRecordType[] types;
long timerID;
public Query(String name, DnsRecordType[] types, Handler<AsyncResult<List<T>>> handler) {
Promise<List<T>> promise = Promise.promise();
promise.future().setHandler(handler);
this.msg = new DatagramDnsQuery(null, dnsServer, ThreadLocalRandom.current().nextInt()).setRecursionDesired(options.isRecursionDesired());
for (DnsRecordType type: types) {
msg.addRecord(DnsSection.QUESTION, new DefaultDnsQuestion(name, type, DnsRecord.CLASS_IN));
}
this.fut = Future.<List<T>>future().setHandler(handler);
this.promise = promise;
this.types = types;
this.name = name;
}
@@ -256,7 +258,7 @@ public final class DnsClientImpl implements DnsClient {
if (timerID >= 0) {
vertx.cancelTimer(timerID);
}
fut.tryFail(cause);
promise.tryFail(cause);
}
void handle(DnsResponse msg) {
@@ -279,7 +281,7 @@ public final class DnsClientImpl implements DnsClient {
Collections.sort((List) records);
}
actualCtx.executeFromIO(v -> {
fut.tryComplete(records);
promise.tryComplete(records);
});
} else {
actualCtx.executeFromIO(new DnsException(code), this::fail);

View File

@@ -17,6 +17,7 @@ import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.*;
import io.vertx.core.impl.ContextInternal;
@@ -460,7 +461,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
message.setReplyAddress(replyAddress);
HandlerRegistration<T> registration = new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, src);
ReplyHandler<T> handler = new ReplyHandler<>(registration, timeout);
handler.result.setHandler(replyHandler);
handler.result.future().setHandler(replyHandler);
registration.handler(handler);
return handler;
} else {
@@ -470,13 +471,13 @@ public class EventBusImpl implements EventBus, MetricsProvider {
public class ReplyHandler<T> implements Handler<Message<T>> {
final Future<Message<T>> result;
final Promise<Message<T>> result;
final HandlerRegistration<T> registration;
final long timeoutID;
public Object trace;
ReplyHandler(HandlerRegistration<T> registration, long timeout) {
this.result = Future.future();
this.result = Promise.promise();
this.registration = registration;
this.timeoutID = vertx.setTimer(timeout, id -> {
fail(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + registration.address + ", repliedAddress: " + registration.repliedAddress));

View File

@@ -11,6 +11,22 @@
package io.vertx.core.file.impl;
import io.netty.buffer.ByteBuf;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.FileSystemException;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.streams.impl.InboundBuffer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
@@ -25,21 +41,6 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.FileSystemException;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.streams.impl.InboundBuffer;
/**
*
* This class is optimised for performance when used on the same event loop that is was passed to the handler with.
@@ -398,7 +399,7 @@ public class AsyncFileImpl implements AsyncFile {
private synchronized void doFlush(Handler<AsyncResult<Void>> handler) {
checkClosed();
context.executeBlockingInternal((Future<Void> fut) -> {
context.executeBlockingInternal((Promise<Void> fut) -> {
try {
ch.force(false);
fut.complete();

View File

@@ -12,8 +12,8 @@
package io.vertx.core.file.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.CopyOptions;
@@ -951,7 +951,7 @@ public class FileSystemImpl implements FileSystem {
};
}
protected abstract class BlockingAction<T> implements Handler<Future<T>> {
protected abstract class BlockingAction<T> implements Handler<Promise<T>> {
private final Handler<AsyncResult<T>> handler;
protected final ContextInternal context;
@@ -968,7 +968,7 @@ public class FileSystemImpl implements FileSystem {
}
@Override
public void handle(Future<T> fut) {
public void handle(Promise<T> fut) {
try {
T result = perform();
fut.complete(result);

View File

@@ -19,6 +19,7 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
@@ -165,7 +166,7 @@ public interface ServerWebSocket extends WebSocketBase {
*
* The provided future might be completed by the WebSocket itself, e.g calling the {@link #close()} method
* will try to accept the handshake and close the WebSocket afterward. Thus it is advised to try to complete
* the {@code future} with {@link Future#tryComplete} or {@link Future#tryFail}.
* the {@code future} with {@link Promise#tryComplete} or {@link Promise#tryFail}.
* <p>
* This method should be called from the WebSocket handler to explicitly set an asynchronous handshake.
* <p>
@@ -174,6 +175,13 @@ public interface ServerWebSocket extends WebSocketBase {
* @param future the future to complete with
* @throws IllegalStateException when the WebSocket has already an asynchronous result
*/
void setHandshake(Promise<Integer> future);
/**
* @deprecated instead use {@link #setHandshake(Promise)}
*/
@GenIgnore
@Deprecated
void setHandshake(Future<Integer> future);
/**

View File

@@ -34,6 +34,7 @@ import io.netty.handler.stream.ChunkedWriteHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import java.io.RandomAccessFile;
import java.net.SocketAddress;
@@ -56,7 +57,7 @@ class FileStreamChannel extends AbstractChannel {
private final VertxHttp2Stream stream;
FileStreamChannel(
Future<Long> result,
Promise<Long> result,
VertxHttp2Stream stream,
long offset,
long length) {

View File

@@ -48,7 +48,6 @@ import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static io.vertx.core.http.HttpHeaders.*;
@@ -194,7 +193,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
private final int id;
private final Http1xClientConnection conn;
private final ContextInternal context;
private final Future<HttpClientStream> fut;
private final Promise<HttpClientStream> promise;
private final InboundBuffer<Object> queue;
private HttpClientRequestImpl request;
private HttpClientResponseImpl response;
@@ -208,9 +207,11 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
StreamImpl(ContextInternal context, Http1xClientConnection conn, int id, Handler<AsyncResult<HttpClientStream>> handler) {
this.context = context;
this.conn = conn;
this.fut = Future.<HttpClientStream>future().setHandler(handler);
this.promise = Promise.promise();
this.id = id;
this.queue = new InboundBuffer<>(context, 5);
promise.future().setHandler(handler);
}
private void append(StreamImpl s) {
@@ -517,12 +518,12 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
void handleException(Throwable cause) {
HttpClientRequestImpl request;
HttpClientResponseImpl response;
Future<HttpClientStream> fut;
Promise<HttpClientStream> promise;
boolean requestEnded;
synchronized (conn) {
request = this.request;
response = this.response;
fut = this.fut;
promise = this.promise;
requestEnded = this.requestEnded;
}
if (request != null) {
@@ -535,7 +536,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
response.handleException(cause);
}
} else {
fut.tryFail(cause);
promise.tryFail(cause);
}
}
}
@@ -663,7 +664,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
checkLifecycle();
}
if (next != null) {
next.fut.complete(next);
next.promise.complete(next);
}
}
@@ -848,7 +849,7 @@ class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> impleme
}
requestInProgress = stream;
}
stream.context.dispatch(Future.succeededFuture(stream), stream.fut);
stream.context.dispatch(Future.succeededFuture(stream), stream.promise);
}
private void recycle() {

View File

@@ -24,6 +24,7 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.impl.ContextInternal;
@@ -204,7 +205,7 @@ public class Http2ServerConnection extends Http2ConnectionBase {
private final String uri;
private final String contentEncoding;
private Http2ServerResponseImpl response;
private final Future<HttpServerResponse> completionHandler;
private final Promise<HttpServerResponse> completionHandler;
public Push(Http2Stream stream,
ContextInternal context,
@@ -214,10 +215,12 @@ public class Http2ServerConnection extends Http2ConnectionBase {
boolean writable,
Handler<AsyncResult<HttpServerResponse>> completionHandler) {
super(Http2ServerConnection.this, context, stream, writable);
Promise<HttpServerResponse> promise = Promise.promise();
promise.future().setHandler(completionHandler);
this.method = method;
this.uri = uri;
this.contentEncoding = contentEncoding;
this.completionHandler = Future.<HttpServerResponse>future().setHandler(completionHandler);
this.completionHandler = promise;
}
@Override

View File

@@ -26,6 +26,7 @@ import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
@@ -614,8 +615,8 @@ public class Http2ServerResponseImpl implements HttpServerResponse {
}
checkSendHeaders(false);
Future<Long> result = Future.future();
result.setHandler(ar -> {
Promise<Long> result = Promise.promise();
result.future().setHandler(ar -> {
if (ar.succeeded()) {
bytesWritten += ar.result();
end();

View File

@@ -20,6 +20,7 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.pool.ConnectResult;
@@ -93,18 +94,19 @@ class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {
@Override
public void connect(ConnectionListener<HttpClientConnection> listener, ContextInternal context, Handler<AsyncResult<ConnectResult<HttpClientConnection>>> handler) {
Future<ConnectResult<HttpClientConnection>> future = Future.<ConnectResult<HttpClientConnection>>future().setHandler(handler);
Promise<ConnectResult<HttpClientConnection>> promise = Promise.promise();
promise.future().setHandler(handler);
try {
doConnect(listener, context, future);
doConnect(listener, context, promise);
} catch(Exception e) {
future.tryFail(e);
promise.tryFail(e);
}
}
private void doConnect(
ConnectionListener<HttpClientConnection> listener,
ContextInternal context,
Future<ConnectResult<HttpClientConnection>> future) {
Promise<ConnectResult<HttpClientConnection>> future) {
boolean domainSocket = server.path() != null;
boolean useAlpn = options.isUseAlpn();
@@ -200,7 +202,7 @@ class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {
boolean ssl,
ContextInternal context,
Channel ch, long weight,
Future<ConnectResult<HttpClientConnection>> future) {
Promise<ConnectResult<HttpClientConnection>> future) {
boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade();
VertxHandler<Http1xClientConnection> clientHandler = VertxHandler.create(context, chctx -> {
Http1xClientConnection conn = new Http1xClientConnection(listener, upgrade ? HttpVersion.HTTP_1_1 : version, client, endpointMetric, chctx, ssl, server, context, metrics);
@@ -229,7 +231,7 @@ class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {
private void http2Connected(ConnectionListener<HttpClientConnection> listener,
ContextInternal context,
Channel ch,
Future<ConnectResult<HttpClientConnection>> future) {
Promise<ConnectResult<HttpClientConnection>> future) {
try {
VertxHttp2ConnectionHandler<Http2ClientConnection> clientHandler = Http2ClientConnection.createHttp2ConnectionHandler(client, endpointMetric, listener, context, null, (conn, concurrency) -> {
future.complete(new ConnectResult<>(conn, concurrency, http2Weight));
@@ -241,7 +243,7 @@ class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {
}
}
private void connectFailed(Channel ch, ConnectionListener<HttpClientConnection> listener, Throwable t, Future<ConnectResult<HttpClientConnection>> future) {
private void connectFailed(Channel ch, ConnectionListener<HttpClientConnection> listener, Throwable t, Promise<ConnectResult<HttpClientConnection>> future) {
if (ch != null) {
try {
ch.close();

View File

@@ -53,7 +53,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
static final Logger log = LoggerFactory.getLogger(HttpClientRequestImpl.class);
private final VertxInternal vertx;
private final Future<HttpClientResponse> respFut;
private final Promise<HttpClientResponse> responsePromise;
private boolean chunked;
private String hostHeader;
private String rawMethod;
@@ -84,7 +84,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
this.chunked = false;
this.vertx = vertx;
this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY;
this.respFut = Future.future();
this.responsePromise = Promise.promise();
}
@Override
@@ -100,7 +100,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
}
handler.handle(t);
respFut.tryFail(t);
responsePromise.tryFail(t);
}
@Override
@@ -119,7 +119,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
if (handler != null) {
checkComplete();
}
respFut.setHandler(handler);
responsePromise.future().setHandler(handler);
return this;
}
@@ -388,7 +388,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
private void handleNextRequest(HttpClientRequestImpl next, long timeoutMs) {
next.handler(respFut.getHandler());
next.handler(responsePromise.future().getHandler());
next.exceptionHandler(exceptionHandler());
exceptionHandler(null);
next.pushHandler = pushHandler;
@@ -400,8 +400,9 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
if (headers != null && next.headers == null) {
next.headers().addAll(headers);
}
Future<Void> fut = Future.future();
fut.setHandler(ar -> {
Promise<Void> promise = Promise.promise();
Future<Void> future = promise.future();
future.setHandler(ar -> {
if (ar.succeeded()) {
if (timeoutMs > 0) {
next.setTimeout(timeoutMs);
@@ -412,19 +413,19 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
});
if (exceptionOccurred != null) {
fut.fail(exceptionOccurred);
promise.fail(exceptionOccurred);
}
else if (completed) {
fut.complete();
promise.complete();
} else {
exceptionHandler(err -> {
if (!fut.isComplete()) {
fut.fail(err);
if (!future.isComplete()) {
promise.fail(err);
}
});
completionHandler = v -> {
if (!fut.isComplete()) {
fut.complete();
if (!future.isComplete()) {
promise.complete();
}
};
}
@@ -451,7 +452,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
continueHandler.handle(null);
}
} else {
respFut.complete(resp);
responsePromise.complete(resp);
}
}
}
@@ -736,7 +737,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
}
private void checkResponseHandler() {
if (stream == null && !connecting && respFut.getHandler() == null) {
if (stream == null && !connecting && responsePromise.future().getHandler() == null) {
throw new IllegalStateException("You must set a response handler before connecting to the server");
}
}

View File

@@ -14,9 +14,9 @@ package io.vertx.core.http.impl;
import io.netty.handler.codec.http2.Http2Stream;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.net.SocketAddress;
@@ -30,7 +30,7 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase {
private final Http2ClientConnection.Http2ClientStream stream;
private final String rawMethod;
private final MultiMap headers;
private final Future<HttpClientResponse> respHandler;
private final Promise<HttpClientResponse> responsePromise;
public HttpClientRequestPushPromise(
Http2ClientConnection conn,
@@ -48,7 +48,7 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase {
this.stream = new Http2ClientConnection.Http2ClientStream(conn, conn.getContext(), this, stream, false);
this.rawMethod = rawMethod;
this.headers = headers;
this.respHandler = Future.future();
this.responsePromise = Promise.promise();
}
Http2ClientConnection.Http2ClientStream getStream() {
@@ -57,7 +57,7 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase {
@Override
protected void doHandleResponse(HttpClientResponseImpl resp, long timeoutMs) {
respHandler.complete(resp);
responsePromise.complete(resp);
}
@Override
@@ -71,7 +71,7 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase {
@Override
public synchronized HttpClientRequest handler(Handler<AsyncResult<HttpClientResponse>> handler) {
respHandler.setHandler(handler);
responsePromise.future().setHandler(handler);
return this;
}

View File

@@ -20,10 +20,10 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketFrame;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
@@ -54,7 +54,7 @@ public class ServerWebSocketImpl extends WebSocketImplBase<ServerWebSocketImpl>
private final MultiMap headers;
private HttpServerRequestImpl request;
private Integer status;
private Future<Integer> handshakeFuture;
private Promise<Integer> handshakePromise;
ServerWebSocketImpl(ContextInternal context,
Http1xServerConnection conn,
@@ -130,10 +130,10 @@ public class ServerWebSocketImpl extends WebSocketImplBase<ServerWebSocketImpl>
synchronized (conn) {
checkClosed();
if (status == null) {
if (handshakeFuture == null) {
if (handshakePromise == null) {
tryHandshake(101);
} else {
handshakeFuture.tryComplete(101);
handshakePromise.tryComplete(101);
}
}
}
@@ -195,8 +195,8 @@ public class ServerWebSocketImpl extends WebSocketImplBase<ServerWebSocketImpl>
Boolean tryHandshake(int sc) {
synchronized (conn) {
if (status == null && handshakeFuture == null) {
setHandshake(Future.succeededFuture(sc));
if (status == null && handshakePromise == null) {
setHandshake(Promise.succeededPromise(sc));
}
return status == null ? null : status == sc;
}
@@ -204,16 +204,21 @@ public class ServerWebSocketImpl extends WebSocketImplBase<ServerWebSocketImpl>
@Override
public void setHandshake(Future<Integer> future) {
if (future == null) {
setHandshake((Promise<Integer>) future);
}
@Override
public void setHandshake(Promise<Integer> promise) {
if (promise == null) {
throw new NullPointerException();
}
synchronized (conn) {
if (handshakeFuture != null) {
if (handshakePromise != null) {
throw new IllegalStateException();
}
handshakeFuture = future;
handshakePromise = promise;
}
future.setHandler(ar -> {
promise.future().setHandler(ar -> {
if (ar.succeeded()) {
handleHandshake(ar.result());
} else {

View File

@@ -21,6 +21,7 @@ import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.http.StreamResetException;
@@ -269,8 +270,8 @@ class VertxHttp2NetSocket<C extends Http2ConnectionBase> extends VertxHttp2Strea
long contentLength = Math.min(length, file.length() - offset);
Future<Long> result = Future.future();
result.setHandler(ar -> {
Promise<Long> result = Promise.promise();
result.future().setHandler(ar -> {
if (resultHandler != null) {
resultCtx.runOnContext(v -> {
resultHandler.handle(Future.succeededFuture());

View File

@@ -11,8 +11,8 @@
package io.vertx.core.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Starter;
import io.vertx.core.impl.launcher.VertxCommandLauncher;
@@ -108,7 +108,7 @@ abstract class AbstractContext implements ContextInternal {
}
@Override
public final <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
public final <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(blockingCodeHandler, true, resultHandler);
}

View File

@@ -15,6 +15,7 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.impl.logging.Logger;
import java.util.HashSet;
@@ -86,8 +87,8 @@ class CloseHooks {
AtomicInteger count = new AtomicInteger();
AtomicBoolean failed = new AtomicBoolean();
for (Closeable hook : copy) {
Future<Void> a = Future.future();
a.setHandler(ar -> {
Promise<Void> promise = Promise.promise();
promise.future().setHandler(ar -> {
if (ar.failed()) {
if (failed.compareAndSet(false, true)) {
// Only report one failure
@@ -101,10 +102,10 @@ class CloseHooks {
}
});
try {
hook.close(a);
hook.close(promise);
} catch (Throwable t) {
log.warn("Failed to run close hooks", t);
a.tryFail(t);
promise.tryFail(t);
}
}
} else {

View File

@@ -21,7 +21,7 @@ import java.util.function.Function;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class CompositeFutureImpl implements CompositeFuture, Handler<AsyncResult<CompositeFuture>> {
public class CompositeFutureImpl implements CompositeFuture {
private static final Handler<AsyncResult<CompositeFuture>> NO_HANDLER = c -> {};
@@ -216,66 +216,6 @@ public class CompositeFutureImpl implements CompositeFuture, Handler<AsyncResult
return completed && cause == null ? this : null;
}
@Override
public void complete() {
if (!tryComplete()) {
throw new IllegalStateException("Result is already complete: " + (this.cause == null ? "succeeded" : "failed"));
}
}
@Override
public void complete(CompositeFuture result) {
if (!tryComplete(result)) {
throw new IllegalStateException("Result is already complete: " + (this.cause == null ? "succeeded" : "failed"));
}
}
@Override
public void fail(Throwable cause) {
if (!tryFail(cause)) {
throw new IllegalStateException("Result is already complete: " + (this.cause == null ? "succeeded" : "failed"));
}
}
@Override
public void fail(String failureMessage) {
if (!tryFail(failureMessage)) {
throw new IllegalStateException("Result is already complete: " + (this.cause == null ? "succeeded" : "failed"));
}
}
@Override
public boolean tryComplete(CompositeFuture result) {
Handler<AsyncResult<CompositeFuture>> handler = setCompleted(null);
if (handler != null) {
handler.handle(this);
return true;
} else {
return false;
}
}
@Override
public boolean tryComplete() {
return tryComplete(this);
}
@Override
public boolean tryFail(Throwable cause) {
Handler<AsyncResult<CompositeFuture>> handler = setCompleted(cause);
if (handler != null) {
handler.handle(this);
return true;
} else {
return false;
}
}
@Override
public boolean tryFail(String failureMessage) {
return tryFail(new NoStackTraceThrowable(failureMessage));
}
private Handler<AsyncResult<CompositeFuture>> setCompleted(Throwable cause) {
synchronized (this) {
if (completed) {
@@ -286,13 +226,4 @@ public class CompositeFutureImpl implements CompositeFuture, Handler<AsyncResult
return handler != null ? handler : NO_HANDLER;
}
}
@Override
public void handle(AsyncResult<CompositeFuture> asyncResult) {
if (asyncResult.succeeded()) {
complete(this);
} else {
fail(asyncResult.cause());
}
}
}

View File

@@ -18,6 +18,7 @@ import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.impl.logging.Logger;
@@ -143,21 +144,21 @@ abstract class ContextImpl extends AbstractContext {
}
@Override
public <T> void executeBlockingInternal(Handler<Future<T>> action, Handler<AsyncResult<T>> resultHandler) {
public <T> void executeBlockingInternal(Handler<Promise<T>> action, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(this, action, resultHandler, internalBlockingPool, internalOrderedTasks);
}
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(this, blockingCodeHandler, resultHandler, workerPool, ordered ? orderedTasks : null);
}
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler) {
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(this, blockingCodeHandler, resultHandler, workerPool, queue);
}
static <T> void executeBlocking(ContextInternal context, Handler<Future<T>> blockingCodeHandler,
static <T> void executeBlocking(ContextInternal context, Handler<Promise<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
WorkerPool workerPool, TaskQueue queue) {
PoolMetrics metrics = workerPool.metrics();
@@ -168,7 +169,8 @@ abstract class ContextImpl extends AbstractContext {
if (metrics != null) {
execMetric = metrics.begin(queueMetric);
}
Future<T> res = Future.future();
Promise<T> res = Promise.promise();
Future<T> fut = res.future();
context.dispatch(res, f -> {
try {
blockingCodeHandler.handle(res);
@@ -177,10 +179,10 @@ abstract class ContextImpl extends AbstractContext {
}
});
if (metrics != null) {
metrics.end(execMetric, res.succeeded());
metrics.end(execMetric, fut.succeeded());
}
if (resultHandler != null) {
res.setHandler(ar -> context.runOnContext(v -> resultHandler.handle(ar)));
fut.setHandler(ar -> context.runOnContext(v -> resultHandler.handle(ar)));
}
};
Executor exec = workerPool.executor();
@@ -276,17 +278,17 @@ abstract class ContextImpl extends AbstractContext {
return delegate.tracer();
}
public final <T> void executeBlockingInternal(Handler<Future<T>> action, Handler<AsyncResult<T>> resultHandler) {
public final <T> void executeBlockingInternal(Handler<Promise<T>> action, Handler<AsyncResult<T>> resultHandler) {
ContextImpl.executeBlocking(this, action, resultHandler, delegate.internalBlockingPool, delegate.internalOrderedTasks);
}
@Override
public final <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
public final <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
ContextImpl.executeBlocking(this, blockingCodeHandler, resultHandler, delegate.workerPool, ordered ? delegate.orderedTasks : null);
}
@Override
public final <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler) {
public final <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler) {
ContextImpl.executeBlocking(this, blockingCodeHandler, resultHandler, delegate.workerPool, queue);
}

View File

@@ -14,8 +14,8 @@ package io.vertx.core.impl;
import io.netty.channel.EventLoop;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.spi.tracing.VertxTracer;
@@ -42,12 +42,12 @@ public interface ContextInternal extends Context {
* Like {@link #executeBlocking(Handler, boolean, Handler)} but uses the {@code queue} to order the tasks instead
* of the internal queue of this context.
*/
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler);
<T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler);
/**
* Execute an internal task on the internal blocking ordered executor.
*/
<T> void executeBlockingInternal(Handler<Future<T>> action, Handler<AsyncResult<T>> resultHandler);
<T> void executeBlockingInternal(Handler<Promise<T>> action, Handler<AsyncResult<T>> resultHandler);
/**
* @return the deployment associated with this context or {@code null}

View File

@@ -16,6 +16,7 @@ import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.ServiceHelper;
import io.vertx.core.Verticle;
import io.vertx.core.json.JsonObject;
@@ -155,21 +156,21 @@ public class DeploymentManager {
Handler<AsyncResult<String>> completionHandler) {
if (iter.hasNext()) {
VerticleFactory verticleFactory = iter.next();
Future<String> fut = Future.future();
Promise<String> promise = Promise.promise();
if (verticleFactory.requiresResolve()) {
try {
verticleFactory.resolve(identifier, options, cl, fut);
verticleFactory.resolve(identifier, options, cl, promise);
} catch (Exception e) {
try {
fut.fail(e);
promise.fail(e);
} catch (Exception ignore) {
// Too late
}
}
} else {
fut.complete(identifier);
promise.complete(identifier);
}
fut.setHandler(ar -> {
promise.future().setHandler(ar -> {
Throwable err;
if (ar.succeeded()) {
String resolvedName = ar.result();
@@ -482,8 +483,9 @@ public class DeploymentManager {
context.runOnContext(v -> {
try {
verticle.init(vertx, context);
Future<Void> startFuture = Future.future();
verticle.start(startFuture);
Promise<Void> startPromise = Promise.promise();
Future<Void> startFuture = startPromise.future();
verticle.start(startPromise);
startFuture.setHandler(ar -> {
if (ar.succeeded()) {
if (parent != null) {
@@ -623,7 +625,8 @@ public class DeploymentManager {
for (VerticleHolder verticleHolder: verticles) {
ContextImpl context = verticleHolder.context;
context.runOnContext(v -> {
Future<Void> stopFuture = Future.future();
Promise<Void> stopPromise = Promise.promise();
Future<Void> stopFuture = stopPromise.future();
AtomicBoolean failureReported = new AtomicBoolean();
stopFuture.setHandler(ar -> {
deployments.remove(deploymentID);
@@ -645,9 +648,9 @@ public class DeploymentManager {
});
});
try {
verticleHolder.verticle.stop(stopFuture);
verticleHolder.verticle.stop(stopPromise);
} catch (Throwable t) {
stopFuture.fail(t);
stopPromise.fail(t);
}
});
}

View File

@@ -14,11 +14,12 @@ package io.vertx.core.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class FailedFuture<T> implements Future<T> {
public class FailedFuture<T> implements Future<T>, Promise<T> {
private final Throwable cause;
@@ -119,6 +120,11 @@ public class FailedFuture<T> implements Future<T> {
throw new IllegalStateException("Result is already complete: failed");
}
@Override
public Future<T> future() {
return this;
}
@Override
public String toString() {
return "Future{cause=" + cause.getMessage() + "}";

View File

@@ -12,6 +12,7 @@
package io.vertx.core.impl;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.spi.FutureFactory;
/**
@@ -21,6 +22,33 @@ public class FutureFactoryImpl implements FutureFactory {
private static final SucceededFuture EMPTY = new SucceededFuture<>(null);
@Override
public <T> Promise<T> promise() {
return new FutureImpl<>();
}
@Override
public <T> Promise<T> succeededPromise() {
@SuppressWarnings("unchecked")
Promise<T> promise = EMPTY;
return promise;
}
@Override
public <T> Promise<T> succeededPromise(T result) {
return new SucceededFuture<>(result);
}
@Override
public <T> Promise<T> failedPromise(Throwable t) {
return new FailedFuture<>(t);
}
@Override
public <T> Promise<T> failurePromise(String failureMessage) {
return new FailedFuture<>(failureMessage);
}
@Override
public <T> Future<T> future() {
return new FutureImpl<>();

View File

@@ -14,8 +14,9 @@ package io.vertx.core.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
class FutureImpl<T> implements Future<T> {
class FutureImpl<T> implements Promise<T>, Future<T> {
private boolean failed;
private boolean succeeded;
@@ -177,6 +178,11 @@ class FutureImpl<T> implements Future<T> {
return tryFail(new NoStackTraceThrowable(failureMessage));
}
@Override
public Future<T> future() {
return this;
}
@Override
public String toString() {
synchronized (this) {

View File

@@ -14,11 +14,12 @@ package io.vertx.core.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
class SucceededFuture<T> implements Future<T> {
class SucceededFuture<T> implements Future<T>, Promise<T> {
private final T result;
@@ -111,6 +112,11 @@ class SucceededFuture<T> implements Future<T> {
throw new IllegalStateException("Result is already complete: succeeded");
}
@Override
public Future<T> future() {
return this;
}
@Override
public String toString() {
return "Future{result=" + result + "}";

View File

@@ -555,16 +555,16 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
closeHooks.run(ar -> {
deploymentManager.undeployAll(ar1 -> {
HAManager haManager = haManager();
Future<Void> haFuture = Future.future();
Promise<Void> haPromise = Promise.promise();
if (haManager != null) {
this.executeBlocking(fut -> {
haManager.stop();
fut.complete();
}, false, haFuture);
}, false, haPromise);
} else {
haFuture.complete();
haPromise.complete();
}
haFuture.setHandler(ar2 -> {
haPromise.future().setHandler(ar2 -> {
addressResolver.close(ar3 -> {
eventBus.close(ar4 -> {
closeClusterManager(ar5 -> {
@@ -707,7 +707,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
@Override
public void undeploy(String deploymentID, Handler<AsyncResult<Void>> completionHandler) {
HAManager haManager = haManager();
Future<Void> haFuture = Future.future();
Promise<Void> haFuture = Promise.promise();
if (haManager != null && haManager.isEnabled()) {
this.executeBlocking(fut -> {
haManager.removeFromHA(deploymentID);
@@ -716,10 +716,10 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
} else {
haFuture.complete();
}
haFuture.compose(v -> {
Future<Void> deploymentFuture = Future.future();
haFuture.future().compose(v -> {
Promise<Void> deploymentFuture = Promise.promise();
deploymentManager.undeployVerticle(deploymentID, deploymentFuture);
return deploymentFuture;
return deploymentFuture.future();
}).setHandler(completionHandler);
}
@@ -744,21 +744,21 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
}
@Override
public <T> void executeBlockingInternal(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
public <T> void executeBlockingInternal(Handler<Promise<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
ContextInternal context = getOrCreateContext();
context.executeBlockingInternal(blockingCodeHandler, resultHandler);
}
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered,
Handler<AsyncResult<T>> asyncResultHandler) {
ContextInternal context = getOrCreateContext();
context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler);
}
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler,
Handler<AsyncResult<T>> asyncResultHandler) {
executeBlocking(blockingCodeHandler, true, asyncResultHandler);
}

View File

@@ -100,7 +100,7 @@ public interface VertxInternal extends Vertx {
/**
* Like {@link #executeBlocking(Handler, Handler)} but using the internal worker thread pool.
*/
<T> void executeBlockingInternal(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler);
<T> void executeBlockingInternal(Handler<Promise<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler);
ClusterManager getClusterManager();

View File

@@ -51,7 +51,7 @@ class WorkerExecutorImpl implements MetricsProvider, WorkerExecutorInternal {
return pool;
}
public synchronized <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> asyncResultHandler) {
public synchronized <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> asyncResultHandler) {
if (closed) {
throw new IllegalStateException("Worker executor closed");
}

View File

@@ -12,12 +12,23 @@
package io.vertx.core.spi;
import io.vertx.core.Future;
import io.vertx.core.Promise;
/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public interface FutureFactory {
<T> Promise<T> promise();
<T> Promise<T> succeededPromise();
<T> Promise<T> succeededPromise(T result);
<T> Promise<T> failedPromise(Throwable t);
<T> Promise<T> failurePromise(String failureMessage);
<T> Future<T> future();
<T> Future<T> succeededFuture();

View File

@@ -13,6 +13,7 @@ package io.vertx.core.spi;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
@@ -80,7 +81,7 @@ public interface VerticleFactory {
* @param classLoader The classloader
* @param resolution A future which will receive the result of the resolution.
*/
default void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Future<String> resolution) {
default void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Promise<String> resolution) {
resolution.complete(identifier);
}

View File

@@ -13,6 +13,7 @@ package io.vertx.core.streams.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.VertxException;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
@@ -22,7 +23,7 @@ public class PipeImpl<T> implements Pipe<T> {
private static final Handler<AsyncResult<Void>> NULL_HANDLER = ar -> {};
private final Future<Void> result;
private final Promise<Void> result;
private final ReadStream<T> src;
private boolean endOnSuccess = true;
private boolean endOnFailure = true;
@@ -30,7 +31,7 @@ public class PipeImpl<T> implements Pipe<T> {
public PipeImpl(ReadStream<T> src) {
this.src = src;
this.result = Future.<Void>future();
this.result = Promise.promise();
// Set handlers now
src.endHandler(result::tryComplete);
@@ -86,7 +87,7 @@ public class PipeImpl<T> implements Pipe<T> {
});
ws.exceptionHandler(err -> result.tryFail(new WriteException(err)));
src.resume();
result.setHandler(ar -> {
result.future().setHandler(ar -> {
try {
src.handler(null);
} catch (Exception ignore) {
@@ -132,7 +133,7 @@ public class PipeImpl<T> implements Pipe<T> {
dst.drainHandler(null);
dst.exceptionHandler(null);
}
if (result.isComplete()) {
if (result.future().isComplete()) {
return;
}
}

View File

@@ -148,7 +148,7 @@ public class ContextTest extends VertxTestBase {
Context context = vertx.getOrCreateContext();
context.<Void>runOnContext(v -> {
Thread expected = Thread.currentThread();
context.executeBlocking(Future::complete, r -> {
context.executeBlocking(Promise::complete, r -> {
assertSame(expected, Thread.currentThread());
testComplete();
});
@@ -307,7 +307,7 @@ public class ContextTest extends VertxTestBase {
ctx.runOnContext(v -> {
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
context.runOnContext(startFuture::complete);
}
}, ar -> {
@@ -515,7 +515,7 @@ public class ContextTest extends VertxTestBase {
CountDownLatch latch4 = new CountDownLatch(1);
duplicated.runOnContext(v -> {
vertx.executeBlocking(Future::complete, res -> {
vertx.executeBlocking(Promise::complete, res -> {
assertSame(duplicated, Vertx.currentContext());
latch4.countDown();
});

View File

@@ -741,7 +741,7 @@ public class DeploymentTest extends VertxTestBase {
AtomicInteger childUndeployed = new AtomicInteger();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws Exception {
@@ -769,7 +769,7 @@ public class DeploymentTest extends VertxTestBase {
AtomicInteger childUndeployed = new AtomicInteger();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
vertx.deployVerticle(new AbstractVerticle() {
@Override
@@ -798,10 +798,10 @@ public class DeploymentTest extends VertxTestBase {
AtomicInteger parentFailed = new AtomicInteger();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Future<Void> fut) throws Exception {
public void start(Promise<Void> fut) throws Exception {
vertx.setTimer(100, id -> {
fut.complete();
});
@@ -947,11 +947,11 @@ public class DeploymentTest extends VertxTestBase {
public void testChildUndeployedDirectly() throws Exception {
Verticle parent = new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
Verticle child = new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
startFuture.complete();
// Undeploy it directly
@@ -964,11 +964,6 @@ public class DeploymentTest extends VertxTestBase {
}));
}
@Override
public void stop(Future<Void> stopFuture) throws Exception {
super.stop(stopFuture);
}
};
vertx.deployVerticle(parent, onSuccess(depID -> {
@@ -1133,7 +1128,7 @@ public class DeploymentTest extends VertxTestBase {
public static class ParentVerticle extends AbstractVerticle {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
vertx.deployVerticle("java:" + ChildVerticle.class.getName(), ar -> {
if (ar.succeeded()) {
startFuture.complete(null);
@@ -1249,7 +1244,7 @@ public class DeploymentTest extends VertxTestBase {
public void testFailedVerticleStopNotCalled() {
Verticle verticleChild = new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
startFuture.fail("wibble");
}
@Override
@@ -1260,7 +1255,7 @@ public class DeploymentTest extends VertxTestBase {
};
Verticle verticleParent = new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
vertx.deployVerticle(verticleChild, onFailure(v -> {
startFuture.complete();
}));
@@ -1411,7 +1406,7 @@ public class DeploymentTest extends VertxTestBase {
};
Verticle v = new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
this.context.addCloseHook(closeable);
startFuture.fail("Fail to deploy.");
}
@@ -1436,7 +1431,7 @@ public class DeploymentTest extends VertxTestBase {
vertx.deployVerticle(() -> {
Verticle v = new AbstractVerticle() {
@Override
public void start(final Future<Void> startFuture) throws Exception {
public void start(final Promise<Void> startFuture) throws Exception {
startFuture.fail("Fail to deploy.");
}
};
@@ -1580,23 +1575,23 @@ public class DeploymentTest extends VertxTestBase {
public class MyAsyncVerticle extends AbstractVerticle {
private final Consumer<Future<Void>> startConsumer;
private final Consumer<Future<Void>> stopConsumer;
private final Consumer<Promise<Void>> startConsumer;
private final Consumer<Promise<Void>> stopConsumer;
public MyAsyncVerticle(Consumer<Future<Void>> startConsumer, Consumer<Future<Void>> stopConsumer) {
public MyAsyncVerticle(Consumer<Promise<Void>> startConsumer, Consumer<Promise<Void>> stopConsumer) {
this.startConsumer = startConsumer;
this.stopConsumer = stopConsumer;
}
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
if (startConsumer != null) {
startConsumer.accept(startFuture);
}
}
@Override
public void stop(Future<Void> stopFuture) throws Exception {
public void stop(Promise<Void> stopFuture) throws Exception {
if (stopConsumer != null) {
stopConsumer.accept(stopFuture);
}

File diff suppressed because it is too large Load Diff

View File

@@ -53,7 +53,7 @@ public class NamedWorkerPoolTest extends VertxTestBase {
.setMaxWorkerExecuteTime(maxWorkerExecuteTime);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
vertx.executeBlocking(fut -> {
try {
SECONDS.sleep(5);

View File

@@ -256,7 +256,7 @@ public class VerticleFactoryTest extends VertxTestBase {
return true;
}
@Override
public void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Future<String> resolution) {
public void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Promise<String> resolution) {
vertx.runOnContext(v -> {
// Async resolution
resolution.complete("whatever");
@@ -585,7 +585,7 @@ public class VerticleFactoryTest extends VertxTestBase {
}
@Override
public void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Future<String> resolution) {
public void resolve(String identifier, DeploymentOptions deploymentOptions, ClassLoader classLoader, Promise<String> resolution) {
if (failInResolve) {
resolution.fail(new IOException("whatever"));
} else {

View File

@@ -220,7 +220,7 @@ public class ClusteredEventBusTestBase extends EventBusTestBase {
boolean unregisterCalled;
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
EventBus eventBus = getVertx().eventBus();
MessageConsumer<String> consumer = eventBus.consumer("whatever");
consumer.handler(m -> {

View File

@@ -16,6 +16,7 @@ import io.netty.buffer.Unpooled;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.impl.AsyncFileImpl;
@@ -1238,7 +1239,7 @@ public class FileSystemTest extends VertxTestBase {
@Override
Future<Void> handle(ReadStream<Buffer> stream) {
Future<Void> fut = Future.future();
Promise<Void> fut = Promise.promise();
assert flowing.getAndSet(false);
stream.pause();
Vertx.currentContext().owner().setTimer(1, id -> {
@@ -1246,7 +1247,7 @@ public class FileSystemTest extends VertxTestBase {
stream.resume();
fut.complete();
});
return fut;
return fut.future();
}
},
@@ -1259,14 +1260,14 @@ public class FileSystemTest extends VertxTestBase {
}
@Override
Future<Void> handle(ReadStream<Buffer> stream) {
Future<Void> fut = Future.future();
Promise<Void> fut = Promise.promise();
assert fetching.getAndSet(false);
Vertx.currentContext().owner().setTimer(1, id -> {
assert !fetching.getAndSet(true);
stream.fetch(1);
fut.complete();
});
return fut;
return fut.future();
}
};

View File

@@ -35,6 +35,7 @@ import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
@@ -79,9 +80,9 @@ public class Http2ClientTest extends Http2TestBase {
io.vertx.core.http.Http2Settings updatedSettings = TestUtils.randomHttp2Settings();
updatedSettings.setHeaderTableSize(initialSettings.getHeaderTableSize()); // Otherwise it raise "invalid max dynamic table size" in Netty
AtomicInteger count = new AtomicInteger();
Future<Void> end = Future.future();
Promise<Void> end = Promise.promise();
server.requestHandler(req -> {
end.setHandler(v -> {
end.future().setHandler(v -> {
req.response().end();
});
}).connectionHandler(conn -> {
@@ -463,7 +464,7 @@ public class Http2ClientTest extends Http2TestBase {
public void testClientResponsePauseResume() throws Exception {
String content = TestUtils.randomAlphaString(1024);
Buffer expected = Buffer.buffer();
Future<Void> whenFull = Future.future();
Promise<Void> whenFull = Promise.promise();
AtomicBoolean drain = new AtomicBoolean();
server.requestHandler(req -> {
HttpServerResponse resp = req.response();
@@ -493,7 +494,7 @@ public class Http2ClientTest extends Http2TestBase {
Buffer received = Buffer.buffer();
resp.pause();
resp.handler(buff -> {
if (whenFull.isComplete()) {
if (whenFull.future().isComplete()) {
assertSame(ctx, Vertx.currentContext());
} else {
assertOnIOContext(ctx);
@@ -504,7 +505,7 @@ public class Http2ClientTest extends Http2TestBase {
assertEquals(expected.toString().length(), received.toString().length());
testComplete();
});
whenFull.setHandler(v -> {
whenFull.future().setHandler(v -> {
resp.resume();
});
}));
@@ -640,9 +641,9 @@ public class Http2ClientTest extends Http2TestBase {
public void testServerResetClientStreamDuringResponse() throws Exception {
waitFor(2);
String chunk = TestUtils.randomAlphaString(1024);
Future<Void> doReset = Future.future();
Promise<Void> doReset = Promise.promise();
server.requestHandler(req -> {
doReset.setHandler(onSuccess(v -> {
doReset.future().setHandler(onSuccess(v -> {
req.response().reset(8);
}));
req.response().setChunked(true).write(Buffer.buffer(chunk));
@@ -669,7 +670,7 @@ public class Http2ClientTest extends Http2TestBase {
@Test
public void testClientResetServerStreamDuringRequest() throws Exception {
Future<Void> bufReceived = Future.future();
Promise<Void> bufReceived = Promise.promise();
server.requestHandler(req -> {
req.handler(buf -> {
bufReceived.complete();
@@ -695,7 +696,7 @@ public class Http2ClientTest extends Http2TestBase {
HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
fail();
}).setChunked(true).write(Buffer.buffer("hello"));
bufReceived.setHandler(ar -> {
bufReceived.future().setHandler(ar -> {
req.reset(10);
});
await();

View File

@@ -49,6 +49,7 @@ import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.Http1xOrH2CHandler;
@@ -806,7 +807,7 @@ public class Http2ServerTest extends Http2TestBase {
Context ctx = vertx.getOrCreateContext();
String content = TestUtils.randomAlphaString(1024);
StringBuilder expected = new StringBuilder();
Future<Void> whenFull = Future.future();
Promise<Void> whenFull = Promise.promise();
AtomicBoolean drain = new AtomicBoolean();
server.requestHandler(req -> {
WriteStream<Buffer> stream = streamProvider.apply(req);
@@ -859,7 +860,7 @@ public class Http2ServerTest extends Http2TestBase {
}
}
});
whenFull.setHandler(ar -> {
whenFull.future().setHandler(ar -> {
request.context.executor().execute(() -> {
try {
request.decoder.flowController().consumeBytes(request.connection.stream(id), toAck.intValue());
@@ -964,7 +965,7 @@ public class Http2ServerTest extends Http2TestBase {
@Test
public void testClientResetServerStream() throws Exception {
Context ctx = vertx.getOrCreateContext();
Future<Void> bufReceived = Future.future();
Promise<Void> bufReceived = Promise.promise();
AtomicInteger resetCount = new AtomicInteger();
server.requestHandler(req -> {
req.handler(buf -> {
@@ -994,7 +995,7 @@ public class Http2ServerTest extends Http2TestBase {
Http2ConnectionEncoder encoder = request.encoder;
encoder.writeHeaders(request.context, id, GET("/"), 0, false, request.context.newPromise());
encoder.writeData(request.context, id, Buffer.buffer("hello").getByteBuf(), 0, false, request.context.newPromise());
bufReceived.setHandler(ar -> {
bufReceived.future().setHandler(ar -> {
encoder.writeRstStream(request.context, id, 10, request.context.newPromise());
request.context.flush();
});
@@ -1468,7 +1469,7 @@ public class Http2ServerTest extends Http2TestBase {
@Test
public void testStreamError() throws Exception {
waitFor(2);
Future<Void> when = Future.future();
Promise<Void> when = Promise.promise();
Context ctx = vertx.getOrCreateContext();
server.requestHandler(req -> {
AtomicInteger reqErrors = new AtomicInteger();
@@ -1501,7 +1502,7 @@ public class Http2ServerTest extends Http2TestBase {
Http2ConnectionEncoder encoder = request.encoder;
encoder.writeHeaders(request.context, id, GET("/"), 0, false, request.context.newPromise());
request.context.flush();
when.setHandler(ar -> {
when.future().setHandler(ar -> {
// Send a corrupted frame on purpose to check we get the corresponding error in the request exception handler
// the error is : greater padding value 0c -> 1F
// ChannelFuture a = encoder.frameWriter().writeData(request.context, id, Buffer.buffer("hello").getByteBuf(), 12, false, request.context.newPromise());
@@ -1522,7 +1523,7 @@ public class Http2ServerTest extends Http2TestBase {
public void testPromiseStreamError() throws Exception {
Context ctx = vertx.getOrCreateContext();
waitFor(2);
Future<Void> when = Future.future();
Promise<Void> when = Promise.promise();
server.requestHandler(req -> {
req.response().push(HttpMethod.GET, "/wibble", ar -> {
assertTrue(ar.succeeded());
@@ -1552,7 +1553,7 @@ public class Http2ServerTest extends Http2TestBase {
request.decoder.frameListener(new Http2EventAdapter() {
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
when.setHandler(ar -> {
when.future().setHandler(ar -> {
Http2ConnectionEncoder encoder = request.encoder;
encoder.frameWriter().writeHeaders(request.context, promisedStreamId, GET("/"), 0, false, request.context.newPromise());
request.context.flush();
@@ -1572,7 +1573,7 @@ public class Http2ServerTest extends Http2TestBase {
public void testConnectionDecodeError() throws Exception {
Context ctx = vertx.getOrCreateContext();
waitFor(3);
Future<Void> when = Future.future();
Promise<Void> when = Promise.promise();
server.requestHandler(req -> {
AtomicInteger reqFailures = new AtomicInteger();
AtomicInteger respFailures = new AtomicInteger();
@@ -1612,7 +1613,7 @@ public class Http2ServerTest extends Http2TestBase {
ChannelFuture fut = client.connect(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, request -> {
int id = request.nextStreamId();
Http2ConnectionEncoder encoder = request.encoder;
when.setHandler(ar -> {
when.future().setHandler(ar -> {
// Send a stream ID that does not exists
encoder.frameWriter().writeRstStream(request.context, 10, 0, request.context.newPromise());
request.context.flush();
@@ -1859,7 +1860,7 @@ public class Http2ServerTest extends Http2TestBase {
@Test
public void testClientSendGoAwayNoError() throws Exception {
Future<Void> abc = Future.future();
Promise<Void> abc = Promise.promise();
Context ctx = vertx.getOrCreateContext();
Handler<HttpServerRequest> requestHandler = req -> {
HttpConnection conn = req.connection();
@@ -1892,7 +1893,7 @@ public class Http2ServerTest extends Http2TestBase {
int id = request.nextStreamId();
encoder.writeHeaders(request.context, id, GET("/"), 0, true, request.context.newPromise());
request.context.flush();
abc.setHandler(ar -> {
abc.future().setHandler(ar -> {
encoder.writeGoAway(request.context, id, 0, Unpooled.EMPTY_BUFFER, request.context.newPromise());
request.context.flush();
});
@@ -1903,7 +1904,7 @@ public class Http2ServerTest extends Http2TestBase {
@Test
public void testClientSendGoAwayInternalError() throws Exception {
Future<Void> abc = Future.future();
Promise<Void> abc = Promise.promise();
Context ctx = vertx.getOrCreateContext();
Handler<HttpServerRequest> requestHandler = req -> {
HttpConnection conn = req.connection();
@@ -1931,7 +1932,7 @@ public class Http2ServerTest extends Http2TestBase {
int id = request.nextStreamId();
encoder.writeHeaders(request.context, id, GET("/"), 0, true, request.context.newPromise());
request.context.flush();
abc.setHandler(ar -> {
abc.future().setHandler(ar -> {
encoder.writeGoAway(request.context, id, 3, Unpooled.EMPTY_BUFFER, request.context.newPromise());
request.context.flush();
});
@@ -2602,20 +2603,23 @@ public class Http2ServerTest extends Http2TestBase {
setUseAlpn(false).
setSsl(false).
setInitialSettings(new io.vertx.core.http.Http2Settings().setMaxConcurrentStreams(10000)));
doRequest(method, expected, conn -> clientConnectionCount.incrementAndGet(),
Future.<HttpClientResponse>future().setHandler(onSuccess(resp -> {
assertEquals(HttpVersion.HTTP_2, resp.version());
// assertEquals(20000, req.connection().remoteSettings().getMaxConcurrentStreams());
assertEquals(1, serverConnectionCount.get());
assertEquals(1, clientConnectionCount.get());
doRequest(method, expected, null, Future.<HttpClientResponse>future().setHandler(onSuccess(resp2 -> {
testComplete();
})));
})));
Promise<HttpClientResponse> p1 = Promise.promise();
p1.future().setHandler(onSuccess(resp -> {
assertEquals(HttpVersion.HTTP_2, resp.version());
// assertEquals(20000, req.connection().remoteSettings().getMaxConcurrentStreams());
assertEquals(1, serverConnectionCount.get());
assertEquals(1, clientConnectionCount.get());
Promise<HttpClientResponse> p2 = Promise.promise();
p2.future().setHandler(onSuccess(resp2 -> {
testComplete();
}));
doRequest(method, expected, null, p2);
}));
doRequest(method, expected, conn -> clientConnectionCount.incrementAndGet(), p1);
await();
}
private void doRequest(HttpMethod method, Buffer expected, Handler<HttpConnection> connHandler, Future<HttpClientResponse> fut) {
private void doRequest(HttpMethod method, Buffer expected, Handler<HttpConnection> connHandler, Promise<HttpClientResponse> fut) {
HttpClientRequest req = client.request(method, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", onSuccess(resp -> {
assertEquals(HttpVersion.HTTP_2, resp.version());
// assertEquals(20000, req.connection().remoteSettings().getMaxConcurrentStreams());

View File

@@ -12,7 +12,7 @@
package io.vertx.core.http;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.PfxOptions;
import io.vertx.test.core.VertxTestBase;
@@ -34,7 +34,7 @@ public class HttpServerCloseHookTest extends VertxTestBase {
private static class TestVerticle extends AbstractVerticle {
@Override
public void start(Future<Void> startFuture) {
public void start(Promise<Void> startFuture) {
HttpServerOptions invalidOptions = new HttpServerOptions()
.setSsl(true)
.setPfxTrustOptions(new PfxOptions().setValue(Buffer.buffer("boom")));

View File

@@ -2137,13 +2137,13 @@ public abstract class HttpTest extends HttpTestBase {
await();
}
private void pausingServer(Consumer<Future<Void>> consumer) {
Future<Void> resumeFuture = Future.future();
private void pausingServer(Consumer<Promise<Void>> consumer) {
Promise<Void> resumeFuture = Promise.promise();
server.requestHandler(req -> {
req.response().setChunked(true);
req.pause();
Context ctx = vertx.getOrCreateContext();
resumeFuture.setHandler(v1 -> {
resumeFuture.future().setHandler(v1 -> {
ctx.runOnContext(v2 -> {
req.resume();
});
@@ -2170,7 +2170,7 @@ public abstract class HttpTest extends HttpTestBase {
private void drainingServer(Consumer<Future<Void>> consumer) {
Future<Void> resumeFuture = Future.future();
Promise<Void> resumeFuture = Promise.promise();
server.requestHandler(req -> {
req.response().setChunked(true);
@@ -2194,7 +2194,7 @@ public abstract class HttpTest extends HttpTestBase {
});
});
server.listen(testAddress, onSuccess(s -> consumer.accept(resumeFuture)));
server.listen(testAddress, onSuccess(s -> consumer.accept(resumeFuture.future())));
}
@Test
@@ -4014,14 +4014,14 @@ public abstract class HttpTest extends HttpTestBase {
});
startServer(server2);
client.redirectHandler(resp -> {
Future<HttpClientRequest> fut = Future.future();
Promise<HttpClientRequest> fut = Promise.promise();
vertx.setTimer(25, id -> {
HttpClientRequest req = client.getAbs(scheme + "://localhost:" + port + "/custom");
req.putHeader("foo", "foo_another");
req.setHost("localhost:" + port);
fut.complete(req);
});
return fut;
return fut.future();
});
client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath", onSuccess(resp -> {
assertEquals(scheme + "://localhost:" + port + "/custom", resp.request().absoluteURI());

View File

@@ -22,6 +22,7 @@ import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
@@ -1285,7 +1286,7 @@ public class WebSocketTest extends VertxTestBase {
public void testAsyncAccept() {
AtomicBoolean resolved = new AtomicBoolean();
server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT)).websocketHandler(ws -> {
Future<Integer> fut = Future.future();
Promise<Integer> fut = Promise.promise();
ws.setHandshake(fut);
try {
ws.accept();
@@ -1316,11 +1317,11 @@ public class WebSocketTest extends VertxTestBase {
@Test
public void testCloseAsyncPending() {
server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT)).websocketHandler(ws -> {
Future<Integer> fut = Future.future();
ws.setHandshake(fut);
Promise<Integer> promise = Promise.promise();
ws.setHandshake(promise);
ws.close();
assertTrue(fut.isComplete());
assertEquals(101, (int)fut.result());
assertTrue(promise.future().isComplete());
assertEquals(101, (int)promise.future().result());
});
server.listen(onSuccess(s -> {
client.webSocket(DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/some/path", onSuccess(ws -> {
@@ -2877,7 +2878,7 @@ public class WebSocketTest extends VertxTestBase {
@Test
public void testDrainServerWebSocket() {
Future<Void> resume = Future.future();
Promise<Void> resume = Promise.promise();
server = vertx.createHttpServer()
.websocketHandler(ws -> {
fillQueue(ws, v1 -> {
@@ -2889,7 +2890,7 @@ public class WebSocketTest extends VertxTestBase {
}).listen(DEFAULT_HTTP_PORT, onSuccess(v1 -> {
client.webSocket(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/someuri", onSuccess(ws -> {
ws.pause();
resume.setHandler(onSuccess(v2 -> {
resume.future().setHandler(onSuccess(v2 -> {
ws.resume();
}));
}));
@@ -2899,11 +2900,11 @@ public class WebSocketTest extends VertxTestBase {
@Test
public void testDrainClientWebSocket() {
Future<Void> resume = Future.future();
Promise<Void> resume = Promise.promise();
server = vertx.createHttpServer()
.websocketHandler(ws -> {
ws.pause();
resume.setHandler(onSuccess(v2 -> {
resume.future().setHandler(onSuccess(v2 -> {
ws.resume();
}));
}).listen(DEFAULT_HTTP_PORT, onSuccess(v1 -> {

View File

@@ -567,7 +567,10 @@ public class ConnectionPoolTest extends VertxTestBase {
@Override
public void connect(ConnectionListener<FakeConnection> listener, ContextInternal context, Handler<AsyncResult<ConnectResult<FakeConnection>>> handler) {
int i = ThreadLocalRandom.current().nextInt(100);
FakeConnection conn = new FakeConnection(context, listener, Future.<ConnectResult<FakeConnection>>future().setHandler(handler));
Promise<ConnectResult<FakeConnection>> promise = Promise.promise();
Future<ConnectResult<FakeConnection>> future = promise.future();
future.setHandler(handler);
FakeConnection conn = new FakeConnection(context, listener, promise);
if (i < 10) {
conn.fail(new Exception("Could not connect"));
} else {
@@ -844,13 +847,13 @@ public class ConnectionPoolTest extends VertxTestBase {
private final ContextInternal context;
private final ConnectionListener<FakeConnection> listener;
private final Future<ConnectResult<FakeConnection>> future;
private final Promise<ConnectResult<FakeConnection>> future;
private long inflight;
private long concurrency = 1;
private int status = DISCONNECTED;
FakeConnection(ContextInternal context, ConnectionListener<FakeConnection> listener, Future<ConnectResult<FakeConnection>> future) {
FakeConnection(ContextInternal context, ConnectionListener<FakeConnection> listener, Promise<ConnectResult<FakeConnection>> future) {
this.context = context;
this.listener = listener;
this.future = future;
@@ -955,7 +958,9 @@ public class ConnectionPoolTest extends VertxTestBase {
@Override
public void connect(ConnectionListener<FakeConnection> listener, ContextInternal context, Handler<AsyncResult<ConnectResult<FakeConnection>>> handler) {
pendingRequests.add(new FakeConnection(context, listener, Future.<ConnectResult<FakeConnection>>future().setHandler(handler)));
Promise<ConnectResult<FakeConnection>> promise = Promise.promise();
promise.future().setHandler(handler);
pendingRequests.add(new FakeConnection(context, listener, promise));
}
}
}

View File

@@ -3091,7 +3091,7 @@ public class NetTest extends VertxTestBase {
String expected = TestUtils.randomAlphaString(2000);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
NetServer server = vertx.createNetServer();
server.connectHandler(so -> {
Buffer received = Buffer.buffer();

View File

@@ -13,6 +13,7 @@ package io.vertx.core.shareddata;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
@@ -684,11 +685,11 @@ public abstract class AsyncMapTest extends VertxTestBase {
protected void loadData(Map<JsonObject, Buffer> map, BiConsumer<Vertx, AsyncMap<JsonObject, Buffer>> test) {
List<Future> futures = new ArrayList<>(map.size());
map.forEach((key, value) -> {
Future future = Future.future();
Promise future = Promise.promise();
getVertx().sharedData().getAsyncMap("foo", onSuccess(asyncMap -> {
asyncMap.put(key, value, future);
}));
futures.add(future);
futures.add(future.future());
});
CompositeFuture.all(futures).setHandler(onSuccess(cf -> {
Vertx v = getVertx();

View File

@@ -861,7 +861,7 @@ public class MetricsTest extends VertxTestBase {
assertThat(metrics.getPoolSize(), is(getOptions().getInternalBlockingPoolSize()));
assertThat(metrics.numberOfIdleThreads(), is(getOptions().getWorkerPoolSize()));
Handler<Future<Void>> job = getSomeDumbTask();
Handler<Promise<Void>> job = getSomeDumbTask();
AtomicInteger counter = new AtomicInteger();
AtomicBoolean hadWaitingQueue = new AtomicBoolean();
@@ -977,7 +977,7 @@ public class MetricsTest extends VertxTestBase {
CountDownLatch latch = new CountDownLatch(1);
Verticle worker = new AbstractVerticle() {
@Override
public void start(Future<Void> done) throws Exception {
public void start(Promise<Void> done) throws Exception {
vertx.eventBus().localConsumer("message", d -> {
msg.incrementAndGet();
try {
@@ -1043,7 +1043,7 @@ public class MetricsTest extends VertxTestBase {
assertThat(metrics.getPoolSize(), is(10));
assertThat(metrics.numberOfIdleThreads(), is(10));
Handler<Future<Void>> job = getSomeDumbTask();
Handler<Promise<Void>> job = getSomeDumbTask();
AtomicInteger counter = new AtomicInteger();
AtomicBoolean hadWaitingQueue = new AtomicBoolean();
@@ -1106,7 +1106,7 @@ public class MetricsTest extends VertxTestBase {
assertTrue(metrics2.isClosed());
}
private Handler<Future<Void>> getSomeDumbTask() {
private Handler<Promise<Void>> getSomeDumbTask() {
return (future) -> {
try {
Thread.sleep(50);

View File

@@ -14,6 +14,7 @@ import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.test.core.AsyncTestBase;
import org.junit.Test;
@@ -38,21 +39,21 @@ public class WriteStreamTest extends AsyncTestBase {
static class EndWithItemStreamAsync extends StreamBase<Object> {
AtomicInteger writeCount = new AtomicInteger();
Future<Void> writeFut = Future.future();
Promise<Void> writeFut = Promise.promise();
AtomicInteger endCount = new AtomicInteger();
Future<Void> endFut = Future.future();
Promise<Void> endFut = Promise.promise();
AtomicInteger resolvedCount = new AtomicInteger();
Future<Void> resolvedFut = Future.future();
Promise<Void> resolvedFut = Promise.promise();
@Override
public StreamBase<Object> write(Object data, Handler<AsyncResult<Void>> handler) {
writeCount.incrementAndGet();
writeFut.setHandler(handler);
writeFut.future().setHandler(handler);
return this;
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
endCount.incrementAndGet();
endFut.setHandler(handler);
endFut.future().setHandler(handler);
}
public void end(Object item) {
end(item, ar -> {
@@ -80,8 +81,8 @@ public class WriteStreamTest extends AsyncTestBase {
assertEquals(1, src.writeCount.get());
assertEquals(1, src.endCount.get());
assertEquals(1, src.resolvedCount.get());
assertTrue(src.resolvedFut.succeeded());
assertNull(src.resolvedFut.result());
assertTrue(src.resolvedFut.future().succeeded());
assertNull(src.resolvedFut.future().result());
src = new EndWithItemStreamAsync();
src.end(item);
@@ -89,8 +90,8 @@ public class WriteStreamTest extends AsyncTestBase {
assertEquals(1, src.writeCount.get());
assertEquals(0, src.endCount.get());
assertEquals(1, src.resolvedCount.get());
assertTrue(src.resolvedFut.failed());
assertSame(cause, src.resolvedFut.cause());
assertTrue(src.resolvedFut.future().failed());
assertSame(cause, src.resolvedFut.future().cause());
src = new EndWithItemStreamAsync();
src.end(item);
@@ -99,8 +100,8 @@ public class WriteStreamTest extends AsyncTestBase {
assertEquals(1, src.writeCount.get());
assertEquals(1, src.endCount.get());
assertEquals(1, src.resolvedCount.get());
assertTrue(src.resolvedFut.failed());
assertSame(cause, src.resolvedFut.cause());
assertTrue(src.resolvedFut.future().failed());
assertSame(cause, src.resolvedFut.future().cause());
}
static class EndStreamSync extends StreamBase<Object> {

View File

@@ -14,6 +14,7 @@ package io.vertx.test.verticles;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
@@ -41,12 +42,12 @@ public class FaultToleranceVerticle extends AbstractVerticle {
numAddresses = config.getInteger("addressesCount");
List<Future> registrationFutures = new ArrayList<>(numAddresses);
for (int i = 0; i < numAddresses; i++) {
Future<Void> registrationFuture = Future.future();
registrationFutures.add(registrationFuture);
Promise<Void> registrationFuture = Promise.promise();
registrationFutures.add(registrationFuture.future());
vertx.eventBus().consumer(createAddress(id, i), msg -> msg.reply("pong")).completionHandler(registrationFuture);
}
Future<Void> registrationFuture = Future.future();
registrationFutures.add(registrationFuture);
Promise<Void> registrationFuture = Promise.promise();
registrationFutures.add(registrationFuture.future());
vertx.eventBus().consumer("ping", this::ping).completionHandler(registrationFuture);
CompositeFuture.all(registrationFutures).setHandler(ar -> {
if (ar.succeeded()) {

View File

@@ -12,7 +12,7 @@
package io.vertx.test.verticles;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
@@ -22,7 +22,7 @@ import io.vertx.core.http.HttpServerOptions;
public class SimpleServer extends AbstractVerticle {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080));
server.requestHandler(req -> req.response().end());
server.listen(res -> {

View File

@@ -13,7 +13,7 @@ package io.vertx.test.verticles;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import java.util.HashSet;
import java.util.Set;
@@ -38,9 +38,9 @@ public class TestVerticle2 extends AbstractVerticle {
}
@Override
public void stop(Future<Void> stopFuture) throws Exception {
public void stop(Promise<Void> stopPromise) throws Exception {
vertx.eventBus().send("tvstopped", "stopped", reply -> {
stopFuture.complete(null);
stopPromise.complete(null);
});
}
}

View File

@@ -13,7 +13,7 @@ package io.vertx.test.verticles.sourceverticle;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.test.verticles.sourceverticle.somepackage.OtherSourceVerticle;
/**
@@ -23,7 +23,7 @@ public class SourceVerticle extends AbstractVerticle {
@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(Promise<Void> startFuture) throws Exception {
vertx.deployVerticle("java:" + OtherSourceVerticle.class.getName().replace('.', '/') + ".java", new DeploymentOptions(), ar -> {
if (ar.succeeded()) {
startFuture.complete((Void) null);