Skip to content

Commit

Permalink
Broker reads manifests in an async way
Browse files Browse the repository at this point in the history
It kicks off a job to read a manifest which sends the read digests back
to the scheduler via its message queue.

This is better than previously because it doesn't do IO in the
scheduler.

Local file IO isn't "blocking" in the traditional sense, but it can take
a while sometimes. Also, if we want our manifests to live on some
network backed cache, we will have to do real blocking in this path
anyway.
  • Loading branch information
bobbobbio committed Nov 24, 2024
1 parent 6be0b59 commit a496bed
Show file tree
Hide file tree
Showing 2 changed files with 276 additions and 207 deletions.
111 changes: 94 additions & 17 deletions crates/maelstrom-broker/src/scheduler_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,38 @@ mod scheduler;

pub use maelstrom_util::cache::CacheDir;

use maelstrom_base::proto::{BrokerToClient, BrokerToMonitor, BrokerToWorker};
use maelstrom_base::{
manifest::{ManifestEntryData, ManifestFileData},
proto::{BrokerToClient, BrokerToMonitor, BrokerToWorker},
JobId, Sha256Digest,
};
use maelstrom_util::{
async_fs,
cache::{self, Cache, TempFileFactory},
config::common::CacheSize,
manifest::ManifestReader,
manifest::AsyncManifestReader,
root::RootBuf,
sync,
};
use ref_cast::RefCast;
use scheduler::{Message, Scheduler, SchedulerDeps};
use slog::Logger;
use std::{
fs, io,
path::{Path, PathBuf},
sync::mpsc as std_mpsc,
};
use std::{path::PathBuf, sync::mpsc as std_mpsc};
use tokio::sync::mpsc as tokio_mpsc;
use tokio::task::JoinSet;

#[derive(Debug)]
pub struct PassThroughDeps;
pub struct PassThroughDeps {
manifest_reader_sender: tokio_mpsc::UnboundedSender<(Sha256Digest, JobId)>,
}

impl PassThroughDeps {
fn new(manifest_reader_sender: tokio_mpsc::UnboundedSender<(Sha256Digest, JobId)>) -> Self {
Self {
manifest_reader_sender,
}
}
}

/// The production implementation of [SchedulerDeps]. This implementation just hands the
/// message to the provided sender.
Expand All @@ -29,8 +42,6 @@ impl SchedulerDeps for PassThroughDeps {
type WorkerSender = tokio_mpsc::UnboundedSender<BrokerToWorker>;
type MonitorSender = tokio_mpsc::UnboundedSender<BrokerToMonitor>;
type WorkerArtifactFetcherSender = std_mpsc::Sender<Option<(PathBuf, u64)>>;
type ManifestError = io::Error;
type ManifestIterator = ManifestReader<fs::File>;

fn send_message_to_client(&mut self, sender: &mut Self::ClientSender, message: BrokerToClient) {
sender.send(message).ok();
Expand All @@ -56,8 +67,64 @@ impl SchedulerDeps for PassThroughDeps {
sender.send(message).ok();
}

fn read_manifest(&mut self, path: &Path) -> io::Result<ManifestReader<fs::File>> {
ManifestReader::new(fs::File::open(path)?)
fn read_manifest(&mut self, manifest_digest: Sha256Digest, job_id: JobId) {
self.manifest_reader_sender
.send((manifest_digest, job_id))
.ok();
}
}

#[derive(Debug)]
struct CacheManifestReader {
tasks: JoinSet<()>,
receiver: tokio_mpsc::UnboundedReceiver<(Sha256Digest, JobId)>,
sender: SchedulerSender,
}

async fn read_manifest_from_path(
sender: SchedulerSender,
manifest_path: PathBuf,
job_id: JobId,
) -> anyhow::Result<()> {
let fs = async_fs::Fs::new();
let mut reader = AsyncManifestReader::new(fs.open_file(manifest_path).await?).await?;
while let Some(entry) = reader.next().await? {
if let ManifestEntryData::File(ManifestFileData::Digest(digest)) = entry.data {
sender.send(Message::GotManifestEntry(digest, job_id)).ok();
}
}
Ok(())
}

impl CacheManifestReader {
fn new(
receiver: tokio_mpsc::UnboundedReceiver<(Sha256Digest, JobId)>,
sender: SchedulerSender,
) -> Self {
Self {
tasks: JoinSet::new(),
receiver,
sender,
}
}

fn kick_off_manifest_readings(&mut self, cache: &TaskCache) {
while let Ok((manifest_digest, job_id)) = self.receiver.try_recv() {
let sender = self.sender.clone();
let manifest_path = cache
.cache_path(scheduler::BrokerKey::ref_cast(&manifest_digest))
.into_path_buf();
self.tasks.spawn(async move {
let result = read_manifest_from_path(sender.clone(), manifest_path, job_id).await;
sender
.send(Message::FinishedReadingManifest(
manifest_digest,
job_id,
result,
))
.ok();
});
}
}
}

Expand All @@ -69,26 +136,34 @@ pub type SchedulerMessage = Message<PassThroughDeps, cache::fs::std::TempFile>;
/// This type is used often enough to warrant an alias.
pub type SchedulerSender = tokio_mpsc::UnboundedSender<SchedulerMessage>;

type TaskCache = Cache<cache::fs::std::Fs, scheduler::BrokerKey, scheduler::BrokerGetStrategy>;

pub struct SchedulerTask {
scheduler: Scheduler<
Cache<cache::fs::std::Fs, scheduler::BrokerKey, scheduler::BrokerGetStrategy>,
PassThroughDeps,
>,
scheduler: Scheduler<TaskCache, PassThroughDeps>,
sender: SchedulerSender,
receiver: tokio_mpsc::UnboundedReceiver<SchedulerMessage>,
temp_file_factory: TempFileFactory<cache::fs::std::Fs>,
manifest_reader: CacheManifestReader,
deps: PassThroughDeps,
}

impl SchedulerTask {
pub fn new(cache_root: RootBuf<CacheDir>, cache_size: CacheSize, log: Logger) -> Self {
let (sender, receiver) = tokio_mpsc::unbounded_channel();
let (cache, temp_file_factory) =
Cache::new(cache::fs::std::Fs, cache_root, cache_size, log, true).unwrap();

let (manifest_reader_sender, manifest_reader_receiver) = tokio_mpsc::unbounded_channel();
let manifest_reader = CacheManifestReader::new(manifest_reader_receiver, sender.clone());
let deps = PassThroughDeps::new(manifest_reader_sender);

SchedulerTask {
scheduler: Scheduler::new(cache),
sender,
receiver,
temp_file_factory,
manifest_reader,
deps,
}
}

Expand All @@ -113,7 +188,9 @@ impl SchedulerTask {
/// give us a way to return an error, for precisely this reason.
pub async fn run(mut self) {
sync::channel_reader(self.receiver, |msg| {
self.scheduler.receive_message(&mut PassThroughDeps, msg)
self.scheduler.receive_message(&mut self.deps, msg);
self.manifest_reader
.kick_off_manifest_readings(&self.scheduler.cache);
})
.await
.unwrap();
Expand Down
Loading

0 comments on commit a496bed

Please sign in to comment.