From f5a5e68787f9ced3205ae38da2480444b0914836 Mon Sep 17 00:00:00 2001 From: "Francisco M. Aramburo Torres" Date: Fri, 28 Feb 2020 16:33:47 +0100 Subject: [PATCH] Finished the transition to http4s --- .../src/test/resources/logback-test.xml | 17 +++++ .../ing/baker/baas/scaladsl/BakerClient.scala | 27 ++++--- .../baas/javadsl/RemoteEventListener.scala | 6 +- .../src/test/resources/logback-test.xml | 17 +++++ .../src/test/resources/logback-test.xml | 17 +++++ .../interaction/RemoteInteractionSpec.scala | 2 +- .../scala/com/ing/baker/baas/state/Main.scala | 3 +- .../baker/baas/state/ServiceDiscovery.scala | 53 ++++++++------ .../src/test/resources/logback-test.xml | 2 +- .../baker/baas/mocks/RemoteInteraction.scala | 2 +- .../ing/baker/baas/state/StateNodeSpec.scala | 50 ++++++------- .../baas/protocol/MarshallingUtils.scala | 70 ------------------- .../baker/baas/interaction/BakeryHttp.scala | 15 +++- .../interaction/RemoteInteractionClient.scala | 9 ++- build.sbt | 3 - project/Dependencies.scala | 2 - .../internal/InteractionManagerSpec.scala | 20 +++--- 17 files changed, 155 insertions(+), 160 deletions(-) create mode 100644 baas-node-baker-event-listener/src/test/resources/logback-test.xml create mode 100644 baas-node-event-listener/src/test/resources/logback-test.xml create mode 100644 baas-node-interaction/src/test/resources/logback-test.xml delete mode 100644 baas-protocol-baker/src/main/scala/com/ing/baker/baas/protocol/MarshallingUtils.scala diff --git a/baas-node-baker-event-listener/src/test/resources/logback-test.xml b/baas-node-baker-event-listener/src/test/resources/logback-test.xml new file mode 100644 index 00000000..954e441d --- /dev/null +++ b/baas-node-baker-event-listener/src/test/resources/logback-test.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n + + + + + diff --git a/baas-node-client/src/main/scala/com/ing/baker/baas/scaladsl/BakerClient.scala b/baas-node-client/src/main/scala/com/ing/baker/baas/scaladsl/BakerClient.scala index 071df4a4..d5eb53c7 100644 --- a/baas-node-client/src/main/scala/com/ing/baker/baas/scaladsl/BakerClient.scala +++ b/baas-node-client/src/main/scala/com/ing/baker/baas/scaladsl/BakerClient.scala @@ -1,5 +1,6 @@ package com.ing.baker.baas.scaladsl +import cats.data.EitherT import cats.effect.{ContextShift, IO, Resource, Timer} import com.ing.baker.baas.protocol.BaaSProto._ import com.ing.baker.baas.protocol.BaaSProtocol @@ -9,13 +10,15 @@ import com.ing.baker.runtime.common.SensoryEventStatus import com.ing.baker.runtime.scaladsl.{BakerEvent, EventInstance, EventMoment, EventResolutions, InteractionInstance, RecipeEventMetadata, RecipeInformation, RecipeInstanceMetadata, RecipeInstanceState, SensoryEventResult, Baker => ScalaBaker} import com.ing.baker.runtime.serialization.ProtoMap import com.ing.baker.types.Value +import org.http4s.EntityDecoder.collectBinary import org.http4s.Method._ import org.http4s.client.Client import org.http4s.client.blaze.BlazeClientBuilder import org.http4s.client.dsl.io._ -import org.http4s.{EntityDecoder, MediaType, Request, Uri} +import org.http4s._ import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} object BakerClient { @@ -76,7 +79,7 @@ final class BakerClient(client: Client[IO], hostname: Uri)(implicit ec: Executio * @return All recipes in the form of map of recipeId -> CompiledRecipe */ override def getAllRecipes: Future[Map[String, RecipeInformation]] = { - val request = POST( + val request = GET( Root / "getAllRecipes" ) handleBakerResponse[BaaSProtocol.GetAllRecipesResponse, Map[String, RecipeInformation]](request)(_.map) @@ -336,16 +339,24 @@ final class BakerClient(client: Client[IO], hostname: Uri)(implicit ec: Executio private type WithBakerException[A] = Either[BaaSProtocol.BaaSRemoteFailure, A] private implicit def withBakerExeptionEntityDecoder[A, P <: ProtoMessage[P]](implicit protoMap: ProtoMap[A, P]): EntityDecoder[IO, WithBakerException[A]] = - protoDecoder(baaSRemoteFailureProto).map(Left(_)).orElse(protoDecoder[A, P].map(Right(_))) - - private def liftRemoteFailure[A](either: Either[BaaSProtocol.BaaSRemoteFailure, A]): IO[A] = - either.fold(failure => IO.raiseError(failure.error), IO.pure) + EntityDecoder.decodeBy(MediaType.application.`octet-stream`)(collectBinary[IO]).map(_.toArray) + .flatMapR { bytes => + val eitherTry: Try[WithBakerException[A]] = + baaSRemoteFailureProto.fromByteArray(bytes).map[WithBakerException[A]](Left(_)) + .orElse(protoMap.fromByteArray(bytes).map[WithBakerException[A]](Right(_))) + eitherTry match { + case Success(a) => + EitherT.fromEither[IO](Right(a)) + case Failure(exception) => + EitherT.fromEither[IO](Left(MalformedMessageBodyFailure(exception.getMessage, Some(exception)))) + } + } final class HandleBakerResponsePartial[A, R] { def apply[P <: ProtoMessage[P]](request: IO[Request[IO]])(f: A => R)(implicit protoMap: ProtoMap[A, P]): Future[R] = client .expect[WithBakerException[A]](request) - .flatMap(liftRemoteFailure _) + .flatMap(_.fold(failure => IO.raiseError(failure.error), IO.pure)) .map(f) .unsafeToFuture() } @@ -356,7 +367,7 @@ final class BakerClient(client: Client[IO], hostname: Uri)(implicit ec: Executio request.flatMap(client.run(_).use(response => response.contentType match { case Some(contentType) if contentType.mediaType == MediaType.application.`octet-stream` => EntityDecoder[IO, BaaSProtocol.BaaSRemoteFailure] - .decode(response, true) + .decode(response, strict = true) .value .flatMap(_.fold(IO.raiseError(_), e => IO.raiseError(e.error))) case _ => IO.unit diff --git a/baas-node-event-listener/src/main/scala/com/ing/baker/baas/javadsl/RemoteEventListener.scala b/baas-node-event-listener/src/main/scala/com/ing/baker/baas/javadsl/RemoteEventListener.scala index 160395e8..c7fe2a44 100644 --- a/baas-node-event-listener/src/main/scala/com/ing/baker/baas/javadsl/RemoteEventListener.scala +++ b/baas-node-event-listener/src/main/scala/com/ing/baker/baas/javadsl/RemoteEventListener.scala @@ -3,14 +3,10 @@ package com.ing.baker.baas.javadsl import java.util.concurrent.CompletableFuture import java.util.function.BiConsumer -import akka.actor.ActorSystem -import com.ing.baker.baas.common -import com.ing.baker.baas.scaladsl +import com.ing.baker.baas.{common, scaladsl} import com.ing.baker.runtime.common.LanguageDataStructures.JavaApi import com.ing.baker.runtime.javadsl.{EventInstance, RecipeEventMetadata} -import scala.compat.java8.FutureConverters - object RemoteEventListener extends common.RemoteEventListener[CompletableFuture] with JavaApi { override type EventInstanceType = EventInstance diff --git a/baas-node-event-listener/src/test/resources/logback-test.xml b/baas-node-event-listener/src/test/resources/logback-test.xml new file mode 100644 index 00000000..954e441d --- /dev/null +++ b/baas-node-event-listener/src/test/resources/logback-test.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n + + + + + diff --git a/baas-node-interaction/src/test/resources/logback-test.xml b/baas-node-interaction/src/test/resources/logback-test.xml new file mode 100644 index 00000000..954e441d --- /dev/null +++ b/baas-node-interaction/src/test/resources/logback-test.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n + + + + + diff --git a/baas-node-interaction/src/test/scala/com/ing/baker/baas/interaction/RemoteInteractionSpec.scala b/baas-node-interaction/src/test/scala/com/ing/baker/baas/interaction/RemoteInteractionSpec.scala index 390b16f3..94478735 100644 --- a/baas-node-interaction/src/test/scala/com/ing/baker/baas/interaction/RemoteInteractionSpec.scala +++ b/baas-node-interaction/src/test/scala/com/ing/baker/baas/interaction/RemoteInteractionSpec.scala @@ -38,7 +38,7 @@ class RemoteInteractionSpec extends BakeryFunSpec { .flatMap(server => RemoteInteractionClient.resource(server.baseUri, executionContext)) .use(runTest) }) - Resource.pure(context) + Resource.pure[IO, Context](context) } /** Refines the `ConfigMap` populated with the -Dkey=value arguments coming from the "sbt testOnly" command. diff --git a/baas-node-state/src/main/scala/com/ing/baker/baas/state/Main.scala b/baas-node-state/src/main/scala/com/ing/baker/baas/state/Main.scala index 24c82265..9d5efc1c 100644 --- a/baas-node-state/src/main/scala/com/ing/baker/baas/state/Main.scala +++ b/baas-node-state/src/main/scala/com/ing/baker/baas/state/Main.scala @@ -6,7 +6,7 @@ import java.util.concurrent.Executors import akka.actor.ActorSystem import akka.cluster.Cluster import akka.stream.{ActorMaterializer, Materializer} -import cats.effect.{ExitCode, IO, IOApp} +import cats.effect.{ExitCode, IO, IOApp, Resource} import cats.implicits._ import com.ing.baker.runtime.akka.{AkkaBaker, AkkaBakerConfig} import com.ing.baker.runtime.scaladsl.Baker @@ -43,6 +43,7 @@ object Main extends IOApp { timeouts = AkkaBakerConfig.Timeouts.from(config), bakerValidationSettings = AkkaBakerConfig.BakerValidationSettings.from(config) )(system)) + _ <- Resource.liftF(serviceDiscovery.plugBakerEventListeners(baker)) _ <- StateNodeService.resource(baker, hostname) } yield () diff --git a/baas-node-state/src/main/scala/com/ing/baker/baas/state/ServiceDiscovery.scala b/baas-node-state/src/main/scala/com/ing/baker/baas/state/ServiceDiscovery.scala index 9932c1e3..920e0bad 100644 --- a/baas-node-state/src/main/scala/com/ing/baker/baas/state/ServiceDiscovery.scala +++ b/baas-node-state/src/main/scala/com/ing/baker/baas/state/ServiceDiscovery.scala @@ -10,6 +10,7 @@ import com.ing.baker.baas.state.ServiceDiscovery.{BakerListener, RecipeListener, import com.ing.baker.il.petrinet.InteractionTransition import com.ing.baker.runtime.akka.internal.InteractionManager import com.ing.baker.runtime.scaladsl.{Baker, InteractionInstance} +import com.typesafe.scalalogging.LazyLogging import fs2.Stream import io.kubernetes.client.openapi.ApiClient import io.kubernetes.client.openapi.apis.CoreV1Api @@ -21,7 +22,7 @@ import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} -object ServiceDiscovery { +object ServiceDiscovery extends LazyLogging { private[state] type RecipeName = String @@ -33,7 +34,7 @@ object ServiceDiscovery { * ServiceDiscovery module to give corresponding InteractionInstances and clients to the event listeners. * When the resource is released the polling to the Kubernetes API stops. * - * Current hard coded polling periods: 5 seconds + * Current hard coded polling periods: 2 seconds * * @param connectionPool to be used for client connections * @param namespace Kubernetes namespace to be queried @@ -46,15 +47,19 @@ object ServiceDiscovery { def resource(connectionPool: ExecutionContext, namespace: String, client: ApiClient = ClientBuilder.cluster.build)(implicit contextShift: ContextShift[IO], timer: Timer[IO], blockingEC: ExecutionContext): Resource[IO, ServiceDiscovery] = { val api = new CoreV1Api(client) - val cacheIO = Ref.of[IO, List[V1Service]](List.empty) - def fetchServices(namespace: String, api: CoreV1Api)(implicit contextShift: ContextShift[IO], blockingEC: ExecutionContext): IO[List[V1Service]] = contextShift.evalOn(blockingEC)(IO { api.listNamespacedService(namespace, null, null, null, null, null, null, null, null, null) .getItems .asScala .toList - }) + }).attempt.flatMap { + case Right(services) => + IO.pure(services) + case Left(e) => + IO(logger.warn("Failed to communicate with the Kubernetes service: " + e.getMessage)) + IO.pure(List.empty) + } def getInteractionAddresses(currentServices: List[V1Service]): List[Uri] = currentServices @@ -83,21 +88,24 @@ object ServiceDiscovery { def buildInteractions(currentServices: List[V1Service]): IO[List[InteractionInstance]] = getInteractionAddresses(currentServices) .map(RemoteInteractionClient.resource(_, connectionPool)) - .parTraverse[IO, Option[InteractionInstance]](_.use { client => - for { - interface <- client.interface.attempt - interactionsOpt = interface match { - case Right((name, types)) => Some(InteractionInstance( - name = name, - input = types, - run = client.runInteraction(_).unsafeToFuture() - )) - case Left(_) => None - } - } yield interactionsOpt - }) + .parTraverse[IO, Option[InteractionInstance]](buildInteractionInstance) .map(_.flatten) + def buildInteractionInstance(resource: Resource[IO, RemoteInteractionClient]): IO[Option[InteractionInstance]] = + resource.use { client => + for { + interface <- client.interface.attempt + interactionsOpt = interface match { + case Right((name, types)) => Some(InteractionInstance( + name = name, + input = types, + run = input => resource.use(_.runInteraction(input)).unsafeToFuture() + )) + case Left(_) => None + } + } yield interactionsOpt + } + def buildRecipeListeners(currentServices: List[V1Service]): Map[RecipeName, List[RecipeListener]] = getEventListenersAddresses(currentServices) .map { case (recipe, address) => (recipe, RemoteEventListenerClient.resource(address, connectionPool)) } @@ -114,17 +122,16 @@ object ServiceDiscovery { cacheRecipeListeners <- Stream.eval(Ref.of[IO, Map[RecipeName, List[RecipeListener]]](Map.empty)) cacheBakerListeners <- Stream.eval(Ref.of[IO, List[BakerListener]](List.empty)) service = new ServiceDiscovery(cacheInteractions, cacheRecipeListeners, cacheBakerListeners) - updater = Stream - .fixedRate(5.seconds) - .evalMap(_ => fetchServices(namespace, api)) - .evalMap { currentServices => + updateServices = fetchServices(namespace, api) + .flatMap { currentServices => List( buildInteractions(currentServices).flatMap(cacheInteractions.set), cacheRecipeListeners.set(buildRecipeListeners(currentServices)), cacheBakerListeners.set(buildBakerListeners(currentServices)) ).parSequence } - _ <- Stream.emit(service).concurrently(updater) + updater = Stream.fixedRate(5.seconds).evalMap(_ => updateServices) + _ <- Stream.eval(updateServices).concurrently(updater) } yield service stream.compile.resource.lastOrError diff --git a/baas-node-state/src/test/resources/logback-test.xml b/baas-node-state/src/test/resources/logback-test.xml index d7926f0c..954e441d 100644 --- a/baas-node-state/src/test/resources/logback-test.xml +++ b/baas-node-state/src/test/resources/logback-test.xml @@ -9,7 +9,7 @@ - %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - aoushdoauhd;ouawhd %msg%n + %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n diff --git a/baas-node-state/src/test/scala/com/ing/baker/baas/mocks/RemoteInteraction.scala b/baas-node-state/src/test/scala/com/ing/baker/baas/mocks/RemoteInteraction.scala index 737b4344..77d37c78 100644 --- a/baas-node-state/src/test/scala/com/ing/baker/baas/mocks/RemoteInteraction.scala +++ b/baas-node-state/src/test/scala/com/ing/baker/baas/mocks/RemoteInteraction.scala @@ -60,7 +60,7 @@ class RemoteInteraction(mock: ClientAndServer, interaction: Interaction) { private def applyMatch = request() .withMethod("POST") - .withPath("/api/v3/apply") + .withPath("/api/v3/run-interaction") .withHeader("X-Bakery-Intent", s"Remote-Interaction:localhost") } diff --git a/baas-node-state/src/test/scala/com/ing/baker/baas/state/StateNodeSpec.scala b/baas-node-state/src/test/scala/com/ing/baker/baas/state/StateNodeSpec.scala index ee0fc964..217dda14 100644 --- a/baas-node-state/src/test/scala/com/ing/baker/baas/state/StateNodeSpec.scala +++ b/baas-node-state/src/test/scala/com/ing/baker/baas/state/StateNodeSpec.scala @@ -3,10 +3,9 @@ package com.ing.baker.baas.state import java.net.InetSocketAddress import java.util.UUID -import cats.implicits._ import akka.actor.ActorSystem -import akka.stream.{ActorMaterializer, Materializer} import cats.effect.{IO, Resource} +import cats.implicits._ import com.ing.baker.baas.mocks.{KubeApiServer, RemoteComponents, RemoteEventListener, RemoteInteraction} import com.ing.baker.baas.recipe.Events.{ItemsReserved, OrderPlaced} import com.ing.baker.baas.recipe.Ingredients.{Item, OrderId, ReservedItems} @@ -51,7 +50,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Recipe management") { context => for { - _ <- context.remoteComponents.registerToTheCluster recipeId <- io(context.client.addRecipe(recipe)) recipeInformation <- io(context.client.getRecipe(recipeId)) noSuchRecipeError <- io(context.client @@ -69,7 +67,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.bake") { context => val recipeInstanceId: String = UUID.randomUUID().toString for { - _ <- context.remoteComponents.registerToTheCluster _ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent) recipeId <- io(context.client.addRecipe(recipe)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) @@ -83,7 +80,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.bake (fail with ProcessAlreadyExistsException)") { context => val recipeInstanceId: String = UUID.randomUUID().toString for { - _ <- context.remoteComponents.registerToTheCluster _ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent) recipeId <- io(context.client.addRecipe(recipe)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) @@ -101,7 +97,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.bake (fail with NoSuchRecipeException)") { context => val recipeInstanceId: String = UUID.randomUUID().toString for { - _ <- context.remoteComponents.registerToTheCluster e <- io(context.client .bake("non-existent", recipeInstanceId) .map(_ => None) @@ -111,7 +106,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.getRecipeInstanceState (fails with NoSuchProcessException)") { context => for { - _ <- context.remoteComponents.registerToTheCluster e <- io(context.client .getRecipeInstanceState("non-existent") .map(_ => None) @@ -122,7 +116,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.fireEventAndResolveWhenReceived") { context => val recipeInstanceId: String = UUID.randomUUID().toString for { - _ <- context.remoteComponents.registerToTheCluster recipeId <- io(context.client.addRecipe(recipe)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) _ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent) @@ -133,13 +126,12 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.fireEventAndResolveWhenCompleted") { context => val recipeInstanceId: String = UUID.randomUUID().toString for { - _ <- context.remoteComponents.registerToTheCluster _ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent) recipeId <- io(context.client.addRecipe(recipe)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) result <- io(context.client.fireEventAndResolveWhenCompleted(recipeInstanceId, OrderPlacedEvent)) serverState <- io(context.client.getRecipeInstanceState(recipeInstanceId)) - _ <- eventually(context.remoteEventListener.verifyEventsReceived(2)) + _ <- eventually(eventually(context.remoteEventListener.verifyEventsReceived(2))) } yield { result.eventNames shouldBe Seq("OrderPlaced", "ItemsReserved") serverState.events.map(_.name) shouldBe Seq("OrderPlaced", "ItemsReserved") @@ -150,7 +142,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { val recipeInstanceId: String = UUID.randomUUID().toString val event = EventInstance("non-existent", Map.empty) for { - _ <- context.remoteComponents.registerToTheCluster recipeId <- io(context.client.addRecipe(recipe)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) result <- io(context.client @@ -168,7 +159,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.fireEventAndResolveOnEvent") { context => val recipeInstanceId: String = UUID.randomUUID().toString for { - _ <- context.remoteComponents.registerToTheCluster _ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent) recipeId <- io(context.client.addRecipe(recipe)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) @@ -176,7 +166,7 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { _ <- eventually { for { serverState <- io(context.client.getRecipeInstanceState(recipeInstanceId)) - _ <- context.remoteEventListener.verifyEventsReceived(2) + _ <- eventually(context.remoteEventListener.verifyEventsReceived(2)) } yield serverState.events.map(_.name) shouldBe Seq("OrderPlaced", "ItemsReserved") } } yield result.eventNames shouldBe Seq("OrderPlaced") @@ -185,7 +175,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.getAllRecipeInstancesMetadata") { context => val recipeInstanceId: String = UUID.randomUUID().toString for { - _ <- context.remoteComponents.registerToTheCluster _ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent) recipeId <- io(context.client.addRecipe(recipe)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) @@ -197,7 +186,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.getVisualState") { context => val recipeInstanceId: String = UUID.randomUUID().toString for { - _ <- context.remoteComponents.registerToTheCluster _ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent) recipeId <- io(context.client.addRecipe(recipe)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) @@ -208,7 +196,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.retryInteraction") { context => val recipeInstanceId: String = UUID.randomUUID().toString for { - _ <- context.remoteComponents.registerToTheCluster recipeId <- io(context.client.addRecipe(recipeWithBlockingStrategy)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) _ <- context.remoteInteraction.processesWithFailure(new RuntimeException("functional failure")) @@ -217,7 +204,7 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { _ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent) _ <- io(context.client.retryInteraction(recipeInstanceId, "ReserveItems")) state2 <- io(context.client.getRecipeInstanceState(recipeInstanceId).map(_.events.map(_.name))) - _ <- context.remoteEventListener.verifyEventsReceived(2) + _ <- eventually(context.remoteEventListener.verifyEventsReceived(2)) } yield { state1 should contain("OrderPlaced") state1 should not contain("ItemsReserved") @@ -232,7 +219,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { ItemsReserved(reservedItems = ReservedItems(items = List(Item("resolution-item")), data = Array.empty)) ) for { - _ <- context.remoteComponents.registerToTheCluster recipeId <- io(context.client.addRecipe(recipeWithBlockingStrategy)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) _ <- context.remoteInteraction.processesWithFailure(new RuntimeException("functional failure")) @@ -243,7 +229,7 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { state2 = state2data.events.map(_.name) eventState = state2data.ingredients.get("reservedItems").map(_.as[ReservedItems].items.head.itemId) // TODO Currently the event listener receives the OrderPlaced... shouldn't also receive the resolved event? - _ <- context.remoteEventListener.verifyEventsReceived(1) + _ <- eventually(context.remoteEventListener.verifyEventsReceived(1)) } yield { state1 should contain("OrderPlaced") state1 should not contain("ItemsReserved") @@ -256,7 +242,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { test("Baker.stopRetryingInteraction") { context => val recipeInstanceId: String = UUID.randomUUID().toString for { - _ <- context.remoteComponents.registerToTheCluster recipeId <- io(context.client.addRecipe(recipe)) _ <- io(context.client.bake(recipeId, recipeInstanceId)) _ <- context.remoteInteraction.processesWithFailure(new RuntimeException("functional failure")) @@ -265,7 +250,7 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { _ <- io(context.client.stopRetryingInteraction(recipeInstanceId, "ReserveItems")) state2data <- io(context.client.getRecipeInstanceState(recipeInstanceId)) state2 = state2data.events.map(_.name) - _ <- context.remoteEventListener.verifyEventsReceived(1) + _ <- eventually(eventually(context.remoteEventListener.verifyEventsReceived(1))) } yield { state1 should contain("OrderPlaced") state1 should not contain("ItemsReserved") @@ -299,26 +284,35 @@ class StateNodeSpec extends BakeryFunSpec with Matchers { * @return the resources each test can use */ def contextBuilder(testArguments: TestArguments): Resource[IO, TestContext] = { - val mockServerAddress = new InetSocketAddress("localhost", 0) for { // Mock server - mockServer <- Resource.make(IO(ClientAndServer.startClientAndServer(mockServerAddress.getPort)))(s => IO(s.stop())) + mockServer <- Resource.make(IO(ClientAndServer.startClientAndServer(0)))(s => IO(s.stop())) remoteInteraction = new RemoteInteraction(mockServer, Interactions.ReserveItemsInteraction) remoteEventListener = new RemoteEventListener(mockServer) kubeApiServer = new KubeApiServer(mockServer) remoteComponents = new RemoteComponents(kubeApiServer, remoteInteraction, remoteEventListener) + _ <- Resource.liftF(remoteComponents.registerToTheCluster) - system <- Resource.make(IO(ActorSystem(UUID.randomUUID().toString, ConfigFactory.empty())))( - system => IO.fromFuture(IO(system.terminate().flatMap(_ => system.whenTerminated))).void) - materializer: Materializer = ActorMaterializer()(system) + makeActorSystem = IO { + ActorSystem(UUID.randomUUID().toString, ConfigFactory.parseString( + """ + |akka { + | stdout-loglevel = "OFF" + | loglevel = "OFF" + |} + |""".stripMargin)) } + stopActorSystem = (system: ActorSystem) => IO.fromFuture(IO { + system.terminate().flatMap(_ => system.whenTerminated) }).void + system <- Resource.make(makeActorSystem)(stopActorSystem) - kubernetesApi = new ApiClient().setBasePath(s"http://localhost:${mockServerAddress.getPort}") + kubernetesApi = new ApiClient().setBasePath(s"http://localhost:${mockServer.getLocalPort}") serviceDiscovery <- ServiceDiscovery.resource(executionContext, "default", kubernetesApi) baker = AkkaBaker.withConfig( - AkkaBakerConfig.localDefault(ActorSystem(UUID.randomUUID().toString)).copy( + AkkaBakerConfig.localDefault(system).copy( interactionManager = serviceDiscovery.buildInteractionManager, bakerValidationSettings = AkkaBakerConfig.BakerValidationSettings( allowAddingRecipeWithoutRequiringInstances = true))(system)) + _ <- Resource.liftF(serviceDiscovery.plugBakerEventListeners(baker)) server <- StateNodeService.resource(baker, InetSocketAddress.createUnresolved("0.0.0.0", 0)) client <- BakerClient.resource(server.baseUri, executionContext) } yield Context( diff --git a/baas-protocol-baker/src/main/scala/com/ing/baker/baas/protocol/MarshallingUtils.scala b/baas-protocol-baker/src/main/scala/com/ing/baker/baas/protocol/MarshallingUtils.scala deleted file mode 100644 index 79dd2160..00000000 --- a/baas-protocol-baker/src/main/scala/com/ing/baker/baas/protocol/MarshallingUtils.scala +++ /dev/null @@ -1,70 +0,0 @@ -package com.ing.baker.baas.protocol - -import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller} -import akka.http.scaladsl.model.{ContentTypes, HttpResponse, MediaTypes, StatusCodes} -import akka.http.scaladsl.server.Directives.{complete, onSuccess} -import akka.http.scaladsl.server.Route -import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshal, Unmarshaller} -import akka.stream.Materializer -import com.ing.baker.runtime.common.BakerException -import com.ing.baker.runtime.serialization.ProtoMap -import BaaSProto._ - -import scala.concurrent.{ExecutionContext, Future} - -object MarshallingUtils { - - type ProtoMessage[A] = scalapb.GeneratedMessage with scalapb.Message[A] - - def completeWithBakerFailures[A, P <: ProtoMessage[P]](f: Future[A])(implicit ec: ExecutionContext, m1: ProtoMap[A, P]): Route = - complete(f.map(Right(_)).recover { case e: BakerException => Left(BaaSProtocol.BaaSRemoteFailure(e)) }) - - def completeWithBakerFailures(f: Future[Unit])(implicit ec: ExecutionContext): Route = - onSuccess(f.map(_ => None).recover { case e: BakerException => Some(e) }) { - case Some(e) => complete(BaaSProtocol.BaaSRemoteFailure(e)) - case None => complete(StatusCodes.OK) - } - - case class UnmarshalWithBakerExceptions[A](response: HttpResponse) { - - def withBakerExceptions[P <: ProtoMessage[P]](implicit ec: ExecutionContext, mat: Materializer, m1: ProtoMap[A, P]): Future[A] = { - for { - decoded <- Unmarshal(response).to[Either[BaaSProtocol.BaaSRemoteFailure, A]] - response <- decoded match { - case Left(e) => Future.failed(e.error) - case Right(a) => Future.successful(a) - } - } yield response - } - } - - def unmarshal[A](response: HttpResponse): UnmarshalWithBakerExceptions[A] = - UnmarshalWithBakerExceptions[A](response) - - def unmarshalBakerExceptions(response: HttpResponse)(implicit ec: ExecutionContext, mat: Materializer): Future[Unit] = - response.entity.httpEntity.contentType match { - case ContentTypes.`application/octet-stream` => - Unmarshal(response) - .to[BaaSProtocol.BaaSRemoteFailure] - .flatMap(e => Future.failed(e.error)) - case _ => - Future.successful(()) - } - - implicit def protoMarshaller[A, P <: ProtoMessage[P]](implicit mapping: ProtoMap[A, P]): ToEntityMarshaller[A] = - Marshaller.ByteArrayMarshaller.wrap(MediaTypes.`application/octet-stream`)(mapping.toByteArray) - - implicit def protoUnmarshaller[A, P <: ProtoMessage[P]](implicit mapping: ProtoMap[A, P]): FromEntityUnmarshaller[A] = - Unmarshaller.byteArrayUnmarshaller.map(mapping.fromByteArray(_).get) - - implicit def protoEitherMarshaller[A, P0 <: ProtoMessage[P0], B, P1 <: ProtoMessage[P1]](implicit m1: ProtoMap[A, P0], m2: ProtoMap[B, P1]): ToEntityMarshaller[Either[A, B]] = - Marshaller.ByteArrayMarshaller.wrap(MediaTypes.`application/octet-stream`) { - case Left(a) => m1.toByteArray(a) - case Right(b) => m2.toByteArray(b) - } - - implicit def protoEitherUnmarshaller[A, P0 <: ProtoMessage[P0], B, P1 <: ProtoMessage[P1]](implicit m1: ProtoMap[A, P0], m2: ProtoMap[B, P1]): FromEntityUnmarshaller[Either[A, B]] = - Unmarshaller.byteArrayUnmarshaller.map { byteArray => - m1.fromByteArray(byteArray).map(Left(_)).orElse(m2.fromByteArray(byteArray).map(Right(_))).get - } -} diff --git a/baas-protocol-interaction-scheduling/src/main/scala/com/ing/baker/baas/interaction/BakeryHttp.scala b/baas-protocol-interaction-scheduling/src/main/scala/com/ing/baker/baas/interaction/BakeryHttp.scala index 41c21956..3d63e549 100644 --- a/baas-protocol-interaction-scheduling/src/main/scala/com/ing/baker/baas/interaction/BakeryHttp.scala +++ b/baas-protocol-interaction-scheduling/src/main/scala/com/ing/baker/baas/interaction/BakeryHttp.scala @@ -7,7 +7,7 @@ import org.http4s.EntityDecoder.collectBinary import org.http4s.util.CaseInsensitiveString import org.http4s._ -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} object BakeryHttp { @@ -30,7 +30,18 @@ object BakeryHttp { EntityEncoder.byteArrayEncoder[IO].contramap(protoMap.toByteArray) implicit def protoEitherDecoder[A, P0 <: ProtoMessage[P0], B, P1 <: ProtoMessage[P1]](implicit p1: ProtoMap[A, P0], p2: ProtoMap[B, P1]): EntityDecoder[IO, Either[A, B]] = - protoDecoder[A, P0].map(Left(_)).orElse(protoDecoder[B, P1].map(Right(_))) + EntityDecoder.decodeBy(MediaType.application.`octet-stream`)(collectBinary[IO]).map(_.toArray) + .flatMapR { bytes => + val eitherTry: Try[Either[A, B]] = + p1.fromByteArray(bytes).map[Either[A, B]](Left(_)) + .orElse(p2.fromByteArray(bytes).map[Either[A, B]](Right(_))) + eitherTry match { + case Success(a) => + EitherT.fromEither[IO](Right(a)) + case Failure(exception) => + EitherT.fromEither[IO](Left(MalformedMessageBodyFailure(exception.getMessage, Some(exception)))) + } + } } object Headers { diff --git a/baas-protocol-interaction-scheduling/src/main/scala/com/ing/baker/baas/interaction/RemoteInteractionClient.scala b/baas-protocol-interaction-scheduling/src/main/scala/com/ing/baker/baas/interaction/RemoteInteractionClient.scala index 25cbb352..4647a03b 100644 --- a/baas-protocol-interaction-scheduling/src/main/scala/com/ing/baker/baas/interaction/RemoteInteractionClient.scala +++ b/baas-protocol-interaction-scheduling/src/main/scala/com/ing/baker/baas/interaction/RemoteInteractionClient.scala @@ -1,18 +1,17 @@ package com.ing.baker.baas.interaction import cats.effect.{ContextShift, IO, Resource, Timer} +import com.ing.baker.baas.interaction.BakeryHttp.Headers.{Intent, `X-Bakery-Intent`} +import com.ing.baker.baas.interaction.BakeryHttp.ProtoEntityEncoders._ import com.ing.baker.baas.protocol.InteractionSchedulingProto._ import com.ing.baker.baas.protocol.ProtocolInteractionExecution -import com.ing.baker.baas.interaction.BakeryHttp.Headers.`X-Bakery-Intent` -import com.ing.baker.baas.interaction.BakeryHttp.Headers.Intent -import com.ing.baker.baas.interaction.BakeryHttp.ProtoEntityEncoders._ -import com.ing.baker.runtime.scaladsl.{EventInstance, IngredientInstance, RecipeEventMetadata} +import com.ing.baker.runtime.scaladsl.{EventInstance, IngredientInstance} import com.ing.baker.types.Type import org.http4s.Method._ +import org.http4s.Uri import org.http4s.client.Client import org.http4s.client.blaze.BlazeClientBuilder import org.http4s.client.dsl.io._ -import org.http4s.{Status, Uri} import scala.concurrent.ExecutionContext diff --git a/build.sbt b/build.sbt index cf83ea1c..49d52a83 100644 --- a/build.sbt +++ b/build.sbt @@ -215,8 +215,6 @@ lazy val `baas-protocol-baker` = project.in(file("baas-protocol-baker")) .settings( moduleName := "baas-protocol-baker", libraryDependencies ++= Seq( - akkaStream, - akkaHttp, http4s, http4sDsl ) @@ -298,7 +296,6 @@ lazy val `baas-node-state` = project.in(file("baas-node-state")) slf4jSimple, scalaTest, mockServer, - akkaHttpCirce, circe, circeGeneric ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 126008ae..ad1119a1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -40,8 +40,6 @@ object Dependencies { val akkaManagementHttp = "com.lightbend.akka.management" %% "akka-management-cluster-http" % "1.0.5" val akkaClusterBoostrap = "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % "1.0.5" val akkaDiscoveryKube = "com.lightbend.akka.discovery" %% "akka-discovery-kubernetes-api" % "1.0.5" - val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11" - val akkaHttpCirce = "de.heikoseeberger" %% "akka-http-circe" % "1.28.0" val akkaBoostrap = "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % "1.0.5" val levelDB = "org.iq80.leveldb" % "leveldb" % "0.12" diff --git a/runtime/src/test/scala/com/ing/baker/runtime/akka/internal/InteractionManagerSpec.scala b/runtime/src/test/scala/com/ing/baker/runtime/akka/internal/InteractionManagerSpec.scala index 7dbf1bfb..360a8bc0 100644 --- a/runtime/src/test/scala/com/ing/baker/runtime/akka/internal/InteractionManagerSpec.scala +++ b/runtime/src/test/scala/com/ing/baker/runtime/akka/internal/InteractionManagerSpec.scala @@ -6,10 +6,10 @@ import com.ing.baker.runtime.scaladsl.InteractionInstance import com.ing.baker.types import com.ing.baker.types.Type import org.mockito.Mockito.when -import org.scalatest.{Matchers, WordSpecLike} +import org.scalatest.{AsyncWordSpecLike, Matchers} import org.scalatestplus.mockito.MockitoSugar -class InteractionManagerLocalSpec extends WordSpecLike with Matchers with MockitoSugar { +class InteractionManagerLocalSpec extends AsyncWordSpecLike with Matchers with MockitoSugar { "getImplementation" should { "return Some" when { "an interaction implementation is available" in { @@ -23,7 +23,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.Int32) when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor)) - interactionManager.getImplementation(interactionTransition) should equal(Some(interactionImplementation)) + interactionManager.getImplementation(interactionTransition).map(_ should equal(Some(interactionImplementation))) } "multiple interaction implementations are available" in { @@ -41,7 +41,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.Int32) when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor)) - interactionManager.getImplementation(interactionTransition) should equal(Some(interactionImplementation1)) + interactionManager.getImplementation(interactionTransition).map(_ should equal(Some(interactionImplementation1))) } "two implementations with the same correct name but only one has the correct input types" in { @@ -59,7 +59,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.Int32) when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor)) - interactionManager.getImplementation(interactionTransition) should equal(Some(interactionImplementation2)) + interactionManager.getImplementation(interactionTransition).map(_ should equal(Some(interactionImplementation2))) } } @@ -75,7 +75,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.Int32) when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor)) - interactionManager.getImplementation(interactionTransition) should equal(None) + interactionManager.getImplementation(interactionTransition).map(_ should equal(None)) } "an interaction implementation has a wrong ingredient input type" in { @@ -89,7 +89,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.CharArray) when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor)) - interactionManager.getImplementation(interactionTransition) should equal(None) + interactionManager.getImplementation(interactionTransition).map(_ should equal(None)) } "an interaction implementation has extra ingredient input types" in { @@ -103,7 +103,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.Int32) when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor)) - interactionManager.getImplementation(interactionTransition) should equal(None) + interactionManager.getImplementation(interactionTransition).map(_ should equal(None)) } "an interaction implementation has not enough ingredient input types" in { @@ -118,14 +118,14 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit val ingredientDescriptor2: IngredientDescriptor = IngredientDescriptor("ingredientName2", types.CharArray) when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor, ingredientDescriptor2)) - interactionManager.getImplementation(interactionTransition) should equal(None) + interactionManager.getImplementation(interactionTransition).map(_ should equal(None)) } "empty interaction seq" in { val interactionManager: InteractionManagerLocal = new InteractionManagerLocal(Seq.empty) val interactionTransition: InteractionTransition = mock[InteractionTransition] - interactionManager.getImplementation(interactionTransition) should equal(None) + interactionManager.getImplementation(interactionTransition).map(_ should equal(None)) } } }