Skip to content

Commit

Permalink
feat(sql-backend): add worker id in actor table to support auto scale…
Browse files Browse the repository at this point in the history
… in and down in sql backend (#14958)
  • Loading branch information
yezizp2012 authored Feb 4, 2024
1 parent 9e90eeb commit b618869
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 126 deletions.
2 changes: 2 additions & 0 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Actor::Status).string().not_null())
.col(ColumnDef::new(Actor::Splits).json())
.col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
.col(ColumnDef::new(Actor::WorkerId).integer().not_null())
.col(ColumnDef::new(Actor::UpstreamActorIds).json())
.col(ColumnDef::new(Actor::VnodeBitmap).json())
.col(ColumnDef::new(Actor::ExprContext).json().not_null())
Expand Down Expand Up @@ -966,6 +967,7 @@ enum Actor {
Status,
Splits,
ParallelUnitId,
WorkerId,
UpstreamActorIds,
VnodeBitmap,
ExprContext,
Expand Down
5 changes: 4 additions & 1 deletion src/meta/model_v2/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use sea_orm::entity::prelude::*;

use crate::{ActorId, ActorUpstreamActors, ConnectorSplits, ExprContext, FragmentId, VnodeBitmap};
use crate::{
ActorId, ActorUpstreamActors, ConnectorSplits, ExprContext, FragmentId, VnodeBitmap, WorkerId,
};

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
Expand Down Expand Up @@ -54,6 +56,7 @@ pub struct Model {
pub status: ActorStatus,
pub splits: Option<ConnectorSplits>,
pub parallel_unit_id: i32,
pub worker_id: WorkerId,
pub upstream_actor_ids: ActorUpstreamActors,
pub vnode_bitmap: Option<VnodeBitmap>,
pub expr_context: ExprContext,
Expand Down
14 changes: 3 additions & 11 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ impl GlobalBarrierManagerContext {
/// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
/// We use `changed_table_id` to modify the actors to be sent or collected, because these actors
/// will be created or dropped before this barrier flows through them.
async fn resolve_actor_info(&self) -> InflightActorInfo {
async fn resolve_actor_info(&self) -> MetaResult<InflightActorInfo> {
let info = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let all_nodes = mgr
Expand All @@ -937,21 +937,13 @@ impl GlobalBarrierManagerContext {
.list_active_streaming_workers()
.await
.unwrap();
let pu_mappings = all_nodes
.iter()
.flat_map(|node| node.parallel_units.iter().map(|pu| (pu.id, pu.clone())))
.collect();
let all_actor_infos = mgr
.catalog_controller
.load_all_actors(&pu_mappings)
.await
.unwrap();
let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;

InflightActorInfo::resolve(all_nodes, all_actor_infos)
}
};

info
Ok(info)
}

pub async fn get_ddl_progress(&self) -> Vec<DdlProgress> {
Expand Down
29 changes: 15 additions & 14 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,9 @@ impl GlobalBarrierManagerContext {
warn!(error = %err.as_report(), "scale actors failed");
})?;

self.resolve_actor_info().await
self.resolve_actor_info().await.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?
} else {
// Migrate actors in expired CN to newly joined one.
self.migrate_actors().await.inspect_err(|err| {
Expand All @@ -386,7 +388,9 @@ impl GlobalBarrierManagerContext {
})?;

if self.pre_apply_drop_cancel(scheduled_barriers).await? {
info = self.resolve_actor_info().await;
info = self.resolve_actor_info().await.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?;
}

// update and build all actors.
Expand Down Expand Up @@ -504,8 +508,7 @@ impl GlobalBarrierManagerContext {
.collect();
if expired_parallel_units.is_empty() {
debug!("no expired parallel units, skipping.");
let info = self.resolve_actor_info().await;
return Ok(info);
return self.resolve_actor_info().await;
}

debug!("start migrate actors.");
Expand All @@ -527,18 +530,17 @@ impl GlobalBarrierManagerContext {
.list_active_parallel_units()
.await?
.into_iter()
.map(|pu| pu.id as i32)
.filter(|pu| !inuse_parallel_units.contains(pu))
.filter(|pu| !inuse_parallel_units.contains(&(pu.id as i32)))
.collect_vec();
if !new_parallel_units.is_empty() {
debug!("new parallel units found: {:#?}", new_parallel_units);
for target_parallel_unit in new_parallel_units {
if let Some(from) = to_migrate_parallel_units.pop() {
debug!(
"plan to migrate from parallel unit {} to {}",
from, target_parallel_unit
from, target_parallel_unit.id
);
inuse_parallel_units.insert(target_parallel_unit);
inuse_parallel_units.insert(target_parallel_unit.id as i32);
plan.insert(from, target_parallel_unit);
} else {
break 'discovery;
Expand All @@ -556,15 +558,15 @@ impl GlobalBarrierManagerContext {
mgr.catalog_controller.migrate_actors(plan).await?;

debug!("migrate actors succeed.");
let info = self.resolve_actor_info().await;
Ok(info)

self.resolve_actor_info().await
}

/// Migrate actors in expired CNs to newly joined ones, return the actor info resolved after the migration.
async fn migrate_actors_v1(&self) -> MetaResult<InflightActorInfo> {
let mgr = self.metadata_manager.as_v1_ref();

let info = self.resolve_actor_info().await;
let info = self.resolve_actor_info().await?;

// 1. get expired workers.
let expired_workers: HashSet<WorkerId> = info
Expand All @@ -588,8 +590,7 @@ impl GlobalBarrierManagerContext {
migration_plan.delete(self.env.meta_store_checked()).await?;
debug!("migrate actors succeed.");

let info = self.resolve_actor_info().await;
Ok(info)
self.resolve_actor_info().await
}

async fn scale_actors(&self) -> MetaResult<()> {
Expand Down Expand Up @@ -690,7 +691,7 @@ impl GlobalBarrierManagerContext {
}

async fn scale_actors_v1(&self) -> MetaResult<()> {
let info = self.resolve_actor_info().await;
let info = self.resolve_actor_info().await?;

let mgr = self.metadata_manager.as_v1_ref();
debug!("start resetting actors distribution");
Expand Down
Loading

0 comments on commit b618869

Please sign in to comment.