From 02f438d607df6e5179c805b0d3065b496a89c79a Mon Sep 17 00:00:00 2001 From: Remi Bernotavicius Date: Wed, 4 Dec 2024 18:41:55 -0800 Subject: [PATCH] Implement client artifact pusher that uses github --- Cargo.lock | 1 + crates/maelstrom-client-process/Cargo.toml | 1 + .../src/artifact_pusher.rs | 14 ++++- .../src/artifact_pusher/github.rs | 60 +++++++++++++++++++ .../src/artifact_pusher/tcp_upload.rs | 9 +-- .../maelstrom-client-process/src/progress.rs | 60 ++++++++++++++++--- crates/maelstrom-github/src/lib.rs | 3 +- 7 files changed, 130 insertions(+), 18 deletions(-) create mode 100644 crates/maelstrom-client-process/src/artifact_pusher/github.rs diff --git a/Cargo.lock b/Cargo.lock index 12d4d2a7..44c7136f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3472,6 +3472,7 @@ dependencies = [ "maelstrom-base", "maelstrom-client-base", "maelstrom-container", + "maelstrom-github", "maelstrom-test", "maelstrom-util", "maelstrom-worker", diff --git a/crates/maelstrom-client-process/Cargo.toml b/crates/maelstrom-client-process/Cargo.toml index 07477c69..aee9c05b 100644 --- a/crates/maelstrom-client-process/Cargo.toml +++ b/crates/maelstrom-client-process/Cargo.toml @@ -23,6 +23,7 @@ itertools.workspace = true maelstrom-base.workspace = true maelstrom-client-base.workspace = true maelstrom-container.workspace = true +maelstrom-github.workspace = true maelstrom-util.workspace = true maelstrom-worker.workspace = true pin-project.workspace = true diff --git a/crates/maelstrom-client-process/src/artifact_pusher.rs b/crates/maelstrom-client-process/src/artifact_pusher.rs index ffc9b079..68f9f9d4 100644 --- a/crates/maelstrom-client-process/src/artifact_pusher.rs +++ b/crates/maelstrom-client-process/src/artifact_pusher.rs @@ -1,10 +1,15 @@ +mod github; mod tcp_upload; use anyhow::Result; use maelstrom_base::Sha256Digest; use maelstrom_util::r#async::Pool; use std::future::Future; -use std::{num::NonZeroU32, path::PathBuf, sync::Arc}; +use std::{ + num::NonZeroU32, + path::{Path, PathBuf}, + sync::Arc, +}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::task::JoinSet; @@ -76,3 +81,10 @@ fn start_task_inner( Ok(()) }); } + +fn construct_upload_name(digest: &Sha256Digest, path: &Path) -> String { + let digest_string = digest.to_string(); + let short_digest = &digest_string[digest_string.len() - 7..]; + let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); + format!("{short_digest} {file_name}") +} diff --git a/crates/maelstrom-client-process/src/artifact_pusher/github.rs b/crates/maelstrom-client-process/src/artifact_pusher/github.rs new file mode 100644 index 00000000..c8a13136 --- /dev/null +++ b/crates/maelstrom-client-process/src/artifact_pusher/github.rs @@ -0,0 +1,60 @@ +use crate::artifact_pusher::{construct_upload_name, start_task_inner, Receiver}; +use crate::progress::{ProgressTracker, UploadProgressReader}; +use anyhow::Result; +use maelstrom_base::Sha256Digest; +use maelstrom_github::{FileStreamBuilder, GitHubClient, SeekableStream}; +use maelstrom_util::async_fs::Fs; +use std::{path::PathBuf, sync::Arc}; +use tokio::task::JoinSet; + +pub async fn push_one_artifact( + github_client: Arc, + upload_tracker: ProgressTracker, + path: PathBuf, + digest: Sha256Digest, + success_callback: Box, +) -> Result<()> { + let fs = Fs::new(); + let file = fs.open_file(&path).await?; + let size = file.metadata().await?.len(); + + let upload_name = construct_upload_name(&digest, &path); + let prog = upload_tracker.new_task(&upload_name, size); + + let artifact_name = format!("maelstrom-cache-sha256-{digest}"); + let file_stream = FileStreamBuilder::new(file.into_inner()).build().await?; + let stream = Box::new(UploadProgressReader::new(prog, file_stream)) as Box; + github_client.upload(&artifact_name, stream).await?; + + success_callback(); + + Ok(()) +} + +#[expect(dead_code)] +pub fn start_task( + join_set: &mut JoinSet>, + receiver: Receiver, + github_client: Arc, + upload_tracker: ProgressTracker, +) { + start_task_inner( + join_set, + receiver, + move |_, path, digest, success_callback| { + let upload_tracker = upload_tracker.clone(); + let github_client = github_client.clone(); + async move { + push_one_artifact( + github_client, + upload_tracker, + path, + digest, + success_callback, + ) + .await + .map(|()| ((), ())) + } + }, + ) +} diff --git a/crates/maelstrom-client-process/src/artifact_pusher/tcp_upload.rs b/crates/maelstrom-client-process/src/artifact_pusher/tcp_upload.rs index a8b9873d..39526f1a 100644 --- a/crates/maelstrom-client-process/src/artifact_pusher/tcp_upload.rs +++ b/crates/maelstrom-client-process/src/artifact_pusher/tcp_upload.rs @@ -1,4 +1,4 @@ -use crate::artifact_pusher::{start_task_inner, Receiver}; +use crate::artifact_pusher::{construct_upload_name, start_task_inner, Receiver}; use crate::progress::{ProgressTracker, UploadProgressReader}; use anyhow::{anyhow, Context as _, Result}; use maelstrom_base::{ @@ -14,13 +14,6 @@ use tokio::{ task::JoinSet, }; -fn construct_upload_name(digest: &Sha256Digest, path: &Path) -> String { - let digest_string = digest.to_string(); - let short_digest = &digest_string[digest_string.len() - 7..]; - let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); - format!("{short_digest} {file_name}") -} - pub async fn push_one_artifact( stream: Option, upload_tracker: ProgressTracker, diff --git a/crates/maelstrom-client-process/src/progress.rs b/crates/maelstrom-client-process/src/progress.rs index 1a33b10e..331c8210 100644 --- a/crates/maelstrom-client-process/src/progress.rs +++ b/crates/maelstrom-client-process/src/progress.rs @@ -1,11 +1,16 @@ use maelstrom_client_base::RemoteProgress; +use maelstrom_github::{AzureResult, SeekableStream}; use maelstrom_util::ext::OptionExt as _; -use std::collections::HashMap; -use std::pin::{pin, Pin}; -use std::sync::OnceLock; -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, Mutex, +use std::{ + collections::HashMap, + future::Future, + pin::{pin, Pin}, + sync::OnceLock, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, + }, + task::{ready, Poll}, }; use tokio::io::{self, AsyncRead}; @@ -36,7 +41,7 @@ where } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct RunningProgress { name: String, progress: Arc, @@ -47,6 +52,10 @@ impl RunningProgress { pub fn update(&self, amount_to_add: u64) { self.progress.fetch_add(amount_to_add, Ordering::AcqRel); } + + pub fn reset(&self) { + self.progress.store(0, Ordering::Release); + } } impl Drop for RunningProgress { @@ -114,6 +123,7 @@ impl ProgressTracker { } } +#[derive(Clone, Debug)] pub struct UploadProgressReader { prog: RunningProgress, read: ReadT, @@ -130,7 +140,7 @@ impl AsyncRead for UploadProgressReader { self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, dst: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { + ) -> Poll> { let start_len = dst.filled().len(); let me = self.get_mut(); let result = AsyncRead::poll_read(pin!(&mut me.read), cx, dst); @@ -139,3 +149,37 @@ impl AsyncRead for UploadProgressReader { result } } + +impl futures::io::AsyncRead for UploadProgressReader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + dst: &mut [u8], + ) -> Poll> { + let me = self.get_mut(); + let size = ready!(futures::io::AsyncRead::poll_read( + pin!(&mut me.read), + cx, + dst + ))?; + me.prog.update(size as u64); + Poll::Ready(Ok(size)) + } +} + +impl SeekableStream for UploadProgressReader { + fn reset<'life0, 'async_trait>( + &'life0 mut self, + ) -> Pin> + Send + 'async_trait>> + where + Self: 'async_trait, + 'life0: 'async_trait, + { + self.prog.reset(); + self.read.reset() + } + + fn len(&self) -> usize { + self.read.len() + } +} diff --git a/crates/maelstrom-github/src/lib.rs b/crates/maelstrom-github/src/lib.rs index fd57f301..dcfb3571 100644 --- a/crates/maelstrom-github/src/lib.rs +++ b/crates/maelstrom-github/src/lib.rs @@ -11,8 +11,9 @@ //! pub use azure_core::{ + error::Result as AzureResult, tokio::fs::{FileStream, FileStreamBuilder}, - Body, + Body, SeekableStream, }; use anyhow::{anyhow, bail, Result};