
Add a Flow.iterate method, allowing an Iterator-style traversal of flows. #3278

Open
wants to merge 6 commits into master

Conversation

lowasser (Contributor)

Implements #3274.

@lowasser (Contributor, Author)

(Not ready for submission; needs tests.)

@elizarov (Contributor) commented May 11, 2022

I don't see how that's different from the flow-to-channel conversion that's already supported. E.g., instead of:

flow.iterate { iter ->
   while (iter.hasNext()) {
       if (!predicate(iter.next())) return@iterate false // stops collecting the flow
   }
   true
}

You can write, to the same effect:

coroutineScope { // optional, you might already have a scope
   flow.produceIn(this).consume { // consume ensures that the channel is closed
      val iter = iterator()
      while (iter.hasNext()) {
         if (!predicate(iter.next())) return@consume // stops collecting the flow
      }
   }
}

You can create a simple iterate extension in your project to capture this pattern, if needed. You don't need all these FlowIterator definitions.
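
For reference, such a project-local helper might look roughly like this (a sketch only; iterateViaChannel is an illustrative name, not an existing API):

import kotlinx.coroutines.channels.ChannelIterator
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.produceIn

// Hypothetical project-local helper: exposes the flow to the block as a ChannelIterator
// and closes the underlying channel when the block finishes.
suspend fun <T, R> Flow<T>.iterateViaChannel(block: suspend ChannelIterator<T>.() -> R): R =
    coroutineScope {
        produceIn(this).consume {
            iterator().block()
        }
    }

// Usage, mirroring the first snippet:
// val matched = flow.iterateViaChannel {
//     while (hasNext()) {
//         if (!predicate(next())) return@iterateViaChannel false
//     }
//     true
// }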

@lowasser (Contributor, Author)

We want something linear: something that doesn't buffer ahead and doesn't collect the flow concurrently with the handling of its elements. Concurrent collection is especially bad in testing, where we want a very strict sequence of stimulus and response.

@lowasser (Contributor, Author)

In particular, this should deterministically succeed:

flow {
  emit(1)
  throw Exception()
}.iterate { iter ->
   while (iter.hasNext()) {
       if (iter.next() % 2 != 0) return@iterate true
   }
   false
}

@elizarov (Contributor) commented May 12, 2022

There is no thread in the code I've posted; it launches a new coroutine just like your code. I don't see much difference. My point is that there is already a channel-based solution that does essentially the same thing you propose.

@lowasser (Contributor, Author)

Let me demonstrate the difference with some diagrams and some Kotlin Playground demos. You may still consider the two approaches "essentially the same thing," but we've found the difference matters a lot in practice for us (Google).

Let A be the coroutine launched by produceIn, which collects the flow and sends its elements to a channel, and let B be the consumer of the flow.

Suppose the channel is a RENDEZVOUS channel, for the moment. (This is the best case scenario for the issue I'm going to discuss.) Let's suppose the flow emits X, Y, and let's step to the point in the program where A has emitted X and B has received X from the channel.

What produceIn -- or any solution based on a single Channel sending from A to B -- does is resume A now, so that it starts working on emitting Y while B is still processing X.

Here is a diagram of the control flow:

A(X)-B(X)-B(Y)
     A(Y)
      ^
      B(X) and A(Y) run concurrently/race
      that's the specific issue we care about
      it seems impossible to avoid in solutions that send from A to B via a channel

On the Kotlin Playground, we can demonstrate this behavior: when I run https://pl.kotl.in/O04Er2PT5 (which does use buffer(0)), it prints

A(x)
A(y)
B(x)

It's clear that this could happen with nonzero buffer sizes, but it still happens even with 0/RENDEZVOUS. It is unavoidable with produceIn, as far as we can tell.
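
The linked demo isn't reproduced here, but a minimal sketch along the same lines (my reconstruction, not the exact Playground code) shows the interleaving when run under runBlocking:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val upstream = flow {
        println("A(x)"); emit("x")
        println("A(y)"); emit("y")
    }
    coroutineScope {
        // buffer(0) fuses with produceIn into a RENDEZVOUS channel
        upstream.buffer(0).produceIn(this).consume {
            val iter = iterator()
            if (iter.hasNext()) {
                println("B(${iter.next()})") // stop after the first element
            }
        }
    }
}
// prints A(x), A(y), B(x): A has already moved on to y while B is still handling x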

What we want instead

Instead, we want the control flow to look like:

A(X)-B(X)-A(Y)-B(Y)

with the strict alternating ordering. (This is the same control flow we'd have, for example, if B was a simple map or filter, but not if B was buffer.)

> It launches a new coroutine just like your code

The PR does launch a separate coroutine to collect the flow, but it coordinates the two coroutines so that they never actually execute concurrently and instead alternate as described above. That's the "new" thing in the PR.

Specifically, B suspends at a call to hasNext and passes its continuation to A via an internal channel. A receives the continuation on the channel and resumes, collecting the flow until it gets the next element. A then passes the element back to B via the continuation (resuming B), and suspends until B calls hasNext again to send another continuation. So A is always suspended while B is running, and B is always suspended while A is running.
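
For concreteness, here is a rough, simplified sketch of that handoff. It is not the PR's code; forEachAlternating is a hypothetical name, the element type is assumed non-nullable, and some cancellation corner cases are glossed over:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

suspend fun <T : Any> Flow<T>.forEachAlternating(body: suspend (T) -> Unit): Unit =
    coroutineScope {
        // B -> A: "here is my continuation, resume it with the next element (or null at the end)"
        val requests = Channel<Continuation<T?>>(capacity = 1)

        // Coroutine A: only touches the upstream flow while it holds a request from B.
        launch {
            var pending: Continuation<T?>? = null
            try {
                pending = requests.receive()
                collect { value ->
                    val request = pending!!
                    pending = null
                    request.resume(value)          // hand the element to B...
                    pending = requests.receive()   // ...and suspend until B asks again
                }
                pending?.resume(null)              // upstream completed normally
            } catch (e: Throwable) {
                pending?.resumeWithException(e)    // surface an upstream failure in B
                if (e is CancellationException) throw e
            }
        }

        // Coroutine B: the caller. It is suspended whenever A is running, and vice versa.
        while (true) {
            val value = suspendCancellableCoroutine<T?> { cont -> requests.trySend(cont) }
                ?: break                           // null marks the end of the flow
            body(value)
        }
    }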

https://pl.kotl.in/GB8_0J6NN demonstrates the proposed implementation (with some bugfixes I'll merge) and that it accomplishes the desired behavior (at least for a simple example).

// both versions terminate collection at B(x)

produceIn:
A(x)
A(y)
B(x)

iterate:
A(x)
B(x)

Why this matters

This is a subtle distinction, and you may still consider these "essentially the same thing." But there are several differences we care about. None of these are theoretical; we have encountered each of them the hard way.

  • In test use cases, debugging becomes significantly harder when A and B run concurrently, or, even in a single-threaded environment, when their ordering is not defined. The desirable state when debugging is that, at a breakpoint at B(X), A is suspended at the point where it just emitted X and has not done any more work. produceIn cannot guarantee this; iterate can.
  • The case I outlined in my earlier comment does not succeed deterministically with produceIn. A(Y) is throw Exception, and B(X) is return true, and since they race with produceIn, the function succeeds or fails nondeterministically. This comes up frequently in tests: flows often represent streaming RPC results, which may well throw partway through collection. We often test that case, and want something iterator-like so we can say "this stimulus, then the flow emits this, then this stimulus..." Using produceIn makes such tests almost unavoidably flaky because of races of that form.
  • Many flow transformations we want to build and run in production need to consume flows in this style and offer determinism guarantees to our users about what is and isn't consumed, and when. This isn't just handy for tests; it matters in production code that wants to be testable and predictable.

This is not a need confined to one or two projects. Googlers have wanted this practically everywhere flows are tested, plus in a variety of production use cases. Because of the race described above, using produceIn and similar alternatives has resulted in observable problems such as flaky tests and harder debugging.

Hopefully I've

  • described the desired behavior
  • demonstrated, with the Playground demo, that produceIn doesn't provide that behavior
  • described how the PR hopes to implement the desired behavior
  • shown that, at least in a simple Playground example, the PR does implement the desired behavior
  • described why the desired behavior is not "essentially the same" to us
  • described why we need the desired behavior at more than a "project-level helper" level

@elizarov (Contributor)

Thanks a lot for the explanation! Now I think I understand exactly what you are trying to achieve.

Now, we need to better understand use-cases for such an iteration primitive. It seems that the motivating example you've provided can be readily implemented using collectWhile (see #1087). collectWhile has a much smaller API surface, a simpler implementation, and much better performance, so it would be a preferred solution for this particular use-case. Do you have any other use-cases in mind?
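
For what it's worth, collectWhile is not a public API at this point; a project-local stand-in and the earlier motivating example might look roughly like this (collectWhile is approximated here with takeWhile, and containsOdd is an illustrative name):

import kotlinx.coroutines.flow.*

// Sketch of the collectWhile shape from #1087, approximated with takeWhile:
// collect until the predicate returns false, then cancel the upstream.
suspend fun <T> Flow<T>.collectWhile(predicate: suspend (T) -> Boolean) {
    takeWhile(predicate).collect()
}

// The motivating example, rewritten: the predicate runs while the upstream is suspended
// inside emit, so emit(1) and the subsequent throw cannot race with it.
suspend fun containsOdd(numbers: Flow<Int>): Boolean {
    var found = false
    numbers.collectWhile { value ->
        if (value % 2 != 0) {
            found = true
            false // stop collecting; the upstream is cancelled at its emit call
        } else {
            true  // keep collecting
        }
    }
    return found
}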

@qwwdfsad (Member)

Hi Louis, I'm back from vacation, so expect much more timely responses now.

Before discussing the API, let me express my general concern about the pull model -- in general it's both significantly[0] slower than the push model and much more familiar to users. So, all things being equal, it's likely that a person not yet familiar with flows or the Rx paradigm would pick a pull-based API just because it's more familiar, and because it takes less cognitive load to express the desired operator/API/transformation/business entity.

[0] -- it's not an exaggeration. Here is the simple benchmark that stresses pull vs. push, and the pull-based API is 30 times slower. I realize that the prototype is not yet polished, but my educated guess is that even if we cut all non-essentials (namely, the channel), it will still be at least 5 times slower.
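
(The linked benchmark isn't reproduced here; the following is only a crude sketch of the shape of such a pull-vs-push comparison: plain collect versus pulling the same elements one by one through a channel. Absolute numbers are meaningless; the point is just the per-element coordination cost on the pull path.)

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val n = 1_000_000
    val ints = flow { repeat(n) { emit(it) } }

    val pushMillis = measureTimeMillis {
        var sum = 0L
        ints.collect { sum += it }   // push: the emitter calls straight into the collector
        println("push sum=$sum")
    }

    val pullMillis = measureTimeMillis {
        var sum = 0L
        coroutineScope {
            ints.produceIn(this).consume {   // pull: every element goes through a channel
                val iter = iterator()
                while (iter.hasNext()) sum += iter.next()
            }
        }
        println("pull sum=$sum")
    }

    println("push: ${pushMillis}ms, pull: ${pullMillis}ms")
}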

So, if we decide to introduce a pull-based general-purpose API, its benefits should significantly outweigh these cons.
It looks like the PR is mainly driven by two use-cases: deterministic testing & debugging, and "deterministic" flow operators. It would be nice to discuss them separately.

> The desirable state when debugging is that, at a breakpoint at B(X), A is suspended at the point where it just emitted X and has not done any more work.

It looks like this has to be solved not at the operator level, but at the infrastructure/testing one: e.g. (optionally) injectable dispatchers in the production source code, plus sequential testing primitives such as kotlinx-coroutines-test or Turbine. We just released a properly designed kotlinx-coroutines-test, and Flow support is lagging behind there, so this could be a good opportunity to improve it. Could you please elaborate on why the testing infrastructure is not sufficient for your use case?
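
For illustration, a sequential test of the earlier flow { emit(1); throw ... } example might look roughly like this with Turbine and the new kotlinx-coroutines-test (assertion names are those of recent Turbine releases; older ones use expectItem/expectError):

import app.cash.turbine.test
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class StreamingResponseTest {
    // Turbine records the flow's events into a channel, so the test body consumes them
    // one at a time even though collection runs in the background.
    @Test
    fun emitsOneValueThenFails() = runTest {
        flow {
            emit(1)
            throw IllegalStateException("stream failed")
        }.test {
            assertEquals(1, awaitItem())
            awaitError() // the upstream failure is delivered as an error event
        }
    }
}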
Also, I second Roman with collectWhile. If there is anything not covered by the current set of operators, there is a high chance that we can provide it with a push-based model.

> Many flow transformations we want to build and run in production need to consume flows in this style and offer determinism guarantees

Could you please describe (or show an implementation of) a few such operators? It would be nice to know exactly what kind of operators we are talking about.

@qwwdfsad (Member)

Also, if you don't mind, it would be really nice if we could continue our discussion on the corresponding issue (#3274) -- it's much easier to keep track of, it's where other people look, and it's what ends up in the changelog. It's also more convenient for the sake of historical digging :)

val result = flow.iterate {
    next().also {
        yield()
        // not obvious if this results in a deterministic test?
@qwwdfsad (Member), May 30, 2022


We have our own internal mechanism for making such tests explicitly deterministic: expect(num), expectUnreached(), and finish(lastNum). But it's probably not worth changing these tests in the prototype.
