fix(meta): unregister tables from hummock manager during recovery (#15355)

# Conflicts:
#	src/meta/src/barrier/recovery.rs
#	src/meta/src/manager/metadata.rs
#	src/meta/src/stream/stream_manager.rs
zwang28 committed Mar 7, 2024
1 parent 871dee8 commit b283302
Showing 7 changed files with 79 additions and 57 deletions.
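
Summary of the change: call sites that previously invoked `HummockManager::unregister_table_ids_fail_fast` (or `unregister_table_fragments_vec`) directly now publish a `LocalNotification::UnregisterTablesFromHummock(table_ids)` message through the meta node's local notification manager, and the Hummock manager's notification worker performs the actual unregistration. Below is a minimal standalone sketch of that publish/subscribe shape; the channel wiring and the `NotificationManager`/`HummockManager` stubs are illustrative stand-ins (assuming tokio with the `sync`, `rt` and `macros` features), not RisingWave's actual types.

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

// Illustrative stand-in for meta's LocalNotification; only the new variant is modeled.
#[derive(Debug)]
enum LocalNotification {
    UnregisterTablesFromHummock(Vec<u32>),
}

// Stand-in for the notification manager: callers publish, a worker task consumes.
struct NotificationManager {
    tx: UnboundedSender<LocalNotification>,
}

impl NotificationManager {
    fn notify_local_subscribers(&self, notification: LocalNotification) {
        let _ = self.tx.send(notification);
    }
}

// Stand-in for the Hummock manager: the subscriber owns the unregistration logic.
struct HummockManager;

impl HummockManager {
    async fn unregister_table_ids(&self, table_ids: &[u32]) {
        println!("unregistering tables from compaction groups: {table_ids:?}");
    }

    async fn run_notification_worker(&self, mut rx: UnboundedReceiver<LocalNotification>) {
        while let Some(notification) = rx.recv().await {
            match notification {
                LocalNotification::UnregisterTablesFromHummock(table_ids) => {
                    self.unregister_table_ids(&table_ids).await;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = unbounded_channel();
    let notification_manager = NotificationManager { tx };
    let hummock_manager = HummockManager;
    let worker = tokio::spawn(async move { hummock_manager.run_notification_worker(rx).await });

    // A caller (e.g. recovery or a drop/cancel command) no longer needs a Hummock
    // handle; it only publishes the table ids it wants unregistered.
    notification_manager
        .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(vec![1, 2, 3]));

    drop(notification_manager); // closing the sender lets the worker loop exit
    worker.await.unwrap();
}

Routing the cleanup through a notification removes the direct meta-to-Hummock call sites and keeps compaction-group unregistration in one place on the subscriber side.
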
9 changes: 6 additions & 3 deletions src/meta/src/barrier/command.rs
@@ -41,7 +41,7 @@ use thiserror_ext::AsReport;
 use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo};
 use super::trace::TracedEpoch;
 use crate::barrier::GlobalBarrierManagerContext;
-use crate::manager::{DdlType, MetadataManager, WorkerId};
+use crate::manager::{DdlType, LocalNotification, MetadataManager, WorkerId};
 use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
 use crate::MetaResult;
@@ -820,8 +820,11 @@ impl CommandContext {
                 let mut table_ids = table_fragments.internal_table_ids();
                 table_ids.push(table_id);
                 self.barrier_manager_context
-                    .hummock_manager
-                    .unregister_table_ids_fail_fast(&table_ids)
+                    .env
+                    .notification_manager()
+                    .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                        table_ids,
+                    ))
                     .await;

                 match &self.barrier_manager_context.metadata_manager {
30 changes: 21 additions & 9 deletions src/meta/src/barrier/recovery.rs
@@ -41,7 +41,7 @@ use crate::barrier::schedule::ScheduledBarriers;
 use crate::barrier::state::BarrierManagerState;
 use crate::barrier::{Command, GlobalBarrierManagerContext};
 use crate::controller::catalog::ReleaseContext;
-use crate::manager::{MetadataManager, WorkerId};
+use crate::manager::{LocalNotification, MetadataManager, WorkerId};
 use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
 use crate::MetaResult;
@@ -83,13 +83,17 @@ impl GlobalBarrierManagerContext {
             .collect();
         debug!("clean dirty table fragments: {:?}", to_drop_streaming_ids);

-        mgr.fragment_manager
+        let unregister_table_ids = mgr
+            .fragment_manager
             .drop_table_fragments_vec(&to_drop_streaming_ids)
             .await?;

         // unregister compaction group for dirty table fragments.
-        self.hummock_manager
-            .unregister_table_fragments_vec(&to_drop_table_fragments)
+        self.env
+            .notification_manager()
+            .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                unregister_table_ids,
+            ))
             .await;

         // clean up source connector dirty changes.
@@ -105,13 +109,14 @@
         } = mgr.catalog_controller.clean_dirty_creating_jobs().await?;

         // unregister compaction group for cleaned state tables.
-        self.hummock_manager
-            .unregister_table_ids_fail_fast(
-                &state_table_ids
+        self.env
+            .notification_manager()
+            .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                state_table_ids
                     .into_iter()
                     .map(|id| id as StateTableId)
                     .collect_vec(),
-            )
+            ))
             .await;

         // unregister cleaned sources.
@@ -307,9 +312,16 @@
         if !cancelled.is_empty() {
             match &self.metadata_manager {
                 MetadataManager::V1(mgr) => {
-                    mgr.fragment_manager
+                    let unregister_table_ids = mgr
+                        .fragment_manager
                         .drop_table_fragments_vec(&cancelled)
                         .await?;
+                    self.env
+                        .notification_manager()
+                        .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                            unregister_table_ids,
+                        ))
+                        .await;
                 }
                 MetadataManager::V2(mgr) => {
                     for job_id in cancelled {
12 changes: 6 additions & 6 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
@@ -53,7 +53,7 @@ use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}
 use crate::manager::MetaSrvEnv;
 use crate::model::{
     BTreeMapEntryTransaction, BTreeMapEntryTransactionWrapper, BTreeMapTransaction,
-    BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, TableFragments, ValTransaction,
+    BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, ValTransaction,
 };
 use crate::storage::MetaStore;

@@ -142,8 +142,12 @@ impl HummockManager {
         Ok(pairs.iter().map(|(table_id, ..)| *table_id).collect_vec())
     }

+    #[cfg(test)]
     /// Unregisters `table_fragments` from compaction groups
-    pub async fn unregister_table_fragments_vec(&self, table_fragments: &[TableFragments]) {
+    pub async fn unregister_table_fragments_vec(
+        &self,
+        table_fragments: &[crate::model::TableFragments],
+    ) {
         self.unregister_table_ids_fail_fast(
             &table_fragments
                 .iter()
@@ -169,8 +173,6 @@
         self.unregister_table_ids_fail_fast(&to_unregister).await;
     }

-    /// Prefer using `register_table_fragments`.
-    /// Use `register_table_ids` only when [`TableFragments`] is unavailable.
     /// The implementation acquires `versioning` lock.
     #[named]
     pub async fn register_table_ids(
@@ -392,8 +394,6 @@
         Ok(())
     }

-    /// Prefer using `unregister_table_fragments_vec`.
-    /// Only use `unregister_table_ids_fail_fast` when [`TableFragments`] is unavailable.
     /// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
     pub async fn unregister_table_ids_fail_fast(&self, table_ids: &[StateTableId]) {
         self.unregister_table_ids(table_ids)
49 changes: 20 additions & 29 deletions src/meta/src/hummock/manager/worker.rs
@@ -12,16 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::time::Duration;
-
 use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_pb::common::WorkerType;
 use sync_point::sync_point;
 use thiserror_ext::AsReport;
 use tokio::task::JoinHandle;
-use tokio_retry::strategy::{jitter, ExponentialBackoff};

-use crate::hummock::utils::RetryableError;
 use crate::hummock::{HummockManager, HummockManagerRef};
 use crate::manager::LocalNotification;

@@ -96,32 +92,27 @@ impl HummockManager {
     }

     async fn handle_local_notification(&self, notification: LocalNotification) {
-        let retry_strategy = ExponentialBackoff::from_millis(10)
-            .max_delay(Duration::from_secs(60))
-            .map(jitter);
-        if let LocalNotification::WorkerNodeDeleted(worker_node) = notification {
-            if worker_node.get_type().unwrap() == WorkerType::Compactor {
-                self.compactor_manager.remove_compactor(worker_node.id);
-            }
-            tokio_retry::RetryIf::spawn(
-                retry_strategy.clone(),
-                || async {
-                    if let Err(err) = self.release_contexts(vec![worker_node.id]).await {
-                        tracing::warn!(
-                            error = %err.as_report(),
-                            "Failed to release hummock context {}, will retry",
-                            worker_node.id,
-                        );
-                        return Err(err);
-                    }
-                    Ok(())
-                },
-                RetryableError::default(),
-            )
-            .await
-            .expect("retry until success");
-            tracing::info!("Released hummock context {}", worker_node.id);
-            sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
+        match notification {
+            LocalNotification::WorkerNodeDeleted(worker_node) => {
+                if worker_node.get_type().unwrap() == WorkerType::Compactor {
+                    self.compactor_manager.remove_compactor(worker_node.id);
+                }
+                self.release_contexts(vec![worker_node.id])
+                    .await
+                    .unwrap_or_else(|err| {
+                        panic!(
+                            "Failed to release hummock context {}, error={}",
+                            worker_node.id,
+                            err.as_report()
+                        )
+                    });
+                tracing::info!("Released hummock context {}", worker_node.id);
+                sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
+            }
+            LocalNotification::UnregisterTablesFromHummock(table_ids) => {
+                self.unregister_table_ids_fail_fast(&table_ids).await;
+            }
+            _ => {}
         }
     }
 }
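
Also visible in this hunk: the old handler retried `release_contexts` with exponential backoff via `tokio_retry` and finally `expect`ed success; the new handler treats a failure to release a Hummock context as unrecoverable and panics. A tiny sketch of that fail-fast shape, with a hypothetical `release_contexts` stub standing in for the real call:

// Hypothetical stub standing in for HummockManager::release_contexts.
async fn release_contexts(worker_id: u32) -> Result<(), String> {
    if worker_id == 0 {
        return Err("meta store unavailable".to_owned());
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let worker_id: u32 = 42;
    // Fail fast: an error is treated as unrecoverable, mirroring the new
    // unwrap_or_else(.. panic! ..) handler, instead of being retried with backoff.
    release_contexts(worker_id)
        .await
        .unwrap_or_else(|err| panic!("Failed to release hummock context {worker_id}, error={err}"));
    println!("Released hummock context {worker_id}");
}
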
9 changes: 7 additions & 2 deletions src/meta/src/manager/catalog/fragment.rs
@@ -547,7 +547,10 @@ impl FragmentManager {
     /// tables.
     /// If table fragments already deleted, this should just be noop,
     /// the delete function (`table_fragments.remove`) will not return an error.
-    pub async fn drop_table_fragments_vec(&self, table_ids: &HashSet<TableId>) -> MetaResult<()> {
+    pub async fn drop_table_fragments_vec(
+        &self,
+        table_ids: &HashSet<TableId>,
+    ) -> MetaResult<Vec<u32>> {
         let mut guard = self.core.write().await;
         let current_revision = guard.table_revision;

@@ -559,7 +562,9 @@

         let mut dirty_sink_into_table_upstream_fragment_id = HashSet::new();
         let mut table_fragments = BTreeMapTransaction::new(map);
+        let mut table_ids_to_unregister_from_hummock = vec![];
         for table_fragment in &to_delete_table_fragments {
+            table_ids_to_unregister_from_hummock.extend(table_fragment.all_table_ids());
             table_fragments.remove(table_fragment.table_id());
             let to_remove_actor_ids: HashSet<_> = table_fragment.actor_ids().into_iter().collect();
             let dependent_table_ids = table_fragment.dependent_table_ids();
@@ -634,7 +639,7 @@
             }
         }

-        Ok(())
+        Ok(table_ids_to_unregister_from_hummock)
     }

     // When dropping sink into a table, there could be an unexpected meta reboot. At this time, the sink’s catalog might have been deleted,
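
With this change `drop_table_fragments_vec` returns the state table ids (materialized-view table plus internal tables, via `all_table_ids()`) of every fragment it removed, so callers can forward exactly those ids in an `UnregisterTablesFromHummock` notification. A sketch of that collect-and-return contract, using a simplified `TableFragments` stand-in rather than the real type:

use std::collections::HashMap;

// Simplified stand-in: the real TableFragments exposes all_table_ids(),
// which yields the MV table id plus all internal state table ids.
struct TableFragments {
    table_id: u32,
    internal_table_ids: Vec<u32>,
}

impl TableFragments {
    fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
        std::iter::once(self.table_id).chain(self.internal_table_ids.iter().copied())
    }
}

// Sketch of the new return-value contract: remove the fragments and hand back
// every state table id that the caller should unregister from Hummock.
fn drop_table_fragments_vec(
    map: &mut HashMap<u32, TableFragments>,
    to_drop: &[u32],
) -> Vec<u32> {
    let mut table_ids_to_unregister_from_hummock = vec![];
    for table_id in to_drop {
        if let Some(fragments) = map.remove(table_id) {
            table_ids_to_unregister_from_hummock.extend(fragments.all_table_ids());
        }
    }
    table_ids_to_unregister_from_hummock
}

fn main() {
    let mut map = HashMap::from([(
        1,
        TableFragments { table_id: 1, internal_table_ids: vec![101, 102] },
    )]);
    let unregister = drop_table_fragments_vec(&mut map, &[1]);
    assert_eq!(unregister, vec![1, 101, 102]);
    println!("table ids to unregister from hummock: {unregister:?}");
}
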
1 change: 1 addition & 0 deletions src/meta/src/manager/notification.rs
@@ -48,6 +48,7 @@ pub enum LocalNotification {
     SystemParamsChange(SystemParamsReader),
     FragmentMappingsUpsert(Vec<FragmentId>),
     FragmentMappingsDelete(Vec<FragmentId>),
+    UnregisterTablesFromHummock(Vec<u32>),
 }

 #[derive(Debug)]
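
Because local subscribers match on `LocalNotification` with a wildcard arm (as the hummock worker above does with `_ => {}`), adding this variant does not force changes on subscribers that don't care about it. A small illustrative sketch (variant payload types simplified, not the real definitions):

// Illustrative stand-in for the extended enum; payload types are simplified here.
#[allow(dead_code)]
enum LocalNotification {
    SystemParamsChange(String),
    FragmentMappingsUpsert(Vec<u32>),
    FragmentMappingsDelete(Vec<u32>),
    UnregisterTablesFromHummock(Vec<u32>),
}

// A subscriber that only handles some variants keeps a wildcard arm, so the new
// variant is non-breaking for unrelated local subscribers.
fn handle(notification: LocalNotification) {
    match notification {
        LocalNotification::UnregisterTablesFromHummock(table_ids) => {
            println!("unregister from hummock: {table_ids:?}");
        }
        _ => {}
    }
}

fn main() {
    handle(LocalNotification::UnregisterTablesFromHummock(vec![1, 2]));
    handle(LocalNotification::SystemParamsChange("barrier_interval_ms=1000".into()));
}
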
26 changes: 18 additions & 8 deletions src/meta/src/stream/stream_manager.rs
@@ -30,7 +30,7 @@ use tracing::Instrument;
 use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy};
 use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager};
 use crate::hummock::HummockManagerRef;
-use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob};
+use crate::manager::{DdlType, LocalNotification, MetaSrvEnv, MetadataManager, StreamingJob};
 use crate::model::{ActorId, TableFragments, TableParallelism};
 use crate::stream::SourceManagerRef;
 use crate::{MetaError, MetaResult};
@@ -424,10 +424,13 @@ impl GlobalStreamManager {
             registered_table_ids.len(),
             table_fragments.internal_table_ids().len() + mv_table_id.map_or(0, |_| 1)
         );
+        let notification_manager_ref = self.env.notification_manager_ref();
         revert_funcs.push(Box::pin(async move {
             if create_type == CreateType::Foreground {
-                hummock_manager_ref
-                    .unregister_table_ids_fail_fast(&registered_table_ids)
+                notification_manager_ref
+                    .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                        registered_table_ids,
+                    ))
                     .await;
             }
         }));
@@ -568,8 +571,11 @@
                     tracing::error!(error = ?err.as_report(), "failed to run drop command");
                 });

-            self.hummock_manager
-                .unregister_table_ids_fail_fast(&state_table_ids)
+            self.env
+                .notification_manager()
+                .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                    state_table_ids,
+                ))
                 .await;
         }
     }
@@ -586,7 +592,8 @@
                 .await;

             // Drop table fragments directly.
-            mgr.fragment_manager
+            let unregister_table_ids = mgr
+                .fragment_manager
                 .drop_table_fragments_vec(&table_ids.into_iter().collect())
                 .await?;

@@ -604,8 +611,11 @@
             });

             // Unregister from compaction group afterwards.
-            self.hummock_manager
-                .unregister_table_fragments_vec(&table_fragments_vec)
+            self.env
+                .notification_manager()
+                .notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
+                    unregister_table_ids,
+                ))
                 .await;

             Ok(())
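
In `create_streaming_job`, the revert closure that cleans up after a failed foreground creation now captures a cloned notification-manager handle (`notification_manager_ref`) instead of the Hummock manager. A sketch of that deferred-revert pattern with boxed futures; the `NotificationManager` stub and `notify_unregister_tables` method are illustrative, not the real API:

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

// Illustrative stand-in for the notification-manager handle captured by the revert closure.
struct NotificationManager;

impl NotificationManager {
    async fn notify_unregister_tables(&self, table_ids: Vec<u32>) {
        println!("notify: unregister {table_ids:?} from hummock");
    }
}

type RevertFunc = Pin<Box<dyn Future<Output = ()> + Send>>;

#[tokio::main]
async fn main() {
    let notification_manager = Arc::new(NotificationManager);
    let mut revert_funcs: Vec<RevertFunc> = vec![];

    // Tables registered for the streaming job; if creation later fails,
    // the pushed revert future unregisters them via the notification path.
    let registered_table_ids = vec![7, 8, 9];
    let notification_manager_ref = notification_manager.clone();
    revert_funcs.push(Box::pin(async move {
        notification_manager_ref
            .notify_unregister_tables(registered_table_ids)
            .await;
    }));

    // Pretend the foreground creation failed: run the accumulated revert futures.
    for revert in revert_funcs {
        revert.await;
    }
}
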
