Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): refactor on merge imm code #14880

Merged
merged 8 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 44 additions & 10 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::iter::once;
use std::ops::Bound::*;
use std::ops::{Bound, Deref, DerefMut, RangeBounds};
use std::ptr;
Expand Down Expand Up @@ -827,11 +828,11 @@ impl<T: AsRef<[u8]> + Ord + Eq> PartialOrd for FullKey<T> {
}
}

impl<'a, T> From<FullKey<&'a [u8]>> for FullKey<T>
impl<'a, T> From<UserKey<&'a [u8]>> for UserKey<T>
where
T: AsRef<[u8]> + CopyFromSlice,
{
fn from(value: FullKey<&'a [u8]>) -> Self {
fn from(value: UserKey<&'a [u8]>) -> Self {
value.copy_into()
}
}
Expand Down Expand Up @@ -979,40 +980,73 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
/// - Otherwise: return None
pub fn observe<F>(&mut self, key: FullKey<F>) -> Option<FullKey<T>>
where
FullKey<F>: Into<FullKey<T>>,
UserKey<F>: Into<UserKey<T>>,
F: AsRef<[u8]>,
{
self.observe_multi_version(key.user_key, once(key.epoch_with_gap))
}

/// `epochs` must be non-empty and yield epochs in strictly decreasing order (greatest first).
pub fn observe_multi_version<F>(
&mut self,
user_key: UserKey<F>,
mut epochs: impl Iterator<Item = EpochWithGap>,
) -> Option<FullKey<T>>
where
UserKey<F>: Into<UserKey<T>>,
F: AsRef<[u8]>,
{
let max_epoch_with_gap = epochs.next().expect("non-empty");
let min_epoch_with_gap = epochs.fold(
max_epoch_with_gap,
|prev_epoch_with_gap, curr_epoch_with_gap| {
assert!(
prev_epoch_with_gap > curr_epoch_with_gap,
"epoch list not sorted. prev: {:?}, curr: {:?}, user_key: {:?}",
prev_epoch_with_gap,
curr_epoch_with_gap,
user_key
);
curr_epoch_with_gap
},
);
match self
.latest_full_key
.user_key
.as_ref()
.cmp(&key.user_key.as_ref())
.cmp(&user_key.as_ref())
{
Ordering::Less => {
// Observe a new user key

// Reset epochs
self.last_observed_epoch_with_gap = key.epoch_with_gap;
self.last_observed_epoch_with_gap = min_epoch_with_gap;

// Take the previous key and set latest key
Some(std::mem::replace(&mut self.latest_full_key, key.into()))
Some(std::mem::replace(
&mut self.latest_full_key,
FullKey {
user_key: user_key.into(),
epoch_with_gap: min_epoch_with_gap,
},
))
}
Ordering::Equal => {
if key.epoch_with_gap >= self.last_observed_epoch_with_gap {
if max_epoch_with_gap >= self.last_observed_epoch_with_gap {
// Epoch from the same user key should be monotonically decreasing
panic!(
"key {:?} epoch {:?} >= prev epoch {:?}",
key.user_key, key.epoch_with_gap, self.last_observed_epoch_with_gap
user_key, max_epoch_with_gap, self.last_observed_epoch_with_gap
);
}
self.last_observed_epoch_with_gap = key.epoch_with_gap;
self.last_observed_epoch_with_gap = min_epoch_with_gap;
None
}
Ordering::Greater => {
// User key should be monotonically increasing
panic!(
"key {:?} <= prev key {:?}",
key,
user_key,
FullKey {
user_key: self.latest_full_key.user_key.as_ref(),
epoch_with_gap: self.last_observed_epoch_with_gap
Expand Down
50 changes: 35 additions & 15 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,12 @@ pub async fn merge_imms_in_memory(
instance_id: LocalInstanceId,
imms: Vec<ImmutableMemtable>,
memory_tracker: Option<MemoryTracker>,
) -> HummockResult<ImmutableMemtable> {
) -> ImmutableMemtable {
let mut kv_count = 0;
let mut epochs = vec![];
let mut merged_size = 0;
let mut merged_imm_ids = Vec::with_capacity(imms.len());
assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
let max_imm_id = imms[0].batch_id();

let mut imm_iters = Vec::with_capacity(imms.len());
for imm in imms {
Expand All @@ -300,7 +301,6 @@ pub async fn merge_imms_in_memory(
"should only merge data belonging to the same table"
);

merged_imm_ids.push(imm.batch_id());
epochs.push(imm.min_epoch());
kv_count += imm.kv_count();
merged_size += imm.size();
Expand All @@ -311,10 +311,10 @@ pub async fn merge_imms_in_memory(

// use merge iterator to merge input imms
let mut mi = MergeIterator::new(imm_iters);
mi.rewind().await?;
mi.rewind_no_await();
assert!(mi.is_valid());

let first_item_key = mi.current_item().0.clone();
let first_item_key = mi.current_key_entry().key.clone();

let mut merged_payload: Vec<SharedBufferVersionedEntry> = Vec::new();

Expand All @@ -327,44 +327,64 @@ pub async fn merge_imms_in_memory(
let mut table_key_versions: Vec<(EpochWithGap, HummockValue<Bytes>)> = Vec::new();

while mi.is_valid() {
let (key, (epoch_with_gap, value)) = mi.current_item();
let full_key = FullKey::new_with_gap_epoch(table_id, key.clone(), *epoch_with_gap);
if let Some(last_full_key) = full_key_tracker.observe(full_key) {
let key_entry = mi.current_key_entry();
let user_key = UserKey {
table_id,
table_key: key_entry.key.clone(),
};
if let Some(last_full_key) = full_key_tracker.observe_multi_version(
user_key,
key_entry
.new_values
.iter()
.map(|(epoch_with_gap, _)| *epoch_with_gap),
) {
let last_user_key = last_full_key.user_key;
// `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items
// and should not be used because we use max epoch to initialize the tracker
let _epoch_with_gap = last_full_key.epoch_with_gap;

// Record kv entries
merged_payload.push((last_user_key.table_key, table_key_versions));
merged_payload.push(SharedBufferVersionedEntry::new(
last_user_key.table_key,
table_key_versions,
));

// Reset state before moving onto the new table key
table_key_versions = vec![];
}
table_key_versions.push((*epoch_with_gap, value.clone()));
mi.next().await?;
table_key_versions.extend(
key_entry
.new_values
.iter()
.map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
);
mi.advance_peek_to_next_key();
// This function has no blocking point but is CPU intensive, so we call `consume_budget`
// below to do cooperative scheduling
tokio::task::consume_budget().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a comment above explaining the reason for the cooperative scheduling.

}

// process the last key
if !table_key_versions.is_empty() {
merged_payload.push((
merged_payload.push(SharedBufferVersionedEntry::new(
full_key_tracker.latest_full_key.user_key.table_key,
table_key_versions,
));
}

Ok(SharedBufferBatch {
SharedBufferBatch {
inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
epochs,
merged_payload,
kv_count,
merged_imm_ids,
merged_size,
max_imm_id,
memory_tracker,
)),
table_id,
instance_id,
})
}
}

/// Based on the incoming payload and opts, calculate the sharding method and sstable size of shared buffer compaction.
Expand Down
21 changes: 4 additions & 17 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ impl HummockEventHandler {
.write()
.update(VersionUpdate::Staging(StagingData::MergedImmMem(
merge_output.merged_imm,
merge_output.imm_ids,
)));
} else {
warn!(
Expand Down Expand Up @@ -795,9 +796,7 @@ mod tests {
use crate::hummock::event_handler::{HummockEvent, HummockEventHandler};
use crate::hummock::iterator::test_utils::mock_sstable_store;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchInner,
};
use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
use crate::hummock::store::version::{StagingData, VersionUpdate};
use crate::hummock::test_utils::default_opts_for_test;
use crate::hummock::value::HummockValue;
Expand Down Expand Up @@ -840,26 +839,14 @@ mod tests {
Err(HummockError::other("".to_string()))
})
}),
Arc::new(move |table_id, instance_id, imms, _| {
Arc::new(move |_, _, imms, _| {
let (tx, rx) = oneshot::channel::<()>();
let (finish_tx, finish_rx) = oneshot::channel::<()>();
spawn_merging_task_tx.send((tx, finish_rx)).unwrap();
spawn(async move {
rx.await.unwrap();
finish_tx.send(()).unwrap();
let first_imm = &imms[0];
Ok(SharedBufferBatch {
inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
first_imm.epochs().clone(),
first_imm.get_payload().iter().cloned().collect_vec(),
100,
imms.iter().map(|imm| imm.batch_id()).collect_vec(),
100,
None,
)),
table_id,
instance_id,
})
imms[0].clone()
})
}),
);
Expand Down
53 changes: 31 additions & 22 deletions src/storage/src/hummock/event_handler/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub type SpawnMergingTask = Arc<
LocalInstanceId,
Vec<ImmutableMemtable>,
Option<MemoryTracker>,
) -> JoinHandle<HummockResult<ImmutableMemtable>>
) -> JoinHandle<ImmutableMemtable>
+ Send
+ Sync
+ 'static,
Expand Down Expand Up @@ -119,6 +119,8 @@ struct UploadingTask {
}

pub struct MergeImmTaskOutput {
/// Input imm ids of the merging task. Larger imm ids at the front.
pub imm_ids: Vec<ImmId>,
pub table_id: TableId,
pub instance_id: LocalInstanceId,
pub merged_imm: ImmutableMemtable,
Expand All @@ -129,7 +131,17 @@ struct MergingImmTask {
table_id: TableId,
instance_id: LocalInstanceId,
input_imms: Vec<ImmutableMemtable>,
join_handle: JoinHandle<HummockResult<ImmutableMemtable>>,
join_handle: JoinHandle<ImmutableMemtable>,
}

// Hand-written `Debug` for `MergingImmTask`: prints `table_id`, `instance_id` and
// `input_imms`; the `join_handle` field is not included in the output.
impl Debug for MergingImmTask {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("MergingImmTask");
        builder.field("table_id", &self.table_id);
        builder.field("instance_id", &self.instance_id);
        builder.field("input_imms", &self.input_imms);
        builder.finish()
    }
}

impl MergingImmTask {
Expand All @@ -140,6 +152,7 @@ impl MergingImmTask {
memory_tracker: Option<MemoryTracker>,
context: &UploaderContext,
) -> Self {
assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
let input_imms = imms.clone();
let join_handle = (context.spawn_merging_task)(table_id, instance_id, imms, memory_tracker);

Expand All @@ -152,19 +165,22 @@ impl MergingImmTask {
}

/// Poll the result of the merge task
fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll<HummockResult<ImmutableMemtable>> {
fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll<ImmutableMemtable> {
Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
Ok(task_result) => task_result,
Err(err) => Err(HummockError::other(format!(
"fail to join imm merge join handle: {}",
err.as_report()
))),
Err(err) => {
panic!(
"failed to join merging task: {:?} {:?}",
err.as_report(),
self
);
}
Comment on lines +171 to +177
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the only await point in merge imm is consume_budget, this code path should never be reached. Can you add some documentation here?

})
}
}

impl Future for MergingImmTask {
type Output = HummockResult<ImmutableMemtable>;
type Output = ImmutableMemtable;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.poll_result(cx)
Expand Down Expand Up @@ -510,7 +526,6 @@ impl SealedData {
}

fn add_merged_imm(&mut self, merged_imm: &ImmutableMemtable) {
debug_assert!(merged_imm.is_merged_imm());
// add merged_imm to merged_imms
self.merged_imms.push_front(merged_imm.clone());
}
Expand Down Expand Up @@ -573,15 +588,16 @@ impl SealedData {
fn poll_success_merge_imm(&mut self, cx: &mut Context<'_>) -> Poll<Option<MergeImmTaskOutput>> {
// only poll the oldest merge task if there is any
if let Some(task) = self.merging_tasks.back_mut() {
let merge_result = ready!(task.poll_unpin(cx));
let merged_imm = ready!(task.poll_unpin(cx));

// pop the finished task
let task = self.merging_tasks.pop_back().expect("must exist");

Poll::Ready(Some(MergeImmTaskOutput {
imm_ids: task.input_imms.iter().map(|imm| imm.batch_id()).collect(),
table_id: task.table_id,
instance_id: task.instance_id,
merged_imm: merge_result.unwrap(),
merged_imm,
}))
} else {
Poll::Ready(None)
Expand Down Expand Up @@ -1655,17 +1671,10 @@ mod tests {
_ = sleep => {
println!("sleep timeout")
}
res = &mut task => {
match res {
Ok(imm) => {
println!("merging task success");
assert_eq!(table_id, imm.table_id);
assert_eq!(9, imm.kv_count());
}
Err(err) => {
println!("merging task failed: {:?}", err);
}
}
imm = &mut task => {
println!("merging task success");
assert_eq!(table_id, imm.table_id);
assert_eq!(9, imm.kv_count());
}
}
task.join_handle.abort();
Expand Down
Loading
Loading