Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: deprecate StreamChunkWithState #14524

Merged
merged 47 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
976ed2a
add offset/partition cols upon building SourceDesc
Rossil2012 Jan 11, 2024
f2d9466
batch rename
Rossil2012 Jan 12, 2024
095f3a1
refactor nexmark reader
Rossil2012 Jan 12, 2024
2217dc6
fix nexmark
Rossil2012 Jan 12, 2024
9fd4670
refactor datagen
Rossil2012 Jan 12, 2024
e3ac101
fmt
Rossil2012 Jan 12, 2024
f696d25
refactor opendal reader
Rossil2012 Jan 12, 2024
3fe8df9
fix test of plain_parser
Rossil2012 Jan 12, 2024
df51578
refactor pulsar-iceberg source
Rossil2012 Jan 15, 2024
c40634e
refactor fetch_executor roughly
Rossil2012 Jan 15, 2024
5d570fc
refactor parser
Rossil2012 Jan 15, 2024
7035694
fix misc
Rossil2012 Jan 15, 2024
e8ea63f
fix executors
Rossil2012 Jan 15, 2024
0ffdf3b
fix misc
Rossil2012 Jan 15, 2024
a544eb0
const additional column name
Rossil2012 Jan 15, 2024
8c1f930
refactor fetch_executor
Rossil2012 Jan 15, 2024
633c710
simplify import
Rossil2012 Jan 15, 2024
d5203d2
fix comments
Rossil2012 Jan 16, 2024
4b70038
refactor column_catalogs_to_source_column_descs
Rossil2012 Jan 17, 2024
1df6e82
fix test_source_executor
Rossil2012 Jan 17, 2024
f8f3a45
prune chunk conditionally
Rossil2012 Jan 17, 2024
7098e96
fix e2e-source-test
Rossil2012 Jan 17, 2024
9fdc2a1
debug
Rossil2012 Jan 17, 2024
9fc26d2
debug
Rossil2012 Jan 17, 2024
b0a3b04
fix hidden columns
Rossil2012 Jan 19, 2024
830e389
fix debezuim mongo
Rossil2012 Jan 19, 2024
7f07678
Merge branch 'main' into kanzhen/deprecate_chunk_with_state
Rossil2012 Jan 19, 2024
40b5024
fix confict
Rossil2012 Jan 19, 2024
3216a23
fix mongo test
Rossil2012 Jan 19, 2024
2aae930
Merge branch 'main' into kanzhen/deprecate_chunk_with_state
Rossil2012 Jan 20, 2024
e569e5d
fix simulation test
Rossil2012 Jan 22, 2024
6e32500
fix simulation test
Rossil2012 Jan 22, 2024
91480d4
fix simulation test
Rossil2012 Jan 22, 2024
62df41d
determine hidden without name
Rossil2012 Jan 22, 2024
052f852
fix unit test
Rossil2012 Jan 22, 2024
a18f361
avoid building a source_desc in from_proto
Rossil2012 Jan 22, 2024
08f8f12
chores
Rossil2012 Jan 23, 2024
e5ce20d
fix comments
Rossil2012 Jan 25, 2024
8ac8f68
refactor datagen with StreamChunkBuilder
Rossil2012 Jan 25, 2024
ab1e53a
refactor nexmark with StreamChunkBuilder
Rossil2012 Jan 25, 2024
487fc1c
fix warning
Rossil2012 Jan 25, 2024
60aa198
resolve conflicts
Rossil2012 Jan 25, 2024
f78c256
fix unit test
Rossil2012 Jan 25, 2024
23f480c
fix conflict
Rossil2012 Jan 25, 2024
a2888cf
add comments
Rossil2012 Jan 25, 2024
3625684
fix conflict
Rossil2012 Jan 25, 2024
e39c486
fix conflicts
Rossil2012 Jan 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl SourceExecutor {
#[for_await]
for chunk in stream {
let chunk = chunk.map_err(BatchError::connector)?;
let data_chunk = covert_stream_chunk_to_batch_chunk(chunk.chunk)?;
let data_chunk = covert_stream_chunk_to_batch_chunk(chunk)?;
if data_chunk.capacity() > 0 {
yield data_chunk;
}
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ pub fn row_id_column_desc() -> ColumnDesc {
ColumnDesc::named(ROWID_PREFIX, ROW_ID_COLUMN_ID, DataType::Serial)
}

/// The additional columns for partition/offset.
pub const ADDITION_PARTITION_COLUMN_NAME: &str = "_rw_addition_partition";
pub const ADDITION_OFFSET_COLUMN_NAME: &str = "_rw_addition_offset";
Rossil2012 marked this conversation as resolved.
Show resolved Hide resolved

pub const OFFSET_COLUMN_NAME: &str = "_rw_offset";

// The number of columns output by the cdc source job
Expand Down
9 changes: 4 additions & 5 deletions src/connector/benches/nexmark_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use std::sync::LazyLock;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use futures::{FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ColumnId;
use risingwave_common::types::DataType;
use risingwave_connector::parser::{
ByteStreamSourceParser, JsonParser, SourceParserIntoStreamExt, SpecificParserConfig,
};
use risingwave_connector::source::{
BoxSourceStream, BoxSourceWithStateStream, SourceColumnDesc, SourceMessage, SourceMeta,
StreamChunkWithState,
BoxChunkedSourceStream, BoxSourceStream, SourceColumnDesc, SourceMessage, SourceMeta,
};
use tracing::Level;
use tracing_subscriber::prelude::*;
Expand Down Expand Up @@ -90,9 +90,8 @@ fn make_parser() -> impl ByteStreamSourceParser {
JsonParser::new(props, columns, Default::default()).unwrap()
}

fn make_stream_iter() -> impl Iterator<Item = StreamChunkWithState> {
let mut stream: BoxSourceWithStateStream =
make_parser().into_stream(make_data_stream()).boxed();
fn make_stream_iter() -> impl Iterator<Item = StreamChunk> {
let mut stream: BoxChunkedSourceStream = make_parser().into_stream(make_data_stream()).boxed();

std::iter::from_fn(move || {
stream
Expand Down
38 changes: 9 additions & 29 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ use crate::parser::util::{extract_headers_from_meta, extreact_timestamp_from_met
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId,
StreamChunkWithState,
extract_source_struct, BoxSourceStream, ChunkedSourceStream, SourceColumnDesc,
SourceColumnType, SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMeta,
};

pub mod additional_columns;
Expand Down Expand Up @@ -551,8 +550,8 @@ impl<P: ByteStreamSourceParser> P {
///
/// # Returns
///
/// A [`SourceWithStateStream`] which is a stream of parsed messages.
pub fn into_stream(self, data_stream: BoxSourceStream) -> impl SourceWithStateStream {
/// A [`ChunkedSourceStream`] which is a stream of parsed messages.
Rossil2012 marked this conversation as resolved.
Show resolved Hide resolved
pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkedSourceStream {
// Enable tracing to provide more information for parsing failures.
let source_info = self.source_ctx().source_info;

Expand All @@ -574,12 +573,11 @@ const MAX_ROWS_FOR_TRANSACTION: usize = 4096;

// TODO: when upsert is disabled, how to filter those empty payload
// Currently, an err is returned for non upsert with empty payload
#[try_stream(ok = StreamChunkWithState, error = RwError)]
#[try_stream(ok = StreamChunk, error = RwError)]
async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream: BoxSourceStream) {
let columns = parser.columns().to_vec();

let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
let mut split_offset_mapping = HashMap::<SplitId, String>::new();

struct Transaction {
id: Box<str>,
Expand All @@ -604,10 +602,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
);
*len = 0; // reset `len` while keeping `id`
yield_asap = false;
yield StreamChunkWithState {
chunk: builder.take(batch_len),
split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)),
};
yield builder.take(batch_len);
} else {
// Normal transaction. After the transaction is committed, we should yield the last
// batch immediately, so set `yield_asap` to true.
Expand All @@ -617,20 +612,13 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
// Clean state. Reserve capacity for the builder.
assert!(builder.is_empty());
assert!(!yield_asap);
assert!(split_offset_mapping.is_empty());
let _ = builder.take(batch_len);
}

let process_time_ms = chrono::Utc::now().timestamp_millis();
for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
// assumes an empty message as a heartbeat
// heartbeat message offset should not overwrite data messages offset
split_offset_mapping
.entry(msg.split_id)
.or_insert(msg.offset.clone());

continue;
}

Expand All @@ -644,8 +632,6 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
.observe(lag_ms as f64);
}

split_offset_mapping.insert(msg.split_id.clone(), msg.offset.clone());

let old_op_num = builder.op_num();
match parser
.parse_one_with_txn(
Expand Down Expand Up @@ -711,10 +697,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
// chunk now.
if current_transaction.is_none() && yield_asap {
yield_asap = false;
yield StreamChunkWithState {
chunk: builder.take(batch_len - (i + 1)),
split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)),
};
yield builder.take(batch_len - (i + 1));
}
}
}
Expand All @@ -724,10 +707,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
if current_transaction.is_none() {
yield_asap = false;

yield StreamChunkWithState {
chunk: builder.take(0),
split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)),
};
yield builder.take(0);
}
}
}
Expand Down Expand Up @@ -799,7 +779,7 @@ pub enum ByteStreamSourceParserImpl {
CanalJson(CanalJsonParser),
}

