Skip to content

Commit

Permalink
init vnode bitmap in new_local
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Mar 5, 2024
1 parent 29901b9 commit 957f8b0
Show file tree
Hide file tree
Showing 18 changed files with 98 additions and 76 deletions.
4 changes: 3 additions & 1 deletion src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ pub(crate) mod tests {
value_size: usize,
epochs: Vec<u64>,
) {
let mut local = storage.new_local(Default::default()).await;
let mut local = storage
.new_local(NewLocalOptions::for_test(TableId::default()))
.await;
// 1. add sstables
let val = b"0"[..].repeat(value_size);
local.init_for_test(epochs[0]).await.unwrap();
Expand Down
4 changes: 3 additions & 1 deletion src/storage/hummock_test/src/failpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ async fn test_failpoints_state_store_read_upload() {
.await
.unwrap();

let mut local = hummock_storage.new_local(NewLocalOptions::default()).await;
let mut local = hummock_storage
.new_local(NewLocalOptions::for_test(TableId::default()))
.await;

let anchor = gen_key_from_str(VirtualNode::ZERO, "aa");
let mut batch1 = vec![
Expand Down
9 changes: 5 additions & 4 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ async fn test_read_version_basic() {

let mut epoch = 1;
let table_id = 0;
let mut read_version = HummockReadVersion::new(TableId::from(table_id), pinned_version);
let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT));
let mut read_version = HummockReadVersion::new(TableId::from(table_id), pinned_version, vnodes);

{
// single imm
Expand Down Expand Up @@ -263,13 +264,13 @@ async fn test_read_filter_basic() {

let epoch = 1;
let table_id = 0;
let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT));
let read_version = Arc::new(RwLock::new(HummockReadVersion::new(
TableId::from(table_id),
pinned_version,
vnodes.clone(),
)));
read_version
.write()
.update_vnode_bitmap(Arc::new(Bitmap::ones(VirtualNode::COUNT)));
read_version.write().update_vnode_bitmap(vnodes);

{
// single imm
Expand Down
8 changes: 1 addition & 7 deletions src/storage/hummock_test/src/local_state_store_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,14 @@
// limitations under the License.

use std::future::Future;
use std::sync::Arc;

use risingwave_common::buffer::Bitmap;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::EpochPair;
use risingwave_storage::error::StorageResult;
use risingwave_storage::store::{InitOptions, LocalStateStore};

pub trait LocalStateStoreTestExt: LocalStateStore {
fn init_for_test(&mut self, epoch: u64) -> impl Future<Output = StorageResult<()>> + Send + '_ {
self.init(InitOptions::new(
EpochPair::new_test_epoch(epoch),
Arc::new(Bitmap::ones(VirtualNode::COUNT)),
))
self.init(InitOptions::new(EpochPair::new_test_epoch(epoch)))
}
}
impl<T: LocalStateStore> LocalStateStoreTestExt for T {}
14 changes: 11 additions & 3 deletions src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use bytes::Bytes;
use expect_test::expect;
use futures::{pin_mut, StreamExt, TryStreamExt};
use risingwave_common::buffer::Bitmap;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
Expand Down Expand Up @@ -127,7 +128,9 @@ async fn test_basic_inner(
// Make sure the batch is sorted.
batch3.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

let mut local = hummock_storage.new_local(Default::default()).await;
let mut local = hummock_storage
.new_local(NewLocalOptions::for_test(TableId::default()))
.await;

// epoch 0 is reserved by storage service
let epoch1: u64 = 1;
Expand Down Expand Up @@ -1043,7 +1046,9 @@ async fn test_delete_get_inner(
StorageValue::new_put("222"),
),
];
let mut local = hummock_storage.new_local(NewLocalOptions::default()).await;
let mut local = hummock_storage
.new_local(NewLocalOptions::for_test(Default::default()))
.await;
local.init_for_test(epoch1).await.unwrap();
local
.ingest_batch(
Expand Down Expand Up @@ -1126,7 +1131,9 @@ async fn test_multiple_epoch_sync_inner(
),
];

let mut local = hummock_storage.new_local(NewLocalOptions::default()).await;
let mut local = hummock_storage
.new_local(NewLocalOptions::for_test(TableId::default()))
.await;
local.init_for_test(epoch1).await.unwrap();
local
.ingest_batch(
Expand Down Expand Up @@ -1382,6 +1389,7 @@ async fn test_replicated_local_hummock_storage() {
TableOption {
retention_seconds: None,
},
Arc::new(Bitmap::ones(VirtualNode::COUNT)),
))
.await;

Expand Down
4 changes: 3 additions & 1 deletion src/storage/hummock_trace/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use bincode::{Decode, Encode};
use risingwave_common::buffer::Bitmap;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::EpochPair;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::common::PbBuffer;
Expand Down Expand Up @@ -149,6 +150,7 @@ pub struct TracedNewLocalOptions {
pub op_consistency_level: TracedOpConsistencyLevel,
pub table_option: TracedTableOption,
pub is_replicated: bool,
pub vnodes: TracedBitmap,
}

#[cfg(test)]
Expand All @@ -161,6 +163,7 @@ impl TracedNewLocalOptions {
retention_seconds: None,
},
is_replicated: false,
vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT)),
}
}
}
Expand Down Expand Up @@ -224,7 +227,6 @@ impl From<TracedEpochPair> for EpochPair {
#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
pub struct TracedInitOptions {
pub epoch: TracedEpochPair,
pub vnodes: TracedBitmap,
}

#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,13 +741,15 @@ impl HummockEventHandler {
table_id,
new_read_version_sender,
is_replicated,
vnodes,
} => {
let pinned_version = self.pinned_version.load();
let basic_read_version = Arc::new(RwLock::new(
HummockReadVersion::new_with_replication_option(
table_id,
(**pinned_version).clone(),
is_replicated,
vnodes,
),
));

Expand Down Expand Up @@ -875,7 +877,9 @@ mod tests {
use bytes::Bytes;
use futures::FutureExt;
use itertools::Itertools;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_hummock_sdk::version::HummockVersion;
Expand Down Expand Up @@ -955,6 +959,7 @@ mod tests {
table_id,
new_read_version_sender: read_version_tx,
is_replicated: false,
vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)),
})
.unwrap();
let (read_version, guard) = read_version_rx.await.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::{RwLock, RwLockReadGuard};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::HummockEpoch;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -88,6 +89,7 @@ pub enum HummockEvent {
table_id: TableId,
new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>,
is_replicated: bool,
vnodes: Arc<Bitmap>,
},

