Skip to content

Commit

Permalink
refactor(frontend): use iceberg-rust instead of icelake to enumerate …
Browse files Browse the repository at this point in the history
…iceberg files and snapshots. (#17558)

Co-authored-by: Eric Fu <[email protected]>
  • Loading branch information
chenzl25 and fuyufjh authored Jul 5, 2024
1 parent 3c88611 commit 0e64bc0
Show file tree
Hide file tree
Showing 22 changed files with 122 additions and 100 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ CREATE SINK sink1 AS select * from mv1 WITH (
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand All @@ -45,7 +45,7 @@ CREATE SINK sink2 AS select * from mv1 WITH (
table.name = 't2',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/cdc/load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE SINK s1 AS select * from products WITH (
database.name = 'demo_db',
table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE SINK s6 AS select * from mv6 WITH (
table.name = 'no_partition_append_only_table',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ WITH (
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 'no_partition_append_only_table',
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
database.name = 'demo_db',
table.name = 'no_partition_upsert_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE SINK s6 AS select * from mv6 WITH (
table.name = 'partition_append_only_table',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ WITH (
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 'partition_append_only_table',
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
database.name = 'demo_db',
table.name = 'partition_upsert_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE SINK s6 AS select * from mv6 WITH (
table.name = 'range_partition_append_only_table',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ WITH (
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 'range_partition_append_only_table',
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
database.name = 'demo_db',
table.name = 'range_partition_upsert_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
connector = 'iceberg',
type = 'upsert',
primary_key = 'v1',
warehouse.path = 's3://iceberg',
warehouse.path = 's3a://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
Expand All @@ -23,7 +23,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
statement ok
CREATE SOURCE iceberg_demo_source WITH (
connector = 'iceberg',
warehouse.path = 's3://iceberg',
warehouse.path = 's3a://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/iceberg-sink2/docker/hive/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type=append-only
force_append_only = true
catalog.type = hive
catalog.uri = thrift://metastore:9083
warehouse.path = s3://icebergdata/demo
warehouse.path = s3a://icebergdata/demo
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/iceberg-sink2/docker/storage/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.type = storage
catalog.name = demo
warehouse.path = s3://icebergdata/demo
warehouse.path = s3a://icebergdata/demo
database.name=s1
table.name=t1
2 changes: 1 addition & 1 deletion integration_tests/iceberg-source/docker/hive/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ port=4566
connector = iceberg
catalog.type = hive
catalog.uri = thrift://metastore:9083
warehouse.path = s3://icebergdata/demo
warehouse.path = s3a://icebergdata/demo
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/iceberg-source/docker/storage/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.type = storage
warehouse.path = s3://icebergdata/demo
warehouse.path = s3a://icebergdata/demo
database.name=s1
table.name=t1
2 changes: 1 addition & 1 deletion src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::executor::{DataChunk, Executor};
/// database_name: Some("demo_db".into()),
/// table_name: "demo_table".into(),
/// catalog_type: Some("storage".into()),
/// path: "s3a://hummock001/".into(),
/// path: "s3://hummock001/".into(),
/// endpoint: Some("http://127.0.0.1:9301".into()),
/// access_key: "hummockadmin".into(),
/// secret_key: "hummockadmin".into(),
Expand Down
83 changes: 45 additions & 38 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use std::collections::HashMap;

use anyhow::anyhow;
use async_trait::async_trait;
use icelake::types::DataContentType;
use futures::StreamExt;
use iceberg::spec::{DataContentType, ManifestList};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::types::JsonbVal;
Expand Down Expand Up @@ -171,58 +172,64 @@ impl IcebergSplitEnumerator {
if batch_parallelism == 0 {
bail!("Batch parallelism is 0. Cannot split the iceberg files.");
}
let table = self.config.load_table().await?;
let table = self.config.load_table_v2().await?;
let snapshot_id = match time_traval_info {
Some(IcebergTimeTravelInfo::Version(version)) => {
let Some(snapshot) = table.current_table_metadata().snapshot(version) else {
let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
bail!("Cannot find the snapshot id in the iceberg table.");
};
snapshot.snapshot_id
snapshot.snapshot_id()
}
Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
match &table.current_table_metadata().snapshots {
Some(snapshots) => {
let snapshot = snapshots
.iter()
.filter(|snapshot| snapshot.timestamp_ms <= timestamp)
.max_by_key(|snapshot| snapshot.timestamp_ms);
match snapshot {
Some(snapshot) => snapshot.snapshot_id,
None => {
// convert unix time to human readable time
let time = chrono::DateTime::from_timestamp_millis(timestamp);
if time.is_some() {
bail!("Cannot find a snapshot older than {}", time.unwrap());
} else {
bail!("Cannot find a snapshot");
}
}
}
}
let snapshot = table
.metadata()
.snapshots()
.filter(|snapshot| snapshot.timestamp().timestamp_millis() <= timestamp)
.max_by_key(|snapshot| snapshot.timestamp().timestamp_millis());
match snapshot {
Some(snapshot) => snapshot.snapshot_id(),
None => {
bail!("Cannot find the snapshots in the iceberg table.");
// convert unix time to human readable time
let time = chrono::DateTime::from_timestamp_millis(timestamp);
if time.is_some() {
bail!("Cannot find a snapshot older than {}", time.unwrap());
} else {
bail!("Cannot find a snapshot");
}
}
}
}
None => match table.current_table_metadata().current_snapshot_id {
Some(snapshot_id) => snapshot_id,
None => match table.metadata().current_snapshot() {
Some(snapshot) => snapshot.snapshot_id(),
None => bail!("Cannot find the current snapshot id in the iceberg table."),
},
};
let mut files = vec![];
for file in table
.data_files_of_snapshot(
table
.current_table_metadata()
.snapshot(snapshot_id)
.expect("snapshot must exists"),
)
.await?
{
if file.content != DataContentType::Data {
bail!("Reading iceberg table with delete files is unsupported. Please try to compact the table first.");

let snapshot = table
.metadata()
.snapshot_by_id(snapshot_id)
.expect("snapshot must exist");

let manifest_list: ManifestList = snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.map_err(|e| anyhow!(e))?;
for entry in manifest_list.entries() {
let manifest = entry
.load_manifest(table.file_io())
.await
.map_err(|e| anyhow!(e))?;
let mut manifest_entries_stream =
futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));

while let Some(manifest_entry) = manifest_entries_stream.next().await {
let file = manifest_entry.data_file();
if file.content_type() != DataContentType::Data {
bail!("Reading iceberg table with delete files is unsupported. Please try to compact the table first.");
}
files.push(file.file_path().to_string());
}
files.push(file.file_path);
}
let split_num = batch_parallelism;
// evenly split the files into splits based on the parallelism.
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ fixedbitset = "0.5"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
iana-time-zone = "0.1"
iceberg = { workspace = true }
icelake = { workspace = true }
itertools = { workspace = true }
jsonbb = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use std::ops::Deref;

use anyhow::anyhow;
use icelake::Table;
use futures::StreamExt;
use iceberg::spec::ManifestList;
use iceberg::table::Table;
use risingwave_common::types::Fields;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::ConnectorProperties;
Expand Down Expand Up @@ -46,7 +48,7 @@ struct RwIcebergFiles {
/// Required when content is `EqualityDeletes` and should be null
/// otherwise. Fields with ids listed in this column must be present
/// in the delete file
pub equality_ids: Option<Vec<i32>>,
pub equality_ids: Vec<i32>,
/// ID representing sort order for this file.
///
/// If sort order ID is missing or unknown, then the order is assumed to
Expand Down Expand Up @@ -81,26 +83,37 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwIcebergFiles>> {
let config = ConnectorProperties::extract(source_props, false)?;
if let ConnectorProperties::Iceberg(iceberg_properties) = config {
let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config();
let table: Table = iceberg_config.load_table().await?;
result.extend(
table
.current_data_files()
let table: Table = iceberg_config.load_table_v2().await?;
if let Some(snapshot) = table.metadata().current_snapshot() {
let manifest_list: ManifestList = snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.map_err(|e| anyhow!(e))?
.iter()
.map(|file| RwIcebergFiles {
source_id: source.id as i32,
schema_name: schema_name.clone(),
source_name: source.name.clone(),
content: file.content as i32,
file_path: file.file_path.clone(),
file_format: file.file_format.to_string(),
record_count: file.record_count,
file_size_in_bytes: file.file_size_in_bytes,
equality_ids: file.equality_ids.clone(),
sort_order_id: file.sort_order_id,
}),
);
.map_err(|e| anyhow!(e))?;
for entry in manifest_list.entries() {
let manifest = entry
.load_manifest(table.file_io())
.await
.map_err(|e| anyhow!(e))?;
let mut manifest_entries_stream =
futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));

while let Some(manifest_entry) = manifest_entries_stream.next().await {
let file = manifest_entry.data_file();
result.push(RwIcebergFiles {
source_id: source.id as i32,
schema_name: schema_name.clone(),
source_name: source.name.clone(),
content: file.content_type() as i32,
file_path: file.file_path().to_string(),
file_format: file.file_format().to_string(),
record_count: file.record_count() as i64,
file_size_in_bytes: file.file_size_in_bytes() as i64,
equality_ids: file.equality_ids().to_vec(),
sort_order_id: file.sort_order_id(),
});
}
}
}
} else {
unreachable!()
}
Expand Down
Loading

0 comments on commit 0e64bc0

Please sign in to comment.