From 41919fd2a9c19efbef6e54c72e675bd50a02cccb Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Fri, 19 Jun 2020 20:27:48 +0200 Subject: [PATCH] SE Messaging doc (#2029) * SE Messaging doc Signed-off-by: Daniel Kec --- .../mp/reactivemessaging/01_introduction.adoc | 1 + docs/mp/reactivemessaging/02_message.adoc | 26 ++- docs/mp/reactivemessaging/03_connector.adoc | 23 +- docs/mp/reactivemessaging/04_kafka.adoc | 88 ++++++++ .../se/reactivemessaging/01_introduction.adoc | 111 +++++++++- docs/se/reactivemessaging/03_connector.adoc | 198 ++++++++++++++++++ docs/se/reactivemessaging/04_kafka.adoc | 166 +++++++++++++++ docs/shared/reactivestreams/02_engine.adoc | 3 - 8 files changed, 592 insertions(+), 24 deletions(-) create mode 100644 docs/mp/reactivemessaging/04_kafka.adoc create mode 100644 docs/se/reactivemessaging/03_connector.adoc create mode 100644 docs/se/reactivemessaging/04_kafka.adoc diff --git a/docs/mp/reactivemessaging/01_introduction.adoc b/docs/mp/reactivemessaging/01_introduction.adoc index 7db07fea9..4c9fd90c3 100644 --- a/docs/mp/reactivemessaging/01_introduction.adoc +++ b/docs/mp/reactivemessaging/01_introduction.adoc @@ -22,6 +22,7 @@ :spec-name: MicroProfile Reactive Messaging :description: {spec-name} support in Helidon MP :keywords: helidon, mp, microprofile, messaging +:h1Prefix: MP == Reactive Messaging diff --git a/docs/mp/reactivemessaging/02_message.adoc b/docs/mp/reactivemessaging/02_message.adoc index 796f98761..7e36299c3 100644 --- a/docs/mp/reactivemessaging/02_message.adoc +++ b/docs/mp/reactivemessaging/02_message.adoc @@ -17,6 +17,11 @@ /////////////////////////////////////////////////////////////////////////////// = Message +:toc: +:toc-placement: preamble +:description: Reactive Messaging Message in Helidon MP +:keywords: helidon, mp, messaging, message +:h1Prefix: MP == Message The Reactive Messaging @@ -69,7 +74,7 @@ https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/mi @Outgoing("consume-and-ack") public PublisherBuilder streamOfMessages() { return ReactiveStreams.of(Message.of("This is Payload", () -> { - System.out.println("This articular message was acked!"); + System.out.println("This particular message was acked!"); return CompletableFuture.completedFuture(null); })).buildRs(); } @@ -77,10 +82,10 @@ public PublisherBuilder streamOfMessages() { @Incoming("consume-and-ack") @Acknowledgment(Acknowledgment.Strategy.MANUAL) public void receiveAndAckMessage(Message msg) { - // Calling ack() will print "This articular message was acked!" to System.out - msg.ack(); + msg.ack();<1> } ---- +<1> Calling ack() will print "This particular message was acked!" to System.out [source,java] .Example of manual acknowledgment @@ -88,7 +93,7 @@ public void receiveAndAckMessage(Message msg) { @Outgoing("consume-and-ack") public PublisherBuilder streamOfMessages() { return ReactiveStreams.of(Message.of("This is Payload", () -> { - System.out.println("This articular message was acked!"); + System.out.println("This particular message was acked!"); return CompletableFuture.completedFuture(null); })).buildRs(); } @@ -96,24 +101,25 @@ public PublisherBuilder streamOfMessages() { @Incoming("consume-and-ack") @Acknowledgment(Acknowledgment.Strategy.MANUAL) public void receiveAndAckMessage(Message msg) { - // Calling ack() will print "This articular message was acked!" to System.out - msg.ack(); + msg.ack();<1> } ---- +<1> Calling ack() will print "This particular message was acked!" to System.out + [source,java] .Example of explicit pre-process acknowledgment ---- @Outgoing("consume-and-ack") public PublisherBuilder streamOfMessages() { return ReactiveStreams.of(Message.of("This is Payload", () -> { - System.out.println("This articular message was acked!"); + System.out.println("This particular message was acked!"); return CompletableFuture.completedFuture(null); })).buildRs(); } /** * Prints to the console: -* > This articular message was acked! +* > This particular message was acked! * > Method invocation! */ @Incoming("consume-and-ack") @@ -128,7 +134,7 @@ public void receiveAndAckMessage(Message msg) { @Outgoing("consume-and-ack") public PublisherBuilder streamOfMessages() { return ReactiveStreams.of(Message.of("This is Payload", () -> { - System.out.println("This articular message was acked!"); + System.out.println("This particular message was acked!"); return CompletableFuture.completedFuture(null); })).buildRs(); } @@ -136,7 +142,7 @@ public PublisherBuilder streamOfMessages() { /** * Prints to the console: * > Method invocation! -* > This articular message was acked! +* > This particular message was acked! */ @Incoming("consume-and-ack") @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) diff --git a/docs/mp/reactivemessaging/03_connector.adoc b/docs/mp/reactivemessaging/03_connector.adoc index 24a8f67a3..a64654bc2 100644 --- a/docs/mp/reactivemessaging/03_connector.adoc +++ b/docs/mp/reactivemessaging/03_connector.adoc @@ -17,8 +17,13 @@ /////////////////////////////////////////////////////////////////////////////// = Connector +:toc: +:toc-placement: preamble +:description: Reactive Messaging Connector in Helidon MP +:keywords: helidon, mp, messaging, connector +:h1Prefix: MP -== Connector Bean +== Messaging Connector Bean Messaging connector is just an application scoped bean which implements `IncomingConnectorFactory`, `OutgoingConnectorFactory` or both. @@ -89,8 +94,7 @@ public class ExampleConnector implements IncomingConnectorFactory { @Override public PublisherBuilder> getPublisherBuilder(final Config config) { - // Config context is merged from channel and connector contexts - String firstPropValue = config.getValue("first-test-prop", String.class); + String firstPropValue = config.getValue("first-test-prop", String.class);<1> String secondPropValue = config.getValue("second-test-prop", String.class); return ReactiveStreams.of(firstPropValue, secondPropValue) @@ -98,17 +102,18 @@ public class ExampleConnector implements IncomingConnectorFactory { } } ---- +<1> Config context is merged from channel and connector contexts [source,yaml] .Example of channel to connector mapping config with custom properties: ---- -# Channel -> Connector mapping -mp.messaging.incoming.from-connector-channel.connector: example-connector -# Channel configuration properties -mp.messaging.incoming.from-connector-channel.first-test-prop: foo -# Connector configuration properties -mp.messaging.connector.example-connector.second-test-prop: bar +mp.messaging.incoming.from-connector-channel.connector: example-connector<1> +mp.messaging.incoming.from-connector-channel.first-test-prop: foo<2> +mp.messaging.connector.example-connector.second-test-prop: bar<3> ---- +<1> Channel -> Connector mapping +<2> Channel configuration properties +<3> Connector configuration properties [source,java] .Example consuming from connector: diff --git a/docs/mp/reactivemessaging/04_kafka.adoc b/docs/mp/reactivemessaging/04_kafka.adoc new file mode 100644 index 000000000..663b0f2ce --- /dev/null +++ b/docs/mp/reactivemessaging/04_kafka.adoc @@ -0,0 +1,88 @@ +/////////////////////////////////////////////////////////////////////////////// + + Copyright (c) 2020 Oracle and/or its affiliates. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +/////////////////////////////////////////////////////////////////////////////// + += Kafka Connector +:toc: +:toc-placement: preamble +:description: Reactive Messaging support for Kafka in Helidon MP +:keywords: helidon, mp, messaging, kafka +:h1Prefix: MP + +== Reactive Kafka Connector +Connecting streams to Kafka with Reactive Messaging couldn't be easier. + +[source,xml] +.Dependencies needed: +---- + + io.helidon.microprofile.messaging + helidon-microprofile-messaging + + + io.helidon.messaging.kafka + helidon-messaging-kafka + +---- + +[source,yaml] +.Example of connector config: +---- +mp.messaging: + + incoming.from-kafka: + connector: helidon-kafka + topic: messaging-test-topic-1 + auto.offset.reset: latest + enable.auto.commit: true + group.id: example-group-id + + outgoing.to-kafka: + connector: helidon-kafka + topic: messaging-test-topic-1 + + connector: + helidon-kafka: + bootstrap.servers: localhost:9092 + key.serializer: org.apache.kafka.common.serialization.StringSerializer + value.serializer: org.apache.kafka.common.serialization.StringSerializer + key.deserializer: org.apache.kafka.common.serialization.StringDeserializer + value.deserializer: org.apache.kafka.common.serialization.StringDeserializer +---- + +[source,java] +.Example of consuming from Kafka: +---- +@Incoming("from-kafka") +public void consumeKafka(String msg) { + System.out.println("Kafka says: " + msg); +} +---- + + +[source,java] +.Example of producing to Kafka: +---- +@Outgoing("to-kafka") +public PublisherBuilder produceToKafka() { + return ReactiveStreams.of("test1", "test2"); +} +---- + +Don't forget to check out the examples with pre-configured Kafka docker image, for easy testing: + +* https://github.com/oracle/helidon/tree/master/examples/messaging \ No newline at end of file diff --git a/docs/se/reactivemessaging/01_introduction.adoc b/docs/se/reactivemessaging/01_introduction.adoc index 4d69ccea3..86f035c8e 100644 --- a/docs/se/reactivemessaging/01_introduction.adoc +++ b/docs/se/reactivemessaging/01_introduction.adoc @@ -20,6 +20,113 @@ :toc: :toc-placement: preamble :description: Reactive Messaging support in Helidon SE -:keywords: helidon, mp, messaging +:keywords: helidon, se, messaging +:h1Prefix: SE -== This page is Under Construction and will be available soon +== Reactive Messaging + +Asynchronous messaging is a commonly used form of communication in the world of microservices. +While its possible to start building your reactive streams directly by combining operators and +connecting them to reactive APIs, with Helidon SE Reactive Messaging, you can now use prepared +tools for repetitive use case scenarios . + +For example connecting your streams to external services usually requires a lot of boiler-plate +code for configuration handling, backpressure propagation, acknowledgement and more. + +For such tasks there is a system of connectors, emitters and means to orchestrate them in Helidon, +called *Reactive Messaging*. It's basically an API for connecting and configuring +Connectors and Emitters with your reactive streams thru so called <>. + +You may wonder how *Reactive Messaging* relates to +<>. +As the making of connectors or even configuring them can be repetitive task leading to +the same results, Helidon SE Reactive Messaging supports very same configuration format +for connectors as its MicroProfile counterpart does. Also, MP Connectors are reusable in +Helidon SE Messaging with some limitation(there is no CDI in Helidon SE). +All Messaging connectors in Helidon are made to be universally usable by Helidon MP and SE. + +=== Channel +Channel is a named pair of `Publisher` and `Subscriber`. Channels can be connected together by +<>. Registering of `Publisher` or `Subscriber` for a channel can be done +by Messaging API, or configured implicitly for using registered <> +for generating such `Publisher` or `Subscriber`. + +[source,java] +.Example of simple channel: +---- +Channel channel1 = Channel.create("channel1"); + +Messaging.builder() + .publisher(channel1, Multi.just("message 1", "message 2") + .map(Message::of)) + .listener(channel1, s -> System.out.println("Intecepted message " + s)) + .build() + .start(); +---- + +=== Processor +Processor is a typical reactive processor acting as a `Subscriber` to upstream and as a `Publisher` +to downstream. In terms of reactive messaging it is able to connect two <> to one +reactive stream. + +[source,java] +.Example of processor usage: +---- +Channel firstChannel = Channel.create("first-channel"); +Channel secondChannel = Channel.create("second-channel"); + +Messaging.builder() + .publisher(secondChannel, ReactiveStreams.of("test1", "test2", "test3") + .map(Message::of)) + .processor(secondChannel, firstChannel, ReactiveStreams.>builder() + .map(Message::getPayload) + .map(String::toUpperCase) + .map(Message::of) + ) + .subscriber(firstChannel, ReactiveStreams.>builder() + .peek(Message::ack) + .map(Message::getPayload) + .forEach(s -> System.out.println("Consuming message " + s))) + .build() + .start(); + +>Consuming message TEST1 +>Consuming message TEST2 +>Consuming message TEST3 +---- + +=== Message +Reactive Messaging in Helidon SE uses the same concept of +<> as MicroProfile messaging. +The only notable difference is that SE Messaging does almost no implicit or automatic +acknowledgement due to _no magic_ philosophy of Helidon SE. + +Only exception to this are variants of methods `Messaging.Builder#listener` and +`Messaging.Builder#processor` with consumer or function params, conveniently unwrapping payload +for you. After such implicit unwrapping is not possible to do a manual acknowledgement, therefore +implicit ack before callback is executed is necessary. + +=== Connector +Connector concept is a way for connecting <> to external sources. +To make <> +as easy and versatile as possible, Helidon SE Messaging uses same API for connectors +like <> does. +This allows connectors to be usable in both flavors of Helidon with one limitation which is +that connector has to be able to work without CDI. + +Example of such a versatile connector is Helidon's own: + + * <> + + +=== Dependency + +Declare the following dependency in your project: + +[source,xml] +---- + + io.helidon.messaging + helidon-messaging + +---- \ No newline at end of file diff --git a/docs/se/reactivemessaging/03_connector.adoc b/docs/se/reactivemessaging/03_connector.adoc new file mode 100644 index 000000000..3846befdc --- /dev/null +++ b/docs/se/reactivemessaging/03_connector.adoc @@ -0,0 +1,198 @@ +/////////////////////////////////////////////////////////////////////////////// + + Copyright (c) 2020 Oracle and/or its affiliates. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +/////////////////////////////////////////////////////////////////////////////// + += Connector +:toc: +:toc-placement: preamble +:description: Reactive Messaging Connector in Helidon SE +:keywords: helidon, se, messaging, connector +:h1Prefix: SE + +== Messaging Connector +Connector for Reactive Messaging is a factory producing Publishers and Subscribers for +Channels in Reactive Messaging. Messaging connector is just an implementation of +`IncomingConnectorFactory`, `OutgoingConnectorFactory` or both. + +[source,java] +.Example connector `example-connector`: +---- +@Connector("example-connector") +public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory { + + @Override + public PublisherBuilder> getPublisherBuilder(Config config) { + return ReactiveStreams.of("foo", "bar") + .map(Message::of); + } + + @Override + public SubscriberBuilder, Void> getSubscriberBuilder(Config config) { + return ReactiveStreams.>builder() + .map(Message::getPayload) + .forEach(o -> System.out.println("Connector says: " + o)); + } +} +---- + +[source,yaml] +.Example of channel to connector mapping config: +---- +mp.messaging.outgoing.to-connector-channel.connector: example-connector +mp.messaging.incoming.from-connector-channel.connector: example-connector +---- + +[source,java] +.Example producing to connector: +---- +Messaging.builder() + .connector(new ExampleConnector()) + .publisher(Channel.create("to-connector-channel"), + ReactiveStreams.of("fee", "fie") + .map(Message::of) + ) + .build() + .start(); + +> Connector says: fee +> Connector says: fie +---- + + +[source,java] +.Example consuming from connector: +---- +Messaging.builder() + .connector(new ExampleConnector()) + .subscriber(Channel.create("from-connector-channel"), + ReactiveStreams.>builder() + .peek(Message::ack) + .map(Message::getPayload) + .forEach(s -> System.out.println("Consuming: " + s)) + ) + .build() + .start(); + +> Consuming: foo +> Consuming: bar +---- + +=== Configuration +Messaging connector in Helidon SE can be configured explicitly by API or implicitly +by config following notation of link:https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_configuration[MicroProfile Reactive Messaging]. + +Configuration is being supplied to connector by Messaging implementation, +two mandatory attributes are always present: + +* `channel-name` name of the channel which has this connector configured as Publisher or Subscriber, `Channel.create('name-of-channel')` in case of explicit configuration or `mp.messaging.incoming.name-of-channel.connector: connector-name` in case of implicit config +* `connector` name of the connector `@Connector("connector-name")` + +[source,java] +.Example connector accessing configuration: +---- +@Connector("example-connector") +public class ExampleConnector implements IncomingConnectorFactory { + + @Override + public PublisherBuilder> getPublisherBuilder(final Config config) { + + String firstPropValue = config.getValue("first-test-prop", String.class);<1> + String secondPropValue = config.getValue("second-test-prop", String.class); + + return ReactiveStreams.of(firstPropValue, secondPropValue) + .map(Message::of); + } +} +---- +<1> Config context is merged from channel and connector contexts + +==== Explicit Config + +An explicit config for channel's publisher is possible with `Channel.Builder#publisherConfig(Config config)` +and for subscriber with `Channel.Builder#subscriberConfig(Config config)`. +Supplied <> is merged with +mandatory attributes and any implicit config found. Resulting config is served to Connector. + +[source,java] +.Example consuming from Kafka connector with explicit config: +---- +String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get(); +String topic = config.get("app.kafka.topic").asString().get(); + +Channel fromKafka = Channel.builder()<1><2> + .name("from-kafka") + .publisherConfig(KafkaConnector.configBuilder() + .bootstrapServers(kafkaServer) + .groupId("example-group-" + session.getId()) + .topic(topic) + .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST) + .enableAutoCommit(true) + .keyDeserializer(StringDeserializer.class) + .valueDeserializer(StringDeserializer.class) + .build() + ) + .build(); + +KafkaConnector kafkaConnector = KafkaConnector.create();<3> + +Messaging messaging = Messaging.builder() + .connector(kafkaConnector) + .listener(fromKafka, payload -> { + System.out.println("Kafka says: " + payload); + }) + .build() + .start(); +---- +<1> Prepare channel for connecting kafka connector with specific publisher configuration -> listener, +<2> Channel -> connector mapping is automatic when using `KafkaConnector.configBuilder()` +<3> Prepare Kafka connector, can be used by any channel + +==== Implicit Config +Implicit config without any hard-coding is possible with <> following notation of link:https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_configuration[MicroProfile Reactive Messaging]. + +[source,yaml] +.Example of channel to connector mapping config with custom properties: +---- +mp.messaging.incoming.from-connector-channel.connector: example-connector<1> +mp.messaging.incoming.from-connector-channel.first-test-prop: foo<2> +mp.messaging.connector.example-connector.second-test-prop: bar<3> +---- +<1> Channel -> Connector mapping +<2> Channel configuration properties +<3> Connector configuration properties + +[source,java] +.Example consuming from connector: +---- +Messaging.builder() + .connector(new ExampleConnector()) + .listener(Channel.create("from-connector-channel"), + s -> System.out.println("Consuming: " + s)) + .build() + .start(); + +> Consuming: foo +> Consuming: bar +---- + +=== Reusability in MP Messaging +As the API is the same for <> +connectors, all that is needed to make connector work in both ways is annotating it with +`@ApplicationScoped`. Such connector is treated as a bean in Helidon MP. + +For specific informations about creating messaging connectors for Helidon MP visit +<>. \ No newline at end of file diff --git a/docs/se/reactivemessaging/04_kafka.adoc b/docs/se/reactivemessaging/04_kafka.adoc new file mode 100644 index 000000000..a46f77309 --- /dev/null +++ b/docs/se/reactivemessaging/04_kafka.adoc @@ -0,0 +1,166 @@ +/////////////////////////////////////////////////////////////////////////////// + + Copyright (c) 2020 Oracle and/or its affiliates. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +/////////////////////////////////////////////////////////////////////////////// + += Kafka Connector +:toc: +:toc-placement: preamble +:description: Reactive Messaging support for Kafka in Helidon SE +:keywords: helidon, se, messaging, kafka +:h1Prefix: SE + +== Reactive Kafka Connector +Connecting streams to Kafka with Reactive Messaging couldn't be easier. + +[source,xml] +.Dependencies needed: +---- + + io.helidon.messaging + helidon-messaging + + + io.helidon.messaging.kafka + helidon-messaging-kafka + +---- + +=== Explicit config with config builder + +[source,java] +.Example of consuming from Kafka: +---- +String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get(); +String topic = config.get("app.kafka.topic").asString().get(); + +Channel fromKafka = Channel.builder()<1><2> + .name("from-kafka") + .publisherConfig(KafkaConnector.configBuilder() + .bootstrapServers(kafkaServer) + .groupId("example-group-" + session.getId()) + .topic(topic) + .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST) + .enableAutoCommit(true) + .keyDeserializer(StringDeserializer.class) + .valueDeserializer(StringDeserializer.class) + .build() + ) + .build(); + +KafkaConnector kafkaConnector = KafkaConnector.create();<3> + +Messaging messaging = Messaging.builder() + .connector(kafkaConnector) + .listener(fromKafka, payload -> { + System.out.println("Kafka says: " + payload); + }) + .build() + .start(); +---- +<1> Prepare a channel for connecting kafka connector with specific publisher configuration -> listener +<2> Channel -> connector mapping is automatic when using KafkaConnector.configBuilder() +<3> Prepare Kafka connector, can be used by any channel + +[source,java] +.Example of producing to Kafka: +---- +String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get(); +String topic = config.get("app.kafka.topic").asString().get(); + +Channel toKafka = Channel.builder()<1><2> + .subscriberConfig(KafkaConnector.configBuilder() + .bootstrapServers(kafkaServer) + .topic(topic) + .keySerializer(StringSerializer.class) + .valueSerializer(StringSerializer.class) + .build() + ).build(); + +KafkaConnector kafkaConnector = KafkaConnector.create();<3> + +messaging = Messaging.builder() + .publisher(toKafka, Multi.just("test1", "test2").map(Message::of)) + .connector(kafkaConnector) + .build() + .start(); +---- +<1> Prepare a channel for connecting kafka connector with specific publisher configuration -> listener +<2> Channel -> connector mapping is automatic when using KafkaConnector.configBuilder() +<3> Prepare Kafka connector, can be used by any channel + +=== Implicit Helidon Config + +[source,yaml] +.Example of connector config: +---- +mp.messaging: + + incoming.from-kafka: + connector: helidon-kafka + topic: messaging-test-topic-1 + auto.offset.reset: latest + enable.auto.commit: true + group.id: example-group-id + + outgoing.to-kafka: + connector: helidon-kafka + topic: messaging-test-topic-1 + + connector: + helidon-kafka: + bootstrap.servers: localhost:9092 + key.serializer: org.apache.kafka.common.serialization.StringSerializer + value.serializer: org.apache.kafka.common.serialization.StringSerializer + key.deserializer: org.apache.kafka.common.serialization.StringDeserializer + value.deserializer: org.apache.kafka.common.serialization.StringDeserializer +---- + +[source,java] +.Example of consuming from Kafka: +---- +Channel fromKafka = Channel.create("from-kafka"); + +KafkaConnector kafkaConnector = KafkaConnector.create();<1> + +Messaging messaging = Messaging.builder() + .connector(kafkaConnector) + .listener(fromKafka, payload -> { + System.out.println("Kafka says: " + payload); + }) + .build() + .start(); +---- +<1> Prepare Kafka connector, can be used by any channel + +[source,java] +.Example of producing to Kafka: +---- +Channel toKafka = Channel.create("to-kafka"); + +KafkaConnector kafkaConnector = KafkaConnector.create();<1> + +messaging = Messaging.builder() + .publisher(toKafka, Multi.just("test1", "test2").map(Message::of)) + .connector(kafkaConnector) + .build() + .start(); +---- +<1> Prepare Kafka connector, can be used by any channel + +Don't forget to check out the examples with pre-configured Kafka docker image, for easy testing: + +* https://github.com/oracle/helidon/tree/master/examples/messaging \ No newline at end of file diff --git a/docs/shared/reactivestreams/02_engine.adoc b/docs/shared/reactivestreams/02_engine.adoc index 04166aab8..6d026352b 100644 --- a/docs/shared/reactivestreams/02_engine.adoc +++ b/docs/shared/reactivestreams/02_engine.adoc @@ -22,9 +22,6 @@ Helidon has its own set of reactive operators that have no dependencies outside These operators can be used with `java.util.concurrent.Flow` based reactive streams. Stream processing operator chain can be easily constructed by `io.helidon.common.reactive.Multi`, or `io.helidon.common.reactive.Single` for streams with single value. -Implementation was contributed to Helidon by the world-renown reactive programming expert, -project lead of RxJava and co-father of project Reactor, -https://twitter.com/akarnokd[Dr. David Karnok]. [source,java] .Example of Multi usage: