mirror of
https://github.com/jlengrand/quarkus.git
synced 2026-03-10 08:41:22 +00:00
Merge pull request #7231 from cescoffier/features/vertx-mutiny-verticle
Add support for MutinyVerticle
This commit is contained in:
@@ -127,8 +127,8 @@
|
||||
<test-containers.version>1.12.4</test-containers.version>
|
||||
<jboss-logging.version>3.3.2.Final</jboss-logging.version>
|
||||
<mutiny.version>0.4.0</mutiny.version>
|
||||
<axle-client.version>0.0.12</axle-client.version>
|
||||
<mutiny-client.version>0.0.12</mutiny-client.version>
|
||||
<axle-client.version>0.0.13</axle-client.version>
|
||||
<mutiny-client.version>0.0.13</mutiny-client.version>
|
||||
<kafka2.version>2.4.0</kafka2.version>
|
||||
<debezium.version>1.0.0.Final</debezium.version>
|
||||
<zookeeper.version>3.4.14</zookeeper.version>
|
||||
|
||||
@@ -35,7 +35,7 @@
|
||||
<graal-sdk.version-for-documentation>19.3.1</graal-sdk.version-for-documentation>
|
||||
<rest-assured.version>4.1.1</rest-assured.version>
|
||||
<axle-client.version>0.0.12</axle-client.version>
|
||||
<mutiny-client.version>0.0.12</mutiny-client.version>
|
||||
<mutiny-client.version>0.0.13</mutiny-client.version>
|
||||
<vertx.version>3.8.5</vertx.version>
|
||||
|
||||
<!-- Dev tools -->
|
||||
|
||||
@@ -498,6 +498,92 @@ Then, create the native executable with:
|
||||
./mvnw package -Pnative
|
||||
----
|
||||
|
||||
== Deploying verticles
|
||||
|
||||
https://vertx.io/docs/vertx-core/java/#_verticles[Verticles] is "a simple, scalable, actor-like deployment and concurrency model" provided by _Vert.x_.
|
||||
This model does not claim to be a strict actor-model implementation, but it does share similarities especially with respect to concurrency, scaling and deployment.
|
||||
To use this model, you write and _deploy_ verticles, communicating with each other by sending messages on the event bus.
|
||||
|
||||
You can deploy _verticles_ in Quarkus.
|
||||
It supports:
|
||||
|
||||
* _bare_ verticle - Java classes extending `io.vertx.core.AbstractVerticle`
|
||||
* _Mutiny_ verticle - Java classes extending `io.smallrye.mutiny.vertx.core.AbstractVerticle`
|
||||
|
||||
To deploy verticles, use the regular Vert.x API:
|
||||
|
||||
[source, java]
|
||||
====
|
||||
@Inject Vertx vertx;
|
||||
|
||||
// ...
|
||||
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
|
||||
vertx.deployVerticle(new MyVerticle(), ar -> { });
|
||||
====
|
||||
|
||||
You can also pass deployment options to configure the verticle as well as set the number of instances.
|
||||
|
||||
Verticles are not _beans_ by default.
|
||||
However, you can implement them as _ApplicationScoped_ beans and get injection support:
|
||||
|
||||
[source, java]
|
||||
====
|
||||
package io.quarkus.vertx.verticles;
|
||||
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
|
||||
@ApplicationScoped
|
||||
public class MyBeanVerticle extends AbstractVerticle {
|
||||
|
||||
@ConfigProperty(name = "address") String address;
|
||||
|
||||
@Override
|
||||
public Uni<Void> asyncStart() {
|
||||
return vertx.eventBus().consumer(address)
|
||||
.handler(m -> m.replyAndForget("hello"))
|
||||
.completionHandler();
|
||||
}
|
||||
}
|
||||
====
|
||||
|
||||
You don't have to inject the `vertx` instance but instead leverage the instance stored in the protected field of `AbstractVerticle`.
|
||||
|
||||
Then, deploy the verticle instance with:
|
||||
|
||||
[source, java]
|
||||
====
|
||||
package io.quarkus.vertx.verticles;
|
||||
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import io.vertx.mutiny.core.Vertx;
|
||||
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.enterprise.event.Observes;
|
||||
|
||||
@ApplicationScoped
|
||||
public class VerticleDeployer {
|
||||
|
||||
public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
|
||||
vertx.deployVerticle(verticle).await().indefinitely();
|
||||
}
|
||||
}
|
||||
====
|
||||
|
||||
If you want to deploy every exposed `AbstractVerticle`, you can use:
|
||||
|
||||
[source, java]
|
||||
====
|
||||
public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
|
||||
for (AbstractVerticle verticle : verticles) {
|
||||
vertx.deployVerticle(verticle).await().indefinitely();
|
||||
}
|
||||
}
|
||||
====
|
||||
|
||||
== Read only deployment environments
|
||||
|
||||
In environments with read only file systems you may receive errors of the form:
|
||||
|
||||
@@ -49,7 +49,6 @@ import io.quarkus.vertx.ConsumeEvent;
|
||||
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
|
||||
import io.quarkus.vertx.runtime.VertxProducer;
|
||||
import io.quarkus.vertx.runtime.VertxRecorder;
|
||||
import io.vertx.reactivex.core.AbstractVerticle;
|
||||
|
||||
class VertxProcessor {
|
||||
|
||||
@@ -158,10 +157,17 @@ class VertxProcessor {
|
||||
}
|
||||
|
||||
@BuildStep
|
||||
void registerRxVerticleClasses(CombinedIndexBuildItem indexBuildItem,
|
||||
void registerVerticleClasses(CombinedIndexBuildItem indexBuildItem,
|
||||
BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
|
||||
// RX Verticles
|
||||
for (ClassInfo ci : indexBuildItem.getIndex()
|
||||
.getAllKnownSubclasses(DotName.createSimple(AbstractVerticle.class.getName()))) {
|
||||
.getAllKnownSubclasses(DotName.createSimple(io.vertx.reactivex.core.AbstractVerticle.class.getName()))) {
|
||||
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, ci.toString()));
|
||||
}
|
||||
|
||||
// Mutiny Verticles
|
||||
for (ClassInfo ci : indexBuildItem.getIndex()
|
||||
.getAllKnownSubclasses(DotName.createSimple(io.smallrye.mutiny.vertx.core.AbstractVerticle.class.getName()))) {
|
||||
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, ci.toString()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
package io.quarkus.vertx;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.jboss.shrinkwrap.api.ShrinkWrap;
|
||||
import org.jboss.shrinkwrap.api.spec.JavaArchive;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
import io.quarkus.test.QuarkusUnitTest;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import io.smallrye.mutiny.infrastructure.Infrastructure;
|
||||
import io.vertx.mutiny.core.Vertx;
|
||||
import io.vertx.mutiny.core.eventbus.Message;
|
||||
|
||||
public class MutinyCodecTest {
|
||||
|
||||
@RegisterExtension
|
||||
static final QuarkusUnitTest config = new QuarkusUnitTest()
|
||||
.setArchiveProducer(() -> ShrinkWrap
|
||||
.create(JavaArchive.class).addClasses(MyBean.class, MyNonLocalBean.class,
|
||||
MyPetCodec.class, Person.class, Pet.class));
|
||||
|
||||
@Inject
|
||||
MyBean bean;
|
||||
|
||||
/**
|
||||
* Bean setting the consumption to be non-local.
|
||||
* So, the user must configure the codec explicitly.
|
||||
*/
|
||||
@Inject
|
||||
MyNonLocalBean nonLocalBean;
|
||||
|
||||
@Inject
|
||||
Vertx vertx;
|
||||
|
||||
@Test
|
||||
public void testWithGenericCodec() {
|
||||
Greeting hello = vertx.eventBus().<Greeting> request("person", new Person("bob", "morane"))
|
||||
.onItem().apply(Message::body)
|
||||
.await().indefinitely();
|
||||
assertThat(hello.getMessage()).isEqualTo("Hello bob morane");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithUserCodec() {
|
||||
Greeting hello = vertx.eventBus().<Greeting> request("pet", new Pet("neo", "rabbit"))
|
||||
.onItem().apply(Message::body)
|
||||
.await().indefinitely();
|
||||
assertThat(hello.getMessage()).isEqualTo("Hello NEO");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithUserCodecNonLocal() {
|
||||
Greeting hello = vertx.eventBus().<Greeting> request("nl-pet", new Pet("neo", "rabbit"))
|
||||
.onItem().apply(Message::body)
|
||||
.await().indefinitely();
|
||||
assertThat(hello.getMessage()).isEqualTo("Non Local Hello NEO");
|
||||
}
|
||||
|
||||
static class Greeting {
|
||||
private final String message;
|
||||
|
||||
Greeting(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
String getMessage() {
|
||||
return message;
|
||||
}
|
||||
}
|
||||
|
||||
static class MyBean {
|
||||
@ConsumeEvent("person")
|
||||
public Uni<Greeting> hello(Person p) {
|
||||
return Uni.createFrom().item(
|
||||
() -> new Greeting("Hello " + p.getFirstName() + " " + p.getLastName()))
|
||||
.emitOn(Infrastructure.getDefaultExecutor());
|
||||
}
|
||||
|
||||
@ConsumeEvent(value = "pet", codec = MyPetCodec.class)
|
||||
public Uni<Greeting> hello(Pet p) {
|
||||
return Uni.createFrom().item(
|
||||
() -> new Greeting("Hello " + p.getName()))
|
||||
.emitOn(Infrastructure.getDefaultExecutor());
|
||||
}
|
||||
}
|
||||
|
||||
static class MyNonLocalBean {
|
||||
@ConsumeEvent(value = "nl-pet", codec = MyPetCodec.class, local = false)
|
||||
public Uni<Greeting> hello(Pet p) {
|
||||
return Uni.createFrom().item(
|
||||
() -> new Greeting("Non Local Hello " + p.getName()))
|
||||
.emitOn(Infrastructure.getDefaultExecutor());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package io.quarkus.vertx.devmode;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.enterprise.event.Observes;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.jboss.shrinkwrap.api.ShrinkWrap;
|
||||
import org.jboss.shrinkwrap.api.spec.JavaArchive;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
import io.quarkus.test.QuarkusDevModeTest;
|
||||
import io.restassured.RestAssured;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.ext.web.Router;
|
||||
|
||||
public class MutinyVerticleClassnameHotReloadTest {
|
||||
|
||||
@RegisterExtension
|
||||
static final QuarkusDevModeTest test = new QuarkusDevModeTest()
|
||||
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
|
||||
.addClasses(MyMutinyVerticle.class, BeanDeployingAVerticleFromInstance.class));
|
||||
|
||||
@Test
|
||||
public void testDeploymentOfMutinyVerticleClass() {
|
||||
String resp = RestAssured.get("/").asString();
|
||||
Assertions.assertTrue(resp.startsWith("ok"));
|
||||
test.modifySourceFile(MyMutinyVerticle.class, data -> data.replace("ok", "hello"));
|
||||
resp = RestAssured.get("/").asString();
|
||||
Assertions.assertTrue(resp.startsWith("hello"));
|
||||
String resp2 = RestAssured.get("/").asString();
|
||||
Assertions.assertEquals(resp, resp2);
|
||||
}
|
||||
|
||||
@ApplicationScoped
|
||||
public static class BeanDeployingAVerticleFromInstance {
|
||||
@Inject
|
||||
Vertx vertx;
|
||||
|
||||
public void init(@Observes Router router) throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
vertx.deployVerticle(MyMutinyVerticle.class.getName(),
|
||||
ar -> latch.countDown());
|
||||
router.get("/").handler(rc -> vertx.eventBus().<String> request("address", "",
|
||||
ar -> rc.response().end(ar.result().body())));
|
||||
latch.await();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package io.quarkus.vertx.devmode;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.enterprise.event.Observes;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.jboss.shrinkwrap.api.ShrinkWrap;
|
||||
import org.jboss.shrinkwrap.api.spec.JavaArchive;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
import io.quarkus.test.QuarkusDevModeTest;
|
||||
import io.restassured.RestAssured;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.ext.web.Router;
|
||||
|
||||
public class MutinyVerticleInstanceHotReloadTest {
|
||||
|
||||
@RegisterExtension
|
||||
static final QuarkusDevModeTest test = new QuarkusDevModeTest()
|
||||
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
|
||||
.addClasses(MyMutinyVerticle.class, BeanDeployingAVerticleFromInstance.class));
|
||||
|
||||
@Test
|
||||
public void testDeploymentOfMutinyVerticleInstance() {
|
||||
String resp = RestAssured.get("/").asString();
|
||||
Assertions.assertTrue(resp.startsWith("ok"));
|
||||
test.modifySourceFile(MyMutinyVerticle.class, data -> data.replace("ok", "hello"));
|
||||
resp = RestAssured.get("/").asString();
|
||||
Assertions.assertTrue(resp.startsWith("hello"));
|
||||
String resp2 = RestAssured.get("/").asString();
|
||||
Assertions.assertEquals(resp, resp2);
|
||||
}
|
||||
|
||||
@ApplicationScoped
|
||||
public static class BeanDeployingAVerticleFromInstance {
|
||||
@Inject
|
||||
Vertx vertx;
|
||||
|
||||
public void init(@Observes Router router) throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
vertx.deployVerticle(new MyMutinyVerticle(),
|
||||
ar -> latch.countDown());
|
||||
router.get("/").handler(rc -> vertx.eventBus().<String> request("address", "",
|
||||
ar -> rc.response().end(ar.result().body())));
|
||||
latch.await();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package io.quarkus.vertx.devmode;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
|
||||
|
||||
public class MyMutinyVerticle extends AbstractVerticle {
|
||||
|
||||
private final String id = UUID.randomUUID().toString();
|
||||
|
||||
@Override
|
||||
public Uni<Void> asyncStart() {
|
||||
return vertx.eventBus().consumer("address")
|
||||
.handler(m -> m.replyAndForget("ok-" + id))
|
||||
.completionHandler();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package io.quarkus.vertx.verticles;
|
||||
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
|
||||
|
||||
@ApplicationScoped
|
||||
public class MyBeanVerticle extends AbstractVerticle {
|
||||
|
||||
@ConfigProperty(name = "address")
|
||||
String address;
|
||||
|
||||
@Override
|
||||
public Uni<Void> asyncStart() {
|
||||
return vertx.eventBus().consumer(address)
|
||||
.handler(m -> m.replyAndForget("hello"))
|
||||
.completionHandler();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package io.quarkus.vertx.verticles;
|
||||
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.enterprise.event.Observes;
|
||||
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import io.vertx.mutiny.core.Vertx;
|
||||
|
||||
@ApplicationScoped
|
||||
public class VerticleDeployer {
|
||||
|
||||
void deploy(@Observes StartupEvent event, Vertx vertx, MyBeanVerticle verticle) {
|
||||
vertx.deployVerticle(verticle).await().indefinitely();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package io.quarkus.vertx.verticles;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.jboss.shrinkwrap.api.ShrinkWrap;
|
||||
import org.jboss.shrinkwrap.api.asset.StringAsset;
|
||||
import org.jboss.shrinkwrap.api.spec.JavaArchive;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
import io.quarkus.test.QuarkusUnitTest;
|
||||
import io.vertx.mutiny.core.Vertx;
|
||||
import io.vertx.mutiny.core.eventbus.Message;
|
||||
|
||||
public class VerticleDeploymentTest {
|
||||
|
||||
@RegisterExtension
|
||||
static final QuarkusUnitTest config = new QuarkusUnitTest()
|
||||
.setArchiveProducer(() -> ShrinkWrap
|
||||
.create(JavaArchive.class)
|
||||
.addClasses(MyBeanVerticle.class, VerticleDeployer.class)
|
||||
.addAsResource(new StringAsset("address=foo"), "application.properties"));
|
||||
|
||||
@Inject
|
||||
Vertx vertx;
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
String s = vertx.eventBus().<String> request("foo", "anyone?")
|
||||
.onItem().apply(Message::body)
|
||||
.await().indefinitely();
|
||||
assertThat(s).isEqualTo("hello");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package io.quarkus.it.vertx.verticles;
|
||||
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
|
||||
|
||||
public class MutinyAsyncVerticle extends AbstractVerticle {
|
||||
|
||||
@Override
|
||||
public Uni<Void> asyncStart() {
|
||||
String address = config().getString("id");
|
||||
return vertx.eventBus().consumer(address)
|
||||
.handler(message -> message.replyAndForget("OK-" + address))
|
||||
.completionHandler();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -34,6 +34,14 @@ public class VerticleDeployer {
|
||||
vertx.deployVerticle(RxVerticle.class.getName(), new DeploymentOptions().setConfig(new JsonObject()
|
||||
.put("id", "rx-classname")))
|
||||
.thenAccept(x -> latch.countDown());
|
||||
|
||||
vertx.deployVerticle(MutinyAsyncVerticle::new, new DeploymentOptions().setConfig(new JsonObject()
|
||||
.put("id", "mutiny")))
|
||||
.thenAccept(x -> latch.countDown());
|
||||
|
||||
vertx.deployVerticle(MutinyAsyncVerticle.class.getName(), new DeploymentOptions().setConfig(new JsonObject()
|
||||
.put("id", "mutiny-classname")))
|
||||
.thenAccept(x -> latch.countDown());
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
||||
@@ -46,4 +46,18 @@ public class VerticleEndpoint {
|
||||
.thenApply(Message::body);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/mutiny")
|
||||
public CompletionStage<String> mutiny() {
|
||||
return vertx.eventBus().<String> request("mutiny", "")
|
||||
.thenApply(Message::body);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/mutiny-classname")
|
||||
public CompletionStage<String> mutinyWithClassName() {
|
||||
return vertx.eventBus().<String> request("mutiny-classname", "")
|
||||
.thenApply(Message::body);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user