Commit

refactor(storage): refactor on merge imm code (#14880)
wenym1 authored Feb 4, 2024
1 parent 81c84c5 commit 90aa90a
Showing 7 changed files with 232 additions and 154 deletions.
54 changes: 44 additions & 10 deletions src/storage/hummock_sdk/src/key.rs
@@ -15,6 +15,7 @@
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::iter::once;
use std::ops::Bound::*;
use std::ops::{Bound, Deref, DerefMut, RangeBounds};
use std::ptr;
@@ -827,11 +828,11 @@ impl<T: AsRef<[u8]> + Ord + Eq> PartialOrd for FullKey<T> {
}
}

impl<'a, T> From<FullKey<&'a [u8]>> for FullKey<T>
impl<'a, T> From<UserKey<&'a [u8]>> for UserKey<T>
where
T: AsRef<[u8]> + CopyFromSlice,
{
fn from(value: FullKey<&'a [u8]>) -> Self {
fn from(value: UserKey<&'a [u8]>) -> Self {
value.copy_into()
}
}
@@ -979,40 +980,73 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
/// - Otherwise: return None
pub fn observe<F>(&mut self, key: FullKey<F>) -> Option<FullKey<T>>
where
FullKey<F>: Into<FullKey<T>>,
UserKey<F>: Into<UserKey<T>>,
F: AsRef<[u8]>,
{
self.observe_multi_version(key.user_key, once(key.epoch_with_gap))
}

/// `epochs` is expected in descending order (from greater to smaller)
pub fn observe_multi_version<F>(
&mut self,
user_key: UserKey<F>,
mut epochs: impl Iterator<Item = EpochWithGap>,
) -> Option<FullKey<T>>
where
UserKey<F>: Into<UserKey<T>>,
F: AsRef<[u8]>,
{
let max_epoch_with_gap = epochs.next().expect("non-empty");
let min_epoch_with_gap = epochs.fold(
max_epoch_with_gap,
|prev_epoch_with_gap, curr_epoch_with_gap| {
assert!(
prev_epoch_with_gap > curr_epoch_with_gap,
"epoch list not sorted. prev: {:?}, curr: {:?}, user_key: {:?}",
prev_epoch_with_gap,
curr_epoch_with_gap,
user_key
);
curr_epoch_with_gap
},
);
match self
.latest_full_key
.user_key
.as_ref()
.cmp(&key.user_key.as_ref())
.cmp(&user_key.as_ref())
{
Ordering::Less => {
// Observe a new user key

// Reset epochs
self.last_observed_epoch_with_gap = key.epoch_with_gap;
self.last_observed_epoch_with_gap = min_epoch_with_gap;

// Take the previous key and set latest key
Some(std::mem::replace(&mut self.latest_full_key, key.into()))
Some(std::mem::replace(
&mut self.latest_full_key,
FullKey {
user_key: user_key.into(),
epoch_with_gap: min_epoch_with_gap,
},
))
}
Ordering::Equal => {
if key.epoch_with_gap >= self.last_observed_epoch_with_gap {
if max_epoch_with_gap >= self.last_observed_epoch_with_gap {
// Epochs of the same user key should be monotonically decreasing
panic!(
"key {:?} epoch {:?} >= prev epoch {:?}",
key.user_key, key.epoch_with_gap, self.last_observed_epoch_with_gap
user_key, max_epoch_with_gap, self.last_observed_epoch_with_gap
);
}
self.last_observed_epoch_with_gap = key.epoch_with_gap;
self.last_observed_epoch_with_gap = min_epoch_with_gap;
None
}
Ordering::Greater => {
// User key should be monotonically increasing
panic!(
"key {:?} <= prev key {:?}",
key,
user_key,
FullKey {
user_key: self.latest_full_key.user_key.as_ref(),
epoch_with_gap: self.last_observed_epoch_with_gap
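The core of the `FullKeyTracker` change above is that `observe_multi_version` now takes a user key plus an iterator of epochs in descending order, folds it down to the smallest epoch while asserting the ordering, and only hands back the previously tracked full key once a strictly larger user key arrives. Below is a minimal, self-contained sketch of that bookkeeping; `KeyTracker`, plain `Vec<u8>` keys, and `u64` epochs are simplified stand-ins for the real `FullKeyTracker`, `UserKey<T>`, and `EpochWithGap` types, not the actual API.

```rust
// Simplified sketch of the descending-epoch tracking in `observe_multi_version`.
struct KeyTracker {
    latest_user_key: Vec<u8>,
    last_observed_epoch: u64,
}

impl KeyTracker {
    fn new(init_key: Vec<u8>, init_epoch: u64) -> Self {
        Self { latest_user_key: init_key, last_observed_epoch: init_epoch }
    }

    /// `epochs` must be non-empty and sorted from greater (newer) to smaller (older).
    /// Returns the previously tracked key when a strictly larger user key is observed.
    fn observe_multi_version(
        &mut self,
        user_key: Vec<u8>,
        mut epochs: impl Iterator<Item = u64>,
    ) -> Option<Vec<u8>> {
        let max_epoch = epochs.next().expect("non-empty");
        // Fold down to the smallest epoch while asserting strict descending order.
        let min_epoch = epochs.fold(max_epoch, |prev, curr| {
            assert!(prev > curr, "epoch list not sorted. prev: {}, curr: {}", prev, curr);
            curr
        });
        match self.latest_user_key.cmp(&user_key) {
            std::cmp::Ordering::Less => {
                // New user key: remember its smallest epoch, hand back the previous key.
                self.last_observed_epoch = min_epoch;
                Some(std::mem::replace(&mut self.latest_user_key, user_key))
            }
            std::cmp::Ordering::Equal => {
                // Same key: epochs must keep decreasing across observations.
                assert!(max_epoch < self.last_observed_epoch, "epoch not decreasing");
                self.last_observed_epoch = min_epoch;
                None
            }
            std::cmp::Ordering::Greater => panic!("user key not increasing"),
        }
    }
}

fn main() {
    let mut tracker = KeyTracker::new(b"a".to_vec(), 30);
    // Same key, lower epochs: nothing is emitted yet.
    assert_eq!(tracker.observe_multi_version(b"a".to_vec(), [20u64, 10].into_iter()), None);
    // Moving on to key "b" yields the finished key "a".
    assert_eq!(
        tracker.observe_multi_version(b"b".to_vec(), [25u64].into_iter()),
        Some(b"a".to_vec())
    );
}
```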
50 changes: 35 additions & 15 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -285,11 +285,12 @@ pub async fn merge_imms_in_memory(
instance_id: LocalInstanceId,
imms: Vec<ImmutableMemtable>,
memory_tracker: Option<MemoryTracker>,
) -> HummockResult<ImmutableMemtable> {
) -> ImmutableMemtable {
let mut kv_count = 0;
let mut epochs = vec![];
let mut merged_size = 0;
let mut merged_imm_ids = Vec::with_capacity(imms.len());
assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
let max_imm_id = imms[0].batch_id();

let mut imm_iters = Vec::with_capacity(imms.len());
for imm in imms {
@@ -300,7 +301,6 @@ pub async fn merge_imms_in_memory(
"should only merge data belonging to the same table"
);

merged_imm_ids.push(imm.batch_id());
epochs.push(imm.min_epoch());
kv_count += imm.kv_count();
merged_size += imm.size();
@@ -311,10 +311,10 @@

// use merge iterator to merge input imms
let mut mi = MergeIterator::new(imm_iters);
mi.rewind().await?;
mi.rewind_no_await();
assert!(mi.is_valid());

let first_item_key = mi.current_item().0.clone();
let first_item_key = mi.current_key_entry().key.clone();

let mut merged_payload: Vec<SharedBufferVersionedEntry> = Vec::new();

@@ -327,44 +327,64 @@
let mut table_key_versions: Vec<(EpochWithGap, HummockValue<Bytes>)> = Vec::new();

while mi.is_valid() {
let (key, (epoch_with_gap, value)) = mi.current_item();
let full_key = FullKey::new_with_gap_epoch(table_id, key.clone(), *epoch_with_gap);
if let Some(last_full_key) = full_key_tracker.observe(full_key) {
let key_entry = mi.current_key_entry();
let user_key = UserKey {
table_id,
table_key: key_entry.key.clone(),
};
if let Some(last_full_key) = full_key_tracker.observe_multi_version(
user_key,
key_entry
.new_values
.iter()
.map(|(epoch_with_gap, _)| *epoch_with_gap),
) {
let last_user_key = last_full_key.user_key;
// `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items
// and should not be used because we use max epoch to initialize the tracker
let _epoch_with_gap = last_full_key.epoch_with_gap;

// Record kv entries
merged_payload.push((last_user_key.table_key, table_key_versions));
merged_payload.push(SharedBufferVersionedEntry::new(
last_user_key.table_key,
table_key_versions,
));

// Reset state before moving onto the new table key
table_key_versions = vec![];
}
table_key_versions.push((*epoch_with_gap, value.clone()));
mi.next().await?;
table_key_versions.extend(
key_entry
.new_values
.iter()
.map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
);
mi.advance_peek_to_next_key();
// This method has no blocking point but is CPU-intensive, so we call `consume_budget`
// below to yield periodically for cooperative scheduling
tokio::task::consume_budget().await;
}

// process the last key
if !table_key_versions.is_empty() {
merged_payload.push((
merged_payload.push(SharedBufferVersionedEntry::new(
full_key_tracker.latest_full_key.user_key.table_key,
table_key_versions,
));
}

Ok(SharedBufferBatch {
SharedBufferBatch {
inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
epochs,
merged_payload,
kv_count,
merged_imm_ids,
merged_size,
max_imm_id,
memory_tracker,
)),
table_id,
instance_id,
})
}
}

/// Based on the incoming payload and opts, calculate the sharding method and sstable size of shared buffer compaction.
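To make the new shape of `merge_imms_in_memory` concrete: the merge iterator yields entries sorted by user key ascending and, within a key, epochs descending, and the loop accumulates every version of one key before flushing them as a single payload entry. Below is a minimal sketch of that grouping step under simplified, hypothetical types (`Vec<u8>` keys, `u64` epochs, `String` values); it is not the real `MergeIterator` or `SharedBufferVersionedEntry`, only the same flush-on-key-change pattern.

```rust
// Group a (key-ascending, epoch-descending) stream into one entry per key.
fn group_versions(
    // Each item: (table_key, versions already sorted from newest to oldest epoch)
    sorted_entries: Vec<(Vec<u8>, Vec<(u64, String)>)>,
) -> Vec<(Vec<u8>, Vec<(u64, String)>)> {
    let mut merged: Vec<(Vec<u8>, Vec<(u64, String)>)> = Vec::new();
    let mut current_key: Option<Vec<u8>> = None;
    let mut current_versions: Vec<(u64, String)> = Vec::new();

    for (key, versions) in sorted_entries {
        let key_changed = current_key.as_ref().map_or(true, |prev| *prev != key);
        if key_changed {
            if let Some(prev) = current_key.take() {
                // Key changed: flush the accumulated versions of the previous key.
                merged.push((prev, std::mem::take(&mut current_versions)));
            }
            current_key = Some(key.clone());
        }
        // Same user key (possibly from another imm): keep accumulating versions.
        current_versions.extend(versions);
    }
    // Flush the last key, mirroring the "process the last key" step above.
    if let Some(key) = current_key {
        merged.push((key, current_versions));
    }
    merged
}

fn main() {
    let merged = group_versions(vec![
        (b"a".to_vec(), vec![(3, "a@3".into()), (2, "a@2".into())]),
        (b"a".to_vec(), vec![(1, "a@1".into())]),
        (b"b".to_vec(), vec![(2, "b@2".into())]),
    ]);
    assert_eq!(merged.len(), 2);
    assert_eq!(merged[0].1.len(), 3); // "a" carries three merged versions
}
```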
21 changes: 4 additions & 17 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
@@ -557,6 +557,7 @@ impl HummockEventHandler {
.write()
.update(VersionUpdate::Staging(StagingData::MergedImmMem(
merge_output.merged_imm,
merge_output.imm_ids,
)));
} else {
warn!(
@@ -795,9 +796,7 @@ mod tests {
use crate::hummock::event_handler::{HummockEvent, HummockEventHandler};
use crate::hummock::iterator::test_utils::mock_sstable_store;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchInner,
};
use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
use crate::hummock::store::version::{StagingData, VersionUpdate};
use crate::hummock::test_utils::default_opts_for_test;
use crate::hummock::value::HummockValue;
@@ -840,26 +839,14 @@
Err(HummockError::other("".to_string()))
})
}),
Arc::new(move |table_id, instance_id, imms, _| {
Arc::new(move |_, _, imms, _| {
let (tx, rx) = oneshot::channel::<()>();
let (finish_tx, finish_rx) = oneshot::channel::<()>();
spawn_merging_task_tx.send((tx, finish_rx)).unwrap();
spawn(async move {
rx.await.unwrap();
finish_tx.send(()).unwrap();
let first_imm = &imms[0];
Ok(SharedBufferBatch {
inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
first_imm.epochs().clone(),
first_imm.get_payload().iter().cloned().collect_vec(),
100,
imms.iter().map(|imm| imm.batch_id()).collect_vec(),
100,
None,
)),
table_id,
instance_id,
})
imms[0].clone()
})
}),
);
53 changes: 31 additions & 22 deletions src/storage/src/hummock/event_handler/uploader.rs
@@ -66,7 +66,7 @@ pub type SpawnMergingTask = Arc<
LocalInstanceId,
Vec<ImmutableMemtable>,
Option<MemoryTracker>,
) -> JoinHandle<HummockResult<ImmutableMemtable>>
) -> JoinHandle<ImmutableMemtable>
+ Send
+ Sync
+ 'static,
@@ -119,6 +119,8 @@ struct UploadingTask {
}

pub struct MergeImmTaskOutput {
/// Input imm ids of the merging task. Larger imm ids at the front.
pub imm_ids: Vec<ImmId>,
pub table_id: TableId,
pub instance_id: LocalInstanceId,
pub merged_imm: ImmutableMemtable,
@@ -129,7 +131,17 @@ struct MergingImmTask {
table_id: TableId,
instance_id: LocalInstanceId,
input_imms: Vec<ImmutableMemtable>,
join_handle: JoinHandle<HummockResult<ImmutableMemtable>>,
join_handle: JoinHandle<ImmutableMemtable>,
}

impl Debug for MergingImmTask {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MergingImmTask")
.field("table_id", &self.table_id)
.field("instance_id", &self.instance_id)
.field("input_imms", &self.input_imms)
.finish()
}
}

impl MergingImmTask {
@@ -140,6 +152,7 @@ impl MergingImmTask {
memory_tracker: Option<MemoryTracker>,
context: &UploaderContext,
) -> Self {
assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
let input_imms = imms.clone();
let join_handle = (context.spawn_merging_task)(table_id, instance_id, imms, memory_tracker);

@@ -152,19 +165,22 @@
}

/// Poll the result of the merge task
fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll<HummockResult<ImmutableMemtable>> {
fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll<ImmutableMemtable> {
Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
Ok(task_result) => task_result,
Err(err) => Err(HummockError::other(format!(
"fail to join imm merge join handle: {}",
err.as_report()
))),
Err(err) => {
panic!(
"failed to join merging task: {:?} {:?}",
err.as_report(),
self
);
}
})
}
}

impl Future for MergingImmTask {
type Output = HummockResult<ImmutableMemtable>;
type Output = ImmutableMemtable;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.poll_result(cx)
@@ -510,7 +526,6 @@ impl SealedData {
}

fn add_merged_imm(&mut self, merged_imm: &ImmutableMemtable) {
debug_assert!(merged_imm.is_merged_imm());
// add merged_imm to merged_imms
self.merged_imms.push_front(merged_imm.clone());
}
@@ -573,15 +588,16 @@
fn poll_success_merge_imm(&mut self, cx: &mut Context<'_>) -> Poll<Option<MergeImmTaskOutput>> {
// only poll the oldest merge task if there is any
if let Some(task) = self.merging_tasks.back_mut() {
let merge_result = ready!(task.poll_unpin(cx));
let merged_imm = ready!(task.poll_unpin(cx));

// pop the finished task
let task = self.merging_tasks.pop_back().expect("must exist");

Poll::Ready(Some(MergeImmTaskOutput {
imm_ids: task.input_imms.iter().map(|imm| imm.batch_id()).collect(),
table_id: task.table_id,
instance_id: task.instance_id,
merged_imm: merge_result.unwrap(),
merged_imm,
}))
} else {
Poll::Ready(None)
@@ -1655,17 +1671,10 @@ mod tests {
_ = sleep => {
println!("sleep timeout")
}
res = &mut task => {
match res {
Ok(imm) => {
println!("merging task success");
assert_eq!(table_id, imm.table_id);
assert_eq!(9, imm.kv_count());
}
Err(err) => {
println!("merging task failed: {:?}", err);
}
}
imm = &mut task => {
println!("merging task success");
assert_eq!(table_id, imm.table_id);
assert_eq!(9, imm.kv_count());
}
}
task.join_handle.abort();
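Both `merge_imms_in_memory` and `MergingImmTask::new` now assert that the input imms arrive newest-first, i.e. batch ids are non-increasing from front to back (`imms.iter().rev().map(|imm| imm.batch_id()).is_sorted()`), which is what lets `merged_imm_ids` be replaced by `max_imm_id` / the `imm_ids` collected at task completion. A small sketch of the same invariant, using a hypothetical `assert_newest_first` helper over plain `u64` ids instead of the iterator `is_sorted` call:

```rust
// Hypothetical helper: input imms are expected newest-first, so their batch ids
// must be non-increasing from front to back (ascending when iterated in reverse).
fn assert_newest_first(batch_ids: &[u64]) {
    assert!(
        batch_ids.windows(2).all(|w| w[0] >= w[1]),
        "imms not ordered newest-first: {:?}",
        batch_ids
    );
}

fn main() {
    assert_newest_first(&[7, 5, 3]); // ok: larger (newer) batch ids at the front
    // assert_newest_first(&[3, 5]); // would panic: older imm placed before newer one
}
```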