Skip to content

Commit

Permalink
feat(frontend): use WatermarkCacheStateTable in TemporalFilter (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Aug 9, 2023
1 parent 87d5d98 commit 6d4d5ac
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 33 deletions.
5 changes: 5 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ message Table {
optional uint64 initialized_at_epoch = 28;
optional uint64 created_at_epoch = 29;

// This field is introduced in v1.2.0. It is used to indicate whether the table should use
// watermark_cache to avoid state cleaning as a performance optimization.
// In older versions we can just initialize without it.
bool cleaned_by_watermark = 30;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
12 changes: 12 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ pub struct TableCatalog {
pub created_at_epoch: Option<Epoch>,

pub initialized_at_epoch: Option<Epoch>,

/// Indicate whether to use watermark cache for state table.
pub cleaned_by_watermark: bool,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -233,6 +236,11 @@ impl TableCatalog {
self
}

pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
self.cleaned_by_watermark = cleaned_by_watermark;
self
}

pub fn conflict_behavior(&self) -> ConflictBehavior {
self.conflict_behavior
}
Expand Down Expand Up @@ -392,6 +400,7 @@ impl TableCatalog {
cardinality: Some(self.cardinality.to_protobuf()),
initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
cleaned_by_watermark: self.cleaned_by_watermark,
}
}

Expand Down Expand Up @@ -497,6 +506,7 @@ impl From<PbTable> for TableCatalog {
.unwrap_or_else(Cardinality::unknown),
created_at_epoch: tb.created_at_epoch.map(Epoch::from),
initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
cleaned_by_watermark: matches!(tb.cleaned_by_watermark, true),
}
}
}
Expand Down Expand Up @@ -586,6 +596,7 @@ mod tests {
dist_key_in_pk: vec![],
cardinality: None,
created_at_epoch: None,
cleaned_by_watermark: false,
}
.into();

Expand Down Expand Up @@ -639,6 +650,7 @@ mod tests {
cardinality: Cardinality::unknown(),
created_at_epoch: None,
initialized_at_epoch: None,
cleaned_by_watermark: false,
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
13 changes: 12 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,25 @@ impl_plan_tree_node_for_binary! { StreamDynamicFilter }
impl StreamNode for StreamDynamicFilter {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
use generic::dynamic_filter::*;
let cleaned_by_watermark = if let Some(condition) = self.core.predicate().as_expr_unless_true()
&& let Some((_input_expr, cmp, _now_expr)) = condition.as_now_comparison_cond() {
// Only GT / GTE will undergo state cleaning.
// As such, we only need cache for these cases.
matches!(cmp, ExprType::GreaterThan | ExprType::GreaterThanOrEqual)
// NOTE(kwannoel): `now_expr` is checked within `as_now_comparison_cond`.
// so we don't need to check it here.
} else {
false
};
let condition = self
.core
.predicate()
.as_expr_unless_true()
.map(|x| x.to_expr_proto());
let left_index = self.core.left_index();
let left_table = infer_left_internal_table_catalog(&self.base, left_index)
.with_id(state.gen_table_id_wrapped());
.with_id(state.gen_table_id_wrapped())
.with_cleaned_by_watermark(cleaned_by_watermark);
let right = self.right();
let right_table = infer_right_internal_table_catalog(right.plan_base())
.with_id(state.gen_table_id_wrapped());
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ impl StreamMaterialize {
cardinality,
created_at_epoch: None,
initialized_at_epoch: None,
cleaned_by_watermark: false,
})
}

Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ impl TableCatalogBuilder {
cardinality: Cardinality::unknown(), // TODO(card): cardinality of internal table
created_at_epoch: None,
initialized_at_epoch: None,
cleaned_by_watermark: false,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/src/filter_key_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ mod tests {
dist_key_in_pk: vec![],
cardinality: None,
created_at_epoch: None,
cleaned_by_watermark: false,
}
}

Expand Down
1 change: 0 additions & 1 deletion src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ pub type ReplicatedStateTable<S> = StateTableInner<S, BasicSerde, true>;
/// It will reduce state cleaning overhead.
pub type WatermarkCacheStateTable<S> =
StateTableInner<S, BasicSerde, false, DefaultWatermarkBufferStrategy, true>;

pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
StateTableInner<S, BasicSerde, false, DefaultWatermarkBufferStrategy, USE_WATERMARK_CACHE>;

Expand Down
16 changes: 9 additions & 7 deletions src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,27 @@ use super::monitor::StreamingMetrics;
use super::{
ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, Message, PkIndices, PkIndicesRef,
};
use crate::common::table::state_table::StateTable;
use crate::common::table::state_table::{StateTable, WatermarkCacheParameterizedStateTable};
use crate::common::StreamChunkBuilder;
use crate::executor::expect_first_barrier_from_aligned_stream;

