Skip to content

Commit

Permalink
Add some handling of artifacts in client test
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbobbio committed Feb 1, 2024
1 parent 075343d commit 5663524
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 11 deletions.
4 changes: 2 additions & 2 deletions crates/cargo-maelstrom/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ fn run_app(

// process job enqueuing
client_driver.process_client_messages();
b_conn.process(1);
b_conn.process(1, false /* fetch_layers */);
if test.desired_state == JobState::Complete {
client_driver.process_broker_msg(1);
}
Expand All @@ -294,7 +294,7 @@ fn run_app(
client_driver.process_client_messages();

// process job state request
b_conn.process(1);
b_conn.process(1, false /* fetch_layers */);
client_driver.process_broker_msg(1);

prog_driver.update(counts.try_recv().unwrap()).unwrap();
Expand Down
8 changes: 7 additions & 1 deletion crates/maelstrom-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,13 @@ impl Dispatcher {
}
}
DispatcherMessage::BrokerToClient(BrokerToClient::TransferArtifact(digest)) => {
let path = self.artifacts.get(&digest).unwrap().clone();
let path = self
.artifacts
.get(&digest)
.unwrap_or_else(|| {
panic!("got request for unknown artifact with digest {digest}")
})
.clone();
self.artifact_pusher
.send(ArtifactPushRequest { path, digest })?;
}
Expand Down
23 changes: 20 additions & 3 deletions crates/maelstrom-client/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use maelstrom_base::{ArtifactType, JobOutputResult, JobSpec, JobStatus, JobSucce
use maelstrom_client::Client;
use maelstrom_test::{
client_driver::TestClientDriver,
digest,
fake_broker::{FakeBroker, FakeBrokerJobAction, FakeBrokerState, JobSpecMatcher},
utf8_path_buf,
};
Expand All @@ -20,6 +19,8 @@ fn basic_add_job() {
fs.create_dir(&project_dir).unwrap();
let cache_dir = tmp_dir.path().join("cache");
fs.create_dir(&cache_dir).unwrap();
let artifact_dir = tmp_dir.path().join("artifacts");
fs.create_dir(&artifact_dir).unwrap();

let test_job_result = JobSuccess {
status: JobStatus::Exited(0),
Expand Down Expand Up @@ -47,11 +48,17 @@ fn basic_add_job() {
.unwrap();
let mut broker_conn = broker.accept();

let test_artifact = artifact_dir.join("test_artifact");
fs.write(&test_artifact, b"hello world").unwrap();

let digest = client.add_artifact(&test_artifact).unwrap();
client_driver.process_client_messages();

let spec = JobSpec {
program: utf8_path_buf!("foo"),
arguments: vec!["bar".into()],
environment: vec![],
layers: nonempty![(digest![1], ArtifactType::Tar)],
layers: nonempty![(digest.clone(), ArtifactType::Tar)],
devices: Default::default(),
mounts: vec![],
enable_loopback: false,
Expand All @@ -67,7 +74,17 @@ fn basic_add_job() {
);

client_driver.process_client_messages();
broker_conn.process(1);
broker_conn.process(1, true /* fetch_layers */);
client_driver.process_broker_msg(1);

client_driver.process_artifact(|| broker.receive_artifact(&artifact_dir));
assert_eq!(
fs.read_to_string(&artifact_dir.join(digest.to_string()))
.unwrap(),
"hello world"
);

broker_conn.process(1, true /* fetch_layers */);
client_driver.process_broker_msg(1);

let (_id, result) = recv.recv().unwrap();
Expand Down
10 changes: 10 additions & 0 deletions crates/maelstrom-test/src/client_driver.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Result;
use maelstrom_client::{ClientDeps, ClientDriver};
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Default, Clone)]
pub struct TestClientDriver {
Expand Down Expand Up @@ -33,4 +34,13 @@ impl TestClientDriver {
let deps = locked_deps.as_mut().unwrap();
while deps.dispatcher.try_process_one().is_ok() {}
}

pub fn process_artifact(&self, body: impl FnOnce()) {
let mut locked_deps = self.deps.lock().unwrap();
let deps = locked_deps.as_mut().unwrap();
thread::scope(|scope| {
deps.artifact_pusher.process_one(scope);
body()
});
}
}
65 changes: 60 additions & 5 deletions crates/maelstrom-test/src/fake_broker.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use assert_matches::assert_matches;
use maelstrom_base::{
proto::{BrokerToClient, ClientToBroker, Hello},
proto::{
ArtifactPusherToBroker, BrokerToArtifactPusher, BrokerToClient, ClientToBroker, Hello,
},
stats::JobStateCounts,
JobSpec, JobStringResult,
ClientJobId, JobSpec, JobStringResult,
};
use maelstrom_util::config::BrokerAddr;
use maelstrom_util::{config::BrokerAddr, ext::OptionExt as _, fs::Fs, io::FixedSizeReader};
use serde::{de::DeserializeOwned, Serialize};
use std::collections::HashMap;
use std::io::{self, Read as _, Write as _};
use std::net::{Ipv6Addr, SocketAddrV6, TcpListener, TcpStream};
use std::path::Path;

fn job_spec_matcher(spec: &JobSpec) -> JobSpecMatcher {
let binary = spec
Expand Down Expand Up @@ -51,6 +54,10 @@ impl MessageStream {
self.stream.read_exact(&mut buf).unwrap();
Ok(bincode::deserialize_from(&buf[..]).unwrap())
}

fn into_inner(self) -> TcpStream {
self.stream
}
}

pub struct FakeBroker {
Expand All @@ -63,6 +70,7 @@ pub struct FakeBroker {
pub struct FakeBrokerConnection {
messages: MessageStream,
state: FakeBrokerState,
pending_response: Option<(ClientJobId, FakeBrokerJobAction)>,
}

impl FakeBroker {
Expand Down Expand Up @@ -92,8 +100,26 @@ impl FakeBroker {
FakeBrokerConnection {
messages,
state: self.state.clone(),
pending_response: None,
}
}

pub fn receive_artifact(&mut self, digest_dir: &Path) {
let (stream, _) = self.listener.accept().unwrap();
let mut messages = MessageStream { stream };

let msg: Hello = messages.next().unwrap();
assert_matches!(msg, Hello::ArtifactPusher);

let ArtifactPusherToBroker(digest, size) = messages.next().unwrap();
let destination = digest_dir.join(digest.to_string());

let fs = Fs::new();
let mut stream = messages.into_inner();
let mut file = fs.create_file(destination).unwrap();
io::copy(&mut FixedSizeReader::new(&mut stream, size), &mut file).unwrap();
send_message(&stream, &BrokerToArtifactPusher(Ok(())));
}
}

fn send_message(mut stream: &TcpStream, msg: &impl Serialize) {
Expand All @@ -103,13 +129,42 @@ fn send_message(mut stream: &TcpStream, msg: &impl Serialize) {
}

impl FakeBrokerConnection {
pub fn process(&mut self, count: usize) {
fn fetch_layers(&self, spec: &JobSpec) {
for (digest, _type) in &spec.layers {
send_message(
&self.messages.stream,
&BrokerToClient::TransferArtifact(digest.clone()),
);
}
}

pub fn process(&mut self, count: usize, fetch_layers: bool) {
for _ in 0..count {
if let Some((id, response)) = self.pending_response.take() {
match response {
FakeBrokerJobAction::Respond(res) => {
send_message(&self.messages.stream, &BrokerToClient::JobResponse(id, res))
}
FakeBrokerJobAction::Ignore => (),
}
continue;
}

let msg = self.messages.next::<ClientToBroker>().unwrap();
match msg {
ClientToBroker::JobRequest(id, spec) => {
let job_spec_matcher = job_spec_matcher(&spec);
match self.state.job_responses.remove(&job_spec_matcher).unwrap() {
let response = self.state.job_responses.remove(&job_spec_matcher).unwrap();

if fetch_layers {
self.fetch_layers(&spec);
self.pending_response
.replace((id, response))
.assert_is_none();
continue;
}

match response {
FakeBrokerJobAction::Respond(res) => send_message(
&self.messages.stream,
&BrokerToClient::JobResponse(id, res),
Expand Down

0 comments on commit 5663524

Please sign in to comment.