diff --git a/e2e_test/source_inline/kafka/issue_19563.slt.serial b/e2e_test/source_inline/kafka/issue_19563.slt.serial
index a01e7697d124a..528fac0735767 100644
--- a/e2e_test/source_inline/kafka/issue_19563.slt.serial
+++ b/e2e_test/source_inline/kafka/issue_19563.slt.serial
@@ -55,11 +55,7 @@ cat < StreamResult<()> {
         MockOffsetGenExecutor::new(source).boxed(),
     );
 
+    let binlog_file = String::from("1.binlog");
+    // mock binlog watermarks for backfill
+    // initial low watermark: 1.binlog, pos=2 and expected behaviors:
+    // - ignore events before (1.binlog, pos=2);
+    // - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot
+    let binlog_watermarks = vec![
+        MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark
+        MySqlOffset::new(binlog_file.clone(), 4),
+        MySqlOffset::new(binlog_file.clone(), 6),
+        MySqlOffset::new(binlog_file.clone(), 8),
+        MySqlOffset::new(binlog_file.clone(), 10),
+    ];
+
     let table_name = SchemaTableName {
         schema_name: "public".to_string(),
         table_name: "mock_table".to_string(),
@@ -168,14 +183,11 @@ async fn test_cdc_backfill() -> StreamResult<()> {
     ]);
     let table_pk_indices = vec![0];
     let table_pk_order_types = vec![OrderType::ascending()];
-    let config = ExternalTableConfig::default();
-
     let external_table = ExternalStorageTable::new(
         TableId::new(1234),
         table_name,
         "mydb".to_string(),
-        config,
-        CdcTableType::Mock,
+        ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)),
         table_schema.clone(),
         table_pk_order_types,
         table_pk_indices.clone(),
