working example of the remote baker event listener

This commit is contained in:
Francisco Aramburo
2020-02-09 02:17:21 +01:00
parent 2fbbcd1cba
commit a1bd7a003f
8 changed files with 146 additions and 30 deletions

View File

@@ -2,7 +2,7 @@ package com.ing.baker.baas.state
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.ing.baker.baas.akka.RemoteEventListenerClient
import com.ing.baker.baas.akka.{RemoteBakerEventListenerClient, RemoteEventListenerClient}
import com.ing.baker.runtime.scaladsl.Baker
import com.ing.baker.runtime.serialization.Encryption
import com.typesafe.scalalogging.LazyLogging
@@ -17,12 +17,13 @@ class EventListenersServiceDiscovery(discovery: ServiceDiscovery, baker: Baker)(
type RecipeName = String
private var listenersCache: Map[RecipeName, List[RemoteEventListenerClient]] = Map.empty
private var recipeListenersCache: Map[RecipeName, List[RemoteEventListenerClient]] = Map.empty
private var bakerListenersCache: List[RemoteBakerEventListenerClient] = List.empty
private def loadListeners: Future[Map[RecipeName, List[RemoteEventListenerClient]]] = {
discovery
.getEventListenersAddresses
.map( _
.map(_
.map { case (recipe, address) => (recipe, RemoteEventListenerClient(address)) }
.toList
.foldLeft(Map.empty[RecipeName, List[RemoteEventListenerClient]]) { case (acc, (recipeName, client)) =>
@@ -30,18 +31,32 @@ class EventListenersServiceDiscovery(discovery: ServiceDiscovery, baker: Baker)(
})
}
private def loadBakerListeners: Future[List[RemoteBakerEventListenerClient]] = {
discovery
.getBakerEventListenersAddresses
.map(_
.map(RemoteBakerEventListenerClient(_))
.toList)
}
private def updateCache: Runnable = () => {
loadListeners.foreach { listeners =>
logger.info("Updating the InteractionManager")
listenersCache = listeners
recipeListenersCache = listeners
}
loadBakerListeners.foreach { listeners =>
bakerListenersCache = listeners
}
}
system.scheduler.schedule(30.seconds, 10.seconds, updateCache)
def initializeEventListeners: Future[Unit] =
def initializeEventListeners: Future[Unit] = {
baker.registerEventListener((metadata, event) => {
listenersCache.get(metadata.recipeName).foreach(_.foreach(_.apply(metadata, event)))
listenersCache.get("All-Recipes").foreach(_.foreach(_.apply(metadata, event)))
recipeListenersCache.get(metadata.recipeName).foreach(_.foreach(_.apply(metadata, event)))
recipeListenersCache.get("All-Recipes").foreach(_.foreach(_.apply(metadata, event)))
})
baker.registerBakerEventListener(event => {
bakerListenersCache.foreach(_.apply(event))
})
}
}

View File

@@ -7,5 +7,7 @@ trait ServiceDiscovery {
def getInteractionAddresses: Future[Seq[String]]
def getEventListenersAddresses: Future[Seq[(String, String)]]
def getBakerEventListenersAddresses: Future[Seq[String]]
}

View File

@@ -8,7 +8,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future
class ServiceDiscoveryKubernetes(namespace: String) extends ServiceDiscovery {
class ServiceDiscoveryKubernetes(namespace: String) extends ServiceDiscovery {
private val api = new CoreV1Api(ClientBuilder.cluster.build)
@@ -22,6 +22,12 @@ class ServiceDiscoveryKubernetes(namespace: String) extends ServiceDiscovery {
})
}
override def getBakerEventListenersAddresses: Future[Seq[String]] = {
Future.successful(getBakerEventListenerServices().map { service =>
"http://" + service.getMetadata.getName + ":8080"
})
}
private def getInteractionServices(): mutable.Seq[V1Service] = {
api.listNamespacedService(namespace, null, null, null, null, null, null, null, null, null)
.getItems
@@ -37,5 +43,13 @@ class ServiceDiscoveryKubernetes(namespace: String) extends ServiceDiscovery {
.filter(_.getMetadata.getLabels.getOrDefault("baas-component", "Wrong")
.equals("remote-event-listener"))
}
private def getBakerEventListenerServices(): mutable.Seq[V1Service] = {
api.listNamespacedService(namespace, null, null, null, null, null, null, null, null, null)
.getItems
.asScala
.filter(_.getMetadata.getLabels.getOrDefault("baas-component", "Wrong")
.equals("remote-baker-event-listener"))
}
}

View File

@@ -136,8 +136,6 @@ lazy val runtime = project.in(file("runtime"))
.dependsOn(
intermediateLanguage,
`baker-interface`,
`baas-protocol-interaction-scheduling`,
`baas-protocol-recipe-event-publishing`,
testScope(recipeDsl),
testScope(recipeCompiler),
testScope(bakertypes))
@@ -283,7 +281,12 @@ lazy val `baas-node-state` = project.in(file("baas-node-state"))
packageName in Docker := "baas-node-state",
dockerRepository in Docker := sys.env.get("BAAS_DOCKER_REPO")
)
.dependsOn(runtime, `baas-protocol-baker`, `baas-protocol-interaction-scheduling`)
.dependsOn(
runtime,
`baas-protocol-baker`,
`baas-protocol-interaction-scheduling`,
`baas-protocol-recipe-event-publishing`,
`baas-protocol-baker-event-publishing`)
lazy val `baas-node-interaction` = project.in(file("baas-node-interaction"))
.settings(defaultModuleSettings)
@@ -451,21 +454,41 @@ lazy val `baas-event-listener-example` = project
libraryDependencies ++=
compileDeps(
slf4jApi,
slf4jSimple,
catsEffect
) ++ testDeps(
scalaTest,
scalaCheck
)
slf4jSimple
) ++ testDeps()
)
.settings(
maintainer in Docker := "The Apollo Squad",
packageSummary in Docker := "A web-shop checkout service interaction instances example running on baas",
packageSummary in Docker := "A web-shop checkout service example running on baas",
packageName in Docker := "baas-event-listener-example",
dockerRepository in Docker := sys.env.get("BAAS_DOCKER_REPO")
)
.dependsOn(`baas-node-event-listener`)
lazy val `baas-baker-event-listener-example` = project
.in(file("examples/baas-baker-event-listener-example"))
.enablePlugins(JavaAppPackaging)
.settings(commonSettings)
.settings(noPublishSettings)
.settings(
moduleName := "baas-baker-event-listener-example",
scalacOptions ++= Seq(
"-Ypartial-unification"
),
libraryDependencies ++=
compileDeps(
slf4jApi,
slf4jSimple
) ++ testDeps()
)
.settings(
maintainer in Docker := "The Apollo Squad",
packageSummary in Docker := "A web-shop checkout service example running on baas",
packageName in Docker := "baas-baker-event-listener-example",
dockerRepository in Docker := sys.env.get("BAAS_DOCKER_REPO")
)
.dependsOn(`baas-node-baker-event-listener`)
lazy val `baas-interaction-example-reserve-items` = project.in(file("examples/baas-interaction-examples/reserve-items"))
.enablePlugins(JavaAppPackaging)
.settings(commonSettings)

