diff --git a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java index 5a6f42ff8..1eadf05e1 100644 --- a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java @@ -11,42 +11,20 @@ package io.vertx.core.eventbus; -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; -import io.vertx.core.buffer.Buffer; import io.vertx.core.impl.VertxInternal; -import io.vertx.core.net.SelfSignedCertificate; -import io.vertx.core.spi.cluster.AsyncMultiMap; -import io.vertx.core.spi.cluster.ChoosableIterable; -import io.vertx.core.spi.cluster.ClusterManager; -import io.vertx.test.core.Repeat; import io.vertx.test.core.TestUtils; -import io.vertx.test.fakecluster.FakeClusterManager; import io.vertx.test.tls.Cert; -import io.vertx.test.tls.Trust; import org.junit.Test; -import java.io.IOException; -import java.nio.channels.ClosedChannelException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.function.Predicate; -import java.util.stream.IntStream; /** @@ -447,57 +425,4 @@ public class ClusteredEventBusTest extends ClusteredEventBusTestBase { })); await(); } - - @Test - public void testWriteHandlerLookupFailure() { - Throwable cause = new Throwable(); - ClusterManager cm = new FakeClusterManager() { - @Override - public void getAsyncMultiMap(String name, Handler>> handler) { - if ("__vertx.subs".equals(name)) { - super.getAsyncMultiMap(name, ar -> { - handler.handle(ar.map(map -> { - return new AsyncMultiMap() { - @Override - public void add(K k, V v, Handler> completionHandler) { - map.add(k, v, completionHandler); - } - @Override - public void get(K k, Handler>> completionHandler) { - completionHandler.handle(Future.failedFuture(cause)); - } - @Override - public void remove(K k, V v, Handler> completionHandler) { - map.remove(k, v, completionHandler); - } - @Override - public void removeAllForValue(V v, Handler> completionHandler) { - map.removeAllForValue(v, completionHandler); - } - @Override - public void removeAllMatching(Predicate p, Handler> completionHandler) { - map.removeAllMatching(p, completionHandler); - } - }; - })); - }); - } else { - super.getAsyncMultiMap(name, handler); - } - } - }; - VertxOptions options = new VertxOptions().setClusterManager(cm); - options.getEventBusOptions().setHost("localhost").setPort(0).setClustered(true); - vertices = new Vertx[1]; - clusteredVertx(options, onSuccess(node -> { - vertices[0] = node; - })); - assertWaitUntil(() -> vertices[0] != null); - MessageProducer sender = vertices[0].eventBus().sender(ADDRESS1); - sender.write("the_string", onFailure(err -> { - assertSame(cause, err); - testComplete(); - })); - await(); - } } diff --git a/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java b/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java new file mode 100644 index 000000000..8dafae673 --- /dev/null +++ b/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.eventbus; + +import io.vertx.core.*; +import io.vertx.core.spi.cluster.AsyncMultiMap; +import io.vertx.core.spi.cluster.ChoosableIterable; +import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.test.core.VertxTestBase; +import io.vertx.test.fakecluster.FakeClusterManager; +import org.junit.Test; + +import java.util.function.Predicate; + +/** + * @author Julien Viet + */ +public final class WriteHandlerLookupFailureTest extends VertxTestBase { + + @Test + public void test() { + Throwable cause = new Throwable(); + ClusterManager cm = new FakeClusterManager() { + @Override + public void getAsyncMultiMap(String name, Handler>> handler) { + if ("__vertx.subs".equals(name)) { + super.getAsyncMultiMap(name, ar -> { + handler.handle(ar.map(map -> { + return new AsyncMultiMap() { + @Override + public void add(K k, V v, Handler> completionHandler) { + map.add(k, v, completionHandler); + } + + @Override + public void get(K k, Handler>> completionHandler) { + completionHandler.handle(Future.failedFuture(cause)); + } + + @Override + public void remove(K k, V v, Handler> completionHandler) { + map.remove(k, v, completionHandler); + } + + @Override + public void removeAllForValue(V v, Handler> completionHandler) { + map.removeAllForValue(v, completionHandler); + } + + @Override + public void removeAllMatching(Predicate p, Handler> completionHandler) { + map.removeAllMatching(p, completionHandler); + } + }; + })); + }); + } else { + super.getAsyncMultiMap(name, handler); + } + } + }; + VertxOptions options = new VertxOptions().setClusterManager(cm); + options.getEventBusOptions().setHost("localhost").setPort(0).setClustered(true); + vertices = new Vertx[1]; + clusteredVertx(options, onSuccess(node -> { + vertices[0] = node; + })); + assertWaitUntil(() -> vertices[0] != null); + MessageProducer sender = vertices[0].eventBus().sender("foo"); + sender.write("the_string", onFailure(err -> { + assertSame(cause, err); + testComplete(); + })); + await(); + } +}