From b61886933f612daa4a6428606f4b336b3f6f738b Mon Sep 17 00:00:00 2001 From: August Date: Sun, 4 Feb 2024 12:02:23 +0800 Subject: [PATCH] feat(sql-backend): add worker id in actor table to support auto scale in and down in sql backend (#14958) --- .../migration/src/m20230908_072257_init.rs | 2 + src/meta/model_v2/src/actor.rs | 5 +- src/meta/src/barrier/mod.rs | 14 +-- src/meta/src/barrier/recovery.rs | 29 +++--- src/meta/src/controller/fragment.rs | 95 +++++++------------ src/meta/src/controller/streaming_job.rs | 5 +- src/meta/src/controller/utils.rs | 41 +------- 7 files changed, 65 insertions(+), 126 deletions(-) diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index 66e8db40418c3..63250bb46c7e7 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -390,6 +390,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Actor::Status).string().not_null()) .col(ColumnDef::new(Actor::Splits).json()) .col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null()) + .col(ColumnDef::new(Actor::WorkerId).integer().not_null()) .col(ColumnDef::new(Actor::UpstreamActorIds).json()) .col(ColumnDef::new(Actor::VnodeBitmap).json()) .col(ColumnDef::new(Actor::ExprContext).json().not_null()) @@ -966,6 +967,7 @@ enum Actor { Status, Splits, ParallelUnitId, + WorkerId, UpstreamActorIds, VnodeBitmap, ExprContext, diff --git a/src/meta/model_v2/src/actor.rs b/src/meta/model_v2/src/actor.rs index 1bfcf7183c4d8..2ebafd23a51ad 100644 --- a/src/meta/model_v2/src/actor.rs +++ b/src/meta/model_v2/src/actor.rs @@ -15,7 +15,9 @@ use risingwave_pb::meta::table_fragments::actor_status::PbActorState; use sea_orm::entity::prelude::*; -use crate::{ActorId, ActorUpstreamActors, ConnectorSplits, ExprContext, FragmentId, VnodeBitmap}; +use crate::{ + ActorId, ActorUpstreamActors, ConnectorSplits, ExprContext, FragmentId, VnodeBitmap, WorkerId, +}; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "String", db_type = "String(None)")] @@ -54,6 +56,7 @@ pub struct Model { pub status: ActorStatus, pub splits: Option, pub parallel_unit_id: i32, + pub worker_id: WorkerId, pub upstream_actor_ids: ActorUpstreamActors, pub vnode_bitmap: Option, pub expr_context: ExprContext, diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 8536a611aafaa..9de6df91fed39 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -920,7 +920,7 @@ impl GlobalBarrierManagerContext { /// Resolve actor information from cluster, fragment manager and `ChangedTableId`. /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor /// will create or drop before this barrier flow through them. 
- async fn resolve_actor_info(&self) -> InflightActorInfo { + async fn resolve_actor_info(&self) -> MetaResult { let info = match &self.metadata_manager { MetadataManager::V1(mgr) => { let all_nodes = mgr @@ -937,21 +937,13 @@ impl GlobalBarrierManagerContext { .list_active_streaming_workers() .await .unwrap(); - let pu_mappings = all_nodes - .iter() - .flat_map(|node| node.parallel_units.iter().map(|pu| (pu.id, pu.clone()))) - .collect(); - let all_actor_infos = mgr - .catalog_controller - .load_all_actors(&pu_mappings) - .await - .unwrap(); + let all_actor_infos = mgr.catalog_controller.load_all_actors().await?; InflightActorInfo::resolve(all_nodes, all_actor_infos) } }; - info + Ok(info) } pub async fn get_ddl_progress(&self) -> Vec { diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 61a35629f6c69..1ad0299c13b22 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -372,7 +372,9 @@ impl GlobalBarrierManagerContext { warn!(error = %err.as_report(), "scale actors failed"); })?; - self.resolve_actor_info().await + self.resolve_actor_info().await.inspect_err(|err| { + warn!(error = %err.as_report(), "resolve actor info failed"); + })? } else { // Migrate actors in expired CN to newly joined one. self.migrate_actors().await.inspect_err(|err| { @@ -386,7 +388,9 @@ impl GlobalBarrierManagerContext { })?; if self.pre_apply_drop_cancel(scheduled_barriers).await? { - info = self.resolve_actor_info().await; + info = self.resolve_actor_info().await.inspect_err(|err| { + warn!(error = %err.as_report(), "resolve actor info failed"); + })?; } // update and build all actors. @@ -504,8 +508,7 @@ impl GlobalBarrierManagerContext { .collect(); if expired_parallel_units.is_empty() { debug!("no expired parallel units, skipping."); - let info = self.resolve_actor_info().await; - return Ok(info); + return self.resolve_actor_info().await; } debug!("start migrate actors."); @@ -527,8 +530,7 @@ impl GlobalBarrierManagerContext { .list_active_parallel_units() .await? .into_iter() - .map(|pu| pu.id as i32) - .filter(|pu| !inuse_parallel_units.contains(pu)) + .filter(|pu| !inuse_parallel_units.contains(&(pu.id as i32))) .collect_vec(); if !new_parallel_units.is_empty() { debug!("new parallel units found: {:#?}", new_parallel_units); @@ -536,9 +538,9 @@ impl GlobalBarrierManagerContext { if let Some(from) = to_migrate_parallel_units.pop() { debug!( "plan to migrate from parallel unit {} to {}", - from, target_parallel_unit + from, target_parallel_unit.id ); - inuse_parallel_units.insert(target_parallel_unit); + inuse_parallel_units.insert(target_parallel_unit.id as i32); plan.insert(from, target_parallel_unit); } else { break 'discovery; @@ -556,15 +558,15 @@ impl GlobalBarrierManagerContext { mgr.catalog_controller.migrate_actors(plan).await?; debug!("migrate actors succeed."); - let info = self.resolve_actor_info().await; - Ok(info) + + self.resolve_actor_info().await } /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. async fn migrate_actors_v1(&self) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); - let info = self.resolve_actor_info().await; + let info = self.resolve_actor_info().await?; // 1. get expired workers. 
let expired_workers: HashSet = info @@ -588,8 +590,7 @@ impl GlobalBarrierManagerContext { migration_plan.delete(self.env.meta_store_checked()).await?; debug!("migrate actors succeed."); - let info = self.resolve_actor_info().await; - Ok(info) + self.resolve_actor_info().await } async fn scale_actors(&self) -> MetaResult<()> { @@ -690,7 +691,7 @@ impl GlobalBarrierManagerContext { } async fn scale_actors_v1(&self) -> MetaResult<()> { - let info = self.resolve_actor_info().await; + let info = self.resolve_actor_info().await?; let mgr = self.metadata_manager.as_v1_ref(); debug!("start resetting actors distribution"); diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index bf84fc41e2198..4f7fb469aec97 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -50,8 +50,8 @@ use sea_orm::{ use crate::controller::catalog::{CatalogController, CatalogControllerInner}; use crate::controller::utils::{ - find_stream_source, get_actor_dispatchers, get_parallel_unit_mapping, FragmentDesc, - PartialActorLocation, PartialFragmentStateTables, + find_stream_source, get_actor_dispatchers, FragmentDesc, PartialActorLocation, + PartialFragmentStateTables, }; use crate::manager::{ActorInfos, LocalNotification}; use crate::model::TableParallelism; @@ -238,12 +238,11 @@ impl CatalogController { ) })?; - let parallel_unit_id = status + let (worker_id, parallel_unit_id) = status .parallel_unit .as_ref() - .map(|parallel_unit| parallel_unit.id) - .expect("no parallel unit id found in actor_status") - as _; + .map(|pu| (pu.worker_node_id as WorkerId, pu.id as i32)) + .expect("no parallel unit id found in actor_status"); assert_eq!( pb_upstream_actor_id @@ -264,6 +263,7 @@ impl CatalogController { status: status.get_state().unwrap().into(), splits, parallel_unit_id, + worker_id, upstream_actor_ids: upstream_actors.into(), vnode_bitmap: pb_vnode_bitmap.map(VnodeBitmap), expr_context: ExprContext(pb_expr_context), @@ -311,7 +311,6 @@ impl CatalogController { Vec, HashMap>, )>, - parallel_units_map: &HashMap, parallelism: StreamingParallelism, ) -> MetaResult { let mut pb_fragments = HashMap::new(); @@ -320,7 +319,7 @@ impl CatalogController { for (fragment, actors, actor_dispatcher) in fragments { let (fragment, fragment_actor_status, fragment_actor_splits) = - Self::compose_fragment(fragment, actors, actor_dispatcher, parallel_units_map)?; + Self::compose_fragment(fragment, actors, actor_dispatcher)?; pb_fragments.insert(fragment.fragment_id, fragment); @@ -352,7 +351,6 @@ impl CatalogController { fragment: fragment::Model, actors: Vec, mut actor_dispatcher: HashMap>, - parallel_units_map: &HashMap, ) -> MetaResult<( PbFragment, HashMap, @@ -391,6 +389,7 @@ impl CatalogController { fragment_id, status, parallel_unit_id, + worker_id, splits, upstream_actor_ids, vnode_bitmap, @@ -434,12 +433,10 @@ impl CatalogController { pb_actor_status.insert( actor_id as _, PbActorStatus { - parallel_unit: Some( - parallel_units_map - .get(&(parallel_unit_id as _)) - .unwrap() - .clone(), - ), + parallel_unit: Some(PbParallelUnit { + id: parallel_unit_id as _, + worker_node_id: worker_id as _, + }), state: PbActorState::from(status) as _, }, ); @@ -631,13 +628,11 @@ impl CatalogController { fragment_info.push((fragment, actors, dispatcher_info)); } - let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?; Self::compose_table_fragments( job_id as _, job_info.job_status.into(), job_info.timezone.map(|tz| PbStreamContext { timezone: tz }), 
fragment_info, - ¶llel_units_map, job_info.parallelism.clone(), ) } @@ -710,7 +705,6 @@ impl CatalogController { pub async fn table_fragments(&self) -> MetaResult> { let inner = self.inner.read().await; let jobs = StreamingJob::find().all(&inner.db).await?; - let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?; let mut table_fragments = BTreeMap::new(); for job in jobs { let fragment_actors = Fragment::find() @@ -743,7 +737,6 @@ impl CatalogController { job.job_status.into(), job.timezone.map(|tz| PbStreamContext { timezone: tz }), fragment_info, - ¶llel_units_map, job.parallelism.clone(), )?, ); @@ -844,15 +837,12 @@ impl CatalogController { /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or /// collected - pub async fn load_all_actors( - &self, - parallel_units_map: &HashMap, - ) -> MetaResult { + pub async fn load_all_actors(&self) -> MetaResult { let inner = self.inner.read().await; - let actor_info: Vec<(ActorId, i32, i32)> = Actor::find() + let actor_info: Vec<(ActorId, WorkerId, i32)> = Actor::find() .select_only() .column(actor::Column::ActorId) - .column(actor::Column::ParallelUnitId) + .column(actor::Column::WorkerId) .column(fragment::Column::FragmentTypeMask) .join(JoinType::InnerJoin, actor::Relation::Fragment.def()) .filter(actor::Column::Status.eq(ActorStatus::Running)) @@ -863,19 +853,14 @@ impl CatalogController { let mut actor_maps = HashMap::new(); let mut barrier_inject_actor_maps = HashMap::new(); - for (actor_id, parallel_unit_id, type_mask) in actor_info { - // FIXME: since worker might have gone, it's not safe to unwrap here. - let worker_id = parallel_units_map - .get(&(parallel_unit_id as _)) - .unwrap() - .worker_node_id; + for (actor_id, worker_id, type_mask) in actor_info { actor_maps - .entry(worker_id) + .entry(worker_id as _) .or_insert_with(Vec::new) .push(actor_id as _); if Self::is_injectable(type_mask as _) { barrier_inject_actor_maps - .entry(worker_id) + .entry(worker_id as _) .or_insert_with(Vec::new) .push(actor_id as _); } @@ -887,14 +872,18 @@ impl CatalogController { }) } - pub async fn migrate_actors(&self, plan: HashMap) -> MetaResult<()> { + pub async fn migrate_actors(&self, plan: HashMap) -> MetaResult<()> { let inner = self.inner.read().await; let txn = inner.db.begin().await?; for (from_pu_id, to_pu_id) in &plan { Actor::update_many() .col_expr( actor::Column::ParallelUnitId, - Expr::value(Value::Int(Some(*to_pu_id))), + Expr::value(Value::Int(Some(to_pu_id.id as i32))), + ) + .col_expr( + actor::Column::WorkerId, + Expr::value(Value::Int(Some(to_pu_id.worker_node_id as WorkerId))), ) .filter(actor::Column::ParallelUnitId.eq(*from_pu_id)) .exec(&txn) @@ -913,7 +902,7 @@ impl CatalogController { for (fragment_id, vnode_mapping) in &mut fragment_mapping { vnode_mapping.0.data.iter_mut().for_each(|id| { if let Some(new_id) = plan.get(&(*id as i32)) { - *id = *new_id as u32; + *id = new_id.id; } }); fragment::ActiveModel { @@ -959,7 +948,6 @@ impl CatalogController { include_inactive: bool, ) -> MetaResult>> { let inner = self.inner.read().await; - let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?; let fragment_actors = if include_inactive { Fragment::find() .find_with_related(Actor) @@ -992,7 +980,7 @@ impl CatalogController { } let (table_fragments, actor_status, _) = - Self::compose_fragment(fragment, actors, dispatcher_info, ¶llel_units_map)?; + Self::compose_fragment(fragment, actors, dispatcher_info)?; for actor in table_fragments.actors { let node_id = 
actor_status[&actor.actor_id] .get_parallel_unit() @@ -1013,10 +1001,9 @@ impl CatalogController { job_ids: Vec, ) -> MetaResult>> { let inner = self.inner.read().await; - let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?; - let actor_pu: Vec<(ActorId, i32)> = Actor::find() + let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find() .select_only() - .columns([actor::Column::ActorId, actor::Column::ParallelUnitId]) + .columns([actor::Column::ActorId, actor::Column::WorkerId]) .join(JoinType::InnerJoin, actor::Relation::Fragment.def()) .filter(fragment::Column::JobId.is_in(job_ids)) .into_tuple() @@ -1024,11 +1011,7 @@ impl CatalogController { .await?; let mut worker_actors = BTreeMap::new(); - for (actor_id, pu_id) in actor_pu { - let worker_id = parallel_units_map - .get(&(pu_id as _)) - .unwrap() - .worker_node_id as WorkerId; + for (actor_id, worker_id) in actor_workers { worker_actors .entry(worker_id) .or_insert_with(Vec::new) @@ -1123,7 +1106,6 @@ impl CatalogController { } } - let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?; let mut root_fragments = HashMap::new(); for (_, fragment) in fragments { let actors = fragment.find_related(Actor).all(&inner.db).await?; @@ -1135,7 +1117,7 @@ impl CatalogController { root_fragments.insert( fragment.job_id, - Self::compose_fragment(fragment, actors, actor_dispatchers, ¶llel_units_map)?.0, + Self::compose_fragment(fragment, actors, actor_dispatchers)?.0, ); } @@ -1163,7 +1145,6 @@ impl CatalogController { .collect(); let inner = self.inner.read().await; - let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?; let mut chain_fragments = vec![]; for (fragment_id, dispatch_strategy) in downstream_dispatches { let mut fragment_actors = Fragment::find_by_id(fragment_id) @@ -1180,8 +1161,7 @@ impl CatalogController { actors.iter().map(|actor| actor.actor_id).collect(), ) .await?; - let fragment = - Self::compose_fragment(fragment, actors, actor_dispatchers, ¶llel_units_map)?.0; + let fragment = Self::compose_fragment(fragment, actors, actor_dispatchers)?.0; chain_fragments.push((dispatch_strategy, fragment)); } @@ -1271,7 +1251,6 @@ impl CatalogController { } assert_eq!(fragments.len(), 1); - let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?; let fragment = fragments.pop().unwrap(); let actor_with_dispatchers = fragment .find_related(Actor) @@ -1285,7 +1264,7 @@ impl CatalogController { actors.push(actor); } - Ok(Self::compose_fragment(fragment, actors, actor_dispatchers, ¶llel_units_map)?.0) + Ok(Self::compose_fragment(fragment, actors, actor_dispatchers)?.0) } } @@ -1332,7 +1311,7 @@ mod tests { (0..count) .map(|parallel_unit_id| ParallelUnit { id: parallel_unit_id, - ..Default::default() + worker_node_id: 0, }) .collect_vec() } @@ -1509,11 +1488,6 @@ mod tests { }) .collect(); - let parallel_units_map = parallel_units - .iter() - .map(|parallel_unit| (parallel_unit.id, parallel_unit.clone())) - .collect(); - let actors = (0..actor_count) .map(|actor_id| { let parallel_unit_id = actor_id as ParallelUnitId; @@ -1538,6 +1512,7 @@ mod tests { status: ActorStatus::Running, splits: actor_splits, parallel_unit_id: parallel_unit_id as i32, + worker_id: 0, upstream_actor_ids: ActorUpstreamActors(actor_upstream_actor_ids), vnode_bitmap, expr_context: ExprContext(PbExprContext { @@ -1588,7 +1563,6 @@ mod tests { fragment.clone(), actors.clone(), actor_dispatchers.clone(), - ¶llel_units_map, ) .unwrap(); @@ -1625,6 +1599,7 @@ mod tests { status, splits, parallel_unit_id, + worker_id: _, 
upstream_actor_ids, vnode_bitmap, expr_context, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index a6273cf12067e..86a527c6a0eb0 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -992,7 +992,7 @@ impl CatalogController { ); let actor_upstreams = ActorUpstreamActors(actor_upstreams); - let parallel_unit_id = parallel_unit.unwrap().id; + let parallel_unit = parallel_unit.unwrap(); let splits = actor_splits .get(&actor_id) @@ -1003,7 +1003,8 @@ impl CatalogController { fragment_id: Set(fragment_id as _), status: Set(ActorStatus::Running), splits: Set(splits.map(|splits| PbConnectorSplits { splits }.into())), - parallel_unit_id: Set(parallel_unit_id as _), + parallel_unit_id: Set(parallel_unit.id as _), + worker_id: Set(parallel_unit.worker_node_id as _), upstream_actor_ids: Set(actor_upstreams), vnode_bitmap: Set(vnode_bitmap.map(|bitmap| bitmap.into())), expr_context: Set(expr_context.unwrap().into()), diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index e5c6044271e31..53ac3c9616e28 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -23,12 +23,11 @@ use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, - object_dependency, schema, sink, source, table, user, user_privilege, view, worker_property, - ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, - PrivilegeId, SchemaId, SourceId, StreamNode, UserId, WorkerId, + object_dependency, schema, sink, source, table, user, user_privilege, view, ActorId, + DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId, + SchemaId, SourceId, StreamNode, UserId, }; use risingwave_pb::catalog::{PbConnection, PbFunction}; -use risingwave_pb::common::PbParallelUnit; use risingwave_pb::meta::PbFragmentParallelUnitMapping; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource}; @@ -613,40 +612,6 @@ pub fn extract_grant_obj_id(object: &PbObject) -> ObjectId { } } -// todo: deprecate parallel units and avoid this query. -pub async fn get_parallel_unit_mapping(db: &C) -> MetaResult> -where - C: ConnectionTrait, -{ - let parallel_units: Vec<(WorkerId, I32Array)> = WorkerProperty::find() - .select_only() - .columns([ - worker_property::Column::WorkerId, - worker_property::Column::ParallelUnitIds, - ]) - .into_tuple() - .all(db) - .await?; - let parallel_units_map = parallel_units - .into_iter() - .flat_map(|(worker_id, parallel_unit_ids)| { - parallel_unit_ids - .into_inner() - .into_iter() - .map(move |parallel_unit_id| { - ( - parallel_unit_id as _, - PbParallelUnit { - id: parallel_unit_id as _, - worker_node_id: worker_id as _, - }, - ) - }) - }) - .collect(); - Ok(parallel_units_map) -} - pub async fn get_actor_dispatchers( db: &C, actor_ids: Vec,
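
For context on the core change in this patch: once `worker_id` is stored on each `actor` row, `load_all_actors` can bucket running actors per worker straight from the query result, with no lookup through a parallel-unit-to-worker mapping. Below is a minimal, self-contained sketch of that grouping step only; it is illustrative rather than the actual SeaORM code, it uses plain `u32`/`i32` in place of the model types, and `BARRIER_INJECT_MASK` is a hypothetical flag standing in for the real `CatalogController::is_injectable` check on the fragment type mask.

use std::collections::HashMap;

// Hypothetical flag value for illustration only; the real predicate is
// `CatalogController::is_injectable(fragment_type_mask)`.
const BARRIER_INJECT_MASK: i32 = 1 << 0;

/// Group running actors by the worker they live on, mirroring the loop in
/// `load_all_actors` after this patch: the `(actor_id, worker_id, type_mask)`
/// tuples come directly from the `actor` table joined with `fragment`.
fn group_actors_by_worker(
    actor_info: Vec<(u32, u32, i32)>,
) -> (HashMap<u32, Vec<u32>>, HashMap<u32, Vec<u32>>) {
    let mut actor_maps: HashMap<u32, Vec<u32>> = HashMap::new();
    let mut barrier_inject: HashMap<u32, Vec<u32>> = HashMap::new();
    for (actor_id, worker_id, type_mask) in actor_info {
        // Every running actor is collected per worker.
        actor_maps.entry(worker_id).or_default().push(actor_id);
        // Barrier-injectable actors (e.g. sources) are additionally tracked.
        if (type_mask & BARRIER_INJECT_MASK) != 0 {
            barrier_inject.entry(worker_id).or_default().push(actor_id);
        }
    }
    (actor_maps, barrier_inject)
}

fn main() {
    // Two plain actors on worker 1, one barrier-injectable actor on worker 2.
    let rows = vec![(1, 1, 0), (2, 1, 0), (3, 2, BARRIER_INJECT_MASK)];
    let (actors, inject) = group_actors_by_worker(rows);
    assert_eq!(actors[&1], vec![1, 2]);
    assert_eq!(inject[&2], vec![3]);
    println!("actors per worker: {actors:?}, injectable: {inject:?}");
}

Denormalizing the worker id onto the actor row is also what lets `migrate_actors` rewrite `parallel_unit_id` and `worker_id` together in a single `UPDATE`, and what makes the `get_parallel_unit_mapping` query against `worker_property` (removed in `utils.rs` above) unnecessary on every `resolve_actor_info` call.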