Finished the transition to http4s

This commit is contained in:
Francisco M. Aramburo Torres
2020-02-28 16:33:47 +01:00
parent 3cf2572521
commit f5a5e68787
17 changed files with 155 additions and 160 deletions

View File

@@ -0,0 +1,17 @@
<configuration>
<logger name="org.mockserver.log.MockServerEventLog" level="OFF"/>
<root level="info">
<appender-ref ref="CONSOLE"/>
</root>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
</configuration>

View File

@@ -1,5 +1,6 @@
package com.ing.baker.baas.scaladsl
import cats.data.EitherT
import cats.effect.{ContextShift, IO, Resource, Timer}
import com.ing.baker.baas.protocol.BaaSProto._
import com.ing.baker.baas.protocol.BaaSProtocol
@@ -9,13 +10,15 @@ import com.ing.baker.runtime.common.SensoryEventStatus
import com.ing.baker.runtime.scaladsl.{BakerEvent, EventInstance, EventMoment, EventResolutions, InteractionInstance, RecipeEventMetadata, RecipeInformation, RecipeInstanceMetadata, RecipeInstanceState, SensoryEventResult, Baker => ScalaBaker}
import com.ing.baker.runtime.serialization.ProtoMap
import com.ing.baker.types.Value
import org.http4s.EntityDecoder.collectBinary
import org.http4s.Method._
import org.http4s.client.Client
import org.http4s.client.blaze.BlazeClientBuilder
import org.http4s.client.dsl.io._
import org.http4s.{EntityDecoder, MediaType, Request, Uri}
import org.http4s._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
object BakerClient {
@@ -76,7 +79,7 @@ final class BakerClient(client: Client[IO], hostname: Uri)(implicit ec: Executio
* @return All recipes in the form of map of recipeId -> CompiledRecipe
*/
override def getAllRecipes: Future[Map[String, RecipeInformation]] = {
val request = POST(
val request = GET(
Root / "getAllRecipes"
)
handleBakerResponse[BaaSProtocol.GetAllRecipesResponse, Map[String, RecipeInformation]](request)(_.map)
@@ -336,16 +339,24 @@ final class BakerClient(client: Client[IO], hostname: Uri)(implicit ec: Executio
private type WithBakerException[A] = Either[BaaSProtocol.BaaSRemoteFailure, A]
private implicit def withBakerExeptionEntityDecoder[A, P <: ProtoMessage[P]](implicit protoMap: ProtoMap[A, P]): EntityDecoder[IO, WithBakerException[A]] =
protoDecoder(baaSRemoteFailureProto).map(Left(_)).orElse(protoDecoder[A, P].map(Right(_)))
private def liftRemoteFailure[A](either: Either[BaaSProtocol.BaaSRemoteFailure, A]): IO[A] =
either.fold(failure => IO.raiseError(failure.error), IO.pure)
EntityDecoder.decodeBy(MediaType.application.`octet-stream`)(collectBinary[IO]).map(_.toArray)
.flatMapR { bytes =>
val eitherTry: Try[WithBakerException[A]] =
baaSRemoteFailureProto.fromByteArray(bytes).map[WithBakerException[A]](Left(_))
.orElse(protoMap.fromByteArray(bytes).map[WithBakerException[A]](Right(_)))
eitherTry match {
case Success(a) =>
EitherT.fromEither[IO](Right(a))
case Failure(exception) =>
EitherT.fromEither[IO](Left(MalformedMessageBodyFailure(exception.getMessage, Some(exception))))
}
}
final class HandleBakerResponsePartial[A, R] {
def apply[P <: ProtoMessage[P]](request: IO[Request[IO]])(f: A => R)(implicit protoMap: ProtoMap[A, P]): Future[R] =
client
.expect[WithBakerException[A]](request)
.flatMap(liftRemoteFailure _)
.flatMap(_.fold(failure => IO.raiseError(failure.error), IO.pure))
.map(f)
.unsafeToFuture()
}
@@ -356,7 +367,7 @@ final class BakerClient(client: Client[IO], hostname: Uri)(implicit ec: Executio
request.flatMap(client.run(_).use(response => response.contentType match {
case Some(contentType) if contentType.mediaType == MediaType.application.`octet-stream` =>
EntityDecoder[IO, BaaSProtocol.BaaSRemoteFailure]
.decode(response, true)
.decode(response, strict = true)
.value
.flatMap(_.fold(IO.raiseError(_), e => IO.raiseError(e.error)))
case _ => IO.unit

View File

@@ -3,14 +3,10 @@ package com.ing.baker.baas.javadsl
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import akka.actor.ActorSystem
import com.ing.baker.baas.common
import com.ing.baker.baas.scaladsl
import com.ing.baker.baas.{common, scaladsl}
import com.ing.baker.runtime.common.LanguageDataStructures.JavaApi
import com.ing.baker.runtime.javadsl.{EventInstance, RecipeEventMetadata}
import scala.compat.java8.FutureConverters
object RemoteEventListener extends common.RemoteEventListener[CompletableFuture] with JavaApi {
override type EventInstanceType = EventInstance

View File

@@ -0,0 +1,17 @@
<configuration>
<logger name="org.mockserver.log.MockServerEventLog" level="OFF"/>
<root level="info">
<appender-ref ref="CONSOLE"/>
</root>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
</configuration>

View File

@@ -0,0 +1,17 @@
<configuration>
<logger name="org.mockserver.log.MockServerEventLog" level="OFF"/>
<root level="info">
<appender-ref ref="CONSOLE"/>
</root>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
</configuration>

View File

@@ -38,7 +38,7 @@ class RemoteInteractionSpec extends BakeryFunSpec {
.flatMap(server => RemoteInteractionClient.resource(server.baseUri, executionContext))
.use(runTest)
})
Resource.pure(context)
Resource.pure[IO, Context](context)
}
/** Refines the `ConfigMap` populated with the -Dkey=value arguments coming from the "sbt testOnly" command.

View File

@@ -6,7 +6,7 @@ import java.util.concurrent.Executors
import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.stream.{ActorMaterializer, Materializer}
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import com.ing.baker.runtime.akka.{AkkaBaker, AkkaBakerConfig}
import com.ing.baker.runtime.scaladsl.Baker
@@ -43,6 +43,7 @@ object Main extends IOApp {
timeouts = AkkaBakerConfig.Timeouts.from(config),
bakerValidationSettings = AkkaBakerConfig.BakerValidationSettings.from(config)
)(system))
_ <- Resource.liftF(serviceDiscovery.plugBakerEventListeners(baker))
_ <- StateNodeService.resource(baker, hostname)
} yield ()

View File

@@ -10,6 +10,7 @@ import com.ing.baker.baas.state.ServiceDiscovery.{BakerListener, RecipeListener,
import com.ing.baker.il.petrinet.InteractionTransition
import com.ing.baker.runtime.akka.internal.InteractionManager
import com.ing.baker.runtime.scaladsl.{Baker, InteractionInstance}
import com.typesafe.scalalogging.LazyLogging
import fs2.Stream
import io.kubernetes.client.openapi.ApiClient
import io.kubernetes.client.openapi.apis.CoreV1Api
@@ -21,7 +22,7 @@ import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
object ServiceDiscovery {
object ServiceDiscovery extends LazyLogging {
private[state] type RecipeName = String
@@ -33,7 +34,7 @@ object ServiceDiscovery {
* ServiceDiscovery module to give corresponding InteractionInstances and clients to the event listeners.
* When the resource is released the polling to the Kubernetes API stops.
*
* Current hard coded polling periods: 5 seconds
* Current hard coded polling periods: 2 seconds
*
* @param connectionPool to be used for client connections
* @param namespace Kubernetes namespace to be queried
@@ -46,15 +47,19 @@ object ServiceDiscovery {
def resource(connectionPool: ExecutionContext, namespace: String, client: ApiClient = ClientBuilder.cluster.build)(implicit contextShift: ContextShift[IO], timer: Timer[IO], blockingEC: ExecutionContext): Resource[IO, ServiceDiscovery] = {
val api = new CoreV1Api(client)
val cacheIO = Ref.of[IO, List[V1Service]](List.empty)
def fetchServices(namespace: String, api: CoreV1Api)(implicit contextShift: ContextShift[IO], blockingEC: ExecutionContext): IO[List[V1Service]] =
contextShift.evalOn(blockingEC)(IO {
api.listNamespacedService(namespace, null, null, null, null, null, null, null, null, null)
.getItems
.asScala
.toList
})
}).attempt.flatMap {
case Right(services) =>
IO.pure(services)
case Left(e) =>
IO(logger.warn("Failed to communicate with the Kubernetes service: " + e.getMessage))
IO.pure(List.empty)
}
def getInteractionAddresses(currentServices: List[V1Service]): List[Uri] =
currentServices
@@ -83,21 +88,24 @@ object ServiceDiscovery {
def buildInteractions(currentServices: List[V1Service]): IO[List[InteractionInstance]] =
getInteractionAddresses(currentServices)
.map(RemoteInteractionClient.resource(_, connectionPool))
.parTraverse[IO, Option[InteractionInstance]](_.use { client =>
for {
interface <- client.interface.attempt
interactionsOpt = interface match {
case Right((name, types)) => Some(InteractionInstance(
name = name,
input = types,
run = client.runInteraction(_).unsafeToFuture()
))
case Left(_) => None
}
} yield interactionsOpt
})
.parTraverse[IO, Option[InteractionInstance]](buildInteractionInstance)
.map(_.flatten)
def buildInteractionInstance(resource: Resource[IO, RemoteInteractionClient]): IO[Option[InteractionInstance]] =
resource.use { client =>
for {
interface <- client.interface.attempt
interactionsOpt = interface match {
case Right((name, types)) => Some(InteractionInstance(
name = name,
input = types,
run = input => resource.use(_.runInteraction(input)).unsafeToFuture()
))
case Left(_) => None
}
} yield interactionsOpt
}
def buildRecipeListeners(currentServices: List[V1Service]): Map[RecipeName, List[RecipeListener]] =
getEventListenersAddresses(currentServices)
.map { case (recipe, address) => (recipe, RemoteEventListenerClient.resource(address, connectionPool)) }
@@ -114,17 +122,16 @@ object ServiceDiscovery {
cacheRecipeListeners <- Stream.eval(Ref.of[IO, Map[RecipeName, List[RecipeListener]]](Map.empty))
cacheBakerListeners <- Stream.eval(Ref.of[IO, List[BakerListener]](List.empty))
service = new ServiceDiscovery(cacheInteractions, cacheRecipeListeners, cacheBakerListeners)
updater = Stream
.fixedRate(5.seconds)
.evalMap(_ => fetchServices(namespace, api))
.evalMap { currentServices =>
updateServices = fetchServices(namespace, api)
.flatMap { currentServices =>
List(
buildInteractions(currentServices).flatMap(cacheInteractions.set),
cacheRecipeListeners.set(buildRecipeListeners(currentServices)),
cacheBakerListeners.set(buildBakerListeners(currentServices))
).parSequence
}
_ <- Stream.emit(service).concurrently(updater)
updater = Stream.fixedRate(5.seconds).evalMap(_ => updateServices)
_ <- Stream.eval(updateServices).concurrently(updater)
} yield service
stream.compile.resource.lastOrError

View File

@@ -9,7 +9,7 @@
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - aoushdoauhd;ouawhd %msg%n
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>

View File

@@ -60,7 +60,7 @@ class RemoteInteraction(mock: ClientAndServer, interaction: Interaction) {
private def applyMatch =
request()
.withMethod("POST")
.withPath("/api/v3/apply")
.withPath("/api/v3/run-interaction")
.withHeader("X-Bakery-Intent", s"Remote-Interaction:localhost")
}

View File

@@ -3,10 +3,9 @@ package com.ing.baker.baas.state
import java.net.InetSocketAddress
import java.util.UUID
import cats.implicits._
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import cats.effect.{IO, Resource}
import cats.implicits._
import com.ing.baker.baas.mocks.{KubeApiServer, RemoteComponents, RemoteEventListener, RemoteInteraction}
import com.ing.baker.baas.recipe.Events.{ItemsReserved, OrderPlaced}
import com.ing.baker.baas.recipe.Ingredients.{Item, OrderId, ReservedItems}
@@ -51,7 +50,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Recipe management") { context =>
for {
_ <- context.remoteComponents.registerToTheCluster
recipeId <- io(context.client.addRecipe(recipe))
recipeInformation <- io(context.client.getRecipe(recipeId))
noSuchRecipeError <- io(context.client
@@ -69,7 +67,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.bake") { context =>
val recipeInstanceId: String = UUID.randomUUID().toString
for {
_ <- context.remoteComponents.registerToTheCluster
_ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent)
recipeId <- io(context.client.addRecipe(recipe))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
@@ -83,7 +80,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.bake (fail with ProcessAlreadyExistsException)") { context =>
val recipeInstanceId: String = UUID.randomUUID().toString
for {
_ <- context.remoteComponents.registerToTheCluster
_ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent)
recipeId <- io(context.client.addRecipe(recipe))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
@@ -101,7 +97,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.bake (fail with NoSuchRecipeException)") { context =>
val recipeInstanceId: String = UUID.randomUUID().toString
for {
_ <- context.remoteComponents.registerToTheCluster
e <- io(context.client
.bake("non-existent", recipeInstanceId)
.map(_ => None)
@@ -111,7 +106,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.getRecipeInstanceState (fails with NoSuchProcessException)") { context =>
for {
_ <- context.remoteComponents.registerToTheCluster
e <- io(context.client
.getRecipeInstanceState("non-existent")
.map(_ => None)
@@ -122,7 +116,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.fireEventAndResolveWhenReceived") { context =>
val recipeInstanceId: String = UUID.randomUUID().toString
for {
_ <- context.remoteComponents.registerToTheCluster
recipeId <- io(context.client.addRecipe(recipe))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
_ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent)
@@ -133,13 +126,12 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.fireEventAndResolveWhenCompleted") { context =>
val recipeInstanceId: String = UUID.randomUUID().toString
for {
_ <- context.remoteComponents.registerToTheCluster
_ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent)
recipeId <- io(context.client.addRecipe(recipe))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
result <- io(context.client.fireEventAndResolveWhenCompleted(recipeInstanceId, OrderPlacedEvent))
serverState <- io(context.client.getRecipeInstanceState(recipeInstanceId))
_ <- eventually(context.remoteEventListener.verifyEventsReceived(2))
_ <- eventually(eventually(context.remoteEventListener.verifyEventsReceived(2)))
} yield {
result.eventNames shouldBe Seq("OrderPlaced", "ItemsReserved")
serverState.events.map(_.name) shouldBe Seq("OrderPlaced", "ItemsReserved")
@@ -150,7 +142,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
val recipeInstanceId: String = UUID.randomUUID().toString
val event = EventInstance("non-existent", Map.empty)
for {
_ <- context.remoteComponents.registerToTheCluster
recipeId <- io(context.client.addRecipe(recipe))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
result <- io(context.client
@@ -168,7 +159,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.fireEventAndResolveOnEvent") { context =>
val recipeInstanceId: String = UUID.randomUUID().toString
for {
_ <- context.remoteComponents.registerToTheCluster
_ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent)
recipeId <- io(context.client.addRecipe(recipe))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
@@ -176,7 +166,7 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
_ <- eventually {
for {
serverState <- io(context.client.getRecipeInstanceState(recipeInstanceId))
_ <- context.remoteEventListener.verifyEventsReceived(2)
_ <- eventually(context.remoteEventListener.verifyEventsReceived(2))
} yield serverState.events.map(_.name) shouldBe Seq("OrderPlaced", "ItemsReserved")
}
} yield result.eventNames shouldBe Seq("OrderPlaced")
@@ -185,7 +175,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.getAllRecipeInstancesMetadata") { context =>
val recipeInstanceId: String = UUID.randomUUID().toString
for {
_ <- context.remoteComponents.registerToTheCluster
_ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent)
recipeId <- io(context.client.addRecipe(recipe))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
@@ -197,7 +186,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.getVisualState") { context =>
val recipeInstanceId: String = UUID.randomUUID().toString
for {
_ <- context.remoteComponents.registerToTheCluster
_ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent)
recipeId <- io(context.client.addRecipe(recipe))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
@@ -208,7 +196,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.retryInteraction") { context =>
val recipeInstanceId: String = UUID.randomUUID().toString
for {
_ <- context.remoteComponents.registerToTheCluster
recipeId <- io(context.client.addRecipe(recipeWithBlockingStrategy))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
_ <- context.remoteInteraction.processesWithFailure(new RuntimeException("functional failure"))
@@ -217,7 +204,7 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
_ <- context.remoteInteraction.processesSuccessfullyAndFires(ItemsReservedEvent)
_ <- io(context.client.retryInteraction(recipeInstanceId, "ReserveItems"))
state2 <- io(context.client.getRecipeInstanceState(recipeInstanceId).map(_.events.map(_.name)))
_ <- context.remoteEventListener.verifyEventsReceived(2)
_ <- eventually(context.remoteEventListener.verifyEventsReceived(2))
} yield {
state1 should contain("OrderPlaced")
state1 should not contain("ItemsReserved")
@@ -232,7 +219,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
ItemsReserved(reservedItems = ReservedItems(items = List(Item("resolution-item")), data = Array.empty))
)
for {
_ <- context.remoteComponents.registerToTheCluster
recipeId <- io(context.client.addRecipe(recipeWithBlockingStrategy))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
_ <- context.remoteInteraction.processesWithFailure(new RuntimeException("functional failure"))
@@ -243,7 +229,7 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
state2 = state2data.events.map(_.name)
eventState = state2data.ingredients.get("reservedItems").map(_.as[ReservedItems].items.head.itemId)
// TODO Currently the event listener receives the OrderPlaced... shouldn't also receive the resolved event?
_ <- context.remoteEventListener.verifyEventsReceived(1)
_ <- eventually(context.remoteEventListener.verifyEventsReceived(1))
} yield {
state1 should contain("OrderPlaced")
state1 should not contain("ItemsReserved")
@@ -256,7 +242,6 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
test("Baker.stopRetryingInteraction") { context =>
val recipeInstanceId: String = UUID.randomUUID().toString
for {
_ <- context.remoteComponents.registerToTheCluster
recipeId <- io(context.client.addRecipe(recipe))
_ <- io(context.client.bake(recipeId, recipeInstanceId))
_ <- context.remoteInteraction.processesWithFailure(new RuntimeException("functional failure"))
@@ -265,7 +250,7 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
_ <- io(context.client.stopRetryingInteraction(recipeInstanceId, "ReserveItems"))
state2data <- io(context.client.getRecipeInstanceState(recipeInstanceId))
state2 = state2data.events.map(_.name)
_ <- context.remoteEventListener.verifyEventsReceived(1)
_ <- eventually(eventually(context.remoteEventListener.verifyEventsReceived(1)))
} yield {
state1 should contain("OrderPlaced")
state1 should not contain("ItemsReserved")
@@ -299,26 +284,35 @@ class StateNodeSpec extends BakeryFunSpec with Matchers {
* @return the resources each test can use
*/
def contextBuilder(testArguments: TestArguments): Resource[IO, TestContext] = {
val mockServerAddress = new InetSocketAddress("localhost", 0)
for {
// Mock server
mockServer <- Resource.make(IO(ClientAndServer.startClientAndServer(mockServerAddress.getPort)))(s => IO(s.stop()))
mockServer <- Resource.make(IO(ClientAndServer.startClientAndServer(0)))(s => IO(s.stop()))
remoteInteraction = new RemoteInteraction(mockServer, Interactions.ReserveItemsInteraction)
remoteEventListener = new RemoteEventListener(mockServer)
kubeApiServer = new KubeApiServer(mockServer)
remoteComponents = new RemoteComponents(kubeApiServer, remoteInteraction, remoteEventListener)
_ <- Resource.liftF(remoteComponents.registerToTheCluster)
system <- Resource.make(IO(ActorSystem(UUID.randomUUID().toString, ConfigFactory.empty())))(
system => IO.fromFuture(IO(system.terminate().flatMap(_ => system.whenTerminated))).void)
materializer: Materializer = ActorMaterializer()(system)
makeActorSystem = IO {
ActorSystem(UUID.randomUUID().toString, ConfigFactory.parseString(
"""
|akka {
| stdout-loglevel = "OFF"
| loglevel = "OFF"
|}
|""".stripMargin)) }
stopActorSystem = (system: ActorSystem) => IO.fromFuture(IO {
system.terminate().flatMap(_ => system.whenTerminated) }).void
system <- Resource.make(makeActorSystem)(stopActorSystem)
kubernetesApi = new ApiClient().setBasePath(s"http://localhost:${mockServerAddress.getPort}")
kubernetesApi = new ApiClient().setBasePath(s"http://localhost:${mockServer.getLocalPort}")
serviceDiscovery <- ServiceDiscovery.resource(executionContext, "default", kubernetesApi)
baker = AkkaBaker.withConfig(
AkkaBakerConfig.localDefault(ActorSystem(UUID.randomUUID().toString)).copy(
AkkaBakerConfig.localDefault(system).copy(
interactionManager = serviceDiscovery.buildInteractionManager,
bakerValidationSettings = AkkaBakerConfig.BakerValidationSettings(
allowAddingRecipeWithoutRequiringInstances = true))(system))
_ <- Resource.liftF(serviceDiscovery.plugBakerEventListeners(baker))
server <- StateNodeService.resource(baker, InetSocketAddress.createUnresolved("0.0.0.0", 0))
client <- BakerClient.resource(server.baseUri, executionContext)
} yield Context(

View File

@@ -1,70 +0,0 @@
package com.ing.baker.baas.protocol
import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller}
import akka.http.scaladsl.model.{ContentTypes, HttpResponse, MediaTypes, StatusCodes}
import akka.http.scaladsl.server.Directives.{complete, onSuccess}
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshal, Unmarshaller}
import akka.stream.Materializer
import com.ing.baker.runtime.common.BakerException
import com.ing.baker.runtime.serialization.ProtoMap
import BaaSProto._
import scala.concurrent.{ExecutionContext, Future}
object MarshallingUtils {
type ProtoMessage[A] = scalapb.GeneratedMessage with scalapb.Message[A]
def completeWithBakerFailures[A, P <: ProtoMessage[P]](f: Future[A])(implicit ec: ExecutionContext, m1: ProtoMap[A, P]): Route =
complete(f.map(Right(_)).recover { case e: BakerException => Left(BaaSProtocol.BaaSRemoteFailure(e)) })
def completeWithBakerFailures(f: Future[Unit])(implicit ec: ExecutionContext): Route =
onSuccess(f.map(_ => None).recover { case e: BakerException => Some(e) }) {
case Some(e) => complete(BaaSProtocol.BaaSRemoteFailure(e))
case None => complete(StatusCodes.OK)
}
case class UnmarshalWithBakerExceptions[A](response: HttpResponse) {
def withBakerExceptions[P <: ProtoMessage[P]](implicit ec: ExecutionContext, mat: Materializer, m1: ProtoMap[A, P]): Future[A] = {
for {
decoded <- Unmarshal(response).to[Either[BaaSProtocol.BaaSRemoteFailure, A]]
response <- decoded match {
case Left(e) => Future.failed(e.error)
case Right(a) => Future.successful(a)
}
} yield response
}
}
def unmarshal[A](response: HttpResponse): UnmarshalWithBakerExceptions[A] =
UnmarshalWithBakerExceptions[A](response)
def unmarshalBakerExceptions(response: HttpResponse)(implicit ec: ExecutionContext, mat: Materializer): Future[Unit] =
response.entity.httpEntity.contentType match {
case ContentTypes.`application/octet-stream` =>
Unmarshal(response)
.to[BaaSProtocol.BaaSRemoteFailure]
.flatMap(e => Future.failed(e.error))
case _ =>
Future.successful(())
}
implicit def protoMarshaller[A, P <: ProtoMessage[P]](implicit mapping: ProtoMap[A, P]): ToEntityMarshaller[A] =
Marshaller.ByteArrayMarshaller.wrap(MediaTypes.`application/octet-stream`)(mapping.toByteArray)
implicit def protoUnmarshaller[A, P <: ProtoMessage[P]](implicit mapping: ProtoMap[A, P]): FromEntityUnmarshaller[A] =
Unmarshaller.byteArrayUnmarshaller.map(mapping.fromByteArray(_).get)
implicit def protoEitherMarshaller[A, P0 <: ProtoMessage[P0], B, P1 <: ProtoMessage[P1]](implicit m1: ProtoMap[A, P0], m2: ProtoMap[B, P1]): ToEntityMarshaller[Either[A, B]] =
Marshaller.ByteArrayMarshaller.wrap(MediaTypes.`application/octet-stream`) {
case Left(a) => m1.toByteArray(a)
case Right(b) => m2.toByteArray(b)
}
implicit def protoEitherUnmarshaller[A, P0 <: ProtoMessage[P0], B, P1 <: ProtoMessage[P1]](implicit m1: ProtoMap[A, P0], m2: ProtoMap[B, P1]): FromEntityUnmarshaller[Either[A, B]] =
Unmarshaller.byteArrayUnmarshaller.map { byteArray =>
m1.fromByteArray(byteArray).map(Left(_)).orElse(m2.fromByteArray(byteArray).map(Right(_))).get
}
}

View File

@@ -7,7 +7,7 @@ import org.http4s.EntityDecoder.collectBinary
import org.http4s.util.CaseInsensitiveString
import org.http4s._
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}
object BakeryHttp {
@@ -30,7 +30,18 @@ object BakeryHttp {
EntityEncoder.byteArrayEncoder[IO].contramap(protoMap.toByteArray)
implicit def protoEitherDecoder[A, P0 <: ProtoMessage[P0], B, P1 <: ProtoMessage[P1]](implicit p1: ProtoMap[A, P0], p2: ProtoMap[B, P1]): EntityDecoder[IO, Either[A, B]] =
protoDecoder[A, P0].map(Left(_)).orElse(protoDecoder[B, P1].map(Right(_)))
EntityDecoder.decodeBy(MediaType.application.`octet-stream`)(collectBinary[IO]).map(_.toArray)
.flatMapR { bytes =>
val eitherTry: Try[Either[A, B]] =
p1.fromByteArray(bytes).map[Either[A, B]](Left(_))
.orElse(p2.fromByteArray(bytes).map[Either[A, B]](Right(_)))
eitherTry match {
case Success(a) =>
EitherT.fromEither[IO](Right(a))
case Failure(exception) =>
EitherT.fromEither[IO](Left(MalformedMessageBodyFailure(exception.getMessage, Some(exception))))
}
}
}
object Headers {

View File

@@ -1,18 +1,17 @@
package com.ing.baker.baas.interaction
import cats.effect.{ContextShift, IO, Resource, Timer}
import com.ing.baker.baas.interaction.BakeryHttp.Headers.{Intent, `X-Bakery-Intent`}
import com.ing.baker.baas.interaction.BakeryHttp.ProtoEntityEncoders._
import com.ing.baker.baas.protocol.InteractionSchedulingProto._
import com.ing.baker.baas.protocol.ProtocolInteractionExecution
import com.ing.baker.baas.interaction.BakeryHttp.Headers.`X-Bakery-Intent`
import com.ing.baker.baas.interaction.BakeryHttp.Headers.Intent
import com.ing.baker.baas.interaction.BakeryHttp.ProtoEntityEncoders._
import com.ing.baker.runtime.scaladsl.{EventInstance, IngredientInstance, RecipeEventMetadata}
import com.ing.baker.runtime.scaladsl.{EventInstance, IngredientInstance}
import com.ing.baker.types.Type
import org.http4s.Method._
import org.http4s.Uri
import org.http4s.client.Client
import org.http4s.client.blaze.BlazeClientBuilder
import org.http4s.client.dsl.io._
import org.http4s.{Status, Uri}
import scala.concurrent.ExecutionContext

View File

@@ -215,8 +215,6 @@ lazy val `baas-protocol-baker` = project.in(file("baas-protocol-baker"))
.settings(
moduleName := "baas-protocol-baker",
libraryDependencies ++= Seq(
akkaStream,
akkaHttp,
http4s,
http4sDsl
)
@@ -298,7 +296,6 @@ lazy val `baas-node-state` = project.in(file("baas-node-state"))
slf4jSimple,
scalaTest,
mockServer,
akkaHttpCirce,
circe,
circeGeneric
)

View File

@@ -40,8 +40,6 @@ object Dependencies {
val akkaManagementHttp = "com.lightbend.akka.management" %% "akka-management-cluster-http" % "1.0.5"
val akkaClusterBoostrap = "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % "1.0.5"
val akkaDiscoveryKube = "com.lightbend.akka.discovery" %% "akka-discovery-kubernetes-api" % "1.0.5"
val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11"
val akkaHttpCirce = "de.heikoseeberger" %% "akka-http-circe" % "1.28.0"
val akkaBoostrap = "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % "1.0.5"
val levelDB = "org.iq80.leveldb" % "leveldb" % "0.12"

View File

@@ -6,10 +6,10 @@ import com.ing.baker.runtime.scaladsl.InteractionInstance
import com.ing.baker.types
import com.ing.baker.types.Type
import org.mockito.Mockito.when
import org.scalatest.{Matchers, WordSpecLike}
import org.scalatest.{AsyncWordSpecLike, Matchers}
import org.scalatestplus.mockito.MockitoSugar
class InteractionManagerLocalSpec extends WordSpecLike with Matchers with MockitoSugar {
class InteractionManagerLocalSpec extends AsyncWordSpecLike with Matchers with MockitoSugar {
"getImplementation" should {
"return Some" when {
"an interaction implementation is available" in {
@@ -23,7 +23,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit
val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.Int32)
when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor))
interactionManager.getImplementation(interactionTransition) should equal(Some(interactionImplementation))
interactionManager.getImplementation(interactionTransition).map(_ should equal(Some(interactionImplementation)))
}
"multiple interaction implementations are available" in {
@@ -41,7 +41,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit
val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.Int32)
when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor))
interactionManager.getImplementation(interactionTransition) should equal(Some(interactionImplementation1))
interactionManager.getImplementation(interactionTransition).map(_ should equal(Some(interactionImplementation1)))
}
"two implementations with the same correct name but only one has the correct input types" in {
@@ -59,7 +59,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit
val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.Int32)
when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor))
interactionManager.getImplementation(interactionTransition) should equal(Some(interactionImplementation2))
interactionManager.getImplementation(interactionTransition).map(_ should equal(Some(interactionImplementation2)))
}
}
@@ -75,7 +75,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit
val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.Int32)
when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor))
interactionManager.getImplementation(interactionTransition) should equal(None)
interactionManager.getImplementation(interactionTransition).map(_ should equal(None))
}
"an interaction implementation has a wrong ingredient input type" in {
@@ -89,7 +89,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit
val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.CharArray)
when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor))
interactionManager.getImplementation(interactionTransition) should equal(None)
interactionManager.getImplementation(interactionTransition).map(_ should equal(None))
}
"an interaction implementation has extra ingredient input types" in {
@@ -103,7 +103,7 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit
val ingredientDescriptor: IngredientDescriptor = IngredientDescriptor("ingredientName", types.Int32)
when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor))
interactionManager.getImplementation(interactionTransition) should equal(None)
interactionManager.getImplementation(interactionTransition).map(_ should equal(None))
}
"an interaction implementation has not enough ingredient input types" in {
@@ -118,14 +118,14 @@ class InteractionManagerLocalSpec extends WordSpecLike with Matchers with Mockit
val ingredientDescriptor2: IngredientDescriptor = IngredientDescriptor("ingredientName2", types.CharArray)
when(interactionTransition.requiredIngredients).thenReturn(Seq(ingredientDescriptor, ingredientDescriptor2))
interactionManager.getImplementation(interactionTransition) should equal(None)
interactionManager.getImplementation(interactionTransition).map(_ should equal(None))
}
"empty interaction seq" in {
val interactionManager: InteractionManagerLocal = new InteractionManagerLocal(Seq.empty)
val interactionTransition: InteractionTransition = mock[InteractionTransition]
interactionManager.getImplementation(interactionTransition) should equal(None)
interactionManager.getImplementation(interactionTransition).map(_ should equal(None))
}
}
}