Skip to content

Commit

Permalink
fix referencing bug
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Mar 16, 2024
1 parent 6b15508 commit 2866449
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 99 deletions.
7 changes: 5 additions & 2 deletions apps/nocapd/.dockerignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
node_modules
docker-compose.yaml
yarn.lock
yarn.lock
.*
config.yaml
docker-compose.yaml
README.md
7 changes: 4 additions & 3 deletions apps/nocapd/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
"dependencies": {
"@nostr-fetch/adapter-nostr-tools": "0.14.1",
"@nostrwatch/announce": "^0.1.0",
"@nostrwatch/controlflow": "^0.0.3",
"@nostrwatch/controlflow": "^0.1.0",
"@nostrwatch/logger": "^0.0.4",
"@nostrwatch/nocap": "^0.4.2",
"@nostrwatch/nwcache": "^0.0.2",
"@nostrwatch/publisher": "^0.3.2",
"@nostrwatch/nwcache": "^0.1.0",
"@nostrwatch/publisher": "^0.4.0",
"@nostrwatch/seed": "^0.0.2",
"@nostrwatch/utils": "^0.0.3",
"bluebird": "3.7.2",
Expand All @@ -20,6 +20,7 @@
"nostr-fetch": "0.14.1",
"nostr-geotags": "^0.5.0",
"object-mapper": "6.2.0",
"promise-deferred": "2.0.4",
"timestring": "^7.0.0"
},
"scripts": {
Expand Down
16 changes: 11 additions & 5 deletions apps/nocapd/src/classes/NocapdQueues.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class NocapdQueues {

setup(opts){
this.pubkey = opts?.pubkey? opts.pubkey: null
this.log = this.opts?.logger? this.opts.logger: new Logger('nocap/$NocapdQueues')
this.log = this.opts?.logger? this.opts.logger: new Logger('nocap/queue-manager')
this.cb = {}
this.queue = null
this.events = null
Expand Down Expand Up @@ -64,6 +64,7 @@ export class NocapdQueues {

on(event, handler){
this.cb[event] = handler.bind(this)
return this
}

cbcall(...args){
Expand All @@ -75,18 +76,23 @@ export class NocapdQueues {
}

async pause(){
return await this.queue.pause()
await this.queue.pause()
return this
}

async resume(){
return await this.queue.resume()
await this.queue.resume()
return this
}

async drain(){
return await this.queue.drain()
await this.queue.drain()
return this

}

async obliterate(){
return await this.queue.obliterate()
await this.queue.obliterate()
return this
}
}
98 changes: 62 additions & 36 deletions apps/nocapd/src/classes/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export class NWWorker {
this.setupConfig()
this.setupNocapOpts()
this.setupJobOpts()
this.setupInstances()
}

setupDefaultValues(){
Expand All @@ -33,11 +34,12 @@ export class NWWorker {
this.relayMeta = new Map()
this.jobOpts = {}
this.nocapOpts = {}
this.hard_stop = false
}

setupConfig(){
this.opts = this.config.nocapd
this.retry = new RetryManager(`nocapd/${this.pubkey}`, this.opts?.retry)

this.checks = this.opts?.checks?.enabled.includes('all')? Nocap.checksSupported(): this.opts?.checks?.enabled
this.checkOpts = this.opts?.checks?.options || {}
this.timeout = this.setTimeout(this.checkOpts?.timeout)
Expand All @@ -46,6 +48,11 @@ export class NWWorker {
this.interval = this.checkOpts?.interval? timestring(this.checkOpts.interval, 'ms'): 60*1000
this.networks = this.opts?.networks? this.opts.networks: ['clearnet']
this.log = this.config?.logger? this.config.logger: new Logger('nocap/$NWWorker')

}

setupInstances(){
this.retry = new RetryManager(`nocapd/${this.pubkey}`, this.opts?.retry, this.rcache)
}

setupNocapOpts(){
Expand All @@ -70,14 +77,16 @@ export class NWWorker {
}

async populator(){
this.log.info(`${this.id()}:_populator(): populating queue`)
const relays = await this.getRelays()
this.log.info(relays.length)
this.log.info(`populator()`)
await this.$.worker.pause()
this.log.debug(`populator(): worker paused`)
const relays = await this.getRelays()
this.log.debug(`populator(): adding ${relays.length} relays to queue`)
await this.addRelayJobs(relays)
this.log.debug(`${this.id()}:_populator(): Added ${relays?.length} to queue`)
delay(1000)
this.log.debug(`populator(): added ${relays.length} relays to queue`)
await this.$.worker.resume()
this.log.debug(`populator(): worker resumed`)

}

async work(job){
Expand All @@ -101,44 +110,45 @@ export class NWWorker {
const { result } = rvalue
let fail = result?.open?.data? false: true
if(fail)
await this.on_fail(result)
await this.on_fail( result )
else
await this.on_success(result)
await this.on_success( result )
await this.after_completed( result, fail )
}

async on_success(result){
this.log.debug(`on_success(): ${result.url}`)
this.progressMessage(result.url, result)
async publish_30066(result){
if(this.config?.publisher?.kinds?.includes(30066) ){
const publish30066 = new Publish.Kind30066()
await publish30066.one( result )
}
}

async publish_30166(result){
if(this.config?.publisher?.kinds?.includes(30166) ){
const publish30166 = new Publish.Kind30166()
await publish30166.one( result )
}
}

async on_success(result){
this.log.debug(`on_success(): ${result.url}`)
await this.publish_30066(result)
await this.publish_30166(result)
}

async on_fail(result){
this.log.debug(`on_fail(): ${result.url}`)
this.progressMessage(result.url, null, true)
}

async after_completed(result, error=false){
this.log.debug(`after_completed(): ${result.url}`)
this.processed++
await this.updateRelayCache( { ...result } )
await delay(200)
await delay(100)
await this.retry.setRetries( result.url, !error )
await delay(200)
await delay(100)
await this.setLastChecked( result.url, Date.now() )
}

async on_drained(){
//this.log.debug(`on_drained()`)
this.total = 0
this.processed = 0
this.progressMessage(result.url, result, error)
}

cbcall(...args){
Expand All @@ -151,12 +161,19 @@ export class NWWorker {
}

async addRelayJobs(relays){
this.log.debug(`Relay Counts: ${JSON.stringify(await this.counts())}`)
this.log.debug(`addRelayJobs(): for ${relays.length} relays`)
const promises = []
for await ( const relay of relays ){
const $job = await this.addRelayJob({ relay })
if($job?.id)
this.total++
promises.push(this.addRelayJob({ relay }))
}
await Promise.all(promises)
}

async resetProgressCounts(){
const c = await this.counts()
this.total = c.prioritized
this.processed = 0
this.log.debug(`total jobs: ${this.total}`)
}

async addRelayJob(jdata){
Expand All @@ -180,7 +197,7 @@ export class NWWorker {
const mute = chalk.gray
let progress = ''
progress += `[${chalk.bgBlack(this.calculateProgress())}] `
progress += `${mute(this.processed)}/${mute(this.total)} `
progress += `${mute(this.processed++)}/${mute(this.total)} `
progress += `${url}: `
if(this.checks.includes('open'))
progress += `${result?.open?.data === true? success("online"): failure("offline")} `
Expand All @@ -197,9 +214,14 @@ export class NWWorker {
if(this.checks.includes('info'))
progress += `${Object.keys(result?.info?.data || {}).length? success("info"): failure("info")} `

progress += `${(result?.open?.duration+result?.read?.duration+result?.write?.duration)/1000} seconds `
progress += `${error? chalk.gray.italic('error'): ''} `
progress += `[${error? await this.retry.getRetries(url) + " retries": ""}]`
if(!error){
progress += `${(result?.open?.duration+result?.read?.duration+result?.write?.duration)/1000} seconds `
}

if(error) {
progress += `${error? chalk.gray.italic('error'): ''} `
progress += `[${await this.retry.getRetries(url)} retries}]`
}

this.log.info(progress)
}
Expand All @@ -216,7 +238,6 @@ export class NWWorker {

hasChanged(data1, data2){
const changed = hash(data1) !== hash(data2)
//this.log.debug(`hasChanged: ${changed}`)
return changed
}

Expand Down Expand Up @@ -290,29 +311,31 @@ export class NWWorker {
const promises = new Array()
let record = new Object()

record.url = url
if(result?.open?.data) record.online = result.open.data

for( const key of ['info', 'dns', 'geo', 'ssl'] ){
const resultHasKey = result?.[key]?.data && Object.keys(result[key].data)?.length > 0
if(resultHasKey){
const persist_result = async (resolve, reject) => {
const _record = { url: url, relay_id, updated_at: Date.now(), hash: hash(result[key].data), data: result[key].data }
const _record = { url: url, relay_id, updated_at: Date.now(), hash: hash(result[key].data), data: JSON.stringify(result[key].data) }
const _check_id = await this.rcache.check[key].insert(_record)
if(!_check_id)
reject()
record = {...record, ...{ [key]: _check_id }}
record[key] = _check_id
resolve()
}
promises.push( new Promise( persist_result ) )
}
}
await Promise.allSettled(promises)
record.url = url
if(result?.open?.data) record.online = result.open.data
await delay(100)
const $id = await this.rcache.relay.patch(record)
return $id
}

async getRelays() {
this.log.debug(`getRelays()`)
const allRelays = await this.rcache.relay.get.all();
const onlineRelays = [];
const uncheckedRelays = [];
Expand All @@ -321,8 +344,13 @@ export class NWWorker {
this.relayMeta = new Map()

for (const relay of allRelays) {
this.log.debug(`getRelays() relay: ${relay.url}`)

this.log.debug(`getRelays() relay: ${relay.url}: lastChecked()`)
const lastChecked = await this.rcache.cachetime.get.one(this.cacheId(relay.url));
this.log.debug(`getRelays() relay: ${relay.url}: retries()`)
const retries = await this.retry.getRetries(relay.url);
this.log.debug(`getRelays() relay: ${relay.url}: isExpired()`)
const isExpired = await this.isExpired(relay.url, lastChecked);
const isOnline = relay?.online === true;

Expand All @@ -337,7 +365,6 @@ export class NWWorker {
expiredRelays.push({ url: relay.url, lastChecked, retries });
group = 'expired';
}

this.relayMeta.set(relay.url, { group, retries: retries > 0 ? retries : undefined });
}

Expand All @@ -355,7 +382,6 @@ export class NWWorker {

return relaysFiltered.slice(0, truncateLength);
}


get_truncate_length(relays){
let length = relays.length
Expand All @@ -374,7 +400,7 @@ export class NWWorker {
async isExpired(url, lastChecked) {
let retries = await this.retry.getRetries(url);
retries = retries === null? 0: retries
const expiry = retries > 0 ? await this.retry.getExpiry(url) : this.expires;
const expiry = retries > 0 ? this.retry.getExpiry(url) : this.expires;
return lastChecked < Date.now() - expiry;
}

Expand Down
Loading

0 comments on commit 2866449

Please sign in to comment.