Skip to content

Commit

Permalink
Implement client artifact pusher that uses github
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbobbio committed Dec 5, 2024
1 parent 70efd34 commit 02f438d
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/maelstrom-client-process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ itertools.workspace = true
maelstrom-base.workspace = true
maelstrom-client-base.workspace = true
maelstrom-container.workspace = true
maelstrom-github.workspace = true
maelstrom-util.workspace = true
maelstrom-worker.workspace = true
pin-project.workspace = true
Expand Down
14 changes: 13 additions & 1 deletion crates/maelstrom-client-process/src/artifact_pusher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
mod github;
mod tcp_upload;

use anyhow::Result;
use maelstrom_base::Sha256Digest;
use maelstrom_util::r#async::Pool;
use std::future::Future;
use std::{num::NonZeroU32, path::PathBuf, sync::Arc};
use std::{
num::NonZeroU32,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinSet;

Expand Down Expand Up @@ -76,3 +81,10 @@ fn start_task_inner<PoolItemT, RetFutT, PushOneArtifactFn>(
Ok(())
});
}

fn construct_upload_name(digest: &Sha256Digest, path: &Path) -> String {
let digest_string = digest.to_string();
let short_digest = &digest_string[digest_string.len() - 7..];
let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
format!("{short_digest} {file_name}")
}
60 changes: 60 additions & 0 deletions crates/maelstrom-client-process/src/artifact_pusher/github.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use crate::artifact_pusher::{construct_upload_name, start_task_inner, Receiver};
use crate::progress::{ProgressTracker, UploadProgressReader};
use anyhow::Result;
use maelstrom_base::Sha256Digest;
use maelstrom_github::{FileStreamBuilder, GitHubClient, SeekableStream};
use maelstrom_util::async_fs::Fs;
use std::{path::PathBuf, sync::Arc};
use tokio::task::JoinSet;

pub async fn push_one_artifact(
github_client: Arc<GitHubClient>,
upload_tracker: ProgressTracker,
path: PathBuf,
digest: Sha256Digest,
success_callback: Box<dyn FnOnce() + Send + Sync>,
) -> Result<()> {
let fs = Fs::new();
let file = fs.open_file(&path).await?;
let size = file.metadata().await?.len();

let upload_name = construct_upload_name(&digest, &path);
let prog = upload_tracker.new_task(&upload_name, size);

let artifact_name = format!("maelstrom-cache-sha256-{digest}");
let file_stream = FileStreamBuilder::new(file.into_inner()).build().await?;
let stream = Box::new(UploadProgressReader::new(prog, file_stream)) as Box<dyn SeekableStream>;
github_client.upload(&artifact_name, stream).await?;

success_callback();

Ok(())
}

#[expect(dead_code)]
pub fn start_task(
join_set: &mut JoinSet<Result<()>>,
receiver: Receiver,
github_client: Arc<GitHubClient>,
upload_tracker: ProgressTracker,
) {
start_task_inner(
join_set,
receiver,
move |_, path, digest, success_callback| {
let upload_tracker = upload_tracker.clone();
let github_client = github_client.clone();
async move {
push_one_artifact(
github_client,
upload_tracker,
path,
digest,
success_callback,
)
.await
.map(|()| ((), ()))
}
},
)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::artifact_pusher::{start_task_inner, Receiver};
use crate::artifact_pusher::{construct_upload_name, start_task_inner, Receiver};
use crate::progress::{ProgressTracker, UploadProgressReader};
use anyhow::{anyhow, Context as _, Result};
use maelstrom_base::{
Expand All @@ -14,13 +14,6 @@ use tokio::{
task::JoinSet,
};

fn construct_upload_name(digest: &Sha256Digest, path: &Path) -> String {
let digest_string = digest.to_string();
let short_digest = &digest_string[digest_string.len() - 7..];
let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
format!("{short_digest} {file_name}")
}

pub async fn push_one_artifact(
stream: Option<TcpStream>,
upload_tracker: ProgressTracker,
Expand Down
60 changes: 52 additions & 8 deletions crates/maelstrom-client-process/src/progress.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use maelstrom_client_base::RemoteProgress;
use maelstrom_github::{AzureResult, SeekableStream};
use maelstrom_util::ext::OptionExt as _;
use std::collections::HashMap;
use std::pin::{pin, Pin};
use std::sync::OnceLock;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
use std::{
collections::HashMap,
future::Future,
pin::{pin, Pin},
sync::OnceLock,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
},
task::{ready, Poll},
};
use tokio::io::{self, AsyncRead};

Expand Down Expand Up @@ -36,7 +41,7 @@ where
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct RunningProgress {
name: String,
progress: Arc<AtomicU64>,
Expand All @@ -47,6 +52,10 @@ impl RunningProgress {
pub fn update(&self, amount_to_add: u64) {
self.progress.fetch_add(amount_to_add, Ordering::AcqRel);
}

pub fn reset(&self) {
self.progress.store(0, Ordering::Release);
}
}

impl Drop for RunningProgress {
Expand Down Expand Up @@ -114,6 +123,7 @@ impl ProgressTracker {
}
}

#[derive(Clone, Debug)]
pub struct UploadProgressReader<ReadT> {
prog: RunningProgress,
read: ReadT,
Expand All @@ -130,7 +140,7 @@ impl<ReadT: AsyncRead + Unpin> AsyncRead for UploadProgressReader<ReadT> {
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
dst: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<io::Result<()>> {
) -> Poll<io::Result<()>> {
let start_len = dst.filled().len();
let me = self.get_mut();
let result = AsyncRead::poll_read(pin!(&mut me.read), cx, dst);
Expand All @@ -139,3 +149,37 @@ impl<ReadT: AsyncRead + Unpin> AsyncRead for UploadProgressReader<ReadT> {
result
}
}

impl<ReadT: futures::io::AsyncRead + Unpin> futures::io::AsyncRead for UploadProgressReader<ReadT> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
dst: &mut [u8],
) -> Poll<io::Result<usize>> {
let me = self.get_mut();
let size = ready!(futures::io::AsyncRead::poll_read(
pin!(&mut me.read),
cx,
dst
))?;
me.prog.update(size as u64);
Poll::Ready(Ok(size))
}
}

impl<ReadT: SeekableStream + Clone> SeekableStream for UploadProgressReader<ReadT> {
fn reset<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = AzureResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
{
self.prog.reset();
self.read.reset()
}

fn len(&self) -> usize {
self.read.len()
}
}
3 changes: 2 additions & 1 deletion crates/maelstrom-github/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
//! <https://github.com/actions/toolkit/blob/main/packages/artifact/src/generated/results/api/v1/artifact.ts>
pub use azure_core::{
error::Result as AzureResult,
tokio::fs::{FileStream, FileStreamBuilder},
Body,
Body, SeekableStream,
};

use anyhow::{anyhow, bail, Result};
Expand Down

0 comments on commit 02f438d

Please sign in to comment.