feat: Introduce table resize plan for scaling. #14235

Merged · 4 commits · Dec 28, 2023
17 changes: 17 additions & 0 deletions proto/meta.proto
@@ -98,6 +98,7 @@ message TableFragments {
map<uint32, source.ConnectorSplits> actor_splits = 5;

stream_plan.StreamContext ctx = 6;
TableParallelism parallelism = 7;
}

/// Parallel unit mapping with fragment id, used for notification.
@@ -461,6 +462,22 @@ message RescheduleResponse {
uint64 revision = 2;
}

message TableParallelism {
message FixedParallelism {
uint32 parallelism = 1;
}

message AutoParallelism {}

message CustomParallelism {}

oneof parallelism {
FixedParallelism fixed = 1;
AutoParallelism auto = 2;
Comment on lines +475 to +476 (Member):

Would you mind adding some comments to describe them? For example, the respective behavior when auto scaling is on/off.

CustomParallelism custom = 3;
}
}

message GetReschedulePlanRequest {
uint64 revision = 1;

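As a rough illustration of how this oneof is populated from Rust, here is a minimal sketch using the prost-generated bindings that appear later in this PR (`PbTableParallelism` and friends); the helper name and the behavioral comments are my own reading, not wording from the PR:

use risingwave_pb::meta::table_parallelism::{
    PbAutoParallelism, PbCustomParallelism, PbFixedParallelism, PbParallelism,
};
use risingwave_pb::meta::PbTableParallelism;

// Hypothetical helper showing one value per variant.
fn sketch_parallelism_variants() -> [PbTableParallelism; 3] {
    // Pin the job to exactly 4 parallel units.
    let fixed = PbTableParallelism {
        parallelism: Some(PbParallelism::Fixed(PbFixedParallelism { parallelism: 4 })),
    };
    // Let the meta node choose and adjust the parallelism.
    let auto = PbTableParallelism {
        parallelism: Some(PbParallelism::Auto(PbAutoParallelism {})),
    };
    // Parallelism was shaped by a manual reschedule; leave it untouched.
    let custom = PbTableParallelism {
        parallelism: Some(PbParallelism::Custom(PbCustomParallelism {})),
    };
    [fixed, auto, custom]
}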
5 changes: 5 additions & 0 deletions src/common/src/hash/consistent_hash/mapping.rs
@@ -300,6 +300,11 @@ impl ParallelUnitMapping {
Self::new_uniform(parallel_units.iter().map(|pu| pu.id))
}

/// Create a uniform parallel unit mapping from the given parallel unit ids
pub fn build_from_ids(parallel_unit_ids: &[ParallelUnitId]) -> Self {
Self::new_uniform(parallel_unit_ids.iter().cloned())
}

/// Transform this parallel unit mapping to an actor mapping, essentially `transform`.
pub fn to_actor(&self, to_map: &HashMap<ParallelUnitId, ActorId>) -> ActorMapping {
self.transform(to_map)
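A quick usage sketch of the new helper; the import path and the id values below are assumptions for illustration only:

use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};

// Build a uniform vnode mapping directly from parallel unit ids,
// without materializing full `ParallelUnit` structs first.
fn sketch_mapping() -> ParallelUnitMapping {
    let ids: Vec<ParallelUnitId> = vec![0, 1, 2, 3];
    ParallelUnitMapping::build_from_ids(&ids)
}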
19 changes: 19 additions & 0 deletions src/meta/service/src/scale_service.rs
@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{ScaleController, ScaleControllerRef};
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
@@ -145,6 +147,22 @@ impl ScaleService for ScaleServiceImpl {
}));
}

let table_parallelisms = {
let guard = self.fragment_manager.get_fragment_read_guard().await;

let mut table_parallelisms = HashMap::new();
for (table_id, table) in guard.table_fragments() {
if table
.fragment_ids()
.any(|fragment_id| reschedules.contains_key(&fragment_id))
{
table_parallelisms.insert(*table_id, TableParallelism::Custom);
}
}

table_parallelisms
};

self.stream_manager
.reschedule_actors(
reschedules
@@ -170,6 +188,7 @@
RescheduleOptions {
resolve_no_shuffle_upstream,
},
Some(table_parallelisms),
)
.await?;

10 changes: 7 additions & 3 deletions src/meta/src/barrier/command.rs
@@ -41,7 +41,7 @@ use super::trace::TracedEpoch;
use crate::barrier::CommandChanges;
use crate::hummock::HummockManagerRef;
use crate::manager::{CatalogManagerRef, DdlType, FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{
build_actor_connector_splits, ScaleControllerRef, SourceManagerRef, SplitAssignment,
ThrottleConfig,
@@ -146,6 +147,7 @@ pub enum Command {
/// very similar to `Create` and `Drop` commands, for added and removed actors, respectively.
RescheduleFragment {
reschedules: HashMap<FragmentId, Reschedule>,
table_parallelism: HashMap<TableId, TableParallelism>,
},

/// `ReplaceTable` command generates an `Update` barrier with the given `merge_updates`. This is
@@ -891,10 +892,13 @@ impl CommandContext {
.await;
}

Command::RescheduleFragment { reschedules } => {
Command::RescheduleFragment {
reschedules,
table_parallelism,
} => {
let node_dropped_actors = self
.scale_controller
.post_apply_reschedule(reschedules)
.post_apply_reschedule(reschedules, table_parallelism)
.await?;
self.clean_up(node_dropped_actors).await?;
}
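For context, a minimal sketch of how the extended variant might be constructed; the empty reschedule map and the table id are placeholders, and real callers pass the reschedules computed by the scale controller:

use std::collections::HashMap;

use risingwave_common::catalog::TableId;

use crate::barrier::Command;
use crate::model::TableParallelism;

// Hypothetical: a reschedule command that also re-tags table 1 as `Custom`.
fn sketch_reschedule_command() -> Command {
    Command::RescheduleFragment {
        reschedules: HashMap::new(),
        table_parallelism: HashMap::from([(TableId::new(1), TableParallelism::Custom)]),
    }
}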
61 changes: 31 additions & 30 deletions src/meta/src/barrier/recovery.rs
@@ -23,7 +23,6 @@ use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::get_reschedule_plan_request::{PbWorkerChanges, StableResizePolicy};
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
@@ -44,7 +43,7 @@ use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager};
use crate::manager::WorkerId;
use crate::model::{BarrierManagerState, MigrationPlan};
use crate::stream::{build_actor_connector_splits, RescheduleOptions};
use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
use crate::MetaResult;

impl GlobalBarrierManager {
@@ -416,38 +415,39 @@ impl GlobalBarrierManager {
return Ok(false);
}

let all_worker_parallel_units = self.fragment_manager.all_worker_parallel_units().await;

let expired_worker_parallel_units: HashMap<_, _> = all_worker_parallel_units
.into_iter()
.filter(|(worker, _)| expired_workers.contains(worker))
.collect();

let fragment_worker_changes = {
let table_parallelisms = {
let guard = self.fragment_manager.get_fragment_read_guard().await;
let mut policy = HashMap::new();
for table_fragments in guard.table_fragments().values() {
for fragment_id in table_fragments.fragment_ids() {
policy.insert(
fragment_id,
PbWorkerChanges {
exclude_worker_ids: expired_workers.iter().cloned().collect(),
..Default::default()
},
);
}
}
policy

guard
.table_fragments()
.iter()
.map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism))
.collect()
};

let workers = self
.cluster_manager
.list_active_streaming_compute_nodes()
.await;

let schedulable_worker_ids = workers
.iter()
.filter(|worker| {
!worker
.property
.as_ref()
.map(|p| p.is_unschedulable)
.unwrap_or(false)
})
.map(|worker| worker.id)
.collect();

let plan = self
.scale_controller
.generate_stable_resize_plan(
StableResizePolicy {
fragment_worker_changes,
},
Some(expired_worker_parallel_units),
)
.generate_table_resize_plan(TableResizePolicy {
worker_ids: schedulable_worker_ids,
table_parallelisms,
})
.await?;

let (reschedule_fragment, applied_reschedules) = self
@@ -457,12 +457,13 @@
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
None,
)
.await?;

if let Err(e) = self
.scale_controller
.post_apply_reschedule(&reschedule_fragment)
.post_apply_reschedule(&reschedule_fragment, &Default::default())
.await
{
tracing::error!(
2 changes: 2 additions & 0 deletions src/meta/src/controller/fragment.rs
@@ -331,6 +331,8 @@ impl CatalogController {
actor_status: pb_actor_status,
actor_splits: pb_actor_splits,
ctx: Some(ctx.unwrap_or_default()),
// TODO(peng): fix this for model v2
parallelism: None,
};

Ok(table_fragments)
13 changes: 10 additions & 3 deletions src/meta/src/manager/catalog/fragment.rs
@@ -42,7 +42,7 @@ use crate::manager::cluster::WorkerId;
use crate::manager::{commit_meta, commit_meta_with_trx, LocalNotification, MetaSrvEnv};
use crate::model::{
ActorId, BTreeMapTransaction, FragmentId, MetadataModel, MigrationPlan, TableFragments,
ValTransaction,
TableParallelism, ValTransaction,
};
use crate::storage::Transaction;
use crate::stream::{SplitAssignment, TableRevision};
@@ -1111,6 +1111,7 @@ impl FragmentManager {
pub async fn post_apply_reschedules(
&self,
mut reschedules: HashMap<FragmentId, Reschedule>,
table_parallelism_assignment: HashMap<TableId, TableParallelism>,
) -> MetaResult<()> {
let mut guard = self.core.write().await;
let current_version = guard.table_revision;
@@ -1178,6 +1179,8 @@
})
.collect_vec();

let mut table_fragment = table_fragments.get_mut(table_id).unwrap();

for (fragment_id, reschedule) in reschedules {
let Reschedule {
added_actors,
@@ -1189,8 +1192,6 @@
actor_splits,
} = reschedule;

let mut table_fragment = table_fragments.get_mut(table_id).unwrap();

// First step, update self fragment
// Add actors to this fragment: set the state to `Running`.
for actor_id in &added_actors {
@@ -1328,6 +1329,12 @@
}
}

for (table_id, parallelism) in table_parallelism_assignment {
if let Some(mut table) = table_fragments.get_mut(table_id) {
table.assigned_parallelism = parallelism;
}
}

assert!(reschedules.is_empty(), "all reschedules must be applied");

// new empty transaction
56 changes: 55 additions & 1 deletion src/meta/src/model/stream.rs
@@ -22,7 +22,11 @@ use risingwave_connector::source::SplitImpl;
use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::meta::table_parallelism::{
FixedParallelism, Parallelism, PbAutoParallelism, PbCustomParallelism, PbFixedParallelism,
PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
@@ -37,6 +41,43 @@ use crate::stream::{build_actor_connector_splits, build_actor_split_impls, Split
/// Column family name for table fragments.
const TABLE_FRAGMENTS_CF_NAME: &str = "cf/table_fragments";

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
Auto,
Fixed(usize),
Custom,
}

impl From<PbTableParallelism> for TableParallelism {
fn from(value: PbTableParallelism) -> Self {
use Parallelism::*;
match &value.parallelism {
Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
Some(Auto(_)) => Self::Auto,
Some(Custom(_)) => Self::Custom,
_ => Self::Auto,
}
}
}

impl From<TableParallelism> for PbTableParallelism {
fn from(value: TableParallelism) -> Self {
use TableParallelism::*;

let parallelism = match value {
Auto => PbParallelism::Auto(PbAutoParallelism {}),
Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
parallelism: n as u32,
}),
Custom => PbParallelism::Custom(PbCustomParallelism {}),
};

Self {
parallelism: Some(parallelism),
}
}
}

/// Fragments of a streaming job.
///
/// We store whole fragments in a single column family as follow:
Expand All @@ -60,6 +101,9 @@ pub struct TableFragments {

/// The streaming context associated with this stream plan and its fragments
pub ctx: StreamContext,

/// The parallelism assigned to these table fragments
pub assigned_parallelism: TableParallelism,
}

#[derive(Debug, Clone, Default)]
@@ -109,18 +153,25 @@ impl MetadataModel for TableFragments {
actor_status: self.actor_status.clone().into_iter().collect(),
actor_splits: build_actor_connector_splits(&self.actor_splits),
ctx: Some(self.ctx.to_protobuf()),
parallelism: Some(self.assigned_parallelism.into()),
}
}

fn from_protobuf(prost: Self::PbType) -> Self {
let ctx = StreamContext::from_protobuf(prost.get_ctx().unwrap());

let default_parallelism = PbTableParallelism {
parallelism: Some(Parallelism::Custom(PbCustomParallelism {})),
};

Self {
table_id: TableId::new(prost.table_id),
state: prost.state(),
fragments: prost.fragments.into_iter().collect(),
actor_status: prost.actor_status.into_iter().collect(),
actor_splits: build_actor_split_impls(&prost.actor_splits),
ctx,
assigned_parallelism: prost.parallelism.unwrap_or(default_parallelism).into(),
}
}

@@ -137,6 +188,7 @@ impl TableFragments {
fragments,
&BTreeMap::new(),
StreamContext::default(),
TableParallelism::Auto,
)
}

@@ -147,6 +199,7 @@
fragments: BTreeMap<FragmentId, Fragment>,
actor_locations: &BTreeMap<ActorId, ParallelUnit>,
ctx: StreamContext,
table_parallelism: TableParallelism,
) -> Self {
let actor_status = actor_locations
.iter()
@@ -168,6 +221,7 @@
actor_status,
actor_splits: HashMap::default(),
ctx,
assigned_parallelism: table_parallelism,
}
}
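The enum and the two `From` impls above give a lossless round trip for `Auto`, `Fixed`, and `Custom` (only a missing oneof falls back to `Auto`). A small test sketch of that round trip, with made-up names and values:

#[cfg(test)]
mod parallelism_roundtrip_sketch {
    use risingwave_pb::meta::PbTableParallelism;

    use crate::model::TableParallelism;

    #[test]
    fn roundtrip_is_lossless() {
        for original in [
            TableParallelism::Auto,
            TableParallelism::Fixed(4),
            TableParallelism::Custom,
        ] {
            // Convert into the protobuf representation and back again.
            let pb: PbTableParallelism = original.into();
            let back: TableParallelism = pb.into();
            assert_eq!(original, back);
        }
    }
}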
