Skip to content

Commit

Permalink
Broker handles getting manifest dependencies from client
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbobbio committed Jan 18, 2024
1 parent aeca66b commit 6a511ea
Show file tree
Hide file tree
Showing 6 changed files with 499 additions and 155 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/maelstrom-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ license.workspace = true
version.workspace = true

[dependencies]
bincode.workspace = true
camino.workspace = true
derive_more.workspace = true
enum-map.workspace = true
Expand Down
69 changes: 69 additions & 0 deletions crates/maelstrom-base/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
};
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::io;

/// The first message sent by a connector to the broker. It identifies what the connector is, and
/// provides any relevant information.
Expand Down Expand Up @@ -98,6 +99,74 @@ pub struct ManifestEntry {
pub data: ManifestEntryData,
}

pub struct ManifestReader<ReadT> {
r: ReadT,
stream_end: u64,
}

impl<ReadT: io::Read + io::Seek> ManifestReader<ReadT> {
pub fn new(mut r: ReadT) -> io::Result<Self> {
let stream_start = r.stream_position()?;
r.seek(io::SeekFrom::End(0))?;
let stream_end = r.stream_position()?;
r.seek(io::SeekFrom::Start(stream_start))?;

let version: ManifestVersion = bincode::deserialize_from(&mut r)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
if version != ManifestVersion::default() {
return Err(io::Error::new(io::ErrorKind::Other, "bad manifest version"));
}

Ok(Self { r, stream_end })
}

fn next_inner(&mut self) -> io::Result<Option<ManifestEntry>> {
if self.r.stream_position()? == self.stream_end {
return Ok(None);
}
Ok(Some(
bincode::deserialize_from(&mut self.r)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?,
))
}
}

impl<ReadT: io::Read + io::Seek> Iterator for ManifestReader<ReadT> {
type Item = io::Result<ManifestEntry>;

fn next(&mut self) -> Option<io::Result<ManifestEntry>> {
self.next_inner().transpose()
}
}

pub struct ManifestWriter<WriteT> {
w: WriteT,
}

impl<WriteT: io::Write> ManifestWriter<WriteT> {
pub fn new(mut w: WriteT) -> io::Result<Self> {
bincode::serialize_into(&mut w, &ManifestVersion::default())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(Self { w })
}

pub fn write_entry(&mut self, entry: &ManifestEntry) -> io::Result<()> {
bincode::serialize_into(&mut self.w, entry)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(())
}

pub fn write_entries<'entry>(
&mut self,
entries: impl IntoIterator<Item = &'entry ManifestEntry>,
) -> io::Result<()> {
for entry in entries {
self.write_entry(entry)?
}
Ok(())
}
}

#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize_repr, Deserialize_repr)]
#[repr(u32)]
pub enum ManifestVersion {
Expand Down
22 changes: 7 additions & 15 deletions crates/maelstrom-broker/src/artifact_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::scheduler_task::{SchedulerMessage, SchedulerSender};
use anyhow::{anyhow, Result};
use anyhow::Result;
use maelstrom_base::{
proto::{
ArtifactFetcherToBroker, ArtifactMetadata, ArtifactType, BrokerToArtifactFetcher, Identity,
ManifestEntry, ManifestEntryData, ManifestVersion,
ManifestEntry, ManifestEntryData, ManifestReader,
},
Sha256Digest,
};
Expand Down Expand Up @@ -88,15 +88,9 @@ fn send_manifest(
mut socket: &mut impl io::Write,
artifact_meta: ArtifactMetadata,
) -> Result<()> {
let version: ManifestVersion = bincode::deserialize_from(&mut file)?;
if version != ManifestVersion::default() {
return Err(anyhow!("bad manifest version"));
}

let mut tar = tar::Builder::new(&mut socket);
while file.stream_position()? != file.metadata()?.len() {
let entry = bincode::deserialize_from::<_, ManifestEntry>(&mut file)?;
add_entry_to_tar(fs, &mut tar, scheduler_sender, &entry)?;
for entry in ManifestReader::new(&mut file)? {
add_entry_to_tar(fs, &mut tar, scheduler_sender, &entry?)?;
}
tar.finish()?;

Expand Down Expand Up @@ -173,7 +167,7 @@ pub fn connection_main(
mod tests {
use super::*;
use assert_matches::assert_matches;
use maelstrom_base::proto::{ManifestEntryMetadata, Mode, UnixTimestamp};
use maelstrom_base::proto::{ManifestEntryMetadata, ManifestWriter, Mode, UnixTimestamp};
use maelstrom_test::*;
use std::io::Read as _;
use std::os::unix::fs::MetadataExt as _;
Expand All @@ -185,10 +179,8 @@ mod tests {
fn write_manifest(path: &Path, entries: Vec<ManifestEntry>) -> u64 {
let fs = Fs::new();
let mut f = fs.create_file(path).unwrap();
bincode::serialize_into(&mut f, &ManifestVersion::default()).unwrap();
for entry in entries {
bincode::serialize_into(&mut f, &entry).unwrap();
}
let mut writer = ManifestWriter::new(&mut f).unwrap();
writer.write_entries(&entries).unwrap();
f.stream_position().unwrap()
}

Expand Down
Loading

0 comments on commit 6a511ea

Please sign in to comment.