Skip to content

Commit

Permalink
feat: store vnode bitmap in local state store
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Feb 21, 2024
1 parent 4197ad5 commit 45efd77
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 29 deletions.
10 changes: 7 additions & 3 deletions src/storage/hummock_test/src/local_state_store_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@
// limitations under the License.

use std::future::Future;
use std::sync::Arc;

use risingwave_common::buffer::Bitmap;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::EpochPair;
use risingwave_storage::error::StorageResult;
use risingwave_storage::store::{InitOptions, LocalStateStore};

pub trait LocalStateStoreTestExt: LocalStateStore {
fn init_for_test(&mut self, epoch: u64) -> impl Future<Output = StorageResult<()>> + Send + '_ {
self.init(InitOptions::new_with_epoch(EpochPair::new_test_epoch(
epoch,
)))
self.init(InitOptions::new(
EpochPair::new_test_epoch(epoch),
Arc::new(Bitmap::ones(VirtualNode::COUNT)),
))
}
}
impl<T: LocalStateStore> LocalStateStoreTestExt for T {}
43 changes: 36 additions & 7 deletions src/storage/hummock_trace/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

use bincode::{Decode, Encode};
use risingwave_common::buffer::Bitmap;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::util::epoch::EpochPair;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::common::PbBuffer;

use crate::TracedBytes;

Expand Down Expand Up @@ -58,7 +60,7 @@ impl From<TracedCachePriority> for CachePriority {
}
}

#[derive(Copy, Encode, Decode, PartialEq, Eq, Debug, Clone, Hash)]
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub struct TracedTableId {
pub table_id: u32,
}
Expand Down Expand Up @@ -114,7 +116,7 @@ pub struct TracedWriteOptions {
pub table_id: TracedTableId,
}

#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)]
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub struct TracedTableOption {
pub retention_seconds: Option<u32>,
}
Expand All @@ -135,13 +137,13 @@ impl From<TracedTableOption> for TableOption {
}
}

#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)]
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub enum TracedOpConsistencyLevel {
Inconsistent,
ConsistentOldValue,
}

#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)]
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub struct TracedNewLocalOptions {
pub table_id: TracedTableId,
pub op_consistency_level: TracedOpConsistencyLevel,
Expand All @@ -165,7 +167,7 @@ impl TracedNewLocalOptions {

pub type TracedHummockEpoch = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)]
#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
pub enum TracedHummockReadEpoch {
Committed(TracedHummockEpoch),
Current(TracedHummockEpoch),
Expand Down Expand Up @@ -195,7 +197,7 @@ impl From<TracedHummockReadEpoch> for HummockReadEpoch {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)]
#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
pub struct TracedEpochPair {
pub curr: TracedHummockEpoch,
pub prev: TracedHummockEpoch,
Expand All @@ -219,9 +221,10 @@ impl From<TracedEpochPair> for EpochPair {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)]
#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
pub struct TracedInitOptions {
pub epoch: TracedEpochPair,
pub vnodes: TracedBitmap,
}

#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
Expand All @@ -230,3 +233,29 @@ pub struct TracedSealCurrentEpochOptions {
pub table_watermarks: Option<(bool, Vec<Vec<u8>>)>,
pub switch_op_consistency_level: Option<bool>,
}

#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
pub struct TracedBitmap {
pub compression: i32,
pub body: Vec<u8>,
}

impl From<Bitmap> for TracedBitmap {
fn from(value: Bitmap) -> Self {
let pb = value.to_protobuf();
Self {
compression: pb.compression,
body: pb.body,
}
}
}

