Messaging with Kafka examples (#2016)

* Bump up Kafka to 2.5.0 with scala 2.12

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
This commit is contained in:
Daniel Kec
2020-06-17 20:48:07 +02:00
committed by GitHub
parent 6b7eb53335
commit c40814526d
45 changed files with 1931 additions and 10 deletions

3
.gitignore vendored
View File

@@ -64,6 +64,9 @@ build/
node_modules/
node/
# Helidon CLI
.helidon
# Other
*~
user.txt

36
dependencies/pom.xml vendored
View File

@@ -118,9 +118,11 @@
<version.lib.yasson>1.0.6</version.lib.yasson>
<version.lib.zipkin>2.12.5</version.lib.zipkin>
<version.lib.zipkin.sender-urlconnection>2.12.0</version.lib.zipkin.sender-urlconnection>
<version.lib.kafka>2.3.1</version.lib.kafka>
<version.lib.kafka>2.5.0</version.lib.kafka>
<version.lib.kafka-junit5>3.2.1</version.lib.kafka-junit5>
<version.lib.reactive-streams-tck>1.0.3</version.lib.reactive-streams-tck>
<version.lib.scala>2.12.10</version.lib.scala>
<version.lib.zookeeper>3.5.7</version.lib.zookeeper>
</properties>
<dependencyManagement>
@@ -924,8 +926,22 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<artifactId>kafka_2.12</artifactId>
<version>${version.lib.kafka}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Testing libraries -->
<dependency>
@@ -1033,6 +1049,22 @@
<groupId>com.salesforce.kafka.test</groupId>
<artifactId>kafka-junit5</artifactId>
<version>${version.lib.kafka-junit5}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${version.lib.zookeeper}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${version.lib.scala}</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>

View File

@@ -0,0 +1,28 @@
# Helidon Messaging with Kafka Examples
## Prerequisites
* Docker
* Java 11+
### Test Kafka server
To make examples easily runnable,
small, pocket size and pre-configured testing Kafka server Docker image is available.
* To run it locally: `./kafkaRun.sh`
* Pre-configured topics:
* `messaging-test-topic-1`
* `messaging-test-topic-2`
* Stop it with `Ctrl+c`
* Send messages manually with: `./kafkaProduce.sh [topic-name]`
* Consume messages manually with: `./kafkaConsume.sh [topic-name]`
## Helidon SE Reactive Messaging with Kafka Example
For demonstration of Helidon SE Messaging with Kafka connector,
continue to [Kafka with WebSocket SE Example](kafka-websocket-se/README.md)
## Helidon MP Reactive Messaging with Kafka Example
For demonstration of Helidon MP Messaging with Kafka connector,
continue to [Kafka with WebSocket MP Example](kafka-websocket-mp/README.md)

View File

@@ -0,0 +1,39 @@
#
# Copyright (c) 2019, 2020 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM openjdk:8-jre-alpine
ENV VERSION=2.5.0
RUN apk add --no-cache bash curl jq
# Find closest mirror, download and extract Kafka
RUN MIRROR=$(curl -s 'https://www.apache.org/dyn/closer.cgi?as_json=1' | jq -r '.http[0]') \
&& wget -q -O kafka.tar.gz ${MIRROR}kafka/${VERSION}/kafka_2.12-${VERSION}.tgz \
&& tar -xzf kafka.tar.gz -C /opt && rm kafka.tar.gz \
&& mv /opt/kafka* /opt/kafka
WORKDIR /opt/kafka
COPY start_kafka.sh start_kafka.sh
COPY init_topics.sh init_topics.sh
RUN chmod a+x ./*.sh
# Expose Zookeeper and Kafka ports
EXPOSE 2181 9092
CMD bash start_kafka.sh

View File

@@ -0,0 +1,46 @@
#!/bin/bash
#
# Copyright (c) 2020 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Wait for Kafka to start and create test topics:
# topic messaging-test-topic-1 and topic messaging-test-topic-2
#
ZOOKEEPER_URL=localhost:2181
KAFKA_TOPICS="/opt/kafka/bin/kafka-topics.sh --if-not-exists --zookeeper $ZOOKEEPER_URL"
while sleep 2; do
brokers=$(echo dump | nc localhost 2181 | grep brokers)
echo "Checking if Kafka is up: ${brokers}"
if [[ -z $brokers ]]; then
echo "KAFKA IS UP !!!"
echo "Creating test topics"
bash "$KAFKA_TOPICS" \
--create \
--replication-factor 1 \
--partitions 10 \
--topic messaging-test-topic-1
bash "$KAFKA_TOPICS" \
--create \
--replication-factor 1 \
--partitions 10 \
--topic messaging-test-topic-2
exit 0
fi
done

View File

@@ -0,0 +1,46 @@
#!/bin/bash
#
# Copyright (c) 2020 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Start Zookeeper, wait for it to come up and start Kafka.
#
# Allow ruok
echo "4lw.commands.whitelist=*" >>/opt/kafka/config/zookeeper.properties
# Start Zookeeper
/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties &
while sleep 2; do
isOk=$(echo ruok | nc localhost 2181)
echo "Checking if Zookeeper is up: ${isOk}"
if [ "${isOk}" = "imok" ]; then
echo "ZOOKEEPER IS UP !!!"
break
fi
done
# Create test topics when Kafka is ready
/opt/kafka/init_topics.sh &
# Start Kafka
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
state=$?
if [ $state -ne 0 ]; then
echo "Kafka stopped."
exit $state
fi

View File

@@ -0,0 +1,11 @@
# Helidon MP Reactive Messaging with Kafka Example
## Prerequisites
* Docker
* Java 11+
* [Kafka bootstrap server](../README.md) running on `localhost:9092`
## Build & Run
1. `mvn clean install`
2. `java -jar mp-example.jar`
3. Visit http://localhost:7001

View File

@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2020 Oracle and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.helidon.applications</groupId>
<artifactId>helidon-mp</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../../applications/mp/pom.xml</relativePath>
</parent>
<groupId>io.helidon.messaging.mp.example</groupId>
<artifactId>mp-example</artifactId>
<version>1.0-SNAPSHOT</version>
<name>mp-messaging-example</name>
<dependencies>
<dependency>
<groupId>io.helidon.microprofile.bundles</groupId>
<artifactId>helidon-microprofile</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.messaging</groupId>
<artifactId>helidon-microprofile-messaging</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.messaging.kafka</groupId>
<artifactId>helidon-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.websocket</groupId>
<artifactId>helidon-microprofile-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jandex</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-libs</id>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jboss.jandex</groupId>
<artifactId>jandex-maven-plugin</artifactId>
<executions>
<execution>
<id>make-index</id>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,105 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.messaging.mp.example;
import java.util.concurrent.SubmissionPublisher;
import javax.enterprise.context.ApplicationScoped;
import io.helidon.common.reactive.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
/**
* Bean for message processing.
*/
@ApplicationScoped
public class MsgProcessingBean {
private final SubmissionPublisher<String> emitter = new SubmissionPublisher<>();
private final SubmissionPublisher<String> broadCaster = new SubmissionPublisher<>();
/**
* Create a publisher for the emitter.
*
* @return A Publisher from the emitter
*/
@Outgoing("multiplyVariants")
public Publisher<String> preparePublisher() {
// Create new publisher for emitting to by this::process
return ReactiveStreams
.fromPublisher(FlowAdapters.toPublisher(Multi.create(emitter)))
.buildRs();
}
/**
* Returns a builder for a processor that maps a string into three variants.
*
* @return ProcessorBuilder
*/
@Incoming("multiplyVariants")
@Outgoing("toKafka")
public ProcessorBuilder<String, Message<String>> multiply() {
// Multiply to 3 variants of same message
return ReactiveStreams.<String>builder()
.flatMap(o ->
ReactiveStreams.of(
// upper case variant
o.toUpperCase(),
// repeat twice variant
o.repeat(2),
// reverse chars 'tnairav'
new StringBuilder(o).reverse().toString())
).map(Message::of);
}
/**
* Broadcasts an event.
*
* @param msg Message to broadcast
*/
@Incoming("fromKafka")
public void broadcast(String msg) {
// Broadcast to all subscribers
broadCaster.submit(msg);
}
/**
* Subscribe new Multi to broadcasting publisher.
*
* @return new Multi subscribed to broadcaster
*/
public Multi<String> subscribeMulti() {
return Multi.create(broadCaster);
}
/**
* Emit a message.
*
* @param msg message to emit
*/
public void process(final String msg) {
emitter.submit(msg);
}
}

View File

@@ -0,0 +1,57 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.messaging.mp.example;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
/**
* Expose send method for publishing to messaging.
*/
@Path("rest/messages")
@RequestScoped
public class SendingResource {
private final MsgProcessingBean msgBean;
/**
* Constructor injection of field values.
*
* @param msgBean Messaging example bean
*/
@Inject
public SendingResource(MsgProcessingBean msgBean) {
this.msgBean = msgBean;
}
/**
* Send message through Messaging to Kafka.
*
* @param msg message to process
*/
@Path("/send/{msg}")
@GET
@Produces(MediaType.APPLICATION_JSON)
public void getSend(@PathParam("msg") String msg) {
msgBean.process(msg);
}
}

View File

@@ -0,0 +1,96 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.messaging.mp.example;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import io.helidon.common.reactive.Single;
/**
* Register all WebSocket connection as subscribers
* of broadcasting {@link java.util.concurrent.SubmissionPublisher}
* in the {@link io.helidon.messaging.mp.example.MsgProcessingBean}.
* <p>
* When connection is closed, cancel subscription and remove reference.
*/
@ServerEndpoint("/ws/messages")
public class WebSocketEndpoint {
private static final Logger LOGGER = Logger.getLogger(WebSocketEndpoint.class.getName());
private final Map<String, Single<Void>> subscriberRegister = new HashMap<>();
@Inject
private MsgProcessingBean msgProcessingBean;
/**
* On WebSocket session is opened.
*
* @param session web socket session
* @param endpointConfig endpoint config
*/
@OnOpen
public void onOpen(Session session, EndpointConfig endpointConfig) {
System.out.println("New WebSocket client connected with session " + session.getId());
Single<Void> single = msgProcessingBean.subscribeMulti()
// Watch for errors coming from upstream
.onError(throwable -> LOGGER.log(Level.SEVERE, "Upstream error!", throwable))
// Send every item coming from upstream over web socket
.forEach(s -> sendTextMessage(session, s));
//Save forEach single promise for later cancellation
subscriberRegister.put(session.getId(), single);
}
/**
* When WebSocket session is closed.
*
* @param session web socket session
* @param closeReason web socket close reason
*/
@OnClose
public void onClose(final Session session, final CloseReason closeReason) {
LOGGER.info("Closing session " + session.getId());
// Properly unsubscribe from SubmissionPublisher
Optional.ofNullable(subscriberRegister.remove(session.getId()))
.ifPresent(Single::cancel);
}
private void sendTextMessage(Session session, String msg) {
try {
session.getBasicRemote().sendText(msg);
} catch (IOException e) {
LOGGER.log(Level.SEVERE, "Message sending over WebSocket failed", e);
}
}
}

View File

@@ -0,0 +1,20 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Helidon MP Reactive Messaging with Kafka Example.
*/
package io.helidon.messaging.mp.example;

View File

@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2020 Oracle and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/beans_2_0.xsd"
version="2.0"
bean-discovery-mode="annotated">
</beans>

View File

@@ -0,0 +1,36 @@
#
# Copyright (c) 2020 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
server.port=7001
server.host=0.0.0.0
server.static.classpath.location=/WEB
server.static.classpath.welcome=index.html
# Configure channel fromKafka to ask Kafka connector for publisher
mp.messaging.incoming.fromKafka.connector=helidon-kafka
mp.messaging.incoming.fromKafka.enable.auto.commit=true
mp.messaging.incoming.fromKafka.group.id=websocket-mp-example-1
# Configure channel toKafka to ask Kafka connector for subscriber
mp.messaging.outgoing.toKafka.connector=helidon-kafka
# Connector config properties are common to all channels
mp.messaging.connector.helidon-kafka.bootstrap.servers=localhost:9092
mp.messaging.connector.helidon-kafka.topic=messaging-test-topic-1
mp.messaging.connector.helidon-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.connector.helidon-kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.connector.helidon-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.connector.helidon-kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

View File

@@ -0,0 +1,127 @@
<!--
~ Copyright (c) 2020 Oracle and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width">
<title>Helidon Reactive Messaging</title>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
<link rel="stylesheet" type="text/css" href="main.css">
<!-- Code snipped highlight -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/highlight.min.js"></script>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/tomorrow.min.css">
</head>
<body>
<div id="root">
<div id="producer">
<div id="inputCloud">
<input type="text" id="input" value="Fly Helidon!"/>
</div>
<div id="submit">Send</div>
</div>
<div id="helidon"></div>
<div id="rest-tip">
<div id="rest-tip-arrow"></div>
<div id="rest-tip-label">REST call /rest/messages/send/{msg}</div>
</div>
<div id="sse-tip">
<div id="sse-tip-label">Messages received from Kafka over websocket</div>
<div id="sse-tip-arrow"></div>
</div>
<div id="msgBox"></div>
<div id="snippet">
<pre>
<code class="java"></code>
</pre>
</div>
</div>
<script>
const snippet = $('.java');
const helidon = $('#helidon');
const messageBox = $('#msgBox');
const input = $("#input");
const submit = $("#submit");
// Include snippet, strip imports and highlight code
$.get('MsgProcessingBean.java', function (data) {
snippet.html(data
.replace(/[\s\S]*@ApplicationScoped/gm,"@ApplicationScoped")
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
);
snippet.each(function(i, e) {hljs.highlightBlock(e)});
}, 'text');
// Pre-filled example messages
const templates = ["Fly Helidon!", "We ❤️ microservices!", "Reactive wings are faster!"];
let template_idx = 1;
submit.on("click", onSubmit);
var ws = new WebSocket("ws://127.0.0.1:7001/ws/messages");
ws.onmessage = function (e) {
console.log(e.data);
onMessage(e.data)
}
function onMessage(data) {
fly();
messageBox.prepend(`<div id="msg">${data}</div>`);
cleanQueue();
}
function cleanQueue() {
const messageQueue = $("#msgBox div");
if (messageQueue.length > 6) {
messageQueue
.last()
.fadeOut(100, function () {
$(this).remove();
cleanQueue();
});
}
}
function onSubmit() {
fetch("/rest/messages/send/" + input.val());
template_idx = template_idx < templates.length - 1 ? template_idx + 1 : 0;
input.val(templates[template_idx]);
}
function fly() {
const id = setInterval(handler, 2);
let position = -500;
function handler() {
if (position > (window.innerWidth)) {
clearInterval(id);
} else {
position += 50;
helidon.css("left", `${position}px`);
}
}
}
</script>
</body>
</html>

View File

@@ -0,0 +1,173 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#root {
background-color: #36ABF2;
font-family: Roboto,sans-serif;
color: #fff;
position: absolute;
overflow-x: hidden;
-ms-overflow-style: none; /* Internet Explorer 10+ */
scrollbar-width: none; /* Firefox */
top: 0;
left: 0;
width: 100%;
height: 100%;
}
#root::-webkit-scrollbar {
display: none; /* Safari and Chrome */
}
#helidon {
width: 509px;
height: 273px;
position: relative;
left: -509px;
z-index: 4;
background: url('img/frank.png');
}
#rest-tip {
position: relative;
top: -80px;
left: 160px;
}
#rest-tip-arrow {
width: 205px;
height: 304px;
z-index: 4;
top: -20px;
background: url('img/arrow-1.png');
}
#rest-tip-label {
position: absolute;
white-space: nowrap;
font-size: 18px;
font-weight: bold;
z-index: 4;
left: -60px;
}
#sse-tip {
position: absolute;
overflow: hidden;
display: flex;
width: auto;
height: auto;
top: 5%;
right: 10%;
z-index: 0;
}
#sse-tip-arrow {
position: relative;
top: -30px;
width: 296px;
height: 262px;
z-index: 4;
background: url('img/arrow-2.png');
}
#sse-tip-label {
position: relative;
white-space: nowrap;
font-size: 18px;
font-weight: bold;
z-index: 4;
}
#producer {
float: left;
position: relative;
width: 300px;
height: 100%;
margin: 50px;
padding: 10px;
z-index: 99;
}
#msgBox {
position: absolute;
width: 300px;
top: 25%;
right: 3%;
height: 100%;
margin: 50px;
padding: 10px;
z-index: 20;
}
#input {
width: 210px;
height: 22px;
top: 58px;
left: 30px;
background-color: white;
border-radius: 10px;
border-style: solid;
border-color: white;
position: absolute;
z-index: 10;
}
#inputCloud {
position: relative;
width: 310px;
height: 150px;
background: url('img/cloud.png');
}
#msg {
background-color: #D2EBFC;
color: #1A9BF4;
border-radius: 10px;
width: 300px;
height: 50px;
margin: 5px;
display: flex;
padding-left: 10px;
justify-content: center;
align-items: center;
z-index: 99;
}
#submit {
font-weight: bold;
background-color: aqua;
color: #1A9BF4;
border-radius: 12px;
width: 100px;
height: 30px;
display: flex;
justify-content: center;
align-items: center;
margin: 5px;
cursor: pointer;
}
#snippet {
position: absolute;
top: 15%;
left: 30%;
width: 40%;
z-index: 5;
}
.hljs {
border-radius: 10px;
font-size: 12px;
}

