Skip to content

Commit

Permalink
Extract out a start_dispatcher_task_common which is used in both paths.
Browse files Browse the repository at this point in the history
  • Loading branch information
nfachan committed Oct 28, 2024
1 parent 863bf65 commit 7e3d0e1
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 36 deletions.
31 changes: 25 additions & 6 deletions crates/maelstrom-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,31 @@ fn start_dispatcher_task(
config.slots,
);

Ok(task::spawn(dispatcher_main(
broker_socket_incoming_receiver,
dispatcher,
dispatcher_receiver,
log.clone(),
)))
start_dispatcher_task_common(dispatcher, log, move |dispatcher, log| {
dispatcher_main(
broker_socket_incoming_receiver,
dispatcher,
dispatcher_receiver,
log,
)
})
}

fn start_dispatcher_task_common<
DepsT: dispatcher::Deps,
ArtifactFetcherT,
BrokerSenderT,
CacheT,
MainFutureT: Future<Output = ()> + Send + 'static,
>(
dispatcher: dispatcher::Dispatcher<DepsT, ArtifactFetcherT, BrokerSenderT, CacheT>,
log: &Logger,
main: impl FnOnce(
dispatcher::Dispatcher<DepsT, ArtifactFetcherT, BrokerSenderT, CacheT>,
Logger,
) -> MainFutureT,
) -> Result<JoinHandle<()>> {
Ok(task::spawn(main(dispatcher, log.clone())))
}

async fn dispatcher_main(
Expand Down
72 changes: 42 additions & 30 deletions crates/maelstrom-worker/src/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use maelstrom_util::{
use slog::{debug, Logger};
use tokio::{
sync::mpsc::{self},
task::{self, JoinHandle},
task::JoinHandle,
};

pub struct Config {
Expand All @@ -39,7 +39,7 @@ pub fn start_task(
artifact_fetcher: impl ArtifactFetcher + Send + Sync + 'static,
broker_sender: impl BrokerSender + Send + Sync + 'static,
config: Config,
mut dispatcher_receiver: Receiver,
dispatcher_receiver: Receiver,
dispatcher_sender: Sender,
log: &Logger,
) -> Result<JoinHandle<()>> {
Expand All @@ -65,49 +65,61 @@ pub fn start_task(
)?;

// Create the actual local_worker.
let mut worker_dispatcher = Dispatcher::new(
let dispatcher = Dispatcher::new(
local_worker_dispatcher_adapter,
artifact_fetcher,
broker_sender,
local_worker_cache,
config.slots,
);

let handle_worker_message = |msg, worker: &mut Dispatcher<_, _, _, _>| -> Result<()> {
// Spawn a task for the local_worker.
crate::start_dispatcher_task_common(dispatcher, log, move |dispatcher, log| {
main(dispatcher, dispatcher_receiver, log)
})
}

async fn main<
ArtifactFetcherT: crate::dispatcher::ArtifactFetcher,
BrokerSenderT: crate::dispatcher::BrokerSender,
>(
mut dispatcher: Dispatcher<
DispatcherAdapter,
ArtifactFetcherT,
BrokerSenderT,
crate::types::Cache,
>,
mut dispatcher_receiver: Receiver,
log: Logger,
) {
let handle_worker_message = |msg, dispatcher: &mut Dispatcher<_, _, _, _>| -> Result<()> {
if let Message::Shutdown(error) = msg {
Err(error)
} else {
worker.receive_message(msg);
dispatcher.receive_message(msg);
Ok(())
}
};

// Spawn a task for the local_worker.
let log_clone = log.clone();
Ok(task::spawn(async move {
let shutdown_error = loop {
let msg = dispatcher_receiver.recv().await.expect("missing shutdown");
if let Err(err) = handle_worker_message(msg, &mut worker_dispatcher) {
break err;
}
};
let shutdown_error = loop {
let msg = dispatcher_receiver.recv().await.expect("missing shutdown");
if let Err(err) = handle_worker_message(msg, &mut dispatcher) {
break err;
}
};

debug!(
log_clone,
"shutting down local worker due to {shutdown_error}"
);
worker_dispatcher.receive_message(Message::Shutdown(shutdown_error));
debug!(
log_clone,
"canceling {} running jobs",
worker_dispatcher.num_jobs_executing()
);
debug!(log, "shutting down local worker due to {shutdown_error}");
dispatcher.receive_message(Message::Shutdown(shutdown_error));
debug!(
log,
"canceling {} running jobs",
dispatcher.num_jobs_executing()
);

while worker_dispatcher.num_jobs_executing() > 0 {
let msg = dispatcher_receiver.recv().await.expect("missing shutdown");
let _ = handle_worker_message(msg, &mut worker_dispatcher);
}
while dispatcher.num_jobs_executing() > 0 {
let msg = dispatcher_receiver.recv().await.expect("missing shutdown");
let _ = handle_worker_message(msg, &mut dispatcher);
}

debug!(log_clone, "local worker exiting");
}))
debug!(log, "local worker exiting");
}

0 comments on commit 7e3d0e1

Please sign in to comment.