Added EventBus consumer multi threaded worker tests

This commit is contained in:
Julien Viet
2018-11-08 18:21:42 +01:00
parent cd86a49d8f
commit 22dfcb47ff

View File

@@ -26,9 +26,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
@@ -1320,5 +1318,66 @@ public class LocalEventBusTest extends EventBusTestBase {
});
await();
}
@Test
public void testMTWorkerConsumer() {
int num = 3;
waitFor(num);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() {
CyclicBarrier barrier = new CyclicBarrier(3);
vertx.eventBus().consumer(ADDRESS1, msg -> {
try {
barrier.await();
complete();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail(e);
} catch (BrokenBarrierException e) {
fail(e);
}
});
}
}, new DeploymentOptions()
.setInstances(1)
.setWorker(true)
.setMultiThreaded(true), onSuccess(id -> {
for (int i = 0;i < num;i++) {
vertx.eventBus().send(ADDRESS1, "msg-" + i);
}
}));
await();
}
@Test
public void testMTExecBlockingConsumer() {
int num = 3;
waitFor(num);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() {
CyclicBarrier barrier = new CyclicBarrier(3);
vertx.eventBus().consumer(ADDRESS1, msg -> {
vertx.executeBlocking(block -> {
try {
barrier.await();
complete();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail(e);
} catch (BrokenBarrierException e) {
fail(e);
}
}, false, null);
});
}
}, onSuccess(id -> {
for (int i = 0;i < num;i++) {
vertx.eventBus().send(ADDRESS1, "msg-" + i);
}
}));
await();
}
}