Skip to content

Commit

Permalink
Provide a default compaction policy for journal
Browse files Browse the repository at this point in the history
  • Loading branch information
cqs21 committed Jun 3, 2024
1 parent ed90406 commit 5586008
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 56 deletions.
203 changes: 165 additions & 38 deletions core/src/layers/2-edit/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ where
disk: D,
init_state: S,
state_max_nbytes: usize,
compaction_policy: P,
mut compaction_policy: P,
) -> Result<EditJournal<E, S, D, P>> {
// Create `SnapshotManager` to persist the init state.
let snapshots = SnapshotManager::create(&disk, &init_state, state_max_nbytes)?;
Expand All @@ -111,6 +111,7 @@ where
let mut write_buf = WriteBuf::new(CryptoChain::<BlockRing<D>>::AVAIL_BLOCK_SIZE);
write_buf.write(&Record::Version(mac))?;
journal_chain.append(write_buf.as_slice())?;
compaction_policy.on_append_journal(1);
write_buf.clear();
journal_chain.flush()?;

Expand Down Expand Up @@ -233,37 +234,37 @@ where
// TODO: sync disk first to ensure data are persisted before
// journal records.

self.append_write_buf_to_journal();

let is_second_try_success = self.write_buf.write(record).is_ok();
if is_second_try_success == false {
panic!("the write buffer must have enough free space");
}
}

fn append_write_buf_to_journal(&mut self) {
let write_data = self.write_buf.as_slice();
if write_data.len() == 0 {
return;
}

self.journal_chain
.append(write_data)
// TODO: how to handle I/O error in journaling?
.expect("we cannot handle I/O error in journaling gracefully");
self.compaction_policy.on_append_journal(1);
self.write_buf.clear();

if self.compaction_policy.should_compact() {
if self.compact().is_err() {
// TODO: how to handle a compaction failure?
panic!("the journal chain compact failed");
}
self.compaction_policy.done_compact();
}

let is_second_try_success = self.write_buf.write(record).is_ok();
if is_second_try_success == false {
panic!("the write buffer must have enough free space");
// TODO: how to handle a compaction failure?
let compacted_blocks = self.compact().expect("journal chain compaction failed");
self.compaction_policy.done_compact(compacted_blocks);
}
}

/// Ensure that all committed edits are persisted to disk.
pub fn flush(&mut self) -> Result<()> {
let buf = self.write_buf.as_slice();
if buf.len() != 0 {
self.journal_chain
.append(buf)
// TODO: how to handle I/O error when append the journal_chain.
.expect("journal_chain append failed");
self.write_buf.clear();
}
self.append_write_buf_to_journal();
self.journal_chain.flush()
}

Expand All @@ -272,9 +273,9 @@ where
self.curr_edit_group.as_mut().map(|edits| edits.clear());
}

