Skip to content

Commit

Permalink
feat(batch): support iceberg scan executor (#14915)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Feb 4, 2024
1 parent 90aa90a commit eeed1e9
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 51 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
Expand All @@ -24,6 +26,7 @@ futures-async-stream = { workspace = true }
futures-util = "0.3"
hashbrown = { workspace = true }
hytra = "0.1.2"
icelake = { workspace = true }
itertools = "0.12"
memcomparable = "0.2"
parking_lot = { version = "0.12", features = ["arc_lock"] }
Expand Down
7 changes: 7 additions & 0 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ pub enum BatchError {
DmlError,
),

#[error(transparent)]
Iceberg(
#[from]
#[backtrace]
icelake::Error,
),

// Make the ref-counted type to be a variant for easier code structuring.
// TODO(error-handling): replace with `thiserror_ext::Arc`
#[error(transparent)]
Expand Down
194 changes: 194 additions & 0 deletions src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;

use anyhow::anyhow;
use arrow_array::RecordBatch;
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use icelake::io::{FileScan, TableScan};
use risingwave_common::catalog::Schema;
use risingwave_connector::sink::iceberg::IcebergConfig;

use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};

/// Create a iceberg scan executor.
///
/// # Examples
///
/// ```
/// use futures_async_stream::for_await;
/// use risingwave_batch::executor::{Executor, FileSelector, IcebergScanExecutor};
/// use risingwave_common::catalog::{Field, Schema};
/// use risingwave_common::types::DataType;
/// use risingwave_connector::sink::iceberg::IcebergConfig;
///
/// #[tokio::test]
/// async fn test_iceberg_scan() {
/// let iceberg_scan_executor = IcebergScanExecutor::new(
/// IcebergConfig {
/// database_name: "demo_db".into(),
/// table_name: "demo_table".into(),
/// catalog_type: Some("storage".into()),
/// path: "s3a://hummock001/".into(),
/// endpoint: Some("http://127.0.0.1:9301".into()),
/// access_key: "hummockadmin".into(),
/// secret_key: "hummockadmin".into(),
/// region: Some("us-east-1".into()),
/// ..Default::default()
/// },
/// None,
/// FileSelector::select_all(),
/// 1024,
/// Schema::new(vec![
/// Field::with_name(DataType::Int64, "seq_id"),
/// Field::with_name(DataType::Int64, "user_id"),
/// Field::with_name(DataType::Varchar, "user_name"),
/// ]),
/// "iceberg_scan".into(),
/// );
///
/// let stream = Box::new(iceberg_scan_executor).execute();
/// #[for_await]
/// for chunk in stream {
/// let chunk = chunk.unwrap();
/// println!("{:?}", chunk);
/// }
/// }
/// ```
pub struct IcebergScanExecutor {
iceberg_config: IcebergConfig,
snapshot_id: Option<i64>,
file_selector: FileSelector,
batch_size: usize,

schema: Schema,
identity: String,
}

impl Executor for IcebergScanExecutor {
fn schema(&self) -> &risingwave_common::catalog::Schema {
&self.schema
}

fn identity(&self) -> &str {
&self.identity
}

fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
self.do_execute().boxed()
}
}

impl IcebergScanExecutor {
pub fn new(
iceberg_config: IcebergConfig,
snapshot_id: Option<i64>,
file_selector: FileSelector,
batch_size: usize,
schema: Schema,
identity: String,
) -> Self {
Self {
iceberg_config,
snapshot_id,
file_selector,
batch_size,
schema,
identity,
}
}

#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
let table = self
.iceberg_config
.load_table()
.await
.map_err(BatchError::Internal)?;

let table_scan: TableScan = table
.new_scan_builder()
.with_snapshot_id(
self.snapshot_id
.unwrap_or_else(|| table.current_table_metadata().current_snapshot_id.unwrap()),
)
.with_batch_size(self.batch_size)
.with_column_names(self.schema.names())
.build()
.map_err(|e| BatchError::Internal(anyhow!(e)))?;
let file_scan_stream: icelake::io::FileScanStream =
table_scan.scan(&table).await.map_err(BatchError::Iceberg)?;

#[for_await]
for file_scan in file_scan_stream {
let file_scan: FileScan = file_scan.map_err(BatchError::Iceberg)?;
if !self.file_selector.select(file_scan.path()) {
continue;
}
let record_batch_stream = file_scan.scan().await.map_err(BatchError::Iceberg)?;

#[for_await]
for record_batch in record_batch_stream {
let record_batch: RecordBatch = record_batch.map_err(BatchError::Iceberg)?;
let chunk = Self::record_batch_to_chunk(record_batch)?;
debug_assert_eq!(chunk.data_types(), self.schema.data_types());
yield chunk;
}
}
}

fn record_batch_to_chunk(record_batch: RecordBatch) -> Result<DataChunk, BatchError> {
let mut columns = Vec::with_capacity(record_batch.num_columns());
for array in record_batch.columns() {
let column = Arc::new(array.try_into()?);
columns.push(column);
}
Ok(DataChunk::new(columns, record_batch.num_rows()))
}
}

pub enum FileSelector {
// File paths to be scanned by this executor are specified.
FileList(Vec<String>),
// Data files to be scanned by this executor could be calculated by Hash(file_path) % num_tasks == task_id.
// task_id, num_tasks
Hash(usize, usize),
}

impl FileSelector {
pub fn select_all() -> Self {
FileSelector::Hash(0, 1)
}

pub fn select(&self, path: &str) -> bool {
match self {
FileSelector::FileList(paths) => paths.contains(&path.to_string()),
FileSelector::Hash(task_id, num_tasks) => {
let hash = Self::hash_str_to_usize(path);
hash % num_tasks == *task_id
}
}
}

pub fn hash_str_to_usize(s: &str) -> usize {
let mut hasher = DefaultHasher::new();
s.hash(&mut hasher);
hasher.finish() as usize
}
}
2 changes: 2 additions & 0 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod generic_exchange;
mod group_top_n;
mod hash_agg;
mod hop_window;
mod iceberg_scan;
mod insert;
mod join;
mod limit;
Expand Down Expand Up @@ -52,6 +53,7 @@ pub use generic_exchange::*;
pub use group_top_n::*;
pub use hash_agg::*;
pub use hop_window::*;
pub use iceberg_scan::*;
pub use insert::*;
pub use join::*;
pub use limit::*;
Expand Down
5 changes: 1 addition & 4 deletions src/connector/src/sink/iceberg/jni_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ use jni::JavaVM;
use risingwave_jni_core::call_method;
use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM};

use crate::sink::{Result, SinkError};

pub struct JniCatalog {
java_catalog: GlobalRef,
jvm: &'static JavaVM,
Expand Down Expand Up @@ -144,7 +142,7 @@ impl JniCatalog {
name: impl ToString,
catalog_impl: impl ToString,
java_catalog_props: HashMap<String, String>,
) -> Result<CatalogRef> {
) -> anyhow::Result<CatalogRef> {
let jvm = JVM.get_or_init()?;

execute_with_jni_env(jvm, |env| {
Expand Down Expand Up @@ -184,6 +182,5 @@ impl JniCatalog {
config: base_config,
}) as CatalogRef)
})
.map_err(SinkError::Iceberg)
}
}
Loading

0 comments on commit eeed1e9

Please sign in to comment.