Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka-free event-sourced persistence #269

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
08e5725
implement akka-free (persistence) recovery API
Nov 7, 2023
23a1177
fix 2.12 compile err
Nov 8, 2023
3d8795e
drop snapshot criteria from new ESS API
Nov 14, 2023
a3140b3
drop snapshot criteria
Nov 14, 2023
b7ae06e
wip: implement event-sourced (persistent) actor
Nov 21, 2023
3d3d1ee
wip: track seqNr in Append
Nov 21, 2023
d3f7f5c
wip: add docs & fix compile err
Nov 22, 2023
f5079cd
wip: implement EventSourcedStore creation from Akka persistent/snapsh…
Nov 22, 2023
36f76fb
wip: add basic EventSourcedActorOf test
Nov 23, 2023
d2490c3
wip: non-func improvement
Nov 23, 2023
2bf7580
wip: add more tests
Nov 27, 2023
16e0b88
wip: cleanup non-related changes
Nov 27, 2023
421f89c
wip: fix compilation of 2.12
Nov 27, 2023
878a64f
wip
Nov 28, 2023
8dff02e
wip: implement PersistenceAdapter
Nov 30, 2023
e7c54ad
make LocalActorRef.! sync & add tests
Dec 1, 2023
00069e3
wip: start working on PersistenceAdapterTest
Dec 1, 2023
8d0fdf3
wip: add tests for journaller/snaps=shotter failures
Dec 5, 2023
14212bf
wip: add tests for snapshotter timeouts
Dec 6, 2023
b1edd41
wip: add timeout into LocalActorRef
Dec 6, 2023
c04d036
wip: use partial func in LocalActorRef
Dec 6, 2023
339d37d
wip: add seqNr tracking
Dec 7, 2023
db68486
connect akka interop and EventSourcedPersistence
Dec 7, 2023
7e80488
add all new classes
Dec 7, 2023
bc9b9be
fix fromSeqNr
Dec 7, 2023
68b2443
non-func renaming
Dec 7, 2023
e88a92b
add persistenceOf test
Dec 9, 2023
f4ca6fe
add persistenceOf test
Dec 9, 2023
3a477bd
fix 2.12 tests
Dec 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ lazy val root = (project in file(".")
testkit,
persistence,
eventsourcing,
`eventsourcing-persistence`,
cluster,
`cluster-sharding`))

Expand Down Expand Up @@ -64,13 +65,20 @@ lazy val testkit = (project in file("testkit")
Akka.testkit % Test,
scalatest % Test)))

lazy val `eventsourcing-persistence` = (project in file("eventsourcing-persistence")
settings (name := "akka-effect-eventsourcing-persistence")
settings commonSettings
settings (
libraryDependencies ++= Seq(sstream)))

