diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index ddc985528d7f4..9b8707a5ea52a 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -443,26 +443,30 @@ impl MaterializeCache { let mut fixed_changes = MaterializeBuffer::new(); for (op, key, value) in row_ops { let mut update_cache = false; + let fixed_changes = &mut fixed_changes; + // When you change the fixed_changes buffer, you must mark `update_cache` to true so the cache and downstream can be consistent. + let fixed_changes = || { + update_cache = true; + fixed_changes + }; match op { Op::Insert | Op::UpdateInsert => { match conflict_behavior { ConflictBehavior::Overwrite => { match self.force_get(&key) { - Some(old_row) => fixed_changes.update( + Some(old_row) => fixed_changes().update( key.clone(), old_row.row.clone(), value.clone(), ), - None => fixed_changes.insert(key.clone(), value.clone()), + None => fixed_changes().insert(key.clone(), value.clone()), }; - update_cache = true; } ConflictBehavior::IgnoreConflict => { match self.force_get(&key) { Some(_) => (), None => { - fixed_changes.insert(key.clone(), value.clone()); - update_cache = true; + fixed_changes().insert(key.clone(), value.clone()); } }; } @@ -487,18 +491,16 @@ impl MaterializeCache { ConflictBehavior::Overwrite => { match self.force_get(&key) { Some(old_row) => { - fixed_changes.delete(key.clone(), old_row.row.clone()); + fixed_changes().delete(key.clone(), old_row.row.clone()); } None => (), // delete a nonexistent value }; - update_cache = true; } ConflictBehavior::IgnoreConflict => { match self.force_get(&key) { Some(old_row) => { if old_row.row == value { - fixed_changes.delete(key.clone(), old_row.row.clone()); - update_cache = true; + fixed_changes().delete(key.clone(), old_row.row.clone()); } } None => (), // delete a nonexistent value