
feat: share kafka client on meta #19058

Merged 31 commits into main from tab/share-kafka-client-enum on Nov 2, 2024
Conversation

@tabVersion (Contributor) commented Oct 22, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

As the title says: reuse the Kafka client when the broker address (and the other connection options) are the same, to reduce the number of connections to the broker.

The key changes in this PR revolve around optimizing Kafka client management by introducing connection pooling. Here are the main changes:

Introduction of SHARED_KAFKA_CLIENT:

client.rs
// Added a shared cache for Kafka clients
// (relies on std::sync::{LazyLock, Weak} and moka::future::Cache, imported here as MokaCache)
pub static SHARED_KAFKA_CLIENT: LazyLock<MokaCache<KafkaConnection, Weak<KafkaClientType>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());

The main motivations appear to be:

  1. Resource Optimization: Instead of creating new Kafka client connections for each enumerator, connections are now reused when possible through a shared cache.
  2. Memory Management: Uses Weak references to prevent memory leaks, so the client is dropped once all related resources are dropped (see the sketch after the snippet below):
// Uses Arc and Weak to manage references
type KafkaClientType = BaseConsumer<RwConsumerContext>;
// Cache stores weak references to allow cleanup when clients are no longer needed
MokaCache<KafkaConnection, Weak<KafkaClientType>>
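For illustration, here is a minimal self-contained sketch of the lookup path this enables. It is not the PR's code: the types and the get_or_build_client / build_kafka_client helpers are placeholders, and the real implementation goes through moka's entry/compute API rather than a plain get-then-insert.

use std::sync::{Arc, LazyLock, Weak};

use moka::future::Cache as MokaCache;

// Placeholder types so the sketch compiles on its own; in the PR these are
// BaseConsumer<RwConsumerContext> and the KafkaConnection struct shown below.
type KafkaClientType = ();
type KafkaConnection = String;

static SHARED_KAFKA_CLIENT: LazyLock<MokaCache<KafkaConnection, Weak<KafkaClientType>>> =
    LazyLock::new(|| MokaCache::builder().build());

async fn get_or_build_client(conn: KafkaConnection) -> Arc<KafkaClientType> {
    // Fast path: a previously built client is still alive somewhere, so reuse it.
    if let Some(weak) = SHARED_KAFKA_CLIENT.get(&conn).await {
        if let Some(client) = weak.upgrade() {
            return client;
        }
    }
    // Slow path: build a new client and publish only a weak handle to it.
    // The cache never keeps the client alive by itself: once every enumerator
    // drops its Arc, the client is deallocated.
    // (The actual PR uses moka's entry/compute API so that concurrent builders
    // for the same connection don't race; this sketch keeps it simple.)
    let client = Arc::new(build_kafka_client(&conn).await);
    SHARED_KAFKA_CLIENT
        .insert(conn, Arc::downgrade(&client))
        .await;
    client
}

// Hypothetical constructor standing in for the real rdkafka consumer setup.
async fn build_kafka_client(_conn: &KafkaConnection) -> KafkaClientType {}

Because the cache holds only Weak handles, it never extends a client's lifetime on its own.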

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Reuse Kafka client instances on Meta to save threads and reduce Kafka broker load.

Streaming jobs whose connection options are identical (all of the fields below must match: same brokers, same user, same auth method) can now share one Kafka client on Meta instead of each building a new one.

pub struct KafkaConnection {
    #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
    pub brokers: String,

    /// Security protocol used for RisingWave to communicate with Kafka brokers. Could be
    /// PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL.
    #[serde(rename = "properties.security.protocol")]
    security_protocol: Option<String>,

    #[serde(rename = "properties.ssl.endpoint.identification.algorithm")]
    ssl_endpoint_identification_algorithm: Option<String>,

    // For the properties below, please refer to [librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for more information.
    /// Path to CA certificate file for verifying the broker's key.
    #[serde(rename = "properties.ssl.ca.location")]
    ssl_ca_location: Option<String>,

    /// CA certificate string (PEM format) for verifying the broker's key.
    #[serde(rename = "properties.ssl.ca.pem")]
    ssl_ca_pem: Option<String>,

    /// Path to client's certificate file (PEM).
    #[serde(rename = "properties.ssl.certificate.location")]
    ssl_certificate_location: Option<String>,

    /// Client's public key string (PEM format) used for authentication.
    #[serde(rename = "properties.ssl.certificate.pem")]
    ssl_certificate_pem: Option<String>,

    /// Path to client's private key file (PEM).
    #[serde(rename = "properties.ssl.key.location")]
    ssl_key_location: Option<String>,

    /// Client's private key string (PEM format) used for authentication.
    #[serde(rename = "properties.ssl.key.pem")]
    ssl_key_pem: Option<String>,

    /// Passphrase of client's private key.
    #[serde(rename = "properties.ssl.key.password")]
    ssl_key_password: Option<String>,

    /// SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM, GSSAPI, and AWS_MSK_IAM.
    #[serde(rename = "properties.sasl.mechanism")]
    sasl_mechanism: Option<String>,

    /// SASL username for SASL/PLAIN and SASL/SCRAM.
    #[serde(rename = "properties.sasl.username")]
    sasl_username: Option<String>,

    /// SASL password for SASL/PLAIN and SASL/SCRAM.
    #[serde(rename = "properties.sasl.password")]
    sasl_password: Option<String>,

    /// Kafka server's Kerberos principal name under SASL/GSSAPI, not including /hostname@REALM.
    #[serde(rename = "properties.sasl.kerberos.service.name")]
    sasl_kerberos_service_name: Option<String>,

    /// Path to client's Kerberos keytab file under SASL/GSSAPI.
    #[serde(rename = "properties.sasl.kerberos.keytab")]
    sasl_kerberos_keytab: Option<String>,

    /// Client's Kerberos principal name under SASL/GSSAPI.
    #[serde(rename = "properties.sasl.kerberos.principal")]
    sasl_kerberos_principal: Option<String>,

    /// Shell command to refresh or acquire the client's Kerberos ticket under SASL/GSSAPI.
    #[serde(rename = "properties.sasl.kerberos.kinit.cmd")]
    sasl_kerberos_kinit_cmd: Option<String>,

    /// Minimum time in milliseconds between key refresh attempts under SASL/GSSAPI.
    #[serde(rename = "properties.sasl.kerberos.min.time.before.relogin")]
    sasl_kerberos_min_time_before_relogin: Option<String>,

    /// Configurations for SASL/OAUTHBEARER.
    #[serde(rename = "properties.sasl.oauthbearer.config")]
    sasl_oathbearer_config: Option<String>,
}
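A side note on how this struct can serve as the key of SHARED_KAFKA_CLIENT: a moka cache key must be hashable and comparable. The derives below are an illustrative assumption (with a hypothetical name and only two of the fields), not the PR's exact definition.

use serde::Deserialize;

// Hypothetical sketch: a moka cache key needs Hash + Eq (and Clone for ergonomic reuse).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
pub struct KafkaConnectionKeySketch {
    #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
    pub brokers: String,
    #[serde(rename = "properties.security.protocol")]
    security_protocol: Option<String>,
    // ...the remaining SSL/SASL fields from the struct above would follow here...
}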

Signed-off-by: tabVersion <[email protected]>
@hzxa21 (Collaborator) commented Oct 23, 2024

#18949

@tabVersion tabVersion marked this pull request as ready for review October 23, 2024 03:42
tabversion and others added 6 commits October 23, 2024 13:45
@graphite-app graphite-app bot requested a review from a team October 23, 2024 13:08
Review thread on src/connector/src/source/kafka/enumerator/client.rs (outdated):
Comment on lines 120 to 121
// drop the guard and acquire a new one to avoid a 10s blocking call
drop(shared_client_guard);
Member commented:

Will this happen?

  1. Caller A tries to get a client for a connection, but the cache misses, so it takes some time for caller A to build the client.
  2. During that time, caller B tries to get a client for the same connection; the cache misses again, so it also builds a client.
  3. Caller A builds and inserts the client into the map and sets ref_count = 1.
  4. One second later, caller B also inserts its client into the map and sets ref_count = 1, causing caller A's client to leak and never be dropped.

@fuyufjh (Member) commented Oct 24, 2024

I would recommend using moka to replace the HashMap:

use moka::future::Cache;

In particular, a caching structure should handle these concurrent gets correctly by letting caller B block until caller A completes its operation and inserts the cached item (i.e. the Kafka client) back.
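As a reference for that behavior, here is a minimal self-contained sketch (not related to the PR's code; the string value merely stands in for a Kafka client) showing how moka's get_with coalesces concurrent lookups for the same key, so the second caller waits for the first caller's init instead of building its own value:

use std::sync::atomic::{AtomicU32, Ordering};

use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<String, String> = Cache::new(100);
    let init_calls = AtomicU32::new(0);

    // Two "callers" racing on the same key: only one init future actually runs;
    // the other waits for it and receives the same cached value.
    let (a, b) = tokio::join!(
        cache.get_with("conn".to_string(), async {
            init_calls.fetch_add(1, Ordering::SeqCst);
            "client-A".to_string()
        }),
        cache.get_with("conn".to_string(), async {
            init_calls.fetch_add(1, Ordering::SeqCst);
            "client-B".to_string()
        }),
    );

    assert_eq!(a, b);
    assert_eq!(init_calls.load(Ordering::SeqCst), 1);
}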

@xxchan (Member) left a comment

Could you please also run some tests and describe the improvements in the PR description?

@tabVersion tabVersion requested a review from a team as a code owner October 25, 2024 09:38
@tabVersion tabVersion requested a review from xiangjinwu October 25, 2024 09:38
Ok(item_val) => {
    let share_item = SharedKafkaItem {
        client: item_val.client.clone(),
        ref_count: item_val.ref_count - 1,
Member commented:

I don't get why we need ref_count. Can we just use something like get_with?

Contributor Author commented:

Then how do we manage the RdKafka client instance if all related sources are dropped? IIUC, if we remove the ref_count, the client instance will be kept in memory forever, until the meta node restarts. There is no eviction policy enabled for the cache.

Member commented:

As mentioned by Bugen, I think we can store Weak in the cache.

Member commented:

Not sure if Weak fits well with moka. We may also try dashmap or weak-table.

Member commented:

@BugenZhao Why do you think Weak might not work with moka? Actually, I'm also wondering what the difference is between moka (a cache) and dashmap (a concurrent hashmap).

Contributor Author commented:

Actually, I'm also wondering what the difference is between moka (a cache) and dashmap (a concurrent hashmap)

I think moka is a dashmap with an eviction policy plus a guarantee that updates are applied atomically.

Member commented:

I think dashmap behaves similarly to moka::sync::Cache with unlimited capacity. 🤔

Why do you think Weak might not work with moka?

Because the interface does not seem that compatible with storing a Weak: there is no auto-eviction, and it requires the caller to keep the strong reference:

 // the caller temporarily holds the strong reference; the cache stores only a Weak
 let mut client_arc: Option<Arc<KafkaClientType>> = None;

Member commented:

requires the caller to keep the strong reference

Isn't this the expected usage? i.e., store the Weak in the map while the caller stores the Arc. Otherwise, who keeps the Arc? 👀

Member commented:

I mean when inserting. 🤣 For example, weak-table allows you to pass a closure returning an Arc to insert_with: it actually stores a Weak but hands the Arc back to the caller. With moka we need to temporarily hold the strong reference ourselves to prevent it from being deallocated.

@graphite-app graphite-app bot requested a review from a team October 28, 2024 08:59
@tabVersion (Contributor Author) commented Oct 28, 2024

I’m not entirely sure what specific tests you’re looking for.

At least something like "Manually tested that the number of threads is reduced from xxx to yyy for zzz Kafka sources."

Although the idea might be clear, the implementation is not that trivial, so we should verify it works.

Besides, the background of the problem should also be mentioned.

Tested locally

Kafka env

Metadata for test (from broker 0: 127.0.0.1:9092/0):
 1 brokers:
  broker 0 at 127.0.0.1:9092 (controller)
 1 topics:
  topic "test" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0

command:

for i in {0..100}; do  psql -h localhost -p 4566 -d dev -U root -c "create source s_$i (a int, b varchar) with (connector = 'kafka', topic = 'test', properties.bootstrap.server = '127.0.0.1:9092') format plain encode json ;" ; done

On main (a176ace): 1573 threads
On this PR (ac1d63d): 1272 threads
When the system is idle: 767 threads

@xxchan (Member) commented Oct 28, 2024

Would you mind testing Kafka with multiple brokers, where we might see a larger difference?

(A comment from @tabVersion here was marked as outdated.)

@tabVersion (Contributor Author) commented:

Would you mind testing Kafka with multiple brokers, where we might see a larger difference?

Tested with Confluent Cloud across multiple AZs.

Metadata for all topics (from broker -1: sasl_ssl://pkc-p11xm.us-east-1.aws.confluent.cloud:9092/bootstrap):
 18 brokers:
  broker 0 at b0-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 1 at b1-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 2 at b2-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 3 at b3-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 4 at b4-pkc-p11xm.us-east-1.aws.confluent.cloud:9092 (controller)
  broker 5 at b5-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 6 at b6-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 7 at b7-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 8 at b8-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 9 at b9-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 10 at b10-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 11 at b11-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 12 at b12-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 13 at b13-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 14 at b14-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 15 at b15-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 16 at b16-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 17 at b17-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
 1 topics:
  topic "topic_0" with 6 partitions:
    partition 0, leader 12, replicas: 12,17,13, isrs: 12,17,13
    partition 1, leader 17, replicas: 17,13,15, isrs: 17,13,15
    partition 2, leader 13, replicas: 13,15,14, isrs: 13,15,14
    partition 3, leader 15, replicas: 15,14,16, isrs: 15,14,16
    partition 4, leader 14, replicas: 14,16,12, isrs: 14,16,12
    partition 5, leader 16, replicas: 16,12,17, isrs: 16,12,17

Testing script:

for i in {0..4}; do 
	psql  -h 127.0.0.1 -p 4566 -d dev -U root -c "create source s_$i(a int, b varchar) with (connector = 'kafka', topic = 'topic_0', properties.bootstrap.server = 'xxx.us-east-1.aws.confluent.cloud:9092', properties.security.protocol = 'SASL_SSL', properties.sasl.mechanism = 'PLAIN', properties.sasl.username = '[...]', properties.sasl.password = '[...]') format plain encode json;" ; 
done

EC2 idle: 71 threads
EC2 with RisingWave running: 195 threads
On main (a176ace): 744 threads
On this PR (ac1d63d): 654 threads


Why not test with more sources?

Creating 5 sources at once seems to be the SDK's maximum; I got the following error afterward:

Caused by these errors (recent errors listed first):
  1: gRPC request to meta service failed: Internal error
  2: The cluster is recovering
  3: get error from control stream, in worker node 1
  4: gRPC request to stream service failed: Internal error
  5: recv actor failure
  6: Actor 175 exited unexpectedly
  7: Executor error
  8: Connector error
  9: Kafka error
 10: Meta data fetch error
 11: Resolve (Local: Host resolution failure)

A little weird, but irrelevant to this issue.

@BugenZhao (Member) left a comment

Rest LGTM

Review thread on src/meta/Cargo.toml (outdated):
@@ -71,7 +71,7 @@ jni = { version = "0.21.1", features = ["invocation"] }
 jsonbb = { workspace = true }
 jsonwebtoken = "9.2.0"
 maplit = "1.0.2"
-moka = { version = "0.12.0", features = ["future"] }
+moka = { version = "0.12.8", features = ["future"] }
Member commented:

Can we still use 0.12.0 in this PR, so that there are no changes in Cargo.lock, and the bump of quanta can be reviewed in a separate PR?

Contributor Author commented:

I am afraid not. We are using and_try_compute_with, which is unavailable in v0.12.0.
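For context, a rough sketch of what moka's entry/compute API looks like, as far as I understand its 0.12.x docs; the helper, key, and value types here are illustrative assumptions, not the PR's code:

use moka::future::Cache;
use moka::ops::compute::Op;

// Hypothetical helper: atomically "get or insert" a value for `key`.
// moka serializes the closure per key, so two concurrent callers cannot both
// observe a miss and overwrite each other's freshly inserted value.
async fn get_or_insert(cache: &Cache<String, u64>, key: &str) -> anyhow::Result<()> {
    let _ = cache
        .entry(key.to_string())
        .and_try_compute_with(|maybe_entry| async move {
            match maybe_entry {
                // Already present: leave the cached value untouched.
                Some(_) => Ok::<_, anyhow::Error>(Op::Nop),
                // Missing: build a fresh value and store it.
                None => Ok(Op::Put(build_value().await?)),
            }
        })
        .await?;
    Ok(())
}

// Stand-in for "build a Kafka client" in the real PR.
async fn build_value() -> anyhow::Result<u64> {
    Ok(42)
}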

Member commented:

Would you try 0.12.3?

Contributor Author commented:

I think it works

@BugenZhao (Member) commented Oct 30, 2024

You may want to check out the Cargo.lock on the main branch to downgrade the locked version of quanta.

Contributor Author commented:

[screenshot omitted]

moka 0.12.3 still needs a higher version of quanta. I am afraid we have to bump quanta first.

Member commented:

You're right. 😢 In its repo, version 0.12.3 was released before the quanta dependency bump, but on crates.io it's the other way around.

Review thread on Cargo.toml (outdated):
@@ -343,7 +339,7 @@ opt-level = 2
 
 [patch.crates-io]
 # Patch third-party crates for deterministic simulation.
-quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
+quanta = { git = "https://github.com/tabVersion/quanta.git", rev = "bb6c780894d06c0ec3f487d58c72920665b5cb0a" }
Member commented:

We may contribute to madsim-rs/quanta.

Contributor Author commented:

Let's merge madsim-rs/quanta#2 first, then we can switch back to madsim.

@tabVersion tabVersion added this pull request to the merge queue Nov 2, 2024
Merged via the queue into main with commit e7e4a2c Nov 2, 2024
30 of 31 checks passed
@tabVersion tabVersion deleted the tab/share-kafka-client-enum branch November 2, 2024 15:16
@tabVersion tabVersion restored the tab/share-kafka-client-enum branch November 5, 2024 15:15
@xxchan (Member) commented Nov 14, 2024

Do we want to cherry-pick?

@tabVersion (Contributor Author) commented:

Do we want to cherry-pick?

I guess the Feature label means no cherry-pick. We also need to add this refactor to the docs.
