diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index 9b12d0b583d1e..8c74b93d96fad 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -66,6 +66,13 @@ message SourceNode {
   map<string, secret.SecretRef> secret_refs = 6;
 }
 
+message IcebergScanNode {
+  repeated plan_common.ColumnCatalog columns = 1;
+  map<string, string> with_properties = 2;
+  repeated bytes split = 3;
+  map<string, secret.SecretRef> secret_refs = 4;
+}
+
 message FileScanNode {
   enum FileFormat {
     FILE_FORMAT_UNSPECIFIED = 0;
@@ -365,6 +372,7 @@ message PlanNode {
     MaxOneRowNode max_one_row = 36;
     LogRowSeqScanNode log_row_seq_scan = 37;
     FileScanNode file_scan = 38;
+    IcebergScanNode iceberg_scan = 39;
     // The following nodes are used for testing.
     bool block_executor = 100;
     bool busy_loop_executor = 101;
diff --git a/risedev.yml b/risedev.yml
index 3c7f8e0e09be4..5a5c25ceb55dc 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -20,7 +20,7 @@ profile:
     # config-path: src/config/example.toml
     steps:
       # If you want to use the local s3 storage, enable the following line
-      # - use: minio
+      - use: minio
 
       # If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
       # - use: aws-s3
@@ -40,7 +40,7 @@ profile:
      - use: frontend
 
      # If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well.
-     # - use: compactor
+     - use: compactor
 
      # If you want to create source from Kafka, uncomment the following lines
      # - use: kafka
diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index ee4e463422c14..fca7745284fe3 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -18,12 +18,20 @@ use futures_async_stream::try_stream;
 use futures_util::stream::StreamExt;
 use iceberg::scan::FileScanTask;
 use iceberg::spec::TableMetadata;
+use itertools::Itertools;
 use risingwave_common::array::arrow::IcebergArrowConvert;
-use risingwave_common::catalog::Schema;
+use risingwave_common::catalog::{Field, Schema};
+use risingwave_common::types::DataType;
 use risingwave_connector::sink::iceberg::IcebergConfig;
+use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
+use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
+use risingwave_connector::WithOptionsSecResolved;
+use risingwave_pb::batch_plan::plan_node::NodeBody;
 
+use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
 use crate::error::BatchError;
 use crate::executor::{DataChunk, Executor};
+use crate::task::BatchTaskContext;
 
 pub struct IcebergScanExecutor {
     iceberg_config: IcebergConfig,
@@ -108,3 +116,67 @@ impl IcebergScanExecutor {
         }
     }
 }
+
+pub struct IcebergScanExecutorBuilder {}
+
+#[async_trait::async_trait]
+impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
+    async fn new_boxed_executor<C: BatchTaskContext>(
+        source: &ExecutorBuilder<'_, C>,
+        inputs: Vec<BoxedExecutor>,
+    ) -> crate::error::Result<BoxedExecutor> {
+        ensure!(
+            inputs.is_empty(),
+            "Iceberg source should not have input executor!"
+        );
+        let source_node = try_match_expand!(
+            source.plan_node().get_node_body().unwrap(),
+            NodeBody::IcebergScan
+        )?;
+
+        // prepare connector source
+        let options_with_secret = WithOptionsSecResolved::new(
+            source_node.with_properties.clone(),
+            source_node.secret_refs.clone(),
+        );
+        let config = ConnectorProperties::extract(options_with_secret.clone(), false)
+            .map_err(BatchError::connector)?;
+
+        let split_list = source_node
+            .split
+            .iter()
+            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
+            .collect_vec();
+        assert_eq!(split_list.len(), 1);
+
+        let fields = source_node
+            .columns
+            .iter()
+            .map(|prost| {
+                let column_desc = prost.column_desc.as_ref().unwrap();
+                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
+                let name = column_desc.name.clone();
+                Field::with_name(data_type, name)
+            })
+            .collect();
+        let schema = Schema::new(fields);
+
+        if let ConnectorProperties::Iceberg(iceberg_properties) = config
+            && let SplitImpl::Iceberg(split) = &split_list[0]
+        {
+            let iceberg_properties: IcebergProperties = *iceberg_properties;
+            let split: IcebergSplit = split.clone();
+            Ok(Box::new(IcebergScanExecutor::new(
+                iceberg_properties.to_iceberg_config(),
+                Some(split.snapshot_id),
+                split.table_meta.deserialize(),
+                split.files.into_iter().map(|x| x.deserialize()).collect(),
+                source.context.get_config().developer.chunk_size,
+                schema,
+                source.plan_node().get_identity().clone(),
+            )))
+        } else {
+            unreachable!()
+        }
+    }
+}
diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs
index 80dc57b4f3620..07be18ca72988 100644
--- a/src/batch/src/executor/mod.rs
+++ b/src/batch/src/executor/mod.rs
@@ -243,6 +243,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
             NodeBody::SortOverWindow => SortOverWindowExecutor,
             NodeBody::MaxOneRow => MaxOneRowExecutor,
             NodeBody::FileScan => FileScanExecutorBuilder,
+            NodeBody::IcebergScan => IcebergScanExecutorBuilder,
             // Follow NodeBody only used for test
             NodeBody::BlockExecutor => BlockExecutorBuilder,
             NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index c4862556ae826..7a37be9183898 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -21,7 +21,6 @@ use risingwave_common::array::{DataChunk, Op, StreamChunk};
 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
 use risingwave_common::types::DataType;
 use risingwave_connector::parser::SpecificParserConfig;
-use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
 use risingwave_connector::source::monitor::SourceMetrics;
 use risingwave_connector::source::reader::reader::SourceReader;
 use risingwave_connector::source::{
@@ -32,7 +31,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
 
 use super::Executor;
 use crate::error::{BatchError, Result};
-use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, IcebergScanExecutor};
+use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
 use crate::task::BatchTaskContext;
 
 pub struct SourceExecutor {
@@ -103,46 +102,28 @@ impl BoxedExecutorBuilder for SourceExecutor {
             .collect();
         let schema = Schema::new(fields);
 
-        if let ConnectorProperties::Iceberg(iceberg_properties) = config {
-            let iceberg_properties: IcebergProperties = *iceberg_properties;
-            assert_eq!(split_list.len(), 1);
-            if let SplitImpl::Iceberg(split) = &split_list[0] {
-                let split: IcebergSplit = split.clone();
-                Ok(Box::new(IcebergScanExecutor::new(
-                    iceberg_properties.to_iceberg_config(),
-                    Some(split.snapshot_id),
-                    split.table_meta.deserialize(),
-                    split.files.into_iter().map(|x| x.deserialize()).collect(),
-                    source.context.get_config().developer.chunk_size,
-                    schema,
-                    source.plan_node().get_identity().clone(),
-                )))
-            } else {
-                unreachable!()
-            }
-        } else {
-            let source_reader = SourceReader {
-                config,
-                columns,
-                parser_config,
-                connector_message_buffer_size: source
-                    .context()
-                    .get_config()
-                    .developer
-                    .connector_message_buffer_size,
-            };
-
-            Ok(Box::new(SourceExecutor {
-                source: source_reader,
-                column_ids,
-                metrics: source.context().source_metrics(),
-                source_id: TableId::new(source_node.source_id),
-                split_list,
-                schema,
-                identity: source.plan_node().get_identity().clone(),
-                chunk_size: source.context().get_config().developer.chunk_size,
-            }))
-        }
+        assert!(!matches!(config, ConnectorProperties::Iceberg(_)));
+        let source_reader = SourceReader {
+            config,
+            columns,
+            parser_config,
+            connector_message_buffer_size: source
+                .context()
+                .get_config()
+                .developer
+                .connector_message_buffer_size,
+        };
+
+        Ok(Box::new(SourceExecutor {
+            source: source_reader,
+            column_ids,
+            metrics: source.context().source_metrics(),
+            source_id: TableId::new(source_node.source_id),
+            split_list,
+            schema,
+            identity: source.plan_node().get_identity().clone(),
+            chunk_size: source.context().get_config().developer.chunk_size,
+        }))
     }
 }
 
diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
index 3433feb8d210b..4333fcaa3e90a 100644
--- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
@@ -16,7 +16,7 @@ use std::rc::Rc;
 
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::SourceNode;
+use risingwave_pb::batch_plan::IcebergScanNode;
 use risingwave_sqlparser::ast::AsOf;
 
 use super::batch::prelude::*;
@@ -99,9 +99,7 @@ impl ToBatchPb for BatchIcebergScan {
     fn to_batch_prost_body(&self) -> NodeBody {
         let source_catalog = self.source_catalog().unwrap();
         let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
-        NodeBody::Source(SourceNode {
-            source_id: source_catalog.id,
-            info: Some(source_catalog.info.clone()),
+        NodeBody::IcebergScan(IcebergScanNode {
             columns: self
                 .core
                 .column_catalog
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index e30dfa0dad377..bb18e2143aa7f 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -1052,9 +1052,7 @@ impl StageRunner {
                     node_body: Some(NodeBody::LogRowSeqScan(scan_node)),
                 }
             }
-            PlanNodeType::BatchSource
-            | PlanNodeType::BatchKafkaScan
-            | PlanNodeType::BatchIcebergScan => {
+            PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => {
                 let node_body = execution_plan_node.node.clone();
                 let NodeBody::Source(mut source_node) = node_body else {
                     unreachable!();
@@ -1074,6 +1072,26 @@ impl StageRunner {
                     node_body: Some(NodeBody::Source(source_node)),
                 }
             }
+            PlanNodeType::BatchIcebergScan => {
+                let node_body = execution_plan_node.node.clone();
+                let NodeBody::IcebergScan(mut iceberg_scan_node) = node_body else {
+                    unreachable!();
+                };
+
+                let partition = partition
+                    .expect("no partition info for seq scan")
+                    .into_source()
+                    .expect("PartitionInfo should be SourcePartitionInfo");
+                iceberg_scan_node.split = partition
+                    .into_iter()
+                    .map(|split| split.encode_to_bytes().into())
+                    .collect_vec();
+                PbPlanNode {
+                    children: vec![],
+                    identity,
+                    node_body: Some(NodeBody::IcebergScan(iceberg_scan_node)),
+                }
+            }
             _ => {
                 let children = execution_plan_node
                     .children
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index a7ff6eabdf7ff..a727ddd9db7dd 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -554,9 +554,7 @@ impl LocalQueryExecution {
                     node_body: Some(node_body),
                 })
             }
-            PlanNodeType::BatchSource
-            | PlanNodeType::BatchKafkaScan
-            | PlanNodeType::BatchIcebergScan => {
+            PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => {
                 let mut node_body = execution_plan_node.node.clone();
                 match &mut node_body {
                     NodeBody::Source(ref mut source_node) => {
@@ -579,6 +577,29 @@ impl LocalQueryExecution {
                     node_body: Some(node_body),
                 })
             }
+            PlanNodeType::BatchIcebergScan => {
+                let mut node_body = execution_plan_node.node.clone();
+                match &mut node_body {
+                    NodeBody::IcebergScan(ref mut iceberg_scan_node) => {
+                        if let Some(partition) = partition {
+                            let partition = partition
+                                .into_source()
+                                .expect("PartitionInfo should be SourcePartitionInfo here");
+                            iceberg_scan_node.split = partition
+                                .into_iter()
+                                .map(|split| split.encode_to_bytes().into())
+                                .collect_vec();
+                        }
+                    }
+                    _ => unreachable!(),
+                }
+
+                Ok(PbPlanNode {
+                    children: vec![],
+                    identity,
+                    node_body: Some(node_body),
+                })
+            }
             PlanNodeType::BatchLookupJoin => {
                 let mut node_body = execution_plan_node.node.clone();
                 match &mut node_body {
diff --git a/src/prost/build.rs b/src/prost/build.rs
index 0682a63a02edb..6758c0ef437b4 100644
--- a/src/prost/build.rs
+++ b/src/prost/build.rs
@@ -80,6 +80,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         ".stream_plan.SourceBackfillNode",
         ".stream_plan.StreamSource",
         ".batch_plan.SourceNode",
+        ".batch_plan.IcebergScanNode",
     ];
 
     // Build protobuf structs.