Skip to content

Commit

Permalink
fix unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 20, 2024
1 parent 39dab2d commit 34a2539
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 16 deletions.
11 changes: 8 additions & 3 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,19 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
}
}

/// Create a vnode mapping where all vnodes are mapped to the same single item.
/// Create a vnode mapping with vnode count 1 and the single item.
///
/// The length of the mapping will be 1, as if there's only one vnode in total.
/// This is to be consistent with [`VnodeBitmapExt::singleton`].
/// Should only be used for singleton distribution. This is to be consistent with
/// [`VnodeBitmapExt::singleton`].
pub fn new_single(item: T::Item) -> Self {
Self::new_uniform(std::iter::once(item), 1)
}

/// Create a vnode mapping with the same item for all vnodes. Mainly used for testing.
pub fn new_all_same(item: T::Item, vnode_count: usize) -> Self {
Self::new_uniform(std::iter::once(item), vnode_count)
}

/// The length (or count) of the vnode in this mapping.
pub fn len(&self) -> usize {
self.original_indices
Expand Down
16 changes: 6 additions & 10 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ pub(crate) mod tests {
//
let ctx = OptimizerContext::mock().await;
let table_id = 0.into();
let vnode_count = VirtualNode::COUNT_FOR_TEST;

let table_catalog: TableCatalog = TableCatalog {
id: table_id,
associated_source_id: None,
Expand Down Expand Up @@ -597,7 +599,7 @@ pub(crate) mod tests {
initialized_at_cluster_version: None,
created_at_cluster_version: None,
cdc_table_id: None,
vnode_count: Some(VirtualNode::COUNT_FOR_TEST),
vnode_count: Some(vnode_count),
};
let batch_plan_node: PlanRef = LogicalScan::create(
"".to_string(),
Expand Down Expand Up @@ -726,15 +728,9 @@ pub(crate) mod tests {
let workers = vec![worker1, worker2, worker3];
let worker_node_manager = Arc::new(WorkerNodeManager::mock(workers));
let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false);
worker_node_manager.insert_streaming_fragment_mapping(
0,
WorkerSlotMapping::new_single(WorkerSlotId::new(0, 0)),
);
worker_node_manager.set_serving_fragment_mapping(
vec![(0, WorkerSlotMapping::new_single(WorkerSlotId::new(0, 0)))]
.into_iter()
.collect(),
);
let mapping = WorkerSlotMapping::new_all_same(WorkerSlotId::new(0, 0), vnode_count);
worker_node_manager.insert_streaming_fragment_mapping(0, mapping.clone());
worker_node_manager.set_serving_fragment_mapping(vec![(0, mapping)].into_iter().collect());
let catalog = Arc::new(parking_lot::RwLock::new(Catalog::default()));
catalog.write().insert_table_id_mapping(table_id, 0);
let catalog_reader = CatalogReader::new(catalog);
Expand Down
3 changes: 0 additions & 3 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1430,9 +1430,6 @@ impl ScaleController {
FragmentDistributionType::Hash => {
if !in_degree_types.contains(&DispatcherType::Hash) {
None
} else if actors_after_reschedule.len() == 1 {
let actor_id = actors_after_reschedule.keys().next().cloned().unwrap();
Some(ActorMapping::new_single(actor_id))
} else {
// Changes of the bitmap must occur in the case of HashDistribution
Some(ActorMapping::from_bitmaps(
Expand Down

0 comments on commit 34a2539

Please sign in to comment.