Skip to content
This repository has been archived by the owner on Dec 6, 2024. It is now read-only.

Commit

Permalink
Merge pull request #177 from NethermindEth/feat/horizontal/worker-setup
Browse files Browse the repository at this point in the history
Engine refactoring, AWS clients, and compile integration
  • Loading branch information
taco-paco committed Sep 18, 2024
2 parents 0c07917 + cb616b9 commit 41413ba
Show file tree
Hide file tree
Showing 37 changed files with 2,386 additions and 110 deletions.
8 changes: 6 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ Cargo.lock
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb

node_modules
api/logs
api/hardhat_env/workspaces

node_modules

# MacOS related
.DS_Store
.DS_Store

# credentials
/crates/worker/credentials
22 changes: 22 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[workspace]
members = [
"crates/types",
"crates/lambdas",
"crates/worker",
]
exclude = ["api"]

[workspace.dependencies]
aws-config = "1.5.5"
aws-sdk-s3 = "1.43.0"
aws-sdk-sqs = "1.39.0"
aws-sdk-dynamodb = "1.42.0"
tokio = {version = "1.39.3", features = ["macros"]}
serde = "1.0.207"
serde_json = "1.0.124"
thiserror = "1.0.63"
tracing = { version = "0.1.40", features = ["log"] }
tracing-subscriber = { version = "0.3.18", default-features = false, features = ["fmt", "ansi"] }
uuid = { version = "1.10.0", features = ["serde", "v4"] }