View File

@@ -0,0 +1,32 @@
#
# Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Example Logging Configuration File
# For more information see $JAVA_HOME/jre/lib/logging.properties
# Send messages to the console
handlers=io.helidon.common.HelidonConsoleHandler
# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n
# Global logging level. Can be overridden by specific loggers
.level=INFO
# Kafka client has exhaustive logs
org.apache.kafka.clients.level=WARNING
org.apache.kafka.clients.consumer.ConsumerConfig.level=SEVERE
org.apache.kafka.clients.producer.ProducerConfig.level=SEVERE

View File

@@ -0,0 +1,12 @@
# Helidon Messaging with Kafka Examples
## Prerequisites
* Java 11+
* Docker
* [Kafka bootstrap server](../README.md) running on `localhost:9092`
## Build & Run
1. `mvn clean install`
2. `java -jar se-example.jar`
3. Visit http://localhost:7001

View File

@@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2020 Oracle and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.helidon.applications</groupId>
<artifactId>helidon-se</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../../applications/se/pom.xml</relativePath>
</parent>
<groupId>io.helidon.messaging.se.example</groupId>
<artifactId>se-example</artifactId>
<version>1.0-SNAPSHOT</version>
<name>se-messaging-example</name>
<properties>
<mainClass>io.helidon.messaging.se.example.Main</mainClass>
</properties>
<dependencies>
<dependency>
<groupId>io.helidon.webserver</groupId>
<artifactId>helidon-webserver</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.messaging</groupId>
<artifactId>helidon-messaging</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.messaging.kafka</groupId>
<artifactId>helidon-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>jakarta.websocket</groupId>
<artifactId>jakarta.websocket-api</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.webserver</groupId>
<artifactId>helidon-webserver-tyrus</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.config</groupId>
<artifactId>helidon-config-yaml</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-libs</id>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,125 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.messaging.se.example;
import java.io.IOException;
import java.io.InputStream;
import java.util.logging.LogManager;
import javax.websocket.server.ServerEndpointConfig;
import io.helidon.config.Config;
import io.helidon.webserver.Routing;
import io.helidon.webserver.StaticContentSupport;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.tyrus.TyrusSupport;
/**
* The application main class.
*/
public final class Main {
/**
* Cannot be instantiated.
*/
private Main() {
}
/**
* Application main entry point.
*
* @param args command line arguments.
* @throws IOException if there are problems reading logging properties
*/
public static void main(final String[] args) throws IOException {
startServer();
}
/**
* Start the server.
*
* @return the created {@link WebServer} instance
* @throws IOException if there are problems reading logging properties
*/
static WebServer startServer() throws IOException {
// load logging configuration
setupLogging();
// By default this will pick up application.yaml from the classpath
Config config = Config.create();
SendingService sendingService = new SendingService(config);
WebServer server = WebServer.builder(createRouting(sendingService))
.config(config.get("server"))
.build();
server.start()
.thenAccept(ws -> {
System.out.println(
"WEB server is up! http://localhost:" + ws.port());
ws.whenShutdown().thenRun(()
-> {
// Stop messaging properly
sendingService.shutdown();
System.out.println("WEB server is DOWN. Good bye!");
});
})
.exceptionally(t -> {
System.err.println("Startup failed: " + t.getMessage());
t.printStackTrace(System.err);
return null;
});
// Server threads are not daemon. No need to block. Just react.
return server;
}
/**
* Creates new {@link Routing}.
*
* @param config configuration of this server
* @return routing configured with JSON support, a health check, and a service
*/
private static Routing createRouting(SendingService sendingService) {
return Routing.builder()
// register static content support (on "/")
.register(StaticContentSupport.builder("/WEB").welcomeFileName("index.html"))
// register rest endpoint for sending to Kafka
.register("/rest/messages", sendingService)
// register WebSocket endpoint to push messages coming from Kafka to client
.register("/ws",
TyrusSupport.builder().register(
ServerEndpointConfig.Builder.create(
WebSocketEndpoint.class, "/messages")
.build())
.build())
.build();
}
/**
* Configure logging from logging.properties file.
*/
private static void setupLogging() throws IOException {
try (InputStream is = Main.class.getResourceAsStream("/logging.properties")) {
LogManager.getLogManager().readConfiguration(is);
}
}
}

