diff --git a/src/lib/queue.ts b/src/lib/queue.ts index 73cd1df..f154eda 100644 --- a/src/lib/queue.ts +++ b/src/lib/queue.ts @@ -3,7 +3,7 @@ import { capture } from '@snapshot-labs/snapshot-sentry'; import Cache from './cache'; import { timeQueueProcess } from './metrics'; -const queues = new Set(); +const queues = new Map(); const processingItems = new Map(); async function processItem(cacheable: Cache) { @@ -17,13 +17,15 @@ async function processItem(cacheable: Cache) { capture(e, { id: cacheable.id }); console.error(`[queue] Error while processing item`, e); } finally { - queues.delete(cacheable); + queues.delete(cacheable.id); processingItems.delete(cacheable.id); } } export function queue(cacheable: Cache) { - queues.add(cacheable); + if (!queues.has(cacheable.id)) { + queues.set(cacheable.id, cacheable); + } return queues.size; } @@ -43,12 +45,11 @@ export function getProgress(id: string) { async function run() { try { console.log(`[queue] Poll queue (found ${queues.size} items)`); - queues.forEach(async cacheable => { - if (processingItems.has(cacheable.id)) { + queues.forEach(async (cacheable, id) => { + if (processingItems.has(id)) { console.log( - `[queue] Skip: ${cacheable} is currently being processed, progress: ${processingItems.get( - cacheable.id - )?.generationProgress}%` + `[queue] Skip: ${cacheable} is currently being processed, progress: + ${processingItems.get(id)?.generationProgress}%` ); return; }