mirror of
https://github.com/jlengrand/baker.git
synced 2026-03-10 08:01:23 +00:00
Merged master
This commit is contained in:
@@ -1,15 +1,14 @@
|
||||
package com.ing.baker.types
|
||||
|
||||
import com.typesafe.config.{ ConfigFactory, ConfigValue }
|
||||
import org.slf4j.{ Logger, LoggerFactory }
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.reflect.runtime.universe
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
import scala.util.Try
|
||||
|
||||
object Converters {
|
||||
object Converters extends LazyLogging{
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger("com.ing.baker.types")
|
||||
|
||||
private val configPathPrefix: String = "baker.types"
|
||||
private val defaultTypeConverter: TypeAdapter = new TypeAdapter(loadDefaultModulesFromConfig())
|
||||
@@ -38,7 +37,7 @@ object Converters {
|
||||
(clazz, module)
|
||||
}
|
||||
|
||||
if (tried.isFailure) log.error(s"Failed to load type module ${entry.getKey}")
|
||||
if (tried.isFailure) logger.error(s"Failed to load type module ${entry.getKey}")
|
||||
|
||||
tried.toOption
|
||||
}
|
||||
|
||||
@@ -1,24 +1,24 @@
|
||||
package com.ing.baker.types.modules
|
||||
|
||||
import com.ing.baker.types
|
||||
import com.ing.baker.types.{Converters, Int64}
|
||||
import com.ing.baker.types.Converters
|
||||
import org.joda.time.{DateTime, LocalDate, LocalDateTime}
|
||||
import org.scalacheck.Gen
|
||||
import org.scalacheck.Test.Parameters.defaultVerbose
|
||||
import org.scalatest.prop.Checkers
|
||||
import org.scalatest.{Matchers, WordSpecLike}
|
||||
import org.scalatestplus.scalacheck.Checkers
|
||||
|
||||
|
||||
class JodaTimeModuleSpec extends WordSpecLike with Matchers with Checkers {
|
||||
|
||||
val minSuccessfulTests = 100
|
||||
private val minSuccessfulTests = 100
|
||||
|
||||
// Long.MaxValue is not supported by joda time for local dates, resulting in a integer overflow
|
||||
// This shifts the long max value 1 bit to the right (divides by 2)
|
||||
// This translates to the date: Fri Apr 24 17:36:27 CEST 146140482
|
||||
val maxMillis = Long.MaxValue >> 1
|
||||
private val maxMillis = Long.MaxValue >> 1
|
||||
|
||||
val numGen: Gen[Long] = Gen.chooseNum[Long](
|
||||
private val numGen: Gen[Long] = Gen.chooseNum[Long](
|
||||
0L, maxMillis, 0, maxMillis
|
||||
)
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ import com.ing.baker.types.modules.PrimitiveModuleSpec._
|
||||
import org.scalacheck.Gen
|
||||
import org.scalacheck.Prop.{BooleanOperators, forAll}
|
||||
import org.scalacheck.Test.Parameters.defaultVerbose
|
||||
import org.scalatest.prop.Checkers
|
||||
import org.scalatestplus.scalacheck.Checkers
|
||||
import org.scalatest.{Matchers, WordSpecLike}
|
||||
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
@@ -17,15 +17,15 @@ import scala.reflect.runtime.universe.TypeTag
|
||||
object PrimitiveModuleSpec {
|
||||
|
||||
val intGen: Gen[Int] = Gen.chooseNum[Int](Integer.MIN_VALUE, Integer.MAX_VALUE)
|
||||
val langIntegerGen: Gen[lang.Integer] = intGen.map(Int.box(_))
|
||||
val langIntegerGen: Gen[lang.Integer] = intGen.map(Int.box)
|
||||
val longGen: Gen[Long] = Gen.chooseNum[Long](Long.MinValue, Long.MaxValue)
|
||||
val langLongGen: Gen[lang.Long] = Gen.chooseNum[Long](Long.MinValue, Long.MaxValue).map(Long.box(_))
|
||||
val langLongGen: Gen[lang.Long] = Gen.chooseNum[Long](Long.MinValue, Long.MaxValue).map(Long.box)
|
||||
val shortGen: Gen[Short] = Gen.chooseNum[Short](Short.MinValue, Short.MaxValue)
|
||||
val langShortGen: Gen[lang.Short] = shortGen.map(Short.box(_))
|
||||
val langShortGen: Gen[lang.Short] = shortGen.map(Short.box)
|
||||
val floatGen: Gen[Float] = Gen.chooseNum(Float.MinValue, Float.MaxValue)
|
||||
val langFloatGen: Gen[lang.Float] = floatGen.map(Float.box(_))
|
||||
val langFloatGen: Gen[lang.Float] = floatGen.map(Float.box)
|
||||
val doubleGen: Gen[Double] = Gen.chooseNum[Double](Double.MinValue, Double.MaxValue)
|
||||
val langDoubleGen: Gen[lang.Double] = doubleGen.map(Double.box(_))
|
||||
val langDoubleGen: Gen[lang.Double] = doubleGen.map(Double.box)
|
||||
val stringGen: Gen[String] = Gen.alphaStr
|
||||
val bigIntGen: Gen[BigInt] = longGen.map(BigInt(_))
|
||||
val javaBigIntGen: Gen[java.math.BigInteger] = bigIntGen.map(_.bigInteger)
|
||||
|
||||
@@ -3,22 +3,22 @@ package com.ing.baker.types.modules
|
||||
import com.ing.baker.types
|
||||
import com.ing.baker.types.Converters.{readJavaType, toJava, toValue}
|
||||
import com.ing.baker.types._
|
||||
import org.scalatest.prop.Checkers
|
||||
import org.scalatest.{Matchers, WordSpecLike}
|
||||
import org.scalatestplus.scalacheck.Checkers
|
||||
|
||||
class ScalaModulesSpec extends WordSpecLike with Matchers with Checkers {
|
||||
|
||||
val listValue123 = ListValue(List(PrimitiveValue(1), PrimitiveValue(2), PrimitiveValue(3)))
|
||||
private val listValue123 = ListValue(List(PrimitiveValue(1), PrimitiveValue(2), PrimitiveValue(3)))
|
||||
|
||||
val recordPerson = RecordValue(Map("name" -> PrimitiveValue("john"), "age" -> PrimitiveValue(42)))
|
||||
private val recordPerson = RecordValue(Map("name" -> PrimitiveValue("john"), "age" -> PrimitiveValue(42)))
|
||||
|
||||
val valueMap = RecordValue(Map(
|
||||
private val valueMap = RecordValue(Map(
|
||||
"a" -> PrimitiveValue(1),
|
||||
"b" -> PrimitiveValue(2),
|
||||
"c" -> PrimitiveValue(3)
|
||||
))
|
||||
|
||||
val scalaMap = Map(
|
||||
private val scalaMap = Map(
|
||||
"a" -> 1,
|
||||
"b" -> 2,
|
||||
"c" -> 3
|
||||
@@ -35,24 +35,20 @@ class ScalaModulesSpec extends WordSpecLike with Matchers with Checkers {
|
||||
}
|
||||
|
||||
"correctly parse set types" in {
|
||||
|
||||
readJavaType[Set[String]] shouldBe ListType(types.CharArray)
|
||||
readJavaType[java.util.Set[String]] shouldBe ListType(types.CharArray)
|
||||
}
|
||||
|
||||
"correctly parse map types" in {
|
||||
|
||||
readJavaType[Map[String, Int]] shouldBe MapType(types.Int32)
|
||||
readJavaType[java.util.Map[String, Int]] shouldBe MapType(types.Int32)
|
||||
}
|
||||
|
||||
"be able to autobox null values to scala Options" in {
|
||||
|
||||
toJava[Option[Int]](NullValue) shouldBe None
|
||||
}
|
||||
|
||||
"be able to autobox primitive values to scala Options" in {
|
||||
|
||||
toJava[Option[Int]](PrimitiveValue(42)) shouldBe Some(42)
|
||||
}
|
||||
|
||||
@@ -61,42 +57,34 @@ class ScalaModulesSpec extends WordSpecLike with Matchers with Checkers {
|
||||
}
|
||||
|
||||
"be able to parse scala.collection.immutable.List objects" in {
|
||||
|
||||
toValue(List(1, 2, 3)) shouldBe listValue123
|
||||
}
|
||||
|
||||
"be able to create scala.collection.immutable.List objects" in {
|
||||
|
||||
toJava[List[Int]](listValue123) shouldBe List(1, 2, 3)
|
||||
}
|
||||
|
||||
"be able to parse scala.collection.immutable.Set objects" in {
|
||||
|
||||
toValue(Set(1, 2, 3)) shouldBe listValue123
|
||||
}
|
||||
|
||||
"be able to create scala.collection.immutable.Set objects" in {
|
||||
|
||||
toJava[Set[Int]](listValue123) shouldBe Set(1, 2, 3)
|
||||
}
|
||||
|
||||
"be able to parse case class objects" in {
|
||||
|
||||
toValue(PersonCaseClass("john", 42)) shouldBe recordPerson
|
||||
}
|
||||
|
||||
"be able to create case class objects" in {
|
||||
|
||||
toJava[PersonCaseClass](recordPerson) shouldBe PersonCaseClass("john", 42)
|
||||
}
|
||||
|
||||
"be able to parse scala.collection.immutable.Map objects" in {
|
||||
|
||||
toValue(scalaMap) shouldBe valueMap
|
||||
}
|
||||
|
||||
"be able to create scala.collection.immutable.Map objects" in {
|
||||
|
||||
toJava[Map[String, Int]](valueMap) shouldBe scalaMap
|
||||
}
|
||||
}
|
||||
|
||||
32
build.sbt
32
build.sbt
@@ -1,7 +1,7 @@
|
||||
import Dependencies.{scalaGraph, _}
|
||||
import sbt.Keys._
|
||||
|
||||
def testScope(project: ProjectReference) = project % "test->test;test->compile"
|
||||
def testScope(project: ProjectReference): ClasspathDep[ProjectReference] = project % "test->test;test->compile"
|
||||
|
||||
val commonSettings = Defaults.coreDefaultSettings ++ Seq(
|
||||
organization := "com.ing.baker",
|
||||
@@ -45,7 +45,7 @@ lazy val noPublishSettings = Seq(
|
||||
publishArtifact := false
|
||||
)
|
||||
|
||||
lazy val defaultModuleSettings = commonSettings ++ dependencyOverrideSettings ++ Revolver.settings ++ SonatypePublish.settings
|
||||
lazy val defaultModuleSettings = commonSettings ++ dependencyOverrideSettings ++ SonatypePublish.settings
|
||||
|
||||
lazy val scalaPBSettings = Seq(PB.targets in Compile := Seq(scalapb.gen() -> (sourceManaged in Compile).value))
|
||||
|
||||
@@ -55,11 +55,12 @@ lazy val bakertypes = project.in(file("bakertypes"))
|
||||
moduleName := "baker-types",
|
||||
libraryDependencies ++= compileDeps(
|
||||
slf4jApi,
|
||||
ficusConfig,
|
||||
objenisis,
|
||||
scalapbRuntime,
|
||||
jodaTime,
|
||||
scalaReflect(scalaVersion.value)
|
||||
typeSafeConfig,
|
||||
scalaReflect(scalaVersion.value),
|
||||
scalaLogging
|
||||
) ++ testDeps(scalaTest, scalaCheck, logback, scalaCheck)
|
||||
)
|
||||
|
||||
@@ -71,8 +72,8 @@ lazy val intermediateLanguage = project.in(file("intermediate-language"))
|
||||
scalaGraph,
|
||||
slf4jApi,
|
||||
scalaGraphDot,
|
||||
objenisis,
|
||||
typeSafeConfig
|
||||
typeSafeConfig,
|
||||
scalaLogging
|
||||
) ++ testDeps(scalaTest, scalaCheck, logback)
|
||||
).dependsOn(bakertypes)
|
||||
|
||||
@@ -106,19 +107,14 @@ lazy val runtime = project.in(file("runtime"))
|
||||
akkaDistributedData,
|
||||
akkaClusterSharding,
|
||||
akkaBoostrap,
|
||||
akkaInmemoryJournal,
|
||||
akkaSlf4j,
|
||||
ficusConfig,
|
||||
catsCore,
|
||||
catsEffect,
|
||||
guava,
|
||||
chill,
|
||||
objenisis,
|
||||
scalapbRuntime,
|
||||
protobufJava,
|
||||
kryo,
|
||||
kryoSerializers,
|
||||
slf4jApi
|
||||
slf4jApi,
|
||||
scalaLogging
|
||||
) ++ testDeps(
|
||||
akkaStream,
|
||||
akkaTestKit,
|
||||
@@ -158,13 +154,14 @@ lazy val splitBrainResolver = project.in(file("split-brain-resolver"))
|
||||
compileDeps(
|
||||
akkaActor,
|
||||
akkaCluster,
|
||||
akkaSlf4j,
|
||||
ficusConfig
|
||||
ficusConfig,
|
||||
slf4jApi,
|
||||
scalaLogging
|
||||
) ++ testDeps(
|
||||
akkaTestKit,
|
||||
akkaMultiNodeTestkit,
|
||||
scalaTest
|
||||
) ++ providedDeps(findbugs)
|
||||
)
|
||||
)
|
||||
.enablePlugins(MultiJvmPlugin)
|
||||
.configs(MultiJvm)
|
||||
@@ -198,7 +195,7 @@ lazy val recipeCompiler = project.in(file("compiler"))
|
||||
.settings(
|
||||
moduleName := "baker-compiler",
|
||||
libraryDependencies ++=
|
||||
compileDeps(slf4jApi) ++ testDeps(scalaTest, scalaCheck, logback, junitJupiter)
|
||||
testDeps(scalaTest, scalaCheck, logback, junitJupiter)
|
||||
)
|
||||
.dependsOn(recipeDsl, intermediateLanguage, testScope(recipeDsl))
|
||||
|
||||
@@ -298,6 +295,7 @@ lazy val `baas-tests` = project.in(file("baas-tests"))
|
||||
testDeps(
|
||||
akkaSlf4j,
|
||||
akkaTestKit,
|
||||
akkaInmemoryJournal,
|
||||
logback,
|
||||
scalaTest,
|
||||
junitInterface,
|
||||
|
||||
@@ -221,4 +221,12 @@ In this release we have removed sieves completely.
|
||||
In JBaker we supported using a UUID as a processId.
|
||||
In this release this has been removed completely and we accept Strings.
|
||||
This is again in line with being clearer what Baker does.
|
||||
Internally we were just transforming this to a String.
|
||||
Internally we were just transforming this to a String.
|
||||
|
||||
### Accesss to RuntimeEvents
|
||||
Version 2 would provided undocumented public interfaces returning RuntimeEvent instances.
|
||||
In version 3, the possibility to get Ingredients provided by a specific Event is gone.
|
||||
Users should not care where ingredients are provided from.
|
||||
This could be from an SensoryEvent or as output of a Event from an Interaction.
|
||||
This should not matter, only if the ingredient is available should matter.
|
||||
This allow users to flexibly recipes without impacting client code.
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
package com.ing.baker.il
|
||||
|
||||
import com.ing.baker.il.RecipeVisualizer.log
|
||||
import com.typesafe.config.Config
|
||||
import scalax.collection.io.dot.{DotAttr, DotAttrStmt, Elem}
|
||||
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
import scala.collection.JavaConverters._
|
||||
import scalax.collection.io.dot.implicits._
|
||||
import scalax.collection.io.dot.{ DotAttr, DotAttrStmt, Elem }
|
||||
|
||||
object RecipeVisualStyle {
|
||||
object RecipeVisualStyle extends LazyLogging {
|
||||
|
||||
def default: RecipeVisualStyle = RecipeVisualStyle()
|
||||
|
||||
@@ -16,7 +15,7 @@ object RecipeVisualStyle {
|
||||
val visualizationConfig = config.getConfig("baker.visualization")
|
||||
val configuredStyle = visualizationConfig.getString("style")
|
||||
val pickedStyle = if (!visualizationConfig.hasPath(s"styles.$configuredStyle")) {
|
||||
log.warn(s"no configuration for recipe style '$configuredStyle' found, falling back to 'default' style")
|
||||
logger.warn(s"no configuration for recipe style '$configuredStyle' found, falling back to 'default' style")
|
||||
"default"
|
||||
} else
|
||||
configuredStyle
|
||||
@@ -33,15 +32,15 @@ object RecipeVisualStyle {
|
||||
values
|
||||
.-("shape") // shape is not allowed to be overriden
|
||||
.map {
|
||||
case (key, s: String) => Some(DotAttr(key, s))
|
||||
case (key, n: java.lang.Integer) => Some(DotAttr(key, n.intValue()))
|
||||
case (key, n: java.lang.Long) => Some(DotAttr(key, n.longValue()))
|
||||
case (key, n: java.lang.Float) => Some(DotAttr(key, n.floatValue()))
|
||||
case (key, n: java.lang.Double) => Some(DotAttr(key, n.doubleValue()))
|
||||
case (key, other) =>
|
||||
RecipeVisualizer.log.warn(s"unusable configuration: $key = $other");
|
||||
None
|
||||
}.toList.flatten
|
||||
case (key, s: String) => Some(DotAttr(key, s))
|
||||
case (key, n: java.lang.Integer) => Some(DotAttr(key, n.intValue()))
|
||||
case (key, n: java.lang.Long) => Some(DotAttr(key, n.longValue()))
|
||||
case (key, n: java.lang.Float) => Some(DotAttr(key, n.floatValue()))
|
||||
case (key, n: java.lang.Double) => Some(DotAttr(key, n.doubleValue()))
|
||||
case (key, other) =>
|
||||
RecipeVisualizer.logger.warn(s"unusable configuration: $key = $other")
|
||||
None
|
||||
}.toList.flatten
|
||||
}
|
||||
|
||||
RecipeVisualStyle(
|
||||
|
||||
@@ -3,18 +3,18 @@ package com.ing.baker.il
|
||||
import com.ing.baker.il.petrinet.Place._
|
||||
import com.ing.baker.il.petrinet._
|
||||
import com.ing.baker.petrinet.api._
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
import com.typesafe.scalalogging.{ LazyLogging, Logger }
|
||||
import org.slf4j.LoggerFactory
|
||||
import scala.language.higherKinds
|
||||
import scalax.collection.Graph
|
||||
import scalax.collection.edge.WLDiEdge
|
||||
import scalax.collection.io.dot.implicits._
|
||||
import scalax.collection.io.dot.{DotAttr, _}
|
||||
|
||||
import scala.language.higherKinds
|
||||
import scalax.collection.io.dot.{ DotAttr, _ }
|
||||
|
||||
object RecipeVisualizer {
|
||||
|
||||
val log = LoggerFactory.getLogger("com.ing.baker.il.RecipeVisualizer")
|
||||
@transient
|
||||
lazy val logger: Logger = Logger(LoggerFactory.getLogger(getClass.getName))
|
||||
|
||||
type RecipePetriNetGraph = Graph[Either[Place, Transition], WLDiEdge]
|
||||
|
||||
@@ -23,7 +23,7 @@ object RecipeVisualizer {
|
||||
def compactNode(node: RecipePetriNetGraph#NodeT): RecipePetriNetGraph = {
|
||||
|
||||
// create direct edges from all incoming to outgoing nodes
|
||||
val newEdges = node.incomingNodes.flatMap { incomingNode =>
|
||||
val newEdges = node.incomingNodes.flatMap {incomingNode =>
|
||||
node.outgoingNodes.map(n => WLDiEdge[Node, String](incomingNode, n)(0, ""))
|
||||
}
|
||||
|
||||
@@ -34,13 +34,13 @@ object RecipeVisualizer {
|
||||
def compactAllNodes(fn: RecipePetriNetGraph#NodeT => Boolean): RecipePetriNetGraph =
|
||||
graph.nodes.foldLeft(graph) {
|
||||
case (acc, node) if fn(node) => acc.compactNode(node)
|
||||
case (acc, _) => acc
|
||||
case (acc, _) => acc
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the label for a node.
|
||||
*/
|
||||
* Returns the label for a node.
|
||||
*/
|
||||
private def nodeLabelFn: Either[Place, Transition] ⇒ String = {
|
||||
case Left(Place(label, EmptyEventIngredientPlace)) ⇒ s"empty:${label}"
|
||||
case Left(place) ⇒ place.label
|
||||
@@ -49,8 +49,8 @@ object RecipeVisualizer {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the style attributes for a node.
|
||||
*/
|
||||
* Returns the style attributes for a node.
|
||||
*/
|
||||
private def nodeDotAttrFn(style: RecipeVisualStyle): (RecipePetriNetGraph#NodeT, Set[String], Set[String]) => List[DotAttr] =
|
||||
(node: RecipePetriNetGraph#NodeT, eventNames: Set[String], ingredientNames: Set[String]) ⇒
|
||||
node.value match {
|
||||
@@ -90,10 +90,10 @@ object RecipeVisualizer {
|
||||
|
||||
// specifies which places to compact (remove)
|
||||
val placesToCompact = (node: RecipePetriNetGraph#NodeT) => node.value match {
|
||||
case Left(Place(_, IngredientPlace)) => false
|
||||
case Left(Place(_, IngredientPlace)) => false
|
||||
case Left(Place(_, EmptyEventIngredientPlace)) => false
|
||||
case Left(Place(_, EventOrPreconditionPlace)) => false
|
||||
case Left(Place(_, _)) => true
|
||||
case Left(Place(_, EventOrPreconditionPlace)) => false
|
||||
case Left(Place(_, _)) => true
|
||||
case _ => false
|
||||
}
|
||||
|
||||
@@ -118,22 +118,22 @@ object RecipeVisualizer {
|
||||
}
|
||||
|
||||
def visualizeRecipe(recipe: CompiledRecipe,
|
||||
style: RecipeVisualStyle,
|
||||
filter: String => Boolean = _ => true,
|
||||
eventNames: Set[String] = Set.empty,
|
||||
ingredientNames: Set[String] = Set.empty): String =
|
||||
style: RecipeVisualStyle,
|
||||
filter: String => Boolean = _ => true,
|
||||
eventNames: Set[String] = Set.empty,
|
||||
ingredientNames: Set[String] = Set.empty): String =
|
||||
generateDot(recipe.petriNet.innerGraph, style, filter, eventNames, ingredientNames)
|
||||
|
||||
|
||||
def visualizePetriNet[P, T](graph: PetriNetGraph[P, T]): String = {
|
||||
|
||||
val nodeLabelFn: Either[P, T] ⇒ String = node ⇒ node match {
|
||||
case Left(p) ⇒ p.toString
|
||||
case Left(p) ⇒ p.toString
|
||||
case Right(t) ⇒ t.toString
|
||||
}
|
||||
|
||||
val nodeDotAttrFn: Either[P, T] => List[DotAttr] = node ⇒ node match {
|
||||
case Left(_) ⇒ List(DotAttr("shape", "circle"))
|
||||
case Left(_) ⇒ List(DotAttr("shape", "circle"))
|
||||
case Right(_) ⇒ List(DotAttr("shape", "square"))
|
||||
}
|
||||
|
||||
|
||||
@@ -2,13 +2,9 @@ package com.ing.baker.il
|
||||
|
||||
import org.scalacheck.{Gen, Prop, Test}
|
||||
import org.scalatest.FunSuite
|
||||
import org.scalatest.prop.Checkers
|
||||
import org.scalatestplus.scalacheck.Checkers
|
||||
|
||||
class HashcodeGenerationSpec extends FunSuite with Checkers {
|
||||
|
||||
// def hash(str: String): Long = str.hashCode // Test fails with this hash function
|
||||
def hash(str: String): Long = sha256HashCode(str)
|
||||
|
||||
test("sha256 hash function") {
|
||||
val prop = Prop.forAll(Gen.alphaNumStr, Gen.alphaNumStr) {
|
||||
(s1: String, s2: String) => {
|
||||
@@ -20,7 +16,5 @@ class HashcodeGenerationSpec extends FunSuite with Checkers {
|
||||
check(prop, Test.Parameters.defaultVerbose.withMinSuccessfulTests(100 * 1000))
|
||||
}
|
||||
|
||||
// test("test2") {
|
||||
// assert("sr".hashCode != "u4".hashCode)
|
||||
// }
|
||||
private def hash(str: String): Long = sha256HashCode(str)
|
||||
}
|
||||
|
||||
@@ -3,32 +3,30 @@ import sbt._
|
||||
//noinspection TypeAnnotation
|
||||
object Dependencies {
|
||||
|
||||
val akkaVersion = "2.5.23"
|
||||
val akkaVersion = "2.5.26"
|
||||
val http4sVersion = "0.20.0"
|
||||
val circeVersion = "0.11.1"
|
||||
|
||||
val jvmV = "1.8"
|
||||
val scalapbVersion = scalapb.compiler.Version.scalapbVersion
|
||||
|
||||
val typeSafeConfig = "com.typesafe" % "config" % "1.3.1"
|
||||
|
||||
val akkaInmemoryJournal = ("com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.1.1")
|
||||
val akkaInmemoryJournal = ("com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.15.2")
|
||||
.exclude("com.typesafe.akka", "akka-actor")
|
||||
.exclude("com.typesafe.akka", "akka-persistence")
|
||||
.exclude("com.typesafe.akka", "akka-persistence-query")
|
||||
.exclude("com.typesafe.akka", "akka-stream")
|
||||
|
||||
val scalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0"
|
||||
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.5"
|
||||
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.8"
|
||||
val mockito = "org.mockito" % "mockito-all" % "1.10.19"
|
||||
val junitInterface = "com.novocode" % "junit-interface" % "0.11"
|
||||
val junitJupiter = "org.junit.jupiter" % "junit-jupiter-engine" % "5.0.0"
|
||||
val junitJupiter = "org.junit.jupiter" % "junit-jupiter-engine" % "5.5.2"
|
||||
|
||||
val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion
|
||||
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
|
||||
val akkaPersistence = "com.typesafe.akka" %% "akka-persistence" % akkaVersion
|
||||
val akkaPersistenceQuery = "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion
|
||||
val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.54"
|
||||
val akkaPersistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.100"
|
||||
val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % akkaVersion
|
||||
val akkaClusterSharding = "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
|
||||
val akkaDistributedData = "com.typesafe.akka" %% "akka-distributed-data" % akkaVersion
|
||||
@@ -40,9 +38,11 @@ object Dependencies {
|
||||
val akkaManagementHttp = "com.lightbend.akka.management" %% "akka-management-cluster-http" % "1.0.5"
|
||||
val akkaClusterBoostrap = "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % "1.0.5"
|
||||
val akkaDiscoveryKube = "com.lightbend.akka.discovery" %% "akka-discovery-kubernetes-api" % "1.0.5"
|
||||
val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.10"
|
||||
val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11"
|
||||
val akkaBoostrap = "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % "1.0.5"
|
||||
val levelDB = "org.iq80.leveldb" % "leveldb" % "0.7"
|
||||
|
||||
val levelDB = "org.iq80.leveldb" % "leveldb" % "0.12"
|
||||
|
||||
val levelDBJni = "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
|
||||
|
||||
val logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
|
||||
@@ -50,7 +50,7 @@ object Dependencies {
|
||||
|
||||
val scalaGraph = "org.scala-graph" %% "graph-core" % "1.11.5"
|
||||
val scalaGraphDot = "org.scala-graph" %% "graph-dot" % "1.11.5"
|
||||
val graphvizJava = "guru.nidi" % "graphviz-java" % "0.8.0"
|
||||
val graphvizJava = "guru.nidi" % "graphviz-java" % "0.8.10"
|
||||
|
||||
val kamon = "io.kamon" %% "kamon-bundle" % "2.0.0"
|
||||
val kamonAkka = "io.kamon" %% "kamon-akka" % "2.0.0"
|
||||
@@ -62,33 +62,33 @@ object Dependencies {
|
||||
val http4sCirce = "org.http4s" %% "http4s-circe" % http4sVersion
|
||||
val circe = "io.circe" %% "circe-core" % circeVersion
|
||||
val circeGeneric = "io.circe" %% "circe-generic" % circeVersion
|
||||
val catsEffect = "org.typelevel" %% "cats-effect" % "1.2.0"
|
||||
val catsCore = "org.typelevel" %% "cats-core" % "1.5.0"
|
||||
|
||||
val catsEffect = "org.typelevel" %% "cats-effect" % "2.0.0"
|
||||
val catsCore = "org.typelevel" %% "cats-core" % "2.0.0"
|
||||
|
||||
def scalaReflect(scalaV: String): ModuleID = "org.scala-lang"% "scala-reflect" % scalaV
|
||||
val javaxInject = "javax.inject" % "javax.inject" % "1"
|
||||
|
||||
val paranamer = "com.thoughtworks.paranamer" % "paranamer" % "2.8"
|
||||
val guava = "com.google.guava" % "guava" % "19.0"
|
||||
val findbugs = "com.google.code.findbugs" % "jsr305" % "1.3.9"
|
||||
|
||||
val scalapbRuntime = "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf"
|
||||
val chill = ("com.twitter" %% "chill-akka" % "0.9.4")
|
||||
.exclude("com.typesafe.akka", "akka-actor")
|
||||
|
||||
val kryo = "com.esotericsoftware" % "kryo" % "4.0.0"
|
||||
val protobufJava = "com.google.protobuf" % "protobuf-java" % "3.11.1"
|
||||
|
||||
val protobufJava = "com.google.protobuf" % "protobuf-java" % "3.5.1"
|
||||
val betterFiles = "com.github.pathikrit" %% "better-files" % "3.8.0"
|
||||
|
||||
val betterFiles = "com.github.pathikrit" %% "better-files" % "3.6.0"
|
||||
val typeSafeConfig = "com.typesafe" % "config" % "1.4.0"
|
||||
|
||||
val kryoSerializers = "de.javakaffee" % "kryo-serializers" % "0.41"
|
||||
val objenisis = "org.objenesis" % "objenesis" % "2.5.1"
|
||||
val objenisis = "org.objenesis" % "objenesis" % "3.1"
|
||||
|
||||
val jodaTime = "joda-time" % "joda-time" % "2.10.5"
|
||||
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.29"
|
||||
|
||||
val jodaTime = "joda-time" % "joda-time" % "2.9.9"
|
||||
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.25"
|
||||
val slf4jSimple = "org.slf4j" % "slf4j-simple" % "1.7.5"
|
||||
val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.13.4"
|
||||
val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.13.5"
|
||||
|
||||
val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2"
|
||||
|
||||
def scopeDeps(scope: String, modules: Seq[ModuleID]) = modules.map(m => m % scope)
|
||||
def compileDeps(modules: ModuleID*) = modules.toSeq
|
||||
|
||||
@@ -1 +1 @@
|
||||
sbt.version=1.1.4
|
||||
sbt.version=1.2.8
|
||||
|
||||
@@ -1,26 +1,20 @@
|
||||
addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
|
||||
|
||||
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
|
||||
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
|
||||
|
||||
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.0")
|
||||
|
||||
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.7")
|
||||
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.12")
|
||||
|
||||
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
|
||||
|
||||
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.3.4")
|
||||
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.8.1")
|
||||
|
||||
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3")
|
||||
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.0")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.4.0")
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.5.1")
|
||||
|
||||
libraryDependencies += "org.slf4j" % "slf4j-nop" % "1.7.25"
|
||||
libraryDependencies += "org.slf4j" % "slf4j-nop" % "1.7.29"
|
||||
|
||||
// For the example application
|
||||
//addSbtPlugin("io.kamon" % "sbt-kanela-runner" % "2.0.1")
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
|
||||
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.27")
|
||||
|
||||
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4"
|
||||
@@ -7,7 +7,7 @@ import org.scalacheck.Prop.forAll
|
||||
import org.scalacheck.Test.Parameters.defaultVerbose
|
||||
import org.scalacheck.{Arbitrary, Gen, Test}
|
||||
import org.scalatest.FunSuiteLike
|
||||
import org.scalatest.prop.Checkers
|
||||
import org.scalatestplus.scalacheck.Checkers
|
||||
|
||||
class HashingSpec extends FunSuiteLike with Checkers {
|
||||
|
||||
@@ -16,7 +16,7 @@ class HashingSpec extends FunSuiteLike with Checkers {
|
||||
|
||||
def hashingLaw[A: Arbitrary](): Unit = {
|
||||
check(forAll { (x: A, y: A) =>
|
||||
if(x == y) x.hashCode() == y.hashCode()
|
||||
if (x == y) x.hashCode() == y.hashCode()
|
||||
else x.hashCode() != y.hashCode()
|
||||
}, config)
|
||||
}
|
||||
@@ -82,5 +82,4 @@ object HashingSpec {
|
||||
|
||||
def optionTypeGen: Gen[OptionType] =
|
||||
primitiveTypeGen.map(OptionType)
|
||||
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,7 @@ import com.ing.baker.runtime.{javadsl, scaladsl}
|
||||
import com.ing.baker.runtime.scaladsl._
|
||||
import com.ing.baker.types.Value
|
||||
import com.typesafe.config.Config
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.Future
|
||||
@@ -51,15 +51,13 @@ object AkkaBaker {
|
||||
}
|
||||
|
||||
/**
|
||||
* The Baker is the component of the Baker library that runs one or multiples recipes.
|
||||
* For each recipe a new instance can be baked, sensory events can be send and state can be inquired upon
|
||||
*/
|
||||
class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker {
|
||||
* The Baker is the component of the Baker library that runs one or multiples recipes.
|
||||
* For each recipe a new instance can be baked, sensory events can be send and state can be inquired upon
|
||||
*/
|
||||
class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker with LazyLogging {
|
||||
|
||||
import config.system
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger(classOf[AkkaBaker])
|
||||
|
||||
val recipeManager: ActorRef =
|
||||
config.bakerActorProvider.createRecipeManagerActor()
|
||||
|
||||
@@ -67,13 +65,13 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
|
||||
config.bakerActorProvider.createProcessIndexActor(config.interactionManager, recipeManager)
|
||||
|
||||
/**
|
||||
* Adds a recipe to baker and returns a recipeId for the recipe.
|
||||
*
|
||||
* This function is idempotent, if the same (equal) recipe was added earlier this will return the same recipeId
|
||||
*
|
||||
* @param compiledRecipe The compiled recipe.
|
||||
* @return A recipeId
|
||||
*/
|
||||
* Adds a recipe to baker and returns a recipeId for the recipe.
|
||||
*
|
||||
* This function is idempotent, if the same (equal) recipe was added earlier this will return the same recipeId
|
||||
*
|
||||
* @param compiledRecipe The compiled recipe.
|
||||
* @return A recipeId
|
||||
*/
|
||||
override def addRecipe(compiledRecipe: CompiledRecipe): Future[String] = {
|
||||
|
||||
// check if every interaction has an implementation
|
||||
@@ -96,11 +94,11 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the recipe information for the given RecipeId
|
||||
*
|
||||
* @param recipeId
|
||||
* @return
|
||||
*/
|
||||
* Returns the recipe information for the given RecipeId
|
||||
*
|
||||
* @param recipeId
|
||||
* @return
|
||||
*/
|
||||
override def getRecipe(recipeId: String): Future[RecipeInformation] = {
|
||||
// here we ask the RecipeManager actor to return us the recipe for the given id
|
||||
recipeManager.ask(RecipeManagerProtocol.GetRecipe(recipeId))(config.defaultInquireTimeout).flatMap {
|
||||
@@ -112,24 +110,24 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all recipes added to this baker instance.
|
||||
*
|
||||
* @return All recipes in the form of map of recipeId -> CompiledRecipe
|
||||
*/
|
||||
* Returns all recipes added to this baker instance.
|
||||
*
|
||||
* @return All recipes in the form of map of recipeId -> CompiledRecipe
|
||||
*/
|
||||
override def getAllRecipes: Future[Map[String, RecipeInformation]] =
|
||||
recipeManager.ask(RecipeManagerProtocol.GetAllRecipes)(config.defaultInquireTimeout)
|
||||
.mapTo[RecipeManagerProtocol.AllRecipes]
|
||||
.map(_.recipes.map { ri =>
|
||||
.map(_.recipes.map {ri =>
|
||||
ri.compiledRecipe.recipeId -> RecipeInformation(ri.compiledRecipe, ri.timestamp, getImplementationErrors(ri.compiledRecipe))
|
||||
}.toMap)
|
||||
|
||||
/**
|
||||
* Creates a process instance for the given recipeId with the given RecipeInstanceId as identifier
|
||||
*
|
||||
* @param recipeId The recipeId for the recipe to bake
|
||||
* @param recipeInstanceId The identifier for the newly baked process
|
||||
* @return
|
||||
*/
|
||||
* Creates a process instance for the given recipeId with the given RecipeInstanceId as identifier
|
||||
*
|
||||
* @param recipeId The recipeId for the recipe to bake
|
||||
* @param recipeInstanceId The identifier for the newly baked process
|
||||
* @return
|
||||
*/
|
||||
override def bake(recipeId: String, recipeInstanceId: String): Future[Unit] = {
|
||||
processIndexActor.ask(CreateProcess(recipeId, recipeInstanceId))(config.defaultBakeTimeout).flatMap {
|
||||
case _: Initialized =>
|
||||
@@ -262,44 +260,44 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
|
||||
}
|
||||
|
||||
/**
|
||||
* Retries a blocked interaction.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
* Retries a blocked interaction.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
override def retryInteraction(recipeInstanceId: String, interactionName: String): Future[Unit] = {
|
||||
processIndexActor.ask(RetryBlockedInteraction(recipeInstanceId, interactionName))(config.defaultProcessEventTimeout).map(_ => ())
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves a blocked interaction by specifying it's output.
|
||||
*
|
||||
* !!! You should provide an event of the original interaction. Event / ingredient renames are done by Baker.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
* Resolves a blocked interaction by specifying it's output.
|
||||
*
|
||||
* !!! You should provide an event of the original interaction. Event / ingredient renames are done by Baker.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
override def resolveInteraction(recipeInstanceId: String, interactionName: String, event: EventInstance): Future[Unit] = {
|
||||
processIndexActor.ask(ResolveBlockedInteraction(recipeInstanceId, interactionName, event))(config.defaultProcessEventTimeout).map(_ => ())
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the retrying of an interaction.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
* Stops the retrying of an interaction.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
override def stopRetryingInteraction(recipeInstanceId: String, interactionName: String): Future[Unit] = {
|
||||
processIndexActor.ask(StopRetryingInteraction(recipeInstanceId, interactionName))(config.defaultProcessEventTimeout).map(_ => ())
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an index of all processes.
|
||||
*
|
||||
* Can potentially return a partial index when baker runs in cluster mode
|
||||
* and not all shards can be reached within the given timeout.
|
||||
*
|
||||
* Does not include deleted processes.
|
||||
*
|
||||
* @return An index of all processes
|
||||
*/
|
||||
* Returns an index of all processes.
|
||||
*
|
||||
* Can potentially return a partial index when baker runs in cluster mode
|
||||
* and not all shards can be reached within the given timeout.
|
||||
*
|
||||
* Does not include deleted processes.
|
||||
*
|
||||
* @return An index of all processes
|
||||
*/
|
||||
override def getAllRecipeInstancesMetadata: Future[Set[RecipeInstanceMetadata]] = {
|
||||
Future.successful(config.bakerActorProvider
|
||||
.getAllProcessesMetadata(processIndexActor)(system, config.defaultInquireTimeout)
|
||||
@@ -307,11 +305,11 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the process state.
|
||||
*
|
||||
* @param recipeInstanceId The process identifier
|
||||
* @return The process state.
|
||||
*/
|
||||
* Returns the process state.
|
||||
*
|
||||
* @param recipeInstanceId The process identifier
|
||||
* @return The process state.
|
||||
*/
|
||||
override def getRecipeInstanceState(recipeInstanceId: String): Future[RecipeInstanceState] =
|
||||
processIndexActor
|
||||
.ask(GetProcessState(recipeInstanceId))(Timeout.durationToTimeout(config.defaultInquireTimeout))
|
||||
@@ -322,38 +320,38 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all provided ingredients for a given process id.
|
||||
*
|
||||
* @param recipeInstanceId The process id.
|
||||
* @return The provided ingredients.
|
||||
*/
|
||||
* Returns all provided ingredients for a given process id.
|
||||
*
|
||||
* @param recipeInstanceId The process id.
|
||||
* @return The provided ingredients.
|
||||
*/
|
||||
override def getIngredients(recipeInstanceId: String): Future[Map[String, Value]] =
|
||||
getRecipeInstanceState(recipeInstanceId).map(_.ingredients)
|
||||
|
||||
/**
|
||||
* Returns all fired events for a given RecipeInstance id.
|
||||
*
|
||||
* @param recipeInstanceId The process id.
|
||||
* @return The events
|
||||
*/
|
||||
* Returns all fired events for a given RecipeInstance id.
|
||||
*
|
||||
* @param recipeInstanceId The process id.
|
||||
* @return The events
|
||||
*/
|
||||
override def getEvents(recipeInstanceId: String): Future[Seq[EventMoment]] =
|
||||
getRecipeInstanceState(recipeInstanceId).map(_.events)
|
||||
|
||||
/**
|
||||
* Returns all names of fired events for a given RecipeInstance id.
|
||||
*
|
||||
* @param recipeInstanceId The process id.
|
||||
* @return The event names
|
||||
*/
|
||||
* Returns all names of fired events for a given RecipeInstance id.
|
||||
*
|
||||
* @param recipeInstanceId The process id.
|
||||
* @return The event names
|
||||
*/
|
||||
override def getEventNames(recipeInstanceId: String): Future[Seq[String]] =
|
||||
getRecipeInstanceState(recipeInstanceId).map(_.eventNames)
|
||||
|
||||
/**
|
||||
* Returns the visual state (.dot) for a given process.
|
||||
*
|
||||
* @param recipeInstanceId The process identifier.
|
||||
* @return A visual (.dot) representation of the process state.
|
||||
*/
|
||||
* Returns the visual state (.dot) for a given process.
|
||||
*
|
||||
* @param recipeInstanceId The process identifier.
|
||||
* @return A visual (.dot) representation of the process state.
|
||||
*/
|
||||
@throws[ProcessDeletedException]("If the process is already deleted")
|
||||
@throws[NoSuchProcessException]("If the process is not found")
|
||||
override def getVisualState(recipeInstanceId: String, style: RecipeVisualStyle = RecipeVisualStyle.default): Future[String] = {
|
||||
@@ -388,38 +386,38 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a listener to all runtime events for recipes with the given name run in this baker instance.
|
||||
*
|
||||
* Note that the delivery guarantee is *AT MOST ONCE*. Do not use it for critical functionality
|
||||
*/
|
||||
* Registers a listener to all runtime events for recipes with the given name run in this baker instance.
|
||||
*
|
||||
* Note that the delivery guarantee is *AT MOST ONCE*. Do not use it for critical functionality
|
||||
*/
|
||||
override def registerEventListener(recipeName: String, listenerFunction: (RecipeEventMetadata, EventInstance) => Unit): Future[Unit] =
|
||||
doRegisterEventListener(listenerFunction, _ == recipeName)
|
||||
|
||||
/**
|
||||
* Registers a listener to all runtime events for all recipes that run in this Baker instance.
|
||||
*
|
||||
* Note that the delivery guarantee is *AT MOST ONCE*. Do not use it for critical functionality
|
||||
*/
|
||||
* Registers a listener to all runtime events for all recipes that run in this Baker instance.
|
||||
*
|
||||
* Note that the delivery guarantee is *AT MOST ONCE*. Do not use it for critical functionality
|
||||
*/
|
||||
// @deprecated("Use event bus instead", "1.4.0")
|
||||
override def registerEventListener(listenerFunction: (RecipeEventMetadata, EventInstance) => Unit): Future[Unit] =
|
||||
doRegisterEventListener(listenerFunction, _ => true)
|
||||
|
||||
/**
|
||||
* Registers a listener function that listens to all BakerEvents
|
||||
*
|
||||
* Note that the delivery guarantee is *AT MOST ONCE*. Do not use it for critical functionality
|
||||
*
|
||||
* @param listenerFunction
|
||||
* @return
|
||||
*/
|
||||
* Registers a listener function that listens to all BakerEvents
|
||||
*
|
||||
* Note that the delivery guarantee is *AT MOST ONCE*. Do not use it for critical functionality
|
||||
*
|
||||
* @param listenerFunction
|
||||
* @return
|
||||
*/
|
||||
override def registerBakerEventListener(listenerFunction: BakerEvent => Unit): Future[Unit] = {
|
||||
Future.successful {
|
||||
val listenerActor = system.actorOf(Props(new Actor() {
|
||||
override def receive: Receive = {
|
||||
case event: BakerEvent => Try {
|
||||
listenerFunction.apply(event)
|
||||
}.failed.foreach { e =>
|
||||
log.warn(s"Listener function threw exception for event: $event", e)
|
||||
}.failed.foreach {e =>
|
||||
logger.warn(s"Listener function threw exception for event: $event", e)
|
||||
}
|
||||
}
|
||||
}))
|
||||
@@ -428,26 +426,26 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an interaction implementation to baker.
|
||||
*
|
||||
* This is assumed to be a an object with a method named 'apply' defined on it.
|
||||
*
|
||||
* @param implementation The implementation object
|
||||
*/
|
||||
* Adds an interaction implementation to baker.
|
||||
*
|
||||
* This is assumed to be a an object with a method named 'apply' defined on it.
|
||||
*
|
||||
* @param implementation The implementation object
|
||||
*/
|
||||
override def addInteractionInstance(implementation: InteractionInstance): Future[Unit] =
|
||||
Future.successful(config.interactionManager.addImplementation(implementation))
|
||||
|
||||
/**
|
||||
* Adds a sequence of interaction implementation to baker.
|
||||
*
|
||||
* @param implementations The implementation object
|
||||
*/
|
||||
* Adds a sequence of interaction implementation to baker.
|
||||
*
|
||||
* @param implementations The implementation object
|
||||
*/
|
||||
override def addInteractionInstances(implementations: Seq[InteractionInstance]): Future[Unit] =
|
||||
Future.successful(implementations.foreach(addInteractionInstance))
|
||||
|
||||
/**
|
||||
* Attempts to gracefully shutdown the baker system.
|
||||
*/
|
||||
* Attempts to gracefully shutdown the baker system.
|
||||
*/
|
||||
override def gracefulShutdown: Future[Unit] =
|
||||
Future.successful(GracefulShutdown.gracefulShutdownActorSystem(system, config.defaultShutdownTimeout))
|
||||
}
|
||||
|
||||
@@ -17,8 +17,7 @@ import com.ing.baker.runtime.akka.actor.process_index._
|
||||
import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManager
|
||||
import com.ing.baker.runtime.akka.internal.InteractionManager
|
||||
import com.ing.baker.runtime.serialization.{BakerSerializable, Encryption}
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{Await, TimeoutException}
|
||||
|
||||
@@ -33,25 +32,25 @@ object ClusterBakerActorProvider {
|
||||
case object ServiceDiscovery extends ClusterBootstrapMode
|
||||
|
||||
/**
|
||||
* This function calculates the names of the ActorIndex actors
|
||||
* gets the least significant bits of the UUID, and returns the MOD 10
|
||||
* So we have at most 10 manager actors created, all the petrinet actors will fall under these 10 actors
|
||||
* Note, the nrOfShards used here has to be aligned with the nrOfShards used in the shardIdExtractor
|
||||
*/
|
||||
* This function calculates the names of the ActorIndex actors
|
||||
* gets the least significant bits of the UUID, and returns the MOD 10
|
||||
* So we have at most 10 manager actors created, all the petrinet actors will fall under these 10 actors
|
||||
* Note, the nrOfShards used here has to be aligned with the nrOfShards used in the shardIdExtractor
|
||||
*/
|
||||
def entityId(recipeInstanceId: String, nrOfShards: Int): String =
|
||||
s"index-${Math.abs(sha256HashCode(recipeInstanceId) % nrOfShards)}"
|
||||
|
||||
// extracts the actor id -> message from the incoming message
|
||||
// Entity id is the first character of the UUID
|
||||
def entityIdExtractor(nrOfShards: Int): ExtractEntityId = {
|
||||
case msg:ProcessIndexMessage => (entityId(msg.recipeInstanceId, nrOfShards), msg)
|
||||
case msg: ProcessIndexMessage => (entityId(msg.recipeInstanceId, nrOfShards), msg)
|
||||
case GetShardIndex(entityId) => (entityId, GetIndex)
|
||||
case msg => throw new IllegalArgumentException(s"Message of type ${msg.getClass} not recognized")
|
||||
}
|
||||
|
||||
// extracts the shard id from the incoming message
|
||||
def shardIdExtractor(nrOfShards: Int): ExtractShardId = {
|
||||
case msg:ProcessIndexMessage => Math.abs(sha256HashCode(msg.recipeInstanceId) % nrOfShards).toString
|
||||
case msg: ProcessIndexMessage => Math.abs(sha256HashCode(msg.recipeInstanceId) % nrOfShards).toString
|
||||
case GetShardIndex(entityId) => entityId.split(s"index-").last
|
||||
case ShardRegion.StartEntity(entityId) => entityId.split(s"index-").last
|
||||
case msg => throw new IllegalArgumentException(s"Message of type ${msg.getClass} not recognized")
|
||||
@@ -68,21 +67,19 @@ class ClusterBakerActorProvider(
|
||||
seedNodes: ClusterBootstrapMode,
|
||||
ingredientsFilter: List[String],
|
||||
configuredEncryption: Encryption
|
||||
) extends BakerActorProvider {
|
||||
|
||||
private val log = LoggerFactory.getLogger(classOf[ClusterBakerActorProvider])
|
||||
) extends BakerActorProvider with LazyLogging {
|
||||
|
||||
private def initializeCluster()(implicit actorSystem: ActorSystem): Unit = {
|
||||
/**
|
||||
* Join cluster after waiting for the persistenceInit actor, otherwise terminate here.
|
||||
*/
|
||||
* Join cluster after waiting for the persistenceInit actor, otherwise terminate here.
|
||||
*/
|
||||
try {
|
||||
Await.result(Util.persistenceInit(journalInitializeTimeout), journalInitializeTimeout)
|
||||
} catch {
|
||||
case _: TimeoutException => throw new IllegalStateException(s"Timeout when trying to initialize the akka journal, waited $journalInitializeTimeout")
|
||||
}
|
||||
// join the cluster
|
||||
log.info("PersistenceInit actor started successfully, joining cluster seed nodes {}", seedNodes)
|
||||
logger.info("PersistenceInit actor started successfully, joining cluster seed nodes {}", seedNodes)
|
||||
seedNodes match {
|
||||
case SeedNodesList(nel) =>
|
||||
Cluster.get(actorSystem).joinSeedNodes(nel.toList)
|
||||
@@ -133,13 +130,13 @@ class ClusterBakerActorProvider(
|
||||
actorSystem.actorOf(props = singletonProxyProps, name = "RecipeManagerProxy")
|
||||
}
|
||||
|
||||
def getAllProcessesMetadata(actor: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration) = {
|
||||
def getAllProcessesMetadata(actor: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration): Seq[ActorMetadata] = {
|
||||
|
||||
import akka.pattern.ask
|
||||
import system.dispatcher
|
||||
implicit val akkaTimeout: Timeout = timeout
|
||||
|
||||
val futures = (0 to nrOfShards).map { shard => actor.ask(GetShardIndex(s"index-$shard")).mapTo[Index].map(_.entries) }
|
||||
val futures = (0 to nrOfShards).map {shard => actor.ask(GetShardIndex(s"index-$shard")).mapTo[Index].map(_.entries)}
|
||||
val collected: Seq[ActorMetadata] = Util.collectFuturesWithin(futures, timeout, system.scheduler).flatten
|
||||
|
||||
collected
|
||||
|
||||
@@ -5,17 +5,18 @@ import akka.cluster.Cluster
|
||||
import akka.pattern.ask
|
||||
import akka.util.Timeout
|
||||
import com.ing.baker.runtime.akka.actor.GracefulShutdownShardRegions.InitiateGracefulShutdown
|
||||
import com.typesafe.scalalogging.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.{Await, Promise, TimeoutException}
|
||||
import scala.util.{Failure, Success, Try}
|
||||
import scala.concurrent.{ Await, Promise, TimeoutException }
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
|
||||
object GracefulShutdown {
|
||||
|
||||
val log = LoggerFactory.getLogger("com.ing.baker.runtime.core.actor.GracefulShutdown")
|
||||
@transient
|
||||
lazy val logger: Logger = Logger(LoggerFactory.getLogger(getClass.getName))
|
||||
|
||||
def gracefulShutdownActorSystem(actorSystem: ActorSystem, timeout: FiniteDuration) = {
|
||||
def gracefulShutdownActorSystem(actorSystem: ActorSystem, timeout: FiniteDuration): Any = {
|
||||
|
||||
Try {
|
||||
Cluster.get(actorSystem)
|
||||
@@ -26,12 +27,12 @@ object GracefulShutdown {
|
||||
gracefulShutdownShards(Seq("ProcessIndexActor"))(Timeout(timeout), actorSystem)
|
||||
|
||||
// then leave the cluster
|
||||
log.warn("Leaving the akka cluster")
|
||||
logger.warn("Leaving the akka cluster")
|
||||
|
||||
val promise: Promise[Boolean] = Promise()
|
||||
|
||||
cluster.registerOnMemberRemoved {
|
||||
log.warn("Successfully left the akka cluster, terminating the actor system")
|
||||
logger.warn("Successfully left the akka cluster, terminating the actor system")
|
||||
promise.success(true)
|
||||
actorSystem.terminate()
|
||||
}
|
||||
@@ -41,10 +42,10 @@ object GracefulShutdown {
|
||||
Await.result(promise.future, timeout)
|
||||
|
||||
case Success(_) =>
|
||||
log.warn("Not a member of a cluster, terminating the actor system")
|
||||
logger.warn("Not a member of a cluster, terminating the actor system")
|
||||
actorSystem.terminate()
|
||||
case Failure(exception) =>
|
||||
log.warn("Cluster not available for actor system", exception)
|
||||
logger.warn("Cluster not available for actor system", exception)
|
||||
actorSystem.terminate()
|
||||
}
|
||||
}
|
||||
@@ -57,7 +58,7 @@ object GracefulShutdown {
|
||||
Await.result(actor.ask(InitiateGracefulShutdown), timeout.duration)
|
||||
} catch {
|
||||
case _: TimeoutException =>
|
||||
log.warn(s"Graceful shutdown of shards timed out after $timeout")
|
||||
logger.warn(s"Graceful shutdown of shards timed out after $timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
package com.ing.baker.runtime.akka.actor
|
||||
|
||||
import akka.actor._
|
||||
import akka.cluster.sharding.{ClusterSharding, ShardRegion}
|
||||
import com.ing.baker.runtime.akka.actor.GracefulShutdownShardRegions.{GracefulShutdownSuccessful, GracefulShutdownTimedOut, InitiateGracefulShutdown}
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
|
||||
import com.ing.baker.runtime.akka.actor.GracefulShutdownShardRegions.{ GracefulShutdownSuccessful, GracefulShutdownTimedOut, InitiateGracefulShutdown }
|
||||
import scala.collection._
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.duration._
|
||||
@@ -13,7 +11,9 @@ import scala.language.postfixOps
|
||||
object GracefulShutdownShardRegions {
|
||||
|
||||
case object InitiateGracefulShutdown
|
||||
|
||||
case object GracefulShutdownTimedOut
|
||||
|
||||
case object GracefulShutdownSuccessful
|
||||
|
||||
def props(shardHandOverTimeout: FiniteDuration, typeNames: Seq[String]): Props =
|
||||
@@ -21,23 +21,21 @@ object GracefulShutdownShardRegions {
|
||||
}
|
||||
|
||||
class GracefulShutdownShardRegions(shardHandOverTimeout: FiniteDuration, typeNames: Seq[String]) extends Actor
|
||||
with ActorLogging {
|
||||
with ActorLogging {
|
||||
|
||||
val system = context.system
|
||||
private val system = context.system
|
||||
|
||||
// all the shard region actor refs
|
||||
val shardRegionsRefs = typeNames.map(name => ClusterSharding(system).shardRegion(name)).toSet
|
||||
private val shardRegionsRefs = typeNames.map(name => ClusterSharding(system).shardRegion(name)).toSet
|
||||
|
||||
implicit val ec: ExecutionContext = system.dispatcher
|
||||
private implicit val ec: ExecutionContext = system.dispatcher
|
||||
|
||||
val config: Config = system.settings.config
|
||||
|
||||
override def receive = waitForLeaveCommand(shardRegionsRefs)
|
||||
override def receive: Receive = waitForLeaveCommand(shardRegionsRefs)
|
||||
|
||||
def waitForLeaveCommand(regions: Set[ActorRef]): Receive = {
|
||||
case InitiateGracefulShutdown =>
|
||||
GracefulShutdown.log.warn(s"Initiating graceful shut down of shard regions: ${typeNames.mkString(",")}")
|
||||
regions.foreach { region =>
|
||||
GracefulShutdown.logger.warn(s"Initiating graceful shut down of shard regions: ${typeNames.mkString(",")}")
|
||||
regions.foreach {region =>
|
||||
context watch region
|
||||
region ! ShardRegion.GracefulShutdown
|
||||
}
|
||||
@@ -46,16 +44,17 @@ class GracefulShutdownShardRegions(shardHandOverTimeout: FiniteDuration, typeNam
|
||||
}
|
||||
|
||||
def waitingForTermination(regions: Set[ActorRef], initiator: ActorRef): Receive = {
|
||||
case GracefulShutdownTimedOut =>
|
||||
GracefulShutdown.log.warn(s"Graceful shutdown of shard regions timed out after $shardHandOverTimeout")
|
||||
case GracefulShutdownTimedOut =>
|
||||
GracefulShutdown.logger.warn(s"Graceful shutdown of shard regions timed out after $shardHandOverTimeout")
|
||||
context.stop(self)
|
||||
case Terminated(region) =>
|
||||
val newRegions = regions - region
|
||||
if (newRegions.isEmpty) {
|
||||
GracefulShutdown.log.warn("Graceful shutdown of shard regions successful")
|
||||
GracefulShutdown.logger.warn("Graceful shutdown of shard regions successful")
|
||||
initiator ! GracefulShutdownSuccessful
|
||||
context.stop(self)
|
||||
} else
|
||||
} else {
|
||||
context.become(waitingForTermination(newRegions, initiator))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
package com.ing.baker.runtime.akka.actor.process_index
|
||||
|
||||
import akka.actor.{Actor, ActorRef, Props, ReceiveTimeout}
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, ReceiveTimeout }
|
||||
import com.ing.baker.il.CompiledRecipe
|
||||
import com.ing.baker.runtime.scaladsl.{EventInstance, EventReceived, EventRejected, SensoryEventResult, RecipeInstanceState}
|
||||
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol.{FireSensoryEventReaction, FireSensoryEventRejection, ProcessEvent, ProcessEventCompletedResponse, ProcessEventReceivedResponse}
|
||||
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._
|
||||
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol
|
||||
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol._
|
||||
import com.ing.baker.runtime.common.SensoryEventStatus
|
||||
import com.ing.baker.types.{PrimitiveValue, Value}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
import com.ing.baker.runtime.scaladsl.{ EventInstance, EventReceived, EventRejected, SensoryEventResult }
|
||||
import com.ing.baker.types.{ PrimitiveValue, Value }
|
||||
|
||||
object SensoryEventResponseHandler {
|
||||
|
||||
@@ -17,17 +16,16 @@ object SensoryEventResponseHandler {
|
||||
}
|
||||
|
||||
/**
|
||||
* An actor which builds the response to fireSensoryEvent* requests
|
||||
* - Obtains the data from the process instance (by accumulating transition firing outcomes)
|
||||
* - Publishes events to the system event stream
|
||||
* - Does involving logging
|
||||
*/
|
||||
class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ingredientsFilter: Seq[String]) extends Actor {
|
||||
* An actor which builds the response to fireSensoryEvent* requests
|
||||
* - Obtains the data from the process instance (by accumulating transition firing outcomes)
|
||||
* - Publishes events to the system event stream
|
||||
* - Does involving logging
|
||||
*/
|
||||
class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ingredientsFilter: Seq[String])
|
||||
extends Actor with ActorLogging {
|
||||
|
||||
context.setReceiveTimeout(command.timeout)
|
||||
|
||||
val log: Logger = LoggerFactory.getLogger(classOf[SensoryEventResponseHandler])
|
||||
|
||||
val waitForRetries: Boolean = command.reaction match {
|
||||
case FireSensoryEventReaction.NotifyWhenReceived => false
|
||||
case FireSensoryEventReaction.NotifyWhenCompleted(waitForRetries0) => waitForRetries0
|
||||
@@ -154,15 +152,15 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing
|
||||
case FireSensoryEventReaction.NotifyOnEvent(_, onEvent)
|
||||
if Option(cache.head.asInstanceOf[TransitionFired].output.asInstanceOf[EventInstance]).exists(_.name == onEvent) =>
|
||||
notifyComplete(cache.reverse)
|
||||
PartialFunction { _ => () }
|
||||
PartialFunction {_ => ()}
|
||||
|
||||
case FireSensoryEventReaction.NotifyWhenCompleted(_) if runningJobs.isEmpty =>
|
||||
notifyComplete(cache.reverse)
|
||||
PartialFunction { _ => () }
|
||||
PartialFunction {_ => ()}
|
||||
|
||||
case FireSensoryEventReaction.NotifyBoth(_, _) if runningJobs.isEmpty =>
|
||||
notifyComplete(cache.reverse)
|
||||
PartialFunction { _ => () }
|
||||
PartialFunction {_ => ()}
|
||||
|
||||
case _ =>
|
||||
PartialFunction {
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.ing.baker.runtime.akka._
|
||||
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceEventSourcing._
|
||||
import com.ing.baker.runtime.akka.actor.process_instance.internal.ExceptionStrategy.BlockTransition
|
||||
import com.ing.baker.runtime.akka.actor.process_instance.internal.{ExceptionStrategy, Instance, Job}
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
/**
|
||||
@@ -19,7 +20,7 @@ import org.slf4j.{Logger, LoggerFactory}
|
||||
* @tparam S The state type
|
||||
* @tparam E The event type
|
||||
*/
|
||||
trait ProcessInstanceRuntime[P, T, S, E] {
|
||||
trait ProcessInstanceRuntime[P, T, S, E] extends LazyLogging {
|
||||
|
||||
val log: Logger = LoggerFactory.getLogger("com.ing.baker.runtime.core.actor.process_instance.ProcessInstanceRuntime")
|
||||
|
||||
@@ -31,15 +32,15 @@ trait ProcessInstanceRuntime[P, T, S, E] {
|
||||
val eventSource: T ⇒ S ⇒ E ⇒ S = _ ⇒ s ⇒ _ ⇒ s
|
||||
|
||||
/**
|
||||
* This function is called when a transition throws an exception.
|
||||
*
|
||||
* By default the transition is blocked.
|
||||
*/
|
||||
* This function is called when a transition throws an exception.
|
||||
*
|
||||
* By default the transition is blocked.
|
||||
*/
|
||||
def handleException(job: Job[P, T, S])(throwable: Throwable, failureCount: Int, startTime: Long, outMarking: MultiSet[P]): ExceptionStrategy = BlockTransition
|
||||
|
||||
/**
|
||||
* Returns the task that should be executed for a transition.
|
||||
*/
|
||||
* Returns the task that should be executed for a transition.
|
||||
*/
|
||||
def transitionTask(petriNet: PetriNet[P, T], t: T)(marking: Marking[P], state: S, input: Any): IO[(Marking[P], E)]
|
||||
|
||||
/**
|
||||
@@ -50,25 +51,25 @@ trait ProcessInstanceRuntime[P, T, S, E] {
|
||||
def canBeFiredAutomatically(instance: Instance[P, T, S], t: T): Boolean = instance.petriNet.incomingPlaces(t).nonEmpty
|
||||
|
||||
/**
|
||||
* Defines which tokens from a marking for a particular place are consumable by a transition.
|
||||
*
|
||||
* By default ALL tokens from that place are consumable.
|
||||
*
|
||||
* You can override this for example in case you use a colored (data) petri net model with filter rules on the edges.
|
||||
*/
|
||||
* Defines which tokens from a marking for a particular place are consumable by a transition.
|
||||
*
|
||||
* By default ALL tokens from that place are consumable.
|
||||
*
|
||||
* You can override this for example in case you use a colored (data) petri net model with filter rules on the edges.
|
||||
*/
|
||||
def consumableTokens(petriNet: PetriNet[P, T])(marking: Marking[P], p: P, t: T): MultiSet[Any] = marking.getOrElse(p, MultiSet.empty)
|
||||
|
||||
/**
|
||||
* Takes a Job specification, executes it and returns a TransitionEvent (asychronously using cats.effect.IO)
|
||||
*
|
||||
* TODO
|
||||
*
|
||||
* The use of cats.effect.IO is not really necessary at this point. It was mainly chosen to support cancellation in
|
||||
* the future: https://typelevel.org/cats-effect/datatypes/io.html#cancelable-processes
|
||||
*
|
||||
* However, since that is not used this can be refactored to a simple function: Job -> TransitionEvent
|
||||
*
|
||||
*/
|
||||
* Takes a Job specification, executes it and returns a TransitionEvent (asychronously using cats.effect.IO)
|
||||
*
|
||||
* TODO
|
||||
*
|
||||
* The use of cats.effect.IO is not really necessary at this point. It was mainly chosen to support cancellation in
|
||||
* the future: https://typelevel.org/cats-effect/datatypes/io.html#cancelable-processes
|
||||
*
|
||||
* However, since that is not used this can be refactored to a simple function: Job -> TransitionEvent
|
||||
*
|
||||
*/
|
||||
def jobExecutor(topology: PetriNet[P, T])(implicit transitionIdentifier: Identifiable[T], placeIdentifier: Identifiable[P]): Job[P, T, S] ⇒ IO[TransitionEvent] = {
|
||||
|
||||
def exceptionStackTrace(e: Throwable): String = {
|
||||
@@ -83,7 +84,7 @@ trait ProcessInstanceRuntime[P, T, S, E] {
|
||||
val transition = job.transition
|
||||
val consumed: Marking[Id] = job.consume.marshall
|
||||
|
||||
IO.unit.flatMap { _ =>
|
||||
IO.unit.flatMap {_ =>
|
||||
// calling transitionTask(...) could potentially throw an exception
|
||||
// TODO I don't believe the last statement is true
|
||||
transitionTask(topology, transition)(job.consume, job.processState, job.input)
|
||||
@@ -99,7 +100,7 @@ trait ProcessInstanceRuntime[P, T, S, E] {
|
||||
}.handleException {
|
||||
// If an exception was thrown while computing the failure strategy we block the interaction from firing
|
||||
case e: Throwable =>
|
||||
log.error(s"Exception while handling transition failure", e)
|
||||
logger.error(s"Exception while handling transition failure", e)
|
||||
TransitionFailedEvent(job.id, transition.getId, job.correlationId, startTime, System.currentTimeMillis(), consumed, job.input, exceptionStackTrace(e), ExceptionStrategy.BlockTransition)
|
||||
}
|
||||
}
|
||||
@@ -115,7 +116,7 @@ trait ProcessInstanceRuntime[P, T, S, E] {
|
||||
}
|
||||
|
||||
// check if any any places have an insufficient number of tokens
|
||||
if (consumable.exists { case (_, count, tokens) ⇒ tokens.multisetSize < count })
|
||||
if (consumable.exists {case (_, count, tokens) ⇒ tokens.multisetSize < count})
|
||||
Seq.empty
|
||||
else {
|
||||
val consume = consumable.map {
|
||||
@@ -128,21 +129,21 @@ trait ProcessInstanceRuntime[P, T, S, E] {
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether a transition is 'enabled' in a marking.
|
||||
*/
|
||||
* Checks whether a transition is 'enabled' in a marking.
|
||||
*/
|
||||
def isEnabled(petriNet: PetriNet[P, T])(marking: Marking[P], t: T): Boolean = consumableMarkings(petriNet)(marking, t).nonEmpty
|
||||
|
||||
/**
|
||||
* Returns all enabled transitions for a marking.
|
||||
*/
|
||||
* Returns all enabled transitions for a marking.
|
||||
*/
|
||||
def enabledTransitions(petriNet: PetriNet[P, T])(marking: Marking[P]): Iterable[T] =
|
||||
petriNet.transitions.filter(t ⇒ consumableMarkings(petriNet)(marking, t).nonEmpty)
|
||||
|
||||
/**
|
||||
* Creates a job for a specific transition with input, computes the marking it should consume
|
||||
*/
|
||||
* Creates a job for a specific transition with input, computes the marking it should consume
|
||||
*/
|
||||
def createJob(transition: T, input: Any, correlationId: Option[String] = None): State[Instance[P, T, S], Either[String, Job[P, T, S]]] =
|
||||
State { instance ⇒
|
||||
State {instance ⇒
|
||||
if (instance.isBlocked(transition))
|
||||
(instance, Left("Transition is blocked by a previous failure"))
|
||||
else
|
||||
@@ -170,8 +171,8 @@ trait ProcessInstanceRuntime[P, T, S, E] {
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds all automated enabled transitions.
|
||||
*/
|
||||
* Finds all automated enabled transitions.
|
||||
*/
|
||||
def allEnabledJobs: State[Instance[P, T, S], Set[Job[P, T, S]]] =
|
||||
firstEnabledJob.flatMap {
|
||||
case None ⇒ State.pure(Set.empty)
|
||||
|
||||
@@ -3,15 +3,15 @@ package com.ing.baker.runtime.akka.actor.serialization
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import com.ing.baker.il
|
||||
import com.ing.baker.runtime.akka.actor.ClusterBakerActorProvider
|
||||
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProto._
|
||||
import com.ing.baker.runtime.akka.actor.process_index.{ProcessIndex, ProcessIndexProtocol}
|
||||
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProto._
|
||||
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol
|
||||
import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProto._
|
||||
import com.ing.baker.runtime.akka.actor.recipe_manager.{RecipeManager, RecipeManagerProtocol}
|
||||
import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeEventMetadata, RecipeInstanceState}
|
||||
import com.ing.baker.runtime.serialization.{ProtoMap, SerializersProvider, TypedProtobufSerializer}
|
||||
import com.ing.baker.runtime.serialization.TypedProtobufSerializer.{BinarySerializable, forType}
|
||||
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProto._
|
||||
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProto._
|
||||
import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProto._
|
||||
import com.ing.baker.runtime.serialization.{ProtoMap, SerializersProvider, TypedProtobufSerializer}
|
||||
|
||||
object BakerTypedProtobufSerializer {
|
||||
|
||||
@@ -21,8 +21,8 @@ object BakerTypedProtobufSerializer {
|
||||
}
|
||||
|
||||
/** Hardcoded serializerId for this serializer. This should not conflict with other serializers.
|
||||
* Values from 0 to 40 are reserved for Akka internal usage.
|
||||
*/
|
||||
* Values from 0 to 40 are reserved for Akka internal usage.
|
||||
*/
|
||||
val identifier = 101
|
||||
|
||||
def commonEntries(implicit ev0: SerializersProvider): List[BinarySerializable] =
|
||||
@@ -42,7 +42,7 @@ object BakerTypedProtobufSerializer {
|
||||
)
|
||||
|
||||
def processIndexEntries(implicit ev0: SerializersProvider): List[BinarySerializable] =
|
||||
List (
|
||||
List(
|
||||
forType[ClusterBakerActorProvider.GetShardIndex]
|
||||
.register("ProcessIndex.GetShardIndex"),
|
||||
forType[ProcessIndex.ActorCreated]
|
||||
@@ -99,8 +99,8 @@ object BakerTypedProtobufSerializer {
|
||||
.register("ProcessIndexProtocol.FireSensoryEventRejection.FiringLimitMet")
|
||||
)
|
||||
|
||||
def processInstanceEntries(implicit ev0: SerializersProvider): List[BinarySerializable] =
|
||||
List(
|
||||
def processInstanceEntries(implicit ev0: SerializersProvider): List[BinarySerializable] =
|
||||
List(
|
||||
forType[ProcessInstanceProtocol.Stop]
|
||||
.register("ProcessInstanceProtocol.Stop"),
|
||||
forType[ProcessInstanceProtocol.GetState.type]
|
||||
|
||||
@@ -3,7 +3,7 @@ package com.ing.baker.runtime.akka.internal
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
|
||||
import akka.event.EventStream
|
||||
import cats.effect.IO
|
||||
import cats.effect.{ContextShift, IO}
|
||||
import com.ing.baker.il
|
||||
import com.ing.baker.il.failurestrategy.ExceptionStrategyOutcome
|
||||
import com.ing.baker.il.petrinet._
|
||||
@@ -120,6 +120,7 @@ object RecipeRuntime {
|
||||
|
||||
class RecipeRuntime(recipe: CompiledRecipe, interactionManager: InteractionManager, eventStream: EventStream)(implicit ec: ExecutionContext) extends ProcessInstanceRuntime[Place, Transition, RecipeInstanceState, EventInstance] {
|
||||
|
||||
protected implicit lazy val contextShift: ContextShift[IO] = IO.contextShift(ec)
|
||||
/**
|
||||
* All transitions except sensory event interactions are auto-fireable by the runtime
|
||||
*/
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package com.ing.baker
|
||||
|
||||
import java.nio.file.Paths
|
||||
import java.util.UUID
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.testkit.TestKit
|
||||
import com.ing.baker.compiler.RecipeCompiler
|
||||
@@ -14,11 +17,15 @@ import java.util.UUID
|
||||
|
||||
import com.ing.baker.recipe.common.Recipe
|
||||
import com.ing.baker.runtime.akka.AkkaBaker
|
||||
import com.ing.baker.recipe.TestRecipe.{fireTwoEventsInteraction, _}
|
||||
import com.ing.baker.recipe.{CaseClassIngredient, common}
|
||||
import com.ing.baker.runtime.scaladsl.{Baker, EventInstance, InteractionInstance}
|
||||
import com.ing.baker.types.{Converters, Value}
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
import org.mockito.Matchers._
|
||||
import org.mockito.Mockito._
|
||||
import org.scalatest._
|
||||
import org.scalatest.mockito.MockitoSugar
|
||||
|
||||
import org.scalatestplus.mockito.MockitoSugar
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
@@ -107,7 +114,7 @@ trait BakerRuntimeTestBase
|
||||
testProvidesNothingInteractionMock).map(InteractionInstance.unsafeFrom(_))
|
||||
|
||||
def writeRecipeToSVGFile(recipe: CompiledRecipe) = {
|
||||
import guru.nidi.graphviz.engine.{ Format, Graphviz }
|
||||
import guru.nidi.graphviz.engine.{Format, Graphviz}
|
||||
import guru.nidi.graphviz.parse.Parser
|
||||
val g = Parser.read(recipe.getRecipeVisualization)
|
||||
Graphviz.fromGraph(g).render(Format.SVG).toFile(Paths.get(recipe.name).toFile)
|
||||
@@ -167,9 +174,9 @@ trait BakerRuntimeTestBase
|
||||
s"""
|
||||
|akka {
|
||||
|
|
||||
| actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||
| actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||
|
|
||||
| remote {
|
||||
| remote {
|
||||
| netty.tcp {
|
||||
| hostname = localhost
|
||||
| port = $port
|
||||
@@ -177,7 +184,7 @@ trait BakerRuntimeTestBase
|
||||
| }
|
||||
|}
|
||||
|
|
||||
|baker {
|
||||
|baker {
|
||||
| actor.provider = "cluster-sharded"
|
||||
| cluster.seed-nodes = ["akka.tcp://$actorSystemName@localhost:$port"]
|
||||
|}
|
||||
@@ -190,14 +197,14 @@ trait BakerRuntimeTestBase
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Baker instance that contains a simple recipe that can be used in tests
|
||||
* It als sets mocks that return happy flow responses for the interactions
|
||||
*
|
||||
* This recipe contains: See TestRecipe.png for a visualization
|
||||
*
|
||||
* @param recipeName A unique name that is needed for the recipe to insure that the tests do not interfere with each other
|
||||
* @return
|
||||
*/
|
||||
* Returns a Baker instance that contains a simple recipe that can be used in tests
|
||||
* It als sets mocks that return happy flow responses for the interactions
|
||||
*
|
||||
* This recipe contains: See TestRecipe.png for a visualization
|
||||
*
|
||||
* @param recipeName A unique name that is needed for the recipe to insure that the tests do not interfere with each other
|
||||
* @return
|
||||
*/
|
||||
protected def setupBakerWithRecipe(recipeName: String, appendUUIDToTheRecipeName: Boolean = true)
|
||||
(implicit actorSystem: ActorSystem): Future[(Baker, String)] = {
|
||||
val newRecipeName = if (appendUUIDToTheRecipeName) s"$recipeName-${UUID.randomUUID().toString}" else recipeName
|
||||
|
||||
@@ -10,7 +10,7 @@ import org.scalacheck.Prop.forAll
|
||||
import org.scalacheck.Test.Parameters.defaultVerbose
|
||||
import org.scalacheck._
|
||||
import org.scalatest.FunSuite
|
||||
import org.scalatest.prop.Checkers
|
||||
import org.scalatestplus.scalacheck.Checkers
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.Random
|
||||
|
||||
@@ -1,33 +1,28 @@
|
||||
package com.ing.baker.runtime.akka
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.persistence.inmemory.extension.{InMemoryJournalStorage, StorageExtension}
|
||||
import akka.persistence.inmemory.extension.{ InMemoryJournalStorage, StorageExtension }
|
||||
import akka.testkit.TestProbe
|
||||
import com.ing.baker._
|
||||
import com.ing.baker.recipe.TestRecipe._
|
||||
import com.ing.baker.recipe.common.InteractionFailureStrategy
|
||||
import com.ing.baker.recipe.scaladsl.Recipe
|
||||
import com.ing.baker.runtime.common.RejectReason._
|
||||
import com.ing.baker.runtime.scaladsl._
|
||||
import com.ing.baker.runtime.scaladsl.EventInstance
|
||||
import com.ing.baker.runtime.scaladsl.{ EventInstance, _ }
|
||||
import com.ing.baker.types.PrimitiveValue
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
import java.util.UUID
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
|
||||
|
||||
object BakerEventsSpec {
|
||||
|
||||
val log = LoggerFactory.getLogger(classOf[BakerEventsSpec])
|
||||
object BakerEventsSpec extends LazyLogging {
|
||||
|
||||
def listenerFunction(probe: ActorRef, logEvents: Boolean = false): PartialFunction[BakerEvent, Unit] = {
|
||||
case event: BakerEvent =>
|
||||
if (logEvents) {
|
||||
log.info("Listener consumed event {}", event)
|
||||
logger.info("Listener consumed event {}", event)
|
||||
}
|
||||
probe ! event
|
||||
}
|
||||
@@ -76,9 +71,7 @@ class BakerEventsSpec extends BakerRuntimeTestBase {
|
||||
|
||||
override def actorSystemName = "BakerEventsSpec"
|
||||
|
||||
val log = LoggerFactory.getLogger(classOf[BakerEventsSpec])
|
||||
|
||||
val eventReceiveTimeout = 1 seconds
|
||||
private val eventReceiveTimeout = 1 seconds
|
||||
|
||||
before {
|
||||
resetMocks
|
||||
@@ -104,18 +97,18 @@ class BakerEventsSpec extends BakerRuntimeTestBase {
|
||||
_ <- baker.fireEventAndResolveWhenCompleted(recipeInstanceId, EventInstance.unsafeFrom(InitialEvent(initialIngredientValue)), "someId")
|
||||
// TODO check the order of the timestamps later
|
||||
_ = expectMsgInAnyOrderPF(listenerProbe,
|
||||
{ case msg@RecipeInstanceCreated(_, `recipeId`, `recipeName`, `recipeInstanceId`) => msg },
|
||||
{ case msg@EventReceived(_, _, _, `recipeInstanceId`, Some("someId"), EventInstance("InitialEvent", ingredients)) if ingredients == Map("initialIngredient" -> PrimitiveValue(`initialIngredientValue`)) => msg },
|
||||
{ case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "SieveInteraction") => msg },
|
||||
{ case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionOne") => msg },
|
||||
{ case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionTwo") => msg },
|
||||
{ case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionThree") => msg },
|
||||
{ case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "ProvidesNothingInteraction") => msg },
|
||||
{ case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionOne", Some(EventInstance("InteractionOneSuccessful", ingredients))) if ingredients == Map("interactionOneIngredient" -> PrimitiveValue("interactionOneIngredient")) => msg },
|
||||
{ case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionTwo", Some(EventInstance("EventFromInteractionTwo", ingredients))) if ingredients == Map("interactionTwoIngredient" -> PrimitiveValue("interactionTwoIngredient")) => msg },
|
||||
{ case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionThree", Some(EventInstance("InteractionThreeSuccessful", ingredients))) if ingredients == Map("interactionThreeIngredient" -> PrimitiveValue("interactionThreeIngredient")) => msg },
|
||||
{ case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "ProvidesNothingInteraction", None) => msg },
|
||||
{ case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "SieveInteraction", Some(EventInstance("SieveInteractionSuccessful", ingredients))) if ingredients == Map("sievedIngredient" -> PrimitiveValue("sievedIngredient")) => msg }
|
||||
{case msg@RecipeInstanceCreated(_, `recipeId`, `recipeName`, `recipeInstanceId`) => msg},
|
||||
{case msg@EventReceived(_, _, _, `recipeInstanceId`, Some("someId"), EventInstance("InitialEvent", ingredients)) if ingredients == Map("initialIngredient" -> PrimitiveValue(`initialIngredientValue`)) => msg},
|
||||
{case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "SieveInteraction") => msg},
|
||||
{case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionOne") => msg},
|
||||
{case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionTwo") => msg},
|
||||
{case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionThree") => msg},
|
||||
{case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "ProvidesNothingInteraction") => msg},
|
||||
{case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionOne", Some(EventInstance("InteractionOneSuccessful", ingredients))) if ingredients == Map("interactionOneIngredient" -> PrimitiveValue("interactionOneIngredient")) => msg},
|
||||
{case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionTwo", Some(EventInstance("EventFromInteractionTwo", ingredients))) if ingredients == Map("interactionTwoIngredient" -> PrimitiveValue("interactionTwoIngredient")) => msg},
|
||||
{case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionThree", Some(EventInstance("InteractionThreeSuccessful", ingredients))) if ingredients == Map("interactionThreeIngredient" -> PrimitiveValue("interactionThreeIngredient")) => msg},
|
||||
{case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "ProvidesNothingInteraction", None) => msg},
|
||||
{case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "SieveInteraction", Some(EventInstance("SieveInteractionSuccessful", ingredients))) if ingredients == Map("sievedIngredient" -> PrimitiveValue("sievedIngredient")) => msg}
|
||||
)
|
||||
_ = listenerProbe.expectNoMessage(eventReceiveTimeout)
|
||||
} yield succeed
|
||||
|
||||
@@ -22,8 +22,8 @@ import com.typesafe.config.{Config, ConfigFactory}
|
||||
import org.mockito.Matchers.{eq => mockitoEq, _}
|
||||
import org.mockito.Mockito._
|
||||
import org.mockito.invocation.InvocationOnMock
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import org.mockito.stubbing.Answer
|
||||
import org.slf4j.LoggerFactory
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
@@ -34,8 +34,6 @@ class BakerExecutionSpec extends BakerRuntimeTestBase {
|
||||
|
||||
override def actorSystemName = "BakerExecutionSpec"
|
||||
|
||||
val log: Logger = LoggerFactory.getLogger(classOf[BakerExecutionSpec])
|
||||
|
||||
before {
|
||||
resetMocks()
|
||||
setupMockResponse()
|
||||
|
||||
@@ -1,23 +1,16 @@
|
||||
package com.ing.baker.runtime.akka
|
||||
|
||||
import akka.persistence.inmemory.extension.{InMemoryJournalStorage, StorageExtension}
|
||||
import akka.persistence.inmemory.extension.{ InMemoryJournalStorage, StorageExtension }
|
||||
import akka.testkit.TestProbe
|
||||
import com.ing.baker._
|
||||
import com.ing.baker.compiler.RecipeCompiler
|
||||
import com.ing.baker.il.CompiledRecipe
|
||||
import com.ing.baker.recipe.TestRecipe.getRecipe
|
||||
import com.ing.baker.runtime.common.RecipeInformation
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import scala.language.postfixOps
|
||||
|
||||
class BakerInquireSpec extends BakerRuntimeTestBase {
|
||||
|
||||
override def actorSystemName = "BakerInquireSpec"
|
||||
|
||||
val log = LoggerFactory.getLogger(classOf[BakerInquireSpec])
|
||||
|
||||
|
||||
before {
|
||||
resetMocks
|
||||
setupMockResponse()
|
||||
|
||||
@@ -23,8 +23,9 @@ import com.typesafe.config.{Config, ConfigFactory}
|
||||
import org.mockito.Mockito
|
||||
import org.mockito.Mockito.when
|
||||
import org.scalatest.concurrent.Eventually
|
||||
import org.scalatest.mockito.MockitoSugar
|
||||
import org.scalatestplus.mockito.MockitoSugar
|
||||
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Matchers, WordSpecLike}
|
||||
import org.scalatestplus.mockito.MockitoSugar
|
||||
import scalax.collection.immutable.Graph
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
@@ -26,8 +26,8 @@ import org.mockito.invocation.InvocationOnMock
|
||||
import org.mockito.stubbing.Answer
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
import org.scalatest.mockito.MockitoSugar
|
||||
import org.scalatest.time.{Milliseconds, Span}
|
||||
import org.scalatestplus.mockito.MockitoSugar
|
||||
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.duration._
|
||||
|
||||
@@ -9,7 +9,6 @@ import com.ing.baker.compiler.RecipeCompiler
|
||||
import com.ing.baker.recipe.TestRecipe
|
||||
import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol._
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
object RecipeManagerSpec {
|
||||
val config: Config = ConfigFactory.parseString(
|
||||
@@ -24,8 +23,6 @@ class RecipeManagerSpec extends BakerRuntimeTestBase {
|
||||
|
||||
override def actorSystemName = "RecipeManagerSpec"
|
||||
|
||||
val log: Logger = LoggerFactory.getLogger(classOf[RecipeManagerSpec])
|
||||
|
||||
"The recipe manager" should {
|
||||
"add a recipe to the list when an AddRecipe message is received" in {
|
||||
val compiledRecipe = RecipeCompiler.compileRecipe(TestRecipe.getRecipe("AddRecipeRecipe"))
|
||||
|
||||
@@ -4,8 +4,8 @@ import org.scalacheck.Gen._
|
||||
import org.scalacheck.Prop.forAll
|
||||
import org.scalacheck._
|
||||
import org.scalatest.FunSuite
|
||||
import org.scalatest.prop.Checkers
|
||||
import com.ing.baker.runtime.serialization.Encryption._
|
||||
import org.scalatestplus.scalacheck.Checkers
|
||||
|
||||
class EncryptionPropertiesSpec extends FunSuite with Checkers {
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ import org.scalacheck.Prop.forAll
|
||||
import org.scalacheck.Test.Parameters.defaultVerbose
|
||||
import org.scalacheck._
|
||||
import org.scalatest.FunSuiteLike
|
||||
import org.scalatest.prop.Checkers
|
||||
import org.scalatestplus.scalacheck.Checkers
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.reflect.ClassTag
|
||||
@@ -92,7 +92,7 @@ class SerializationSpec extends TestKit(ActorSystem("BakerProtobufSerializerSpec
|
||||
val serialized = serializer.toBinary(m)
|
||||
val deserialized = serializer.fromBinary(serialized, serializer.manifest(m))
|
||||
deserialized === m &&
|
||||
ctxFromProto(ctxToProto(m)) === Success(m)
|
||||
ctxFromProto(ctxToProto(m)) === Success(m)
|
||||
}
|
||||
|
||||
checkFor[ProcessIndexProtocol.Index].run
|
||||
@@ -359,7 +359,9 @@ object SerializationSpec {
|
||||
} yield CreateProcess(recipeId, recipeInstanceId)
|
||||
|
||||
class SimpleActor extends Actor {
|
||||
override def receive: Receive = { case _ => () }
|
||||
override def receive: Receive = {
|
||||
case _ => ()
|
||||
}
|
||||
}
|
||||
|
||||
val waitForRetriesGen = Gen.oneOf(true, false)
|
||||
@@ -401,7 +403,7 @@ object SerializationSpec {
|
||||
} yield StopRetryingInteraction(recipeInstanceId, interactionName)
|
||||
|
||||
val sensoryEventStatusGen: Gen[SensoryEventStatus] = Gen.oneOf(
|
||||
SensoryEventStatus.AlreadyReceived ,
|
||||
SensoryEventStatus.AlreadyReceived,
|
||||
SensoryEventStatus.Completed,
|
||||
SensoryEventStatus.FiringLimitMet,
|
||||
SensoryEventStatus.Received,
|
||||
@@ -410,9 +412,9 @@ object SerializationSpec {
|
||||
)
|
||||
|
||||
val eventResultGen: Gen[SensoryEventResult] = for {
|
||||
status <- sensoryEventStatusGen
|
||||
events <- Gen.listOf(Gen.alphaStr)
|
||||
ingredients <- Gen.listOf(Runtime.ingredientsGen)
|
||||
status <- sensoryEventStatusGen
|
||||
events <- Gen.listOf(Gen.alphaStr)
|
||||
ingredients <- Gen.listOf(Runtime.ingredientsGen)
|
||||
} yield SensoryEventResult(status, events, ingredients.toMap)
|
||||
|
||||
implicit val processEventResponse: Gen[ProcessEventResponse] = for {
|
||||
|
||||
@@ -6,8 +6,8 @@ import com.ing.baker.runtime.scaladsl.InteractionInstance
|
||||
import com.ing.baker.types
|
||||
import com.ing.baker.types.Type
|
||||
import org.mockito.Mockito.when
|
||||
import org.scalatest.mockito.MockitoSugar
|
||||
import org.scalatest.{Matchers, WordSpecLike}
|
||||
import org.scalatestplus.mockito.MockitoSugar
|
||||
|
||||
class InteractionManagerLocalSpec extends WordSpecLike with Matchers with MockitoSugar {
|
||||
"getImplementation" should {
|
||||
|
||||
@@ -8,8 +8,8 @@ import com.ing.baker.runtime.scaladsl.RecipeInstanceState
|
||||
import com.ing.baker.types.Value
|
||||
import com.ing.baker.{il, types}
|
||||
import org.mockito.Mockito._
|
||||
import org.scalatest.mockito.MockitoSugar
|
||||
import org.scalatest.{Matchers, WordSpecLike}
|
||||
import org.scalatestplus.mockito.MockitoSugar
|
||||
|
||||
class RecipeRuntimeSpec extends WordSpecLike with Matchers with MockitoSugar {
|
||||
"The recipe runtime" should {
|
||||
|
||||
@@ -1,17 +1,15 @@
|
||||
package com.ing.baker.runtime.akka.actor.downing
|
||||
|
||||
import akka.cluster.UniqueAddress
|
||||
import org.slf4j.LoggerFactory
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
|
||||
class MajorityStrategy extends Strategy {
|
||||
|
||||
private val log = LoggerFactory.getLogger(classOf[MajorityStrategy])
|
||||
class MajorityStrategy extends Strategy with LazyLogging {
|
||||
|
||||
override def sbrDecision(clusterHelper: ClusterHelper): Unit = {
|
||||
if (clusterHelper.amIMember && clusterHelper.amILeader && clusterHelper.unreachables.nonEmpty) {
|
||||
val nodesToDown = this.nodesToDown(clusterHelper)
|
||||
|
||||
log.info(s"SplitBrainResolver: ${clusterHelper.myUniqueAddress} downing these nodes $nodesToDown")
|
||||
logger.info(s"SplitBrainResolver: ${clusterHelper.myUniqueAddress} downing these nodes $nodesToDown")
|
||||
if (nodesToDown contains clusterHelper.myUniqueAddress) {
|
||||
// leader going down
|
||||
clusterHelper.downSelf()
|
||||
|
||||
Reference in New Issue
Block a user