Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): introduce new source executor #6447

Merged
merged 9 commits into from
Nov 21, 2022
196 changes: 194 additions & 2 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,28 @@ use futures::stream::BoxStream;
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::error::ErrorCode::{ConnectorError, ProtocolError};
use risingwave_common::error::{internal_error, Result, RwError, ToRwResult};
use risingwave_common::util::select_all;
use risingwave_connector::source::{
Column, ConnectorProperties, ConnectorState, SourceMessage, SplitId, SplitMetaData,
SplitReaderImpl,
};
use risingwave_pb::catalog::{
ColumnIndex as ProstColumnIndex, StreamSourceInfo as ProstStreamSourceInfo,
};
use risingwave_pb::plan_common::{
ColumnCatalog as ProstColumnCatalog, RowFormatType as ProstRowFormatType,
};

use crate::monitor::SourceMetrics;
use crate::{SourceColumnDesc, SourceParserImpl, SourceStreamChunkBuilder, StreamChunkWithState};
use crate::{
SourceColumnDesc, SourceFormat, SourceParserImpl, SourceStreamChunkBuilder,
StreamChunkWithState,
};

pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16;

#[derive(Clone, Debug)]
pub struct SourceContext {
Expand Down Expand Up @@ -173,6 +185,42 @@ pub struct ConnectorSource {
}

impl ConnectorSource {
#[allow(clippy::too_many_arguments)]
pub async fn new(
format: SourceFormat,
row_schema_location: &str,
use_schema_registry: bool,
proto_message_name: String,
properties: HashMap<String, String>,
columns: Vec<SourceColumnDesc>,
connector_node_addr: String,
connector_message_buffer_size: usize,
) -> Result<Self> {
// Store the connector node address to properties for later use.
let mut source_props: HashMap<String, String> =
HashMap::from_iter(properties.clone().into_iter());
source_props.insert(
"connector_node_addr".to_string(),
connector_node_addr.clone(),
);
let config =
ConnectorProperties::extract(source_props).map_err(|e| ConnectorError(e.into()))?;
let parser = SourceParserImpl::create(
&format,
&properties,
row_schema_location,
use_schema_registry,
proto_message_name,
)
.await?;
Ok(Self {
config,
columns,
parser,
connector_message_buffer_size,
})
}

fn get_target_columns(&self, column_ids: Vec<ColumnId>) -> Result<Vec<SourceColumnDesc>> {
column_ids
.iter()
Expand Down Expand Up @@ -231,3 +279,147 @@ impl ConnectorSource {
})
}
}

/// `SourceDescV2` describes a stream source.
#[derive(Debug)]
pub struct SourceDescV2 {
pub source: ConnectorSource,
pub format: SourceFormat,
pub columns: Vec<SourceColumnDesc>,
pub metrics: Arc<SourceMetrics>,
pub pk_column_ids: Vec<i32>,
}

#[derive(Clone)]
pub struct SourceDescBuilderV2 {
row_id_index: Option<ProstColumnIndex>,
columns: Vec<ProstColumnCatalog>,
metrics: Arc<SourceMetrics>,
pk_column_ids: Vec<i32>,
properties: HashMap<String, String>,
source_info: ProstStreamSourceInfo,
connector_node_addr: String,
connector_message_buffer_size: usize,
}

