diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 1983937808656..7607915bd13f7 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -482,16 +482,19 @@ impl CatalogController { .one(&txn) .await? .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?; + txn.commit().await?; - let relations = vec![PbRelation { - relation_info: Some(PbRelationInfo::Subscription( - ObjectModel(subscription, obj.unwrap()).into(), - )), - }]; let version = self .notify_frontend( - NotificationOperation::Add, - NotificationInfo::RelationGroup(PbRelationGroup { relations }), + Operation::Add, + Info::RelationGroup(PbRelationGroup { + relations: vec![PbRelation { + relation_info: PbRelationInfo::Subscription( + ObjectModel(subscription, obj.unwrap()).into(), + ) + .into(), + }], + }), ) .await; Ok(version) diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index 0ce47608cdfd2..dcf537ad6c7e7 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -101,7 +101,6 @@ impl NotificationManager { info: Some(task.info), version: task.version.unwrap_or_default(), }; - core.lock().await.notify(task.target, response); } }); diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 69d0d78fba1f9..af694c3373e10 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -39,6 +39,7 @@ impl TableChangeLog { .filter_epoch((min_epoch, u64::MAX)) .iter() .flat_map(|epoch_change_log| epoch_change_log.epochs.clone()) + .filter(|a| a >= &min_epoch) .clone() .collect(); let end = min(max_count, epochs.len());