Discuss(streaming): allow downstream consuming upstream buffer with different progress? #14184
Will it still be blocked by the barrier? Even if some chunks finish ahead in one partition, all executors still need to wait for the barriers to be collected from every parallelism.
I suppose this is talking about data inside one epoch, i.e. between two barriers. If so, it seems that the only solution is to use a larger exchange buffer size? From my understanding, the multiple downstream actors are actually consuming with different progress, but the channel size is limited; in other words, the max lag between the fastest and slowest consumer is bounded.
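To make the "max lag is bounded by the buffer size" point concrete, here is a minimal single-threaded simulation (all names hypothetical, not RisingWave's actual implementation): the dispatcher can emit the next message only while every receiver's in-flight count (emitted minus consumed) stays below the per-receiver buffer size, so the sender stalls at `slowest_progress + buffer_size`.

```rust
/// Returns how many messages the dispatcher can emit before blocking,
/// given each receiver's consumed count and the per-receiver buffer size.
/// (A toy model of a bounded exchange channel, not the real scheduler.)
fn max_emitted(consumed: &[u64], buffer_size: u64) -> u64 {
    let mut emitted = 0u64;
    // The sender may emit the next message only while every receiver's
    // in-flight count (emitted - consumed) is below the buffer size.
    while consumed.iter().all(|&c| emitted.saturating_sub(c) < buffer_size) {
        emitted += 1;
    }
    emitted
}

fn main() {
    // Receiver 0 has consumed 100 messages, receiver 1 only 7. The sender
    // stalls once it is `buffer_size` ahead of the slowest receiver, so the
    // fast receiver can never get unboundedly far ahead.
    let emitted = max_emitted(&[100, 7], 4);
    assert_eq!(emitted, 7 + 4);
    println!("sender blocks after emitting {} messages", emitted);
}
```

This is why a larger exchange buffer only stretches the allowed lag rather than removing the coupling between consumers.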
Exactly. That's what I meant.
Yes, but I am not sure whether the "side backpressure" makes sense. A slow actor can backpressure the other parallelisms, and the CN's network latency will not be the only factor influencing the exchange buffer size.
We have fixed part of the issue in #13678, but there are still some "alignments", as @BugenZhao said.
More specifically, our dispatcher sends stream messages to the downstream actors one by one. In that model, the sender must wait for all downstream receivers to accept a stream message before sending the next one. So if one of the downstream actors is idle, it still has to wait for the other downstream actors to process the current stream message.
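The cost of this lockstep model can be sketched with a small comparison (hypothetical cost model, not RisingWave code): when the sender waits for every receiver on each message, per-message latency is the maximum over receivers, whereas with independent per-receiver progress each receiver runs at its own pace.

```rust
/// Lockstep dispatch: the sender forwards each message to every downstream
/// actor and waits for all of them before moving on, so total time is the
/// sum over messages of the slowest receiver's cost for that message.
fn lockstep_time(per_msg_cost: &[&[u64]]) -> u64 {
    let num_msgs = per_msg_cost[0].len();
    (0..num_msgs)
        .map(|m| per_msg_cost.iter().map(|r| r[m]).max().unwrap())
        .sum()
}

/// Independent progress: each receiver consumes at its own pace, so total
/// time is just the busiest receiver's total work.
fn independent_time(per_msg_cost: &[&[u64]]) -> u64 {
    per_msg_cost.iter().map(|r| r.iter().sum::<u64>()).max().unwrap()
}

fn main() {
    // Temporary skew: receiver 0 is slow on message 0, receiver 1 on message 1.
    // per_msg_cost[r][m] = time receiver r spends on message m.
    let costs: [&[u64]; 2] = [&[9, 1], &[1, 9]];
    assert_eq!(lockstep_time(&costs), 18); // pays for the straggler twice
    assert_eq!(independent_time(&costs), 10);
}
```

Under lockstep, even transient skew on different messages compounds, which matches the throughput dips described below.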
This is especially harmful to performance, even under temporary skew. The metrics graph shows one of the real-world cases. The green line at the top is the total throughput of the fragment, i.e. the sum of the throughput across parallelisms; the other lines are the throughput of individual actors.
There are 8 parallelisms, and we can see that at times only 2 or 3 of them are working. This happens within a single barrier interval, so the "alignment" is not introduced by the barrier.
This case can easily happen in some workloads. Suppose the agg is the bottleneck and we run the DMLs. In that case, the join will amplify the records (1, 2, 3) into 30000 records and send them downstream. Then the
StreamExchange { dist: HashShard(d.k) }
can receive a series of events with the same distribution key. This issue could also happen when multiple MVs depend on the same MV.
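A toy router (hypothetical; the real vnode-based hashing in RisingWave differs) shows why the amplified records cause skew: every record sharing the same distribution key `d.k` is routed to the same downstream parallelism, so one actor absorbs all 30000 records while the others sit idle.

```rust
use std::collections::HashMap;

/// Stand-in for hash-shard routing: records with the same distribution key
/// always land on the same downstream parallelism.
fn route(key: u64, parallelism: u64) -> u64 {
    key % parallelism // toy hash, not RisingWave's actual vnode mapping
}

fn main() {
    let parallelism = 8;
    let mut per_actor: HashMap<u64, u64> = HashMap::new();
    // The join amplified a single key's rows into 30000 records.
    for _ in 0..30_000 {
        *per_actor.entry(route(3, parallelism)).or_insert(0) += 1;
    }
    // All 30000 records hit one actor; the other 7 parallelisms stay idle,
    // yet under lockstep dispatch they still gate the sender's progress.
    assert_eq!(per_actor.len(), 1);
    assert_eq!(per_actor[&route(3, parallelism)], 30_000);
}
```

Combined with the lockstep dispatch above, this single hot shard stalls the whole fragment.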
So maybe we should allow the downstream actors to consume the upstream buffer with different progress.