Skip to content

Commit

Permalink
Refactor, docs update, and tweak parallelism access
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jul 8, 2024
1 parent 10cbd2a commit fdf6f02
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 18 deletions.
6 changes: 1 addition & 5 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,12 @@ pub mod marker {

/// A marker type for items of [`ActorId`].
pub struct Actor;

impl VnodeMappingItem for Actor {
type Item = ActorId;
}

/// A marker type for items of [`ParallelUnitId`].
pub struct ParallelUnit;

impl VnodeMappingItem for ParallelUnit {
type Item = ParallelUnitId;
}
Expand Down Expand Up @@ -470,7 +468,7 @@ impl ParallelUnitMapping {
}

impl WorkerSlotMapping {
/// Transform this parallel unit mapping to an actor mapping, essentially `transform`.
/// Transform this worker slot mapping to an actor mapping, essentially `transform`.
pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
self.transform(to_map)
}
Expand All @@ -485,13 +483,11 @@ mod tests {
use super::*;

struct Test;

impl VnodeMappingItem for Test {
type Item = u32;
}

struct Test2;

impl VnodeMappingItem for Test2 {
type Item = u32;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
is_streaming: Set(pb_property.is_streaming),
is_serving: Set(pb_property.is_serving),
is_unschedulable: Set(pb_property.is_unschedulable),
parallelism: Set(worker.worker_node.parallelism as _),
parallelism: Set(worker.worker_node.parallelism() as _),
};
WorkerProperty::insert(property)
.exec(&meta_store_sql.conn)
Expand Down
4 changes: 1 addition & 3 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, VirtualNode};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::{
ActorInfo, Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode, WorkerType,
};
use risingwave_pb::common::{ActorInfo, Buffer, ParallelUnit, WorkerNode, WorkerType};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::fragment::{
Expand Down
7 changes: 0 additions & 7 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1198,12 +1198,6 @@ mod tests {
let table_id = TableId::new(0);
let actors = make_mview_stream_actors(&table_id, 4);

let StreamingClusterInfo { .. } = services
.global_stream_manager
.metadata_manager
.get_streaming_cluster_info()
.await?;

let mut fragments = BTreeMap::default();

fragments.insert(
Expand All @@ -1217,7 +1211,6 @@ mod tests {
..Default::default()
},
);

services
.create_materialized_view(table_id, fragments)
.await?;
Expand Down
4 changes: 2 additions & 2 deletions src/prost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ impl stream_plan::MaterializeNode {
}
}

// Encapsulating the use of parallel_units.
// Encapsulating the use of parallelism.
impl common::WorkerNode {
pub fn parallelism(&self) -> usize {
self.parallel_units.len()
self.parallelism as usize
}
}

Expand Down

0 comments on commit fdf6f02

Please sign in to comment.