feat: user-facing part of variable vnode count #18515

Merged on Sep 26, 2024 · 13 commits
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
@@ -58,6 +58,7 @@
user sink_decouple
user source_rate_limit
user standard_conforming_strings
user statement_timeout
+user streaming_max_parallelism
user streaming_parallelism
user streaming_use_arrangement_backfill
user streaming_use_snapshot_backfill
4 changes: 2 additions & 2 deletions java/com_risingwave_java_binding_Binding.h

(Generated file; diff not rendered.)

@@ -50,7 +50,10 @@ public static void main(String[] args) {
HummockVersion version = metaClient.pinVersion();
Table tableCatalog = metaClient.getTable(dbName, tableName);

-int vnodeCount = Binding.vnodeCount();
+int vnodeCount = Binding.defaultVnodeCount();
+if (tableCatalog.hasMaybeVnodeCount()) {
+vnodeCount = tableCatalog.getMaybeVnodeCount();
+}

List<Integer> vnodeList = new ArrayList<>();
for (int i = 0; i < vnodeCount; i++) {
@@ -33,7 +33,11 @@ public static native void tracingSlf4jEvent(

public static native boolean tracingSlf4jEventEnabled(int level);

-public static native int vnodeCount();
+/**
+* Used to get the default number of vnodes for a table, if its `maybeVnodeCount` field is not
+* set.
+*/
+public static native int defaultVnodeCount();

static native long iteratorNewStreamChunk(long pointer);

4 changes: 2 additions & 2 deletions src/common/src/config.rs
@@ -322,7 +322,7 @@ pub struct MetaConfig {
pub do_not_config_object_storage_lifecycle: bool,

/// Count of partitions in a split group. Meta will assign this value to every new group when it automatically splits from the default group.
-/// Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table.
+/// Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table.
#[serde(default = "default::meta::partition_vnode_count")]
pub partition_vnode_count: u32,

@@ -351,7 +351,7 @@ pub struct MetaConfig {

/// Count of partitions of tables in default group and materialized view group.
/// The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment.
-/// Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table.
+/// Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table.
/// Set it to zero to disable this feature.
#[serde(default = "default::meta::hybrid_partition_vnode_count")]
pub hybrid_partition_vnode_count: u32,
24 changes: 17 additions & 7 deletions src/common/src/hash/consistent_hash/bitmap.rs
@@ -13,8 +13,9 @@
// limitations under the License.

use std::ops::RangeInclusive;
+use std::sync::{Arc, LazyLock};

-use crate::bitmap::Bitmap;
+use crate::bitmap::{Bitmap, BitmapBuilder};
use crate::hash::table_distribution::SINGLETON_VNODE;
use crate::hash::VirtualNode;

@@ -39,15 +40,24 @@ impl Bitmap {
}

/// Returns whether only the [`SINGLETON_VNODE`] is set in the bitmap.
+///
+/// Note that this method returning `true` does not imply that the bitmap was created by
+/// [`VnodeBitmapExt::singleton`], or that the bitmap has length 1.
pub fn is_singleton(&self) -> bool {
self.count_ones() == 1 && self.iter_vnodes().next().unwrap() == SINGLETON_VNODE
}

-/// Creates a bitmap with length 1 and the single bit set.
-pub fn singleton() -> Self {
-Self::ones(1)
+/// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length
+/// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1.
+pub fn singleton() -> &'static Self {
+Self::singleton_arc()
}

+/// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length
+/// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1.
+pub fn singleton_arc() -> &'static Arc<Self> {
+static SINGLETON: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
+let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT);
+builder.set(SINGLETON_VNODE.to_index(), true);
+builder.finish().into()
+});
+&SINGLETON
+}
}
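To make the new semantics concrete, here is a minimal sketch (not part of the diff) of how `singleton_arc` and `is_singleton` relate. It assumes `SINGLETON_VNODE` maps to index 0 and that both methods are callable directly on `Bitmap`, as the hunk header above suggests:

```rust
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};

#[test]
fn singleton_bitmap_semantics() {
    // The shared bitmap: length COUNT_FOR_COMPAT (256), only SINGLETON_VNODE set.
    let vnodes = Bitmap::singleton_arc().clone();
    assert!(vnodes.is_singleton());

    // `is_singleton()` inspects contents, not provenance: a hand-built bitmap
    // with only the singleton vnode set also qualifies, whatever its length.
    let mut builder = BitmapBuilder::zeroed(64);
    builder.set(0, true); // assuming SINGLETON_VNODE is at index 0
    assert!(builder.finish().is_singleton());
}
```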
4 changes: 2 additions & 2 deletions src/common/src/hash/consistent_hash/compat.rs
@@ -16,7 +16,7 @@ use super::vnode::VirtualNode;

/// A trait for accessing the vnode count field with backward compatibility.
pub trait VnodeCountCompat {
-/// Returns the vnode count, or [`VirtualNode::COUNT`] if the vnode count is not set,
+/// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set,
/// typically for backward compatibility.
///
/// See the documentation on the field of the implementing type for more details.
@@ -38,7 +38,7 @@ macro_rules! impl_maybe_vnode_count_compat {
impl VnodeCountCompat for $ty {
fn vnode_count(&self) -> usize {
self.maybe_vnode_count
-.map_or(VirtualNode::COUNT, |v| v as _)
+.map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _)
}
}
)*
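For readers outside the codebase, the fallback this macro generates can be sketched standalone; `PbTable` below is a stand-in for the generated protobuf catalog types, not the real definition:

```rust
// Sketch of what the macro expands to, per catalog type.
struct PbTable {
    maybe_vnode_count: Option<u32>,
}

impl PbTable {
    /// Vnode count, falling back to the pre-variable-vnode-count default
    /// (256, i.e. `VirtualNode::COUNT_FOR_COMPAT`) for catalogs persisted
    /// before the field existed.
    fn vnode_count(&self) -> usize {
        self.maybe_vnode_count.map_or(256, |v| v as usize)
    }
}

fn main() {
    let legacy = PbTable { maybe_vnode_count: None };
    assert_eq!(legacy.vnode_count(), 256); // old catalog: compat default

    let fresh = PbTable { maybe_vnode_count: Some(64) };
    assert_eq!(fresh.vnode_count(), 64); // new catalog: stored value wins
}
```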
5 changes: 3 additions & 2 deletions src/common/src/hash/consistent_hash/mapping.rs
@@ -140,9 +140,10 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
}

/// Create a vnode mapping with the single item. Should only be used for singletons.
-// TODO(var-vnode): make vnode count 1, also `Distribution::vnode_count`
+///
+/// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count.
pub fn new_single(item: T::Item) -> Self {
-Self::new_uniform(std::iter::once(item), VirtualNode::COUNT)
+Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT)
}

/// The length (or count) of the vnode in this mapping.
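A sketch of the resulting behavior, assuming the usual `WorkerSlotMapping` alias and that the `len`/`iter` accessors keep their documented signatures:

```rust
use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping};

#[test]
fn single_mapping_spans_compat_vnode_range() {
    let slot = WorkerSlotId::new(0, 0); // hypothetical worker 0, slot 0
    let mapping = WorkerSlotMapping::new_single(slot);

    // One item, yet the mapping still covers all 256 compat vnodes.
    assert_eq!(mapping.len(), VirtualNode::COUNT_FOR_COMPAT);
    assert!(mapping.iter().all(|s| s == slot));
}
```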
19 changes: 11 additions & 8 deletions src/common/src/hash/consistent_hash/vnode.rs
@@ -51,19 +51,22 @@ impl Crc32HashCode {
}

impl VirtualNode {
-/// The total count of virtual nodes.
-// TODO(var-vnode): remove this and only keep `COUNT_FOR_TEST`
-pub const COUNT: usize = 1 << 8;
-/// The maximum value of the virtual node.
-// TODO(var-vnode): remove this and only keep `MAX_FOR_TEST`
-pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1);
+/// The total count of virtual nodes, for compatibility purposes **ONLY**.
+///
+/// Typical use cases:
+///
+/// - As the default value for the session configuration.
+/// - As the vnode count for all streaming jobs, fragments, and tables that were created before
+///   the variable vnode count support was introduced.
+/// - As the vnode count for singletons.
+pub const COUNT_FOR_COMPAT: usize = 1 << 8;
}

impl VirtualNode {
/// The total count of virtual nodes, for testing purposes.
-pub const COUNT_FOR_TEST: usize = Self::COUNT;
+pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT;
/// The maximum value of the virtual node, for testing purposes.
-pub const MAX_FOR_TEST: VirtualNode = Self::MAX;
+pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1);
}

impl VirtualNode {
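A quick sanity check of the constants introduced here, as a sketch; it assumes `to_index` (used elsewhere in this diff) is publicly accessible:

```rust
use risingwave_common::hash::VirtualNode;

#[test]
fn compat_constants() {
    // The compat count is the historical fixed vnode count, 2^8.
    assert_eq!(VirtualNode::COUNT_FOR_COMPAT, 256);
    // The test-only maximum is derived from it, as defined above.
    assert_eq!(VirtualNode::MAX_FOR_TEST.to_index(), 255);
}
```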
9 changes: 3 additions & 6 deletions src/common/src/hash/table_distribution.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use std::mem::replace;
-use std::sync::{Arc, LazyLock};
+use std::sync::Arc;

use itertools::Itertools;
use risingwave_pb::plan_common::StorageTableDesc;
@@ -74,7 +74,7 @@ impl TableDistribution {
) -> Self {
let compute_vnode = if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk {
ComputeVnode::VnodeColumnIndex {
-vnodes: vnodes.unwrap_or_else(|| Bitmap::singleton().into()),
+vnodes: vnodes.unwrap_or_else(|| Bitmap::singleton_arc().clone()),
vnode_col_idx_in_pk,
}
} else if !dist_key_in_pk_indices.is_empty() {
@@ -132,13 +132,10 @@

/// Get vnode bitmap if distributed, or a dummy [`Bitmap::singleton()`] if singleton.
pub fn vnodes(&self) -> &Arc<Bitmap> {
-static SINGLETON_VNODES: LazyLock<Arc<Bitmap>> =
-LazyLock::new(|| Bitmap::singleton().into());
-
match &self.compute_vnode {
ComputeVnode::DistKeyIndices { vnodes, .. } => vnodes,
ComputeVnode::VnodeColumnIndex { vnodes, .. } => vnodes,
-ComputeVnode::Singleton => &SINGLETON_VNODES,
+ComputeVnode::Singleton => Bitmap::singleton_arc(),
}
}

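The design choice here is to hand out one shared static `Arc` rather than allocate a bitmap per call, now centralized in `Bitmap::singleton_arc`. A sketch of what that buys, assuming a `TableDistribution::singleton()` constructor and module paths that may differ slightly:

```rust
use std::sync::Arc;

use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::table_distribution::TableDistribution;

#[test]
fn singleton_distribution_shares_static_bitmap() {
    // `singleton()` is assumed as the constructor for a singleton table.
    let dist = TableDistribution::singleton();
    assert!(dist.vnodes().is_singleton());
    // Same static Arc on every call: no per-call allocation.
    assert!(Arc::ptr_eq(dist.vnodes(), Bitmap::singleton_arc()));
}
```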
33 changes: 31 additions & 2 deletions src/common/src/session_config/mod.rs
@@ -30,6 +30,7 @@ use serde_with::{serde_as, DisplayFromStr};
use thiserror::Error;

use self::non_zero64::ConfigNonZeroU64;
+use crate::hash::VirtualNode;
use crate::session_config::sink_decouple::SinkDecouple;
use crate::session_config::transaction_isolation_level::IsolationLevel;
pub use crate::session_config::visibility_mode::VisibilityMode;
@@ -139,8 +140,11 @@ pub struct SessionConfig {
#[parameter(default = "UTC", check_hook = check_timezone)]
timezone: String,

-/// If `STREAMING_PARALLELISM` is non-zero, CREATE MATERIALIZED VIEW/TABLE/INDEX will use it as
-/// streaming parallelism.
+/// The execution parallelism for streaming queries, including tables, materialized views, indexes,
+/// and sinks. Defaults to 0, which means they will be scheduled adaptively based on the cluster size.
+///
+/// If a non-zero value is set, streaming queries will be scheduled with that fixed parallelism.
+/// Note that the value will be bounded at `STREAMING_MAX_PARALLELISM`.
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = ConfigNonZeroU64::default())]
streaming_parallelism: ConfigNonZeroU64,
@@ -298,6 +302,18 @@ pub struct SessionConfig {
/// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
#[parameter(default = false)]
bypass_cluster_limits: bool,

+/// The maximum parallelism a streaming query can use. Defaults to 256.
+///
+/// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures
+/// the maximum parallelism a streaming query can use in the future, if the cluster size changes or
+/// users manually change the parallelism with `ALTER .. SET PARALLELISM`.
+///
+/// It's not always a good idea to set this to a very large number, as it may cause performance
+/// degradation when performing range scans on the table or the materialized view.
+// a.k.a. vnode count
+#[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_vnode_count)]
+streaming_max_parallelism: usize,
}

fn check_timezone(val: &str) -> Result<(), String> {
@@ -324,6 +340,19 @@ fn check_bytea_output(val: &str) -> Result<(), String> {
}
}

+/// Check if the provided value is a valid vnode count.
+/// Note that we use the term `max_parallelism` when it's user-facing.
+fn check_vnode_count(val: &usize) -> Result<(), String> {
+match val {
+0 => Err("STREAMING_MAX_PARALLELISM must be greater than 0".to_owned()),
+1..=VirtualNode::MAX_COUNT => Ok(()),
+_ => Err(format!(
+"STREAMING_MAX_PARALLELISM must be less than or equal to {}",
+VirtualNode::MAX_COUNT
+)),
+}
+}

impl SessionConfig {
pub fn set_force_two_phase_agg(
&mut self,
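Since the checker is pure, its contract is easy to pin down in a standalone sketch; `MAX_COUNT` below is an assumed stand-in for `VirtualNode::MAX_COUNT`, which lands in the non-user-facing half of this work:

```rust
// Self-contained sketch of the validation above; 1 << 15 is assumed.
const MAX_COUNT: usize = 1 << 15;

fn check_vnode_count(val: &usize) -> Result<(), String> {
    match val {
        0 => Err("STREAMING_MAX_PARALLELISM must be greater than 0".to_owned()),
        1..=MAX_COUNT => Ok(()),
        _ => Err(format!(
            "STREAMING_MAX_PARALLELISM must be less than or equal to {MAX_COUNT}"
        )),
    }
}

fn main() {
    assert!(check_vnode_count(&0).is_err()); // zero is rejected
    assert!(check_vnode_count(&256).is_ok()); // the compat default passes
    assert!(check_vnode_count(&(MAX_COUNT + 1)).is_err()); // above the bound
}
```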
4 changes: 2 additions & 2 deletions src/config/docs.md
@@ -41,7 +41,7 @@ This page is automatically generated by `./risedev generate-example-config`
| full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 |
| hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 |
| hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 |
-| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it to zero to disable this feature. | 4 |
+| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it to zero to disable this feature. | 4 |
| max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 |
| meta_leader_lease_secs | | 30 |
| min_delta_log_num_for_hummock_version_checkpoint | The minimum delta log number a new checkpoint should compact, otherwise the checkpoint attempt is rejected. | 10 |
@@ -52,7 +52,7 @@
| parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 |
| parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 |
| parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 |
-| partition_vnode_count | Count of partitions in a split group. Meta will assign this value to every new group when it automatically splits from the default group. Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 |
+| partition_vnode_count | Count of partitions in a split group. Meta will assign this value to every new group when it automatically splits from the default group. Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 |
| periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. | 60 |
| periodic_scheduling_compaction_group_interval_sec | | 10 |
| periodic_space_reclaim_compaction_interval_sec | Schedule `space_reclaim` compaction for all compaction groups with this interval. | 3600 |
9 changes: 9 additions & 0 deletions src/expr/core/src/expr_context.rs
@@ -14,20 +14,29 @@

use std::future::Future;

+use risingwave_common::hash::VirtualNode;
use risingwave_expr::{define_context, Result as ExprResult};
use risingwave_pb::plan_common::ExprContext;

// For all execution modes.
define_context! {
pub TIME_ZONE: String,
pub FRAGMENT_ID: u32,
+pub VNODE_COUNT: usize,
}

pub fn capture_expr_context() -> ExprResult<ExprContext> {
let time_zone = TIME_ZONE::try_with(ToOwned::to_owned)?;
Ok(ExprContext { time_zone })
}

+/// Get the vnode count from the context, or [`VirtualNode::COUNT_FOR_COMPAT`] if not set.
+// TODO(var-vnode): the only case where this is not set is for batch queries, is it still
+// necessary to support `rw_vnode` expression in batch queries?
+pub fn vnode_count() -> usize {
+VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT_FOR_COMPAT)
+}

pub async fn expr_context_scope<Fut>(expr_context: ExprContext, future: Fut) -> Fut::Output
where
Fut: Future,
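A sketch of how the task-local is used end to end, mirroring the `VNODE_COUNT::scope` call in the test further below (the `#[tokio::test]` wrapper is an assumption):

```rust
use risingwave_common::hash::VirtualNode;
use risingwave_expr::expr_context::{vnode_count, VNODE_COUNT};

#[tokio::test]
async fn vnode_count_scoping() {
    // Outside any scope (e.g. a batch query), the compat default applies.
    assert_eq!(vnode_count(), VirtualNode::COUNT_FOR_COMPAT);

    // A streaming executor scopes the actual count around evaluation.
    VNODE_COUNT::scope(64, async {
        assert_eq!(vnode_count(), 64);
    })
    .await;
}
```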
22 changes: 13 additions & 9 deletions src/expr/impl/src/scalar/vnode.rs
@@ -19,6 +19,7 @@ use risingwave_common::hash::VirtualNode;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::expr::{BoxedExpression, Expression};
+use risingwave_expr::expr_context::vnode_count;
use risingwave_expr::{build_function, Result};

#[derive(Debug)]
@@ -43,8 +44,7 @@ impl Expression for VnodeExpression {
}

async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
-// TODO(var-vnode): get vnode count from context
-let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, VirtualNode::COUNT);
+let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, vnode_count());
let mut builder = I16ArrayBuilder::new(input.capacity());
vnodes
.into_iter()
Expand All @@ -53,9 +53,8 @@ impl Expression for VnodeExpression {
}

async fn eval_row(&self, input: &OwnedRow) -> Result<Datum> {
-// TODO(var-vnode): get vnode count from context
Ok(Some(
-VirtualNode::compute_row(input, &self.dist_key_indices, VirtualNode::COUNT)
+VirtualNode::compute_row(input, &self.dist_key_indices, vnode_count())
.to_scalar()
.into(),
))
@@ -65,12 +64,13 @@
#[cfg(test)]
mod tests {
use risingwave_common::array::{DataChunk, DataChunkTestExt};
-use risingwave_common::hash::VirtualNode;
use risingwave_common::row::Row;
use risingwave_expr::expr::build_from_pretty;
+use risingwave_expr::expr_context::VNODE_COUNT;

#[tokio::test]
async fn test_vnode_expr_eval() {
+let vnode_count = 32;
let expr = build_from_pretty("(vnode:int2 $0:int4 $0:int8 $0:varchar)");
let input = DataChunk::from_pretty(
"i I T
Expand All @@ -80,17 +80,21 @@ mod tests {
);

// test eval
-let output = expr.eval(&input).await.unwrap();
+let output = VNODE_COUNT::scope(vnode_count, expr.eval(&input))
+.await
+.unwrap();
for vnode in output.iter() {
let vnode = vnode.unwrap().into_int16();
-assert!((0..VirtualNode::COUNT as i16).contains(&vnode));
+assert!((0..vnode_count as i16).contains(&vnode));
}

// test eval_row
for row in input.rows() {
-let result = expr.eval_row(&row.to_owned_row()).await.unwrap();
+let result = VNODE_COUNT::scope(vnode_count, expr.eval_row(&row.to_owned_row()))
+.await
+.unwrap();
let vnode = result.unwrap().into_int16();
-assert!((0..VirtualNode::COUNT as i16).contains(&vnode));
+assert!((0..vnode_count as i16).contains(&vnode));
}
}
}