diff --git a/Cargo.lock b/Cargo.lock index eeb045f85a93f..afe3079f7601b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4104,11 +4104,21 @@ checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" dependencies = [ "futures-core", "futures-sink", - "nanorand", "pin-project", "spin 0.9.8", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -6389,15 +6399,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "034a0ad7deebf0c2abcf2435950a6666c3c15ea9d8fad0c0f48efa8a7f843fed" -[[package]] -name = "nanorand" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom", -] - [[package]] name = "native-tls" version = "0.2.11" @@ -9310,6 +9311,7 @@ dependencies = [ "rust_decimal", "rustls-native-certs 0.7.0", "rustls-pemfile 2.1.1", + "rustls-pki-types", "rw_futures_util", "serde", "serde_derive", @@ -9326,7 +9328,6 @@ dependencies = [ "time", "tokio-postgres", "tokio-retry", - "tokio-rustls 0.24.1", "tokio-stream", "tokio-util", "tracing", @@ -10491,20 +10492,20 @@ dependencies = [ [[package]] name = "rumqttc" -version = "0.22.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2433b134712bc17a6f85a35e06b901e6e8d0bb20b5367e1121e6fedc140c0ac" +checksum = "e1568e15fab2d546f940ed3a21f48bbbd1c494c90c99c4481339364a497f94a9" dependencies = [ "bytes", - "flume", - "futures", + "flume 0.11.0", + "futures-util", "log", - "rustls-native-certs 0.6.3", - "rustls-pemfile 1.0.4", - "rustls-webpki 0.100.3", + "rustls-native-certs 0.7.0", + "rustls-pemfile 2.1.1", + "rustls-webpki 0.102.2", "thiserror", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls 0.25.0", "url", ] @@ -10711,16 +10712,6 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ede67b28608b4c60685c7d54122d4400d90f62b40caee7700e700380a390fa8" -[[package]] -name = "rustls-webpki" -version = "0.100.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3" -dependencies = [ - "ring 0.16.20", - "untrusted 0.7.1", -] - [[package]] name = "rustls-webpki" version = "0.101.7" @@ -11987,7 +11978,7 @@ checksum = "be4c21bf34c7cae5b283efb3ac1bcc7670df7561124dc2f8bdc0b59be40f79a2" dependencies = [ "atoi", "chrono", - "flume", + "flume 0.10.14", "futures-channel", "futures-core", "futures-executor", @@ -14286,7 +14277,6 @@ dependencies = [ "either", "fail", "flate2", - "flume", "frunk_core", "futures", "futures-channel", @@ -14297,7 +14287,6 @@ dependencies = [ "futures-task", "futures-util", "generic-array", - "getrandom", "governor", "hashbrown 0.13.2", "hashbrown 0.14.3", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index c73a5cd7e910d..648061333b5e0 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -116,10 +116,11 @@ risingwave_common_estimate_size = { workspace = true } risingwave_jni_core = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } -rumqttc = { version = "0.22.0", features = ["url"] } +rumqttc = { version = "0.24.0", features = ["url"] } rust_decimal = "1" rustls-native-certs = "0.7" rustls-pemfile = "2" +rustls-pki-types = "1" rw_futures_util = { workspace = true } serde = { version = "1", features = ["derive", "rc"] } serde_derive = "1" @@ -143,7 +144,6 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ ] } tokio-postgres = { version = "0.7", features = ["with-uuid-1"] } tokio-retry = "0.3" -tokio-rustls = "0.24" tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec", "io"] } tonic = { workspace = true } diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 122383400f5cb..302b68dd664a1 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -687,7 +687,7 @@ impl NatsCommon { pub(crate) fn load_certs( certificates: &str, -) -> ConnectorResult> { +) -> ConnectorResult>> { let cert_bytes = if let Some(path) = certificates.strip_prefix("fs://") { std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())? } else { @@ -695,13 +695,13 @@ pub(crate) fn load_certs( }; rustls_pemfile::certs(&mut cert_bytes.as_slice()) - .map(|cert| Ok(tokio_rustls::rustls::Certificate(cert?.to_vec()))) + .map(|cert| Ok(cert?)) .collect() } pub(crate) fn load_private_key( certificate: &str, -) -> ConnectorResult { +) -> ConnectorResult> { let cert_bytes = if let Some(path) = certificate.strip_prefix("fs://") { std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())? } else { @@ -711,7 +711,5 @@ pub(crate) fn load_private_key( let cert = rustls_pemfile::pkcs8_private_keys(&mut cert_bytes.as_slice()) .next() .ok_or_else(|| anyhow!("No private key found"))?; - Ok(tokio_rustls::rustls::PrivateKey( - cert?.secret_pkcs8_der().to_vec(), - )) + Ok(cert?.into()) } diff --git a/src/connector/src/connector_common/mqtt_common.rs b/src/connector/src/connector_common/mqtt_common.rs index b771fd34143d3..e607decff58aa 100644 --- a/src/connector/src/connector_common/mqtt_common.rs +++ b/src/connector/src/connector_common/mqtt_common.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use rumqttc::tokio_rustls::rustls; use rumqttc::v5::mqttbytes::QoS; use rumqttc::v5::{AsyncClient, EventLoop, MqttOptions}; use serde_derive::Deserialize; @@ -141,26 +142,22 @@ impl MqttCommon { .unwrap_or(QoS::AtMostOnce) } - fn get_tls_config(&self) -> ConnectorResult { - let mut root_cert_store = tokio_rustls::rustls::RootCertStore::empty(); + fn get_tls_config(&self) -> ConnectorResult { + let mut root_cert_store = rustls::RootCertStore::empty(); if let Some(ca) = &self.ca { let certificates = load_certs(ca)?; for cert in certificates { - root_cert_store.add(&cert).unwrap(); + root_cert_store.add(cert).unwrap(); } } else { for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs") { - root_cert_store - .add(&tokio_rustls::rustls::Certificate(cert.to_vec())) - .unwrap(); + root_cert_store.add(cert).unwrap(); } } - let builder = tokio_rustls::rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_cert_store); + let builder = rustls::ClientConfig::builder().with_root_certificates(root_cert_store); let tls_config = if let (Some(client_cert), Some(client_key)) = (self.client_cert.as_ref(), self.client_key.as_ref()) diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index 17885931f58e6..925f90c20c967 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -58,7 +58,7 @@ def_anyhow_newtype! { redis::RedisError => "Redis error", arrow_schema::ArrowError => "Arrow error", google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error", - tokio_rustls::rustls::Error => "TLS error", + rumqttc::tokio_rustls::rustls::Error => "TLS error", rumqttc::v5::ClientError => "MQTT error", rumqttc::v5::OptionError => "MQTT error", diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 2605698d964bc..177ce846390dd 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -49,7 +49,6 @@ digest = { version = "0.10", features = ["mac", "oid", "std"] } either = { version = "1", features = ["serde"] } fail = { version = "0.5", default-features = false, features = ["failpoints"] } flate2 = { version = "1", features = ["zlib"] } -flume = { version = "0.10" } frunk_core = { version = "0.4", default-features = false, features = ["std"] } futures = { version = "0.3" } futures-channel = { version = "0.3", features = ["sink"] } @@ -60,7 +59,6 @@ futures-sink = { version = "0.3" } futures-task = { version = "0.3" } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] } -getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae", default-features = false, features = ["js", "rdrand", "std"] } governor = { version = "0.6", default-features = false, features = ["dashmap", "jitter", "std"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } hashbrown-594e8ee84c453af0 = { package = "hashbrown", version = "0.13", features = ["raw"] } @@ -171,7 +169,6 @@ digest = { version = "0.10", features = ["mac", "oid", "std"] } either = { version = "1", features = ["serde"] } frunk_core = { version = "0.4", default-features = false, features = ["std"] } generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] } -getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae", default-features = false, features = ["js", "rdrand", "std"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] } itertools = { version = "0.11" }