From 7888b8f461a75953c63a1b210cf716bcbb8a4a53 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 1 Feb 2024 14:48:01 +0800
Subject: [PATCH 1/4] support iceberg scan executor

---
 Cargo.lock                                    |   3 +
 src/batch/Cargo.toml                          |   3 +
 src/batch/src/error.rs                        |   7 +
 src/batch/src/executor/iceberg_scan.rs        | 185 ++++++++++++++++++
 src/batch/src/executor/mod.rs                 |   1 +
 src/connector/src/sink/iceberg/jni_catalog.rs |   5 +-
 src/connector/src/sink/iceberg/mod.rs         |  59 +++---
 7 files changed, 230 insertions(+), 33 deletions(-)
 create mode 100644 src/batch/src/executor/iceberg_scan.rs

diff --git a/Cargo.lock b/Cargo.lock
index 7e6f91306c3aa..2f84d4a83e400 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8574,6 +8574,8 @@ name = "risingwave_batch"
 version = "1.7.0-alpha"
 dependencies = [
  "anyhow",
+ "arrow-array 50.0.0",
+ "arrow-schema 50.0.0",
  "assert_matches",
  "async-recursion",
  "async-trait",
@@ -8584,6 +8586,7 @@ dependencies = [
  "futures-util",
  "hashbrown 0.14.0",
  "hytra",
+ "icelake",
  "itertools 0.12.0",
  "madsim-tokio",
  "madsim-tonic",
diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml
index 053a133bc2337..c1fcc23ca3484 100644
--- a/src/batch/Cargo.toml
+++ b/src/batch/Cargo.toml
@@ -15,6 +15,8 @@ normal = ["workspace-hack"]
 
 [dependencies]
 anyhow = "1"
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
 assert_matches = "1"
 async-recursion = "1"
 async-trait = "0.1"
@@ -24,6 +26,7 @@ futures-async-stream = { workspace = true }
 futures-util = "0.3"
 hashbrown = { workspace = true }
 hytra = "0.1.2"
+icelake = { workspace = true }
 itertools = "0.12"
 memcomparable = "0.2"
 parking_lot = { version = "0.12", features = ["arc_lock"] }
diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs
index 11614386cd0b5..1cbfc7247827c 100644
--- a/src/batch/src/error.rs
+++ b/src/batch/src/error.rs
@@ -109,6 +109,13 @@ pub enum BatchError {
         DmlError,
     ),
 
+    #[error(transparent)]
+    Iceberg(
+        #[from]
+        #[backtrace]
+        icelake::Error,
+    ),
+
     // Make the ref-counted type to be a variant for easier code structuring.
     // TODO(error-handling): replace with `thiserror_ext::Arc`
     #[error(transparent)]
diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
new file mode 100644
index 0000000000000..20dc9df97c707
--- /dev/null
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -0,0 +1,185 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::hash::{DefaultHasher, Hash, Hasher};
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use arrow_array::RecordBatch;
+use futures_async_stream::try_stream;
+use futures_util::stream::StreamExt;
+use icelake::io::{FileScan, TableScan};
+use icelake::TableIdentifier;
+use risingwave_common::catalog::Schema;
+use risingwave_connector::sink::iceberg::IcebergConfig;
+
+use crate::error::BatchError;
+use crate::executor::{DataChunk, Executor};
+
+/// Create an iceberg scan executor.
+///
+/// # Examples
+///
+/// ```
+/// use futures_async_stream::for_await;
+/// use risingwave_common::catalog::{Field, Schema};
+/// use risingwave_common::types::DataType;
+/// use risingwave_connector::sink::iceberg::IcebergConfig;
+///
+/// use crate::executor::iceberg_scan::{FileSelector, IcebergScanExecutor};
+/// use crate::executor::Executor;
+///
+/// #[tokio::test]
+/// async fn test_iceberg_scan() {
+///     let iceberg_scan_executor = IcebergScanExecutor {
+///         database_name: "demo_db".into(),
+///         table_name: "demo_table".into(),
+///         file_selector: FileSelector::select_all(),
+///         iceberg_config: IcebergConfig {
+///             database_name: "demo_db".into(),
+///             table_name: "demo_table".into(),
+///             catalog_type: Some("storage".into()),
+///             path: "s3a://hummock001/".into(),
+///             endpoint: Some("http://127.0.0.1:9301".into()),
+///             access_key: "hummockadmin".into(),
+///             secret_key: "hummockadmin".into(),
+///             region: Some("us-east-1".into()),
+///             ..Default::default()
+///         },
+///         snapshot_id: None,
+///         batch_size: 1024,
+///         schema: Schema::new(vec![
+///             Field::with_name(DataType::Int64, "seq_id"),
+///             Field::with_name(DataType::Int64, "user_id"),
+///             Field::with_name(DataType::Varchar, "user_name"),
+///         ]),
+///         identity: "iceberg_scan".into(),
+///     };
+///
+///     let stream = Box::new(iceberg_scan_executor).execute();
+///     #[for_await]
+///     for chunk in stream {
+///         let chunk = chunk.unwrap();
+///         println!("{:?}", chunk);
+///     }
+/// }
+/// ```
+
+pub struct IcebergScanExecutor {
+    database_name: String,
+    table_name: String,
+    iceberg_config: IcebergConfig,
+    snapshot_id: Option<i64>,
+    file_selector: FileSelector,
+    batch_size: usize,
+
+    schema: Schema,
+    identity: String,
+}
+
+impl Executor for IcebergScanExecutor {
+    fn schema(&self) -> &risingwave_common::catalog::Schema {
+        &self.schema
+    }
+
+    fn identity(&self) -> &str {
+        &self.identity
+    }
+
+    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
+        self.do_execute().boxed()
+    }
+}
+
+impl IcebergScanExecutor {
+    #[try_stream(ok = DataChunk, error = BatchError)]
+    async fn do_execute(self: Box<Self>) {
+        let catalog = self.iceberg_config.create_catalog().await?;
+
+        let table_ident = TableIdentifier::new(vec![self.database_name, self.table_name]).unwrap();
+        let table = catalog
+            .load_table(&table_ident)
+            .await
+            .map_err(BatchError::Iceberg)?;
+
+        let table_scan: TableScan = table
+            .new_scan_builder()
+            .with_snapshot_id(
+                self.snapshot_id
+                    .unwrap_or_else(|| table.current_table_metadata().current_snapshot_id.unwrap()),
+            )
+            .with_batch_size(self.batch_size)
+            .with_column_names(self.schema.names())
+            .build()
+            .map_err(|e| BatchError::Internal(anyhow!(e)))?;
+        let file_scan_stream: icelake::io::FileScanStream =
+            table_scan.scan(&table).await.map_err(BatchError::Iceberg)?;
+
+        #[for_await]
+        for file_scan in file_scan_stream {
+            let file_scan: FileScan = file_scan.map_err(BatchError::Iceberg)?;
+            if !self.file_selector.select(file_scan.path()) {
+                continue;
+            }
+            let record_batch_stream = file_scan.scan().await.map_err(BatchError::Iceberg)?;
+
+            #[for_await]
+            for record_batch in record_batch_stream {
+                let record_batch: RecordBatch = record_batch.map_err(BatchError::Iceberg)?;
+                let chunk = Self::record_batch_to_chunk(record_batch)?;
+                debug_assert_eq!(chunk.data_types(), self.schema.data_types());
+                yield chunk;
+            }
+        }
+    }
+
+    fn record_batch_to_chunk(record_batch: RecordBatch) -> Result<DataChunk, BatchError> {
+        let mut columns = Vec::with_capacity(record_batch.num_columns());
+        for array in record_batch.columns() {
+            let column = Arc::new(array.try_into()?);
+            columns.push(column);
+        }
+        Ok(DataChunk::new(columns, record_batch.num_rows()))
+    }
+}
+
+pub enum FileSelector {
+    // File paths to be scanned by this executor are specified.
+    FileList(Vec<String>),
+    // Data files to be scanned by this executor could be calculated by Hash(file_path) % num_tasks == task_id.
+    // task_id, num_tasks
+    Hash(usize, usize),
+}
+
+impl FileSelector {
+    fn select_all() -> Self {
+        FileSelector::Hash(0, 1)
+    }
+
+    fn select(&self, path: &str) -> bool {
+        match self {
+            FileSelector::FileList(paths) => paths.contains(&path.to_string()),
+            FileSelector::Hash(task_id, num_tasks) => {
+                let hash = Self::hash_str_to_usize(path);
+                hash % num_tasks == *task_id
+            }
+        }
+    }
+
+    fn hash_str_to_usize(s: &str) -> usize {
+        let mut hasher = DefaultHasher::new();
+        s.hash(&mut hasher);
+        hasher.finish() as usize
+    }
+}
diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs
index 3b0298643484a..b6344ab9600c0 100644
--- a/src/batch/src/executor/mod.rs
+++ b/src/batch/src/executor/mod.rs
@@ -20,6 +20,7 @@ mod generic_exchange;
 mod group_top_n;
 mod hash_agg;
 mod hop_window;
+mod iceberg_scan;
 mod insert;
 mod join;
 mod limit;
diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/sink/iceberg/jni_catalog.rs
index 0057257df996a..11de50e69936a 100644
--- a/src/connector/src/sink/iceberg/jni_catalog.rs
+++ b/src/connector/src/sink/iceberg/jni_catalog.rs
@@ -29,8 +29,6 @@ use jni::JavaVM;
 use risingwave_jni_core::call_method;
 use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM};
 
