Merge pull request #7177 from cescoffier/features/resteasy-mutiny

Implement Resteasy Mutiny support
This commit is contained in:
Guillaume Smet
2020-02-18 17:52:21 +01:00
committed by GitHub
36 changed files with 1687 additions and 141 deletions

View File

@@ -568,7 +568,18 @@
<artifactId>quarkus-scala-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-mutiny-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-qute-deployment</artifactId>

View File

@@ -982,6 +982,11 @@
<artifactId>quarkus-mutiny</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-mutiny</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>

View File

@@ -368,9 +368,10 @@ stages:
parameters:
poolSettings: ${{parameters.poolSettings}}
expectUseVMs: ${{parameters.expectUseVMs}}
timeoutInMinutes: 35
timeoutInMinutes: 55
modules:
- resteasy-jackson
- resteasy-mutiny
- vertx
- vertx-http
- vertx-graphql

View File

@@ -56,6 +56,7 @@ public final class FeatureBuildItem extends MultiBuildItem {
public static final String RESTEASY_JACKSON = "resteasy-jackson";
public static final String RESTEASY_JAXB = "resteasy-jaxb";
public static final String RESTEASY_JSONB = "resteasy-jsonb";
public static final String RESTEASY_MUTINY = "resteasy-mutiny";
public static final String RESTEASY_QUTE = "resteasy-qute";
public static final String REST_CLIENT = "rest-client";
public static final String SCALA = "scala";

View File

@@ -445,16 +445,14 @@ NOTE: Before running the application, don't forget to stop the hot reload mode (
== Async
The resource can also use `CompletionStage` as return type to handle asynchronous actions:
The resource can also use `CompletionStage` or `Uni` (requires the `quarkus-resteasy-mutiny` extension) as return type to handle asynchronous actions:
[source,java]
----
@GET
@Produces(MediaType.TEXT_PLAIN)
public CompletionStage<String> hello() {
return CompletableFuture.supplyAsync(() -> {
return "hello";
});
public Uni<String> hello() {
return Uni.createFrom().item(() -> "hello");
}
----

View File

@@ -199,29 +199,32 @@ Sending and publishing messages use the Vert.x event bus:
[source, java]
----
package org.acme;
package org.acme.vertx;
import io.vertx.axle.core.eventbus.EventBus;
import io.vertx.axle.core.eventbus.Message;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import java.util.concurrent.CompletionStage;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; // <1>
@Inject
EventBus bus; // <1>
@GET
@Path("/{name}")
public CompletionStage<String> hello(String name) {
return bus.<String>request("greeting", name) // <2>
.thenApply(Message::body);
}
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(@PathParam String name) {
return bus.<String>request("greeting", name) // <2>
.onItem().apply(Message::body);
}
}
----
<1> Inject the Event bus
@@ -236,13 +239,12 @@ The `EventBus` object provides methods to:
[source, java]
----
// Case 1
bus.request("address", "hello");
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("address", "hello");
bus.publish("greeting", name)
// Case 3
bus.request("address", "hello, how are you?").thenAccept(message -> {
// response
});
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().apply(Message::body);
----
== Putting things together - bridging HTTP and messages
@@ -272,30 +274,34 @@ Then, creates a new JAX-RS resource with the following content:
----
package org.acme.vertx;
import io.vertx.axle.core.eventbus.EventBus;
import io.vertx.axle.core.eventbus.Message;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import java.util.concurrent.CompletionStage;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/hello")
@Path("/async")
public class EventResource {
@Inject EventBus bus;
@Inject
EventBus bus;
@GET
@Path("/async/{name}")
public CompletionStage<String> hello(@PathParam("name") String name) {
return bus.<String>request("greeting", name) // <1>
.thenApply(Message::body); // <2>
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(@PathParam String name) {
return bus.<String>request("greeting", name) // <1>
.onItem().apply(Message::body); // <2>
}
}
----
<1> send the `name` to the `greeting` address
<2> when we get the reply, extract the body and send this as response to the user
<1> send the `name` to the `greeting` address and request a response
<2> when we get the response, extract the body and send it to the user
If you call this endpoint, you will wait and get a timeout. Indeed, no one is listening.
So, we need a consumer listening on the `greeting` address. Create a `GreetingService` bean with the following content:

View File

@@ -234,7 +234,9 @@ The code above uses link:http://rest-assured.io/[REST Assured]'s link:https://gi
== Async Support
The rest client supports asynchronous rest calls. Let's see it in action by adding a `getByNameAsync` method in our `CountriesService` REST interface. The code should look like:
The rest client supports asynchronous rest calls.
Async support comes in 2 flavors: you can return a `CompletionStage` or a `Uni` (requires the `quarkus-resteasy-mutiny` extension).
Let's see it in action by adding a `getByNameAsync` method in our `CountriesService` REST interface. The code should look like:
[source, java]
----
@@ -324,6 +326,70 @@ public void testCountryNameAsyncEndpoint() {
}
----
The `Uni` version is very similar:
[source, java]
----
package org.acme.restclient;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import java.util.concurrent.CompletionStage;
import java.util.Set;
@Path("/v2")
@RegisterRestClient
public interface CountriesService {
// ...
@GET
@Path("/name/{name}")
@Produces("application/json")
Uni<Set<Country>> getByNameAsUni(@PathParam String name);
}
----
The `CountriesResource` becomes:
[source,java]
----
package org.acme.restclient;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.CompletionStage;
import java.util.Set;
@Path("/country")
public class CountriesResource {
@Inject
@RestClient
CountriesService countriesService;
// ...
@GET
@Path("/name-uni/{name}")
@Produces(MediaType.APPLICATION_JSON)
public Uni<Set<Country>> nameAsync(@PathParam String name) {
return countriesService.getByNameAsUni(name);
}
}
----
== Package and run the application
Run the application with: `./mvnw compile quarkus:dev`.

View File

@@ -135,13 +135,49 @@ Quarkus web resources support asynchronous processing and streaming results over
=== Asynchronous processing
Most programming guides start easy with a greeting service and this one makes no exception.
To asynchronously greet a client, the endpoint method must return a `java.util.concurrent.CompletionStage`:
To asynchronously handle the HTTP request, the endpoint method must return a `java.util.concurrent.CompletionStage` or an `io.smallrye.mutiny.Uni` (requires the `quarkus-resteasy-mutiny` extension):
[source,java]
----
@Path("/hello")
@Path("/lorem")
public class GreetingResource {
@GET
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> doSomethingAsync() {
// Mimic an asynchronous computation.
return Uni.createFrom()
.item(() -> "Hello!")
.onItem().delayIt().by(Duration.ofMillis(10));
}
}
----
[source, shell]
----
./mvnw compile quarkus:dev
----
Then, open your browser to 'http://localhost:8080/lorem' and you should get the message.
So far so good.
Now let's use the Vert.x API instead of this artificial delay:
[source,java]
----
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/lorem")
public class GreetingResource {
@Inject
@@ -149,59 +185,30 @@ public class GreetingResource {
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public CompletionStage<String> greeting(@PathParam String name) {
// When complete, return the content to the client
CompletableFuture<String> future = new CompletableFuture<>();
long start = System.nanoTime();
// TODO: asynchronous greeting
return future;
public Uni<String> doSomethingAsync() {
return vertx.fileSystem().readFile("/META-INF/resources/lorem.txt")
.onItem().apply(b -> b.toString("UTF-8"));
}
}
----
So far so good.
Now let's use the Vert.x API to implement the artificially delayed reply with the `setTimer` provided by Vert.x:
In this code, we inject the `vertx` instance (`io.vertx.mutiny.core.Vertx`) and read a file from the file system.
[source,java]
Create the `src/main/resources/META_INF/resources/lorem.txt` file with the following content:
[source,text]
----
// Delay reply by 10ms
vertx.setTimer(10, l -> {
// Compute elapsed time in milliseconds
long duration = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
// Format message
String message = String.format("Hello %s! (%d ms)%n", name, duration);
// Complete
future.complete(message);
});
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
----
That's it.
Now start Quarkus in `dev` mode with:
[source, shell]
----
./mvnw compile quarkus:dev
----
Eventually, open your browser and navigate to http://localhost:8080/hello/Quarkus, you should see:
[source, shell]
----
Hello Quarkus! (10 ms)
----
Then, refresh the page, you should see the _lorem ipsum_ text.
=== Streaming using Server-Sent Events
Quarkus web resources that need to send content as https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events[server-sent events] must have a method:
* declaring the `text/event-stream` response content type
* returning a https://www.reactive-streams.org/[Reactive Streams] `Publisher` or an RX Java 2 `Observable` or `Flowable`
* returning a https://www.reactive-streams.org/[Reactive Streams] `Publisher` or Mutiny `Multi` (requires the `quarkus-resteasy-mutiny` extension)
In practice, a streaming greeting service would look like:
@@ -212,50 +219,44 @@ public class StreamingResource {
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("{name}/streaming")
public Publisher<String> greeting(@PathParam String name) {
// TODO: create a Reactive Streams publisher
@Path("/{name}")
public Multi<String> greeting(@PathParam String name) {
// TODO: create a Reactive Streams publisher or a Mutiny Multi
return publisher;
}
}
----
How to create a Reactive Streams publisher?
There are a few ways to do this:
1. If you use `io.vertx.axle.core.Vertx`, the API provides `toPublisher` methods (and then use RX Java 2 or Reactive Streams Operators to manipulate the stream)
2. You can also use `io.vertx.reactivex.core.Vertx` which already provides RX Java 2 (RX Java 2 `Flowable` implement Reactive Streams `publisher`).
The first approach can be implemented as follows:
Now we just need to return our `Publisher` or `Multi`:
[source, java]
----
// Use io.vertx.axle.core.Vertx;
@Inject Vertx vertx;
package org.acme.vertx;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("{name}/streaming")
public Publisher<String> greeting(@PathParam String name) {
return vertx.periodicStream(2000).toPublisherBuilder()
.map(l -> String.format("Hello %s! (%s)%n", name, new Date()))
.buildRs();
}
----
import io.smallrye.mutiny.Multi;
import io.vertx.mutiny.core.Vertx;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
The second approach slightly differs:
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.Date;
[source,java]
----
// Use io.vertx.reactivex.core.Vertx;
@Inject Vertx vertx;
@Path("/stream")
public class StreamingResource {
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("{name}/streaming")
public Publisher<String> greeting(@PathParam String name) {
return vertx.periodicStream(2000).toFlowable()
.map(l -> String.format("Hello %s! (%s)%n", name, new Date()));
@Inject
Vertx vertx;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("/{name}")
public Multi<String> greeting(@PathParam String name) {
return vertx.periodicStream(2000).toMulti()
.map(l -> String.format("Hello %s! (%s)%n", name, new Date()));
}
}
----
@@ -284,13 +285,17 @@ The magic, as always, lies in the Javascript code:
[source,javascript]
.META-INF/resources/streaming.js
----
var eventSource = new EventSource("/hello/Quarkus/streaming");
eventSource.onmessage = function (event) {
var container = document.getElementById("container");
var paragraph = document.createElement("p");
paragraph.innerHTML = event.data;
container.appendChild(paragraph);
};
if (!!window.EventSource) {
var eventSource = new EventSource("/stream/Quarkus");
eventSource.onmessage = function (event) {
var container = document.getElementById("container");
var paragraph = document.createElement("p");
paragraph.innerHTML = event.data;
container.appendChild(paragraph);
};
} else {
window.alert("EventSource not available on this browser.")
}
----
IMPORTANT: Most browsers support SSE but some don't.
@@ -301,11 +306,15 @@ A new greeting should show-up every 2 seconds.
[source, text]
----
Hello Quarkus! (Thu Mar 21 17:26:12 CET 2019)
Hello Quarkus! (Wed Feb 12 17:13:55 CET 2020)
Hello Quarkus! (Thu Mar 21 17:26:14 CET 2019)
Hello Quarkus! (Wed Feb 12 17:13:57 CET 2020)
Hello Quarkus! (Thu Mar 21 17:26:16 CET 2019)
Hello Quarkus! (Wed Feb 12 17:13:59 CET 2020)
Hello Quarkus! (Wed Feb 12 17:14:01 CET 2020)
Hello Quarkus! (Wed Feb 12 17:14:03 CET 2020)
...
----
@@ -351,7 +360,7 @@ Then, navigate to http://localhost:8080/hello/Quarkus/array:
["Hello","Quarkus"]
----
Needless to say, this works equally well when the JSON content is a request body or is wrapped in a `CompletionStage` or `Publisher`.
Needless to say, this works equally well when the JSON content is a request body or is wrapped in a `Uni`, `Multi`, `CompletionStage` or `Publisher`.
== Using Vert.x Clients
@@ -397,7 +406,7 @@ In this guide, we are going to use the Axle API, so:
----
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-axle-web-client</artifactId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
----
@@ -408,11 +417,6 @@ Now, create a new resource in your project with the following content:
----
package org.acme.vertx;
import io.vertx.axle.core.Vertx;
import io.vertx.axle.ext.web.client.WebClient;
import io.vertx.axle.ext.web.codec.BodyCodec;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
@@ -420,12 +424,17 @@ import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
@Path("/swapi")
public class ResourceUsingWebClient {
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
@Path("/fruit-data")
public class ResourceUsingWebClient {
@Inject
Vertx vertx;
@@ -435,16 +444,17 @@ public class ResourceUsingWebClient {
@PostConstruct
void initialize() {
this.client = WebClient.create(vertx,
new WebClientOptions().setDefaultHost("swapi.co").setDefaultPort(443).setSsl(true));
new WebClientOptions().setDefaultHost("fruityvice.com")
.setDefaultPort(443).setSsl(true).setTrustAll(true));
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{id}")
public CompletionStage<JsonObject> getStarWarsCharacter(@PathParam int id) {
return client.get("/api/people/" + id)
@Path("/{name}")
public Uni<JsonObject> getFruitData(@PathParam("name") String name) {
return client.get("/api/fruit/" + name)
.send()
.thenApply(resp -> {
.onItem().apply(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
@@ -456,11 +466,12 @@ public class ResourceUsingWebClient {
}
}
----
This resource creates a `WebClient` and upon request use this client to invoke the https://swapi.co/ API.
This resource creates a `WebClient` and upon request use this client to invoke the _fruityvice_ API.
Depending on the result the response is forwarded as it's received, or a new JSON object is created with the status and body.
The `WebClient` is obviously asynchronous (and non-blocking), to the endpoint returns a `CompletionStage`.
The `WebClient` is obviously asynchronous (and non-blocking), to the endpoint returns a `Uni`.
Run the application with:
@@ -469,7 +480,7 @@ Run the application with:
./mvnw compile quarkus:dev
----
And then, open a browser to: http://localhost:8080/swapi/1. You should get _Luke Skywalker_.
And then, open a browser to: `http://localhost:8080/fruit-data/pear`. You should get some details about pears.
The application can also run as a native executable.
But, first, we need to instruct Quarkus to enable _ssl_.

View File

@@ -48,6 +48,7 @@
<module>resteasy-jsonb</module>
<module>resteasy-jackson</module>
<module>resteasy-jaxb</module>
<module>resteasy-mutiny</module>
<module>resteasy-qute</module>
<module>rest-client</module>
<module>smallrye-openapi-common</module>

View File

@@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>quarkus-resteasy-mutiny-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<artifactId>quarkus-resteasy-mutiny-deployment</artifactId>
<name>Quarkus - RESTEasy - Mutiny - Deployment</name>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-mutiny</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,36 @@
package io.quarkus.resteasy.mutiny.deployment;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.resteasy.common.spi.ResteasyJaxrsProviderBuildItem;
import io.quarkus.resteasy.mutiny.runtime.MultiInvokerProvider;
import io.quarkus.resteasy.mutiny.runtime.MultiProvider;
import io.quarkus.resteasy.mutiny.runtime.MultiRxInvoker;
import io.quarkus.resteasy.mutiny.runtime.MultiRxInvokerImpl;
import io.quarkus.resteasy.mutiny.runtime.UniInvokerProvider;
import io.quarkus.resteasy.mutiny.runtime.UniProvider;
import io.quarkus.resteasy.mutiny.runtime.UniRxInvoker;
import io.quarkus.resteasy.mutiny.runtime.UniRxInvokerImpl;
public class ResteasyMutinyProcessor {
@BuildStep
FeatureBuildItem feature() {
return new FeatureBuildItem(FeatureBuildItem.RESTEASY_MUTINY);
}
@BuildStep
public void registerProviders(BuildProducer<ResteasyJaxrsProviderBuildItem> jaxrsProvider) {
jaxrsProvider.produce(new ResteasyJaxrsProviderBuildItem(MultiInvokerProvider.class.getName()));
jaxrsProvider.produce(new ResteasyJaxrsProviderBuildItem(MultiProvider.class.getName()));
jaxrsProvider.produce(new ResteasyJaxrsProviderBuildItem(MultiRxInvoker.class.getName()));
jaxrsProvider.produce(new ResteasyJaxrsProviderBuildItem(MultiRxInvokerImpl.class.getName()));
jaxrsProvider.produce(new ResteasyJaxrsProviderBuildItem(UniInvokerProvider.class.getName()));
jaxrsProvider.produce(new ResteasyJaxrsProviderBuildItem(UniProvider.class.getName()));
jaxrsProvider.produce(new ResteasyJaxrsProviderBuildItem(UniRxInvoker.class.getName()));
jaxrsProvider.produce(new ResteasyJaxrsProviderBuildItem(UniRxInvokerImpl.class.getName()));
}
}

View File

@@ -0,0 +1,40 @@
package io.quarkus.resteasy.mutiny.test;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import javax.ws.rs.ext.Provider;
import org.jboss.resteasy.spi.ContextInjector;
import io.quarkus.resteasy.mutiny.test.annotations.Async;
import io.smallrye.mutiny.Uni;
@Provider
public class MutinyInjector implements ContextInjector<Uni<Integer>, Integer> {
@Override
public Uni<Integer> resolve(Class<? extends Uni<Integer>> rawType, Type genericType,
Annotation[] annotations) {
boolean async = false;
for (Annotation annotation : annotations) {
if (annotation.annotationType() == Async.class) {
async = true;
}
}
if (!async) {
return Uni.createFrom().item(42);
}
return Uni.createFrom().emitter(emitter -> new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
emitter.fail(e);
return;
}
emitter.complete(42);
}).start());
}
}

View File

@@ -0,0 +1,42 @@
package io.quarkus.resteasy.mutiny.test;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import org.jboss.resteasy.annotations.Stream;
import io.quarkus.resteasy.mutiny.test.annotations.Async;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@Path("/")
public class MutinyResource {
@Path("uni")
@GET
public Uni<String> uni() {
return Uni.createFrom().item("hello");
}
@Produces(MediaType.APPLICATION_JSON)
@Path("multi")
@GET
@Stream
public Multi<String> multi() {
return Multi.createFrom().items("hello", "world");
}
@Path("injection")
@GET
public Uni<Integer> injection(@Context Integer value) {
return Uni.createFrom().item(value);
}
@Path("injection-async")
@GET
public Uni<Integer> injectionAsync(@Async @Context Integer value) {
return Uni.createFrom().item(value);
}
}

View File

@@ -0,0 +1,63 @@
package io.quarkus.resteasy.mutiny.test;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.client.ClientBuilder;
import org.jboss.resteasy.client.jaxrs.ResteasyClient;
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import io.quarkus.resteasy.mutiny.test.annotations.Async;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
public class RestEasyMutinyTest {
private static AtomicReference<Object> value = new AtomicReference<>();
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(MutinyResource.class, MutinyInjector.class, Async.class));
@TestHTTPResource
URL url;
private ResteasyClient client;
@BeforeEach
public void before() {
client = ((ResteasyClientBuilder) ClientBuilder.newBuilder())
.readTimeout(5, TimeUnit.SECONDS)
.connectionCheckoutTimeout(5, TimeUnit.SECONDS)
.connectTimeout(5, TimeUnit.SECONDS)
.build();
value.set(null);
CountDownLatch latch = new CountDownLatch(1);
}
@AfterEach
public void after() {
client.close();
}
@Test
public void testInjection() {
Integer data = client.target(url.toExternalForm() + "/injection").request().get(Integer.class);
Assertions.assertEquals((Integer) 42, data);
data = client.target(url.toExternalForm() + "/injection-async").request().get(Integer.class);
Assertions.assertEquals((Integer) 42, data);
}
}

View File

@@ -0,0 +1,14 @@
package io.quarkus.resteasy.mutiny.test.annotations;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ ElementType.PARAMETER, ElementType.METHOD, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
// Marker annotation
}

View File

@@ -0,0 +1,23 @@
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>quarkus-build-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
<relativePath>../../build-parent/pom.xml</relativePath>
</parent>
<artifactId>quarkus-resteasy-mutiny-parent</artifactId>
<name>Quarkus - RESTEasy - Mutiny</name>
<packaging>pom</packaging>
<modules>
<module>deployment</module>
<module>runtime</module>
</modules>
</project>

View File

@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>quarkus-resteasy-mutiny-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<artifactId>quarkus-resteasy-mutiny</artifactId>
<name>Quarkus - RESTEasy - Mutiny - Runtime</name>
<description>Mutiny integration for RESTEasy</description>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-client</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-bootstrap-maven-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,30 @@
package io.quarkus.resteasy.mutiny.runtime;
import java.util.concurrent.ExecutorService;
import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.client.WebTarget;
public class MultiInvokerProvider implements RxInvokerProvider<MultiRxInvoker> {
WebTarget target;
@Override
public boolean isProviderFor(Class<?> clazz) {
return MultiRxInvoker.class.equals(clazz);
}
@Override
public MultiRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
return new MultiRxInvokerImpl(syncInvoker, executorService);
}
public WebTarget getTarget() {
return target;
}
public void setTarget(WebTarget target) {
this.target = target;
}
}

View File

@@ -0,0 +1,13 @@
package io.quarkus.resteasy.mutiny.runtime;
import org.jboss.resteasy.spi.AsyncStreamProvider;
import org.reactivestreams.Publisher;
import io.smallrye.mutiny.Multi;
public class MultiProvider implements AsyncStreamProvider<Multi<?>> {
@Override
public Publisher<?> toAsyncStream(Multi<?> multi) {
return multi;
}
}

View File

@@ -0,0 +1,14 @@
package io.quarkus.resteasy.mutiny.runtime;
import javax.ws.rs.client.RxInvoker;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
public interface MultiRxInvoker extends RxInvoker<Multi<?>> {
BackPressureStrategy getBackPressureStrategy();
void setBackPressureStrategy(BackPressureStrategy strategy);
}

View File

@@ -0,0 +1,236 @@
package io.quarkus.resteasy.mutiny.runtime;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;
import org.jboss.resteasy.client.jaxrs.internal.ClientInvocationBuilder;
import org.jboss.resteasy.plugins.providers.sse.InboundSseEventImpl;
import org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
public class MultiRxInvokerImpl implements MultiRxInvoker {
private static Object monitor = new Object();
private ClientInvocationBuilder syncInvoker;
private ScheduledExecutorService executorService;
private BackPressureStrategy backpressureStrategy = BackPressureStrategy.BUFFER;
public MultiRxInvokerImpl(final SyncInvoker syncInvoker, final ExecutorService executorService) {
if (!(syncInvoker instanceof ClientInvocationBuilder)) {
throw new ProcessingException("Expected a ClientInvocationBuilder");
}
this.syncInvoker = (ClientInvocationBuilder) syncInvoker;
if (executorService instanceof ScheduledExecutorService) {
this.executorService = (ScheduledExecutorService) executorService;
}
}
@Override
public BackPressureStrategy getBackPressureStrategy() {
return backpressureStrategy;
}
@Override
public void setBackPressureStrategy(BackPressureStrategy strategy) {
this.backpressureStrategy = strategy;
}
@Override
public Multi<?> get() {
return eventSourceToMulti(getEventSource(), String.class, "GET", null, getAccept());
}
@Override
public <R> Multi<?> get(Class<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "GET", null, getAccept());
}
@Override
public <R> Multi<?> get(GenericType<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "GET", null, getAccept());
}
@Override
public Multi<?> put(Entity<?> entity) {
return eventSourceToMulti(getEventSource(), String.class, "PUT", entity, getAccept());
}
@Override
public <R> Multi<?> put(Entity<?> entity, Class<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "PUT", entity, getAccept());
}
@Override
public <R> Multi<?> put(Entity<?> entity, GenericType<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "PUT", entity, getAccept());
}
@Override
public Multi<?> post(Entity<?> entity) {
return eventSourceToMulti(getEventSource(), String.class, "POST", entity, getAccept());
}
@Override
public <R> Multi<?> post(Entity<?> entity, Class<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "POST", entity, getAccept());
}
@Override
public <R> Multi<?> post(Entity<?> entity, GenericType<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "POST", entity, getAccept());
}
@Override
public Multi<?> delete() {
return eventSourceToMulti(getEventSource(), String.class, "DELETE", null, getAccept());
}
@Override
public <R> Multi<?> delete(Class<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "DELETE", null, getAccept());
}
@Override
public <R> Multi<?> delete(GenericType<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "DELETE", null, getAccept());
}
@Override
public Multi<?> head() {
return eventSourceToMulti(getEventSource(), String.class, "HEAD", null, getAccept());
}
@Override
public Multi<?> options() {
return eventSourceToMulti(getEventSource(), String.class, "OPTIONS", null, getAccept());
}
@Override
public <R> Multi<?> options(Class<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "OPTIONS", null, getAccept());
}
@Override
public <R> Multi<?> options(GenericType<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "OPTIONS", null, getAccept());
}
@Override
public Multi<?> trace() {
return eventSourceToMulti(getEventSource(), String.class, "TRACE", null, getAccept());
}
@Override
public <R> Multi<?> trace(Class<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "TRACE", null, getAccept());
}
@Override
public <R> Multi<?> trace(GenericType<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, "TRACE", null, getAccept());
}
@Override
public Multi<?> method(String name) {
return eventSourceToMulti(getEventSource(), String.class, name, null, getAccept());
}
@Override
public <R> Multi<?> method(String name, Class<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, name, null, getAccept());
}
@Override
public <R> Multi<?> method(String name, GenericType<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, name, null, getAccept());
}
@Override
public Multi<?> method(String name, Entity<?> entity) {
return eventSourceToMulti(getEventSource(), String.class, name, entity, getAccept());
}
@Override
public <R> Multi<?> method(String name, Entity<?> entity, Class<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, name, entity, getAccept());
}
@Override
public <R> Multi<?> method(String name, Entity<?> entity, GenericType<R> responseType) {
return eventSourceToMulti(getEventSource(), responseType, name, entity, getAccept());
}
private <T> Multi<T> eventSourceToMulti(SseEventSourceImpl sseEventSource, Class<T> clazz, String verb,
Entity<?> entity, MediaType[] mediaTypes) {
return eventSourceToMulti(
sseEventSource,
(InboundSseEventImpl e) -> e.readData(clazz, e.getMediaType()),
verb,
entity,
mediaTypes);
}
private <T> Multi<T> eventSourceToMulti(SseEventSourceImpl sseEventSource, GenericType<T> type, String verb,
Entity<?> entity, MediaType[] mediaTypes) {
return eventSourceToMulti(
sseEventSource,
(InboundSseEventImpl e) -> e.readData(type, e.getMediaType()),
verb,
entity,
mediaTypes);
}
private <T> Multi<T> eventSourceToMulti(
final SseEventSourceImpl sseEventSource,
final Function<InboundSseEventImpl, T> tSupplier,
final String verb,
final Entity<?> entity,
final MediaType[] mediaTypes) {
final Multi<T> multi = Multi.createFrom().emitter(emitter -> {
sseEventSource.register(
(InboundSseEvent e) -> emitter.emit(tSupplier.apply((InboundSseEventImpl) e)),
(Throwable t) -> emitter.fail(t),
() -> emitter.complete());
synchronized (monitor) {
if (!sseEventSource.isOpen()) {
sseEventSource.open(null, verb, entity, mediaTypes);
}
}
},
backpressureStrategy);
return multi;
}
private SseEventSourceImpl getEventSource() {
SseEventSourceImpl.SourceBuilder builder = (SseEventSourceImpl.SourceBuilder) SseEventSource
.target(syncInvoker.getTarget());
if (executorService != null) {
builder.executor(executorService);
}
SseEventSourceImpl sseEventSource = (SseEventSourceImpl) builder.build();
sseEventSource.setAlwaysReconnect(false);
return sseEventSource;
}
private MediaType[] getAccept() {
if (syncInvoker != null) {
ClientInvocationBuilder builder = syncInvoker;
List<MediaType> accept = builder.getHeaders().getAcceptableMediaTypes();
return accept.toArray(new MediaType[0]);
} else {
return null;
}
}
}

