diff --git a/Cargo.lock b/Cargo.lock
index 85fe3c1c4955e..366fb3b36a672 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -991,7 +991,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "136d4d23bcc79e27423727b36823d86233aad06dfea531837b038394d11e9928"
 dependencies = [
  "concurrent-queue",
- "event-listener 5.2.0",
+ "event-listener 5.3.1",
  "event-listener-strategy",
  "futures-core",
  "pin-project-lite",
@@ -2345,9 +2345,9 @@ dependencies = [
 
 [[package]]
 name = "bytecount"
-version = "0.6.3"
+version = "0.6.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c"
+checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce"
 
 [[package]]
 name = "bytemuck"
@@ -2418,9 +2418,9 @@ checksum = "981520c98f422fcc584dc1a95c334e6953900b9106bc47a9839b81790009eb21"
 
 [[package]]
 name = "camino"
-version = "1.1.6"
+version = "1.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c"
+checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3"
 dependencies = [
  "serde",
 ]
@@ -2498,9 +2498,9 @@ checksum = "1582e1c9e755dd6ad6b224dcffb135d199399a4568d454bd89fe515ca8425695"
 
 [[package]]
 name = "cargo-platform"
-version = "0.1.3"
+version = "0.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2cfa25e60aea747ec7e1124f238816749faa93759c6ff5b31f1ccdda137f4479"
+checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc"
 dependencies = [
  "serde",
 ]
@@ -2852,9 +2852,9 @@ dependencies = [
 
 [[package]]
 name = "concurrent-queue"
-version = "2.2.0"
+version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c"
+checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
 dependencies = [
  "crossbeam-utils",
 ]
@@ -4568,9 +4568,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
 
 [[package]]
 name = "event-listener"
-version = "5.2.0"
+version = "5.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b5fb89194fa3cad959b833185b3063ba881dbfc7030680b314250779fb4cc91"
+checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba"
 dependencies = [
  "concurrent-queue",
  "parking",
@@ -4583,7 +4583,7 @@ version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "332f51cb23d20b0de8458b86580878211da09bcd4503cb579c225b3d124cabb3"
 dependencies = [
- "event-listener 5.2.0",
+ "event-listener 5.3.1",
  "pin-project-lite",
 ]
@@ -6004,7 +6004,7 @@ dependencies = [
 [[package]]
 name = "iceberg"
 version = "0.3.0"
-source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=df076a9949e9ed6431a6dd3998ac0c49152dda9c#df076a9949e9ed6431a6dd3998ac0c49152dda9c"
+source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=84bf51c9d0d5886e4ee306ca4f383f029e1767a4#84bf51c9d0d5886e4ee306ca4f383f029e1767a4"
 dependencies = [
  "anyhow",
  "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -6024,11 +6024,13 @@ dependencies = [
  "fnv",
  "futures",
  "itertools 0.13.0",
+ "moka",
  "murmur3",
  "once_cell",
  "opendal 0.49.0",
  "ordered-float 4.1.1",
  "parquet 52.0.0",
+ "paste",
  "reqwest 0.12.4",
  "rust_decimal",
  "serde",
@@ -6038,7 +6040,7 @@ dependencies = [
  "serde_repr",
  "serde_with 3.8.0",
  "tokio",
- "typed-builder 0.19.1",
0.19.1", + "typed-builder 0.20.0", "url", "uuid", ] @@ -6046,7 +6048,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=df076a9949e9ed6431a6dd3998ac0c49152dda9c#df076a9949e9ed6431a6dd3998ac0c49152dda9c" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=84bf51c9d0d5886e4ee306ca4f383f029e1767a4#84bf51c9d0d5886e4ee306ca4f383f029e1767a4" dependencies = [ "anyhow", "async-trait", @@ -6056,14 +6058,14 @@ dependencies = [ "log", "serde_json", "tokio", - "typed-builder 0.19.1", + "typed-builder 0.20.0", "uuid", ] [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=df076a9949e9ed6431a6dd3998ac0c49152dda9c#df076a9949e9ed6431a6dd3998ac0c49152dda9c" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=84bf51c9d0d5886e4ee306ca4f383f029e1767a4#84bf51c9d0d5886e4ee306ca4f383f029e1767a4" dependencies = [ "async-trait", "chrono", @@ -6076,7 +6078,7 @@ dependencies = [ "serde_derive", "serde_json", "tokio", - "typed-builder 0.19.1", + "typed-builder 0.20.0", "uuid", ] @@ -9124,7 +9126,7 @@ checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.10.5", + "itertools 0.11.0", "log", "multimap 0.8.3", "once_cell", @@ -9179,7 +9181,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.66", @@ -9308,11 +9310,11 @@ dependencies = [ [[package]] name = "pulldown-cmark" -version = "0.9.3" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" +checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.6.0", "memchr", "unicase", ] @@ -9365,7 +9367,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -14830,6 +14832,15 @@ dependencies = [ "typed-builder-macro 0.19.1", ] +[[package]] +name = "typed-builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e14ed59dc8b7b26cacb2a92bad2e8b1f098806063898ab42a3bd121d7d45e75" +dependencies = [ + "typed-builder-macro 0.20.0", +] + [[package]] name = "typed-builder-macro" version = "0.16.2" @@ -14863,6 +14874,17 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "typed-builder-macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "560b82d656506509d43abe30e0ba64c56b1953ab3d4fe7ba5902747a7a3cedd5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "typenum" version = "1.16.0" diff --git a/Cargo.toml b/Cargo.toml index a6fc35d11c908..63feb981d9a1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,9 +143,10 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" } arrow-schema-iceberg = { package = "arrow-schema", version = "52" } arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } arrow-cast-iceberg = { package = "arrow-cast", version = "52" } -iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = 
"df076a9949e9ed6431a6dd3998ac0c49152dda9c" } -iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" } -iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "df076a9949e9ed6431a6dd3998ac0c49152dda9c" } +# branch dev +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" } opendal = "0.47" arrow-array = "50" arrow-arith = "50" diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index 1a46f30682bdd..c039c625aa213 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -46,6 +46,7 @@ poetry run python main.py -t ./test_case/range_partition_append_only.toml poetry run python main.py -t ./test_case/range_partition_upsert.toml poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml +poetry run python main.py -t ./test_case/iceberg_source_eq_delete.toml echo "--- Kill cluster" diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt new file mode 100644 index 0000000000000..820776fb7e773 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt @@ -0,0 +1,113 @@ +statement ok +set sink_decouple = false; + +statement ok +set streaming_parallelism=4; + +statement ok +CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar); + +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1; + +statement ok +CREATE SINK sink1 AS select * from mv1 WITH ( + connector = 'iceberg', + type = 'upsert', + database.name = 'demo_db', + table.name = 't1', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + create_table_if_not_exists = 'true', + primary_key = 'i1,i2', +); + +statement ok +insert into s1 values(1,'2','3'); + +statement ok +insert into s1 values(7,'8','9'); + +statement ok +insert into s1 values(4,'5','6'); + +statement ok +flush; + +statement ok +delete from s1 where i1 = 7; + +statement ok +flush; + +sleep 5s + +statement ok +CREATE SOURCE iceberg_t1_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + database.name = 'demo_db', + table.name = 't1', +); + +query I +select * from iceberg_t1_source order by i1; +---- +1 2 3 +4 5 6 + +query I +select i1,i2,i3 from iceberg_t1_source order by i1; +---- +1 2 3 +4 5 6 + +query I +select i3,i2 from iceberg_t1_source order by i2; +---- +3 2 +6 5 + +query I +select i2,i1 from iceberg_t1_source order by i1; +---- +2 1 +5 4 + +query I +select i1 from iceberg_t1_source order by i1; +---- +1 +4 + +query I +select i2 from iceberg_t1_source order by i2; +---- +2 +5 + +query I +select i3 from iceberg_t1_source order by i3; +---- +3 +6 + +statement ok +DROP SINK sink1; + 
diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml
new file mode 100644
index 0000000000000..6e49ca949f501
--- /dev/null
+++ b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml
@@ -0,0 +1,11 @@
+init_sqls = [
+    'CREATE SCHEMA IF NOT EXISTS demo_db',
+    'DROP TABLE IF EXISTS demo_db.t1',
+]
+
+slt = 'test_case/iceberg_source_eq_delete.slt'
+
+drop_sqls = [
+    'DROP TABLE IF EXISTS demo_db.t1',
+    'DROP SCHEMA IF EXISTS demo_db',
+]
\ No newline at end of file
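Before the executor diff below, a note on the merge-on-read scheme it implements: every equality delete row is folded into a map from row key to the highest sequence number that deleted it, and a data row survives only if its file's sequence number is at least as new as any matching delete. A minimal sketch of that rule under simplified types (the real executor keys on `OwnedRow`, not `Vec<String>`):

```rust
use std::collections::HashMap;

// `Key` stands in for the projected equality-delete columns of a row.
type Key = Vec<String>;

fn is_visible(deletes: &HashMap<Key, i64>, key: &Key, data_sequence_number: i64) -> bool {
    // A row is masked only if an equality delete with the same key was
    // written by a *later* snapshot than the data file holding the row.
    match deletes.get(key) {
        Some(&delete_sequence_number) => delete_sequence_number <= data_sequence_number,
        None => true,
    }
}

fn main() {
    let mut deletes = HashMap::new();
    deletes.insert(vec!["7".to_string(), "8".to_string()], 5);
    // Data written at sequence 3, delete at sequence 5: the row is masked.
    assert!(!is_visible(&deletes, &vec!["7".into(), "8".into()], 3));
    // Data rewritten at sequence 6 (after the delete): visible again.
    assert!(is_visible(&deletes, &vec!["7".into(), "8".into()], 6));
}
```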
diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index fca7745284fe3..2f67d8ce005aa 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::mem;
 
 use futures_async_stream::try_stream;
@@ -20,8 +21,11 @@ use iceberg::scan::FileScanTask;
 use iceberg::spec::TableMetadata;
 use itertools::Itertools;
 use risingwave_common::array::arrow::IcebergArrowConvert;
+use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{Field, Schema};
+use risingwave_common::row::{OwnedRow, Row};
 use risingwave_common::types::DataType;
+use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_connector::sink::iceberg::IcebergConfig;
 use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
 use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
@@ -38,7 +42,8 @@ pub struct IcebergScanExecutor {
     #[allow(dead_code)]
     snapshot_id: Option<i64>,
     table_meta: TableMetadata,
-    file_scan_tasks: Vec<FileScanTask>,
+    data_file_scan_tasks: Vec<FileScanTask>,
+    eq_delete_file_scan_tasks: Vec<FileScanTask>,
     batch_size: usize,
     schema: Schema,
     identity: String,
@@ -63,7 +68,8 @@ impl IcebergScanExecutor {
         iceberg_config: IcebergConfig,
         snapshot_id: Option<i64>,
         table_meta: TableMetadata,
-        file_scan_tasks: Vec<FileScanTask>,
+        data_file_scan_tasks: Vec<FileScanTask>,
+        eq_delete_file_scan_tasks: Vec<FileScanTask>,
         batch_size: usize,
         schema: Schema,
         identity: String,
@@ -72,7 +78,8 @@ impl IcebergScanExecutor {
             iceberg_config,
             snapshot_id,
             table_meta,
-            file_scan_tasks,
+            data_file_scan_tasks,
+            eq_delete_file_scan_tasks,
             batch_size,
             schema,
             identity,
@@ -86,33 +93,136 @@ impl IcebergScanExecutor {
             .load_table_v2_with_metadata(self.table_meta)
             .await?;
         let data_types = self.schema.data_types();
+        let executor_schema_names = self.schema.names();
 
-        let file_scan_tasks = mem::take(&mut self.file_scan_tasks);
+        let mut eq_delete_file_scan_tasks_map: HashMap<OwnedRow, i64> = HashMap::default();
+        let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks);
 
-        let file_scan_stream = {
-            #[try_stream]
-            async move {
-                for file_scan_task in file_scan_tasks {
-                    yield file_scan_task;
+        // Build a hash map from the equality delete files: deleted row key -> max sequence number.
+        // Currently, all equality delete files have the same schema, which is guaranteed by `IcebergSplitEnumerator`.
+        let mut eq_delete_ids: Option<Vec<i32>> = None;
+        for eq_delete_file_scan_task in eq_delete_file_scan_tasks {
+            let sequence_number = eq_delete_file_scan_task.sequence_number;
+
+            if eq_delete_ids.is_none() {
+                eq_delete_ids = Some(eq_delete_file_scan_task.project_field_ids.clone());
+            } else {
+                debug_assert_eq!(
+                    eq_delete_ids.as_ref().unwrap(),
+                    &eq_delete_file_scan_task.project_field_ids
+                );
+            }
+
+            let reader = table
+                .reader_builder()
+                .with_batch_size(self.batch_size)
+                .build();
+            let delete_file_scan_stream = tokio_stream::once(Ok(eq_delete_file_scan_task));
+
+            let mut delete_record_batch_stream = reader
+                .read(Box::pin(delete_file_scan_stream))
+                .map_err(BatchError::Iceberg)?;
+
+            while let Some(record_batch) = delete_record_batch_stream.next().await {
+                let record_batch = record_batch.map_err(BatchError::Iceberg)?;
+
+                let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
+                for row in chunk.rows() {
+                    let entry = eq_delete_file_scan_tasks_map
+                        .entry(row.to_owned_row())
+                        .or_default();
+                    *entry = (*entry).max(sequence_number);
                 }
             }
-        };
-
-        let reader = table
-            .reader_builder()
-            .with_batch_size(self.batch_size)
-            .build();
-
-        let record_batch_stream = reader
-            .read(Box::pin(file_scan_stream))
-            .map_err(BatchError::Iceberg)?;
-
-        #[for_await]
-        for record_batch in record_batch_stream {
-            let record_batch = record_batch.map_err(BatchError::Iceberg)?;
-            let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
-            debug_assert_eq!(chunk.data_types(), data_types);
-            yield chunk;
+        }
+
+        let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks);
+
+        // Filter out rows in the data files that are marked deleted in the map built above.
+        for data_file_scan_task in data_file_scan_tasks {
+            let data_sequence_number = data_file_scan_task.sequence_number;
+
+            let data_chunk_column_names: Vec<_> = data_file_scan_task
+                .project_field_ids
+                .iter()
+                .filter_map(|id| {
+                    data_file_scan_task
+                        .schema
+                        .name_by_field_id(*id)
+                        .map(|name| name.to_string())
+                })
+                .collect();
+
+            // eq_delete_column_idxes are used to fetch the equality delete columns from data files.
+            let eq_delete_column_idxes = eq_delete_ids.as_ref().map(|eq_delete_ids| {
+                eq_delete_ids
+                    .iter()
+                    .map(|eq_delete_id| {
+                        data_file_scan_task
+                            .project_field_ids
+                            .iter()
+                            .position(|project_field_id| eq_delete_id == project_field_id)
+                            .expect("eq_delete_id not found in project_field_ids")
+                    })
+                    .collect_vec()
+            });
+
+            let reader = table
+                .reader_builder()
+                .with_batch_size(self.batch_size)
+                .build();
+            let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));
+
+            let mut record_batch_stream = reader
+                .read(Box::pin(file_scan_stream))
+                .map_err(BatchError::Iceberg)?;
+
+            while let Some(record_batch) = record_batch_stream.next().await {
+                let record_batch = record_batch.map_err(BatchError::Iceberg)?;
+
+                let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
+                let chunk = match eq_delete_column_idxes.as_ref() {
+                    Some(delete_column_ids) => {
+                        let visibility = Bitmap::from_iter(
+                            // Project with the schema of the delete files.
+                            chunk.project(delete_column_ids).rows().map(|row_ref| {
+                                let row = row_ref.to_owned_row();
+                                if let Some(delete_sequence_number) =
+                                    eq_delete_file_scan_tasks_map.get(&row)
+                                    && delete_sequence_number > &data_sequence_number
+                                {
+                                    // delete_sequence_number > data_sequence_number means the delete file
+                                    // was written after the data file, so this row must be masked.
+                                    false
+                                } else {
+                                    true
+                                }
+                            }),
+                        );
+                        // Keep the chunk schema consistent with the executor schema:
+                        // drop the (equality delete) columns that were only read for delete matching.
+                        let data = chunk
+                            .columns()
+                            .iter()
+                            .zip_eq_fast(&data_chunk_column_names)
+                            .filter_map(|(array, columns)| {
+                                if executor_schema_names.contains(columns) {
+                                    Some(array.clone())
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect_vec();
+                        let chunk = DataChunk::new(data, visibility);
+                        debug_assert_eq!(chunk.data_types(), data_types);
+                        chunk
+                    }
+                    // If there is no delete file, output the data file directly.
+                    None => chunk,
+                };
+                yield chunk;
+            }
         }
     }
 }
@@ -171,6 +281,11 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
             Some(split.snapshot_id),
             split.table_meta.deserialize(),
             split.files.into_iter().map(|x| x.deserialize()).collect(),
+            split
+                .eq_delete_files
+                .into_iter()
+                .map(|x| x.deserialize())
+                .collect(),
             source.context.get_config().developer.chunk_size,
             schema,
             source.plan_node().get_identity().clone(),
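Two pieces of index bookkeeping happen in the executor above: equality-delete field ids are mapped to positions inside the scan projection, and columns fetched only for delete matching are dropped again before the chunk leaves the executor. A standalone sketch with made-up field ids and column names:

```rust
fn main() {
    let project_field_ids = vec![1, 2, 3]; // columns read from the data file
    let equality_ids = vec![1, 2]; // key columns of the equality delete files

    // Map each equality id to its position in the projection, mirroring the
    // `position(...)` lookup in the executor.
    let eq_delete_column_idxes: Vec<usize> = equality_ids
        .iter()
        .map(|id| {
            project_field_ids
                .iter()
                .position(|field_id| field_id == id)
                .expect("equality id must be part of the projection")
        })
        .collect();
    assert_eq!(eq_delete_column_idxes, vec![0, 1]);

    // Keep only the columns the executor schema asks for; anything else was
    // fetched purely to match against the delete files.
    let chunk_column_names = ["i1", "i2", "i3"];
    let executor_schema_names = ["i1", "i3"];
    let kept: Vec<_> = chunk_column_names
        .iter()
        .filter(|name| executor_schema_names.contains(name))
        .collect();
    assert_eq!(kept, [&"i1", &"i3"]);
}
```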
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index a77e9cb929d17..3801508a7aa19 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -76,7 +76,7 @@ jni = { version = "0.21.1", features = ["invocation"] }
 jsonbb = { workspace = true }
 jsonwebtoken = "9.2.0"
 maplit = "1.0.2"
-moka = { version = "0.12", features = ["future"] }
+moka = { version = "0.12.0", features = ["future"] }
 mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
 mysql_async = { version = "0.34", default-features = false, features = [
     "default",
diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/sink/iceberg/jni_catalog.rs
index b80a6a305870f..6529ea733428d 100644
--- a/src/connector/src/sink/iceberg/jni_catalog.rs
+++ b/src/connector/src/sink/iceberg/jni_catalog.rs
@@ -288,7 +288,7 @@ impl CatalogV2 for JniCatalog {
                 "Failed to crete iceberg table.",
             )
             .with_source(e)
-        })
+        })?
     }
 
     /// Load table from the catalog.
@@ -338,7 +338,7 @@ impl CatalogV2 for JniCatalog {
                 "Failed to load iceberg table.",
             )
             .with_source(e)
-        })
+        })?
     }
 
     /// Drop a table from the catalog.
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 977d165171881..9e87694539f0c 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -669,7 +669,7 @@ impl IcebergConfig {
                     .file_io(storage_catalog.file_io().clone())
                     // Only support readonly table for storage catalog now.
                     .readonly(true)
-                    .build())
+                    .build()?)
             }
             _ => self.load_table_v2().await,
         }
diff --git a/src/connector/src/sink/iceberg/storage_catalog.rs b/src/connector/src/sink/iceberg/storage_catalog.rs
index 01adb510882a2..18e2ff0e036ff 100644
--- a/src/connector/src/sink/iceberg/storage_catalog.rs
+++ b/src/connector/src/sink/iceberg/storage_catalog.rs
@@ -249,11 +249,11 @@ impl Catalog for StorageCatalog {
         let version_hint_output = self.file_io.new_output(&version_hint_path)?;
         version_hint_output.write("1".into()).await?;
 
-        Ok(Table::builder()
+        Table::builder()
             .metadata(table_metadata)
             .identifier(table_ident)
             .file_io(self.file_io.clone())
-            .build())
+            .build()
     }
 
     /// Load table from the catalog.
@@ -283,13 +283,13 @@ impl Catalog for StorageCatalog {
         let metadata_file_content = metadata_file.read().await?;
         let table_metadata = serde_json::from_slice::<TableMetadata>(&metadata_file_content)?;
 
-        Ok(Table::builder()
+        Table::builder()
             .metadata(table_metadata)
             .identifier(table.clone())
             .file_io(self.file_io.clone())
             // Only support readonly table for storage catalog now.
             .readonly(true)
-            .build())
+            .build()
     }
 
     /// Drop a table from the catalog.
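The `})?` and `.build()?` churn in these catalog files all stems from one upstream change: in the newer iceberg-rust revision the table builder validates its inputs and returns a `Result` instead of a bare `Table`. A hedged sketch of the pattern with hypothetical `Table`/`TableBuilder` types (not the actual iceberg-rust API):

```rust
struct Table;
struct TableBuilder {
    valid: bool,
}

impl TableBuilder {
    // Old API: fn build(self) -> Table (infallible, so callers wrapped it in Ok).
    // New API: validation can fail, so callers propagate the error with `?`.
    fn build(self) -> Result<Table, String> {
        if self.valid {
            Ok(Table)
        } else {
            Err("invalid table metadata".into())
        }
    }
}

fn load_table(builder: TableBuilder) -> Result<Table, String> {
    builder.build() // previously: Ok(builder.build())
}

fn main() {
    assert!(load_table(TableBuilder { valid: true }).is_ok());
    assert!(load_table(TableBuilder { valid: false }).is_err());
}
```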
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index d65929faafba1..845ffb66804d3 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -21,6 +21,7 @@ use async_trait::async_trait;
 use futures_async_stream::for_await;
 use iceberg::scan::FileScanTask;
 use iceberg::spec::TableMetadata;
+use iceberg::table::Table;
 use itertools::Itertools;
 pub use parquet_file_reader::*;
 use risingwave_common::bail;
@@ -28,7 +29,7 @@ use risingwave_common::catalog::Schema;
 use risingwave_common::types::JsonbVal;
 use serde::{Deserialize, Serialize};
 
-use crate::error::ConnectorResult;
+use crate::error::{ConnectorError, ConnectorResult};
 use crate::parser::ParserConfig;
 use crate::sink::iceberg::IcebergConfig;
 use crate::source::{
@@ -144,6 +145,7 @@ pub struct IcebergSplit {
     pub snapshot_id: i64,
     pub table_meta: TableMetadataJsonStr,
     pub files: Vec<IcebergFileScanTaskJsonStr>,
+    pub eq_delete_files: Vec<IcebergFileScanTaskJsonStr>,
 }
 
 impl SplitMetaData for IcebergSplit {
@@ -206,6 +208,7 @@ impl IcebergSplitEnumerator {
             bail!("Batch parallelism is 0. Cannot split the iceberg files.");
         }
         let table = self.config.load_table_v2().await?;
+
        let current_snapshot = table.metadata().current_snapshot();
         if current_snapshot.is_none() {
             // If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
@@ -214,6 +217,7 @@ impl IcebergSplitEnumerator {
                 snapshot_id: 0, // unused
                 table_meta: TableMetadataJsonStr::serialize(table.metadata()),
                 files: vec![],
+                eq_delete_files: vec![],
             }]);
         }
@@ -228,10 +232,13 @@ impl IcebergSplitEnumerator {
                 let snapshot = table
                     .metadata()
                     .snapshots()
-                    .filter(|snapshot| snapshot.timestamp().timestamp_millis() <= timestamp)
-                    .max_by_key(|snapshot| snapshot.timestamp().timestamp_millis());
+                    .map(|snapshot| snapshot.timestamp().map(|ts| (snapshot.snapshot_id(), ts.timestamp_millis())))
+                    .collect::<Result<Vec<_>, _>>()?
+                    .into_iter()
+                    .filter(|&(_, snapshot_millis)| snapshot_millis <= timestamp)
+                    .max_by_key(|&(_, snapshot_millis)| snapshot_millis);
                 match snapshot {
-                    Some(snapshot) => snapshot.snapshot_id(),
+                    Some((snapshot_id, _)) => snapshot_id,
                     None => {
                         // convert unix time to human readable time
                         let time = chrono::DateTime::from_timestamp_millis(timestamp);
@@ -248,12 +255,15 @@ impl IcebergSplitEnumerator {
                 current_snapshot.unwrap().snapshot_id()
             }
         };
-        let mut files = vec![];
+
+        let require_names = Self::get_require_field_names(&table, snapshot_id, schema).await?;
+
+        let mut data_files = vec![];
+        let mut eq_delete_files = vec![];
 
         let scan = table
             .scan()
             .snapshot_id(snapshot_id)
-            .select(schema.names())
+            .select(require_names)
             .build()
             .map_err(|e| anyhow!(e))?;
@@ -261,16 +271,27 @@ impl IcebergSplitEnumerator {
 
         #[for_await]
         for task in file_scan_stream {
-            let task = task.map_err(|e| anyhow!(e))?;
-            files.push(IcebergFileScanTaskJsonStr::serialize(&task));
+            let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
+            match task.data_file_content {
+                iceberg::spec::DataContentType::Data => {
+                    data_files.push(IcebergFileScanTaskJsonStr::serialize(&task));
+                }
+                iceberg::spec::DataContentType::EqualityDeletes => {
+                    task.project_field_ids = task.equality_ids.clone();
+                    eq_delete_files.push(IcebergFileScanTaskJsonStr::serialize(&task));
+                }
+                iceberg::spec::DataContentType::PositionDeletes => {
+                    bail!("Position delete file is not supported")
+                }
+            }
         }
 
         let table_meta = TableMetadataJsonStr::serialize(table.metadata());
 
         let split_num = batch_parallelism;
         // evenly split the files into splits based on the parallelism.
-        let split_size = files.len() / split_num;
-        let remaining = files.len() % split_num;
+        let split_size = data_files.len() / split_num;
+        let remaining = data_files.len() % split_num;
         let mut splits = vec![];
         for i in 0..split_num {
             let start = i * split_size;
@@ -279,20 +300,62 @@ impl IcebergSplitEnumerator {
                 split_id: i as i64,
                 snapshot_id,
                 table_meta: table_meta.clone(),
-                files: files[start..end].to_vec(),
+                files: data_files[start..end].to_vec(),
+                eq_delete_files: eq_delete_files.clone(),
             };
             splits.push(split);
         }
         for i in 0..remaining {
             splits[i]
                 .files
-                .push(files[split_num * split_size + i].clone());
+                .push(data_files[split_num * split_size + i].clone());
         }
         Ok(splits
             .into_iter()
             .filter(|split| !split.files.is_empty())
             .collect_vec())
     }
+
+    async fn get_require_field_names(
+        table: &Table,
+        snapshot_id: i64,
+        rw_schema: Schema,
+    ) -> ConnectorResult<Vec<String>> {
+        let scan = table
+            .scan()
+            .snapshot_id(snapshot_id)
+            .build()
+            .map_err(|e| anyhow!(e))?;
+        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
+        let schema = scan.snapshot().schema(table.metadata())?;
+        let mut equality_ids = vec![];
+        #[for_await]
+        for task in file_scan_stream {
+            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
+            if task.data_file_content == iceberg::spec::DataContentType::EqualityDeletes {
+                if equality_ids.is_empty() {
+                    equality_ids = task.equality_ids;
+                } else if equality_ids != task.equality_ids {
+                    bail!("The schemas of iceberg equality delete files must be consistent");
+                }
+            }
+        }
+        let delete_columns = equality_ids
+            .into_iter()
+            .map(|id| match schema.name_by_field_id(id) {
+                Some(name) => Ok::<String, ConnectorError>(name.to_string()),
+                None => bail!("Delete field id {} not found in schema", id),
+            })
+            .collect::<ConnectorResult<Vec<_>>>()?;
+
+        let mut require_field_names: Vec<_> = rw_schema.names().to_vec();
+        // Add the delete columns to the required field names.
+        for names in delete_columns {
+            if !require_field_names.contains(&names) {
+                require_field_names.push(names);
+            }
+        }
+        Ok(require_field_names)
+    }
 }
 
 #[derive(Debug)]
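The enumerator's distribution policy deserves a closer look: data files are chunked evenly across `batch_parallelism` splits with the remainder spread one file per split, while every split receives the full list of equality delete files, because any data row may be masked by any delete file. A self-contained sketch of that distribution, with `u32` standing in for the serialized file tasks:

```rust
// Each split carries (its share of data files, all equality delete files).
fn split_files(
    data_files: Vec<u32>,
    eq_delete_files: Vec<u32>,
    split_num: usize,
) -> Vec<(Vec<u32>, Vec<u32>)> {
    let split_size = data_files.len() / split_num;
    let remaining = data_files.len() % split_num;
    let mut splits: Vec<(Vec<u32>, Vec<u32>)> = (0..split_num)
        .map(|i| {
            let start = i * split_size;
            // Equality delete files are cloned into every split.
            (data_files[start..start + split_size].to_vec(), eq_delete_files.clone())
        })
        .collect();
    // Spread the remainder one file per split, as the enumerator does.
    for i in 0..remaining {
        splits[i].0.push(data_files[split_num * split_size + i]);
    }
    splits
}

fn main() {
    let splits = split_files(vec![1, 2, 3, 4, 5], vec![10, 11], 3);
    assert_eq!(splits[0].0, vec![1, 4]);
    assert_eq!(splits[1].0, vec![2, 5]);
    assert_eq!(splits[2].0, vec![3]);
    // Every split sees the full set of equality delete files.
    assert!(splits.iter().all(|(_, deletes)| deletes == &vec![10, 11]));
}
```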
diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml
index e493037c200b7..c0e506889ef77 100644
--- a/src/expr/impl/Cargo.toml
+++ b/src/expr/impl/Cargo.toml
@@ -51,7 +51,7 @@ itertools = { workspace = true }
 jsonbb = { workspace = true }
 linkme = { version = "0.3", features = ["used_linker"] }
 md5 = "0.7"
-moka = { version = "0.12", features = ["sync"] }
+moka = { version = "0.12.0", features = ["sync"] }
 num-traits = "0.2"
 openssl = "0.10"
 regex = "1"
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs
index e2bbcb486b926..3c60236f96e66 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs
@@ -17,6 +17,7 @@ use std::ops::Deref;
 use iceberg::table::Table;
 use jsonbb::{Value, ValueRef};
 use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
+use risingwave_connector::error::ConnectorResult;
 use risingwave_connector::sink::iceberg::IcebergConfig;
 use risingwave_connector::source::ConnectorProperties;
 use risingwave_connector::WithPropertiesExt;
@@ -62,25 +63,32 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwIcebergSnapshots>> {
             let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config();
             let table: Table = iceberg_config.load_table_v2().await?;
 
-            result.extend(table.metadata().snapshots().map(|snapshot| {
-                RwIcebergSnapshots {
-                    source_id: source.id as i32,
-                    schema_name: schema_name.clone(),
-                    source_name: source.name.clone(),
-                    sequence_number: snapshot.sequence_number(),
-                    snapshot_id: snapshot.snapshot_id(),
-                    timestamp_ms: Timestamptz::from_millis(snapshot.timestamp().timestamp_millis()),
-                    manifest_list: snapshot.manifest_list().to_string(),
-                    summary: Value::object(
-                        snapshot
-                            .summary()
-                            .other
-                            .iter()
-                            .map(|(k, v)| (k.as_str(), ValueRef::String(v))),
-                    )
-                    .into(),
-                }
-            }));
+            let snapshots: ConnectorResult<Vec<RwIcebergSnapshots>> = table
+                .metadata()
+                .snapshots()
+                .map(|snapshot| {
+                    Ok(RwIcebergSnapshots {
+                        source_id: source.id as i32,
+                        schema_name: schema_name.clone(),
+                        source_name: source.name.clone(),
+                        sequence_number: snapshot.sequence_number(),
+                        snapshot_id: snapshot.snapshot_id(),
+                        timestamp_ms: Timestamptz::from_millis(
+                            snapshot.timestamp()?.timestamp_millis(),
+                        ),
+                        manifest_list: snapshot.manifest_list().to_string(),
+                        summary: Value::object(
+                            snapshot
+                                .summary()
+                                .other
+                                .iter()
+                                .map(|(k, v)| (k.as_str(), ValueRef::String(v))),
+                        )
+                        .into(),
+                    })
+                })
+                .collect();
+            result.extend(snapshots?);
         }
     }
     Ok(result)
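The system-catalog change mirrors the enumerator: `Snapshot::timestamp()` is fallible in the new iceberg-rust revision, so the per-snapshot closure now returns a `Result` and the whole iterator is collected into one, short-circuiting on the first error. A minimal sketch of that collect behavior, with `String` standing in for the real error type:

```rust
// Collecting an iterator of Results into Result<Vec<_>, _> stops at the
// first Err, which is exactly what the `?` after `collect()` relies on.
fn timestamps(raw: &[Result<i64, String>]) -> Result<Vec<i64>, String> {
    raw.iter().cloned().collect()
}

fn main() {
    assert_eq!(timestamps(&[Ok(1), Ok(2)]), Ok(vec![1, 2]));
    assert_eq!(
        timestamps(&[Ok(1), Err("corrupt snapshot".to_string())]),
        Err("corrupt snapshot".to_string())
    );
}
```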
diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml
index 49729c6d9e8ac..6a25be3c21738 100644
--- a/src/rpc_client/Cargo.toml
+++ b/src/rpc_client/Cargo.toml
@@ -23,7 +23,7 @@ http = "1"
 hyper = "1"
 itertools = { workspace = true }
 lru = { workspace = true }
-moka = { version = "0.12", features = ["future"] }
+moka = { version = "0.12.0", features = ["future"] }
 paste = "1"
 rand = { workspace = true }
 risingwave_common = { workspace = true }
version = "0.12.0", features = ["future", "sync"] } more-asserts = "0.3" num-integer = "0.1" parking_lot = { workspace = true }