pub type ParsedStreamImpl = impl SourceWithStateStream + Unpin;
pub type ParsedStreamImpl = impl ChunkedSourceStream + Unpin;

impl ByteStreamSourceParserImpl {
/// Converts this parser into a stream of [`StreamChunk`].
Expand Down
14 changes: 7 additions & 7 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,18 +218,18 @@ mod tests {
let output = output
.unwrap()
.into_iter()
.filter(|c| c.chunk.cardinality() > 0)
.filter(|c| c.cardinality() > 0)
.enumerate()
.map(|(i, c)| {
if i == 0 {
// begin + 3 data messages
assert_eq!(4, c.chunk.cardinality());
assert_eq!(4, c.cardinality());
}
if i == 1 {
// 2 data messages + 1 end
assert_eq!(3, c.chunk.cardinality());
assert_eq!(3, c.cardinality());
}
c.chunk
c
})
.collect_vec();

Expand All @@ -255,11 +255,11 @@ mod tests {
let output = output
.unwrap()
.into_iter()
.filter(|c| c.chunk.cardinality() > 0)
.filter(|c| c.cardinality() > 0)
.map(|c| {
// 5 data messages in a single chunk
assert_eq!(5, c.chunk.cardinality());
c.chunk
assert_eq!(5, c.cardinality());
c
})
.collect_vec();

Expand Down
26 changes: 3 additions & 23 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,30 +343,10 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct>

pub type BoxSourceStream = BoxStream<'static, Result<Vec<SourceMessage>>>;

pub trait SourceWithStateStream =
Stream<Item = Result<StreamChunkWithState, RwError>> + Send + 'static;
pub type BoxSourceWithStateStream = BoxStream<'static, Result<StreamChunkWithState, RwError>>;
pub trait ChunkedSourceStream = Stream<Item = Result<StreamChunk, RwError>> + Send + 'static;
pub type BoxChunkedSourceStream = BoxStream<'static, Result<StreamChunk, RwError>>;
pub type BoxTryStream<M> = BoxStream<'static, Result<M, RwError>>;

/// [`StreamChunkWithState`] returns stream chunk together with offset for each split. In the
/// current design, one connector source can have multiple split reader. The keys are unique
/// `split_id` and values are the latest offset for each split.
#[derive(Clone, Debug, PartialEq)]
pub struct StreamChunkWithState {
pub chunk: StreamChunk,
pub split_offset_mapping: Option<HashMap<SplitId, String>>,
}

/// The `split_offset_mapping` field is unused for the table source, so we implement `From` for it.
impl From<StreamChunk> for StreamChunkWithState {
fn from(chunk: StreamChunk) -> Self {
Self {
chunk,
split_offset_mapping: None,
}
}
}

/// [`SplitReader`] is a new abstraction of the external connector read interface which is
/// responsible for parsing, it is used to read messages from the outside and transform them into a
/// stream of parsed [`StreamChunk`]
Expand All @@ -383,7 +363,7 @@ pub trait SplitReader: Sized + Send {
columns: Option<Vec<Column>>,
) -> Result<Self>;

fn into_stream(self) -> BoxSourceWithStateStream;
fn into_stream(self) -> BoxChunkedSourceStream;
}

for_all_sources!(impl_connector_properties);
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::parser::ParserConfig;
use crate::source::base::SourceMessage;
use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit};
use crate::source::{
into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef,
into_chunk_stream, BoxChunkedSourceStream, Column, CommonSplitReader, SourceContextRef,
SplitId, SplitMetaData, SplitReader,
};

Expand Down Expand Up @@ -186,7 +186,7 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
}
}

