-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: share kafka client on meta #19058
Changes from 30 commits
b9b2f79
ab46672
3373a58
c5203d6
3b6a6a2
3b3f725
968ed08
a41f3fc
6534ebf
7249e78
0024e1d
ad8b989
ae1b70a
58b5128
f115a0c
d128644
ae9df41
45295bc
a9e34c7
d27ab90
256485c
62cb953
725e23c
832f66f
73f0b7b
ac1d63d
35fb002
ec49096
16d8c42
b3efda6
51eca61
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,7 +71,7 @@ jni = { version = "0.21.1", features = ["invocation"] } | |
jsonbb = { workspace = true } | ||
jsonwebtoken = "9.2.0" | ||
maplit = "1.0.2" | ||
moka = { version = "0.12.0", features = ["future"] } | ||
moka = { version = "0.12.8", features = ["future"] } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we still use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am afraid not. We are using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would you try There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it works There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may want to checkout the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right. 😢 In its repo the version |
||
mongodb = { version = "2.8.2", features = ["tokio-runtime"] } | ||
mysql_async = { version = "0.34", default-features = false, features = [ | ||
"default", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
// limitations under the License. | ||
|
||
use std::collections::BTreeMap; | ||
use std::hash::Hash; | ||
use std::io::Write; | ||
use std::time::Duration; | ||
|
||
|
@@ -61,7 +62,7 @@ use aws_types::SdkConfig; | |
use risingwave_common::util::env_var::env_var_is_true; | ||
|
||
/// A flatten config map for aws auth. | ||
#[derive(Deserialize, Debug, Clone, WithOptions)] | ||
#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)] | ||
pub struct AwsAuthProps { | ||
#[serde(rename = "aws.region", alias = "region")] | ||
pub region: Option<String>, | ||
|
@@ -161,21 +162,11 @@ impl AwsAuthProps { | |
} | ||
|
||
#[serde_as] | ||
#[derive(Debug, Clone, Deserialize, WithOptions)] | ||
pub struct KafkaCommon { | ||
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)] | ||
pub struct KafkaConnection { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems we will be able to use this same struct for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes. |
||
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")] | ||
pub brokers: String, | ||
|
||
#[serde(rename = "topic", alias = "kafka.topic")] | ||
pub topic: String, | ||
|
||
#[serde( | ||
rename = "properties.sync.call.timeout", | ||
deserialize_with = "deserialize_duration_from_string", | ||
default = "default_kafka_sync_call_timeout" | ||
)] | ||
pub sync_call_timeout: Duration, | ||
|
||
/// Security protocol used for RisingWave to communicate with Kafka brokers. Could be | ||
/// PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. | ||
#[serde(rename = "properties.security.protocol")] | ||
|
@@ -252,6 +243,20 @@ pub struct KafkaCommon { | |
|
||
#[serde_as] | ||
#[derive(Debug, Clone, Deserialize, WithOptions)] | ||
pub struct KafkaCommon { | ||
#[serde(rename = "topic", alias = "kafka.topic")] | ||
pub topic: String, | ||
|
||
#[serde( | ||
rename = "properties.sync.call.timeout", | ||
deserialize_with = "deserialize_duration_from_string", | ||
default = "default_kafka_sync_call_timeout" | ||
)] | ||
pub sync_call_timeout: Duration, | ||
} | ||
|
||
#[serde_as] | ||
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)] | ||
pub struct KafkaPrivateLinkCommon { | ||
/// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users. | ||
#[serde(rename = "broker.rewrite.endpoints")] | ||
|
@@ -269,7 +274,7 @@ pub struct RdKafkaPropertiesCommon { | |
/// Maximum Kafka protocol request message size. Due to differing framing overhead between | ||
/// protocol versions the producer is unable to reliably enforce a strict max message limit at | ||
/// produce time and may exceed the maximum size by one message in protocol ProduceRequests, | ||
/// the broker will enforce the the topic's max.message.bytes limit | ||
/// the broker will enforce the topic's max.message.bytes limit | ||
#[serde(rename = "properties.message.max.bytes")] | ||
#[serde_as(as = "Option<DisplayFromStr>")] | ||
pub message_max_bytes: Option<usize>, | ||
|
@@ -316,7 +321,7 @@ impl RdKafkaPropertiesCommon { | |
} | ||
} | ||
|
||
impl KafkaCommon { | ||
impl KafkaConnection { | ||
pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) { | ||
// AWS_MSK_IAM | ||
if self.is_aws_msk_iam() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may contribute to
madsim-rs/quanta
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's merge madsim-rs/quanta#2 first and we can switch back to madsim.