feat: merge small chunks for sink executor #17824

Closed
chenzl25 opened this issue Jul 26, 2024 · 7 comments · Fixed by #17825 or #17968

Comments

@chenzl25
Contributor

Is your feature request related to a problem? Please describe.

It has been found that the chunks arriving at a Kinesis sink executor are too small, with an average cardinality of 1.5 rows. Since the Kinesis sink sends one chunk at a time to the external Kinesis system, high network latency between RisingWave and Kinesis results in low sink throughput.

I think we can support a chunk merge executor for sinks to avoid small chunks.
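As a rough illustration (not RisingWave's actual executor API), such a merge step could greedily pack small chunks into larger ones up to some row limit before handing them to the sink. `Chunk` below is a simplified stand-in for a stream chunk; real chunks carry columnar data and ops, and a real executor would also respect barriers:

```rust
/// Simplified stand-in for a stream chunk: just a row payload here.
struct Chunk {
    rows: Vec<u64>,
}

/// Greedily pack small chunks into larger ones of at most `max_rows` rows.
fn merge_small_chunks(inputs: Vec<Chunk>, max_rows: usize) -> Vec<Chunk> {
    let mut output = Vec::new();
    let mut current: Vec<u64> = Vec::new();
    for chunk in inputs {
        for row in chunk.rows {
            current.push(row);
            if current.len() >= max_rows {
                // Emit a full chunk and start accumulating a new one.
                output.push(Chunk { rows: std::mem::take(&mut current) });
            }
        }
    }
    if !current.is_empty() {
        output.push(Chunk { rows: current });
    }
    output
}
```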

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

@fuyufjh
Member

fuyufjh commented Jul 29, 2024

IIUC, this problem exists for all internal operators: any chunk that passes an Exchange will be split into smaller chunks. Say the parallelism is 128; then every chunk will be divided across 128 downstream actors, ending up as chunks of roughly 2 rows.

I am thinking that, since the problem is caused by the HashDispatcher, shall we solve it in the Merge executor? For example, by waiting for all Ready chunks and merging them into a single bigger chunk.
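A minimal sketch of that idea, assuming a plain tokio channel and a simplified `Chunk` type rather than the actual Merge executor internals: block only on the first chunk, then drain whatever is already buffered and emit one merged chunk, so no extra waiting is introduced.

```rust
use tokio::sync::mpsc;

/// Simplified stand-in for a stream chunk: only a row count here.
struct Chunk {
    rows: usize,
}

/// Wait for one chunk, then greedily absorb chunks that are already queued
/// (up to `max_rows`). `try_recv` never blocks, so latency is unchanged.
async fn next_merged_chunk(rx: &mut mpsc::Receiver<Chunk>, max_rows: usize) -> Option<Chunk> {
    let mut merged = rx.recv().await?;
    while merged.rows < max_rows {
        match rx.try_recv() {
            Ok(next) => merged.rows += next.rows, // in reality: concatenate the chunk data
            Err(_) => break,                      // nothing more is ready right now
        }
    }
    Some(merged)
}
```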

@chenzl25
Contributor Author

#15713

@st1page
Contributor

st1page commented Jul 29, 2024

IIUC, this problem exists for all internal operators: any chunk that passes an Exchange will be split into smaller chunks. Say the parallelism is 128; then every chunk will be divided across 128 downstream actors, ending up as chunks of roughly 2 rows.

I am thinking that, since the problem is caused by the HashDispatcher, shall we solve it in the Merge executor? For example, by waiting for all Ready chunks and merging them into a single bigger chunk.

It will hurt latency, so we need to make it configurable.

@chenzl25
Contributor Author

I tested the PR and it works great. There is no back pressure in the sink anymore. Let's discuss whether to apply the merging more broadly (e.g., for all types of sinks, or even in the Merge executor).

@fuyufjh
Member

fuyufjh commented Jul 30, 2024

I feel that further discussion is less helpful here. I'd like to write code to implement the idea and run some benchmarks.

For example, by waiting for all Ready chunks and merging them into a single bigger chunk.

It will hurt latency, so we need to make it configurable.

I think my proposal won't hurt latency, because it doesn't introduce any additional waiting.

@chenzl25
Contributor Author

Let me first add a histogram metric to monitor the input chunk rows per actor, so we can see in which workloads small chunks appear. I don't think Nexmark is a good benchmark for this issue.
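For reference, a hedged sketch of such a histogram using the `prometheus` crate directly; the metric name, label, and buckets below are illustrative, not the ones RisingWave actually registers.

```rust
use prometheus::{exponential_buckets, HistogramOpts, HistogramVec, Registry};

fn register_actor_chunk_rows(registry: &Registry) -> prometheus::Result<HistogramVec> {
    let opts = HistogramOpts::new(
        "stream_actor_input_chunk_rows", // hypothetical metric name
        "Cardinality (row count) of chunks received by an actor",
    )
    // Buckets from 1 row up to 1024 rows, doubling each step.
    .buckets(exponential_buckets(1.0, 2.0, 11)?);
    let histogram = HistogramVec::new(opts, &["actor_id"])?;
    registry.register(Box::new(histogram.clone()))?;
    Ok(histogram)
}

// In an actor's input loop (illustrative; `chunk_rows` and `actor_id` are placeholders):
// histogram.with_label_values(&[&actor_id.to_string()]).observe(chunk_rows as f64);
```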


This issue has been open for 60 days with no activity.

If you think it is still relevant today, and needs to be done in the near future, you can comment to update the status, or just manually remove the no-issue-activity label.

You can also confidently close this issue as not planned to keep our backlog clean.
Don't worry if you think the issue is still valuable to continue in the future.
It's searchable and can be reopened when it's time. 😄