types = {version = "0.0.1", path = "crates/types"}
1 change: 0 additions & 1 deletion api/src/handlers/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub struct CompilationConfig {
pub struct CompilationRequest {
pub config: CompilationConfig,
pub contracts: Vec<CompiledFile>,
pub target_path: Option<String>,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
Expand Down
34 changes: 34 additions & 0 deletions crates/lambdas/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "zksync-lambdas"
version = "0.0.1"
edition = "2021"
authors = ["[email protected]"]

[dependencies]
aws-config = {workspace = true}
aws-sdk-s3 = {workspace = true}
aws-sdk-sqs = {workspace = true}
aws-sdk-dynamodb = {workspace = true}
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
uuid = {workspace = true}

lambda_runtime = "0.13.0"
lambda_http = "0.13.0"

# Inner crates
types = {workspace = true}

[[bin]]
name = "generate-presigned-urls"
version = "0.0.1"
path = "src/generate_presigned_urls.rs"

[[bin]]
name = "compile"
version = "0.0.1"
path = "src/compile.rs"
File renamed without changes.
File renamed without changes.
4 changes: 4 additions & 0 deletions crates/lambdas/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod errors;
pub mod utils;

pub const BUCKET_NAME_DEFAULT: &str = "zksync-compilation-s3";
File renamed without changes.
32 changes: 17 additions & 15 deletions lambdas/src/compile.rs → crates/lambdas/src/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,19 @@ use aws_sdk_dynamodb::{error::SdkError, operation::put_item::PutItemError};
use lambda_http::{
run, service_fn, Error as LambdaError, Request as LambdaRequest, Response as LambdaResponse,
};
use serde::Deserialize;
use std::ops::Add;
use tracing::{error, info};
use types::{CompilationRequest, SqsMessage, item::{Item, Status}};

mod common;
use crate::common::{errors::Error, utils::extract_request, Item, Status, BUCKET_NAME_DEFAULT};
use crate::common::{errors::Error, utils::extract_request, BUCKET_NAME_DEFAULT};

// TODO: remove on release
const QUEUE_URL_DEFAULT: &str = "https://sqs.ap-southeast-2.amazonaws.com/266735844848/zksync-sqs";
const TABLE_NAME_DEFAULT: &str = "zksync-table";

const NO_OBJECTS_TO_COMPILE_ERROR: &str = "There are no objects to compile";
const RECOMPILATION_ATTEMPT_ERROR: &str = "Recompilation attemp";

#[derive(Debug, Deserialize)]
struct Request {
pub id: String,
}

// impl Deserialize for Response {
// fn deserialize<'de, D>(deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'de> {
// todo!()
Expand All @@ -35,14 +29,14 @@ struct Request {
// }

async fn compile(
id: String,
request: CompilationRequest,
dynamo_client: &aws_sdk_dynamodb::Client,
table_name: &str,
sqs_client: &aws_sdk_sqs::Client,
queue_url: &str,
) -> Result<(), Error> {
let item = Item {
id: id.clone(),
id: request.id.clone(),
status: Status::Pending,
};

Expand All @@ -55,10 +49,10 @@ async fn compile(
.await;

match result {
Ok(val) => val,
Ok(value) => value,
Err(SdkError::ServiceError(val)) => match val.err() {
PutItemError::ConditionalCheckFailedException(_) => {
error!("Recompilation attempt, id: {}", id);
error!("Recompilation attempt, id: {}", request.id);
let response = lambda_http::Response::builder()
.status(400)
.header("content-type", "text/html")
Expand All @@ -72,10 +66,18 @@ async fn compile(
Err(err) => return Err(Box::new(err).into()),
};

let message = SqsMessage::Compile { request };
let message = match serde_json::to_string(&message) {
Ok(val) => val,
Err(err) => {
error!("Serialization failed, id: {:?}", message);
return Err(Box::new(err).into());
}
};
let message_output = sqs_client
.send_message()
.queue_url(queue_url)
.message_body(id)
.message_body(message)
.send()
.await
.map_err(Box::new)?;
Expand Down Expand Up @@ -105,7 +107,7 @@ async fn process_request(
s3_client: &aws_sdk_s3::Client,
bucket_name: &str,
) -> Result<LambdaResponse<String>, Error> {
let request = extract_request::<Request>(request)?;
let request = extract_request::<CompilationRequest>(request)?;

let objects = s3_client
.list_objects_v2()
Expand All @@ -128,7 +130,7 @@ async fn process_request(
}

info!("Compile");
compile(request.id, dynamo_client, table_name, sqs_client, queue_url).await?;
compile(request, dynamo_client, table_name, sqs_client, queue_url).await?;

let response = LambdaResponse::builder()
.status(200)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,5 @@ async fn main() -> Result<(), LambdaError> {
Err(Error::LambdaError(err)) => Err(err),
}
}))
.await
.await
}
10 changes: 10 additions & 0 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "types"
version = "0.0.1"
edition = "2021"

[dependencies]
aws-sdk-dynamodb = {workspace = true}
serde = {workspace = true}
thiserror = {worksapce = true}
uuid = {workspace = true}
165 changes: 165 additions & 0 deletions crates/types/src/item.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use aws_sdk_dynamodb::types::AttributeValue;
use serde::Serialize;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;

pub type AttributeMap = HashMap<String, AttributeValue>;

#[derive(Debug, Clone, Serialize)]
pub enum Status {
// TODO: add FilesUploaded(?)
Pending,
Compiling,
Ready {
presigned_urls: Vec<String>,
},
Failed(String),
}

impl Status {
pub const fn attribute_name() -> &'static str {
"Status"
}
}

impl fmt::Display for Status {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Status::Pending => write!(f, "Pending"),
Status::Compiling => write!(f, "Compiling"),
Status::Ready { .. } => write!(f, "Ready"),
Status::Failed(msg) => write!(f, "Failed: {}", msg),
}
}
}

impl From<&Status> for u32 {
fn from(value: &Status) -> Self {
match value {
Status::Pending => 0,
Status::Compiling => 1,
Status::Ready { .. } => 2,
Status::Failed(_) => 3,
}
}
}

impl From<Status> for u32 {
fn from(value: Status) -> Self {
u32::from(&value)
}
}

impl From<Status> for HashMap<String, AttributeValue> {
fn from(value: Status) -> Self {
match value.clone() {
Status::Pending | Status::Compiling => HashMap::from([(
Status::attribute_name().into(),
AttributeValue::N(u32::from(&value).to_string()),
)]),
Status::Ready { presigned_urls } => HashMap::from([
(
Status::attribute_name().into(),
AttributeValue::N(u32::from(&value).to_string()),
),
(Item::data_attribute_name().into(), AttributeValue::Ss(presigned_urls)),
]),
Status::Failed(val) => HashMap::from([
(
Status::attribute_name().into(),
AttributeValue::N(u32::from(&value).to_string()),
),
(Item::data_attribute_name().into(), AttributeValue::S(val)),
]),
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum ItemError {
#[error("Invalid Item format")]
FormatError,
#[error(transparent)]
ParseError(#[from] std::num::ParseIntError),
}

pub struct Item {
// TODO: uuid?
pub id: String,
pub status: Status,
// TODO: type: Compiling/Verifying
}

impl Item {
pub const fn status_attribute_name() -> &'static str {
Status::attribute_name()
}

pub const fn data_attribute_name() -> &'static str {
"Data"
}

pub const fn id_attribute_name() -> &'static str {
"ID"
}

pub const fn primary_key_name() -> &'static str {
Self::id_attribute_name()
}
}

impl From<Item> for AttributeMap {
fn from(value: Item) -> Self {
let mut item_map = HashMap::from([(Item::id_attribute_name().into(), AttributeValue::S(value.id))]);
item_map.extend(HashMap::from(value.status));

item_map
}
}

impl TryFrom<&AttributeMap> for Status {
type Error = ItemError;
fn try_from(value: &AttributeMap) -> Result<Self, Self::Error> {
let status = value.get(Status::attribute_name()).ok_or(ItemError::FormatError)?;
let status: u32 = status
.as_n()
.map_err(|_| ItemError::FormatError)?
.parse::<u32>()?;
let status = match status {
0 => Status::Pending,
1 => Status::Compiling,
2 => {
let data = value.get(Item::data_attribute_name()).ok_or(ItemError::FormatError)?;
let data = data.as_ss().map_err(|_| ItemError::FormatError)?;

Status::Ready {
presigned_urls: data.clone(),
}
}
3 => {
let data = value.get(Item::data_attribute_name()).ok_or(ItemError::FormatError)?;
let data = data.as_s().map_err(|_| ItemError::FormatError)?;

Status::Failed(data.clone())
}
_ => return Err(ItemError::FormatError),
};

Ok(status)
}
}

impl TryFrom<AttributeMap> for Item {
type Error = ItemError;
fn try_from(value: AttributeMap) -> Result<Item, Self::Error> {
let id = value.get(Item::id_attribute_name()).ok_or(ItemError::FormatError)?;
let id = id.as_s().map_err(|_| ItemError::FormatError)?;
let status = (&value).try_into()?;

Ok(Item {
id: id.clone(),
status,
})
}
}
Loading

0 comments on commit 41413ba

Please sign in to comment.