feat(batch): use iceberg rust in iceberg scan (#17545)
chenzl25 authored Jul 3, 2024
1 parent d54faf3 commit 5ce8102
Showing 5 changed files with 79 additions and 37 deletions.
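
At its core, the change swaps icelake's TableScan/FileScan API for iceberg-rust's table scan in the batch executor. A condensed sketch of the new read path, assuming a Table that has already been loaded (e.g. via load_table_v2 in the diff below); the function name, batch-size handling, and row counting are illustrative, not part of the commit:

use futures_util::TryStreamExt;
use iceberg::table::Table;

// Scan the current snapshot of an already-loaded iceberg-rust table and count
// rows, mirroring the executor's snapshot -> scan -> plan_files -> reader pipeline.
async fn count_rows(table: &Table, batch_size: usize) -> iceberg::Result<u64> {
    let snapshot_id = table
        .metadata()
        .current_snapshot()
        .expect("table has no snapshot")
        .snapshot_id();

    let scan = table
        .scan()
        .snapshot_id(snapshot_id)
        .with_batch_size(Some(batch_size))
        .build()?;

    let file_scan_stream = scan.plan_files().await?;
    let reader = table.reader_builder().with_batch_size(batch_size).build();
    let mut record_batches = reader.read(Box::pin(file_scan_stream))?;

    let mut rows = 0u64;
    while let Some(batch) = record_batches.try_next().await? {
        rows += batch.num_rows() as u64;
    }
    Ok(rows)
}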
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/batch/Cargo.toml
@@ -29,7 +29,7 @@ futures-async-stream = { workspace = true }
futures-util = "0.3"
hashbrown = { workspace = true }
hytra = "0.1.2"
-icelake = { workspace = true }
+iceberg = { workspace = true }
itertools = { workspace = true }
memcomparable = "0.2"
opendal = "0.45.1"
2 changes: 1 addition & 1 deletion src/batch/src/error.rs
@@ -116,7 +116,7 @@ pub enum BatchError {
Iceberg(
#[from]
#[backtrace]
-icelake::Error,
+iceberg::Error,
),

// Make the ref-counted type to be a variant for easier code structuring.
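Since the variant keeps #[from], call sites can continue to rely on ? to lift iceberg-rust errors into BatchError. A minimal sketch of that conversion, assuming a thiserror-derived enum as in the surrounding file (the #[backtrace] attribute is omitted and the helper function is illustrative):

use thiserror::Error;

#[derive(Error, Debug)]
pub enum BatchError {
    #[error("Iceberg error: {0}")]
    Iceberg(#[from] iceberg::Error),
}

// `?` uses the `From<iceberg::Error> for BatchError` impl generated by `#[from]`.
fn parse_ident(s: &str) -> Result<iceberg::TableIdent, BatchError> {
    Ok(iceberg::TableIdent::from_strs(s.split('.'))?)
}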
79 changes: 47 additions & 32 deletions src/batch/src/executor/iceberg_scan.rs
@@ -12,13 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future;
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::anyhow;
use arrow_array_iceberg::RecordBatch;
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use icelake::io::{FileScan, TableScan};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::Schema;
use risingwave_connector::sink::iceberg::IcebergConfig;
@@ -116,44 +115,60 @@ impl IcebergScanExecutor {

#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
let table = self.iceberg_config.load_table().await?;

let table_scan: TableScan = table
.new_scan_builder()
.with_snapshot_id(
self.snapshot_id
.unwrap_or_else(|| table.current_table_metadata().current_snapshot_id.unwrap()),
)
.with_batch_size(self.batch_size)
.with_column_names(self.schema.names())
let table = self.iceberg_config.load_table_v2().await?;

let snapshot_id = if let Some(snapshot_id) = self.snapshot_id {
snapshot_id
} else {
table
.metadata()
.current_snapshot()
.ok_or_else(|| {
BatchError::Internal(anyhow!("No snapshot found for iceberg table"))
})?
.snapshot_id()
};
let scan = table
.scan()
.snapshot_id(snapshot_id)
.with_batch_size(Some(self.batch_size))
.select(self.schema.names())
.build()
.map_err(|e| BatchError::Internal(anyhow!(e)))?;
let file_scan_stream: icelake::io::FileScanStream =
table_scan.scan(&table).await.map_err(BatchError::Iceberg)?;

let file_selector = self.file_selector.clone();
let file_scan_stream = scan
.plan_files()
.await
.map_err(BatchError::Iceberg)?
.filter(move |task| {
let res = task
.as_ref()
.map(|task| file_selector.select(task.data_file_path()))
.unwrap_or(true);
future::ready(res)
});

let reader = table
.reader_builder()
.with_batch_size(self.batch_size)
.build();

let record_batch_stream = reader
.read(Box::pin(file_scan_stream))
.map_err(BatchError::Iceberg)?;

#[for_await]
for file_scan in file_scan_stream {
let file_scan: FileScan = file_scan.map_err(BatchError::Iceberg)?;
if !self.file_selector.select(file_scan.path()) {
continue;
}
let record_batch_stream = file_scan.scan().await.map_err(BatchError::Iceberg)?;

#[for_await]
for record_batch in record_batch_stream {
let record_batch: RecordBatch = record_batch.map_err(BatchError::Iceberg)?;
let chunk = Self::record_batch_to_chunk(record_batch)?;
debug_assert_eq!(chunk.data_types(), self.schema.data_types());
yield chunk;
}
for record_batch in record_batch_stream {
let record_batch = record_batch.map_err(BatchError::Iceberg)?;
let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
debug_assert_eq!(chunk.data_types(), self.schema.data_types());
yield chunk;
}
}

fn record_batch_to_chunk(record_batch: RecordBatch) -> Result<DataChunk, BatchError> {
Ok(IcebergArrowConvert.chunk_from_record_batch(&record_batch)?)
}
}

#[derive(Clone)]
pub enum FileSelector {
// File paths to be scanned by this executor are specified.
FileList(Vec<String>),
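The planned file tasks above are filtered through FileSelector::select on data_file_path(). Only the FileList variant is visible in this hunk, while the DefaultHasher import suggests hash-based assignment of files across parallel executors. A hypothetical sketch of such a selector; the Hash variant and its fields are assumptions, not code from this commit:

use std::hash::{DefaultHasher, Hash, Hasher};

// Hypothetical: split data files across parallel scan executors either by an
// explicit file list or by hashing the file path into buckets.
#[derive(Clone)]
pub enum FileSelector {
    // Scan exactly these data file paths.
    FileList(Vec<String>),
    // (executor_index, executor_count): take files whose path hashes into
    // this executor's bucket.
    Hash(usize, usize),
}

impl FileSelector {
    pub fn select(&self, path: &str) -> bool {
        match self {
            FileSelector::FileList(files) => files.iter().any(|f| f == path),
            FileSelector::Hash(index, count) => {
                let mut hasher = DefaultHasher::new();
                path.hash(&mut hasher);
                hasher.finish() as usize % count == *index
            }
        }
    }
}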
31 changes: 29 additions & 2 deletions src/connector/src/sink/iceberg/mod.rs
@@ -28,6 +28,7 @@ use arrow_schema_iceberg::{
DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef,
};
use async_trait::async_trait;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::table::Table as TableV2;
use iceberg::{Catalog as CatalogV2, TableIdent};
use icelake::catalog::{
@@ -307,19 +308,32 @@ impl IcebergConfig {
iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name());

if let Some(region) = &self.region {
// icelake
iceberg_configs.insert(
"iceberg.table.io.region".to_string(),
region.clone().to_string(),
);
// iceberg-rust
iceberg_configs.insert(
("iceberg.table.io.".to_string() + S3_REGION).to_string(),
region.clone().to_string(),
);
}

if let Some(endpoint) = &self.endpoint {
iceberg_configs.insert(
"iceberg.table.io.endpoint".to_string(),
endpoint.clone().to_string(),
);

// iceberg-rust
iceberg_configs.insert(
("iceberg.table.io.".to_string() + S3_ENDPOINT).to_string(),
endpoint.clone().to_string(),
);
}

// icelake
iceberg_configs.insert(
"iceberg.table.io.access_key_id".to_string(),
self.access_key.clone().to_string(),
@@ -329,6 +343,16 @@
self.secret_key.clone().to_string(),
);

// iceberg-rust
iceberg_configs.insert(
("iceberg.table.io.".to_string() + S3_ACCESS_KEY_ID).to_string(),
self.access_key.clone().to_string(),
);
iceberg_configs.insert(
("iceberg.table.io.".to_string() + S3_SECRET_ACCESS_KEY).to_string(),
self.secret_key.clone().to_string(),
);

let (bucket, _) = {
let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
let bucket = url
@@ -498,12 +522,15 @@ impl IcebergConfig {
let catalog = iceberg_catalog_rest::RestCatalog::new(config).await?;
Ok(Arc::new(catalog))
}
catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => {
catalog_type
if catalog_type == "hive" || catalog_type == "jdbc" || catalog_type == "glue" =>
{
// Create java catalog
let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?;
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
"glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
_ => unreachable!(),
};

Expand All @@ -516,7 +543,7 @@ impl IcebergConfig {
}
_ => {
bail!(
"Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`",
"Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`",
self.catalog_type()
)
}
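The new keys are built from iceberg-rust's iceberg::io constants under the iceberg.table.io. prefix, so they map directly onto the table's FileIO configuration. A minimal sketch of building an S3-backed FileIO from the same constants, assuming FileIOBuilder accepts them as plain properties; the endpoint and credentials are placeholders:

use iceberg::io::{
    FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
};

// Placeholder endpoint/credentials; real values would come from IcebergConfig.
fn build_s3_file_io() -> iceberg::Result<iceberg::io::FileIO> {
    FileIOBuilder::new("s3")
        .with_props([
            (S3_REGION, "us-east-1"),
            (S3_ENDPOINT, "http://127.0.0.1:9301"),
            (S3_ACCESS_KEY_ID, "minioadmin"),
            (S3_SECRET_ACCESS_KEY, "minioadmin"),
        ])
        .build()
}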
