diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 42f610455c134..4ac4163070e5d 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -156,7 +156,7 @@ impl SourceExecutor { #[for_await] for chunk in stream { let chunk = chunk.map_err(BatchError::connector)?; - let data_chunk = covert_stream_chunk_to_batch_chunk(chunk.chunk)?; + let data_chunk = covert_stream_chunk_to_batch_chunk(chunk)?; if data_chunk.capacity() > 0 { yield data_chunk; } diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 56e681cc3eac5..067460db830db 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -241,7 +241,7 @@ impl MockDatagenSource { loop { for i in &mut readers { let item = i.next().await.unwrap().unwrap(); - yield Message::Chunk(item.chunk); + yield Message::Chunk(item); } } } diff --git a/src/connector/benches/nexmark_integration.rs b/src/connector/benches/nexmark_integration.rs index 951d42e594732..e6388ed4b0d25 100644 --- a/src/connector/benches/nexmark_integration.rs +++ b/src/connector/benches/nexmark_integration.rs @@ -19,14 +19,14 @@ use std::sync::LazyLock; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; +use risingwave_common::array::StreamChunk; use risingwave_common::catalog::ColumnId; use risingwave_common::types::DataType; use risingwave_connector::parser::{ ByteStreamSourceParser, JsonParser, SourceParserIntoStreamExt, SpecificParserConfig, }; use risingwave_connector::source::{ - BoxSourceStream, BoxSourceWithStateStream, SourceColumnDesc, SourceMessage, SourceMeta, - StreamChunkWithState, + BoxChunkSourceStream, BoxSourceStream, SourceColumnDesc, SourceMessage, SourceMeta, }; use tracing::Level; use tracing_subscriber::prelude::*; @@ -90,9 +90,8 @@ fn make_parser() -> impl ByteStreamSourceParser { JsonParser::new(props, columns, Default::default()).unwrap() } -fn make_stream_iter() -> impl Iterator { - let mut stream: BoxSourceWithStateStream = - make_parser().into_stream(make_data_stream()).boxed(); +fn make_stream_iter() -> impl Iterator { + let mut stream: BoxChunkSourceStream = make_parser().into_stream(make_data_stream()).boxed(); std::iter::from_fn(move || { stream diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index fa023bac01baa..bc1a2784ec3f6 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -32,6 +32,10 @@ use crate::source::{ S3_CONNECTOR, }; +// Hidden additional columns connectors which do not support `include` syntax. 
+pub static COMMON_COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock> = + LazyLock::new(|| HashSet::from(["partition", "offset"])); + pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock>> = LazyLock::new(|| { HashMap::from([ @@ -53,7 +57,7 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock, @@ -81,8 +85,23 @@ pub fn build_additional_column_catalog( column_alias: Option, inner_field_name: Option<&str>, data_type: Option<&str>, + reject_unknown_connector: bool, ) -> Result { - let compatible_columns = COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name).unwrap(); + let compatible_columns = match ( + COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name), + reject_unknown_connector, + ) { + (Some(compat_cols), _) => compat_cols, + (None, false) => &COMMON_COMPATIBLE_ADDITIONAL_COLUMNS, + (None, true) => { + return Err(format!( + "additional column is not supported for connector {}, acceptable connectors: {:?}", + connector_name, + COMPATIBLE_ADDITIONAL_COLUMNS.keys(), + ) + .into()) + } + }; if !compatible_columns.contains(additional_col_type) { return Err(format!( "additional column type {} is not supported for connector {}, acceptable column types: {:?}", @@ -91,7 +110,7 @@ pub fn build_additional_column_catalog( } let column_name = column_alias.unwrap_or_else(|| { - gen_default_name( + gen_default_addition_col_name( connector_name, additional_col_type, inner_field_name, @@ -241,17 +260,17 @@ mod test { use super::*; #[test] - fn test_gen_default_name() { + fn test_gen_default_addition_col_name() { assert_eq!( - gen_default_name("kafka", "key", None, None), + gen_default_addition_col_name("kafka", "key", None, None), "_rw_kafka_key" ); assert_eq!( - gen_default_name("kafka", "header", Some("inner"), None), + gen_default_addition_col_name("kafka", "header", Some("inner"), None), "_rw_kafka_header_inner" ); assert_eq!( - gen_default_name("kafka", "header", Some("inner"), Some("varchar")), + gen_default_addition_col_name("kafka", "header", Some("inner"), Some("varchar")), "_rw_kafka_header_inner_varchar" ); } diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 9233bc987eb17..899068a4cccb5 100644 --- a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -63,7 +63,8 @@ impl DebeziumMongoJsonParser { })? .clone(); - if rw_columns.len() != 2 { + // _rw_{connector}_file/partition & _rw_{connector}_offset are created automatically. 
+ if rw_columns.iter().filter(|desc| desc.is_visible()).count() != 2 { return Err(RwError::from(ProtocolError( "Debezuim Mongo needs no more columns except `_id` and `payload` in table".into(), ))); diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index ae7dd22b2d349..87aead865c6db 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -566,6 +566,7 @@ mod tests { fields: vec![], column_type: SourceColumnType::Normal, is_pk: false, + is_hidden_addition_col: false, additional_column_type: AdditionalColumn { column_type: None }, }, SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)), diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index b549dcb838c24..8480138e02c07 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -581,6 +581,7 @@ mod tests { fields: vec![], column_type: SourceColumnType::Normal, is_pk: true, + is_hidden_addition_col: false, additional_column_type: AdditionalColumn { column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})), }, diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index c9ee987f4652c..fedab10ef9d68 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -55,9 +55,8 @@ use crate::parser::util::{ use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; use crate::source::{ - extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, - SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId, - StreamChunkWithState, + extract_source_struct, BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType, + SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMeta, }; pub mod additional_columns; @@ -567,8 +566,8 @@ impl P { /// /// # Returns /// - /// A [`SourceWithStateStream`] which is a stream of parsed messages. - pub fn into_stream(self, data_stream: BoxSourceStream) -> impl SourceWithStateStream { + /// A [`ChunkSourceStream`] which is a stream of parsed messages. + pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream { // Enable tracing to provide more information for parsing failures. let source_info = self.source_ctx().source_info; @@ -590,12 +589,11 @@ const MAX_ROWS_FOR_TRANSACTION: usize = 4096; // TODO: when upsert is disabled, how to filter those empty payload // Currently, an err is returned for non upsert with empty payload -#[try_stream(ok = StreamChunkWithState, error = RwError)] +#[try_stream(ok = StreamChunk, error = RwError)] async fn into_chunk_stream(mut parser: P, data_stream: BoxSourceStream) { let columns = parser.columns().to_vec(); let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); - let mut split_offset_mapping = HashMap::::new(); struct Transaction { id: Box, @@ -620,10 +618,7 @@ async fn into_chunk_stream(mut parser: P, data_stream ); *len = 0; // reset `len` while keeping `id` yield_asap = false; - yield StreamChunkWithState { - chunk: builder.take(batch_len), - split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)), - }; + yield builder.take(batch_len); } else { // Normal transaction. After the transaction is committed, we should yield the last // batch immediately, so set `yield_asap` to true. 
@@ -633,7 +628,6 @@ async fn into_chunk_stream(mut parser: P, data_stream // Clean state. Reserve capacity for the builder. assert!(builder.is_empty()); assert!(!yield_asap); - assert!(split_offset_mapping.is_empty()); let _ = builder.take(batch_len); } @@ -641,12 +635,6 @@ async fn into_chunk_stream(mut parser: P, data_stream for (i, msg) in batch.into_iter().enumerate() { if msg.key.is_none() && msg.payload.is_none() { tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message"); - // assumes an empty message as a heartbeat - // heartbeat message offset should not overwrite data messages offset - split_offset_mapping - .entry(msg.split_id) - .or_insert(msg.offset.clone()); - continue; } @@ -660,8 +648,6 @@ async fn into_chunk_stream(mut parser: P, data_stream .observe(lag_ms as f64); } - split_offset_mapping.insert(msg.split_id.clone(), msg.offset.clone()); - let old_op_num = builder.op_num(); match parser .parse_one_with_txn( @@ -727,10 +713,7 @@ async fn into_chunk_stream(mut parser: P, data_stream // chunk now. if current_transaction.is_none() && yield_asap { yield_asap = false; - yield StreamChunkWithState { - chunk: builder.take(batch_len - (i + 1)), - split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)), - }; + yield builder.take(batch_len - (i + 1)); } } } @@ -740,10 +723,7 @@ async fn into_chunk_stream(mut parser: P, data_stream if current_transaction.is_none() { yield_asap = false; - yield StreamChunkWithState { - chunk: builder.take(0), - split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)), - }; + yield builder.take(0); } } } @@ -815,7 +795,7 @@ pub enum ByteStreamSourceParserImpl { CanalJson(CanalJsonParser), } -pub type ParsedStreamImpl = impl SourceWithStateStream + Unpin; +pub type ParsedStreamImpl = impl ChunkSourceStream + Unpin; impl ByteStreamSourceParserImpl { /// Converts this parser into a stream of [`StreamChunk`]. 
diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index cd45a72865211..75912155a0dd6 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -218,18 +218,18 @@ mod tests { let output = output .unwrap() .into_iter() - .filter(|c| c.chunk.cardinality() > 0) + .filter(|c| c.cardinality() > 0) .enumerate() .map(|(i, c)| { if i == 0 { // begin + 3 data messages - assert_eq!(4, c.chunk.cardinality()); + assert_eq!(4, c.cardinality()); } if i == 1 { // 2 data messages + 1 end - assert_eq!(3, c.chunk.cardinality()); + assert_eq!(3, c.cardinality()); } - c.chunk + c }) .collect_vec(); @@ -255,11 +255,11 @@ mod tests { let output = output .unwrap() .into_iter() - .filter(|c| c.chunk.cardinality() > 0) + .filter(|c| c.cardinality() > 0) .map(|c| { // 5 data messages in a single chunk - assert_eq!(5, c.chunk.cardinality()); - c.chunk + assert_eq!(5, c.cardinality()); + c }) .collect_vec(); diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 2016fee6de60d..3f027b2dddfe9 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -343,30 +343,10 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result pub type BoxSourceStream = BoxStream<'static, Result>>; -pub trait SourceWithStateStream = - Stream> + Send + 'static; -pub type BoxSourceWithStateStream = BoxStream<'static, Result>; +pub trait ChunkSourceStream = Stream> + Send + 'static; +pub type BoxChunkSourceStream = BoxStream<'static, Result>; pub type BoxTryStream = BoxStream<'static, Result>; -/// [`StreamChunkWithState`] returns stream chunk together with offset for each split. In the -/// current design, one connector source can have multiple split reader. The keys are unique -/// `split_id` and values are the latest offset for each split. -#[derive(Clone, Debug, PartialEq)] -pub struct StreamChunkWithState { - pub chunk: StreamChunk, - pub split_offset_mapping: Option>, -} - -/// The `split_offset_mapping` field is unused for the table source, so we implement `From` for it. 
-impl From for StreamChunkWithState { - fn from(chunk: StreamChunk) -> Self { - Self { - chunk, - split_offset_mapping: None, - } - } -} - /// [`SplitReader`] is a new abstraction of the external connector read interface which is /// responsible for parsing, it is used to read messages from the outside and transform them into a /// stream of parsed [`StreamChunk`] @@ -383,7 +363,7 @@ pub trait SplitReader: Sized + Send { columns: Option>, ) -> Result; - fn into_stream(self) -> BoxSourceWithStateStream; + fn into_stream(self) -> BoxChunkSourceStream; } for_all_sources!(impl_connector_properties); diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index ce0f0d7ff66cb..19f7ca55cd302 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -32,8 +32,8 @@ use crate::parser::ParserConfig; use crate::source::base::SourceMessage; use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit}; use crate::source::{ - into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef, - SplitId, SplitMetaData, SplitReader, + into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, SplitId, + SplitMetaData, SplitReader, }; pub struct CdcSplitReader { @@ -187,7 +187,7 @@ impl SplitReader for CdcSplitReader { } } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); into_chunk_stream(self, parser_config, source_context) diff --git a/src/connector/src/source/common.rs b/src/connector/src/source/common.rs index 11cbfce5d97f5..e5c6f98aff44c 100644 --- a/src/connector/src/source/common.rs +++ b/src/connector/src/source/common.rs @@ -14,10 +14,11 @@ use futures::{Stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; +use risingwave_common::array::StreamChunk; use risingwave_common::error::RwError; use crate::parser::ParserConfig; -use crate::source::{SourceContextRef, SourceMessage, SplitReader, StreamChunkWithState}; +use crate::source::{SourceContextRef, SourceMessage, SplitReader}; pub(crate) trait CommonSplitReader: SplitReader + 'static { fn into_data_stream( @@ -25,7 +26,7 @@ pub(crate) trait CommonSplitReader: SplitReader + 'static { ) -> impl Stream, anyhow::Error>> + Send; } -#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] +#[try_stream(boxed, ok = StreamChunk, error = RwError)] pub(crate) async fn into_chunk_stream( reader: impl CommonSplitReader, parser_config: ParserConfig, diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index 6bf3b782d3949..b298d8eafcfa1 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -15,16 +15,16 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::Result; use futures_async_stream::try_stream; -use maplit::hashmap; +use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::error::RwError; use risingwave_common::field_generator::FieldGeneratorImpl; use risingwave_common::row::OwnedRow; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::iter_util::ZipEqFast; use 
crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig}; -use crate::source::{SourceMessage, SourceMeta, SplitId, StreamChunkWithState}; +use crate::source::{SourceMessage, SourceMeta, SplitId}; pub enum FieldDesc { // field is invisible, generate None @@ -158,7 +158,7 @@ impl DatagenEventGenerator { } } - #[try_stream(ok = StreamChunkWithState, error = RwError)] + #[try_stream(ok = StreamChunk, error = RwError)] pub async fn into_native_stream(mut self) { let mut interval = tokio::time::interval(Duration::from_secs(1)); const MAX_ROWS_PER_YIELD: u64 = 1024; @@ -167,17 +167,29 @@ impl DatagenEventGenerator { // generate `partition_rows_per_second` rows per second interval.tick().await; let mut rows_generated_this_second = 0; + let mut chunk_builder = + StreamChunkBuilder::new(MAX_ROWS_PER_YIELD as usize, self.data_types.clone()); while rows_generated_this_second < self.partition_rows_per_second { - let mut rows = vec![]; let num_rows_to_generate = std::cmp::min( MAX_ROWS_PER_YIELD, self.partition_rows_per_second - rows_generated_this_second, ); 'outer: for _ in 0..num_rows_to_generate { - let mut row = Vec::with_capacity(self.fields_vec.len()); - for field_generator in &mut self.fields_vec { + let mut row = Vec::with_capacity(self.data_types.len()); + for (field_generator, field_name) in + self.fields_vec.iter_mut().zip_eq_fast(&self.field_names) + { let datum = match field_generator { - FieldDesc::Invisible => None, + // TODO: avoid distinguishing hidden partition/offset columns by name + FieldDesc::Invisible => match field_name.as_str() { + "_rw_datagen_partition" => { + Some(ScalarImpl::Utf8(self.split_id.as_ref().into())) + } + "_rw_datagen_offset" => { + Some(ScalarImpl::Utf8(self.offset.to_string().into_boxed_str())) + } + _ => None, + }, FieldDesc::Visible(field_generator) => { let datum = field_generator.generate_datum(self.offset); if datum.is_none() { @@ -197,20 +209,15 @@ impl DatagenEventGenerator { row.push(datum); } - rows.push((Op::Insert, OwnedRow::new(row))); self.offset += 1; rows_generated_this_second += 1; + if let Some(chunk) = chunk_builder.append_row(Op::Insert, OwnedRow::new(row)) { + yield chunk; + } } - if !rows.is_empty() { - let chunk = StreamChunk::from_rows(&rows, &self.data_types); - let mapping = hashmap! 
{ - self.split_id.clone() => (self.offset - 1).to_string() - }; - yield StreamChunkWithState { - chunk, - split_offset_mapping: Some(mapping), - }; + if let Some(chunk) = chunk_builder.take() { + yield chunk; } if reach_end { diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index d276e6b414210..46112395b298a 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -25,8 +25,8 @@ use crate::source::data_gen_util::spawn_data_generation_stream; use crate::source::datagen::source::SEQUENCE_FIELD_KIND; use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc}; use crate::source::{ - into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, DataType, - SourceContextRef, SourceMessage, SplitId, SplitMetaData, SplitReader, + into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, DataType, SourceContextRef, + SourceMessage, SplitId, SplitMetaData, SplitReader, }; pub struct DatagenSplitReader { @@ -138,7 +138,7 @@ impl SplitReader for DatagenSplitReader { }) } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { // Will buffer at most 4 event chunks. const BUFFER_SIZE: usize = 4; // spawn_data_generation_stream(self.generator.into_native_stream(), BUFFER_SIZE).boxed() @@ -154,11 +154,11 @@ impl SplitReader for DatagenSplitReader { spawn_data_generation_stream( self.generator .into_native_stream() - .inspect_ok(move |chunk_with_states| { + .inspect_ok(move |stream_chunk| { metrics .partition_input_count .with_label_values(&[&actor_id, &source_id, &split_id]) - .inc_by(chunk_with_states.chunk.cardinality() as u64); + .inc_by(stream_chunk.cardinality() as u64); }), BUFFER_SIZE, ) @@ -397,7 +397,7 @@ mod tests { .into_stream(); let stream_chunk = reader.next().await.unwrap().unwrap(); - let (op, row) = stream_chunk.chunk.rows().next().unwrap(); + let (op, row) = stream_chunk.rows().next().unwrap(); assert_eq!(op, Op::Insert); assert_eq!(row.datum_at(0), Some(ScalarImpl::Int32(533)).to_datum_ref(),); assert_eq!( diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 57dfa7396128f..7096df9d684c4 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -17,6 +17,7 @@ use async_trait::async_trait; use futures::TryStreamExt; use futures_async_stream::try_stream; use opendal::Operator; +use risingwave_common::array::StreamChunk; use risingwave_common::error::RwError; use tokio::io::BufReader; use tokio_util::io::{ReaderStream, StreamReader}; @@ -27,8 +28,8 @@ use crate::parser::{ByteStreamSourceParserImpl, ParserConfig}; use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::{nd_streaming, OpendalFsSplit}; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData, - SplitReader, StreamChunkWithState, + BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData, + SplitReader, }; const MAX_CHANNEL_BUFFER_SIZE: usize = 2048; @@ -62,13 +63,13 @@ impl SplitReader for OpendalReader { Ok(opendal_reader) } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { self.into_chunk_stream() } } impl OpendalReader { - 
#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] + #[try_stream(boxed, ok = StreamChunk, error = RwError)] async fn into_chunk_stream(self) { for split in self.splits { let actor_id = self.source_ctx.source_info.actor_id.to_string(); @@ -94,7 +95,7 @@ impl OpendalReader { .metrics .partition_input_count .with_label_values(&[&actor_id, &source_id, &split_id]) - .inc_by(msg.chunk.cardinality() as u64); + .inc_by(msg.cardinality() as u64); yield msg; } } diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 7d3e638811b63..c223febd1cf56 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -25,6 +25,7 @@ use aws_smithy_types::body::SdkBody; use aws_smithy_types::byte_stream::ByteStream; use futures_async_stream::try_stream; use io::StreamReader; +use risingwave_common::array::StreamChunk; use risingwave_common::error::RwError; use tokio::io::BufReader; use tokio_util::io; @@ -38,10 +39,7 @@ use crate::source::filesystem::file_common::FsSplit; use crate::source::filesystem::nd_streaming; use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::s3::S3Properties; -use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta, - StreamChunkWithState, -}; +use crate::source::{BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta}; const MAX_CHANNEL_BUFFER_SIZE: usize = 2048; const STREAM_READER_CAPACITY: usize = 4096; @@ -199,13 +197,13 @@ impl SplitReader for S3FileReader { Ok(s3_file_reader) } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { self.into_chunk_stream() } } impl S3FileReader { - #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] + #[try_stream(boxed, ok = StreamChunk, error = RwError)] async fn into_chunk_stream(self) { for split in self.splits { let actor_id = self.source_ctx.source_info.actor_id.to_string(); @@ -235,7 +233,7 @@ impl S3FileReader { .metrics .partition_input_count .with_label_values(&[&actor_id, &source_id, &split_id]) - .inc_by(msg.chunk.cardinality() as u64); + .inc_by(msg.cardinality() as u64); yield msg; } } diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs index dfaf21ad3164e..d18fcb0be258b 100644 --- a/src/connector/src/source/google_pubsub/source/reader.rs +++ b/src/connector/src/source/google_pubsub/source/reader.rs @@ -25,7 +25,7 @@ use super::TaggedReceivedMessage; use crate::parser::ParserConfig; use crate::source::google_pubsub::{PubsubProperties, PubsubSplit}; use crate::source::{ - into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef, + into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, SourceMessage, SplitId, SplitMetaData, SplitReader, }; @@ -164,7 +164,7 @@ impl SplitReader for PubsubSplitReader { }) } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); into_chunk_stream(self, parser_config, source_context) diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 95e44a26cd919..ed9a7929c3231 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ 
b/src/connector/src/source/kafka/source/reader.rs @@ -33,8 +33,8 @@ use crate::source::kafka::{ KafkaProperties, KafkaSplit, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL, }; use crate::source::{ - into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef, - SplitId, SplitMetaData, SplitReader, + into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, SplitId, + SplitMetaData, SplitReader, }; pub struct KafkaSplitReader { @@ -145,7 +145,7 @@ impl SplitReader for KafkaSplitReader { }) } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); into_chunk_stream(self, parser_config, source_context) diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index 0e5b07d97736a..51b3c77710410 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -29,7 +29,7 @@ use crate::source::kinesis::source::message::from_kinesis_record; use crate::source::kinesis::split::{KinesisOffset, KinesisSplit}; use crate::source::kinesis::KinesisProperties; use crate::source::{ - into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef, + into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, SourceMessage, SplitId, SplitMetaData, SplitReader, }; @@ -113,7 +113,7 @@ impl SplitReader for KinesisSplitReader { }) } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); into_chunk_stream(self, parser_config, source_context) diff --git a/src/connector/src/source/manager.rs b/src/connector/src/source/manager.rs index 48b8a9fe5c096..049515d6091a8 100644 --- a/src/connector/src/source/manager.rs +++ b/src/connector/src/source/manager.rs @@ -31,12 +31,15 @@ pub struct SourceColumnDesc { pub fields: Vec, pub column_type: SourceColumnType, - // `is_pk` is used to indicate whether the column is part of the primary key columns. + /// `is_pk` is used to indicate whether the column is part of the primary key columns. pub is_pk: bool, - // `additional_column_type` and `column_type` are orthogonal - // `additional_column_type` is used to indicate the column is from which part of the message - // `column_type` is used to indicate the type of the column, only used in cdc scenario + /// `is_hidden_addition_col` is used to indicate whether the column is a hidden addition column. 
+ pub is_hidden_addition_col: bool, + + /// `additional_column_type` and `column_type` are orthogonal + /// `additional_column_type` is used to indicate the column is from which part of the message + /// `column_type` is used to indicate the type of the column, only used in cdc scenario pub additional_column_type: AdditionalColumn, } @@ -87,10 +90,18 @@ impl SourceColumnDesc { fields: vec![], column_type: SourceColumnType::Normal, is_pk: false, + is_hidden_addition_col: false, additional_column_type: AdditionalColumn { column_type: None }, } } + pub fn hidden_addition_col_from_column_desc(c: &ColumnDesc) -> Self { + Self { + is_hidden_addition_col: true, + ..c.into() + } + } + pub fn is_row_id(&self) -> bool { self.column_type == SourceColumnType::RowId } @@ -105,7 +116,7 @@ impl SourceColumnDesc { #[inline] pub fn is_visible(&self) -> bool { - self.column_type == SourceColumnType::Normal + !self.is_hidden_addition_col && self.column_type == SourceColumnType::Normal } } @@ -119,6 +130,7 @@ impl From<&ColumnDesc> for SourceColumnDesc { fields: c.field_descs.clone(), column_type, is_pk: false, + is_hidden_addition_col: false, additional_column_type: c.additional_columns.clone(), } } diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 88ba9d862a925..20a8f9c0dbc0b 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -24,7 +24,7 @@ use crate::parser::ParserConfig; use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::nats::NatsProperties; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitReader, + BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitId, SplitReader, }; pub struct NatsSplitReader { @@ -93,7 +93,7 @@ impl SplitReader for NatsSplitReader { }) } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); into_chunk_stream(self, parser_config, source_context) diff --git a/src/connector/src/source/nexmark/source/combined_event.rs b/src/connector/src/source/nexmark/source/combined_event.rs index 7242c838f77f4..47a4d7e6f421d 100644 --- a/src/connector/src/source/nexmark/source/combined_event.rs +++ b/src/connector/src/source/nexmark/source/combined_event.rs @@ -16,7 +16,6 @@ pub use nexmark::event::EventType; use nexmark::event::{Auction, Bid, Event, Person}; use risingwave_common::array::StructValue; use risingwave_common::catalog::ROWID_PREFIX; -use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum, ScalarImpl, StructType, Timestamp}; use serde::{Deserialize, Serialize}; @@ -180,7 +179,10 @@ pub(crate) fn get_bid_struct_type() -> StructType { ]) } -pub(crate) fn combined_event_to_row(e: CombinedEvent, row_id_index: Option) -> OwnedRow { +pub(crate) fn combined_event_to_row( + e: CombinedEvent, + row_id_index: Option, +) -> Vec> { let mut fields = vec![ Some(ScalarImpl::Int64(e.event_type as i64)), e.person @@ -199,10 +201,10 @@ pub(crate) fn combined_event_to_row(e: CombinedEvent, row_id_index: Option) -> OwnedRow { +pub(crate) fn event_to_row(e: Event, row_id_index: Option) -> Vec> { let mut fields = match e { Event::Person(p) => person_to_datum(p), Event::Auction(a) => auction_to_datum(a), @@ -212,7 +214,7 @@ pub(crate) fn event_to_row(e: Event, row_id_index: Option) -> OwnedRow { // _row_id 
fields.insert(row_id_index, None); } - OwnedRow::new(fields) + fields } fn person_to_datum(p: Person) -> Vec { diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs index 10c052c6d1b98..e8ff3097d8217 100644 --- a/src/connector/src/source/nexmark/source/reader.rs +++ b/src/connector/src/source/nexmark/source/reader.rs @@ -18,13 +18,15 @@ use anyhow::Result; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; use futures_async_stream::try_stream; -use maplit::hashmap; use nexmark::config::NexmarkConfig; use nexmark::event::EventType; use nexmark::EventGenerator; +use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::error::RwError; use risingwave_common::estimate_size::EstimateSize; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; use tokio::time::Instant; use crate::parser::ParserConfig; @@ -34,8 +36,7 @@ use crate::source::nexmark::source::combined_event::{ }; use crate::source::nexmark::{NexmarkProperties, NexmarkSplit}; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitMetaData, SplitReader, - StreamChunkWithState, + BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData, SplitReader, }; #[derive(Debug)] @@ -106,7 +107,7 @@ impl SplitReader for NexmarkSplitReader { }) } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { let actor_id = self.source_ctx.source_info.actor_id.to_string(); let source_id = self.source_ctx.source_info.source_id.to_string(); let split_id = self.split_id.clone(); @@ -115,18 +116,17 @@ impl SplitReader for NexmarkSplitReader { // Will buffer at most 4 event chunks. 
const BUFFER_SIZE: usize = 4; spawn_data_generation_stream( - self.into_native_stream().inspect_ok( - move |chunk_with_states: &StreamChunkWithState| { + self.into_native_stream() + .inspect_ok(move |chunk: &StreamChunk| { metrics .partition_input_count .with_label_values(&[&actor_id, &source_id, &split_id]) - .inc_by(chunk_with_states.chunk.cardinality() as u64); + .inc_by(chunk.cardinality() as u64); metrics .partition_input_bytes .with_label_values(&[&actor_id, &source_id, &split_id]) - .inc_by(chunk_with_states.chunk.estimated_size() as u64); - }, - ), + .inc_by(chunk.estimated_size() as u64); + }), BUFFER_SIZE, ) .boxed() @@ -134,49 +134,60 @@ impl SplitReader for NexmarkSplitReader { } impl NexmarkSplitReader { - #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] + async fn sleep_til_next_event(&self, start_time: Instant, start_offset: u64, start_ts: u64) { + if self.use_real_time { + tokio::time::sleep_until( + start_time + Duration::from_millis(self.generator.timestamp() - start_ts), + ) + .await; + } else if self.min_event_gap_in_ns > 0 { + tokio::time::sleep_until( + start_time + + Duration::from_nanos( + self.min_event_gap_in_ns * (self.generator.global_offset() - start_offset), + ), + ) + .await; + } + } + + #[try_stream(boxed, ok = StreamChunk, error = RwError)] async fn into_native_stream(mut self) { let start_time = Instant::now(); let start_offset = self.generator.global_offset(); let start_ts = self.generator.timestamp(); - let event_dtypes = get_event_data_types(self.event_type, self.row_id_index); + let mut event_dtypes_with_offset = get_event_data_types(self.event_type, self.row_id_index); + event_dtypes_with_offset.extend([DataType::Varchar, DataType::Varchar]); + + let mut chunk_builder = + StreamChunkBuilder::new(self.max_chunk_size as usize, event_dtypes_with_offset); loop { - let mut rows = vec![]; - while (rows.len() as u64) < self.max_chunk_size { - if self.generator.global_offset() >= self.event_num { - break; - } - let event = self.generator.next().unwrap(); - let row = match self.event_type { - Some(_) => event_to_row(event, self.row_id_index), - None => combined_event_to_row(new_combined_event(event), self.row_id_index), - }; - rows.push((Op::Insert, row)); - } - if rows.is_empty() { + if self.generator.global_offset() >= self.event_num { break; } - if self.use_real_time { - tokio::time::sleep_until( - start_time + Duration::from_millis(self.generator.timestamp() - start_ts), - ) - .await; - } else if self.min_event_gap_in_ns > 0 { - tokio::time::sleep_until( - start_time - + Duration::from_nanos( - self.min_event_gap_in_ns - * (self.generator.global_offset() - start_offset), - ), - ) - .await; - } - let mapping = hashmap! 
{self.split_id.clone() => self.generator.offset().to_string()}; - let stream_chunk = StreamChunk::from_rows(&rows, &event_dtypes); - yield StreamChunkWithState { - chunk: stream_chunk, - split_offset_mapping: Some(mapping), + let event = self.generator.next().unwrap(); + let mut fields = match self.event_type { + Some(_) => event_to_row(event, self.row_id_index), + None => combined_event_to_row(new_combined_event(event), self.row_id_index), }; + fields.extend([ + Some(ScalarImpl::Utf8(self.split_id.as_ref().into())), + Some(ScalarImpl::Utf8( + self.generator.offset().to_string().into_boxed_str(), + )), + ]); + + if let Some(chunk) = chunk_builder.append_row(Op::Insert, OwnedRow::new(fields)) { + self.sleep_til_next_event(start_time, start_offset, start_ts) + .await; + yield chunk; + } + } + + if let Some(chunk) = chunk_builder.take() { + self.sleep_til_next_event(start_time, start_offset, start_ts) + .await; + yield chunk; } tracing::debug!(?self.event_type, "nexmark generator finished"); @@ -224,4 +235,45 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_nexmark_event_num() -> Result<()> { + let max_chunk_size = 32; + let event_num = max_chunk_size * 128 + 1; + let props = NexmarkProperties { + split_num: 1, + min_event_gap_in_ns: 0, + table_type: None, + max_chunk_size, + event_num, + ..Default::default() + }; + + let mut enumerator = + NexmarkSplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into()) + .await?; + let list_splits_resp: Vec<_> = enumerator.list_splits().await?.into_iter().collect(); + + for split in list_splits_resp { + let state = vec![split]; + let reader = NexmarkSplitReader::new( + props.clone(), + state, + Default::default(), + Default::default(), + None, + ) + .await? + .into_stream(); + let (rows_count, chunk_count) = reader + .fold((0, 0), |acc, x| async move { + (acc.0 + x.unwrap().cardinality(), acc.1 + 1) + }) + .await; + assert_eq!(rows_count as u64, event_num); + assert_eq!(chunk_count as u64, event_num / max_chunk_size + 1); + } + + Ok(()) + } } diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 8d80487a7da8b..07265f1e4acfc 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -37,8 +37,8 @@ use crate::parser::ParserConfig; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties}; use crate::source::{ - into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef, - SourceMessage, SplitId, SplitMetaData, SplitReader, StreamChunkWithState, + into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, + SourceMessage, SplitId, SplitMetaData, SplitReader, }; pub enum PulsarSplitReader { @@ -83,7 +83,7 @@ impl SplitReader for PulsarSplitReader { } } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { match self { Self::Broker(reader) => { let (parser_config, source_context) = @@ -234,7 +234,7 @@ impl SplitReader for PulsarBrokerReader { }) } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); into_chunk_stream(self, parser_config, source_context) @@ -356,7 +356,7 @@ impl PulsarIcebergReader { Ok(table) } - #[try_stream(ok = StreamChunkWithState, error = anyhow::Error)] + #[try_stream(ok = 
(StreamChunk, HashMap), error = anyhow::Error)] async fn as_stream_chunk_stream(&self) { #[for_await] for file_scan in self.scan().await? { @@ -371,7 +371,7 @@ impl PulsarIcebergReader { } } - #[try_stream(ok = StreamChunkWithState, error = RwError)] + #[try_stream(ok = StreamChunk, error = RwError)] async fn into_stream(self) { let (props, mut split, parser_config, source_ctx) = ( self.props.clone(), @@ -384,13 +384,9 @@ impl PulsarIcebergReader { #[for_await] for msg in self.as_stream_chunk_stream() { - let msg = + let (_chunk, mapping) = msg.inspect_err(|e| tracing::error!("Failed to read message from iceberg: {}", e))?; - last_msg_id = msg - .split_offset_mapping - .as_ref() - .and_then(|m| m.get(self.split.topic.to_string().as_str())) - .cloned(); + last_msg_id = mapping.get(self.split.topic.to_string().as_str()).cloned(); } tracing::info!("Finished reading pulsar message from iceberg"); @@ -470,7 +466,7 @@ impl PulsarIcebergReader { fn convert_record_batch_to_source_with_state( &self, record_batch: &RecordBatch, - ) -> Result { + ) -> Result<(StreamChunk, HashMap)> { let mut offsets = Vec::with_capacity(record_batch.num_rows()); let ledger_id_array = record_batch @@ -539,14 +535,11 @@ impl PulsarIcebergReader { let stream_chunk = StreamChunk::from(data_chunk); - let state = Some(HashMap::from([( + let state = HashMap::from([( self.split.topic.to_string().into(), offsets.last().unwrap().clone(), - )])); + )]); - Ok(StreamChunkWithState { - chunk: stream_chunk, - split_offset_mapping: state, - }) + Ok((stream_chunk, state)) } } diff --git a/src/connector/src/source/reader/desc.rs b/src/connector/src/source/reader/desc.rs index 1f94267b46dd1..e049be8bbe940 100644 --- a/src/connector/src/source/reader/desc.rs +++ b/src/connector/src/source/reader/desc.rs @@ -15,18 +15,24 @@ use std::collections::HashMap; use std::sync::Arc; -use risingwave_common::catalog::ColumnDesc; +use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; +use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::catalog::PbStreamSourceInfo; -use risingwave_pb::plan_common::PbColumnCatalog; +use risingwave_pb::plan_common::additional_column::ColumnType; +use risingwave_pb::plan_common::{AdditionalColumn, PbColumnCatalog}; #[expect(deprecated)] use super::fs_reader::FsSourceReader; use super::reader::SourceReader; +use crate::parser::additional_columns::{ + build_additional_column_catalog, COMMON_COMPATIBLE_ADDITIONAL_COLUMNS, + COMPATIBLE_ADDITIONAL_COLUMNS, +}; use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig}; use crate::source::monitor::SourceMetrics; -use crate::source::{SourceColumnDesc, SourceColumnType}; +use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY}; use crate::ConnectorParams; pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16; @@ -85,12 +91,83 @@ impl SourceDescBuilder { } } - fn column_catalogs_to_source_column_descs(&self) -> Vec { + /// This function builds `SourceColumnDesc` from `ColumnCatalog`, and handle the creation + /// of hidden columns like partition/file, offset that are not specified by user. 
+ pub fn column_catalogs_to_source_column_descs(&self) -> Vec { + let mut columns_exist = [false; 2]; + let mut last_column_id = self + .columns + .iter() + .map(|c| c.column_desc.as_ref().unwrap().column_id.into()) + .max() + .unwrap_or(ColumnId::placeholder()); + let connector_name = self + .with_properties + .get(UPSTREAM_SOURCE_KEY) + .map(|s| s.to_lowercase()) + .unwrap(); + + let additional_columns: Vec<_> = { + let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS + .get(&*connector_name) + .unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS); + ["partition", "file", "offset"] + .iter() + .filter_map(|col_type| { + last_column_id = last_column_id.next(); + if compat_col_types.contains(col_type) { + Some( + build_additional_column_catalog( + last_column_id, + &connector_name, + col_type, + None, + None, + None, + false, + ) + .unwrap() + .to_protobuf(), + ) + } else { + None + } + }) + .collect() + }; + assert_eq!(additional_columns.len(), 2); + + // Check if partition/file/offset columns are included explicitly. + for col in &self.columns { + match col.column_desc.as_ref().unwrap().get_additional_columns() { + Ok(AdditionalColumn { + column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)), + }) => { + columns_exist[0] = true; + } + Ok(AdditionalColumn { + column_type: Some(ColumnType::Offset(_)), + }) => { + columns_exist[1] = true; + } + _ => (), + } + } + let mut columns: Vec<_> = self .columns .iter() .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap()))) .collect(); + + for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) { + if !existed { + columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc( + &ColumnDesc::from(c.column_desc.as_ref().unwrap()), + )); + } + } + if let Some(row_id_index) = self.row_id_index { columns[row_id_index].column_type = SourceColumnType::RowId; } diff --git a/src/connector/src/source/reader/fs_reader.rs b/src/connector/src/source/reader/fs_reader.rs index 95106ebbd2eee..bb8cc39389e47 100644 --- a/src/connector/src/source/reader/fs_reader.rs +++ b/src/connector/src/source/reader/fs_reader.rs @@ -27,7 +27,7 @@ use risingwave_common::error::Result; use crate::dispatch_source_prop; use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use crate::source::{ - create_split_reader, BoxSourceWithStateStream, ConnectorProperties, ConnectorState, + create_split_reader, BoxChunkSourceStream, ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitReader, }; @@ -82,7 +82,7 @@ impl FsSourceReader { state: ConnectorState, column_ids: Vec, source_ctx: Arc, - ) -> Result { + ) -> Result { let config = self.config.clone(); let columns = self.get_target_columns(column_ids)?; diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index 5f9259058f869..571825b05e5fc 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -35,7 +35,7 @@ use crate::source::filesystem::opendal_source::{ }; use crate::source::filesystem::FsPageItem; use crate::source::{ - create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties, + create_split_reader, BoxChunkSourceStream, BoxTryStream, Column, ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitReader, }; @@ -107,7 +107,7 @@ impl SourceReader { state: ConnectorState, column_ids: Vec, source_ctx: Arc, - ) -> Result { + ) -> Result { let Some(splits) = state else { return 
Ok(pending().boxed()); }; diff --git a/src/connector/src/source/test_source.rs b/src/connector/src/source/test_source.rs index 6c10ff9934eef..e0b901ddbf253 100644 --- a/src/connector/src/source/test_source.rs +++ b/src/connector/src/source/test_source.rs @@ -24,8 +24,8 @@ use with_options::WithOptions; use crate::parser::ParserConfig; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceEnumeratorContextRef, - SourceProperties, SplitEnumerator, SplitId, SplitMetaData, SplitReader, TryFromHashmap, + BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties, + SplitEnumerator, SplitId, SplitMetaData, SplitReader, TryFromHashmap, }; pub type BoxListSplits = Box< @@ -44,7 +44,7 @@ pub type BoxIntoSourceStream = Box< ParserConfig, SourceContextRef, Option>, - ) -> BoxSourceWithStateStream + ) -> BoxChunkSourceStream + Send + 'static, >; @@ -68,7 +68,7 @@ impl BoxSource { ParserConfig, SourceContextRef, Option>, - ) -> BoxSourceWithStateStream + ) -> BoxChunkSourceStream + Send + 'static, ) -> BoxSource { @@ -218,7 +218,7 @@ impl SplitReader for TestSourceSplitReader { }) } - fn into_stream(self) -> BoxSourceWithStateStream { + fn into_stream(self) -> BoxChunkSourceStream { (get_registry() .box_source .lock() diff --git a/src/dml/src/table.rs b/src/dml/src/table.rs index ad20b1c13f12a..4981ebce2e8a3 100644 --- a/src/dml/src/table.rs +++ b/src/dml/src/table.rs @@ -20,7 +20,6 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::ColumnDesc; use risingwave_common::transaction::transaction_id::TxnId; use risingwave_common::transaction::transaction_message::TxnMsg; -use risingwave_connector::source::StreamChunkWithState; use tokio::sync::oneshot; use crate::error::{DmlError, Result}; @@ -248,7 +247,7 @@ pub struct TableStreamReader { } impl TableStreamReader { - #[try_stream(boxed, ok = StreamChunkWithState, error = DmlError)] + #[try_stream(boxed, ok = StreamChunk, error = DmlError)] pub async fn into_data_stream_for_test(mut self) { while let Some((txn_msg, notifier)) = self.rx.recv().await { // Notify about that we've taken the chunk. 
@@ -258,7 +257,7 @@ impl TableStreamReader { } TxnMsg::Data(_, chunk) => { _ = notifier.send(chunk.cardinality()); - yield chunk.into(); + yield chunk; } } } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a03e074301cd1..5f25d12650f0c 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -544,6 +544,7 @@ pub fn handle_addition_columns( item.column_alias.map(|alias| alias.real_value()), item.inner_field.as_deref(), data_type_name.as_deref(), + true, )?); } @@ -863,6 +864,7 @@ fn check_and_add_timestamp_column( Some(KAFKA_TIMESTAMP_COLUMN_NAME.to_string()), None, None, + true, ) .unwrap(); catalog.is_hidden = true; diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 324a12f8df6f5..c29afda3dcae5 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -21,6 +21,7 @@ use either::Either; use futures::stream::{self, StreamExt}; use futures::{pin_mut, TryStreamExt}; use futures_async_stream::try_stream; +use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnId, Schema, TableId}; use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{OwnedRow, Row}; @@ -31,8 +32,7 @@ use risingwave_connector::source::filesystem::opendal_source::{ use risingwave_connector::source::filesystem::OpendalFsSplit; use risingwave_connector::source::reader::desc::SourceDesc; use risingwave_connector::source::{ - BoxSourceWithStateStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData, - StreamChunkWithState, + BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData, }; use risingwave_connector::ConnectorParams; use risingwave_storage::store::PrefetchOptions; @@ -40,11 +40,7 @@ use risingwave_storage::StateStore; use thiserror_ext::AsReport; use crate::executor::stream_reader::StreamReaderWithPause; -use crate::executor::{ - expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, - ExecutorInfo, Message, Mutation, PkIndicesRef, SourceStateTableHandler, StreamExecutorError, - StreamExecutorResult, StreamSourceCore, -}; +use crate::executor::*; const SPLIT_BATCH_SIZE: usize = 1000; @@ -96,7 +92,7 @@ impl FsFetchExecutor { column_ids: Vec, source_ctx: SourceContext, source_desc: &SourceDesc, - stream: &mut StreamReaderWithPause, + stream: &mut StreamReaderWithPause, ) -> StreamExecutorResult<()> { let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE); 'vnodes: for vnode in state_store_handler.state_store.vnodes().iter_vnodes() { @@ -160,7 +156,7 @@ impl FsFetchExecutor { source_ctx: SourceContext, source_desc: &SourceDesc, batch: SplitBatch, - ) -> StreamExecutorResult { + ) -> StreamExecutorResult { source_desc .source .to_stream(batch, column_ids, Arc::new(source_ctx)) @@ -196,14 +192,17 @@ impl FsFetchExecutor { .build() .map_err(StreamExecutorError::connector_error)?; + let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns) + else { + unreachable!("Partition and offset columns must be set."); + }; + // Initialize state table. 
state_store_handler.init_epoch(barrier.epoch); let mut splits_on_fetch: usize = 0; - let mut stream = StreamReaderWithPause::::new( - upstream, - stream::pending().boxed(), - ); + let mut stream = + StreamReaderWithPause::::new(upstream, stream::pending().boxed()); if barrier.is_pause_on_startup() { stream.pause_stream(); @@ -279,7 +278,7 @@ impl FsFetchExecutor { yield msg; } // Receiving file assignments from upstream list executor, - // store into state table and try building a new reader. + // store into state table. Message::Chunk(chunk) => { let file_assignment = chunk .data_chunk() @@ -301,12 +300,10 @@ impl FsFetchExecutor { } } // StreamChunk from FsSourceReader, and the reader reads only one file. - // If the file read out, replace with a new file reader. - Either::Right(StreamChunkWithState { - chunk, - split_offset_mapping, - }) => { - let mapping = split_offset_mapping.unwrap(); + Either::Right(chunk) => { + let mapping = + get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx) + .unwrap(); debug_assert_eq!(mapping.len(), 1); if let Some((split_id, offset)) = mapping.into_iter().next() { let row = state_store_handler @@ -332,6 +329,12 @@ impl FsFetchExecutor { } } + let chunk = prune_additional_cols( + &chunk, + split_idx, + offset_idx, + &source_desc.columns, + ); yield Message::Chunk(chunk); } } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 5e78532b9282b..cb473b82d4c4a 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -27,8 +27,8 @@ use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_connector::source::reader::desc::{FsSourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ - BoxSourceWithStateStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, - SplitMetaData, StreamChunkWithState, + BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, + SplitMetaData, }; use risingwave_storage::StateStore; use tokio::sync::mpsc::UnboundedReceiver; @@ -92,7 +92,7 @@ impl FsSourceExecutor { &mut self, source_desc: &FsSourceDesc, state: ConnectorState, - ) -> StreamExecutorResult { + ) -> StreamExecutorResult { let column_ids = source_desc .columns .iter() @@ -118,7 +118,7 @@ impl FsSourceExecutor { async fn apply_split_change( &mut self, source_desc: &FsSourceDesc, - stream: &mut StreamReaderWithPause, + stream: &mut StreamReaderWithPause, mapping: &HashMap>, ) -> StreamExecutorResult<()> { if let Some(target_splits) = mapping.get(&self.actor_ctx.id).cloned() { @@ -184,7 +184,7 @@ impl FsSourceExecutor { async fn replace_stream_reader_with_target_state( &mut self, source_desc: &FsSourceDesc, - stream: &mut StreamReaderWithPause, + stream: &mut StreamReaderWithPause, target_state: Vec, ) -> StreamExecutorResult<()> { tracing::info!( @@ -283,6 +283,11 @@ impl FsSourceExecutor { .build_fs_source_desc() .map_err(StreamExecutorError::connector_error)?; + let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns) + else { + unreachable!("Partition and offset columns must be set."); + }; + // If the first barrier requires us to pause on startup, pause the stream. let start_with_paused = barrier.is_pause_on_startup(); @@ -344,10 +349,8 @@ impl FsSourceExecutor { // Merge the chunks from source and the barriers into a single stream. 
We prioritize // barriers over source data chunks here. let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); - let mut stream = StreamReaderWithPause::::new( - barrier_stream, - source_chunk_reader, - ); + let mut stream = + StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); if start_with_paused { stream.pause_stream(); } @@ -412,10 +415,10 @@ impl FsSourceExecutor { } }, - Either::Right(StreamChunkWithState { - chunk, - split_offset_mapping, - }) => { + Either::Right(chunk) => { + // TODO: confirm when split_offset_mapping is None + let split_offset_mapping = + get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx); if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms { // Exceeds the max wait barrier time, the source will be paused. Currently // we can guarantee the source is not paused since it received stream @@ -455,7 +458,11 @@ impl FsSourceExecutor { self.actor_ctx.id.to_string().as_str(), ]) .inc_by(chunk.cardinality() as u64); + + let chunk = + prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns); yield Message::Chunk(chunk); + self.try_flush_data().await?; } } diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index b03aa634ca241..7df7cc2ea8373 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -13,12 +13,20 @@ // limitations under the License. pub mod executor_core; +use std::collections::HashMap; + use await_tree::InstrumentAwait; pub use executor_core::StreamSourceCore; mod fs_source_executor; #[expect(deprecated)] pub use fs_source_executor::*; +use itertools::Itertools; +use risingwave_common::array::StreamChunk; use risingwave_common::bail; +use risingwave_common::row::Row; +use risingwave_connector::source::{SourceColumnDesc, SplitId}; +use risingwave_pb::plan_common::additional_column::ColumnType; +use risingwave_pb::plan_common::AdditionalColumn; pub use state_table_handler::*; pub mod fetch_executor; pub use fetch_executor::*; @@ -42,3 +50,55 @@ pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver) { } bail!("barrier reader closed unexpectedly"); } + +pub fn get_split_offset_mapping_from_chunk( + chunk: &StreamChunk, + split_idx: usize, + offset_idx: usize, +) -> Option> { + let mut split_offset_mapping = HashMap::new(); + for (_, row) in chunk.rows() { + let split_id = row.datum_at(split_idx).unwrap().into_utf8().into(); + let offset = row.datum_at(offset_idx).unwrap().into_utf8(); + split_offset_mapping.insert(split_id, offset.to_string()); + } + Some(split_offset_mapping) +} + +pub fn get_split_offset_col_idx( + column_descs: &[SourceColumnDesc], +) -> (Option, Option) { + let mut split_idx = None; + let mut offset_idx = None; + for (idx, column) in column_descs.iter().enumerate() { + match column.additional_column_type { + AdditionalColumn { + column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)), + } => { + split_idx = Some(idx); + } + AdditionalColumn { + column_type: Some(ColumnType::Offset(_)), + } => { + offset_idx = Some(idx); + } + _ => (), + } + } + (split_idx, offset_idx) +} + +pub fn prune_additional_cols( + chunk: &StreamChunk, + split_idx: usize, + offset_idx: usize, + column_descs: &[SourceColumnDesc], +) -> StreamChunk { + chunk.project( + &(0..chunk.dimension()) + .filter(|&idx| { + (idx != split_idx && idx != offset_idx) || column_descs[idx].is_visible() + }) + .collect_vec(), + ) +} diff --git a/src/stream/src/executor/source/source_executor.rs 
b/src/stream/src/executor/source/source_executor.rs index e3675ccb33555..3f9a7664b751e 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -24,8 +24,7 @@ use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ - BoxSourceWithStateStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitMetaData, - StreamChunkWithState, + BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitMetaData, }; use risingwave_connector::ConnectorParams; use risingwave_storage::StateStore; @@ -93,7 +92,7 @@ impl SourceExecutor { &self, source_desc: &SourceDesc, state: ConnectorState, - ) -> StreamExecutorResult { + ) -> StreamExecutorResult { let column_ids = source_desc .columns .iter() @@ -136,7 +135,7 @@ impl SourceExecutor { async fn apply_split_change( &mut self, source_desc: &SourceDesc, - stream: &mut StreamReaderWithPause, + stream: &mut StreamReaderWithPause, split_assignment: &HashMap>, ) -> StreamExecutorResult>> { self.metrics @@ -229,7 +228,7 @@ impl SourceExecutor { async fn rebuild_stream_reader_from_error( &mut self, source_desc: &SourceDesc, - stream: &mut StreamReaderWithPause, + stream: &mut StreamReaderWithPause, split_info: &mut [SplitImpl], e: StreamExecutorError, ) -> StreamExecutorResult<()> { @@ -273,7 +272,7 @@ impl SourceExecutor { async fn replace_stream_reader_with_target_state( &mut self, source_desc: &SourceDesc, - stream: &mut StreamReaderWithPause, + stream: &mut StreamReaderWithPause, target_state: Vec, ) -> StreamExecutorResult<()> { tracing::info!( @@ -376,6 +375,11 @@ impl SourceExecutor { .build() .map_err(StreamExecutorError::connector_error)?; + let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns) + else { + unreachable!("Partition and offset columns must be set."); + }; + let mut boot_state = Vec::default(); if let Some(mutation) = barrier.mutation.as_ref() { match mutation.as_ref() { @@ -427,10 +431,8 @@ impl SourceExecutor { // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); - let mut stream = StreamReaderWithPause::::new( - barrier_stream, - source_chunk_reader, - ); + let mut stream = + StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); // If the first barrier requires us to pause on startup, pause the stream. if barrier.is_pause_on_startup() { @@ -546,10 +548,10 @@ impl SourceExecutor { } }, - Either::Right(StreamChunkWithState { - chunk, - split_offset_mapping, - }) => { + Either::Right(chunk) => { + // TODO: confirm when split_offset_mapping is None + let split_offset_mapping = + get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx); if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms { // Exceeds the max wait barrier time, the source will be paused. 
// Currently we can guarantee the @@ -610,6 +612,12 @@ impl SourceExecutor { .collect::>(), ) .inc_by(chunk.cardinality() as u64); + let chunk = prune_additional_cols( + &chunk, + split_idx, + offset_idx, + &source_desc.columns, + ); yield Message::Chunk(chunk); self.try_flush_data().await?; } diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index 263c0a98f8a71..4757ca76a9170 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -132,7 +132,6 @@ mod tests { use futures::{pin_mut, FutureExt}; use risingwave_common::array::StreamChunk; use risingwave_common::transaction::transaction_id::TxnId; - use risingwave_connector::source::StreamChunkWithState; use risingwave_dml::TableDmlHandle; use tokio::sync::mpsc; @@ -163,8 +162,7 @@ mod tests { .unwrap(); let barrier_stream = barrier_to_message_stream(barrier_rx).boxed(); - let stream = - StreamReaderWithPause::::new(barrier_stream, source_stream); + let stream = StreamReaderWithPause::::new(barrier_stream, source_stream); pin_mut!(stream); macro_rules! next { @@ -184,7 +182,7 @@ mod tests { .write_chunk(StreamChunk::default()) .await .unwrap(); - // We don't call end() here, since we test `StreamChunkWithState` instead of `TxnMsg`. + // We don't call end() here, since we test `StreamChunk` instead of `TxnMsg`. assert_matches!(next!().unwrap(), Either::Right(_)); // Write a barrier, and we should receive it. @@ -202,7 +200,7 @@ mod tests { .write_chunk(StreamChunk::default()) .await .unwrap(); - // We don't call end() here, since we test `StreamChunkWithState` instead of `TxnMsg`. + // We don't call end() here, since we test `StreamChunk` instead of `TxnMsg`. // We should receive the barrier. assert_matches!(next!().unwrap(), Either::Left(_)); diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index b03bdd5e23917..97cf26129d8d5 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use risingwave_common::catalog::{ColumnId, TableId}; +use risingwave_common::catalog::TableId; use risingwave_connector::source::filesystem::opendal_source::{ OpendalGcs, OpendalPosixFs, OpendalS3, }; @@ -63,10 +63,10 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { chunk_size: params.env.config().developer.chunk_size, }; - let source_column_ids: Vec<_> = source - .columns + let source_column_ids: Vec<_> = source_desc_builder + .column_catalogs_to_source_column_descs() .iter() - .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id)) + .map(|column| column.column_id) .collect(); let vnodes = Some(Arc::new( diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 3c3a82de9fb29..a99be097e881d 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use risingwave_common::catalog::{ - default_key_column_name_version_mapping, ColumnId, TableId, KAFKA_TIMESTAMP_COLUMN_NAME, + default_key_column_name_version_mapping, TableId, KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_connector::source::reader::desc::SourceDescBuilder; use risingwave_connector::source::{ @@ -159,9 +159,10 @@ impl ExecutorBuilder for SourceExecutorBuilder { chunk_size: params.env.config().developer.chunk_size, }; - let source_column_ids: Vec<_> = source_columns + let source_column_ids: Vec<_> = source_desc_builder + .column_catalogs_to_source_column_descs() .iter() - .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id)) + .map(|column| column.column_id) .collect(); let state_table_handler = SourceStateTableHandler::from_table_catalog( diff --git a/src/tests/simulation/tests/integration_tests/sink/utils.rs b/src/tests/simulation/tests/integration_tests/sink/utils.rs index 6b9ea61e708b1..a134ab1a265ff 100644 --- a/src/tests/simulation/tests/integration_tests/sink/utils.rs +++ b/src/tests/simulation/tests/integration_tests/sink/utils.rs @@ -38,7 +38,6 @@ use risingwave_connector::sink::SinkError; use risingwave_connector::source::test_source::{ registry_test_source, BoxSource, TestSourceRegistryGuard, TestSourceSplit, }; -use risingwave_connector::source::StreamChunkWithState; use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration}; use tokio::time::sleep; @@ -230,20 +229,30 @@ impl SimulationTestSink { } } -pub fn build_stream_chunk(row_iter: impl Iterator) -> StreamChunk { +pub fn build_stream_chunk( + row_iter: impl Iterator, +) -> StreamChunk { static ROW_ID_GEN: LazyLock> = LazyLock::new(|| Arc::new(AtomicI64::new(0))); let mut builder = DataChunkBuilder::new( - vec![DataType::Int32, DataType::Varchar, DataType::Serial], + vec![ + DataType::Int32, + DataType::Varchar, + DataType::Serial, + DataType::Varchar, + DataType::Varchar, + ], 100000, ); - for (id, name) in row_iter { + for (id, name, split_id, offset) in row_iter { let row_id = ROW_ID_GEN.fetch_add(1, Relaxed); std::assert!(builder .append_one_row([ Some(ScalarImpl::Int32(id)), Some(ScalarImpl::Utf8(name.into())), Some(ScalarImpl::Serial(Serial::from(row_id))), + Some(ScalarImpl::Utf8(split_id.into())), + Some(ScalarImpl::Utf8(offset.into())), ]) .is_none()); } @@ -297,20 +306,19 @@ impl SimulationTestSource { split.offset.parse::().unwrap() + 1 }; - let mut stream: BoxStream<'static, StreamChunkWithState> = empty().boxed(); + let mut stream: BoxStream<'static, StreamChunk> = empty().boxed(); while offset < id_list.len() { let mut chunks = Vec::new(); while offset < id_list.len() && chunks.len() < pause_interval { let id = id_list[offset]; - let chunk = build_stream_chunk(once((id, simple_name_of_id(id)))); - let mut split_offset = HashMap::new(); - split_offset.insert(split.id.clone(), offset.to_string()); - let chunk_with_state = StreamChunkWithState { - chunk, - split_offset_mapping: Some(split_offset), - }; - chunks.push(chunk_with_state); + let chunk = build_stream_chunk(once(( + id, + simple_name_of_id(id), + split.id.to_string(), + offset.to_string(), + ))); + chunks.push(chunk); offset += 1; } @@ -332,7 +340,7 @@ impl SimulationTestSource { } stream - .chain(async { pending::().await }.into_stream()) + .chain(async { pending::().await }.into_stream()) .map(|chunk| Ok(chunk)) .boxed() }))
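
The executors above resolve the hidden split and offset columns once, up front, by scanning the source schema for the matching additional-column types and treating a missing pair as unreachable. Below is a minimal, std-only sketch of that lookup; `AdditionalColumnType` and `ColumnDesc` here are simplified stand-ins for the protobuf `ColumnType` and `SourceColumnDesc` used in the diff, not the actual API.

    // Simplified stand-in for `additional_column::ColumnType`; only the
    // variants relevant to this lookup are modelled.
    #[derive(Clone, Copy)]
    enum AdditionalColumnType {
        Normal,
        Partition,
        Filename,
        Offset,
    }

    // Simplified stand-in for `SourceColumnDesc`.
    struct ColumnDesc {
        additional: AdditionalColumnType,
    }

    /// Resolve the indices of the split (partition/filename) and offset
    /// columns, mirroring `get_split_offset_col_idx` in the diff.
    fn split_offset_col_idx(columns: &[ColumnDesc]) -> (Option<usize>, Option<usize>) {
        let mut split_idx = None;
        let mut offset_idx = None;
        for (idx, col) in columns.iter().enumerate() {
            match col.additional {
                AdditionalColumnType::Partition | AdditionalColumnType::Filename => {
                    split_idx = Some(idx)
                }
                AdditionalColumnType::Offset => offset_idx = Some(idx),
                AdditionalColumnType::Normal => {}
            }
        }
        (split_idx, offset_idx)
    }

    fn main() {
        let columns = vec![
            ColumnDesc { additional: AdditionalColumnType::Normal },
            ColumnDesc { additional: AdditionalColumnType::Partition },
            ColumnDesc { additional: AdditionalColumnType::Offset },
        ];
        let (split_idx, offset_idx) = split_offset_col_idx(&columns);
        assert_eq!((split_idx, offset_idx), (Some(1), Some(2)));
    }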
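
With those indices in hand, the per-chunk split-to-offset mapping that used to arrive via `StreamChunkWithState::split_offset_mapping` is rebuilt from the rows themselves. A std-only sketch of the idea behind `get_split_offset_mapping_from_chunk`, using a plain `Vec<Option<String>>` row in place of RisingWave's chunk and row types:

    use std::collections::HashMap;

    // Simplified stand-in for a chunk row; the real code iterates
    // `StreamChunk::rows()` and reads datums with `datum_at`.
    type Row = Vec<Option<String>>;

    /// Rebuild the `split_id -> latest offset` map from the hidden columns.
    fn split_offset_mapping(
        rows: &[Row],
        split_idx: usize,
        offset_idx: usize,
    ) -> HashMap<String, String> {
        let mut mapping = HashMap::new();
        for row in rows {
            let split_id = row[split_idx].clone().expect("split column is NOT NULL");
            let offset = row[offset_idx].clone().expect("offset column is NOT NULL");
            // Later rows of the same split overwrite earlier ones, so the map
            // ends up holding the last offset seen per split in this chunk.
            mapping.insert(split_id, offset);
        }
        mapping
    }

    fn main() {
        let rows = vec![
            vec![Some("1".into()), Some("a".into()), Some("split-0".into()), Some("41".into())],
            vec![Some("2".into()), Some("b".into()), Some("split-0".into()), Some("42".into())],
        ];
        let mapping = split_offset_mapping(&rows, 2, 3);
        assert_eq!(mapping["split-0"], "42");
    }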
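
Before a chunk is yielded downstream, the two metadata columns are projected away unless the user explicitly included them. A simplified sketch of the projection rule in `prune_additional_cols`, with a bare `is_visible` slice standing in for `SourceColumnDesc::is_visible()`:

    /// A column survives the projection if it is not one of the two metadata
    /// columns, or if the user made it visible.
    fn pruned_projection(
        num_cols: usize,
        split_idx: usize,
        offset_idx: usize,
        is_visible: &[bool],
    ) -> Vec<usize> {
        (0..num_cols)
            .filter(|&idx| (idx != split_idx && idx != offset_idx) || is_visible[idx])
            .collect()
    }

    fn main() {
        // Columns: id, name, _rw_split (hidden), _rw_offset (hidden).
        let visible = [true, true, false, false];
        assert_eq!(pruned_projection(4, 2, 3, &visible), vec![0, 1]);
    }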
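
On the test side, the simulation source now bakes the split id and offset into each row as two trailing varchar columns, which is what lets the executor-side helpers recover the mapping. A rough, std-only sketch of that row shape; `TestRow` and `build_test_rows` are hypothetical names, and the real `build_stream_chunk` uses `DataChunkBuilder` and `ScalarImpl` instead:

    /// Hypothetical test-row shape: user columns first, then the split id and
    /// offset, matching the two Varchar columns appended to the chunk schema.
    struct TestRow {
        id: i32,
        name: String,
        split_id: String,
        offset: String,
    }

    fn build_test_rows(
        rows: impl Iterator<Item = (i32, String, String, String)>,
    ) -> Vec<TestRow> {
        rows.map(|(id, name, split_id, offset)| TestRow { id, name, split_id, offset })
            .collect()
    }

    fn main() {
        let rows = build_test_rows(std::iter::once((
            1,
            "one".to_string(),
            "split-0".to_string(),
            "0".to_string(),
        )));
        assert_eq!(rows[0].split_id, "split-0");
        assert_eq!(rows[0].offset, "0");
        let _ = (rows[0].id, &rows[0].name); // user columns are untouched
    }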