feat(iceberg): reduce iceberg catalog fetch rpc number for iceberg scan (#17939)
chenzl25 authored Aug 7, 2024
1 parent 05efd48 commit ea4e8f0
Showing 6 changed files with 121 additions and 149 deletions.
144 changes: 27 additions & 117 deletions src/batch/src/executor/iceberg_scan.rs
@@ -12,70 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::mem;

use anyhow::anyhow;
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::Schema;
use risingwave_connector::sink::iceberg::IcebergConfig;

use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};

/// Create a iceberg scan executor.
///
/// # Examples
///
/// ```
/// use futures_async_stream::for_await;
/// use risingwave_batch::executor::{Executor, FileSelector, IcebergScanExecutor};
/// use risingwave_common::catalog::{Field, Schema};
/// use risingwave_common::types::DataType;
/// use risingwave_connector::sink::iceberg::IcebergConfig;
///
/// #[tokio::test]
/// async fn test_iceberg_scan() {
/// let iceberg_scan_executor = IcebergScanExecutor::new(
/// IcebergConfig {
/// database_name: Some("demo_db".into()),
/// table_name: "demo_table".into(),
/// catalog_type: Some("storage".into()),
/// path: "s3://hummock001/".into(),
/// endpoint: Some("http://127.0.0.1:9301".into()),
/// access_key: "hummockadmin".into(),
/// secret_key: "hummockadmin".into(),
/// region: Some("us-east-1".into()),
/// ..Default::default()
/// },
/// None,
/// FileSelector::select_all(),
/// 1024,
/// Schema::new(vec![
/// Field::with_name(DataType::Int64, "seq_id"),
/// Field::with_name(DataType::Int64, "user_id"),
/// Field::with_name(DataType::Varchar, "user_name"),
/// ]),
/// "iceberg_scan".into(),
/// );
///
/// let stream = Box::new(iceberg_scan_executor).execute();
/// #[for_await]
/// for chunk in stream {
/// let chunk = chunk.unwrap();
/// println!("{:?}", chunk);
/// }
/// }
/// ```
pub struct IcebergScanExecutor {
iceberg_config: IcebergConfig,
#[allow(dead_code)]
snapshot_id: Option<i64>,
file_selector: FileSelector,
table_meta: TableMetadata,
file_scan_tasks: Vec<FileScanTask>,
batch_size: usize,

schema: Schema,
identity: String,
}
@@ -98,56 +54,41 @@ impl IcebergScanExecutor {
pub fn new(
iceberg_config: IcebergConfig,
snapshot_id: Option<i64>,
file_selector: FileSelector,
table_meta: TableMetadata,
file_scan_tasks: Vec<FileScanTask>,
batch_size: usize,
schema: Schema,
identity: String,
) -> Self {
Self {
iceberg_config,
snapshot_id,
file_selector,
table_meta,
file_scan_tasks,
batch_size,
schema,
identity,
}
}

#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
let table = self.iceberg_config.load_table_v2().await?;

let snapshot_id = if let Some(snapshot_id) = self.snapshot_id {
snapshot_id
} else {
table
.metadata()
.current_snapshot()
.ok_or_else(|| {
BatchError::Internal(anyhow!("No snapshot found for iceberg table"))
})?
.snapshot_id()
async fn do_execute(mut self: Box<Self>) {
let table = self
.iceberg_config
.load_table_v2_with_metadata(self.table_meta)
.await?;
let data_types = self.schema.data_types();

let file_scan_tasks = mem::take(&mut self.file_scan_tasks);

let file_scan_stream = {
#[try_stream]
async move {
for file_scan_task in file_scan_tasks {
yield file_scan_task;
}
}
};
let scan = table
.scan()
.snapshot_id(snapshot_id)
.with_batch_size(Some(self.batch_size))
.select(self.schema.names())
.build()
.map_err(|e| BatchError::Internal(anyhow!(e)))?;

let file_selector = self.file_selector.clone();
let file_scan_stream = scan
.plan_files()
.await
.map_err(BatchError::Iceberg)?
.filter(move |task| {
let res = task
.as_ref()
.map(|task| file_selector.select(task.data_file_path()))
.unwrap_or(true);
future::ready(res)
});

let reader = table
.reader_builder()
@@ -162,39 +103,8 @@ impl IcebergScanExecutor {
for record_batch in record_batch_stream {
let record_batch = record_batch.map_err(BatchError::Iceberg)?;
let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
debug_assert_eq!(chunk.data_types(), self.schema.data_types());
debug_assert_eq!(chunk.data_types(), data_types);
yield chunk;
}
}
}

#[derive(Clone)]
pub enum FileSelector {
// File paths to be scanned by this executor are specified.
FileList(Vec<String>),
// Data files to be scanned by this executor could be calculated by Hash(file_path) % num_tasks == task_id.
// task_id, num_tasks
Hash(usize, usize),
}

impl FileSelector {
pub fn select_all() -> Self {
FileSelector::Hash(0, 1)
}

pub fn select(&self, path: &str) -> bool {
match self {
FileSelector::FileList(paths) => paths.contains(&path.to_string()),
FileSelector::Hash(task_id, num_tasks) => {
let hash = Self::hash_str_to_usize(path);
hash % num_tasks == *task_id
}
}
}

pub fn hash_str_to_usize(s: &str) -> usize {
let mut hasher = DefaultHasher::new();
s.hash(&mut hasher);
hasher.finish() as usize
}
}
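
The doc-comment example removed above is not replaced in this commit. A minimal sketch of how the reworked executor is now constructed, assuming the enumerator has already planned the scan; only the `new` signature comes from the diff, the variable names are hypothetical:

let executor = IcebergScanExecutor::new(
    iceberg_config,            // IcebergConfig for the target table
    Some(snapshot_id),         // snapshot pinned when the split was planned
    table_meta,                // TableMetadata fetched once by the enumerator
    file_scan_tasks,           // Vec<FileScanTask> planned by the enumerator
    1024,                      // batch size
    schema,                    // output Schema
    "iceberg_scan".into(),     // executor identity
);
let stream = Box::new(executor).execute();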
7 changes: 3 additions & 4 deletions src/batch/src/executor/source.rs
@@ -32,9 +32,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{
BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, FileSelector, IcebergScanExecutor,
};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, IcebergScanExecutor};
use crate::task::BatchTaskContext;

pub struct SourceExecutor {
@@ -113,7 +111,8 @@ impl BoxedExecutorBuilder for SourceExecutor {
Ok(Box::new(IcebergScanExecutor::new(
iceberg_properties.to_iceberg_config(),
Some(split.snapshot_id),
FileSelector::FileList(split.files),
split.table_meta.deserialize(),
split.files.into_iter().map(|x| x.deserialize()).collect(),
source.context.get_config().developer.chunk_size,
schema,
source.plan_node().get_identity().clone(),
32 changes: 32 additions & 0 deletions src/connector/src/sink/iceberg/mod.rs
@@ -29,6 +29,7 @@ use arrow_schema_iceberg::{
};
use async_trait::async_trait;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::TableMetadata;
use iceberg::table::Table as TableV2;
use iceberg::{Catalog as CatalogV2, TableIdent};
use icelake::catalog::{
@@ -578,6 +579,37 @@ impl IcebergConfig {

catalog.load_table(&table_id).await.map_err(Into::into)
}

pub async fn load_table_v2_with_metadata(
&self,
metadata: TableMetadata,
) -> ConnectorResult<TableV2> {
match self.catalog_type() {
"storage" => {
let config = StorageCatalogConfig::builder()
.warehouse(self.path.clone())
.access_key(self.access_key.clone())
.secret_key(self.secret_key.clone())
.region(self.region.clone())
.endpoint(self.endpoint.clone())
.build();
let storage_catalog = storage_catalog::StorageCatalog::new(config)?;

let table_id = self
.full_table_name_v2()
.context("Unable to parse table name")?;

Ok(iceberg::table::Table::builder()
.metadata(metadata)
.identifier(table_id)
.file_io(storage_catalog.file_io().clone())
// Only support readonly table for storage catalog now.
.readonly(true)
.build())
}
_ => self.load_table_v2().await,
}
}
}

pub struct IcebergSink {
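
A minimal usage sketch for the new helper, assuming the caller already holds the table's `TableMetadata` (for example from a planned split); this is an illustration, not code from the commit. With a storage catalog it rebuilds a read-only `Table` locally, so no catalog RPC is issued; other catalog types still fall back to `load_table_v2()`:

// `iceberg_config` and `table_meta` are assumed to come from the planned split.
let table = iceberg_config
    .load_table_v2_with_metadata(table_meta)
    .await?;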
4 changes: 4 additions & 0 deletions src/connector/src/sink/iceberg/storage_catalog.rs
@@ -162,6 +162,10 @@ impl StorageCatalog {

Ok(paths)
}

pub fn file_io(&self) -> &FileIO {
&self.file_io
}
}

#[async_trait]
75 changes: 49 additions & 26 deletions src/connector/src/source/iceberg/mod.rs
@@ -18,11 +18,13 @@ use std::collections::HashMap;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::StreamExt;
use iceberg::spec::{DataContentType, ManifestList};
use futures_async_stream::for_await;
use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
use itertools::Itertools;
pub use parquet_file_reader::*;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::JsonbVal;
use serde::{Deserialize, Serialize};

@@ -110,11 +112,38 @@ impl UnknownFields for IcebergProperties {
}
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
pub fn deserialize(&self) -> FileScanTask {
serde_json::from_str(&self.0).unwrap()
}

pub fn serialize(task: &FileScanTask) -> Self {
Self(serde_json::to_string(task).unwrap())
}
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TableMetadataJsonStr(String);

impl TableMetadataJsonStr {
pub fn deserialize(&self) -> TableMetadata {
serde_json::from_str(&self.0).unwrap()
}

pub fn serialize(metadata: &TableMetadata) -> Self {
Self(serde_json::to_string(metadata).unwrap())
}
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergSplit {
pub split_id: i64,
pub snapshot_id: i64,
pub files: Vec<String>,
pub table_meta: TableMetadataJsonStr,
pub files: Vec<IcebergFileScanTaskJsonStr>,
}
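
A sketch of the round trip these wrapper types enable, assumed from the definitions above rather than shown in the diff: the enumerator serializes the planned scan into the split, and each batch task deserializes it locally instead of contacting the catalog again.

// Enumerator side (`table` and `tasks: Vec<FileScanTask>` are assumed inputs):
let split = IcebergSplit {
    split_id: 0,
    snapshot_id,
    table_meta: TableMetadataJsonStr::serialize(table.metadata()),
    files: tasks.iter().map(IcebergFileScanTaskJsonStr::serialize).collect(),
};

// Executor side:
let table_meta: TableMetadata = split.table_meta.deserialize();
let file_scan_tasks: Vec<FileScanTask> = split.files.iter().map(|f| f.deserialize()).collect();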

impl SplitMetaData for IcebergSplit {
@@ -169,6 +198,7 @@ pub enum IcebergTimeTravelInfo {
impl IcebergSplitEnumerator {
pub async fn list_splits_batch(
&self,
schema: Schema,
time_traval_info: Option<IcebergTimeTravelInfo>,
batch_parallelism: usize,
) -> ConnectorResult<Vec<IcebergSplit>> {
@@ -209,31 +239,23 @@ impl IcebergSplitEnumerator {
};
let mut files = vec![];

let snapshot = table
.metadata()
.snapshot_by_id(snapshot_id)
.expect("snapshot must exist");

let manifest_list: ManifestList = snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
let scan = table
.scan()
.snapshot_id(snapshot_id)
.select(schema.names())
.build()
.map_err(|e| anyhow!(e))?;
for entry in manifest_list.entries() {
let manifest = entry
.load_manifest(table.file_io())
.await
.map_err(|e| anyhow!(e))?;
let mut manifest_entries_stream =
futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));

while let Some(manifest_entry) = manifest_entries_stream.next().await {
let file = manifest_entry.data_file();
if file.content_type() != DataContentType::Data {
bail!("Reading iceberg table with delete files is unsupported. Please try to compact the table first.");
}
files.push(file.file_path().to_string());
}

let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

#[for_await]
for task in file_scan_stream {
let task = task.map_err(|e| anyhow!(e))?;
files.push(IcebergFileScanTaskJsonStr::serialize(&task));
}

let table_meta = TableMetadataJsonStr::serialize(table.metadata());

let split_num = batch_parallelism;
// evenly split the files into splits based on the parallelism.
let split_size = files.len() / split_num;
@@ -245,6 +267,7 @@
let split = IcebergSplit {
split_id: i as i64,
snapshot_id,
table_meta: table_meta.clone(),
files: files[start..end].to_vec(),
};
splits.push(split);
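
The hunk above hides how `start` and `end` are computed; a hedged sketch of one way the files could be distributed evenly across `split_num` splits (the remainder handling is an assumption, not taken from the commit):

let split_size = files.len() / split_num;
let remainder = files.len() % split_num;
let mut start = 0;
for i in 0..split_num {
    // the first `remainder` splits take one extra file so every file is assigned
    let end = start + split_size + usize::from(i < remainder);
    // build IcebergSplit { split_id: i as i64, .. } over files[start..end] as shown above
    start = end;
}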