feat: share kafka client on meta #19058
Conversation
Signed-off-by: tabVersion <[email protected]>
// drop the guard and acquire a new one to avoid a 10s blocking call
drop(shared_client_guard);
Will this happen?
- Caller A tries to get a client for a connection, but the cache misses, so it takes some time for caller A to build the client.
- Meanwhile, caller B tries to get a client for the same connection; the cache also misses, so it builds a client too.
- Caller A finishes building, inserts the client into the map, and sets ref_count = 1.
- One second later, caller B also inserts its client into the map and sets ref_count = 1, causing caller A's client to leak: it will never be dropped.
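The interleaving described above can be replayed deterministically with a plain `Mutex<HashMap>`; `SharedItem` and `replay_race` below are illustrative stand-ins, not the PR's actual types:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical stand-in for the PR's SharedKafkaItem.
#[derive(Clone)]
struct SharedItem {
    builder_id: u32,
    ref_count: u32,
}

// Replays the interleaving: both callers observe a miss before either
// inserts (the lock is not held while "building"), so B's insert
// silently replaces A's entry.
fn replay_race() -> (u32, u32) {
    let map: Mutex<HashMap<String, SharedItem>> = Mutex::new(HashMap::new());
    // Steps 1 & 2: A and B both miss.
    let miss_a = map.lock().unwrap().get("broker").is_none();
    let miss_b = map.lock().unwrap().get("broker").is_none();
    assert!(miss_a && miss_b);
    // Step 3: A inserts its freshly built client with ref_count = 1.
    map.lock().unwrap().insert(
        "broker".into(),
        SharedItem { builder_id: 1, ref_count: 1 },
    );
    // Step 4: B overwrites the entry; A's client is now untracked (leaked).
    map.lock().unwrap().insert(
        "broker".into(),
        SharedItem { builder_id: 2, ref_count: 1 },
    );
    let item = map.lock().unwrap().get("broker").unwrap().clone();
    (item.builder_id, item.ref_count)
}

fn main() {
    // The surviving entry is B's, with ref_count = 1 -- A's client and
    // A's outstanding reference are no longer accounted for.
    assert_eq!(replay_race(), (2, 1));
}
```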
I would recommend using moka to replace the HashMap:
use moka::future::Cache;
In particular, a caching structure should handle these concurrent gets correctly by blocking caller B until caller A completes its operation and inserts the cached item (i.e. the Kafka client) back.
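moka's `get_with` behaves like a per-key once-cell: the first caller runs the init, concurrent callers block, and everyone receives the same value. Here is a std-only sketch of those semantics (illustrative names throughout; the actual suggestion is `moka::future::Cache`, which additionally provides eviction and async init):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

// Placeholder for the real rdkafka client.
struct KafkaClient(&'static str);

// Minimal std-only model of `get_with`: one OnceLock per key. The first
// caller's init runs; any concurrent caller blocks on the same cell and
// receives the value the first caller produced.
struct ClientCache {
    cells: Mutex<HashMap<String, Arc<OnceLock<Arc<KafkaClient>>>>>,
}

impl ClientCache {
    fn new() -> Self {
        Self { cells: Mutex::new(HashMap::new()) }
    }

    fn get_with(&self, key: &str, init: impl FnOnce() -> Arc<KafkaClient>) -> Arc<KafkaClient> {
        // Grab (or create) the per-key cell under a short lock.
        let cell = {
            let mut cells = self.cells.lock().unwrap();
            cells
                .entry(key.to_string())
                .or_insert_with(|| Arc::new(OnceLock::new()))
                .clone()
        };
        // `get_or_init` runs `init` at most once per cell; later callers
        // (even concurrent ones) get the already-built client.
        cell.get_or_init(init).clone()
    }
}

fn dedup_demo() -> bool {
    let cache = ClientCache::new();
    let a = cache.get_with("broker-1", || Arc::new(KafkaClient("built by A")));
    // B's init closure never runs; it gets A's client back.
    let b = cache.get_with("broker-1", || Arc::new(KafkaClient("built by B")));
    Arc::ptr_eq(&a, &b)
}

fn main() {
    assert!(dedup_demo());
}
```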
Could you please also run some tests and describe the improvements in the PR description?
Ok(item_val) => {
    let share_item = SharedKafkaItem {
        client: item_val.client.clone(),
        ref_count: item_val.ref_count - 1,
I don't get why we need `ref_count`. Can we just use something like `get_with`?
Then how do we manage the RdKafka client instance when all related sources are dropped? IIUC, if we remove the `ref_count`, the client instance will be kept in memory until the meta node restarts, since no eviction policy is enabled for the cache.
As mentioned by Bugen, I think we can store `Weak` in the cache.
Not sure if `Weak` fits well with `moka`. We may also try `dashmap` or `weak-table`.
@BugenZhao Why do you think `Weak` might not work with `moka`? Actually, I'm also wondering what the difference is between `moka` (a cache) and `dashmap` (a concurrent hashmap).
> Actually I'm also thinking what's the difference between moka (cache) and dashmap (concurrent hashmap)

I think moka is a dashmap with an eviction policy, plus a guarantee that updates are done atomically.
I think `dashmap` behaves similarly to `moka::sync::Cache` with unlimited capacity. 🤔

> Why do you think `Weak` might not work with `moka`?

Because the interface does not seem all that compatible with storing a `Weak`: there is no auto-eviction, and it requires the caller to keep the strong reference:
let mut client_arc: Option<Arc<KafkaClientType>> = None;
> requires the caller to keep the strong reference

Isn't this the expected usage? i.e., store a `Weak` in the map while the caller stores the `Arc`. Otherwise, who keeps the `Arc`? 👀
I mean when inserting. 🤣 For example, `weak-table` lets you pass a closure returning an `Arc` to `insert_with`; it actually stores a `Weak` but hands the `Arc` back to the caller. With `moka`, we need to temporarily hold the strong reference ourselves to prevent the value from being deallocated.
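The Weak-in-the-map scheme under discussion can be sketched with just the standard library (all names here are illustrative, not the PR's code): the map stores only `Weak<KafkaClient>`, callers hold the `Arc`, and the client deallocates automatically when the last source drops it, with no manual `ref_count`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

struct KafkaClient;

// Hypothetical meta-side lookup: upgrade an existing Weak if the client
// is still alive, otherwise (re)build and store a fresh Weak.
fn get_or_build(map: &Mutex<HashMap<String, Weak<KafkaClient>>>, key: &str) -> Arc<KafkaClient> {
    let mut guard = map.lock().unwrap();
    if let Some(client) = guard.get(key).and_then(|w| w.upgrade()) {
        return client; // still alive: some source holds an Arc
    }
    // Missing or dead: build a new client, store only a Weak.
    let client = Arc::new(KafkaClient);
    guard.insert(key.to_string(), Arc::downgrade(&client));
    client
}

fn weak_demo() -> (bool, bool) {
    let map = Mutex::new(HashMap::new());
    let a = get_or_build(&map, "broker");
    let alive = map.lock().unwrap()["broker"].upgrade().is_some();
    // Last source dropped -> the client is freed; the map entry can no
    // longer be upgraded and is rebuilt on the next lookup.
    drop(a);
    let dead = map.lock().unwrap()["broker"].upgrade().is_none();
    (alive, dead)
}

fn main() {
    assert_eq!(weak_demo(), (true, true));
}
```

The dead `Weak` entries themselves still occupy map slots until overwritten, which is the gap `weak-table` closes with its periodic cleanup.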
Tested locally with a Kafka env.
command:
on main (a176ace): 1573 threads
Would you mind testing Kafka with multiple brokers, where we might see a larger difference?
Tested with Confluent across multiple AZs.
testing script
EC2 idle: 71
Why not test with more sources? Creating 5 sources at once seems to be the SDK's maximum; I got this error afterward:
A little weird, but irrelevant to this issue.
Rest LGTM
@@ -71,7 +71,7 @@ jni = { version = "0.21.1", features = ["invocation"] }
 jsonbb = { workspace = true }
 jsonwebtoken = "9.2.0"
 maplit = "1.0.2"
-moka = { version = "0.12.0", features = ["future"] }
+moka = { version = "0.12.8", features = ["future"] }
Can we still use `0.12.0` in this PR, so that there are no changes to `Cargo.lock` and the bump of `quanta` can be reviewed in a separate PR?
I am afraid not. We are using `and_try_compute_with`, which is unavailable in v0.12.0.
Would you try `0.12.3`?
I think it works
You may want to check out the `Cargo.lock` on the main branch to downgrade the locked version of `quanta`.
You're right. 😢 In its repo, version `0.12.3` was released before bumping the dependency on `quanta`, but on crates.io it's the other way around.
Cargo.toml
@@ -343,7 +339,7 @@ opt-level = 2

 [patch.crates-io]
 # Patch third-party crates for deterministic simulation.
-quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
+quanta = { git = "https://github.com/tabVersion/quanta.git", rev = "bb6c780894d06c0ec3f487d58c72920665b5cb0a" }
We may contribute to `madsim-rs/quanta`.
Let's merge madsim-rs/quanta#2 first, and then we can switch back to madsim.
Do we want to cherry-pick?
I guess
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
As the title says: reuse the client if the broker address is the same, to reduce connections to the broker.
The key changes in this PR revolve around optimizing Kafka client management by introducing connection pooling. Here are the main changes:
Introduction of SHARED_KAFKA_CLIENT:
The main motivations appear to be:
Checklist
./risedev check (or alias ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.
Reusing the Kafka client instance on Meta to save threads and reduce Kafka broker usage.
Streaming jobs sharing the same settings (these options must be the same: same broker, same user, same auth method) can now share one Kafka client on meta, instead of each building a new one.