refactor(iceberg): Separate iceberg source pb from source pb (#18209)
xxhZs authored Aug 27, 2024
1 parent edaace2 commit 22926d6
Showing 9 changed files with 155 additions and 55 deletions.
8 changes: 8 additions & 0 deletions proto/batch_plan.proto
@@ -66,6 +66,13 @@ message SourceNode {
map<string, secret.SecretRef> secret_refs = 6;
}

message IcebergScanNode {
repeated plan_common.ColumnCatalog columns = 1;
map<string, string> with_properties = 2;
repeated bytes split = 3;
map<string, secret.SecretRef> secret_refs = 4;
}

message FileScanNode {
enum FileFormat {
FILE_FORMAT_UNSPECIFIED = 0;
@@ -365,6 +372,7 @@ message PlanNode {
MaxOneRowNode max_one_row = 36;
LogRowSeqScanNode log_row_seq_scan = 37;
FileScanNode file_scan = 38;
IcebergScanNode iceberg_scan = 39;
// The following nodes are used for testing.
bool block_executor = 100;
bool busy_loop_executor = 101;
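For orientation, here is a minimal sketch of the Rust struct prost would generate for the new message, assuming default codegen (repeated -> Vec, map -> HashMap, bytes -> Vec<u8>); the options added in src/prost/build.rs at the end of this commit may alter the exact output, so treat the field types as assumptions rather than the real generated code.

```rust
use std::collections::HashMap;

// Placeholder stand-ins for the generated types from plan_common.proto
// and secret.proto; the real definitions live in the prost output.
pub struct ColumnCatalog;
pub struct SecretRef;

// Hypothetical shape of the prost-generated IcebergScanNode under default
// codegen (repeated -> Vec, map -> HashMap, bytes -> Vec<u8>). The options
// added in src/prost/build.rs in the last hunk of this commit may change
// this, e.g. maps generated as BTreeMap.
pub struct IcebergScanNode {
    pub columns: Vec<ColumnCatalog>,
    pub with_properties: HashMap<String, String>,
    pub split: Vec<Vec<u8>>,
    pub secret_refs: HashMap<String, SecretRef>,
}
```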
4 changes: 2 additions & 2 deletions risedev.yml
@@ -20,7 +20,7 @@ profile:
# config-path: src/config/example.toml
steps:
# If you want to use the local s3 storage, enable the following line
# - use: minio
- use: minio

# If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
# - use: aws-s3
@@ -40,7 +40,7 @@ profile:
- use: frontend

# If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well.
# - use: compactor
- use: compactor

# If you want to create source from Kafka, uncomment the following lines
# - use: kafka
74 changes: 73 additions & 1 deletion src/batch/src/executor/iceberg_scan.rs
@@ -18,12 +18,20 @@ use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
use itertools::Itertools;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::Schema;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};
use crate::task::BatchTaskContext;

pub struct IcebergScanExecutor {
iceberg_config: IcebergConfig,
@@ -108,3 +116,67 @@ impl IcebergScanExecutor {
}
}
}

pub struct IcebergScanExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
inputs: Vec<BoxedExecutor>,
) -> crate::error::Result<BoxedExecutor> {
ensure!(
inputs.is_empty(),
"Iceberg source should not have input executor!"
);
let source_node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::IcebergScan
)?;

// prepare connector source
let options_with_secret = WithOptionsSecResolved::new(
source_node.with_properties.clone(),
source_node.secret_refs.clone(),
);
let config = ConnectorProperties::extract(options_with_secret.clone(), false)
.map_err(BatchError::connector)?;

let split_list = source_node
.split
.iter()
.map(|split| SplitImpl::restore_from_bytes(split).unwrap())
.collect_vec();
assert_eq!(split_list.len(), 1);

let fields = source_node
.columns
.iter()
.map(|prost| {
let column_desc = prost.column_desc.as_ref().unwrap();
let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
let name = column_desc.name.clone();
Field::with_name(data_type, name)
})
.collect();
let schema = Schema::new(fields);

if let ConnectorProperties::Iceberg(iceberg_properties) = config
&& let SplitImpl::Iceberg(split) = &split_list[0]
{
let iceberg_properties: IcebergProperties = *iceberg_properties;
let split: IcebergSplit = split.clone();
Ok(Box::new(IcebergScanExecutor::new(
iceberg_properties.to_iceberg_config(),
Some(split.snapshot_id),
split.table_meta.deserialize(),
split.files.into_iter().map(|x| x.deserialize()).collect(),
source.context.get_config().developer.chunk_size,
schema,
source.plan_node().get_identity().clone(),
)))
} else {
unreachable!()
}
}
}
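The builder above restores each encoded split with SplitImpl::restore_from_bytes; the scheduler hunks below produce those bytes with encode_to_bytes. A minimal round-trip sketch, assuming the two SplitMetaData methods are inverses as the call sites in this commit suggest (error handling simplified):

```rust
use risingwave_connector::source::{SplitImpl, SplitMetaData};

// Round-trip sketch: the scheduler side serializes, the executor side
// restores. unwrap() mirrors the builder's own usage above; a real caller
// would propagate the error instead.
fn split_roundtrip(split: &SplitImpl) {
    let bytes = split.encode_to_bytes(); // scheduler side (stage.rs / local.rs)
    let restored = SplitImpl::restore_from_bytes(&bytes).unwrap(); // executor side
    assert_eq!(restored.encode_to_bytes(), bytes);
}
```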
1 change: 1 addition & 0 deletions src/batch/src/executor/mod.rs
@@ -243,6 +243,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
NodeBody::SortOverWindow => SortOverWindowExecutor,
NodeBody::MaxOneRow => MaxOneRowExecutor,
NodeBody::FileScan => FileScanExecutorBuilder,
NodeBody::IcebergScan => IcebergScanExecutorBuilder,
// Follow NodeBody only used for test
NodeBody::BlockExecutor => BlockExecutorBuilder,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
65 changes: 23 additions & 42 deletions src/batch/src/executor/source.rs
@@ -21,7 +21,6 @@ use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::types::DataType;
use risingwave_connector::parser::SpecificParserConfig;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
@@ -32,7 +31,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, IcebergScanExecutor};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::task::BatchTaskContext;

pub struct SourceExecutor {
@@ -103,46 +102,28 @@ impl BoxedExecutorBuilder for SourceExecutor {
.collect();
let schema = Schema::new(fields);

if let ConnectorProperties::Iceberg(iceberg_properties) = config {
let iceberg_properties: IcebergProperties = *iceberg_properties;
assert_eq!(split_list.len(), 1);
if let SplitImpl::Iceberg(split) = &split_list[0] {
let split: IcebergSplit = split.clone();
Ok(Box::new(IcebergScanExecutor::new(
iceberg_properties.to_iceberg_config(),
Some(split.snapshot_id),
split.table_meta.deserialize(),
split.files.into_iter().map(|x| x.deserialize()).collect(),
source.context.get_config().developer.chunk_size,
schema,
source.plan_node().get_identity().clone(),
)))
} else {
unreachable!()
}
} else {
let source_reader = SourceReader {
config,
columns,
parser_config,
connector_message_buffer_size: source
.context()
.get_config()
.developer
.connector_message_buffer_size,
};

Ok(Box::new(SourceExecutor {
source: source_reader,
column_ids,
metrics: source.context().source_metrics(),
source_id: TableId::new(source_node.source_id),
split_list,
schema,
identity: source.plan_node().get_identity().clone(),
chunk_size: source.context().get_config().developer.chunk_size,
}))
}
assert!(!matches!(config, ConnectorProperties::Iceberg(_)));
let source_reader = SourceReader {
config,
columns,
parser_config,
connector_message_buffer_size: source
.context()
.get_config()
.developer
.connector_message_buffer_size,
};

Ok(Box::new(SourceExecutor {
source: source_reader,
column_ids,
metrics: source.context().source_metrics(),
source_id: TableId::new(source_node.source_id),
split_list,
schema,
identity: source.plan_node().get_identity().clone(),
chunk_size: source.context().get_config().developer.chunk_size,
}))
}
}

6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
@@ -16,7 +16,7 @@ use std::rc::Rc;

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SourceNode;
use risingwave_pb::batch_plan::IcebergScanNode;
use risingwave_sqlparser::ast::AsOf;

use super::batch::prelude::*;
@@ -99,9 +99,7 @@ impl ToBatchPb for BatchIcebergScan {
fn to_batch_prost_body(&self) -> NodeBody {
let source_catalog = self.source_catalog().unwrap();
let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
NodeBody::Source(SourceNode {
source_id: source_catalog.id,
info: Some(source_catalog.info.clone()),
NodeBody::IcebergScan(IcebergScanNode {
columns: self
.core
.column_catalog
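This is the frontend half of the options/secrets handshake: into_parts() splits the resolved WITH options into the two map fields of IcebergScanNode, and the batch builder (iceberg_scan.rs above) reassembles them with WithOptionsSecResolved::new. A sketch of the round-trip, assuming the two functions are inverses as their call sites in this commit suggest:

```rust
use risingwave_connector::WithOptionsSecResolved;

// Sketch: the frontend splits the resolved options for the pb message,
// the batch executor reassembles them before ConnectorProperties::extract.
fn options_roundtrip(options: WithOptionsSecResolved) -> WithOptionsSecResolved {
    let (with_properties, secret_refs) = options.into_parts(); // frontend side
    WithOptionsSecResolved::new(with_properties, secret_refs) // executor side
}
```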
24 changes: 21 additions & 3 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -1052,9 +1052,7 @@ impl StageRunner {
node_body: Some(NodeBody::LogRowSeqScan(scan_node)),
}
}
PlanNodeType::BatchSource
| PlanNodeType::BatchKafkaScan
| PlanNodeType::BatchIcebergScan => {
PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => {
let node_body = execution_plan_node.node.clone();
let NodeBody::Source(mut source_node) = node_body else {
unreachable!();
@@ -1074,6 +1072,26 @@
node_body: Some(NodeBody::Source(source_node)),
}
}
PlanNodeType::BatchIcebergScan => {
let node_body = execution_plan_node.node.clone();
let NodeBody::IcebergScan(mut iceberg_scan_node) = node_body else {
unreachable!();
};

let partition = partition
.expect("no partition info for seq scan")
.into_source()
.expect("PartitionInfo should be SourcePartitionInfo");
iceberg_scan_node.split = partition
.into_iter()
.map(|split| split.encode_to_bytes().into())
.collect_vec();
PbPlanNode {
children: vec![],
identity,
node_body: Some(NodeBody::IcebergScan(iceberg_scan_node)),
}
}
_ => {
let children = execution_plan_node
.children
27 changes: 24 additions & 3 deletions src/frontend/src/scheduler/local.rs
@@ -554,9 +554,7 @@ impl LocalQueryExecution {
node_body: Some(node_body),
})
}
PlanNodeType::BatchSource
| PlanNodeType::BatchKafkaScan
| PlanNodeType::BatchIcebergScan => {
PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => {
let mut node_body = execution_plan_node.node.clone();
match &mut node_body {
NodeBody::Source(ref mut source_node) => {
@@ -579,6 +577,29 @@
node_body: Some(node_body),
})
}
PlanNodeType::BatchIcebergScan => {
let mut node_body = execution_plan_node.node.clone();
match &mut node_body {
NodeBody::IcebergScan(ref mut iceberg_scan_node) => {
if let Some(partition) = partition {
let partition = partition
.into_source()
.expect("PartitionInfo should be SourcePartitionInfo here");
iceberg_scan_node.split = partition
.into_iter()
.map(|split| split.encode_to_bytes().into())
.collect_vec();
}
}
_ => unreachable!(),
}

Ok(PbPlanNode {
children: vec![],
identity,
node_body: Some(node_body),
})
}
PlanNodeType::BatchLookupJoin => {
let mut node_body = execution_plan_node.node.clone();
match &mut node_body {
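Both scheduler arms (stage.rs above and local.rs here) now do the same thing for Iceberg: encode the stage's assigned splits into IcebergScanNode.split instead of SourceNode.split. A simplified sketch of the shared pattern, with `splits` standing in for the Vec<SplitImpl> carried by SourcePartitionInfo (field and return types simplified):

```rust
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_pb::batch_plan::IcebergScanNode;

// Shared pattern of the two scheduler arms: serialize each assigned split
// into the pb node's `split` field.
fn assign_iceberg_splits(node: &mut IcebergScanNode, splits: Vec<SplitImpl>) {
    node.split = splits
        .into_iter()
        .map(|split| split.encode_to_bytes().into())
        .collect();
}
```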
1 change: 1 addition & 0 deletions src/prost/build.rs
@@ -80,6 +80,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
".stream_plan.SourceBackfillNode",
".stream_plan.StreamSource",
".batch_plan.SourceNode",
".batch_plan.IcebergScanNode",
];

// Build protobuf structs.
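The excerpt does not show what this path list configures; given that every listed message carries with_properties and secret_refs maps, it plausibly feeds a per-message prost-build option such as btree_map (deterministic map ordering). That is an assumption the hunk does not confirm; the sketch below only illustrates how such a list is typically consumed with prost-build:

```rust
// Hypothetical use of a path list like the one above with prost-build.
// Whether this particular list feeds btree_map is an assumption.
fn configure(paths: &[&str]) -> prost_build::Config {
    let mut config = prost_build::Config::new();
    config.btree_map(paths.iter().copied()); // codegen map fields as BTreeMap
    config
}
```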
