diff --git a/src/storage/src/hummock/state_store.rs b/src/storage/src/hummock/state_store.rs deleted file mode 100644 index 8dfb33fe2ea17..0000000000000 --- a/src/storage/src/hummock/state_store.rs +++ /dev/null @@ -1,282 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::future::Future; -use std::ops::Bound; -use std::sync::atomic::Ordering as MemOrdering; - -use bytes::Bytes; -use itertools::Itertools; -use more_asserts::assert_gt; -use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::key::{map_table_key_range, TableKey, TableKeyRange}; -use risingwave_hummock_sdk::HummockReadEpoch; -use risingwave_pb::hummock::SstableInfo; -use tokio::sync::oneshot; - -use super::store::state_store::HummockStorageIterator; -use super::store::version::CommittedVersion; -use super::utils::validate_safe_epoch; -use super::HummockStorage; -use crate::error::StorageResult; -use crate::hummock::event_handler::HummockEvent; -use crate::hummock::store::memtable::ImmutableMemtable; -use crate::hummock::store::state_store::LocalHummockStorage; -use crate::hummock::store::version::read_filter_for_batch; -use crate::hummock::utils::wait_for_epoch; -use crate::hummock::{HummockEpoch, HummockError}; -use crate::monitor::StoreLocalStatistic; -use crate::store::*; -use crate::StateStore; - -impl HummockStorage { - /// Gets the value of a specified `key` in the table specified in `read_options`. - /// The result is based on a snapshot corresponding to the given `epoch`. - /// if `key` has consistent hash virtual node value, then such value is stored in `value_meta` - /// - /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned, - /// the key is not found. If `Err()` is returned, the searching for the key - /// failed due to other non-EOF errors. - pub async fn get( - &self, - key: Bytes, - epoch: HummockEpoch, - read_options: ReadOptions, - ) -> StorageResult> { - let key_range = ( - Bound::Included(TableKey(key.clone())), - Bound::Included(TableKey(key.clone())), - ); - - let read_version_tuple = if read_options.read_version_from_backup { - self.build_read_version_tuple_from_backup(epoch).await? - } else { - self.build_read_version_tuple(epoch, read_options.table_id, &key_range)? - }; - - self.hummock_version_reader - .get(TableKey(key), epoch, read_options, read_version_tuple) - .await - } - - async fn iter_inner( - &self, - key_range: TableKeyRange, - epoch: u64, - read_options: ReadOptions, - ) -> StorageResult> { - let read_version_tuple = if read_options.read_version_from_backup { - self.build_read_version_tuple_from_backup(epoch).await? - } else { - self.build_read_version_tuple(epoch, read_options.table_id, &key_range)? - }; - - self.hummock_version_reader - .iter(key_range, epoch, read_options, read_version_tuple) - .await - } - - async fn build_read_version_tuple_from_backup( - &self, - epoch: u64, - ) -> StorageResult<(Vec, Vec, CommittedVersion)> { - match self.backup_reader.try_get_hummock_version(epoch).await { - Ok(Some(backup_version)) => { - validate_safe_epoch(backup_version.safe_epoch(), epoch)?; - Ok((Vec::default(), Vec::default(), backup_version)) - } - Ok(None) => Err(HummockError::read_backup_error(format!( - "backup include epoch {} not found", - epoch - )) - .into()), - Err(e) => Err(e), - } - } - - fn build_read_version_tuple( - &self, - epoch: u64, - table_id: TableId, - key_range: &TableKeyRange, - ) -> StorageResult<(Vec, Vec, CommittedVersion)> { - let pinned_version = self.pinned_version.load(); - validate_safe_epoch(pinned_version.safe_epoch(), epoch)?; - - // check epoch if lower mce - let read_version_tuple: (Vec, Vec, CommittedVersion) = - if epoch <= pinned_version.max_committed_epoch() { - // read committed_version directly without build snapshot - (Vec::default(), Vec::default(), (**pinned_version).clone()) - } else { - let read_version_vec = { - let read_guard = self.read_version_mapping.read(); - read_guard - .get(&table_id) - .map(|v| { - v.values() - .filter(|v| !v.read_arc().is_replicated()) - .cloned() - .collect_vec() - }) - .unwrap_or_default() - }; - - // When the system has just started and no state has been created, the memory state - // may be empty - if read_version_vec.is_empty() { - (Vec::default(), Vec::default(), (**pinned_version).clone()) - } else { - let (imm_vec, sst_vec) = - read_filter_for_batch(epoch, table_id, key_range, read_version_vec)?; - let committed_version = (**pinned_version).clone(); - - (imm_vec, sst_vec, committed_version) - } - }; - - Ok(read_version_tuple) - } -} - -impl StateStoreRead for HummockStorage { - type IterStream = StreamTypeOfIter; - - fn get( - &self, - key: Bytes, - epoch: u64, - read_options: ReadOptions, - ) -> impl Future>> + '_ { - self.get(key, epoch, read_options) - } - - fn iter( - &self, - key_range: IterKeyRange, - epoch: u64, - read_options: ReadOptions, - ) -> impl Future> + '_ { - self.iter_inner(map_table_key_range(key_range), epoch, read_options) - } -} - -impl StateStore for HummockStorage { - type Local = LocalHummockStorage; - - /// Waits until the local hummock version contains the epoch. If `wait_epoch` is `Current`, - /// we will only check whether it is le `sealed_epoch` and won't wait. - async fn try_wait_epoch(&self, wait_epoch: HummockReadEpoch) -> StorageResult<()> { - self.validate_read_epoch(wait_epoch)?; - let wait_epoch = match wait_epoch { - HummockReadEpoch::Committed(epoch) => { - assert_ne!(epoch, MAX_EPOCH, "epoch should not be MAX_EPOCH"); - epoch - } - _ => return Ok(()), - }; - wait_for_epoch(&self.version_update_notifier_tx, wait_epoch).await - } - - async fn sync(&self, epoch: u64) -> StorageResult { - let (tx, rx) = oneshot::channel(); - self.hummock_event_sender - .send(HummockEvent::AwaitSyncEpoch { - new_sync_epoch: epoch, - sync_result_sender: tx, - }) - .expect("should send success"); - Ok(rx.await.expect("should wait success")?) - } - - fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { - // Update `seal_epoch` synchronously, - // as `HummockEvent::SealEpoch` is handled asynchronously. - let prev_epoch = self.seal_epoch.swap(epoch, MemOrdering::SeqCst); - assert_gt!(epoch, prev_epoch); - - if is_checkpoint { - let _ = self.min_current_epoch.compare_exchange( - MAX_EPOCH, - epoch, - MemOrdering::SeqCst, - MemOrdering::SeqCst, - ); - } - self.hummock_event_sender - .send(HummockEvent::SealEpoch { - epoch, - is_checkpoint, - }) - .expect("should send success"); - StoreLocalStatistic::flush_all(); - } - - async fn clear_shared_buffer(&self) -> StorageResult<()> { - let (tx, rx) = oneshot::channel(); - self.hummock_event_sender - .send(HummockEvent::Clear(tx)) - .expect("should send success"); - rx.await.expect("should wait success"); - - let epoch = self.pinned_version.load().max_committed_epoch(); - self.min_current_epoch - .store(MAX_EPOCH, MemOrdering::SeqCst); - self.seal_epoch.store(epoch, MemOrdering::SeqCst); - - Ok(()) - } - - fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_ { - self.new_local_inner(option) - } - - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - if let HummockReadEpoch::Current(read_current_epoch) = epoch { - assert_ne!( - read_current_epoch, - MAX_EPOCH, - "epoch should not be MAX_EPOCH" - ); - let sealed_epoch = self.seal_epoch.load(MemOrdering::SeqCst); - if read_current_epoch > sealed_epoch { - tracing::warn!( - "invalid barrier read {} > max seal epoch {}", - read_current_epoch, - sealed_epoch - ); - return Err(HummockError::read_current_epoch().into()); - } - - let min_current_epoch = self.min_current_epoch.load(MemOrdering::SeqCst); - if read_current_epoch < min_current_epoch { - tracing::warn!( - "invalid barrier read {} < min current epoch {}", - read_current_epoch, - min_current_epoch - ); - return Err(HummockError::read_current_epoch().into()); - } - } - Ok(()) - } -} - -impl HummockStorage { - #[cfg(any(test, feature = "test"))] - pub async fn seal_and_sync_epoch(&self, epoch: u64) -> StorageResult { - self.seal_epoch(epoch, true); - self.sync(epoch).await - } -}