diff --git a/Cargo.lock b/Cargo.lock
index 7e6f91306c3aa..2f84d4a83e400 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8574,6 +8574,8 @@ name = "risingwave_batch"
 version = "1.7.0-alpha"
 dependencies = [
  "anyhow",
+ "arrow-array 50.0.0",
+ "arrow-schema 50.0.0",
  "assert_matches",
  "async-recursion",
  "async-trait",
@@ -8584,6 +8586,7 @@ dependencies = [
  "futures-util",
  "hashbrown 0.14.0",
  "hytra",
+ "icelake",
  "itertools 0.12.0",
  "madsim-tokio",
  "madsim-tonic",
diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml
index 053a133bc2337..c1fcc23ca3484 100644
--- a/src/batch/Cargo.toml
+++ b/src/batch/Cargo.toml
@@ -15,6 +15,8 @@ normal = ["workspace-hack"]
 
 [dependencies]
 anyhow = "1"
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
 assert_matches = "1"
 async-recursion = "1"
 async-trait = "0.1"
@@ -24,6 +26,7 @@ futures-async-stream = { workspace = true }
 futures-util = "0.3"
 hashbrown = { workspace = true }
 hytra = "0.1.2"
+icelake = { workspace = true }
 itertools = "0.12"
 memcomparable = "0.2"
 parking_lot = { version = "0.12", features = ["arc_lock"] }
diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs
index 11614386cd0b5..1cbfc7247827c 100644
--- a/src/batch/src/error.rs
+++ b/src/batch/src/error.rs
@@ -109,6 +109,13 @@ pub enum BatchError {
         DmlError,
     ),
 
+    #[error(transparent)]
+    Iceberg(
+        #[from]
+        #[backtrace]
+        icelake::Error,
+    ),
+
     // Make the ref-counted type to be a variant for easier code structuring.
     // TODO(error-handling): replace with `thiserror_ext::Arc`
     #[error(transparent)]
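The `#[from]` attribute on the new `Iceberg` variant derives `From<icelake::Error> for BatchError`, so icelake failures can also propagate through `?` instead of an explicit `map_err`. A minimal sketch of code inside the batch crate, using only calls that appear in this patch; the helper function itself is hypothetical and not part of the change:

```rust
use crate::error::BatchError;

// Hypothetical helper: with `#[from]`, a fallible icelake call can use `?`
// directly; the error is converted into `BatchError::Iceberg`, and
// `#[backtrace]` keeps the captured backtrace attached.
async fn load_table(
    catalog: &icelake::catalog::CatalogRef,
    ident: &icelake::TableIdentifier,
) -> Result<icelake::Table, BatchError> {
    Ok(catalog.load_table(ident).await?)
}
```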
diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
new file mode 100644
index 0000000000000..20dc9df97c707
--- /dev/null
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -0,0 +1,185 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::hash::{DefaultHasher, Hash, Hasher};
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use arrow_array::RecordBatch;
+use futures_async_stream::try_stream;
+use futures_util::stream::StreamExt;
+use icelake::io::{FileScan, TableScan};
+use icelake::TableIdentifier;
+use risingwave_common::catalog::Schema;
+use risingwave_connector::sink::iceberg::IcebergConfig;
+
+use crate::error::BatchError;
+use crate::executor::{DataChunk, Executor};
+
+/// An Iceberg scan executor.
+///
+/// # Examples
+///
+/// ```
+/// use futures_async_stream::for_await;
+/// use risingwave_common::catalog::{Field, Schema};
+/// use risingwave_common::types::DataType;
+/// use risingwave_connector::sink::iceberg::IcebergConfig;
+///
+/// use crate::executor::iceberg_scan::{FileSelector, IcebergScanExecutor};
+/// use crate::executor::Executor;
+///
+/// #[tokio::test]
+/// async fn test_iceberg_scan() {
+///     let iceberg_scan_executor = IcebergScanExecutor {
+///         database_name: "demo_db".into(),
+///         table_name: "demo_table".into(),
+///         file_selector: FileSelector::select_all(),
+///         iceberg_config: IcebergConfig {
+///             database_name: "demo_db".into(),
+///             table_name: "demo_table".into(),
+///             catalog_type: Some("storage".into()),
+///             path: "s3a://hummock001/".into(),
+///             endpoint: Some("http://127.0.0.1:9301".into()),
+///             access_key: "hummockadmin".into(),
+///             secret_key: "hummockadmin".into(),
+///             region: Some("us-east-1".into()),
+///             ..Default::default()
+///         },
+///         snapshot_id: None,
+///         batch_size: 1024,
+///         schema: Schema::new(vec![
+///             Field::with_name(DataType::Int64, "seq_id"),
+///             Field::with_name(DataType::Int64, "user_id"),
+///             Field::with_name(DataType::Varchar, "user_name"),
+///         ]),
+///         identity: "iceberg_scan".into(),
+///     };
+///
+///     let stream = Box::new(iceberg_scan_executor).execute();
+///     #[for_await]
+///     for chunk in stream {
+///         let chunk = chunk.unwrap();
+///         println!("{:?}", chunk);
+///     }
+/// }
+/// ```
+
+pub struct IcebergScanExecutor {
+    database_name: String,
+    table_name: String,
+    iceberg_config: IcebergConfig,
+    snapshot_id: Option<i64>,
+    file_selector: FileSelector,
+    batch_size: usize,
+
+    schema: Schema,
+    identity: String,
+}
+
+impl Executor for IcebergScanExecutor {
+    fn schema(&self) -> &risingwave_common::catalog::Schema {
+        &self.schema
+    }
+
+    fn identity(&self) -> &str {
+        &self.identity
+    }
+
+    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
+        self.do_execute().boxed()
+    }
+}
+
+impl IcebergScanExecutor {
+    #[try_stream(ok = DataChunk, error = BatchError)]
+    async fn do_execute(self: Box<Self>) {
+        let catalog = self.iceberg_config.create_catalog().await?;
+
+        let table_ident = TableIdentifier::new(vec![self.database_name, self.table_name]).unwrap();
+        let table = catalog
+            .load_table(&table_ident)
+            .await
+            .map_err(BatchError::Iceberg)?;
+
+        let table_scan: TableScan = table
+            .new_scan_builder()
+            .with_snapshot_id(
+                self.snapshot_id
+                    .unwrap_or_else(|| table.current_table_metadata().current_snapshot_id.unwrap()),
+            )
+            .with_batch_size(self.batch_size)
+            .with_column_names(self.schema.names())
+            .build()
+            .map_err(|e| BatchError::Internal(anyhow!(e)))?;
+        let file_scan_stream: icelake::io::FileScanStream =
+            table_scan.scan(&table).await.map_err(BatchError::Iceberg)?;
+
+        #[for_await]
+        for file_scan in file_scan_stream {
+            let file_scan: FileScan = file_scan.map_err(BatchError::Iceberg)?;
+            if !self.file_selector.select(file_scan.path()) {
+                continue;
+            }
+            let record_batch_stream = file_scan.scan().await.map_err(BatchError::Iceberg)?;
+
+            #[for_await]
+            for record_batch in record_batch_stream {
+                let record_batch: RecordBatch = record_batch.map_err(BatchError::Iceberg)?;
+                let chunk = Self::record_batch_to_chunk(record_batch)?;
+                debug_assert_eq!(chunk.data_types(), self.schema.data_types());
+                yield chunk;
+            }
+        }
+    }
+
+    fn record_batch_to_chunk(record_batch: RecordBatch) -> Result<DataChunk, BatchError> {
+        let mut columns = Vec::with_capacity(record_batch.num_columns());
+        for array in record_batch.columns() {
+            let column = Arc::new(array.try_into()?);
+            columns.push(column);
+        }
+        Ok(DataChunk::new(columns, record_batch.num_rows()))
+    }
+}
+
+pub enum FileSelector {
+    // File paths to be scanned by this executor are specified explicitly.
+    FileList(Vec<String>),
+    // A data file is scanned by this executor iff `hash(file_path) % num_tasks == task_id`.
+    // Fields: (task_id, num_tasks).
+    Hash(usize, usize),
+}
+
+impl FileSelector {
+    fn select_all() -> Self {
+        FileSelector::Hash(0, 1)
+    }
+
+    fn select(&self, path: &str) -> bool {
+        match self {
+            FileSelector::FileList(paths) => paths.contains(&path.to_string()),
+            FileSelector::Hash(task_id, num_tasks) => {
+                let hash = Self::hash_str_to_usize(path);
+                hash % num_tasks == *task_id
+            }
+        }
+    }
+
+    fn hash_str_to_usize(s: &str) -> usize {
+        let mut hasher = DefaultHasher::new();
+        s.hash(&mut hasher);
+        hasher.finish() as usize
+    }
+}
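For intuition, the hash-based selector is what lets several scan tasks split one table's data files without coordination: every task applies the same hash, and each file path lands in exactly one bucket. A small illustrative sketch; it assumes module-private access to `select`, and the paths and task count are made up:

```rust
// Hypothetical setup: 4 parallel scan tasks, each owning one hash bucket.
let selectors: Vec<FileSelector> = (0..4).map(|task_id| FileSelector::Hash(task_id, 4)).collect();

for path in ["s3a://bucket/t/data/00000-0-a.parquet", "s3a://bucket/t/data/00001-0-b.parquet"] {
    // Exactly one selector accepts each file, so no file is scanned twice.
    let owners = selectors.iter().filter(|s| s.select(path)).count();
    assert_eq!(owners, 1);
}
```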
diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs
index 3b0298643484a..b6344ab9600c0 100644
--- a/src/batch/src/executor/mod.rs
+++ b/src/batch/src/executor/mod.rs
@@ -20,6 +20,7 @@ mod generic_exchange;
 mod group_top_n;
 mod hash_agg;
 mod hop_window;
+mod iceberg_scan;
 mod insert;
 mod join;
 mod limit;
diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/sink/iceberg/jni_catalog.rs
index 0057257df996a..11de50e69936a 100644
--- a/src/connector/src/sink/iceberg/jni_catalog.rs
+++ b/src/connector/src/sink/iceberg/jni_catalog.rs
@@ -29,8 +29,6 @@ use jni::JavaVM;
 use risingwave_jni_core::call_method;
 use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM};
 
-use crate::sink::{Result, SinkError};
-
 pub struct JniCatalog {
     java_catalog: GlobalRef,
     jvm: &'static JavaVM,
@@ -144,7 +142,7 @@ impl JniCatalog {
         name: impl ToString,
         catalog_impl: impl ToString,
         java_catalog_props: HashMap<String, String>,
-    ) -> Result<CatalogRef> {
+    ) -> anyhow::Result<CatalogRef> {
         let jvm = JVM.get_or_init()?;
 
         execute_with_jni_env(jvm, |env| {
@@ -184,6 +182,5 @@ impl JniCatalog {
                 config: base_config,
             }) as CatalogRef)
         })
-        .map_err(SinkError::Iceberg)
     }
 }
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 9e51557ef477a..25b9ecf5d2dc8 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -67,7 +67,7 @@ pub const ICEBERG_SINK: &str = "iceberg";
 
 static RW_CATALOG_NAME: &str = "risingwave";
 
-#[derive(Debug, Clone, Deserialize, WithOptions)]
+#[derive(Debug, Clone, Deserialize, WithOptions, Default)]
 #[serde(deny_unknown_fields)]
 pub struct IcebergConfig {
     pub connector: String, // Avoid deny unknown field. Must be "iceberg"
@@ -348,42 +348,43 @@ impl IcebergConfig {
 
         Ok((base_catalog_config, java_catalog_configs))
     }
-}
 
-async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {
-    match config.catalog_type() {
-        "storage" | "rest" => {
-            let iceberg_configs = config.build_iceberg_configs()?;
-            let catalog = load_catalog(&iceberg_configs)
-                .await
-                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
-            Ok(catalog)
-        }
-        catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => {
-            // Create java catalog
-            let (base_catalog_config, java_catalog_props) = config.build_jni_catalog_configs()?;
-            let catalog_impl = match catalog_type {
-                "hive" => "org.apache.iceberg.hive.HiveCatalog",
-                "sql" => "org.apache.iceberg.jdbc.JdbcCatalog",
-                "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
-                "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
-                _ => unreachable!(),
-            };
+    pub async fn create_catalog(&self) -> anyhow::Result<CatalogRef> {
+        match self.catalog_type() {
+            "storage" | "rest" => {
+                let iceberg_configs = self.build_iceberg_configs()?;
+                let catalog = load_catalog(&iceberg_configs)
+                    .await
+                    .map_err(|e| anyhow!(e))?;
+                Ok(catalog)
+            }
+            catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => {
+                // Create java catalog
+                let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?;
+                let catalog_impl = match catalog_type {
+                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
+                    "sql" => "org.apache.iceberg.jdbc.JdbcCatalog",
+                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
+                    "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
+                    _ => unreachable!(),
+                };
 
-            jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props)
-        }
-        "mock" => Ok(Arc::new(MockCatalog{})),
-        _ => {
-            Err(SinkError::Iceberg(anyhow!(
+                jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props)
+            }
+            "mock" => Ok(Arc::new(MockCatalog{})),
+            _ => {
+                Err(anyhow!(
                 "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`",
-                config.catalog_type()
-            )))
+                    self.catalog_type()
+                ))
+            }
         }
     }
 }
 
 pub async fn create_table(config: &IcebergConfig) -> Result<Table> {
-    let catalog = create_catalog(config)
+    let catalog = config
+        .create_catalog()
         .await
         .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;
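Since `create_catalog` is now a method on `IcebergConfig` returning `anyhow::Result`, the sink keeps wrapping the error into `SinkError::Iceberg` at its boundary (as `create_table` does above), while the new batch executor can call it directly. A rough sketch of such a caller outside the sink; the function, its error messages, and the `demo_db`/`demo_table` names are illustrative only:

```rust
use anyhow::Context;
use icelake::{Table, TableIdentifier};
use risingwave_connector::sink::iceberg::IcebergConfig;

// Hypothetical caller: reuse the shared catalog factory to open a table.
async fn open_iceberg_table(config: &IcebergConfig) -> anyhow::Result<Table> {
    let catalog = config
        .create_catalog()
        .await
        .context("unable to load iceberg catalog")?;
    let ident = TableIdentifier::new(vec!["demo_db".to_string(), "demo_table".to_string()])
        .context("invalid table identifier")?;
    catalog
        .load_table(&ident)
        .await
        .context("unable to load iceberg table")
}
```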