-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a Flow.iterate method, allowing an Iterator-style traversal of flows. #3278
base: master
Are you sure you want to change the base?
Conversation
(Not ready for submission; needs tests.) |
I don't see how that's different from the flow-to-channel conversion, that's already supported. E.g. instead of:
You can write, to the same effect:
You can create a simple |
We want something linear: something that doesn't buffer ahead, something that doesn't collect the flow concurrently with the handling of its elements. This is especially bad in the case of testing, where we want a very strict sequence of stimulus and response. |
In particular, this should deterministically succeed:
|
There is no thread in the code I've posted. It launches a new coroutine just like your code. I don't see much difference. What I say is that there is already a solution via channels that does essentially the same thing that you propose. |
Let me demonstrate the difference with some diagrams and some Kotlin Playground demos. It might be a difference you consider "essentially the same thing," but we've found it matters a lot in practice for us (Google). Let A be the coroutine launched by Suppose the channel is a What Here is a diagram of the control flow:
On the Kotlin playground, we can demonstrate this behavior: when I run https://pl.kotl.in/O04Er2PT5 (which does use
It's clear that this could happen with nonzero buffer sizes, but it still happens even with 0/RENDEZVOUS. It is unavoidable with What we want insteadInstead, we want the control flow to look like:
with the strict alternating ordering. (This is the same control flow we'd have, for example, if B was a simple
The PR does launch a separate coroutine to collect the flow, but coordinates between them to guarantee that they don't actually execute concurrently, and alternate as described above. That's the "new" thing in the PR. Specifically, B suspends at a call to https://pl.kotl.in/GB8_0J6NN demonstrates the proposed implementation (with some bugfixes I'll merge) and that it accomplishes the desired behavior (at least for a simple example).
Why this mattersThis is a subtle distinction, and you may still consider these "essentially the same thing." But there are several differences we care about. None of these are theoretical, we have encountered these issues the hard way.
This is not a one-or-two project need for us. Googlers have wanted this practically everywhere flows are tested, plus a variety of production use cases. Using Hopefully I've
|
Thanks a lot for an explanation! Now I think I understand what exactly you are trying to achieve. Now, we need to better understand use-cases for such an iteration primitive. It seems that the motivating example you've provided can be readily implemented using |
Hi Louis, I'm back from vacation, so expect much more timely responses now. Before discussing the API, let me express my general concerns about the push model -- it's both significantly[0] slower than the pull model in general and much more familiar to users. So, all things being equal, it's likely that a person not-yet-familiar with flows or Rx paradigm would pick push-based API just because it's more familiar, and because it takes less cognitive load to express the desired operator/API/transformation/business entity. [0] -- it's not an exaggeration. Here is the simple benchmark that stresses pull vs push and push-based API is 30 times slower. I realize that the prototype is not yet polished, but my educated guess is that even if we cut all non-essentials (namely, channel), it still will be at least 5 times slower. So, if we decide to introduce a push-based general-purpose API, its benefits should significantly outweigh these cons.
It looks like it has to be solved not on the operator level, but at the infrastructure/testing one. E.g. (optional) injectable dispatchers to the production source code and sequential testing primitives:
Could you please describe (or show an implementation) of a few such operators? It would be nice to know what kind of operators exactly we are talking about |
Also, if you don't mind, it would be really nice if we could continue our discussion on the corresponding issue (#3274) -- it's much easier to keep track of, it's where other people look at and it's what ends up in the changelog. Also, more convenient for the sake of historical digging :) |
val result = flow.iterate { | ||
next().also { | ||
yield() | ||
// not obvious if this results in a deterministic test? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have our own internal mechanism for making such tests explicitly deterministic: expect(num)
, expectUnreached()
and finish(lastNum)
.
But probably it's not worth changing these tests in the prototype
Implements #3274.