From dba5975b25ff11cea2fa4403473ea949c4433f92 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 26 Sep 2024 17:39:06 +0800 Subject: [PATCH] feat: user-facing part of variable vnode count (#18515) Signed-off-by: Bugen Zhao --- e2e_test/batch/catalog/pg_settings.slt.part | 1 + java/com_risingwave_java_binding_Binding.h | 4 +- .../java/binding/HummockReadDemo.java | 5 +- .../com/risingwave/java/binding/Binding.java | 6 +- src/common/src/config.rs | 4 +- src/common/src/hash/consistent_hash/bitmap.rs | 24 ++++-- src/common/src/hash/consistent_hash/compat.rs | 4 +- .../src/hash/consistent_hash/mapping.rs | 5 +- src/common/src/hash/consistent_hash/vnode.rs | 19 +++-- src/common/src/hash/table_distribution.rs | 9 +-- src/common/src/session_config/mod.rs | 33 +++++++- src/config/docs.md | 4 +- src/expr/core/src/expr_context.rs | 9 +++ src/expr/impl/src/scalar/vnode.rs | 22 ++--- src/frontend/src/handler/alter_parallelism.rs | 36 +-------- src/frontend/src/stream_fragmenter/mod.rs | 4 +- src/jni_core/src/lib.rs | 7 +- src/jni_core/src/macros.rs | 10 +-- .../m20240911_083152_variable_vnode_count.rs | 2 + src/meta/src/controller/fragment.rs | 25 +++++- src/meta/src/error.rs | 12 ++- src/meta/src/manager/catalog/fragment.rs | 28 ++++--- src/meta/src/manager/metadata.rs | 8 +- src/meta/src/rpc/ddl_controller.rs | 80 ++++++++++++------- src/meta/src/serving/mod.rs | 33 ++++---- src/meta/src/stream/scale.rs | 2 - src/meta/src/stream/stream_graph/schedule.rs | 6 +- src/meta/src/stream/stream_manager.rs | 39 ++++++++- .../src/hummock/sstable/multi_builder.rs | 2 +- .../log_store_impl/kv_log_store/serde.rs | 2 +- src/stream/src/executor/actor.rs | 43 +++++++--- .../scale/streaming_parallelism.rs | 19 ++--- 32 files changed, 320 insertions(+), 187 deletions(-) diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index e05d466c3a4d6..641ce8ac65fde 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -58,6 +58,7 @@ user sink_decouple user source_rate_limit user standard_conforming_strings user statement_timeout +user streaming_max_parallelism user streaming_parallelism user streaming_use_arrangement_backfill user streaming_use_snapshot_backfill diff --git a/java/com_risingwave_java_binding_Binding.h b/java/com_risingwave_java_binding_Binding.h index 606110c405282..9de9f1d2f4fd3 100644 --- a/java/com_risingwave_java_binding_Binding.h +++ b/java/com_risingwave_java_binding_Binding.h @@ -9,10 +9,10 @@ extern "C" { #endif /* * Class: com_risingwave_java_binding_Binding - * Method: vnodeCount + * Method: defaultVnodeCount * Signature: ()I */ -JNIEXPORT jint JNICALL Java_com_risingwave_java_binding_Binding_vnodeCount +JNIEXPORT jint JNICALL Java_com_risingwave_java_binding_Binding_defaultVnodeCount (JNIEnv *, jclass); /* diff --git a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java index 6a3598b9c97d4..53ad64cc6a10f 100644 --- a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java +++ b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java @@ -50,7 +50,10 @@ public static void main(String[] args) { HummockVersion version = metaClient.pinVersion(); Table tableCatalog = metaClient.getTable(dbName, tableName); - int vnodeCount = Binding.vnodeCount(); + int vnodeCount = Binding.defaultVnodeCount(); + if (tableCatalog.hasMaybeVnodeCount()) { + vnodeCount = tableCatalog.getMaybeVnodeCount(); + } List vnodeList = new ArrayList<>(); for (int i = 0; i < vnodeCount; i++) { diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index db832566fdfa7..5d1a555968af2 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -33,7 +33,11 @@ public static native void tracingSlf4jEvent( public static native boolean tracingSlf4jEventEnabled(int level); - public static native int vnodeCount(); + /** + * Used to get the default number of vnodes for a table, if its `maybeVnodeCount` field is not + * set. + */ + public static native int defaultVnodeCount(); static native long iteratorNewStreamChunk(long pointer); diff --git a/src/common/src/config.rs b/src/common/src/config.rs index b312304e80799..7ab8b5d84c694 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -322,7 +322,7 @@ pub struct MetaConfig { pub do_not_config_object_storage_lifecycle: bool, /// Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. - /// Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table. #[serde(default = "default::meta::partition_vnode_count")] pub partition_vnode_count: u32, @@ -351,7 +351,7 @@ pub struct MetaConfig { /// Count of partitions of tables in default group and materialized view group. /// The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. - /// Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. /// Set it zero to disable this feature. #[serde(default = "default::meta::hybrid_partition_vnode_count")] pub hybrid_partition_vnode_count: u32, diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs index eee6a64a2b42c..a40946273a0a7 100644 --- a/src/common/src/hash/consistent_hash/bitmap.rs +++ b/src/common/src/hash/consistent_hash/bitmap.rs @@ -13,8 +13,9 @@ // limitations under the License. use std::ops::RangeInclusive; +use std::sync::{Arc, LazyLock}; -use crate::bitmap::Bitmap; +use crate::bitmap::{Bitmap, BitmapBuilder}; use crate::hash::table_distribution::SINGLETON_VNODE; use crate::hash::VirtualNode; @@ -39,15 +40,24 @@ impl Bitmap { } /// Returns whether only the [`SINGLETON_VNODE`] is set in the bitmap. - /// - /// Note that this method returning `true` does not imply that the bitmap was created by - /// [`VnodeBitmapExt::singleton`], or that the bitmap has length 1. pub fn is_singleton(&self) -> bool { self.count_ones() == 1 && self.iter_vnodes().next().unwrap() == SINGLETON_VNODE } - /// Creates a bitmap with length 1 and the single bit set. - pub fn singleton() -> Self { - Self::ones(1) + /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length + /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. + pub fn singleton() -> &'static Self { + Self::singleton_arc() + } + + /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length + /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. + pub fn singleton_arc() -> &'static Arc { + static SINGLETON: LazyLock> = LazyLock::new(|| { + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT); + builder.set(SINGLETON_VNODE.to_index(), true); + builder.finish().into() + }); + &SINGLETON } } diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 58ff07a1514f9..0c86fbb12bcd4 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -16,7 +16,7 @@ use super::vnode::VirtualNode; /// A trait for accessing the vnode count field with backward compatibility. pub trait VnodeCountCompat { - /// Returns the vnode count, or [`VirtualNode::COUNT`] if the vnode count is not set, + /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set, /// typically for backward compatibility. /// /// See the documentation on the field of the implementing type for more details. @@ -38,7 +38,7 @@ macro_rules! impl_maybe_vnode_count_compat { impl VnodeCountCompat for $ty { fn vnode_count(&self) -> usize { self.maybe_vnode_count - .map_or(VirtualNode::COUNT, |v| v as _) + .map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _) } } )* diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 80d5a56941cf6..1e7bca125fc50 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -140,9 +140,10 @@ impl VnodeMapping { } /// Create a vnode mapping with the single item. Should only be used for singletons. - // TODO(var-vnode): make vnode count 1, also `Distribution::vnode_count` + /// + /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count. pub fn new_single(item: T::Item) -> Self { - Self::new_uniform(std::iter::once(item), VirtualNode::COUNT) + Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT) } /// The length (or count) of the vnode in this mapping. diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index 388083b81bffd..2a7390cfbbc5e 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -51,19 +51,22 @@ impl Crc32HashCode { } impl VirtualNode { - /// The total count of virtual nodes. - // TODO(var-vnode): remove this and only keep `COUNT_FOR_TEST` - pub const COUNT: usize = 1 << 8; - /// The maximum value of the virtual node. - // TODO(var-vnode): remove this and only keep `MAX_FOR_TEST` - pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1); + /// The total count of virtual nodes, for compatibility purposes **ONLY**. + /// + /// Typical use cases: + /// + /// - As the default value for the session configuration. + /// - As the vnode count for all streaming jobs, fragments, and tables that were created before + /// the variable vnode count support was introduced. + /// - As the vnode count for singletons. + pub const COUNT_FOR_COMPAT: usize = 1 << 8; } impl VirtualNode { /// The total count of virtual nodes, for testing purposes. - pub const COUNT_FOR_TEST: usize = Self::COUNT; + pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT; /// The maximum value of the virtual node, for testing purposes. - pub const MAX_FOR_TEST: VirtualNode = Self::MAX; + pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1); } impl VirtualNode { diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index 5275aca04adb3..822db591c1577 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::mem::replace; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use itertools::Itertools; use risingwave_pb::plan_common::StorageTableDesc; @@ -74,7 +74,7 @@ impl TableDistribution { ) -> Self { let compute_vnode = if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk { ComputeVnode::VnodeColumnIndex { - vnodes: vnodes.unwrap_or_else(|| Bitmap::singleton().into()), + vnodes: vnodes.unwrap_or_else(|| Bitmap::singleton_arc().clone()), vnode_col_idx_in_pk, } } else if !dist_key_in_pk_indices.is_empty() { @@ -132,13 +132,10 @@ impl TableDistribution { /// Get vnode bitmap if distributed, or a dummy [`Bitmap::singleton()`] if singleton. pub fn vnodes(&self) -> &Arc { - static SINGLETON_VNODES: LazyLock> = - LazyLock::new(|| Bitmap::singleton().into()); - match &self.compute_vnode { ComputeVnode::DistKeyIndices { vnodes, .. } => vnodes, ComputeVnode::VnodeColumnIndex { vnodes, .. } => vnodes, - ComputeVnode::Singleton => &SINGLETON_VNODES, + ComputeVnode::Singleton => Bitmap::singleton_arc(), } } diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 163aa18799390..ec3f18a8e4205 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -30,6 +30,7 @@ use serde_with::{serde_as, DisplayFromStr}; use thiserror::Error; use self::non_zero64::ConfigNonZeroU64; +use crate::hash::VirtualNode; use crate::session_config::sink_decouple::SinkDecouple; use crate::session_config::transaction_isolation_level::IsolationLevel; pub use crate::session_config::visibility_mode::VisibilityMode; @@ -139,8 +140,11 @@ pub struct SessionConfig { #[parameter(default = "UTC", check_hook = check_timezone)] timezone: String, - /// If `STREAMING_PARALLELISM` is non-zero, CREATE MATERIALIZED VIEW/TABLE/INDEX will use it as - /// streaming parallelism. + /// The execution parallelism for streaming queries, including tables, materialized views, indexes, + /// and sinks. Defaults to 0, which means they will be scheduled adaptively based on the cluster size. + /// + /// If a non-zero value is set, streaming queries will be scheduled to use a fixed number of parallelism. + /// Note that the value will be bounded at `STREAMING_MAX_PARALLELISM`. #[serde_as(as = "DisplayFromStr")] #[parameter(default = ConfigNonZeroU64::default())] streaming_parallelism: ConfigNonZeroU64, @@ -298,6 +302,18 @@ pub struct SessionConfig { /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit. #[parameter(default = false)] bypass_cluster_limits: bool, + + /// The maximum number of parallelism a streaming query can use. Defaults to 256. + /// + /// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures + /// the maximum parallelism a streaming query can use in the future, if the cluster size changes or + /// users manually change the parallelism with `ALTER .. SET PARALLELISM`. + /// + /// It's not always a good idea to set this to a very large number, as it may cause performance + /// degradation when performing range scans on the table or the materialized view. + // a.k.a. vnode count + #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_vnode_count)] + streaming_max_parallelism: usize, } fn check_timezone(val: &str) -> Result<(), String> { @@ -324,6 +340,19 @@ fn check_bytea_output(val: &str) -> Result<(), String> { } } +/// Check if the provided value is a valid vnode count. +/// Note that we use term `max_parallelism` when it's user-facing. +fn check_vnode_count(val: &usize) -> Result<(), String> { + match val { + 0 => Err("STREAMING_MAX_PARALLELISM must be greater than 0".to_owned()), + 1..=VirtualNode::MAX_COUNT => Ok(()), + _ => Err(format!( + "STREAMING_MAX_PARALLELISM must be less than or equal to {}", + VirtualNode::MAX_COUNT + )), + } +} + impl SessionConfig { pub fn set_force_two_phase_agg( &mut self, diff --git a/src/config/docs.md b/src/config/docs.md index c9d008af2bb39..edf32965c4920 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -41,7 +41,7 @@ This page is automatically generated by `./risedev generate-example-config` | full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 | | hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 | | hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 | -| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | +| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | | max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 | | meta_leader_lease_secs | | 30 | | min_delta_log_num_for_hummock_version_checkpoint | The minimum delta log number a new checkpoint should compact, otherwise the checkpoint attempt is rejected. | 10 | @@ -52,7 +52,7 @@ This page is automatically generated by `./risedev generate-example-config` | parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 | | parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 | | parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 | -| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | +| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | | periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. | 60 | | periodic_scheduling_compaction_group_interval_sec | | 10 | | periodic_space_reclaim_compaction_interval_sec | Schedule `space_reclaim` compaction for all compaction groups with this interval. | 3600 | diff --git a/src/expr/core/src/expr_context.rs b/src/expr/core/src/expr_context.rs index 27a888118e318..d6798a861315f 100644 --- a/src/expr/core/src/expr_context.rs +++ b/src/expr/core/src/expr_context.rs @@ -14,6 +14,7 @@ use std::future::Future; +use risingwave_common::hash::VirtualNode; use risingwave_expr::{define_context, Result as ExprResult}; use risingwave_pb::plan_common::ExprContext; @@ -21,6 +22,7 @@ use risingwave_pb::plan_common::ExprContext; define_context! { pub TIME_ZONE: String, pub FRAGMENT_ID: u32, + pub VNODE_COUNT: usize, } pub fn capture_expr_context() -> ExprResult { @@ -28,6 +30,13 @@ pub fn capture_expr_context() -> ExprResult { Ok(ExprContext { time_zone }) } +/// Get the vnode count from the context, or [`VirtualNode::COUNT_FOR_COMPAT`] if not set. +// TODO(var-vnode): the only case where this is not set is for batch queries, is it still +// necessary to support `rw_vnode` expression in batch queries? +pub fn vnode_count() -> usize { + VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT_FOR_COMPAT) +} + pub async fn expr_context_scope(expr_context: ExprContext, future: Fut) -> Fut::Output where Fut: Future, diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index edd4caa39970e..7d44dfb0e03b1 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -19,6 +19,7 @@ use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; use risingwave_expr::expr::{BoxedExpression, Expression}; +use risingwave_expr::expr_context::vnode_count; use risingwave_expr::{build_function, Result}; #[derive(Debug)] @@ -43,8 +44,7 @@ impl Expression for VnodeExpression { } async fn eval(&self, input: &DataChunk) -> Result { - // TODO(var-vnode): get vnode count from context - let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, VirtualNode::COUNT); + let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, vnode_count()); let mut builder = I16ArrayBuilder::new(input.capacity()); vnodes .into_iter() @@ -53,9 +53,8 @@ impl Expression for VnodeExpression { } async fn eval_row(&self, input: &OwnedRow) -> Result { - // TODO(var-vnode): get vnode count from context Ok(Some( - VirtualNode::compute_row(input, &self.dist_key_indices, VirtualNode::COUNT) + VirtualNode::compute_row(input, &self.dist_key_indices, vnode_count()) .to_scalar() .into(), )) @@ -65,12 +64,13 @@ impl Expression for VnodeExpression { #[cfg(test)] mod tests { use risingwave_common::array::{DataChunk, DataChunkTestExt}; - use risingwave_common::hash::VirtualNode; use risingwave_common::row::Row; use risingwave_expr::expr::build_from_pretty; + use risingwave_expr::expr_context::VNODE_COUNT; #[tokio::test] async fn test_vnode_expr_eval() { + let vnode_count = 32; let expr = build_from_pretty("(vnode:int2 $0:int4 $0:int8 $0:varchar)"); let input = DataChunk::from_pretty( "i I T @@ -80,17 +80,21 @@ mod tests { ); // test eval - let output = expr.eval(&input).await.unwrap(); + let output = VNODE_COUNT::scope(vnode_count, expr.eval(&input)) + .await + .unwrap(); for vnode in output.iter() { let vnode = vnode.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..vnode_count as i16).contains(&vnode)); } // test eval_row for row in input.rows() { - let result = expr.eval_row(&row.to_owned_row()).await.unwrap(); + let result = VNODE_COUNT::scope(vnode_count, expr.eval_row(&row.to_owned_row())) + .await + .unwrap(); let vnode = result.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..vnode_count as i16).contains(&vnode)); } } } diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 8c3dbd6e419cd..57aedc0e1490f 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -14,9 +14,8 @@ use pgwire::pg_response::StatementType; use risingwave_common::bail; -use risingwave_common::hash::VirtualNode; use risingwave_pb::meta::table_parallelism::{ - AdaptiveParallelism, FixedParallelism, Parallelism, PbParallelism, + AdaptiveParallelism, FixedParallelism, PbParallelism, }; use risingwave_pb::meta::{PbTableParallelism, TableParallelism}; use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value}; @@ -93,41 +92,10 @@ pub async fn handle_alter_parallelism( } }; - let mut target_parallelism = extract_table_parallelism(parallelism)?; - - let available_parallelism = session - .env() - .worker_node_manager() - .list_worker_nodes() - .iter() - .filter(|w| w.is_streaming_schedulable()) - .map(|w| w.parallelism) - .sum::(); - // TODO(var-vnode): get max parallelism from catalogs. - // Although the meta service will clamp the value for us, we should still check it here for better UI. - let max_parallelism = VirtualNode::COUNT; + let target_parallelism = extract_table_parallelism(parallelism)?; let mut builder = RwPgResponse::builder(stmt_type); - match &target_parallelism.parallelism { - Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => { - if available_parallelism > max_parallelism as u32 { - builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {max_parallelism}")); - } - } - Some(Parallelism::Fixed(FixedParallelism { parallelism })) => { - if *parallelism > max_parallelism as u32 { - builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({max_parallelism})")); - target_parallelism = PbTableParallelism { - parallelism: Some(PbParallelism::Fixed(FixedParallelism { - parallelism: max_parallelism as u32, - })), - }; - } - } - _ => {} - }; - let catalog_writer = session.catalog_writer()?; catalog_writer .alter_parallelism(table_id, target_parallelism, deferred) diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index ce08a8f0eb441..b671d0792e073 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -14,7 +14,6 @@ mod graph; use graph::*; -use risingwave_common::hash::VirtualNode; use risingwave_common::util::recursive::{self, Recurse as _}; use risingwave_connector::WithPropertiesExt; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; @@ -145,8 +144,7 @@ pub fn build_graph(plan_node: PlanRef) -> SchedulerResult Deref for JavaBindingIterator<'a> { } #[no_mangle] -extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint { - // TODO(var-vnode): vnode count can vary for different tables. - VirtualNode::COUNT as jint +extern "system" fn Java_com_risingwave_java_binding_Binding_defaultVnodeCount( + _env: EnvParam<'_>, +) -> jint { + VirtualNode::COUNT_FOR_COMPAT as jint } #[cfg_or_panic(not(madsim))] diff --git a/src/jni_core/src/macros.rs b/src/jni_core/src/macros.rs index 2e7d095e0bd45..e890232855136 100644 --- a/src/jni_core/src/macros.rs +++ b/src/jni_core/src/macros.rs @@ -385,7 +385,7 @@ macro_rules! to_jvalue { /// gen_jni_sig!(boolean f(int, java.lang.String)), /// "(ILjava/lang/String;)Z" /// ); -/// assert_eq!(gen_jni_sig!(public static native int vnodeCount()), "()I"); +/// assert_eq!(gen_jni_sig!(public static native int defaultVnodeCount()), "()I"); /// assert_eq!( /// gen_jni_sig!(long hummockIteratorNew(byte[] readPlan)), /// "([B)J" @@ -445,7 +445,7 @@ macro_rules! for_all_plain_native_methods { public static native boolean tracingSlf4jEventEnabled(int level); - public static native int vnodeCount(); + public static native int defaultVnodeCount(); static native long iteratorNewStreamChunk(long pointer); @@ -861,7 +861,7 @@ mod tests { (test) => {{ for_all_native_methods! { { - public static native int vnodeCount(); + public static native int defaultVnodeCount(); static native long hummockIteratorNew(byte[] readPlan); public static native byte[] rowGetKey(long pointer); }, @@ -885,7 +885,7 @@ mod tests { assert_eq!( sig, [ - ("vnodeCount", "()I"), + ("defaultVnodeCount", "()I"), ("hummockIteratorNew", "([B)J"), ("rowGetKey", "(J)[B") ] @@ -902,7 +902,7 @@ mod tests { [ tracingSlf4jEvent (Ljava/lang/String;Ljava/lang/String;ILjava/lang/String;)V, tracingSlf4jEventEnabled (I)Z, - vnodeCount ()I, + defaultVnodeCount ()I, iteratorNewStreamChunk (J)J, iteratorNext (J)Z, iteratorClose (J)V, diff --git a/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs b/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs index 0f93c9e3dc3de..4a30b0828f9ba 100644 --- a/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs +++ b/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs @@ -15,6 +15,7 @@ impl MigrationTrait for Migration { .add_column( ColumnDef::new(Table::VnodeCount) .integer() + .not_null() .default(VNODE_COUNT), ) .to_owned(), @@ -28,6 +29,7 @@ impl MigrationTrait for Migration { .add_column( ColumnDef::new(Fragment::VnodeCount) .integer() + .not_null() .default(VNODE_COUNT), ) .to_owned(), diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index b29c4cc207ebb..721e6da5fc7ff 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -56,7 +56,9 @@ use crate::controller::utils::{ get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors, FragmentDesc, PartialActorLocation, PartialFragmentStateTables, }; -use crate::manager::{ActorInfos, InflightFragmentInfo, LocalNotification}; +use crate::manager::{ + ActorInfos, FragmentParallelismInfo, InflightFragmentInfo, LocalNotification, +}; use crate::model::{TableFragments, TableParallelism}; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -475,8 +477,9 @@ impl CatalogController { pub async fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> MetaResult> { + ) -> MetaResult> { let inner = self.inner.read().await; + let mut select = Actor::find() .select_only() .column(actor::Column::FragmentId) @@ -485,11 +488,25 @@ impl CatalogController { if let Some(id_filter) = id_filter { select = select.having(actor::Column::FragmentId.is_in(id_filter)); } - let fragment_parallelisms: Vec<(FragmentId, i64)> = + select = select + .join(JoinType::InnerJoin, actor::Relation::Fragment.def()) + .column(fragment::Column::DistributionType) + .column(fragment::Column::VnodeCount); + + let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> = select.into_tuple().all(&inner.db).await?; Ok(fragment_parallelisms .into_iter() - .map(|(fragment_id, count)| (fragment_id, count as usize)) + .map(|(fragment_id, count, distribution_type, vnode_count)| { + ( + fragment_id, + FragmentParallelismInfo { + distribution_type: distribution_type.into(), + actor_count: count as usize, + vnode_count: vnode_count as usize, + }, + ) + }) .collect()) } diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 8aeaed2f9c5a8..2807980236c66 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -27,9 +27,13 @@ use crate::storage::MetaStoreError; pub type MetaResult = std::result::Result; #[derive( - thiserror::Error, thiserror_ext::ReportDebug, thiserror_ext::Arc, thiserror_ext::Construct, + thiserror::Error, + thiserror_ext::ReportDebug, + thiserror_ext::Arc, + thiserror_ext::Construct, + thiserror_ext::Macro, )] -#[thiserror_ext(newtype(name = MetaError, backtrace))] +#[thiserror_ext(newtype(name = MetaError, backtrace), macro(path = "crate::error"))] pub enum MetaErrorInner { #[error("MetaStore transaction error: {0}")] TransactionError( @@ -66,7 +70,7 @@ pub enum MetaErrorInner { InvalidWorker(WorkerId, String), #[error("Invalid parameter: {0}")] - InvalidParameter(String), + InvalidParameter(#[message] String), // Used for catalog errors. #[error("{0} id not found: {1}")] @@ -80,7 +84,7 @@ pub enum MetaErrorInner { Duplicated(&'static str, String), #[error("Service unavailable: {0}")] - Unavailable(String), + Unavailable(#[message] String), #[error("Election failed: {0}")] Election(#[source] BoxedError), diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 6ec70a4b8d286..07f73fbc0afeb 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; -use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, VnodeCountCompat, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::util::stream_graph_visitor::{ visit_stream_node, visit_stream_node_cont, visit_stream_node_cont_mut, }; @@ -136,7 +136,7 @@ impl FragmentManagerCore { fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> HashMap { + ) -> HashMap { self.table_fragments .values() .filter(|tf| tf.state() != State::Initial) @@ -148,13 +148,14 @@ impl FragmentManagerCore { return None; } - let parallelism = match fragment.get_distribution_type().unwrap() { - FragmentDistributionType::Unspecified => unreachable!(), - FragmentDistributionType::Single => 1, - FragmentDistributionType::Hash => fragment.get_actors().len(), - }; - - Some((fragment.fragment_id, parallelism)) + Some(( + fragment.fragment_id, + FragmentParallelismInfo { + distribution_type: fragment.get_distribution_type().unwrap(), + actor_count: fragment.actors.len(), + vnode_count: fragment.vnode_count(), + }, + )) }) }) .collect() @@ -189,6 +190,13 @@ impl ActorInfos { } } +#[derive(Clone, Debug)] +pub struct FragmentParallelismInfo { + pub distribution_type: FragmentDistributionType, + pub actor_count: usize, + pub vnode_count: usize, +} + pub type FragmentManagerRef = Arc; impl FragmentManager { @@ -1678,7 +1686,7 @@ impl FragmentManager { pub async fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> HashMap { + ) -> HashMap { self.core .read() .await diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 4e0ca9e77f4b8..78df862e1d8e5 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -31,6 +31,7 @@ use tokio::sync::oneshot; use tokio::time::{sleep, Instant}; use tracing::warn; +use super::FragmentParallelismInfo; use crate::barrier::Reschedule; use crate::controller::catalog::CatalogControllerRef; use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo}; @@ -436,15 +437,12 @@ impl MetadataManager { pub async fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> MetaResult> { + ) -> MetaResult> { match self { MetadataManager::V1(mgr) => Ok(mgr .fragment_manager .running_fragment_parallelisms(id_filter) - .await - .into_iter() - .map(|(k, v)| (k as FragmentId, v)) - .collect()), + .await), MetadataManager::V2(mgr) => { let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect()); Ok(mgr diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 56e8edbf1d780..0d8ea1c2ae38c 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -68,6 +68,7 @@ use tracing::log::warn; use tracing::Instrument; use crate::barrier::BarrierManagerRef; +use crate::error::{bail_invalid_parameter, bail_unavailable}; use crate::manager::{ CatalogManagerRef, ConnectionId, DatabaseId, DdlType, FragmentManagerRef, FunctionId, IdCategory, IdCategoryType, IndexId, LocalNotification, MetaSrvEnv, MetadataManager, @@ -1540,33 +1541,60 @@ impl DdlController { Ok(version) } + /// Resolve the parallelism of the stream job based on the given information. + /// + /// Returns error if user specifies a parallelism that cannot be satisfied. fn resolve_stream_parallelism( &self, - specified_parallelism: Option, + specified: Option, + max: NonZeroUsize, cluster_info: &StreamingClusterInfo, ) -> MetaResult { - let available_parallelism = cluster_info.parallelism(); - if available_parallelism == 0 { - return Err(MetaError::unavailable("No available slots to schedule")); - } + let available = cluster_info.parallelism(); + let Some(available) = NonZeroUsize::new(available) else { + bail_unavailable!("no available slots to schedule"); + }; - let available_parallelism = NonZeroUsize::new(available_parallelism).unwrap(); + if let Some(specified) = specified { + if specified > max { + bail_invalid_parameter!( + "specified parallelism {} should not exceed max parallelism {}", + specified, + max, + ); + } + if specified > available { + bail_unavailable!( + "not enough parallelism to schedule, required: {}, available: {}", + specified, + available, + ); + } + Ok(specified) + } else { + // Use configured parallelism if no default parallelism is specified. + let default_parallelism = match self.env.opts.default_parallelism { + DefaultParallelism::Full => available, + DefaultParallelism::Default(num) => { + if num > available { + bail_unavailable!( + "not enough parallelism to schedule, required: {}, available: {}", + num, + available, + ); + } + num + } + }; - // Use configured parallelism if no default parallelism is specified. - let parallelism = - specified_parallelism.unwrap_or_else(|| match &self.env.opts.default_parallelism { - DefaultParallelism::Full => available_parallelism, - DefaultParallelism::Default(num) => *num, - }); - - if parallelism > available_parallelism { - return Err(MetaError::unavailable(format!( - "Not enough parallelism to schedule, required: {}, available: {}", - parallelism, available_parallelism - ))); + if default_parallelism > max { + tracing::warn!( + "too many parallelism available, use max parallelism {} instead", + max + ); + } + Ok(default_parallelism.min(max)) } - - Ok(parallelism) } /// Builds the actor graph: @@ -1631,12 +1659,8 @@ impl DdlController { // 2. Build the actor graph. let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; - let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?; - let parallelism_limited = parallelism > max_parallelism; - if parallelism_limited { - tracing::warn!("Too many parallelism, use {} instead", max_parallelism); - } - let parallelism = parallelism.min(max_parallelism); + let parallelism = + self.resolve_stream_parallelism(specified_parallelism, max_parallelism, &cluster_info)?; let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?; @@ -1659,10 +1683,6 @@ impl DdlController { // If the frontend does not specify the degree of parallelism and the default_parallelism is set to full, then set it to ADAPTIVE. // Otherwise, it defaults to FIXED based on deduction. let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) { - (None, DefaultParallelism::Full) if parallelism_limited => { - tracing::warn!("Parallelism limited to {max_parallelism} in ADAPTIVE mode"); - TableParallelism::Adaptive - } (None, DefaultParallelism::Full) => TableParallelism::Adaptive, _ => TableParallelism::Fixed(parallelism.get()), }; diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 3d7ab9888362f..c7e39ca8a0a8d 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -16,15 +16,18 @@ use std::collections::HashMap; use std::sync::Arc; use parking_lot::RwLock; -use risingwave_common::hash::{VirtualNode, WorkerSlotMapping}; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; +use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings}; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; -use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef}; +use crate::manager::{ + FragmentParallelismInfo, LocalNotification, MetadataManager, NotificationManagerRef, +}; use crate::model::FragmentId; pub type ServingVnodeMappingRef = Arc; @@ -43,22 +46,21 @@ impl ServingVnodeMapping { /// Returns (successful updates, failed updates). pub fn upsert( &self, - streaming_parallelisms: HashMap, + streaming_parallelisms: HashMap, workers: &[WorkerNode], ) -> (HashMap, Vec) { let mut serving_vnode_mappings = self.serving_vnode_mappings.write(); let mut upserted: HashMap = HashMap::default(); let mut failed: Vec = vec![]; - for (fragment_id, streaming_parallelism) in streaming_parallelisms { + for (fragment_id, info) in streaming_parallelisms { let new_mapping = { let old_mapping = serving_vnode_mappings.get(&fragment_id); - let max_parallelism = if streaming_parallelism == 1 { - Some(1) - } else { - None + let max_parallelism = match info.distribution_type { + FragmentDistributionType::Unspecified => unreachable!(), + FragmentDistributionType::Single => Some(1), + FragmentDistributionType::Hash => None, }; - // TODO(var-vnode): also fetch vnode count for each fragment - place_vnode(old_mapping, workers, max_parallelism, VirtualNode::COUNT) + place_vnode(old_mapping, workers, max_parallelism, info.vnode_count) }; match new_mapping { None => { @@ -129,7 +131,10 @@ pub async fn on_meta_start( async fn fetch_serving_infos( metadata_manager: &MetadataManager, -) -> (Vec, HashMap) { +) -> ( + Vec, + HashMap, +) { match metadata_manager { MetadataManager::V1(mgr) => ( mgr.cluster_manager @@ -155,7 +160,7 @@ async fn fetch_serving_infos( serving_compute_nodes, parallelisms .into_iter() - .map(|(fragment_id, cnt)| (fragment_id as FragmentId, cnt)) + .map(|(fragment_id, info)| (fragment_id as FragmentId, info)) .collect(), ) } @@ -193,9 +198,9 @@ pub async fn start_serving_vnode_mapping_worker( continue; } let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await; - let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id|{ + let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id| { match streaming_parallelisms.get(frag_id) { - Some(parallelism) => Some((*frag_id, *parallelism)), + Some(info) => Some((*frag_id, info.clone())), None => { tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found"); None diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index c2891d81b81ab..58e731b83bc09 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2244,8 +2244,6 @@ impl ScaleController { } TableParallelism::Fixed(mut n) => { if n > max_parallelism { - // This should be unreachable as it was already checked and rewritten in the frontend. - // We still intercept it to prevent accidental modifications. tracing::warn!("specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"); n = max_parallelism } diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index f67d8547e28a8..55ae23171bdf2 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -152,11 +152,11 @@ impl Distribution { } /// Get the vnode count of the distribution. - // TODO(var-vnode): after `ServingVnodeMapping::upsert` is made vnode-count-aware, - // we may return 1 for singleton. + /// + /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton. pub fn vnode_count(&self) -> usize { match self { - Distribution::Singleton(_) => VirtualNode::COUNT, + Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT, Distribution::Hash(mapping) => mapping.len(), } } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 118252038dbbd..f12de49cccbbe 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -19,6 +19,7 @@ use futures::future::join_all; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::catalog::TableId; +use risingwave_common::hash::VirtualNode; use risingwave_meta_model_v2::ObjectId; use risingwave_pb::catalog::{CreateType, Subscription, Table}; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; @@ -33,6 +34,7 @@ use crate::barrier::{ BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan, SnapshotBackfillInfo, }; +use crate::error::bail_invalid_parameter; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob}; use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism}; use crate::stream::SourceManagerRef; @@ -666,14 +668,47 @@ impl GlobalStreamManager { let worker_nodes = self .metadata_manager .list_active_streaming_compute_nodes() - .await?; + .await? + .into_iter() + .filter(|w| w.is_streaming_schedulable()) + .collect_vec(); let worker_ids = worker_nodes .iter() - .filter(|w| w.property.as_ref().map_or(true, |p| !p.is_unschedulable)) .map(|node| node.id) .collect::>(); + // Check if the provided parallelism is valid. + let available_parallelism = worker_nodes + .iter() + .map(|w| w.parallelism as usize) + .sum::(); + // TODO(var-vnode): get correct max parallelism from catalogs. + let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; + + match parallelism { + TableParallelism::Adaptive => { + if available_parallelism > max_parallelism { + tracing::warn!( + "too many parallelism available, use max parallelism {} will be limited", + max_parallelism + ); + } + } + TableParallelism::Fixed(parallelism) => { + if parallelism > max_parallelism { + bail_invalid_parameter!( + "specified parallelism {} should not exceed max parallelism {}", + parallelism, + max_parallelism + ); + } + } + TableParallelism::Custom => { + bail_invalid_parameter!("should not alter parallelism to custom") + } + } + let table_parallelism_assignment = HashMap::from([(TableId::new(table_id), parallelism)]); if deferred { diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 25bdd54df720f..45870257bc04e 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -94,7 +94,7 @@ where concurrent_uploading_sst_count: Option, ) -> Self { // TODO(var-vnode): should use value from caller - let vnode_count = VirtualNode::COUNT; + let vnode_count = VirtualNode::COUNT_FOR_COMPAT; Self { builder_factory, diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index ec7cc62f2d49c..6e020536cfb99 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -201,7 +201,7 @@ impl LogStoreRowSerde { let vnodes = match vnodes { Some(vnodes) => vnodes, - None => Bitmap::singleton().into(), + None => Bitmap::singleton_arc().clone(), }; // epoch and seq_id. The seq_id of barrier is set null, and therefore the second order type diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index b1b167567ed33..0aa5ebca9007f 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -19,13 +19,16 @@ use std::sync::{Arc, LazyLock}; use anyhow::anyhow; use await_tree::InstrumentAwait; use futures::future::join_all; +use futures::FutureExt; use hytra::TrAdder; +use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::config::StreamingConfig; +use risingwave_common::hash::VirtualNode; use risingwave_common::log::LogSuppresser; use risingwave_common::metrics::{IntGaugeExt, GLOBAL_ERROR_METRICS}; use risingwave_common::util::epoch::EpochPair; -use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID}; +use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID, VNODE_COUNT}; use risingwave_expr::ExprError; use risingwave_pb::plan_common::ExprContext; use risingwave_pb::stream_plan::PbStreamActor; @@ -44,6 +47,7 @@ use crate::task::{ActorId, LocalBarrierManager}; pub struct ActorContext { pub id: ActorId, pub fragment_id: u32, + pub vnode_count: usize, pub mview_definition: String, // TODO(eric): these seem to be useless now? @@ -71,6 +75,7 @@ impl ActorContext { Arc::new(Self { id, fragment_id: 0, + vnode_count: VirtualNode::COUNT_FOR_TEST, mview_definition: "".to_string(), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), @@ -97,6 +102,10 @@ impl ActorContext { id: stream_actor.actor_id, fragment_id: stream_actor.fragment_id, mview_definition: stream_actor.mview_definition.clone(), + vnode_count: (stream_actor.vnode_bitmap.as_ref()) + // An unset `vnode_bitmap` means the actor is a singleton. + // For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton. + .map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val, @@ -177,18 +186,26 @@ where #[inline(always)] pub async fn run(mut self) -> StreamResult<()> { - FRAGMENT_ID::scope( - self.actor_context.fragment_id, - expr_context_scope(self.expr_context.clone(), async move { - tokio::join!( - // Drive the subtasks concurrently. - join_all(std::mem::take(&mut self.subtasks)), - self.run_consumer(), - ) - .1 - }), - ) - .await + let expr_context = self.expr_context.clone(); + let fragment_id = self.actor_context.fragment_id; + let vnode_count = self.actor_context.vnode_count; + + let run = async move { + tokio::join!( + // Drive the subtasks concurrently. + join_all(std::mem::take(&mut self.subtasks)), + self.run_consumer(), + ) + .1 + } + .boxed(); + + // Attach contexts to the future. + let run = expr_context_scope(expr_context, run); + let run = FRAGMENT_ID::scope(fragment_id, run); + let run = VNODE_COUNT::scope(vnode_count, run); + + run.await } async fn run_consumer(self) -> StreamResult<()> { diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs index bef6b1ca1f1c2..1604de034b6da 100644 --- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs @@ -145,7 +145,7 @@ async fn test_streaming_parallelism_index() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_auto_parallelism( MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE, true, @@ -177,7 +177,7 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100; @@ -190,19 +190,20 @@ async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { .await? .assert_result_eq("FIXED(1)"); - session + let result = session .run(format!("alter table t set parallelism = {}", vnode_max + 1)) - .await?; - session - .run("select parallelism from rw_streaming_parallelism where name = 't'") - .await? - .assert_result_eq(format!("FIXED({})", vnode_max)); + .await; + + // This should be rejected. + // TODO(var-vnode): showing that it's rejected for different vnode counts. + assert!(result.is_err(), "{result:?}"); + Ok(()) } #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100;