From ab712168635461cdb67c5bc96c28e2d28f3be59a Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 5 Sep 2024 17:12:11 +0800 Subject: [PATCH 01/24] isave --- Cargo.lock | 9 ++---- Cargo.toml | 6 ++-- src/batch/src/executor/iceberg_scan.rs | 37 +++++++++++++++++++++ src/connector/src/source/iceberg/mod.rs | 43 +++++++++++++++++++++++-- 4 files changed, 83 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1b1ec57fdece..930ab2b9dae31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6004,8 +6004,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "651dfca7c429918e164607a549287cfdd1e7814d2e4cb577d0d6dc57fe19b785" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2648421c18993edec6853ad5ce978733178b26f1#2648421c18993edec6853ad5ce978733178b26f1" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6047,8 +6046,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64ef7c992442a80c46975e08f3862140ca3e1c1c772aa68baaf65bb08f97ff07" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2648421c18993edec6853ad5ce978733178b26f1#2648421c18993edec6853ad5ce978733178b26f1" dependencies = [ "anyhow", "async-trait", @@ -6065,8 +6063,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f351c7b964fa6f3b4f976f8de3f16f1bf84eea8478606aaebdfd6a871d6b082c" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2648421c18993edec6853ad5ce978733178b26f1#2648421c18993edec6853ad5ce978733178b26f1" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index a5da9b82b658c..2635de95c66dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,9 +142,9 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" } arrow-schema-iceberg = { package = "arrow-schema", version = "52" } arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } arrow-cast-iceberg = { package = "arrow-cast", version = "52" } -iceberg = "0.3.0" -iceberg-catalog-rest = "0.3.0" -iceberg-catalog-glue = "0.3.0" +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2648421c18993edec6853ad5ce978733178b26f1" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2648421c18993edec6853ad5ce978733178b26f1" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2648421c18993edec6853ad5ce978733178b26f1" } opendal = "0.47" arrow-array = "50" arrow-arith = "50" diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index fca7745284fe3..270254ac5c3fe 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -16,6 +16,7 @@ use std::mem; use futures_async_stream::try_stream; use futures_util::stream::StreamExt; +use hashbrown::HashMap; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; use itertools::Itertools; @@ -39,6 +40,7 @@ pub struct IcebergScanExecutor { snapshot_id: Option, table_meta: TableMetadata, file_scan_tasks: Vec, + eq_delete_file_scan_tasks: Vec, batch_size: usize, schema: Schema, identity: String, @@ -64,6 +66,7 @@ impl IcebergScanExecutor { snapshot_id: Option, table_meta: TableMetadata, file_scan_tasks: Vec, + eq_delete_file_scan_tasks: Vec, batch_size: usize, schema: Schema, identity: String, @@ -73,6 +76,7 @@ impl IcebergScanExecutor { snapshot_id, table_meta, file_scan_tasks, + eq_delete_file_scan_tasks, batch_size, schema, identity, @@ -87,6 +91,38 @@ impl IcebergScanExecutor { .await?; let data_types = self.schema.data_types(); + let mut eq_delete_file_scan_tasks_map = HashMap::new(); + let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks); + + for eq_delete_file_scan_task in eq_delete_file_scan_tasks { + let sequence_number = eq_delete_file_scan_task.sequence_number(); + let reader = table + .reader_builder() + .with_batch_size(self.batch_size) + .build(); + let delete_file_scan_stream = tokio_stream::once(async move{ delete_file_scan_task }); + + let mut delete_record_batch_stream = reader + .read(Box::pin(delete_file_scan_stream)) + .map_err(BatchError::Iceberg)?; + + while let Some(record_batch) = delete_record_batch_stream.next().await { + let record_batch = record_batch.map_err(BatchError::Iceberg)?; + let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; + for row in chunk.rows() { + if let Some(ScalarRefImpl::Int32(i)) = row.datum_at(0) { + if let Some(s) = map.get(&i) { + map.insert(i, *s.max(&sequence_number.clone())); + } else { + map.insert(i, sequence_number); + } + } else { + unreachable!(); + } + } + } + } + let file_scan_tasks = mem::take(&mut self.file_scan_tasks); let file_scan_stream = { @@ -171,6 +207,7 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder { Some(split.snapshot_id), split.table_meta.deserialize(), split.files.into_iter().map(|x| x.deserialize()).collect(), + split.eq_delete_files.deserialize(), source.context.get_config().developer.chunk_size, schema, source.plan_node().get_identity().clone(), diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index f101ff9ed6d4b..63163288a0402 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -14,17 +14,21 @@ pub mod parquet_file_reader; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; use futures_async_stream::for_await; +use iceberg::arrow::schema_to_arrow_schema; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; +use iceberg::table::Table; use itertools::Itertools; pub use parquet_file_reader::*; +use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::bail; -use risingwave_common::catalog::Schema; +use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; @@ -144,6 +148,7 @@ pub struct IcebergSplit { pub snapshot_id: i64, pub table_meta: TableMetadataJsonStr, pub files: Vec, + pub eq_delete_files: Vec, } impl SplitMetaData for IcebergSplit { @@ -237,12 +242,17 @@ impl IcebergSplitEnumerator { None => bail!("Cannot find the current snapshot id in the iceberg table."), }, }; + let (eq_delete_files,eq_delete_file_schema) = IcebergSplitEnumerator::load_eq_delete_file(&table,snapshot_id).await?; + let arrow_schema = schema_to_arrow_schema(&eq_delete_file_schema)?; + let mut require_names:HashSet = schema.names().clone().into_iter().collect(); + require_names.extend(arrow_schema.all_fields().into_iter().map(|filed| filed.name().clone())); + let mut files = vec![]; let scan = table .scan() .snapshot_id(snapshot_id) - .select(schema.names()) + .select(require_names) .build() .map_err(|e| anyhow!(e))?; @@ -269,6 +279,7 @@ impl IcebergSplitEnumerator { snapshot_id, table_meta: table_meta.clone(), files: files[start..end].to_vec(), + eq_delete_files: eq_delete_files.clone(), }; splits.push(split); } @@ -282,6 +293,32 @@ impl IcebergSplitEnumerator { .filter(|split| !split.files.is_empty()) .collect_vec()) } + + async fn load_eq_delete_file(table: &Table, snapshot_id: i64) -> ConnectorResult<(Vec,Arc)>{ + let mut files = vec![]; + + let scan = table + .scan() + .snapshot_id(snapshot_id) + .build() + .map_err(|e| anyhow!(e))?; + let schema = scan.snapshot().schema(table.metadata())?; + // let arrow_schema = schema_to_arrow_schema(&schema)?; + // let rw_fileds = arrow_schema.fields().iter().map(|field|{ + // Ok(Field::with_name(IcebergArrowConvert.type_from_field(field)?, field.name().to_string())) + // }).collect::>>()?; + // let schema = Schema::new(rw_fileds); + + let file_scan_stream = scan.plan_eq_delete_files().await.map_err(|e| anyhow!(e))?; + + #[for_await] + for task in file_scan_stream { + let task = task.map_err(|e| anyhow!(e))?; + files.push(IcebergFileScanTaskJsonStr::serialize(&task)); + } + Ok((files,schema)) + } + } #[derive(Debug)] From 73d22c1b9d351d26a2c2d9ab8729d76e93bda609 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 6 Sep 2024 14:37:24 +0800 Subject: [PATCH 02/24] save --- risedev.yml | 4 +- src/batch/src/executor/iceberg_scan.rs | 139 +++++++++++++++++------- src/common/src/array/mod.rs | 4 + src/connector/src/source/iceberg/mod.rs | 28 ++--- 4 files changed, 122 insertions(+), 53 deletions(-) diff --git a/risedev.yml b/risedev.yml index 22c4569adb610..a5eca729d2617 100644 --- a/risedev.yml +++ b/risedev.yml @@ -115,8 +115,8 @@ profile: - use: compactor - use: prometheus - use: grafana - - use: kafka - persist-data: true + # - use: kafka + # persist-data: true standalone-full-peripherals: steps: diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 270254ac5c3fe..296c6dad20634 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -12,15 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::mem; +use std::ops::BitAnd; use futures_async_stream::try_stream; use futures_util::stream::StreamExt; -use hashbrown::HashMap; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; use itertools::Itertools; use risingwave_common::array::arrow::IcebergArrowConvert; +use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; use risingwave_connector::sink::iceberg::IcebergConfig; @@ -90,34 +92,45 @@ impl IcebergScanExecutor { .load_table_v2_with_metadata(self.table_meta) .await?; let data_types = self.schema.data_types(); + let chunk_schema_names = self.schema.names(); - let mut eq_delete_file_scan_tasks_map = HashMap::new(); + let mut eq_delete_file_scan_tasks_map: HashMap< + String, + HashMap, i64>, + > = HashMap::default(); let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks); for eq_delete_file_scan_task in eq_delete_file_scan_tasks { - let sequence_number = eq_delete_file_scan_task.sequence_number(); + let mut sequence_number = eq_delete_file_scan_task.sequence_number(); let reader = table .reader_builder() .with_batch_size(self.batch_size) .build(); - let delete_file_scan_stream = tokio_stream::once(async move{ delete_file_scan_task }); + let delete_file_scan_stream = tokio_stream::once(Ok(eq_delete_file_scan_task)); let mut delete_record_batch_stream = reader .read(Box::pin(delete_file_scan_stream)) .map_err(BatchError::Iceberg)?; - + while let Some(record_batch) = delete_record_batch_stream.next().await { let record_batch = record_batch.map_err(BatchError::Iceberg)?; + let delete_column_names = record_batch + .schema() + .fields() + .iter() + .map(|field| field.name()) + .cloned() + .collect_vec(); let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; - for row in chunk.rows() { - if let Some(ScalarRefImpl::Int32(i)) = row.datum_at(0) { - if let Some(s) = map.get(&i) { - map.insert(i, *s.max(&sequence_number.clone())); - } else { - map.insert(i, sequence_number); - } - } else { - unreachable!(); + for (array, columns_name) in chunk.columns().iter().zip_eq(delete_column_names) { + let each_column_seq_num_map = eq_delete_file_scan_tasks_map + .entry(columns_name) + .or_insert(HashMap::new()); + for datum in array.get_all_values() { + let entry = each_column_seq_num_map + .entry(datum) + .or_insert(sequence_number); + *entry = *entry.max(&mut sequence_number); } } } @@ -125,30 +138,76 @@ impl IcebergScanExecutor { let file_scan_tasks = mem::take(&mut self.file_scan_tasks); - let file_scan_stream = { - #[try_stream] - async move { - for file_scan_task in file_scan_tasks { - yield file_scan_task; - } + for file_scan_task in file_scan_tasks { + let sequence_number = file_scan_task.sequence_number(); + let reader = table + .reader_builder() + .with_batch_size(self.batch_size) + .build(); + let file_scan_stream = tokio_stream::once(Ok(file_scan_task)); + + let mut record_batch_stream = reader + .read(Box::pin(file_scan_stream)) + .map_err(BatchError::Iceberg)?; + + while let Some(record_batch) = record_batch_stream.next().await { + let record_batch = record_batch.map_err(BatchError::Iceberg)?; + let column_names = record_batch + .schema() + .fields() + .iter() + .map(|field| field.name()) + .cloned() + .collect_vec(); + let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; + let visibilitys: Vec<_> = chunk + .columns() + .iter() + .zip_eq(column_names.clone()) + .filter_map(|(array, column_map)| { + if let Some(each_column_seq_num_map) = + eq_delete_file_scan_tasks_map.get(&column_map) + { + let visibility = + Bitmap::from_iter(array.get_all_values().iter().map(|datum| { + if let Some(s) = each_column_seq_num_map.get(datum) + && s > &sequence_number + { + false + } else { + true + } + })); + Some(visibility) + } else { + None + } + }) + .collect(); + let (data, va) = chunk.into_parts_v2(); + let visibility = if visibilitys.is_empty() { + va + } else { + visibilitys + .iter() + .skip(1) + .fold(visibilitys[0].clone(), |acc, bitmap| acc.bitand(bitmap)) + }; + let data = data + .into_iter() + .zip_eq(column_names) + .filter_map(|(array, columns)| { + if chunk_schema_names.contains(&columns) { + Some(array.clone()) + } else { + None + } + }) + .collect_vec(); + let chunk = DataChunk::new(data, visibility); + debug_assert_eq!(chunk.data_types(), data_types); + yield chunk; } - }; - - let reader = table - .reader_builder() - .with_batch_size(self.batch_size) - .build(); - - let record_batch_stream = reader - .read(Box::pin(file_scan_stream)) - .map_err(BatchError::Iceberg)?; - - #[for_await] - for record_batch in record_batch_stream { - let record_batch = record_batch.map_err(BatchError::Iceberg)?; - let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; - debug_assert_eq!(chunk.data_types(), data_types); - yield chunk; } } } @@ -207,7 +266,11 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder { Some(split.snapshot_id), split.table_meta.deserialize(), split.files.into_iter().map(|x| x.deserialize()).collect(), - split.eq_delete_files.deserialize(), + split + .eq_delete_files + .into_iter() + .map(|x| x.deserialize()) + .collect(), source.context.get_config().developer.chunk_size, schema, source.plan_node().get_identity().clone(), diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index b34e5f9b9c470..54840198ccca0 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -603,6 +603,10 @@ impl ArrayImpl { }) } + pub fn get_all_values(&self) -> Vec { + (0..self.len()).map(|i| self.datum_at(i)).collect() + } + /// # Safety /// /// This function is unsafe because it does not check the validity of `idx`. It is caller's diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 63163288a0402..a0939f92c9670 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -26,9 +26,8 @@ use iceberg::spec::TableMetadata; use iceberg::table::Table; use itertools::Itertools; pub use parquet_file_reader::*; -use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::bail; -use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::catalog::Schema; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; @@ -242,10 +241,16 @@ impl IcebergSplitEnumerator { None => bail!("Cannot find the current snapshot id in the iceberg table."), }, }; - let (eq_delete_files,eq_delete_file_schema) = IcebergSplitEnumerator::load_eq_delete_file(&table,snapshot_id).await?; + let (eq_delete_files, eq_delete_file_schema) = + IcebergSplitEnumerator::load_eq_delete_file(&table, snapshot_id).await?; let arrow_schema = schema_to_arrow_schema(&eq_delete_file_schema)?; - let mut require_names:HashSet = schema.names().clone().into_iter().collect(); - require_names.extend(arrow_schema.all_fields().into_iter().map(|filed| filed.name().clone())); + let mut require_names: HashSet = schema.names().clone().into_iter().collect(); + require_names.extend( + arrow_schema + .all_fields() + .into_iter() + .map(|filed| filed.name().clone()), + ); let mut files = vec![]; @@ -294,7 +299,10 @@ impl IcebergSplitEnumerator { .collect_vec()) } - async fn load_eq_delete_file(table: &Table, snapshot_id: i64) -> ConnectorResult<(Vec,Arc)>{ + async fn load_eq_delete_file( + table: &Table, + snapshot_id: i64, + ) -> ConnectorResult<(Vec, Arc)> { let mut files = vec![]; let scan = table @@ -303,11 +311,6 @@ impl IcebergSplitEnumerator { .build() .map_err(|e| anyhow!(e))?; let schema = scan.snapshot().schema(table.metadata())?; - // let arrow_schema = schema_to_arrow_schema(&schema)?; - // let rw_fileds = arrow_schema.fields().iter().map(|field|{ - // Ok(Field::with_name(IcebergArrowConvert.type_from_field(field)?, field.name().to_string())) - // }).collect::>>()?; - // let schema = Schema::new(rw_fileds); let file_scan_stream = scan.plan_eq_delete_files().await.map_err(|e| anyhow!(e))?; @@ -316,9 +319,8 @@ impl IcebergSplitEnumerator { let task = task.map_err(|e| anyhow!(e))?; files.push(IcebergFileScanTaskJsonStr::serialize(&task)); } - Ok((files,schema)) + Ok((files, schema)) } - } #[derive(Debug)] From 46dc286b4fae25e449e92216de67743bd8ad56b1 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 6 Sep 2024 19:00:21 +0800 Subject: [PATCH 03/24] support --- risedev.yml | 4 ++-- src/batch/src/executor/iceberg_scan.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/risedev.yml b/risedev.yml index a5eca729d2617..22c4569adb610 100644 --- a/risedev.yml +++ b/risedev.yml @@ -115,8 +115,8 @@ profile: - use: compactor - use: prometheus - use: grafana - # - use: kafka - # persist-data: true + - use: kafka + persist-data: true standalone-full-peripherals: steps: diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 296c6dad20634..41aae86030a92 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -125,7 +125,7 @@ impl IcebergScanExecutor { for (array, columns_name) in chunk.columns().iter().zip_eq(delete_column_names) { let each_column_seq_num_map = eq_delete_file_scan_tasks_map .entry(columns_name) - .or_insert(HashMap::new()); + .or_default(); for datum in array.get_all_values() { let entry = each_column_seq_num_map .entry(datum) @@ -194,7 +194,7 @@ impl IcebergScanExecutor { .fold(visibilitys[0].clone(), |acc, bitmap| acc.bitand(bitmap)) }; let data = data - .into_iter() + .iter() .zip_eq(column_names) .filter_map(|(array, columns)| { if chunk_schema_names.contains(&columns) { From 8e2397da1ed53ee91045506f87abb15cf0f82d49 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 6 Sep 2024 19:04:42 +0800 Subject: [PATCH 04/24] fix risedev --- risedev.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/risedev.yml b/risedev.yml index 22c4569adb610..a5eca729d2617 100644 --- a/risedev.yml +++ b/risedev.yml @@ -115,8 +115,8 @@ profile: - use: compactor - use: prometheus - use: grafana - - use: kafka - persist-data: true + # - use: kafka + # persist-data: true standalone-full-peripherals: steps: From 718a6f9e660ced97ed766f5bfac2908c05230a46 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 6 Sep 2024 19:06:56 +0800 Subject: [PATCH 05/24] fix risedev --- risedev.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/risedev.yml b/risedev.yml index a5eca729d2617..22c4569adb610 100644 --- a/risedev.yml +++ b/risedev.yml @@ -115,8 +115,8 @@ profile: - use: compactor - use: prometheus - use: grafana - # - use: kafka - # persist-data: true + - use: kafka + persist-data: true standalone-full-peripherals: steps: From edeab1ea45b64f86ad22985ff4655ed3c12f2ec3 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 9 Sep 2024 12:15:06 +0800 Subject: [PATCH 06/24] fix mutil pk --- src/batch/src/executor/iceberg_scan.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 41aae86030a92..81d18b3daa11c 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::ops::BitOr; use std::collections::HashMap; use std::mem; -use std::ops::BitAnd; use futures_async_stream::try_stream; use futures_util::stream::StreamExt; @@ -191,7 +191,7 @@ impl IcebergScanExecutor { visibilitys .iter() .skip(1) - .fold(visibilitys[0].clone(), |acc, bitmap| acc.bitand(bitmap)) + .fold(visibilitys[0].clone(), |acc, bitmap| acc.bitor(bitmap)) }; let data = data .iter() From 606c26fddfeb4bc595f5c90c4f4d8c64988b6ccd Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 9 Sep 2024 15:41:34 +0800 Subject: [PATCH 07/24] add eq delete schema check --- src/batch/src/executor/iceberg_scan.rs | 11 +++++++++++ src/connector/src/source/iceberg/mod.rs | 1 + 2 files changed, 12 insertions(+) diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 81d18b3daa11c..1d91d9f9078cd 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -100,6 +100,7 @@ impl IcebergScanExecutor { > = HashMap::default(); let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks); + let mut delete_column_names_for_check = vec![]; for eq_delete_file_scan_task in eq_delete_file_scan_tasks { let mut sequence_number = eq_delete_file_scan_task.sequence_number(); let reader = table @@ -121,6 +122,16 @@ impl IcebergScanExecutor { .map(|field| field.name()) .cloned() .collect_vec(); + + // Todo! xxhZs move check to iceberg source. + if delete_column_names_for_check.is_empty() { + delete_column_names_for_check = delete_column_names.clone(); + } else if delete_column_names_for_check != delete_column_names { + return Err(BatchError::Internal(anyhow::anyhow!( + "The schema of iceberg equality delete file must be consistent" + ))); + } + let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; for (array, columns_name) in chunk.columns().iter().zip_eq(delete_column_names) { let each_column_seq_num_map = eq_delete_file_scan_tasks_map diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index a0939f92c9670..25f57401f76b9 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -310,6 +310,7 @@ impl IcebergSplitEnumerator { .snapshot_id(snapshot_id) .build() .map_err(|e| anyhow!(e))?; + // Todo! xxhZs Currently, this schema is the full schema, this is because iceberg_rust's task does not have eq_delete_id. let schema = scan.snapshot().schema(table.metadata())?; let file_scan_stream = scan.plan_eq_delete_files().await.map_err(|e| anyhow!(e))?; From 9caa2fd6aa01dbc6b7ba19ef67a9f5c0cbc7c10c Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 10 Sep 2024 14:13:42 +0800 Subject: [PATCH 08/24] refa --- Cargo.lock | 176 +++++++----------- Cargo.toml | 7 +- risedev.yml | 4 +- src/batch/src/executor/iceberg_scan.rs | 36 ++-- src/connector/Cargo.toml | 2 +- src/connector/src/sink/iceberg/jni_catalog.rs | 4 +- src/connector/src/sink/iceberg/mod.rs | 2 +- .../src/sink/iceberg/storage_catalog.rs | 8 +- src/connector/src/source/iceberg/mod.rs | 80 ++++---- src/expr/impl/Cargo.toml | 2 +- .../rw_catalog/rw_iceberg_snapshots.rs | 46 +++-- src/rpc_client/Cargo.toml | 2 +- src/storage/Cargo.toml | 2 +- 13 files changed, 173 insertions(+), 198 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 930ab2b9dae31..0d704aeb15c4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -991,7 +991,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "136d4d23bcc79e27423727b36823d86233aad06dfea531837b038394d11e9928" dependencies = [ "concurrent-queue", - "event-listener 5.2.0", + "event-listener 5.3.1", "event-listener-strategy", "futures-core", "pin-project-lite", @@ -1021,7 +1021,7 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" dependencies = [ - "async-lock", + "async-lock 2.8.0", "async-task", "concurrent-queue", "fastrand 1.9.0", @@ -1038,7 +1038,7 @@ dependencies = [ "async-channel 1.9.0", "async-executor", "async-io", - "async-lock", + "async-lock 2.8.0", "blocking", "futures-lite", "once_cell", @@ -1051,7 +1051,7 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" dependencies = [ - "async-lock", + "async-lock 2.8.0", "autocfg", "cfg-if", "concurrent-queue", @@ -1074,6 +1074,17 @@ dependencies = [ "event-listener 2.5.3", ] +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener 5.3.1", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-nats" version = "0.35.0" @@ -1128,7 +1139,7 @@ dependencies = [ "async-channel 1.9.0", "async-global-executor", "async-io", - "async-lock", + "async-lock 2.8.0", "crossbeam-utils", "futures-channel", "futures-core", @@ -2203,7 +2214,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" dependencies = [ "async-channel 1.9.0", - "async-lock", + "async-lock 2.8.0", "async-task", "atomic-waker", "fastrand 1.9.0", @@ -2343,12 +2354,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "bytecount" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" - [[package]] name = "bytemuck" version = "1.14.0" @@ -2416,15 +2421,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "981520c98f422fcc584dc1a95c334e6953900b9106bc47a9839b81790009eb21" -[[package]] -name = "camino" -version = "1.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" -dependencies = [ - "serde", -] - [[package]] name = "cap-fs-ext" version = "3.0.0" @@ -2496,28 +2492,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1582e1c9e755dd6ad6b224dcffb135d199399a4568d454bd89fe515ca8425695" -[[package]] -name = "cargo-platform" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cfa25e60aea747ec7e1124f238816749faa93759c6ff5b31f1ccdda137f4479" -dependencies = [ - "serde", -] - -[[package]] -name = "cargo_metadata" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" -dependencies = [ - "camino", - "cargo-platform", - "semver 1.0.18", - "serde", - "serde_json", -] - [[package]] name = "cast" version = "0.3.0" @@ -2852,9 +2826,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.2.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" dependencies = [ "crossbeam-utils", ] @@ -4506,15 +4480,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "error-chain" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" -dependencies = [ - "version_check", -] - [[package]] name = "escape8259" version = "0.5.2" @@ -4568,9 +4533,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "event-listener" -version = "5.2.0" +version = "5.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b5fb89194fa3cad959b833185b3063ba881dbfc7030680b314250779fb4cc91" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" dependencies = [ "concurrent-queue", "parking", @@ -4583,7 +4548,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "332f51cb23d20b0de8458b86580878211da09bcd4503cb579c225b3d124cabb3" dependencies = [ - "event-listener 5.2.0", + "event-listener 5.3.1", "pin-project-lite", ] @@ -5353,7 +5318,7 @@ dependencies = [ "http 0.2.9", "thiserror", "tokio", - "tonic 0.10.2", + "tonic 0.11.0", "tower", "tracing", "trust-dns-resolver 0.23.2", @@ -6004,7 +5969,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2648421c18993edec6853ad5ce978733178b26f1#2648421c18993edec6853ad5ce978733178b26f1" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=682b38037dafa78ebbc6e0479844e980c5ed68a4#682b38037dafa78ebbc6e0479844e980c5ed68a4" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6024,11 +5989,13 @@ dependencies = [ "fnv", "futures", "itertools 0.13.0", + "moka", "murmur3", "once_cell", "opendal 0.49.0", "ordered-float 4.1.1", "parquet 52.0.0", + "paste", "reqwest 0.12.4", "rust_decimal", "serde", @@ -6038,7 +6005,7 @@ dependencies = [ "serde_repr", "serde_with 3.8.0", "tokio", - "typed-builder 0.19.1", + "typed-builder 0.20.0", "url", "uuid", ] @@ -6046,7 +6013,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2648421c18993edec6853ad5ce978733178b26f1#2648421c18993edec6853ad5ce978733178b26f1" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=682b38037dafa78ebbc6e0479844e980c5ed68a4#682b38037dafa78ebbc6e0479844e980c5ed68a4" dependencies = [ "anyhow", "async-trait", @@ -6056,14 +6023,14 @@ dependencies = [ "log", "serde_json", "tokio", - "typed-builder 0.19.1", + "typed-builder 0.20.0", "uuid", ] [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2648421c18993edec6853ad5ce978733178b26f1#2648421c18993edec6853ad5ce978733178b26f1" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=682b38037dafa78ebbc6e0479844e980c5ed68a4#682b38037dafa78ebbc6e0479844e980c5ed68a4" dependencies = [ "async-trait", "chrono", @@ -6076,7 +6043,7 @@ dependencies = [ "serde_derive", "serde_json", "tokio", - "typed-builder 0.19.1", + "typed-builder 0.20.0", "uuid", ] @@ -7280,21 +7247,21 @@ dependencies = [ [[package]] name = "moka" -version = "0.12.0" +version = "0.12.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dc65d4615c08c8a13d91fd404b5a2a4485ba35b4091e3315cf8798d280c2f29" +checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" dependencies = [ - "async-lock", + "async-lock 3.4.0", "async-trait", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", + "event-listener 5.3.1", "futures-util", "once_cell", "parking_lot 0.12.1", "quanta", "rustc_version 0.4.0", - "skeptic", "smallvec", "tagptr", "thiserror", @@ -9124,7 +9091,7 @@ checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.10.5", + "itertools 0.11.0", "log", "multimap 0.8.3", "once_cell", @@ -9179,7 +9146,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.66", @@ -9303,17 +9270,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "pulldown-cmark" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" -dependencies = [ - "bitflags 1.3.2", - "memchr", - "unicase", -] - [[package]] name = "pulsar" version = "6.3.0" @@ -9362,7 +9318,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -9423,12 +9379,12 @@ checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" [[package]] name = "quanta" -version = "0.11.0" -source = "git+https://github.com/madsim-rs/quanta.git?rev=948bdc3#948bdc3d4cd3fcfe3d52d03dd83deee96d97d770" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" dependencies = [ "crossbeam-utils", "libc", - "mach2", "once_cell", "raw-cpuid", "wasi", @@ -9547,11 +9503,11 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "10.7.0" +version = "11.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.6.0", ] [[package]] @@ -12663,9 +12619,6 @@ name = "semver" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" -dependencies = [ - "serde", -] [[package]] name = "semver-parser" @@ -13114,21 +13067,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fed904c7fb2856d868b92464fc8fa597fce366edea1a9cbfaa8cb5fe080bd6d" -[[package]] -name = "skeptic" -version = "0.13.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" -dependencies = [ - "bytecount", - "cargo_metadata", - "error-chain", - "glob", - "pulldown-cmark", - "tempfile", - "walkdir", -] - [[package]] name = "slab" version = "0.4.9" @@ -14365,10 +14303,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.12.1", - "rustls 0.21.11", - "rustls-pemfile 1.0.4", "tokio", - "tokio-rustls 0.24.1", "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "tower", "tower-layer", @@ -14395,7 +14330,10 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.12.1", + "rustls-pemfile 2.1.1", + "rustls-pki-types", "tokio", + "tokio-rustls 0.25.0", "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "tower", "tower-layer", @@ -14825,6 +14763,15 @@ dependencies = [ "typed-builder-macro 0.19.1", ] +[[package]] +name = "typed-builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e14ed59dc8b7b26cacb2a92bad2e8b1f098806063898ab42a3bd121d7d45e75" +dependencies = [ + "typed-builder-macro 0.20.0", +] + [[package]] name = "typed-builder-macro" version = "0.16.2" @@ -14858,6 +14805,17 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "typed-builder-macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "560b82d656506509d43abe30e0ba64c56b1953ab3d4fe7ba5902747a7a3cedd5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "typenum" version = "1.16.0" diff --git a/Cargo.toml b/Cargo.toml index 2635de95c66dc..0cee622062139 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,9 +142,9 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" } arrow-schema-iceberg = { package = "arrow-schema", version = "52" } arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } arrow-cast-iceberg = { package = "arrow-cast", version = "52" } -iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2648421c18993edec6853ad5ce978733178b26f1" } -iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2648421c18993edec6853ad5ce978733178b26f1" } -iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2648421c18993edec6853ad5ce978733178b26f1" } +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "682b38037dafa78ebbc6e0479844e980c5ed68a4" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "682b38037dafa78ebbc6e0479844e980c5ed68a4" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "682b38037dafa78ebbc6e0479844e980c5ed68a4" } opendal = "0.47" arrow-array = "50" arrow-arith = "50" @@ -342,7 +342,6 @@ opt-level = 2 [patch.crates-io] # Patch third-party crates for deterministic simulation. -quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" } getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" } # Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies. # Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`. diff --git a/risedev.yml b/risedev.yml index 22c4569adb610..a5eca729d2617 100644 --- a/risedev.yml +++ b/risedev.yml @@ -115,8 +115,8 @@ profile: - use: compactor - use: prometheus - use: grafana - - use: kafka - persist-data: true + # - use: kafka + # persist-data: true standalone-full-peripherals: steps: diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 1d91d9f9078cd..0999e905188fb 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -92,7 +92,13 @@ impl IcebergScanExecutor { .load_table_v2_with_metadata(self.table_meta) .await?; let data_types = self.schema.data_types(); - let chunk_schema_names = self.schema.names(); + let chunk_schema_name_to_id = self + .schema + .names() + .iter() + .enumerate() + .map(|(k, v)| (v.clone(), k)) + .collect::>(); let mut eq_delete_file_scan_tasks_map: HashMap< String, @@ -100,9 +106,10 @@ impl IcebergScanExecutor { > = HashMap::default(); let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks); - let mut delete_column_names_for_check = vec![]; - for eq_delete_file_scan_task in eq_delete_file_scan_tasks { - let mut sequence_number = eq_delete_file_scan_task.sequence_number(); + for mut eq_delete_file_scan_task in eq_delete_file_scan_tasks { + eq_delete_file_scan_task.project_field_ids = + eq_delete_file_scan_task.equality_ids.clone(); + let mut sequence_number = eq_delete_file_scan_task.sequence_number; let reader = table .reader_builder() .with_batch_size(self.batch_size) @@ -123,15 +130,6 @@ impl IcebergScanExecutor { .cloned() .collect_vec(); - // Todo! xxhZs move check to iceberg source. - if delete_column_names_for_check.is_empty() { - delete_column_names_for_check = delete_column_names.clone(); - } else if delete_column_names_for_check != delete_column_names { - return Err(BatchError::Internal(anyhow::anyhow!( - "The schema of iceberg equality delete file must be consistent" - ))); - } - let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; for (array, columns_name) in chunk.columns().iter().zip_eq(delete_column_names) { let each_column_seq_num_map = eq_delete_file_scan_tasks_map @@ -150,7 +148,7 @@ impl IcebergScanExecutor { let file_scan_tasks = mem::take(&mut self.file_scan_tasks); for file_scan_task in file_scan_tasks { - let sequence_number = file_scan_task.sequence_number(); + let sequence_number = file_scan_task.sequence_number; let reader = table .reader_builder() .with_batch_size(self.batch_size) @@ -208,12 +206,12 @@ impl IcebergScanExecutor { .iter() .zip_eq(column_names) .filter_map(|(array, columns)| { - if chunk_schema_names.contains(&columns) { - Some(array.clone()) - } else { - None - } + chunk_schema_name_to_id + .get(&columns) + .map(|&id| (id, array.clone())) }) + .sorted_by_key(|a| a.0) + .map(|(k, v)| v) .collect_vec(); let chunk = DataChunk::new(data, visibility); debug_assert_eq!(chunk.data_types(), data_types); diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index d87e89c1cf65d..ede939584fe9f 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -76,7 +76,7 @@ jni = { version = "0.21.1", features = ["invocation"] } jsonbb = { workspace = true } jsonwebtoken = "9.2.0" maplit = "1.0.2" -moka = { version = "0.12", features = ["future"] } +moka = { version = "0.12.8", features = ["future"] } mongodb = { version = "2.8.2", features = ["tokio-runtime"] } mysql_async = { version = "0.34", default-features = false, features = [ "default", diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/sink/iceberg/jni_catalog.rs index b80a6a305870f..6529ea733428d 100644 --- a/src/connector/src/sink/iceberg/jni_catalog.rs +++ b/src/connector/src/sink/iceberg/jni_catalog.rs @@ -288,7 +288,7 @@ impl CatalogV2 for JniCatalog { "Failed to crete iceberg table.", ) .with_source(e) - }) + })? } /// Load table from the catalog. @@ -338,7 +338,7 @@ impl CatalogV2 for JniCatalog { "Failed to load iceberg table.", ) .with_source(e) - }) + })? } /// Drop a table from the catalog. diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index b68e74b1f5d95..da615fa9bef3b 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -672,7 +672,7 @@ impl IcebergConfig { .file_io(storage_catalog.file_io().clone()) // Only support readonly table for storage catalog now. .readonly(true) - .build()) + .build()?) } _ => self.load_table_v2().await, } diff --git a/src/connector/src/sink/iceberg/storage_catalog.rs b/src/connector/src/sink/iceberg/storage_catalog.rs index 01adb510882a2..18e2ff0e036ff 100644 --- a/src/connector/src/sink/iceberg/storage_catalog.rs +++ b/src/connector/src/sink/iceberg/storage_catalog.rs @@ -249,11 +249,11 @@ impl Catalog for StorageCatalog { let version_hint_output = self.file_io.new_output(&version_hint_path)?; version_hint_output.write("1".into()).await?; - Ok(Table::builder() + Table::builder() .metadata(table_metadata) .identifier(table_ident) .file_io(self.file_io.clone()) - .build()) + .build() } /// Load table from the catalog. @@ -283,13 +283,13 @@ impl Catalog for StorageCatalog { let metadata_file_content = metadata_file.read().await?; let table_metadata = serde_json::from_slice::(&metadata_file_content)?; - Ok(Table::builder() + Table::builder() .metadata(table_metadata) .identifier(table.clone()) .file_io(self.file_io.clone()) // Only support readonly table for storage catalog now. .readonly(true) - .build()) + .build() } /// Drop a table from the catalog. diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 25f57401f76b9..8d2f1e97f6f42 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -15,12 +15,10 @@ pub mod parquet_file_reader; use std::collections::{HashMap, HashSet}; -use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; use futures_async_stream::for_await; -use iceberg::arrow::schema_to_arrow_schema; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; use iceberg::table::Table; @@ -221,10 +219,13 @@ impl IcebergSplitEnumerator { let snapshot = table .metadata() .snapshots() - .filter(|snapshot| snapshot.timestamp().timestamp_millis() <= timestamp) - .max_by_key(|snapshot| snapshot.timestamp().timestamp_millis()); + .map(|snapshot| snapshot.timestamp().map(|ts| ts.timestamp_millis())) + .collect::, _>>()? + .into_iter() + .filter(|&snapshot_millis| snapshot_millis <= timestamp) + .max_by_key(|&snapshot_millis| snapshot_millis); match snapshot { - Some(snapshot) => snapshot.snapshot_id(), + Some(snapshot) => snapshot, None => { // convert unix time to human readable time let time = chrono::DateTime::from_timestamp_millis(timestamp); @@ -241,18 +242,11 @@ impl IcebergSplitEnumerator { None => bail!("Cannot find the current snapshot id in the iceberg table."), }, }; - let (eq_delete_files, eq_delete_file_schema) = - IcebergSplitEnumerator::load_eq_delete_file(&table, snapshot_id).await?; - let arrow_schema = schema_to_arrow_schema(&eq_delete_file_schema)?; - let mut require_names: HashSet = schema.names().clone().into_iter().collect(); - require_names.extend( - arrow_schema - .all_fields() - .into_iter() - .map(|filed| filed.name().clone()), - ); - - let mut files = vec![]; + let mut require_names = Self::get_eq_delete_names(&table, snapshot_id).await?; + require_names.extend(schema.names().clone().into_iter()); + + let mut data_files = vec![]; + let mut eq_delete_files = vec![]; let scan = table .scan() @@ -265,16 +259,26 @@ impl IcebergSplitEnumerator { #[for_await] for task in file_scan_stream { - let task = task.map_err(|e| anyhow!(e))?; - files.push(IcebergFileScanTaskJsonStr::serialize(&task)); + let task: FileScanTask = task.map_err(|e| anyhow!(e))?; + match task.data_file_content { + iceberg::spec::DataContentType::Data => { + data_files.push(IcebergFileScanTaskJsonStr::serialize(&task)); + } + iceberg::spec::DataContentType::EqualityDeletes => { + eq_delete_files.push(IcebergFileScanTaskJsonStr::serialize(&task)); + } + iceberg::spec::DataContentType::PositionDeletes => { + bail!("Position delete file is not supported") + } + } } let table_meta = TableMetadataJsonStr::serialize(table.metadata()); let split_num = batch_parallelism; // evenly split the files into splits based on the parallelism. - let split_size = files.len() / split_num; - let remaining = files.len() % split_num; + let split_size = data_files.len() / split_num; + let remaining = data_files.len() % split_num; let mut splits = vec![]; for i in 0..split_num { let start = i * split_size; @@ -283,7 +287,7 @@ impl IcebergSplitEnumerator { split_id: i as i64, snapshot_id, table_meta: table_meta.clone(), - files: files[start..end].to_vec(), + files: data_files[start..end].to_vec(), eq_delete_files: eq_delete_files.clone(), }; splits.push(split); @@ -291,7 +295,7 @@ impl IcebergSplitEnumerator { for i in 0..remaining { splits[i] .files - .push(files[split_num * split_size + i].clone()); + .push(data_files[split_num * split_size + i].clone()); } Ok(splits .into_iter() @@ -299,28 +303,36 @@ impl IcebergSplitEnumerator { .collect_vec()) } - async fn load_eq_delete_file( + async fn get_eq_delete_names( table: &Table, snapshot_id: i64, - ) -> ConnectorResult<(Vec, Arc)> { - let mut files = vec![]; - + ) -> ConnectorResult> { let scan = table .scan() .snapshot_id(snapshot_id) .build() .map_err(|e| anyhow!(e))?; - // Todo! xxhZs Currently, this schema is the full schema, this is because iceberg_rust's task does not have eq_delete_id. + let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?; let schema = scan.snapshot().schema(table.metadata())?; - - let file_scan_stream = scan.plan_eq_delete_files().await.map_err(|e| anyhow!(e))?; - + let mut equality_ids = vec![]; #[for_await] for task in file_scan_stream { - let task = task.map_err(|e| anyhow!(e))?; - files.push(IcebergFileScanTaskJsonStr::serialize(&task)); + let task: FileScanTask = task.map_err(|e| anyhow!(e))?; + if task.data_file_content == iceberg::spec::DataContentType::EqualityDeletes { + if equality_ids.is_empty() { + equality_ids = task.equality_ids; + } else if equality_ids != task.equality_ids { + bail!("The schema of iceberg equality delete file must be consistent"); + } + } } - Ok((files, schema)) + equality_ids + .into_iter() + .map(|id| match schema.name_by_field_id(id) { + Some(name) => Ok(name.to_string()), + None => bail!("Delete field id {} not found in schema", id), + }) + .collect() } } diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index e493037c200b7..ac18122044a38 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -51,7 +51,7 @@ itertools = { workspace = true } jsonbb = { workspace = true } linkme = { version = "0.3", features = ["used_linker"] } md5 = "0.7" -moka = { version = "0.12", features = ["sync"] } +moka = { version = "0.12.8", features = ["sync"] } num-traits = "0.2" openssl = "0.10" regex = "1" diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs index e2bbcb486b926..3c60236f96e66 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs @@ -17,6 +17,7 @@ use std::ops::Deref; use iceberg::table::Table; use jsonbb::{Value, ValueRef}; use risingwave_common::types::{Fields, JsonbVal, Timestamptz}; +use risingwave_connector::error::ConnectorResult; use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::ConnectorProperties; use risingwave_connector::WithPropertiesExt; @@ -62,25 +63,32 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result> let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config(); let table: Table = iceberg_config.load_table_v2().await?; - result.extend(table.metadata().snapshots().map(|snapshot| { - RwIcebergSnapshots { - source_id: source.id as i32, - schema_name: schema_name.clone(), - source_name: source.name.clone(), - sequence_number: snapshot.sequence_number(), - snapshot_id: snapshot.snapshot_id(), - timestamp_ms: Timestamptz::from_millis(snapshot.timestamp().timestamp_millis()), - manifest_list: snapshot.manifest_list().to_string(), - summary: Value::object( - snapshot - .summary() - .other - .iter() - .map(|(k, v)| (k.as_str(), ValueRef::String(v))), - ) - .into(), - } - })); + let snapshots: ConnectorResult> = table + .metadata() + .snapshots() + .map(|snapshot| { + Ok(RwIcebergSnapshots { + source_id: source.id as i32, + schema_name: schema_name.clone(), + source_name: source.name.clone(), + sequence_number: snapshot.sequence_number(), + snapshot_id: snapshot.snapshot_id(), + timestamp_ms: Timestamptz::from_millis( + snapshot.timestamp()?.timestamp_millis(), + ), + manifest_list: snapshot.manifest_list().to_string(), + summary: Value::object( + snapshot + .summary() + .other + .iter() + .map(|(k, v)| (k.as_str(), ValueRef::String(v))), + ) + .into(), + }) + }) + .collect(); + result.extend(snapshots?); } } Ok(result) diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml index 49729c6d9e8ac..1ee2b8d94737f 100644 --- a/src/rpc_client/Cargo.toml +++ b/src/rpc_client/Cargo.toml @@ -23,7 +23,7 @@ http = "1" hyper = "1" itertools = { workspace = true } lru = { workspace = true } -moka = { version = "0.12", features = ["future"] } +moka = { version = "0.12.8", features = ["future"] } paste = "1" rand = { workspace = true } risingwave_common = { workspace = true } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 2886c4e4e23f7..a03ef728a9046 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -36,7 +36,7 @@ libc = "0.2" lz4 = "1.25.0" memcomparable = "0.2" metrics-prometheus = "0.7" -moka = { version = "0.12", features = ["future", "sync"] } +moka = { version = "0.12.8", features = ["future", "sync"] } more-asserts = "0.3" num-integer = "0.1" parking_lot = { workspace = true } From 4ee73ba2b2b12e20677670d8b9353143fb37ab70 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 10 Sep 2024 14:25:26 +0800 Subject: [PATCH 09/24] fix risedev --- risedev.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/risedev.yml b/risedev.yml index a5eca729d2617..22c4569adb610 100644 --- a/risedev.yml +++ b/risedev.yml @@ -115,8 +115,8 @@ profile: - use: compactor - use: prometheus - use: grafana - # - use: kafka - # persist-data: true + - use: kafka + persist-data: true standalone-full-peripherals: steps: From 62644fe3b1c3107edbcf8278329384a906bb44b4 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 10 Sep 2024 14:47:12 +0800 Subject: [PATCH 10/24] fix comm --- src/batch/src/executor/iceberg_scan.rs | 35 +++++++++++++++----------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 0999e905188fb..0a9fb652500bc 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -25,6 +25,7 @@ use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; +use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit}; use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData}; @@ -41,7 +42,7 @@ pub struct IcebergScanExecutor { #[allow(dead_code)] snapshot_id: Option, table_meta: TableMetadata, - file_scan_tasks: Vec, + data_file_scan_tasks: Vec, eq_delete_file_scan_tasks: Vec, batch_size: usize, schema: Schema, @@ -67,7 +68,7 @@ impl IcebergScanExecutor { iceberg_config: IcebergConfig, snapshot_id: Option, table_meta: TableMetadata, - file_scan_tasks: Vec, + data_file_scan_tasks: Vec, eq_delete_file_scan_tasks: Vec, batch_size: usize, schema: Schema, @@ -77,7 +78,7 @@ impl IcebergScanExecutor { iceberg_config, snapshot_id, table_meta, - file_scan_tasks, + data_file_scan_tasks, eq_delete_file_scan_tasks, batch_size, schema, @@ -100,6 +101,7 @@ impl IcebergScanExecutor { .map(|(k, v)| (v.clone(), k)) .collect::>(); + // The value to remove from the column and its seq_num. let mut eq_delete_file_scan_tasks_map: HashMap< String, HashMap, i64>, @@ -131,7 +133,8 @@ impl IcebergScanExecutor { .collect_vec(); let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; - for (array, columns_name) in chunk.columns().iter().zip_eq(delete_column_names) { + for (array, columns_name) in chunk.columns().iter().zip_eq_fast(delete_column_names) + { let each_column_seq_num_map = eq_delete_file_scan_tasks_map .entry(columns_name) .or_default(); @@ -145,15 +148,15 @@ impl IcebergScanExecutor { } } - let file_scan_tasks = mem::take(&mut self.file_scan_tasks); + let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks); - for file_scan_task in file_scan_tasks { - let sequence_number = file_scan_task.sequence_number; + for data_file_scan_task in data_file_scan_tasks { + let data_sequence_number = data_file_scan_task.sequence_number; let reader = table .reader_builder() .with_batch_size(self.batch_size) .build(); - let file_scan_stream = tokio_stream::once(Ok(file_scan_task)); + let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task)); let mut record_batch_stream = reader .read(Box::pin(file_scan_stream)) @@ -172,15 +175,16 @@ impl IcebergScanExecutor { let visibilitys: Vec<_> = chunk .columns() .iter() - .zip_eq(column_names.clone()) + .zip_eq_fast(column_names.clone()) .filter_map(|(array, column_map)| { if let Some(each_column_seq_num_map) = eq_delete_file_scan_tasks_map.get(&column_map) { let visibility = Bitmap::from_iter(array.get_all_values().iter().map(|datum| { - if let Some(s) = each_column_seq_num_map.get(datum) - && s > &sequence_number + if let Some(delete_sequence_number) = + each_column_seq_num_map.get(datum) + && delete_sequence_number > &data_sequence_number { false } else { @@ -193,10 +197,11 @@ impl IcebergScanExecutor { } }) .collect(); - let (data, va) = chunk.into_parts_v2(); + let (data, chunk_visibilitys) = chunk.into_parts_v2(); let visibility = if visibilitys.is_empty() { - va + chunk_visibilitys } else { + // Calculate the result of the or operation for different columns of the bitmap visibilitys .iter() .skip(1) @@ -204,14 +209,14 @@ impl IcebergScanExecutor { }; let data = data .iter() - .zip_eq(column_names) + .zip_eq_fast(column_names) .filter_map(|(array, columns)| { chunk_schema_name_to_id .get(&columns) .map(|&id| (id, array.clone())) }) .sorted_by_key(|a| a.0) - .map(|(k, v)| v) + .map(|(_k, v)| v) .collect_vec(); let chunk = DataChunk::new(data, visibility); debug_assert_eq!(chunk.data_types(), data_types); From 3a35be36870bab8aa031811e8f2de504119a0134 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 11 Sep 2024 14:10:33 +0800 Subject: [PATCH 11/24] fix comm --- src/batch/src/executor/iceberg_scan.rs | 129 ++++++++++++------------ src/common/src/array/mod.rs | 4 - src/connector/src/source/iceberg/mod.rs | 3 +- 3 files changed, 67 insertions(+), 69 deletions(-) diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 0a9fb652500bc..16035dde0adb9 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use core::ops::BitOr; use std::collections::HashMap; use std::mem; @@ -24,6 +23,7 @@ use itertools::Itertools; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::sink::iceberg::IcebergConfig; @@ -101,17 +101,13 @@ impl IcebergScanExecutor { .map(|(k, v)| (v.clone(), k)) .collect::>(); - // The value to remove from the column and its seq_num. - let mut eq_delete_file_scan_tasks_map: HashMap< - String, - HashMap, i64>, - > = HashMap::default(); + let mut eq_delete_file_scan_tasks_map: HashMap = HashMap::default(); let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks); - for mut eq_delete_file_scan_task in eq_delete_file_scan_tasks { - eq_delete_file_scan_task.project_field_ids = - eq_delete_file_scan_task.equality_ids.clone(); + let mut delete_column_names = None; + for eq_delete_file_scan_task in eq_delete_file_scan_tasks { let mut sequence_number = eq_delete_file_scan_task.sequence_number; + let reader = table .reader_builder() .with_batch_size(self.batch_size) @@ -124,26 +120,28 @@ impl IcebergScanExecutor { while let Some(record_batch) = delete_record_batch_stream.next().await { let record_batch = record_batch.map_err(BatchError::Iceberg)?; - let delete_column_names = record_batch - .schema() - .fields() - .iter() - .map(|field| field.name()) - .cloned() - .collect_vec(); + if delete_column_names.is_none() { + delete_column_names = Some( + record_batch + .schema() + .fields() + .iter() + .map(|field| field.name()) + .cloned() + .collect_vec(), + ); + } let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; - for (array, columns_name) in chunk.columns().iter().zip_eq_fast(delete_column_names) - { - let each_column_seq_num_map = eq_delete_file_scan_tasks_map - .entry(columns_name) + for row in chunk.rows() { + let entry = eq_delete_file_scan_tasks_map + .entry(OwnedRow::new( + row.iter() + .map(|scalar_ref| scalar_ref.map(Into::into)) + .collect_vec(), + )) .or_default(); - for datum in array.get_all_values() { - let entry = each_column_seq_num_map - .entry(datum) - .or_insert(sequence_number); - *entry = *entry.max(&mut sequence_number); - } + *entry = *entry.max(&mut sequence_number); } } } @@ -152,6 +150,7 @@ impl IcebergScanExecutor { for data_file_scan_task in data_file_scan_tasks { let data_sequence_number = data_file_scan_task.sequence_number; + let reader = table .reader_builder() .with_batch_size(self.batch_size) @@ -164,55 +163,57 @@ impl IcebergScanExecutor { while let Some(record_batch) = record_batch_stream.next().await { let record_batch = record_batch.map_err(BatchError::Iceberg)?; - let column_names = record_batch - .schema() + + let arrow_schema = record_batch.schema(); + let column_names = arrow_schema .fields() .iter() .map(|field| field.name()) - .cloned() .collect_vec(); let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; - let visibilitys: Vec<_> = chunk - .columns() - .iter() - .zip_eq_fast(column_names.clone()) - .filter_map(|(array, column_map)| { - if let Some(each_column_seq_num_map) = - eq_delete_file_scan_tasks_map.get(&column_map) - { - let visibility = - Bitmap::from_iter(array.get_all_values().iter().map(|datum| { - if let Some(delete_sequence_number) = - each_column_seq_num_map.get(datum) - && delete_sequence_number > &data_sequence_number - { - false - } else { - true - } - })); - Some(visibility) - } else { - None - } - }) - .collect(); - let (data, chunk_visibilitys) = chunk.into_parts_v2(); - let visibility = if visibilitys.is_empty() { - chunk_visibilitys - } else { - // Calculate the result of the or operation for different columns of the bitmap - visibilitys - .iter() - .skip(1) - .fold(visibilitys[0].clone(), |acc, bitmap| acc.bitor(bitmap)) + let (data, visibility) = match delete_column_names { + Some(ref delete_column_names) => { + let column_ids = column_names + .iter() + .enumerate() + .filter_map(|(id, column_name)| { + if delete_column_names.contains(column_name) { + Some(id) + } else { + None + } + }) + .collect_vec(); + let visibility = + Bitmap::from_iter(chunk.project(&column_ids).rows().map(|row_ref| { + let row = OwnedRow::new( + row_ref + .iter() + .map(|scalar_ref| scalar_ref.map(Into::into)) + .collect_vec(), + ); + if let Some(delete_sequence_number) = + eq_delete_file_scan_tasks_map.get(&row) + && delete_sequence_number > &data_sequence_number + { + false + } else { + true + } + })) + .clone(); + let (data, _chunk_visibilities) = chunk.into_parts_v2(); + (data, visibility) + } + None => chunk.into_parts_v2(), }; + let data = data .iter() .zip_eq_fast(column_names) .filter_map(|(array, columns)| { chunk_schema_name_to_id - .get(&columns) + .get(columns) .map(|&id| (id, array.clone())) }) .sorted_by_key(|a| a.0) diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 54840198ccca0..b34e5f9b9c470 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -603,10 +603,6 @@ impl ArrayImpl { }) } - pub fn get_all_values(&self) -> Vec { - (0..self.len()).map(|i| self.datum_at(i)).collect() - } - /// # Safety /// /// This function is unsafe because it does not check the validity of `idx`. It is caller's diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 8d2f1e97f6f42..fd14f88100115 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -259,12 +259,13 @@ impl IcebergSplitEnumerator { #[for_await] for task in file_scan_stream { - let task: FileScanTask = task.map_err(|e| anyhow!(e))?; + let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?; match task.data_file_content { iceberg::spec::DataContentType::Data => { data_files.push(IcebergFileScanTaskJsonStr::serialize(&task)); } iceberg::spec::DataContentType::EqualityDeletes => { + task.project_field_ids = task.equality_ids.clone(); eq_delete_files.push(IcebergFileScanTaskJsonStr::serialize(&task)); } iceberg::spec::DataContentType::PositionDeletes => { From c68eecd0a8a55abbe33450fe4942f6f2ee4e3cc4 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 11 Sep 2024 15:35:10 +0800 Subject: [PATCH 12/24] delete schema mv --- src/batch/src/executor/iceberg_scan.rs | 122 +++++++++++++----------- src/connector/src/source/iceberg/mod.rs | 15 +-- 2 files changed, 72 insertions(+), 65 deletions(-) diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 16035dde0adb9..6e59ebc9b9bd7 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -93,21 +93,30 @@ impl IcebergScanExecutor { .load_table_v2_with_metadata(self.table_meta) .await?; let data_types = self.schema.data_types(); - let chunk_schema_name_to_id = self - .schema - .names() - .iter() - .enumerate() - .map(|(k, v)| (v.clone(), k)) - .collect::>(); + let chunk_schema_name = self.schema.names(); let mut eq_delete_file_scan_tasks_map: HashMap = HashMap::default(); let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks); - let mut delete_column_names = None; + let mut delete_column_names: Option> = None; for eq_delete_file_scan_task in eq_delete_file_scan_tasks { let mut sequence_number = eq_delete_file_scan_task.sequence_number; + if delete_column_names.is_none() { + delete_column_names = Some( + eq_delete_file_scan_task + .project_field_ids + .iter() + .filter_map(|id| { + eq_delete_file_scan_task + .schema + .name_by_field_id(*id) + .map(|name| name.to_string()) + }) + .collect(), + ); + } + let reader = table .reader_builder() .with_batch_size(self.batch_size) @@ -120,17 +129,6 @@ impl IcebergScanExecutor { while let Some(record_batch) = delete_record_batch_stream.next().await { let record_batch = record_batch.map_err(BatchError::Iceberg)?; - if delete_column_names.is_none() { - delete_column_names = Some( - record_batch - .schema() - .fields() - .iter() - .map(|field| field.name()) - .cloned() - .collect_vec(), - ); - } let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; for row in chunk.rows() { @@ -151,6 +149,31 @@ impl IcebergScanExecutor { for data_file_scan_task in data_file_scan_tasks { let data_sequence_number = data_file_scan_task.sequence_number; + let column_names: Vec<_> = data_file_scan_task + .project_field_ids + .iter() + .filter_map(|id| { + data_file_scan_task + .schema + .name_by_field_id(*id) + .map(|name| name.to_string()) + }) + .collect(); + + let delete_column_ids = delete_column_names.as_ref().map(|delete_column_names| { + column_names + .iter() + .enumerate() + .filter_map(|(id, column_name)| { + if delete_column_names.contains(column_name) { + Some(id) + } else { + None + } + }) + .collect_vec() + }); + let reader = table .reader_builder() .with_batch_size(self.batch_size) @@ -164,28 +187,11 @@ impl IcebergScanExecutor { while let Some(record_batch) = record_batch_stream.next().await { let record_batch = record_batch.map_err(BatchError::Iceberg)?; - let arrow_schema = record_batch.schema(); - let column_names = arrow_schema - .fields() - .iter() - .map(|field| field.name()) - .collect_vec(); let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; - let (data, visibility) = match delete_column_names { - Some(ref delete_column_names) => { - let column_ids = column_names - .iter() - .enumerate() - .filter_map(|(id, column_name)| { - if delete_column_names.contains(column_name) { - Some(id) - } else { - None - } - }) - .collect_vec(); - let visibility = - Bitmap::from_iter(chunk.project(&column_ids).rows().map(|row_ref| { + let chunk = match delete_column_ids.as_ref() { + Some(delete_column_ids) => { + let visibility = Bitmap::from_iter( + chunk.project(delete_column_ids).rows().map(|row_ref| { let row = OwnedRow::new( row_ref .iter() @@ -200,27 +206,27 @@ impl IcebergScanExecutor { } else { true } - })) - .clone(); + }), + ) + .clone(); let (data, _chunk_visibilities) = chunk.into_parts_v2(); - (data, visibility) + let data = data + .iter() + .zip_eq_fast(&column_names) + .filter_map(|(array, columns)| { + if chunk_schema_name.contains(columns) { + Some(array.clone()) + } else { + None + } + }) + .collect_vec(); + let chunk = DataChunk::new(data, visibility); + debug_assert_eq!(chunk.data_types(), data_types); + chunk } - None => chunk.into_parts_v2(), + None => chunk, }; - - let data = data - .iter() - .zip_eq_fast(column_names) - .filter_map(|(array, columns)| { - chunk_schema_name_to_id - .get(columns) - .map(|&id| (id, array.clone())) - }) - .sorted_by_key(|a| a.0) - .map(|(_k, v)| v) - .collect_vec(); - let chunk = DataChunk::new(data, visibility); - debug_assert_eq!(chunk.data_types(), data_types); yield chunk; } } diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index fd14f88100115..3d64cedfdc963 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -14,7 +14,7 @@ pub mod parquet_file_reader; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; @@ -242,8 +242,12 @@ impl IcebergSplitEnumerator { None => bail!("Cannot find the current snapshot id in the iceberg table."), }, }; - let mut require_names = Self::get_eq_delete_names(&table, snapshot_id).await?; - require_names.extend(schema.names().clone().into_iter()); + let mut require_names = schema.names(); + for name in Self::get_eq_delete_names(&table, snapshot_id).await? { + if !require_names.contains(&name) { + require_names.push(name); + } + } let mut data_files = vec![]; let mut eq_delete_files = vec![]; @@ -304,10 +308,7 @@ impl IcebergSplitEnumerator { .collect_vec()) } - async fn get_eq_delete_names( - table: &Table, - snapshot_id: i64, - ) -> ConnectorResult> { + async fn get_eq_delete_names(table: &Table, snapshot_id: i64) -> ConnectorResult> { let scan = table .scan() .snapshot_id(snapshot_id) From 7b76adcb8554edea0f3f1744b89649641930151d Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 11 Sep 2024 17:02:38 +0800 Subject: [PATCH 13/24] fix comm --- src/batch/src/executor/iceberg_scan.rs | 37 ++++++++++++-------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 6e59ebc9b9bd7..53b7ee705be91 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -98,23 +98,13 @@ impl IcebergScanExecutor { let mut eq_delete_file_scan_tasks_map: HashMap = HashMap::default(); let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks); - let mut delete_column_names: Option> = None; + // Iterate all the delete files to get the value and seq_num + let mut delete_equality_ids: Option> = None; for eq_delete_file_scan_task in eq_delete_file_scan_tasks { let mut sequence_number = eq_delete_file_scan_task.sequence_number; - if delete_column_names.is_none() { - delete_column_names = Some( - eq_delete_file_scan_task - .project_field_ids - .iter() - .filter_map(|id| { - eq_delete_file_scan_task - .schema - .name_by_field_id(*id) - .map(|name| name.to_string()) - }) - .collect(), - ); + if delete_equality_ids.is_none() { + delete_equality_ids = Some(eq_delete_file_scan_task.project_field_ids.clone()); } let reader = table @@ -146,6 +136,7 @@ impl IcebergScanExecutor { let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks); + // Delete rows in the data file that need to be deleted by map for data_file_scan_task in data_file_scan_tasks { let data_sequence_number = data_file_scan_task.sequence_number; @@ -160,13 +151,14 @@ impl IcebergScanExecutor { }) .collect(); - let delete_column_ids = delete_column_names.as_ref().map(|delete_column_names| { - column_names + // The order of the field_ids in the data file and delete file may be different, so need to correct them here + let delete_column_ids = delete_equality_ids.as_ref().map(|delete_column_ids| { + data_file_scan_task + .project_field_ids .iter() - .enumerate() - .filter_map(|(id, column_name)| { - if delete_column_names.contains(column_name) { - Some(id) + .filter_map(|project_field_id| { + if delete_column_ids.contains(project_field_id) { + Some(*project_field_id as usize) } else { None } @@ -191,6 +183,7 @@ impl IcebergScanExecutor { let chunk = match delete_column_ids.as_ref() { Some(delete_column_ids) => { let visibility = Bitmap::from_iter( + // Project with the schema of the delete file chunk.project(delete_column_ids).rows().map(|row_ref| { let row = OwnedRow::new( row_ref @@ -202,6 +195,8 @@ impl IcebergScanExecutor { eq_delete_file_scan_tasks_map.get(&row) && delete_sequence_number > &data_sequence_number { + // delete_sequence_number > data_sequence_number means the delete file is written later than data file, + // so it needs to be deleted false } else { true @@ -210,6 +205,7 @@ impl IcebergScanExecutor { ) .clone(); let (data, _chunk_visibilities) = chunk.into_parts_v2(); + // Keep the schema consistent(chunk and executor) let data = data .iter() .zip_eq_fast(&column_names) @@ -225,6 +221,7 @@ impl IcebergScanExecutor { debug_assert_eq!(chunk.data_types(), data_types); chunk } + // If there is no delete file, the data file is directly output None => chunk, }; yield chunk; From 52adf4c25febfce474a9f84d68f0b116bf780958 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 11 Sep 2024 20:13:43 +0800 Subject: [PATCH 14/24] fix comm --- risedev.yml | 4 ++-- src/batch/src/executor/iceberg_scan.rs | 21 +++++++--------- src/connector/src/source/iceberg/mod.rs | 32 +++++++++++++++---------- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/risedev.yml b/risedev.yml index 22c4569adb610..a5eca729d2617 100644 --- a/risedev.yml +++ b/risedev.yml @@ -115,8 +115,8 @@ profile: - use: compactor - use: prometheus - use: grafana - - use: kafka - persist-data: true + # - use: kafka + # persist-data: true standalone-full-peripherals: steps: diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 53b7ee705be91..308ca450e2113 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -123,11 +123,7 @@ impl IcebergScanExecutor { let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; for row in chunk.rows() { let entry = eq_delete_file_scan_tasks_map - .entry(OwnedRow::new( - row.iter() - .map(|scalar_ref| scalar_ref.map(Into::into)) - .collect_vec(), - )) + .entry(row.to_owned_row()) .or_default(); *entry = *entry.max(&mut sequence_number); } @@ -153,15 +149,14 @@ impl IcebergScanExecutor { // The order of the field_ids in the data file and delete file may be different, so need to correct them here let delete_column_ids = delete_equality_ids.as_ref().map(|delete_column_ids| { - data_file_scan_task - .project_field_ids + delete_column_ids .iter() - .filter_map(|project_field_id| { - if delete_column_ids.contains(project_field_id) { - Some(*project_field_id as usize) - } else { - None - } + .map(|delete_column_id| { + data_file_scan_task + .project_field_ids + .iter() + .position(|project_field_id| delete_column_id == project_field_id) + .expect("delete_column_id not found in delete_equality_ids") }) .collect_vec() }); diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 3d64cedfdc963..63dfe0a28f8f5 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -14,7 +14,7 @@ pub mod parquet_file_reader; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use anyhow::anyhow; use async_trait::async_trait; @@ -29,7 +29,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; -use crate::error::ConnectorResult; +use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::ParserConfig; use crate::sink::iceberg::IcebergConfig; use crate::source::{ @@ -242,12 +242,7 @@ impl IcebergSplitEnumerator { None => bail!("Cannot find the current snapshot id in the iceberg table."), }, }; - let mut require_names = schema.names(); - for name in Self::get_eq_delete_names(&table, snapshot_id).await? { - if !require_names.contains(&name) { - require_names.push(name); - } - } + let require_names = Self::get_require_field_names(&table, snapshot_id,schema).await?; let mut data_files = vec![]; let mut eq_delete_files = vec![]; @@ -308,7 +303,7 @@ impl IcebergSplitEnumerator { .collect_vec()) } - async fn get_eq_delete_names(table: &Table, snapshot_id: i64) -> ConnectorResult> { + async fn get_require_field_names(table: &Table, snapshot_id: i64, rw_schema: Schema) -> ConnectorResult> { let scan = table .scan() .snapshot_id(snapshot_id) @@ -328,13 +323,26 @@ impl IcebergSplitEnumerator { } } } - equality_ids + let mut delete_columns = equality_ids .into_iter() .map(|id| match schema.name_by_field_id(id) { - Some(name) => Ok(name.to_string()), + Some(name) => Ok::(name.to_string()), None => bail!("Delete field id {} not found in schema", id), }) - .collect() + .collect::>>()?; + delete_columns.extend(rw_schema.names().iter().cloned()); + let require_field_ids = schema.as_struct() + .fields() + .iter() + .filter_map(|field| { + if delete_columns.contains(&field.name) { + Some(field.name.clone()) + } else { + None + } + }) + .collect(); + Ok(require_field_ids) } } From d38d5bb0e46812755b347109e73963593655aabd Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 12 Sep 2024 11:50:52 +0800 Subject: [PATCH 15/24] add ci --- .../test_case/iceberg_source_eq_delete.slt | 104 ++++++++++++++++++ .../test_case/iceberg_source_eq_delete.toml | 11 ++ src/connector/src/source/iceberg/mod.rs | 14 ++- 3 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt create mode 100644 e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt new file mode 100644 index 0000000000000..6700cc00345c1 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt @@ -0,0 +1,104 @@ +statement ok +set sink_decouple = false; + +statement ok +set streaming_parallelism=4; + +statement ok +CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar); + +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1; + +statement ok +CREATE SINK sink1 AS select * from mv1 WITH ( + connector = 'iceberg', + type = 'upsert', + force_append_only = 'true', + database.name = 'demo_db', + table.name = 't1', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + create_table_if_not_exists = 'true', + primary_key = 'i1,i2', +); + +statement ok +CREATE SOURCE iceberg_t1_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + database.name = 'demo_db', + table.name = 't1', +); + +statement ok +insert into s1 values(1,'2','3'); + +statement ok +insert into s1 values(4,'5','6'); + +statement ok +insert into s1 values(7,'8','9'); + +statement ok +delete from s1 where i1 = 7; + +statement ok +flush; + +query I +select * from iceberg_t1_source order by i1; +---- +1,'2','3' +4,'5','6' + +query I +select i1,i2,i3 from iceberg_t1_source order by v1; +---- +1,'2','3' +4,'5','6' + +query I +select i3,i2 from iceberg_t1_source order by i2; +---- +'3','2' +'6','5' + + +query I +select i1 from iceberg_t1_source order by i1; +---- +1 +4 + +query I +select i2 from iceberg_t1_source order by i2; +---- +'2' +'5' + +query I +select i3 from iceberg_t1_source order by i3; +---- +'3' +'6' + +statement ok +DROP SINK sink1; + +statement ok +DROP SOURCE iceberg_t1_source; + +statement ok +DROP TABLE s1 cascade; diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml new file mode 100644 index 0000000000000..994a1c16621ae --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml @@ -0,0 +1,11 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.t1', +] + +slt = 'test_case/iceberg_select_empty_table.slt' + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.t1', + 'DROP SCHEMA IF EXISTS demo_db', +] \ No newline at end of file diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index d87ba74e587cf..550c966c4f465 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -213,9 +213,10 @@ impl IcebergSplitEnumerator { // If there is no snapshot, we will return a mock `IcebergSplit` with empty files. return Ok(vec![IcebergSplit { split_id: 0, - snapshot_id: 0, // unused + snapshot_id: 0, table_meta: TableMetadataJsonStr::serialize(table.metadata()), files: vec![], + eq_delete_files: vec![], }]); } @@ -253,7 +254,7 @@ impl IcebergSplitEnumerator { current_snapshot.unwrap().snapshot_id() } }; - let require_names = Self::get_require_field_names(&table, snapshot_id,schema).await?; + let require_names = Self::get_require_field_names(&table, snapshot_id, schema).await?; let mut data_files = vec![]; let mut eq_delete_files = vec![]; @@ -314,7 +315,11 @@ impl IcebergSplitEnumerator { .collect_vec()) } - async fn get_require_field_names(table: &Table, snapshot_id: i64, rw_schema: Schema) -> ConnectorResult> { + async fn get_require_field_names( + table: &Table, + snapshot_id: i64, + rw_schema: Schema, + ) -> ConnectorResult> { let scan = table .scan() .snapshot_id(snapshot_id) @@ -342,7 +347,8 @@ impl IcebergSplitEnumerator { }) .collect::>>()?; delete_columns.extend(rw_schema.names().iter().cloned()); - let require_field_ids = schema.as_struct() + let require_field_ids = schema + .as_struct() .fields() .iter() .filter_map(|field| { From da835cfc18ae9ec33a309120ae364a743f73b770 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 12 Sep 2024 14:10:23 +0800 Subject: [PATCH 16/24] fix cargo lock --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6027673d32cc6..51d7136e5bb88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5318,7 +5318,7 @@ dependencies = [ "http 0.2.9", "thiserror", "tokio", - "tonic 0.11.0", + "tonic 0.10.2", "tower", "tracing", "trust-dns-resolver 0.23.2", @@ -14308,7 +14308,10 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.12.1", + "rustls 0.21.11", + "rustls-pemfile 1.0.4", "tokio", + "tokio-rustls 0.24.1", "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "tower", "tower-layer", @@ -14335,10 +14338,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.12.1", - "rustls-pemfile 2.1.1", - "rustls-pki-types", "tokio", - "tokio-rustls 0.25.0", "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "tower", "tower-layer", From 0c29c4010aef76851a2de36366068968a7069ee2 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 12 Sep 2024 14:35:30 +0800 Subject: [PATCH 17/24] add ci --- Cargo.lock | 6 +++--- Cargo.toml | 6 +++--- ci/scripts/e2e-iceberg-sink-v2-test.sh | 1 + e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt | 5 +++++ e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml | 2 +- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 51d7136e5bb88..0988cc14eb8a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5969,7 +5969,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=682b38037dafa78ebbc6e0479844e980c5ed68a4#682b38037dafa78ebbc6e0479844e980c5ed68a4" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=057665e7ddebcc5ff0663fbc8058d88ba2950153#057665e7ddebcc5ff0663fbc8058d88ba2950153" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6013,7 +6013,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=682b38037dafa78ebbc6e0479844e980c5ed68a4#682b38037dafa78ebbc6e0479844e980c5ed68a4" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=057665e7ddebcc5ff0663fbc8058d88ba2950153#057665e7ddebcc5ff0663fbc8058d88ba2950153" dependencies = [ "anyhow", "async-trait", @@ -6030,7 +6030,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=682b38037dafa78ebbc6e0479844e980c5ed68a4#682b38037dafa78ebbc6e0479844e980c5ed68a4" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=057665e7ddebcc5ff0663fbc8058d88ba2950153#057665e7ddebcc5ff0663fbc8058d88ba2950153" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 61c38ad2f3147..220959124f73b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,9 +143,9 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" } arrow-schema-iceberg = { package = "arrow-schema", version = "52" } arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } arrow-cast-iceberg = { package = "arrow-cast", version = "52" } -iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "682b38037dafa78ebbc6e0479844e980c5ed68a4" } -iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "682b38037dafa78ebbc6e0479844e980c5ed68a4" } -iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "682b38037dafa78ebbc6e0479844e980c5ed68a4" } +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "057665e7ddebcc5ff0663fbc8058d88ba2950153" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "057665e7ddebcc5ff0663fbc8058d88ba2950153" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "057665e7ddebcc5ff0663fbc8058d88ba2950153" } opendal = "0.47" arrow-array = "50" arrow-arith = "50" diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index 1a46f30682bdd..c039c625aa213 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -46,6 +46,7 @@ poetry run python main.py -t ./test_case/range_partition_append_only.toml poetry run python main.py -t ./test_case/range_partition_upsert.toml poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml +poetry run python main.py -t ./test_case/iceberg_source_eq_delete.toml echo "--- Kill cluster" diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt index 6700cc00345c1..c2b79c12d29a1 100644 --- a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt +++ b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt @@ -75,6 +75,11 @@ select i3,i2 from iceberg_t1_source order by i2; '3','2' '6','5' +query I +select i2,i1 from iceberg_t1_source order by i1; +---- +'2',1 +'5',4 query I select i1 from iceberg_t1_source order by i1; diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml index 994a1c16621ae..6e49ca949f501 100644 --- a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml +++ b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml @@ -3,7 +3,7 @@ init_sqls = [ 'DROP TABLE IF EXISTS demo_db.t1', ] -slt = 'test_case/iceberg_select_empty_table.slt' +slt = 'test_case/iceberg_source_eq_delete.slt' drop_sqls = [ 'DROP TABLE IF EXISTS demo_db.t1', From 24092e2df3a06bc53f93a7973cfc127d14505812 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 12 Sep 2024 15:08:11 +0800 Subject: [PATCH 18/24] fix ci --- Cargo.lock | 6 +++--- Cargo.toml | 6 +++--- e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt | 1 - risedev.yml | 4 ++-- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0988cc14eb8a3..6b7895e8b8007 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5969,7 +5969,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=057665e7ddebcc5ff0663fbc8058d88ba2950153#057665e7ddebcc5ff0663fbc8058d88ba2950153" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=bf97b9de51c5c86394e4df7aaff82cc3ed8bc928#bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6013,7 +6013,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=057665e7ddebcc5ff0663fbc8058d88ba2950153#057665e7ddebcc5ff0663fbc8058d88ba2950153" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=bf97b9de51c5c86394e4df7aaff82cc3ed8bc928#bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" dependencies = [ "anyhow", "async-trait", @@ -6030,7 +6030,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=057665e7ddebcc5ff0663fbc8058d88ba2950153#057665e7ddebcc5ff0663fbc8058d88ba2950153" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=bf97b9de51c5c86394e4df7aaff82cc3ed8bc928#bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 220959124f73b..4d032ab50dac0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,9 +143,9 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" } arrow-schema-iceberg = { package = "arrow-schema", version = "52" } arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } arrow-cast-iceberg = { package = "arrow-cast", version = "52" } -iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "057665e7ddebcc5ff0663fbc8058d88ba2950153" } -iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "057665e7ddebcc5ff0663fbc8058d88ba2950153" } -iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "057665e7ddebcc5ff0663fbc8058d88ba2950153" } +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" } opendal = "0.47" arrow-array = "50" arrow-arith = "50" diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt index c2b79c12d29a1..ba0228c38bc0b 100644 --- a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt +++ b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt @@ -14,7 +14,6 @@ statement ok CREATE SINK sink1 AS select * from mv1 WITH ( connector = 'iceberg', type = 'upsert', - force_append_only = 'true', database.name = 'demo_db', table.name = 't1', catalog.name = 'demo', diff --git a/risedev.yml b/risedev.yml index a5eca729d2617..22c4569adb610 100644 --- a/risedev.yml +++ b/risedev.yml @@ -115,8 +115,8 @@ profile: - use: compactor - use: prometheus - use: grafana - # - use: kafka - # persist-data: true + - use: kafka + persist-data: true standalone-full-peripherals: steps: From e39af11c16f6ae257b136d0fc2b5e76cb743a96d Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 12 Sep 2024 16:02:59 +0800 Subject: [PATCH 19/24] fix ci fix fmt --- Cargo.toml | 1 + .../test_case/iceberg_source_eq_delete.slt | 2 ++ src/connector/src/source/iceberg/mod.rs | 30 ++++++++----------- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4d032ab50dac0..e5299db81af40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,6 +143,7 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" } arrow-schema-iceberg = { package = "arrow-schema", version = "52" } arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } arrow-cast-iceberg = { package = "arrow-cast", version = "52" } +# branch dev iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" } iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" } iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" } diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt index ba0228c38bc0b..8849d46972ae8 100644 --- a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt +++ b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt @@ -56,6 +56,8 @@ delete from s1 where i1 = 7; statement ok flush; +sleep 5s + query I select * from iceberg_t1_source order by i1; ---- diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 550c966c4f465..9f685050d9f11 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -14,7 +14,7 @@ pub mod parquet_file_reader; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; @@ -208,12 +208,13 @@ impl IcebergSplitEnumerator { bail!("Batch parallelism is 0. Cannot split the iceberg files."); } let table = self.config.load_table_v2().await?; + let current_snapshot = table.metadata().current_snapshot(); if current_snapshot.is_none() { // If there is no snapshot, we will return a mock `IcebergSplit` with empty files. return Ok(vec![IcebergSplit { split_id: 0, - snapshot_id: 0, + snapshot_id: 0, // unused table_meta: TableMetadataJsonStr::serialize(table.metadata()), files: vec![], eq_delete_files: vec![], @@ -339,27 +340,20 @@ impl IcebergSplitEnumerator { } } } - let mut delete_columns = equality_ids + let delete_columns = equality_ids .into_iter() .map(|id| match schema.name_by_field_id(id) { Some(name) => Ok::(name.to_string()), None => bail!("Delete field id {} not found in schema", id), }) - .collect::>>()?; - delete_columns.extend(rw_schema.names().iter().cloned()); - let require_field_ids = schema - .as_struct() - .fields() - .iter() - .filter_map(|field| { - if delete_columns.contains(&field.name) { - Some(field.name.clone()) - } else { - None - } - }) - .collect(); - Ok(require_field_ids) + .collect::>>()?; + let mut require_field_names: Vec<_> = rw_schema.names().to_vec(); + for names in delete_columns { + if !require_field_names.contains(&names) { + require_field_names.push(names); + } + } + Ok(require_field_names) } } From 31c15a7d55029e4c878867539c193377ee18c4b5 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 12 Sep 2024 16:58:49 +0800 Subject: [PATCH 20/24] fix ci --- .../test_case/iceberg_source_eq_delete.slt | 55 ++++++++++--------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt index 8849d46972ae8..f1aa92235d7f9 100644 --- a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt +++ b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt @@ -28,27 +28,16 @@ CREATE SINK sink1 AS select * from mv1 WITH ( ); statement ok -CREATE SOURCE iceberg_t1_source -WITH ( - connector = 'iceberg', - s3.endpoint = 'http://127.0.0.1:9301', - s3.region = 'us-east-1', - s3.access.key = 'hummockadmin', - s3.secret.key = 'hummockadmin', - catalog.type = 'storage', - warehouse.path = 's3a://icebergdata/demo', - database.name = 'demo_db', - table.name = 't1', -); +insert into s1 values(1,'2','3'); statement ok -insert into s1 values(1,'2','3'); +insert into s1 values(7,'8','9'); statement ok insert into s1 values(4,'5','6'); statement ok -insert into s1 values(7,'8','9'); +flush; statement ok delete from s1 where i1 = 7; @@ -58,29 +47,43 @@ flush; sleep 5s +statement ok +CREATE SOURCE iceberg_t1_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + database.name = 'demo_db', + table.name = 't1', +); + query I select * from iceberg_t1_source order by i1; ---- -1,'2','3' -4,'5','6' +1,2,3 +4,5,6 query I select i1,i2,i3 from iceberg_t1_source order by v1; ---- -1,'2','3' -4,'5','6' +1,2,3 +4,5,6 query I select i3,i2 from iceberg_t1_source order by i2; ---- -'3','2' -'6','5' +3,2 +6,5 query I select i2,i1 from iceberg_t1_source order by i1; ---- -'2',1 -'5',4 +2,1 +5,4 query I select i1 from iceberg_t1_source order by i1; @@ -91,14 +94,14 @@ select i1 from iceberg_t1_source order by i1; query I select i2 from iceberg_t1_source order by i2; ---- -'2' -'5' +2 +5 query I select i3 from iceberg_t1_source order by i3; ---- -'3' -'6' +3 +6 statement ok DROP SINK sink1; From e0df8cc1b39ca07d62a08819c462528a7da829fd Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 12 Sep 2024 17:21:51 +0800 Subject: [PATCH 21/24] fix ci fix ci --- .../test_case/iceberg_source_eq_delete.slt | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt index f1aa92235d7f9..820776fb7e773 100644 --- a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt +++ b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt @@ -64,26 +64,26 @@ WITH ( query I select * from iceberg_t1_source order by i1; ---- -1,2,3 -4,5,6 +1 2 3 +4 5 6 query I -select i1,i2,i3 from iceberg_t1_source order by v1; +select i1,i2,i3 from iceberg_t1_source order by i1; ---- -1,2,3 -4,5,6 +1 2 3 +4 5 6 query I select i3,i2 from iceberg_t1_source order by i2; ---- -3,2 -6,5 +3 2 +6 5 query I select i2,i1 from iceberg_t1_source order by i1; ---- -2,1 -5,4 +2 1 +5 4 query I select i1 from iceberg_t1_source order by i1; From a42665a84f33af7806f87be782a8adb5552bfea3 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 12 Sep 2024 18:50:12 +0800 Subject: [PATCH 22/24] refactor --- src/batch/src/executor/iceberg_scan.rs | 48 +++++++++++++------------ src/connector/src/source/iceberg/mod.rs | 1 + 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 308ca450e2113..2f67d8ce005aa 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -93,18 +93,24 @@ impl IcebergScanExecutor { .load_table_v2_with_metadata(self.table_meta) .await?; let data_types = self.schema.data_types(); - let chunk_schema_name = self.schema.names(); + let executor_schema_names = self.schema.names(); let mut eq_delete_file_scan_tasks_map: HashMap = HashMap::default(); let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks); - // Iterate all the delete files to get the value and seq_num - let mut delete_equality_ids: Option> = None; + // Build hash map for equality delete files + // Currently, all equality delete files have the same schema which is guaranteed by `IcebergSplitEnumerator`. + let mut eq_delete_ids: Option> = None; for eq_delete_file_scan_task in eq_delete_file_scan_tasks { let mut sequence_number = eq_delete_file_scan_task.sequence_number; - if delete_equality_ids.is_none() { - delete_equality_ids = Some(eq_delete_file_scan_task.project_field_ids.clone()); + if eq_delete_ids.is_none() { + eq_delete_ids = Some(eq_delete_file_scan_task.project_field_ids.clone()); + } else { + debug_assert_eq!( + eq_delete_ids.as_ref().unwrap(), + &eq_delete_file_scan_task.project_field_ids + ); } let reader = table @@ -136,7 +142,7 @@ impl IcebergScanExecutor { for data_file_scan_task in data_file_scan_tasks { let data_sequence_number = data_file_scan_task.sequence_number; - let column_names: Vec<_> = data_file_scan_task + let data_chunk_column_names: Vec<_> = data_file_scan_task .project_field_ids .iter() .filter_map(|id| { @@ -147,16 +153,16 @@ impl IcebergScanExecutor { }) .collect(); - // The order of the field_ids in the data file and delete file may be different, so need to correct them here - let delete_column_ids = delete_equality_ids.as_ref().map(|delete_column_ids| { - delete_column_ids + // eq_delete_column_idxes are used to fetch equality delete columns from data files. + let eq_delete_column_idxes = eq_delete_ids.as_ref().map(|eq_delete_ids| { + eq_delete_ids .iter() - .map(|delete_column_id| { + .map(|eq_delete_id| { data_file_scan_task .project_field_ids .iter() - .position(|project_field_id| delete_column_id == project_field_id) - .expect("delete_column_id not found in delete_equality_ids") + .position(|project_field_id| eq_delete_id == project_field_id) + .expect("eq_delete_id not found in delete_equality_ids") }) .collect_vec() }); @@ -175,17 +181,12 @@ impl IcebergScanExecutor { let record_batch = record_batch.map_err(BatchError::Iceberg)?; let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; - let chunk = match delete_column_ids.as_ref() { + let chunk = match eq_delete_column_idxes.as_ref() { Some(delete_column_ids) => { let visibility = Bitmap::from_iter( // Project with the schema of the delete file chunk.project(delete_column_ids).rows().map(|row_ref| { - let row = OwnedRow::new( - row_ref - .iter() - .map(|scalar_ref| scalar_ref.map(Into::into)) - .collect_vec(), - ); + let row = row_ref.to_owned_row(); if let Some(delete_sequence_number) = eq_delete_file_scan_tasks_map.get(&row) && delete_sequence_number > &data_sequence_number @@ -199,13 +200,14 @@ impl IcebergScanExecutor { }), ) .clone(); - let (data, _chunk_visibilities) = chunk.into_parts_v2(); // Keep the schema consistent(chunk and executor) - let data = data + // Filter out (equality delete) columns that are not in the executor schema + let data = chunk + .columns() .iter() - .zip_eq_fast(&column_names) + .zip_eq_fast(&data_chunk_column_names) .filter_map(|(array, columns)| { - if chunk_schema_name.contains(columns) { + if executor_schema_names.contains(columns) { Some(array.clone()) } else { None diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 9f685050d9f11..845ffb66804d3 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -348,6 +348,7 @@ impl IcebergSplitEnumerator { }) .collect::>>()?; let mut require_field_names: Vec<_> = rw_schema.names().to_vec(); + // Add the delete columns to the required field names for names in delete_columns { if !require_field_names.contains(&names) { require_field_names.push(names); From 83c30a61a62c53a8e644d5fc2feac1fcb2f81148 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 12 Sep 2024 19:06:38 +0800 Subject: [PATCH 23/24] fix ci fix cargo lock fix --- Cargo.lock | 122 +++++++++++++++++++++++++++++--------- Cargo.toml | 7 ++- src/connector/Cargo.toml | 2 +- src/expr/impl/Cargo.toml | 2 +- src/rpc_client/Cargo.toml | 2 +- src/storage/Cargo.toml | 2 +- 6 files changed, 101 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b7895e8b8007..da46d143a51ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1021,7 +1021,7 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" dependencies = [ - "async-lock 2.8.0", + "async-lock", "async-task", "concurrent-queue", "fastrand 1.9.0", @@ -1038,7 +1038,7 @@ dependencies = [ "async-channel 1.9.0", "async-executor", "async-io", - "async-lock 2.8.0", + "async-lock", "blocking", "futures-lite", "once_cell", @@ -1051,7 +1051,7 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" dependencies = [ - "async-lock 2.8.0", + "async-lock", "autocfg", "cfg-if", "concurrent-queue", @@ -1074,17 +1074,6 @@ dependencies = [ "event-listener 2.5.3", ] -[[package]] -name = "async-lock" -version = "3.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" -dependencies = [ - "event-listener 5.3.1", - "event-listener-strategy", - "pin-project-lite", -] - [[package]] name = "async-nats" version = "0.35.0" @@ -1139,7 +1128,7 @@ dependencies = [ "async-channel 1.9.0", "async-global-executor", "async-io", - "async-lock 2.8.0", + "async-lock", "crossbeam-utils", "futures-channel", "futures-core", @@ -2214,7 +2203,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" dependencies = [ "async-channel 1.9.0", - "async-lock 2.8.0", + "async-lock", "async-task", "atomic-waker", "fastrand 1.9.0", @@ -2354,6 +2343,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytecount" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" + [[package]] name = "bytemuck" version = "1.14.0" @@ -2421,6 +2416,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "981520c98f422fcc584dc1a95c334e6953900b9106bc47a9839b81790009eb21" +[[package]] +name = "camino" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3" +dependencies = [ + "serde", +] + [[package]] name = "cap-fs-ext" version = "3.0.0" @@ -2492,6 +2496,28 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1582e1c9e755dd6ad6b224dcffb135d199399a4568d454bd89fe515ca8425695" +[[package]] +name = "cargo-platform" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver 1.0.18", + "serde", + "serde_json", +] + [[package]] name = "cast" version = "0.3.0" @@ -4480,6 +4506,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "escape8259" version = "0.5.2" @@ -5969,7 +6004,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=bf97b9de51c5c86394e4df7aaff82cc3ed8bc928#bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=82142ea07044203becf72ea6d067d619d0cf46d9#82142ea07044203becf72ea6d067d619d0cf46d9" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6013,7 +6048,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=bf97b9de51c5c86394e4df7aaff82cc3ed8bc928#bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=82142ea07044203becf72ea6d067d619d0cf46d9#82142ea07044203becf72ea6d067d619d0cf46d9" dependencies = [ "anyhow", "async-trait", @@ -6030,7 +6065,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=bf97b9de51c5c86394e4df7aaff82cc3ed8bc928#bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=82142ea07044203becf72ea6d067d619d0cf46d9#82142ea07044203becf72ea6d067d619d0cf46d9" dependencies = [ "async-trait", "chrono", @@ -7247,21 +7282,21 @@ dependencies = [ [[package]] name = "moka" -version = "0.12.8" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +checksum = "8dc65d4615c08c8a13d91fd404b5a2a4485ba35b4091e3315cf8798d280c2f29" dependencies = [ - "async-lock 3.4.0", + "async-lock", "async-trait", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", - "event-listener 5.3.1", "futures-util", "once_cell", "parking_lot 0.12.1", "quanta", "rustc_version 0.4.0", + "skeptic", "smallvec", "tagptr", "thiserror", @@ -9273,6 +9308,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b" +dependencies = [ + "bitflags 2.6.0", + "memchr", + "unicase", +] + [[package]] name = "pulsar" version = "6.3.0" @@ -9382,12 +9428,12 @@ checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" [[package]] name = "quanta" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +version = "0.11.0" +source = "git+https://github.com/madsim-rs/quanta.git?rev=948bdc3#948bdc3d4cd3fcfe3d52d03dd83deee96d97d770" dependencies = [ "crossbeam-utils", "libc", + "mach2", "once_cell", "raw-cpuid", "wasi", @@ -9506,11 +9552,11 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "11.1.0" +version = "10.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" dependencies = [ - "bitflags 2.6.0", + "bitflags 1.3.2", ] [[package]] @@ -12624,6 +12670,9 @@ name = "semver" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" +dependencies = [ + "serde", +] [[package]] name = "semver-parser" @@ -13072,6 +13121,21 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fed904c7fb2856d868b92464fc8fa597fce366edea1a9cbfaa8cb5fe080bd6d" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.9" diff --git a/Cargo.toml b/Cargo.toml index e5299db81af40..a3aa9e23784ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -144,9 +144,9 @@ arrow-schema-iceberg = { package = "arrow-schema", version = "52" } arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } arrow-cast-iceberg = { package = "arrow-cast", version = "52" } # branch dev -iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" } -iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" } -iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" } +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "82142ea07044203becf72ea6d067d619d0cf46d9" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "82142ea07044203becf72ea6d067d619d0cf46d9" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "82142ea07044203becf72ea6d067d619d0cf46d9" } opendal = "0.47" arrow-array = "50" arrow-arith = "50" @@ -344,6 +344,7 @@ opt-level = 2 [patch.crates-io] # Patch third-party crates for deterministic simulation. +quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" } getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" } # Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies. # Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`. diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 46846318f040a..3801508a7aa19 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -76,7 +76,7 @@ jni = { version = "0.21.1", features = ["invocation"] } jsonbb = { workspace = true } jsonwebtoken = "9.2.0" maplit = "1.0.2" -moka = { version = "0.12.8", features = ["future"] } +moka = { version = "0.12.0", features = ["future"] } mongodb = { version = "2.8.2", features = ["tokio-runtime"] } mysql_async = { version = "0.34", default-features = false, features = [ "default", diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index ac18122044a38..c0e506889ef77 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -51,7 +51,7 @@ itertools = { workspace = true } jsonbb = { workspace = true } linkme = { version = "0.3", features = ["used_linker"] } md5 = "0.7" -moka = { version = "0.12.8", features = ["sync"] } +moka = { version = "0.12.0", features = ["sync"] } num-traits = "0.2" openssl = "0.10" regex = "1" diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml index 1ee2b8d94737f..6a25be3c21738 100644 --- a/src/rpc_client/Cargo.toml +++ b/src/rpc_client/Cargo.toml @@ -23,7 +23,7 @@ http = "1" hyper = "1" itertools = { workspace = true } lru = { workspace = true } -moka = { version = "0.12.8", features = ["future"] } +moka = { version = "0.12.0", features = ["future"] } paste = "1" rand = { workspace = true } risingwave_common = { workspace = true } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 36858519bb20d..b321c43b99e43 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -36,7 +36,7 @@ libc = "0.2" lz4 = "1.25.0" memcomparable = "0.2" metrics-prometheus = "0.7" -moka = { version = "0.12.8", features = ["future", "sync"] } +moka = { version = "0.12.0", features = ["future", "sync"] } more-asserts = "0.3" num-integer = "0.1" parking_lot = { workspace = true } From 88a8b62a633746ac960f55f827634aa09244ee16 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 13 Sep 2024 12:22:25 +0800 Subject: [PATCH 24/24] use dev --- Cargo.lock | 6 +++--- Cargo.toml | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da46d143a51ba..366fb3b36a672 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6004,7 +6004,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=82142ea07044203becf72ea6d067d619d0cf46d9#82142ea07044203becf72ea6d067d619d0cf46d9" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=84bf51c9d0d5886e4ee306ca4f383f029e1767a4#84bf51c9d0d5886e4ee306ca4f383f029e1767a4" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6048,7 +6048,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=82142ea07044203becf72ea6d067d619d0cf46d9#82142ea07044203becf72ea6d067d619d0cf46d9" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=84bf51c9d0d5886e4ee306ca4f383f029e1767a4#84bf51c9d0d5886e4ee306ca4f383f029e1767a4" dependencies = [ "anyhow", "async-trait", @@ -6065,7 +6065,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=82142ea07044203becf72ea6d067d619d0cf46d9#82142ea07044203becf72ea6d067d619d0cf46d9" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=84bf51c9d0d5886e4ee306ca4f383f029e1767a4#84bf51c9d0d5886e4ee306ca4f383f029e1767a4" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index a3aa9e23784ee..63feb981d9a1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -144,9 +144,9 @@ arrow-schema-iceberg = { package = "arrow-schema", version = "52" } arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } arrow-cast-iceberg = { package = "arrow-cast", version = "52" } # branch dev -iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "82142ea07044203becf72ea6d067d619d0cf46d9" } -iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "82142ea07044203becf72ea6d067d619d0cf46d9" } -iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "82142ea07044203becf72ea6d067d619d0cf46d9" } +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "84bf51c9d0d5886e4ee306ca4f383f029e1767a4" } opendal = "0.47" arrow-array = "50" arrow-arith = "50"