View File

@@ -0,0 +1,29 @@
package io.quarkus.resteasy.mutiny.runtime;
import java.util.concurrent.ExecutorService;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.CompletionStageRxInvoker;
import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.client.SyncInvoker;
import org.jboss.resteasy.client.jaxrs.internal.ClientInvocationBuilder;
public class UniInvokerProvider implements RxInvokerProvider<UniRxInvoker> {
@Override
public boolean isProviderFor(Class<?> clazz) {
return UniRxInvoker.class.equals(clazz);
}
@Override
public UniRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
if (syncInvoker instanceof ClientInvocationBuilder) {
ClientInvocationBuilder builder = (ClientInvocationBuilder) syncInvoker;
CompletionStageRxInvoker completionStageRxInvoker = builder.rx();
return new UniRxInvokerImpl(completionStageRxInvoker);
} else {
throw new ProcessingException("Expected a ClientInvocationBuilder");
}
}
}

View File

@@ -0,0 +1,22 @@
package io.quarkus.resteasy.mutiny.runtime;
import java.util.concurrent.CompletionStage;
import org.jboss.resteasy.spi.AsyncClientResponseProvider;
import org.jboss.resteasy.spi.AsyncResponseProvider;
import io.smallrye.mutiny.Uni;
public class UniProvider implements AsyncResponseProvider<Uni<?>>, AsyncClientResponseProvider<Uni<?>> {
@Override
public CompletionStage<?> toCompletionStage(Uni<?> uni) {
return uni.subscribeAsCompletionStage();
}
@Override
public Uni<?> fromCompletionStage(CompletionStage<?> completionStage) {
return Uni.createFrom().completionStage(completionStage);
}
}

