feat(batch): support iceberg scan executor #14915

Merged · 6 commits · Feb 4, 2024
3 changes: 3 additions & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

3 changes: 3 additions & 0 deletions src/batch/Cargo.toml
@@ -15,6 +15,8 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
@@ -24,6 +26,7 @@ futures-async-stream = { workspace = true }
futures-util = "0.3"
hashbrown = { workspace = true }
hytra = "0.1.2"
icelake = { workspace = true }
itertools = "0.12"
memcomparable = "0.2"
parking_lot = { version = "0.12", features = ["arc_lock"] }
7 changes: 7 additions & 0 deletions src/batch/src/error.rs
@@ -109,6 +109,13 @@ pub enum BatchError {
        DmlError,
    ),

    #[error(transparent)]
    Iceberg(
        #[from]
        #[backtrace]
        icelake::Error,
    ),

    // Make the ref-counted type to be a variant for easier code structuring.
    // TODO(error-handling): replace with `thiserror_ext::Arc`
    #[error(transparent)]
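
The new variant follows thiserror's `#[from]` + `#[error(transparent)]` pattern, which is what lets the executor below apply `?` directly to `icelake` results. A minimal standalone sketch of the same pattern (hypothetical error and function names; `thiserror` 1.x assumed):

```rust
use thiserror::Error;

#[derive(Debug, Error)]
enum MyError {
    // `transparent` forwards `Display` and `source()` to the wrapped error;
    // `#[from]` derives `From<std::io::Error> for MyError`.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

// `?` converts the `std::io::Error` into `MyError::Io` via the derived `From`.
fn read_config(path: &str) -> Result<String, MyError> {
    Ok(std::fs::read_to_string(path)?)
}
```
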
194 changes: 194 additions & 0 deletions src/batch/src/executor/iceberg_scan.rs
@@ -0,0 +1,194 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;

use anyhow::anyhow;
use arrow_array::RecordBatch;
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use icelake::io::{FileScan, TableScan};
use risingwave_common::catalog::Schema;
use risingwave_connector::sink::iceberg::IcebergConfig;

use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};

/// Creates an Iceberg scan executor.
///
/// # Examples
///
/// ```
/// use futures_async_stream::for_await;
/// use risingwave_batch::executor::{Executor, FileSelector, IcebergScanExecutor};
/// use risingwave_common::catalog::{Field, Schema};
/// use risingwave_common::types::DataType;
/// use risingwave_connector::sink::iceberg::IcebergConfig;
///
/// #[tokio::test]
/// async fn test_iceberg_scan() {
///     let iceberg_scan_executor = IcebergScanExecutor::new(
///         IcebergConfig {
///             database_name: "demo_db".into(),
///             table_name: "demo_table".into(),
///             catalog_type: Some("storage".into()),
///             path: "s3a://hummock001/".into(),
///             endpoint: Some("http://127.0.0.1:9301".into()),
///             access_key: "hummockadmin".into(),
///             secret_key: "hummockadmin".into(),
///             region: Some("us-east-1".into()),
///             ..Default::default()
///         },
///         None,
///         FileSelector::select_all(),
///         1024,
///         Schema::new(vec![
///             Field::with_name(DataType::Int64, "seq_id"),
///             Field::with_name(DataType::Int64, "user_id"),
///             Field::with_name(DataType::Varchar, "user_name"),
///         ]),
///         "iceberg_scan".into(),
///     );
///
///     let stream = Box::new(iceberg_scan_executor).execute();
///     #[for_await]
///     for chunk in stream {
///         let chunk = chunk.unwrap();
///         println!("{:?}", chunk);
///     }
/// }
/// ```

pub struct IcebergScanExecutor {
    iceberg_config: IcebergConfig,
    snapshot_id: Option<i64>,
    file_selector: FileSelector,
    batch_size: usize,

    schema: Schema,
    identity: String,
}

impl Executor for IcebergScanExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl IcebergScanExecutor {
    pub fn new(
        iceberg_config: IcebergConfig,
        snapshot_id: Option<i64>,
        file_selector: FileSelector,
        batch_size: usize,
        schema: Schema,
        identity: String,
    ) -> Self {
        Self {
            iceberg_config,
            snapshot_id,
            file_selector,
            batch_size,
            schema,
            identity,
        }
    }

    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let table = self
            .iceberg_config
            .load_table()
            .await
            .map_err(BatchError::Internal)?;

        // Scan the requested snapshot, or fall back to the table's current
        // snapshot, reading only the columns named in the executor's schema.
        let table_scan: TableScan = table
            .new_scan_builder()
            .with_snapshot_id(
                self.snapshot_id
                    .unwrap_or_else(|| table.current_table_metadata().current_snapshot_id.unwrap()),
            )
            .with_batch_size(self.batch_size)
            .with_column_names(self.schema.names())
            .build()
            .map_err(|e| BatchError::Internal(anyhow!(e)))?;
        let file_scan_stream: icelake::io::FileScanStream =
            table_scan.scan(&table).await.map_err(BatchError::Iceberg)?;

        #[for_await]
        for file_scan in file_scan_stream {
            let file_scan: FileScan = file_scan.map_err(BatchError::Iceberg)?;
            // Skip data files assigned to other scan tasks.
            if !self.file_selector.select(file_scan.path()) {
                continue;
            }
            let record_batch_stream = file_scan.scan().await.map_err(BatchError::Iceberg)?;

            #[for_await]
            for record_batch in record_batch_stream {
                let record_batch: RecordBatch = record_batch.map_err(BatchError::Iceberg)?;
                let chunk = Self::record_batch_to_chunk(record_batch)?;
                debug_assert_eq!(chunk.data_types(), self.schema.data_types());
                yield chunk;
            }
        }
    }

    fn record_batch_to_chunk(record_batch: RecordBatch) -> Result<DataChunk, BatchError> {
        // Each Arrow column is converted into a RisingWave column via `TryFrom`.
        let mut columns = Vec::with_capacity(record_batch.num_columns());
        for array in record_batch.columns() {
            let column = Arc::new(array.try_into()?);
            columns.push(column);
        }
        Ok(DataChunk::new(columns, record_batch.num_rows()))
    }
}
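
For reference, this is the shape of input `record_batch_to_chunk` consumes. A self-contained `arrow-array` sketch that builds a small batch (column names borrowed from the doc test above):

```rust
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // One Arrow column per field; all columns must share the same row count.
    let schema = Arc::new(Schema::new(vec![
        Field::new("seq_id", DataType::Int64, false),
        Field::new("user_id", DataType::Int64, false),
        Field::new("user_name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![1, 2])),
            Arc::new(Int64Array::from(vec![10, 20])),
            Arc::new(StringArray::from(vec!["alice", "bob"])),
        ],
    )
    .unwrap();
    assert_eq!((batch.num_columns(), batch.num_rows()), (3, 2));
}
```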

pub enum FileSelector {
    /// Scan exactly the listed file paths.
    FileList(Vec<String>),
    /// Scan the data files assigned to this task, i.e. those satisfying
    /// `hash(file_path) % num_tasks == task_id`. Fields are `(task_id, num_tasks)`.
    Hash(usize, usize),
}

impl FileSelector {
    pub fn select_all() -> Self {
        FileSelector::Hash(0, 1)
    }

    pub fn select(&self, path: &str) -> bool {
        match self {
            FileSelector::FileList(paths) => paths.contains(&path.to_string()),
            FileSelector::Hash(task_id, num_tasks) => {
                let hash = Self::hash_str_to_usize(path);
                hash % num_tasks == *task_id
            }
        }
    }

    pub fn hash_str_to_usize(s: &str) -> usize {
        let mut hasher = DefaultHasher::new();
        s.hash(&mut hasher);
        hasher.finish() as usize
    }
}
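
To make the `Hash` assignment concrete: each task keeps only the files whose path hashes to its `task_id`, so distinct tasks scan disjoint files and together cover them all. A standalone sketch of the same scheme (file paths here are made up):

```rust
use std::hash::{DefaultHasher, Hash, Hasher};

// Same hashing as `FileSelector::hash_str_to_usize`.
fn hash_str_to_usize(s: &str) -> usize {
    let mut hasher = DefaultHasher::new();
    s.hash(&mut hasher);
    hasher.finish() as usize
}

fn main() {
    let files = [
        "s3a://hummock001/demo_db/demo_table/data/a.parquet",
        "s3a://hummock001/demo_db/demo_table/data/b.parquet",
        "s3a://hummock001/demo_db/demo_table/data/c.parquet",
    ];
    let num_tasks = 2;
    for task_id in 0..num_tasks {
        // Mirrors `FileSelector::Hash(task_id, num_tasks).select(path)`.
        let assigned: Vec<_> = files
            .iter()
            .filter(|path| hash_str_to_usize(path) % num_tasks == task_id)
            .collect();
        println!("task {task_id}: {assigned:?}");
    }
    // `select_all()` is `Hash(0, 1)`: any hash % 1 == 0, so one task scans every file.
}
```
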
2 changes: 2 additions & 0 deletions src/batch/src/executor/mod.rs
@@ -20,6 +20,7 @@ mod generic_exchange;
mod group_top_n;
mod hash_agg;
mod hop_window;
mod iceberg_scan;
mod insert;
mod join;
mod limit;
@@ -52,6 +53,7 @@ pub use generic_exchange::*;
pub use group_top_n::*;
pub use hash_agg::*;
pub use hop_window::*;
pub use iceberg_scan::*;
pub use insert::*;
pub use join::*;
pub use limit::*;
5 changes: 1 addition & 4 deletions src/connector/src/sink/iceberg/jni_catalog.rs
@@ -29,8 +29,6 @@ use jni::JavaVM;
 use risingwave_jni_core::call_method;
 use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM};
 
-use crate::sink::{Result, SinkError};
-
 pub struct JniCatalog {
     java_catalog: GlobalRef,
     jvm: &'static JavaVM,
@@ -144,7 +142,7 @@ impl JniCatalog {
         name: impl ToString,
         catalog_impl: impl ToString,
         java_catalog_props: HashMap<String, String>,
-    ) -> Result<CatalogRef> {
+    ) -> anyhow::Result<CatalogRef> {
         let jvm = JVM.get_or_init()?;
 
         execute_with_jni_env(jvm, |env| {
@@ -184,6 +182,5 @@ impl JniCatalog {
                 config: base_config,
             }) as CatalogRef)
         })
-        .map_err(SinkError::Iceberg)
     }
 }
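
The `jni_catalog.rs` change is an error-boundary refactor: the constructor now returns `anyhow::Result`, so callers outside the sink (such as the new batch executor) no longer depend on `SinkError`; sink-side callers convert at their own boundary instead. A hedged sketch of the idea (names here are illustrative, not the real call sites):

```rust
use anyhow::{anyhow, Result};

// Illustrative stand-in for the sink crate's error type.
#[derive(Debug)]
enum SinkLikeError {
    Iceberg(anyhow::Error),
}

// After the refactor: the shared helper reports plain `anyhow::Result`.
fn build_catalog(configured: bool) -> Result<String> {
    if configured {
        Ok("jni-catalog".to_string())
    } else {
        Err(anyhow!("failed to initialize JVM"))
    }
}

// Sink-side callers wrap the error at their boundary, as the deleted
// `.map_err(SinkError::Iceberg)` used to do inside the helper.
fn build_for_sink() -> std::result::Result<String, SinkLikeError> {
    build_catalog(true).map_err(SinkLikeError::Iceberg)
}

fn main() {
    assert!(build_for_sink().is_ok());
}
```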