View File

@@ -0,0 +1,96 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.messaging.se.example;
import io.helidon.config.Config;
import io.helidon.messaging.Channel;
import io.helidon.messaging.Emitter;
import io.helidon.messaging.Messaging;
import io.helidon.messaging.connectors.kafka.KafkaConnector;
import io.helidon.webserver.Routing;
import io.helidon.webserver.Service;
import org.apache.kafka.common.serialization.StringSerializer;
public class SendingService implements Service {
private final Emitter<String> emitter;
private final Messaging messaging;
SendingService(Config config) {
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
// Prepare channel for connecting processor -> kafka connector with specific subscriber configuration,
// channel -> connector mapping is automatic when using KafkaConnector.configBuilder()
Channel<String> toKafka = Channel.<String>builder()
.subscriberConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.topic(topic)
.keySerializer(StringSerializer.class)
.valueSerializer(StringSerializer.class)
.build()
).build();
// Prepare channel for connecting emitter -> processor
Channel<String> toProcessor = Channel.create();
// Prepare Kafka connector, can be used by any channel
KafkaConnector kafkaConnector = KafkaConnector.create();
// Prepare emitter for manual publishing to channel
emitter = Emitter.create(toProcessor);
messaging = Messaging.builder()
.emitter(emitter)
// Processor connect two channels together
.processor(toProcessor, toKafka, payload -> {
// Transforming to upper-case before sending to kafka
return payload.toUpperCase();
})
.connector(kafkaConnector)
.build()
.start();
}
/**
* A service registers itself by updating the routing rules.
*
* @param rules the routing rules.
*/
@Override
public void update(Routing.Rules rules) {
// Listen for GET /example/send/{msg}
// to send it thru messaging to Kafka
rules.get("/send/{msg}", (req, res) -> {
String msg = req.path().param("msg");
System.out.println("Emitting: " + msg);
emitter.send(msg);
res.send();
});
}
/**
* Gracefully terminate messaging.
*/
public void shutdown() {
messaging.stop();
}
}

