Skip to content
This repository has been archived by the owner on Dec 6, 2024. It is now read-only.

Implementation of periodic purging #181

Merged
merged 9 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ aws-sdk-sqs = "1.39.0"
aws-sdk-dynamodb = "1.42.0"
tokio = {version = "1.39.3", features = ["macros"]}
serde = "1.0.207"
serde_json = "1.0.124"
serde_json = "1.0.128"
thiserror = "1.0.63"
tracing = { version = "0.1.40", features = ["log"] }
tracing-subscriber = { version = "0.3.18", default-features = false, features = ["fmt", "ansi"] }
Expand Down
6 changes: 3 additions & 3 deletions crates/lambdas/src/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn compile(
queue_url: &str,
) -> Result<(), Error> {
let item = Item {
id: request.id.clone(),
id: request.id.to_string(),
status: Status::Pending,
};

Expand Down Expand Up @@ -112,7 +112,7 @@ async fn process_request(
let objects = s3_client
.list_objects_v2()
.delimiter('/')
.prefix(request.id.clone().add("/"))
.prefix(request.id.to_string().add("/"))
.bucket(bucket_name)
.send()
.await
Expand Down Expand Up @@ -178,4 +178,4 @@ async fn main() -> Result<(), LambdaError> {
}
}))
.await
}
}
3 changes: 3 additions & 0 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ aws-sdk-dynamodb = {workspace = true}
serde = {workspace = true}
thiserror = {worksapce = true}
uuid = {workspace = true}

[dev-dependencies]
serde_json = {workspace = true}
varex83 marked this conversation as resolved.
Show resolved Hide resolved
104 changes: 62 additions & 42 deletions crates/types/src/item.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use aws_sdk_dynamodb::types::AttributeValue;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;

pub type AttributeMap = HashMap<String, AttributeValue>;

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TaskResult {
Success { presigned_urls: Vec<String> },
Failure(String),
}

#[derive(Debug, Clone)]
pub enum Status {
// TODO: add FilesUploaded(?)
Pending,
Compiling,
Ready {
presigned_urls: Vec<String>,
},
Failed(String),
InProgress,
Done(TaskResult),
}

impl Status {
Expand All @@ -27,9 +30,9 @@ impl fmt::Display for Status {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Status::Pending => write!(f, "Pending"),
Status::Compiling => write!(f, "Compiling"),
Status::Ready { .. } => write!(f, "Ready"),
Status::Failed(msg) => write!(f, "Failed: {}", msg),
Status::InProgress => write!(f, "Compiling"),
Status::Done(TaskResult::Success { .. }) => write!(f, "Success"),
Status::Done(TaskResult::Failure(msg)) => write!(f, "Failure: {}", msg),
}
}
}
Expand All @@ -38,9 +41,9 @@ impl From<&Status> for u32 {
fn from(value: &Status) -> Self {
match value {
Status::Pending => 0,
Status::Compiling => 1,
Status::Ready { .. } => 2,
Status::Failed(_) => 3,
Status::InProgress => 1,
Status::Done(TaskResult::Success { .. }) => 2,
Status::Done(TaskResult::Failure(_)) => 3,
}
}
}
Expand All @@ -51,31 +54,37 @@ impl From<Status> for u32 {
}
}

