Skip to content

Commit

Permalink
Revert "delete persistence interop & tests"
Browse files Browse the repository at this point in the history
This reverts commit 7b75024.
  • Loading branch information
Denys Fakhritdinov committed Dec 9, 2023
1 parent 7b75024 commit df9ffd9
Show file tree
Hide file tree
Showing 11 changed files with 1,175 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package akka.persistence

import akka.actor.ActorSystem

import cats.syntax.all._
import cats.effect.Sync

import com.evolutiongaming.catshelper.FromFuture
import com.evolutiongaming.akkaeffect.ActorEffect
import com.evolutiongaming.akkaeffect.persistence.{ExtendedSnapshotter, EventSourcedId, SeqNr, Snapshot}

import scala.concurrent.duration._
import java.time.Instant

object ExtendedSnapshoterInterop {

def apply[F[_]: Sync: FromFuture](
system: ActorSystem,
timeout: FiniteDuration
): F[ExtendedSnapshotter.Of[F]] =
Sync[F]
.delay {
Persistence(system)
}
.map { persistence =>
new ExtendedSnapshotter.Of[F] {

override def apply[S](snapshotPluginId: String, eventSourcedId: EventSourcedId): F[ExtendedSnapshotter[F, S]] =
Sync[F]
.delay {
val ref = persistence.snapshotStoreFor(snapshotPluginId)
ActorEffect.fromActor(ref)
}
.map { actor =>
new ExtendedSnapshotter[F, S] {

val persistenceId = eventSourcedId.value

override def load(criteria: SnapshotSelectionCriteria, toSequenceNr: SeqNr): F[F[Option[Snapshot[S]]]] = {

val request = SnapshotProtocol.LoadSnapshot(persistenceId, criteria, toSequenceNr)
actor
.ask(request, timeout)
.map { response =>
response.flatMap {

case SnapshotProtocol.LoadSnapshotResult(snapshot, _) =>
snapshot match {

case Some(offer) =>
val payload = offer.snapshot.asInstanceOf[S]
val metadata = Snapshot.Metadata(
offer.metadata.sequenceNr,
Instant.ofEpochMilli(offer.metadata.timestamp)
)
Snapshot.const(payload, metadata).some.pure[F]

case None => none[Snapshot[S]].pure[F]
}

case SnapshotProtocol.LoadSnapshotFailed(err) =>
err.raiseError[F, Option[Snapshot[S]]]
}
}
}

override def save(seqNr: SeqNr, snapshot: S): F[F[Instant]] = {
val metadata = SnapshotMetadata(persistenceId, seqNr)
val request = SnapshotProtocol.SaveSnapshot(metadata, snapshot)
actor
.ask(request, timeout)
.map { response =>
response.flatMap {
case SaveSnapshotSuccess(metadata) => Instant.ofEpochMilli(metadata.timestamp).pure[F]
case SaveSnapshotFailure(_, err) => err.raiseError[F, Instant]
}

}
}

override def delete(seqNr: SeqNr): F[F[Unit]] = {
val metadata = SnapshotMetadata(persistenceId, seqNr)
val request = SnapshotProtocol.DeleteSnapshot(metadata)
actor
.ask(request, timeout)
.map { response =>
response.flatMap {
case DeleteSnapshotSuccess(_) => ().pure[F]
case DeleteSnapshotFailure(_, err) => err.raiseError[F, Unit]
}
}
}

override def delete(criteria: SnapshotSelectionCriteria): F[F[Unit]] = {
val request = SnapshotProtocol.DeleteSnapshots(persistenceId, criteria)
actor
.ask(request, timeout)
.map { response =>
response.flatMap {
case DeleteSnapshotsSuccess(_) => ().pure[F]
case DeleteSnapshotsFailure(_, err) => err.raiseError[F, Unit]
}
}
}

override def delete(criteria: com.evolutiongaming.akkaeffect.persistence.Snapshotter.Criteria): F[F[Unit]] =
delete(criteria.asAkka)

}
}

}

}
}
109 changes: 109 additions & 0 deletions persistence/src/main/scala/akka/persistence/JournallerInterop.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package akka.persistence

