Review #1
Comments
@oleg-py Thank you very much!
WorkerPool
I don't clearly understand how to separate what must be uncancelable from what can stay cancelable...
    def exec(a: A): F[B] =
      for {
        currentVersion <- version.get
        b <- Concurrent[F].uncancelable {
          current.take >>= {
            case (workerVersion, worker) =>
              if (currentVersion == workerVersion)
                worker(a).guarantee(back(workerVersion, worker))
              else exec(a)
          }
        }
      } yield b
Race
Awesome! Thank you:
    object success {
      def unapply[F[_], A](attempt: Attempt[F, A]): Option[Success[F, A]] =
        attempt.leftMap(_.swap).merge match {
          case (fib, Right(v)) => (v, fib).some
          case _ => none
        }
    }
    object failure {
      def unapply[F[_], A](attempt: Attempt[F, A]): Option[Failure[F, A]] =
        attempt.leftMap(_.swap).merge match {
          case (fib, Left(e)) => (fib, CompositeException.one(e)).some
          case _ => none
        }
    }
I don't understand how it is possible without any runtime checking/pattern matching. All I've got so far:
    case class CompositeException(ex: NonEmptyList[Throwable]) extends Exception("All race candidates have failed") {
      def compose(e: CompositeException): CompositeException = CompositeException(ex concatNel e.ex)
    }
    case object CompositeException {
      def apply(ex: Throwable): CompositeException = CompositeException(NonEmptyList.one(ex))
    }
    object failure {
      def unapply[F[_], A](attempt: Attempt[F, A]): Option[Failure[F, A]] =
        attempt.leftMap(_.swap).merge match {
          case (fib, Left(e)) => (fib, CompositeException(e)).some
          case _ => none
        }
    }
    def composeAndContinue[F[_]: Sync, A](fib: Fiber[F, Either[Throwable, A]], ea: CompositeException): F[A] =
      fib.join >>= {
        // TODO: How to avoid pattern matching on CompositeException?
        case Left(eb: CompositeException) => Sync[F].raiseError[A](ea.compose(eb))
        case Left(eb) => Sync[F].raiseError[A](ea.compose(CompositeException(eb)))
        case Right(v) => v.pure[F]
      }
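One possible way around the type test in the TODO, sketched with the standard library only: wrap each raw Throwable into a CompositeException once, right where a candidate's attempt is produced, so the error side is already the combinable type by the time two failures meet. Everything here is an assumption made for illustration: List stands in for cats' NonEmptyList, plain Either stands in for F, and the names Result, lift and CompositeSketch are hypothetical.

```scala
// Stdlib-only stand-in for the CompositeException from this thread.
// List replaces cats' NonEmptyList so the sketch has no dependencies.
final case class CompositeException(errors: List[Throwable])
    extends Exception("All race candidates have failed") {
  // Flat concatenation: composing never nests one composite inside another.
  def compose(that: CompositeException): CompositeException =
    CompositeException(errors ++ that.errors)
}

object CompositeException {
  // Wrap a single failure.
  def one(e: Throwable): CompositeException = CompositeException(List(e))
}

object CompositeSketch {
  // Hypothetical alias: the error channel is always a CompositeException.
  type Result[A] = Either[CompositeException, A]

  // Convert a raw attempt exactly once, at the boundary.
  def lift[A](attempt: Either[Throwable, A]): Result[A] =
    attempt.left.map(CompositeException.one)

  // Combine an earlier failure with a later result; no match on the error type.
  def composeAndContinue[A](later: Result[A], earlier: CompositeException): Result[A] =
    later.left.map(earlier.compose(_))
}
```

With the error side fixed to CompositeException, the two Left cases collapse into one, and a success passes through untouched. In the effectful version the same idea would mean the fiber carries Either[CompositeException, A] instead of Either[Throwable, A].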
WorkerPool
I like your implementation with versions; however, exec is not safe wrt cancellation:
Because of cancelable flatMaps, it's possible for cancellation to happen at [X] or [Y], in which case you leak the worker (as it's never returned to the MVar, even if I didn't do removeAll).
You can fix it using uncancelable, but you don't want to make everything uncancelable. bracket would work too (the acquire and release parts of bracket are uncancelable), but looping and de-structuring would be painful.
Race
It seems (from the code) that you're building a tree of CompositeExceptions with prepend, i.e.
It also seems that your getMessage override would not discriminate between these cases :) Ideally, you'd want to somehow ensure you work with CompositeException instead of Throwable, since CompositeExceptions can be safely combined together.
Custom extractors are a nice touch, but there's also an option to avoid matching on two cases: Either has a merge method, which can only be called if left and right have (potentially) the same type. So, you can do something like either.leftMap(_.swap).merge to get (Fiber[...], Either[...]).
And if it all seems like nitpicking, that's because there's not really anything significant for me to complain about. Nice solutions, and congratulations 👍
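To make the merge suggestion above concrete, here is a minimal sketch using only the standard library. It assumes the race result has the shape Either[(result, fiber), (fiber, result)], as I believe racePair returns; Fiber here is a hypothetical stand-in case class, and left.map is the stdlib counterpart of the cats leftMap mentioned above.

```scala
object MergeSketch {
  // Hypothetical stand-in for cats-effect's Fiber; only its identity matters here.
  final case class Fiber[A](name: String)

  type Attempt[A] = Either[Throwable, A]

  // Assumed shape of a racePair-style result:
  // Left  = (winner's attempt, loser's fiber)
  // Right = (loser's fiber, winner's attempt)
  type RaceResult[A] = Either[(Attempt[A], Fiber[A]), (Fiber[A], Attempt[A])]

  // Swap the Left tuple so both sides have the same type, then merge them
  // into a single (fiber, attempt) pair without matching on Left/Right.
  def normalize[A](r: RaceResult[A]): (Fiber[A], Attempt[A]) =
    r.left.map(_.swap).merge
}
```

After normalize, a single match on the Attempt inside the pair is enough to tell success from failure, whichever side won the race.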