Skip to content

Commit

Permalink
Updated assert in scale.rs & added imports, hashmap in stream_manager.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jun 29, 2024
1 parent 595b5e9 commit 7c7ef2b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2863,7 +2863,7 @@ mod tests {
let total_tasks = 10; // More tasks than the total weight
let task_distribution = ch.distribute_tasks(total_tasks);

assert!(task_distribution.is_err());
assert!(task_distribution.is_ok());
}

#[test]
Expand Down
12 changes: 11 additions & 1 deletion src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,9 @@ mod tests {
use std::time::Duration;

use futures::{Stream, TryStreamExt};
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::hash;
use risingwave_common::hash::{ActorMapping, WorkerSlotId};
use risingwave_common::range::RangeBoundsExt;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::common::{HostAddress, WorkerType};
use risingwave_pb::meta::add_worker_node_request::Property;
Expand Down Expand Up @@ -1165,9 +1167,17 @@ mod tests {
}

fn make_mview_stream_actors(table_id: &TableId, count: usize) -> Vec<StreamActor> {
let mut actor_bitmaps: HashMap<_, _> =
ActorMapping::new_uniform((0..count).map(|i| i as hash::ActorId))
.to_bitmaps()
.into_iter()
.map(|(actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
.collect();

(0..count)
.map(|i| StreamActor {
actor_id: i as u32,
vnode_bitmap: actor_bitmaps.remove(&(i as u32)),
// A dummy node to avoid panic.
nodes: Some(StreamNode {
node_body: Some(NodeBody::Materialize(MaterializeNode {
Expand Down

0 comments on commit 7c7ef2b

Please sign in to comment.