Fix possible inconsistent state in journal and optimize handling BVT/BALs #39

Merged
merged 2 commits into from
Jun 19, 2024
76 changes: 46 additions & 30 deletions core/src/layers/3-log/chunk.rs
@@ -68,7 +68,8 @@ impl ChunkAlloc {
}

/// Constructs a `ChunkAlloc` from its parts.
pub(super) fn from_parts(state: ChunkAllocState, tx_provider: Arc<TxProvider>) -> Self {
pub(super) fn from_parts(mut state: ChunkAllocState, tx_provider: Arc<TxProvider>) -> Self {
state.in_journal = false;
let new_self = Self {
state: Arc::new(Mutex::new(state)),
tx_provider,
@@ -147,6 +148,7 @@ impl ChunkAlloc {
}
}
}
ids.sort_unstable();
ids
};

@@ -215,7 +217,7 @@ impl Debug for ChunkAlloc {
let state = self.state.lock();
f.debug_struct("ChunkAlloc")
.field("bitmap_free_count", &state.free_count)
.field("bitmap_min_free", &state.min_free)
.field("bitmap_next_free", &state.next_free)
.finish()
}
}
@@ -232,9 +234,11 @@ pub struct ChunkAllocState {
alloc_map: BitMap,
// The number of free chunks.
free_count: usize,
// The minimum free chunk Id. Useful to narrow the scope of searching for
// free chunk IDs.
min_free: usize,
// The next free chunk Id. Used to narrow the scope of
// searching for free chunk IDs.
next_free: usize,
/// Whether the state is in the journal or not.
in_journal: bool,
}
// TODO: Separate persistent and volatile state of `ChunkAlloc`

@@ -245,26 +249,43 @@ impl ChunkAllocState {
Self {
alloc_map: BitMap::repeat(false, capacity),
free_count: capacity,
min_free: 0,
next_free: 0,
in_journal: false,
}
}

/// Creates a persistent state kept in the journal. The state in the journal and
/// the state that `RawLogStore` manages behave differently on allocation and
/// when edits are applied.
pub fn new_in_journal(capacity: usize) -> Self {
Self {
alloc_map: BitMap::repeat(false, capacity),
free_count: capacity,
next_free: 0,
in_journal: true,
}
}

/// Allocates a chunk, returning its ID.
pub fn alloc(&mut self) -> Option<ChunkId> {
let min_free = self.min_free;
if min_free >= self.alloc_map.len() {
return None;
let mut next_free = self.next_free;
if next_free == self.alloc_map.len() {
next_free = 0;
}

let free_chunk_id = self
.alloc_map
.first_zero(min_free)
.expect("there must exists a zero");
let free_chunk_id = {
if let Some(chunk_id) = self.alloc_map.first_zero(next_free) {
chunk_id
} else {
self.alloc_map
.first_zero(0)
.expect("there must exists a zero")
}
};

self.alloc_map.set(free_chunk_id, true);
self.free_count -= 1;

// Keep the invariance that all free chunk IDs are no less than `min_free`
self.min_free = free_chunk_id + 1;
self.next_free = free_chunk_id + 1;

Some(free_chunk_id)
}
@@ -275,14 +296,9 @@ impl ChunkAllocState {
///
/// Deallocating a free chunk causes panic.
pub fn dealloc(&mut self, chunk_id: ChunkId) {
// debug_assert_eq!(self.alloc_map[chunk_id], true); // may fail in journal's commit
debug_assert_eq!(self.alloc_map[chunk_id], true);
self.alloc_map.set(chunk_id, false);
self.free_count += 1;

// Keep the invariance that all free chunk IDs are no less than min_free
if chunk_id < self.min_free {
self.min_free = chunk_id;
}
}

/// Returns the total number of chunks.
@@ -306,15 +322,15 @@ impl ChunkAllocState {
////////////////////////////////////////////////////////////////////////////////

/// A persistent edit to the state of a chunk allocator.
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChunkAllocEdit {
edit_table: HashMap<ChunkId, ChunkEdit>,
}

/// The smallest unit of a persistent edit to the
/// state of a chunk allocator, which is
/// a chunk being either allocated or deallocated.
#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
enum ChunkEdit {
Alloc,
Dealloc,
@@ -379,23 +395,23 @@ impl ChunkAllocEdit {

impl Edit<ChunkAllocState> for ChunkAllocEdit {
fn apply_to(&self, state: &mut ChunkAllocState) {
let mut to_be_deallocated = Vec::new();
for (&chunk_id, chunk_edit) in &self.edit_table {
match chunk_edit {
ChunkEdit::Alloc => {
// Journal's state also needs to be updated
if !state.is_chunk_allocated(chunk_id) {
if state.in_journal {
let _allocated_id = state.alloc().unwrap();
// `_allocated_id` may not be equal to `chunk_id` due to concurrent TXs,
// but eventually the state will be consistent
}

// Except journal, nothing needs to be done
}
ChunkEdit::Dealloc => {
state.dealloc(chunk_id);
to_be_deallocated.push(chunk_id);
}
}
}
for chunk_id in to_be_deallocated {
state.dealloc(chunk_id);
}
}
}

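The allocation change above replaces the old `min_free` hint with a `next_free` cursor that wraps around: the search starts at the cursor and, if no free bit is found in the tail of the bitmap, falls back to a second scan from index 0. Below is a minimal stand-alone sketch of that search order, using a plain Vec<bool> in place of the crate's `BitMap`; the `alloc_wrapping` helper is hypothetical, written only to illustrate the behavior, and is not part of the patch.

// Minimal model of the wrap-around search in `ChunkAllocState::alloc`.
// `bitmap[i] == true` means chunk `i` is allocated.
fn alloc_wrapping(bitmap: &mut [bool], next_free: &mut usize) -> Option<usize> {
    if *next_free >= bitmap.len() {
        *next_free = 0;
    }
    // Scan from the cursor first, then wrap around to the beginning.
    let found = bitmap[*next_free..]
        .iter()
        .position(|&b| !b)
        .map(|off| *next_free + off)
        .or_else(|| bitmap.iter().position(|&b| !b))?;

    bitmap[found] = true;
    *next_free = found + 1;
    Some(found)
}

fn main() {
    let mut bitmap = vec![false; 3];
    let mut next_free = 0;
    assert_eq!(alloc_wrapping(&mut bitmap, &mut next_free), Some(0));
    assert_eq!(alloc_wrapping(&mut bitmap, &mut next_free), Some(1));
    // Chunk 0 is freed; once the tail of the bitmap is exhausted,
    // the search wraps around and reuses it instead of giving up.
    bitmap[0] = false;
    assert_eq!(alloc_wrapping(&mut bitmap, &mut next_free), Some(2));
    assert_eq!(alloc_wrapping(&mut bitmap, &mut next_free), Some(0));
}

Because `alloc` only ever advances the cursor (`self.next_free = free_chunk_id + 1`), low-numbered chunks freed by `dealloc` are picked up again after the cursor wraps, which is why the eager `min_free` bookkeeping in `dealloc` could be removed.
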
54 changes: 27 additions & 27 deletions core/src/layers/3-log/raw_log.rs
@@ -235,7 +235,7 @@ impl<D: BlockSet> RawLogStore<D> {
/// This method must be called within a TX. Otherwise, this method panics.
pub fn create_log(&self) -> Result<RawLog<D>> {
let mut state = self.state.lock();
let new_log_id = state.persistent.alloc_log_id();
let new_log_id = state.alloc_log_id();
state
.add_to_write_set(new_log_id)
.expect("created log can't appear in write set");
@@ -335,10 +335,7 @@ impl<D> Debug for RawLogStore<D> {
let state = self.state.lock();
f.debug_struct("RawLogStore")
.field("persistent_log_table", &state.persistent.log_table)
.field(
"persistent_next_free_log_id",
&state.persistent.next_free_log_id,
)
.field("next_free_log_id", &state.next_free_log_id)
.field("write_set", &state.write_set)
.field("chunk_alloc", &self.chunk_alloc)
.finish()
@@ -612,11 +609,12 @@ impl<'a> RawLogHeadRef<'a> {
debug_assert!(offset + nblocks <= self.entry.head.num_blocks as _);

let prepared_blocks = self.prepare_blocks(offset, nblocks);
debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted());
debug_assert_eq!(prepared_blocks.len(), nblocks);

// Batch read
// Note that `prepared_blocks` are not always sorted
let mut offset = 0;
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) {
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) {
let len = consecutive_blocks.len();
let first_bid = *consecutive_blocks.first().unwrap();
let buf_slice =
@@ -687,11 +685,12 @@ impl<'a> RawLogTailRef<'a> {
debug_assert!(offset + nblocks <= tail_nblocks);

let prepared_blocks = self.prepare_blocks(offset, nblocks);
debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted());
debug_assert_eq!(prepared_blocks.len(), nblocks);

// Batch read
// Note that `prepared_blocks` are not always sorted
let mut offset = 0;
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) {
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) {
let len = consecutive_blocks.len();
let first_bid = *consecutive_blocks.first().unwrap();
let buf_slice =
@@ -707,11 +706,12 @@ impl<'a> RawLogTailRef<'a> {
let nblocks = buf.nblocks();

let prepared_blocks = self.prepare_blocks(self.len() as _, nblocks);
debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted());
debug_assert_eq!(prepared_blocks.len(), nblocks);

// Batch write
// Note that `prepared_blocks` are not always sorted
let mut offset = 0;
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) {
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) {
let len = consecutive_blocks.len();
let first_bid = *consecutive_blocks.first().unwrap();
let buf_slice = &buf.as_slice()[offset * BLOCK_SIZE..(offset + len) * BLOCK_SIZE];
@@ -781,6 +781,7 @@ impl<'a> RawLogTailRef<'a> {
/// The volatile and persistent state of a `RawLogStore`.
struct State {
persistent: RawLogStoreState,
next_free_log_id: u64,
write_set: HashSet<RawLogId>,
lazy_deletes: HashMap<RawLogId, Arc<LazyDelete<RawLogEntry>>>,
}
@@ -789,7 +790,6 @@ struct State {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RawLogStoreState {
log_table: HashMap<RawLogId, RawLogEntry>,
next_free_log_id: u64,
}

/// A log entry implies the persistent state of the raw log.
@@ -810,8 +810,14 @@ impl State {
persistent: RawLogStoreState,
lazy_deletes: HashMap<RawLogId, Arc<LazyDelete<RawLogEntry>>>,
) -> Self {
let next_free_log_id = if let Some(max_log_id) = lazy_deletes.keys().max() {
max_log_id + 1
} else {
0
};
Self {
persistent: persistent.clone(),
next_free_log_id,
write_set: HashSet::new(),
lazy_deletes,
}
@@ -821,6 +827,15 @@ impl State {
edit.apply_to(&mut self.persistent);
}

pub fn alloc_log_id(&mut self) -> u64 {
let new_log_id = self.next_free_log_id;
self.next_free_log_id = self
.next_free_log_id
.checked_add(1)
.expect("64-bit IDs won't be exhausted even though IDs are not recycled");
new_log_id
}

pub fn add_to_write_set(&mut self, log_id: RawLogId) -> Result<()> {
let not_exists = self.write_set.insert(log_id);
if !not_exists {
@@ -840,19 +855,9 @@ impl RawLogStoreState {
pub fn new() -> Self {
Self {
log_table: HashMap::new(),
next_free_log_id: 0,
}
}

pub fn alloc_log_id(&mut self) -> u64 {
let new_log_id = self.next_free_log_id;
self.next_free_log_id = self
.next_free_log_id
.checked_add(1)
.expect("64-bit IDs won't be exhausted even though IDs are not recycled");
new_log_id
}

pub fn create_log(&mut self, new_log_id: u64) {
let new_log_entry = RawLogEntry {
head: RawLogHead::new(),
@@ -1039,11 +1044,6 @@ impl Edit<RawLogStoreState> for RawLogStoreEdit {
let RawLogCreate { tail } = create;
state.create_log(log_id);
state.append_log(log_id, tail);

// Journal's state also needs to be updated
if state.next_free_log_id <= log_id {
let _ = state.alloc_log_id();
}
}
RawLogEdit::Append(append) => {
let RawLogAppend { tail } = append;
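The `saturating_sub` change in the three batched I/O paths above exists because, with the wrap-around chunk allocator, the block IDs returned by `prepare_blocks` are no longer guaranteed to be sorted (hence the `debug_assert` was relaxed to check length only). An unsigned `b2 - b1` would panic on a descending pair, while `b2.saturating_sub(*b1)` yields 0 and simply ends the current run. A small stand-alone sketch of the grouping, using the stable `chunk_by` and plain u64 values as a stand-in for the crate's block ID type:

fn main() {
    // Block IDs as `prepare_blocks` might return them; not necessarily
    // sorted once the chunk allocator has wrapped around.
    let prepared_blocks: Vec<u64> = vec![10, 11, 12, 3, 4, 20];

    // `b2 - b1 == 1` would underflow on the (12, 3) pair; `saturating_sub`
    // makes the predicate false there, so the run ends and a new one starts.
    for run in prepared_blocks.chunk_by(|b1, b2| b2.saturating_sub(*b1) == 1) {
        let first_bid = run.first().unwrap();
        // One batched read/write per run of consecutive blocks:
        // (start 10, len 3), (start 3, len 2), (start 20, len 1).
        println!("batch: start = {first_bid}, len = {}", run.len());
    }
}

The `group_by` used in the diff behaves the same way here; only the predicate changed.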