View File

@@ -0,0 +1,87 @@
package io.quarkus.resteasy.mutiny.runtime;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.RxInvoker;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import io.smallrye.mutiny.Uni;
public interface UniRxInvoker extends RxInvoker<Uni<?>> {
@Override
Uni<Response> get();
@Override
<T> Uni<T> get(Class<T> responseType);
@Override
<T> Uni<T> get(GenericType<T> responseType);
@Override
Uni<Response> put(Entity<?> entity);
@Override
<T> Uni<T> put(Entity<?> entity, Class<T> clazz);
@Override
<T> Uni<T> put(Entity<?> entity, GenericType<T> type);
@Override
Uni<Response> post(Entity<?> entity);
@Override
<T> Uni<T> post(Entity<?> entity, Class<T> clazz);
@Override
<T> Uni<T> post(Entity<?> entity, GenericType<T> type);
@Override
Uni<Response> delete();
@Override
<T> Uni<T> delete(Class<T> responseType);
@Override
<T> Uni<T> delete(GenericType<T> responseType);
@Override
Uni<Response> head();
@Override
Uni<Response> options();
@Override
<T> Uni<T> options(Class<T> responseType);
@Override
<T> Uni<T> options(GenericType<T> responseType);
@Override
Uni<Response> trace();
@Override
<T> Uni<T> trace(Class<T> responseType);
@Override
<T> Uni<T> trace(GenericType<T> responseType);
@Override
Uni<Response> method(String name);
@Override
<T> Uni<T> method(String name, Class<T> responseType);
@Override
<T> Uni<T> method(String name, GenericType<T> responseType);
@Override
Uni<Response> method(String name, Entity<?> entity);
@Override
<T> Uni<T> method(String name, Entity<?> entity, Class<T> responseType);
@Override
<T> Uni<T> method(String name, Entity<?> entity, GenericType<T> responseType);
}

