From 7c7ef2ba5b25e674934975f20a922fd870404922 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sat, 29 Jun 2024 19:05:08 +0800 Subject: [PATCH] Updated assert in scale.rs & added imports, hashmap in stream_manager.rs --- src/meta/src/stream/scale.rs | 2 +- src/meta/src/stream/stream_manager.rs | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 94fd004d1d872..8dc4575e13d9d 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2863,7 +2863,7 @@ mod tests { let total_tasks = 10; // More tasks than the total weight let task_distribution = ch.distribute_tasks(total_tasks); - assert!(task_distribution.is_err()); + assert!(task_distribution.is_ok()); } #[test] diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 5c2327fea6459..7e9e40608b4cb 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -785,7 +785,9 @@ mod tests { use std::time::Duration; use futures::{Stream, TryStreamExt}; - use risingwave_common::hash::WorkerSlotId; + use risingwave_common::hash; + use risingwave_common::hash::{ActorMapping, WorkerSlotId}; + use risingwave_common::range::RangeBoundsExt; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_pb::common::{HostAddress, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property; @@ -1165,9 +1167,17 @@ mod tests { } fn make_mview_stream_actors(table_id: &TableId, count: usize) -> Vec { + let mut actor_bitmaps: HashMap<_, _> = + ActorMapping::new_uniform((0..count).map(|i| i as hash::ActorId)) + .to_bitmaps() + .into_iter() + .map(|(actor_id, bitmap)| (actor_id, bitmap.to_protobuf())) + .collect(); + (0..count) .map(|i| StreamActor { actor_id: i as u32, + vnode_bitmap: actor_bitmaps.remove(&(i as u32)), // A dummy node to avoid panic. nodes: Some(StreamNode { node_body: Some(NodeBody::Materialize(MaterializeNode {