support iceberg scan executor
chenzl25 committed Feb 1, 2024
1 parent 6acb999 commit 7888b8f
Showing 7 changed files with 230 additions and 33 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

3 changes: 3 additions & 0 deletions src/batch/Cargo.toml
@@ -15,6 +15,8 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
@@ -24,6 +26,7 @@ futures-async-stream = { workspace = true }
futures-util = "0.3"
hashbrown = { workspace = true }
hytra = "0.1.2"
icelake = { workspace = true }
itertools = "0.12"
memcomparable = "0.2"
parking_lot = { version = "0.12", features = ["arc_lock"] }
7 changes: 7 additions & 0 deletions src/batch/src/error.rs
@@ -109,6 +109,13 @@ pub enum BatchError {
        DmlError,
    ),

    #[error(transparent)]
    Iceberg(
        #[from]
        #[backtrace]
        icelake::Error,
    ),

    // Make the ref-counted type to be a variant for easier code structuring.
    // TODO(error-handling): replace with `thiserror_ext::Arc`
    #[error(transparent)]
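The new variant relies on `thiserror`'s `#[error(transparent)]` to forward the message and source of the underlying `icelake::Error`, while `#[from]` generates the `From` impl that lets the executor propagate icelake failures with `?`. A minimal standalone sketch of that mechanism, with a hypothetical stand-in type in place of `icelake::Error` and the nightly-only `#[backtrace]` omitted:

```rust
use thiserror::Error;

// Stand-in for `icelake::Error` so the sketch is self-contained.
#[derive(Error, Debug)]
#[error("iceberg error: {0}")]
struct IcelakeError(String);

// Same shape as the new `BatchError::Iceberg` variant.
#[derive(Error, Debug)]
enum BatchError {
    #[error(transparent)]
    Iceberg(#[from] IcelakeError),
}

// Hypothetical fallible operation standing in for a catalog/table call.
fn load_table() -> Result<(), IcelakeError> {
    Err(IcelakeError("snapshot not found".into()))
}

// `?` converts the icelake-style error into `BatchError` through the
// `From` impl generated by `#[from]`.
fn scan() -> Result<(), BatchError> {
    load_table()?;
    Ok(())
}

fn main() {
    println!("{:?}", scan());
}
```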
185 changes: 185 additions & 0 deletions src/batch/src/executor/iceberg_scan.rs
@@ -0,0 +1,185 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;

use anyhow::anyhow;
use arrow_array::RecordBatch;
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use icelake::io::{FileScan, TableScan};
use icelake::TableIdentifier;
use risingwave_common::catalog::Schema;
use risingwave_connector::sink::iceberg::IcebergConfig;

use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};

/// An executor that scans data from an Iceberg table.
///
/// # Examples
///
/// (This example needs a live Iceberg catalog, so it is not compiled as a
/// doctest.)
///
/// ```ignore
/// use futures_async_stream::for_await;
/// use risingwave_common::catalog::{Field, Schema};
/// use risingwave_common::types::DataType;
/// use risingwave_connector::sink::iceberg::IcebergConfig;
///
/// use crate::executor::iceberg_scan::{FileSelector, IcebergScanExecutor};
/// use crate::executor::Executor;
///
/// #[tokio::test]
/// async fn test_iceberg_scan() {
///     let iceberg_scan_executor = IcebergScanExecutor {
///         database_name: "demo_db".into(),
///         table_name: "demo_table".into(),
///         file_selector: FileSelector::select_all(),
///         iceberg_config: IcebergConfig {
///             database_name: "demo_db".into(),
///             table_name: "demo_table".into(),
///             catalog_type: Some("storage".into()),
///             path: "s3a://hummock001/".into(),
///             endpoint: Some("http://127.0.0.1:9301".into()),
///             access_key: "hummockadmin".into(),
///             secret_key: "hummockadmin".into(),
///             region: Some("us-east-1".into()),
///             ..Default::default()
///         },
///         snapshot_id: None,
///         batch_size: 1024,
///         schema: Schema::new(vec![
///             Field::with_name(DataType::Int64, "seq_id"),
///             Field::with_name(DataType::Int64, "user_id"),
///             Field::with_name(DataType::Varchar, "user_name"),
///         ]),
///         identity: "iceberg_scan".into(),
///     };
///
///     let stream = Box::new(iceberg_scan_executor).execute();
///     #[for_await]
///     for chunk in stream {
///         let chunk = chunk.unwrap();
///         println!("{:?}", chunk);
///     }
/// }
/// ```
pub struct IcebergScanExecutor {
    database_name: String,
    table_name: String,
    iceberg_config: IcebergConfig,
    snapshot_id: Option<i64>,
    file_selector: FileSelector,
    batch_size: usize,

    schema: Schema,
    identity: String,
}

impl Executor for IcebergScanExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl IcebergScanExecutor {
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let catalog = self.iceberg_config.create_catalog().await?;

        let table_ident = TableIdentifier::new(vec![self.database_name, self.table_name]).unwrap();
        let table = catalog
            .load_table(&table_ident)
            .await
            .map_err(BatchError::Iceberg)?;

        let table_scan: TableScan = table
            .new_scan_builder()
            .with_snapshot_id(
                self.snapshot_id
                    .unwrap_or_else(|| table.current_table_metadata().current_snapshot_id.unwrap()),
            )
            .with_batch_size(self.batch_size)
            .with_column_names(self.schema.names())
            .build()
            .map_err(|e| BatchError::Internal(anyhow!(e)))?;
        let file_scan_stream: icelake::io::FileScanStream =
            table_scan.scan(&table).await.map_err(BatchError::Iceberg)?;

        #[for_await]
        for file_scan in file_scan_stream {
            let file_scan: FileScan = file_scan.map_err(BatchError::Iceberg)?;
            if !self.file_selector.select(file_scan.path()) {
                continue;
            }
            let record_batch_stream = file_scan.scan().await.map_err(BatchError::Iceberg)?;

            #[for_await]
            for record_batch in record_batch_stream {
                let record_batch: RecordBatch = record_batch.map_err(BatchError::Iceberg)?;
                let chunk = Self::record_batch_to_chunk(record_batch)?;
                debug_assert_eq!(chunk.data_types(), self.schema.data_types());
                yield chunk;
            }
        }
    }

    fn record_batch_to_chunk(record_batch: RecordBatch) -> Result<DataChunk, BatchError> {
        let mut columns = Vec::with_capacity(record_batch.num_columns());
        for array in record_batch.columns() {
            let column = Arc::new(array.try_into()?);
            columns.push(column);
        }
        Ok(DataChunk::new(columns, record_batch.num_rows()))
    }
}

pub enum FileSelector {
    // Scan only the data files whose paths appear in this list.
    FileList(Vec<String>),
    // Scan the data files for which hash(file_path) % num_tasks == task_id,
    // so that parallel tasks partition the files without coordination.
    // Fields: (task_id, num_tasks).
    Hash(usize, usize),
}

impl FileSelector {
    fn select_all() -> Self {
        FileSelector::Hash(0, 1)
    }

    fn select(&self, path: &str) -> bool {
        match self {
            FileSelector::FileList(paths) => paths.contains(&path.to_string()),
            FileSelector::Hash(task_id, num_tasks) => {
                let hash = Self::hash_str_to_usize(path);
                hash % num_tasks == *task_id
            }
        }
    }

    fn hash_str_to_usize(s: &str) -> usize {
        let mut hasher = DefaultHasher::new();
        s.hash(&mut hasher);
        hasher.finish() as usize
    }
}
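The `Hash` selector is what lets several batch tasks share one table scan: each task claims exactly the files whose path hash maps to its `task_id`, so together the tasks cover every data file with no overlap and no coordination. A self-contained sketch of the selection rule (the file paths are made up for illustration):

```rust
use std::collections::hash_map::DefaultHasher; // re-exported as std::hash::DefaultHasher since Rust 1.76
use std::hash::{Hash, Hasher};

// Mirrors `FileSelector::select` for the `Hash` variant:
// task `task_id` scans a file iff hash(path) % num_tasks == task_id.
fn selects(path: &str, task_id: usize, num_tasks: usize) -> bool {
    let mut hasher = DefaultHasher::new();
    path.hash(&mut hasher);
    (hasher.finish() as usize) % num_tasks == task_id
}

fn main() {
    // Hypothetical file paths; in the executor they come from the table scan.
    let files = ["data/a.parquet", "data/b.parquet", "data/c.parquet"];
    let num_tasks = 2;
    for path in files {
        // Exactly one task claims each file, so the union of all tasks
        // covers every file with no overlap.
        let owners: Vec<usize> = (0..num_tasks)
            .filter(|&task_id| selects(path, task_id, num_tasks))
            .collect();
        assert_eq!(owners.len(), 1);
        println!("{path} -> task {}", owners[0]);
    }
}
```

The scheme only needs every task to compute the same hash for the same path; `DefaultHasher::new()` instances hash identically, which in practice keeps the assignment consistent across the tasks of one query running the same binary.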
1 change: 1 addition & 0 deletions src/batch/src/executor/mod.rs
@@ -20,6 +20,7 @@ mod generic_exchange;
mod group_top_n;
mod hash_agg;
mod hop_window;
mod iceberg_scan;
mod insert;
mod join;
mod limit;
5 changes: 1 addition & 4 deletions src/connector/src/sink/iceberg/jni_catalog.rs
@@ -29,8 +29,6 @@ use jni::JavaVM;
use risingwave_jni_core::call_method;
use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM};

use crate::sink::{Result, SinkError};

pub struct JniCatalog {
    java_catalog: GlobalRef,
    jvm: &'static JavaVM,
@@ -144,7 +142,7 @@ impl JniCatalog {
        name: impl ToString,
        catalog_impl: impl ToString,
        java_catalog_props: HashMap<String, String>,
    ) -> Result<CatalogRef> {
    ) -> anyhow::Result<CatalogRef> {
        let jvm = JVM.get_or_init()?;

        execute_with_jni_env(jvm, |env| {
@@ -184,6 +182,5 @@ impl JniCatalog {
                config: base_config,
            }) as CatalogRef)
        })
        .map_err(SinkError::Iceberg)
    }
}
59 changes: 30 additions & 29 deletions src/connector/src/sink/iceberg/mod.rs
@@ -67,7 +67,7 @@ pub const ICEBERG_SINK: &str = "iceberg";

static RW_CATALOG_NAME: &str = "risingwave";

#[derive(Debug, Clone, Deserialize, WithOptions)]
#[derive(Debug, Clone, Deserialize, WithOptions, Default)]
#[serde(deny_unknown_fields)]
pub struct IcebergConfig {
    pub connector: String, // Avoid deny unknown field. Must be "iceberg"
@@ -348,42 +348,43 @@ impl IcebergConfig {

        Ok((base_catalog_config, java_catalog_configs))
    }
}

async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {
    match config.catalog_type() {
        "storage" | "rest" => {
            let iceberg_configs = config.build_iceberg_configs()?;
            let catalog = load_catalog(&iceberg_configs)
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
            Ok(catalog)
        }
        catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => {
            // Create java catalog
            let (base_catalog_config, java_catalog_props) = config.build_jni_catalog_configs()?;
            let catalog_impl = match catalog_type {
                "hive" => "org.apache.iceberg.hive.HiveCatalog",
                "sql" => "org.apache.iceberg.jdbc.JdbcCatalog",
                "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
                "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
                _ => unreachable!(),
            };
    pub async fn create_catalog(&self) -> anyhow::Result<CatalogRef> {
        match self.catalog_type() {
            "storage" | "rest" => {
                let iceberg_configs = self.build_iceberg_configs()?;
                let catalog = load_catalog(&iceberg_configs)
                    .await
                    .map_err(|e| anyhow!(e))?;
                Ok(catalog)
            }
            catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => {
                // Create java catalog
                let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?;
                let catalog_impl = match catalog_type {
                    "hive" => "org.apache.iceberg.hive.HiveCatalog",
                    "sql" => "org.apache.iceberg.jdbc.JdbcCatalog",
                    "glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
                    "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
                    _ => unreachable!(),
                };

            jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props)
        }
        "mock" => Ok(Arc::new(MockCatalog{})),
        _ => {
            Err(SinkError::Iceberg(anyhow!(
                jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props)
            }
            "mock" => Ok(Arc::new(MockCatalog{})),
            _ => {
                Err(anyhow!(
                    "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`",
            config.catalog_type()
        )))
                    self.catalog_type()
                ))
            }
        }
    }
}

pub async fn create_table(config: &IcebergConfig) -> Result<Table> {
    let catalog = create_catalog(config)
    let catalog = config
        .create_catalog()
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;

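Taken together, the sink-side changes move catalog creation from a sink-private free function returning the sink's `Result` to an inherent `IcebergConfig::create_catalog` method returning `anyhow::Result`, which is what lets the new batch executor reuse it; the sink then re-wraps failures in `SinkError` at its own boundary, as `create_table` shows above. A self-contained sketch of that boundary pattern, with stand-in `CatalogRef`/`SinkError`/`IcebergConfig` types and a tokio runtime assumed for `main`:

```rust
use anyhow::anyhow;

// Stand-ins so the sketch compiles on its own.
#[derive(Debug)]
struct CatalogRef;
#[derive(Debug)]
enum SinkError {
    Iceberg(anyhow::Error),
}

#[derive(Default)]
struct IcebergConfig {
    catalog_type: Option<String>,
}

impl IcebergConfig {
    fn catalog_type(&self) -> &str {
        self.catalog_type.as_deref().unwrap_or("storage")
    }

    // Shared entry point: returns `anyhow::Result` so both the batch
    // executor and the sink can call it without sink-specific errors.
    async fn create_catalog(&self) -> anyhow::Result<CatalogRef> {
        match self.catalog_type() {
            "storage" | "rest" => Ok(CatalogRef),
            other => Err(anyhow!("Unsupported catalog type: {}", other)),
        }
    }
}

// The sink converts back to its own error type at the boundary,
// mirroring `create_table` in this diff.
async fn create_table(config: &IcebergConfig) -> Result<CatalogRef, SinkError> {
    config
        .create_catalog()
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))
}

#[tokio::main]
async fn main() {
    let config = IcebergConfig {
        catalog_type: Some("mock".into()),
    };
    println!("{:?}", create_table(&config).await);
}
```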
