From 34a345a850d30f160a85a7314b5464c6ff316305 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 11 Oct 2024 09:11:05 +0000 Subject: [PATCH] fix: fix subscription clean during recovery (#18866) (#18869) Co-authored-by: August --- src/meta/src/controller/catalog.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 06d9e262f17c8..4e42a16b654b1 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -51,7 +51,7 @@ use risingwave_pb::meta::{ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::FragmentTypeFlag; use risingwave_pb::user::PbUserInfo; -use sea_orm::sea_query::{Expr, SimpleExpr}; +use sea_orm::sea_query::{Expr, Query, SimpleExpr}; use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, @@ -516,10 +516,21 @@ impl CatalogController { pub async fn clean_dirty_subscription(&self) -> MetaResult<()> { let inner = self.inner.write().await; let txn = inner.db.begin().await?; - let _res = Subscription::delete_many() + + Object::delete_many() .filter( - subscription::Column::SubscriptionState - .eq(Into::::into(SubscriptionState::Init)), + object::Column::ObjType.eq(ObjectType::Subscription).and( + object::Column::Oid.not_in_subquery( + Query::select() + .column(subscription::Column::SubscriptionId) + .from(Subscription) + .and_where( + subscription::Column::SubscriptionState + .eq(SubscriptionState::Created as i32), + ) + .take(), + ), + ), ) .exec(&txn) .await?;