diff --git a/persistence/src/main/scala/akka/persistence/ExtendedSnapshoterInterop.scala b/persistence/src/main/scala/akka/persistence/ExtendedSnapshoterInterop.scala new file mode 100644 index 00000000..5d9a7dc3 --- /dev/null +++ b/persistence/src/main/scala/akka/persistence/ExtendedSnapshoterInterop.scala @@ -0,0 +1,115 @@ +package akka.persistence + +import akka.actor.ActorSystem + +import cats.syntax.all._ +import cats.effect.Sync + +import com.evolutiongaming.catshelper.FromFuture +import com.evolutiongaming.akkaeffect.ActorEffect +import com.evolutiongaming.akkaeffect.persistence.{ExtendedSnapshotter, EventSourcedId, SeqNr, Snapshot} + +import scala.concurrent.duration._ +import java.time.Instant + +object ExtendedSnapshoterInterop { + + def apply[F[_]: Sync: FromFuture]( + system: ActorSystem, + timeout: FiniteDuration + ): F[ExtendedSnapshotter.Of[F]] = + Sync[F] + .delay { + Persistence(system) + } + .map { persistence => + new ExtendedSnapshotter.Of[F] { + + override def apply[S](snapshotPluginId: String, eventSourcedId: EventSourcedId): F[ExtendedSnapshotter[F, S]] = + Sync[F] + .delay { + val ref = persistence.snapshotStoreFor(snapshotPluginId) + ActorEffect.fromActor(ref) + } + .map { actor => + new ExtendedSnapshotter[F, S] { + + val persistenceId = eventSourcedId.value + + override def load(criteria: SnapshotSelectionCriteria, toSequenceNr: SeqNr): F[F[Option[Snapshot[S]]]] = { + + val request = SnapshotProtocol.LoadSnapshot(persistenceId, criteria, toSequenceNr) + actor + .ask(request, timeout) + .map { response => + response.flatMap { + + case SnapshotProtocol.LoadSnapshotResult(snapshot, _) => + snapshot match { + + case Some(offer) => + val payload = offer.snapshot.asInstanceOf[S] + val metadata = Snapshot.Metadata( + offer.metadata.sequenceNr, + Instant.ofEpochMilli(offer.metadata.timestamp) + ) + Snapshot.const(payload, metadata).some.pure[F] + + case None => none[Snapshot[S]].pure[F] + } + + case SnapshotProtocol.LoadSnapshotFailed(err) => + err.raiseError[F, Option[Snapshot[S]]] + } + } + } + + override def save(seqNr: SeqNr, snapshot: S): F[F[Instant]] = { + val metadata = SnapshotMetadata(persistenceId, seqNr) + val request = SnapshotProtocol.SaveSnapshot(metadata, snapshot) + actor + .ask(request, timeout) + .map { response => + response.flatMap { + case SaveSnapshotSuccess(metadata) => Instant.ofEpochMilli(metadata.timestamp).pure[F] + case SaveSnapshotFailure(_, err) => err.raiseError[F, Instant] + } + + } + } + + override def delete(seqNr: SeqNr): F[F[Unit]] = { + val metadata = SnapshotMetadata(persistenceId, seqNr) + val request = SnapshotProtocol.DeleteSnapshot(metadata) + actor + .ask(request, timeout) + .map { response => + response.flatMap { + case DeleteSnapshotSuccess(_) => ().pure[F] + case DeleteSnapshotFailure(_, err) => err.raiseError[F, Unit] + } + } + } + + override def delete(criteria: SnapshotSelectionCriteria): F[F[Unit]] = { + val request = SnapshotProtocol.DeleteSnapshots(persistenceId, criteria) + actor + .ask(request, timeout) + .map { response => + response.flatMap { + case DeleteSnapshotsSuccess(_) => ().pure[F] + case DeleteSnapshotsFailure(_, err) => err.raiseError[F, Unit] + } + } + } + + override def delete(criteria: com.evolutiongaming.akkaeffect.persistence.Snapshotter.Criteria): F[F[Unit]] = + delete(criteria.asAkka) + + } + } + + } + + } +} diff --git a/persistence/src/main/scala/akka/persistence/JournallerInterop.scala b/persistence/src/main/scala/akka/persistence/JournallerInterop.scala new file mode 100644 index 00000000..f70fea1d --- /dev/null +++ b/persistence/src/main/scala/akka/persistence/JournallerInterop.scala @@ -0,0 +1,109 @@ +package akka.persistence + +import akka.actor.ActorSystem + +import cats.syntax.all._ +import cats.effect.Async + +import com.evolutiongaming.catshelper.{ToTry, FromFuture} +import com.evolutiongaming.akkaeffect.ActorEffect +import com.evolutiongaming.akkaeffect.persistence.{JournallerOf, EventSourcedId, Journaller, Events, SeqNr} + +import scala.concurrent.duration._ +import com.evolutiongaming.akkaeffect.persistence.Append +import com.evolutiongaming.akkaeffect.persistence.DeleteEventsTo + +object JournallerInterop { + + def apply[F[_]: Async: ToTry: FromFuture]( + system: ActorSystem, + timeout: FiniteDuration + ): F[JournallerOf[F]] = + Async[F] + .delay { + Persistence(system) + } + .map { persistence => + new JournallerOf[F] { + + val F = Async[F] + + override def apply[E](journalPluginId: String, eventSourcedId: EventSourcedId, currentSeqNr: SeqNr): F[Journaller[F, E]] = + for { + actorRef <- F.delay(persistence.journalFor(journalPluginId)) + journaller <- F.delay(ActorEffect.fromActor(actorRef)) + appendedSeqNr <- F.ref(currentSeqNr) + } yield new Journaller[F, E] { + + val persistenceId = eventSourcedId.value + + override def append: Append[F, E] = new Append[F, E] { + + override def apply(events: Events[E]): F[F[SeqNr]] = { + + case class State(writes: Long, maxSeqNr: SeqNr) + val state = State(events.size, SeqNr.Min) + val actor = LocalActorRef[F, State, SeqNr](state, timeout) { + + case (state, JournalProtocol.WriteMessagesSuccessful) => state.asLeft[SeqNr].pure[F] + + case (state, JournalProtocol.WriteMessageSuccess(persistent, _)) => + val seqNr = persistent.sequenceNr max state.maxSeqNr + val result = + if (state.writes == 1) seqNr.asRight[State] + else State(state.writes - 1, seqNr).asLeft[SeqNr] + result.pure[F] + + case (_, JournalProtocol.WriteMessageRejected(_, error, _)) => error.raiseError[F, Either[State, SeqNr]] + + case (_, JournalProtocol.WriteMessagesFailed(error, _)) => error.raiseError[F, Either[State, SeqNr]] + + case (_, JournalProtocol.WriteMessageFailure(_, error, _)) => error.raiseError[F, Either[State, SeqNr]] + } + + for { + messages <- appendedSeqNr.modify { seqNr => + var _seqNr = seqNr + def nextSeqNr = { + _seqNr = _seqNr + 1 + _seqNr + } + val messages = events.values.toList.map { events => + val persistent = events.toList.map { event => + PersistentRepr(event, persistenceId = persistenceId, sequenceNr = nextSeqNr) + } + AtomicWrite(persistent) + } + _seqNr -> messages + } + actor <- actor + request = JournalProtocol.WriteMessages(messages, actor.ref, 0) + _ <- journaller.tell(request) + } yield actor.res + } + + } + + override def deleteTo: DeleteEventsTo[F] = new DeleteEventsTo[F] { + + override def apply(seqNr: SeqNr): F[F[Unit]] = { + + val actor = LocalActorRef[F, Unit, Unit]({}, timeout) { + case (_, DeleteMessagesSuccess(_)) => ().asRight[Unit].pure[F] + case (_, DeleteMessagesFailure(e, _)) => e.raiseError[F, Either[Unit, Unit]] + } + + for { + actor <- actor + request = JournalProtocol.DeleteMessagesTo(persistenceId, seqNr, actor.ref) + _ <- journaller.tell(request) + } yield actor.res + } + + } + } + + } + } + +} diff --git a/persistence/src/main/scala/akka/persistence/LocalActorRef.scala b/persistence/src/main/scala/akka/persistence/LocalActorRef.scala new file mode 100644 index 00000000..fb1ee278 --- /dev/null +++ b/persistence/src/main/scala/akka/persistence/LocalActorRef.scala @@ -0,0 +1,134 @@ +package akka.persistence + +import akka.actor.{ActorRef, MinimalActorRef} +import cats.effect.Temporal +import cats.syntax.all._ +import com.evolutiongaming.catshelper.CatsHelper.OpsCatsHelper +import com.evolutiongaming.catshelper.{SerialRef, ToTry} +import scala.concurrent.duration.FiniteDuration +import java.time.Instant +import java.util.concurrent.TimeoutException +import java.time.temporal.ChronoUnit + +/** Representation of actor capable of constructing result from multiple messages passed into the actor. Inspired by [[PromiseActorRef]] but + * result [[R]] is an aggregate from incomming messages rather that first message. Can be used only locally, does _not_ tolerate. + * [[ActorRef.provider]] and [[ActorRef.path]] functions. + * @tparam F + * The effect type. + * @tparam R + * The result type of the aggregate. + */ +private[persistence] trait LocalActorRef[F[_], R] { + + def ref: ActorRef + + /** Semantically blocking while aggregating result + */ + def res: F[R] + + /** Immidiately get currect state: + * + * \- [[None]] if aggregating not finished yet + * + * \- [[Some(Left(Throwable))]] if aggregation failed or timeout happened + * + * \- [[Some(Right(r))]] if aggregation completed successfully + */ + def get: F[Option[Either[Throwable, R]]] +} + +private[persistence] object LocalActorRef { + + type M = Any + + /** Create new [[LocalActorRef]] + * + * @param initial + * The initial state of type [[S]]. + * @param timeout + * [[TimeoutException]] will be thrown if no incomming messages received within the timeout. + * @param receive + * The aggregate function defining how to apply incomming message on state or produce final result: [[Left]] for continue aggregating + * while [[Right]] for the result. + * @tparam F + * The effect type. + * @tparam S + * The aggregating state type. + * @tparam R + * The final result type. + * @return + */ + def apply[F[_]: Temporal: ToTry, S, R](initial: S, timeout: FiniteDuration)( + receive: PartialFunction[(S, M), F[Either[S, R]]] + ): F[LocalActorRef[F, R]] = { + + val F = Temporal[F] + + case class State(state: S, updated: Instant) + + def timeoutException = new TimeoutException(s"no messages received during period of $timeout") + + for { + now <- F.realTimeInstant + state <- SerialRef.of[F, State](State(initial, now)) + defer <- F.deferred[Either[Throwable, R]] + fiber <- F.start { + val f = for { + _ <- F.sleep(timeout) + s <- state.get + n <- F.realTimeInstant + c = s.updated.plus(timeout.toNanos, ChronoUnit.NANOS).isBefore(n) + _ <- if (c) defer.complete(timeoutException.asLeft) else F.unit + } yield c + + ().tailRecM { _ => + f.ifF(().asRight, ().asLeft) + } + } + } yield new LocalActorRef[F, R] { + + private def done(e: Either[Throwable, R]) = + for { + _ <- defer.complete(e) + _ <- fiber.cancel + } yield {} + + override def ref: ActorRef = new MinimalActorRef { + + override def provider = throw new UnsupportedOperationException() + + override def path = throw new UnsupportedOperationException() + + override def !(m: M)(implicit sender: ActorRef): Unit = + state + .update { s => + if (receive.isDefinedAt(s.state -> m)) { + + for { + t <- Temporal[F].realTimeInstant + r <- receive(s.state -> m) + s <- r match { + case Left(s) => State(s, t).pure[F] + case Right(r) => done(r.asRight).as(s) + } + } yield s + + } else { + s.pure[F] + } + } + .handleErrorWith { e => + done(e.asLeft).void + } + .toTry + .get + + } + + override def res: F[R] = defer.get.flatMap(_.liftTo[F]) + + override def get: F[Option[Either[Throwable, R]]] = defer.tryGet + } + } + +} diff --git a/persistence/src/main/scala/akka/persistence/ReplayerInterop.scala b/persistence/src/main/scala/akka/persistence/ReplayerInterop.scala new file mode 100644 index 00000000..8eb70683 --- /dev/null +++ b/persistence/src/main/scala/akka/persistence/ReplayerInterop.scala @@ -0,0 +1,102 @@ +package akka.persistence + +import akka.actor.ActorSystem + +import cats.effect.{Async, Ref} +import cats.syntax.all._ + +import com.evolutiongaming.akkaeffect.ActorEffect +import com.evolutiongaming.akkaeffect.persistence.{Replayer, EventSourcedId, Event, SeqNr} +import com.evolutiongaming.catshelper.{FromFuture, ToTry} +import com.evolutiongaming.sstream.Stream +import com.evolutiongaming.sstream.FoldWhile._ + +import scala.concurrent.duration._ + +object ReplayerInterop { + + def apply[F[_]: Async: FromFuture: ToTry]( + system: ActorSystem, + timeout: FiniteDuration + ): F[Replayer.Of[F]] = + Async[F] + .delay { + Persistence(system) + } + .map { persistence => + new Replayer.Of[F] { + + override def apply[E](journalPluginId: String, eventSourcedId: EventSourcedId): F[Replayer[F, E]] = + Async[F] + .delay { + val ref = persistence.journalFor(journalPluginId) + ActorEffect.fromActor(ref) + } + .map { journaller => + new Replayer[F, E] { + + val persistenceId = eventSourcedId.value + + override def replay(fromSequenceNr: SeqNr, toSequenceNr: SeqNr, max: Long): F[Stream[F, Event[E]]] = { + + def actor(buffer: Ref[F, Vector[Event[E]]]) = + LocalActorRef[F, Unit, SeqNr]({}, timeout) { + + case (_, JournalProtocol.ReplayedMessage(persisted)) => + if (persisted.deleted) ().asLeft[SeqNr].pure[F] + else { + val payload = persisted.payload.asInstanceOf[E] + val event = Event.const(payload, persisted.sequenceNr) + buffer.update(_ :+ event).as(().asLeft[SeqNr]) + } + + case (_, JournalProtocol.RecoverySuccess(seqNr)) => seqNr.asRight[Unit].pure[F] + + case (_, JournalProtocol.ReplayMessagesFailure(error)) => error.raiseError[F, Either[Unit, SeqNr]] + } + + for { + buffer <- Ref[F].of(Vector.empty[Event[E]]) + actor <- actor(buffer) + request = JournalProtocol.ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, actor.ref) + _ <- journaller.tell(request) + } yield new Stream[F, Event[E]] { + + override def foldWhileM[L, R](l: L)(f: (L, Event[E]) => F[Either[L, R]]): F[Either[L, R]] = + l.asLeft[R] + .tailRecM { + + case Left(l) => + for { + events <- buffer.getAndSet(Vector.empty[Event[E]]) + done <- actor.get + result <- events.foldWhileM(l)(f) + result <- result match { + + case l: Left[L, R] => + done match { + case Some(Right(_)) => l.asRight[Either[L, R]].pure[F] // no more events + case Some(Left(er)) => er.raiseError[F, Either[Either[L, R], Either[L, R]]] // failure + case None => l.asLeft[Either[L, R]].pure[F] // expecting more events + } + + // Right(...), cos user-defined function [[f]] desided to stop consuming stream thus wrapping in Right to break tailRecM loop + case result => result.asRight[Either[L, R]].pure[F] + + } + } yield result + + case result => // cannot happened + result.asRight[Either[L, R]].pure[F] + } + + } + + } + + } + } + + } + } +} diff --git a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedPersistenceOf.scala b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedPersistenceOf.scala new file mode 100644 index 00000000..96da3400 --- /dev/null +++ b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedPersistenceOf.scala @@ -0,0 +1,75 @@ +package com.evolutiongaming.akkaeffect.persistence + +import akka.actor.ActorSystem +import akka.persistence.{ExtendedSnapshoterInterop, ReplayerInterop, JournallerInterop, SnapshotSelectionCriteria} + +import cats.syntax.all._ +import cats.Applicative +import cats.effect.Async + +import com.evolutiongaming.catshelper.ToTry +import com.evolutiongaming.catshelper.FromFuture +import com.evolutiongaming.akkaeffect.persistence.SeqNr +import com.evolutiongaming.sstream.Stream + +import scala.concurrent.duration._ + +trait EventSourcedPersistenceOf[F[_], S, E] { + + def apply(eventSourced: EventSourced[_]): F[EventSourcedPersistence[F, S, E]] + +} + +object EventSourcedPersistenceOf { + + def const[F[_]: Applicative, S, E](store: EventSourcedPersistence[F, S, E]): EventSourcedPersistenceOf[F, S, E] = + _ => store.pure[F] + + def fromAkka[F[_]: Async: ToTry: FromFuture, S, E]( + system: ActorSystem, + timeout: FiniteDuration + ): F[EventSourcedPersistenceOf[F, S, E]] = + for { + snapshotterOf <- ExtendedSnapshoterInterop[F](system, timeout) + replayerOf <- ReplayerInterop[F](system, timeout) + journallerOf <- JournallerInterop[F](system, timeout) + } yield new EventSourcedPersistenceOf[F, S, E] { + + override def apply(eventSourced: EventSourced[_]): F[EventSourcedPersistence[F, S, E]] = { + + val defaultPluginId = "" + val snapshotPluginId = eventSourced.pluginIds.snapshot.getOrElse(defaultPluginId) + val journalPluginId = eventSourced.pluginIds.journal.getOrElse(defaultPluginId) + + for { + extendedSn <- snapshotterOf[S](snapshotPluginId, eventSourced.eventSourcedId) + replayer <- replayerOf[E](journalPluginId, eventSourced.eventSourcedId) + } yield new EventSourcedPersistence[F, S, E] { + + override def recover: F[EventSourcedPersistence.Recovery[F, S, E]] = + extendedSn + .load(SnapshotSelectionCriteria(), SeqNr.Max) + .flatten + .map { snapshotOffer => + new EventSourcedPersistence.Recovery[F, S, E] { + + override def snapshot: Option[Snapshot[S]] = snapshotOffer + + override def events: Stream[F, Event[E]] = { + val fromSeqNr = snapshotOffer.map(_.metadata.seqNr + 1).getOrElse(SeqNr.Min) + val events = replayer.replay(fromSeqNr, SeqNr.Max, Long.MaxValue) + Stream.lift(events).flatten + } + } + } + + override def journaller(seqNr: SeqNr): F[Journaller[F, E]] = journallerOf[E](journalPluginId, eventSourced.eventSourcedId, seqNr) + + override def snapshotter: F[Snapshotter[F, S]] = extendedSn.snapshotter.pure[F] + + } + } + + } + +} diff --git a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/ExtendedSnapshotter.scala b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/ExtendedSnapshotter.scala new file mode 100644 index 00000000..11944a13 --- /dev/null +++ b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/ExtendedSnapshotter.scala @@ -0,0 +1,21 @@ +package com.evolutiongaming.akkaeffect.persistence + +import akka.persistence.SnapshotSelectionCriteria + +trait ExtendedSnapshotter[F[_], S] extends Snapshotter[F, S] { self => + + def load(criteria: SnapshotSelectionCriteria, toSequenceNr: SeqNr): F[F[Option[Snapshot[S]]]] + + val snapshotter: Snapshotter[F, S] = self + +} + +object ExtendedSnapshotter { + + trait Of[F[_]] { + + def apply[S](snapshotPluginId: String, eventSourcedId: EventSourcedId): F[ExtendedSnapshotter[F, S]] + + } + +} diff --git a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Replayer.scala b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Replayer.scala deleted file mode 100644 index 0d52b00d..00000000 --- a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Replayer.scala +++ /dev/null @@ -1,19 +0,0 @@ -package com.evolutiongaming.akkaeffect.persistence - -import com.evolutiongaming.sstream.Stream - -trait Replayer[F[_], E] { - - def replay(fromSequenceNr: SeqNr, toSequenceNr: SeqNr, max: Long): F[Stream[F, Event[E]]] - -} - -object Replayer { - - trait Of[F[_]] { - - def apply[E](journalPluginId: String, eventSourcedId: EventSourcedId): F[Replayer[F, E]] - - } - -} diff --git a/persistence/src/test/scala/akka/persistence/ExtendedSnapshoterInteropTest.scala b/persistence/src/test/scala/akka/persistence/ExtendedSnapshoterInteropTest.scala new file mode 100644 index 00000000..b0ba0dfe --- /dev/null +++ b/persistence/src/test/scala/akka/persistence/ExtendedSnapshoterInteropTest.scala @@ -0,0 +1,265 @@ +package akka.persistence + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import cats.syntax.all._ +import cats.effect.IO +import cats.effect.unsafe.implicits.global + +import com.evolutiongaming.akkaeffect.persistence.{EventSourcedId, SeqNr, Event, Snapshotter} +import com.evolutiongaming.akkaeffect.testkit.TestActorSystem + +import scala.util.Random + +import scala.concurrent.Future +import scala.concurrent.duration._ +import akka.persistence.snapshot.SnapshotStore +import akka.pattern.AskTimeoutException + +class ExtendedSnapshoterInteropTest extends AnyFunSuite with Matchers { + + val emptyPluginId = "" + + test("snapshot: load, save and load again") { + + val persistenceId = EventSourcedId("#1") + val payload = Random.nextString(1024) + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + of <- ExtendedSnapshoterInterop[IO](system, 1.second) + snapshotter <- of[String](emptyPluginId, persistenceId) + snapshot <- snapshotter.load(SnapshotSelectionCriteria(), Long.MaxValue).flatten + _ = snapshot shouldEqual none + _ <- snapshotter.save(SeqNr.Min, payload).flatten + snapshot <- snapshotter.load(SnapshotSelectionCriteria(), Long.MaxValue).flatten + _ = snapshot.get.snapshot should equal(payload) + } yield {} + } + + io.unsafeRunSync() + } + + test("snapshot: save, load, delete and load again") { + + val persistenceId = EventSourcedId("#2") + val payload = Random.nextString(1024) + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + of <- ExtendedSnapshoterInterop[IO](system, 1.second) + snapshotter <- of[String](emptyPluginId, persistenceId) + _ <- snapshotter.save(SeqNr.Min, payload).flatten + snapshot <- snapshotter.load(SnapshotSelectionCriteria(), Long.MaxValue).flatten + _ = snapshot.get.snapshot should equal(payload) + _ <- snapshotter.delete(SeqNr.Min).flatten + snapshot <- snapshotter.load(SnapshotSelectionCriteria(), Long.MaxValue).flatten + _ = snapshot shouldEqual none + } yield {} + } + + io.unsafeRunSync() + } + + test("snapshot: fail load snapshot") { + + val pluginId = "failing-snapshot" + val persistenceId = EventSourcedId("#3") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + of <- ExtendedSnapshoterInterop[IO](system, 1.second) + snapshotter <- of[String](pluginId, persistenceId) + snapshot <- snapshotter.load(SnapshotSelectionCriteria(), Long.MaxValue) + error <- snapshot.attempt + _ = error shouldEqual FailingSnapshotter.exception.asLeft[List[Event[String]]] + } yield {} + } + + io.unsafeRunSync() + } + + test("snapshot: fail save snapshot") { + + val pluginId = "failing-snapshot" + val persistenceId = EventSourcedId("#4") + val payload = Random.nextString(1024) + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + of <- ExtendedSnapshoterInterop[IO](system, 1.second) + snapshotter <- of[String](pluginId, persistenceId) + saving <- snapshotter.save(SeqNr.Min, payload) + error <- saving.attempt + _ = error shouldEqual FailingSnapshotter.exception.asLeft[List[Event[String]]] + } yield {} + } + + io.unsafeRunSync() + } + + test("snapshot: fail delete snapshot") { + + val pluginId = "failing-snapshot" + val persistenceId = EventSourcedId("#5") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + of <- ExtendedSnapshoterInterop[IO](system, 1.second) + snapshotter <- of[String](pluginId, persistenceId) + deleting <- snapshotter.delete(SeqNr.Min) + error <- deleting.attempt + _ = error shouldEqual FailingSnapshotter.exception.asLeft[List[Event[String]]] + } yield {} + } + + io.unsafeRunSync() + } + + test("snapshot: fail delete snapshot by criteria") { + + val pluginId = "failing-snapshot" + val persistenceId = EventSourcedId("#6") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + of <- ExtendedSnapshoterInterop[IO](system, 1.second) + snapshotter <- of[String](pluginId, persistenceId) + deleting <- snapshotter.delete(Snapshotter.Criteria()) + error <- deleting.attempt + _ = error shouldEqual FailingSnapshotter.exception.asLeft[List[Event[String]]] + } yield {} + } + + io.unsafeRunSync() + } + + test("snapshot: timeout loading snapshot") { + + val pluginId = "infinite-snapshot" + val persistenceId = EventSourcedId("#7") + + val io = TestActorSystem[IO]("testing", none) + .use(system => + for { + of <- ExtendedSnapshoterInterop[IO](system, 1.second) + snapshotter <- of[String](pluginId, persistenceId) + snapshot <- snapshotter.load(SnapshotSelectionCriteria(), Long.MaxValue) + error <- snapshot.attempt + } yield error match { + case Left(_: AskTimeoutException) => succeed + case Left(other) => fail(other) + case Right(_) => fail("the test should fail with AskTimeoutException but did no") + } + ) + + io.unsafeRunSync() + } + + test("snapshot: timeout saving snapshot") { + + val pluginId = "infinite-snapshot" + val persistenceId = EventSourcedId("#8") + val payload = Random.nextString(1024) + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + of <- ExtendedSnapshoterInterop[IO](system, 1.second) + snapshotter <- of[String](pluginId, persistenceId) + saving <- snapshotter.save(SeqNr.Min, payload) + error <- saving.attempt + } yield error match { + case Left(_: AskTimeoutException) => succeed + case Left(other) => fail(other) + case Right(_) => fail("the test should fail with AskTimeoutException but did no") + } + } + + io.unsafeRunSync() + } + + test("snapshot: timeout deleting snapshot") { + + val pluginId = "infinite-snapshot" + val persistenceId = EventSourcedId("#9") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + of <- ExtendedSnapshoterInterop[IO](system, 1.second) + snapshotter <- of[String](pluginId, persistenceId) + deleting <- snapshotter.delete(SeqNr.Min) + error <- deleting.attempt + } yield error match { + case Left(_: AskTimeoutException) => succeed + case Left(other) => fail(other) + case Right(_) => fail("the test should fail with AskTimeoutException but did no") + } + } + + io.unsafeRunSync() + } + + test("snapshot: timeout deleting snapshot by criteria") { + + val pluginId = "infinite-snapshot" + val persistenceId = EventSourcedId("#10") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + of <- ExtendedSnapshoterInterop[IO](system, 1.second) + snapshotter <- of[String](pluginId, persistenceId) + deleting <- snapshotter.delete(Snapshotter.Criteria()) + error <- deleting.attempt + } yield error match { + case Left(_: AskTimeoutException) => succeed + case Left(other) => fail(other) + case Right(_) => fail("the test should fail with AskTimeoutException but did no") + } + } + + io.unsafeRunSync() + } + +} + +object FailingSnapshotter { + + val exception = new RuntimeException("test exception") + +} + +class FailingSnapshotter extends SnapshotStore { + + override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = + Future.failed(FailingSnapshotter.exception) + + override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = Future.failed(FailingSnapshotter.exception) + + override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = Future.failed(FailingSnapshotter.exception) + + override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = + Future.failed(FailingSnapshotter.exception) + +} + +class InfiniteSnapshotter extends SnapshotStore { + + override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = Future.never + + override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = Future.never + + override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = Future.never + + override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = Future.never + +} diff --git a/persistence/src/test/scala/akka/persistence/JournallerAndReplayerInteropTest.scala b/persistence/src/test/scala/akka/persistence/JournallerAndReplayerInteropTest.scala new file mode 100644 index 00000000..435393db --- /dev/null +++ b/persistence/src/test/scala/akka/persistence/JournallerAndReplayerInteropTest.scala @@ -0,0 +1,206 @@ +package akka.persistence + +import akka.persistence.journal.AsyncWriteJournal +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import cats.syntax.all._ +import com.evolutiongaming.akkaeffect.persistence.Event +import com.evolutiongaming.akkaeffect.persistence.EventSourcedId +import com.evolutiongaming.akkaeffect.persistence.Events +import com.evolutiongaming.akkaeffect.persistence.SeqNr +import com.evolutiongaming.akkaeffect.testkit.TestActorSystem +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.util.concurrent.TimeoutException +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.collection.immutable.Seq +import scala.util.Try + +class JournallerAndReplayerInteropTest extends AnyFunSuite with Matchers { + + val emptyPluginId = "" + + test("journal: replay (nothing), save, replay, delete, replay") { + + val persistenceId = EventSourcedId("#11") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + replayerOf <- ReplayerInterop[IO](system, 1.second) + replayer <- replayerOf[String](emptyPluginId, persistenceId) + journallerOf <- JournallerInterop[IO](system, 1.second) + journal <- journallerOf[String](emptyPluginId, persistenceId, SeqNr.Min) + events <- replayer.replay(SeqNr.Min, SeqNr.Max, Long.MaxValue) + events <- events.toList + _ = events shouldEqual List.empty[Event[String]] + seqNr <- journal.append(Events.of("first", "second")).flatten + _ = seqNr shouldEqual 2L + events <- replayer.replay(SeqNr.Min, SeqNr.Max, Long.MaxValue) + events <- events.toList + _ = events shouldEqual List(Event.const("first", 1L), Event.const("second", 2L)) + _ <- journal.deleteTo(1L).flatten + events <- replayer.replay(SeqNr.Min, SeqNr.Max, Long.MaxValue) + events <- events.toList + _ = events shouldEqual List(Event.const("second", 2L)) + } yield {} + } + + io.unsafeRunSync() + } + + test("journal: fail loading events") { + + val pluginId = "failing-journal" + val persistenceId = EventSourcedId("#11") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + replayerOf <- ReplayerInterop[IO](system, 1.second) + replayer <- replayerOf[String](pluginId, persistenceId) + events <- replayer.replay(SeqNr.Min, SeqNr.Max, Long.MaxValue) + error <- events.toList.attempt + } yield error shouldEqual FailingJournal.exception.asLeft[List[Event[String]]] + } + + io.unsafeRunSync() + } + + test("journal: fail persisting events") { + + val pluginId = "failing-journal" + val persistenceId = EventSourcedId("#12") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + journallerOf <- JournallerInterop[IO](system, 1.second) + journal <- journallerOf[String](pluginId, persistenceId, SeqNr.Min) + seqNr <- journal.append(Events.of[String]("first", "second")) + error <- seqNr.attempt + } yield error shouldEqual FailingJournal.exception.asLeft[SeqNr] + } + + io.unsafeRunSync() + } + + test("journal: fail deleting events") { + + val pluginId = "failing-journal" + val persistenceId = EventSourcedId("#13") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + journallerOf <- JournallerInterop[IO](system, 1.second) + journal <- journallerOf[String](pluginId, persistenceId, SeqNr.Min) + deleting <- journal.deleteTo(SeqNr.Max) + error <- deleting.attempt + } yield error shouldEqual FailingJournal.exception.asLeft[Unit] + } + + io.unsafeRunSync() + } + + test("journal: timeout on loading events") { + + val pluginId = "infinite-journal" + val persistenceId = EventSourcedId("#14") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + replayerOf <- ReplayerInterop[IO](system, 1.second) + replayer <- replayerOf[String](pluginId, persistenceId) + events <- replayer.replay(SeqNr.Min, SeqNr.Max, Long.MaxValue) + error <- events.toList.attempt + } yield error match { + case Left(_: TimeoutException) => succeed + case Left(e) => fail(e) + case Right(r) => fail(s"the test should fail with TimeoutException while actual result is $r") + } + } + + io.unsafeRunSync() + } + + test("journal: timeout persisting events") { + + val pluginId = "infinite-journal" + val persistenceId = EventSourcedId("#15") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + journallerOf <- JournallerInterop[IO](system, 1.second) + journal <- journallerOf[String](pluginId, persistenceId, SeqNr.Min) + seqNr <- journal.append(Events.of[String]("first", "second")) + error <- seqNr.attempt + } yield error match { + case Left(_: TimeoutException) => succeed + case Left(e) => fail(e) + case Right(r) => fail(s"the test should fail with TimeoutException while actual result is $r") + } + } + + io.unsafeRunSync() + } + + test("journal: timeout deleting events") { + + val pluginId = "infinite-journal" + val persistenceId = EventSourcedId("#16") + + val io = TestActorSystem[IO]("testing", none) + .use { system => + for { + journallerOf <- JournallerInterop[IO](system, 1.second) + journal <- journallerOf[String](pluginId, persistenceId, SeqNr.Min) + deleting <- journal.deleteTo(SeqNr.Max) + error <- deleting.attempt + } yield error match { + case Left(_: TimeoutException) => succeed + case Left(e) => fail(e) + case Right(r) => fail(s"the test should fail with TimeoutException while actual result is $r") + } + } + + io.unsafeRunSync() + } +} + +object FailingJournal { + val exception = new RuntimeException("test exception") +} + +class FailingJournal extends AsyncWriteJournal { + + override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( + recoveryCallback: PersistentRepr => Unit + ): Future[Unit] = Future.failed(FailingJournal.exception) + + override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = + Future.failed(FailingJournal.exception) + + override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = Future.failed(FailingJournal.exception) + + override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = Future.failed(FailingJournal.exception) + +} + +class InfiniteJournal extends AsyncWriteJournal { + + override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( + recoveryCallback: PersistentRepr => Unit + ): Future[Unit] = Future.never + + override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = Future.never + + override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = Future.never + + override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = Future.never + +} diff --git a/persistence/src/test/scala/akka/persistence/LocalActorRefTest.scala b/persistence/src/test/scala/akka/persistence/LocalActorRefTest.scala new file mode 100644 index 00000000..46f4452d --- /dev/null +++ b/persistence/src/test/scala/akka/persistence/LocalActorRefTest.scala @@ -0,0 +1,101 @@ +package akka.persistence + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import cats.syntax.all._ +import org.scalatest.matchers.should.Matchers +import org.scalatest.funsuite.AnyFunSuite +import scala.concurrent.duration._ +import java.util.concurrent.TimeoutException + +class LocalActorRefTest extends AnyFunSuite with Matchers { + + val poison = 100500 + val timeout = 1.second + + def of = LocalActorRef[IO, Int, Int](0, timeout) { + case (s, `poison`) => IO(s.asRight) + case (s, m: Int) => IO((s + m).asLeft) + } + + test("LocalActorRef can handle messages and produce result") { + val io = for { + r <- of + n <- r.get + _ = n shouldEqual none + _ <- IO(r.ref ! 3) + _ <- IO(r.ref ! 4) + _ <- IO(r.ref ! poison) + r <- r.res + _ = r shouldEqual 7 + } yield {} + + io.unsafeRunSync() + } + + test("LocalActorRef.res semantically blocks until result produced") { + val io = for { + d <- IO.deferred[Unit] + r <- of + f = r.res >> d.complete {} + f <- f.start + d0 <- d.tryGet + _ = d0 shouldEqual none + _ <- IO(r.ref ! poison) + _ <- f.join + d1 <- d.tryGet + _ = d1 shouldEqual {}.some + } yield {} + + io.unsafeRunSync() + } + + test("LocalActorRef should handle concurrent ! operations") { + val io = for { + r <- of + l = List.range(0, 100) + _ <- l.parTraverse(i => IO(r.ref ! i)) + _ <- IO(r.ref ! poison) + r <- r.res + _ = r shouldEqual l.sum + } yield {} + + io.unsafeRunSync() + } + + test(s"LocalActorRef should timeout aftet $timeout") { + val io = for { + r <- of + _ <- IO.sleep(timeout * 2) + e <- r.get + _ = e match { + case Some(Left(_: TimeoutException)) => succeed + case other => fail(s"unexpected result $other") + } + e <- r.res.attempt + _ = e match { + case Left(_: TimeoutException) => succeed + case other => fail(s"unexpected result $other") + } + } yield {} + + io.unsafeRunSync() + } + + test("LocalActorRef should not timeout while receiving messages within timeout span") { + val io = for { + r <- of + _ <- IO.sleep(timeout / 2) + _ <- IO(r.ref ! 2) + _ <- IO.sleep(timeout / 2) + _ <- IO(r.ref ! 3) + _ <- IO.sleep(timeout / 2) + _ <- IO(r.ref ! poison) + r <- r.res + _ = r shouldEqual 5 + } yield {} + + io.unsafeRunSync() + } + +} diff --git a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedPersistenceOfTest.scala b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedPersistenceOfTest.scala new file mode 100644 index 00000000..f9414d66 --- /dev/null +++ b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedPersistenceOfTest.scala @@ -0,0 +1,47 @@ +package com.evolutiongaming.akkaeffect.persistence + +import cats.syntax.all._ +import cats.effect.IO +import cats.effect.unsafe.implicits.global + +import com.evolutiongaming.akkaeffect.testkit.TestActorSystem + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.concurrent.duration._ +import com.evolutiongaming.akkaeffect.persistence.SeqNr + +class EventSourcedPersistenceOfTest extends AnyFunSuite with Matchers { + + test("recover, append few events, save snapshot, append more events and recover again") { + + val io = TestActorSystem[IO]("testing", none).use { system => + val eventSourced = EventSourced[Unit](EventSourcedId("test#1"), value = {}) + + for { + persistenceOf <- EventSourcedPersistenceOf.fromAkka[IO, String, Int](system, 1.second) + persistence <- persistenceOf(eventSourced) + recover <- persistence.recover + _ = recover.snapshot shouldEqual none + events <- recover.events.toList + _ = events shouldEqual List.empty + journaller <- persistence.journaller(SeqNr.Min) + seqNr <- journaller.append(Events.of[Int](1, 2, 3, 4, 5)).flatten + _ = seqNr shouldEqual 5L + snapshotter <- persistence.snapshotter + at <- snapshotter.save(seqNr, "snapshot#1").flatten + seqNr10 <- journaller.append(Events.of[Int](6, 7, 8, 9, 10)).flatten + _ = seqNr10 shouldEqual 10L + recover <- persistence.recover + _ = recover.snapshot shouldEqual Snapshot.const("snapshot#1", Snapshot.Metadata(seqNr, at)).some + events <- recover.events.toList + _ = events shouldEqual List(6, 7, 8, 9, 10).map(n => Event.const(n, n)) + } yield {} + } + + io.unsafeRunSync() + + } + +}