feat: Trigger scale-out when adding a worker. (#13283)
shanicky authored Nov 24, 2023
1 parent ccd498f commit ac2a842
Showing 11 changed files with 377 additions and 72 deletions.
4 changes: 4 additions & 0 deletions src/common/src/config.rs
@@ -212,6 +212,10 @@ pub struct MetaConfig {
#[serde(default)]
pub enable_scale_in_when_recovery: bool,

/// Whether to enable the auto-scaling feature.
#[serde(default)]
pub enable_automatic_parallelism_control: bool,

#[serde(default = "default::meta::meta_leader_lease_secs")]
pub meta_leader_lease_secs: u64,

1 change: 1 addition & 0 deletions src/config/example.toml
@@ -24,6 +24,7 @@ min_delta_log_num_for_hummock_version_checkpoint = 10
max_heartbeat_interval_secs = 300
disable_recovery = false
enable_scale_in_when_recovery = false
enable_automatic_parallelism_control = false
meta_leader_lease_secs = 30
default_parallelism = "Full"
enable_compaction_deterministic = false
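The new flag defaults to false both in MetaConfig (via #[serde(default)]) and in the example config above. A minimal sketch of opting in, assuming the key lives in the [meta] section of the TOML config alongside the other options shown here:

[meta]
disable_recovery = false
enable_scale_in_when_recovery = false
# Opt in to automatic scale-out when a new compute node registers.
enable_automatic_parallelism_control = true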
3 changes: 3 additions & 0 deletions src/meta/node/src/lib.rs
@@ -256,6 +256,9 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
MetaOpts {
enable_recovery: !config.meta.disable_recovery,
enable_scale_in_when_recovery: config.meta.enable_scale_in_when_recovery,
enable_automatic_parallelism_control: config
.meta
.enable_automatic_parallelism_control,
in_flight_barrier_nums,
max_idle_ms,
compaction_deterministic_test: config.meta.enable_compaction_deterministic,
4 changes: 4 additions & 0 deletions src/meta/node/src/server.rs
@@ -677,6 +677,10 @@ pub async fn start_service_as_election_leader(
Duration::from_secs(1),
));
sub_tasks.push(GlobalBarrierManager::start(barrier_manager));

if env.opts.enable_automatic_parallelism_control {
sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
}
}
let (idle_send, idle_recv) = tokio::sync::oneshot::channel();
sub_tasks.push(IdleManager::start_idle_checker(
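For context, start_auto_parallelism_monitor (added to scale.rs below) returns the same (JoinHandle<()>, oneshot::Sender<()>) pair as the other sub_tasks entries, so the meta server can signal shutdown and then join the task. A self-contained sketch of that pattern; the start_monitor name, the ten-second interval, and the main function are illustrative stand-ins, not code from this commit:

use tokio::sync::oneshot;
use tokio::task::JoinHandle;

// Hypothetical background task illustrating the sub_tasks pattern: spawn a
// loop and hand back (JoinHandle, shutdown Sender) so the caller can stop it.
fn start_monitor() -> (JoinHandle<()>, oneshot::Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
    let join_handle = tokio::spawn(async move {
        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(10));
        loop {
            tokio::select! {
                // Stop as soon as the shutdown signal arrives (or the sender drops).
                _ = &mut shutdown_rx => break,
                _ = ticker.tick() => {
                    // Periodic work would go here.
                }
            }
        }
    });
    (join_handle, shutdown_tx)
}

#[tokio::main]
async fn main() {
    let mut sub_tasks: Vec<(JoinHandle<()>, oneshot::Sender<()>)> = Vec::new();
    sub_tasks.push(start_monitor());

    // On shutdown: signal every sub-task, then wait for it to finish.
    for (handle, shutdown) in sub_tasks {
        let _ = shutdown.send(());
        let _ = handle.await;
    }
}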
4 changes: 4 additions & 0 deletions src/meta/src/manager/env.rs
@@ -86,6 +86,9 @@ pub struct MetaOpts {
pub enable_recovery: bool,
/// Whether to enable the scale-in feature when compute-node is removed.
pub enable_scale_in_when_recovery: bool,
/// Whether to enable the auto-scaling feature when a compute node joins.
/// The semantics of this configuration will be expanded in the future to control automatic scaling of the entire cluster.
pub enable_automatic_parallelism_control: bool,
/// The maximum number of barriers in-flight in the compute nodes.
pub in_flight_barrier_nums: usize,
/// After specified seconds of idle (no mview or flush), the process will be exited.
@@ -186,6 +189,7 @@ impl MetaOpts {
Self {
enable_recovery,
enable_scale_in_when_recovery: false,
enable_automatic_parallelism_control: false,
in_flight_barrier_nums: 40,
max_idle_ms: 0,
compaction_deterministic_test: false,
221 changes: 217 additions & 4 deletions src/meta/src/stream/scale.rs
@@ -16,8 +16,10 @@ use std::cmp::{min, Ordering};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::iter::repeat;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context};
use futures::future::BoxFuture;
use itertools::Itertools;
use num_integer::Integer;
use num_traits::abs;
@@ -26,7 +28,9 @@ use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::hash::{ActorMapping, ParallelUnitId, VirtualNode};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::common::{ActorInfo, ParallelUnit, WorkerNode};
use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy};
use risingwave_pb::meta::get_reschedule_plan_request::{
PbWorkerChanges, Policy, StableResizePolicy,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
@@ -37,17 +41,23 @@ use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor,
use risingwave_pb::stream_service::{
BroadcastActorInfoTableRequest, BuildActorsRequest, UpdateActorsRequest,
};
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use uuid::Uuid;

use crate::barrier::Reschedule;
use crate::manager::{ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, WorkerId};
use crate::barrier::{Command, Reschedule};
use crate::manager::{
ClusterManagerRef, FragmentManagerRef, IdCategory, LocalNotification, MetaSrvEnv, WorkerId,
};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::serving::{
to_deleted_fragment_parallel_unit_mapping, to_fragment_parallel_unit_mapping,
ServingVnodeMapping,
};
use crate::storage::{MetaStore, MetaStoreError, MetaStoreRef, Transaction, DEFAULT_COLUMN_FAMILY};
use crate::stream::SourceManagerRef;
use crate::stream::{GlobalStreamManager, SourceManagerRef};
use crate::{MetaError, MetaResult};

#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
@@ -2034,3 +2044,206 @@ impl ScaleController {
Ok(())
}
}

impl GlobalStreamManager {
pub async fn reschedule_actors(
&self,
reschedules: HashMap<FragmentId, ParallelUnitReschedule>,
options: RescheduleOptions,
) -> MetaResult<()> {
let mut revert_funcs = vec![];
if let Err(e) = self
.reschedule_actors_impl(&mut revert_funcs, reschedules, options)
.await
{
for revert_func in revert_funcs.into_iter().rev() {
revert_func.await;
}
return Err(e);
}

Ok(())
}

async fn reschedule_actors_impl(
&self,
revert_funcs: &mut Vec<BoxFuture<'_, ()>>,
reschedules: HashMap<FragmentId, ParallelUnitReschedule>,
options: RescheduleOptions,
) -> MetaResult<()> {
let (reschedule_fragment, applied_reschedules) = self
.scale_controller
.prepare_reschedule_command(reschedules, options)
.await?;

tracing::debug!("reschedule plan: {:#?}", reschedule_fragment);

let command = Command::RescheduleFragment {
reschedules: reschedule_fragment,
};

let fragment_manager_ref = self.fragment_manager.clone();

revert_funcs.push(Box::pin(async move {
fragment_manager_ref
.cancel_apply_reschedules(applied_reschedules)
.await;
}));

let _source_pause_guard = self.source_manager.paused.lock().await;

self.barrier_scheduler
.run_config_change_command_with_pause(command)
.await?;

Ok(())
}

async fn trigger_scale_out(&self, workers: Vec<WorkerId>) -> MetaResult<()> {
let _reschedule_job_lock = self.reschedule_lock.write().await;

let fragment_worker_changes = {
let guard = self.fragment_manager.get_fragment_read_guard().await;
let mut fragment_worker_changes = HashMap::new();
for table_fragments in guard.table_fragments().values() {
for fragment_id in table_fragments.fragment_ids() {
fragment_worker_changes.insert(
fragment_id,
PbWorkerChanges {
include_worker_ids: workers.clone(),
..Default::default()
},
);
}
}
fragment_worker_changes
};

let reschedules = self
.scale_controller
.generate_stable_resize_plan(
StableResizePolicy {
fragment_worker_changes,
},
None,
)
.await?;

if reschedules.is_empty() {
return Ok(());
}

self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
)
.await?;

Ok(())
}

async fn run(&self, mut shutdown_rx: Receiver<()>) {
let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();

self.env
.notification_manager()
.insert_local_sender(local_notification_tx)
.await;

let check_period = Duration::from_secs(10);
let mut ticker = tokio::time::interval(check_period);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
ticker.reset();

let worker_nodes = self
.cluster_manager
.list_active_streaming_compute_nodes()
.await;

let mut worker_cache: BTreeMap<_, _> = worker_nodes
.into_iter()
.map(|worker| (worker.id, worker))
.collect();

let mut changed = true;

loop {
tokio::select! {
biased;

_ = &mut shutdown_rx => {
tracing::info!("Stream manager is stopped");
break;
}

_ = ticker.tick(), if changed => {
let include_workers = worker_cache.keys().copied().collect_vec();

if include_workers.is_empty() {
tracing::debug!("no available worker nodes");
changed = false;
continue;
}

match self.trigger_scale_out(include_workers).await {
Ok(_) => {
worker_cache.clear();
changed = false;
}
Err(e) => {
tracing::warn!(error = e.to_string(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
ticker.reset();
}
}
}

notification = local_notification_rx.recv() => {
let notification = notification.expect("local notification channel closed in loop of stream manager");

match notification {
LocalNotification::WorkerNodeActivated(worker) => {
let prev_worker = worker_cache.insert(worker.id, worker.clone());

if let Some(prev_worker) = prev_worker && prev_worker.parallel_units != worker.parallel_units {
tracing::info!(worker = worker.id, "worker parallelism changed");
}

changed = true;
}

// Since our logic for handling passive scale-in is within the barrier manager,
// there’s not much we can do here. All we can do is proactively remove the entries from our cache.
LocalNotification::WorkerNodeDeleted(worker) => {
match worker_cache.remove(&worker.id) {
Some(prev_worker) => {
tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
}

None => {
tracing::warn!(worker = worker.id, "worker deleted, but not found in stream manager cache");
}
}
}

_ => {}
}
}
}
}
}

pub fn start_auto_parallelism_monitor(
self: Arc<Self>,
) -> (JoinHandle<()>, oneshot::Sender<()>) {
tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
self.run(shutdown_rx).await;
});

(join_handle, shutdown_tx)
}
}
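The run loop above is essentially a debounce: WorkerNodeActivated notifications populate worker_cache and set changed, the ten-second ticker only attempts trigger_scale_out while changed is true, and the cache is cleared on success so a quiet cluster does no extra work. A self-contained sketch of that pattern, with the notification channel and worker type replaced by simplified stand-ins (u32 ids over an mpsc channel); it is not code from this commit:

use std::collections::BTreeMap;
use std::time::Duration;
use tokio::time::MissedTickBehavior;

// Stand-alone sketch of the debounce pattern used by run(): events mark the
// cache dirty, and the periodic tick only does work while it stays dirty.
#[tokio::main]
async fn main() {
    let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<u32>();

    let mut ticker = tokio::time::interval(Duration::from_secs(10));
    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

    let mut worker_cache: BTreeMap<u32, ()> = BTreeMap::new();
    let mut changed = false;

    // Simulate a worker joining before the loop starts.
    event_tx.send(42).unwrap();

    loop {
        tokio::select! {
            biased;

            _ = ticker.tick(), if changed => {
                // The real loop calls trigger_scale_out() here; on success it
                // clears the cache so later ticks are no-ops until a new event.
                println!("scaling out to workers: {:?}", worker_cache.keys());
                worker_cache.clear();
                changed = false;
                break; // the sketch exits here; the real loop keeps running
            }

            Some(worker_id) = event_rx.recv() => {
                worker_cache.insert(worker_id, ());
                changed = true;
            }
        }
    }
}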