impl From<Status> for HashMap<String, AttributeValue> {
fn from(value: Status) -> Self {
match value.clone() {
Status::Pending | Status::Compiling => HashMap::from([(
Status::attribute_name().into(),
AttributeValue::N(u32::from(&value).to_string()),
impl From<TaskResult> for AttributeMap {
fn from(value: TaskResult) -> Self {
match value {
TaskResult::Success { presigned_urls } => HashMap::from([(
Item::data_attribute_name().into(),
AttributeValue::Ss(presigned_urls),
)]),
TaskResult::Failure(message) => HashMap::from([(
Item::data_attribute_name().into(),
AttributeValue::S(message),
)]),
Status::Ready { presigned_urls } => HashMap::from([
(
Status::attribute_name().into(),
AttributeValue::N(u32::from(&value).to_string()),
),
(Item::data_attribute_name().into(), AttributeValue::Ss(presigned_urls)),
]),
Status::Failed(val) => HashMap::from([
(
Status::attribute_name().into(),
AttributeValue::N(u32::from(&value).to_string()),
),
(Item::data_attribute_name().into(), AttributeValue::S(val)),
]),
}
}
}

impl From<Status> for AttributeMap {
fn from(value: Status) -> Self {
let mut map = HashMap::from([(
Status::attribute_name().into(),
AttributeValue::N(u32::from(&value).to_string()),
)]);

// For `Done` variant, reuse the conversion logic from `TaskResult`
if let Status::Done(task_result) = value {
map.extend(AttributeMap::from(task_result));
}

map
}
}

#[derive(thiserror::Error, Debug)]
pub enum ItemError {
#[error("Invalid Item format")]
Expand Down Expand Up @@ -111,7 +120,10 @@ impl Item {

impl From<Item> for AttributeMap {
fn from(value: Item) -> Self {
let mut item_map = HashMap::from([(Item::id_attribute_name().into(), AttributeValue::S(value.id))]);
let mut item_map = HashMap::from([(
Item::id_attribute_name().into(),
AttributeValue::S(value.id),
)]);
item_map.extend(HashMap::from(value.status));

item_map
Expand All @@ -121,27 +133,33 @@ impl From<Item> for AttributeMap {
impl TryFrom<&AttributeMap> for Status {
type Error = ItemError;
fn try_from(value: &AttributeMap) -> Result<Self, Self::Error> {
let status = value.get(Status::attribute_name()).ok_or(ItemError::FormatError)?;
let status = value
.get(Status::attribute_name())
.ok_or(ItemError::FormatError)?;
let status: u32 = status
.as_n()
.map_err(|_| ItemError::FormatError)?
.parse::<u32>()?;
let status = match status {
0 => Status::Pending,
1 => Status::Compiling,
1 => Status::InProgress,
2 => {
let data = value.get(Item::data_attribute_name()).ok_or(ItemError::FormatError)?;
let data = value
.get(Item::data_attribute_name())
.ok_or(ItemError::FormatError)?;
let data = data.as_ss().map_err(|_| ItemError::FormatError)?;

Status::Ready {
Status::Done(TaskResult::Success {
presigned_urls: data.clone(),
}
})
}
3 => {
let data = value.get(Item::data_attribute_name()).ok_or(ItemError::FormatError)?;
let data = value
.get(Item::data_attribute_name())
.ok_or(ItemError::FormatError)?;
let data = data.as_s().map_err(|_| ItemError::FormatError)?;

Status::Failed(data.clone())
Status::Done(TaskResult::Failure(data.clone()))
}
_ => return Err(ItemError::FormatError),
};
Expand All @@ -153,7 +171,9 @@ impl TryFrom<&AttributeMap> for Status {
impl TryFrom<AttributeMap> for Item {
type Error = ItemError;
fn try_from(value: AttributeMap) -> Result<Item, Self::Error> {
let id = value.get(Item::id_attribute_name()).ok_or(ItemError::FormatError)?;
let id = value
.get(Item::id_attribute_name())
.ok_or(ItemError::FormatError)?;
let id = id.as_s().map_err(|_| ItemError::FormatError)?;
let status = (&value).try_into()?;

Expand Down
14 changes: 12 additions & 2 deletions crates/types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod item;

use serde::{Deserialize, Serialize};
use uuid::Uuid;

pub const ARTIFACTS_FOLDER: &str = "artifacts";

Expand All @@ -15,7 +16,7 @@ pub struct CompilationConfig {

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct CompilationRequest {
pub id: String,
pub id: Uuid,
pub config: CompilationConfig,
}

Expand All @@ -30,7 +31,7 @@ pub struct VerifyConfig {

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct VerificationRequest {
pub id: String,
pub id: Uuid,
pub config: VerifyConfig,
}

Expand All @@ -46,3 +47,12 @@ pub enum SqsMessage {
request: VerificationRequest,
},
}

impl SqsMessage {
pub fn id(&self) -> Uuid {
match self {
SqsMessage::Compile {request} => request.id,
SqsMessage::Verify {request} => request.id,
}
}
}
2 changes: 1 addition & 1 deletion crates/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ lazy_static = "1.5.0"
walkdir = "2.3.2"

# Inner crates
types = {workspace = true}
types = { workspace = true }
4 changes: 4 additions & 0 deletions crates/worker/src/clients.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod dynamodb_client;
pub mod errors;
pub mod s3_client;
pub mod sqs_clients;
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::errors::DBError;
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::Client;
use types::item::Item;

use crate::clients::errors::DBError;

#[derive(Clone)]
pub struct DynamoDBClient {
pub client: Client,
Expand All @@ -17,23 +18,23 @@ impl DynamoDBClient {
}
}

pub async fn delete_item(&self, id: String) -> Result<(), DBError> {
pub async fn delete_item(&self, id: &str) -> Result<(), DBError> {
self.client
.delete_item()
.table_name(self.table_name.clone())
.key(Item::primary_key_name(), AttributeValue::S(id))
.key(Item::primary_key_name(), AttributeValue::S(id.to_string()))
.send()
.await?;

Ok(())
}

pub async fn get_item(&self, id: String) -> Result<Option<Item>, DBError> {
pub async fn get_item(&self, key: &str) -> Result<Option<Item>, DBError> {
let result = self
.client
.get_item()
.table_name(self.table_name.clone())
.key(Item::primary_key_name(), AttributeValue::S(id))
.key(Item::primary_key_name(), AttributeValue::S(key.to_string()))
.send()
.await?;

Expand Down
67 changes: 67 additions & 0 deletions crates/worker/src/clients/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use aws_sdk_dynamodb::config::http::HttpResponse;
use aws_sdk_dynamodb::operation::delete_item::DeleteItemError;
use aws_sdk_dynamodb::operation::get_item::GetItemError;
use aws_sdk_dynamodb::operation::update_item::UpdateItemError;
use aws_sdk_s3::operation::delete_object::DeleteObjectError;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
use aws_sdk_s3::operation::put_object::PutObjectError;
use aws_sdk_sqs::error::SdkError;
use aws_sdk_sqs::operation::delete_message::DeleteMessageError;
use aws_sdk_sqs::operation::receive_message::ReceiveMessageError;
use tracing::error;
use types::item::ItemError;

// SQS related errors
pub(crate) type SqsReceiveError = SdkError<ReceiveMessageError, HttpResponse>;
pub(crate) type SqsDeleteError = SdkError<DeleteMessageError, HttpResponse>;

// DynamoDB related errors
pub(crate) type DBDeleteError = SdkError<DeleteItemError, HttpResponse>;
pub(crate) type DBGetError = SdkError<GetItemError, HttpResponse>;

pub(crate) type DBUpdateError = SdkError<UpdateItemError, HttpResponse>;

// S3 related errors
pub(crate) type S3ListObjectsError = SdkError<ListObjectsV2Error, HttpResponse>;
pub(crate) type S3GetObjectError = SdkError<GetObjectError, HttpResponse>;
pub(crate) type S3PutObjectError = SdkError<PutObjectError, HttpResponse>;
pub(crate) type S3DeleteObjectError = SdkError<DeleteObjectError, HttpResponse>;

#[derive(thiserror::Error, Debug)]
pub enum SqsError {
#[error("SqsReceiveError: {0}")]
ReceiveError(#[from] SqsReceiveError),
#[error("SqsDeleteError: {0}")]
DeleteError(#[from] SqsDeleteError),
}

#[derive(thiserror::Error, Debug)]
pub enum DBError {
#[error(transparent)]
DeleteItemError(#[from] DBDeleteError),
#[error(transparent)]
GetItemError(#[from] DBGetError),
#[error(transparent)]
ItemFormatError(#[from] ItemError),
#[error(transparent)]
UpdateItemError(#[from] DBUpdateError),
}

#[derive(thiserror::Error, Debug)]
pub enum S3Error {
#[error("Invalid object")]
InvalidObjectError,
#[error(transparent)]
GetObjectError(#[from] S3GetObjectError),
#[error(transparent)]
ListObjectsError(#[from] S3ListObjectsError),
#[error(transparent)]
PutObjectError(#[from] S3PutObjectError),
#[error(transparent)]
DeleteObjectError(#[from] S3DeleteObjectError),
#[error(transparent)]
IoError(#[from] std::io::Error),
#[error(transparent)]
ByteStreamError(#[from] aws_smithy_types::byte_stream::error::Error),
}
Loading
Loading