Meta node can only handle ~600 Kafka sources. #18949
Comments
According to confluentinc/librdkafka#1600, it seems each Kafka consumer will create roughly one thread (and connection) per broker. In Meta, currently each source has its own consumer. However, I'm concerned that even after fixing Meta, compute nodes still cannot handle it, since they will have even more Kafka consumers (multiplied by parallelism).

I feel 50 brokers might be too large. Is it possible for you to divide it into multiple smaller clusters with fewer brokers? 🤔
Thank you for the detailed response! A few follow-up questions below.
> Is it possible for you to divide it into multiple smaller clusters with fewer brokers?

No, I'm afraid that is not possible.
I think we haven't optimized it just because your scale is quite uncommon.
This seems to be the Kafka library's limitation. Even just for fetching metadata, we need to create consumers, and the consumers will immediately connect to all brokers (instead of only the bootstrap server).
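For a concrete picture, here is a minimal rust-rdkafka sketch of such a metadata-only probe; the broker address and function name are placeholders, not RisingWave's actual code:

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};

// Even a read-only metadata probe requires a full consumer instance.
fn probe_topic(brokers: &str, topic: &str) -> rdkafka::error::KafkaResult<usize> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()?;
    // While serving this call, librdkafka discovers the whole cluster and
    // spawns roughly one internal OS thread per discovered broker.
    let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(5))?;
    Ok(metadata
        .topics()
        .first()
        .map(|t| t.partitions().len())
        .unwrap_or(0))
}
```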
As I mentioned above, implementing this in Meta should be quite feasible. However, sharing consumers in the compute node could be much more difficult. I found the Kafka library does indeed have an API for such usage: split_partition_queue (see the sketch below). But currently we share nothing between different sources, nor between different parallelisms (actors) of a source. Sharing consumers may require quite a large change to the implementation. We are also not sure about the performance implications of the change, so it requires investigation and testing if we want to go with this approach.
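For illustration, a rough sketch of what sharing one consumer across per-partition readers could look like with split_partition_queue; the topic name, group id, and partition count are made up, and this does not reflect how RisingWave actors are actually wired:

```rust
use std::sync::Arc;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let consumer: Arc<StreamConsumer> = Arc::new(
        ClientConfig::new()
            .set("bootstrap.servers", "localhost:9092") // placeholder
            .set("group.id", "shared-consumer-demo") // placeholder
            .create()?,
    );
    consumer.subscribe(&["some_topic"])?;

    // One shared consumer; one queue (and one task) per partition.
    for partition in 0..4 {
        let queue = consumer
            .split_partition_queue("some_topic", partition)
            .expect("invalid partition");
        tokio::spawn(async move {
            while let Ok(msg) = queue.recv().await {
                // Route `msg` to whatever handles this partition.
                let _ = msg;
            }
        });
    }

    // The main stream must still be driven for librdkafka to make progress.
    loop {
        let _ = consumer.recv().await;
    }
}
```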
Hi @xxchan, do you mean that if I had a Kafka cluster with 3 brokers and a source with 48 parallelisms, there'd be 144 (48 * 3) threads and connections within the CN node? That sounds crazy 😅.
Yes. More precisely, the parallelism here is the number of partitions of the Kafka topic, not necessarily the number of RisingWave actors. Edit: should be
There's an alternative solution besides the refactoring mentioned above: use a pure-async Rust Kafka library, e.g., rskafka. Then there would be no extra threads at all; all the tasks would be managed by tokio. Possible drawbacks: mainly the library's maturity and feature coverage (see the discussion further below).
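For reference, a minimal sketch following rskafka's README-style usage (broker address and topic invented; the exact API may differ between versions):

```rust
use rskafka::client::{partition::UnknownTopicHandling, ClientBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // All I/O runs on tokio tasks; no per-broker OS threads are spawned.
    let client = ClientBuilder::new(vec!["localhost:9093".to_owned()]) // placeholder
        .build()
        .await?;
    let partition_client = client
        .partition_client("my_topic", 0, UnknownTopicHandling::Retry) // placeholder
        .await?;
    // Fetch up to ~1 MB of records from offset 0, waiting at most 1s.
    let (records, high_watermark) = partition_client
        .fetch_records(0, 1..1_000_000, 1_000)
        .await?;
    println!("{} records, high watermark {high_watermark}", records.len());
    Ok(())
}
```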
Is it feasible to let the user switch between the async Rust library and librdkafka for a Kafka source, depending on the use case?
Yes. My rough idea is to introduce a per-source option that selects the underlying client implementation. This is just my idea anyway; we will need to see others' opinions to decide which way to go.
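Purely as an illustration of the shape such a switch could take (none of these names exist in RisingWave, and the property key is invented):

```rust
use std::collections::HashMap;

// Hypothetical per-source knob for choosing the client implementation.
enum KafkaClientBackend {
    Librdkafka, // current behavior: ~one OS thread per broker inside librdkafka
    RsKafka,    // pure async: all work scheduled on tokio tasks
}

fn backend_from_props(with_options: &HashMap<String, String>) -> KafkaClientBackend {
    // "properties.client.backend" is an invented key for this sketch.
    match with_options.get("properties.client.backend").map(String::as_str) {
        Some("rskafka") => KafkaClientBackend::RsKafka,
        _ => KafkaClientBackend::Librdkafka,
    }
}
```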
If that is the case, why not close the consumer after fetching the metadata?
I agree that benchmarking is necessary, but intuitively, I would expect sharing consumers to be beneficial for much fewer than 600 sources. For example, with just 4 sources and 50 brokers, you're already dealing with over 200 OS threads, leading to significant context switching and other overhead. I suspect that this negatively impacts performance at scales far smaller (fewer brokers and sources) than what we are currently dealing with.
This seems like a promising option. Have you also considered https://github.com/fede1024/rust-rdkafka? It seems to offer helpful abstractions for making things asynchronous and reducing thread overhead.
I don’t believe librdkafka is intended to be used with one consumer per topic. I think the overhead from the OS threads would degrade performance rather quickly.
We need to periodically fetch metadata for sources on Meta to detect whether a split change (scaling) has happened on the source, so the consumers are kept alive. Closing and re-establishing the connection doesn't sound great, but it may be worth considering given the current problem.
I do not doubt that things can be a lot different at your scale. We will mainly need to ensure there's no regression for the majority of users running at a much smaller scale.
This is exactly the library we are using. It doesn't solve the problem.
confluentinc/librdkafka#825 (comment): I think it is hard for us to hack librdkafka to support "sparse" threads. Given that the librdkafka community mentioned that "The non-connected threads will be mostly idle though.", I wonder whether it is feasible to tune the maximum thread count to mitigate the issue when you have a use case that involves a big Kafka cluster with many Kafka sources: https://stackoverflow.com/questions/344203/maximum-number-of-threads-per-process-in-linux
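For anyone wanting to try that mitigation, these are the usual Linux knobs from the linked thread. The values are placeholders and need sizing for your hosts and container limits; notably, the 32,769 threads observed below is suspiciously close to the common kernel.pid_max default of 32768.

```sh
# Placeholder values; size these for your workload.
sysctl -w kernel.threads-max=200000   # system-wide thread cap
sysctl -w kernel.pid_max=200000       # every thread consumes a pid
sysctl -w vm.max_map_count=400000     # each thread mmaps a stack + guard region
ulimit -u 150000                      # per-user process/thread limit
```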
I don't really want to bet on https://github.com/influxdata/rskafka. It's easy to start a PoC version supporting the Kafka protocol, but things become much more difficult when putting it into production, which may require 100% alignment with the official protocol. The subtitle of rskafka says as much: it is "a minimal Rust client for Apache Kafka".

Technically, the Kafka client can subscribe to multiple topics and partitions, which means we could possibly reuse one Kafka client for all Kafka source actors. This sounds difficult under our current design, because each source is supposed to have its own client properties. However, it seems much less difficult for the Meta service to reuse a Kafka client. Worth an attempt, I think.
+1. I think rskafka is a minimal client that meets InfluxDB's requirements, but it isn't built for general usage: it lacks a variety of SSL support and a fetch-queue implementation. In the long term we would need to do a lot of work to make it stable for RisingWave users, and I doubt the ROI on this.
It seems each thread has a fixed overhead (i.e., stack, TLS, ...) even if most of them are idle. Is it possible to reduce the number of topics by routing messages through a shared Kafka source? For example, send all kinds of messages to one topic and consume that topic with a shared source, then create specific mviews or sinks on top of this shared source by filtering on the message type (see the sketch below). Then only num_brokers * min(num_partitions, num_actors) threads need to be created. CDC sources don't seem to fit this approach, though, due to their one-topic-one-table fashion.
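A hypothetical sketch of that routing idea in RisingWave SQL (topic, columns, and connection properties are all invented):

```sql
-- One shared source over a single combined topic (names invented).
CREATE SOURCE all_events (
    event_type VARCHAR,
    payload    JSONB
) WITH (
    connector = 'kafka',
    topic = 'all-events',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;

-- Per-kind views filter on the message type instead of using separate topics.
CREATE MATERIALIZED VIEW orders AS
    SELECT payload FROM all_events WHERE event_type = 'order';

CREATE MATERIALIZED VIEW payments AS
    SELECT payload FROM all_events WHERE event_type = 'payment';
```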
Yes, we have exactly such a design, and you can see the progress here: #16003 (ready for public preview soon), thanks to @xxchan.
Describe the bug
I attempted to create 700 Kafka Plain Avro sources. However, after successfully creating approximately 600 sources, the meta-node becomes unresponsive, and I encounter the following error:
Additionally, when I shell into the meta-node and try to execute any commands, I get:
To investigate, I ran `ps -eLf | wc -l` to monitor the number of OS threads while creating the sources. The thread count reached 32,769 after I had created 629 sources, at which point I could no longer execute commands, likely because the OS had exhausted its resources for creating threads.

This behavior seems like a bug. I would expect the number of threads to be significantly lower than the number of sources. Even one OS thread per source, while inefficient, would still fit within our use case requirements, but we are seeing more than 50 OS threads per source. It's also worth noting that none of the sources were being actively used at any time.
Error message/log
To Reproduce
Expected behavior
I expect the sources to be created without the meta node becoming unresponsive or consuming excessive resources.
How did you deploy RisingWave?
More or less like this, but with higher resource limits:
https://github.com/risingwavelabs/risingwave-operator/blob/main/docs/manifests/risingwave/risingwave-postgresql-s3.yaml
The version of RisingWave
PostgreSQL 13.14.0-RisingWave-2.0.1 (0d15632)
Additional context
No response