diff --git a/.gitignore b/.gitignore index 000016a72..a0f4b1e0c 100644 --- a/.gitignore +++ b/.gitignore @@ -64,6 +64,9 @@ build/ node_modules/ node/ +# Helidon CLI +.helidon + # Other *~ user.txt diff --git a/dependencies/pom.xml b/dependencies/pom.xml index 3508b3b1d..6fc300452 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -118,9 +118,11 @@ 1.0.6 2.12.5 2.12.0 - 2.3.1 + 2.5.0 3.2.1 1.0.3 + 2.12.10 + 3.5.7 @@ -924,8 +926,22 @@ org.apache.kafka - kafka_2.11 + kafka_2.12 ${version.lib.kafka} + + + org.apache.zookeeper + zookeeper + + + org.scala-lang + scala-library + + + org.scala-lang + scala-reflect + + @@ -1033,6 +1049,22 @@ com.salesforce.kafka.test kafka-junit5 ${version.lib.kafka-junit5} + + + org.apache.zookeeper + zookeeper + + + + + org.apache.zookeeper + zookeeper + ${version.lib.zookeeper} + + + org.scala-lang + scala-library + ${version.lib.scala} org.reactivestreams diff --git a/examples/messaging/README.md b/examples/messaging/README.md new file mode 100644 index 000000000..0a1e49ecb --- /dev/null +++ b/examples/messaging/README.md @@ -0,0 +1,28 @@ +# Helidon Messaging with Kafka Examples + +## Prerequisites +* Docker +* Java 11+ + +### Test Kafka server +To make examples easily runnable, +small, pocket size and pre-configured testing Kafka server Docker image is available. + +* To run it locally: `./kafkaRun.sh` + * Pre-configured topics: + * `messaging-test-topic-1` + * `messaging-test-topic-2` + * Stop it with `Ctrl+c` + +* Send messages manually with: `./kafkaProduce.sh [topic-name]` +* Consume messages manually with: `./kafkaConsume.sh [topic-name]` + +## Helidon SE Reactive Messaging with Kafka Example +For demonstration of Helidon SE Messaging with Kafka connector, +continue to [Kafka with WebSocket SE Example](kafka-websocket-se/README.md) + +## Helidon MP Reactive Messaging with Kafka Example +For demonstration of Helidon MP Messaging with Kafka connector, +continue to [Kafka with WebSocket MP Example](kafka-websocket-mp/README.md) + + diff --git a/examples/messaging/docker/Dockerfile.kafka b/examples/messaging/docker/Dockerfile.kafka new file mode 100644 index 000000000..bae2a3771 --- /dev/null +++ b/examples/messaging/docker/Dockerfile.kafka @@ -0,0 +1,39 @@ +# +# Copyright (c) 2019, 2020 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM openjdk:8-jre-alpine + +ENV VERSION=2.5.0 + +RUN apk add --no-cache bash curl jq + +# Find closest mirror, download and extract Kafka +RUN MIRROR=$(curl -s 'https://www.apache.org/dyn/closer.cgi?as_json=1' | jq -r '.http[0]') \ +&& wget -q -O kafka.tar.gz ${MIRROR}kafka/${VERSION}/kafka_2.12-${VERSION}.tgz \ +&& tar -xzf kafka.tar.gz -C /opt && rm kafka.tar.gz \ +&& mv /opt/kafka* /opt/kafka + +WORKDIR /opt/kafka + +COPY start_kafka.sh start_kafka.sh +COPY init_topics.sh init_topics.sh + +RUN chmod a+x ./*.sh + +# Expose Zookeeper and Kafka ports +EXPOSE 2181 9092 + +CMD bash start_kafka.sh diff --git a/examples/messaging/docker/init_topics.sh b/examples/messaging/docker/init_topics.sh new file mode 100644 index 000000000..01be423ca --- /dev/null +++ b/examples/messaging/docker/init_topics.sh @@ -0,0 +1,46 @@ +#!/bin/bash +# +# Copyright (c) 2020 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Wait for Kafka to start and create test topics: +# topic messaging-test-topic-1 and topic messaging-test-topic-2 +# + +ZOOKEEPER_URL=localhost:2181 +KAFKA_TOPICS="/opt/kafka/bin/kafka-topics.sh --if-not-exists --zookeeper $ZOOKEEPER_URL" + +while sleep 2; do + brokers=$(echo dump | nc localhost 2181 | grep brokers) + echo "Checking if Kafka is up: ${brokers}" + if [[ -z $brokers ]]; then + echo "KAFKA IS UP !!!" + + echo "Creating test topics" + bash "$KAFKA_TOPICS" \ + --create \ + --replication-factor 1 \ + --partitions 10 \ + --topic messaging-test-topic-1 + bash "$KAFKA_TOPICS" \ + --create \ + --replication-factor 1 \ + --partitions 10 \ + --topic messaging-test-topic-2 + + exit 0 + fi +done diff --git a/examples/messaging/docker/start_kafka.sh b/examples/messaging/docker/start_kafka.sh new file mode 100644 index 000000000..c34d37bee --- /dev/null +++ b/examples/messaging/docker/start_kafka.sh @@ -0,0 +1,46 @@ +#!/bin/bash +# +# Copyright (c) 2020 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Start Zookeeper, wait for it to come up and start Kafka. +# + +# Allow ruok +echo "4lw.commands.whitelist=*" >>/opt/kafka/config/zookeeper.properties + +# Start Zookeeper +/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties & + +while sleep 2; do + isOk=$(echo ruok | nc localhost 2181) + echo "Checking if Zookeeper is up: ${isOk}" + if [ "${isOk}" = "imok" ]; then + echo "ZOOKEEPER IS UP !!!" + break + fi +done + +# Create test topics when Kafka is ready +/opt/kafka/init_topics.sh & + +# Start Kafka +/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties +state=$? +if [ $state -ne 0 ]; then + echo "Kafka stopped." + exit $state +fi diff --git a/examples/messaging/kafka-websocket-mp/README.md b/examples/messaging/kafka-websocket-mp/README.md new file mode 100644 index 000000000..5f3374419 --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/README.md @@ -0,0 +1,11 @@ +# Helidon MP Reactive Messaging with Kafka Example + +## Prerequisites +* Docker +* Java 11+ +* [Kafka bootstrap server](../README.md) running on `localhost:9092` + +## Build & Run +1. `mvn clean install` +2. `java -jar mp-example.jar` +3. Visit http://localhost:7001 \ No newline at end of file diff --git a/examples/messaging/kafka-websocket-mp/pom.xml b/examples/messaging/kafka-websocket-mp/pom.xml new file mode 100644 index 000000000..6d82bf80c --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/pom.xml @@ -0,0 +1,86 @@ + + + + + 4.0.0 + + io.helidon.applications + helidon-mp + 2.0.0-SNAPSHOT + ../../../applications/mp/pom.xml + + io.helidon.messaging.mp.example + mp-example + 1.0-SNAPSHOT + mp-messaging-example + + + + io.helidon.microprofile.bundles + helidon-microprofile + + + io.helidon.microprofile.messaging + helidon-microprofile-messaging + + + io.helidon.messaging.kafka + helidon-messaging-kafka + + + io.helidon.microprofile.websocket + helidon-microprofile-websocket + + + org.jboss + jandex + runtime + true + + + jakarta.activation + jakarta.activation-api + runtime + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-libs + + + + + org.jboss.jandex + jandex-maven-plugin + + + make-index + + + + + + diff --git a/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/MsgProcessingBean.java b/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/MsgProcessingBean.java new file mode 100644 index 000000000..2189c5f26 --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/MsgProcessingBean.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.messaging.mp.example; + +import java.util.concurrent.SubmissionPublisher; + +import javax.enterprise.context.ApplicationScoped; + +import io.helidon.common.reactive.Multi; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder; +import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; +import org.reactivestreams.FlowAdapters; +import org.reactivestreams.Publisher; + +/** + * Bean for message processing. + */ +@ApplicationScoped +public class MsgProcessingBean { + + private final SubmissionPublisher emitter = new SubmissionPublisher<>(); + private final SubmissionPublisher broadCaster = new SubmissionPublisher<>(); + + /** + * Create a publisher for the emitter. + * + * @return A Publisher from the emitter + */ + @Outgoing("multiplyVariants") + public Publisher preparePublisher() { + // Create new publisher for emitting to by this::process + return ReactiveStreams + .fromPublisher(FlowAdapters.toPublisher(Multi.create(emitter))) + .buildRs(); + } + + /** + * Returns a builder for a processor that maps a string into three variants. + * + * @return ProcessorBuilder + */ + @Incoming("multiplyVariants") + @Outgoing("toKafka") + public ProcessorBuilder> multiply() { + // Multiply to 3 variants of same message + return ReactiveStreams.builder() + .flatMap(o -> + ReactiveStreams.of( + // upper case variant + o.toUpperCase(), + // repeat twice variant + o.repeat(2), + // reverse chars 'tnairav' + new StringBuilder(o).reverse().toString()) + ).map(Message::of); + } + + /** + * Broadcasts an event. + * + * @param msg Message to broadcast + */ + @Incoming("fromKafka") + public void broadcast(String msg) { + // Broadcast to all subscribers + broadCaster.submit(msg); + } + + /** + * Subscribe new Multi to broadcasting publisher. + * + * @return new Multi subscribed to broadcaster + */ + public Multi subscribeMulti() { + return Multi.create(broadCaster); + } + + /** + * Emit a message. + * + * @param msg message to emit + */ + public void process(final String msg) { + emitter.submit(msg); + } +} diff --git a/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/SendingResource.java b/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/SendingResource.java new file mode 100644 index 000000000..93261b56c --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/SendingResource.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.messaging.mp.example; + +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +/** + * Expose send method for publishing to messaging. + */ +@Path("rest/messages") +@RequestScoped +public class SendingResource { + private final MsgProcessingBean msgBean; + + /** + * Constructor injection of field values. + * + * @param msgBean Messaging example bean + */ + @Inject + public SendingResource(MsgProcessingBean msgBean) { + this.msgBean = msgBean; + } + + + /** + * Send message through Messaging to Kafka. + * + * @param msg message to process + */ + @Path("/send/{msg}") + @GET + @Produces(MediaType.APPLICATION_JSON) + public void getSend(@PathParam("msg") String msg) { + msgBean.process(msg); + } +} diff --git a/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/WebSocketEndpoint.java b/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/WebSocketEndpoint.java new file mode 100644 index 000000000..d90b0528a --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/WebSocketEndpoint.java @@ -0,0 +1,96 @@ + +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.messaging.mp.example; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.inject.Inject; +import javax.websocket.CloseReason; +import javax.websocket.EndpointConfig; +import javax.websocket.OnClose; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.ServerEndpoint; + +import io.helidon.common.reactive.Single; + +/** + * Register all WebSocket connection as subscribers + * of broadcasting {@link java.util.concurrent.SubmissionPublisher} + * in the {@link io.helidon.messaging.mp.example.MsgProcessingBean}. + *