import akka.actor.ActorSystem

import cats.syntax.all._
import cats.effect.Async

import com.evolutiongaming.catshelper.{ToTry, FromFuture}
import com.evolutiongaming.akkaeffect.ActorEffect
import com.evolutiongaming.akkaeffect.persistence.{JournallerOf, EventSourcedId, Journaller, Events, SeqNr}

import scala.concurrent.duration._
import com.evolutiongaming.akkaeffect.persistence.Append
import com.evolutiongaming.akkaeffect.persistence.DeleteEventsTo

object JournallerInterop {

def apply[F[_]: Async: ToTry: FromFuture](
system: ActorSystem,
timeout: FiniteDuration
): F[JournallerOf[F]] =
Async[F]
.delay {
Persistence(system)
}
.map { persistence =>
new JournallerOf[F] {

val F = Async[F]

override def apply[E](journalPluginId: String, eventSourcedId: EventSourcedId, currentSeqNr: SeqNr): F[Journaller[F, E]] =
for {
actorRef <- F.delay(persistence.journalFor(journalPluginId))
journaller <- F.delay(ActorEffect.fromActor(actorRef))
appendedSeqNr <- F.ref(currentSeqNr)
} yield new Journaller[F, E] {

val persistenceId = eventSourcedId.value

override def append: Append[F, E] = new Append[F, E] {

override def apply(events: Events[E]): F[F[SeqNr]] = {

case class State(writes: Long, maxSeqNr: SeqNr)
val state = State(events.size, SeqNr.Min)
val actor = LocalActorRef[F, State, SeqNr](state, timeout) {

case (state, JournalProtocol.WriteMessagesSuccessful) => state.asLeft[SeqNr].pure[F]

case (state, JournalProtocol.WriteMessageSuccess(persistent, _)) =>
val seqNr = persistent.sequenceNr max state.maxSeqNr
val result =
if (state.writes == 1) seqNr.asRight[State]
else State(state.writes - 1, seqNr).asLeft[SeqNr]
result.pure[F]

case (_, JournalProtocol.WriteMessageRejected(_, error, _)) => error.raiseError[F, Either[State, SeqNr]]

case (_, JournalProtocol.WriteMessagesFailed(error, _)) => error.raiseError[F, Either[State, SeqNr]]

case (_, JournalProtocol.WriteMessageFailure(_, error, _)) => error.raiseError[F, Either[State, SeqNr]]
}

for {
messages <- appendedSeqNr.modify { seqNr =>
var _seqNr = seqNr
def nextSeqNr = {
_seqNr = _seqNr + 1
_seqNr
}
val messages = events.values.toList.map { events =>
val persistent = events.toList.map { event =>
PersistentRepr(event, persistenceId = persistenceId, sequenceNr = nextSeqNr)
}
AtomicWrite(persistent)
}
_seqNr -> messages
}
actor <- actor
request = JournalProtocol.WriteMessages(messages, actor.ref, 0)
_ <- journaller.tell(request)
} yield actor.res
}

}

override def deleteTo: DeleteEventsTo[F] = new DeleteEventsTo[F] {

override def apply(seqNr: SeqNr): F[F[Unit]] = {

val actor = LocalActorRef[F, Unit, Unit]({}, timeout) {
case (_, DeleteMessagesSuccess(_)) => ().asRight[Unit].pure[F]
case (_, DeleteMessagesFailure(e, _)) => e.raiseError[F, Either[Unit, Unit]]
}

for {
actor <- actor
request = JournalProtocol.DeleteMessagesTo(persistenceId, seqNr, actor.ref)
_ <- journaller.tell(request)
} yield actor.res
}

}
}

}
}

}
134 changes: 134 additions & 0 deletions persistence/src/main/scala/akka/persistence/LocalActorRef.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package akka.persistence