View File

@@ -0,0 +1,11 @@
package webshop.webservice
import com.ing.baker.baas.scaladsl.RemoteBakerEventListener
import com.typesafe.scalalogging.LazyLogging
object Main extends App with LazyLogging {
RemoteBakerEventListener.load(event => {
println(Console.YELLOW + event + Console.RESET)
logger.info(event.toString)
})
}

View File

@@ -1,11 +0,0 @@
package webshop.webservice
import com.ing.baker.baas.scaladsl.RemoteEventListener
import com.typesafe.scalalogging.LazyLogging
object Main extends App with LazyLogging {
RemoteEventListener.load((metadata, event) => {
println(Console.YELLOW + metadata.recipeName + " [" + metadata.recipeInstanceId + "] " + event.name + Console.RESET)
logger.info(metadata.recipeName + " [" + metadata.recipeInstanceId + "] " + event.name)
})
}

View File

@@ -251,6 +251,67 @@ spec:
---
apiVersion: v1
kind: Service
metadata:
name: baas-baker-event-listener-service
labels:
baas-component: remote-baker-event-listener
spec:
selector:
app: baas-baker-event-listener
ports:
- name: http-api
port: 8080
targetPort: http-api
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: baas-baker-event-listener
name: baas-baker-event-listener
spec:
replicas: 1
selector:
matchLabels:
app: baas-baker-event-listener
template:
metadata:
labels:
app: baas-baker-event-listener
spec:
containers:
- name: baas-baker-event-listener-example
image: baas-baker-event-listener-example:3.0.2-SNAPSHOT
imagePullPolicy: Never
env:
- name: BAAS_INGREDIENT_ENCRYPTION_ENABLED
value: "true"
- name: BAAS_INGREDIENT_ENCRYPTION_SECRET
valueFrom:
secretKeyRef:
name: ingredient-encryption-secret
key: encryption-secret
readinessProbe:
httpGet:
path: /api/v3/health
port: 8080
livenessProbe:
httpGet:
path: /api/v3/health
port: 8080
ports:
- name: http-api
containerPort: 8080
protocol: TCP
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:

View File

@@ -8,3 +8,4 @@ sbt baas-interaction-example-make-payment/docker:publishLocal
sbt baas-interaction-example-reserve-items/docker:publishLocal
sbt baas-interaction-example-ship-items/docker:publishLocal
sbt baas-event-listener-example/docker:publishLocal
sbt baas-baker-event-listener-example/docker:publishLocal