From 93477d699d8b8b76f857b1609b78a18e89aa2e09 Mon Sep 17 00:00:00 2001 From: "Francisco M. Aramburo Torres" Date: Wed, 4 Mar 2020 17:28:11 +0100 Subject: [PATCH] Streaming concept finished, requires fixing mocks of the functional tests but smoke tests are passing --- .../baker/baas/state/ServiceDiscovery.scala | 15 ++++++++------- .../ing/baker/baas/smoke/BakeryFunSpec.scala | 18 ++++++++++++++++-- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/baas-node-state/src/main/scala/com/ing/baker/baas/state/ServiceDiscovery.scala b/baas-node-state/src/main/scala/com/ing/baker/baas/state/ServiceDiscovery.scala index cf7d6047..ac241965 100644 --- a/baas-node-state/src/main/scala/com/ing/baker/baas/state/ServiceDiscovery.scala +++ b/baas-node-state/src/main/scala/com/ing/baker/baas/state/ServiceDiscovery.scala @@ -86,6 +86,7 @@ object ServiceDiscovery extends LazyLogging { resource.use { client => for { interface <- client.interface.attempt + _ = println(Console.CYAN + interface + Console.RESET) interactionsOpt = interface match { case Right((name, types)) => Some(InteractionInstance( name = name, @@ -114,9 +115,6 @@ object ServiceDiscovery extends LazyLogging { cacheRecipeListeners: Ref[IO, Map[RecipeName, List[RecipeListener]]], cacheBakerListeners: Ref[IO, List[BakerListener]] )(event: K8SWatchEvent[Service]): IO[Unit] = { - println(Console.MAGENTA + event._type + Console.RESET) - println(Console.MAGENTA + event._object + Console.RESET) - println() for { services <- event._type match { case EventType.ADDED => @@ -124,7 +122,7 @@ object ServiceDiscovery extends LazyLogging { case EventType.DELETED => currentServices.updateAndGet(_.filterNot(_ == event._object)) case EventType.MODIFIED => - currentServices.updateAndGet(_.map(service => if (service == event._object) event._object else service)) + currentServices.updateAndGet(_.map(service => if (service.metadata.uid == event._object.metadata.uid) event._object else service)) case EventType.ERROR => IO(logger.error(s"Event type ERROR on service watch for service ${event._object}")) *> currentServices.get } @@ -142,9 +140,9 @@ object ServiceDiscovery extends LazyLogging { cacheRecipeListeners <- Ref.of[IO, Map[RecipeName, List[RecipeListener]]](Map.empty) cacheBakerListeners <- Ref.of[IO, List[BakerListener]](List.empty) service = new ServiceDiscovery(cacheInteractions, cacheRecipeListeners, cacheBakerListeners) - updateServices = updateServicesWith(currentServices, cacheInteractions, cacheRecipeListeners, cacheBakerListeners) + updateServices = updateServicesWith(currentServices, cacheInteractions, cacheRecipeListeners, cacheBakerListeners) _ killSwitch <- IO { - k8s.watchContinuously[Service]("service") + k8s.watchAllContinuously[Service]() .viaMat(KillSwitches.single)(Keep.right) .toMat(Sink.foreach(updateServices(_).unsafeRunAsyncAndForget()))(Keep.left) .run() @@ -180,6 +178,9 @@ final class ServiceDiscovery private( override def addImplementation(interaction: InteractionInstance): Future[Unit] = Future.failed(new IllegalStateException("Adding implmentation instances is not supported on a Bakery cluster.")) override def getImplementation(interaction: InteractionTransition): Future[Option[InteractionInstance]] = - cacheInteractions.get.map(_.find(isCompatibleImplementation(interaction, _))).unsafeToFuture() + cacheInteractions.get.map(_.find(isCompatibleImplementation(interaction, _))).map{ x => + println(Console.MAGENTA + x + Console.RESET) + x + }.unsafeToFuture() } } diff --git a/baas-smoke-tests/src/test/scala/com/ing/baker/baas/smoke/BakeryFunSpec.scala b/baas-smoke-tests/src/test/scala/com/ing/baker/baas/smoke/BakeryFunSpec.scala index 9340ef62..c527026b 100644 --- a/baas-smoke-tests/src/test/scala/com/ing/baker/baas/smoke/BakeryFunSpec.scala +++ b/baas-smoke-tests/src/test/scala/com/ing/baker/baas/smoke/BakeryFunSpec.scala @@ -121,7 +121,6 @@ abstract class BakeryFunSpec extends fixture.AsyncFunSpecLike { kubernetesConfigPath = getClass.getResource("/kubernetes").getPath prefix = s"[${Console.GREEN}creating env $testUUID${Console.RESET}]" _ <- exec(prefix, command = s"kubectl create namespace $testUUID") - _ <- exec(prefix, command = s"kubectl apply -f $kubernetesConfigPath -n $testUUID") _ = if(args.skipCleanup) { println(Console.YELLOW + s"### Will skip cleanup after the test, to manually clean the environment run: " + Console.RESET) println(s"\n\tkubectl delete -f $kubernetesConfigPath -n $testUUID && kubectl delete namespace $testUUID\n") @@ -129,6 +128,12 @@ abstract class BakeryFunSpec extends fixture.AsyncFunSpecLike { } yield testUUID } + def applyFile(name: String, namespace: String): IO[Unit] = { + val kubernetesConfigPath = getClass.getResource("/kubernetes").getPath + val prefix = s"[${Console.GREEN}applying file $name $namespace${Console.RESET}]" + exec(prefix, command = s"kubectl apply -f $kubernetesConfigPath/$name -n $namespace").void + } + def getPods(namespaceOpt: Option[String]): IO[Int] = namespaceOpt match { case Some(namespace) => @@ -151,8 +156,17 @@ abstract class BakeryFunSpec extends fixture.AsyncFunSpecLike { val recipeEventsClient = new EventListenerClient(client, args.eventListenerHostname) val bakerEventsClient = new EventListenerClient(client, args.bakerEventListenerHostname) for { + _ <- setup(applyFile("example-interactions.yaml", namespace.getOrElse("default"))) + _ <- setup(applyFile("example-listeners.yaml", namespace.getOrElse("default"))) + _ <- setup(applyFile("bakery-cluster.yaml", namespace.getOrElse("default"))) _ <- within(setupWaitTime, setupWaitSplit)(for { - _ <- IO ( println(Console.GREEN + s"\nWaiting for environment (5s)..." + Console.RESET) ) + _ <- IO ( println(Console.GREEN + s"\nWaiting for bakery cluster (5s)..." + Console.RESET) ) + _ <- getPods(namespace) + status <- client.statusFromUri(args.stateServiceHostname / "api" / "v3" / "getAllRecipes") + } yield assert(status.code == 200)) + _ <- setup(applyFile("example-client-app.yaml", namespace.getOrElse("default"))) + _ <- within(setupWaitTime, setupWaitSplit)(for { + _ <- IO ( println(Console.GREEN + s"\nWaiting for client app (5s)..." + Console.RESET) ) _ <- getPods(namespace) status <- exampleAppClient.ping } yield assert(status.code == 200))