Skip to content

Commit

Permalink
perf(storage): use VecDeque for TableChangeLog to avoid large clone o…
Browse files Browse the repository at this point in the history
…n truncate (#19984)
  • Loading branch information
wenym1 authored Jan 2, 2025
1 parent cbf88ef commit 5cb0d17
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 60 deletions.
65 changes: 49 additions & 16 deletions src/storage/hummock_sdk/src/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};

use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
Expand All @@ -22,14 +22,44 @@ use tracing::warn;
use crate::sstable_info::SstableInfo;

#[derive(Debug, Clone, PartialEq)]
pub struct TableChangeLogCommon<T>(pub Vec<EpochNewChangeLogCommon<T>>);
pub struct TableChangeLogCommon<T>(
// older log at the front
VecDeque<EpochNewChangeLogCommon<T>>,
);

impl<T> TableChangeLogCommon<T> {
    /// Builds a change log from `logs`, storing the entries in the order they are yielded.
    ///
    /// In debug builds this asserts that the epochs of all entries, concatenated in
    /// iteration order, are sorted.
    pub fn new(logs: impl IntoIterator<Item = EpochNewChangeLogCommon<T>>) -> Self {
        let logs: VecDeque<_> = logs.into_iter().collect();
        debug_assert!(logs.iter().flat_map(|log| log.epochs.iter()).is_sorted());
        Self(logs)
    }

    /// Returns an iterator over shared references to the entries, front to back.
    pub fn iter(&self) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> {
        self.0.iter()
    }

    /// Returns an iterator over mutable references to the entries, front to back.
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
        self.0.iter_mut()
    }

    /// Appends `new_change_log` at the back of the log.
    ///
    /// # Panics
    ///
    /// When the log already holds at least one entry, panics if either that back entry
    /// or `new_change_log` has no epochs, or if the last epoch of the back entry is not
    /// strictly smaller than the first epoch of `new_change_log`. No check is performed
    /// when the log is empty.
    pub fn add_change_log(&mut self, new_change_log: EpochNewChangeLogCommon<T>) {
        // Only an existing back entry constrains what may be appended.
        if let Some(prev_log) = self.0.back() {
            assert!(
                prev_log.epochs.last().expect("non-empty")
                    < new_change_log.epochs.first().expect("non-empty")
            );
        }
        self.0.push_back(new_change_log);
    }
}

/// [`TableChangeLogCommon`] instantiated with [`SstableInfo`] as the item type.
pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;

/// A single change-log entry spanning one or more epochs, generic over the item type `T`.
///
/// Code operating on these entries calls `.expect("non-empty")` on `epochs`, so an
/// entry is expected to carry at least one epoch.
#[derive(Debug, Clone, PartialEq)]
pub struct EpochNewChangeLogCommon<T> {
/// Items recorded on the "new value" side of this entry.
// NOTE(review): the exact meaning of new vs. old values is not visible here — confirm.
pub new_value: Vec<T>,
/// Items recorded on the "old value" side of this entry.
pub old_value: Vec<T>,
/// Epochs covered by this entry, sorted in ascending order.
pub epochs: Vec<u64>,
}

Expand Down Expand Up @@ -91,20 +121,19 @@ impl<T> TableChangeLogCommon<T> {
pub fn filter_epoch(
&self,
(min_epoch, max_epoch): (u64, u64),
) -> &[EpochNewChangeLogCommon<T>] {
) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> + '_ {
let start = self.0.partition_point(|epoch_change_log| {
epoch_change_log.epochs.last().expect("non-empty") < &min_epoch
});
let end = self.0.partition_point(|epoch_change_log| {
epoch_change_log.epochs.first().expect("non-empty") <= &max_epoch
});
&self.0[start..end]
self.0.range(start..end)
}

