Skip to content

Commit

Permalink
refactor(batch): schedule with streaming mapping if serving is temporarily unavailable (#18505)
Browse files Browse the repository at this point in the history

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Sep 12, 2024
1 parent 4afcd4d commit 317641f
Showing 1 changed file with 21 additions and 33 deletions.
54 changes: 21 additions & 33 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::time::Duration;
use rand::seq::SliceRandom;
use risingwave_common::bail;
use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping};
use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
use risingwave_pb::common::{WorkerNode, WorkerType};

Expand Down Expand Up @@ -346,38 +346,26 @@ impl WorkerNodeSelector {
if self.enable_barrier_read {
self.manager.get_streaming_fragment_mapping(&fragment_id)
} else {
let (hint, parallelism) = match self.manager.serving_fragment_mapping(fragment_id) {
Ok(o) => {
if self.manager.worker_node_mask().is_empty() {
// 1. Stable mapping for most cases.
return Ok(o);
}
// If it's a singleton, set max_parallelism=1 for place_vnode.
let max_parallelism = o.to_single().map(|_| 1);
(Some(o), max_parallelism)
}
Err(e) => {
if !matches!(e, BatchError::ServingVnodeMappingNotFound(_)) {
return Err(e);
}
// We cannot tell whether it's a singleton, set max_parallelism=1 for place_vnode as if it's a singleton.
let max_parallelism = 1;
tracing::warn!(
fragment_id,
max_parallelism,
"Serving fragment mapping not found, fall back to temporary one."
);
// Workaround the case that new mapping is not available yet due to asynchronous
// notification.
(None, Some(max_parallelism))
}
};
// 2. Temporary mapping that filters out unavailable workers.
let new_workers = self.apply_worker_node_mask(self.manager.list_serving_worker_nodes());
// TODO(var-vnode): use vnode count from config
let masked_mapping =
place_vnode(hint.as_ref(), &new_workers, parallelism, VirtualNode::COUNT);
masked_mapping.ok_or_else(|| BatchError::EmptyWorkerNodes)
let mapping = (self.manager.serving_fragment_mapping(fragment_id)).or_else(|_| {
tracing::warn!(
fragment_id,
"Serving fragment mapping not found, fall back to streaming one."
);
self.manager.get_streaming_fragment_mapping(&fragment_id)
})?;

// Filter out unavailable workers.
if self.manager.worker_node_mask().is_empty() {
Ok(mapping)
} else {
let workers = self.apply_worker_node_mask(self.manager.list_serving_worker_nodes());
// If it's a singleton, set max_parallelism=1 for place_vnode.
let max_parallelism = mapping.to_single().map(|_| 1);
let masked_mapping =
place_vnode(Some(&mapping), &workers, max_parallelism, mapping.len())
.ok_or_else(|| BatchError::EmptyWorkerNodes)?;
Ok(masked_mapping)
}
}
}

Expand Down

0 comments on commit 317641f

Please sign in to comment.