fix(dyn-filter): filter left changes according to watermark before writing state table #17816

Merged on Aug 1, 2024 (9 commits)
2 changes: 1 addition & 1 deletion proto/stream_plan.proto
@@ -480,7 +480,7 @@ message DynamicFilterNode {
// When the condition changes, we will tell downstream to insert the LHS records which now match the condition.
// If this is false, we need to store RHS records which match the condition in the internal table.
// When the condition changes, we will tell downstream to delete the LHS records which now no longer match the condition.
- bool condition_always_relax = 5;
+ bool condition_always_relax = 5 [deprecated = true];
}

// Delta join with two indexes. This is a pseudo plan node generated on frontend. On meta
7 changes: 1 addition & 6 deletions src/common/src/util/stream_graph_visitor.rs
@@ -133,12 +133,7 @@ pub fn visit_stream_node_tables_inner<F>(
optional!(node.memo_table, "TemporalJoinMemo");
}
NodeBody::DynamicFilter(node) => {
- if node.condition_always_relax {
- always!(node.left_table, "DynamicFilterLeftNotSatisfy");
- } else {
- always!(node.left_table, "DynamicFilterLeft");
- }
-
+ always!(node.left_table, "DynamicFilterLeft");
always!(node.right_table, "DynamicFilterRight");
}

19 changes: 9 additions & 10 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
@@ -36,7 +36,6 @@ pub struct StreamDynamicFilter {
pub base: PlanBase<Stream>,
core: generic::DynamicFilter<PlanRef>,
cleaned_by_watermark: bool,
- condition_always_relax: bool,
}

impl StreamDynamicFilter {
@@ -67,7 +66,6 @@ impl StreamDynamicFilter {
base,
core,
cleaned_by_watermark,
- condition_always_relax,
}
}

@@ -100,7 +98,13 @@ impl StreamDynamicFilter {
// downstream. See `derive_watermark_columns`.
true
}
- _ => false,
+ ExprType::LessThan | ExprType::LessThanOrEqual => {
+ // For <= and <, watermark on rhs means all rows older than the watermark should already be emitted,
+ // and future lhs inputs should be directly passed to downstream without any state table operation.
+ // So, the state table can be cleaned up.
+ true
+ }
Review comment on lines +101 to +106 (from the PR author, a project member):

The left table is now also cleaned by the RHS watermark when the comparator is `<` or `<=`.
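
As a rough illustration of the idea in the PR title (filtering left-side changes by the watermark before they reach the state table), here is a minimal sketch. It assumes integer keys and an inclusive lower bound at the watermark. `LeftChange` and `filter_by_watermark` are hypothetical names made up for this example, not the executor's actual types or functions, which operate on stream chunks and a real state table.

```rust
/// Hypothetical, simplified stand-in for a change arriving on the left side.
#[derive(Debug, Clone, PartialEq)]
enum LeftChange {
    Insert(i64),
    Delete(i64),
}

impl LeftChange {
    /// The comparison key carried by the change.
    fn key(&self) -> i64 {
        match self {
            LeftChange::Insert(k) | LeftChange::Delete(k) => *k,
        }
    }
}

/// Keep only the changes whose key is at or above the watermark.
/// Assumption: keys below the watermark have been (or will be) cleaned from
/// the state table, so writing them would be inconsistent with that cleanup.
/// With no watermark yet, every change is kept.
fn filter_by_watermark(changes: &[LeftChange], watermark: Option<i64>) -> Vec<LeftChange> {
    changes
        .iter()
        .filter(|c| watermark.map_or(true, |wm| c.key() >= wm))
        .cloned()
        .collect()
}
```

In this model, only the filtered result would be applied to the left state table. Whether the skipped rows are still forwarded downstream depends on the comparator, as the code comment in the hunk above describes for `<` and `<=`.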

+ _ => unreachable!(),
}
} else {
false
@@ -124,12 +128,6 @@ impl Distill for StreamDynamicFilter {
Pretty::display(&self.cleaned_by_watermark),
));
}
- if self.condition_always_relax {
- vec.push((
- "condition_always_relax",
- Pretty::display(&self.condition_always_relax),
- ));
- }
childless_record(
plan_node_name!(
"StreamDynamicFilter",
@@ -172,12 +170,13 @@ impl StreamNode for StreamDynamicFilter {
let right = self.right();
let right_table = infer_right_internal_table_catalog(right.plan_base())
.with_id(state.gen_table_id_wrapped());
+ #[allow(deprecated)]
NodeBody::DynamicFilter(DynamicFilterNode {
left_key: left_index as u32,
condition,
left_table: Some(left_table.to_internal_table_prost()),
right_table: Some(right_table.to_internal_table_prost()),
- condition_always_relax: self.condition_always_relax,
+ condition_always_relax: false, // deprecated
})
}
}
1 change: 1 addition & 0 deletions src/storage/src/store.rs
@@ -403,6 +403,7 @@ pub trait LocalStateStore: StaticSendSync {
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_;

+ /// Get last persisted watermark for a given vnode.
fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

/// Inserts a key-value entry associated with a given `epoch` into the state store.
1 change: 0 additions & 1 deletion src/stream/src/common/table/mod.rs
@@ -14,7 +14,6 @@

pub mod state_table;
mod state_table_cache;
- mod watermark;

#[cfg(test)]
pub mod test_state_table;