impl From<TracedBitmap> for Bitmap {
fn from(value: TracedBitmap) -> Self {
let pb = PbBuffer {
compression: value.compression,
body: value.body,
};
Bitmap::from(&pb)
}
}
6 changes: 3 additions & 3 deletions src/storage/hummock_trace/src/replay/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ mod tests {
let storage_type4 = StorageType::Global;

let actor_1 = vec![
(0, Operation::NewLocalStorage(opts1, 1)),
(0, Operation::NewLocalStorage(opts1.clone(), 1)),
(
1,
Operation::get(
Expand Down Expand Up @@ -135,7 +135,7 @@ mod tests {
.map(|(record_id, op)| Ok(Record::new(storage_type1, record_id, op)));

let actor_2 = vec![
(4, Operation::NewLocalStorage(opts2, 2)),
(4, Operation::NewLocalStorage(opts2.clone(), 2)),
(
5,
Operation::get(
Expand Down Expand Up @@ -166,7 +166,7 @@ mod tests {
.map(|(record_id, op)| Ok(Record::new(storage_type2, record_id, op)));

let actor_3 = vec![
(8, Operation::NewLocalStorage(opts3, 3)),
(8, Operation::NewLocalStorage(opts3.clone(), 3)),
(
9,
Operation::get(
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_trace/src/replay/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl ReplayWorker {
}
Operation::NewLocalStorage(new_local_opts, id) => {
assert_ne!(storage_type, StorageType::Global);
local_storage_opts_map.insert(id, new_local_opts);
local_storage_opts_map.insert(id, new_local_opts.clone());
let local_storage = replay.new_local(new_local_opts).await;
local_storages.insert(storage_type, local_storage);
}
Expand Down
22 changes: 22 additions & 0 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use await_tree::InstrumentAwait;
use bytes::Bytes;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange};
Expand Down Expand Up @@ -386,6 +387,14 @@ impl LocalStateStore for LocalHummockStorage {
"local state store of table id {:?} is init for more than once",
self.table_id
);
let prev_vnodes = self
.read_version
.write()
.update_vnode_bitmap(options.vnodes);
assert!(
prev_vnodes.is_none(),
"Vnode bitmap should be empty during init"
);

Ok(())
}
Expand Down Expand Up @@ -427,6 +436,19 @@ impl LocalStateStore for LocalHummockStorage {
})
.expect("should be able to send")
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
let mut read_version = self.read_version.write();
assert!(read_version.staging().is_empty(), "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
self.table_id(), self.instance_id()
);
let prev_vnodes = read_version.update_vnode_bitmap(vnodes);
assert!(
prev_vnodes.is_some(),
"Previous vnode bitmap should not be none"
);
prev_vnodes.unwrap()
}
}

impl LocalHummockStorage {
Expand Down
14 changes: 14 additions & 0 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use await_tree::InstrumentAwait;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
Expand Down Expand Up @@ -192,6 +193,10 @@ impl StagingVersion {
});
(overlapped_imms, overlapped_ssts)
}

pub fn is_empty(&self) -> bool {
self.imm.is_empty() && self.sst.is_empty()
}
}

#[derive(Clone)]
Expand All @@ -212,6 +217,10 @@ pub struct HummockReadVersion {
is_replicated: bool,

table_watermarks: Option<TableWatermarksIndex>,

// Vnode bitmap corresponding to the read version
// It will be initialized after local state store init
vnodes: Option<Arc<Bitmap>>,
}

impl HummockReadVersion {
Expand All @@ -238,6 +247,7 @@ impl HummockReadVersion {
committed: committed_version,

is_replicated,
vnodes: None,
}
}

Expand Down Expand Up @@ -499,6 +509,10 @@ impl HummockReadVersion {
pub fn is_replicated(&self) -> bool {
self.is_replicated
}

pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Option<Arc<Bitmap>> {
self.vnodes.replace(vnodes)
}
}

pub fn read_filter_for_batch(
Expand Down
16 changes: 15 additions & 1 deletion src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ use std::collections::BTreeMap;
use std::future::Future;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::Bytes;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::estimate_size::{EstimateSize, KvSize};
use risingwave_common::hash::VnodeBitmapExt;
Expand Down Expand Up @@ -434,6 +436,7 @@ pub struct MemtableLocalStateStore<S: StateStoreWrite + StateStoreRead> {
table_id: TableId,
op_consistency_level: OpConsistencyLevel,
table_option: TableOption,
vnodes: Option<Arc<Bitmap>>,
}

impl<S: StateStoreWrite + StateStoreRead> MemtableLocalStateStore<S> {
Expand All @@ -445,6 +448,7 @@ impl<S: StateStoreWrite + StateStoreRead> MemtableLocalStateStore<S> {
table_id: option.table_id,
op_consistency_level: option.op_consistency_level,
table_option: option.table_option,
vnodes: None,
}
}

Expand Down Expand Up @@ -594,9 +598,15 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
assert!(
self.epoch.replace(options.epoch.curr).is_none(),
"local state store of table id {:?} is init for more than once",
"epoch in local state store of table id {:?} is init for more than once",
self.table_id
);
assert!(
self.vnodes.replace(options.vnodes).is_none(),
"vnodes in local state store of table id {:?} is init for more than once",
self.table_id
);

Ok(())
}

Expand Down Expand Up @@ -653,6 +663,10 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
async fn try_flush(&mut self) -> StorageResult<()> {
Ok(())
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.vnodes.replace(vnodes).unwrap()
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions src/storage/src/monitor/monitored_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use await_tree::InstrumentAwait;
use bytes::Bytes;
use futures::{Future, TryFutureExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockReadEpoch;
Expand Down Expand Up @@ -286,6 +287,10 @@ impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
.try_flush()
.verbose_instrument_await("store_try_flush")
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.inner.update_vnode_bitmap(vnodes)
}
}

impl<S: StateStore> StateStore for MonitoredStateStore<S> {
Expand Down
6 changes: 6 additions & 0 deletions src/storage/src/panic_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

use std::ops::Bound;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::Stream;
use risingwave_common::buffer::Bitmap;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockReadEpoch;

Expand Down Expand Up @@ -134,6 +136,10 @@ impl LocalStateStore for PanicStateStore {
async fn try_flush(&mut self) -> StorageResult<()> {
panic!("should not operate on the panic state store!");
}

fn update_vnode_bitmap(&mut self, _vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
panic!("should not operate on the panic state store!");
}
}

impl StateStore for PanicStateStore {
Expand Down
Loading

0 comments on commit 45efd77

Please sign in to comment.