diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs
index 6d24838cd935..0a96f45de80a 100644
--- a/src/storage/src/hummock/store/local_hummock_storage.rs
+++ b/src/storage/src/hummock/store/local_hummock_storage.rs
@@ -240,27 +240,6 @@ impl LocalHummockStorage {
         )
         .await
     }
-
-    pub async fn may_exist_inner(
-        &self,
-        key_range: TableKeyRange,
-        read_options: ReadOptions,
-    ) -> StorageResult<bool> {
-        if self.mem_table.iter(key_range.clone()).next().is_some() {
-            return Ok(true);
-        }
-
-        let (key_range, read_snapshot) = read_filter_for_version(
-            HummockEpoch::MAX, // Use MAX epoch to make sure we read from latest
-            read_options.table_id,
-            key_range,
-            &self.read_version,
-        )?;
-
-        self.hummock_version_reader
-            .may_exist(key_range, read_options, read_snapshot)
-            .await
-    }
 }
 
 impl StateStoreRead for LocalHummockStorage {
@@ -319,14 +298,6 @@ impl LocalStateStore for LocalHummockStorage {
     type Iter<'a> = LocalHummockStorageIterator<'a>;
     type RevIter<'a> = LocalHummockStorageRevIterator<'a>;
 
-    fn may_exist(
-        &self,
-        key_range: TableKeyRange,
-        read_options: ReadOptions,
-    ) -> impl Future<Output = StorageResult<bool>> + Send + '_ {
-        self.may_exist_inner(key_range, read_options)
-    }
-
     async fn get(
         &self,
         key: TableKey<Bytes>,
diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs
index 33e5101847b1..7db6e1edf5e9 100644
--- a/src/storage/src/hummock/store/version.rs
+++ b/src/storage/src/hummock/store/version.rs
@@ -28,7 +28,7 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::hash::VirtualNode;
 use risingwave_common::util::epoch::MAX_SPILL_TIMES;
 use risingwave_hummock_sdk::key::{
-    bound_table_key_range, is_empty_key_range, FullKey, TableKey, TableKeyRange, UserKey,
+    bound_table_key_range, FullKey, TableKey, TableKeyRange, UserKey,
 };
 use risingwave_hummock_sdk::key_range::KeyRangeCommon;
 use risingwave_hummock_sdk::table_watermark::{
@@ -61,9 +61,7 @@ use crate::hummock::{
 use crate::mem_table::{
     ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
 };
-use crate::monitor::{
-    GetLocalMetricsGuard, HummockStateStoreMetrics, MayExistLocalMetricsGuard, StoreLocalStatistic,
-};
+use crate::monitor::{GetLocalMetricsGuard, HummockStateStoreMetrics, StoreLocalStatistic};
 use crate::store::{gen_min_epoch, ReadLogOptions, ReadOptions};
 
 pub type CommittedVersion = PinnedVersion;
@@ -913,135 +911,6 @@ impl HummockVersionReader {
         Ok(())
     }
 
-    // Note: this method will not check the kv tomestones and delete range tomestones
-    pub async fn may_exist(
-        &self,
-        table_key_range: TableKeyRange,
-        read_options: ReadOptions,
-        read_version_tuple: ReadVersionTuple,
-    ) -> StorageResult<bool> {
-        if is_empty_key_range(&table_key_range) {
-            return Ok(false);
-        }
-
-        let table_id = read_options.table_id;
-        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
-        let mut stats_guard =
-            MayExistLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
-
-        // 1. check staging data
-        for imm in &imms {
-            if imm.range_exists(&table_key_range) {
-                return Ok(true);
-            }
-        }
-
-        let user_key_range = bound_table_key_range(read_options.table_id, &table_key_range);
-        let user_key_range_ref = (
-            user_key_range.0.as_ref().map(UserKey::as_ref),
-            user_key_range.1.as_ref().map(UserKey::as_ref),
-        );
-        let bloom_filter_prefix_hash = if let Some(prefix_hint) = read_options.prefix_hint {
-            Sstable::hash_for_bloom_filter(&prefix_hint, table_id.table_id)
-        } else {
-            // only use `table_key_range` to see whether all SSTs are filtered out
-            // without looking at bloom filter because prefix_hint is not provided
-            if !uncommitted_ssts.is_empty() {
-                // uncommitted_ssts is already pruned by `table_key_range` so no extra check is
-                // needed.
-                return Ok(true);
-            }
-            for level in committed_version.levels(table_id) {
-                match level.level_type() {
-                    LevelType::Overlapping | LevelType::Unspecified => {
-                        if prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range)
-                            .next()
-                            .is_some()
-                        {
-                            return Ok(true);
-                        }
-                    }
-                    LevelType::Nonoverlapping => {
-                        if prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref)
-                            .next()
-                            .is_some()
-                        {
-                            return Ok(true);
-                        }
-                    }
-                }
-            }
-            return Ok(false);
-        };
-
-        // 2. order guarantee: imm -> sst
-        for local_sst in &uncommitted_ssts {
-            stats_guard.local_stats.may_exist_check_sstable_count += 1;
-            if hit_sstable_bloom_filter(
-                self.sstable_store
-                    .sstable(local_sst, &mut stats_guard.local_stats)
-                    .await?
-                    .as_ref(),
-                &user_key_range_ref,
-                bloom_filter_prefix_hash,
-                &mut stats_guard.local_stats,
-            ) {
-                return Ok(true);
-            }
-        }
-
-        // 3. read from committed_version sst file
-        // Because SST meta records encoded key range,
-        // the filter key needs to be encoded as well.
-        assert!(committed_version.is_valid());
-        for level in committed_version.levels(table_id) {
-            if level.table_infos.is_empty() {
-                continue;
-            }
-            match level.level_type() {
-                LevelType::Overlapping | LevelType::Unspecified => {
-                    let sstable_infos =
-                        prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
-                    for sstable_info in sstable_infos {
-                        stats_guard.local_stats.may_exist_check_sstable_count += 1;
-                        if hit_sstable_bloom_filter(
-                            self.sstable_store
-                                .sstable(sstable_info, &mut stats_guard.local_stats)
-                                .await?
-                                .as_ref(),
-                            &user_key_range_ref,
-                            bloom_filter_prefix_hash,
-                            &mut stats_guard.local_stats,
-                        ) {
-                            return Ok(true);
-                        }
-                    }
-                }
-                LevelType::Nonoverlapping => {
-                    let table_infos =
-                        prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref);
-
-                    for table_info in table_infos {
-                        stats_guard.local_stats.may_exist_check_sstable_count += 1;
-                        if hit_sstable_bloom_filter(
-                            self.sstable_store
-                                .sstable(table_info, &mut stats_guard.local_stats)
-                                .await?
-                                .as_ref(),
-                            &user_key_range_ref,
-                            bloom_filter_prefix_hash,
-                            &mut stats_guard.local_stats,
-                        ) {
-                            return Ok(true);
-                        }
-                    }
-                }
-            }
-        }
-
-        Ok(false)
-    }
-
     pub async fn iter_log(
         &self,
         version: PinnedVersion,
diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs
index 0334d2c4e35e..bd4f13b8291d 100644
--- a/src/storage/src/mem_table.rs
+++ b/src/storage/src/mem_table.rs
@@ -546,15 +546,6 @@ impl LocalStateStore for MemtableLocalState
     type Iter<'a> = impl StateStoreIter + 'a;
     type RevIter<'a> = impl StateStoreIter + 'a;
 
-    #[allow(clippy::unused_async)]
-    async fn may_exist(
-        &self,
-        _key_range: TableKeyRange,
-        _read_options: ReadOptions,
-    ) -> StorageResult<bool> {
-        Ok(true)
-    }
-
     async fn get(
         &self,
         key: TableKey<Bytes>,
diff --git a/src/storage/src/monitor/local_metrics.rs b/src/storage/src/monitor/local_metrics.rs
index c218e484265b..5fd7fe6c0ef5 100644
--- a/src/storage/src/monitor/local_metrics.rs
+++ b/src/storage/src/monitor/local_metrics.rs
@@ -51,7 +51,6 @@ pub struct StoreLocalStatistic {
     pub staging_sst_iter_count: u64,
     pub overlapping_iter_count: u64,
     pub non_overlapping_iter_count: u64,
-    pub may_exist_check_sstable_count: u64,
     pub sub_iter_count: u64,
 
     pub found_key: bool,
@@ -233,11 +232,9 @@ struct LocalStoreMetrics {
     staging_sst_iter_count: LocalHistogram,
     overlapping_iter_count: LocalHistogram,
     non_overlapping_iter_count: LocalHistogram,
-    may_exist_check_sstable_count: LocalHistogram,
     sub_iter_count: LocalHistogram,
     iter_filter_metrics: BloomFilterLocalMetrics,
     get_filter_metrics: BloomFilterLocalMetrics,
-    may_exist_filter_metrics: BloomFilterLocalMetrics,
     collect_count: usize,
 
     staging_imm_get_count: LocalHistogram,
@@ -324,18 +321,12 @@ impl LocalStoreMetrics {
             .iter_merge_sstable_counts
             .with_label_values(&[table_id_label, "committed-non-overlapping-iter"])
             .local();
-        let may_exist_check_sstable_count = metrics
-            .iter_merge_sstable_counts
-            .with_label_values(&[table_id_label, "may-exist-check-sstable"])
-            .local();
         let sub_iter_count = metrics
             .iter_merge_sstable_counts
             .with_label_values(&[table_id_label, "sub-iter"])
            .local();
         let get_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "get");
         let iter_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "iter");
-        let may_exist_filter_metrics =
-            BloomFilterLocalMetrics::new(metrics, table_id_label, "may_exist");
 
         let staging_imm_get_count = metrics
             .iter_merge_sstable_counts
@@ -372,10 +363,8 @@ impl LocalStoreMetrics {
             overlapping_iter_count,
             sub_iter_count,
             non_overlapping_iter_count,
-            may_exist_check_sstable_count,
             get_filter_metrics,
             iter_filter_metrics,
-            may_exist_filter_metrics,
             collect_count: 0,
             staging_imm_get_count,
             staging_sst_get_count,
@@ -425,7 +414,6 @@ add_local_metrics_histogram!(
     overlapping_iter_count,
     non_overlapping_iter_count,
     sub_iter_count,
-    may_exist_check_sstable_count,
     staging_imm_get_count,
     staging_sst_get_count,
     overlapping_get_count,
@@ -574,37 +562,3 @@ impl Drop for IterLocalMetricsGuard {
         });
     }
 }
-
-pub struct MayExistLocalMetricsGuard {
-    metrics: Arc<HummockStateStoreMetrics>,
-    table_id: TableId,
-    pub local_stats: StoreLocalStatistic,
-}
-
-impl MayExistLocalMetricsGuard {
-    pub fn new(metrics: Arc<HummockStateStoreMetrics>, table_id: TableId) -> Self {
-        Self {
-            metrics,
-            table_id,
-            local_stats: StoreLocalStatistic::default(),
-        }
-    }
-}
-
-impl Drop for MayExistLocalMetricsGuard {
-    fn drop(&mut self) {
-        LOCAL_METRICS.with_borrow_mut(|local_metrics| {
-            let table_metrics = local_metrics
-                .entry(self.table_id.table_id)
-                .or_insert_with(|| {
-                    LocalStoreMetrics::new(
-                        self.metrics.as_ref(),
-                        self.table_id.to_string().as_str(),
-                    )
-                });
-            self.local_stats.report(table_metrics);
-            self.local_stats
-                .report_bloom_filter_metrics(&table_metrics.may_exist_filter_metrics);
-        });
-    }
-}
diff --git a/src/storage/src/monitor/monitored_storage_metrics.rs b/src/storage/src/monitor/monitored_storage_metrics.rs
index bbf904d38381..01419586f2ee 100644
--- a/src/storage/src/monitor/monitored_storage_metrics.rs
+++ b/src/storage/src/monitor/monitored_storage_metrics.rs
@@ -18,14 +18,14 @@ use std::sync::{Arc, OnceLock};
 use std::time::{Duration, Instant};
 
 use prometheus::{
-    exponential_buckets, histogram_opts, linear_buckets, register_histogram_vec_with_registry,
-    register_histogram_with_registry, Histogram, Registry,
+    exponential_buckets, histogram_opts, linear_buckets, register_histogram_with_registry,
+    Histogram, Registry,
 };
 use risingwave_common::config::MetricLevel;
 use risingwave_common::metrics::{
     LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedLocalHistogram,
     LabelGuardedLocalIntCounter, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
-    RelabeledGuardedIntGaugeVec, RelabeledHistogramVec,
+    RelabeledGuardedIntGaugeVec,
 };
 use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
 use risingwave_common::{
@@ -55,7 +55,6 @@ pub struct MonitoredStorageMetrics {
 
     // [table_id, op_type]
     pub iter_log_op_type_counts: LabelGuardedIntCounterVec<2>,
-    pub may_exist_duration: RelabeledHistogramVec,
 
     pub sync_duration: Histogram,
     pub sync_size: Histogram,
@@ -239,19 +238,6 @@ impl MonitoredStorageMetrics {
         )
         .unwrap();
 
-        let opts = histogram_opts!(
-            "state_store_may_exist_duration",
-            "Histogram of may exist time that have been issued to state store",
-            buckets,
-        );
-        let may_exist_duration =
-            register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
-        let may_exist_duration = RelabeledHistogramVec::with_metric_level(
-            MetricLevel::Debug,
-            may_exist_duration,
-            metric_level,
-        );
-
         let opts = histogram_opts!(
             "state_store_sync_duration",
             "Histogram of time spent on compacting shared buffer to remote storage",
@@ -277,7 +263,6 @@ impl MonitoredStorageMetrics {
             iter_counts,
             iter_in_progress_counts,
             iter_log_op_type_counts,
-            may_exist_duration,
             sync_duration,
             sync_size,
         }
diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs
index 40b5a10cca3d..2dcb2fb30e7b 100644
--- a/src/storage/src/monitor/monitored_store.rs
+++ b/src/storage/src/monitor/monitored_store.rs
@@ -205,26 +205,6 @@ impl LocalStateStore for MonitoredStateStore {
     type Iter<'a> = impl StateStoreIter + 'a;
     type RevIter<'a> = impl StateStoreIter + 'a;
 
-    async fn may_exist(
-        &self,
-        key_range: TableKeyRange,
-        read_options: ReadOptions,
-    ) -> StorageResult<bool> {
-        let table_id_label = read_options.table_id.to_string();
-        let timer = self
-            .storage_metrics
-            .may_exist_duration
-            .with_label_values(&[table_id_label.as_str()])
-            .start_timer();
-        let res = self
-            .inner
-            .may_exist(key_range, read_options)
-            .verbose_instrument_await("store_may_exist")
-            .await;
-        timer.observe_duration();
-        res
-    }
-
     fn get(
         &self,
         key: TableKey<Bytes>,
diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs
index 35a0e497f7cf..bdd9ce90406f 100644
--- a/src/storage/src/monitor/traced_store.rs
+++ b/src/storage/src/monitor/traced_store.rs
@@ -110,14 +110,6 @@ impl LocalStateStore for TracedStateStore {
     type Iter<'a> = impl StateStoreIter + 'a;
     type RevIter<'a> = impl StateStoreIter + 'a;
 
-    fn may_exist(
-        &self,
-        key_range: TableKeyRange,
-        read_options: ReadOptions,
-    ) -> impl Future<Output = StorageResult<bool>> + Send + '_ {
-        self.inner.may_exist(key_range, read_options)
-    }
-
     fn get(
         &self,
         key: TableKey<Bytes>,
diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs
index 2f2f4abac1b9..7ec0249a3427 100644
--- a/src/storage/src/panic_store.rs
+++ b/src/storage/src/panic_store.rs
@@ -92,15 +92,6 @@ impl LocalStateStore for PanicStateStore {
     type Iter<'a> = PanicStateStoreIter;
     type RevIter<'a> = PanicStateStoreIter;
 
-    #[allow(clippy::unused_async)]
-    async fn may_exist(
-        &self,
-        _key_range: TableKeyRange,
-        _read_options: ReadOptions,
-    ) -> StorageResult<bool> {
-        panic!("should not call may_exist from the state store!");
-    }
-
     #[allow(clippy::unused_async)]
     async fn get(
         &self,
diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs
index 806deae3bfe6..56395d48b624 100644
--- a/src/storage/src/store.rs
+++ b/src/storage/src/store.rs
@@ -415,21 +415,6 @@ pub trait LocalStateStore: StaticSendSync {
     /// the previous write epoch is sealed.
     fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
 
-    /// Check existence of a given `key_range`.
-    /// It is better to provide `prefix_hint` in `read_options`, which will be used
-    /// for checking bloom filter if hummock is used. If `prefix_hint` is not provided,
-    /// the false positive rate can be significantly higher because bloom filter cannot
-    /// be used.
-    ///
-    /// Returns:
-    /// - false: `key_range` is guaranteed to be absent in storage.
-    /// - true: `key_range` may or may not exist in storage.
-    fn may_exist(
-        &self,
-        key_range: TableKeyRange,
-        read_options: ReadOptions,
-    ) -> impl Future<Output = StorageResult<bool>> + Send + '_;
-
     // Updates the vnode bitmap corresponding to the local state store
     // Returns the previous vnode bitmap
     fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap>;
diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs
index c45a7680c5cc..dc1eb34be62b 100644
--- a/src/storage/src/store_impl.rs
+++ b/src/storage/src/store_impl.rs
@@ -408,17 +408,6 @@ pub mod verify {
         type Iter<'a> = impl StateStoreIter + 'a;
         type RevIter<'a> = impl StateStoreIter + 'a;
 
-        // We don't verify `may_exist` across different state stores because
-        // the return value of `may_exist` is implementation specific and may not
-        // be consistent across different state store backends.
-        fn may_exist(
-            &self,
-            key_range: TableKeyRange,
-            read_options: ReadOptions,
-        ) -> impl Future<Output = StorageResult<bool>> + Send + '_ {
-            self.actual.may_exist(key_range, read_options)
-        }
-
         async fn get(
             &self,
             key: TableKey<Bytes>,
@@ -948,12 +937,6 @@ pub mod boxed_state_store {
     pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreIterItem>;
     #[async_trait::async_trait]
     pub trait DynamicDispatchedLocalStateStore: StaticSendSync {
-        async fn may_exist(
-            &self,
-            key_range: TableKeyRange,
-            read_options: ReadOptions,
-        ) -> StorageResult<bool>;
-
         async fn get(
             &self,
             key: TableKey<Bytes>,
@@ -998,14 +981,6 @@
 
     #[async_trait::async_trait]
     impl<S: LocalStateStore> DynamicDispatchedLocalStateStore for S {
-        async fn may_exist(
-            &self,
-            key_range: TableKeyRange,
-            read_options: ReadOptions,
-        ) -> StorageResult<bool> {
-            self.may_exist(key_range, read_options).await
-        }
-
         async fn get(
             &self,
             key: TableKey<Bytes>,
@@ -1078,14 +1053,6 @@ impl LocalStateStore for BoxDynamicDispatchedLocalStateStore {
         type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
         type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
 
-        fn may_exist(
-            &self,
-            key_range: TableKeyRange,
-            read_options: ReadOptions,
-        ) -> impl Future<Output = StorageResult<bool>> + Send + '_ {
-            self.deref().may_exist(key_range, read_options)
-        }
-
         fn get(
             &self,
             key: TableKey<Bytes>,
diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs
index 84267951b081..babc2d401e7e 100644
--- a/src/stream/src/common/table/state_table.rs
+++ b/src/stream/src/common/table/state_table.rs
@@ -39,8 +39,8 @@ use risingwave_common::util::row_serde::OrderedRowSerde;
 use risingwave_common::util::sort_util::OrderType;
 use risingwave_common::util::value_encoding::BasicSerde;
 use risingwave_hummock_sdk::key::{
-    end_bound_of_prefix, prefixed_range_with_vnode, range_of_prefix,
-    start_bound_of_excluded_prefix, TableKey, TableKeyRange,
+    end_bound_of_prefix, prefixed_range_with_vnode, start_bound_of_excluded_prefix, TableKey,
+    TableKeyRange,
 };
 use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection};
 use risingwave_pb::catalog::Table;
@@ -1567,51 +1567,6 @@ where
             .map_err(StreamExecutorError::from)
     }
 
-    /// Returns:
-    /// false: the provided pk prefix is absent in state store.
-    /// true: the provided pk prefix may or may not be present in state store.
-    pub async fn may_exist(&self, pk_prefix: impl Row) -> StreamExecutorResult<bool> {
-        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
-        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
-        let encoded_key_range = range_of_prefix(&encoded_prefix);
-
-        // We assume that all usages of iterating the state table only access a single vnode.
-        // If this assertion fails, then something must be wrong with the operator implementation or
-        // the distribution derivation from the optimizer.
-        let vnode = self.compute_prefix_vnode(&pk_prefix);
-        let table_key_range = prefixed_range_with_vnode(encoded_key_range, vnode);
-
-        // Construct prefix hint for prefix bloom filter.
-        if self.prefix_hint_len != 0 {
-            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
-        }
-        let prefix_hint = {
-            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
-                panic!();
-            } else {
-                let encoded_prefix_len = self
-                    .pk_serde
-                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;
-
-                Some(Bytes::copy_from_slice(
-                    &encoded_prefix[..encoded_prefix_len],
-                ))
-            }
-        };
-
-        let read_options = ReadOptions {
-            prefix_hint,
-            table_id: self.table_id,
-            cache_policy: CachePolicy::Fill(CacheContext::Default),
-            ..Default::default()
-        };
-
-        self.local_store
-            .may_exist(table_key_range, read_options)
-            .await
-            .map_err(Into::into)
-    }
-
     #[cfg(test)]
     pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
         &self.watermark_cache
diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs
index 87c27145ea1c..89944cdfc487 100644
--- a/src/stream/src/common/table/test_state_table.rs
+++ b/src/stream/src/common/table/test_state_table.rs
@@ -24,11 +24,9 @@ use risingwave_common::util::epoch::{test_epoch, EpochPair};
 use risingwave_common::util::sort_util::OrderType;
 use risingwave_common::util::value_encoding::BasicSerde;
 use risingwave_hummock_test::test_utils::prepare_hummock_test_env;
-use risingwave_rpc_client::HummockMetaClient;
 use risingwave_storage::hummock::HummockStorage;
 use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::table::DEFAULT_VNODE;
-use risingwave_storage::StateStore;
 
 use crate::common::table::state_table::{
     ReplicatedStateTable, StateTable, WatermarkCacheStateTable,
@@ -1269,190 +1267,6 @@ async fn test_state_table_write_chunk_value_indices() {
     );
 }
 
-async fn check_may_exist<S>(
-    state_table: &StateTable<S>,
-    existent_prefix: Vec<i32>,
-    non_existent_prefix: Vec<i32>,
-) where
-    S: StateStore,
-{
-    for prefix in existent_prefix {
-        let pk_prefix = OwnedRow::new(vec![Some(prefix.into())]);
-        assert!(state_table.may_exist(&pk_prefix).await.unwrap());
-    }
-    for prefix in non_existent_prefix {
-        let pk_prefix = OwnedRow::new(vec![Some(prefix.into())]);
-        assert!(!state_table.may_exist(&pk_prefix).await.unwrap());
-    }
-}
-
-#[tokio::test]
-async fn test_state_table_may_exist() {
-    const TEST_TABLE_ID: TableId = TableId { table_id: 233 };
-    let test_env = prepare_hummock_test_env().await;
-
-    // let pk_columns = vec![0, 1]; leave a message to indicate pk columns
-    let order_types = vec![OrderType::ascending(), OrderType::descending()];
-
-    let column_ids = [ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)];
-    let column_descs = vec![
-        ColumnDesc::unnamed(column_ids[0], DataType::Int32),
-        ColumnDesc::unnamed(column_ids[1], DataType::Int32),
-        ColumnDesc::unnamed(column_ids[2], DataType::Int32),
-    ];
-    let pk_index = vec![0_usize, 1_usize];
-    let read_prefix_len_hint = 1;
-    let table = gen_prost_table(
-        TEST_TABLE_ID,
-        column_descs,
-        order_types,
-        pk_index,
-        read_prefix_len_hint,
-    );
-
-    test_env.register_table(table.clone()).await;
-    let mut state_table =
-        StateTable::from_table_catalog_inconsistent_op(&table, test_env.storage.clone(), None)
-            .await;
-
-    let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
-    state_table.init_epoch(epoch);
-
-    state_table.insert(OwnedRow::new(vec![
-        Some(1_i32.into()),
-        Some(11_i32.into()),
-        Some(111_i32.into()),
-    ]));
-    state_table.insert(OwnedRow::new(vec![
-        Some(1_i32.into()),
-        Some(22_i32.into()),
-        Some(222_i32.into()),
-    ]));
-
-    state_table.insert(OwnedRow::new(vec![
-        Some(4_i32.into()),
-        Some(44_i32.into()),
-        Some(444_i32.into()),
-    ]));
-
-    state_table.insert(OwnedRow::new(vec![
-        Some(1_i32.into()),
-        Some(55_i32.into()),
-        Some(555_i32.into()),
-    ]));
-
-    // test may_exist with data only in memtable (e1)
-    check_may_exist(&state_table, vec![1, 4], vec![2, 3, 6, 12]).await;
-
-    epoch.inc_for_test();
-    state_table.commit(epoch).await.unwrap();
-    let e1 = epoch.prev;
-
-    // test may_exist with data only in immutable memtable (e1)
-    check_may_exist(&state_table, vec![1, 4], vec![2, 3, 6, 12]).await;
-
-    let e1_res = test_env.storage.seal_and_sync_epoch(e1).await.unwrap();
-
-    // test may_exist with data only in uncommitted ssts (e1)
-    check_may_exist(&state_table, vec![1, 4], vec![2, 3, 6, 12]).await;
-
-    test_env.meta_client.commit_epoch(e1, e1_res).await.unwrap();
-    test_env.storage.try_wait_epoch_for_test(e1).await;
-
-    // test may_exist with data only in committed ssts (e1)
-    check_may_exist(&state_table, vec![1, 4], vec![2, 3, 6, 12]).await;
-
-    state_table.insert(OwnedRow::new(vec![
-        Some(1_i32.into()),
-        Some(33_i32.into()),
-        Some(333_i32.into()),
-    ]));
-    state_table.insert(OwnedRow::new(vec![
-        Some(1_i32.into()),
-        Some(55_i32.into()),
-        Some(5555_i32.into()),
-    ]));
-    state_table.insert(OwnedRow::new(vec![
-        Some(6_i32.into()),
-        Some(66_i32.into()),
-        Some(666_i32.into()),
-    ]));
-
-    // test may_exist with data in memtable (e2), committed ssts (e1)
-    check_may_exist(&state_table, vec![1, 4, 6], vec![2, 3, 12]).await;
-
-    epoch.inc_for_test();
-    state_table.commit(epoch).await.unwrap();
-    let e2 = epoch.prev;
-
-    // test may_exist with data in immutable memtable (e2), committed ssts (e1)
-    check_may_exist(&state_table, vec![1, 4, 6], vec![2, 3, 12]).await;
-
-    state_table.insert(OwnedRow::new(vec![
-        Some(1_i32.into()),
-        Some(44_i32.into()),
-        Some(444_i32.into()),
-    ]));
-    state_table.insert(OwnedRow::new(vec![
-        Some(3_i32.into()),
-        Some(1_i32.into()),
-        Some(111_i32.into()),
-    ]));
-
-    // test may_exist with data in memtable (e3), immutable memtable (e2), committed ssts (e1)
-    check_may_exist(&state_table, vec![1, 3, 4, 6], vec![2, 12]).await;
-
-    let e2_res = test_env.storage.seal_and_sync_epoch(e2).await.unwrap();
-
-    // test may_exist with data in memtable (e3), uncommitted ssts (e2), committed ssts (e1)
-    check_may_exist(&state_table, vec![1, 3, 4, 6], vec![2, 12]).await;
-
-    epoch.inc_for_test();
-    state_table.commit(epoch).await.unwrap();
-    let e3 = epoch.prev;
-
-    // test may_exist with data in immutable memtable (e3), uncommitted ssts (e2), committed
-    // ssts (e1)
-    check_may_exist(&state_table, vec![1, 3, 4, 6], vec![2, 12]).await;
-
-    state_table.insert(OwnedRow::new(vec![
-        Some(1_i32.into()),
-        Some(55_i32.into()),
-        Some(555_i32.into()),
-    ]));
-    state_table.insert(OwnedRow::new(vec![
-        Some(2_i32.into()),
-        Some(1_i32.into()),
-        Some(111_i32.into()),
-    ]));
-
-    // test may_exist with data in memtable (e4), immutable memtable (e3), uncommitted ssts
-    // (e2), committed ssts (e1)
-    check_may_exist(&state_table, vec![1, 3, 4, 6], vec![12]).await;
-
-    test_env.meta_client.commit_epoch(e2, e2_res).await.unwrap();
-    test_env.storage.try_wait_epoch_for_test(e2).await;
-
-    epoch.inc_for_test();
-    state_table.commit(epoch).await.unwrap();
-    let e4 = epoch.prev;
-
-    let e3_res = test_env.storage.seal_and_sync_epoch(e3).await.unwrap();
-    let e4_res = test_env.storage.seal_and_sync_epoch(e4).await.unwrap();
-
-    // test may_exist with data in uncommitted ssts (e3, e4), committed ssts (e1, e2, e3, e4)
-    check_may_exist(&state_table, vec![1, 3, 4, 6], vec![12]).await;
-
-    test_env.meta_client.commit_epoch(e3, e3_res).await.unwrap();
-    test_env.storage.try_wait_epoch_for_test(e3).await;
-
-    test_env.meta_client.commit_epoch(e4, e4_res).await.unwrap();
-    test_env.storage.try_wait_epoch_for_test(e4).await;
-
-    // test may_exist with data in committed ssts (e1, e2, e3, e4)
-    check_may_exist(&state_table, vec![1, 3, 4, 6], vec![12]).await;
-}
-
 // After NULL watermark col values are inserted & deleted, they should not appear in the state table
 // cache. Test for apply_batch.
 #[tokio::test]