View File

@@ -0,0 +1,143 @@
package io.quarkus.resteasy.mutiny.runtime;
import javax.ws.rs.client.CompletionStageRxInvoker;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import io.smallrye.mutiny.Uni;
public class UniRxInvokerImpl implements UniRxInvoker {
private final CompletionStageRxInvoker completionStageRxInvoker;
private final UniProvider UniProvider;
public UniRxInvokerImpl(final CompletionStageRxInvoker completionStageRxInvoker) {
this.completionStageRxInvoker = completionStageRxInvoker;
this.UniProvider = new UniProvider();
}
@Override
public Uni<Response> get() {
return (Uni<Response>) UniProvider.fromCompletionStage(completionStageRxInvoker.get());
}
@Override
public <T> Uni<T> get(Class<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.get(responseType));
}
@Override
public <T> Uni<T> get(GenericType<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.get(responseType));
}
@Override
public Uni<Response> put(Entity<?> entity) {
return (Uni<Response>) UniProvider.fromCompletionStage(completionStageRxInvoker.put(entity));
}
@Override
public <T> Uni<T> put(Entity<?> entity, Class<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.put(entity, responseType));
}
@Override
public <T> Uni<T> put(Entity<?> entity, GenericType<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.put(entity, responseType));
}
@Override
public Uni<Response> post(Entity<?> entity) {
return (Uni<Response>) UniProvider.fromCompletionStage(completionStageRxInvoker.post(entity));
}
@Override
public <T> Uni<T> post(Entity<?> entity, Class<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.post(entity, responseType));
}
@Override
public <T> Uni<T> post(Entity<?> entity, GenericType<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.post(entity, responseType));
}
@Override
public Uni<Response> delete() {
return (Uni<Response>) UniProvider.fromCompletionStage(completionStageRxInvoker.delete());
}
@Override
public <T> Uni<T> delete(Class<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.delete(responseType));
}
@Override
public <T> Uni<T> delete(GenericType<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.delete(responseType));
}
@Override
public Uni<Response> head() {
return (Uni<Response>) UniProvider.fromCompletionStage(completionStageRxInvoker.head());
}
@Override
public Uni<Response> options() {
return (Uni<Response>) UniProvider.fromCompletionStage(completionStageRxInvoker.options());
}
@Override
public <T> Uni<T> options(Class<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.options(responseType));
}
@Override
public <T> Uni<T> options(GenericType<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.options(responseType));
}
@Override
public Uni<Response> trace() {
return (Uni<Response>) UniProvider.fromCompletionStage(completionStageRxInvoker.trace());
}
@Override
public <T> Uni<T> trace(Class<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.trace(responseType));
}
@Override
public <T> Uni<T> trace(GenericType<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.trace(responseType));
}
@Override
public Uni<Response> method(String name) {
return (Uni<Response>) UniProvider.fromCompletionStage(completionStageRxInvoker.method(name));
}
@Override
public <T> Uni<T> method(String name, Class<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.method(name, responseType));
}
@Override
public <T> Uni<T> method(String name, GenericType<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.method(name, responseType));
}
@Override
public Uni<Response> method(String name, Entity<?> entity) {
return (Uni<Response>) UniProvider.fromCompletionStage(completionStageRxInvoker.method(name, entity));
}
@Override
public <T> Uni<T> method(String name, Entity<?> entity, Class<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.method(name, entity, responseType));
}
@Override
public <T> Uni<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
return (Uni<T>) UniProvider.fromCompletionStage(completionStageRxInvoker.method(name, entity, responseType));
}
}

