Skip to content

Commit

Permalink
Update internal dependencies with actual versions
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Oct 4, 2024
1 parent ad3c4c6 commit 3711c6e
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 52 deletions.
8 changes: 8 additions & 0 deletions .changeset/bright-moles-worry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@nostrwatch/nocap-every-adapter-default': minor
'@nostrwatch/nocap-info-adapter-default': minor
'@nostrwatch/nocap-geo-adapter-default': minor
'@nostrwatch/nocapd': minor
---

fixed hanging job
2 changes: 1 addition & 1 deletion apps/nocapd/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
},
"scripts": {
"launch": "node src/index.js",
"trace": "node --trace-warnings src/index.js",
"trace": "node --trace-warnings --trace-uncaught src/index.js",
"debug": "DEBUG=* node --trace-warnings src/index.js",
"docker:build@debug": "docker build -t nostrwatch/nocapd:latest -f Dockerfile.debug ."
},
Expand Down
8 changes: 4 additions & 4 deletions apps/nocapd/src/classes/NocapdQueues.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@ export class NocapdQueues {
}

async pause(){
await this.queue.pause()
await this.queue.pause().catch(console.error)
return this
}

async resume(){
await this.queue.resume()
await this.queue.resume().catch(console.error)
return this
}

async drain(){
await this.queue.drain()
await this.queue.drain().catch(console.error)
return this
}

