diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 54be33103be32..d9a4a1ef5a726 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -288,14 +288,12 @@ pub mod marker { /// A marker type for items of [`ActorId`]. pub struct Actor; - impl VnodeMappingItem for Actor { type Item = ActorId; } /// A marker type for items of [`ParallelUnitId`]. pub struct ParallelUnit; - impl VnodeMappingItem for ParallelUnit { type Item = ParallelUnitId; } @@ -470,7 +468,7 @@ impl ParallelUnitMapping { } impl WorkerSlotMapping { - /// Transform this parallel unit mapping to an actor mapping, essentially `transform`. + /// Transform this worker slot mapping to an actor mapping, essentially `transform`. pub fn to_actor(&self, to_map: &HashMap) -> ActorMapping { self.transform(to_map) } @@ -485,13 +483,11 @@ mod tests { use super::*; struct Test; - impl VnodeMappingItem for Test { type Item = u32; } struct Test2; - impl VnodeMappingItem for Test2 { type Item = u32; } diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index df0b8222be7a2..38a3b8541bbda 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -161,7 +161,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an is_streaming: Set(pb_property.is_streaming), is_serving: Set(pb_property.is_serving), is_unschedulable: Set(pb_property.is_unschedulable), - parallelism: Set(worker.worker_node.parallelism as _), + parallelism: Set(worker.worker_node.parallelism() as _), }; WorkerProperty::insert(property) .exec(&meta_store_sql.conn) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 54881f93860d6..70db505010c29 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -31,9 +31,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::{ActorMapping, VirtualNode}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_meta_model_v2::StreamingParallelism; -use risingwave_pb::common::{ - ActorInfo, Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode, WorkerType, -}; +use risingwave_pb::common::{ActorInfo, Buffer, ParallelUnit, WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::fragment::{ diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index bdf41fe7ca12c..385dc2107e1f1 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -1198,12 +1198,6 @@ mod tests { let table_id = TableId::new(0); let actors = make_mview_stream_actors(&table_id, 4); - let StreamingClusterInfo { .. } = services - .global_stream_manager - .metadata_manager - .get_streaming_cluster_info() - .await?; - let mut fragments = BTreeMap::default(); fragments.insert( @@ -1217,7 +1211,6 @@ mod tests { ..Default::default() }, ); - services .create_materialized_view(table_id, fragments) .await?; diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index 2a20f2e5530d4..d1958dc74e42f 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -204,10 +204,10 @@ impl stream_plan::MaterializeNode { } } -// Encapsulating the use of parallel_units. +// Encapsulating the use of parallelism. impl common::WorkerNode { pub fn parallelism(&self) -> usize { - self.parallel_units.len() + self.parallelism as usize } }