From 0a28196d079a20bc1a939a4116fc42927e5077c5 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Sat, 14 Sep 2024 19:46:20 +0900 Subject: [PATCH 1/9] feat: generic retrier introduced --- crates/worker/src/clients.rs | 1 + crates/worker/src/clients/retriable.rs | 141 ++++++++++++++++++ crates/worker/src/clients/s3_client.rs | 61 +++++++- .../worker/src/clients/sqs_clients/client.rs | 71 +++++---- .../worker/src/clients/sqs_clients/wrapper.rs | 105 +------------ 5 files changed, 242 insertions(+), 137 deletions(-) create mode 100644 crates/worker/src/clients/retriable.rs diff --git a/crates/worker/src/clients.rs b/crates/worker/src/clients.rs index e642bf5..59001ef 100644 --- a/crates/worker/src/clients.rs +++ b/crates/worker/src/clients.rs @@ -1,4 +1,5 @@ pub mod dynamodb_client; pub mod errors; +mod retriable; pub mod s3_client; pub mod sqs_clients; diff --git a/crates/worker/src/clients/retriable.rs b/crates/worker/src/clients/retriable.rs new file mode 100644 index 0000000..ba615a6 --- /dev/null +++ b/crates/worker/src/clients/retriable.rs @@ -0,0 +1,141 @@ +use crate::clients::errors::S3DeleteObjectError; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::{sleep, Instant}; + +macro_rules! match_result { + ($err_type:ident, $result:expr) => { + match $result { + Ok(val) => Ok(Some(val)), + Err(err) => match err { + $err_type::ConstructionFailure(_) => Err(err), + $err_type::TimeoutError(_) => Ok(None), + $err_type::DispatchFailure(dispatch_err) => { + if dispatch_err.is_io() { + Ok(None) + } else if dispatch_err.is_timeout() { + Ok(None) + } else if dispatch_err.is_user() { + Err($err_type::DispatchFailure(dispatch_err)) + } else if let Some(other) = dispatch_err.as_other() { + match other { + aws_config::retry::ErrorKind::ClientError => { + Err($err_type::DispatchFailure(dispatch_err)) + } + _ => Ok(None), + } + } else { + Err($err_type::DispatchFailure(dispatch_err)) + } + } + other => Err(other), + }, + } + }; +} +pub(crate) use match_result; + +pub trait ActionHandler { + type Action: Default; + async fn handle(&self, action: Self::Action, state: Arc) -> Option; +} + +pub(crate) fn handle_action_result( + result: Result, E>, + sender: oneshot::Sender>, + state: Arc, +) -> Option>> +where + T: Send + 'static, + E: Send + 'static, +{ + match result { + Ok(Some(val)) => { + state.store(State::Connected as u8, Ordering::Release); + let _ = sender.send(Ok(val)); + None + } + Err(err) => { + let _ = sender.send(Err(err)); + None + } + Ok(None) => { + state.store(State::Reconnecting as u8, Ordering::Release); + Some(sender) + } + } +} +pub enum State { + Connected = 0, + Reconnecting = 1, +} + +pub struct Retrier { + client: T, + receiver: mpsc::Receiver, + state: Arc, +} + +impl Retrier { + pub fn new(client: T, receiver: mpsc::Receiver, state: Arc) -> Self { + Self { + client, + receiver, + state, + } + } + + pub async fn start(mut self) { + const SLEEP_DURATION: Duration = Duration::from_secs(3); + // add lru instead + let mut pending_actions = vec![]; + + loop { + if pending_actions.is_empty() { + if let Some(action) = self.receiver.recv().await { + pending_actions.push(action); + } else { + return; + } + } + + self.resend_pending_actions(&mut pending_actions).await; + + let start_time = Instant::now(); + let value = select! { + value = self.receiver.recv() => value, + _ = sleep(SLEEP_DURATION) => continue, + }; + + if let Some(action) = value { + pending_actions.push(action); + } else { + return; + } + + let elapsed = start_time.elapsed(); + if let Some(remaining_sleep) = SLEEP_DURATION.checked_sub(elapsed) { + sleep(remaining_sleep).await; + } + } + } + + pub async fn resend_pending_actions(&self, pending_actions: &mut Vec) { + let mut pivot = 0; + for i in 0..pending_actions.len() { + let action = std::mem::take(&mut pending_actions[i]); + let action_unhandled = self.client.handle(action, self.state.clone()).await; + + // Keeping in the array to resend. + if let Some(action) = action_unhandled { + pending_actions[pivot] = action; + pivot += 1; + } + } + + pending_actions.truncate(pivot); + } +} diff --git a/crates/worker/src/clients/s3_client.rs b/crates/worker/src/clients/s3_client.rs index 265f962..8453c15 100644 --- a/crates/worker/src/clients/s3_client.rs +++ b/crates/worker/src/clients/s3_client.rs @@ -4,9 +4,13 @@ use aws_sdk_s3::Client; use aws_smithy_types::byte_stream::ByteStream; use std::io::Write; use std::path::Path; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; +use tokio::sync::oneshot; use tracing::{error, warn}; -use crate::clients::errors::S3Error; +use crate::clients::errors::{S3DeleteObjectError, S3Error}; +use crate::clients::retriable::{handle_action_result, match_result, ActionHandler, State}; use crate::commands::compile::CompilationFile; #[derive(Clone)] @@ -108,10 +112,11 @@ impl S3Client { self.delete_object(key).await?; } - self.delete_object(dir).await + self.delete_object(dir).await?; + Ok(()) } - pub async fn delete_object(&self, key: &str) -> Result<(), S3Error> { + pub async fn delete_object(&self, key: &str) -> Result<(), S3DeleteObjectError> { let _ = self .client .delete_object() @@ -119,6 +124,7 @@ impl S3Client { .key(key) .send() .await?; + Ok(()) } @@ -163,3 +169,52 @@ impl S3Client { Ok(objects) } } + +#[derive(Default)] +pub enum S3Action { + #[default] + Default, + DeleteDir { + dir: String, + sender: oneshot::Sender>, + }, + DeleteObject { + key: String, + sender: oneshot::Sender>, + }, +} + +impl ActionHandler for S3Client { + type Action = S3Action; + async fn handle(&self, action: Self::Action, state: Arc) -> Option { + match action { + S3Action::Default => unreachable!(), + S3Action::DeleteDir { dir, sender } => { + let result = self.delete_dir(&dir).await; + match result { + Ok(()) => { + state.store(State::Connected as u8, Ordering::Release); + let _ = sender.send(Ok(())); + None + } + Err(S3Error::DeleteObjectError(err)) => { + let result = match_result!(S3DeleteObjectError, Err(err)); + let result = result.map_err(S3Error::from); + handle_action_result(result, sender, state) + .map(|sender| S3Action::DeleteDir { dir, sender }) + } + Err(err) => { + let _ = sender.send(Err(err)); + None + } + } + } + S3Action::DeleteObject { key, sender } => { + let result = self.delete_object(&key).await; + let result = match_result!(S3DeleteObjectError, result); + handle_action_result(result, sender, state) + .map(|sender| S3Action::DeleteObject { key, sender }) + } + } + } +} diff --git a/crates/worker/src/clients/sqs_clients/client.rs b/crates/worker/src/clients/sqs_clients/client.rs index e22b477..d9db088 100644 --- a/crates/worker/src/clients/sqs_clients/client.rs +++ b/crates/worker/src/clients/sqs_clients/client.rs @@ -1,41 +1,12 @@ use aws_sdk_sqs::operation::delete_message::DeleteMessageOutput; use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput; use aws_sdk_sqs::Client; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; +use tokio::sync::oneshot; use crate::clients::errors::{SqsDeleteError, SqsReceiveError}; - -macro_rules! match_result { - ($err_type:ident, $result:expr) => { - match $result { - Ok(val) => Ok(Some(val)), - Err(err) => match err { - $err_type::ConstructionFailure(_) => Err(err), - $err_type::TimeoutError(_) => Ok(None), - $err_type::DispatchFailure(dispatch_err) => { - if dispatch_err.is_io() { - return Ok(None); - } - if dispatch_err.is_timeout() { - return Ok(None); - } - if dispatch_err.is_user() { - return Err($err_type::DispatchFailure(dispatch_err)); - } - if let Some(other) = dispatch_err.as_other() { - return match other { - aws_config::retry::ErrorKind::ClientError => { - Err($err_type::DispatchFailure(dispatch_err)) - } - _ => Ok(None), - }; - } - Err($err_type::DispatchFailure(dispatch_err)) - } - other => Err(other), - }, - } - }; -} +use crate::clients::retriable::{handle_action_result, match_result, ActionHandler}; #[derive(Clone)] pub struct SqsClient { @@ -78,3 +49,37 @@ impl SqsClient { match_result!(SqsDeleteError, result) } } + +#[derive(Default)] +pub enum Action { + #[default] + Default, // TODO: get rid of this. crutches + Receive(oneshot::Sender>), + Delete { + receipt_handle: String, + sender: oneshot::Sender>, + }, +} + +impl ActionHandler for SqsClient { + type Action = Action; + async fn handle(&self, action: Action, state: Arc) -> Option { + match action { + Action::Default => unreachable!(), + Action::Receive(sender) => { + let result = self.receive_attempt().await; + handle_action_result(result, sender, state).map(|sender| Action::Receive(sender)) + } + Action::Delete { + receipt_handle, + sender, + } => { + let result = self.delete_attempt(receipt_handle.clone()).await; + handle_action_result(result, sender, state).map(|sender| Action::Delete { + sender, + receipt_handle, + }) + } + } + } +} diff --git a/crates/worker/src/clients/sqs_clients/wrapper.rs b/crates/worker/src/clients/sqs_clients/wrapper.rs index 76b6b6f..bbad884 100644 --- a/crates/worker/src/clients/sqs_clients/wrapper.rs +++ b/crates/worker/src/clients/sqs_clients/wrapper.rs @@ -9,23 +9,8 @@ use tokio::sync::{mpsc, oneshot}; use tokio::time::{sleep, Instant}; use crate::clients::errors::{SqsDeleteError, SqsReceiveError}; -use crate::clients::sqs_clients::client::SqsClient; - -#[derive(Default)] -pub enum Action { - #[default] - Default, // TODO: get rid of this. crutches - Receive(oneshot::Sender>), - Delete { - receipt_handle: String, - sender: oneshot::Sender>, - }, -} - -enum State { - Connected = 0, - Reconnecting = 1, -} +use crate::clients::retriable::{Retrier, State}; +use crate::clients::sqs_clients::client::{Action, SqsClient}; #[derive(Clone)] pub struct SqsClientWrapper { @@ -40,7 +25,8 @@ impl SqsClientWrapper { let state = Arc::new(AtomicU8::new(State::Connected as u8)); let (sender, receiver) = mpsc::channel(1000); - tokio::spawn(Self::worker(client.clone(), state.clone(), receiver)); + let retrier = Retrier::new(client.clone(), receiver, state.clone()); + tokio::spawn(retrier.start()); Self { client, @@ -49,89 +35,6 @@ impl SqsClientWrapper { } } - async fn worker(client: SqsClient, state: Arc, mut receiver: mpsc::Receiver) { - const SLEEP_DURATION: Duration = Duration::from_secs(3); - let mut pending_actions = vec![]; - - loop { - if pending_actions.is_empty() { - if let Some(action) = receiver.recv().await { - pending_actions.push(action); - } else { - return; - } - } - - Self::resend_pending_actions(&mut pending_actions, &client, &state).await; - - let start_time = Instant::now(); - let value = select! { - value = receiver.recv() => value, - _ = sleep(SLEEP_DURATION) => continue, - }; - - if let Some(action) = value { - pending_actions.push(action); - } else { - return; - } - - let elapsed = start_time.elapsed(); - if let Some(remaining_sleep) = SLEEP_DURATION.checked_sub(elapsed) { - sleep(remaining_sleep).await; - } - } - } - - pub async fn resend_pending_actions( - pending_actions: &mut Vec, - client: &SqsClient, - state: &Arc, - ) { - let mut pivot = 0; - for i in 0..pending_actions.len() { - let action = std::mem::take(&mut pending_actions[i]); - match action { - Action::Receive(sender) => match client.receive_attempt().await { - Ok(Some(val)) => { - state.store(State::Connected as u8, Ordering::Release); - let _ = sender.send(Ok(val)); - } - Err(err) => { - let _ = sender.send(Err(err)); - } - Ok(None) => { - // Keeping in the array to resend. - pending_actions[pivot] = Action::Receive(sender); - pivot += 1; - } - }, - Action::Delete { - receipt_handle, - sender, - } => match client.delete_attempt(receipt_handle.clone()).await { - Ok(Some(val)) => { - state.store(State::Connected as u8, Ordering::Release); - let _ = sender.send(Ok(val)); - } - Err(err) => { - let _ = sender.send(Err(err)); - } - Ok(None) => { - pending_actions[pivot] = Action::Delete { - receipt_handle, - sender, - }; - pivot += 1; - } - }, - Action::Default => unreachable!(), - }; - } - - pending_actions.truncate(pivot); - } - pub async fn receive_message(&self) -> Result { match self.state.load(Ordering::Acquire) { 0 => match self.client.receive_attempt().await { From b1b8a5053134c13c3b1e9a941433d6e04589814f Mon Sep 17 00:00:00 2001 From: taco-paco Date: Mon, 16 Sep 2024 18:41:05 +0900 Subject: [PATCH 2/9] feat: s3 retriable client --- crates/worker/src/clients.rs | 2 +- crates/worker/src/clients/retriable.rs | 7 +- .../{s3_client.rs => s3_clients/client.rs} | 165 +++++++++++++---- crates/worker/src/clients/s3_clients/mod.rs | 2 + .../worker/src/clients/s3_clients/wrapper.rs | 168 ++++++++++++++++++ .../worker/src/clients/sqs_clients/client.rs | 2 +- .../worker/src/clients/sqs_clients/wrapper.rs | 3 - crates/worker/src/commands/utils.rs | 19 +- crates/worker/src/errors.rs | 19 +- crates/worker/src/main.rs | 4 +- crates/worker/src/purgatory.rs | 49 ++--- crates/worker/src/worker.rs | 114 ++++++------ 12 files changed, 428 insertions(+), 126 deletions(-) rename crates/worker/src/clients/{s3_client.rs => s3_clients/client.rs} (54%) create mode 100644 crates/worker/src/clients/s3_clients/mod.rs create mode 100644 crates/worker/src/clients/s3_clients/wrapper.rs diff --git a/crates/worker/src/clients.rs b/crates/worker/src/clients.rs index 59001ef..3d221f6 100644 --- a/crates/worker/src/clients.rs +++ b/crates/worker/src/clients.rs @@ -1,5 +1,5 @@ pub mod dynamodb_client; pub mod errors; mod retriable; -pub mod s3_client; +pub mod s3_clients; pub mod sqs_clients; diff --git a/crates/worker/src/clients/retriable.rs b/crates/worker/src/clients/retriable.rs index ba615a6..a42d39e 100644 --- a/crates/worker/src/clients/retriable.rs +++ b/crates/worker/src/clients/retriable.rs @@ -1,4 +1,3 @@ -use crate::clients::errors::S3DeleteObjectError; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -39,7 +38,7 @@ macro_rules! match_result { pub(crate) use match_result; pub trait ActionHandler { - type Action: Default; + type Action: Default; // TODO: get rid of default async fn handle(&self, action: Self::Action, state: Arc) -> Option; } @@ -73,13 +72,13 @@ pub enum State { Reconnecting = 1, } -pub struct Retrier { +pub struct Retrier { client: T, receiver: mpsc::Receiver, state: Arc, } -impl Retrier { +impl Retrier { pub fn new(client: T, receiver: mpsc::Receiver, state: Arc) -> Self { Self { client, diff --git a/crates/worker/src/clients/s3_client.rs b/crates/worker/src/clients/s3_clients/client.rs similarity index 54% rename from crates/worker/src/clients/s3_client.rs rename to crates/worker/src/clients/s3_clients/client.rs index 8453c15..7082aed 100644 --- a/crates/worker/src/clients/s3_client.rs +++ b/crates/worker/src/clients/s3_clients/client.rs @@ -2,15 +2,19 @@ use aws_sdk_s3::presigning::{PresignedRequest, PresigningConfig}; use aws_sdk_s3::types::Object; use aws_sdk_s3::Client; use aws_smithy_types::byte_stream::ByteStream; -use std::io::Write; +use std::io::{SeekFrom, Write}; use std::path::Path; -use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::atomic::AtomicU8; use std::sync::Arc; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::sync::oneshot; use tracing::{error, warn}; -use crate::clients::errors::{S3DeleteObjectError, S3Error}; -use crate::clients::retriable::{handle_action_result, match_result, ActionHandler, State}; +use crate::clients::errors::{ + S3DeleteObjectError, S3Error, S3GetObjectError, S3ListObjectsError, S3PutObjectError, +}; +use crate::clients::retriable::{handle_action_result, match_result, ActionHandler}; use crate::commands::compile::CompilationFile; #[derive(Clone)] @@ -54,6 +58,22 @@ impl S3Client { Ok(files) } + pub async fn extract_files_attempt( + &self, + dir: &str, + ) -> Result>, S3Error> { + let result = self.extract_files(dir).await; + match result { + Err(S3Error::ListObjectsError(err)) => { + match_result!(S3ListObjectsError, Err(err)).map_err(S3Error::from) + } + Err(S3Error::GetObjectError(err)) => { + match_result!(S3GetObjectError, Err(err)).map_err(S3Error::from) + } + result => result.map(|value| Some(value)), + } + } + pub async fn get_object_into(&self, key: &str, writer: &mut impl Write) -> Result<(), S3Error> { let mut object = self .client @@ -80,19 +100,32 @@ impl S3Client { pub async fn get_object_presigned( &self, key: &str, - expires_in: PresigningConfig, - ) -> Result { - Ok(self - .client + expires_in: &PresigningConfig, + ) -> Result { + self.client .get_object() .bucket(self.bucket_name.clone()) .key(key.to_string()) - .presigned(expires_in) + .presigned(expires_in.clone()) .await - .map_err(S3Error::from)?) } - pub async fn put_object(&self, key: &str, data: impl Into) -> Result<(), S3Error> { + pub async fn get_object_presigned_attempt( + &self, + key: &str, + expires_in: &PresigningConfig, + ) -> Result, S3GetObjectError> { + match_result!( + S3GetObjectError, + self.get_object_presigned(key, expires_in).await + ) + } + + pub async fn put_object( + &self, + key: &str, + data: impl Into, + ) -> Result<(), S3PutObjectError> { let _ = self .client .put_object() @@ -105,8 +138,17 @@ impl S3Client { Ok(()) } + pub async fn put_object_attempt( + &self, + key: &str, + data: impl Into, + ) -> Result, S3PutObjectError> { + match_result!(S3PutObjectError, self.put_object(key, data).await) + } + pub async fn delete_dir(&self, dir: &str) -> Result<(), S3Error> { let objects = self.list_all_keys(dir).await?; + // TODO: delete_objects instead for object in objects { let key = object.key().ok_or(S3Error::InvalidObjectError)?; self.delete_object(key).await?; @@ -116,6 +158,15 @@ impl S3Client { Ok(()) } + pub async fn delete_dir_attempt(&self, dir: &str) -> Result, S3Error> { + match self.delete_dir(dir).await { + Err(S3Error::DeleteObjectError(err)) => { + match_result!(S3DeleteObjectError, Err(err)).map_err(S3Error::from) + } + result => result.map(|value| Some(value)), + } + } + pub async fn delete_object(&self, key: &str) -> Result<(), S3DeleteObjectError> { let _ = self .client @@ -128,7 +179,14 @@ impl S3Client { Ok(()) } - pub async fn list_all_keys(&self, dir: &str) -> Result, S3Error> { + pub async fn delete_object_attempt( + &self, + key: &str, + ) -> Result, S3DeleteObjectError> { + match_result!(S3DeleteObjectError, self.delete_object(key).await) + } + + pub async fn list_all_keys(&self, dir: &str) -> Result, S3ListObjectsError> { let mut objects = Vec::new(); let mut continuation_token: Option = None; @@ -182,6 +240,20 @@ pub enum S3Action { key: String, sender: oneshot::Sender>, }, + ExtractFiles { + dir: String, + sender: oneshot::Sender, S3Error>>, + }, + PutObject { + key: String, + file: File, + sender: oneshot::Sender>, + }, + GetObjectPresigned { + key: String, + expires_in: PresigningConfig, + sender: oneshot::Sender>, + }, } impl ActionHandler for S3Client { @@ -190,31 +262,60 @@ impl ActionHandler for S3Client { match action { S3Action::Default => unreachable!(), S3Action::DeleteDir { dir, sender } => { - let result = self.delete_dir(&dir).await; - match result { - Ok(()) => { - state.store(State::Connected as u8, Ordering::Release); - let _ = sender.send(Ok(())); - None - } - Err(S3Error::DeleteObjectError(err)) => { - let result = match_result!(S3DeleteObjectError, Err(err)); - let result = result.map_err(S3Error::from); - handle_action_result(result, sender, state) - .map(|sender| S3Action::DeleteDir { dir, sender }) - } - Err(err) => { - let _ = sender.send(Err(err)); - None - } - } + let result = self.delete_dir_attempt(&dir).await; + handle_action_result(result, sender, state) + .map(|sender| S3Action::DeleteDir { dir, sender }) } S3Action::DeleteObject { key, sender } => { - let result = self.delete_object(&key).await; - let result = match_result!(S3DeleteObjectError, result); + let result = self.delete_object_attempt(&key).await; handle_action_result(result, sender, state) .map(|sender| S3Action::DeleteObject { key, sender }) } + S3Action::ExtractFiles { dir, sender } => { + let result = self.extract_files_attempt(&dir).await; + handle_action_result(result, sender, state) + .map(|sender| S3Action::ExtractFiles { dir, sender }) + } + S3Action::GetObjectPresigned { + key, + expires_in, + sender, + } => { + let result = self.get_object_presigned_attempt(&key, &expires_in).await; + handle_action_result(result, sender, state).map(|sender| { + S3Action::GetObjectPresigned { + key, + expires_in, + sender, + } + }) + } + S3Action::PutObject { + key, + mut file, + sender, + } => { + let mut buf = Vec::new(); + if let Err(err) = file.read_to_end(&mut buf).await { + let _ = sender.send(Err(S3Error::IoError(err))); + return None; + }; + + let result = self + .put_object_attempt(&key, buf) + .await + .map_err(S3Error::from); + if let Some(sender) = handle_action_result(result, sender, state) { + if let Err(err) = file.seek(SeekFrom::Start(0)).await { + let _ = sender.send(Err(S3Error::IoError(err))); + None + } else { + Some(S3Action::PutObject { key, file, sender }) + } + } else { + None + } + } } } } diff --git a/crates/worker/src/clients/s3_clients/mod.rs b/crates/worker/src/clients/s3_clients/mod.rs new file mode 100644 index 0000000..8402e4f --- /dev/null +++ b/crates/worker/src/clients/s3_clients/mod.rs @@ -0,0 +1,2 @@ +pub mod client; +pub mod wrapper; diff --git a/crates/worker/src/clients/s3_clients/wrapper.rs b/crates/worker/src/clients/s3_clients/wrapper.rs new file mode 100644 index 0000000..1869ae2 --- /dev/null +++ b/crates/worker/src/clients/s3_clients/wrapper.rs @@ -0,0 +1,168 @@ +use crate::clients::errors::{S3DeleteObjectError, S3Error, S3GetObjectError}; +use crate::clients::retriable::{Retrier, State}; +use aws_sdk_s3::presigning::{PresignedRequest, PresigningConfig}; +use aws_sdk_s3::Client; +use std::io::SeekFrom; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tokio::sync::{mpsc, oneshot}; + +use crate::clients::s3_clients::client::{S3Action, S3Client}; +use crate::commands::compile::CompilationFile; + +#[derive(Clone)] +pub struct S3ClientWrapper { + pub client: S3Client, + actions_sender: mpsc::Sender, + state: Arc, +} + +impl S3ClientWrapper { + pub fn new(client: Client, bucket_name: &str) -> Self { + let client = S3Client::new(client, bucket_name); + let state = Arc::new(AtomicU8::new(State::Connected as u8)); + let (sender, receiver) = mpsc::channel(1000); + + let retrier = Retrier::new(client.clone(), receiver, state.clone()); + tokio::spawn(retrier.start()); + + Self { + client, + state, + actions_sender: sender, + } + } + + pub async fn delete_dir(&self, dir: &str) -> Result<(), S3Error> { + match self.state.load(Ordering::Acquire) { + 0 => match self.client.delete_dir_attempt(dir).await { + Ok(Some(val)) => return Ok(val), + Ok(None) => self + .state + .store(State::Reconnecting as u8, Ordering::Release), + Err(err) => return Err(err), + }, + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + self.actions_sender + .send(S3Action::DeleteDir { + dir: dir.to_string(), + sender, + }) + .await; + receiver.await.unwrap() + } + + pub async fn delete_object(&self, key: &str) -> Result<(), S3DeleteObjectError> { + match self.state.load(Ordering::Acquire) { + 0 => match self.client.delete_object_attempt(key).await { + Ok(Some(val)) => return Ok(val), + Ok(None) => self + .state + .store(State::Reconnecting as u8, Ordering::Release), + Err(err) => return Err(err), + }, + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + self.actions_sender + .send(S3Action::DeleteObject { + key: key.to_string(), + sender, + }) + .await; + receiver.await.unwrap() + } + + pub async fn extract_files(&self, dir: &str) -> Result, S3Error> { + match self.state.load(Ordering::Acquire) { + 0 => match self.client.extract_files_attempt(dir).await { + Ok(Some(val)) => return Ok(val), + Ok(None) => self + .state + .store(State::Reconnecting as u8, Ordering::Release), + Err(err) => return Err(err), + }, + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + self.actions_sender + .send(S3Action::ExtractFiles { + dir: dir.to_string(), + sender, + }) + .await; + receiver.await.unwrap() + } + + pub async fn put_object(&self, key: &str, mut file: File) -> Result<(), S3Error> { + match self.state.load(Ordering::Acquire) { + 0 => { + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await?; + match self.client.put_object_attempt(key, buf).await { + Ok(Some(val)) => return Ok(val), + Ok(None) => { + self.state + .store(State::Reconnecting as u8, Ordering::Release); + file.seek(SeekFrom::Start(0)).await?; + } + Err(err) => return Err(err.into()), + } + } + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + self.actions_sender + .send(S3Action::PutObject { + key: key.to_string(), + file, + sender, + }) + .await; + receiver.await.unwrap() + } + + pub async fn get_object_presigned( + &self, + key: &str, + expires_in: &PresigningConfig, + ) -> Result { + match self.state.load(Ordering::Acquire) { + 0 => match self + .client + .get_object_presigned_attempt(key, expires_in) + .await + { + Ok(Some(val)) => return Ok(val), + Ok(None) => self + .state + .store(State::Reconnecting as u8, Ordering::Release), + Err(err) => return Err(err), + }, + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + self.actions_sender + .send(S3Action::GetObjectPresigned { + key: key.to_string(), + expires_in: expires_in.clone(), + sender, + }) + .await; + receiver.await.unwrap() + } +} diff --git a/crates/worker/src/clients/sqs_clients/client.rs b/crates/worker/src/clients/sqs_clients/client.rs index d9db088..dc5d22c 100644 --- a/crates/worker/src/clients/sqs_clients/client.rs +++ b/crates/worker/src/clients/sqs_clients/client.rs @@ -1,7 +1,7 @@ use aws_sdk_sqs::operation::delete_message::DeleteMessageOutput; use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput; use aws_sdk_sqs::Client; -use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::atomic::AtomicU8; use std::sync::Arc; use tokio::sync::oneshot; diff --git a/crates/worker/src/clients/sqs_clients/wrapper.rs b/crates/worker/src/clients/sqs_clients/wrapper.rs index bbad884..b55cfa0 100644 --- a/crates/worker/src/clients/sqs_clients/wrapper.rs +++ b/crates/worker/src/clients/sqs_clients/wrapper.rs @@ -3,10 +3,7 @@ use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput; use aws_sdk_sqs::Client; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; -use std::time::Duration; -use tokio::select; use tokio::sync::{mpsc, oneshot}; -use tokio::time::{sleep, Instant}; use crate::clients::errors::{SqsDeleteError, SqsReceiveError}; use crate::clients::retriable::{Retrier, State}; diff --git a/crates/worker/src/commands/utils.rs b/crates/worker/src/commands/utils.rs index 6bb8887..1adb3ea 100644 --- a/crates/worker/src/commands/utils.rs +++ b/crates/worker/src/commands/utils.rs @@ -12,12 +12,13 @@ use types::{CompilationRequest, ARTIFACTS_FOLDER}; use uuid::Uuid; use crate::clients::dynamodb_client::DynamoDBClient; -use crate::clients::errors::DBError; -use crate::clients::s3_client::S3Client; +use crate::clients::errors::{DBError, S3Error}; +use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::commands::compile::{CompilationInput, CompilationOutput}; use crate::commands::errors::{CommandResultHandleError, PreparationError}; +use crate::sqs_listener::SqsReceiver; use crate::utils::cleaner::AutoCleanUp; -use crate::utils::lib::{SOL_ROOT, ZKSOLC_VERSIONS}; +use crate::utils::lib::{s3_compilation_files_dir, SOL_ROOT, ZKSOLC_VERSIONS}; async fn try_set_compiling_status( db_client: &DynamoDBClient, @@ -60,7 +61,7 @@ async fn try_set_compiling_status( pub(crate) async fn prepare_compile_input( request: &CompilationRequest, db_client: &DynamoDBClient, - s3_client: &S3Client, + s3_client: &S3ClientWrapper, ) -> Result { let zksolc_version = request.config.version.as_str(); if !ZKSOLC_VERSIONS.contains(&zksolc_version) { @@ -95,10 +96,11 @@ pub(crate) async fn prepare_compile_input( contracts: files, }) } + pub async fn on_compilation_success( id: Uuid, db_client: &DynamoDBClient, - s3_client: &S3Client, + s3_client: &S3ClientWrapper, compilation_output: CompilationOutput, ) -> Result { const DOWNLOAD_URL_EXPIRATION: Duration = Duration::from_secs(5 * 60 * 60); @@ -110,7 +112,7 @@ pub async fn on_compilation_success( let mut presigned_urls = Vec::with_capacity(compilation_output.artifacts_data.len()); for el in compilation_output.artifacts_data { let absolute_path = compilation_output.artifacts_dir.join(&el.file_path); - let file_content = tokio::fs::read(absolute_path).await?; + let file_content = tokio::fs::File::open(absolute_path).await?; let file_key = format!( "{}/{}/{}", @@ -122,8 +124,9 @@ pub async fn on_compilation_success( let expires_in = PresigningConfig::expires_in(DOWNLOAD_URL_EXPIRATION).unwrap(); let presigned_request = s3_client - .get_object_presigned(&file_key, expires_in) - .await?; + .get_object_presigned(&file_key, &expires_in) + .await + .map_err(S3Error::from)?; presigned_urls.push(presigned_request.uri().to_string()); } diff --git a/crates/worker/src/errors.rs b/crates/worker/src/errors.rs index 8e2e06b..d7bc43c 100644 --- a/crates/worker/src/errors.rs +++ b/crates/worker/src/errors.rs @@ -1,11 +1,26 @@ use types::item::ItemError; -use crate::clients::errors::DBError; +use crate::clients::errors::{DBError, S3Error, SqsDeleteError}; +use crate::commands::errors::{CommandResultHandleError, PreparationError}; #[derive(thiserror::Error, Debug)] -pub enum GlobalPurgeError { +pub enum PurgeError { #[error("DBError: {0}")] DBError(#[from] DBError), + #[error("S3Error: {0}")] + S3Error(#[from] S3Error), #[error("ItemError: {0}")] ItemError(#[from] ItemError), } + +#[derive(thiserror::Error, Debug)] +pub enum MessageProcessorError { + #[error("PreparationError: {0}")] + PreparationError(#[from] PreparationError), + #[error("CommandResultHandleError: {0}")] + CommandResultHandleError(#[from] CommandResultHandleError), + #[error("S3Error: {0}")] + S3Error(#[from] S3Error), + #[error("SqsDeleteError: {0}")] + SqsDeleteError(#[from] SqsDeleteError), +} diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 5af23ae..4457b85 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -11,7 +11,7 @@ use aws_runtime::env_config::file::{EnvConfigFileKind, EnvConfigFiles}; use std::num::NonZeroUsize; use crate::clients::dynamodb_client::DynamoDBClient; -use crate::clients::s3_client::S3Client; +use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::clients::sqs_clients::wrapper::SqsClientWrapper; use crate::worker::EngineBuilder; @@ -46,7 +46,7 @@ async fn main() { // Initialize S3 client let s3_client = aws_sdk_s3::Client::new(&config); - let s3_client = S3Client::new(s3_client, BUCKET_NAME_DEFAULT); + let s3_client = S3ClientWrapper::new(s3_client, BUCKET_NAME_DEFAULT); let engine = EngineBuilder::new(sqs_client, db_client, s3_client); let running_engine = engine.start(NonZeroUsize::new(10).unwrap()); diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index b6a8349..2d9fa89 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -12,8 +12,8 @@ use types::item::{Item, ItemError, Status, TaskResult}; use uuid::Uuid; use crate::clients::dynamodb_client::DynamoDBClient; -use crate::clients::s3_client::S3Client; -use crate::errors::GlobalPurgeError; +use crate::clients::s3_clients::wrapper::S3ClientWrapper; +use crate::errors::PurgeError; use crate::utils::lib::{s3_artifacts_dir, s3_compilation_files_dir, timestamp}; pub type Timestamp = u64; @@ -26,7 +26,7 @@ pub struct Purgatory { } impl Purgatory { - pub fn new(db_client: DynamoDBClient, s3_client: S3Client) -> Self { + pub fn new(db_client: DynamoDBClient, s3_client: S3ClientWrapper) -> Self { let handle = NonNull::dangling(); let this = Self { inner: Arc::new(Mutex::new(Inner::new( @@ -66,7 +66,7 @@ impl Purgatory { } struct Inner { state: State, - s3_client: S3Client, + s3_client: S3ClientWrapper, db_client: DynamoDBClient, // No aliases possible since only we own the data @@ -89,7 +89,7 @@ impl Inner { handle: NonNull>, state: State, db_client: DynamoDBClient, - s3_client: S3Client, + s3_client: S3ClientWrapper, ) -> Self { tokio::spawn(Self::global_state_purge( db_client.clone(), @@ -114,7 +114,10 @@ impl Inner { .push((id, to_purge_timestampe)); } - async fn global_state_purge(db_client: DynamoDBClient, s3_client: S3Client) { + async fn global_state_purge( + db_client: DynamoDBClient, + s3_client: S3ClientWrapper, + ) -> Result<(), PurgeError> { const SYNC_FROM_OFFSET: Option = PURGE_INTERVAL.checked_mul(6); let mut global_state = GlobalState::new(db_client.clone(), s3_client.clone()); @@ -122,25 +125,25 @@ impl Inner { loop { if global_state.sync(&sync_from).await.is_err() { - break; + break Ok(()); } if global_state.items.is_empty() { - break; + break Ok(()); } let items: Vec = global_state.items.drain(..).collect(); for item in items { - Inner::purge_item(&db_client, &s3_client, &item.id, &item.status).await; + Inner::purge_item(&db_client, &s3_client, &item.id, &item.status).await?; } } } pub async fn purge_item( db_client: &DynamoDBClient, - s3_client: &S3Client, + s3_client: &S3ClientWrapper, id: &Uuid, status: &Status, - ) { + ) -> Result<(), PurgeError> { match status { Status::InProgress => warn!("Item compiling for too long!"), Status::Pending => { @@ -148,30 +151,32 @@ impl Inner { // Remove compilation files let files_dir = s3_compilation_files_dir(id.to_string().as_str()); - s3_client.delete_dir(&files_dir).await.unwrap(); + s3_client.delete_dir(&files_dir).await?; // Remove artifacts let artifacts_dir = s3_compilation_files_dir(id.to_string().as_str()); - s3_client.delete_dir(&artifacts_dir).await.unwrap(); // TODO: fix + s3_client.delete_dir(&artifacts_dir).await?; // Second would give neater replies db_client .delete_item(id.to_string().as_str()) .await - .unwrap(); + .unwrap(); // TODO: unwraps } Status::Done(_) => { let dir = s3_artifacts_dir(id.to_string().as_str()); - s3_client.delete_dir(&dir).await.unwrap(); // TODO: fix + s3_client.delete_dir(&dir).await?; db_client .delete_item(id.to_string().as_str()) .await - .unwrap(); + .unwrap(); // TODO: unwraps } } + + Ok(()) } - pub async fn purge(&mut self) { + pub async fn purge(&mut self) -> Result<(), PurgeError> { let now = timestamp(); for (id, timestamp) in self.state.expiration_timestamps.iter() { if *timestamp > now { @@ -185,7 +190,7 @@ impl Inner { continue; }; - Self::purge_item(&self.db_client, &self.s3_client, &id, &status).await; + Self::purge_item(&self.db_client, &self.s3_client, &id, &status).await?; } self.state.expiration_timestamps.retain(|(id, timestamp)| { @@ -196,17 +201,19 @@ impl Inner { self.state.task_status.remove(id); false }); + + Ok(()) } } struct GlobalState { db_client: DynamoDBClient, - s3_client: S3Client, + s3_client: S3ClientWrapper, pub items: Vec, } impl GlobalState { - pub fn new(db_client: DynamoDBClient, s3_client: S3Client) -> Self { + pub fn new(db_client: DynamoDBClient, s3_client: S3ClientWrapper) -> Self { Self { db_client, s3_client, @@ -214,7 +221,7 @@ impl GlobalState { } } - pub async fn sync(&mut self, sync_from: &DateTime) -> Result<(), GlobalPurgeError> { + pub async fn sync(&mut self, sync_from: &DateTime) -> Result<(), PurgeError> { const MAX_CAPACITY: usize = 1000; let mut last_evaluated_key = None; diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index aac3822..efbab41 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -5,13 +5,14 @@ use tracing::{error, info, warn}; use types::{CompilationRequest, SqsMessage, VerificationRequest}; use crate::clients::dynamodb_client::DynamoDBClient; -use crate::clients::s3_client::S3Client; +use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::clients::sqs_clients::wrapper::SqsClientWrapper; -use crate::commands::compile::do_compile; +use crate::commands::compile::{do_compile, CompilationInput}; use crate::commands::errors::PreparationError; use crate::commands::utils::{ on_compilation_failed, on_compilation_success, prepare_compile_input, }; +use crate::errors::MessageProcessorError; use crate::purgatory::Purgatory; use crate::sqs_listener::{SqsListener, SqsReceiver}; use crate::utils::lib::s3_compilation_files_dir; @@ -19,7 +20,7 @@ use crate::utils::lib::s3_compilation_files_dir; pub struct EngineBuilder { sqs_client: SqsClientWrapper, db_client: DynamoDBClient, - s3_client: S3Client, + s3_client: S3ClientWrapper, running_workers: Vec, } @@ -27,7 +28,7 @@ impl EngineBuilder { pub fn new( sqs_client: SqsClientWrapper, db_client: DynamoDBClient, - s3_client: S3Client, + s3_client: S3ClientWrapper, ) -> Self { EngineBuilder { sqs_client, @@ -60,7 +61,7 @@ impl RunningEngine { pub fn new( sqs_listener: SqsListener, db_client: DynamoDBClient, - s3_client: S3Client, + s3_client: S3ClientWrapper, num_workers: usize, ) -> Self { let purgatory = Purgatory::new(db_client.clone(), s3_client.clone()); @@ -90,7 +91,7 @@ impl RunningEngine { async fn worker( sqs_receiver: SqsReceiver, db_client: DynamoDBClient, - s3_client: S3Client, + s3_client: S3ClientWrapper, mut purgatory: Purgatory, ) { // TODO: process error @@ -128,7 +129,7 @@ impl RunningEngine { // adjust "visibility timeout" or receiver chan capacity match sqs_message { SqsMessage::Compile { request } => { - Self::process_compile_message( + let result = Self::process_compile_message( request, receipt_handle, &sqs_receiver, @@ -136,7 +137,10 @@ impl RunningEngine { &s3_client, &mut purgatory, ) - .await + .await; + if let Err(err) = result { + error!("{}", err); + } } SqsMessage::Verify { request } => { Self::process_verify_message(request, receipt_handle, &sqs_receiver).await @@ -151,78 +155,84 @@ impl RunningEngine { receipt_handle: String, sqs_receiver: &SqsReceiver, db_client: &DynamoDBClient, - s3_client: &S3Client, + s3_client: &S3ClientWrapper, purgatory: &mut Purgatory, - ) { + ) -> Result<(), MessageProcessorError> { + let compilation_input = Self::handle_prepare_compile_input( + &request, + &receipt_handle, + sqs_receiver, + db_client, + s3_client, + ) + .await?; + + let id = request.id; + let task_result = match do_compile(compilation_input).await { + Ok(value) => on_compilation_success(id, &db_client, &s3_client, value).await?, + Err(err) => on_compilation_failed(id, &db_client, err.to_string()).await?, + }; + purgatory.add_record(id, task_result).await; + + // Clean compilation input files right away + let dir = s3_compilation_files_dir(id.to_string().as_str()); + s3_client.delete_dir(&dir).await?; + + sqs_receiver.delete_message(receipt_handle).await?; + Ok(()) + } + + // TODO(future me): extract in a class + pub(crate) async fn handle_prepare_compile_input( + request: &CompilationRequest, + receipt_handle: &str, + sqs_receiver: &SqsReceiver, + db_client: &DynamoDBClient, + s3_client: &S3ClientWrapper, + ) -> Result { let id = request.id; - let compilation_input = match prepare_compile_input(&request, db_client, s3_client).await { - Ok(value) => value, + let result = match prepare_compile_input(&request, db_client, s3_client).await { + Ok(value) => Ok(value), Err(PreparationError::NoDBItemError(err)) => { // Possible in case GlobalState purges old message // that somehow stuck in queue for too long - error!("{}", PreparationError::NoDBItemError(err)); - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); - } - return; + sqs_receiver.delete_message(receipt_handle).await?; + Err(PreparationError::NoDBItemError(err)) } Err(PreparationError::UnexpectedStatusError(err)) => { // Probably some other instance executing this at the same time. // For sake of safety still try to delete it. Doesn't matter if succeeds. // No need to clean up s3 - info!("{}", PreparationError::UnexpectedStatusError(err)); - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); - } - - return; + sqs_receiver.delete_message(receipt_handle).await?; + Err(PreparationError::UnexpectedStatusError(err)) } Err(PreparationError::VersionNotSupported(err)) => { // Clean everything since the request failed let dir = s3_compilation_files_dir(id.to_string().as_str()); - s3_client.delete_dir(&dir).await.unwrap(); // TODO: do those retriable + s3_client.delete_dir(&dir).await?; // This error doesn't create any artifacts let _ = on_compilation_failed( id, &db_client, - PreparationError::VersionNotSupported(err).to_string(), + PreparationError::VersionNotSupported(err.clone()).to_string(), ) - .await - .unwrap(); + .await?; - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); - } - return; + sqs_receiver.delete_message(receipt_handle).await?; + Err(PreparationError::VersionNotSupported(err)) } Err(PreparationError::S3Error(err)) => { - warn!("S3Error during preparation - ignoring. {}", err); - return; + // Certain cases don't require delete_message + Err(PreparationError::S3Error(err)) } Err(PreparationError::DBError(err)) => { - warn!("DBError during preparation - ignoring. {}", err); - return; + // Certain cases don't require delete_message + Err(PreparationError::DBError(err)) } }; - let task_result = match do_compile(compilation_input).await { - Ok(value) => on_compilation_success(id, &db_client, &s3_client, value) - .await - .unwrap(), // TODO: unwraps - Err(err) => on_compilation_failed(id, &db_client, err.to_string()) - .await - .unwrap(), - }; - purgatory.add_record(id, task_result).await; - - // Clean compilation input files right away - let dir = s3_compilation_files_dir(id.to_string().as_str()); - s3_client.delete_dir(&dir).await.unwrap(); - - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); - } + result.map_err(MessageProcessorError::from) } async fn process_verify_message( From 523f48abfc99c90049e27a266b543322d96d7c43 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Mon, 16 Sep 2024 18:46:12 +0900 Subject: [PATCH 3/9] feat: file structure preparation for DynamoDbWrapper --- crates/worker/src/clients.rs | 2 +- .../clients/{dynamodb_client.rs => dynamodb_clients/client.rs} | 0 crates/worker/src/clients/dynamodb_clients/mod.rs | 1 + crates/worker/src/commands/utils.rs | 2 +- crates/worker/src/main.rs | 2 +- crates/worker/src/purgatory.rs | 2 +- crates/worker/src/worker.rs | 2 +- 7 files changed, 6 insertions(+), 5 deletions(-) rename crates/worker/src/clients/{dynamodb_client.rs => dynamodb_clients/client.rs} (100%) create mode 100644 crates/worker/src/clients/dynamodb_clients/mod.rs diff --git a/crates/worker/src/clients.rs b/crates/worker/src/clients.rs index 3d221f6..557002e 100644 --- a/crates/worker/src/clients.rs +++ b/crates/worker/src/clients.rs @@ -1,4 +1,4 @@ -pub mod dynamodb_client; +pub mod dynamodb_clients; pub mod errors; mod retriable; pub mod s3_clients; diff --git a/crates/worker/src/clients/dynamodb_client.rs b/crates/worker/src/clients/dynamodb_clients/client.rs similarity index 100% rename from crates/worker/src/clients/dynamodb_client.rs rename to crates/worker/src/clients/dynamodb_clients/client.rs diff --git a/crates/worker/src/clients/dynamodb_clients/mod.rs b/crates/worker/src/clients/dynamodb_clients/mod.rs new file mode 100644 index 0000000..b9babe5 --- /dev/null +++ b/crates/worker/src/clients/dynamodb_clients/mod.rs @@ -0,0 +1 @@ +pub mod client; diff --git a/crates/worker/src/commands/utils.rs b/crates/worker/src/commands/utils.rs index 1adb3ea..c4326f1 100644 --- a/crates/worker/src/commands/utils.rs +++ b/crates/worker/src/commands/utils.rs @@ -11,7 +11,7 @@ use types::item::{Item, Status, TaskResult}; use types::{CompilationRequest, ARTIFACTS_FOLDER}; use uuid::Uuid; -use crate::clients::dynamodb_client::DynamoDBClient; +use crate::clients::dynamodb_clients::client::DynamoDBClient; use crate::clients::errors::{DBError, S3Error}; use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::commands::compile::{CompilationInput, CompilationOutput}; diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 4457b85..c5db1e5 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -10,7 +10,7 @@ use aws_config::BehaviorVersion; use aws_runtime::env_config::file::{EnvConfigFileKind, EnvConfigFiles}; use std::num::NonZeroUsize; -use crate::clients::dynamodb_client::DynamoDBClient; +use clients::dynamodb_clients::client::DynamoDBClient; use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::clients::sqs_clients::wrapper::SqsClientWrapper; use crate::worker::EngineBuilder; diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index 2d9fa89..70161c3 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -11,7 +11,7 @@ use tracing::warn; use types::item::{Item, ItemError, Status, TaskResult}; use uuid::Uuid; -use crate::clients::dynamodb_client::DynamoDBClient; +use crate::clients::dynamodb_clients::client::DynamoDBClient; use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::errors::PurgeError; use crate::utils::lib::{s3_artifacts_dir, s3_compilation_files_dir, timestamp}; diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index efbab41..4543769 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -4,7 +4,7 @@ use tokio::task::JoinHandle; use tracing::{error, info, warn}; use types::{CompilationRequest, SqsMessage, VerificationRequest}; -use crate::clients::dynamodb_client::DynamoDBClient; +use crate::clients::dynamodb_clients::client::DynamoDBClient; use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::clients::sqs_clients::wrapper::SqsClientWrapper; use crate::commands::compile::{do_compile, CompilationInput}; From 570c9d0815e2ecc786e31f701b05c53813c2e619 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Tue, 17 Sep 2024 15:21:46 +0900 Subject: [PATCH 4/9] feat: retriable dynamodb --- .../src/clients/dynamodb_clients/client.rs | 195 +++++++++++++++++- .../src/clients/dynamodb_clients/mod.rs | 1 + .../src/clients/dynamodb_clients/wrapper.rs | 177 ++++++++++++++++ crates/worker/src/clients/errors.rs | 5 +- crates/worker/src/commands/utils.rs | 58 ++---- crates/worker/src/main.rs | 4 +- crates/worker/src/purgatory.rs | 37 ++-- crates/worker/src/worker.rs | 14 +- 8 files changed, 416 insertions(+), 75 deletions(-) create mode 100644 crates/worker/src/clients/dynamodb_clients/wrapper.rs diff --git a/crates/worker/src/clients/dynamodb_clients/client.rs b/crates/worker/src/clients/dynamodb_clients/client.rs index 434aa8f..0122a1a 100644 --- a/crates/worker/src/clients/dynamodb_clients/client.rs +++ b/crates/worker/src/clients/dynamodb_clients/client.rs @@ -1,8 +1,16 @@ +use aws_sdk_dynamodb::operation::scan::ScanOutput; use aws_sdk_dynamodb::types::AttributeValue; use aws_sdk_dynamodb::Client; -use types::item::Item; +use chrono::{DateTime, Utc}; +use std::collections::HashMap; +use std::sync::atomic::AtomicU8; +use std::sync::Arc; +use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder; +use tokio::sync::oneshot; +use types::item::{Item, Status}; -use crate::clients::errors::DBError; +use crate::clients::errors::{DBDeleteError, DBError, DBGetError, DBScanError, DBUpdateError}; +use crate::clients::retriable::{handle_action_result, match_result, ActionHandler}; #[derive(Clone)] pub struct DynamoDBClient { @@ -18,8 +26,9 @@ impl DynamoDBClient { } } - pub async fn delete_item(&self, id: &str) -> Result<(), DBError> { - self.client + pub async fn delete_item(&self, id: &str) -> Result<(), DBDeleteError> { + let _ = self + .client .delete_item() .table_name(self.table_name.clone()) .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) @@ -29,6 +38,55 @@ impl DynamoDBClient { Ok(()) } + pub async fn delete_item_attempt(&self, id: &str) -> Result, DBDeleteError> { + match_result!(DBDeleteError, self.delete_item(id).await) + } + + pub async fn update_item_raw(&self, update_item_builder: UpdateItemFluentBuilder) -> Result<(), DBUpdateError> { + let _ = update_item_builder.send().await; + Ok(()) + } + + pub async fn update_item_raw_attempt(&self, update_item_builder: UpdateItemFluentBuilder) -> Result, DBUpdateError> { + match_result!(DBUpdateError, self.update_item_raw(update_item_builder).await) + } + + pub async fn update_item_status_conditional( + &self, + id: &str, + from: &Status, + to: &Status, + ) -> Result<(), DBUpdateError> { + let _ = self + .client + .update_item() + .table_name(self.table_name.clone()) + .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) + .update_expression("SET #status = :toStatus") + .condition_expression("#status = :fromStatus") + .expression_attribute_names("#status", Status::attribute_name()) + .expression_attribute_values(":toStatus", AttributeValue::N(u32::from(to).to_string())) + .expression_attribute_values( + ":fromStatus", + AttributeValue::N(u32::from(from).to_string()), + ) + .send() + .await?; + Ok(()) + } + + pub async fn update_item_status_conditional_attempt( + &self, + id: &str, + from: &Status, + to: &Status, + ) -> Result, DBUpdateError> { + match_result!( + DBUpdateError, + self.update_item_status_conditional(id, from, to).await + ) + } + pub async fn get_item(&self, key: &str) -> Result, DBError> { let result = self .client @@ -39,10 +97,137 @@ impl DynamoDBClient { .await?; if let Some(item) = result.item { - // TODO: maybe change status or delete when error? Ok(Some(item.try_into()?)) } else { Ok(None) } } + + pub async fn get_item_attempt(&self, key: &str) -> Result>, DBError> { + match self.get_item(key).await { + Err(DBError::GetItemError(err)) => { + match_result!(DBGetError, Err(err)).map_err(DBError::from) + } + result => result.map(|value| Some(value)), + } + } + + pub async fn scan_items_prior_to( + &self, + time: &DateTime, + exclusive_start_key: &Option>, + ) -> Result { + const MAX_CAPACITY: usize = 100; + + self.client + .scan() + .table_name(self.table_name.clone()) + .filter_expression("CreatedAt <= :created_at") + .expression_attribute_values(":created_at", AttributeValue::S(time.to_rfc3339())) + .limit(MAX_CAPACITY as i32) + .set_exclusive_start_key(exclusive_start_key.clone()) + .send() + .await + } + + pub async fn scan_items_prior_to_attempt( + &self, + time: &DateTime, + exclusive_start_key: &Option>, + ) -> Result, DBScanError> { + match_result!( + DBScanError, + self.scan_items_prior_to(time, exclusive_start_key).await + ) + } +} + +#[derive(Default)] +pub enum DynamoDBAction { + #[default] + Default, + DeleteItem { + id: String, + sender: oneshot::Sender>, + }, + GetItem { + id: String, + sender: oneshot::Sender, DBError>>, + }, + ScanPriorTo { + time: DateTime, + exclusive_start_key: Option>, + sender: oneshot::Sender>, + }, + UpdateItemRaw { + update_item_builder: UpdateItemFluentBuilder, + sender: oneshot::Sender> + }, + UpdateItemStatusConditional { + id: String, + from: Status, + to: Status, + sender: oneshot::Sender>, + }, +} + +impl ActionHandler for DynamoDBClient { + type Action = DynamoDBAction; + + async fn handle(&self, action: Self::Action, state: Arc) -> Option { + match action { + DynamoDBAction::Default => unreachable!(), + DynamoDBAction::DeleteItem { id, sender } => { + let result = self.delete_item_attempt(&id).await; + handle_action_result(result, sender, state) + .map(|sender| DynamoDBAction::DeleteItem { id, sender }) + } + DynamoDBAction::GetItem { id, sender } => { + let result = self.get_item_attempt(&id).await; + handle_action_result(result, sender, state) + .map(|sender| DynamoDBAction::GetItem { id, sender }) + } + DynamoDBAction::ScanPriorTo { + time, + exclusive_start_key, + sender, + } => { + let result = self + .scan_items_prior_to_attempt(&time, &exclusive_start_key) + .await; + handle_action_result(result, sender, state).map(|sender| { + DynamoDBAction::ScanPriorTo { + time, + exclusive_start_key, + sender, + } + }) + } + DynamoDBAction::UpdateItemRaw {update_item_builder, sender} => { + let result = self.update_item_raw_attempt(update_item_builder.clone()).await; + handle_action_result(result, sender, state).map(|sender| DynamoDBAction::UpdateItemRaw { + update_item_builder, + sender + }) + } + DynamoDBAction::UpdateItemStatusConditional { + id, + from, + to, + sender, + } => { + let result = self + .update_item_status_conditional_attempt(&id, &to, &from) + .await; + handle_action_result(result, sender, state).map(|sender| { + DynamoDBAction::UpdateItemStatusConditional { + id, + to, + from, + sender, + } + }) + } + } + } } diff --git a/crates/worker/src/clients/dynamodb_clients/mod.rs b/crates/worker/src/clients/dynamodb_clients/mod.rs index b9babe5..8402e4f 100644 --- a/crates/worker/src/clients/dynamodb_clients/mod.rs +++ b/crates/worker/src/clients/dynamodb_clients/mod.rs @@ -1 +1,2 @@ pub mod client; +pub mod wrapper; diff --git a/crates/worker/src/clients/dynamodb_clients/wrapper.rs b/crates/worker/src/clients/dynamodb_clients/wrapper.rs new file mode 100644 index 0000000..139f2d5 --- /dev/null +++ b/crates/worker/src/clients/dynamodb_clients/wrapper.rs @@ -0,0 +1,177 @@ +use aws_sdk_dynamodb::operation::scan::ScanOutput; +use aws_sdk_dynamodb::types::AttributeValue; +use aws_sdk_dynamodb::Client; +use chrono::{DateTime, Utc}; +use std::collections::HashMap; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; +use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder; +use tokio::sync::{mpsc, oneshot}; +use types::item::{Item, Status}; + +use crate::clients::dynamodb_clients::client::{DynamoDBAction, DynamoDBClient}; +use crate::clients::errors::{DBDeleteError, DBError, DBScanError, DBUpdateError}; +use crate::clients::retriable::{Retrier, State}; + +#[derive(Clone)] +pub struct DynamoDBClientWrapper { + pub client: DynamoDBClient, + actions_sender: mpsc::Sender, + state: Arc, +} + +impl DynamoDBClientWrapper { + pub fn new(client: Client, table_name: &str) -> Self { + let client = DynamoDBClient::new(client, table_name); + let state = Arc::new(AtomicU8::new(State::Connected as u8)); + let (sender, receiver) = mpsc::channel(1000); + + let retrier = Retrier::new(client.clone(), receiver, state.clone()); + tokio::spawn(retrier.start()); + + Self { + client, + state, + actions_sender: sender, + } + } + + pub async fn delete_item(&self, id: &str) -> Result<(), DBDeleteError> { + match self.state.load(Ordering::Acquire) { + 0 => match self.client.delete_item_attempt(id).await { + Ok(Some(val)) => return Ok(val), + Ok(None) => self + .state + .store(State::Reconnecting as u8, Ordering::Release), + Err(err) => return Err(err.into()), + }, + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + self.actions_sender + .send(DynamoDBAction::DeleteItem { + id: id.to_string(), + sender, + }) + .await; + receiver.await.unwrap() + } + + pub async fn get_item(&self, key: &str) -> Result, DBError> { + match self.state.load(Ordering::Acquire) { + 0 => match self.client.get_item_attempt(key).await { + Ok(Some(val)) => return Ok(val), + Ok(None) => self + .state + .store(State::Reconnecting as u8, Ordering::Release), + Err(err) => return Err(err.into()), + }, + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + self.actions_sender + .send(DynamoDBAction::GetItem { + id: key.to_string(), + sender, + }) + .await; + receiver.await.unwrap() + } + + pub async fn update_item_raw(&self, update_item_builder: UpdateItemFluentBuilder) -> Result<(), DBUpdateError> { + match self.state.load(Ordering::Acquire) { + 0 => match self + .client + .update_item_raw_attempt(update_item_builder.clone()) + .await + { + Ok(Some(val)) => return Ok(val), + Ok(None) => self + .state + .store(State::Reconnecting as u8, Ordering::Release), + Err(err) => return Err(err.into()), + }, + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + self.actions_sender + .send(DynamoDBAction::UpdateItemRaw { + update_item_builder, + sender + }) + .await; + receiver.await.unwrap() + } + + pub async fn scan_items_prior_to( + &self, + time: &DateTime, + exclusive_start_key: &Option>, + ) -> Result { + match self.state.load(Ordering::Acquire) { + 0 => match self + .client + .scan_items_prior_to_attempt(time, exclusive_start_key) + .await + { + Ok(Some(val)) => return Ok(val), + Ok(None) => self + .state + .store(State::Reconnecting as u8, Ordering::Release), + Err(err) => return Err(err.into()), + }, + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + self.actions_sender + .send(DynamoDBAction::ScanPriorTo { + time: time.clone(), + exclusive_start_key: exclusive_start_key.clone(), + sender, + }) + .await; + receiver.await.unwrap() + } + + pub async fn update_item_status_conditional( + &self, + id: &str, + from: &Status, + to: &Status, + ) -> Result<(), DBUpdateError> { + match self.state.load(Ordering::Acquire) { + 0 => match self + .client + .update_item_status_conditional_attempt(id, from, to) + .await + { + Ok(Some(val)) => return Ok(val), + Ok(None) => self + .state + .store(State::Reconnecting as u8, Ordering::Release), + Err(err) => return Err(err.into()), + }, + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + self.actions_sender + .send(DynamoDBAction::UpdateItemStatusConditional { + id: id.to_string(), + from: from.clone(), + to: to.clone(), + sender, + }) + .await; + receiver.await.unwrap() + } +} diff --git a/crates/worker/src/clients/errors.rs b/crates/worker/src/clients/errors.rs index e7ff8e3..391b832 100644 --- a/crates/worker/src/clients/errors.rs +++ b/crates/worker/src/clients/errors.rs @@ -1,6 +1,7 @@ use aws_sdk_dynamodb::config::http::HttpResponse; use aws_sdk_dynamodb::operation::delete_item::DeleteItemError; use aws_sdk_dynamodb::operation::get_item::GetItemError; +use aws_sdk_dynamodb::operation::scan::ScanError; use aws_sdk_dynamodb::operation::update_item::UpdateItemError; use aws_sdk_s3::operation::delete_object::DeleteObjectError; use aws_sdk_s3::operation::get_object::GetObjectError; @@ -19,8 +20,8 @@ pub(crate) type SqsDeleteError = SdkError; // DynamoDB related errors pub(crate) type DBDeleteError = SdkError; pub(crate) type DBGetError = SdkError; - pub(crate) type DBUpdateError = SdkError; +pub(crate) type DBScanError = SdkError; // S3 related errors pub(crate) type S3ListObjectsError = SdkError; @@ -46,6 +47,8 @@ pub enum DBError { ItemFormatError(#[from] ItemError), #[error(transparent)] UpdateItemError(#[from] DBUpdateError), + #[error(transparent)] + ScanError(#[from] DBScanError), } #[derive(thiserror::Error, Debug)] diff --git a/crates/worker/src/commands/utils.rs b/crates/worker/src/commands/utils.rs index c4326f1..e2d5d1f 100644 --- a/crates/worker/src/commands/utils.rs +++ b/crates/worker/src/commands/utils.rs @@ -11,38 +11,25 @@ use types::item::{Item, Status, TaskResult}; use types::{CompilationRequest, ARTIFACTS_FOLDER}; use uuid::Uuid; -use crate::clients::dynamodb_clients::client::DynamoDBClient; +use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper; use crate::clients::errors::{DBError, S3Error}; use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::commands::compile::{CompilationInput, CompilationOutput}; use crate::commands::errors::{CommandResultHandleError, PreparationError}; -use crate::sqs_listener::SqsReceiver; use crate::utils::cleaner::AutoCleanUp; -use crate::utils::lib::{s3_compilation_files_dir, SOL_ROOT, ZKSOLC_VERSIONS}; +use crate::utils::lib::{SOL_ROOT, ZKSOLC_VERSIONS}; async fn try_set_compiling_status( - db_client: &DynamoDBClient, + db_client: &DynamoDBClientWrapper, key: Uuid, ) -> Result<(), PreparationError> { let db_update_result = db_client - .client - .update_item() - .table_name(db_client.table_name.clone()) - .key(Item::primary_key_name(), AttributeValue::S(key.to_string())) - .update_expression("SET #status = :newStatus") - .condition_expression("#status = :currentStatus") - .expression_attribute_names("#status", Status::attribute_name()) - .expression_attribute_values( - ":newStatus", - AttributeValue::N(u32::from(Status::InProgress).to_string()), - ) - .expression_attribute_values( - ":currentStatus", - AttributeValue::N(u32::from(Status::Pending).to_string()), + .update_item_status_conditional( + key.to_string().as_str(), + &Status::Pending, + &Status::InProgress, ) - .send() .await; - match db_update_result { Ok(_) => Ok(()), Err(SdkError::ServiceError(err)) => match err.err() { @@ -60,7 +47,7 @@ async fn try_set_compiling_status( pub(crate) async fn prepare_compile_input( request: &CompilationRequest, - db_client: &DynamoDBClient, + db_client: &DynamoDBClientWrapper, s3_client: &S3ClientWrapper, ) -> Result { let zksolc_version = request.config.version.as_str(); @@ -74,7 +61,6 @@ pub(crate) async fn prepare_compile_input( let item: Item = match item { Some(item) => item, None => { - error!("No item id: {}", request.id); return Err(PreparationError::NoDBItemError(request.id.to_string())); } }; @@ -99,7 +85,7 @@ pub(crate) async fn prepare_compile_input( pub async fn on_compilation_success( id: Uuid, - db_client: &DynamoDBClient, + db_client: &DynamoDBClientWrapper, s3_client: &S3ClientWrapper, compilation_output: CompilationOutput, ) -> Result { @@ -136,10 +122,11 @@ pub async fn on_compilation_success( presigned_urls.push("".to_string()); } - db_client + let builder = db_client + .client .client .update_item() - .table_name(db_client.table_name.clone()) + .table_name(db_client.client.table_name.clone()) .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) .update_expression("SET #status = :newStatus, #data = :data") .expression_attribute_names("#status", Status::attribute_name()) @@ -148,10 +135,9 @@ pub async fn on_compilation_success( ":newStatus", AttributeValue::N(2.to_string()), // Ready ) - .expression_attribute_values(":data", AttributeValue::Ss(presigned_urls.clone())) - .send() - .await - .map_err(DBError::from)?; + .expression_attribute_values(":data", AttributeValue::Ss(presigned_urls.clone())); + + db_client.update_item_raw(builder).await.map_err(DBError::from)?; auto_clean_up.clean_up().await; Ok(TaskResult::Success { presigned_urls }) @@ -159,13 +145,14 @@ pub async fn on_compilation_success( pub async fn on_compilation_failed( id: Uuid, - db_client: &DynamoDBClient, + db_client: &DynamoDBClientWrapper, message: String, ) -> Result { - db_client + let builder = db_client + .client .client .update_item() - .table_name(db_client.table_name.clone()) + .table_name(db_client.client.table_name.clone()) .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) .update_expression("SET #status = :newStatus, #data = :data") .expression_attribute_names("#status", Status::attribute_name()) @@ -174,10 +161,9 @@ pub async fn on_compilation_failed( ":newStatus", AttributeValue::N(3.to_string()), // Failed ) - .expression_attribute_values(":data", AttributeValue::S(message.clone())) - .send() - .await - .map_err(DBError::from)?; + .expression_attribute_values(":data", AttributeValue::S(message.clone())); + + db_client.update_item_raw(builder).await.map_err(DBError::from)?; Ok(TaskResult::Failure(message)) } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index c5db1e5..4d8cd99 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -10,7 +10,7 @@ use aws_config::BehaviorVersion; use aws_runtime::env_config::file::{EnvConfigFileKind, EnvConfigFiles}; use std::num::NonZeroUsize; -use clients::dynamodb_clients::client::DynamoDBClient; +use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper; use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::clients::sqs_clients::wrapper::SqsClientWrapper; use crate::worker::EngineBuilder; @@ -42,7 +42,7 @@ async fn main() { // Initialize DynamoDb client let db_client = aws_sdk_dynamodb::Client::new(&config); - let db_client = DynamoDBClient::new(db_client, TABLE_NAME_DEFAULT); + let db_client = DynamoDBClientWrapper::new(db_client, TABLE_NAME_DEFAULT); // Initialize S3 client let s3_client = aws_sdk_s3::Client::new(&config); diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index 70161c3..cab7ca9 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -1,4 +1,3 @@ -use aws_sdk_dynamodb::types::AttributeValue; use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::marker::PhantomData; @@ -11,7 +10,8 @@ use tracing::warn; use types::item::{Item, ItemError, Status, TaskResult}; use uuid::Uuid; -use crate::clients::dynamodb_clients::client::DynamoDBClient; +use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper; +use crate::clients::errors::DBError; use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::errors::PurgeError; use crate::utils::lib::{s3_artifacts_dir, s3_compilation_files_dir, timestamp}; @@ -26,7 +26,7 @@ pub struct Purgatory { } impl Purgatory { - pub fn new(db_client: DynamoDBClient, s3_client: S3ClientWrapper) -> Self { + pub fn new(db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper) -> Self { let handle = NonNull::dangling(); let this = Self { inner: Arc::new(Mutex::new(Inner::new( @@ -67,7 +67,7 @@ impl Purgatory { struct Inner { state: State, s3_client: S3ClientWrapper, - db_client: DynamoDBClient, + db_client: DynamoDBClientWrapper, // No aliases possible since only we own the data handle: NonNull>, @@ -88,7 +88,7 @@ impl Inner { fn new( handle: NonNull>, state: State, - db_client: DynamoDBClient, + db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper, ) -> Self { tokio::spawn(Self::global_state_purge( @@ -115,7 +115,7 @@ impl Inner { } async fn global_state_purge( - db_client: DynamoDBClient, + db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper, ) -> Result<(), PurgeError> { const SYNC_FROM_OFFSET: Option = PURGE_INTERVAL.checked_mul(6); @@ -139,7 +139,7 @@ impl Inner { } pub async fn purge_item( - db_client: &DynamoDBClient, + db_client: &DynamoDBClientWrapper, s3_client: &S3ClientWrapper, id: &Uuid, status: &Status, @@ -161,7 +161,7 @@ impl Inner { db_client .delete_item(id.to_string().as_str()) .await - .unwrap(); // TODO: unwraps + .map_err(DBError::from)?; } Status::Done(_) => { let dir = s3_artifacts_dir(id.to_string().as_str()); @@ -169,7 +169,7 @@ impl Inner { db_client .delete_item(id.to_string().as_str()) .await - .unwrap(); // TODO: unwraps + .map_err(DBError::from)?; } } @@ -207,13 +207,13 @@ impl Inner { } struct GlobalState { - db_client: DynamoDBClient, + db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper, pub items: Vec, } impl GlobalState { - pub fn new(db_client: DynamoDBClient, s3_client: S3ClientWrapper) -> Self { + pub fn new(db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper) -> Self { Self { db_client, s3_client, @@ -228,20 +228,9 @@ impl GlobalState { loop { let output = self .db_client - .client - .scan() - .table_name(self.db_client.table_name.clone()) - .filter_expression("CreatedAt <= :created_at") - .expression_attribute_values( - ":created_at", - AttributeValue::S(sync_from.to_rfc3339()), - ) - .limit(MAX_CAPACITY as i32) - .set_exclusive_start_key(last_evaluated_key) - .send() + .scan_items_prior_to(sync_from, &last_evaluated_key) .await - .unwrap(); - + .map_err(DBError::from)?; let raw_items = if let Some(items) = output.items { items } else { diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index 4543769..713ba2e 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -4,7 +4,7 @@ use tokio::task::JoinHandle; use tracing::{error, info, warn}; use types::{CompilationRequest, SqsMessage, VerificationRequest}; -use crate::clients::dynamodb_clients::client::DynamoDBClient; +use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper; use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::clients::sqs_clients::wrapper::SqsClientWrapper; use crate::commands::compile::{do_compile, CompilationInput}; @@ -19,7 +19,7 @@ use crate::utils::lib::s3_compilation_files_dir; pub struct EngineBuilder { sqs_client: SqsClientWrapper, - db_client: DynamoDBClient, + db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper, running_workers: Vec, } @@ -27,7 +27,7 @@ pub struct EngineBuilder { impl EngineBuilder { pub fn new( sqs_client: SqsClientWrapper, - db_client: DynamoDBClient, + db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper, ) -> Self { EngineBuilder { @@ -60,7 +60,7 @@ pub struct RunningEngine { impl RunningEngine { pub fn new( sqs_listener: SqsListener, - db_client: DynamoDBClient, + db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper, num_workers: usize, ) -> Self { @@ -90,7 +90,7 @@ impl RunningEngine { async fn worker( sqs_receiver: SqsReceiver, - db_client: DynamoDBClient, + db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper, mut purgatory: Purgatory, ) { @@ -154,7 +154,7 @@ impl RunningEngine { request: CompilationRequest, receipt_handle: String, sqs_receiver: &SqsReceiver, - db_client: &DynamoDBClient, + db_client: &DynamoDBClientWrapper, s3_client: &S3ClientWrapper, purgatory: &mut Purgatory, ) -> Result<(), MessageProcessorError> { @@ -187,7 +187,7 @@ impl RunningEngine { request: &CompilationRequest, receipt_handle: &str, sqs_receiver: &SqsReceiver, - db_client: &DynamoDBClient, + db_client: &DynamoDBClientWrapper, s3_client: &S3ClientWrapper, ) -> Result { let id = request.id; From 5f00a93342bd7d47a3c23c66b9f48973468a8400 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Wed, 18 Sep 2024 12:29:31 +0900 Subject: [PATCH 5/9] feat: proposal how to remove boilerplate in wrappers --- .../src/clients/dynamodb_clients/client.rs | 36 ++-- .../src/clients/dynamodb_clients/wrapper.rs | 167 ++++++------------ crates/worker/src/clients/retriable.rs | 81 ++++++--- crates/worker/src/commands/utils.rs | 10 +- crates/worker/src/purgatory.rs | 6 +- crates/worker/src/worker.rs | 2 +- 6 files changed, 145 insertions(+), 157 deletions(-) diff --git a/crates/worker/src/clients/dynamodb_clients/client.rs b/crates/worker/src/clients/dynamodb_clients/client.rs index 0122a1a..47a84e4 100644 --- a/crates/worker/src/clients/dynamodb_clients/client.rs +++ b/crates/worker/src/clients/dynamodb_clients/client.rs @@ -1,11 +1,11 @@ use aws_sdk_dynamodb::operation::scan::ScanOutput; +use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder; use aws_sdk_dynamodb::types::AttributeValue; use aws_sdk_dynamodb::Client; use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::sync::atomic::AtomicU8; use std::sync::Arc; -use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder; use tokio::sync::oneshot; use types::item::{Item, Status}; @@ -42,13 +42,22 @@ impl DynamoDBClient { match_result!(DBDeleteError, self.delete_item(id).await) } - pub async fn update_item_raw(&self, update_item_builder: UpdateItemFluentBuilder) -> Result<(), DBUpdateError> { - let _ = update_item_builder.send().await; + pub async fn update_item_raw( + &self, + update_item_builder: &UpdateItemFluentBuilder, + ) -> Result<(), DBUpdateError> { + let _ = update_item_builder.clone().send().await; Ok(()) } - pub async fn update_item_raw_attempt(&self, update_item_builder: UpdateItemFluentBuilder) -> Result, DBUpdateError> { - match_result!(DBUpdateError, self.update_item_raw(update_item_builder).await) + pub async fn update_item_raw_attempt( + &self, + update_item_builder: &UpdateItemFluentBuilder, + ) -> Result, DBUpdateError> { + match_result!( + DBUpdateError, + self.update_item_raw(update_item_builder).await + ) } pub async fn update_item_status_conditional( @@ -161,7 +170,7 @@ pub enum DynamoDBAction { }, UpdateItemRaw { update_item_builder: UpdateItemFluentBuilder, - sender: oneshot::Sender> + sender: oneshot::Sender>, }, UpdateItemStatusConditional { id: String, @@ -203,11 +212,16 @@ impl ActionHandler for DynamoDBClient { } }) } - DynamoDBAction::UpdateItemRaw {update_item_builder, sender} => { - let result = self.update_item_raw_attempt(update_item_builder.clone()).await; - handle_action_result(result, sender, state).map(|sender| DynamoDBAction::UpdateItemRaw { - update_item_builder, - sender + DynamoDBAction::UpdateItemRaw { + update_item_builder, + sender, + } => { + let result = self.update_item_raw_attempt(&update_item_builder).await; + handle_action_result(result, sender, state).map(|sender| { + DynamoDBAction::UpdateItemRaw { + update_item_builder, + sender, + } }) } DynamoDBAction::UpdateItemStatusConditional { diff --git a/crates/worker/src/clients/dynamodb_clients/wrapper.rs b/crates/worker/src/clients/dynamodb_clients/wrapper.rs index 139f2d5..223d996 100644 --- a/crates/worker/src/clients/dynamodb_clients/wrapper.rs +++ b/crates/worker/src/clients/dynamodb_clients/wrapper.rs @@ -1,17 +1,17 @@ use aws_sdk_dynamodb::operation::scan::ScanOutput; +use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder; use aws_sdk_dynamodb::types::AttributeValue; use aws_sdk_dynamodb::Client; use chrono::{DateTime, Utc}; use std::collections::HashMap; -use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::atomic::AtomicU8; use std::sync::Arc; -use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use types::item::{Item, Status}; use crate::clients::dynamodb_clients::client::{DynamoDBAction, DynamoDBClient}; use crate::clients::errors::{DBDeleteError, DBError, DBScanError, DBUpdateError}; -use crate::clients::retriable::{Retrier, State}; +use crate::clients::retriable::{execute_retriable_operation, Retrier, State}; #[derive(Clone)] pub struct DynamoDBClientWrapper { @@ -37,76 +37,41 @@ impl DynamoDBClientWrapper { } pub async fn delete_item(&self, id: &str) -> Result<(), DBDeleteError> { - match self.state.load(Ordering::Acquire) { - 0 => match self.client.delete_item_attempt(id).await { - Ok(Some(val)) => return Ok(val), - Ok(None) => self - .state - .store(State::Reconnecting as u8, Ordering::Release), - Err(err) => return Err(err.into()), - }, - 1 => {} - _ => unreachable!(), - } + let operation = || self.client.delete_item_attempt(id); + let action_factory = |sender| DynamoDBAction::DeleteItem { + id: id.to_string(), + sender, + }; - let (sender, receiver) = oneshot::channel(); - self.actions_sender - .send(DynamoDBAction::DeleteItem { - id: id.to_string(), - sender, - }) - .await; - receiver.await.unwrap() + // TODO: if all good. rewrite all other clients like that? + execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state) + .await } pub async fn get_item(&self, key: &str) -> Result, DBError> { - match self.state.load(Ordering::Acquire) { - 0 => match self.client.get_item_attempt(key).await { - Ok(Some(val)) => return Ok(val), - Ok(None) => self - .state - .store(State::Reconnecting as u8, Ordering::Release), - Err(err) => return Err(err.into()), - }, - 1 => {} - _ => unreachable!(), - } + let operation = || self.client.get_item_attempt(key); + let action_factory = |sender| DynamoDBAction::GetItem { + id: key.to_string(), + sender, + }; - let (sender, receiver) = oneshot::channel(); - self.actions_sender - .send(DynamoDBAction::GetItem { - id: key.to_string(), - sender, - }) - .await; - receiver.await.unwrap() + execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state) + .await } - pub async fn update_item_raw(&self, update_item_builder: UpdateItemFluentBuilder) -> Result<(), DBUpdateError> { - match self.state.load(Ordering::Acquire) { - 0 => match self - .client - .update_item_raw_attempt(update_item_builder.clone()) - .await - { - Ok(Some(val)) => return Ok(val), - Ok(None) => self - .state - .store(State::Reconnecting as u8, Ordering::Release), - Err(err) => return Err(err.into()), - }, - 1 => {} - _ => unreachable!(), - } + pub async fn update_item_raw( + &self, + update_item_builder: &UpdateItemFluentBuilder, + ) -> Result<(), DBUpdateError> { + let operation = || self.client.update_item_raw_attempt(update_item_builder); + + let action_factory = |sender| DynamoDBAction::UpdateItemRaw { + update_item_builder: update_item_builder.clone(), + sender, + }; - let (sender, receiver) = oneshot::channel(); - self.actions_sender - .send(DynamoDBAction::UpdateItemRaw { - update_item_builder, - sender - }) - .await; - receiver.await.unwrap() + execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state) + .await } pub async fn scan_items_prior_to( @@ -114,31 +79,19 @@ impl DynamoDBClientWrapper { time: &DateTime, exclusive_start_key: &Option>, ) -> Result { - match self.state.load(Ordering::Acquire) { - 0 => match self - .client + let operation = || { + self.client .scan_items_prior_to_attempt(time, exclusive_start_key) - .await - { - Ok(Some(val)) => return Ok(val), - Ok(None) => self - .state - .store(State::Reconnecting as u8, Ordering::Release), - Err(err) => return Err(err.into()), - }, - 1 => {} - _ => unreachable!(), - } + }; - let (sender, receiver) = oneshot::channel(); - self.actions_sender - .send(DynamoDBAction::ScanPriorTo { - time: time.clone(), - exclusive_start_key: exclusive_start_key.clone(), - sender, - }) - .await; - receiver.await.unwrap() + let action_factory = |sender| DynamoDBAction::ScanPriorTo { + time: time.clone(), + exclusive_start_key: exclusive_start_key.clone(), + sender, + }; + + execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state) + .await } pub async fn update_item_status_conditional( @@ -147,31 +100,19 @@ impl DynamoDBClientWrapper { from: &Status, to: &Status, ) -> Result<(), DBUpdateError> { - match self.state.load(Ordering::Acquire) { - 0 => match self - .client + let operation = || { + self.client .update_item_status_conditional_attempt(id, from, to) - .await - { - Ok(Some(val)) => return Ok(val), - Ok(None) => self - .state - .store(State::Reconnecting as u8, Ordering::Release), - Err(err) => return Err(err.into()), - }, - 1 => {} - _ => unreachable!(), - } + }; + + let action_factory = |sender| DynamoDBAction::UpdateItemStatusConditional { + id: id.to_string(), + from: from.clone(), + to: to.clone(), + sender, + }; - let (sender, receiver) = oneshot::channel(); - self.actions_sender - .send(DynamoDBAction::UpdateItemStatusConditional { - id: id.to_string(), - from: from.clone(), - to: to.clone(), - sender, - }) - .await; - receiver.await.unwrap() + execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state) + .await } } diff --git a/crates/worker/src/clients/retriable.rs b/crates/worker/src/clients/retriable.rs index a42d39e..470e963 100644 --- a/crates/worker/src/clients/retriable.rs +++ b/crates/worker/src/clients/retriable.rs @@ -42,31 +42,6 @@ pub trait ActionHandler { async fn handle(&self, action: Self::Action, state: Arc) -> Option; } -pub(crate) fn handle_action_result( - result: Result, E>, - sender: oneshot::Sender>, - state: Arc, -) -> Option>> -where - T: Send + 'static, - E: Send + 'static, -{ - match result { - Ok(Some(val)) => { - state.store(State::Connected as u8, Ordering::Release); - let _ = sender.send(Ok(val)); - None - } - Err(err) => { - let _ = sender.send(Err(err)); - None - } - Ok(None) => { - state.store(State::Reconnecting as u8, Ordering::Release); - Some(sender) - } - } -} pub enum State { Connected = 0, Reconnecting = 1, @@ -89,7 +64,7 @@ impl Retrier { pub async fn start(mut self) { const SLEEP_DURATION: Duration = Duration::from_secs(3); - // add lru instead + // TODO: introduce limit let mut pending_actions = vec![]; loop { @@ -138,3 +113,57 @@ impl Retrier { pending_actions.truncate(pivot); } } + +pub(crate) fn handle_action_result( + result: Result, E>, + sender: oneshot::Sender>, + state: Arc, +) -> Option>> +where + T: Send + 'static, + E: Send + 'static, +{ + match result { + Ok(Some(val)) => { + state.store(State::Connected as u8, Ordering::Release); + let _ = sender.send(Ok(val)); + None + } + Err(err) => { + let _ = sender.send(Err(err)); + None + } + Ok(None) => { + state.store(State::Reconnecting as u8, Ordering::Release); + Some(sender) + } + } +} + +pub(crate) async fn execute_retriable_operation( + operation: F, + action_factory: AFactory, + action_sender: &mpsc::Sender, + state: &AtomicU8, +) -> Result +where + F: Fn() -> Fut, + Fut: std::future::Future, E>>, + AFactory: Fn(oneshot::Sender>) -> A, +{ + match state.load(Ordering::Acquire) { + 0 => match operation().await { + Ok(Some(val)) => return Ok(val), + Ok(None) => state.store(State::Reconnecting as u8, Ordering::Release), + Err(err) => return Err(err.into()), + }, + 1 => {} + _ => unreachable!(), + } + + let (sender, receiver) = oneshot::channel(); + let action = action_factory(sender); + action_sender.send(action).await; + + receiver.await.unwrap() // TODO: remove unwrap +} diff --git a/crates/worker/src/commands/utils.rs b/crates/worker/src/commands/utils.rs index e2d5d1f..c333626 100644 --- a/crates/worker/src/commands/utils.rs +++ b/crates/worker/src/commands/utils.rs @@ -137,7 +137,10 @@ pub async fn on_compilation_success( ) .expression_attribute_values(":data", AttributeValue::Ss(presigned_urls.clone())); - db_client.update_item_raw(builder).await.map_err(DBError::from)?; + db_client + .update_item_raw(&builder) + .await + .map_err(DBError::from)?; auto_clean_up.clean_up().await; Ok(TaskResult::Success { presigned_urls }) @@ -163,7 +166,10 @@ pub async fn on_compilation_failed( ) .expression_attribute_values(":data", AttributeValue::S(message.clone())); - db_client.update_item_raw(builder).await.map_err(DBError::from)?; + db_client + .update_item_raw(&builder) + .await + .map_err(DBError::from)?; Ok(TaskResult::Failure(message)) } diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index cab7ca9..41336e3 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -120,7 +120,7 @@ impl Inner { ) -> Result<(), PurgeError> { const SYNC_FROM_OFFSET: Option = PURGE_INTERVAL.checked_mul(6); - let mut global_state = GlobalState::new(db_client.clone(), s3_client.clone()); + let mut global_state = GlobalState::new(db_client.clone()); let sync_from = Utc::now() - SYNC_FROM_OFFSET.unwrap(); loop { @@ -208,15 +208,13 @@ impl Inner { struct GlobalState { db_client: DynamoDBClientWrapper, - s3_client: S3ClientWrapper, pub items: Vec, } impl GlobalState { - pub fn new(db_client: DynamoDBClientWrapper, s3_client: S3ClientWrapper) -> Self { + pub fn new(db_client: DynamoDBClientWrapper) -> Self { Self { db_client, - s3_client, items: vec![], } } diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index 713ba2e..11a2f0b 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -1,7 +1,7 @@ use std::num::NonZeroUsize; use std::time::Duration; use tokio::task::JoinHandle; -use tracing::{error, info, warn}; +use tracing::{error, warn}; use types::{CompilationRequest, SqsMessage, VerificationRequest}; use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper; From 4b3481450d14391a301f9f08121edcd724633a08 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Fri, 20 Sep 2024 13:07:46 +0900 Subject: [PATCH 6/9] refactor: ci fixes regarding formatting --- Cargo.toml | 16 ++++++++-------- crates/lambdas/Cargo.toml | 2 -- crates/worker/Cargo.toml | 2 +- crates/worker/src/main.rs | 7 +++++++ 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6053abd..4e82507 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,5 @@ [workspace] -members = [ - "crates/types", - "crates/lambdas", - "crates/worker", -] +members = ["crates/types", "crates/lambdas", "crates/worker"] exclude = ["api"] [workspace.dependencies] @@ -12,12 +8,16 @@ aws-sdk-s3 = "1.43.0" aws-sdk-sqs = "1.39.0" aws-sdk-dynamodb = "1.42.0" chrono = "0.4.38" -tokio = {version = "1.39.3", features = ["macros"]} +tokio = { version = "1.39.3", features = ["macros"] } serde = "1.0.207" serde_json = "1.0.128" thiserror = "1.0.63" tracing = { version = "0.1.40", features = ["log"] } -tracing-subscriber = { version = "0.3.18", default-features = false, features = ["fmt", "ansi"] } +tracing-subscriber = { version = "0.3.18", default-features = false, features = [ + "fmt", + "ansi", +] } uuid = { version = "1.10.0", features = ["serde", "v4"] } -types = {version = "0.0.1", path = "crates/types"} \ No newline at end of file +# Internal dependencies +types = { version = "0.0.1", path = "crates/types" } \ No newline at end of file diff --git a/crates/lambdas/Cargo.toml b/crates/lambdas/Cargo.toml index c221c22..344116c 100644 --- a/crates/lambdas/Cargo.toml +++ b/crates/lambdas/Cargo.toml @@ -13,12 +13,10 @@ chrono.workspace = true tokio.workspace = true serde.workspace = true serde_json.workspace = true -thiserror.workspace = true tracing.workspace = true tracing-subscriber.workspace = true uuid.workspace = true -lambda_runtime = "0.13.0" lambda_http = "0.13.0" # Inner crates diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 3f404d7..883ab01 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -9,7 +9,7 @@ aws-sdk-s3.workspace = true aws-sdk-sqs.workspace = true aws-sdk-dynamodb.workspace = true chrono.workspace = true -tokio = {workspace = true, features = ["rt-multi-thread", "sync"]} +tokio = { workspace = true, features = ["rt-multi-thread", "sync"] } serde = { workspace = true, features = ["derive"] } serde_json.workspace = true thiserror.workspace = true diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 4d8cd99..428309f 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -25,6 +25,13 @@ const BUCKET_NAME_DEFAULT: &str = "zksync-compilation-s3"; #[tokio::main] async fn main() { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_ansi(false) + .without_time() // CloudWatch will add the ingestion time + .with_target(false) + .init(); + let profile_name = std::env::var("AWS_PROFILE").unwrap_or(AWS_PROFILE_DEFAULT.into()); let profile_files = EnvConfigFiles::builder() .with_file(EnvConfigFileKind::Credentials, "./credentials") From 296a24c0e59c32df3274b51749a77c99866339fa Mon Sep 17 00:00:00 2001 From: taco-paco Date: Fri, 20 Sep 2024 13:10:21 +0900 Subject: [PATCH 7/9] fix: taplo --- crates/worker/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 883ab01..dc05778 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -9,7 +9,7 @@ aws-sdk-s3.workspace = true aws-sdk-sqs.workspace = true aws-sdk-dynamodb.workspace = true chrono.workspace = true -tokio = { workspace = true, features = ["rt-multi-thread", "sync"] } +tokio = { workspace = true, features = ["rt-multi-thread", "sync"] } serde = { workspace = true, features = ["derive"] } serde_json.workspace = true thiserror.workspace = true From 57630f6571bed2502d130dd77daaf3e13e6737e9 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Fri, 20 Sep 2024 13:17:32 +0900 Subject: [PATCH 8/9] fix: taplo --- crates/lambdas/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/lambdas/Cargo.toml b/crates/lambdas/Cargo.toml index 344116c..5398911 100644 --- a/crates/lambdas/Cargo.toml +++ b/crates/lambdas/Cargo.toml @@ -20,7 +20,7 @@ uuid.workspace = true lambda_http = "0.13.0" # Inner crates -types = {workspace = true} +types.workspace = true [[bin]] name = "generate-presigned-urls" From 52589eac20bcaacd4af2e1326cf37e91dde25ddb Mon Sep 17 00:00:00 2001 From: taco-paco Date: Thu, 17 Oct 2024 15:06:41 +0300 Subject: [PATCH 9/9] fix: taplo --- Cargo.toml | 2 +- crates/types/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4e82507..9cdd25d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,4 +20,4 @@ tracing-subscriber = { version = "0.3.18", default-features = false, features = uuid = { version = "1.10.0", features = ["serde", "v4"] } # Internal dependencies -types = { version = "0.0.1", path = "crates/types" } \ No newline at end of file +types = { version = "0.0.1", path = "crates/types" } diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index c94bb1f..5fbcf59 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -11,4 +11,4 @@ thiserror.workspace = true uuid.workspace = true [dev-dependencies] -serde_json.workspace = true \ No newline at end of file +serde_json.workspace = true