mirror of
https://github.com/jlengrand/vert.x.git
synced 2026-03-10 08:51:19 +00:00
@@ -34,14 +34,14 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
* Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned.
|
||||
*
|
||||
* @param handler the handler
|
||||
* @param <T> the result type
|
||||
* @param <T> the result type
|
||||
* @return the future.
|
||||
*/
|
||||
static <T> Future<T> future(Handler<Promise<T>> handler) {
|
||||
Promise<T> promise = Promise.promise();
|
||||
try {
|
||||
handler.handle(promise);
|
||||
} catch (Throwable e) {
|
||||
} catch (Throwable e){
|
||||
promise.tryFail(e);
|
||||
}
|
||||
return promise.future();
|
||||
@@ -50,8 +50,8 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
/**
|
||||
* Create a succeeded future with a null result
|
||||
*
|
||||
* @param <T> the result type
|
||||
* @return the future
|
||||
* @param <T> the result type
|
||||
* @return the future
|
||||
*/
|
||||
static <T> Future<T> succeededFuture() {
|
||||
return factory.succeededFuture();
|
||||
@@ -60,9 +60,9 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
/**
|
||||
* Created a succeeded future with the specified result.
|
||||
*
|
||||
* @param result the result
|
||||
* @param <T> the result type
|
||||
* @return the future
|
||||
* @param result the result
|
||||
* @param <T> the result type
|
||||
* @return the future
|
||||
*/
|
||||
static <T> Future<T> succeededFuture(T result) {
|
||||
if (result == null) {
|
||||
@@ -75,9 +75,9 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
/**
|
||||
* Create a failed future with the specified failure cause.
|
||||
*
|
||||
* @param t the failure cause as a Throwable
|
||||
* @param <T> the result type
|
||||
* @return the future
|
||||
* @param t the failure cause as a Throwable
|
||||
* @param <T> the result type
|
||||
* @return the future
|
||||
*/
|
||||
static <T> Future<T> failedFuture(Throwable t) {
|
||||
return factory.failedFuture(t);
|
||||
@@ -86,9 +86,9 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
/**
|
||||
* Create a failed future with the specified failure message.
|
||||
*
|
||||
* @param failureMessage the failure message
|
||||
* @param <T> the result type
|
||||
* @return the future
|
||||
* @param failureMessage the failure message
|
||||
* @param <T> the result type
|
||||
* @return the future
|
||||
*/
|
||||
static <T> Future<T> failedFuture(String failureMessage) {
|
||||
return factory.failureFuture(failureMessage);
|
||||
@@ -114,7 +114,6 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
/**
|
||||
* Add a handler to be notified of the result.
|
||||
* <br/>
|
||||
*
|
||||
* @param handler the handler that will be called with the result
|
||||
* @return a reference to this, so it can be used fluently
|
||||
*/
|
||||
@@ -124,7 +123,6 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
/**
|
||||
* Add a handler to be notified of the succeeded result.
|
||||
* <br/>
|
||||
*
|
||||
* @param handler the handler that will be called with the succeeded result
|
||||
* @return a reference to this, so it can be used fluently
|
||||
*/
|
||||
@@ -140,7 +138,6 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
/**
|
||||
* Add a handler to be notified of the failed result.
|
||||
* <br/>
|
||||
*
|
||||
* @param handler the handler that will be called with the failed result
|
||||
* @return a reference to this, so it can be used fluently
|
||||
*/
|
||||
@@ -199,13 +196,13 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
|
||||
/**
|
||||
* Compose this future with a {@code mapper} function.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future (the one on which {@code compose} is called) succeeds, the {@code mapper} will be called with
|
||||
* the completed value and this mapper returns another future object. This returned future completion will complete
|
||||
* the future returned by this method call.<p>
|
||||
* <p>
|
||||
*
|
||||
* If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future fails, the failure will be propagated to the returned future and the {@code mapper}
|
||||
* will not be called.
|
||||
*
|
||||
@@ -225,15 +222,15 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
|
||||
/**
|
||||
* Compose this future with a {@code successMapper} and {@code failureMapper} functions.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future (the one on which {@code compose} is called) succeeds, the {@code successMapper} will be called with
|
||||
* the completed value and this mapper returns another future object. This returned future completion will complete
|
||||
* the future returned by this method call.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future (the one on which {@code compose} is called) fails, the {@code failureMapper} will be called with
|
||||
* the failure and this mapper returns another future object. This returned future completion will complete
|
||||
* the future returned by this method call.<p>
|
||||
* <p>
|
||||
*
|
||||
* If any mapper function throws an exception, the returned future will be failed with this exception.<p>
|
||||
*
|
||||
* @param successMapper the function mapping the success
|
||||
@@ -280,12 +277,12 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
|
||||
/**
|
||||
* Apply a {@code mapper} function on this future.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future succeeds, the {@code mapper} will be called with the completed value and this mapper
|
||||
* returns a value. This value will complete the future returned by this method call.<p>
|
||||
* <p>
|
||||
*
|
||||
* If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future fails, the failure will be propagated to the returned future and the {@code mapper}
|
||||
* will not be called.
|
||||
*
|
||||
@@ -322,9 +319,9 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
|
||||
/**
|
||||
* Map the result of a future to a specific {@code value}.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future succeeds, this {@code value} will complete the future returned by this method call.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future fails, the failure will be propagated to the returned future.
|
||||
*
|
||||
* @param value the value that eventually completes the mapped future
|
||||
@@ -350,11 +347,11 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
|
||||
/**
|
||||
* Map the result of a future to {@code null}.<p>
|
||||
* <p>
|
||||
*
|
||||
* This is a conveniency for {@code future.map((T) null)} or {@code future.map((Void) null)}.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future succeeds, {@code null} will complete the future returned by this method call.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future fails, the failure will be propagated to the returned future.
|
||||
*
|
||||
* @return the mapped future
|
||||
@@ -401,12 +398,12 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
|
||||
/**
|
||||
* Apply a {@code mapper} function on this future.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future fails, the {@code mapper} will be called with the completed value and this mapper
|
||||
* returns a value. This value will complete the future returned by this method call.<p>
|
||||
* <p>
|
||||
*
|
||||
* If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future succeeds, the result will be propagated to the returned future and the {@code mapper}
|
||||
* will not be called.
|
||||
*
|
||||
@@ -443,9 +440,9 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
|
||||
/**
|
||||
* Map the failure of a future to a specific {@code value}.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future fails, this {@code value} will complete the future returned by this method call.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future succeeds, the result will be propagated to the returned future.
|
||||
*
|
||||
* @param value the value that eventually completes the mapped future
|
||||
@@ -471,11 +468,11 @@ public interface Future<T> extends AsyncResult<T> {
|
||||
|
||||
/**
|
||||
* Map the failure of a future to {@code null}.<p>
|
||||
* <p>
|
||||
*
|
||||
* This is a convenience for {@code future.otherwise((T) null)}.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future fails, the {@code null} value will complete the future returned by this method call.<p>
|
||||
* <p>
|
||||
*
|
||||
* When this future succeeds, the result will be propagated to the returned future.
|
||||
*
|
||||
* @return the mapped future
|
||||
|
||||
@@ -13,6 +13,7 @@ package io.vertx.core.impl;
|
||||
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.vertx.codegen.annotations.Nullable;
|
||||
import io.vertx.core.*;
|
||||
import io.vertx.core.impl.logging.Logger;
|
||||
import io.vertx.core.impl.logging.LoggerFactory;
|
||||
@@ -33,7 +34,6 @@ abstract class ContextImpl extends AbstractContext {
|
||||
/**
|
||||
* Execute the {@code task} disabling the thread-local association for the duration
|
||||
* of the execution. {@link Vertx#currentContext()} will return {@code null},
|
||||
*
|
||||
* @param task the task to execute
|
||||
* @throws IllegalStateException if the current thread is not a Vertx thread
|
||||
*/
|
||||
@@ -154,7 +154,7 @@ abstract class ContextImpl extends AbstractContext {
|
||||
}
|
||||
|
||||
static <T> Future<T> executeBlocking(ContextInternal context, Handler<Promise<T>> blockingCodeHandler,
|
||||
WorkerPool workerPool, TaskQueue queue) {
|
||||
WorkerPool workerPool, TaskQueue queue) {
|
||||
PoolMetrics metrics = workerPool.metrics();
|
||||
Object queueMetric = metrics != null ? metrics.submitted() : null;
|
||||
Promise<T> promise = context.promise();
|
||||
|
||||
@@ -197,7 +197,7 @@ public class FutureTest extends VertxTestBase {
|
||||
|
||||
@Test
|
||||
public void testCreateFailedWithNullFailure() {
|
||||
Future<String> future = Future.failedFuture((Throwable) null);
|
||||
Future<String> future = Future.failedFuture((Throwable)null);
|
||||
Checker<String> checker = new Checker<>(future);
|
||||
NoStackTraceThrowable failure = (NoStackTraceThrowable) checker.assertFailed();
|
||||
assertNull(failure.getMessage());
|
||||
@@ -206,7 +206,7 @@ public class FutureTest extends VertxTestBase {
|
||||
@Test
|
||||
public void testFailureFutureWithNullFailure() {
|
||||
Promise<String> promise = Promise.promise();
|
||||
promise.fail((Throwable) null);
|
||||
promise.fail((Throwable)null);
|
||||
Checker<String> checker = new Checker<>(promise.future());
|
||||
NoStackTraceThrowable failure = (NoStackTraceThrowable) checker.assertFailed();
|
||||
assertNull(failure.getMessage());
|
||||
@@ -239,7 +239,7 @@ public class FutureTest extends VertxTestBase {
|
||||
p2.complete(3);
|
||||
checker.assertSucceeded(composite);
|
||||
assertEquals("something", composite.resultAt(0));
|
||||
assertEquals(3, (int) composite.resultAt(1));
|
||||
assertEquals(3, (int)composite.resultAt(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -283,22 +283,22 @@ public class FutureTest extends VertxTestBase {
|
||||
|
||||
private void testAllLargeList(int size) {
|
||||
List<Future> list = new ArrayList<>();
|
||||
for (int i = 0; i < size; i++) {
|
||||
for (int i = 0;i < size;i++) {
|
||||
list.add(Future.succeededFuture());
|
||||
}
|
||||
CompositeFuture composite = CompositeFuture.all(list);
|
||||
Checker<CompositeFuture> checker = new Checker<>(composite);
|
||||
checker.assertSucceeded(composite);
|
||||
for (int i = 0; i < size; i++) {
|
||||
for (int i = 0;i < size;i++) {
|
||||
list.clear();
|
||||
Throwable cause = new Exception();
|
||||
for (int j = 0; j < size; j++) {
|
||||
for (int j = 0;j < size;j++) {
|
||||
list.add(i == j ? Future.failedFuture(cause) : Future.succeededFuture());
|
||||
}
|
||||
composite = CompositeFuture.all(list);
|
||||
checker = new Checker<>(composite);
|
||||
checker.assertFailed(cause);
|
||||
for (int j = 0; j < size; j++) {
|
||||
for (int j = 0;j < size;j++) {
|
||||
if (i == j) {
|
||||
assertTrue(composite.failed(j));
|
||||
} else {
|
||||
@@ -397,21 +397,21 @@ public class FutureTest extends VertxTestBase {
|
||||
|
||||
private void testAnyLargeList(int size) {
|
||||
List<Future> list = new ArrayList<>();
|
||||
for (int i = 0; i < size; i++) {
|
||||
for (int i = 0;i < size;i++) {
|
||||
list.add(Future.failedFuture(new Exception()));
|
||||
}
|
||||
CompositeFuture composite = CompositeFuture.any(list);
|
||||
Checker<CompositeFuture> checker = new Checker<>(composite);
|
||||
assertNotNull(checker.assertFailed());
|
||||
for (int i = 0; i < size; i++) {
|
||||
for (int i = 0;i < size;i++) {
|
||||
list.clear();
|
||||
for (int j = 0; j < size; j++) {
|
||||
for (int j = 0;j < size;j++) {
|
||||
list.add(i == j ? Future.succeededFuture() : Future.failedFuture(new RuntimeException()));
|
||||
}
|
||||
composite = CompositeFuture.any(list);
|
||||
checker = new Checker<>(composite);
|
||||
checker.assertSucceeded(composite);
|
||||
for (int j = 0; j < size; j++) {
|
||||
for (int j = 0;j < size;j++) {
|
||||
if (i == j) {
|
||||
assertTrue(composite.succeeded(j));
|
||||
} else {
|
||||
@@ -552,7 +552,7 @@ public class FutureTest extends VertxTestBase {
|
||||
ref.set(string);
|
||||
return c;
|
||||
});
|
||||
Checker<Integer> checker = new Checker<>(f4);
|
||||
Checker<Integer> checker = new Checker<>(f4);
|
||||
p3.complete("abcdef");
|
||||
checker.assertNotCompleted();
|
||||
assertEquals("abcdef", ref.get());
|
||||
@@ -595,9 +595,7 @@ public class FutureTest extends VertxTestBase {
|
||||
RuntimeException cause = new RuntimeException();
|
||||
Promise<String> p3 = Promise.promise();
|
||||
Future<String> f3 = p3.future();
|
||||
Future<Integer> f4 = f3.compose(string -> {
|
||||
throw cause;
|
||||
});
|
||||
Future<Integer> f4 = f3.compose(string -> { throw cause; });
|
||||
Checker<Integer> checker = new Checker<>(f4);
|
||||
p3.complete("foo");
|
||||
checker.assertFailed(cause);
|
||||
@@ -788,58 +786,22 @@ public class FutureTest extends VertxTestBase {
|
||||
public void testDefaultCompleter() {
|
||||
AsyncResult<Object> succeededAsyncResult = new AsyncResult<Object>() {
|
||||
Object result = new Object();
|
||||
|
||||
public Object result() {
|
||||
return result;
|
||||
}
|
||||
|
||||
public Throwable cause() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public boolean succeeded() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean failed() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public <U> AsyncResult<U> map(Function<Object, U> mapper) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public <V> AsyncResult<V> map(V value) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
public Object result() { return result; }
|
||||
public Throwable cause() { throw new UnsupportedOperationException(); }
|
||||
public boolean succeeded() { return true; }
|
||||
public boolean failed() { throw new UnsupportedOperationException(); }
|
||||
public <U> AsyncResult<U> map(Function<Object, U> mapper) { throw new UnsupportedOperationException(); }
|
||||
public <V> AsyncResult<V> map(V value) { throw new UnsupportedOperationException(); }
|
||||
};
|
||||
|
||||
AsyncResult<Object> failedAsyncResult = new AsyncResult<Object>() {
|
||||
Throwable cause = new Throwable();
|
||||
|
||||
public Object result() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public Throwable cause() {
|
||||
return cause;
|
||||
}
|
||||
|
||||
public boolean succeeded() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean failed() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public <U> AsyncResult<U> map(Function<Object, U> mapper) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public <V> AsyncResult<V> map(V value) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
public Object result() { throw new UnsupportedOperationException(); }
|
||||
public Throwable cause() { return cause; }
|
||||
public boolean succeeded() { return false; }
|
||||
public boolean failed() { throw new UnsupportedOperationException(); }
|
||||
public <U> AsyncResult<U> map(Function<Object, U> mapper) { throw new UnsupportedOperationException(); }
|
||||
public <V> AsyncResult<V> map(V value) { throw new UnsupportedOperationException(); }
|
||||
};
|
||||
|
||||
class DefaultCompleterTestFuture<T> implements Future<T> {
|
||||
@@ -847,43 +809,30 @@ public class FutureTest extends VertxTestBase {
|
||||
boolean failed;
|
||||
T result;
|
||||
Throwable cause;
|
||||
|
||||
public boolean isComplete() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public Future<T> onComplete(Handler<AsyncResult<T>> handler) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public Handler<AsyncResult<T>> getHandler() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
public boolean isComplete() { throw new UnsupportedOperationException(); }
|
||||
public Future<T> onComplete(Handler<AsyncResult<T>> handler) { throw new UnsupportedOperationException(); }
|
||||
public Handler<AsyncResult<T>> getHandler() { throw new UnsupportedOperationException(); }
|
||||
|
||||
public void complete(T result) {
|
||||
if (!tryComplete(result)) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
public void complete() {
|
||||
if (!tryComplete()) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
public void fail(Throwable cause) {
|
||||
if (!tryFail(cause)) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
public void fail(String failureMessage) {
|
||||
if (!tryFail(failureMessage)) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean tryComplete(T result) {
|
||||
if (succeeded || failed) {
|
||||
return false;
|
||||
@@ -892,11 +841,7 @@ public class FutureTest extends VertxTestBase {
|
||||
this.result = result;
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean tryComplete() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public boolean tryComplete() { throw new UnsupportedOperationException(); }
|
||||
public boolean tryFail(Throwable cause) {
|
||||
if (succeeded || failed) {
|
||||
return false;
|
||||
@@ -905,27 +850,11 @@ public class FutureTest extends VertxTestBase {
|
||||
this.cause = cause;
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean tryFail(String failureMessage) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public T result() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public Throwable cause() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public boolean succeeded() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public boolean failed() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public boolean tryFail(String failureMessage) { throw new UnsupportedOperationException(); }
|
||||
public T result() { throw new UnsupportedOperationException(); }
|
||||
public Throwable cause() { throw new UnsupportedOperationException(); }
|
||||
public boolean succeeded() { throw new UnsupportedOperationException(); }
|
||||
public boolean failed() { throw new UnsupportedOperationException(); }
|
||||
public void handle(AsyncResult<T> asyncResult) {
|
||||
if (asyncResult.succeeded()) {
|
||||
complete(asyncResult.result());
|
||||
@@ -1025,9 +954,9 @@ public class FutureTest extends VertxTestBase {
|
||||
AsyncResult<Integer> map1 = res.map(String::length);
|
||||
AsyncResult<Integer> map2 = res.map(17);
|
||||
p.complete("foobar");
|
||||
assertEquals(6, (int) map1.result());
|
||||
assertEquals(6, (int)map1.result());
|
||||
assertNull(map1.cause());
|
||||
assertEquals(17, (int) map2.result());
|
||||
assertEquals(17, (int)map2.result());
|
||||
assertNull(map2.cause());
|
||||
}
|
||||
|
||||
@@ -1287,21 +1216,17 @@ public class FutureTest extends VertxTestBase {
|
||||
Future<String> f = promise.future();
|
||||
Field handlerField = f.getClass().getDeclaredField("handler");
|
||||
handlerField.setAccessible(true);
|
||||
f.setHandler(ar -> {
|
||||
});
|
||||
f.setHandler(ar -> {});
|
||||
promise.complete();
|
||||
assertNull(handlerField.get(f));
|
||||
f.setHandler(ar -> {
|
||||
});
|
||||
f.setHandler(ar -> {});
|
||||
assertNull(handlerField.get(f));
|
||||
promise = Promise.promise();
|
||||
f = promise.future();
|
||||
f.setHandler(ar -> {
|
||||
});
|
||||
f.setHandler(ar -> {});
|
||||
promise.fail("abc");
|
||||
assertNull(handlerField.get(f));
|
||||
f.setHandler(ar -> {
|
||||
});
|
||||
f.setHandler(ar -> {});
|
||||
assertNull(handlerField.get(f));
|
||||
}
|
||||
|
||||
@@ -1330,7 +1255,7 @@ public class FutureTest extends VertxTestBase {
|
||||
ctx.runOnContext(v -> {
|
||||
latch.complete(Thread.currentThread());
|
||||
});
|
||||
Thread elThread = latch.get(10, SECONDS);
|
||||
Thread elThread = latch.get(10, TimeUnit.SECONDS);
|
||||
|
||||
//
|
||||
CountDownLatch latch1 = new CountDownLatch(1);
|
||||
|
||||
Reference in New Issue
Block a user