diff --git a/proto/hummock.proto b/proto/hummock.proto index dc376cd694b8b..6eb4b7887e0e6 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -185,6 +185,12 @@ message HummockVersionDelta { reserved "gc_object_ids"; map new_table_watermarks = 8; repeated uint32 removed_table_ids = 9; + message ChangeLogDelta { + EpochNewChangeLog new_log = 1; + // only logs in epoch later than truncate_epoch will be preserved + uint64 truncate_epoch = 2; + } + map change_log_delta = 10; } message HummockVersionDeltas { diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 5990fe1e2cbcf..b23ce63f1de7d 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -69,6 +69,7 @@ message BarrierCompleteResponse { repeated GroupedSstableInfo synced_sstables = 4; uint32 worker_id = 5; map table_watermarks = 6; + repeated hummock.SstableInfo old_value_sstables = 7; } // Before starting streaming, the leader node broadcast the actor-host table to needed workers. diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 05dbdddfc3320..dcd1946117b26 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -16,6 +16,7 @@ use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::future::pending; +use std::iter::empty; use std::mem::{replace, take}; use std::sync::Arc; use std::time::Duration; @@ -30,6 +31,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; +use risingwave_hummock_sdk::change_log::build_table_change_log_delta; use risingwave_hummock_sdk::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, }; @@ -1144,11 +1146,12 @@ pub type BarrierManagerRef = GlobalBarrierManagerContext; fn collect_commit_epoch_info( resps: Vec, command_ctx: &CommandContext, - _epochs: &Vec, + epochs: &Vec, ) -> CommitEpochInfo { let mut sst_to_worker: HashMap = HashMap::new(); let mut synced_ssts: Vec = vec![]; let mut table_watermarks = Vec::with_capacity(resps.len()); + let mut old_value_ssts = Vec::with_capacity(resps.len()); for resp in resps { let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| { let sst_info = grouped.sst.expect("field not None"); @@ -1161,6 +1164,7 @@ fn collect_commit_epoch_info( }); synced_ssts.extend(ssts_iter); table_watermarks.push(resp.table_watermarks); + old_value_ssts.extend(resp.old_value_sstables); } let new_table_fragment_info = if let Command::CreateStreamingJob { table_fragments, .. @@ -1179,6 +1183,14 @@ fn collect_commit_epoch_info( None }; + let table_new_change_log = build_table_change_log_delta( + old_value_ssts.into_iter(), + synced_ssts.iter().map(|sst| &sst.sst_info), + epochs, + // TODO: pass log store table id and the corresponding truncate_epoch + empty(), + ); + CommitEpochInfo::new( synced_ssts, merge_multiple_new_table_watermarks( @@ -1199,5 +1211,6 @@ fn collect_commit_epoch_info( ), sst_to_worker, new_table_fragment_info, + table_new_change_log, ) } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 0dc940e006584..92d99d738af7f 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -263,6 +263,7 @@ pub struct CommitEpochInfo { pub new_table_watermarks: HashMap, pub sst_to_context: HashMap, pub new_table_fragment_info: Option, + pub change_log_delta: HashMap, } impl CommitEpochInfo { @@ -271,12 +272,14 @@ impl CommitEpochInfo { new_table_watermarks: HashMap, sst_to_context: HashMap, new_table_fragment_info: Option, + change_log_delta: HashMap, ) -> Self { Self { sstables, new_table_watermarks, sst_to_context, new_table_fragment_info, + change_log_delta, } } @@ -290,6 +293,7 @@ impl CommitEpochInfo { HashMap::new(), sst_to_context, None, + HashMap::new(), ) } } @@ -1624,6 +1628,7 @@ impl HummockManager { new_table_watermarks, sst_to_context, new_table_fragment_info, + change_log_delta, } = commit_info; let mut versioning_guard = write_lock!(self, versioning).await; let _timer = start_measure_real_process_timer!(self); @@ -1659,6 +1664,7 @@ impl HummockManager { ); new_version_delta.max_committed_epoch = epoch; new_version_delta.new_table_watermarks = new_table_watermarks; + new_version_delta.change_log_delta = change_log_delta; let mut table_compaction_group_mapping = old_version.build_compaction_group_info(); @@ -3475,6 +3481,7 @@ fn init_selectors() -> HashMap Result<()> { + let version: HummockVersion = self.hummock_manager.get_current_version().await; let sst_to_worker = sync_result .uncommitted_ssts .iter() .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), self.context_id)) .collect(); let new_table_watermark = sync_result.table_watermarks; - + let table_change_log = build_table_change_log_delta( + sync_result + .old_value_ssts + .into_iter() + .map(|sst| sst.sst_info), + sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info), + &vec![epoch], + version + .levels + .values() + .flat_map(|group| group.member_table_ids.iter().map(|table_id| (*table_id, 0))), + ); self.hummock_manager .commit_epoch( epoch, @@ -175,6 +188,7 @@ impl HummockMetaClient for MockHummockMetaClient { new_table_watermark, sst_to_worker, None, + table_change_log, ), ) .await diff --git a/src/storage/benches/bench_imm_compact.rs b/src/storage/benches/bench_imm_compact.rs index dbc5eb8c2b591..48cc12af6def6 100644 --- a/src/storage/benches/bench_imm_compact.rs +++ b/src/storage/benches/bench_imm_compact.rs @@ -18,8 +18,9 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::TableKey; use risingwave_storage::hummock::compactor::merge_imms_in_memory; -use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; -use risingwave_storage::hummock::value::HummockValue; +use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::{ + SharedBufferBatch, SharedBufferValue, +}; fn gen_interleave_shared_buffer_batch( batch_size: usize, @@ -34,7 +35,7 @@ fn gen_interleave_shared_buffer_batch( TableKey(Bytes::copy_from_slice( format!("test_key_{:08}", j * batch_count + i).as_bytes(), )), - HummockValue::put(Bytes::copy_from_slice("value".as_bytes())), + SharedBufferValue::Insert(Bytes::copy_from_slice("value".as_bytes())), )); } let batch = SharedBufferBatch::for_test(batch_data, epoch, Default::default()); diff --git a/src/storage/benches/bench_merge_iter.rs b/src/storage/benches/bench_merge_iter.rs index c7b42f1894265..91fc5bc2f3058 100644 --- a/src/storage/benches/bench_merge_iter.rs +++ b/src/storage/benches/bench_merge_iter.rs @@ -23,9 +23,8 @@ use risingwave_storage::hummock::iterator::{ Forward, HummockIterator, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator, }; use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::{ - SharedBufferBatch, SharedBufferBatchIterator, + SharedBufferBatch, SharedBufferBatchIterator, SharedBufferValue, }; -use risingwave_storage::hummock::value::HummockValue; fn gen_interleave_shared_buffer_batch_iter( batch_size: usize, @@ -39,7 +38,7 @@ fn gen_interleave_shared_buffer_batch_iter( TableKey(Bytes::copy_from_slice( format!("test_key_{:08}", j * batch_count + i).as_bytes(), )), - HummockValue::put(Bytes::copy_from_slice("value".as_bytes())), + SharedBufferValue::Insert(Bytes::copy_from_slice("value".as_bytes())), )); } let batch = SharedBufferBatch::for_test(batch_data, 2333, Default::default()); @@ -69,7 +68,7 @@ fn gen_interleave_shared_buffer_batch_enum_iter( TableKey(Bytes::copy_from_slice( format!("test_key_{:08}", j * batch_count + i).as_bytes(), )), - HummockValue::put(Bytes::copy_from_slice("value".as_bytes())), + SharedBufferValue::Insert(Bytes::copy_from_slice("value".as_bytes())), )); } let batch = SharedBufferBatch::for_test(batch_data, 2333, Default::default()); diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index cec5ef58034c6..1c68a05cd81a4 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -12,14 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::hummock::{PbEpochNewChangeLog, PbTableChangeLog, SstableInfo}; +use std::collections::HashMap; -#[derive(Debug, Clone, PartialEq)] -pub struct EpochNewChangeLog { - pub new_value: Vec, - pub old_value: Vec, - pub epochs: Vec, -} +use risingwave_common::catalog::TableId; +use risingwave_pb::hummock::hummock_version_delta::ChangeLogDelta; +use risingwave_pb::hummock::{EpochNewChangeLog, SstableInfo, TableChangeLog as PbTableChangeLog}; +use tracing::warn; #[derive(Debug, Clone, PartialEq)] pub struct TableChangeLog(pub Vec); @@ -34,35 +32,70 @@ impl TableChangeLog { }); &self.0[start..end] } + + pub fn truncate(&mut self, truncate_epoch: u64) { + // TODO: may optimize by using VecDeque to maintain the log + self.0 + .retain(|change_log| *change_log.epochs.last().expect("non-empty") > truncate_epoch); + if let Some(first_log) = self.0.first_mut() { + first_log.epochs.retain(|epoch| *epoch > truncate_epoch); + } + } } impl TableChangeLog { pub fn to_protobuf(&self) -> PbTableChangeLog { PbTableChangeLog { - change_logs: self - .0 - .iter() - .map(|epoch_new_log| PbEpochNewChangeLog { - epochs: epoch_new_log.epochs.clone(), - new_value: epoch_new_log.new_value.clone(), - old_value: epoch_new_log.old_value.clone(), - }) - .collect(), + change_logs: self.0.clone(), } } pub fn from_protobuf(val: &PbTableChangeLog) -> Self { - Self( - val.change_logs - .iter() - .map(|epoch_new_log| EpochNewChangeLog { - epochs: epoch_new_log.epochs.clone(), - new_value: epoch_new_log.new_value.clone(), - old_value: epoch_new_log.old_value.clone(), - }) - .collect(), - ) + Self(val.change_logs.clone()) + } +} + +pub fn build_table_change_log_delta<'a>( + old_value_ssts: impl Iterator, + new_value_ssts: impl Iterator, + epochs: &Vec, + log_store_table_ids: impl Iterator, +) -> HashMap { + let mut table_change_log: HashMap<_, _> = log_store_table_ids + .map(|(table_id, truncate_epoch)| { + ( + TableId::new(table_id), + ChangeLogDelta { + truncate_epoch, + new_log: Some(EpochNewChangeLog { + new_value: vec![], + old_value: vec![], + epochs: epochs.clone(), + }), + }, + ) + }) + .collect(); + for sst in old_value_ssts { + for table_id in &sst.table_ids { + match table_change_log.get_mut(&TableId::new(*table_id)) { + Some(log) => { + log.new_log.as_mut().unwrap().old_value.push(sst.clone()); + } + None => { + warn!(table_id, ?sst, "old value sst contains non-log-store table"); + } + } + } } + for sst in new_value_ssts { + for table_id in &sst.table_ids { + if let Some(log) = table_change_log.get_mut(&TableId::new(*table_id)) { + log.new_log.as_mut().unwrap().new_value.push(sst.clone()); + } + } + } + table_change_log } #[cfg(test)] @@ -110,4 +143,69 @@ mod tests { } } } + + #[test] + fn test_truncate() { + let mut table_change_log = TableChangeLog(vec![ + EpochNewChangeLog { + new_value: vec![], + old_value: vec![], + epochs: vec![1], + }, + EpochNewChangeLog { + new_value: vec![], + old_value: vec![], + epochs: vec![2], + }, + EpochNewChangeLog { + new_value: vec![], + old_value: vec![], + epochs: vec![3, 4], + }, + EpochNewChangeLog { + new_value: vec![], + old_value: vec![], + epochs: vec![5], + }, + ]); + + table_change_log.truncate(1); + assert_eq!( + table_change_log, + TableChangeLog(vec![ + EpochNewChangeLog { + new_value: vec![], + old_value: vec![], + epochs: vec![2], + }, + EpochNewChangeLog { + new_value: vec![], + old_value: vec![], + epochs: vec![3, 4], + }, + EpochNewChangeLog { + new_value: vec![], + old_value: vec![], + epochs: vec![5], + }, + ]) + ); + + table_change_log.truncate(3); + assert_eq!( + table_change_log, + TableChangeLog(vec![ + EpochNewChangeLog { + new_value: vec![], + old_value: vec![], + epochs: vec![4], + }, + EpochNewChangeLog { + new_value: vec![], + old_value: vec![], + epochs: vec![5], + }, + ]) + ) + } } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index ded56ede6fe65..c341b36508b3f 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -31,6 +31,7 @@ use risingwave_pb::hummock::{ use tracing::warn; use super::StateTableId; +use crate::change_log::TableChangeLog; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; use crate::prost_key_range::KeyRangeExt; @@ -165,6 +166,15 @@ impl HummockVersion { .iter() .map(|table_info| table_info.get_object_id()) }) + .chain(self.table_change_log.values().flat_map(|change_log| { + change_log.0.iter().flat_map(|epoch_change_log| { + epoch_change_log + .old_value + .iter() + .map(|sst| sst.object_id) + .chain(epoch_change_log.new_value.iter().map(|sst| sst.object_id)) + }) + })) .collect() } @@ -585,9 +595,6 @@ impl HummockVersion { } self.id = version_delta.id; self.max_committed_epoch = version_delta.max_committed_epoch; - for table_id in &version_delta.removed_table_ids { - let _ = self.table_watermarks.remove(table_id); - } let mut modified_table_watermarks: HashMap = HashMap::new(); @@ -616,6 +623,41 @@ impl HummockVersion { .insert(table_id, Arc::new(table_watermarks)); } + for (table_id, change_log_delta) in &version_delta.change_log_delta { + let new_change_log = change_log_delta.new_log.as_ref().unwrap(); + match self.table_change_log.entry(*table_id) { + Entry::Occupied(entry) => { + let change_log = entry.into_mut(); + if let Some(prev_log) = change_log.0.last() { + assert!( + prev_log.epochs.last().expect("non-empty") + < new_change_log.epochs.first().expect("non-empty") + ); + } + change_log.0.push(new_change_log.clone()); + } + Entry::Vacant(entry) => { + entry.insert(TableChangeLog(vec![new_change_log.clone()])); + } + }; + } + + for table_id in &version_delta.removed_table_ids { + let _ = self.table_watermarks.remove(table_id); + let _ = self.table_change_log.remove(table_id); + } + + // If a table has no new change log entry (even an empty one), it means we have stopped maintained + // the change log for the table + self.table_change_log + .retain(|table_id, _| version_delta.change_log_delta.contains_key(table_id)); + + for (table_id, change_log_delta) in &version_delta.change_log_delta { + if let Some(change_log) = self.table_change_log.get_mut(table_id) { + change_log.truncate(change_log_delta.truncate_epoch); + } + } + sst_split_info } @@ -1067,6 +1109,7 @@ pub fn build_version_delta_after_version(version: &HummockVersion) -> HummockVer group_deltas: Default::default(), new_table_watermarks: HashMap::new(), removed_table_ids: vec![], + change_log_delta: HashMap::new(), } } diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 1fbd627e13f02..f60d68b32ebff 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -101,6 +101,8 @@ pub struct SyncResult { pub uncommitted_ssts: Vec, /// The collected table watermarks written by state tables. pub table_watermarks: HashMap, + /// Sstable that holds the uncommitted old value + pub old_value_ssts: Vec, } #[derive(Debug, Clone)] diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 3bf6fa02df463..024da16dcb0e7 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -19,9 +19,11 @@ use std::sync::Arc; use prost::Message; use risingwave_common::catalog::TableId; use risingwave_pb::hummock::group_delta::DeltaType; -use risingwave_pb::hummock::hummock_version::PbLevels; -use risingwave_pb::hummock::hummock_version_delta::GroupDeltas as PbGroupDeltas; -use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta}; +use risingwave_pb::hummock::hummock_version::Levels as PbLevels; +use risingwave_pb::hummock::hummock_version_delta::{ChangeLogDelta, GroupDeltas as PbGroupDeltas}; +use risingwave_pb::hummock::{ + HummockVersionDelta as PbHummockVersionDelta, PbHummockVersion, SstableInfo, +}; use crate::change_log::TableChangeLog; use crate::table_watermark::TableWatermarks; @@ -138,6 +140,7 @@ pub struct HummockVersionDelta { pub trivial_move: bool, pub new_table_watermarks: HashMap, pub removed_table_ids: Vec, + pub change_log_delta: HashMap, } impl Default for HummockVersionDelta { @@ -182,6 +185,19 @@ impl HummockVersionDelta { .iter() .map(|table_id| TableId::new(*table_id)) .collect(), + change_log_delta: delta + .change_log_delta + .iter() + .map(|(table_id, log_delta)| { + ( + TableId::new(*table_id), + ChangeLogDelta { + new_log: log_delta.new_log.clone(), + truncate_epoch: log_delta.truncate_epoch, + }, + ) + }) + .collect(), } } @@ -203,6 +219,11 @@ impl HummockVersionDelta { .iter() .map(|table_id| table_id.table_id) .collect(), + change_log_delta: self + .change_log_delta + .iter() + .map(|(table_id, log_delta)| (table_id.table_id, log_delta.clone())) + .collect(), } } } @@ -213,27 +234,31 @@ impl HummockVersionDelta { /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`, /// but it is possible that the object is moved or split from other compaction groups or levels. pub fn newly_added_object_ids(&self) -> HashSet { - let mut ret = HashSet::new(); - for group_deltas in self.group_deltas.values() { - for group_delta in &group_deltas.group_deltas { - if let Some(delta_type) = &group_delta.delta_type { - match delta_type { - DeltaType::IntraLevel(level_delta) => { - ret.extend( - level_delta - .inserted_table_infos - .iter() - .map(|sst| sst.object_id), - ); - } - DeltaType::GroupConstruct(_) - | DeltaType::GroupDestroy(_) - | DeltaType::GroupMetaChange(_) - | DeltaType::GroupTableChange(_) => {} - } - } - } - } - ret + self.group_deltas + .values() + .flat_map(|group_deltas| { + group_deltas.group_deltas.iter().flat_map(|group_delta| { + group_delta.delta_type.iter().flat_map(|delta_type| { + static EMPTY_VEC: Vec = Vec::new(); + let sst_slice = match delta_type { + DeltaType::IntraLevel(level_delta) => &level_delta.inserted_table_infos, + DeltaType::GroupConstruct(_) + | DeltaType::GroupDestroy(_) + | DeltaType::GroupMetaChange(_) + | DeltaType::GroupTableChange(_) => &EMPTY_VEC, + }; + sst_slice.iter().map(|sst| sst.object_id) + }) + }) + }) + .chain(self.change_log_delta.values().flat_map(|delta| { + let new_log = delta.new_log.as_ref().unwrap(); + new_log + .new_value + .iter() + .map(|sst| sst.object_id) + .chain(new_log.old_value.iter().map(|sst| sst.object_id)) + })) + .collect() } } diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 0da472629751d..fc8fe8fbd0985 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -52,9 +52,8 @@ async fn test_read_version_basic() { { // single imm - let kv_pairs = gen_dummy_batch(1); - let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(kv_pairs); - let size = SharedBufferBatch::measure_batch_size(&sorted_items); + let sorted_items = gen_dummy_batch(1); + let size = SharedBufferBatch::measure_batch_size(&sorted_items, None); let imm = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, @@ -87,9 +86,8 @@ async fn test_read_version_basic() { // several epoch for i in 0..5 { epoch.inc_epoch(); - let kv_pairs = gen_dummy_batch(i + 2); - let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(kv_pairs); - let size = SharedBufferBatch::measure_batch_size(&sorted_items); + let sorted_items = gen_dummy_batch(i + 2); + let size = SharedBufferBatch::measure_batch_size(&sorted_items, None); let imm = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, @@ -178,6 +176,7 @@ async fn test_read_version_basic() { ..Default::default() }), ], + vec![], epoch_id_vec_for_clear, batch_id_vec_for_clear, 1, @@ -275,9 +274,8 @@ async fn test_read_filter_basic() { { // single imm - let kv_pairs = gen_dummy_batch(epoch); - let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(kv_pairs); - let size = SharedBufferBatch::measure_batch_size(&sorted_items); + let sorted_items = gen_dummy_batch(epoch); + let size = SharedBufferBatch::measure_batch_size(&sorted_items, None); let imm = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 40dd23d78d987..a6218f668da64 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -13,34 +13,39 @@ // limitations under the License. use std::ops::Bound; +use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::Arc; use bytes::Bytes; use expect_test::expect; use foyer::memory::CacheContext; use futures::{pin_mut, StreamExt}; +use itertools::Itertools; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_common::hash::table_distribution::TableDistribution; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; -use risingwave_hummock_sdk::key::prefixed_range_with_vnode; +use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange}; use risingwave_hummock_sdk::{ HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult, }; use risingwave_meta::hummock::test_utils::setup_compute_env; -use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_rpc_client::HummockMetaClient; +use risingwave_storage::hummock::iterator::change_log::test_utils::{ + apply_test_log_data, gen_test_data, +}; use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; use risingwave_storage::hummock::test_utils::{count_stream, default_opts_for_test}; use risingwave_storage::hummock::{CachePolicy, HummockStorage}; +use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::storage_value::StorageValue; use risingwave_storage::store::*; +use risingwave_storage::store_impl::verify::VerifyStateStore; use crate::get_notification_client_for_test; use crate::local_state_store_test_utils::LocalStateStoreTestExt; -use crate::test_utils::{ - gen_key_from_str, with_hummock_storage_v2, HummockStateStoreTestTrait, TestIngestBatch, -}; +use crate::test_utils::{gen_key_from_str, with_hummock_storage_v2, TestIngestBatch}; #[tokio::test] async fn test_empty_read_v2() { @@ -80,13 +85,6 @@ async fn test_empty_read_v2() { #[tokio::test] async fn test_basic_v2() { let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - test_basic_inner(hummock_storage, meta_client).await; -} - -async fn test_basic_inner( - hummock_storage: impl HummockStateStoreTestTrait, - meta_client: Arc, -) { let anchor = gen_key_from_str(VirtualNode::ZERO, "aa"); // First batch inserts the anchor and others. @@ -409,14 +407,8 @@ async fn test_basic_inner( #[tokio::test] async fn test_state_store_sync_v2() { - let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - test_state_store_sync_inner(hummock_storage, meta_client).await; -} + let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await; -async fn test_state_store_sync_inner( - hummock_storage: impl HummockStateStoreTestTrait, - _meta_client: Arc, -) { let mut epoch = hummock_storage .get_pinned_version() .max_committed_epoch() @@ -1027,13 +1019,7 @@ async fn test_reload_storage() { #[tokio::test] async fn test_delete_get_v2() { let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - test_delete_get_inner(hummock_storage, meta_client).await; -} -async fn test_delete_get_inner( - hummock_storage: impl HummockStateStoreTestTrait, - meta_client: Arc, -) { let initial_epoch = hummock_storage.get_pinned_version().max_committed_epoch(); let epoch1 = initial_epoch.next_epoch(); let batch1 = vec![ @@ -1103,13 +1089,7 @@ async fn test_delete_get_inner( #[tokio::test] async fn test_multiple_epoch_sync_v2() { let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - test_multiple_epoch_sync_inner(hummock_storage, meta_client).await; -} -async fn test_multiple_epoch_sync_inner( - hummock_storage: impl HummockStateStoreTestTrait, - meta_client: Arc, -) { let initial_epoch = hummock_storage.get_pinned_version().max_committed_epoch(); let epoch1 = initial_epoch.next_epoch(); let batch1 = vec![ @@ -1546,3 +1526,112 @@ async fn test_replicated_local_hummock_storage() { expected.assert_debug_eq(&actual); } } + +#[tokio::test] +async fn test_iter_log() { + let table_id = TableId::new(233); + let (hummock_storage, meta_client) = with_hummock_storage_v2(table_id).await; + let epoch_count = 10; + let key_count = 10000; + + let test_log_data = gen_test_data(epoch_count, key_count, 0.05, 0.2); + let in_memory_state_store = MemoryStateStore::new(); + + let mut in_memory_local = in_memory_state_store + .new_local(NewLocalOptions { + table_id, + op_consistency_level: OpConsistencyLevel::ConsistentOldValue { + check_old_value: CHECK_BYTES_EQUAL.clone(), + is_log_store: true, + }, + table_option: Default::default(), + is_replicated: false, + vnodes: TableDistribution::all_vnodes(), + }) + .await; + + apply_test_log_data(test_log_data.clone(), &mut in_memory_local, 0.0).await; + + let mut hummock_local = hummock_storage + .new_local(NewLocalOptions { + table_id, + op_consistency_level: OpConsistencyLevel::ConsistentOldValue { + check_old_value: CHECK_BYTES_EQUAL.clone(), + is_log_store: true, + }, + table_option: Default::default(), + is_replicated: false, + vnodes: TableDistribution::all_vnodes(), + }) + .await; + // flush for about 10 times per epoch + apply_test_log_data(test_log_data.clone(), &mut hummock_local, 0.001).await; + + for (epoch, _) in &test_log_data { + let res = hummock_storage.seal_and_sync_epoch(*epoch).await.unwrap(); + if *epoch != test_log_data[0].0 { + assert!(!res.old_value_ssts.is_empty()); + } + assert!(!res.uncommitted_ssts.is_empty()); + meta_client.commit_epoch(*epoch, res).await.unwrap(); + } + + hummock_storage + .try_wait_epoch_for_test(test_log_data.last().unwrap().0) + .await; + + let verify_state_store = VerifyStateStore { + actual: hummock_storage, + expected: Some(in_memory_state_store), + _phantom: Default::default(), + }; + + let verify_iter_log = |key_range: TableKeyRange| { + let state_store = &verify_state_store; + let test_log_data = &test_log_data; + async move { + for start_epoch_idx in 0..epoch_count { + for end_epoch_idx in start_epoch_idx..epoch_count { + let min_epoch = test_log_data[start_epoch_idx].0; + let max_epoch = test_log_data[end_epoch_idx].0; + let mut iter = state_store + .iter_log( + (min_epoch, max_epoch), + key_range.clone(), + ReadLogOptions { table_id }, + ) + .await + .unwrap(); + while iter.try_next().await.unwrap().is_some() {} + } + } + } + }; + + verify_iter_log((Unbounded, Unbounded)).await; + + let keys = test_log_data + .iter() + .flat_map(|(_, logs)| logs.iter().map(|(key, _)| key.clone())) + .sorted() + .dedup() + .collect_vec(); + assert_eq!(keys.len(), key_count); + let test_key_count = 5; + let step = key_count / test_key_count; + + for start_key_idx in (0..test_key_count).step_by(step) { + let start_key = keys[start_key_idx].clone(); + let start_bound = Included(start_key); + for end_key_idx in (start_key_idx..test_key_count).step_by(step) { + let end_key = keys[end_key_idx].clone(); + let end_bound = if end_key_idx % 2 == 0 { + Included(end_key) + } else { + Excluded(end_key) + }; + verify_iter_log((start_bound.clone(), end_bound)).await; + } + verify_iter_log((start_bound, Unbounded)).await; + } +} diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 9c25eee366bda..1cd0fbb560699 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -21,7 +21,7 @@ use std::sync::{Arc, LazyLock}; use await_tree::InstrumentAwait; use bytes::Bytes; use foyer::memory::CacheContext; -use futures::future::try_join_all; +use futures::future::{try_join, try_join_all}; use futures::{stream, FutureExt, StreamExt, TryFutureExt}; use itertools::Itertools; use risingwave_common::catalog::TableId; @@ -38,7 +38,7 @@ use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorMana use crate::hummock::compactor::compaction_filter::DummyCompactionFilter; use crate::hummock::compactor::context::{await_tree_key, CompactorContext}; use crate::hummock::compactor::{check_flush_result, CompactOutput, Compactor}; -use crate::hummock::event_handler::uploader::UploadTaskPayload; +use crate::hummock::event_handler::uploader::{UploadTaskOutput, UploadTaskPayload}; use crate::hummock::event_handler::LocalInstanceId; use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator}; use crate::hummock::shared_buffer::shared_buffer_batch::{ @@ -62,9 +62,9 @@ pub async fn compact( payload: UploadTaskPayload, compaction_group_index: Arc>, filter_key_extractor_manager: FilterKeyExtractorManager, -) -> HummockResult> { +) -> HummockResult { let mut grouped_payload: HashMap = HashMap::new(); - for imm in payload { + for imm in &payload { let compaction_group_id = match compaction_group_index.get(&imm.table_id) { // compaction group id is used only as a hint for grouping different data. // If the compaction group id is not found for the table id, we can assign a @@ -79,14 +79,14 @@ pub async fn compact( grouped_payload .entry(compaction_group_id) .or_default() - .push(imm); + .push(imm.clone()); } - let mut futures = vec![]; + let mut new_value_futures = vec![]; for (id, group_payload) in grouped_payload { let id_copy = id; - futures.push( - compact_shared_buffer( + new_value_futures.push( + compact_shared_buffer::( context.clone(), sstable_object_id_manager.clone(), filter_key_extractor_manager.clone(), @@ -104,22 +104,51 @@ pub async fn compact( .instrument_await(format!("shared_buffer_compact_compaction_group {}", id)), ); } - // Note that the output is reordered compared with input `payload`. - let result = try_join_all(futures) - .await? + + let old_value_payload = payload .into_iter() - .flatten() + .filter(|imm| imm.has_old_value()) .collect_vec(); - Ok(result) + + let old_value_future = async { + if old_value_payload.is_empty() { + Ok(vec![]) + } else { + compact_shared_buffer::( + context.clone(), + sstable_object_id_manager, + filter_key_extractor_manager, + old_value_payload, + ) + .await + } + }; + + // Note that the output is reordered compared with input `payload`. + let (grouped_new_value_ssts, old_value_ssts) = + try_join(try_join_all(new_value_futures), old_value_future).await?; + + let new_value_ssts = grouped_new_value_ssts.into_iter().flatten().collect_vec(); + Ok(UploadTaskOutput { + new_value_ssts, + old_value_ssts, + wait_poll_timer: None, + }) } /// For compaction from shared buffer to level 0, this is the only function gets called. -async fn compact_shared_buffer( +/// +/// The `IS_NEW_VALUE` flag means for the given payload, we are doing compaction using its new value or old value. +/// When `IS_NEW_VALUE` is false, we are compacting with old value, and the payload imms should have `old_values` not `None` +async fn compact_shared_buffer( context: CompactorContext, sstable_object_id_manager: SstableObjectIdManagerRef, filter_key_extractor_manager: FilterKeyExtractorManager, mut payload: UploadTaskPayload, ) -> HummockResult> { + if !IS_NEW_VALUE { + assert!(payload.iter().all(|imm| imm.has_old_value())); + } // Local memory compaction looks at all key ranges. let mut existing_table_ids: HashSet = payload @@ -177,7 +206,7 @@ async fn compact_shared_buffer( ); let mut forward_iters = Vec::with_capacity(payload.len()); for imm in &payload { - forward_iters.push(imm.clone().into_forward_iter()); + forward_iters.push(imm.clone().into_directed_iter::()); } let compaction_executor = context.compaction_executor.clone(); let multi_filter_key_extractor = multi_filter_key_extractor.clone(); @@ -309,6 +338,11 @@ pub async fn merge_imms_in_memory( assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted()); let max_imm_id = imms[0].batch_id(); + let has_old_value = imms[0].has_old_value(); + // TODO: make sure that the corner case on switch_op_consistency is handled + // If the imm of a table id contains old value, all other imm of the same table id should have old value + assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value)); + let mut imm_iters = Vec::with_capacity(imms.len()); let key_count = imms.iter().map(|imm| imm.key_count()).sum(); let value_count = imms.iter().map(|imm| imm.value_count()).sum(); @@ -336,6 +370,11 @@ pub async fn merge_imms_in_memory( let mut merged_entries: Vec = Vec::with_capacity(key_count); let mut values: Vec = Vec::with_capacity(value_count); + let mut old_values: Option> = if has_old_value { + Some(Vec::with_capacity(value_count)) + } else { + None + }; merged_entries.push(SharedBufferKeyEntry { key: first_item_key.clone(), @@ -380,6 +419,9 @@ pub async fn merge_imms_in_memory( .iter() .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())), ); + if let Some(old_values) = &mut old_values { + old_values.extend(key_entry.old_values.expect("should exist").iter().cloned()) + } mi.advance_peek_to_next_key(); // Since there is no blocking point in this method, but it is cpu intensive, we call this method // to do cooperative scheduling @@ -391,6 +433,7 @@ pub async fn merge_imms_in_memory( epochs, merged_entries, values, + old_values, merged_size, max_imm_id, memory_tracker, @@ -564,7 +607,7 @@ mod tests { use risingwave_hummock_sdk::key::{prefix_slice_with_vnode, TableKey}; use crate::hummock::compactor::shared_buffer_compact::generate_splits; - use crate::hummock::value::HummockValue; + use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue; use crate::mem_table::ImmutableMemtable; use crate::opts::StorageOpts; @@ -582,7 +625,7 @@ mod tests { 0, vec![( generate_key("dddd"), - HummockValue::put(Bytes::from_static(b"v3")), + SharedBufferValue::Insert(Bytes::from_static(b"v3")), )], 1024 * 1024, TableId::new(1), @@ -592,7 +635,7 @@ mod tests { 0, vec![( generate_key("abb"), - HummockValue::put(Bytes::from_static(b"v3")), + SharedBufferValue::Insert(Bytes::from_static(b"v3")), )], (1024 + 256) * 1024, TableId::new(1), @@ -603,7 +646,7 @@ mod tests { 0, vec![( generate_key("abc"), - HummockValue::put(Bytes::from_static(b"v2")), + SharedBufferValue::Insert(Bytes::from_static(b"v2")), )], (1024 + 512) * 1024, TableId::new(1), @@ -613,7 +656,7 @@ mod tests { 0, vec![( generate_key("aaa"), - HummockValue::put(Bytes::from_static(b"v3")), + SharedBufferValue::Insert(Bytes::from_static(b"v3")), )], (1024 + 512) * 1024, TableId::new(1), @@ -624,7 +667,7 @@ mod tests { 0, vec![( generate_key("aaa"), - HummockValue::put(Bytes::from_static(b"v3")), + SharedBufferValue::Insert(Bytes::from_static(b"v3")), )], (1024 + 256) * 1024, TableId::new(2), diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 0b96aa19cb837..682586f789955 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -28,7 +28,7 @@ use prometheus::core::{AtomicU64, GenericGauge}; use prometheus::{Histogram, IntGauge}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; -use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo, SyncResult}; +use risingwave_hummock_sdk::{HummockEpoch, SyncResult}; use thiserror_ext::AsReport; use tokio::spawn; use tokio::sync::mpsc::error::SendError; @@ -199,7 +199,7 @@ async fn flush_imms( compactor_context: CompactorContext, filter_key_extractor_manager: FilterKeyExtractorManager, sstable_object_id_manager: Arc, -) -> HummockResult> { +) -> HummockResult { for epoch in &task_info.epochs { let _ = sstable_object_id_manager .add_watermark_object_id(Some(*epoch)) @@ -257,7 +257,7 @@ impl HummockEventHandler { spawn({ let future = async move { let _timer = upload_task_latency.start_timer(); - let ssts = flush_imms( + let mut output = flush_imms( payload, task_info, upload_compactor_context.clone(), @@ -265,10 +265,14 @@ impl HummockEventHandler { sstable_object_id_manager.clone(), ) .await?; - Ok(UploadTaskOutput { - ssts, - wait_poll_timer: Some(wait_poll_latency.start_timer()), - }) + assert!( + output + .wait_poll_timer + .replace(wait_poll_latency.start_timer()) + .is_none(), + "should not set timer before" + ); + Ok(output) }; if let Some(tree_root) = tree_root { tree_root.instrument(future).left_future() @@ -988,6 +992,13 @@ fn to_sync_result(result: &HummockResult) -> HummockResult Err(HummockError::other(format!( @@ -1024,10 +1035,11 @@ mod tests { use crate::hummock::event_handler::{HummockEvent, HummockEventHandler, HummockVersionUpdate}; use crate::hummock::iterator::test_utils::mock_sstable_store; use crate::hummock::local_version::pinned_version::PinnedVersion; - use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; + use crate::hummock::shared_buffer::shared_buffer_batch::{ + SharedBufferBatch, SharedBufferValue, + }; use crate::hummock::store::version::{StagingData, VersionUpdate}; use crate::hummock::test_utils::default_opts_for_test; - use crate::hummock::value::HummockValue; use crate::hummock::HummockError; use crate::monitor::HummockStateStoreMetrics; @@ -1100,7 +1112,8 @@ mod tests { SharedBufferBatch::build_shared_buffer_batch( epoch, spill_offset, - vec![(TableKey(Bytes::from("key")), HummockValue::Delete)], + vec![(TableKey(Bytes::from("key")), SharedBufferValue::Delete)], + None, 10, table_id, instance_id, diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 30be483a72f61..81f6bda0219c0 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -56,8 +56,10 @@ use crate::opts::StorageOpts; pub type UploadTaskPayload = Vec; +#[derive(Debug)] pub struct UploadTaskOutput { - pub ssts: Vec, + pub new_value_ssts: Vec, + pub old_value_ssts: Vec, pub wait_poll_timer: Option, } pub type SpawnUploadTask = Arc< @@ -296,7 +298,8 @@ impl UploadingTask { .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed")) .map(|output| { StagingSstableInfo::new( - output.ssts, + output.new_value_ssts, + output.old_value_ssts, self.task_info.epochs.clone(), self.task_info.imm_ids.clone(), self.task_info.task_size, @@ -478,16 +481,11 @@ struct SealedData { impl SealedData { fn clear(&mut self) { - self.epochs.clear(); - self.spilled_data.clear(); - self.imms_by_table_shard.clear(); - self.merged_imms.clear(); self.merging_tasks - .iter() + .drain(..) .for_each(|task| task.join_handle.abort()); - self.merging_tasks.clear(); - self.table_watermarks.clear(); + *self = Self::default(); } /// Add the data of a newly sealed epoch. @@ -842,11 +840,8 @@ impl HummockUploader { epoch, self.max_sealed_epoch ); - self.unsealed_data - .entry(epoch) - .or_default() - .imms - .push_front(imm); + let unsealed_data = self.unsealed_data.entry(epoch).or_default(); + unsealed_data.imms.push_front(imm); } pub(crate) fn add_table_watermarks( @@ -1265,13 +1260,13 @@ mod tests { iterator_test_table_key_of, transform_shared_buffer, }; use crate::hummock::local_version::pinned_version::PinnedVersion; - use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; - use crate::hummock::value::HummockValue; + use crate::hummock::shared_buffer::shared_buffer_batch::{ + SharedBufferBatch, SharedBufferValue, + }; use crate::hummock::{HummockError, HummockResult, MemoryLimiter}; use crate::mem_table::{ImmId, ImmutableMemtable}; use crate::monitor::HummockStateStoreMetrics; use crate::opts::StorageOpts; - use crate::storage_value::StorageValue; const INITIAL_EPOCH: HummockEpoch = test_epoch(5); const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; @@ -1304,11 +1299,11 @@ mod tests { epoch: HummockEpoch, limiter: Option<&MemoryLimiter>, ) -> ImmutableMemtable { - let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(vec![( + let sorted_items = vec![( TableKey(Bytes::from(dummy_table_key())), - StorageValue::new_delete(), - )]); - let size = SharedBufferBatch::measure_batch_size(&sorted_items); + SharedBufferValue::Delete, + )]; + let size = SharedBufferBatch::measure_batch_size(&sorted_items, None); let tracker = match limiter { Some(limiter) => Some(limiter.require_memory(size as u64).await), None => None, @@ -1317,6 +1312,7 @@ mod tests { epoch, 0, sorted_items, + None, size, TEST_TABLE_ID, LocalInstanceId::default(), @@ -1385,8 +1381,12 @@ mod tests { ) } - fn dummy_success_upload_output() -> Vec { - gen_sstable_info(INITIAL_EPOCH, INITIAL_EPOCH) + fn dummy_success_upload_output() -> UploadTaskOutput { + UploadTaskOutput { + new_value_ssts: gen_sstable_info(INITIAL_EPOCH, INITIAL_EPOCH), + old_value_ssts: vec![], + wait_poll_timer: None, + } } #[allow(clippy::unused_async)] @@ -1394,10 +1394,7 @@ mod tests { _: UploadTaskPayload, _: UploadTaskInfo, ) -> HummockResult { - Ok(UploadTaskOutput { - ssts: dummy_success_upload_output(), - wait_poll_timer: None, - }) + Ok(dummy_success_upload_output()) } #[allow(clippy::unused_async)] @@ -1420,7 +1417,10 @@ mod tests { assert_eq!(vec![imm_id], task.task_info.imm_ids); assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs); let output = task.await.unwrap(); - assert_eq!(output.sstable_infos(), &dummy_success_upload_output()); + assert_eq!( + output.sstable_infos(), + &dummy_success_upload_output().new_value_ssts + ); assert_eq!(imm_size, output.imm_size()); assert_eq!(&vec![imm_id], output.imm_ids()); assert_eq!(&vec![INITIAL_EPOCH], output.epochs()); @@ -1435,7 +1435,10 @@ mod tests { let uploader_context = test_uploader_context(dummy_success_upload_future); let mut task = UploadingTask::new(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context); let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap(); - assert_eq!(output.sstable_infos(), &dummy_success_upload_output()); + assert_eq!( + output.sstable_infos(), + &dummy_success_upload_output().new_value_ssts + ); let uploader_context = test_uploader_context(dummy_fail_upload_future); let mut task = UploadingTask::new(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context); @@ -1463,7 +1466,10 @@ mod tests { let mut task = UploadingTask::new(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context); let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await; assert_eq!(fail_num + 1, run_count_clone.load(SeqCst)); - assert_eq!(output.sstable_infos(), &dummy_success_upload_output()); + assert_eq!( + output.sstable_infos(), + &dummy_success_upload_output().new_value_ssts + ); } #[tokio::test] @@ -1509,7 +1515,10 @@ mod tests { let staging_sst = ssts.first().unwrap(); assert_eq!(&vec![epoch1], staging_sst.epochs()); assert_eq!(&vec![imm.batch_id()], staging_sst.imm_ids()); - assert_eq!(&dummy_success_upload_output(), staging_sst.sstable_infos()); + assert_eq!( + &dummy_success_upload_output().new_value_ssts, + staging_sst.sstable_infos() + ); } _ => unreachable!(), }; @@ -1520,7 +1529,10 @@ mod tests { let staging_sst = ssts.first().unwrap(); assert_eq!(&vec![epoch1], staging_sst.epochs()); assert_eq!(&vec![imm.batch_id()], staging_sst.imm_ids()); - assert_eq!(&dummy_success_upload_output(), staging_sst.sstable_infos()); + assert_eq!( + &dummy_success_upload_output().new_value_ssts, + staging_sst.sstable_infos() + ); let new_pinned_version = uploader .context @@ -1666,18 +1678,18 @@ mod tests { #[tokio::test] async fn test_drop_success_merging_task() { let table_id = TableId { table_id: 1004 }; - let shared_buffer_items1: Vec<(Vec, HummockValue)> = vec![ + let shared_buffer_items1: Vec<(Vec, SharedBufferValue)> = vec![ ( iterator_test_table_key_of(1), - HummockValue::put(Bytes::from("value1")), + SharedBufferValue::Insert(Bytes::from("value1")), ), ( iterator_test_table_key_of(2), - HummockValue::put(Bytes::from("value2")), + SharedBufferValue::Insert(Bytes::from("value2")), ), ( iterator_test_table_key_of(3), - HummockValue::put(Bytes::from("value3")), + SharedBufferValue::Insert(Bytes::from("value3")), ), ]; let epoch = test_epoch(1); @@ -1686,18 +1698,18 @@ mod tests { epoch, table_id, ); - let shared_buffer_items2: Vec<(Vec, HummockValue)> = vec![ + let shared_buffer_items2: Vec<(Vec, SharedBufferValue)> = vec![ ( iterator_test_table_key_of(1), - HummockValue::put(Bytes::from("value12")), + SharedBufferValue::Insert(Bytes::from("value12")), ), ( iterator_test_table_key_of(2), - HummockValue::put(Bytes::from("value22")), + SharedBufferValue::Insert(Bytes::from("value22")), ), ( iterator_test_table_key_of(3), - HummockValue::put(Bytes::from("value32")), + SharedBufferValue::Insert(Bytes::from("value32")), ), ]; let epoch = test_epoch(2); @@ -1707,18 +1719,18 @@ mod tests { table_id, ); - let shared_buffer_items3: Vec<(Vec, HummockValue)> = vec![ + let shared_buffer_items3: Vec<(Vec, SharedBufferValue)> = vec![ ( iterator_test_table_key_of(1), - HummockValue::put(Bytes::from("value13")), + SharedBufferValue::Insert(Bytes::from("value13")), ), ( iterator_test_table_key_of(2), - HummockValue::put(Bytes::from("value23")), + SharedBufferValue::Insert(Bytes::from("value23")), ), ( iterator_test_table_key_of(3), - HummockValue::put(Bytes::from("value33")), + SharedBufferValue::Insert(Bytes::from("value33")), ), ]; let epoch = test_epoch(3); @@ -1860,7 +1872,8 @@ mod tests { start_tx.send(task_info).unwrap(); finish_rx.await.unwrap(); Ok(UploadTaskOutput { - ssts, + new_value_ssts: ssts, + old_value_ssts: vec![], wait_poll_timer: None, }) }) diff --git a/src/storage/src/hummock/iterator/change_log.rs b/src/storage/src/hummock/iterator/change_log.rs index 8d6187b8cd474..f2b39c524cb18 100644 --- a/src/storage/src/hummock/iterator/change_log.rs +++ b/src/storage/src/hummock/iterator/change_log.rs @@ -387,6 +387,123 @@ impl StateStoreIter for ChangeLogIterator { } } +#[cfg(any(test, feature = "test"))] +pub mod test_utils { + use std::collections::HashMap; + + use bytes::Bytes; + use rand::{thread_rng, Rng, RngCore}; + use risingwave_common::util::epoch::{test_epoch, EpochPair, MAX_EPOCH}; + use risingwave_hummock_sdk::key::TableKey; + + use crate::hummock::iterator::test_utils::iterator_test_table_key_of; + use crate::mem_table::KeyOp; + use crate::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions}; + + pub type TestLogDataType = Vec<(u64, Vec<(TableKey, KeyOp)>)>; + + pub fn gen_test_data( + epoch_count: usize, + key_count: usize, + skip_ratio: f64, + delete_ratio: f64, + ) -> TestLogDataType { + let mut store: HashMap, Bytes> = HashMap::new(); + let mut rng = thread_rng(); + let mut logs = Vec::new(); + for epoch_idx in 1..=(epoch_count - 1) { + let mut epoch_logs = Vec::new(); + let epoch = test_epoch(epoch_idx as _); + for key_idx in 0..key_count { + if rng.gen_bool(skip_ratio) { + continue; + } + let key = TableKey(Bytes::from(iterator_test_table_key_of(key_idx))); + if rng.gen_bool(delete_ratio) { + if let Some(prev_value) = store.remove(&key) { + epoch_logs.push((key, KeyOp::Delete(prev_value))); + } + } else { + let value = Bytes::copy_from_slice(rng.next_u64().to_string().as_bytes()); + let prev_value = store.get(&key); + if let Some(prev_value) = prev_value { + epoch_logs.push(( + key.clone(), + KeyOp::Update((prev_value.clone(), value.clone())), + )); + } else { + epoch_logs.push((key.clone(), KeyOp::Insert(value.clone()))); + } + store.insert(key, value); + } + } + logs.push((epoch, epoch_logs)); + } + // at the end add an epoch with only delete + { + let mut epoch_logs = Vec::new(); + let epoch = test_epoch(epoch_count as _); + for (key, value) in store { + epoch_logs.push((key, KeyOp::Delete(value))); + } + logs.push((epoch, epoch_logs)); + } + logs + } + + pub async fn apply_test_log_data( + log_data: TestLogDataType, + state_store: &mut impl LocalStateStore, + try_flush_ratio: f64, + ) { + let mut rng = thread_rng(); + let first_epoch = log_data[0].0; + for (epoch, epoch_logs) in log_data { + if epoch == first_epoch { + state_store + .init(InitOptions { + epoch: EpochPair::new_test_epoch(epoch), + }) + .await + .unwrap(); + } else { + state_store.flush().await.unwrap(); + state_store.seal_current_epoch( + epoch, + SealCurrentEpochOptions { + table_watermarks: None, + switch_op_consistency_level: None, + }, + ); + } + for (key, op) in epoch_logs { + match op { + KeyOp::Insert(value) => { + state_store.insert(key, value, None).unwrap(); + } + KeyOp::Delete(old_value) => { + state_store.delete(key, old_value).unwrap(); + } + KeyOp::Update((old_value, value)) => { + state_store.insert(key, value, Some(old_value)).unwrap(); + } + } + if rng.gen_bool(try_flush_ratio) { + state_store.try_flush().await.unwrap(); + } + } + } + state_store.flush().await.unwrap(); + state_store.seal_current_epoch( + MAX_EPOCH, + SealCurrentEpochOptions { + table_watermarks: None, + switch_op_consistency_level: None, + }, + ); + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -394,13 +511,15 @@ mod tests { use bytes::Bytes; use itertools::Itertools; - use rand::{thread_rng, Rng, RngCore}; use risingwave_common::catalog::TableId; use risingwave_common::hash::table_distribution::TableDistribution; - use risingwave_common::util::epoch::{test_epoch, EpochPair}; + use risingwave_common::util::epoch::test_epoch; use risingwave_hummock_sdk::key::{TableKey, UserKey}; use risingwave_hummock_sdk::EpochWithGap; + use crate::hummock::iterator::change_log::test_utils::{ + apply_test_log_data, gen_test_data, TestLogDataType, + }; use crate::hummock::iterator::change_log::ChangeLogIteratorInner; use crate::hummock::iterator::test_utils::{ iterator_test_table_key_of, iterator_test_value_of, @@ -409,9 +528,8 @@ mod tests { use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableStore}; use crate::memory::MemoryStateStore; use crate::store::{ - ChangeLogValue, InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, - ReadLogOptions, ReadOptions, SealCurrentEpochOptions, StateStoreIter, StateStoreRead, - CHECK_BYTES_EQUAL, + ChangeLogValue, NewLocalOptions, OpConsistencyLevel, ReadLogOptions, StateStoreIter, + StateStoreRead, CHECK_BYTES_EQUAL, }; use crate::StateStore; @@ -533,131 +651,66 @@ mod tests { assert!(!iter.is_valid()); } - async fn gen_test_data( - table_id: TableId, - epoch_count: usize, - key_count: usize, - delete_ratio: f64, - ) -> (Vec<(u64, MemTableStore, MemTableStore)>, MemoryStateStore) { - let state_store = MemoryStateStore::new(); - let mut rng = thread_rng(); - let mut local = state_store - .new_local(NewLocalOptions { - table_id, - op_consistency_level: OpConsistencyLevel::ConsistentOldValue( - CHECK_BYTES_EQUAL.clone(), - ), - table_option: Default::default(), - is_replicated: false, - vnodes: TableDistribution::all_vnodes(), - }) - .await; + fn gen_test_mem_table_store( + test_log_data: TestLogDataType, + ) -> Vec<(u64, MemTableStore, MemTableStore)> { let mut logs = Vec::new(); - for epoch_idx in 1..=epoch_count { - let epoch = test_epoch(epoch_idx as _); + for (epoch, epoch_logs) in test_log_data { let mut new_values = MemTableStore::new(); let mut old_values = MemTableStore::new(); - if epoch_idx == 1 { - local - .init(InitOptions { - epoch: EpochPair::new_test_epoch(epoch), - }) - .await - .unwrap(); - } else { - local.flush().await.unwrap(); - local.seal_current_epoch( - epoch, - SealCurrentEpochOptions { - table_watermarks: None, - switch_op_consistency_level: None, - }, - ); - } - for key_idx in 0..key_count { - let key = TableKey(Bytes::from(iterator_test_table_key_of(key_idx))); - if rng.gen_bool(delete_ratio) { - if let Some(prev_value) = local - .get( - key.clone(), - ReadOptions { - prefix_hint: None, - ignore_range_tombstone: false, - prefetch_options: Default::default(), - cache_policy: Default::default(), - retention_seconds: None, - table_id, - read_version_from_backup: false, - }, - ) - .await - .unwrap() - { - new_values.insert(key.clone(), KeyOp::Delete(Bytes::new())); - old_values.insert(key.clone(), KeyOp::Insert(prev_value.clone())); - local.delete(key, prev_value).unwrap(); - } - } else { - let value = Bytes::copy_from_slice(rng.next_u64().to_string().as_bytes()); - new_values.insert(key.clone(), KeyOp::Insert(value.clone())); - let prev_value = local - .get( - key.clone(), - ReadOptions { - prefix_hint: None, - ignore_range_tombstone: false, - prefetch_options: Default::default(), - cache_policy: Default::default(), - retention_seconds: None, - table_id, - read_version_from_backup: false, - }, - ) - .await - .unwrap(); - if let Some(prev_value) = prev_value.clone() { - old_values.insert(key.clone(), KeyOp::Insert(prev_value)); - } - local.insert(key, value, prev_value).unwrap(); + for (key, op) in epoch_logs { + new_values.insert(key.clone(), op.clone()); + if let KeyOp::Delete(old_value) | KeyOp::Update((old_value, _)) = op { + old_values.insert(key, KeyOp::Insert(old_value)); } } logs.push((epoch, new_values, old_values)); } - local.flush().await.unwrap(); - local.seal_current_epoch( - test_epoch((epoch_count + 1) as _), - SealCurrentEpochOptions { - table_watermarks: None, - switch_op_consistency_level: None, - }, - ); - (logs, state_store) + logs } #[tokio::test] async fn test_random_data() { let table_id = TableId::new(233); let epoch_count = 10; - let (logs, state_store) = gen_test_data(table_id, epoch_count, 10000, 0.2).await; + let state_store = MemoryStateStore::new(); + let mut local = state_store + .new_local(NewLocalOptions { + table_id, + op_consistency_level: OpConsistencyLevel::ConsistentOldValue { + check_old_value: CHECK_BYTES_EQUAL.clone(), + is_log_store: true, + }, + table_option: Default::default(), + is_replicated: false, + vnodes: TableDistribution::all_vnodes(), + }) + .await; + let logs = gen_test_data(epoch_count, 10000, 0.05, 0.2); assert_eq!(logs.len(), epoch_count); + apply_test_log_data(logs.clone(), &mut local, 0.0).await; + let mem_table_logs = gen_test_mem_table_store(logs.clone()); + assert_eq!(mem_table_logs.len(), epoch_count); for start_epoch_idx in 0..epoch_count { - for end_epoch_idx in start_epoch_idx + 1..epoch_count { - let new_value_iter = - MergeIterator::new(logs.iter().map(|(epoch, new_value_memtable, _)| { + for end_epoch_idx in start_epoch_idx..epoch_count { + let new_value_iter = MergeIterator::new(mem_table_logs.iter().map( + |(epoch, new_value_memtable, _)| { MemTableHummockIterator::new( new_value_memtable, EpochWithGap::new_from_epoch(*epoch), table_id, ) - })); - let old_value_iter = - MergeIterator::new(logs.iter().map(|(epoch, _, old_value_memtable)| { + }, + )); + let old_value_iter = MergeIterator::new(mem_table_logs.iter().map( + |(epoch, _, old_value_memtable)| { MemTableHummockIterator::new( old_value_memtable, EpochWithGap::new_from_epoch(*epoch), table_id, ) - })); + }, + )); let epoch_range = (logs[start_epoch_idx].0, logs[end_epoch_idx].0); let mut change_log_iter = ChangeLogIteratorInner::new( epoch_range, diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index 9cae83be3f895..0128eecadfd61 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -41,13 +41,13 @@ use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::iterator::HummockIteratorUnion::{First, Fourth, Second, Third}; -mod change_log; -pub use change_log::*; +pub mod change_log; mod concat_delete_range_iterator; mod delete_range_iterator; mod skip_watermark; #[cfg(any(test, feature = "test"))] pub mod test_utils; + pub use delete_range_iterator::{ DeleteRangeIterator, ForwardMergeRangeIterator, RangeIteratorTyped, }; diff --git a/src/storage/src/hummock/iterator/skip_watermark.rs b/src/storage/src/hummock/iterator/skip_watermark.rs index b9c6609bd524b..0a9e03717b79a 100644 --- a/src/storage/src/hummock/iterator/skip_watermark.rs +++ b/src/storage/src/hummock/iterator/skip_watermark.rs @@ -303,8 +303,9 @@ mod tests { use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::iterator::{HummockIterator, SkipWatermarkIterator}; - use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; - use crate::hummock::value::HummockValue; + use crate::hummock::shared_buffer::shared_buffer_batch::{ + SharedBufferBatch, SharedBufferValue, + }; const EPOCH: u64 = test_epoch(1); const TABLE_ID: TableId = TableId::new(233); @@ -349,7 +350,7 @@ mod tests { } fn build_batch( - pairs: impl Iterator, HummockValue)>, + pairs: impl Iterator, SharedBufferValue)>, ) -> Option { let pairs: Vec<_> = pairs.collect(); if pairs.is_empty() { @@ -360,9 +361,9 @@ mod tests { } fn filter_with_watermarks( - iter: impl Iterator, HummockValue)>, + iter: impl Iterator, SharedBufferValue)>, table_watermarks: ReadTableWatermark, - ) -> impl Iterator, HummockValue)> { + ) -> impl Iterator, SharedBufferValue)> { iter.filter(move |(key, _)| { if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) { !table_watermarks @@ -424,10 +425,10 @@ mod tests { assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await; } - fn gen_key_value(vnode: usize, index: usize) -> (TableKey, HummockValue) { + fn gen_key_value(vnode: usize, index: usize) -> (TableKey, SharedBufferValue) { ( gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)), - HummockValue::Put(Bytes::copy_from_slice( + SharedBufferValue::Insert(Bytes::copy_from_slice( format!("{}-value-{}", vnode, index).as_bytes(), )), ) diff --git a/src/storage/src/hummock/iterator/test_utils.rs b/src/storage/src/hummock/iterator/test_utils.rs index 0103280570f5e..a74a8929edf1f 100644 --- a/src/storage/src/hummock/iterator/test_utils.rs +++ b/src/storage/src/hummock/iterator/test_utils.rs @@ -28,6 +28,7 @@ use risingwave_object_store::object::{ }; use risingwave_pb::hummock::SstableInfo; +use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::sstable_store::SstableStore; pub use crate::hummock::test_utils::default_builder_opt_for_test; @@ -132,8 +133,8 @@ pub fn iterator_test_value_of(idx: usize) -> Vec { } pub fn transform_shared_buffer( - batches: Vec<(Vec, HummockValue)>, -) -> Vec<(TableKey, HummockValue)> { + batches: Vec<(Vec, SharedBufferValue)>, +) -> Vec<(TableKey, SharedBufferValue)> { batches .into_iter() .map(|(k, v)| (TableKey(k.into()), v)) diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 5857c5d2f8bd2..7749a992a699a 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -16,6 +16,7 @@ use std::cmp::Ordering; use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; +use std::mem::size_of_val; use std::ops::Bound::Included; use std::ops::{Bound, RangeBounds}; use std::sync::atomic::AtomicU64; @@ -37,18 +38,56 @@ use crate::hummock::utils::{range_overlap, MemoryTracker}; use crate::hummock::value::HummockValue; use crate::hummock::{HummockEpoch, HummockResult, MonotonicDeleteEvent}; use crate::mem_table::ImmId; -use crate::storage_value::StorageValue; use crate::store::ReadOptions; +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum SharedBufferValue { + Insert(T), + Update(T), + Delete, +} + +impl SharedBufferValue { + fn to_ref(&self) -> SharedBufferValue<&T> { + match self { + SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val), + SharedBufferValue::Update(val) => SharedBufferValue::Update(val), + SharedBufferValue::Delete => SharedBufferValue::Delete, + } + } +} + +impl From> for HummockValue { + fn from(val: SharedBufferValue) -> HummockValue { + match val { + SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => { + HummockValue::Put(val) + } + SharedBufferValue::Delete => HummockValue::Delete, + } + } +} + +impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> { + pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> { + match self { + SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()), + SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()), + SharedBufferValue::Delete => SharedBufferValue::Delete, + } + } +} + /// The key is `table_key`, which does not contain table id or epoch. -pub(crate) type SharedBufferItem = (TableKey, HummockValue); +pub(crate) type SharedBufferItem = (TableKey, SharedBufferValue); pub type SharedBufferBatchId = u64; -pub(crate) type VersionedSharedBufferValue = (EpochWithGap, HummockValue); +pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue); pub(crate) struct SharedBufferVersionedEntryRef<'a> { pub(crate) key: &'a TableKey, pub(crate) new_values: &'a [VersionedSharedBufferValue], + pub(crate) old_values: Option<&'a [Bytes]>, } #[derive(PartialEq, Debug)] @@ -63,10 +102,10 @@ pub(crate) struct SharedBufferKeyEntry { impl SharedBufferKeyEntry { /// Return an exclusive offset of the values of key of index `i` - fn value_end_offset<'a>( + fn value_end_offset<'a, T>( i: usize, entries: &'a [SharedBufferKeyEntry], - values: &'a [VersionedSharedBufferValue], + values: &'a [T], ) -> usize { entries .get(i + 1) @@ -74,11 +113,7 @@ impl SharedBufferKeyEntry { .unwrap_or(values.len()) } - fn values<'a>( - i: usize, - entries: &'a [SharedBufferKeyEntry], - values: &'a [VersionedSharedBufferValue], - ) -> &'a [VersionedSharedBufferValue] { + fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] { &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)] } } @@ -87,6 +122,9 @@ impl SharedBufferKeyEntry { pub(crate) struct SharedBufferBatchInner { entries: Vec, new_values: Vec, + /// Store the old values. If some, the length should be the same as `new_values`. It contains empty `Bytes` when the + /// corresponding `new_value` is `Insert`, and contains the old values of `Update` and `Delete`. + old_values: Option>, /// The epochs of the data in batch, sorted in ascending order (old to new) epochs: Vec, /// Total size of all key-value items (excluding the `epoch` of value versions) @@ -102,11 +140,15 @@ impl SharedBufferBatchInner { epoch: HummockEpoch, spill_offset: u16, payload: Vec, + old_values: Option>, size: usize, _tracker: Option, ) -> Self { assert!(!payload.is_empty()); debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key)); + if let Some(old_values) = &old_values { + assert_eq!(old_values.len(), payload.len()); + } let epoch_with_gap = EpochWithGap::new(epoch, spill_offset); let mut entries = Vec::with_capacity(payload.len()); @@ -123,6 +165,7 @@ impl SharedBufferBatchInner { SharedBufferBatchInner { entries, new_values, + old_values, epochs: vec![epoch], size, _tracker, @@ -139,6 +182,7 @@ impl SharedBufferBatchInner { epochs: Vec, entries: Vec, new_values: Vec, + old_values: Option>, size: usize, imm_id: ImmId, tracker: Option, @@ -161,6 +205,7 @@ impl SharedBufferBatchInner { Self { entries, new_values, + old_values, epochs, size, _tracker: tracker, @@ -188,7 +233,7 @@ impl SharedBufferBatchInner { if read_epoch < e.pure_epoch() { continue; } - return Some((v.clone(), *e)); + return Some((v.clone().into(), *e)); } // cannot find a visible version } @@ -207,7 +252,7 @@ pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock = LazyLock::new(|| AtomicU64::new(0)); /// A write batch stored in the shared buffer. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug)] pub struct SharedBufferBatch { pub(crate) inner: Arc, pub table_id: TableId, @@ -220,13 +265,32 @@ impl SharedBufferBatch { epoch: HummockEpoch, table_id: TableId, ) -> Self { - let size = Self::measure_batch_size(&sorted_items); + Self::for_test_inner(sorted_items, None, epoch, table_id) + } + + pub fn for_test_with_old_values( + sorted_items: Vec, + old_values: Vec, + epoch: HummockEpoch, + table_id: TableId, + ) -> Self { + Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id) + } + + fn for_test_inner( + sorted_items: Vec, + old_values: Option>, + epoch: HummockEpoch, + table_id: TableId, + ) -> Self { + let size = Self::measure_batch_size(&sorted_items, old_values.as_deref()); Self { inner: Arc::new(SharedBufferBatchInner::new( epoch, 0, sorted_items, + old_values, size, None, )), @@ -253,19 +317,30 @@ impl SharedBufferBatch { .sum() } - pub fn measure_batch_size(batch_items: &[SharedBufferItem]) -> usize { + pub fn measure_batch_size( + batch_items: &[SharedBufferItem], + old_values: Option<&[Bytes]>, + ) -> usize { // size = Sum(length of full key + length of user value) batch_items .iter() .map(|(k, v)| { - k.len() + { - match v { - HummockValue::Put(val) => val.len(), - HummockValue::Delete => 0, + k.len() + + { + match v { + SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => { + val.len() + } + SharedBufferValue::Delete => 0, + } } - } + + size_of_val(v) }) - .sum() + .sum::() + + old_values + .iter() + .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len())) + .sum::() } pub fn filter(&self, table_id: TableId, table_key_range: &R) -> bool @@ -309,6 +384,10 @@ impl SharedBufferBatch { self.inner.new_values.len() } + pub fn has_old_value(&self) -> bool { + self.inner.old_values.is_some() + } + pub fn get( &self, table_key: TableKey<&[u8]>, @@ -346,8 +425,14 @@ impl SharedBufferBatch { .is_ok() } - pub fn into_directed_iter(self) -> SharedBufferBatchIterator { - SharedBufferBatchIterator::::new(self.inner, self.table_id) + pub fn into_directed_iter( + self, + ) -> SharedBufferBatchIterator { + SharedBufferBatchIterator::::new(self.inner, self.table_id) + } + + pub fn into_old_value_iter(self) -> SharedBufferBatchIterator { + self.into_directed_iter() } pub fn into_forward_iter(self) -> SharedBufferBatchIterator { @@ -391,25 +476,24 @@ impl SharedBufferBatch { &self.inner.epochs } - pub fn build_shared_buffer_item_batches( - kv_pairs: Vec<(TableKey, StorageValue)>, - ) -> Vec { - kv_pairs - .into_iter() - .map(|(key, value)| (key, value.into())) - .collect() - } - pub fn build_shared_buffer_batch( epoch: HummockEpoch, spill_offset: u16, sorted_items: Vec, + old_values: Option>, size: usize, table_id: TableId, instance_id: LocalInstanceId, tracker: Option, ) -> Self { - let inner = SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, size, tracker); + let inner = SharedBufferBatchInner::new( + epoch, + spill_offset, + sorted_items, + old_values, + size, + tracker, + ); SharedBufferBatch { inner: Arc::new(inner), table_id, @@ -456,7 +540,8 @@ impl SharedBufferBatch { size: usize, table_id: TableId, ) -> Self { - let inner = SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, size, None); + let inner = + SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None); SharedBufferBatch { inner: Arc::new(inner), table_id, @@ -467,7 +552,7 @@ impl SharedBufferBatch { /// Iterate all the items in the shared buffer batch /// If there are multiple versions of a key, the iterator will return all versions -pub struct SharedBufferBatchIterator { +pub struct SharedBufferBatchIterator { inner: Arc, /// The index of the current entry in the payload current_entry_idx: usize, @@ -479,8 +564,17 @@ pub struct SharedBufferBatchIterator { _phantom: PhantomData, } -impl SharedBufferBatchIterator { +impl + SharedBufferBatchIterator +{ pub(crate) fn new(inner: Arc, table_id: TableId) -> Self { + if !IS_NEW_VALUE { + assert!( + inner.old_values.is_some(), + "create old value iter with no old value: {:?}", + table_id + ); + } Self { inner, current_entry_idx: 0, @@ -533,6 +627,12 @@ impl SharedBufferBatchIterator { ); debug_assert_eq!(self.value_end_offset, self.get_value_end_offset()); debug_assert!(self.current_value_idx < self.value_end_offset); + if !IS_NEW_VALUE { + debug_assert!(!matches!( + &self.inner.new_values[self.current_value_idx].1, + SharedBufferValue::Insert(_) + )); + } } fn advance_to_next_value(&mut self) { @@ -547,6 +647,35 @@ impl SharedBufferBatchIterator { } } } + + fn advance_until_valid_old_value(&mut self) { + debug_assert!(!IS_NEW_VALUE); + if !self.is_valid_entry_idx() { + return; + } + loop { + while self.current_value_idx < self.value_end_offset + && matches!( + &self.inner.new_values[self.current_value_idx].1, + SharedBufferValue::Insert(_) + ) + { + self.current_value_idx += 1; + } + if self.current_value_idx >= self.value_end_offset { + debug_assert_eq!(self.current_value_idx, self.value_end_offset); + self.advance_to_next_entry(); + if self.is_valid_entry_idx() { + self.reset_value_idx(); + continue; + } else { + break; + } + } else { + break; + } + } + } } impl SharedBufferBatchIterator { @@ -566,15 +695,25 @@ impl SharedBufferBatchIterator { SharedBufferVersionedEntryRef { key: &self.inner.entries[self.current_entry_idx].key, new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset], + old_values: self + .inner + .old_values + .as_ref() + .map(|old_values| &old_values[self.current_value_idx..self.value_end_offset]), } } } -impl HummockIterator for SharedBufferBatchIterator { +impl HummockIterator + for SharedBufferBatchIterator +{ type Direction = D; async fn next(&mut self) -> HummockResult<()> { self.advance_to_next_value(); + if !IS_NEW_VALUE { + self.advance_until_valid_old_value(); + } Ok(()) } @@ -587,7 +726,17 @@ impl HummockIterator for SharedBufferBatchIterator< fn value(&self) -> HummockValue<&[u8]> { self.assert_valid_idx(); - self.inner.new_values[self.current_value_idx].1.as_slice() + if IS_NEW_VALUE { + self.inner.new_values[self.current_value_idx] + .1 + .to_ref() + .to_slice() + .into() + } else { + HummockValue::put( + self.inner.old_values.as_ref().unwrap()[self.current_value_idx].as_ref(), + ) + } } fn is_valid(&self) -> bool { @@ -604,6 +753,9 @@ impl HummockIterator for SharedBufferBatchIterator< } }; self.reset_value_idx(); + if !IS_NEW_VALUE { + self.advance_until_valid_old_value(); + } Ok(()) } @@ -651,6 +803,9 @@ impl HummockIterator for SharedBufferBatchIterator< } }, }; + if !IS_NEW_VALUE { + self.advance_until_valid_old_value(); + } Ok(()) } @@ -781,6 +936,7 @@ impl DeleteRangeIterator for SharedBufferDeleteRangeIterator { mod tests { use std::ops::Bound::{Excluded, Included}; + use itertools::{zip_eq, Itertools}; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::key::map_table_key_range; @@ -790,21 +946,27 @@ mod tests { iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer, }; + fn to_hummock_value_batch( + items: Vec<(Vec, SharedBufferValue)>, + ) -> Vec<(Vec, HummockValue)> { + items.into_iter().map(|(k, v)| (k, v.into())).collect() + } + #[tokio::test] async fn test_shared_buffer_batch_basic() { let epoch = test_epoch(1); - let shared_buffer_items: Vec<(Vec, HummockValue)> = vec![ + let shared_buffer_items: Vec<(Vec, SharedBufferValue)> = vec![ ( iterator_test_table_key_of(0), - HummockValue::put(Bytes::from("value1")), + SharedBufferValue::Insert(Bytes::from("value1")), ), ( iterator_test_table_key_of(1), - HummockValue::put(Bytes::from("value1")), + SharedBufferValue::Insert(Bytes::from("value1")), ), ( iterator_test_table_key_of(2), - HummockValue::put(Bytes::from("value1")), + SharedBufferValue::Insert(Bytes::from("value1")), ), ]; let shared_buffer_batch = SharedBufferBatch::for_test( @@ -812,6 +974,7 @@ mod tests { epoch, Default::default(), ); + let shared_buffer_items = to_hummock_value_batch(shared_buffer_items); // Sketch assert_eq!( @@ -884,15 +1047,15 @@ mod tests { let shared_buffer_items = vec![ ( iterator_test_table_key_of(1), - HummockValue::put(Bytes::from("value1")), + SharedBufferValue::Insert(Bytes::from("value1")), ), ( iterator_test_table_key_of(2), - HummockValue::put(Bytes::from("value2")), + SharedBufferValue::Insert(Bytes::from("value2")), ), ( iterator_test_table_key_of(3), - HummockValue::put(Bytes::from("value3")), + SharedBufferValue::Insert(Bytes::from("value3")), ), ]; let shared_buffer_batch = SharedBufferBatch::for_test( @@ -900,6 +1063,7 @@ mod tests { epoch, Default::default(), ); + let shared_buffer_items = to_hummock_value_batch(shared_buffer_items); // FORWARD: Seek to a key < 1st key, expect all three items to return let mut iter = shared_buffer_batch.clone().into_forward_iter(); @@ -1018,6 +1182,178 @@ mod tests { assert!(!iter.is_valid()); } + #[tokio::test] + async fn test_shared_buffer_batch_old_value_iter() { + let epoch = test_epoch(1); + let key_values = vec![ + ( + iterator_test_table_key_of(1), + SharedBufferValue::Insert(Bytes::from("value1")), + ), + ( + iterator_test_table_key_of(2), + SharedBufferValue::Update(Bytes::from("value2")), + ), + ( + iterator_test_table_key_of(3), + SharedBufferValue::Insert(Bytes::from("value3")), + ), + (iterator_test_table_key_of(4), SharedBufferValue::Delete), + ]; + let old_values = vec![ + Bytes::new(), + Bytes::from("old_value2"), + Bytes::new(), + Bytes::from("old_value4"), + ]; + let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values( + transform_shared_buffer(key_values.clone()), + old_values.clone(), + epoch, + Default::default(), + ); + let shared_buffer_items = to_hummock_value_batch(key_values.clone()); + let expected_old_value_iter_items = zip_eq(&key_values, &old_values) + .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_))) + .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value))) + .collect_vec(); + + let mut iter = shared_buffer_batch.clone().into_old_value_iter(); + iter.rewind().await.unwrap(); + for item in &expected_old_value_iter_items { + assert!(iter.is_valid()); + assert_eq!(*iter.key().user_key.table_key, item.0); + assert_eq!(iter.value(), item.1.as_slice()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + + // FORWARD: Seek to a key < 1st key, expect all three items to return + let mut iter = shared_buffer_batch.clone().into_forward_iter(); + iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref()) + .await + .unwrap(); + for item in &shared_buffer_items { + assert!(iter.is_valid()); + assert_eq!(*iter.key().user_key.table_key, item.0); + assert_eq!(iter.value(), item.1.as_slice()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + + let mut iter = shared_buffer_batch.clone().into_old_value_iter(); + iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref()) + .await + .unwrap(); + for item in &expected_old_value_iter_items { + assert!(iter.is_valid()); + assert_eq!(*iter.key().user_key.table_key, item.0); + assert_eq!(iter.value(), item.1.as_slice()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + + // FORWARD: Seek to a key > the last key, expect no items to return + let mut iter = shared_buffer_batch.clone().into_forward_iter(); + iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref()) + .await + .unwrap(); + assert!(!iter.is_valid()); + + let mut iter = shared_buffer_batch.clone().into_old_value_iter(); + iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref()) + .await + .unwrap(); + assert!(!iter.is_valid()); + + // FORWARD: Seek to 2nd key with current epoch, expect last two items to return + let mut iter = shared_buffer_batch.clone().into_forward_iter(); + iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref()) + .await + .unwrap(); + for item in &shared_buffer_items[1..] { + assert!(iter.is_valid()); + assert_eq!(*iter.key().user_key.table_key, item.0); + assert_eq!(iter.value(), item.1.as_slice()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + + let mut iter = shared_buffer_batch.clone().into_old_value_iter(); + iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref()) + .await + .unwrap(); + for item in &expected_old_value_iter_items { + assert!(iter.is_valid()); + assert_eq!(*iter.key().user_key.table_key, item.0); + assert_eq!(iter.value(), item.1.as_slice()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + + // FORWARD: Seek to 2nd key with future epoch, expect last two items to return + let mut iter = shared_buffer_batch.clone().into_forward_iter(); + iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref()) + .await + .unwrap(); + for item in &shared_buffer_items[1..] { + assert!(iter.is_valid()); + assert_eq!(*iter.key().user_key.table_key, item.0.as_slice()); + assert_eq!(iter.value(), item.1.as_slice()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + + let mut iter = shared_buffer_batch.clone().into_old_value_iter(); + iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref()) + .await + .unwrap(); + for item in &expected_old_value_iter_items { + assert!(iter.is_valid()); + assert_eq!(*iter.key().user_key.table_key, item.0); + assert_eq!(iter.value(), item.1.as_slice()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + + let mut iter = shared_buffer_batch.clone().into_old_value_iter(); + iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref()) + .await + .unwrap(); + for item in &expected_old_value_iter_items[1..] { + assert!(iter.is_valid()); + assert_eq!(*iter.key().user_key.table_key, item.0); + assert_eq!(iter.value(), item.1.as_slice()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + + // FORWARD: Seek to 2nd key with old epoch, expect last item to return + let mut iter = shared_buffer_batch.clone().into_forward_iter(); + iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref()) + .await + .unwrap(); + let item = shared_buffer_items.last().unwrap(); + assert!(iter.is_valid()); + assert_eq!(*iter.key().user_key.table_key, item.0.as_slice()); + assert_eq!(iter.value(), item.1.as_slice()); + iter.next().await.unwrap(); + assert!(!iter.is_valid()); + + // Seek to an insert key + let mut iter = shared_buffer_batch.clone().into_old_value_iter(); + iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref()) + .await + .unwrap(); + for item in &expected_old_value_iter_items[1..] { + assert!(iter.is_valid()); + assert_eq!(*iter.key().user_key.table_key, item.0); + assert_eq!(iter.value(), item.1.as_slice()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + } + #[tokio::test] #[should_panic] async fn test_invalid_table_id() { @@ -1034,10 +1370,22 @@ mod tests { async fn test_shared_buffer_batch_range_existx() { let epoch = test_epoch(1); let shared_buffer_items = vec![ - (Vec::from("a_1"), HummockValue::put(Bytes::from("value1"))), - (Vec::from("a_3"), HummockValue::put(Bytes::from("value2"))), - (Vec::from("a_5"), HummockValue::put(Bytes::from("value3"))), - (Vec::from("b_2"), HummockValue::put(Bytes::from("value3"))), + ( + Vec::from("a_1"), + SharedBufferValue::Insert(Bytes::from("value1")), + ), + ( + Vec::from("a_3"), + SharedBufferValue::Insert(Bytes::from("value2")), + ), + ( + Vec::from("a_5"), + SharedBufferValue::Insert(Bytes::from("value3")), + ), + ( + Vec::from("b_2"), + SharedBufferValue::Insert(Bytes::from("value3")), + ), ]; let shared_buffer_batch = SharedBufferBatch::for_test( transform_shared_buffer(shared_buffer_items), @@ -1077,18 +1425,18 @@ mod tests { #[tokio::test] async fn test_merge_imms_basic() { let table_id = TableId { table_id: 1004 }; - let shared_buffer_items1: Vec<(Vec, HummockValue)> = vec![ + let shared_buffer_items1: Vec<(Vec, SharedBufferValue)> = vec![ ( iterator_test_table_key_of(1), - HummockValue::put(Bytes::from("value1")), + SharedBufferValue::Insert(Bytes::from("value1")), ), ( iterator_test_table_key_of(2), - HummockValue::put(Bytes::from("value2")), + SharedBufferValue::Insert(Bytes::from("value2")), ), ( iterator_test_table_key_of(3), - HummockValue::put(Bytes::from("value3")), + SharedBufferValue::Insert(Bytes::from("value3")), ), ]; let epoch = test_epoch(1); @@ -1097,18 +1445,19 @@ mod tests { epoch, table_id, ); - let shared_buffer_items2: Vec<(Vec, HummockValue)> = vec![ + let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1); + let shared_buffer_items2: Vec<(Vec, SharedBufferValue)> = vec![ ( iterator_test_table_key_of(1), - HummockValue::put(Bytes::from("value12")), + SharedBufferValue::Insert(Bytes::from("value12")), ), ( iterator_test_table_key_of(2), - HummockValue::put(Bytes::from("value22")), + SharedBufferValue::Insert(Bytes::from("value22")), ), ( iterator_test_table_key_of(3), - HummockValue::put(Bytes::from("value32")), + SharedBufferValue::Insert(Bytes::from("value32")), ), ]; let epoch = test_epoch(2); @@ -1117,19 +1466,20 @@ mod tests { epoch, table_id, ); + let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2); - let shared_buffer_items3: Vec<(Vec, HummockValue)> = vec![ + let shared_buffer_items3: Vec<(Vec, SharedBufferValue)> = vec![ ( iterator_test_table_key_of(1), - HummockValue::put(Bytes::from("value13")), + SharedBufferValue::Insert(Bytes::from("value13")), ), ( iterator_test_table_key_of(2), - HummockValue::put(Bytes::from("value23")), + SharedBufferValue::Insert(Bytes::from("value23")), ), ( iterator_test_table_key_of(3), - HummockValue::put(Bytes::from("value33")), + SharedBufferValue::Insert(Bytes::from("value33")), ), ]; let epoch = test_epoch(3); @@ -1138,6 +1488,192 @@ mod tests { epoch, table_id, ); + let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3); + + let batch_items = [ + shared_buffer_items1, + shared_buffer_items2, + shared_buffer_items3, + ]; + // newer data comes first + let imms = vec![imm3, imm2, imm1]; + let merged_imm = merge_imms_in_memory(table_id, 0, imms.clone(), None).await; + + // Point lookup + for (i, items) in batch_items.iter().enumerate() { + for (key, value) in items { + assert_eq!( + merged_imm + .get( + TableKey(key.as_slice()), + test_epoch(i as u64 + 1), + &ReadOptions::default() + ) + .unwrap() + .0, + value.clone(), + "epoch: {}, key: {:?}", + test_epoch(i as u64 + 1), + String::from_utf8(key.clone()) + ); + } + } + assert_eq!( + merged_imm.get( + TableKey(iterator_test_table_key_of(4).as_slice()), + test_epoch(1), + &ReadOptions::default() + ), + None + ); + assert_eq!( + merged_imm.get( + TableKey(iterator_test_table_key_of(5).as_slice()), + test_epoch(1), + &ReadOptions::default() + ), + None + ); + + // Forward iterator + for snapshot_epoch in 1..=3 { + let mut iter = merged_imm.clone().into_forward_iter(); + iter.rewind().await.unwrap(); + let mut output = vec![]; + while iter.is_valid() { + let epoch = iter.key().epoch_with_gap.pure_epoch(); + if test_epoch(snapshot_epoch) == epoch { + output.push(( + iter.key().user_key.table_key.to_vec(), + iter.value().to_bytes(), + )); + } + iter.next().await.unwrap(); + } + assert_eq!(output, batch_items[snapshot_epoch as usize - 1]); + } + + // Forward and Backward iterator + { + let mut iter = merged_imm.clone().into_forward_iter(); + iter.rewind().await.unwrap(); + let mut output = vec![]; + while iter.is_valid() { + output.push(( + iter.key().user_key.table_key.to_vec(), + iter.value().to_bytes(), + )); + iter.next().await.unwrap(); + } + + let mut expected = vec![]; + for key_idx in 0..=2 { + for epoch in (1..=3).rev() { + let item = batch_items[epoch - 1][key_idx].clone(); + expected.push(item); + } + } + assert_eq!(expected, output); + + let mut backward_iter = merged_imm.clone().into_backward_iter(); + backward_iter.rewind().await.unwrap(); + let mut output = vec![]; + while backward_iter.is_valid() { + output.push(( + backward_iter.key().user_key.table_key.to_vec(), + backward_iter.value().to_bytes(), + )); + backward_iter.next().await.unwrap(); + } + let mut expected = vec![]; + for key_idx in (0..=2).rev() { + for epoch in (1..=3).rev() { + let item = batch_items[epoch - 1][key_idx].clone(); + expected.push(item); + } + } + assert_eq!(expected, output); + } + } + + #[tokio::test] + async fn test_merge_imms_with_old_values() { + let table_id = TableId { table_id: 1004 }; + let key_value1: Vec<(Vec, SharedBufferValue)> = vec![ + ( + iterator_test_table_key_of(1), + SharedBufferValue::Insert(Bytes::from("value1")), + ), + ( + iterator_test_table_key_of(2), + SharedBufferValue::Update(Bytes::from("value2")), + ), + (iterator_test_table_key_of(3), SharedBufferValue::Delete), + ]; + let old_value1 = vec![ + Bytes::new(), + Bytes::from("old_value2"), + Bytes::from("old_value3"), + ]; + let epoch = test_epoch(1); + let imm1 = SharedBufferBatch::for_test_with_old_values( + transform_shared_buffer(key_value1.clone()), + old_value1.clone(), + epoch, + table_id, + ); + let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone()); + let key_value2: Vec<(Vec, SharedBufferValue)> = vec![ + ( + iterator_test_table_key_of(1), + SharedBufferValue::Update(Bytes::from("value12")), + ), + ( + iterator_test_table_key_of(2), + SharedBufferValue::Update(Bytes::from("value22")), + ), + ( + iterator_test_table_key_of(3), + SharedBufferValue::Insert(Bytes::from("value32")), + ), + ]; + let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()]; + let epoch = epoch.next_epoch(); + let imm2 = SharedBufferBatch::for_test_with_old_values( + transform_shared_buffer(key_value2.clone()), + old_value2.clone(), + epoch, + table_id, + ); + let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone()); + + let key_value3: Vec<(Vec, SharedBufferValue)> = vec![ + (iterator_test_table_key_of(1), SharedBufferValue::Delete), + (iterator_test_table_key_of(2), SharedBufferValue::Delete), + ( + iterator_test_table_key_of(3), + SharedBufferValue::Update(Bytes::from("value33")), + ), + ]; + let old_value3 = vec![ + Bytes::from("value12"), + Bytes::from("value22"), + Bytes::from("value32"), + ]; + let epoch = epoch.next_epoch(); + let imm3 = SharedBufferBatch::for_test_with_old_values( + transform_shared_buffer(key_value3.clone()), + old_value3.clone(), + epoch, + table_id, + ); + let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone()); + + let key_values = [ + (key_value1, old_value1), + (key_value2, old_value2), + (key_value3, old_value3), + ]; let batch_items = [ shared_buffer_items1, @@ -1244,5 +1780,33 @@ mod tests { } assert_eq!(expected, output); } + + // old value iter + { + let mut iter = merged_imm.clone().into_old_value_iter(); + iter.rewind().await.unwrap(); + let mut output = vec![]; + while iter.is_valid() { + output.push(( + iter.key().user_key.table_key.to_vec(), + iter.value().to_bytes(), + )); + iter.next().await.unwrap(); + } + + let mut expected = vec![]; + for key_idx in 0..=2 { + for epoch in (0..=2).rev() { + let (key_values, old_values) = &key_values[epoch]; + let (key, new_value) = &key_values[key_idx]; + let old_value = &old_values[key_idx]; + if matches!(new_value, SharedBufferValue::Insert(_)) { + continue; + } + expected.push((key.clone(), HummockValue::Put(old_value.clone()))); + } + } + assert_eq!(expected, output); + } } } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 8c18b647ca819..d082ca89d84db 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -47,7 +47,7 @@ use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, Hummoc use crate::hummock::event_handler::{ HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping, }; -use crate::hummock::iterator::ChangeLogIterator; +use crate::hummock::iterator::change_log::ChangeLogIterator; use crate::hummock::local_version::pinned_version::{start_pinned_version_worker, PinnedVersion}; use crate::hummock::observer_manager::HummockObserverNode; use crate::hummock::utils::{validate_safe_epoch, wait_for_epoch}; diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index d9a83c8695b14..a3823786613f9 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -29,12 +29,12 @@ use super::version::{StagingData, VersionUpdate}; use crate::error::StorageResult; use crate::hummock::event_handler::hummock_event_handler::HummockEventSender; use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard}; +use crate::hummock::iterator::change_log::ChangeLogIterator; use crate::hummock::iterator::{ - ChangeLogIterator, ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, - UserIterator, + ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, UserIterator, }; use crate::hummock::shared_buffer::shared_buffer_batch::{ - SharedBufferBatch, SharedBufferBatchIterator, + SharedBufferBatch, SharedBufferBatchIterator, SharedBufferItem, SharedBufferValue, }; use crate::hummock::store::version::{read_filter_for_version, HummockVersionReader}; use crate::hummock::utils::{ @@ -45,7 +45,6 @@ use crate::hummock::write_limiter::WriteLimiterRef; use crate::hummock::{MemoryLimiter, SstableIterator}; use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator}; use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic}; -use crate::storage_value::StorageValue; use crate::store::*; /// `LocalHummockStorage` is a handle for a state table shard to access data from and write data to @@ -310,6 +309,11 @@ impl LocalStateStore for LocalHummockStorage { async fn flush(&mut self) -> StorageResult { let buffer = self.mem_table.drain().into_parts(); let mut kv_pairs = Vec::with_capacity(buffer.len()); + let mut old_values = if self.is_flush_old_value() { + Some(Vec::with_capacity(buffer.len())) + } else { + None + }; for (key, key_op) in buffer { match key_op { // Currently, some executors do not strictly comply with these semantics. As @@ -328,7 +332,10 @@ impl LocalStateStore for LocalHummockStorage { ) .await?; } - kv_pairs.push((key, StorageValue::new_put(value))); + kv_pairs.push((key, SharedBufferValue::Insert(value))); + if let Some(old_values) = &mut old_values { + old_values.push(Bytes::new()); + } } KeyOp::Delete(old_value) => { if ENABLE_SANITY_CHECK { @@ -343,7 +350,10 @@ impl LocalStateStore for LocalHummockStorage { ) .await?; } - kv_pairs.push((key, StorageValue::new_delete())); + kv_pairs.push((key, SharedBufferValue::Delete)); + if let Some(old_values) = &mut old_values { + old_values.push(old_value); + } } KeyOp::Update((old_value, new_value)) => { if ENABLE_SANITY_CHECK { @@ -359,12 +369,16 @@ impl LocalStateStore for LocalHummockStorage { ) .await?; } - kv_pairs.push((key, StorageValue::new_put(new_value))); + kv_pairs.push((key, SharedBufferValue::Update(new_value))); + if let Some(old_values) = &mut old_values { + old_values.push(old_value); + } } } } self.flush_inner( kv_pairs, + old_values, WriteOptions { epoch: self.epoch(), table_id: self.table_id, @@ -463,7 +477,8 @@ impl LocalStateStore for LocalHummockStorage { impl LocalHummockStorage { async fn flush_inner( &mut self, - kv_pairs: Vec<(TableKey, StorageValue)>, + sorted_items: Vec, + old_values: Option>, write_options: WriteOptions, ) -> StorageResult { let epoch = write_options.epoch; @@ -473,16 +488,16 @@ impl LocalHummockStorage { self.stats .write_batch_tuple_counts .with_label_values(&[table_id_label.as_str()]) - .inc_by(kv_pairs.len() as _); + .inc_by(sorted_items.len() as _); let timer = self .stats .write_batch_duration .with_label_values(&[table_id_label.as_str()]) .start_timer(); - let imm_size = if !kv_pairs.is_empty() { - let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(kv_pairs); - let size = SharedBufferBatch::measure_batch_size(&sorted_items); + let imm_size = if !sorted_items.is_empty() { + let size = SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref()); + self.write_limiter.wait_permission(self.table_id).await; let limiter = self.memory_limiter.as_ref(); let tracker = if let Some(tracker) = limiter.try_require_memory(size as u64) { @@ -513,6 +528,7 @@ impl LocalHummockStorage { epoch, self.spill_offset, sorted_items, + old_values, size, table_id, instance_id, @@ -589,6 +605,16 @@ impl LocalHummockStorage { pub fn instance_id(&self) -> u64 { self.instance_guard.instance_id } + + fn is_flush_old_value(&self) -> bool { + matches!( + &self.op_consistency_level, + OpConsistencyLevel::ConsistentOldValue { + is_log_store: true, + .. + } + ) + } } pub type StagingDataIterator = MergeIterator< diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 91b6394ed160a..9f94895e2bba7 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -28,7 +28,6 @@ use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::MAX_SPILL_TIMES; -use risingwave_hummock_sdk::change_log::EpochNewChangeLog; use risingwave_hummock_sdk::key::{ bound_table_key_range, is_empty_key_range, FullKey, TableKey, TableKeyRange, UserKey, }; @@ -38,14 +37,13 @@ use risingwave_hummock_sdk::table_watermark::{ }; use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo}; -use risingwave_pb::hummock::{LevelType, SstableInfo}; +use risingwave_pb::hummock::{EpochNewChangeLog, LevelType, SstableInfo}; use sync_point::sync_point; use super::StagingDataIterator; use crate::error::StorageResult; -use crate::hummock::iterator::{ - ChangeLogIterator, ConcatIterator, HummockIteratorUnion, MergeIterator, UserIterator, -}; +use crate::hummock::iterator::change_log::ChangeLogIterator; +use crate::hummock::iterator::{ConcatIterator, HummockIteratorUnion, MergeIterator, UserIterator}; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::sstable_store::SstableStoreRef; @@ -75,6 +73,7 @@ pub type CommittedVersion = PinnedVersion; pub struct StagingSstableInfo { // newer data comes first sstable_infos: Vec, + old_value_sstable_infos: Vec, /// Epochs whose data are included in the Sstable. The newer epoch comes first. /// The field must not be empty. epochs: Vec, @@ -85,6 +84,7 @@ pub struct StagingSstableInfo { impl StagingSstableInfo { pub fn new( sstable_infos: Vec, + old_value_sstable_infos: Vec, epochs: Vec, imm_ids: Vec, imm_size: usize, @@ -93,6 +93,7 @@ impl StagingSstableInfo { assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2.partial_cmp(epoch1))); Self { sstable_infos, + old_value_sstable_infos, epochs, imm_ids, imm_size, @@ -103,6 +104,10 @@ impl StagingSstableInfo { &self.sstable_infos } + pub fn old_value_sstable_infos(&self) -> &Vec { + &self.old_value_sstable_infos + } + pub fn imm_size(&self) -> usize { self.imm_size } diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 872e0877ac4a5..445c7fd718d31 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -34,7 +34,9 @@ use super::{ }; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FullKeyFilterKeyExtractor}; use crate::hummock::iterator::ForwardMergeRangeIterator; -use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; +use crate::hummock::shared_buffer::shared_buffer_batch::{ + SharedBufferBatch, SharedBufferItem, SharedBufferValue, +}; use crate::hummock::value::HummockValue; use crate::hummock::{ BlockedXor16FilterBuilder, CachePolicy, CompactionDeleteRangeIterator, DeleteRangeTombstone, @@ -67,10 +69,10 @@ pub fn default_opts_for_test() -> StorageOpts { } } -pub fn gen_dummy_batch(n: u64) -> Vec<(TableKey, StorageValue)> { +pub fn gen_dummy_batch(n: u64) -> Vec { vec![( TableKey(Bytes::from(iterator_test_table_key_of(n as usize))), - StorageValue::new_put(b"value1".to_vec()), + SharedBufferValue::Insert(Bytes::copy_from_slice(&b"value1"[..])), )] } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 82af4beadf761..9c6b3bbdf193f 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -413,7 +413,11 @@ pub(crate) async fn do_delete_sanity_check( table_option: TableOption, op_consistency_level: &OpConsistencyLevel, ) -> StorageResult<()> { - let OpConsistencyLevel::ConsistentOldValue(old_value_checker) = op_consistency_level else { + let OpConsistencyLevel::ConsistentOldValue { + check_old_value: old_value_checker, + .. + } = op_consistency_level + else { return Ok(()); }; let read_options = ReadOptions { @@ -455,7 +459,11 @@ pub(crate) async fn do_update_sanity_check( table_option: TableOption, op_consistency_level: &OpConsistencyLevel, ) -> StorageResult<()> { - let OpConsistencyLevel::ConsistentOldValue(old_value_checker) = op_consistency_level else { + let OpConsistencyLevel::ConsistentOldValue { + check_old_value: old_value_checker, + .. + } = op_consistency_level + else { return Ok(()); }; let read_options = ReadOptions { diff --git a/src/storage/src/hummock/value.rs b/src/storage/src/hummock/value.rs index 7a1987ad81b31..0572a5c164f6f 100644 --- a/src/storage/src/hummock/value.rs +++ b/src/storage/src/hummock/value.rs @@ -17,7 +17,6 @@ use std::fmt::Debug; use bytes::{Buf, BufMut, Bytes}; use super::{HummockError, HummockResult}; -use crate::storage_value::StorageValue; pub const VALUE_DELETE: u8 = 1 << 0; pub const VALUE_PUT: u8 = 0; @@ -45,6 +44,7 @@ impl> Debug for HummockValue { impl Copy for HummockValue where T: Copy {} +#[cfg(any(test, feature = "test"))] impl PartialEq for HummockValue { fn eq(&self, other: &Self) -> bool { match (self, other) { @@ -55,11 +55,12 @@ impl PartialEq for HummockValue { } } +#[cfg(any(test, feature = "test"))] impl Eq for HummockValue {} impl HummockValue where - T: PartialEq + Eq + AsRef<[u8]>, + T: AsRef<[u8]>, { pub fn encoded_len(&self) -> usize { match self { @@ -168,16 +169,6 @@ impl From>> for HummockValue { } } -impl From for HummockValue { - fn from(data: StorageValue) -> Self { - if data.is_some() { - HummockValue::Put(data.user_value.unwrap_or_default()) - } else { - HummockValue::Delete - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 48000ec702310..3b7953fccdec2 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -175,7 +175,10 @@ impl MemTable { pub fn delete(&mut self, pk: TableKey, old_value: Bytes) -> Result<()> { let key_len = std::mem::size_of::() + pk.len(); - let OpConsistencyLevel::ConsistentOldValue(value_checker) = &self.op_consistency_level + let OpConsistencyLevel::ConsistentOldValue { + check_old_value: value_checker, + .. + } = &self.op_consistency_level else { let delete_value = KeyOp::Delete(old_value); self.kv_size.add(&pk, &delete_value); @@ -240,7 +243,10 @@ impl MemTable { old_value: Bytes, new_value: Bytes, ) -> Result<()> { - let OpConsistencyLevel::ConsistentOldValue(value_checker) = &self.op_consistency_level + let OpConsistencyLevel::ConsistentOldValue { + check_old_value: value_checker, + .. + } = &self.op_consistency_level else { let key_len = std::mem::size_of::() + pk.len(); @@ -683,9 +689,10 @@ mod tests { #[tokio::test] async fn test_mem_table_memory_size() { - let mut mem_table = MemTable::new(OpConsistencyLevel::ConsistentOldValue( - CHECK_BYTES_EQUAL.clone(), - )); + let mut mem_table = MemTable::new(OpConsistencyLevel::ConsistentOldValue { + check_old_value: CHECK_BYTES_EQUAL.clone(), + is_log_store: false, + }); assert_eq!(mem_table.kv_size.size(), 0); mem_table @@ -878,9 +885,10 @@ mod tests { let mut test_data = ordered_test_data.clone(); test_data.shuffle(&mut rng); - let mut mem_table = MemTable::new(OpConsistencyLevel::ConsistentOldValue( - CHECK_BYTES_EQUAL.clone(), - )); + let mut mem_table = MemTable::new(OpConsistencyLevel::ConsistentOldValue { + check_old_value: CHECK_BYTES_EQUAL.clone(), + is_log_store: false, + }); for (key, op) in test_data { match op { KeyOp::Insert(value) => { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 95705d154af6c..ade0ad7ffff64 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -547,16 +547,21 @@ pub static CHECK_BYTES_EQUAL: LazyLock> = pub enum OpConsistencyLevel { #[default] Inconsistent, - ConsistentOldValue(Arc), + ConsistentOldValue { + check_old_value: Arc, + /// whether should store the old value + is_log_store: bool, + }, } impl Debug for OpConsistencyLevel { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"), - OpConsistencyLevel::ConsistentOldValue(_) => { - f.write_str("OpConsistencyLevel::ConsistentOldValue") - } + OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f + .debug_struct("OpConsistencyLevel::ConsistentOldValue") + .field("is_log_store", is_log_store) + .finish(), } } } @@ -569,8 +574,23 @@ impl PartialEq for OpConsistencyLevel { OpConsistencyLevel::Inconsistent, OpConsistencyLevel::Inconsistent ) | ( - OpConsistencyLevel::ConsistentOldValue(_), - OpConsistencyLevel::ConsistentOldValue(_), + OpConsistencyLevel::ConsistentOldValue { + is_log_store: true, + .. + }, + OpConsistencyLevel::ConsistentOldValue { + is_log_store: true, + .. + }, + ) | ( + OpConsistencyLevel::ConsistentOldValue { + is_log_store: false, + .. + }, + OpConsistencyLevel::ConsistentOldValue { + is_log_store: false, + .. + }, ) ) } @@ -613,7 +633,11 @@ impl From for NewLocalOptions { op_consistency_level: match value.op_consistency_level { TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent, TracedOpConsistencyLevel::ConsistentOldValue => { - OpConsistencyLevel::ConsistentOldValue(CHECK_BYTES_EQUAL.clone()) + OpConsistencyLevel::ConsistentOldValue { + check_old_value: CHECK_BYTES_EQUAL.clone(), + // TODO: for simplicity, set it to false + is_log_store: false, + } } }, table_option: value.table_option.into(), @@ -629,7 +653,7 @@ impl From for TracedNewLocalOptions { table_id: value.table_id.into(), op_consistency_level: match value.op_consistency_level { OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent, - OpConsistencyLevel::ConsistentOldValue(_) => { + OpConsistencyLevel::ConsistentOldValue { .. } => { TracedOpConsistencyLevel::ConsistentOldValue } }, @@ -731,7 +755,7 @@ impl From for TracedSealCurrentEpochOptions { }), switch_op_consistency_level: value .switch_op_consistency_level - .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue(_))), + .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })), } } } @@ -758,7 +782,10 @@ impl From for SealCurrentEpochOptions { }), switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| { if enable { - OpConsistencyLevel::ConsistentOldValue(CHECK_BYTES_EQUAL.clone()) + OpConsistencyLevel::ConsistentOldValue { + check_old_value: CHECK_BYTES_EQUAL.clone(), + is_log_store: false, + } } else { OpConsistencyLevel::Inconsistent } diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 7fdb838654b70..e7c8c6e45f7ea 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -201,7 +201,7 @@ macro_rules! dispatch_state_store { }}; } -#[cfg(debug_assertions)] +#[cfg(any(debug_assertions, test, feature = "test"))] pub mod verify { use std::fmt::Debug; use std::future::Future; diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 09e0488f6ff91..3a629658c6f8e 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -219,31 +219,34 @@ where } fn consistent_old_value_op(row_serde: impl ValueRowSerde) -> OpConsistencyLevel { - OpConsistencyLevel::ConsistentOldValue(Arc::new(move |first: &Bytes, second: &Bytes| { - if first == second { - return true; - } - let first = match row_serde.deserialize(first) { - Ok(rows) => rows, - Err(e) => { - error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value"); - return false; + OpConsistencyLevel::ConsistentOldValue { + check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| { + if first == second { + return true; } - }; - let second = match row_serde.deserialize(second) { - Ok(rows) => rows, - Err(e) => { - error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value"); - return false; + let first = match row_serde.deserialize(first) { + Ok(rows) => rows, + Err(e) => { + error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value"); + return false; + } + }; + let second = match row_serde.deserialize(second) { + Ok(rows) => rows, + Err(e) => { + error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value"); + return false; + } + }; + if first != second { + error!(first = ?first, second = ?second, "sanity check fail"); + false + } else { + true } - }; - if first != second { - error!(first = ?first, second = ?second, "sanity check fail"); - false - } else { - true - } - })) + }), + is_log_store: false, + } } // initialize diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 4549552bab607..bb6fcd8bd46eb 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -523,8 +523,14 @@ impl LocalBarrierWorker { sync_result, } = result; - let (synced_sstables, table_watermarks) = sync_result - .map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks)) + let (synced_sstables, table_watermarks, old_value_ssts) = sync_result + .map(|sync_result| { + ( + sync_result.uncommitted_ssts, + sync_result.table_watermarks, + sync_result.old_value_ssts, + ) + }) .unwrap_or_default(); let result = StreamingControlStreamResponse { @@ -553,6 +559,10 @@ impl LocalBarrierWorker { .into_iter() .map(|(key, value)| (key.table_id, value.to_protobuf())) .collect(), + old_value_sstables: old_value_ssts + .into_iter() + .map(|sst| sst.sst_info) + .collect(), }, ), ),