mirror of
https://github.com/jlengrand/quarkus-workshop.git
synced 2026-03-10 08:41:21 +00:00
350 lines
15 KiB
Plaintext
350 lines
15 KiB
Plaintext
= Reactive Streams with Quarkus and Kafka
|
||
:experimental:
|
||
|
||
In this exercise, you will use the Quarkus Kafka extension to build a streaming application using MicroProfile Reactive Streams Messaging and https://kafka.apache.org[Apache Kafka,window=_blank], a distributed streaming platform. You will also use https://strimzi.io/[Strimzi,window=_blank], which provides an easy way to run an Apache Kafka cluster on Kubernetes using https://operatorhub.io/what-is-an-operator[Operators,window=_blank].
|
||
|
||
== What is Apache Kafka?
|
||
|
||
Apache Kafka is a distributed streaming platform. A streaming platform has three key capabilities:
|
||
|
||
* Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
|
||
* Store streams of records in a fault-tolerant durable way.
|
||
* Process streams of records as they occur.
|
||
|
||
Kafka is generally used for two broad classes of applications:
|
||
|
||
* Building real-time streaming data pipelines that reliably get data between systems or applications
|
||
* Building real-time streaming applications that transform or react to the streams of data
|
||
|
||
== What is Strimzi?
|
||
|
||
Strimzi provides a way to run an Apache Kafka cluster on Kubernetes in various deployment configurations.
|
||
|
||
Strimzi is based on Apache Kafka, and makes it easy to run Apache Kafka on OpenShift or Kubernetes.
|
||
|
||
Strimzi provides three operators:
|
||
|
||
* **Cluster Operator** - Responsible for deploying and managing Apache Kafka clusters within an OpenShift or Kubernetes cluster.
|
||
* **Topic Operator** - Responsible for managing Kafka topics within a Kafka cluster running within an OpenShift or Kubernetes cluster.
|
||
* **User Operator** - Responsible for managing Kafka users within a Kafka cluster running within an OpenShift or Kubernetes cluster.
|
||
|
||
== The Goal
|
||
|
||
In this exercise, we are going to generate (random) names in one component. These names are written in a Kafka topic (`names`). A second component reads from the `names` Kafka topic and applies some magic conversion to the name (adding an honorific). The result is sent to an _in-memory stream_ consumed by a JAX-RS resource. The data is sent to a browser using https://www.w3.org/TR/eventsource/[_server-sent events_,window=_blank] and displayed in the browser. It will look like this:
|
||
|
||
image::names.png[names,800]
|
||
|
||
=== Create Kafka Cluster
|
||
|
||
The Strimzi operator installs and manages Kafka clusters on Kubernetes. It's been pre-installed for you, so all you have to do is create a Kafka cluster inside your namespace.
|
||
|
||
First, open the {{CONSOLE_URL}}[OpenShift Console,window=_blank], log in using your username/password if needed (e.g. `{{ OPENSHIFT_USER_NAME }}`/`{{ OPENSHIFT_USER_PASSWORD }}`) and navigate to your project (`{{ OPENSHIFT_USER_NAME }}-project`). Once there, on the left menu click on _Catalog > Developer Catalog_ and type in `kafka` in the keyword filter box:
|
||
|
||
image::kafkacatalog.png[kafkacatalog,800]
|
||
|
||
These are all of the Kafka cluster elements you can install. Click on **Kafka**, and then click on **Create**. This will open a yaml file for you to configure the cluster before it's installed. Change the name of the cluster from `my-cluster` to `names-cluster` (under the _metadata_ section of the YAML file). Leave all other values as-is, and click **Create**:
|
||
|
||
image::createkafka.png[createkafka,600]
|
||
|
||
This will create a new Kafka Kubernetes object in your namespace, triggering the Operator to deploy Kafka.
|
||
After clicking **Create** you will be taken to the list of objects created by the Kafka operator.
|
||
|
||
== Create Kafka Topic
|
||
|
||
Click _Developer Catalog_ on the left again, and enter `topic` into the search box. Click on the _Kafka Topic_ box, then click **Create**:
|
||
|
||
image::createkafkatopic.png[createkafka,800]
|
||
|
||
We'll need to create a topic for our application to stream to and from, so in the YAML:
|
||
|
||
* Change the _metadata > name_ value from `my-topic` to `names`.
|
||
* Change the vale of the `strimzi.io/cluster` label from `my-cluster` to `names-cluster`
|
||
|
||
Then click **Create**.
|
||
|
||
image::topiccreate.png[topiccreate,800]
|
||
|
||
This will cause the Operator to provision a new Topic in the Kafka cluster.
|
||
|
||
Verify that the Kafka and Zookeeper pods are starting up by executing this command in a Terminal in Che:
|
||
|
||
[source,sh,role="copypaste"]
|
||
----
|
||
oc get pods|grep names-cluster
|
||
----
|
||
|
||
You'll see something like:
|
||
|
||
[source,none]
|
||
----
|
||
names-cluster-entity-operator-78686cdd4d-rfkwd 3/3 Running 0 6m50s
|
||
names-cluster-kafka-0 2/2 Running 0 7m41s
|
||
names-cluster-kafka-1 2/2 Running 0 7m41s
|
||
names-cluster-kafka-2 2/2 Running 1 7m41s
|
||
names-cluster-zookeeper-0 2/2 Running 0 8m31s
|
||
names-cluster-zookeeper-1 2/2 Running 0 8m31s
|
||
names-cluster-zookeeper-2 2/2 Running 0 8m31s
|
||
----
|
||
|
||
[NOTE]
|
||
====
|
||
You may be logged out of the cluster if you have reloaded the page. If so, just run this command to log in again:
|
||
[source,sh,role="copypaste"]
|
||
----
|
||
oc login https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT --insecure-skip-tls-verify=true
|
||
----
|
||
====
|
||
|
||
Don't worry if they're not all in the _Running_ status, they will eventually complete and we'll use them later on in this exercise.
|
||
|
||
== Add Quarkus Kafka Extension
|
||
|
||
With Kafka installing, turn your attention back to the app. Like other exercises, we'll need another extension to integrate with Kafka. Install it with:
|
||
|
||
[source,sh,role="copypaste"]
|
||
----
|
||
mvn quarkus:add-extension -Dextensions="kafka"
|
||
----
|
||
|
||
This will add the necessary entries in your `pom.xml` to bring in the Kafka extension.
|
||
|
||
== The Application You Will Build
|
||
|
||
The app consists of 3 components that pass messages via Kafka and an in-memory stream, then uses SSE to push messages to the browser. It looks like:
|
||
|
||
image::kafkaarch.png[kafka, 800]
|
||
|
||
== Create name generator
|
||
|
||
To start building the app, create a new Java class in the `org.acme.people.stream` called `NameGenerator`. This class will generate random names and publish them to our Kafka topic for further processing. Use this code:
|
||
|
||
[source,java,role="copypaste"]
|
||
----
|
||
package org.acme.people.stream;
|
||
|
||
import io.reactivex.Flowable;
|
||
import javax.enterprise.context.ApplicationScoped;
|
||
import org.acme.people.utils.CuteNameGenerator;
|
||
import org.eclipse.microprofile.reactive.messaging.Outgoing;
|
||
import java.util.concurrent.TimeUnit;
|
||
|
||
@ApplicationScoped
|
||
public class NameGenerator {
|
||
|
||
@Outgoing("generated-name") // <1>
|
||
public Flowable<String> generate() { // <2>
|
||
return Flowable.interval(5, TimeUnit.SECONDS)
|
||
.map(tick -> CuteNameGenerator.generate());
|
||
}
|
||
|
||
}
|
||
----
|
||
<1> Instruct Reactive Messaging to dispatch the items from returned stream to `generated-name`
|
||
<2> The method returns a RX Java 2 stream (Flowable) emitting a random name every 5 seconds
|
||
|
||
The method returns a Reactive Stream. The generated items are sent to the stream named `generated-name`. This stream is mapped to Kafka using the application.properties file that we will create soon.
|
||
|
||
== Add honorifics
|
||
|
||
The name converter reads the names from Kafka, and transforms them, adding a random (English) honorific to the beginning of the name.
|
||
|
||
Create a new Java class in the same package called `NameConverter`. Use this code:
|
||
|
||
[source,java,role="copypaste"]
|
||
----
|
||
package org.acme.people.stream;
|
||
|
||
import javax.enterprise.context.ApplicationScoped;
|
||
import org.eclipse.microprofile.reactive.messaging.Incoming;
|
||
import org.eclipse.microprofile.reactive.messaging.Outgoing;
|
||
import io.smallrye.reactive.messaging.annotations.Broadcast;
|
||
|
||
@ApplicationScoped
|
||
public class NameConverter {
|
||
|
||
private static final String[] honorifics = {"Mr.", "Mrs.", "Sir", "Madam", "Lord", "Lady", "Dr.", "Professor", "Vice-Chancellor", "Regent", "Provost", "Prefect"};
|
||
|
||
@Incoming("names") // <1>
|
||
@Outgoing("my-data-stream") // <2>
|
||
@Broadcast // <3>
|
||
public String process(String name) {
|
||
String honorific = honorifics[(int)Math.floor(Math.random() * honorifics.length)];
|
||
return honorific + " " + name;
|
||
}
|
||
}
|
||
----
|
||
<1> Indicates that the method consumes the items from the `names` topic
|
||
<2> Indicates that the objects returned by the method are sent to the `my-data-stream` stream
|
||
<3> Indicates that the item are dispatched to all _subscribers_
|
||
|
||
The process method is called for every Kafka record from the `names` topic (configured in the application configuration). Every result is sent to the my-data-stream in-memory stream.
|
||
|
||
== Expose to front end
|
||
|
||
Finally, let’s bind our stream to a JAX-RS resource. Create a new Java class in the same package called `NameResource`. Use this code:
|
||
|
||
[source,java,role="copypaste"]
|
||
----
|
||
package org.acme.people.stream;
|
||
|
||
import io.smallrye.reactive.messaging.annotations.Channel;
|
||
import org.reactivestreams.Publisher;
|
||
import javax.inject.Inject;
|
||
import javax.ws.rs.GET;
|
||
import javax.ws.rs.Path;
|
||
import javax.ws.rs.Produces;
|
||
import javax.ws.rs.core.MediaType;
|
||
import org.jboss.resteasy.annotations.SseElementType;
|
||
|
||
/**
|
||
* A simple resource retrieving the in-memory "my-data-stream" and sending the items as server-sent events.
|
||
*/
|
||
@Path("/names")
|
||
public class NameResource {
|
||
|
||
@Inject
|
||
@Channel("my-data-stream") Publisher<String> names; // <1>
|
||
|
||
@GET
|
||
@Path("/stream")
|
||
@Produces(MediaType.SERVER_SENT_EVENTS)// <2>
|
||
@SseElementType("text/plain") // <3>
|
||
public Publisher<String> stream() { // <4>
|
||
return names;
|
||
}
|
||
}
|
||
----
|
||
<1> Injects the `my-data-stream` stream using the `@Channel` qualifier
|
||
<2> Indicates that the content is sent using _Server Sent Events_
|
||
<3> Indicates that the data contained within the server sent events is of type `text/plain`
|
||
<4> Returns the stream (Reactive Stream)
|
||
|
||
[NOTE]
|
||
====
|
||
There is a pre-created `names.html` page for you to use (in the `src/main/resources/META-INF/resources` directory) which will make a request to this `/names/stream` endpoint using standard JavaScript running in the browser and draw the resulting names using the https://d3js.org/[D3.js library,window=_blank]. The JavaScript that makes this call looks like this (do not copy this into anything!):
|
||
|
||
[source,javascript]
|
||
----
|
||
var source = new EventSource("/names/stream"); // <1>
|
||
|
||
source.onmessage = function (event) { // <2>
|
||
|
||
console.log("received new name: " + event.data);
|
||
// process new name in event.data
|
||
// ...
|
||
|
||
// update the display with the new name
|
||
update(); // <3>
|
||
};
|
||
----
|
||
<1> Uses your browser's support for the `EventSource` API (part of the W3C SSE standard) to call the endpoint
|
||
<2> Each time a message is received via SSE, _react_ to it by running this function
|
||
<3> Refresh the display using the D3.js library
|
||
|
||
====
|
||
|
||
== Configure application
|
||
|
||
We need to configure the Kafka connector. This is done in the `application.properties` file (in the `src/main/resources` directory). The keys are structured as follows:
|
||
|
||
`mp.messaging.[outgoing|incoming].{channel-name}.property=value`
|
||
|
||
The `channel-name` segment must match the value set in the `@Incoming` and `@Outgoing` annotation:
|
||
|
||
* `generated-name` → sink to which we write the names
|
||
* `names` → source from which we read the names
|
||
|
||
Add the following values to the `application.properties`:
|
||
|
||
[source,none,role="copypaste"]
|
||
----
|
||
# Configure the Kafka sink (we write to it)
|
||
%prod.mp.messaging.outgoing.generated-name.bootstrap.servers=names-cluster-kafka-bootstrap:9092<1>
|
||
%prod.mp.messaging.outgoing.generated-name.connector=smallrye-kafka
|
||
%prod.mp.messaging.outgoing.generated-name.topic=names
|
||
%prod.mp.messaging.outgoing.generated-name.value.serializer=org.apache.kafka.common.serialization.StringSerializer
|
||
|
||
# Configure the Kafka source (we read from it)
|
||
%prod.mp.messaging.incoming.names.bootstrap.servers=names-cluster-kafka-bootstrap:9092<1>
|
||
%prod.mp.messaging.incoming.names.connector=smallrye-kafka
|
||
%prod.mp.messaging.incoming.names.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
|
||
----
|
||
<1> The hostnames you see here will only make sense (be resolvable via DNS) when this app is run in the same Kubernetes namespace as the Kafka cluster you created earlier. So you'll see this and other config values above prefixed with `%prod` which will not try to initialize Kafka when in `dev` mode.
|
||
|
||
More details about this configuration is available on the https://kafka.apache.org/documentation/#producerconfigs[Producer configuration] and https://kafka.apache.org/documentation/#consumerconfigs[Consumer configuration,window=_blank] section from the Kafka documentation.
|
||
|
||
[NOTE]
|
||
====
|
||
What about `my-data-stream`? This is an in-memory stream, not connected to a message broker.
|
||
====
|
||
|
||
== Verify Kafka is running
|
||
|
||
Verify that the Kafka and Zookeeper pods finally started up by running this command:
|
||
|
||
[source,sh,role="copypaste"]
|
||
----
|
||
oc get pods|grep names-cluster
|
||
----
|
||
|
||
You'll should see all pods up and `Running`:
|
||
|
||
[source,none]
|
||
----
|
||
names-cluster-entity-operator-78686cdd4d-rfkwd 3/3 Running 0 6m50s
|
||
names-cluster-kafka-0 2/2 Running 0 7m41s
|
||
names-cluster-kafka-1 2/2 Running 0 7m41s
|
||
names-cluster-kafka-2 2/2 Running 1 7m41s
|
||
names-cluster-zookeeper-0 2/2 Running 0 8m31s
|
||
names-cluster-zookeeper-1 2/2 Running 0 8m31s
|
||
names-cluster-zookeeper-2 2/2 Running 0 8m31s
|
||
----
|
||
|
||
If some of them are still starting, you'll need to wait for them! Run the `oc get pods | grep names-cluster` command repeatedly until they are running. This should take no more than 1-2 minutes.
|
||
|
||
== Rebuild Executable JAR
|
||
|
||
Using the command palette, select **Create Executable JAR**.
|
||
|
||
image:createexec.png[create,600]
|
||
|
||
You should see a bunch of log output that ends with a `SUCCESS` message.
|
||
|
||
== Deploy to OpenShift
|
||
|
||
And now start the build using our executable JAR:
|
||
|
||
[source,sh,role="copypaste"]
|
||
----
|
||
oc start-build people --from-file target/*-runner.jar --follow
|
||
----
|
||
|
||
The build should take a minute or two to complete.
|
||
|
||
== Test
|
||
|
||
Our application should be up and running in a few seconds after the build completes and generating names. To see if it's working, run this command in a Terminal to generate the URL to the sample visualization of the stream of names being generated:
|
||
|
||
[source,sh,role="copypaste"]
|
||
----
|
||
echo; echo http://$(oc get route people -o=go-template --template={% raw %}'{{ .spec.host }}'{% endraw %})/names.html ; echo
|
||
----
|
||
|
||
Open a separate browser tab and go to that URL and you should see a cloud of names updating every 5 seconds (it may take a few seconds for it to start!):
|
||
|
||
[NOTE]
|
||
====
|
||
It takes a few seconds to establish the connection to Kafka. If you don't see new names generated every 5 seconds, reload the browser page to re-initialize the SSE stream.
|
||
====
|
||
|
||
image::names.png[names,800]
|
||
|
||
These are the original names streamed through Kafka, altered to add a random honorific like "Sir" or "Madam", and displayed in a "word cloud" for you to enjoy!
|
||
|
||
== Congratulations!
|
||
|
||
This guide has shown how you can interact with Kafka using Quarkus. It utilizes MicroProfile Reactive Messaging to build data streaming applications.
|
||
|
||
If you want to go further check the documentation of https://smallrye.io/smallrye-reactive-messaging[SmallRye Reactive Messaging,window=_blank], the implementation used in Quarkus.
|
||
|