import akka.actor.{ActorRef, MinimalActorRef}
import cats.effect.Temporal
import cats.syntax.all._
import com.evolutiongaming.catshelper.CatsHelper.OpsCatsHelper
import com.evolutiongaming.catshelper.{SerialRef, ToTry}
import scala.concurrent.duration.FiniteDuration
import java.time.Instant
import java.util.concurrent.TimeoutException
import java.time.temporal.ChronoUnit

/** Representation of actor capable of constructing result from multiple messages passed into the actor. Inspired by [[PromiseActorRef]] but
* result [[R]] is an aggregate from incomming messages rather that first message. Can be used only locally, does _not_ tolerate.
* [[ActorRef.provider]] and [[ActorRef.path]] functions.
* @tparam F
* The effect type.
* @tparam R
* The result type of the aggregate.
*/
private[persistence] trait LocalActorRef[F[_], R] {

def ref: ActorRef

/** Semantically blocking while aggregating result
*/
def res: F[R]

/** Immidiately get currect state:
*
* \- [[None]] if aggregating not finished yet
*
* \- [[Some(Left(Throwable))]] if aggregation failed or timeout happened
*
* \- [[Some(Right(r))]] if aggregation completed successfully
*/
def get: F[Option[Either[Throwable, R]]]
}

private[persistence] object LocalActorRef {

type M = Any

/** Create new [[LocalActorRef]]
*
* @param initial
* The initial state of type [[S]].
* @param timeout
* [[TimeoutException]] will be thrown if no incomming messages received within the timeout.
* @param receive
* The aggregate function defining how to apply incomming message on state or produce final result: [[Left]] for continue aggregating
* while [[Right]] for the result.
* @tparam F
* The effect type.
* @tparam S
* The aggregating state type.
* @tparam R
* The final result type.
* @return
*/
def apply[F[_]: Temporal: ToTry, S, R](initial: S, timeout: FiniteDuration)(
receive: PartialFunction[(S, M), F[Either[S, R]]]
): F[LocalActorRef[F, R]] = {

val F = Temporal[F]

case class State(state: S, updated: Instant)

def timeoutException = new TimeoutException(s"no messages received during period of $timeout")

for {
now <- F.realTimeInstant
state <- SerialRef.of[F, State](State(initial, now))
defer <- F.deferred[Either[Throwable, R]]
fiber <- F.start {
val f = for {
_ <- F.sleep(timeout)
s <- state.get
n <- F.realTimeInstant
c = s.updated.plus(timeout.toNanos, ChronoUnit.NANOS).isBefore(n)
_ <- if (c) defer.complete(timeoutException.asLeft) else F.unit
} yield c

().tailRecM { _ =>
f.ifF(().asRight, ().asLeft)
}
}
} yield new LocalActorRef[F, R] {

private def done(e: Either[Throwable, R]) =
for {
_ <- defer.complete(e)
_ <- fiber.cancel
} yield {}

override def ref: ActorRef = new MinimalActorRef {

override def provider = throw new UnsupportedOperationException()

override def path = throw new UnsupportedOperationException()

override def !(m: M)(implicit sender: ActorRef): Unit =
state
.update { s =>
if (receive.isDefinedAt(s.state -> m)) {

for {
t <- Temporal[F].realTimeInstant
r <- receive(s.state -> m)
s <- r match {
case Left(s) => State(s, t).pure[F]
case Right(r) => done(r.asRight).as(s)
}
} yield s

} else {
s.pure[F]
}
}
.handleErrorWith { e =>
done(e.asLeft).void
}
.toTry
.get

}

override def res: F[R] = defer.get.flatMap(_.liftTo[F])

override def get: F[Option[Either[Throwable, R]]] = defer.tryGet
}
}

}
Loading

0 comments on commit df9ffd9

Please sign in to comment.