Skip to content

Commit

Permalink
fix uncaught exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Jul 22, 2024
1 parent dbe1e5f commit 827989d
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 125 deletions.
2 changes: 1 addition & 1 deletion apps/nocapd/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"timestring": "^7.0.0"
},
"scripts": {
"launch": "node src/index.js",
"launch": "NODE_DEBUG=bull node src/index.js",
"trace": "node --trace-warnings src/index.js",
"debug": "DEBUG=* node --trace-warnings src/index.js",
"docker:build@debug": "docker build -t nostrwatch/nocapd:latest -f Dockerfile.debug ."
Expand Down
17 changes: 7 additions & 10 deletions apps/nocapd/src/classes/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class NWWorker {
this.expires = this.checkOpts?.expires? timestring(this.checkOpts.expires, 'ms'): 60*60*1000
this.interval = this.checkOpts?.interval? timestring(this.checkOpts.interval, 'ms'): 60*1000
this.networks = this.opts?.networks? this.opts.networks: ['clearnet']
this.log = this.config?.logger? this.config.logger: new Logger('nocap/$NWWorker')
this.log = this.config?.logger? this.config.logger: new Logger('nocap/NWWorker')
}

setupInstances(){
Expand All @@ -91,10 +91,7 @@ export class NWWorker {
this.log.debug(`populator()`)
await this.$.worker.pause()
const relays = await this.getRelays()
// if(relays.length > 0)
await this.addRelayJobs(relays)
// else
// this.setTimeout( this.populator.bind(this), this.interval)
return async () => await this.$.worker.resume()
}

Expand All @@ -115,13 +112,13 @@ export class NWWorker {
}

async on_error(job, err){
// if(this.hard_stop) return
if(this.hard_stop) return
this.log.debug(`on_error(): ${job.id}: ${err}`)
await this.on_fail( job )
}

async on_completed(job, rvalue){
// if(this.hard_stop) return
if(this.hard_stop) return
this.log.debug(`on_completed(): ${job.id}: ${JSON.stringify(rvalue)}`)
const { result } = rvalue
if(!result?.url) return console.error(`url was empty:`, job.id)
Expand All @@ -136,15 +133,15 @@ export class NWWorker {
}

async on_success(result){
// if(this.hard_stop) return
if(this.hard_stop) return
this.log.debug(`on_success(): ${result.url}`)
if(this.config?.publisher?.kinds?.includes(30066) ){
const publish30066 = new Publish.Kind30066()
await publish30066.one( result )
await publish30066.one( result ).catch(this.log.error)
}
if(this.config?.publisher?.kinds?.includes(30166) ){
const publish30166 = new Publish.Kind30166()
await publish30166.one( result )
const publish30166 = new Publish.Kind30166()
await publish30166.one( result ).catch(this.log.error)
}
}

Expand Down
50 changes: 18 additions & 32 deletions apps/nocapd/src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const setIntervals = () => {
// intervalPopulate = setInterval( checkQueue, timestring( config?.nocapd?.checks?.options?.interval, "ms" ))
}


const initWorker = async () => {
const connection = RedisConnectionDetails()
log.info(`initWorker(): connecting to redis at`, connection)
Expand All @@ -61,11 +62,8 @@ const initWorker = async () => {
.set( 'checker', new NWWorker(PUBKEY, $q, rcache, {...config, logger: new Logger('@nostrwatch/nocapd:worker'), pubkey: PUBKEY }) )
.set( 'worker' , new BullMQ.Worker($q.queue.name, $q.route_work.bind($q), { concurrency, connection, ...queueOpts() } ) )

await $q.queue.drain()
// await $q.queue.obliterate()
await $q.checker.drainSmart()
setIntervals()
// $q.events.on('drained', populateQueue)
await populateQueue()
$q.resume()
log.info(`initialized: ${$q.queue.name}`)
Expand All @@ -75,41 +73,18 @@ const initWorker = async () => {
const stop = async(signal) => {
log.info(`Received ${signal}`);
log.info(`Gracefully shutting down...`)
$q.worker.hard_stop = true
// $q.worker.hard_stop = true
log.info(`shutdown progress: schedule.gracefulShutdown()`)
schedule.gracefulShutdown()
log.info(`shutdown progress: $q.worker.pause()`)
$q.worker.pause()
log.info(`shutdown progress: $q.queue.pause()`)
$q.queue.pause()
await rcache.$.close()
rcache.$.close()
log.info(`shutdown progress: $q.queue.drain()`)
await $q.queue.drain()
log.info(`shutdown progress: checking active jobs`)
// const {active:numActive} = await $q.queue.getJobCounts('active')
// if(numActive > 0) {
// log.info(`shutdown progress: ${numActive} active jobs`)
// await new Promise( resolve => {
// const intVal = setInterval(async () => {
// const {active:numActive} = await $q.queue.getJobCounts('active')
// if(numActive === 0) {
// clearInterval(intVal)
// resolve()
// }
// }, 1000)
// })
// log.info(`shutdown progress: no more jobs`)
// }
$q.queue.drain()
log.info(`shutdown progress: $q.queue.obliterate()`)
await $q.queue.obliterate()
// if(signal !== 'EAI_AGAIN'){

// }
// else {

// }
log.debug(`shutdown progress: await rcache.$.close()`)
await rcache.$.close()
$q.queue.obliterate()
log.debug(`shutdown progress: complete!`)
}

Expand All @@ -133,7 +108,11 @@ const maybeAnnounce = async () => {
await announce.publish( conf.relays )
}

const scheduleSeconds = (name, intervalMs, cb) => {
const scheduleSeconds = async (name, intervalMs, cb) => {
const active = await $q.queue.getActive();
const jobIds = active.map(job => job.id);
console.log('Active Job IDs:', jobIds);

log.info(`${name}: scheduling to fire every ${timestring(intervalMs, "s")} seconds`)
const rule = new schedule.RecurrenceRule();
const _interval = timestring(intervalMs, "s")
Expand Down Expand Up @@ -171,7 +150,6 @@ const syncRelaysIn = async () => {
log.debug(`syncRelaysIn(): Persisting ${relays.length} relays`, relays)
const persisted = await rcache.relay.batch.insertIfNotExists(relays).catch(console.error)
log.debug(`syncRelaysIn(): Persisted ${persisted} new relays`)
console.log('fucking relays', await this.rcache.relay.get.all().length)
if(persisted.length === 0) return 0
log.info(chalk.yellow.bold(`Persisted ${persisted.length} new relays`))
return persisted
Expand Down Expand Up @@ -221,6 +199,8 @@ async function gracefulShutdown(signal) {
process.exit(9);
}



export const Nocapd = async () => {
config = await loadConfig().catch( (err) => { log.err(err); process.exit() } )
await delay(2000)
Expand All @@ -230,6 +210,12 @@ export const Nocapd = async () => {
// await maybeAnnounce()
await maybeBootstrap()
$q = await initWorker()
// setInterval( async () => {
// const active = await $q.queue.getActive();
// const jobIds = active.map(job => job.id);
// console.log('Active Job IDs:', jobIds);
// }, 5000)

globalHandlers()
return {
$q,
Expand Down
4 changes: 1 addition & 3 deletions internal/publisher/src/Publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ const config = await loadConfig()

import fs from 'fs/promises';

const log = new Logger('publisher')

async function writeObjectToFile(obj) {

let filename = obj.tags.find( tag => tag[0] === 'd' )[1].replace('wss://', '').replace('ws://', '').replace('http://', '').replace('https://', '').replace('/', '')
Expand Down Expand Up @@ -85,7 +83,7 @@ export class Publisher {
return event
} catch(e) {
this.logger.err(`signEvent(): Error: ${e}`)
console.log(event)
// console.log(event)
}
}

Expand Down
53 changes: 21 additions & 32 deletions internal/redis/bullboard.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,27 @@
const { createBullBoard } = require('@bull-board/api');
const { BullMQAdapter } = require('@bull-board/api/bullMQAdapter');
const { FastifyAdapter } = require('@bull-board/fastify');
const { Queue: QueueMQ, Worker } = require('bullmq');
const fastify = require('fastify');
import express from 'express';
import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

const sleep = (t) => new Promise((resolve) => setTimeout(resolve, t * 1000));
import { NoCapdQueue } from '@nostrwatch/controlflow'

const redisOptions = {
port: 6379,
host: 'localhost',
password: '',
tls: false,
};
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/');

const queueMQ = new QueueMQ();
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullAdapter(someQueue), new BullAdapter(someOtherQueue), new BullMQAdapter(queueMQ)],
serverAdapter: serverAdapter,
});

const serverAdapter = new FastifyAdapter()
const app = express();

createBullBoard({
queues: [new BullMQAdapter(queueMQ)],
serverAdapter,
options: {
uiConfig: {
boardTitle: 'My BOARD',
boardLogo: {
path: 'https://cdn.my-domain.com/logo.png',
width: '100px',
height: 200,
},
miscLinks: [{text: 'Logout', url: '/logout'}],
favIcon: {
default: 'static/images/logo.svg',
alternative: 'static/favicon-32x32.png',
},
},
},
app.use('/', serverAdapter.getRouter());

// other configurations of your server

app.listen(3000, () => {
console.log('Running on 3000...');
console.log('For the UI, open http://localhost:3000/admin/queues');
console.log('Make sure Redis is running on port 6379 by default');
});
10 changes: 6 additions & 4 deletions libraries/nocap/adapters/default/DnsAdapterDefault/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import fetch from 'cross-fetch'

const IPV4 = /^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$/;
const IPV6 = /(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))/;


class DnsAdapterDefault {
constructor(parent){
this.$ = parent
Expand Down Expand Up @@ -30,13 +34,11 @@ class DnsAdapterDefault {
}

const getIpv4 = (jsonData) => {
const ipv4Regex = /^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$/;
return jsonData.Answer.filter(answer => ipv4Regex.test(answer.data)).map(answer => answer.data) || null;
return jsonData.Answer?.filter(answer => IPV4.test(answer.data)).map(answer => answer.data) || null;
}

const getIpv6 = (jsonData) => {
const ipv6Regex = /(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))/;
return jsonData.Answer.filter(answer => ipv6Regex.test(answer.data)).map(answer => answer.data) || null;
return jsonData.Answer?.filter(answer => IPV6.test(answer.data)).map(answer => answer.data) || null;
}

export default DnsAdapterDefault
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nostrwatch/nocap-dns-adapter-default",
"version": "1.0.2",
"version": "1.0.3",
"type": "module",
"main": "index.js",
"license": "MIT",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{
"name": "@nostrwatch/nocap-every-adapter-default",
"version": "1.3.1",
"version": "1.3.2",
"type": "module",
"main": "index.js",
"license": "MIT",
"dependencies": {
"@nostrwatch/nocap-dns-adapter-default": "^1.0.2",
"@nostrwatch/nocap-geo-adapter-default": "^1.0.3",
"@nostrwatch/nocap-dns-adapter-default": "^1.0.3",
"@nostrwatch/nocap-geo-adapter-default": "^1.0.4",
"@nostrwatch/nocap-info-adapter-default": "^1.0.1",
"@nostrwatch/nocap-ssl-adapter-default": "^1.0.2",
"@nostrwatch/nocap-websocket-adapter-default": "^1.3.1",
Expand Down
20 changes: 17 additions & 3 deletions libraries/nocap/adapters/default/GeoAdapterDefault/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { fetch } from 'cross-fetch'

const IPV4 = /\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b/g;


class GeoAdapterDefault {
constructor(parent){
this.$ = parent
Expand All @@ -19,8 +22,19 @@ import { fetch } from 'cross-fetch'

async check_geo(){
let endpoint
const iparr = this.$.results.get('dns')?.data.ipv4
const ip = iparr[iparr.length-1]
let ip
const result = { status: "success", data: {} }
if(IPV4.test(this.$.url)) {
ip = this.$.url.match(IPV4)[0];
}
else if(!this.$.results.get('dns')?.data?.ipv4?.length) {
return this.$.finish('geo', result)
}
else {
const iparr = this.$.results.get('dns')?.data?.ipv4
// console.log(iparr)
ip = iparr[iparr.length-1]
}
const apiKey = this.getApiKey();
//todo, enable override via options
const fields = 'proxy,mobile,timezone,continent,continentCode,country,countryCode,region,regionName,city,district,zip,lat,lon,isp,as,asname,query'
Expand All @@ -34,7 +48,7 @@ import { fetch } from 'cross-fetch'
const response = await fetch(endpoint, { headers }).catch(this.$.logger.warn)
delete response.query
delete response.status
const result = { status: "success", data: await response.json() }
result.data = await response.json()
this.$.finish('geo', result)
}
}
Expand Down
2 changes: 1 addition & 1 deletion libraries/nocap/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
},
"dependencies": {
"@nostrwatch/logger": "0.0.6",
"@nostrwatch/nocap-every-adapter-default": "^1.3.1",
"@nostrwatch/nocap-every-adapter-default": "^1.3.2",
"fetch-h2": "3.0.2",
"get-ssl-certificate": "2.3.3",
"jest": "29.7.0",
Expand Down
5 changes: 3 additions & 2 deletions libraries/nocap/src/classes/Base.js
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ export default class Base {
.then(async () => {
this.logger.debug(`${key}: precheck resolved`)
this.latency.start(key)
await this.adapters[adapter][`check_${key}`]()
this.logger.debug(`${key}: this.adapters[${adapter}][check_${key}]()`)
await this.adapters[adapter][`check_${key}`]().catch( (e) => this.logger.err(`${key}: ${e.message}`) )
})
.catch((precheck) => {
let reason
Expand Down Expand Up @@ -517,7 +518,7 @@ export default class Base {
'close',
() => this.ws.close()
)
return this.terminate(key)
this.terminate(key)
}

/**
Expand Down
Loading

0 comments on commit 827989d

Please sign in to comment.