View File

@@ -0,0 +1,11 @@
---
name: "RESTEasy Mutiny"
metadata:
keywords:
- "resteasy-mutiny"
- "resteasy"
- "mutiny"
categories:
- "web"
- "reactive"
status: "experimental"

View File

@@ -0,0 +1,26 @@
package io.quarkus.resteasy.mutiny.test;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import io.quarkus.resteasy.mutiny.runtime.MultiProvider;
import io.smallrye.mutiny.Multi;
public class MultiProviderTest {
private final MultiProvider provider = new MultiProvider();
@Test
public void test() {
Multi<?> multi = Multi.createFrom().items(1, 2, 3);
Publisher<?> publisher = provider.toAsyncStream(multi);
List<?> list = Multi.createFrom().publisher(publisher).collectItems().asList().await().indefinitely();
Assertions.assertEquals(1, list.get(0));
Assertions.assertEquals(2, list.get(1));
Assertions.assertEquals(3, list.get(2));
}
}

View File

@@ -0,0 +1,38 @@
package io.quarkus.resteasy.mutiny.test;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import io.quarkus.resteasy.mutiny.runtime.UniProvider;
import io.smallrye.mutiny.Uni;
public class UniProviderTest {
private final UniProvider provider = new UniProvider();
@Test
public void testFromCompletionStage() {
final CompletableFuture<Integer> cs = new CompletableFuture<>();
cs.complete(1);
final Uni<?> uni = provider.fromCompletionStage(cs);
Assertions.assertEquals(1, uni.await().indefinitely());
}
@Test
public void testToCompletionStageCase() {
final Object actual = provider.toCompletionStage(Uni.createFrom().item(1)).toCompletableFuture().join();
Assertions.assertEquals(1, actual);
}
@Test
public void testToCompletionStageNullCase() {
final CompletableFuture<Integer> cs = new CompletableFuture<>();
cs.complete(null);
final Uni<?> uni = Uni.createFrom().completionStage(cs);
final Object actual = provider.toCompletionStage(uni).toCompletableFuture().join();
Assertions.assertNull(actual);
}
}

