diff --git a/apps/nocapd/package.json b/apps/nocapd/package.json index fbe02f12..4520a282 100644 --- a/apps/nocapd/package.json +++ b/apps/nocapd/package.json @@ -10,7 +10,7 @@ "@nostrwatch/controlflow": "^0.2.1", "@nostrwatch/logger": "^0.0.6", "@nostrwatch/nocap": "^0.5.2", - "@nostrwatch/nocap-every-adapter-default": "1.3.0", + "@nostrwatch/nocap-every-adapter-default": "1.3.1", "@nostrwatch/nwcache": "^0.1.2", "@nostrwatch/publisher": "^0.4.3", "@nostrwatch/seed": "^0.0.2", diff --git a/apps/nocapd/src/classes/Worker.js b/apps/nocapd/src/classes/Worker.js index 19570b3a..181a19eb 100644 --- a/apps/nocapd/src/classes/Worker.js +++ b/apps/nocapd/src/classes/Worker.js @@ -11,41 +11,15 @@ import Publish from '@nostrwatch/publisher' import { Nocap } from "@nostrwatch/nocap" import nocapAdapters from "@nostrwatch/nocap-every-adapter-default" +const adaptersArray = Object.values(nocapAdapters) + export class NWWorker { $ rcache pubkey - cb = {} - processed = 1 - total = 0 - relayMeta = new Map() - cache_counts = {} - jobs = {} - hard_stop = false - - nocapOpts = { - timeout: this.timeout, - checked_by: this.pubkey - } - - jobOpts = { - removeOnComplete: false, - removeOnFail: { - age: timestring('10m', 's') - } - } - - timeout = { - open: 3000, - read: 3000, - write: 3000, - info: 2000, - dns: 1000, - geo: 1000, - ssl: 1000 - } + constructor(pubkey, $q, rcache, config){ this.pubkey = pubkey @@ -58,6 +32,37 @@ export class NWWorker { setup(){ this.setupConfig() + + this.cb = {} + this.processed = 1 + this.total = 0 + this.relayMeta = new Map() + this.cache_counts = {} + this.jobs = {} + this.hard_stop = false + + this.nocapOpts = { + timeout: this.timeout, + checked_by: this.pubkey + } + + this.jobOpts = { + removeOnComplete: false, + removeOnFail: { + age: timestring('10m', 's') + } + } + + this.timeout = { + open: 3000, + read: 3000, + write: 3000, + info: 2000, + dns: 1000, + geo: 1000, + ssl: 1000 + } + this.setupInstances() } @@ -78,8 +83,8 @@ export class NWWorker { } updateJobOpts(obj){ - this.jobOpts = { ...this.jobOpts, ...obj } - return this.jobOpts + const jobOpts = { ...this.jobOpts, ...obj } + return jobOpts } async populator(){ @@ -99,9 +104,8 @@ export class NWWorker { try { const { relay:url } = job.data const nocap = new Nocap(url, this.nocapOpts) - await nocap.useAdapters(Object.values(nocapAdapters)) + await nocap.useAdapters([...adaptersArray]) const result = await nocap.check(this.opts.checks.enabled).catch(failure) - // console.log(url, result) return { result } } catch(err) { @@ -111,18 +115,19 @@ export class NWWorker { } async on_error(job, err){ - if(this.hard_stop) return + // if(this.hard_stop) return this.log.debug(`on_error(): ${job.id}: ${err}`) await this.on_fail( job ) } async on_completed(job, rvalue){ - if(this.hard_stop) return + // if(this.hard_stop) return this.log.debug(`on_completed(): ${job.id}: ${JSON.stringify(rvalue)}`) const { result } = rvalue if(!result?.url) return console.error(`url was empty:`, job.id) let fail = result?.open?.data? false: true this.progressMessage(result.url, result, fail) + delete this.jobs[job.id] if(fail) await this.on_fail( result ) else @@ -131,7 +136,7 @@ export class NWWorker { } async on_success(result){ - if(this.hard_stop) return + // if(this.hard_stop) return this.log.debug(`on_success(): ${result.url}`) if(this.config?.publisher?.kinds?.includes(30066) ){ const publish30066 = new Publish.Kind30066() @@ -144,12 +149,12 @@ export class NWWorker { } async on_fail(result){ - if(this.hard_stop) return + // if(this.hard_stop) return this.log.debug(`on_fail(): ${result.url}`) } async after_completed(result, error=false){ - if(this.hard_stop) return + // if(this.hard_stop) return this.log.debug(`after_completed(): ${result.url}`) await this.updateRelayCache( { ...result } ) await this.retry.setRetries( result.url, !error ) @@ -157,7 +162,7 @@ export class NWWorker { } cbcall(...args){ - if(this.hard_stop) return + // if(this.hard_stop) return this.log.debug(`cbcall(): ${JSON.stringify(args)}`) const handler = [].shift.call(args) if(this?.[`on_${handler}`] && typeof this[`on_${handler}`] === 'function') @@ -196,7 +201,7 @@ export class NWWorker { async addRelayJobs(relays){ this.log.debug(`addRelayJobs(): for ${relays.length} relays`) await this.drainSmart() - relays.forEach( async (relay) => { + for(const relay of relays){ let job = this.jobs?.[this.jobId(relay)] if(job) { await job.remove() @@ -205,17 +210,18 @@ export class NWWorker { } job = await this.addRelayJob({ relay }) this.jobs[job.id] = job - }) + } const jobs = Object.values(this.jobs) - if(jobs.length > 0) await Promise.all(jobs) + if(jobs.length === 0) return + await Promise.allSettled(jobs) } async addRelayJob(jdata){ - this.log.debug(`Adding job for ${jdata.relay} with ${this.opts.checks.enabled} nocap checks: ${JSON.stringify(jdata)}`) + this.log.debug(`Adding job for ${jdata.relay} with ${this.opts.checks.enabled} nocap checks: ${JSON.stringify(jdata.relay)}`) const jobId = this.jobId(jdata.relay) const priority = this.getPriority(jdata.relay) - this.updateJobOpts({ priority }) - return this.$.queue.add( this.id(), jdata, { jobId, ...this.jobOpts}) + const jobOpts = this.updateJobOpts({ priority }) + return this.$.queue.add( this.id(), jdata, { jobId, ...jobOpts}) } calculateProgress() { @@ -383,13 +389,18 @@ export class NWWorker { this.log.debug(`getRelays() relay: ${relay.url}`) - this.log.debug(`getRelays() relay: ${relay.url}: lastChecked()`) + const lastChecked = await this.rcache.cachetime.get.one(this.cacheId(relay.url)); - this.log.debug(`getRelays() relay: ${relay.url}: retries()`) + this.log.debug(`getRelays() relay: ${relay.url}: lastChecked(): ${lastChecked}`) + const retries = await this.retry.getRetries(relay.url); - this.log.debug(`getRelays() relay: ${relay.url}: isExpired()`) + this.log.debug(`getRelays() relay: ${relay.url}: retries(): ${retries}`) + const isExpired = lastChecked? await this.isExpired(relay.url, lastChecked): true; + this.log.debug(`getRelays() relay: ${relay.url}: isExpired(): ${isExpired}`) + const isOnline = relay?.online === true; + this.log.debug(`getRelays() relay: ${relay.url}: isOnline(): ${isOnline}`) if(isOnline) onlineRelays.push(relay.url); diff --git a/apps/nocapd/src/daemon.js b/apps/nocapd/src/daemon.js index f25e3413..bf0813df 100644 --- a/apps/nocapd/src/daemon.js +++ b/apps/nocapd/src/daemon.js @@ -37,8 +37,8 @@ const populateQueue = async () => { const checkQueue = async () => { const counts = await $q.checker.counts() const enqueue = counts.prioritized + counts.active - if(enqueue > 0) return - log.debug(`drained: ${$q.queue.name}`) + if(enqueue > 0) return log.debug(`checkQueue(): ${$q.queue.name}: ${enqueue} events active`) + // log.debug(`drained: ${$q.queue.name}`) populateQueue() } @@ -51,6 +51,7 @@ const setIntervals = () => { const initWorker = async () => { const connection = RedisConnectionDetails() + log.info(`initWorker(): connecting to redis at`, connection) const concurrency = config?.nocapd?.bullmq?.worker?.concurrency? config.nocapd.bullmq.worker.concurrency: 1 const ncdq = NocapdQueue(`nocapd/${config?.monitor?.slug}` || null) $q = new NocapdQueues({ pubkey: PUBKEY, logger: new Logger('@nostrwatch/nocapd:queue-control'), redis: connection }) @@ -59,9 +60,9 @@ const initWorker = async () => { .set( 'events' , ncdq.$QueueEvents ) .set( 'checker', new NWWorker(PUBKEY, $q, rcache, {...config, logger: new Logger('@nostrwatch/nocapd:worker'), pubkey: PUBKEY }) ) .set( 'worker' , new BullMQ.Worker($q.queue.name, $q.route_work.bind($q), { concurrency, connection, ...queueOpts() } ) ) - // .drain() - // await $q.obliterate().catch(()=>{}) - + + await $q.queue.drain() + // await $q.queue.obliterate() await $q.checker.drainSmart() setIntervals() // $q.events.on('drained', populateQueue) @@ -81,17 +82,24 @@ const stop = async(signal) => { $q.worker.pause() log.info(`shutdown progress: $q.queue.pause()`) $q.queue.pause() + await rcache.$.close() log.info(`shutdown progress: $q.queue.drain()`) await $q.queue.drain() log.info(`shutdown progress: checking active jobs`) - const {active:numActive} = await $q.queue.getJobCounts('active') - if(numActive > 0) { - log.info(`shutdown progress: ${numActive} active jobs`) - await new Promise( resolve => { - $q.queue.on('drained', resolve) - }) - log.info(`shutdown progress: no more jobs`) - } + // const {active:numActive} = await $q.queue.getJobCounts('active') + // if(numActive > 0) { + // log.info(`shutdown progress: ${numActive} active jobs`) + // await new Promise( resolve => { + // const intVal = setInterval(async () => { + // const {active:numActive} = await $q.queue.getJobCounts('active') + // if(numActive === 0) { + // clearInterval(intVal) + // resolve() + // } + // }, 1000) + // }) + // log.info(`shutdown progress: no more jobs`) + // } log.info(`shutdown progress: $q.queue.obliterate()`) await $q.queue.obliterate() // if(signal !== 'EAI_AGAIN'){ @@ -160,7 +168,10 @@ const syncRelaysIn = async () => { const syncData = await bootstrap('nocapd') log.debug(`syncRelaysIn(): found ${syncData[0].length} *maybe new* relays`) const relays = syncData[0].map(r => { return { url: new URL(r).toString(), online: null, network: parseRelayNetwork(r), info: "", dns: "", geo: "", ssl: "" } }) - const persisted = await rcache.relay.batch.insertIfNotExists(relays) + log.debug(`syncRelaysIn(): Persisting ${relays.length} relays`, relays) + const persisted = await rcache.relay.batch.insertIfNotExists(relays).catch(console.error) + log.debug(`syncRelaysIn(): Persisted ${persisted} new relays`) + console.log('fucking relays', await this.rcache.relay.get.all().length) if(persisted.length === 0) return 0 log.info(chalk.yellow.bold(`Persisted ${persisted.length} new relays`)) return persisted diff --git a/internal/utils/index.js b/internal/utils/index.js index 384ff30c..0d1357db 100644 --- a/internal/utils/index.js +++ b/internal/utils/index.js @@ -29,7 +29,7 @@ export const RedisConnectionDetails = () => { const redis = {} Object.keys(process.env).forEach(key => { if(key.startsWith('REDIS_')) - redis[key.replace('REDIS_', '').toLowerCase()] = process.env[key] + redis[key.replace('REDIS_', '').toLowerCase()] = process.env[key] }) if(redis?.tls === "true") redis.tls = {} return redis diff --git a/libraries/nocap/adapters/default/WebsocketAdapterDefault/index.js b/libraries/nocap/adapters/default/WebsocketAdapterDefault/index.js index 9471cf16..79947725 100644 --- a/libraries/nocap/adapters/default/WebsocketAdapterDefault/index.js +++ b/libraries/nocap/adapters/default/WebsocketAdapterDefault/index.js @@ -18,7 +18,7 @@ class WebsocketAdapterDefault { * @returns promise */ async check_open(deferred){ - //this.$.logger.debug(`${this.$.url}: WebsocketAdapterDefault.check_open()`) + this.$.logger.debug(`${this.$.url}: WebsocketAdapterDefault.check_open()`) try { this.$.set('ws', new WebSocketNode(this.$.url)); this.bind_events(); @@ -36,7 +36,7 @@ class WebsocketAdapterDefault { * @returns promise */ async check_read() { - //this.$.logger.debug(`${this.$.url}: WebsocketAdapterDefault.check_read()`) + this.$.logger.debug(`${this.$.url}: WebsocketAdapterDefault.check_read()`) if (!this.$.isConnected()) { throw new Error('WebSocket is not connected'); } @@ -52,11 +52,11 @@ class WebsocketAdapterDefault { * @returns promise */ async check_write() { - //this.$.logger.debug(`${this.$.url}: WebsocketAdapterDefault.check_write()`) + this.$.logger.debug(`${this.$.url}: WebsocketAdapterDefault.check_write()`) if (!this.$.isConnected()) { throw new Error('WebSocket is not connected'); } - //this.$.logger.debug(`check_write()`); + this.$.logger.debug(`check_write()`); const ev = JSON.stringify(['EVENT', this.config?.event_sample || this.$.SAMPLE_EVENT]); this.$.ws.send(ev); } @@ -69,7 +69,7 @@ class WebsocketAdapterDefault { * @returns null */ bind_events(){ - //this.$.logger.debug(`${this.$.url}: WebsocketAdapterDefault.bind_events()`) + this.$.logger.debug(`${this.$.url}: WebsocketAdapterDefault.bind_events()`) try { this.$.ws.on('open', (e) => { this.$.on_open(e) @@ -86,7 +86,7 @@ class WebsocketAdapterDefault { }) } catch(e) { - //this.$.logger.warn(e) + this.$.logger.warn(e) } } @@ -97,14 +97,14 @@ class WebsocketAdapterDefault { * @returns null */ handle_nostr_event(buffer){ - //this.$.logger.debug(`${this.$.url}: WebsocketAdapterDefault.handle_nostr_event()`) + this.$.logger.debug(`${this.$.url}: WebsocketAdapterDefault.handle_nostr_event()`) let ev try{ ev = JSON.parse(buffer.toString()) } catch(e){ const err = `${this.$.url} is not NIP-01 compatible, responded with invalid JSON: ${e}` - //this.$.logger.err(err) + this.$.logger.err(err) this.$.auditor.fail('INVALID_JSON', { description: 'Relay responded to subscription with invalid JSON.', severity: 'high', @@ -114,11 +114,6 @@ class WebsocketAdapterDefault { return this.$.websocket_hard_fail(err) } if(!ev || !(ev instanceof Array) || !ev.length) return - // if(ev[0] === 'REQ') { - // for(let i = 0; i < 1000000; i++){ - // this.$.ws.send(`["CLOSE", "${this.$.subid('read')}"]`) - // } - // } if(ev[0] === 'EVENT') { if(this.count.event > 0) { if(this.count.event > this.$.config.tooManyEventsLimit) { @@ -131,7 +126,7 @@ class WebsocketAdapterDefault { this.$.handle_eose() } this.count.event++ - return //this.$.logger.debug(`${this.$.url}: sent too many events: ${this.count.event}`) + return this.$.logger.debug(`${this.$.url}: sent too many events: ${this.count.event}`) } this.count.event++ if(this.$.subid('read') === ev[1]) @@ -155,15 +150,14 @@ class WebsocketAdapterDefault { terminate(){ if(!this.$.isConnected()) return - //this.$.logger.debug('WebsocketAdapterDefault.terminate()') + this.$.logger.debug('WebsocketAdapterDefault.terminate()') this.$.ws.terminate() } close(){ if(!this.$.isConnected()) return - //this.$.logger.info('WebsocketAdapterDefault.close()') - //this.$.logger.debug('WebsocketAdapterDefault.close()') + this.$.logger.debug('WebsocketAdapterDefault.close()') this.$.ws.close() } /*