From ea4e8f0ddd1850ae8509afcdcc33c72cf25eb398 Mon Sep 17 00:00:00 2001
From: Dylan
Date: Wed, 7 Aug 2024 11:13:31 +0800
Subject: [PATCH] feat(iceberg): reduce iceberg catalog fetch rpc number for iceberg scan (#17939)

---
 src/batch/src/executor/iceberg_scan.rs        | 144 ++++--------
 src/batch/src/executor/source.rs              |   7 +-
 src/connector/src/sink/iceberg/mod.rs         |  32 ++++
 .../src/sink/iceberg/storage_catalog.rs       |   4 +
 src/connector/src/source/iceberg/mod.rs       |  75 +++++----
 src/frontend/src/scheduler/plan_fragmenter.rs |   8 +-
 6 files changed, 121 insertions(+), 149 deletions(-)

diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index 7bf3835944b3..ee4e463422c1 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -12,12 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::future;
-use std::hash::{DefaultHasher, Hash, Hasher};
+use std::mem;
 
-use anyhow::anyhow;
 use futures_async_stream::try_stream;
 use futures_util::stream::StreamExt;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::TableMetadata;
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::catalog::Schema;
 use risingwave_connector::sink::iceberg::IcebergConfig;
@@ -25,57 +25,13 @@ use risingwave_connector::sink::iceberg::IcebergConfig;
 use crate::error::BatchError;
 use crate::executor::{DataChunk, Executor};
 
-/// Create a iceberg scan executor.
-///
-/// # Examples
-///
-/// ```
-/// use futures_async_stream::for_await;
-/// use risingwave_batch::executor::{Executor, FileSelector, IcebergScanExecutor};
-/// use risingwave_common::catalog::{Field, Schema};
-/// use risingwave_common::types::DataType;
-/// use risingwave_connector::sink::iceberg::IcebergConfig;
-///
-/// #[tokio::test]
-/// async fn test_iceberg_scan() {
-///     let iceberg_scan_executor = IcebergScanExecutor::new(
-///         IcebergConfig {
-///             database_name: Some("demo_db".into()),
-///             table_name: "demo_table".into(),
-///             catalog_type: Some("storage".into()),
-///             path: "s3://hummock001/".into(),
-///             endpoint: Some("http://127.0.0.1:9301".into()),
-///             access_key: "hummockadmin".into(),
-///             secret_key: "hummockadmin".into(),
-///             region: Some("us-east-1".into()),
-///             ..Default::default()
-///         },
-///         None,
-///         FileSelector::select_all(),
-///         1024,
-///         Schema::new(vec![
-///             Field::with_name(DataType::Int64, "seq_id"),
-///             Field::with_name(DataType::Int64, "user_id"),
-///             Field::with_name(DataType::Varchar, "user_name"),
-///         ]),
-///         "iceberg_scan".into(),
-///     );
-///
-///     let stream = Box::new(iceberg_scan_executor).execute();
-///     #[for_await]
-///     for chunk in stream {
-///         let chunk = chunk.unwrap();
-///         println!("{:?}", chunk);
-///     }
-/// }
-/// ```
-
 pub struct IcebergScanExecutor {
     iceberg_config: IcebergConfig,
+    #[allow(dead_code)]
     snapshot_id: Option<i64>,
-    file_selector: FileSelector,
+    table_meta: TableMetadata,
+    file_scan_tasks: Vec<FileScanTask>,
     batch_size: usize,
     schema: Schema,
     identity: String,
 }
 
@@ -98,7 +54,8 @@ impl IcebergScanExecutor {
     pub fn new(
         iceberg_config: IcebergConfig,
         snapshot_id: Option<i64>,
-        file_selector: FileSelector,
+        table_meta: TableMetadata,
+        file_scan_tasks: Vec<FileScanTask>,
         batch_size: usize,
         schema: Schema,
         identity: String,
@@ -106,7 +63,8 @@
         Self {
             iceberg_config,
             snapshot_id,
-            file_selector,
+            table_meta,
+            file_scan_tasks,
             batch_size,
             schema,
             identity,
         }
     }
 
     #[try_stream(ok = DataChunk, error = BatchError)]
-    async fn do_execute(self: Box<Self>) {
-        let table = self.iceberg_config.load_table_v2().await?;
-
-        let snapshot_id = if let Some(snapshot_id) = self.snapshot_id {
-            snapshot_id
-        } else {
-            table
-                .metadata()
-                .current_snapshot()
-                .ok_or_else(|| {
-                    BatchError::Internal(anyhow!("No snapshot found for iceberg table"))
-                })?
-                .snapshot_id()
+    async fn do_execute(mut self: Box<Self>) {
+        let table = self
+            .iceberg_config
+            .load_table_v2_with_metadata(self.table_meta)
+            .await?;
+        let data_types = self.schema.data_types();
+
+        let file_scan_tasks = mem::take(&mut self.file_scan_tasks);
+
+        let file_scan_stream = {
+            #[try_stream]
+            async move {
+                for file_scan_task in file_scan_tasks {
+                    yield file_scan_task;
+                }
+            }
         };
-        let scan = table
-            .scan()
-            .snapshot_id(snapshot_id)
-            .with_batch_size(Some(self.batch_size))
-            .select(self.schema.names())
-            .build()
-            .map_err(|e| BatchError::Internal(anyhow!(e)))?;
-
-        let file_selector = self.file_selector.clone();
-        let file_scan_stream = scan
-            .plan_files()
-            .await
-            .map_err(BatchError::Iceberg)?
-            .filter(move |task| {
-                let res = task
-                    .as_ref()
-                    .map(|task| file_selector.select(task.data_file_path()))
-                    .unwrap_or(true);
-                future::ready(res)
-            });
 
         let reader = table
             .reader_builder()
@@ -162,39 +103,8 @@ impl IcebergScanExecutor {
         for record_batch in record_batch_stream {
             let record_batch = record_batch.map_err(BatchError::Iceberg)?;
             let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
-            debug_assert_eq!(chunk.data_types(), self.schema.data_types());
+            debug_assert_eq!(chunk.data_types(), data_types);
             yield chunk;
         }
     }
 }
-
-#[derive(Clone)]
-pub enum FileSelector {
-    // File paths to be scanned by this executor are specified.
-    FileList(Vec<String>),
-    // Data files to be scanned by this executor could be calculated by Hash(file_path) % num_tasks == task_id.
-    // task_id, num_tasks
-    Hash(usize, usize),
-}
-
-impl FileSelector {
-    pub fn select_all() -> Self {
-        FileSelector::Hash(0, 1)
-    }
-
-    pub fn select(&self, path: &str) -> bool {
-        match self {
-            FileSelector::FileList(paths) => paths.contains(&path.to_string()),
-            FileSelector::Hash(task_id, num_tasks) => {
-                let hash = Self::hash_str_to_usize(path);
-                hash % num_tasks == *task_id
-            }
-        }
-    }
-
-    pub fn hash_str_to_usize(s: &str) -> usize {
-        let mut hasher = DefaultHasher::new();
-        s.hash(&mut hasher);
-        hasher.finish() as usize
-    }
-}
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index 51d8da9d14d9..e01ee9e4b7de 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -32,9 +32,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
 
 use super::Executor;
 use crate::error::{BatchError, Result};
-use crate::executor::{
-    BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, FileSelector, IcebergScanExecutor,
-};
+use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, IcebergScanExecutor};
 use crate::task::BatchTaskContext;
 
 pub struct SourceExecutor {
@@ -113,7 +111,8 @@ impl BoxedExecutorBuilder for SourceExecutor {
             Ok(Box::new(IcebergScanExecutor::new(
                 iceberg_properties.to_iceberg_config(),
                 Some(split.snapshot_id),
-                FileSelector::FileList(split.files),
+                split.table_meta.deserialize(),
+                split.files.into_iter().map(|x| x.deserialize()).collect(),
                 source.context.get_config().developer.chunk_size,
                 schema,
                 source.plan_node().get_identity().clone(),
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 76c9dff49137..1b35c6783054 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -29,6 +29,7 @@ use arrow_schema_iceberg::{
 };
 use async_trait::async_trait;
 use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::TableMetadata;
 use iceberg::table::Table as TableV2;
 use iceberg::{Catalog as CatalogV2, TableIdent};
 use icelake::catalog::{
@@ -578,6 +579,37 @@ impl IcebergConfig {
 
         catalog.load_table(&table_id).await.map_err(Into::into)
     }
+
+    pub async fn load_table_v2_with_metadata(
+        &self,
+        metadata: TableMetadata,
+    ) -> ConnectorResult<TableV2> {
+        match self.catalog_type() {
+            "storage" => {
+                let config = StorageCatalogConfig::builder()
+                    .warehouse(self.path.clone())
+                    .access_key(self.access_key.clone())
+                    .secret_key(self.secret_key.clone())
+                    .region(self.region.clone())
+                    .endpoint(self.endpoint.clone())
+                    .build();
+                let storage_catalog = storage_catalog::StorageCatalog::new(config)?;
+
+                let table_id = self
+                    .full_table_name_v2()
+                    .context("Unable to parse table name")?;
+
+                Ok(iceberg::table::Table::builder()
+                    .metadata(metadata)
+                    .identifier(table_id)
+                    .file_io(storage_catalog.file_io().clone())
+                    // Only support readonly table for storage catalog now.
+                    .readonly(true)
+                    .build())
+            }
+            _ => self.load_table_v2().await,
+        }
+    }
 }
 
 pub struct IcebergSink {
diff --git a/src/connector/src/sink/iceberg/storage_catalog.rs b/src/connector/src/sink/iceberg/storage_catalog.rs
index 5fb2c5105fda..7fd025020e1d 100644
--- a/src/connector/src/sink/iceberg/storage_catalog.rs
+++ b/src/connector/src/sink/iceberg/storage_catalog.rs
@@ -162,6 +162,10 @@ impl StorageCatalog {
 
         Ok(paths)
     }
+
+    pub fn file_io(&self) -> &FileIO {
+        &self.file_io
+    }
 }
 
 #[async_trait]
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 1b76a0ff3e86..f101ff9ed6d4 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -18,11 +18,13 @@ use std::collections::HashMap;
 
 use anyhow::anyhow;
 use async_trait::async_trait;
-use futures::StreamExt;
-use iceberg::spec::{DataContentType, ManifestList};
+use futures_async_stream::for_await;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::TableMetadata;
 use itertools::Itertools;
 pub use parquet_file_reader::*;
 use risingwave_common::bail;
+use risingwave_common::catalog::Schema;
 use risingwave_common::types::JsonbVal;
 use serde::{Deserialize, Serialize};
 
@@ -110,11 +112,38 @@ impl UnknownFields for IcebergProperties {
     }
 }
 
+#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
+pub struct IcebergFileScanTaskJsonStr(String);
+
+impl IcebergFileScanTaskJsonStr {
+    pub fn deserialize(&self) -> FileScanTask {
+        serde_json::from_str(&self.0).unwrap()
+    }
+
+    pub fn serialize(task: &FileScanTask) -> Self {
+        Self(serde_json::to_string(task).unwrap())
+    }
+}
+
+#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
+pub struct TableMetadataJsonStr(String);
+
+impl TableMetadataJsonStr {
+    pub fn deserialize(&self) -> TableMetadata {
+        serde_json::from_str(&self.0).unwrap()
+    }
+
+    pub fn serialize(metadata: &TableMetadata) -> Self {
+        Self(serde_json::to_string(metadata).unwrap())
+    }
+}
+
 #[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
 pub struct IcebergSplit {
     pub split_id: i64,
     pub snapshot_id: i64,
-    pub files: Vec<String>,
+    pub table_meta: TableMetadataJsonStr,
+    pub files: Vec<IcebergFileScanTaskJsonStr>,
 }
 
 impl SplitMetaData for IcebergSplit {
@@ -169,6 +198,7 @@ pub enum IcebergTimeTravelInfo {
 impl IcebergSplitEnumerator {
     pub async fn list_splits_batch(
         &self,
+        schema: Schema,
         time_traval_info: Option<IcebergTimeTravelInfo>,
         batch_parallelism: usize,
     ) -> ConnectorResult<Vec<IcebergSplit>> {
@@ -209,31 +239,23 @@ impl IcebergSplitEnumerator {
         };
 
         let mut files = vec![];
-        let snapshot = table
-            .metadata()
-            .snapshot_by_id(snapshot_id)
-            .expect("snapshot must exist");
-
-        let manifest_list: ManifestList = snapshot
-            .load_manifest_list(table.file_io(), table.metadata())
-            .await
+        let scan = table
+            .scan()
+            .snapshot_id(snapshot_id)
+            .select(schema.names())
+            .build()
             .map_err(|e| anyhow!(e))?;
-        for entry in manifest_list.entries() {
-            let manifest = entry
-                .load_manifest(table.file_io())
-                .await
-                .map_err(|e| anyhow!(e))?;
-            let mut manifest_entries_stream =
-                futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));
-
-            while let Some(manifest_entry) = manifest_entries_stream.next().await {
-                let file = manifest_entry.data_file();
-                if file.content_type() != DataContentType::Data {
-                    bail!("Reading iceberg table with delete files is unsupported. Please try to compact the table first.");
-                }
-                files.push(file.file_path().to_string());
-            }
-        }
+
+        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
+
+        #[for_await]
+        for task in file_scan_stream {
+            let task = task.map_err(|e| anyhow!(e))?;
+            files.push(IcebergFileScanTaskJsonStr::serialize(&task));
         }
+
+        let table_meta = TableMetadataJsonStr::serialize(table.metadata());
+
         let split_num = batch_parallelism;
         // evenly split the files into splits based on the parallelism.
         let split_size = files.len() / split_num;
@@ -245,6 +267,7 @@ impl IcebergSplitEnumerator {
             let split = IcebergSplit {
                 split_id: i as i64,
                 snapshot_id,
+                table_meta: table_meta.clone(),
                 files: files[start..end].to_vec(),
             };
             splits.push(split);
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 81cd1a904429..7643e5c5e7ba 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -28,7 +28,7 @@ use risingwave_batch::error::BatchError;
 use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
 use risingwave_common::bail;
 use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
-use risingwave_common::catalog::TableDesc;
+use risingwave_common::catalog::{Schema, TableDesc};
 use risingwave_common::hash::table_distribution::TableDistribution;
 use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping};
 use risingwave_common::util::scan_range::ScanRange;
@@ -269,6 +269,7 @@ impl Query {
 
 #[derive(Debug, Clone)]
 pub struct SourceFetchInfo {
+    pub schema: Schema,
     pub connector: ConnectorProperties,
     pub timebound: (Option<i64>, Option<i64>),
     pub as_of: Option<AsOf>,
@@ -358,7 +359,7 @@ impl SourceScanInfo {
         };
 
         let split_info = iceberg_enumerator
-            .list_splits_batch(time_travel_info, batch_parallelism)
+            .list_splits_batch(fetch_info.schema, time_travel_info, batch_parallelism)
             .await?
             .into_iter()
            .map(SplitImpl::Iceberg)
@@ -1048,6 +1049,7 @@ impl BatchPlanFragmenter {
             ConnectorProperties::extract(source_catalog.with_properties.clone(), false)?;
         let timestamp_bound = batch_kafka_scan.kafka_timestamp_range_value();
         return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
+            schema: batch_kafka_scan.base.schema().clone(),
             connector: property,
             timebound: timestamp_bound,
             as_of: None,
@@ -1061,6 +1063,7 @@ impl BatchPlanFragmenter {
             ConnectorProperties::extract(source_catalog.with_properties.clone(), false)?;
         let as_of = batch_iceberg_scan.as_of();
         return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
+            schema: batch_iceberg_scan.base.schema().clone(),
             connector: property,
             timebound: (None, None),
             as_of,
@@ -1075,6 +1078,7 @@ impl BatchPlanFragmenter {
             ConnectorProperties::extract(source_catalog.with_properties.clone(), false)?;
         let as_of = source_node.as_of();
         return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
+            schema: source_node.base.schema().clone(),
             connector: property,
             timebound: (None, None),
             as_of,
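
Note on the approach taken by this patch: the split enumerator on the frontend now talks to the Iceberg catalog once, serializes the fetched TableMetadata and the pre-planned FileScanTasks into JSON-string newtypes (TableMetadataJsonStr, IcebergFileScanTaskJsonStr) carried inside each IcebergSplit, and the batch-side scan executor rebuilds the table from that metadata via load_table_v2_with_metadata instead of issuing another catalog RPC per task. The standalone sketch below illustrates only the JSON-string newtype round trip in isolation; DemoMeta and MetaJsonStr are hypothetical stand-ins rather than types from this patch, and the example assumes the serde (with the derive feature) and serde_json crates are available.

use serde::{Deserialize, Serialize};

// Hypothetical stand-in for iceberg::spec::TableMetadata.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct DemoMeta {
    location: String,
    current_snapshot_id: i64,
}

// Mirrors the wrapper pattern of TableMetadataJsonStr: the value travels as a
// pre-serialized JSON string, so the carrying type can derive Eq/Hash/Serialize
// without requiring those impls on the wrapped type.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
struct MetaJsonStr(String);

impl MetaJsonStr {
    fn serialize(meta: &DemoMeta) -> Self {
        Self(serde_json::to_string(meta).unwrap())
    }

    fn deserialize(&self) -> DemoMeta {
        serde_json::from_str(&self.0).unwrap()
    }
}

fn main() {
    let meta = DemoMeta {
        location: "s3://bucket/warehouse/demo_table".into(),
        current_snapshot_id: 42,
    };
    // Enumerator side: fetch once, embed the serialized form in every split.
    let carried = MetaJsonStr::serialize(&meta);
    // Executor side: rebuild the value without another catalog round trip.
    let rebuilt = carried.deserialize();
    assert_eq!(meta, rebuilt);
    println!("round-tripped metadata for {}", rebuilt.location);
}

Carrying the value as a pre-serialized string keeps IcebergSplit itself Hash + Eq + Serialize (as required by the split machinery) while pushing the cost of parsing the metadata to the one place that actually needs the typed value.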