Refine streaming parallelism and actor scaling
shanicky committed Jan 30, 2024
1 parent 6f9de63 commit d5c06d1
Showing 4 changed files with 41 additions and 57 deletions.
1 change: 0 additions & 1 deletion src/meta/model_v2/src/lib.rs
@@ -231,7 +231,6 @@ derive_from_json_struct!(
 pub enum StreamingParallelism {
     Auto,
     Fixed(usize),
-    Custom,
 }

 impl Eq for StreamingParallelism {}
82 changes: 34 additions & 48 deletions src/meta/src/barrier/recovery.rs
@@ -610,56 +610,29 @@ impl GlobalBarrierManagerContext {
         }
     }

-    async fn scale_actors_v2(&self, info: &InflightActorInfo) -> MetaResult<bool> {
+    async fn scale_actors_v2(&self, _info: &InflightActorInfo) -> MetaResult<bool> {
         let mgr = self.metadata_manager.as_v2_ref();
-        let prev_used_parallel_units: HashSet<_> = mgr
-            .catalog_controller
-            .all_inuse_parallel_units()
-            .await?
-            .into_iter()
-            .collect();
-        let curr_worker_parallel_units: HashSet<_> = info
-            .node_map
-            .iter()
-            .flat_map(|(_, worker_node)| {
-                worker_node
-                    .parallel_units
-                    .iter()
-                    .map(|parallel_unit| parallel_unit.id as i32)
-            })
-            .collect();

-        // todo: maybe we can only check the reduced workers
-        if curr_worker_parallel_units == prev_used_parallel_units {
-            debug!("no changed workers, skipping.");
-            return Ok(false);
-        }

-        info!("parallel unit has changed, triggering a forced reschedule.");

-        debug!(
-            "previous worker parallel units {:?}, current worker parallel units {:?}",
-            prev_used_parallel_units, curr_worker_parallel_units
-        );
-        debug!("start resetting actors distribution");

-        let streaming_parallelisms = mgr
-            .catalog_controller
-            .get_all_streaming_parallelisms()
-            .await?;
+        let table_parallelisms: HashMap<_, _> = {
+            let streaming_parallelisms = mgr
+                .catalog_controller
+                .get_all_streaming_parallelisms()
+                .await?;

-        let table_parallelisms = streaming_parallelisms
-            .into_iter()
-            .map(|(id, parallelism)| {
-                (
-                    id as u32,
-                    match parallelism {
+            streaming_parallelisms
+                .into_iter()
+                .map(|(table_id, parallelism)| {
+                    // no custom for sql backend
+                    let table_parallelism = match parallelism {
                         StreamingParallelism::Auto => TableParallelism::Auto,
                         StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
-                        StreamingParallelism::Custom => TableParallelism::Custom,
-                    },
-                )
-            })
-            .collect();
+                    };

+                    (table_id as u32, table_parallelism)
+                })
+                .collect()
+        };

         let workers = mgr
             .cluster_controller
@@ -682,24 +655,37 @@ impl GlobalBarrierManagerContext {
             .scale_controller
             .generate_table_resize_plan(TableResizePolicy {
                 worker_ids: schedulable_worker_ids,
-                table_parallelisms,
+                table_parallelisms: table_parallelisms.clone(),
             })
             .await?;

+        let table_parallelisms: HashMap<_, _> = table_parallelisms
+            .into_iter()
+            .map(|(table_id, parallelism)| {
+                debug_assert_ne!(parallelism, TableParallelism::Custom);
+                (TableId::new(table_id), parallelism)
+            })
+            .collect();

+        let mut compared_table_parallelisms = table_parallelisms.clone();

         let (reschedule_fragment, _) = self
             .scale_controller
             .prepare_reschedule_command(
                 plan,
                 RescheduleOptions {
                     resolve_no_shuffle_upstream: true,
                 },
-                None,
+                Some(&mut compared_table_parallelisms),
             )
             .await?;

+        // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
+        debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

         if let Err(e) = self
             .scale_controller
-            .post_apply_reschedule(&reschedule_fragment, &Default::default())
+            .post_apply_reschedule(&reschedule_fragment, &table_parallelisms)
             .await
         {
             tracing::error!(
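To make the new recovery path easier to follow, here is a hedged, self-contained sketch of the logic scale_actors_v2 now relies on: catalog parallelisms map straight onto table parallelisms (a Custom entry can no longer appear), the map is re-keyed by TableId under a debug assertion, and rescheduling is expected to hand the parallelisms back unchanged. Every type and the fetch function below are simplified stand-ins for illustration, not the meta service's real API.

// Illustrative sketch only -- simplified stand-ins for the meta-service types;
// only the shape of the logic mirrors the commit.
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct TableId(u32);

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum TableParallelism {
    Auto,
    Fixed(usize),
    Custom,
}

#[derive(Clone, Copy)]
enum StreamingParallelism {
    Auto,
    Fixed(usize),
}

// Stand-in for mgr.catalog_controller.get_all_streaming_parallelisms().
fn get_all_streaming_parallelisms() -> Vec<(i32, StreamingParallelism)> {
    vec![(1, StreamingParallelism::Auto), (2, StreamingParallelism::Fixed(4))]
}

fn main() {
    // 1. Map catalog parallelisms to table parallelisms. With Custom removed
    //    from the model, the match is exhaustive without a Custom arm.
    let table_parallelisms: HashMap<u32, TableParallelism> = get_all_streaming_parallelisms()
        .into_iter()
        .map(|(table_id, parallelism)| {
            let table_parallelism = match parallelism {
                StreamingParallelism::Auto => TableParallelism::Auto,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
            };
            (table_id as u32, table_parallelism)
        })
        .collect();

    // 2. Re-key by TableId for the reschedule command, asserting the invariant
    //    the commit depends on: no Custom entries from the SQL backend.
    let table_parallelisms: HashMap<TableId, TableParallelism> = table_parallelisms
        .into_iter()
        .map(|(table_id, parallelism)| {
            debug_assert_ne!(parallelism, TableParallelism::Custom);
            (TableId(table_id), parallelism)
        })
        .collect();

    // 3. Rescheduling may rewrite parallelisms through a mutable reference;
    //    because Custom is gone, the compared copy must come back unchanged.
    let compared_table_parallelisms = table_parallelisms.clone();
    debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

    println!("{} tables would be rescheduled", table_parallelisms.len());
}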
5 changes: 2 additions & 3 deletions src/meta/src/controller/catalog.rs
@@ -27,9 +27,8 @@ use risingwave_meta_model_v2::{
connection, database, function, index, object, object_dependency, schema, sink, source,
streaming_job, table, user_privilege, view, ActorId, ColumnCatalogArray, ConnectionId,
CreateType, DatabaseId, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
PrivateLinkService, Property, SchemaId, SourceId, StreamSourceInfo, TableId, UserId,
DatabaseId, FunctionId, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId,
SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId,
PrivateLinkService, Property, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism,
TableId, UserId,
};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{
10 changes: 5 additions & 5 deletions src/meta/src/controller/streaming_job.rs
@@ -28,10 +28,10 @@ use risingwave_meta_model_v2::prelude::{
     StreamingJob as StreamingJobModel, Table,
 };
 use risingwave_meta_model_v2::{
-    actor, actor_dispatcher, fragment, index, object_dependency, sink, source, streaming_job,
-    table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, FragmentId,
-    I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode, StreamingParallelism,
-    TableId, TableVersion, UserId,
+    actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source,
+    streaming_job, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray,
+    FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode,
+    StreamingParallelism, TableId, TableVersion, UserId,
 };
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion};
@@ -1201,7 +1201,7 @@ impl CatalogController {
         streaming_job.parallelism = Set(match parallelism {
             TableParallelism::Auto => StreamingParallelism::Auto,
             TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n as _),
-            TableParallelism::Custom => StreamingParallelism::Custom,
+            TableParallelism::Custom => unreachable!("sql backend doesn't support custom parallelism"),
         });

         streaming_job.update(&txn).await?;
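For the reverse direction, persisting a TableParallelism into the SQL model, the hunk above turns the Custom arm into unreachable!(). A rough sketch of that design choice follows, again with simplified stand-in types rather than the actual controller code: panicking treats a stray Custom as a programming error, while a fallible conversion would be the alternative if callers could still pass one in.

// Sketch only -- stand-in types, not the meta-service models.
enum TableParallelism {
    Auto,
    Fixed(usize),
    Custom,
}

enum StreamingParallelism {
    Auto,
    Fixed(usize),
}

// Mirrors the match in the hunk above: a Custom value is a programming error here,
// because the SQL backend no longer models it.
fn to_streaming_parallelism(parallelism: TableParallelism) -> StreamingParallelism {
    match parallelism {
        TableParallelism::Auto => StreamingParallelism::Auto,
        TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
        TableParallelism::Custom => unreachable!("sql backend doesn't support custom parallelism"),
    }
}

// A fallible conversion would be the alternative if a Custom value could still
// reach this point at runtime.
fn try_to_streaming_parallelism(parallelism: TableParallelism) -> Result<StreamingParallelism, String> {
    match parallelism {
        TableParallelism::Auto => Ok(StreamingParallelism::Auto),
        TableParallelism::Fixed(n) => Ok(StreamingParallelism::Fixed(n)),
        TableParallelism::Custom => Err("custom parallelism is not supported by the sql backend".into()),
    }
}

fn main() {
    let _ = to_streaming_parallelism(TableParallelism::Fixed(8));
    assert!(try_to_streaming_parallelism(TableParallelism::Custom).is_err());
}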
