From a89ee2af83420da547b682ed4da4ff79587b0377 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Thu, 12 Sep 2024 18:28:11 +0900 Subject: [PATCH] feat: tested + added const --- crates/worker/src/purgatory.rs | 24 ++++++++++++------------ crates/worker/src/worker.rs | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/crates/worker/src/purgatory.rs b/crates/worker/src/purgatory.rs index 47a5a3b..b6a8349 100644 --- a/crates/worker/src/purgatory.rs +++ b/crates/worker/src/purgatory.rs @@ -9,7 +9,7 @@ use tokio::time::{interval, sleep}; use tokio::{sync::Mutex, task::JoinHandle}; use tracing::warn; use types::item::{Item, ItemError, Status, TaskResult}; -use uuid::{Uuid}; +use uuid::Uuid; use crate::clients::dynamodb_client::DynamoDBClient; use crate::clients::s3_client::S3Client; @@ -29,7 +29,12 @@ impl Purgatory { pub fn new(db_client: DynamoDBClient, s3_client: S3Client) -> Self { let handle = NonNull::dangling(); let this = Self { - inner: Arc::new(Mutex::new(Inner::new(handle, State::new(), db_client, s3_client))), + inner: Arc::new(Mutex::new(Inner::new( + handle, + State::new(), + db_client, + s3_client, + ))), }; { @@ -110,8 +115,10 @@ impl Inner { } async fn global_state_purge(db_client: DynamoDBClient, s3_client: S3Client) { + const SYNC_FROM_OFFSET: Option = PURGE_INTERVAL.checked_mul(6); + let mut global_state = GlobalState::new(db_client.clone(), s3_client.clone()); - let sync_from = Utc::now() - 6 * PURGE_INTERVAL; + let sync_from = Utc::now() - SYNC_FROM_OFFSET.unwrap(); loop { if global_state.sync(&sync_from).await.is_err() { @@ -123,13 +130,7 @@ impl Inner { let items: Vec = global_state.items.drain(..).collect(); for item in items { - Inner::purge_item( - &db_client, - &s3_client, - &item.id, - &item.status, - ) - .await; + Inner::purge_item(&db_client, &s3_client, &item.id, &item.status).await; } } } @@ -245,8 +246,7 @@ impl GlobalState { .into_iter() .take(remaining_capacity) .map(|raw_item| raw_item.try_into()) - .collect::>() - .unwrap(); + .collect::>()?; self.items.append(&mut to_append); if self.items.len() == MAX_CAPACITY { diff --git a/crates/worker/src/worker.rs b/crates/worker/src/worker.rs index 6dfda3e..aac3822 100644 --- a/crates/worker/src/worker.rs +++ b/crates/worker/src/worker.rs @@ -12,9 +12,9 @@ use crate::commands::errors::PreparationError; use crate::commands::utils::{ on_compilation_failed, on_compilation_success, prepare_compile_input, }; -use crate::purgatory::{Purgatory}; +use crate::purgatory::Purgatory; use crate::sqs_listener::{SqsListener, SqsReceiver}; -use crate::utils::lib::{s3_compilation_files_dir}; +use crate::utils::lib::s3_compilation_files_dir; pub struct EngineBuilder { sqs_client: SqsClientWrapper,