refactor(connector): improve common split reader to chunk stream logic (#16892)

Signed-off-by: xxchan <xxchan22f@gmail.com>
xxchan authored May 23, 2024
1 parent 50ae9d0 commit d9890ef
Showing 17 changed files with 94 additions and 126 deletions.
8 changes: 0 additions & 8 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
@@ -159,14 +159,6 @@ impl AvroParserConfig {
}
}

pub fn extract_pks(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(
self.key_schema
.as_deref()
.ok_or_else(|| anyhow::format_err!("key schema is required"))?,
)
}

pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(self.schema.as_ref())
}
10 changes: 5 additions & 5 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
@@ -139,9 +139,7 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult<DataType> {
.variants()
.iter()
.find_or_first(|s| !matches!(s, Schema::Null))
.ok_or_else(|| {
anyhow::format_err!("unsupported type in Avro: {:?}", union_schema)
})?;
.ok_or_else(|| anyhow::format_err!("unsupported Avro type: {:?}", union_schema))?;

avro_type_mapping(nested_schema)?
}
@@ -151,10 +149,12 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult<DataType> {
{
DataType::Decimal
} else {
bail!("unsupported type in Avro: {:?}", schema);
bail!("unsupported Avro type: {:?}", schema);
}
}
_ => bail!("unsupported type in Avro: {:?}", schema),
Schema::Map(_) | Schema::Null | Schema::Fixed(_) | Schema::Uuid => {
bail!("unsupported Avro type: {:?}", schema)
}
};

Ok(data_type)
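
The util.rs hunk above swaps the catch-all `_ =>` arm for an explicit `Schema::Map(_) | Schema::Null | Schema::Fixed(_) | Schema::Uuid` list. A minimal standalone sketch of that style, using a simplified enum rather than apache-avro's `Schema`: once every variant is spelled out, a variant added later fails to compile at this match instead of silently landing in the "unsupported" branch.

```rust
// Standalone sketch (simplified enum, not apache-avro's `Schema`) of the
// exhaustive-match style used above: unsupported variants are listed
// explicitly instead of being hidden behind `_`.
#[derive(Debug)]
enum Encoding {
    Json,
    Csv,
    Xml,
    Binary,
}

fn parser_name(e: &Encoding) -> Result<&'static str, String> {
    let name = match e {
        Encoding::Json => "json parser",
        Encoding::Csv => "csv parser",
        // Explicitly unsupported, no catch-all arm:
        Encoding::Xml | Encoding::Binary => {
            return Err(format!("unsupported encoding: {:?}", e));
        }
    };
    Ok(name)
}

fn main() {
    println!("{:?}", parser_name(&Encoding::Json));
    println!("{:?}", parser_name(&Encoding::Xml));
}
```
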
23 changes: 14 additions & 9 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
@@ -544,8 +544,10 @@ pub enum ParserFormat {
Plain,
}

/// `ByteStreamSourceParser` is a new message parser, the parser should consume
/// the input data stream and return a stream of parsed msgs.
/// `ByteStreamSourceParser` is the entrypoint abstraction for parsing messages.
/// It consumes bytes of one individual message and produces parsed records.
///
/// It's used by [`ByteStreamSourceParserImpl::into_stream`]. `pub` is for benchmark only.
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
/// The column descriptors of the output chunk.
fn columns(&self) -> &[SourceColumnDesc];
@@ -627,7 +629,7 @@ impl<P: ByteStreamSourceParser> P {

// The parser stream will be long-lived. We use `instrument_with` here to create
// a new span for the polling of each chunk.
into_chunk_stream(self, data_stream)
into_chunk_stream_inner(self, data_stream)
.instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
}
}
@@ -639,7 +641,10 @@ const MAX_ROWS_FOR_TRANSACTION: usize = 4096;
// TODO: when upsert is disabled, how to filter those empty payload
// Currently, an err is returned for non upsert with empty payload
#[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream: BoxSourceStream) {
async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
mut parser: P,
data_stream: BoxSourceStream,
) {
let columns = parser.columns().to_vec();

let mut heartbeat_builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 0);
@@ -855,8 +860,10 @@ impl AccessBuilderImpl {
}
}

/// The entrypoint of parsing. It parses [`SourceMessage`] stream (byte stream) into [`StreamChunk`] stream.
/// Used by [`crate::source::into_chunk_stream`].
#[derive(Debug)]
pub enum ByteStreamSourceParserImpl {
pub(crate) enum ByteStreamSourceParserImpl {
Csv(CsvParser),
Json(JsonParser),
Debezium(DebeziumParser),
@@ -867,11 +874,9 @@ pub enum ByteStreamSourceParserImpl {
CanalJson(CanalJsonParser),
}

pub type ParsedStreamImpl = impl ChunkSourceStream + Unpin;

impl ByteStreamSourceParserImpl {
/// Converts this `SourceMessage` stream into a stream of [`StreamChunk`].
pub fn into_stream(self, msg_stream: BoxSourceStream) -> ParsedStreamImpl {
/// Converts [`SourceMessage`] stream into [`StreamChunk`] stream.
pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream + Unpin {
#[auto_enum(futures03::Stream)]
let stream = match self {
Self::Csv(parser) => parser.into_stream(msg_stream),
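
Besides renaming the inner stream function, the mod.rs hunk drops the `pub type ParsedStreamImpl = impl ChunkSourceStream + Unpin;` alias and lets `into_stream` return `impl ChunkSourceStream + Unpin` directly. A minimal sketch of that signature shape with plain `futures` types (simplified stand-ins, not the actual RisingWave traits):

```rust
// Sketch: return an opaque `impl Stream` from the method instead of naming it
// through a `type ... = impl Trait;` alias (which would need the unstable
// type_alias_impl_trait feature). Assumes the `futures` and `tokio` crates.
use futures::stream::{self, Stream, StreamExt};

struct Parser;

impl Parser {
    // The caller only needs "some Unpin stream of parsed items".
    fn into_stream(self, input: Vec<u8>) -> impl Stream<Item = u8> + Unpin {
        stream::iter(input)
    }
}

#[tokio::main]
async fn main() {
    let mut parsed = Parser.into_stream(vec![1, 2, 3]);
    while let Some(item) = parsed.next().await {
        println!("parsed item: {item}");
    }
}
```
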
5 changes: 3 additions & 2 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ use crate::parser::upsert_parser::get_key_column_name;
use crate::parser::{BytesProperties, ParseResult, ParserFormat};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};

/// Parser for `FORMAT PLAIN`, i.e., append-only source.
#[derive(Debug)]
pub struct PlainParser {
pub key_builder: Option<AccessBuilderImpl>,
@@ -210,7 +211,7 @@ mod tests {
let mut transactional = false;
// for untransactional source, we expect emit a chunk for each message batch
let message_stream = source_message_stream(transactional);
let chunk_stream = crate::parser::into_chunk_stream(parser, message_stream.boxed());
let chunk_stream = crate::parser::into_chunk_stream_inner(parser, message_stream.boxed());
let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
.into_iter()
.collect();
@@ -247,7 +248,7 @@ mod tests {
// for transactional source, we expect emit a single chunk for the transaction
transactional = true;
let message_stream = source_message_stream(transactional);
let chunk_stream = crate::parser::into_chunk_stream(parser, message_stream.boxed());
let chunk_stream = crate::parser::into_chunk_stream_inner(parser, message_stream.boxed());
let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
.into_iter()
.collect();
3 changes: 1 addition & 2 deletions src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
@@ -478,8 +478,7 @@ mod tests {
/// - string: String
/// - Date (the number of days from the unix epoch, 1970-1-1 UTC)
/// - Timestamp (the number of milliseconds from the unix epoch, 1970-1-1 00:00:00.000 UTC)
pub(crate) fn from_avro_value(
fn from_avro_value(
value: Value,
value_schema: &Schema,
shape: &DataType,
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
@@ -42,7 +42,6 @@ use super::nexmark::source::message::NexmarkMeta;
use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::FsPageItem;
use crate::source::monitor::EnumeratorMetrics;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc};
@@ -314,6 +313,7 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct>
Ok(SourceStruct::new(format, encode))
}

/// Stream of [`SourceMessage`].
pub type BoxSourceStream = BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;

pub trait ChunkSourceStream =
8 changes: 4 additions & 4 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
@@ -33,8 +33,8 @@ use crate::parser::ParserConfig;
use crate::source::base::SourceMessage;
use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit};
use crate::source::{
into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, SplitId,
SplitMetaData, SplitReader,
into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData,
SplitReader,
};

pub struct CdcSplitReader<T: CdcSourceTypeTrait> {
@@ -190,11 +190,11 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
fn into_stream(self) -> BoxChunkSourceStream {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self, parser_config, source_context)
into_chunk_stream(self.into_data_stream(), parser_config, source_context)
}
}

impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
impl<T: CdcSourceTypeTrait> CdcSplitReader<T> {
#[try_stream(ok = Vec<SourceMessage>, error = ConnectorError)]
async fn into_data_stream(self) {
let source_type = T::source_type();
13 changes: 5 additions & 8 deletions src/connector/src/source/common.rs
Original file line number Diff line number Diff line change
@@ -18,15 +18,13 @@ use risingwave_common::array::StreamChunk;

use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{SourceContextRef, SourceMessage, SplitReader};

pub(crate) trait CommonSplitReader: SplitReader + 'static {
fn into_data_stream(self) -> impl Stream<Item = ConnectorResult<Vec<SourceMessage>>> + Send;
}
use crate::source::{SourceContextRef, SourceMessage};

/// Utility function to convert [`SourceMessage`] stream (got from specific connector's [`SplitReader`](super::SplitReader))
/// into [`StreamChunk`] stream (by invoking [`ByteStreamSourceParserImpl`](crate::parser::ByteStreamSourceParserImpl)).
#[try_stream(boxed, ok = StreamChunk, error = ConnectorError)]
pub(crate) async fn into_chunk_stream(
reader: impl CommonSplitReader,
data_stream: impl Stream<Item = ConnectorResult<Vec<SourceMessage>>> + Send + 'static,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
) {
@@ -36,8 +34,7 @@ pub(crate) async fn into_chunk_stream(
let source_name = source_ctx.source_name.to_string();
let metrics = source_ctx.metrics.clone();

let data_stream = reader.into_data_stream();

// add metrics to the data stream
let data_stream = data_stream
.inspect_ok(move |data_batch| {
let mut by_split_id = std::collections::HashMap::new();
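
This common.rs change is the core of the refactor: `into_chunk_stream` no longer takes something implementing the removed `CommonSplitReader` trait, but the message stream itself, so each connector reader (see the cdc and datagen hunks) simply passes `self.into_data_stream()` at the call site. A standalone sketch of that shape with stand-in types (not the real `SourceMessage`/`StreamChunk`; assumes the `futures` and `tokio` crates):

```rust
// Standalone sketch of the refactor's shape: the helper is generic over the
// data stream itself rather than over a reader trait.
use futures::stream::{self, Stream, StreamExt};

type MessageBatch = Vec<String>; // stand-in for Vec<SourceMessage>
type Chunk = usize;              // stand-in for StreamChunk

// After the refactor: takes the stream directly, no `CommonSplitReader` bound.
async fn into_chunk_stream(
    data_stream: impl Stream<Item = MessageBatch> + Send + 'static,
) -> Vec<Chunk> {
    // "Parse" each batch into a chunk; here the chunk is just the batch size.
    data_stream.map(|batch| batch.len()).collect().await
}

struct DummySplitReader {
    batches: Vec<MessageBatch>,
}

impl DummySplitReader {
    // Plain inherent method; no trait to implement any more.
    fn into_data_stream(self) -> impl Stream<Item = MessageBatch> + Send {
        stream::iter(self.batches)
    }
}

#[tokio::main]
async fn main() {
    let reader = DummySplitReader {
        batches: vec![vec!["a".into()], vec!["b".into(), "c".into()]],
    };
    // Call-site pattern from the cdc/datagen hunks: pass the stream, not the reader.
    let chunks = into_chunk_stream(reader.into_data_stream()).await;
    println!("{chunks:?}"); // [1, 2]
}
```
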
8 changes: 4 additions & 4 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
@@ -27,8 +27,8 @@ use crate::source::data_gen_util::spawn_data_generation_stream;
use crate::source::datagen::source::SEQUENCE_FIELD_KIND;
use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc};
use crate::source::{
into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, DataType, SourceContextRef,
SourceMessage, SplitId, SplitMetaData, SplitReader,
into_chunk_stream, BoxChunkSourceStream, Column, DataType, SourceContextRef, SourceMessage,
SplitId, SplitMetaData, SplitReader,
};

pub struct DatagenSplitReader {
@@ -177,13 +177,13 @@ impl SplitReader for DatagenSplitReader {
_ => {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self, parser_config, source_context)
into_chunk_stream(self.into_data_stream(), parser_config, source_context)
}
}
}
}

impl CommonSplitReader for DatagenSplitReader {
impl DatagenSplitReader {
fn into_data_stream(self) -> impl Stream<Item = ConnectorResult<Vec<SourceMessage>>> {
// Will buffer at most 4 event chunks.
const BUFFER_SIZE: usize = 4;
Original file line number Diff line number Diff line change
@@ -25,12 +25,12 @@ use tokio_util::io::{ReaderStream, StreamReader};
use super::opendal_enumerator::OpendalEnumerator;
use super::OpendalSource;
use crate::error::ConnectorResult;
use crate::parser::{ByteStreamSourceParserImpl, ParserConfig};
use crate::parser::ParserConfig;
use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::filesystem::{nd_streaming, OpendalFsSplit};
use crate::source::{
BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
SplitReader,
into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta,
SplitMetaData, SplitReader,
};

const MAX_CHANNEL_BUFFER_SIZE: usize = 2048;
@@ -65,46 +65,30 @@ impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
}

fn into_stream(self) -> BoxChunkSourceStream {
self.into_chunk_stream()
self.into_stream_inner()
}
}

impl<Src: OpendalSource> OpendalReader<Src> {
#[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
async fn into_chunk_stream(self) {
async fn into_stream_inner(self) {
for split in self.splits {
let actor_id = self.source_ctx.actor_id.to_string();
let fragment_id = self.source_ctx.fragment_id.to_string();
let source_id = self.source_ctx.source_id.to_string();
let source_name = self.source_ctx.source_name.to_string();
let source_ctx = self.source_ctx.clone();

let split_id = split.id();

let data_stream =
Self::stream_read_object(self.connector.op.clone(), split, self.source_ctx.clone());

let parser =
ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?;
let msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) {
parser.into_stream(nd_streaming::split_stream(data_stream))
let data_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) {
nd_streaming::split_stream(data_stream)
} else {
parser.into_stream(data_stream)
data_stream
};

let msg_stream = into_chunk_stream(
data_stream,
self.parser_config.clone(),
self.source_ctx.clone(),
);
#[for_await]
for msg in msg_stream {
let msg = msg?;
self.source_ctx
.metrics
.partition_input_count
.with_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
])
.inc_by(msg.cardinality() as u64);
yield msg;
}
}
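
The hand-written `partition_input_count` loop removed here (and from the S3 reader below) is covered by the shared `into_chunk_stream` in common.rs, which attaches per-split metrics to the data stream via `inspect_ok` before parsing. A minimal sketch of that pattern, with plain atomic counters standing in for the Prometheus metrics (assumes the `futures` and `tokio` crates):

```rust
// Sketch of the `inspect_ok` pattern from common.rs: observe each successful
// batch of a fallible stream for metrics without consuming or altering it.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use futures::stream::{self, TryStreamExt};

#[tokio::main]
async fn main() {
    let messages_in = Arc::new(AtomicU64::new(0));
    let counter = messages_in.clone();

    let data_stream = stream::iter(vec![
        Ok::<Vec<u8>, String>(vec![1, 2, 3]),
        Ok(vec![4, 5]),
    ])
    // Count messages per batch as they flow by; errors pass through untouched.
    .inspect_ok(move |batch| {
        counter.fetch_add(batch.len() as u64, Ordering::Relaxed);
    });

    let batches: Vec<_> = data_stream.try_collect().await.unwrap();
    println!(
        "batches = {:?}, messages_in = {}",
        batches,
        messages_in.load(Ordering::Relaxed)
    );
}
```
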
46 changes: 16 additions & 30 deletions src/connector/src/source/filesystem/s3/source/reader.rs
Original file line number Diff line number Diff line change
@@ -33,13 +33,15 @@ use tokio_util::io::ReaderStream;
use crate::aws_utils::{default_conn_config, s3_client};
use crate::connector_common::AwsAuthProps;
use crate::error::ConnectorResult;
use crate::parser::{ByteStreamSourceParserImpl, ParserConfig};
use crate::parser::ParserConfig;
use crate::source::base::{SplitMetaData, SplitReader};
use crate::source::filesystem::file_common::FsSplit;
use crate::source::filesystem::nd_streaming;
use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::filesystem::s3::S3Properties;
use crate::source::{BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta};
use crate::source::{
into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta,
};

const MAX_CHANNEL_BUFFER_SIZE: usize = 2048;
const STREAM_READER_CAPACITY: usize = 4096;
@@ -204,50 +206,34 @@ impl SplitReader for S3FileReader {
}

fn into_stream(self) -> BoxChunkSourceStream {
self.into_chunk_stream()
self.into_stream_inner()
}
}

impl S3FileReader {
#[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
async fn into_chunk_stream(self) {
async fn into_stream_inner(self) {
for split in self.splits {
let actor_id = self.source_ctx.actor_id.to_string();
let fragment_id = self.source_ctx.fragment_id.to_string();
let source_id = self.source_ctx.source_id.to_string();
let source_name = self.source_ctx.source_name.to_string();
let source_ctx = self.source_ctx.clone();

let split_id = split.id();

let data_stream = Self::stream_read_object(
self.s3_client.clone(),
self.bucket_name.clone(),
split,
self.source_ctx.clone(),
);

let parser =
ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?;
let msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) {
parser.into_stream(nd_streaming::split_stream(data_stream))
let data_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) {
nd_streaming::split_stream(data_stream)
} else {
parser.into_stream(data_stream)
data_stream
};

let msg_stream = into_chunk_stream(
data_stream,
self.parser_config.clone(),
self.source_ctx.clone(),
);
#[for_await]
for msg in msg_stream {
let msg = msg?;
self.source_ctx
.metrics
.partition_input_count
.with_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
])
.inc_by(msg.cardinality() as u64);
yield msg;
}
}
@@ -313,7 +299,7 @@ mod tests {
.await
.unwrap();

let msg_stream = reader.into_chunk_stream();
let msg_stream = reader.into_stream_inner();
#[for_await]
for msg in msg_stream {
println!("msg {:?}", msg);