Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't mark the region tracker page as allocated #899

Merged
merged 3 commits into from
Nov 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 56 additions & 46 deletions src/tree_store/page_store/page_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,26 +361,7 @@ impl TransactionalMemory {
}

pub(crate) fn end_repair(&self) -> Result<()> {
let mut state = self.state.lock().unwrap();
let tracker_len = state.allocators.region_tracker.to_vec().len();
let tracker_page = state.header.region_tracker();

let allocator = state.get_region_mut(tracker_page.region);
// Allocate a new tracker page, if the old one was overwritten or is too small
if allocator.is_allocated(tracker_page.page_index, tracker_page.page_order)
|| tracker_page.page_size_bytes(self.page_size) < tracker_len as u64
{
drop(state);
let new_tracker_page = self.allocate(tracker_len)?.get_page_number();

let mut state = self.state.lock().unwrap();
state.header.set_region_tracker(new_tracker_page);
self.write_header(&state.header)?;
self.storage.flush(false)?;
} else {
allocator.record_alloc(tracker_page.page_index, tracker_page.page_order);
drop(state);
}
self.allocate_region_tracker_page()?;

let mut state = self.state.lock().unwrap();
let tracker_page = state.header.region_tracker();
Expand Down Expand Up @@ -436,10 +417,30 @@ impl TransactionalMemory {
num_regions: u32,
) -> Result<bool> {
// Has the number of regions changed since reserve_allocator_state() was called?
if num_regions != self.state.lock().unwrap().header.layout().num_regions() {
let state = self.state.lock().unwrap();
if num_regions != state.header.layout().num_regions() {
return Ok(false);
}

// Temporarily free the region tracker page, because we don't want to include it in our
// recorded allocations
let tracker_page = state.header.region_tracker();
drop(state);
self.free(tracker_page);

let result = self.try_save_allocator_state_inner(tree, num_regions);

// Restore the region tracker page
self.mark_page_allocated(tracker_page);

result
}

fn try_save_allocator_state_inner(
&self,
tree: &mut AllocatorStateTree,
num_regions: u32,
) -> Result<bool> {
for i in 0..num_regions {
let region_bytes =
&self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec();
Expand Down Expand Up @@ -509,10 +510,8 @@ impl TransactionalMemory {
state.allocators.resize_to(layout);
drop(state);

// Allocate a larger region tracker page if necessary. This also happens automatically on
// shutdown, but we do it here because we want our allocator state to exactly match the
// result of running a full repair
self.ensure_region_tracker_page()?;
// Allocate a page for the region tracker
self.allocate_region_tracker_page()?;

self.state.lock().unwrap().header.recovery_required = false;
self.needs_recovery.store(false, Ordering::Release);
Expand All @@ -527,22 +526,33 @@ impl TransactionalMemory {
allocator.is_allocated(page.page_index, page.page_order)
}

// Make sure we have a large enough region-tracker page. This uses allocate_non_transactional(),
// so it should only be called from outside a transaction
fn ensure_region_tracker_page(&self) -> Result {
let state = self.state.lock().unwrap();
// Allocate a page for the region tracker. If possible, this will pick the same page that
// was used last time; otherwise it'll pick a new page and update the database header to
// match
fn allocate_region_tracker_page(&self) -> Result {
let mut state = self.state.lock().unwrap();
let tracker_len = state.allocators.region_tracker.to_vec().len();
let mut tracker_page = state.header.region_tracker();
drop(state);
let tracker_page = state.header.region_tracker();

let allocator = state.get_region_mut(tracker_page.region);
// Pick a new tracker page, if the old one was overwritten or is too small
if allocator.is_allocated(tracker_page.page_index, tracker_page.page_order)
|| tracker_page.page_size_bytes(self.page_size) < tracker_len as u64
{
drop(state);

if tracker_page.page_size_bytes(self.page_size) < (tracker_len as u64) {
// Allocate a larger tracker page
self.free(tracker_page);
tracker_page = self
.allocate_non_transactional(tracker_len)?
let new_tracker_page = self
.allocate_non_transactional(tracker_len, false)?
.get_page_number();

let mut state = self.state.lock().unwrap();
state.header.set_region_tracker(tracker_page);
state.header.set_region_tracker(new_tracker_page);
self.write_header(&state.header)?;
self.storage.flush(false)?;
} else {
// The old page is available, so just mark it as allocated
allocator.record_alloc(tracker_page.page_index, tracker_page.page_order);
drop(state);
}

Ok(())
Expand All @@ -559,10 +569,8 @@ impl TransactionalMemory {
let old_tracker_page = state.header.region_tracker();
// allocate acquires this lock, so we need to drop it
drop(state);
// Allocate the new page. Unlike other region-tracker allocations, this happens inside
// a transaction, so we use an ordinary allocation (which gets committed or rolled back
// along with the rest of the transaction) rather than allocate_non_transactional()
let new_page = self.allocate_lowest(region_tracker_size.try_into().unwrap())?;
let new_page =
self.allocate_non_transactional(region_tracker_size.try_into().unwrap(), true)?;
if new_page.get_page_number().is_before(old_tracker_page) {
let mut state = self.state.lock().unwrap();
state.header.set_region_tracker(new_page.get_page_number());
Expand Down Expand Up @@ -1110,8 +1118,8 @@ impl TransactionalMemory {

// Allocate a page not associated with any transaction. The page is immediately considered committed,
// and won't be rolled back if an abort happens. This is only used for the region tracker
fn allocate_non_transactional(&self, allocation_size: usize) -> Result<PageMut> {
self.allocate_helper(allocation_size, false, false)
fn allocate_non_transactional(&self, allocation_size: usize, lowest: bool) -> Result<PageMut> {
self.allocate_helper(allocation_size, lowest, false)
}

pub(crate) fn count_allocated_pages(&self) -> Result<u64> {
Expand All @@ -1131,12 +1139,14 @@ impl TransactionalMemory {

impl Drop for TransactionalMemory {
fn drop(&mut self) {
if thread::panicking() {
if thread::panicking() || self.needs_recovery.load(Ordering::Acquire) {
return;
}

// Allocate a larger region-tracker page if necessary
if self.ensure_region_tracker_page().is_err() {
// Reallocate the region tracker page, which will grow it if necessary
let tracker_page = self.state.lock().unwrap().header.region_tracker();
self.free(tracker_page);
if self.allocate_region_tracker_page().is_err() {
#[cfg(feature = "logging")]
warn!("Failure while flushing allocator state. Repair required at restart.");
return;
Expand Down