Skip to content

Commit

Permalink
Refactor worker manager & meta mgmt, simplify worker map & enhance ba…
Browse files Browse the repository at this point in the history
…tch proc logic
  • Loading branch information
shanicky committed Jul 1, 2024
1 parent a48ef13 commit 9c57fe1
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 16 deletions.
11 changes: 2 additions & 9 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,12 @@ impl WorkerNodeManager {

let guard = self.inner.read().unwrap();

let worker_slot_index: HashMap<_, _> = guard
.worker_nodes
.iter()
.flat_map(|worker| {
(0..worker.parallelism as usize)
.map(move |i| (WorkerSlotId::new(worker.id, i), worker))
})
.collect();
let worker_index: HashMap<_, _> = guard.worker_nodes.iter().map(|w| (w.id, w)).collect();

let mut workers = Vec::with_capacity(worker_slot_ids.len());

for worker_slot_id in worker_slot_ids {
match worker_slot_index.get(worker_slot_id) {
match worker_index.get(&worker_slot_id.worker_id()) {
Some(worker) => workers.push((*worker).clone()),
None => bail!(
"No worker node found for worker slot id: {}",
Expand Down
14 changes: 7 additions & 7 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1525,24 +1525,24 @@ impl DdlController {
));
}

let available_parallel_units = NonZeroUsize::new(available_parallelism).unwrap();
let available_parallelism = NonZeroUsize::new(available_parallelism).unwrap();

// Use configured parallel units if no default parallelism is specified.
let parallelism =
specified_parallelism.unwrap_or_else(|| match &self.env.opts.default_parallelism {
DefaultParallelism::Full => available_parallel_units,
DefaultParallelism::Full => available_parallelism,
DefaultParallelism::Default(num) => *num,
});

if parallelism > available_parallel_units {
if parallelism > available_parallelism {
return Err(MetaError::unavailable(format!(
"Not enough parallel units to schedule, required: {}, available: {}",
parallelism, available_parallel_units
"Not enough parallelism to schedule, required: {}, available: {}",
parallelism, available_parallelism
)));
}

if available_parallel_units > MAX_PARALLELISM {
tracing::warn!("Too many parallel units, use {} instead", MAX_PARALLELISM);
if available_parallelism > MAX_PARALLELISM {
tracing::warn!("Too many parallelism, use {} instead", MAX_PARALLELISM);
Ok(MAX_PARALLELISM)
} else {
Ok(parallelism)
Expand Down

0 comments on commit 9c57fe1

Please sign in to comment.