
Commit 777760b

Add migration timeout; update logging terminology; scale worker slots dynamically based on need
shanicky committed Jul 19, 2024
1 parent 864f55a commit 777760b
Showing 2 changed files with 134 additions and 48 deletions.
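In short: during recovery, if the actors on expired worker slots cannot all be placed on currently free slots, the manager now gives up waiting after RECOVERY_FORCE_MIGRATION_TIMEOUT (100 seconds) and virtually enlarges each worker's slot range by a ceiling-division factor. A minimal sketch of that factor arithmetic, using made-up numbers rather than anything from this commit:

```rust
fn main() {
    // Hypothetical example values, not taken from the commit.
    let to_migration_size = 5usize; // expired slots still needing a new home
    let used_size = 3usize;         // active slots already occupied
    let all_size = 4usize;          // total slots across live workers

    // Same formula as the diff: scale so that (used + to-migrate) fits.
    let factor = (((to_migration_size + used_size) as f32) / (all_size as f32)).ceil() as u32;
    assert_eq!(factor, 2); // ceil(8 / 4) = 2, i.e. double each worker's slots
    println!("extend each worker's slot range by {factor}x");
}
```

With 4 slots in total, 3 occupied, and 5 still to migrate, the pool must roughly double, so each worker advertises twice its parallelism in slots.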
169 changes: 124 additions & 45 deletions src/meta/src/barrier/recovery.rs
@@ -48,6 +48,8 @@ use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResize
use crate::{model, MetaError, MetaResult};

impl GlobalBarrierManager {
+ // Migration timeout.
+ const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(100);
// Retry base interval in milliseconds.
const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
// Retry max interval.
@@ -470,16 +472,13 @@ impl GlobalBarrierManagerContext {
.collect();

if expired_worker_slots.is_empty() {
debug!("no expired parallel units, skipping.");
debug!("no expired worker slots, skipping.");
return self.resolve_actor_info(active_nodes).await;
}

debug!("start migrate actors.");
let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
- debug!(
- "got to migrate parallel units {:#?}",
- to_migrate_worker_slots
- );
+ debug!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
.intersection(&active_worker_slots)
@@ -489,15 +488,51 @@
let start = Instant::now();
let mut plan = HashMap::new();
'discovery: while !to_migrate_worker_slots.is_empty() {
- let new_worker_slots = active_nodes
+ let mut new_worker_slots = active_nodes
.current()
.values()
- .flat_map(|node| {
- (0..node.parallelism)
- .map(|idx| WorkerSlotId::new(node.id, idx as _))
- .filter(|worker_slot| !inuse_worker_slots.contains(worker_slot))
+ .flat_map(|worker| {
+ (0..worker.parallelism).map(move |i| WorkerSlotId::new(worker.id, i as _))
})
.collect_vec();

+ let all_size = new_worker_slots.len();
+
+ new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
+ let to_migration_size = to_migrate_worker_slots.len();
+ let available_size = new_worker_slots.len();
+ let used_size = all_size - available_size;
+
+ if available_size < to_migration_size
+ && start.elapsed() > GlobalBarrierManager::RECOVERY_FORCE_MIGRATION_TIMEOUT
+ {
+ let factor =
+ (((to_migration_size + used_size) as f32) / (all_size as f32)).ceil() as u32;
+ let mut extended_worker_slots = active_nodes
+ .current()
+ .values()
+ .flat_map(|node| {
+ (0..node.parallelism * factor)
+ .map(|idx| WorkerSlotId::new(node.id, idx as _))
+ .filter(|worker_slot| !inuse_worker_slots.contains(worker_slot))
+ })
+ .collect_vec();
+
+ extended_worker_slots.sort_by(|a, b| {
+ a.slot_idx()
+ .cmp(&b.slot_idx())
+ .then(a.worker_id().cmp(&b.worker_id()))
+ });
+
+ tracing::info!(
+ "migration timed out, extending worker slots to {:?} by factor {}",
+ extended_worker_slots,
+ factor,
+ );
+
+ new_worker_slots = extended_worker_slots;
+ }

if !new_worker_slots.is_empty() {
debug!("new worker slots found: {:#?}", new_worker_slots);
for target_worker_slot in new_worker_slots {
@@ -520,20 +555,24 @@

// wait to get newly joined CN
let changed = active_nodes
- .wait_changed(Duration::from_millis(5000), |active_nodes| {
- let current_nodes = active_nodes
- .current()
- .values()
- .map(|node| (node.id, &node.host, node.parallelism))
- .collect_vec();
- warn!(
- current_nodes = ?current_nodes,
- "waiting for new workers to join, elapsed: {}s",
- start.elapsed().as_secs()
- );
- })
+ .wait_changed(
+ Duration::from_millis(5000),
+ GlobalBarrierManager::RECOVERY_FORCE_MIGRATION_TIMEOUT,
+ |active_nodes| {
+ let current_nodes = active_nodes
+ .current()
+ .values()
+ .map(|node| (node.id, &node.host, node.parallelism))
+ .collect_vec();
+ warn!(
+ current_nodes = ?current_nodes,
+ "waiting for new workers to join, elapsed: {}s",
+ start.elapsed().as_secs()
+ );
+ },
+ )
.await;
- warn!(?changed, "get worker changed. Retry migrate");
+ warn!(?changed, "get worker changed or timed out. Retry migrate");
}

mgr.catalog_controller.migrate_actors(plan).await?;
@@ -768,7 +807,7 @@
.sum();

if available_parallelism == 0 {
- return Err(anyhow!("no available parallel units for auto scaling").into());
+ return Err(anyhow!("no available worker slots for auto scaling").into());
}

let all_table_parallelisms: HashMap<_, _> = {
@@ -896,7 +935,7 @@
cached_plan.worker_slot_plan.retain(|from, to| {
if to_migrate_worker_slots.contains(from) {
if !to_migrate_worker_slots.contains(to) {
- // clean up target parallel units in migration plan that are expired and not
+ // clean up target worker slots in migration plan that are expired and not
// used by any actors.
return !expired_workers.contains(&to.worker_id());
}
@@ -908,18 +947,15 @@
inuse_worker_slots.extend(cached_plan.worker_slot_plan.values());

if to_migrate_worker_slots.is_empty() {
- // all expired parallel units are already in migration plan.
- debug!("all expired parallel units are already in migration plan.");
+ // all expired worker slots are already in migration plan.
+ debug!("all expired worker slots are already in migration plan.");
return Ok(cached_plan);
}
let mut to_migrate_worker_slots = to_migrate_worker_slots.into_iter().rev().collect_vec();
- debug!(
- "got to migrate parallel units {:#?}",
- to_migrate_worker_slots
- );
+ debug!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

let start = Instant::now();
- // if in-used expire parallel units are not empty, should wait for newly joined worker.
+ // if in-used expire worker slots are not empty, should wait for newly joined worker.
'discovery: while !to_migrate_worker_slots.is_empty() {
let mut new_worker_slots = active_nodes
.current()
@@ -929,8 +965,47 @@
})
.collect_vec();

+ let all_size = new_worker_slots.len();

new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

+ let to_migration_size = to_migrate_worker_slots.len();
+ let available_size = new_worker_slots.len();
+ let used_size = all_size - available_size;
+
+ if available_size < to_migration_size
+ && start.elapsed() > GlobalBarrierManager::RECOVERY_FORCE_MIGRATION_TIMEOUT
+ {
+ let factor =
+ (((to_migration_size + used_size) as f32) / (all_size as f32)).ceil() as u32;
+
+ let mut extended_worker_slots = active_nodes
+ .current()
+ .values()
+ .flat_map(|worker| {
+ (0..worker.parallelism * factor)
+ .map(move |i| WorkerSlotId::new(worker.id, i as _))
+ })
+ .collect_vec();
+
+ extended_worker_slots
+ .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
+
+ extended_worker_slots.sort_by(|a, b| {
+ a.slot_idx()
+ .cmp(&b.slot_idx())
+ .then(a.worker_id().cmp(&b.worker_id()))
+ });
+
+ tracing::info!(
+ "migration timed out, extending worker slots to {:?} by factor {}",
+ extended_worker_slots,
+ factor,
+ );
+
+ new_worker_slots = extended_worker_slots;
+ }

if !new_worker_slots.is_empty() {
debug!("new worker slots found: {:#?}", new_worker_slots);
for target_worker_slot in new_worker_slots {
@@ -955,20 +1030,24 @@

// wait to get newly joined CN
let changed = active_nodes
- .wait_changed(Duration::from_millis(5000), |active_nodes| {
- let current_nodes = active_nodes
- .current()
- .values()
- .map(|node| (node.id, &node.host, &node.parallelism))
- .collect_vec();
- warn!(
- current_nodes = ?current_nodes,
- "waiting for new workers to join, elapsed: {}s",
- start.elapsed().as_secs()
- );
- })
+ .wait_changed(
+ Duration::from_millis(5000),
+ GlobalBarrierManager::RECOVERY_FORCE_MIGRATION_TIMEOUT,
+ |active_nodes| {
+ let current_nodes = active_nodes
+ .current()
+ .values()
+ .map(|node| (node.id, &node.host, &node.parallelism))
+ .collect_vec();
+ warn!(
+ current_nodes = ?current_nodes,
+ "waiting for new workers to join, elapsed: {}s",
+ start.elapsed().as_secs()
+ );
+ },
+ )
.await;
- warn!(?changed, "get worker changed. Retry migrate");
+ warn!(?changed, "get worker changed or timed out. Retry migrate");
}

// update migration plan, if there is a chain in the plan, update it.
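Putting the recovery.rs changes together: after the timeout fires, candidate slots are regenerated with each worker's range multiplied by `factor`, filtered against in-use slots, then sorted by slot index before worker id. The sketch below imitates that ordering with a simplified stand-in for `WorkerSlotId` (the real type and its constructor live in RisingWave and are not reproduced here):

```rust
// Simplified stand-in for RisingWave's WorkerSlotId: (worker_id, slot_idx).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId {
    worker_id: u32,
    slot_idx: u32,
}

fn main() {
    // Two hypothetical workers, each with parallelism 2, extended by factor 2.
    let workers = [(1u32, 2u32), (2u32, 2u32)];
    let factor = 2;

    let mut extended: Vec<WorkerSlotId> = workers
        .iter()
        .flat_map(|&(worker_id, parallelism)| {
            (0..parallelism * factor).map(move |slot_idx| WorkerSlotId { worker_id, slot_idx })
        })
        .collect();

    // Same ordering as the diff: slot index first, then worker id, so real
    // slots (idx < parallelism) on every worker come before virtual ones.
    extended.sort_by(|a, b| a.slot_idx.cmp(&b.slot_idx).then(a.worker_id.cmp(&b.worker_id)));

    for slot in &extended {
        println!("{slot:?}");
    }
    // Order: (w1,0), (w2,0), (w1,1), (w2,1), (w1,2), (w2,2), (w1,3), (w2,3)
}
```

Sorting by slot index first hands out every worker's real slots round-robin across workers before any virtual slot is touched, so forced migration spreads the overload as evenly and as late as possible.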
13 changes: 10 additions & 3 deletions src/meta/src/manager/metadata.rs
@@ -29,7 +29,7 @@ use risingwave_pb::stream_plan::{PbDispatchStrategy, StreamActor};
use risingwave_pb::stream_service::BuildActorInfo;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
- use tokio::time::sleep;
+ use tokio::time::{sleep, Instant};
use tracing::warn;

use crate::barrier::Reschedule;
@@ -104,14 +104,21 @@ impl ActiveStreamingWorkerNodes {
pub(crate) async fn wait_changed(
&mut self,
verbose_internal: Duration,
+ verbose_timeout: Duration,
verbose_fn: impl Fn(&Self),
- ) -> ActiveStreamingWorkerChange {
+ ) -> Option<ActiveStreamingWorkerChange> {
+ let start = Instant::now();
loop {
if let Either::Left((change, _)) =
select(pin!(self.changed()), pin!(sleep(verbose_internal))).await
{
- break change;
+ break Some(change);
}

+ if start.elapsed() > verbose_timeout {
+ break None;
+ }

verbose_fn(self)
}
}
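The metadata.rs change gives `wait_changed` a deadline: it now returns `Some(change)` if the worker set changes and `None` once `verbose_timeout` elapses, which is what lets the recovery loop wake up and attempt forced migration instead of blocking forever. A rough standalone sketch of the same select-with-deadline pattern; the function name, generic shape, and the tokio `full` feature set are assumptions, not the real `ActiveStreamingWorkerNodes` API:

```rust
use std::time::Duration;
use tokio::time::{sleep, Instant};

// Poll a change future in verbose-interval slices, run the verbose callback
// on every slice, and give up with None after the timeout. `wait_change`
// stands in for `self.changed()`; everything here is a sketch.
async fn wait_changed_with_timeout<C>(
    mut wait_change: impl FnMut() -> C,
    verbose_interval: Duration,
    verbose_timeout: Duration,
    verbose_fn: impl Fn(),
) -> Option<C::Output>
where
    C: std::future::Future,
{
    let start = Instant::now();
    loop {
        tokio::select! {
            change = wait_change() => break Some(change),
            _ = sleep(verbose_interval) => {}
        }
        if start.elapsed() > verbose_timeout {
            break None;
        }
        verbose_fn();
    }
}

#[tokio::main]
async fn main() {
    // A change source that never fires, to demonstrate the timeout path.
    let result = wait_changed_with_timeout(
        || std::future::pending::<()>(),
        Duration::from_millis(100),
        Duration::from_millis(350),
        || println!("still waiting for workers..."),
    )
    .await;
    assert!(result.is_none()); // timed out without observing a change
}
```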
