Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: execute auto-scaling in batches (#15420) #15562

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,18 @@ pub struct MetaConfig {
#[serde(default)]
pub disable_automatic_parallelism_control: bool,

/// The number of streaming jobs per scaling operation.
#[serde(default = "default::meta::parallelism_control_batch_size")]
pub parallelism_control_batch_size: usize,

/// The period of parallelism control trigger.
#[serde(default = "default::meta::parallelism_control_trigger_period_sec")]
pub parallelism_control_trigger_period_sec: u64,

/// The first delay of parallelism control.
#[serde(default = "default::meta::parallelism_control_trigger_first_delay_sec")]
pub parallelism_control_trigger_first_delay_sec: u64,

#[serde(default = "default::meta::meta_leader_lease_secs")]
pub meta_leader_lease_secs: u64,

Expand Down Expand Up @@ -1121,6 +1133,18 @@ pub mod default {
pub fn event_log_channel_max_size() -> u32 {
10
}

pub fn parallelism_control_batch_size() -> usize {
10
}

pub fn parallelism_control_trigger_period_sec() -> u64 {
10
}

pub fn parallelism_control_trigger_first_delay_sec() -> u64 {
30
}
}

pub mod server {
Expand Down
3 changes: 3 additions & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ This page is automatically generated by `./risedev generate-example-config`
| min_table_split_write_throughput | If the size of one table is smaller than `min_table_split_write_throughput`, we would not split it to an single group. | 4194304 |
| move_table_size_limit | | 10737418240 |
| node_num_monitor_interval_sec | | 10 |
| parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 |
| parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 |
| parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 |
| partition_vnode_count | | 16 |
| periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. | 60 |
| periodic_space_reclaim_compaction_interval_sec | Schedule space_reclaim compaction for all compaction groups with this interval. | 3600 |
Expand Down
3 changes: 3 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ min_delta_log_num_for_hummock_version_checkpoint = 10
max_heartbeat_interval_secs = 300
disable_recovery = false
disable_automatic_parallelism_control = false
parallelism_control_batch_size = 10
parallelism_control_trigger_period_sec = 10
parallelism_control_trigger_first_delay_sec = 30
meta_leader_lease_secs = 30
default_parallelism = "Full"
enable_compaction_deterministic = false
Expand Down
7 changes: 7 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
disable_automatic_parallelism_control: config
.meta
.disable_automatic_parallelism_control,
parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
parallelism_control_trigger_period_sec: config
.meta
.parallelism_control_trigger_period_sec,
parallelism_control_trigger_first_delay_sec: config
.meta
.parallelism_control_trigger_first_delay_sec,
in_flight_barrier_nums,
max_idle_ms,
compaction_deterministic_test: config.meta.enable_compaction_deterministic,
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ impl CommandContext {
actor_splits,
actor_new_dispatchers,
});
tracing::debug!("update mutation: {mutation:#?}");
tracing::debug!("update mutation: {mutation:?}");
Some(mutation)
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ impl GlobalBarrierManagerContext {
return Err(e);
}

debug!("scaling-in actors succeed.");
debug!("scaling actors succeed.");
Ok(())
}

Expand Down Expand Up @@ -854,7 +854,7 @@ impl GlobalBarrierManagerContext {
return Err(e);
}

debug!("scaling-in actors succeed.");
debug!("scaling actors succeed.");
Ok(())
}

Expand Down
9 changes: 9 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ pub struct MetaOpts {
pub enable_recovery: bool,
/// Whether to disable the auto-scaling feature.
pub disable_automatic_parallelism_control: bool,
/// The number of streaming jobs per scaling operation.
pub parallelism_control_batch_size: usize,
/// The period of parallelism control trigger.
pub parallelism_control_trigger_period_sec: u64,
/// The first delay of parallelism control.
pub parallelism_control_trigger_first_delay_sec: u64,
/// The maximum number of barriers in-flight in the compute nodes.
pub in_flight_barrier_nums: usize,
/// After specified seconds of idle (no mview or flush), the process will be exited.
Expand Down Expand Up @@ -221,6 +227,9 @@ impl MetaOpts {
Self {
enable_recovery,
disable_automatic_parallelism_control: false,
parallelism_control_batch_size: 1,
parallelism_control_trigger_period_sec: 10,
parallelism_control_trigger_first_delay_sec: 30,
in_flight_barrier_nums: 40,
max_idle_ms: 0,
compaction_deterministic_test: false,
Expand Down
166 changes: 108 additions & 58 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, ParallelUnitId, VirtualNode};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::{ActorInfo, Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode};
use risingwave_pb::common::{
ActorInfo, Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode, WorkerType,
};
use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
Expand All @@ -48,7 +50,7 @@ use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule, StreamRpcManager};
use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId};
Expand Down Expand Up @@ -2676,12 +2678,14 @@ impl GlobalStreamManager {
Ok(())
}

