diff --git a/src/stream/src/executor/agg_common.rs b/src/stream/src/executor/agg_common.rs index 2e32c297113c2..1ef9500457345 100644 --- a/src/stream/src/executor/agg_common.rs +++ b/src/stream/src/executor/agg_common.rs @@ -40,6 +40,7 @@ pub struct AggExecutorArgs { pub storages: Vec>, pub result_table: StateTable, pub distinct_dedup_tables: HashMap>, + pub watermark_epoch: AtomicU64Ref, // extra pub extra: Option, @@ -53,5 +54,4 @@ pub struct AggExecutorArgsExtra { // things only used by hash agg currently pub metrics: Arc, pub chunk_size: usize, - pub watermark_epoch: AtomicU64Ref, } diff --git a/src/stream/src/executor/global_simple_agg.rs b/src/stream/src/executor/global_simple_agg.rs index 5bdcc14996924..880b759ab2f5d 100644 --- a/src/stream/src/executor/global_simple_agg.rs +++ b/src/stream/src/executor/global_simple_agg.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::AtomicU64; - use futures::StreamExt; use futures_async_stream::try_stream; use risingwave_common::array::StreamChunk; @@ -32,6 +30,7 @@ use crate::error::StreamResult; use crate::executor::aggregation::{generate_agg_schema, AggCall, AggGroup}; use crate::executor::error::StreamExecutorError; use crate::executor::{BoxedMessageStream, Message}; +use crate::task::AtomicU64Ref; /// `GlobalSimpleAggExecutor` is the aggregation operator for streaming system. /// To create an aggregation operator, states and expressions should be passed along the @@ -76,6 +75,9 @@ struct ExecutorInner { /// One table per distinct column (may be shared by multiple agg calls). distinct_dedup_tables: HashMap>, + /// Watermark epoch. + watermark_epoch: AtomicU64Ref, + /// Extreme state cache size extreme_cache_size: usize, } @@ -140,6 +142,7 @@ impl GlobalSimpleAggExecutor { storages: args.storages, result_table: args.result_table, distinct_dedup_tables: args.distinct_dedup_tables, + watermark_epoch: args.watermark_epoch, extreme_cache_size: args.extreme_cache_size, }, }) @@ -306,10 +309,7 @@ impl GlobalSimpleAggExecutor { &this.input_schema, ) .await?, - distinct_dedup: DistinctDeduplicater::new( - &this.agg_calls, - &Arc::new(AtomicU64::new(0)), // TODO(rc): use real watermark epoch? - ), + distinct_dedup: DistinctDeduplicater::new(&this.agg_calls, &this.watermark_epoch), state_changed: false, }; diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 582fd734f1e98..03e0774f595dd 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -96,7 +96,7 @@ struct ExecutorInner { /// One table per distinct column (may be shared by multiple agg calls). distinct_dedup_tables: HashMap>, - /// Lru manager. None if using local eviction. + /// Watermark epoch. watermark_epoch: AtomicU64Ref, /// The maximum size of the chunk produced by executor at a time. @@ -199,7 +199,7 @@ impl HashAggExecutor { storages: args.storages, result_table: args.result_table, distinct_dedup_tables: args.distinct_dedup_tables, - watermark_epoch: extra_args.watermark_epoch, + watermark_epoch: args.watermark_epoch, chunk_size: extra_args.chunk_size, extreme_cache_size: args.extreme_cache_size, metrics: extra_args.metrics, @@ -685,13 +685,13 @@ mod tests { storages, result_table, distinct_dedup_tables: Default::default(), + watermark_epoch: Arc::new(AtomicU64::new(0)), extra: Some(AggExecutorArgsExtra { group_key_indices, metrics: Arc::new(StreamingMetrics::unused()), chunk_size: 1024, - watermark_epoch: Arc::new(AtomicU64::new(0)), }), }) .unwrap() diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 1597f79252291..3773370a48b22 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -207,6 +207,9 @@ pub trait StreamExecutorTestExt: MessageStream + Unpin { impl StreamExecutorTestExt for BoxedMessageStream {} pub mod agg_executor { + use std::sync::atomic::AtomicU64; + use std::sync::Arc; + use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; @@ -374,6 +377,7 @@ pub mod agg_executor { storages, result_table, distinct_dedup_tables: Default::default(), + watermark_epoch: Arc::new(AtomicU64::new(0)), extra: None, }) diff --git a/src/stream/src/from_proto/global_simple_agg.rs b/src/stream/src/from_proto/global_simple_agg.rs index e9147c8b4f3a5..d78fb26107da8 100644 --- a/src/stream/src/from_proto/global_simple_agg.rs +++ b/src/stream/src/from_proto/global_simple_agg.rs @@ -66,6 +66,7 @@ impl ExecutorBuilder for GlobalSimpleAggExecutorBuilder { storages, result_table, distinct_dedup_tables, + watermark_epoch: stream.get_watermark_epoch(), extra: None, })? diff --git a/src/stream/src/from_proto/hash_agg.rs b/src/stream/src/from_proto/hash_agg.rs index ca94c7a69e271..417a166f020eb 100644 --- a/src/stream/src/from_proto/hash_agg.rs +++ b/src/stream/src/from_proto/hash_agg.rs @@ -108,13 +108,13 @@ impl ExecutorBuilder for HashAggExecutorBuilder { storages, result_table, distinct_dedup_tables, + watermark_epoch: stream.get_watermark_epoch(), extra: Some(AggExecutorArgsExtra { group_key_indices, metrics: params.executor_stats, chunk_size: params.env.config().developer.stream_chunk_size, - watermark_epoch: stream.get_watermark_epoch(), }), }, group_key_types,