fix: fix todo & corner case for scaling in sql backend #14935

Merged
merged 2 commits on Feb 2, 2024
28 changes: 13 additions & 15 deletions src/meta/src/barrier/recovery.rs
@@ -368,15 +368,11 @@ impl GlobalBarrierManagerContext {
             // Resolve actor info for recovery. If there's no actor to recover, most of the
             // following steps will be no-op, while the compute nodes will still be reset.
             let mut info = if !self.env.opts.disable_automatic_parallelism_control {
-                let info = self.resolve_actor_info().await;
-                let scaled = self.scale_actors(&info).await.inspect_err(|err| {
+                self.scale_actors().await.inspect_err(|err| {
                     warn!(error = %err.as_report(), "scale actors failed");
                 })?;
-                if scaled {
-                    self.resolve_actor_info().await
-                } else {
-                    info
-                }
+
+                self.resolve_actor_info().await
             } else {
                 // Migrate actors in expired CN to newly joined one.
                 self.migrate_actors().await.inspect_err(|err| {
@@ -596,14 +592,14 @@ impl GlobalBarrierManagerContext {
         Ok(info)
     }
 
-    async fn scale_actors(&self, info: &InflightActorInfo) -> MetaResult<bool> {
+    async fn scale_actors(&self) -> MetaResult<()> {
         match &self.metadata_manager {
-            MetadataManager::V1(_) => self.scale_actors_v1(info).await,
-            MetadataManager::V2(_) => self.scale_actors_v2(info).await,
+            MetadataManager::V1(_) => self.scale_actors_v1().await,
+            MetadataManager::V2(_) => self.scale_actors_v2().await,
         }
     }
 
-    async fn scale_actors_v2(&self, _info: &InflightActorInfo) -> MetaResult<bool> {
+    async fn scale_actors_v2(&self) -> MetaResult<()> {
         let mgr = self.metadata_manager.as_v2_ref();
         debug!("start resetting actors distribution");
 
@@ -690,16 +686,18 @@ impl GlobalBarrierManagerContext {
         }
 
         debug!("scaling-in actors succeed.");
-        Ok(true)
+        Ok(())
     }
 
-    async fn scale_actors_v1(&self, info: &InflightActorInfo) -> MetaResult<bool> {
+    async fn scale_actors_v1(&self) -> MetaResult<()> {
+        let info = self.resolve_actor_info().await;
+
         let mgr = self.metadata_manager.as_v1_ref();
         debug!("start resetting actors distribution");
 
         if info.actor_location_map.is_empty() {
             debug!("empty cluster, skipping");
-            return Ok(true);
+            return Ok(());
         }
 
         let current_parallelism = info
@@ -821,7 +819,7 @@ impl GlobalBarrierManagerContext {
         }
 
         debug!("scaling-in actors succeed.");
-        Ok(true)
+        Ok(())
     }
 
     /// This function will generate a migration plan, which includes the mapping for all expired and
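The net effect of these changes: `scale_actors` no longer takes a pre-resolved `InflightActorInfo`, and it no longer reports whether it scaled. Each implementation resolves its own view of the cluster, and recovery unconditionally re-resolves the info after scaling, so the `bool` return value became dead weight and shrank to `MetaResult<()>`. A minimal, runnable sketch of the new control flow, using simplified stand-in types rather than the real meta-service signatures:

    // Hypothetical, simplified model of the recovery flow after this change:
    // scaling resolves its own actor info internally, and recovery always
    // takes a fresh snapshot once scaling has finished.
    struct Context;
    struct ActorInfo;

    impl Context {
        fn scale_actors(&self) -> Result<(), String> {
            // Was `-> MetaResult<bool>` with an `InflightActorInfo` parameter.
            Ok(())
        }
        fn resolve_actor_info(&self) -> ActorInfo {
            ActorInfo
        }
    }

    fn recover(ctx: &Context) -> Result<ActorInfo, String> {
        ctx.scale_actors()?;             // may or may not reschedule anything
        Ok(ctx.resolve_actor_info())     // always re-resolve afterward
    }

    fn main() {
        let _info = recover(&Context).expect("recovery failed");
    }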
40 changes: 30 additions & 10 deletions src/meta/src/controller/cluster.rs
@@ -109,7 +109,11 @@ impl ClusterController {
         let meta_store = env
             .sql_meta_store()
             .expect("sql meta store is not initialized");
-        let inner = ClusterControllerInner::new(meta_store.conn).await?;
+        let inner = ClusterControllerInner::new(
+            meta_store.conn,
+            env.opts.disable_automatic_parallelism_control,
+        )
+        .await?;
         Ok(Self {
             env,
             max_heartbeat_interval,
@@ -460,13 +464,17 @@ pub struct ClusterControllerInner {
     /// Record for tracking available machine ids, one is available.
     available_transactional_ids: VecDeque<TransactionId>,
     worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
+    disable_automatic_parallelism_control: bool,
 }
 
 impl ClusterControllerInner {
     pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
     pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;
 
-    pub async fn new(db: DatabaseConnection) -> MetaResult<Self> {
+    pub async fn new(
+        db: DatabaseConnection,
+        disable_automatic_parallelism_control: bool,
+    ) -> MetaResult<Self> {
         let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
             .select_only()
             .column(worker::Column::WorkerId)
@@ -492,6 +500,7 @@ impl ClusterControllerInner {
             db,
             available_transactional_ids,
             worker_extra_info,
+            disable_automatic_parallelism_control,
         })
     }
 
@@ -598,14 +607,25 @@ impl ClusterControllerInner {

         match new_parallelism.cmp(&current_parallelism.len()) {
             Ordering::Less => {
-                // Warn and keep the original parallelism if the worker registered with a
-                // smaller parallelism.
-                tracing::warn!(
-                    "worker {} parallelism is less than current, current is {}, but received {}",
-                    worker.worker_id,
-                    current_parallelism.len(),
-                    new_parallelism
-                );
+                if !self.disable_automatic_parallelism_control {
+                    // Handing over to the subsequent recovery loop for a forced reschedule.
+                    tracing::info!(
+                        "worker {} parallelism reduced from {} to {}",
+                        worker.worker_id,
+                        current_parallelism.len(),
+                        new_parallelism
+                    );
+                    current_parallelism.truncate(new_parallelism);
+                } else {
+                    // Warn and keep the original parallelism if the worker registered with a
+                    // smaller parallelism.
+                    tracing::warn!(
+                        "worker {} parallelism is less than current, current is {}, but received {}",
+                        worker.worker_id,
+                        current_parallelism.len(),
+                        new_parallelism
+                    );
+                }
             }
             Ordering::Greater => {
                 tracing::info!(
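The corner case this hunk fixes: when a compute node re-registers with fewer slots than the parallel units currently recorded for it, the old code only warned and kept the stale count. With automatic parallelism control enabled, the recorded parallelism is now truncated so the subsequent recovery loop forces a reschedule onto the smaller worker. A runnable sketch of the rule under stand-in types (`update_parallelism` is a hypothetical helper, not the actual `ClusterControllerInner` method):

    use std::cmp::Ordering;

    // Standalone sketch of the new registration rule; the Vec<u32> stands in
    // for the worker's recorded parallel-unit ids.
    fn update_parallelism(current: &mut Vec<u32>, new_parallelism: usize, auto_control_disabled: bool) {
        match new_parallelism.cmp(&current.len()) {
            // Shrunk and auto-control enabled: truncate, and let the next
            // recovery loop force a reschedule onto the smaller worker.
            Ordering::Less if !auto_control_disabled => current.truncate(new_parallelism),
            // Shrunk but auto-control disabled: keep the stale count, warn only.
            Ordering::Less => {}
            // Grown: add parallel units up to the new count (elided here).
            Ordering::Greater => {}
            Ordering::Equal => {}
        }
    }

    fn main() {
        let mut units = vec![0, 1, 2, 3];
        update_parallelism(&mut units, 2, false);
        assert_eq!(units, vec![0, 1]); // worker shrank from 4 slots to 2
    }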
15 changes: 12 additions & 3 deletions src/meta/src/controller/fragment.rs
@@ -24,7 +24,7 @@ use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink,
 use risingwave_meta_model_v2::{
     actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits,
     ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId,
-    StreamNode, TableId, VnodeBitmap, WorkerId,
+    StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
 };
 use risingwave_pb::common::PbParallelUnit;
 use risingwave_pb::meta::subscribe_response::{
@@ -54,6 +54,7 @@ use crate::controller::utils::{
     PartialActorLocation, PartialFragmentStateTables,
 };
 use crate::manager::{ActorInfos, LocalNotification};
+use crate::model::TableParallelism;
 use crate::stream::SplitAssignment;
 use crate::{MetaError, MetaResult};
 
@@ -311,6 +312,7 @@ impl CatalogController {
             HashMap<ActorId, Vec<actor_dispatcher::Model>>,
         )>,
         parallel_units_map: &HashMap<u32, PbParallelUnit>,
+        parallelism: StreamingParallelism,
     ) -> MetaResult<PbTableFragments> {
         let mut pb_fragments = HashMap::new();
         let mut pb_actor_splits = HashMap::new();
@@ -333,8 +335,13 @@ impl CatalogController {
             actor_status: pb_actor_status,
             actor_splits: pb_actor_splits,
             ctx: Some(ctx.unwrap_or_default()),
-            // TODO(peng): fix this for model v2
-            parallelism: None,
+            parallelism: Some(
+                match parallelism {
+                    StreamingParallelism::Adaptive => TableParallelism::Adaptive,
+                    StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
+                }
+                .into(),
+            ),
         };
 
         Ok(table_fragments)
@@ -631,6 +638,7 @@ impl CatalogController {
             job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
             fragment_info,
             &parallel_units_map,
+            job_info.parallelism.clone(),
         )
     }
 
@@ -733,6 +741,7 @@ impl CatalogController {
                 job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                 fragment_info,
                 &parallel_units_map,
+                job.parallelism.clone(),
             )?,
         );
     }
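These hunks close the `TODO(peng)` left in the v2 model: instead of reporting `parallelism: None`, the composed `PbTableFragments` now carries the job's persisted `StreamingParallelism`, mapped into `TableParallelism`. The mapping is total because the SQL backend persists only `Adaptive` and `Fixed` (there is no `Custom` variant, as the comment in scale.rs below also notes). A runnable sketch, with stand-in enum definitions mirroring the names in the diff rather than the real meta-model types:

    #[derive(Debug, PartialEq)]
    enum StreamingParallelism {
        Adaptive,
        Fixed(usize),
    }

    #[derive(Debug, PartialEq)]
    enum TableParallelism {
        Adaptive,
        Fixed(usize),
    }

    // Mirrors the match that replaces `parallelism: None` in the diff.
    fn to_table_parallelism(p: StreamingParallelism) -> TableParallelism {
        match p {
            // No `Custom` variant is persisted by the SQL backend, so this is total.
            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
        }
    }

    fn main() {
        assert_eq!(
            to_table_parallelism(StreamingParallelism::Fixed(4)),
            TableParallelism::Fixed(4)
        );
        assert_eq!(
            to_table_parallelism(StreamingParallelism::Adaptive),
            TableParallelism::Adaptive
        );
    }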
64 changes: 59 additions & 5 deletions src/meta/src/stream/scale.rs
@@ -30,6 +30,7 @@ use risingwave_common::buffer::{Bitmap, BitmapBuilder};
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::{ActorMapping, ParallelUnitId, VirtualNode};
 use risingwave_common::util::iter_util::ZipEqDebug;
+use risingwave_meta_model_v2::StreamingParallelism;
 use risingwave_pb::common::{ActorInfo, ParallelUnit, WorkerNode};
 use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy};
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
@@ -1365,9 +1366,6 @@ impl ScaleController {
         }
 
         for created_parallel_unit_id in added_parallel_units {
-            // self.env.sql_id_gen_manager_ref().map(|id_gen| id_gen.actors.generate_interval(1))
-            //
-
             let id = match self.env.sql_id_gen_manager_ref() {
                 None => {
                     self.env
@@ -2488,8 +2486,64 @@ impl GlobalStreamManager {
                 )
                 .await?;
             }
-            MetadataManager::V2(_mgr) => {
-                todo!()
+            MetadataManager::V2(mgr) => {
+                let table_parallelisms: HashMap<_, _> = {
+                    let streaming_parallelisms = mgr
+                        .catalog_controller
+                        .get_all_streaming_parallelisms()
+                        .await?;
+
+                    streaming_parallelisms
+                        .into_iter()
+                        .map(|(table_id, parallelism)| {
+                            // no custom for sql backend
+                            let table_parallelism = match parallelism {
+                                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
+                                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
+                            };
+
+                            (table_id as u32, table_parallelism)
+                        })
+                        .collect()
+                };
+
+                let workers = mgr
+                    .cluster_controller
+                    .list_active_streaming_workers()
+                    .await?;
+
+                let schedulable_worker_ids = workers
+                    .iter()
+                    .filter(|worker| {
+                        !worker
+                            .property
+                            .as_ref()
+                            .map(|p| p.is_unschedulable)
+                            .unwrap_or(false)
+                    })
+                    .map(|worker| worker.id)
+                    .collect();
+
+                let reschedules = self
+                    .scale_controller
+                    .generate_table_resize_plan(TableResizePolicy {
+                        worker_ids: schedulable_worker_ids,
+                        table_parallelisms: table_parallelisms.clone(),
+                    })
+                    .await?;
+
+                if reschedules.is_empty() {
+                    return Ok(());
+                }
+
+                self.reschedule_actors(
+                    reschedules,
+                    RescheduleOptions {
+                        resolve_no_shuffle_upstream: true,
+                    },
+                    None,
+                )
+                .await?;
+            }
         }
 
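The replaced `todo!()` is the headline fix: the SQL-backend (V2) arm now performs the same scaling pass as V1. It loads each job's persisted parallelism, keeps only the workers not marked unschedulable, generates a table resize plan, and reschedules only when the plan is non-empty. The worker filter is the part that is easy to get backwards; a runnable sketch of it, with stand-in structs for the pb worker types (a missing property defaults to schedulable, matching the `unwrap_or(false)` in the diff):

    struct Property {
        is_unschedulable: bool,
    }

    struct Worker {
        id: u32,
        property: Option<Property>,
    }

    // Keep a worker unless its optional property explicitly marks it unschedulable.
    fn schedulable_ids(workers: &[Worker]) -> Vec<u32> {
        workers
            .iter()
            .filter(|w| {
                !w.property
                    .as_ref()
                    .map(|p| p.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|w| w.id)
            .collect()
    }

    fn main() {
        let workers = vec![
            Worker { id: 1, property: None }, // no property: schedulable by default
            Worker { id: 2, property: Some(Property { is_unschedulable: true }) },
            Worker { id: 3, property: Some(Property { is_unschedulable: false }) },
        ];
        assert_eq!(schedulable_ids(&workers), vec![1, 3]);
    }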