diff --git a/e2e_test/source_inline/pubsub/prepare-data.rs b/e2e_test/source_inline/pubsub/prepare-data.rs
index 81bd1ba73e41..bc07f9de6ccf 100755
--- a/e2e_test/source_inline/pubsub/prepare-data.rs
+++ b/e2e_test/source_inline/pubsub/prepare-data.rs
@@ -16,7 +16,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
 ```
 
 use google_cloud_googleapis::pubsub::v1::PubsubMessage;
-use google_cloud_pubsub::client::Client;
+use google_cloud_pubsub::client::{Client, ClientConfig};
 use google_cloud_pubsub::subscription::SubscriptionConfig;
 
 const TOPIC: &str = "test-topic";
@@ -25,65 +25,57 @@ const SUBSCRIPTION_COUNT: usize = 50;
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    std::env::set_var("PUBSUB_EMULATOR_HOST", "127.0.0.1:5980");
+    let args: Vec<String> = std::env::args().collect();
+    let command = args[1].as_str();
 
-    let client = Client::new(Default::default()).await?;
+    let use_emulator = std::env::var("PUBSUB_EMULATOR_HOST").is_ok();
+    let use_cloud = std::env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_ok();
+    if !use_emulator && !use_cloud {
+        panic!("either PUBSUB_EMULATOR_HOST or GOOGLE_APPLICATION_CREDENTIALS_JSON must be set");
+    }
+
+    let config = ClientConfig::default().with_auth().await?;
+    let client = Client::new(config).await?;
 
-    // delete and create "test-topic"
     let topic = client.topic(TOPIC);
-    for subscription in topic.subscriptions(None).await? {
-        subscription.delete(None).await?;
-    }
-    let _ = topic.delete(None).await;
-    topic.create(Some(Default::default()), None).await?;
-    for i in 0..SUBSCRIPTION_COUNT {
-        let _ = client
-            .create_subscription(
-                format!("test-subscription-{}", i).as_str(),
-                TOPIC,
-                SubscriptionConfig {
-                    retain_acked_messages: true,
-                    ..Default::default()
-                },
-                None,
-            )
-            .await?;
-    }
+    if command == "create" {
+        // delete and create "test-topic"
+        for subscription in topic.subscriptions(None).await? {
+            subscription.delete(None).await?;
+        }
 
-    let publisher = topic.new_publisher(Default::default());
-    for line in DATA.lines() {
-        let a = publisher
-            .publish(PubsubMessage {
-                data: line.to_string().into_bytes(),
-                ..Default::default()
-            })
-            .await;
-        a.get().await?;
-        println!("published {}", line);
+        let _ = topic.delete(None).await;
+        topic.create(Some(Default::default()), None).await?;
+        for i in 0..SUBSCRIPTION_COUNT {
+            let _ = client
+                .create_subscription(
+                    format!("test-subscription-{}", i).as_str(),
+                    TOPIC,
+                    SubscriptionConfig {
+                        retain_acked_messages: false,
+                        ..Default::default()
+                    },
+                    None,
+                )
+                .await?;
+        }
+    } else if command == "publish" {
+        let publisher = topic.new_publisher(Default::default());
+        for i in 0..10 {
+            let data = format!("{{\"v1\":{i},\"v2\":\"name{i}\"}}");
+            let a = publisher
+                .publish(PubsubMessage {
+                    data: data.to_string().into_bytes(),
+                    ..Default::default()
+                })
+                .await;
+            a.get().await?;
+            println!("published {}", data);
+        }
+    } else {
+        panic!("unknown command {command}");
     }
 
     Ok(())
 }
-
-const DATA: &str = r#"{"v1":1,"v2":"name0"}
-{"v1":2,"v2":"name0"}
-{"v1":6,"v2":"name3"}
-{"v1":0,"v2":"name5"}
-{"v1":5,"v2":"name8"}
-{"v1":6,"v2":"name4"}
-{"v1":8,"v2":"name9"}
-{"v1":9,"v2":"name2"}
-{"v1":4,"v2":"name6"}
-{"v1":5,"v2":"name3"}
-{"v1":8,"v2":"name8"}
-{"v1":9,"v2":"name2"}
-{"v1":2,"v2":"name3"}
-{"v1":4,"v2":"name7"}
-{"v1":7,"v2":"name0"}
-{"v1":0,"v2":"name9"}
-{"v1":3,"v2":"name2"}
-{"v1":7,"v2":"name5"}
-{"v1":1,"v2":"name7"}
-{"v1":3,"v2":"name9"}
-"#;
diff --git a/e2e_test/source_inline/pubsub/pubsub.slt b/e2e_test/source_inline/pubsub/pubsub.slt
index cfb5f551cf36..35c94d07b962 100644
--- a/e2e_test/source_inline/pubsub/pubsub.slt
+++ b/e2e_test/source_inline/pubsub/pubsub.slt
@@ -1,80 +1,155 @@
 control substitution on
 
 system ok
-e2e_test/source_inline/pubsub/prepare-data.rs
+e2e_test/source_inline/pubsub/prepare-data.rs create
+
+statement error missing field `pubsub.subscription`
+CREATE SOURCE s (v1 int, v2 varchar) WITH (
+    connector = 'google_pubsub'
+) FORMAT PLAIN ENCODE JSON;
+
+statement error credentials must be set if not using the pubsub emulator
+CREATE SOURCE s (v1 int, v2 varchar) WITH (
+    connector = 'google_pubsub',
+    pubsub.subscription = 'test-subscription-1'
+) FORMAT PLAIN ENCODE JSON;
 
-# fail with invalid emulator_host
 statement error failed to lookup address information
-CREATE TABLE s1 (v1 int, v2 varchar) WITH (
+CREATE TABLE s (v1 int, v2 varchar) WITH (
     connector = 'google_pubsub',
     pubsub.subscription = 'test-subscription-1',
     pubsub.emulator_host = 'invalid_host:5981'
 ) FORMAT PLAIN ENCODE JSON;
 
+statement error subscription test-subscription-not-exist does not exist
+CREATE TABLE s (v1 int, v2 varchar) WITH (
+    ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
+    pubsub.subscription = 'test-subscription-not-exist',
+) FORMAT PLAIN ENCODE JSON;
+
+# fail if both start_offset and start_snapshot are provided
+statement error specify at most one of start_offset or start_snapshot
+CREATE TABLE s (v1 int, v2 varchar) WITH (
+    ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
+    pubsub.subscription = 'test-subscription-3',
+    pubsub.start_offset.nanos = '121212',
+    pubsub.start_snapshot = 'snapshot-that-doesnt-exist'
+) FORMAT PLAIN ENCODE JSON;
+
 statement ok
 CREATE TABLE s1 (v1 int, v2 varchar) WITH (
     ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
     pubsub.subscription = 'test-subscription-1',
+    pubsub.parallelism = 5
 ) FORMAT PLAIN ENCODE JSON;
+
+# We want to test that Pub/Sub messages are acked on checkpoint, so disable checkpointing here.
+# Note that DDL & flush will trigger a checkpoint.
 statement ok
-SELECT * FROM s1;
+ALTER SYSTEM SET checkpoint_frequency TO 114514;
+
+# We publish data after the tables are created, because DDL will trigger a checkpoint.
+system ok
+e2e_test/source_inline/pubsub/prepare-data.rs publish
+
+# The default ack deadline is 10s. Let the messages be redelivered once.
+sleep 15s
+
+# The default visibility mode is checkpoint.
+query IT rowsort
+select count(*) from s1;
+----
+0
 
 statement ok
-DROP TABLE s1;
+set visibility_mode = 'all';
+
+query IT rowsort
+select v1, v2, count(*) FROM s1 group by v1, v2;
+----
+0 name0 2
+1 name1 2
+2 name2 2
+3 name3 2
+4 name4 2
+5 name5 2
+6 name6 2
+7 name7 2
+8 name8 2
+9 name9 2
 
-statement error subscription test-subscription-not-exist does not exist
-CREATE TABLE s2 (v1 int, v2 varchar) WITH (
-    ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
-    pubsub.subscription = 'test-subscription-not-exist',
-) FORMAT PLAIN ENCODE JSON;
 
 statement ok
-CREATE TABLE s2 (v1 int, v2 varchar) WITH (
-    ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
-    pubsub.subscription = 'test-subscription-2',
-) FORMAT PLAIN ENCODE JSON;
+RECOVER;
 
-# fail if both start_offset and start_snapshot are provided
-statement error specify at most one of start_offset or start_snapshot
-CREATE TABLE s3 (v1 int, v2 varchar) WITH (
-    ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
-    pubsub.subscription = 'test-subscription-3',
-    pubsub.start_offset.nanos = '121212',
-    pubsub.start_snapshot = 'snapshot-that-doesnt-exist'
-) FORMAT PLAIN ENCODE JSON;
+sleep 1s
 
-# wait for source
+statement ok
+flush;
+
+# After recovery, uncheckpointed data in RisingWave is lost.
+query IT rowsort
+select count(*) from s1;
+----
+0
+
+# Wait for another redelivery
 sleep 10s
 
-# flush data into storage
+query IT rowsort
+select v1, v2, count(*) FROM s1 group by v1, v2;
+----
+0 name0 1
+1 name1 1
+2 name2 1
+3 name3 1
+4 name4 1
+5 name5 1
+6 name6 1
+7 name7 1
+8 name8 1
+9 name9 1
+
+# flush will force a checkpoint (and ack Pub/Sub messages on checkpoint)
 statement ok
 flush;
 
 query IT rowsort
-select v1, v2 FROM s2;
+select v1, v2, count(*) FROM s1 group by v1, v2;
 ----
-0 name5
-0 name9
-1 name0
-1 name7
-2 name0
-2 name3
-3 name2
-3 name9
-4 name6
-4 name7
-5 name3
-5 name8
-6 name3
-6 name4
-7 name0
-7 name5
-8 name8
-8 name9
-9 name2
-9 name2
+0 name0 1
+1 name1 1
+2 name2 1
+3 name3 1
+4 name4 1
+5 name5 1
+6 name6 1
+7 name7 1
+8 name8 1
+9 name9 1
+
+sleep 15s
+
+# no more redeliveries
+query IT rowsort
+select v1, v2, count(*) FROM s1 group by v1, v2;
+----
+0 name0 1
+1 name1 1
+2 name2 1
+3 name3 1
+4 name4 1
+5 name5 1
+6 name6 1
+7 name7 1
+8 name8 1
+9 name9 1
+
 statement ok
-DROP TABLE s2;
+DROP TABLE s1;
 
+# Restore to the value in src/config/ci-recovery.toml
+statement ok
+ALTER SYSTEM SET checkpoint_frequency TO 5;
diff --git a/risedev.yml b/risedev.yml
index 646ebb551bf6..8872e0a14de5 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -863,7 +863,7 @@ profile:
         port: 29092
 
   ci-inline-source-test:
-    config-path: src/config/ci.toml
+    config-path: src/config/ci-recovery.toml
     steps:
       - use: minio
       - use: etcd
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index 2baad2a62597..357d7aae84d3 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -176,7 +176,7 @@ impl SourceExecutor {
         ));
         let stream = self
             .source
-            .to_stream(Some(self.split_list), self.column_ids, source_ctx)
+            .build_stream(Some(self.split_list), self.column_ids, source_ctx)
             .await?;
 
         #[for_await]
diff --git a/src/config/ci-recovery.toml b/src/config/ci-recovery.toml
index 95e583462111..69de86279c1c 100644
--- a/src/config/ci-recovery.toml
+++ b/src/config/ci-recovery.toml
@@ -10,5 +10,6 @@ imm_merge_threshold = 2
 
 [system]
 barrier_interval_ms = 250
+# If this is changed, remember to also change e2e_test/source_inline/pubsub/pubsub.slt
 checkpoint_frequency = 5
 max_concurrent_creating_streaming_jobs = 0
diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs
index 25cb28909c47..3bb37845763e 100644
--- a/src/connector/src/source/google_pubsub/enumerator/client.rs
+++ b/src/connector/src/source/google_pubsub/enumerator/client.rs
@@ -15,8 +15,7 @@
 use anyhow::Context;
 use async_trait::async_trait;
 use chrono::{TimeZone, Utc};
-use google_cloud_pubsub::client::{Client, ClientConfig};
-use google_cloud_pubsub::subscription::{SeekTo, SubscriptionConfig};
+use google_cloud_pubsub::subscription::SeekTo;
 use risingwave_common::bail;
 
 use crate::error::ConnectorResult;
@@ -39,41 +38,22 @@ impl SplitEnumerator for PubsubSplitEnumerator {
         properties: Self::Properties,
         _context: SourceEnumeratorContextRef,
     ) -> ConnectorResult<Self> {
-        let subscription = properties.subscription.to_owned();
+        let split_count = properties.parallelism.unwrap_or(1);
+        if split_count < 1 {
+            bail!("parallelism must be >= 1");
+        };
 
         if properties.credentials.is_none() && properties.emulator_host.is_none() {
             bail!("credentials must be set if not using the pubsub emulator")
         }
 
-        properties.initialize_env();
-
-        // Validate config
-        let config = ClientConfig::default().with_auth().await?;
-        let client = Client::new(config)
-            .await
-            .context("error initializing pubsub client")?;
-
-        let sub = client.subscription(&subscription);
+        let sub = properties.subscription_client().await?;
         if !sub
             .exists(None)
             .await
             .context("error checking subscription validity")?
         {
-            bail!("subscription {} does not exist", &subscription)
-        }
-
-        // We need the `retain_acked_messages` configuration to be true to seek back to timestamps
-        // as done in the [`PubsubSplitReader`] and here.
-        let (_, subscription_config) = sub
-            .config(None)
-            .await
-            .context("failed to fetch subscription config")?;
-        if let SubscriptionConfig {
-            retain_acked_messages: false,
-            ..
-        } = subscription_config
-        {
-            bail!("subscription must be configured with retain_acked_messages set to true")
+            bail!("subscription {} does not exist", &sub.id())
         }
 
         let seek_to = match (properties.start_offset, properties.start_snapshot) {
@@ -98,8 +78,8 @@ impl SplitEnumerator for PubsubSplitEnumerator {
         }
 
         Ok(Self {
-            subscription,
-            split_count: 1,
+            subscription: properties.subscription.to_owned(),
+            split_count,
         })
     }
 
@@ -109,8 +89,8 @@ impl SplitEnumerator for PubsubSplitEnumerator {
             .map(|i| PubsubSplit {
                 index: i,
                 subscription: self.subscription.to_owned(),
-                start_offset: None,
-                stop_offset: None,
+                __deprecated_start_offset: None,
+                __deprecated_stop_offset: None,
             })
             .collect();
diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs
index 0a49fa6467f6..a0794bb1205d 100644
--- a/src/connector/src/source/google_pubsub/mod.rs
+++ b/src/connector/src/source/google_pubsub/mod.rs
@@ -14,26 +14,39 @@
 use std::collections::HashMap;
 
+use anyhow::Context;
+use google_cloud_pubsub::client::{Client, ClientConfig};
+use google_cloud_pubsub::subscription::Subscription;
 use serde::Deserialize;
 
 pub mod enumerator;
 pub mod source;
 pub mod split;
-
 pub use enumerator::*;
+use serde_with::{serde_as, DisplayFromStr};
 pub use source::*;
 pub use split::*;
 use with_options::WithOptions;
 
+use crate::error::ConnectorResult;
 use crate::source::SourceProperties;
 
 pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub";
 
+/// # Implementation Notes
+///
+/// The Pub/Sub source does not rely on persisted state (`SplitImpl`) to start from a position;
+/// it relies on Pub/Sub itself to load-balance messages between all Readers.
+/// We `ack` received messages after checkpoint (see `WaitCheckpointWorker`) to achieve at-least-once delivery.
+#[serde_as]
 #[derive(Clone, Debug, Deserialize, WithOptions)]
 pub struct PubsubProperties {
-    /// pubsub subscription to consume messages from
-    /// The subscription should be configured with the `retain-on-ack` property to enable
-    /// message recovery within risingwave.
+    /// Pub/Sub subscription to consume messages from.
+    ///
+    /// Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from
+    /// the same subscription. So one `subscription` (i.e., one `Source`) can only be used for
+    /// one MV (shared between the actors of its fragment).
+    /// Otherwise, different MVs on the same Source will each receive only part of the messages.
+    /// TODO: check and enforce this on Meta.
     #[serde(rename = "pubsub.subscription")]
     pub subscription: String,
 
@@ -62,12 +75,18 @@ pub struct PubsubProperties {
     /// in pub/sub because they guarantee retention of:
     /// - All unacknowledged messages at the time of their creation.
     /// - All messages created after their creation.
-    /// Besides retention guarantees, timestamps are also more precise than timestamp-based seeks.
+    /// Besides retention guarantees, snapshots are also more precise than timestamp-based seeks.
     /// See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for
     /// more details.
     #[serde(rename = "pubsub.start_snapshot")]
     pub start_snapshot: Option<String>,
 
+    /// `parallelism` is the number of parallel consumers to run for the subscription.
+    /// TODO: use system parallelism if not set
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    #[serde(rename = "pubsub.parallelism")]
+    pub parallelism: Option<u32>,
+
     #[serde(flatten)]
     pub unknown_fields: HashMap<String, String>,
 }
@@ -97,6 +116,18 @@ impl PubsubProperties {
             std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", credentials);
         }
     }
+
+    pub(crate) async fn subscription_client(&self) -> ConnectorResult<Subscription> {
+        self.initialize_env();
+
+        // Validate config
+        let config = ClientConfig::default().with_auth().await?;
+        let client = Client::new(config)
+            .await
+            .context("error initializing pubsub client")?;
+
+        Ok(client.subscription(&self.subscription))
+    }
 }
 
 #[cfg(test)]
@@ -123,7 +154,7 @@ mod tests {
             start_offset: None,
             start_snapshot: None,
             subscription: String::from("test-subscription"),
-
+            parallelism: None,
             unknown_fields: Default::default(),
         };
diff --git a/src/connector/src/source/google_pubsub/source/message.rs b/src/connector/src/source/google_pubsub/source/message.rs
index cf14757b3668..10194453bebb 100644
--- a/src/connector/src/source/google_pubsub/source/message.rs
+++ b/src/connector/src/source/google_pubsub/source/message.rs
@@ -31,6 +31,7 @@ impl From<TaggedReceivedMessage> for SourceMessage {
     fn from(tagged_message: TaggedReceivedMessage) -> Self {
         let TaggedReceivedMessage(split_id, message) = tagged_message;
 
+        let ack_id = message.ack_id().to_string();
         let timestamp = message
             .message
             .publish_time
@@ -50,8 +51,9 @@ impl From<TaggedReceivedMessage> for SourceMessage {
                     _ => Some(payload),
                 }
             },
-            offset: timestamp.timestamp_nanos_opt().unwrap().to_string(),
+            offset: ack_id,
             split_id,
+            // What's the usage of this?
             meta: SourceMeta::GooglePubsub(GooglePubsubMeta {
                 timestamp: Some(timestamp.timestamp_millis()),
             }),
diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs
index 0887cb06594f..c771fc8eb638 100644
--- a/src/connector/src/source/google_pubsub/source/reader.rs
+++ b/src/connector/src/source/google_pubsub/source/reader.rs
@@ -12,12 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use anyhow::{anyhow, Context};
 use async_trait::async_trait;
-use chrono::{NaiveDateTime, TimeZone, Utc};
 use futures_async_stream::try_stream;
-use google_cloud_pubsub::client::{Client, ClientConfig};
-use google_cloud_pubsub::subscription::{SeekTo, Subscription};
+use google_cloud_pubsub::subscription::Subscription;
 use risingwave_common::{bail, ensure};
 use tonic::Code;
 
@@ -34,7 +31,6 @@ const PUBSUB_MAX_FETCH_MESSAGES: usize = 1024;
 
 pub struct PubsubSplitReader {
     subscription: Subscription,
-    stop_offset: Option<NaiveDateTime>,
 
     split_id: SplitId,
     parser_config: ParserConfig,
@@ -67,41 +63,16 @@ impl CommonSplitReader for PubsubSplitReader {
                 continue;
             }
 
-            let latest_offset: NaiveDateTime = raw_chunk
-                .last()
-                .map(|m| m.message.publish_time.clone().unwrap_or_default())
-                .map(|t| {
-                    let mut t = t;
-                    t.normalize();
-                    NaiveDateTime::from_timestamp_opt(t.seconds, t.nanos as u32).unwrap_or_default()
-                })
-                .unwrap_or_default();
-
             let mut chunk: Vec<SourceMessage> = Vec::with_capacity(raw_chunk.len());
-            let mut ack_ids: Vec<String> = Vec::with_capacity(raw_chunk.len());
 
             for message in raw_chunk {
-                ack_ids.push(message.ack_id().into());
                 chunk.push(SourceMessage::from(TaggedReceivedMessage(
                     self.split_id.clone(),
                    message,
                 )));
             }
 
-            self.subscription
-                .ack(ack_ids)
-                .await
-                .map_err(|e| anyhow!(e))
-                .context("failed to ack pubsub messages")?;
-
             yield chunk;
-
-            // Stop if we've approached the stop_offset
-            if let Some(stop_offset) = self.stop_offset
-                && latest_offset >= stop_offset
-            {
-                return Ok(());
-            }
         }
     }
 }
@@ -124,42 +95,11 @@ impl SplitReader for PubsubSplitReader {
         );
         let split = splits.into_iter().next().unwrap();
 
-        // Set environment variables consumed by `google_cloud_pubsub`
-        properties.initialize_env();
-
-        let config = ClientConfig::default().with_auth().await?;
-        let client = Client::new(config).await.map_err(|e| anyhow!(e))?;
-        let subscription = client.subscription(&properties.subscription);
-
-        if let Some(ref offset) = split.start_offset {
-            let timestamp = offset
-                .as_str()
-                .parse::<i64>()
-                .map(|nanos| Utc.timestamp_nanos(nanos))
-                .context("error parsing offset")?;
-
-            subscription
-                .seek(SeekTo::Timestamp(timestamp.into()), None)
-                .await
-                .context("error seeking to pubsub offset")?;
-        }
-
-        let stop_offset = if let Some(ref offset) = split.stop_offset {
-            Some(
-                offset
-                    .as_str()
-                    .parse::<i64>()
-                    .map_err(|e| anyhow!(e))
-                    .map(|nanos| NaiveDateTime::from_timestamp_opt(nanos, 0).unwrap_or_default())?,
-            )
-        } else {
-            None
-        };
+        let subscription = properties.subscription_client().await?;
 
         Ok(Self {
             subscription,
             split_id: split.id(),
-            stop_offset,
             parser_config,
             source_ctx,
         })
diff --git a/src/connector/src/source/google_pubsub/split.rs b/src/connector/src/source/google_pubsub/split.rs
index 14c40150488a..85d32f479bc2 100644
--- a/src/connector/src/source/google_pubsub/split.rs
+++ b/src/connector/src/source/google_pubsub/split.rs
@@ -20,19 +20,18 @@ use crate::source::{SplitId, SplitMetaData};
 
 #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
 pub struct PubsubSplit {
+    // XXX: `index` and `subscription` also seem unused; they only feed `SplitMetaData::id`.
+    // Is the split id useful?
     pub(crate) index: u32,
     pub(crate) subscription: String,
 
-    /// `start_offset` is a numeric timestamp.
-    /// When not `None`, the `PubsubReader` seeks to the timestamp described by the `start_offset`.
-    /// These offsets are taken from the `offset` property of the `SourceMessage` yielded by the
-    /// pubsub reader.
-    pub(crate) start_offset: Option<String>,
+    #[serde(rename = "start_offset")]
+    #[serde(skip_serializing)]
+    pub(crate) __deprecated_start_offset: Option<String>,
 
-    /// `stop_offset` is a numeric timestamp.
-    /// When not `None`, the `PubsubReader` stops reading messages when the `offset` property of
-    /// the `SourceMessage` is greater than or equal to the `stop_offset`.
-    pub(crate) stop_offset: Option<String>,
+    #[serde(rename = "stop_offset")]
+    #[serde(skip_serializing)]
+    pub(crate) __deprecated_stop_offset: Option<String>,
 }
 
 impl SplitMetaData for PubsubSplit {
@@ -48,8 +47,11 @@ impl SplitMetaData for PubsubSplit {
         format!("{}-{}", self.subscription, self.index).into()
     }
 
-    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
-        self.start_offset = Some(last_seen_offset);
+    /// No-op. `PubsubSplit` doesn't maintain any state; it's fully managed by Pub/Sub.
+    /// One subscription is like one Kafka consumer group.
+    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
+        // forcefully set the previously persisted start_offset to None
+        self.__deprecated_start_offset = None;
         Ok(())
     }
 }
diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs
index f965d373d930..ed8842e70825 100644
--- a/src/connector/src/source/mod.rs
+++ b/src/connector/src/source/mod.rs
@@ -25,8 +25,10 @@ pub mod mqtt;
 pub mod nats;
 pub mod nexmark;
 pub mod pulsar;
+
 pub use base::{UPSTREAM_SOURCE_KEY, *};
 pub(crate) use common::*;
+use google_cloud_pubsub::subscription::Subscription;
 pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
 pub use kafka::KAFKA_CONNECTOR;
 pub use kinesis::KINESIS_CONNECTOR;
@@ -39,6 +41,8 @@ pub mod reader;
 pub mod test_source;
 
 pub use manager::{SourceColumnDesc, SourceColumnType};
+use risingwave_common::array::{Array, ArrayRef};
+use thiserror_ext::AsReport;
 
 pub use crate::source::filesystem::opendal_source::{
     GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
@@ -68,3 +72,57 @@ pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool
     PREFIXES.iter().any(|prefix| key.starts_with(prefix))
         || (key == "endpoint" && !connector.eq_ignore_ascii_case(KINESIS_CONNECTOR))
 }
+
+/// Tasks executed by `WaitCheckpointWorker`
+pub enum WaitCheckpointTask {
+    CommitCdcOffset(Option<(SplitId, String)>),
+    AckPubsubMessage(Subscription, Vec<ArrayRef>),
+}
+
+impl WaitCheckpointTask {
+    pub async fn run(self) {
+        use std::str::FromStr;
+        match self {
+            WaitCheckpointTask::CommitCdcOffset(updated_offset) => {
+                if let Some((split_id, offset)) = updated_offset {
+                    let source_id: u64 = u64::from_str(split_id.as_ref()).unwrap();
+                    // notify cdc connector to commit offset
+                    match cdc::jni_source::commit_cdc_offset(source_id, offset.clone()) {
+                        Ok(()) => {}
+                        Err(e) => {
+                            tracing::error!(
+                                error = %e.as_report(),
+                                "source#{source_id}: failed to commit cdc offset: {offset}.",
+                            )
+                        }
+                    }
+                }
+            }
+            WaitCheckpointTask::AckPubsubMessage(subscription, ack_id_arrs) => {
+                async fn ack(subscription: &Subscription, ack_ids: Vec<String>) {
+                    tracing::trace!("acking pubsub messages {:?}", ack_ids);
+                    match subscription.ack(ack_ids).await {
+                        Ok(()) => {}
+                        Err(e) => {
+                            tracing::error!(
+                                error = %e.as_report(),
+                                "failed to ack pubsub messages",
+                            )
+                        }
+                    }
+                }
+                const MAX_ACK_BATCH_SIZE: usize = 1000;
+                let mut ack_ids: Vec<String> = vec![];
+                for arr in ack_id_arrs {
+                    for ack_id in arr.as_utf8().iter().flatten() {
+                        ack_ids.push(ack_id.to_string());
+                        if ack_ids.len() >= MAX_ACK_BATCH_SIZE {
+                            ack(&subscription, std::mem::take(&mut ack_ids)).await;
+                        }
+                    }
+                }
+                ack(&subscription, ack_ids).await;
+            }
+        }
+    }
+}
diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs
index 9a1f94de0fb8..eb0ba30d1243 100644
--- a/src/connector/src/source/reader/reader.rs
+++ b/src/connector/src/source/reader/reader.rs
@@ -36,7 +36,7 @@ use crate::source::filesystem::opendal_source::{
 use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
 use crate::source::{
     create_split_reader, BoxChunkSourceStream, BoxTryStream, Column, ConnectorProperties,
-    ConnectorState, SourceColumnDesc, SourceContext, SplitReader,
+    ConnectorState, SourceColumnDesc, SourceContext, SplitReader, WaitCheckpointTask,
 };
 
 #[derive(Clone, Debug)]
@@ -105,13 +105,22 @@ impl SourceReader {
         }
     }
 
-    /// Postgres and Oracle connectors need to commit the offset to upstream.
-    /// And we will spawn a separate tokio task to wait for epoch commit and commit the source offset.
-    pub fn need_commit_offset_to_upstream(&self) -> bool {
-        matches!(&self.config, ConnectorProperties::PostgresCdc(_))
+    /// Refer to `WaitCheckpointWorker` for more details.
+    pub async fn create_wait_checkpoint_task(&self) -> ConnectorResult<Option<WaitCheckpointTask>> {
+        Ok(match &self.config {
+            ConnectorProperties::PostgresCdc(_prop) => {
+                Some(WaitCheckpointTask::CommitCdcOffset(None))
+            }
+            ConnectorProperties::GooglePubsub(prop) => Some(WaitCheckpointTask::AckPubsubMessage(
+                prop.subscription_client().await?,
+                vec![],
+            )),
+            _ => None,
+        })
     }
 
-    pub async fn to_stream(
+    /// Build `SplitReader`s and then a `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s).
+    pub async fn build_stream(
         &self,
         state: ConnectorState,
         column_ids: Vec<ColumnId>,
diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml
index 68ed02624199..188ceb0b5b69 100644
--- a/src/connector/with_options_source.yaml
+++ b/src/connector/with_options_source.yaml
@@ -629,7 +629,7 @@ PubsubProperties:
   fields:
   - name: pubsub.subscription
     field_type: String
-    comments: pubsub subscription to consume messages from The subscription should be configured with the `retain-on-ack` property to enable message recovery within risingwave.
+    comments: 'Pub/Sub subscription to consume messages from. Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from the same subscription. So one `subscription` (i.e., one `Source`) can only be used for one MV (shared between the actors of its fragment). Otherwise, different MVs on the same Source will each receive only part of the messages. TODO: check and enforce this on Meta.'
     required: true
   - name: pubsub.emulator_host
     field_type: String
@@ -645,7 +645,11 @@ PubsubProperties:
     required: false
   - name: pubsub.start_snapshot
     field_type: String
-    comments: '`start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek to the snapshot before starting consumption. Snapshots are the preferred seeking mechanism in pub/sub because they guarantee retention of: - All unacknowledged messages at the time of their creation. - All messages created after their creation. Besides retention guarantees, timestamps are also more precise than timestamp-based seeks. See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.'
+    comments: '`start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek to the snapshot before starting consumption. Snapshots are the preferred seeking mechanism in pub/sub because they guarantee retention of: - All unacknowledged messages at the time of their creation. - All messages created after their creation. Besides retention guarantees, snapshots are also more precise than timestamp-based seeks. See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.'
     required: false
+  - name: pubsub.parallelism
+    field_type: u32
+    comments: '`parallelism` is the number of parallel consumers to run for the subscription. TODO: use system parallelism if not set'
+    required: false
 PulsarProperties:
   fields:
diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs
index b33a65f9986a..5b8415113d1b 100644
--- a/src/risedevtool/src/risedev_env.rs
+++ b/src/risedevtool/src/risedev_env.rs
@@ -94,6 +94,7 @@ pub fn generate_risedev_env(services: &Vec<ServiceConfig>) -> String {
             ServiceConfig::Pubsub(c) => {
                 let address = &c.address;
                 let port = &c.port;
+                writeln!(env, r#"PUBSUB_EMULATOR_HOST="{address}:{port}""#,).unwrap();
                 writeln!(env, r#"RISEDEV_PUBSUB_WITH_OPTIONS_COMMON="connector='google_pubsub',pubsub.emulator_host='{address}:{port}'""#,).unwrap();
             }
             ServiceConfig::Postgres(c) => {
diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs
index e4fe36819769..b4c006469e65 100644
--- a/src/stream/src/executor/source/fetch_executor.rs
+++ b/src/stream/src/executor/source/fetch_executor.rs
@@ -154,7 +154,7 @@ impl FsFetchExecutor {
     ) -> StreamExecutorResult<BoxChunkSourceStream> {
         let stream = source_desc
             .source
-            .to_stream(batch, column_ids, Arc::new(source_ctx))
+            .build_stream(batch, column_ids, Arc::new(source_ctx))
             .await
             .map_err(StreamExecutorError::connector_error)?;
         Ok(apply_rate_limit(stream, rate_limit_rps).boxed())
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index 0bc6e19f45fb..29ef22b60fcb 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -218,7 +218,7 @@ impl SourceBackfillExecutorInner {
         );
         let stream = source_desc
             .source
-            .to_stream(Some(splits), column_ids, Arc::new(source_ctx))
+            .build_stream(Some(splits), column_ids, Arc::new(source_ctx))
             .await
             .map_err(StreamExecutorError::connector_error)?;
         Ok(apply_rate_limit(stream, self.rate_limit_rps).boxed())
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index a53f48827454..4bfd1f74a33f 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -13,22 +13,22 @@
 // limitations under the License.
 
 use std::collections::HashMap;
-use std::str::FromStr;
 use std::time::Duration;
 
 use anyhow::anyhow;
 use either::Either;
 use futures::TryStreamExt;
 use itertools::Itertools;
+use risingwave_common::array::ArrayRef;
 use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
 use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
 use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_common::util::epoch::{Epoch, EpochPair};
-use risingwave_connector::source::cdc::jni_source;
 use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
+use risingwave_connector::source::reader::reader::SourceReader;
 use risingwave_connector::source::{
     BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
-    SplitMetaData,
+    SplitMetaData, WaitCheckpointTask,
 };
 use risingwave_hummock_sdk::HummockReadEpoch;
 use thiserror_ext::AsReport;
@@ -92,16 +92,24 @@ impl<S: StateStore> SourceExecutor<S> {
         }
     }
 
-    pub fn spawn_wait_epoch_worker(
+    async fn spawn_wait_checkpoint_worker(
         core: &StreamSourceCore<S>,
-    ) -> UnboundedSender<(Epoch, HashMap<SplitId, String>)> {
-        let (wait_epoch_tx, wait_epoch_rx) = mpsc::unbounded_channel();
-        let wait_epoch_worker = WaitEpochWorker {
-            wait_epoch_rx,
+        source_reader: SourceReader,
+    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
+        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
+            return Ok(None);
+        };
+        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
+        let wait_checkpoint_worker = WaitCheckpointWorker {
+            wait_checkpoint_rx,
             state_store: core.split_state_store.state_table.state_store().clone(),
         };
-        tokio::spawn(wait_epoch_worker.run());
-        wait_epoch_tx
+        tokio::spawn(wait_checkpoint_worker.run());
+        Ok(Some(WaitCheckpointTaskBuilder {
+            wait_checkpoint_tx,
+            source_reader,
+            building_task: initial_task,
+        }))
     }
 
     pub async fn build_stream_source_reader(
@@ -132,7 +140,7 @@ impl<S: StateStore> SourceExecutor<S> {
         );
         let stream = source_desc
             .source
-            .to_stream(state, column_ids, Arc::new(source_ctx))
+            .build_stream(state, column_ids, Arc::new(source_ctx))
             .await
             .map_err(StreamExecutorError::connector_error);
 
@@ -387,11 +395,8 @@ impl<S: StateStore> SourceExecutor<S> {
             .build()
             .map_err(StreamExecutorError::connector_error)?;
 
-        let wait_epoch_tx = if source_desc.source.need_commit_offset_to_upstream() {
-            Some(Self::spawn_wait_epoch_worker(&core))
-        } else {
-            None
-        };
+        let mut wait_checkpoint_task_builder =
+            Self::spawn_wait_checkpoint_worker(&core, source_desc.source.clone()).await?;
 
         let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
         else {
@@ -566,19 +571,12 @@ impl<S: StateStore> SourceExecutor<S> {
 
                                 // when handle a checkpoint barrier, spawn a task to wait for epoch commit notification
                                 if barrier.kind.is_checkpoint()
-                                    && !updated_splits.is_empty()
-                                    && let Some(ref tx) = wait_epoch_tx
+                                    && let Some(task_builder) = &mut wait_checkpoint_task_builder
                                 {
-                                    let mut updated_offsets = HashMap::new();
-                                    for (split_id, split_impl) in updated_splits {
-                                        if split_impl.is_cdc_split() {
-                                            updated_offsets.insert(split_id, split_impl.get_cdc_split_offset());
-                                        }
-                                    }
+                                    task_builder.update_task_on_checkpoint(updated_splits);
 
                                     tracing::debug!("epoch to wait {:?}", epoch);
-                                    tx.send((Epoch(epoch.prev), updated_offsets))
-                                        .expect("wait_epoch_tx send success");
+                                    task_builder.send(Epoch(epoch.prev)).await?
                                 }
 
                                 yield Message::Barrier(barrier);
@@ -590,6 +588,10 @@ impl<S: StateStore> SourceExecutor<S> {
                     }
 
                     Either::Right(chunk) => {
+                        if let Some(task_builder) = &mut wait_checkpoint_task_builder {
+                            let offset_col = chunk.column_at(offset_idx);
+                            task_builder.update_task_on_chunk(offset_col.clone());
+                        }
                         // TODO: confirm when split_offset_mapping is None
                         let split_offset_mapping =
                             get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx);
@@ -710,18 +712,94 @@ impl<S: StateStore> Debug for SourceExecutor<S> {
     }
 }
 
-struct WaitEpochWorker<S: StateStore> {
-    wait_epoch_rx: UnboundedReceiver<(Epoch, HashMap<SplitId, String>)>,
+struct WaitCheckpointTaskBuilder {
+    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
+    source_reader: SourceReader,
+    building_task: WaitCheckpointTask,
+}
+
+impl WaitCheckpointTaskBuilder {
+    fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
+        #[expect(clippy::single_match)]
+        match &mut self.building_task {
+            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
+                arrays.push(offset_col);
+            }
+            _ => {}
+        }
+    }
+
+    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
+        #[expect(clippy::single_match)]
+        match &mut self.building_task {
+            WaitCheckpointTask::CommitCdcOffset(offsets) => {
+                if !updated_splits.is_empty() {
+                    // cdc source only has one split
+                    assert_eq!(1, updated_splits.len());
+                    for (split_id, split_impl) in updated_splits {
+                        if split_impl.is_cdc_split() {
+                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
+                        } else {
+                            unreachable!()
+                        }
+                    }
+                }
+            }
+            _ => {}
+        }
+    }
+
+    /// Send the building task, and reset it to a new one.
+    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
+        let new_task = self
+            .source_reader
+            .create_wait_checkpoint_task()
+            .await?
+            .expect("wait checkpoint task should be created");
+        self.wait_checkpoint_tx
+            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
+            .expect("wait_checkpoint_tx send should succeed");
+        Ok(())
+    }
+}
+
+/// A worker used to do some work after each checkpoint epoch is committed.
+///
+/// # Use Cases
+///
+/// Typically there are 2 issues related to acking on checkpoint:
+///
+/// 1. Correctness (at-least-once), i.e., don't let the upstream clean up uncommitted data.
+///    For message-queueing semantics (delete after ack), we should ack to avoid endless
+///    redelivery, but only ack after checkpoint to avoid data loss.
+///
+/// 2. Allowing the upstream to clean data after commit.
+///
+/// See also
+///
+/// ## CDC
+///
+/// Commit the last consumed offset to the upstream DB, so that old data can be discarded.
+///
+/// ## Google Pub/Sub
+///
+/// Acking is required due to its queueing semantics.
+/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality,
+/// they are quite limited compared to Kafka.
+///
+/// See also
+struct WaitCheckpointWorker<S: StateStore> {
+    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
     state_store: S,
 }
 
-impl<S: StateStore> WaitEpochWorker<S> {
+impl<S: StateStore> WaitCheckpointWorker<S> {
     pub async fn run(mut self) {
         tracing::debug!("wait epoch worker start success");
         loop {
             // poll the rx and wait for the epoch commit
-            match self.wait_epoch_rx.recv().await {
-                Some((epoch, updated_offsets)) => {
+            match self.wait_checkpoint_rx.recv().await {
+                Some((epoch, task)) => {
                     tracing::debug!("start to wait epoch {}", epoch.0);
                     let ret = self
                         .state_store
@@ -731,20 +809,7 @@ impl<S: StateStore> WaitEpochWorker<S> {
                     match ret {
                         Ok(()) => {
                             tracing::debug!(epoch = epoch.0, "wait epoch success");
-                            // cdc source only has one split
-                            assert_eq!(1, updated_offsets.len());
-                            let (split_id, offset) = updated_offsets.into_iter().next().unwrap();
-                            let source_id: u64 = u64::from_str(split_id.as_ref()).unwrap();
-                            // notify cdc connector to commit offset
-                            match jni_source::commit_cdc_offset(source_id, offset.clone()) {
-                                Ok(_) => {}
-                                Err(e) => {
-                                    tracing::error!(
-                                        error = %e.as_report(),
-                                        "source#{source_id}: failed to commit cdc offset: {offset}.",
-                                    )
-                                }
-                            }
+                            task.run().await;
                         }
                         Err(e) => {
                             tracing::error!(
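For reviewers who want to try this locally, here is a minimal DDL sketch (not part of the patch) exercising the new `pubsub.parallelism` option. The emulator host is illustrative: the slt test injects it via `${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON}`, and `127.0.0.1:5980` is the address the old `prepare-data.rs` hard-coded. The subscription is assumed to have been created by `prepare-data.rs create`.

```sql
-- Five readers pull from the same subscription; Pub/Sub load-balances messages
-- between them, and RisingWave acks them only after a checkpoint commits,
-- giving at-least-once delivery.
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
    connector = 'google_pubsub',
    pubsub.emulator_host = '127.0.0.1:5980', -- illustrative; risedev normally provides this
    pubsub.subscription = 'test-subscription-1',
    pubsub.parallelism = 5
) FORMAT PLAIN ENCODE JSON;
```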