diff --git a/src/connector/src/source/cdc/external/mock_external_table.rs b/src/connector/src/source/cdc/external/mock_external_table.rs
index 4f9538322d377..7242f5614d409 100644
--- a/src/connector/src/source/cdc/external/mock_external_table.rs
+++ b/src/connector/src/source/cdc/external/mock_external_table.rs
@@ -31,19 +31,7 @@ pub struct MockExternalTableReader {
 }
 
 impl MockExternalTableReader {
-    pub fn new() -> Self {
-        let binlog_file = String::from("1.binlog");
-        // mock binlog watermarks for backfill
-        // initial low watermark: 1.binlog, pos=2 and expected behaviors:
-        // - ignore events before (1.binlog, pos=2);
-        // - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot
-        let binlog_watermarks = vec![
-            MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark
-            MySqlOffset::new(binlog_file.clone(), 4),
-            MySqlOffset::new(binlog_file.clone(), 6),
-            MySqlOffset::new(binlog_file.clone(), 8),
-            MySqlOffset::new(binlog_file.clone(), 10),
-        ];
+    pub fn new(binlog_watermarks: Vec<MySqlOffset>) -> Self {
         Self {
             binlog_watermarks,
             snapshot_cnt: AtomicUsize::new(0),
diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs
index d955de4ef4e66..fbfa66ef0e7c6 100644
--- a/src/connector/src/source/cdc/external/mod.rs
+++ b/src/connector/src/source/cdc/external/mod.rs
@@ -49,10 +49,9 @@ use crate::source::cdc::external::sql_server::{
 use crate::source::cdc::CdcSourceType;
 use crate::WithPropertiesExt;
 
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub enum CdcTableType {
     Undefined,
-    Mock,
     MySql,
     Postgres,
     SqlServer,
@@ -102,7 +101,6 @@ impl CdcTableType {
             Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
                 SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
             )),
-            Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
             _ => bail!("invalid external table type: {:?}", *self),
         }
     }
@@ -220,7 +218,7 @@ pub enum ExternalTableReaderImpl {
     Mock(MockExternalTableReader),
 }
 
-#[derive(Debug, Default, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
 pub struct ExternalTableConfig {
     pub connector: String,
diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
index 9d0b5d73b9791..066dc86ba551c 100644
--- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
+++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::future::Future;
 use std::pin::Pin;
 
 use either::Either;
@@ -28,11 +27,9 @@ use risingwave_connector::parser::{
     ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
     ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig,
 };
-use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
+use risingwave_connector::source::cdc::external::CdcOffset;
 use risingwave_connector::source::{SourceColumnDesc, SourceContext};
 use rw_futures_util::pausable;
-use thiserror_ext::AsReport;
-use tracing::Instrument;
 
 use crate::executor::backfill::cdc::state::CdcBackfillState;
 use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
@@ -45,7 +42,6 @@ use crate::executor::backfill::utils::{
 };
 use crate::executor::backfill::CdcScanOptions;
 use crate::executor::monitor::CdcBackfillMetrics;
 use crate::executor::prelude::*;
-use crate::executor::source::get_infinite_backoff_strategy;
 use crate::executor::UpdateMutation;
 use crate::task::CreateMviewProgressReporter;
@@ -144,6 +140,7 @@ impl CdcBackfillExecutor {
         let upstream_table_name = self.external_table.qualified_table_name();
         let schema_table_name = self.external_table.schema_table_name().clone();
         let external_database_name = self.external_table.database_name().to_owned();
+        let upstream_table_reader = UpstreamTableReader::new(self.external_table);
 
         let additional_columns = self
             .output_columns
@@ -162,94 +159,38 @@ impl CdcBackfillExecutor {
         let first_barrier = expect_first_barrier(&mut upstream).await?;
 
         let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
-        let first_barrier_epoch = first_barrier.epoch;
-        // The first barrier message should be propagated.
-        yield Message::Barrier(first_barrier);
         let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);
 
         // Check whether this parallelism has been assigned splits,
         // if not, we should bypass the backfill directly.
         let mut state_impl = self.state_impl;
-        state_impl.init_epoch(first_barrier_epoch);
+        let mut upstream = transform_upstream(upstream, &self.output_columns)
+            .boxed()
+            .peekable();
+
+        state_impl.init_epoch(first_barrier.epoch);
 
         // restore backfill state
         let state = state_impl.restore_state().await?;
         current_pk_pos = state.current_pk_pos.clone();
 
-        let need_backfill = !self.options.disable_backfill && !state.is_finished;
+        let to_backfill = !self.options.disable_backfill && !state.is_finished;
+
+        // The first barrier message should be propagated.
+        yield Message::Barrier(first_barrier);
 
         // Keep track of rows from the snapshot.
         let mut total_snapshot_row_count = state.row_count as u64;
 
-        // After init the state table and forward the initial barrier to downstream,
-        // we now try to create the table reader with retry.
-        // If backfill hasn't finished, we can ignore upstream cdc events before we create the table reader;
-        // If backfill is finished, we should forward the upstream cdc events to downstream.
-        let mut table_reader: Option<ExternalTableReaderImpl> = None;
-        let external_table = self.external_table.clone();
-        let mut future = Box::pin(async move {
-            let backoff = get_infinite_backoff_strategy();
-            tokio_retry::Retry::spawn(backoff, || async {
-                match external_table.create_table_reader().await {
-                    Ok(reader) => Ok(reader),
-                    Err(e) => {
-                        tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
-                        Err(e)
-                    }
-                }
-            })
-            .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
-            .await
-            .expect("Retry create cdc table reader until success.")
-        });
-        loop {
-            if let Some(msg) =
-                build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
-                    .await?
-            {
-                match msg {
-                    Message::Barrier(barrier) => {
-                        // commit state to bump the epoch of state table
-                        state_impl.commit_state(barrier.epoch).await?;
-                        yield Message::Barrier(barrier);
-                    }
-                    Message::Chunk(chunk) => {
-                        if need_backfill {
-                            // ignore chunk if we need backfill, since we can read the data from the snapshot
-                        } else {
-                            // forward the chunk to downstream
-                            yield Message::Chunk(chunk);
-                        }
-                    }
-                    Message::Watermark(_) => {
-                        // ignore watermark
-                    }
-                }
-            } else {
-                assert!(table_reader.is_some(), "table reader must created");
-                tracing::info!(
-                    table_id,
-                    upstream_table_name,
-                    "table reader created successfully"
-                );
-                break;
-            }
-        }
-
-        let upstream_table_reader = UpstreamTableReader::new(
-            self.external_table.clone(),
-            table_reader.expect("table reader must created"),
-        );
-
-        let mut upstream = transform_upstream(upstream, &self.output_columns)
-            .boxed()
-            .peekable();
         let mut last_binlog_offset: Option<CdcOffset> = state
             .last_cdc_offset
             .map_or(upstream_table_reader.current_cdc_offset().await?, Some);
-        let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
+        let offset_parse_func = upstream_table_reader
+            .inner()
+            .table_reader()
+            .get_cdc_offset_parser();
         let mut consumed_binlog_offset: Option<CdcOffset> = None;
 
         tracing::info!(
@@ -286,7 +227,7 @@ impl CdcBackfillExecutor {
         // finished.
         //
         // Once the backfill loop ends, we forward the upstream directly to the downstream.
-        if need_backfill {
+        if to_backfill {
             // drive the upstream changelog first to ensure we can receive timely changelog event,
             // otherwise the upstream changelog may be blocked by the snapshot read stream
             let _ = Pin::new(&mut upstream).peek().await;
@@ -761,26 +702,6 @@ impl CdcBackfillExecutor {
     }
 }
 
-async fn build_reader_and_poll_upstream(
-    upstream: &mut BoxedMessageStream,
-    table_reader: &mut Option<ExternalTableReaderImpl>,
-    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
-) -> StreamExecutorResult<Option<Message>> {
-    if table_reader.is_some() {
-        return Ok(None);
-    }
-    tokio::select! {
-        biased;
-        reader = &mut *future => {
-            *table_reader = Some(reader);
-            Ok(None)
-        }
-        msg = upstream.next() => {
-            msg.transpose()
-        }
-    }
-}
-
 #[try_stream(ok = Message, error = StreamExecutorError)]
 pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) {
     let props = SpecificParserConfig {
diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/external.rs b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs
index 3e3944e634f1a..bd99eb59821ee 100644
--- a/src/stream/src/executor/backfill/cdc/upstream_table/external.rs
+++ b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs
@@ -14,14 +14,9 @@
 use risingwave_common::catalog::{Schema, TableId};
 use risingwave_common::util::sort_util::OrderType;
-use risingwave_connector::error::ConnectorResult;
-use risingwave_connector::source::cdc::external::{
-    CdcOffset, CdcTableType, ExternalTableConfig, ExternalTableReader, ExternalTableReaderImpl,
-    SchemaTableName,
-};
+use risingwave_connector::source::cdc::external::{ExternalTableReaderImpl, SchemaTableName};
 
 /// This struct represents an external table to be read during backfill
-#[derive(Debug, Clone)]
 pub struct ExternalStorageTable {
     /// Id for this table.
     table_id: TableId,
@@ -33,9 +28,7 @@ pub struct ExternalStorageTable {
 
     database_name: String,
 
-    config: ExternalTableConfig,
-
-    table_type: CdcTableType,
+    table_reader: ExternalTableReaderImpl,
 
     /// The schema of the output columns, i.e., this table VIEWED BY some executor like
     /// `RowSeqScanExecutor`.
@@ -50,7 +43,6 @@ pub struct ExternalStorageTable {
 }
 
 impl ExternalStorageTable {
-    #[allow(clippy::too_many_arguments)]
     pub fn new(
         table_id: TableId,
         SchemaTableName {
@@ -58,8 +50,7 @@ impl ExternalStorageTable {
             schema_name,
         }: SchemaTableName,
         database_name: String,
-        config: ExternalTableConfig,
-        table_type: CdcTableType,
+        table_reader: ExternalTableReaderImpl,
         schema: Schema,
         pk_order_types: Vec<OrderType>,
         pk_indices: Vec<usize>,
@@ -69,8 +60,7 @@ impl ExternalStorageTable {
             table_name,
             schema_name,
             database_name,
-            config,
-            table_type,
+            table_reader,
             schema,
             pk_order_types,
             pk_indices,
@@ -100,14 +90,8 @@ impl ExternalStorageTable {
         }
     }
 
-    pub async fn create_table_reader(&self) -> ConnectorResult<ExternalTableReaderImpl> {
-        self.table_type
-            .create_table_reader(
-                self.config.clone(),
-                self.schema.clone(),
-                self.pk_indices.clone(),
-            )
-            .await
+    pub fn table_reader(&self) -> &ExternalTableReaderImpl {
+        &self.table_reader
     }
 
     pub fn qualified_table_name(&self) -> String {
@@ -117,12 +101,4 @@ impl ExternalStorageTable {
     }
 
     pub fn database_name(&self) -> &str {
        self.database_name.as_str()
     }
-
-    pub async fn current_cdc_offset(
-        &self,
-        table_reader: &ExternalTableReaderImpl,
-    ) -> ConnectorResult<Option<CdcOffset>> {
-        let binlog = table_reader.current_cdc_offset().await?;
-        Ok(Some(binlog))
-    }
 }
diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
index 6e3da9f6e4990..85c2f6b2ab178 100644
--- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
+++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
@@ -26,7 +26,7 @@
 use risingwave_common::row::OwnedRow;
 use risingwave_common::types::{Scalar, ScalarImpl, Timestamptz};
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
 use risingwave_connector::source::cdc::external::{
-    CdcOffset, ExternalTableReader, ExternalTableReaderImpl, SchemaTableName,
+    CdcOffset, ExternalTableReader, SchemaTableName,
 };
 use risingwave_pb::plan_common::additional_column::ColumnType;
@@ -81,13 +81,16 @@ impl SnapshotReadArgs {
 /// because we need to customize the snapshot read for managed upstream table (e.g. mv, index)
 /// and external upstream table.
 pub struct UpstreamTableReader<T> {
-    table: T,
-    pub(crate) reader: ExternalTableReaderImpl,
+    inner: T,
 }
 
 impl<T> UpstreamTableReader<T> {
-    pub fn new(table: T, reader: ExternalTableReaderImpl) -> Self {
-        Self { table, reader }
+    pub fn inner(&self) -> &T {
+        &self.inner
+    }
+
+    pub fn new(table: T) -> Self {
+        Self { inner: table }
     }
 }
 
@@ -139,11 +142,11 @@ impl UpstreamTableRead for UpstreamTableReader {
     #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
     async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, batch_size: u32) {
         let primary_keys = self
-            .table
+            .inner
             .pk_indices()
             .iter()
             .map(|idx| {
-                let f = &self.table.schema().fields[*idx];
+                let f = &self.inner.schema().fields[*idx];
                 f.name.clone()
             })
             .collect_vec();
@@ -176,8 +179,8 @@ impl UpstreamTableRead for UpstreamTableReader {
             );
 
             let mut read_count: usize = 0;
-            let row_stream = self.reader.snapshot_read(
-                self.table.schema_table_name(),
+            let row_stream = self.inner.table_reader().snapshot_read(
+                self.inner.schema_table_name(),
                 read_args.current_pos.clone(),
                 primary_keys.clone(),
                 batch_size,
@@ -185,7 +188,7 @@ impl UpstreamTableRead for UpstreamTableReader {
             pin_mut!(row_stream);
 
             let mut builder = DataChunkBuilder::new(
-                self.table.schema().data_types(),
+                self.inner.schema().data_types(),
                 limited_chunk_size(read_args.rate_limit_rps),
             );
             let chunk_stream = iter_chunks(row_stream, &mut builder);
@@ -244,7 +247,7 @@ impl UpstreamTableRead for UpstreamTableReader {
     }
 
     async fn current_cdc_offset(&self) -> StreamExecutorResult<Option<CdcOffset>> {
-        let binlog = self.reader.current_cdc_offset();
+        let binlog = self.inner.table_reader().current_cdc_offset();
         let binlog = binlog.await?;
         Ok(Some(binlog))
     }
diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs
index d873afdfcdb37..cab3527a32f25 100644
--- a/src/stream/src/executor/source/mod.rs
+++ b/src/stream/src/executor/source/mod.rs
@@ -14,7 +14,6 @@
 use std::collections::HashMap;
 use std::num::NonZeroU32;
-use std::time::Duration;
 
 use await_tree::InstrumentAwait;
 use governor::clock::MonotonicClock;
@@ -50,7 +49,6 @@ pub use source_backfill_state_table::BackfillStateTableHandler;
 pub mod state_table_handler;
 use futures_async_stream::try_stream;
 use tokio::sync::mpsc::UnboundedReceiver;
-use tokio_retry::strategy::{jitter, ExponentialBackoff};
 
 use crate::executor::error::StreamExecutorError;
 use crate::executor::{Barrier, Message};
@@ -184,13 +182,3 @@ pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit_rps: Opti
         }
     }
 }
-
-pub fn get_infinite_backoff_strategy() -> impl Iterator<Item = Duration> {
-    const BASE_DELAY: Duration = Duration::from_secs(1);
-    const BACKOFF_FACTOR: u64 = 2;
-    const MAX_DELAY: Duration = Duration::from_secs(10);
-    ExponentialBackoff::from_millis(BASE_DELAY.as_millis() as u64)
-        .factor(BACKOFF_FACTOR)
-        .max_delay(MAX_DELAY)
-        .map(jitter)
-}
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index a318a0f9a9cc1..f32a5da734671 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -13,17 +13,14 @@
 // limitations under the License.
 
 use std::collections::HashMap;
-use std::future::Future;
-use std::pin::Pin;
 use std::time::Duration;
 
 use anyhow::anyhow;
 use either::Either;
-use futures::stream::BoxStream;
 use futures::TryStreamExt;
 use itertools::Itertools;
 use risingwave_common::array::ArrayRef;
-use risingwave_common::catalog::{ColumnId, TableId};
+use risingwave_common::catalog::TableId;
 use risingwave_common::metrics::{LabelGuardedIntCounter, GLOBAL_ERROR_METRICS};
 use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
 use risingwave_common::system_param::reader::SystemParamsRead;
@@ -41,7 +38,6 @@ use thiserror_ext::AsReport;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tokio::sync::{mpsc, oneshot};
 use tokio::time::Instant;
-use tracing::Instrument;
 
 use super::executor_core::StreamSourceCore;
 use super::{
@@ -50,7 +46,6 @@ use super::{
 };
 use crate::common::rate_limit::limited_chunk_size;
 use crate::executor::prelude::*;
-use crate::executor::source::get_infinite_backoff_strategy;
 use crate::executor::stream_reader::StreamReaderWithPause;
 use crate::executor::UpdateMutation;
@@ -128,24 +123,6 @@ impl SourceExecutor {
         state: ConnectorState,
         seek_to_latest: bool,
     ) -> StreamExecutorResult<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> {
-        let (column_ids, source_ctx) = self.prepare_source_stream_build(source_desc);
-        let (stream, latest_splits) = source_desc
-            .source
-            .build_stream(state, column_ids, Arc::new(source_ctx), seek_to_latest)
-            .await
-            .map_err(StreamExecutorError::connector_error)?;
-
-        Ok((
-            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
-            latest_splits,
-        ))
-    }
-
-    /// build the source column ids and the source context which will be used to build the source stream
-    pub fn prepare_source_stream_build(
-        &self,
-        source_desc: &SourceDesc,
-    ) -> (Vec<ColumnId>, SourceContext) {
         let column_ids = source_desc
             .columns
             .iter()
@@ -153,7 +130,7 @@ impl SourceExecutor {
             .collect_vec();
 
         let (schema_change_tx, mut schema_change_rx) =
-            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
+            tokio::sync::mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
         let schema_change_tx = if self.is_auto_schema_change_enable() {
             let meta_client = self.actor_ctx.meta_client.clone();
             // spawn a task to handle schema change event from source parser
@@ -208,8 +185,16 @@ impl SourceExecutor {
             source_desc.source.config.clone(),
             schema_change_tx,
         );
+        let (stream, latest_splits) = source_desc
+            .source
+            .build_stream(state, column_ids, Arc::new(source_ctx), seek_to_latest)
+            .await
+            .map_err(StreamExecutorError::connector_error)?;
 
-        (column_ids, source_ctx)
+        Ok((
+            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
+            latest_splits,
+        ))
     }
 
     fn is_auto_schema_change_enable(&self) -> bool {
@@ -439,7 +424,7 @@ impl SourceExecutor {
     #[try_stream(ok = Message, error = StreamExecutorError)]
     async fn execute_with_stream_source(mut self) {
         let mut barrier_receiver = self.barrier_receiver.take().unwrap();
-        let first_barrier = barrier_receiver
+        let barrier = barrier_receiver
            .recv()
             .instrument_await("source_recv_first_barrier")
             .await
@@ -450,17 +435,6 @@ impl SourceExecutor {
                 self.stream_source_core.as_ref().unwrap().source_id
             )
         })?;
-        let first_epoch = first_barrier.epoch;
-        let mut boot_state =
-            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
-                tracing::debug!(?splits, "boot with splits");
-                splits.to_vec()
-            } else {
-                Vec::default()
-            };
-        let is_pause_on_startup = first_barrier.is_pause_on_startup();
-
-        yield Message::Barrier(first_barrier);
 
         let mut core = self.stream_source_core.unwrap();
 
@@ -478,7 +452,13 @@ impl SourceExecutor {
             unreachable!("Partition and offset columns must be set.");
         };
 
-        core.split_state_store.init_epoch(first_epoch);
+        let mut boot_state = Vec::default();
+        if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
+            tracing::debug!(?splits, "boot with splits");
+            boot_state = splits.to_vec();
+        }
+
+        core.split_state_store.init_epoch(barrier.epoch);
 
         let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0;
         for ele in &mut boot_state {
@@ -506,98 +486,20 @@ impl SourceExecutor {
         let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
         tracing::debug!(state = ?recover_state, "start with state");
-
-        let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
-        let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> = None;
-        let seek_to_latest = self.is_shared_non_cdc && is_uninitialized;
-        let source_reader = source_desc.source.clone();
-        let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc);
-        let source_ctx = Arc::new(source_ctx);
-        let mut build_source_stream_fut = Box::pin(async move {
-            let backoff = get_infinite_backoff_strategy();
-            tokio_retry::Retry::spawn(backoff, || async {
-                match source_reader
-                    .build_stream(
-                        recover_state.clone(),
-                        column_ids.clone(),
-                        source_ctx.clone(),
-                        seek_to_latest,
-                    )
-                    .await {
-                    Ok((stream, latest_splits)) => Ok((stream, latest_splits)),
-                    Err(e) => {
-                        tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying...");
-                        Err(e)
-                    }
-                }
-            })
-            .instrument(tracing::info_span!("build_source_stream_with_retry"))
-            .await
-            .expect("Retry build source stream until success.")
-        });
-
-        let mut need_resume_after_build = false;
-        // loop to create source stream until success
-        loop {
-            if let Some(barrier) = build_source_stream_and_poll_barrier(
-                &mut barrier_stream,
-                &mut reader_and_splits,
-                &mut build_source_stream_fut,
-            )
-            .await?
-            {
-                if let Message::Barrier(barrier) = barrier {
-                    if let Some(mutation) = barrier.mutation.as_deref() {
-                        match mutation {
-                            Mutation::Throttle(actor_to_apply) => {
-                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
-                                    && *new_rate_limit != self.rate_limit_rps
-                                {
-                                    tracing::info!(
-                                        "updating rate limit from {:?} to {:?}",
-                                        self.rate_limit_rps,
-                                        *new_rate_limit
-                                    );
-
-                                    // update the rate limit option, we will apply the rate limit
-                                    // when we finish building the source stream.
-                                    self.rate_limit_rps = *new_rate_limit;
-                                }
-                            }
-                            Mutation::Resume => {
-                                // We record the Resume mutation here and postpone the resume of the source stream
-                                // after we have successfully built the source stream.
-                                need_resume_after_build = true;
-                            }
-                            _ => {
-                                // ignore other mutations and output a warn log
-                                tracing::warn!(
-                                    "Received a mutation {:?} to be ignored, because we only handle Throttle and Resume before
-                                    finish building source stream.",
-                                    mutation
-                                );
-                            }
-                        }
-                    }
-
-                    // bump state store epoch
-                    let _ = self.persist_state_and_clear_cache(barrier.epoch).await?;
-                    yield Message::Barrier(barrier);
-                } else {
-                    unreachable!("Only barrier message is expected when building source stream.");
-                }
-            } else {
-                assert!(reader_and_splits.is_some());
-                tracing::info!("source stream created successfully");
-                break;
-            }
-        }
-        let (source_chunk_reader, latest_splits) =
-            reader_and_splits.expect("source chunk reader and splits must be created");
-
-        let source_chunk_reader = apply_rate_limit(source_chunk_reader, self.rate_limit_rps)
-            .boxed()
-            .map_err(StreamExecutorError::connector_error);
+        let (source_chunk_reader, latest_splits) = self
+            .build_stream_source_reader(
+                &source_desc,
+                recover_state,
+                // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data.
+                // It's highly probable that the work of scanning historical data cannot be shared,
+                // so don't waste work on it.
+                // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
+                // Note that shared CDC source is special. It already starts from latest.
+                self.is_shared_non_cdc && is_uninitialized,
+            )
+            .instrument_await("source_build_reader")
+            .await?;
+        let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error);
         if let Some(latest_splits) = latest_splits {
             // make sure it is written to state table later.
             // Then even it receives no messages, we can observe it in state table.
@@ -609,17 +511,18 @@ impl SourceExecutor {
         }
 
         // Merge the chunks from source and the barriers into a single stream. We prioritize
         // barriers over source data chunks here.
+        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
         let mut stream =
             StreamReaderWithPause::<true, _>::new(barrier_stream, source_chunk_reader);
-        let mut command_paused = false;
 
-        // - If the first barrier requires us to pause on startup and we haven't received a Resume mutation, pause the stream.
-        if is_pause_on_startup && !need_resume_after_build {
+        // - If the first barrier requires us to pause on startup, pause the stream.
+        if barrier.is_pause_on_startup() {
             tracing::info!("source paused on startup");
             stream.pause_stream();
-            command_paused = true;
         }
 
+        yield Message::Barrier(barrier);
+
         // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
         // milliseconds, considering some other latencies like network and cost in Meta.
@@ -651,25 +554,17 @@ impl SourceExecutor {
                             last_barrier_time = Instant::now();
 
                             if self_paused {
+                                stream.resume_stream();
                                 self_paused = false;
-                                // command_paused has a higher priority.
-                                if !command_paused {
-                                    stream.resume_stream();
-                                }
                             }
 
                             let epoch = barrier.epoch;
 
                             if let Some(mutation) = barrier.mutation.as_deref() {
                                 match mutation {
-                                    Mutation::Pause => {
-                                        command_paused = true;
-                                        stream.pause_stream()
-                                    }
-                                    Mutation::Resume => {
-                                        command_paused = false;
-                                        stream.resume_stream()
-                                    }
+                                    // XXX: Is it possible that the stream is self_paused, and we have pause mutation now? In this case, it will panic.
+                                    Mutation::Pause => stream.pause_stream(),
+                                    Mutation::Resume => stream.resume_stream(),
                                     Mutation::SourceChangeSplit(actor_splits) => {
                                         tracing::info!(
                                             actor_id = self.actor_ctx.id,
@@ -829,29 +724,6 @@ impl SourceExecutor {
     }
 }
 
-async fn build_source_stream_and_poll_barrier(
-    barrier_stream: &mut BoxStream<'static, StreamExecutorResult<Message>>,
-    reader_and_splits: &mut Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)>,
-    build_future: &mut Pin<
-        Box<impl Future<Output = (BoxChunkSourceStream, Option<Vec<SplitImpl>>)>>,
-    >,
-) -> StreamExecutorResult<Option<Message>> {
-    if reader_and_splits.is_some() {
-        return Ok(None);
-    }
-
-    tokio::select! {
-        biased;
-        build_ret = &mut *build_future => {
-            *reader_and_splits = Some(build_ret);
-            Ok(None)
-        }
-        msg = barrier_stream.next() => {
-            msg.transpose()
-        }
-    }
-}
-
 impl Execute for SourceExecutor {
     fn execute(self: Box<Self>) -> BoxedMessageStream {
         if self.stream_source_core.is_some() {
diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs
index e8dcb5f2ef83b..3c81ecb80e859 100644
--- a/src/stream/src/from_proto/stream_cdc_scan.rs
+++ b/src/stream/src/from_proto/stream_cdc_scan.rs
@@ -94,13 +94,15 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
             .context("failed to parse external table config")?;
 
         let database_name = table_config.database.clone();
+        let table_reader = table_type
+            .create_table_reader(table_config, table_schema.clone(), table_pk_indices.clone())
+            .await?;
         let external_table = ExternalStorageTable::new(
             TableId::new(table_desc.table_id),
             schema_table_name,
             database_name,
-            table_config,
-            table_type,
+            table_reader,
            table_schema,
             table_pk_order_types,
             table_pk_indices,