Feature Request: Add Flow#concatAllDeferred operator. #1652

Open
He-Pin opened this issue Dec 31, 2024 · 5 comments
Labels
t:stream Pekko Streams

Comments

@He-Pin
Member

He-Pin commented Dec 31, 2024

Motivation:
The original issues are #1623 and #1566.
They helped uncover some problems, but given how the current interpreter and concatAllLazy are implemented, we cannot fix the underlying problem there.

refs: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#concat-java.lang.Iterable-

So a new operator is needed.

Modification:
I would like to add a new operator concatAllDeferred to support this usage.

  def concatAllDeferred[U >: Out](those: Graph[SourceShape[U], _]*): Repr[U] =
    concatLazy(Source.lazySource(
      () => Source(those).flatMapConcat(ConstantFun.scalaIdentityFunction)))

Result:

package org.apache.pekko.stream.scaladsl

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.util.ByteString

import scala.concurrent.Await

object PekkoQuickstart extends App {
  private implicit val system: ActorSystem = ActorSystem()

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) => Source(prefix).concatLazy(tail) }

  val r = Source.empty
    .concatAllDeferred(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore)

  Await.result(r, scala.concurrent.duration.Duration.Inf)
  println(r.value)

  //  Source
  //    .repeat(s)
  //    .take(30000)
  //    .flatMapConcat(x => x)
  //    .runWith(Sink.ignore)
  //    .onComplete(println(_))

  //  Source.empty
  //    .concatAllLazy(List.tabulate(30000)(_ => Source.lazySource(() => s)): _*)
  //    .runWith(Sink.ignore).onComplete(println(_))
}

This runs without problems.

@He-Pin He-Pin added the t:stream Pekko Streams label Dec 31, 2024
@He-Pin He-Pin added this to the 1.2.0 milestone Dec 31, 2024
@queimadus

Is adding a new operator for this worth it, given that it's almost the same as calling flatMapConcat directly?

For more context, this is only really a problem when using the GraphDSL and having to connect several flows of this kind into a many-port Concat stage.

@He-Pin
Member Author

He-Pin commented Dec 31, 2024

For ease of use, I think that's OK, but at the very least we need to update the docs about this. The two currently pending PRs are still valid.
It also helped me find some problems in the interpreter.

@He-Pin He-Pin removed this from the 1.2.0 milestone Jan 2, 2025
@He-Pin
Member Author

He-Pin commented Jan 2, 2025

@raboof wdyt about this? I think this is the only way to support @queimadus's use case; otherwise we can add some documentation about it.

This method should be handy.

@raboof
Member

raboof commented Jan 2, 2025

Hmm, this seems like quite a corner case.

If you're using Streams like this, you'll have to be quite aware of those intricacies already. At that point perhaps it isn't too much to ask the user to combine concatLazy, lazySource and flatMapConcat (would flatten work here?). Adding concatAllDeferred might make things shorter, but I'm not sure it'd make things easier to understand. What would the scaladoc say? Can we clearly describe the kinds of situations in which you'd use this function?
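
For reference, the manual combination mentioned above might look roughly like the sketch below. It assumes a Pekko Streams version that provides concatLazy and Source.lazySource. The helper name `concatDeferred` and the object `ConcatDeferredSketch` are hypothetical and not part of Pekko; a plain `src => src` lambda stands in for `ConstantFun.scalaIdentityFunction` from the proposal.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

object ConcatDeferredSketch {

  // Hypothetical helper (not a Pekko API): emits `first`, then each of `those` in order.
  // The nested sources are only handed to flatMapConcat once `first` has completed
  // and the lazy source is pulled.
  def concatDeferred[T](first: Source[T, NotUsed], those: Source[T, NotUsed]*): Source[T, NotUsed] =
    first.concatLazy(
      Source.lazySource(() => Source(those.toList).flatMapConcat(src => src)))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("concat-deferred-sketch")
    try {
      val combined = concatDeferred(Source(1 to 2), Source(3 to 4), Source(5 to 6))
      // Collect all elements to check that the ordering is preserved.
      val result = Await.result(combined.runWith(Sink.seq), 10.seconds)
      println(result)
    } finally {
      system.terminate()
    }
  }
}
```

This keeps the composition in user code, at the cost of each caller having to know why the extra lazySource and flatMapConcat layers are there.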

@He-Pin
Member Author

He-Pin commented Jan 2, 2025

E.g.: concatAllLazy materializes all graphs before they are pulled and keeps them in memory until the resulting graph completes. If you have a large number of graphs, that may not be what you want; in that case, try concatAllDeferred.
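
One way to observe the difference described here is to count how many nested sources get materialized when downstream only needs the first element. The sketch below is an experiment under stated assumptions, not a definitive benchmark: `MaterializationCountSketch` and `countedSources` are hypothetical names, Source.fromMaterializer is used only because its factory runs when the source is materialized, and the deferred variant is written with public operators because concatAllDeferred does not exist yet.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

object MaterializationCountSketch {

  // Hypothetical helper: builds `n` single-element sources that bump `counter`
  // each time one of them is materialized.
  def countedSources(n: Int, counter: AtomicInteger): List[Source[Int, Future[NotUsed]]] =
    List.tabulate(n) { i =>
      Source.fromMaterializer { (_, _) =>
        counter.incrementAndGet()
        Source.single(i)
      }
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("materialization-count-sketch")
    try {
      val viaConcatAllLazy = new AtomicInteger(0)
      val viaFlatMapConcat = new AtomicInteger(0)

      // Variant 1: every source is passed to concatAllLazy up front.
      val first = Source.empty[Int]
        .concatAllLazy(countedSources(100, viaConcatAllLazy): _*)
        .take(1)
        .runWith(Sink.seq)

      // Variant 2: the sources are emitted as elements and flattened one at a time.
      val second = Source.empty[Int]
        .concatLazy(Source.lazySource(() =>
          Source(countedSources(100, viaFlatMapConcat)).flatMapConcat(src => src)))
        .take(1)
        .runWith(Sink.seq)

      println(Await.result(first, 10.seconds))
      println(Await.result(second, 10.seconds))
      // Compare how many nested sources each variant materialized.
      println(s"materialized via concatAllLazy: ${viaConcatAllLazy.get}")
      println(s"materialized via flatMapConcat: ${viaFlatMapConcat.get}")
    } finally {
      system.terminate()
    }
  }
}
```

If the description above holds, the first counter should end up at or near the total number of sources, while the second should stay small.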

At least it took me many hours to find the true problem.
