Skip to content
This repository has been archived by the owner on Dec 6, 2024. It is now read-only.

Verification + refactoring with preparations to generalization #202

Merged
merged 13 commits into from
Oct 18, 2024
5 changes: 5 additions & 0 deletions crates/lambdas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ path = "src/generate_presigned_urls.rs"
name = "compile"
version = "0.0.1"
path = "src/compile.rs"

[[bin]]
name = "verify"
version = "0.0.1"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@taco-paco taco-paco Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.1.0 makes sense.

And the version can be set at a workspace level, e.g.

I'm afraid release cycle for components won't be the same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

path = "src/verify.rs"
13 changes: 8 additions & 5 deletions crates/lambdas/src/compile.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
use aws_config::BehaviorVersion;
use aws_sdk_dynamodb::{error::SdkError, operation::put_item::PutItemError};
use chrono::Utc;
use lambda_http::{
run, service_fn, Error as LambdaError, Request as LambdaRequest, Response as LambdaResponse,
};
use std::ops::Add;
use chrono::Utc;
use tracing::{error, info};
use types::{CompilationRequest, SqsMessage, item::{Item, Status}};
use types::{
item::{Item, Status},
CompilationRequest, SqsMessage,
};

mod common;
use crate::common::{errors::Error, utils::extract_request, BUCKET_NAME_DEFAULT};

// TODO: remove on release
// TODO: remove on release. random change
const QUEUE_URL_DEFAULT: &str = "https://sqs.ap-southeast-2.amazonaws.com/266735844848/zksync-sqs";
const TABLE_NAME_DEFAULT: &str = "zksync-table";

Expand Down Expand Up @@ -40,7 +43,7 @@ async fn compile(
let item = Item {
id: request.id,
status: Status::Pending,
created_at
created_at,
};

let result = dynamo_client
Expand Down Expand Up @@ -181,4 +184,4 @@ async fn main() -> Result<(), LambdaError> {
}
}))
.await
}
}
176 changes: 176 additions & 0 deletions crates/lambdas/src/verify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use aws_config::BehaviorVersion;
use aws_sdk_dynamodb::{error::SdkError, operation::put_item::PutItemError};
use chrono::Utc;
use lambda_http::{
run, service_fn, Error as LambdaError, Request as LambdaRequest, Response as LambdaResponse,
};
use std::ops::Add;
use tracing::{error, info};
use types::{
item::{Item, Status},
SqsMessage, VerificationRequest,
};

mod common;
use crate::common::{errors::Error, utils::extract_request, BUCKET_NAME_DEFAULT};

// TODO: remove on release
const QUEUE_URL_DEFAULT: &str = "https://sqs.ap-southeast-2.amazonaws.com/266735844848/zksync-sqs";
const TABLE_NAME_DEFAULT: &str = "zksync-table";

const NO_OBJECTS_TO_COMPILE_ERROR: &str = "There are no objects to compile";
const RECOMPILATION_ATTEMPT_ERROR: &str = "Recompilation attemp";

async fn verify(
request: VerificationRequest,
dynamo_client: &aws_sdk_dynamodb::Client,
table_name: &str,
sqs_client: &aws_sdk_sqs::Client,
queue_url: &str,
) -> Result<(), Error> {
let created_at = Utc::now();
let item = Item {
id: request.id,
status: Status::Pending,
created_at,
};

let result = dynamo_client
.put_item()
.table_name(table_name)
.set_item(Some(item.into()))
.condition_expression("attribute_not_exists(ID)")
.send()
.await;

match result {
Ok(value) => value,
Err(SdkError::ServiceError(val)) => match val.err() {
PutItemError::ConditionalCheckFailedException(_) => {
error!("Reverification attempt, id: {}", request.id);
let response = lambda_http::Response::builder()
.status(400)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: perhaps, it is worth to use a designated const for readability

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of them were replaced here. Child branch tho

.header("content-type", "text/html")
.body(RECOMPILATION_ATTEMPT_ERROR.into())
.map_err(Error::from)?;

return Err(Error::HttpError(response));
}
_ => return Err(Box::new(SdkError::ServiceError(val)).into()),
},
Err(err) => return Err(Box::new(err).into()),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: probably, error handling would be more ergonomic with anyhow crate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done here

};

let message = SqsMessage::Verify { request };
let message = match serde_json::to_string(&message) {
Ok(val) => val,
Err(err) => {
error!("Serialization failed, id: {:?}", message);
return Err(Box::new(err).into());
}
};
let message_output = sqs_client
.send_message()
.queue_url(queue_url)
.message_body(message)
.send()
.await
.map_err(Box::new)?;

info!(
"message sent to sqs. message_id: {}",
message_output.message_id.unwrap_or("empty_id".into())
);

Ok(())
}

#[tracing::instrument(skip(
dynamo_client,
table_name,
sqs_client,
queue_url,
s3_client,
bucket_name
))]
async fn process_request(
request: LambdaRequest,
dynamo_client: &aws_sdk_dynamodb::Client,
table_name: &str,
sqs_client: &aws_sdk_sqs::Client,
queue_url: &str,
s3_client: &aws_sdk_s3::Client,
bucket_name: &str,
) -> Result<LambdaResponse<String>, Error> {
let request = extract_request::<VerificationRequest>(request)?;

let objects = s3_client
.list_objects_v2()
.delimiter('/')
.prefix(request.id.to_string().add("/"))
.bucket(bucket_name)
.send()
.await
.map_err(Box::new)?;

if let None = &objects.contents {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: more idiomatic

objects.contents.as_ref().ok_or_else(|| {...})?;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done here

error!("No objects in folder: {}", request.id);
let response = LambdaResponse::builder()
.status(400)
.header("content-type", "text/html")
.body(NO_OBJECTS_TO_COMPILE_ERROR.into())
.map_err(Error::from)?;

return Err(Error::HttpError(response));
}

info!("Verify");
verify(request, dynamo_client, table_name, sqs_client, queue_url).await?;

let response = LambdaResponse::builder()
.status(200)
.header("content-type", "text/html")
.body(Default::default())
.map_err(Box::new)?;

return Ok(response);
}

#[tokio::main]
async fn main() -> Result<(), LambdaError> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_ansi(false)
.without_time() // CloudWatch will add the ingestion time
.with_target(false)
.init();

let queue_url = std::env::var("QUEUE_URL").unwrap_or(QUEUE_URL_DEFAULT.into());
let table_name = std::env::var("TABLE_NAME").unwrap_or(TABLE_NAME_DEFAULT.into());
let bucket_name = std::env::var("BUCKET_NAME").unwrap_or(BUCKET_NAME_DEFAULT.into());

let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let dynamo_client = aws_sdk_dynamodb::Client::new(&config);
let sqs_client = aws_sdk_sqs::Client::new(&config);
let s3_client = aws_sdk_s3::Client::new(&config);

run(service_fn(|request: LambdaRequest| async {
let result = process_request(
request,
&dynamo_client,
&table_name,
&sqs_client,
&queue_url,
&s3_client,
&bucket_name,
)
.await;

match result {
Ok(val) => Ok(val),
Err(Error::HttpError(val)) => Ok(val),
Err(Error::LambdaError(err)) => Err(err),
}
}))
.await
}
2 changes: 2 additions & 0 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ edition = "2021"

[dependencies]
aws-sdk-dynamodb.workspace = true
aws-sdk-sqs.workspace = true
chrono.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
uuid.workspace = true

Expand Down
Loading
Loading