Commit

tmp
Signed-off-by: Shanicky Chen <>
Shanicky Chen committed Jun 20, 2024
1 parent b6f4b47 commit e53f0e4
Showing 14 changed files with 479 additions and 791 deletions.
8 changes: 6 additions & 2 deletions proto/common.proto
@@ -61,8 +61,10 @@ message WorkerNode {
WorkerType type = 2;
HostAddress host = 3;
State state = 4;
-// TODO #8940 `parallel_units` should be moved into `Property`
-repeated ParallelUnit parallel_units = 5;
+
+reserved 5;
+reserved "parallel_units";
+
Property property = 6;

// Ranges from 0 to 1023, used to generate the machine ID field in the global unique ID.
@@ -75,6 +77,8 @@ message WorkerNode {
// It's populated by meta node, when the worker node is added by meta node.
// It's not persistent in meta store.
optional uint64 started_at = 9;
+
+uint32 parallelism = 10;
}

message Buffer {
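For context on how the new field is consumed: the Rust hunks below all replace `worker.parallel_units.len()` with `worker.parallelism as usize` when enumerating worker slots. The following is a minimal, self-contained sketch of that pattern; the struct shapes for `WorkerNode` and `WorkerSlotId` are simplified stand-ins for the generated proto and common-crate definitions, not the real types.

use std::collections::HashMap;

// Stand-in for the proto-generated WorkerNode; only the fields used here.
#[derive(Clone, Debug)]
struct WorkerNode {
    id: u32,
    parallelism: u32,
}

// Stand-in for WorkerSlotId: a (worker id, slot index) pair.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct WorkerSlotId(u32, usize);

impl WorkerSlotId {
    fn new(worker_id: u32, slot_idx: usize) -> Self {
        Self(worker_id, slot_idx)
    }
}

// One slot per unit of parallelism for every worker, mirroring the
// `(0..worker.parallelism as usize)` loops in the hunks below.
fn worker_slots(workers: &[WorkerNode]) -> HashMap<WorkerSlotId, WorkerNode> {
    workers
        .iter()
        .flat_map(|worker| {
            (0..worker.parallelism as usize)
                .map(move |i| (WorkerSlotId::new(worker.id, i), worker.clone()))
        })
        .collect()
}

fn main() {
    let workers = vec![
        WorkerNode { id: 1, parallelism: 4 },
        WorkerNode { id: 2, parallelism: 2 },
    ];
    assert_eq!(worker_slots(&workers).len(), 6); // 4 + 2 slots in total
}

The same enumeration appears in local_lookup_join.rs, worker_node_manager.rs, and vnode_placement.rs below.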
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
@@ -383,7 +383,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
.iter()
.flat_map(|worker| {
-(0..(worker.parallel_units.len()))
+(0..(worker.parallelism as usize))
.map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
})
.collect();
4 changes: 2 additions & 2 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -160,7 +160,7 @@ impl WorkerNodeManager {
.worker_nodes
.iter()
.flat_map(|worker| {
-(0..worker.parallel_units.len())
+(0..worker.parallelism as usize)
.map(move |i| (WorkerSlotId::new(worker.id, i), worker))
})
.collect();
@@ -337,7 +337,7 @@ impl WorkerNodeSelector {
};
worker_nodes
.iter()
-.map(|node| node.parallel_units.len())
+.map(|node| node.parallelism as usize)
.sum()
}

2 changes: 1 addition & 1 deletion src/common/src/vnode_mapping/vnode_placement.rs
@@ -37,7 +37,7 @@ pub fn place_vnode(
.iter()
.filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
.sorted_by_key(|w| w.id)
-.map(|w| (0..w.parallel_units.len()).map(|idx| WorkerSlotId::new(w.id, idx)))
+.map(|w| (0..w.parallelism as usize).map(|idx| WorkerSlotId::new(w.id, idx)))
.collect();

// Set serving parallelism to the minimum of total number of worker slots, specified
(additional changed file; path not captured in this view)
@@ -55,7 +55,7 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
port: host.map(|h| h.port.to_string()),
r#type: worker.get_type().unwrap().as_str_name().into(),
state: worker.get_state().unwrap().as_str_name().into(),
-parallelism: worker.parallel_units.len() as i32,
+parallelism: worker.parallelism as i32,
is_streaming: property.map(|p| p.is_streaming),
is_serving: property.map(|p| p.is_serving),
is_unschedulable: property.map(|p| p.is_unschedulable),
4 changes: 2 additions & 2 deletions src/frontend/src/handler/show.rs
@@ -214,7 +214,7 @@ struct ShowClusterRow {
addr: String,
r#type: String,
state: String,
-parallel_units: String,
+// parallel_units: String,
is_streaming: Option<bool>,
is_serving: Option<bool>,
is_unschedulable: Option<bool>,
@@ -435,7 +435,7 @@ pub async fn handle_show_object(
addr: addr.to_string(),
r#type: worker.get_type().unwrap().as_str_name().into(),
state: worker.get_state().unwrap().as_str_name().to_string(),
-parallel_units: worker.parallel_units.into_iter().map(|pu| pu.id).join(", "),
+// parallel_units: worker.parallel_units.into_iter().map(|pu| pu.id).join(", "),
is_streaming: property.map(|p| p.is_streaming),
is_serving: property.map(|p| p.is_serving),
is_unschedulable: property.map(|p| p.is_unschedulable),
293 changes: 148 additions & 145 deletions src/meta/service/src/scale_service.rs
@@ -133,157 +133,160 @@ impl ScaleService for ScaleServiceImpl {
&self,
request: Request<RescheduleRequest>,
) -> Result<Response<RescheduleResponse>, Status> {
self.barrier_manager.check_status_running()?;

let RescheduleRequest {
worker_reschedules,
revision,
resolve_no_shuffle_upstream,
} = request.into_inner();

let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;

let current_revision = self.get_revision().await;

if revision != current_revision.inner() {
return Ok(Response::new(RescheduleResponse {
success: false,
revision: current_revision.inner(),
}));
}

let table_parallelisms = {
match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let guard = mgr.fragment_manager.get_fragment_read_guard().await;

let mut table_parallelisms = HashMap::new();
for (table_id, table) in guard.table_fragments() {
if table
.fragment_ids()
.any(|fragment_id| worker_reschedules.contains_key(&fragment_id))
{
table_parallelisms.insert(*table_id, TableParallelism::Custom);
}
}

table_parallelisms
}
MetadataManager::V2(mgr) => {
let streaming_job_ids = mgr
.catalog_controller
.get_fragment_job_id(
worker_reschedules
.keys()
.map(|id| *id as FragmentId)
.collect(),
)
.await?;

streaming_job_ids
.into_iter()
.map(|id| (catalog::TableId::new(id as _), TableParallelism::Custom))
.collect()
}
}
};

self.stream_manager
.reschedule_actors_v2(
worker_reschedules
.into_iter()
.map(|(fragment_id, reschedule)| {
let PbWorkerReschedule {
increased_actor_count,
decreased_actor_count,
} = reschedule;

(
fragment_id,
WorkerReschedule {
increased_actor_count: increased_actor_count
.into_iter()
.map(|(k, v)| (k as _, v as _))
.collect(),
decreased_actor_count: decreased_actor_count
.into_iter()
.map(|(k, v)| (k as _, v as _))
.collect(),
},
)
})
.collect(),
RescheduleOptions {
resolve_no_shuffle_upstream,
skip_create_new_actors: false,
},
Some(table_parallelisms),
)
.await?;

let next_revision = self.get_revision().await;

Ok(Response::new(RescheduleResponse {
success: true,
revision: next_revision.into(),
}))
// self.barrier_manager.check_status_running()?;
//
// let RescheduleRequest {
// worker_reschedules,
// revision,
// resolve_no_shuffle_upstream,
// } = request.into_inner();
//
// let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;
//
// let current_revision = self.get_revision().await;
//
// if revision != current_revision.inner() {
// return Ok(Response::new(RescheduleResponse {
// success: false,
// revision: current_revision.inner(),
// }));
// }
//
// let table_parallelisms = {
// match &self.metadata_manager {
// MetadataManager::V1(mgr) => {
// let guard = mgr.fragment_manager.get_fragment_read_guard().await;
//
// let mut table_parallelisms = HashMap::new();
// for (table_id, table) in guard.table_fragments() {
// if table
// .fragment_ids()
// .any(|fragment_id| worker_reschedules.contains_key(&fragment_id))
// {
// table_parallelisms.insert(*table_id, TableParallelism::Custom);
// }
// }
//
// table_parallelisms
// }
// MetadataManager::V2(mgr) => {
// let streaming_job_ids = mgr
// .catalog_controller
// .get_fragment_job_id(
// worker_reschedules
// .keys()
// .map(|id| *id as FragmentId)
// .collect(),
// )
// .await?;
//
// streaming_job_ids
// .into_iter()
// .map(|id| (catalog::TableId::new(id as _), TableParallelism::Custom))
// .collect()
// }
// }
// };
//
// self.stream_manager
// .reschedule_actors_v2(
// worker_reschedules
// .into_iter()
// .map(|(fragment_id, reschedule)| {
// let PbWorkerReschedule {
// increased_actor_count,
// decreased_actor_count,
// } = reschedule;
//
// (
// fragment_id,
// WorkerReschedule {
// increased_actor_count: increased_actor_count
// .into_iter()
// .map(|(k, v)| (k as _, v as _))
// .collect(),
// decreased_actor_count: decreased_actor_count
// .into_iter()
// .map(|(k, v)| (k as _, v as _))
// .collect(),
// },
// )
// })
// .collect(),
// RescheduleOptions {
// resolve_no_shuffle_upstream,
// skip_create_new_actors: false,
// },
// Some(table_parallelisms),
// )
// .await?;
//
// let next_revision = self.get_revision().await;
//
// Ok(Response::new(RescheduleResponse {
// success: true,
// revision: next_revision.into(),
// }))
todo!()
}

#[cfg_attr(coverage, coverage(off))]
async fn get_reschedule_plan(
&self,
request: Request<GetReschedulePlanRequest>,
) -> Result<Response<GetReschedulePlanResponse>, Status> {
self.barrier_manager.check_status_running()?;

let req = request.into_inner();

let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

let current_revision = self.get_revision().await;

if req.revision != current_revision.inner() {
return Ok(Response::new(GetReschedulePlanResponse {
success: false,
revision: current_revision.inner(),
reschedules: Default::default(),
}));
}

let policy = req
.policy
.ok_or_else(|| Status::invalid_argument("policy is required"))?;

let scale_controller = &self.scale_controller;

let plan = scale_controller.get_reschedule_plan(policy).await?;

let next_revision = self.get_revision().await;

// generate reschedule plan will not change the revision
assert_eq!(current_revision, next_revision);

Ok(Response::new(GetReschedulePlanResponse {
success: true,
revision: next_revision.into(),
reschedules: plan
.into_iter()
.map(|(fragment_id, reschedule)| {
(
fragment_id,
Reschedule {
added_parallel_units: reschedule
.added_parallel_units
.into_iter()
.collect(),
removed_parallel_units: reschedule
.removed_parallel_units
.into_iter()
.collect(),
},
)
})
.collect(),
}))
// self.barrier_manager.check_status_running()?;
//
// let req = request.into_inner();
//
// let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
//
// let current_revision = self.get_revision().await;
//
// if req.revision != current_revision.inner() {
// return Ok(Response::new(GetReschedulePlanResponse {
// success: false,
// revision: current_revision.inner(),
// reschedules: Default::default(),
// }));
// }
//
// let policy = req
// .policy
// .ok_or_else(|| Status::invalid_argument("policy is required"))?;
//
// let scale_controller = &self.scale_controller;
//
// let plan = scale_controller.get_reschedule_plan(policy).await?;
//
// let next_revision = self.get_revision().await;
//
// // generate reschedule plan will not change the revision
// assert_eq!(current_revision, next_revision);
//
// Ok(Response::new(GetReschedulePlanResponse {
// success: true,
// revision: next_revision.into(),
// reschedules: plan
// .into_iter()
// .map(|(fragment_id, reschedule)| {
// (
// fragment_id,
// Reschedule {
// added_parallel_units: reschedule
// .added_parallel_units
// .into_iter()
// .collect(),
// removed_parallel_units: reschedule
// .removed_parallel_units
// .into_iter()
// .collect(),
// },
// )
// })
// .collect(),
// }))

todo!()
}
}
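The commented-out `reschedule` handler above shows the new worker-granularity API: each fragment maps to a `PbWorkerReschedule` carrying `increased_actor_count` / `decreased_actor_count` maps, which get converted into the internal `WorkerReschedule`. Below is a minimal, self-contained sketch of that Pb-to-internal conversion; the concrete key and value types (u32 worker ids and counts, usize internally) are assumptions standing in for the generated proto and meta-crate definitions.

use std::collections::HashMap;

// Assumed shapes for the proto-side and internal reschedule types; the real
// definitions live in the generated proto code and the meta crate.
#[derive(Clone, Default)]
struct PbWorkerReschedule {
    // worker id -> number of actors to add or remove on that worker
    increased_actor_count: HashMap<u32, u32>,
    decreased_actor_count: HashMap<u32, u32>,
}

#[derive(Debug, Default)]
struct WorkerReschedule {
    increased_actor_count: HashMap<u32, usize>,
    decreased_actor_count: HashMap<u32, usize>,
}

// Convert per-fragment proto reschedules into the internal form, mirroring
// the map/collect in the commented-out `reschedule` handler.
fn to_worker_reschedules(
    worker_reschedules: HashMap<u32, PbWorkerReschedule>,
) -> HashMap<u32, WorkerReschedule> {
    worker_reschedules
        .into_iter()
        .map(|(fragment_id, reschedule)| {
            let PbWorkerReschedule {
                increased_actor_count,
                decreased_actor_count,
            } = reschedule;
            (
                fragment_id,
                WorkerReschedule {
                    increased_actor_count: increased_actor_count
                        .into_iter()
                        .map(|(k, v)| (k, v as usize))
                        .collect(),
                    decreased_actor_count: decreased_actor_count
                        .into_iter()
                        .map(|(k, v)| (k, v as usize))
                        .collect(),
                },
            )
        })
        .collect()
}

fn main() {
    let mut increased = HashMap::new();
    increased.insert(7u32, 2u32); // add 2 actors on worker 7
    let mut request = HashMap::new();
    request.insert(
        42u32, // fragment id
        PbWorkerReschedule {
            increased_actor_count: increased,
            ..Default::default()
        },
    );
    let internal = to_worker_reschedules(request);
    assert_eq!(internal[&42].increased_actor_count[&7], 2);
}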
2 changes: 1 addition & 1 deletion src/meta/src/barrier/mod.rs
@@ -619,7 +619,7 @@ impl GlobalBarrierManager {
id: node.id,
r#type: node.r#type,
host: node.host.clone(),
-parallel_units: node.parallel_units.clone(),
+parallelism: node.parallelism,
property: node.property.clone(),
resource: node.resource.clone(),
..Default::default()
(diff truncated; remaining changed files not shown)
