Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: store vnode bitmap in local state store #15183

Merged
merged 11 commits into from
Feb 27, 2024
10 changes: 7 additions & 3 deletions src/storage/hummock_test/src/local_state_store_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@
// limitations under the License.

use std::future::Future;
use std::sync::Arc;

use risingwave_common::buffer::Bitmap;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::EpochPair;
use risingwave_storage::error::StorageResult;
use risingwave_storage::store::{InitOptions, LocalStateStore};

pub trait LocalStateStoreTestExt: LocalStateStore {
fn init_for_test(&mut self, epoch: u64) -> impl Future<Output = StorageResult<()>> + Send + '_ {
self.init(InitOptions::new_with_epoch(EpochPair::new_test_epoch(
epoch,
)))
self.init(InitOptions::new(
EpochPair::new_test_epoch(epoch),
Arc::new(Bitmap::ones(VirtualNode::COUNT)),
))
}
}
// Blanket impl: every type implementing `LocalStateStore` automatically gets
// the `LocalStateStoreTestExt` helper methods; no per-type impl is needed.
impl<T: LocalStateStore> LocalStateStoreTestExt for T {}
43 changes: 36 additions & 7 deletions src/storage/hummock_trace/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

use bincode::{Decode, Encode};
use risingwave_common::buffer::Bitmap;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::util::epoch::EpochPair;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::common::PbBuffer;

use crate::TracedBytes;

Expand Down Expand Up @@ -58,7 +60,7 @@ impl From<TracedCachePriority> for CachePriority {
}
}

#[derive(Copy, Encode, Decode, PartialEq, Eq, Debug, Clone, Hash)]
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub struct TracedTableId {
pub table_id: u32,
}
Expand Down Expand Up @@ -114,7 +116,7 @@ pub struct TracedWriteOptions {
pub table_id: TracedTableId,
}

#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)]
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub struct TracedTableOption {
pub retention_seconds: Option<u32>,
}
Expand All @@ -135,13 +137,13 @@ impl From<TracedTableOption> for TableOption {
}
}

#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)]
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub enum TracedOpConsistencyLevel {
Inconsistent,
ConsistentOldValue,
}

#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)]
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub struct TracedNewLocalOptions {
pub table_id: TracedTableId,
pub op_consistency_level: TracedOpConsistencyLevel,
Expand All @@ -165,7 +167,7 @@ impl TracedNewLocalOptions {

pub type TracedHummockEpoch = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)]
#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
pub enum TracedHummockReadEpoch {
Committed(TracedHummockEpoch),
Current(TracedHummockEpoch),
Expand Down Expand Up @@ -195,7 +197,7 @@ impl From<TracedHummockReadEpoch> for HummockReadEpoch {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)]
#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
pub struct TracedEpochPair {
pub curr: TracedHummockEpoch,
pub prev: TracedHummockEpoch,
Expand All @@ -219,9 +221,10 @@ impl From<TracedEpochPair> for EpochPair {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)]
#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
pub struct TracedInitOptions {
pub epoch: TracedEpochPair,
pub vnodes: TracedBitmap,
}

#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
Expand All @@ -230,3 +233,29 @@ pub struct TracedSealCurrentEpochOptions {
pub table_watermarks: Option<(bool, Vec<Vec<u8>>)>,
pub switch_op_consistency_level: Option<bool>,
}

/// Trace-friendly form of a `Bitmap` that derives bincode `Encode`/`Decode`.
///
/// It carries the two fields of the bitmap's protobuf buffer (`PbBuffer`), so a
/// `Bitmap` can be converted to this type and back via the `From` impls.
#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
pub struct TracedBitmap {
/// Copied verbatim from / into `PbBuffer::compression`.
pub compression: i32,
/// Copied verbatim from / into `PbBuffer::body`.
pub body: Vec<u8>,
}

impl From<Bitmap> for TracedBitmap {
    /// Builds a `TracedBitmap` from the protobuf encoding of `value`.
    ///
    /// The bitmap is first turned into its protobuf form with `to_protobuf`,
    /// then the `compression` and `body` fields are moved into the result.
    fn from(value: Bitmap) -> Self {
        let encoded = value.to_protobuf();
        let compression = encoded.compression;
        let body = encoded.body;
        Self { compression, body }
    }
}

