From 79d98109deee8518740be6a00c3e4f41c29d5f1b Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 27 Aug 2024 13:58:57 +0800 Subject: [PATCH 1/3] feat(storage): Support for repairing the size of a split sst based on table stats (#18053) --- proto/hummock.proto | 4 ++ .../picker/trivial_move_compaction_picker.rs | 3 +- .../picker/vnode_watermark_picker.rs | 2 +- src/meta/src/hummock/manager/commit_epoch.rs | 55 ++++++++++++++-- .../manager/compaction_group_manager.rs | 14 +--- src/meta/src/hummock/manager/tests.rs | 64 +++++-------------- src/meta/src/manager/diagnose.rs | 2 +- .../compaction_group/hummock_version_ext.rs | 37 ++++++++++- src/storage/hummock_sdk/src/table_stats.rs | 8 +++ .../src/hummock/compactor/compaction_utils.rs | 11 ++-- .../src/hummock/compactor/compactor_runner.rs | 2 - .../compactor/fast_compactor_runner.rs | 5 +- .../compactor/shared_buffer_compact.rs | 2 +- src/storage/src/hummock/sstable/builder.rs | 16 +++++ src/storage/src/hummock/sstable_store.rs | 2 +- 15 files changed, 141 insertions(+), 86 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 60dbe176958f..9015cb44d42a 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -922,6 +922,10 @@ message TableStats { int64 total_key_size = 1; int64 total_value_size = 2; int64 total_key_count = 3; + + // `total_compressed_size`` represents the size that the table takes up in the output sst + // and this field is only filled and used by CN flushes, not compactor compaction + uint64 total_compressed_size = 4; } message HummockVersionStats { diff --git a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs index 6e172f15d380..458919cd8b71 100644 --- a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs @@ -55,7 +55,7 @@ impl TrivialMovePicker { ) -> Option { let mut skip_by_pending = false; for sst in select_tables { - if sst.file_size < self.sst_allowed_trivial_move_min_size { + if sst.sst_size < self.sst_allowed_trivial_move_min_size { continue; } @@ -128,6 +128,7 @@ pub mod tests { let sst = SstableInfo { sst_id: 1, file_size: 100, + sst_size: 100, ..Default::default() }; diff --git a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs index 50a33c7d42e4..5171c48ad9c3 100644 --- a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs +++ b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs @@ -50,7 +50,7 @@ impl VnodeWatermarkCompactionPicker { return None; } Some(CompactionInput { - select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(), total_file_count: select_input_ssts.len() as u64, input_levels: vec![ InputLevel { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 0c07ca6f09fb..0853e28fcb57 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
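In short, the size repair introduced by this patch re-derives a split SST's `sst_size` from the per-table `total_compressed_size` stats collected at flush time: the branch SST takes the summed compressed size of the tables that move to the new compaction group, and the original SST keeps the remainder. A minimal, self-contained sketch of that bookkeeping follows; `TableStatsLite` and `apportion_split_size` are illustrative names, not part of the patch, and the real `correct_commit_ssts` below performs the same arithmetic on the actual `TableStats` map before calling `split_sst` with the two sizes.

use std::collections::HashMap;

/// Illustrative, trimmed-down stand-in for `TableStats`.
struct TableStatsLite {
    total_compressed_size: u64,
}

/// Apportions `sst_size` between the original SST and a branch that takes
/// `branch_table_ids`; returns (remaining_size, branch_size).
fn apportion_split_size(
    sst_size: u64,
    table_stats: &HashMap<u32, TableStatsLite>,
    branch_table_ids: &[u32],
) -> (u64, u64) {
    let branch_size: u64 = branch_table_ids
        .iter()
        .filter_map(|id| table_stats.get(id))
        .map(|stats| stats.total_compressed_size)
        .sum();
    // Saturate so a missing or stale stat cannot underflow the original size.
    (sst_size.saturating_sub(branch_size), branch_size)
}

fn main() {
    let stats = HashMap::from([
        (100, TableStatsLite { total_compressed_size: 600 }),
        (101, TableStatsLite { total_compressed_size: 400 }),
    ]);
    // Table 101 moves to the new compaction group: 400 bytes branch off, 600 remain.
    assert_eq!(apportion_split_size(1000, &stats, &[101]), (600, 400));
}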
-use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -139,7 +139,7 @@ impl HummockManager { for s in &mut sstables { add_prost_table_stats_map( &mut table_stats_change, - &to_prost_table_stats_map(std::mem::take(&mut s.table_stats)), + &to_prost_table_stats_map(s.table_stats.clone()), ); } @@ -158,7 +158,6 @@ impl HummockManager { ); let state_table_info = &version.latest_version().state_table_info; - let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); // Add new table @@ -221,8 +220,23 @@ impl HummockManager { NewTableFragmentInfo::None => (HashMap::new(), None, None), }; + let mut group_members_table_ids: HashMap> = HashMap::new(); + { + // expand group_members_table_ids + for (table_id, group_id) in &table_compaction_group_mapping { + group_members_table_ids + .entry(*group_id) + .or_default() + .insert(*table_id); + } + } + let commit_sstables = self - .correct_commit_ssts(sstables, &table_compaction_group_mapping) + .correct_commit_ssts( + sstables, + &table_compaction_group_mapping, + &group_members_table_ids, + ) .await?; let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); @@ -379,6 +393,7 @@ impl HummockManager { &self, sstables: Vec, table_compaction_group_mapping: &HashMap, + group_members_table_ids: &HashMap>, ) -> Result>> { let mut new_sst_id_number = 0; let mut sst_to_cg_vec = Vec::with_capacity(sstables.len()); @@ -413,8 +428,36 @@ impl HummockManager { let mut commit_sstables: BTreeMap> = BTreeMap::new(); for (mut sst, group_table_ids) in sst_to_cg_vec { - for (group_id, _match_ids) in group_table_ids { - let branch_sst = split_sst(&mut sst.sst_info, &mut new_sst_id); + for (group_id, match_ids) in group_table_ids { + let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap(); + if match_ids + .iter() + .all(|id| group_members_table_ids.contains(&TableId::new(*id))) + { + commit_sstables + .entry(group_id) + .or_default() + .push(sst.sst_info.clone()); + continue; + } + + let origin_sst_size = sst.sst_info.sst_size; + let new_sst_size = match_ids + .iter() + .map(|id| { + let stat = sst.table_stats.get(id).unwrap(); + stat.total_compressed_size + }) + .sum(); + + let branch_sst = split_sst( + &mut sst.sst_info, + &mut new_sst_id, + origin_sst_size - new_sst_size, + new_sst_size, + match_ids, + ); + commit_sstables .entry(group_id) .or_default() diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 2dac4a843ead..c68fc4222f28 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -30,8 +30,8 @@ use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, PbGroupConstruct, - PbGroupDestroy, PbStateTableInfoDelta, + CompactionConfig, CompactionGroupInfo, CompatibilityVersion, PbGroupConstruct, PbGroupDestroy, + PbStateTableInfoDelta, }; use tokio::sync::OnceCell; @@ -605,16 +605,6 @@ impl HummockManager { drop(compaction_guard); self.report_compact_tasks(canceled_tasks).await?; - // Don't trigger compactions if we 
enable deterministic compaction - if !self.env.opts.compaction_deterministic_test { - // commit_epoch may contains SSTs from any compaction group - self.try_send_compaction_request(parent_group_id, compact_task::TaskType::SpaceReclaim); - self.try_send_compaction_request( - target_compaction_group_id, - compact_task::TaskType::SpaceReclaim, - ); - } - self.metrics .move_state_table_count .with_label_values(&[&parent_group_id.to_string()]) diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 73fb98c03e0d..4b9ddcb18cbc 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -41,10 +41,7 @@ use risingwave_pb::hummock::{HummockPinnedSnapshot, HummockPinnedVersion, Hummoc use risingwave_pb::meta::add_worker_node_request::Property; use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; -use crate::hummock::compaction::selector::{ - default_compaction_selector, CompactionSelector, ManualCompactionOption, - SpaceReclaimCompactionSelector, -}; +use crate::hummock::compaction::selector::{default_compaction_selector, ManualCompactionOption}; use crate::hummock::error::Error; use crate::hummock::test_utils::*; use crate::hummock::{HummockManager, HummockManagerRef}; @@ -1200,6 +1197,8 @@ async fn test_version_stats() { total_key_size: 1000, total_value_size: 100, total_key_count: 10, + + total_compressed_size: 1024 * 1024, }; let ssts_with_table_ids = vec![vec![1, 2], vec![2, 3]]; let sst_ids = get_sst_ids(&hummock_manager, ssts_with_table_ids.len() as _).await; @@ -1271,6 +1270,7 @@ async fn test_version_stats() { total_key_size: -1000, total_value_size: -100, total_key_count: -10, + total_compressed_size: 0, // unused }, ), ( @@ -1279,6 +1279,7 @@ async fn test_version_stats() { total_key_size: -1000, total_value_size: -100, total_key_count: -10, + total_compressed_size: 0, // unused }, ), ]); @@ -1384,11 +1385,6 @@ async fn test_split_compaction_group_on_demand_basic() { .unwrap_err(); assert_eq!("compaction group error: invalid group 100", err.to_string()); - hummock_manager - .split_compaction_group(2, &[]) - .await - .unwrap(); - let err = hummock_manager .split_compaction_group(2, &[100]) .await @@ -1477,7 +1473,7 @@ async fn test_split_compaction_group_on_demand_basic() { assert!(new_group_id > StaticCompactionGroupId::End as u64); assert_eq!( get_compaction_group_object_ids(¤t_version, 2), - vec![10, 11] + Vec::::new() ); assert_eq!( get_compaction_group_object_ids(¤t_version, new_group_id), @@ -1667,15 +1663,6 @@ async fn test_split_compaction_group_trivial_expired() { .split_compaction_group(2, &[100]) .await .unwrap(); - let mut selector: Box = - Box::::default(); - let (mut normal_tasks, _unscheduled) = hummock_manager - .get_compact_tasks_impl(vec![2], 1, &mut selector) - .await - .unwrap(); - use crate::hummock::manager::CompactStatus; - let reclaim_task = normal_tasks.pop().unwrap(); - assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); let current_version = hummock_manager.get_current_version().await; let new_group_id = current_version.levels.keys().max().cloned().unwrap(); @@ -1854,7 +1841,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { current_version.get_compaction_group_levels(2).levels[base_level - 1] .table_infos .len(), - 2 + 1 ); assert_eq!( @@ -1865,7 +1852,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { assert_eq!( current_version.get_compaction_group_levels(2).levels[base_level - 1].table_infos[0] .table_ids, - 
vec![100, 101] + vec![101] ); assert_eq!( current_version @@ -1881,7 +1868,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { .levels[base_level - 1] .table_infos[0] .table_ids, - vec![100, 101] + vec![100] ); assert_eq!( current_version @@ -2047,38 +2034,15 @@ async fn test_move_tables_between_compaction_group() { current_version.get_compaction_group_levels(2).levels[base_level - 1] .table_infos .len(), - 3 + 2 ); let level = ¤t_version .get_compaction_group_levels(new_group_id) .levels[base_level - 1]; assert_eq!(level.table_infos[0].table_ids, vec![100]); - assert_eq!(level.table_infos[1].table_ids, vec![100, 101]); + assert_eq!(level.table_infos[1].table_ids, vec![100]); assert_eq!(level.table_infos.len(), 2); - - let mut selector: Box = - Box::::default(); - - let compaction_task = hummock_manager - .get_compact_task(2, &mut selector) - .await - .unwrap() - .unwrap(); - assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); - assert_eq!(compaction_task.input_ssts[0].table_infos[0].object_id, 12); - assert_eq!(compaction_task.existing_table_ids, vec![101]); - - let ret = hummock_manager - .report_compact_task( - compaction_task.task_id, - TaskStatus::Success, - vec![gen_sstable_info(20, 2, vec![101])], - None, - ) - .await - .unwrap(); - assert!(ret); } #[tokio::test] @@ -2137,11 +2101,13 @@ async fn test_gc_stats() { hummock_manager.create_version_checkpoint(0).await.unwrap(), 0 ); + assert_eq_gc_stats(0, 0, 6, 3, 2, 4); hummock_manager .unpin_version_before(context_id, HummockVersionId::MAX) .await .unwrap(); + assert_eq_gc_stats(0, 0, 6, 3, 2, 4); assert_eq!( hummock_manager.create_version_checkpoint(0).await.unwrap(), @@ -2353,7 +2319,7 @@ async fn test_unregister_moved_table() { assert_eq!(current_version.levels.len(), 3); assert_eq!( get_compaction_group_object_ids(¤t_version, 2), - vec![10, 11] + vec![11] ); assert_eq!( get_compaction_group_object_ids(¤t_version, new_group_id), @@ -2387,7 +2353,7 @@ async fn test_unregister_moved_table() { assert!(!current_version.levels.contains_key(&new_group_id)); assert_eq!( get_compaction_group_object_ids(¤t_version, 2), - vec![10, 11] + vec![11] ); assert_eq!( current_version diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs index e50242a9d44e..afacce8dfdec 100644 --- a/src/meta/src/manager/diagnose.rs +++ b/src/meta/src/manager/diagnose.rs @@ -475,7 +475,7 @@ impl DiagnoseCommand { let mut visit_level = |level: &Level| { sst_num += level.table_infos.len(); sst_total_file_size += - level.table_infos.iter().map(|t| t.file_size).sum::(); + level.table_infos.iter().map(|t| t.sst_size).sum::(); for sst in &level.table_infos { if sst.total_key_count == 0 { continue; diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index bfd6402e21a8..ca6585f46fd5 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1013,7 +1013,13 @@ fn split_sst_info_for_level( .cloned() .collect_vec(); if !removed_table_ids.is_empty() { - let branch_sst = split_sst(sst_info, new_sst_id); + let branch_sst = split_sst( + sst_info, + new_sst_id, + sst_info.sst_size / 2, + sst_info.sst_size / 2, + member_table_ids.iter().cloned().collect_vec(), + ); insert_table_infos.push(branch_sst); } } @@ -1220,7 +1226,7 @@ pub fn object_size_map(version: &HummockVersion) -> HashMap Vec { res } -pub fn split_sst(sst_info: &mut 
SstableInfo, new_sst_id: &mut u64) -> SstableInfo { +pub fn split_sst( + sst_info: &mut SstableInfo, + new_sst_id: &mut u64, + old_sst_size: u64, + new_sst_size: u64, + new_sst_table_ids: Vec, +) -> SstableInfo { let mut branch_table_info = sst_info.clone(); branch_table_info.sst_id = *new_sst_id; + branch_table_info.sst_size = new_sst_size; + sst_info.sst_id = *new_sst_id + 1; + sst_info.sst_size = old_sst_size; + + { + // related github.com/risingwavelabs/risingwave/pull/17898/ + // This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898 + + let set1: HashSet<_> = sst_info.table_ids.iter().cloned().collect(); + let set2: HashSet<_> = new_sst_table_ids.iter().cloned().collect(); + let intersection: Vec<_> = set1.intersection(&set2).cloned().collect(); + + // Update table_ids + branch_table_info.table_ids = intersection; + sst_info + .table_ids + .retain(|table_id| !branch_table_info.table_ids.contains(table_id)); + } + *new_sst_id += 1; branch_table_info diff --git a/src/storage/hummock_sdk/src/table_stats.rs b/src/storage/hummock_sdk/src/table_stats.rs index 4fce0e0f048a..0ab7a252b880 100644 --- a/src/storage/hummock_sdk/src/table_stats.rs +++ b/src/storage/hummock_sdk/src/table_stats.rs @@ -29,6 +29,10 @@ pub struct TableStats { pub total_key_size: i64, pub total_value_size: i64, pub total_key_count: i64, + + // `total_compressed_size`` represents the size that the table takes up in the output sst + // and this field is only filled and used by CN flushes, not compactor compaction + pub total_compressed_size: u64, } impl From<&TableStats> for PbTableStats { @@ -37,6 +41,7 @@ impl From<&TableStats> for PbTableStats { total_key_size: value.total_key_size, total_value_size: value.total_value_size, total_key_count: value.total_key_count, + total_compressed_size: value.total_compressed_size, } } } @@ -53,6 +58,7 @@ impl From<&PbTableStats> for TableStats { total_key_size: value.total_key_size, total_value_size: value.total_value_size, total_key_count: value.total_key_count, + total_compressed_size: value.total_compressed_size, } } } @@ -62,6 +68,7 @@ impl TableStats { self.total_key_size += other.total_key_size; self.total_value_size += other.total_value_size; self.total_key_count += other.total_key_count; + self.total_compressed_size += other.total_compressed_size; } } @@ -69,6 +76,7 @@ pub fn add_prost_table_stats(this: &mut PbTableStats, other: &PbTableStats) { this.total_key_size += other.total_key_size; this.total_value_size += other.total_value_size; this.total_key_count += other.total_key_count; + this.total_compressed_size += other.total_compressed_size; } pub fn add_prost_table_stats_map(this: &mut PbTableStatsMap, other: &PbTableStatsMap) { diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 6eb49d627b34..093612623cce 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -128,7 +128,6 @@ pub struct TaskConfig { /// doesn't belong to this divided SST. See `Compactor::compact_and_build_sst`. 
pub stats_target_table_ids: Option>, pub task_type: PbTaskType, - pub is_target_l0_or_lbase: bool, pub use_block_based_filter: bool, pub table_vnode_partition: BTreeMap, @@ -508,7 +507,7 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorCon .collect_vec(); let compaction_size = sstable_infos .iter() - .map(|table_info| table_info.file_size) + .map(|table_info| table_info.sst_size) .sum::(); let all_ssts_are_blocked_filter = sstable_infos @@ -575,7 +574,7 @@ pub async fn generate_splits_for_task( .collect_vec(); let compaction_size = sstable_infos .iter() - .map(|table_info| table_info.file_size) + .map(|table_info| table_info.sst_size) .sum::(); if !optimize_by_copy_block { @@ -612,7 +611,7 @@ pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorCo .collect_vec(); let select_size = select_table_infos .iter() - .map(|table| table.file_size) + .map(|table| table.sst_size) .sum::(); context .compactor_metrics @@ -625,7 +624,7 @@ pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorCo .with_label_values(&[&group_label, &cur_level_label]) .inc_by(select_table_infos.len() as u64); - let target_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::(); + let target_level_read_bytes = target_table_infos.iter().map(|t| t.sst_size).sum::(); let next_level_label = compact_task.target_level.to_string(); context .compactor_metrics @@ -660,7 +659,7 @@ pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &Compacto .collect_vec(); let compaction_size = sstable_infos .iter() - .map(|table_info| table_info.file_size) + .map(|table_info| table_info.sst_size) .sum::(); let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; calculate_task_parallelism_impl( diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 773b2d565550..f812ff5cd6ff 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -111,8 +111,6 @@ impl CompactorRunner { watermark: task.watermark, stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())), task_type: task.task_type, - is_target_l0_or_lbase: task.target_level == 0 - || task.target_level == task.base_level, use_block_based_filter, table_vnode_partition: task.table_vnode_partition.clone(), table_schemas: task diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 227a50cb5813..9851afc47f14 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -294,7 +294,6 @@ impl CompactorRunner { watermark: task.watermark, stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())), task_type: task.task_type, - is_target_l0_or_lbase: task.target_level == 0 || task.target_level == task.base_level, table_vnode_partition: task.table_vnode_partition.clone(), use_block_based_filter: true, table_schemas: Default::default(), @@ -493,10 +492,10 @@ impl CompactorRunner { } let mut total_read_bytes = 0; for sst in &self.left.sstables { - total_read_bytes += sst.file_size; + total_read_bytes += sst.sst_size; } for sst in &self.right.sstables { - total_read_bytes += sst.file_size; + total_read_bytes += sst.sst_size; } self.metrics .compact_fast_runner_bytes diff --git 
a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 00680f5906cc..fab4497433dd 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -259,6 +259,7 @@ async fn compact_shared_buffer( .compactor_metrics .write_build_l0_bytes .inc_by(sst_info.file_size()); + sst_infos.push(sst_info.sst_info.clone()); } level0.extend(ssts); @@ -557,7 +558,6 @@ impl SharedBufferCompactRunner { watermark: GC_WATERMARK_FOR_FLUSH, stats_target_table_ids: None, task_type: compact_task::TaskType::SharedBuffer, - is_target_l0_or_lbase: true, table_vnode_partition, use_block_based_filter, table_schemas: Default::default(), diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 5a3dc3ef598f..32960c7b8f97 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -244,6 +244,7 @@ impl SstableBuilder { self.filter_builder.add_raw_data(filter_data); let block_meta = self.block_metas.last_mut().unwrap(); self.writer.write_block_bytes(buf, block_meta).await?; + Ok(true) } @@ -539,6 +540,21 @@ impl SstableBuilder { let bloom_filter_size = meta.bloom_filter.len(); let sstable_file_size = sst_info.file_size as usize; + if !meta.block_metas.is_empty() { + // fill total_compressed_size + let mut last_table_id = meta.block_metas[0].table_id().table_id(); + let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap(); + for block_meta in &meta.block_metas { + let block_table_id = block_meta.table_id(); + if last_table_id != block_table_id.table_id() { + last_table_id = block_table_id.table_id(); + last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap(); + } + + last_table_stats.total_compressed_size += block_meta.len as u64; + } + } + let writer_output = self.writer.finish(meta).await?; Ok(SstableBuilderOutput:: { sst_info: LocalSstableInfo::new(sst_info, self.table_stats), diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index cea2c42529ce..ee26c922c648 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -578,7 +578,7 @@ impl SstableStore { let store = self.store.clone(); let meta_path = self.get_sst_data_path(object_id); let stats_ptr = stats.remote_io_time.clone(); - let range = sst.meta_offset as usize..sst.file_size as usize; + let range = sst.meta_offset as usize..; async move { let now = Instant::now(); let buf = store.read(&meta_path, range).await?; From edaace2f93877f1bc25c523da4d434b869aaf73a Mon Sep 17 00:00:00 2001 From: Dylan Date: Tue, 27 Aug 2024 14:06:37 +0800 Subject: [PATCH 2/3] fix(batch): fix CTAS pins snapshot too long (#18248) --- Cargo.lock | 1 + src/frontend/src/handler/query.rs | 44 ++--------------------------- src/utils/pgwire/Cargo.toml | 1 + src/utils/pgwire/src/pg_extended.rs | 40 +++++++++++++++++++++++++- src/utils/pgwire/src/pg_protocol.rs | 30 +++++++++++++++++++- src/utils/pgwire/src/pg_response.rs | 19 ++++++++++++- 6 files changed, 90 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e8ba1952a80..c8bb3bb7afa8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8534,6 +8534,7 @@ dependencies = [ "openssl", "panic-message", "parking_lot 0.12.1", + "postgres-types", "reqwest 0.12.4", "risingwave_common", "risingwave_sqlparser", diff --git a/src/frontend/src/handler/query.rs 
b/src/frontend/src/handler/query.rs index de60743e4717..480bb1c7f656 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -16,12 +16,10 @@ use std::collections::HashSet; use std::sync::Arc; use std::time::Instant; -use futures::StreamExt; use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Format; -use postgres_types::FromSql; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Schema; @@ -424,47 +422,9 @@ async fn execute( let stmt_type = plan_fragmenter_result.stmt_type; let query_start_time = Instant::now(); - let (mut row_stream, pg_descs) = + let (row_stream, pg_descs) = create_stream(session.clone(), plan_fragmenter_result, formats).await?; - let row_cnt: Option = match stmt_type { - StatementType::SELECT - | StatementType::INSERT_RETURNING - | StatementType::DELETE_RETURNING - | StatementType::UPDATE_RETURNING => None, - - StatementType::INSERT | StatementType::DELETE | StatementType::UPDATE => { - let first_row_set = row_stream.next().await; - let first_row_set = match first_row_set { - None => { - return Err(RwError::from(ErrorCode::InternalError( - "no affected rows in output".to_string(), - ))) - } - Some(row) => row?, - }; - let affected_rows_str = first_row_set[0].values()[0] - .as_ref() - .expect("compute node should return affected rows in output"); - if let Format::Binary = first_field_format { - Some( - i64::from_sql(&postgres_types::Type::INT8, affected_rows_str) - .unwrap() - .try_into() - .expect("affected rows count large than i64"), - ) - } else { - Some( - String::from_utf8(affected_rows_str.to_vec()) - .unwrap() - .parse() - .unwrap_or_default(), - ) - } - } - _ => unreachable!(), - }; - // We need to do some post work after the query is finished and before the `Complete` response // it sent. This is achieved by the `callback` in `PgResponse`. 
let callback = async move { @@ -510,7 +470,7 @@ async fn execute( }; Ok(PgResponse::builder(stmt_type) - .row_cnt_opt(row_cnt) + .row_cnt_format_opt(Some(first_field_format)) .values(row_stream, pg_descs) .callback(callback) .into()) diff --git a/src/utils/pgwire/Cargo.toml b/src/utils/pgwire/Cargo.toml index d5bd081b92d6..8f0e55ff8446 100644 --- a/src/utils/pgwire/Cargo.toml +++ b/src/utils/pgwire/Cargo.toml @@ -25,6 +25,7 @@ jsonwebtoken = "9" openssl = "0.10.66" panic-message = "0.3" parking_lot = { workspace = true } +postgres-types = { version = "0.2.6" } reqwest = "0.12.2" risingwave_common = { workspace = true } risingwave_sqlparser = { workspace = true } diff --git a/src/utils/pgwire/src/pg_extended.rs b/src/utils/pgwire/src/pg_extended.rs index e4a79f0ae0d6..f26a9df5ad2b 100644 --- a/src/utils/pgwire/src/pg_extended.rs +++ b/src/utils/pgwire/src/pg_extended.rs @@ -16,13 +16,14 @@ use std::vec::IntoIter; use futures::stream::FusedStream; use futures::{StreamExt, TryStreamExt}; +use postgres_types::FromSql; use tokio::io::{AsyncRead, AsyncWrite}; use crate::error::{PsqlError, PsqlResult}; use crate::pg_message::{BeCommandCompleteMessage, BeMessage}; use crate::pg_protocol::Conn; use crate::pg_response::{PgResponse, ValuesStream}; -use crate::types::Row; +use crate::types::{Format, Row}; pub struct ResultCache where @@ -118,6 +119,43 @@ where } else { msg_stream.write_no_flush(&BeMessage::PortalSuspended)?; } + } else if self.result.stmt_type().is_dml() && !self.result.stmt_type().is_returning() { + let first_row_set = self.result.values_stream().next().await; + let first_row_set = match first_row_set { + None => { + return Err(PsqlError::Uncategorized( + anyhow::anyhow!("no affected rows in output").into(), + )); + } + Some(row) => row.map_err(PsqlError::SimpleQueryError)?, + }; + let affected_rows_str = first_row_set[0].values()[0] + .as_ref() + .expect("compute node should return affected rows in output"); + + let affected_rows_cnt: i32 = match self.result.row_cnt_format() { + Some(Format::Binary) => { + i64::from_sql(&postgres_types::Type::INT8, affected_rows_str) + .unwrap() + .try_into() + .expect("affected rows count large than i64") + } + Some(Format::Text) => String::from_utf8(affected_rows_str.to_vec()) + .unwrap() + .parse() + .unwrap_or_default(), + None => panic!("affected rows count should be set"), + }; + + // Run the callback before sending the `CommandComplete` message. + self.result.run_callback().await?; + + msg_stream.write_no_flush(&BeMessage::CommandComplete(BeCommandCompleteMessage { + stmt_type: self.result.stmt_type(), + rows_cnt: affected_rows_cnt, + }))?; + + query_end = true; } else { // Run the callback before sending the `CommandComplete` message. 
self.result.run_callback().await?; diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index 72b99f6d50d6..630feb1ebd28 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -658,6 +658,34 @@ where stmt_type: res.stmt_type(), rows_cnt, }))?; + } else if res.stmt_type().is_dml() && !res.stmt_type().is_returning() { + let first_row_set = res.values_stream().next().await; + let first_row_set = match first_row_set { + None => { + return Err(PsqlError::Uncategorized( + anyhow::anyhow!("no affected rows in output").into(), + )); + } + Some(row) => row.map_err(PsqlError::SimpleQueryError)?, + }; + let affected_rows_str = first_row_set[0].values()[0] + .as_ref() + .expect("compute node should return affected rows in output"); + + assert!(matches!(res.row_cnt_format(), Some(Format::Text))); + let affected_rows_cnt = String::from_utf8(affected_rows_str.to_vec()) + .unwrap() + .parse() + .unwrap_or_default(); + + // Run the callback before sending the `CommandComplete` message. + res.run_callback().await?; + + self.stream + .write_no_flush(&BeMessage::CommandComplete(BeCommandCompleteMessage { + stmt_type: res.stmt_type(), + rows_cnt: affected_rows_cnt, + }))?; } else { // Run the callback before sending the `CommandComplete` message. res.run_callback().await?; @@ -665,7 +693,7 @@ where self.stream .write_no_flush(&BeMessage::CommandComplete(BeCommandCompleteMessage { stmt_type: res.stmt_type(), - rows_cnt: res.affected_rows_cnt().expect("row count should be set"), + rows_cnt: 0, }))?; } diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index 4f55c524942b..5718533f6709 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -22,7 +22,7 @@ use crate::error::PsqlError; use crate::pg_field_descriptor::PgFieldDescriptor; use crate::pg_protocol::ParameterStatus; use crate::pg_server::BoxedError; -use crate::types::Row; +use crate::types::{Format, Row}; pub type RowSet = Vec; pub type RowSetResult = Result; @@ -129,6 +129,8 @@ pub struct PgResponse { // row count of affected row. Used for INSERT, UPDATE, DELETE, COPY, and other statements that // don't return rows. row_cnt: Option, + // Used for INSERT, UPDATE, DELETE to specify the format of the affected row count. + row_cnt_format: Option, notices: Vec, values_stream: Option, callback: Option, @@ -141,6 +143,8 @@ pub struct PgResponseBuilder { // row count of affected row. Used for INSERT, UPDATE, DELETE, COPY, and other statements that // don't return rows. row_cnt: Option, + // Used for INSERT, UPDATE, DELETE to specify the format of the affected row count. 
+ row_cnt_format: Option, notices: Vec, values_stream: Option, callback: Option, @@ -153,6 +157,7 @@ impl From> for PgResponse { Self { stmt_type: builder.stmt_type, row_cnt: builder.row_cnt, + row_cnt_format: builder.row_cnt_format, notices: builder.notices, values_stream: builder.values_stream, callback: builder.callback, @@ -168,6 +173,7 @@ impl PgResponseBuilder { Self { stmt_type, row_cnt, + row_cnt_format: None, notices: vec![], values_stream: None, callback: None, @@ -187,6 +193,13 @@ impl PgResponseBuilder { Self { row_cnt, ..self } } + pub fn row_cnt_format_opt(self, row_cnt_format: Option) -> Self { + Self { + row_cnt_format, + ..self + } + } + pub fn values(self, values_stream: VS, row_desc: Vec) -> Self { Self { values_stream: Some(values_stream), @@ -394,6 +407,10 @@ where self.row_cnt } + pub fn row_cnt_format(&self) -> Option { + self.row_cnt_format + } + pub fn is_query(&self) -> bool { self.stmt_type.is_query() } From 22926d67fff102cd7114b150b8437e3afd4a4231 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Tue, 27 Aug 2024 14:51:44 +0800 Subject: [PATCH 3/3] refactor(iceberg): Separate iceberg source pb from source pb (#18209) --- proto/batch_plan.proto | 8 ++ risedev.yml | 4 +- src/batch/src/executor/iceberg_scan.rs | 74 ++++++++++++++++++- src/batch/src/executor/mod.rs | 1 + src/batch/src/executor/source.rs | 65 ++++++---------- .../optimizer/plan_node/batch_iceberg_scan.rs | 6 +- .../src/scheduler/distributed/stage.rs | 24 +++++- src/frontend/src/scheduler/local.rs | 27 ++++++- src/prost/build.rs | 1 + 9 files changed, 155 insertions(+), 55 deletions(-) diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 9b12d0b583d1..8c74b93d96fa 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -66,6 +66,13 @@ message SourceNode { map secret_refs = 6; } +message IcebergScanNode { + repeated plan_common.ColumnCatalog columns = 1; + map with_properties = 2; + repeated bytes split = 3; + map secret_refs = 4; +} + message FileScanNode { enum FileFormat { FILE_FORMAT_UNSPECIFIED = 0; @@ -365,6 +372,7 @@ message PlanNode { MaxOneRowNode max_one_row = 36; LogRowSeqScanNode log_row_seq_scan = 37; FileScanNode file_scan = 38; + IcebergScanNode iceberg_scan = 39; // The following nodes are used for testing. bool block_executor = 100; bool busy_loop_executor = 101; diff --git a/risedev.yml b/risedev.yml index 3c7f8e0e09be..5a5c25ceb55d 100644 --- a/risedev.yml +++ b/risedev.yml @@ -20,7 +20,7 @@ profile: # config-path: src/config/example.toml steps: # If you want to use the local s3 storage, enable the following line - # - use: minio + - use: minio # If you want to use aws-s3, configure AK and SK in env var and enable the following lines: # - use: aws-s3 @@ -40,7 +40,7 @@ profile: - use: frontend # If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well. 
- # - use: compactor + - use: compactor # If you want to create source from Kafka, uncomment the following lines # - use: kafka diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index ee4e463422c1..fca7745284fe 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -18,12 +18,20 @@ use futures_async_stream::try_stream; use futures_util::stream::StreamExt; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; +use itertools::Itertools; use risingwave_common::array::arrow::IcebergArrowConvert; -use risingwave_common::catalog::Schema; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::DataType; use risingwave_connector::sink::iceberg::IcebergConfig; +use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit}; +use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData}; +use risingwave_connector::WithOptionsSecResolved; +use risingwave_pb::batch_plan::plan_node::NodeBody; +use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder}; use crate::error::BatchError; use crate::executor::{DataChunk, Executor}; +use crate::task::BatchTaskContext; pub struct IcebergScanExecutor { iceberg_config: IcebergConfig, @@ -108,3 +116,67 @@ impl IcebergScanExecutor { } } } + +pub struct IcebergScanExecutorBuilder {} + +#[async_trait::async_trait] +impl BoxedExecutorBuilder for IcebergScanExecutorBuilder { + async fn new_boxed_executor( + source: &ExecutorBuilder<'_, C>, + inputs: Vec, + ) -> crate::error::Result { + ensure!( + inputs.is_empty(), + "Iceberg source should not have input executor!" + ); + let source_node = try_match_expand!( + source.plan_node().get_node_body().unwrap(), + NodeBody::IcebergScan + )?; + + // prepare connector source + let options_with_secret = WithOptionsSecResolved::new( + source_node.with_properties.clone(), + source_node.secret_refs.clone(), + ); + let config = ConnectorProperties::extract(options_with_secret.clone(), false) + .map_err(BatchError::connector)?; + + let split_list = source_node + .split + .iter() + .map(|split| SplitImpl::restore_from_bytes(split).unwrap()) + .collect_vec(); + assert_eq!(split_list.len(), 1); + + let fields = source_node + .columns + .iter() + .map(|prost| { + let column_desc = prost.column_desc.as_ref().unwrap(); + let data_type = DataType::from(column_desc.column_type.as_ref().unwrap()); + let name = column_desc.name.clone(); + Field::with_name(data_type, name) + }) + .collect(); + let schema = Schema::new(fields); + + if let ConnectorProperties::Iceberg(iceberg_properties) = config + && let SplitImpl::Iceberg(split) = &split_list[0] + { + let iceberg_properties: IcebergProperties = *iceberg_properties; + let split: IcebergSplit = split.clone(); + Ok(Box::new(IcebergScanExecutor::new( + iceberg_properties.to_iceberg_config(), + Some(split.snapshot_id), + split.table_meta.deserialize(), + split.files.into_iter().map(|x| x.deserialize()).collect(), + source.context.get_config().developer.chunk_size, + schema, + source.plan_node().get_identity().clone(), + ))) + } else { + unreachable!() + } + } +} diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index 80dc57b4f362..07be18ca7298 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -243,6 +243,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> { NodeBody::SortOverWindow => SortOverWindowExecutor, NodeBody::MaxOneRow => MaxOneRowExecutor, NodeBody::FileScan => 
FileScanExecutorBuilder, + NodeBody::IcebergScan => IcebergScanExecutorBuilder, // Follow NodeBody only used for test NodeBody::BlockExecutor => BlockExecutorBuilder, NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder, diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index c4862556ae82..7a37be918389 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -21,7 +21,6 @@ use risingwave_common::array::{DataChunk, Op, StreamChunk}; use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId}; use risingwave_common::types::DataType; use risingwave_connector::parser::SpecificParserConfig; -use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit}; use risingwave_connector::source::monitor::SourceMetrics; use risingwave_connector::source::reader::reader::SourceReader; use risingwave_connector::source::{ @@ -32,7 +31,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use super::Executor; use crate::error::{BatchError, Result}; -use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, IcebergScanExecutor}; +use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder}; use crate::task::BatchTaskContext; pub struct SourceExecutor { @@ -103,46 +102,28 @@ impl BoxedExecutorBuilder for SourceExecutor { .collect(); let schema = Schema::new(fields); - if let ConnectorProperties::Iceberg(iceberg_properties) = config { - let iceberg_properties: IcebergProperties = *iceberg_properties; - assert_eq!(split_list.len(), 1); - if let SplitImpl::Iceberg(split) = &split_list[0] { - let split: IcebergSplit = split.clone(); - Ok(Box::new(IcebergScanExecutor::new( - iceberg_properties.to_iceberg_config(), - Some(split.snapshot_id), - split.table_meta.deserialize(), - split.files.into_iter().map(|x| x.deserialize()).collect(), - source.context.get_config().developer.chunk_size, - schema, - source.plan_node().get_identity().clone(), - ))) - } else { - unreachable!() - } - } else { - let source_reader = SourceReader { - config, - columns, - parser_config, - connector_message_buffer_size: source - .context() - .get_config() - .developer - .connector_message_buffer_size, - }; - - Ok(Box::new(SourceExecutor { - source: source_reader, - column_ids, - metrics: source.context().source_metrics(), - source_id: TableId::new(source_node.source_id), - split_list, - schema, - identity: source.plan_node().get_identity().clone(), - chunk_size: source.context().get_config().developer.chunk_size, - })) - } + assert!(!matches!(config, ConnectorProperties::Iceberg(_))); + let source_reader = SourceReader { + config, + columns, + parser_config, + connector_message_buffer_size: source + .context() + .get_config() + .developer + .connector_message_buffer_size, + }; + + Ok(Box::new(SourceExecutor { + source: source_reader, + column_ids, + metrics: source.context().source_metrics(), + source_id: TableId::new(source_node.source_id), + split_list, + schema, + identity: source.plan_node().get_identity().clone(), + chunk_size: source.context().get_config().developer.chunk_size, + })) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs index 3433feb8d210..4333fcaa3e90 100644 --- a/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs @@ -16,7 +16,7 @@ use std::rc::Rc; use pretty_xmlish::{Pretty, XmlNode}; use 
risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::SourceNode; +use risingwave_pb::batch_plan::IcebergScanNode; use risingwave_sqlparser::ast::AsOf; use super::batch::prelude::*; @@ -99,9 +99,7 @@ impl ToBatchPb for BatchIcebergScan { fn to_batch_prost_body(&self) -> NodeBody { let source_catalog = self.source_catalog().unwrap(); let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts(); - NodeBody::Source(SourceNode { - source_id: source_catalog.id, - info: Some(source_catalog.info.clone()), + NodeBody::IcebergScan(IcebergScanNode { columns: self .core .column_catalog diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index e30dfa0dad37..bb18e2143aa7 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -1052,9 +1052,7 @@ impl StageRunner { node_body: Some(NodeBody::LogRowSeqScan(scan_node)), } } - PlanNodeType::BatchSource - | PlanNodeType::BatchKafkaScan - | PlanNodeType::BatchIcebergScan => { + PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => { let node_body = execution_plan_node.node.clone(); let NodeBody::Source(mut source_node) = node_body else { unreachable!(); @@ -1074,6 +1072,26 @@ impl StageRunner { node_body: Some(NodeBody::Source(source_node)), } } + PlanNodeType::BatchIcebergScan => { + let node_body = execution_plan_node.node.clone(); + let NodeBody::IcebergScan(mut iceberg_scan_node) = node_body else { + unreachable!(); + }; + + let partition = partition + .expect("no partition info for seq scan") + .into_source() + .expect("PartitionInfo should be SourcePartitionInfo"); + iceberg_scan_node.split = partition + .into_iter() + .map(|split| split.encode_to_bytes().into()) + .collect_vec(); + PbPlanNode { + children: vec![], + identity, + node_body: Some(NodeBody::IcebergScan(iceberg_scan_node)), + } + } _ => { let children = execution_plan_node .children diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index a7ff6eabdf7f..a727ddd9db7d 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -554,9 +554,7 @@ impl LocalQueryExecution { node_body: Some(node_body), }) } - PlanNodeType::BatchSource - | PlanNodeType::BatchKafkaScan - | PlanNodeType::BatchIcebergScan => { + PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => { let mut node_body = execution_plan_node.node.clone(); match &mut node_body { NodeBody::Source(ref mut source_node) => { @@ -579,6 +577,29 @@ impl LocalQueryExecution { node_body: Some(node_body), }) } + PlanNodeType::BatchIcebergScan => { + let mut node_body = execution_plan_node.node.clone(); + match &mut node_body { + NodeBody::IcebergScan(ref mut iceberg_scan_node) => { + if let Some(partition) = partition { + let partition = partition + .into_source() + .expect("PartitionInfo should be SourcePartitionInfo here"); + iceberg_scan_node.split = partition + .into_iter() + .map(|split| split.encode_to_bytes().into()) + .collect_vec(); + } + } + _ => unreachable!(), + } + + Ok(PbPlanNode { + children: vec![], + identity, + node_body: Some(node_body), + }) + } PlanNodeType::BatchLookupJoin => { let mut node_body = execution_plan_node.node.clone(); match &mut node_body { diff --git a/src/prost/build.rs b/src/prost/build.rs index 0682a63a02ed..6758c0ef437b 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -80,6 +80,7 @@ fn main() -> Result<(), Box> { ".stream_plan.SourceBackfillNode", 
".stream_plan.StreamSource", ".batch_plan.SourceNode", + ".batch_plan.IcebergScanNode", ]; // Build protobuf structs.