From 042bb8110016cbe0f21953acf877f9cbdb27e483 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Mon, 9 Sep 2024 20:48:02 +0900 Subject: [PATCH 1/9] feat: purgatory - beginning --- crates/worker/src/commands/compile.rs | 13 +++- crates/worker/src/dynamodb_client.rs | 8 +-- crates/worker/src/errors.rs | 4 ++ crates/worker/src/purgatory.rs | 84 ++++++++++++++++++++++--- crates/worker/src/s3_client.rs | 24 +++++++ crates/worker/src/sqs_clients/client.rs | 4 +- crates/worker/src/worker.rs | 4 +- 7 files changed, 122 insertions(+), 19 deletions(-) diff --git a/crates/worker/src/commands/compile.rs b/crates/worker/src/commands/compile.rs index 9375c31..4038900 100644 --- a/crates/worker/src/commands/compile.rs +++ b/crates/worker/src/commands/compile.rs @@ -42,7 +42,7 @@ pub async fn compile( db_client: &DynamoDBClient, s3_client: &S3Client, ) -> Result<(), CompilationError> { - let item = db_client.get_item(request.id.clone()).await?; + let item = db_client.get_item(&request.id).await?; let item: Item = match item { Some(item) => item, None => { @@ -65,7 +65,7 @@ pub async fn compile( // Update status to Compiling try_set_compiling_status(db_client, &request.id).await?; - match do_compile( + let result = match do_compile( &request.id, CompilationInput { config: request.config, @@ -85,7 +85,16 @@ pub async fn compile( ) .await?), Err(err) => Err(err), + }; + + // Clean S3 objects + // TODO: when making generic - consider if compiler shall manage clean-up here. + match &result { + Ok(_) => s3_client.delete_dir(&dir).await?, + Err(err) if !err.recoverable() => s3_client.delete_dir(&dir).await?, + _ => {} } + result } async fn try_set_compiling_status( diff --git a/crates/worker/src/dynamodb_client.rs b/crates/worker/src/dynamodb_client.rs index c5df389..17c0065 100644 --- a/crates/worker/src/dynamodb_client.rs +++ b/crates/worker/src/dynamodb_client.rs @@ -17,23 +17,23 @@ impl DynamoDBClient { } } - pub async fn delete_item(&self, id: String) -> Result<(), DBError> { + pub async fn delete_item(&self, id: &str) -> Result<(), DBError> { self.client .delete_item() .table_name(self.table_name.clone()) - .key(Item::primary_key_name(), AttributeValue::S(id)) + .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) .send() .await?; Ok(()) } - pub async fn get_item(&self, id: String) -> Result, DBError> { + pub async fn get_item(&self, id: &str) -> Result, DBError> { let result = self .client .get_item() .table_name(self.table_name.clone()) - .key(Item::primary_key_name(), AttributeValue::S(id)) + .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) .send() .await?; diff --git a/crates/worker/src/errors.rs b/crates/worker/src/errors.rs index 96e9ab0..e750e5f 100644 --- a/crates/worker/src/errors.rs +++ b/crates/worker/src/errors.rs @@ -2,6 +2,7 @@ use aws_sdk_dynamodb::config::http::HttpResponse; use aws_sdk_dynamodb::operation::delete_item::DeleteItemError; use aws_sdk_dynamodb::operation::get_item::GetItemError; use aws_sdk_dynamodb::operation::update_item::UpdateItemError; +use aws_sdk_s3::operation::delete_object::DeleteObjectError; use aws_sdk_s3::operation::get_object::GetObjectError; use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error; use aws_sdk_s3::operation::put_object::PutObjectError; @@ -25,6 +26,7 @@ pub(crate) type DBUpdateError = SdkError; pub(crate) type S3ListObjectsError = SdkError; pub(crate) type S3GetObjectError = SdkError; pub(crate) type S3PutObjectError = SdkError; +pub(crate) type S3DeleteObjectError = SdkError; #[derive(thiserror::Error, Debug)] pub enum SqsError { @@ -57,6 +59,8 @@ pub enum S3Error { #[error(transparent)] PutObjectError(#[from] S3PutObjectError), #[error(transparent)] + DeleteObjectError(#[from] S3DeleteObjectError), + #[error(transparent)] IoError(#[from] std::io::Error), #[error(transparent)] ByteStreamError(#[from] aws_smithy_types::byte_stream::error::Error), diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index 0b2e760..71a9974 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -2,11 +2,18 @@ use std::collections::HashMap; use std::marker::PhantomData; use std::ptr::NonNull; use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; use tokio::{sync::Mutex, task::JoinHandle}; +use tracing::warn; use types::item::Status; -use types::SqsMessage; +use types::{SqsMessage, ARTIFACTS_FOLDER}; use uuid::Uuid; +use crate::dynamodb_client::DynamoDBClient; +use crate::s3_client::S3Client; +use crate::utils::lib::timestamp; + pub type Timestamp = u64; #[derive(Clone)] @@ -15,10 +22,10 @@ pub struct Purgatory { } impl Purgatory { - pub fn new(state: State) -> Self { + pub fn new(state: State, db_client: DynamoDBClient, s3_client: S3Client) -> Self { let mut handle = NonNull::dangling(); let this = Self { - inner: Arc::new(Mutex::new(Inner::new(handle, state))), + inner: Arc::new(Mutex::new(Inner::new(handle, state, db_client, s3_client))), }; let initialized_handle = tokio::spawn(this.clone().deamon()); @@ -30,7 +37,7 @@ impl Purgatory { } pub async fn purge(&mut self) { - self.inner.lock().await.purge() + self.inner.lock().await.purge().await; } pub async fn add_task(&mut self, _: &SqsMessage) { @@ -41,12 +48,21 @@ impl Purgatory { pub async fn update_task(&mut self) {} async fn deamon(self) { - todo!() + const PURGE_INTERVAL: Duration = Duration::from_secs(60); + + loop { + let mut inner = self.inner.lock().await; + inner.purge().await; + + sleep(PURGE_INTERVAL).await; + } } } struct Inner { state: State, + s3_client: S3Client, + db_client: DynamoDBClient, // No aliases possible since only we own the data handle: NonNull>, @@ -64,16 +80,60 @@ impl Drop for Inner { } impl Inner { - fn new(handle: NonNull>, state: State) -> Self { + fn new( + handle: NonNull>, + state: State, + db_client: DynamoDBClient, + s3_client: S3Client, + ) -> Self { Self { - handle, state, + s3_client, + db_client, + handle, _marker: PhantomData, } } - pub fn purge(&mut self) { - todo!() + pub async fn purge(&mut self) { + let now = timestamp(); + for (id, timestamp) in self.state.expiration_timestamps.iter() { + if *timestamp > now { + break; + } + + let status = if let Some(status) = self.state.task_status.get(id) { + status + } else { + warn!("Inconsistency! ID present vector but not in status map"); + continue; + }; + + match status { + Status::Pending => warn!("Item pending for too long!"), + Status::Compiling => warn!("Item compiling for too long!"), + Status::Ready { .. } => { + let dir = format!("{}/{}/", ARTIFACTS_FOLDER, id); + self.s3_client.delete_dir(&dir).await.unwrap(); // TODO: fix + self.db_client + .delete_item(id.to_string().as_str()) + .await + .unwrap(); + } + Status::Failed { .. } => { + let dir = format!("{}/{}/", ARTIFACTS_FOLDER, id); + self.s3_client.delete_dir(&dir).await; // TODO: fix + self.db_client + .delete_item(id.to_string().as_str()) + .await + .unwrap(); + } + } + } + + self.state + .expiration_timestamps + .retain(|(_, timestamp)| *timestamp > now); } // TODO: replace with Self::purge @@ -115,6 +175,10 @@ pub struct State { impl State { pub async fn load() -> State { - todo!() + // TODO: load state here from DB + Self { + expiration_timestamps: vec![], + task_status: HashMap::new(), + } } } diff --git a/crates/worker/src/s3_client.rs b/crates/worker/src/s3_client.rs index 43a3c76..798a8eb 100644 --- a/crates/worker/src/s3_client.rs +++ b/crates/worker/src/s3_client.rs @@ -103,6 +103,30 @@ impl S3Client { Ok(()) } + pub async fn delete_dir(&self, dir: &str) -> Result<(), S3Error> { + let objects = self.list_all_keys(dir).await?; + for object in objects { + let key = object.key().ok_or(S3Error::InvalidObjectError)?; + self.delete_object(key).await?; + } + + // TODO: check that works + let result = self.delete_object(dir).await; + result?; + Ok(()) + } + + pub async fn delete_object(&self, key: &str) -> Result<(), S3Error> { + let _ = self + .client + .delete_object() + .bucket(self.bucket_name.clone()) + .key(key) + .send() + .await?; + Ok(()) + } + pub async fn list_all_keys(&self, dir: &str) -> Result, S3Error> { let mut objects = Vec::new(); let mut continuation_token: Option = None; diff --git a/crates/worker/src/sqs_clients/client.rs b/crates/worker/src/sqs_clients/client.rs index c979c31..198080d 100644 --- a/crates/worker/src/sqs_clients/client.rs +++ b/crates/worker/src/sqs_clients/client.rs @@ -23,7 +23,9 @@ macro_rules! match_result { } if let Some(other) = dispatch_err.as_other() { return match other { - aws_config::retry::ErrorKind::ClientError => Err($err_type::DispatchFailure(dispatch_err)), + aws_config::retry::ErrorKind::ClientError => { + Err($err_type::DispatchFailure(dispatch_err)) + } _ => Ok(None), }; } diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index 4146c98..c51b316 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -63,7 +63,7 @@ impl RunningEngine { state: State, num_workers: usize, ) -> Self { - let purgatory = Purgatory::new(state); + let purgatory = Purgatory::new(state, db_client.clone(), s3_client.clone()); let mut worker_threads = Vec::with_capacity(num_workers); for _ in 0..num_workers { @@ -127,7 +127,7 @@ impl RunningEngine { purgatory.add_task(&sqs_message).await; match sqs_message { SqsMessage::Compile { request } => { - let result = compile(request, &db_client, &s3_client).await; // TODO: + let result = compile(request, &db_client, &s3_client).await; match result { Ok(()) => purgatory.update_task().await, Err(err) => { From 3215cc3874ce87e3af0e309ae2fa34c88db6607a Mon Sep 17 00:00:00 2001 From: taco-paco Date: Mon, 9 Sep 2024 20:51:20 +0900 Subject: [PATCH 2/9] feat: renaming in status --- crates/types/src/item.rs | 10 +++++----- crates/worker/src/commands/compile.rs | 2 +- crates/worker/src/purgatory.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/types/src/item.rs b/crates/types/src/item.rs index 4d47453..3430d18 100644 --- a/crates/types/src/item.rs +++ b/crates/types/src/item.rs @@ -10,7 +10,7 @@ pub type AttributeMap = HashMap; pub enum Status { // TODO: add FilesUploaded(?) Pending, - Compiling, + InProgress, Ready { presigned_urls: Vec, }, @@ -27,7 +27,7 @@ impl fmt::Display for Status { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { Status::Pending => write!(f, "Pending"), - Status::Compiling => write!(f, "Compiling"), + Status::InProgress => write!(f, "Compiling"), Status::Ready { .. } => write!(f, "Ready"), Status::Failed(msg) => write!(f, "Failed: {}", msg), } @@ -38,7 +38,7 @@ impl From<&Status> for u32 { fn from(value: &Status) -> Self { match value { Status::Pending => 0, - Status::Compiling => 1, + Status::InProgress => 1, Status::Ready { .. } => 2, Status::Failed(_) => 3, } @@ -54,7 +54,7 @@ impl From for u32 { impl From for HashMap { fn from(value: Status) -> Self { match value.clone() { - Status::Pending | Status::Compiling => HashMap::from([( + Status::Pending | Status::InProgress => HashMap::from([( Status::attribute_name().into(), AttributeValue::N(u32::from(&value).to_string()), )]), @@ -128,7 +128,7 @@ impl TryFrom<&AttributeMap> for Status { .parse::()?; let status = match status { 0 => Status::Pending, - 1 => Status::Compiling, + 1 => Status::InProgress, 2 => { let data = value.get(Item::data_attribute_name()).ok_or(ItemError::FormatError)?; let data = data.as_ss().map_err(|_| ItemError::FormatError)?; diff --git a/crates/worker/src/commands/compile.rs b/crates/worker/src/commands/compile.rs index 4038900..4fd2686 100644 --- a/crates/worker/src/commands/compile.rs +++ b/crates/worker/src/commands/compile.rs @@ -111,7 +111,7 @@ async fn try_set_compiling_status( .expression_attribute_names("#status", Status::attribute_name()) .expression_attribute_values( ":newStatus", - AttributeValue::N(u32::from(Status::Compiling).to_string()), + AttributeValue::N(u32::from(Status::InProgress).to_string()), ) .expression_attribute_values( ":currentStatus", diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index 71a9974..b440093 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -111,7 +111,7 @@ impl Inner { match status { Status::Pending => warn!("Item pending for too long!"), - Status::Compiling => warn!("Item compiling for too long!"), + Status::InProgress => warn!("Item compiling for too long!"), Status::Ready { .. } => { let dir = format!("{}/{}/", ARTIFACTS_FOLDER, id); self.s3_client.delete_dir(&dir).await.unwrap(); // TODO: fix From 578a31e1adc430a74a076e7641adfafa7c597013 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Tue, 10 Sep 2024 16:21:56 +0900 Subject: [PATCH 3/9] feat: separate preparation from compilation --- Cargo.toml | 2 +- crates/types/Cargo.toml | 3 + crates/types/src/item.rs | 96 +++++++----- crates/types/src/lib.rs | 14 +- crates/worker/Cargo.toml | 2 +- crates/worker/src/commands/compile.rs | 202 ++------------------------ crates/worker/src/commands/errors.rs | 36 +++++ crates/worker/src/commands/mod.rs | 2 + crates/worker/src/commands/utils.rs | 170 ++++++++++++++++++++++ crates/worker/src/dynamodb_client.rs | 4 +- crates/worker/src/errors.rs | 31 ---- crates/worker/src/purgatory.rs | 99 +++++-------- crates/worker/src/worker.rs | 72 +++++++-- 13 files changed, 385 insertions(+), 348 deletions(-) create mode 100644 crates/worker/src/commands/errors.rs create mode 100644 crates/worker/src/commands/utils.rs diff --git a/Cargo.toml b/Cargo.toml index 8f7df71..9f76fb4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ aws-sdk-sqs = "1.39.0" aws-sdk-dynamodb = "1.42.0" tokio = {version = "1.39.3", features = ["macros"]} serde = "1.0.207" -serde_json = "1.0.124" +serde_json = "1.0.128" thiserror = "1.0.63" tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", default-features = false, features = ["fmt", "ansi"] } diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index d68de44..75eefee 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -8,3 +8,6 @@ aws-sdk-dynamodb = {workspace = true} serde = {workspace = true} thiserror = {worksapce = true} uuid = {workspace = true} + +[dev-dependencies] +serde_json = {workspace = true} \ No newline at end of file diff --git a/crates/types/src/item.rs b/crates/types/src/item.rs index 3430d18..cddeb42 100644 --- a/crates/types/src/item.rs +++ b/crates/types/src/item.rs @@ -1,20 +1,23 @@ use aws_sdk_dynamodb::types::AttributeValue; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; use std::fmt::Formatter; pub type AttributeMap = HashMap; -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TaskResult { + Success { presigned_urls: Vec }, + Failure(String), +} + +#[derive(Debug, Clone)] pub enum Status { // TODO: add FilesUploaded(?) Pending, InProgress, - Ready { - presigned_urls: Vec, - }, - Failed(String), + Done(TaskResult), } impl Status { @@ -28,8 +31,8 @@ impl fmt::Display for Status { match self { Status::Pending => write!(f, "Pending"), Status::InProgress => write!(f, "Compiling"), - Status::Ready { .. } => write!(f, "Ready"), - Status::Failed(msg) => write!(f, "Failed: {}", msg), + Status::Done(TaskResult::Success { .. }) => write!(f, "Success"), + Status::Done(TaskResult::Failure(msg)) => write!(f, "Failure: {}", msg), } } } @@ -39,8 +42,8 @@ impl From<&Status> for u32 { match value { Status::Pending => 0, Status::InProgress => 1, - Status::Ready { .. } => 2, - Status::Failed(_) => 3, + Status::Done(TaskResult::Success { .. }) => 2, + Status::Done(TaskResult::Failure(_)) => 3, } } } @@ -51,31 +54,37 @@ impl From for u32 { } } -impl From for HashMap { - fn from(value: Status) -> Self { - match value.clone() { - Status::Pending | Status::InProgress => HashMap::from([( - Status::attribute_name().into(), - AttributeValue::N(u32::from(&value).to_string()), +impl From for AttributeMap { + fn from(value: TaskResult) -> Self { + match value { + TaskResult::Success { presigned_urls } => HashMap::from([( + Item::data_attribute_name().into(), + AttributeValue::Ss(presigned_urls), + )]), + TaskResult::Failure(message) => HashMap::from([( + Item::data_attribute_name().into(), + AttributeValue::S(message), )]), - Status::Ready { presigned_urls } => HashMap::from([ - ( - Status::attribute_name().into(), - AttributeValue::N(u32::from(&value).to_string()), - ), - (Item::data_attribute_name().into(), AttributeValue::Ss(presigned_urls)), - ]), - Status::Failed(val) => HashMap::from([ - ( - Status::attribute_name().into(), - AttributeValue::N(u32::from(&value).to_string()), - ), - (Item::data_attribute_name().into(), AttributeValue::S(val)), - ]), } } } +impl From for AttributeMap { + fn from(value: Status) -> Self { + let mut map = HashMap::from([( + Status::attribute_name().into(), + AttributeValue::N(u32::from(&value).to_string()), + )]); + + // For `Done` variant, reuse the conversion logic from `TaskResult` + if let Status::Done(task_result) = value { + map.extend(AttributeMap::from(task_result)); + } + + map + } +} + #[derive(thiserror::Error, Debug)] pub enum ItemError { #[error("Invalid Item format")] @@ -111,7 +120,10 @@ impl Item { impl From for AttributeMap { fn from(value: Item) -> Self { - let mut item_map = HashMap::from([(Item::id_attribute_name().into(), AttributeValue::S(value.id))]); + let mut item_map = HashMap::from([( + Item::id_attribute_name().into(), + AttributeValue::S(value.id), + )]); item_map.extend(HashMap::from(value.status)); item_map @@ -121,7 +133,9 @@ impl From for AttributeMap { impl TryFrom<&AttributeMap> for Status { type Error = ItemError; fn try_from(value: &AttributeMap) -> Result { - let status = value.get(Status::attribute_name()).ok_or(ItemError::FormatError)?; + let status = value + .get(Status::attribute_name()) + .ok_or(ItemError::FormatError)?; let status: u32 = status .as_n() .map_err(|_| ItemError::FormatError)? @@ -130,18 +144,22 @@ impl TryFrom<&AttributeMap> for Status { 0 => Status::Pending, 1 => Status::InProgress, 2 => { - let data = value.get(Item::data_attribute_name()).ok_or(ItemError::FormatError)?; + let data = value + .get(Item::data_attribute_name()) + .ok_or(ItemError::FormatError)?; let data = data.as_ss().map_err(|_| ItemError::FormatError)?; - Status::Ready { + Status::Done(TaskResult::Success { presigned_urls: data.clone(), - } + }) } 3 => { - let data = value.get(Item::data_attribute_name()).ok_or(ItemError::FormatError)?; + let data = value + .get(Item::data_attribute_name()) + .ok_or(ItemError::FormatError)?; let data = data.as_s().map_err(|_| ItemError::FormatError)?; - Status::Failed(data.clone()) + Status::Done(TaskResult::Failure(data.clone())) } _ => return Err(ItemError::FormatError), }; @@ -153,7 +171,9 @@ impl TryFrom<&AttributeMap> for Status { impl TryFrom for Item { type Error = ItemError; fn try_from(value: AttributeMap) -> Result { - let id = value.get(Item::id_attribute_name()).ok_or(ItemError::FormatError)?; + let id = value + .get(Item::id_attribute_name()) + .ok_or(ItemError::FormatError)?; let id = id.as_s().map_err(|_| ItemError::FormatError)?; let status = (&value).try_into()?; diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index 61d3646..8fc687f 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -1,6 +1,7 @@ pub mod item; use serde::{Deserialize, Serialize}; +use uuid::Uuid; pub const ARTIFACTS_FOLDER: &str = "artifacts"; @@ -15,7 +16,7 @@ pub struct CompilationConfig { #[derive(Serialize, Deserialize, Clone, Debug)] pub struct CompilationRequest { - pub id: String, + pub id: Uuid, pub config: CompilationConfig, } @@ -30,7 +31,7 @@ pub struct VerifyConfig { #[derive(Serialize, Deserialize, Clone, Debug)] pub struct VerificationRequest { - pub id: String, + pub id: Uuid, pub config: VerifyConfig, } @@ -46,3 +47,12 @@ pub enum SqsMessage { request: VerificationRequest, }, } + +impl SqsMessage { + pub fn id(&self) -> Uuid { + match self { + SqsMessage::Compile {request} => request.id, + SqsMessage::Verify {request} => request.id, + } + } +} diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 890d7d4..6d9f994 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -25,4 +25,4 @@ lazy_static = "1.5.0" walkdir = "2.3.2" # Inner crates -types = {workspace = true} +types = { workspace = true } diff --git a/crates/worker/src/commands/compile.rs b/crates/worker/src/commands/compile.rs index 4fd2686..c1a3259 100644 --- a/crates/worker/src/commands/compile.rs +++ b/crates/worker/src/commands/compile.rs @@ -1,23 +1,16 @@ -use crate::commands::SPAWN_SEMAPHORE; -use aws_sdk_dynamodb::error::SdkError; -use aws_sdk_dynamodb::operation::update_item::UpdateItemError; -use aws_sdk_dynamodb::types::AttributeValue; -use aws_sdk_s3::presigning::PresigningConfig; -use std::path::Path; +// TODO: extract in class + +use std::path::{Path, PathBuf}; use std::process::Stdio; -use std::time::Duration; -use tracing::warn; use tracing::{error, info}; -use types::item::{Item, Status}; -use types::{CompilationConfig, CompilationRequest, ARTIFACTS_FOLDER}; +use types::{CompilationConfig}; -use crate::dynamodb_client::DynamoDBClient; -use crate::errors::{CompilationError, DBError}; -use crate::s3_client::S3Client; +use crate::commands::errors::CompilationError; +use crate::commands::SPAWN_SEMAPHORE; use crate::utils::cleaner::AutoCleanUp; use crate::utils::hardhat_config::HardhatConfigBuilder; use crate::utils::lib::{ - initialize_files, list_files_in_directory, DEFAULT_SOLIDITY_VERSION, SOL_ROOT, ZKSOLC_VERSIONS, + initialize_files, list_files_in_directory, DEFAULT_SOLIDITY_VERSION, }; pub struct CompilationFile { @@ -26,6 +19,7 @@ pub struct CompilationFile { } pub struct CompilationInput { + pub workspace_path: PathBuf, pub config: CompilationConfig, // legacy. files really pub contracts: Vec, @@ -36,188 +30,11 @@ pub struct CompilationArtifact { pub file_content: Vec, pub is_contract: bool, } - -pub async fn compile( - request: CompilationRequest, - db_client: &DynamoDBClient, - s3_client: &S3Client, -) -> Result<(), CompilationError> { - let item = db_client.get_item(&request.id).await?; - let item: Item = match item { - Some(item) => item, - None => { - error!("No item id: {}", request.id); - return Err(CompilationError::NoDBItemError(request.id)); - } - }; - - match item.status { - Status::Pending => {} - status => { - warn!("Item already processing: {}", status); - return Err(CompilationError::UnexpectedStatusError(status.to_string())); - } - } - - let dir = format!("{}/", request.id); - let files = s3_client.extract_files(&dir).await?; - - // Update status to Compiling - try_set_compiling_status(db_client, &request.id).await?; - - let result = match do_compile( - &request.id, - CompilationInput { - config: request.config, - contracts: files, - }, - ) - .await - { - Ok(value) => Ok(on_compilation_success(&request.id, db_client, s3_client, value).await?), - Err(CompilationError::CompilationFailureError(value)) => { - Ok(on_compilation_failed(&request.id, db_client, value).await?) - } - Err(CompilationError::VersionNotSupported(value)) => Ok(on_compilation_failed( - &request.id, - &db_client, - format!("Unsupported compiler version: {}", value), - ) - .await?), - Err(err) => Err(err), - }; - - // Clean S3 objects - // TODO: when making generic - consider if compiler shall manage clean-up here. - match &result { - Ok(_) => s3_client.delete_dir(&dir).await?, - Err(err) if !err.recoverable() => s3_client.delete_dir(&dir).await?, - _ => {} - } - result -} - -async fn try_set_compiling_status( - db_client: &DynamoDBClient, - key: &str, -) -> Result<(), CompilationError> { - let db_update_result = db_client - .client - .update_item() - .table_name(db_client.table_name.clone()) - .key(Item::primary_key_name(), AttributeValue::S(key.to_string())) - .update_expression("SET #status = :newStatus") - .condition_expression("#status = :currentStatus") - .expression_attribute_names("#status", Status::attribute_name()) - .expression_attribute_values( - ":newStatus", - AttributeValue::N(u32::from(Status::InProgress).to_string()), - ) - .expression_attribute_values( - ":currentStatus", - AttributeValue::N(u32::from(Status::Pending).to_string()), - ) - .send() - .await; - - match db_update_result { - Ok(_) => Ok(()), - Err(SdkError::ServiceError(err)) => match err.err() { - UpdateItemError::ConditionalCheckFailedException(_) => { - error!("Conditional check not met"); - Err(CompilationError::UnexpectedStatusError( - "Concurrent status change from another instance".into(), - )) - } - _ => Err(DBError::from(SdkError::ServiceError(err)).into()), - }, - Err(err) => Err(DBError::from(err).into()), - } -} - -pub async fn on_compilation_success( - id: &str, - db_client: &DynamoDBClient, - s3_client: &S3Client, - compilation_artifacts: Vec, -) -> Result<(), CompilationError> { - const DOWNLOAD_URL_EXPIRATION: Duration = Duration::from_secs(5 * 60 * 60); - - let mut presigned_urls = Vec::with_capacity(compilation_artifacts.len()); - for el in compilation_artifacts { - let file_key = format!("{}/{}/{}", ARTIFACTS_FOLDER, id, el.file_name); - s3_client.put_object(&file_key, el.file_content).await?; - - let expires_in = PresigningConfig::expires_in(DOWNLOAD_URL_EXPIRATION).unwrap(); - let presigned_request = s3_client - .get_object_presigned(&file_key, expires_in) - .await?; - - presigned_urls.push(presigned_request.uri().to_string()); - } - - if presigned_urls.is_empty() { - // TODO: AttributeValue::Ss doesn't allow empty arrays. Decide what to do. for now - presigned_urls.push("".to_string()); - } - - db_client - .client - .update_item() - .table_name(db_client.table_name.clone()) - .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) - .update_expression("SET #status = :newStatus, #data = :data") - .expression_attribute_names("#status", Status::attribute_name()) - .expression_attribute_names("#data", Item::data_attribute_name()) - .expression_attribute_values( - ":newStatus", - AttributeValue::N(2.to_string()), // Ready - ) - .expression_attribute_values(":data", AttributeValue::Ss(presigned_urls)) - .send() - .await - .map_err(DBError::from)?; - - Ok(()) -} - -pub async fn on_compilation_failed( - id: &str, - db_client: &DynamoDBClient, - message: String, -) -> Result<(), DBError> { - db_client - .client - .update_item() - .table_name(db_client.table_name.clone()) - .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) - .update_expression("SET #status = :newStatus, #data = :data") - .expression_attribute_names("#status", Status::attribute_name()) - .expression_attribute_names("#data", Item::data_attribute_name()) - .expression_attribute_values( - ":newStatus", - AttributeValue::N(3.to_string()), // Failed - ) - .expression_attribute_values(":data", AttributeValue::S(message)) - .send() - .await?; - - Ok(()) -} - pub async fn do_compile( - namespace: &str, compilation_input: CompilationInput, ) -> Result, CompilationError> { - let zksolc_version = compilation_input.config.version; - - // check if the version is supported - if !ZKSOLC_VERSIONS.contains(&zksolc_version.as_str()) { - return Err(CompilationError::VersionNotSupported(zksolc_version)); - } - // root directory for the contracts - let workspace_path = Path::new(SOL_ROOT).join(namespace); + let workspace_path = compilation_input.workspace_path; // root directory for the artifacts let artifacts_path = workspace_path.join("artifacts-zk"); // root directory for user files (hardhat config, etc) @@ -234,6 +51,7 @@ pub async fn do_compile( }; // write the hardhat config file + let zksolc_version = compilation_input.config.version; let mut hardhat_config_builder = HardhatConfigBuilder::new(); hardhat_config_builder .zksolc_version(&zksolc_version) diff --git a/crates/worker/src/commands/errors.rs b/crates/worker/src/commands/errors.rs new file mode 100644 index 0000000..5eea89d --- /dev/null +++ b/crates/worker/src/commands/errors.rs @@ -0,0 +1,36 @@ +use crate::errors::{DBError, S3Error}; + +#[derive(thiserror::Error, Debug)] +pub enum CompilationError { + #[error("IoError: {0}")] + IoError(#[from] std::io::Error), + #[error("Failed to compile: {0}")] + CompilationFailureError(String), +} + +#[derive(thiserror::Error, Debug)] +pub enum PreparationError { + #[error("DBError: {0}")] + DBError(#[from] DBError), + #[error("S3Error: {0}")] + S3Error(#[from] S3Error), + #[error("Item isn't id DB: {0}")] + NoDBItemError(String), + #[error("Unexpected status: {0}")] + UnexpectedStatusError(String), + #[error("Unsupported version: {0}")] + VersionNotSupported(String), +} + +// impl CompilationError { +// pub fn recoverable(&self) -> bool { +// match self { +// CompilationError::DBError(_) | CompilationError::S3Error(_) => true, +// CompilationError::NoDBItemError(_) +// | CompilationError::UnexpectedStatusError(_) +// | CompilationError::IoError(_) +// | CompilationError::VersionNotSupported(_) +// | CompilationError::CompilationFailureError(_) => false, +// } +// } +// } diff --git a/crates/worker/src/commands/mod.rs b/crates/worker/src/commands/mod.rs index b540cd7..2bf6f39 100644 --- a/crates/worker/src/commands/mod.rs +++ b/crates/worker/src/commands/mod.rs @@ -2,6 +2,8 @@ use lazy_static::lazy_static; use tokio::sync::Semaphore; pub mod compile; +pub mod errors; +pub mod utils; pub mod verify; const PROCESS_SPAWN_LIMIT: usize = 8; diff --git a/crates/worker/src/commands/utils.rs b/crates/worker/src/commands/utils.rs new file mode 100644 index 0000000..3a4e6c4 --- /dev/null +++ b/crates/worker/src/commands/utils.rs @@ -0,0 +1,170 @@ +// TODO: probably extract preparations to a class + +use aws_sdk_dynamodb::error::SdkError; +use aws_sdk_dynamodb::operation::update_item::UpdateItemError; +use aws_sdk_dynamodb::types::AttributeValue; +use std::path::Path; +use std::time::Duration; +use aws_sdk_s3::presigning::PresigningConfig; +use tracing::{error, warn}; +use types::item::{Item, Status, TaskResult}; +use types::{ARTIFACTS_FOLDER, CompilationRequest}; +use uuid::Uuid; + +use crate::commands::compile::{ + CompilationArtifact, CompilationInput, +}; +use crate::commands::errors::{PreparationError}; +use crate::dynamodb_client::DynamoDBClient; +use crate::errors::DBError; +use crate::s3_client::S3Client; +use crate::utils::lib::{SOL_ROOT, ZKSOLC_VERSIONS}; + +async fn try_set_compiling_status( + db_client: &DynamoDBClient, + key: Uuid, +) -> Result<(), PreparationError> { + let db_update_result = db_client + .client + .update_item() + .table_name(db_client.table_name.clone()) + .key(Item::primary_key_name(), AttributeValue::S(key.to_string())) + .update_expression("SET #status = :newStatus") + .condition_expression("#status = :currentStatus") + .expression_attribute_names("#status", Status::attribute_name()) + .expression_attribute_values( + ":newStatus", + AttributeValue::N(u32::from(Status::InProgress).to_string()), + ) + .expression_attribute_values( + ":currentStatus", + AttributeValue::N(u32::from(Status::Pending).to_string()), + ) + .send() + .await; + + match db_update_result { + Ok(_) => Ok(()), + Err(SdkError::ServiceError(err)) => match err.err() { + UpdateItemError::ConditionalCheckFailedException(_) => { + error!("Conditional check not met"); + Err(PreparationError::UnexpectedStatusError( + "Concurrent status change from another instance".into(), + )) + } + _ => Err(DBError::from(SdkError::ServiceError(err)).into()), + }, + Err(err) => Err(DBError::from(err).into()), + } +} + +pub(crate) async fn prepare_compile_input( + request: &CompilationRequest, + db_client: &DynamoDBClient, + s3_client: &S3Client, +) -> Result { + let zksolc_version = request.config.version.as_str(); + if !ZKSOLC_VERSIONS.contains(&zksolc_version) { + return Err(PreparationError::VersionNotSupported( + zksolc_version.to_string(), + )); + } + + let item = db_client.get_item(request.id.to_string().as_str()).await?; + let item: Item = match item { + Some(item) => item, + None => { + error!("No item id: {}", request.id); + return Err(PreparationError::NoDBItemError(request.id.to_string())); + } + }; + + match item.status { + Status::Pending => {} + status => { + warn!("Item already processing: {}", status); + return Err(PreparationError::UnexpectedStatusError(status.to_string())); + } + } + + let dir = format!("{}/", request.id); + let files = s3_client.extract_files(&dir).await?; + + // Update status to Compiling + try_set_compiling_status(db_client, request.id).await?; + + Ok(CompilationInput { + workspace_path: Path::new(SOL_ROOT).join(request.id.to_string().as_str()), + config: request.config.clone(), + contracts: files, + }) +} +pub async fn on_compilation_success( + id: Uuid, + db_client: &DynamoDBClient, + s3_client: &S3Client, + compilation_artifacts: Vec, +) -> Result { + const DOWNLOAD_URL_EXPIRATION: Duration = Duration::from_secs(5 * 60 * 60); + + let mut presigned_urls = Vec::with_capacity(compilation_artifacts.len()); + for el in compilation_artifacts { + let file_key = format!("{}/{}/{}", ARTIFACTS_FOLDER, id, el.file_name); + s3_client.put_object(&file_key, el.file_content).await?; + + let expires_in = PresigningConfig::expires_in(DOWNLOAD_URL_EXPIRATION).unwrap(); + let presigned_request = s3_client + .get_object_presigned(&file_key, expires_in) + .await?; + + presigned_urls.push(presigned_request.uri().to_string()); + } + + if presigned_urls.is_empty() { + // TODO: AttributeValue::Ss doesn't allow empty arrays. Decide what to do. for now + presigned_urls.push("".to_string()); + } + + db_client + .client + .update_item() + .table_name(db_client.table_name.clone()) + .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) + .update_expression("SET #status = :newStatus, #data = :data") + .expression_attribute_names("#status", Status::attribute_name()) + .expression_attribute_names("#data", Item::data_attribute_name()) + .expression_attribute_values( + ":newStatus", + AttributeValue::N(2.to_string()), // Ready + ) + .expression_attribute_values(":data", AttributeValue::Ss(presigned_urls.clone())) + .send() + .await + .map_err(DBError::from)?; + + Ok(TaskResult::Success { presigned_urls }) +} + +pub async fn on_compilation_failed( + id: Uuid, + db_client: &DynamoDBClient, + message: String, +) -> Result { + db_client + .client + .update_item() + .table_name(db_client.table_name.clone()) + .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) + .update_expression("SET #status = :newStatus, #data = :data") + .expression_attribute_names("#status", Status::attribute_name()) + .expression_attribute_names("#data", Item::data_attribute_name()) + .expression_attribute_values( + ":newStatus", + AttributeValue::N(3.to_string()), // Failed + ) + .expression_attribute_values(":data", AttributeValue::S(message.clone())) + .send() + .await?; + + Ok(TaskResult::Failure(message)) +} diff --git a/crates/worker/src/dynamodb_client.rs b/crates/worker/src/dynamodb_client.rs index 17c0065..64a2468 100644 --- a/crates/worker/src/dynamodb_client.rs +++ b/crates/worker/src/dynamodb_client.rs @@ -28,12 +28,12 @@ impl DynamoDBClient { Ok(()) } - pub async fn get_item(&self, id: &str) -> Result, DBError> { + pub async fn get_item(&self, key: &str) -> Result, DBError> { let result = self .client .get_item() .table_name(self.table_name.clone()) - .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) + .key(Item::primary_key_name(), AttributeValue::S(key.to_string())) .send() .await?; diff --git a/crates/worker/src/errors.rs b/crates/worker/src/errors.rs index e750e5f..d5b96ae 100644 --- a/crates/worker/src/errors.rs +++ b/crates/worker/src/errors.rs @@ -66,37 +66,6 @@ pub enum S3Error { ByteStreamError(#[from] aws_smithy_types::byte_stream::error::Error), } -#[derive(thiserror::Error, Debug)] -pub enum CompilationError { - #[error("DBError: {0}")] - DBError(#[from] DBError), - #[error("S3Error: {0}")] - S3Error(#[from] S3Error), - #[error("Item isn't id DB: {0}")] - NoDBItemError(String), - #[error("Unexpected status: {0}")] - UnexpectedStatusError(String), - #[error("Unsupported version: {0}")] - VersionNotSupported(String), - #[error("IoError: {0}")] - IoError(#[from] std::io::Error), - #[error("Failed to compile: {0}")] - CompilationFailureError(String), -} - -impl CompilationError { - pub fn recoverable(&self) -> bool { - match self { - CompilationError::DBError(_) | CompilationError::S3Error(_) => true, - CompilationError::NoDBItemError(_) - | CompilationError::UnexpectedStatusError(_) - | CompilationError::IoError(_) - | CompilationError::VersionNotSupported(_) - | CompilationError::CompilationFailureError(_) => false, - } - } -} - #[derive(thiserror::Error, Debug)] pub enum Error { #[error(transparent)] diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index b440093..e0343ce 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -6,8 +6,8 @@ use std::time::Duration; use tokio::time::sleep; use tokio::{sync::Mutex, task::JoinHandle}; use tracing::warn; -use types::item::Status; -use types::{SqsMessage, ARTIFACTS_FOLDER}; +use types::item::{Status, TaskResult}; +use types::ARTIFACTS_FOLDER; use uuid::Uuid; use crate::dynamodb_client::DynamoDBClient; @@ -40,25 +40,39 @@ impl Purgatory { self.inner.lock().await.purge().await; } - pub async fn add_task(&mut self, _: &SqsMessage) { - todo!() + pub async fn add_task(&mut self, id: Uuid) { + self.inner.lock().await.add_task(id); } - // TODO: args: status, id - pub async fn update_task(&mut self) {} + pub async fn update_task(&mut self, id: Uuid, result: TaskResult) { + self.inner.lock().await.update_task(id, result); + } - async fn deamon(self) { + async fn deamon(mut self) { const PURGE_INTERVAL: Duration = Duration::from_secs(60); loop { - let mut inner = self.inner.lock().await; - inner.purge().await; - + self.purge().await; sleep(PURGE_INTERVAL).await; } } } +pub struct State { + expiration_timestamps: Vec<(Uuid, Timestamp)>, + task_status: HashMap, +} + +impl State { + pub async fn load() -> State { + // TODO: load state here from DB + Self { + expiration_timestamps: vec![], + task_status: HashMap::new(), + } + } +} + struct Inner { state: State, s3_client: S3Client, @@ -95,6 +109,14 @@ impl Inner { } } + fn add_task(&mut self, id: Uuid) { + self.state.task_status.insert(id, Status::InProgress); + } + + fn update_task(&mut self, id: Uuid, result: TaskResult) { + self.state.task_status.insert(id, Status::Done(result)); + } + pub async fn purge(&mut self) { let now = timestamp(); for (id, timestamp) in self.state.expiration_timestamps.iter() { @@ -110,9 +132,8 @@ impl Inner { }; match status { - Status::Pending => warn!("Item pending for too long!"), Status::InProgress => warn!("Item compiling for too long!"), - Status::Ready { .. } => { + Status::Done(_) | Status::Pending => { let dir = format!("{}/{}/", ARTIFACTS_FOLDER, id); self.s3_client.delete_dir(&dir).await.unwrap(); // TODO: fix self.db_client @@ -120,14 +141,6 @@ impl Inner { .await .unwrap(); } - Status::Failed { .. } => { - let dir = format!("{}/{}/", ARTIFACTS_FOLDER, id); - self.s3_client.delete_dir(&dir).await; // TODO: fix - self.db_client - .delete_item(id.to_string().as_str()) - .await - .unwrap(); - } } } @@ -135,50 +148,4 @@ impl Inner { .expiration_timestamps .retain(|(_, timestamp)| *timestamp > now); } - - // TODO: replace with Self::purge - // pub async fn supervisor( - // db_client: DynamoDBClient, - // expiration_timestamps: Arc>>, - // ) { - // loop { - // let now = timestamp(); - // - // let to_delete = { - // let mut to_delete = vec![]; - // let mut expiration_timestamps = expiration_timestamps.lock().await; - // expiration_timestamps.retain(|&(uuid, expiration)| { - // if expiration < now { - // to_delete.push(uuid); - // false - // } else { - // true - // } - // }); - // - // to_delete - // }; - // - // for uuid in to_delete { - // db_client.delete_item(uuid.to_string()).await; - // } - // - // sleep(Duration::from_millis(2000)).await; - // } - // } -} - -pub struct State { - expiration_timestamps: Vec<(Uuid, Timestamp)>, - task_status: HashMap, -} - -impl State { - pub async fn load() -> State { - // TODO: load state here from DB - Self { - expiration_timestamps: vec![], - task_status: HashMap::new(), - } - } } diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index c51b316..2523172 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -1,10 +1,12 @@ use std::num::NonZeroUsize; use std::time::Duration; use tokio::task::JoinHandle; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use types::SqsMessage; -use crate::commands::compile::compile; +use crate::commands::compile::{ do_compile}; +use crate::commands::errors::PreparationError; +use crate::commands::utils::{on_compilation_failed, on_compilation_success, prepare_compile_input}; use crate::dynamodb_client::DynamoDBClient; use crate::purgatory::{Purgatory, State}; use crate::s3_client::S3Client; @@ -124,23 +126,63 @@ impl RunningEngine { } }; - purgatory.add_task(&sqs_message).await; + let id = sqs_message.id(); + purgatory.add_task(id).await; + match sqs_message { SqsMessage::Compile { request } => { - let result = compile(request, &db_client, &s3_client).await; - match result { - Ok(()) => purgatory.update_task().await, - Err(err) => { - if err.recoverable() { - warn!("recoverable error after compilation: {}", err); - continue; - } else { - // delete from SQS - warn!("unrecoverable error after compilation: {}", err); - purgatory.update_task().await; + let compilation_input = match prepare_compile_input(&request, &db_client, &s3_client).await { + Ok(value) => value, + Err(PreparationError::NoDBItemError(err)) => { + // Delete the message in this case. something weird. + // No need to cleanup s3 + error!("{}", PreparationError::NoDBItemError(err)); + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { + warn!("{}", err); } + continue; } - } + Err(PreparationError::UnexpectedStatusError(err)) => { + // Probably some other instance executing this at the same time. + // For sake of safety still try delete it. Doesn't matter if succeed + // No need to cleanup s3 + info!("{}", PreparationError::UnexpectedStatusError(err)); + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { + warn!("{}", err); + } + + continue; + }, + Err(PreparationError::VersionNotSupported(err)) => { + let dir = format!("{}/", id); + s3_client.delete_dir(&dir).await.unwrap(); // TODO: do those retriable + + let task_result = on_compilation_failed(id, &db_client, PreparationError::VersionNotSupported(err).to_string()).await.unwrap(); + purgatory.update_task(id, task_result).await; // TODO: actually don't need anything + + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { + warn!("{}", err); + } + continue; + } + Err(PreparationError::S3Error(err) ) => { + warn!("S3Error during preparation - ignoring. {}", err); + continue + }, + Err(PreparationError::DBError(err)) => { + warn!("DBError during preparation - ignoring. {}", err); + continue; + } + }; + + let task_result = match do_compile(compilation_input).await { + Ok(value) => on_compilation_success(id, &db_client, &s3_client, value).await.unwrap(), // TODO: unwraps + Err(err) => on_compilation_failed(id, &db_client, err.to_string()).await.unwrap(), + }; + purgatory.update_task(id, task_result).await; + + let dir = format!("{}/", id); + s3_client.delete_dir(&dir).await.unwrap(); } SqsMessage::Verify { request } => {} // TODO; } From 37a6ac3b68bad6ebd53d6ab1e1ad65313ff13572 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Tue, 10 Sep 2024 17:15:21 +0900 Subject: [PATCH 4/9] refactor: move clients into clients folder --- crates/worker/src/clients.rs | 4 + .../src/{ => clients}/dynamodb_client.rs | 3 +- crates/worker/src/clients/errors.rs | 67 +++++++++++ crates/worker/src/{ => clients}/s3_client.rs | 5 +- .../src/{ => clients}/sqs_clients/client.rs | 3 +- .../src/{ => clients}/sqs_clients/mod.rs | 0 .../src/{ => clients}/sqs_clients/wrapper.rs | 5 +- crates/worker/src/commands/compile.rs | 6 +- crates/worker/src/commands/errors.rs | 2 +- crates/worker/src/commands/utils.rs | 16 ++- crates/worker/src/errors.rs | 78 +------------ crates/worker/src/main.rs | 10 +- crates/worker/src/purgatory.rs | 13 ++- crates/worker/src/sqs_listener.rs | 4 +- crates/worker/src/worker.rs | 110 ++++++++++-------- 15 files changed, 172 insertions(+), 154 deletions(-) create mode 100644 crates/worker/src/clients.rs rename crates/worker/src/{ => clients}/dynamodb_client.rs (96%) create mode 100644 crates/worker/src/clients/errors.rs rename crates/worker/src/{ => clients}/s3_client.rs (99%) rename crates/worker/src/{ => clients}/sqs_clients/client.rs (97%) rename crates/worker/src/{ => clients}/sqs_clients/mod.rs (100%) rename crates/worker/src/{ => clients}/sqs_clients/wrapper.rs (98%) diff --git a/crates/worker/src/clients.rs b/crates/worker/src/clients.rs new file mode 100644 index 0000000..e642bf5 --- /dev/null +++ b/crates/worker/src/clients.rs @@ -0,0 +1,4 @@ +pub mod dynamodb_client; +pub mod errors; +pub mod s3_client; +pub mod sqs_clients; diff --git a/crates/worker/src/dynamodb_client.rs b/crates/worker/src/clients/dynamodb_client.rs similarity index 96% rename from crates/worker/src/dynamodb_client.rs rename to crates/worker/src/clients/dynamodb_client.rs index 64a2468..434aa8f 100644 --- a/crates/worker/src/dynamodb_client.rs +++ b/crates/worker/src/clients/dynamodb_client.rs @@ -1,8 +1,9 @@ -use crate::errors::DBError; use aws_sdk_dynamodb::types::AttributeValue; use aws_sdk_dynamodb::Client; use types::item::Item; +use crate::clients::errors::DBError; + #[derive(Clone)] pub struct DynamoDBClient { pub client: Client, diff --git a/crates/worker/src/clients/errors.rs b/crates/worker/src/clients/errors.rs new file mode 100644 index 0000000..e7ff8e3 --- /dev/null +++ b/crates/worker/src/clients/errors.rs @@ -0,0 +1,67 @@ +use aws_sdk_dynamodb::config::http::HttpResponse; +use aws_sdk_dynamodb::operation::delete_item::DeleteItemError; +use aws_sdk_dynamodb::operation::get_item::GetItemError; +use aws_sdk_dynamodb::operation::update_item::UpdateItemError; +use aws_sdk_s3::operation::delete_object::DeleteObjectError; +use aws_sdk_s3::operation::get_object::GetObjectError; +use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error; +use aws_sdk_s3::operation::put_object::PutObjectError; +use aws_sdk_sqs::error::SdkError; +use aws_sdk_sqs::operation::delete_message::DeleteMessageError; +use aws_sdk_sqs::operation::receive_message::ReceiveMessageError; +use tracing::error; +use types::item::ItemError; + +// SQS related errors +pub(crate) type SqsReceiveError = SdkError; +pub(crate) type SqsDeleteError = SdkError; + +// DynamoDB related errors +pub(crate) type DBDeleteError = SdkError; +pub(crate) type DBGetError = SdkError; + +pub(crate) type DBUpdateError = SdkError; + +// S3 related errors +pub(crate) type S3ListObjectsError = SdkError; +pub(crate) type S3GetObjectError = SdkError; +pub(crate) type S3PutObjectError = SdkError; +pub(crate) type S3DeleteObjectError = SdkError; + +#[derive(thiserror::Error, Debug)] +pub enum SqsError { + #[error("SqsReceiveError: {0}")] + ReceiveError(#[from] SqsReceiveError), + #[error("SqsDeleteError: {0}")] + DeleteError(#[from] SqsDeleteError), +} + +#[derive(thiserror::Error, Debug)] +pub enum DBError { + #[error(transparent)] + DeleteItemError(#[from] DBDeleteError), + #[error(transparent)] + GetItemError(#[from] DBGetError), + #[error(transparent)] + ItemFormatError(#[from] ItemError), + #[error(transparent)] + UpdateItemError(#[from] DBUpdateError), +} + +#[derive(thiserror::Error, Debug)] +pub enum S3Error { + #[error("Invalid object")] + InvalidObjectError, + #[error(transparent)] + GetObjectError(#[from] S3GetObjectError), + #[error(transparent)] + ListObjectsError(#[from] S3ListObjectsError), + #[error(transparent)] + PutObjectError(#[from] S3PutObjectError), + #[error(transparent)] + DeleteObjectError(#[from] S3DeleteObjectError), + #[error(transparent)] + IoError(#[from] std::io::Error), + #[error(transparent)] + ByteStreamError(#[from] aws_smithy_types::byte_stream::error::Error), +} diff --git a/crates/worker/src/s3_client.rs b/crates/worker/src/clients/s3_client.rs similarity index 99% rename from crates/worker/src/s3_client.rs rename to crates/worker/src/clients/s3_client.rs index 798a8eb..1ccaa07 100644 --- a/crates/worker/src/s3_client.rs +++ b/crates/worker/src/clients/s3_client.rs @@ -1,5 +1,3 @@ -use crate::commands::compile::CompilationFile; -use crate::errors::S3Error; use aws_sdk_s3::presigning::{PresignedRequest, PresigningConfig}; use aws_sdk_s3::types::Object; use aws_sdk_s3::Client; @@ -8,6 +6,9 @@ use std::io::Write; use std::path::Path; use tracing::{error, warn}; +use crate::clients::errors::S3Error; +use crate::commands::compile::CompilationFile; + #[derive(Clone)] pub struct S3Client { pub client: Client, diff --git a/crates/worker/src/sqs_clients/client.rs b/crates/worker/src/clients/sqs_clients/client.rs similarity index 97% rename from crates/worker/src/sqs_clients/client.rs rename to crates/worker/src/clients/sqs_clients/client.rs index 198080d..5b9b08f 100644 --- a/crates/worker/src/sqs_clients/client.rs +++ b/crates/worker/src/clients/sqs_clients/client.rs @@ -1,9 +1,8 @@ +use crate::clients::errors::{SqsDeleteError, SqsReceiveError}; use aws_sdk_sqs::operation::delete_message::DeleteMessageOutput; use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput; use aws_sdk_sqs::Client; -use crate::errors::{SqsDeleteError, SqsReceiveError}; - macro_rules! match_result { ($err_type:ident, $result:expr) => { match $result { diff --git a/crates/worker/src/sqs_clients/mod.rs b/crates/worker/src/clients/sqs_clients/mod.rs similarity index 100% rename from crates/worker/src/sqs_clients/mod.rs rename to crates/worker/src/clients/sqs_clients/mod.rs diff --git a/crates/worker/src/sqs_clients/wrapper.rs b/crates/worker/src/clients/sqs_clients/wrapper.rs similarity index 98% rename from crates/worker/src/sqs_clients/wrapper.rs rename to crates/worker/src/clients/sqs_clients/wrapper.rs index ec05471..76b6b6f 100644 --- a/crates/worker/src/sqs_clients/wrapper.rs +++ b/crates/worker/src/clients/sqs_clients/wrapper.rs @@ -1,5 +1,3 @@ -use crate::errors::{SqsDeleteError, SqsReceiveError}; -use crate::sqs_clients::client::SqsClient; use aws_sdk_sqs::operation::delete_message::DeleteMessageOutput; use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput; use aws_sdk_sqs::Client; @@ -10,6 +8,9 @@ use tokio::select; use tokio::sync::{mpsc, oneshot}; use tokio::time::{sleep, Instant}; +use crate::clients::errors::{SqsDeleteError, SqsReceiveError}; +use crate::clients::sqs_clients::client::SqsClient; + #[derive(Default)] pub enum Action { #[default] diff --git a/crates/worker/src/commands/compile.rs b/crates/worker/src/commands/compile.rs index c1a3259..6fe84e3 100644 --- a/crates/worker/src/commands/compile.rs +++ b/crates/worker/src/commands/compile.rs @@ -3,15 +3,13 @@ use std::path::{Path, PathBuf}; use std::process::Stdio; use tracing::{error, info}; -use types::{CompilationConfig}; +use types::CompilationConfig; use crate::commands::errors::CompilationError; use crate::commands::SPAWN_SEMAPHORE; use crate::utils::cleaner::AutoCleanUp; use crate::utils::hardhat_config::HardhatConfigBuilder; -use crate::utils::lib::{ - initialize_files, list_files_in_directory, DEFAULT_SOLIDITY_VERSION, -}; +use crate::utils::lib::{initialize_files, list_files_in_directory, DEFAULT_SOLIDITY_VERSION}; pub struct CompilationFile { pub file_path: String, diff --git a/crates/worker/src/commands/errors.rs b/crates/worker/src/commands/errors.rs index 5eea89d..a3dc954 100644 --- a/crates/worker/src/commands/errors.rs +++ b/crates/worker/src/commands/errors.rs @@ -1,4 +1,4 @@ -use crate::errors::{DBError, S3Error}; +use crate::clients::errors::{DBError, S3Error}; #[derive(thiserror::Error, Debug)] pub enum CompilationError { diff --git a/crates/worker/src/commands/utils.rs b/crates/worker/src/commands/utils.rs index 3a4e6c4..92d0832 100644 --- a/crates/worker/src/commands/utils.rs +++ b/crates/worker/src/commands/utils.rs @@ -3,21 +3,19 @@ use aws_sdk_dynamodb::error::SdkError; use aws_sdk_dynamodb::operation::update_item::UpdateItemError; use aws_sdk_dynamodb::types::AttributeValue; +use aws_sdk_s3::presigning::PresigningConfig; use std::path::Path; use std::time::Duration; -use aws_sdk_s3::presigning::PresigningConfig; use tracing::{error, warn}; use types::item::{Item, Status, TaskResult}; -use types::{ARTIFACTS_FOLDER, CompilationRequest}; +use types::{CompilationRequest, ARTIFACTS_FOLDER}; use uuid::Uuid; -use crate::commands::compile::{ - CompilationArtifact, CompilationInput, -}; -use crate::commands::errors::{PreparationError}; -use crate::dynamodb_client::DynamoDBClient; -use crate::errors::DBError; -use crate::s3_client::S3Client; +use crate::clients::dynamodb_client::DynamoDBClient; +use crate::clients::errors::DBError; +use crate::clients::s3_client::S3Client; +use crate::commands::compile::{CompilationArtifact, CompilationInput}; +use crate::commands::errors::PreparationError; use crate::utils::lib::{SOL_ROOT, ZKSOLC_VERSIONS}; async fn try_set_compiling_status( diff --git a/crates/worker/src/errors.rs b/crates/worker/src/errors.rs index d5b96ae..240e851 100644 --- a/crates/worker/src/errors.rs +++ b/crates/worker/src/errors.rs @@ -1,73 +1,5 @@ -use aws_sdk_dynamodb::config::http::HttpResponse; -use aws_sdk_dynamodb::operation::delete_item::DeleteItemError; -use aws_sdk_dynamodb::operation::get_item::GetItemError; -use aws_sdk_dynamodb::operation::update_item::UpdateItemError; -use aws_sdk_s3::operation::delete_object::DeleteObjectError; -use aws_sdk_s3::operation::get_object::GetObjectError; -use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error; -use aws_sdk_s3::operation::put_object::PutObjectError; -use aws_sdk_sqs::error::SdkError; -use aws_sdk_sqs::operation::delete_message::DeleteMessageError; -use aws_sdk_sqs::operation::receive_message::ReceiveMessageError; -use tracing::error; -use types::item::ItemError; - -// SQS related errors -pub(crate) type SqsReceiveError = SdkError; -pub(crate) type SqsDeleteError = SdkError; - -// DynamoDB related errors -pub(crate) type DBDeleteError = SdkError; -pub(crate) type DBGetError = SdkError; - -pub(crate) type DBUpdateError = SdkError; - -// S3 related errors -pub(crate) type S3ListObjectsError = SdkError; -pub(crate) type S3GetObjectError = SdkError; -pub(crate) type S3PutObjectError = SdkError; -pub(crate) type S3DeleteObjectError = SdkError; - -#[derive(thiserror::Error, Debug)] -pub enum SqsError { - #[error("SqsReceiveError: {0}")] - ReceiveError(#[from] SqsReceiveError), - #[error("SqsDeleteError: {0}")] - DeleteError(#[from] SqsDeleteError), -} - -#[derive(thiserror::Error, Debug)] -pub enum DBError { - #[error(transparent)] - DeleteItemError(#[from] DBDeleteError), - #[error(transparent)] - GetItemError(#[from] DBGetError), - #[error(transparent)] - ItemFormatError(#[from] ItemError), - #[error(transparent)] - UpdateItemError(#[from] DBUpdateError), -} - -#[derive(thiserror::Error, Debug)] -pub enum S3Error { - #[error("Invalid object")] - InvalidObjectError, - #[error(transparent)] - GetObjectError(#[from] S3GetObjectError), - #[error(transparent)] - ListObjectsError(#[from] S3ListObjectsError), - #[error(transparent)] - PutObjectError(#[from] S3PutObjectError), - #[error(transparent)] - DeleteObjectError(#[from] S3DeleteObjectError), - #[error(transparent)] - IoError(#[from] std::io::Error), - #[error(transparent)] - ByteStreamError(#[from] aws_smithy_types::byte_stream::error::Error), -} - -#[derive(thiserror::Error, Debug)] -pub enum Error { - #[error(transparent)] - DBError(#[from] DBError), -} +// #[derive(thiserror::Error, Debug)] +// pub enum Error { +// #[error(transparent)] +// DBError(#[from] DBError), +// } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index aea5159..f570e63 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,9 +1,7 @@ +mod clients; mod commands; -mod dynamodb_client; mod errors; mod purgatory; -mod s3_client; -mod sqs_clients; mod sqs_listener; mod utils; mod worker; @@ -12,10 +10,10 @@ use aws_config::BehaviorVersion; use aws_runtime::env_config::file::{EnvConfigFileKind, EnvConfigFiles}; use std::num::NonZeroUsize; -use crate::dynamodb_client::DynamoDBClient; +use crate::clients::dynamodb_client::DynamoDBClient; +use crate::clients::s3_client::S3Client; +use crate::clients::sqs_clients::wrapper::SqsClientWrapper; use crate::purgatory::State; -use crate::s3_client::S3Client; -use crate::sqs_clients::wrapper::SqsClientWrapper; use crate::worker::EngineBuilder; const AWS_PROFILE_DEFAULT: &str = "dev"; diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index e0343ce..d53123f 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -10,8 +10,8 @@ use types::item::{Status, TaskResult}; use types::ARTIFACTS_FOLDER; use uuid::Uuid; -use crate::dynamodb_client::DynamoDBClient; -use crate::s3_client::S3Client; +use crate::clients::dynamodb_client::DynamoDBClient; +use crate::clients::s3_client::S3Client; use crate::utils::lib::timestamp; pub type Timestamp = u64; @@ -28,9 +28,12 @@ impl Purgatory { inner: Arc::new(Mutex::new(Inner::new(handle, state, db_client, s3_client))), }; - let initialized_handle = tokio::spawn(this.clone().deamon()); - unsafe { - *handle.as_mut() = initialized_handle; + { + let mut inner = this.inner.try_lock().unwrap(); + let mut initialized_handle = tokio::spawn(this.clone().deamon()); + inner.handle = unsafe { + NonNull::new_unchecked(&mut initialized_handle as *mut _) + }; } this diff --git a/crates/worker/src/sqs_listener.rs b/crates/worker/src/sqs_listener.rs index c8f5b15..d115e7b 100644 --- a/crates/worker/src/sqs_listener.rs +++ b/crates/worker/src/sqs_listener.rs @@ -1,11 +1,11 @@ -use crate::errors::{SqsDeleteError, SqsReceiveError}; use async_channel::{Receiver, Recv, Sender, TrySendError}; use aws_sdk_sqs::types::Message; use std::time::Duration; use tokio::task::JoinHandle; use tokio::time::sleep; -use crate::sqs_clients::wrapper::SqsClientWrapper; +use crate::clients::errors::{SqsDeleteError, SqsReceiveError}; +use crate::clients::sqs_clients::wrapper::SqsClientWrapper; pub struct SqsListener { handle: JoinHandle>, diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index 2523172..589bc8a 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -4,13 +4,15 @@ use tokio::task::JoinHandle; use tracing::{error, info, warn}; use types::SqsMessage; -use crate::commands::compile::{ do_compile}; +use crate::clients::dynamodb_client::DynamoDBClient; +use crate::clients::s3_client::S3Client; +use crate::clients::sqs_clients::wrapper::SqsClientWrapper; +use crate::commands::compile::do_compile; use crate::commands::errors::PreparationError; -use crate::commands::utils::{on_compilation_failed, on_compilation_success, prepare_compile_input}; -use crate::dynamodb_client::DynamoDBClient; +use crate::commands::utils::{ + on_compilation_failed, on_compilation_success, prepare_compile_input, +}; use crate::purgatory::{Purgatory, State}; -use crate::s3_client::S3Client; -use crate::sqs_clients::wrapper::SqsClientWrapper; use crate::sqs_listener::{SqsListener, SqsReceiver}; pub struct EngineBuilder { @@ -131,53 +133,67 @@ impl RunningEngine { match sqs_message { SqsMessage::Compile { request } => { - let compilation_input = match prepare_compile_input(&request, &db_client, &s3_client).await { - Ok(value) => value, - Err(PreparationError::NoDBItemError(err)) => { - // Delete the message in this case. something weird. - // No need to cleanup s3 - error!("{}", PreparationError::NoDBItemError(err)); - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); + let compilation_input = + match prepare_compile_input(&request, &db_client, &s3_client).await { + Ok(value) => value, + Err(PreparationError::NoDBItemError(err)) => { + // Delete the message in this case. something weird. + // No need to cleanup s3 + error!("{}", PreparationError::NoDBItemError(err)); + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await + { + warn!("{}", err); + } + continue; } - continue; - } - Err(PreparationError::UnexpectedStatusError(err)) => { - // Probably some other instance executing this at the same time. - // For sake of safety still try delete it. Doesn't matter if succeed - // No need to cleanup s3 - info!("{}", PreparationError::UnexpectedStatusError(err)); - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); + Err(PreparationError::UnexpectedStatusError(err)) => { + // Probably some other instance executing this at the same time. + // For sake of safety still try delete it. Doesn't matter if succeed + // No need to cleanup s3 + info!("{}", PreparationError::UnexpectedStatusError(err)); + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await + { + warn!("{}", err); + } + + continue; } - - continue; - }, - Err(PreparationError::VersionNotSupported(err)) => { - let dir = format!("{}/", id); - s3_client.delete_dir(&dir).await.unwrap(); // TODO: do those retriable - - let task_result = on_compilation_failed(id, &db_client, PreparationError::VersionNotSupported(err).to_string()).await.unwrap(); - purgatory.update_task(id, task_result).await; // TODO: actually don't need anything - - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); + Err(PreparationError::VersionNotSupported(err)) => { + let dir = format!("{}/", id); + s3_client.delete_dir(&dir).await.unwrap(); // TODO: do those retriable + + let task_result = on_compilation_failed( + id, + &db_client, + PreparationError::VersionNotSupported(err).to_string(), + ) + .await + .unwrap(); + purgatory.update_task(id, task_result).await; // TODO: actually don't need anything + + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await + { + warn!("{}", err); + } + continue; } - continue; - } - Err(PreparationError::S3Error(err) ) => { - warn!("S3Error during preparation - ignoring. {}", err); - continue - }, - Err(PreparationError::DBError(err)) => { - warn!("DBError during preparation - ignoring. {}", err); - continue; - } - }; + Err(PreparationError::S3Error(err)) => { + warn!("S3Error during preparation - ignoring. {}", err); + continue; + } + Err(PreparationError::DBError(err)) => { + warn!("DBError during preparation - ignoring. {}", err); + continue; + } + }; let task_result = match do_compile(compilation_input).await { - Ok(value) => on_compilation_success(id, &db_client, &s3_client, value).await.unwrap(), // TODO: unwraps - Err(err) => on_compilation_failed(id, &db_client, err.to_string()).await.unwrap(), + Ok(value) => on_compilation_success(id, &db_client, &s3_client, value) + .await + .unwrap(), // TODO: unwraps + Err(err) => on_compilation_failed(id, &db_client, err.to_string()) + .await + .unwrap(), }; purgatory.update_task(id, task_result).await; From 2632fb648caab3e4379c385ab0253b36886185bb Mon Sep 17 00:00:00 2001 From: taco-paco Date: Tue, 10 Sep 2024 17:18:09 +0900 Subject: [PATCH 5/9] fix: lambda compilation --- crates/lambdas/src/compile.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/lambdas/src/compile.rs b/crates/lambdas/src/compile.rs index 5fefc80..a3f8daa 100644 --- a/crates/lambdas/src/compile.rs +++ b/crates/lambdas/src/compile.rs @@ -36,7 +36,7 @@ async fn compile( queue_url: &str, ) -> Result<(), Error> { let item = Item { - id: request.id.clone(), + id: request.id.to_string(), status: Status::Pending, }; @@ -112,7 +112,7 @@ async fn process_request( let objects = s3_client .list_objects_v2() .delimiter('/') - .prefix(request.id.clone().add("/")) + .prefix(request.id.to_string().add("/")) .bucket(bucket_name) .send() .await From afaae88150ec1587cc74589cad4e8d04d4db34aa Mon Sep 17 00:00:00 2001 From: taco-paco Date: Wed, 11 Sep 2024 12:20:54 +0900 Subject: [PATCH 6/9] feat: move message process to seaprate functions --- crates/lambdas/src/compile.rs | 2 +- crates/worker/src/purgatory.rs | 26 ++--- crates/worker/src/utils/lib.rs | 9 ++ crates/worker/src/worker.rs | 179 +++++++++++++++++++-------------- 4 files changed, 126 insertions(+), 90 deletions(-) diff --git a/crates/lambdas/src/compile.rs b/crates/lambdas/src/compile.rs index a3f8daa..6f78956 100644 --- a/crates/lambdas/src/compile.rs +++ b/crates/lambdas/src/compile.rs @@ -178,4 +178,4 @@ async fn main() -> Result<(), LambdaError> { } })) .await -} +} \ No newline at end of file diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index d53123f..459039d 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -12,7 +12,7 @@ use uuid::Uuid; use crate::clients::dynamodb_client::DynamoDBClient; use crate::clients::s3_client::S3Client; -use crate::utils::lib::timestamp; +use crate::utils::lib::{s3_artifacts_dir, timestamp}; pub type Timestamp = u64; @@ -31,9 +31,7 @@ impl Purgatory { { let mut inner = this.inner.try_lock().unwrap(); let mut initialized_handle = tokio::spawn(this.clone().deamon()); - inner.handle = unsafe { - NonNull::new_unchecked(&mut initialized_handle as *mut _) - }; + inner.handle = unsafe { NonNull::new_unchecked(&mut initialized_handle as *mut _) }; } this @@ -43,12 +41,8 @@ impl Purgatory { self.inner.lock().await.purge().await; } - pub async fn add_task(&mut self, id: Uuid) { - self.inner.lock().await.add_task(id); - } - - pub async fn update_task(&mut self, id: Uuid, result: TaskResult) { - self.inner.lock().await.update_task(id, result); + pub async fn add_record(&mut self, id: Uuid, result: TaskResult) { + self.inner.lock().await.add_record(id, result); } async fn deamon(mut self) { @@ -112,11 +106,7 @@ impl Inner { } } - fn add_task(&mut self, id: Uuid) { - self.state.task_status.insert(id, Status::InProgress); - } - - fn update_task(&mut self, id: Uuid, result: TaskResult) { + fn add_record(&mut self, id: Uuid, result: TaskResult) { self.state.task_status.insert(id, Status::Done(result)); } @@ -134,9 +124,13 @@ impl Inner { continue; }; + let artifacts_dir = s3_artifacts_dir(&id); match status { Status::InProgress => warn!("Item compiling for too long!"), - Status::Done(_) | Status::Pending => { + Status::Pending => { + warn!("Item pending for too long"); + } + Status::Done(_) => { let dir = format!("{}/{}/", ARTIFACTS_FOLDER, id); self.s3_client.delete_dir(&dir).await.unwrap(); // TODO: fix self.db_client diff --git a/crates/worker/src/utils/lib.rs b/crates/worker/src/utils/lib.rs index ff03aaa..27eeb61 100644 --- a/crates/worker/src/utils/lib.rs +++ b/crates/worker/src/utils/lib.rs @@ -1,6 +1,7 @@ use crate::commands::compile::CompilationFile; use std::path::{Path, PathBuf}; use tracing::debug; +use types::ARTIFACTS_FOLDER; use uuid::Uuid; use walkdir::WalkDir; @@ -167,6 +168,14 @@ pub fn list_files_in_directory>(path: P) -> Result, w // } // } +pub fn s3_artifacts_dir(id: &str) -> String { + format!("{}/{}/", ARTIFACTS_FOLDER, id) +} + +pub fn s3_compilation_files_dir(id: &str) -> String { + format!("{}/", id) +} + pub fn generate_mock_solidity_file_content() -> String { r#" pragma solidity ^0.8.0; diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index 589bc8a..9d8595f 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -2,7 +2,7 @@ use std::num::NonZeroUsize; use std::time::Duration; use tokio::task::JoinHandle; use tracing::{error, info, warn}; -use types::SqsMessage; +use types::{CompilationRequest, SqsMessage, VerificationRequest}; use crate::clients::dynamodb_client::DynamoDBClient; use crate::clients::s3_client::S3Client; @@ -14,6 +14,7 @@ use crate::commands::utils::{ }; use crate::purgatory::{Purgatory, State}; use crate::sqs_listener::{SqsListener, SqsReceiver}; +use crate::utils::lib::{s3_artifacts_dir, s3_compilation_files_dir}; pub struct EngineBuilder { sqs_client: SqsClientWrapper, @@ -128,84 +129,116 @@ impl RunningEngine { } }; - let id = sqs_message.id(); - purgatory.add_task(id).await; - + // TODO: add metrics for how long it takes - + // adjust "visibility timeout" or receiver chan capacity match sqs_message { SqsMessage::Compile { request } => { - let compilation_input = - match prepare_compile_input(&request, &db_client, &s3_client).await { - Ok(value) => value, - Err(PreparationError::NoDBItemError(err)) => { - // Delete the message in this case. something weird. - // No need to cleanup s3 - error!("{}", PreparationError::NoDBItemError(err)); - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await - { - warn!("{}", err); - } - continue; - } - Err(PreparationError::UnexpectedStatusError(err)) => { - // Probably some other instance executing this at the same time. - // For sake of safety still try delete it. Doesn't matter if succeed - // No need to cleanup s3 - info!("{}", PreparationError::UnexpectedStatusError(err)); - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await - { - warn!("{}", err); - } - - continue; - } - Err(PreparationError::VersionNotSupported(err)) => { - let dir = format!("{}/", id); - s3_client.delete_dir(&dir).await.unwrap(); // TODO: do those retriable - - let task_result = on_compilation_failed( - id, - &db_client, - PreparationError::VersionNotSupported(err).to_string(), - ) - .await - .unwrap(); - purgatory.update_task(id, task_result).await; // TODO: actually don't need anything - - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await - { - warn!("{}", err); - } - continue; - } - Err(PreparationError::S3Error(err)) => { - warn!("S3Error during preparation - ignoring. {}", err); - continue; - } - Err(PreparationError::DBError(err)) => { - warn!("DBError during preparation - ignoring. {}", err); - continue; - } - }; - - let task_result = match do_compile(compilation_input).await { - Ok(value) => on_compilation_success(id, &db_client, &s3_client, value) - .await - .unwrap(), // TODO: unwraps - Err(err) => on_compilation_failed(id, &db_client, err.to_string()) - .await - .unwrap(), - }; - purgatory.update_task(id, task_result).await; - - let dir = format!("{}/", id); - s3_client.delete_dir(&dir).await.unwrap(); + Self::process_compile_message( + request, + receipt_handle, + &sqs_receiver, + &db_client, + &s3_client, + &mut purgatory, + ) + .await + } + SqsMessage::Verify { request } => { + Self::process_verify_message(request, receipt_handle, &sqs_receiver).await } - SqsMessage::Verify { request } => {} // TODO; } + } + } + + // TODO(future me): could return bool. + async fn process_compile_message( + request: CompilationRequest, + receipt_handle: String, + sqs_receiver: &SqsReceiver, + db_client: &DynamoDBClient, + s3_client: &S3Client, + purgatory: &mut Purgatory, + ) { + let id = request.id; + let compilation_input = match prepare_compile_input(&request, db_client, s3_client).await { + Ok(value) => value, + Err(PreparationError::NoDBItemError(err)) => { + // Delete the message in this case. something weird. + // No need to clean up s3 + error!("{}", PreparationError::NoDBItemError(err)); + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { + warn!("{}", err); + } + return; + } + Err(PreparationError::UnexpectedStatusError(err)) => { + // Probably some other instance executing this at the same time. + // For sake of safety still try to delete it. Doesn't matter if succeeds. + // No need to clean up s3 + info!("{}", PreparationError::UnexpectedStatusError(err)); + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { + warn!("{}", err); + } - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); + return; } + Err(PreparationError::VersionNotSupported(err)) => { + // Clean everything since the request failed + let dir = s3_compilation_files_dir(&id); + s3_client.delete_dir(&dir).await.unwrap(); // TODO: do those retriable + + // This error doesn't create any artifacts + let _ = on_compilation_failed( + id, + &db_client, + PreparationError::VersionNotSupported(err).to_string(), + ) + .await + .unwrap(); + + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { + warn!("{}", err); + } + return; + } + Err(PreparationError::S3Error(err)) => { + warn!("S3Error during preparation - ignoring. {}", err); + return; + } + Err(PreparationError::DBError(err)) => { + warn!("DBError during preparation - ignoring. {}", err); + return; + } + }; + + let task_result = match do_compile(compilation_input).await { + Ok(value) => on_compilation_success(id, &db_client, &s3_client, value) + .await + .unwrap(), // TODO: unwraps + Err(err) => on_compilation_failed(id, &db_client, err.to_string()) + .await + .unwrap(), + }; + purgatory.add_record(id, task_result).await; + + // Clean compilation input files right away + let dir = s3_artifacts_dir(&id); + s3_client.delete_dir(&dir).await.unwrap(); + + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { + warn!("{}", err); + } + } + + async fn process_verify_message( + request: VerificationRequest, + receipt_handle: String, + sqs_receiver: &SqsReceiver, + ) { + // TODO: implement + + if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { + warn!("{}", err); } } From d070263e65e148d8f79ea9d01bfd838ef02a348b Mon Sep 17 00:00:00 2001 From: taco-paco Date: Wed, 11 Sep 2024 12:56:23 +0900 Subject: [PATCH 7/9] feat: finishing purgatory --- crates/worker/src/purgatory.rs | 37 ++++++++++++++++++++++++++++------ crates/worker/src/utils/lib.rs | 2 -- crates/worker/src/worker.rs | 4 ++-- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index 459039d..2100d01 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -12,7 +12,7 @@ use uuid::Uuid; use crate::clients::dynamodb_client::DynamoDBClient; use crate::clients::s3_client::S3Client; -use crate::utils::lib::{s3_artifacts_dir, timestamp}; +use crate::utils::lib::{s3_artifacts_dir, s3_compilation_files_dir, timestamp}; pub type Timestamp = u64; @@ -107,7 +107,13 @@ impl Inner { } fn add_record(&mut self, id: Uuid, result: TaskResult) { + pub const DURATION_TO_PURGE: u64 = 60 * 5; // 5 minutes + let to_purge_timestampe = timestamp() + DURATION_TO_PURGE; + self.state.task_status.insert(id, Status::Done(result)); + self.state + .expiration_timestamps + .push((id, to_purge_timestampe)); } pub async fn purge(&mut self) { @@ -124,14 +130,28 @@ impl Inner { continue; }; - let artifacts_dir = s3_artifacts_dir(&id); match status { Status::InProgress => warn!("Item compiling for too long!"), Status::Pending => { warn!("Item pending for too long"); + + // Remove compilation files + let files_dir = s3_compilation_files_dir(id.to_string().as_str()); + self.s3_client.delete_dir(&files_dir).await.unwrap(); + + // Remove artifacts + let artifacts_dir = s3_compilation_files_dir(id.to_string().as_str()); + self.s3_client.delete_dir(&artifacts_dir).await.unwrap(); // TODO: fix + + // TODO: design choice. Delete or update status to purged? + // Second would give neater replies + self.db_client + .delete_item(id.to_string().as_str()) + .await + .unwrap(); } Status::Done(_) => { - let dir = format!("{}/{}/", ARTIFACTS_FOLDER, id); + let dir = s3_artifacts_dir(id.to_string().as_str()); self.s3_client.delete_dir(&dir).await.unwrap(); // TODO: fix self.db_client .delete_item(id.to_string().as_str()) @@ -141,8 +161,13 @@ impl Inner { } } - self.state - .expiration_timestamps - .retain(|(_, timestamp)| *timestamp > now); + self.state.expiration_timestamps.retain(|(id, timestamp)| { + if *timestamp > now { + return true; + }; + + self.state.task_status.remove(id); + false + }); } } diff --git a/crates/worker/src/utils/lib.rs b/crates/worker/src/utils/lib.rs index 27eeb61..d999eea 100644 --- a/crates/worker/src/utils/lib.rs +++ b/crates/worker/src/utils/lib.rs @@ -18,8 +18,6 @@ pub const ARTIFACTS_ROOT: &str = pub const CARGO_MANIFEST_DIR: &str = env!("CARGO_MANIFEST_DIR"); -pub const DURATION_TO_PURGE: u64 = 60 * 5; // 5 minutes - pub const ZKSOLC_VERSIONS: [&str; 2] = ["1.4.1", "1.4.0"]; pub const DEFAULT_SOLIDITY_VERSION: &str = "0.8.24"; diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index 9d8595f..3ca5b3d 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -184,7 +184,7 @@ impl RunningEngine { } Err(PreparationError::VersionNotSupported(err)) => { // Clean everything since the request failed - let dir = s3_compilation_files_dir(&id); + let dir = s3_compilation_files_dir(id.to_string().as_str()); s3_client.delete_dir(&dir).await.unwrap(); // TODO: do those retriable // This error doesn't create any artifacts @@ -222,7 +222,7 @@ impl RunningEngine { purgatory.add_record(id, task_result).await; // Clean compilation input files right away - let dir = s3_artifacts_dir(&id); + let dir = s3_compilation_files_dir(id.to_string().as_str()); s3_client.delete_dir(&dir).await.unwrap(); if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { From 10e73091a5df7770231ff33a28119db051b405f3 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Wed, 11 Sep 2024 13:34:43 +0900 Subject: [PATCH 8/9] feat: don't keep artifacts content in ram, upload it consecutively to s3 --- crates/worker/src/clients/s3_client.rs | 5 +-- crates/worker/src/commands/compile.rs | 57 ++++++++++++++------------ crates/worker/src/commands/errors.rs | 37 ++++++++--------- crates/worker/src/commands/utils.rs | 35 +++++++++++----- 4 files changed, 74 insertions(+), 60 deletions(-) diff --git a/crates/worker/src/clients/s3_client.rs b/crates/worker/src/clients/s3_client.rs index 1ccaa07..67014a2 100644 --- a/crates/worker/src/clients/s3_client.rs +++ b/crates/worker/src/clients/s3_client.rs @@ -43,10 +43,7 @@ impl S3Client { .expect("Unreachable. list_all_keys bug."); files.push(CompilationFile { file_content: contents, - file_path: file_path - .to_str() - .expect("Unexpected encoding issue.") - .to_string(), + file_path: file_path.to_path_buf(), }); } diff --git a/crates/worker/src/commands/compile.rs b/crates/worker/src/commands/compile.rs index 6fe84e3..8dcd4e5 100644 --- a/crates/worker/src/commands/compile.rs +++ b/crates/worker/src/commands/compile.rs @@ -12,7 +12,7 @@ use crate::utils::hardhat_config::HardhatConfigBuilder; use crate::utils::lib::{initialize_files, list_files_in_directory, DEFAULT_SOLIDITY_VERSION}; pub struct CompilationFile { - pub file_path: String, + pub file_path: PathBuf, pub file_content: Vec, } @@ -23,14 +23,19 @@ pub struct CompilationInput { pub contracts: Vec, } -pub struct CompilationArtifact { - pub file_name: String, - pub file_content: Vec, +pub struct ArtifactData { + pub file_path: PathBuf, pub is_contract: bool, } + +pub struct CompilationOutput { + pub artifacts_dir: PathBuf, + pub artifacts_data: Vec, +} + pub async fn do_compile( compilation_input: CompilationInput, -) -> Result, CompilationError> { +) -> Result { // root directory for the contracts let workspace_path = compilation_input.workspace_path; // root directory for the artifacts @@ -100,30 +105,30 @@ pub async fn do_compile( } // fetch the files in the artifacts directory - let mut file_contents: Vec = vec![]; let file_paths = list_files_in_directory(&artifacts_path).expect("Unexpected error listing artifact"); - for file_path in file_paths.iter() { - // TODO: change this - don't store files in RAM. copy 1-1 to S3 - let file_content = tokio::fs::read(file_path).await?; - let full_path = Path::new(file_path); - - let relative_path = full_path - .strip_prefix(&artifacts_path) - .expect("Unexpected prefix"); - let relative_path_str = relative_path.to_str().unwrap(); - - let is_contract = - !relative_path_str.ends_with(".dbg.json") && relative_path_str.ends_with(".json"); - - file_contents.push(CompilationArtifact { - file_name: relative_path_str.to_string(), - file_content, - is_contract, - }); - } + let artifacts_data = file_paths + .into_iter() + .map(|file_path| { + let full_path = Path::new(&file_path); + let relative_path = full_path + .strip_prefix(&artifacts_path) + .expect("Unexpected prefix"); + + let is_contract = + !relative_path.ends_with(".dbg.json") && relative_path.ends_with(".json"); + + ArtifactData { + file_path: relative_path.to_path_buf(), + is_contract, + } + }) + .collect(); // calling here explicitly to avoid dropping the AutoCleanUp struct auto_clean_up.clean_up().await; - Ok(file_contents) + Ok(CompilationOutput { + artifacts_dir: artifacts_path, + artifacts_data, + }) } diff --git a/crates/worker/src/commands/errors.rs b/crates/worker/src/commands/errors.rs index a3dc954..641080d 100644 --- a/crates/worker/src/commands/errors.rs +++ b/crates/worker/src/commands/errors.rs @@ -1,13 +1,5 @@ use crate::clients::errors::{DBError, S3Error}; -#[derive(thiserror::Error, Debug)] -pub enum CompilationError { - #[error("IoError: {0}")] - IoError(#[from] std::io::Error), - #[error("Failed to compile: {0}")] - CompilationFailureError(String), -} - #[derive(thiserror::Error, Debug)] pub enum PreparationError { #[error("DBError: {0}")] @@ -22,15 +14,20 @@ pub enum PreparationError { VersionNotSupported(String), } -// impl CompilationError { -// pub fn recoverable(&self) -> bool { -// match self { -// CompilationError::DBError(_) | CompilationError::S3Error(_) => true, -// CompilationError::NoDBItemError(_) -// | CompilationError::UnexpectedStatusError(_) -// | CompilationError::IoError(_) -// | CompilationError::VersionNotSupported(_) -// | CompilationError::CompilationFailureError(_) => false, -// } -// } -// } +#[derive(thiserror::Error, Debug)] +pub enum CompilationError { + #[error("IoError: {0}")] + IoError(#[from] std::io::Error), + #[error("Failed to compile: {0}")] + CompilationFailureError(String), +} + +#[derive(thiserror::Error, Debug)] +pub enum CommandResultHandleError { + #[error("IoError: {0}")] + IoError(#[from] std::io::Error), + #[error("DBError: {0}")] + DBError(#[from] DBError), + #[error("S3Error: {0}")] + S3Error(#[from] S3Error), +} diff --git a/crates/worker/src/commands/utils.rs b/crates/worker/src/commands/utils.rs index 92d0832..8e7a3de 100644 --- a/crates/worker/src/commands/utils.rs +++ b/crates/worker/src/commands/utils.rs @@ -14,8 +14,9 @@ use uuid::Uuid; use crate::clients::dynamodb_client::DynamoDBClient; use crate::clients::errors::DBError; use crate::clients::s3_client::S3Client; -use crate::commands::compile::{CompilationArtifact, CompilationInput}; -use crate::commands::errors::PreparationError; +use crate::commands::compile::{ArtifactData, CompilationInput, CompilationOutput}; +use crate::commands::errors::{CommandResultHandleError, PreparationError}; +use crate::utils::cleaner::AutoCleanUp; use crate::utils::lib::{SOL_ROOT, ZKSOLC_VERSIONS}; async fn try_set_compiling_status( @@ -101,14 +102,26 @@ pub async fn on_compilation_success( id: Uuid, db_client: &DynamoDBClient, s3_client: &S3Client, - compilation_artifacts: Vec, -) -> Result { + compilation_output: CompilationOutput, +) -> Result { const DOWNLOAD_URL_EXPIRATION: Duration = Duration::from_secs(5 * 60 * 60); - let mut presigned_urls = Vec::with_capacity(compilation_artifacts.len()); - for el in compilation_artifacts { - let file_key = format!("{}/{}/{}", ARTIFACTS_FOLDER, id, el.file_name); - s3_client.put_object(&file_key, el.file_content).await?; + let auto_clean_up = AutoCleanUp { + dirs: vec![compilation_output.artifacts_dir.to_str().unwrap()], + }; + + let mut presigned_urls = Vec::with_capacity(compilation_output.artifacts_data.len()); + for el in compilation_output.artifacts_data { + let absolute_path = compilation_output.artifacts_dir.join(&el.file_path); + let file_content = tokio::fs::read(absolute_path).await?; + + let file_key = format!( + "{}/{}/{}", + ARTIFACTS_FOLDER, + id, + el.file_path.to_str().unwrap() + ); + s3_client.put_object(&file_key, file_content).await?; let expires_in = PresigningConfig::expires_in(DOWNLOAD_URL_EXPIRATION).unwrap(); let presigned_request = s3_client @@ -140,6 +153,7 @@ pub async fn on_compilation_success( .await .map_err(DBError::from)?; + auto_clean_up.clean_up().await; Ok(TaskResult::Success { presigned_urls }) } @@ -147,7 +161,7 @@ pub async fn on_compilation_failed( id: Uuid, db_client: &DynamoDBClient, message: String, -) -> Result { +) -> Result { db_client .client .update_item() @@ -162,7 +176,8 @@ pub async fn on_compilation_failed( ) .expression_attribute_values(":data", AttributeValue::S(message.clone())) .send() - .await?; + .await + .map_err(DBError::from)?; Ok(TaskResult::Failure(message)) } From cbde110f6a38f5e0c3ded803428a88a73443e077 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Wed, 18 Sep 2024 14:14:29 +0900 Subject: [PATCH 9/9] refactor: pr edits --- crates/worker/src/clients/s3_client.rs | 5 +---- crates/worker/src/commands/utils.rs | 9 +++------ crates/worker/src/purgatory.rs | 10 +++++----- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/crates/worker/src/clients/s3_client.rs b/crates/worker/src/clients/s3_client.rs index 67014a2..265f962 100644 --- a/crates/worker/src/clients/s3_client.rs +++ b/crates/worker/src/clients/s3_client.rs @@ -108,10 +108,7 @@ impl S3Client { self.delete_object(key).await?; } - // TODO: check that works - let result = self.delete_object(dir).await; - result?; - Ok(()) + self.delete_object(dir).await } pub async fn delete_object(&self, key: &str) -> Result<(), S3Error> { diff --git a/crates/worker/src/commands/utils.rs b/crates/worker/src/commands/utils.rs index 8e7a3de..c4d709e 100644 --- a/crates/worker/src/commands/utils.rs +++ b/crates/worker/src/commands/utils.rs @@ -78,12 +78,9 @@ pub(crate) async fn prepare_compile_input( } }; - match item.status { - Status::Pending => {} - status => { - warn!("Item already processing: {}", status); - return Err(PreparationError::UnexpectedStatusError(status.to_string())); - } + if !matches!(item.status, Status::Pending) { + warn!("Item already processing: {}", item.status); + return Err(PreparationError::UnexpectedStatusError(item.status.to_string())); } let dir = format!("{}/", request.id); diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index 2100d01..5d709b6 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use std::ptr::NonNull; use std::sync::Arc; use std::time::Duration; -use tokio::time::sleep; +use tokio::time::{interval, sleep}; use tokio::{sync::Mutex, task::JoinHandle}; use tracing::warn; use types::item::{Status, TaskResult}; @@ -30,7 +30,7 @@ impl Purgatory { { let mut inner = this.inner.try_lock().unwrap(); - let mut initialized_handle = tokio::spawn(this.clone().deamon()); + let mut initialized_handle = tokio::spawn(this.clone().daemon()); inner.handle = unsafe { NonNull::new_unchecked(&mut initialized_handle as *mut _) }; } @@ -45,12 +45,13 @@ impl Purgatory { self.inner.lock().await.add_record(id, result); } - async fn deamon(mut self) { + async fn daemon(mut self) { const PURGE_INTERVAL: Duration = Duration::from_secs(60); + let mut interval = interval(PURGE_INTERVAL); loop { + interval.tick().await; self.purge().await; - sleep(PURGE_INTERVAL).await; } } } @@ -143,7 +144,6 @@ impl Inner { let artifacts_dir = s3_compilation_files_dir(id.to_string().as_str()); self.s3_client.delete_dir(&artifacts_dir).await.unwrap(); // TODO: fix - // TODO: design choice. Delete or update status to purged? // Second would give neater replies self.db_client .delete_item(id.to_string().as_str())