feat(storage): register and unregister member table id at commit epoch (
wenym1 authored Mar 26, 2024
1 parent d5ac14f commit 93adb82
Showing 16 changed files with 319 additions and 239 deletions.
20 changes: 0 additions & 20 deletions src/meta/node/src/server.rs
@@ -19,7 +19,6 @@ use anyhow::Context;
use either::Either;
use etcd_client::ConnectOptions;
use futures::future::join_all;
use itertools::Itertools;
use otlp_embedded::TraceServiceServer;
use regex::Regex;
use risingwave_common::config::MetaBackend;
@@ -562,31 +561,12 @@ pub async fn start_service_as_election_leader(
metadata_manager.clone(),
barrier_scheduler.clone(),
source_manager.clone(),
hummock_manager.clone(),
stream_rpc_manager,
scale_controller.clone(),
)
.unwrap(),
);

let all_state_table_ids = match &metadata_manager {
MetadataManager::V1(mgr) => mgr
.catalog_manager
.list_tables()
.await
.into_iter()
.map(|t| t.id)
.collect_vec(),
MetadataManager::V2(mgr) => mgr
.catalog_controller
.list_all_state_table_ids()
.await?
.into_iter()
.map(|id| id as u32)
.collect_vec(),
};
hummock_manager.purge(&all_state_table_ids).await;

// Initialize services.
let backup_manager = BackupManager::new(
env.clone(),
36 changes: 25 additions & 11 deletions src/meta/src/barrier/command.rs
@@ -41,7 +41,7 @@ use thiserror_ext::AsReport;
use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo};
use super::trace::TracedEpoch;
use crate::barrier::GlobalBarrierManagerContext;
use crate::manager::{DdlType, LocalNotification, MetadataManager, WorkerId};
use crate::manager::{DdlType, MetadataManager, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
use crate::MetaResult;
@@ -141,7 +141,11 @@ pub enum Command {
/// Barriers from the actors to be dropped will STILL be collected.
/// After the barrier is collected, it notifies the local stream manager of compute nodes to
/// drop actors, and then delete the table fragments info from meta store.
DropStreamingJobs(Vec<ActorId>),
DropStreamingJobs {
actors: Vec<ActorId>,
unregistered_table_fragment_ids: HashSet<TableId>,
unregistered_state_table_ids: HashSet<TableId>,
},

/// `CreateStreamingJob` command generates a `Add` barrier by given info.
///
@@ -214,7 +218,7 @@ impl Command {
Command::Plain(_) => None,
Command::Pause(_) => None,
Command::Resume(_) => None,
Command::DropStreamingJobs(actors) => Some(CommandActorChanges {
Command::DropStreamingJobs { actors, .. } => Some(CommandActorChanges {
to_add: Default::default(),
to_remove: actors.iter().cloned().collect(),
}),
@@ -401,7 +405,7 @@ impl CommandContext {
}))
}

Command::DropStreamingJobs(actors) => Some(Mutation::Stop(StopMutation {
Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
actors: actors.clone(),
})),

@@ -793,9 +797,22 @@ impl CommandContext {
.await;
}

Command::DropStreamingJobs(actors) => {
Command::DropStreamingJobs {
actors,
unregistered_state_table_ids,
..
} => {
// Tell compute nodes to drop actors.
self.clean_up(actors.clone()).await?;

let unregistered_state_table_ids = unregistered_state_table_ids
.iter()
.map(|table_id| table_id.table_id)
.collect_vec();
self.barrier_manager_context
.hummock_manager
.unregister_table_ids(&unregistered_state_table_ids)
.await?;
}

Command::CancelStreamingJob(table_fragments) => {
@@ -815,12 +832,9 @@
let mut table_ids = table_fragments.internal_table_ids();
table_ids.push(table_id);
self.barrier_manager_context
.env
.notification_manager()
.notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
table_ids,
))
.await;
.hummock_manager
.unregister_table_ids(&table_ids)
.await?;

match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
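
Note: the call sites that build the extended `DropStreamingJobs` variant live in the stream-manager files, which are among the changed files not shown in this excerpt. As a rough, hedged sketch of what such a call site might look like (the variable names and the `barrier_scheduler.run_command` plumbing are assumptions, not code from this commit):

// Hedged sketch of a call site for the extended variant; names are hypothetical.
let command = Command::DropStreamingJobs {
    // Actors to stop; their barriers are still collected before they are dropped.
    actors: dropped_actor_ids,
    // Table fragments removed by this drop.
    unregistered_table_fragment_ids: dropped_table_fragment_ids.into_iter().collect(),
    // State tables to unregister from hummock once the barrier is collected.
    unregistered_state_table_ids: dropped_state_table_ids.into_iter().collect(),
};
barrier_scheduler.run_command(command).await?;
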
23 changes: 21 additions & 2 deletions src/meta/src/barrier/mod.rs
@@ -58,7 +58,7 @@ use crate::barrier::notifier::BarrierInfo;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::state::BarrierManagerState;
use crate::hummock::{CommitEpochInfo, HummockManagerRef};
use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
@@ -819,7 +819,7 @@ impl GlobalBarrierManagerContext {
assert!(state.node_to_collect.is_empty());
let resps = state.resps;
let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer();
let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps);
let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps, &command_ctx);
if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await {
for notifier in notifiers {
notifier.notify_collection_failed(e.clone());
@@ -1106,6 +1106,7 @@ pub type BarrierManagerRef = GlobalBarrierManagerContext;

fn collect_commit_epoch_info(
resps: Vec<BarrierCompleteResponse>,
command_ctx: &CommandContext,
) -> (CommitEpochInfo, Vec<CreateMviewProgress>) {
let mut sst_to_worker: HashMap<HummockSstableObjectId, WorkerId> = HashMap::new();
let mut synced_ssts: Vec<ExtendedSstableInfo> = vec![];
@@ -1125,6 +1126,23 @@
table_watermarks.push(resp.table_watermarks);
progresses.extend(resp.create_mview_progress);
}
let new_table_fragment_info = if let Command::CreateStreamingJob {
table_fragments, ..
} = &command_ctx.command
{
Some(NewTableFragmentInfo {
table_id: table_fragments.table_id(),
mv_table_id: table_fragments.mv_table_id().map(TableId::new),
internal_table_ids: table_fragments
.internal_table_ids()
.into_iter()
.map(TableId::new)
.collect(),
})
} else {
None
};

let info = CommitEpochInfo::new(
synced_ssts,
merge_multiple_new_table_watermarks(
Expand All @@ -1144,6 +1162,7 @@ fn collect_commit_epoch_info(
.collect_vec(),
),
sst_to_worker,
new_table_fragment_info,
);
(info, progresses)
}
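
Note: `NewTableFragmentInfo` and the extra argument to `CommitEpochInfo::new` are defined in the hummock module, which is not among the files shown in this excerpt. A hedged sketch of the shape the barrier code above appears to rely on, with field types inferred from the call sites rather than copied from the commit:

// Hedged sketch; field types are inferred from the call sites in collect_commit_epoch_info.
use risingwave_common::catalog::TableId;

pub struct NewTableFragmentInfo {
    pub table_id: TableId,
    pub mv_table_id: Option<TableId>,
    pub internal_table_ids: Vec<TableId>,
}

If that is the actual shape, passing it through `CommitEpochInfo` lets the hummock manager register the new job's state tables to a compaction group at the same epoch it commits, which is what the commit title describes.
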
113 changes: 60 additions & 53 deletions src/meta/src/barrier/recovery.rs
@@ -21,7 +21,6 @@ use anyhow::{anyhow, Context};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::DefaultParallelism;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::meta::table_fragments::State;
@@ -47,7 +46,7 @@ use crate::barrier::schedule::ScheduledBarriers;
use crate::barrier::state::BarrierManagerState;
use crate::barrier::{Command, GlobalBarrierManager, GlobalBarrierManagerContext};
use crate::controller::catalog::ReleaseContext;
use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, MetadataManager, WorkerId};
use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId};
use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
use crate::MetaResult;
@@ -91,41 +90,19 @@ impl GlobalBarrierManagerContext {
.collect();
debug!("clean dirty table fragments: {:?}", to_drop_streaming_ids);

let unregister_table_ids = mgr
let _unregister_table_ids = mgr
.fragment_manager
.drop_table_fragments_vec(&to_drop_streaming_ids)
.await?;

// unregister compaction group for dirty table fragments.
self.env
.notification_manager()
.notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
unregister_table_ids,
))
.await;

// clean up source connector dirty changes.
self.source_manager
.drop_source_fragments(&to_drop_table_fragments)
.await;
}
MetadataManager::V2(mgr) => {
let ReleaseContext {
state_table_ids,
source_ids,
..
} = mgr.catalog_controller.clean_dirty_creating_jobs().await?;

// unregister compaction group for cleaned state tables.
self.env
.notification_manager()
.notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
state_table_ids
.into_iter()
.map(|id| id as StateTableId)
.collect_vec(),
))
.await;
let ReleaseContext { source_ids, .. } =
mgr.catalog_controller.clean_dirty_creating_jobs().await?;

// unregister cleaned sources.
self.source_manager
@@ -137,6 +114,27 @@
Ok(())
}

async fn purge_state_table_from_hummock(&self) -> MetaResult<()> {
let all_state_table_ids = match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr
.catalog_manager
.list_tables()
.await
.into_iter()
.map(|t| t.id)
.collect_vec(),
MetadataManager::V2(mgr) => mgr
.catalog_controller
.list_all_state_table_ids()
.await?
.into_iter()
.map(|id| id as u32)
.collect_vec(),
};
self.hummock_manager.purge(&all_state_table_ids).await?;
Ok(())
}

async fn recover_background_mv_progress(&self) -> MetaResult<()> {
match &self.metadata_manager {
MetadataManager::V1(_) => self.recover_background_mv_progress_v1().await,
@@ -317,27 +315,30 @@ impl GlobalBarrierManagerContext {
let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled();
let applied = !dropped_actors.is_empty() || !cancelled.is_empty();
if !cancelled.is_empty() {
match &self.metadata_manager {
let unregister_table_ids = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let unregister_table_ids = mgr
.fragment_manager
mgr.fragment_manager
.drop_table_fragments_vec(&cancelled)
.await?;
self.env
.notification_manager()
.notify_local_subscribers(LocalNotification::UnregisterTablesFromHummock(
unregister_table_ids,
))
.await;
.await?
}
MetadataManager::V2(mgr) => {
let mut unregister_table_ids = Vec::new();
for job_id in cancelled {
mgr.catalog_controller
let (_, table_ids_to_unregister) = mgr
.catalog_controller
.try_abort_creating_streaming_job(job_id.table_id as _, true)
.await?;
unregister_table_ids.extend(table_ids_to_unregister);
}
unregister_table_ids
.into_iter()
.map(|table_id| table_id as u32)
.collect()
}
}
};
self.hummock_manager
.unregister_table_ids(&unregister_table_ids)
.await?;
}
Ok(applied)
}
@@ -364,29 +365,33 @@ impl GlobalBarrierManager {
.abort_and_mark_blocked("cluster is under recovering");

tracing::info!("recovery start!");
self.context
.clean_dirty_streaming_jobs()
.await
.expect("clean dirty streaming jobs");

self.context.sink_manager.reset().await;
let retry_strategy = Self::get_retry_strategy();

// Mview progress needs to be recovered.
tracing::info!("recovering mview progress");
self.context
.recover_background_mv_progress()
.await
.expect("recover mview progress should not fail");
tracing::info!("recovered mview progress");

// We take retry into consideration because this is the latency user sees for a cluster to
// get recovered.
let recovery_timer = self.context.metrics.recovery_latency.start_timer();

let new_state = tokio_retry::Retry::spawn(retry_strategy, || {
async {
let recovery_result: MetaResult<_> = try {
self.context
.clean_dirty_streaming_jobs()
.await
.context("clean dirty streaming jobs")?;

self.context
.purge_state_table_from_hummock()
.await
.context("purge state table from hummock")?;

// Mview progress needs to be recovered.
tracing::info!("recovering mview progress");
self.context
.recover_background_mv_progress()
.await
.context("recover mview progress should not fail")?;
tracing::info!("recovered mview progress");

// This is a quick path to accelerate the process of dropping and canceling streaming jobs.
let _ = self
.context
@@ -449,6 +454,8 @@ impl GlobalBarrierManager {
warn!(error = %err.as_report(), "reset compute nodes failed");
})?;

self.context.sink_manager.reset().await;

if self
.context
.pre_apply_drop_cancel(&self.scheduled_barriers)
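
Note: the recovery path now calls the hummock manager directly instead of sending `LocalNotification::UnregisterTablesFromHummock`. A hedged sketch of the `HummockManager` surface these call sites assume (signatures inferred from usage in this diff, not copied from the hummock module):

// Hedged sketch of the assumed HummockManager surface; signatures are inferred, not authoritative.
impl HummockManager {
    /// Unregister the given state tables from their compaction groups.
    pub async fn unregister_table_ids(&self, table_ids: &[u32]) -> MetaResult<()> {
        todo!()
    }

    /// Drop any registered state table that is not in `valid_table_ids`
    /// (used by `purge_state_table_from_hummock` above during recovery).
    pub async fn purge(&self, valid_table_ids: &[u32]) -> MetaResult<()> {
        todo!()
    }
}
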
(Diffs for the remaining 12 changed files are not shown in this excerpt.)
