Simplify op heads interface #2749

Merged (8 commits) on Dec 28, 2023
68 changes: 34 additions & 34 deletions lib/src/op_heads_store.rs
@@ -22,7 +22,7 @@ use itertools::Itertools;
use thiserror::Error;

use crate::dag_walk;
use crate::op_store::{OpStore, OpStoreError, OpStoreResult, OperationId};
use crate::op_store::{OpStore, OpStoreError, OperationId};
use crate::operation::Operation;

#[derive(Debug, Error)]
@@ -35,41 +35,24 @@ pub enum OpHeadResolutionError<E> {
Err(#[source] E),
}

pub trait OpHeadsStoreLock<'a> {
fn promote_new_op(&self, new_op: &Operation);
}
pub trait OpHeadsStoreLock {}

/// Manages the set of current heads of the operation log.
pub trait OpHeadsStore: Send + Sync + Debug {
fn name(&self) -> &str;

fn add_op_head(&self, id: &OperationId);

fn remove_op_head(&self, id: &OperationId);
/// Remove the old op heads and add the new one.
///
/// The old op heads must not contain the new one.
fn update_op_heads(&self, old_ids: &[OperationId], new_id: &OperationId);

fn get_op_heads(&self) -> Vec<OperationId>;

fn lock<'a>(&'a self) -> Box<dyn OpHeadsStoreLock<'a> + 'a>;

/// Removes operations in the input that are ancestors of other operations
/// in the input. The ancestors are removed both from the list and from
/// storage.
// TODO: maybe introduce OpHeadsStoreError?
fn handle_ancestor_ops(&self, op_heads: Vec<Operation>) -> OpStoreResult<Vec<Operation>> {
let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
// Remove ancestors so we don't create a merge operation between an
// operation and its ancestor
let op_heads = dag_walk::heads_ok(
op_heads.into_iter().map(Ok),
|op: &Operation| op.id().clone(),
|op: &Operation| op.parents().collect_vec(),
)?;
let op_head_ids_after: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
for removed_op_head in op_head_ids_before.difference(&op_head_ids_after) {
self.remove_op_head(removed_op_head);
}
Ok(op_heads.into_iter().collect())
}
/// Optionally takes a lock on the op heads store. The purpose of the lock
/// is to prevent concurrent processes from resolving the same divergent
/// operations. It is not needed for correctness; implementations are free
/// to return a type that doesn't hold an actual lock.
fn lock(&self) -> Box<dyn OpHeadsStoreLock + '_>;
}
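After this change, the trait's write surface is a single method: `update_op_heads` atomically swaps a set of old heads for one new head, replacing the old `add_op_head`/`remove_op_head` pair, and `lock` returns an opaque guard that only needs to exist until it is dropped. Below is a minimal sketch of that contract, using `String` as a stand-in for `OperationId` and an in-memory set instead of a real store; all names here are hypothetical, not jj's.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

// Stand-in for jj's OperationId, which wraps a content hash.
type OperationId = String;

struct InMemoryOpHeadsStore {
    heads: Mutex<HashSet<OperationId>>,
}

impl InMemoryOpHeadsStore {
    /// Remove the old op heads and add the new one.
    /// The old op heads must not contain the new one.
    fn update_op_heads(&self, old_ids: &[OperationId], new_id: &OperationId) {
        let mut heads = self.heads.lock().unwrap();
        // Add before removing, mirroring the order SimpleOpHeadsStore uses
        // on disk, where the two steps are not atomic.
        heads.insert(new_id.clone());
        for old_id in old_ids {
            heads.remove(old_id);
        }
    }

    fn get_op_heads(&self) -> Vec<OperationId> {
        self.heads.lock().unwrap().iter().cloned().collect()
    }
}

fn main() {
    let store = InMemoryOpHeadsStore {
        heads: Mutex::new(HashSet::new()),
    };
    store.update_op_heads(&[], &"op1".to_owned()); // pure add, as in repo init
    store.update_op_heads(&["op1".to_owned()], &"op2".to_owned());
    assert_eq!(store.get_op_heads(), vec!["op2".to_owned()]);
}
```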

// Given an OpHeadsStore, fetch and resolve its op heads down to one under a
@@ -102,7 +85,7 @@ pub fn resolve_op_heads<E>(
// Note that the locking isn't necessary for correctness; we take the lock
// only to prevent other concurrent processes from doing the same work (and
// producing another set of divergent heads).
let lock = op_heads_store.lock();
let _lock = op_heads_store.lock();
let op_head_ids = op_heads_store.get_op_heads();

if op_head_ids.is_empty() {
@@ -115,24 +98,41 @@
return Ok(Operation::new(op_store.clone(), op_head_id, op_head));
}

let op_heads = op_head_ids
let op_heads: Vec<_> = op_head_ids
.iter()
.map(|op_id: &OperationId| -> Result<Operation, OpStoreError> {
let data = op_store.read_operation(op_id)?;
Ok(Operation::new(op_store.clone(), op_id.clone(), data))
})
.try_collect()?;
let mut op_heads = op_heads_store.handle_ancestor_ops(op_heads)?;
// Remove ancestors so we don't create a merge operation between an
// operation and its ancestor
let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
let filtered_op_heads = dag_walk::heads_ok(
op_heads.into_iter().map(Ok),
|op: &Operation| op.id().clone(),
|op: &Operation| op.parents().collect_vec(),
)?;
let op_head_ids_after: HashSet<_> =
filtered_op_heads.iter().map(|op| op.id().clone()).collect();
let ancestor_op_heads = op_head_ids_before
.difference(&op_head_ids_after)
.cloned()
.collect_vec();
let mut op_heads = filtered_op_heads.into_iter().collect_vec();

// Return without creating a merge operation
if op_heads.len() == 1 {
return Ok(op_heads.pop().unwrap());
if let [op_head] = &*op_heads {
op_heads_store.update_op_heads(&ancestor_op_heads, op_head.id());
return Ok(op_head.clone());
}

op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
match resolver(op_heads) {
Ok(new_op) => {
lock.promote_new_op(&new_op);
let mut old_op_heads = ancestor_op_heads;
old_op_heads.extend_from_slice(new_op.parent_ids());
op_heads_store.update_op_heads(&old_op_heads, new_op.id());
Ok(new_op)
}
Err(e) => Err(OpHeadResolutionError::Err(e)),
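With `handle_ancestor_ops` gone, the ancestor filtering now lives inline in `resolve_op_heads`: any candidate head that is reachable from another candidate is dropped, so the resolver never merges an operation with its own ancestor, and the dropped ids are folded into the next `update_op_heads` call instead of being removed one by one. The following self-contained sketch illustrates that filtering over a toy parent map; it shows the idea behind `dag_walk::heads_ok`, not its actual implementation.

```rust
use std::collections::{HashMap, HashSet};

/// Keep only the candidates that are not ancestors of other candidates.
fn filter_ancestor_heads(
    parents: &HashMap<&str, Vec<&str>>,
    candidates: &[&str],
) -> Vec<String> {
    // Walk everything reachable from each candidate, excluding the
    // candidate itself.
    let mut ancestors: HashSet<&str> = HashSet::new();
    for head in candidates {
        let mut stack = parents.get(head).cloned().unwrap_or_default();
        while let Some(id) = stack.pop() {
            if ancestors.insert(id) {
                stack.extend(parents.get(id).cloned().unwrap_or_default());
            }
        }
    }
    candidates
        .iter()
        .filter(|id| !ancestors.contains(*id))
        .map(|id| id.to_string())
        .collect()
}

fn main() {
    // Operation log: a <- b <- c, with d as a second child of a.
    let parents = HashMap::from([("b", vec!["a"]), ("c", vec!["b"]), ("d", vec!["a"])]);
    // "b" is an ancestor of "c", so only "c" and "d" remain as heads.
    assert_eq!(filter_ancestor_heads(&parents, &["b", "c", "d"]), ["c", "d"]);
}
```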
2 changes: 1 addition & 1 deletion lib/src/repo.rs
@@ -186,7 +186,7 @@ impl ReadonlyRepo {
let init_operation_id = op_store.write_operation(&init_operation).unwrap();
let init_operation = Operation::new(op_store.clone(), init_operation_id, init_operation);
let op_heads_store = op_heads_store_initializer(user_settings, &op_heads_path);
op_heads_store.add_op_head(init_operation.id());
op_heads_store.update_op_heads(&[], init_operation.id());
let op_heads_type_path = op_heads_path.join("type");
fs::write(&op_heads_type_path, op_heads_store.name()).context(&op_heads_type_path)?;
let op_heads_store = Arc::from(op_heads_store);
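Repository initialization is the degenerate case of the new method: there are no previous heads to retire, so `update_op_heads(&[], init_operation.id())` behaves as a pure add, which is exactly what `add_op_head` did here before.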
165 changes: 20 additions & 145 deletions lib/src/simple_op_heads_store.rs
@@ -22,7 +22,6 @@ use crate::backend::ObjectId;
use crate::lock::FileLock;
use crate::op_heads_store::{OpHeadsStore, OpHeadsStoreLock};
use crate::op_store::OperationId;
use crate::operation::Operation;

pub struct SimpleOpHeadsStore {
dir: PathBuf,
@@ -49,48 +48,8 @@ impl SimpleOpHeadsStore {

pub fn load(dir: &Path) -> Self {
let op_heads_dir = dir.join("heads");
// TODO: Delete this migration code at 0.9+ or so
if !op_heads_dir.exists() {
// For some months during 0.7 development, the name was "simple_op_heads"
if dir.join("simple_op_heads").exists() {
fs::rename(dir.join("simple_op_heads"), &op_heads_dir).unwrap();
} else {
let old_store = Self {
dir: dir.to_path_buf(),
};
fs::create_dir(&op_heads_dir).unwrap();
let new_store = Self { dir: op_heads_dir };

for id in old_store.get_op_heads() {
old_store.remove_op_head(&id);
new_store.add_op_head(&id);
}
return new_store;
}
}

Self { dir: op_heads_dir }
}
}

struct SimpleOpHeadsStoreLock<'a> {
store: &'a dyn OpHeadsStore,
_lock: FileLock,
}

impl OpHeadsStoreLock<'_> for SimpleOpHeadsStoreLock<'_> {
fn promote_new_op(&self, new_op: &Operation) {
self.store.add_op_head(new_op.id());
for old_id in new_op.parent_ids() {
self.store.remove_op_head(old_id);
}
}
}

impl OpHeadsStore for SimpleOpHeadsStore {
fn name(&self) -> &str {
Self::name()
}

fn add_op_head(&self, id: &OperationId) {
std::fs::write(self.dir.join(id.hex()), "").unwrap();
@@ -103,6 +62,25 @@ impl OpHeadsStore for SimpleOpHeadsStore {
// heads. We'll detect that next time we load the view.
std::fs::remove_file(self.dir.join(id.hex())).ok();
}
}

struct SimpleOpHeadsStoreLock {
_lock: FileLock,
}

impl OpHeadsStoreLock for SimpleOpHeadsStoreLock {}

impl OpHeadsStore for SimpleOpHeadsStore {
fn name(&self) -> &str {
Self::name()
}

fn update_op_heads(&self, old_ids: &[OperationId], new_id: &OperationId) {
self.add_op_head(new_id);
for old_id in old_ids {
self.remove_op_head(old_id)
}
}

fn get_op_heads(&self) -> Vec<OperationId> {
let mut op_heads = vec![];
@@ -116,112 +94,9 @@ impl OpHeadsStore for SimpleOpHeadsStore {
op_heads
}

fn lock<'a>(&'a self) -> Box<dyn OpHeadsStoreLock<'a> + 'a> {
fn lock(&self) -> Box<dyn OpHeadsStoreLock + '_> {
Box::new(SimpleOpHeadsStoreLock {
store: self,
_lock: FileLock::lock(self.dir.join("lock")),
})
}
}
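`SimpleOpHeadsStoreLock` no longer needs a back-reference to the store: with `promote_new_op` gone, the guard exists purely to hold the `FileLock` until it is dropped. Here is a small sketch of that RAII pattern, with a hypothetical `RawLock` standing in for jj's `FileLock`.

```rust
// Hypothetical stand-in for jj's FileLock: acquires in `acquire`,
// releases in `Drop`.
struct RawLock;

impl RawLock {
    fn acquire() -> RawLock {
        println!("lock acquired");
        RawLock
    }
}

impl Drop for RawLock {
    fn drop(&mut self) {
        println!("lock released");
    }
}

// Mirrors SimpleOpHeadsStoreLock: the underscore-prefixed field is never
// read; it is kept alive solely for its Drop side effect.
struct StoreLock {
    _lock: RawLock,
}

fn main() {
    {
        let _guard = StoreLock {
            _lock: RawLock::acquire(),
        };
        println!("resolving divergent op heads under the lock");
    } // `_guard` is dropped here, releasing the underlying lock
    println!("lock is free again");
}
```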

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::fs;
use std::path::Path;

use itertools::Itertools;

use crate::backend::ObjectId;
use crate::op_heads_store::OpHeadsStore;
use crate::op_store::OperationId;
use crate::simple_op_heads_store::SimpleOpHeadsStore;

fn read_dir(dir: &Path) -> Vec<String> {
fs::read_dir(dir)
.unwrap()
.map(|entry| entry.unwrap().file_name().to_str().unwrap().to_string())
.sorted()
.collect()
}

#[test]
fn test_simple_op_heads_store_migration_into_subdir() {
let test_dir = testutils::new_temp_dir();
let store_path = test_dir.path().join("op_heads");
fs::create_dir(&store_path).unwrap();

let op1 = OperationId::from_hex("012345");
let op2 = OperationId::from_hex("abcdef");
let mut ops = HashSet::new();
ops.insert(op1.clone());
ops.insert(op2.clone());

let old_store = SimpleOpHeadsStore {
dir: store_path.clone(),
};
old_store.add_op_head(&op1);
old_store.add_op_head(&op2);

assert_eq!(vec!["012345", "abcdef"], read_dir(&store_path));
drop(old_store);

let new_store = SimpleOpHeadsStore::load(&store_path);
assert_eq!(&ops, &new_store.get_op_heads().into_iter().collect());
assert_eq!(vec!["heads"], read_dir(&store_path));
assert_eq!(
vec!["012345", "abcdef"],
read_dir(&store_path.join("heads"))
);

// Migration is idempotent
let new_store = SimpleOpHeadsStore::load(&store_path);
assert_eq!(&ops, &new_store.get_op_heads().into_iter().collect());
assert_eq!(vec!["heads"], read_dir(&store_path));
assert_eq!(
vec!["012345", "abcdef"],
read_dir(&store_path.join("heads"))
);
}

#[test]
fn test_simple_op_heads_store_migration_change_dirname() {
let test_dir = testutils::new_temp_dir();
let store_path = test_dir.path().join("op_heads");
fs::create_dir(&store_path).unwrap();
let old_heads_path = store_path.join("simple_op_heads");
fs::create_dir(&old_heads_path).unwrap();

let op1 = OperationId::from_hex("012345");
let op2 = OperationId::from_hex("abcdef");
let mut ops = HashSet::new();
ops.insert(op1.clone());
ops.insert(op2.clone());

let old_store = SimpleOpHeadsStore {
dir: old_heads_path,
};
old_store.add_op_head(&op1);
old_store.add_op_head(&op2);

assert_eq!(vec!["simple_op_heads"], read_dir(&store_path));
drop(old_store);

let new_store = SimpleOpHeadsStore::load(&store_path);
assert_eq!(&ops, &new_store.get_op_heads().into_iter().collect());
assert_eq!(vec!["heads"], read_dir(&store_path));
assert_eq!(
vec!["012345", "abcdef"],
read_dir(&store_path.join("heads"))
);

// Migration is idempotent
let new_store = SimpleOpHeadsStore::load(&store_path);
assert_eq!(&ops, &new_store.get_op_heads().into_iter().collect());
assert_eq!(vec!["heads"], read_dir(&store_path));
assert_eq!(
vec!["012345", "abcdef"],
read_dir(&store_path.join("heads"))
);
}
}
10 changes: 6 additions & 4 deletions lib/src/transaction.rs
@@ -175,10 +175,12 @@ impl UnpublishedOperation {

pub fn publish(mut self) -> Arc<ReadonlyRepo> {
let data = self.data.take().unwrap();
self.repo_loader
.op_heads_store()
.lock()
.promote_new_op(&data.operation);
{
let _lock = self.repo_loader.op_heads_store().lock();
self.repo_loader
.op_heads_store()
.update_op_heads(data.operation.parent_ids(), data.operation.id());
}
let repo = self
.repo_loader
.create_from(data.operation, data.view, data.index);
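The explicit block here is load-bearing: the `_lock` guard is dropped at the closing brace, so the op heads lock is held only for the `update_op_heads` call and is already released by the time `create_from` builds the new repo from the published operation.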