fn into_stream(self) -> BoxSourceWithStateStream {
fn into_stream(self) -> BoxChunkedSourceStream {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self, parser_config, source_context)
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/source/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@

use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::error::RwError;

use crate::parser::ParserConfig;
use crate::source::{SourceContextRef, SourceMessage, SplitReader, StreamChunkWithState};
use crate::source::{SourceContextRef, SourceMessage, SplitReader};

pub(crate) trait CommonSplitReader: SplitReader + 'static {
fn into_data_stream(
self,
) -> impl Stream<Item = Result<Vec<SourceMessage>, anyhow::Error>> + Send;
}

#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
#[try_stream(boxed, ok = StreamChunk, error = RwError)]
pub(crate) async fn into_chunk_stream(
reader: impl CommonSplitReader,
parser_config: ParserConfig,
Expand Down
28 changes: 15 additions & 13 deletions src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::Result;
use futures_async_stream::try_stream;
use maplit::hashmap;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::error::RwError;
use risingwave_common::field_generator::FieldGeneratorImpl;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::iter_util::ZipEqFast;

use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
use crate::source::{SourceMessage, SourceMeta, SplitId, StreamChunkWithState};
use crate::source::{SourceMessage, SourceMeta, SplitId};

pub enum FieldDesc {
// field is invisible, generate None
Expand Down Expand Up @@ -158,11 +157,16 @@ impl DatagenEventGenerator {
}
}

#[try_stream(ok = StreamChunkWithState, error = RwError)]
#[try_stream(ok = StreamChunk, error = RwError)]
pub async fn into_native_stream(mut self) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
const MAX_ROWS_PER_YIELD: u64 = 1024;
let mut reach_end = false;
let dtypes_with_offset: Vec<_> = self
.data_types
.into_iter()
.chain([DataType::Varchar, DataType::Varchar])
.collect();
loop {
// generate `partition_rows_per_second` rows per second
interval.tick().await;
Expand Down Expand Up @@ -197,20 +201,18 @@ impl DatagenEventGenerator {
row.push(datum);
}

rows.push((Op::Insert, OwnedRow::new(row)));
self.offset += 1;
row.extend([
Some(ScalarImpl::Utf8(self.split_id.as_ref().into())),
Some(ScalarImpl::Utf8(self.offset.to_string().into_boxed_str())),
]);

rows.push((Op::Insert, OwnedRow::new(row)));
rows_generated_this_second += 1;
}

if !rows.is_empty() {
let chunk = StreamChunk::from_rows(&rows, &self.data_types);
let mapping = hashmap! {
self.split_id.clone() => (self.offset - 1).to_string()
Rossil2012 marked this conversation as resolved.
Show resolved Hide resolved
};
yield StreamChunkWithState {
chunk,
split_offset_mapping: Some(mapping),
};
yield StreamChunk::from_rows(&rows, &dtypes_with_offset);
}

if reach_end {
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::source::data_gen_util::spawn_data_generation_stream;
use crate::source::datagen::source::SEQUENCE_FIELD_KIND;
use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc};
use crate::source::{
into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, DataType,
into_chunk_stream, BoxChunkedSourceStream, Column, CommonSplitReader, DataType,
SourceContextRef, SourceMessage, SplitId, SplitMetaData, SplitReader,
};

Expand Down Expand Up @@ -138,7 +138,7 @@ impl SplitReader for DatagenSplitReader {
})
}

