Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cgwalters committed Aug 21, 2024
1 parent 4a96721 commit 38d5996
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 40 deletions.
2 changes: 1 addition & 1 deletion rust/composefs-oci/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn main() {
// main simply invokes a run() where all the work is done.
// This code just captures any errors.
if let Err(e) = run() {
eprintln!("{:#}", e);
eprintln!("error: {:#}", e);
std::process::exit(1);
}
}
150 changes: 111 additions & 39 deletions rust/composefs-oci/src/repo.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::collections::hash_map::VacantEntry;
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, Seek, Write};
use std::ops::Add;
use std::os::fd::AsFd;
use std::path::Path;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::{Arc, Mutex, OnceLock};

use anyhow::{Context, Result};
Expand Down Expand Up @@ -41,14 +42,30 @@ const BY_MANIFEST: &str = "by-manifest-digest";
/// A subdirectory of images/
const LAYERS: &str = "layers";
/// Generic OCI artifacts (may be container images)
const ARTIFACTS: &str = "artifacts/";
const ARTIFACTS: &str = "artifacts";
const TMP: &str = "tmp";
const BOOTID_XATTR: &str = "user.composefs-oci.bootid";
const BY_SHA256_UPLINK: &'static str = "../../";
const TAG_UPLINK: &'static str = "../../objects/";

/// Can be included in a manifest if the digest is pre-computed
const CFS_DIGEST_ANNOTATION: &str = "composefs.rootfs.digest";

type SharedObjectDirs = Arc<Mutex<Vec<Dir>>>;
type ObjectDigest = String;
type ObjectPath = Utf8PathBuf;

/// Convert a 64-character hex object digest into its on-disk object path,
/// fanned out by the first two hex characters ("ab/cdef…"), relative to the
/// objects directory.
fn object_digest_to_path(objid: ObjectDigest) -> ObjectPath {
    // `mut` removed: the digest is only moved into the prefixed variant,
    // never mutated here (the original triggered an unused_mut warning).
    object_digest_to_path_prefixed(objid, "")
}

/// Turn a 64-character hex object digest into a path of the form
/// `<prefix>ab/cdef…`, i.e. with the standard two-character fanout
/// directory inserted after the first two hex characters.
fn object_digest_to_path_prefixed(objid: ObjectDigest, prefix: &str) -> ObjectPath {
    // Ensure we are only passed an object id
    assert_eq!(objid.len(), 64);
    // Split at the fanout boundary and assemble the final path in one pass.
    let (fanout, remainder) = objid.split_at(2);
    format!("{prefix}{fanout}/{remainder}").into()
}