+ * When connection is closed, cancel subscription and remove reference. + */ +@ServerEndpoint("/ws/messages") +public class WebSocketEndpoint { + + private static final Logger LOGGER = Logger.getLogger(WebSocketEndpoint.class.getName()); + + private final Map> subscriberRegister = new HashMap<>(); + + @Inject + private MsgProcessingBean msgProcessingBean; + + /** + * On WebSocket session is opened. + * + * @param session web socket session + * @param endpointConfig endpoint config + */ + @OnOpen + public void onOpen(Session session, EndpointConfig endpointConfig) { + System.out.println("New WebSocket client connected with session " + session.getId()); + + Single single = msgProcessingBean.subscribeMulti() + // Watch for errors coming from upstream + .onError(throwable -> LOGGER.log(Level.SEVERE, "Upstream error!", throwable)) + // Send every item coming from upstream over web socket + .forEach(s -> sendTextMessage(session, s)); + + //Save forEach single promise for later cancellation + subscriberRegister.put(session.getId(), single); + } + + /** + * When WebSocket session is closed. + * + * @param session web socket session + * @param closeReason web socket close reason + */ + @OnClose + public void onClose(final Session session, final CloseReason closeReason) { + LOGGER.info("Closing session " + session.getId()); + // Properly unsubscribe from SubmissionPublisher + Optional.ofNullable(subscriberRegister.remove(session.getId())) + .ifPresent(Single::cancel); + } + + private void sendTextMessage(Session session, String msg) { + try { + session.getBasicRemote().sendText(msg); + } catch (IOException e) { + LOGGER.log(Level.SEVERE, "Message sending over WebSocket failed", e); + } + } +} diff --git a/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/package-info.java b/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/package-info.java new file mode 100644 index 000000000..d3ebb5b0b --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/src/main/java/io/helidon/messaging/mp/example/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Helidon MP Reactive Messaging with Kafka Example. + */ +package io.helidon.messaging.mp.example; diff --git a/examples/messaging/kafka-websocket-mp/src/main/resources/META-INF/beans.xml b/examples/messaging/kafka-websocket-mp/src/main/resources/META-INF/beans.xml new file mode 100644 index 000000000..f58f5d006 --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/src/main/resources/META-INF/beans.xml @@ -0,0 +1,26 @@ + + + + + + diff --git a/examples/messaging/kafka-websocket-mp/src/main/resources/META-INF/microprofile-config.properties b/examples/messaging/kafka-websocket-mp/src/main/resources/META-INF/microprofile-config.properties new file mode 100644 index 000000000..1c88f678e --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/src/main/resources/META-INF/microprofile-config.properties @@ -0,0 +1,36 @@ +# +# Copyright (c) 2020 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +server.port=7001 +server.host=0.0.0.0 +server.static.classpath.location=/WEB +server.static.classpath.welcome=index.html + +# Configure channel fromKafka to ask Kafka connector for publisher +mp.messaging.incoming.fromKafka.connector=helidon-kafka +mp.messaging.incoming.fromKafka.enable.auto.commit=true +mp.messaging.incoming.fromKafka.group.id=websocket-mp-example-1 + +# Configure channel toKafka to ask Kafka connector for subscriber +mp.messaging.outgoing.toKafka.connector=helidon-kafka + +# Connector config properties are common to all channels +mp.messaging.connector.helidon-kafka.bootstrap.servers=localhost:9092 +mp.messaging.connector.helidon-kafka.topic=messaging-test-topic-1 +mp.messaging.connector.helidon-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer +mp.messaging.connector.helidon-kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer +mp.messaging.connector.helidon-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.connector.helidon-kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer diff --git a/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/favicon.ico b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/favicon.ico new file mode 100644 index 000000000..d91659fdb Binary files /dev/null and b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/favicon.ico differ diff --git a/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/arrow-1.png b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/arrow-1.png new file mode 100644 index 000000000..bbba0aef8 Binary files /dev/null and b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/arrow-1.png differ diff --git a/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/arrow-2.png b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/arrow-2.png new file mode 100644 index 000000000..0b1096b07 Binary files /dev/null and b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/arrow-2.png differ diff --git a/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/cloud.png b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/cloud.png new file mode 100644 index 000000000..3e04833c0 Binary files /dev/null and b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/cloud.png differ diff --git a/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/frank.png b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/frank.png new file mode 100644 index 000000000..51a13d8db Binary files /dev/null and b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/img/frank.png differ diff --git a/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/index.html b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/index.html new file mode 100644 index 000000000..52c7d58bc --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/index.html @@ -0,0 +1,127 @@ + + + + + + + + Helidon Reactive Messaging + + + + + + + + +

+
+
+ +
+
Send
+
+
+
+
+
REST call /rest/messages/send/{msg}
+
+
+
Messages received from Kafka over websocket
+
+
+
+
+
+            
+        
+
+
+ + + + + \ No newline at end of file diff --git a/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/main.css b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/main.css new file mode 100644 index 000000000..61f881132 --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/src/main/resources/WEB/main.css @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#root { + background-color: #36ABF2; + font-family: Roboto,sans-serif; + color: #fff; + position: absolute; + overflow-x: hidden; + -ms-overflow-style: none; /* Internet Explorer 10+ */ + scrollbar-width: none; /* Firefox */ + top: 0; + left: 0; + width: 100%; + height: 100%; +} +#root::-webkit-scrollbar { + display: none; /* Safari and Chrome */ +} + +#helidon { + width: 509px; + height: 273px; + position: relative; + left: -509px; + z-index: 4; + background: url('img/frank.png'); +} + +#rest-tip { + position: relative; + top: -80px; + left: 160px; +} + +#rest-tip-arrow { + width: 205px; + height: 304px; + z-index: 4; + top: -20px; + background: url('img/arrow-1.png'); +} +#rest-tip-label { + position: absolute; + white-space: nowrap; + font-size: 18px; + font-weight: bold; + z-index: 4; + left: -60px; +} + +#sse-tip { + position: absolute; + overflow: hidden; + display: flex; + width: auto; + height: auto; + top: 5%; + right: 10%; + z-index: 0; +} + +#sse-tip-arrow { + position: relative; + top: -30px; + width: 296px; + height: 262px; + z-index: 4; + background: url('img/arrow-2.png'); +} +#sse-tip-label { + position: relative; + white-space: nowrap; + font-size: 18px; + font-weight: bold; + z-index: 4; +} + +#producer { + float: left; + position: relative; + width: 300px; + height: 100%; + margin: 50px; + padding: 10px; + z-index: 99; +} + +#msgBox { + position: absolute; + width: 300px; + top: 25%; + right: 3%; + height: 100%; + margin: 50px; + padding: 10px; + z-index: 20; +} + +#input { + width: 210px; + height: 22px; + top: 58px; + left: 30px; + background-color: white; + border-radius: 10px; + border-style: solid; + border-color: white; + position: absolute; + z-index: 10; +} + +#inputCloud { + position: relative; + width: 310px; + height: 150px; + background: url('img/cloud.png'); +} + +#msg { + background-color: #D2EBFC; + color: #1A9BF4; + border-radius: 10px; + width: 300px; + height: 50px; + margin: 5px; + display: flex; + padding-left: 10px; + justify-content: center; + align-items: center; + z-index: 99; +} + +#submit { + font-weight: bold; + background-color: aqua; + color: #1A9BF4; + border-radius: 12px; + width: 100px; + height: 30px; + display: flex; + justify-content: center; + align-items: center; + margin: 5px; + cursor: pointer; +} + +#snippet { + position: absolute; + top: 15%; + left: 30%; + width: 40%; + z-index: 5; +} + +.hljs { + border-radius: 10px; + font-size: 12px; +} \ No newline at end of file diff --git a/examples/messaging/kafka-websocket-mp/src/main/resources/logging.properties b/examples/messaging/kafka-websocket-mp/src/main/resources/logging.properties new file mode 100644 index 000000000..78d82bde1 --- /dev/null +++ b/examples/messaging/kafka-websocket-mp/src/main/resources/logging.properties @@ -0,0 +1,32 @@ +# +# Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Example Logging Configuration File +# For more information see $JAVA_HOME/jre/lib/logging.properties + +# Send messages to the console +handlers=io.helidon.common.HelidonConsoleHandler + +# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread +java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n + +# Global logging level. Can be overridden by specific loggers +.level=INFO + +# Kafka client has exhaustive logs +org.apache.kafka.clients.level=WARNING +org.apache.kafka.clients.consumer.ConsumerConfig.level=SEVERE +org.apache.kafka.clients.producer.ProducerConfig.level=SEVERE diff --git a/examples/messaging/kafka-websocket-se/README.md b/examples/messaging/kafka-websocket-se/README.md new file mode 100644 index 000000000..8efc1d398 --- /dev/null +++ b/examples/messaging/kafka-websocket-se/README.md @@ -0,0 +1,12 @@ +# Helidon Messaging with Kafka Examples + +## Prerequisites +* Java 11+ +* Docker +* [Kafka bootstrap server](../README.md) running on `localhost:9092` + +## Build & Run +1. `mvn clean install` +2. `java -jar se-example.jar` +3. Visit http://localhost:7001 + diff --git a/examples/messaging/kafka-websocket-se/pom.xml b/examples/messaging/kafka-websocket-se/pom.xml new file mode 100644 index 000000000..b1f34e835 --- /dev/null +++ b/examples/messaging/kafka-websocket-se/pom.xml @@ -0,0 +1,78 @@ + + + + + 4.0.0 + + io.helidon.applications + helidon-se + 2.0.0-SNAPSHOT + ../../../applications/se/pom.xml + + io.helidon.messaging.se.example + se-example + 1.0-SNAPSHOT + se-messaging-example + + + io.helidon.messaging.se.example.Main + + + + + io.helidon.webserver + helidon-webserver + + + io.helidon.messaging + helidon-messaging + + + io.helidon.messaging.kafka + helidon-messaging-kafka + + + jakarta.websocket + jakarta.websocket-api + + + io.helidon.webserver + helidon-webserver-tyrus + + + io.helidon.config + helidon-config-yaml + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-libs + + + + + + diff --git a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/Main.java b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/Main.java new file mode 100644 index 000000000..100c65e0f --- /dev/null +++ b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/Main.java @@ -0,0 +1,125 @@ + +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.messaging.se.example; + +import java.io.IOException; +import java.io.InputStream; +import java.util.logging.LogManager; + +import javax.websocket.server.ServerEndpointConfig; + +import io.helidon.config.Config; +import io.helidon.webserver.Routing; +import io.helidon.webserver.StaticContentSupport; +import io.helidon.webserver.WebServer; +import io.helidon.webserver.tyrus.TyrusSupport; + +/** + * The application main class. + */ +public final class Main { + + /** + * Cannot be instantiated. + */ + private Main() { + } + + /** + * Application main entry point. + * + * @param args command line arguments. + * @throws IOException if there are problems reading logging properties + */ + public static void main(final String[] args) throws IOException { + startServer(); + } + + /** + * Start the server. + * + * @return the created {@link WebServer} instance + * @throws IOException if there are problems reading logging properties + */ + static WebServer startServer() throws IOException { + // load logging configuration + setupLogging(); + + // By default this will pick up application.yaml from the classpath + Config config = Config.create(); + + SendingService sendingService = new SendingService(config); + + WebServer server = WebServer.builder(createRouting(sendingService)) + .config(config.get("server")) + .build(); + + server.start() + .thenAccept(ws -> { + System.out.println( + "WEB server is up! http://localhost:" + ws.port()); + ws.whenShutdown().thenRun(() + -> { + // Stop messaging properly + sendingService.shutdown(); + System.out.println("WEB server is DOWN. Good bye!"); + }); + }) + .exceptionally(t -> { + System.err.println("Startup failed: " + t.getMessage()); + t.printStackTrace(System.err); + return null; + }); + + // Server threads are not daemon. No need to block. Just react. + return server; + } + + /** + * Creates new {@link Routing}. + * + * @param config configuration of this server + * @return routing configured with JSON support, a health check, and a service + */ + private static Routing createRouting(SendingService sendingService) { + + return Routing.builder() + // register static content support (on "/") + .register(StaticContentSupport.builder("/WEB").welcomeFileName("index.html")) + // register rest endpoint for sending to Kafka + .register("/rest/messages", sendingService) + // register WebSocket endpoint to push messages coming from Kafka to client + .register("/ws", + TyrusSupport.builder().register( + ServerEndpointConfig.Builder.create( + WebSocketEndpoint.class, "/messages") + .build()) + .build()) + .build(); + } + + /** + * Configure logging from logging.properties file. + */ + private static void setupLogging() throws IOException { + try (InputStream is = Main.class.getResourceAsStream("/logging.properties")) { + LogManager.getLogManager().readConfiguration(is); + } + } +} diff --git a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/SendingService.java b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/SendingService.java new file mode 100644 index 000000000..816075ecd --- /dev/null +++ b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/SendingService.java @@ -0,0 +1,96 @@ + +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.messaging.se.example; + +import io.helidon.config.Config; +import io.helidon.messaging.Channel; +import io.helidon.messaging.Emitter; +import io.helidon.messaging.Messaging; +import io.helidon.messaging.connectors.kafka.KafkaConnector; +import io.helidon.webserver.Routing; +import io.helidon.webserver.Service; + +import org.apache.kafka.common.serialization.StringSerializer; + +public class SendingService implements Service { + + private final Emitter emitter; + private final Messaging messaging; + + SendingService(Config config) { + + String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get(); + String topic = config.get("app.kafka.topic").asString().get(); + + // Prepare channel for connecting processor -> kafka connector with specific subscriber configuration, + // channel -> connector mapping is automatic when using KafkaConnector.configBuilder() + Channel toKafka = Channel.builder() + .subscriberConfig(KafkaConnector.configBuilder() + .bootstrapServers(kafkaServer) + .topic(topic) + .keySerializer(StringSerializer.class) + .valueSerializer(StringSerializer.class) + .build() + ).build(); + + // Prepare channel for connecting emitter -> processor + Channel toProcessor = Channel.create(); + + // Prepare Kafka connector, can be used by any channel + KafkaConnector kafkaConnector = KafkaConnector.create(); + + // Prepare emitter for manual publishing to channel + emitter = Emitter.create(toProcessor); + + messaging = Messaging.builder() + .emitter(emitter) + // Processor connect two channels together + .processor(toProcessor, toKafka, payload -> { + // Transforming to upper-case before sending to kafka + return payload.toUpperCase(); + }) + .connector(kafkaConnector) + .build() + .start(); + } + + /** + * A service registers itself by updating the routing rules. + * + * @param rules the routing rules. + */ + @Override + public void update(Routing.Rules rules) { + // Listen for GET /example/send/{msg} + // to send it thru messaging to Kafka + rules.get("/send/{msg}", (req, res) -> { + String msg = req.path().param("msg"); + System.out.println("Emitting: " + msg); + emitter.send(msg); + res.send(); + }); + } + + /** + * Gracefully terminate messaging. + */ + public void shutdown() { + messaging.stop(); + } +} diff --git a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/WebSocketEndpoint.java b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/WebSocketEndpoint.java new file mode 100644 index 000000000..be1493f99 --- /dev/null +++ b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/WebSocketEndpoint.java @@ -0,0 +1,106 @@ + +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.messaging.se.example; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.websocket.CloseReason; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.Session; + +import io.helidon.config.Config; +import io.helidon.messaging.Channel; +import io.helidon.messaging.Messaging; +import io.helidon.messaging.connectors.kafka.KafkaConfigBuilder; +import io.helidon.messaging.connectors.kafka.KafkaConnector; + +import org.apache.kafka.common.serialization.StringDeserializer; + +public class WebSocketEndpoint extends Endpoint { + + private static final Logger LOGGER = Logger.getLogger(WebSocketEndpoint.class.getName()); + + private final Map messagingRegister = new HashMap<>(); + private final Config config = Config.create(); + + @Override + public void onOpen(Session session, EndpointConfig endpointConfig) { + + System.out.println("Session " + session.getId()); + + String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get(); + String topic = config.get("app.kafka.topic").asString().get(); + + // Prepare channel for connecting kafka connector with specific publisher configuration -> listener, + // channel -> connector mapping is automatic when using KafkaConnector.configBuilder() + Channel fromKafka = Channel.builder() + .name("from-kafka") + .publisherConfig(KafkaConnector.configBuilder() + .bootstrapServers(kafkaServer) + .groupId("example-group-" + session.getId()) + .topic(topic) + .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST) + .enableAutoCommit(true) + .keyDeserializer(StringDeserializer.class) + .valueDeserializer(StringDeserializer.class) + .build() + ) + .build(); + + // Prepare Kafka connector, can be used by any channel + KafkaConnector kafkaConnector = KafkaConnector.create(); + + Messaging messaging = Messaging.builder() + .connector(kafkaConnector) + .listener(fromKafka, payload -> { + System.out.println("Kafka says: " + payload); + // Send message received from Kafka over websocket + sendTextMessage(session, payload); + }) + .build() + .start(); + + //Save the messaging instance for proper shutdown + // when websocket connection is terminated + messagingRegister.put(session.getId(), messaging); + } + + @Override + public void onClose(final Session session, final CloseReason closeReason) { + super.onClose(session, closeReason); + LOGGER.info("Closing session " + session.getId()); + // Properly stop messaging when websocket connection is terminated + Optional.ofNullable(messagingRegister.remove(session.getId())) + .ifPresent(Messaging::stop); + } + + private void sendTextMessage(Session session, String msg) { + try { + session.getBasicRemote().sendText(msg); + } catch (IOException e) { + LOGGER.log(Level.SEVERE, "Message sending failed", e); + } + } +} diff --git a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/package-info.java b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/package-info.java new file mode 100644 index 000000000..1fcb9f49f --- /dev/null +++ b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/messaging/se/example/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Helidon SE Reactive Messaging with Kafka Example. + */ +package io.helidon.messaging.se.example; diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/WEB/favicon.ico b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/favicon.ico new file mode 100644 index 000000000..d91659fdb Binary files /dev/null and b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/favicon.ico differ diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/arrow-1.png b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/arrow-1.png new file mode 100644 index 000000000..bbba0aef8 Binary files /dev/null and b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/arrow-1.png differ diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/arrow-2.png b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/arrow-2.png new file mode 100644 index 000000000..0b1096b07 Binary files /dev/null and b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/arrow-2.png differ diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/cloud.png b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/cloud.png new file mode 100644 index 000000000..3e04833c0 Binary files /dev/null and b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/cloud.png differ diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/frank.png b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/frank.png new file mode 100644 index 000000000..51a13d8db Binary files /dev/null and b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/img/frank.png differ diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/WEB/index.html b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/index.html new file mode 100644 index 000000000..52c7d58bc --- /dev/null +++ b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/index.html @@ -0,0 +1,127 @@ + + + + + + + + Helidon Reactive Messaging + + + + + + + + +
+
+
+ +
+
Send
+
+
+
+
+
REST call /rest/messages/send/{msg}
+
+
+
Messages received from Kafka over websocket
+
+
+
+
+
+            
+        
+
+
+ + + + + \ No newline at end of file diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/WEB/main.css b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/main.css new file mode 100644 index 000000000..61f881132 --- /dev/null +++ b/examples/messaging/kafka-websocket-se/src/main/resources/WEB/main.css @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#root { + background-color: #36ABF2; + font-family: Roboto,sans-serif; + color: #fff; + position: absolute; + overflow-x: hidden; + -ms-overflow-style: none; /* Internet Explorer 10+ */ + scrollbar-width: none; /* Firefox */ + top: 0; + left: 0; + width: 100%; + height: 100%; +} +#root::-webkit-scrollbar { + display: none; /* Safari and Chrome */ +} + +#helidon { + width: 509px; + height: 273px; + position: relative; + left: -509px; + z-index: 4; + background: url('img/frank.png'); +} + +#rest-tip { + position: relative; + top: -80px; + left: 160px; +} + +#rest-tip-arrow { + width: 205px; + height: 304px; + z-index: 4; + top: -20px; + background: url('img/arrow-1.png'); +} +#rest-tip-label { + position: absolute; + white-space: nowrap; + font-size: 18px; + font-weight: bold; + z-index: 4; + left: -60px; +} + +#sse-tip { + position: absolute; + overflow: hidden; + display: flex; + width: auto; + height: auto; + top: 5%; + right: 10%; + z-index: 0; +} + +#sse-tip-arrow { + position: relative; + top: -30px; + width: 296px; + height: 262px; + z-index: 4; + background: url('img/arrow-2.png'); +} +#sse-tip-label { + position: relative; + white-space: nowrap; + font-size: 18px; + font-weight: bold; + z-index: 4; +} + +#producer { + float: left; + position: relative; + width: 300px; + height: 100%; + margin: 50px; + padding: 10px; + z-index: 99; +} + +#msgBox { + position: absolute; + width: 300px; + top: 25%; + right: 3%; + height: 100%; + margin: 50px; + padding: 10px; + z-index: 20; +} + +#input { + width: 210px; + height: 22px; + top: 58px; + left: 30px; + background-color: white; + border-radius: 10px; + border-style: solid; + border-color: white; + position: absolute; + z-index: 10; +} + +#inputCloud { + position: relative; + width: 310px; + height: 150px; + background: url('img/cloud.png'); +} + +#msg { + background-color: #D2EBFC; + color: #1A9BF4; + border-radius: 10px; + width: 300px; + height: 50px; + margin: 5px; + display: flex; + padding-left: 10px; + justify-content: center; + align-items: center; + z-index: 99; +} + +#submit { + font-weight: bold; + background-color: aqua; + color: #1A9BF4; + border-radius: 12px; + width: 100px; + height: 30px; + display: flex; + justify-content: center; + align-items: center; + margin: 5px; + cursor: pointer; +} + +#snippet { + position: absolute; + top: 15%; + left: 30%; + width: 40%; + z-index: 5; +} + +.hljs { + border-radius: 10px; + font-size: 12px; +} \ No newline at end of file diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml b/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml new file mode 100644 index 000000000..e5e6f96c0 --- /dev/null +++ b/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml @@ -0,0 +1,28 @@ +# +# Copyright (c) 2020 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +app: + kafka: + bootstrap.servers: localhost:9092 + topic: messaging-test-topic-1 + +server: + port: 7001 + host: 0.0.0.0 + static: + classpath: + location: /WEB + welcome: index.html diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/logging.properties b/examples/messaging/kafka-websocket-se/src/main/resources/logging.properties new file mode 100644 index 000000000..fc2a665a6 --- /dev/null +++ b/examples/messaging/kafka-websocket-se/src/main/resources/logging.properties @@ -0,0 +1,34 @@ +# +# Copyright (c) 2020 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Example Logging Configuration File +# For more information see $JAVA_HOME/jre/lib/logging.properties + +# Send messages to the console +handlers=io.helidon.common.HelidonConsoleHandler + +# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread +java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n + +# Global logging level. Can be overridden by specific loggers +.level=INFO + +# Component specific log levels +#io.helidon.webserver.level=INFO +#io.helidon.config.level=INFO +#io.helidon.security.level=INFO +#io.helidon.common.level=INFO +#io.netty.level=INFO diff --git a/examples/messaging/kafkaConsume.sh b/examples/messaging/kafkaConsume.sh new file mode 100755 index 000000000..da1c55c6a --- /dev/null +++ b/examples/messaging/kafkaConsume.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# +# Copyright (c) 2020 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +TOPIC="messaging-test-topic-1" +SERVER="localhost:9092" +CONTAINER="helidon_kafka" + +if [ -z "$1" ]; then + echo "No argument supplied, defaulting to topic ${TOPIC}" +else + TOPIC="$1" +fi + +docker exec -it ${CONTAINER} sh -c \ +"/opt/kafka/bin/kafka-console-consumer.sh --topic ${TOPIC} --bootstrap-server ${SERVER}" diff --git a/examples/messaging/kafkaProduce.sh b/examples/messaging/kafkaProduce.sh new file mode 100755 index 000000000..7e7a02a8b --- /dev/null +++ b/examples/messaging/kafkaProduce.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# +# Copyright (c) 2020 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +TOPIC="messaging-test-topic-1" +SERVER="localhost:9092" +CONTAINER="helidon_kafka" + +if [ -z "$1" ]; then + echo "No argument supplied, defaulting to topic ${TOPIC}" +else + TOPIC="$1" +fi + +docker exec -it ${CONTAINER} sh -c \ +"/opt/kafka/bin/kafka-console-producer.sh --topic ${TOPIC} --bootstrap-server ${SERVER}" diff --git a/examples/messaging/kafkaRun.sh b/examples/messaging/kafkaRun.sh new file mode 100755 index 000000000..c83a610f7 --- /dev/null +++ b/examples/messaging/kafkaRun.sh @@ -0,0 +1,32 @@ +#!/bin/bash +# +# Copyright (c) 2020 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +if [[ "$(docker images -q helidon-test-kafka 2>/dev/null)" == "" ]]; then + # helidon:test-kafka not found, build it + docker build ./docker -t helidon-test-kafka -f ./docker/Dockerfile.kafka +fi + +if [ ! "$(docker ps -q -f name=helidon_kafka)" ]; then + if [ "$(docker ps -aq -f status=exited -f name=helidon_kafka)" ]; then + # Clean up exited container + docker rm helidon_kafka + fi + # Run test Kafka in new container, stop it by pressing Ctrl+C + docker run -it --name helidon_kafka --network="host" helidon-test-kafka +fi +# Clean up exited container +docker rm helidon_kafka diff --git a/examples/messaging/pom.xml b/examples/messaging/pom.xml new file mode 100644 index 000000000..fed93e3cb --- /dev/null +++ b/examples/messaging/pom.xml @@ -0,0 +1,41 @@ + + + + + 4.0.0 + + io.helidon.examples + helidon-examples-project + 2.0.0-SNAPSHOT + + io.helidon.examples.messaging + helidon-examples-messaging-project + Helidon Examples Reactive Messaging + pom + + + Examples of Reactive Messaging usage + + + + kafka-websocket-mp + kafka-websocket-se + + diff --git a/examples/pom.xml b/examples/pom.xml index e1e122642..090e613fa 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -54,6 +54,7 @@ dbclient webclient cors + messaging diff --git a/messaging/kafka/src/main/resources/META-INF/beans.xml b/messaging/kafka/src/main/resources/META-INF/beans.xml new file mode 100644 index 000000000..ff5dd1295 --- /dev/null +++ b/messaging/kafka/src/main/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + diff --git a/tests/integration/kafka/pom.xml b/tests/integration/kafka/pom.xml index 0ec7aa364..818618efb 100644 --- a/tests/integration/kafka/pom.xml +++ b/tests/integration/kafka/pom.xml @@ -91,14 +91,18 @@
org.apache.kafka - kafka_2.11 + kafka_2.12 + test + + + org.apache.zookeeper + zookeeper + test + + + org.scala-lang + scala-library test - - - org.apache.zookeeper - zookeeper - - com.salesforce.kafka.test diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java index 82f14a7d4..a909e0fae 100644 --- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java +++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java @@ -235,7 +235,6 @@ class KafkaMpTest extends AbstractKafkaTest{ private static void cdiContainerUp() { Set> classes = new HashSet<>(); - classes.add(KafkaConnector.class); classes.add(AbstractSampleBean.Channel1.class); classes.add(AbstractSampleBean.Channel4.class); classes.add(AbstractSampleBean.Channel5.class);