From 286644954b15ac6ae9d98aad9d54c1cabef1e99f Mon Sep 17 00:00:00 2001 From: dskvr Date: Sat, 16 Mar 2024 23:30:32 +0100 Subject: [PATCH] fix referencing bug --- apps/nocapd/.dockerignore | 7 +- apps/nocapd/package.json | 7 +- apps/nocapd/src/classes/NocapdQueues.js | 16 ++- apps/nocapd/src/classes/Worker.js | 98 +++++++++++------- apps/nocapd/src/daemon.js | 116 ++++++++++++++++------ apps/nocapd/src/index.js | 3 + packages/controlflow/package.json | 2 +- packages/controlflow/src/retry.js | 21 ++-- packages/nwcache/package.json | 2 +- packages/nwcache/schemas.js | 2 + packages/publisher/package.json | 2 +- packages/publisher/src/kinds/Kind30066.js | 23 +++-- packages/utils/index.js | 18 ++++ packages/utils/package.json | 2 +- 14 files changed, 220 insertions(+), 99 deletions(-) diff --git a/apps/nocapd/.dockerignore b/apps/nocapd/.dockerignore index 05fbb4d8..b17d381d 100644 --- a/apps/nocapd/.dockerignore +++ b/apps/nocapd/.dockerignore @@ -1,3 +1,6 @@ node_modules -docker-compose.yaml -yarn.lock \ No newline at end of file +yarn.lock +.* +config.yaml +docker-compose.yaml +README.md \ No newline at end of file diff --git a/apps/nocapd/package.json b/apps/nocapd/package.json index e01fbbe4..d5619415 100644 --- a/apps/nocapd/package.json +++ b/apps/nocapd/package.json @@ -7,11 +7,11 @@ "dependencies": { "@nostr-fetch/adapter-nostr-tools": "0.14.1", "@nostrwatch/announce": "^0.1.0", - "@nostrwatch/controlflow": "^0.0.3", + "@nostrwatch/controlflow": "^0.1.0", "@nostrwatch/logger": "^0.0.4", "@nostrwatch/nocap": "^0.4.2", - "@nostrwatch/nwcache": "^0.0.2", - "@nostrwatch/publisher": "^0.3.2", + "@nostrwatch/nwcache": "^0.1.0", + "@nostrwatch/publisher": "^0.4.0", "@nostrwatch/seed": "^0.0.2", "@nostrwatch/utils": "^0.0.3", "bluebird": "3.7.2", @@ -20,6 +20,7 @@ "nostr-fetch": "0.14.1", "nostr-geotags": "^0.5.0", "object-mapper": "6.2.0", + "promise-deferred": "2.0.4", "timestring": "^7.0.0" }, "scripts": { diff --git a/apps/nocapd/src/classes/NocapdQueues.js b/apps/nocapd/src/classes/NocapdQueues.js index e50d671d..70b5e243 100644 --- a/apps/nocapd/src/classes/NocapdQueues.js +++ b/apps/nocapd/src/classes/NocapdQueues.js @@ -9,7 +9,7 @@ export class NocapdQueues { setup(opts){ this.pubkey = opts?.pubkey? opts.pubkey: null - this.log = this.opts?.logger? this.opts.logger: new Logger('nocap/$NocapdQueues') + this.log = this.opts?.logger? this.opts.logger: new Logger('nocap/queue-manager') this.cb = {} this.queue = null this.events = null @@ -64,6 +64,7 @@ export class NocapdQueues { on(event, handler){ this.cb[event] = handler.bind(this) + return this } cbcall(...args){ @@ -75,18 +76,23 @@ export class NocapdQueues { } async pause(){ - return await this.queue.pause() + await this.queue.pause() + return this } async resume(){ - return await this.queue.resume() + await this.queue.resume() + return this } async drain(){ - return await this.queue.drain() + await this.queue.drain() + return this + } async obliterate(){ - return await this.queue.obliterate() + await this.queue.obliterate() + return this } } \ No newline at end of file diff --git a/apps/nocapd/src/classes/Worker.js b/apps/nocapd/src/classes/Worker.js index 31e5363b..b4f550ba 100644 --- a/apps/nocapd/src/classes/Worker.js +++ b/apps/nocapd/src/classes/Worker.js @@ -24,6 +24,7 @@ export class NWWorker { this.setupConfig() this.setupNocapOpts() this.setupJobOpts() + this.setupInstances() } setupDefaultValues(){ @@ -33,11 +34,12 @@ export class NWWorker { this.relayMeta = new Map() this.jobOpts = {} this.nocapOpts = {} + this.hard_stop = false } setupConfig(){ this.opts = this.config.nocapd - this.retry = new RetryManager(`nocapd/${this.pubkey}`, this.opts?.retry) + this.checks = this.opts?.checks?.enabled.includes('all')? Nocap.checksSupported(): this.opts?.checks?.enabled this.checkOpts = this.opts?.checks?.options || {} this.timeout = this.setTimeout(this.checkOpts?.timeout) @@ -46,6 +48,11 @@ export class NWWorker { this.interval = this.checkOpts?.interval? timestring(this.checkOpts.interval, 'ms'): 60*1000 this.networks = this.opts?.networks? this.opts.networks: ['clearnet'] this.log = this.config?.logger? this.config.logger: new Logger('nocap/$NWWorker') + + } + + setupInstances(){ + this.retry = new RetryManager(`nocapd/${this.pubkey}`, this.opts?.retry, this.rcache) } setupNocapOpts(){ @@ -70,14 +77,16 @@ export class NWWorker { } async populator(){ - this.log.info(`${this.id()}:_populator(): populating queue`) - const relays = await this.getRelays() - this.log.info(relays.length) + this.log.info(`populator()`) await this.$.worker.pause() + this.log.debug(`populator(): worker paused`) + const relays = await this.getRelays() + this.log.debug(`populator(): adding ${relays.length} relays to queue`) await this.addRelayJobs(relays) - this.log.debug(`${this.id()}:_populator(): Added ${relays?.length} to queue`) - delay(1000) + this.log.debug(`populator(): added ${relays.length} relays to queue`) await this.$.worker.resume() + this.log.debug(`populator(): worker resumed`) + } async work(job){ @@ -101,44 +110,45 @@ export class NWWorker { const { result } = rvalue let fail = result?.open?.data? false: true if(fail) - await this.on_fail(result) + await this.on_fail( result ) else - await this.on_success(result) + await this.on_success( result ) await this.after_completed( result, fail ) } - async on_success(result){ - this.log.debug(`on_success(): ${result.url}`) - this.progressMessage(result.url, result) + async publish_30066(result){ if(this.config?.publisher?.kinds?.includes(30066) ){ const publish30066 = new Publish.Kind30066() await publish30066.one( result ) } + } + + async publish_30166(result){ if(this.config?.publisher?.kinds?.includes(30166) ){ const publish30166 = new Publish.Kind30166() await publish30166.one( result ) } } + async on_success(result){ + this.log.debug(`on_success(): ${result.url}`) + await this.publish_30066(result) + await this.publish_30166(result) + } + async on_fail(result){ this.log.debug(`on_fail(): ${result.url}`) - this.progressMessage(result.url, null, true) } async after_completed(result, error=false){ this.log.debug(`after_completed(): ${result.url}`) - this.processed++ await this.updateRelayCache( { ...result } ) - await delay(200) + await delay(100) await this.retry.setRetries( result.url, !error ) - await delay(200) + await delay(100) await this.setLastChecked( result.url, Date.now() ) - } - async on_drained(){ - //this.log.debug(`on_drained()`) - this.total = 0 - this.processed = 0 + this.progressMessage(result.url, result, error) } cbcall(...args){ @@ -151,12 +161,19 @@ export class NWWorker { } async addRelayJobs(relays){ - this.log.debug(`Relay Counts: ${JSON.stringify(await this.counts())}`) + this.log.debug(`addRelayJobs(): for ${relays.length} relays`) + const promises = [] for await ( const relay of relays ){ - const $job = await this.addRelayJob({ relay }) - if($job?.id) - this.total++ + promises.push(this.addRelayJob({ relay })) } + await Promise.all(promises) + } + + async resetProgressCounts(){ + const c = await this.counts() + this.total = c.prioritized + this.processed = 0 + this.log.debug(`total jobs: ${this.total}`) } async addRelayJob(jdata){ @@ -180,7 +197,7 @@ export class NWWorker { const mute = chalk.gray let progress = '' progress += `[${chalk.bgBlack(this.calculateProgress())}] ` - progress += `${mute(this.processed)}/${mute(this.total)} ` + progress += `${mute(this.processed++)}/${mute(this.total)} ` progress += `${url}: ` if(this.checks.includes('open')) progress += `${result?.open?.data === true? success("online"): failure("offline")} ` @@ -197,9 +214,14 @@ export class NWWorker { if(this.checks.includes('info')) progress += `${Object.keys(result?.info?.data || {}).length? success("info"): failure("info")} ` - progress += `${(result?.open?.duration+result?.read?.duration+result?.write?.duration)/1000} seconds ` - progress += `${error? chalk.gray.italic('error'): ''} ` - progress += `[${error? await this.retry.getRetries(url) + " retries": ""}]` + if(!error){ + progress += `${(result?.open?.duration+result?.read?.duration+result?.write?.duration)/1000} seconds ` + } + + if(error) { + progress += `${error? chalk.gray.italic('error'): ''} ` + progress += `[${await this.retry.getRetries(url)} retries}]` + } this.log.info(progress) } @@ -216,7 +238,6 @@ export class NWWorker { hasChanged(data1, data2){ const changed = hash(data1) !== hash(data2) - //this.log.debug(`hasChanged: ${changed}`) return changed } @@ -290,29 +311,31 @@ export class NWWorker { const promises = new Array() let record = new Object() + record.url = url + if(result?.open?.data) record.online = result.open.data + for( const key of ['info', 'dns', 'geo', 'ssl'] ){ const resultHasKey = result?.[key]?.data && Object.keys(result[key].data)?.length > 0 if(resultHasKey){ const persist_result = async (resolve, reject) => { - const _record = { url: url, relay_id, updated_at: Date.now(), hash: hash(result[key].data), data: result[key].data } + const _record = { url: url, relay_id, updated_at: Date.now(), hash: hash(result[key].data), data: JSON.stringify(result[key].data) } const _check_id = await this.rcache.check[key].insert(_record) if(!_check_id) reject() - record = {...record, ...{ [key]: _check_id }} + record[key] = _check_id resolve() } promises.push( new Promise( persist_result ) ) } } await Promise.allSettled(promises) - record.url = url - if(result?.open?.data) record.online = result.open.data await delay(100) const $id = await this.rcache.relay.patch(record) return $id } async getRelays() { + this.log.debug(`getRelays()`) const allRelays = await this.rcache.relay.get.all(); const onlineRelays = []; const uncheckedRelays = []; @@ -321,8 +344,13 @@ export class NWWorker { this.relayMeta = new Map() for (const relay of allRelays) { + this.log.debug(`getRelays() relay: ${relay.url}`) + + this.log.debug(`getRelays() relay: ${relay.url}: lastChecked()`) const lastChecked = await this.rcache.cachetime.get.one(this.cacheId(relay.url)); + this.log.debug(`getRelays() relay: ${relay.url}: retries()`) const retries = await this.retry.getRetries(relay.url); + this.log.debug(`getRelays() relay: ${relay.url}: isExpired()`) const isExpired = await this.isExpired(relay.url, lastChecked); const isOnline = relay?.online === true; @@ -337,7 +365,6 @@ export class NWWorker { expiredRelays.push({ url: relay.url, lastChecked, retries }); group = 'expired'; } - this.relayMeta.set(relay.url, { group, retries: retries > 0 ? retries : undefined }); } @@ -355,7 +382,6 @@ export class NWWorker { return relaysFiltered.slice(0, truncateLength); } - get_truncate_length(relays){ let length = relays.length @@ -374,7 +400,7 @@ export class NWWorker { async isExpired(url, lastChecked) { let retries = await this.retry.getRetries(url); retries = retries === null? 0: retries - const expiry = retries > 0 ? await this.retry.getExpiry(url) : this.expires; + const expiry = retries > 0 ? this.retry.getExpiry(url) : this.expires; return lastChecked < Date.now() - expiry; } diff --git a/apps/nocapd/src/daemon.js b/apps/nocapd/src/daemon.js index 0993dd8d..ffe90230 100644 --- a/apps/nocapd/src/daemon.js +++ b/apps/nocapd/src/daemon.js @@ -1,4 +1,5 @@ import schedule from 'node-schedule' +import Deferred from 'promise-deferred' import timestring from 'timestring' import chalk from 'chalk' @@ -21,6 +22,48 @@ let rcache let config let $q +const populateQueue = async () => { + log.info(`drained: ${$q.queue.name}`) + await $q.checker.populator() + await delay(2000) + await $q.checker.resetProgressCounts() +} + +const initWorker = async () => { + const connection = RedisConnectionDetails() + const concurrency = config?.nocapd?.bullmq?.worker?.concurrency? config.nocapd.bullmq.worker.concurrency: 1 + const ncdq = NocapdQueue(`nocapd/${config?.monitor?.slug}` || null) + $q = new NocapdQueues({ pubkey: PUBKEY, logger: new Logger('@nostrwatch/nocapd:queue-control'), redis: connection }) + await $q + .set( 'queue' , ncdq.$Queue ) + .set( 'events' , ncdq.$QueueEvents ) + .set( 'checker', new NWWorker(PUBKEY, $q, rcache, {...config, logger: new Logger('@nostrwatch/nocapd:worker'), pubkey: PUBKEY }) ) + .set( 'worker' , new BullMQ.Worker($q.queue.name, $q.route_work.bind($q), { concurrency, connection, ...queueOpts() } ) ) + .drain() + await $q.obliterate().catch(()=>{}) + setInterval( syncRelaysIn, timestring(config?.nocapd?.seed?.options?.events?.interval, "ms") || timestring("1h", "ms")) + $q.events.on('drained', populateQueue) + await populateQueue() + $q.resume() + log.info(`initialized: ${$q.queue.name}`) + return $q +} + +const stop = async() => { + log.info(`Gracefully shutting down...`) + $q.worker.hard_stop = true + + log.debug(`shutdown progress: $q.worker.pause()`) + await $q.worker.pause() + // log.debug(`shutdown progress: $q.worker.stop()`) + // await $q.worker.stop() + log.debug(`shutdown progress: $q.queue.drain()`) + await $q.queue.drain() + log.debug(`shutdown progress: await rcache.$.close()`) + await rcache.$.close() + log.debug(`shutdown progress: complete!`) +} + const maybeAnnounce = async () => { log.info(`maybeAnnounce()`) const map = { @@ -64,8 +107,10 @@ const scheduleSyncRelays = () =>{ const syncRelaysIn = async () => { log.debug(`syncRelaysIn()`) const syncData = await bootstrap('nocapd') + log.debug(`syncRelaysIn(): synced ${syncData[0].length} relays`) const relays = syncData[0].map(r => { return { url: r, online: null, network: parseRelayNetwork(r), info: "", dns: "", geo: "", ssl: "" } }) const persisted = await rcache.relay.batch.insertIfNotExists(relays) + log.debug(`syncRelaysIn(): persisted ${persisted.length} relays`) log.info(`synced ${persisted.length} relays`) return persisted } @@ -76,28 +121,6 @@ const queueOpts = () => { } } -const initWorker = async () => { - const connection = RedisConnectionDetails() - const concurrency = config?.nocapd?.bullmq?.worker?.concurrency? config.nocapd.bullmq.worker.concurrency: 1 - const ncdq = NocapdQueue(`nocapd/${config?.monitor?.slug}` || null) - $q = new NocapdQueues({ pubkey: PUBKEY, logger: new Logger('@nostrwatch/nocapd:queue-control'), redis: connection }) - $q - .set( 'queue' , ncdq.$Queue ) - .set( 'events' , ncdq.$QueueEvents ) - .set( 'checker', new NWWorker(PUBKEY, $q, rcache, {...config, logger: new Logger('@nostrwatch/nocapd:worker'), pubkey: PUBKEY }) ) - .set( 'worker' , new BullMQ.Worker($q.queue.name, $q.route_work.bind($q), { concurrency, connection, ...queueOpts() } ) ) - await $q.checker.populator() - - schedulePopulator($q.checker) - scheduleSyncRelays() - - await $q.resume() - - log.info(`@nostrwatch/nocapd initialized: ${$q.queue.name}`) - - return $q -} - const maybeBootstrap = async () => { log.info(`Bootstrapping...`) if(rcache.relay.count.all() === 0){ @@ -122,23 +145,58 @@ dP dP \`88888P' \`88888P' \`88888P8 88Y888P' \`88888P8 `)) } -const stop = async() => { - log.info(`Gracefully shutting down...`) - await $q.worker.stop() - await rcache.$.close() -} +// export const Nocapd = async () => { +// const lmdbConnected = new Deferred() +// const lmdbRetry = async (e) => { +// log.warn(`lmdb is in a mood, retrying in 2 seconds. Here's its' error btw: ${e}`) +// await delay(2000) +// await lmdbConnect() +// } + +// const lmdbConnect = async () => { +// await relaycache(process.env.NWCACHE_PATH || './.lmdb') +// .then( lmdbConnected.resolve ) +// .catch( lmdbRetry ) +// } + +// const init = async () =>{ +// header() +// config = await loadConfig().catch( (err) => { log.err(err); process.exit() } ) +// await delay(2000) +// lmdbConnect() +// rcache = await lmdbConnected.promise +// await delay(1000) +// await maybeAnnounce() +// await maybeBootstrap() +// $q = await initWorker() +// return { +// $q, +// stop +// } +// } + +// try { +// init() +// } +// catch(e){ +// await delay(2000) +// init() +// } + +// } export const Nocapd = async () => { header() config = await loadConfig().catch( (err) => { log.err(err); process.exit() } ) await delay(2000) rcache = relaycache(process.env.NWCACHE_PATH || './.lmdb') + await delay(1000) await maybeAnnounce() await maybeBootstrap() $q = await initWorker() return { $q, stop - } -} \ No newline at end of file + } +} diff --git a/apps/nocapd/src/index.js b/apps/nocapd/src/index.js index 38bb6271..56ea7179 100644 --- a/apps/nocapd/src/index.js +++ b/apps/nocapd/src/index.js @@ -1,5 +1,8 @@ import { Nocapd as daemon } from './daemon.js'; import _bluebird from 'bluebird' +// import Segfault from 'segfault' + +// Segfault.registerHandler("segfaults.log"); const run = async () => { await tracePromises() diff --git a/packages/controlflow/package.json b/packages/controlflow/package.json index 384f3ea7..15161815 100644 --- a/packages/controlflow/package.json +++ b/packages/controlflow/package.json @@ -1,6 +1,6 @@ { "name": "@nostrwatch/controlflow", - "version": "0.0.4", + "version": "0.1.0", "description": "Provides exports for application control flow", "main": "index.js", "type": "module", diff --git a/packages/controlflow/src/retry.js b/packages/controlflow/src/retry.js index e3abc4c7..868ddeaf 100644 --- a/packages/controlflow/src/retry.js +++ b/packages/controlflow/src/retry.js @@ -1,17 +1,14 @@ import relaycache from '@nostrwatch/nwcache' import { capitalize, loadConfig } from "@nostrwatch/utils" -let rcache - export class RetryManager { - constructor(caller, config) { - rcache = relaycache(process.env.NWCACHE_PATH) + constructor(caller, config, rcache) { if(!caller) throw new Error('caller is required') - // if(!action) throw new Error('action is required') this.caller = caller this.config = config || {} this.retries = [] + this.rcache = rcache } cacheId(url){ @@ -33,24 +30,24 @@ export class RetryManager { ]; const found = map.find(entry => retries <= entry.max); return found ? found.delay : map[map.length - 1].delay; - }; + } - async getRetries( url ){ - return await rcache.retry.get(this.cacheId(url)) + getRetries( url ){ + return this.rcache.retry.get(this.cacheId(url)) } - async getExpiry( url ){ - return this.expiry( await this.getRetries(url) ) + getExpiry( url ){ + return this.expiry( this.getRetries(url) ) } async setRetries( url, success ){ let id if(success) { this.log?.debug(`${url} did not require a retry`) - id = await rcache.retry.set(this.cacheId(url), 0) + id = await this.rcache.retry.set(this.cacheId(url), 0) } else { this.log?.debug(`${url} required a retry`) - id = await rcache.retry.increment(this.cacheId(url)) + id = await this.rcache.retry.increment(this.cacheId(url)) } return id } diff --git a/packages/nwcache/package.json b/packages/nwcache/package.json index cb403a1d..0e88ad6b 100644 --- a/packages/nwcache/package.json +++ b/packages/nwcache/package.json @@ -1,6 +1,6 @@ { "name": "@nostrwatch/nwcache", - "version": "0.0.2", + "version": "0.1.0", "type": "module", "main": "index.js", "license": "MIT", diff --git a/packages/nwcache/schemas.js b/packages/nwcache/schemas.js index b392639d..384bc266 100644 --- a/packages/nwcache/schemas.js +++ b/packages/nwcache/schemas.js @@ -73,6 +73,8 @@ export const defineSchemas = ($db) => { $db.defineSchema(RelayCheckGeo); $db.defineSchema(RelayCheckSsl); + $db.defineSchema(Retry); + //app meta $db.defineSchema(CacheTime); return $db diff --git a/packages/publisher/package.json b/packages/publisher/package.json index a1e73742..b15c18f5 100644 --- a/packages/publisher/package.json +++ b/packages/publisher/package.json @@ -1,6 +1,6 @@ { "name": "@nostrwatch/publisher", - "version": "0.3.2", + "version": "0.4.0", "type": "module", "description": "Library for publishing nostr.watch relay status and publisher registration events", "main": "index.js", diff --git a/packages/publisher/src/kinds/Kind30066.js b/packages/publisher/src/kinds/Kind30066.js index 99a1e1ca..4fd5dceb 100644 --- a/packages/publisher/src/kinds/Kind30066.js +++ b/packages/publisher/src/kinds/Kind30066.js @@ -135,19 +135,16 @@ export class Kind30066 extends PublisherNocap { if(val instanceof Number || isFloat(val)){ tags.push( [ 'geo', prop, val.toString() ] ) } - } - - + } } - if(isSsl){ const sslIgnore = ['valid', 'days_remaining'] - data.ssl.data = transformSslResult(data.ssl.data) - for(const prop in data.ssl.data){ + const sslData = transformSslResult(data.ssl.data) + for(const prop in sslData){ if(sslIgnore.includes(prop)) continue - const val = data.ssl.data[prop] + const val = sslData[prop] if(val instanceof Array){ tags.push( [ 'ssl', prop, ...val ] ) @@ -202,7 +199,17 @@ const transformGeoResult = geo => { const transformSslResult = ssl => { const map = { - "publisher.kinds": "kinds", + "modulus": "modulus", + "subjectaltname": "subject_alt_name", + "exponent": "exponent", + "valid_from": "valid_from", + "valid_to": "valid_to", + "fingerprint": "fingerprint", + "fingerprint256": "fingerprint256", + "fingerprint512": "fingerprint512", + "ext_key_usage": "ext_key_usage", + "serialNumber": "serial_number", + "pemEncoded": "pem_encoded" } return mapper(ssl, map) } diff --git a/packages/utils/index.js b/packages/utils/index.js index d18e2019..9a9e4028 100644 --- a/packages/utils/index.js +++ b/packages/utils/index.js @@ -63,3 +63,21 @@ export const msToCronTime = (milliseconds) => { export const capitalize = (str) => { return str.charAt(0).toUpperCase()+str.slice(1); } + + +export const isObject = (item) => { + return (item && typeof item === 'object' && !Array.isArray(item)); +} + +export const deepCopy = (obj) => { + if (isObject(obj)) { + const copy = {}; + Object.keys(obj).forEach((key) => { + copy[key] = deepCopy(obj[key]); + }); + return copy; + } else if (Array.isArray(obj)) { + return obj.map((item) => deepCopy(item)); + } + return obj; +} \ No newline at end of file diff --git a/packages/utils/package.json b/packages/utils/package.json index 1129071a..88c473cc 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -1,6 +1,6 @@ { "name": "@nostrwatch/utils", - "version": "0.0.3", + "version": "0.1.0", "type": "module", "main": "index.js", "license": "MIT",