Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(storage): use VecDeque for TableChangeLog to avoid large clone on truncate #19984

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 49 additions & 16 deletions src/storage/hummock_sdk/src/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};

use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
Expand All @@ -22,14 +22,44 @@ use tracing::warn;
use crate::sstable_info::SstableInfo;

/// Ordered collection of [`EpochNewChangeLogCommon`] entries.
///
/// New entries are appended at the back (`add_change_log`) and expired entries are removed
/// from the front (`truncate`), so the oldest log is always the first element.
#[derive(Debug, Clone, PartialEq)]
pub struct TableChangeLogCommon<T>(pub Vec<EpochNewChangeLogCommon<T>>);
pub struct TableChangeLogCommon<T>(
// older log at the front
VecDeque<EpochNewChangeLogCommon<T>>,
);

impl<T> TableChangeLogCommon<T> {
pub fn new(logs: impl IntoIterator<Item = EpochNewChangeLogCommon<T>>) -> Self {
let logs = logs.into_iter().collect::<VecDeque<_>>();
debug_assert!(logs.iter().flat_map(|log| log.epochs.iter()).is_sorted());
Self(logs)
}

pub fn iter(&self) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> {
self.0.iter()
}

pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
self.0.iter_mut()
}

pub fn add_change_log(&mut self, new_change_log: EpochNewChangeLogCommon<T>) {
if let Some(prev_log) = self.0.back() {
assert!(
prev_log.epochs.last().expect("non-empty")
< new_change_log.epochs.first().expect("non-empty")
);
}
self.0.push_back(new_change_log);
}
}

pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;

/// One change log entry, covering one or more epochs.
///
/// `T` is the element type of the two value lists; the `TableChangeLog` alias instantiates it
/// with `SstableInfo`.
#[derive(Debug, Clone, PartialEq)]
pub struct EpochNewChangeLogCommon<T> {
// NOTE(review): presumably the items carrying the values written in `epochs` — confirm.
pub new_value: Vec<T>,
// NOTE(review): presumably the items carrying the values that were overwritten — confirm.
pub old_value: Vec<T>,
// epochs are sorted in ascending order
// Code in this file calls `.expect("non-empty")` on the first/last epoch, so an entry stored in
// a change log is expected to carry at least one epoch.
pub epochs: Vec<u64>,
}

Expand Down Expand Up @@ -91,20 +121,19 @@ impl<T> TableChangeLogCommon<T> {
/// Returns the contiguous run of change logs that overlap the inclusive epoch range
/// `(min_epoch, max_epoch)`.
///
/// A log is included when its last epoch is `>= min_epoch` and its first epoch is
/// `<= max_epoch`. Both bounds are located with `partition_point`, which relies on the logs
/// being ordered by epoch.
///
/// # Panics
///
/// Panics if a probed log has an empty `epochs` list.
pub fn filter_epoch(
&self,
(min_epoch, max_epoch): (u64, u64),
) -> &[EpochNewChangeLogCommon<T>] {
) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> + '_ {
// Index of the first log that still reaches `min_epoch`.
let start = self.0.partition_point(|epoch_change_log| {
epoch_change_log.epochs.last().expect("non-empty") < &min_epoch
});
// Index one past the last log that starts at or before `max_epoch`.
let end = self.0.partition_point(|epoch_change_log| {
epoch_change_log.epochs.first().expect("non-empty") <= &max_epoch
});
&self.0[start..end]
self.0.range(start..end)
}

