Skip to content

Commit

Permalink
Inline start_job_inner.
Browse files Browse the repository at this point in the history
  • Loading branch information
nfachan committed Oct 23, 2024
1 parent 976bf9f commit 0bc0e49
Showing 1 changed file with 29 additions and 38 deletions.
67 changes: 29 additions & 38 deletions crates/maelstrom-worker/src/dispatcher_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use maelstrom_util::{
config::common::InlineLimit,
fs::Fs,
root::RootBuf,
sync::{self, EventReceiver, EventSender},
sync::{self, EventSender},
time::SystemMonotonicClock,
};
use slog::{debug, o, Logger};
Expand Down Expand Up @@ -73,22 +73,35 @@ impl DispatcherAdapter {
temp_file_factory,
})
}
}

fn start_job_inner(
&mut self,
jid: JobId,
spec: JobSpec,
layer_fs_path: PathBuf,
kill_event_receiver: EventReceiver,
) -> Result<()> {
debug!(self.log, "job starting"; "spec" => ?spec);
let log = self.log.new(o!(
"jid" => format!("{jid:?}"),
"program" => format!("{:?}", spec.program),
"args" => format!("{:?}", spec.arguments)
));

let layer_fs = LayerFs::from_path(&layer_fs_path, self.blob_dir.as_root())?;
pub struct TimerHandle(JoinHandle<()>);

impl Drop for TimerHandle {
fn drop(&mut self) {
self.0.abort();
}
}

impl Deps for DispatcherAdapter {
type JobHandle = EventSender;

fn start_job(&mut self, jid: JobId, spec: JobSpec, layer_fs_path: PathBuf) -> EventSender {
debug!(self.log, "starting job"; "jid" => ?jid, "spec" => ?spec);
let log = self.log.new(o!("jid" => format!("{jid:?}")));

let (kill_event_sender, kill_event_receiver) = sync::event();

let layer_fs = match LayerFs::from_path(&layer_fs_path, self.blob_dir.as_root()) {
Ok(layer_fs) => layer_fs,
Err(err) => {
let _ = self.dispatcher_sender.send(Message::JobCompleted(
jid,
Err(JobError::System(err.to_string())),
));
return kill_event_sender;
}
};
let layer_fs_cache = self.layer_fs_cache.clone();
let fuse_spawn = move |fd| {
tokio::spawn(async move {
Expand Down Expand Up @@ -119,29 +132,7 @@ impl DispatcherAdapter {
))
.ok()
});
Ok(())
}
}

pub struct TimerHandle(JoinHandle<()>);

impl Drop for TimerHandle {
fn drop(&mut self) {
self.0.abort();
}
}

impl Deps for DispatcherAdapter {
type JobHandle = EventSender;

fn start_job(&mut self, jid: JobId, spec: JobSpec, layer_fs_path: PathBuf) -> Self::JobHandle {
let (kill_event_sender, kill_event_receiver) = sync::event();
if let Err(e) = self.start_job_inner(jid, spec, layer_fs_path, kill_event_receiver) {
let _ = self.dispatcher_sender.send(Message::JobCompleted(
jid,
Err(JobError::System(e.to_string())),
));
}
kill_event_sender
}

Expand Down

0 comments on commit 0bc0e49

Please sign in to comment.