feat(batch): support iceberg scan executor #14915

Merged: 6 commits, Feb 4, 2024
3 changes: 3 additions & 0 deletions Cargo.lock


3 changes: 3 additions & 0 deletions src/batch/Cargo.toml
@@ -15,6 +15,8 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
@@ -24,6 +26,7 @@ futures-async-stream = { workspace = true }
futures-util = "0.3"
hashbrown = { workspace = true }
hytra = "0.1.2"
icelake = { workspace = true }
itertools = "0.12"
memcomparable = "0.2"
parking_lot = { version = "0.12", features = ["arc_lock"] }
7 changes: 7 additions & 0 deletions src/batch/src/error.rs
@@ -109,6 +109,13 @@ pub enum BatchError {
DmlError,
),

#[error(transparent)]
Iceberg(
#[from]
#[backtrace]
icelake::Error,
),

// Make the ref-counted type to be a variant for easier code structuring.
// TODO(error-handling): replace with `thiserror_ext::Arc`
#[error(transparent)]
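The new `Iceberg` variant above uses `#[from]` and `#[backtrace]`, so any `Result<_, icelake::Error>` inside batch code can be propagated with `?` and is converted into `BatchError::Iceberg` automatically. A minimal sketch of that conversion, not part of the diff; the `pick_snapshot` helper is hypothetical:

```rust
// Hypothetical helper: `#[from]` generates `impl From<icelake::Error> for BatchError`,
// so the `?` below converts an icelake error into `BatchError::Iceberg`.
fn pick_snapshot(res: Result<i64, icelake::Error>) -> Result<i64, BatchError> {
    let snapshot_id = res?; // icelake::Error -> BatchError::Iceberg
    Ok(snapshot_id)
}
```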
185 changes: 185 additions & 0 deletions src/batch/src/executor/iceberg_scan.rs
@@ -0,0 +1,185 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;

use anyhow::anyhow;
use arrow_array::RecordBatch;
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use icelake::io::{FileScan, TableScan};
use icelake::TableIdentifier;
use risingwave_common::catalog::Schema;
use risingwave_connector::sink::iceberg::IcebergConfig;

use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};

/// Create an iceberg scan executor.
///
/// # Examples
///
/// ```
/// use futures_async_stream::for_await;
/// use risingwave_common::catalog::{Field, Schema};
/// use risingwave_common::types::DataType;
/// use risingwave_connector::sink::iceberg::IcebergConfig;
///
/// use crate::executor::iceberg_scan::{FileSelector, IcebergScanExecutor};
/// use crate::executor::Executor;
///
/// #[tokio::test]
/// async fn test_iceberg_scan() {
/// let iceberg_scan_executor = IcebergScanExecutor {
/// database_name: "demo_db".into(),
/// table_name: "demo_table".into(),
/// file_selector: FileSelector::select_all(),
/// iceberg_config: IcebergConfig {
/// database_name: "demo_db".into(),
/// table_name: "demo_table".into(),
/// catalog_type: Some("storage".into()),
/// path: "s3a://hummock001/".into(),
/// endpoint: Some("http://127.0.0.1:9301".into()),
/// access_key: "hummockadmin".into(),
/// secret_key: "hummockadmin".into(),
/// region: Some("us-east-1".into()),
/// ..Default::default()
/// },
/// snapshot_id: None,
/// batch_size: 1024,
/// schema: Schema::new(vec![
/// Field::with_name(DataType::Int64, "seq_id"),
/// Field::with_name(DataType::Int64, "user_id"),
/// Field::with_name(DataType::Varchar, "user_name"),
/// ]),
/// identity: "iceberg_scan".into(),
/// };
///
/// let stream = Box::new(iceberg_scan_executor).execute();
/// #[for_await]
/// for chunk in stream {
/// let chunk = chunk.unwrap();
/// println!("{:?}", chunk);
/// }
/// }
/// ```

pub struct IcebergScanExecutor {
database_name: String,
table_name: String,
iceberg_config: IcebergConfig,
snapshot_id: Option<i64>,
file_selector: FileSelector,
batch_size: usize,

schema: Schema,
identity: String,
}

impl Executor for IcebergScanExecutor {
fn schema(&self) -> &risingwave_common::catalog::Schema {
&self.schema
}

fn identity(&self) -> &str {
&self.identity
}

fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
self.do_execute().boxed()
}
}

impl IcebergScanExecutor {
#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
let catalog = self.iceberg_config.create_catalog().await?;

let table_ident = TableIdentifier::new(vec![self.database_name, self.table_name]).unwrap();
let table = catalog
.load_table(&table_ident)
.await
.map_err(BatchError::Iceberg)?;

let table_scan: TableScan = table
.new_scan_builder()
.with_snapshot_id(
self.snapshot_id
.unwrap_or_else(|| table.current_table_metadata().current_snapshot_id.unwrap()),
)
.with_batch_size(self.batch_size)
.with_column_names(self.schema.names())
.build()
.map_err(|e| BatchError::Internal(anyhow!(e)))?;
let file_scan_stream: icelake::io::FileScanStream =
table_scan.scan(&table).await.map_err(BatchError::Iceberg)?;
Contributor: Usually we do this in the planning phase, so that we only need to read the metadata once, and the scan executor only needs to contain the file names.

Contributor: Wondering if we can do the same by just setting snapshot_id?

Contributor (author): However, icelake currently doesn't provide a way to read files directly. If we have a better API later, we can avoid such planning in the future.

Contributor (replying to "Wondering if we can do the same by just setting snapshot_id?"): Yes, in fact we should set snapshot_id, otherwise different tasks may see different snapshots.

Contributor: cc @ZENOTME Could you help add this API in icelake?

Contributor: I would suggest pending this to wait for icelake's API support, WDYT? cc @chenzl25

Contributor (author): I intend to move fast and present a demo version. Any optimization could be done in parallel as well. 😄

Contributor: Cool, let's move.

Contributor (replying to "cc @ZENOTME Could you help add this API in icelake?"): Sorry for replying late. Yes, and I agree to move it now. For the new API, let's track it in icelake as a new feature.
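As an illustration of the planning-phase idea discussed in this thread (not part of this PR): a planner could resolve the snapshot id and the data-file list once, then hand each scan task only its file names. The `ScanTaskPlan` type and `plan_iceberg_scan` helper below are assumptions made for the sketch, not icelake or RisingWave APIs.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical per-task plan: the snapshot is pinned once by the planner,
/// so every scan task reads a consistent view of the table.
struct ScanTaskPlan {
    snapshot_id: i64,
    data_file_paths: Vec<String>,
}

/// Hypothetical planning step: read the table metadata once, then assign each
/// data file to exactly one task by hashing its path.
fn plan_iceberg_scan(
    snapshot_id: i64,
    data_files: Vec<String>,
    num_tasks: usize,
) -> Vec<ScanTaskPlan> {
    let mut plans: Vec<ScanTaskPlan> = (0..num_tasks)
        .map(|_| ScanTaskPlan {
            snapshot_id,
            data_file_paths: Vec::new(),
        })
        .collect();
    for path in data_files {
        let mut hasher = DefaultHasher::new();
        path.hash(&mut hasher);
        let task_id = hasher.finish() as usize % num_tasks;
        plans[task_id].data_file_paths.push(path);
    }
    plans
}
```

Each executor would then be constructed with the pinned snapshot_id and a `FileSelector::FileList(plan.data_file_paths)`, so no task re-reads the catalog metadata or drifts onto a newer snapshot.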


#[for_await]
for file_scan in file_scan_stream {
let file_scan: FileScan = file_scan.map_err(BatchError::Iceberg)?;
if !self.file_selector.select(file_scan.path()) {
continue;
}
let record_batch_stream = file_scan.scan().await.map_err(BatchError::Iceberg)?;

#[for_await]
for record_batch in record_batch_stream {
let record_batch: RecordBatch = record_batch.map_err(BatchError::Iceberg)?;
let chunk = Self::record_batch_to_chunk(record_batch)?;
debug_assert_eq!(chunk.data_types(), self.schema.data_types());
yield chunk;
}
}
}

fn record_batch_to_chunk(record_batch: RecordBatch) -> Result<DataChunk, BatchError> {
let mut columns = Vec::with_capacity(record_batch.num_columns());
for array in record_batch.columns() {
let column = Arc::new(array.try_into()?);
columns.push(column);
}
Ok(DataChunk::new(columns, record_batch.num_rows()))
}
}

pub enum FileSelector {
// File paths to be scanned by this executor are specified.
FileList(Vec<String>),
// Data files to be scanned by this executor could be calculated by Hash(file_path) % num_tasks == task_id.
// task_id, num_tasks
Hash(usize, usize),
}

impl FileSelector {
fn select_all() -> Self {
FileSelector::Hash(0, 1)
}

fn select(&self, path: &str) -> bool {
match self {
FileSelector::FileList(paths) => paths.contains(&path.to_string()),
FileSelector::Hash(task_id, num_tasks) => {
let hash = Self::hash_str_to_usize(path);
hash % num_tasks == *task_id
}
}
}

fn hash_str_to_usize(s: &str) -> usize {
let mut hasher = DefaultHasher::new();
s.hash(&mut hasher);
hasher.finish() as usize
}
}
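A small illustration of the hash-based `FileSelector` above (not part of this PR): with `num_tasks = 2`, each data-file path is claimed by exactly one task, so parallel scan executors split the files without coordination. The file paths and the `demo_file_partitioning` helper are made up for the example, and `select` is shown as if it were callable from the example's module.

```rust
fn demo_file_partitioning() {
    // Illustrative only: two parallel scan tasks partitioning the same file list.
    let files = [
        "s3a://bucket/db/table/data/part-0.parquet",
        "s3a://bucket/db/table/data/part-1.parquet",
        "s3a://bucket/db/table/data/part-2.parquet",
    ];

    let task0 = FileSelector::Hash(0, 2); // task_id = 0, num_tasks = 2
    let task1 = FileSelector::Hash(1, 2); // task_id = 1, num_tasks = 2

    for path in files {
        // Every path is claimed by exactly one of the two tasks.
        assert!(task0.select(path) ^ task1.select(path));
    }
}
```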
1 change: 1 addition & 0 deletions src/batch/src/executor/mod.rs
@@ -20,6 +20,7 @@ mod generic_exchange;
mod group_top_n;
mod hash_agg;
mod hop_window;
mod iceberg_scan;
mod insert;
mod join;
mod limit;
5 changes: 1 addition & 4 deletions src/connector/src/sink/iceberg/jni_catalog.rs
@@ -29,8 +29,6 @@ use jni::JavaVM;
use risingwave_jni_core::call_method;
use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM};

use crate::sink::{Result, SinkError};

pub struct JniCatalog {
java_catalog: GlobalRef,
jvm: &'static JavaVM,
@@ -144,7 +142,7 @@ impl JniCatalog {
name: impl ToString,
catalog_impl: impl ToString,
java_catalog_props: HashMap<String, String>,
) -> Result<CatalogRef> {
) -> anyhow::Result<CatalogRef> {
let jvm = JVM.get_or_init()?;

execute_with_jni_env(jvm, |env| {
@@ -184,6 +182,5 @@ impl JniCatalog {
config: base_config,
}) as CatalogRef)
})
.map_err(SinkError::Iceberg)
}
}
59 changes: 30 additions & 29 deletions src/connector/src/sink/iceberg/mod.rs
@@ -67,7 +67,7 @@ pub const ICEBERG_SINK: &str = "iceberg";

static RW_CATALOG_NAME: &str = "risingwave";

#[derive(Debug, Clone, Deserialize, WithOptions)]
#[derive(Debug, Clone, Deserialize, WithOptions, Default)]
#[serde(deny_unknown_fields)]
pub struct IcebergConfig {
pub connector: String, // Avoid deny unknown field. Must be "iceberg"
@@ -348,42 +348,43 @@ impl IcebergConfig {

Ok((base_catalog_config, java_catalog_configs))
}
}

async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {
match config.catalog_type() {
"storage" | "rest" => {
let iceberg_configs = config.build_iceberg_configs()?;
let catalog = load_catalog(&iceberg_configs)
.await
.map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
Ok(catalog)
}
catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => {
// Create java catalog
let (base_catalog_config, java_catalog_props) = config.build_jni_catalog_configs()?;
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"sql" => "org.apache.iceberg.jdbc.JdbcCatalog",
"glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
"dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
_ => unreachable!(),
};
pub async fn create_catalog(&self) -> anyhow::Result<CatalogRef> {
match self.catalog_type() {
"storage" | "rest" => {
let iceberg_configs = self.build_iceberg_configs()?;
let catalog = load_catalog(&iceberg_configs)
.await
.map_err(|e| anyhow!(e))?;
Ok(catalog)
}
catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => {
// Create java catalog
let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?;
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"sql" => "org.apache.iceberg.jdbc.JdbcCatalog",
"glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
"dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
_ => unreachable!(),
};

jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props)
}
"mock" => Ok(Arc::new(MockCatalog{})),
_ => {
Err(SinkError::Iceberg(anyhow!(
jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props)
}
"mock" => Ok(Arc::new(MockCatalog{})),
_ => {
Err(anyhow!(
"Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`",
config.catalog_type()
)))
self.catalog_type()
))
}
}
}
}

pub async fn create_table(config: &IcebergConfig) -> Result<Table> {
let catalog = create_catalog(config)
let catalog = config
.create_catalog()
.await
.map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;
