Skip to content

Commit

Permalink
remove queue hanging bug
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Jul 23, 2024
1 parent a14ec88 commit 0fd0f54
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
21 changes: 9 additions & 12 deletions apps/nocapd/src/classes/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,8 @@ export class NWWorker {

async populator(){
this.log.debug(`populator()`)
await this.$.worker.pause()
const relays = await this.getRelays()
await this.addRelayJobs(relays)
return async () => await this.$.worker.resume()
}

async work(job){
Expand Down Expand Up @@ -386,7 +384,6 @@ export class NWWorker {

this.log.debug(`getRelays() relay: ${relay.url}`)


const lastChecked = await this.rcache.cachetime.get.one(this.cacheId(relay.url));
this.log.debug(`getRelays() relay: ${relay.url}: lastChecked(): ${lastChecked}`)

Expand Down Expand Up @@ -443,15 +440,6 @@ export class NWWorker {
this.log.info(chalk.blue.bold(cacheMessage));
}

get_truncate_length(relays){
let length = relays.length
if(typeof this.opts?.checks?.options?.max === 'number')
length = this.opts.checks.options.max
if(typeof this.opts?.checks?.options?.max === 'string' )
length = evaluateMaxRelays(this.opts.checks.options.max, relays)
return length < relays.length? length: relays.length
}

qualifyNetwork(url){
const network = parseRelayNetwork(url)
return this.networks.includes(network)
Expand All @@ -464,6 +452,15 @@ export class NWWorker {
return lastChecked < Date.now() - expiry;
}

get_truncate_length(relays){
let length = relays.length
if(typeof this.opts?.checks?.options?.max === 'number')
length = this.opts.checks.options.max
if(typeof this.opts?.checks?.options?.max === 'string' )
length = evaluateMaxRelays(this.opts.checks.options.max, relays)
return length < relays.length? length: relays.length
}

}

const evaluateMaxRelays = (evaluate, relays) => {
Expand Down
8 changes: 3 additions & 5 deletions apps/nocapd/src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,14 @@ let config
let $q

const populateQueue = async () => {
const resume = await $q.checker.populator()
await delay(2000)
await $q.checker.populator()
await $q.checker.resetProgressCounts()
resume()
// lastPopulate = Date.now()
}

const checkQueue = async () => {
const counts = await $q.checker.counts()
const enqueue = counts.prioritized + counts.active
if(enqueue > 0) return log.debug(`checkQueue(): ${$q.queue.name}: ${enqueue} events active`)
// log.debug(`drained: ${$q.queue.name}`)
populateQueue()
}

Expand All @@ -62,6 +58,8 @@ const initWorker = async () => {
.set( 'checker', new NWWorker(PUBKEY, $q, rcache, {...config, logger: new Logger('@nostrwatch/nocapd:worker'), pubkey: PUBKEY }) )
.set( 'worker' , new BullMQ.Worker($q.queue.name, $q.route_work.bind($q), { concurrency, connection, ...queueOpts() } ) )


$q.queue.on('drained', () => { log.info(`queue was fucking drained.`) })
await $q.checker.drainSmart()
setIntervals()
await populateQueue()
Expand Down

0 comments on commit 0fd0f54

Please sign in to comment.