Skip to content

Commit

Permalink
Fix several crashes in repair from corrupted pages
Browse files Browse the repository at this point in the history
During repair, it's normal to encounter a partially-written tree, which
can contain arbitrary invalid data. We can't crash when that happens;
we need to gracefully fail so the repair can try the other commit slot
instead.

This first batch of fixes only covers the crashes most likely to occur in
practice: the ones that can happen on a 64-bit machine, without malicious
input.
  • Loading branch information
mconst authored and cberner committed Nov 5, 2024
1 parent 07ebf79 commit 2388afc
Showing 1 changed file with 41 additions and 13 deletions.
54 changes: 41 additions & 13 deletions src/tree_store/btree_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,19 @@ pub(super) fn leaf_checksum<T: Page>(
fixed_value_size: Option<usize>,
) -> Result<Checksum, StorageError> {
let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
let end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
StorageError::Corrupted(format!(
"Leaf page {:?} corrupted. Number of pairs is zero",
page.get_page_number()
))
})?;
let end = accessor.value_end(last_pair).ok_or_else(|| {
StorageError::Corrupted(format!(
"Leaf page {:?} corrupted. Couldn't find offset for pair {}",
page.get_page_number(),
last_pair,
))
})?;
if end > page.memory().len() {
Err(StorageError::Corrupted(format!(
"Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
Expand All @@ -40,7 +52,19 @@ pub(super) fn branch_checksum<T: Page>(
fixed_key_size: Option<usize>,
) -> Result<Checksum, StorageError> {
let accessor = BranchAccessor::new(page, fixed_key_size);
let end = accessor.key_end(accessor.num_keys() - 1);
let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
StorageError::Corrupted(format!(
"Branch page {:?} corrupted. Number of keys is zero",
page.get_page_number()
))
})?;
let end = accessor.key_end(last_key).ok_or_else(|| {
StorageError::Corrupted(format!(
"Branch page {:?} corrupted. Can't find offset for key {}",
page.get_page_number(),
last_key
))
})?;
if end > page.memory().len() {
Err(StorageError::Corrupted(format!(
"Branch page {:?} corrupted. Last offset {} beyond end of data {}",
Expand Down Expand Up @@ -370,7 +394,8 @@ impl<'a> LeafAccessor<'a> {
}
let offset = 4 + size_of::<u32>() * n;
let end = u32::from_le_bytes(
self.page[offset..(offset + size_of::<u32>())]
self.page
.get(offset..(offset + size_of::<u32>()))?
.try_into()
.unwrap(),
) as usize;
Expand All @@ -391,14 +416,15 @@ impl<'a> LeafAccessor<'a> {
None
} else {
if let Some(fixed) = self.fixed_value_size {
return Some(self.key_end(self.num_pairs - 1).unwrap() + fixed * (n + 1));
return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
}
let mut offset = 4 + size_of::<u32>() * n;
if self.fixed_key_size.is_none() {
offset += size_of::<u32>() * self.num_pairs;
}
let end = u32::from_le_bytes(
self.page[offset..(offset + size_of::<u32>())]
self.page
.get(offset..(offset + size_of::<u32>()))?
.try_into()
.unwrap(),
) as usize;
Expand Down Expand Up @@ -1141,7 +1167,7 @@ impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {

pub(crate) fn total_length(&self) -> usize {
// Keys are stored at the end
self.key_end(self.num_keys() - 1)
self.key_end(self.num_keys() - 1).unwrap()
}

pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
Expand Down Expand Up @@ -1179,30 +1205,32 @@ impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
if n == 0 {
self.key_section_start()
} else {
self.key_end(n - 1)
self.key_end(n - 1).unwrap()
}
}

fn key_end(&self, n: usize) -> usize {
fn key_end(&self, n: usize) -> Option<usize> {
if let Some(fixed) = self.fixed_key_size {
return self.key_section_start() + fixed * (n + 1);
return Some(self.key_section_start() + fixed * (n + 1));
}
let offset = 8
+ (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
+ size_of::<u32>() * n;
u32::from_le_bytes(
self.page.memory()[offset..(offset + size_of::<u32>())]
Some(u32::from_le_bytes(
self.page
.memory()
.get(offset..(offset + size_of::<u32>()))?
.try_into()
.unwrap(),
) as usize
) as usize)
}

pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
if n >= self.num_keys() {
return None;
}
let offset = self.key_offset(n);
let end = self.key_end(n);
let end = self.key_end(n)?;
Some(&self.page.memory()[offset..end])
}

Expand Down

0 comments on commit 2388afc

Please sign in to comment.