/// Returns epochs where value is non-null and >= `min_epoch`.
pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec<u64> {
self.filter_epoch((min_epoch, u64::MAX))
.iter()
.filter(|epoch_change_log| {
// Filter out empty change logs
let new_value_empty = epoch_change_log.new_value.is_empty();
Expand All @@ -113,16 +142,17 @@ impl<T> TableChangeLogCommon<T> {
})
.flat_map(|epoch_change_log| epoch_change_log.epochs.iter().cloned())
.filter(|a| a >= &min_epoch)
.clone()
.take(max_count)
.collect()
}

pub fn truncate(&mut self, truncate_epoch: u64) {
// TODO: may optimize by using VecDeque to maintain the log
self.0
.retain(|change_log| *change_log.epochs.last().expect("non-empty") > truncate_epoch);
if let Some(first_log) = self.0.first_mut() {
while let Some(change_log) = self.0.front()
&& *change_log.epochs.last().expect("non-empty") <= truncate_epoch
{
let _change_log = self.0.pop_front().expect("non-empty");
}
if let Some(first_log) = self.0.front_mut() {
first_log.epochs.retain(|epoch| *epoch > truncate_epoch);
}
}
Expand Down Expand Up @@ -256,7 +286,7 @@ mod tests {

#[test]
fn test_filter_epoch() {
let table_change_log = TableChangeLogCommon::<SstableInfo>(vec![
let table_change_log = TableChangeLogCommon::<SstableInfo>::new([
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand Down Expand Up @@ -288,15 +318,18 @@ mod tests {
})
.cloned()
.collect_vec();
let actual = table_change_log.filter_epoch((min_epoch, max_epoch));
assert_eq!(&expected, actual, "{:?}", (min_epoch, max_epoch));
let actual = table_change_log
.filter_epoch((min_epoch, max_epoch))
.cloned()
.collect_vec();
assert_eq!(expected, actual, "{:?}", (min_epoch, max_epoch));
}
}
}

#[test]
fn test_truncate() {
let mut table_change_log = TableChangeLogCommon::<SstableInfo>(vec![
let mut table_change_log = TableChangeLogCommon::<SstableInfo>::new([
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand All @@ -322,7 +355,7 @@ mod tests {
table_change_log.truncate(1);
assert_eq!(
table_change_log,
TableChangeLogCommon::<SstableInfo>(vec![
TableChangeLogCommon::<SstableInfo>::new([
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand All @@ -344,7 +377,7 @@ mod tests {
table_change_log.truncate(3);
assert_eq!(
table_change_log,
TableChangeLogCommon::<SstableInfo>(vec![
TableChangeLogCommon::<SstableInfo>::new([
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;

use bytes::Bytes;
Expand Down Expand Up @@ -696,16 +697,10 @@ impl HummockVersion {
match table_change_log.entry(*table_id) {
Entry::Occupied(entry) => {
let change_log = entry.into_mut();
if let Some(prev_log) = change_log.0.last() {
assert!(
prev_log.epochs.last().expect("non-empty")
< new_change_log.epochs.first().expect("non-empty")
);
}
change_log.0.push(new_change_log.clone());
change_log.add_change_log(new_change_log.clone());
}
Entry::Vacant(entry) => {
entry.insert(TableChangeLogCommon(vec![new_change_log.clone()]));
entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
}
};
}
Expand Down Expand Up @@ -957,7 +952,7 @@ where
self.get_combined_levels()
.flat_map(|level| level.table_infos.iter())
.chain(self.table_change_log.values().flat_map(|change_log| {
change_log.0.iter().flat_map(|epoch_change_log| {
change_log.iter().flat_map(|epoch_change_log| {
epoch_change_log
.old_value
.iter()
Expand Down Expand Up @@ -1364,7 +1359,7 @@ pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockSstableObject
.flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
})
.chain(version.table_change_log.values().flat_map(|c| {
c.0.iter().flat_map(|l| {
c.iter().flat_map(|l| {
l.old_value
.iter()
.chain(l.new_value.iter())
Expand Down
37 changes: 14 additions & 23 deletions src/storage/hummock_sdk/src/frontend_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,13 @@ impl FrontendHummockVersion {
.map(|(table_id, change_log)| {
(
*table_id,
TableChangeLogCommon(
change_log
.0
.iter()
.map(|change_log| EpochNewChangeLogCommon {
new_value: vec![],
old_value: vec![],
epochs: change_log.epochs.clone(),
})
.collect(),
),
TableChangeLogCommon::new(change_log.iter().map(|change_log| {
EpochNewChangeLogCommon {
new_value: vec![],
old_value: vec![],
epochs: change_log.epochs.clone(),
}
})),
)
})
.collect(),
Expand All @@ -76,7 +72,6 @@ impl FrontendHummockVersion {
table_id.table_id,
PbTableChangeLog {
change_logs: change_log
.0
.iter()
.map(|change_log| PbEpochNewChangeLog {
old_value: vec![],
Expand All @@ -102,17 +97,13 @@ impl FrontendHummockVersion {
.map(|(table_id, change_log)| {
(
TableId::new(table_id),
TableChangeLogCommon(
change_log
.change_logs
.into_iter()
.map(|change_log| EpochNewChangeLogCommon {
new_value: vec![],
old_value: vec![],
epochs: change_log.epochs,
})
.collect(),
),
TableChangeLogCommon::new(change_log.change_logs.into_iter().map(
|change_log| EpochNewChangeLogCommon {
new_value: vec![],
old_value: vec![],
epochs: change_log.epochs,
},
)),
)
})
.collect(),
Expand Down
13 changes: 7 additions & 6 deletions src/storage/hummock_sdk/src/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ fn refill_table_change_log(
table_change_log: &mut TableChangeLog,
sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
) {
for c in &mut table_change_log.0 {
for c in table_change_log.iter_mut() {
for s in &mut c.old_value {
refill_sstable_info(s, sst_id_to_info);
}
Expand Down Expand Up @@ -113,19 +113,20 @@ impl From<(&HummockVersion, &HashSet<StateTableId>)> for IncompleteHummockVersio
if !time_travel_table_ids.contains(&table_id.table_id()) {
return None;
}
debug_assert!(change_log.0.iter().all(|d| {
debug_assert!(change_log.iter().all(|d| {
d.new_value.iter().chain(d.old_value.iter()).all(|s| {
s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
})
}));
let incomplete_table_change_log = change_log
.0
.iter()
.map(|e| PbEpochNewChangeLog::from(e).into())
.collect();
Some((*table_id, TableChangeLogCommon(incomplete_table_change_log)))
.map(|e| PbEpochNewChangeLog::from(e).into());
Some((
*table_id,
TableChangeLogCommon::new(incomplete_table_change_log),
))
})
.collect(),
state_table_info: version.state_table_info.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl HummockVersion {
.table_change_log
.values()
.map(|c| {
c.0.iter()
c.iter()
.map(|l| {
l.old_value
.iter()
Expand Down
6 changes: 2 additions & 4 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::change_log::EpochNewChangeLog;
use risingwave_hummock_sdk::key::{
bound_table_key_range, FullKey, TableKey, TableKeyRange, UserKey,
};
Expand Down Expand Up @@ -994,10 +993,9 @@ impl HummockVersionReader {
options: ReadLogOptions,
) -> HummockResult<ChangeLogIterator> {
let change_log = if let Some(change_log) = version.table_change_log.get(&options.table_id) {
change_log.filter_epoch(epoch_range)
change_log.filter_epoch(epoch_range).collect_vec()
} else {
static EMPTY_VEC: Vec<EpochNewChangeLog> = Vec::new();
&EMPTY_VEC[..]
Vec::new()
};
if let Some(max_epoch_change_log) = change_log.last() {
let (_, max_epoch) = epoch_range;
Expand Down

0 comments on commit 5cb0d17

Please sign in to comment.