View File

@@ -0,0 +1,106 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.messaging.se.example;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import io.helidon.config.Config;
import io.helidon.messaging.Channel;
import io.helidon.messaging.Messaging;
import io.helidon.messaging.connectors.kafka.KafkaConfigBuilder;
import io.helidon.messaging.connectors.kafka.KafkaConnector;
import org.apache.kafka.common.serialization.StringDeserializer;
public class WebSocketEndpoint extends Endpoint {
private static final Logger LOGGER = Logger.getLogger(WebSocketEndpoint.class.getName());
private final Map<String, Messaging> messagingRegister = new HashMap<>();
private final Config config = Config.create();
@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
System.out.println("Session " + session.getId());
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
// Prepare channel for connecting kafka connector with specific publisher configuration -> listener,
// channel -> connector mapping is automatic when using KafkaConnector.configBuilder()
Channel<String> fromKafka = Channel.<String>builder()
.name("from-kafka")
.publisherConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.groupId("example-group-" + session.getId())
.topic(topic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
.enableAutoCommit(true)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build()
)
.build();
// Prepare Kafka connector, can be used by any channel
KafkaConnector kafkaConnector = KafkaConnector.create();
Messaging messaging = Messaging.builder()
.connector(kafkaConnector)
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
// Send message received from Kafka over websocket
sendTextMessage(session, payload);
})
.build()
.start();
//Save the messaging instance for proper shutdown
// when websocket connection is terminated
messagingRegister.put(session.getId(), messaging);
}
@Override
public void onClose(final Session session, final CloseReason closeReason) {
super.onClose(session, closeReason);
LOGGER.info("Closing session " + session.getId());
// Properly stop messaging when websocket connection is terminated
Optional.ofNullable(messagingRegister.remove(session.getId()))
.ifPresent(Messaging::stop);
}
private void sendTextMessage(Session session, String msg) {
try {
session.getBasicRemote().sendText(msg);
} catch (IOException e) {
LOGGER.log(Level.SEVERE, "Message sending failed", e);
}
}
}