/// Returns epochs where value is non-null and >= `min_epoch`.
pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec<u64> {
self.filter_epoch((min_epoch, u64::MAX))
.iter()
.filter(|epoch_change_log| {
// Filter out empty change logs
let new_value_empty = epoch_change_log.new_value.is_empty();
Expand All @@ -113,16 +142,17 @@ impl<T> TableChangeLogCommon<T> {
})
.flat_map(|epoch_change_log| epoch_change_log.epochs.iter().cloned())
.filter(|a| a >= &min_epoch)
.clone()
.take(max_count)
.collect()
}

/// Discards everything at or below `truncate_epoch`.
///
/// Logs whose last epoch is `<= truncate_epoch` are removed, and in the first remaining log
/// only the epochs `> truncate_epoch` are kept.
///
/// # Panics
///
/// Panics if an inspected log has an empty `epochs` list.
pub fn truncate(&mut self, truncate_epoch: u64) {
// TODO: may optimize by using VecDeque to maintain the log
self.0
.retain(|change_log| *change_log.epochs.last().expect("non-empty") > truncate_epoch);
if let Some(first_log) = self.0.first_mut() {
// Pop from the front only while the front log is fully covered by `truncate_epoch`.
while let Some(change_log) = self.0.front()
&& *change_log.epochs.last().expect("non-empty") <= truncate_epoch
{
let _change_log = self.0.pop_front().expect("non-empty");
}
wenym1 marked this conversation as resolved.
Show resolved Hide resolved
// The surviving front log may straddle `truncate_epoch`; trim its expired epochs.
if let Some(first_log) = self.0.front_mut() {
first_log.epochs.retain(|epoch| *epoch > truncate_epoch);
}
}
Expand Down Expand Up @@ -256,7 +286,7 @@ mod tests {

#[test]
fn test_filter_epoch() {
let table_change_log = TableChangeLogCommon::<SstableInfo>(vec![
let table_change_log = TableChangeLogCommon::<SstableInfo>::new([
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand Down Expand Up @@ -288,15 +318,18 @@ mod tests {
})
.cloned()
.collect_vec();
let actual = table_change_log.filter_epoch((min_epoch, max_epoch));
assert_eq!(&expected, actual, "{:?}", (min_epoch, max_epoch));
let actual = table_change_log
.filter_epoch((min_epoch, max_epoch))
.cloned()
.collect_vec();
assert_eq!(expected, actual, "{:?}", (min_epoch, max_epoch));
}
}
}

#[test]
fn test_truncate() {
let mut table_change_log = TableChangeLogCommon::<SstableInfo>(vec![
let mut table_change_log = TableChangeLogCommon::<SstableInfo>::new([
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand All @@ -322,7 +355,7 @@ mod tests {
table_change_log.truncate(1);
assert_eq!(
table_change_log,
TableChangeLogCommon::<SstableInfo>(vec![
TableChangeLogCommon::<SstableInfo>::new([
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand All @@ -344,7 +377,7 @@ mod tests {
table_change_log.truncate(3);
assert_eq!(
table_change_log,
TableChangeLogCommon::<SstableInfo>(vec![
TableChangeLogCommon::<SstableInfo>::new([
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;

use bytes::Bytes;
Expand Down Expand Up @@ -696,16 +697,10 @@ impl HummockVersion {
match table_change_log.entry(*table_id) {
Entry::Occupied(entry) => {
let change_log = entry.into_mut();
if let Some(prev_log) = change_log.0.last() {
assert!(
prev_log.epochs.last().expect("non-empty")
< new_change_log.epochs.first().expect("non-empty")
);
}
change_log.0.push(new_change_log.clone());
change_log.add_change_log(new_change_log.clone());
}
Entry::Vacant(entry) => {
entry.insert(TableChangeLogCommon(vec![new_change_log.clone()]));
entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
}
};
}
Expand Down Expand Up @@ -957,7 +952,7 @@ where
self.get_combined_levels()
.flat_map(|level| level.table_infos.iter())
.chain(self.table_change_log.values().flat_map(|change_log| {
change_log.0.iter().flat_map(|epoch_change_log| {
change_log.iter().flat_map(|epoch_change_log| {
epoch_change_log
.old_value
.iter()
Expand Down Expand Up @@ -1364,7 +1359,7 @@ pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockSstableObject
.flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
})
.chain(version.table_change_log.values().flat_map(|c| {
c.0.iter().flat_map(|l| {
c.iter().flat_map(|l| {
l.old_value
.iter()
.chain(l.new_value.iter())
Expand Down
37 changes: 14 additions & 23 deletions src/storage/hummock_sdk/src/frontend_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,13 @@ impl FrontendHummockVersion {
.map(|(table_id, change_log)| {
(
*table_id,
TableChangeLogCommon(
change_log
.0
.iter()
.map(|change_log| EpochNewChangeLogCommon {
new_value: vec![],
old_value: vec![],
epochs: change_log.epochs.clone(),
})
.collect(),
),
TableChangeLogCommon::new(change_log.iter().map(|change_log| {
EpochNewChangeLogCommon {
new_value: vec![],
old_value: vec![],
epochs: change_log.epochs.clone(),
}
})),
)
})
.collect(),
Expand All @@ -76,7 +72,6 @@ impl FrontendHummockVersion {
table_id.table_id,
PbTableChangeLog {
change_logs: change_log
.0
.iter()
.map(|change_log| PbEpochNewChangeLog {
old_value: vec![],
Expand All @@ -102,17 +97,13 @@ impl FrontendHummockVersion {
.map(|(table_id, change_log)| {
(
TableId::new(table_id),
TableChangeLogCommon(
change_log
.change_logs
.into_iter()
.map(|change_log| EpochNewChangeLogCommon {
new_value: vec![],
old_value: vec![],
epochs: change_log.epochs,
})
.collect(),
),
TableChangeLogCommon::new(change_log.change_logs.into_iter().map(
|change_log| EpochNewChangeLogCommon {
new_value: vec![],
old_value: vec![],
epochs: change_log.epochs,
},
)),
)
})
.collect(),
Expand Down
13 changes: 7 additions & 6 deletions src/storage/hummock_sdk/src/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ fn refill_table_change_log(
table_change_log: &mut TableChangeLog,
sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
) {
for c in &mut table_change_log.0 {
for c in table_change_log.iter_mut() {
for s in &mut c.old_value {
refill_sstable_info(s, sst_id_to_info);
}
Expand Down Expand Up @@ -113,19 +113,20 @@ impl From<(&HummockVersion, &HashSet<StateTableId>)> for IncompleteHummockVersio
if !time_travel_table_ids.contains(&table_id.table_id()) {
return None;
}
debug_assert!(change_log.0.iter().all(|d| {
debug_assert!(change_log.iter().all(|d| {
d.new_value.iter().chain(d.old_value.iter()).all(|s| {
s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
})
}));
let incomplete_table_change_log = change_log
.0
.iter()
.map(|e| PbEpochNewChangeLog::from(e).into())
.collect();
Some((*table_id, TableChangeLogCommon(incomplete_table_change_log)))
.map(|e| PbEpochNewChangeLog::from(e).into());
Some((
*table_id,
TableChangeLogCommon::new(incomplete_table_change_log),
))
})
.collect(),
state_table_info: version.state_table_info.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl HummockVersion {
.table_change_log
.values()
.map(|c| {
c.0.iter()
c.iter()
.map(|l| {
l.old_value
.iter()
Expand Down
6 changes: 2 additions & 4 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::change_log::EpochNewChangeLog;
use risingwave_hummock_sdk::key::{
bound_table_key_range, FullKey, TableKey, TableKeyRange, UserKey,
};
Expand Down Expand Up @@ -994,10 +993,9 @@ impl HummockVersionReader {
options: ReadLogOptions,
) -> HummockResult<ChangeLogIterator> {
let change_log = if let Some(change_log) = version.table_change_log.get(&options.table_id) {
change_log.filter_epoch(epoch_range)
change_log.filter_epoch(epoch_range).collect_vec()
} else {
static EMPTY_VEC: Vec<EpochNewChangeLog> = Vec::new();
&EMPTY_VEC[..]
Vec::new()
};
if let Some(max_epoch_change_log) = change_log.last() {
let (_, max_epoch) = epoch_range;
Expand Down
Loading