diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index fe053ff63dfc8..8c9894b726ec6 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -21,6 +21,7 @@ use risingwave_common::array::{DataChunk, Op, StreamChunk};
 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
 use risingwave_common::types::DataType;
 use risingwave_connector::parser::SpecificParserConfig;
+use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
 use risingwave_connector::source::monitor::SourceMetrics;
 use risingwave_connector::source::reader::reader::SourceReader;
 use risingwave_connector::source::{
@@ -30,7 +31,9 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
 
 use super::Executor;
 use crate::error::{BatchError, Result};
-use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
+use crate::executor::{
+    BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, FileSelector, IcebergScanExecutor,
+};
 use crate::task::BatchTaskContext;
 
 pub struct SourceExecutor {
@@ -75,16 +78,6 @@ impl BoxedExecutorBuilder for SourceExecutor {
             .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
             .collect();
 
-        let source_reader = SourceReader {
-            config,
-            columns,
-            parser_config,
-            connector_message_buffer_size: source
-                .context()
-                .get_config()
-                .developer
-                .connector_message_buffer_size,
-        };
         let source_ctrl_opts = SourceCtrlOpts {
             chunk_size: source.context().get_config().developer.chunk_size,
             rate_limit: None,
@@ -110,16 +103,44 @@ impl BoxedExecutorBuilder for SourceExecutor {
             .collect();
         let schema = Schema::new(fields);
 
-        Ok(Box::new(SourceExecutor {
-            source: source_reader,
-            column_ids,
-            metrics: source.context().source_metrics(),
-            source_id: TableId::new(source_node.source_id),
-            split,
-            schema,
-            identity: source.plan_node().get_identity().clone(),
-            source_ctrl_opts,
-        }))
+        if let ConnectorProperties::Iceberg(iceberg_properties) = config {
+            let iceberg_properties: IcebergProperties = *iceberg_properties;
+            if let SplitImpl::Iceberg(split) = split {
+                let split: IcebergSplit = split;
+                Ok(Box::new(IcebergScanExecutor::new(
+                    iceberg_properties.to_iceberg_config(),
+                    Some(split.snapshot_id),
+                    FileSelector::FileList(split.files),
+                    source.context.get_config().developer.chunk_size,
+                    schema,
+                    source.plan_node().get_identity().clone(),
+                )))
+            } else {
+                unreachable!()
+            }
+        } else {
+            let source_reader = SourceReader {
+                config,
+                columns,
+                parser_config,
+                connector_message_buffer_size: source
+                    .context()
+                    .get_config()
+                    .developer
+                    .connector_message_buffer_size,
+            };
+
+            Ok(Box::new(SourceExecutor {
+                source: source_reader,
+                column_ids,
+                metrics: source.context().source_metrics(),
+                source_id: TableId::new(source_node.source_id),
+                split,
+                schema,
+                identity: source.plan_node().get_identity().clone(),
+                source_ctrl_opts,
+            }))
+        }
     }
 }
 
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 721cfa2f241c7..a784a75a3c188 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -974,6 +974,8 @@ pub enum EncodingProperties {
     Json(JsonProperties),
     Bytes(BytesProperties),
     Native,
+    /// Encoding can't be specified because the source determines it. Now only used in Iceberg.
+    None,
     #[default]
     Unspecified,
 }
@@ -987,6 +989,8 @@ pub enum ProtocolProperties {
     Plain,
     Upsert,
     Native,
+    /// Protocol can't be specified because the source determines it. Now only used in Iceberg.
+    None,
     #[default]
     Unspecified,
 }
@@ -1004,6 +1008,7 @@ impl SpecificParserConfig {
         // in the future
         let protocol_config = match format {
             SourceFormat::Native => ProtocolProperties::Native,
+            SourceFormat::None => ProtocolProperties::None,
             SourceFormat::Debezium => ProtocolProperties::Debezium,
             SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
             SourceFormat::Maxwell => ProtocolProperties::Maxwell,
@@ -1114,6 +1119,7 @@ impl SpecificParserConfig {
                 EncodingProperties::Bytes(BytesProperties { column_name: None })
             }
             (SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
+            (SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
             (format, encode) => {
                 bail!("Unsupported format {:?} encode {:?}", format, encode);
             }
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 625f3dedb30bc..6be4d7c1a755e 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -261,6 +261,7 @@ pub enum SourceFormat {
     #[default]
     Invalid,
     Native,
+    None,
     Debezium,
     DebeziumMongo,
     Maxwell,
@@ -274,6 +275,7 @@ pub enum SourceEncode {
     #[default]
     Invalid,
     Native,
+    None,
     Avro,
     Csv,
     Protobuf,
@@ -334,6 +336,7 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
         (PbFormatType::Native, PbEncodeType::Native) => {
             (SourceFormat::Native, SourceEncode::Native)
         }
+        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
         (PbFormatType::Debezium, PbEncodeType::Avro) => {
             (SourceFormat::Debezium, SourceEncode::Avro)
         }
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 899828322c810..776ab65a05540 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -14,12 +14,17 @@
 
 use std::collections::HashMap;
 
+use anyhow::anyhow;
 use async_trait::async_trait;
+use icelake::types::DataContentType;
+use itertools::Itertools;
+use risingwave_common::bail;
 use risingwave_common::types::JsonbVal;
 use serde::{Deserialize, Serialize};
 
 use crate::error::ConnectorResult;
 use crate::parser::ParserConfig;
+use crate::sink::iceberg::IcebergConfig;
 use crate::source::{
     BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
     SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
@@ -50,6 +55,22 @@ pub struct IcebergProperties {
     pub unknown_fields: HashMap<String, String>,
 }
 
+impl IcebergProperties {
+    pub fn to_iceberg_config(&self) -> IcebergConfig {
+        IcebergConfig {
+            database_name: Some(self.database_name.clone()),
+            table_name: self.table_name.clone(),
+            catalog_type: Some(self.catalog_type.clone()),
+            path: self.warehouse_path.clone(),
+            endpoint: Some(self.endpoint.clone()),
+            access_key: self.s3_access.clone(),
+            secret_key: self.s3_secret.clone(),
+            region: Some(self.region_name.clone()),
+            ..Default::default()
+        }
+    }
+}
+
 impl SourceProperties for IcebergProperties {
     type Split = IcebergSplit;
     type SplitEnumerator = IcebergSplitEnumerator;
@@ -65,19 +86,23 @@ impl UnknownFields for IcebergProperties {
 }
 
 #[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
-pub struct IcebergSplit {}
+pub struct IcebergSplit {
+    pub split_id: i64,
+    pub snapshot_id: i64,
+    pub files: Vec<String>,
+}
 
 impl SplitMetaData for IcebergSplit {
     fn id(&self) -> SplitId {
-        unimplemented!()
+        self.split_id.to_string().into()
     }
 
-    fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
-        unimplemented!()
+    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
+        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
     }
 
     fn encode_to_json(&self) -> JsonbVal {
-        unimplemented!()
+        serde_json::to_value(self.clone()).unwrap().into()
     }
 
     fn update_with_offset(&mut self, _start_offset: String) -> ConnectorResult<()> {
@@ -86,7 +111,9 @@ impl SplitMetaData for IcebergSplit {
 }
 
 #[derive(Debug, Clone)]
-pub struct IcebergSplitEnumerator {}
+pub struct IcebergSplitEnumerator {
+    config: IcebergConfig,
+}
 
 #[async_trait]
 impl SplitEnumerator for IcebergSplitEnumerator {
@@ -94,17 +121,65 @@ impl SplitEnumerator for IcebergSplitEnumerator {
     type Split = IcebergSplit;
 
     async fn new(
-        _properties: Self::Properties,
+        properties: Self::Properties,
         _context: SourceEnumeratorContextRef,
     ) -> ConnectorResult<Self> {
-        Ok(Self {})
+        let iceberg_config = properties.to_iceberg_config();
+        Ok(Self {
+            config: iceberg_config,
+        })
     }
 
     async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
+        // Iceberg source does not support streaming queries
        Ok(vec![])
     }
 }
 
+impl IcebergSplitEnumerator {
+    pub async fn list_splits_batch(
+        &self,
+        batch_parallelism: usize,
+    ) -> ConnectorResult<Vec<IcebergSplit>> {
+        if batch_parallelism == 0 {
+            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
+        }
+        let table = self.config.load_table().await?;
+        let snapshot_id = table.current_table_metadata().current_snapshot_id.unwrap();
+        let mut files = vec![];
+        for file in table.current_data_files().await? {
+            if file.content != DataContentType::Data {
+                bail!("Reading iceberg table with delete files is unsupported. Please try to compact the table first.");
+            }
+            files.push(file.file_path);
+        }
+        let split_num = batch_parallelism;
+        // evenly split the files into splits based on the parallelism.
+        let split_size = files.len() / split_num;
+        let remaining = files.len() % split_num;
+        let mut splits = vec![];
+        for i in 0..split_num {
+            let start = i * split_size;
+            let end = (i + 1) * split_size;
+            let split = IcebergSplit {
+                split_id: i as i64,
+                snapshot_id,
+                files: files[start..end].to_vec(),
+            };
+            splits.push(split);
+        }
+        for i in 0..remaining {
+            splits[i]
+                .files
+                .push(files[split_num * split_size + i].clone());
+        }
+        Ok(splits
+            .into_iter()
+            .filter(|split| !split.files.is_empty())
+            .collect_vec())
+    }
+}
+
 #[derive(Debug)]
 pub struct IcebergFileReader {}
diff --git a/src/frontend/planner_test/tests/testdata/output/explain.yaml b/src/frontend/planner_test/tests/testdata/output/explain.yaml
index 6319e836b96d1..e88ce0b9caa68 100644
--- a/src/frontend/planner_test/tests/testdata/output/explain.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/explain.yaml
@@ -75,7 +75,8 @@
       },
       "parent_edges": {
         "0": []
-      }
+      },
+      "batch_parallelism": 0
     }
 - sql: |
     create table t1(v1 int);
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 726013fe1c18b..e77876a636f16 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -36,7 +36,6 @@ use risingwave_connector::parser::{
 use risingwave_connector::schema::schema_registry::{
     name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
 };
-use risingwave_connector::sink::iceberg::IcebergConfig;
 use risingwave_connector::source::cdc::external::CdcTableType;
 use risingwave_connector::source::cdc::{
     CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
@@ -1154,17 +1153,7 @@ pub async fn check_iceberg_source(
         )));
     };
 
-    let iceberg_config = IcebergConfig {
-        database_name: Some(properties.database_name),
-        table_name: properties.table_name,
-        catalog_type: Some(properties.catalog_type),
-        path: properties.warehouse_path,
-        endpoint: Some(properties.endpoint),
-        access_key: properties.s3_access,
-        secret_key: properties.s3_secret,
-        region: Some(properties.region_name),
-        ..Default::default()
-    };
+    let iceberg_config = properties.to_iceberg_config();
 
     let schema = Schema {
         fields: columns
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 168b00e0f8527..47784fc8c4d69 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -28,6 +28,7 @@ use risingwave_common::catalog::TableDesc;
 use risingwave_common::hash::table_distribution::TableDistribution;
 use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode};
 use risingwave_common::util::scan_range::ScanRange;
+use risingwave_connector::source::iceberg::IcebergSplitEnumerator;
 use risingwave_connector::source::kafka::KafkaSplitEnumerator;
 use risingwave_connector::source::{
     ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, SplitImpl,
@@ -130,11 +131,7 @@ pub struct BatchPlanFragmenter {
     worker_node_manager: WorkerNodeSelector,
     catalog_reader: CatalogReader,
 
-    /// if batch_parallelism is None, it means no limit, we will use the available nodes count as
-    /// parallelism.
-    /// if batch_parallelism is Some(num), we will use the min(num, the available
-    /// nodes count) as parallelism.
-    batch_parallelism: Option<NonZeroU64>,
+    batch_parallelism: usize,
 
     stage_graph_builder: Option<StageGraphBuilder>,
     stage_graph: Option<StageGraph>,
@@ -155,13 +152,28 @@ impl BatchPlanFragmenter {
         batch_parallelism: Option<NonZeroU64>,
         batch_node: PlanRef,
     ) -> SchedulerResult<Self> {
+        // if batch_parallelism is None, it means no limit, we will use the available nodes count as
+        // parallelism.
+        // if batch_parallelism is Some(num), we will use the min(num, the available
+        // nodes count) as parallelism.
+        let batch_parallelism = if let Some(num) = batch_parallelism {
+            // can be 0 if no available serving worker
+            min(
+                num.get() as usize,
+                worker_node_manager.schedule_unit_count(),
+            )
+        } else {
+            // can be 0 if no available serving worker
+            worker_node_manager.schedule_unit_count()
+        };
+
         let mut plan_fragmenter = Self {
             query_id: Default::default(),
-            stage_graph_builder: Some(StageGraphBuilder::new()),
             next_stage_id: 0,
             worker_node_manager,
             catalog_reader,
             batch_parallelism,
+            stage_graph_builder: Some(StageGraphBuilder::new(batch_parallelism)),
             stage_graph: None,
         };
         plan_fragmenter.split_into_stage(batch_node)?;
@@ -264,32 +276,44 @@ impl SourceScanInfo {
         Self::Incomplete(fetch_info)
     }
 
-    pub async fn complete(self) -> SchedulerResult<Self> {
+    pub async fn complete(self, batch_parallelism: usize) -> SchedulerResult<Self> {
         let fetch_info = match self {
             SourceScanInfo::Incomplete(fetch_info) => fetch_info,
             SourceScanInfo::Complete(_) => {
                 unreachable!("Never call complete when SourceScanInfo is already complete")
             }
         };
-        let kafka_prop = match fetch_info.connector {
-            ConnectorProperties::Kafka(prop) => *prop,
-            _ => {
-                return Err(SchedulerError::Internal(anyhow!(
-                    "Unsupported to query directly from this source"
-                )))
+        match fetch_info.connector {
+            ConnectorProperties::Kafka(prop) => {
+                let mut kafka_enumerator =
+                    KafkaSplitEnumerator::new(*prop, SourceEnumeratorContext::default().into())
+                        .await?;
+                let split_info = kafka_enumerator
+                    .list_splits_batch(fetch_info.timebound.0, fetch_info.timebound.1)
+                    .await?
+                    .into_iter()
+                    .map(SplitImpl::Kafka)
+                    .collect_vec();
+                Ok(SourceScanInfo::Complete(split_info))
             }
-        };
-        let mut kafka_enumerator =
-            KafkaSplitEnumerator::new(kafka_prop, SourceEnumeratorContext::default().into())
-                .await?;
-        let split_info = kafka_enumerator
-            .list_splits_batch(fetch_info.timebound.0, fetch_info.timebound.1)
-            .await?
-            .into_iter()
-            .map(SplitImpl::Kafka)
-            .collect_vec();
-
-        Ok(SourceScanInfo::Complete(split_info))
+            ConnectorProperties::Iceberg(prop) => {
+                let iceberg_enumerator =
+                    IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::default().into())
+                        .await?;
+
+                let split_info = iceberg_enumerator
+                    .list_splits_batch(batch_parallelism)
+                    .await?
+                    .into_iter()
+                    .map(SplitImpl::Iceberg)
+                    .collect_vec();
+
+                Ok(SourceScanInfo::Complete(split_info))
+            }
+            _ => Err(SchedulerError::Internal(anyhow!(
+                "Unsupported to query directly from this source"
+            ))),
+        }
     }
 
     pub fn split_info(&self) -> SchedulerResult<&Vec<SplitImpl>> {
@@ -553,6 +577,8 @@ pub struct StageGraph {
     child_edges: HashMap<StageId, HashSet<StageId>>,
     /// Traverse from down to top. Used in schedule each stage.
     parent_edges: HashMap<StageId, HashSet<StageId>>,
+
+    batch_parallelism: usize,
 }
 
 impl StageGraph {
@@ -601,6 +627,7 @@ impl StageGraph {
             stages: complete_stages,
             child_edges: self.child_edges,
             parent_edges: self.parent_edges,
+            batch_parallelism: self.batch_parallelism,
         })
     }
 
@@ -630,7 +657,7 @@ impl StageGraph {
                 .as_ref()
                 .unwrap()
                 .clone()
-                .complete()
+                .complete(self.batch_parallelism)
                 .await?;
 
             let complete_stage = Arc::new(stage.clone_with_exchange_info_and_complete_source_info(
@@ -676,14 +703,16 @@ struct StageGraphBuilder {
     stages: HashMap<StageId, QueryStageRef>,
     child_edges: HashMap<StageId, HashSet<StageId>>,
     parent_edges: HashMap<StageId, HashSet<StageId>>,
+    batch_parallelism: usize,
 }
 
 impl StageGraphBuilder {
-    pub fn new() -> Self {
+    pub fn new(batch_parallelism: usize) -> Self {
         Self {
             stages: HashMap::new(),
             child_edges: HashMap::new(),
             parent_edges: HashMap::new(),
+            batch_parallelism,
         }
     }
 
@@ -693,6 +722,7 @@ impl StageGraphBuilder {
             stages: self.stages,
             child_edges: self.child_edges,
             parent_edges: self.parent_edges,
+            batch_parallelism: self.batch_parallelism,
         }
     }
 
@@ -797,15 +827,8 @@ impl BatchPlanFragmenter {
                 lookup_join_parallelism
             } else if source_info.is_some() {
                 0
-            } else if let Some(num) = self.batch_parallelism {
-                // can be 0 if no available serving worker
-                min(
-                    num.get() as usize,
-                    self.worker_node_manager.schedule_unit_count(),
-                )
             } else {
-                // can be 0 if no available serving worker
-                self.worker_node_manager.schedule_unit_count()
+                self.batch_parallelism
             }
         }
     };
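
A minimal standalone sketch of the file-distribution strategy that `IcebergSplitEnumerator::list_splits_batch` applies above: every split first receives `files.len() / batch_parallelism` files, the remainder is handed out one file per split starting from split 0, and empty splits are filtered out. The `distribute_files` helper name and the sample file paths are hypothetical and used only for illustration; this is not an API added by the patch.

// Sketch of the quotient/remainder distribution used by `list_splits_batch` (hypothetical helper).
fn distribute_files(files: Vec<String>, batch_parallelism: usize) -> Vec<Vec<String>> {
    // Mirrors the patch's bail!() guard: distributing over zero splits is undefined.
    assert!(batch_parallelism > 0, "batch parallelism must be positive");
    let split_size = files.len() / batch_parallelism;
    let remaining = files.len() % batch_parallelism;

    // Every split gets `split_size` files first.
    let mut splits: Vec<Vec<String>> = (0..batch_parallelism)
        .map(|i| files[i * split_size..(i + 1) * split_size].to_vec())
        .collect();
    // The leftover files go to the first `remaining` splits, one each.
    for i in 0..remaining {
        splits[i].push(files[batch_parallelism * split_size + i].clone());
    }
    // Drop empty splits so the scheduler never builds scan tasks with nothing to read.
    splits.retain(|split| !split.is_empty());
    splits
}

fn main() {
    let files: Vec<String> = (0..7)
        .map(|i| format!("s3://bucket/table/data/file-{i}.parquet"))
        .collect();
    // 7 files over 3 splits -> splits of 3, 2, and 2 files.
    for (i, split) in distribute_files(files, 3).iter().enumerate() {
        println!("split {i}: {} files", split.len());
    }
}

With 7 files and a parallelism of 3 this yields splits of 3, 2, and 2 files, and a parallelism of 0 is rejected up front, matching the guard in the patch.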