-use crate::sink::{Result, SinkError};
-
 pub struct JniCatalog {
     java_catalog: GlobalRef,
     jvm: &'static JavaVM,
@@ -144,7 +142,7 @@ impl JniCatalog {
         name: impl ToString,
         catalog_impl: impl ToString,
         java_catalog_props: HashMap<String, String>,
-    ) -> Result<CatalogRef> {
+    ) -> anyhow::Result<CatalogRef> {
         let jvm = JVM.get_or_init()?;
 
         execute_with_jni_env(jvm, |env| {
@@ -184,6 +182,5 @@ impl JniCatalog {
                 config: base_config,
             }) as CatalogRef)
         })
-        .map_err(SinkError::Iceberg)
     }
 }
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 9e51557ef477a..25b9ecf5d2dc8 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -67,7 +67,7 @@ pub const ICEBERG_SINK: &str = "iceberg";
 
 static RW_CATALOG_NAME: &str = "risingwave";
 
-#[derive(Debug, Clone, Deserialize, WithOptions)]
+#[derive(Debug, Clone, Deserialize, WithOptions, Default)]
 #[serde(deny_unknown_fields)]
 pub struct IcebergConfig {
     pub connector: String, // Avoid deny unknown field. Must be "iceberg"
@@ -348,42 +348,43 @@ impl IcebergConfig {
 
         Ok((base_catalog_config, java_catalog_configs))
     }
-}
 