pub struct DynamicFilterExecutor<S: StateStore> {
pub struct DynamicFilterExecutor<S: StateStore, const USE_WATERMARK_CACHE: bool> {
ctx: ActorContextRef,
source_l: Option<BoxedExecutor>,
source_r: Option<BoxedExecutor>,
key_l: usize,
pk_indices: PkIndices,
identity: String,
comparator: ExprNodeType,
left_table: StateTable<S>,
left_table: WatermarkCacheParameterizedStateTable<S, USE_WATERMARK_CACHE>,
right_table: StateTable<S>,
schema: Schema,
metrics: Arc<StreamingMetrics>,
/// The maximum size of the chunk produced by executor at a time.
chunk_size: usize,
}

impl<S: StateStore> DynamicFilterExecutor<S> {
impl<S: StateStore, const USE_WATERMARK_CACHE: bool> DynamicFilterExecutor<S, USE_WATERMARK_CACHE> {
#[allow(clippy::too_many_arguments)]
pub fn new(
ctx: ActorContextRef,
Expand All @@ -69,7 +69,7 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
pk_indices: PkIndices,
executor_id: u64,
comparator: ExprNodeType,
state_table_l: StateTable<S>,
state_table_l: WatermarkCacheParameterizedStateTable<S, USE_WATERMARK_CACHE>,
state_table_r: StateTable<S>,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
Expand Down Expand Up @@ -464,7 +464,9 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
}
}

impl<S: StateStore> Executor for DynamicFilterExecutor<S> {
impl<S: StateStore, const USE_WATERMARK_CACHE: bool> Executor
for DynamicFilterExecutor<S, USE_WATERMARK_CACHE>
{
fn execute(self: Box<Self>) -> BoxedMessageStream {
self.into_stream().boxed()
}
Expand Down Expand Up @@ -536,7 +538,7 @@ mod tests {
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![0]);
let (tx_r, source_r) = MockSource::channel(schema, vec![]);

let executor = DynamicFilterExecutor::<MemoryStateStore>::new(
let executor = DynamicFilterExecutor::<MemoryStateStore, false>::new(
ActorContext::create(123),
Box::new(source_l),
Box::new(source_r),
Expand Down
66 changes: 42 additions & 24 deletions src/stream/src/from_proto/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_pb::expr::expr_node::Type::{
use risingwave_pb::stream_plan::DynamicFilterNode;

use super::*;
use crate::common::table::state_table::StateTable;
use crate::common::table::state_table::{StateTable, WatermarkCacheStateTable};
use crate::executor::DynamicFilterExecutor;

pub struct DynamicFilterExecutorBuilder;
Expand Down Expand Up @@ -56,30 +56,48 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder {
);
}

// TODO: use consistent operation for dynamic filter <https://github.com/risingwavelabs/risingwave/issues/3893>
let state_table_l = StateTable::from_table_catalog_inconsistent_op(
node.get_left_table()?,
store.clone(),
Some(vnodes),
)
.await;

let state_table_r =
StateTable::from_table_catalog_inconsistent_op(node.get_right_table()?, store, None)
.await;
StateTable::from_table_catalog(node.get_right_table()?, store.clone(), None).await;

Ok(Box::new(DynamicFilterExecutor::new(
params.actor_context,
source_l,
source_r,
key_l,
params.pk_indices,
params.executor_id,
comparator,
state_table_l,
state_table_r,
params.executor_stats,
params.env.config().developer.chunk_size,
)))
let left_table = node.get_left_table()?;
if left_table.get_cleaned_by_watermark() {
let state_table_l = WatermarkCacheStateTable::from_table_catalog(
node.get_left_table()?,
store,
Some(vnodes),
)
.await;

Ok(Box::new(DynamicFilterExecutor::new(
params.actor_context,
source_l,
source_r,
key_l,
params.pk_indices,
params.executor_id,
comparator,
state_table_l,
state_table_r,
params.executor_stats,
params.env.config().developer.chunk_size,
)))
} else {
let state_table_l =
StateTable::from_table_catalog(node.get_left_table()?, store, Some(vnodes)).await;

Ok(Box::new(DynamicFilterExecutor::new(
params.actor_context,
source_l,
source_r,
key_l,
params.pk_indices,
params.executor_id,
comparator,
state_table_l,
state_table_r,
params.executor_stats,
params.env.config().developer.chunk_size,
)))
}
}
}
1 change: 1 addition & 0 deletions src/tests/compaction_test/src/delete_range_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ async fn compaction_test(
dist_key_in_pk: vec![],
cardinality: None,
created_at_epoch: None,
cleaned_by_watermark: false,
};
let mut delete_range_table = delete_key_table.clone();
delete_range_table.id = 2;
Expand Down

0 comments on commit 6d4d5ac

Please sign in to comment.