diff --git a/CHANGELOG.md b/CHANGELOG.md index b506b7417..11436e985 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Re-materialize blobs which were only partially written to disc due to node crash [#618](https://github.com/p2panda/aquadoggo/pull/618) + ## [0.7.3] ### Fixed diff --git a/aquadoggo/src/materializer/tasks/blob.rs b/aquadoggo/src/materializer/tasks/blob.rs index 6dcd2a493..73219fb4f 100644 --- a/aquadoggo/src/materializer/tasks/blob.rs +++ b/aquadoggo/src/materializer/tasks/blob.rs @@ -3,6 +3,7 @@ use futures::{pin_mut, StreamExt}; use log::{debug, info}; use p2panda_rs::document::traits::AsDocument; +use p2panda_rs::operation::OperationValue; use p2panda_rs::schema::SchemaId; use p2panda_rs::storage_provider::traits::DocumentStore; use tokio::fs::OpenOptions; @@ -46,13 +47,25 @@ pub async fn blob_task(context: Context, input: TaskInput) -> TaskResult { + let metadata = file + .metadata() + .await + .expect("Can retrieve blob file metadata"); + + let expected_blob_length = match blob_document.get("length").unwrap() { + OperationValue::Integer(length) => length, + _ => unreachable!(), + }; + + metadata.len() == *expected_blob_length as u64 + } + Err(_) => false, + }; if is_blob_materialized { return Err(TaskError::Failure(format!( "Blob file already exists at {}", @@ -249,4 +262,41 @@ mod tests { assert!(result.is_err()); }) } + + #[rstest] + fn re_materialize_blob_after_previous_task_did_not_complete(key_pair: KeyPair) { + test_runner(|mut node: TestNode| async move { + // Publish blob + let blob_data = "Hello, World!"; + let blob_view_id = + add_blob(&mut node, blob_data.as_bytes(), 5, "plain/text", &key_pair).await; + + // Construct the expected path to the blob view file + let base_path = &node.context.config.blobs_base_path; + let blob_path = base_path.join(blob_view_id.to_string()); + + // Write some bytes to the expected 
blob path, fewer than the blob's actual + // byte length. We expect this file to be overwritten when we run the blob task. + fs::write(blob_path.clone(), vec![0, 1, 2]).await.unwrap(); + + // Run blob task + let result = blob_task( + node.context.clone(), + TaskInput::DocumentViewId(blob_view_id.clone()), + ) + .await; + + // It shouldn't fail + assert!(result.is_ok()); + // It should return no extra tasks + assert!(result.unwrap().is_none()); + + // Read from this file + let retrieved_blob_data = fs::read(blob_path).await; + + // Number of bytes for the published and materialized blob should be the same + assert!(retrieved_blob_data.is_ok()); + assert_eq!(blob_data.len(), retrieved_blob_data.unwrap().len()); + }) + } } diff --git a/aquadoggo_cli/src/config.rs b/aquadoggo_cli/src/config.rs index 4cddc5546..758beec51 100644 --- a/aquadoggo_cli/src/config.rs +++ b/aquadoggo_cli/src/config.rs @@ -27,7 +27,7 @@ type ConfigFilePath = Option; /// Returns a partly unchecked configuration object which results from all of these sources. It /// still needs to be converted for aquadoggo as it might still contain invalid values. pub fn load_config() -> Result<(ConfigFilePath, ConfigFile)> { - // Parse command line arguments and CONFIG environment variable first to get optional config + // Parse command line arguments and CONFIG environment variable first to get optional config // file path let cli = Cli::parse();