diff --git a/crates/lambdas/Cargo.toml b/crates/lambdas/Cargo.toml index 5398911..6a3f9e5 100644 --- a/crates/lambdas/Cargo.toml +++ b/crates/lambdas/Cargo.toml @@ -31,3 +31,8 @@ path = "src/generate_presigned_urls.rs" name = "compile" version = "0.0.1" path = "src/compile.rs" + +[[bin]] +name = "verify" +version = "0.0.1" +path = "src/verify.rs" \ No newline at end of file diff --git a/crates/lambdas/src/compile.rs b/crates/lambdas/src/compile.rs index f03e2fb..6ce545c 100644 --- a/crates/lambdas/src/compile.rs +++ b/crates/lambdas/src/compile.rs @@ -1,17 +1,20 @@ use aws_config::BehaviorVersion; use aws_sdk_dynamodb::{error::SdkError, operation::put_item::PutItemError}; +use chrono::Utc; use lambda_http::{ run, service_fn, Error as LambdaError, Request as LambdaRequest, Response as LambdaResponse, }; use std::ops::Add; -use chrono::Utc; use tracing::{error, info}; -use types::{CompilationRequest, SqsMessage, item::{Item, Status}}; +use types::{ + item::{Item, Status}, + CompilationRequest, SqsMessage, +}; mod common; use crate::common::{errors::Error, utils::extract_request, BUCKET_NAME_DEFAULT}; -// TODO: remove on release +// TODO: remove on release. random change const QUEUE_URL_DEFAULT: &str = "https://sqs.ap-southeast-2.amazonaws.com/266735844848/zksync-sqs"; const TABLE_NAME_DEFAULT: &str = "zksync-table"; @@ -40,7 +43,7 @@ async fn compile( let item = Item { id: request.id, status: Status::Pending, - created_at + created_at, }; let result = dynamo_client @@ -181,4 +184,4 @@ async fn main() -> Result<(), LambdaError> { } })) .await -} \ No newline at end of file +} diff --git a/crates/lambdas/src/verify.rs b/crates/lambdas/src/verify.rs new file mode 100644 index 0000000..5d60e63 --- /dev/null +++ b/crates/lambdas/src/verify.rs @@ -0,0 +1,176 @@ +use aws_config::BehaviorVersion; +use aws_sdk_dynamodb::{error::SdkError, operation::put_item::PutItemError}; +use chrono::Utc; +use lambda_http::{ + run, service_fn, Error as LambdaError, Request as LambdaRequest, Response as LambdaResponse, +}; +use std::ops::Add; +use tracing::{error, info}; +use types::{ + item::{Item, Status}, + SqsMessage, VerificationRequest, +}; + +mod common; +use crate::common::{errors::Error, utils::extract_request, BUCKET_NAME_DEFAULT}; + +// TODO: remove on release +const QUEUE_URL_DEFAULT: &str = "https://sqs.ap-southeast-2.amazonaws.com/266735844848/zksync-sqs"; +const TABLE_NAME_DEFAULT: &str = "zksync-table"; + +const NO_OBJECTS_TO_COMPILE_ERROR: &str = "There are no objects to compile"; +const RECOMPILATION_ATTEMPT_ERROR: &str = "Recompilation attemp"; + +async fn verify( + request: VerificationRequest, + dynamo_client: &aws_sdk_dynamodb::Client, + table_name: &str, + sqs_client: &aws_sdk_sqs::Client, + queue_url: &str, +) -> Result<(), Error> { + let created_at = Utc::now(); + let item = Item { + id: request.id, + status: Status::Pending, + created_at, + }; + + let result = dynamo_client + .put_item() + .table_name(table_name) + .set_item(Some(item.into())) + .condition_expression("attribute_not_exists(ID)") + .send() + .await; + + match result { + Ok(value) => value, + Err(SdkError::ServiceError(val)) => match val.err() { + PutItemError::ConditionalCheckFailedException(_) => { + error!("Reverification attempt, id: {}", request.id); + let response = lambda_http::Response::builder() + .status(400) + .header("content-type", "text/html") + .body(RECOMPILATION_ATTEMPT_ERROR.into()) + .map_err(Error::from)?; + + return Err(Error::HttpError(response)); + } + _ => return 
Err(Box::new(SdkError::ServiceError(val)).into()), + }, + Err(err) => return Err(Box::new(err).into()), + }; + + let message = SqsMessage::Verify { request }; + let message = match serde_json::to_string(&message) { + Ok(val) => val, + Err(err) => { + error!("Serialization failed, id: {:?}", message); + return Err(Box::new(err).into()); + } + }; + let message_output = sqs_client + .send_message() + .queue_url(queue_url) + .message_body(message) + .send() + .await + .map_err(Box::new)?; + + info!( + "message sent to sqs. message_id: {}", + message_output.message_id.unwrap_or("empty_id".into()) + ); + + Ok(()) +} + +#[tracing::instrument(skip( + dynamo_client, + table_name, + sqs_client, + queue_url, + s3_client, + bucket_name +))] +async fn process_request( + request: LambdaRequest, + dynamo_client: &aws_sdk_dynamodb::Client, + table_name: &str, + sqs_client: &aws_sdk_sqs::Client, + queue_url: &str, + s3_client: &aws_sdk_s3::Client, + bucket_name: &str, +) -> Result, Error> { + let request = extract_request::(request)?; + + let objects = s3_client + .list_objects_v2() + .delimiter('/') + .prefix(request.id.to_string().add("/")) + .bucket(bucket_name) + .send() + .await + .map_err(Box::new)?; + + if let None = &objects.contents { + error!("No objects in folder: {}", request.id); + let response = LambdaResponse::builder() + .status(400) + .header("content-type", "text/html") + .body(NO_OBJECTS_TO_COMPILE_ERROR.into()) + .map_err(Error::from)?; + + return Err(Error::HttpError(response)); + } + + info!("Verify"); + verify(request, dynamo_client, table_name, sqs_client, queue_url).await?; + + let response = LambdaResponse::builder() + .status(200) + .header("content-type", "text/html") + .body(Default::default()) + .map_err(Box::new)?; + + return Ok(response); +} + +#[tokio::main] +async fn main() -> Result<(), LambdaError> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_ansi(false) + .without_time() // CloudWatch will add the ingestion time + .with_target(false) + .init(); + + let queue_url = std::env::var("QUEUE_URL").unwrap_or(QUEUE_URL_DEFAULT.into()); + let table_name = std::env::var("TABLE_NAME").unwrap_or(TABLE_NAME_DEFAULT.into()); + let bucket_name = std::env::var("BUCKET_NAME").unwrap_or(BUCKET_NAME_DEFAULT.into()); + + let config = aws_config::load_defaults(BehaviorVersion::latest()).await; + let dynamo_client = aws_sdk_dynamodb::Client::new(&config); + let sqs_client = aws_sdk_sqs::Client::new(&config); + let s3_client = aws_sdk_s3::Client::new(&config); + + run(service_fn(|request: LambdaRequest| async { + let result = process_request( + request, + &dynamo_client, + &table_name, + &sqs_client, + &queue_url, + &s3_client, + &bucket_name, + ) + .await; + + match result { + Ok(val) => Ok(val), + Err(Error::HttpError(val)) => Ok(val), + Err(Error::LambdaError(err)) => Err(err), + } + })) + .await +} diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 5fbcf59..564f7c5 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -5,8 +5,10 @@ edition = "2021" [dependencies] aws-sdk-dynamodb.workspace = true +aws-sdk-sqs.workspace = true chrono.workspace = true serde.workspace = true +serde_json.workspace = true thiserror.workspace = true uuid.workspace = true diff --git a/crates/types/src/item.rs b/crates/types/src/item.rs index 7506703..f5d5e1c 100644 --- a/crates/types/src/item.rs +++ b/crates/types/src/item.rs @@ -1,25 +1,25 @@ use aws_sdk_dynamodb::types::AttributeValue; use chrono::{DateTime, FixedOffset, Utc}; -use 
serde::{Deserialize, Serialize};
+use errors::ItemError;
 use std::collections::HashMap;
 use std::fmt;
 use std::fmt::Formatter;
 use uuid::Uuid;
 
-pub type AttributeMap = HashMap<String, AttributeValue>;
+use crate::item::task_result::TaskResult;
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum TaskResult {
-    Success { presigned_urls: Vec<String> },
-    Failure(String),
-}
+pub mod errors;
+pub mod task_result;
+
+pub type AttributeMap = HashMap<String, AttributeValue>;
 
 #[derive(Debug, Clone)]
+#[cfg_attr(test, derive(PartialEq))]
 pub enum Status {
     // TODO: add FilesUploaded(?)
     Pending,
     InProgress,
-    Done(TaskResult),
+    Done(TaskResult), // TODO: TaskResult will be generic probably
 }
 
 impl Status {
@@ -33,8 +33,7 @@ impl fmt::Display for Status {
         match self {
             Status::Pending => write!(f, "Pending"),
            Status::InProgress => write!(f, "InProgress"),
-            Status::Done(TaskResult::Success { .. }) => write!(f, "Success"),
-            Status::Done(TaskResult::Failure(msg)) => write!(f, "Failure: {}", msg),
+            Status::Done(val) => val.fmt(f),
         }
     }
 }
@@ -56,21 +55,6 @@ impl From<Status> for u32 {
     }
 }
 
-impl From<TaskResult> for AttributeMap {
-    fn from(value: TaskResult) -> Self {
-        match value {
-            TaskResult::Success { presigned_urls } => HashMap::from([(
-                Item::data_attribute_name().into(),
-                AttributeValue::Ss(presigned_urls),
-            )]),
-            TaskResult::Failure(message) => HashMap::from([(
-                Item::data_attribute_name().into(),
-                AttributeValue::S(message),
-            )]),
-        }
-    }
-}
-
 impl From<Status> for AttributeMap {
     fn from(value: Status) -> Self {
         let mut map = HashMap::from([(
@@ -80,7 +64,11 @@
 
         // For `Done` variant, reuse the conversion logic from `TaskResult`
         if let Status::Done(task_result) = value {
-            map.extend(AttributeMap::from(task_result));
+            let task_result_map = HashMap::from([(
+                TaskResult::attribute_name().into(),
+                AttributeValue::M(AttributeMap::from(task_result)),
+            )]);
+            map.extend(task_result_map);
         }
 
         map
@@ -97,60 +85,35 @@ impl TryFrom<&AttributeMap> for Status {
            .as_n()
            .map_err(|_| ItemError::invalid_attribute_type(Status::attribute_name(), "number"))?
.parse::()?; - let status = match status { - 0 => Status::Pending, - 1 => Status::InProgress, - 2 => { - let data = value.get(Item::data_attribute_name()).ok_or( - ItemError::absent_attribute_error(Item::data_attribute_name()), - )?; - let data = data.as_ss().map_err(|_| { - ItemError::invalid_attribute_type(Item::data_attribute_name(), "string array") - })?; - Status::Done(TaskResult::Success { - presigned_urls: data.clone(), - }) - } - 3 => { - let data = value.get(Item::data_attribute_name()).ok_or( - ItemError::absent_attribute_error(Item::data_attribute_name()), + match status { + 0 => Ok(Status::Pending), + 1 => Ok(Status::InProgress), + 2 | 3 => { + let data = value.get(TaskResult::attribute_name()).ok_or( + ItemError::absent_attribute_error(TaskResult::attribute_name()), )?; - let data = data.as_s().map_err(|_| { - ItemError::invalid_attribute_type(Item::data_attribute_name(), "string") + let data = data.as_m().map_err(|_| { + ItemError::invalid_attribute_type(TaskResult::attribute_name(), "map") })?; - Status::Done(TaskResult::Failure(data.clone())) + let task_result: TaskResult = data.try_into()?; + match (status, &task_result) { + (2, TaskResult::Success(_)) => Ok(Status::Done(task_result)), + (3, TaskResult::Failure(_)) => Ok(Status::Done(task_result)), + _ => Err(ItemError::FormatError(format!( + "status is {}, while actual value: {}", + status, task_result + ))), + } } val => return Err(ItemError::FormatError(format!("Status value is: {}", val))), - }; - - Ok(status) - } -} - -#[derive(thiserror::Error, Debug)] -pub enum ItemError { - #[error("Invalid Item format: {0}")] - FormatError(String), - #[error(transparent)] - NumParseError(#[from] std::num::ParseIntError), - #[error(transparent)] - DataParseError(#[from] chrono::format::ParseError), -} - -impl ItemError { - fn absent_attribute_error(attribute_name: &str) -> Self { - let err_str = format!("No {} attribute in item", attribute_name); - Self::FormatError(err_str) - } - - fn invalid_attribute_type(attribute_name: &str, t: &str) -> Self { - let err_str = format!("{} attribute value isn't a {}", attribute_name, t); - Self::FormatError(err_str) + } } } +#[derive(Debug, Clone)] +#[cfg_attr(test, derive(PartialEq))] pub struct Item { pub id: Uuid, pub status: Status, @@ -163,10 +126,6 @@ impl Item { Status::attribute_name() } - pub const fn data_attribute_name() -> &'static str { - "Data" - } - pub const fn id_attribute_name() -> &'static str { "ID" } @@ -225,3 +184,206 @@ impl TryFrom for Item { }) } } + +#[cfg(test)] +mod tests { + use super::{task_result::*, *}; + + use crate::item::errors::ServerError; + use aws_sdk_dynamodb::types::AttributeValue; + use chrono::Utc; + use std::collections::HashMap; + use uuid::Uuid; + + #[test] + fn test_status_pending_to_attribute_map() { + let status = Status::Pending; + let expected_map = HashMap::from([( + Status::attribute_name().to_string(), + AttributeValue::N("0".to_string()), + )]); + + let attribute_map: AttributeMap = status.into(); + assert_eq!(attribute_map, expected_map); + } + + #[test] + fn test_status_pending_from_attribute_map() { + let attribute_map = HashMap::from([( + Status::attribute_name().to_string(), + AttributeValue::N("0".to_string()), + )]); + + let result: Status = (&attribute_map).try_into().expect("Conversion failed"); + assert_eq!(result, Status::Pending); + } + + #[test] + fn test_status_inprogress_to_attribute_map() { + let status = Status::InProgress; + let expected_map = HashMap::from([( + Status::attribute_name().to_string(), + 
AttributeValue::N("1".to_string()), + )]); + + let attribute_map: AttributeMap = status.into(); + assert_eq!(attribute_map, expected_map); + } + + #[test] + fn test_status_inprogress_from_attribute_map() { + let attribute_map = HashMap::from([( + Status::attribute_name().to_string(), + AttributeValue::N("1".to_string()), + )]); + + let result: Status = (&attribute_map).try_into().expect("Conversion failed"); + assert_eq!(result, Status::InProgress); + } + + fn status_success_compile_map() -> AttributeMap { + HashMap::from([ + ( + Status::attribute_name().to_string(), + AttributeValue::N("2".to_string()), + ), + ( + TaskResult::attribute_name().to_string(), + AttributeValue::M(HashMap::from([( + TaskResult::success_attribute_name().to_string(), + AttributeValue::M(HashMap::from([( + TaskSuccess::compile_attribute_name().to_string(), + AttributeValue::Ss(vec!["url1".to_string(), "url2".to_string()]), + )])), + )])), + ), + ]) + } + + #[test] + fn test_status_done_compile_success_to_attribute_map() { + let expected_map = status_success_compile_map(); + + let task_result = TaskResult::Success(TaskSuccess::Compile { + presigned_urls: vec!["url1".to_string(), "url2".to_string()], + }); + let status = Status::Done(task_result.clone()); + let attribute_map: AttributeMap = status.into(); + + assert_eq!(attribute_map, expected_map); + } + + #[test] + fn test_status_done_compile_success_from_attribute_map() { + let expected_result = Status::Done(TaskResult::Success(TaskSuccess::Compile { + presigned_urls: vec!["url1".to_string(), "url2".to_string()], + })); + + let attribute_map = status_success_compile_map(); + let result: Status = (&attribute_map).try_into().expect("Conversion failed"); + + assert_eq!(result, expected_result); + } + + fn status_failure_compilation_map() -> AttributeMap { + HashMap::from([ + ( + Status::attribute_name().to_string(), + AttributeValue::N("3".to_string()), + ), + ( + TaskResult::attribute_name().to_string(), + AttributeValue::M(HashMap::from([( + TaskResult::failure_attribute_name().to_string(), + AttributeValue::Ss(vec![ + "CompilationError".to_string(), + "Compilation failed".to_string(), + ]), + )])), + ), + ]) + } + + #[test] + fn test_status_done_failure_to_attribute_map() { + let expected_map = status_failure_compilation_map(); + + let task_result = TaskResult::Failure(TaskFailure { + error_type: ServerError::CompilationError, + message: "Compilation failed".to_string(), + }); + let status = Status::Done(task_result); + let attribute_map: AttributeMap = status.into(); + + assert_eq!(attribute_map, expected_map); + } + + #[test] + fn test_status_done_failure_from_attribute_map() { + let expected_result = Status::Done(TaskResult::Failure(TaskFailure { + error_type: ServerError::CompilationError, + message: "Compilation failed".to_string(), + })); + + let attribute_map = status_failure_compilation_map(); + let result: Status = (&attribute_map).try_into().expect("Conversion failed"); + + assert_eq!(result, expected_result); + } + + #[test] + fn test_item_to_attribute_map() { + let item = Item { + id: Uuid::new_v4(), + status: Status::InProgress, + created_at: Utc::now(), + }; + + let expected_map = HashMap::from([ + ( + Item::id_attribute_name().to_string(), + AttributeValue::S(item.id.to_string()), + ), + ( + Item::created_at_attribute_name().to_string(), + AttributeValue::S(item.created_at.to_rfc3339()), + ), + ( + Status::attribute_name().to_string(), + AttributeValue::N("1".to_string()), + ), + ]); + + let attribute_map: AttributeMap = item.clone().into(); + 
assert_eq!(attribute_map, expected_map);
+    }
+
+    #[test]
+    fn test_item_from_attribute_map() {
+        let id = Uuid::new_v4();
+        let created_at = Utc::now();
+        let attribute_map = HashMap::from([
+            (
+                Item::id_attribute_name().to_string(),
+                AttributeValue::S(id.to_string()),
+            ),
+            (
+                Item::created_at_attribute_name().to_string(),
+                AttributeValue::S(created_at.to_rfc3339()),
+            ),
+            (
+                Status::attribute_name().to_string(),
+                AttributeValue::N("1".to_string()),
+            ),
+        ]);
+
+        let expected_item = Item {
+            id,
+            status: Status::InProgress,
+            created_at,
+        };
+
+        let result: Item = attribute_map.try_into().expect("Conversion failed");
+        assert_eq!(result, expected_item);
+    }
+}
diff --git a/crates/types/src/item/errors.rs b/crates/types/src/item/errors.rs
new file mode 100644
index 0000000..b168ad5
--- /dev/null
+++ b/crates/types/src/item/errors.rs
@@ -0,0 +1,57 @@
+#[derive(thiserror::Error, Debug)]
+pub enum ItemError {
+    #[error("Invalid Item format: {0}")]
+    FormatError(String),
+    #[error(transparent)]
+    NumParseError(#[from] std::num::ParseIntError),
+    #[error(transparent)]
+    DataParseError(#[from] chrono::format::ParseError),
+}
+
+impl ItemError {
+    pub(crate) fn absent_attribute_error(attribute_name: &str) -> Self {
+        let err_str = format!("No {} attribute in item", attribute_name);
+        Self::FormatError(err_str)
+    }
+
+    pub(crate) fn invalid_attribute_type(attribute_name: &str, t: &str) -> Self {
+        let err_str = format!("{} attribute value isn't a {}", attribute_name, t);
+        Self::FormatError(err_str)
+    }
+}
+
+#[derive(Debug, Clone)]
+#[cfg_attr(test, derive(PartialEq))]
+pub enum ServerError {
+    UnsupportedCompilerVersion,
+    CompilationError,
+    UnknownNetworkError,
+    VerificationError,
+    InternalError,
+}
+
+impl Into<&'static str> for ServerError {
+    fn into(self) -> &'static str {
+        match self {
+            ServerError::UnsupportedCompilerVersion => "UnsupportedCompilerVersion",
+            ServerError::CompilationError => "CompilationError",
+            ServerError::InternalError => "InternalError",
+            ServerError::UnknownNetworkError => "UnknownNetworkError",
+            ServerError::VerificationError => "VerificationError"
+        }
+    }
+}
+
+impl TryFrom<&str> for ServerError {
+    type Error = String;
+    fn try_from(value: &str) -> Result<Self, Self::Error> {
+        match value {
+            "UnsupportedCompilerVersion" => Ok(ServerError::UnsupportedCompilerVersion),
+            "CompilationError" => Ok(ServerError::CompilationError),
+            "InternalError" => Ok(ServerError::InternalError),
+            "VerificationError" => Ok(ServerError::VerificationError),
+            "UnknownNetworkError" => Ok(ServerError::UnknownNetworkError),
+            _ => Err(value.into()),
+        }
+    }
+}
diff --git a/crates/types/src/item/task_result.rs b/crates/types/src/item/task_result.rs
new file mode 100644
index 0000000..9009866
--- /dev/null
+++ b/crates/types/src/item/task_result.rs
@@ -0,0 +1,378 @@
+use aws_sdk_dynamodb::types::AttributeValue;
+use std::collections::HashMap;
+use std::fmt::Formatter;
+
+use crate::item::errors::{ItemError, ServerError};
+use crate::item::AttributeMap;
+
+#[derive(Debug, Clone)]
+#[cfg_attr(test, derive(PartialEq))]
+pub enum TaskResult {
+    Success(TaskSuccess),
+    Failure(TaskFailure),
+}
+
+impl TaskResult {
+    pub const fn attribute_name() -> &'static str {
+        "Data"
+    }
+
+    pub const fn success_attribute_name() -> &'static str {
+        "Success"
+    }
+
+    pub const fn failure_attribute_name() -> &'static str {
+        "Failure"
+    }
+}
+
+impl std::fmt::Display for TaskResult {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            TaskResult::Success(_) => write!(f, "Success"),
+            TaskResult::Failure(msg) => write!(f, "Failure: {}", msg.message),
+        }
+    }
+}
+
+impl From<TaskResult> for AttributeMap {
+    fn from(value: TaskResult) -> Self {
+        match value {
+            TaskResult::Success(task_success) => HashMap::from([(
+                TaskResult::success_attribute_name().into(),
+                AttributeValue::M(task_success.into()),
+            )]),
+            TaskResult::Failure(failure) => failure.into(),
+        }
+    }
+}
+
+impl TryFrom<&AttributeMap> for TaskResult {
+    type Error = ItemError;
+    fn try_from(value: &AttributeMap) -> Result<Self, Self::Error> {
+        let (key, value) = if let Some(el) = value.iter().next() {
+            if value.len() != 1 {
+                Err(ItemError::FormatError(
+                    "More than 1 key in TaskSuccess".into(),
+                ))
+            } else {
+                Ok(el)
+            }
+        } else {
+            Err(ItemError::FormatError("No keys for TaskResult".into()))
+        }?;
+
+        match key.as_str() {
+            "Success" => {
+                let data = value
+                    .as_m()
+                    .map_err(|_| ItemError::FormatError("invalid type".into()))?;
+                Ok(TaskResult::Success(data.try_into()?))
+            }
+            "Failure" => Ok(TaskResult::Failure(value.try_into()?)),
+            _ => Err(ItemError::FormatError(format!(
+                "Invalid key in TaskResult: {key}"
+            ))),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+#[cfg_attr(test, derive(PartialEq))]
+pub enum TaskSuccess {
+    Compile { presigned_urls: Vec<String> },
+    Verify { message: String },
+}
+
+impl TaskSuccess {
+    pub const fn compile_attribute_name() -> &'static str {
+        "Compile"
+    }
+
+    pub const fn verify_attribute_name() -> &'static str {
+        "Verify"
+    }
+}
+
+impl From<TaskSuccess> for AttributeMap {
+    fn from(value: TaskSuccess) -> Self {
+        match value {
+            TaskSuccess::Compile { presigned_urls } => HashMap::from([(
+                TaskSuccess::compile_attribute_name().into(),
+                AttributeValue::Ss(presigned_urls),
+            )]),
+            TaskSuccess::Verify { message } => HashMap::from([(
+                TaskSuccess::verify_attribute_name().into(),
+                AttributeValue::S(message),
+            )]),
+        }
+    }
+}
+
+impl TryFrom<&AttributeMap> for TaskSuccess {
+    type Error = ItemError;
+
+    fn try_from(value: &AttributeMap) -> Result<Self, Self::Error> {
+        let (key, value) = if let Some(el) = value.iter().next() {
+            if value.len() != 1 {
+                Err(ItemError::FormatError(
+                    "More than 1 key in TaskSuccess".into(),
+                ))
+            } else {
+                Ok(el)
+            }
+        } else {
+            Err(ItemError::FormatError("No keys for TaskResult".into()))
+        }?;
+
+        match key.as_str() {
+            "Compile" => {
+                let presigned_urls = value
+                    .as_ss()
+                    .map_err(|_| ItemError::FormatError("invalid type".into()))?;
+
+                Ok(TaskSuccess::Compile {
+                    presigned_urls: presigned_urls.clone(),
+                })
+            }
+            "Verify" => {
+                let message = value
+                    .as_s()
+                    .map_err(|_| ItemError::FormatError("invalid type".into()))?;
+
+                Ok(TaskSuccess::Verify {
+                    message: message.clone(),
+                })
+            }
+            _ => Err(ItemError::FormatError(format!(
+                "Invalid key in TaskSuccess: {key}"
+            ))),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+#[cfg_attr(test, derive(PartialEq))]
+pub struct TaskFailure {
+    pub error_type: ServerError,
+    pub message: String,
+}
+
+impl TaskFailure {
+    pub const fn attribute_name() -> &'static str {
+        "Failure"
+    }
+}
+
+impl From<TaskFailure> for AttributeMap {
+    fn from(value: TaskFailure) -> Self {
+        HashMap::from([(
+            TaskFailure::attribute_name().into(),
+            AttributeValue::Ss(vec![
+                <ServerError as Into<&'static str>>::into(value.error_type).to_string(),
+                value.message,
+            ]),
+        )])
+    }
+}
+
+impl TryFrom<&AttributeValue> for TaskFailure {
+    type Error = ItemError;
+
+    fn try_from(value: &AttributeValue) -> Result<Self, Self::Error> {
+        let data = value
+            .as_ss()
+            .map_err(|_| ItemError::FormatError("invalid type".into()))?;
+        if data.len() != 2 {
+            Err(ItemError::FormatError(
+                "Invalid Failure values format".into(),
+            ))
+        } else {
+            let
error_type: ServerError = data[0] + .as_str() + .try_into() + .map_err(ItemError::FormatError)?; + Ok(TaskFailure { + error_type, + message: data[1].to_string(), + }) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use aws_sdk_dynamodb::types::AttributeValue; + use std::collections::HashMap; + + #[test] + fn test_task_success_compile_to_attribute_map() { + let task_success = TaskSuccess::Compile { + presigned_urls: vec!["url1".to_string(), "url2".to_string()], + }; + + let expected_map = HashMap::from([( + "Compile".to_string(), + AttributeValue::Ss(vec!["url1".to_string(), "url2".to_string()]), + )]); + + let attribute_map: AttributeMap = task_success.into(); + assert_eq!(attribute_map, expected_map); + } + + #[test] + fn test_task_success_compile_from_attribute_map() { + let attribute_map = HashMap::from([( + "Compile".to_string(), + AttributeValue::Ss(vec!["url1".to_string(), "url2".to_string()]), + )]); + + let expected_task_success = TaskSuccess::Compile { + presigned_urls: vec!["url1".to_string(), "url2".to_string()], + }; + + let result: TaskSuccess = (&attribute_map).try_into().expect("Conversion failed"); + assert_eq!(result, expected_task_success); + } + + #[test] + fn test_task_success_verify_to_attribute_map() { + let task_success = TaskSuccess::Verify { + message: "Verification successful".to_string(), + }; + + let expected_map = HashMap::from([( + "Verify".to_string(), + AttributeValue::S("Verification successful".to_string()), + )]); + + let attribute_map: AttributeMap = task_success.into(); + assert_eq!(attribute_map, expected_map); + } + + #[test] + fn test_task_success_verify_from_attribute_map() { + let attribute_map = HashMap::from([( + "Verify".to_string(), + AttributeValue::S("Verification successful".to_string()), + )]); + + let expected_task_success = TaskSuccess::Verify { + message: "Verification successful".to_string(), + }; + + let result: TaskSuccess = (&attribute_map).try_into().expect("Conversion failed"); + assert_eq!(result, expected_task_success); + } + + #[test] + fn test_task_result_failure_to_attribute_map() { + let task_result = TaskResult::Failure(TaskFailure { + error_type: ServerError::CompilationError, + message: "Compilation failed".to_string(), + }); + + let expected_map = HashMap::from([( + "Failure".to_string(), + AttributeValue::Ss(vec![ + "CompilationError".into(), + "Compilation failed".to_string(), + ]), + )]); + + let attribute_map: AttributeMap = task_result.into(); + assert_eq!(attribute_map, expected_map); + } + + #[test] + fn test_task_result_failure_from_attribute_map() { + let attribute_map = HashMap::from([( + "Failure".to_string(), + AttributeValue::Ss(vec![ + "CompilationError".into(), + "Compilation failed".to_string(), + ]), + )]); + + let expected_task_result = TaskResult::Failure(TaskFailure { + error_type: ServerError::CompilationError, + message: "Compilation failed".to_string(), + }); + + let result: TaskResult = (&attribute_map).try_into().expect("Conversion failed"); + assert_eq!(result, expected_task_result); + } + + #[test] + fn test_task_result_success_compile_to_attribute_map() { + let task_result = TaskResult::Success(TaskSuccess::Compile { + presigned_urls: vec!["url1".to_string(), "url2".to_string()], + }); + + let expected_map = HashMap::from([( + "Success".to_string(), + AttributeValue::M(HashMap::from([( + "Compile".to_string(), + AttributeValue::Ss(vec!["url1".to_string(), "url2".to_string()]), + )])), + )]); + + let attribute_map: AttributeMap = task_result.into(); + assert_eq!(attribute_map, 
expected_map); + } + + #[test] + fn test_task_result_success_compile_from_attribute_map() { + let attribute_map = HashMap::from([( + "Success".to_string(), + AttributeValue::M(HashMap::from([( + "Compile".to_string(), + AttributeValue::Ss(vec!["url1".to_string(), "url2".to_string()]), + )])), + )]); + + let expected_task_result = TaskResult::Success(TaskSuccess::Compile { + presigned_urls: vec!["url1".to_string(), "url2".to_string()], + }); + + let result: TaskResult = (&attribute_map).try_into().expect("Conversion failed"); + assert_eq!(result, expected_task_result); + } + + #[test] + fn test_task_result_success_verify_to_attribute_map() { + let task_result = TaskResult::Success(TaskSuccess::Verify { + message: "Verification successful".to_string(), + }); + + let expected_map = HashMap::from([( + "Success".to_string(), + AttributeValue::M(HashMap::from([( + "Verify".to_string(), + AttributeValue::S("Verification successful".to_string()), + )])), + )]); + + let attribute_map: AttributeMap = task_result.into(); + assert_eq!(attribute_map, expected_map); + } + + #[test] + fn test_task_result_success_verify_from_attribute_map() { + let attribute_map = HashMap::from([( + "Success".to_string(), + AttributeValue::M(HashMap::from([( + "Verify".to_string(), + AttributeValue::S("Verification successful".to_string()), + )])), + )]); + + let expected_task_result = TaskResult::Success(TaskSuccess::Verify { + message: "Verification successful".to_string(), + }); + + let result: TaskResult = (&attribute_map).try_into().expect("Conversion failed"); + assert_eq!(result, expected_task_result); + } +} diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index 0012302..cbebc38 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -27,6 +27,7 @@ pub struct VerifyConfig { pub network: String, pub contract_address: String, pub inputs: Vec, + pub target_contract: Option, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -56,3 +57,21 @@ impl SqsMessage { } } } + +#[derive(thiserror::Error, Debug)] +pub enum SqsRawMessageError { + #[error("Empty message body")] + NoMessageBody, + #[error(transparent)] + SerdeJsonError(#[from] serde_json::Error), +} + +impl TryFrom for SqsMessage { + type Error = SqsRawMessageError; + + fn try_from(value: aws_sdk_sqs::types::Message) -> Result { + let body = value.body.ok_or(SqsRawMessageError::NoMessageBody)?; + let sqs_message = serde_json::from_str::(&body)?; + Ok(sqs_message) + } +} diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index dc05778..d9ed605 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -17,6 +17,7 @@ tracing.workspace = true tracing-subscriber.workspace = true uuid.workspace = true +anyhow = "1.0.89" aws-runtime = "1.4.0" aws-smithy-types = "1.2.2" async-channel = "2.3.1" diff --git a/crates/worker/src/clients/dynamodb_clients/client.rs b/crates/worker/src/clients/dynamodb_clients/client.rs index 47a84e4..2d1e3be 100644 --- a/crates/worker/src/clients/dynamodb_clients/client.rs +++ b/crates/worker/src/clients/dynamodb_clients/client.rs @@ -117,7 +117,7 @@ impl DynamoDBClient { Err(DBError::GetItemError(err)) => { match_result!(DBGetError, Err(err)).map_err(DBError::from) } - result => result.map(|value| Some(value)), + result => result.map(Some), } } diff --git a/crates/worker/src/clients/errors.rs b/crates/worker/src/clients/errors.rs index 391b832..2e000a2 100644 --- a/crates/worker/src/clients/errors.rs +++ b/crates/worker/src/clients/errors.rs @@ -11,7 +11,7 @@ use 
aws_sdk_sqs::error::SdkError; use aws_sdk_sqs::operation::delete_message::DeleteMessageError; use aws_sdk_sqs::operation::receive_message::ReceiveMessageError; use tracing::error; -use types::item::ItemError; +use types::item::errors::ItemError; // SQS related errors pub(crate) type SqsReceiveError = SdkError; diff --git a/crates/worker/src/clients/retriable.rs b/crates/worker/src/clients/retriable.rs index 470e963..44bd91e 100644 --- a/crates/worker/src/clients/retriable.rs +++ b/crates/worker/src/clients/retriable.rs @@ -165,5 +165,5 @@ where let action = action_factory(sender); action_sender.send(action).await; - receiver.await.unwrap() // TODO: remove unwrap + receiver.await.unwrap() // TODO(101): remove unwrap } diff --git a/crates/worker/src/clients/s3_clients/client.rs b/crates/worker/src/clients/s3_clients/client.rs index 7082aed..fd26166 100644 --- a/crates/worker/src/clients/s3_clients/client.rs +++ b/crates/worker/src/clients/s3_clients/client.rs @@ -15,7 +15,7 @@ use crate::clients::errors::{ S3DeleteObjectError, S3Error, S3GetObjectError, S3ListObjectsError, S3PutObjectError, }; use crate::clients::retriable::{handle_action_result, match_result, ActionHandler}; -use crate::commands::compile::CompilationFile; +use crate::commands::CompilationFile; #[derive(Clone)] pub struct S3Client { @@ -70,7 +70,7 @@ impl S3Client { Err(S3Error::GetObjectError(err)) => { match_result!(S3GetObjectError, Err(err)).map_err(S3Error::from) } - result => result.map(|value| Some(value)), + result => result.map(Some), } } diff --git a/crates/worker/src/clients/s3_clients/wrapper.rs b/crates/worker/src/clients/s3_clients/wrapper.rs index 1869ae2..febe54d 100644 --- a/crates/worker/src/clients/s3_clients/wrapper.rs +++ b/crates/worker/src/clients/s3_clients/wrapper.rs @@ -10,7 +10,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::sync::{mpsc, oneshot}; use crate::clients::s3_clients::client::{S3Action, S3Client}; -use crate::commands::compile::CompilationFile; +use crate::commands::CompilationFile; #[derive(Clone)] pub struct S3ClientWrapper { diff --git a/crates/worker/src/commands/compile.rs b/crates/worker/src/commands/compile.rs index 8dcd4e5..1f46ad3 100644 --- a/crates/worker/src/commands/compile.rs +++ b/crates/worker/src/commands/compile.rs @@ -1,21 +1,17 @@ // TODO: extract in class +use anyhow::Context; use std::path::{Path, PathBuf}; use std::process::Stdio; use tracing::{error, info}; use types::CompilationConfig; use crate::commands::errors::CompilationError; -use crate::commands::SPAWN_SEMAPHORE; +use crate::commands::{CompilationFile, SPAWN_SEMAPHORE}; use crate::utils::cleaner::AutoCleanUp; use crate::utils::hardhat_config::HardhatConfigBuilder; use crate::utils::lib::{initialize_files, list_files_in_directory, DEFAULT_SOLIDITY_VERSION}; -pub struct CompilationFile { - pub file_path: PathBuf, - pub file_content: Vec, -} - pub struct CompilationInput { pub workspace_path: PathBuf, pub config: CompilationConfig, @@ -44,8 +40,24 @@ pub async fn do_compile( let hardhat_config_path = workspace_path.join("hardhat.config.ts"); // instantly create the directories - tokio::fs::create_dir_all(&workspace_path).await?; - tokio::fs::create_dir_all(&artifacts_path).await?; + tokio::fs::create_dir_all(&workspace_path) + .await + .map_err(anyhow::Error::from) + .with_context(|| { + format!( + "Couldn't create workspace dir: {}", + workspace_path.display() + ) + })?; + tokio::fs::create_dir_all(&artifacts_path) + .await + .map_err(anyhow::Error::from) + .with_context(|| { + format!( + 
"Couldn't create artifacts dir: {}", + artifacts_path.display() + ) + })?; // when the compilation is done, clean up the directories // it will be called when the AutoCleanUp struct is dropped @@ -66,8 +78,19 @@ pub async fn do_compile( let hardhat_config_content = hardhat_config_builder.build().to_string_config(); // create parent directories - tokio::fs::create_dir_all(hardhat_config_path.parent().unwrap()).await?; - tokio::fs::write(hardhat_config_path, hardhat_config_content).await?; + tokio::fs::create_dir_all(hardhat_config_path.parent().unwrap()) + .await + .map_err(anyhow::Error::from) + .with_context(|| { + format!( + "Couldn't create hardhat dir: {}", + hardhat_config_path.display() + ) + })?; + tokio::fs::write(hardhat_config_path, hardhat_config_content) + .await + .map_err(anyhow::Error::from) + .with_context(|| "Couldn't write hardhat.config file")?; // filter test files from compilation candidates let contracts = compilation_input @@ -77,7 +100,10 @@ pub async fn do_compile( .collect(); // initialize the files - initialize_files(&workspace_path, contracts).await?; + initialize_files(&workspace_path, contracts) + .await + .map_err(anyhow::Error::from) + .with_context(|| "Couldn't write contract to fs")?; // Limit number of spawned processes. RAII released let _permit = SPAWN_SEMAPHORE.acquire().await.expect("Expired semaphore"); @@ -88,9 +114,14 @@ pub async fn do_compile( .current_dir(&workspace_path) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .spawn()?; + .spawn() + .map_err(anyhow::Error::from) + .with_context(|| "Couldn't spawn process")?; - let output = process.wait_with_output().await?; + let output = process + .wait_with_output() + .await + .map_err(anyhow::Error::from)?; let status = output.status; let message = String::from_utf8_lossy(&output.stdout).to_string(); @@ -105,8 +136,10 @@ pub async fn do_compile( } // fetch the files in the artifacts directory - let file_paths = - list_files_in_directory(&artifacts_path).expect("Unexpected error listing artifact"); + let file_paths = list_files_in_directory(&artifacts_path) + .map_err(anyhow::Error::from) + .with_context(|| "Unexpected error listing artifact")?; + let artifacts_data = file_paths .into_iter() .map(|file_path| { diff --git a/crates/worker/src/commands/errors.rs b/crates/worker/src/commands/errors.rs index 641080d..e07beba 100644 --- a/crates/worker/src/commands/errors.rs +++ b/crates/worker/src/commands/errors.rs @@ -1,33 +1,45 @@ -use crate::clients::errors::{DBError, S3Error}; - -#[derive(thiserror::Error, Debug)] -pub enum PreparationError { - #[error("DBError: {0}")] - DBError(#[from] DBError), - #[error("S3Error: {0}")] - S3Error(#[from] S3Error), - #[error("Item isn't id DB: {0}")] - NoDBItemError(String), - #[error("Unexpected status: {0}")] - UnexpectedStatusError(String), - #[error("Unsupported version: {0}")] - VersionNotSupported(String), -} +use types::item::errors::ServerError; +use types::item::task_result::TaskFailure; #[derive(thiserror::Error, Debug)] pub enum CompilationError { - #[error("IoError: {0}")] - IoError(#[from] std::io::Error), - #[error("Failed to compile: {0}")] + #[error("CompilationFailureError: {0}")] CompilationFailureError(String), + #[error("UnknownError: {0}")] + UnknownError(#[from] anyhow::Error), } #[derive(thiserror::Error, Debug)] -pub enum CommandResultHandleError { - #[error("IoError: {0}")] - IoError(#[from] std::io::Error), - #[error("DBError: {0}")] - DBError(#[from] DBError), - #[error("S3Error: {0}")] - S3Error(#[from] S3Error), +pub enum 
VerificationError {
+    #[error("UnknownNetwork: {0}")]
+    UnknownNetworkError(String),
+    #[error("VerificationFailureError : {0}")]
+    VerificationFailureError(String),
+    #[error("UnknownError: {0}")]
+    UnknownError(#[from] anyhow::Error),
+}
+
+impl Into<TaskFailure> for &VerificationError {
+    fn into(self) -> TaskFailure {
+        match self {
+            VerificationError::UnknownNetworkError(err) => TaskFailure {
+                error_type: ServerError::VerificationError,
+                message: err.to_string(),
+            },
+            VerificationError::VerificationFailureError(err) => TaskFailure {
+                error_type: ServerError::UnsupportedCompilerVersion,
+                message: err.clone(),
+            },
+            VerificationError::UnknownError(err) => TaskFailure {
+                error_type: ServerError::InternalError,
+                message: err.to_string(),
+            },
+        }
+    }
+}
+
+impl Into<TaskFailure> for VerificationError {
+    fn into(self) -> TaskFailure {
+        (&self).into()
+    }
 }
diff --git a/crates/worker/src/commands/mod.rs b/crates/worker/src/commands/mod.rs
index 2bf6f39..751013f 100644
--- a/crates/worker/src/commands/mod.rs
+++ b/crates/worker/src/commands/mod.rs
@@ -1,12 +1,18 @@
 use lazy_static::lazy_static;
+use std::path::PathBuf;
 use tokio::sync::Semaphore;
 
 pub mod compile;
 pub mod errors;
-pub mod utils;
 pub mod verify;
 
 const PROCESS_SPAWN_LIMIT: usize = 8;
 lazy_static! {
     static ref SPAWN_SEMAPHORE: Semaphore = Semaphore::new(PROCESS_SPAWN_LIMIT);
 }
+
+#[derive()]
+pub struct CompilationFile {
+    pub file_path: PathBuf,
+    pub file_content: Vec<u8>,
+}
diff --git a/crates/worker/src/commands/utils.rs b/crates/worker/src/commands/utils.rs
deleted file mode 100644
index c333626..0000000
--- a/crates/worker/src/commands/utils.rs
+++ /dev/null
@@ -1,175 +0,0 @@
-// TODO: probably extract preparations to a class
-
-use aws_sdk_dynamodb::error::SdkError;
-use aws_sdk_dynamodb::operation::update_item::UpdateItemError;
-use aws_sdk_dynamodb::types::AttributeValue;
-use aws_sdk_s3::presigning::PresigningConfig;
-use std::path::Path;
-use std::time::Duration;
-use tracing::{error, warn};
-use types::item::{Item, Status, TaskResult};
-use types::{CompilationRequest, ARTIFACTS_FOLDER};
-use uuid::Uuid;
-
-use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper;
-use crate::clients::errors::{DBError, S3Error};
-use crate::clients::s3_clients::wrapper::S3ClientWrapper;
-use crate::commands::compile::{CompilationInput, CompilationOutput};
-use crate::commands::errors::{CommandResultHandleError, PreparationError};
-use crate::utils::cleaner::AutoCleanUp;
-use crate::utils::lib::{SOL_ROOT, ZKSOLC_VERSIONS};
-
-async fn try_set_compiling_status(
-    db_client: &DynamoDBClientWrapper,
-    key: Uuid,
-) -> Result<(), PreparationError> {
-    let db_update_result = db_client
-        .update_item_status_conditional(
-            key.to_string().as_str(),
-            &Status::Pending,
-            &Status::InProgress,
-        )
-        .await;
-    match db_update_result {
-        Ok(_) => Ok(()),
-        Err(SdkError::ServiceError(err)) => match err.err() {
-            UpdateItemError::ConditionalCheckFailedException(_) => {
-                error!("Conditional check not met");
-                Err(PreparationError::UnexpectedStatusError(
-                    "Concurrent status change from another instance".into(),
-                ))
-            }
-            _ => Err(DBError::from(SdkError::ServiceError(err)).into()),
-        },
-        Err(err) => Err(DBError::from(err).into()),
-    }
-}
-
-pub(crate) async fn prepare_compile_input(
-    request: &CompilationRequest,
-    db_client: &DynamoDBClientWrapper,
-    s3_client: &S3ClientWrapper,
-) -> Result<CompilationInput, PreparationError> {
-    let zksolc_version = request.config.version.as_str();
-    if !ZKSOLC_VERSIONS.contains(&zksolc_version) {
-        return Err(PreparationError::VersionNotSupported(
zksolc_version.to_string(), - )); - } - - let item = db_client.get_item(request.id.to_string().as_str()).await?; - let item: Item = match item { - Some(item) => item, - None => { - return Err(PreparationError::NoDBItemError(request.id.to_string())); - } - }; - - if !matches!(item.status, Status::Pending) { - warn!("Item already processing: {}", item.status); - return Err(PreparationError::UnexpectedStatusError(item.status.to_string())); - } - - let dir = format!("{}/", request.id); - let files = s3_client.extract_files(&dir).await?; - - // Update status to Compiling - try_set_compiling_status(db_client, request.id).await?; - - Ok(CompilationInput { - workspace_path: Path::new(SOL_ROOT).join(request.id.to_string().as_str()), - config: request.config.clone(), - contracts: files, - }) -} - -pub async fn on_compilation_success( - id: Uuid, - db_client: &DynamoDBClientWrapper, - s3_client: &S3ClientWrapper, - compilation_output: CompilationOutput, -) -> Result { - const DOWNLOAD_URL_EXPIRATION: Duration = Duration::from_secs(5 * 60 * 60); - - let auto_clean_up = AutoCleanUp { - dirs: vec![compilation_output.artifacts_dir.to_str().unwrap()], - }; - - let mut presigned_urls = Vec::with_capacity(compilation_output.artifacts_data.len()); - for el in compilation_output.artifacts_data { - let absolute_path = compilation_output.artifacts_dir.join(&el.file_path); - let file_content = tokio::fs::File::open(absolute_path).await?; - - let file_key = format!( - "{}/{}/{}", - ARTIFACTS_FOLDER, - id, - el.file_path.to_str().unwrap() - ); - s3_client.put_object(&file_key, file_content).await?; - - let expires_in = PresigningConfig::expires_in(DOWNLOAD_URL_EXPIRATION).unwrap(); - let presigned_request = s3_client - .get_object_presigned(&file_key, &expires_in) - .await - .map_err(S3Error::from)?; - - presigned_urls.push(presigned_request.uri().to_string()); - } - - if presigned_urls.is_empty() { - // TODO: AttributeValue::Ss doesn't allow empty arrays. Decide what to do. 
for now - presigned_urls.push("".to_string()); - } - - let builder = db_client - .client - .client - .update_item() - .table_name(db_client.client.table_name.clone()) - .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) - .update_expression("SET #status = :newStatus, #data = :data") - .expression_attribute_names("#status", Status::attribute_name()) - .expression_attribute_names("#data", Item::data_attribute_name()) - .expression_attribute_values( - ":newStatus", - AttributeValue::N(2.to_string()), // Ready - ) - .expression_attribute_values(":data", AttributeValue::Ss(presigned_urls.clone())); - - db_client - .update_item_raw(&builder) - .await - .map_err(DBError::from)?; - - auto_clean_up.clean_up().await; - Ok(TaskResult::Success { presigned_urls }) -} - -pub async fn on_compilation_failed( - id: Uuid, - db_client: &DynamoDBClientWrapper, - message: String, -) -> Result { - let builder = db_client - .client - .client - .update_item() - .table_name(db_client.client.table_name.clone()) - .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) - .update_expression("SET #status = :newStatus, #data = :data") - .expression_attribute_names("#status", Status::attribute_name()) - .expression_attribute_names("#data", Item::data_attribute_name()) - .expression_attribute_values( - ":newStatus", - AttributeValue::N(3.to_string()), // Failed - ) - .expression_attribute_values(":data", AttributeValue::S(message.clone())); - - db_client - .update_item_raw(&builder) - .await - .map_err(DBError::from)?; - - Ok(TaskResult::Failure(message)) -} diff --git a/crates/worker/src/commands/verify.rs b/crates/worker/src/commands/verify.rs index a479eeb..1e2d3da 100644 --- a/crates/worker/src/commands/verify.rs +++ b/crates/worker/src/commands/verify.rs @@ -1,183 +1,152 @@ -// use crate::errors::{ApiError, Result}; -// use crate::handlers::process::{do_process_command, fetch_process_result}; -// use crate::handlers::types::{ApiCommand, ApiCommandResult, VerificationRequest, VerifyResponse}; -// use crate::handlers::SPAWN_SEMAPHORE; -// use crate::rate_limiter::RateLimited; -// use crate::utils::cleaner::AutoCleanUp; -// use crate::utils::hardhat_config::HardhatConfigBuilder; -// use crate::utils::lib::{ -// generate_folder_name, initialize_files, ALLOWED_NETWORKS, DEFAULT_SOLIDITY_VERSION, SOL_ROOT, -// ZKSOLC_VERSIONS, -// }; -// use crate::worker::WorkerEngine; -// use rocket::serde::{json, json::Json}; -// use rocket::{tokio, State}; -// use std::path::Path; -// use std::process::Stdio; -// use tracing::info; -// use tracing::instrument; -// -// #[instrument] -// #[post("/verify", format = "json", data = "")] -// pub async fn verify( -// verification_request_json: Json, -// _rate_limited: RateLimited, -// ) -> Json { -// info!("/verify"); -// -// do_verify(verification_request_json.0) -// .await -// .unwrap_or_else(|e| { -// Json(VerifyResponse { -// message: e.to_string(), -// status: "Error".to_string(), -// }) -// }) -// } -// -// #[instrument] -// #[post("/verify-async", format = "json", data = "")] -// pub fn verify_async( -// verification_request_json: Json, -// _rate_limited: RateLimited, -// engine: &State, -// ) -> String { -// info!("/verify-async",); -// -// do_process_command(ApiCommand::Verify(verification_request_json.0), engine) -// } -// -// #[instrument] -// #[get("/verify-result/")] -// pub async fn get_verify_result(process_id: String, engine: &State) -> String { -// info!("/verify-result/{:?}", process_id); -// -// fetch_process_result(process_id, engine, |result| 
match result { -// ApiCommandResult::Verify(verification_result) => { -// json::to_string(&verification_result).unwrap_or_default() -// } -// _ => String::from("Result not available"), -// }) -// } -// -// fn extract_verify_args(request: &VerificationRequest) -> Vec { -// let mut args: Vec = vec!["hardhat".into(), "verify".into(), "--network".into()]; -// if request.config.network == "sepolia" { -// args.push("zkSyncTestnet".into()) -// } else { -// args.push("zkSyncMainnet".into()) -// } -// -// if let Some(ref target_contract) = request.target_contract { -// args.push("--contract".into()); -// args.push(target_contract.clone()); -// } -// -// args.push(request.config.contract_address.clone()); -// args.extend(request.config.inputs.clone()); -// -// args -// } -// -// pub async fn do_verify(verification_request: VerificationRequest) -> Result> { -// let zksolc_version = verification_request.config.zksolc_version.clone(); -// -// // check if the version is supported -// if !ZKSOLC_VERSIONS.contains(&zksolc_version.as_str()) { -// return Err(ApiError::VersionNotSupported(zksolc_version)); -// } -// -// let solc_version = verification_request -// .config -// .solc_version -// .clone() -// .unwrap_or(DEFAULT_SOLIDITY_VERSION.to_string()); -// -// let network = verification_request.config.network.clone(); -// -// // check if the network is supported -// if !ALLOWED_NETWORKS.contains(&network.as_str()) { -// return Err(ApiError::UnknownNetwork(network)); -// } -// -// let namespace = generate_folder_name(); -// -// // root directory for the contracts -// let workspace_path_str = format!("{}/{}", SOL_ROOT, namespace); -// let workspace_path = Path::new(&workspace_path_str); -// -// // root directory for the artifacts -// let artifacts_path_str = format!("{}/{}", workspace_path_str, "artifacts-zk"); -// let artifacts_path = Path::new(&artifacts_path_str); -// -// // root directory for user files (hardhat config, etc) -// let user_files_path_str = workspace_path_str.clone(); -// let hardhat_config_path = Path::new(&user_files_path_str).join("hardhat.config.ts"); -// -// // instantly create the directories -// tokio::fs::create_dir_all(workspace_path) -// .await -// .map_err(ApiError::FailedToWriteFile)?; -// tokio::fs::create_dir_all(artifacts_path) -// .await -// .map_err(ApiError::FailedToWriteFile)?; -// -// // when the compilation is done, clean up the directories -// // it will be called when the AutoCleanUp struct is dropped -// let auto_clean_up = AutoCleanUp { -// dirs: vec![workspace_path.to_str().unwrap()], -// }; -// -// // write the hardhat config file -// let hardhat_config_content = HardhatConfigBuilder::new() -// .zksolc_version(&zksolc_version) -// .solidity_version(&solc_version) -// .build() -// .to_string_config(); -// -// // create parent directories -// tokio::fs::create_dir_all(hardhat_config_path.parent().unwrap()) -// .await -// .map_err(ApiError::FailedToWriteFile)?; -// -// tokio::fs::write(hardhat_config_path, hardhat_config_content) -// .await -// .map_err(ApiError::FailedToWriteFile)?; -// -// // initialize the files -// initialize_files(verification_request.contracts.clone(), workspace_path).await?; -// -// // Limit number of spawned processes. 
RAII released
-//     let _permit = SPAWN_SEMAPHORE.acquire().await.expect("Expired semaphore");
-//
-//     let args = extract_verify_args(&verification_request);
-//     let command = tokio::process::Command::new("npx")
-//         .args(args)
-//         .current_dir(workspace_path)
-//         .stdout(Stdio::piped())
-//         .stderr(Stdio::piped())
-//         .spawn();
-//
-//     let process = command.map_err(ApiError::FailedToExecuteCommand)?;
-//     let output = process
-//         .wait_with_output()
-//         .await
-//         .map_err(ApiError::FailedToReadOutput)?;
-//     let status = output.status;
-//     let message = String::from_utf8_lossy(&output.stdout).to_string();
-//
-//     // calling here explicitly to avoid dropping the AutoCleanUp struct
-//     auto_clean_up.clean_up().await;
-//
-//     if !status.success() {
-//         return Ok(Json(VerifyResponse {
-//             status: "Error".to_string(),
-//             message: String::from_utf8_lossy(&output.stderr).to_string(),
-//         }));
-//     }
-//
-//     Ok(Json(VerifyResponse {
-//         status: "Success".to_string(),
-//         message,
-//     }))
-// }
+use anyhow::Context;
+use std::path::PathBuf;
+use std::process::Stdio;
+use types::VerifyConfig;
+
+use crate::commands::errors::VerificationError;
+use crate::commands::{CompilationFile, SPAWN_SEMAPHORE};
+use crate::utils::cleaner::AutoCleanUp;
+use crate::utils::hardhat_config::HardhatConfigBuilder;
+use crate::utils::lib::{initialize_files, ALLOWED_NETWORKS, DEFAULT_SOLIDITY_VERSION};
+
+pub struct VerificationInput {
+    pub workspace_path: PathBuf,
+    pub config: VerifyConfig,
+    pub contracts: Vec<CompilationFile>,
+}
+
+fn extract_verify_args(config: &VerifyConfig) -> Vec<String> {
+    let VerifyConfig {
+        target_contract,
+        network,
+        contract_address,
+        inputs,
+        ..
+    } = &config;
+
+    let mut args: Vec<String> = vec!["hardhat".into(), "verify".into(), "--network".into()];
+    if network == "sepolia" {
+        args.push("zkSyncTestnet".into())
+    } else {
+        args.push("zkSyncMainnet".into())
+    }
+
+    if let Some(target_contract) = target_contract {
+        args.push("--contract".into());
+        args.push(target_contract.clone());
+    }
+
+    args.push(contract_address.clone());
+    args.extend(inputs.clone());
+
+    args
+}
+
+pub async fn do_verify(
+    verification_request: VerificationInput,
+) -> Result<String, VerificationError> {
+    let zksolc_version = verification_request.config.zksolc_version.clone();
+
+    let solc_version = verification_request
+        .config
+        .solc_version
+        .clone()
+        .unwrap_or(DEFAULT_SOLIDITY_VERSION.to_string());
+
+    let network = verification_request.config.network.clone();
+
+    // check if the network is supported
+    if !ALLOWED_NETWORKS.contains(&network.as_str()) {
+        return Err(VerificationError::UnknownNetworkError(network));
+    }
+
+    // root directory for the contracts
+    let workspace_path = verification_request.workspace_path;
+    // root directory for the artifacts
+    let artifacts_path = workspace_path.join("artifacts-zk");
+    // root directory for user files (hardhat config, etc)
+    let hardhat_config_path = workspace_path.join("hardhat.config.ts");
+
+    // instantly create the directories
+    tokio::fs::create_dir_all(&workspace_path)
+        .await
+        .map_err(anyhow::Error::from)
+        .with_context(|| {
+            format!(
+                "Couldn't create workspace dir: {}",
+                workspace_path.display()
+            )
+        })?;
+    tokio::fs::create_dir_all(&artifacts_path)
+        .await
+        .map_err(anyhow::Error::from)
+        .with_context(|| {
+            format!(
+                "Couldn't create artifacts dir: {}",
+                artifacts_path.display()
+            )
+        })?;
+
+    // when the compilation is done, clean up the directories
+    // it will be called when the AutoCleanUp struct is dropped
+    let auto_clean_up = AutoCleanUp {
+        dirs:
vec![workspace_path.to_str().unwrap()], + }; + + // write the hardhat config file + let hardhat_config_content = HardhatConfigBuilder::new() + .zksolc_version(&zksolc_version) + .solidity_version(&solc_version) + .build() + .to_string_config(); + + // create parent directories + tokio::fs::create_dir_all(hardhat_config_path.parent().unwrap()) + .await + .map_err(anyhow::Error::from) + .with_context(|| { + format!( + "Couldn't create hardhat dir: {}", + hardhat_config_path.display() + ) + })?; + tokio::fs::write(hardhat_config_path, hardhat_config_content) + .await + .map_err(anyhow::Error::from) + .with_context(|| "Couldn't write hardhat.config file")?; + + // initialize the files + initialize_files(&workspace_path, verification_request.contracts) + .await + .map_err(anyhow::Error::from) + .with_context(|| "Couldn't write contract to fs")?; + + // Limit number of spawned processes. RAII released + let _permit = SPAWN_SEMAPHORE.acquire().await.expect("Expired semaphore"); + + let args = extract_verify_args(&verification_request.config); + let process = tokio::process::Command::new("npx") + .args(args) + .current_dir(&workspace_path) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(anyhow::Error::from) + .with_context(|| "Couldn't spawn process")?; + + let output = process + .wait_with_output() + .await + .map_err(anyhow::Error::from)?; + let status = output.status; + let message = String::from_utf8_lossy(&output.stdout).to_string(); + + // calling here explicitly to avoid dropping the AutoCleanUp struct + auto_clean_up.clean_up().await; + + if !status.success() { + Err(VerificationError::VerificationFailureError(message)) + } else { + Ok(message) + } +} diff --git a/crates/worker/src/errors.rs b/crates/worker/src/errors.rs index d7bc43c..648dae1 100644 --- a/crates/worker/src/errors.rs +++ b/crates/worker/src/errors.rs @@ -1,7 +1,6 @@ -use types::item::ItemError; +use types::item::errors::ItemError; -use crate::clients::errors::{DBError, S3Error, SqsDeleteError}; -use crate::commands::errors::{CommandResultHandleError, PreparationError}; +use crate::clients::errors::{DBError, S3Error}; #[derive(thiserror::Error, Debug)] pub enum PurgeError { @@ -12,15 +11,3 @@ pub enum PurgeError { #[error("ItemError: {0}")] ItemError(#[from] ItemError), } - -#[derive(thiserror::Error, Debug)] -pub enum MessageProcessorError { - #[error("PreparationError: {0}")] - PreparationError(#[from] PreparationError), - #[error("CommandResultHandleError: {0}")] - CommandResultHandleError(#[from] CommandResultHandleError), - #[error("S3Error: {0}")] - S3Error(#[from] S3Error), - #[error("SqsDeleteError: {0}")] - SqsDeleteError(#[from] SqsDeleteError), -} diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 428309f..8682a7c 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,6 +1,7 @@ mod clients; mod commands; mod errors; +mod processor; mod purgatory; mod sqs_listener; mod utils; @@ -13,6 +14,10 @@ use std::num::NonZeroUsize; use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper; use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::clients::sqs_clients::wrapper::SqsClientWrapper; +use crate::processor::compile_processor::CompileProcessor; +use crate::processor::verify_processor::VerifyProcessor; +use crate::processor::Processor; +use crate::purgatory::Purgatory; use crate::worker::EngineBuilder; const AWS_PROFILE_DEFAULT: &str = "dev"; @@ -55,7 +60,24 @@ async fn main() { let s3_client = 
aws_sdk_s3::Client::new(&config); let s3_client = S3ClientWrapper::new(s3_client, BUCKET_NAME_DEFAULT); - let engine = EngineBuilder::new(sqs_client, db_client, s3_client); + let purgatory = Purgatory::new(db_client.clone(), s3_client.clone()); + + // Initialize processors + let compile_processor = + CompileProcessor::new(sqs_client.clone(), s3_client.clone(), purgatory.clone()); + let verify_processor = + VerifyProcessor::new(sqs_client.clone(), s3_client.clone(), purgatory.clone()); + let processor = Processor::new( + db_client, + s3_client, + sqs_client.clone(), + compile_processor, + verify_processor, + purgatory, + ); + + // Engine + let engine = EngineBuilder::new(sqs_client, processor); let running_engine = engine.start(NonZeroUsize::new(10).unwrap()); running_engine.wait().await; diff --git a/crates/worker/src/processor.rs b/crates/worker/src/processor.rs new file mode 100644 index 0000000..4584f19 --- /dev/null +++ b/crates/worker/src/processor.rs @@ -0,0 +1,166 @@ +use anyhow::{anyhow, Context}; +use aws_sdk_dynamodb::error::SdkError; +use aws_sdk_dynamodb::operation::update_item::UpdateItemError; +use aws_sdk_dynamodb::types::AttributeValue; +use tracing::error; +use types::item::task_result::TaskSuccess; +use types::item::{task_result::TaskResult, Item, Status}; +use types::{SqsMessage, VerificationRequest}; +use uuid::Uuid; + +use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper; +use crate::clients::s3_clients::wrapper::S3ClientWrapper; +use crate::clients::sqs_clients::wrapper::SqsClientWrapper; +use crate::processor::compile_processor::CompileProcessor; +use crate::processor::verify_processor::VerifyProcessor; +use crate::purgatory::Purgatory; + +pub mod compile_processor; +pub mod errors; +mod input_preparator; +pub mod verify_processor; + +// TODO: generic in the future, handling specific message type- chain dependant. +pub struct Processor { + db_client: DynamoDBClientWrapper, + s3_client: S3ClientWrapper, + sqs_client: SqsClientWrapper, + compile_processor: CompileProcessor, + verify_processor: VerifyProcessor, + purgatory: Purgatory, +} + +impl Processor { + pub fn new( + db_client: DynamoDBClientWrapper, + s3_client: S3ClientWrapper, + sqs_client: SqsClientWrapper, + compile_processor: CompileProcessor, + verify_processor: VerifyProcessor, + purgatory: Purgatory, + ) -> Self { + Self { + db_client, + compile_processor, + verify_processor, + sqs_client, + s3_client, + purgatory, + } + } + + async fn lock_item(&self, id: Uuid) -> anyhow::Result<()> { + self.db_client + .update_item_status_conditional( + id.to_string().as_str(), + &Status::Pending, + &Status::InProgress, + ) + .await + .map_err(|err| match err { + SdkError::ServiceError(err) => match err.into_err() { + UpdateItemError::ConditionalCheckFailedException(err) => { + anyhow!("Couldn't lock the item: {}", err) + } + err => anyhow!(err), + }, + err => anyhow!(err), + }) + } + + // TODO(future me): could return bool. + pub async fn process_message( + &self, + sqs_message: SqsMessage, + receipt_handle: String, + ) -> anyhow::Result<()> { + let id = sqs_message.id(); + self.lock_item(id).await.or_else(|err| { + let sqs_client = self.sqs_client.clone(); + let receipt_handle_copy = receipt_handle.clone(); + tokio::spawn(async move { + // That could be due to wrong Status or no item + // 1. No item is possible in case GlobalState purges old message - delete from sqs + // 2. Wrong Status - other instance picked this up. + // For sake of safety still try to delete it. Doesn't matter if succeeds. 
+ if let Err(err) = sqs_client.delete_message(receipt_handle_copy).await { + error!("Couldn't delete sqs message: {err}") + } + }); + + Err(err) + })?; + + let task_result = match sqs_message { + SqsMessage::Compile { request } => { + let result = self + .compile_processor + .process_message(request, receipt_handle) + .await; + + match result { + Ok(val) => TaskResult::Success(TaskSuccess::Compile { + presigned_urls: val, + }), + Err(err) => TaskResult::Failure(err.into()), + } + } + SqsMessage::Verify { request } => { + let result = self + .verify_processor + .process_message(request, receipt_handle) + .await; + match result { + Ok(message) => TaskResult::Success(TaskSuccess::Verify { message }), + Err(err) => TaskResult::Failure(err.into()), + } + } + }; + + self.handle_task_result(id, task_result).await + } + + async fn handle_task_result(&self, id: Uuid, task_result: TaskResult) -> anyhow::Result<()> { + let mut builder = self + .db_client + .client + .client + .update_item() + .table_name(self.db_client.client.table_name.clone()) + .key(Item::primary_key_name(), AttributeValue::S(id.to_string())) + .update_expression("SET #status = :newStatus, #data = :data") + .expression_attribute_names("#status", Status::attribute_name()) + .expression_attribute_names("#data", TaskResult::attribute_name()); + + builder = match task_result { + TaskResult::Success(value) => { + builder + .expression_attribute_values( + ":newStatus", + AttributeValue::N(2.to_string()), // Ready + ) + .expression_attribute_values( + ":data", + AttributeValue::M(TaskResult::Success(value).into()), + ) + } + TaskResult::Failure(message) => { + builder + .expression_attribute_values( + ":newStatus", + AttributeValue::N(3.to_string()), // Failed + ) + .expression_attribute_values( + ":data", + AttributeValue::M(TaskResult::Failure(message).into()), + ) + } + }; + + self.db_client + .update_item_raw(&builder) + .await + .map_err(anyhow::Error::from) + .with_context(|| "Couldn't write task result") + } +} diff --git a/crates/worker/src/processor/compile_processor.rs b/crates/worker/src/processor/compile_processor.rs new file mode 100644 index 0000000..9fa0d83 --- /dev/null +++ b/crates/worker/src/processor/compile_processor.rs @@ -0,0 +1,192 @@ +use anyhow::Context; +use aws_sdk_s3::presigning::PresigningConfig; +use std::time::Duration; +use tracing::error; +use types::item::errors::ServerError; +use types::item::task_result::{TaskFailure, TaskResult, TaskSuccess}; +use types::{CompilationRequest, ARTIFACTS_FOLDER}; +use uuid::Uuid; + +use crate::clients::s3_clients::wrapper::S3ClientWrapper; +use crate::clients::sqs_clients::wrapper::SqsClientWrapper; +use crate::commands::compile::{do_compile, CompilationOutput}; +use crate::processor::errors::CompileProcessorError; +use crate::processor::input_preparator::CompileInputPreparator; +use crate::purgatory::Purgatory; +use crate::utils::cleaner::AutoCleanUp; +use crate::utils::lib::{s3_compilation_files_dir, ZKSOLC_VERSIONS}; + +pub struct CompileProcessor { + sqs_client: SqsClientWrapper, + s3_client: S3ClientWrapper, + input_preparator: CompileInputPreparator, + purgatory: Purgatory, +} + +impl CompileProcessor { + pub fn new( + sqs_client: SqsClientWrapper, + s3_client: S3ClientWrapper, + purgatory: Purgatory, + ) -> Self { + let input_preparator = CompileInputPreparator::new(s3_client.clone()); + Self { + sqs_client, + s3_client, + input_preparator, + purgatory, + } + } + + async fn validate_message( + &self, + message: &CompilationRequest, + ) -> Result<(), 
CompileProcessorError> { + let zksolc_version = message.config.version.as_str(); + if !ZKSOLC_VERSIONS.contains(&zksolc_version) { + Err(CompileProcessorError::VersionNotSupportedError( + zksolc_version.to_string(), + )) + } else { + Ok(()) + } + } + + // TODO(future me): could return bool. + pub async fn process_message( + &self, + message: CompilationRequest, + receipt_handle: String, + ) -> Result, CompileProcessorError> { + let id = message.id; + + self.validate_message(&message).await.map_err(|err| { + // Reckoned as independent piece + let receipt_handle_copy = receipt_handle.clone(); + let dir = s3_compilation_files_dir(id.to_string().as_str()); + let s3_client = self.s3_client.clone(); + let sqs_client = self.sqs_client.clone(); + tokio::spawn(async move { + if let Err(err) = s3_client.delete_dir(&dir).await { + error!("Couldn't delete compilation files on failed validation: {err}") + } + if let Err(err) = sqs_client.delete_message(receipt_handle_copy).await { + error!("Failed to delete message from sqs: {err}"); + } + }); + + err + })?; + + let compilation_input = self + .input_preparator + .prepare_input(&message) + .await + .map_err(|err| { + let receipt_handle_copy = receipt_handle.clone(); + let sqs_client = self.sqs_client.clone(); + tokio::spawn(async move { + if let Err(err) = sqs_client.delete_message(receipt_handle_copy).await { + error!("Failed to delete message from sqs: {err}"); + } + }); + + err + })?; + + let compilation_output = do_compile(compilation_input).await?; + let file_keys = self.upload_artifacts(id, compilation_output).await?; + + // Reckoned as independent piece + { + let dir = s3_compilation_files_dir(id.to_string().as_str()); + let s3_client = self.s3_client.clone(); + let sqs_client = self.sqs_client.clone(); + tokio::spawn(async move { + // Clean compilation input files right away + if let Err(err) = s3_client.delete_dir(&dir).await { + error!("Filed to delete compilation file: {err}") + } + if let Err(err) = sqs_client.delete_message(receipt_handle).await { + error!("Failed to delete message from sqs: {err}"); + } + }); + } + + match self.generate_presigned_urls(&file_keys).await { + Ok(presigned_urls) => { + self.purgatory + .add_record( + id, + TaskResult::Success(TaskSuccess::Compile { + presigned_urls: presigned_urls.clone(), + }), + ) + .await; + Ok(presigned_urls) + } + Err(err) => { + let task_result = TaskResult::Failure(TaskFailure { + error_type: ServerError::InternalError, + message: err.to_string(), + }); + self.purgatory.add_record(id, task_result).await; + + Err(err.into()) + } + } + } + + async fn upload_artifacts( + &self, + id: Uuid, + compilation_output: CompilationOutput, + ) -> anyhow::Result> { + let auto_clean_up = AutoCleanUp { + dirs: vec![compilation_output.artifacts_dir.to_str().unwrap()], + }; + + let mut file_keys = Vec::with_capacity(compilation_output.artifacts_data.len()); + for el in compilation_output.artifacts_data { + let absolute_path = compilation_output.artifacts_dir.join(&el.file_path); + let file_content = tokio::fs::File::open(absolute_path.clone()) + .await + .map_err(anyhow::Error::from) + .with_context(|| format!("Couldn't open file: {}", absolute_path.display()))?; + + let file_key = format!( + "{}/{}/{}", + ARTIFACTS_FOLDER, + id, + el.file_path.to_str().unwrap() + ); + self.s3_client + .put_object(&file_key, file_content) + .await + .map_err(anyhow::Error::from) + .with_context(|| "Couldn't upload artifact")?; // TODO: TODO(101) + file_keys.push(file_key); + } + + auto_clean_up.clean_up().await; + 
Ok(file_keys) + } + + async fn generate_presigned_urls(&self, file_keys: &[String]) -> anyhow::Result> { + const DOWNLOAD_URL_EXPIRATION: Duration = Duration::from_secs(5 * 60 * 60); + + let mut presigned_urls = Vec::with_capacity(file_keys.len()); + for el in file_keys { + let expires_in = PresigningConfig::expires_in(DOWNLOAD_URL_EXPIRATION).unwrap(); + let presigned_request = self + .s3_client + .get_object_presigned(el.as_str(), &expires_in) + .await + .map_err(anyhow::Error::from)?; // TODO: maybe extra handle in case chan closed TODO(101) + + presigned_urls.push(presigned_request.uri().to_string()); + } + + Ok(presigned_urls) + } +} diff --git a/crates/worker/src/processor/errors.rs b/crates/worker/src/processor/errors.rs new file mode 100644 index 0000000..6f012da --- /dev/null +++ b/crates/worker/src/processor/errors.rs @@ -0,0 +1,59 @@ +use types::item::errors::ServerError; +use types::item::task_result::TaskFailure; + +use crate::commands::errors::{CompilationError, VerificationError}; + +#[derive(thiserror::Error, Debug)] +pub enum CompileProcessorError { + #[error("Unsupported version: {0}")] + VersionNotSupportedError(String), + #[error("CompilationError: {0}")] + CompilationError(#[from] CompilationError), + #[error("UnknownError: {0}")] + UnknownError(#[from] anyhow::Error), +} + +impl Into for CompileProcessorError { + fn into(self) -> TaskFailure { + match self { + Self::CompilationError(err) => TaskFailure { + error_type: ServerError::CompilationError, + message: err.to_string(), + }, + Self::VersionNotSupportedError(err) => TaskFailure { + error_type: ServerError::UnsupportedCompilerVersion, + message: err, + }, + Self::UnknownError(err) => TaskFailure { + error_type: ServerError::InternalError, + message: err.to_string(), + }, + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum VerifyProcessorError { + #[error("Unsupported version: {0}")] + VersionNotSupportedError(String), + #[error("CompilationError: {0}")] + VerificationError(#[from] VerificationError), + #[error("UnknownError: {0}")] + UnknownError(#[from] anyhow::Error), +} + +impl Into for VerifyProcessorError { + fn into(self) -> TaskFailure { + match self { + Self::VerificationError(err) => err.into(), + Self::VersionNotSupportedError(err) => TaskFailure { + error_type: ServerError::UnsupportedCompilerVersion, + message: err, + }, + Self::UnknownError(err) => TaskFailure { + error_type: ServerError::InternalError, + message: err.to_string(), + }, + } + } +} diff --git a/crates/worker/src/processor/input_preparator.rs b/crates/worker/src/processor/input_preparator.rs new file mode 100644 index 0000000..7200758 --- /dev/null +++ b/crates/worker/src/processor/input_preparator.rs @@ -0,0 +1,63 @@ +use std::path::Path; +use types::{CompilationRequest, VerificationRequest}; + +use crate::clients::s3_clients::wrapper::S3ClientWrapper; +use crate::commands::compile::CompilationInput; +use crate::commands::verify::VerificationInput; +use crate::utils::lib::SOL_ROOT; + +pub struct CompileInputPreparator { + s3_client: S3ClientWrapper, +} + +impl CompileInputPreparator { + pub fn new(s3_client: S3ClientWrapper) -> Self { + Self { s3_client } + } + + pub(crate) async fn prepare_input( + &self, + request: &CompilationRequest, + ) -> anyhow::Result { + let dir = format!("{}/", request.id); + let files = self + .s3_client + .extract_files(&dir) + .await + .map_err(anyhow::Error::from)?; + + Ok(CompilationInput { + workspace_path: Path::new(SOL_ROOT).join(request.id.to_string().as_str()), + config: 
request.config.clone(), + contracts: files, + }) + } +} + +pub struct VerifyInputPreparator { + s3_client: S3ClientWrapper, +} + +impl VerifyInputPreparator { + pub fn new(s3_client: S3ClientWrapper) -> Self { + Self { s3_client } + } + + pub(crate) async fn prepare_input( + &self, + request: &VerificationRequest, + ) -> anyhow::Result { + let dir = format!("{}/", request.id); + let files = self + .s3_client + .extract_files(&dir) + .await + .map_err(anyhow::Error::from)?; + + Ok(VerificationInput { + workspace_path: Path::new(SOL_ROOT).join(request.id.to_string().as_str()), + config: request.config.clone(), + contracts: files, + }) + } +} diff --git a/crates/worker/src/processor/verify_processor.rs b/crates/worker/src/processor/verify_processor.rs new file mode 100644 index 0000000..61da14a --- /dev/null +++ b/crates/worker/src/processor/verify_processor.rs @@ -0,0 +1,128 @@ +use tracing::error; +use types::item::task_result::{TaskResult, TaskSuccess}; +use types::VerificationRequest; + +use crate::clients::s3_clients::wrapper::S3ClientWrapper; +use crate::clients::sqs_clients::wrapper::SqsClientWrapper; +use crate::commands::verify::do_verify; +use crate::processor::errors::VerifyProcessorError; +use crate::processor::input_preparator::VerifyInputPreparator; +use crate::purgatory::Purgatory; +use crate::utils::lib::{s3_compilation_files_dir, ZKSOLC_VERSIONS}; + +// TODO: make generic via adding MessageProcessor trait with process_message(...) +pub struct VerifyProcessor { + sqs_client: SqsClientWrapper, + s3_client: S3ClientWrapper, + input_preparator: VerifyInputPreparator, + purgatory: Purgatory, +} + +impl VerifyProcessor { + pub fn new( + sqs_client: SqsClientWrapper, + s3_client: S3ClientWrapper, + purgatory: Purgatory, + ) -> Self { + let input_preparator = VerifyInputPreparator::new(s3_client.clone()); + Self { + sqs_client, + s3_client, + input_preparator, + purgatory, + } + } + + async fn validate_message( + &self, + message: &VerificationRequest, + ) -> Result<(), VerifyProcessorError> { + let zksolc_version = message.config.zksolc_version.as_str(); + if !ZKSOLC_VERSIONS.contains(&zksolc_version) { + Err(VerifyProcessorError::VersionNotSupportedError( + zksolc_version.to_string(), + )) + } else { + Ok(()) + } + } + + pub async fn process_message( + &self, + message: VerificationRequest, + receipt_handle: String, + ) -> Result { + let id = message.id; + + // TODO: validator accepting SqsMessage + self.validate_message(&message).await.map_err(|err| { + // Reckoned as independent piece + let receipt_handle_copy = receipt_handle.clone(); + let dir = s3_compilation_files_dir(id.to_string().as_str()); + let s3_client = self.s3_client.clone(); + let sqs_client = self.sqs_client.clone(); + tokio::spawn(async move { + if let Err(err) = s3_client.delete_dir(&dir).await { + error!("Couldn't delete compilation files on failed validation: {err}") + } + if let Err(err) = sqs_client.delete_message(receipt_handle_copy).await { + error!("Failed to delete message from sqs: {err}"); + } + }); + + err + })?; + + let input = self + .input_preparator + .prepare_input(&message) + .await + .map_err(|err| { + let receipt_handle_copy = receipt_handle.clone(); + let sqs_client = self.sqs_client.clone(); + tokio::spawn(async move { + if let Err(err) = sqs_client.delete_message(receipt_handle_copy).await { + error!("Failed to delete message from sqs: {err}"); + } + }); + + err + })?; + + // Reckoned as independent piece + { + let dir = s3_compilation_files_dir(id.to_string().as_str()); + let s3_client = 
self.s3_client.clone(); + let sqs_client = self.sqs_client.clone(); + tokio::spawn(async move { + // Clean compilation input files right away + if let Err(err) = s3_client.delete_dir(&dir).await { + error!("Filed to delete compilation file: {err}") + } + if let Err(err) = sqs_client.delete_message(receipt_handle).await { + error!("Failed to delete message from sqs: {err}"); + } + }); + } + + match do_verify(input).await { + Ok(message) => { + self.purgatory + .add_record( + id, + TaskResult::Success(TaskSuccess::Verify { + message: message.clone(), + }), + ) + .await; + Ok(message) + } + Err(err) => { + let task_result = TaskResult::Failure((&err).into()); + self.purgatory.add_record(id, task_result).await; + + Err(err.into()) + } + } + } +} diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index 41336e3..3d7992d 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -4,10 +4,11 @@ use std::marker::PhantomData; use std::ptr::NonNull; use std::sync::Arc; use std::time::Duration; -use tokio::time::{interval, sleep}; +use tokio::time::interval; use tokio::{sync::Mutex, task::JoinHandle}; use tracing::warn; -use types::item::{Item, ItemError, Status, TaskResult}; +use types::item::errors::ItemError; +use types::item::{task_result::TaskResult, Item, Status}; use uuid::Uuid; use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper; @@ -46,15 +47,15 @@ impl Purgatory { this } - pub async fn purge(&mut self) { + pub async fn purge(&self) { self.inner.lock().await.purge().await; } - pub async fn add_record(&mut self, id: Uuid, result: TaskResult) { + pub async fn add_record(&self, id: Uuid, result: TaskResult) { self.inner.lock().await.add_record(id, result); } - async fn daemon(mut self) { + async fn daemon(self) { const PURGE_INTERVAL: Duration = Duration::from_secs(60); let mut interval = interval(PURGE_INTERVAL); @@ -144,6 +145,7 @@ impl Inner { id: &Uuid, status: &Status, ) -> Result<(), PurgeError> { + // TODO: recheck status of the item first match status { Status::InProgress => warn!("Item compiling for too long!"), Status::Pending => { diff --git a/crates/worker/src/utils/lib.rs b/crates/worker/src/utils/lib.rs index a895459..2317ea9 100644 --- a/crates/worker/src/utils/lib.rs +++ b/crates/worker/src/utils/lib.rs @@ -4,7 +4,7 @@ use types::ARTIFACTS_FOLDER; use uuid::Uuid; use walkdir::WalkDir; -use crate::commands::compile::CompilationFile; +use crate::commands::CompilationFile; pub const SOL_ROOT: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/", "hardhat_env/workspaces/"); pub const ZK_CACHE_ROOT: &str = concat!( diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index 11a2f0b..4c49bc4 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -1,39 +1,24 @@ use std::num::NonZeroUsize; +use std::sync::Arc; use std::time::Duration; use tokio::task::JoinHandle; -use tracing::{error, warn}; -use types::{CompilationRequest, SqsMessage, VerificationRequest}; +use tracing::error; -use crate::clients::dynamodb_clients::wrapper::DynamoDBClientWrapper; -use crate::clients::s3_clients::wrapper::S3ClientWrapper; use crate::clients::sqs_clients::wrapper::SqsClientWrapper; -use crate::commands::compile::{do_compile, CompilationInput}; -use crate::commands::errors::PreparationError; -use crate::commands::utils::{ - on_compilation_failed, on_compilation_success, prepare_compile_input, -}; -use crate::errors::MessageProcessorError; -use crate::purgatory::Purgatory; +use crate::processor::Processor; 
use crate::sqs_listener::{SqsListener, SqsReceiver}; -use crate::utils::lib::s3_compilation_files_dir; pub struct EngineBuilder { sqs_client: SqsClientWrapper, - db_client: DynamoDBClientWrapper, - s3_client: S3ClientWrapper, + processor: Processor, running_workers: Vec, } impl EngineBuilder { - pub fn new( - sqs_client: SqsClientWrapper, - db_client: DynamoDBClientWrapper, - s3_client: S3ClientWrapper, - ) -> Self { + pub fn new(sqs_client: SqsClientWrapper, processor: Processor) -> Self { EngineBuilder { sqs_client, - db_client, - s3_client, + processor, running_workers: vec![], } } @@ -41,212 +26,61 @@ impl EngineBuilder { pub fn start(self, num_workers: NonZeroUsize) -> RunningEngine { let sqs_listener = SqsListener::new(self.sqs_client, Duration::from_millis(500)); - RunningEngine::new( - sqs_listener, - self.db_client, - self.s3_client, - num_workers.get(), - ) + RunningEngine::new(sqs_listener, self.processor, num_workers.get()) } } pub struct RunningEngine { sqs_listener: SqsListener, - purgatory: Purgatory, num_workers: usize, worker_threads: Vec>, } impl RunningEngine { - pub fn new( - sqs_listener: SqsListener, - db_client: DynamoDBClientWrapper, - s3_client: S3ClientWrapper, - num_workers: usize, - ) -> Self { - let purgatory = Purgatory::new(db_client.clone(), s3_client.clone()); - + pub fn new(sqs_listener: SqsListener, processor: Processor, num_workers: usize) -> Self { + let arc_processor = Arc::new(processor); let mut worker_threads = Vec::with_capacity(num_workers); for _ in 0..num_workers { // Start worker let sqs_receiver = sqs_listener.receiver(); - let db_client_copy = db_client.clone(); - let s3_client_copy = s3_client.clone(); - let purgatory_copy = purgatory.clone(); - + let arc_processor_copy = arc_processor.clone(); worker_threads.push(tokio::spawn(async move { - RunningEngine::worker(sqs_receiver, db_client_copy, s3_client_copy, purgatory_copy) - .await; + RunningEngine::worker(sqs_receiver, arc_processor_copy).await; })); } Self { sqs_listener, - purgatory, num_workers, worker_threads, } } - async fn worker( - sqs_receiver: SqsReceiver, - db_client: DynamoDBClientWrapper, - s3_client: S3ClientWrapper, - mut purgatory: Purgatory, - ) { - // TODO: process error + async fn worker(sqs_receiver: SqsReceiver, processor: Arc) { while let Ok(message) = sqs_receiver.recv().await { - let receipt_handle = if let Some(receipt_handle) = message.receipt_handle { - receipt_handle - } else { - continue; - }; - - let body = if let Some(body) = message.body { - body + let receipt_handle = if let Some(ref receipt_handle) = message.receipt_handle { + receipt_handle.to_owned() } else { - warn!("Has handle but not body"); - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); - } - continue; }; - let sqs_message = match serde_json::from_str::(&body) { - Ok(sqs_message) => sqs_message, + let sqs_message = match message.try_into() { + Ok(val) => val, Err(err) => { - error!("Could not deserialize message: {}", err.to_string()); - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); - } - + error!("Error converting into SqsMessage: {err}"); continue; } }; - // TODO: add metrics for how long it takes - + // TODO: add metrics for how long it takes & // adjust "visibility timeout" or receiver chan capacity - match sqs_message { - SqsMessage::Compile { request } => { - let result = Self::process_compile_message( - request, - receipt_handle, - &sqs_receiver, - &db_client, - &s3_client, - &mut purgatory, - ) - .await; - 
if let Err(err) = result { - error!("{}", err); - } - } - SqsMessage::Verify { request } => { - Self::process_verify_message(request, receipt_handle, &sqs_receiver).await - } + // TODO: add critical errors and return + if let Err(err) = processor.process_message(sqs_message, receipt_handle).await { + error!("{err}"); } } } - // TODO(future me): could return bool. - async fn process_compile_message( - request: CompilationRequest, - receipt_handle: String, - sqs_receiver: &SqsReceiver, - db_client: &DynamoDBClientWrapper, - s3_client: &S3ClientWrapper, - purgatory: &mut Purgatory, - ) -> Result<(), MessageProcessorError> { - let compilation_input = Self::handle_prepare_compile_input( - &request, - &receipt_handle, - sqs_receiver, - db_client, - s3_client, - ) - .await?; - - let id = request.id; - let task_result = match do_compile(compilation_input).await { - Ok(value) => on_compilation_success(id, &db_client, &s3_client, value).await?, - Err(err) => on_compilation_failed(id, &db_client, err.to_string()).await?, - }; - purgatory.add_record(id, task_result).await; - - // Clean compilation input files right away - let dir = s3_compilation_files_dir(id.to_string().as_str()); - s3_client.delete_dir(&dir).await?; - - sqs_receiver.delete_message(receipt_handle).await?; - Ok(()) - } - - // TODO(future me): extract in a class - pub(crate) async fn handle_prepare_compile_input( - request: &CompilationRequest, - receipt_handle: &str, - sqs_receiver: &SqsReceiver, - db_client: &DynamoDBClientWrapper, - s3_client: &S3ClientWrapper, - ) -> Result { - let id = request.id; - let result = match prepare_compile_input(&request, db_client, s3_client).await { - Ok(value) => Ok(value), - Err(PreparationError::NoDBItemError(err)) => { - // Possible in case GlobalState purges old message - // that somehow stuck in queue for too long - sqs_receiver.delete_message(receipt_handle).await?; - Err(PreparationError::NoDBItemError(err)) - } - Err(PreparationError::UnexpectedStatusError(err)) => { - // Probably some other instance executing this at the same time. - // For sake of safety still try to delete it. Doesn't matter if succeeds. - // No need to clean up s3 - sqs_receiver.delete_message(receipt_handle).await?; - Err(PreparationError::UnexpectedStatusError(err)) - } - Err(PreparationError::VersionNotSupported(err)) => { - // Clean everything since the request failed - let dir = s3_compilation_files_dir(id.to_string().as_str()); - s3_client.delete_dir(&dir).await?; - - // This error doesn't create any artifacts - let _ = on_compilation_failed( - id, - &db_client, - PreparationError::VersionNotSupported(err.clone()).to_string(), - ) - .await?; - - sqs_receiver.delete_message(receipt_handle).await?; - Err(PreparationError::VersionNotSupported(err)) - } - Err(PreparationError::S3Error(err)) => { - // Certain cases don't require delete_message - Err(PreparationError::S3Error(err)) - } - Err(PreparationError::DBError(err)) => { - // Certain cases don't require delete_message - Err(PreparationError::DBError(err)) - } - }; - - result.map_err(MessageProcessorError::from) - } - - async fn process_verify_message( - request: VerificationRequest, - receipt_handle: String, - sqs_receiver: &SqsReceiver, - ) { - // TODO: implement - - if let Err(err) = sqs_receiver.delete_message(receipt_handle).await { - warn!("{}", err); - } - } - pub async fn wait(self) { futures::future::join_all(self.worker_threads).await; }
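// Editorial sketch (not part of the diff): verify_processor.rs above carries the note
// "TODO: make generic via adding MessageProcessor trait with process_message(...)".
// The following is a minimal illustration of what that trait could look like, assuming
// the process_message signatures already shown for CompileProcessor and VerifyProcessor.
// The trait name, associated types, and the async_trait dependency are hypothetical
// and are not introduced by this PR.
use async_trait::async_trait;

#[async_trait]
pub trait MessageProcessor {
    // Per-processor request type, e.g. CompilationRequest or VerificationRequest.
    type Request;
    // Per-processor success payload, e.g. Vec<String> of presigned URLs or a verify log.
    type Output;
    // Per-processor error type, e.g. CompileProcessorError or VerifyProcessorError.
    type Error;

    // Mirrors the concrete methods: consume one request plus its SQS receipt handle
    // and return the processor-specific output or error.
    async fn process_message(
        &self,
        request: Self::Request,
        receipt_handle: String,
    ) -> Result<Self::Output, Self::Error>;
}
// With such a trait, Processor::process_message could dispatch each SqsMessage variant
// to a generic implementor instead of holding two concrete processor fields.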