View File

@@ -76,6 +76,7 @@
<module>jackson</module>
<module>jsonb</module>
<module>resteasy-jackson</module>
<module>resteasy-mutiny</module>
<module>jgit</module>
<module>jsch</module>
<module>virtual-http</module>

View File

@@ -0,0 +1,127 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>quarkus-integration-tests-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<artifactId>quarkus-integration-test-resteasy-mutiny</artifactId>
<name>Quarkus - Integration Tests - RESTEasy Mutiny</name>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-mutiny</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jsonb</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-client</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${project.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>native-image</id>
<activation>
<property>
<name>native</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<systemProperties>
<native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
</systemProperties>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${project.version}</version>
<executions>
<execution>
<id>native-image</id>
<goals>
<goal>native-image</goal>
</goals>
<configuration>
<cleanupServer>true</cleanupServer>
<enableHttpUrlHandler>true</enableHttpUrlHandler>
<graalvmHome>${graalvmHome}</graalvmHome>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@@ -0,0 +1,89 @@
package io.quarkus.it.resteasy.mutiny;
import java.io.IOException;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.jboss.resteasy.annotations.SseElementType;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@Path("/mutiny")
public class MutinyResource {
@Inject
SomeService service;
@GET
@Path("/hello")
public Uni<String> hello() {
return service.greeting();
}
@GET
@Path("/fail")
public Uni<String> fail() {
return Uni.createFrom().failure(new IOException("boom"));
}
@GET
@Path("/response")
public Uni<Response> response() {
return service.greeting()
.onItem().apply(v -> Response.accepted().type("text/plain").entity(v).build());
}
@GET
@Path("/hello/stream")
@Produces(MediaType.APPLICATION_JSON)
public Multi<String> helloAsMulti() {
return service.greetingAsMulti();
}
@GET
@Path("/pet")
@Produces(MediaType.APPLICATION_JSON)
public Uni<Pet> pet() {
return service.getPet();
}
@GET
@Path("/pet/stream")
@Produces(MediaType.APPLICATION_JSON)
public Multi<Pet> pets() {
return service.getPets();
}
@GET
@Path("/pets")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<Pet> sse() {
return service.getMorePets();
}
@Inject
@RestClient
MyRestService client;
@GET
@Path("/client")
public Uni<String> callHello() {
return client.hello();
}
@GET
@Path("/client/pet")
@Produces(MediaType.APPLICATION_JSON)
public Uni<Pet> callPet() {
return client.pet();
}
}

View File

@@ -0,0 +1,25 @@
package io.quarkus.it.resteasy.mutiny;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import io.smallrye.mutiny.Uni;
@RegisterRestClient(configKey = "my-service")
@Path("/mutiny")
public interface MyRestService {
@GET
@Path("/hello")
Uni<String> hello();
@GET
@Path("/pet")
@Consumes(MediaType.APPLICATION_JSON)
Uni<Pet> pet();
}

View File

@@ -0,0 +1,25 @@
package io.quarkus.it.resteasy.mutiny;
public class Pet {
private String name;
private String kind;
public String getName() {
return name;
}
public Pet setName(String name) {
this.name = name;
return this;
}
public String getKind() {
return kind;
}
public Pet setKind(String kind) {
this.kind = kind;
return this;
}
}

View File

@@ -0,0 +1,62 @@
package io.quarkus.it.resteasy.mutiny;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class SomeService {
private ExecutorService executor;
@PostConstruct
public void init() {
executor = Executors.newFixedThreadPool(2);
}
@PreDestroy
public void cleanup() {
executor.shutdownNow();
}
Uni<String> greeting() {
return Uni.createFrom().item("hello")
.emitOn(executor);
}
Multi<String> greetingAsMulti() {
return Multi.createFrom().items("h", "e", "l", "l", "o")
.groupItems().intoMultis().of(2)
.onItem().produceUni(g -> g.collectItems().in(StringBuffer::new, StringBuffer::append)).concatenate()
.emitOn(executor)
.onItem().apply(StringBuffer::toString);
}
Uni<Pet> getPet() {
return Uni.createFrom().item(new Pet().setName("neo").setKind("rabbit"))
.emitOn(executor);
}
public Multi<Pet> getPets() {
return Multi.createFrom().items(
new Pet().setName("neo").setKind("rabbit"),
new Pet().setName("indy").setKind("dog"))
.emitOn(executor);
}
public Multi<Pet> getMorePets() {
return Multi.createFrom().items(
new Pet().setName("neo").setKind("rabbit"),
new Pet().setName("indy").setKind("dog"),
new Pet().setName("plume").setKind("dog"),
new Pet().setName("titi").setKind("bird"),
new Pet().setName("rex").setKind("mouse"))
.emitOn(executor);
}
}

View File

@@ -0,0 +1 @@
my-service/mp-rest/url=${test.url}

View File

@@ -0,0 +1,122 @@
package io.quarkus.it.resteasy.mutiny;
import static io.restassured.RestAssured.get;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.SseEventSource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
@QuarkusTest
public class MutinyTest {
@Test
public void testHello() {
get("/mutiny/hello")
.then()
.body(is("hello"))
.statusCode(200);
}
@Test
public void testFail() {
get("/mutiny/fail")
.then()
.statusCode(500);
}
@Test
public void testResponse() {
get("/mutiny/response")
.then()
.body(is("hello"))
.statusCode(202);
}
@Test
public void testHelloAsMulti() {
get("/mutiny/hello/stream")
.then()
.contentType("application/json")
.body("[0]", is("he"))
.body("[1]", is("ll"))
.body("[2]", is("o"))
.statusCode(200);
}
@Test
public void testSerialization() {
get("/mutiny/pet")
.then()
.contentType("application/json")
.body("name", is("neo"))
.body("kind", is("rabbit"))
.statusCode(200);
}
@Test
public void testMultiWithSerialization() {
get("/mutiny/pet/stream")
.then()
.contentType("application/json")
.body("[0].name", is("neo"))
.body("[0].kind", is("rabbit"))
.body("[1].name", is("indy"))
.body("[1].kind", is("dog"))
.statusCode(200);
}
@Test
public void testSSE() {
Client client = ClientBuilder.newClient();
WebTarget target = client.target("http://localhost:" + RestAssured.port + "/mutiny/pets");
SseEventSource source = SseEventSource.target(target).build();
List<Pet> pets = new CopyOnWriteArrayList<>();
try (SseEventSource eventSource = source) {
eventSource.register(event -> {
Pet pet = event.readData(Pet.class, MediaType.APPLICATION_JSON_TYPE);
pets.add(pet);
}, ex -> {
throw new IllegalStateException("SSE failure", ex);
});
eventSource.open();
await().until(() -> pets.size() == 5);
}
Assertions.assertEquals("neo", pets.get(0).getName());
Assertions.assertEquals("indy", pets.get(1).getName());
Assertions.assertEquals("plume", pets.get(2).getName());
Assertions.assertEquals("titi", pets.get(3).getName());
Assertions.assertEquals("rex", pets.get(4).getName());
}
@Test
public void testClientReturningUni() {
get("/mutiny/client")
.then()
.body(is("hello"))
.statusCode(200);
}
@Test
public void testClientReturningUniOfPet() {
get("/mutiny/client/pet")
.then()
.contentType("application/json")
.body("name", is("neo"))
.body("kind", is("rabbit"))
.statusCode(200);
}
}