impl From<TracedBitmap> for Bitmap {
fn from(value: TracedBitmap) -> Self {
let pb = PbBuffer {
compression: value.compression,
body: value.body,
};
Bitmap::from(&pb)
}
}
6 changes: 3 additions & 3 deletions src/storage/hummock_trace/src/replay/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ mod tests {
let storage_type4 = StorageType::Global;

let actor_1 = vec![
(0, Operation::NewLocalStorage(opts1, 1)),
(0, Operation::NewLocalStorage(opts1.clone(), 1)),
(
1,
Operation::get(
Expand Down Expand Up @@ -135,7 +135,7 @@ mod tests {
.map(|(record_id, op)| Ok(Record::new(storage_type1, record_id, op)));

let actor_2 = vec![
(4, Operation::NewLocalStorage(opts2, 2)),
(4, Operation::NewLocalStorage(opts2.clone(), 2)),
(
5,
Operation::get(
Expand Down Expand Up @@ -166,7 +166,7 @@ mod tests {
.map(|(record_id, op)| Ok(Record::new(storage_type2, record_id, op)));

let actor_3 = vec![
(8, Operation::NewLocalStorage(opts3, 3)),
(8, Operation::NewLocalStorage(opts3.clone(), 3)),
(
9,
Operation::get(
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_trace/src/replay/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl ReplayWorker {
}
Operation::NewLocalStorage(new_local_opts, id) => {
assert_ne!(storage_type, StorageType::Global);
local_storage_opts_map.insert(id, new_local_opts);
local_storage_opts_map.insert(id, new_local_opts.clone());
let local_storage = replay.new_local(new_local_opts).await;
local_storages.insert(storage_type, local_storage);
}
Expand Down
18 changes: 18 additions & 0 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use await_tree::InstrumentAwait;
use bytes::Bytes;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange};
Expand Down Expand Up @@ -386,6 +387,14 @@ impl LocalStateStore for LocalHummockStorage {
"local state store of table id {:?} is init for more than once",
self.table_id
);
let prev_vnodes = self
.read_version
.write()
.update_vnode_bitmap(options.vnodes);
assert!(
prev_vnodes.is_none(),
"Vnode bitmap should be empty during init"
);

Ok(())
}
Expand Down Expand Up @@ -427,6 +436,15 @@ impl LocalStateStore for LocalHummockStorage {
})
.expect("should be able to send")
}

/// Swaps the vnode bitmap held by the read version for `vnodes` and returns
/// the bitmap that was stored before.
///
/// # Panics
///
/// Panics if the read version's staging area is not empty, or if no vnode
/// bitmap had been stored before this call.
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
    // The write guard is held for the whole call so the emptiness check and
    // the swap observe the same read version state.
    let mut version_guard = self.read_version.write();
    assert!(
        version_guard.staging().is_empty(),
        "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
        self.table_id(),
        self.instance_id()
    );
    version_guard
        .update_vnode_bitmap(vnodes)
        .expect("Previous vnode bitmap should not be none")
}
}

impl LocalHummockStorage {
Expand Down
14 changes: 14 additions & 0 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use await_tree::InstrumentAwait;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
Expand Down Expand Up @@ -192,6 +193,10 @@ impl StagingVersion {
});
(overlapped_imms, overlapped_ssts)
}

/// Returns `true` only when both the `imm` and the `sst` collections of this
/// staging version are empty.
pub fn is_empty(&self) -> bool {
    let has_imm = !self.imm.is_empty();
    let has_sst = !self.sst.is_empty();
    !(has_imm || has_sst)
}
}

#[derive(Clone)]
Expand All @@ -212,6 +217,10 @@ pub struct HummockReadVersion {
is_replicated: bool,

table_watermarks: Option<TableWatermarksIndex>,

// Vnode bitmap corresponding to the read version
// It will be initialized after local state store init
vnodes: Option<Arc<Bitmap>>,
}

impl HummockReadVersion {
Expand All @@ -238,6 +247,7 @@ impl HummockReadVersion {
committed: committed_version,

is_replicated,
vnodes: None,
}
}

Expand Down Expand Up @@ -499,6 +509,10 @@ impl HummockReadVersion {
pub fn is_replicated(&self) -> bool {
self.is_replicated
}

/// Stores `vnodes` as the vnode bitmap of this read version.
///
/// Returns the previously stored bitmap, or `None` if none had been set yet.
pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Option<Arc<Bitmap>> {
    let incoming = Some(vnodes);
    std::mem::replace(&mut self.vnodes, incoming)
}
}

pub fn read_filter_for_batch(
Expand Down
16 changes: 15 additions & 1 deletion src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ use std::collections::BTreeMap;
use std::future::Future;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::Bytes;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::estimate_size::{EstimateSize, KvSize};
use risingwave_common::hash::VnodeBitmapExt;
Expand Down Expand Up @@ -434,6 +436,7 @@ pub struct MemtableLocalStateStore<S: StateStoreWrite + StateStoreRead> {
table_id: TableId,
op_consistency_level: OpConsistencyLevel,
table_option: TableOption,
vnodes: Option<Arc<Bitmap>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For simplicity I think it's unnecessary to store the vnode in this trivial local state store.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't store vnodes in this local state store, I think the assertion in the state table that checks whether the state table's and the state store's vnode bitmaps match will fail.

}

impl<S: StateStoreWrite + StateStoreRead> MemtableLocalStateStore<S> {
Expand All @@ -445,6 +448,7 @@ impl<S: StateStoreWrite + StateStoreRead> MemtableLocalStateStore<S> {
table_id: option.table_id,
op_consistency_level: option.op_consistency_level,
table_option: option.table_option,
vnodes: None,
}
}

Expand Down Expand Up @@ -594,9 +598,15 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
assert!(
self.epoch.replace(options.epoch.curr).is_none(),
"local state store of table id {:?} is init for more than once",
"epoch in local state store of table id {:?} is init for more than once",
self.table_id
);
assert!(
self.vnodes.replace(options.vnodes).is_none(),
"vnodes in local state store of table id {:?} is init for more than once",
self.table_id
);

Ok(())
}

Expand Down Expand Up @@ -653,6 +663,10 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
async fn try_flush(&mut self) -> StorageResult<()> {
Ok(())
}

/// Replaces the stored vnode bitmap with `vnodes` and returns the previous one.
///
/// # Panics
///
/// Panics if no vnode bitmap has been stored yet. The field starts as `None`
/// and is first populated by `init`, so calling this before `init` is a bug.
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
    // `expect` instead of a bare `unwrap` so a violated invariant produces an
    // explanatory panic message.
    self.vnodes
        .replace(vnodes)
        .expect("Previous vnode bitmap should not be none")
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions src/storage/src/monitor/monitored_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use await_tree::InstrumentAwait;
use bytes::Bytes;
use futures::{Future, TryFutureExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockReadEpoch;
Expand Down Expand Up @@ -286,6 +287,10 @@ impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
.try_flush()
.verbose_instrument_await("store_try_flush")
}

/// Forwards the vnode bitmap update to the wrapped `inner` store and returns
/// the value it returns. No instrumentation is added around this call.
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.inner.update_vnode_bitmap(vnodes)
}
}

impl<S: StateStore> StateStore for MonitoredStateStore<S> {
Expand Down
7 changes: 7 additions & 0 deletions src/storage/src/monitor/traced_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;

use bytes::Bytes;
use futures::{Future, TryFutureExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::buffer::Bitmap;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_trace::{
Expand Down Expand Up @@ -219,6 +221,11 @@ impl<S: LocalStateStore> LocalStateStore for TracedStateStore<S> {
span.may_send_result(OperationResult::TryFlush(res.as_ref().map(|o| *o).into()));
res
}

// TODO: add trace span
/// Forwards the vnode bitmap update to the wrapped `inner` store and returns
/// the value it returns. This call is not recorded in the trace yet (see the
/// TODO above).
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.inner.update_vnode_bitmap(vnodes)
}
}

impl<S: StateStore> StateStore for TracedStateStore<S> {
Expand Down
6 changes: 6 additions & 0 deletions src/storage/src/panic_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

use std::ops::Bound;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::Stream;
use risingwave_common::buffer::Bitmap;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockReadEpoch;

Expand Down Expand Up @@ -134,6 +136,10 @@ impl LocalStateStore for PanicStateStore {
async fn try_flush(&mut self) -> StorageResult<()> {
panic!("should not operate on the panic state store!");
}

/// Always panics: the panic state store does not support any operation.
/// The `_vnodes` argument is ignored.
fn update_vnode_bitmap(&mut self, _vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
panic!("should not operate on the panic state store!");
}
}

impl StateStore for PanicStateStore {
Expand Down
Loading
Loading