impl SourceDescBuilderV2 {
#[allow(clippy::too_many_arguments)]
pub fn new(
row_id_index: Option<ProstColumnIndex>,
columns: Vec<ProstColumnCatalog>,
metrics: Arc<SourceMetrics>,
pk_column_ids: Vec<i32>,
properties: HashMap<String, String>,
source_info: ProstStreamSourceInfo,
connector_node_addr: String,
connector_message_buffer_size: usize,
) -> Self {
Self {
row_id_index,
columns,
metrics,
pk_column_ids,
properties,
source_info,
connector_node_addr,
connector_message_buffer_size,
}
}

pub async fn build(self) -> Result<SourceDescV2> {
let format = match self.source_info.get_row_format()? {
ProstRowFormatType::Json => SourceFormat::Json,
ProstRowFormatType::Protobuf => SourceFormat::Protobuf,
ProstRowFormatType::DebeziumJson => SourceFormat::DebeziumJson,
ProstRowFormatType::Avro => SourceFormat::Avro,
ProstRowFormatType::Maxwell => SourceFormat::Maxwell,
ProstRowFormatType::CanalJson => SourceFormat::CanalJson,
ProstRowFormatType::RowUnspecified => unreachable!(),
};

if format == SourceFormat::Protobuf && self.source_info.row_schema_location.is_empty() {
return Err(ProtocolError("protobuf file location not provided".to_string()).into());
}

let mut columns: Vec<_> = self
.columns
.iter()
.map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
.collect();
if let Some(row_id_index) = self.row_id_index.as_ref() {
columns[row_id_index.index as usize].skip_parse = true;
}
assert!(
!self.pk_column_ids.is_empty(),
"source should have at least one pk column"
);

let source = ConnectorSource::new(
format.clone(),
&self.source_info.row_schema_location,
self.source_info.use_schema_registry,
self.source_info.proto_message_name,
self.properties,
columns.clone(),
self.connector_node_addr,
self.connector_message_buffer_size,
)
.await?;

Ok(SourceDescV2 {
source,
format,
columns,
metrics: self.metrics,
pk_column_ids: self.pk_column_ids,
})
}
}

pub mod test_utils {
use std::collections::HashMap;

use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_pb::catalog::{ColumnIndex, StreamSourceInfo};
use risingwave_pb::plan_common::ColumnCatalog;

use super::{SourceDescBuilderV2, DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE};

pub fn create_source_desc_builder(
schema: &Schema,
row_id_index: Option<u64>,
pk_column_ids: Vec<i32>,
source_info: StreamSourceInfo,
properties: HashMap<String, String>,
) -> SourceDescBuilderV2 {
let row_id_index = row_id_index.map(|index| ColumnIndex { index });
let columns = schema
.fields
.iter()
.enumerate()
.map(|(i, f)| ColumnCatalog {
column_desc: Some(
ColumnDesc {
data_type: f.data_type.clone(),
column_id: ColumnId::from(i as i32), // use column index as column id
name: f.name.clone(),
field_descs: vec![],
type_name: "".to_string(),
}
.to_protobuf(),
),
is_hidden: false,
})
.collect();
SourceDescBuilderV2 {
row_id_index,
columns,
metrics: Default::default(),
pk_column_ids,
properties,
source_info,
connector_node_addr: "127.0.0.1:60061".to_string(),
connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
}
}
}
1 change: 1 addition & 0 deletions src/source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub use manager::test_utils as table_test_utils;

mod common;
pub mod connector_source;
pub use connector_source::test_utils as connector_test_utils;
pub mod monitor;
pub mod row_id;
mod table;
Expand Down
3 changes: 2 additions & 1 deletion src/source/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_pb::catalog::ColumnIndex as ProstColumnIndex;
use risingwave_pb::plan_common::{ColumnCatalog as ProstColumnCatalog, RowFormatType};
use risingwave_pb::stream_plan::source_node::Info as ProstSourceInfo;

use crate::connector_source::DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE;
use crate::monitor::SourceMetrics;
use crate::table::TableSource;
use crate::{ConnectorSource, SourceFormat, SourceImpl, SourceParserImpl};
Expand Down Expand Up @@ -179,7 +180,7 @@ impl Default for TableSourceManager {
TableSourceManager {
sources: Default::default(),
metrics: Default::default(),
connector_message_buffer_size: 16,
connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/executor/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
pub mod source_executor;
pub use source_executor::*;

pub mod source_executor_v2;

mod reader;
pub mod state_table_handler;

Expand Down
Loading