Skip to content

Commit

Permalink
fix: fix queue items not unique (#216)
Browse files Browse the repository at this point in the history
* fix: fix queues items uniqueness

* fix: fix queues items uniqueness
  • Loading branch information
wa0x6e authored Jan 8, 2024
1 parent 44de118 commit 9c2b0cb
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions src/lib/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { capture } from '@snapshot-labs/snapshot-sentry';
import Cache from './cache';
import { timeQueueProcess } from './metrics';

const queues = new Set<Cache>();
const queues = new Map<string, Cache>();
const processingItems = new Map<string, Cache>();

async function processItem(cacheable: Cache) {
Expand All @@ -17,13 +17,15 @@ async function processItem(cacheable: Cache) {
capture(e, { id: cacheable.id });
console.error(`[queue] Error while processing item`, e);
} finally {
queues.delete(cacheable);
queues.delete(cacheable.id);
processingItems.delete(cacheable.id);
}
}

export function queue(cacheable: Cache) {
queues.add(cacheable);
if (!queues.has(cacheable.id)) {
queues.set(cacheable.id, cacheable);
}

return queues.size;
}
Expand All @@ -43,12 +45,11 @@ export function getProgress(id: string) {
async function run() {
try {
console.log(`[queue] Poll queue (found ${queues.size} items)`);
queues.forEach(async cacheable => {
if (processingItems.has(cacheable.id)) {
queues.forEach(async (cacheable, id) => {
if (processingItems.has(id)) {
console.log(
`[queue] Skip: ${cacheable} is currently being processed, progress: ${processingItems.get(
cacheable.id
)?.generationProgress}%`
`[queue] Skip: ${cacheable} is currently being processed, progress:
${processingItems.get(id)?.generationProgress}%`
);
return;
}
Expand Down

0 comments on commit 9c2b0cb

Please sign in to comment.