Skip to content

Commit

Permalink
Add evalScan1 operator
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostbuster91 committed Mar 15, 2022
1 parent 74a9ee0 commit 1de27e2
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
23 changes: 23 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,29 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
(Pull.output1(z) >> go(z, this)).stream
}

/** Like `[[Stream#scan1]]`, but accepts a function returning an `F[_]`.
*
* @example {{{
* scala> import cats.effect.SyncIO
* scala> Stream(1,2,3,4).covary[SyncIO].evalScan1((acc,i) => SyncIO(acc + i)).compile.toVector.unsafeRunSync()
* res0: Vector[Int] = Vector(1, 3, 6, 10)
* }}}
*/
def evalScan1[F2[x] >: F[x], O2 >: O](f: (O2, O2) => F2[O2]): fs2.Stream[F2, O2] = {
def go(z: O2, s: fs2.Stream[F2, O]): Pull[F2, O2, Unit] =
s.pull.uncons1.flatMap {
case Some((hd, tl)) =>
Pull.eval(f(z, hd)).flatMap(o => Pull.output1(o) >> go(o, tl))
case None => Pull.done
}
this.pull.uncons.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
val (pre, post) = hd.splitAt(1)
Pull.output(pre) >> go(pre(0), tl.cons(post))
}.stream
}

/** Like `observe` but observes with a function `O => F[O2]` instead of a pipe.
* Not as powerful as `observe` since not all pipes can be represented by `O => F[O2]`, but much faster.
* Alias for `evalMap(o => f(o).as(o))`.
Expand Down
13 changes: 13 additions & 0 deletions core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,19 @@ class StreamCombinatorsSuite extends Fs2Suite {
}
}

test("evalScan1") {
forAllF { (s: Stream[Pure, Int]) =>
val v = s.toVector
val f = (a: Int, b: Int) => a + b
val g = (a: Int, b: Int) => IO.pure(a + b)
s.covary[IO]
.evalScan1(g)
.assertEmits(
v.headOption.fold(Vector.empty[Int])(h => v.drop(1).scanLeft(h)(f)).toList
)
}
}

test("every".flaky) {
type BD = (Boolean, FiniteDuration)
def durationSinceLastTrue[F[_]]: Pipe[F, BD, BD] = {
Expand Down

0 comments on commit 1de27e2

Please sign in to comment.