async fn trigger_parallelism_control(&self) -> MetaResult<()> {
async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
tracing::info!("trigger parallelism control");

let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

match &self.metadata_manager {
let (schedulable_worker_ids, table_parallelisms) = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let table_parallelisms = {
let table_parallelisms: HashMap<u32, TableParallelism> = {
let guard = mgr.fragment_manager.get_fragment_read_guard().await;

guard
Expand All @@ -2697,7 +2701,7 @@ impl GlobalStreamManager {
.list_active_streaming_compute_nodes()
.await;

let schedulable_worker_ids = workers
let schedulable_worker_ids: BTreeSet<_> = workers
.iter()
.filter(|worker| {
!worker
Expand All @@ -2709,26 +2713,7 @@ impl GlobalStreamManager {
.map(|worker| worker.id)
.collect();

let reschedules = self
.scale_controller
.generate_table_resize_plan(TableResizePolicy {
worker_ids: schedulable_worker_ids,
table_parallelisms,
})
.await?;

if reschedules.is_empty() {
return Ok(());
}

self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
None,
)
.await?;
(schedulable_worker_ids, table_parallelisms)
}
MetadataManager::V2(mgr) => {
let table_parallelisms: HashMap<_, _> = {
Expand Down Expand Up @@ -2768,33 +2753,90 @@ impl GlobalStreamManager {
.map(|worker| worker.id)
.collect();

let reschedules = self
.scale_controller
.generate_table_resize_plan(TableResizePolicy {
worker_ids: schedulable_worker_ids,
table_parallelisms: table_parallelisms.clone(),
})
.await?;
(schedulable_worker_ids, table_parallelisms)
}
};

if reschedules.is_empty() {
return Ok(());
}
if table_parallelisms.is_empty() {
tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
return Ok(false);
}

self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
None,
)
let batch_size = match self.env.opts.parallelism_control_batch_size {
0 => table_parallelisms.len(),
n => n,
};

tracing::info!(
"total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
table_parallelisms.len(),
batch_size,
schedulable_worker_ids
);

let batches: Vec<_> = table_parallelisms
.into_iter()
.chunks(batch_size)
.into_iter()
.map(|chunk| chunk.collect_vec())
.collect();

let mut reschedules = None;

for batch in batches {
let parallelisms: HashMap<_, _> = batch.into_iter().collect();

let plan = self
.scale_controller
.generate_table_resize_plan(TableResizePolicy {
worker_ids: schedulable_worker_ids.clone(),
table_parallelisms: parallelisms.clone(),
})
.await?;

if !plan.is_empty() {
tracing::info!(
"reschedule plan generated for streaming jobs {:?}",
parallelisms
);
reschedules = Some(plan);
break;
}
}

Ok(())
let Some(reschedules) = reschedules else {
tracing::info!("no reschedule plan generated");
return Ok(false);
};

self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: false,
},
None,
)
.await?;

Ok(true)
}

async fn run(&self, mut shutdown_rx: Receiver<()>) {
tracing::info!("starting automatic parallelism control monitor");

let check_period =
Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);

let mut ticker = tokio::time::interval_at(
Instant::now()
+ Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
check_period,
);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

// waiting for first tick
ticker.tick().await;

let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();

Expand All @@ -2803,11 +2845,6 @@ impl GlobalStreamManager {
.insert_local_sender(local_notification_tx)
.await;

let check_period = Duration::from_secs(10);
let mut ticker = tokio::time::interval(check_period);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
ticker.reset();

let worker_nodes = self
.metadata_manager
.list_active_streaming_compute_nodes()
Expand All @@ -2819,7 +2856,7 @@ impl GlobalStreamManager {
.map(|worker| (worker.id, worker))
.collect();

let mut changed = true;
let mut should_trigger = false;

loop {
tokio::select! {
Expand All @@ -2830,18 +2867,18 @@ impl GlobalStreamManager {
break;
}

_ = ticker.tick(), if changed => {
_ = ticker.tick(), if should_trigger => {
let include_workers = worker_cache.keys().copied().collect_vec();

if include_workers.is_empty() {
tracing::debug!("no available worker nodes");
changed = false;
should_trigger = false;
continue;
}

match self.trigger_parallelism_control().await {
Ok(_) => {
changed = false;
Ok(cont) => {
should_trigger = cont;
}
Err(e) => {
tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
Expand All @@ -2855,13 +2892,26 @@ impl GlobalStreamManager {

match notification {
LocalNotification::WorkerNodeActivated(worker) => {
match (worker.get_type(), worker.property.as_ref()) {
(Ok(WorkerType::ComputeNode), Some(prop)) if prop.is_streaming => {
tracing::info!("worker {} activated notification received", worker.id);
}
_ => continue
}

let prev_worker = worker_cache.insert(worker.id, worker.clone());

if let Some(prev_worker) = prev_worker && prev_worker.parallel_units != worker.parallel_units {
tracing::info!(worker = worker.id, "worker parallelism changed");
match prev_worker {
Some(prev_worker) if prev_worker.parallel_units != worker.parallel_units => {
tracing::info!(worker = worker.id, "worker parallelism changed");
should_trigger = true;
}
None => {
tracing::info!(worker = worker.id, "new worker joined");
should_trigger = true;
}
_ => {}
}

changed = true;
}

// Since our logic for handling passive scale-in is within the barrier manager,
Expand Down
3 changes: 3 additions & 0 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ impl Configuration {
r#"[meta]
max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
parallelism_control_trigger_first_delay_sec = 0
parallelism_control_batch_size = 0
parallelism_control_trigger_period_sec = 10

[system]
barrier_interval_ms = 250
Expand Down
Loading