diff --git a/Cargo.lock b/Cargo.lock index fb63420db1f9..01792a8b108d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -198,7 +198,7 @@ dependencies = [ [[package]] name = "apache-avro" version = "0.17.0" -source = "git+https://github.com/icelake-io/avro.git?rev=4866a4ad0ed5d6af7160c9b52af898ab6d0551f9#4866a4ad0ed5d6af7160c9b52af898ab6d0551f9" +source = "git+https://github.com/icelake-io/avro.git?branch=icelake-dev#4b828e9283e7248fd3ca42f5b590c2160b201785" dependencies = [ "apache-avro-derive", "bigdecimal 0.4.2", @@ -221,7 +221,7 @@ dependencies = [ [[package]] name = "apache-avro-derive" version = "0.17.0" -source = "git+https://github.com/icelake-io/avro.git?rev=4866a4ad0ed5d6af7160c9b52af898ab6d0551f9#4866a4ad0ed5d6af7160c9b52af898ab6d0551f9" +source = "git+https://github.com/icelake-io/avro.git?branch=icelake-dev#4b828e9283e7248fd3ca42f5b590c2160b201785" dependencies = [ "darling 0.20.3", "proc-macro2", @@ -5051,7 +5051,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=b4f4ca3c6d29092bd331925ead0bcceaa38bdd57#b4f4ca3c6d29092bd331925ead0bcceaa38bdd57" +source = "git+https://github.com/icelake-io/icelake?rev=32c0bbf242f5c47b1e743f10577012fe7436c770#32c0bbf242f5c47b1e743f10577012fe7436c770" dependencies = [ "anyhow", "apache-avro 0.17.0", diff --git a/Cargo.toml b/Cargo.toml index d3fec07a9c3d..555e6a7d2a1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" } prost = { version = "0.12" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "b4f4ca3c6d29092bd331925ead0bcceaa38bdd57", features = [ +icelake = { git = "https://github.com/icelake-io/icelake", rev = "32c0bbf242f5c47b1e743f10577012fe7436c770", features = [ "prometheus", ] } arrow-array = "49" diff --git a/e2e_test/source/basic/pubsub.slt b/e2e_test/source/basic/pubsub.slt index e5d991140579..b245d9b2aea8 100644 --- a/e2e_test/source/basic/pubsub.slt +++ b/e2e_test/source/basic/pubsub.slt @@ -2,16 +2,14 @@ statement error CREATE TABLE s1 (v1 int, v2 varchar) WITH ( pubsub.subscription = 'test-subscription-1', - pubsub.emulator_host = 'localhost:5981', - pubsub.split_count = 3 + pubsub.emulator_host = 'invalid_host:5981' ) FORMAT PLAIN ENCODE JSON; statement ok CREATE TABLE s1 (v1 int, v2 varchar) WITH ( connector = 'google_pubsub', pubsub.subscription = 'test-subscription-1', - pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 3 + pubsub.emulator_host = 'localhost:5980' ) FORMAT PLAIN ENCODE JSON; statement ok @@ -25,25 +23,14 @@ statement error CREATE TABLE s2 (v1 int, v2 varchar) WITH ( connector = 'google_pubsub', pubsub.subscription = 'test-subscription-not-2', - pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 3 + pubsub.emulator_host = 'localhost:5980' ) FORMAT PLAIN ENCODE JSON; statement ok CREATE TABLE s2 (v1 int, v2 varchar) WITH ( connector = 'google_pubsub', pubsub.subscription = 'test-subscription-2', - pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 3 -) FORMAT PLAIN ENCODE JSON; - -# fail with invalid split count -statement error -CREATE TABLE s3 (v1 int, v2 varchar) WITH ( - connector = 'google_pubsub', - pubsub.subscription = 'test-subscription-3', - pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 0 + pubsub.emulator_host = 'localhost:5980' ) FORMAT PLAIN ENCODE JSON; # fail if both start_offset and start_snapshot are provided @@ -52,7 +39,6 @@ CREATE TABLE s3 (v1 int, v2 varchar) WITH ( connector = 'google_pubsub', pubsub.subscription = 'test-subscription-3', pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 2, pubsub.start_offset = "121212", pubsub.start_snapshot = "snapshot-that-doesnt-exist" ) FORMAT PLAIN ENCODE JSON; diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index e957bac8a641..01809a3c773b 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -37,13 +37,8 @@ impl SplitEnumerator for PubsubSplitEnumerator { properties: Self::Properties, _context: SourceEnumeratorContextRef, ) -> anyhow::Result { - let split_count = properties.split_count; let subscription = properties.subscription.to_owned(); - if split_count < 1 { - bail!("split_count must be >= 1") - } - if properties.credentials.is_none() && properties.emulator_host.is_none() { bail!("credentials must be set if not using the pubsub emulator") } @@ -87,7 +82,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { } (None, Some(snapshot)) => Some(SeekTo::Snapshot(snapshot)), (Some(_), Some(_)) => { - bail!("specify atmost one of start_offset or start_snapshot") + bail!("specify at most one of start_offset or start_snapshot") } }; @@ -99,7 +94,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { Ok(Self { subscription, - split_count, + split_count: 1, }) } diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs index 18fd11b1e3ba..aeec1accd820 100644 --- a/src/connector/src/source/google_pubsub/mod.rs +++ b/src/connector/src/source/google_pubsub/mod.rs @@ -21,7 +21,6 @@ pub mod source; pub mod split; pub use enumerator::*; -use serde_with::{serde_as, DisplayFromStr}; pub use source::*; pub use split::*; use with_options::WithOptions; @@ -30,13 +29,8 @@ use crate::source::SourceProperties; pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub"; -#[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct PubsubProperties { - #[serde_as(as = "DisplayFromStr")] - #[serde(rename = "pubsub.split_count")] - pub split_count: u32, - /// pubsub subscription to consume messages from /// The subscription should be configured with the `retain-on-ack` property to enable /// message recovery within risingwave. @@ -48,19 +42,19 @@ pub struct PubsubProperties { #[serde(rename = "pubsub.emulator_host")] pub emulator_host: Option, - /// credentials JSON object encoded with base64 + /// `credentials` is a JSON string containing the service account credentials. /// See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). /// The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles). #[serde(rename = "pubsub.credentials")] pub credentials: Option, - /// `start_offset` is a numeric timestamp, ideallly the publish timestamp of a message + /// `start_offset` is a numeric timestamp, ideally the publish timestamp of a message /// in the subscription. If present, the connector will attempt to seek the subscription /// to the timestamp and start consuming from there. Note that the seek operation is /// subject to limitations around the message retention policy of the subscription. See /// [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for /// more details. - #[serde(rename = "pubsub.start_offset")] + #[serde(rename = "pubsub.start_offset.nanos")] pub start_offset: Option, /// `start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek @@ -127,7 +121,6 @@ mod tests { let default_properties = PubsubProperties { credentials: None, emulator_host: None, - split_count: 1, start_offset: None, start_snapshot: None, subscription: String::from("test-subscription"), diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 187780cd2382..84a125813733 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -471,9 +471,6 @@ PosixFsProperties: default: Default::default PubsubProperties: fields: - - name: pubsub.split_count - field_type: u32 - required: true - name: pubsub.subscription field_type: String comments: pubsub subscription to consume messages from The subscription should be configured with the `retain-on-ack` property to enable message recovery within risingwave. @@ -484,11 +481,11 @@ PubsubProperties: required: false - name: pubsub.credentials field_type: String - comments: credentials JSON object encoded with base64 See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles). + comments: '`credentials` is a JSON string containing the service account credentials. See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).' required: false - - name: pubsub.start_offset + - name: pubsub.start_offset.nanos field_type: String - comments: '`start_offset` is a numeric timestamp, ideallly the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription. See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' + comments: '`start_offset` is a numeric timestamp, ideally the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription. See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' required: false - name: pubsub.start_snapshot field_type: String