diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index f2bcdd2b62e1..f740e9567e4c 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -212,13 +212,6 @@ pub trait CatalogWriter: Send + Sync {
         object: alter_set_schema_request::Object,
         new_schema_id: u32,
     ) -> Result<()>;
-
-    async fn list_change_log_epochs(
-        &self,
-        table_id: u32,
-        min_epoch: u64,
-        max_count: u32,
-    ) -> Result<Vec<u64>>;
 }
 
 #[derive(Clone)]
@@ -596,18 +589,6 @@ impl CatalogWriter for CatalogWriterImpl {
 
         Ok(())
     }
-
-    async fn list_change_log_epochs(
-        &self,
-        table_id: u32,
-        min_epoch: u64,
-        max_count: u32,
-    ) -> Result<Vec<u64>> {
-        Ok(self
-            .meta_client
-            .list_change_log_epochs(table_id, min_epoch, max_count)
-            .await?)
-    }
 }
 
 impl CatalogWriterImpl {
diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs
index 56beb26207b3..c54ebcd0aecb 100644
--- a/src/frontend/src/meta_client.rs
+++ b/src/frontend/src/meta_client.rs
@@ -126,6 +126,13 @@ pub trait FrontendMetaClient: Send + Sync {
         id: u32,
         rate_limit: Option<u32>,
     ) -> Result<()>;
+
+    async fn list_change_log_epochs(
+        &self,
+        table_id: u32,
+        min_epoch: u64,
+        max_count: u32,
+    ) -> Result<Vec<u64>>;
 }
 
 pub struct FrontendMetaClientImpl(pub MetaClient);
@@ -318,4 +325,15 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
             .await
             .map(|_| ())
     }
+
+    async fn list_change_log_epochs(
+        &self,
+        table_id: u32,
+        min_epoch: u64,
+        max_count: u32,
+    ) -> Result<Vec<u64>> {
+        self.0
+            .list_change_log_epochs(table_id, min_epoch, max_count)
+            .await
+    }
 }
diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs
index 9ac63eeb391a..2148656bee00 100644
--- a/src/frontend/src/session.rs
+++ b/src/frontend/src/session.rs
@@ -922,6 +922,27 @@ impl SessionImpl {
         Ok(connection.clone())
     }
 
+    pub fn get_subscription_by_schema_id_name(
+        &self,
+        schema_id: SchemaId,
+        subscription_name: &str,
+    ) -> Result<Arc<SubscriptionCatalog>> {
+        let db_name = self.database();
+
+        let catalog_reader = self.env().catalog_reader().read_guard();
+        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
+        let schema = catalog_reader.get_schema_by_id(&db_id, &schema_id)?;
+        let subscription = schema
+            .get_subscription_by_name(subscription_name)
+            .ok_or_else(|| {
+                RwError::from(ErrorCode::ItemNotFound(format!(
+                    "subscription {} not found",
+                    subscription_name
+                )))
+            })?;
+        Ok(subscription.clone())
+    }
+
     pub fn get_subscription_by_name(
         &self,
         schema_name: Option<String>,
@@ -978,10 +999,11 @@ impl SessionImpl {
         min_epoch: u64,
         max_count: u32,
     ) -> Result<Vec<u64>> {
-        self.env
-            .catalog_writer
+        Ok(self
+            .env
+            .meta_client()
             .list_change_log_epochs(table_id, min_epoch, max_count)
-            .await
+            .await?)
     }
 
     pub fn clear_cancel_query_flag(&self) {
diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index 46eca3beb996..bcd1aa11ec74 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -214,9 +214,10 @@ impl SubscriptionCursor {
                 // Initiate a new batch query to continue fetching
                 match Self::get_next_rw_timestamp(
                     *seek_timestamp,
-                    self.dependent_table_id.table_id,
+                    &self.dependent_table_id,
                     *expected_timestamp,
                     handle_args.clone(),
+                    &self.subscription,
                 )
                 .await
                 {
@@ -343,15 +344,21 @@ impl SubscriptionCursor {
 
     async fn get_next_rw_timestamp(
         seek_timestamp: u64,
-        table_id: u32,
+        table_id: &TableId,
         expected_timestamp: Option<u64>,
         handle_args: HandlerArgs,
+        dependent_subscription: &SubscriptionCatalog,
     ) -> Result<(Option<u64>, Option<u64>)> {
+        let session = handle_args.session;
+        // Test subscription existence
+        session.get_subscription_by_schema_id_name(
+            dependent_subscription.schema_id,
+            &dependent_subscription.name,
+        )?;
+
         // The epoch here must be pulled every time, otherwise there will be cache consistency issues
-        let new_epochs = handle_args
-            .session
-            .catalog_writer()?
-            .list_change_log_epochs(table_id, seek_timestamp, 2)
+        let new_epochs = session
+            .list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)
             .await?;
         if let Some(expected_timestamp) = expected_timestamp
             && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
@@ -525,9 +532,17 @@ impl CursorManager {
             handle_args,
         )
         .await?;
-        self.cursor_map
-            .lock()
-            .await
+        let mut cursor_map = self.cursor_map.lock().await;
+
+        cursor_map.retain(|_, v| {
+            if let Cursor::Subscription(cursor) = v {
+                !matches!(cursor.state, State::Invalid)
+            } else {
+                true
+            }
+        });
+
+        cursor_map
             .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
             .map_err(|_| {
                 ErrorCode::CatalogError(format!("cursor `{}` already exists", cursor_name).into())
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index a9ff4084df44..3067e9a97b29 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -242,15 +242,6 @@ impl CatalogWriter for MockCatalogWriter {
         Ok(())
     }
 
-    async fn list_change_log_epochs(
-        &self,
-        _table_id: u32,
-        _min_epoch: u64,
-        _max_count: u32,
-    ) -> Result<Vec<u64>> {
-        unreachable!()
-    }
-
     async fn create_schema(
         &self,
         db_id: DatabaseId,
@@ -1091,6 +1082,15 @@ impl FrontendMetaClient for MockFrontendMetaClient {
     ) -> RpcResult<()> {
         unimplemented!()
     }
+
+    async fn list_change_log_epochs(
+        &self,
+        _table_id: u32,
+        _min_epoch: u64,
+        _max_count: u32,
+    ) -> RpcResult<Vec<u64>> {
+        unimplemented!()
+    }
 }
 
 #[cfg(test)]