Skip to content

Commit

Permalink
Formatted vnode import; Removed scale_prev module; Actor rescale logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jun 24, 2024
1 parent bff821e commit 81c1466
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 1,909 deletions.
2 changes: 1 addition & 1 deletion src/common/src/vnode_mapping/vnode_placement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ mod tests {
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::WorkerNode;

use crate::hash::{VirtualNode};
use crate::hash::VirtualNode;
use crate::vnode_mapping::vnode_placement::place_vnode;
#[test]
fn test_place_vnode() {
Expand Down
2 changes: 0 additions & 2 deletions src/meta/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

mod scale;
mod scale_prev;
mod sink;
mod source_manager;
mod stream_graph;
Expand All @@ -29,7 +28,6 @@ use risingwave_pb::stream_plan::StreamActor;
use risingwave_pb::stream_service::build_actor_info::SubscriptionIds;
use risingwave_pb::stream_service::BuildActorInfo;
pub use scale::*;
pub use scale_prev::*;
pub use sink::*;
pub use source_manager::*;
pub use stream_graph::*;
Expand Down
92 changes: 92 additions & 0 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2801,3 +2801,95 @@ impl ConsistentHashRingV2 {
Ok(task_distribution)
}
}

#[cfg(test)]
mod tests {
use super::*;

const DEFAULT_SALT: u32 = 42;

#[test]
fn test_single_worker_capacity() {
let mut ch = ConsistentHashRingV2::new(DEFAULT_SALT);
ch.add_worker(1, 10);

let total_tasks = 5;
let task_distribution = ch.distribute_tasks(total_tasks).unwrap();

assert_eq!(task_distribution.get(&1).cloned().unwrap_or(0), 5);
}

#[test]
fn test_multiple_workers_even_distribution() {
let mut ch = ConsistentHashRingV2::new(DEFAULT_SALT);

ch.add_worker(1, 1);
ch.add_worker(2, 1);
ch.add_worker(3, 1);

let total_tasks = 3;
let task_distribution = ch.distribute_tasks(total_tasks).unwrap();

for id in 1..=3 {
assert_eq!(task_distribution.get(&id).cloned().unwrap_or(0), 1);
}
}

#[test]
fn test_weighted_distribution() {
let mut ch = ConsistentHashRingV2::new(DEFAULT_SALT);

ch.add_worker(1, 2);
ch.add_worker(2, 3);
ch.add_worker(3, 5);

let total_tasks = 10;
let task_distribution = ch.distribute_tasks(total_tasks).unwrap();

assert_eq!(task_distribution.get(&1).cloned().unwrap_or(0), 2);
assert_eq!(task_distribution.get(&2).cloned().unwrap_or(0), 3);
assert_eq!(task_distribution.get(&3).cloned().unwrap_or(0), 5);
}

#[test]
fn test_over_capacity() {
let mut ch = ConsistentHashRingV2::new(DEFAULT_SALT);

ch.add_worker(1, 1);
ch.add_worker(2, 2);
ch.add_worker(3, 3);

let total_tasks = 10; // More tasks than the total weight
let task_distribution = ch.distribute_tasks(total_tasks);

assert!(task_distribution.is_err());
}

#[test]
fn test_balance_distribution() {
for mut worker_capacity in 1..10 {
for workers in 3..10 {
let mut ring = ConsistentHashRingV2::new(DEFAULT_SALT);

for worker_id in 0..workers {
ring.add_worker(worker_id, worker_capacity);
}

// Here we simulate a real situation where the actual parallelism cannot fill all the capacity.
// This is to ensure an average distribution, for example, when three workers with 6 parallelism are assigned 9 tasks,
// they should ideally get an exact distribution of 3, 3, 3 respectively.
if worker_capacity % 2 == 0 {
worker_capacity /= 2;
}

let total_tasks = worker_capacity * workers;

let task_distribution = ring.distribute_tasks(total_tasks).unwrap();

for (_, v) in task_distribution {
assert_eq!(v, worker_capacity);
}
}
}
}
}
Loading

0 comments on commit 81c1466

Please sign in to comment.