-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: configurable vnode count with env var #18161
Changes from all commits
0c63f01
69b6d7e
560745c
f708314
f3dc336
3a87132
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,9 @@ | |
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use std::num::NonZero; | ||
use std::sync::LazyLock; | ||
|
||
use itertools::Itertools; | ||
use parse_display::Display; | ||
|
||
|
@@ -31,44 +34,82 @@ pub struct VirtualNode(VirtualNodeInner); | |
|
||
/// The internal representation of a virtual node id. | ||
type VirtualNodeInner = u16; | ||
static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32); | ||
// static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32); | ||
|
||
impl From<Crc32HashCode> for VirtualNode { | ||
fn from(hash_code: Crc32HashCode) -> Self { | ||
// Take the least significant bits of the hash code. | ||
// TODO: should we use the most significant bits? | ||
let inner = (hash_code.value() % Self::COUNT as u64) as VirtualNodeInner; | ||
let inner = (hash_code.value() % Self::count() as u64) as VirtualNodeInner; | ||
VirtualNode(inner) | ||
} | ||
} | ||
|
||
impl VirtualNode { | ||
/// The number of bits used to represent a virtual node. | ||
/// | ||
/// Note: Not all bits of the inner representation are used. One should rely on this constant | ||
/// to determine the count of virtual nodes. | ||
pub const BITS: usize = 8; | ||
/// The total count of virtual nodes. | ||
pub const COUNT: usize = 1 << Self::BITS; | ||
/// We may use `VirtualNode` as a datum in a stream, or store it as a column. | ||
/// Hence this reifies it as a RW datatype. | ||
pub const RW_TYPE: DataType = DataType::Int16; | ||
/// The size of a virtual node in bytes, in memory or serialized representation. | ||
pub const SIZE: usize = std::mem::size_of::<Self>(); | ||
/// The minimum (zero) value of the virtual node. | ||
pub const ZERO: VirtualNode = unsafe { VirtualNode::from_index_unchecked(0) }; | ||
} | ||
|
||
impl VirtualNode { | ||
/// The default count of virtual nodes. | ||
const DEFAULT_COUNT: usize = 1 << 8; | ||
/// The maximum count of virtual nodes, limited by the size of the inner type [`VirtualNodeInner`]. | ||
const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS; | ||
|
||
/// The total count of virtual nodes. | ||
/// | ||
/// It can be customized by the environment variable `RW_VNODE_COUNT`, or defaults to [`Self::DEFAULT_COUNT`]. | ||
pub fn count() -> usize { | ||
// Cache the value to avoid repeated env lookups and parsing. | ||
static COUNT: LazyLock<usize> = LazyLock::new(|| { | ||
if let Ok(count) = std::env::var("RW_VNODE_COUNT") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if the Considering the vnode is persisted in storage, this will certainly cause some problems, I think. If so, I would question whether it's a good idea to pass it via env var, perhaps making it a MV argument would be better? Or at least it should be a system parameter, and should always use the first-time initialized value, like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right. I had some discussion about these approaches in the original issue. Basically this PR is a quick hack if we want to unblock some users to utilize more resources for their clusters. However, if we find this niche and not that urgent, I agree that we may not merge this PR but directly implement the polished ideas, like per-job configuration or global immutable system parameter. |
||
let count = count | ||
.parse::<NonZero<usize>>() | ||
.expect("`RW_VNODE_COUNT` must be a positive integer") | ||
.get(); | ||
assert!( | ||
count <= VirtualNode::MAX_COUNT, | ||
"`RW_VNODE_COUNT` should not exceed maximum value {}", | ||
VirtualNode::MAX_COUNT | ||
); | ||
// TODO(var-vnode): shall we enforce it to be a power of 2? | ||
count | ||
} else { | ||
VirtualNode::DEFAULT_COUNT | ||
} | ||
}); | ||
|
||
*COUNT | ||
} | ||
|
||
/// The last virtual node in the range. It's derived from [`Self::count`]. | ||
pub fn max() -> VirtualNode { | ||
VirtualNode::from_index(Self::count() - 1) | ||
} | ||
} | ||
|
||
/// An iterator over all virtual nodes. | ||
pub type AllVirtualNodeIter = std::iter::Map<std::ops::Range<usize>, fn(usize) -> VirtualNode>; | ||
|
||
impl VirtualNode { | ||
/// The maximum value of the virtual node. | ||
pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1); | ||
/// We may use `VirtualNode` as a datum in a stream, or store it as a column. | ||
/// Hence this reifies it as a RW datatype. | ||
pub const RW_TYPE: DataType = DataType::Int16; | ||
/// The minimum (zero) value of the virtual node. | ||
pub const ZERO: VirtualNode = VirtualNode::from_index(0); | ||
|
||
/// Creates a virtual node from the `usize` index. | ||
pub const fn from_index(index: usize) -> Self { | ||
debug_assert!(index < Self::COUNT); | ||
pub fn from_index(index: usize) -> Self { | ||
debug_assert!(index < Self::count()); | ||
Self(index as _) | ||
} | ||
|
||
/// Creates a virtual node from the `usize` index without bounds checking. | ||
/// | ||
/// # Safety | ||
/// | ||
/// The caller must ensure that the index is within the range of virtual nodes, | ||
/// i.e., less than [`Self::count`]. | ||
pub const unsafe fn from_index_unchecked(index: usize) -> Self { | ||
Self(index as _) | ||
} | ||
|
||
|
@@ -78,8 +119,8 @@ impl VirtualNode { | |
} | ||
|
||
/// Creates a virtual node from the given scalar representation. Used by `VNODE` expression. | ||
pub const fn from_scalar(scalar: i16) -> Self { | ||
debug_assert!((scalar as usize) < Self::COUNT); | ||
pub fn from_scalar(scalar: i16) -> Self { | ||
debug_assert!((scalar as usize) < Self::count()); | ||
Self(scalar as _) | ||
} | ||
|
||
|
@@ -97,9 +138,9 @@ impl VirtualNode { | |
} | ||
|
||
/// Creates a virtual node from the given big-endian bytes representation. | ||
pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self { | ||
pub fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self { | ||
let inner = VirtualNodeInner::from_be_bytes(bytes); | ||
debug_assert!((inner as usize) < Self::COUNT); | ||
debug_assert!((inner as usize) < Self::count()); | ||
Self(inner) | ||
} | ||
|
||
|
@@ -110,7 +151,7 @@ impl VirtualNode { | |
|
||
/// Iterates over all virtual nodes. | ||
pub fn all() -> AllVirtualNodeIter { | ||
(0..Self::COUNT).map(Self::from_index) | ||
(0..Self::count()).map(Self::from_index) | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.