Skip to content
This repository has been archived by the owner on Dec 6, 2024. It is now read-only.

Commit

Permalink
feat: tested + added const
Browse files Browse the repository at this point in the history
  • Loading branch information
taco-paco committed Sep 18, 2024
1 parent 6477dda commit a89ee2a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
24 changes: 12 additions & 12 deletions crates/worker/src/purgatory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::time::{interval, sleep};
use tokio::{sync::Mutex, task::JoinHandle};
use tracing::warn;
use types::item::{Item, ItemError, Status, TaskResult};
use uuid::{Uuid};
use uuid::Uuid;

use crate::clients::dynamodb_client::DynamoDBClient;
use crate::clients::s3_client::S3Client;
Expand All @@ -29,7 +29,12 @@ impl Purgatory {
pub fn new(db_client: DynamoDBClient, s3_client: S3Client) -> Self {
let handle = NonNull::dangling();
let this = Self {
inner: Arc::new(Mutex::new(Inner::new(handle, State::new(), db_client, s3_client))),
inner: Arc::new(Mutex::new(Inner::new(
handle,
State::new(),
db_client,
s3_client,
))),
};

{
Expand Down Expand Up @@ -110,8 +115,10 @@ impl Inner {
}

async fn global_state_purge(db_client: DynamoDBClient, s3_client: S3Client) {
const SYNC_FROM_OFFSET: Option<Duration> = PURGE_INTERVAL.checked_mul(6);

let mut global_state = GlobalState::new(db_client.clone(), s3_client.clone());
let sync_from = Utc::now() - 6 * PURGE_INTERVAL;
let sync_from = Utc::now() - SYNC_FROM_OFFSET.unwrap();

loop {
if global_state.sync(&sync_from).await.is_err() {
Expand All @@ -123,13 +130,7 @@ impl Inner {

let items: Vec<Item> = global_state.items.drain(..).collect();
for item in items {
Inner::purge_item(
&db_client,
&s3_client,
&item.id,
&item.status,
)
.await;
Inner::purge_item(&db_client, &s3_client, &item.id, &item.status).await;
}
}
}
Expand Down Expand Up @@ -245,8 +246,7 @@ impl GlobalState {
.into_iter()
.take(remaining_capacity)
.map(|raw_item| raw_item.try_into())
.collect::<Result<_, ItemError>>()
.unwrap();
.collect::<Result<_, ItemError>>()?;

self.items.append(&mut to_append);
if self.items.len() == MAX_CAPACITY {
Expand Down
4 changes: 2 additions & 2 deletions crates/worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use crate::commands::errors::PreparationError;
use crate::commands::utils::{
on_compilation_failed, on_compilation_success, prepare_compile_input,
};
use crate::purgatory::{Purgatory};
use crate::purgatory::Purgatory;
use crate::sqs_listener::{SqsListener, SqsReceiver};
use crate::utils::lib::{s3_compilation_files_dir};
use crate::utils::lib::s3_compilation_files_dir;

pub struct EngineBuilder {
sqs_client: SqsClientWrapper,
Expand Down

0 comments on commit a89ee2a

Please sign in to comment.