DestroyReadVersion {
Expand Down Expand Up @@ -138,6 +140,7 @@ impl HummockEvent {
table_id,
new_read_version_sender: _,
is_replicated,
vnodes: _,
} => format!(
"RegisterReadVersion table_id {:?}, is_replicated: {:?}",
table_id, is_replicated
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ impl HummockStorage {
table_id: option.table_id,
new_read_version_sender: tx,
is_replicated: option.is_replicated,
vnodes: option.vnodes.clone(),
})
.unwrap();

Expand Down
12 changes: 1 addition & 11 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,15 +394,6 @@ impl LocalStateStore for LocalHummockStorage {
"local state store of table id {:?} is init for more than once",
self.table_id
);
let prev_vnodes = self
.read_version
.write()
.update_vnode_bitmap(options.vnodes);
assert!(
prev_vnodes.is_none(),
"Vnode bitmap should be empty during init"
);

Ok(())
}

Expand Down Expand Up @@ -449,8 +440,7 @@ impl LocalStateStore for LocalHummockStorage {
assert!(read_version.staging().is_empty(), "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
self.table_id(), self.instance_id()
);
let prev_vnodes = read_version.update_vnode_bitmap(vnodes);
prev_vnodes.expect("Previous vnode bitmap should not be none")
read_version.update_vnode_bitmap(vnodes)
}
}

Expand Down
21 changes: 13 additions & 8 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,15 @@ pub struct HummockReadVersion {

// Vnode bitmap corresponding to the read version
// It will be initialized after local state store init
vnodes: Option<Arc<Bitmap>>,
vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
pub fn new_with_replication_option(
table_id: TableId,
committed_version: CommittedVersion,
is_replicated: bool,
vnodes: Arc<Bitmap>,
) -> Self {
// before build `HummockReadVersion`, we need to get the a initial version which obtained
// from meta. want this initialization after version is initialized (now with
Expand All @@ -246,12 +247,16 @@ impl HummockReadVersion {
committed: committed_version,

is_replicated,
vnodes: None,
vnodes,
}
}

pub fn new(table_id: TableId, committed_version: CommittedVersion) -> Self {
Self::new_with_replication_option(table_id, committed_version, false)
pub fn new(
table_id: TableId,
committed_version: CommittedVersion,
vnodes: Arc<Bitmap>,
) -> Self {
Self::new_with_replication_option(table_id, committed_version, false, vnodes)
}

pub fn table_id(&self) -> TableId {
Expand Down Expand Up @@ -509,16 +514,16 @@ impl HummockReadVersion {
self.is_replicated
}

pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Option<Arc<Bitmap>> {
self.vnodes.replace(vnodes)
pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
std::mem::replace(&mut self.vnodes, vnodes)
}

pub fn contains(&self, vnode: VirtualNode) -> bool {
self.vnodes.as_ref().unwrap().is_set(vnode.to_index())
self.vnodes.is_set(vnode.to_index())
}

pub fn vnodes(&self) -> Arc<Bitmap> {
self.vnodes.as_ref().unwrap().clone()
self.vnodes.clone()
}
}

Expand Down
11 changes: 3 additions & 8 deletions src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ pub struct MemtableLocalStateStore<S: StateStoreWrite + StateStoreRead> {
table_id: TableId,
op_consistency_level: OpConsistencyLevel,
table_option: TableOption,
vnodes: Option<Arc<Bitmap>>,
vnodes: Arc<Bitmap>,
}

impl<S: StateStoreWrite + StateStoreRead> MemtableLocalStateStore<S> {
Expand All @@ -448,7 +448,7 @@ impl<S: StateStoreWrite + StateStoreRead> MemtableLocalStateStore<S> {
table_id: option.table_id,
op_consistency_level: option.op_consistency_level,
table_option: option.table_option,
vnodes: None,
vnodes: option.vnodes,
}
}

Expand Down Expand Up @@ -601,11 +601,6 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
"epoch in local state store of table id {:?} is init for more than once",
self.table_id
);
assert!(
self.vnodes.replace(options.vnodes).is_none(),
"vnodes in local state store of table id {:?} is init for more than once",
self.table_id
);

Ok(())
}
Expand Down Expand Up @@ -665,7 +660,7 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.vnodes.replace(vnodes).unwrap()
std::mem::replace(&mut self.vnodes, vnodes)
}
}

Expand Down
Loading

0 comments on commit 957f8b0

Please sign in to comment.