View File

@@ -0,0 +1,20 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Helidon SE Reactive Messaging with Kafka Example.
*/
package io.helidon.messaging.se.example;

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

View File

@@ -0,0 +1,127 @@
<!--
~ Copyright (c) 2020 Oracle and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width">
<title>Helidon Reactive Messaging</title>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
<link rel="stylesheet" type="text/css" href="main.css">
<!-- Code snipped highlight -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/highlight.min.js"></script>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/tomorrow.min.css">
</head>
<body>
<div id="root">
<div id="producer">
<div id="inputCloud">
<input type="text" id="input" value="Fly Helidon!"/>
</div>
<div id="submit">Send</div>
</div>
<div id="helidon"></div>
<div id="rest-tip">
<div id="rest-tip-arrow"></div>
<div id="rest-tip-label">REST call /rest/messages/send/{msg}</div>
</div>
<div id="sse-tip">
<div id="sse-tip-label">Messages received from Kafka over websocket</div>
<div id="sse-tip-arrow"></div>
</div>
<div id="msgBox"></div>
<div id="snippet">
<pre>
<code class="java"></code>
</pre>
</div>
</div>
<script>
const snippet = $('.java');
const helidon = $('#helidon');
const messageBox = $('#msgBox');
const input = $("#input");
const submit = $("#submit");
// Include snippet, strip imports and highlight code
$.get('MsgProcessingBean.java', function (data) {
snippet.html(data
.replace(/[\s\S]*@ApplicationScoped/gm,"@ApplicationScoped")
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
);
snippet.each(function(i, e) {hljs.highlightBlock(e)});
}, 'text');
// Pre-filled example messages
const templates = ["Fly Helidon!", "We ❤️ microservices!", "Reactive wings are faster!"];
let template_idx = 1;
submit.on("click", onSubmit);
var ws = new WebSocket("ws://127.0.0.1:7001/ws/messages");
ws.onmessage = function (e) {
console.log(e.data);
onMessage(e.data)
}
function onMessage(data) {
fly();
messageBox.prepend(`<div id="msg">${data}</div>`);
cleanQueue();
}
function cleanQueue() {
const messageQueue = $("#msgBox div");
if (messageQueue.length > 6) {
messageQueue
.last()
.fadeOut(100, function () {
$(this).remove();
cleanQueue();
});
}
}
function onSubmit() {
fetch("/rest/messages/send/" + input.val());
template_idx = template_idx < templates.length - 1 ? template_idx + 1 : 0;
input.val(templates[template_idx]);
}
function fly() {
const id = setInterval(handler, 2);
let position = -500;
function handler() {
if (position > (window.innerWidth)) {
clearInterval(id);
} else {
position += 50;
helidon.css("left", `${position}px`);
}
}
}
</script>
</body>
</html>

