diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index fa92e4eefc3a9..2bd0ec0b5ef8c 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -32,10 +32,8 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, use risingwave_common::types::{Datum, JsonbVal}; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; -use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader; -use risingwave_connector::source::cdc::external::mysql::MySqlOffset; use risingwave_connector::source::cdc::external::{ - DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, SchemaTableName, + CdcTableType, DebeziumOffset, DebeziumSourceOffset, ExternalTableConfig, SchemaTableName, }; use risingwave_connector::source::cdc::DebeziumCdcSplit; use risingwave_connector::source::SplitImpl; @@ -160,19 +158,6 @@ async fn test_cdc_backfill() -> StreamResult<()> { MockOffsetGenExecutor::new(source).boxed(), ); - let binlog_file = String::from("1.binlog"); - // mock binlog watermarks for backfill - // initial low watermark: 1.binlog, pos=2 and expected behaviors: - // - ignore events before (1.binlog, pos=2); - // - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot - let binlog_watermarks = vec![ - MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark - MySqlOffset::new(binlog_file.clone(), 4), - MySqlOffset::new(binlog_file.clone(), 6), - MySqlOffset::new(binlog_file.clone(), 8), - MySqlOffset::new(binlog_file.clone(), 10), - ]; - let table_name = SchemaTableName { schema_name: "public".to_string(), table_name: "mock_table".to_string(), @@ -183,11 +168,14 @@ async fn test_cdc_backfill() -> StreamResult<()> { ]); let table_pk_indices = vec![0]; let table_pk_order_types = vec![OrderType::ascending()]; + let config = ExternalTableConfig::default(); + let external_table = ExternalStorageTable::new( TableId::new(1234), table_name, "mydb".to_string(), - ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)), + config, + CdcTableType::Mock, table_schema.clone(), table_pk_order_types, table_pk_indices.clone(), diff --git a/src/connector/src/source/cdc/external/mock_external_table.rs b/src/connector/src/source/cdc/external/mock_external_table.rs index 7242f5614d409..4f9538322d377 100644 --- a/src/connector/src/source/cdc/external/mock_external_table.rs +++ b/src/connector/src/source/cdc/external/mock_external_table.rs @@ -31,7 +31,19 @@ pub struct MockExternalTableReader { } impl MockExternalTableReader { - pub fn new(binlog_watermarks: Vec) -> Self { + pub fn new() -> Self { + let binlog_file = String::from("1.binlog"); + // mock binlog watermarks for backfill + // initial low watermark: 1.binlog, pos=2 and expected behaviors: + // - ignore events before (1.binlog, pos=2); + // - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot + let binlog_watermarks = vec![ + MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark + MySqlOffset::new(binlog_file.clone(), 4), + MySqlOffset::new(binlog_file.clone(), 6), + MySqlOffset::new(binlog_file.clone(), 8), + MySqlOffset::new(binlog_file.clone(), 10), + ]; Self { binlog_watermarks, snapshot_cnt: AtomicUsize::new(0), diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 115b1be99d21b..8b8c74512da78 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -45,9 +45,10 @@ use crate::source::cdc::external::sql_server::{ use crate::source::cdc::CdcSourceType; use crate::WithPropertiesExt; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum CdcTableType { Undefined, + Mock, MySql, Postgres, SqlServer, @@ -97,6 +98,7 @@ impl CdcTableType { Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer( SqlServerExternalTableReader::new(config, schema, pk_indices).await?, )), + Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())), _ => bail!("invalid external table type: {:?}", *self), } } @@ -214,7 +216,7 @@ pub enum ExternalTableReaderImpl { Mock(MockExternalTableReader), } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Default, Clone, Deserialize)] pub struct ExternalTableConfig { pub connector: String, diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index e4cd24f58d4dd..b47a43e8b74c1 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::future::Future; use std::pin::Pin; use either::Either; @@ -27,9 +28,11 @@ use risingwave_connector::parser::{ ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, }; -use risingwave_connector::source::cdc::external::CdcOffset; +use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl}; use risingwave_connector::source::{SourceColumnDesc, SourceContext}; use rw_futures_util::pausable; +use thiserror_ext::AsReport; +use tracing::Instrument; use crate::executor::backfill::cdc::state::CdcBackfillState; use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable; @@ -42,6 +45,7 @@ use crate::executor::backfill::utils::{ use crate::executor::backfill::CdcScanOptions; use crate::executor::monitor::CdcBackfillMetrics; use crate::executor::prelude::*; +use crate::executor::source::get_infinite_backoff_strategy; use crate::executor::UpdateMutation; use crate::task::CreateMviewProgressReporter; @@ -140,7 +144,6 @@ impl CdcBackfillExecutor { let upstream_table_name = self.external_table.qualified_table_name(); let schema_table_name = self.external_table.schema_table_name().clone(); let external_database_name = self.external_table.database_name().to_owned(); - let upstream_table_reader = UpstreamTableReader::new(self.external_table); let additional_columns = self .output_columns @@ -168,29 +171,85 @@ impl CdcBackfillExecutor { // if not, we should bypass the backfill directly. let mut state_impl = self.state_impl; - let mut upstream = transform_upstream(upstream, &self.output_columns) - .boxed() - .peekable(); - state_impl.init_epoch(first_barrier_epoch).await?; // restore backfill state let state = state_impl.restore_state().await?; current_pk_pos = state.current_pk_pos.clone(); - let to_backfill = !self.options.disable_backfill && !state.is_finished; + let need_backfill = !self.options.disable_backfill && !state.is_finished; // Keep track of rows from the snapshot. let mut total_snapshot_row_count = state.row_count as u64; + // After init the state table and forward the initial barrier to downstream, + // we now try to create the table reader with retry. + // If backfill hasn't finished, we can ignore upstream cdc events before we create the table reader; + // If backfill is finished, we should forward the upstream cdc events to downstream. + let mut table_reader: Option = None; + let external_table = self.external_table.clone(); + let mut future = Box::pin(async move { + let backoff = get_infinite_backoff_strategy(); + tokio_retry::Retry::spawn(backoff, || async { + match external_table.create_table_reader().await { + Ok(reader) => Ok(reader), + Err(e) => { + tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying..."); + Err(e) + } + } + }) + .instrument(tracing::info_span!("create_cdc_table_reader_with_retry")) + .await + .expect("Retry create cdc table reader until success.") + }); + loop { + if let Some(msg) = + build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future) + .await? + { + match msg { + Message::Barrier(barrier) => { + // commit state to bump the epoch of state table + state_impl.commit_state(barrier.epoch).await?; + yield Message::Barrier(barrier); + } + Message::Chunk(chunk) => { + if need_backfill { + // ignore chunk if we need backfill, since we can read the data from the snapshot + } else { + // forward the chunk to downstream + yield Message::Chunk(chunk); + } + } + Message::Watermark(_) => { + // ignore watermark + } + } + } else { + assert!(table_reader.is_some(), "table reader must created"); + tracing::info!( + table_id, + upstream_table_name, + "table reader created successfully" + ); + break; + } + } + + let upstream_table_reader = UpstreamTableReader::new( + self.external_table.clone(), + table_reader.expect("table reader must created"), + ); + + let mut upstream = transform_upstream(upstream, &self.output_columns) + .boxed() + .peekable(); let mut last_binlog_offset: Option = state .last_cdc_offset .map_or(upstream_table_reader.current_cdc_offset().await?, Some); - let offset_parse_func = upstream_table_reader - .inner() - .table_reader() - .get_cdc_offset_parser(); + let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser(); let mut consumed_binlog_offset: Option = None; tracing::info!( @@ -227,7 +286,7 @@ impl CdcBackfillExecutor { // finished. // // Once the backfill loop ends, we forward the upstream directly to the downstream. - if to_backfill { + if need_backfill { // drive the upstream changelog first to ensure we can receive timely changelog event, // otherwise the upstream changelog may be blocked by the snapshot read stream let _ = Pin::new(&mut upstream).peek().await; @@ -702,6 +761,26 @@ impl CdcBackfillExecutor { } } +async fn build_reader_and_poll_upstream( + upstream: &mut BoxedMessageStream, + table_reader: &mut Option, + future: &mut Pin>>, +) -> StreamExecutorResult> { + if table_reader.is_some() { + return Ok(None); + } + tokio::select! { + biased; + reader = &mut *future => { + *table_reader = Some(reader); + Ok(None) + } + msg = upstream.next() => { + msg.transpose() + } + } +} + #[try_stream(ok = Message, error = StreamExecutorError)] pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) { let props = SpecificParserConfig { diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/external.rs b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs index bd99eb59821ee..3e3944e634f1a 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/external.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs @@ -14,9 +14,14 @@ use risingwave_common::catalog::{Schema, TableId}; use risingwave_common::util::sort_util::OrderType; -use risingwave_connector::source::cdc::external::{ExternalTableReaderImpl, SchemaTableName}; +use risingwave_connector::error::ConnectorResult; +use risingwave_connector::source::cdc::external::{ + CdcOffset, CdcTableType, ExternalTableConfig, ExternalTableReader, ExternalTableReaderImpl, + SchemaTableName, +}; /// This struct represents an external table to be read during backfill +#[derive(Debug, Clone)] pub struct ExternalStorageTable { /// Id for this table. table_id: TableId, @@ -28,7 +33,9 @@ pub struct ExternalStorageTable { database_name: String, - table_reader: ExternalTableReaderImpl, + config: ExternalTableConfig, + + table_type: CdcTableType, /// The schema of the output columns, i.e., this table VIEWED BY some executor like /// `RowSeqScanExecutor`. @@ -43,6 +50,7 @@ pub struct ExternalStorageTable { } impl ExternalStorageTable { + #[allow(clippy::too_many_arguments)] pub fn new( table_id: TableId, SchemaTableName { @@ -50,7 +58,8 @@ impl ExternalStorageTable { schema_name, }: SchemaTableName, database_name: String, - table_reader: ExternalTableReaderImpl, + config: ExternalTableConfig, + table_type: CdcTableType, schema: Schema, pk_order_types: Vec, pk_indices: Vec, @@ -60,7 +69,8 @@ impl ExternalStorageTable { table_name, schema_name, database_name, - table_reader, + config, + table_type, schema, pk_order_types, pk_indices, @@ -90,8 +100,14 @@ impl ExternalStorageTable { } } - pub fn table_reader(&self) -> &ExternalTableReaderImpl { - &self.table_reader + pub async fn create_table_reader(&self) -> ConnectorResult { + self.table_type + .create_table_reader( + self.config.clone(), + self.schema.clone(), + self.pk_indices.clone(), + ) + .await } pub fn qualified_table_name(&self) -> String { @@ -101,4 +117,12 @@ impl ExternalStorageTable { pub fn database_name(&self) -> &str { self.database_name.as_str() } + + pub async fn current_cdc_offset( + &self, + table_reader: &ExternalTableReaderImpl, + ) -> ConnectorResult> { + let binlog = table_reader.current_cdc_offset().await?; + Ok(Some(binlog)) + } } diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index 85c2f6b2ab178..6e3da9f6e4990 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -26,7 +26,7 @@ use risingwave_common::row::OwnedRow; use risingwave_common::types::{Scalar, ScalarImpl, Timestamptz}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_connector::source::cdc::external::{ - CdcOffset, ExternalTableReader, SchemaTableName, + CdcOffset, ExternalTableReader, ExternalTableReaderImpl, SchemaTableName, }; use risingwave_pb::plan_common::additional_column::ColumnType; @@ -81,16 +81,13 @@ impl SnapshotReadArgs { /// because we need to customize the snapshot read for managed upstream table (e.g. mv, index) /// and external upstream table. pub struct UpstreamTableReader { - inner: T, + table: T, + pub(crate) reader: ExternalTableReaderImpl, } impl UpstreamTableReader { - pub fn inner(&self) -> &T { - &self.inner - } - - pub fn new(table: T) -> Self { - Self { inner: table } + pub fn new(table: T, reader: ExternalTableReaderImpl) -> Self { + Self { table, reader } } } @@ -142,11 +139,11 @@ impl UpstreamTableRead for UpstreamTableReader { #[try_stream(ok = Option, error = StreamExecutorError)] async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, batch_size: u32) { let primary_keys = self - .inner + .table .pk_indices() .iter() .map(|idx| { - let f = &self.inner.schema().fields[*idx]; + let f = &self.table.schema().fields[*idx]; f.name.clone() }) .collect_vec(); @@ -179,8 +176,8 @@ impl UpstreamTableRead for UpstreamTableReader { ); let mut read_count: usize = 0; - let row_stream = self.inner.table_reader().snapshot_read( - self.inner.schema_table_name(), + let row_stream = self.reader.snapshot_read( + self.table.schema_table_name(), read_args.current_pos.clone(), primary_keys.clone(), batch_size, @@ -188,7 +185,7 @@ impl UpstreamTableRead for UpstreamTableReader { pin_mut!(row_stream); let mut builder = DataChunkBuilder::new( - self.inner.schema().data_types(), + self.table.schema().data_types(), limited_chunk_size(read_args.rate_limit_rps), ); let chunk_stream = iter_chunks(row_stream, &mut builder); @@ -247,7 +244,7 @@ impl UpstreamTableRead for UpstreamTableReader { } async fn current_cdc_offset(&self) -> StreamExecutorResult> { - let binlog = self.inner.table_reader().current_cdc_offset(); + let binlog = self.reader.current_cdc_offset(); let binlog = binlog.await?; Ok(Some(binlog)) } diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index cab3527a32f25..d873afdfcdb37 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::num::NonZeroU32; +use std::time::Duration; use await_tree::InstrumentAwait; use governor::clock::MonotonicClock; @@ -49,6 +50,7 @@ pub use source_backfill_state_table::BackfillStateTableHandler; pub mod state_table_handler; use futures_async_stream::try_stream; use tokio::sync::mpsc::UnboundedReceiver; +use tokio_retry::strategy::{jitter, ExponentialBackoff}; use crate::executor::error::StreamExecutorError; use crate::executor::{Barrier, Message}; @@ -182,3 +184,13 @@ pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit_rps: Opti } } } + +pub fn get_infinite_backoff_strategy() -> impl Iterator { + const BASE_DELAY: Duration = Duration::from_secs(1); + const BACKOFF_FACTOR: u64 = 2; + const MAX_DELAY: Duration = Duration::from_secs(10); + ExponentialBackoff::from_millis(BASE_DELAY.as_millis() as u64) + .factor(BACKOFF_FACTOR) + .max_delay(MAX_DELAY) + .map(jitter) +} diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 80b252014d284..ca98ec99ce6ae 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -13,14 +13,17 @@ // limitations under the License. use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; use std::time::Duration; use anyhow::anyhow; use either::Either; +use futures::stream::BoxStream; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::array::ArrayRef; -use risingwave_common::catalog::TableId; +use risingwave_common::catalog::{ColumnId, TableId}; use risingwave_common::metrics::{LabelGuardedIntCounter, GLOBAL_ERROR_METRICS}; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; @@ -38,6 +41,7 @@ use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{mpsc, oneshot}; use tokio::time::Instant; +use tracing::Instrument; use super::executor_core::StreamSourceCore; use super::{ @@ -46,6 +50,7 @@ use super::{ }; use crate::common::rate_limit::limited_chunk_size; use crate::executor::prelude::*; +use crate::executor::source::get_infinite_backoff_strategy; use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::UpdateMutation; @@ -123,6 +128,24 @@ impl SourceExecutor { state: ConnectorState, seek_to_latest: bool, ) -> StreamExecutorResult<(BoxChunkSourceStream, Option>)> { + let (column_ids, source_ctx) = self.prepare_source_stream_build(source_desc); + let (stream, latest_splits) = source_desc + .source + .build_stream(state, column_ids, Arc::new(source_ctx), seek_to_latest) + .await + .map_err(StreamExecutorError::connector_error)?; + + Ok(( + apply_rate_limit(stream, self.rate_limit_rps).boxed(), + latest_splits, + )) + } + + /// build the source column ids and the source context which will be used to build the source stream + pub fn prepare_source_stream_build( + &self, + source_desc: &SourceDesc, + ) -> (Vec, SourceContext) { let column_ids = source_desc .columns .iter() @@ -130,7 +153,7 @@ impl SourceExecutor { .collect_vec(); let (schema_change_tx, mut schema_change_rx) = - tokio::sync::mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16); + mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16); let schema_change_tx = if self.is_auto_schema_change_enable() { let meta_client = self.actor_ctx.meta_client.clone(); // spawn a task to handle schema change event from source parser @@ -185,16 +208,8 @@ impl SourceExecutor { source_desc.source.config.clone(), schema_change_tx, ); - let (stream, latest_splits) = source_desc - .source - .build_stream(state, column_ids, Arc::new(source_ctx), seek_to_latest) - .await - .map_err(StreamExecutorError::connector_error)?; - Ok(( - apply_rate_limit(stream, self.rate_limit_rps).boxed(), - latest_splits, - )) + (column_ids, source_ctx) } fn is_auto_schema_change_enable(&self) -> bool { @@ -424,7 +439,7 @@ impl SourceExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_with_stream_source(mut self) { let mut barrier_receiver = self.barrier_receiver.take().unwrap(); - let barrier = barrier_receiver + let first_barrier = barrier_receiver .recv() .instrument_await("source_recv_first_barrier") .await @@ -435,17 +450,17 @@ impl SourceExecutor { self.stream_source_core.as_ref().unwrap().source_id ) })?; - let first_epoch = barrier.epoch; + let first_epoch = first_barrier.epoch; let mut boot_state = - if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) { + if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) { tracing::debug!(?splits, "boot with splits"); splits.to_vec() } else { Vec::default() }; - let is_pause_on_startup = barrier.is_pause_on_startup(); + let is_pause_on_startup = first_barrier.is_pause_on_startup(); - yield Message::Barrier(barrier); + yield Message::Barrier(first_barrier); let mut core = self.stream_source_core.unwrap(); @@ -490,20 +505,98 @@ impl SourceExecutor { let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); tracing::debug!(state = ?recover_state, "start with state"); - let (source_chunk_reader, latest_splits) = self - .build_stream_source_reader( - &source_desc, - recover_state, - // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. - // It's highly probable that the work of scanning historical data cannot be shared, - // so don't waste work on it. - // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 - // Note that shared CDC source is special. It already starts from latest. - self.is_shared_non_cdc && is_uninitialized, + + let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); + let mut reader_and_splits: Option<(BoxChunkSourceStream, Option>)> = None; + let seek_to_latest = self.is_shared_non_cdc && is_uninitialized; + let source_reader = source_desc.source.clone(); + let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc); + let source_ctx = Arc::new(source_ctx); + let mut build_source_stream_fut = Box::pin(async move { + let backoff = get_infinite_backoff_strategy(); + tokio_retry::Retry::spawn(backoff, || async { + match source_reader + .build_stream( + recover_state.clone(), + column_ids.clone(), + source_ctx.clone(), + seek_to_latest, + ) + .await { + Ok((stream, latest_splits)) => Ok((stream, latest_splits)), + Err(e) => { + tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying..."); + Err(e) + } + } + }) + .instrument(tracing::info_span!("build_source_stream_with_retry")) + .await + .expect("Retry build source stream until success.") + }); + + let mut need_resume_after_build = false; + // loop to create source stream until success + loop { + if let Some(barrier) = build_source_stream_and_poll_barrier( + &mut barrier_stream, + &mut reader_and_splits, + &mut build_source_stream_fut, ) - .instrument_await("source_build_reader") - .await?; - let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error); + .await? + { + if let Message::Barrier(barrier) = barrier { + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Throttle(actor_to_apply) => { + if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id) + && *new_rate_limit != self.rate_limit_rps + { + tracing::info!( + "updating rate limit from {:?} to {:?}", + self.rate_limit_rps, + *new_rate_limit + ); + + // update the rate limit option, we will apply the rate limit + // when we finish building the source stream. + self.rate_limit_rps = *new_rate_limit; + } + } + Mutation::Resume => { + // We record the Resume mutation here and postpone the resume of the source stream + // after we have successfully built the source stream. + need_resume_after_build = true; + } + _ => { + // ignore other mutations and output a warn log + tracing::warn!( + "Received a mutation {:?} to be ignored, because we only handle Throttle and Resume before + finish building source stream.", + mutation + ); + } + } + } + + // bump state store epoch + let _ = self.persist_state_and_clear_cache(barrier.epoch).await?; + yield Message::Barrier(barrier); + } else { + unreachable!("Only barrier message is expected when building source stream."); + } + } else { + assert!(reader_and_splits.is_some()); + tracing::info!("source stream created successfully"); + break; + } + } + let (source_chunk_reader, latest_splits) = + reader_and_splits.expect("source chunk reader and splits must be created"); + + let source_chunk_reader = apply_rate_limit(source_chunk_reader, self.rate_limit_rps) + .boxed() + .map_err(StreamExecutorError::connector_error); if let Some(latest_splits) = latest_splits { // make sure it is written to state table later. // Then even it receives no messages, we can observe it in state table. @@ -515,13 +608,12 @@ impl SourceExecutor { } // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. - let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); let mut command_paused = false; - // - If the first barrier requires us to pause on startup, pause the stream. - if is_pause_on_startup { + // - If the first barrier requires us to pause on startup and we haven't received a Resume mutation, pause the stream. + if is_pause_on_startup && !need_resume_after_build { tracing::info!("source paused on startup"); stream.pause_stream(); command_paused = true; @@ -736,6 +828,29 @@ impl SourceExecutor { } } +async fn build_source_stream_and_poll_barrier( + barrier_stream: &mut BoxStream<'static, StreamExecutorResult>, + reader_and_splits: &mut Option<(BoxChunkSourceStream, Option>)>, + build_future: &mut Pin< + Box>)>>, + >, +) -> StreamExecutorResult> { + if reader_and_splits.is_some() { + return Ok(None); + } + + tokio::select! { + biased; + build_ret = &mut *build_future => { + *reader_and_splits = Some(build_ret); + Ok(None) + } + msg = barrier_stream.next() => { + msg.transpose() + } + } +} + impl Execute for SourceExecutor { fn execute(self: Box) -> BoxedMessageStream { if self.stream_source_core.is_some() { diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs index 18a47ae8a461f..45cd661441f07 100644 --- a/src/stream/src/from_proto/stream_cdc_scan.rs +++ b/src/stream/src/from_proto/stream_cdc_scan.rs @@ -94,15 +94,13 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { .context("failed to parse external table config")?; let database_name = table_config.database.clone(); - let table_reader = table_type - .create_table_reader(table_config, table_schema.clone(), table_pk_indices.clone()) - .await?; let external_table = ExternalStorageTable::new( TableId::new(table_desc.table_id), schema_table_name, database_name, - table_reader, + table_config, + table_type, table_schema, table_pk_order_types, table_pk_indices,