-async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {
-    match config.catalog_type() {
-        "storage" | "rest" => {
-            let iceberg_configs = config.build_iceberg_configs()?;
-            let catalog = load_catalog(&iceberg_configs)
-                .await
-                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
-            Ok(catalog)
-        }
-        catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => {
-            // Create java catalog
-            let (base_catalog_config, java_catalog_props) = config.build_jni_catalog_configs()?;
-            let catalog_impl = match catalog_type {
-                "hive" => "org.apache.iceberg.hive.HiveCatalog",
-                "sql" => "org.apache.iceberg.jdbc.JdbcCatalog",
-                "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
-                "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
-                _ => unreachable!(),
-            };
+    pub async fn create_catalog(&self) -> anyhow::Result<CatalogRef> {
+        match self.catalog_type() {
+            "storage" | "rest" => {
+                let iceberg_configs = self.build_iceberg_configs()?;
+                let catalog = load_catalog(&iceberg_configs)
+                    .await
+                    .map_err(|e| anyhow!(e))?;
+                Ok(catalog)
+            }
+            catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => {
+                // Create java catalog
+                let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?;
+                let catalog_impl = match catalog_type {
+                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
+                    "sql" => "org.apache.iceberg.jdbc.JdbcCatalog",
+                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
+                    "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
+                    _ => unreachable!(),
+                };
 
-            jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props)
-        }
-        "mock" => Ok(Arc::new(MockCatalog{})),
-        _ => {
-            Err(SinkError::Iceberg(anyhow!(
+                jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props)
+            }
+            "mock" => Ok(Arc::new(MockCatalog{})),
+            _ => {
+                Err(anyhow!(
                 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`",
-                config.catalog_type()
-            )))
+                    self.catalog_type()
+                ))
+            }
         }
     }
 }
 
 pub async fn create_table(config: &IcebergConfig) -> Result<Table> {
-    let catalog = create_catalog(config)
+    let catalog = config
+        .create_catalog()
         .await
         .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;

From 5e4a47aa469da454c9760b5664b017f25fb1cee8 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 1 Feb 2024 17:28:12 +0800
Subject: [PATCH 2/4] refactor

---
 src/batch/src/executor/iceberg_scan.rs  | 10 ++-----
 src/connector/src/sink/iceberg/mod.rs   | 38 ++++++++++++-------------
 src/frontend/src/handler/create_sink.rs |  4 +--
 3 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index 20dc9df97c707..3c69570049322 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -20,7 +20,6 @@ use arrow_array::RecordBatch;
 use futures_async_stream::try_stream;
 use futures_util::stream::StreamExt;
 use icelake::io::{FileScan, TableScan};
-use icelake::TableIdentifier;
 use risingwave_common::catalog::Schema;
 use risingwave_connector::sink::iceberg::IcebergConfig;
 
@@ -105,13 +104,10 @@ impl Executor for IcebergScanExecutor {
 impl IcebergScanExecutor {
     #[try_stream(ok = DataChunk, error = BatchError)]
     async fn do_execute(self: Box<Self>) {
-        let catalog = self.iceberg_config.create_catalog().await?;
-
-        let table_ident = TableIdentifier::new(vec![self.database_name, self.table_name]).unwrap();
-        let table = catalog
-            .load_table(&table_ident)
+        let table = self.iceberg_config
+            .load_table()
             .await
-            .map_err(BatchError::Iceberg)?;
+            .map_err(BatchError::Internal)?;
 
         let table_scan: TableScan = table
             .new_scan_builder()
             .with_snapshot_id(
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 25b9ecf5d2dc8..2783b15b4ddb5 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -349,7 +349,7 @@ impl IcebergConfig {
         Ok((base_catalog_config, java_catalog_configs))
     }
 
-    pub async fn create_catalog(&self) -> anyhow::Result<CatalogRef> {
+    async fn create_catalog(&self) -> anyhow::Result<CatalogRef> {
         match self.catalog_type() {
             "storage" | "rest" => {
                 let iceberg_configs = self.build_iceberg_configs()?;
@@ -380,25 +380,25 @@ impl IcebergConfig {
             }
         }
     }
-}
 
-pub async fn create_table(config: &IcebergConfig) -> Result<Table> {
-    let catalog = config
-        .create_catalog()
-        .await
-        .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;
+    pub async fn load_table(&self) -> anyhow::Result<Table> {
+        let catalog = self
+            .create_catalog()
+            .await
+            .map_err(|e| anyhow!("Unable to load iceberg catalog: {e}"))?;
 
-    let table_id = TableIdentifier::new(
-        vec![config.database_name.as_str()]
-            .into_iter()
-            .chain(config.table_name.split('.')),
-    )
-    .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?;
-
-    catalog
-        .load_table(&table_id)
-        .await
-        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
+        let table_id = TableIdentifier::new(
+            vec![self.database_name.as_str()]
+                .into_iter()
+                .chain(self.table_name.split('.')),
+        )
+    .map_err(|e| anyhow!("Unable to parse table name: {e}"))?;
+
+        catalog
+            .load_table(&table_id)
+            .await
+            .map_err(|err| anyhow!(err))
+    }
 }
 
 pub struct IcebergSink {
@@ -427,7 +427,7 @@ impl Debug for IcebergSink {
 
 impl IcebergSink {
     async fn create_and_validate_table(&self) -> Result<Table> {
-        let table = create_table(&self.config).await?;
+        let table = self.config.load_table().await.map_err(SinkError::Iceberg)?;
 
         let sink_schema = self.param.schema();
         let iceberg_schema = table
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index 2ccd32a4c0065..e2e9388e81866 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -28,7 +28,7 @@ use risingwave_common::types::{DataType, Datum};
 use risingwave_common::util::value_encoding::DatumFromProtoExt;
 use risingwave_common::{bail, catalog};
 use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
-use risingwave_connector::sink::iceberg::{create_table, IcebergConfig, ICEBERG_SINK};
+use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK};
 use risingwave_connector::sink::{
     CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL,
 };
@@ -321,7 +321,7 @@ pub async fn get_partition_compute_info(
 async fn get_partition_compute_info_for_iceberg(
     iceberg_config: &IcebergConfig,
 ) -> Result<Option<PartitionComputeInfo>> {
-    let table = create_table(iceberg_config).await?;
+    let table = iceberg_config.load_table().await?;
     let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok() else {
         return Ok(None);
     };

From 8f0681bea265ec3f18ea0ee6f92561cebe85726b Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 1 Feb 2024 17:43:23 +0800
Subject: [PATCH 3/4] fmt

---
 src/batch/src/executor/iceberg_scan.rs | 3 ++-
 src/connector/src/sink/iceberg/mod.rs  | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index 3c69570049322..a417192d8124e 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -104,7 +104,8 @@ impl Executor for IcebergScanExecutor {
 impl IcebergScanExecutor {
     #[try_stream(ok = DataChunk, error = BatchError)]
     async fn do_execute(self: Box<Self>) {
-        let table = self.iceberg_config
+        let table = self
+            .iceberg_config
             .load_table()
             .await
             .map_err(BatchError::Internal)?;
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 2783b15b4ddb5..0e4f28f64639c 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -392,7 +392,7 @@ impl IcebergConfig {
                 .into_iter()
                 .chain(self.table_name.split('.')),
         )
-    .map_err(|e| anyhow!("Unable to parse table name: {e}"))?;
+        .map_err(|e| anyhow!("Unable to parse table name: {e}"))?;
 
         catalog
             .load_table(&table_id)

From 52de44c014f97819e5c8d48c520a805df6417f7a Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 1 Feb 2024 18:06:49 +0800
Subject: [PATCH 4/4] fix doc

---
 src/batch/src/executor/iceberg_scan.rs | 48 ++++++++++++++++----------
 src/batch/src/executor/mod.rs          |  1 +
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index a417192d8124e..caf1289220d48 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -32,20 +32,15 @@ use crate::executor::{DataChunk, Executor};
 ///
 /// ```
 /// use futures_async_stream::for_await;
+/// use risingwave_batch::executor::{Executor, FileSelector, IcebergScanExecutor};
 /// use risingwave_common::catalog::{Field, Schema};
 /// use risingwave_common::types::DataType;
 /// use risingwave_connector::sink::iceberg::IcebergConfig;
 ///
-/// use crate::executor::iceberg_scan::{FileSelector, IcebergScanExecutor};
-/// use crate::executor::Executor;
-///
 /// #[tokio::test]
 /// async fn test_iceberg_scan() {
-///     let iceberg_scan_executor = IcebergScanExecutor {
-///         database_name: "demo_db".into(),
-///         table_name: "demo_table".into(),
-///         file_selector: FileSelector::select_all(),
-///         iceberg_config: IcebergConfig {
+///     let iceberg_scan_executor = IcebergScanExecutor::new(
+///         IcebergConfig {
 ///             database_name: "demo_db".into(),
 ///             table_name: "demo_table".into(),
 ///             catalog_type: Some("storage".into()),
 ///             path: "s3a://hummock001/".into(),
 ///             endpoint: Some("http://127.0.0.1:9301".into()),
 ///             access_key: "hummockadmin".into(),
 ///             secret_key: "hummockadmin".into(),
 ///             region: Some("us-east-1".into()),
 ///             ..Default::default()
 ///         },
-///         snapshot_id: None,
-///         batch_size: 1024,
-///         schema: Schema::new(vec![
+///         None,
+///         FileSelector::select_all(),
+///         1024,
+///         Schema::new(vec![
 ///             Field::with_name(DataType::Int64, "seq_id"),
 ///             Field::with_name(DataType::Int64, "user_id"),
 ///             Field::with_name(DataType::Varchar, "user_name"),
 ///         ]),
-///         identity: "iceberg_scan".into(),
-///     };
+///         "iceberg_scan".into(),
+///     );
 ///
 ///     let stream = Box::new(iceberg_scan_executor).execute();
 ///     #[for_await]
@@ -76,8 +72,6 @@ use crate::executor::{DataChunk, Executor};
 
 pub struct IcebergScanExecutor {
-    database_name: String,
-    table_name: String,
     iceberg_config: IcebergConfig,
     snapshot_id: Option<i64>,
     file_selector: FileSelector,
     batch_size: usize,
@@ -102,6 +96,24 @@ impl Executor for IcebergScanExecutor {
 }
 
 impl IcebergScanExecutor {
+    pub fn new(
+        iceberg_config: IcebergConfig,
+        snapshot_id: Option<i64>,
+        file_selector: FileSelector,
+        batch_size: usize,
+        schema: Schema,
+        identity: String,
+    ) -> Self {
+        Self {
+            iceberg_config,
+            snapshot_id,
+            file_selector,
+            batch_size,
+            schema,
+            identity,
+        }
+    }
+
     #[try_stream(ok = DataChunk, error = BatchError)]
     async fn do_execute(self: Box<Self>) {
         let table = self
@@ -160,11 +172,11 @@ pub enum FileSelector {
 }
 
 impl FileSelector {
-    fn select_all() -> Self {
+    pub fn select_all() -> Self {
         FileSelector::Hash(0, 1)
     }
 
-    fn select(&self, path: &str) -> bool {
+    pub fn select(&self, path: &str) -> bool {
         match self {
             FileSelector::FileList(paths) => paths.contains(&path.to_string()),
             FileSelector::Hash(task_id, num_tasks) => {
@@ -174,7 +186,7 @@ impl FileSelector {
         }
     }
 
-    fn hash_str_to_usize(s: &str) -> usize {
+    pub fn hash_str_to_usize(s: &str) -> usize {
         let mut hasher = DefaultHasher::new();
         s.hash(&mut hasher);
         hasher.finish() as usize
diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs
index b6344ab9600c0..ded2af7089826 100644
--- a/src/batch/src/executor/mod.rs
+++ b/src/batch/src/executor/mod.rs
@@ -53,6 +53,7 @@ pub use generic_exchange::*;
 pub use group_top_n::*;
 pub use hash_agg::*;
 pub use hop_window::*;
+pub use iceberg_scan::*;
 pub use insert::*;
 pub use join::*;
 pub use limit::*;