From dcc337d7469e8ca8f6fb35eee60999bd846cb851 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 31 Jan 2024 16:23:57 +0800 Subject: [PATCH] own local copy of local version map in event handler --- .../event_handler/hummock_event_handler.rs | 100 +++++++++++------- src/storage/src/hummock/event_handler/mod.rs | 28 ++++- .../src/hummock/store/hummock_storage.rs | 4 +- .../hummock/store/local_hummock_storage.rs | 11 +- src/storage/src/hummock/store/version.rs | 7 +- 5 files changed, 97 insertions(+), 53 deletions(-) diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index d56cf6f2789b8..ef342e1de21f9 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -38,7 +38,10 @@ use crate::hummock::event_handler::uploader::{ default_spawn_merging_task, HummockUploader, SpawnMergingTask, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskPayload, UploaderEvent, }; -use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; +use crate::hummock::event_handler::{ + HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping, + ReadOnlyRwLockRef, +}; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::store::version::{ HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate, @@ -117,7 +120,9 @@ pub struct HummockEventHandler { hummock_event_tx: mpsc::UnboundedSender, hummock_event_rx: mpsc::UnboundedReceiver, pending_sync_requests: BTreeMap>>, - read_version_mapping: Arc, + read_version_mapping: Arc>, + /// A copy of `read_version_mapping` but owned by event handler + local_read_version_mapping: HashMap, version_update_notifier_tx: Arc>, pinned_version: Arc>, @@ -232,6 +237,7 @@ impl HummockEventHandler { pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)), write_conflict_detector, read_version_mapping, + local_read_version_mapping: Default::default(), uploader, refiller, last_instance_id: 0, @@ -247,8 +253,8 @@ impl HummockEventHandler { self.pinned_version.clone() } - pub fn read_version_mapping(&self) -> Arc { - self.read_version_mapping.clone() + pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping { + ReadOnlyRwLockRef::new(self.read_version_mapping.clone()) } pub fn buffer_tracker(&self) -> &BufferTracker { @@ -271,7 +277,7 @@ impl HummockEventHandler { // older data first .rev() .for_each(|staging_sstable_info| { - Self::for_each_read_version(&self.read_version_mapping, |read_version| { + self.for_each_read_version(|read_version| { read_version.update(VersionUpdate::Staging(StagingData::Sst( staging_sstable_info.clone(), ))) @@ -309,21 +315,15 @@ impl HummockEventHandler { /// This function will be performed under the protection of the `read_version_mapping` read /// lock, and add write lock on each `read_version` operation - fn for_each_read_version(read_version: &Arc, mut f: F) - where - F: FnMut(&mut HummockReadVersion), - { - let read_version_mapping_guard = read_version.read(); - - read_version_mapping_guard + fn for_each_read_version(&self, mut f: impl FnMut(&mut HummockReadVersion)) { + self.local_read_version_mapping .values() - .flat_map(HashMap::values) - .for_each(|read_version| f(read_version.write().deref_mut())); + .for_each(|read_version: &HummockReadVersionRef| f(read_version.write().deref_mut())); } fn handle_data_spilled(&mut self, staging_sstable_info: StagingSstableInfo) { // todo: do some prune for version update - Self::for_each_read_version(&self.read_version_mapping, |read_version| { + self.for_each_read_version(|read_version| { trace!("data_spilled. SST size {}", staging_sstable_info.imm_size()); read_version.update(VersionUpdate::Staging(StagingData::Sst( staging_sstable_info.clone(), @@ -415,14 +415,14 @@ impl HummockEventHandler { ); } - { - let mapping = self.read_version_mapping.read(); - assert!( - mapping.is_empty(), - "read version mapping not empty when clear. remaining tables: {:?}", - mapping.keys().collect_vec() - ); - } + assert!( + self.local_read_version_mapping.is_empty(), + "read version mapping not empty when clear. remaining tables: {:?}", + self.local_read_version_mapping + .values() + .map(|read_version| read_version.read().table_id()) + .collect_vec() + ); if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager { sstable_object_id_manager @@ -477,7 +477,7 @@ impl HummockEventHandler { .store(Arc::new(new_pinned_version.clone())); { - Self::for_each_read_version(&self.read_version_mapping, |read_version| { + self.for_each_read_version(|read_version| { read_version.update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone())) }); } @@ -549,20 +549,19 @@ impl HummockEventHandler { UploaderEvent::ImmMerged(merge_output) => { // update read version for corresponding table shards - let read_guard = self.read_version_mapping.read(); - if let Some(shards) = read_guard.get(&merge_output.table_id) { - shards.get(&merge_output.instance_id).map_or_else( - || { - warn!( - "handle ImmMerged: table instance not found. table {}, instance {}", - &merge_output.table_id, &merge_output.instance_id - ) - }, - |read_version| { - read_version.write().update(VersionUpdate::Staging( - StagingData::MergedImmMem(merge_output.merged_imm), - )); - }, + if let Some(read_version) = self + .local_read_version_mapping + .get(&merge_output.instance_id) + { + read_version + .write() + .update(VersionUpdate::Staging(StagingData::MergedImmMem( + merge_output.merged_imm, + ))); + } else { + warn!( + "handle ImmMerged: table instance not found. table {:?}, instance {}", + &merge_output.table_id, &merge_output.instance_id ) } } @@ -594,6 +593,13 @@ impl HummockEventHandler { } HummockEvent::ImmToUploader(imm) => { + assert!( + self.local_read_version_mapping + .contains_key(&imm.instance_id), + "add imm from non-existing read version instance: instance_id: {}, table_id {}", + imm.instance_id, + imm.table_id, + ); self.uploader.add_imm(imm); self.uploader.may_flush(); } @@ -616,8 +622,14 @@ impl HummockEventHandler { epoch, opts, table_id, - .. + instance_id, } => { + assert!( + self.local_read_version_mapping + .contains_key(&instance_id), + "seal epoch from non-existing read version instance: instance_id: {}, table_id: {}, epoch: {}", + instance_id, table_id, epoch, + ); if let Some((direction, watermarks)) = opts.table_watermarks { self.uploader .add_table_watermarks(epoch, table_id, watermarks, direction) @@ -653,6 +665,8 @@ impl HummockEventHandler { ); { + self.local_read_version_mapping + .insert(instance_id, basic_read_version.clone()); let mut read_version_mapping_guard = self.read_version_mapping.write(); read_version_mapping_guard @@ -687,6 +701,14 @@ impl HummockEventHandler { "read version deregister: table_id: {}, instance_id: {}", table_id, instance_id ); + self.local_read_version_mapping + .remove(&instance_id) + .unwrap_or_else(|| { + panic!( + "DestroyHummockInstance inexist instance table_id {} instance_id {}", + table_id, instance_id + ) + }); let mut read_version_mapping_guard = self.read_version_mapping.write(); let entry = read_version_mapping_guard .get_mut(&table_id) diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index ffce8c622fbd6..762f91d6a12ba 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use parking_lot::RwLock; +use parking_lot::{RwLock, RwLockReadGuard}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::HummockEpoch; use thiserror_ext::AsReport; @@ -88,8 +88,7 @@ pub enum HummockEvent { RegisterReadVersion { table_id: TableId, - new_read_version_sender: - oneshot::Sender<(Arc>, LocalInstanceGuard)>, + new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>, is_replicated: bool, }, @@ -173,8 +172,27 @@ impl std::fmt::Debug for HummockEvent { } pub type LocalInstanceId = u64; -pub type ReadVersionMappingType = - RwLock>>>>; +pub type HummockReadVersionRef = Arc>; +pub type ReadVersionMappingType = HashMap>; +pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef; + +pub struct ReadOnlyRwLockRef(Arc>); + +impl Clone for ReadOnlyRwLockRef { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl ReadOnlyRwLockRef { + pub fn new(inner: Arc>) -> Self { + Self(inner) + } + + pub fn read(&self) -> RwLockReadGuard<'_, T> { + self.0.read() + } +} pub struct LocalInstanceGuard { pub table_id: TableId, diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 831411c11017a..a5322a0d7765b 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -41,7 +41,7 @@ use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; use crate::hummock::compactor::CompactorContext; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::{ - HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadVersionMappingType, + HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping, }; use crate::hummock::local_version::pinned_version::{start_pinned_version_worker, PinnedVersion}; use crate::hummock::observer_manager::HummockObserverNode; @@ -98,7 +98,7 @@ pub struct HummockStorage { _shutdown_guard: Arc, - read_version_mapping: Arc, + read_version_mapping: ReadOnlyReadVersionMapping, backup_reader: BackupReaderRef, diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 7b3a9bc5da5fb..fa35f71975063 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use bytes::Bytes; -use parking_lot::RwLock; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange}; @@ -26,9 +25,9 @@ use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; use tokio::sync::mpsc; use tracing::{warn, Instrument}; -use super::version::{HummockReadVersion, StagingData, VersionUpdate}; +use super::version::{StagingData, VersionUpdate}; use crate::error::StorageResult; -use crate::hummock::event_handler::{HummockEvent, LocalInstanceGuard}; +use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard}; use crate::hummock::iterator::{ ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator, UserIterator, @@ -64,7 +63,7 @@ pub struct LocalHummockStorage { instance_guard: LocalInstanceGuard, /// Read handle. - read_version: Arc>, + read_version: HummockReadVersionRef, /// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`. /// It's used by executors in different CNs to synchronize states. @@ -517,7 +516,7 @@ impl LocalHummockStorage { #[allow(clippy::too_many_arguments)] pub fn new( instance_guard: LocalInstanceGuard, - read_version: Arc>, + read_version: HummockReadVersionRef, hummock_version_reader: HummockVersionReader, event_sender: mpsc::UnboundedSender, memory_limiter: Arc, @@ -548,7 +547,7 @@ impl LocalHummockStorage { } /// See `HummockReadVersion::update` for more details. - pub fn read_version(&self) -> Arc> { + pub fn read_version(&self) -> HummockReadVersionRef { self.read_version.clone() } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index f7029f7df7bc8..329145c169f85 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -40,6 +40,7 @@ use tracing::Instrument; use super::StagingDataIterator; use crate::error::StorageResult; +use crate::hummock::event_handler::HummockReadVersionRef; use crate::hummock::iterator::{ ConcatIterator, ForwardMergeRangeIterator, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator, UserIterator, @@ -244,6 +245,10 @@ impl HummockReadVersion { Self::new_with_replication_option(table_id, committed_version, false) } + pub fn table_id(&self) -> TableId { + self.table_id + } + /// Updates the read version with `VersionUpdate`. /// There will be three data types to be processed /// `VersionUpdate::Staging` @@ -500,7 +505,7 @@ pub fn read_filter_for_batch( epoch: HummockEpoch, // for check table_id: TableId, mut key_range: TableKeyRange, - read_version_vec: Vec>>, + read_version_vec: Vec, ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { assert!(!read_version_vec.is_empty()); let mut staging_vec = Vec::with_capacity(read_version_vec.len());