From e8d5ca303029ea448da52ddbb98fe77cac138c71 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Tue, 27 Aug 2024 23:07:46 +0800 Subject: [PATCH] fix(meta): fix list changed log epoch retrun null epoch (#18273) --- src/frontend/src/session/cursor_manager.rs | 2 +- src/meta/src/hummock/manager/versioning.rs | 2 +- src/storage/hummock_sdk/src/change_log.rs | 9 ++++++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index dd14e8023fbc4..68832d6824718 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -320,7 +320,7 @@ impl SubscriptionCursor { &mut chunk_stream, formats, &from_snapshot, - &self.fields, + &fields, handle_args.session.clone(), ); diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 30c8b09a69d0f..7b320847e64fd 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -310,7 +310,7 @@ impl HummockManager { .get(&TableId::new(table_id)) { let table_change_log = table_change_log.clone(); - table_change_log.get_epochs(min_epoch, max_count as usize) + table_change_log.get_non_empty_epochs(min_epoch, max_count as usize) } else { vec![] } diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 885880933ce11..433309acab930 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -90,9 +90,16 @@ impl TableChangeLog { &self.0[start..end] } - pub fn get_epochs(&self, min_epoch: u64, max_count: usize) -> Vec { + /// Returns epochs where value is non-null and >= `min_epoch`. + pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec { self.filter_epoch((min_epoch, u64::MAX)) .iter() + .filter(|epoch_change_log| { + // Filter out empty change logs + let new_value_empty = epoch_change_log.new_value.is_empty(); + let old_value_empty = epoch_change_log.old_value.is_empty(); + !new_value_empty || !old_value_empty + }) .flat_map(|epoch_change_log| epoch_change_log.epochs.iter().cloned()) .filter(|a| a >= &min_epoch) .clone()