Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a default compaction policy for journal #37

Merged
merged 1 commit into from
Jun 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 165 additions & 38 deletions core/src/layers/2-edit/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ where
disk: D,
init_state: S,
state_max_nbytes: usize,
compaction_policy: P,
mut compaction_policy: P,
) -> Result<EditJournal<E, S, D, P>> {
// Create `SnapshotManager` to persist the init state.
let snapshots = SnapshotManager::create(&disk, &init_state, state_max_nbytes)?;
Expand All @@ -111,6 +111,7 @@ where
let mut write_buf = WriteBuf::new(CryptoChain::<BlockRing<D>>::AVAIL_BLOCK_SIZE);
write_buf.write(&Record::Version(mac))?;
journal_chain.append(write_buf.as_slice())?;
compaction_policy.on_append_journal(1);
write_buf.clear();
journal_chain.flush()?;

Expand Down Expand Up @@ -233,37 +234,37 @@ where
// TODO: sync disk first to ensure data are persisted before
// journal records.

self.append_write_buf_to_journal();

let is_second_try_success = self.write_buf.write(record).is_ok();
if is_second_try_success == false {
panic!("the write buffer must have enough free space");
}
}

fn append_write_buf_to_journal(&mut self) {
let write_data = self.write_buf.as_slice();
if write_data.len() == 0 {
return;
}

self.journal_chain
.append(write_data)
// TODO: how to handle I/O error in journaling?
.expect("we cannot handle I/O error in journaling gracefully");
self.compaction_policy.on_append_journal(1);
self.write_buf.clear();

if self.compaction_policy.should_compact() {
if self.compact().is_err() {
// TODO: how to handle a compaction failure?
panic!("the journal chain compact failed");
}
self.compaction_policy.done_compact();
}

let is_second_try_success = self.write_buf.write(record).is_ok();
if is_second_try_success == false {
panic!("the write buffer must have enough free space");
// TODO: how to handle a compaction failure?
let compacted_blocks = self.compact().expect("journal chain compaction failed");
self.compaction_policy.done_compact(compacted_blocks);
}
}

/// Ensure that all committed edits are persisted to disk.
pub fn flush(&mut self) -> Result<()> {
let buf = self.write_buf.as_slice();
if buf.len() != 0 {
self.journal_chain
.append(buf)
// TODO: how to handle I/O error when append the journal_chain.
.expect("journal_chain append failed");
self.write_buf.clear();
}
self.append_write_buf_to_journal();
self.journal_chain.flush()
}

Expand All @@ -272,9 +273,9 @@ where
self.curr_edit_group.as_mut().map(|edits| edits.clear());
}

