Skip to content

Commit

Permalink
remove completed from queue
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Mar 17, 2024
1 parent ef9953b commit 9f23fbc
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
2 changes: 1 addition & 1 deletion apps/nocapd/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ RUN mkdir /app
WORKDIR /app
COPY . .
RUN yarn install
CMD yarn launch
CMD node src/index.js
12 changes: 7 additions & 5 deletions apps/nocapd/src/classes/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@ export class NWWorker {

setupJobOpts(){
this.jobOpts ={
removeOnComplete: {
age: Math.round(this.expires/1000),
},
removeOnFail: 11,
removeOnComplete: true,
removeOnFail: true
}
}

Expand Down Expand Up @@ -347,6 +345,8 @@ export class NWWorker {
this.relayMeta = new Map()

for (const relay of allRelays) {
if(!this.qualifyNetwork(relay.url)) continue

this.log.debug(`getRelays() relay: ${relay.url}`)

this.log.debug(`getRelays() relay: ${relay.url}: lastChecked()`)
Expand Down Expand Up @@ -375,9 +375,11 @@ export class NWWorker {

this.log.info(`online: ${await this.rcache.relay.get.online()?.length}, \
online & expired: ${onlineRelays.length}, \
expired: ${expiredRelays.length}, \
expired: ${expiredRelays.filter(this.qualifyNetwork.bind(this)).length}, \
unchecked: ${uncheckedRelays.filter(this.qualifyNetwork.bind(this)).length}, \
total: ${allRelays.length}`);

console.log('!!! expired relays', expiredRelays)

const deduped = [...new Set([...onlineRelays, ...uncheckedRelays, ...expiredRelays])];
const relaysFiltered = deduped.filter(this.qualifyNetwork.bind(this));
Expand Down
10 changes: 8 additions & 2 deletions apps/nocapd/src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const log = new Logger('@nostrwatch/nocapd')
let rcache
let config
let $q
let intervalPopulate
let intervalSyncRelays

const populateQueue = async () => {
log.info(`drained: ${$q.queue.name}`)
Expand All @@ -37,6 +39,11 @@ const checkQueue = async () => {
await populateQueue()
}

const setIntervals = () => {
intervalSyncRelays = setInterval( syncRelaysIn, timestring(config?.nocapd?.seed?.options?.events?.interval, "ms") || timestring("1h", "ms"))
intervalPopulate = setInterval( checkQueue, timestring( "1m", "ms" ))
}

const initWorker = async () => {
const connection = RedisConnectionDetails()
const concurrency = config?.nocapd?.bullmq?.worker?.concurrency? config.nocapd.bullmq.worker.concurrency: 1
Expand All @@ -49,8 +56,7 @@ const initWorker = async () => {
.set( 'worker' , new BullMQ.Worker($q.queue.name, $q.route_work.bind($q), { concurrency, connection, ...queueOpts() } ) )
.drain()
await $q.obliterate().catch(()=>{})
setInterval( syncRelaysIn, timestring(config?.nocapd?.seed?.options?.events?.interval, "ms") || timestring("1h", "ms"))
setInterval( checkQueue, timestring( "1m", "ms" ))
setIntervals()
// $q.events.on('drained', populateQueue)
await populateQueue()
$q.resume()
Expand Down

0 comments on commit 9f23fbc

Please sign in to comment.