diff --git a/rust/composefs-oci/src/repo.rs b/rust/composefs-oci/src/repo.rs index 2406f627..b047929a 100644 --- a/rust/composefs-oci/src/repo.rs +++ b/rust/composefs-oci/src/repo.rs @@ -2,6 +2,7 @@ use std::borrow::Cow; use std::cell::OnceCell; use std::fs::File; use std::io::{self, BufRead, BufReader, Seek, Write}; +use std::ops::Add; use std::os::fd::AsFd; use std::path::{Path, PathBuf}; use std::process::Command; @@ -47,6 +48,9 @@ const BOOTID_XATTR: &str = "user.composefs-oci.bootid"; /// do something malicious, so we'll just reject it. const API_FILESYSTEMS: &[&str] = &["proc", "sys", "dev"]; +/// Can be included in a manifest if the digest is pre-computed +const CFS_DIGEST_ANNOTATION: &str = "composefs.rootfs.digest"; + /// The extended attribute we attach with the target metadata const CFS_ENTRY_META_XATTR: &str = "user.cfs.entry.meta"; /// This records the virtual number of links (as opposed to @@ -199,6 +203,27 @@ fn linkat_optional_allow_exists( } } +fn linkat_allow_exists( + old_dirfd: &Dir, + old_path: impl AsRef, + new_dirfd: &Dir, + new_path: impl AsRef, +) -> Result { + match rustix::fs::linkat( + old_dirfd.as_fd(), + old_path.as_ref(), + new_dirfd.as_fd(), + new_path.as_ref(), + AtFlags::empty(), + ) { + // We successfully linked + Ok(()) => Ok(true), + // We're idempotent; it's ok if the target already exists + Err(e) if e == rustix::io::Errno::EXIST => Ok(true), + Err(e) => Err(e.into()), + } +} + struct ImportContext { has_verity: bool, /// Reference to global objects @@ -618,19 +643,23 @@ impl Repo { /// Ensure that a downloaded OCI image is "expanded" (unpacked) /// into the composefs-native store. - pub async fn expand(&self, manifest_desc: &Descriptor) -> Result<()> { + pub async fn expand(&self, manifest_desc: &Descriptor) -> Result { let repo = self.clone(); let manifest_desc = manifest_desc.clone(); - // Read and parse the manifest in a helper thread - let manifest: ImageManifest = tokio::task::spawn_blocking(move || -> Result<_> { - let mut f = repo + // Read and parse the manifest in a helper thread, also retaining its fd + let (manifest_fd, manifest) = tokio::task::spawn_blocking(move || -> Result<_> { + let mut bufr = repo .as_oci() .read_blob(&manifest_desc) .map(BufReader::new)?; - serde_json::from_reader(&mut f).map_err(Into::into) + let parsed = serde_json::from_reader::<_, ImageManifest>(&mut bufr)?; + let mut f = bufr.into_inner(); + f.seek(std::io::SeekFrom::Start(0))?; + Ok((f, parsed)) }) .await - .unwrap()?; + .unwrap() + .context("Reading manifest")?; // Read and parse the config in a helper thread let repo = self.clone(); let config = manifest.config().clone(); @@ -657,11 +686,23 @@ impl Repo { }, )?; + let mut stats = ImportLayerStats::default(); for (layer, diffid) in needed_diffs { - let blobsrc = self.as_oci().read_blob(layer).map(BufReader::new)?; + let blobsrc = self.as_oci().read_blob(layer)?; + stats = stats + self.import_layer(blobsrc, diffid.sha256()).await?; } - Ok(()) + if let Some(expected_digest) = manifest + .annotations() + .as_ref() + .and_then(|a| a.get(CFS_DIGEST_ANNOTATION)) + { + // Handle verified manifests later + todo!() + } else { + } + + Ok(stats) } } @@ -686,6 +727,22 @@ pub struct ImportLayerStats { meta_count: u64, } +impl Add for ImportLayerStats { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + Self { + extant_objects_count: self.extant_objects_count + rhs.extant_objects_count, + extant_objects_size: self.extant_objects_size + rhs.extant_objects_size, + external_objects_count: self.external_objects_count + rhs.external_objects_count, + external_objects_size: self.external_objects_size + rhs.external_objects_size, + imported_objects_count: self.imported_objects_count + rhs.imported_objects_count, + imported_objects_size: self.imported_objects_size + rhs.imported_objects_size, + meta_count: self.meta_count + rhs.meta_count, + } + } +} + #[cfg(test)] mod tests { use std::{