feat: Introduce table resize plan for scaling. (#14235)
Signed-off-by: Shanicky Chen <[email protected]>
Co-authored-by: August <[email protected]>
shanicky and yezizp2012 authored Dec 28, 2023
1 parent ee6c1e9 commit 680e398
Showing 14 changed files with 814 additions and 146 deletions.
17 changes: 17 additions & 0 deletions proto/meta.proto
@@ -98,6 +98,7 @@ message TableFragments {
map<uint32, source.ConnectorSplits> actor_splits = 5;

stream_plan.StreamContext ctx = 6;
TableParallelism parallelism = 7;
}

/// Parallel unit mapping with fragment id, used for notification.
@@ -461,6 +462,22 @@ message RescheduleResponse {
uint64 revision = 2;
}

message TableParallelism {
message FixedParallelism {
uint32 parallelism = 1;
}

message AutoParallelism {}

message CustomParallelism {}

oneof parallelism {
FixedParallelism fixed = 1;
AutoParallelism auto = 2;
CustomParallelism custom = 3;
}
}

message GetReschedulePlanRequest {
uint64 revision = 1;

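
For reference, the new oneof can be populated from Rust through the generated prost types. A minimal sketch (the `Pb*` aliases below match the imports added in `src/meta/src/model/stream.rs` in this commit; the helper function itself is illustrative):

    use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
    use risingwave_pb::meta::PbTableParallelism;

    /// Build a protobuf `TableParallelism` that pins a table to a fixed parallelism.
    fn fixed_parallelism(n: u32) -> PbTableParallelism {
        PbTableParallelism {
            parallelism: Some(PbParallelism::Fixed(PbFixedParallelism { parallelism: n })),
        }
    }

`Auto` and `Custom` carry no payload and are built the same way, with `PbParallelism::Auto(PbAutoParallelism {})` and `PbParallelism::Custom(PbCustomParallelism {})`.
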
5 changes: 5 additions & 0 deletions src/common/src/hash/consistent_hash/mapping.rs
@@ -300,6 +300,11 @@ impl ParallelUnitMapping {
Self::new_uniform(parallel_units.iter().map(|pu| pu.id))
}

/// Create a uniform parallel unit mapping from the given parallel unit ids.
pub fn build_from_ids(parallel_unit_ids: &[ParallelUnitId]) -> Self {
Self::new_uniform(parallel_unit_ids.iter().cloned())
}

/// Transform this parallel unit mapping to an actor mapping, essentially `transform`.
pub fn to_actor(&self, to_map: &HashMap<ParallelUnitId, ActorId>) -> ActorMapping {
self.transform(to_map)
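
A quick usage sketch for the new helper (assuming `ParallelUnitMapping` is re-exported at `risingwave_common::hash`; the function name `demo_mapping` is illustrative):

    use risingwave_common::hash::ParallelUnitMapping;

    /// Uniformly distribute virtual nodes across parallel units 1, 2 and 3
    /// without materializing full `ParallelUnit` structs first.
    fn demo_mapping() -> ParallelUnitMapping {
        ParallelUnitMapping::build_from_ids(&[1, 2, 3])
    }
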
19 changes: 19 additions & 0 deletions src/meta/service/src/scale_service.rs
@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{ScaleController, ScaleControllerRef};
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
@@ -145,6 +147,22 @@ impl ScaleService for ScaleServiceImpl {
}));
}

let table_parallelisms = {
let guard = self.fragment_manager.get_fragment_read_guard().await;

let mut table_parallelisms = HashMap::new();
for (table_id, table) in guard.table_fragments() {
if table
.fragment_ids()
.any(|fragment_id| reschedules.contains_key(&fragment_id))
{
table_parallelisms.insert(*table_id, TableParallelism::Custom);
}
}

table_parallelisms
};

self.stream_manager
.reschedule_actors(
reschedules
@@ -170,6 +188,7 @@
RescheduleOptions {
resolve_no_shuffle_upstream,
},
Some(table_parallelisms),
)
.await?;

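
The intent of the block above: any table that owns at least one manually rescheduled fragment is marked `TableParallelism::Custom`, presumably so that later automatic resizing does not override the manual layout. A self-contained sketch of that marking step, with ids reduced to plain `u32` and a local stand-in enum instead of `risingwave_meta::model::TableParallelism`:

    use std::collections::{HashMap, HashSet};

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum TableParallelism {
        Auto,
        Fixed(usize),
        Custom,
    }

    /// Mark every table that owns at least one rescheduled fragment as `Custom`.
    fn mark_custom(
        table_to_fragments: &HashMap<u32, HashSet<u32>>, // table_id -> fragment_ids
        rescheduled_fragments: &HashSet<u32>,
    ) -> HashMap<u32, TableParallelism> {
        table_to_fragments
            .iter()
            .filter(|(_, fragments)| !fragments.is_disjoint(rescheduled_fragments))
            .map(|(table_id, _)| (*table_id, TableParallelism::Custom))
            .collect()
    }
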
10 changes: 7 additions & 3 deletions src/meta/src/barrier/command.rs
@@ -41,7 +41,7 @@ use super::trace::TracedEpoch;
use crate::barrier::CommandChanges;
use crate::hummock::HummockManagerRef;
use crate::manager::{CatalogManagerRef, DdlType, FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{
build_actor_connector_splits, ScaleControllerRef, SourceManagerRef, SplitAssignment,
ThrottleConfig,
@@ -146,6 +146,7 @@ pub enum Command {
/// very similar to `Create` and `Drop` commands, for added and removed actors, respectively.
RescheduleFragment {
reschedules: HashMap<FragmentId, Reschedule>,
table_parallelism: HashMap<TableId, TableParallelism>,
},

/// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is
@@ -891,10 +892,13 @@ impl CommandContext {
.await;
}

Command::RescheduleFragment { reschedules } => {
Command::RescheduleFragment {
reschedules,
table_parallelism,
} => {
let node_dropped_actors = self
.scale_controller
.post_apply_reschedule(reschedules)
.post_apply_reschedule(reschedules, table_parallelism)
.await?;
self.clean_up(node_dropped_actors).await?;
}
61 changes: 31 additions & 30 deletions src/meta/src/barrier/recovery.rs
@@ -23,7 +23,6 @@ use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::get_reschedule_plan_request::{PbWorkerChanges, StableResizePolicy};
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
@@ -44,7 +43,7 @@ use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager};
use crate::manager::WorkerId;
use crate::model::{BarrierManagerState, MigrationPlan};
use crate::stream::{build_actor_connector_splits, RescheduleOptions};
use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
use crate::MetaResult;

impl GlobalBarrierManager {
@@ -416,38 +415,39 @@ impl GlobalBarrierManager {
return Ok(false);
}

let all_worker_parallel_units = self.fragment_manager.all_worker_parallel_units().await;

let expired_worker_parallel_units: HashMap<_, _> = all_worker_parallel_units
.into_iter()
.filter(|(worker, _)| expired_workers.contains(worker))
.collect();

let fragment_worker_changes = {
let table_parallelisms = {
let guard = self.fragment_manager.get_fragment_read_guard().await;
let mut policy = HashMap::new();
for table_fragments in guard.table_fragments().values() {
for fragment_id in table_fragments.fragment_ids() {
policy.insert(
fragment_id,
PbWorkerChanges {
exclude_worker_ids: expired_workers.iter().cloned().collect(),
..Default::default()
},
);
}
}
policy

guard
.table_fragments()
.iter()
.map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism))
.collect()
};

let workers = self
.cluster_manager
.list_active_streaming_compute_nodes()
.await;

let schedulable_worker_ids = workers
.iter()
.filter(|worker| {
!worker
.property
.as_ref()
.map(|p| p.is_unschedulable)
.unwrap_or(false)
})
.map(|worker| worker.id)
.collect();

let plan = self
.scale_controller
.generate_stable_resize_plan(
StableResizePolicy {
fragment_worker_changes,
},
Some(expired_worker_parallel_units),
)
.generate_table_resize_plan(TableResizePolicy {
worker_ids: schedulable_worker_ids,
table_parallelisms,
})
.await?;

let (reschedule_fragment, applied_reschedules) = self
@@ -457,12 +457,13 @@
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
None,
)
.await?;

if let Err(e) = self
.scale_controller
.post_apply_reschedule(&reschedule_fragment)
.post_apply_reschedule(&reschedule_fragment, &Default::default())
.await
{
tracing::error!(
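
During recovery, the resize plan is now generated per table: the policy pairs the schedulable workers (those not flagged `is_unschedulable`) with each table's `assigned_parallelism` and hands both to `generate_table_resize_plan`. A reduced sketch of the worker filter above, with simplified stand-ins for the protobuf `WorkerNode`/`Property` types:

    #[derive(Default)]
    struct Property {
        is_unschedulable: bool,
    }

    struct WorkerNode {
        id: u32,
        property: Option<Property>,
    }

    /// Keep only workers that are allowed to receive newly scheduled actors.
    fn schedulable_worker_ids(workers: &[WorkerNode]) -> Vec<u32> {
        workers
            .iter()
            .filter(|w| {
                !w.property
                    .as_ref()
                    .map(|p| p.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|w| w.id)
            .collect()
    }
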
2 changes: 2 additions & 0 deletions src/meta/src/controller/fragment.rs
@@ -331,6 +331,8 @@ impl CatalogController {
actor_status: pb_actor_status,
actor_splits: pb_actor_splits,
ctx: Some(ctx.unwrap_or_default()),
// TODO(peng): fix this for model v2
parallelism: None,
};

Ok(table_fragments)
13 changes: 10 additions & 3 deletions src/meta/src/manager/catalog/fragment.rs
@@ -42,7 +42,7 @@ use crate::manager::cluster::WorkerId;
use crate::manager::{commit_meta, commit_meta_with_trx, LocalNotification, MetaSrvEnv};
use crate::model::{
ActorId, BTreeMapTransaction, FragmentId, MetadataModel, MigrationPlan, TableFragments,
ValTransaction,
TableParallelism, ValTransaction,
};
use crate::storage::Transaction;
use crate::stream::{SplitAssignment, TableRevision};
@@ -1111,6 +1111,7 @@ impl FragmentManager {
pub async fn post_apply_reschedules(
&self,
mut reschedules: HashMap<FragmentId, Reschedule>,
table_parallelism_assignment: HashMap<TableId, TableParallelism>,
) -> MetaResult<()> {
let mut guard = self.core.write().await;
let current_version = guard.table_revision;
@@ -1178,6 +1179,8 @@
})
.collect_vec();

let mut table_fragment = table_fragments.get_mut(table_id).unwrap();

for (fragment_id, reschedule) in reschedules {
let Reschedule {
added_actors,
@@ -1189,8 +1192,6 @@
actor_splits,
} = reschedule;

let mut table_fragment = table_fragments.get_mut(table_id).unwrap();

// First step, update self fragment
// Add actors to this fragment: set the state to `Running`.
for actor_id in &added_actors {
@@ -1328,6 +1329,12 @@
}
}

for (table_id, parallelism) in table_parallelism_assignment {
if let Some(mut table) = table_fragments.get_mut(table_id) {
table.assigned_parallelism = parallelism;
}
}

assert!(reschedules.is_empty(), "all reschedules must be applied");

// new empty transaction
56 changes: 55 additions & 1 deletion src/meta/src/model/stream.rs
@@ -22,7 +22,11 @@ use risingwave_connector::source::SplitImpl;
use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::meta::table_parallelism::{
FixedParallelism, Parallelism, PbAutoParallelism, PbCustomParallelism, PbFixedParallelism,
PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
@@ -37,6 +41,43 @@ use crate::stream::{build_actor_connector_splits, build_actor_split_impls, Split
/// Column family name for table fragments.
const TABLE_FRAGMENTS_CF_NAME: &str = "cf/table_fragments";

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
Auto,
Fixed(usize),
Custom,
}

impl From<PbTableParallelism> for TableParallelism {
fn from(value: PbTableParallelism) -> Self {
use Parallelism::*;
match &value.parallelism {
Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
Some(Auto(_)) => Self::Auto,
Some(Custom(_)) => Self::Custom,
_ => Self::Auto,
}
}
}

impl From<TableParallelism> for PbTableParallelism {
fn from(value: TableParallelism) -> Self {
use TableParallelism::*;

let parallelism = match value {
Auto => PbParallelism::Auto(PbAutoParallelism {}),
Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
parallelism: n as u32,
}),
Custom => PbParallelism::Custom(PbCustomParallelism {}),
};

Self {
parallelism: Some(parallelism),
}
}
}

/// Fragments of a streaming job.
///
/// We store whole fragments in a single column family as follow:
Expand All @@ -60,6 +101,9 @@ pub struct TableFragments {

/// The streaming context associated with this stream plan and its fragments
pub ctx: StreamContext,

/// The parallelism assigned to this table's fragments
pub assigned_parallelism: TableParallelism,
}

#[derive(Debug, Clone, Default)]
@@ -109,18 +153,25 @@ impl MetadataModel for TableFragments {
actor_status: self.actor_status.clone().into_iter().collect(),
actor_splits: build_actor_connector_splits(&self.actor_splits),
ctx: Some(self.ctx.to_protobuf()),
parallelism: Some(self.assigned_parallelism.into()),
}
}

fn from_protobuf(prost: Self::PbType) -> Self {
let ctx = StreamContext::from_protobuf(prost.get_ctx().unwrap());

let default_parallelism = PbTableParallelism {
parallelism: Some(Parallelism::Custom(PbCustomParallelism {})),
};

Self {
table_id: TableId::new(prost.table_id),
state: prost.state(),
fragments: prost.fragments.into_iter().collect(),
actor_status: prost.actor_status.into_iter().collect(),
actor_splits: build_actor_split_impls(&prost.actor_splits),
ctx,
assigned_parallelism: prost.parallelism.unwrap_or(default_parallelism).into(),
}
}

@@ -137,6 +188,7 @@ impl TableFragments {
fragments,
&BTreeMap::new(),
StreamContext::default(),
TableParallelism::Auto,
)
}

@@ -147,6 +199,7 @@
fragments: BTreeMap<FragmentId, Fragment>,
actor_locations: &BTreeMap<ActorId, ParallelUnit>,
ctx: StreamContext,
table_parallelism: TableParallelism,
) -> Self {
let actor_status = actor_locations
.iter()
@@ -168,6 +221,7 @@
actor_status,
actor_splits: HashMap::default(),
ctx,
assigned_parallelism: table_parallelism,
}
}

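
Two behaviors of the new model type are worth calling out: the `From` impls round-trip `Fixed`/`Auto`/`Custom`, and `from_protobuf` falls back to `Custom` (not `Auto`) when the persisted `parallelism` field is absent, presumably so that pre-upgrade table fragments are not rescaled automatically. A small sketch of the conversions (paths match the imports used in this commit; `demo` is illustrative):

    use risingwave_meta::model::TableParallelism;
    use risingwave_pb::meta::PbTableParallelism;

    fn demo() {
        // Model -> protobuf -> model keeps the variant.
        let original = TableParallelism::Fixed(4);
        let pb: PbTableParallelism = original.into();
        let back: TableParallelism = pb.into();
        assert_eq!(original, back);

        // An unset oneof maps to `Auto` under the `From` impl, while
        // `from_protobuf` substitutes `Custom` when the whole field is missing.
        let unset = PbTableParallelism { parallelism: None };
        assert_eq!(TableParallelism::from(unset), TableParallelism::Auto);
    }
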
