diff --git a/akka-stream-utils/jvm/src/test/scala/StreamOpsSpec.scala b/akka-stream-utils/jvm/src/test/scala/StreamOpsSpec.scala index 45a7191..92e5eba 100644 --- a/akka-stream-utils/jvm/src/test/scala/StreamOpsSpec.scala +++ b/akka-stream-utils/jvm/src/test/scala/StreamOpsSpec.scala @@ -242,12 +242,18 @@ class StreamOpsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with checkTable2[Boolean, Int, Either](input, operations) } - "Either flattenF" in { + "Either collectRightF" in { Source(Seq(Right(1), Left(true), Right(2))) - .flattenF + .collectRightF .runWith(Sink.seq) .futureValue shouldBe Seq(1, 2) } + "Either flattenF" in { + Source(Seq(Right(Right(1)), Left(true), Right(Left(false)), Right(Right(2)))) + .flattenF + .runWith(Sink.seq) + .futureValue shouldBe Seq(Right(1), Left(true), Left(false), Right(2)) + } "Either flatMapMergeF" in { Source(Seq(Right(1), Left(true), Right(2))) .flatMapMergeF(8, element => Source(Seq(element * 2, element * 4))) diff --git a/akka-stream-utils/shared/src/main/scala/StreamOps.scala b/akka-stream-utils/shared/src/main/scala/StreamOps.scala index 1ec0d86..8e1d945 100644 --- a/akka-stream-utils/shared/src/main/scala/StreamOps.scala +++ b/akka-stream-utils/shared/src/main/scala/StreamOps.scala @@ -262,10 +262,16 @@ object StreamOps { case _ => Left(zero) }) - def flattenF: s.Repr[OutR] = s.collect { + def collectRightF: s.Repr[OutR] = s.collect { case Right(value) => value } + def flattenF[OutR2](implicit ev: OutR <:< Either[OutL, OutR2]): s.Repr[Either[OutL, OutR2]] = + s.map { + case Left(outL) => Left(outL) + case Right(outR) => ev.apply(outR) + } + def groupByF[K](maxSubstreams: Int, f: OutR => K): SubFlow[Either[OutL, OutR], Mat, s.Repr, s.Closed] = { s.groupBy(maxSubstreams, { case Right(value) => Some(f(value)) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 364c962..cc1eb3d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -161,14 +161,14 @@ object Dependencies { } object logback { - private val logback = "1.5.8" + private val logback = "1.5.12" val core = "ch.qos.logback" % "logback-core" % logback val classic = "ch.qos.logback" % "logback-classic" % logback } object elastic4s { - private val elastic4s = "8.15.2" + private val elastic4s = "8.15.3" private val elasticsearch = "8.15.3" val clientAkka = "nl.gn0s1s" %% "elastic4s-client-akka" % elastic4s diff --git a/project/build.properties b/project/build.properties index 0b699c3..09feeee 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.10.2 +sbt.version=1.10.4 diff --git a/project/plugins.sbt b/project/plugins.sbt index 72ebc6d..6eacb82 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,7 @@ addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.3.2") addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.17.0") addSbtPlugin("com.github.sbt" % "sbt-github-actions" % "0.24.0") -addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.7.0") +addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.9.0") addSbtPlugin("org.jetbrains.scala" % "sbt-ide-settings" % "1.1.2") addSbtPlugin("io.github.cquiroz" % "sbt-tzdb" % "4.3.0")