mirror of
https://github.com/jlengrand/helidon.git
synced 2026-03-10 08:21:17 +00:00
///////////////////////////////////////////////////////////////////////////////

    Copyright (c) 2020 Oracle and/or its affiliates.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

///////////////////////////////////////////////////////////////////////////////

= Connector
:toc:
:toc-placement: preamble
:description: Reactive Messaging Connector in Helidon SE
:keywords: helidon, se, messaging, connector
:h1Prefix: SE

== Messaging Connector
A connector for Reactive Messaging is a factory that produces Publishers and Subscribers for
channels in Reactive Messaging. A messaging connector is simply an implementation of
`IncomingConnectorFactory`, `OutgoingConnectorFactory`, or both.

[source,java]
.Example connector `example-connector`:
----
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {

    @Override
    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        return ReactiveStreams.of("foo", "bar")
                .map(Message::of);
    }

    @Override
    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        return ReactiveStreams.<Message<?>>builder()
                .map(Message::getPayload)
                .forEach(o -> System.out.println("Connector says: " + o));
    }
}
----

[source,yaml]
.Example of channel to connector mapping config:
----
mp.messaging.outgoing.to-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.connector: example-connector
----

[source,java]
.Example producing to connector:
----
Messaging.builder()
        .connector(new ExampleConnector())
        .publisher(Channel.create("to-connector-channel"),
                ReactiveStreams.of("fee", "fie")
                        .map(Message::of)
        )
        .build()
        .start();

> Connector says: fee
> Connector says: fie
----

[source,java]
.Example consuming from connector:
----
Messaging.builder()
        .connector(new ExampleConnector())
        .subscriber(Channel.create("from-connector-channel"),
                ReactiveStreams.<Message<String>>builder()
                        .peek(Message::ack)
                        .map(Message::getPayload)
                        .forEach(s -> System.out.println("Consuming: " + s))
        )
        .build()
        .start();

> Consuming: foo
> Consuming: bar
----

=== Configuration
A messaging connector in Helidon SE can be configured explicitly through the API or implicitly
through config that follows the notation of link:https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_configuration[MicroProfile Reactive Messaging].

The Messaging implementation supplies the configuration to the connector.
Two mandatory attributes are always present:

* `channel-name`: name of the channel that has this connector configured as its Publisher or Subscriber. It comes from `Channel.create("name-of-channel")` in the case of explicit configuration, or from `mp.messaging.incoming.name-of-channel.connector: connector-name` in the case of implicit config.
* `connector`: name of the connector, as declared in `@Connector("connector-name")`.

[source,java]
.Example connector accessing configuration:
----
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory {

    @Override
    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(final Config config) {

        String firstPropValue = config.getValue("first-test-prop", String.class);<1>
        String secondPropValue = config.getValue("second-test-prop", String.class);

        return ReactiveStreams.of(firstPropValue, secondPropValue)
                .map(Message::of);
    }
}
----
<1> The config context is merged from the channel and connector contexts
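
The merge mentioned in callout 1 can be pictured with plain maps. The following is only a minimal sketch, not Helidon's implementation: the class `MergedConfigSketch` and its `merge` method are hypothetical, `java.util.Map` stands in for the `Config` object, and the precedence shown (channel values overriding connector values) is an assumption made for illustration.

[source,java]
.Sketch of merging channel and connector contexts (hypothetical helper):
----
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model only: plain maps stand in for the Config passed to a connector.
public class MergedConfigSketch {

    public static Map<String, String> merge(String channelName,
                                            String connectorName,
                                            Map<String, String> connectorProps,
                                            Map<String, String> channelProps) {
        // Start from the connector-level properties.
        Map<String, String> merged = new LinkedHashMap<>(connectorProps);
        // Assumption: channel-level values override connector-level values.
        merged.putAll(channelProps);
        // The two mandatory attributes are always present.
        merged.put("channel-name", channelName);
        merged.put("connector", connectorName);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> merged = merge(
                "from-connector-channel",
                "example-connector",
                Map.of("second-test-prop", "bar"),
                Map.of("first-test-prop", "foo"));
        // Both custom properties and both mandatory attributes are visible.
        System.out.println(merged);
    }
}
----

In this model, a connector that reads `first-test-prop` and `second-test-prop` sees both values regardless of which context defined them, alongside the mandatory `channel-name` and `connector` attributes.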

==== Explicit Config

An explicit config for a channel's publisher can be supplied with `Channel.Builder#publisherConfig(Config config)`,
and for its subscriber with `Channel.Builder#subscriberConfig(Config config)`.
The supplied <<se/config/01_introduction.adoc,Helidon Config>> is merged with
the mandatory attributes and any implicit config found. The resulting config is served to the connector.

[source,java]
.Example consuming from Kafka connector with explicit config:
----
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();

Channel<String> fromKafka = Channel.<String>builder()<1><2>
        .name("from-kafka")
        .publisherConfig(KafkaConnector.configBuilder()
                .bootstrapServers(kafkaServer)
                .groupId("example-group-" + session.getId())
                .topic(topic)
                .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
                .enableAutoCommit(true)
                .keyDeserializer(StringDeserializer.class)
                .valueDeserializer(StringDeserializer.class)
                .build()
        )
        .build();

KafkaConnector kafkaConnector = KafkaConnector.create();<3>

Messaging messaging = Messaging.builder()
        .connector(kafkaConnector)
        .listener(fromKafka, payload -> {
            System.out.println("Kafka says: " + payload);
        })
        .build()
        .start();
----
<1> Prepare a channel that connects the Kafka connector, with a specific publisher configuration, to the listener
<2> Channel -> connector mapping is automatic when using `KafkaConnector.configBuilder()`
<3> Prepare the Kafka connector, which can be used by any channel

==== Implicit Config
Implicit config without any hard-coding is possible with <<se/config/01_introduction.adoc,Helidon Config>>, following the notation of link:https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_configuration[MicroProfile Reactive Messaging].

[source,yaml]
.Example of channel to connector mapping config with custom properties:
----
mp.messaging.incoming.from-connector-channel.connector: example-connector<1>
mp.messaging.incoming.from-connector-channel.first-test-prop: foo<2>
mp.messaging.connector.example-connector.second-test-prop: bar<3>
----
<1> Channel -> Connector mapping
<2> Channel configuration properties
<3> Connector configuration properties
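
The key notation can be read as a prefix scheme: keys under `mp.messaging.incoming.<channel>.` belong to a channel, and keys under `mp.messaging.connector.<connector>.` belong to a connector. The sketch below illustrates that reading with plain maps. It is an assumption-laden model rather than Helidon code: the class `ScopedKeysSketch` and its `scoped` method are hypothetical names, and a flat `Map` stands in for the loaded config.

[source,java]
.Sketch of splitting flat keys into channel and connector scopes (hypothetical helper):
----
import java.util.Map;
import java.util.TreeMap;

// Simplified model only: a flat map of property keys stands in for loaded config.
public class ScopedKeysSketch {

    public static final String CHANNEL_PREFIX = "mp.messaging.incoming.";
    public static final String CONNECTOR_PREFIX = "mp.messaging.connector.";

    // Returns the entries whose key starts with the prefix, with the prefix stripped.
    public static Map<String, String> scoped(Map<String, String> flat, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : flat.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> flat = Map.of(
                "mp.messaging.incoming.from-connector-channel.connector", "example-connector",
                "mp.messaging.incoming.from-connector-channel.first-test-prop", "foo",
                "mp.messaging.connector.example-connector.second-test-prop", "bar");

        // Channel scope: contains the connector mapping and the channel properties.
        Map<String, String> channel = scoped(flat, CHANNEL_PREFIX + "from-connector-channel.");
        // Connector scope: looked up through the connector name found in the channel scope.
        Map<String, String> connector = scoped(flat, CONNECTOR_PREFIX + channel.get("connector") + ".");

        System.out.println(channel);   // {connector=example-connector, first-test-prop=foo}
        System.out.println(connector); // {second-test-prop=bar}
    }
}
----

The channel scope yields the connector name, which in turn selects the connector scope; this mirrors callouts 1 to 3 of the YAML example.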

[source,java]
.Example consuming from connector:
----
Messaging.builder()
        .connector(new ExampleConnector())
        .listener(Channel.create("from-connector-channel"),
                s -> System.out.println("Consuming: " + s))
        .build()
        .start();

> Consuming: foo
> Consuming: bar
----

=== Reusability in MP Messaging
Because the API is the same as for <<mp/reactivemessaging/01_introduction.adoc,MicroProfile Reactive Messaging>>
connectors, all that is needed to make a connector work in both Helidon SE and Helidon MP is to annotate it with
`@ApplicationScoped`. Such a connector is treated as a bean in Helidon MP.

For specific information about creating messaging connectors for Helidon MP, see
<<mp/reactivemessaging/03_connector.adoc,Messaging Connector Bean>>.