View File

@@ -0,0 +1,173 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#root {
background-color: #36ABF2;
font-family: Roboto,sans-serif;
color: #fff;
position: absolute;
overflow-x: hidden;
-ms-overflow-style: none; /* Internet Explorer 10+ */
scrollbar-width: none; /* Firefox */
top: 0;
left: 0;
width: 100%;
height: 100%;
}
#root::-webkit-scrollbar {
display: none; /* Safari and Chrome */
}
#helidon {
width: 509px;
height: 273px;
position: relative;
left: -509px;
z-index: 4;
background: url('img/frank.png');
}
#rest-tip {
position: relative;
top: -80px;
left: 160px;
}
#rest-tip-arrow {
width: 205px;
height: 304px;
z-index: 4;
top: -20px;
background: url('img/arrow-1.png');
}
#rest-tip-label {
position: absolute;
white-space: nowrap;
font-size: 18px;
font-weight: bold;
z-index: 4;
left: -60px;
}
#sse-tip {
position: absolute;
overflow: hidden;
display: flex;
width: auto;
height: auto;
top: 5%;
right: 10%;
z-index: 0;
}
#sse-tip-arrow {
position: relative;
top: -30px;
width: 296px;
height: 262px;
z-index: 4;
background: url('img/arrow-2.png');
}
#sse-tip-label {
position: relative;
white-space: nowrap;
font-size: 18px;
font-weight: bold;
z-index: 4;
}
#producer {
float: left;
position: relative;
width: 300px;
height: 100%;
margin: 50px;
padding: 10px;
z-index: 99;
}
#msgBox {
position: absolute;
width: 300px;
top: 25%;
right: 3%;
height: 100%;
margin: 50px;
padding: 10px;
z-index: 20;
}
#input {
width: 210px;
height: 22px;
top: 58px;
left: 30px;
background-color: white;
border-radius: 10px;
border-style: solid;
border-color: white;
position: absolute;
z-index: 10;
}
#inputCloud {
position: relative;
width: 310px;
height: 150px;
background: url('img/cloud.png');
}
#msg {
background-color: #D2EBFC;
color: #1A9BF4;
border-radius: 10px;
width: 300px;
height: 50px;
margin: 5px;
display: flex;
padding-left: 10px;
justify-content: center;
align-items: center;
z-index: 99;
}
#submit {
font-weight: bold;
background-color: aqua;
color: #1A9BF4;
border-radius: 12px;
width: 100px;
height: 30px;
display: flex;
justify-content: center;
align-items: center;
margin: 5px;
cursor: pointer;
}
#snippet {
position: absolute;
top: 15%;
left: 30%;
width: 40%;
z-index: 5;
}
.hljs {
border-radius: 10px;
font-size: 12px;
}

