Skip to content

Commit

Permalink
refactor(stream): add more assertion on update op handling in hash di…
Browse files Browse the repository at this point in the history
…spatcher (#19347)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Nov 12, 2024
1 parent 022c0a4 commit 337235d
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,10 @@ impl Dispatcher for HashDataDispatcher {
}

if !visible {
assert!(
last_vnode_when_update_delete.is_none(),
"invisible row between U- and U+, op = {op:?}",
);
new_ops.push(op);
continue;
}
Expand All @@ -797,7 +801,11 @@ impl Dispatcher for HashDataDispatcher {
if op == Op::UpdateDelete {
last_vnode_when_update_delete = Some(vnode);
} else if op == Op::UpdateInsert {
if vnode != last_vnode_when_update_delete.unwrap() {
if vnode
!= last_vnode_when_update_delete
.take()
.expect("missing U- before U+")
{
new_ops.push(Op::Delete);
new_ops.push(Op::Insert);
} else {
Expand All @@ -808,6 +816,10 @@ impl Dispatcher for HashDataDispatcher {
new_ops.push(op);
}
}
assert!(
last_vnode_when_update_delete.is_none(),
"missing U+ after U-"
);

let ops = new_ops;

Expand Down

0 comments on commit 337235d

Please sign in to comment.