fn into_stream(self) -> BoxSourceWithStateStream {
fn into_stream(self) -> BoxChunkedSourceStream {
// Will buffer at most 4 event chunks.
const BUFFER_SIZE: usize = 4;
// spawn_data_generation_stream(self.generator.into_native_stream(), BUFFER_SIZE).boxed()
Expand All @@ -154,11 +154,11 @@ impl SplitReader for DatagenSplitReader {
spawn_data_generation_stream(
self.generator
.into_native_stream()
.inspect_ok(move |chunk_with_states| {
.inspect_ok(move |stream_chunk| {
metrics
.partition_input_count
.with_label_values(&[&actor_id, &source_id, &split_id])
.inc_by(chunk_with_states.chunk.cardinality() as u64);
.inc_by(stream_chunk.cardinality() as u64);
}),
BUFFER_SIZE,
)
Expand Down Expand Up @@ -397,7 +397,7 @@ mod tests {
.into_stream();

let stream_chunk = reader.next().await.unwrap().unwrap();
let (op, row) = stream_chunk.chunk.rows().next().unwrap();
let (op, row) = stream_chunk.rows().next().unwrap();
assert_eq!(op, Op::Insert);
assert_eq!(row.datum_at(0), Some(ScalarImpl::Int32(533)).to_datum_ref(),);
assert_eq!(
Expand Down
Loading
Loading