View File

@@ -0,0 +1,28 @@
#
# Copyright (c) 2020 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
app:
kafka:
bootstrap.servers: localhost:9092
topic: messaging-test-topic-1
server:
port: 7001
host: 0.0.0.0
static:
classpath:
location: /WEB
welcome: index.html

View File

@@ -0,0 +1,34 @@
#
# Copyright (c) 2020 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Example Logging Configuration File
# For more information see $JAVA_HOME/jre/lib/logging.properties
# Send messages to the console
handlers=io.helidon.common.HelidonConsoleHandler
# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n
# Global logging level. Can be overridden by specific loggers
.level=INFO
# Component specific log levels
#io.helidon.webserver.level=INFO
#io.helidon.config.level=INFO
#io.helidon.security.level=INFO
#io.helidon.common.level=INFO
#io.netty.level=INFO

View File

@@ -0,0 +1,29 @@
#!/bin/bash
#
# Copyright (c) 2020 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
TOPIC="messaging-test-topic-1"
SERVER="localhost:9092"
CONTAINER="helidon_kafka"
if [ -z "$1" ]; then
echo "No argument supplied, defaulting to topic ${TOPIC}"
else
TOPIC="$1"
fi
docker exec -it ${CONTAINER} sh -c \
"/opt/kafka/bin/kafka-console-consumer.sh --topic ${TOPIC} --bootstrap-server ${SERVER}"

