From 6d02b187495674db172c9bcbaed014ad3e646be9 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Thu, 4 Jan 2024 18:46:50 +0800 Subject: [PATCH] fix(stream/materialize): enforce update_cache when modify fixed_changes (#14364) Signed-off-by: TennyZhuang --- src/stream/src/executor/mview/materialize.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index ddc985528d7f4..9b8707a5ea52a 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -443,26 +443,30 @@ impl MaterializeCache { let mut fixed_changes = MaterializeBuffer::new(); for (op, key, value) in row_ops { let mut update_cache = false; + let fixed_changes = &mut fixed_changes; + // When you change the fixed_changes buffer, you must mark `update_cache` to true so the cache and downstream can be consistent. + let fixed_changes = || { + update_cache = true; + fixed_changes + }; match op { Op::Insert | Op::UpdateInsert => { match conflict_behavior { ConflictBehavior::Overwrite => { match self.force_get(&key) { - Some(old_row) => fixed_changes.update( + Some(old_row) => fixed_changes().update( key.clone(), old_row.row.clone(), value.clone(), ), - None => fixed_changes.insert(key.clone(), value.clone()), + None => fixed_changes().insert(key.clone(), value.clone()), }; - update_cache = true; } ConflictBehavior::IgnoreConflict => { match self.force_get(&key) { Some(_) => (), None => { - fixed_changes.insert(key.clone(), value.clone()); - update_cache = true; + fixed_changes().insert(key.clone(), value.clone()); } }; } @@ -487,18 +491,16 @@ impl MaterializeCache { ConflictBehavior::Overwrite => { match self.force_get(&key) { Some(old_row) => { - fixed_changes.delete(key.clone(), old_row.row.clone()); + fixed_changes().delete(key.clone(), old_row.row.clone()); } None => (), // delete a nonexistent value }; - update_cache = true; } ConflictBehavior::IgnoreConflict => { match self.force_get(&key) { Some(old_row) => { if old_row.row == value { - fixed_changes.delete(key.clone(), old_row.row.clone()); - update_cache = true; + fixed_changes().delete(key.clone(), old_row.row.clone()); } } None => (), // delete a nonexistent value