Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure quick-repair commit on shutdown #897

Merged
merged 1 commit into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use crate::tree_store::{
TableType, TransactionalMemory, PAGE_SIZE,
};
use crate::types::{Key, Value};
use crate::{CompactionError, DatabaseError, ReadOnlyTable, SavepointError, StorageError};
use crate::{CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError};
use crate::{ReadTransaction, Result, WriteTransaction};
use std::fmt::{Debug, Display, Formatter};

use std::fs::{File, OpenOptions};
use std::io;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::ops::RangeFull;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::{io, thread};

use crate::error::TransactionError;
use crate::sealed::Sealed;
Expand Down Expand Up @@ -713,9 +713,10 @@ impl Database {
if mem.needs_repair()? {
// If the last transaction used 2-phase commit and updated the allocator state table, then
// we can just load the allocator state from there. Otherwise, we need a full repair
if Self::try_quick_repair(mem.clone())? {
if let Some(tree) = Self::get_allocator_state_table(&mem)? {
#[cfg(feature = "logging")]
info!("Quick-repair successful, full repair not needed");
info!("Found valid allocator state, full repair not needed");
mem.load_allocator_state(&tree)?;
} else {
#[cfg(feature = "logging")]
warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
Expand Down Expand Up @@ -770,14 +771,15 @@ impl Database {
Ok(db)
}

// Returns true if quick-repair was successful, or false if a full repair is needed
fn try_quick_repair(mem: Arc<TransactionalMemory>) -> Result<bool> {
// Quick-repair is only possible if the primary was written using 2-phase commit
fn get_allocator_state_table(
mem: &Arc<TransactionalMemory>,
) -> Result<Option<AllocatorStateTree>> {
// The allocator state table is only valid if the primary was written using 2-phase commit
if !mem.used_two_phase_commit() {
return Ok(false);
return Ok(None);
}

// See if the allocator state table is present in the system table tree
// See if it's present in the system table tree
let fake_freed_pages = Arc::new(Mutex::new(vec![]));
let system_table_tree = TableTreeMut::new(
mem.get_system_root(),
Expand All @@ -789,10 +791,10 @@ impl Database {
.get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
.map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
else {
return Ok(false);
return Ok(None);
};

// Load the allocator state from the table
// Load the allocator state table
let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
unreachable!();
};
Expand All @@ -803,7 +805,12 @@ impl Database {
fake_freed_pages,
);

mem.try_load_allocator_state(&tree)
// Make sure this isn't stale allocator state left over from a previous transaction
if !mem.is_valid_allocator_state(&tree)? {
return Ok(None);
}

Ok(Some(tree))
}

fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
Expand Down Expand Up @@ -851,6 +858,35 @@ impl Database {
debug!("Beginning read transaction id={:?}", guard.id());
ReadTransaction::new(self.get_memory(), guard)
}

/// Makes sure the allocator state table reflects the latest commit.
///
/// `get_allocator_state_table` yields `None` when the primary was not written with
/// 2-phase commit, when the table is absent from the system table tree, or when the
/// stored state is stale. In any of those cases an empty write transaction with
/// quick-repair enabled is committed so the table gets rewritten.
///
/// # Errors
///
/// Propagates any failure from inspecting the table, starting the write transaction,
/// or committing it.
fn ensure_allocator_state_table(&self) -> Result<(), Error> {
    // Only a missing or outdated table requires a fresh quick-repair commit
    if Self::get_allocator_state_table(&self.mem)?.is_none() {
        #[cfg(feature = "logging")]
        debug!("Writing allocator state table");
        let mut txn = self.begin_write()?;
        txn.set_quick_repair(true);
        txn.commit()?;
    }

    Ok(())
}
}

impl Drop for Database {
    /// Best-effort attempt to leave a valid allocator state table behind on shutdown.
    ///
    /// Nothing is attempted while the current thread is panicking (presumably to avoid
    /// doing further database work during unwinding -- confirm with the maintainers).
    /// A failure is never propagated; it is only logged when the `logging` feature is
    /// enabled.
    fn drop(&mut self) {
        // Short-circuit keeps the write from being attempted during a panic
        if !thread::panicking() && self.ensure_allocator_state_table().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failed to write allocator state table. Repair may be required at restart.");
        }
    }
}

pub struct RepairSession {
Expand Down
10 changes: 8 additions & 2 deletions src/tree_store/page_store/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ mod test {
use crate::db::TableDefinition;
use crate::tree_store::page_store::header::{
GOD_BYTE_OFFSET, MAGICNUMBER, PAGE_SIZE, PRIMARY_BIT, RECOVERY_REQUIRED,
TRANSACTION_0_OFFSET, TRANSACTION_1_OFFSET, USER_ROOT_OFFSET,
TRANSACTION_0_OFFSET, TRANSACTION_1_OFFSET, TWO_PHASE_COMMIT, USER_ROOT_OFFSET,
};
use crate::tree_store::page_store::TransactionalMemory;
#[cfg(not(target_os = "windows"))]
Expand All @@ -467,8 +467,12 @@ mod test {
// Start a read to be sure the previous write isn't garbage collected
let read_txn = db.begin_read().unwrap();

let write_txn = db.begin_write().unwrap();
let mut write_txn = db.begin_write().unwrap();
{
// We want this to be the last commit before the database is closed, so it needs to
// use quick-repair -- otherwise, Database::drop() will generate its own quick-repair
// commit on shutdown
write_txn.set_quick_repair(true);
let mut table = write_txn.open_table(X).unwrap();
table.insert("hello", "world2").unwrap();
}
Expand All @@ -487,6 +491,7 @@ mod test {
file.read_exact(&mut buffer).unwrap();
file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
buffer[0] |= RECOVERY_REQUIRED;
buffer[0] &= !TWO_PHASE_COMMIT;
file.write_all(&buffer).unwrap();

// Overwrite the primary checksum to simulate a failure during commit
Expand Down Expand Up @@ -621,6 +626,7 @@ mod test {
file.read_exact(&mut buffer).unwrap();
file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
buffer[0] |= RECOVERY_REQUIRED;
buffer[0] &= !TWO_PHASE_COMMIT;
file.write_all(&buffer).unwrap();

assert!(TransactionalMemory::new(
Expand Down
20 changes: 10 additions & 10 deletions src/tree_store/page_store/page_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::tree_store::{Page, PageNumber};
use crate::StorageBackend;
use crate::{DatabaseError, Result, StorageError};
#[cfg(feature = "logging")]
use log::{debug, warn};
use log::warn;
use std::cmp::{max, min};
#[cfg(debug_assertions)]
use std::collections::HashMap;
Expand Down Expand Up @@ -461,9 +461,8 @@ impl TransactionalMemory {
Ok(true)
}

// Returns true on success, or false if the allocator state was stale (in which case we need
// to fall back to a full repair)
pub(crate) fn try_load_allocator_state(&self, tree: &AllocatorStateTree) -> Result<bool> {
// Returns true if the allocator state table is up to date, or false if it's stale
pub(crate) fn is_valid_allocator_state(&self, tree: &AllocatorStateTree) -> Result<bool> {
// See if this is stale allocator state left over from a previous transaction. That won't
// happen during normal operation, since WriteTransaction::commit() always updates the
// allocator state table before calling TransactionalMemory::commit(), but there are also
Expand All @@ -478,11 +477,12 @@ impl TransactionalMemory {
.try_into()
.unwrap(),
));
if transaction_id != self.get_last_committed_transaction_id()? {
#[cfg(feature = "logging")]
debug!("Ignoring stale allocator state from {:?}", transaction_id);
return Ok(false);
}

Ok(transaction_id == self.get_last_committed_transaction_id()?)
}

pub(crate) fn load_allocator_state(&self, tree: &AllocatorStateTree) -> Result {
assert!(self.is_valid_allocator_state(tree)?);

// Load the allocator state
let mut region_allocators = vec![];
Expand Down Expand Up @@ -517,7 +517,7 @@ impl TransactionalMemory {
self.state.lock().unwrap().header.recovery_required = false;
self.needs_recovery.store(false, Ordering::Release);

Ok(true)
Ok(())
}

pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
Expand Down