Skip to content

Commit

Permalink
refactor(meta): make unregister compaction group member fail fast (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Dec 4, 2023
1 parent ba4b196 commit a73d940
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ pub async fn start_service_as_election_leader(
.map(|t| t.id)
.collect_vec(),
)
.await?;
.await;

// Initialize services.
let backup_manager = BackupManager::new(
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,9 +724,9 @@ impl CommandContext {
let table_id = table_fragments.table_id().table_id;
let mut table_ids = table_fragments.internal_table_ids();
table_ids.push(table_id);
if let Err(e) = self.hummock_manager.unregister_table_ids(&table_ids).await {
tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", &table_ids, e);
}
self.hummock_manager
.unregister_table_ids_fail_fast(&table_ids)
.await;

// NOTE(kwannoel): At this point, catalog manager has persisted the tables already.
// We need to cleanup the table state. So we can do it here.
Expand Down
13 changes: 3 additions & 10 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,9 @@ impl GlobalBarrierManager {
.await?;

// unregister compaction group for dirty table fragments.
let _ = self.hummock_manager
.unregister_table_fragments_vec(
&to_drop_table_fragments
)
.await.inspect_err(|e|
warn!(
"Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}",
to_drop_table_fragments,
e)
);
self.hummock_manager
.unregister_table_fragments_vec(&to_drop_table_fragments)
.await;

// clean up source connector dirty changes.
self.source_manager
Expand Down
43 changes: 20 additions & 23 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,21 @@ impl HummockManager {
}

/// Unregisters `table_fragments` from compaction groups
pub async fn unregister_table_fragments_vec(
&self,
table_fragments: &[TableFragments],
) -> Result<()> {
self.unregister_table_ids(
pub async fn unregister_table_fragments_vec(&self, table_fragments: &[TableFragments]) {
self.unregister_table_ids_fail_fast(
&table_fragments
.iter()
.flat_map(|t| t.all_table_ids())
.collect_vec(),
)
.await
.await;
}

/// Unregisters stale members and groups
/// The caller should ensure `table_fragments_list` remain unchanged during `purge`.
/// Currently `purge` is only called during meta service start ups.
#[named]
pub async fn purge(&self, valid_ids: &[u32]) -> Result<()> {
pub async fn purge(&self, valid_ids: &[u32]) {
let registered_members =
get_member_table_ids(&read_lock!(self, versioning).await.current_version);
let to_unregister = registered_members
Expand All @@ -161,8 +158,7 @@ impl HummockManager {
.collect_vec();
// As we have released versioning lock, the version that `to_unregister` is calculated from
// may not be the same as the one used in unregister_table_ids. It is OK.
self.unregister_table_ids(&to_unregister).await?;
Ok(())
self.unregister_table_ids_fail_fast(&to_unregister).await;
}

/// Prefer using `register_table_fragments`.
Expand Down Expand Up @@ -265,11 +261,8 @@ impl HummockManager {
Ok(())
}

/// Prefer using `unregister_table_fragments_vec`.
/// Only use `unregister_table_ids` when [`TableFragments`] is unavailable.
/// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
#[named]
pub async fn unregister_table_ids(&self, table_ids: &[StateTableId]) -> Result<()> {
async fn unregister_table_ids(&self, table_ids: &[StateTableId]) -> Result<()> {
if table_ids.is_empty() {
return Ok(());
}
Expand Down Expand Up @@ -376,12 +369,19 @@ impl HummockManager {
HashSet::from_iter(get_compaction_group_ids(&versioning.current_version)),
self.env.meta_store(),
)
.await
.inspect_err(|e| tracing::warn!("failed to purge stale compaction group config. {}", e))
.ok();
.await?;
Ok(())
}

/// Prefer using `unregister_table_fragments_vec`.
/// Only use `unregister_table_ids_fail_fast` when [`TableFragments`] is unavailable.
/// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
pub async fn unregister_table_ids_fail_fast(&self, table_ids: &[StateTableId]) {
self.unregister_table_ids(table_ids)
.await
.unwrap_or_else(|e| panic!("unregister table ids fail: {table_ids:?} {e}"));
}

pub async fn update_compaction_config(
&self,
compaction_group_ids: &[CompactionGroupId],
Expand Down Expand Up @@ -1082,17 +1082,15 @@ mod tests {
// Test unregister_table_fragments
compaction_group_manager
.unregister_table_fragments_vec(&[table_fragment_1.clone()])
.await
.unwrap();
.await;
assert_eq!(registered_number().await, 4);

// Test purge_stale_members: table fragments
compaction_group_manager
.purge(&table_fragment_2.all_table_ids().collect_vec())
.await
.unwrap();
.await;
assert_eq!(registered_number().await, 4);
compaction_group_manager.purge(&[]).await.unwrap();
compaction_group_manager.purge(&[]).await;
assert_eq!(registered_number().await, 0);

// Test `StaticCompactionGroupId::NewCompactionGroup` in `register_table_fragments`
Expand All @@ -1115,8 +1113,7 @@ mod tests {
// Test `StaticCompactionGroupId::NewCompactionGroup` in `unregister_table_fragments`
compaction_group_manager
.unregister_table_fragments_vec(&[table_fragment_1])
.await
.unwrap();
.await;
assert_eq!(registered_number().await, 0);
assert_eq!(group_number().await, 2);
}
Expand Down
5 changes: 2 additions & 3 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,8 @@ pub async fn unregister_table_ids_from_compaction_group(
table_ids: &[u32],
) {
hummock_manager_ref
.unregister_table_ids(table_ids)
.await
.unwrap();
.unregister_table_ids_fail_fast(table_ids)
.await;
}

/// Generate keys like `001_key_test_00002` with timestamp `epoch`.
Expand Down
18 changes: 5 additions & 13 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,9 +460,9 @@ impl GlobalStreamManager {
);
revert_funcs.push(Box::pin(async move {
if create_type == CreateType::Foreground {
if let Err(e) = hummock_manager_ref.unregister_table_ids(&registered_table_ids).await {
tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", registered_table_ids, e);
}
hummock_manager_ref
.unregister_table_ids_fail_fast(&registered_table_ids)
.await;
}
}));

Expand Down Expand Up @@ -577,17 +577,9 @@ impl GlobalStreamManager {
.await?;

// Unregister from compaction group afterwards.
if let Err(e) = self
.hummock_manager
self.hummock_manager
.unregister_table_fragments_vec(&table_fragments_vec)
.await
{
tracing::warn!(
"Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}",
table_fragments_vec,
e
);
}
.await;

Ok(())
}
Expand Down

0 comments on commit a73d940

Please sign in to comment.