feat: merge config for auto scale & enable auto scale #14873

Merged: 10 commits, Feb 1, 2024
8 changes: 2 additions & 6 deletions src/common/src/config.rs
@@ -225,13 +225,9 @@ pub struct MetaConfig {
#[serde(default)]
pub disable_recovery: bool,

/// Whether to enable scale-in when recovery.
/// Whether to disable adaptive-scaling feature.
#[serde(default)]
pub enable_scale_in_when_recovery: bool,

/// Whether to enable auto-scaling feature.
#[serde(default)]
pub enable_automatic_parallelism_control: bool,
pub disable_automatic_parallelism_control: bool,

#[serde(default = "default::meta::meta_leader_lease_secs")]
pub meta_leader_lease_secs: u64,
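This rename flips the knob from opt-in to opt-out: with `#[serde(default)]`, a boolean that is missing from the TOML deserializes to `false`, so adaptive scaling is now enabled unless the operator explicitly disables it. A minimal sketch of that default behavior (hypothetical struct name, assuming the `serde` and `toml` crates):

```rust
use serde::Deserialize;

// Not the actual RisingWave config struct; it only mirrors the renamed field.
#[derive(Deserialize)]
struct MetaConfigSketch {
    #[serde(default)]
    disable_automatic_parallelism_control: bool,
}

fn main() {
    // A config file that does not mention the flag at all.
    let cfg: MetaConfigSketch = toml::from_str("").unwrap();
    // The meta node's gate becomes a negation, so the feature is on by default.
    let auto_scaling_enabled = !cfg.disable_automatic_parallelism_control;
    assert!(auto_scaling_enabled);
}
```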
3 changes: 1 addition & 2 deletions src/config/example.toml
@@ -25,8 +25,7 @@ enable_hummock_data_archive = false
min_delta_log_num_for_hummock_version_checkpoint = 10
max_heartbeat_interval_secs = 300
disable_recovery = false
enable_scale_in_when_recovery = false
enable_automatic_parallelism_control = false
disable_automatic_parallelism_control = false
meta_leader_lease_secs = 30
default_parallelism = "Full"
enable_compaction_deterministic = false
5 changes: 2 additions & 3 deletions src/meta/node/src/lib.rs
@@ -277,10 +277,9 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
config.meta.meta_leader_lease_secs,
MetaOpts {
enable_recovery: !config.meta.disable_recovery,
enable_scale_in_when_recovery: config.meta.enable_scale_in_when_recovery,
enable_automatic_parallelism_control: config
disable_automatic_parallelism_control: config
.meta
.enable_automatic_parallelism_control,
.disable_automatic_parallelism_control,
in_flight_barrier_nums,
max_idle_ms,
compaction_deterministic_test: config.meta.enable_compaction_deterministic,
2 changes: 1 addition & 1 deletion src/meta/node/src/server.rs
@@ -706,7 +706,7 @@ pub async fn start_service_as_election_leader(
sub_tasks.push(task);
sub_tasks.push(GlobalBarrierManager::start(barrier_manager));

if env.opts.enable_automatic_parallelism_control {
if !env.opts.disable_automatic_parallelism_control {
sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
}
}
11 changes: 10 additions & 1 deletion src/meta/src/barrier/recovery.rs
@@ -374,7 +374,7 @@ impl GlobalBarrierManagerContext {

// Resolve actor info for recovery. If there's no actor to recover, most of the
// following steps will be no-op, while the compute nodes will still be reset.
let mut info = if self.env.opts.enable_scale_in_when_recovery {
let mut info = if !self.env.opts.disable_automatic_parallelism_control {
let info = self.resolve_actor_info().await;
let scaled = self.scale_actors(&info).await.inspect_err(|err| {
warn!(error = %err.as_report(), "scale actors failed");
@@ -704,12 +704,21 @@ impl GlobalBarrierManagerContext {
let mgr = self.metadata_manager.as_v1_ref();
debug!("start resetting actors distribution");

if info.actor_location_map.is_empty() {
debug!("empty cluster, skipping");
return Ok(true);
}

let current_parallelism = info
.node_map
.values()
.flat_map(|worker_node| worker_node.parallel_units.iter())
.count();

if current_parallelism == 0 {
return Err(anyhow!("no available parallel units for auto scaling").into());
}

/// We infer the new parallelism strategy based on the prior level of parallelism of the table.
/// If the parallelism strategy is Fixed or Auto, we won't make any modifications.
/// For Custom, we'll assess the parallelism of the core fragment;
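The recovery path gains two guards before rescheduling: an empty cluster skips the step entirely, and a cluster that has actors but zero parallel units is reported as an error rather than producing a degenerate plan. A simplified, standalone sketch of those guards (the map types are stand-ins for the real actor and worker-node info structs):

```rust
use std::collections::HashMap;

struct WorkerNode {
    parallel_units: Vec<u32>, // stand-in for the real ParallelUnit list
}

/// Ok(true): skip rescheduling (empty cluster). Ok(false): proceed with scaling.
/// Err: actors exist but there is nowhere to place them.
fn pre_scale_check(
    actor_location_map: &HashMap<u32, u32>, // actor id -> worker id
    node_map: &HashMap<u32, WorkerNode>,    // worker id -> node
) -> Result<bool, String> {
    if actor_location_map.is_empty() {
        // Mirrors the "empty cluster, skipping" early return above.
        return Ok(true);
    }

    let current_parallelism = node_map
        .values()
        .flat_map(|node| node.parallel_units.iter())
        .count();

    if current_parallelism == 0 {
        return Err("no available parallel units for auto scaling".to_string());
    }

    Ok(false)
}

fn main() {
    let empty = HashMap::new();
    assert_eq!(pre_scale_check(&empty, &HashMap::new()), Ok(true));

    let actors = HashMap::from([(1u32, 100u32)]);
    let no_slots = HashMap::from([(100u32, WorkerNode { parallel_units: vec![] })]);
    assert!(pre_scale_check(&actors, &no_slots).is_err());
}
```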
36 changes: 31 additions & 5 deletions src/meta/src/manager/cluster.rs
@@ -112,6 +112,7 @@ impl ClusterManager {
let mut core = self.core.write().await;

if let Some(worker) = core.get_worker_by_host_mut(host_address.clone()) {
tracing::info!("worker {} re-joined the cluster", worker.worker_id());
worker.update_resource(Some(resource));
worker.update_started_at(timestamp_now_sec());
if let Some(property) = &mut property {
@@ -149,7 +150,7 @@
new_worker.worker_node.parallel_units.extend(parallel_units);
}
Ordering::Greater => {
if self.env.opts.enable_scale_in_when_recovery {
if !self.env.opts.disable_automatic_parallelism_control {
// Handing over to the subsequent recovery loop for a forced reschedule.
tracing::info!(
"worker {} parallelism reduced from {} to {}",
@@ -233,12 +234,25 @@ impl ClusterManager {
worker.insert(self.env.meta_store_checked()).await?;
// Update core.
core.add_worker_node(worker);

tracing::info!(
"new worker {} from {}:{} joined cluster",
worker_id,
host_address.get_host(),
host_address.get_port()
);

Ok(worker_node)
}

pub async fn activate_worker_node(&self, host_address: HostAddress) -> MetaResult<()> {
let mut core = self.core.write().await;
let mut worker = core.get_worker_by_host_checked(host_address.clone())?;

let worker_id = worker.worker_id();

tracing::info!("worker {} activating", worker_id);

if worker.worker_node.state != State::Running as i32 {
worker.worker_node.state = State::Running as i32;
worker.insert(self.env.meta_store_checked()).await?;
@@ -258,6 +272,8 @@ impl ClusterManager {
.notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker.worker_node))
.await;

tracing::info!("worker {} activated", worker_id);

Ok(())
}

@@ -790,8 +806,11 @@ mod tests {
async fn test_cluster_manager() -> MetaResult<()> {
let env = MetaSrvEnv::for_test().await;

let cluster_manager =
Arc::new(ClusterManager::new(env, Duration::new(0, 0)).await.unwrap());
let cluster_manager = Arc::new(
ClusterManager::new(env.clone(), Duration::new(0, 0))
.await
.unwrap(),
);

let mut worker_nodes = Vec::new();
let worker_count = 5usize;
@@ -878,8 +897,15 @@
)
.await
.unwrap();
assert_eq!(worker_node.parallel_units.len(), fake_parallelism + 4);
assert_cluster_manager(&cluster_manager, parallel_count + 4).await;

if !env.opts.disable_automatic_parallelism_control {
assert_eq!(worker_node.parallel_units.len(), fake_parallelism - 2);
assert_cluster_manager(&cluster_manager, parallel_count - 2).await;
} else {
// compatibility mode
assert_eq!(worker_node.parallel_units.len(), fake_parallelism + 4);
assert_cluster_manager(&cluster_manager, parallel_count + 4).await;
}

let worker_to_delete_count = 4usize;
for i in 0..worker_to_delete_count {
10 changes: 3 additions & 7 deletions src/meta/src/manager/env.rs
@@ -91,11 +91,8 @@ pub struct MetaOpts {
/// Whether to enable the recovery of the cluster. If disabled, the meta service will exit on
/// abnormal cases.
pub enable_recovery: bool,
/// Whether to enable the scale-in feature when compute-node is removed.
pub enable_scale_in_when_recovery: bool,
/// Whether to enable the auto-scaling feature when compute-node is joined.
/// The semantics of this configuration will be expanded in the future to control the automatic scaling of the entire cluster.
pub enable_automatic_parallelism_control: bool,
/// Whether to disable the auto-scaling feature.
pub disable_automatic_parallelism_control: bool,
/// The maximum number of barriers in-flight in the compute nodes.
pub in_flight_barrier_nums: usize,
/// After specified seconds of idle (no mview or flush), the process will be exited.
@@ -223,8 +220,7 @@ impl MetaOpts {
pub fn test(enable_recovery: bool) -> Self {
Self {
enable_recovery,
enable_scale_in_when_recovery: false,
enable_automatic_parallelism_control: false,
disable_automatic_parallelism_control: false,
in_flight_barrier_nums: 40,
max_idle_ms: 0,
compaction_deterministic_test: false,
5 changes: 5 additions & 0 deletions src/meta/src/stream/scale.rs
@@ -187,6 +187,11 @@ pub fn rebalance_actor_vnode(
actors_to_remove: &BTreeSet<ActorId>,
actors_to_create: &BTreeSet<ActorId>,
) -> HashMap<ActorId, Bitmap> {
let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();

assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);

assert!(actors.len() >= actors_to_remove.len());

let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
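The added assertions spell out the input invariants of `rebalance_actor_vnode`: every actor marked for removal must already exist, and no actor marked for creation may collide with an existing one. A self-contained sketch of the same set arithmetic, with actor ids reduced to plain `u32`s:

```rust
use std::collections::BTreeSet;

fn assert_rebalance_inputs(
    actor_ids: &BTreeSet<u32>,
    actors_to_remove: &BTreeSet<u32>,
    actors_to_create: &BTreeSet<u32>,
) {
    // Every id scheduled for removal must be an existing actor.
    assert_eq!(actors_to_remove.difference(actor_ids).count(), 0);
    // No id scheduled for creation may collide with an existing actor.
    assert_eq!(actors_to_create.intersection(actor_ids).count(), 0);
    // Removals cannot exceed what exists.
    assert!(actor_ids.len() >= actors_to_remove.len());
}

fn main() {
    let actor_ids: BTreeSet<u32> = BTreeSet::from([1, 2, 3]);
    let to_remove = BTreeSet::from([3]);
    let to_create = BTreeSet::from([4, 5]);
    assert_rebalance_inputs(&actor_ids, &to_remove, &to_create); // passes
}
```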
16 changes: 14 additions & 2 deletions src/meta/src/stream/stream_manager.rs
@@ -1093,7 +1093,9 @@ mod tests {
let actor_locations = fragments
.values()
.flat_map(|f| &f.actors)
.map(|a| (a.actor_id, parallel_units[&0].clone()))
.sorted_by(|a, b| a.actor_id.cmp(&b.actor_id))
.enumerate()
.map(|(idx, a)| (a.actor_id, parallel_units[&(idx as u32)].clone()))
.collect();

Locations {
@@ -1182,6 +1184,14 @@
let table_id = TableId::new(0);
let actors = make_mview_stream_actors(&table_id, 4);

let StreamingClusterInfo { parallel_units, .. } = services
.global_stream_manager
.metadata_manager
.get_streaming_cluster_info()
.await?;

let parallel_unit_ids = parallel_units.keys().cloned().sorted().collect_vec();

let mut fragments = BTreeMap::default();
fragments.insert(
0,
@@ -1191,7 +1201,9 @@
distribution_type: FragmentDistributionType::Hash as i32,
actors: actors.clone(),
state_table_ids: vec![0],
vnode_mapping: Some(ParallelUnitMapping::new_single(0).to_protobuf()),
vnode_mapping: Some(
ParallelUnitMapping::new_uniform(parallel_unit_ids.into_iter()).to_protobuf(),
),
..Default::default()
},
);
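The fixture change stops pinning every actor to parallel unit 0: actors are sorted by id and spread one per parallel unit, so the placement matches the uniform vnode mapping built from the cluster's actual parallel-unit ids. A sketch of that assignment pattern (strings stand in for the real `ParallelUnit` values; `itertools` is assumed, as in the test):

```rust
use std::collections::HashMap;
use itertools::Itertools;

// Assign each actor (sorted by id) to a distinct parallel unit by index.
fn assign_actor_locations(
    actor_ids: &[u32],
    parallel_units: &HashMap<u32, String>, // unit id -> stand-in for ParallelUnit
) -> HashMap<u32, String> {
    actor_ids
        .iter()
        .sorted()
        .enumerate()
        .map(|(idx, &actor_id)| (actor_id, parallel_units[&(idx as u32)].clone()))
        .collect()
}

fn main() {
    let parallel_units: HashMap<u32, String> =
        (0..4).map(|i| (i, format!("pu-{i}"))).collect();
    let locations = assign_actor_locations(&[10, 12, 11, 13], &parallel_units);
    assert_eq!(locations[&10], "pu-0");
    assert_eq!(locations[&13], "pu-3");
}
```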
6 changes: 3 additions & 3 deletions src/tests/simulation/src/cluster.rs
@@ -177,18 +177,18 @@ impl Configuration {

pub fn for_auto_parallelism(
max_heartbeat_interval_secs: u64,
enable_auto_scale_in: bool,
enable_auto_parallelism: bool,
) -> Self {
let disable_automatic_parallelism_control = !enable_auto_parallelism;

let config_path = {
let mut file =
tempfile::NamedTempFile::new().expect("failed to create temp config file");

let config_data = format!(
r#"[meta]
max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
enable_scale_in_when_recovery = {enable_auto_scale_in}
enable_automatic_parallelism_control = {enable_auto_parallelism}
disable_automatic_parallelism_control = {disable_automatic_parallelism_control}

[system]
barrier_interval_ms = 250
@@ -32,7 +32,6 @@ async fn test_passive_online_and_offline() -> Result<()> {
let config = Configuration::for_auto_parallelism(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
true,
true,
);
let mut cluster = Cluster::start(config.clone()).await?;
let mut session = cluster.start_session();
@@ -215,7 +214,6 @@
async fn test_active_online() -> Result<()> {
let config = Configuration::for_auto_parallelism(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
false,
true,
);
let mut cluster = Cluster::start(config.clone()).await?;
@@ -302,7 +300,6 @@ async fn test_auto_parallelism_control_with_fixed_and_auto_helper(
) -> Result<()> {
let config = Configuration::for_auto_parallelism(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
true,
enable_auto_parallelism_control,
);
let mut cluster = Cluster::start(config.clone()).await?;
@@ -490,7 +487,6 @@ async fn test_compatibility_with_low_level() -> Result<()> {
async fn test_compatibility_with_low_level() -> Result<()> {
let config = Configuration::for_auto_parallelism(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
false,
true,
);
let mut cluster = Cluster::start(config.clone()).await?;
@@ -632,7 +628,6 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<()> {
async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<()> {
let config = Configuration::for_auto_parallelism(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
false,
true,
);
let mut cluster = Cluster::start(config.clone()).await?;