Skip to content
This repository has been archived by the owner on Jan 15, 2025. It is now read-only.

Commit

Permalink
Merge pull request #109 from cgwalters/cancellable-async
Browse files Browse the repository at this point in the history
tokio_util: Add API to do GLib+`GCancellable` from `async fn`
  • Loading branch information
cgwalters authored Sep 29, 2021
2 parents e02a8e8 + 9340002 commit c78222c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 31 deletions.
57 changes: 27 additions & 30 deletions lib/src/tar/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ impl Importer {
size: usize,
checksum: &str,
xattrs: Option<glib::Variant>,
cancellable: Option<&gio::Cancellable>,
) -> Result<()> {
let cancellable = gio::NONE_CANCELLABLE;
let (uid, gid, mode) = header_attrs(entry.header())?;
let w = self.repo.write_regfile(
Some(checksum),
Expand Down Expand Up @@ -256,6 +256,7 @@ impl Importer {
size: usize,
checksum: &str,
xattrs: Option<glib::Variant>,
cancellable: Option<&gio::Cancellable>,
) -> Result<()> {
let (uid, gid, mode) = header_attrs(entry.header())?;
assert!(size <= SMALL_REGFILE_SIZE);
Expand All @@ -268,7 +269,7 @@ impl Importer {
mode,
xattrs.as_ref(),
&buf,
gio::NONE_CANCELLABLE,
cancellable,
)?;
debug_assert_eq!(c.as_str(), checksum);
self.stats.regfile_small += 1;
Expand Down Expand Up @@ -311,8 +312,8 @@ impl Importer {
entry: tar::Entry<R>,
checksum: &str,
xattrs: Option<glib::Variant>,
cancellable: Option<&gio::Cancellable>,
) -> Result<()> {
let cancellable = gio::NONE_CANCELLABLE;
if self
.repo
.has_object(ostree::ObjectType::File, checksum, cancellable)?
Expand All @@ -323,9 +324,9 @@ impl Importer {
match entry.header().entry_type() {
tar::EntryType::Regular => {
if size > SMALL_REGFILE_SIZE {
self.import_large_regfile_object(entry, size, checksum, xattrs)
self.import_large_regfile_object(entry, size, checksum, xattrs, cancellable)
} else {
self.import_small_regfile_object(entry, size, checksum, xattrs)
self.import_small_regfile_object(entry, size, checksum, xattrs, cancellable)
}
}
tar::EntryType::Symlink => self.import_symlink_object(entry, checksum, xattrs),
Expand All @@ -340,6 +341,7 @@ impl Importer {
&mut self,
entry: tar::Entry<'b, R>,
path: &Utf8Path,
cancellable: Option<&gio::Cancellable>,
) -> Result<()> {
let (parentname, mut name, mut objtype) = parse_object_entry_path(path)?;

Expand Down Expand Up @@ -385,7 +387,7 @@ impl Importer {
if is_xattrs {
self.import_xattr_ref(entry, checksum)
} else {
self.import_content_object(entry, &checksum, xattr_ref)
self.import_content_object(entry, &checksum, xattr_ref, cancellable)
}
}
objtype => self.import_metadata(entry, &checksum, objtype),
Expand Down Expand Up @@ -452,8 +454,12 @@ impl Importer {
Ok(())
}

fn import(mut self, archive: &mut tar::Archive<impl Read + Send + Unpin>) -> Result<String> {
self.repo.prepare_transaction(gio::NONE_CANCELLABLE)?;
fn import(
mut self,
archive: &mut tar::Archive<impl Read + Send + Unpin>,
cancellable: Option<&gio::Cancellable>,
) -> Result<String> {
self.repo.prepare_transaction(cancellable)?;

// Create an iterator that skips over directories; we just care about the file names.
let mut ents = archive.entries()?.filter_map(|e| match e {
Expand Down Expand Up @@ -518,29 +524,20 @@ impl Importer {
)?;

// Write the commit object, which also verifies its checksum.
let actual_checksum = self.repo.write_metadata(
objtype,
Some(&checksum),
&commit,
gio::NONE_CANCELLABLE,
)?;
let actual_checksum =
self.repo
.write_metadata(objtype, Some(&checksum), &commit, cancellable)?;
assert_eq!(actual_checksum.to_hex(), checksum);
event!(Level::DEBUG, "Imported {}.commit", checksum);

// Finally, write the detached metadata.
self.repo.write_commit_detached_metadata(
&checksum,
Some(&commitmeta),
gio::NONE_CANCELLABLE,
)?;
self.repo
.write_commit_detached_metadata(&checksum, Some(&commitmeta), cancellable)?;
} else {
// We're not doing any validation of the commit, so go ahead and write it.
let actual_checksum = self.repo.write_metadata(
objtype,
Some(&checksum),
&commit,
gio::NONE_CANCELLABLE,
)?;
let actual_checksum =
self.repo
.write_metadata(objtype, Some(&checksum), &commit, cancellable)?;
assert_eq!(actual_checksum.to_hex(), checksum);
event!(Level::DEBUG, "Imported {}.commit", checksum);

Expand All @@ -559,7 +556,7 @@ impl Importer {
)?;
}
_ => {
self.import_object(next_ent, &nextent_path)?;
self.import_object(next_ent, &nextent_path, cancellable)?;
}
}
}
Expand All @@ -568,12 +565,12 @@ impl Importer {
let (entry, path) = entry?;

if let Ok(p) = path.strip_prefix("objects/") {
self.import_object(entry, p)?;
self.import_object(entry, p, cancellable)?;
} else if path.strip_prefix("xattrs/").is_ok() {
self.import_xattrs(entry)?;
}
}
self.repo.commit_transaction(gio::NONE_CANCELLABLE)?;
self.repo.commit_transaction(cancellable)?;

Ok(checksum)
}
Expand Down Expand Up @@ -606,10 +603,10 @@ pub async fn import_tar(
let options = options.unwrap_or_default();
let src = ReadBridge::new(src);
let repo = repo.clone();
let import = tokio::task::spawn_blocking(move || {
let import = crate::tokio_util::spawn_blocking_cancellable(move |cancellable| {
let mut archive = tar::Archive::new(src);
let importer = Importer::new(&repo, options.remote);
importer.import(&mut archive)
importer.import(&mut archive, Some(cancellable))
})
.map_err(anyhow::Error::msg);
let import: String = import.await??;
Expand Down
28 changes: 27 additions & 1 deletion lib/src/tokio_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
use anyhow::Result;
use futures_util::Future;
use ostree::gio;
use ostree::prelude::CancellableExt;

/// Call a faillible future, while monitoring `cancellable` and return an error if cancelled.
pub async fn run_with_cancellable<F, R>(f: F, cancellable: &ostree::gio::Cancellable) -> Result<R>
pub async fn run_with_cancellable<F, R>(f: F, cancellable: &gio::Cancellable) -> Result<R>
where
F: Future<Output = Result<R>>,
{
Expand All @@ -22,6 +23,31 @@ where
}
}

struct CancelOnDrop(gio::Cancellable);

impl Drop for CancelOnDrop {
fn drop(&mut self) {
self.0.cancel();
}
}

/// Wrapper for [`tokio::task::spawn_blocking`] which provides a [`gio::Cancellable`] that will be triggered on drop.
///
/// This function should be used in a Rust/tokio native `async fn`, but that want to invoke
/// GLib style blocking APIs that use `GCancellable`. The cancellable will be triggered when this
/// future is dropped, which helps bound thread usage.
///
/// This is in a sense the inverse of [`run_with_cancellable`].
pub fn spawn_blocking_cancellable<F, R>(f: F) -> tokio::task::JoinHandle<R>
where
F: FnOnce(&gio::Cancellable) -> R + Send + 'static,
R: Send + 'static,
{
tokio::task::spawn_blocking(move || {
let dropper = CancelOnDrop(gio::Cancellable::new());
f(&dropper.0)
})
}
#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit c78222c

Please sign in to comment.