Skip to content

Commit

Permalink
fix: fix subscription clean during recovery (#18866) (#18869)
Browse files Browse the repository at this point in the history
Co-authored-by: August <[email protected]>
  • Loading branch information
github-actions[bot] and yezizp2012 authored Oct 11, 2024
1 parent 69ed9ba commit 34a345a
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use risingwave_pb::meta::{
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::FragmentTypeFlag;
use risingwave_pb::user::PbUserInfo;
use sea_orm::sea_query::{Expr, SimpleExpr};
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::ActiveValue::Set;
use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
Expand Down Expand Up @@ -516,10 +516,21 @@ impl CatalogController {
pub async fn clean_dirty_subscription(&self) -> MetaResult<()> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let _res = Subscription::delete_many()

Object::delete_many()
.filter(
subscription::Column::SubscriptionState
.eq(Into::<i32>::into(SubscriptionState::Init)),
object::Column::ObjType.eq(ObjectType::Subscription).and(
object::Column::Oid.not_in_subquery(
Query::select()
.column(subscription::Column::SubscriptionId)
.from(Subscription)
.and_where(
subscription::Column::SubscriptionState
.eq(SubscriptionState::Created as i32),
)
.take(),
),
),
)
.exec(&txn)
.await?;
Expand Down

0 comments on commit 34a345a

Please sign in to comment.