From 980afcdd81c339e37621981b555f57751d111dc6 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Thu, 17 Nov 2022 13:12:12 +0000 Subject: [PATCH 1/5] feat(streaming): introduce source executor v2 --- src/source/src/connector_source.rs | 180 ++++- src/source/src/lib.rs | 1 + src/source/src/manager.rs | 3 +- src/stream/src/executor/source/mod.rs | 2 + .../src/executor/source/source_executor_v2.rs | 742 ++++++++++++++++++ 5 files changed, 925 insertions(+), 3 deletions(-) create mode 100644 src/stream/src/executor/source/source_executor_v2.rs diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 0c8c161a99aba..6e4f276b307ce 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -20,16 +20,28 @@ use futures::stream::BoxStream; use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::catalog::{ColumnId, TableId}; +use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; +use risingwave_common::error::ErrorCode::{ConnectorError, ProtocolError}; use risingwave_common::error::{internal_error, Result, RwError, ToRwResult}; use risingwave_common::util::select_all; use risingwave_connector::source::{ Column, ConnectorProperties, ConnectorState, SourceMessage, SplitId, SplitMetaData, SplitReaderImpl, }; +use risingwave_pb::catalog::{ + ColumnIndex as ProstColumnIndex, StreamSourceInfo as ProstStreamSourceInfo, +}; +use risingwave_pb::plan_common::{ + ColumnCatalog as ProstColumnCatalog, RowFormatType as ProstRowFormatType, +}; use crate::monitor::SourceMetrics; -use crate::{SourceColumnDesc, SourceParserImpl, SourceStreamChunkBuilder, StreamChunkWithState}; +use crate::{ + SourceColumnDesc, SourceFormat, SourceParserImpl, SourceStreamChunkBuilder, + StreamChunkWithState, +}; + +pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16; #[derive(Clone, Debug)] pub struct SourceContext { @@ -173,6 +185,33 @@ pub struct ConnectorSource { } impl ConnectorSource { + pub async fn new( + format: SourceFormat, + row_schema_location: &String, + use_schema_registry: bool, + proto_message_name: String, + properties: HashMap, + columns: Vec, + connector_message_buffer_size: usize, + ) -> Result { + let config = ConnectorProperties::extract(properties.clone()) + .map_err(|e| ConnectorError(e.into()))?; + let parser = SourceParserImpl::create( + &format, + &properties, + row_schema_location.as_str(), + use_schema_registry, + proto_message_name, + ) + .await?; + Ok(Self { + config, + columns, + parser, + connector_message_buffer_size, + }) + } + fn get_target_columns(&self, column_ids: Vec) -> Result> { column_ids .iter() @@ -231,3 +270,140 @@ impl ConnectorSource { }) } } + +/// `SourceDescV2` describes a stream source. 
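+/// It is produced by [`SourceDescBuilderV2`] and bundles everything the source
+/// executor needs at runtime: the connector source itself, the row format, the
+/// parsed column descriptions, the source metrics and the primary-key column ids.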
+#[derive(Debug)]
+pub struct SourceDescV2 {
+    pub source: ConnectorSource,
+    pub format: SourceFormat,
+    pub columns: Vec<SourceColumnDesc>,
+    pub metrics: Arc<SourceMetrics>,
+    pub pk_column_ids: Vec<i32>,
+}
+
+#[derive(Clone)]
+pub struct SourceDescBuilderV2 {
+    row_id_index: Option<ProstColumnIndex>,
+    columns: Vec<ProstColumnCatalog>,
+    metrics: Arc<SourceMetrics>,
+    pk_column_ids: Vec<i32>,
+    properties: HashMap<String, String>,
+    source_info: ProstStreamSourceInfo,
+    connector_message_buffer_size: usize,
+}
+
+impl SourceDescBuilderV2 {
+    pub fn new(
+        row_id_index: Option<ProstColumnIndex>,
+        columns: Vec<ProstColumnCatalog>,
+        metrics: Arc<SourceMetrics>,
+        pk_column_ids: Vec<i32>,
+        properties: HashMap<String, String>,
+        source_info: ProstStreamSourceInfo,
+        connector_message_buffer_size: usize,
+    ) -> Self {
+        Self {
+            row_id_index,
+            columns,
+            metrics,
+            pk_column_ids,
+            properties,
+            source_info,
+            connector_message_buffer_size,
+        }
+    }
+
+    pub async fn build(self) -> Result<SourceDescV2> {
+        let format = match self.source_info.get_row_format()? {
+            ProstRowFormatType::Json => SourceFormat::Json,
+            ProstRowFormatType::Protobuf => SourceFormat::Protobuf,
+            ProstRowFormatType::DebeziumJson => SourceFormat::DebeziumJson,
+            ProstRowFormatType::Avro => SourceFormat::Avro,
+            ProstRowFormatType::Maxwell => SourceFormat::Maxwell,
+            ProstRowFormatType::RowUnspecified => unreachable!(),
+        };
+
+        if format == SourceFormat::Protobuf && self.source_info.row_schema_location.is_empty() {
+            return Err(ProtocolError("protobuf file location not provided".to_string()).into());
+        }
+
+        let mut columns: Vec<_> = self
+            .columns
+            .iter()
+            .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
+            .collect();
+        self.row_id_index.as_ref().map(|row_id_index| {
+            columns[row_id_index.index as usize].skip_parse = true;
+        });
+        assert!(
+            !self.pk_column_ids.is_empty(),
+            "source should have at least one pk column"
+        );
+
+        let source = ConnectorSource::new(
+            format.clone(),
+            &self.source_info.row_schema_location,
+            self.source_info.use_schema_registry,
+            self.source_info.proto_message_name,
+            self.properties,
+            columns.clone(),
+            self.connector_message_buffer_size,
+        )
+        .await?;
+
+        Ok(SourceDescV2 {
+            source,
+            format,
+            columns,
+            metrics: self.metrics,
+            pk_column_ids: self.pk_column_ids,
+        })
+    }
+}
+
+pub mod test_utils {
+    use std::collections::HashMap;
+
+    use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
+    use risingwave_pb::catalog::{ColumnIndex, StreamSourceInfo};
+    use risingwave_pb::plan_common::ColumnCatalog;
+
+    use super::{SourceDescBuilderV2, DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE};
+
+    pub fn create_source_desc_builder(
+        schema: &Schema,
+        row_id_index: Option<u64>,
+        pk_column_ids: Vec<i32>,
+        source_info: StreamSourceInfo,
+        properties: HashMap<String, String>,
+    ) -> SourceDescBuilderV2 {
+        let row_id_index = row_id_index.map(|index| ColumnIndex { index });
+        let columns = schema
+            .fields
+            .iter()
+            .enumerate()
+            .map(|(i, f)| ColumnCatalog {
+                column_desc: Some(
+                    ColumnDesc {
+                        data_type: f.data_type.clone(),
+                        column_id: ColumnId::from(i as i32), // use column index as column id
+                        name: f.name.clone(),
+                        field_descs: vec![],
+                        type_name: "".to_string(),
+                    }
+                    .to_protobuf(),
+                ),
+                is_hidden: false,
+            })
+            .collect();
+        SourceDescBuilderV2 {
+            row_id_index,
+            columns,
+            metrics: Default::default(),
+            pk_column_ids,
+            properties,
+            source_info,
+            connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
+        }
+    }
+}
diff --git a/src/source/src/lib.rs b/src/source/src/lib.rs
index 143a8cb9b4fc2..f152a95c7f32e 100644
--- a/src/source/src/lib.rs
+++ b/src/source/src/lib.rs
@@ -42,6 +42,7 @@ pub use manager::test_utils as table_test_utils;
 mod common;
 pub mod connector_source;
+pub use connector_source::test_utils as connector_test_utils;
 pub mod monitor;
 pub mod row_id;
 mod table;
diff --git a/src/source/src/manager.rs b/src/source/src/manager.rs
index 05b4e3f47b3dd..b5d16868712b3 100644
--- a/src/source/src/manager.rs
+++ b/src/source/src/manager.rs
@@ -28,6 +28,7 @@ use risingwave_pb::catalog::ColumnIndex as ProstColumnIndex;
 use risingwave_pb::plan_common::{ColumnCatalog as ProstColumnCatalog, RowFormatType};
 use risingwave_pb::stream_plan::source_node::Info as ProstSourceInfo;
 
+use crate::connector_source::DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE;
 use crate::monitor::SourceMetrics;
 use crate::table::TableSource;
 use crate::{ConnectorSource, SourceFormat, SourceImpl, SourceParserImpl};
@@ -179,7 +180,7 @@ impl Default for TableSourceManager {
         TableSourceManager {
             sources: Default::default(),
             metrics: Default::default(),
-            connector_message_buffer_size: 16,
+            connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
         }
     }
 }
diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs
index c1f4472733fb1..c6c35c965d259 100644
--- a/src/stream/src/executor/source/mod.rs
+++ b/src/stream/src/executor/source/mod.rs
@@ -15,6 +15,8 @@
 pub mod source_executor;
 pub use source_executor::*;
 
+pub mod source_executor_v2;
+
 mod reader;
 pub mod state_table_handler;
 
diff --git a/src/stream/src/executor/source/source_executor_v2.rs b/src/stream/src/executor/source/source_executor_v2.rs
new file mode 100644
index 0000000000000..461bb7a678c6c
--- /dev/null
+++ b/src/stream/src/executor/source/source_executor_v2.rs
@@ -0,0 +1,742 @@
+use std::fmt::Formatter;
+
+use either::Either;
+use futures::StreamExt;
+use futures_async_stream::try_stream;
+use risingwave_common::catalog::{ColumnId, TableId};
+use risingwave_connector::source::{ConnectorState, SplitId, SplitMetaData};
+use risingwave_source::connector_source::{SourceContext, SourceDescBuilderV2, SourceDescV2};
+use risingwave_source::{BoxSourceWithStateStream, StreamChunkWithState};
+use risingwave_storage::StateStore;
+use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::time::Instant;
+
+use super::SourceStateTableHandler;
+use crate::executor::monitor::StreamingMetrics;
+use crate::executor::source::reader::SourceReaderStream;
+use crate::executor::*;
+
+/// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to
+/// some latencies in network and cost in meta.
+const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
+
+/// [`StreamSourceCore`] stores the necessary information for the source executor to execute on the
+/// external connector.
+pub struct StreamSourceCore<S: StateStore> {
+    table_id: TableId,
+
+    column_ids: Vec<ColumnId>,
+
+    source_identify: String,
+
+    /// `source_desc_builder` will be taken (`mem::take`) on execution. A `SourceDesc` (currently
+    /// named `SourceDescV2`) will be constructed and used for execution.
+    source_desc_builder: Option<SourceDescBuilderV2>,
+
+    /// Split info for stream source. A source executor might read data from several splits of
+    /// external connector.
+    stream_source_splits: Vec<SplitImpl>,
+
+    /// Stores informtion of the splits.
+    split_state_store: SourceStateTableHandler<S>,
+
+    /// In-memory cache for the splits.
+    state_cache: HashMap<SplitId, SplitImpl>,
+}
+
+pub struct SourceExecutorV2<S: StateStore> {
+    ctx: ActorContextRef,
+
+    identity: String,
+
+    schema: Schema,
+
+    pk_indices: PkIndices,
+
+    /// Streaming source for external
+    stream_source_core: Option<StreamSourceCore<S>>,
+
+    /// Metrics for monitor.
+    metrics: Arc<StreamingMetrics>,
+
+    /// Receiver of barrier channel.
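+    /// Taken out (`Option::take`) when the executor starts executing; barriers are
+    /// then consumed via the merged [`SourceReaderStream`] (or directly, when there
+    /// is no external stream source).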
+    barrier_receiver: Option<UnboundedReceiver<Barrier>>,
+
+    /// Expected barrier latency.
+    expected_barrier_latency_ms: u64,
+}
+
+impl<S: StateStore> SourceExecutorV2<S> {
+    pub fn new(
+        ctx: ActorContextRef,
+        schema: Schema,
+        pk_indices: PkIndices,
+        stream_source_core: Option<StreamSourceCore<S>>,
+        metrics: Arc<StreamingMetrics>,
+        barrier_receiver: UnboundedReceiver<Barrier>,
+        expected_barrier_latency_ms: u64,
+        executor_id: u64,
+    ) -> Self {
+        Self {
+            ctx,
+            identity: format!("SourceExecutor {:X}", executor_id),
+            schema,
+            pk_indices,
+            stream_source_core,
+            metrics,
+            barrier_receiver: Some(barrier_receiver),
+            expected_barrier_latency_ms,
+        }
+    }
+
+    async fn build_stream_source_reader(
+        &self,
+        source_desc: &SourceDescV2,
+        state: ConnectorState,
+    ) -> StreamExecutorResult<BoxSourceWithStateStream> {
+        let column_ids = source_desc
+            .columns
+            .iter()
+            .map(|column_desc| column_desc.column_id)
+            .collect_vec();
+        Ok(source_desc
+            .source
+            .stream_reader(
+                state,
+                column_ids,
+                source_desc.metrics.clone(),
+                SourceContext::new(
+                    self.ctx.id,
+                    self.stream_source_core.as_ref().unwrap().table_id,
+                ),
+            )
+            .await
+            .map_err(StreamExecutorError::connector_error)?
+            .into_stream())
+    }
+
+    async fn apply_split_change(
+        &mut self,
+        source_desc: &SourceDescV2,
+        stream: &mut SourceReaderStream,
+        mapping: &HashMap<ActorId, Vec<SplitImpl>>,
+    ) -> StreamExecutorResult<()> {
+        if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() {
+            if let Some(target_state) = self.get_diff(Some(target_splits)).await? {
+                tracing::info!(
+                    actor_id = self.ctx.id,
+                    state = ?target_state,
+                    "apply split change"
+                );
+
+                self.replace_stream_reader_with_target_state(source_desc, stream, target_state)
+                    .await?;
+            }
+        }
+
+        Ok(())
+    }
+
+    // Note: `get_diff` will modify `state_cache`
+    // `rhs` can not be None because we do not support split number reduction
+    async fn get_diff(&mut self, rhs: ConnectorState) -> StreamExecutorResult<ConnectorState> {
+        let source_info = self.stream_source_core.as_mut().unwrap();
+
+        let split_change = rhs.unwrap();
+        let mut target_state: Vec<SplitImpl> = Vec::with_capacity(split_change.len());
+        let mut no_change_flag = true;
+        for sc in &split_change {
+            if let Some(s) = source_info.state_cache.get(&sc.id()) {
+                target_state.push(s.clone())
+            } else {
+                no_change_flag = false;
+                // write new assigned split to state cache. snapshot is base on cache.
+
+                let state = if let Some(recover_state) = source_info
+                    .split_state_store
+                    .try_recover_from_state_store(sc)
+                    .await?
+                {
+                    recover_state
+                } else {
+                    sc.clone()
+                };
+
+                source_info
+                    .state_cache
+                    .entry(sc.id())
+                    .or_insert_with(|| state.clone());
+                target_state.push(state);
+            }
+        }
+
+        Ok((!no_change_flag).then_some(target_state))
+    }
+
+    async fn replace_stream_reader_with_target_state(
+        &mut self,
+        source_desc: &SourceDescV2,
+        stream: &mut SourceReaderStream,
+        target_state: Vec<SplitImpl>,
+    ) -> StreamExecutorResult<()> {
+        tracing::info!(
+            "actor {:?} apply source split change to {:?}",
+            self.ctx.id,
+            target_state
+        );
+
+        // Replace the source reader with a new one of the new state.
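+        // Only the source half of the merged stream is swapped out here; the
+        // barrier half keeps running, so no barrier is lost during the switch.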
+ let reader = self + .build_stream_source_reader(source_desc, Some(target_state.clone())) + .await?; + stream.replace_source_stream(reader); + + self.stream_source_core + .as_mut() + .unwrap() + .stream_source_splits = target_state; + + Ok(()) + } + + async fn take_snapshot_and_clear_cache( + &mut self, + epoch: EpochPair, + ) -> StreamExecutorResult<()> { + let source_info = self.stream_source_core.as_mut().unwrap(); + + let cache = source_info + .state_cache + .values() + .map(|split_impl| split_impl.to_owned()) + .collect_vec(); + + if !cache.is_empty() { + tracing::debug!(actor_id = self.ctx.id, state = ?cache, "take snapshot"); + source_info.split_state_store.take_snapshot(cache).await? + } + // commit anyway, even if no message saved + source_info + .split_state_store + .state_store + .commit(epoch) + .await?; + + source_info.state_cache.clear(); + + Ok(()) + } + + /// A source executor with a stream source receives: + /// 1. Barrier messages + /// 2. Data from external source + /// and acts accordingly. + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn execute_with_stream_source(mut self) { + let mut barrier_receiver = self.barrier_receiver.take().unwrap(); + let barrier = barrier_receiver + .recv() + .stack_trace("source_recv_first_barrier") + .await + .unwrap(); + + let mut source_info = self.stream_source_core.unwrap(); + + // Build source description fron the builder. + let source_desc = source_info + .source_desc_builder + .take() + .unwrap() + .build() + .await + .map_err(StreamExecutorError::connector_error)?; + + if let Some(mutation) = barrier.mutation.as_ref() { + match mutation.as_ref() { + Mutation::Add { splits, .. } => { + if let Some(splits) = splits.get(&self.ctx.id) { + source_info.stream_source_splits = splits.clone(); + } + } + Mutation::Update { actor_splits, .. } => { + if let Some(splits) = actor_splits.get(&self.ctx.id) { + source_info.stream_source_splits = splits.clone(); + } + } + _ => {} + } + } + + source_info.split_state_store.init_epoch(barrier.epoch); + + let mut boot_state = source_info.stream_source_splits.clone(); + for ele in &mut boot_state { + if let Some(recover_state) = source_info + .split_state_store + .try_recover_from_state_store(ele) + .await? + { + *ele = recover_state; + } + } + + // Return the ownership of `source_info` to source executor. + self.stream_source_core = Some(source_info); + + let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); + tracing::info!( + "start actor {:?} with state {:?}", + self.ctx.id, + recover_state + ); + + let source_chunk_reader = self + .build_stream_source_reader(&source_desc, recover_state) + .stack_trace("source_build_reader") + .await?; + + // Merge the chunks from source and the barriers into a single stream. + let mut stream = SourceReaderStream::new(barrier_receiver, source_chunk_reader); + + // If the first barrier is configuration change, then the source executor must be newly + // created, and we should start with the paused state. + if barrier.is_update() { + stream.pause_source(); + } + + yield Message::Barrier(barrier); + + // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms` + // milliseconds, considering some other latencies like network and cost in Meta. + let max_wait_barrier_time_ms = + self.expected_barrier_latency_ms as u128 * WAIT_BARRIER_MULTIPLE_TIMES; + let mut last_barrier_time = Instant::now(); + let mut self_paused = false; + while let Some(msg) = stream.next().await { + match msg? 
{ + // This branch will be preferred. + Either::Left(barrier) => { + last_barrier_time = Instant::now(); + if self_paused { + stream.resume_source(); + self_paused = false; + } + let epoch = barrier.epoch; + + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::SourceChangeSplit(actor_splits) => { + self.apply_split_change(&source_desc, &mut stream, actor_splits) + .await? + } + Mutation::Pause => stream.pause_source(), + Mutation::Resume => stream.resume_source(), + Mutation::Update { actor_splits, .. } => { + self.apply_split_change(&source_desc, &mut stream, actor_splits) + .await?; + } + _ => {} + } + } + + self.take_snapshot_and_clear_cache(epoch).await?; + + yield Message::Barrier(barrier); + } + + Either::Right(StreamChunkWithState { + chunk, + split_offset_mapping, + }) => { + if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms { + // Exceeds the max wait barrier time, the source will be paused. Currently + // we can guarantee the source is not paused since it received stream + // chunks. + self_paused = true; + stream.pause_source(); + } + if let Some(mapping) = split_offset_mapping { + let state: HashMap<_, _> = mapping + .iter() + .flat_map(|(split, offset)| { + let origin_split_impl = self + .stream_source_core + .as_ref() + .unwrap() + .stream_source_splits + .iter() + .filter(|origin_split| &origin_split.id() == split) + .at_most_one() + .unwrap_or_else(|_| { + panic!("multiple splits with same id `{split}`") + }); + + origin_split_impl.map(|split_impl| { + (split.clone(), split_impl.update(offset.clone())) + }) + }) + .collect(); + + self.stream_source_core + .as_mut() + .unwrap() + .state_cache + .extend(state); + } + + self.metrics + .source_output_row_count + .with_label_values(&[self + .stream_source_core + .as_ref() + .unwrap() + .source_identify + .as_str()]) + .inc_by(chunk.cardinality() as u64); + yield Message::Chunk(chunk); + } + } + } + + // The source executor should only be stopped by the actor when finding a `Stop` mutation. + tracing::error!( + actor_id = self.ctx.id, + "source executor exited unexpectedly" + ) + } + + /// A source executor without stream source only receives barrier messages and sends them to + /// the downstream executor. 
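+    /// This is the path taken when `stream_source_core` is `None`, i.e. the
+    /// executor was created without an external connector.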
+ #[try_stream(ok = Message, error = StreamExecutorError)] + async fn execute_without_stream_source(mut self) { + let mut barrier_receiver = self.barrier_receiver.take().unwrap(); + let barrier = barrier_receiver + .recv() + .stack_trace("source_recv_first_barrier") + .await + .unwrap(); + yield Message::Barrier(barrier); + + while let Some(barrier) = barrier_receiver.recv().await { + yield Message::Barrier(barrier); + } + } +} + +impl Executor for SourceExecutorV2 { + fn execute(self: Box) -> BoxedMessageStream { + if self.stream_source_core.is_some() { + self.execute_with_stream_source().boxed() + } else { + self.execute_without_stream_source().boxed() + } + } + + fn schema(&self) -> &Schema { + &self.schema + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &self.pk_indices + } + + fn identity(&self) -> &str { + self.identity.as_str() + } +} + +impl Debug for SourceExecutorV2 { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(source_info) = &self.stream_source_core { + f.debug_struct("SourceExecutor") + .field("source_id", &source_info.table_id) + .field("column_ids", &source_info.column_ids) + .field("pk_indices", &self.pk_indices) + .finish() + } else { + f.debug_struct("SourceExecutor").finish() + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use maplit::{convert_args, hashmap}; + use risingwave_common::array::StreamChunk; + use risingwave_common::catalog::{Field, Schema, TableId}; + use risingwave_common::test_prelude::StreamChunkTestExt; + use risingwave_common::types::DataType; + use risingwave_common::util::sort_util::{OrderPair, OrderType}; + use risingwave_connector::source::datagen::DatagenSplit; + use risingwave_pb::catalog::StreamSourceInfo; + use risingwave_pb::plan_common::RowFormatType as ProstRowFormatType; + use risingwave_source::connector_test_utils::create_source_desc_builder; + use risingwave_storage::memory::MemoryStateStore; + use tokio::sync::mpsc::unbounded_channel; + + use super::*; + use crate::executor::ActorContext; + + #[tokio::test] + async fn test_source_executor() { + let table_id = TableId::default(); + let schema = Schema { + fields: vec![ + Field::unnamed(DataType::Int64), + Field::with_name(DataType::Int32, "sequence_int"), + ], + }; + let row_id_index = Some(0); + let pk_column_ids = vec![0]; + let pk_indices = vec![0]; + let source_info = StreamSourceInfo { + row_format: ProstRowFormatType::Json as i32, + ..Default::default() + }; + let (barrier_tx, barrier_rx) = unbounded_channel::(); + let column_ids = vec![0, 1].into_iter().map(ColumnId::from).collect(); + + // This datagen will generate 3 rows at one time. 
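+        // With `datagen.rows.per.second` = 3 and the sequence starting at 11, the
+        // first chunk is expected to contain exactly the rows 11, 12 and 13 that
+        // are asserted below.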
+ let properties: HashMap = convert_args!(hashmap!( + "connector" => "datagen", + "datagen.rows.per.second" => "3", + "fields.sequence_int.kind" => "sequence", + "fields.sequence_int.start" => "11", + "fields.sequence_int.end" => "11111", + )); + let source_desc_builder = create_source_desc_builder( + &schema, + row_id_index, + pk_column_ids, + source_info, + properties, + ); + let split_state_store = SourceStateTableHandler::from_table_catalog( + &default_source_internal_table(0x2333), + MemoryStateStore::new(), + ) + .await; + let core = StreamSourceCore:: { + table_id, + column_ids, + source_identify: "Table_".to_string() + &table_id.table_id().to_string(), + source_desc_builder: Some(source_desc_builder), + stream_source_splits: vec![], + split_state_store, + state_cache: HashMap::new(), + }; + + let executor = SourceExecutorV2::new( + ActorContext::create(0), + schema, + pk_indices, + Some(core), + Arc::new(StreamingMetrics::unused()), + barrier_rx, + u64::MAX, + 1, + ); + let mut executor = Box::new(executor).execute(); + + let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add { + adds: HashMap::new(), + splits: hashmap! { + ActorId::default() => vec![ + SplitImpl::Datagen(DatagenSplit { + split_index: 0, + split_num: 1, + start_offset: None, + }), + ], + }, + }); + barrier_tx.send(init_barrier).unwrap(); + + // Consume barrier. + executor.next().await.unwrap().unwrap(); + + // Consume data chunk. + let msg = executor.next().await.unwrap().unwrap(); + + // Row id will not be filled here. + assert_eq!( + msg.into_chunk().unwrap(), + StreamChunk::from_pretty( + " I i + + . 11 + + . 12 + + . 13" + ) + ); + } + + #[tokio::test] + async fn test_split_change_mutation() { + let table_id = TableId::default(); + let schema = Schema { + fields: vec![Field::with_name(DataType::Int32, "v1")], + }; + let row_id_index = None; + let pk_column_ids = vec![0]; + let pk_indices = vec![0_usize]; + let source_info = StreamSourceInfo { + row_format: ProstRowFormatType::Json as i32, + ..Default::default() + }; + let properties = convert_args!(hashmap!( + "connector" => "datagen", + "fields.v1.min" => "1", + "fields.v1.max" => "1000", + "fields.v1.seed" => "12345", + )); + + let source_desc_builder = create_source_desc_builder( + &schema, + row_id_index, + pk_column_ids, + source_info, + properties, + ); + let mem_state_store = MemoryStateStore::new(); + + let column_ids = vec![ColumnId::from(0)]; + let (barrier_tx, barrier_rx) = unbounded_channel::(); + let split_state_store = SourceStateTableHandler::from_table_catalog( + &default_source_internal_table(0x2333), + mem_state_store.clone(), + ) + .await; + + let core = StreamSourceCore:: { + table_id, + column_ids: column_ids.clone(), + source_identify: "Table_".to_string() + &table_id.table_id().to_string(), + source_desc_builder: Some(source_desc_builder), + stream_source_splits: vec![], + split_state_store, + state_cache: HashMap::new(), + }; + + let executor = SourceExecutorV2::new( + ActorContext::create(0), + schema, + pk_indices, + Some(core), + Arc::new(StreamingMetrics::unused()), + barrier_rx, + u64::MAX, + 1, + ); + + let mut materialize = MaterializeExecutor::for_test( + Box::new(executor), + mem_state_store.clone(), + TableId::from(0x2333), + vec![OrderPair::new(0, OrderType::Ascending)], + column_ids, + 2, + ) + .await + .boxed() + .execute(); + + let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add { + adds: HashMap::new(), + splits: hashmap! 
{ + ActorId::default() => vec![ + SplitImpl::Datagen(DatagenSplit { + split_index: 0, + split_num: 3, + start_offset: None, + }), + ], + }, + }); + barrier_tx.send(init_barrier).unwrap(); + + (materialize.next().await.unwrap().unwrap()) + .into_barrier() + .unwrap(); + + let mut ready_chunks = materialize.ready_chunks(10); + let chunks = (ready_chunks.next().await.unwrap()) + .into_iter() + .map(|msg| msg.unwrap().into_chunk().unwrap()) + .collect(); + let chunk_1 = StreamChunk::concat(chunks); + assert_eq!( + chunk_1, + StreamChunk::from_pretty( + " i + + 533 + + 833 + + 738 + + 344", + ) + ); + + let new_assignments = vec![ + SplitImpl::Datagen(DatagenSplit { + split_index: 0, + split_num: 3, + start_offset: None, + }), + SplitImpl::Datagen(DatagenSplit { + split_index: 1, + split_num: 3, + start_offset: None, + }), + ]; + + let change_split_mutation = + Barrier::new_test_barrier(2).with_mutation(Mutation::SourceChangeSplit(hashmap! { + ActorId::default() => new_assignments.clone() + })); + + barrier_tx.send(change_split_mutation).unwrap(); + + let _ = ready_chunks.next().await.unwrap(); // barrier + + let mut source_state_handler = SourceStateTableHandler::from_table_catalog( + &default_source_internal_table(0x2333), + mem_state_store.clone(), + ) + .await; + // there must exist state for new add partition + source_state_handler.init_epoch(EpochPair::new_test_epoch(2)); + source_state_handler + .get(new_assignments[1].id()) + .await + .unwrap() + .unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + let chunks = (ready_chunks.next().await.unwrap()) + .into_iter() + .map(|msg| msg.unwrap().into_chunk().unwrap()) + .collect(); + let chunk_2 = StreamChunk::concat(chunks).sort_rows(); + assert_eq!( + chunk_2, + // mixed from datagen split 0 and 1 + StreamChunk::from_pretty( + " i + + 29 + + 201 + + 344 + + 425 + + 525 + + 533 + + 833", + ) + ); + + let barrier = Barrier::new_test_barrier(3).with_mutation(Mutation::Pause); + barrier_tx.send(barrier).unwrap(); + + let barrier = Barrier::new_test_barrier(4).with_mutation(Mutation::Resume); + barrier_tx.send(barrier).unwrap(); + } +} From ded9eacdd214adae6290eb8049651e5591e66960 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Thu, 17 Nov 2022 13:31:30 +0000 Subject: [PATCH 2/5] pass risedev c --- src/source/src/connector_source.rs | 8 ++++---- src/stream/src/executor/source/source_executor_v2.rs | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 6e4f276b307ce..983bd76de4422 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -187,7 +187,7 @@ pub struct ConnectorSource { impl ConnectorSource { pub async fn new( format: SourceFormat, - row_schema_location: &String, + row_schema_location: &str, use_schema_registry: bool, proto_message_name: String, properties: HashMap, @@ -199,7 +199,7 @@ impl ConnectorSource { let parser = SourceParserImpl::create( &format, &properties, - row_schema_location.as_str(), + row_schema_location, use_schema_registry, proto_message_name, ) @@ -332,9 +332,9 @@ impl SourceDescBuilderV2 { .iter() .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap()))) .collect(); - self.row_id_index.as_ref().map(|row_id_index| { + if let Some(row_id_index) = self.row_id_index.as_ref() { columns[row_id_index.index as usize].skip_parse = true; - }); + } assert!( !self.pk_column_ids.is_empty(), "source should have at least one pk column" diff --git 
a/src/stream/src/executor/source/source_executor_v2.rs b/src/stream/src/executor/source/source_executor_v2.rs index 461bb7a678c6c..009908e509ff5 100644 --- a/src/stream/src/executor/source/source_executor_v2.rs +++ b/src/stream/src/executor/source/source_executor_v2.rs @@ -37,7 +37,7 @@ pub struct StreamSourceCore { /// external connector. stream_source_splits: Vec, - /// Stores informtion of the splits. + /// Stores information of the splits. split_state_store: SourceStateTableHandler, /// In-memory cache for the splits. @@ -67,6 +67,7 @@ pub struct SourceExecutorV2 { } impl SourceExecutorV2 { + #[allow(clippy::too_many_arguments)] pub fn new( ctx: ActorContextRef, schema: Schema, @@ -242,7 +243,7 @@ impl SourceExecutorV2 { let mut source_info = self.stream_source_core.unwrap(); - // Build source description fron the builder. + // Build source description from the builder. let source_desc = source_info .source_desc_builder .take() From 23e3567f366a892669eb5f9e6f6cf11ef40b10d5 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Thu, 17 Nov 2022 14:16:46 +0000 Subject: [PATCH 3/5] fix var name --- .../src/executor/source/source_executor_v2.rs | 47 +++++++++---------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/src/stream/src/executor/source/source_executor_v2.rs b/src/stream/src/executor/source/source_executor_v2.rs index 009908e509ff5..ece3b8aa16fbb 100644 --- a/src/stream/src/executor/source/source_executor_v2.rs +++ b/src/stream/src/executor/source/source_executor_v2.rs @@ -141,19 +141,19 @@ impl SourceExecutorV2 { // Note: `get_diff` will modify `state_cache` // `rhs` can not be None because we do not support split number reduction async fn get_diff(&mut self, rhs: ConnectorState) -> StreamExecutorResult { - let source_info = self.stream_source_core.as_mut().unwrap(); + let core = self.stream_source_core.as_mut().unwrap(); let split_change = rhs.unwrap(); let mut target_state: Vec = Vec::with_capacity(split_change.len()); let mut no_change_flag = true; for sc in &split_change { - if let Some(s) = source_info.state_cache.get(&sc.id()) { + if let Some(s) = core.state_cache.get(&sc.id()) { target_state.push(s.clone()) } else { no_change_flag = false; // write new assigned split to state cache. snapshot is base on cache. - let state = if let Some(recover_state) = source_info + let state = if let Some(recover_state) = core .split_state_store .try_recover_from_state_store(sc) .await? @@ -163,8 +163,7 @@ impl SourceExecutorV2 { sc.clone() }; - source_info - .state_cache + core.state_cache .entry(sc.id()) .or_insert_with(|| state.clone()); target_state.push(state); @@ -204,9 +203,9 @@ impl SourceExecutorV2 { &mut self, epoch: EpochPair, ) -> StreamExecutorResult<()> { - let source_info = self.stream_source_core.as_mut().unwrap(); + let core = self.stream_source_core.as_mut().unwrap(); - let cache = source_info + let cache = core .state_cache .values() .map(|split_impl| split_impl.to_owned()) @@ -214,16 +213,12 @@ impl SourceExecutorV2 { if !cache.is_empty() { tracing::debug!(actor_id = self.ctx.id, state = ?cache, "take snapshot"); - source_info.split_state_store.take_snapshot(cache).await? + core.split_state_store.take_snapshot(cache).await? 
} // commit anyway, even if no message saved - source_info - .split_state_store - .state_store - .commit(epoch) - .await?; + core.split_state_store.state_store.commit(epoch).await?; - source_info.state_cache.clear(); + core.state_cache.clear(); Ok(()) } @@ -241,10 +236,10 @@ impl SourceExecutorV2 { .await .unwrap(); - let mut source_info = self.stream_source_core.unwrap(); + let mut core = self.stream_source_core.unwrap(); // Build source description from the builder. - let source_desc = source_info + let source_desc = core .source_desc_builder .take() .unwrap() @@ -256,23 +251,23 @@ impl SourceExecutorV2 { match mutation.as_ref() { Mutation::Add { splits, .. } => { if let Some(splits) = splits.get(&self.ctx.id) { - source_info.stream_source_splits = splits.clone(); + core.stream_source_splits = splits.clone(); } } Mutation::Update { actor_splits, .. } => { if let Some(splits) = actor_splits.get(&self.ctx.id) { - source_info.stream_source_splits = splits.clone(); + core.stream_source_splits = splits.clone(); } } _ => {} } } - source_info.split_state_store.init_epoch(barrier.epoch); + core.split_state_store.init_epoch(barrier.epoch); - let mut boot_state = source_info.stream_source_splits.clone(); + let mut boot_state = core.stream_source_splits.clone(); for ele in &mut boot_state { - if let Some(recover_state) = source_info + if let Some(recover_state) = core .split_state_store .try_recover_from_state_store(ele) .await? @@ -281,8 +276,8 @@ impl SourceExecutorV2 { } } - // Return the ownership of `source_info` to source executor. - self.stream_source_core = Some(source_info); + // Return the ownership of `stream_source_core` to the source executor. + self.stream_source_core = Some(core); let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); tracing::info!( @@ -448,10 +443,10 @@ impl Executor for SourceExecutorV2 { impl Debug for SourceExecutorV2 { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - if let Some(source_info) = &self.stream_source_core { + if let Some(core) = &self.stream_source_core { f.debug_struct("SourceExecutor") - .field("source_id", &source_info.table_id) - .field("column_ids", &source_info.column_ids) + .field("source_id", &core.table_id) + .field("column_ids", &core.column_ids) .field("pk_indices", &self.pk_indices) .finish() } else { From 9b6ccb14f6c6cf0841d8d78814bc1b289741b4a1 Mon Sep 17 00:00:00 2001 From: Yuanxin Cao <60498509+xx01cyx@users.noreply.github.com> Date: Fri, 18 Nov 2022 11:56:41 +0800 Subject: [PATCH 4/5] Update src/stream/src/executor/source/source_executor_v2.rs Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- .../src/executor/source/source_executor_v2.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/stream/src/executor/source/source_executor_v2.rs b/src/stream/src/executor/source/source_executor_v2.rs index 009908e509ff5..b1b404739857d 100644 --- a/src/stream/src/executor/source/source_executor_v2.rs +++ b/src/stream/src/executor/source/source_executor_v2.rs @@ -1,3 +1,17 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::fmt::Formatter; use either::Either; From 7258f7b2c9fea383ef3cb8b2e052f63ed9b0ac7a Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Mon, 21 Nov 2022 03:15:56 +0000 Subject: [PATCH 5/5] pass ut --- src/source/src/connector_source.rs | 20 +++++++++++++++++-- .../src/executor/source/source_executor_v2.rs | 3 +++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 983bd76de4422..067d49e31bc22 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -185,6 +185,7 @@ pub struct ConnectorSource { } impl ConnectorSource { + #[allow(clippy::too_many_arguments)] pub async fn new( format: SourceFormat, row_schema_location: &str, @@ -192,10 +193,18 @@ impl ConnectorSource { proto_message_name: String, properties: HashMap, columns: Vec, + connector_node_addr: String, connector_message_buffer_size: usize, ) -> Result { - let config = ConnectorProperties::extract(properties.clone()) - .map_err(|e| ConnectorError(e.into()))?; + // Store the connector node address to properties for later use. + let mut source_props: HashMap = + HashMap::from_iter(properties.clone().into_iter()); + source_props.insert( + "connector_node_addr".to_string(), + connector_node_addr.clone(), + ); + let config = + ConnectorProperties::extract(source_props).map_err(|e| ConnectorError(e.into()))?; let parser = SourceParserImpl::create( &format, &properties, @@ -289,10 +298,12 @@ pub struct SourceDescBuilderV2 { pk_column_ids: Vec, properties: HashMap, source_info: ProstStreamSourceInfo, + connector_node_addr: String, connector_message_buffer_size: usize, } impl SourceDescBuilderV2 { + #[allow(clippy::too_many_arguments)] pub fn new( row_id_index: Option, columns: Vec, @@ -300,6 +311,7 @@ impl SourceDescBuilderV2 { pk_column_ids: Vec, properties: HashMap, source_info: ProstStreamSourceInfo, + connector_node_addr: String, connector_message_buffer_size: usize, ) -> Self { Self { @@ -309,6 +321,7 @@ impl SourceDescBuilderV2 { pk_column_ids, properties, source_info, + connector_node_addr, connector_message_buffer_size, } } @@ -320,6 +333,7 @@ impl SourceDescBuilderV2 { ProstRowFormatType::DebeziumJson => SourceFormat::DebeziumJson, ProstRowFormatType::Avro => SourceFormat::Avro, ProstRowFormatType::Maxwell => SourceFormat::Maxwell, + ProstRowFormatType::CanalJson => SourceFormat::CanalJson, ProstRowFormatType::RowUnspecified => unreachable!(), }; @@ -347,6 +361,7 @@ impl SourceDescBuilderV2 { self.source_info.proto_message_name, self.properties, columns.clone(), + self.connector_node_addr, self.connector_message_buffer_size, ) .await?; @@ -403,6 +418,7 @@ pub mod test_utils { pk_column_ids, properties, source_info, + connector_node_addr: "127.0.0.1:60061".to_string(), connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE, } } diff --git a/src/stream/src/executor/source/source_executor_v2.rs b/src/stream/src/executor/source/source_executor_v2.rs index e4ca67f0f7496..ace32952a2482 100644 --- a/src/stream/src/executor/source/source_executor_v2.rs +++ 
b/src/stream/src/executor/source/source_executor_v2.rs @@ -647,6 +647,9 @@ mod tests { vec![OrderPair::new(0, OrderType::Ascending)], column_ids, 2, + None, + 0, + false, ) .await .boxed()
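
Taken together, the five patches leave `SourceExecutorV2` with the following wiring pattern, shown here as a minimal sketch condensed from the unit tests above. Helper names (`create_source_desc_builder`, `default_source_internal_table`) come from the test utilities, `first_barrier_with_splits` is a hypothetical placeholder for a `Barrier` carrying a `Mutation::Add` split assignment, and `StreamSourceCore` can only be built like this from inside the module, since its fields are private:

    // A minimal sketch, assuming the test helpers used in the patches above.
    let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();

    // Describe the external source; the builder is consumed (`Option::take`)
    // when the executor starts executing.
    let source_desc_builder =
        create_source_desc_builder(&schema, row_id_index, pk_column_ids, source_info, properties);

    // Bundle all connector-related state. Passing `None` to the executor
    // instead yields the barrier-only path (`execute_without_stream_source`).
    let core = StreamSourceCore::<MemoryStateStore> {
        table_id,
        column_ids,
        source_identify: "Table_".to_string() + &table_id.table_id().to_string(),
        source_desc_builder: Some(source_desc_builder),
        stream_source_splits: vec![],
        split_state_store,
        state_cache: HashMap::new(),
    };

    let executor = SourceExecutorV2::new(
        ActorContext::create(0),
        schema,
        pk_indices,
        Some(core),
        Arc::new(StreamingMetrics::unused()),
        barrier_rx,
        u64::MAX, // expected_barrier_latency_ms
        1,        // executor_id
    );

    // The first barrier must carry the initial split assignment; afterwards the
    // output interleaves data chunks with barriers, and split reassignments
    // arrive as `Mutation::SourceChangeSplit`.
    let mut stream = Box::new(executor).execute();
    barrier_tx.send(first_barrier_with_splits).unwrap();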