diff --git a/e2e_test/source_inline/kafka/shared_source.slt b/e2e_test/source_inline/kafka/shared_source.slt index c481e609ffccd..51a9f1e5ee1b3 100644 --- a/e2e_test/source_inline/kafka/shared_source.slt +++ b/e2e_test/source_inline/kafka/shared_source.slt @@ -6,6 +6,29 @@ SET rw_enable_shared_source TO true; system ok rpk topic create shared_source -p 4 +# Test create source before produing data. +statement ok +create source s_before_produce (v1 int, v2 varchar) with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'shared_source', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON; + +statement ok +create materialized view mv_before_produce as select * from s_before_produce; + +sleep 2s + +# All partitions starts with backfill_info: NoDataToBackfill, so it finishes immediately. +system ok +internal_table.mjs --name mv_before_produce --type sourcebackfill +---- +0,"""Finished""" +1,"""Finished""" +2,"""Finished""" +3,"""Finished""" + + system ok cat << EOF | rpk topic produce shared_source -f "%p %v\n" -p 0 0 {"v1": 1, "v2": "a"} @@ -21,7 +44,7 @@ create source s0 (v1 int, v2 varchar) with ( scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE JSON; -query I +query ? select count(*) from rw_internal_tables where name like '%s0%'; ---- 1 @@ -41,21 +64,24 @@ create materialized view mv_1 as select * from s0; # Wait enough time to ensure SourceExecutor consumes all Kafka data. sleep 2s -# SourceExecutor's ingestion started, but it only starts from latest. +# SourceExecutor's ingestion started, but it only starts from latest (offset 1). system ok internal_table.mjs --name s0 --type source ---- (empty) -# offset 0 must be backfilled, not from upstream. +# SourceBackfill starts from offset 0, with backfill_info: HasDataToBackfill { latest_offset: "0" } (decided by kafka high watermark). +# (meaning upstream already consumed offset 0, so we only need to backfill to offset 0) +# After backfilling offset 0, it enters SourceCachingUp state. Now the backfill is finished. +# We wait for SourceExecutor to produce offset > 0. system ok internal_table.mjs --name mv_1 --type sourcebackfill ---- -0,"{""Backfilling"": ""0""}" -1,"{""Backfilling"": ""0""}" -2,"{""Backfilling"": ""0""}" -3,"{""Backfilling"": ""0""}" +0,"{""SourceCachingUp"": ""0""}" +1,"{""SourceCachingUp"": ""0""}" +2,"{""SourceCachingUp"": ""0""}" +3,"{""SourceCachingUp"": ""0""}" # This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor. @@ -67,7 +93,7 @@ create materialized view mv_2 as select * from s0; sleep 2s -query IT rowsort +query ?? rowsort select v1, v2 from s0; ---- 1 a @@ -75,7 +101,7 @@ select v1, v2 from s0; 3 c 4 d -query IT rowsort +query ?? rowsort select v1, v2 from mv_1; ---- 1 a @@ -83,7 +109,7 @@ select v1, v2 from mv_1; 3 c 4 d -query IT rowsort +query ?? rowsort select v1, v2 from mv_2; ---- 1 a @@ -111,7 +137,7 @@ internal_table.mjs --name s0 --type source 3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -query IT rowsort +query ?? rowsort select v1, v2 from s0; ---- 1 a @@ -123,7 +149,7 @@ select v1, v2 from s0; 4 d 4 dd -query IT rowsort +query ?? rowsort select v1, v2 from mv_1; ---- 1 a @@ -146,18 +172,14 @@ internal_table.mjs --name s0 --type source 3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -# The result is non-deterministic: -# If the upstream row comes before the backfill row, it will be ignored, and the result state is "{""Backfilling"": ""1""}". -# If the upstream row comes after the backfill row, the result state is Finished. -# Uncomment below and run manually to see the result. - -# system ok -# internal_table.mjs --name mv_1 --type sourcebackfill -# ---- -# 0,"{""Finished""}" -# 1,"{""Finished""}" -# 2,"{""Finished""}" -# 3,"{""Finished""}" +# Transition from SourceCachingUp to Finished after consuming one upstream message. +system ok +internal_table.mjs --name mv_1 --type sourcebackfill +---- +0,"""Finished""" +1,"""Finished""" +2,"""Finished""" +3,"""Finished""" system ok @@ -173,7 +195,7 @@ done sleep 3s -query IT rowsort +query ?? rowsort select v1, count(*) from s0 group by v1; ---- 1 12 @@ -181,7 +203,7 @@ select v1, count(*) from s0 group by v1; 3 12 4 12 -query IT rowsort +query ?? rowsort select v1, count(*) from mv_1 group by v1; ---- 1 12 @@ -189,6 +211,14 @@ select v1, count(*) from mv_1 group by v1; 3 12 4 12 +query ?? rowsort +select v1, count(*) from mv_before_produce group by v1; +---- +1 12 +2 12 +3 12 +4 12 + # start_offset changed to 11 system ok @@ -200,15 +230,8 @@ internal_table.mjs --name s0 --type source 3,"{""split_info"": {""partition"": 3, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -# Now it is highly probable that all partitions have finished. -system ok -internal_table.mjs --name mv_1 --type sourcebackfill ----- -0,"""Finished""" -1,"""Finished""" -2,"""Finished""" -3,"""Finished""" - - statement ok drop source s0 cascade; + +statement ok +drop source s_before_produce cascade; diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 38c2f25eb0336..d2b5aa1e88b4c 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -370,6 +370,33 @@ pub trait SplitReader: Sized + Send { ) -> crate::error::ConnectorResult; fn into_stream(self) -> BoxChunkSourceStream; + + fn backfill_info(&self) -> HashMap { + HashMap::new() + } +} + +/// Information used to determine whether we should start and finish source backfill. +/// +/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable), +/// perhaps we should ban blocking DDL for it. +#[derive(Debug, Clone)] +pub enum BackfillInfo { + HasDataToBackfill { + /// The last available offsets for each split (**inclusive**). + /// + /// This will be used to determine whether source backfill is finished when + /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise, + /// blocking DDL cannot finish until new messages come. + /// + /// When there are upstream messages, we will use the latest offsets from the upstream. + latest_offset: String, + }, + /// If there are no messages in the split at all, we don't need to start backfill. + /// In this case, there will be no message from the backfill stream too. + /// If we started backfill, we cannot finish it until new messages come. + /// So we mark this a special case for optimization. + NoDataToBackfill, } for_all_sources!(impl_connector_properties); diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index ff007076c1338..5551c12b433b3 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -170,6 +170,7 @@ impl KafkaSplitEnumerator { self.report_high_watermark(*partition, high); map.insert(*partition, (low, high)); } + tracing::debug!("fetch kafka watermarks: {map:?}"); Ok(map) } diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 5ace1820b4249..72d4c36377c81 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -34,13 +34,14 @@ use crate::source::kafka::{ KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext, KAFKA_ISOLATION_LEVEL, }; use crate::source::{ - into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData, - SplitReader, + into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId, + SplitMetaData, SplitReader, }; pub struct KafkaSplitReader { consumer: StreamConsumer, offsets: HashMap, Option)>, + backfill_info: HashMap, bytes_per_second: usize, max_num_messages: usize, parser_config: ParserConfig, @@ -106,7 +107,7 @@ impl SplitReader for KafkaSplitReader { let mut tpl = TopicPartitionList::with_capacity(splits.len()); let mut offsets = HashMap::new(); - + let mut backfill_info = HashMap::new(); for split in splits { offsets.insert(split.id(), (split.start_offset, split.stop_offset)); @@ -121,7 +122,29 @@ impl SplitReader for KafkaSplitReader { } else { tpl.add_partition(split.topic.as_str(), split.partition); } + + let (low, high) = consumer + .fetch_watermarks( + split.topic.as_str(), + split.partition, + properties.common.sync_call_timeout, + ) + .await?; + tracing::debug!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}"); + // note: low is inclusive, high is exclusive + if low == high { + backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill); + } else { + debug_assert!(high > 0); + backfill_info.insert( + split.id(), + BackfillInfo::HasDataToBackfill { + latest_offset: (high - 1).to_string(), + }, + ); + } } + tracing::debug!("backfill_info: {:?}", backfill_info); consumer.assign(&tpl)?; @@ -143,6 +166,7 @@ impl SplitReader for KafkaSplitReader { Ok(Self { consumer, offsets, + backfill_info, bytes_per_second, max_num_messages, parser_config, @@ -155,6 +179,10 @@ impl SplitReader for KafkaSplitReader { let source_context = self.source_ctx.clone(); into_chunk_stream(self.into_data_stream(), parser_config, source_context) } + + fn backfill_info(&self) -> HashMap { + self.backfill_info.clone() + } } impl KafkaSplitReader { diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index 61468bd72a4b6..95764792c0025 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use anyhow::Context; @@ -34,8 +35,9 @@ use crate::source::filesystem::opendal_source::{ }; use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; use crate::source::{ - create_split_reader, BoxChunkSourceStream, BoxTryStream, Column, ConnectorProperties, - ConnectorState, SourceColumnDesc, SourceContext, SplitReader, WaitCheckpointTask, + create_split_reader, BackfillInfo, BoxChunkSourceStream, BoxTryStream, Column, + ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitReader, + WaitCheckpointTask, }; use crate::{dispatch_source_prop, WithOptionsSecResolved}; @@ -129,6 +131,72 @@ impl SourceReader { }) } + pub async fn build_stream_for_backfill( + &self, + state: ConnectorState, + column_ids: Vec, + source_ctx: Arc, + ) -> ConnectorResult<(BoxChunkSourceStream, HashMap)> { + let Some(splits) = state else { + return Ok((pending().boxed(), HashMap::new())); + }; + let config = self.config.clone(); + let columns = self.get_target_columns(column_ids)?; + + let data_gen_columns = Some( + columns + .iter() + .map(|col| Column { + name: col.name.clone(), + data_type: col.data_type.clone(), + is_visible: col.is_visible(), + }) + .collect_vec(), + ); + + let parser_config = ParserConfig { + specific: self.parser_config.clone(), + common: CommonParserConfig { + rw_columns: columns, + }, + }; + + let support_multiple_splits = config.support_multiple_splits(); + dispatch_source_prop!(config, prop, { + let readers = if support_multiple_splits { + tracing::debug!( + "spawning connector split reader for multiple splits {:?}", + splits + ); + let reader = + create_split_reader(*prop, splits, parser_config, source_ctx, data_gen_columns) + .await?; + + vec![reader] + } else { + let to_reader_splits = splits.into_iter().map(|split| vec![split]); + try_join_all(to_reader_splits.into_iter().map(|splits| { + tracing::debug!(?splits, "spawning connector split reader"); + let props = prop.clone(); + let data_gen_columns = data_gen_columns.clone(); + let parser_config = parser_config.clone(); + // TODO: is this reader split across multiple threads...? Realistically, we want + // source_ctx to live in a single actor. + let source_ctx = source_ctx.clone(); + create_split_reader(*props, splits, parser_config, source_ctx, data_gen_columns) + })) + .await? + }; + + let backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect(); + + Ok(( + select_all(readers.into_iter().map(|r| r.into_stream())).boxed(), + backfill_info, + )) + }) + } + /// Build `SplitReader`s and then `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s). pub async fn build_stream( &self, diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 9c3336878f952..b28c707bdedd0 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -27,7 +27,8 @@ use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::types::JsonbVal; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ - BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData, + BackfillInfo, BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, + SplitMetaData, }; use serde::{Deserialize, Serialize}; use thiserror_ext::AsReport; @@ -43,6 +44,7 @@ use crate::executor::UpdateMutation; #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub enum BackfillState { /// `None` means not started yet. It's the initial state. + /// XXX: perhaps we can also set to low-watermark instead of `None` Backfilling(Option), /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset. SourceCachingUp(String), @@ -90,6 +92,8 @@ pub struct SourceBackfillExecutorInner { /// Local variables used in the backfill stage. /// +/// See for a state diagram about how it works. +/// /// Note: all off the fields should contain all available splits, and we can `unwrap()` safely when `get()`. #[derive(Debug)] struct BackfillStage { @@ -99,8 +103,8 @@ struct BackfillStage { /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`). splits: Vec, /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling. - /// TODO: initialize this with high watermark so that we can finish backfilling even when upstream - /// doesn't emit any data. + /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it) + /// so that we can finish backfilling even when upstream doesn't emit any data. target_offsets: HashMap>, } @@ -259,7 +263,7 @@ impl SourceBackfillExecutorInner { &self, source_desc: &SourceDesc, splits: Vec, - ) -> StreamExecutorResult { + ) -> StreamExecutorResult<(BoxChunkSourceStream, HashMap)> { let column_ids = source_desc .columns .iter() @@ -278,12 +282,22 @@ impl SourceBackfillExecutorInner { source_desc.source.config.clone(), None, ); - let stream = source_desc + + // We will check watermark to decide whether we need to backfill. + // e.g., when there's a Kafka topic-partition without any data, + // we don't need to backfill at all. But if we do not check here, + // the executor can only know it's finished when data coming in. + // For blocking DDL, this would be annoying. + + let (stream, backfill_info) = source_desc .source - .build_stream(Some(splits), column_ids, Arc::new(source_ctx)) + .build_stream_for_backfill(Some(splits), column_ids, Arc::new(source_ctx)) .await .map_err(StreamExecutorError::connector_error)?; - Ok(apply_rate_limit(stream, self.rate_limit_rps).boxed()) + Ok(( + apply_rate_limit(stream, self.rate_limit_rps).boxed(), + backfill_info, + )) } #[try_stream(ok = Message, error = StreamExecutorError)] @@ -337,13 +351,25 @@ impl SourceBackfillExecutorInner { // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = core; - let source_chunk_reader = self + let (source_chunk_reader, backfill_info) = self .build_stream_source_reader( &source_desc, backfill_stage.get_latest_unfinished_splits()?, ) .instrument_await("source_build_reader") .await?; + for (split_id, info) in &backfill_info { + match info { + BackfillInfo::NoDataToBackfill => { + *backfill_stage.states.get_mut(split_id).unwrap() = BackfillState::Finished; + } + BackfillInfo::HasDataToBackfill { latest_offset } => { + // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value. + *backfill_stage.target_offsets.get_mut(split_id).unwrap() = + Some(latest_offset.clone()); + } + } + } fn select_strategy(_: &mut ()) -> PollNext { futures::stream::PollNext::Left @@ -422,7 +448,7 @@ impl SourceBackfillExecutorInner { self.actor_ctx.fragment_id.to_string(), ]); - let reader = self + let (reader, _backfill_info) = self .build_stream_source_reader( &source_desc, backfill_stage.get_latest_unfinished_splits()?, @@ -504,7 +530,7 @@ impl SourceBackfillExecutorInner { ); // Replace the source reader with a new one of the new state. - let reader = self + let (reader, _backfill_info) = self .build_stream_source_reader( &source_desc, latest_unfinished_splits, @@ -602,6 +628,15 @@ impl SourceBackfillExecutorInner { } let mut splits: HashSet = backfill_stage.states.keys().cloned().collect(); + // Make sure `Finished` state is persisted. + self.backfill_state_store + .set_states( + splits + .iter() + .map(|s| (s.clone(), BackfillState::Finished)) + .collect(), + ) + .await?; // All splits finished backfilling. Now we only forward the source data. #[for_await]