refactor: deprecate StreamChunkWithState (#14524)
Rossil2012 authored Jan 25, 2024
1 parent 7cf63df commit 372c2d7
Showing 38 changed files with 497 additions and 287 deletions.
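In short, this commit replaces the `StreamChunkWithState` wrapper with plain `StreamChunk`s across the source path; the diff indicates that per-split offsets now travel as hidden additional columns rather than as a side mapping. Below is a condensed before/after sketch of the types involved, assembled from the definitions visible in the src/connector/src/source/base.rs hunk further down. The inlined `SplitId` alias and the exact import paths are assumptions made only so the snippet stands on its own; it is not code from the commit.

use std::collections::HashMap;

use futures::stream::BoxStream;
use risingwave_common::array::StreamChunk;
use risingwave_common::error::RwError;

// Assumed alias so this sketch is self-contained; the real definition lives in the
// connector crate.
pub type SplitId = std::sync::Arc<str>;

// Before: split readers yielded each chunk together with the latest offset of every
// split it covered, via this wrapper (removed by this commit).
pub struct StreamChunkWithState {
    pub chunk: StreamChunk,
    pub split_offset_mapping: Option<HashMap<SplitId, String>>,
}
pub type BoxSourceWithStateStream = BoxStream<'static, Result<StreamChunkWithState, RwError>>;

// After: split readers yield plain `StreamChunk`s; offsets ride along as hidden
// additional columns ("partition"/"offset") instead of a side mapping.
pub type BoxChunkSourceStream = BoxStream<'static, Result<StreamChunk, RwError>>;

Call sites accordingly shrink from `item.chunk` to `item`, as the first two files below show.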
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
@@ -156,7 +156,7 @@ impl SourceExecutor {
#[for_await]
for chunk in stream {
let chunk = chunk.map_err(BatchError::connector)?;
let data_chunk = covert_stream_chunk_to_batch_chunk(chunk.chunk)?;
let data_chunk = covert_stream_chunk_to_batch_chunk(chunk)?;
if data_chunk.capacity() > 0 {
yield data_chunk;
}
2 changes: 1 addition & 1 deletion src/bench/sink_bench/main.rs
@@ -241,7 +241,7 @@ impl MockDatagenSource {
loop {
for i in &mut readers {
let item = i.next().await.unwrap().unwrap();
yield Message::Chunk(item.chunk);
yield Message::Chunk(item);
}
}
}
9 changes: 4 additions & 5 deletions src/connector/benches/nexmark_integration.rs
@@ -19,14 +19,14 @@ use std::sync::LazyLock;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use futures::{FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ColumnId;
use risingwave_common::types::DataType;
use risingwave_connector::parser::{
ByteStreamSourceParser, JsonParser, SourceParserIntoStreamExt, SpecificParserConfig,
};
use risingwave_connector::source::{
BoxSourceStream, BoxSourceWithStateStream, SourceColumnDesc, SourceMessage, SourceMeta,
StreamChunkWithState,
BoxChunkSourceStream, BoxSourceStream, SourceColumnDesc, SourceMessage, SourceMeta,
};
use tracing::Level;
use tracing_subscriber::prelude::*;
@@ -90,9 +90,8 @@ fn make_parser() -> impl ByteStreamSourceParser {
JsonParser::new(props, columns, Default::default()).unwrap()
}

fn make_stream_iter() -> impl Iterator<Item = StreamChunkWithState> {
let mut stream: BoxSourceWithStateStream =
make_parser().into_stream(make_data_stream()).boxed();
fn make_stream_iter() -> impl Iterator<Item = StreamChunk> {
let mut stream: BoxChunkSourceStream = make_parser().into_stream(make_data_stream()).boxed();

std::iter::from_fn(move || {
stream
33 changes: 26 additions & 7 deletions src/connector/src/parser/additional_columns.rs
@@ -32,6 +32,10 @@ use crate::source::{
S3_CONNECTOR,
};

// Hidden additional columns for connectors that do not support the `include` syntax.
pub static COMMON_COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
LazyLock::new(|| HashSet::from(["partition", "offset"]));

pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet<&'static str>>> =
LazyLock::new(|| {
HashMap::from([
@@ -53,7 +57,7 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
])
});

fn gen_default_name(
pub fn gen_default_addition_col_name(
connector_name: &str,
additional_col_type: &str,
inner_field_name: Option<&str>,
@@ -81,8 +85,23 @@ pub fn build_additional_column_catalog(
column_alias: Option<String>,
inner_field_name: Option<&str>,
data_type: Option<&str>,
reject_unknown_connector: bool,
) -> Result<ColumnCatalog> {
let compatible_columns = COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name).unwrap();
let compatible_columns = match (
COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name),
reject_unknown_connector,
) {
(Some(compat_cols), _) => compat_cols,
(None, false) => &COMMON_COMPATIBLE_ADDITIONAL_COLUMNS,
(None, true) => {
return Err(format!(
"additional column is not supported for connector {}, acceptable connectors: {:?}",
connector_name,
COMPATIBLE_ADDITIONAL_COLUMNS.keys(),
)
.into())
}
};
if !compatible_columns.contains(additional_col_type) {
return Err(format!(
"additional column type {} is not supported for connector {}, acceptable column types: {:?}",
@@ -91,7 +110,7 @@
}

let column_name = column_alias.unwrap_or_else(|| {
gen_default_name(
gen_default_addition_col_name(
connector_name,
additional_col_type,
inner_field_name,
@@ -241,17 +260,17 @@ mod test {
use super::*;

#[test]
fn test_gen_default_name() {
fn test_gen_default_addition_col_name() {
assert_eq!(
gen_default_name("kafka", "key", None, None),
gen_default_addition_col_name("kafka", "key", None, None),
"_rw_kafka_key"
);
assert_eq!(
gen_default_name("kafka", "header", Some("inner"), None),
gen_default_addition_col_name("kafka", "header", Some("inner"), None),
"_rw_kafka_header_inner"
);
assert_eq!(
gen_default_name("kafka", "header", Some("inner"), Some("varchar")),
gen_default_addition_col_name("kafka", "header", Some("inner"), Some("varchar")),
"_rw_kafka_header_inner_varchar"
);
}
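One consequence of the new fallback above: a connector with no entry in COMPATIBLE_ADDITIONAL_COLUMNS can still carry the common hidden columns ("partition" and "offset") as long as `reject_unknown_connector` is false. A hypothetical extension of the test above illustrating the resulting default names (not part of this commit; "my_connector" is an invented name, and the expected strings are extrapolated from the kafka cases rather than verified):

    #[test]
    fn default_names_for_common_fallback_columns() {
        // Extrapolated from the kafka assertions above; purely illustrative.
        assert_eq!(
            gen_default_addition_col_name("my_connector", "partition", None, None),
            "_rw_my_connector_partition"
        );
        assert_eq!(
            gen_default_addition_col_name("my_connector", "offset", None, None),
            "_rw_my_connector_offset"
        );
    }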
3 changes: 2 additions & 1 deletion src/connector/src/parser/debezium/mongo_json_parser.rs
@@ -63,7 +63,8 @@ impl DebeziumMongoJsonParser {
})?
.clone();

if rw_columns.len() != 2 {
// _rw_{connector}_file/partition & _rw_{connector}_offset are created automatically.
if rw_columns.iter().filter(|desc| desc.is_visible()).count() != 2 {
return Err(RwError::from(ProtocolError(
"Debezuim Mongo needs no more columns except `_id` and `payload` in table".into(),
)));
1 change: 1 addition & 0 deletions src/connector/src/parser/debezium/simd_json_parser.rs
@@ -566,6 +566,7 @@ mod tests {
fields: vec![],
column_type: SourceColumnType::Normal,
is_pk: false,
is_hidden_addition_col: false,
additional_column_type: AdditionalColumn { column_type: None },
},
SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
1 change: 1 addition & 0 deletions src/connector/src/parser/json_parser.rs
@@ -581,6 +581,7 @@ mod tests {
fields: vec![],
column_type: SourceColumnType::Normal,
is_pk: true,
is_hidden_addition_col: false,
additional_column_type: AdditionalColumn {
column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
},
38 changes: 9 additions & 29 deletions src/connector/src/parser/mod.rs
@@ -55,9 +55,8 @@ use crate::parser::util::{
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId,
StreamChunkWithState,
extract_source_struct, BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType,
SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMeta,
};

pub mod additional_columns;
@@ -567,8 +566,8 @@ impl<P: ByteStreamSourceParser> P {
///
/// # Returns
///
/// A [`SourceWithStateStream`] which is a stream of parsed messages.
pub fn into_stream(self, data_stream: BoxSourceStream) -> impl SourceWithStateStream {
/// A [`ChunkSourceStream`] which is a stream of parsed messages.
pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream {
// Enable tracing to provide more information for parsing failures.
let source_info = self.source_ctx().source_info;

@@ -590,12 +589,11 @@ const MAX_ROWS_FOR_TRANSACTION: usize = 4096;

// TODO: when upsert is disabled, how to filter those empty payload
// Currently, an err is returned for non upsert with empty payload
#[try_stream(ok = StreamChunkWithState, error = RwError)]
#[try_stream(ok = StreamChunk, error = RwError)]
async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream: BoxSourceStream) {
let columns = parser.columns().to_vec();

let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
let mut split_offset_mapping = HashMap::<SplitId, String>::new();

struct Transaction {
id: Box<str>,
@@ -620,10 +618,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
);
*len = 0; // reset `len` while keeping `id`
yield_asap = false;
yield StreamChunkWithState {
chunk: builder.take(batch_len),
split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)),
};
yield builder.take(batch_len);
} else {
// Normal transaction. After the transaction is committed, we should yield the last
// batch immediately, so set `yield_asap` to true.
@@ -633,20 +628,13 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
// Clean state. Reserve capacity for the builder.
assert!(builder.is_empty());
assert!(!yield_asap);
assert!(split_offset_mapping.is_empty());
let _ = builder.take(batch_len);
}

let process_time_ms = chrono::Utc::now().timestamp_millis();
for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
// assumes an empty message as a heartbeat
// heartbeat message offset should not overwrite data messages offset
split_offset_mapping
.entry(msg.split_id)
.or_insert(msg.offset.clone());

continue;
}

@@ -660,8 +648,6 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
.observe(lag_ms as f64);
}

split_offset_mapping.insert(msg.split_id.clone(), msg.offset.clone());

let old_op_num = builder.op_num();
match parser
.parse_one_with_txn(
@@ -727,10 +713,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
// chunk now.
if current_transaction.is_none() && yield_asap {
yield_asap = false;
yield StreamChunkWithState {
chunk: builder.take(batch_len - (i + 1)),
split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)),
};
yield builder.take(batch_len - (i + 1));
}
}
}
@@ -740,10 +723,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
if current_transaction.is_none() {
yield_asap = false;

yield StreamChunkWithState {
chunk: builder.take(0),
split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)),
};
yield builder.take(0);
}
}
}
@@ -815,7 +795,7 @@ pub enum ByteStreamSourceParserImpl {
CanalJson(CanalJsonParser),
}

pub type ParsedStreamImpl = impl SourceWithStateStream + Unpin;
pub type ParsedStreamImpl = impl ChunkSourceStream + Unpin;

impl ByteStreamSourceParserImpl {
/// Converts this parser into a stream of [`StreamChunk`].
14 changes: 7 additions & 7 deletions src/connector/src/parser/plain_parser.rs
@@ -218,18 +218,18 @@ mod tests {
let output = output
.unwrap()
.into_iter()
.filter(|c| c.chunk.cardinality() > 0)
.filter(|c| c.cardinality() > 0)
.enumerate()
.map(|(i, c)| {
if i == 0 {
// begin + 3 data messages
assert_eq!(4, c.chunk.cardinality());
assert_eq!(4, c.cardinality());
}
if i == 1 {
// 2 data messages + 1 end
assert_eq!(3, c.chunk.cardinality());
assert_eq!(3, c.cardinality());
}
c.chunk
c
})
.collect_vec();

@@ -255,11 +255,11 @@ mod tests {
let output = output
.unwrap()
.into_iter()
.filter(|c| c.chunk.cardinality() > 0)
.filter(|c| c.cardinality() > 0)
.map(|c| {
// 5 data messages in a single chunk
assert_eq!(5, c.chunk.cardinality());
c.chunk
assert_eq!(5, c.cardinality());
c
})
.collect_vec();

26 changes: 3 additions & 23 deletions src/connector/src/source/base.rs
@@ -343,30 +343,10 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct>

pub type BoxSourceStream = BoxStream<'static, Result<Vec<SourceMessage>>>;

pub trait SourceWithStateStream =
Stream<Item = Result<StreamChunkWithState, RwError>> + Send + 'static;
pub type BoxSourceWithStateStream = BoxStream<'static, Result<StreamChunkWithState, RwError>>;
pub trait ChunkSourceStream = Stream<Item = Result<StreamChunk, RwError>> + Send + 'static;
pub type BoxChunkSourceStream = BoxStream<'static, Result<StreamChunk, RwError>>;
pub type BoxTryStream<M> = BoxStream<'static, Result<M, RwError>>;

/// [`StreamChunkWithState`] returns stream chunk together with offset for each split. In the
/// current design, one connector source can have multiple split reader. The keys are unique
/// `split_id` and values are the latest offset for each split.
#[derive(Clone, Debug, PartialEq)]
pub struct StreamChunkWithState {
pub chunk: StreamChunk,
pub split_offset_mapping: Option<HashMap<SplitId, String>>,
}

/// The `split_offset_mapping` field is unused for the table source, so we implement `From` for it.
impl From<StreamChunk> for StreamChunkWithState {
fn from(chunk: StreamChunk) -> Self {
Self {
chunk,
split_offset_mapping: None,
}
}
}

/// [`SplitReader`] is a new abstraction of the external connector read interface which is
/// responsible for parsing, it is used to read messages from the outside and transform them into a
/// stream of parsed [`StreamChunk`]
@@ -383,7 +363,7 @@ pub trait SplitReader: Sized + Send {
columns: Option<Vec<Column>>,
) -> Result<Self>;

fn into_stream(self) -> BoxSourceWithStateStream;
fn into_stream(self) -> BoxChunkSourceStream;
}

for_all_sources!(impl_connector_properties);
6 changes: 3 additions & 3 deletions src/connector/src/source/cdc/source/reader.rs
@@ -32,8 +32,8 @@ use crate::parser::ParserConfig;
use crate::source::base::SourceMessage;
use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit};
use crate::source::{
into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef,
SplitId, SplitMetaData, SplitReader,
into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, SplitId,
SplitMetaData, SplitReader,
};

pub struct CdcSplitReader<T: CdcSourceTypeTrait> {
@@ -187,7 +187,7 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
}
}

fn into_stream(self) -> BoxSourceWithStateStream {
fn into_stream(self) -> BoxChunkSourceStream {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self, parser_config, source_context)
5 changes: 3 additions & 2 deletions src/connector/src/source/common.rs
@@ -14,18 +14,19 @@

use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::error::RwError;

use crate::parser::ParserConfig;
use crate::source::{SourceContextRef, SourceMessage, SplitReader, StreamChunkWithState};
use crate::source::{SourceContextRef, SourceMessage, SplitReader};

pub(crate) trait CommonSplitReader: SplitReader + 'static {
fn into_data_stream(
self,
) -> impl Stream<Item = Result<Vec<SourceMessage>, anyhow::Error>> + Send;
}

#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
#[try_stream(boxed, ok = StreamChunk, error = RwError)]
pub(crate) async fn into_chunk_stream(
reader: impl CommonSplitReader,
parser_config: ParserConfig,
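Putting the base.rs and common.rs changes together, the consuming side becomes a plain chunk loop. A minimal, assumed driver sketch (not taken from this commit) that drains the new stream type, mirroring the updated batch executor and sink bench at the top of this diff:

use futures::StreamExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::error::RwError;
use risingwave_connector::source::BoxChunkSourceStream;

// Drain a reader's chunk stream and count data rows. There is no longer a
// `split_offset_mapping` to collect here; offsets ride along as hidden columns.
async fn drain(mut stream: BoxChunkSourceStream) -> Result<u64, RwError> {
    let mut rows = 0u64;
    while let Some(chunk) = stream.next().await {
        let chunk: StreamChunk = chunk?;
        rows += chunk.cardinality() as u64;
    }
    Ok(rows)
}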