Skip to content

Commit

Permalink
Merge pull request #676 from sandwichfarm/next-kit
Browse files Browse the repository at this point in the history
fix nocap, fix nocapd, reorg, add kit
  • Loading branch information
dskvr authored Jul 22, 2024
2 parents a965177 + 4bdb9a4 commit dd9c759
Show file tree
Hide file tree
Showing 366 changed files with 37,204 additions and 255 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.turbo
**/node_modules
*.wasm
**/dist
Expand All @@ -19,4 +20,5 @@ packages/kinds
.ansible
.envs
.configs
.docker
.docker
.sveltekit
5 changes: 3 additions & 2 deletions apps/nocapd/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
"@nostrwatch/controlflow": "^0.1.0",
"@nostrwatch/logger": "^0.0.4",
"@nostrwatch/nocap": "^0.4.2",
"@nostrwatch/nocap-every-adapter-default": "1.2.2",
"@nostrwatch/nwcache": "^0.1.2",
"@nostrwatch/publisher": "^0.4.1",
"@nostrwatch/publisher": "^0.4.3",
"@nostrwatch/seed": "^0.0.2",
"@nostrwatch/utils": "^0.0.3",
"@nostrwatch/utils": "0.1.2",
"bluebird": "3.7.2",
"chalk": "5.3.0",
"ngeohash": "^0.6.3",
Expand Down
21 changes: 17 additions & 4 deletions apps/nocapd/src/classes/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import chalk from 'chalk';

import { RetryManager } from '@nostrwatch/controlflow'
import Logger from '@nostrwatch/logger'
import { Nocap } from '@nostrwatch/nocap'

import { parseRelayNetwork, delay, lastCheckedId } from '@nostrwatch/utils'
import Publish from '@nostrwatch/publisher'

import { Nocap } from "@nostrwatch/nocap"
import nocapAdapters from "@nostrwatch/nocap-every-adapter-default"


export class NWWorker {

constructor(pubkey, $q, rcache, config){
Expand Down Expand Up @@ -91,11 +95,13 @@ export class NWWorker {

async work(job){
this.log.debug(`${this.id()}: work(): ${job.id} checking ${job.data?.relay} for ${this.opts?.checks?.enabled || "unknown checks"}`)
const failure = (err) => { this.log.debug(`Could not run ${this.pubkey} check for ${job.data.relay}: ${err.message}`) }
const failure = (err) => { this.log.err(`Could not run ${this.pubkey} check for ${job.data.relay}: ${err.message}`) }
try {
const { relay:url } = job.data
const nocap = new Nocap(url, this.nocapOpts)
await nocap.useAdapters(Object.values(nocapAdapters))
const result = await nocap.check(this.opts.checks.enabled).catch(failure)
// console.log(url, result)
return { result }
}
catch(err) {
Expand All @@ -105,13 +111,16 @@ export class NWWorker {
}

async on_error(job, err){
if(this.hard_stop) return
this.log.debug(`on_error(): ${job.id}: ${err}`)
await this.on_fail( result )
await this.on_fail( job )
}

async on_completed(job, rvalue){
if(this.hard_stop) return
this.log.debug(`on_completed(): ${job.id}: ${JSON.stringify(rvalue)}`)
const { result } = rvalue
if(!result?.url) return console.error(`url was empty:`, job.id)
let fail = result?.open?.data? false: true
this.progressMessage(result.url, result, fail)
if(fail)
Expand All @@ -122,6 +131,7 @@ export class NWWorker {
}

async on_success(result){
if(this.hard_stop) return
this.log.debug(`on_success(): ${result.url}`)
if(this.config?.publisher?.kinds?.includes(30066) ){
const publish30066 = new Publish.Kind30066()
Expand All @@ -134,17 +144,20 @@ export class NWWorker {
}

async on_fail(result){
if(this.hard_stop) return
this.log.debug(`on_fail(): ${result.url}`)
}

async after_completed(result, error=false){
if(this.hard_stop) return
this.log.debug(`after_completed(): ${result.url}`)
await this.updateRelayCache( { ...result } )
await this.retry.setRetries( result.url, !error )
await this.setLastChecked( result.url, Date.now() )
}

cbcall(...args){
if(this.hard_stop) return
this.log.debug(`cbcall(): ${JSON.stringify(args)}`)
const handler = [].shift.call(args)
if(this?.[`on_${handler}`] && typeof this[`on_${handler}`] === 'function')
Expand Down Expand Up @@ -460,6 +473,6 @@ const evaluateMaxRelays = (evaluate, relays) => {
return parseInt( eval( evaluate ) )
}
catch(e){
this.log.error(`Error evaluating this.opts.checks.options.max -> "${this?.opts?.checks?.options?.max} || "is undefined"": ${e?.message || "error undefined"}`)
this.log.err(`Error evaluating this.opts.checks.options.max -> "${this?.opts?.checks?.options?.max} || "is undefined"": ${e?.message || "error undefined"}`)
}
}
37 changes: 28 additions & 9 deletions apps/nocapd/src/daemon.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "websocket-polyfill";

import schedule from 'node-schedule'
import Deferred from 'promise-deferred'

Expand Down Expand Up @@ -73,12 +75,31 @@ const stop = async(signal) => {
log.info(`Received ${signal}`);
log.info(`Gracefully shutting down...`)
$q.worker.hard_stop = true
if(signal !== 'EAI_AGAIN'){
log.debug(`shutdown progress: $q.worker.pause()`)
await $q.worker.pause()
log.debug(`shutdown progress: $q.queue.drain()`)
await $q.queue.drain()
log.info(`shutdown progress: schedule.gracefulShutdown()`)
schedule.gracefulShutdown()
log.info(`shutdown progress: $q.worker.pause()`)
$q.worker.pause()
log.info(`shutdown progress: $q.queue.pause()`)
$q.queue.pause()
log.info(`shutdown progress: $q.queue.drain()`)
await $q.queue.drain()
log.info(`shutdown progress: checking active jobs`)
const {active:numActive} = await $q.queue.getJobCounts('active')
if(numActive > 0) {
log.info(`shutdown progress: ${numActive} active jobs`)
await new Promise( resolve => {
$q.queue.on('drained', resolve)
})
log.info(`shutdown progress: no more jobs`)
}
log.info(`shutdown progress: $q.queue.obliterate()`)
await $q.queue.obliterate()
// if(signal !== 'EAI_AGAIN'){

// }
// else {

// }
log.debug(`shutdown progress: await rcache.$.close()`)
await rcache.$.close()
log.debug(`shutdown progress: complete!`)
Expand Down Expand Up @@ -124,7 +145,7 @@ const schedulePopulator = () =>{
}

const scheduleSyncRelays = () =>{
const name = "syncRelaysIn()"
const name = "scheduleSyncRelays()"
if(!config?.nocapd?.seed?.options?.events) return
const intervalMs = config.nocapd.seed.options.events.interval
log.info(`syncRelaysIn(): scheduling to fire every ${timestring(intervalMs, "s")} seconds`)
Expand Down Expand Up @@ -190,14 +211,12 @@ async function gracefulShutdown(signal) {
}

export const Nocapd = async () => {


config = await loadConfig().catch( (err) => { log.err(err); process.exit() } )
await delay(2000)
rcache = relaycache(process.env.NWCACHE_PATH || './.lmdb')
await migrate(rcache)
await delay(1000)
await maybeAnnounce()
// await maybeAnnounce()
await maybeBootstrap()
$q = await initWorker()
globalHandlers()
Expand Down
Loading

0 comments on commit dd9c759

Please sign in to comment.