Skip to content

Commit

Permalink
Refactor actor scaling; cleanup; enhance GlobalStreamManager.
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Feb 1, 2024
1 parent 8481ea7 commit 5c2e5f7
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 33 deletions.
28 changes: 13 additions & 15 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,11 @@ impl GlobalBarrierManagerContext {
// Resolve actor info for recovery. If there's no actor to recover, most of the
// following steps will be no-op, while the compute nodes will still be reset.
let mut info = if !self.env.opts.disable_automatic_parallelism_control {
let info = self.resolve_actor_info().await;
let scaled = self.scale_actors(&info).await.inspect_err(|err| {
self.scale_actors().await.inspect_err(|err| {
warn!(error = %err.as_report(), "scale actors failed");
})?;
if scaled {
self.resolve_actor_info().await
} else {
info
}

self.resolve_actor_info().await
} else {
// Migrate actors in expired CN to newly joined one.
self.migrate_actors().await.inspect_err(|err| {
Expand Down Expand Up @@ -596,14 +592,14 @@ impl GlobalBarrierManagerContext {
Ok(info)
}

async fn scale_actors(&self, info: &InflightActorInfo) -> MetaResult<bool> {
async fn scale_actors(&self) -> MetaResult<()> {
match &self.metadata_manager {
MetadataManager::V1(_) => self.scale_actors_v1(info).await,
MetadataManager::V2(_) => self.scale_actors_v2(info).await,
MetadataManager::V1(_) => self.scale_actors_v1().await,
MetadataManager::V2(_) => self.scale_actors_v2().await,
}
}

async fn scale_actors_v2(&self, _info: &InflightActorInfo) -> MetaResult<bool> {
async fn scale_actors_v2(&self) -> MetaResult<()> {
let mgr = self.metadata_manager.as_v2_ref();
debug!("start resetting actors distribution");

Expand Down Expand Up @@ -690,16 +686,18 @@ impl GlobalBarrierManagerContext {
}

debug!("scaling-in actors succeed.");
Ok(true)
Ok(())
}

async fn scale_actors_v1(&self, info: &InflightActorInfo) -> MetaResult<bool> {
async fn scale_actors_v1(&self) -> MetaResult<()> {
let info = self.resolve_actor_info().await;

let mgr = self.metadata_manager.as_v1_ref();
debug!("start resetting actors distribution");

if info.actor_location_map.is_empty() {
debug!("empty cluster, skipping");
return Ok(true);
return Ok(());
}

let current_parallelism = info
Expand Down Expand Up @@ -821,7 +819,7 @@ impl GlobalBarrierManagerContext {
}

debug!("scaling-in actors succeed.");
Ok(true)
Ok(())
}

/// This function will generate a migration plan, which includes the mapping for all expired and
Expand Down
40 changes: 30 additions & 10 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ impl ClusterController {
let meta_store = env
.sql_meta_store()
.expect("sql meta store is not initialized");
let inner = ClusterControllerInner::new(meta_store.conn).await?;
let inner = ClusterControllerInner::new(
meta_store.conn,
env.opts.disable_automatic_parallelism_control,
)
.await?;
Ok(Self {
env,
max_heartbeat_interval,
Expand Down Expand Up @@ -460,13 +464,17 @@ pub struct ClusterControllerInner {
/// Record for tracking available machine ids, one is available.
available_transactional_ids: VecDeque<TransactionId>,
worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

pub async fn new(db: DatabaseConnection) -> MetaResult<Self> {
pub async fn new(
db: DatabaseConnection,
disable_automatic_parallelism_control: bool,
) -> MetaResult<Self> {
let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
.select_only()
.column(worker::Column::WorkerId)
Expand All @@ -492,6 +500,7 @@ impl ClusterControllerInner {
db,
available_transactional_ids,
worker_extra_info,
disable_automatic_parallelism_control,
})
}

Expand Down Expand Up @@ -598,14 +607,25 @@ impl ClusterControllerInner {

match new_parallelism.cmp(&current_parallelism.len()) {
Ordering::Less => {
// Warn and keep the original parallelism if the worker registered with a
// smaller parallelism.
tracing::warn!(
"worker {} parallelism is less than current, current is {}, but received {}",
worker.worker_id,
current_parallelism.len(),
new_parallelism
);
if !self.disable_automatic_parallelism_control {
// Handing over to the subsequent recovery loop for a forced reschedule.
tracing::info!(
"worker {} parallelism reduced from {} to {}",
worker.worker_id,
current_parallelism.len(),
new_parallelism
);
current_parallelism.truncate(new_parallelism);
} else {
// Warn and keep the original parallelism if the worker registered with a
// smaller parallelism.
tracing::warn!(
"worker {} parallelism is less than current, current is {}, but received {}",
worker.worker_id,
current_parallelism.len(),
new_parallelism
);
}
}
Ordering::Greater => {
tracing::info!(
Expand Down
15 changes: 12 additions & 3 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink,
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits,
ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId,
StreamNode, TableId, VnodeBitmap, WorkerId,
StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::meta::subscribe_response::{
Expand Down Expand Up @@ -54,6 +54,7 @@ use crate::controller::utils::{
PartialActorLocation, PartialFragmentStateTables,
};
use crate::manager::{ActorInfos, LocalNotification};
use crate::model::TableParallelism;
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

Expand Down Expand Up @@ -311,6 +312,7 @@ impl CatalogController {
HashMap<ActorId, Vec<actor_dispatcher::Model>>,
)>,
parallel_units_map: &HashMap<u32, PbParallelUnit>,
parallelism: StreamingParallelism,
) -> MetaResult<PbTableFragments> {
let mut pb_fragments = HashMap::new();
let mut pb_actor_splits = HashMap::new();
Expand All @@ -333,8 +335,13 @@ impl CatalogController {
actor_status: pb_actor_status,
actor_splits: pb_actor_splits,
ctx: Some(ctx.unwrap_or_default()),
// TODO(peng): fix this for model v2
parallelism: None,
parallelism: Some(
match parallelism {
StreamingParallelism::Adaptive => TableParallelism::Adaptive,
StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
}
.into(),
),
};

Ok(table_fragments)
Expand Down Expand Up @@ -631,6 +638,7 @@ impl CatalogController {
job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
fragment_info,
&parallel_units_map,
job_info.parallelism.clone(),
)
}

Expand Down Expand Up @@ -733,6 +741,7 @@ impl CatalogController {
job.timezone.map(|tz| PbStreamContext { timezone: tz }),
fragment_info,
&parallel_units_map,
job.parallelism.clone(),
)?,
);
}
Expand Down
65 changes: 60 additions & 5 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, ParallelUnitId, VirtualNode};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::{ActorInfo, ParallelUnit, WorkerNode};
use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
Expand All @@ -44,6 +45,7 @@ use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tracing::debug;

use crate::barrier::{Command, Reschedule, StreamRpcManager};
use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId};
Expand Down Expand Up @@ -1365,9 +1367,6 @@ impl ScaleController {
}

for created_parallel_unit_id in added_parallel_units {
// self.env.sql_id_gen_manager_ref().map(|id_gen| id_gen.actors.generate_interval(1))
//

let id = match self.env.sql_id_gen_manager_ref() {
None => {
self.env
Expand Down Expand Up @@ -2488,8 +2487,64 @@ impl GlobalStreamManager {
)
.await?;
}
MetadataManager::V2(_mgr) => {
todo!()
MetadataManager::V2(mgr) => {
let table_parallelisms: HashMap<_, _> = {
let streaming_parallelisms = mgr
.catalog_controller
.get_all_streaming_parallelisms()
.await?;

streaming_parallelisms
.into_iter()
.map(|(table_id, parallelism)| {
// no custom for sql backend
let table_parallelism = match parallelism {
StreamingParallelism::Adaptive => TableParallelism::Adaptive,
StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
};

(table_id as u32, table_parallelism)
})
.collect()
};

let workers = mgr
.cluster_controller
.list_active_streaming_workers()
.await?;

let schedulable_worker_ids = workers
.iter()
.filter(|worker| {
!worker
.property
.as_ref()
.map(|p| p.is_unschedulable)
.unwrap_or(false)
})
.map(|worker| worker.id)
.collect();

let reschedules = self
.scale_controller
.generate_table_resize_plan(TableResizePolicy {
worker_ids: schedulable_worker_ids,
table_parallelisms: table_parallelisms.clone(),
})
.await?;

if reschedules.is_empty() {
return Ok(());
}

self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
None,
)
.await?;
}
}

Expand Down

0 comments on commit 5c2e5f7

Please sign in to comment.