Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

index: propagate some of index load errors #2721

Merged
merged 6 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/src/default_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ mod store;
pub use self::composite::{AsCompositeIndex, CompositeIndex, IndexLevelStats, IndexStats};
pub use self::entry::{IndexEntry, IndexPosition};
pub use self::mutable::DefaultMutableIndex;
pub use self::readonly::DefaultReadonlyIndex;
pub use self::readonly::{DefaultReadonlyIndex, ReadonlyIndexLoadError};
pub use self::rev_walk::{
RevWalk, RevWalkDescendants, RevWalkDescendantsGenerationRange, RevWalkGenerationRange,
};
pub use self::store::{DefaultIndexStore, DefaultIndexStoreError, IndexLoadError};
pub use self::store::{DefaultIndexStore, DefaultIndexStoreError};

#[cfg(test)]
mod tests {
Expand Down
46 changes: 19 additions & 27 deletions lib/src/default_index/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use tempfile::NamedTempFile;
use super::composite::{AsCompositeIndex, CompositeIndex, IndexSegment};
use super::entry::{IndexEntry, IndexPosition, SmallIndexPositionsVec};
use super::readonly::{DefaultReadonlyIndex, ReadonlyIndexSegment};
use super::store::IndexLoadError;
use crate::backend::{ChangeId, CommitId, ObjectId};
use crate::commit::Commit;
use crate::file_util::persist_content_addressed_temp_file;
Expand Down Expand Up @@ -171,28 +170,27 @@ impl MutableIndexSegment {
}
}

