diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs index d91cbc2974f88..bba5394bc7533 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs @@ -33,7 +33,7 @@ use std::{ time::Duration, }; use tokio::sync::{Mutex, Notify}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use url::Url; pub mod config; @@ -64,7 +64,7 @@ struct CloudflareImageUploadResponse { result: Option, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct UploadQueue { asset_queue: BTreeSet, in_progress_assets: AHashSet, @@ -206,20 +206,26 @@ impl AssetUploaderThrottlerContext { let self_arc = Arc::new(self.clone()); loop { // Wait until notified if rate limited - if self.is_rate_limited.load(Ordering::Relaxed) { + while self.is_rate_limited.load(Ordering::Relaxed) { self.rate_limit_over_notify.notified().await; self.is_rate_limited.store(false, Ordering::Relaxed); } // Wait until notified if queue is empty - let is_empty = self.upload_queue.lock().await.asset_queue.is_empty(); - if is_empty { + while self.upload_queue.lock().await.asset_queue.is_empty() { self.inserted_notify.notified().await; } // Pop the first asset from the queue and add it to the in-progress set let mut upload_queue = self.upload_queue.lock().await; - let asset = upload_queue.asset_queue.pop_first().unwrap(); // Safe to unwrap because we checked if the queue is empty + // Should be safe to unwrap because we checked if the queue is empty, but log in case + let Some(asset) = upload_queue.asset_queue.pop_first() else { + warn!( + queue = ?upload_queue, + "Asset queue is empty, despite being notified" + ); + continue; + }; upload_queue.in_progress_assets.insert(asset.clone()); drop(upload_queue); @@ -323,18 +329,12 @@ impl AssetUploaderThrottlerContext { Ok(num_queued) } - async fn start_update_loop(&self) { + async fn start_update_loop(&self) -> anyhow::Result<()> { let poll_interval_seconds = Duration::from_secs(self.config.poll_interval_seconds); loop { - match self.update_queue().await { - Ok(num_queued) => { - if num_queued > 0 { - self.inserted_notify.notify_one(); - } - }, - Err(e) => { - error!(error = ?e, "[Asset Uploader Throttler] Error updating queue"); - }, + let num_queued = self.update_queue().await?; + if num_queued > 0 { + self.inserted_notify.notify_one(); } tokio::time::sleep(poll_interval_seconds).await; @@ -384,7 +384,8 @@ impl Server for AssetUploaderThrottlerContext { let self_arc_clone = self_arc.clone(); tokio::spawn(async move { - self_arc_clone.start_update_loop().await; + self_arc_clone.start_update_loop().await?; + anyhow::Ok(()) }); axum::Router::new()