From a496bedc6d4ce71c3d3b79376ef73566a8f7adb5 Mon Sep 17 00:00:00 2001
From: Remi Bernotavicius
Date: Sun, 24 Nov 2024 12:01:35 -0800
Subject: [PATCH] Broker reads manifests asynchronously

The broker now kicks off a task to read each manifest; that task sends
the digests it reads back to the scheduler via the scheduler's message
queue. This is better than before because the scheduler no longer does
IO itself. Local file IO isn't "blocking" in the traditional sense, but
it can sometimes take a while. Also, if we ever want manifests to live
in some network-backed cache, we will have to do real blocking IO in
this path anyway.
---
 crates/maelstrom-broker/src/scheduler_task.rs | 111 +++++-
 .../src/scheduler_task/scheduler.rs           | 372 +++++++++---------
 2 files changed, 276 insertions(+), 207 deletions(-)
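In outline, the flow this patch introduces looks like the following sketch. The types here (Digest, JobId, Message, read_digests) are simplified stand-ins for the real maelstrom types, not the patch's actual API; the point is the shape: each dependency digest streams back to the scheduler as its own message, and a completion message always follows, so the scheduler itself never touches the file.

// Sketch only: stand-in types, not maelstrom's real ones.
use tokio::{sync::mpsc, task::JoinSet};

type Digest = u64;
type JobId = u32;

#[derive(Debug)]
enum Message {
    GotManifestEntry(Digest, JobId),
    FinishedReadingManifest(Digest, JobId, anyhow::Result<()>),
}

// Stand-in for parsing a manifest; assume it can fail.
async fn read_digests(_manifest: Digest) -> anyhow::Result<Vec<Digest>> {
    Ok(vec![1, 2, 3])
}

// Must run inside a tokio runtime, since it spawns onto the JoinSet.
fn kick_off_read(
    tasks: &mut JoinSet<()>,
    sender: mpsc::UnboundedSender<Message>,
    manifest: Digest,
    job_id: JobId,
) {
    tasks.spawn(async move {
        let result = async {
            for digest in read_digests(manifest).await? {
                // Each dependency is reported as soon as it is read.
                sender.send(Message::GotManifestEntry(digest, job_id)).ok();
            }
            Ok(())
        }
        .await;
        // EOF or error, the scheduler always hears that the read finished.
        sender
            .send(Message::FinishedReadingManifest(manifest, job_id, result))
            .ok();
    });
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let mut tasks = JoinSet::new();
    kick_off_read(&mut tasks, tx, 7, 1);
    while let Some(msg) = rx.recv().await {
        println!("{msg:?}");
        if matches!(msg, Message::FinishedReadingManifest(..)) {
            break;
        }
    }
}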
diff --git a/crates/maelstrom-broker/src/scheduler_task.rs b/crates/maelstrom-broker/src/scheduler_task.rs
index 8c4be5e7..bc342568 100644
--- a/crates/maelstrom-broker/src/scheduler_task.rs
+++ b/crates/maelstrom-broker/src/scheduler_task.rs
@@ -2,25 +2,38 @@ mod scheduler;
 
 pub use maelstrom_util::cache::CacheDir;
 
-use maelstrom_base::proto::{BrokerToClient, BrokerToMonitor, BrokerToWorker};
+use maelstrom_base::{
+    manifest::{ManifestEntryData, ManifestFileData},
+    proto::{BrokerToClient, BrokerToMonitor, BrokerToWorker},
+    JobId, Sha256Digest,
+};
 use maelstrom_util::{
+    async_fs,
     cache::{self, Cache, TempFileFactory},
     config::common::CacheSize,
-    manifest::ManifestReader,
+    manifest::AsyncManifestReader,
    root::RootBuf,
     sync,
 };
+use ref_cast::RefCast;
 use scheduler::{Message, Scheduler, SchedulerDeps};
 use slog::Logger;
-use std::{
-    fs, io,
-    path::{Path, PathBuf},
-    sync::mpsc as std_mpsc,
-};
+use std::{path::PathBuf, sync::mpsc as std_mpsc};
 use tokio::sync::mpsc as tokio_mpsc;
+use tokio::task::JoinSet;
 
 #[derive(Debug)]
-pub struct PassThroughDeps;
+pub struct PassThroughDeps {
+    manifest_reader_sender: tokio_mpsc::UnboundedSender<(Sha256Digest, JobId)>,
+}
+
+impl PassThroughDeps {
+    fn new(manifest_reader_sender: tokio_mpsc::UnboundedSender<(Sha256Digest, JobId)>) -> Self {
+        Self {
+            manifest_reader_sender,
+        }
+    }
+}
 
 /// The production implementation of [SchedulerDeps]. This implementation just hands the
 /// message to the provided sender.
@@ -29,8 +42,6 @@ impl SchedulerDeps for PassThroughDeps {
     type WorkerSender = tokio_mpsc::UnboundedSender<BrokerToWorker>;
     type MonitorSender = tokio_mpsc::UnboundedSender<BrokerToMonitor>;
     type WorkerArtifactFetcherSender = std_mpsc::Sender<Option<(PathBuf, u64)>>;
-    type ManifestError = io::Error;
-    type ManifestIterator = ManifestReader<fs::File>;
 
     fn send_message_to_client(&mut self, sender: &mut Self::ClientSender, message: BrokerToClient) {
         sender.send(message).ok();
@@ -56,8 +67,64 @@ impl SchedulerDeps for PassThroughDeps {
         sender.send(message).ok();
     }
 
-    fn read_manifest(&mut self, path: &Path) -> io::Result<Self::ManifestIterator> {
-        ManifestReader::new(fs::File::open(path)?)
+    fn read_manifest(&mut self, manifest_digest: Sha256Digest, job_id: JobId) {
+        self.manifest_reader_sender
+            .send((manifest_digest, job_id))
+            .ok();
     }
 }
+
+#[derive(Debug)]
+struct CacheManifestReader {
+    tasks: JoinSet<()>,
+    receiver: tokio_mpsc::UnboundedReceiver<(Sha256Digest, JobId)>,
+    sender: SchedulerSender,
+}
+
+async fn read_manifest_from_path(
+    sender: SchedulerSender,
+    manifest_path: PathBuf,
+    job_id: JobId,
+) -> anyhow::Result<()> {
+    let fs = async_fs::Fs::new();
+    let mut reader = AsyncManifestReader::new(fs.open_file(manifest_path).await?).await?;
+    while let Some(entry) = reader.next().await? {
+        if let ManifestEntryData::File(ManifestFileData::Digest(digest)) = entry.data {
+            sender.send(Message::GotManifestEntry(digest, job_id)).ok();
+        }
+    }
+    Ok(())
+}
+
+impl CacheManifestReader {
+    fn new(
+        receiver: tokio_mpsc::UnboundedReceiver<(Sha256Digest, JobId)>,
+        sender: SchedulerSender,
+    ) -> Self {
+        Self {
+            tasks: JoinSet::new(),
+            receiver,
+            sender,
+        }
+    }
+
+    fn kick_off_manifest_readings(&mut self, cache: &TaskCache) {
+        while let Ok((manifest_digest, job_id)) = self.receiver.try_recv() {
+            let sender = self.sender.clone();
+            let manifest_path = cache
+                .cache_path(scheduler::BrokerKey::ref_cast(&manifest_digest))
+                .into_path_buf();
+            self.tasks.spawn(async move {
+                let result = read_manifest_from_path(sender.clone(), manifest_path, job_id).await;
+                sender
+                    .send(Message::FinishedReadingManifest(
+                        manifest_digest,
+                        job_id,
+                        result,
+                    ))
+                    .ok();
+            });
+        }
+    }
+}
@@ -69,14 +136,15 @@ pub type SchedulerMessage = Message;
 
 /// This type is used often enough to warrant an alias.
 pub type SchedulerSender = tokio_mpsc::UnboundedSender<SchedulerMessage>;
 
+type TaskCache = Cache;
+
 pub struct SchedulerTask {
-    scheduler: Scheduler<
-        Cache,
-        PassThroughDeps,
-    >,
+    scheduler: Scheduler<TaskCache, PassThroughDeps>,
     sender: SchedulerSender,
     receiver: tokio_mpsc::UnboundedReceiver<SchedulerMessage>,
     temp_file_factory: TempFileFactory,
+    manifest_reader: CacheManifestReader,
+    deps: PassThroughDeps,
 }
 
 impl SchedulerTask {
@@ -84,11 +152,18 @@ impl SchedulerTask {
         let (sender, receiver) = tokio_mpsc::unbounded_channel();
         let (cache, temp_file_factory) =
             Cache::new(cache::fs::std::Fs, cache_root, cache_size, log, true).unwrap();
+
+        let (manifest_reader_sender, manifest_reader_receiver) = tokio_mpsc::unbounded_channel();
+        let manifest_reader = CacheManifestReader::new(manifest_reader_receiver, sender.clone());
+        let deps = PassThroughDeps::new(manifest_reader_sender);
+
        SchedulerTask {
             scheduler: Scheduler::new(cache),
             sender,
             receiver,
             temp_file_factory,
+            manifest_reader,
+            deps,
         }
     }
@@ -113,7 +188,9 @@ impl SchedulerTask {
     /// give us a way to return an error, for precisely this reason.
     pub async fn run(mut self) {
         sync::channel_reader(self.receiver, |msg| {
-            self.scheduler.receive_message(&mut PassThroughDeps, msg)
+            self.scheduler.receive_message(&mut self.deps, msg);
+            self.manifest_reader
+                .kick_off_manifest_readings(&self.scheduler.cache);
         })
         .await
         .unwrap();
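Two details of the wiring above are worth calling out. First, run stays non-blocking: after each scheduler message it calls kick_off_manifest_readings, which drains the request queue with try_recv and returns as soon as the queue is empty. The same idiom in isolation (drain_pending is a hypothetical name, not part of the patch):

use tokio::sync::mpsc;

fn drain_pending<T>(receiver: &mut mpsc::UnboundedReceiver<T>) -> Vec<T> {
    let mut pending = Vec::new();
    // try_recv never waits: it returns Err(TryRecvError::Empty) as soon as
    // the queue is empty, so the caller's event loop is never stalled.
    while let Ok(item) = receiver.try_recv() {
        pending.push(item);
    }
    pending
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tx.send("read manifest 42").unwrap();
    tx.send("read manifest 43").unwrap();
    // No async runtime needed; try_recv works outside of one.
    assert_eq!(drain_pending(&mut rx).len(), 2);
}

Second, the spawned reads are held in a JoinSet rather than detached with tokio::spawn, so if the CacheManifestReader is dropped, outstanding reads are aborted instead of leaked.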
diff --git a/crates/maelstrom-broker/src/scheduler_task/scheduler.rs b/crates/maelstrom-broker/src/scheduler_task/scheduler.rs
index 2a20d21f..5647ecbe 100644
--- a/crates/maelstrom-broker/src/scheduler_task/scheduler.rs
+++ b/crates/maelstrom-broker/src/scheduler_task/scheduler.rs
@@ -1,9 +1,7 @@
 //! Central processing module for the broker. Receives and sends messages to and from clients and
 //! workers.
 
-use anyhow::Result;
 use maelstrom_base::{
-    manifest::{ManifestEntry, ManifestEntryData, ManifestFileData},
     proto::{
         BrokerToClient, BrokerToMonitor, BrokerToWorker, ClientToBroker, MonitorToBroker,
         WorkerToBroker,
@@ -28,9 +26,8 @@
 use ref_cast::RefCast;
 use std::{
     cmp::Ordering,
     collections::{BinaryHeap, HashMap, HashSet},
-    error,
     fmt::{self, Debug, Formatter},
-    path::{Path, PathBuf},
+    path::PathBuf,
     time::Duration,
 };
@@ -50,8 +47,6 @@ pub trait SchedulerDeps {
     type WorkerSender;
     type MonitorSender;
     type WorkerArtifactFetcherSender;
-    type ManifestError: error::Error + Send + Sync + 'static;
-    type ManifestIterator: Iterator<Item = Result<ManifestEntry, Self::ManifestError>>;
     fn send_message_to_client(&mut self, sender: &mut Self::ClientSender, message: BrokerToClient);
     fn send_message_to_worker(&mut self, sender: &mut Self::WorkerSender, message: BrokerToWorker);
     fn send_message_to_monitor(
@@ -64,8 +59,7 @@ pub trait SchedulerDeps {
         sender: &mut Self::WorkerArtifactFetcherSender,
         message: Option<(PathBuf, u64)>,
     );
-    fn read_manifest(&mut self, path: &Path)
-        -> Result<Self::ManifestIterator, Self::ManifestError>;
+    fn read_manifest(&mut self, manifest_digest: Sha256Digest, job_id: JobId);
 }
 
 #[derive(Clone, Debug, Eq, Hash, PartialEq, RefCast)]
@@ -122,9 +116,6 @@ pub trait SchedulerCache {
 
     /// See [`Cache::get_artifact_for_worker`].
     fn get_artifact_for_worker(&mut self, digest: &Sha256Digest) -> Option<(PathBuf, u64)>;
-
-    /// See [`Cache::cache_path`].
-    fn cache_path(&self, digest: &Sha256Digest) -> PathBuf;
 }
 
 impl SchedulerCache for Cache {
@@ -153,10 +144,6 @@ impl SchedulerCache for Cache {
         self.try_increment_ref_count(BrokerKey::ref_cast(digest))
             .map(|size| (cache_path, size))
     }
-
-    fn cache_path(&self, digest: &Sha256Digest) -> PathBuf {
-        self.cache_path(BrokerKey::ref_cast(digest)).into_path_buf()
-    }
 }
 
 /// The incoming messages, or events, for [`Scheduler`].
@@ -206,6 +193,14 @@
 
     /// The stats heartbeat task has decided it's time to take another statistics sample.
     StatisticsHeartbeat,
+
+    /// Read a manifest and found the given digest as a dependency. The job which is responsible
+    /// for the reading is included.
+    GotManifestEntry(Sha256Digest, JobId),
+
+    /// Finished reading the manifest, either due to reaching EOF or an error; no more dependent
+    /// digest messages will be sent.
+    FinishedReadingManifest(Sha256Digest, JobId, anyhow::Result<()>),
 }
 
 impl Debug for Message {
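These two messages feed a single readiness rule, implemented further down as Job::have_all_artifacts: a job may be enqueued only once it has no missing artifacts and no manifest reads still in flight. A minimal standalone sketch of that rule (JobReadiness is a stand-in, not the patch's Job struct):

use std::collections::HashSet;

type Digest = u64;

struct JobReadiness {
    missing_artifacts: HashSet<Digest>,
    manifests_being_read: HashSet<Digest>,
}

impl JobReadiness {
    // A manifest whose bytes are already cached can still be mid-read, and
    // reading it may add more missing artifacts, so both sets must be empty.
    fn have_all_artifacts(&self) -> bool {
        self.missing_artifacts.is_empty() && self.manifests_being_read.is_empty()
    }
}

fn main() {
    let job = JobReadiness {
        missing_artifacts: HashSet::new(),
        manifests_being_read: HashSet::from([42]),
    };
    // Artifacts are all present, but the manifest read hasn't finished:
    assert!(!job.have_all_artifacts());
}

Both the got-artifact path and the finished-reading path re-check the predicate, so whichever event arrives last is the one that enqueues the job.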
@@ -252,6 +247,17 @@ impl Debug for Message {
             Message::StatisticsHeartbeat => f.debug_tuple("StatisticsHeartbeat").finish(),
+            Message::GotManifestEntry(entry_digest, job_id) => f
+                .debug_tuple("GotManifestEntry")
+                .field(entry_digest)
+                .field(job_id)
+                .finish(),
+            Message::FinishedReadingManifest(manifest_digest, job_id, result) => f
+                .debug_tuple("FinishedReadingManifest")
+                .field(manifest_digest)
+                .field(job_id)
+                .field(result)
+                .finish(),
         }
     }
 }
@@ -301,6 +307,12 @@ impl Scheduler {
             }
             Message::DecrementRefcount(digest) => self.receive_decrement_refcount(digest),
             Message::StatisticsHeartbeat => self.receive_statistics_heartbeat(),
+            Message::GotManifestEntry(entry_digest, job_id) => {
+                self.receive_manifest_entry(deps, entry_digest, job_id)
+            }
+            Message::FinishedReadingManifest(manifest_digest, job_id, result) => {
+                self.receive_finished_reading_manifest(deps, manifest_digest, job_id, result)
+            }
         }
     }
 }
@@ -340,6 +352,7 @@ struct Job {
     spec: JobSpec,
     acquired_artifacts: HashSet<Sha256Digest>,
     missing_artifacts: HashMap<Sha256Digest, IsManifest>,
+    manifests_being_read: HashSet<Sha256Digest>,
 }
 
 impl Job {
@@ -348,8 +361,13 @@ impl Job {
             spec,
             acquired_artifacts: Default::default(),
             missing_artifacts: Default::default(),
+            manifests_being_read: Default::default(),
         }
     }
+
+    fn have_all_artifacts(&self) -> bool {
+        self.missing_artifacts.is_empty() && self.manifests_being_read.is_empty()
+    }
 }
 
 struct Client {
@@ -451,7 +469,7 @@ impl Ord for QueuedJob {
 }
 
 pub struct Scheduler {
-    cache: CacheT,
+    pub(crate) cache: CacheT,
     clients: ClientMap,
     workers: WorkerMap,
     monitors: HashMap,
@@ -542,8 +560,10 @@
                     .insert(digest.clone())
                     .assert_is_true();
                 if is_manifest.is_manifest() {
-                    self.ensure_manifest_artifacts_for_job(deps, jid, digest)
-                        .unwrap();
+                    deps.read_manifest(digest.clone(), jid);
+                    job.manifests_being_read
+                        .insert(digest.clone())
+                        .assert_is_true();
                 }
             }
             GetArtifact::Wait => {
@@ -584,8 +604,7 @@
         let client = self.clients.0.get_mut(&jid.cid).unwrap();
         let job = client.jobs.get(&jid.cjid).unwrap();
-        let have_all_artifacts = job.missing_artifacts.is_empty();
-        if have_all_artifacts {
+        if job.have_all_artifacts() {
             self.queued_jobs
                 .push(QueuedJob::new(jid, priority, estimated_duration));
             self.possibly_start_jobs(deps, HashSet::from_iter([jid]));
         }
@@ -713,21 +732,6 @@
         deps.send_message_to_monitor(self.monitors.get_mut(&mid).unwrap(), resp);
     }
 
-    fn ensure_manifest_artifacts_for_job(
-        &mut self,
-        deps: &mut DepsT,
-        jid: JobId,
-        digest: Sha256Digest,
-    ) -> Result<()> {
-        for entry in deps.read_manifest(&self.cache.cache_path(&digest))?
{ - let entry = entry?; - if let ManifestEntryData::File(ManifestFileData::Digest(digest)) = entry.data { - self.ensure_artifact_for_job(deps, digest, jid, IsManifest::NotManifest); - } - } - Ok(()) - } - fn receive_got_artifact( &mut self, deps: &mut DepsT, @@ -744,12 +748,14 @@ impl Scheduler { let is_manifest = job.missing_artifacts.remove(&digest.clone()).unwrap(); if is_manifest.is_manifest() { - self.ensure_manifest_artifacts_for_job(deps, jid, digest.clone()) - .unwrap(); + deps.read_manifest(digest.clone(), jid); + job.manifests_being_read + .insert(digest.clone()) + .assert_is_true(); } let job = self.clients.job_from_jid(jid); - if job.missing_artifacts.is_empty() { + if job.have_all_artifacts() { self.queued_jobs.push(QueuedJob::new( jid, job.spec.priority, @@ -784,7 +790,7 @@ impl Scheduler { let mut counts = JobStateCounts::default(); counts[JobState::WaitingForArtifacts] = jobs .values() - .filter(|job| !job.missing_artifacts.is_empty()) + .filter(|job| !job.have_all_artifacts()) .count() as u64; counts[JobState::Pending] = self @@ -817,6 +823,43 @@ impl Scheduler { }; self.job_statistics.insert(sample); } + + fn receive_manifest_entry( + &mut self, + deps: &mut DepsT, + entry_digest: Sha256Digest, + job_id: JobId, + ) { + self.ensure_artifact_for_job(deps, entry_digest, job_id, IsManifest::NotManifest); + } + + fn receive_finished_reading_manifest( + &mut self, + deps: &mut DepsT, + manifest_digest: Sha256Digest, + job_id: JobId, + result: anyhow::Result<()>, + ) { + // It would be better to not crash... + result.expect("failed reading a manifest"); + + let client = self.clients.0.get_mut(&job_id.cid).unwrap(); + let job = client.jobs.get_mut(&job_id.cjid).unwrap(); + job.manifests_being_read + .remove(&manifest_digest) + .assert_is_true(); + + let mut just_enqueued = HashSet::default(); + if job.have_all_artifacts() { + self.queued_jobs.push(QueuedJob::new( + job_id, + job.spec.priority, + job.spec.estimated_duration, + )); + just_enqueued.insert(job_id); + } + self.possibly_start_jobs(deps, just_enqueued); + } } /* _ _ @@ -832,15 +875,11 @@ mod tests { use super::{Message::*, *}; use enum_map::enum_map; use itertools::Itertools; - use maelstrom_base::{ - manifest::{ManifestEntry, ManifestEntryMetadata, Mode, UnixTimestamp}, - proto::BrokerToWorker::{self, *}, - }; + use maelstrom_base::proto::BrokerToWorker::{self, *}; use maelstrom_test::*; use maelstrom_util::cache::fs::test; use maplit::hashmap; - use std::{cell::RefCell, error, rc::Rc, str::FromStr as _}; - use strum::Display; + use std::{cell::RefCell, rc::Rc}; #[derive(Clone, Debug, PartialEq)] enum TestMessage { @@ -853,6 +892,7 @@ mod tests { CacheDecrementRefcount(Sha256Digest), CacheClientDisconnected(ClientId), CacheGetArtifactForWorker(Sha256Digest), + ReadManifest(Sha256Digest, JobId), } use TestMessage::*; @@ -869,7 +909,6 @@ mod tests { got_artifact_returns: HashMap>>, #[allow(clippy::type_complexity)] get_artifact_for_worker_returns: HashMap>>, - read_manifest_returns: HashMap>, } impl SchedulerCache for Rc> { @@ -919,24 +958,13 @@ mod tests { .unwrap() .remove(0) } - - fn cache_path(&self, digest: &Sha256Digest) -> PathBuf { - Path::new("/").join(digest.to_string()) - } } - #[derive(Debug, Display)] - enum NoError {} - - impl error::Error for NoError {} - impl SchedulerDeps for Rc> { type ClientSender = TestClientSender; type WorkerSender = TestWorkerSender; type MonitorSender = TestMonitorSender; type WorkerArtifactFetcherSender = TestWorkerArtifactFetcherSender; - type ManifestError = NoError; - type 
ManifestIterator = > as IntoIterator>::IntoIter; fn send_message_to_client( &mut self, @@ -974,18 +1002,10 @@ mod tests { .push(ToWorkerArtifactFetcher(sender.0, message)); } - fn read_manifest(&mut self, path: &Path) -> Result { - let digest = - Sha256Digest::from_str(path.file_name().unwrap().to_str().unwrap()).unwrap(); - Ok(self - .borrow_mut() - .read_manifest_returns - .get(&digest) - .unwrap() - .iter() - .map(|e| Ok(e.clone())) - .collect_vec() - .into_iter()) + fn read_manifest(&mut self, manifest_digest: Sha256Digest, job_id: JobId) { + self.borrow_mut() + .messages + .push(ReadManifest(manifest_digest, job_id)); } } @@ -1006,11 +1026,10 @@ mod tests { impl Fixture { #[allow(clippy::type_complexity)] - fn new( + fn new( get_artifact_returns: [((JobId, Sha256Digest), Vec); L], got_artifact_returns: [(Sha256Digest, Vec>); M], get_artifact_for_worker_returns: [(Sha256Digest, Vec>); N], - read_manifest_returns: [(Sha256Digest, Vec); O], ) -> Self { let result = Self::default(); result.test_state.borrow_mut().get_artifact_returns = @@ -1021,8 +1040,6 @@ mod tests { .test_state .borrow_mut() .get_artifact_for_worker_returns = HashMap::from(get_artifact_for_worker_returns); - result.test_state.borrow_mut().read_manifest_returns = - HashMap::from(read_manifest_returns); result } @@ -1085,7 +1102,7 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![1], spec![1, Tar])) => { @@ -1158,7 +1175,7 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; WorkerConnected(wid![1], 2, worker_sender![1]) => {}; @@ -1192,7 +1209,7 @@ mod tests { ((jid![1, 8], digest![8]), vec![GetArtifact::Success]), ((jid![1, 9], digest![9]), vec![GetArtifact::Success]), ((jid![1, 10], digest![10]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 2, worker_sender![1]) => {}; WorkerConnected(wid![2], 2, worker_sender![2]) => {}; @@ -1279,7 +1296,7 @@ mod tests { ((jid![1, 4], digest![4]), vec![GetArtifact::Success]), ((jid![1, 5], digest![5]), vec![GetArtifact::Success]), ((jid![1, 6], digest![6]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; WorkerConnected(wid![2], 1, worker_sender![2]) => {}; @@ -1344,7 +1361,7 @@ mod tests { ((jid![1, 4], digest![4]), vec![GetArtifact::Success]), ((jid![1, 5], digest![5]), vec![GetArtifact::Success]), ((jid![1, 6], digest![6]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; @@ -1395,7 +1412,7 @@ mod tests { ((jid![1, 3], digest![3]), vec![GetArtifact::Success]), ((jid![1, 4], digest![4]), vec![GetArtifact::Success]), ((jid![1, 5], digest![5]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; WorkerConnected(wid![2], 1, worker_sender![2]) => {}; @@ -1447,7 +1464,7 @@ mod tests { ((jid![1, 2], digest![2]), vec![GetArtifact::Success]), ((jid![1, 3], digest![3]), vec![GetArtifact::Success]), ((jid![1, 4], digest![4]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1501,7 +1518,7 @@ mod tests { ((jid![1, 2], digest![2]), 
vec![GetArtifact::Success]), ((jid![1, 3], digest![3]), vec![GetArtifact::Success]), ((jid![1, 4], digest![4]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1542,7 +1559,7 @@ mod tests { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), ((jid![1, 2], digest![2]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; @@ -1577,7 +1594,7 @@ mod tests { { Fixture::new([ ((jid!(1, 1), digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1601,7 +1618,7 @@ mod tests { ((jid!(1, 1), digest![1]), vec![GetArtifact::Success]), ((jid!(2, 1), digest![2]), vec![GetArtifact::Success]), ((jid!(1, 2), digest![3]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; WorkerConnected(wid![2], 1, worker_sender![2]) => {}; @@ -1638,7 +1655,7 @@ mod tests { ((jid!(1, 2), digest![2]), vec![GetArtifact::Success]), ((jid!(1, 3), digest![3]), vec![GetArtifact::Success]), ((jid!(2, 1), digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1685,7 +1702,7 @@ mod tests { ((jid!(2, 2), digest![2]), vec![GetArtifact::Success]), ((jid!(2, 3), digest![3]), vec![GetArtifact::Success]), ((jid!(2, 4), digest![4]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; WorkerConnected(wid![2], 1, worker_sender![2]) => {}; @@ -1744,7 +1761,7 @@ mod tests { ((jid![1, 2], digest![42]), vec![GetArtifact::Success]), ((jid![1, 2], digest![43]), vec![GetArtifact::Wait]), ((jid![1, 2], digest![44]), vec![GetArtifact::Get]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1773,7 +1790,7 @@ mod tests { ((jid![1, 2], digest![42]), vec![GetArtifact::Success]), ((jid![1, 2], digest![43]), vec![GetArtifact::Success]), ((jid![1, 2], digest![44]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1806,7 +1823,7 @@ mod tests { ], [ (digest![43], vec![vec![jid![1, 2]]]), (digest![44], vec![vec![jid![1, 2]]]), - ], [], []) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1844,7 +1861,7 @@ mod tests { { Fixture::new([ ((jid![1, 2], digest![42]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1870,7 +1887,7 @@ mod tests { ((jid![1, 2], digest![42]), vec![GetArtifact::Get]), ], [ (digest![42], vec![vec![jid![1, 2]]]), - ], [], []) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1904,17 +1921,7 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ManifestEntry { - path: "foobar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: 
ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1927,10 +1934,16 @@ mod tests { GotArtifact(digest![42], "/z/tmp/foo".into()) => { CacheGotArtifact(digest![42], "/z/tmp/foo".into()), + ReadManifest(digest![42], jid![1, 2]), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), ToClient(cid![1], BrokerToClient::TransferArtifact(digest![43])), }; + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => {}; + GotArtifact(digest![43], "/z/tmp/bar".into()) => { CacheGotArtifact(digest![43], "/z/tmp/bar".into()), ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), @@ -1953,28 +1966,7 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ - ManifestEntry { - path: "foo.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }, - ManifestEntry { - path: "bar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - } - ]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1987,10 +1979,18 @@ mod tests { GotArtifact(digest![42], "/z/tmp/foo".into()) => { CacheGotArtifact(digest![42], "/z/tmp/foo".into()), + ReadManifest(digest![42], jid![1, 2]), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), ToClient(cid![1], BrokerToClient::TransferArtifact(digest![43])), }; + GotManifestEntry(digest![43], jid![1, 2]) => {}; + + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => {}; + GotArtifact(digest![43], "/z/tmp/bar".into()) => { CacheGotArtifact(digest![43], "/z/tmp/bar".into()), ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), @@ -2013,28 +2013,27 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ManifestEntry { - path: "foobar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![2], spec![1, [(42, Manifest)]])) => { CacheGetArtifact(jid![1, 2], digest![42]), + ReadManifest(digest![42], jid![1, 2]), + ToClient( + cid![1], + BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers) + ), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), ToClient(cid![1], BrokerToClient::TransferArtifact(digest![43])), - ToClient(cid![1], BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers)), }; + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => {}; + GotArtifact(digest![43], "/z/tmp/bar".into()) => { CacheGotArtifact(digest![43], "/z/tmp/bar".into()), ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), @@ -2057,28 +2056,27 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), 
(digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ManifestEntry { - path: "foobar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![2], spec![1, [(42, Manifest)]])) => { CacheGetArtifact(jid![1, 2], digest![42]), + ReadManifest(digest![42], jid![1, 2]), + ToClient( + cid![1], + BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers) + ), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), ToClient(cid![1], BrokerToClient::TransferArtifact(digest![43])), - ToClient(cid![1], BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers)), }; + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => {}; + GotArtifact(digest![43], "/z/tmp/bar".into()) => { CacheGotArtifact(digest![43], "/z/tmp/bar".into()), ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), @@ -2101,24 +2099,25 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ManifestEntry { - path: "foobar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![2], spec![1, [(42, Manifest)]])) => { CacheGetArtifact(jid![1, 2], digest![42]), + ReadManifest(digest![42], jid![1, 2]), + ToClient( + cid![1], + BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers) + ), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), + }; + + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => { ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), }; @@ -2139,35 +2138,25 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ - ManifestEntry { - path: "foo.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }, - ManifestEntry { - path: "bar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - } - ]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![2], spec![1, [(42, Manifest)]])) => { CacheGetArtifact(jid![1, 2], digest![42]), + ReadManifest(digest![42], jid![1, 2]), + ToClient( + cid![1], + BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers) + ), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), + }; + + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => { ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), }; @@ -2187,7 +2176,7 @@ mod tests { 
digest![42], vec![Some(("/a/good/path".into(), 42))], ), - ], []) + ]) }, GetArtifactForWorker(digest![42], worker_artifact_fetcher_sender![1]) => { CacheGetArtifactForWorker(digest![42]), @@ -2203,7 +2192,7 @@ mod tests { digest![42], vec![Some(("/a/good/path".into(), 42))] ), - ], []) + ]) }, GetArtifactForWorker(digest![42], worker_artifact_fetcher_sender![1]) => { CacheGetArtifactForWorker(digest![42]), @@ -2223,14 +2212,17 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![42]), vec![GetArtifact::Wait]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; WorkerConnected(wid![1], 2, worker_sender![1]) => {}; MonitorConnected(mid![1], monitor_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![1], spec![1, [(42, Tar)]])) => { CacheGetArtifact(jid![1, 1], digest![42]), - ToClient(cid![1], BrokerToClient::JobStatusUpdate(cjid![1], JobBrokerStatus::WaitingForLayers)), + ToClient( + cid![1], + BrokerToClient::JobStatusUpdate(cjid![1], JobBrokerStatus::WaitingForLayers) + ), }; StatisticsHeartbeat => {}; FromMonitor(mid![1], MonitorToBroker::StatisticsRequest) => { @@ -2257,7 +2249,7 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; MonitorConnected(mid![1], monitor_sender![1]) => {}; @@ -2288,7 +2280,7 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; WorkerConnected(wid![1], 2, worker_sender![1]) => {}; @@ -2322,7 +2314,7 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; WorkerConnected(wid![1], 2, worker_sender![1]) => {}; @@ -2360,7 +2352,7 @@ mod tests { { Fixture::new([ ((jid![1, 3], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; WorkerConnected(wid![2], 1, worker_sender![2]) => {}; @@ -2409,7 +2401,7 @@ mod tests { script_test! { drop_unknown_job_status_update, { - Fixture::new([], [], [], []) + Fixture::new([], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; FromWorker(