diff --git a/akka-stream-utils/jvm/src/test/scala/StreamOpsSpec.scala b/akka-stream-utils/jvm/src/test/scala/StreamOpsSpec.scala index 45a7191..92e5eba 100644 --- a/akka-stream-utils/jvm/src/test/scala/StreamOpsSpec.scala +++ b/akka-stream-utils/jvm/src/test/scala/StreamOpsSpec.scala @@ -242,12 +242,18 @@ class StreamOpsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with checkTable2[Boolean, Int, Either](input, operations) } - "Either flattenF" in { + "Either collectRightF" in { Source(Seq(Right(1), Left(true), Right(2))) - .flattenF + .collectRightF .runWith(Sink.seq) .futureValue shouldBe Seq(1, 2) } + "Either flattenF" in { + Source(Seq(Right(Right(1)), Left(true), Right(Left(false)), Right(Right(2)))) + .flattenF + .runWith(Sink.seq) + .futureValue shouldBe Seq(Right(1), Left(true), Left(false), Right(2)) + } "Either flatMapMergeF" in { Source(Seq(Right(1), Left(true), Right(2))) .flatMapMergeF(8, element => Source(Seq(element * 2, element * 4))) diff --git a/akka-stream-utils/shared/src/main/scala/StreamOps.scala b/akka-stream-utils/shared/src/main/scala/StreamOps.scala index 1ec0d86..8e1d945 100644 --- a/akka-stream-utils/shared/src/main/scala/StreamOps.scala +++ b/akka-stream-utils/shared/src/main/scala/StreamOps.scala @@ -262,10 +262,16 @@ object StreamOps { case _ => Left(zero) }) - def flattenF: s.Repr[OutR] = s.collect { + def collectRightF: s.Repr[OutR] = s.collect { case Right(value) => value } + def flattenF[OutR2](implicit ev: OutR <:< Either[OutL, OutR2]): s.Repr[Either[OutL, OutR2]] = + s.map { + case Left(outL) => Left(outL) + case Right(outR) => ev.apply(outR) + } + def groupByF[K](maxSubstreams: Int, f: OutR => K): SubFlow[Either[OutL, OutR], Mat, s.Repr, s.Closed] = { s.groupBy(maxSubstreams, { case Right(value) => Some(f(value))