Using async akka.streams.Sink on the ServiceDiscovery

This commit is contained in:
Francisco M. Aramburo Torres
2020-03-06 23:04:25 +01:00
parent 4a2bfc40a4
commit b3cc04156e

View File

@@ -130,6 +130,8 @@ object ServiceDiscovery extends LazyLogging {
} yield ()
}
val paralellism = math.max(2, Runtime.getRuntime.availableProcessors())
val createServiceDiscovery = for {
currentServices <- Ref.of[IO, List[Service]](List.empty)
cacheInteractions <- Ref.of[IO, List[InteractionInstance]](List.empty)
@@ -140,7 +142,7 @@ object ServiceDiscovery extends LazyLogging {
killSwitch <- IO {
k8s.watchAllContinuously[Service]()
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(updateServices(_).unsafeRunAsyncAndForget()))(Keep.left)
.toMat(Sink.foreachAsync(paralellism)(updateServices(_).unsafeToFuture()))(Keep.left)
.run()
}
} yield (service, killSwitch)