From dea0db6686a9ef335cd3fcef82f3f82ad70c322c Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 6 May 2024 23:25:50 +0800 Subject: [PATCH] fix ci fix --- src/frontend/src/observer/observer_manager.rs | 25 +++++++++++-------- src/meta/src/controller/catalog.rs | 16 ++++++------ 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index ddf6ca489bf0c..b88ea55c669ac 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -294,16 +294,21 @@ impl FrontendObserverNode { Operation::Update => catalog_guard.update_sink(sink), _ => panic!("receive an unsupported notify {:?}", resp), }, - RelationInfo::Subscription(subscription) => match resp.operation() { - Operation::Add => catalog_guard.create_subscription(subscription), - Operation::Delete => catalog_guard.drop_subscription( - subscription.database_id, - subscription.schema_id, - subscription.id, - ), - Operation::Update => catalog_guard.update_subscription(subscription), - _ => panic!("receive an unsupported notify {:?}", resp), - }, + RelationInfo::Subscription(subscription) => { + println!("subscription: {:?}", resp); + match resp.operation() { + Operation::Add => catalog_guard.create_subscription(subscription), + Operation::Delete => catalog_guard.drop_subscription( + subscription.database_id, + subscription.schema_id, + subscription.id, + ), + Operation::Update => { + catalog_guard.update_subscription(subscription) + } + _ => panic!("receive an unsupported notify {:?}", resp), + } + } RelationInfo::Index(index) => match resp.operation() { Operation::Add => catalog_guard.create_index(index), Operation::Delete => catalog_guard.drop_index( diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 1983937808656..8711a89bd7972 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -483,15 +483,17 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?; - let relations = vec![PbRelation { - relation_info: Some(PbRelationInfo::Subscription( - ObjectModel(subscription, obj.unwrap()).into(), - )), - }]; let version = self .notify_frontend( - NotificationOperation::Add, - NotificationInfo::RelationGroup(PbRelationGroup { relations }), + Operation::Add, + Info::RelationGroup(PbRelationGroup { + relations: vec![PbRelation { + relation_info: PbRelationInfo::Subscription( + ObjectModel(subscription, obj.unwrap()).into(), + ) + .into(), + }], + }), ) .await; Ok(version)