fn serialize(self) -> Vec<u8> {
assert_eq!(self.graph.len(), self.lookup.len());

let num_commits = self.graph.len() as u32;

let mut buf = vec![];

fn serialize_parent_filename(&self, buf: &mut Vec<u8>) {
if let Some(parent_file) = &self.parent_file {
buf.write_u32::<LittleEndian>(parent_file.name().len() as u32)
.unwrap();
buf.write_all(parent_file.name().as_bytes()).unwrap();
} else {
buf.write_u32::<LittleEndian>(0).unwrap();
}
}

fn serialize_local_entries(&self, buf: &mut Vec<u8>) {
assert_eq!(self.graph.len(), self.lookup.len());

let num_commits = self.graph.len() as u32;
buf.write_u32::<LittleEndian>(num_commits).unwrap();
// We'll write the actual value later
let parent_overflow_offset = buf.len();
buf.write_u32::<LittleEndian>(0_u32).unwrap();

let mut parent_overflow = vec![];
for entry in self.graph {
for entry in &self.graph {
let flags = 0;
buf.write_u32::<LittleEndian>(flags).unwrap();

Expand Down Expand Up @@ -220,7 +218,7 @@ impl MutableIndexSegment {
buf.write_all(entry.commit_id.as_bytes()).unwrap();
}

for (commit_id, pos) in self.lookup {
for (commit_id, pos) in &self.lookup {
buf.write_all(commit_id.as_bytes()).unwrap();
buf.write_u32::<LittleEndian>(pos.0).unwrap();
}
Expand All @@ -231,8 +229,6 @@ impl MutableIndexSegment {
for parent_pos in parent_overflow {
buf.write_u32::<LittleEndian>(parent_pos.0).unwrap();
}

buf
}

/// If the MutableIndex has more than half the commits of its parent
Expand Down Expand Up @@ -280,10 +276,11 @@ impl MutableIndexSegment {
return Ok(self.parent_file.unwrap());
}

let commit_id_length = self.commit_id_length;
let change_id_length = self.change_id_length;

let buf = self.maybe_squash_with_ancestors().serialize();
let squashed = self.maybe_squash_with_ancestors();
let mut buf = Vec::new();
squashed.serialize_parent_filename(&mut buf);
let local_entries_offset = buf.len();
squashed.serialize_local_entries(&mut buf);
let mut hasher = Blake2b512::new();
hasher.update(&buf);
let index_file_id_hex = hex::encode(hasher.finalize());
Expand All @@ -294,19 +291,14 @@ impl MutableIndexSegment {
file.write_all(&buf)?;
persist_content_addressed_temp_file(temp_file, index_file_path)?;

ReadonlyIndexSegment::load_from(
&mut buf.as_slice(),
dir,
Ok(ReadonlyIndexSegment::load_with_parent_file(
&mut &buf[local_entries_offset..],
index_file_id_hex,
commit_id_length,
change_id_length,
squashed.parent_file,
squashed.commit_id_length,
squashed.change_id_length,
)
.map_err(|err| match err {
IndexLoadError::IndexCorrupt(err) => {
panic!("Just-created index file is corrupt: {err}")
}
IndexLoadError::IoError(err) => err,
})
.expect("in-memory index data should be valid and readable"))
}
}

Expand Down
62 changes: 45 additions & 17 deletions lib/src/default_index/readonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,32 @@ use std::any::Any;
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::io;
use std::io::Read;
use std::path::Path;
use std::sync::Arc;

use byteorder::{LittleEndian, ReadBytesExt};
use smallvec::SmallVec;
use thiserror::Error;

use super::composite::{AsCompositeIndex, CompositeIndex, IndexSegment};
use super::entry::{IndexEntry, IndexPosition, SmallIndexPositionsVec};
use super::mutable::DefaultMutableIndex;
use super::store::IndexLoadError;
use crate::backend::{ChangeId, CommitId, ObjectId};
use crate::default_revset_engine;
use crate::index::{HexPrefix, Index, MutableIndex, PrefixResolution, ReadonlyIndex};
use crate::revset::{ResolvedExpression, Revset, RevsetEvaluationError};
use crate::store::Store;

#[derive(Debug, Error)]
pub enum ReadonlyIndexLoadError {
#[error("Index file '{0}' is corrupt.")]
IndexCorrupt(String),
#[error("I/O error while loading index file: {0}")]
IoError(#[from] io::Error),
}

struct CommitGraphEntry<'a> {
data: &'a [u8],
commit_id_length: usize,
Expand Down Expand Up @@ -146,57 +155,76 @@ impl Debug for ReadonlyIndexSegment {
}

impl ReadonlyIndexSegment {
/// Loads both parent segments and local entries from the given `file`.
pub(super) fn load_from(
file: &mut dyn Read,
dir: &Path,
name: String,
commit_id_length: usize,
change_id_length: usize,
) -> Result<Arc<ReadonlyIndexSegment>, IndexLoadError> {
) -> Result<Arc<ReadonlyIndexSegment>, ReadonlyIndexLoadError> {
let parent_filename_len = file.read_u32::<LittleEndian>()?;
let num_parent_commits;
let maybe_parent_file;
if parent_filename_len > 0 {
let maybe_parent_file = if parent_filename_len > 0 {
let mut parent_filename_bytes = vec![0; parent_filename_len as usize];
file.read_exact(&mut parent_filename_bytes)?;
let parent_filename = String::from_utf8(parent_filename_bytes).unwrap();
let parent_filename = String::from_utf8(parent_filename_bytes)
.map_err(|_| ReadonlyIndexLoadError::IndexCorrupt(name.to_owned()))?;
let parent_file_path = dir.join(&parent_filename);
let mut index_file = File::open(parent_file_path).unwrap();
let mut index_file = File::open(parent_file_path)?;
let parent_file = ReadonlyIndexSegment::load_from(
&mut index_file,
dir,
parent_filename,
commit_id_length,
change_id_length,
)?;
num_parent_commits = parent_file.num_parent_commits + parent_file.num_local_commits;
maybe_parent_file = Some(parent_file);
Some(parent_file)
} else {
num_parent_commits = 0;
maybe_parent_file = None;
None
};
let num_commits = file.read_u32::<LittleEndian>()?;
Self::load_with_parent_file(
file,
name,
maybe_parent_file,
commit_id_length,
change_id_length,
)
}

/// Loads local entries from the given `file`, returns new segment linked to
/// the given `parent_file`.
pub(super) fn load_with_parent_file(
file: &mut dyn Read,
name: String,
parent_file: Option<Arc<ReadonlyIndexSegment>>,
commit_id_length: usize,
change_id_length: usize,
) -> Result<Arc<ReadonlyIndexSegment>, ReadonlyIndexLoadError> {
let num_parent_commits = parent_file
.as_ref()
.map_or(0, |segment| segment.as_composite().num_commits());
let num_local_commits = file.read_u32::<LittleEndian>()?;
let num_parent_overflow_entries = file.read_u32::<LittleEndian>()?;
let mut data = vec![];
file.read_to_end(&mut data)?;
let commit_graph_entry_size = CommitGraphEntry::size(commit_id_length, change_id_length);
let graph_size = (num_commits as usize) * commit_graph_entry_size;
let graph_size = (num_local_commits as usize) * commit_graph_entry_size;
let commit_lookup_entry_size = CommitLookupEntry::size(commit_id_length);
let lookup_size = (num_commits as usize) * commit_lookup_entry_size;
let lookup_size = (num_local_commits as usize) * commit_lookup_entry_size;
let parent_overflow_size = (num_parent_overflow_entries as usize) * 4;
let expected_size = graph_size + lookup_size + parent_overflow_size;
if data.len() != expected_size {
return Err(IndexLoadError::IndexCorrupt(name));
return Err(ReadonlyIndexLoadError::IndexCorrupt(name));
}
Ok(Arc::new(ReadonlyIndexSegment {
parent_file: maybe_parent_file,
parent_file,
num_parent_commits,
name,
commit_id_length,
change_id_length,
commit_graph_entry_size,
commit_lookup_entry_size,
num_local_commits: num_commits,
num_local_commits,
data,
}))
}
Expand Down
57 changes: 33 additions & 24 deletions lib/src/default_index/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,29 @@ use tempfile::NamedTempFile;
use thiserror::Error;

use super::mutable::DefaultMutableIndex;
use super::readonly::{DefaultReadonlyIndex, ReadonlyIndexSegment};
use super::readonly::{DefaultReadonlyIndex, ReadonlyIndexLoadError, ReadonlyIndexSegment};
use crate::backend::{CommitId, ObjectId};
use crate::commit::CommitByCommitterTimestamp;
use crate::dag_walk;
use crate::file_util::persist_content_addressed_temp_file;
use crate::index::{Index, IndexStore, IndexWriteError, MutableIndex, ReadonlyIndex};
use crate::index::{
Index, IndexReadError, IndexStore, IndexWriteError, MutableIndex, ReadonlyIndex,
};
use crate::op_store::{OpStoreError, OperationId};
use crate::operation::Operation;
use crate::store::Store;

#[derive(Error, Debug)]
pub enum IndexLoadError {
#[error("Index file '{0}' is corrupt.")]
IndexCorrupt(String),
#[error("I/O error while loading index file: {0}")]
IoError(#[from] io::Error),
}

#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
#[error("Failed to associate commit index file with a operation {op_id:?}: {source}")]
AssociateIndex {
op_id: OperationId,
source: io::Error,
},
#[error("Failed to load associated commit index file name: {0}")]
LoadAssociation(#[source] io::Error),
#[error("Failed to load commit index: {0}")]
LoadIndex(#[source] ReadonlyIndexLoadError),
#[error("Failed to write commit index file: {0}")]
SaveIndex(#[source] io::Error),
#[error(transparent)]
Expand Down Expand Up @@ -91,19 +89,22 @@ impl DefaultIndexStore {
commit_id_length: usize,
change_id_length: usize,
op_id: &OperationId,
) -> Result<Arc<ReadonlyIndexSegment>, IndexLoadError> {
) -> Result<Arc<ReadonlyIndexSegment>, DefaultIndexStoreError> {
let op_id_file = self.dir.join("operations").join(op_id.hex());
let buf = fs::read(op_id_file).unwrap();
let index_file_id_hex = String::from_utf8(buf).unwrap();
let index_file_id_hex =
fs::read_to_string(op_id_file).map_err(DefaultIndexStoreError::LoadAssociation)?;
let index_file_path = self.dir.join(&index_file_id_hex);
let mut index_file = File::open(index_file_path).unwrap();
let mut index_file = File::open(index_file_path).map_err(|err| {
DefaultIndexStoreError::LoadIndex(ReadonlyIndexLoadError::IoError(err))
})?;
ReadonlyIndexSegment::load_from(
&mut index_file,
&self.dir,
index_file_id_hex,
commit_id_length,
change_id_length,
)
.map_err(DefaultIndexStoreError::LoadIndex)
}

#[tracing::instrument(skip(self, store))]
Expand Down Expand Up @@ -142,9 +143,11 @@ impl DefaultIndexStore {
mutable_index = DefaultMutableIndex::full(commit_id_length, change_id_length);
}
Some(parent_op_id) => {
let parent_file = self
.load_index_at_operation(commit_id_length, change_id_length, &parent_op_id)
.unwrap();
let parent_file = self.load_index_at_operation(
commit_id_length,
change_id_length,
&parent_op_id,
)?;
maybe_parent_file = Some(parent_file.clone());
mutable_index = DefaultMutableIndex::incremental(parent_file)
}
Expand Down Expand Up @@ -233,7 +236,11 @@ impl IndexStore for DefaultIndexStore {
Self::name()
}

fn get_index_at_op(&self, op: &Operation, store: &Arc<Store>) -> Box<dyn ReadonlyIndex> {
fn get_index_at_op(
&self,
op: &Operation,
store: &Arc<Store>,
) -> Result<Box<dyn ReadonlyIndex>, IndexReadError> {
let op_id_hex = op.id().hex();
let op_id_file = self.dir.join("operations").join(op_id_hex);
let index_segment = if op_id_file.exists() {
Expand All @@ -242,21 +249,23 @@ impl IndexStore for DefaultIndexStore {
store.change_id_length(),
op.id(),
) {
Err(IndexLoadError::IndexCorrupt(_)) => {
Err(DefaultIndexStoreError::LoadIndex(ReadonlyIndexLoadError::IndexCorrupt(_))) => {
// If the index was corrupt (maybe it was written in a different format),
// we just reindex.
// TODO: Move this message to a callback or something.
println!("The index was corrupt (maybe the format has changed). Reindexing...");
// TODO: propagate error
std::fs::remove_dir_all(self.dir.join("operations")).unwrap();
std::fs::create_dir(self.dir.join("operations")).unwrap();
self.index_at_operation(store, op).unwrap()
self.index_at_operation(store, op)
}
result => result.unwrap(),
result => result,
}
} else {
self.index_at_operation(store, op).unwrap()
};
Box::new(DefaultReadonlyIndex::from_segment(index_segment))
self.index_at_operation(store, op)
}
.map_err(|err| IndexReadError(err.into()))?;
Ok(Box::new(DefaultReadonlyIndex::from_segment(index_segment)))
}

fn write_index(
Expand Down
11 changes: 10 additions & 1 deletion lib/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ use crate::operation::Operation;
use crate::revset::{ResolvedExpression, Revset, RevsetEvaluationError};
use crate::store::Store;

/// Error while reading index from the `IndexStore`.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct IndexReadError(pub Box<dyn std::error::Error + Send + Sync>);

/// Error while writing index to the `IndexStore`.
#[derive(Debug, Error)]
#[error(transparent)]
Expand All @@ -37,7 +42,11 @@ pub trait IndexStore: Send + Sync + Debug {

fn name(&self) -> &str;

fn get_index_at_op(&self, op: &Operation, store: &Arc<Store>) -> Box<dyn ReadonlyIndex>;
fn get_index_at_op(
&self,
op: &Operation,
store: &Arc<Store>,
) -> Result<Box<dyn ReadonlyIndex>, IndexReadError>;

fn write_index(
&self,
Expand Down
Loading