From a1669fcdb368637bf0fddb4559ebf9056938e3c8 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 28 Feb 2023 13:52:23 +0800 Subject: [PATCH 1/6] rely on pulsar api for enumerator Signed-off-by: tabVersion --- Cargo.lock | 1 + src/connector/Cargo.toml | 2 +- .../src/source/pulsar/admin/client.rs | 161 ------------------ src/connector/src/source/pulsar/admin/mod.rs | 17 -- .../src/source/pulsar/enumerator/client.rs | 149 +++------------- src/connector/src/source/pulsar/mod.rs | 4 - .../src/source/pulsar/source/reader.rs | 24 +++ 7 files changed, 46 insertions(+), 312 deletions(-) delete mode 100644 src/connector/src/source/pulsar/admin/client.rs delete mode 100644 src/connector/src/source/pulsar/admin/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 659fd13f3376f..977198cd9679a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5127,6 +5127,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-util", + "tracing", "url", "uuid", ] diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 2ac67f4d68c18..f61a634bdc3dc 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -47,7 +47,7 @@ prometheus = { version = "0.13", features = ["process"] } prost = { version = "0.11.0", features = ["no-recursion-limit"] } prost-reflect = "0.9.2" protobuf-native = "0.2.1" -pulsar = { version = "4.2", default-features = false, features = ["tokio-runtime"] } +pulsar = { version = "4.2", default-features = false, features = ["tokio-runtime", "telemetry"] } rdkafka = { package = "madsim-rdkafka", version = "=0.2.14-alpha", features = ["cmake-build", "ssl-vendored", "gssapi"] } reqwest = { version = "0.11", features = ["json"] } risingwave_common = { path = "../common" } diff --git a/src/connector/src/source/pulsar/admin/client.rs b/src/connector/src/source/pulsar/admin/client.rs deleted file mode 100644 index ba024efdb9625..0000000000000 --- a/src/connector/src/source/pulsar/admin/client.rs +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use anyhow::{anyhow, bail, Result}; -use http::{Response, StatusCode}; -use hyper::body::Buf; -use hyper::{Body, Client, Uri}; -use hyper_tls::HttpsConnector; -use serde_derive::{Deserialize, Serialize}; - -use crate::source::pulsar::topic::Topic; - -#[derive(Debug, Default)] -pub struct PulsarAdminClient { - pub(crate) base_path: String, - pub(crate) auth_token: Option, -} - -impl PulsarAdminClient { - pub fn new(base_path: String, auth_token: Option) -> Self { - Self { - base_path: base_path.trim_end_matches('/').to_string(), - auth_token, - } - } -} - -impl PulsarAdminClient { - pub async fn get_last_message_id(&self, topic: &Topic) -> Result { - self.get(topic, "lastMessageId").await - } - - pub async fn get_topic_metadata(&self, topic: &Topic) -> Result { - let res = self.http_get(topic, "partitions").await?; - - if res.status() == StatusCode::NOT_FOUND { - bail!( - "could not find metadata for pulsar topic {}", - topic.to_string() - ); - } - - let body = hyper::body::aggregate(res).await?; - serde_json::from_reader(body.reader()).map_err(|e| anyhow!(e)) - } - - pub async fn http_get(&self, topic: &Topic, api: &str) -> Result> { - let client = Client::builder().build::<_, hyper::Body>(HttpsConnector::new()); - - let url = format!( - "{}/{}/{}/{}", - self.base_path, - "admin/v2", - topic.rest_path(), - api - ); - let mut req = hyper::Request::builder() - .method("GET") - .uri(url.parse::()?) - .body(Body::empty()) - .unwrap(); - - if let Some(auth_token) = &self.auth_token { - req.headers_mut() - .insert("Authorization", auth_token.to_string().parse().unwrap()); - } - - client.request(req).await.map_err(|e| anyhow!(e)) - } - - pub async fn get(&self, topic: &Topic, api: &str) -> Result - where - T: for<'a> serde::Deserialize<'a>, - { - let res = self.http_get(topic, api).await?; - let body = hyper::body::aggregate(res).await?; - let result: T = serde_json::from_reader(body.reader())?; - Ok(result) - } -} - -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct LastMessageId { - pub ledger_id: i64, - pub entry_id: i64, - pub partition_index: i64, - pub batch_index: Option, - pub batch_size: Option, - pub acker: Option, - pub outstanding_acks_in_same_batch: Option, -} - -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct LastMessageIdAcker { - pub batch_size: Option, - pub prev_batch_cumulatively_acked: Option, - pub outstanding_acks: Option, - pub bit_set_size: Option, -} - -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct PartitionedTopicMetadata { - pub partitions: i64, -} - -#[cfg(test)] -mod test { - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use crate::source::pulsar::admin::client::PulsarAdminClient; - use crate::source::pulsar::topic::parse_topic; - - async fn mock_server(web_path: &str, body: &str) -> MockServer { - let mock_server = MockServer::start().await; - use wiremock::matchers::{method, path}; - - let response = ResponseTemplate::new(200) - .set_body_string(body) - .append_header("content-type", "application/json"); - - Mock::given(method("GET")) - .and(path(web_path)) - .respond_with(response) - .mount(&mock_server) - .await; - - mock_server - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // MockServer is not supported in simulation. - async fn test_get_topic_metadata() { - let server = mock_server( - "/admin/v2/persistent/public/default/t2/partitions", - "{\"partitions\":3}", - ) - .await; - - let client = PulsarAdminClient::new(server.uri(), None); - - let topic = parse_topic("public/default/t2").unwrap(); - - let meta = client.get_topic_metadata(&topic).await.unwrap(); - - assert_eq!(meta.partitions, 3); - } -} diff --git a/src/connector/src/source/pulsar/admin/mod.rs b/src/connector/src/source/pulsar/admin/mod.rs deleted file mode 100644 index ea9e0a396a09d..0000000000000 --- a/src/connector/src/source/pulsar/admin/mod.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub use client::*; - -mod client; diff --git a/src/connector/src/source/pulsar/enumerator/client.rs b/src/connector/src/source/pulsar/enumerator/client.rs index 49f2dad2a3a64..652b91003b48b 100644 --- a/src/connector/src/source/pulsar/enumerator/client.rs +++ b/src/connector/src/source/pulsar/enumerator/client.rs @@ -15,16 +15,16 @@ use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use itertools::Itertools; +use pulsar::{Authentication, Pulsar, TokioExecutor}; use serde::{Deserialize, Serialize}; -use crate::source::pulsar::admin::PulsarAdminClient; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::topic::{parse_topic, Topic}; use crate::source::pulsar::PulsarProperties; use crate::source::SplitEnumerator; pub struct PulsarSplitEnumerator { - admin_client: PulsarAdminClient, + client: Pulsar, topic: Topic, start_offset: PulsarEnumeratorOffset, } @@ -44,7 +44,6 @@ impl SplitEnumerator for PulsarSplitEnumerator { async fn new(properties: PulsarProperties) -> Result { let topic = properties.topic; - let admin_url = properties.admin_url; let parsed_topic = parse_topic(&topic)?; let mut scan_start_offset = match properties @@ -67,8 +66,18 @@ impl SplitEnumerator for PulsarSplitEnumerator { scan_start_offset = PulsarEnumeratorOffset::Timestamp(time_offset) } + let mut pulsar_builder = Pulsar::builder(properties.service_url, TokioExecutor); + if let Some(auth_token) = properties.auth_token { + pulsar_builder = pulsar_builder.with_auth(Authentication { + name: "token".to_string(), + data: Vec::from(auth_token), + }); + } + + let pulsar = pulsar_builder.build().await.map_err(|e| anyhow!(e))?; + Ok(PulsarSplitEnumerator { - admin_client: PulsarAdminClient::new(admin_url, properties.auth_token), + client: pulsar, topic: parsed_topic, start_offset: scan_start_offset, }) @@ -79,19 +88,15 @@ impl SplitEnumerator for PulsarSplitEnumerator { // MessageId is only used when recovering from a State assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_))); - let topic_metadata = self.admin_client.get_topic_metadata(&self.topic).await?; - // note: may check topic exists by get stats - if topic_metadata.partitions < 0 { - bail!( - "illegal metadata {:?} for pulsar topic {}", - topic_metadata.partitions, - self.topic.to_string() - ); - } + let topic_metadata = self + .client + .lookup_partitioned_topic_number(&self.topic.to_string()) + .await + .map_err(|e| anyhow!(e))?; - let splits = if topic_metadata.partitions > 0 { + let splits = if topic_metadata > 0 { // partitioned topic - (0..topic_metadata.partitions as i32) + (0..topic_metadata as i32) .map(|p| PulsarSplit { topic: self.topic.sub_topic(p).unwrap(), start_offset: offset.clone(), @@ -108,117 +113,3 @@ impl SplitEnumerator for PulsarSplitEnumerator { Ok(splits) } } - -#[cfg(test)] -mod test { - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties, PulsarSplitEnumerator}; - use crate::source::SplitEnumerator; - - async fn empty_mock_server() -> MockServer { - MockServer::start().await - } - - pub async fn mock_server(web_path: &str, body: &str) -> MockServer { - let mock_server = MockServer::start().await; - use wiremock::matchers::{method, path}; - - let response = ResponseTemplate::new(200) - .set_body_string(body) - .append_header("content-type", "application/json"); - - Mock::given(method("GET")) - .and(path(web_path)) - .respond_with(response) - .mount(&mock_server) - .await; - - mock_server - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // MockServer is not supported in simulation. - async fn test_list_splits_on_no_existing_pulsar() { - let prop = PulsarProperties { - topic: "t".to_string(), - admin_url: "http://test_illegal_url:8000".to_string(), - service_url: "pulsar://localhost:6650".to_string(), - scan_startup_mode: Some("earliest".to_string()), - time_offset: None, - auth_token: None, - }; - let mut enumerator = PulsarSplitEnumerator::new(prop).await.unwrap(); - assert!(enumerator.list_splits().await.is_err()); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // MockServer is not supported in simulation. - async fn test_list_on_no_existing_topic() { - let server = empty_mock_server().await; - - let prop = PulsarProperties { - topic: "t".to_string(), - admin_url: server.uri(), - service_url: "pulsar://localhost:6650".to_string(), - scan_startup_mode: Some("earliest".to_string()), - time_offset: None, - auth_token: None, - }; - let mut enumerator = PulsarSplitEnumerator::new(prop).await.unwrap(); - assert!(enumerator.list_splits().await.is_err()); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // MockServer is not supported in simulation. - async fn test_list_splits_with_partitioned_topic() { - let server = mock_server( - "/admin/v2/persistent/public/default/t/partitions", - "{\"partitions\":3}", - ) - .await; - - let prop = PulsarProperties { - topic: "t".to_string(), - admin_url: server.uri(), - service_url: "pulsar://localhost:6650".to_string(), - scan_startup_mode: Some("earliest".to_string()), - time_offset: None, - auth_token: None, - }; - let mut enumerator = PulsarSplitEnumerator::new(prop).await.unwrap(); - - let splits = enumerator.list_splits().await.unwrap(); - assert_eq!(splits.len(), 3); - - (0..3).for_each(|i| { - assert_eq!(splits[i].start_offset, PulsarEnumeratorOffset::Earliest); - assert_eq!(splits[i].topic.partition_index, Some(i as i32)); - }); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // MockServer is not supported in simulation. - async fn test_list_splits_with_non_partitioned_topic() { - let server = mock_server( - "/admin/v2/persistent/public/default/t/partitions", - "{\"partitions\":0}", - ) - .await; - - let prop = PulsarProperties { - topic: "t".to_string(), - admin_url: server.uri(), - service_url: "pulsar://localhost:6650".to_string(), - scan_startup_mode: Some("earliest".to_string()), - time_offset: None, - auth_token: None, - }; - let mut enumerator = PulsarSplitEnumerator::new(prop).await.unwrap(); - - let splits = enumerator.list_splits().await.unwrap(); - assert_eq!(splits.len(), 1); - assert_eq!(splits[0].start_offset, PulsarEnumeratorOffset::Earliest); - assert_eq!(splits[0].topic.partition_index, None); - } -} diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index 92233686901f1..1698d0bc3010b 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod admin; pub mod enumerator; pub mod source; pub mod split; @@ -29,9 +28,6 @@ pub struct PulsarProperties { #[serde(rename = "topic", alias = "pulsar.topic")] pub topic: String, - #[serde(rename = "admin.url", alias = "pulsar.admin.url")] - pub admin_url: String, - #[serde(rename = "service.url", alias = "pulsar.service.url")] pub service_url: String, diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 25eab2daaadc9..be28c8574b7a5 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -183,3 +183,27 @@ impl PulsarSplitReader { } } } + +#[tokio::test] +async fn test() { + let service_url = "pulsar+ssl://pulsar-gcp-useast1.streaming.datastax.com:6651"; + let topic = "non-persistent://meetup/default/demo2"; + let auth_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2NzgwOTUyMTIsImlhdCI6MTY3NzQ5MDQxMiwiaXNzIjoiZGF0YXN0YXgiLCJzdWIiOiJjbGllbnQ7Yjg5Nzg2YTctMGI3Mi00ZWIwLWIxZWItNzA4MTlhODQ2YWQzO2JXVmxkSFZ3OzJkZWI3ODc2MjAiLCJ0b2tlbmlkIjoiMmRlYjc4NzYyMCJ9.i4kokLAcowtNZAjlVCVu2ACwTNzZDkWEfnwvulCtrIFRKHMdZHo-M9xUS7wiD_tugZ5MxVdV1xDrAb3OX2Z0_9e-Qd_HdktNuBv0lCLIXvFNGe9JgtzL0fgXtsE_NlRB3q77ltIfnQsI16OUK6JMPmM7udvoqkq8XNVq5DmFsOO470ans4TRJEu9WhOko7sihWz2P1y9QpPIIaM__iVJ7N03_-SuX-Du-UPfOPc7nfBTxnldNwXmNXTdD8gXmuvZ5RvnK2bx_7Zf03LQjj5crcw5R4e70oA4y-BTAdH6rxCl0urkLJAZh0DW9Ewe3EQp1vVJDS2OtCkNnEyOpnqY7w"; + + let mut pulsar_builder = Pulsar::builder(service_url, TokioExecutor); + pulsar_builder = pulsar_builder.with_auth(Authentication { + name: "token".to_string(), + data: Vec::from(auth_token), + }); + + let pulsar = pulsar_builder + .build() + .await + .map_err(|e| anyhow!(e)) + .unwrap(); + + println!( + "{:?}", + pulsar.lookup_partitioned_topic_number(topic).await.unwrap() + ); +} From 6959b46badcdf32f9787c6702fd10d2cedf48e59 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 28 Feb 2023 15:54:37 +0800 Subject: [PATCH 2/6] oauth Signed-off-by: tabVersion --- Cargo.lock | 169 ++++++++++++++++-- src/connector/Cargo.toml | 2 +- src/connector/src/source/pulsar/mod.rs | 52 ++++++ .../src/source/pulsar/source/reader.rs | 36 +++- src/workspace-hack/Cargo.toml | 10 +- 5 files changed, 244 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf0700e5ae205..c646cacf0273b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1869,14 +1869,38 @@ dependencies = [ "syn", ] +[[package]] +name = "darling" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" +dependencies = [ + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + [[package]] name = "darling" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0808e1bd8671fb44a113a14e13497557533369847788fa2ae912b6ebfce9fa8" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.14.3", + "darling_macro 0.14.3", +] + +[[package]] +name = "darling_core" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", ] [[package]] @@ -1893,13 +1917,24 @@ dependencies = [ "syn", ] +[[package]] +name = "darling_macro" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" +dependencies = [ + "darling_core 0.13.4", + "quote", + "syn", +] + [[package]] name = "darling_macro" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b36230598a2d5de7ec1c6f51f72d8a99a9208daff41de2084d06e3fd3ea56685" dependencies = [ - "darling_core", + "darling_core 0.14.3", "quote", "syn", ] @@ -1932,6 +1967,12 @@ dependencies = [ "parking_lot_core 0.9.7", ] +[[package]] +name = "data-url" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d7439c3735f405729d52c3fbbe4de140eaf938a1fe47d227c27f8254d4302a5" + [[package]] name = "deadpool" version = "0.9.5" @@ -2563,8 +2604,10 @@ version = "0.2.7" source = "git+https://github.com/madsim-rs/getrandom.git?rev=cc95ee3#cc95ee36a2ae473edb01fcdcf34da3f2dcfc4b2f" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -3542,7 +3585,7 @@ dependencies = [ "http", "madsim", "serde", - "serde_with", + "serde_with 2.2.0", "spin 0.9.5", "thiserror", "tokio", @@ -3557,7 +3600,7 @@ version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" dependencies = [ - "darling", + "darling 0.14.3", "proc-macro2", "quote", "syn", @@ -4106,6 +4149,26 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" +[[package]] +name = "oauth2" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeaf26a72311c087f8c5ba617c96fac67a5c04f430e716ac8d8ab2de62e23368" +dependencies = [ + "base64 0.13.1", + "chrono", + "getrandom 0.2.7", + "http", + "rand 0.8.5", + "reqwest", + "serde", + "serde_json", + "serde_path_to_error", + "sha2", + "thiserror", + "url", +] + [[package]] name = "object" version = "0.30.3" @@ -4160,6 +4223,33 @@ dependencies = [ "uuid", ] +[[package]] +name = "openidconnect" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a0f47b0f1499d08c4a8480c963d49c5ec77f4249c2b6869780979415f45809" +dependencies = [ + "base64 0.13.1", + "chrono", + "http", + "itertools", + "log", + "num-bigint", + "oauth2", + "rand 0.8.5", + "ring", + "serde", + "serde-value", + "serde_derive", + "serde_json", + "serde_path_to_error", + "serde_plain", + "serde_with 1.14.0", + "subtle", + "thiserror", + "url", +] + [[package]] name = "openssl" version = "0.10.45" @@ -4285,6 +4375,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.4.3" @@ -5119,18 +5218,23 @@ dependencies = [ "bytes", "chrono", "crc 3.0.1", + "data-url", "futures", "futures-io", "futures-timer", "log", "native-tls", "nom", + "oauth2", + "openidconnect", "pem", "prost 0.11.8", "prost-build", "prost-derive 0.11.8", "rand 0.8.5", "regex", + "serde", + "serde_json", "tokio", "tokio-native-tls", "tokio-util", @@ -5465,6 +5569,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", + "webpki-roots", "winreg", ] @@ -5519,7 +5624,7 @@ dependencies = [ "regex", "serde", "serde_json", - "serde_with", + "serde_with 2.2.0", "serde_yaml", "tempfile", "workspace-hack", @@ -5909,7 +6014,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "serde_with", + "serde_with 2.2.0", "simd-json", "tempfile", "thiserror", @@ -6224,7 +6329,7 @@ dependencies = [ "risingwave_frontend", "risingwave_sqlparser", "serde", - "serde_with", + "serde_with 2.2.0", "serde_yaml", "tempfile", "walkdir", @@ -6805,6 +6910,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float 2.10.0", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.152" @@ -6836,6 +6951,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_plain" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6018081315db179d0ce57b1fe4b62a12a0028c9cf9bbef868c9cf477b3c34ae" +dependencies = [ + "serde", +] + [[package]] name = "serde_qs" version = "0.8.5" @@ -6868,6 +6992,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" +dependencies = [ + "serde", + "serde_with_macros 1.5.2", +] + [[package]] name = "serde_with" version = "2.2.0" @@ -6880,17 +7014,29 @@ dependencies = [ "indexmap", "serde", "serde_json", - "serde_with_macros", + "serde_with_macros 2.2.0", "time 0.3.17", ] +[[package]] +name = "serde_with_macros" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" +dependencies = [ + "darling 0.13.4", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_with_macros" version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1966009f3c05f095697c537312f5415d1e3ed31ce0a56942bac4c771c5c335e" dependencies = [ - "darling", + "darling 0.14.3", "proc-macro2", "quote", "syn", @@ -7463,7 +7609,7 @@ dependencies = [ "byteorder", "integer-encoding", "log", - "ordered-float", + "ordered-float 1.1.1", "threadpool", ] @@ -8662,6 +8808,7 @@ dependencies = [ "socket2", "stable_deref_trait", "strum", + "subtle", "syn", "time 0.3.17", "tokio", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index f61a634bdc3dc..3643f351d5559 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -47,7 +47,7 @@ prometheus = { version = "0.13", features = ["process"] } prost = { version = "0.11.0", features = ["no-recursion-limit"] } prost-reflect = "0.9.2" protobuf-native = "0.2.1" -pulsar = { version = "4.2", default-features = false, features = ["tokio-runtime", "telemetry"] } +pulsar = { version = "4.2", default-features = false, features = ["tokio-runtime", "telemetry", "auth-oauth2"] } rdkafka = { package = "madsim-rdkafka", version = "=0.2.14-alpha", features = ["cmake-build", "ssl-vendored", "gssapi"] } reqwest = { version = "0.11", features = ["json"] } risingwave_common = { path = "../common" } diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index 1698d0bc3010b..5f9f5aaf99769 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -17,12 +17,33 @@ pub mod source; pub mod split; pub mod topic; +use anyhow::{anyhow, Result}; pub use enumerator::*; +use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; +use pulsar::{Authentication, Pulsar, TokioExecutor}; use serde::Deserialize; pub use split::*; +use url::Url; pub const PULSAR_CONNECTOR: &str = "pulsar"; +#[derive(Clone, Debug, Deserialize)] +pub struct PulsarOauth { + #[serde(rename = "oauth.issuer.url")] + pub issuer_url: String, + + #[serde(rename = "oauth.credentials.url")] + pub credentials_url: String, + + #[serde(rename = "oauth.audience")] + pub audience: Option, + + #[serde(rename = "oauth.scope")] + pub scope: Option, + // #[serde(flatten)] + // pub s3_cridentials: Option<>, +} + #[derive(Clone, Debug, Deserialize)] pub struct PulsarProperties { #[serde(rename = "topic", alias = "pulsar.topic")] @@ -39,4 +60,35 @@ pub struct PulsarProperties { #[serde(rename = "auth.token")] pub auth_token: Option, + + #[serde(flatten)] + pub oauth: Option, +} + +impl PulsarProperties { + pub async fn build_pulsar_client(&mut self) -> Result> { + let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor); + if let Some(oauth) = self.oauth.take() { + let url = Url::parse(&oauth.credentials_url)?; + if url.scheme() == "s3" { + todo!("s3 oauth credentials not supported yet"); + } + + let auth_params = OAuth2Params { + issuer_url: oauth.issuer_url, + credentials_url: oauth.credentials_url, + audience: oauth.audience, + scope: oauth.scope, + }; + let oauth2 = OAuth2Authentication::client_credentials(auth_params); + pulsar_builder = pulsar_builder.with_auth_provider(oauth2); + } else if let Some(auth_token) = &self.auth_token { + pulsar_builder = pulsar_builder.with_auth(Authentication { + name: "token".to_string(), + data: Vec::from(auth_token.as_str()), + }); + } + + pulsar_builder.build().await.map_err(|e| anyhow!(e)) + } } diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index be28c8574b7a5..bc4b057303d90 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -120,7 +120,7 @@ impl SplitReader for PulsarSplitReader { let builder: ConsumerBuilder = pulsar .consumer() - .with_topic(topic) + .with_topic(&topic) .with_subscription_type(SubType::Exclusive) .with_subscription(format!( "consumer-{}", @@ -131,17 +131,35 @@ impl SplitReader for PulsarSplitReader { )); let builder = match split.start_offset.clone() { - PulsarEnumeratorOffset::Earliest => builder.with_options( - ConsumerOptions::default().with_initial_position(InitialPosition::Earliest), - ), + PulsarEnumeratorOffset::Earliest => { + if topic.starts_with("non-persistent://") { + tracing::warn!("Earliest offset is not supported for non-persistent topic, use Latest instead"); + builder.with_options( + ConsumerOptions::default().with_initial_position(InitialPosition::Latest), + ) + } else { + builder.with_options( + ConsumerOptions::default().with_initial_position(InitialPosition::Earliest), + ) + } + } PulsarEnumeratorOffset::Latest => builder.with_options( ConsumerOptions::default().with_initial_position(InitialPosition::Latest), ), - PulsarEnumeratorOffset::MessageId(m) => builder.with_options(pulsar::ConsumerOptions { - durable: Some(false), - start_message_id: parse_message_id(m.as_str()).ok(), - ..Default::default() - }), + PulsarEnumeratorOffset::MessageId(m) => { + if topic.starts_with("non-persistent://") { + tracing::warn!("MessageId offset is not supported for non-persistent topic, use Latest instead"); + builder.with_options( + ConsumerOptions::default().with_initial_position(InitialPosition::Latest), + ) + } else { + builder.with_options(pulsar::ConsumerOptions { + durable: Some(false), + start_message_id: parse_message_id(m.as_str()).ok(), + ..Default::default() + }) + } + } PulsarEnumeratorOffset::Timestamp(_) => builder, }; diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 14680804350d5..25af24b675efc 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -27,7 +27,7 @@ aws-smithy-client = { version = "0.51", default-features = false, features = ["n aws-types = { version = "0.51", default-features = false, features = ["hardcoded-credentials"] } base64 = { version = "0.21" } bytes = { version = "1", features = ["serde"] } -chrono = { version = "0.4" } +chrono = { version = "0.4", features = ["serde"] } clap = { version = "3", features = ["derive", "env"] } combine = { version = "4" } criterion = { version = "0.4", features = ["async_futures", "async_tokio"] } @@ -82,7 +82,7 @@ rand_chacha = { version = "0.3" } rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-syntax = { version = "0.6" } -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } @@ -90,6 +90,7 @@ smallvec = { version = "1", default-features = false, features = ["serde"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } stable_deref_trait = { version = "1" } strum = { version = "0.24", features = ["derive"] } +subtle = { version = "2" } time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "signal", "stats", "sync", "time", "tracing"] } tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710", features = ["net"] } @@ -120,7 +121,7 @@ aws-types = { version = "0.51", default-features = false, features = ["hardcoded base64 = { version = "0.21" } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } -chrono = { version = "0.4" } +chrono = { version = "0.4", features = ["serde"] } clap = { version = "3", features = ["derive", "env"] } combine = { version = "4" } criterion = { version = "0.4", features = ["async_futures", "async_tokio"] } @@ -176,7 +177,7 @@ rand_chacha = { version = "0.3" } rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-syntax = { version = "0.6" } -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } @@ -184,6 +185,7 @@ smallvec = { version = "1", default-features = false, features = ["serde"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } stable_deref_trait = { version = "1" } strum = { version = "0.24", features = ["derive"] } +subtle = { version = "2" } syn = { version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] } time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "signal", "stats", "sync", "time", "tracing"] } From e4b07e6ea8711712856d7f2d9b4ef748e3d3ac9b Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 28 Feb 2023 15:58:06 +0800 Subject: [PATCH 3/6] fix Signed-off-by: tabVersion --- src/connector/src/source/pulsar/enumerator/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector/src/source/pulsar/enumerator/client.rs b/src/connector/src/source/pulsar/enumerator/client.rs index 652b91003b48b..c2a0877c9a643 100644 --- a/src/connector/src/source/pulsar/enumerator/client.rs +++ b/src/connector/src/source/pulsar/enumerator/client.rs @@ -88,15 +88,15 @@ impl SplitEnumerator for PulsarSplitEnumerator { // MessageId is only used when recovering from a State assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_))); - let topic_metadata = self + let topic_partitions = self .client .lookup_partitioned_topic_number(&self.topic.to_string()) .await .map_err(|e| anyhow!(e))?; - let splits = if topic_metadata > 0 { + let splits = if topic_partitions > 0 { // partitioned topic - (0..topic_metadata as i32) + (0..topic_partitions as i32) .map(|p| PulsarSplit { topic: self.topic.sub_topic(p).unwrap(), start_offset: offset.clone(), From 105d0a3365e3c3a65f8793b76bcb080e82b9221d Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 1 Mar 2023 14:53:58 +0800 Subject: [PATCH 4/6] fix oauth Signed-off-by: tabVersion --- .../src/source/pulsar/enumerator/client.rs | 13 +----- src/connector/src/source/pulsar/mod.rs | 19 +++++---- .../src/source/pulsar/source/reader.rs | 41 +------------------ 3 files changed, 14 insertions(+), 59 deletions(-) diff --git a/src/connector/src/source/pulsar/enumerator/client.rs b/src/connector/src/source/pulsar/enumerator/client.rs index c2a0877c9a643..cdafd49b90168 100644 --- a/src/connector/src/source/pulsar/enumerator/client.rs +++ b/src/connector/src/source/pulsar/enumerator/client.rs @@ -15,7 +15,7 @@ use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use itertools::Itertools; -use pulsar::{Authentication, Pulsar, TokioExecutor}; +use pulsar::{Pulsar, TokioExecutor}; use serde::{Deserialize, Serialize}; use crate::source::pulsar::split::PulsarSplit; @@ -43,6 +43,7 @@ impl SplitEnumerator for PulsarSplitEnumerator { type Split = PulsarSplit; async fn new(properties: PulsarProperties) -> Result { + let pulsar = properties.build_pulsar_client().await?; let topic = properties.topic; let parsed_topic = parse_topic(&topic)?; @@ -66,16 +67,6 @@ impl SplitEnumerator for PulsarSplitEnumerator { scan_start_offset = PulsarEnumeratorOffset::Timestamp(time_offset) } - let mut pulsar_builder = Pulsar::builder(properties.service_url, TokioExecutor); - if let Some(auth_token) = properties.auth_token { - pulsar_builder = pulsar_builder.with_auth(Authentication { - name: "token".to_string(), - data: Vec::from(auth_token), - }); - } - - let pulsar = pulsar_builder.build().await.map_err(|e| anyhow!(e))?; - Ok(PulsarSplitEnumerator { client: pulsar, topic: parsed_topic, diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index 5f9f5aaf99769..0c055442a9785 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -36,7 +36,7 @@ pub struct PulsarOauth { pub credentials_url: String, #[serde(rename = "oauth.audience")] - pub audience: Option, + pub audience: String, #[serde(rename = "oauth.scope")] pub scope: Option, @@ -66,22 +66,23 @@ pub struct PulsarProperties { } impl PulsarProperties { - pub async fn build_pulsar_client(&mut self) -> Result> { + pub async fn build_pulsar_client(&self) -> Result> { let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor); - if let Some(oauth) = self.oauth.take() { + if let Some(oauth) = &self.oauth { let url = Url::parse(&oauth.credentials_url)?; if url.scheme() == "s3" { todo!("s3 oauth credentials not supported yet"); } let auth_params = OAuth2Params { - issuer_url: oauth.issuer_url, - credentials_url: oauth.credentials_url, - audience: oauth.audience, - scope: oauth.scope, + issuer_url: oauth.issuer_url.clone(), + credentials_url: oauth.credentials_url.clone(), + audience: Some(oauth.audience.clone()), + scope: oauth.scope.clone(), }; - let oauth2 = OAuth2Authentication::client_credentials(auth_params); - pulsar_builder = pulsar_builder.with_auth_provider(oauth2); + + pulsar_builder = pulsar_builder + .with_auth_provider(OAuth2Authentication::client_credentials(auth_params)); } else if let Some(auth_token) = &self.auth_token { pulsar_builder = pulsar_builder.with_auth(Authentication { name: "token".to_string(), diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index bc4b057303d90..22d89516f7d39 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -21,9 +21,7 @@ use futures_async_stream::try_stream; use itertools::Itertools; use pulsar::consumer::InitialPosition; use pulsar::message::proto::MessageIdData; -use pulsar::{ - Authentication, Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor, -}; +use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor}; use risingwave_common::try_match_expand; use crate::impl_common_split_reader_logic; @@ -102,22 +100,11 @@ impl SplitReader for PulsarSplitReader { ) -> Result { ensure!(splits.len() == 1, "only support single split"); let split = try_match_expand!(splits.into_iter().next().unwrap(), SplitImpl::Pulsar)?; - - let service_url = &props.service_url; + let pulsar = props.build_pulsar_client().await?; let topic = split.topic.to_string(); tracing::debug!("creating consumer for pulsar split topic {}", topic,); - let mut pulsar_builder = Pulsar::builder(service_url, TokioExecutor); - if let Some(auth_token) = props.auth_token { - pulsar_builder = pulsar_builder.with_auth(Authentication { - name: "token".to_string(), - data: Vec::from(auth_token), - }); - } - - let pulsar = pulsar_builder.build().await.map_err(|e| anyhow!(e))?; - let builder: ConsumerBuilder = pulsar .consumer() .with_topic(&topic) @@ -201,27 +188,3 @@ impl PulsarSplitReader { } } } - -#[tokio::test] -async fn test() { - let service_url = "pulsar+ssl://pulsar-gcp-useast1.streaming.datastax.com:6651"; - let topic = "non-persistent://meetup/default/demo2"; - let auth_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2NzgwOTUyMTIsImlhdCI6MTY3NzQ5MDQxMiwiaXNzIjoiZGF0YXN0YXgiLCJzdWIiOiJjbGllbnQ7Yjg5Nzg2YTctMGI3Mi00ZWIwLWIxZWItNzA4MTlhODQ2YWQzO2JXVmxkSFZ3OzJkZWI3ODc2MjAiLCJ0b2tlbmlkIjoiMmRlYjc4NzYyMCJ9.i4kokLAcowtNZAjlVCVu2ACwTNzZDkWEfnwvulCtrIFRKHMdZHo-M9xUS7wiD_tugZ5MxVdV1xDrAb3OX2Z0_9e-Qd_HdktNuBv0lCLIXvFNGe9JgtzL0fgXtsE_NlRB3q77ltIfnQsI16OUK6JMPmM7udvoqkq8XNVq5DmFsOO470ans4TRJEu9WhOko7sihWz2P1y9QpPIIaM__iVJ7N03_-SuX-Du-UPfOPc7nfBTxnldNwXmNXTdD8gXmuvZ5RvnK2bx_7Zf03LQjj5crcw5R4e70oA4y-BTAdH6rxCl0urkLJAZh0DW9Ewe3EQp1vVJDS2OtCkNnEyOpnqY7w"; - - let mut pulsar_builder = Pulsar::builder(service_url, TokioExecutor); - pulsar_builder = pulsar_builder.with_auth(Authentication { - name: "token".to_string(), - data: Vec::from(auth_token), - }); - - let pulsar = pulsar_builder - .build() - .await - .map_err(|e| anyhow!(e)) - .unwrap(); - - println!( - "{:?}", - pulsar.lookup_partitioned_topic_number(topic).await.unwrap() - ); -} From dff6b0dfcfe2e824ca8d92724d16a76a45950aba Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 2 Mar 2023 13:27:12 +0800 Subject: [PATCH 5/6] stash Signed-off-by: tabVersion --- src/workspace-hack/Cargo.toml | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 80c6706c66df4..8c2069ee05b21 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -27,7 +27,7 @@ aws-smithy-client = { version = "0.51", default-features = false, features = ["n aws-types = { version = "0.51", default-features = false, features = ["hardcoded-credentials"] } base64 = { version = "0.21" } bytes = { version = "1", features = ["serde"] } -chrono = { version = "0.4" } +chrono = { version = "0.4", features = ["serde"] } clap = { version = "4", features = ["derive", "env"] } combine = { version = "4" } criterion = { version = "0.4", features = ["async_futures", "async_tokio"] } @@ -82,7 +82,7 @@ rand_chacha = { version = "0.3" } rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-syntax = { version = "0.6" } -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } @@ -90,6 +90,7 @@ smallvec = { version = "1", default-features = false, features = ["serde"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } stable_deref_trait = { version = "1" } strum = { version = "0.24", features = ["derive"] } +subtle = { version = "2" } time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "signal", "stats", "sync", "time", "tracing"] } tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710", features = ["net"] } @@ -120,7 +121,7 @@ aws-types = { version = "0.51", default-features = false, features = ["hardcoded base64 = { version = "0.21" } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } -chrono = { version = "0.4" } +chrono = { version = "0.4", features = ["serde"] } clap = { version = "4", features = ["derive", "env"] } combine = { version = "4" } criterion = { version = "0.4", features = ["async_futures", "async_tokio"] } @@ -176,7 +177,7 @@ rand_chacha = { version = "0.3" } rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-syntax = { version = "0.6" } -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } @@ -184,6 +185,7 @@ smallvec = { version = "1", default-features = false, features = ["serde"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } stable_deref_trait = { version = "1" } strum = { version = "0.24", features = ["derive"] } +subtle = { version = "2" } syn = { version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] } time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "signal", "stats", "sync", "time", "tracing"] } From af21721f5f7bd307281ed86408c4a68d2c79dd86 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 3 Mar 2023 00:55:52 +0800 Subject: [PATCH 6/6] fix Signed-off-by: tabVersion --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index ef1d62f50070f..98e4a27390e25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2021,7 +2021,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" dependencies = [ - "darling", + "darling 0.14.3", "proc-macro2", "quote", "syn",