diff --git a/e2e_test/streaming/group_top_n/group_top_1.slt b/e2e_test/streaming/group_top_n/group_top_1.slt new file mode 100644 index 000000000000..2bed792a4ae8 --- /dev/null +++ b/e2e_test/streaming/group_top_n/group_top_1.slt @@ -0,0 +1,45 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# https://github.com/risingwavelabs/risingwave/issues/12282 + +statement ok +create table t(a int, b int, c int); + +statement ok +create materialized view mv as SELECT * FROM ( + SELECT + *, + row_number() OVER (PARTITION BY a ORDER BY b) AS rank + FROM t +) WHERE rank <= 1; + +statement ok +insert into t values (1, 1, 1); + +query iiiI +select * from mv; +---- +1 1 1 1 + +statement ok +insert into t values (1, 0, 1); + +query iiiI +select * from mv; +---- +1 0 1 1 + +statement ok +insert into t values (1, 0, 1); + +query iiiI +select * from mv; +---- +1 0 1 1 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/e2e_test/streaming/group_top_n.slt b/e2e_test/streaming/group_top_n/main1.slt similarity index 100% rename from e2e_test/streaming/group_top_n.slt rename to e2e_test/streaming/group_top_n/main1.slt diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 421b1141843a..7e075002b99c 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -51,6 +51,7 @@ impl GroupTopNExecutor, state_table: StateTable, watermark_epoch: AtomicU64Ref, + pk_indices: PkIndices, ) -> StreamResult { let info = input.info(); Ok(TopNExecutorWrapper { @@ -66,6 +67,7 @@ impl GroupTopNExecutor InnerGroupTopNExecutor, watermark_epoch: AtomicU64Ref, ctx: ActorContextRef, + pk_indices: PkIndices, ) -> StreamResult { let ExecutorInfo { - pk_indices, schema, .. + schema: input_schema, + .. } = input_info; let metrics_info = MetricsInfo::new( @@ -121,12 +125,13 @@ impl InnerGroupTopNExecutor::new(state_table, cache_key_serde.clone()); Ok(Self { info: ExecutorInfo { - schema, + schema: input_schema, pk_indices, identity: format!("GroupTopNExecutor {:X}", executor_id), }, @@ -408,6 +413,7 @@ mod tests { vec![1], state_table, Arc::new(AtomicU64::new(0)), + pk_indices(), ) .unwrap(); let top_n_executor = Box::new(a); @@ -505,6 +511,7 @@ mod tests { vec![1], state_table, Arc::new(AtomicU64::new(0)), + pk_indices(), ) .unwrap(), ); @@ -595,6 +602,7 @@ mod tests { vec![1, 2], state_table, Arc::new(AtomicU64::new(0)), + pk_indices(), ) .unwrap(), ); diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index ceb4a3bca4d4..8df8ee17768c 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -69,6 +69,7 @@ impl group_by: Vec, state_table: StateTable, watermark_epoch: AtomicU64Ref, + pk_indices: PkIndices, ) -> StreamResult { let info = input.info(); Ok(TopNExecutorWrapper { @@ -84,6 +85,7 @@ impl state_table, watermark_epoch, ctx, + pk_indices, )?, }) } @@ -129,9 +131,11 @@ impl state_table: StateTable, watermark_epoch: AtomicU64Ref, ctx: ActorContextRef, + pk_indices: PkIndices, ) -> StreamResult { let ExecutorInfo { - pk_indices, schema, .. + schema: input_schema, + .. } = input_info; let metrics_info = MetricsInfo::new( @@ -141,12 +145,13 @@ impl "GroupTopN", ); - let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by); + let cache_key_serde = + create_cache_key_serde(&storage_key, &input_schema, &order_by, &group_by); let managed_state = ManagedTopNState::::new(state_table, cache_key_serde.clone()); Ok(Self { info: ExecutorInfo { - schema, + schema: input_schema, pk_indices, identity: format!("AppendOnlyGroupTopNExecutor {:X}", executor_id), }, diff --git a/src/stream/src/from_proto/group_top_n.rs b/src/stream/src/from_proto/group_top_n.rs index 9f74134308da..a91d4a91a6ef 100644 --- a/src/stream/src/from_proto/group_top_n.rs +++ b/src/stream/src/from_proto/group_top_n.rs @@ -21,7 +21,7 @@ use risingwave_pb::stream_plan::GroupTopNNode; use super::*; use crate::common::table::state_table::StateTable; -use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor}; +use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor, PkIndices}; use crate::task::AtomicU64Ref; pub struct GroupTopNExecutorBuilder; @@ -80,6 +80,7 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder { state_table: StateTable, watermark_epoch: AtomicU64Ref, group_key_types: Vec, + pk_indices: PkIndices, with_ties: bool, append_only: bool, @@ -120,6 +122,7 @@ impl HashKeyDispatcher for GroupTopNExecutorDispatcherArgs { self.group_by, self.state_table, self.watermark_epoch, + self.pk_indices, )? .boxed()) };