quarkus-workshop/docs/kafka.adoc
jamesfalkner 5b259d17c6 more stuff
2019-07-03 17:20:28 -04:00

## Reactive Streams with Quarkus and Kafka
In this exercise, you will use the Quarkus Kafka extension to build a streaming application using MicroProfile Reactive Messaging and https://kafka.apache.org[Apache Kafka], a distributed streaming platform. You will also use https://strimzi.io/[Strimzi], which provides an easy way to run an Apache Kafka cluster on Kubernetes using https://operatorhub.io/what-is-an-operator[Operators].
### What is Apache Kafka?
Apache Kafka is a distributed streaming platform. A streaming platform has three key capabilities:
* Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
* Store streams of records in a fault-tolerant durable way.
* Process streams of records as they occur.
Kafka is generally used for two broad classes of applications:
* Building real-time streaming data pipelines that reliably get data between systems or applications
* Building real-time streaming applications that transform or react to the streams of data
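To make "publish and subscribe" and "store streams of records" more concrete, here is a toy, in-memory model of a single Kafka topic partition. It is an append-only list in which each record gets a sequential offset, and each consumer tracks its own read position, so reading never removes records. The `ToyPartition` class and its methods are hypothetical and are not the Kafka API. Unlike Kafka, this sketch is neither durable nor distributed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, in-memory model of one Kafka topic partition (not the Kafka API). */
class ToyPartition {
    // Records are only ever appended; a record's index in the list is its offset.
    private final List<String> records = new ArrayList<>();

    /** Appends a record and returns the offset it was stored at. */
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    /** Returns up to maxRecords records starting at offset, without removing anything. */
    List<String> read(long offset, int maxRecords) {
        if (offset < 0 || maxRecords < 0) throw new IllegalArgumentException("negative argument");
        int from = (int) Math.min(offset, records.size());
        int to = (int) Math.min((long) from + maxRecords, records.size());
        return Collections.unmodifiableList(new ArrayList<>(records.subList(from, to)));
    }

    public static void main(String[] args) {
        ToyPartition names = new ToyPartition();
        names.append("Alice");
        names.append("Bob");
        names.append("Carol");
        // Two consumers keep independent offsets into the same log.
        long consumerA = 0;
        long consumerB = 2;
        System.out.println(names.read(consumerA, 10)); // [Alice, Bob, Carol]
        System.out.println(names.read(consumerB, 10)); // [Carol]
    }
}
```

Keeping the position on the consumer side, instead of deleting records once they are read, is what lets several independent applications process the same stream.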
### What is Strimzi?
Strimzi is based on Apache Kafka and makes it easy to run an Apache Kafka cluster on OpenShift or Kubernetes in various deployment configurations.
Strimzi provides three operators:
* Cluster Operator - Responsible for deploying and managing Apache Kafka clusters within an OpenShift or Kubernetes cluster.
* Topic Operator - Responsible for managing Kafka topics within a Kafka cluster running within an OpenShift or Kubernetes cluster.
* User Operator - Responsible for managing Kafka users within a Kafka cluster running within an OpenShift or Kubernetes cluster.
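All three follow the general Kubernetes operator pattern. They watch the desired state declared in custom resources and repeatedly reconcile the actual state toward it. The sketch below illustrates that idea for topics only. It compares a hypothetical set of declared topic names with the topics that exist in the cluster and lists the actions needed. The `TopicReconciler` class and its action strings are invented for illustration and are not Strimzi code. This simplified model treats the declared set as the only source of truth, whereas the real Topic Operator does considerably more, such as managing partitions and topic configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration of an operator-style reconcile step (not Strimzi code). */
class TopicReconciler {

    /** Returns the actions needed to make the actual topics match the desired ones. */
    static List<String> reconcile(Set<String> desired, Set<String> actual) {
        List<String> actions = new ArrayList<>();
        // Create anything that is declared but missing (sorted for stable output).
        for (String topic : new TreeSet<>(desired)) {
            if (!actual.contains(topic)) actions.add("create " + topic);
        }
        // In this simplified model, anything that exists but is not declared is deleted.
        for (String topic : new TreeSet<>(actual)) {
            if (!desired.contains(topic)) actions.add("delete " + topic);
        }
        return actions;
    }

    public static void main(String[] args) {
        // Desired: a topic resource named "names"; actual: a cluster with no topics yet.
        System.out.println(reconcile(Set.of("names"), Set.of())); // [create names]
    }
}
```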
## The Goal
In this exercise, we are going to generate (random) names in one component. These names are written to a Kafka topic (`names`). A second component reads from the `names` Kafka topic and applies a simple conversion to each name (adding an honorific). The result is sent to an in-memory stream consumed by a JAX-RS resource, which pushes the data to the browser using server-sent events for display.
::img
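Before wiring in Kafka, it can help to see the data flow on its own. The following JDK-only sketch stands in for the whole pipeline. A plain `List` plays the role of the `names` topic, a method adds the honorific, and a `java.util.concurrent.SubmissionPublisher` plays the role of the broadcast in-memory stream that each browser subscribes to. The class and method names are hypothetical, and a fixed honorific is used so the output is predictable. None of this is the Quarkus or SmallRye API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;

/** Hypothetical, in-memory stand-in for: topic -> converter -> broadcast stream -> browsers. */
class NamePipelineSketch {

    /** The "converter" step: prefix a name with an honorific. */
    static String convert(String name, String honorific) {
        return honorific + " " + name;
    }

    /** Runs the names through the converter and returns what each of two subscribers received. */
    static List<List<String>> run(List<String> topic, String honorific) {
        List<String> browserA = Collections.synchronizedList(new ArrayList<>());
        List<String> browserB = Collections.synchronizedList(new ArrayList<>());

        // Plays the role of the broadcast in-memory stream ("my-data-stream").
        SubmissionPublisher<String> stream = new SubmissionPublisher<>();
        CompletableFuture<Void> doneA = stream.consume(browserA::add);
        CompletableFuture<Void> doneB = stream.consume(browserB::add);

        // Each record from the "topic" is converted once and delivered to every subscriber.
        for (String name : topic) {
            stream.submit(convert(name, honorific));
        }
        stream.close(); // signals completion to both subscribers
        doneA.join();   // wait until each subscriber has received every item
        doneB.join();
        return List.of(browserA, browserB);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("Alice", "Bob"), "Dr."));
        // [[Dr. Alice, Dr. Bob], [Dr. Alice, Dr. Bob]]
    }
}
```

Both subscribers see every converted name, which is the behaviour `@Broadcast` provides for the `my-data-stream` channel in the real application.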
## Deploy Kafka via Strimzi
The Strimzi operator installs and manages Kafka clusters on Kubernetes. It's been pre-installed for you, so all you have to do is create a Kafka cluster inside your namespace.
First, open the {{CONSOLE_URL}}[OpenShift Console] and navigate to your project (`userNN-project`). Once there, click _Catalog > Developer Catalog_ in the left menu and type `kafka` in the keyword filter box:
::img
These are all of the Kafka cluster elements you can install. Click on **Kafka**, and then click on **Create**. This will open a YAML file for you to configure the cluster before it's installed. Change the name of the cluster from `my-cluster` to `names-cluster` (under the _metadata_ section of the YAML file). Leave all other values as-is, and click **Create**:
::img
This will create a new Kafka Kubernetes object in your namespace, triggering the Operator to deploy Kafka.
After clicking **Create** you will be taken to the list of objects created by the Kafka operator.
Next, click on the **Kafka Topic** tab, and then click **Create Kafka Topic**. We'll need to create a topic for our application to stream to and from, so in the YAML:
* Change the _metadata > name_ value from `my-topic` to `names`.
* Change the value of the `strimzi.io/cluster` label from `my-cluster` to `names-cluster`.
Then click **Create**.
::img
This will cause the Operator to provision a new Topic in the Kafka cluster.
Verify that the Kafka and ZooKeeper pods are starting up by navigating to _Workloads > Pods_ in the left menu. You should see a number of pods for Kafka and ZooKeeper (along with your previous application deployments). Don't worry if they're not all in the _Running_ status yet; they will eventually start, and we'll use them later in this exercise.
## Add Quarkus Kafka Extension
With Kafka installing, turn your attention back to the app. As in other exercises, we'll need another extension, this time to integrate with Kafka. Install it with:
[source,sh,role="copypaste"]
----
mvn quarkus:add-extension -Dextensions="kafka"
----
This will add the necessary entries in your `pom.xml` to bring in the Kafka extension.
## Create name generator
To start building the app, create a new Java class in the `org.acme.people.stream` package called `NameGenerator`. This class will generate random names and publish them to our Kafka topic for further processing. Use this code:
[source,java,role="copypaste"]
----
package org.acme.people.stream;

import io.reactivex.Flowable;
import javax.enterprise.context.ApplicationScoped;
import org.acme.people.utils.CuteNameGenerator;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import java.util.concurrent.TimeUnit;

@ApplicationScoped
public class NameGenerator {

    @Outgoing("generated-name") // <1>
    public Flowable<String> generate() { // <2>
        return Flowable.interval(5, TimeUnit.SECONDS)
                .map(tick -> CuteNameGenerator.generate());
    }

}
----
<1> Instructs Reactive Messaging to dispatch the items from the returned stream to `generated-name`
<2> The method returns an RxJava 2 stream (`Flowable`) emitting a random name every 5 seconds
The method returns a Reactive Stream. The generated items are sent to the stream named `generated-name`. This stream is mapped to Kafka through the `application.properties` file, which we will configure soon.
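`Flowable.interval(5, TimeUnit.SECONDS)` emits an increasing tick (0, 1, 2, ...) on a timer, starting one period after subscription, and `.map(...)` replaces each tick with a freshly generated name. As a rough, JDK-only analogy (not RxJava), the same timing behaviour can be sketched with a `ScheduledExecutorService`. The `intervalMap` method is hypothetical, the period is a parameter so the sketch runs quickly, and the mapper in `main` derives a name from the tick number only as a stand-in for `CuteNameGenerator.generate()`, which ignores the tick.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;

/** Rough JDK analogy of Flowable.interval(period, unit).map(...); not RxJava. */
class IntervalSketch {

    /** Emits count values, one per period, mapping each tick number through mapper. */
    static List<String> intervalMap(long period, TimeUnit unit, int count, LongFunction<String> mapper) {
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch remaining = new CountDownLatch(count);
        AtomicLong tick = new AtomicLong();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Like interval(period, unit), the first tick fires after one full period.
            timer.scheduleAtFixedRate(() -> {
                if (remaining.getCount() > 0) {
                    emitted.add(mapper.apply(tick.getAndIncrement()));
                    remaining.countDown();
                }
            }, period, period, unit);
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ticks", e);
        } finally {
            timer.shutdownNow(); // stop the timer once enough values were emitted
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for CuteNameGenerator.generate().
        LongFunction<String> nameFor = tick -> "name-" + tick;
        System.out.println(intervalMap(10, TimeUnit.MILLISECONDS, 3, nameFor)); // [name-0, name-1, name-2]
    }
}
```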
## Add honorifics
The name converter reads the names from Kafka, and transforms them, adding a random (English) honorific to the beginning of the name.
Create a new Java class in the same package called `NameConverter`. Use this code:
[source,java,role="copypaste"]
----
package org.acme.people.stream;

import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class NameConverter {

    private static final String[] honorifics = {"Mr.", "Mrs.", "Sir", "Madam", "Lord", "Lady", "Dr.", "Professor", "Vice-Chancellor", "Regent", "Provost", "Prefect"};

    @Incoming("names") // <1>
    @Outgoing("my-data-stream") // <2>
    @Broadcast // <3>
    public String process(String name) {
        String honorific = honorifics[(int)Math.floor(Math.random() * honorifics.length)];
        return honorific + " " + name;
    }
}
----
<1> Indicates that the method consumes the items from the `names` topic
<2> Indicates that the objects returned by the method are sent to the `my-data-stream` stream
<3> Indicates that the items are dispatched to all _subscribers_
The `process` method is called for every Kafka record from the `names` topic (configured in the application configuration). Every result is sent to the `my-data-stream` in-memory stream.
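The expression `(int)Math.floor(Math.random() * honorifics.length)` picks a uniformly random index from `0` to `honorifics.length - 1`. A slightly more idiomatic equivalent is `nextInt(bound)` on a random generator. The sketch below pulls the conversion into a plain static method that takes the generator as a parameter, so it can be tested with a seeded `java.util.Random`. The class name `HonorificSketch` and the shortened honorific list are for illustration only and are not part of the workshop code.

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical, testable version of the NameConverter logic (not the workshop class). */
class HonorificSketch {
    static final String[] HONORIFICS = {"Mr.", "Mrs.", "Sir", "Madam", "Dr.", "Professor"};

    /** Prefixes the name with a uniformly chosen honorific; nextInt(n) returns 0..n-1. */
    static String addHonorific(String name, Random random) {
        String honorific = HONORIFICS[random.nextInt(HONORIFICS.length)];
        return honorific + " " + name;
    }

    /** Convenience overload backed by the per-thread generator. */
    static String addHonorific(String name) {
        return addHonorific(name, ThreadLocalRandom.current());
    }

    public static void main(String[] args) {
        // Output varies, e.g. "Professor Ada Lovelace".
        System.out.println(addHonorific("Ada Lovelace"));
    }
}
```

Passing the generator in keeps the method free of hidden global state, which is what makes a seeded, repeatable test possible.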
## Expose to front end
Finally, let's bind our stream to a JAX-RS resource. Create a new Java class in the same package called `NameResource`. Use this code:
[source,java,role="copypaste"]
----
package org.acme.people.stream;

import io.smallrye.reactive.messaging.annotations.Stream;
import org.reactivestreams.Publisher;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

/**
 * A simple resource retrieving the in-memory "my-data-stream" and sending the items as server-sent events.
 */
@Path("/names")
public class NameResource {

    @Inject
    @Stream("my-data-stream") Publisher<String> names; // <1>

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) // <2>
    public Publisher<String> stream() { // <3>
        return names;
    }
}
----
<1> Injects the `my-data-stream` stream using the `@Stream` qualifier
<2> Indicates that the content is sent using _Server Sent Events_
<3> Returns the stream (Reactive Stream)
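With `MediaType.SERVER_SENT_EVENTS` (`text/event-stream`), each item of the `Publisher` is written to the HTTP response as a separate event. In the Server-Sent Events format, an event is one or more `data:` lines terminated by a blank line, which the browser's `EventSource` reassembles. The sketch below shows that framing and a simplified version of the reverse parsing. It is a hand-rolled illustration with hypothetical method names, and the exact bytes the server writes (for example, extra `id:` or `event:` fields) depend on the JAX-RS implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hand-rolled illustration of Server-Sent Events framing (not a JAX-RS API). */
class SseSketch {

    /** Frames one item as an event: one "data:" line per payload line, then a blank line. */
    static String frame(String item) {
        StringBuilder event = new StringBuilder();
        for (String line : item.split("\n", -1)) {
            event.append("data: ").append(line).append('\n');
        }
        return event.append('\n').toString();
    }

    /** Parses a stream body back into payloads, roughly as EventSource treats data fields. */
    static List<String> parse(String body) {
        List<String> events = new ArrayList<>();
        StringBuilder data = null;
        for (String line : body.split("\n", -1)) {
            if (line.isEmpty()) {                     // a blank line dispatches the event
                if (data != null) events.add(data.toString());
                data = null;
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) value = value.substring(1); // one leading space is dropped
                if (data == null) data = new StringBuilder(value);
                else data.append('\n').append(value); // multiple data lines are joined with \n
            }                                         // other fields and comments are ignored here
        }
        return events;
    }

    public static void main(String[] args) {
        String body = frame("Dr. Alice") + frame("Lady Bob");
        System.out.print(body);
        System.out.println(parse(body)); // [Dr. Alice, Lady Bob]
    }
}
```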
## Configure application
We need to configure the Kafka connector. This is done in the `application.properties` file (in the `src/main/resources` directory). The keys are structured as follows:
`mp.messaging.[outgoing|incoming].{channel-name}.property=value`
* The `channel-name` segment must match the value set in the `@Incoming` and `@Outgoing` annotations:
** `generated-name` → sink to which we write the names
** `names` → source from which we read the names
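To make the key structure concrete, the small helper below assembles keys from a direction, a channel name, and a connector property, using the two Kafka-backed channels of this application. The `channelKey` helper is hypothetical and only illustrates the naming pattern; Quarkus reads these keys from `application.properties` itself.

```java
import java.util.Locale;

/** Hypothetical helper that spells out the mp.messaging key pattern. */
class ChannelKeys {
    enum Direction { INCOMING, OUTGOING }

    /** Builds mp.messaging.[outgoing|incoming].{channel-name}.{property}. */
    static String channelKey(Direction direction, String channel, String property) {
        return "mp.messaging." + direction.name().toLowerCase(Locale.ROOT)
                + "." + channel + "." + property;
    }

    public static void main(String[] args) {
        // @Outgoing("generated-name") in NameGenerator is an outgoing channel.
        System.out.println(channelKey(Direction.OUTGOING, "generated-name", "topic"));
        // prints mp.messaging.outgoing.generated-name.topic
        // @Incoming("names") in NameConverter is an incoming channel.
        System.out.println(channelKey(Direction.INCOMING, "names", "connector"));
        // prints mp.messaging.incoming.names.connector
    }
}
```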
Add the following values to the `application.properties`:
[source,none,role="copypaste"]
----
# Configure the Kafka sink (we write to it)
%prod.mp.messaging.outgoing.generated-name.bootstrap.servers=names-cluster-kafka-bootstrap:9092
%prod.mp.messaging.outgoing.generated-name.connector=smallrye-kafka
%prod.mp.messaging.outgoing.generated-name.topic=names
%prod.mp.messaging.outgoing.generated-name.value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Configure the Kafka source (we read from it)
%prod.mp.messaging.incoming.names.bootstrap.servers=names-cluster-kafka-bootstrap:9092
%prod.mp.messaging.incoming.names.connector=smallrye-kafka
%prod.mp.messaging.incoming.names.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
----
We have prefixed these with `%prod` so that the app does not try to connect to Kafka when running in `dev` or `test` mode.
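Profile prefixes act as overrides. When the `prod` profile is active, a key written as `%prod.<key>` is used in place of `<key>`, and in other profiles the `%prod.` entries are ignored. Because every Kafka key here exists only with the `%prod.` prefix, none of them is visible in `dev` or `test` mode. The toy resolver below models that lookup rule with a plain `Map`. It is a simplified illustration with a hypothetical `resolve` method, not how Quarkus's configuration system is implemented.

```java
import java.util.Map;
import java.util.Optional;

/** Toy model of profile-aware property lookup (a simplification, not Quarkus internals). */
class ProfileLookup {

    /** Prefers "%profile.key", falls back to the unprefixed "key", else empty. */
    static Optional<String> resolve(Map<String, String> props, String profile, String key) {
        String profiled = props.get("%" + profile + "." + key);
        if (profiled != null) return Optional.of(profiled);
        return Optional.ofNullable(props.get(key));
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                "%prod.mp.messaging.incoming.names.connector", "smallrye-kafka");
        String key = "mp.messaging.incoming.names.connector";
        System.out.println(resolve(props, "prod", key)); // Optional[smallrye-kafka]
        System.out.println(resolve(props, "dev", key));  // Optional.empty
    }
}
```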
More details about this configuration are available in the https://kafka.apache.org/documentation/#producerconfigs[Producer configuration] and https://kafka.apache.org/documentation/#consumerconfigs[Consumer configuration] sections of the Kafka documentation.
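The `value.serializer` and `value.deserializer` entries matter because Kafka itself stores and transfers only bytes. On the producer side, Kafka's `StringSerializer` turns each name into bytes (UTF-8 by default), and on the consumer side `StringDeserializer` turns those bytes back into a `String`. The round trip below mimics that default with the JDK alone. It is an illustration, not the Kafka classes themselves, and the `toBytes`/`fromBytes` names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** JDK-only illustration of what String (de)serialization does to a record value. */
class StringSerdeSketch {

    /** Producer side: String to bytes, using UTF-8 like the StringSerializer default. */
    static byte[] toBytes(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: bytes back to String, using the same charset. */
    static String fromBytes(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The e with diaeresis at the end takes two bytes in UTF-8.
        byte[] onTheWire = toBytes("Dr. Zo\u00eb");
        System.out.println(Arrays.toString(onTheWire));
        System.out.println(fromBytes(onTheWire).equals("Dr. Zo\u00eb")); // true
    }
}
```

Producer and consumer must agree on the format: a value written with one serializer is only meaningful to a consumer using the matching deserializer.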
[NOTE]
====
What about `my-data-stream`? This is an in-memory stream, not connected to a message broker.
====
## Rebuild Executable JAR
Now we are ready to run our application. Using the command palette, select **Build Executable JAR**. You should see a bunch of log output that ends with a `SUCCESS` message.
## Deploy
And now start the build using our executable JAR:
[source,sh,role="copypaste"]
----
oc start-build people --from-file target/*-runner.jar --follow
----
## Test
Our application should be up and running and generating names. To see if it's working, run this command in a Terminal to generate the URL to the sample visualization of the stream of names being generated:
[source,sh,role="copypaste"]
----
echo http://$(oc get route people -o=go-template --template='{{ .spec.host }}')/names.html
----
Open that URL in a separate browser tab, and you should see a cloud of names updating every 5 seconds:
::img
These are the original names streamed through Kafka, altered to add a random honorific, and displayed in a "word cloud" for you to enjoy!
## Congratulations
This guide has shown how you can interact with Kafka from Quarkus, using MicroProfile Reactive Messaging to build data streaming applications.
If you want to go further, check the documentation of https://smallrye.io/smallrye-reactive-messaging[SmallRye Reactive Messaging], the implementation used in Quarkus.