fn compact(&mut self) -> Result<()> {
fn compact(&mut self) -> Result<usize> {
if self.journal_chain.block_range().is_empty() {
return Ok(());
return Ok(0);
}

// Persist current state to latest snapshot.
Expand All @@ -285,15 +286,19 @@ where
// Persist the MAC of latest_snapshot.
let mac = self.snapshots.latest_mac();
self.write_buf.write(&Record::Version(mac))?;
self.flush()?;
self.journal_chain.append(self.write_buf.as_slice())?;
self.compaction_policy.on_append_journal(1);
self.write_buf.clear();

// The latest_snapshot has been persisted, now trim the journal_chain.
// And ensure that there is at least one valid block after trimming.
if self.journal_chain.block_range().len() > 1 {
let old_chain_len = self.journal_chain.block_range().len();
if old_chain_len > 1 {
self.journal_chain
.trim(self.journal_chain.block_range().end - 1);
}
Ok(())
let new_chain_len = self.journal_chain.block_range().len();
Ok(old_chain_len - new_chain_len)
}
}

Expand Down Expand Up @@ -708,11 +713,20 @@ pub trait CompactPolicy<E: Edit<S>, S> {
/// decide that now is the time to compact.
fn on_commit_edits(&mut self, edits: &EditGroup<E, S>);

/// Called when some edits are appended to `CryptoChain`.
///
/// The `appended_blocks` indicates how many blocks of journal area are
/// occupied by those edits.
fn on_append_journal(&mut self, appended_blocks: usize);

/// Returns whether now is a good timing for compaction.
fn should_compact(&self) -> bool;

/// Reset the state, as if no edits have ever been added.
fn done_compact(&mut self);
///
/// The `compacted_blocks` indicates how many blocks are reclaimed during
/// this compaction.
fn done_compact(&mut self, compacted_blocks: usize);
}

/// A never-do-compaction policy. Mostly useful for testing.
Expand All @@ -721,19 +735,75 @@ pub struct NeverCompactPolicy;
impl<E: Edit<S>, S> CompactPolicy<E, S> for NeverCompactPolicy {
fn on_commit_edits(&mut self, _edits: &EditGroup<E, S>) {}

fn on_append_journal(&mut self, _appended_nblocks: usize) {}

fn should_compact(&self) -> bool {
false
}

fn done_compact(&mut self) {}
fn done_compact(&mut self, _compacted_blocks: usize) {}
}

/// A compaction policy, triggered when there's no-space left for new edits.
pub struct DefaultCompactPolicy {
used_blocks: usize,
total_blocks: usize,
}

impl DefaultCompactPolicy {
/// Constructs a `DefaultCompactPolicy`.
///
/// It is initialized via the total number of blocks of `EditJournal` and state.
pub fn new<D: BlockSet>(disk_nblocks: usize, state_max_nbytes: usize) -> Self {
// Calculate the blocks used by `Snapshot`s.
let snapshot_bytes =
CryptoBlob::<D>::HEADER_NBYTES + state_max_nbytes + Snapshot::<D>::meta_len();
let snapshot_blocks = (snapshot_bytes + BLOCK_SIZE - 1) / BLOCK_SIZE;
debug_assert!(
snapshot_blocks * 2 < disk_nblocks,
"the number of blocks of journal area are too small"
);

Self {
used_blocks: 0,
total_blocks: disk_nblocks - snapshot_blocks * 2,
}
}

/// Constructs a `DefaultCompactPolicy` from `EditJournalMeta`.
pub fn from_meta(meta: &EditJournalMeta) -> Self {
Self {
used_blocks: 0,
total_blocks: meta.journal_area_nblocks,
}
}
}

impl<E: Edit<S>, S> CompactPolicy<E, S> for DefaultCompactPolicy {
fn on_commit_edits(&mut self, _edits: &EditGroup<E, S>) {}

fn on_append_journal(&mut self, nblocks: usize) {
self.used_blocks += nblocks;
}

fn should_compact(&self) -> bool {
self.used_blocks >= self.total_blocks
}

fn done_compact(&mut self, compacted_blocks: usize) {
debug_assert!(self.used_blocks >= compacted_blocks);
self.used_blocks -= compacted_blocks;
}
}

#[cfg(test)]
mod tests {
use super::{CompactPolicy, Edit, EditGroup, EditJournal, Record, RecordSlice, WriteBuf};
use super::{
CompactPolicy, DefaultCompactPolicy, Edit, EditGroup, EditJournal, Record, RecordSlice,
WriteBuf,
};
use crate::layers::bio::{BlockSet, MemDisk, BLOCK_SIZE};
use crate::layers::crypto::Mac;
use crate::os::Mutex;
use crate::prelude::*;
use pod::Pod;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -762,14 +832,14 @@ mod tests {
/// The `EditJournal` must have enough space to persist the threshold
/// of appended blocks, to avoid overlapping.
struct ThresholdPolicy {
appended: Mutex<usize>,
appended: usize,
threshold: usize,
}

impl ThresholdPolicy {
pub fn new(threshold: usize) -> Self {
Self {
appended: Mutex::new(0),
appended: 0,
threshold,
}
}
Expand All @@ -778,14 +848,16 @@ mod tests {
impl CompactPolicy<XEdit, XState> for ThresholdPolicy {
fn on_commit_edits(&mut self, _edits: &EditGroup<XEdit, XState>) {}

fn on_append_journal(&mut self, nblocks: usize) {
self.appended += nblocks;
}

fn should_compact(&self) -> bool {
let mut appended = self.appended.lock();
*appended += 1;
*appended >= self.threshold
self.appended >= self.threshold
}

fn done_compact(&mut self) {
*self.appended.lock() = 0;
fn done_compact(&mut self, _compacted_blocks: usize) {
self.appended = 0;
}
}

Expand Down Expand Up @@ -901,6 +973,8 @@ mod tests {
}
};

journal.flush().unwrap();

let journal_disk = disk.subset(0..32).unwrap();
let threshold_policy = ThresholdPolicy::new(1_000);
let recover = EditJournal::recover(journal_disk, &meta, threshold_policy).unwrap();
Expand All @@ -913,12 +987,10 @@ mod tests {
println!("append times: {}", append_times);
assert_eq!(
recover.state().sum as usize,
(0 + 999) * 1000 / 2 * (append_times * 2)
(0 + 999) * 1000 / 2 * commit_times
);
let compact_times = append_times / threshold;
println!("compact times: {}", compact_times);
let block_range_len = append_times % threshold + 1;
assert_eq!(recover.journal_chain.block_range().len(), block_range_len);
}

#[test]
Expand All @@ -937,4 +1009,59 @@ mod tests {
// Compact many times.
append_and_recover(5, 1000);
}

/// A test case for `DefaultCompactPolicy`.
///
/// The `commit_times` is used to control the number of `EditGroup` committed.
fn default_compact_policy_when_commit(commit_times: usize) {
let disk = MemDisk::create(16).unwrap();

let journal_disk = disk.subset(0..12).unwrap();
let state_max_nbytes = core::mem::size_of::<XState>() * 2;
let compact_policy =
DefaultCompactPolicy::new::<MemDisk>(journal_disk.nblocks(), state_max_nbytes);
let mut journal: EditJournal<XEdit, XState, MemDisk, DefaultCompactPolicy> =
EditJournal::format(
journal_disk,
XState { sum: 0 },
state_max_nbytes,
compact_policy,
)
.unwrap();
let meta = journal.meta();
assert_eq!(meta.snapshot_area_nblocks, 1);
assert_eq!(meta.journal_area_nblocks, 10);
{
println!("journaling started");
// The `WriteBuf` could hold two `EditGroup` in this test.
for _ in 0..commit_times {
for x in 0..1000 {
let edit = XEdit { x };
journal.add(edit);
}
journal.commit();
println!("state: {}", journal.state().sum);
}
};

journal.flush().unwrap();

let journal_disk = disk.subset(0..12).unwrap();
let compact_policy = DefaultCompactPolicy::from_meta(&meta);
let recover: EditJournal<XEdit, XState, MemDisk, DefaultCompactPolicy> =
EditJournal::recover(journal_disk, &meta, compact_policy).unwrap();
println!("recover state: {}", recover.state().sum);
assert_eq!(
recover.state().sum as usize,
(0 + 999) * 1000 / 2 * commit_times
);
}

#[test]
fn default_compact_policy() {
default_compact_policy_when_commit(0);
default_compact_policy_when_commit(10);
default_compact_policy_when_commit(100);
default_compact_policy_when_commit(1000);
}
}
4 changes: 3 additions & 1 deletion core/src/layers/2-edit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ mod edit;
mod journal;

pub use self::edit::{Edit, EditGroup};
pub use self::journal::{CompactPolicy, EditJournal, EditJournalMeta, NeverCompactPolicy};
pub use self::journal::{
CompactPolicy, DefaultCompactPolicy, EditJournal, EditJournalMeta, NeverCompactPolicy,
};
26 changes: 9 additions & 17 deletions core/src/layers/3-log/tx_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,14 @@ impl<D: BlockSet + 'static> TxLogStore<D> {
raw_log_store: RawLogStoreState::new(),
tx_log_store: tx_log_store_state.clone(),
};
let state_max_nbytes = 1048576; // TBD
let compaction_policy =
JournalCompactPolicy::new::<D>(journal_area.nblocks(), state_max_nbytes);
Arc::new(Mutex::new(Journal::format(
journal_area,
all_state,
1048576, // TBD
JournalCompactPolicy {},
state_max_nbytes,
compaction_policy,
)?))
};

Expand Down Expand Up @@ -185,7 +188,8 @@ impl<D: BlockSet + 'static> TxLogStore<D> {
1 + superblock.chunk_area_nblocks
..1 + superblock.chunk_area_nblocks + journal_area_meta.total_nblocks(),
)?;
Journal::recover(journal_area, &journal_area_meta, JournalCompactPolicy {})?
let compaction_policy = JournalCompactPolicy::from_meta(journal_area_meta);
Journal::recover(journal_area, &journal_area_meta, compaction_policy)?
};
let all_state = journal.state();

Expand Down Expand Up @@ -1287,10 +1291,10 @@ impl TxData for OpenLogCache {}

mod journaling {
use super::*;
use crate::layers::edit::EditGroup;
use crate::layers::edit::DefaultCompactPolicy;

pub type Journal<D> = EditJournal<AllEdit, AllState, D, JournalCompactPolicy>;
pub type JournalCompactPolicy = NeverCompactPolicy;
pub type JournalCompactPolicy = DefaultCompactPolicy;

#[derive(Clone, Serialize, Deserialize)]
pub struct AllState {
Expand Down Expand Up @@ -1339,18 +1343,6 @@ mod journaling {
}
}
}

pub struct NeverCompactPolicy;

impl CompactPolicy<AllEdit, AllState> for JournalCompactPolicy {
fn on_commit_edits(&mut self, _edits: &EditGroup<AllEdit, AllState>) {}

fn should_compact(&self) -> bool {
false
}

fn done_compact(&mut self) {}
}
}

#[cfg(test)]
Expand Down

0 comments on commit 5586008

Please sign in to comment.