Streaming concept finished, requires fixing mocks of the functional tests but smoke tests are passing

This commit is contained in:
Francisco M. Aramburo Torres
2020-03-04 17:28:11 +01:00
parent 1b154b2b5d
commit 93477d699d
2 changed files with 24 additions and 9 deletions

View File

@@ -86,6 +86,7 @@ object ServiceDiscovery extends LazyLogging {
resource.use { client =>
for {
interface <- client.interface.attempt
_ = println(Console.CYAN + interface + Console.RESET)
interactionsOpt = interface match {
case Right((name, types)) => Some(InteractionInstance(
name = name,
@@ -114,9 +115,6 @@ object ServiceDiscovery extends LazyLogging {
cacheRecipeListeners: Ref[IO, Map[RecipeName, List[RecipeListener]]],
cacheBakerListeners: Ref[IO, List[BakerListener]]
)(event: K8SWatchEvent[Service]): IO[Unit] = {
println(Console.MAGENTA + event._type + Console.RESET)
println(Console.MAGENTA + event._object + Console.RESET)
println()
for {
services <- event._type match {
case EventType.ADDED =>
@@ -124,7 +122,7 @@ object ServiceDiscovery extends LazyLogging {
case EventType.DELETED =>
currentServices.updateAndGet(_.filterNot(_ == event._object))
case EventType.MODIFIED =>
currentServices.updateAndGet(_.map(service => if (service == event._object) event._object else service))
currentServices.updateAndGet(_.map(service => if (service.metadata.uid == event._object.metadata.uid) event._object else service))
case EventType.ERROR =>
IO(logger.error(s"Event type ERROR on service watch for service ${event._object}")) *> currentServices.get
}
@@ -142,9 +140,9 @@ object ServiceDiscovery extends LazyLogging {
cacheRecipeListeners <- Ref.of[IO, Map[RecipeName, List[RecipeListener]]](Map.empty)
cacheBakerListeners <- Ref.of[IO, List[BakerListener]](List.empty)
service = new ServiceDiscovery(cacheInteractions, cacheRecipeListeners, cacheBakerListeners)
updateServices = updateServicesWith(currentServices, cacheInteractions, cacheRecipeListeners, cacheBakerListeners)
updateServices = updateServicesWith(currentServices, cacheInteractions, cacheRecipeListeners, cacheBakerListeners) _
killSwitch <- IO {
k8s.watchContinuously[Service]("service")
k8s.watchAllContinuously[Service]()
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(updateServices(_).unsafeRunAsyncAndForget()))(Keep.left)
.run()
@@ -180,6 +178,9 @@ final class ServiceDiscovery private(
override def addImplementation(interaction: InteractionInstance): Future[Unit] =
Future.failed(new IllegalStateException("Adding implmentation instances is not supported on a Bakery cluster."))
override def getImplementation(interaction: InteractionTransition): Future[Option[InteractionInstance]] =
cacheInteractions.get.map(_.find(isCompatibleImplementation(interaction, _))).unsafeToFuture()
cacheInteractions.get.map(_.find(isCompatibleImplementation(interaction, _))).map{ x =>
println(Console.MAGENTA + x + Console.RESET)
x
}.unsafeToFuture()
}
}

View File

@@ -121,7 +121,6 @@ abstract class BakeryFunSpec extends fixture.AsyncFunSpecLike {
kubernetesConfigPath = getClass.getResource("/kubernetes").getPath
prefix = s"[${Console.GREEN}creating env $testUUID${Console.RESET}]"
_ <- exec(prefix, command = s"kubectl create namespace $testUUID")
_ <- exec(prefix, command = s"kubectl apply -f $kubernetesConfigPath -n $testUUID")
_ = if(args.skipCleanup) {
println(Console.YELLOW + s"### Will skip cleanup after the test, to manually clean the environment run: " + Console.RESET)
println(s"\n\tkubectl delete -f $kubernetesConfigPath -n $testUUID && kubectl delete namespace $testUUID\n")
@@ -129,6 +128,12 @@ abstract class BakeryFunSpec extends fixture.AsyncFunSpecLike {
} yield testUUID
}
def applyFile(name: String, namespace: String): IO[Unit] = {
val kubernetesConfigPath = getClass.getResource("/kubernetes").getPath
val prefix = s"[${Console.GREEN}applying file $name $namespace${Console.RESET}]"
exec(prefix, command = s"kubectl apply -f $kubernetesConfigPath/$name -n $namespace").void
}
def getPods(namespaceOpt: Option[String]): IO[Int] =
namespaceOpt match {
case Some(namespace) =>
@@ -151,8 +156,17 @@ abstract class BakeryFunSpec extends fixture.AsyncFunSpecLike {
val recipeEventsClient = new EventListenerClient(client, args.eventListenerHostname)
val bakerEventsClient = new EventListenerClient(client, args.bakerEventListenerHostname)
for {
_ <- setup(applyFile("example-interactions.yaml", namespace.getOrElse("default")))
_ <- setup(applyFile("example-listeners.yaml", namespace.getOrElse("default")))
_ <- setup(applyFile("bakery-cluster.yaml", namespace.getOrElse("default")))
_ <- within(setupWaitTime, setupWaitSplit)(for {
_ <- IO ( println(Console.GREEN + s"\nWaiting for environment (5s)..." + Console.RESET) )
_ <- IO ( println(Console.GREEN + s"\nWaiting for bakery cluster (5s)..." + Console.RESET) )
_ <- getPods(namespace)
status <- client.statusFromUri(args.stateServiceHostname / "api" / "v3" / "getAllRecipes")
} yield assert(status.code == 200))
_ <- setup(applyFile("example-client-app.yaml", namespace.getOrElse("default")))
_ <- within(setupWaitTime, setupWaitSplit)(for {
_ <- IO ( println(Console.GREEN + s"\nWaiting for client app (5s)..." + Console.RESET) )
_ <- getPods(namespace)
status <- exampleAppClient.ping
} yield assert(status.code == 200))