diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index dd14e8023fbc..68832d682471 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -320,7 +320,7 @@ impl SubscriptionCursor { &mut chunk_stream, formats, &from_snapshot, - &self.fields, + &fields, handle_args.session.clone(), ); diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 30c8b09a69d0..7b320847e64f 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -310,7 +310,7 @@ impl HummockManager { .get(&TableId::new(table_id)) { let table_change_log = table_change_log.clone(); - table_change_log.get_epochs(min_epoch, max_count as usize) + table_change_log.get_non_empty_epochs(min_epoch, max_count as usize) } else { vec![] } diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 885880933ce1..433309acab93 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -90,9 +90,16 @@ impl TableChangeLog { &self.0[start..end] } - pub fn get_epochs(&self, min_epoch: u64, max_count: usize) -> Vec { + /// Returns epochs where value is non-null and >= `min_epoch`. + pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec { self.filter_epoch((min_epoch, u64::MAX)) .iter() + .filter(|epoch_change_log| { + // Filter out empty change logs + let new_value_empty = epoch_change_log.new_value.is_empty(); + let old_value_empty = epoch_change_log.old_value.is_empty(); + !new_value_empty || !old_value_empty + }) .flat_map(|epoch_change_log| epoch_change_log.epochs.iter().cloned()) .filter(|a| a >= &min_epoch) .clone()