View File

@@ -0,0 +1,29 @@
#!/bin/bash
#
# Copyright (c) 2020 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
TOPIC="messaging-test-topic-1"
SERVER="localhost:9092"
CONTAINER="helidon_kafka"
if [ -z "$1" ]; then
echo "No argument supplied, defaulting to topic ${TOPIC}"
else
TOPIC="$1"
fi
docker exec -it ${CONTAINER} sh -c \
"/opt/kafka/bin/kafka-console-producer.sh --topic ${TOPIC} --bootstrap-server ${SERVER}"

32
examples/messaging/kafkaRun.sh Executable file
View File

@@ -0,0 +1,32 @@
#!/bin/bash
#
# Copyright (c) 2020 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
if [[ "$(docker images -q helidon-test-kafka 2>/dev/null)" == "" ]]; then
# helidon:test-kafka not found, build it
docker build ./docker -t helidon-test-kafka -f ./docker/Dockerfile.kafka
fi
if [ ! "$(docker ps -q -f name=helidon_kafka)" ]; then
if [ "$(docker ps -aq -f status=exited -f name=helidon_kafka)" ]; then
# Clean up exited container
docker rm helidon_kafka
fi
# Run test Kafka in new container, stop it by pressing Ctrl+C
docker run -it --name helidon_kafka --network="host" helidon-test-kafka
fi
# Clean up exited container
docker rm helidon_kafka

View File

@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2020 Oracle and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.helidon.examples</groupId>
<artifactId>helidon-examples-project</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<groupId>io.helidon.examples.messaging</groupId>
<artifactId>helidon-examples-messaging-project</artifactId>
<name>Helidon Examples Reactive Messaging</name>
<packaging>pom</packaging>
<description>
Examples of Reactive Messaging usage
</description>
<modules>
<module>kafka-websocket-mp</module>
<module>kafka-websocket-se</module>
</modules>
</project>

View File

@@ -54,6 +54,7 @@
<module>dbclient</module>
<module>webclient</module>
<module>cors</module>
<module>messaging</module>
</modules>
<build>

View File

@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/beans_2_0.xsd"
version="2.0"
bean-discovery-mode="annotated">
</beans>

View File

@@ -91,14 +91,18 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<artifactId>kafka_2.12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.salesforce.kafka.test</groupId>

View File

@@ -235,7 +235,6 @@ class KafkaMpTest extends AbstractKafkaTest{
private static void cdiContainerUp() {
Set<Class<?>> classes = new HashSet<>();
classes.add(KafkaConnector.class);
classes.add(AbstractSampleBean.Channel1.class);
classes.add(AbstractSampleBean.Channel4.class);
classes.add(AbstractSampleBean.Channel5.class);