From 27db0908308ee561651af98138925bb7ba5569b0 Mon Sep 17 00:00:00 2001 From: Andreas Gabor Date: Thu, 19 Dec 2024 10:32:47 +0100 Subject: [PATCH] add r2dbc projection tests --- .../r2dbc/R2DbcProjectionSpec.scala | 2 +- .../scala/ClusterComponentTestKitSpec.scala | 123 ++++++++++++++++-- build.sbt | 3 +- 3 files changed, 115 insertions(+), 13 deletions(-) diff --git a/akka-components-persistence-projection-r2dbc/src/test/scala/net/sc8s/akka/components/persistence/projection/r2dbc/R2DbcProjectionSpec.scala b/akka-components-persistence-projection-r2dbc/src/test/scala/net/sc8s/akka/components/persistence/projection/r2dbc/R2DbcProjectionSpec.scala index 0f7f779..186e3a4 100644 --- a/akka-components-persistence-projection-r2dbc/src/test/scala/net/sc8s/akka/components/persistence/projection/r2dbc/R2DbcProjectionSpec.scala +++ b/akka-components-persistence-projection-r2dbc/src/test/scala/net/sc8s/akka/components/persistence/projection/r2dbc/R2DbcProjectionSpec.scala @@ -16,7 +16,7 @@ import scala.concurrent.Future import scala.util.Random /* -This spec is only meant to illustrate the usage of CassandraProjections +This spec is only meant to illustrate the usage of R2dbcProjections */ class R2DbcProjectionSpec extends ScalaTestWithActorTestKit(ConfigFactory.parseString( """ diff --git a/akka-components-testkit/src/test/scala/ClusterComponentTestKitSpec.scala b/akka-components-testkit/src/test/scala/ClusterComponentTestKitSpec.scala index 1912efa..0ffd469 100644 --- a/akka-components-testkit/src/test/scala/ClusterComponentTestKitSpec.scala +++ b/akka-components-testkit/src/test/scala/ClusterComponentTestKitSpec.scala @@ -12,6 +12,7 @@ import io.circe.generic.semiauto.deriveCodec import net.sc8s.akka.circe.CirceSerializer import net.sc8s.akka.components.ClusterComponent import net.sc8s.akka.components.persistence.projection.cassandra.CassandraProjection +import net.sc8s.akka.components.persistence.projection.r2dbc.{R2dbcShardedProjection, R2dbcSingletonProjection} import net.sc8s.akka.components.testkit.ClusterComponentTestKitSpec._ import net.sc8s.logstage.elastic.Logging import org.scalamock.scalatest.MockFactory @@ -20,14 +21,14 @@ import org.scalatest.wordspec.AnyWordSpecLike import scala.concurrent.Future -class ClusterComponentTestKitSpec extends net.sc8s.lagom.circe.testkit.ScalaTestWithActorTestKit(ClusterComponentTestKitSpec.Singleton.serializers ++ ClusterComponentTestKitSpec.SingletonEventSourced.serializers ++ ClusterComponentTestKitSpec.SingletonEventSourcedWithSnapshots.serializers ++ ClusterComponentTestKitSpec.ShardedEventSourcedWithCustomEntityId.serializers) with AnyWordSpecLike with Matchers with ClusterComponentTestKit with Logging with MockFactory { +class ClusterComponentTestKitSpec extends net.sc8s.lagom.circe.testkit.ScalaTestWithActorTestKit(ClusterComponentTestKitSpec.Singleton.serializers ++ ClusterComponentTestKitSpec.SingletonEventSourcedR2dbc.serializers ++ ClusterComponentTestKitSpec.SingletonEventSourcedWithSnapshots.serializers ++ ClusterComponentTestKitSpec.ShardedEventSourcedWithCustomEntityId.serializers) with AnyWordSpecLike with Matchers with ClusterComponentTestKit with Logging with MockFactory { "ComponentTestKit" should { "support Singleton" in { val value1 = spawnComponent(Singleton)(new Singleton.Component) value1 ! Command() } "support EventSourced Singleton" in { - spawnComponent(SingletonEventSourced)(new SingletonEventSourced.Component(mock[ProjectionTarget])) + spawnComponent(SingletonEventSourcedCassandra)(new SingletonEventSourcedCassandra.Component(mock[ProjectionTarget])) .runCommand(Command()) .event shouldBe Event() } @@ -36,10 +37,26 @@ class ClusterComponentTestKitSpec extends net.sc8s.lagom.circe.testkit.ScalaTest .runCommand(Command()) .event shouldBe Event() } - "support EventSourced Singleton with projection testing" in { + "support EventSourced Singleton with cassandra projection testing" in { val projectionTarget = mock[ProjectionTarget] - val component = new SingletonEventSourced.Component(projectionTarget) - val projection = testProjection(SingletonEventSourced)(component)(component.projection, Source(Seq( + val component = new SingletonEventSourcedCassandra.Component(projectionTarget) + val projection = testProjection(SingletonEventSourcedCassandra)(component)(component.projection, Source(Seq( + Event(), + Event(), + ))) + + (projectionTarget.serviceCall _).expects(component.persistenceId.id) + (projectionTarget.serviceCall _).expects(component.persistenceId.id) + + projectionTestKit.runWithTestSink(projection) { probe => + probe.request(2) + probe.expectNextUnordered(Done, Done) + } + } + "support EventSourced Singleton with r2dbc projection testing" in { + val projectionTarget = mock[ProjectionTarget] + val component = new SingletonEventSourcedR2dbc.Component(projectionTarget) + val projection = testProjection(SingletonEventSourcedR2dbc)(component)(component.projection, Source(Seq( Event(), Event(), ))) @@ -56,7 +73,7 @@ class ClusterComponentTestKitSpec extends net.sc8s.lagom.circe.testkit.ScalaTest spawnComponent(Sharded)(new Sharded.Component, "entityId") ! Command() } "support EventSourced Sharded" in { - spawnComponent(ShardedEventSourced)(new ShardedEventSourced.Component(mock[ProjectionTarget]), "entityId") + spawnComponent(ShardedEventSourcedCassandra)(new ShardedEventSourcedCassandra.Component(mock[ProjectionTarget]), "entityId") .runCommand(Command()) .event shouldBe Event() } @@ -70,12 +87,30 @@ class ClusterComponentTestKitSpec extends net.sc8s.lagom.circe.testkit.ScalaTest .runCommand(Command()) .event shouldBe Event() } - "support EventSourced Sharded with projection testing" in { + "support EventSourced Sharded with cassandra projection testing" in { val projectionTarget = mock[ProjectionTarget] val entityId1 = "entityId1" val entityId2 = "entityId2" - val component = new ShardedEventSourced.Component(projectionTarget) - val projection = testProjection(ShardedEventSourced)(component)(component.projection, Source(Seq( + val component = new ShardedEventSourcedCassandra.Component(projectionTarget) + val projection = testProjection(ShardedEventSourcedCassandra)(component)(component.projection, Source(Seq( + entityId1 -> Event(), + entityId2 -> Event(), + ))) + + (projectionTarget.serviceCall _).expects(entityId1) + (projectionTarget.serviceCall _).expects(entityId2) + + projectionTestKit.runWithTestSink(projection) { probe => + probe.request(2) + probe.expectNextUnordered(Done, Done) + } + } + "support EventSourced Sharded with r2dbc projection testing" in { + val projectionTarget = mock[ProjectionTarget] + val entityId1 = "entityId1" + val entityId2 = "entityId2" + val component = new ShardedEventSourcedR2dbc.Component(projectionTarget) + val projection = testProjection(ShardedEventSourcedR2dbc)(component)(component.projection, Source(Seq( entityId1 -> Event(), entityId2 -> Event(), ))) @@ -157,7 +192,40 @@ object ClusterComponentTestKitSpec { } } - object SingletonEventSourced extends ClusterComponent.Singleton.EventSourced with ClusterComponent.SameSerializableCommand { + object SingletonEventSourcedR2dbc extends ClusterComponent.Singleton.EventSourced with ClusterComponent.SameSerializableCommand { + override val name = "name" + override type Command = ClusterComponentTestKitSpec.Command + override val commandSerializer = CirceSerializer() + + override type Event = ClusterComponentTestKitSpec.Event + override val eventSerializer = CirceSerializer() + + override type State = ClusterComponentTestKitSpec.State + + class Component(projectionTarget: ProjectionTarget) extends BaseComponent with R2dbcSingletonProjection { + override val behavior = context => EventSourcedBehavior( + context.persistenceId, + State(), + { + case (state, command) => + Effect.persist(Event()) + }, + { + case (state, event) => state + } + ) + + val projection = createProjection("projection") { + case (event, context) => + projectionTarget.serviceCall(context.persistenceId.id) + Future.successful(Done) + } + + override val projections = wireSet + } + } + + object SingletonEventSourcedCassandra extends ClusterComponent.Singleton.EventSourced with ClusterComponent.SameSerializableCommand { override val name = "name" override type Command = ClusterComponentTestKitSpec.Command override val commandSerializer = CirceSerializer() @@ -235,7 +303,7 @@ object ClusterComponentTestKitSpec { def serviceCall(entityId: String): Unit } - object ShardedEventSourced extends ClusterComponent.Sharded.EventSourced with ClusterComponent.SameSerializableCommand with ClusterComponent.Sharded.StringEntityId { + object ShardedEventSourcedCassandra extends ClusterComponent.Sharded.EventSourced with ClusterComponent.SameSerializableCommand with ClusterComponent.Sharded.StringEntityId { override val name = "name" override type Command = ClusterComponentTestKitSpec.Command override val commandSerializer = CirceSerializer() @@ -268,6 +336,39 @@ object ClusterComponentTestKitSpec { } } + object ShardedEventSourcedR2dbc extends ClusterComponent.Sharded.EventSourced with ClusterComponent.SameSerializableCommand with ClusterComponent.Sharded.StringEntityId { + override val name = "name" + override type Command = ClusterComponentTestKitSpec.Command + override val commandSerializer = CirceSerializer() + + override type Event = ClusterComponentTestKitSpec.Event + override val eventSerializer = CirceSerializer() + + override type State = ClusterComponentTestKitSpec.State + + class Component(projectionTarget: ProjectionTarget) extends BaseComponent with R2dbcShardedProjection { + override val behavior = context => EventSourcedBehavior( + context.persistenceId, + State(), + { + case (state, command) => + Effect.persist(Event()) + }, + { + case (state, event) => state + } + ) + + val projection = createProjection("projection") { + case (event, context) => + projectionTarget.serviceCall(context.entityId) + Future.successful(Done) + } + + override val projections = wireSet + } + } + object ShardedEventSourcedWithCustomEntityId extends ClusterComponent.Sharded.EventSourced with ClusterComponent.SameSerializableCommand with ClusterComponent.Sharded.JsonEntityId { import net.sc8s.akka.circe.implicits._ diff --git a/build.sbt b/build.sbt index 95d2df0..624b315 100644 --- a/build.sbt +++ b/build.sbt @@ -141,9 +141,10 @@ lazy val `akka-components-testkit` = (project in file("akka-components-testkit") logback.classic % Test, logback.core % Test, ), + dependencyOverrides ++= Dependencies.akka.createOverrides(licensed = true), idePackagePrefix := Some("net.sc8s.akka.components.testkit") ) - .dependsOn(`akka-components`, `lagom-server-circe-testkit`, `akka-components-persistence-projection-cassandra`) + .dependsOn(`akka-components`, `lagom-server-circe-testkit`, `akka-components-persistence-projection-cassandra` % Test, `akka-components-persistence-projection-r2dbc` % Test) lazy val `akka-components-persistence-cassandra-lagom-api` = crossProject(JSPlatform, JVMPlatform) .crossType(CrossType.Pure)