
Discuss(streaming): allow downstream consuming upstream buffer with different progress? #14184

Closed
st1page opened this issue Dec 25, 2023 · 5 comments

@st1page
Contributor

st1page commented Dec 25, 2023

We have fixed part of issue #13678, but there are still some "alignments", as @BugenZhao said:

If there's a lot of downstream jobs and then a lot of actors, this might be a significant improvement. On the other side, the inputs are executed concurrently and permits (#6170) are returned concurrently as well.
However, it's still worth noting that there's still an "alignment" where we have to wait for all dispatchers finishing the dispatching. The asynchronization is achieved by exchange buffers.

More specifically, our dispatcher sends stream messages downstream one by one: the sender must wait for all downstream receivers to accept the current stream message before it can send the next one. Under this model, even if one of the downstream actors is idle, it still has to wait for the other downstream actors to process the current stream message.
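For illustration, here is a minimal sketch of this send pattern, assuming one bounded tokio channel per downstream actor (hypothetical code and types, not RisingWave's actual dispatcher implementation):

use futures::future::try_join_all;
use tokio::sync::mpsc;

// Hypothetical dispatcher step: the next message cannot be dispatched until
// every downstream has accepted the current one.
async fn dispatch(
    outputs: &[mpsc::Sender<String>],
    msg: String,
) -> Result<(), mpsc::error::SendError<String>> {
    // Send to all downstreams concurrently, but wait until all of them have
    // accepted (i.e. have free channel capacity). A single downstream with a
    // full channel therefore delays the next message for every other
    // downstream, even if their channels are empty.
    try_join_all(outputs.iter().map(|tx| tx.send(msg.clone()))).await?;
    Ok(())
}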
This is especially harmful to performance, even under temporary skewness. For example, the metrics graph below shows a real-world case. The green line at the top is the total throughput of the fragment, i.e. the sum over all parallelisms, and the other lines are the throughput of individual actors.
There are 8 parallelisms, yet we can see that sometimes only 2 or 3 of them are working. This happens within a single barrier, so the "alignment" here is not introduced by the barrier.
[Figure: total fragment throughput vs. per-actor throughput over time]

This case can easily happen with certain workloads. For example:

CREATE TABLE fact(v int, k int);
CREATE TABLE d(k int PRIMARY KEY);

CREATE MATERIALIZED VIEW mv AS 
SELECT sum(v)
FROM fact 
JOIN d ON fact.k = d.k
GROUP BY d.k;
                                                  QUERY PLAN                                                   
---------------------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [sum, d.k(hidden)], stream_key: [d.k], pk_columns: [d.k], pk_conflict: NoCheck }
 └─StreamProject { exprs: [sum(fact.v), d.k] }
   └─StreamHashAgg { group_key: [d.k], aggs: [sum(fact.v), count] }
     └─StreamExchange { dist: HashShard(d.k) }
       └─StreamHashJoin { type: Inner, predicate: fact.k = d.k }
         ├─StreamExchange { dist: HashShard(fact.k) }
         │ └─StreamTableScan { table: fact, columns: [v, k, _row_id] }
         └─StreamExchange { dist: HashShard(d.k) }
           └─StreamTableScan { table: d, columns: [k] }
(9 rows)

Let's suppose the agg is the bottleneck, and run the following DMLs:

insert into fact select generate_series(1, 10000), 1;
insert into fact select generate_series(1, 10000), 2;
insert into fact select generate_series(1, 10000), 3;

insert into d values (1), (2), (3);

In that case, the join will amplify the newly inserted d rows (1), (2), (3) into 30,000 records and send them downstream. The StreamExchange { dist: HashShard(d.k) } can then receive a series of events sharing the same distribution key, like

(1,1), (1,2) ... (1,10000), (2,1), (2,2) ... (2,10000), (3,1), (3,2) ... (3,10000)
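Because the exchange hashes on the distribution key, each of these runs targets a single downstream actor. A toy illustration of the routing (hypothetical hashing, not RisingWave's actual vnode mapping):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Route a record to one of `parallelism` downstream actors by hashing its
// distribution key (a simplified stand-in for the real vnode-based mapping).
fn route(key: i32, parallelism: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % parallelism
}

fn main() {
    // The join output arrives as long runs of the same key: 10,000 records
    // with k = 1, then 10,000 with k = 2, then 10,000 with k = 3. Each run
    // goes to a single downstream actor, so while that actor is busy the
    // other 7 are mostly idle, yet the dispatcher still has to wait for it.
    for k in [1, 2, 3] {
        println!("records with k = {k} go to actor {}", route(k, 8));
    }
}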

This issue can also happen when multiple MVs depend on the same upstream MV.

So maybe we should allow downstream actors to consume the upstream buffer at different progress.
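As a rough illustration of that idea, here is a minimal single-threaded sketch assuming a shared buffer with per-consumer read cursors (all names and structure are hypothetical, not a concrete proposal):

use std::collections::VecDeque;

// Hypothetical shared upstream buffer that several downstream consumers read
// at their own pace. The producer is only blocked once the slowest consumer
// falls `capacity` messages behind.
struct SharedBuffer<T> {
    buf: VecDeque<T>,     // messages still needed by at least one consumer
    base: usize,          // absolute index of the first element in `buf`
    cursors: Vec<usize>,  // per-consumer read positions (absolute indices)
    capacity: usize,      // max lag between the fastest and slowest consumer
}

impl<T: Clone> SharedBuffer<T> {
    fn new(num_consumers: usize, capacity: usize) -> Self {
        Self { buf: VecDeque::new(), base: 0, cursors: vec![0; num_consumers], capacity }
    }

    // The producer may push as long as the slowest consumer is not more than
    // `capacity` messages behind; otherwise it has to wait, which is where
    // backpressure still applies.
    fn try_push(&mut self, msg: T) -> bool {
        if self.buf.len() >= self.capacity {
            return false;
        }
        self.buf.push_back(msg);
        true
    }

    // Each consumer advances its own cursor independently, so a fast consumer
    // is not blocked by a slow sibling until the shared buffer fills up.
    fn try_pop(&mut self, consumer: usize) -> Option<T> {
        let pos = self.cursors[consumer];
        let msg = self.buf.get(pos - self.base)?.clone();
        self.cursors[consumer] += 1;
        // Drop messages that every consumer has already read.
        let min = *self.cursors.iter().min().unwrap();
        while self.base < min {
            self.buf.pop_front();
            self.base += 1;
        }
        Some(msg)
    }
}

fn main() {
    let mut buffer = SharedBuffer::new(2, 4);
    assert!(buffer.try_push("a"));
    assert!(buffer.try_push("b"));
    // Consumer 0 races ahead while consumer 1 has not read anything yet.
    assert_eq!(buffer.try_pop(0), Some("a"));
    assert_eq!(buffer.try_pop(0), Some("b"));
    assert_eq!(buffer.try_pop(0), None);
    // Consumer 1 still sees the full sequence at its own pace.
    assert_eq!(buffer.try_pop(1), Some("a"));
}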

@github-actions github-actions bot added this to the release-1.6 milestone Dec 25, 2023
@st1page st1page changed the title If there's a lot of downstream jobs and then a lot of actors, this might be a significant improvement. On the other side, the inputs are executed concurrently and permits (#6170) are returned concurrently as well. Discussiong: allow downstream consuming upstream buffer with different progress? Dec 25, 2023
@st1page st1page changed the title Discussiong: allow downstream consuming upstream buffer with different progress? Discuss(streaming): allow downstream consuming upstream buffer with different progress? Dec 25, 2023
@kwannoel
Copy link
Contributor

Will it still be blocked by the barrier?

Even if some chunk finishes ahead in one partition, all executors still need to wait for all barriers to be collected from each parallelism.

@fuyufjh
Member

fuyufjh commented Dec 25, 2023

I suppose this is talking about data inside one epoch, i.e. between two barriers.

If so, it seems that the only solution is to use a larger exchange buffer size? From my understanding, the multiple downstream actors actually are consuming at different progress, but the channel size is limited; in other words, the max lag between the fastest and slowest consumer is bounded.
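To make the bound concrete, here is a toy demonstration, again assuming one bounded tokio channel per downstream (hypothetical code, not RisingWave's exchange implementation): the fast consumer can only get on the order of the channel capacities ahead of the slow one before the dispatcher blocks.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    const CAPACITY: usize = 16;
    let (fast_tx, mut fast_rx) = mpsc::channel::<u64>(CAPACITY);
    let (slow_tx, mut slow_rx) = mpsc::channel::<u64>(CAPACITY);
    let slow_progress = Arc::new(AtomicU64::new(0));

    // Dispatcher: the next message is sent only after both downstream
    // channels have accepted the current one.
    tokio::spawn(async move {
        for i in 1..=500u64 {
            fast_tx.send(i).await.unwrap();
            slow_tx.send(i).await.unwrap();
        }
    });

    // Slow consumer: takes 2 ms per message.
    let progress = slow_progress.clone();
    tokio::spawn(async move {
        while let Some(i) = slow_rx.recv().await {
            tokio::time::sleep(Duration::from_millis(2)).await;
            progress.store(i, Ordering::Relaxed);
        }
    });

    // Fast consumer: record how far it ever gets ahead of the slow one.
    let mut max_lag = 0;
    while let Some(i) = fast_rx.recv().await {
        max_lag = max_lag.max(i.saturating_sub(slow_progress.load(Ordering::Relaxed)));
    }
    // Expect a value on the order of the channel capacities, not 500.
    println!("max observed lag: {max_lag} (channel capacity: {CAPACITY})");
}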

@BugenZhao
Member

the multiple downstream actors actually are consuming at different progress, but the channel size is limited

Exactly. That's what I meant by

The asynchronization is achieved by exchange buffers.

@st1page
Contributor Author

st1page commented Dec 28, 2023

I suppose this is talking about data inside one epoch, i.e. between two barriers.

If so, it seems that the only solution is to use a larger exchange buffer size? From my understanding, the multiple downstream actors actually are consuming at different progress, but the channel size is limited; in other words, the max lag between the fastest and slowest consumer is bounded.

Yes, but I am not sure this kind of "side backpressure" makes sense: the slow actor can backpressure the other parallelisms. Also, the CN's network latency will not be the only factor influencing the appropriate exchange buffer size.

@st1page st1page modified the milestones: release-1.6, release-1.7 Jan 9, 2024
@st1page st1page modified the milestones: release-1.7, release-1.8 Mar 6, 2024
@st1page st1page removed this from the release-1.8 milestone Apr 8, 2024
@st1page st1page added this to the release-1.10 milestone May 14, 2024
@st1page st1page self-assigned this May 14, 2024
Contributor

github-actions bot commented Aug 1, 2024

This issue has been open for 60 days with no activity.

If you think it is still relevant today, and needs to be done in the near future, you can comment to update the status, or just manually remove the no-issue-activity label.

You can also confidently close this issue as not planned to keep our backlog clean.
Don't worry if you think the issue is still valuable to continue in the future.
It's searchable and can be reopened when it's time. 😄

@st1page st1page closed this as not planned Aug 19, 2024