lazy val persistence = (project in file("persistence")
settings (name := "akka-effect-persistence")
settings commonSettings
dependsOn(
actor % "test->test;compile->compile",
testkit % "test->test;test->compile",
`actor-tests` % "test->test")
`eventsourcing-persistence` % "test->test;compile->compile",
actor % "test->test;compile->compile",
testkit % "test->test;test->compile",
`actor-tests` % "test->test")
settings (
libraryDependencies ++= Seq(
Akka.actor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.evolution.akkaeffect.eventsopircing.persistence

trait Event[E] {

def event: E
def seqNr: SeqNr

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.evolution.akkaeffect.eventsopircing.persistence

/**
* Event sourcing persistence API: provides snapshot followed by stream of events
* @tparam F effect
* @tparam S snapshot
* @tparam E event
*/
trait EventSourcedStore[F[_], S, E] {

import EventSourcedStore._

/**
* Start recovery by retrieving snapshot (eager, happening on outer F)
* and preparing for loading events (lazy op, happens on [[Recovery#events()]] stream materialisation)
* @param id persistent ID
* @param criteria snapshot lookup criteria
* @return [[Recovery]] instance, representing __started__ recovery
*/
def recover(id: Id, criteria: Criteria): F[Recovery[F, S, E]]

}

object EventSourcedStore {

/** ID of persistent actor
* @see [[com.evolutiongaming.akkaeffect.persistence.EventSourcedId]]
* @see [[akka.persistence.PersistentActor.persistenceId]]
*/
final case class Id(value: String) extends AnyVal

/**
* Snapshot lookup criteria
* @see [[akka.persistence.SnapshotSelectionCriteria]]
*/
final case class Criteria(maxSequenceNr: Long = Long.MaxValue,
dfakhritdinov marked this conversation as resolved.
Show resolved Hide resolved
maxTimestamp: Long = Long.MaxValue,
minSequenceNr: Long = 0L,
minTimestamp: Long = 0L)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.evolution.akkaeffect.eventsopircing.persistence

import com.evolutiongaming.sstream.Stream

/**
* Representation of __started__ recovery process:
* snapshot is already loaded in memory (if any)
* while events will be loaded only on materialisation of [[Stream]]
* @tparam F effect
* @tparam S snapshot
* @tparam E event
*/
trait Recovery[F[_], S, E] {

def snapshot: Option[Snapshot[S]]
def events: Stream[F, Event[E]]

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.evolution.akkaeffect.eventsopircing.persistence

import java.time.Instant

trait Snapshot[S] {

def snapshot: S
def metadata: Snapshot.Metadata

}

object Snapshot {

final case class Metadata(seqNr: SeqNr, timestamp: Instant)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.evolution.akkaeffect.eventsopircing

package object persistence {

type SeqNr = Long

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package com.evolutiongaming.akkaeffect.persistence

import akka.persistence.SnapshotSelectionCriteria
import cats.syntax.all._
import cats.effect.{Async, Ref, Resource, Sync}
import akka.persistence.journal.AsyncRecovery
import akka.persistence.snapshot.SnapshotStore
import com.evolution.akkaeffect.eventsopircing.persistence.{
Event,
EventSourcedStore,
Recovery,
Snapshot
}
import com.evolutiongaming.catshelper.FromFuture
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.sstream.Stream

import java.time.Instant
import scala.concurrent.Future
import scala.util.Try
import com.evolutiongaming.sstream.FoldWhile._

object EventSourcedStoreOf {

/**
* [[EventSourcedStore]] implementation based on Akka Persistence API.
*
* The implementation delegates snapshot and events load to [[SnapshotStore]] and [[AsyncRecovery]].
* Snapshot loaded on [[EventSourcedStore#recover]] F while events loaded lazily:
* first events will be available for [[Stream#foldWhileM]] while tail still loaded by [[AsyncRecovery]]
*
* @param snapshotStore Akka Persistence snapshot (plugin)
* @param journal Akka Persistence journal (plugin)
* @tparam F effect
* @tparam S snapshot
* @tparam E event
* @return resource of [[EventSourcedStore]]
*/
def fromAkka[F[_]: Async: ToTry, S, E](snapshotStore: SnapshotStore,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall I'm more interested in actual implementation that meant to replace PersistentActor :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please take a look on basic part of persistent actor replacement: com.evolutiongaming.akkaeffect.persistence.EventSourcedActorOf

journal: AsyncRecovery,
): Resource[F, EventSourcedStore[F, S, E]] = {

val eventSourcedStore = new EventSourcedStore[F, S, E] {

override def recover(
id: EventSourcedStore.Id,
criteria: EventSourcedStore.Criteria
): F[Recovery[F, S, E]] = {

val snapshotSelectionCriteria = SnapshotSelectionCriteria(
criteria.maxSequenceNr,
criteria.maxTimestamp,
criteria.minSequenceNr,
criteria.minTimestamp
)

snapshotStore
.loadAsync(id.value, snapshotSelectionCriteria)
.liftTo[F]
.map { offer =>
new Recovery[F, S, E] {

override val snapshot: Option[Snapshot[S]] =
offer.map { offer =>
new Snapshot[S] {
override def snapshot: S = offer.snapshot.asInstanceOf[S]

override def metadata: Snapshot.Metadata =
Snapshot.Metadata(
seqNr = offer.metadata.sequenceNr,
timestamp =
Instant.ofEpochMilli(offer.metadata.timestamp)
)
}
}

override val events: Stream[F, Event[E]] = {
val fromSequenceNr =
snapshot.map(_.metadata.seqNr).getOrElse(0L)

val stream = for {

buffer <- Ref[F].of(Vector.empty[Event[E]])

highestSequenceNr <- journal
.asyncReadHighestSequenceNr(id.value, fromSequenceNr)
.liftTo[F]

replayed <- Sync[F].delay {

journal.asyncReplayMessages(
id.value,
fromSequenceNr,
highestSequenceNr,
Long.MaxValue
) { persisted =>
if (persisted.deleted) {} else {
val event = new Event[E] {
override val event: E =
persisted.payload.asInstanceOf[E]
override val seqNr: SeqNr =
persisted.sequenceNr
}
val _ = buffer.update(_ :+ event).toTry
}
}

}
} yield {

new Stream[F, Event[E]] {

override def foldWhileM[L, R](
l: L
)(f: (L, Event[E]) => F[Either[L, R]]): F[Either[L, R]] = {

l.asLeft[R]
.tailRecM {
case Left(l) =>
for {
events <- buffer.getAndSet(Vector.empty[Event[E]])
result <- events.foldWhileM(l)(f)
result <- result match {

case l: Left[L, R] =>
for {
replayed <- Sync[F].delay(
replayed.isCompleted
)
} yield
if (replayed) l.asRight[Either[L, R]]
else l.asLeft[Either[L, R]]

case result =>
result.asRight[Either[L, R]].pure[F]

}
} yield result

case result => result.asRight[Either[L, R]].pure[F]
}
}

}
}

Stream.lift(stream).flatten
}
}
}
}

}

eventSourcedStore.pure[Resource[F, *]]

}

implicit class FromFutureSyntax[A](val future: Future[A]) extends AnyVal {
def liftTo[F[_]: FromFuture]: F[A] = FromFuture[F].apply(future)
}

implicit class ToTrySyntax[F[_], A](val fa: F[A]) extends AnyVal {
def toTry(implicit F: ToTry[F]): Try[A] = F(fa)
}

}
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ object Dependencies {
val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2"
val pureconfig = "com.github.pureconfig" %% "pureconfig" % "0.17.3"
val smetrics = "com.evolutiongaming" %% "smetrics" % "2.0.0"
val sstream = "com.evolutiongaming" %% "sstream" % "1.0.1"

object Cats {
private val version = "2.9.0"
Expand Down