Skip to content

Commit

Permalink
use real watermark epoch in global simple agg
Browse files (browse the repository at this point in the history)
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Feb 23, 2023
1 parent 37cb6e0 commit 1aad269
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/stream/src/executor/agg_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct AggExecutorArgs<S: StateStore> {
pub storages: Vec<AggStateStorage<S>>,
pub result_table: StateTable<S>,
pub distinct_dedup_tables: HashMap<usize, StateTable<S>>,
pub watermark_epoch: AtomicU64Ref,

// extra
pub extra: Option<AggExecutorArgsExtra>,
Expand All @@ -53,5 +54,4 @@ pub struct AggExecutorArgsExtra {
// things only used by hash agg currently
pub metrics: Arc<StreamingMetrics>,
pub chunk_size: usize,
pub watermark_epoch: AtomicU64Ref,
}
12 changes: 6 additions & 6 deletions src/stream/src/executor/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicU64;

use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
Expand All @@ -32,6 +30,7 @@ use crate::error::StreamResult;
use crate::executor::aggregation::{generate_agg_schema, AggCall, AggGroup};
use crate::executor::error::StreamExecutorError;
use crate::executor::{BoxedMessageStream, Message};
use crate::task::AtomicU64Ref;

/// `GlobalSimpleAggExecutor` is the aggregation operator for streaming system.
/// To create an aggregation operator, states and expressions should be passed along the
Expand Down Expand Up @@ -76,6 +75,9 @@ struct ExecutorInner<S: StateStore> {
/// One table per distinct column (may be shared by multiple agg calls).
distinct_dedup_tables: HashMap<usize, StateTable<S>>,

/// Watermark epoch.
watermark_epoch: AtomicU64Ref,

/// Extreme state cache size
extreme_cache_size: usize,
}
Expand Down Expand Up @@ -140,6 +142,7 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
storages: args.storages,
result_table: args.result_table,
distinct_dedup_tables: args.distinct_dedup_tables,
watermark_epoch: args.watermark_epoch,
extreme_cache_size: args.extreme_cache_size,
},
})
Expand Down Expand Up @@ -306,10 +309,7 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
&this.input_schema,
)
.await?,
- distinct_dedup: DistinctDeduplicater::new(
- &this.agg_calls,
- &Arc::new(AtomicU64::new(0)), // TODO(rc): use real watermark epoch?
- ),
+ distinct_dedup: DistinctDeduplicater::new(&this.agg_calls, &this.watermark_epoch),
state_changed: false,
};

Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ struct ExecutorInner<K: HashKey, S: StateStore> {
/// One table per distinct column (may be shared by multiple agg calls).
distinct_dedup_tables: HashMap<usize, StateTable<S>>,

- /// Lru manager. None if using local eviction.
+ /// Watermark epoch.
watermark_epoch: AtomicU64Ref,

/// The maximum size of the chunk produced by executor at a time.
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
storages: args.storages,
result_table: args.result_table,
distinct_dedup_tables: args.distinct_dedup_tables,
watermark_epoch: extra_args.watermark_epoch,
watermark_epoch: args.watermark_epoch,
chunk_size: extra_args.chunk_size,
extreme_cache_size: args.extreme_cache_size,
metrics: extra_args.metrics,
Expand Down Expand Up @@ -685,13 +685,13 @@ mod tests {
storages,
result_table,
distinct_dedup_tables: Default::default(),
watermark_epoch: Arc::new(AtomicU64::new(0)),

extra: Some(AggExecutorArgsExtra {
group_key_indices,

metrics: Arc::new(StreamingMetrics::unused()),
chunk_size: 1024,
watermark_epoch: Arc::new(AtomicU64::new(0)),
}),
})
.unwrap()
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ pub trait StreamExecutorTestExt: MessageStream + Unpin {
impl StreamExecutorTestExt for BoxedMessageStream {}

pub mod agg_executor {
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
Expand Down Expand Up @@ -374,6 +377,7 @@ pub mod agg_executor {
storages,
result_table,
distinct_dedup_tables: Default::default(),
watermark_epoch: Arc::new(AtomicU64::new(0)),

extra: None,
})
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/from_proto/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl ExecutorBuilder for GlobalSimpleAggExecutorBuilder {
storages,
result_table,
distinct_dedup_tables,
watermark_epoch: stream.get_watermark_epoch(),

extra: None,
})?
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ impl ExecutorBuilder for HashAggExecutorBuilder {
storages,
result_table,
distinct_dedup_tables,
watermark_epoch: stream.get_watermark_epoch(),

extra: Some(AggExecutorArgsExtra {
group_key_indices,

metrics: params.executor_stats,
chunk_size: params.env.config().developer.stream_chunk_size,
watermark_epoch: stream.get_watermark_epoch(),
}),
},
group_key_types,
Expand Down

0 comments on commit 1aad269

Please sign in to comment.