async obliterate(){
await this.queue.obliterate()
await this.queue.obliterate().catch(console.error)
return this
}
}
65 changes: 37 additions & 28 deletions apps/nocapd/src/classes/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,59 +98,68 @@ export class NWWorker {

async populator(){
this.log.debug(`populator()`)
const relays = await this.getRelays()
await this.addRelayJobs(relays)
const relays = await this.getRelays().catch(console.error)
await this.addRelayJobs(relays).catch(console.error)
}

async syncQueue(){
await this.cleanCompletedJobs()
await this.populateActivePendingJobs()
await this.cleanCompletedJobs().catch(console.error)
await this.populateActivePendingJobs().catch(console.error)
}

async populateActivePendingJobs(){
let jobs = [...(await this.$.queue.getJobs(['active']))]
jobs = [...jobs, ...(await this.$.queue.getJobs(['waiting']))]
let jobs = [...(await this.$.queue.getJobs(['active']).catch(console.error) )]
jobs = [...jobs, ...(await this.$.queue.getJobs(['waiting']).catch(console.error) )]

jobs.forEach( job => {
this.jobs[job.id] = job
})
}

async getActiveJobs(){
return await this.$.queue.getJobs(['active'])
return await this.$.queue.getJobs(['active']).catch(console.error)
}

async getCompletedJobs(){
return await this.$.queue.getJobs(['completed'])
return await this.$.queue.getJobs(['completed']).catch(console.error)
}

async cleanCompletedJobs() {
const jobs = await this.getCompletedJobs()
const jobs = await this.getCompletedJobs().catch(console.error)
jobs.forEach( job => {
job.remove()
})
}

async work(job){
let timeout;
this.log.debug(`${this.id()}: work(): ${job.id} checking ${job.data?.relay} for ${this.opts?.checks?.enabled || "unknown checks"}`)
const failure = (err) => { this.log.err(`Could not run ${this.pubkey} check for ${job.data.relay}: ${err.message}`) }
const failure = (err) => { this.log.error(`Could not run ${this.pubkey} check for ${job.data.relay}: ${err.message}`) }
let result = {}
try {
const timeout = setTimeout( //needed to prevent hanging jobs
() => { throw new Error(`Job Timeout: ${job.id} after ${TIMEOUT/1000}s`) },
let nocap
timeout = setTimeout(//needed to prevent hanging jobs
() => {
const message = `Job Timeout: ${job.id} after ${TIMEOUT/1000}s`
console.log(message)
throw Error(message)
},
TIMEOUT
)
);
const { relay:url } = job.data
const nocap = new Nocap(url, {...this.nocapOpts, logLevel: 'debug'})
await nocap.useAdapters([...Object.values(nocapAdapters)])
nocap = new Nocap(url, {...this.nocapOpts, logLevel: 'debug'})
await nocap.useAdapters([...Object.values(nocapAdapters)]).catch(failure)
result = await nocap.check(this.opts.checks.enabled).catch(failure)
clearTimeout(timeout) //don't forget to clear!
return { result }
}
catch(err) {
this.log.err(`Could not run ${this.pubkey} check for ${job.data.relay}: ${err.message}`)
this.log.error(`Could not run ${this.pubkey} check for ${job.data.relay}: ${err.message}`)
return { result: { url: job.data.relay, open: { data: false }} }
}
finally {
clearTimeout(timeout)
}
}

async on_failed(job, err){
Expand All @@ -171,13 +180,13 @@ export class NWWorker {
this.progressMessage(result.url, result, fail)
delete this.jobs[job.id]
if(fail) {
await this.on_fail( result )
await this.on_fail( result ).catch(console.error)
}
else {
result = await relayHostnameDedup( result, this.rcache ).catch(console.error)
await this.on_success( result )
await this.on_success( result ).catch(console.error)
}
await this.after_completed( result )
await this.after_completed( result ).catch(console.error)
}

async on_success(result){
Expand All @@ -197,7 +206,7 @@ export class NWWorker {
// const id = await publish30166.one( result, process.env.DAEMON_PRIVKEY ).catch(this.log.error.bind(this.log))
k30166.generateEvent( result )
k30166.signEvent( process.env.DAEMON_PRIVKEY )
const id = await this.publisher.publishEvent( k30166.json() )
const id = await this.publisher.publishEvent( k30166.json() ).catch(console.error)
log.debug(`on_success(): ${result.url} published${result?.parent? ' child of '+result.parent: ''}: ${id}`)
}

Expand Down Expand Up @@ -253,7 +262,7 @@ export class NWWorker {
}

async resetProgressCounts(){
const c = await this.counts()
const c = await this.counts().catch(console.error)
this.total = c.prioritized + c.active
this.processed = 1
this.log.debug(`total jobs: ${this.total}`)
Expand All @@ -270,7 +279,7 @@ export class NWWorker {
}
const $relay = this.rcache.relay.get.one(url)
const online = $relay?.online === true
const expired = await this.isExpired(url, timestring(job.timestamp, "ms"))
const expired = await this.isExpired(url, timestring(job.timestamp, "ms")).catch(console.error)
if(!expired && online) return
this.log.debug(`drainSmart(): removing expired job: ${url}: online? ${online}, expired? ${this.isExpired(url, timestring(job.timestamp, "ms"))}`)
expiredJobs.push(
Expand All @@ -286,20 +295,20 @@ export class NWWorker {

async addRelayJobs(relays){
this.log.debug(`addRelayJobs(): for ${relays.length} relays`)
await this.drainSmart()
await this.drainSmart().catch(console.error)
for(const relay of relays){
let job = this.jobs?.[this.jobId(relay)]
if(job) {
await job.remove()
.then( () => this.log.debug(`job removed: ${this.jobId(relay)}`))
.catch( e => this.log.debug(`Could not remove job: ${relay}: Error:`, e))
}
job = await this.addRelayJob({ relay })
job = await this.addRelayJob({ relay }).catch(console.error)
this.jobs[job.id] = job
}
const jobs = Object.values(this.jobs)
if(jobs.length === 0) return
await Promise.allSettled(jobs)
await Promise.allSettled(jobs).catch(console.error)
}

async addRelayJob(job){
Expand Down Expand Up @@ -363,7 +372,7 @@ export class NWWorker {
progress += `${duration/1000} seconds `
}
if(error) {
const retries = await this.retry.getRetries(url)
const retries = this.retry.getRetries(url)
progress += `${error? chalk.gray.italic('error'): ''} `
progress += `[${retries !== null? retries: 0} retries]`
}
Expand All @@ -375,7 +384,7 @@ export class NWWorker {
}

async counts(){
const counts = await this.$.queue.getJobCounts()
const counts = await this.$.queue.getJobCounts().catch(console.error)
this.log.info(chalk.magenta.bold(`=== [queue stats] active: ${counts.active} - completed: ${ counts.completed } - failed: ${counts.failed} - prioritized: ${counts.prioritized} - delayed: ${counts.delayed} - waiting: ${counts.waiting} - paused: ${counts.paused} - total: ${counts.completed} / ${counts.active} + ${counts.waiting + counts.prioritized} ===`))
this.show_cache_counts()
return counts
Expand Down Expand Up @@ -602,7 +611,7 @@ const evaluateMaxRelays = (evaluate, relays) => {
return parseInt( eval( evaluate ) )
}
catch(e){
this.log.err(`Error evaluating this.opts.checks.options.max -> "${this?.opts?.checks?.options?.max} || "is undefined"": ${e?.message || "error undefined"}`)
this.log.error(`Error evaluating this.opts.checks.options.max -> "${this?.opts?.checks?.options?.max} || "is undefined"": ${e?.message || "error undefined"}`)
}
}

Expand Down
15 changes: 5 additions & 10 deletions apps/nocapd/src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,13 @@ const globalHandlers = () => {
process.on(signal, async () => await gracefulShutdown(signal));
});

process.on('uncaughtException', async (error) => {
log.error('Uncaught Exception:', error);
process.on('uncaughtException', async (err) => {
log.error('!! Uncaught Exception:', err);
// log.error('Uncaught Exception:', err.stack);
});

process.on('unhandledRejection', async (reason, promise) => {
log.error('Unhandled Rejection:', promise.catch(console.error));
log.error('!! Unhandled Rejection:', promise.catch(console.error));
});

$q.worker.on('error', async (err) => {
Expand All @@ -295,7 +296,7 @@ async function gracefulShutdown(signal) {

export const Nocapd = async () => {
log.info('Starting Nocapd...')
config = await loadConfig().catch( (err) => { log.err(err); process.exit() } )
config = await loadConfig().catch( (err) => { log.err(err); process.exit(9) } )
log.info('Loaded config')
const lmdbOpts = config?.lmdb ?? {}
concurrency = config?.nocapd?.bullmq?.worker?.concurrency? config.nocapd.bullmq.worker.concurrency: 1
Expand All @@ -305,13 +306,7 @@ export const Nocapd = async () => {
await migrate(rcache)
log.info('ran migrations...')

// await maybeAnnounce()
// log.info('announced...')

await populateRelays( true )

// if(await maybeBootstrap())
// log.info('Bootstrapped')

initBus()
await initQueue()
Expand Down
3 changes: 1 addition & 2 deletions internal/controlflow/src/retry.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import relaycache from '@nostrwatch/nwcache'
import { capitalize, loadConfig } from "@nostrwatch/utils"
import { capitalize } from "@nostrwatch/utils"

export class RetryManager {

Expand Down
4 changes: 3 additions & 1 deletion libraries/nocap/adapters/default/GeoAdapterDefault/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const IPV4 = /\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01
async getGeoData(ip){
const API_KEY = this.getApiKey();
const FIELDS = 'proxy,mobile,timezone,continent,continentCode,country,countryCode,region,regionName,city,district,zip,lat,lon,isp,as,asname,query'
const headers = { accept: 'application/json' }
const signal = this.$.controller.signal

let response;
let endpoint;
Expand All @@ -32,7 +34,7 @@ const IPV4 = /\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01
else
endpoint = `http://ip-api.com/json/${ip}?fields=${FIELDS}`

response = await fetch(endpoint, { 'accept': 'application/json' }).catch(this.$.logger.error)
response = await fetch(endpoint, { headers, signal }).catch(this.$.logger.error)

delete response.query
delete response.status
Expand Down
26 changes: 21 additions & 5 deletions libraries/nocap/adapters/default/InfoAdapterDefault/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ import fetch from 'cross-fetch'
// import Ajv from 'ajv'

class InfoAdapterDefault {
uses = ['AbortController']

constructor(parent){
this.$ = parent

// this.ajv = new Ajv()
}

async check_info(){
let result, data = {}
const controller = new AbortController(),
{ signal } = controller,
url = new URL(this.$.url),
const url = new URL(this.$.url),
headers = {"Accept": "application/nostr+json"},
method = 'GET'

Expand All @@ -30,8 +31,23 @@ class InfoAdapterDefault {

try
{
const response = await fetch(url.toString(), { method, headers, signal })
data = await response.json()
await fetch(url.toString(), { method, headers, signal: this.$.controller.signal })
.then(async (response) => {
if(!response.ok)
{
this.$.logger.debug(`check_info(): fetch error: ${e.message}`)
result = { status: "error", message: e.message, data }
}
else {
this.$.logger.debug(`check_info(): response status: ${response}`)
data = await response.json()
}
return response
})
.catch((e) => {
this.$.logger.debug(`check_info(): fetch error: ${e.message}`)
result = { status: "error", message: e.message, data }
})
}

catch(e)
Expand Down
26 changes: 26 additions & 0 deletions libraries/nocap/src/classes/Base.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ export default class Base {
config = {}
limits = {}

controller = new AbortController()
signal = this.controller.signal

constructor(url, config={}) {
this._url = new URL(url)
this.url = this._url.toString()
Expand Down Expand Up @@ -244,6 +247,9 @@ export default class Base {
const message = `${key}: check timed out (after ${this.config.timeout[key]}ms}`
this.logger.debug(message)
const data = this.isWebsocketKey(key)? false: {}
if(this.doesAdapterUse(key, 'AbortController')){
this.controller.abort()
}
if(key === 'open' && this.config.rejectOnConnectFailure){
return reject({ data, duration: -1, status: "error", message})
}
Expand Down Expand Up @@ -546,6 +552,11 @@ export default class Base {
)
}

abort(){
this.controller.abort();
this.terminate();
}

/**
* on
* Adds a callback function to the Check
Expand Down Expand Up @@ -1138,6 +1149,21 @@ export default class Base {
return type
}

/**
* doesAdapterUse
* Helper that checks an adapter's "uses" property for a given key.
*
* @private
* @param {string} checkKey - The key of the adpater to check
* @param {string} useKey - The key to check if the adapter uses
* @returns {boolean} - True if the adapter uses the key, false otherwise
*/
doesAdapterUse(checkKey, useKey){
const adapter = this.routeAdapter(checkKey)
const $adapter = this.adapters[adapter]
return $adapter?.uses?.includes(useKey)
}

/**
* set
* Sets a value for a given key in the instance
Expand Down
Loading

0 comments on commit 3711c6e

Please sign in to comment.