From a5e9e711de05c210ee106e127df68a5112033b90 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 14 Sep 2023 15:09:40 +0800 Subject: [PATCH 1/7] assert pk indices consistency Signed-off-by: Richard Chien --- src/stream/src/task/stream_manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 2abc8212e2984..2177f27b0ab6c 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -531,7 +531,7 @@ impl LocalStreamManagerCore { // Build the executor with params. let executor_params = ExecutorParams { env: env.clone(), - pk_indices, + pk_indices: pk_indices.clone(), executor_id, operator_id, op_info, @@ -544,6 +544,7 @@ impl LocalStreamManagerCore { }; let executor = create_executor(executor_params, self, node, store).await?; + assert_eq!(executor.pk_indices(), &pk_indices); // Wrap the executor for debug purpose. let executor = WrapperExecutor::new( From 370720b6c887ef51ca99a00085e0b6227ef4d89f Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 14 Sep 2023 15:09:47 +0800 Subject: [PATCH 2/7] fix group topn pk indices Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/group_top_n.rs | 14 +++++++++++--- .../src/executor/top_n/group_top_n_appendonly.rs | 11 ++++++++--- src/stream/src/from_proto/group_top_n.rs | 5 ++++- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 421b1141843a0..7e075002b99cb 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -51,6 +51,7 @@ impl GroupTopNExecutor, state_table: StateTable, watermark_epoch: AtomicU64Ref, + pk_indices: PkIndices, ) -> StreamResult { let info = input.info(); Ok(TopNExecutorWrapper { @@ -66,6 +67,7 @@ impl GroupTopNExecutor InnerGroupTopNExecutor, watermark_epoch: AtomicU64Ref, ctx: ActorContextRef, + pk_indices: PkIndices, ) -> StreamResult { let ExecutorInfo { - pk_indices, schema, .. + schema: input_schema, + .. } = input_info; let metrics_info = MetricsInfo::new( @@ -121,12 +125,13 @@ impl InnerGroupTopNExecutor::new(state_table, cache_key_serde.clone()); Ok(Self { info: ExecutorInfo { - schema, + schema: input_schema, pk_indices, identity: format!("GroupTopNExecutor {:X}", executor_id), }, @@ -408,6 +413,7 @@ mod tests { vec![1], state_table, Arc::new(AtomicU64::new(0)), + pk_indices(), ) .unwrap(); let top_n_executor = Box::new(a); @@ -505,6 +511,7 @@ mod tests { vec![1], state_table, Arc::new(AtomicU64::new(0)), + pk_indices(), ) .unwrap(), ); @@ -595,6 +602,7 @@ mod tests { vec![1, 2], state_table, Arc::new(AtomicU64::new(0)), + pk_indices(), ) .unwrap(), ); diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index ceb4a3bca4d40..8df8ee17768c5 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -69,6 +69,7 @@ impl group_by: Vec, state_table: StateTable, watermark_epoch: AtomicU64Ref, + pk_indices: PkIndices, ) -> StreamResult { let info = input.info(); Ok(TopNExecutorWrapper { @@ -84,6 +85,7 @@ impl state_table, watermark_epoch, ctx, + pk_indices, )?, }) } @@ -129,9 +131,11 @@ impl state_table: StateTable, watermark_epoch: AtomicU64Ref, ctx: ActorContextRef, + pk_indices: PkIndices, ) -> StreamResult { let ExecutorInfo { - pk_indices, schema, .. + schema: input_schema, + .. } = input_info; let metrics_info = MetricsInfo::new( @@ -141,12 +145,13 @@ impl "GroupTopN", ); - let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by); + let cache_key_serde = + create_cache_key_serde(&storage_key, &input_schema, &order_by, &group_by); let managed_state = ManagedTopNState::::new(state_table, cache_key_serde.clone()); Ok(Self { info: ExecutorInfo { - schema, + schema: input_schema, pk_indices, identity: format!("AppendOnlyGroupTopNExecutor {:X}", executor_id), }, diff --git a/src/stream/src/from_proto/group_top_n.rs b/src/stream/src/from_proto/group_top_n.rs index 9f74134308daa..a91d4a91a6ef0 100644 --- a/src/stream/src/from_proto/group_top_n.rs +++ b/src/stream/src/from_proto/group_top_n.rs @@ -21,7 +21,7 @@ use risingwave_pb::stream_plan::GroupTopNNode; use super::*; use crate::common::table::state_table::StateTable; -use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor}; +use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor, PkIndices}; use crate::task::AtomicU64Ref; pub struct GroupTopNExecutorBuilder; @@ -80,6 +80,7 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder { state_table: StateTable, watermark_epoch: AtomicU64Ref, group_key_types: Vec, + pk_indices: PkIndices, with_ties: bool, append_only: bool, @@ -120,6 +122,7 @@ impl HashKeyDispatcher for GroupTopNExecutorDispatcherArgs { self.group_by, self.state_table, self.watermark_epoch, + self.pk_indices, )? .boxed()) }; From da46f87eca3775753ddf7352da4707d12d36eb4a Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 14 Sep 2023 15:42:39 +0800 Subject: [PATCH 3/7] Revert "assert pk indices consistency" This reverts commit a5e9e711de05c210ee106e127df68a5112033b90. --- src/stream/src/task/stream_manager.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 2177f27b0ab6c..2abc8212e2984 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -531,7 +531,7 @@ impl LocalStreamManagerCore { // Build the executor with params. let executor_params = ExecutorParams { env: env.clone(), - pk_indices: pk_indices.clone(), + pk_indices, executor_id, operator_id, op_info, @@ -544,7 +544,6 @@ impl LocalStreamManagerCore { }; let executor = create_executor(executor_params, self, node, store).await?; - assert_eq!(executor.pk_indices(), &pk_indices); // Wrap the executor for debug purpose. let executor = WrapperExecutor::new( From 8589af11099e14d60bf155dd18d33ef48077212b Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 14 Sep 2023 16:31:38 +0800 Subject: [PATCH 4/7] update e2e Signed-off-by: Richard Chien --- .../streaming/group_top_n/group_top_1.slt | 39 +++++++++++++++++++ .../{group_top_n.slt => group_top_n/main.slt} | 0 2 files changed, 39 insertions(+) create mode 100644 e2e_test/streaming/group_top_n/group_top_1.slt rename e2e_test/streaming/{group_top_n.slt => group_top_n/main.slt} (100%) diff --git a/e2e_test/streaming/group_top_n/group_top_1.slt b/e2e_test/streaming/group_top_n/group_top_1.slt new file mode 100644 index 0000000000000..333efbff6add4 --- /dev/null +++ b/e2e_test/streaming/group_top_n/group_top_1.slt @@ -0,0 +1,39 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# https://github.com/risingwavelabs/risingwave/issues/12282 + +statement ok +create table t(a int, b int, c int); + +statement ok +create materialized view mv as SELECT * FROM ( + SELECT + *, + row_number() OVER (PARTITION BY a ORDER BY b) AS rank + FROM t +) WHERE rank <= 1; + +statement ok +insert into t values (1, 1, 1); + +query iiiI +select * from mv; +---- +1 1 1 1 + +statement ok +insert into t values (1, 0, 1); + +query iiiI +select * from mv; +---- +1 0 1 1 + +statement ok +insert into t values (1, 0, 1); + +query iiiI +select * from mv; +---- +1 0 1 1 diff --git a/e2e_test/streaming/group_top_n.slt b/e2e_test/streaming/group_top_n/main.slt similarity index 100% rename from e2e_test/streaming/group_top_n.slt rename to e2e_test/streaming/group_top_n/main.slt From b9f52522a54d3bf9274f95043cc44b57b5cf8fd9 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 14 Sep 2023 16:56:14 +0800 Subject: [PATCH 5/7] fix Signed-off-by: Richard Chien --- e2e_test/streaming/group_top_n/group_top_1.slt | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/e2e_test/streaming/group_top_n/group_top_1.slt b/e2e_test/streaming/group_top_n/group_top_1.slt index 333efbff6add4..2bed792a4ae82 100644 --- a/e2e_test/streaming/group_top_n/group_top_1.slt +++ b/e2e_test/streaming/group_top_n/group_top_1.slt @@ -37,3 +37,9 @@ query iiiI select * from mv; ---- 1 0 1 1 + +statement ok +drop materialized view mv; + +statement ok +drop table t; From 80a3685d3dd33c614a5092b4192eea1a9f34bb7a Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 15 Sep 2023 12:47:55 +0800 Subject: [PATCH 6/7] rename to workaround, waiting for https://github.com/risinglightdb/sqllogictest-rs/pull/194 Signed-off-by: Richard Chien --- e2e_test/streaming/group_top_n/{main.slt => main1.slt} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename e2e_test/streaming/group_top_n/{main.slt => main1.slt} (100%) diff --git a/e2e_test/streaming/group_top_n/main.slt b/e2e_test/streaming/group_top_n/main1.slt similarity index 100% rename from e2e_test/streaming/group_top_n/main.slt rename to e2e_test/streaming/group_top_n/main1.slt From 648b54e823f7b0f751b5fab8ebdc733c8502fe4d Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 15 Sep 2023 13:34:30 +0800 Subject: [PATCH 7/7] empty Signed-off-by: Richard Chien