From dbda5f15e7a5f0d9a62cd52b4cb2f6e1b108f702 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 13 May 2024 15:26:31 +0800 Subject: [PATCH 01/13] remove useless stop_offset for pubsub source Signed-off-by: xxchan --- .../source/google_pubsub/enumerator/client.rs | 2 +- .../src/source/google_pubsub/source/reader.rs | 31 ------------------- .../src/source/google_pubsub/split.rs | 7 ++--- 3 files changed, 4 insertions(+), 36 deletions(-) diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index 25cb28909c479..d0955cb11fb5f 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -110,7 +110,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { index: i, subscription: self.subscription.to_owned(), start_offset: None, - stop_offset: None, + __deprecated_stop_offset: None, }) .collect(); diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs index 0887cb06594f9..0f451d65ddf3d 100644 --- a/src/connector/src/source/google_pubsub/source/reader.rs +++ b/src/connector/src/source/google_pubsub/source/reader.rs @@ -34,7 +34,6 @@ const PUBSUB_MAX_FETCH_MESSAGES: usize = 1024; pub struct PubsubSplitReader { subscription: Subscription, - stop_offset: Option, split_id: SplitId, parser_config: ParserConfig, @@ -67,16 +66,6 @@ impl CommonSplitReader for PubsubSplitReader { continue; } - let latest_offset: NaiveDateTime = raw_chunk - .last() - .map(|m| m.message.publish_time.clone().unwrap_or_default()) - .map(|t| { - let mut t = t; - t.normalize(); - NaiveDateTime::from_timestamp_opt(t.seconds, t.nanos as u32).unwrap_or_default() - }) - .unwrap_or_default(); - let mut chunk: Vec = Vec::with_capacity(raw_chunk.len()); let mut ack_ids: Vec = Vec::with_capacity(raw_chunk.len()); @@ -95,13 +84,6 @@ impl CommonSplitReader for PubsubSplitReader { .context("failed to ack pubsub messages")?; yield chunk; - - // Stop if we've approached the stop_offset - if let Some(stop_offset) = self.stop_offset - && latest_offset >= stop_offset - { - return Ok(()); - } } } } @@ -144,22 +126,9 @@ impl SplitReader for PubsubSplitReader { .context("error seeking to pubsub offset")?; } - let stop_offset = if let Some(ref offset) = split.stop_offset { - Some( - offset - .as_str() - .parse::() - .map_err(|e| anyhow!(e)) - .map(|nanos| NaiveDateTime::from_timestamp_opt(nanos, 0).unwrap_or_default())?, - ) - } else { - None - }; - Ok(Self { subscription, split_id: split.id(), - stop_offset, parser_config, source_ctx, }) diff --git a/src/connector/src/source/google_pubsub/split.rs b/src/connector/src/source/google_pubsub/split.rs index 14c40150488a1..9c7b77dd7e2f3 100644 --- a/src/connector/src/source/google_pubsub/split.rs +++ b/src/connector/src/source/google_pubsub/split.rs @@ -29,10 +29,9 @@ pub struct PubsubSplit { /// pubsub reader. pub(crate) start_offset: Option, - /// `stop_offset` is a numeric timestamp. - /// When not `None`, the `PubsubReader` stops reading messages when the `offset` property of - /// the `SourceMessage` is greater than or equal to the `stop_offset`. - pub(crate) stop_offset: Option, + #[serde(rename = "stop_offset")] + #[serde(skip_serializing)] + pub(crate) __deprecated_stop_offset: Option, } impl SplitMetaData for PubsubSplit { From 1c0a4fe2d40beb2f9c4ab8e2891f48bd72eb2f28 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 14 May 2024 01:44:18 +0800 Subject: [PATCH 02/13] pubsub source: ack on checkpoint - (message) change the offset column from ts to ack_id - (split) don't update start_offset - (reader) don't ack on read - refactor WaitCheckpointWorker to support update_task_on_chunk Signed-off-by: xxchan --- src/batch/src/executor/source.rs | 2 +- .../source/google_pubsub/enumerator/client.rs | 17 +-- src/connector/src/source/google_pubsub/mod.rs | 18 ++- .../source/google_pubsub/source/message.rs | 4 +- .../src/source/google_pubsub/source/reader.rs | 20 +-- .../src/source/google_pubsub/split.rs | 9 +- src/connector/src/source/mod.rs | 51 +++++++ src/connector/src/source/reader/reader.rs | 21 ++- .../src/executor/source/fetch_executor.rs | 2 +- .../source/source_backfill_executor.rs | 2 +- .../src/executor/source/source_executor.rs | 138 ++++++++++++------ 11 files changed, 194 insertions(+), 90 deletions(-) diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 2baad2a625974..357d7aae84d3f 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -176,7 +176,7 @@ impl SourceExecutor { )); let stream = self .source - .to_stream(Some(self.split_list), self.column_ids, source_ctx) + .build_stream(Some(self.split_list), self.column_ids, source_ctx) .await?; #[for_await] diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index d0955cb11fb5f..641f201cbbf2b 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -15,7 +15,6 @@ use anyhow::Context; use async_trait::async_trait; use chrono::{TimeZone, Utc}; -use google_cloud_pubsub::client::{Client, ClientConfig}; use google_cloud_pubsub::subscription::{SeekTo, SubscriptionConfig}; use risingwave_common::bail; @@ -39,27 +38,17 @@ impl SplitEnumerator for PubsubSplitEnumerator { properties: Self::Properties, _context: SourceEnumeratorContextRef, ) -> ConnectorResult { - let subscription = properties.subscription.to_owned(); - if properties.credentials.is_none() && properties.emulator_host.is_none() { bail!("credentials must be set if not using the pubsub emulator") } - properties.initialize_env(); - - // Validate config - let config = ClientConfig::default().with_auth().await?; - let client = Client::new(config) - .await - .context("error initializing pubsub client")?; - - let sub = client.subscription(&subscription); + let sub = properties.subscription_client().await?; if !sub .exists(None) .await .context("error checking subscription validity")? { - bail!("subscription {} does not exist", &subscription) + bail!("subscription {} does not exist", &sub.id()) } // We need the `retain_acked_messages` configuration to be true to seek back to timestamps @@ -98,7 +87,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { } Ok(Self { - subscription, + subscription: properties.subscription.to_owned(), split_count: 1, }) } diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs index 0a49fa6467f66..335e33a0ae52d 100644 --- a/src/connector/src/source/google_pubsub/mod.rs +++ b/src/connector/src/source/google_pubsub/mod.rs @@ -14,6 +14,9 @@ use std::collections::HashMap; +use anyhow::Context; +use google_cloud_pubsub::client::{Client, ClientConfig}; +use google_cloud_pubsub::subscription::Subscription; use serde::Deserialize; pub mod enumerator; @@ -25,6 +28,7 @@ pub use source::*; pub use split::*; use with_options::WithOptions; +use crate::error::ConnectorResult; use crate::source::SourceProperties; pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub"; @@ -32,8 +36,6 @@ pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub"; #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct PubsubProperties { /// pubsub subscription to consume messages from - /// The subscription should be configured with the `retain-on-ack` property to enable - /// message recovery within risingwave. #[serde(rename = "pubsub.subscription")] pub subscription: String, @@ -97,6 +99,18 @@ impl PubsubProperties { std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", credentials); } } + + pub(crate) async fn subscription_client(&self) -> ConnectorResult { + self.initialize_env(); + + // Validate config + let config = ClientConfig::default().with_auth().await?; + let client = Client::new(config) + .await + .context("error initializing pubsub client")?; + + Ok(client.subscription(&self.subscription)) + } } #[cfg(test)] diff --git a/src/connector/src/source/google_pubsub/source/message.rs b/src/connector/src/source/google_pubsub/source/message.rs index cf14757b3668b..10194453bebbc 100644 --- a/src/connector/src/source/google_pubsub/source/message.rs +++ b/src/connector/src/source/google_pubsub/source/message.rs @@ -31,6 +31,7 @@ impl From for SourceMessage { fn from(tagged_message: TaggedReceivedMessage) -> Self { let TaggedReceivedMessage(split_id, message) = tagged_message; + let ack_id = message.ack_id().to_string(); let timestamp = message .message .publish_time @@ -50,8 +51,9 @@ impl From for SourceMessage { _ => Some(payload), } }, - offset: timestamp.timestamp_nanos_opt().unwrap().to_string(), + offset: ack_id, split_id, + // What's the usage of this? meta: SourceMeta::GooglePubsub(GooglePubsubMeta { timestamp: Some(timestamp.timestamp_millis()), }), diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs index 0f451d65ddf3d..68f9ba371fec0 100644 --- a/src/connector/src/source/google_pubsub/source/reader.rs +++ b/src/connector/src/source/google_pubsub/source/reader.rs @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::{anyhow, Context}; +use anyhow::Context; use async_trait::async_trait; -use chrono::{NaiveDateTime, TimeZone, Utc}; +use chrono::{TimeZone, Utc}; use futures_async_stream::try_stream; -use google_cloud_pubsub::client::{Client, ClientConfig}; use google_cloud_pubsub::subscription::{SeekTo, Subscription}; use risingwave_common::{bail, ensure}; use tonic::Code; @@ -67,22 +66,14 @@ impl CommonSplitReader for PubsubSplitReader { } let mut chunk: Vec = Vec::with_capacity(raw_chunk.len()); - let mut ack_ids: Vec = Vec::with_capacity(raw_chunk.len()); for message in raw_chunk { - ack_ids.push(message.ack_id().into()); chunk.push(SourceMessage::from(TaggedReceivedMessage( self.split_id.clone(), message, ))); } - self.subscription - .ack(ack_ids) - .await - .map_err(|e| anyhow!(e)) - .context("failed to ack pubsub messages")?; - yield chunk; } } @@ -106,12 +97,7 @@ impl SplitReader for PubsubSplitReader { ); let split = splits.into_iter().next().unwrap(); - // Set environment variables consumed by `google_cloud_pubsub` - properties.initialize_env(); - - let config = ClientConfig::default().with_auth().await?; - let client = Client::new(config).await.map_err(|e| anyhow!(e))?; - let subscription = client.subscription(&properties.subscription); + let subscription = properties.subscription_client().await?; if let Some(ref offset) = split.start_offset { let timestamp = offset diff --git a/src/connector/src/source/google_pubsub/split.rs b/src/connector/src/source/google_pubsub/split.rs index 9c7b77dd7e2f3..418ef9d138356 100644 --- a/src/connector/src/source/google_pubsub/split.rs +++ b/src/connector/src/source/google_pubsub/split.rs @@ -20,6 +20,8 @@ use crate::source::{SplitId, SplitMetaData}; #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)] pub struct PubsubSplit { + // XXX: `index` and `subscription` seems also not useful. It's only for `SplitMetaData::id`. + // Is the split id useful? pub(crate) index: u32, pub(crate) subscription: String, @@ -47,8 +49,11 @@ impl SplitMetaData for PubsubSplit { format!("{}-{}", self.subscription, self.index).into() } - fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> { - self.start_offset = Some(last_seen_offset); + /// No-op. Actually `PubsubSplit` doesn't maintain any state. It's fully managed by Pubsub. + /// One subscription is like one Kafka consumer group. + fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> { + // forcefully set previously persisted start_offset to None + self.start_offset = None; Ok(()) } } diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index f965d373d9306..fca7876680019 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -25,8 +25,11 @@ pub mod mqtt; pub mod nats; pub mod nexmark; pub mod pulsar; +use std::collections::HashMap; + pub use base::{UPSTREAM_SOURCE_KEY, *}; pub(crate) use common::*; +use google_cloud_pubsub::subscription::Subscription; pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR; pub use kafka::KAFKA_CONNECTOR; pub use kinesis::KINESIS_CONNECTOR; @@ -39,6 +42,8 @@ pub mod reader; pub mod test_source; pub use manager::{SourceColumnDesc, SourceColumnType}; +use risingwave_common::array::{Array, ArrayRef}; +use thiserror_ext::AsReport; pub use crate::source::filesystem::opendal_source::{ GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, @@ -68,3 +73,49 @@ pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool PREFIXES.iter().any(|prefix| key.starts_with(prefix)) || (key == "endpoint" && !connector.eq_ignore_ascii_case(KINESIS_CONNECTOR)) } + +/// Tasks executed by `WaitCheckpointWorker` +pub enum WaitCheckpointTask { + CommitCdcOffset(Option<(SplitId, String)>), + AckPubsubMessage(Subscription, Vec), +} + +impl WaitCheckpointTask { + pub async fn run(self) { + use std::str::FromStr; + match self { + WaitCheckpointTask::CommitCdcOffset(updated_offset) => { + if let Some((split_id, offset)) = updated_offset { + let source_id: u64 = u64::from_str(split_id.as_ref()).unwrap(); + // notify cdc connector to commit offset + match cdc::jni_source::commit_cdc_offset(source_id, offset.clone()) { + Ok(()) => {} + Err(e) => { + tracing::error!( + error = %e.as_report(), + "source#{source_id}: failed to commit cdc offset: {offset}.", + ) + } + } + } + } + WaitCheckpointTask::AckPubsubMessage(subscription, ack_id_arrs) => { + let mut ack_ids: Vec = vec![]; + for arr in ack_id_arrs { + for ack_id in arr.as_utf8().iter().flatten() { + ack_ids.push(ack_id.to_string()) + } + } + match subscription.ack(ack_ids).await { + Ok(()) => {} + Err(e) => { + tracing::error!( + error = %e.as_report(), + "failed to ack pubsub messages", + ) + } + } + } + } + } +} diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index 9a1f94de0fb8d..eb0ba30d12431 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -36,7 +36,7 @@ use crate::source::filesystem::opendal_source::{ use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; use crate::source::{ create_split_reader, BoxChunkSourceStream, BoxTryStream, Column, ConnectorProperties, - ConnectorState, SourceColumnDesc, SourceContext, SplitReader, + ConnectorState, SourceColumnDesc, SourceContext, SplitReader, WaitCheckpointTask, }; #[derive(Clone, Debug)] @@ -105,13 +105,22 @@ impl SourceReader { } } - /// Postgres and Oracle connectors need to commit the offset to upstream. - /// And we will spawn a separate tokio task to wait for epoch commit and commit the source offset. - pub fn need_commit_offset_to_upstream(&self) -> bool { - matches!(&self.config, ConnectorProperties::PostgresCdc(_)) + /// Refer to `WaitCheckpointWorker` for more details. + pub async fn create_wait_checkpoint_task(&self) -> ConnectorResult> { + Ok(match &self.config { + ConnectorProperties::PostgresCdc(_prop) => { + Some(WaitCheckpointTask::CommitCdcOffset(None)) + } + ConnectorProperties::GooglePubsub(prop) => Some(WaitCheckpointTask::AckPubsubMessage( + prop.subscription_client().await?, + vec![], + )), + _ => None, + }) } - pub async fn to_stream( + /// Build `SplitReader`s and then `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s). + pub async fn build_stream( &self, state: ConnectorState, column_ids: Vec, diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index e4fe368197693..b4c006469e650 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -154,7 +154,7 @@ impl FsFetchExecutor { ) -> StreamExecutorResult { let stream = source_desc .source - .to_stream(batch, column_ids, Arc::new(source_ctx)) + .build_stream(batch, column_ids, Arc::new(source_ctx)) .await .map_err(StreamExecutorError::connector_error)?; Ok(apply_rate_limit(stream, rate_limit_rps).boxed()) diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 68dadf5806f30..73e6c4a1b2523 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -218,7 +218,7 @@ impl SourceBackfillExecutorInner { ); let stream = source_desc .source - .to_stream(Some(splits), column_ids, Arc::new(source_ctx)) + .build_stream(Some(splits), column_ids, Arc::new(source_ctx)) .await .map_err(StreamExecutorError::connector_error)?; Ok(apply_rate_limit(stream, self.rate_limit_rps).boxed()) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 455b6e0fc33e4..179769a478d59 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -13,22 +13,22 @@ // limitations under the License. use std::collections::HashMap; -use std::str::FromStr; use std::time::Duration; use anyhow::anyhow; use either::Either; use futures::TryStreamExt; use itertools::Itertools; +use risingwave_common::array::ArrayRef; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::{Epoch, EpochPair}; -use risingwave_connector::source::cdc::jni_source; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; +use risingwave_connector::source::reader::reader::SourceReader; use risingwave_connector::source::{ BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, - SplitMetaData, + SplitMetaData, WaitCheckpointTask, }; use risingwave_hummock_sdk::HummockReadEpoch; use thiserror_ext::AsReport; @@ -92,16 +92,24 @@ impl SourceExecutor { } } - pub fn spawn_wait_epoch_worker( + async fn spawn_wait_checkpoint_worker( core: &StreamSourceCore, - ) -> UnboundedSender<(Epoch, HashMap)> { - let (wait_epoch_tx, wait_epoch_rx) = mpsc::unbounded_channel(); - let wait_epoch_worker = WaitEpochWorker { - wait_epoch_rx, + source_reader: SourceReader, + ) -> StreamExecutorResult> { + let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else { + return Ok(None); + }; + let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel(); + let wait_checkpoint_worker = WaitCheckpointWorker { + wait_checkpoint_rx, state_store: core.split_state_store.state_table.state_store().clone(), }; - tokio::spawn(wait_epoch_worker.run()); - wait_epoch_tx + tokio::spawn(wait_checkpoint_worker.run()); + Ok(Some(WaitCheckpointTaskBuilder { + wait_checkpoint_tx, + source_reader, + building_task: initial_task, + })) } pub async fn build_stream_source_reader( @@ -132,7 +140,7 @@ impl SourceExecutor { ); let stream = source_desc .source - .to_stream(state, column_ids, Arc::new(source_ctx)) + .build_stream(state, column_ids, Arc::new(source_ctx)) .await .map_err(StreamExecutorError::connector_error); @@ -387,11 +395,8 @@ impl SourceExecutor { .build() .map_err(StreamExecutorError::connector_error)?; - let wait_epoch_tx = if source_desc.source.need_commit_offset_to_upstream() { - Some(Self::spawn_wait_epoch_worker(&core)) - } else { - None - }; + let mut wait_checkpoint_task_builder = + Self::spawn_wait_checkpoint_worker(&core, source_desc.source.clone()).await?; let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns) else { @@ -548,19 +553,12 @@ impl SourceExecutor { // when handle a checkpoint barrier, spawn a task to wait for epoch commit notification if barrier.kind.is_checkpoint() - && !updated_splits.is_empty() - && let Some(ref tx) = wait_epoch_tx + && let Some(task_builder) = &mut wait_checkpoint_task_builder { - let mut updated_offsets = HashMap::new(); - for (split_id, split_impl) in updated_splits { - if split_impl.is_cdc_split() { - updated_offsets.insert(split_id, split_impl.get_cdc_split_offset()); - } - } + task_builder.update_task_on_checkpoint(updated_splits); tracing::debug!("epoch to wait {:?}", epoch); - tx.send((Epoch(epoch.prev), updated_offsets)) - .expect("wait_epoch_tx send success"); + task_builder.send(Epoch(epoch.prev)).await? } yield Message::Barrier(barrier); @@ -572,6 +570,10 @@ impl SourceExecutor { } Either::Right(chunk) => { + if let Some(task_builder) = &mut wait_checkpoint_task_builder { + let offset_col = chunk.column_at(offset_idx); + task_builder.update_task_on_chunk(offset_col.clone()); + } // TODO: confirm when split_offset_mapping is None let split_offset_mapping = get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx); @@ -692,18 +694,77 @@ impl Debug for SourceExecutor { } } -struct WaitEpochWorker { - wait_epoch_rx: UnboundedReceiver<(Epoch, HashMap)>, +struct WaitCheckpointTaskBuilder { + wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>, + source_reader: SourceReader, + building_task: WaitCheckpointTask, +} + +impl WaitCheckpointTaskBuilder { + fn update_task_on_chunk(&mut self, offset_col: ArrayRef) { + #[expect(clippy::single_match)] + match &mut self.building_task { + WaitCheckpointTask::AckPubsubMessage(_, arrays) => { + arrays.push(offset_col); + } + _ => {} + } + } + + fn update_task_on_checkpoint(&mut self, updated_splits: HashMap) { + #[expect(clippy::single_match)] + match &mut self.building_task { + WaitCheckpointTask::CommitCdcOffset(offsets) => { + if !updated_splits.is_empty() { + // cdc source only has one split + assert_eq!(1, updated_splits.len()); + for (split_id, split_impl) in updated_splits { + if split_impl.is_cdc_split() { + *offsets = Some((split_id, split_impl.get_cdc_split_offset())); + } else { + unreachable!() + } + } + } + } + _ => {} + } + } + + /// Send and reset the building task to a new one. + async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> { + let new_task = self + .source_reader + .create_wait_checkpoint_task() + .await? + .expect("wait checkpoint task should be created"); + self.wait_checkpoint_tx + .send((epoch, std::mem::replace(&mut self.building_task, new_task))) + .expect("wait_checkpoint_tx send should succeed"); + Ok(()) + } +} + +/// A worker used to do some work after each checkpoint epoch is committed. +/// +/// Useages: +/// - CDC: Commit last consumed offset to upstream DB, so that old data can be discarded. +/// - Pubsub: Acknowledge consumed messages, so they won't be redelivered. +/// If we acknowledge immediately after reading the message, the message cannot be replayed, +/// thus not at-least-once (unless `retain_acked_messages` is enabled). +/// See also +struct WaitCheckpointWorker { + wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>, state_store: S, } -impl WaitEpochWorker { +impl WaitCheckpointWorker { pub async fn run(mut self) { tracing::debug!("wait epoch worker start success"); loop { // poll the rx and wait for the epoch commit - match self.wait_epoch_rx.recv().await { - Some((epoch, updated_offsets)) => { + match self.wait_checkpoint_rx.recv().await { + Some((epoch, task)) => { tracing::debug!("start to wait epoch {}", epoch.0); let ret = self .state_store @@ -713,20 +774,7 @@ impl WaitEpochWorker { match ret { Ok(()) => { tracing::debug!(epoch = epoch.0, "wait epoch success"); - // cdc source only has one split - assert_eq!(1, updated_offsets.len()); - let (split_id, offset) = updated_offsets.into_iter().next().unwrap(); - let source_id: u64 = u64::from_str(split_id.as_ref()).unwrap(); - // notify cdc connector to commit offset - match jni_source::commit_cdc_offset(source_id, offset.clone()) { - Ok(_) => {} - Err(e) => { - tracing::error!( - error = %e.as_report(), - "source#{source_id}: failed to commit cdc offset: {offset}.", - ) - } - } + task.run().await; } Err(e) => { tracing::error!( From f0d8787ac059e8887359ae3accc98d1b4b92af1f Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 14 May 2024 02:14:06 +0800 Subject: [PATCH 03/13] add (back) pubsub.parallelism WITH option Signed-off-by: xxchan --- .../src/source/google_pubsub/enumerator/client.rs | 15 ++++++++++++++- src/connector/src/source/google_pubsub/mod.rs | 7 ++++++- src/connector/with_options_source.yaml | 6 +++++- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index 641f201cbbf2b..db4092290a795 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -38,6 +38,19 @@ impl SplitEnumerator for PubsubSplitEnumerator { properties: Self::Properties, _context: SourceEnumeratorContextRef, ) -> ConnectorResult { + let split_count = match &properties.parallelism { + Some(parallelism) => { + let parallelism = parallelism + .parse::() + .context("error when parsing parallelism")?; + if parallelism < 1 { + bail!("parallelism must be >= 1"); + } + parallelism + } + None => 1, + }; + if properties.credentials.is_none() && properties.emulator_host.is_none() { bail!("credentials must be set if not using the pubsub emulator") } @@ -88,7 +101,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { Ok(Self { subscription: properties.subscription.to_owned(), - split_count: 1, + split_count, }) } diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs index 335e33a0ae52d..69d561a7dea16 100644 --- a/src/connector/src/source/google_pubsub/mod.rs +++ b/src/connector/src/source/google_pubsub/mod.rs @@ -70,6 +70,11 @@ pub struct PubsubProperties { #[serde(rename = "pubsub.start_snapshot")] pub start_snapshot: Option, + /// `parallelism` is the number of parallel consumers to run for the subscription. + /// TODO: use system parallelism if not set + #[serde(rename = "pubsub.parallelism")] + pub parallelism: Option, + #[serde(flatten)] pub unknown_fields: HashMap, } @@ -137,7 +142,7 @@ mod tests { start_offset: None, start_snapshot: None, subscription: String::from("test-subscription"), - + parallelism: None, unknown_fields: Default::default(), }; diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index c62db228eeb02..1fd50af687967 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -596,7 +596,7 @@ PubsubProperties: fields: - name: pubsub.subscription field_type: String - comments: pubsub subscription to consume messages from The subscription should be configured with the `retain-on-ack` property to enable message recovery within risingwave. + comments: pubsub subscription to consume messages from required: true - name: pubsub.emulator_host field_type: String @@ -614,6 +614,10 @@ PubsubProperties: field_type: String comments: '`start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek to the snapshot before starting consumption. Snapshots are the preferred seeking mechanism in pub/sub because they guarantee retention of: - All unacknowledged messages at the time of their creation. - All messages created after their creation. Besides retention guarantees, timestamps are also more precise than timestamp-based seeks. See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' required: false + - name: pubsub.parallelism + field_type: String + comments: '`parallelism` is the number of parallel consumers to run for the subscription. TODO: use system parallelism if not set' + required: false PulsarProperties: fields: - name: scan.startup.mode From 25f8fa6ae29604a1b1272c4d07c3bb7e322dd61f Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 14 May 2024 14:46:16 +0800 Subject: [PATCH 04/13] lint Signed-off-by: xxchan --- src/connector/src/source/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index fca7876680019..42789f51070e2 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -25,7 +25,6 @@ pub mod mqtt; pub mod nats; pub mod nexmark; pub mod pulsar; -use std::collections::HashMap; pub use base::{UPSTREAM_SOURCE_KEY, *}; pub(crate) use common::*; From 2d29d50e99c60c49cc31a28442926cdb48225c57 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 14 May 2024 17:24:59 +0800 Subject: [PATCH 05/13] add doc & test Signed-off-by: xxchan --- e2e_test/source_inline/pubsub/prepare-data.rs | 89 ++++----- e2e_test/source_inline/pubsub/pubsub.slt | 170 +++++++++++++----- src/config/ci.toml | 1 + .../source/google_pubsub/enumerator/client.rs | 16 +- src/connector/src/source/google_pubsub/mod.rs | 9 +- src/connector/src/source/mod.rs | 4 +- .../src/executor/source/source_executor.rs | 29 ++- 7 files changed, 199 insertions(+), 119 deletions(-) diff --git a/e2e_test/source_inline/pubsub/prepare-data.rs b/e2e_test/source_inline/pubsub/prepare-data.rs index 81bd1ba73e41e..c48307e375f5d 100755 --- a/e2e_test/source_inline/pubsub/prepare-data.rs +++ b/e2e_test/source_inline/pubsub/prepare-data.rs @@ -25,65 +25,52 @@ const SUBSCRIPTION_COUNT: usize = 50; #[tokio::main] async fn main() -> anyhow::Result<()> { + let args: Vec = std::env::args().collect(); + let command = args[1].as_str(); + std::env::set_var("PUBSUB_EMULATOR_HOST", "127.0.0.1:5980"); let client = Client::new(Default::default()).await?; - // delete and create "test-topic" let topic = client.topic(TOPIC); - for subscription in topic.subscriptions(None).await? { - subscription.delete(None).await?; - } - let _ = topic.delete(None).await; - topic.create(Some(Default::default()), None).await?; - for i in 0..SUBSCRIPTION_COUNT { - let _ = client - .create_subscription( - format!("test-subscription-{}", i).as_str(), - TOPIC, - SubscriptionConfig { - retain_acked_messages: true, - ..Default::default() - }, - None, - ) - .await?; - } + if command == "create" { + // delete and create "test-topic" + for subscription in topic.subscriptions(None).await? { + subscription.delete(None).await?; + } - let publisher = topic.new_publisher(Default::default()); - for line in DATA.lines() { - let a = publisher - .publish(PubsubMessage { - data: line.to_string().into_bytes(), - ..Default::default() - }) - .await; - a.get().await?; - println!("published {}", line); + let _ = topic.delete(None).await; + topic.create(Some(Default::default()), None).await?; + for i in 0..SUBSCRIPTION_COUNT { + let _ = client + .create_subscription( + format!("test-subscription-{}", i).as_str(), + TOPIC, + SubscriptionConfig { + retain_acked_messages: false, + ..Default::default() + }, + None, + ) + .await?; + } + } else if command == "publish" { + let publisher = topic.new_publisher(Default::default()); + for i in 0..10 { + let data = format!("{{\"v1\":{i},\"v2\":\"name{i}\"}}"); + let a = publisher + .publish(PubsubMessage { + data: data.to_string().into_bytes(), + ..Default::default() + }) + .await; + a.get().await?; + println!("published {}", data); + } + } else { + panic!("unknown command {command}"); } Ok(()) } - -const DATA: &str = r#"{"v1":1,"v2":"name0"} -{"v1":2,"v2":"name0"} -{"v1":6,"v2":"name3"} -{"v1":0,"v2":"name5"} -{"v1":5,"v2":"name8"} -{"v1":6,"v2":"name4"} -{"v1":8,"v2":"name9"} -{"v1":9,"v2":"name2"} -{"v1":4,"v2":"name6"} -{"v1":5,"v2":"name3"} -{"v1":8,"v2":"name8"} -{"v1":9,"v2":"name2"} -{"v1":2,"v2":"name3"} -{"v1":4,"v2":"name7"} -{"v1":7,"v2":"name0"} -{"v1":0,"v2":"name9"} -{"v1":3,"v2":"name2"} -{"v1":7,"v2":"name5"} -{"v1":1,"v2":"name7"} -{"v1":3,"v2":"name9"} -"#; diff --git a/e2e_test/source_inline/pubsub/pubsub.slt b/e2e_test/source_inline/pubsub/pubsub.slt index cfb5f551cf36a..b2131b0333398 100644 --- a/e2e_test/source_inline/pubsub/pubsub.slt +++ b/e2e_test/source_inline/pubsub/pubsub.slt @@ -1,32 +1,45 @@ control substitution on system ok -e2e_test/source_inline/pubsub/prepare-data.rs +e2e_test/source_inline/pubsub/prepare-data.rs create + +statement error missing field `pubsub.subscription` +CREATE SOURCE s (v1 int, v2 varchar) WITH ( + connector = 'google_pubsub' +) FORMAT PLAIN ENCODE JSON; + +statement error credentials must be set if not using the pubsub emulator +CREATE SOURCE s (v1 int, v2 varchar) WITH ( + connector = 'google_pubsub', + pubsub.subscription = 'test-subscription-1' +) FORMAT PLAIN ENCODE JSON; -# fail with invalid emulator_host statement error failed to lookup address information -CREATE TABLE s1 (v1 int, v2 varchar) WITH ( +CREATE TABLE s (v1 int, v2 varchar) WITH ( connector = 'google_pubsub', pubsub.subscription = 'test-subscription-1', pubsub.emulator_host = 'invalid_host:5981' ) FORMAT PLAIN ENCODE JSON; -statement ok -CREATE TABLE s1 (v1 int, v2 varchar) WITH ( +statement error subscription test-subscription-not-exist does not exist +CREATE TABLE s (v1 int, v2 varchar) WITH ( ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON}, - pubsub.subscription = 'test-subscription-1', + pubsub.subscription = 'test-subscription-not-exist', ) FORMAT PLAIN ENCODE JSON; -statement ok -SELECT * FROM s1; +# fail if both start_offset and start_snapshot are provided +statement error specify at most one of start_offset or start_snapshot +CREATE TABLE s (v1 int, v2 varchar) WITH ( + ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON}, + pubsub.subscription = 'test-subscription-3', + pubsub.start_offset.nanos = '121212', + pubsub.start_snapshot = 'snapshot-that-doesnt-exist' +) FORMAT PLAIN ENCODE JSON; statement ok -DROP TABLE s1; - -statement error subscription test-subscription-not-exist does not exist -CREATE TABLE s2 (v1 int, v2 varchar) WITH ( +CREATE TABLE s1 (v1 int, v2 varchar) WITH ( ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON}, - pubsub.subscription = 'test-subscription-not-exist', + pubsub.subscription = 'test-subscription-1', ) FORMAT PLAIN ENCODE JSON; statement ok @@ -35,46 +48,115 @@ CREATE TABLE s2 (v1 int, v2 varchar) WITH ( pubsub.subscription = 'test-subscription-2', ) FORMAT PLAIN ENCODE JSON; -# fail if both start_offset and start_snapshot are provided -statement error specify at most one of start_offset or start_snapshot -CREATE TABLE s3 (v1 int, v2 varchar) WITH ( - ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON}, - pubsub.subscription = 'test-subscription-3', - pubsub.start_offset.nanos = '121212', - pubsub.start_snapshot = 'snapshot-that-doesnt-exist' -) FORMAT PLAIN ENCODE JSON; +# We want to test Pub/Sub ack messages on checkpoint, so disable checkpointing here. +# Note that DDL & flush will trigger checkpoint. +statement ok +ALTER SYSTEM SET checkpoint_frequency TO 114514; -# wait for source +# We publish data after the tables are created, because DDL will trigger checkpoint. +system ok +e2e_test/source_inline/pubsub/prepare-data.rs publish + +# default ack timeout is 10s. Let it redeliver once. +sleep 15s + +# visibility mode is checkpoint +query IT rowsort +select count(*) from s1; +---- +0 + +statement ok +set visibility_mode = 'all'; + +query IT rowsort +select v1, v2, count(*) FROM s1 group by v1, v2; +---- +0 name0 2 +1 name1 2 +2 name2 2 +3 name3 2 +4 name4 2 +5 name5 2 +6 name6 2 +7 name7 2 +8 name8 2 +9 name9 2 + + +statement ok +RECOVER; + +sleep 1s + +statement ok +flush; + +# After recovery, uncheckpointed data in RisingWave is lost. +query IT rowsort +select count(*) from s1; +---- +0 + +# Wait for another redelivery sleep 10s -# flush data into storage +query IT rowsort +select v1, v2, count(*) FROM s1 group by v1, v2; +---- +0 name0 1 +1 name1 1 +2 name2 1 +3 name3 1 +4 name4 1 +5 name5 1 +6 name6 1 +7 name7 1 +8 name8 1 +9 name9 1 + +# flush will force a checkpoint (and ack Pub/Sub messages on checkpoint) statement ok flush; query IT rowsort -select v1, v2 FROM s2; +select v1, v2, count(*) FROM s1 group by v1, v2; +---- +0 name0 1 +1 name1 1 +2 name2 1 +3 name3 1 +4 name4 1 +5 name5 1 +6 name6 1 +7 name7 1 +8 name8 1 +9 name9 1 + +sleep 15s + +# no redelivery any more +query IT rowsort +select v1, v2, count(*) FROM s1 group by v1, v2; ---- -0 name5 -0 name9 -1 name0 -1 name7 -2 name0 -2 name3 -3 name2 -3 name9 -4 name6 -4 name7 -5 name3 -5 name8 -6 name3 -6 name4 -7 name0 -7 name5 -8 name8 -8 name9 -9 name2 -9 name2 +0 name0 1 +1 name1 1 +2 name2 1 +3 name3 1 +4 name4 1 +5 name5 1 +6 name6 1 +7 name7 1 +8 name8 1 +9 name9 1 + + +statement ok +DROP TABLE s1; statement ok DROP TABLE s2; +# Restore to the value in src/config/ci.toml +statement ok +ALTER SYSTEM SET checkpoint_frequency TO 5; diff --git a/src/config/ci.toml b/src/config/ci.toml index 02e1509546f27..08b159c115b8c 100644 --- a/src/config/ci.toml +++ b/src/config/ci.toml @@ -18,5 +18,6 @@ imm_merge_threshold = 2 [system] barrier_interval_ms = 250 +# If this is changed, remember to also change e2e_test/source_inline/pubsub/pubsub.slt checkpoint_frequency = 5 max_concurrent_creating_streaming_jobs = 0 \ No newline at end of file diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index db4092290a795..77fb9bb40adac 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -15,7 +15,7 @@ use anyhow::Context; use async_trait::async_trait; use chrono::{TimeZone, Utc}; -use google_cloud_pubsub::subscription::{SeekTo, SubscriptionConfig}; +use google_cloud_pubsub::subscription::SeekTo; use risingwave_common::bail; use crate::error::ConnectorResult; @@ -64,20 +64,6 @@ impl SplitEnumerator for PubsubSplitEnumerator { bail!("subscription {} does not exist", &sub.id()) } - // We need the `retain_acked_messages` configuration to be true to seek back to timestamps - // as done in the [`PubsubSplitReader`] and here. - let (_, subscription_config) = sub - .config(None) - .await - .context("failed to fetch subscription config")?; - if let SubscriptionConfig { - retain_acked_messages: false, - .. - } = subscription_config - { - bail!("subscription must be configured with retain_acked_messages set to true") - } - let seek_to = match (properties.start_offset, properties.start_snapshot) { (None, None) => None, (Some(start_offset), None) => { diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs index 69d561a7dea16..6b4cc66626885 100644 --- a/src/connector/src/source/google_pubsub/mod.rs +++ b/src/connector/src/source/google_pubsub/mod.rs @@ -35,7 +35,12 @@ pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub"; #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct PubsubProperties { - /// pubsub subscription to consume messages from + /// Pub/Sub subscription to consume messages from. + /// + /// Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from + /// the same subscription. So one `subscription` (i.e., one `Source`) can only used for one MV. + /// Otherwise, different MVs on the same Source will both receive part of the messages. + /// TODO: check and enforce this on Meta. #[serde(rename = "pubsub.subscription")] pub subscription: String, @@ -64,7 +69,7 @@ pub struct PubsubProperties { /// in pub/sub because they guarantee retention of: /// - All unacknowledged messages at the time of their creation. /// - All messages created after their creation. - /// Besides retention guarantees, timestamps are also more precise than timestamp-based seeks. + /// Besides retention guarantees, snapshots are also more precise than timestamp-based seeks. /// See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for /// more details. #[serde(rename = "pubsub.start_snapshot")] diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 42789f51070e2..7714e2dc75db6 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -106,7 +106,9 @@ impl WaitCheckpointTask { } } match subscription.ack(ack_ids).await { - Ok(()) => {} + Ok(()) => { + tracing::debug!("ack pubsub messages successfully") + } Err(e) => { tracing::error!( error = %e.as_report(), diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 179769a478d59..71a1014e7923e 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -747,12 +747,29 @@ impl WaitCheckpointTaskBuilder { /// A worker used to do some work after each checkpoint epoch is committed. /// -/// Useages: -/// - CDC: Commit last consumed offset to upstream DB, so that old data can be discarded. -/// - Pubsub: Acknowledge consumed messages, so they won't be redelivered. -/// If we acknowledge immediately after reading the message, the message cannot be replayed, -/// thus not at-least-once (unless `retain_acked_messages` is enabled). -/// See also +/// # Usage Cases +/// +/// Typically there are 2 issues related with ack on checkpoint: +/// +/// 1. Correctness (at-least-once), or don't let upstream clean uncommitted data. +/// For message queueing semantics (delete after ack), we should ack to avoid redelivery, +/// and only ack after checkpoint to avoid data loss. +/// +/// 2. Allow upstream to clean data after commit. +/// +/// See also +/// +/// ## CDC +/// +/// Commit last consumed offset to upstream DB, so that old data can be discarded. +/// +/// ## Google Pub/Sub +/// +/// Due to queueing semantics. +/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality, +/// it's quite limited unlike Kafka. +/// +/// See also struct WaitCheckpointWorker { wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>, state_store: S, From 36bcbd6c7b17571a82b7c3b472d245e1734d42d2 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 14 May 2024 17:32:21 +0800 Subject: [PATCH 06/13] test parallel Signed-off-by: xxchan --- e2e_test/source_inline/pubsub/pubsub.slt | 9 +-------- src/connector/src/source/mod.rs | 5 ++--- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/e2e_test/source_inline/pubsub/pubsub.slt b/e2e_test/source_inline/pubsub/pubsub.slt index b2131b0333398..d77fae509efe5 100644 --- a/e2e_test/source_inline/pubsub/pubsub.slt +++ b/e2e_test/source_inline/pubsub/pubsub.slt @@ -40,13 +40,9 @@ statement ok CREATE TABLE s1 (v1 int, v2 varchar) WITH ( ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON}, pubsub.subscription = 'test-subscription-1', + pubsub.parallelism = 5 ) FORMAT PLAIN ENCODE JSON; -statement ok -CREATE TABLE s2 (v1 int, v2 varchar) WITH ( - ${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON}, - pubsub.subscription = 'test-subscription-2', -) FORMAT PLAIN ENCODE JSON; # We want to test Pub/Sub ack messages on checkpoint, so disable checkpointing here. # Note that DDL & flush will trigger checkpoint. @@ -154,9 +150,6 @@ select v1, v2, count(*) FROM s1 group by v1, v2; statement ok DROP TABLE s1; -statement ok -DROP TABLE s2; - # Restore to the value in src/config/ci.toml statement ok ALTER SYSTEM SET checkpoint_frequency TO 5; diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 7714e2dc75db6..13c0dcca93583 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -105,10 +105,9 @@ impl WaitCheckpointTask { ack_ids.push(ack_id.to_string()) } } + tracing::trace!("acking pubsub messages {:?}", ack_ids); match subscription.ack(ack_ids).await { - Ok(()) => { - tracing::debug!("ack pubsub messages successfully") - } + Ok(()) => {} Err(e) => { tracing::error!( error = %e.as_report(), From 80a678a47bdac885ffb5d9d32281df4a4c1af0ce Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 14 May 2024 17:47:39 +0800 Subject: [PATCH 07/13] update with_options yaml Signed-off-by: xxchan --- src/connector/with_options_source.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 1fd50af687967..35a1e7cc3882a 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -596,7 +596,7 @@ PubsubProperties: fields: - name: pubsub.subscription field_type: String - comments: pubsub subscription to consume messages from + comments: 'Pub/Sub subscription to consume messages from. Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from the same subscription. So one `subscription` (i.e., one `Source`) can only used for one MV. Otherwise, different MVs on the same Source will both receive part of the messages. TODO: check and enforce this on Meta.' required: true - name: pubsub.emulator_host field_type: String @@ -612,7 +612,7 @@ PubsubProperties: required: false - name: pubsub.start_snapshot field_type: String - comments: '`start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek to the snapshot before starting consumption. Snapshots are the preferred seeking mechanism in pub/sub because they guarantee retention of: - All unacknowledged messages at the time of their creation. - All messages created after their creation. Besides retention guarantees, timestamps are also more precise than timestamp-based seeks. See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' + comments: '`start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek to the snapshot before starting consumption. Snapshots are the preferred seeking mechanism in pub/sub because they guarantee retention of: - All unacknowledged messages at the time of their creation. - All messages created after their creation. Besides retention guarantees, snapshots are also more precise than timestamp-based seeks. See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' required: false - name: pubsub.parallelism field_type: String From fe621b678172090ac827b825b20d049dcd335f17 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 14 May 2024 17:54:12 +0800 Subject: [PATCH 08/13] fix Signed-off-by: xxchan --- e2e_test/source_inline/pubsub/pubsub.slt | 2 +- risedev.yml | 2 +- src/config/ci-recovery.toml | 1 + src/config/ci.toml | 1 - 4 files changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_inline/pubsub/pubsub.slt b/e2e_test/source_inline/pubsub/pubsub.slt index d77fae509efe5..35c94d07b9621 100644 --- a/e2e_test/source_inline/pubsub/pubsub.slt +++ b/e2e_test/source_inline/pubsub/pubsub.slt @@ -150,6 +150,6 @@ select v1, v2, count(*) FROM s1 group by v1, v2; statement ok DROP TABLE s1; -# Restore to the value in src/config/ci.toml +# Restore to the value in src/config/ci-recovery.toml statement ok ALTER SYSTEM SET checkpoint_frequency TO 5; diff --git a/risedev.yml b/risedev.yml index 7c57eb1601666..fdaaae7776e88 100644 --- a/risedev.yml +++ b/risedev.yml @@ -811,7 +811,7 @@ profile: port: 29092 ci-inline-source-test: - config-path: src/config/ci.toml + config-path: src/config/ci-recovery.toml steps: - use: minio - use: etcd diff --git a/src/config/ci-recovery.toml b/src/config/ci-recovery.toml index 95e5834621115..69de86279c1c5 100644 --- a/src/config/ci-recovery.toml +++ b/src/config/ci-recovery.toml @@ -10,5 +10,6 @@ imm_merge_threshold = 2 [system] barrier_interval_ms = 250 +# If this is changed, remember to also change e2e_test/source_inline/pubsub/pubsub.slt checkpoint_frequency = 5 max_concurrent_creating_streaming_jobs = 0 diff --git a/src/config/ci.toml b/src/config/ci.toml index 08b159c115b8c..02e1509546f27 100644 --- a/src/config/ci.toml +++ b/src/config/ci.toml @@ -18,6 +18,5 @@ imm_merge_threshold = 2 [system] barrier_interval_ms = 250 -# If this is changed, remember to also change e2e_test/source_inline/pubsub/pubsub.slt checkpoint_frequency = 5 max_concurrent_creating_streaming_jobs = 0 \ No newline at end of file From f15ced3f99c464bbbefb070114b2cdb01c56b4f3 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 15 May 2024 21:50:41 +0800 Subject: [PATCH 09/13] resolve comments Signed-off-by: xxchan --- .../src/source/google_pubsub/enumerator/client.rs | 14 +++----------- src/connector/src/source/google_pubsub/mod.rs | 13 ++++++++++--- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index 77fb9bb40adac..7345451c0c599 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -38,17 +38,9 @@ impl SplitEnumerator for PubsubSplitEnumerator { properties: Self::Properties, _context: SourceEnumeratorContextRef, ) -> ConnectorResult { - let split_count = match &properties.parallelism { - Some(parallelism) => { - let parallelism = parallelism - .parse::() - .context("error when parsing parallelism")?; - if parallelism < 1 { - bail!("parallelism must be >= 1"); - } - parallelism - } - None => 1, + let split_count = properties.parallelism.unwrap_or(1); + if split_count < 1 { + bail!("parallelism must be >= 1"); }; if properties.credentials.is_none() && properties.emulator_host.is_none() { diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs index 6b4cc66626885..a0794bb1205df 100644 --- a/src/connector/src/source/google_pubsub/mod.rs +++ b/src/connector/src/source/google_pubsub/mod.rs @@ -22,8 +22,8 @@ use serde::Deserialize; pub mod enumerator; pub mod source; pub mod split; - pub use enumerator::*; +use serde_with::{serde_as, DisplayFromStr}; pub use source::*; pub use split::*; use with_options::WithOptions; @@ -33,12 +33,18 @@ use crate::source::SourceProperties; pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub"; +/// # Implementation Notes +/// Pub/Sub does not rely on persisted state (`SplitImpl`) to start from a position. +/// It rely on Pub/Sub to load-balance messages between all Readers. +/// We `ack` received messages after checkpoint (see `WaitCheckpointWorker`) to achieve at-least-once delivery. +#[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct PubsubProperties { /// Pub/Sub subscription to consume messages from. /// /// Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from - /// the same subscription. So one `subscription` (i.e., one `Source`) can only used for one MV. + /// the same subscription. So one `subscription` (i.e., one `Source`) can only used for one MV + /// (shared between the actors of its fragment). /// Otherwise, different MVs on the same Source will both receive part of the messages. /// TODO: check and enforce this on Meta. #[serde(rename = "pubsub.subscription")] @@ -77,8 +83,9 @@ pub struct PubsubProperties { /// `parallelism` is the number of parallel consumers to run for the subscription. /// TODO: use system parallelism if not set + #[serde_as(as = "Option")] #[serde(rename = "pubsub.parallelism")] - pub parallelism: Option, + pub parallelism: Option, #[serde(flatten)] pub unknown_fields: HashMap, From 807e9d5f4791d2110377fd05a24cccb8526c381e Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 16 May 2024 00:39:50 +0800 Subject: [PATCH 10/13] minor update Signed-off-by: xxchan --- e2e_test/source_inline/pubsub/prepare-data.rs | 11 ++++++++--- src/risedevtool/src/risedev_env.rs | 1 + 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_inline/pubsub/prepare-data.rs b/e2e_test/source_inline/pubsub/prepare-data.rs index c48307e375f5d..bc07f9de6ccf3 100755 --- a/e2e_test/source_inline/pubsub/prepare-data.rs +++ b/e2e_test/source_inline/pubsub/prepare-data.rs @@ -16,7 +16,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ ``` use google_cloud_googleapis::pubsub::v1::PubsubMessage; -use google_cloud_pubsub::client::Client; +use google_cloud_pubsub::client::{Client, ClientConfig}; use google_cloud_pubsub::subscription::SubscriptionConfig; const TOPIC: &str = "test-topic"; @@ -28,9 +28,14 @@ async fn main() -> anyhow::Result<()> { let args: Vec = std::env::args().collect(); let command = args[1].as_str(); - std::env::set_var("PUBSUB_EMULATOR_HOST", "127.0.0.1:5980"); + let use_emulator = std::env::var("PUBSUB_EMULATOR_HOST").is_ok(); + let use_cloud = std::env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_ok(); + if !use_emulator && !use_cloud { + panic!("either PUBSUB_EMULATOR_HOST or GOOGLE_APPLICATION_CREDENTIALS_JSON must be set"); + } - let client = Client::new(Default::default()).await?; + let config = ClientConfig::default().with_auth().await?; + let client = Client::new(config).await?; let topic = client.topic(TOPIC); diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index b33a65f9986aa..5b8415113d1ba 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -94,6 +94,7 @@ pub fn generate_risedev_env(services: &Vec) -> String { ServiceConfig::Pubsub(c) => { let address = &c.address; let port = &c.port; + writeln!(env, r#"PUBSUB_EMULATOR_HOST="{address}:{port}""#,).unwrap(); writeln!(env, r#"RISEDEV_PUBSUB_WITH_OPTIONS_COMMON="connector='google_pubsub',pubsub.emulator_host='{address}:{port}'""#,).unwrap(); } ServiceConfig::Postgres(c) => { From 4fb91fd18a8dbbce37bc669410b7f3700bf283ca Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 16 May 2024 00:41:58 +0800 Subject: [PATCH 11/13] deprecate start offset Signed-off-by: xxchan --- .../src/source/google_pubsub/enumerator/client.rs | 2 +- .../src/source/google_pubsub/source/reader.rs | 13 ------------- src/connector/src/source/google_pubsub/split.rs | 10 ++++------ 3 files changed, 5 insertions(+), 20 deletions(-) diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index 7345451c0c599..3bb37845763ea 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -89,7 +89,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { .map(|i| PubsubSplit { index: i, subscription: self.subscription.to_owned(), - start_offset: None, + __deprecated_start_offset: None, __deprecated_stop_offset: None, }) .collect(); diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs index 68f9ba371fec0..740a787db1e6e 100644 --- a/src/connector/src/source/google_pubsub/source/reader.rs +++ b/src/connector/src/source/google_pubsub/source/reader.rs @@ -99,19 +99,6 @@ impl SplitReader for PubsubSplitReader { let subscription = properties.subscription_client().await?; - if let Some(ref offset) = split.start_offset { - let timestamp = offset - .as_str() - .parse::() - .map(|nanos| Utc.timestamp_nanos(nanos)) - .context("error parsing offset")?; - - subscription - .seek(SeekTo::Timestamp(timestamp.into()), None) - .await - .context("error seeking to pubsub offset")?; - } - Ok(Self { subscription, split_id: split.id(), diff --git a/src/connector/src/source/google_pubsub/split.rs b/src/connector/src/source/google_pubsub/split.rs index 418ef9d138356..85d32f479bc2e 100644 --- a/src/connector/src/source/google_pubsub/split.rs +++ b/src/connector/src/source/google_pubsub/split.rs @@ -25,11 +25,9 @@ pub struct PubsubSplit { pub(crate) index: u32, pub(crate) subscription: String, - /// `start_offset` is a numeric timestamp. - /// When not `None`, the `PubsubReader` seeks to the timestamp described by the `start_offset`. - /// These offsets are taken from the `offset` property of the `SourceMessage` yielded by the - /// pubsub reader. - pub(crate) start_offset: Option, + #[serde(rename = "start_offset")] + #[serde(skip_serializing)] + pub(crate) __deprecated_start_offset: Option, #[serde(rename = "stop_offset")] #[serde(skip_serializing)] @@ -53,7 +51,7 @@ impl SplitMetaData for PubsubSplit { /// One subscription is like one Kafka consumer group. fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> { // forcefully set previously persisted start_offset to None - self.start_offset = None; + self.__deprecated_start_offset = None; Ok(()) } } From 9b1056c1d3c509de92d90d51ab48b6b0067bd92a Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 16 May 2024 23:15:34 +0800 Subject: [PATCH 12/13] fix lint & yaml test Signed-off-by: xxchan --- src/connector/src/source/google_pubsub/source/reader.rs | 4 +--- src/connector/with_options_source.yaml | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs index 740a787db1e6e..c771fc8eb638c 100644 --- a/src/connector/src/source/google_pubsub/source/reader.rs +++ b/src/connector/src/source/google_pubsub/source/reader.rs @@ -12,11 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::Context; use async_trait::async_trait; -use chrono::{TimeZone, Utc}; use futures_async_stream::try_stream; -use google_cloud_pubsub::subscription::{SeekTo, Subscription}; +use google_cloud_pubsub::subscription::Subscription; use risingwave_common::{bail, ensure}; use tonic::Code; diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index c928a23fbe285..188ceb0b5b694 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -629,7 +629,7 @@ PubsubProperties: fields: - name: pubsub.subscription field_type: String - comments: 'Pub/Sub subscription to consume messages from. Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from the same subscription. So one `subscription` (i.e., one `Source`) can only used for one MV. Otherwise, different MVs on the same Source will both receive part of the messages. TODO: check and enforce this on Meta.' + comments: 'Pub/Sub subscription to consume messages from. Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from the same subscription. So one `subscription` (i.e., one `Source`) can only used for one MV (shared between the actors of its fragment). Otherwise, different MVs on the same Source will both receive part of the messages. TODO: check and enforce this on Meta.' required: true - name: pubsub.emulator_host field_type: String @@ -648,7 +648,7 @@ PubsubProperties: comments: '`start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek to the snapshot before starting consumption. Snapshots are the preferred seeking mechanism in pub/sub because they guarantee retention of: - All unacknowledged messages at the time of their creation. - All messages created after their creation. Besides retention guarantees, snapshots are also more precise than timestamp-based seeks. See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' required: false - name: pubsub.parallelism - field_type: String + field_type: u32 comments: '`parallelism` is the number of parallel consumers to run for the subscription. TODO: use system parallelism if not set' required: false PulsarProperties: From 536852e17d7021b77934820771e9d5c40b369f48 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 20 May 2024 16:29:21 +0000 Subject: [PATCH 13/13] fix: failed to ack pubsub messages: Request payload size exceeds the limit: 524288 bytes. Signed-off-by: xxchan --- src/connector/src/source/mod.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 13c0dcca93583..ed8842e70825f 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -99,22 +99,29 @@ impl WaitCheckpointTask { } } WaitCheckpointTask::AckPubsubMessage(subscription, ack_id_arrs) => { + async fn ack(subscription: &Subscription, ack_ids: Vec) { + tracing::trace!("acking pubsub messages {:?}", ack_ids); + match subscription.ack(ack_ids).await { + Ok(()) => {} + Err(e) => { + tracing::error!( + error = %e.as_report(), + "failed to ack pubsub messages", + ) + } + } + } + const MAX_ACK_BATCH_SIZE: usize = 1000; let mut ack_ids: Vec = vec![]; for arr in ack_id_arrs { for ack_id in arr.as_utf8().iter().flatten() { - ack_ids.push(ack_id.to_string()) - } - } - tracing::trace!("acking pubsub messages {:?}", ack_ids); - match subscription.ack(ack_ids).await { - Ok(()) => {} - Err(e) => { - tracing::error!( - error = %e.as_report(), - "failed to ack pubsub messages", - ) + ack_ids.push(ack_id.to_string()); + if ack_ids.len() >= MAX_ACK_BATCH_SIZE { + ack(&subscription, std::mem::take(&mut ack_ids)).await; + } } } + ack(&subscription, ack_ids).await; } } }