mirror of
https://github.com/jlengrand/quarkus-workshop.git
synced 2026-03-10 08:41:21 +00:00
## Reactive Streams with Quarkus and Kafka

In this exercise, you will use the Quarkus Kafka extension to build a streaming application using MicroProfile Reactive Streams Messaging and https://kafka.apache.org[Apache Kafka], a distributed streaming platform. You will also use https://strimzi.io/[Strimzi], which provides an easy way to run an Apache Kafka cluster on Kubernetes using https://operatorhub.io/what-is-an-operator[Operators].

### What is Apache Kafka?

Apache Kafka is a distributed streaming platform. A streaming platform has three key capabilities:

* Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
* Store streams of records in a fault-tolerant, durable way.
* Process streams of records as they occur.

Kafka is generally used for two broad classes of applications:

* Building real-time streaming data pipelines that reliably move data between systems or applications
* Building real-time streaming applications that transform or react to streams of data

### What is Strimzi?

Strimzi is based on Apache Kafka, and makes it easy to run an Apache Kafka cluster on OpenShift or Kubernetes in various deployment configurations.

Strimzi provides three operators:

* **Cluster Operator** - Responsible for deploying and managing Apache Kafka clusters within an OpenShift or Kubernetes cluster.
* **Topic Operator** - Responsible for managing Kafka topics within a Kafka cluster running within an OpenShift or Kubernetes cluster.
* **User Operator** - Responsible for managing Kafka users within a Kafka cluster running within an OpenShift or Kubernetes cluster.

## The Goal

In this exercise, we are going to generate (random) names in one component. These names are written to a Kafka topic (`names`). A second component reads from the `names` Kafka topic and applies some magic conversion to each name (adding an honorific). The result is sent to an _in-memory stream_ consumed by a JAX-RS resource. The data is sent to a browser using _server-sent events_ and displayed in the browser. It will look like this:

image::names.png[names,800]

## Deploy Kafka via Strimzi

The Strimzi operator installs and manages Kafka clusters on Kubernetes. It has been pre-installed for you, so all you have to do is create a Kafka cluster inside your namespace.

First, open the {{CONSOLE_URL}}[OpenShift Console], log in using your username/password if needed (e.g. `user12`/`pass12`) and navigate to your project (`userNN-project`). Once there, on the left menu click on _Catalog > Developer Catalog_ and type `kafka` into the keyword filter box:

image::kafkacatalog.png[kafkacatalog,800]

These are all of the Kafka cluster elements you can install. Click on **Kafka**, and then click on **Create**. This will open a YAML file for you to configure the cluster before it is installed. Change the name of the cluster from `my-cluster` to `names-cluster` (under the _metadata_ section of the YAML file). Leave all other values as-is, and click **Create**:

image::createkafka.png[createkafka,600]

This will create a new Kafka Kubernetes object in your namespace, triggering the Operator to deploy Kafka.

After clicking **Create** you will be taken to the list of objects created by the Kafka operator.

Next, click on the **Kafka Topic** tab:

image::topictab.png[topictab,600]

and then click **Create Kafka Topic**. We'll need to create a topic for our application to stream to and from, so in the YAML:

* Change the _metadata > name_ value from `my-topic` to `names`.
* Change the value of the `strimzi.io/cluster` label from `my-cluster` to `names-cluster`.

Then click **Create**.

image::topiccreate.png[topiccreate,600]

This will cause the Operator to provision a new topic in the Kafka cluster.
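
With those two edits, the `KafkaTopic` resource you create should look roughly like this sketch (the `apiVersion` and the `partitions`/`replicas` defaults may differ depending on the Strimzi version installed):

```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: names
  labels:
    # Tells the Topic Operator which Kafka cluster this topic belongs to
    strimzi.io/cluster: names-cluster
spec:
  partitions: 1
  replicas: 1
```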

Verify that the Kafka and Zookeeper pods are starting up by executing this command in a Terminal in Che:

[source,sh,role="copypaste"]
----
oc get pods | grep names-cluster
----

[NOTE]
====
You may have been logged out of the cluster if you reloaded the page. If so, just run this command to log in again:

[source,sh,role="copypaste"]
----
oc login https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT --insecure-skip-tls-verify=true
----
====

Don't worry if the pods are not all in the _Running_ state yet; they will get there eventually, and we'll use them later in this exercise.

## Add Quarkus Kafka Extension

While Kafka is installing, turn your attention back to the app. Like other exercises, we'll need another extension to integrate with Kafka. Install it with:

[source,sh,role="copypaste"]
----
mvn quarkus:add-extension -Dextensions="kafka"
----

This will add the necessary entries to your `pom.xml` to bring in the Kafka extension.
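
If you're curious, the entry added to `pom.xml` should look something like the following sketch (the exact artifact name may vary slightly between Quarkus versions; the version itself is managed by the Quarkus BOM):

```xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
```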

## Create name generator

To start building the app, create a new Java class in the `org.acme.people.stream` package called `NameGenerator`. This class will generate random names and publish them to our Kafka topic for further processing. Use this code:

[source,java,role="copypaste"]
----
package org.acme.people.stream;

import io.reactivex.Flowable;
import javax.enterprise.context.ApplicationScoped;
import org.acme.people.utils.CuteNameGenerator;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import java.util.concurrent.TimeUnit;

@ApplicationScoped
public class NameGenerator {

    @Outgoing("generated-name") // <1>
    public Flowable<String> generate() { // <2>
        return Flowable.interval(5, TimeUnit.SECONDS)
                .map(tick -> CuteNameGenerator.generate());
    }

}
----
<1> Instructs Reactive Messaging to dispatch the items from the returned stream to the `generated-name` channel
<2> The method returns an RX Java 2 stream (`Flowable`) emitting a random name every 5 seconds

The method returns a Reactive Stream. The generated items are sent to the stream named `generated-name`. This stream is mapped to Kafka using the `application.properties` file that we will create soon.

## Add honorifics

The name converter reads the names from Kafka and transforms them, adding a random (English) honorific to the beginning of each name.

Create a new Java class in the same package called `NameConverter`. Use this code:

[source,java,role="copypaste"]
----
package org.acme.people.stream;

import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class NameConverter {

    private static final String[] honorifics = {"Mr.", "Mrs.", "Sir", "Madam", "Lord", "Lady", "Dr.", "Professor", "Vice-Chancellor", "Regent", "Provost", "Prefect"};

    @Incoming("names") // <1>
    @Outgoing("my-data-stream") // <2>
    @Broadcast // <3>
    public String process(String name) {
        String honorific = honorifics[(int)Math.floor(Math.random() * honorifics.length)];
        return honorific + " " + name;
    }
}
----
<1> Indicates that the method consumes items from the `names` topic
<2> Indicates that the objects returned by the method are sent to the `my-data-stream` stream
<3> Indicates that items are dispatched to all _subscribers_

The `process` method is called for every Kafka record from the `names` topic (configured in the application configuration). Every result is sent to the `my-data-stream` in-memory stream.
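
To see what the conversion does on its own, here is a small standalone sketch that reproduces the body of `process` outside of CDI and Kafka (the class name and the shortened honorific list here are illustrative, not part of the workshop code):

```java
import java.util.Arrays;

public class NameConverterSketch {

    private static final String[] HONORIFICS = {"Mr.", "Mrs.", "Sir", "Madam"};

    // Same logic as process(): prepend a randomly chosen honorific to the name
    static String process(String name) {
        String honorific = HONORIFICS[(int) Math.floor(Math.random() * HONORIFICS.length)];
        return honorific + " " + name;
    }

    public static void main(String[] args) {
        String result = process("Armored Hippo");
        // The result always ends with the original name, prefixed by one honorific
        System.out.println(result);
        String prefix = result.substring(0, result.length() - " Armored Hippo".length());
        System.out.println(Arrays.asList(HONORIFICS).contains(prefix));
    }
}
```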

## Expose to front end

Finally, let's bind our stream to a JAX-RS resource. Create a new Java class in the same package called `NameResource`. Use this code:

[source,java,role="copypaste"]
----
package org.acme.people.stream;

import io.smallrye.reactive.messaging.annotations.Stream;
import org.reactivestreams.Publisher;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

/**
 * A simple resource retrieving the in-memory "my-data-stream" and sending the items as server-sent events.
 */
@Path("/names")
public class NameResource {

    @Inject
    @Stream("my-data-stream") Publisher<String> names; // <1>

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) // <2>
    public Publisher<String> stream() { // <3>
        return names;
    }
}
----
<1> Injects the `my-data-stream` stream using the `@Stream` qualifier
<2> Indicates that the content is sent using _Server-Sent Events_
<3> Returns the stream (a Reactive Stream `Publisher`)

## Configure application

We need to configure the Kafka connector. This is done in the `application.properties` file (in the `src/main/resources` directory). The keys are structured as follows:

`mp.messaging.[outgoing|incoming].{channel-name}.property=value`

The `channel-name` segment must match the value set in the `@Incoming` and `@Outgoing` annotations:

* `generated-name` → the sink to which we write the generated names
* `names` → the source from which we read the names

Add the following values to the `application.properties`:

[source,none,role="copypaste"]
----
# Configure the Kafka sink (we write to it)
%prod.mp.messaging.outgoing.generated-name.bootstrap.servers=names-cluster-kafka-bootstrap:9092
%prod.mp.messaging.outgoing.generated-name.connector=smallrye-kafka
%prod.mp.messaging.outgoing.generated-name.topic=names
%prod.mp.messaging.outgoing.generated-name.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Configure the Kafka source (we read from it)
%prod.mp.messaging.incoming.names.bootstrap.servers=names-cluster-kafka-bootstrap:9092
%prod.mp.messaging.incoming.names.connector=smallrye-kafka
%prod.mp.messaging.incoming.names.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
----

We have prefixed these with `%prod` to avoid our app trying to connect when in `dev` or `test` mode.
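
The `%prod` prefix is Quarkus' configuration-profile syntax: a key prefixed with `%{profile}.` only takes effect when that profile is active, and Quarkus has three built-in profiles (`dev`, `test`, and `prod`). An illustrative sketch, not part of this workshop's configuration:

```properties
# Applies only when running via quarkus:dev
%dev.quarkus.log.level=DEBUG

# Applies only while running tests
%test.quarkus.log.level=INFO

# Applies to the final runnable application
%prod.quarkus.log.level=WARN
```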

More details about this configuration are available in the https://kafka.apache.org/documentation/#producerconfigs[Producer configuration] and https://kafka.apache.org/documentation/#consumerconfigs[Consumer configuration] sections of the Kafka documentation.

[NOTE]
====
What about `my-data-stream`? This is an in-memory stream, not connected to a message broker.
====

## Rebuild Executable JAR

Now we are ready to run our application. Using the command palette, select **Create Executable JAR**. You should see a bunch of log output that ends with a `SUCCESS` message.

## Deploy

Now start the build using our executable JAR:

[source,sh,role="copypaste"]
----
oc start-build people --from-file target/*-runner.jar --follow
----

The build should take a minute or two to complete.

## Test

Within a few seconds of the build completing, our application should be up and running and generating names. To see if it's working, run this command in a Terminal to generate the URL of the sample visualization of the stream of names being generated:

[source,sh,role="copypaste"]
----
clear; echo; echo http://$(oc get route people -o=go-template --template='{{ .spec.host }}')/names.html ; echo
----

Open that URL in a separate browser tab and you should see a cloud of names updating every 5 seconds:

image::names.png[names,800]

These are the original names streamed through Kafka, altered to add a random honorific like "Sir" or "Madam", and displayed in a "word cloud" for you to enjoy!

## Congratulations

This guide has shown how you can interact with Kafka using Quarkus, using MicroProfile Reactive Messaging to build data streaming applications.

If you want to go further, check out the documentation of https://smallrye.io/smallrye-reactive-messaging[SmallRye Reactive Messaging], the implementation used in Quarkus.