From e265c94a2cb7b7f785570bd18ef21946ad5a7fa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 25 Apr 2024 17:08:14 +0200 Subject: [PATCH] fix: Public reconciliation of a concurrent write and remove (of independent paths) (#432) * chore: Write failing test case Also: Add better debug impls for `PublicFile`, `PublicFile`, `PublicLink`, etc. * fix: Concurrent write and remove should reconcile correctly --- wnfs-common/src/link.rs | 8 ++- wnfs/src/private/directory.rs | 4 +- wnfs/src/public/directory.rs | 125 +++++++++++++++++++++++----------- wnfs/src/public/file.rs | 26 ++++++- 4 files changed, 119 insertions(+), 44 deletions(-) diff --git a/wnfs-common/src/link.rs b/wnfs-common/src/link.rs index 1f3bb8b1..e9d9e8da 100644 --- a/wnfs-common/src/link.rs +++ b/wnfs-common/src/link.rs @@ -199,8 +199,12 @@ where { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - Self::Encoded { cid, .. } => f.debug_tuple("Link::Encoded").field(cid).finish(), - Self::Decoded { value, .. } => f.debug_tuple("Link::Decoded").field(value).finish(), + Self::Encoded { cid, value_cache } => f + .debug_struct("Link::Encoded") + .field("cid", &format!("{cid}")) + .field("value_cache", &value_cache.get()) + .finish(), + Self::Decoded { value } => f.debug_tuple("Link::Decoded").field(value).finish(), } } } diff --git a/wnfs/src/private/directory.rs b/wnfs/src/private/directory.rs index 734ec94c..7f25564f 100644 --- a/wnfs/src/private/directory.rs +++ b/wnfs/src/private/directory.rs @@ -1453,13 +1453,13 @@ impl PrivateDirectory { } (PrivateNode::File(_), PrivateNode::Dir(_)) => { // a directory wins over a file - *our_link = other_link.clone(); + our_link.clone_from(other_link); } // file vs. file and dir vs. dir cases _ => { // We tie-break as usual if ord == Ordering::Greater { - *our_link = other_link.clone(); + our_link.clone_from(other_link); } } } diff --git a/wnfs/src/public/directory.rs b/wnfs/src/public/directory.rs index 9bdbf68e..e54d6716 100644 --- a/wnfs/src/public/directory.rs +++ b/wnfs/src/public/directory.rs @@ -4,7 +4,11 @@ use super::{ PublicDirectorySerializable, PublicFile, PublicLink, PublicNode, PublicNodeSerializable, }; use crate::{ - error::FsError, is_readable_wnfs_version, traits::Id, utils, SearchResult, WNFS_VERSION, + error::FsError, + is_readable_wnfs_version, + traits::Id, + utils::{self, OnceCellDebug}, + SearchResult, WNFS_VERSION, }; use anyhow::{bail, ensure, Result}; use async_once_cell::OnceCell; @@ -36,7 +40,6 @@ use wnfs_common::{ /// /// println!("Directory: {:?}", dir); /// ``` -#[derive(Debug)] pub struct PublicDirectory { persisted_as: OnceCell, pub(crate) metadata: Metadata, @@ -868,7 +871,7 @@ impl PublicDirectory { /// See the documentation for the `Reconciliation` enum for more information. pub async fn reconcile( self: &mut Arc, - other: Arc, + other: &Arc, store: &impl BlockStore, ) -> Result { let causal_order = self.clone().causal_compare(other.clone(), store).await?; @@ -877,42 +880,21 @@ impl PublicDirectory { Some(Ordering::Equal) => Reconciliation::AlreadyAhead, Some(Ordering::Greater) => Reconciliation::AlreadyAhead, Some(Ordering::Less) => { - *self = other; + self.clone_from(other); Reconciliation::FastForward } None => { - let file_tie_breaks = self.merge(&other, store).await?; + let mut file_tie_breaks = BTreeSet::new(); + self.reconcile_helper(other, store, &[], &mut file_tie_breaks) + .await?; Reconciliation::Merged { file_tie_breaks } } }) } - /// Merge this node with given other node, ignoring whether the - /// other node is actually ahead in history or not. - /// - /// Prefer using `reconcile`, if you don't know what the difference is! - /// - /// Returns the set of file paths where tie breaks were used to resolve - /// conflicts. This means that for each path there exists a file that has been - /// overwritten with another version. - /// - /// It's possible to walk the history backwards to find which version of each - /// file has been overwritten & merge the two file versions of each file together - /// in an application-specific way and create another history entry. - pub async fn merge<'a>( - self: &'a mut Arc, - other: &'a Arc, - store: &'a impl BlockStore, - ) -> Result>> { - let mut file_tie_breaks = BTreeSet::new(); - self.merge_helper(other, store, &[], &mut file_tie_breaks) - .await?; - Ok(file_tie_breaks) - } - #[cfg_attr(not(target_arch = "wasm32"), async_recursion)] #[cfg_attr(target_arch = "wasm32", async_recursion(?Send))] - async fn merge_helper<'a>( + async fn reconcile_helper<'a>( self: &'a mut Arc, other: &'a Arc, store: &'a impl BlockStore, @@ -944,6 +926,21 @@ impl PublicDirectory { } Entry::Occupied(mut occupied) => { let our_node = occupied.get_mut().resolve_value_mut(store).await?; + + match our_node.causal_compare(other_node, store).await? { + Some(Ordering::Equal) => { + continue; + } + Some(Ordering::Less) => { + our_node.clone_from(other_node); + continue; + } + Some(Ordering::Greater) => { + continue; + } + None => {} + }; + match (our_node, other_node) { (PublicNode::File(our_file), PublicNode::File(other_file)) => { if our_file.merge(other_file, store).await? { @@ -963,7 +960,7 @@ impl PublicDirectory { (PublicNode::Dir(dir), PublicNode::Dir(other_dir)) => { let mut path = current_path.to_vec(); path.push(name.clone()); - dir.merge_helper(other_dir, store, &path, file_tie_breaks) + dir.reconcile_helper(other_dir, store, &path, file_tie_breaks) .await?; } } @@ -975,6 +972,27 @@ impl PublicDirectory { } } +impl std::fmt::Debug for PublicDirectory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PublicDirectory") + .field( + "persisted_as", + &OnceCellDebug(self.persisted_as.get().map(|cid| format!("{cid}"))), + ) + .field("metadata", &self.metadata) + .field("userland", &self.userland) + .field( + "previous", + &self + .previous + .iter() + .map(|cid| format!("{cid}")) + .collect::>(), + ) + .finish() + } +} + impl Id for PublicDirectory { fn get_id(&self) -> String { format!("{:p}", &self.metadata) @@ -1455,6 +1473,37 @@ mod tests { vec![previous_cid] ); } + + #[async_std::test] + async fn reconciliation_of_concurrent_write_and_remove() -> TestResult { + // This path we first write, then both replicas have it, then one replica removes it + let path1 = &["a".into(), "b.txt".into()]; + // This path the second replica writes after forking + let path2 = &["file.txt".into()]; + // we should be left with `b.txt` removed, while `file.txt` is there. + + let time = Utc::now(); + let store = &MemoryBlockStore::new(); + let root_dir = &mut PublicDirectory::new_rc(time); + root_dir.store(store).await?; + root_dir.write(path1, vec![0], time, store).await?; + root_dir.store(store).await?; + + let fork = &mut Arc::clone(root_dir); + fork.rm(path1, store).await?; + fork.store(store).await?; + + root_dir.write(path2, vec![0], time, store).await?; + root_dir.store(store).await?; + + root_dir.reconcile(fork, store).await?; + + assert!(root_dir.get_node(path1, store).await?.is_none()); + + assert_eq!(root_dir.read(path2, store).await?, vec![0]); + + Ok(()) + } } #[cfg(test)] @@ -1571,7 +1620,7 @@ mod proptests { root1.mkdir(&path, time, store).await.unwrap(); - root0.merge(root1, store).await.unwrap(); + root0.reconcile(root1, store).await.unwrap(); let node = root0 .get_node(&path, store) @@ -1598,9 +1647,9 @@ mod proptests { let root1 = convert_fs(fs1, time, store).await.unwrap(); let mut merge_one_way = Arc::clone(&root0); - merge_one_way.merge(&root1, store).await.unwrap(); + merge_one_way.reconcile(&root1, store).await.unwrap(); let mut merge_other_way = Arc::clone(&root1); - merge_other_way.merge(&root0, store).await.unwrap(); + merge_other_way.reconcile(&root0, store).await.unwrap(); let cid_one_way = merge_one_way.store(store).await.unwrap(); let cid_other_way = merge_other_way.store(store).await.unwrap(); @@ -1625,13 +1674,13 @@ mod proptests { let root2 = convert_fs(fs2, time, store).await.unwrap(); let mut merge_0_1_then_2 = Arc::clone(&root0); - merge_0_1_then_2.merge(&root1, store).await.unwrap(); - merge_0_1_then_2.merge(&root2, store).await.unwrap(); + merge_0_1_then_2.reconcile(&root1, store).await.unwrap(); + merge_0_1_then_2.reconcile(&root2, store).await.unwrap(); let mut merge_1_2 = Arc::clone(&root1); - merge_1_2.merge(&root2, store).await.unwrap(); + merge_1_2.reconcile(&root2, store).await.unwrap(); let mut merge_0_with_1_2 = Arc::clone(&root0); - merge_0_with_1_2.merge(&merge_1_2, store).await.unwrap(); + merge_0_with_1_2.reconcile(&merge_1_2, store).await.unwrap(); let cid_one_way = merge_0_1_then_2.store(store).await.unwrap(); let cid_other_way = merge_0_with_1_2.store(store).await.unwrap(); @@ -1657,7 +1706,7 @@ mod proptests { let mut root = convert_fs(fs0, time, store).await.unwrap(); let root1 = convert_fs(fs1, time, store).await.unwrap(); - root.merge(&root1, store).await.unwrap(); + root.reconcile(&root1, store).await.unwrap(); for dir in all_dirs { let exists = root.get_node(&dir, store).await.unwrap().is_some(); diff --git a/wnfs/src/public/file.rs b/wnfs/src/public/file.rs index 2c37e612..6e0c736f 100644 --- a/wnfs/src/public/file.rs +++ b/wnfs/src/public/file.rs @@ -1,7 +1,9 @@ //! Public fs file node. use super::{PublicFileSerializable, PublicNodeSerializable}; -use crate::{error::FsError, is_readable_wnfs_version, traits::Id, WNFS_VERSION}; +use crate::{ + error::FsError, is_readable_wnfs_version, traits::Id, utils::OnceCellDebug, WNFS_VERSION, +}; use anyhow::{anyhow, bail, Result}; use async_once_cell::OnceCell; use chrono::{DateTime, Utc}; @@ -28,7 +30,6 @@ use wnfs_unixfs_file::{builder::FileBuilder, unixfs::UnixFsFile}; /// /// println!("File: {:?}", file); /// ``` -#[derive(Debug)] pub struct PublicFile { persisted_as: OnceCell, pub(crate) metadata: Metadata, @@ -545,6 +546,27 @@ impl PublicFile { } } +impl std::fmt::Debug for PublicFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PublicFile") + .field( + "persisted_as", + &OnceCellDebug(self.persisted_as.get().map(|cid| format!("{cid}"))), + ) + .field("metadata", &self.metadata) + .field("userland", &self.userland) + .field( + "previous", + &self + .previous + .iter() + .map(|cid| format!("{cid}")) + .collect::>(), + ) + .finish() + } +} + impl Storable for PublicFile { type Serializable = PublicNodeSerializable;