/// The extended attribute we attach with the target metadata
// const CFS_ENTRY_META_XATTR: &str = "user.cfs.entry.meta";
Expand Down Expand Up @@ -348,7 +365,6 @@ pub struct RepoTransaction {

impl RepoTransaction {
const TMPROOT: &'static str = "tmp/root";
const BY_SHA256_UPLINK: &'static str = "../../";

fn new(repo: &Repo) -> Result<Self> {
let parent = Arc::clone(&repo.0);
Expand Down Expand Up @@ -472,7 +488,7 @@ impl RepoTransaction {
}

#[context("Importing object")]
fn import_object(&self, mut tmpfile: TempFile) -> Result<Utf8PathBuf> {
fn import_object(&self, mut tmpfile: TempFile) -> Result<ObjectDigest> {
// Rewind to ensure we read from the start
tmpfile.as_file_mut().seek(std::io::SeekFrom::Start(0))?;
// Gather state
Expand All @@ -489,7 +505,8 @@ impl RepoTransaction {
let mut digest = Digest::new();
composefs::fsverity::fsverity_digest_from_fd(tmpfile.as_file().as_fd(), &mut digest)
.context("Computing fsverity digest")?;
let mut buf = hex::encode(digest.get());
let digest = hex::encode(digest.get());
let mut buf = digest.clone();
buf.insert(2, '/');
let buf = Utf8PathBuf::from(buf);
let objpath = buf.as_std_path();
Expand All @@ -502,7 +519,7 @@ impl RepoTransaction {
let mut stats = self.stats.lock().unwrap();
stats.external_objects_count += 1;
stats.external_objects_size += size;
return Ok(buf);
return Ok(digest);
}
}
};
Expand All @@ -520,48 +537,44 @@ impl RepoTransaction {
stats.imported_objects_size += size;
}
}
let mut buf = buf.into_string();
buf.remove(2);
Ok(buf)
}

/// Import an object which also has a known descriptor. The descriptor will be validated (size and content-sha256),
/// and upon success a symlink will be added in objects/by-sha256 with the descriptor's content-sha256.
fn import_descriptor(&self, tmpf: DescriptorWriter, descriptor: &Descriptor) -> Result<()> {
fn import_descriptor(
&self,
tmpf: DescriptorWriter,
descriptor: &Descriptor,
) -> Result<ObjectDigest> {
let descriptor_sha256 = descriptor.sha256()?;
let tmpf = tmpf.finish_validate(&descriptor)?;
let mut objpath = self.import_object(tmpf)?.into_string();
objpath.insert_str(0, Self::BY_SHA256_UPLINK);
let objid = self.import_object(tmpf)?;
let mut uplink_path = objid.clone();
uplink_path.insert_str(0, BY_SHA256_UPLINK);
let mut by_sha256_path = String::from(OBJECTS_BY_SHA256);
append_object_path(&mut by_sha256_path, &descriptor_sha256)?;
ignore_rustix_eexist(rustix::fs::symlinkat(
&objpath,
&uplink_path,
&self.repo.0.dir,
&by_sha256_path,
))?;
tracing::debug!(
"Added descriptor {} to {by_sha256_path}",
descriptor.digest()
);
Ok(())
Ok(objid)
}

#[context("Reading object path of descriptor {}", descriptor.digest())]
fn fsverity_digest_for_descriptor(&self, descriptor: &Descriptor) -> Result<String> {
let descriptor_sha256 = descriptor.sha256()?;
let mut by_sha256_path = String::from(OBJECTS_BY_SHA256);
append_object_path(&mut by_sha256_path, &descriptor_sha256)?;
let buf = rustix::fs::readlinkat(&self.repo.0.dir, &by_sha256_path, Vec::new())?;
let mut buf = buf.into_string()?;
if !(buf.chars().all(|c| c.is_ascii())
&& buf.starts_with(Self::BY_SHA256_UPLINK)
&& buf.bytes().nth(2) == Some(b'/'))
{
anyhow::bail!("Invalid descriptor symlink: {buf}");
}
buf.replace_range(0..Self::BY_SHA256_UPLINK.len(), "");
buf.remove(2);
// Verify
let _ = Sha256Hex::new(&buf);
Ok(buf)
/// Create a tag symlink at `tagpath` pointing back into the object store
/// entry for `objid` (link target is `TAG_UPLINK` plus the fanned-out
/// object path, relative to the tag's directory).
///
/// Idempotent: an already-existing tag (EEXIST) is treated as success.
fn add_tag(&self, objid: ObjectDigest, tagpath: &Utf8Path) -> Result<()> {
    // `mut` removed: the path is built once and only borrowed afterwards
    // (the original triggered an unused_mut warning).
    let objpath = object_digest_to_path_prefixed(objid, TAG_UPLINK);
    ignore_rustix_eexist(rustix::fs::symlinkat(
        objpath.as_std_path(),
        &self.repo.0.dir,
        tagpath.as_std_path(),
    ))
}

#[context("Unpacking regfile")]
Expand All @@ -583,7 +596,7 @@ impl RepoTransaction {
// This should always be true, but just in case
anyhow::ensure!(size == wrote_size);

let objpath = self.import_object(tmpfile)?;
let objpath = object_digest_to_path(self.import_object(tmpfile)?);
rustix::fs::linkat(
&self.repo.0.objects,
objpath.as_std_path(),
Expand Down Expand Up @@ -779,6 +792,30 @@ impl Repo {
self.0.meta.verity
}

/// Look up the object digest for `descriptor` via the objects/by-sha256
/// symlink index.
///
/// Returns `Ok(None)` when no entry exists for the descriptor's sha256;
/// errors on I/O failure or a malformed symlink target.
#[context("Reading object path of descriptor {}", descriptor.digest())]
fn lookup_descriptor(&self, descriptor: &Descriptor) -> Result<Option<String>> {
    let descriptor_sha256 = descriptor.sha256()?;
    let mut by_sha256_path = String::from(OBJECTS_BY_SHA256);
    append_object_path(&mut by_sha256_path, &descriptor_sha256)?;
    let buf = match rustix::fs::readlinkat(&self.0.dir, &by_sha256_path, Vec::new()) {
        Ok(r) => r,
        // A missing symlink just means the descriptor was never imported.
        Err(e) if e == rustix::io::Errno::NOENT => return Ok(None),
        Err(e) => return Err(e.into()),
    };
    let mut buf = buf.into_string()?;
    // The target must look like "<uplink>ab/cdef…": ASCII, prefixed by the
    // uplink, with the fanout '/' two bytes past the prefix.  The previous
    // check inspected byte 2 of the whole target, which always landed inside
    // the "../" prefix itself and therefore validated nothing; a target
    // without the fanout slash would then have a digest character removed
    // below instead of failing.
    let fanout_slash = BY_SHA256_UPLINK.len() + 2;
    if !(buf.is_ascii()
        && buf.starts_with(BY_SHA256_UPLINK)
        && buf.as_bytes().get(fanout_slash) == Some(&b'/'))
    {
        anyhow::bail!("Invalid descriptor symlink: {buf}");
    }
    buf.replace_range(0..BY_SHA256_UPLINK.len(), "");
    // Drop the fanout '/' to recover the plain 64-character hex digest.
    buf.remove(2);
    // Verify
    let _ = Sha256Hex::new(&buf);
    Ok(Some(buf))
}

/// Returns true if this layer is stored in expanded form.
fn has_layer(&self, diffid: &str) -> Result<bool> {
let mut layer_path = format!("{IMAGES}/{LAYERS}");
Expand Down Expand Up @@ -840,13 +877,28 @@ impl Repo {
proxy: &containers_image_proxy::ImageProxy,
imgref: &str,
) -> Result<(RepoTransaction, Descriptor)> {
let img_filename =
percent_encoding::utf8_percent_encode(imgref, percent_encoding::NON_ALPHANUMERIC)
.to_string();
let img_path: Utf8PathBuf = format!("{ARTIFACTS}/{TAGS}/{img_filename}").into();

if self.0.dir.try_exists(&img_path)? {
return Ok((txn, todo!()));
}

let img = proxy.open_image(&imgref).await?;
let (manifest_digest, raw_manifest) = proxy.fetch_manifest_raw_oci(&img).await?;
let manifest_descriptor = Descriptor::new(
ocidir::oci_spec::image::MediaType::ImageManifest,
raw_manifest.len().try_into().unwrap(),
&manifest_digest,
);
let manifest_fsverity = if let Some(v) = self.lookup_descriptor(&manifest_descriptor)? {
v
} else {
let tmpf = txn.new_descriptor_with_bytes(&raw_manifest)?;
txn.import_descriptor(tmpf, &manifest_descriptor)?
};

if self.has_artifact_manifest(&manifest_descriptor)? {
tracing::debug!("Already stored: {manifest_digest}");
Expand All @@ -862,13 +914,15 @@ impl Repo {
let manifest =
ocidir::oci_spec::image::ImageManifest::from_reader(io::Cursor::new(&raw_manifest))?;
let txn = Arc::new(txn);
let config_raw = proxy.fetch_config_raw(&img).await?;
let config_descriptor = manifest.config();
// Import the config
{
let config_fsverity = if let Some(v) = self.lookup_descriptor(&config_descriptor)? {
v
} else {
let config_raw = proxy.fetch_config_raw(&img).await?;
// Import the config
let tmpf = txn.new_descriptor_with_bytes(&config_raw)?;
txn.import_descriptor(tmpf, &config_descriptor)?;
}
txn.import_descriptor(tmpf, &config_descriptor)?
};

let layers_to_fetch =
manifest
Expand Down Expand Up @@ -901,6 +955,22 @@ impl Repo {
}
tracing::debug!("Imported all layers");

// Map from a layer sha256:<hash> to a fsverity digest
let layer_digest_to_objid =
manifest
.layers()
.iter()
.try_fold(HashMap::new(), |mut acc, layer| {
let mut ent = acc.entry(layer.digest());
if let std::collections::hash_map::Entry::Vacant(v) = ent {
let objid = self
.lookup_descriptor(layer)?
.expect("Should have fetched descriptor");
v.insert(objid);
}
anyhow::Ok(acc)
})?;

let (send_entries, recv_entries) = std::sync::mpsc::sync_channel(5);
let txn_clone = Arc::clone(&txn);
let cfs_worker = tokio::task::spawn_blocking(move || -> Result<_> {
Expand All @@ -915,7 +985,6 @@ impl Repo {
let manifest_desc_ref = &manifest_descriptor;
let manifest_ref = &manifest;
let send_task = async move {
let manifest_fsverity = txn_clone.fsverity_digest_for_descriptor(manifest_desc_ref)?;
// If we fail to send on the channel, then we should get an error from the mkcomposefs job
if let Err(_) = send_entries.send(dir_cfs_entry("/".into())) {
return Ok(());
Expand All @@ -928,7 +997,6 @@ impl Repo {
)?) {
return Ok(());
}
let config_fsverity = txn_clone.fsverity_digest_for_descriptor(&config_descriptor)?;
let path = Utf8Path::new("/config.json");
if let Err(_) = send_entries.send(cfs_entry_for_descriptor(
&config_descriptor,
Expand All @@ -941,7 +1009,9 @@ impl Repo {
return Ok(());
}
for (i, layer) in manifest_ref.layers().iter().enumerate() {
let digest = txn_clone.fsverity_digest_for_descriptor(&layer)?;
let digest = layer_digest_to_objid
.get(layer.digest())
.expect("Should have objid for layer");
let path = &format!("/layers/{i}");
if let Err(_) =
send_entries.send(cfs_entry_for_descriptor(&layer, &digest, path.as_ref())?)
Expand All @@ -955,9 +1025,11 @@ impl Repo {
};

let (mkcfs_result, send_result) = tokio::join!(cfs_worker, send_task);
let cfs_objpath = mkcfs_result.unwrap()?;
let mut cfs_objid = mkcfs_result.unwrap()?;
let _: () = send_result?;

txn.add_tag(cfs_objid, &img_path)?;

// let repo = self.clone();
// tokio::task::spawn_blocking(move || -> Result<_> {
// repo.as_oci().insert_manifest(manifest, Some("default"), platform)
Expand Down

0 comments on commit 38d5996

Please sign in to comment.