diff --git a/proto/hummock.proto b/proto/hummock.proto
index 4a3dac7ff6f4..dc376cd694b8 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -137,6 +137,18 @@ message TableWatermarks {
   bool is_ascending = 2;
 }
+message EpochNewChangeLog {
+  repeated SstableInfo old_value = 1;
+  repeated SstableInfo new_value = 2;
+  // Epochs should be sorted in ascending order, which means earlier epoch at the front
+  repeated uint64 epochs = 3;
+}
+
+message TableChangeLog {
+  // Epochs should be sorted in ascending order, which means earlier epoch at the front.
+  repeated EpochNewChangeLog change_logs = 1;
+}
+
 message HummockVersion {
   message Levels {
     repeated Level levels = 1;
@@ -153,6 +165,7 @@ message HummockVersion {
   // Reads against such an epoch will fail.
   uint64 safe_epoch = 4;
   map<uint32, TableWatermarks> table_watermarks = 5;
+  map<uint32, TableChangeLog> table_change_logs = 6;
 }
 
 message HummockVersionDelta {
diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs
index be510d2aa755..feeacc98b558 100644
--- a/src/meta/src/hummock/manager/versioning.rs
+++ b/src/meta/src/hummock/manager/versioning.rs
@@ -347,6 +347,7 @@ pub(super) fn create_init_version(default_compaction_config: CompactionConfig) -
         max_committed_epoch: INVALID_EPOCH,
         safe_epoch: INVALID_EPOCH,
         table_watermarks: HashMap::new(),
+        table_change_log: HashMap::new(),
     };
     for group_id in [
         StaticCompactionGroupId::StateDefault as CompactionGroupId,
@@ -569,6 +570,7 @@ mod tests {
         max_committed_epoch: 0,
         safe_epoch: 0,
         table_watermarks: HashMap::new(),
+        table_change_log: HashMap::new(),
     };
     for cg in 1..3 {
         version.levels.insert(
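For orientation (editorial note, not part of the diff): each committed epoch contributes one `EpochNewChangeLog` holding the SSTs of the new values and of the overwritten old values, and the per-table log keeps these entries sorted by epoch. A minimal sketch of how such an entry could be appended under that invariant, using the SDK types introduced below; the helper name `append_epoch_change` is hypothetical:

use risingwave_hummock_sdk::change_log::{EpochNewChangeLog, TableChangeLog};
use risingwave_pb::hummock::SstableInfo;

// Hypothetical helper: append the change produced by one newly committed epoch,
// keeping the ascending-epoch invariant required by the proto comment above.
fn append_epoch_change(
    log: &mut TableChangeLog,
    epoch: u64,
    new_value: Vec<SstableInfo>,
    old_value: Vec<SstableInfo>,
) {
    if let Some(last) = log.0.last() {
        // Earlier epochs stay at the front, so the new epoch must be the largest so far.
        assert!(last.epochs.last().expect("non-empty") < &epoch);
    }
    log.0.push(EpochNewChangeLog {
        old_value,
        new_value,
        epochs: vec![epoch],
    });
}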
diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs
new file mode 100644
index 000000000000..cec5ef58034c
--- /dev/null
+++ b/src/storage/hummock_sdk/src/change_log.rs
@@ -0,0 +1,113 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_pb::hummock::{PbEpochNewChangeLog, PbTableChangeLog, SstableInfo};
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct EpochNewChangeLog {
+    pub new_value: Vec<SstableInfo>,
+    pub old_value: Vec<SstableInfo>,
+    pub epochs: Vec<u64>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct TableChangeLog(pub Vec<EpochNewChangeLog>);
+
+impl TableChangeLog {
+    pub fn filter_epoch(&self, (min_epoch, max_epoch): (u64, u64)) -> &[EpochNewChangeLog] {
+        let start = self.0.partition_point(|epoch_change_log| {
+            epoch_change_log.epochs.last().expect("non-empty") < &min_epoch
+        });
+        let end = self.0.partition_point(|epoch_change_log| {
+            epoch_change_log.epochs.first().expect("non-empty") <= &max_epoch
+        });
+        &self.0[start..end]
+    }
+}
+
+impl TableChangeLog {
+    pub fn to_protobuf(&self) -> PbTableChangeLog {
+        PbTableChangeLog {
+            change_logs: self
+                .0
+                .iter()
+                .map(|epoch_new_log| PbEpochNewChangeLog {
+                    epochs: epoch_new_log.epochs.clone(),
+                    new_value: epoch_new_log.new_value.clone(),
+                    old_value: epoch_new_log.old_value.clone(),
+                })
+                .collect(),
+        }
+    }
+
+    pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
+        Self(
+            val.change_logs
+                .iter()
+                .map(|epoch_new_log| EpochNewChangeLog {
+                    epochs: epoch_new_log.epochs.clone(),
+                    new_value: epoch_new_log.new_value.clone(),
+                    old_value: epoch_new_log.old_value.clone(),
+                })
+                .collect(),
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use itertools::Itertools;
+
+    use crate::change_log::{EpochNewChangeLog, TableChangeLog};
+
+    #[test]
+    fn test_filter_epoch() {
+        let table_change_log = TableChangeLog(vec![
+            EpochNewChangeLog {
+                new_value: vec![],
+                old_value: vec![],
+                epochs: vec![2],
+            },
+            EpochNewChangeLog {
+                new_value: vec![],
+                old_value: vec![],
+                epochs: vec![3, 4],
+            },
+            EpochNewChangeLog {
+                new_value: vec![],
+                old_value: vec![],
+                epochs: vec![5],
+            },
+        ]);
+
+        let epochs = [1, 2, 3, 4, 5, 6];
+        for i in 0..epochs.len() {
+            for j in i..epochs.len() {
+                let min_epoch = epochs[i];
+                let max_epoch = epochs[j];
+                let expected = table_change_log
+                    .0
+                    .iter()
+                    .filter(|log| {
+                        &min_epoch <= log.epochs.last().unwrap()
+                            && log.epochs.first().unwrap() <= &max_epoch
+                    })
+                    .cloned()
+                    .collect_vec();
+                let actual = table_change_log.filter_epoch((min_epoch, max_epoch));
+                assert_eq!(&expected, actual, "{:?}", (min_epoch, max_epoch));
+            }
+        }
+    }
+}
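To make the inclusive overlap semantics of `filter_epoch` concrete, a small sketch using the types above (editorial, not part of the diff):

use risingwave_hummock_sdk::change_log::{EpochNewChangeLog, TableChangeLog};

fn filter_epoch_example() {
    let entry = |epochs: Vec<u64>| EpochNewChangeLog {
        new_value: vec![],
        old_value: vec![],
        epochs,
    };
    // Entries cover epochs [2], [3, 4] and [5], sorted ascending.
    let log = TableChangeLog(vec![entry(vec![2]), entry(vec![3, 4]), entry(vec![5])]);
    // The inclusive epoch range (3, 4) overlaps only the middle entry.
    assert_eq!(log.filter_epoch((3, 4)).len(), 1);
    // (4, 6) overlaps the last two entries; (1, 6) covers all three.
    assert_eq!(log.filter_epoch((4, 6)).len(), 2);
    assert_eq!(log.filter_epoch((1, 6)).len(), 3);
}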
diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
index 19295a5cf124..ded56ede6fe6 100644
--- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
+++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
@@ -1315,6 +1315,7 @@ mod tests {
             max_committed_epoch: 0,
             safe_epoch: 0,
             table_watermarks: HashMap::new(),
+            table_change_log: HashMap::new(),
         };
         assert_eq!(version.get_object_ids().len(), 0);
@@ -1378,6 +1379,7 @@ mod tests {
             max_committed_epoch: 0,
             safe_epoch: 0,
             table_watermarks: HashMap::new(),
+            table_change_log: HashMap::new(),
         };
         let version_delta = HummockVersionDelta {
             id: 1,
@@ -1461,6 +1463,7 @@ mod tests {
                 max_committed_epoch: 0,
                 safe_epoch: 0,
                 table_watermarks: HashMap::new(),
+                table_change_log: HashMap::new(),
             }
         );
     }
diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs
index 1090a132855e..aa095b6c6632 100644
--- a/src/storage/hummock_sdk/src/lib.rs
+++ b/src/storage/hummock_sdk/src/lib.rs
@@ -37,6 +37,7 @@ use crate::compaction_group::StaticCompactionGroupId;
 use crate::key_range::KeyRangeCommon;
 use crate::table_stats::{to_prost_table_stats_map, PbTableStatsMap, TableStatsMap};
 
+pub mod change_log;
 pub mod compact;
 pub mod compaction_group;
 pub mod key;
diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs
index 4fa87296667a..3bf6fa02df46 100644
--- a/src/storage/hummock_sdk/src/version.rs
+++ b/src/storage/hummock_sdk/src/version.rs
@@ -23,6 +23,7 @@ use risingwave_pb::hummock::hummock_version::PbLevels;
 use risingwave_pb::hummock::hummock_version_delta::GroupDeltas as PbGroupDeltas;
 use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
+use crate::change_log::TableChangeLog;
 use crate::table_watermark::TableWatermarks;
 use crate::{CompactionGroupId, HummockSstableObjectId};
 
@@ -33,6 +34,7 @@ pub struct HummockVersion {
     pub max_committed_epoch: u64,
     pub safe_epoch: u64,
     pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
+    pub table_change_log: HashMap<TableId, TableChangeLog>,
 }
 
 impl Default for HummockVersion {
@@ -74,6 +76,16 @@ impl HummockVersion {
                     )
                 })
                 .collect(),
+            table_change_log: pb_version
+                .table_change_logs
+                .iter()
+                .map(|(table_id, change_log)| {
+                    (
+                        TableId::new(*table_id),
+                        TableChangeLog::from_protobuf(change_log),
+                    )
+                })
+                .collect(),
         }
     }
 
@@ -92,6 +104,11 @@ impl HummockVersion {
                 .iter()
                 .map(|(table_id, watermark)| (table_id.table_id, watermark.to_protobuf()))
                 .collect(),
+            table_change_logs: self
+                .table_change_log
+                .iter()
+                .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
+                .collect(),
         }
     }
diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs
index eafa99042a50..30be483a72f6 100644
--- a/src/storage/src/hummock/event_handler/uploader.rs
+++ b/src/storage/src/hummock/event_handler/uploader.rs
@@ -1288,6 +1288,7 @@ mod tests {
             max_committed_epoch: epoch,
             safe_epoch: 0,
             table_watermarks: HashMap::new(),
+            table_change_log: HashMap::new(),
         }
     }
diff --git a/src/storage/src/hummock/iterator/change_log.rs b/src/storage/src/hummock/iterator/change_log.rs
new file mode 100644
index 000000000000..8d6187b8cd47
--- /dev/null
+++ b/src/storage/src/hummock/iterator/change_log.rs
@@ -0,0 +1,695 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::cmp::Ordering;
+use std::ops::Bound::{Excluded, Included, Unbounded};
+
+use risingwave_common::catalog::TableId;
+use risingwave_common::must_match;
+use risingwave_common::util::epoch::MAX_SPILL_TIMES;
+use risingwave_hummock_sdk::key::{FullKey, SetSlice, TableKeyRange, UserKey, UserKeyRange};
+use risingwave_hummock_sdk::EpochWithGap;
+
+use crate::error::StorageResult;
+use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator};
+use crate::hummock::value::HummockValue;
+use crate::hummock::{HummockResult, SstableIterator};
+use crate::store::{ChangeLogValue, StateStoreReadLogItem, StateStoreReadLogItemRef};
+use crate::StateStoreIter;
+
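The iterator defined below yields one `ChangeLogValue` per key. As a reference for readers (editorial sketch, based on the `crate::store::ChangeLogValue` variants used in this file), a consumer typically distinguishes the three cases like this:

use crate::store::ChangeLogValue;

fn describe(value: ChangeLogValue<&[u8]>) -> &'static str {
    match value {
        // No old value in the epoch range: the key was newly inserted.
        ChangeLogValue::Insert(_new_value) => "insert",
        // Both an old and a new value exist: the key was overwritten.
        ChangeLogValue::Update { .. } => "update",
        // The newest value is a tombstone and an old value exists: the key was removed.
        ChangeLogValue::Delete(_old_value) => "delete",
    }
}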
+struct ChangeLogIteratorInner<
+    NI: HummockIterator<Direction = Forward>,
+    OI: HummockIterator<Direction = Forward>,
+> {
+    /// Iterator for new values. In each `next`, the iterator will iterate over all values of the
+    /// current key. Therefore, we need to buffer the key and newest value in `curr_key` and `new_value`.
+    ///
+    /// We assume that all operations between `min_epoch` and `max_epoch` will be included in the `new_value_iter`.
+    new_value_iter: NI,
+    /// Iterator for old values. When `is_old_value_set` is true, its value is the old value in the change log value.
+    ///
+    /// We assume that each old value has a new value of the same epoch in the `new_value_iter`. That is to say,
+    /// for a specific key, there won't be an epoch that exists in the `old_value_iter` but not in the `new_value_iter`.
+    /// A `Delete` also carries a tombstone value, so it appears in the `new_value_iter` as well.
+    old_value_iter: OI,
+    /// Inclusive max epoch
+    max_epoch: u64,
+    /// Inclusive min epoch
+    min_epoch: u64,
+    key_range: UserKeyRange,
+
+    /// Buffer of current key
+    curr_key: FullKey<Vec<u8>>,
+    /// Buffer for the new value. Only valid when `is_new_value_delete` is false.
+    new_value: Vec<u8>,
+    /// Indicates whether the current new value is a delete.
+    is_new_value_delete: bool,
+
+    /// Indicates whether the current value of `old_value_iter` is the old value in the `ChangeLogValue`.
+    is_old_value_set: bool,
+
+    /// Indicates whether the iterator is currently pointing at a valid key with a `ChangeLogValue`.
+    is_current_pos_valid: bool,
+}
+
+impl<NI: HummockIterator<Direction = Forward>, OI: HummockIterator<Direction = Forward>>
+    ChangeLogIteratorInner<NI, OI>
+{
+    fn new(
+        (min_epoch, max_epoch): (u64, u64),
+        key_range: UserKeyRange,
+        new_value_iter: NI,
+        old_value_iter: OI,
+    ) -> Self {
+        Self {
+            new_value_iter,
+            old_value_iter,
+            min_epoch,
+            max_epoch,
+            key_range,
+
+            curr_key: FullKey::default(),
+            new_value: vec![],
+            is_new_value_delete: false,
+            is_old_value_set: false,
+            is_current_pos_valid: false,
+        }
+    }
+
+    /// Resets the iterating position to the beginning.
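+    /// Seeks both the new-value and old-value iterators to the start of the key range at
+    /// `max_epoch`, then advances to the first key that has a change log value within
+    /// `[min_epoch, max_epoch]`.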
+ pub async fn rewind(&mut self) -> HummockResult<()> { + // Handle range scan + match &self.key_range.0 { + Included(begin_key) => { + let full_key = FullKey { + user_key: begin_key.as_ref(), + epoch_with_gap: EpochWithGap::new(self.max_epoch, MAX_SPILL_TIMES), + }; + self.new_value_iter.seek(full_key).await?; + self.old_value_iter.seek(full_key).await?; + } + Excluded(_) => unimplemented!("excluded begin key is not supported"), + Unbounded => { + self.new_value_iter.rewind().await?; + self.old_value_iter.rewind().await?; + } + }; + + self.try_advance_to_next_change_log_value().await?; + Ok(()) + } + + pub async fn next(&mut self) -> HummockResult<()> { + self.try_advance_to_next_change_log_value().await + } + + pub fn is_valid(&self) -> bool { + self.is_current_pos_valid + } + + pub fn log_value(&self) -> ChangeLogValue<&[u8]> { + if self.is_new_value_delete { + ChangeLogValue::Delete( + self.old_value() + .expect("should have old value when new value is delete"), + ) + } else { + match self.old_value() { + Some(old_value) => ChangeLogValue::Update { + new_value: self.new_value.as_slice(), + old_value, + }, + None => ChangeLogValue::Insert(self.new_value.as_slice()), + } + } + } + + pub fn key(&self) -> UserKey<&[u8]> { + self.curr_key.user_key.as_ref() + } +} + +impl, OI: HummockIterator> + ChangeLogIteratorInner +{ + async fn try_advance_to_next_change_log_value(&mut self) -> HummockResult<()> { + loop { + self.try_advance_to_next_valid().await?; + if !self.is_valid() { + break; + } + if self.has_log_value() { + break; + } else { + continue; + } + } + Ok(()) + } + + fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool { + // handle range scan + match &self.key_range.1 { + Included(end_key) => user_key > end_key.as_ref(), + Excluded(end_key) => user_key >= end_key.as_ref(), + Unbounded => false, + } + } + + /// Advance the `new_value_iter` to a valid key and valid epoch. + async fn advance_to_valid_key(&mut self) -> HummockResult<()> { + self.is_current_pos_valid = false; + loop { + if !self.new_value_iter.is_valid() { + return Ok(()); + } + + let key = self.new_value_iter.key(); + + // Handle epoch visibility + if !self.is_valid_epoch(key.epoch_with_gap) { + self.new_value_iter.next().await?; + continue; + } + + if self.user_key_out_of_range(key.user_key) { + return Ok(()); + } + + break; + } + + debug_assert!(self.new_value_iter.is_valid()); + debug_assert!(self.is_valid_epoch(self.new_value_iter.key().epoch_with_gap)); + debug_assert!(!self.user_key_out_of_range(self.new_value_iter.key().user_key)); + self.is_current_pos_valid = true; + // The key and value will be saved in a buffer, because in the next step we will + // continue advancing the `new_value_iter`. + self.curr_key.set(self.new_value_iter.key()); + match self.new_value_iter.value() { + HummockValue::Put(val) => { + self.new_value.set(val); + self.is_new_value_delete = false; + } + HummockValue::Delete => { + self.new_value.clear(); + self.is_new_value_delete = true; + } + } + + Ok(()) + } + + /// Advance the `new_value_iter` to find the oldest epoch of the current key. 
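+    /// Versions of the current key are returned from newest to oldest epoch, so this keeps
+    /// advancing `new_value_iter` while the user key is unchanged and the epoch is still within
+    /// `[min_epoch, max_epoch]`, and returns the last (i.e. oldest) such epoch.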
+ async fn advance_to_find_oldest_epoch(&mut self) -> HummockResult { + let mut ret = self.curr_key.epoch_with_gap; + debug_assert!(self.is_valid_epoch(ret)); + self.new_value_iter.next().await?; + loop { + if !self.new_value_iter.is_valid() { + break; + } + let key = self.new_value_iter.key(); + match self.curr_key.user_key.as_ref().cmp(&key.user_key) { + Ordering::Less => { + // has advance to next key + break; + } + Ordering::Equal => { + assert!(ret > key.epoch_with_gap); + if !self.is_valid_epoch(key.epoch_with_gap) { + debug_assert!(self.min_epoch > key.epoch_with_gap.pure_epoch()); + break; + } + ret = key.epoch_with_gap; + self.new_value_iter.next().await?; + continue; + } + Ordering::Greater => { + unreachable!( + "hummock iterator advance to a prev key: {:?} {:?}", + self.curr_key, + self.new_value_iter.key() + ); + } + } + } + debug_assert!(self.is_valid_epoch(ret)); + + Ok(ret) + } + + /// Advance the two iters to a valid position. After it returns with Ok, + /// it is possible that the position is valid but there is no change log value, + /// because the new and old value may consume each other, such as Insert in old epoch, + /// but then Delete in new epoch + async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> { + // 1. advance the new_value_iter to the newest op between max and min epoch + self.advance_to_valid_key().await?; + + if !self.is_current_pos_valid { + return Ok(()); + } + + // 2. advance new_value_iter to out of the valid range, and save the oldest value + let oldest_epoch = self.advance_to_find_oldest_epoch().await?; + + // 3. iterate old value iter to the oldest epoch + self.is_old_value_set = false; + loop { + if !self.old_value_iter.is_valid() { + break; + } + + let old_value_iter_key = self.old_value_iter.key(); + match self + .curr_key + .user_key + .as_ref() + .cmp(&old_value_iter_key.user_key.as_ref()) + { + Ordering::Less => { + // old value iter has advanced over the current range + break; + } + Ordering::Equal => match old_value_iter_key.epoch_with_gap.cmp(&oldest_epoch) { + Ordering::Less => { + // The assertion holds because we assume that for a specific key, any old value will have a new value of the same + // epoch in the `new_value_iter`. If the assertion is broken, it means we must have a new value of the same epoch + // that are valid but older than the `oldest_epoch`, which breaks the definition of `oldest_epoch`. + assert!( + old_value_iter_key.epoch_with_gap.pure_epoch() < self.min_epoch, + "there should not be old value between oldest new_value and min_epoch. 
\ + new value key: {:?}, oldest epoch: {:?}, min epoch: {:?}, old value epoch: {:?}", + self.curr_key, oldest_epoch, self.min_epoch, old_value_iter_key.epoch_with_gap + ); + break; + } + Ordering::Equal => { + self.is_old_value_set = true; + break; + } + Ordering::Greater => { + self.old_value_iter.next().await?; + continue; + } + }, + Ordering::Greater => { + self.old_value_iter.next().await?; + continue; + } + } + } + + Ok(()) + } + + fn is_valid_epoch(&self, epoch: EpochWithGap) -> bool { + let epoch = epoch.pure_epoch(); + self.min_epoch <= epoch && epoch <= self.max_epoch + } + + fn old_value(&self) -> Option<&[u8]> { + if self.is_old_value_set { + debug_assert!(self.old_value_iter.is_valid()); + debug_assert_eq!( + self.old_value_iter.key().user_key, + self.curr_key.user_key.as_ref() + ); + Some(must_match!(self.old_value_iter.value(), HummockValue::Put(val) => val)) + } else { + None + } + } + + fn has_log_value(&self) -> bool { + debug_assert!(self.is_current_pos_valid); + !self.is_new_value_delete || self.is_old_value_set + } +} + +pub struct ChangeLogIterator { + inner: ChangeLogIteratorInner, MergeIterator>, + initial_read: bool, +} + +impl ChangeLogIterator { + pub async fn new( + epoch_range: (u64, u64), + (start_bound, end_bound): TableKeyRange, + new_value_iter: MergeIterator, + old_value_iter: MergeIterator, + table_id: TableId, + ) -> HummockResult { + let make_user_key = |table_key| UserKey { + table_id, + table_key, + }; + let start_bound = start_bound.map(make_user_key); + let end_bound = end_bound.map(make_user_key); + let mut inner = ChangeLogIteratorInner::new( + epoch_range, + (start_bound, end_bound), + new_value_iter, + old_value_iter, + ); + inner.rewind().await?; + Ok(Self { + inner, + initial_read: false, + }) + } +} + +impl StateStoreIter for ChangeLogIterator { + async fn try_next(&mut self) -> StorageResult>> { + if !self.initial_read { + self.initial_read = true; + } else { + self.inner.next().await?; + } + if self.inner.is_valid() { + Ok(Some((self.inner.key().table_key, self.inner.log_value()))) + } else { + Ok(None) + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + use std::ops::Bound::Unbounded; + + use bytes::Bytes; + use itertools::Itertools; + use rand::{thread_rng, Rng, RngCore}; + use risingwave_common::catalog::TableId; + use risingwave_common::hash::table_distribution::TableDistribution; + use risingwave_common::util::epoch::{test_epoch, EpochPair}; + use risingwave_hummock_sdk::key::{TableKey, UserKey}; + use risingwave_hummock_sdk::EpochWithGap; + + use crate::hummock::iterator::change_log::ChangeLogIteratorInner; + use crate::hummock::iterator::test_utils::{ + iterator_test_table_key_of, iterator_test_value_of, + }; + use crate::hummock::iterator::MergeIterator; + use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableStore}; + use crate::memory::MemoryStateStore; + use crate::store::{ + ChangeLogValue, InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, + ReadLogOptions, ReadOptions, SealCurrentEpochOptions, StateStoreIter, StateStoreRead, + CHECK_BYTES_EQUAL, + }; + use crate::StateStore; + + #[tokio::test] + async fn test_empty() { + let table_id = TableId::new(233); + let epoch = EpochWithGap::new_from_epoch(test_epoch(1)); + let empty = BTreeMap::new(); + let new_value_iter = MemTableHummockIterator::new(&empty, epoch, table_id); + let old_value_iter = MemTableHummockIterator::new(&empty, epoch, table_id); + let mut iter = ChangeLogIteratorInner::new( + (epoch.pure_epoch(), 
epoch.pure_epoch()), + (Unbounded, Unbounded), + new_value_iter, + old_value_iter, + ); + iter.rewind().await.unwrap(); + assert!(!iter.is_valid()); + } + + #[tokio::test] + async fn test_append_only() { + let table_id = TableId::new(233); + + let count = 100; + let kvs = (0..count) + .map(|i| { + ( + TableKey(Bytes::from(iterator_test_table_key_of(i))), + Bytes::from(iterator_test_value_of(i)), + ) + }) + .collect_vec(); + let mem_tables = kvs + .iter() + .map(|(key, value)| { + let mut t = MemTable::new(OpConsistencyLevel::Inconsistent); + t.insert(key.clone(), value.clone()).unwrap(); + t + }) + .collect_vec(); + let epoch = EpochWithGap::new_from_epoch(test_epoch(1)); + let new_value_iter = MergeIterator::new( + mem_tables + .iter() + .map(|mem_table| MemTableHummockIterator::new(&mem_table.buffer, epoch, table_id)), + ); + let empty = BTreeMap::new(); + let old_value_iter = MemTableHummockIterator::new(&empty, epoch, table_id); + let mut iter = ChangeLogIteratorInner::new( + (epoch.pure_epoch(), epoch.pure_epoch()), + (Unbounded, Unbounded), + new_value_iter, + old_value_iter, + ); + iter.rewind().await.unwrap(); + for (key, value) in kvs { + assert!(iter.is_valid()); + assert_eq!( + UserKey { + table_id, + table_key: key.to_ref(), + }, + iter.key() + ); + assert_eq!(ChangeLogValue::Insert(value.as_ref()), iter.log_value()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + } + + #[tokio::test] + async fn test_delete_only() { + let table_id = TableId::new(233); + + let count = 100; + let kvs = (0..count) + .map(|i| { + ( + TableKey(Bytes::from(iterator_test_table_key_of(i))), + Bytes::from(iterator_test_value_of(i)), + ) + }) + .collect_vec(); + let mut new_value_memtable = MemTable::new(OpConsistencyLevel::Inconsistent); + let mut old_value_memtable = MemTable::new(OpConsistencyLevel::Inconsistent); + for (key, value) in &kvs { + new_value_memtable + .delete(key.clone(), Bytes::new()) + .unwrap(); + old_value_memtable + .insert(key.clone(), value.clone()) + .unwrap(); + } + let epoch = EpochWithGap::new_from_epoch(test_epoch(1)); + let new_value_iter = + MemTableHummockIterator::new(&new_value_memtable.buffer, epoch, table_id); + let old_value_iter = + MemTableHummockIterator::new(&old_value_memtable.buffer, epoch, table_id); + let mut iter = ChangeLogIteratorInner::new( + (epoch.pure_epoch(), epoch.pure_epoch()), + (Unbounded, Unbounded), + new_value_iter, + old_value_iter, + ); + iter.rewind().await.unwrap(); + for (key, value) in kvs { + assert!(iter.is_valid()); + assert_eq!( + UserKey { + table_id, + table_key: key.to_ref(), + }, + iter.key() + ); + assert_eq!(ChangeLogValue::Delete(value.as_ref()), iter.log_value()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + } + + async fn gen_test_data( + table_id: TableId, + epoch_count: usize, + key_count: usize, + delete_ratio: f64, + ) -> (Vec<(u64, MemTableStore, MemTableStore)>, MemoryStateStore) { + let state_store = MemoryStateStore::new(); + let mut rng = thread_rng(); + let mut local = state_store + .new_local(NewLocalOptions { + table_id, + op_consistency_level: OpConsistencyLevel::ConsistentOldValue( + CHECK_BYTES_EQUAL.clone(), + ), + table_option: Default::default(), + is_replicated: false, + vnodes: TableDistribution::all_vnodes(), + }) + .await; + let mut logs = Vec::new(); + for epoch_idx in 1..=epoch_count { + let epoch = test_epoch(epoch_idx as _); + let mut new_values = MemTableStore::new(); + let mut old_values = MemTableStore::new(); + if epoch_idx == 1 { + local + .init(InitOptions { 
+ epoch: EpochPair::new_test_epoch(epoch), + }) + .await + .unwrap(); + } else { + local.flush().await.unwrap(); + local.seal_current_epoch( + epoch, + SealCurrentEpochOptions { + table_watermarks: None, + switch_op_consistency_level: None, + }, + ); + } + for key_idx in 0..key_count { + let key = TableKey(Bytes::from(iterator_test_table_key_of(key_idx))); + if rng.gen_bool(delete_ratio) { + if let Some(prev_value) = local + .get( + key.clone(), + ReadOptions { + prefix_hint: None, + ignore_range_tombstone: false, + prefetch_options: Default::default(), + cache_policy: Default::default(), + retention_seconds: None, + table_id, + read_version_from_backup: false, + }, + ) + .await + .unwrap() + { + new_values.insert(key.clone(), KeyOp::Delete(Bytes::new())); + old_values.insert(key.clone(), KeyOp::Insert(prev_value.clone())); + local.delete(key, prev_value).unwrap(); + } + } else { + let value = Bytes::copy_from_slice(rng.next_u64().to_string().as_bytes()); + new_values.insert(key.clone(), KeyOp::Insert(value.clone())); + let prev_value = local + .get( + key.clone(), + ReadOptions { + prefix_hint: None, + ignore_range_tombstone: false, + prefetch_options: Default::default(), + cache_policy: Default::default(), + retention_seconds: None, + table_id, + read_version_from_backup: false, + }, + ) + .await + .unwrap(); + if let Some(prev_value) = prev_value.clone() { + old_values.insert(key.clone(), KeyOp::Insert(prev_value)); + } + local.insert(key, value, prev_value).unwrap(); + } + } + logs.push((epoch, new_values, old_values)); + } + local.flush().await.unwrap(); + local.seal_current_epoch( + test_epoch((epoch_count + 1) as _), + SealCurrentEpochOptions { + table_watermarks: None, + switch_op_consistency_level: None, + }, + ); + (logs, state_store) + } + + #[tokio::test] + async fn test_random_data() { + let table_id = TableId::new(233); + let epoch_count = 10; + let (logs, state_store) = gen_test_data(table_id, epoch_count, 10000, 0.2).await; + assert_eq!(logs.len(), epoch_count); + for start_epoch_idx in 0..epoch_count { + for end_epoch_idx in start_epoch_idx + 1..epoch_count { + let new_value_iter = + MergeIterator::new(logs.iter().map(|(epoch, new_value_memtable, _)| { + MemTableHummockIterator::new( + new_value_memtable, + EpochWithGap::new_from_epoch(*epoch), + table_id, + ) + })); + let old_value_iter = + MergeIterator::new(logs.iter().map(|(epoch, _, old_value_memtable)| { + MemTableHummockIterator::new( + old_value_memtable, + EpochWithGap::new_from_epoch(*epoch), + table_id, + ) + })); + let epoch_range = (logs[start_epoch_idx].0, logs[end_epoch_idx].0); + let mut change_log_iter = ChangeLogIteratorInner::new( + epoch_range, + (Unbounded, Unbounded), + new_value_iter, + old_value_iter, + ); + change_log_iter.rewind().await.unwrap(); + let mut expected_change_log_iter = state_store + .iter_log( + epoch_range, + (Unbounded, Unbounded), + ReadLogOptions { table_id }, + ) + .await + .unwrap(); + while let Some((key, change_log_value)) = + expected_change_log_iter.try_next().await.unwrap() + { + assert!(change_log_iter.is_valid()); + assert_eq!( + change_log_iter.key(), + UserKey { + table_id, + table_key: key, + }, + ); + assert_eq!(change_log_iter.log_value(), change_log_value); + change_log_iter.next().await.unwrap(); + } + assert!(!change_log_iter.is_valid()); + } + } + } +} diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index 83bf2de61b13..9cae83be3f89 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ 
b/src/storage/src/hummock/iterator/mod.rs @@ -41,6 +41,8 @@ use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::iterator::HummockIteratorUnion::{First, Fourth, Second, Third}; +mod change_log; +pub use change_log::*; mod concat_delete_range_iterator; mod delete_range_iterator; mod skip_watermark; diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index fd392a3e023c..2bdbd70c17d5 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -47,6 +47,7 @@ use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, Hummoc use crate::hummock::event_handler::{ HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping, }; +use crate::hummock::iterator::ChangeLogIterator; use crate::hummock::local_version::pinned_version::{start_pinned_version_worker, PinnedVersion}; use crate::hummock::observer_manager::HummockObserverNode; use crate::hummock::utils::{validate_safe_epoch, wait_for_epoch}; @@ -459,7 +460,7 @@ impl HummockStorage { } impl StateStoreRead for HummockStorage { - type ChangeLogIter = PanicStateStoreIter; + type ChangeLogIter = ChangeLogIterator; type Iter = HummockStorageIterator; fn get( @@ -490,11 +491,16 @@ impl StateStoreRead for HummockStorage { async fn iter_log( &self, - _epoch_range: (u64, u64), - _key_range: TableKeyRange, - _options: ReadLogOptions, + epoch_range: (u64, u64), + key_range: TableKeyRange, + options: ReadLogOptions, ) -> StorageResult { - unimplemented!() + let version = (**self.pinned_version.load()).clone(); + let iter = self + .hummock_version_reader + .iter_log(version, epoch_range, key_range, options) + .await?; + Ok(iter) } } @@ -599,8 +605,6 @@ impl StateStore for HummockStorage { #[cfg(any(test, feature = "test"))] use risingwave_hummock_sdk::version::HummockVersion; -use crate::panic_store::PanicStateStoreIter; - #[cfg(any(test, feature = "test"))] impl HummockStorage { pub async fn seal_and_sync_epoch(&self, epoch: u64) -> StorageResult { diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 5215a9eaf18d..d9a83c8695b1 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -30,7 +30,8 @@ use crate::error::StorageResult; use crate::hummock::event_handler::hummock_event_handler::HummockEventSender; use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard}; use crate::hummock::iterator::{ - ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, UserIterator, + ChangeLogIterator, ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, + UserIterator, }; use crate::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferBatchIterator, @@ -44,7 +45,6 @@ use crate::hummock::write_limiter::WriteLimiterRef; use crate::hummock::{MemoryLimiter, SstableIterator}; use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator}; use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic}; -use crate::panic_store::PanicStateStoreIter; use crate::storage_value::StorageValue; use crate::store::*; @@ -206,7 +206,7 @@ impl LocalHummockStorage { } impl StateStoreRead for LocalHummockStorage { - type ChangeLogIter = PanicStateStoreIter; + type ChangeLogIter = ChangeLogIterator; type Iter = HummockStorageIterator; fn get( @@ -232,11 +232,16 @@ 
impl StateStoreRead for LocalHummockStorage { async fn iter_log( &self, - _epoch_range: (u64, u64), - _key_range: TableKeyRange, - _options: ReadLogOptions, + epoch_range: (u64, u64), + key_range: TableKeyRange, + options: ReadLogOptions, ) -> StorageResult { - unimplemented!() + let version = self.read_version.read().committed().clone(); + let iter = self + .hummock_version_reader + .iter_log(version, epoch_range, key_range, options) + .await?; + Ok(iter) } } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 3ce49380863a..91b6394ed160 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -21,12 +21,14 @@ use std::sync::Arc; use std::time::Instant; use bytes::Bytes; +use futures::future::try_join_all; use itertools::Itertools; use parking_lot::RwLock; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::MAX_SPILL_TIMES; +use risingwave_hummock_sdk::change_log::EpochNewChangeLog; use risingwave_hummock_sdk::key::{ bound_table_key_range, is_empty_key_range, FullKey, TableKey, TableKeyRange, UserKey, }; @@ -41,7 +43,9 @@ use sync_point::sync_point; use super::StagingDataIterator; use crate::error::StorageResult; -use crate::hummock::iterator::{ConcatIterator, HummockIteratorUnion, MergeIterator, UserIterator}; +use crate::hummock::iterator::{ + ChangeLogIterator, ConcatIterator, HummockIteratorUnion, MergeIterator, UserIterator, +}; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::sstable_store::SstableStoreRef; @@ -50,15 +54,15 @@ use crate::hummock::utils::{ prune_overlapping_ssts, range_overlap, search_sst_idx, }; use crate::hummock::{ - get_from_batch, get_from_sstable_info, hit_sstable_bloom_filter, HummockStorageIterator, - HummockStorageIteratorInner, LocalHummockStorageIterator, ReadVersionTuple, Sstable, - SstableIterator, + get_from_batch, get_from_sstable_info, hit_sstable_bloom_filter, HummockError, HummockResult, + HummockStorageIterator, HummockStorageIteratorInner, LocalHummockStorageIterator, + ReadVersionTuple, Sstable, SstableIterator, }; use crate::mem_table::{ImmId, ImmutableMemtable, MemTableHummockIterator}; use crate::monitor::{ GetLocalMetricsGuard, HummockStateStoreMetrics, MayExistLocalMetricsGuard, StoreLocalStatistic, }; -use crate::store::{gen_min_epoch, ReadOptions}; +use crate::store::{gen_min_epoch, ReadLogOptions, ReadOptions}; pub type CommittedVersion = PinnedVersion; @@ -1123,4 +1127,75 @@ impl HummockVersionReader { Ok(false) } + + pub async fn iter_log( + &self, + version: PinnedVersion, + epoch_range: (u64, u64), + key_range: TableKeyRange, + options: ReadLogOptions, + ) -> HummockResult { + let change_log = + if let Some(change_log) = version.version().table_change_log.get(&options.table_id) { + change_log.filter_epoch(epoch_range) + } else { + static EMPTY_VEC: Vec = Vec::new(); + &EMPTY_VEC[..] 
+        };
+        let read_options = Arc::new(SstableIteratorReadOptions {
+            cache_policy: Default::default(),
+            must_iterated_end_user_key: None,
+            max_preload_retry_times: 0,
+            prefetch_for_large_query: false,
+        });
+
+        async fn make_iter(
+            ssts: impl Iterator<Item = &SstableInfo>,
+            sstable_store: &SstableStoreRef,
+            read_options: Arc<SstableIteratorReadOptions>,
+        ) -> HummockResult<MergeIterator<SstableIterator>> {
+            let iters = try_join_all(ssts.map(|sst| {
+                let sstable_store = sstable_store.clone();
+                let read_options = read_options.clone();
+                async move {
+                    let mut local_stat = StoreLocalStatistic::default();
+                    let table_holder = sstable_store.sstable(sst, &mut local_stat).await?;
+                    Ok::<_, HummockError>(SstableIterator::new(
+                        table_holder,
+                        sstable_store,
+                        read_options,
+                    ))
+                }
+            }))
+            .await?;
+            Ok::<_, HummockError>(MergeIterator::new(iters))
+        }
+
+        let new_value_iter = make_iter(
+            change_log
+                .iter()
+                .flat_map(|log| log.new_value.iter())
+                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
+            &self.sstable_store,
+            read_options.clone(),
+        )
+        .await?;
+        let old_value_iter = make_iter(
+            change_log
+                .iter()
+                .flat_map(|log| log.old_value.iter())
+                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
+            &self.sstable_store,
+            read_options.clone(),
+        )
+        .await?;
+        ChangeLogIterator::new(
+            epoch_range,
+            key_range,
+            new_value_iter,
+            old_value_iter,
+            options.table_id,
+        )
+        .await
+    }
 }
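Taken together, the new read path can be exercised roughly as follows (editorial sketch, not part of the diff; it assumes a store implementing `StateStoreRead`, such as `HummockStorage`, and the helper name `print_change_log` is hypothetical):

use std::ops::Bound::Unbounded;

use risingwave_common::catalog::TableId;

use crate::error::StorageResult;
use crate::store::{ReadLogOptions, StateStoreIter, StateStoreRead};

async fn print_change_log(
    store: &impl StateStoreRead,
    table_id: TableId,
    epoch_range: (u64, u64),
) -> StorageResult<()> {
    // Full-range scan of the change log between the two epochs (both inclusive).
    let mut iter = store
        .iter_log(epoch_range, (Unbounded, Unbounded), ReadLogOptions { table_id })
        .await?;
    while let Some((key, log_value)) = iter.try_next().await? {
        // `log_value` is a `ChangeLogValue<&[u8]>`: Insert / Update { .. } / Delete.
        println!("{:?} => {:?}", key, log_value);
    }
    Ok(())
}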