diff --git a/crates/maelstrom-broker/src/scheduler_task.rs b/crates/maelstrom-broker/src/scheduler_task.rs index 8c4be5e7..bc342568 100644 --- a/crates/maelstrom-broker/src/scheduler_task.rs +++ b/crates/maelstrom-broker/src/scheduler_task.rs @@ -2,25 +2,38 @@ mod scheduler; pub use maelstrom_util::cache::CacheDir; -use maelstrom_base::proto::{BrokerToClient, BrokerToMonitor, BrokerToWorker}; +use maelstrom_base::{ + manifest::{ManifestEntryData, ManifestFileData}, + proto::{BrokerToClient, BrokerToMonitor, BrokerToWorker}, + JobId, Sha256Digest, +}; use maelstrom_util::{ + async_fs, cache::{self, Cache, TempFileFactory}, config::common::CacheSize, - manifest::ManifestReader, + manifest::AsyncManifestReader, root::RootBuf, sync, }; +use ref_cast::RefCast; use scheduler::{Message, Scheduler, SchedulerDeps}; use slog::Logger; -use std::{ - fs, io, - path::{Path, PathBuf}, - sync::mpsc as std_mpsc, -}; +use std::{path::PathBuf, sync::mpsc as std_mpsc}; use tokio::sync::mpsc as tokio_mpsc; +use tokio::task::JoinSet; #[derive(Debug)] -pub struct PassThroughDeps; +pub struct PassThroughDeps { + manifest_reader_sender: tokio_mpsc::UnboundedSender<(Sha256Digest, JobId)>, +} + +impl PassThroughDeps { + fn new(manifest_reader_sender: tokio_mpsc::UnboundedSender<(Sha256Digest, JobId)>) -> Self { + Self { + manifest_reader_sender, + } + } +} /// The production implementation of [SchedulerDeps]. This implementation just hands the /// message to the provided sender. @@ -29,8 +42,6 @@ impl SchedulerDeps for PassThroughDeps { type WorkerSender = tokio_mpsc::UnboundedSender; type MonitorSender = tokio_mpsc::UnboundedSender; type WorkerArtifactFetcherSender = std_mpsc::Sender>; - type ManifestError = io::Error; - type ManifestIterator = ManifestReader; fn send_message_to_client(&mut self, sender: &mut Self::ClientSender, message: BrokerToClient) { sender.send(message).ok(); @@ -56,8 +67,64 @@ impl SchedulerDeps for PassThroughDeps { sender.send(message).ok(); } - fn read_manifest(&mut self, path: &Path) -> io::Result> { - ManifestReader::new(fs::File::open(path)?) + fn read_manifest(&mut self, manifest_digest: Sha256Digest, job_id: JobId) { + self.manifest_reader_sender + .send((manifest_digest, job_id)) + .ok(); + } +} + +#[derive(Debug)] +struct CacheManifestReader { + tasks: JoinSet<()>, + receiver: tokio_mpsc::UnboundedReceiver<(Sha256Digest, JobId)>, + sender: SchedulerSender, +} + +async fn read_manifest_from_path( + sender: SchedulerSender, + manifest_path: PathBuf, + job_id: JobId, +) -> anyhow::Result<()> { + let fs = async_fs::Fs::new(); + let mut reader = AsyncManifestReader::new(fs.open_file(manifest_path).await?).await?; + while let Some(entry) = reader.next().await? { + if let ManifestEntryData::File(ManifestFileData::Digest(digest)) = entry.data { + sender.send(Message::GotManifestEntry(digest, job_id)).ok(); + } + } + Ok(()) +} + +impl CacheManifestReader { + fn new( + receiver: tokio_mpsc::UnboundedReceiver<(Sha256Digest, JobId)>, + sender: SchedulerSender, + ) -> Self { + Self { + tasks: JoinSet::new(), + receiver, + sender, + } + } + + fn kick_off_manifest_readings(&mut self, cache: &TaskCache) { + while let Ok((manifest_digest, job_id)) = self.receiver.try_recv() { + let sender = self.sender.clone(); + let manifest_path = cache + .cache_path(scheduler::BrokerKey::ref_cast(&manifest_digest)) + .into_path_buf(); + self.tasks.spawn(async move { + let result = read_manifest_from_path(sender.clone(), manifest_path, job_id).await; + sender + .send(Message::FinishedReadingManifest( + manifest_digest, + job_id, + result, + )) + .ok(); + }); + } } } @@ -69,14 +136,15 @@ pub type SchedulerMessage = Message; /// This type is used often enough to warrant an alias. pub type SchedulerSender = tokio_mpsc::UnboundedSender; +type TaskCache = Cache; + pub struct SchedulerTask { - scheduler: Scheduler< - Cache, - PassThroughDeps, - >, + scheduler: Scheduler, sender: SchedulerSender, receiver: tokio_mpsc::UnboundedReceiver, temp_file_factory: TempFileFactory, + manifest_reader: CacheManifestReader, + deps: PassThroughDeps, } impl SchedulerTask { @@ -84,11 +152,18 @@ impl SchedulerTask { let (sender, receiver) = tokio_mpsc::unbounded_channel(); let (cache, temp_file_factory) = Cache::new(cache::fs::std::Fs, cache_root, cache_size, log, true).unwrap(); + + let (manifest_reader_sender, manifest_reader_receiver) = tokio_mpsc::unbounded_channel(); + let manifest_reader = CacheManifestReader::new(manifest_reader_receiver, sender.clone()); + let deps = PassThroughDeps::new(manifest_reader_sender); + SchedulerTask { scheduler: Scheduler::new(cache), sender, receiver, temp_file_factory, + manifest_reader, + deps, } } @@ -113,7 +188,9 @@ impl SchedulerTask { /// give us a way to return an error, for precisely this reason. pub async fn run(mut self) { sync::channel_reader(self.receiver, |msg| { - self.scheduler.receive_message(&mut PassThroughDeps, msg) + self.scheduler.receive_message(&mut self.deps, msg); + self.manifest_reader + .kick_off_manifest_readings(&self.scheduler.cache); }) .await .unwrap(); diff --git a/crates/maelstrom-broker/src/scheduler_task/scheduler.rs b/crates/maelstrom-broker/src/scheduler_task/scheduler.rs index 2a20d21f..5647ecbe 100644 --- a/crates/maelstrom-broker/src/scheduler_task/scheduler.rs +++ b/crates/maelstrom-broker/src/scheduler_task/scheduler.rs @@ -1,9 +1,7 @@ //! Central processing module for the broker. Receives and sends messages to and from clients and //! workers. -use anyhow::Result; use maelstrom_base::{ - manifest::{ManifestEntry, ManifestEntryData, ManifestFileData}, proto::{ BrokerToClient, BrokerToMonitor, BrokerToWorker, ClientToBroker, MonitorToBroker, WorkerToBroker, @@ -28,9 +26,8 @@ use ref_cast::RefCast; use std::{ cmp::Ordering, collections::{BinaryHeap, HashMap, HashSet}, - error, fmt::{self, Debug, Formatter}, - path::{Path, PathBuf}, + path::PathBuf, time::Duration, }; @@ -50,8 +47,6 @@ pub trait SchedulerDeps { type WorkerSender; type MonitorSender; type WorkerArtifactFetcherSender; - type ManifestError: error::Error + Send + Sync + 'static; - type ManifestIterator: Iterator>; fn send_message_to_client(&mut self, sender: &mut Self::ClientSender, message: BrokerToClient); fn send_message_to_worker(&mut self, sender: &mut Self::WorkerSender, message: BrokerToWorker); fn send_message_to_monitor( @@ -64,8 +59,7 @@ pub trait SchedulerDeps { sender: &mut Self::WorkerArtifactFetcherSender, message: Option<(PathBuf, u64)>, ); - fn read_manifest(&mut self, path: &Path) - -> Result; + fn read_manifest(&mut self, manifest_digest: Sha256Digest, job_id: JobId); } #[derive(Clone, Debug, Eq, Hash, PartialEq, RefCast)] @@ -122,9 +116,6 @@ pub trait SchedulerCache { /// See [`Cache::get_artifact_for_worker`]. fn get_artifact_for_worker(&mut self, digest: &Sha256Digest) -> Option<(PathBuf, u64)>; - - /// See [`Cache::cache_path`]. - fn cache_path(&self, digest: &Sha256Digest) -> PathBuf; } impl SchedulerCache for Cache { @@ -153,10 +144,6 @@ impl SchedulerCache for Cache { self.try_increment_ref_count(BrokerKey::ref_cast(digest)) .map(|size| (cache_path, size)) } - - fn cache_path(&self, digest: &Sha256Digest) -> PathBuf { - self.cache_path(BrokerKey::ref_cast(digest)).into_path_buf() - } } /// The incoming messages, or events, for [`Scheduler`]. @@ -206,6 +193,14 @@ pub enum Message { /// The stats heartbeat task has decided it's time to take another statistics sample. StatisticsHeartbeat, + + /// Read a manifest and found the given digest as a dependency. The job which is responsible + /// for the reading is included. + GotManifestEntry(Sha256Digest, JobId), + + /// Finished reading the manifest either due to reaching EOF or an error and no more dependent + /// digests messages will be sent. + FinishedReadingManifest(Sha256Digest, JobId, anyhow::Result<()>), } impl Debug for Message { @@ -252,6 +247,17 @@ impl Debug for Message f.debug_tuple("StatisticsHeartbeat").finish(), + Message::GotManifestEntry(entry_digest, job_id) => f + .debug_tuple("GotManifestEntry") + .field(entry_digest) + .field(job_id) + .finish(), + Message::FinishedReadingManifest(manifest_digest, job_id, result) => f + .debug_tuple("FinishedReadingManifest") + .field(manifest_digest) + .field(job_id) + .field(result) + .finish(), } } } @@ -301,6 +307,12 @@ impl Scheduler { } Message::DecrementRefcount(digest) => self.receive_decrement_refcount(digest), Message::StatisticsHeartbeat => self.receive_statistics_heartbeat(), + Message::GotManifestEntry(entry_digest, job_id) => { + self.receive_manifest_entry(deps, entry_digest, job_id) + } + Message::FinishedReadingManifest(manifest_digest, job_id, result) => { + self.receive_finished_reading_manifest(deps, manifest_digest, job_id, result) + } } } } @@ -340,6 +352,7 @@ struct Job { spec: JobSpec, acquired_artifacts: HashSet, missing_artifacts: HashMap, + manifests_being_read: HashSet, } impl Job { @@ -348,8 +361,13 @@ impl Job { spec, acquired_artifacts: Default::default(), missing_artifacts: Default::default(), + manifests_being_read: Default::default(), } } + + fn have_all_artifacts(&self) -> bool { + self.missing_artifacts.is_empty() && self.manifests_being_read.is_empty() + } } struct Client { @@ -451,7 +469,7 @@ impl Ord for QueuedJob { } pub struct Scheduler { - cache: CacheT, + pub(crate) cache: CacheT, clients: ClientMap, workers: WorkerMap, monitors: HashMap, @@ -542,8 +560,10 @@ impl Scheduler { .insert(digest.clone()) .assert_is_true(); if is_manifest.is_manifest() { - self.ensure_manifest_artifacts_for_job(deps, jid, digest) - .unwrap(); + deps.read_manifest(digest.clone(), jid); + job.manifests_being_read + .insert(digest.clone()) + .assert_is_true(); } } GetArtifact::Wait => { @@ -584,8 +604,7 @@ impl Scheduler { let client = self.clients.0.get_mut(&jid.cid).unwrap(); let job = client.jobs.get(&jid.cjid).unwrap(); - let have_all_artifacts = job.missing_artifacts.is_empty(); - if have_all_artifacts { + if job.have_all_artifacts() { self.queued_jobs .push(QueuedJob::new(jid, priority, estimated_duration)); self.possibly_start_jobs(deps, HashSet::from_iter([jid])); @@ -713,21 +732,6 @@ impl Scheduler { deps.send_message_to_monitor(self.monitors.get_mut(&mid).unwrap(), resp); } - fn ensure_manifest_artifacts_for_job( - &mut self, - deps: &mut DepsT, - jid: JobId, - digest: Sha256Digest, - ) -> Result<()> { - for entry in deps.read_manifest(&self.cache.cache_path(&digest))? { - let entry = entry?; - if let ManifestEntryData::File(ManifestFileData::Digest(digest)) = entry.data { - self.ensure_artifact_for_job(deps, digest, jid, IsManifest::NotManifest); - } - } - Ok(()) - } - fn receive_got_artifact( &mut self, deps: &mut DepsT, @@ -744,12 +748,14 @@ impl Scheduler { let is_manifest = job.missing_artifacts.remove(&digest.clone()).unwrap(); if is_manifest.is_manifest() { - self.ensure_manifest_artifacts_for_job(deps, jid, digest.clone()) - .unwrap(); + deps.read_manifest(digest.clone(), jid); + job.manifests_being_read + .insert(digest.clone()) + .assert_is_true(); } let job = self.clients.job_from_jid(jid); - if job.missing_artifacts.is_empty() { + if job.have_all_artifacts() { self.queued_jobs.push(QueuedJob::new( jid, job.spec.priority, @@ -784,7 +790,7 @@ impl Scheduler { let mut counts = JobStateCounts::default(); counts[JobState::WaitingForArtifacts] = jobs .values() - .filter(|job| !job.missing_artifacts.is_empty()) + .filter(|job| !job.have_all_artifacts()) .count() as u64; counts[JobState::Pending] = self @@ -817,6 +823,43 @@ impl Scheduler { }; self.job_statistics.insert(sample); } + + fn receive_manifest_entry( + &mut self, + deps: &mut DepsT, + entry_digest: Sha256Digest, + job_id: JobId, + ) { + self.ensure_artifact_for_job(deps, entry_digest, job_id, IsManifest::NotManifest); + } + + fn receive_finished_reading_manifest( + &mut self, + deps: &mut DepsT, + manifest_digest: Sha256Digest, + job_id: JobId, + result: anyhow::Result<()>, + ) { + // It would be better to not crash... + result.expect("failed reading a manifest"); + + let client = self.clients.0.get_mut(&job_id.cid).unwrap(); + let job = client.jobs.get_mut(&job_id.cjid).unwrap(); + job.manifests_being_read + .remove(&manifest_digest) + .assert_is_true(); + + let mut just_enqueued = HashSet::default(); + if job.have_all_artifacts() { + self.queued_jobs.push(QueuedJob::new( + job_id, + job.spec.priority, + job.spec.estimated_duration, + )); + just_enqueued.insert(job_id); + } + self.possibly_start_jobs(deps, just_enqueued); + } } /* _ _ @@ -832,15 +875,11 @@ mod tests { use super::{Message::*, *}; use enum_map::enum_map; use itertools::Itertools; - use maelstrom_base::{ - manifest::{ManifestEntry, ManifestEntryMetadata, Mode, UnixTimestamp}, - proto::BrokerToWorker::{self, *}, - }; + use maelstrom_base::proto::BrokerToWorker::{self, *}; use maelstrom_test::*; use maelstrom_util::cache::fs::test; use maplit::hashmap; - use std::{cell::RefCell, error, rc::Rc, str::FromStr as _}; - use strum::Display; + use std::{cell::RefCell, rc::Rc}; #[derive(Clone, Debug, PartialEq)] enum TestMessage { @@ -853,6 +892,7 @@ mod tests { CacheDecrementRefcount(Sha256Digest), CacheClientDisconnected(ClientId), CacheGetArtifactForWorker(Sha256Digest), + ReadManifest(Sha256Digest, JobId), } use TestMessage::*; @@ -869,7 +909,6 @@ mod tests { got_artifact_returns: HashMap>>, #[allow(clippy::type_complexity)] get_artifact_for_worker_returns: HashMap>>, - read_manifest_returns: HashMap>, } impl SchedulerCache for Rc> { @@ -919,24 +958,13 @@ mod tests { .unwrap() .remove(0) } - - fn cache_path(&self, digest: &Sha256Digest) -> PathBuf { - Path::new("/").join(digest.to_string()) - } } - #[derive(Debug, Display)] - enum NoError {} - - impl error::Error for NoError {} - impl SchedulerDeps for Rc> { type ClientSender = TestClientSender; type WorkerSender = TestWorkerSender; type MonitorSender = TestMonitorSender; type WorkerArtifactFetcherSender = TestWorkerArtifactFetcherSender; - type ManifestError = NoError; - type ManifestIterator = > as IntoIterator>::IntoIter; fn send_message_to_client( &mut self, @@ -974,18 +1002,10 @@ mod tests { .push(ToWorkerArtifactFetcher(sender.0, message)); } - fn read_manifest(&mut self, path: &Path) -> Result { - let digest = - Sha256Digest::from_str(path.file_name().unwrap().to_str().unwrap()).unwrap(); - Ok(self - .borrow_mut() - .read_manifest_returns - .get(&digest) - .unwrap() - .iter() - .map(|e| Ok(e.clone())) - .collect_vec() - .into_iter()) + fn read_manifest(&mut self, manifest_digest: Sha256Digest, job_id: JobId) { + self.borrow_mut() + .messages + .push(ReadManifest(manifest_digest, job_id)); } } @@ -1006,11 +1026,10 @@ mod tests { impl Fixture { #[allow(clippy::type_complexity)] - fn new( + fn new( get_artifact_returns: [((JobId, Sha256Digest), Vec); L], got_artifact_returns: [(Sha256Digest, Vec>); M], get_artifact_for_worker_returns: [(Sha256Digest, Vec>); N], - read_manifest_returns: [(Sha256Digest, Vec); O], ) -> Self { let result = Self::default(); result.test_state.borrow_mut().get_artifact_returns = @@ -1021,8 +1040,6 @@ mod tests { .test_state .borrow_mut() .get_artifact_for_worker_returns = HashMap::from(get_artifact_for_worker_returns); - result.test_state.borrow_mut().read_manifest_returns = - HashMap::from(read_manifest_returns); result } @@ -1085,7 +1102,7 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![1], spec![1, Tar])) => { @@ -1158,7 +1175,7 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; WorkerConnected(wid![1], 2, worker_sender![1]) => {}; @@ -1192,7 +1209,7 @@ mod tests { ((jid![1, 8], digest![8]), vec![GetArtifact::Success]), ((jid![1, 9], digest![9]), vec![GetArtifact::Success]), ((jid![1, 10], digest![10]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 2, worker_sender![1]) => {}; WorkerConnected(wid![2], 2, worker_sender![2]) => {}; @@ -1279,7 +1296,7 @@ mod tests { ((jid![1, 4], digest![4]), vec![GetArtifact::Success]), ((jid![1, 5], digest![5]), vec![GetArtifact::Success]), ((jid![1, 6], digest![6]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; WorkerConnected(wid![2], 1, worker_sender![2]) => {}; @@ -1344,7 +1361,7 @@ mod tests { ((jid![1, 4], digest![4]), vec![GetArtifact::Success]), ((jid![1, 5], digest![5]), vec![GetArtifact::Success]), ((jid![1, 6], digest![6]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; @@ -1395,7 +1412,7 @@ mod tests { ((jid![1, 3], digest![3]), vec![GetArtifact::Success]), ((jid![1, 4], digest![4]), vec![GetArtifact::Success]), ((jid![1, 5], digest![5]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; WorkerConnected(wid![2], 1, worker_sender![2]) => {}; @@ -1447,7 +1464,7 @@ mod tests { ((jid![1, 2], digest![2]), vec![GetArtifact::Success]), ((jid![1, 3], digest![3]), vec![GetArtifact::Success]), ((jid![1, 4], digest![4]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1501,7 +1518,7 @@ mod tests { ((jid![1, 2], digest![2]), vec![GetArtifact::Success]), ((jid![1, 3], digest![3]), vec![GetArtifact::Success]), ((jid![1, 4], digest![4]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1542,7 +1559,7 @@ mod tests { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), ((jid![1, 2], digest![2]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; @@ -1577,7 +1594,7 @@ mod tests { { Fixture::new([ ((jid!(1, 1), digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1601,7 +1618,7 @@ mod tests { ((jid!(1, 1), digest![1]), vec![GetArtifact::Success]), ((jid!(2, 1), digest![2]), vec![GetArtifact::Success]), ((jid!(1, 2), digest![3]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; WorkerConnected(wid![2], 1, worker_sender![2]) => {}; @@ -1638,7 +1655,7 @@ mod tests { ((jid!(1, 2), digest![2]), vec![GetArtifact::Success]), ((jid!(1, 3), digest![3]), vec![GetArtifact::Success]), ((jid!(2, 1), digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1685,7 +1702,7 @@ mod tests { ((jid!(2, 2), digest![2]), vec![GetArtifact::Success]), ((jid!(2, 3), digest![3]), vec![GetArtifact::Success]), ((jid!(2, 4), digest![4]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; WorkerConnected(wid![2], 1, worker_sender![2]) => {}; @@ -1744,7 +1761,7 @@ mod tests { ((jid![1, 2], digest![42]), vec![GetArtifact::Success]), ((jid![1, 2], digest![43]), vec![GetArtifact::Wait]), ((jid![1, 2], digest![44]), vec![GetArtifact::Get]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1773,7 +1790,7 @@ mod tests { ((jid![1, 2], digest![42]), vec![GetArtifact::Success]), ((jid![1, 2], digest![43]), vec![GetArtifact::Success]), ((jid![1, 2], digest![44]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1806,7 +1823,7 @@ mod tests { ], [ (digest![43], vec![vec![jid![1, 2]]]), (digest![44], vec![vec![jid![1, 2]]]), - ], [], []) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1844,7 +1861,7 @@ mod tests { { Fixture::new([ ((jid![1, 2], digest![42]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1870,7 +1887,7 @@ mod tests { ((jid![1, 2], digest![42]), vec![GetArtifact::Get]), ], [ (digest![42], vec![vec![jid![1, 2]]]), - ], [], []) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1904,17 +1921,7 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ManifestEntry { - path: "foobar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1927,10 +1934,16 @@ mod tests { GotArtifact(digest![42], "/z/tmp/foo".into()) => { CacheGotArtifact(digest![42], "/z/tmp/foo".into()), + ReadManifest(digest![42], jid![1, 2]), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), ToClient(cid![1], BrokerToClient::TransferArtifact(digest![43])), }; + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => {}; + GotArtifact(digest![43], "/z/tmp/bar".into()) => { CacheGotArtifact(digest![43], "/z/tmp/bar".into()), ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), @@ -1953,28 +1966,7 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ - ManifestEntry { - path: "foo.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }, - ManifestEntry { - path: "bar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - } - ]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; @@ -1987,10 +1979,18 @@ mod tests { GotArtifact(digest![42], "/z/tmp/foo".into()) => { CacheGotArtifact(digest![42], "/z/tmp/foo".into()), + ReadManifest(digest![42], jid![1, 2]), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), ToClient(cid![1], BrokerToClient::TransferArtifact(digest![43])), }; + GotManifestEntry(digest![43], jid![1, 2]) => {}; + + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => {}; + GotArtifact(digest![43], "/z/tmp/bar".into()) => { CacheGotArtifact(digest![43], "/z/tmp/bar".into()), ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), @@ -2013,28 +2013,27 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ManifestEntry { - path: "foobar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![2], spec![1, [(42, Manifest)]])) => { CacheGetArtifact(jid![1, 2], digest![42]), + ReadManifest(digest![42], jid![1, 2]), + ToClient( + cid![1], + BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers) + ), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), ToClient(cid![1], BrokerToClient::TransferArtifact(digest![43])), - ToClient(cid![1], BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers)), }; + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => {}; + GotArtifact(digest![43], "/z/tmp/bar".into()) => { CacheGotArtifact(digest![43], "/z/tmp/bar".into()), ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), @@ -2057,28 +2056,27 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ManifestEntry { - path: "foobar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![2], spec![1, [(42, Manifest)]])) => { CacheGetArtifact(jid![1, 2], digest![42]), + ReadManifest(digest![42], jid![1, 2]), + ToClient( + cid![1], + BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers) + ), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), ToClient(cid![1], BrokerToClient::TransferArtifact(digest![43])), - ToClient(cid![1], BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers)), }; + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => {}; + GotArtifact(digest![43], "/z/tmp/bar".into()) => { CacheGotArtifact(digest![43], "/z/tmp/bar".into()), ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), @@ -2101,24 +2099,25 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ManifestEntry { - path: "foobar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![2], spec![1, [(42, Manifest)]])) => { CacheGetArtifact(jid![1, 2], digest![42]), + ReadManifest(digest![42], jid![1, 2]), + ToClient( + cid![1], + BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers) + ), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), + }; + + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => { ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), }; @@ -2139,35 +2138,25 @@ mod tests { ], [ (digest![42], vec![vec![jid![1, 2]]]), (digest![43], vec![vec![jid![1, 2]]]), - ], [], [ - (digest![42], vec![ - ManifestEntry { - path: "foo.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - }, - ManifestEntry { - path: "bar.txt".into(), - metadata: ManifestEntryMetadata { - size: 11, - mode: Mode(0o0555), - mtime: UnixTimestamp(1705538554), - }, - data: ManifestEntryData::File(ManifestFileData::Digest(digest![43])), - } - ]) - ]) + ], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; ClientConnected(cid![1], client_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![2], spec![1, [(42, Manifest)]])) => { CacheGetArtifact(jid![1, 2], digest![42]), + ReadManifest(digest![42], jid![1, 2]), + ToClient( + cid![1], + BrokerToClient::JobStatusUpdate(cjid![2], JobBrokerStatus::WaitingForLayers) + ), + }; + + GotManifestEntry(digest![43], jid![1, 2]) => { CacheGetArtifact(jid![1, 2], digest![43]), + }; + + FinishedReadingManifest(digest![42], jid![1, 2], Ok(())) => { ToWorker(wid![1], EnqueueJob(jid![1, 2], spec![1, [(42, Manifest)]])), }; @@ -2187,7 +2176,7 @@ mod tests { digest![42], vec![Some(("/a/good/path".into(), 42))], ), - ], []) + ]) }, GetArtifactForWorker(digest![42], worker_artifact_fetcher_sender![1]) => { CacheGetArtifactForWorker(digest![42]), @@ -2203,7 +2192,7 @@ mod tests { digest![42], vec![Some(("/a/good/path".into(), 42))] ), - ], []) + ]) }, GetArtifactForWorker(digest![42], worker_artifact_fetcher_sender![1]) => { CacheGetArtifactForWorker(digest![42]), @@ -2223,14 +2212,17 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![42]), vec![GetArtifact::Wait]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; WorkerConnected(wid![1], 2, worker_sender![1]) => {}; MonitorConnected(mid![1], monitor_sender![1]) => {}; FromClient(cid![1], ClientToBroker::JobRequest(cjid![1], spec![1, [(42, Tar)]])) => { CacheGetArtifact(jid![1, 1], digest![42]), - ToClient(cid![1], BrokerToClient::JobStatusUpdate(cjid![1], JobBrokerStatus::WaitingForLayers)), + ToClient( + cid![1], + BrokerToClient::JobStatusUpdate(cjid![1], JobBrokerStatus::WaitingForLayers) + ), }; StatisticsHeartbeat => {}; FromMonitor(mid![1], MonitorToBroker::StatisticsRequest) => { @@ -2257,7 +2249,7 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; MonitorConnected(mid![1], monitor_sender![1]) => {}; @@ -2288,7 +2280,7 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; WorkerConnected(wid![1], 2, worker_sender![1]) => {}; @@ -2322,7 +2314,7 @@ mod tests { { Fixture::new([ ((jid![1, 1], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; WorkerConnected(wid![1], 2, worker_sender![1]) => {}; @@ -2360,7 +2352,7 @@ mod tests { { Fixture::new([ ((jid![1, 3], digest![1]), vec![GetArtifact::Success]), - ], [], [], []) + ], [], []) }, ClientConnected(cid![1], client_sender![1]) => {}; WorkerConnected(wid![2], 1, worker_sender![2]) => {}; @@ -2409,7 +2401,7 @@ mod tests { script_test! { drop_unknown_job_status_update, { - Fixture::new([], [], [], []) + Fixture::new([], [], []) }, WorkerConnected(wid![1], 1, worker_sender![1]) => {}; FromWorker(