diff --git a/e2e_test/subscription/create_table_and_subscription.slt b/e2e_test/subscription/create_table_and_subscription.slt
index fd43567bc52de..8ba73fae7ec91 100644
--- a/e2e_test/subscription/create_table_and_subscription.slt
+++ b/e2e_test/subscription/create_table_and_subscription.slt
@@ -8,4 +8,16 @@ statement ok
 flush;
 
 statement ok
-create subscription sub from t1 with(retention = '1D');
\ No newline at end of file
+create subscription sub from t1 with(retention = '1D');
+
+statement ok
+create table t2 (v1 int, v2 int);
+
+statement ok
+create table t3 (v1 int primary key, v2 int);
+
+statement ok
+create subscription sub2 from t3 with(retention = '1D');
+
+statement ok
+create sink s1 into t3 from t2;
\ No newline at end of file
diff --git a/e2e_test/subscription/drop_table_and_subscription.slt b/e2e_test/subscription/drop_table_and_subscription.slt
index 0df183a5b7793..d89401c9cddc8 100644
--- a/e2e_test/subscription/drop_table_and_subscription.slt
+++ b/e2e_test/subscription/drop_table_and_subscription.slt
@@ -2,4 +2,16 @@ statement ok
 drop subscription sub;
 
 statement ok
-drop table t1;
\ No newline at end of file
+drop table t1;
+
+statement ok
+drop sink s1;
+
+statement ok
+drop subscription sub2;
+
+statement ok
+drop table t3;
+
+statement ok
+drop table t2;
\ No newline at end of file
diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py
index c7fcc56a35ac5..470a00ab85407 100644
--- a/e2e_test/subscription/main.py
+++ b/e2e_test/subscription/main.py
@@ -233,6 +233,28 @@ def test_cursor_op():
     execute_insert("close cur",conn)
     drop_table_subscription()
 
+def test_rebuild_table():
+    print(f"test_rebuild_table")
+    create_table_subscription()
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    execute_insert("declare cur subscription cursor for sub2",conn)
+    execute_insert("insert into t2 values(1,1)",conn)
+    execute_insert("flush",conn)
+    execute_insert("update t2 set v2 = 100 where v1 = 1",conn)
+    execute_insert("flush",conn)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([1,1],row,1)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([1,1],row,4)
+    row = execute_query("fetch next from cur",conn)
+    check_rows_data([1,100],row,3)
+
 if __name__ == "__main__":
     test_cursor_snapshot()
     test_cursor_op()
@@ -240,3 +262,4 @@ def test_cursor_op():
     test_cursor_since_rw_timestamp()
     test_cursor_since_now()
     test_cursor_since_begin()
+    test_rebuild_table()
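
The new `test_rebuild_table` case subscribes to `t3`, then routes writes through `t2` via `create sink s1 into t3 from t2`, which rebuilds `t3`'s table fragments; the test asserts that the cursor on `sub2` still sees every change, with op values 1, 4, 3 in that order. Those numbers come straight from the test; the sketch below is illustrative only (the `op_name` helper is hypothetical, and the 1/2/3/4 naming assumes the usual stream op encoding of insert / delete / update_insert / update_delete):

```rust
/// Hypothetical helper, not part of the diff: name the `op` column values the
/// cursor returns, assuming 1 = insert, 2 = delete, 3 = update_insert,
/// 4 = update_delete.
fn op_name(op: i32) -> &'static str {
    match op {
        1 => "insert",
        2 => "delete",
        3 => "update_insert",
        4 => "update_delete",
        _ => "unknown",
    }
}

fn main() {
    // Order asserted by test_rebuild_table: the initial insert, then the old
    // and new images of the update on t2 as they reach t3 through sink s1.
    for op in [1, 4, 3] {
        println!("fetch next from cur -> op {} ({})", op, op_name(op));
    }
}
```

The meta-side changes that follow are what keep the subscription attached while the table is rebuilt.
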
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index d0979d81455af..e194f50643346 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -2533,6 +2533,30 @@ impl CatalogController {
         Ok(subscription)
     }
 
+    pub async fn get_mv_depended_subscriptions(
+        &self,
+    ) -> MetaResult<HashMap<risingwave_common::catalog::TableId, HashMap<u32, u64>>> {
+        let inner = self.inner.read().await;
+        let subscription_objs = Subscription::find()
+            .find_also_related(Object)
+            .all(&inner.db)
+            .await?;
+        let mut map = HashMap::new();
+        // Write object at the same time we write subscription, so we must be able to get obj
+        for subscription in subscription_objs
+            .into_iter()
+            .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
+        {
+            let subscription: PbSubscription = subscription;
+            map.entry(risingwave_common::catalog::TableId::from(
+                subscription.dependent_table_id,
+            ))
+            .or_insert(HashMap::new())
+            .insert(subscription.id, subscription.retention_seconds);
+        }
+        Ok(map)
+    }
+
     pub async fn find_creating_streaming_job_ids(
         &self,
         infos: Vec<PbCreatingJobInfo>,
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index e1c2e1cb2d1d4..eb51b4862eee2 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -3754,6 +3754,22 @@ impl CatalogManager {
         Ok(subscription.clone())
     }
 
+    pub async fn get_mv_depended_subscriptions(
+        &self,
+    ) -> MetaResult<HashMap<risingwave_common::catalog::TableId, HashMap<u32, u64>>> {
+        let guard = self.core.lock().await;
+        let mut map = HashMap::new();
+        for subscription in guard.database.subscriptions.values() {
+            map.entry(risingwave_common::catalog::TableId::from(
+                subscription.dependent_table_id,
+            ))
+            .or_insert(HashMap::new())
+            .insert(subscription.id, subscription.retention_seconds);
+        }
+
+        Ok(map)
+    }
+
     pub async fn get_created_table_ids(&self) -> Vec<TableId> {
         let guard = self.core.lock().await;
         guard
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index ffd791fd23b9d..07da09a682b01 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -821,11 +821,14 @@ impl MetadataManager {
         }
     }
 
-    #[expect(clippy::unused_async)]
     pub async fn get_mv_depended_subscriptions(
         &self,
     ) -> MetaResult<HashMap<TableId, HashMap<u32, u64>>> {
-        // TODO(subscription): support the correct logic when supporting L0 log store subscriptions
-        Ok(HashMap::new())
+        match self {
+            MetadataManager::V1(mgr) => mgr.catalog_manager.get_mv_depended_subscriptions().await,
+            MetadataManager::V2(mgr) => {
+                mgr.catalog_controller.get_mv_depended_subscriptions().await
+            }
+        }
     }
 }
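
Both catalog backends now answer `MetadataManager::get_mv_depended_subscriptions` with the same nested map: upstream table id -> { subscription id -> retention seconds }. The V1 manager walks the in-memory catalog while the V2 controller joins `Subscription` with `Object`, but the shape of the result is identical, which is what lets `MetadataManager` simply dispatch to either backend. The sketch below is a self-contained illustration of that shape; `TableId`, `SubscriptionId`, and `SubscriptionInfo` are stand-ins for the real catalog types, not the actual structs:

```rust
use std::collections::HashMap;

// Stand-in types for illustration only.
type TableId = u32;
type SubscriptionId = u32;

struct SubscriptionInfo {
    id: SubscriptionId,
    dependent_table_id: TableId,
    retention_seconds: u64,
}

/// Build `table id -> { subscription id -> retention seconds }`, mirroring the
/// loop both `get_mv_depended_subscriptions` implementations perform.
fn build_depended_map(
    subscriptions: &[SubscriptionInfo],
) -> HashMap<TableId, HashMap<SubscriptionId, u64>> {
    let mut map: HashMap<TableId, HashMap<SubscriptionId, u64>> = HashMap::new();
    for sub in subscriptions {
        map.entry(sub.dependent_table_id)
            .or_insert_with(HashMap::new)
            .insert(sub.id, sub.retention_seconds);
    }
    map
}

fn main() {
    let subs = [SubscriptionInfo {
        id: 1,
        dependent_table_id: 42,
        retention_seconds: 86400,
    }];
    let map = build_depended_map(&subs);
    assert_eq!(map[&42][&1], 86400);
}
```
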
diff --git a/src/meta/src/stream/mod.rs b/src/meta/src/stream/mod.rs
index 57ca9896257f3..ec542ed8c008b 100644
--- a/src/meta/src/stream/mod.rs
+++ b/src/meta/src/stream/mod.rs
@@ -36,16 +36,16 @@ pub use stream_manager::*;
 pub(crate) fn to_build_actor_info(
     actor: StreamActor,
     subscriptions: &HashMap<TableId, HashMap<u32, u64>>,
-    fragment_table_id: TableId,
+    subscription_depend_table_id: TableId,
 ) -> BuildActorInfo {
     BuildActorInfo {
         actor: Some(actor),
         related_subscriptions: subscriptions
-            .get(&fragment_table_id)
+            .get(&subscription_depend_table_id)
             .into_iter()
             .map(|subscriptions| {
                 (
-                    fragment_table_id.table_id,
+                    subscription_depend_table_id.table_id,
                     SubscriptionIds {
                         subscription_ids: subscriptions.keys().cloned().collect(),
                     },
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 58c3b9add64c5..2acbe3112180a 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -334,6 +334,7 @@ impl GlobalStreamManager {
         table_fragments: &TableFragments,
         building_locations: &Locations,
         existing_locations: &Locations,
+        subscription_depend_table_id: TableId,
     ) -> MetaResult<()> {
         let actor_map = table_fragments.actor_map();
 
@@ -367,7 +368,7 @@ impl GlobalStreamManager {
                     to_build_actor_info(
                         actor_map[actor_id].clone(),
                         &subscriptions,
-                        table_fragments.table_id(),
+                        subscription_depend_table_id,
                     )
                 })
                 .collect::<Vec<_>>();
@@ -406,8 +407,13 @@ impl GlobalStreamManager {
         let mut replace_table_command = None;
         let mut replace_table_id = None;
 
-        self.build_actors(&table_fragments, &building_locations, &existing_locations)
-            .await?;
+        self.build_actors(
+            &table_fragments,
+            &building_locations,
+            &existing_locations,
+            table_fragments.table_id(),
+        )
+        .await?;
         tracing::debug!(
             table_id = %table_fragments.table_id(),
             "built actors finished"
@@ -418,6 +424,7 @@ impl GlobalStreamManager {
                 &table_fragments,
                 &context.building_locations,
                 &context.existing_locations,
+                context.old_table_fragments.table_id(),
             )
             .await?;
 
@@ -504,8 +511,13 @@ impl GlobalStreamManager {
             existing_locations,
         }: ReplaceTableContext,
     ) -> MetaResult<()> {
-        self.build_actors(&table_fragments, &building_locations, &existing_locations)
-            .await?;
+        self.build_actors(
+            &table_fragments,
+            &building_locations,
+            &existing_locations,
+            old_table_fragments.table_id(),
+        )
+        .await?;
 
         let dummy_table_id = table_fragments.table_id();
         let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?;
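
The `build_actors` change is the core of the fix: the subscription map is keyed by the table the subscription depends on, so the caller now passes that id explicitly instead of letting `build_actors` reuse the fragments' own table id. The ordinary create path keeps passing `table_fragments.table_id()`, while the replace-table paths pass the old table's id (`context.old_table_fragments.table_id()` / `old_table_fragments.table_id()`), since the freshly built fragments carry a temporary (dummy) id that has no entry in the subscription map. A minimal sketch of that lookup, with simplified stand-in types and made-up ids for illustration:

```rust
use std::collections::HashMap;

type TableId = u32;
type SubscriptionId = u32;

/// Look up the subscriptions that depend on a given upstream table, mirroring
/// how `to_build_actor_info` now keys `related_subscriptions`.
fn related_subscription_ids(
    subscriptions: &HashMap<TableId, HashMap<SubscriptionId, u64>>,
    subscription_depend_table_id: TableId,
) -> Vec<SubscriptionId> {
    subscriptions
        .get(&subscription_depend_table_id)
        .map(|subs| subs.keys().copied().collect())
        .unwrap_or_default()
}

fn main() {
    // Hypothetical ids: a subscription (id 7) declared on the original table (id 3).
    let mut subscriptions = HashMap::new();
    subscriptions.insert(3, HashMap::from([(7, 86400u64)]));

    // The fragments built for the replacement carry a dummy table id.
    let dummy_table_id = 1003;
    let old_table_id = 3;

    // Keying by the dummy id silently loses the subscriptions; keying by the
    // old table id, as the patched callers do, preserves them.
    assert!(related_subscription_ids(&subscriptions, dummy_table_id).is_empty());
    assert_eq!(related_subscription_ids(&subscriptions, old_table_id), vec![7]);
}
```
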