From eeed1e9063080a41072270bc3387ada07643cc52 Mon Sep 17 00:00:00 2001
From: Dylan
Date: Sun, 4 Feb 2024 12:02:23 +0800
Subject: [PATCH] feat(batch): support iceberg scan executor (#14915)

---
 Cargo.lock                                    |   3 +
 src/batch/Cargo.toml                          |   3 +
 src/batch/src/error.rs                        |   7 +
 src/batch/src/executor/iceberg_scan.rs        | 194 ++++++++++++++++++
 src/batch/src/executor/mod.rs                 |   2 +
 src/connector/src/sink/iceberg/jni_catalog.rs |   5 +-
 src/connector/src/sink/iceberg/mod.rs         |  91 ++++----
 src/frontend/src/handler/create_sink.rs       |   4 +-
 8 files changed, 258 insertions(+), 51 deletions(-)
 create mode 100644 src/batch/src/executor/iceberg_scan.rs

diff --git a/Cargo.lock b/Cargo.lock
index 98bcbbe535af..6e0110161774 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8574,6 +8574,8 @@ name = "risingwave_batch"
 version = "1.7.0-alpha"
 dependencies = [
  "anyhow",
+ "arrow-array 50.0.0",
+ "arrow-schema 50.0.0",
  "assert_matches",
  "async-recursion",
  "async-trait",
@@ -8584,6 +8586,7 @@ dependencies = [
  "futures-util",
  "hashbrown 0.14.0",
  "hytra",
+ "icelake",
  "itertools 0.12.0",
  "madsim-tokio",
  "madsim-tonic",
diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml
index 053a133bc233..c1fcc23ca348 100644
--- a/src/batch/Cargo.toml
+++ b/src/batch/Cargo.toml
@@ -15,6 +15,8 @@ normal = ["workspace-hack"]
 
 [dependencies]
 anyhow = "1"
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
 assert_matches = "1"
 async-recursion = "1"
 async-trait = "0.1"
@@ -24,6 +26,7 @@ futures-async-stream = { workspace = true }
 futures-util = "0.3"
 hashbrown = { workspace = true }
 hytra = "0.1.2"
+icelake = { workspace = true }
 itertools = "0.12"
 memcomparable = "0.2"
 parking_lot = { version = "0.12", features = ["arc_lock"] }
diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs
index 11614386cd0b..1cbfc7247827 100644
--- a/src/batch/src/error.rs
+++ b/src/batch/src/error.rs
@@ -109,6 +109,13 @@ pub enum BatchError {
         DmlError,
     ),
 
+    #[error(transparent)]
+    Iceberg(
+        #[from]
+        #[backtrace]
+        icelake::Error,
+    ),
+
     // Make the ref-counted type to be a variant for easier code structuring.
     // TODO(error-handling): replace with `thiserror_ext::Arc`
     #[error(transparent)]
diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
new file mode 100644
index 000000000000..caf1289220d4
--- /dev/null
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -0,0 +1,194 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::hash::{DefaultHasher, Hash, Hasher};
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use arrow_array::RecordBatch;
+use futures_async_stream::try_stream;
+use futures_util::stream::StreamExt;
+use icelake::io::{FileScan, TableScan};
+use risingwave_common::catalog::Schema;
+use risingwave_connector::sink::iceberg::IcebergConfig;
+
+use crate::error::BatchError;
+use crate::executor::{DataChunk, Executor};
+
+/// Create an Iceberg scan executor.
+///
+/// # Examples
+///
+/// ```
+/// use futures_async_stream::for_await;
+/// use risingwave_batch::executor::{Executor, FileSelector, IcebergScanExecutor};
+/// use risingwave_common::catalog::{Field, Schema};
+/// use risingwave_common::types::DataType;
+/// use risingwave_connector::sink::iceberg::IcebergConfig;
+///
+/// #[tokio::test]
+/// async fn test_iceberg_scan() {
+///     let iceberg_scan_executor = IcebergScanExecutor::new(
+///         IcebergConfig {
+///             database_name: "demo_db".into(),
+///             table_name: "demo_table".into(),
+///             catalog_type: Some("storage".into()),
+///             path: "s3a://hummock001/".into(),
+///             endpoint: Some("http://127.0.0.1:9301".into()),
+///             access_key: "hummockadmin".into(),
+///             secret_key: "hummockadmin".into(),
+///             region: Some("us-east-1".into()),
+///             ..Default::default()
+///         },
+///         None,
+///         FileSelector::select_all(),
+///         1024,
+///         Schema::new(vec![
+///             Field::with_name(DataType::Int64, "seq_id"),
+///             Field::with_name(DataType::Int64, "user_id"),
+///             Field::with_name(DataType::Varchar, "user_name"),
+///         ]),
+///         "iceberg_scan".into(),
+///     );
+///
+///     let stream = Box::new(iceberg_scan_executor).execute();
+///     #[for_await]
+///     for chunk in stream {
+///         let chunk = chunk.unwrap();
+///         println!("{:?}", chunk);
+///     }
+/// }
+/// ```
+
+pub struct IcebergScanExecutor {
+    iceberg_config: IcebergConfig,
+    snapshot_id: Option<i64>,
+    file_selector: FileSelector,
+    batch_size: usize,
+
+    schema: Schema,
+    identity: String,
+}
+
+impl Executor for IcebergScanExecutor {
+    fn schema(&self) -> &risingwave_common::catalog::Schema {
+        &self.schema
+    }
+
+    fn identity(&self) -> &str {
+        &self.identity
+    }
+
+    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
+        self.do_execute().boxed()
+    }
+}
+
+impl IcebergScanExecutor {
+    pub fn new(
+        iceberg_config: IcebergConfig,
+        snapshot_id: Option<i64>,
+        file_selector: FileSelector,
+        batch_size: usize,
+        schema: Schema,
+        identity: String,
+    ) -> Self {
+        Self {
+            iceberg_config,
+            snapshot_id,
+            file_selector,
+            batch_size,
+            schema,
+            identity,
+        }
+    }
+
+    #[try_stream(ok = DataChunk, error = BatchError)]
+    async fn do_execute(self: Box<Self>) {
+        let table = self
+            .iceberg_config
+            .load_table()
+            .await
+            .map_err(BatchError::Internal)?;
+
+        let table_scan: TableScan = table
+            .new_scan_builder()
+            .with_snapshot_id(
+                self.snapshot_id
+                    .unwrap_or_else(|| table.current_table_metadata().current_snapshot_id.unwrap()),
+            )
+            .with_batch_size(self.batch_size)
+            .with_column_names(self.schema.names())
+            .build()
+            .map_err(|e| BatchError::Internal(anyhow!(e)))?;
+        let file_scan_stream: icelake::io::FileScanStream =
+            table_scan.scan(&table).await.map_err(BatchError::Iceberg)?;
+
+        #[for_await]
+        for file_scan in file_scan_stream {
+            let file_scan: FileScan = file_scan.map_err(BatchError::Iceberg)?;
+            if !self.file_selector.select(file_scan.path()) {
+                continue;
+            }
+            let record_batch_stream = file_scan.scan().await.map_err(BatchError::Iceberg)?;
+
+            #[for_await]
+            for record_batch in record_batch_stream {
+                let record_batch: RecordBatch = record_batch.map_err(BatchError::Iceberg)?;
+                let chunk = Self::record_batch_to_chunk(record_batch)?;
+                debug_assert_eq!(chunk.data_types(), self.schema.data_types());
+                yield chunk;
+            }
+        }
+    }
+
+    fn record_batch_to_chunk(record_batch: RecordBatch) -> Result<DataChunk, BatchError> {
+        let mut columns = Vec::with_capacity(record_batch.num_columns());
+        for array in record_batch.columns() {
+            let column = Arc::new(array.try_into()?);
+            columns.push(column);
+        }
+        Ok(DataChunk::new(columns, record_batch.num_rows()))
+    }
+}
+
+pub enum FileSelector {
+    // Only scan the data files whose paths are listed here.
+    FileList(Vec<String>),
+    // A data file is scanned by this executor iff hash(file_path) % num_tasks == task_id.
+    // (task_id, num_tasks)
+    Hash(usize, usize),
+}
+
+impl FileSelector {
+    pub fn select_all() -> Self {
+        FileSelector::Hash(0, 1)
+    }
+
+    pub fn select(&self, path: &str) -> bool {
+        match self {
+            FileSelector::FileList(paths) => paths.contains(&path.to_string()),
+            FileSelector::Hash(task_id, num_tasks) => {
+                let hash = Self::hash_str_to_usize(path);
+                hash % num_tasks == *task_id
+            }
+        }
+    }
+
+    pub fn hash_str_to_usize(s: &str) -> usize {
+        let mut hasher = DefaultHasher::new();
+        s.hash(&mut hasher);
+        hasher.finish() as usize
+    }
+}
diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs
index 3b0298643484..ded2af708982 100644
--- a/src/batch/src/executor/mod.rs
+++ b/src/batch/src/executor/mod.rs
@@ -20,6 +20,7 @@ mod generic_exchange;
 mod group_top_n;
 mod hash_agg;
 mod hop_window;
+mod iceberg_scan;
 mod insert;
 mod join;
 mod limit;
@@ -52,6 +53,7 @@ pub use generic_exchange::*;
 pub use group_top_n::*;
 pub use hash_agg::*;
 pub use hop_window::*;
+pub use iceberg_scan::*;
 pub use insert::*;
 pub use join::*;
 pub use limit::*;
diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/sink/iceberg/jni_catalog.rs
index 0057257df996..11de50e69936 100644
--- a/src/connector/src/sink/iceberg/jni_catalog.rs
+++ b/src/connector/src/sink/iceberg/jni_catalog.rs
@@ -29,8 +29,6 @@ use jni::JavaVM;
 use risingwave_jni_core::call_method;
 use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM};
 
-use crate::sink::{Result, SinkError};
-
 pub struct JniCatalog {
     java_catalog: GlobalRef,
     jvm: &'static JavaVM,
@@ -144,7 +142,7 @@ impl JniCatalog {
         name: impl ToString,
         catalog_impl: impl ToString,
         java_catalog_props: HashMap<String, String>,
-    ) -> Result<CatalogRef> {
+    ) -> anyhow::Result<CatalogRef> {
         let jvm = JVM.get_or_init()?;
 
         execute_with_jni_env(jvm, |env| {
@@ -184,6 +182,5 @@ impl JniCatalog {
                 config: base_config,
             }) as CatalogRef)
         })
-        .map_err(SinkError::Iceberg)
     }
 }
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 9e51557ef477..0e4f28f64639 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -67,7 +67,7 @@ pub const ICEBERG_SINK: &str = "iceberg";
 
 static RW_CATALOG_NAME: &str = "risingwave";
 
-#[derive(Debug, Clone, Deserialize, WithOptions)]
+#[derive(Debug, Clone, Deserialize, WithOptions, Default)]
 #[serde(deny_unknown_fields)]
 pub struct IcebergConfig {
     pub connector: String, // Avoid deny unknown field. Must be "iceberg"
@@ -348,56 +348,57 @@ impl IcebergConfig {
         Ok((base_catalog_config, java_catalog_configs))
     }
-}
 
-async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {
-    match config.catalog_type() {
-        "storage" | "rest" => {
-            let iceberg_configs = config.build_iceberg_configs()?;
-            let catalog = load_catalog(&iceberg_configs)
-                .await
-                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
-            Ok(catalog)
-        }
-        catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => {
-            // Create java catalog
-            let (base_catalog_config, java_catalog_props) = config.build_jni_catalog_configs()?;
-            let catalog_impl = match catalog_type {
-                "hive" => "org.apache.iceberg.hive.HiveCatalog",
-                "sql" => "org.apache.iceberg.jdbc.JdbcCatalog",
-                "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
-                "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
-                _ => unreachable!(),
-            };
+    async fn create_catalog(&self) -> anyhow::Result<CatalogRef> {
+        match self.catalog_type() {
+            "storage" | "rest" => {
+                let iceberg_configs = self.build_iceberg_configs()?;
+                let catalog = load_catalog(&iceberg_configs)
+                    .await
+                    .map_err(|e| anyhow!(e))?;
+                Ok(catalog)
+            }
+            catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => {
+                // Create java catalog
+                let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?;
+                let catalog_impl = match catalog_type {
+                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
+                    "sql" => "org.apache.iceberg.jdbc.JdbcCatalog",
+                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
+                    "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
+                    _ => unreachable!(),
+                };
 
-            jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props)
-        }
-        "mock" => Ok(Arc::new(MockCatalog{})),
-        _ => {
-            Err(SinkError::Iceberg(anyhow!(
+                jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props)
+            }
+            "mock" => Ok(Arc::new(MockCatalog{})),
+            _ => {
+                Err(anyhow!(
                 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`",
-                config.catalog_type()
-            )))
+                    self.catalog_type()
+                ))
+            }
         }
     }
-}
 
-pub async fn create_table(config: &IcebergConfig) -> Result<Table> {
-    let catalog = create_catalog(config)
-        .await
-        .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;
+    pub async fn load_table(&self) -> anyhow::Result<Table> {
+        let catalog = self
+            .create_catalog()
+            .await
+            .map_err(|e| anyhow!("Unable to load iceberg catalog: {e}"))?;
 
-    let table_id = TableIdentifier::new(
-        vec![config.database_name.as_str()]
-            .into_iter()
-            .chain(config.table_name.split('.')),
-    )
-    .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?;
-
-    catalog
-        .load_table(&table_id)
-        .await
-        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
+        let table_id = TableIdentifier::new(
+            vec![self.database_name.as_str()]
+                .into_iter()
+                .chain(self.table_name.split('.')),
+        )
+        .map_err(|e| anyhow!("Unable to parse table name: {e}"))?;
+
+        catalog
+            .load_table(&table_id)
+            .await
+            .map_err(|err| anyhow!(err))
+    }
 }
 
 pub struct IcebergSink {
@@ -426,7 +427,7 @@ impl Debug for IcebergSink {
 
 impl IcebergSink {
     async fn create_and_validate_table(&self) -> Result<Table> {
-        let table = create_table(&self.config).await?;
+        let table = self.config.load_table().await.map_err(SinkError::Iceberg)?;
 
         let sink_schema = self.param.schema();
         let iceberg_schema = table
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index 2ccd32a4c006..e2e9388e8186 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -28,7 +28,7 @@ use risingwave_common::types::{DataType, Datum};
 use risingwave_common::util::value_encoding::DatumFromProtoExt;
 use risingwave_common::{bail, catalog};
 use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
-use risingwave_connector::sink::iceberg::{create_table, IcebergConfig, ICEBERG_SINK};
+use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK};
 use risingwave_connector::sink::{
     CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL,
 };
@@ -321,7 +321,7 @@ pub async fn get_partition_compute_info(
 async fn get_partition_compute_info_for_iceberg(
     iceberg_config: &IcebergConfig,
 ) -> Result<Option<PartitionComputeInfo>> {
-    let table = create_table(iceberg_config).await?;
+    let table = iceberg_config.load_table().await?;
     let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok() else {
         return Ok(None);
     };
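Note (outside the patch): the doc comment on `IcebergScanExecutor` already shows an end-to-end setup, so the one piece worth a small illustration is the `FileSelector::Hash(task_id, num_tasks)` policy that splits a table's data files across parallel scan tasks. The sketch below is a minimal, self-contained re-implementation of that selection rule under the same `DefaultHasher` assumption; it does not import the RisingWave crates, and the file paths are made up for the example. It shows that each file is claimed by exactly one task.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Same rule as `FileSelector::Hash` in the patch:
/// a task scans a data file iff hash(path) % num_tasks == task_id.
fn selects(path: &str, task_id: usize, num_tasks: usize) -> bool {
    let mut hasher = DefaultHasher::new();
    path.hash(&mut hasher);
    (hasher.finish() as usize) % num_tasks == task_id
}

fn main() {
    // Hypothetical data file paths; in the executor they come from the table's file scan.
    let files = [
        "s3a://hummock001/demo_db/demo_table/data/00000-0-a.parquet",
        "s3a://hummock001/demo_db/demo_table/data/00001-0-b.parquet",
        "s3a://hummock001/demo_db/demo_table/data/00002-0-c.parquet",
    ];
    let num_tasks = 2;

    for path in files {
        // Exactly one of the `num_tasks` scan tasks selects each file,
        // so files are neither skipped nor scanned twice.
        let owners: Vec<usize> = (0..num_tasks)
            .filter(|&task_id| selects(path, task_id, num_tasks))
            .collect();
        assert_eq!(owners.len(), 1);
        println!("{path} -> task {}", owners[0]);
    }
}
```

`FileSelector::select_all()` is just the degenerate case `Hash(0, 1)`: with a single task, every file hashes to task 0.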