Skip to content

Commit

Permalink
fix(subscription): fix drop subscription not clear cursor (#17232)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Jul 5, 2024
1 parent 3c18089 commit 096889f
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 40 deletions.
19 changes: 0 additions & 19 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,6 @@ pub trait CatalogWriter: Send + Sync {
object: alter_set_schema_request::Object,
new_schema_id: u32,
) -> Result<()>;

async fn list_change_log_epochs(
&self,
table_id: u32,
min_epoch: u64,
max_count: u32,
) -> Result<Vec<u64>>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -596,18 +589,6 @@ impl CatalogWriter for CatalogWriterImpl {

Ok(())
}

async fn list_change_log_epochs(
&self,
table_id: u32,
min_epoch: u64,
max_count: u32,
) -> Result<Vec<u64>> {
Ok(self
.meta_client
.list_change_log_epochs(table_id, min_epoch, max_count)
.await?)
}
}

impl CatalogWriterImpl {
Expand Down
18 changes: 18 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ pub trait FrontendMetaClient: Send + Sync {
id: u32,
rate_limit: Option<u32>,
) -> Result<()>;

async fn list_change_log_epochs(
&self,
table_id: u32,
min_epoch: u64,
max_count: u32,
) -> Result<Vec<u64>>;
}

pub struct FrontendMetaClientImpl(pub MetaClient);
Expand Down Expand Up @@ -318,4 +325,15 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
.await
.map(|_| ())
}

async fn list_change_log_epochs(
&self,
table_id: u32,
min_epoch: u64,
max_count: u32,
) -> Result<Vec<u64>> {
self.0
.list_change_log_epochs(table_id, min_epoch, max_count)
.await
}
}
28 changes: 25 additions & 3 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,27 @@ impl SessionImpl {
Ok(connection.clone())
}

pub fn get_subscription_by_schema_id_name(
&self,
schema_id: SchemaId,
subscription_name: &str,
) -> Result<Arc<SubscriptionCatalog>> {
let db_name = self.database();

let catalog_reader = self.env().catalog_reader().read_guard();
let db_id = catalog_reader.get_database_by_name(db_name)?.id();
let schema = catalog_reader.get_schema_by_id(&db_id, &schema_id)?;
let subscription = schema
.get_subscription_by_name(subscription_name)
.ok_or_else(|| {
RwError::from(ErrorCode::ItemNotFound(format!(
"subscription {} not found",
subscription_name
)))
})?;
Ok(subscription.clone())
}

pub fn get_subscription_by_name(
&self,
schema_name: Option<String>,
Expand Down Expand Up @@ -978,10 +999,11 @@ impl SessionImpl {
min_epoch: u64,
max_count: u32,
) -> Result<Vec<u64>> {
self.env
.catalog_writer
Ok(self
.env
.meta_client()
.list_change_log_epochs(table_id, min_epoch, max_count)
.await
.await?)
}

pub fn clear_cancel_query_flag(&self) {
Expand Down
33 changes: 24 additions & 9 deletions src/frontend/src/session/cursor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,10 @@ impl SubscriptionCursor {
// Initiate a new batch query to continue fetching
match Self::get_next_rw_timestamp(
*seek_timestamp,
self.dependent_table_id.table_id,
&self.dependent_table_id,
*expected_timestamp,
handle_args.clone(),
&self.subscription,
)
.await
{
Expand Down Expand Up @@ -343,15 +344,21 @@ impl SubscriptionCursor {

async fn get_next_rw_timestamp(
seek_timestamp: u64,
table_id: u32,
table_id: &TableId,
expected_timestamp: Option<u64>,
handle_args: HandlerArgs,
dependent_subscription: &SubscriptionCatalog,
) -> Result<(Option<u64>, Option<u64>)> {
let session = handle_args.session;
// Test subscription existence
session.get_subscription_by_schema_id_name(
dependent_subscription.schema_id,
&dependent_subscription.name,
)?;

// The epoch here must be pulled every time, otherwise there will be cache consistency issues
let new_epochs = handle_args
.session
.catalog_writer()?
.list_change_log_epochs(table_id, seek_timestamp, 2)
let new_epochs = session
.list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)
.await?;
if let Some(expected_timestamp) = expected_timestamp
&& (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
Expand Down Expand Up @@ -525,9 +532,17 @@ impl CursorManager {
handle_args,
)
.await?;
self.cursor_map
.lock()
.await
let mut cursor_map = self.cursor_map.lock().await;

cursor_map.retain(|_, v| {
if let Cursor::Subscription(cursor) = v {
!matches!(cursor.state, State::Invalid)
} else {
true
}
});

cursor_map
.try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
.map_err(|_| {
ErrorCode::CatalogError(format!("cursor `{}` already exists", cursor_name).into())
Expand Down
18 changes: 9 additions & 9 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,6 @@ impl CatalogWriter for MockCatalogWriter {
Ok(())
}

async fn list_change_log_epochs(
&self,
_table_id: u32,
_min_epoch: u64,
_max_count: u32,
) -> Result<Vec<u64>> {
unreachable!()
}

async fn create_schema(
&self,
db_id: DatabaseId,
Expand Down Expand Up @@ -1091,6 +1082,15 @@ impl FrontendMetaClient for MockFrontendMetaClient {
) -> RpcResult<()> {
unimplemented!()
}

async fn list_change_log_epochs(
&self,
_table_id: u32,
_min_epoch: u64,
_max_count: u32,
) -> RpcResult<Vec<u64>> {
unimplemented!()
}
}

#[cfg(test)]
Expand Down

0 comments on commit 096889f

Please sign in to comment.