quarkus-workshop/docs/messaging.adoc
2019-11-04 09:36:54 -05:00

= Asynchronous messaging between beans
:experimental:
Quarkus allows different beans to interact using asynchronous messages, which promotes loose coupling. Messages are sent to _virtual_ addresses. Quarkus offers three delivery mechanisms:
* **Point-to-Point** - send a message and exactly one consumer receives it. If several consumers listen to the address, they are selected in round-robin order.
* **Publish/Subscribe** - publish a message and every consumer listening to the address receives it.
* **Request/Reply** - send a message and expect a response. The receiver can respond asynchronously.
All of these delivery mechanisms are non-blocking, and they provide one of the fundamental building blocks of reactive systems, which promise better performance, reduced developer burden, better isolation between services, and improved recovery from failure.
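
To make the first two delivery mechanisms concrete, here is a minimal plain-Java sketch. `ToyEventBus` is a hypothetical class written for illustration only; it is not the Vert.x API, and unlike the real event bus it delivers messages synchronously. It only shows how round-robin point-to-point delivery differs from publish/subscribe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy bus, NOT the Vert.x API: it only illustrates delivery semantics.
class ToyEventBus {
    private final Map<String, List<Consumer<String>>> consumers = new HashMap<>();
    private final Map<String, Integer> sendCount = new HashMap<>();

    // Register a consumer on a virtual address.
    void consumer(String address, Consumer<String> handler) {
        consumers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
    }

    // Point-to-point: exactly one consumer receives the message, chosen round-robin.
    void send(String address, String message) {
        List<Consumer<String>> handlers = consumers.get(address);
        if (handlers == null || handlers.isEmpty()) {
            throw new IllegalStateException("No handlers for address " + address);
        }
        int index = sendCount.merge(address, 1, Integer::sum) - 1;
        handlers.get(index % handlers.size()).accept(message);
    }

    // Publish/subscribe: every consumer on the address receives the message.
    void publish(String address, String message) {
        List<Consumer<String>> handlers =
            consumers.getOrDefault(address, Collections.emptyList());
        for (Consumer<String> handler : handlers) {
            handler.accept(message);
        }
    }

    public static void main(String[] args) {
        ToyEventBus bus = new ToyEventBus();
        bus.consumer("greetings", m -> System.out.println("A received " + m));
        bus.consumer("greetings", m -> System.out.println("B received " + m));
        bus.send("greetings", "first");    // only A
        bus.send("greetings", "second");   // only B
        bus.publish("greetings", "third"); // both A and B
    }
}
```

Request/Reply builds on the point-to-point case: the single consumer that receives the message also hands a response back to the sender.
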
[NOTE]
====
The asynchronous message passing feature in Quarkus allows _replying_ to messages, which Reactive Messaging does not support. However, it is limited to single-event behavior (no streams) and to local messages.
====
== Add extension
This mechanism uses the https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x EventBus,window=_blank], so you need to enable the `vertx` extension to use this feature. Add the extension in the Terminal using this command:
[source,sh,role="copypaste"]
----
mvn quarkus:add-extension -Dextensions="vertx"
----
[NOTE]
====
https://vertx.io/[Eclipse Vert.x,window=_blank] is a toolkit for building reactive applications. It is designed to be lightweight and embeddable. Vert.x defines a reactive execution model and provides a large ecosystem. Quarkus integrates Vert.x to implement different reactive features, such as asynchronous message passing (the subject of this exercise) and a non-blocking HTTP client. In short, Quarkus uses Vert.x as its reactive engine. Although many reactive features in Quarkus don't expose Vert.x directly, it's used underneath. You can also access the managed Vert.x instance and benefit from the Vert.x ecosystem.
====
Quarkus provides three Vert.x APIs:
* _bare_ - for advanced usage, or if you have existing Vert.x code you want to reuse in your Quarkus application
* _Axle_ - works well with Quarkus and MicroProfile APIs (`CompletionStage` for single results and `Publisher` for streams)
* _RxJava 2_ - for when you need a wide range of data transformation operators on your streams
We're using the _Axle_ variant here, which provides good support for async operations in HTTP resources.
== Create RESTful resource
We'll start by creating a new asynchronous endpoint. Open the `PersonResource` class and add a new field that gives access to the Vert.x event bus, which is used to pass messages between components:
[source,java,role="copypaste"]
----
@Inject EventBus bus; // <1>
----
<1> Use _Assistant > Organize Imports_, and be sure to import the correct `EventBus` class - which is `io.vertx.axle.core.eventbus.EventBus`.
Next, create two new endpoints in the same class: one that creates a new person in our database given a name, and one that finds a person by name:
[source,java,role="copypaste"]
----
@POST
@Path("/{name}")
@Produces(MediaType.APPLICATION_JSON)
public CompletionStage<String> addPerson(@PathParam("name") String name) {
    return bus.<String>send("add-person", name) // <1>
        .thenApply(Message::body); // <2>
}

@GET
@Path("/name/{name}")
@Produces(MediaType.APPLICATION_JSON)
public Person byName(@PathParam("name") String name) {
    return Person.find("name", name).firstResult();
}
----
<1> Send the name to the `add-person` address
<2> When we get the reply, extract the body and send it as the response to the user
Don't forget to use _Assistant > Organize Imports_ and import the _axle_ variant of the `Message` class.
This uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the JAX-RS endpoint, we are sending a message. This message is consumed by another bean and the response is sent using the reply mechanism.
The `EventBus` object provides methods to:
* Send a message to a specific address - a single consumer receives the message
* Publish a message to a specific address - all consumers receive the message
* Send a message and expect a reply
With this endpoint in place, we can POST to `/person/joe` to create a new person named `joe`.
== Try it, and fail
The endpoint exists, but nothing consumes its messages yet. Confirm that the request fails by running this command in the Terminal:
[source,sh,role="copypaste"]
----
curl -X POST http://localhost:8080/person/joe
----
**This will fail** with an `Internal Server Error`. If you look at the Live Coding terminal, you'll also see the reason:
[source,none]
----
ERROR [org.jbo.res.res.i18n] (executor-thread-1) RESTEASY002020: Unhandled asynchronous exception, sending back 500: (NO_HANDLERS,-1) No handlers for address add-person
----
We posted the message to the Vert.x event bus at the `add-person` address, but there's nothing to receive it!
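
To see why the failure surfaces as an error response instead of a reply body, here is a minimal sketch that uses only the JDK's `CompletableFuture`. The class `FailedReplyDemo` and its methods are hypothetical stand-ins, not Quarkus or Vert.x code. We assume the send operation returns a stage that completes exceptionally when no consumer is registered, as the log message above suggests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical demo using only the JDK; it does not call Vert.x.
class FailedReplyDemo {
    // Stand-in for a send to an address that has no consumer registered.
    static CompletionStage<String> sendWithoutConsumer(String address) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        reply.completeExceptionally(
            new IllegalStateException("No handlers for address " + address));
        return reply;
    }

    // Mirrors the endpoint's chain: thenApply is skipped when the stage has failed,
    // so the exception travels on to whoever consumes the stage.
    static String describeOutcome(String address) {
        return sendWithoutConsumer(address)
            .thenApply(body -> "created " + body)
            .handle((result, error) -> error == null
                ? result
                : "failed: " + rootCause(error).getMessage())
            .toCompletableFuture()
            .join();
    }

    // Dependent stages wrap the original exception, so unwrap it before reporting.
    static Throwable rootCause(Throwable error) {
        Throwable current = error;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(describeOutcome("add-person"));
    }
}
```

In the exercise there is no `handle` step, so the failed stage reaches the REST layer unchanged and is reported as a 500.
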
== Create consumer
Create a new class in the `org.acme.people.service` package called `PersonService`. Use the following code to implement our message consumer:
[source,java,role="copypaste"]
----
package org.acme.people.service;

import java.time.LocalDate;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

import org.acme.people.model.EyeColor;
import org.acme.people.model.Person;

import io.quarkus.vertx.ConsumeEvent;

@ApplicationScoped
public class PersonService {

    @ConsumeEvent("add-person")
    @Transactional
    public String addPerson(String name) {
        LocalDate birth = LocalDate.now().plusWeeks(Math.round(Math.floor(Math.random() * 20 * 52 * -1)));
        EyeColor color = EyeColor.values()[(int)(Math.floor(Math.random() * EyeColor.values().length))];
        Person p = new Person();
        p.birth = birth;
        p.eyes = color;
        p.name = name;
        Person.persist(p); // <1>
        return p.name; // <2>
    }
}
----
<1> A new Person entity is created and persisted
<2> The return value of a method annotated with `@ConsumeEvent` is used as the response to the incoming message.
This bean receives the name, creates and persists a new `Person` entity, and then echoes back the name (or a well-defined failure if things go wrong).
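
The two random expressions in `addPerson` are easier to follow with fixed inputs. The sketch below repeats the same arithmetic but takes the random value as a parameter. `RandomPersonData`, its method names, and the color count of 3 are hypothetical, chosen only for illustration. It shows that the birth date falls between today and 1040 weeks (20 x 52) in the past, and that the color index always stays within the enum's bounds.

```java
// Hypothetical helper that isolates the arithmetic used in the consumer.
class RandomPersonData {
    // Same expression as in addPerson, with the random value r in [0, 1) passed in.
    static long weeksOffset(double r) {
        return Math.round(Math.floor(r * 20 * 52 * -1));
    }

    // Index into an enum with colorCount values; colorCount is an assumed parameter.
    static int colorIndex(double r, int colorCount) {
        return (int) Math.floor(r * colorCount);
    }

    public static void main(String[] args) {
        System.out.println(weeksOffset(0.0));      // smallest offset: today
        System.out.println(weeksOffset(0.5));      // halfway: 520 weeks ago
        System.out.println(weeksOffset(0.999999)); // largest offset: 1040 weeks ago
        System.out.println(colorIndex(0.999999, 3));
    }
}
```
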
Let's try our test again:
[source,sh,role="copypaste"]
----
curl -X POST http://localhost:8080/person/joe
----
You should get back the name you put in (`joe`). Now let's confirm Joe is present:
[source,sh,role="copypaste"]
----
curl -s http://localhost:8080/person/name/joe | jq
----
You should get back Joe!
[source,json]
----
{
"id": 1004,
"birth": "2000-03-15",
"eyes": "BROWN",
"name": "joe"
}
----
To better understand what happened, let's detail how the HTTP request and response were handled:
. The request is received by the `addPerson` method
. A message containing the desired name is sent to the event bus
. Another bean receives this message and computes the response
. This response is sent back using the reply mechanism
. Once the reply is received by the sender, the content is written to the HTTP response
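
The steps above can be sketched in plain Java. `ToyMessage`, `RequestReplyBus`, and `PersonFlowDemo` are hypothetical classes written for illustration; they are not the Vert.x or Quarkus API, and the real event bus schedules work differently. The sketch only shows the shape of the flow: the sender gets a `CompletionStage`, the consumer's return value becomes the reply, and the sender maps the reply body to the response.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical reply wrapper; it only mimics the body() accessor used in the exercise.
class ToyMessage {
    private final String body;

    ToyMessage(String body) {
        this.body = body;
    }

    String body() {
        return body;
    }
}

// Hypothetical toy bus, NOT the Vert.x API.
class RequestReplyBus {
    private final Map<String, Function<String, String>> consumers = new ConcurrentHashMap<>();

    // Register the single consumer for an address.
    void consumer(String address, Function<String, String> handler) {
        consumers.put(address, handler);
    }

    CompletionStage<ToyMessage> send(String address, String body) {
        Function<String, String> handler = consumers.get(address);
        if (handler == null) {
            CompletableFuture<ToyMessage> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalStateException("No handlers for address " + address));
            return failed;
        }
        // Steps 3 and 4: the consumer runs on another thread and its
        // return value is wrapped as the reply.
        return CompletableFuture.supplyAsync(() -> new ToyMessage(handler.apply(body)));
    }
}

class PersonFlowDemo {
    // Steps 1, 2 and 5: receive the request, send the name, map the reply to the response.
    static CompletionStage<String> addPerson(RequestReplyBus bus, String name) {
        return bus.send("add-person", name).thenApply(ToyMessage::body);
    }

    public static void main(String[] args) {
        RequestReplyBus bus = new RequestReplyBus();
        bus.consumer("add-person", name -> name); // echo the name, like PersonService
        System.out.println(addPerson(bus, "joe").toCompletableFuture().join());
    }
}
```

The calling thread is never blocked inside `addPerson`; only the demo's `main` method calls `join()` so that it can print the result.
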
== Congratulations!
In this exercise you learned how Quarkus allows different beans to interact using asynchronous messages. We'll take this to the next level in the next exercise.