Skip to content

Commit

Permalink
fix: fix connection refer count in catalog manager
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed May 24, 2024
1 parent e7c1035 commit 215d1df
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 46 deletions.
32 changes: 26 additions & 6 deletions src/meta/src/manager/catalog/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ pub struct DatabaseManager {
pub(super) connections: BTreeMap<ConnectionId, Connection>,

/// Relation reference count mapping.
// TODO(zehua): avoid key conflicts after distinguishing table's and source's id generator.
pub(super) relation_ref_count: HashMap<RelationId, usize>,
/// Connection reference count mapping.
pub(super) connection_ref_count: HashMap<ConnectionId, usize>,
// In-progress creation tracker.
pub(super) in_progress_creation_tracker: HashSet<RelationKey>,
// In-progress creating streaming job tracker: this is a temporary workaround to avoid clean up
Expand All @@ -103,6 +104,7 @@ impl DatabaseManager {
let subscriptions = Subscription::list(env.meta_store().as_kv()).await?;

let mut relation_ref_count = HashMap::new();
let mut connection_ref_count = HashMap::new();

let databases = BTreeMap::from_iter(
databases
Expand All @@ -111,16 +113,18 @@ impl DatabaseManager {
);
let schemas = BTreeMap::from_iter(schemas.into_iter().map(|schema| (schema.id, schema)));
let sources = BTreeMap::from_iter(sources.into_iter().map(|source| {
// TODO(weili): wait for yezizp to refactor ref cnt
if let Some(connection_id) = source.connection_id {
*relation_ref_count.entry(connection_id).or_default() += 1;
*connection_ref_count.entry(connection_id).or_default() += 1;
}
(source.id, source)
}));
let sinks = BTreeMap::from_iter(sinks.into_iter().map(|sink| {
for depend_relation_id in &sink.dependent_relations {
*relation_ref_count.entry(*depend_relation_id).or_default() += 1;
}
if let Some(connection_id) = sink.connection_id {
*connection_ref_count.entry(connection_id).or_default() += 1;
}
(sink.id, sink)
}));
let subscriptions = BTreeMap::from_iter(subscriptions.into_iter().map(|subscription| {
Expand Down Expand Up @@ -157,6 +161,7 @@ impl DatabaseManager {
functions,
connections,
relation_ref_count,
connection_ref_count,
in_progress_creation_tracker: HashSet::default(),
in_progress_creation_streaming_job: HashMap::default(),
in_progress_creating_tables: HashMap::default(),
Expand Down Expand Up @@ -451,11 +456,11 @@ impl DatabaseManager {
&& self.views.values().all(|v| v.schema_id != schema_id)
}

pub fn increase_ref_count(&mut self, relation_id: RelationId) {
pub fn increase_relation_ref_count(&mut self, relation_id: RelationId) {
*self.relation_ref_count.entry(relation_id).or_insert(0) += 1;
}

pub fn decrease_ref_count(&mut self, relation_id: RelationId) {
pub fn decrease_relation_ref_count(&mut self, relation_id: RelationId) {
match self.relation_ref_count.entry(relation_id) {
Entry::Occupied(mut o) => {
*o.get_mut() -= 1;
Expand All @@ -467,6 +472,22 @@ impl DatabaseManager {
}
}

pub fn increase_connection_ref_count(&mut self, connection_id: ConnectionId) {
*self.connection_ref_count.entry(connection_id).or_insert(0) += 1;
}

pub fn decrease_connection_ref_count(&mut self, connection_id: ConnectionId) {
match self.connection_ref_count.entry(connection_id) {
Entry::Occupied(mut o) => {
*o.get_mut() -= 1;
if *o.get() == 0 {
o.remove_entry();
}
}
Entry::Vacant(_) => unreachable!(),
}
}

pub fn has_creation_in_database(&self, database_id: DatabaseId) -> bool {
self.in_progress_creation_tracker
.iter()
Expand Down Expand Up @@ -619,7 +640,6 @@ impl DatabaseManager {
}
}

// TODO(zehua): refactor when using SourceId.
pub fn ensure_table_view_or_source_id(&self, table_id: &TableId) -> MetaResult<()> {
if self.tables.contains_key(table_id)
|| self.sources.contains_key(table_id)
Expand Down
65 changes: 29 additions & 36 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,8 @@ impl CatalogManager {
for view in &views_to_drop {
database_core.relation_ref_count.remove(&view.id);
}
// TODO(weili): wait for yezizp to refactor ref cnt
for connection in &connections_to_drop {
database_core.relation_ref_count.remove(&connection.id);
database_core.connection_ref_count.remove(&connection.id);
}
for user in users_need_update {
self.notify_frontend(Operation::Update, Info::User(user))
Expand Down Expand Up @@ -521,12 +520,11 @@ impl CatalogManager {
let user_core = &mut core.user;
let mut connections = BTreeMapTransaction::new(&mut database_core.connections);

// TODO(weili): wait for yezizp to refactor ref cnt
match database_core.relation_ref_count.get(&conn_id) {
match database_core.connection_ref_count.get(&conn_id) {
Some(ref_count) => {
let connection_name = connections
.get(&conn_id)
.ok_or_else(|| anyhow!("connection not found"))?
.ok_or_else(|| MetaError::catalog_id_not_found("connection", conn_id))?
.name
.clone();
Err(MetaError::permission_denied(format!(
Expand All @@ -537,7 +535,7 @@ impl CatalogManager {
None => {
let connection = connections
.remove(conn_id)
.ok_or_else(|| anyhow!("connection not found"))?;
.ok_or_else(|| MetaError::catalog_id_not_found("connection", conn_id))?;

commit_meta!(self, connections)?;
user_core.decrease_ref(connection.owner);
Expand Down Expand Up @@ -633,7 +631,7 @@ impl CatalogManager {
user_core.increase_ref(view.owner);

for &dependent_relation_id in &view.dependent_relations {
database_core.increase_ref_count(dependent_relation_id);
database_core.increase_relation_ref_count(dependent_relation_id);
}

let version = self
Expand Down Expand Up @@ -682,7 +680,7 @@ impl CatalogManager {

let function = functions
.remove(function_id)
.ok_or_else(|| anyhow!("function not found"))?;
.ok_or_else(|| MetaError::catalog_id_not_found("function", function_id))?;

let objects = &[Object::FunctionId(function_id)];
let users_need_update = Self::update_user_privileges(&mut users, objects);
Expand Down Expand Up @@ -790,7 +788,6 @@ impl CatalogManager {
database_core.ensure_database_id(table.database_id)?;
database_core.ensure_schema_id(table.schema_id)?;
for dependent_id in &table.dependent_relations {
// TODO(zehua): refactor when using SourceId.
database_core.ensure_table_view_or_source_id(dependent_id)?;
}
#[cfg(not(test))]
Expand All @@ -811,7 +808,7 @@ impl CatalogManager {
commit_meta!(self, tables)?;

for &dependent_relation_id in &table.dependent_relations {
database_core.increase_ref_count(dependent_relation_id);
database_core.increase_relation_ref_count(dependent_relation_id);
}
user_core.increase_ref(table.owner);
Ok(())
Expand Down Expand Up @@ -1009,7 +1006,7 @@ impl CatalogManager {
if table.table_type != TableType::Internal as i32 {
// Recovered when init database manager.
for relation_id in &table.dependent_relations {
database_core.decrease_ref_count(*relation_id);
database_core.decrease_relation_ref_count(*relation_id);
}
// Recovered when init user manager.
tracing::debug!("decrease ref for {}", table.id);
Expand Down Expand Up @@ -1120,7 +1117,7 @@ impl CatalogManager {
{
let database_core = &mut core.database;
for &dependent_relation_id in &table.dependent_relations {
database_core.decrease_ref_count(dependent_relation_id);
database_core.decrease_relation_ref_count(dependent_relation_id);
}
}
}
Expand Down Expand Up @@ -1707,28 +1704,25 @@ impl CatalogManager {
// decrease dependent relations
for table in &tables_removed {
for dependent_relation_id in &table.dependent_relations {
database_core.decrease_ref_count(*dependent_relation_id);
database_core.decrease_relation_ref_count(*dependent_relation_id);
}
}

for view in &views_removed {
for dependent_relation_id in &view.dependent_relations {
database_core.decrease_ref_count(*dependent_relation_id);
database_core.decrease_relation_ref_count(*dependent_relation_id);
}
}

for sink in &sinks_removed {
if let Some(connection_id) = sink.connection_id {
// TODO(siyuan): wait for yezizp to refactor ref cnt
database_core.decrease_ref_count(connection_id);
}
refcnt_dec_connection(database_core, sink.connection_id);
for dependent_relation_id in &sink.dependent_relations {
database_core.decrease_ref_count(*dependent_relation_id);
database_core.decrease_relation_ref_count(*dependent_relation_id);
}
}

for subscription in &subscriptions_removed {
database_core.decrease_ref_count(subscription.dependent_table_id);
database_core.decrease_relation_ref_count(subscription.dependent_table_id);
}

let version = self
Expand Down Expand Up @@ -2743,7 +2737,7 @@ impl CatalogManager {
database_core
.get_connection(connection_id)
.cloned()
.ok_or_else(|| anyhow!(format!("could not find connection {}", connection_id)).into())
.ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))
}

pub async fn finish_create_source_procedure(
Expand Down Expand Up @@ -2954,7 +2948,7 @@ impl CatalogManager {
database_core.mark_creating(&key);
database_core.mark_creating_streaming_job(index_table.id, key);
for &dependent_relation_id in &index_table.dependent_relations {
database_core.increase_ref_count(dependent_relation_id);
database_core.increase_relation_ref_count(dependent_relation_id);
}
// index table and index.
user_core.increase_ref_count(index.owner, 2);
Expand All @@ -2975,7 +2969,7 @@ impl CatalogManager {
database_core.unmark_creating(&key);
database_core.unmark_creating_streaming_job(index_table.id);
for &dependent_relation_id in &index_table.dependent_relations {
database_core.decrease_ref_count(dependent_relation_id);
database_core.decrease_relation_ref_count(dependent_relation_id);
}
// index table and index.
user_core.decrease_ref_count(index.owner, 2);
Expand Down Expand Up @@ -3046,7 +3040,6 @@ impl CatalogManager {
database_core.ensure_database_id(sink.database_id)?;
database_core.ensure_schema_id(sink.schema_id)?;
for dependent_id in &sink.dependent_relations {
// TODO(zehua): refactor when using SourceId.
database_core.ensure_table_view_or_source_id(dependent_id)?;
}
let key = (sink.database_id, sink.schema_id, sink.name.clone());
Expand All @@ -3060,7 +3053,7 @@ impl CatalogManager {
database_core.mark_creating(&key);
database_core.mark_creating_streaming_job(sink.id, key);
for &dependent_relation_id in &sink.dependent_relations {
database_core.increase_ref_count(dependent_relation_id);
database_core.increase_relation_ref_count(dependent_relation_id);
}
user_core.increase_ref(sink.owner);
// We have validate the status of connection before starting the procedure.
Expand Down Expand Up @@ -3134,7 +3127,7 @@ impl CatalogManager {
database_core.unmark_creating(&key);
database_core.unmark_creating_streaming_job(sink.id);
for &dependent_relation_id in &sink.dependent_relations {
database_core.decrease_ref_count(dependent_relation_id);
database_core.decrease_relation_ref_count(dependent_relation_id);
}
user_core.decrease_ref(sink.owner);
refcnt_dec_connection(database_core, sink.connection_id);
Expand Down Expand Up @@ -3169,7 +3162,7 @@ impl CatalogManager {
} else {
database_core.mark_creating(&key);
database_core.mark_creating_streaming_job(subscription.id, key);
database_core.increase_ref_count(subscription.dependent_table_id);
database_core.increase_relation_ref_count(subscription.dependent_table_id);
user_core.increase_ref(subscription.owner);
let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions);
subscriptions.insert(subscription.id, subscription.clone());
Expand All @@ -3187,7 +3180,7 @@ impl CatalogManager {
let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions);
let mut subscription = subscriptions
.get(&subscription_id)
.ok_or_else(|| anyhow!("subscription not found"))?
.ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?
.clone();
subscription.created_at_cluster_version = Some(current_cluster_version());
subscription.created_at_epoch = Some(Epoch::now().0);
Expand Down Expand Up @@ -3223,7 +3216,7 @@ impl CatalogManager {
let subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions);
let subscription = subscriptions
.get(&subscription_id)
.ok_or_else(|| anyhow!("subscription not found"))?
.ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?
.clone();
assert_eq!(
subscription.subscription_state,
Expand Down Expand Up @@ -3272,7 +3265,7 @@ impl CatalogManager {

database_core.unmark_creating(&key);
database_core.unmark_creating_streaming_job(subscription.id);
database_core.decrease_ref_count(subscription.dependent_table_id);
database_core.decrease_relation_ref_count(subscription.dependent_table_id);
user_core.decrease_ref(subscription.owner);
}
Ok(())
Expand Down Expand Up @@ -3761,7 +3754,7 @@ impl CatalogManager {
.database
.subscriptions
.get(&subscription_id)
.ok_or_else(|| anyhow!("cant find subscription with id {}", subscription_id))?;
.ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
Ok(subscription.clone())
}

Expand Down Expand Up @@ -3950,7 +3943,7 @@ impl CatalogManager {
core.user_info
.get(&id)
.cloned()
.ok_or_else(|| anyhow!("User {} not found", id).into())
.ok_or_else(|| MetaError::catalog_id_not_found("user", id))
}

pub async fn drop_user(&self, id: UserId) -> MetaResult<NotificationVersion> {
Expand Down Expand Up @@ -4080,11 +4073,11 @@ impl CatalogManager {
let grantor_info = users
.get(&grantor)
.cloned()
.ok_or_else(|| anyhow!("User {} does not exist", &grantor))?;
.ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
for user_id in user_ids {
let mut user = users
.get_mut(*user_id)
.ok_or_else(|| anyhow!("User {} does not exist", user_id))?;
.ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;

if user.is_super {
return Err(MetaError::permission_denied(format!(
Expand Down Expand Up @@ -4207,7 +4200,7 @@ impl CatalogManager {
// check revoke permission
let revoke_by = users
.get(&revoke_by)
.ok_or_else(|| anyhow!("User {} does not exist", &revoke_by))?;
.ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
let same_user = granted_by == revoke_by.id;
if !revoke_by.is_super {
for privilege in revoke_grant_privileges {
Expand Down Expand Up @@ -4242,7 +4235,7 @@ impl CatalogManager {
let user = users
.get(user_id)
.cloned()
.ok_or_else(|| anyhow!("User {} does not exist", user_id))?;
.ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
if user.is_super {
return Err(MetaError::permission_denied(format!(
"Cannot revoke privilege from supper user {}",
Expand Down
6 changes: 2 additions & 4 deletions src/meta/src/manager/catalog/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ pub fn refcnt_inc_connection(
) -> anyhow::Result<()> {
if let Some(connection_id) = connection_id {
if let Some(_conn) = database_mgr.get_connection(connection_id) {
// TODO(weili): wait for yezizp to refactor ref cnt
database_mgr.increase_ref_count(connection_id);
database_mgr.increase_connection_ref_count(connection_id);
} else {
bail!("connection {} not found.", connection_id);
}
Expand All @@ -36,7 +35,6 @@ pub fn refcnt_dec_connection(
connection_id: Option<ConnectionId>,
) {
if let Some(connection_id) = connection_id {
// TODO: wait for yezizp to refactor ref cnt
database_mgr.decrease_ref_count(connection_id);
database_mgr.decrease_connection_ref_count(connection_id);
}
}

0 comments on commit 215d1df

Please sign in to comment.