Skip to content

Commit

Permalink
own local copy of local version map in event handler
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 31, 2024
1 parent 1a7dc2b commit dcc337d
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 53 deletions.
100 changes: 61 additions & 39 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ use crate::hummock::event_handler::uploader::{
default_spawn_merging_task, HummockUploader, SpawnMergingTask, SpawnUploadTask, SyncedData,
UploadTaskInfo, UploadTaskPayload, UploaderEvent,
};
use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate};
use crate::hummock::event_handler::{
HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
ReadOnlyRwLockRef,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::store::version::{
HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate,
Expand Down Expand Up @@ -117,7 +120,9 @@ pub struct HummockEventHandler {
hummock_event_tx: mpsc::UnboundedSender<HummockEvent>,
hummock_event_rx: mpsc::UnboundedReceiver<HummockEvent>,
pending_sync_requests: BTreeMap<HummockEpoch, oneshot::Sender<HummockResult<SyncResult>>>,
read_version_mapping: Arc<ReadVersionMappingType>,
read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
/// A copy of `read_version_mapping` but owned by event handler
local_read_version_mapping: HashMap<LocalInstanceId, HummockReadVersionRef>,

version_update_notifier_tx: Arc<tokio::sync::watch::Sender<HummockEpoch>>,
pinned_version: Arc<ArcSwap<PinnedVersion>>,
Expand Down Expand Up @@ -232,6 +237,7 @@ impl HummockEventHandler {
pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)),
write_conflict_detector,
read_version_mapping,
local_read_version_mapping: Default::default(),
uploader,
refiller,
last_instance_id: 0,
Expand All @@ -247,8 +253,8 @@ impl HummockEventHandler {
self.pinned_version.clone()
}

pub fn read_version_mapping(&self) -> Arc<ReadVersionMappingType> {
self.read_version_mapping.clone()
pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
}

pub fn buffer_tracker(&self) -> &BufferTracker {
Expand All @@ -271,7 +277,7 @@ impl HummockEventHandler {
// older data first
.rev()
.for_each(|staging_sstable_info| {
Self::for_each_read_version(&self.read_version_mapping, |read_version| {
self.for_each_read_version(|read_version| {
read_version.update(VersionUpdate::Staging(StagingData::Sst(
staging_sstable_info.clone(),
)))
Expand Down Expand Up @@ -309,21 +315,15 @@ impl HummockEventHandler {

/// This function will be performed under the protection of the `read_version_mapping` read
/// lock, and add write lock on each `read_version` operation
fn for_each_read_version<F>(read_version: &Arc<ReadVersionMappingType>, mut f: F)
where
F: FnMut(&mut HummockReadVersion),
{
let read_version_mapping_guard = read_version.read();

read_version_mapping_guard
fn for_each_read_version(&self, mut f: impl FnMut(&mut HummockReadVersion)) {
self.local_read_version_mapping
.values()
.flat_map(HashMap::values)
.for_each(|read_version| f(read_version.write().deref_mut()));
.for_each(|read_version: &HummockReadVersionRef| f(read_version.write().deref_mut()));
}

fn handle_data_spilled(&mut self, staging_sstable_info: StagingSstableInfo) {
// todo: do some prune for version update
Self::for_each_read_version(&self.read_version_mapping, |read_version| {
self.for_each_read_version(|read_version| {
trace!("data_spilled. SST size {}", staging_sstable_info.imm_size());
read_version.update(VersionUpdate::Staging(StagingData::Sst(
staging_sstable_info.clone(),
Expand Down Expand Up @@ -415,14 +415,14 @@ impl HummockEventHandler {
);
}

{
let mapping = self.read_version_mapping.read();
assert!(
mapping.is_empty(),
"read version mapping not empty when clear. remaining tables: {:?}",
mapping.keys().collect_vec()
);
}
assert!(
self.local_read_version_mapping.is_empty(),
"read version mapping not empty when clear. remaining tables: {:?}",
self.local_read_version_mapping
.values()
.map(|read_version| read_version.read().table_id())
.collect_vec()
);

if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager {
sstable_object_id_manager
Expand Down Expand Up @@ -477,7 +477,7 @@ impl HummockEventHandler {
.store(Arc::new(new_pinned_version.clone()));

{
Self::for_each_read_version(&self.read_version_mapping, |read_version| {
self.for_each_read_version(|read_version| {
read_version.update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
});
}
Expand Down Expand Up @@ -549,20 +549,19 @@ impl HummockEventHandler {

UploaderEvent::ImmMerged(merge_output) => {
// update read version for corresponding table shards
let read_guard = self.read_version_mapping.read();
if let Some(shards) = read_guard.get(&merge_output.table_id) {
shards.get(&merge_output.instance_id).map_or_else(
|| {
warn!(
"handle ImmMerged: table instance not found. table {}, instance {}",
&merge_output.table_id, &merge_output.instance_id
)
},
|read_version| {
read_version.write().update(VersionUpdate::Staging(
StagingData::MergedImmMem(merge_output.merged_imm),
));
},
if let Some(read_version) = self
.local_read_version_mapping
.get(&merge_output.instance_id)
{
read_version
.write()
.update(VersionUpdate::Staging(StagingData::MergedImmMem(
merge_output.merged_imm,
)));
} else {
warn!(
"handle ImmMerged: table instance not found. table {:?}, instance {}",
&merge_output.table_id, &merge_output.instance_id
)
}
}
Expand Down Expand Up @@ -594,6 +593,13 @@ impl HummockEventHandler {
}

HummockEvent::ImmToUploader(imm) => {
assert!(
self.local_read_version_mapping
.contains_key(&imm.instance_id),
"add imm from non-existing read version instance: instance_id: {}, table_id {}",
imm.instance_id,
imm.table_id,
);
self.uploader.add_imm(imm);
self.uploader.may_flush();
}
Expand All @@ -616,8 +622,14 @@ impl HummockEventHandler {
epoch,
opts,
table_id,
..
instance_id,
} => {
assert!(
self.local_read_version_mapping
.contains_key(&instance_id),
"seal epoch from non-existing read version instance: instance_id: {}, table_id: {}, epoch: {}",
instance_id, table_id, epoch,
);
if let Some((direction, watermarks)) = opts.table_watermarks {
self.uploader
.add_table_watermarks(epoch, table_id, watermarks, direction)
Expand Down Expand Up @@ -653,6 +665,8 @@ impl HummockEventHandler {
);

{
self.local_read_version_mapping
.insert(instance_id, basic_read_version.clone());
let mut read_version_mapping_guard = self.read_version_mapping.write();

read_version_mapping_guard
Expand Down Expand Up @@ -687,6 +701,14 @@ impl HummockEventHandler {
"read version deregister: table_id: {}, instance_id: {}",
table_id, instance_id
);
self.local_read_version_mapping
.remove(&instance_id)
.unwrap_or_else(|| {
panic!(
"DestroyHummockInstance inexist instance table_id {} instance_id {}",
table_id, instance_id
)
});
let mut read_version_mapping_guard = self.read_version_mapping.write();
let entry = read_version_mapping_guard
.get_mut(&table_id)
Expand Down
28 changes: 23 additions & 5 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;
use parking_lot::{RwLock, RwLockReadGuard};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::HummockEpoch;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -88,8 +88,7 @@ pub enum HummockEvent {

RegisterReadVersion {
table_id: TableId,
new_read_version_sender:
oneshot::Sender<(Arc<RwLock<HummockReadVersion>>, LocalInstanceGuard)>,
new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>,
is_replicated: bool,
},

Expand Down Expand Up @@ -173,8 +172,27 @@ impl std::fmt::Debug for HummockEvent {
}

pub type LocalInstanceId = u64;
pub type ReadVersionMappingType =
RwLock<HashMap<TableId, HashMap<LocalInstanceId, Arc<RwLock<HummockReadVersion>>>>>;
pub type HummockReadVersionRef = Arc<RwLock<HummockReadVersion>>;
pub type ReadVersionMappingType = HashMap<TableId, HashMap<LocalInstanceId, HummockReadVersionRef>>;
pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef<ReadVersionMappingType>;

pub struct ReadOnlyRwLockRef<T>(Arc<RwLock<T>>);

impl<T> Clone for ReadOnlyRwLockRef<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<T> ReadOnlyRwLockRef<T> {
pub fn new(inner: Arc<RwLock<T>>) -> Self {
Self(inner)
}

pub fn read(&self) -> RwLockReadGuard<'_, T> {
self.0.read()
}
}

pub struct LocalInstanceGuard {
pub table_id: TableId,
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::CompactorContext;
use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
use crate::hummock::event_handler::{
HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadVersionMappingType,
HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
};
use crate::hummock::local_version::pinned_version::{start_pinned_version_worker, PinnedVersion};
use crate::hummock::observer_manager::HummockObserverNode;
Expand Down Expand Up @@ -98,7 +98,7 @@ pub struct HummockStorage {

_shutdown_guard: Arc<HummockStorageShutdownGuard>,

read_version_mapping: Arc<ReadVersionMappingType>,
read_version_mapping: ReadOnlyReadVersionMapping,

backup_reader: BackupReaderRef,

Expand Down
11 changes: 5 additions & 6 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ use std::sync::Arc;

use await_tree::InstrumentAwait;
use bytes::Bytes;
use parking_lot::RwLock;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
use tokio::sync::mpsc;
use tracing::{warn, Instrument};

use super::version::{HummockReadVersion, StagingData, VersionUpdate};
use super::version::{StagingData, VersionUpdate};
use crate::error::StorageResult;
use crate::hummock::event_handler::{HummockEvent, LocalInstanceGuard};
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator,
UserIterator,
Expand Down Expand Up @@ -64,7 +63,7 @@ pub struct LocalHummockStorage {
instance_guard: LocalInstanceGuard,

/// Read handle.
read_version: Arc<RwLock<HummockReadVersion>>,
read_version: HummockReadVersionRef,

/// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`.
/// It's used by executors in different CNs to synchronize states.
Expand Down Expand Up @@ -517,7 +516,7 @@ impl LocalHummockStorage {
#[allow(clippy::too_many_arguments)]
pub fn new(
instance_guard: LocalInstanceGuard,
read_version: Arc<RwLock<HummockReadVersion>>,
read_version: HummockReadVersionRef,
hummock_version_reader: HummockVersionReader,
event_sender: mpsc::UnboundedSender<HummockEvent>,
memory_limiter: Arc<MemoryLimiter>,
Expand Down Expand Up @@ -548,7 +547,7 @@ impl LocalHummockStorage {
}

/// See `HummockReadVersion::update` for more details.
pub fn read_version(&self) -> Arc<RwLock<HummockReadVersion>> {
pub fn read_version(&self) -> HummockReadVersionRef {
self.read_version.clone()
}

Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use tracing::Instrument;

use super::StagingDataIterator;
use crate::error::StorageResult;
use crate::hummock::event_handler::HummockReadVersionRef;
use crate::hummock::iterator::{
ConcatIterator, ForwardMergeRangeIterator, HummockIteratorUnion, MergeIterator,
SkipWatermarkIterator, UserIterator,
Expand Down Expand Up @@ -244,6 +245,10 @@ impl HummockReadVersion {
Self::new_with_replication_option(table_id, committed_version, false)
}

pub fn table_id(&self) -> TableId {
self.table_id
}

/// Updates the read version with `VersionUpdate`.
/// There will be three data types to be processed
/// `VersionUpdate::Staging`
Expand Down Expand Up @@ -500,7 +505,7 @@ pub fn read_filter_for_batch(
epoch: HummockEpoch, // for check
table_id: TableId,
mut key_range: TableKeyRange,
read_version_vec: Vec<Arc<RwLock<HummockReadVersion>>>,
read_version_vec: Vec<HummockReadVersionRef>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
assert!(!read_version_vec.is_empty());
let mut staging_vec = Vec::with_capacity(read_version_vec.len());
Expand Down

0 comments on commit dcc337d

Please sign in to comment.