Skip to content

Commit

Permalink
replace EventSourcedPersistence with SnapshotStore & EventStore
Browse files Browse the repository at this point in the history
  • Loading branch information
Denys Fakhritdinov committed Dec 11, 2023
1 parent e6f1680 commit d29e5aa
Show file tree
Hide file tree
Showing 15 changed files with 86 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,6 @@ object JournalKeeper {

def delete(criteria: SnapshotSelectionCriteria) = {

delete(Snapshotter.Criteria(criteria))

}

def delete(criteria: Snapshotter.Criteria): F[F[Unit]] = {

def selected(meta: SnapshotMetadata) = {
meta.seqNr <= criteria.maxSequenceNr && meta.timestamp.toEpochMilli <= criteria.maxSequenceNr
}
Expand All @@ -352,7 +346,6 @@ object JournalKeeper {
.map { _.joinWithNever }
}
}

}
}
}
Expand All @@ -377,4 +370,4 @@ object JournalKeeper {

implicit val configReaderConfig: ConfigReader[Config] = deriveReader
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,13 +510,6 @@ object JournalKeeperTest {
.add(Action.DeleteSnapshots(criteria))
.map { _.pure[F] }
}

def delete(criteria: Snapshotter.Criteria): F[F[Unit]] = {
actions
.add(Action.DeleteSnapshots(criteria.asAkka))
.map { _.pure[F] }
}

}


Expand Down Expand Up @@ -547,4 +540,4 @@ object JournalKeeperTest {
config,
Log.empty)
}
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.evolutiongaming.akkaeffect.persistence

import com.evolutiongaming.sstream

trait EventStore[F[_], A] extends EventStore.Read[F, A] with EventStore.Write[F, A]

object EventStore {

trait Read[F[_], A] {
def read(fromSeqNr: SeqNr): F[sstream.Stream[F, Event[A]]]
}

trait Write[F[_], A] {
def append: Append[F, A]
def deleteTo: DeleteEventsTo[F]
}

final case class Event[A](event: A, seqNr: SeqNr)

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.evolutiongaming.akkaeffect.persistence

import java.time.Instant

trait SnapshotStore[F[_], A] extends SnapshotStore.Read[F, A] with SnapshotStore.Write[F, A]

object SnapshotStore {

trait Read[F[_], A] {
def latest: F[Option[SnapshotStore.Offer[A]]]
}

trait Write[F[_], -A] {
def save(seqNr: SeqNr, snapshot: A): F[F[Instant]]
def delete(seqNr: SeqNr): F[F[Unit]]
def delete(criteria: Criteria): F[F[Unit]]
}

final case class Metadata(seqNr: SeqNr, timestamp: Instant)

final case class Offer[A](snapshot: A, metadata: Metadata)

final case class Criteria(
maxSequenceNr: Long = Long.MaxValue,
maxTimestamp: Long = Long.MaxValue,
minSequenceNr: Long = 0L,
minTimestamp: Long = 0L
)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package akka.persistence

import java.time.Instant

import akka.persistence.SnapshotProtocol.{DeleteSnapshot, DeleteSnapshots, Request, SaveSnapshot}
import akka.util.Timeout
import cats.effect.Sync
Expand Down Expand Up @@ -64,10 +65,6 @@ object SnapshotterInterop {
case a: DeleteSnapshotsFailure => a.cause.raiseError[F, Unit]
}
}

def delete(criteria: akkaeffect.persistence.Snapshotter.Criteria): F[F[Unit]] = {
delete(criteria.asAkka)
}
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ object PersistentActorOf {

private def recoveryCompleted(seqNr: SeqNr): Unit = {
val journaller = Journaller[F, Any](resources.append.value, resources.deleteEventsTo).withFail(fail)
val snapshotter = SnapshotterOf[F, Any](actor, timeout).withFail(fail)
val snapshotter = Snapshotter[F, Any](actor, timeout).withFail(fail)
persistence.recoveryCompleted(seqNr, journaller, snapshotter)
}

Expand Down
Loading

0 comments on commit d29e5aa

Please sign in to comment.