// Source: quarkus-workshop/docs/kafka.adoc
// Author: jamesfalkner, commit b488b0b3c4, 2019-07-27
= Reactive Streams with Quarkus and Kafka
:experimental:
In this exercise, you will use the Quarkus Kafka extension to build a streaming application using MicroProfile Reactive Streams Messaging and https://kafka.apache.org[Apache Kafka,window=_blank], a distributed streaming platform. You will also use https://strimzi.io/[Strimzi,window=_blank], which provides an easy way to run an Apache Kafka cluster on Kubernetes using https://operatorhub.io/what-is-an-operator[Operators,window=_blank].
== What is Apache Kafka?
Apache Kafka is a distributed streaming platform. A streaming platform has three key capabilities:
* Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
* Store streams of records in a fault-tolerant durable way.
* Process streams of records as they occur.
Kafka is generally used for two broad classes of applications:
* Building real-time streaming data pipelines that reliably get data between systems or applications
* Building real-time streaming applications that transform or react to the streams of data
== What is Strimzi?
Strimzi provides a way to run an Apache Kafka cluster on Kubernetes in various deployment configurations.
Strimzi is based on Apache Kafka, and makes it easy to run Apache Kafka on OpenShift or Kubernetes.
Strimzi provides three operators:
* **Cluster Operator** - Responsible for deploying and managing Apache Kafka clusters within an OpenShift or Kubernetes cluster.
* **Topic Operator** - Responsible for managing Kafka topics within a Kafka cluster running within an OpenShift or Kubernetes cluster.
* **User Operator** - Responsible for managing Kafka users within a Kafka cluster running within an OpenShift or Kubernetes cluster.
== The Goal
In this exercise, we are going to generate (random) names in one component. These names are written in a Kafka topic (`names`). A second component reads from the `names` Kafka topic and applies some magic conversion to the name (adding an honorific). The result is sent to an _in-memory stream_ consumed by a JAX-RS resource. The data is sent to a browser using https://www.w3.org/TR/eventsource/[_server-sent events_,window=_blank] and displayed in the browser. It will look like this:
image::names.png[names,800]
=== Create Kafka Cluster
The Strimzi operator installs and manages Kafka clusters on Kubernetes. It's been pre-installed for you, so all you have to do is create a Kafka cluster inside your namespace.
First, open the {{CONSOLE_URL}}[OpenShift Console], log in using your username/password if needed (e.g. `user12`/`pass12`) and navigate to your project (`userNN-project`). Once there, on the left menu click on _Catalog > Developer Catalog_ and type in `kafka` in the keyword filter box:
image::kafkacatalog.png[kafkacatalog,800]
These are all of the Kafka cluster elements you can install. Click on **Kafka**, and then click on **Create**. This will open a YAML file for you to configure the cluster before it's installed. Change the name of the cluster from `my-cluster` to `names-cluster` (under the _metadata_ section of the YAML file). Leave all other values as-is, and click **Create**:
image::createkafka.png[createkafka,600]
This will create a new Kafka Kubernetes object in your namespace, triggering the Operator to deploy Kafka.
After clicking **Create** you will be taken to the list of objects created by the Kafka operator.
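For reference, the `Kafka` resource you just created should look roughly like this (a sketch only; the defaults generated by the console, such as Kafka versions and storage sizing, may differ):

[source,yaml]
----
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: names-cluster    # changed from my-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
----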
== Create Kafka Topic
Click _Developer Catalog_ on the left again, and enter `topic` into the search box. Click on the _Kafka Topic_ box, then click **Create**:
image::createkafkatopic.png[createkafkatopic,800]
We'll need to create a topic for our application to stream to and from, so in the YAML:
* Change the _metadata > name_ value from `my-topic` to `names`.
* Change the value of the `strimzi.io/cluster` label from `my-cluster` to `names-cluster`.
Then click **Create**.
image::topiccreate.png[topiccreate,800]
This will cause the Operator to provision a new Topic in the Kafka cluster.
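For reference, the resulting `KafkaTopic` resource should look roughly like this (a sketch; the partition and replica counts come from the template defaults and may differ):

[source,yaml]
----
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: names                          # changed from my-topic
  labels:
    strimzi.io/cluster: names-cluster  # changed from my-cluster
spec:
  partitions: 1
  replicas: 1
----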
Verify that the Kafka and Zookeeper pods are starting up by executing this command in a Terminal in Che:
[source,sh,role="copypaste"]
----
oc get pods|grep names-cluster
----
You'll see something like:
[source,none]
----
names-cluster-entity-operator-78686cdd4d-rfkwd   3/3   Running   0   6m50s
names-cluster-kafka-0                            2/2   Running   0   7m41s
names-cluster-kafka-1                            2/2   Running   0   7m41s
names-cluster-kafka-2                            2/2   Running   1   7m41s
names-cluster-zookeeper-0                        2/2   Running   0   8m31s
names-cluster-zookeeper-1                        2/2   Running   0   8m31s
names-cluster-zookeeper-2                        2/2   Running   0   8m31s
----
[NOTE]
====
You may be logged out of the cluster if you have reloaded the page. If so, just run this command to log in again:
[source,sh,role="copypaste"]
----
oc login https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT --insecure-skip-tls-verify=true
----
====
Don't worry if they're not all in the _Running_ state yet; they will eventually start up, and we'll use them later in this exercise.
== Add Quarkus Kafka Extension
While Kafka is installing, turn your attention back to the app. Like other exercises, we'll need another extension to integrate with Kafka. Install it with:
[source,sh,role="copypaste"]
----
mvn quarkus:add-extension -Dextensions="kafka"
----
This will add the necessary entries in your `pom.xml` to bring in the Kafka extension.
== The Application You Will Build
The app consists of three components that pass messages via Kafka and an in-memory stream; SSE then pushes the messages to the browser. It looks like this:
image::kafkaarch.png[kafka, 800]
== Create name generator
To start building the app, create a new Java class in the `org.acme.people.stream` called `NameGenerator`. This class will generate random names and publish them to our Kafka topic for further processing. Use this code:
[source,java,role="copypaste"]
----
package org.acme.people.stream;

import java.util.concurrent.TimeUnit;

import javax.enterprise.context.ApplicationScoped;

import org.acme.people.utils.CuteNameGenerator;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.reactivex.Flowable;

@ApplicationScoped
public class NameGenerator {

    @Outgoing("generated-name") // <1>
    public Flowable<String> generate() { // <2>
        return Flowable.interval(5, TimeUnit.SECONDS)
                       .map(tick -> CuteNameGenerator.generate());
    }
}
----
<1> Instructs Reactive Messaging to dispatch the items from the returned stream to the `generated-name` channel
<2> The method returns an RX Java 2 stream (`Flowable`) emitting a random name every 5 seconds

The method returns a Reactive Stream. The generated items are sent to the stream named `generated-name`. This stream is mapped to Kafka using the `application.properties` file that we will create soon.
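If `Flowable.interval(...).map(...)` is new to you, the same tick-then-map pattern can be sketched with plain `java.util.concurrent` (illustration only — the reactive version above is what the app actually uses, and `PeriodicNames` is a made-up class, not part of the workshop code):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicNames {

    // Stand-in for CuteNameGenerator.generate(): produce a new name per tick
    static String generate(long tick) {
        return "name-" + tick;
    }

    public static void main(String[] args) throws Exception {
        List<String> emitted = new CopyOnWriteArrayList<>();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        long[] tick = {0};
        // Equivalent of Flowable.interval(...): fire on a fixed schedule, then
        // map each tick to a generated name (every 50 ms here instead of 5 s)
        timer.scheduleAtFixedRate(() -> emitted.add(generate(tick[0]++)),
                0, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        timer.shutdown();
        // At least ticks 0, 50 and 100 ms have fired by now
        System.out.println(emitted.size() >= 3);
    }
}
```

The reactive version does the same thing declaratively, with backpressure handled for you by the `Flowable`.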
== Add honorifics
The name converter reads the names from Kafka, and transforms them, adding a random (English) honorific to the beginning of the name.
Create a new Java class in the same package called `NameConverter`. Use this code:
[source,java,role="copypaste"]
----
package org.acme.people.stream;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class NameConverter {

    private static final String[] honorifics = {
            "Mr.", "Mrs.", "Sir", "Madam", "Lord", "Lady",
            "Dr.", "Professor", "Vice-Chancellor", "Regent", "Provost", "Prefect"};

    @Incoming("names") // <1>
    @Outgoing("my-data-stream") // <2>
    @Broadcast // <3>
    public String process(String name) {
        String honorific = honorifics[(int)Math.floor(Math.random() * honorifics.length)];
        return honorific + " " + name;
    }
}
----
<1> Indicates that the method consumes the items from the `names` topic
<2> Indicates that the objects returned by the method are sent to the `my-data-stream` stream
<3> Indicates that the items are dispatched to all _subscribers_

The `process()` method is called for every Kafka record from the `names` topic (configured in the application configuration). Every result is sent to the `my-data-stream` in-memory stream.
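To see the transformation in isolation, here is the same honorific logic as a small standalone program (a sketch for illustration; in the app this happens inside `NameConverter.process()`, driven by Kafka rather than a `main` method):

```java
public class HonorificDemo {

    private static final String[] honorifics = {
            "Mr.", "Mrs.", "Sir", "Madam", "Lord", "Lady",
            "Dr.", "Professor", "Vice-Chancellor", "Regent", "Provost", "Prefect"};

    // Same logic as NameConverter.process(): prepend a random honorific
    static String process(String name) {
        String honorific = honorifics[(int) Math.floor(Math.random() * honorifics.length)];
        return honorific + " " + name;
    }

    public static void main(String[] args) {
        String converted = process("Wobbly Wombat");
        // Whatever honorific was picked, the original name survives intact
        System.out.println(converted.endsWith(" Wobbly Wombat"));
    }
}
```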
== Expose to front end
Finally, let's bind our stream to a JAX-RS resource. Create a new Java class in the same package called `NameResource`. Use this code:
[source,java,role="copypaste"]
----
package org.acme.people.stream;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.reactivestreams.Publisher;

import io.smallrye.reactive.messaging.annotations.Stream;

/**
 * A simple resource retrieving the in-memory "my-data-stream" and sending the items as server-sent events.
 */
@Path("/names")
public class NameResource {

    @Inject
    @Stream("my-data-stream") Publisher<String> names; // <1>

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) // <2>
    public Publisher<String> stream() { // <3>
        return names;
    }
}
----
<1> Injects the `my-data-stream` stream using the `@Stream` qualifier
<2> Indicates that the content is sent using _Server Sent Events_
<3> Returns the stream (Reactive Stream)
[NOTE]
====
There is a pre-created `names.html` page for you to use (in the `src/main/resources/META-INF/resources` directory) which will make a request to this `/names/stream` endpoint using standard JavaScript running in the browser and draw the resulting names using the https://d3js.org/[D3.js library,window=_blank]. The JavaScript that makes this call looks like this (do not copy this into anything!):
[source,javascript]
----
var source = new EventSource("/names/stream"); // <1>
source.onmessage = function (event) { // <2>
console.log("received new name: " + event.data);
// process new name in event.data
// ...
// update the display with the new name
update(); // <3>
};
----
<1> Uses your browser's support for the `EventSource` API (part of the W3C SSE standard) to call the endpoint
<2> Each time a message is received via SSE, _react_ to it by running this function
<3> Refresh the display using the D3.js library
====
== Configure application
We need to configure the Kafka connector. This is done in the `application.properties` file (in the `src/main/resources` directory). The keys are structured as follows:
`mp.messaging.[outgoing|incoming].{channel-name}.property=value`
* The `channel-name` segment must match the value set in the `@Incoming` and `@Outgoing` annotations:
** `generated-name` → sink to which we write the names
** `names` → source from which we read the names
Add the following values to the `application.properties`:
[source,none,role="copypaste"]
----
# Configure the Kafka sink (we write to it)
%prod.mp.messaging.outgoing.generated-name.bootstrap.servers=names-cluster-kafka-bootstrap:9092
%prod.mp.messaging.outgoing.generated-name.connector=smallrye-kafka
%prod.mp.messaging.outgoing.generated-name.topic=names
%prod.mp.messaging.outgoing.generated-name.value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Configure the Kafka source (we read from it)
%prod.mp.messaging.incoming.names.bootstrap.servers=names-cluster-kafka-bootstrap:9092
%prod.mp.messaging.incoming.names.connector=smallrye-kafka
%prod.mp.messaging.incoming.names.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
----
We have prefixed these with `%prod` to avoid our app trying to connect when in `dev` or `test` mode.
More details about this configuration are available in the https://kafka.apache.org/documentation/#producerconfigs[Producer configuration,window=_blank] and https://kafka.apache.org/documentation/#consumerconfigs[Consumer configuration,window=_blank] sections of the Kafka documentation.
[NOTE]
====
What about `my-data-stream`? This is an in-memory stream, not connected to a message broker.
====
== Rebuild Executable JAR
Now we are ready to run our application. Using the command palette, select **Create Executable JAR**. You should see a bunch of log output that ends with a `SUCCESS` message.
== Deploy
And now start the build using our executable JAR:
[source,sh,role="copypaste"]
----
oc start-build people --from-file target/*-runner.jar --follow
----
The build should take a minute or two to complete.
== Test
Our application should be up and running a few seconds after the build completes, generating names. To see if it's working, run this command in a Terminal to generate the URL to the sample visualization of the stream of names being generated:
[source,sh,role="copypaste"]
----
clear; echo; echo http://$(oc get route people -o=go-template --template={% raw %}'{{ .spec.host }}'{% endraw %})/names.html ; echo
----
Open a separate browser tab and go to that URL and you should see a cloud of names updating every 5 seconds (it may take a few seconds for it to start!):
image::names.png[names,800]
These are the original names streamed through Kafka, altered to add a random honorific like "Sir" or "Madam", and displayed in a "word cloud" for you to enjoy!
== Congratulations!
This guide has shown how you can interact with Kafka using Quarkus. It utilizes MicroProfile Reactive Messaging to build data streaming applications.
If you want to go further check the documentation of https://smallrye.io/smallrye-reactive-messaging[SmallRye Reactive Messaging,window=_blank], the implementation used in Quarkus.