diff --git a/Cargo.toml b/Cargo.toml
index 6053abd..9cdd25d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,9 +1,5 @@
 [workspace]
-members = [
-    "crates/types",
-    "crates/lambdas",
-    "crates/worker",
-]
+members = ["crates/types", "crates/lambdas", "crates/worker"]
 exclude = ["api"]
 
 [workspace.dependencies]
@@ -12,12 +8,16 @@
 aws-sdk-s3 = "1.43.0"
 aws-sdk-sqs = "1.39.0"
 aws-sdk-dynamodb = "1.42.0"
 chrono = "0.4.38"
-tokio = {version = "1.39.3", features = ["macros"]}
+tokio = { version = "1.39.3", features = ["macros"] }
 serde = "1.0.207"
 serde_json = "1.0.128"
 thiserror = "1.0.63"
 tracing = { version = "0.1.40", features = ["log"] }
-tracing-subscriber = { version = "0.3.18", default-features = false, features = ["fmt", "ansi"] }
+tracing-subscriber = { version = "0.3.18", default-features = false, features = [
+    "fmt",
+    "ansi",
+] }
 uuid = { version = "1.10.0", features = ["serde", "v4"] }
-types = {version = "0.0.1", path = "crates/types"}
\ No newline at end of file
+# Internal dependencies
+types = { version = "0.0.1", path = "crates/types" }
diff --git a/crates/lambdas/Cargo.toml b/crates/lambdas/Cargo.toml
index c221c22..5398911 100644
--- a/crates/lambdas/Cargo.toml
+++ b/crates/lambdas/Cargo.toml
@@ -13,16 +13,14 @@
 chrono.workspace = true
 tokio.workspace = true
 serde.workspace = true
 serde_json.workspace = true
-thiserror.workspace = true
 tracing.workspace = true
 tracing-subscriber.workspace = true
 uuid.workspace = true
 
-lambda_runtime = "0.13.0"
 lambda_http = "0.13.0"
 
 # Inner crates
-types = {workspace = true}
+types.workspace = true
 
 [[bin]]
 name = "generate-presigned-urls"
diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml
index c94bb1f..5fbcf59 100644
--- a/crates/types/Cargo.toml
+++ b/crates/types/Cargo.toml
@@ -11,4 +11,4 @@
 thiserror.workspace = true
 uuid.workspace = true
 
 [dev-dependencies]
-serde_json.workspace = true
\ No newline at end of file
+serde_json.workspace = true
diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml
index 3f404d7..dc05778 100644
--- a/crates/worker/Cargo.toml
+++ b/crates/worker/Cargo.toml
@@ -9,7 +9,7 @@
 aws-sdk-s3.workspace = true
 aws-sdk-sqs.workspace = true
 aws-sdk-dynamodb.workspace = true
 chrono.workspace = true
-tokio = {workspace = true, features = ["rt-multi-thread", "sync"]}
+tokio = { workspace = true, features = ["rt-multi-thread", "sync"] }
 serde = { workspace = true, features = ["derive"] }
 serde_json.workspace = true
 thiserror.workspace = true
diff --git a/crates/worker/src/clients.rs b/crates/worker/src/clients.rs
index e642bf5..557002e 100644
--- a/crates/worker/src/clients.rs
+++ b/crates/worker/src/clients.rs
@@ -1,4 +1,5 @@
-pub mod dynamodb_client;
+pub mod dynamodb_clients;
 pub mod errors;
-pub mod s3_client;
+mod retriable;
+pub mod s3_clients;
 pub mod sqs_clients;
diff --git a/crates/worker/src/clients/dynamodb_client.rs b/crates/worker/src/clients/dynamodb_client.rs
deleted file mode 100644
index 434aa8f..0000000
--- a/crates/worker/src/clients/dynamodb_client.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-use aws_sdk_dynamodb::types::AttributeValue;
-use aws_sdk_dynamodb::Client;
-use types::item::Item;
-
-use crate::clients::errors::DBError;
-
-#[derive(Clone)]
-pub struct DynamoDBClient {
-    pub client: Client,
-    pub table_name: String,
-}
-
-impl DynamoDBClient {
-    pub fn new(client: Client, table_name: &str) -> Self {
-        Self {
-            client,
-            table_name: table_name.into(),
-        }
-    }
-
-    pub async fn delete_item(&self, id: &str) -> Result<(), DBError> {
-        self.client
-            .delete_item()
-            .table_name(self.table_name.clone())
-            .key(Item::primary_key_name(), AttributeValue::S(id.to_string()))
-            .send()
-            .await?;
-
-        Ok(())
-    }
-
-    pub async fn get_item(&self, key: &str) -> Result<Option<Item>, DBError> {
-        let result = self
-            .client
-            .get_item()
-            .table_name(self.table_name.clone())
-            .key(Item::primary_key_name(), AttributeValue::S(key.to_string()))
-            .send()
-            .await?;
-
-        if let Some(item) = result.item {
-            // TODO: maybe change status or delete when error?
-            Ok(Some(item.try_into()?))
-        } else {
-            Ok(None)
-        }
-    }
-}
diff --git a/crates/worker/src/clients/dynamodb_clients/client.rs b/crates/worker/src/clients/dynamodb_clients/client.rs
new file mode 100644
index 0000000..47a84e4
--- /dev/null
+++ b/crates/worker/src/clients/dynamodb_clients/client.rs
@@ -0,0 +1,247 @@
+use aws_sdk_dynamodb::operation::scan::ScanOutput;
+use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder;
+use aws_sdk_dynamodb::types::AttributeValue;
+use aws_sdk_dynamodb::Client;
+use chrono::{DateTime, Utc};
+use std::collections::HashMap;
+use std::sync::atomic::AtomicU8;
+use std::sync::Arc;
+use tokio::sync::oneshot;
+use types::item::{Item, Status};
+
+use crate::clients::errors::{DBDeleteError, DBError, DBGetError, DBScanError, DBUpdateError};
+use crate::clients::retriable::{handle_action_result, match_result, ActionHandler};
+
+#[derive(Clone)]
+pub struct DynamoDBClient {
+    pub client: Client,
+    pub table_name: String,
+}
+
+impl DynamoDBClient {
+    pub fn new(client: Client, table_name: &str) -> Self {
+        Self {
+            client,
+            table_name: table_name.into(),
+        }
+    }
+
+    pub async fn delete_item(&self, id: &str) -> Result<(), DBDeleteError> {
+        let _ = self
+            .client
+            .delete_item()
+            .table_name(self.table_name.clone())
+            .key(Item::primary_key_name(), AttributeValue::S(id.to_string()))
+            .send()
+            .await?;
+
+        Ok(())
+    }
+
+    pub async fn delete_item_attempt(&self, id: &str) -> Result<Option<()>, DBDeleteError> {
+        match_result!(DBDeleteError, self.delete_item(id).await)
+    }
+
+    pub async fn update_item_raw(
+        &self,
+        update_item_builder: &UpdateItemFluentBuilder,
+    ) -> Result<(), DBUpdateError> {
+        let _ = update_item_builder.clone().send().await?;
+        Ok(())
+    }
+
+    pub async fn update_item_raw_attempt(
+        &self,
+        update_item_builder: &UpdateItemFluentBuilder,
+    ) -> Result<Option<()>, DBUpdateError> {
+        match_result!(
+            DBUpdateError,
+            self.update_item_raw(update_item_builder).await
+        )
+    }
+
+    pub async fn update_item_status_conditional(
+        &self,
+        id: &str,
+        from: &Status,
+        to: &Status,
+    ) -> Result<(), DBUpdateError> {
+        let _ = self
+            .client
+            .update_item()
+            .table_name(self.table_name.clone())
+            .key(Item::primary_key_name(), AttributeValue::S(id.to_string()))
+            .update_expression("SET #status = :toStatus")
+            .condition_expression("#status = :fromStatus")
+            .expression_attribute_names("#status", Status::attribute_name())
+            .expression_attribute_values(":toStatus", AttributeValue::N(u32::from(to).to_string()))
+            .expression_attribute_values(
+                ":fromStatus",
+                AttributeValue::N(u32::from(from).to_string()),
+            )
+            .send()
+            .await?;
+        Ok(())
+    }
+
+    pub async fn update_item_status_conditional_attempt(
+        &self,
+        id: &str,
+        from: &Status,
+        to: &Status,
+    ) -> Result<Option<()>, DBUpdateError> {
+        match_result!(
+            DBUpdateError,
+            self.update_item_status_conditional(id, from, to).await
+        )
+    }
+
+    pub async fn get_item(&self, key: &str) -> Result<Option<Item>, DBError> {
+        let result = self
+            .client
+            .get_item()
+            .table_name(self.table_name.clone())
+            .key(Item::primary_key_name(), AttributeValue::S(key.to_string()))
+            .send()
+            .await?;
+
+        if let Some(item) = result.item {
+            Ok(Some(item.try_into()?))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub async fn get_item_attempt(&self, key: &str) -> Result<Option<Option<Item>>, DBError> {
+        match self.get_item(key).await {
+            Err(DBError::GetItemError(err)) => {
+                match_result!(DBGetError, Err(err)).map_err(DBError::from)
+            }
+            result => result.map(|value| Some(value)),
+        }
+    }
+
+    pub async fn scan_items_prior_to(
+        &self,
+        time: &DateTime<Utc>,
+        exclusive_start_key: &Option<HashMap<String, AttributeValue>>,
+    ) -> Result<ScanOutput, DBScanError> {
+        const MAX_CAPACITY: usize = 100;
+
+        self.client
+            .scan()
+            .table_name(self.table_name.clone())
+            .filter_expression("CreatedAt <= :created_at")
+            .expression_attribute_values(":created_at", AttributeValue::S(time.to_rfc3339()))
+            .limit(MAX_CAPACITY as i32)
+            .set_exclusive_start_key(exclusive_start_key.clone())
+            .send()
+            .await
+    }
+
+    pub async fn scan_items_prior_to_attempt(
+        &self,
+        time: &DateTime<Utc>,
+        exclusive_start_key: &Option<HashMap<String, AttributeValue>>,
+    ) -> Result<Option<ScanOutput>, DBScanError> {
+        match_result!(
+            DBScanError,
+            self.scan_items_prior_to(time, exclusive_start_key).await
+        )
+    }
+}
+
+#[derive(Default)]
+pub enum DynamoDBAction {
+    #[default]
+    Default,
+    DeleteItem {
+        id: String,
+        sender: oneshot::Sender<Result<(), DBDeleteError>>,
+    },
+    GetItem {
+        id: String,
+        sender: oneshot::Sender<Result<Option<Item>, DBError>>,
+    },
+    ScanPriorTo {
+        time: DateTime<Utc>,
+        exclusive_start_key: Option<HashMap<String, AttributeValue>>,
+        sender: oneshot::Sender<Result<ScanOutput, DBScanError>>,
+    },
+    UpdateItemRaw {
+        update_item_builder: UpdateItemFluentBuilder,
+        sender: oneshot::Sender<Result<(), DBUpdateError>>,
+    },
+    UpdateItemStatusConditional {
+        id: String,
+        from: Status,
+        to: Status,
+        sender: oneshot::Sender<Result<(), DBUpdateError>>,
+    },
+}
+
+impl ActionHandler for DynamoDBClient {
+    type Action = DynamoDBAction;
+
+    async fn handle(&self, action: Self::Action, state: Arc<AtomicU8>) -> Option<Self::Action> {
+        match action {
+            DynamoDBAction::Default => unreachable!(),
+            DynamoDBAction::DeleteItem { id, sender } => {
+                let result = self.delete_item_attempt(&id).await;
+                handle_action_result(result, sender, state)
+                    .map(|sender| DynamoDBAction::DeleteItem { id, sender })
+            }
+            DynamoDBAction::GetItem { id, sender } => {
+                let result = self.get_item_attempt(&id).await;
+                handle_action_result(result, sender, state)
+                    .map(|sender| DynamoDBAction::GetItem { id, sender })
+            }
+            DynamoDBAction::ScanPriorTo {
+                time,
+                exclusive_start_key,
+                sender,
+            } => {
+                let result = self
+                    .scan_items_prior_to_attempt(&time, &exclusive_start_key)
+                    .await;
+                handle_action_result(result, sender, state).map(|sender| {
+                    DynamoDBAction::ScanPriorTo {
+                        time,
+                        exclusive_start_key,
+                        sender,
+                    }
+                })
+            }
+            DynamoDBAction::UpdateItemRaw {
+                update_item_builder,
+                sender,
+            } => {
+                let result = self.update_item_raw_attempt(&update_item_builder).await;
+                handle_action_result(result, sender, state).map(|sender| {
+                    DynamoDBAction::UpdateItemRaw {
+                        update_item_builder,
+                        sender,
+                    }
+                })
+            }
+            DynamoDBAction::UpdateItemStatusConditional {
+                id,
+                from,
+                to,
+                sender,
+            } => {
+                let result = self
+                    .update_item_status_conditional_attempt(&id, &from, &to)
+                    .await;
+                handle_action_result(result, sender, state).map(|sender| {
+                    DynamoDBAction::UpdateItemStatusConditional {
+                        id,
+                        from,
+                        to,
+                        sender,
+                    }
+                })
+            }
+        }
+    }
+}
diff --git a/crates/worker/src/clients/dynamodb_clients/mod.rs b/crates/worker/src/clients/dynamodb_clients/mod.rs
new file mode 100644
index 0000000..8402e4f
--- /dev/null
+++ b/crates/worker/src/clients/dynamodb_clients/mod.rs
@@ -0,0 +1,2 @@
+pub mod client;
+pub mod wrapper;
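Every `*_attempt` method above funnels its SDK call through `match_result!`, which collapses the outcome into three cases: `Ok(Some(v))` on success, `Ok(None)` for transient transport trouble worth retrying (timeouts, I/O dispatch failures), and `Err(e)` for permanent failures. A minimal sketch of how a caller branches on that contract (the id is a placeholder):

    // Hypothetical caller; `client` is the DynamoDBClient defined above.
    async fn demo(client: &DynamoDBClient) {
        match client.delete_item_attempt("some-id").await {
            Ok(Some(())) => { /* completed */ }
            Ok(None) => { /* transient failure: hand the action to the Retrier */ }
            Err(_err) => { /* permanent failure: propagate to the caller */ }
        }
    }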
diff --git a/crates/worker/src/clients/dynamodb_clients/wrapper.rs b/crates/worker/src/clients/dynamodb_clients/wrapper.rs
new file mode 100644
index 0000000..223d996
--- /dev/null
+++ b/crates/worker/src/clients/dynamodb_clients/wrapper.rs
@@ -0,0 +1,118 @@
+use aws_sdk_dynamodb::operation::scan::ScanOutput;
+use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder;
+use aws_sdk_dynamodb::types::AttributeValue;
+use aws_sdk_dynamodb::Client;
+use chrono::{DateTime, Utc};
+use std::collections::HashMap;
+use std::sync::atomic::AtomicU8;
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use types::item::{Item, Status};
+
+use crate::clients::dynamodb_clients::client::{DynamoDBAction, DynamoDBClient};
+use crate::clients::errors::{DBDeleteError, DBError, DBScanError, DBUpdateError};
+use crate::clients::retriable::{execute_retriable_operation, Retrier, State};
+
+#[derive(Clone)]
+pub struct DynamoDBClientWrapper {
+    pub client: DynamoDBClient,
+    actions_sender: mpsc::Sender<DynamoDBAction>,
+    state: Arc<AtomicU8>,
+}
+
+impl DynamoDBClientWrapper {
+    pub fn new(client: Client, table_name: &str) -> Self {
+        let client = DynamoDBClient::new(client, table_name);
+        let state = Arc::new(AtomicU8::new(State::Connected as u8));
+        let (sender, receiver) = mpsc::channel(1000);
+
+        let retrier = Retrier::new(client.clone(), receiver, state.clone());
+        tokio::spawn(retrier.start());
+
+        Self {
+            client,
+            state,
+            actions_sender: sender,
+        }
+    }
+
+    pub async fn delete_item(&self, id: &str) -> Result<(), DBDeleteError> {
+        let operation = || self.client.delete_item_attempt(id);
+        let action_factory = |sender| DynamoDBAction::DeleteItem {
+            id: id.to_string(),
+            sender,
+        };
+
+        // TODO: if all good, rewrite all other clients like this?
+        execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state)
+            .await
+    }
+
+    pub async fn get_item(&self, key: &str) -> Result<Option<Item>, DBError> {
+        let operation = || self.client.get_item_attempt(key);
+        let action_factory = |sender| DynamoDBAction::GetItem {
+            id: key.to_string(),
+            sender,
+        };
+
+        execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state)
+            .await
+    }
+
+    pub async fn update_item_raw(
+        &self,
+        update_item_builder: &UpdateItemFluentBuilder,
+    ) -> Result<(), DBUpdateError> {
+        let operation = || self.client.update_item_raw_attempt(update_item_builder);
+
+        let action_factory = |sender| DynamoDBAction::UpdateItemRaw {
+            update_item_builder: update_item_builder.clone(),
+            sender,
+        };
+
+        execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state)
+            .await
+    }
+
+    pub async fn scan_items_prior_to(
+        &self,
+        time: &DateTime<Utc>,
+        exclusive_start_key: &Option<HashMap<String, AttributeValue>>,
+    ) -> Result<ScanOutput, DBScanError> {
+        let operation = || {
+            self.client
+                .scan_items_prior_to_attempt(time, exclusive_start_key)
+        };
+
+        let action_factory = |sender| DynamoDBAction::ScanPriorTo {
+            time: time.clone(),
+            exclusive_start_key: exclusive_start_key.clone(),
+            sender,
+        };
+
+        execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state)
+            .await
+    }
+
+    pub async fn update_item_status_conditional(
+        &self,
+        id: &str,
+        from: &Status,
+        to: &Status,
+    ) -> Result<(), DBUpdateError> {
+        let operation = || {
+            self.client
+                .update_item_status_conditional_attempt(id, from, to)
+        };
+
+        let action_factory = |sender| DynamoDBAction::UpdateItemStatusConditional {
+            id: id.to_string(),
+            from: from.clone(),
+            to: to.clone(),
+            sender,
+        };
+
+        execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state)
+            .await
+    }
+}
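`execute_retriable_operation` (defined in retriable.rs below) gives each wrapper method a fast path and a slow path: while the shared `AtomicU8` reads `Connected`, the operation runs inline; a transient failure flips the state to `Reconnecting` and parks the caller on a `oneshot` channel until the background `Retrier` replays the queued action. From the call site the machinery is invisible; a sketch (table name hypothetical):

    async fn demo(raw: aws_sdk_dynamodb::Client) -> Result<(), DBError> {
        let wrapper = DynamoDBClientWrapper::new(raw, "compilation-items");
        // Suspends across reconnects instead of surfacing transport errors.
        let _item = wrapper.get_item("some-key").await?;
        Ok(())
    }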
diff --git a/crates/worker/src/clients/errors.rs b/crates/worker/src/clients/errors.rs
index e7ff8e3..391b832 100644
--- a/crates/worker/src/clients/errors.rs
+++ b/crates/worker/src/clients/errors.rs
@@ -1,6 +1,7 @@
 use aws_sdk_dynamodb::config::http::HttpResponse;
 use aws_sdk_dynamodb::operation::delete_item::DeleteItemError;
 use aws_sdk_dynamodb::operation::get_item::GetItemError;
+use aws_sdk_dynamodb::operation::scan::ScanError;
 use aws_sdk_dynamodb::operation::update_item::UpdateItemError;
 use aws_sdk_s3::operation::delete_object::DeleteObjectError;
 use aws_sdk_s3::operation::get_object::GetObjectError;
@@ -19,8 +20,8 @@
 pub(crate) type SqsDeleteError = SdkError<DeleteMessageError, HttpResponse>;
 
 // DynamoDB related errors
 pub(crate) type DBDeleteError = SdkError<DeleteItemError, HttpResponse>;
 pub(crate) type DBGetError = SdkError<GetItemError, HttpResponse>;
-
 pub(crate) type DBUpdateError = SdkError<UpdateItemError, HttpResponse>;
+pub(crate) type DBScanError = SdkError<ScanError, HttpResponse>;
 
 // S3 related errors
 pub(crate) type S3ListObjectsError = SdkError<ListObjectsV2Error, HttpResponse>;
@@ -46,6 +47,8 @@
     ItemFormatError(#[from] ItemError),
     #[error(transparent)]
     UpdateItemError(#[from] DBUpdateError),
+    #[error(transparent)]
+    ScanError(#[from] DBScanError),
 }
 
 #[derive(thiserror::Error, Debug)]
diff --git a/crates/worker/src/clients/retriable.rs b/crates/worker/src/clients/retriable.rs
new file mode 100644
index 0000000..470e963
--- /dev/null
+++ b/crates/worker/src/clients/retriable.rs
@@ -0,0 +1,169 @@
+use std::sync::atomic::{AtomicU8, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::select;
+use tokio::sync::{mpsc, oneshot};
+use tokio::time::{sleep, Instant};
+
+macro_rules! match_result {
+    ($err_type:ident, $result:expr) => {
+        match $result {
+            Ok(val) => Ok(Some(val)),
+            Err(err) => match err {
+                $err_type::ConstructionFailure(_) => Err(err),
+                $err_type::TimeoutError(_) => Ok(None),
+                $err_type::DispatchFailure(dispatch_err) => {
+                    if dispatch_err.is_io() {
+                        Ok(None)
+                    } else if dispatch_err.is_timeout() {
+                        Ok(None)
+                    } else if dispatch_err.is_user() {
+                        Err($err_type::DispatchFailure(dispatch_err))
+                    } else if let Some(other) = dispatch_err.as_other() {
+                        match other {
+                            aws_config::retry::ErrorKind::ClientError => {
+                                Err($err_type::DispatchFailure(dispatch_err))
+                            }
+                            _ => Ok(None),
+                        }
+                    } else {
+                        Err($err_type::DispatchFailure(dispatch_err))
+                    }
+                }
+                other => Err(other),
+            },
+        }
+    };
+}
+pub(crate) use match_result;
+
+pub trait ActionHandler {
+    type Action: Default; // TODO: get rid of the Default bound
+    async fn handle(&self, action: Self::Action, state: Arc<AtomicU8>) -> Option<Self::Action>;
+}
+
+pub enum State {
+    Connected = 0,
+    Reconnecting = 1,
+}
+
+pub struct Retrier<T: ActionHandler> {
+    client: T,
+    receiver: mpsc::Receiver<T::Action>,
+    state: Arc<AtomicU8>,
+}
+
+impl<T: ActionHandler> Retrier<T> {
+    pub fn new(client: T, receiver: mpsc::Receiver<T::Action>, state: Arc<AtomicU8>) -> Self {
+        Self {
+            client,
+            receiver,
+            state,
+        }
+    }
+
+    pub async fn start(mut self) {
+        const SLEEP_DURATION: Duration = Duration::from_secs(3);
+        // TODO: introduce limit
+        let mut pending_actions = vec![];
+
+        loop {
+            if pending_actions.is_empty() {
+                if let Some(action) = self.receiver.recv().await {
+                    pending_actions.push(action);
+                } else {
+                    return;
+                }
+            }
+
+            self.resend_pending_actions(&mut pending_actions).await;
+
+            let start_time = Instant::now();
+            let value = select! {
+                value = self.receiver.recv() => value,
+                _ = sleep(SLEEP_DURATION) => continue,
+            };
+
+            if let Some(action) = value {
+                pending_actions.push(action);
+            } else {
+                return;
+            }
+
+            let elapsed = start_time.elapsed();
+            if let Some(remaining_sleep) = SLEEP_DURATION.checked_sub(elapsed) {
+                sleep(remaining_sleep).await;
+            }
+        }
+    }
+
+    pub async fn resend_pending_actions(&self, pending_actions: &mut Vec<T::Action>) {
+        let mut pivot = 0;
+        for i in 0..pending_actions.len() {
+            let action = std::mem::take(&mut pending_actions[i]);
+            let action_unhandled = self.client.handle(action, self.state.clone()).await;
+
+            // Keeping unhandled actions in the array to resend later.
+            if let Some(action) = action_unhandled {
+                pending_actions[pivot] = action;
+                pivot += 1;
+            }
+        }
+
+        pending_actions.truncate(pivot);
+    }
+}
+
+pub(crate) fn handle_action_result<T, E>(
+    result: Result<Option<T>, E>,
+    sender: oneshot::Sender<Result<T, E>>,
+    state: Arc<AtomicU8>,
+) -> Option<oneshot::Sender<Result<T, E>>>
+where
+    T: Send + 'static,
+    E: Send + 'static,
+{
+    match result {
+        Ok(Some(val)) => {
+            state.store(State::Connected as u8, Ordering::Release);
+            let _ = sender.send(Ok(val));
+            None
+        }
+        Err(err) => {
+            let _ = sender.send(Err(err));
+            None
+        }
+        Ok(None) => {
+            state.store(State::Reconnecting as u8, Ordering::Release);
+            Some(sender)
+        }
+    }
+}
+
+pub(crate) async fn execute_retriable_operation<F, Fut, AFactory, A, T, E>(
+    operation: F,
+    action_factory: AFactory,
+    action_sender: &mpsc::Sender<A>,
+    state: &AtomicU8,
+) -> Result<T, E>
+where
+    F: Fn() -> Fut,
+    Fut: std::future::Future<Output = Result<Option<T>, E>>,
+    AFactory: Fn(oneshot::Sender<Result<T, E>>) -> A,
+{
+    match state.load(Ordering::Acquire) {
+        0 => match operation().await {
+            Ok(Some(val)) => return Ok(val),
+            Ok(None) => state.store(State::Reconnecting as u8, Ordering::Release),
+            Err(err) => return Err(err),
+        },
+        1 => {}
+        _ => unreachable!(),
+    }
+
+    let (sender, receiver) = oneshot::channel();
+    let action = action_factory(sender);
+    let _ = action_sender.send(action).await;
+
+    receiver.await.unwrap() // TODO: remove unwrap
+}
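The `Retrier` loop paces replays instead of retrying hot: it waits for at least one pending action, replays the backlog, then `select!`s between a newly arriving action and a three-second sleep, and finally sleeps out whatever remains of the interval, so passes over `pending_actions` are at least `SLEEP_DURATION` apart. The connected/reconnecting handshake itself is just a byte protocol over the shared atomic:

    use std::sync::atomic::{AtomicU8, Ordering};
    use std::sync::Arc;

    fn main() {
        // 0 = State::Connected, 1 = State::Reconnecting (see the enum above).
        let state = Arc::new(AtomicU8::new(0));
        state.store(1, Ordering::Release); // a client observed a transient failure
        assert_eq!(state.load(Ordering::Acquire), 1); // wrappers now take the slow path
    }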
list_all_keys bug."); - files.push(CompilationFile { - file_content: contents, - file_path: file_path.to_path_buf(), - }); - } - - Ok(files) - } - - pub async fn get_object_into(&self, key: &str, writer: &mut impl Write) -> Result<(), S3Error> { - let mut object = self - .client - .get_object() - .bucket(self.bucket_name.clone()) - .key(key) - .send() - .await?; - - while let Some(bytes) = object.body.try_next().await? { - writer.write_all(&bytes)?; - } - - Ok(()) - } - - pub async fn get_object(&self, key: &str) -> Result, S3Error> { - let mut contents = vec![]; - self.get_object_into(key, &mut contents).await?; - - Ok(contents) - } - - pub async fn get_object_presigned( - &self, - key: &str, - expires_in: PresigningConfig, - ) -> Result { - Ok(self - .client - .get_object() - .bucket(self.bucket_name.clone()) - .key(key.to_string()) - .presigned(expires_in) - .await - .map_err(S3Error::from)?) - } - - pub async fn put_object(&self, key: &str, data: impl Into) -> Result<(), S3Error> { - let _ = self - .client - .put_object() - .bucket(self.bucket_name.clone()) - .key(key.to_string()) - .body(data.into()) - .send() - .await?; - - Ok(()) - } - - pub async fn delete_dir(&self, dir: &str) -> Result<(), S3Error> { - let objects = self.list_all_keys(dir).await?; - for object in objects { - let key = object.key().ok_or(S3Error::InvalidObjectError)?; - self.delete_object(key).await?; - } - - self.delete_object(dir).await - } - - pub async fn delete_object(&self, key: &str) -> Result<(), S3Error> { - let _ = self - .client - .delete_object() - .bucket(self.bucket_name.clone()) - .key(key) - .send() - .await?; - Ok(()) - } - - pub async fn list_all_keys(&self, dir: &str) -> Result, S3Error> { - let mut objects = Vec::new(); - let mut continuation_token: Option = None; - - loop { - let mut request = self - .client - .list_objects_v2() - .bucket(self.bucket_name.clone()) - .delimiter('/') - .prefix(dir.to_string()); - if let Some(token) = continuation_token { - request = request.continuation_token(token); - } - - let response = request.send().await?; - if let Some(contents) = response.contents { - objects.extend(contents); - } - - let is_truncated = if let Some(is_truncated) = response.is_truncated { - is_truncated - } else { - warn!("is_truncated empty"); - break; - }; - - if !is_truncated { - break; - } - - continuation_token = response.next_continuation_token; - if continuation_token.is_none() { - error!("continuation_token wasn't set!"); - break; - } - } - - Ok(objects) - } -} diff --git a/crates/worker/src/clients/s3_clients/client.rs b/crates/worker/src/clients/s3_clients/client.rs new file mode 100644 index 0000000..7082aed --- /dev/null +++ b/crates/worker/src/clients/s3_clients/client.rs @@ -0,0 +1,321 @@ +use aws_sdk_s3::presigning::{PresignedRequest, PresigningConfig}; +use aws_sdk_s3::types::Object; +use aws_sdk_s3::Client; +use aws_smithy_types::byte_stream::ByteStream; +use std::io::{SeekFrom, Write}; +use std::path::Path; +use std::sync::atomic::AtomicU8; +use std::sync::Arc; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tokio::sync::oneshot; +use tracing::{error, warn}; + +use crate::clients::errors::{ + S3DeleteObjectError, S3Error, S3GetObjectError, S3ListObjectsError, S3PutObjectError, +}; +use crate::clients::retriable::{handle_action_result, match_result, ActionHandler}; +use crate::commands::compile::CompilationFile; + +#[derive(Clone)] +pub struct S3Client { + pub client: Client, + pub bucket_name: String, +} + +impl S3Client { + pub fn new(client: 
Client, bucket_name: &str) -> Self { + Self { + bucket_name: bucket_name.to_string(), + client, + } + } + + pub async fn extract_files(&self, dir: &str) -> Result, S3Error> { + let objects = self.list_all_keys(dir).await?; + + let mut files = vec![]; + for object in objects { + let key = object.key().ok_or(S3Error::InvalidObjectError)?; + let expected_size = object.size.ok_or(S3Error::InvalidObjectError)?; + + let mut contents = Vec::with_capacity(expected_size as usize); + self.get_object_into(key, &mut contents).await?; + if contents.len() as i64 != expected_size { + error!("Fetched num bytes != expected size of file."); + return Err(S3Error::InvalidObjectError); + } + + let file_path = Path::new(key) + .strip_prefix(dir) + .expect("Unreachable. list_all_keys bug."); + files.push(CompilationFile { + file_content: contents, + file_path: file_path.to_path_buf(), + }); + } + + Ok(files) + } + + pub async fn extract_files_attempt( + &self, + dir: &str, + ) -> Result>, S3Error> { + let result = self.extract_files(dir).await; + match result { + Err(S3Error::ListObjectsError(err)) => { + match_result!(S3ListObjectsError, Err(err)).map_err(S3Error::from) + } + Err(S3Error::GetObjectError(err)) => { + match_result!(S3GetObjectError, Err(err)).map_err(S3Error::from) + } + result => result.map(|value| Some(value)), + } + } + + pub async fn get_object_into(&self, key: &str, writer: &mut impl Write) -> Result<(), S3Error> { + let mut object = self + .client + .get_object() + .bucket(self.bucket_name.clone()) + .key(key) + .send() + .await?; + + while let Some(bytes) = object.body.try_next().await? { + writer.write_all(&bytes)?; + } + + Ok(()) + } + + pub async fn get_object(&self, key: &str) -> Result, S3Error> { + let mut contents = vec![]; + self.get_object_into(key, &mut contents).await?; + + Ok(contents) + } + + pub async fn get_object_presigned( + &self, + key: &str, + expires_in: &PresigningConfig, + ) -> Result { + self.client + .get_object() + .bucket(self.bucket_name.clone()) + .key(key.to_string()) + .presigned(expires_in.clone()) + .await + } + + pub async fn get_object_presigned_attempt( + &self, + key: &str, + expires_in: &PresigningConfig, + ) -> Result, S3GetObjectError> { + match_result!( + S3GetObjectError, + self.get_object_presigned(key, expires_in).await + ) + } + + pub async fn put_object( + &self, + key: &str, + data: impl Into, + ) -> Result<(), S3PutObjectError> { + let _ = self + .client + .put_object() + .bucket(self.bucket_name.clone()) + .key(key.to_string()) + .body(data.into()) + .send() + .await?; + + Ok(()) + } + + pub async fn put_object_attempt( + &self, + key: &str, + data: impl Into, + ) -> Result, S3PutObjectError> { + match_result!(S3PutObjectError, self.put_object(key, data).await) + } + + pub async fn delete_dir(&self, dir: &str) -> Result<(), S3Error> { + let objects = self.list_all_keys(dir).await?; + // TODO: delete_objects instead + for object in objects { + let key = object.key().ok_or(S3Error::InvalidObjectError)?; + self.delete_object(key).await?; + } + + self.delete_object(dir).await?; + Ok(()) + } + + pub async fn delete_dir_attempt(&self, dir: &str) -> Result, S3Error> { + match self.delete_dir(dir).await { + Err(S3Error::DeleteObjectError(err)) => { + match_result!(S3DeleteObjectError, Err(err)).map_err(S3Error::from) + } + result => result.map(|value| Some(value)), + } + } + + pub async fn delete_object(&self, key: &str) -> Result<(), S3DeleteObjectError> { + let _ = self + .client + .delete_object() + .bucket(self.bucket_name.clone()) + .key(key) + 
.send() + .await?; + + Ok(()) + } + + pub async fn delete_object_attempt( + &self, + key: &str, + ) -> Result, S3DeleteObjectError> { + match_result!(S3DeleteObjectError, self.delete_object(key).await) + } + + pub async fn list_all_keys(&self, dir: &str) -> Result, S3ListObjectsError> { + let mut objects = Vec::new(); + let mut continuation_token: Option = None; + + loop { + let mut request = self + .client + .list_objects_v2() + .bucket(self.bucket_name.clone()) + .delimiter('/') + .prefix(dir.to_string()); + if let Some(token) = continuation_token { + request = request.continuation_token(token); + } + + let response = request.send().await?; + if let Some(contents) = response.contents { + objects.extend(contents); + } + + let is_truncated = if let Some(is_truncated) = response.is_truncated { + is_truncated + } else { + warn!("is_truncated empty"); + break; + }; + + if !is_truncated { + break; + } + + continuation_token = response.next_continuation_token; + if continuation_token.is_none() { + error!("continuation_token wasn't set!"); + break; + } + } + + Ok(objects) + } +} + +#[derive(Default)] +pub enum S3Action { + #[default] + Default, + DeleteDir { + dir: String, + sender: oneshot::Sender>, + }, + DeleteObject { + key: String, + sender: oneshot::Sender>, + }, + ExtractFiles { + dir: String, + sender: oneshot::Sender, S3Error>>, + }, + PutObject { + key: String, + file: File, + sender: oneshot::Sender>, + }, + GetObjectPresigned { + key: String, + expires_in: PresigningConfig, + sender: oneshot::Sender>, + }, +} + +impl ActionHandler for S3Client { + type Action = S3Action; + async fn handle(&self, action: Self::Action, state: Arc) -> Option { + match action { + S3Action::Default => unreachable!(), + S3Action::DeleteDir { dir, sender } => { + let result = self.delete_dir_attempt(&dir).await; + handle_action_result(result, sender, state) + .map(|sender| S3Action::DeleteDir { dir, sender }) + } + S3Action::DeleteObject { key, sender } => { + let result = self.delete_object_attempt(&key).await; + handle_action_result(result, sender, state) + .map(|sender| S3Action::DeleteObject { key, sender }) + } + S3Action::ExtractFiles { dir, sender } => { + let result = self.extract_files_attempt(&dir).await; + handle_action_result(result, sender, state) + .map(|sender| S3Action::ExtractFiles { dir, sender }) + } + S3Action::GetObjectPresigned { + key, + expires_in, + sender, + } => { + let result = self.get_object_presigned_attempt(&key, &expires_in).await; + handle_action_result(result, sender, state).map(|sender| { + S3Action::GetObjectPresigned { + key, + expires_in, + sender, + } + }) + } + S3Action::PutObject { + key, + mut file, + sender, + } => { + let mut buf = Vec::new(); + if let Err(err) = file.read_to_end(&mut buf).await { + let _ = sender.send(Err(S3Error::IoError(err))); + return None; + }; + + let result = self + .put_object_attempt(&key, buf) + .await + .map_err(S3Error::from); + if let Some(sender) = handle_action_result(result, sender, state) { + if let Err(err) = file.seek(SeekFrom::Start(0)).await { + let _ = sender.send(Err(S3Error::IoError(err))); + None + } else { + Some(S3Action::PutObject { key, file, sender }) + } + } else { + None + } + } + } + } +} diff --git a/crates/worker/src/clients/s3_clients/mod.rs b/crates/worker/src/clients/s3_clients/mod.rs new file mode 100644 index 0000000..8402e4f --- /dev/null +++ b/crates/worker/src/clients/s3_clients/mod.rs @@ -0,0 +1,2 @@ +pub mod client; +pub mod wrapper; diff --git a/crates/worker/src/clients/s3_clients/wrapper.rs 
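One subtlety in the `PutObject` handler above: each attempt drains the `File` cursor with `read_to_end`, so the handler must rewind the handle before re-queuing the action, or the next attempt would upload an empty body. The wrapper below repeats the same rewind on its fast path. The invariant in isolation:

    use std::io::SeekFrom;
    use tokio::fs::File;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    // Sketch: a File must be rewound between upload attempts.
    async fn read_for_attempt(file: &mut File) -> std::io::Result<Vec<u8>> {
        let mut buf = Vec::new();
        file.read_to_end(&mut buf).await?; // leaves the cursor at EOF
        file.seek(SeekFrom::Start(0)).await?; // rewind so a retry sees the full body
        Ok(buf)
    }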
diff --git a/crates/worker/src/clients/s3_clients/wrapper.rs b/crates/worker/src/clients/s3_clients/wrapper.rs
new file mode 100644
index 0000000..1869ae2
--- /dev/null
+++ b/crates/worker/src/clients/s3_clients/wrapper.rs
@@ -0,0 +1,168 @@
+use crate::clients::errors::{S3DeleteObjectError, S3Error, S3GetObjectError};
+use crate::clients::retriable::{Retrier, State};
+use aws_sdk_s3::presigning::{PresignedRequest, PresigningConfig};
+use aws_sdk_s3::Client;
+use std::io::SeekFrom;
+use std::sync::atomic::{AtomicU8, Ordering};
+use std::sync::Arc;
+use tokio::fs::File;
+use tokio::io::{AsyncReadExt, AsyncSeekExt};
+use tokio::sync::{mpsc, oneshot};
+
+use crate::clients::s3_clients::client::{S3Action, S3Client};
+use crate::commands::compile::CompilationFile;
+
+#[derive(Clone)]
+pub struct S3ClientWrapper {
+    pub client: S3Client,
+    actions_sender: mpsc::Sender<S3Action>,
+    state: Arc<AtomicU8>,
+}
+
+impl S3ClientWrapper {
+    pub fn new(client: Client, bucket_name: &str) -> Self {
+        let client = S3Client::new(client, bucket_name);
+        let state = Arc::new(AtomicU8::new(State::Connected as u8));
+        let (sender, receiver) = mpsc::channel(1000);
+
+        let retrier = Retrier::new(client.clone(), receiver, state.clone());
+        tokio::spawn(retrier.start());
+
+        Self {
+            client,
+            state,
+            actions_sender: sender,
+        }
+    }
+
+    pub async fn delete_dir(&self, dir: &str) -> Result<(), S3Error> {
+        match self.state.load(Ordering::Acquire) {
+            0 => match self.client.delete_dir_attempt(dir).await {
+                Ok(Some(val)) => return Ok(val),
+                Ok(None) => self
+                    .state
+                    .store(State::Reconnecting as u8, Ordering::Release),
+                Err(err) => return Err(err),
+            },
+            1 => {}
+            _ => unreachable!(),
+        }
+
+        let (sender, receiver) = oneshot::channel();
+        let _ = self
+            .actions_sender
+            .send(S3Action::DeleteDir {
+                dir: dir.to_string(),
+                sender,
+            })
+            .await;
+        receiver.await.unwrap()
+    }
+
+    pub async fn delete_object(&self, key: &str) -> Result<(), S3DeleteObjectError> {
+        match self.state.load(Ordering::Acquire) {
+            0 => match self.client.delete_object_attempt(key).await {
+                Ok(Some(val)) => return Ok(val),
+                Ok(None) => self
+                    .state
+                    .store(State::Reconnecting as u8, Ordering::Release),
+                Err(err) => return Err(err),
+            },
+            1 => {}
+            _ => unreachable!(),
+        }
+
+        let (sender, receiver) = oneshot::channel();
+        let _ = self
+            .actions_sender
+            .send(S3Action::DeleteObject {
+                key: key.to_string(),
+                sender,
+            })
+            .await;
+        receiver.await.unwrap()
+    }
+
+    pub async fn extract_files(&self, dir: &str) -> Result<Vec<CompilationFile>, S3Error> {
+        match self.state.load(Ordering::Acquire) {
+            0 => match self.client.extract_files_attempt(dir).await {
+                Ok(Some(val)) => return Ok(val),
+                Ok(None) => self
+                    .state
+                    .store(State::Reconnecting as u8, Ordering::Release),
+                Err(err) => return Err(err),
+            },
+            1 => {}
+            _ => unreachable!(),
+        }
+
+        let (sender, receiver) = oneshot::channel();
+        let _ = self
+            .actions_sender
+            .send(S3Action::ExtractFiles {
+                dir: dir.to_string(),
+                sender,
+            })
+            .await;
+        receiver.await.unwrap()
+    }
+
+    pub async fn put_object(&self, key: &str, mut file: File) -> Result<(), S3Error> {
+        match self.state.load(Ordering::Acquire) {
+            0 => {
+                let mut buf = Vec::new();
+                file.read_to_end(&mut buf).await?;
+                match self.client.put_object_attempt(key, buf).await {
+                    Ok(Some(val)) => return Ok(val),
+                    Ok(None) => {
+                        self.state
+                            .store(State::Reconnecting as u8, Ordering::Release);
+                        file.seek(SeekFrom::Start(0)).await?;
+                    }
+                    Err(err) => return Err(err.into()),
+                }
+            }
+            1 => {}
+            _ => unreachable!(),
+        }
+
+        let (sender, receiver) = oneshot::channel();
+        let _ = self
+            .actions_sender
+            .send(S3Action::PutObject {
+                key: key.to_string(),
+                file,
+                sender,
+            })
+            .await;
+        receiver.await.unwrap()
+    }
+
+    pub async fn get_object_presigned(
+        &self,
+        key: &str,
+        expires_in: &PresigningConfig,
+    ) -> Result<PresignedRequest, S3GetObjectError> {
+        match self.state.load(Ordering::Acquire) {
+            0 => match self
+                .client
+                .get_object_presigned_attempt(key, expires_in)
+                .await
+            {
+                Ok(Some(val)) => return Ok(val),
+                Ok(None) => self
+                    .state
+                    .store(State::Reconnecting as u8, Ordering::Release),
+                Err(err) => return Err(err),
+            },
+            1 => {}
+            _ => unreachable!(),
+        }
+
+        let (sender, receiver) = oneshot::channel();
+        let _ = self
+            .actions_sender
+            .send(S3Action::GetObjectPresigned {
+                key: key.to_string(),
+                expires_in: expires_in.clone(),
+                sender,
+            })
+            .await;
+        receiver.await.unwrap()
+    }
+}
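The `Default` variant that the SQS `Action` enum below carries (like `DynamoDBAction` and `S3Action` above) exists only because `Retrier::resend_pending_actions` vacates slots with `std::mem::take`, which needs a value to leave behind. One possible follow-up, not part of this diff: draining the vector removes the need for `take` and, together with dropping the `type Action: Default` bound on `ActionHandler`, would retire the crutch entirely.

    // Hypothetical alternative to the pivot/compact loop in retriable.rs.
    async fn resend<T: ActionHandler>(
        client: &T,
        state: Arc<AtomicU8>,
        pending: &mut Vec<T::Action>,
    ) {
        let mut still_pending = Vec::new();
        for action in pending.drain(..) {
            if let Some(action) = client.handle(action, state.clone()).await {
                still_pending.push(action); // transient failure: keep for the next pass
            }
        }
        *pending = still_pending;
    }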
diff --git a/crates/worker/src/clients/sqs_clients/client.rs b/crates/worker/src/clients/sqs_clients/client.rs
index e22b477..dc5d22c 100644
--- a/crates/worker/src/clients/sqs_clients/client.rs
+++ b/crates/worker/src/clients/sqs_clients/client.rs
@@ -1,41 +1,12 @@
 use aws_sdk_sqs::operation::delete_message::DeleteMessageOutput;
 use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput;
 use aws_sdk_sqs::Client;
+use std::sync::atomic::AtomicU8;
+use std::sync::Arc;
+use tokio::sync::oneshot;
 
 use crate::clients::errors::{SqsDeleteError, SqsReceiveError};
-
-macro_rules! match_result {
-    ($err_type:ident, $result:expr) => {
-        match $result {
-            Ok(val) => Ok(Some(val)),
-            Err(err) => match err {
-                $err_type::ConstructionFailure(_) => Err(err),
-                $err_type::TimeoutError(_) => Ok(None),
-                $err_type::DispatchFailure(dispatch_err) => {
-                    if dispatch_err.is_io() {
-                        return Ok(None);
-                    }
-                    if dispatch_err.is_timeout() {
-                        return Ok(None);
-                    }
-                    if dispatch_err.is_user() {
-                        return Err($err_type::DispatchFailure(dispatch_err));
-                    }
-                    if let Some(other) = dispatch_err.as_other() {
-                        return match other {
-                            aws_config::retry::ErrorKind::ClientError => {
-                                Err($err_type::DispatchFailure(dispatch_err))
-                            }
-                            _ => Ok(None),
-                        };
-                    }
-                    Err($err_type::DispatchFailure(dispatch_err))
-                }
-                other => Err(other),
-            },
-        }
-    };
-}
+use crate::clients::retriable::{handle_action_result, match_result, ActionHandler};
 
 #[derive(Clone)]
 pub struct SqsClient {
@@ -78,3 +49,37 @@
         match_result!(SqsDeleteError, result)
     }
 }
+
+#[derive(Default)]
+pub enum Action {
+    #[default]
+    Default, // TODO: get rid of this. crutches
+    Receive(oneshot::Sender<Result<ReceiveMessageOutput, SqsReceiveError>>),
+    Delete {
+        receipt_handle: String,
+        sender: oneshot::Sender<Result<DeleteMessageOutput, SqsDeleteError>>,
+    },
+}
+
+impl ActionHandler for SqsClient {
+    type Action = Action;
+    async fn handle(&self, action: Action, state: Arc<AtomicU8>) -> Option<Self::Action> {
+        match action {
+            Action::Default => unreachable!(),
+            Action::Receive(sender) => {
+                let result = self.receive_attempt().await;
+                handle_action_result(result, sender, state).map(Action::Receive)
+            }
+            Action::Delete {
+                receipt_handle,
+                sender,
+            } => {
+                let result = self.delete_attempt(receipt_handle.clone()).await;
+                handle_action_result(result, sender, state).map(|sender| Action::Delete {
+                    receipt_handle,
+                    sender,
+                })
+            }
+        }
+    }
+}
diff --git a/crates/worker/src/clients/sqs_clients/wrapper.rs b/crates/worker/src/clients/sqs_clients/wrapper.rs
index 76b6b6f..b55cfa0 100644
--- a/crates/worker/src/clients/sqs_clients/wrapper.rs
+++ b/crates/worker/src/clients/sqs_clients/wrapper.rs
@@ -3,29 +3,11 @@
 use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput;
 use aws_sdk_sqs::Client;
 use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
-use tokio::select;
 use tokio::sync::{mpsc, oneshot};
-use tokio::time::{sleep, Instant};
 
 use crate::clients::errors::{SqsDeleteError, SqsReceiveError};
-use crate::clients::sqs_clients::client::SqsClient;
-
-#[derive(Default)]
-pub enum Action {
-    #[default]
-    Default, // TODO: get rid of this. crutches
-    Receive(oneshot::Sender<Result<ReceiveMessageOutput, SqsReceiveError>>),
-    Delete {
-        receipt_handle: String,
-        sender: oneshot::Sender<Result<DeleteMessageOutput, SqsDeleteError>>,
-    },
-}
-
-enum State {
-    Connected = 0,
-    Reconnecting = 1,
-}
+use crate::clients::retriable::{Retrier, State};
+use crate::clients::sqs_clients::client::{Action, SqsClient};
 
 #[derive(Clone)]
 pub struct SqsClientWrapper {
@@ -40,7 +22,8 @@
         let state = Arc::new(AtomicU8::new(State::Connected as u8));
         let (sender, receiver) = mpsc::channel(1000);
 
-        tokio::spawn(Self::worker(client.clone(), state.clone(), receiver));
+        let retrier = Retrier::new(client.clone(), receiver, state.clone());
+        tokio::spawn(retrier.start());
 
         Self {
             client,
@@ -49,89 +32,6 @@
         }
     }
 
-    async fn worker(client: SqsClient, state: Arc<AtomicU8>, mut receiver: mpsc::Receiver<Action>) {
-        const SLEEP_DURATION: Duration = Duration::from_secs(3);
-        let mut pending_actions = vec![];
-
-        loop {
-            if pending_actions.is_empty() {
-                if let Some(action) = receiver.recv().await {
-                    pending_actions.push(action);
-                } else {
-                    return;
-                }
-            }
-
-            Self::resend_pending_actions(&mut pending_actions, &client, &state).await;
-
-            let start_time = Instant::now();
-            let value = select! {
-                value = receiver.recv() => value,
-                _ = sleep(SLEEP_DURATION) => continue,
-            };
-
-            if let Some(action) = value {
-                pending_actions.push(action);
-            } else {
-                return;
-            }
-
-            let elapsed = start_time.elapsed();
-            if let Some(remaining_sleep) = SLEEP_DURATION.checked_sub(elapsed) {
-                sleep(remaining_sleep).await;
-            }
-        }
-    }
-
-    pub async fn resend_pending_actions(
-        pending_actions: &mut Vec<Action>,
-        client: &SqsClient,
-        state: &Arc<AtomicU8>,
-    ) {
-        let mut pivot = 0;
-        for i in 0..pending_actions.len() {
-            let action = std::mem::take(&mut pending_actions[i]);
-            match action {
-                Action::Receive(sender) => match client.receive_attempt().await {
-                    Ok(Some(val)) => {
-                        state.store(State::Connected as u8, Ordering::Release);
-                        let _ = sender.send(Ok(val));
-                    }
-                    Err(err) => {
-                        let _ = sender.send(Err(err));
-                    }
-                    Ok(None) => {
-                        // Keeping in the array to resend.
-                        pending_actions[pivot] = Action::Receive(sender);
-                        pivot += 1;
-                    }
-                },
-                Action::Delete {
-                    receipt_handle,
-                    sender,
-                } => match client.delete_attempt(receipt_handle.clone()).await {
-                    Ok(Some(val)) => {
-                        state.store(State::Connected as u8, Ordering::Release);
-                        let _ = sender.send(Ok(val));
-                    }
-                    Err(err) => {
-                        let _ = sender.send(Err(err));
-                    }
-                    Ok(None) => {
-                        pending_actions[pivot] = Action::Delete {
-                            receipt_handle,
-                            sender,
-                        };
-                        pivot += 1;
-                    }
-                },
-                Action::Default => unreachable!(),
-            };
-        }
-
-        pending_actions.truncate(pivot);
-    }
-
     pub async fn receive_message(&self) -> Result<ReceiveMessageOutput, SqsReceiveError> {
         match self.state.load(Ordering::Acquire) {
             0 => match self.client.receive_attempt().await {
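In commands/utils.rs below, the hand-rolled `update_item` chains move behind the wrapper API. The Pending→InProgress transition keeps its DynamoDB condition expression, which is what makes it safe under concurrent workers: if another instance already moved the item out of `Pending`, DynamoDB rejects the write server-side with a conditional-check failure, which (judging by the comments in worker.rs) the elided match arms translate into `PreparationError::UnexpectedStatusError`. The success and failure writers instead assemble an `UpdateItemFluentBuilder` and hand it to `update_item_raw`, precisely so the retry layer can `clone()` and replay the builder.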
diff --git a/crates/worker/src/commands/utils.rs b/crates/worker/src/commands/utils.rs
index 6bb8887..c333626 100644
--- a/crates/worker/src/commands/utils.rs
+++ b/crates/worker/src/commands/utils.rs
@@ -11,37 +11,25 @@
 use types::item::{Item, Status, TaskResult};
 use types::{CompilationRequest, ARTIFACTS_FOLDER};
 use uuid::Uuid;
 
-use crate::clients::dynamodb_client::DynamoDBClient;
-use crate::clients::errors::DBError;
-use crate::clients::s3_client::S3Client;
+use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper;
+use crate::clients::errors::{DBError, S3Error};
+use crate::clients::s3_clients::wrapper::S3ClientWrapper;
 use crate::commands::compile::{CompilationInput, CompilationOutput};
 use crate::commands::errors::{CommandResultHandleError, PreparationError};
 use crate::utils::cleaner::AutoCleanUp;
 use crate::utils::lib::{SOL_ROOT, ZKSOLC_VERSIONS};
 
 async fn try_set_compiling_status(
-    db_client: &DynamoDBClient,
+    db_client: &DynamoDBClientWrapper,
     key: Uuid,
 ) -> Result<(), PreparationError> {
     let db_update_result = db_client
-        .client
-        .update_item()
-        .table_name(db_client.table_name.clone())
-        .key(Item::primary_key_name(), AttributeValue::S(key.to_string()))
-        .update_expression("SET #status = :newStatus")
-        .condition_expression("#status = :currentStatus")
-        .expression_attribute_names("#status", Status::attribute_name())
-        .expression_attribute_values(
-            ":newStatus",
-            AttributeValue::N(u32::from(Status::InProgress).to_string()),
+        .update_item_status_conditional(
+            key.to_string().as_str(),
+            &Status::Pending,
+            &Status::InProgress,
         )
-        .expression_attribute_values(
-            ":currentStatus",
-            AttributeValue::N(u32::from(Status::Pending).to_string()),
-        )
-        .send()
         .await;
-
     match db_update_result {
         Ok(_) => Ok(()),
         Err(SdkError::ServiceError(err)) => match err.err() {
@@ -59,8 +47,8 @@
 
 pub(crate) async fn prepare_compile_input(
     request: &CompilationRequest,
-    db_client: &DynamoDBClient,
-    s3_client: &S3Client,
+    db_client: &DynamoDBClientWrapper,
+    s3_client: &S3ClientWrapper,
 ) -> Result<CompilationInput, PreparationError> {
     let zksolc_version = request.config.version.as_str();
     if !ZKSOLC_VERSIONS.contains(&zksolc_version) {
@@ -73,7 +61,6 @@
     let item: Item = match item {
         Some(item) => item,
         None => {
-            error!("No item id: {}", request.id);
             return Err(PreparationError::NoDBItemError(request.id.to_string()));
         }
     };
@@ -95,10 +82,11 @@
         contracts: files,
     })
 }
+
 pub async fn on_compilation_success(
     id: Uuid,
-    db_client: &DynamoDBClient,
-    s3_client: &S3Client,
+    db_client: &DynamoDBClientWrapper,
+    s3_client: &S3ClientWrapper,
     compilation_output: CompilationOutput,
 ) -> Result<TaskResult, CommandResultHandleError> {
     const DOWNLOAD_URL_EXPIRATION: Duration = Duration::from_secs(5 * 60 * 60);
@@ -110,7 +98,7 @@
     let mut presigned_urls = Vec::with_capacity(compilation_output.artifacts_data.len());
     for el in compilation_output.artifacts_data {
         let absolute_path = compilation_output.artifacts_dir.join(&el.file_path);
-        let file_content = tokio::fs::read(absolute_path).await?;
+        let file_content = tokio::fs::File::open(absolute_path).await?;
 
         let file_key = format!(
             "{}/{}/{}",
@@ -122,8 +110,9 @@
         let expires_in = PresigningConfig::expires_in(DOWNLOAD_URL_EXPIRATION).unwrap();
         let presigned_request = s3_client
-            .get_object_presigned(&file_key, expires_in)
-            .await?;
+            .get_object_presigned(&file_key, &expires_in)
+            .await
+            .map_err(S3Error::from)?;
 
         presigned_urls.push(presigned_request.uri().to_string());
     }
@@ -133,10 +122,11 @@
         presigned_urls.push("".to_string());
     }
 
-    db_client
+    let builder = db_client
+        .client
         .client
         .update_item()
-        .table_name(db_client.table_name.clone())
+        .table_name(db_client.client.table_name.clone())
         .key(Item::primary_key_name(), AttributeValue::S(id.to_string()))
         .update_expression("SET #status = :newStatus, #data = :data")
         .expression_attribute_names("#status", Status::attribute_name())
@@ -145,8 +135,10 @@
             ":newStatus",
             AttributeValue::N(2.to_string()), // Ready
         )
-        .expression_attribute_values(":data", AttributeValue::Ss(presigned_urls.clone()))
-        .send()
+        .expression_attribute_values(":data", AttributeValue::Ss(presigned_urls.clone()));
+
+    db_client
+        .update_item_raw(&builder)
         .await
         .map_err(DBError::from)?;
@@ -156,13 +148,14 @@
 
 pub async fn on_compilation_failed(
     id: Uuid,
-    db_client: &DynamoDBClient,
+    db_client: &DynamoDBClientWrapper,
     message: String,
 ) -> Result<TaskResult, CommandResultHandleError> {
-    db_client
+    let builder = db_client
+        .client
         .client
         .update_item()
-        .table_name(db_client.table_name.clone())
+        .table_name(db_client.client.table_name.clone())
         .key(Item::primary_key_name(), AttributeValue::S(id.to_string()))
         .update_expression("SET #status = :newStatus, #data = :data")
         .expression_attribute_names("#status", Status::attribute_name())
@@ -171,8 +164,10 @@
             ":newStatus",
             AttributeValue::N(3.to_string()), // Failed
         )
-        .expression_attribute_values(":data", AttributeValue::S(message.clone()))
-        .send()
+        .expression_attribute_values(":data", AttributeValue::S(message.clone()));
+
+    db_client
+        .update_item_raw(&builder)
         .await
         .map_err(DBError::from)?;
diff --git a/crates/worker/src/errors.rs b/crates/worker/src/errors.rs
index 8e2e06b..d7bc43c 100644
--- a/crates/worker/src/errors.rs
+++ b/crates/worker/src/errors.rs
@@ -1,11 +1,26 @@
 use types::item::ItemError;
 
-use crate::clients::errors::DBError;
+use crate::clients::errors::{DBError, S3Error, SqsDeleteError};
+use crate::commands::errors::{CommandResultHandleError, PreparationError};
 
 #[derive(thiserror::Error, Debug)]
-pub enum GlobalPurgeError {
+pub enum PurgeError {
     #[error("DBError: {0}")]
     DBError(#[from] DBError),
+    #[error("S3Error: {0}")]
+    S3Error(#[from] S3Error),
     #[error("ItemError: {0}")]
     ItemError(#[from] ItemError),
 }
+
+#[derive(thiserror::Error, Debug)]
+pub enum MessageProcessorError {
+    #[error("PreparationError: {0}")]
+    PreparationError(#[from] PreparationError),
+    #[error("CommandResultHandleError: {0}")]
+    CommandResultHandleError(#[from] CommandResultHandleError),
+    #[error("S3Error: {0}")]
+    S3Error(#[from] S3Error),
+    #[error("SqsDeleteError: {0}")]
+    SqsDeleteError(#[from] SqsDeleteError),
+}
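The two new enums are what let the reworked paths trade `unwrap()` for `?`: `PurgeError` unifies the S3, DynamoDB, and item-format failures inside purgatory, while `MessageProcessorError` wraps everything `process_compile_message` can hit so the worker loop logs at a single site. The `#[from]` derives make the conversions implicit; a sketch (function hypothetical):

    async fn demo(s3: &S3ClientWrapper, dir: &str) -> Result<(), MessageProcessorError> {
        s3.delete_dir(dir).await?; // S3Error -> MessageProcessorError via #[from]
        Ok(())
    }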
diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs
index 5af23ae..428309f 100644
--- a/crates/worker/src/main.rs
+++ b/crates/worker/src/main.rs
@@ -10,8 +10,8 @@
 use aws_config::BehaviorVersion;
 use aws_runtime::env_config::file::{EnvConfigFileKind, EnvConfigFiles};
 use std::num::NonZeroUsize;
 
-use crate::clients::dynamodb_client::DynamoDBClient;
-use crate::clients::s3_client::S3Client;
+use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper;
+use crate::clients::s3_clients::wrapper::S3ClientWrapper;
 use crate::clients::sqs_clients::wrapper::SqsClientWrapper;
 use crate::worker::EngineBuilder;
@@ -25,6 +25,13 @@
 const BUCKET_NAME_DEFAULT: &str = "zksync-compilation-s3";
 
 #[tokio::main]
 async fn main() {
+    tracing_subscriber::fmt()
+        .with_max_level(tracing::Level::INFO)
+        .with_ansi(false)
+        .without_time() // CloudWatch will add the ingestion time
+        .with_target(false)
+        .init();
+
     let profile_name = std::env::var("AWS_PROFILE").unwrap_or(AWS_PROFILE_DEFAULT.into());
     let profile_files = EnvConfigFiles::builder()
         .with_file(EnvConfigFileKind::Credentials, "./credentials")
@@ -42,11 +49,11 @@
     // Initialize DynamoDb client
     let db_client = aws_sdk_dynamodb::Client::new(&config);
-    let db_client = DynamoDBClient::new(db_client, TABLE_NAME_DEFAULT);
+    let db_client = DynamoDBClientWrapper::new(db_client, TABLE_NAME_DEFAULT);
 
     // Initialize S3 client
     let s3_client = aws_sdk_s3::Client::new(&config);
-    let s3_client = S3Client::new(s3_client, BUCKET_NAME_DEFAULT);
+    let s3_client = S3ClientWrapper::new(s3_client, BUCKET_NAME_DEFAULT);
 
     let engine = EngineBuilder::new(sqs_client, db_client, s3_client);
     let running_engine = engine.start(NonZeroUsize::new(10).unwrap());
diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs
index b6a8349..41336e3 100644
--- a/crates/worker/src/purgatory.rs
+++ b/crates/worker/src/purgatory.rs
@@ -1,4 +1,3 @@
-use aws_sdk_dynamodb::types::AttributeValue;
 use chrono::{DateTime, Utc};
 use std::collections::HashMap;
 use std::marker::PhantomData;
@@ -11,9 +10,10 @@
 use tracing::warn;
 use types::item::{Item, ItemError, Status, TaskResult};
 use uuid::Uuid;
 
-use crate::clients::dynamodb_client::DynamoDBClient;
-use crate::clients::s3_client::S3Client;
-use crate::errors::GlobalPurgeError;
+use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper;
+use crate::clients::errors::DBError;
+use crate::clients::s3_clients::wrapper::S3ClientWrapper;
+use crate::errors::PurgeError;
 use crate::utils::lib::{s3_artifacts_dir, s3_compilation_files_dir, timestamp};
 
 pub type Timestamp = u64;
@@ -26,7 +26,7 @@
 }
 
 impl Purgatory {
-    pub fn new(db_client: DynamoDBClient, s3_client: S3Client) -> Self {
+    pub fn new(db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper) -> Self {
         let handle = NonNull::dangling();
         let this = Self {
             inner: Arc::new(Mutex::new(Inner::new(
@@ -66,8 +66,8 @@
 }
 
 struct Inner {
     state: State,
-    s3_client: S3Client,
-    db_client: DynamoDBClient,
+    s3_client: S3ClientWrapper,
+    db_client: DynamoDBClientWrapper,
 
     // No aliases possible since only we own the data
     handle: NonNull<JoinHandle<()>>,
@@ -88,8 +88,8 @@
     fn new(
         handle: NonNull<JoinHandle<()>>,
         state: State,
-        db_client: DynamoDBClient,
-        s3_client: S3Client,
+        db_client: DynamoDBClientWrapper,
+        s3_client: S3ClientWrapper,
     ) -> Self {
         tokio::spawn(Self::global_state_purge(
             db_client.clone(),
@@ -114,33 +114,36 @@
             .push((id, to_purge_timestampe));
     }
 
-    async fn global_state_purge(db_client: DynamoDBClient, s3_client: S3Client) {
+    async fn global_state_purge(
+        db_client: DynamoDBClientWrapper,
+        s3_client: S3ClientWrapper,
+    ) -> Result<(), PurgeError> {
         const SYNC_FROM_OFFSET: Option<Duration> = PURGE_INTERVAL.checked_mul(6);
 
-        let mut global_state = GlobalState::new(db_client.clone(), s3_client.clone());
+        let mut global_state = GlobalState::new(db_client.clone());
         let sync_from = Utc::now() - SYNC_FROM_OFFSET.unwrap();
 
         loop {
             if global_state.sync(&sync_from).await.is_err() {
-                break;
+                break Ok(());
             }
 
             if global_state.items.is_empty() {
-                break;
+                break Ok(());
             }
 
             let items: Vec<Item> = global_state.items.drain(..).collect();
             for item in items {
-                Inner::purge_item(&db_client, &s3_client, &item.id, &item.status).await;
+                Inner::purge_item(&db_client, &s3_client, &item.id, &item.status).await?;
             }
         }
     }
 
     pub async fn purge_item(
-        db_client: &DynamoDBClient,
-        s3_client: &S3Client,
+        db_client: &DynamoDBClientWrapper,
+        s3_client: &S3ClientWrapper,
         id: &Uuid,
         status: &Status,
-    ) {
+    ) -> Result<(), PurgeError> {
         match status {
             Status::InProgress => warn!("Item compiling for too long!"),
             Status::Pending => {
@@ -148,30 +151,32 @@
 
                 // Remove compilation files
                 let files_dir = s3_compilation_files_dir(id.to_string().as_str());
-                s3_client.delete_dir(&files_dir).await.unwrap();
+                s3_client.delete_dir(&files_dir).await?;
 
                 // Remove artifacts
                 let artifacts_dir = s3_compilation_files_dir(id.to_string().as_str());
-                s3_client.delete_dir(&artifacts_dir).await.unwrap(); // TODO: fix
+                s3_client.delete_dir(&artifacts_dir).await?;
 
                 // Second would give neater replies
                 db_client
                     .delete_item(id.to_string().as_str())
                     .await
-                    .unwrap();
+                    .map_err(DBError::from)?;
             }
             Status::Done(_) => {
                 let dir = s3_artifacts_dir(id.to_string().as_str());
-                s3_client.delete_dir(&dir).await.unwrap(); // TODO: fix
+                s3_client.delete_dir(&dir).await?;
 
                 db_client
                     .delete_item(id.to_string().as_str())
                     .await
-                    .unwrap();
+                    .map_err(DBError::from)?;
             }
         }
+
+        Ok(())
     }
 
-    pub async fn purge(&mut self) {
+    pub async fn purge(&mut self) -> Result<(), PurgeError> {
         let now = timestamp();
         for (id, timestamp) in self.state.expiration_timestamps.iter() {
             if *timestamp > now {
@@ -185,7 +190,7 @@
                 continue;
             };
 
-            Self::purge_item(&self.db_client, &self.s3_client, &id, &status).await;
+            Self::purge_item(&self.db_client, &self.s3_client, &id, &status).await?;
         }
 
         self.state.expiration_timestamps.retain(|(id, timestamp)| {
@@ -196,45 +201,34 @@
             self.state.task_status.remove(id);
             false
         });
+
+        Ok(())
     }
 }
 
 struct GlobalState {
-    db_client: DynamoDBClient,
-    s3_client: S3Client,
+    db_client: DynamoDBClientWrapper,
 
     pub items: Vec<Item>,
 }
 
 impl GlobalState {
-    pub fn new(db_client: DynamoDBClient, s3_client: S3Client) -> Self {
+    pub fn new(db_client: DynamoDBClientWrapper) -> Self {
         Self {
             db_client,
-            s3_client,
            items: vec![],
         }
     }
 
-    pub async fn sync(&mut self, sync_from: &DateTime<Utc>) -> Result<(), GlobalPurgeError> {
+    pub async fn sync(&mut self, sync_from: &DateTime<Utc>) -> Result<(), PurgeError> {
         const MAX_CAPACITY: usize = 1000;
 
         let mut last_evaluated_key = None;
         loop {
             let output = self
                 .db_client
-                .client
-                .scan()
-                .table_name(self.db_client.table_name.clone())
-                .filter_expression("CreatedAt <= :created_at")
-                .expression_attribute_values(
-                    ":created_at",
-                    AttributeValue::S(sync_from.to_rfc3339()),
-                )
-                .limit(MAX_CAPACITY as i32)
-                .set_exclusive_start_key(last_evaluated_key)
-                .send()
+                .scan_items_prior_to(sync_from, &last_evaluated_key)
                 .await
-                .unwrap();
-
+                .map_err(DBError::from)?;
             let raw_items = if let Some(items) = output.items {
                 items
             } else {
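`GlobalState::sync` keeps the standard DynamoDB scan-pagination handshake, now routed through `scan_items_prior_to`: feed each page's `last_evaluated_key` back in until none is returned. The loop tail falls below the cut here, so this is a sketch of the expected shape rather than the literal code:

    // Hypothetical plumbing around the wrapper's scan call.
    async fn scan_all(
        db_client: &DynamoDBClientWrapper,
        sync_from: DateTime<Utc>,
    ) -> Result<(), PurgeError> {
        let mut last_evaluated_key = None;
        loop {
            let output = db_client
                .scan_items_prior_to(&sync_from, &last_evaluated_key)
                .await
                .map_err(DBError::from)?;
            // ... convert output.items into Items and collect them ...
            last_evaluated_key = output.last_evaluated_key().cloned();
            if last_evaluated_key.is_none() {
                break; // final page reached
            }
        }
        Ok(())
    }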
diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs
index aac3822..11a2f0b 100644
--- a/crates/worker/src/worker.rs
+++ b/crates/worker/src/worker.rs
@@ -1,33 +1,34 @@
 use std::num::NonZeroUsize;
 use std::time::Duration;
 use tokio::task::JoinHandle;
-use tracing::{error, info, warn};
+use tracing::{error, warn};
 use types::{CompilationRequest, SqsMessage, VerificationRequest};
 
-use crate::clients::dynamodb_client::DynamoDBClient;
-use crate::clients::s3_client::S3Client;
+use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper;
+use crate::clients::s3_clients::wrapper::S3ClientWrapper;
 use crate::clients::sqs_clients::wrapper::SqsClientWrapper;
-use crate::commands::compile::do_compile;
+use crate::commands::compile::{do_compile, CompilationInput};
 use crate::commands::errors::PreparationError;
 use crate::commands::utils::{
     on_compilation_failed, on_compilation_success, prepare_compile_input,
 };
+use crate::errors::MessageProcessorError;
 use crate::purgatory::Purgatory;
 use crate::sqs_listener::{SqsListener, SqsReceiver};
 use crate::utils::lib::s3_compilation_files_dir;
 
 pub struct EngineBuilder {
     sqs_client: SqsClientWrapper,
-    db_client: DynamoDBClient,
-    s3_client: S3Client,
+    db_client: DynamoDBClientWrapper,
+    s3_client: S3ClientWrapper,
     running_workers: Vec<RunningEngine>,
 }
 
 impl EngineBuilder {
     pub fn new(
         sqs_client: SqsClientWrapper,
-        db_client: DynamoDBClient,
-        s3_client: S3Client,
+        db_client: DynamoDBClientWrapper,
+        s3_client: S3ClientWrapper,
     ) -> Self {
         EngineBuilder {
             sqs_client,
@@ -59,8 +60,8 @@
 impl RunningEngine {
     pub fn new(
         sqs_listener: SqsListener,
-        db_client: DynamoDBClient,
-        s3_client: S3Client,
+        db_client: DynamoDBClientWrapper,
+        s3_client: S3ClientWrapper,
         num_workers: usize,
     ) -> Self {
         let purgatory = Purgatory::new(db_client.clone(), s3_client.clone());
@@ -89,8 +90,8 @@
 
     async fn worker(
         sqs_receiver: SqsReceiver,
-        db_client: DynamoDBClient,
-        s3_client: S3Client,
+        db_client: DynamoDBClientWrapper,
+        s3_client: S3ClientWrapper,
         mut purgatory: Purgatory,
     ) {
         // TODO: process error
@@ -128,7 +129,7 @@
             // adjust "visibility timeout" or receiver chan capacity
             match sqs_message {
                 SqsMessage::Compile { request } => {
-                    Self::process_compile_message(
+                    let result = Self::process_compile_message(
                         request,
                         receipt_handle,
                         &sqs_receiver,
@@ -136,7 +137,10 @@
                         &s3_client,
                         &mut purgatory,
                     )
-                    .await
+                    .await;
+                    if let Err(err) = result {
+                        error!("{}", err);
+                    }
                 }
                 SqsMessage::Verify { request } => {
                     Self::process_verify_message(request, receipt_handle, &sqs_receiver).await
@@ -150,79 +154,85 @@
         request: CompilationRequest,
         receipt_handle: String,
         sqs_receiver: &SqsReceiver,
-        db_client: &DynamoDBClient,
-        s3_client: &S3Client,
+        db_client: &DynamoDBClientWrapper,
+        s3_client: &S3ClientWrapper,
         purgatory: &mut Purgatory,
-    ) {
+    ) -> Result<(), MessageProcessorError> {
+        let compilation_input = Self::handle_prepare_compile_input(
+            &request,
+            &receipt_handle,
+            sqs_receiver,
+            db_client,
+            s3_client,
+        )
+        .await?;
+
         let id = request.id;
-        let compilation_input = match prepare_compile_input(&request, db_client, s3_client).await {
-            Ok(value) => value,
+        let task_result = match do_compile(compilation_input).await {
+            Ok(value) => on_compilation_success(id, &db_client, &s3_client, value).await?,
+            Err(err) => on_compilation_failed(id, &db_client, err.to_string()).await?,
+        };
+        purgatory.add_record(id, task_result).await;
+
+        // Clean compilation input files right away
+        let dir = s3_compilation_files_dir(id.to_string().as_str());
+        s3_client.delete_dir(&dir).await?;
+
+        sqs_receiver.delete_message(receipt_handle).await?;
+        Ok(())
+    }
+
+    // TODO(future me): extract in a class
+    pub(crate) async fn handle_prepare_compile_input(
+        request: &CompilationRequest,
+        receipt_handle: &str,
+        sqs_receiver: &SqsReceiver,
+        db_client: &DynamoDBClientWrapper,
+        s3_client: &S3ClientWrapper,
+    ) -> Result<CompilationInput, MessageProcessorError> {
+        let id = request.id;
+        let result = match prepare_compile_input(&request, db_client, s3_client).await {
+            Ok(value) => Ok(value),
             Err(PreparationError::NoDBItemError(err)) => {
                 // Possible in case GlobalState purges old message
                 // that somehow stuck in queue for too long
-                error!("{}", PreparationError::NoDBItemError(err));
-                if let Err(err) = sqs_receiver.delete_message(receipt_handle).await {
-                    warn!("{}", err);
-                }
-                return;
+                sqs_receiver.delete_message(receipt_handle).await?;
+                Err(PreparationError::NoDBItemError(err))
             }
             Err(PreparationError::UnexpectedStatusError(err)) => {
                 // Probably some other instance executing this at the same time.
                 // For sake of safety still try to delete it. Doesn't matter if succeeds.
                 // No need to clean up s3
-                info!("{}", PreparationError::UnexpectedStatusError(err));
-                if let Err(err) = sqs_receiver.delete_message(receipt_handle).await {
-                    warn!("{}", err);
-                }
-
-                return;
+                sqs_receiver.delete_message(receipt_handle).await?;
+                Err(PreparationError::UnexpectedStatusError(err))
             }
             Err(PreparationError::VersionNotSupported(err)) => {
                 // Clean everything since the request failed
                 let dir = s3_compilation_files_dir(id.to_string().as_str());
-                s3_client.delete_dir(&dir).await.unwrap(); // TODO: do those retriable
+                s3_client.delete_dir(&dir).await?;
 
                 // This error doesn't create any artifacts
                 let _ = on_compilation_failed(
                     id,
                     &db_client,
-                    PreparationError::VersionNotSupported(err).to_string(),
+                    PreparationError::VersionNotSupported(err.clone()).to_string(),
                 )
-                .await
-                .unwrap();
+                .await?;
 
-                if let Err(err) = sqs_receiver.delete_message(receipt_handle).await {
-                    warn!("{}", err);
-                }
-                return;
+                sqs_receiver.delete_message(receipt_handle).await?;
+                Err(PreparationError::VersionNotSupported(err))
             }
             Err(PreparationError::S3Error(err)) => {
-                warn!("S3Error during preparation - ignoring. {}", err);
-                return;
+                // Certain cases don't require delete_message
+                Err(PreparationError::S3Error(err))
             }
             Err(PreparationError::DBError(err)) => {
-                warn!("DBError during preparation - ignoring. {}", err);
-                return;
+                // Certain cases don't require delete_message
+                Err(PreparationError::DBError(err))
             }
         };
 
-        let task_result = match do_compile(compilation_input).await {
-            Ok(value) => on_compilation_success(id, &db_client, &s3_client, value)
-                .await
-                .unwrap(), // TODO: unwraps
-            Err(err) => on_compilation_failed(id, &db_client, err.to_string())
-                .await
-                .unwrap(),
-        };
-        purgatory.add_record(id, task_result).await;
-
-        // Clean compilation input files right away
-        let dir = s3_compilation_files_dir(id.to_string().as_str());
-        s3_client.delete_dir(&dir).await.unwrap();
-
-        if let Err(err) = sqs_receiver.delete_message(receipt_handle).await {
-            warn!("{}", err);
-        }
+        result.map_err(MessageProcessorError::from)
     }
 
     async fn process_verify_message(