mirror of
https://github.com/jlengrand/helidon.git
synced 2026-03-10 08:21:17 +00:00
Helidon Reactive documentation (#1483)
* Helidon Reactive documentation Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
This commit is contained in:
@@ -23,4 +23,140 @@
|
||||
:description: {spec-name} support in Helidon MP
|
||||
:keywords: helidon, mp, microprofile, messaging
|
||||
|
||||
== This page is Under Construction and will be available soon
|
||||
== Reactive Messaging
|
||||
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html[MicroProfile Reactive Messaging]
|
||||
uses CDI beans to produce, consume or process messages over Reactive Streams.
|
||||
Such messaging bean is expected to be either in `ApplicationScoped` or `Dependent` scope.
|
||||
Messages are managed by methods annotated by `@Incoming` and `@Outgoing`
|
||||
and the invocation is always driven by message core - either at assembly time, or for every message coming from the stream.
|
||||
|
||||
WARNING: Messaging methods are not meant to be invoked directly!
|
||||
|
||||
[[terms]]
|
||||
.Terms definition
|
||||
|===
|
||||
|https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_supported_method_signatures[messaging method]| bean method invoked by messaging Specification
|
||||
|https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_connector[connector]| Reactive Messaging connector
|
||||
|https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_channel[channel]| named pair of producer and consumer, both sides can be either messaging method or connector
|
||||
|===
|
||||
|
||||
The bean can have methods annotated by
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_message_consumption_with_incoming[`@Incoming`],
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_message_production_with_outgoing[`@Outgoing`] or both.
|
||||
|
||||
=== Consuming methods with `@Incoming` annotation
|
||||
|
||||
The annotation has one required attribute `value` that defines the channel name.
|
||||
|
||||
Such annotated <<terms,messaging method>> can function in two ways:
|
||||
|
||||
* consume every message coming from the stream connected to the <<terms, channel>>
|
||||
* prepare reactive stream's subscriber and connect it to the channel
|
||||
|
||||
[source,java]
|
||||
.Example consuming every message from channel `example-channel-2`:
|
||||
----
|
||||
@Incoming("example-channel-2")
|
||||
public void printMessage(String msg) {
|
||||
System.out.println("Just received message: " + msg);
|
||||
}
|
||||
----
|
||||
|
||||
[source,java]
|
||||
.Example preparing reactive stream subscriber for channel `example-channel-1`:
|
||||
----
|
||||
@Incoming("example-channel-2")
|
||||
public Subscriber<String> printMessage() {
|
||||
return ReactiveStreams.<String>builder()
|
||||
.forEach(msg -> System.out.println("Just received message: " + msg))
|
||||
.build();
|
||||
}
|
||||
----
|
||||
|
||||
=== Producing methods with `@Outgoing` annotation
|
||||
|
||||
The annotation has one required attribute `value` that defines the
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_channel[channel]
|
||||
name.
|
||||
|
||||
Such annotated <<terms,messaging method>> can function in two ways:
|
||||
|
||||
* produce exactly one message to the stream connected to the
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_channel[channel]
|
||||
* prepare reactive stream's publisher and connect it to the
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_channel[channel]
|
||||
|
||||
[source,java]
|
||||
.Example producing exactly one message to channel `example-channel-1`:
|
||||
----
|
||||
@Outgoing("example-channel-1")
|
||||
public String produceMessage() {
|
||||
return "foo";
|
||||
}
|
||||
----
|
||||
|
||||
[source,java]
|
||||
.Example preparing reactive stream publisher publishing three messages to the channel `example-channel-1`:
|
||||
----
|
||||
@Outgoing("example-channel-1")
|
||||
public Publisher<String> printMessage() {
|
||||
return ReactiveStreams.of("foo", "bar", "baz").buildRs();
|
||||
}
|
||||
----
|
||||
|
||||
=== Processing methods with `@Incoming` and `@Outgoing` annotation
|
||||
|
||||
Such
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_method_consuming_and_producing[methods]
|
||||
acts as processors, consuming messages from one channel and producing to another.
|
||||
|
||||
Such annotated <<terms,messaging method>> can function in multiple ways:
|
||||
|
||||
* process every message
|
||||
* prepare reactive stream's processor and connect it between the channels
|
||||
* on every message prepare new publisher(equivalent to `flatMap` operator)
|
||||
|
||||
[source,java]
|
||||
.Example processing every message from channel `example-channel-1` to channel `example-channel-2`:
|
||||
----
|
||||
@Incoming("example-channel-1")
|
||||
@Outgoing("example-channel-2")
|
||||
public String processMessage(String msg) {
|
||||
return msg.toUpperCase();
|
||||
}
|
||||
----
|
||||
|
||||
[source,java]
|
||||
.Example preparing processor stream to be connected between channels `example-channel-1` and `example-channel-2`:
|
||||
----
|
||||
@Incoming("example-channel-1")
|
||||
@Outgoing("example-channel-2")
|
||||
public Processor<String, String> processMessage() {
|
||||
return ReactiveStreams.<String>builder()
|
||||
.map(String::toUpperCase)
|
||||
.buildRs();
|
||||
}
|
||||
----
|
||||
|
||||
[source,java]
|
||||
.Example processing every message from channel `example-channel-1`as stream to be flattened to channel `example-channel-2`:
|
||||
----
|
||||
@Incoming("example-channel-1")
|
||||
@Outgoing("example-channel-2")
|
||||
public String processMessage(String msg) {
|
||||
return ReactiveStreams.of(msg.toUpperCase(), msg.toLowerCase()).buildRs();
|
||||
}
|
||||
----
|
||||
|
||||
=== Dependency
|
||||
|
||||
Declare the following dependency in your project:
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>io.helidon.microprofile.messaging</groupId>
|
||||
<artifactId>helidon-microprofile-messaging</artifactId>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
146
docs/mp/reactivemessaging/02_message.adoc
Normal file
146
docs/mp/reactivemessaging/02_message.adoc
Normal file
@@ -0,0 +1,146 @@
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
= Message
|
||||
|
||||
== Message
|
||||
The Reactive Messaging
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_message[Message]
|
||||
class can be used to wrap or unwrap data items between methods and connectors.
|
||||
The message wrapping and unwrapping can be performed explicitly by using
|
||||
`org.eclipse.microprofile.reactive.messaging.Message#of(T)` or implicitly through the messaging core.
|
||||
|
||||
[source,java]
|
||||
.Example of explicit and implicit wrapping and unwrapping
|
||||
----
|
||||
@Outgoing("publisher-payload")
|
||||
public PublisherBuilder<Integer> streamOfMessages() {
|
||||
return ReactiveStreams.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
|
||||
}
|
||||
|
||||
@Incoming("publisher-payload")
|
||||
@Outgoing("wrapped-message")
|
||||
public Message<String> rewrapMessageManually(Message<Integer> message) {
|
||||
return Message.of(Integer.toString(message.getPayload()));
|
||||
}
|
||||
|
||||
@Incoming("wrapped-message")
|
||||
public void consumeImplicitlyUnwrappedMessage(String value) {
|
||||
System.out.println("Consuming message: " + value);
|
||||
}
|
||||
----
|
||||
|
||||
=== Acknowledgement
|
||||
Message carries a callback for reception acknowledgement, acknowledgement in messaging methods is possible manually by
|
||||
`org.eclipse.microprofile.reactive.messaging.Message#ack` or automatically according explicit
|
||||
or implicit acknowledgement strategy by messaging core. Explicit strategy configuration is possible
|
||||
with `@Acknowledgment` annotation which has one required attribute `value` that expects the strategy type from enum
|
||||
`org.eclipse.microprofile.reactive.messaging.Acknowledgment.Strategy`. More information about supported signatures
|
||||
and implicit automatic acknowledgement can be found in specification
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_message_acknowledgement[Message acknowledgement].
|
||||
|
||||
[[terms]]
|
||||
.Acknowledgement strategies
|
||||
|===
|
||||
|`@Acknowledgment(Acknowledgment.Strategy.NONE)`| No acknowledgment
|
||||
|`@Acknowledgment(Acknowledgment.Strategy.MANUAL)`| No automatic acknowledgment
|
||||
|`@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)`| Ack automatically before method invocation or processing
|
||||
|`@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)`| Ack automatically after method invocation or processing
|
||||
|===
|
||||
|
||||
[source,java]
|
||||
.Example of manual acknowledgment
|
||||
----
|
||||
@Outgoing("consume-and-ack")
|
||||
public PublisherBuilder<Integer> streamOfMessages() {
|
||||
return ReactiveStreams.of(Message.of("This is Payload", () -> {
|
||||
System.out.println("This articular message was acked!");
|
||||
return CompletableFuture.completedFuture(null);
|
||||
})).buildRs();
|
||||
}
|
||||
|
||||
@Incoming("consume-and-ack")
|
||||
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
|
||||
public void receiveAndAckMessage(Message<String> msg) {
|
||||
// Calling ack() will print "This articular message was acked!" to System.out
|
||||
msg.ack();
|
||||
}
|
||||
----
|
||||
|
||||
[source,java]
|
||||
.Example of manual acknowledgment
|
||||
----
|
||||
@Outgoing("consume-and-ack")
|
||||
public PublisherBuilder<Integer> streamOfMessages() {
|
||||
return ReactiveStreams.of(Message.of("This is Payload", () -> {
|
||||
System.out.println("This articular message was acked!");
|
||||
return CompletableFuture.completedFuture(null);
|
||||
})).buildRs();
|
||||
}
|
||||
|
||||
@Incoming("consume-and-ack")
|
||||
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
|
||||
public void receiveAndAckMessage(Message<String> msg) {
|
||||
// Calling ack() will print "This articular message was acked!" to System.out
|
||||
msg.ack();
|
||||
}
|
||||
----
|
||||
[source,java]
|
||||
.Example of explicit pre-process acknowledgment
|
||||
----
|
||||
@Outgoing("consume-and-ack")
|
||||
public PublisherBuilder<Integer> streamOfMessages() {
|
||||
return ReactiveStreams.of(Message.of("This is Payload", () -> {
|
||||
System.out.println("This articular message was acked!");
|
||||
return CompletableFuture.completedFuture(null);
|
||||
})).buildRs();
|
||||
}
|
||||
|
||||
/**
|
||||
* Prints to the console:
|
||||
* > This articular message was acked!
|
||||
* > Method invocation!
|
||||
*/
|
||||
@Incoming("consume-and-ack")
|
||||
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
|
||||
public void receiveAndAckMessage(Message<String> msg) {
|
||||
System.out.println("Method invocation!");
|
||||
}
|
||||
----
|
||||
[source,java]
|
||||
.Example of explicit post-rocess acknowledgment
|
||||
----
|
||||
@Outgoing("consume-and-ack")
|
||||
public PublisherBuilder<Integer> streamOfMessages() {
|
||||
return ReactiveStreams.of(Message.of("This is Payload", () -> {
|
||||
System.out.println("This articular message was acked!");
|
||||
return CompletableFuture.completedFuture(null);
|
||||
})).buildRs();
|
||||
}
|
||||
|
||||
/**
|
||||
* Prints to the console:
|
||||
* > Method invocation!
|
||||
* > This articular message was acked!
|
||||
*/
|
||||
@Incoming("consume-and-ack")
|
||||
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
|
||||
public void receiveAndAckMessage(Message<String> msg) {
|
||||
System.out.println("Method invocation!");
|
||||
}
|
||||
----
|
||||
123
docs/mp/reactivemessaging/03_connector.adoc
Normal file
123
docs/mp/reactivemessaging/03_connector.adoc
Normal file
@@ -0,0 +1,123 @@
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
= Connector
|
||||
|
||||
== Connector Bean
|
||||
|
||||
Messaging connector is just an application scoped bean which implements
|
||||
`IncomingConnectorFactory`, `OutgoingConnectorFactory` or both.
|
||||
|
||||
[source,java]
|
||||
.Example connector `example-connector`:
|
||||
----
|
||||
@ApplicationScoped
|
||||
@Connector("example-connector")
|
||||
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
|
||||
|
||||
@Override
|
||||
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
|
||||
return ReactiveStreams.of("foo", "bar")
|
||||
.map(Message::of);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
|
||||
return ReactiveStreams.<Message<?>>builder()
|
||||
.map(Message::getPayload)
|
||||
.forEach(o -> System.out.println("Connector says: " + o));
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
[source,yaml]
|
||||
.Example of channel to connector mapping config:
|
||||
----
|
||||
mp.messaging.outgoing.to-connector-channel.connector: example-connector
|
||||
mp.messaging.incoming.from-connector-channel.connector: example-connector
|
||||
----
|
||||
|
||||
[source,java]
|
||||
.Example producing to connector:
|
||||
----
|
||||
@Outgoing("to-connector-channel")
|
||||
public Publisher<String> produce() {
|
||||
return Flowable.just("fee", "fie");
|
||||
}
|
||||
|
||||
> Connector says: fee
|
||||
> Connector says: fie
|
||||
----
|
||||
|
||||
|
||||
[source,java]
|
||||
.Example consuming from connector:
|
||||
----
|
||||
@Incoming("from-connector-channel")
|
||||
public void consume(String value) {
|
||||
System.out.println("Consuming: " + value);
|
||||
}
|
||||
|
||||
> Consuming: foo
|
||||
> Consuming: bar
|
||||
----
|
||||
|
||||
=== Configuration
|
||||
|
||||
[source,java]
|
||||
.Example connector accessing configuration:
|
||||
----
|
||||
@ApplicationScoped
|
||||
@Connector("example-connector")
|
||||
public class ExampleConnector implements IncomingConnectorFactory {
|
||||
|
||||
@Override
|
||||
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(final Config config) {
|
||||
|
||||
// Config context is merged from channel and connector contexts
|
||||
String firstPropValue = config.getValue("first-test-prop", String.class);
|
||||
String secondPropValue = config.getValue("second-test-prop", String.class);
|
||||
|
||||
return ReactiveStreams.of(firstPropValue, secondPropValue)
|
||||
.map(Message::of);
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
[source,yaml]
|
||||
.Example of channel to connector mapping config with custom properties:
|
||||
----
|
||||
# Channel -> Connector mapping
|
||||
mp.messaging.incoming.from-connector-channel.connector: example-connector
|
||||
# Channel configuration properties
|
||||
mp.messaging.incoming.from-connector-channel.first-test-prop: foo
|
||||
# Connector configuration properties
|
||||
mp.messaging.connector.example-connector.second-test-prop: bar
|
||||
----
|
||||
|
||||
[source,java]
|
||||
.Example consuming from connector:
|
||||
----
|
||||
@Incoming("from-connector-channel")
|
||||
public void consume(String value) {
|
||||
System.out.println("Consuming: " + value);
|
||||
}
|
||||
|
||||
> Consuming: foo
|
||||
> Consuming: bar
|
||||
----
|
||||
48
docs/mp/reactivestreams/01_overview.adoc
Normal file
48
docs/mp/reactivestreams/01_overview.adoc
Normal file
@@ -0,0 +1,48 @@
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
= Overview
|
||||
:toc:
|
||||
:toc-placement: preamble
|
||||
:h1Prefix: Se & Mp
|
||||
:helidon: Helidon
|
||||
:description: Reactive Streams support in {helidon}
|
||||
:keywords: helidon, mp, microprofile, reactivestreams
|
||||
|
||||
== Reactive Streams
|
||||
|
||||
[PILLARS]
|
||||
====
|
||||
[CARD]
|
||||
.Helidon Reactive Engine
|
||||
[icon=fa-cogs,link=mp/reactivestreams/02_engine.adoc]
|
||||
--
|
||||
A set of reactive operators.
|
||||
--
|
||||
|
||||
[CARD]
|
||||
.MicroProfile Reactive Streams Operators
|
||||
[icon=fa-book,link=mp/reactivestreams/03_rsoperators.adoc]
|
||||
--
|
||||
Microprofile implementation.
|
||||
--
|
||||
====
|
||||
|
||||
There are two handy apis for working with reactive streams available in {helidon},
|
||||
one for working with `java.util.concurrent.Flow`
|
||||
and another for `org.reactivestreams` based reactive components.
|
||||
25
docs/mp/reactivestreams/02_engine.adoc
Normal file
25
docs/mp/reactivestreams/02_engine.adoc
Normal file
@@ -0,0 +1,25 @@
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
= Helidon Reactive Engine
|
||||
:h1Prefix: Se & Mp
|
||||
:description: Dependecy-less reactive operators
|
||||
:keywords: helidon, reactive, streams, multi, single
|
||||
|
||||
== Helidon Reactive Engine
|
||||
include::../../shared/reactivestreams/02_engine.adoc[lines=21..]
|
||||
@@ -19,8 +19,10 @@
|
||||
= Reactive Streams Operators
|
||||
:toc:
|
||||
:toc-placement: preamble
|
||||
:h1Prefix: Se & Mp
|
||||
:spec-name: MicroProfile Reactive Streams Operators
|
||||
:description: {spec-name} support in Helidon MP
|
||||
:keywords: helidon, mp, microprofile, reactivestreams
|
||||
|
||||
== This page is Under Construction and will be available soon
|
||||
== Reactive Streams Operators
|
||||
include::../../shared/reactivestreams/03_rsoperators.adoc[lines=21..]
|
||||
48
docs/se/reactivestreams/01_overview.adoc
Normal file
48
docs/se/reactivestreams/01_overview.adoc
Normal file
@@ -0,0 +1,48 @@
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
= Overview
|
||||
:toc:
|
||||
:toc-placement: preamble
|
||||
:h1Prefix: Se & Mp
|
||||
:helidon: Helidon
|
||||
:description: Reactive Streams support in {helidon}
|
||||
:keywords: helidon, se, microprofile, reactivestreams
|
||||
|
||||
== Reactive Streams
|
||||
|
||||
[PILLARS]
|
||||
====
|
||||
[CARD]
|
||||
.Helidon Reactive Engine
|
||||
[icon=fa-cogs,link=se/reactivestreams/02_engine.adoc]
|
||||
--
|
||||
A set of reactive operators.
|
||||
--
|
||||
|
||||
[CARD]
|
||||
.MicroProfile Reactive Streams Operators
|
||||
[icon=fa-book,link=se/reactivestreams/03_rsoperators.adoc]
|
||||
--
|
||||
Microprofile implementation.
|
||||
--
|
||||
====
|
||||
|
||||
There are two handy apis for working with reactive streams available in {helidon},
|
||||
one for working with `java.util.concurrent.Flow`
|
||||
and second for `org.reactivestreams` based reactive components.
|
||||
25
docs/se/reactivestreams/02_engine.adoc
Normal file
25
docs/se/reactivestreams/02_engine.adoc
Normal file
@@ -0,0 +1,25 @@
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
= Helidon Reactive Engine
|
||||
:h1Prefix: Se & Mp
|
||||
:description: Dependecy-less reactive operators
|
||||
:keywords: helidon, reactive, streams, multi, single
|
||||
|
||||
== Helidon Reactive Engine
|
||||
include::../../shared/reactivestreams/02_engine.adoc[lines=21..]
|
||||
@@ -19,7 +19,10 @@
|
||||
= Reactive Streams Operators
|
||||
:toc:
|
||||
:toc-placement: preamble
|
||||
:description: Reactive Streams Operators support in Helidon SE
|
||||
:h1Prefix: Se & Mp
|
||||
:spec-name: MicroProfile Reactive Streams Operators
|
||||
:description: {spec-name} support in Helidon SE
|
||||
:keywords: helidon, se, microprofile, reactivestreams
|
||||
|
||||
== This page is Under Construction and will be available soon
|
||||
== Reactive Streams Operators
|
||||
include::../../shared/reactivestreams/03_rsoperators.adoc[lines=21..]
|
||||
152
docs/shared/reactivestreams/02_engine.adoc
Normal file
152
docs/shared/reactivestreams/02_engine.adoc
Normal file
@@ -0,0 +1,152 @@
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
= Reactive Engine
|
||||
== Reactive Engine
|
||||
|
||||
Helidon has its own set of reactive operators that have no dependencies outside of the Helidon ecosystem.
|
||||
These operators can be used with `java.util.concurrent.Flow` based reactive streams.
|
||||
Stream processing operator chain can be easily constructed by `io.helidon.common.reactive.Multi`, or
|
||||
`io.helidon.common.reactive.Single` for streams with single value.
|
||||
Implementation was contributed to Helidon by the world-renown reactive programming expert,
|
||||
project lead of RxJava and co-father of project Reactor,
|
||||
https://twitter.com/akarnokd[Dr. David Karnok].
|
||||
|
||||
[source,java]
|
||||
.Example of Multi usage:
|
||||
----
|
||||
AtomicInteger sum = new AtomicInteger();
|
||||
|
||||
Multi.just("1", "2", "3", "4", "5")
|
||||
.limit(3)
|
||||
.map(Integer::parseInt)
|
||||
.forEach(sum::addAndGet);
|
||||
|
||||
System.out.println("Sum: " + sum.get());
|
||||
|
||||
> Sum: 6
|
||||
----
|
||||
|
||||
[source,java]
|
||||
.Example of Single usage:
|
||||
----
|
||||
Single.just("1")
|
||||
.map(Integer::parseInt)
|
||||
.map(i -> i + 5)
|
||||
.toStage()
|
||||
.whenComplete((i, t) -> System.out.println("Result: " + i));
|
||||
|
||||
> Result: 6
|
||||
----
|
||||
|
||||
[[terms]]
|
||||
.Operators
|
||||
|===
|
||||
|defer|Call the given supplier function for each individual downstream Subscriber to return a Flow.Publisher to subscribe to.
|
||||
|map|Map this `Multi` instance to a new `Multi` of another type using the given `Mapper`.
|
||||
|defaultIfEmpty|Signals the default item if the upstream is empty.
|
||||
|switchIfEmpty|Switch to the other publisher if the upstream is empty.
|
||||
|peek|Invoke provided consumer for every item in stream.
|
||||
|distinct|Filter out all duplicates.
|
||||
|filter|Filter stream items with provided predicate.
|
||||
|takeWhile|Take the longest prefix of elements from this stream that satisfy the given predicate. As long as predicate returns true, items from upstream are sent to downstream, when predicate returns false stream is completed.
|
||||
|dropWhile|Drop the longest prefix of elements from this stream that satisfy the given predicate. As long as predicate returns true, items from upstream are NOT sent to downstream but being dropped, predicate is never called again after it returns false for the first time.
|
||||
|limit|Limit stream to allow only specified number of items to pass.
|
||||
|skip|Skip first n items, all the others are emitted.
|
||||
|flatMap|Transform each upstream item with the supplied function into a `Flow.Publisher`, subscribe to them and then flatten their items into a single sequence of items emitted to the downstream.
|
||||
|flatMap|Transform each upstream item with the supplied function and flatten the resulting `Flow.Publisher` to downstream while limiting the maximum number of concurrent inner `Flow.Publisher`s and their in-flight item count, optionally aggregating and delaying all errors until all sources terminate.
|
||||
|flatMapIterable|Transform each upstream item with the supplied function and flatten the resulting `Iterable` to the downstream.
|
||||
|observeOn|Re-emit the upstream's signals to the downstream on the given executor's thread using a default buffer size of 32 and errors skipping ahead of items.
|
||||
|observeOn|Re-emit the upstream's signals to the downstream on the given executor's thread.
|
||||
|forEach|Terminal stage, invokes provided consumer for every item in the stream.
|
||||
|collectList|Collect the items of this `Multi` instance into a `Single` of `List`.
|
||||
|collect|Collect the items of this `Multi` instance into a `Single`.
|
||||
|collect|Collect the items of this `Multi` into a collection provided via a `Supplier` and mutated by a `BiConsumer` callback.
|
||||
|collectStream|Collects up upstream items with the help of the callbacks of a `java.util.stream.Collector`.
|
||||
|reduce|Combine subsequent items via a callback function and emit the final value result as a Single.
|
||||
|reduce|Combine every upstream item with an accumulator value to produce a new accumulator value and emit the final accumulator value as a Single.
|
||||
|first|Get the first item of this `Multi` instance as a `Single`.
|
||||
|from|Wrap a CompletionStage into a Multi and signal its outcome non-blockingly.
|
||||
|from|Wrap a CompletionStage into a Multi and signal its outcome non-blockingly.
|
||||
|from|Create a `Multi` instance wrapped around the given publisher.
|
||||
|from|Create a `Multi` instance that publishes the given iterable.
|
||||
|from|Create a `Multi` instance that publishes the given `Stream`.
|
||||
|just|Create a `Multi` instance that publishes the given items to a single subscriber.
|
||||
|just|Create a `Multi` instance that publishes the given items to a single subscriber.
|
||||
|singleton|Create a `Multi` that emits a pre-existing item and then completes.
|
||||
|error|Create a `Multi` instance that reports the given exception to its subscriber(s). The exception is reported by invoking `Subscriber#onError(java.lang.Throwable)` when `Publisher#subscribe(Subscriber)` is called.
|
||||
|empty|Get a `Multi` instance that completes immediately.
|
||||
|never|Get a `Multi` instance that never completes.
|
||||
|concat|Concat streams to one.
|
||||
|onTerminate|Executes given `java.lang.Runnable` when any of signals onComplete, onCancel or onError is received.
|
||||
|onComplete|Executes given `java.lang.Runnable` when onComplete signal is received.
|
||||
|onError|Executes the given java.util.function.Consumer when an onError signal is received.
|
||||
|onCancel|Executes given `java.lang.Runnable` when a cancel signal is received.
|
||||
|takeUntil|Relay upstream items until the other source signals an item or completes.
|
||||
|range|Emits a range of ever increasing integers.
|
||||
|rangeLong|Emits a range of ever increasing longs.
|
||||
|timer|Signal 0L and complete the sequence after the given time elapsed.
|
||||
|interval|Signal 0L, 1L and so on periodically to the downstream.
|
||||
|interval|Signal 0L after an initial delay, then 1L, 2L and so on periodically to the downstream.
|
||||
|timeout|Signals a `TimeoutException` if the upstream doesn't signal the next item, error or completion within the specified time.
|
||||
|timeout|Switches to a fallback source if the upstream doesn't signal the next item, error or completion within the specified time.
|
||||
|onErrorResume|`java.util.function.Function` providing one item to be submitted as onNext in case of onError signal is received.
|
||||
|onErrorResumeWith|Resume stream from supplied publisher if onError signal is intercepted.
|
||||
|retry|Retry a failing upstream at most the given number of times before giving up.
|
||||
|retry|Retry a failing upstream if the predicate returns true.
|
||||
|retryWhen|Retry a failing upstream when the given function returns a publisher that signals an item.
|
||||
|
||||
|===
|
||||
|
||||
=== Operator chains composition
|
||||
|
||||
In the situations when part of the operator chain needs to be prepared in advance,
|
||||
`compose` and `to` operators are at hand.
|
||||
|
||||
[source,java]
|
||||
.Combining operator chains:
|
||||
----
|
||||
// Assembly of stream, nothing is streamed yet
|
||||
Multi<String> publisherStage =
|
||||
Multi.just("foo", "bar")
|
||||
.map(String::trim);
|
||||
|
||||
Function<Multi<T>, Multi<T>> processorStage =
|
||||
upstream ->
|
||||
upstream.map(String::toUpperCase);
|
||||
|
||||
// Execution of pre-prepared stream
|
||||
publisherStage
|
||||
.compose(processorStage)
|
||||
.map(s -> "Item received: " + s)
|
||||
.forEach(System.out::println);
|
||||
|
||||
> Item received: FOO
|
||||
> Item received: BAR
|
||||
----
|
||||
|
||||
=== Dependency
|
||||
|
||||
Declare the following dependency in your project:
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>io.helidon.common</groupId>
|
||||
<artifactId>helidon-common-reactive</artifactId>
|
||||
</dependency>
|
||||
----
|
||||
125
docs/shared/reactivestreams/03_rsoperators.adoc
Normal file
125
docs/shared/reactivestreams/03_rsoperators.adoc
Normal file
@@ -0,0 +1,125 @@
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
= Reactive Streams Operators
|
||||
== Reactive Streams Operators
|
||||
|
||||
Implementation of
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-streams-operators-1.0.1/microprofile-reactive-streams-operators-spec.html[MicroProfile Reactive Streams Operators]
|
||||
specification. A standardised tool for manipulation with https://www.reactive-streams.org/[Reactive Streams],
|
||||
provides set of operators as so called stages,
|
||||
and the builders to prepare graphs of stages for streams to be build from.
|
||||
|
||||
[source,java]
|
||||
.Example of simple closed graph usage:
|
||||
----
|
||||
AtomicInteger sum = new AtomicInteger();
|
||||
|
||||
ReactiveStreams.of("1", "2", "3", "4", "5")
|
||||
.limit(3)
|
||||
.map(Integer::parseInt)
|
||||
.forEach(sum::addAndGet)
|
||||
.run()
|
||||
.whenComplete((r, t) -> System.out.println("Sum: " + sum.get()));
|
||||
|
||||
> Sum: 6
|
||||
----
|
||||
|
||||
[[terms]]
|
||||
.Operators(Stages)
|
||||
|===
|
||||
|fromIterable | Create new PublisherBuilder from supplied Iterable
|
||||
|of | Create new PublisherBuilder emitting supplied elements
|
||||
|ofNullable | Empty stream if supplied item is null
|
||||
|iterate | Create infinite stream with every next item created by supplied operator from previous item
|
||||
|generate| Create infinite stream with every item created by invocation of supplier
|
||||
|empty| Create new PublisherBuilder emitting as a first thing complete signal
|
||||
|failed| Create new PublisherBuilder emitting as a first thing error signal
|
||||
|concat| Concat two streams
|
||||
|coupled| Two parallel streams sharing cancel, onError and onComplete signals
|
||||
|limit| Limit the size of the stream, when limit is reached completes
|
||||
|peek| Invoke consumer for every item passing this operator
|
||||
|filter| Drop item when expression result to false
|
||||
|map| Transform items
|
||||
|flatMap| Flatten supplied stream to current stream
|
||||
|flatMapIterable| Flatten supplied iterable to current stream
|
||||
|flatMapCompletionStage| Map elements to completion stage and wait for each to be completed, keeps the order
|
||||
|flatMapRSPublisher| Map elements to Publishers and flatten this sub streams to original stream
|
||||
|takeWhile| Let items pass until expression is true, first time its false completes
|
||||
|dropWhile| Drop items until expression is true, first time its false let everything pass
|
||||
|skip| Drop first n items
|
||||
|distinct| Let pass only distinct items
|
||||
|via| Connect supplied processor to current stream return supplied processor
|
||||
|onError| Invoke supplied consumer when onError signal received
|
||||
|onErrorResume| Emit one last supplied item when onError signal received
|
||||
|onErrorResumeWith| When onError signal received continue emitting from supplied publisher builder
|
||||
|onErrorResumeWithRsPublisher| When onError signal received continue emitting from supplied publisher
|
||||
|onComplete| Invoke supplied runnable when onComplete signal received
|
||||
|onTerminate| Invoke supplied runnable when onComplete or onError signal received
|
||||
|to| Connect this stream to supplied subscriber
|
||||
|toList| Collect all intercepted items to List
|
||||
|collect| Collect all intercepted items with provided collector
|
||||
|forEach| Invoke supplied Consumer for each intercepted item
|
||||
|ignore| Ignore all onNext signals, wait for onComplete
|
||||
|reduce| Reduction with provided expression
|
||||
|cancel| Cancel stream immediately
|
||||
|findFirst| Return first intercepted element
|
||||
|===
|
||||
|
||||
=== Graphs
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-streams-operators-1.0.1/microprofile-reactive-streams-operators-spec.html#_graphs[Graphs]
|
||||
are pre-prepared stream builders with
|
||||
https://download.eclipse.org/microprofile/microprofile-reactive-streams-operators-1.0.1/microprofile-reactive-streams-operators-spec.html#_stages[stages],
|
||||
which can be combined together to closed graph with methods `via` and `to`.
|
||||
|
||||
[source,java]
|
||||
.Combining the graphs and running the stream:
|
||||
----
|
||||
// Assembly of stream, nothing is streamed yet
|
||||
PublisherBuilder<String> publisherStage =
|
||||
ReactiveStreams.of("foo", "bar")
|
||||
.map(String::trim);
|
||||
|
||||
ProcessorBuilder<String, String> processorStage =
|
||||
ReactiveStreams.<String>builder()
|
||||
.map(String::toUpperCase);
|
||||
|
||||
SubscriberBuilder<String, Void> subscriberStage =
|
||||
ReactiveStreams.<String>builder()
|
||||
.map(s -> "Item received: " + s)
|
||||
.forEach(System.out::println);
|
||||
|
||||
// Execution of pre-prepared stream
|
||||
publisherStage
|
||||
.via(processorStage)
|
||||
.to(subscriberStage).run();
|
||||
|
||||
> Item received: FOO
|
||||
> Item received: BAR
|
||||
----
|
||||
|
||||
=== Dependency
|
||||
|
||||
Declare the following dependency in your project:
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>io.helidon.microprofile.reactive-streams</groupId>
|
||||
<artifactId>helidon-microprofile-reactive-streams</artifactId>
|
||||
</dependency>
|
||||
----
|
||||
Reference in New Issue
Block a user