Skip to content

Commit

Permalink
fix(stream): fix pk indices of GroupTopN executors (#12304)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Sep 15, 2023
1 parent 43c010e commit a99e6f3
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 7 deletions.
45 changes: 45 additions & 0 deletions e2e_test/streaming/group_top_n/group_top_1.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# https://github.com/risingwavelabs/risingwave/issues/12282

statement ok
create table t(a int, b int, c int);

statement ok
create materialized view mv as SELECT * FROM (
SELECT
*,
row_number() OVER (PARTITION BY a ORDER BY b) AS rank
FROM t
) WHERE rank <= 1;

statement ok
insert into t values (1, 1, 1);

query iiiI
select * from mv;
----
1 1 1 1

statement ok
insert into t values (1, 0, 1);

query iiiI
select * from mv;
----
1 0 1 1

statement ok
insert into t values (1, 0, 1);

query iiiI
select * from mv;
----
1 0 1 1

statement ok
drop materialized view mv;

statement ok
drop table t;
File renamed without changes.
14 changes: 11 additions & 3 deletions src/stream/src/executor/top_n/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> GroupTopNExecutor<K, S, W
group_by: Vec<usize>,
state_table: StateTable<S>,
watermark_epoch: AtomicU64Ref,
pk_indices: PkIndices,
) -> StreamResult<Self> {
let info = input.info();
Ok(TopNExecutorWrapper {
Expand All @@ -66,6 +67,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> GroupTopNExecutor<K, S, W
state_table,
watermark_epoch,
ctx,
pk_indices,
)?,
})
}
Expand Down Expand Up @@ -109,9 +111,11 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutor<K,
state_table: StateTable<S>,
watermark_epoch: AtomicU64Ref,
ctx: ActorContextRef,
pk_indices: PkIndices,
) -> StreamResult<Self> {
let ExecutorInfo {
pk_indices, schema, ..
schema: input_schema,
..
} = input_info;

let metrics_info = MetricsInfo::new(
Expand All @@ -121,12 +125,13 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutor<K,
"GroupTopN",
);

let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by);
let cache_key_serde =
create_cache_key_serde(&storage_key, &input_schema, &order_by, &group_by);
let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());

Ok(Self {
info: ExecutorInfo {
schema,
schema: input_schema,
pk_indices,
identity: format!("GroupTopNExecutor {:X}", executor_id),
},
Expand Down Expand Up @@ -408,6 +413,7 @@ mod tests {
vec![1],
state_table,
Arc::new(AtomicU64::new(0)),
pk_indices(),
)
.unwrap();
let top_n_executor = Box::new(a);
Expand Down Expand Up @@ -505,6 +511,7 @@ mod tests {
vec![1],
state_table,
Arc::new(AtomicU64::new(0)),
pk_indices(),
)
.unwrap(),
);
Expand Down Expand Up @@ -595,6 +602,7 @@ mod tests {
vec![1, 2],
state_table,
Arc::new(AtomicU64::new(0)),
pk_indices(),
)
.unwrap(),
);
Expand Down
11 changes: 8 additions & 3 deletions src/stream/src/executor/top_n/group_top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
group_by: Vec<usize>,
state_table: StateTable<S>,
watermark_epoch: AtomicU64Ref,
pk_indices: PkIndices,
) -> StreamResult<Self> {
let info = input.info();
Ok(TopNExecutorWrapper {
Expand All @@ -84,6 +85,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
state_table,
watermark_epoch,
ctx,
pk_indices,
)?,
})
}
Expand Down Expand Up @@ -129,9 +131,11 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
state_table: StateTable<S>,
watermark_epoch: AtomicU64Ref,
ctx: ActorContextRef,
pk_indices: PkIndices,
) -> StreamResult<Self> {
let ExecutorInfo {
pk_indices, schema, ..
schema: input_schema,
..
} = input_info;

let metrics_info = MetricsInfo::new(
Expand All @@ -141,12 +145,13 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
"GroupTopN",
);

let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by);
let cache_key_serde =
create_cache_key_serde(&storage_key, &input_schema, &order_by, &group_by);
let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());

Ok(Self {
info: ExecutorInfo {
schema,
schema: input_schema,
pk_indices,
identity: format!("AppendOnlyGroupTopNExecutor {:X}", executor_id),
},
Expand Down
5 changes: 4 additions & 1 deletion src/stream/src/from_proto/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_pb::stream_plan::GroupTopNNode;

use super::*;
use crate::common::table::state_table::StateTable;
use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor};
use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor, PkIndices};
use crate::task::AtomicU64Ref;

pub struct GroupTopNExecutorBuilder<const APPEND_ONLY: bool>;
Expand Down Expand Up @@ -80,6 +80,7 @@ impl<const APPEND_ONLY: bool> ExecutorBuilder for GroupTopNExecutorBuilder<APPEN
state_table,
watermark_epoch: stream.get_watermark_epoch(),
group_key_types,
pk_indices: params.pk_indices,

with_ties: node.with_ties,
append_only: APPEND_ONLY,
Expand All @@ -99,6 +100,7 @@ struct GroupTopNExecutorDispatcherArgs<S: StateStore> {
state_table: StateTable<S>,
watermark_epoch: AtomicU64Ref,
group_key_types: Vec<DataType>,
pk_indices: PkIndices,

with_ties: bool,
append_only: bool,
Expand All @@ -120,6 +122,7 @@ impl<S: StateStore> HashKeyDispatcher for GroupTopNExecutorDispatcherArgs<S> {
self.group_by,
self.state_table,
self.watermark_epoch,
self.pk_indices,
)?
.boxed())
};
Expand Down

0 comments on commit a99e6f3

Please sign in to comment.