Commit

fml
dskvr committed Jul 22, 2024
1 parent 20f911d commit dbe1e5f
Showing 5 changed files with 97 additions and 81 deletions.
2 changes: 1 addition & 1 deletion apps/nocapd/package.json
@@ -10,7 +10,7 @@
"@nostrwatch/controlflow": "^0.2.1",
"@nostrwatch/logger": "^0.0.6",
"@nostrwatch/nocap": "^0.5.2",
"@nostrwatch/nocap-every-adapter-default": "1.3.0",
"@nostrwatch/nocap-every-adapter-default": "1.3.1",
"@nostrwatch/nwcache": "^0.1.2",
"@nostrwatch/publisher": "^0.4.3",
"@nostrwatch/seed": "^0.0.2",
107 changes: 59 additions & 48 deletions apps/nocapd/src/classes/Worker.js
@@ -11,41 +11,15 @@ import Publish from '@nostrwatch/publisher'
import { Nocap } from "@nostrwatch/nocap"
import nocapAdapters from "@nostrwatch/nocap-every-adapter-default"

const adaptersArray = Object.values(nocapAdapters)


export class NWWorker {

$
rcache
pubkey
cb = {}
processed = 1
total = 0
relayMeta = new Map()
cache_counts = {}
jobs = {}
hard_stop = false

nocapOpts = {
timeout: this.timeout,
checked_by: this.pubkey
}

jobOpts = {
removeOnComplete: false,
removeOnFail: {
age: timestring('10m', 's')
}
}

timeout = {
open: 3000,
read: 3000,
write: 3000,
info: 2000,
dns: 1000,
geo: 1000,
ssl: 1000
}


constructor(pubkey, $q, rcache, config){
this.pubkey = pubkey
@@ -58,6 +32,37 @@ export class NWWorker {

setup(){
this.setupConfig()

this.cb = {}
this.processed = 1
this.total = 0
this.relayMeta = new Map()
this.cache_counts = {}
this.jobs = {}
this.hard_stop = false

this.nocapOpts = {
timeout: this.timeout,
checked_by: this.pubkey
}

this.jobOpts = {
removeOnComplete: false,
removeOnFail: {
age: timestring('10m', 's')
}
}

this.timeout = {
open: 3000,
read: 3000,
write: 3000,
info: 2000,
dns: 1000,
geo: 1000,
ssl: 1000
}

this.setupInstances()
}

@@ -78,8 +83,8 @@
}

updateJobOpts(obj){
this.jobOpts = { ...this.jobOpts, ...obj }
return this.jobOpts
const jobOpts = { ...this.jobOpts, ...obj }
return jobOpts
}

async populator(){
@@ -99,9 +104,8 @@
try {
const { relay:url } = job.data
const nocap = new Nocap(url, this.nocapOpts)
await nocap.useAdapters(Object.values(nocapAdapters))
await nocap.useAdapters([...adaptersArray])
const result = await nocap.check(this.opts.checks.enabled).catch(failure)
// console.log(url, result)
return { result }
}
catch(err) {
@@ -111,18 +115,19 @@
}

async on_error(job, err){
if(this.hard_stop) return
// if(this.hard_stop) return
this.log.debug(`on_error(): ${job.id}: ${err}`)
await this.on_fail( job )
}

async on_completed(job, rvalue){
if(this.hard_stop) return
// if(this.hard_stop) return
this.log.debug(`on_completed(): ${job.id}: ${JSON.stringify(rvalue)}`)
const { result } = rvalue
if(!result?.url) return console.error(`url was empty:`, job.id)
let fail = result?.open?.data? false: true
this.progressMessage(result.url, result, fail)
delete this.jobs[job.id]
if(fail)
await this.on_fail( result )
else
@@ -131,7 +136,7 @@
}

async on_success(result){
if(this.hard_stop) return
// if(this.hard_stop) return
this.log.debug(`on_success(): ${result.url}`)
if(this.config?.publisher?.kinds?.includes(30066) ){
const publish30066 = new Publish.Kind30066()
@@ -144,20 +149,20 @@
}

async on_fail(result){
if(this.hard_stop) return
// if(this.hard_stop) return
this.log.debug(`on_fail(): ${result.url}`)
}

async after_completed(result, error=false){
if(this.hard_stop) return
// if(this.hard_stop) return
this.log.debug(`after_completed(): ${result.url}`)
await this.updateRelayCache( { ...result } )
await this.retry.setRetries( result.url, !error )
await this.setLastChecked( result.url, Date.now() )
}

cbcall(...args){
if(this.hard_stop) return
// if(this.hard_stop) return
this.log.debug(`cbcall(): ${JSON.stringify(args)}`)
const handler = [].shift.call(args)
if(this?.[`on_${handler}`] && typeof this[`on_${handler}`] === 'function')
@@ -196,7 +201,7 @@
async addRelayJobs(relays){
this.log.debug(`addRelayJobs(): for ${relays.length} relays`)
await this.drainSmart()
relays.forEach( async (relay) => {
for(const relay of relays){
let job = this.jobs?.[this.jobId(relay)]
if(job) {
await job.remove()
Expand All @@ -205,17 +210,18 @@ export class NWWorker {
}
job = await this.addRelayJob({ relay })
this.jobs[job.id] = job
})
}
const jobs = Object.values(this.jobs)
if(jobs.length > 0) await Promise.all(jobs)
if(jobs.length === 0) return
await Promise.allSettled(jobs)
}

async addRelayJob(jdata){
this.log.debug(`Adding job for ${jdata.relay} with ${this.opts.checks.enabled} nocap checks: ${JSON.stringify(jdata)}`)
this.log.debug(`Adding job for ${jdata.relay} with ${this.opts.checks.enabled} nocap checks: ${JSON.stringify(jdata.relay)}`)
const jobId = this.jobId(jdata.relay)
const priority = this.getPriority(jdata.relay)
this.updateJobOpts({ priority })
return this.$.queue.add( this.id(), jdata, { jobId, ...this.jobOpts})
const jobOpts = this.updateJobOpts({ priority })
return this.$.queue.add( this.id(), jdata, { jobId, ...jobOpts})
}

calculateProgress() {
@@ -383,13 +389,18 @@

this.log.debug(`getRelays() relay: ${relay.url}`)

this.log.debug(`getRelays() relay: ${relay.url}: lastChecked()`)

const lastChecked = await this.rcache.cachetime.get.one(this.cacheId(relay.url));
this.log.debug(`getRelays() relay: ${relay.url}: retries()`)
this.log.debug(`getRelays() relay: ${relay.url}: lastChecked(): ${lastChecked}`)

const retries = await this.retry.getRetries(relay.url);
this.log.debug(`getRelays() relay: ${relay.url}: isExpired()`)
this.log.debug(`getRelays() relay: ${relay.url}: retries(): ${retries}`)

const isExpired = lastChecked? await this.isExpired(relay.url, lastChecked): true;
this.log.debug(`getRelays() relay: ${relay.url}: isExpired(): ${isExpired}`)

const isOnline = relay?.online === true;
this.log.debug(`getRelays() relay: ${relay.url}: isOnline(): ${isOnline}`)

if(isOnline) onlineRelays.push(relay.url);

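For context, the addRelayJob()/on_completed() flow above follows the usual BullMQ pattern: jobs are added with a deterministic jobId (so one job per relay stays addressable, and re-adding the same id while it still exists is a no-op), and results are handled through the worker's completed/failed events. A minimal sketch, assuming BullMQ v4-style APIs; the queue name, processor body, and relay URL are illustrative placeholders, not code from this commit:

import { Queue, Worker } from 'bullmq'                 // assumed BullMQ v4-style API

const connection = { host: '127.0.0.1', port: 6379 }   // placeholder Redis details
const queue = new Queue('nocapd-example', { connection })

// A per-relay jobId keeps queue.add() idempotent while that job exists.
await queue.add('check', { relay: 'wss://relay.example.com' }, {
  jobId: 'check:wss://relay.example.com',
  priority: 10,
  removeOnComplete: false,
  removeOnFail: { age: 600 }                            // seconds, cf. timestring('10m', 's')
})

const worker = new Worker('nocapd-example', async (job) => {
  // ...run the nocap check for job.data.relay here...
  return { result: { url: job.data.relay } }
}, { connection, concurrency: 1 })

worker.on('completed', (job, rvalue) => { /* on_completed-style handling */ })
worker.on('failed', (job, err) => { /* on_error-style handling */ })
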
39 changes: 25 additions & 14 deletions apps/nocapd/src/daemon.js
@@ -37,8 +37,8 @@ const populateQueue = async () => {
const checkQueue = async () => {
const counts = await $q.checker.counts()
const enqueue = counts.prioritized + counts.active
if(enqueue > 0) return
log.debug(`drained: ${$q.queue.name}`)
if(enqueue > 0) return log.debug(`checkQueue(): ${$q.queue.name}: ${enqueue} events active`)
// log.debug(`drained: ${$q.queue.name}`)
populateQueue()
}

@@ -51,6 +51,7 @@ const setIntervals = () => {

const initWorker = async () => {
const connection = RedisConnectionDetails()
log.info(`initWorker(): connecting to redis at`, connection)
const concurrency = config?.nocapd?.bullmq?.worker?.concurrency? config.nocapd.bullmq.worker.concurrency: 1
const ncdq = NocapdQueue(`nocapd/${config?.monitor?.slug}` || null)
$q = new NocapdQueues({ pubkey: PUBKEY, logger: new Logger('@nostrwatch/nocapd:queue-control'), redis: connection })
@@ -59,9 +60,9 @@ const initWorker = async () => {
.set( 'events' , ncdq.$QueueEvents )
.set( 'checker', new NWWorker(PUBKEY, $q, rcache, {...config, logger: new Logger('@nostrwatch/nocapd:worker'), pubkey: PUBKEY }) )
.set( 'worker' , new BullMQ.Worker($q.queue.name, $q.route_work.bind($q), { concurrency, connection, ...queueOpts() } ) )
// .drain()
// await $q.obliterate().catch(()=>{})


await $q.queue.drain()
// await $q.queue.obliterate()
await $q.checker.drainSmart()
setIntervals()
// $q.events.on('drained', populateQueue)
@@ -81,17 +82,24 @@ const stop = async(signal) => {
$q.worker.pause()
log.info(`shutdown progress: $q.queue.pause()`)
$q.queue.pause()
await rcache.$.close()
log.info(`shutdown progress: $q.queue.drain()`)
await $q.queue.drain()
log.info(`shutdown progress: checking active jobs`)
const {active:numActive} = await $q.queue.getJobCounts('active')
if(numActive > 0) {
log.info(`shutdown progress: ${numActive} active jobs`)
await new Promise( resolve => {
$q.queue.on('drained', resolve)
})
log.info(`shutdown progress: no more jobs`)
}
// const {active:numActive} = await $q.queue.getJobCounts('active')
// if(numActive > 0) {
// log.info(`shutdown progress: ${numActive} active jobs`)
// await new Promise( resolve => {
// const intVal = setInterval(async () => {
// const {active:numActive} = await $q.queue.getJobCounts('active')
// if(numActive === 0) {
// clearInterval(intVal)
// resolve()
// }
// }, 1000)
// })
// log.info(`shutdown progress: no more jobs`)
// }
log.info(`shutdown progress: $q.queue.obliterate()`)
await $q.queue.obliterate()
// if(signal !== 'EAI_AGAIN'){
@@ -160,7 +168,10 @@ const syncRelaysIn = async () => {
const syncData = await bootstrap('nocapd')
log.debug(`syncRelaysIn(): found ${syncData[0].length} *maybe new* relays`)
const relays = syncData[0].map(r => { return { url: new URL(r).toString(), online: null, network: parseRelayNetwork(r), info: "", dns: "", geo: "", ssl: "" } })
const persisted = await rcache.relay.batch.insertIfNotExists(relays)
log.debug(`syncRelaysIn(): Persisting ${relays.length} relays`, relays)
const persisted = await rcache.relay.batch.insertIfNotExists(relays).catch(console.error)
log.debug(`syncRelaysIn(): Persisted ${persisted} new relays`)
console.log('fucking relays', await this.rcache.relay.get.all().length)
if(persisted.length === 0) return 0
log.info(chalk.yellow.bold(`Persisted ${persisted.length} new relays`))
return persisted
2 changes: 1 addition & 1 deletion internal/utils/index.js
@@ -29,7 +29,7 @@ export const RedisConnectionDetails = () => {
const redis = {}
Object.keys(process.env).forEach(key => {
if(key.startsWith('REDIS_'))
redis[key.replace('REDIS_', '').toLowerCase()] = process.env[key]
redis[key.replace('REDIS_', '').toLowerCase()] = process.env[key]
})
if(redis?.tls === "true") redis.tls = {}
return redis
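
For reference, the helper simply strips the REDIS_ prefix and lower-cases the remaining key, so the returned object mirrors whatever REDIS_* variables are set (values stay strings, and a literal "true" TLS flag becomes an empty tls object). A tiny usage sketch with illustrative values, not taken from this repo:

// Illustrative environment only.
process.env.REDIS_HOST = '127.0.0.1'
process.env.REDIS_PORT = '6379'
process.env.REDIS_TLS  = 'true'

// RedisConnectionDetails() -> { host: '127.0.0.1', port: '6379', tls: {} }
console.log(RedisConnectionDetails())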