fn compact(&mut self) -> Result<()> {
fn compact(&mut self) -> Result<usize> {
if self.journal_chain.block_range().is_empty() {
return Ok(());
return Ok(0);
}

// Persist current state to latest snapshot.
Expand All @@ -285,15 +286,19 @@ where
// Persist the MAC of latest_snapshot.
let mac = self.snapshots.latest_mac();
self.write_buf.write(&Record::Version(mac))?;
self.flush()?;
self.journal_chain.append(self.write_buf.as_slice())?;
self.compaction_policy.on_append_journal(1);
self.write_buf.clear();

// The latest_snapshot has been persisted, now trim the journal_chain.
// And ensure that there is at least one valid block after trimming.
if self.journal_chain.block_range().len() > 1 {
let old_chain_len = self.journal_chain.block_range().len();
if old_chain_len > 1 {
self.journal_chain
.trim(self.journal_chain.block_range().end - 1);
}
Ok(())
let new_chain_len = self.journal_chain.block_range().len();
Ok(old_chain_len - new_chain_len)
}
}

Expand Down Expand Up @@ -708,11 +713,20 @@ pub trait CompactPolicy<E: Edit<S>, S> {
/// decide that now is the time to compact.
fn on_commit_edits(&mut self, edits: &EditGroup<E, S>);

/// Called when some edits are appended to `CryptoChain`.
///
/// The `appended_blocks` indicates how many blocks of journal area are
/// occupied by those edits.
fn on_append_journal(&mut self, appended_blocks: usize);

/// Returns whether now is a good timing for compaction.
fn should_compact(&self) -> bool;

/// Reset the state, as if no edits have ever been added.
fn done_compact(&mut self);
///
/// The `compacted_blocks` indicates how many blocks are reclaimed during
/// this compaction.
fn done_compact(&mut self, compacted_blocks: usize);
}

/// A never-do-compaction policy. Mostly useful for testing.
Expand All @@ -721,19 +735,75 @@ pub struct NeverCompactPolicy;
impl<E: Edit<S>, S> CompactPolicy<E, S> for NeverCompactPolicy {
fn on_commit_edits(&mut self, _edits: &EditGroup<E, S>) {}

fn on_append_journal(&mut self, _appended_nblocks: usize) {}

fn should_compact(&self) -> bool {
false
}

fn done_compact(&mut self) {}
fn done_compact(&mut self, _compacted_blocks: usize) {}
}

/// A compaction policy, triggered when there's no-space left for new edits.
pub struct DefaultCompactPolicy {
used_blocks: usize,
total_blocks: usize,
}

impl DefaultCompactPolicy {
/// Constructs a `DefaultCompactPolicy`.
///
/// It is initialized via the total number of blocks of `EditJournal` and state.
pub fn new<D: BlockSet>(disk_nblocks: usize, state_max_nbytes: usize) -> Self {
// Calculate the blocks used by `Snapshot`s.
let snapshot_bytes =
CryptoBlob::<D>::HEADER_NBYTES + state_max_nbytes + Snapshot::<D>::meta_len();
let snapshot_blocks = (snapshot_bytes + BLOCK_SIZE - 1) / BLOCK_SIZE;
debug_assert!(
snapshot_blocks * 2 < disk_nblocks,
"the number of blocks of journal area are too small"
);

Self {
used_blocks: 0,
total_blocks: disk_nblocks - snapshot_blocks * 2,
}
}

/// Constructs a `DefaultCompactPolicy` from `EditJournalMeta`.
pub fn from_meta(meta: &EditJournalMeta) -> Self {
Self {
used_blocks: 0,
total_blocks: meta.journal_area_nblocks,
}
}
}

impl<E: Edit<S>, S> CompactPolicy<E, S> for DefaultCompactPolicy {
fn on_commit_edits(&mut self, _edits: &EditGroup<E, S>) {}

fn on_append_journal(&mut self, nblocks: usize) {
self.used_blocks += nblocks;
}

fn should_compact(&self) -> bool {
self.used_blocks >= self.total_blocks
}

fn done_compact(&mut self, compacted_blocks: usize) {
debug_assert!(self.used_blocks >= compacted_blocks);
self.used_blocks -= compacted_blocks;
}
}

#[cfg(test)]
mod tests {
use super::{CompactPolicy, Edit, EditGroup, EditJournal, Record, RecordSlice, WriteBuf};
use super::{
CompactPolicy, DefaultCompactPolicy, Edit, EditGroup, EditJournal, Record, RecordSlice,
WriteBuf,
};
use crate::layers::bio::{BlockSet, MemDisk, BLOCK_SIZE};
use crate::layers::crypto::Mac;
use crate::os::Mutex;
use crate::prelude::*;
use pod::Pod;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -762,14 +832,14 @@ mod tests {
/// The `EditJournal` must have enough space to persist the threshold
/// of appended blocks, to avoid overlapping.
struct ThresholdPolicy {
appended: Mutex<usize>,
appended: usize,
threshold: usize,
}

impl ThresholdPolicy {
pub fn new(threshold: usize) -> Self {
Self {
appended: Mutex::new(0),
appended: 0,
threshold,
}
}
Expand All @@ -778,14 +848,16 @@ mod tests {
impl CompactPolicy<XEdit, XState> for ThresholdPolicy {
fn on_commit_edits(&mut self, _edits: &EditGroup<XEdit, XState>) {}

fn on_append_journal(&mut self, nblocks: usize) {
self.appended += nblocks;
}

fn should_compact(&self) -> bool {
let mut appended = self.appended.lock();
*appended += 1;
*appended >= self.threshold
self.appended >= self.threshold
}

fn done_compact(&mut self) {
*self.appended.lock() = 0;
fn done_compact(&mut self, _compacted_blocks: usize) {
self.appended = 0;
}
}

Expand Down Expand Up @@ -901,6 +973,8 @@ mod tests {
}
};

journal.flush().unwrap();

let journal_disk = disk.subset(0..32).unwrap();
let threshold_policy = ThresholdPolicy::new(1_000);
let recover = EditJournal::recover(journal_disk, &meta, threshold_policy).unwrap();
Expand All @@ -913,12 +987,10 @@ mod tests {
println!("append times: {}", append_times);
assert_eq!(
recover.state().sum as usize,
(0 + 999) * 1000 / 2 * (append_times * 2)
(0 + 999) * 1000 / 2 * commit_times
);
let compact_times = append_times / threshold;
println!("compact times: {}", compact_times);
let block_range_len = append_times % threshold + 1;
assert_eq!(recover.journal_chain.block_range().len(), block_range_len);
}

#[test]
Expand All @@ -937,4 +1009,59 @@ mod tests {
// Compact many times.
append_and_recover(5, 1000);
}

/// A test case for `DefaultCompactPolicy`.
///
/// The `commit_times` is used to control the number of `EditGroup` committed.
fn default_compact_policy_when_commit(commit_times: usize) {
let disk = MemDisk::create(16).unwrap();

let journal_disk = disk.subset(0..12).unwrap();
let state_max_nbytes = core::mem::size_of::<XState>() * 2;
let compact_policy =
DefaultCompactPolicy::new::<MemDisk>(journal_disk.nblocks(), state_max_nbytes);
let mut journal: EditJournal<XEdit, XState, MemDisk, DefaultCompactPolicy> =
EditJournal::format(
journal_disk,
XState { sum: 0 },
state_max_nbytes,
compact_policy,
)
.unwrap();
let meta = journal.meta();
assert_eq!(meta.snapshot_area_nblocks, 1);
assert_eq!(meta.journal_area_nblocks, 10);
{
println!("journaling started");
// The `WriteBuf` could hold two `EditGroup` in this test.
for _ in 0..commit_times {
for x in 0..1000 {
let edit = XEdit { x };
journal.add(edit);
}
journal.commit();
println!("state: {}", journal.state().sum);
}
};

journal.flush().unwrap();

let journal_disk = disk.subset(0..12).unwrap();
let compact_policy = DefaultCompactPolicy::from_meta(&meta);
let recover: EditJournal<XEdit, XState, MemDisk, DefaultCompactPolicy> =
EditJournal::recover(journal_disk, &meta, compact_policy).unwrap();
println!("recover state: {}", recover.state().sum);
assert_eq!(
recover.state().sum as usize,
(0 + 999) * 1000 / 2 * commit_times
);
}

#[test]
fn default_compact_policy() {
default_compact_policy_when_commit(0);
default_compact_policy_when_commit(10);
default_compact_policy_when_commit(100);
default_compact_policy_when_commit(1000);
}
}
4 changes: 3 additions & 1 deletion core/src/layers/2-edit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ mod edit;
mod journal;

pub use self::edit::{Edit, EditGroup};
pub use self::journal::{CompactPolicy, EditJournal, EditJournalMeta, NeverCompactPolicy};
pub use self::journal::{
CompactPolicy, DefaultCompactPolicy, EditJournal, EditJournalMeta, NeverCompactPolicy,
};
Loading
Loading