Skip to content

Commit

Permalink
update nocapd && publisher to nip66@draft7
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Jul 27, 2024
1 parent 11db0e4 commit b3bd654
Show file tree
Hide file tree
Showing 44 changed files with 683 additions and 1,000 deletions.
4 changes: 2 additions & 2 deletions apps/nocapd/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
"@nostrwatch/announce": "0.1.0",
"@nostrwatch/controlflow": "0.2.1",
"@nostrwatch/logger": "0.0.6",
"@nostrwatch/nocap": "0.5.2",
"@nostrwatch/nocap": "0.5.3",
"@nostrwatch/nocap-every-adapter-default": "1.4.0",
"@nostrwatch/nwcache": "0.1.2",
"@nostrwatch/publisher": "0.4.3",
"@nostrwatch/publisher": "0.5.1",
"@nostrwatch/seed": "0.0.2",
"@nostrwatch/utils": "0.1.3",
"bluebird": "3.7.2",
Expand Down
36 changes: 16 additions & 20 deletions apps/nocapd/src/classes/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,8 @@ export class NWWorker {
async on_success(result){
if(this.hard_stop) return
this.log.debug(`on_success(): ${result.url}`)
if(this.config?.publisher?.kinds?.includes(30066) ){
const publish30066 = new Publish.Kind30066()
await publish30066.one( result ).catch(this.log.error)
}
if(this.config?.publisher?.kinds?.includes(30166) ){
const publish30166 = new Publish.Kind30166()
await publish30166.one( result ).catch(this.log.error)
}
const publish30166 = new Publish.Kind30166()
await publish30166.one( result ).catch(this.log.error)
}

async on_fail(result){
Expand Down Expand Up @@ -269,10 +263,8 @@ export class NWWorker {

async counts(){
const counts = await this.$.queue.getJobCounts()
if(counts.prioritized + counts.active > 0) {
this.log.info(chalk.magenta.bold(`=== [queue stats] active: ${counts.active} - completed: ${ counts.completed } - failed: ${counts.failed} - prioritized: ${counts.prioritized} - delayed: ${counts.delayed} - waiting: ${counts.waiting} - paused: ${counts.paused} - total: ${counts.completed} / ${counts.active} + ${counts.waiting + counts.prioritized} ===`))
this.show_cache_counts()
}
this.log.info(chalk.magenta.bold(`=== [queue stats] active: ${counts.active} - completed: ${ counts.completed } - failed: ${counts.failed} - prioritized: ${counts.prioritized} - delayed: ${counts.delayed} - waiting: ${counts.waiting} - paused: ${counts.paused} - total: ${counts.completed} / ${counts.active} + ${counts.waiting + counts.prioritized} ===`))
this.show_cache_counts()
return counts
}

Expand Down Expand Up @@ -302,7 +294,9 @@ export class NWWorker {
}

getPriority(relay){
const {group, retries} = this.relayMeta.get(relay)
const relayMeta = this.relayMeta.get(relay)
if(!relayMeta) return this.priority
const {group, retries} = relayMeta
const format = i => Math.ceil(i)
if(group === 'online')
return format(this.priority/2)
Expand Down Expand Up @@ -431,13 +425,15 @@ export class NWWorker {
}

show_cache_counts(){
let cacheMessage = ''
cacheMessage += `=== [cache stats] online: ${this.cache_counts.online} - `
cacheMessage += `online & expired: ${this.cache_counts.onlineExpired} - `
cacheMessage += `expired: ${this.cache_counts.expired} - `
cacheMessage += `unchecked: ${this.cache_counts.unchecked} - `
cacheMessage += `total: ${this.cache_counts.allRelays} ===`
this.log.info(chalk.blue.bold(cacheMessage));
this.getRelays().then( () => {
let cacheMessage = ''
cacheMessage += `=== [cache stats] online: ${this.cache_counts.online} - `
cacheMessage += `online & expired: ${this.cache_counts.onlineExpired} - `
cacheMessage += `expired: ${this.cache_counts.expired} - `
cacheMessage += `unchecked: ${this.cache_counts.unchecked} - `
cacheMessage += `total: ${this.cache_counts.allRelays} ===`
this.log.info(chalk.blue.bold(cacheMessage));
})
}

qualifyNetwork(url){
Expand Down
17 changes: 9 additions & 8 deletions apps/nocapd/src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ const populateQueue = async () => {
const checkQueue = async () => {
const counts = await $q.checker.counts()
const enqueue = counts.prioritized + counts.active
if(enqueue > 0) return log.debug(`checkQueue(): ${$q.queue.name}: ${enqueue} events active`)
if(enqueue > 0) {
return log.debug(`checkQueue(): ${$q.queue.name}: ${enqueue} events active`)
}
populateQueue()
}

const setIntervals = () => {
// intervalSyncRelays = setInterval( syncRelaysIn, timestring(config?.nocapd?.seed?.options?.events?.interval, "ms") || timestring("1h", "ms"))
schedulePopulator()
scheduleSyncRelays()
// intervalPopulate = setInterval( checkQueue, timestring( config?.nocapd?.checks?.options?.interval, "ms" ))
}


Expand All @@ -59,7 +59,6 @@ const initWorker = async () => {
.set( 'worker' , new BullMQ.Worker($q.queue.name, $q.route_work.bind($q), { concurrency, connection, ...queueOpts() } ) )


$q.queue.on('drained', () => { log.info(`queue was fucking drained.`) })
await $q.checker.drainSmart()
setIntervals()
await populateQueue()
Expand All @@ -78,6 +77,7 @@ const stop = async(signal) => {
$q.worker.pause()
log.info(`shutdown progress: $q.queue.pause()`)
$q.queue.pause()
log.info(`shutdown progress: close lmdb`)
rcache.$.close()
log.info(`shutdown progress: $q.queue.drain()`)
$q.queue.drain()
Expand All @@ -103,13 +103,13 @@ const maybeAnnounce = async () => {
const announce = new AnnounceMonitor(conf)
announce.generate()
announce.sign( process.env.DAEMON_PRIVKEY )
await announce.publish( conf.relays )
await announce.publish( conf.relays ).catch(log.warn)
}

const scheduleSeconds = async (name, intervalMs, cb) => {
const active = await $q.queue.getActive();
const jobIds = active.map(job => job.id);
console.log('Active Job IDs:', jobIds);
// console.log('Active Job IDs:', jobIds);

log.info(`${name}: scheduling to fire every ${timestring(intervalMs, "s")} seconds`)
const rule = new schedule.RecurrenceRule();
Expand Down Expand Up @@ -192,7 +192,7 @@ const globalHandlers = () => {
}

async function gracefulShutdown(signal) {
console.log(`Received ${signal}`);
// console.log(`Received ${signal}`);
await stop(signal)
process.exit(9);
}
Expand All @@ -205,9 +205,10 @@ export const Nocapd = async () => {
rcache = relaycache(process.env.NWCACHE_PATH || './.lmdb')
await migrate(rcache)
await delay(1000)
// await maybeAnnounce()
await maybeAnnounce()
await maybeBootstrap()
$q = await initWorker()
$q.worker.on('drained', populateQueue)
// setInterval( async () => {
// const active = await $q.queue.getActive();
// const jobIds = active.map(job => job.id);
Expand Down
26 changes: 13 additions & 13 deletions apps/trawler/src/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import rcache from "./relaydb.js"
import config from "./config.js"
import { lastPublishedId } from "./utils.js"

const p30066 = new Publish.Kind30066()
const p30166 = new Publish.Kind30166()

const filterRelayProperties = (relay) => {
const relay_ = {}
Expand All @@ -20,13 +20,13 @@ const filterRelaysProperties = (relays) => {
return relays.map( filterRelayProperties )
}

const relayIsExpired = (relay) => {
const lastPublished = rcache.cachetime.get.one( lastPublishedId(relay.url) );
const expiry = eval(config?.trawler?.publisher?.expiry) || 4 * 60 * 60 * 10000;
if (!lastPublished) return true;
if (lastPublished < new Date() - expiry) return true;
return false;
}
// const relayIsExpired = (relay) => {
// const lastPublished = rcache.cachetime.get.one( lastPublishedId(relay.url) );
// const expiry = eval(config?.trawler?.publisher?.expiry) || 4 * 60 * 60 * 10000;
// if (!lastPublished) return true;
// if (lastPublished < new Date() - expiry) return true;
// return false;
// }

const updatePublishTimes = async (relays=[]) => {
for await ( const relay of relays ) {
Expand All @@ -37,15 +37,15 @@ const updatePublishTimes = async (relays=[]) => {
export const publishOne = async (relay) => {
relay = filterRelayProperties(relay)
if(!relay) throw new Error('publishOne(): relay must be defined')
await p30066.one(relay)
await p30166.one(relay)
}

export const publishMany = async (relays = []) => {
relays = filterRelaysProperties(relays)
const filteredRelays = relays.filter(relayIsExpired);
if (!filteredRelays.length) return;
await p30066.many(filteredRelays);
await updatePublishTimes(filteredRelays);
// const filteredRelays = relays.filter(relayIsExpired);
if (!relays.length) return;
await p30166.many(relays);
// await updatePublishTimes(relays)
}

export const publishAll = async () => {
Expand Down
44 changes: 32 additions & 12 deletions apps/trawler/src/trawler.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { SyncQueue, TrawlQueue } from "@nostrwatch/controlflow"

const logger = new Logger('trawler')

const ignore = ["wss://nostr.searx.is/"]

const { $Queue:$SyncQueue, $QueueEvents:$SyncEvents } = SyncQueue()
const { $Queue:$TrawlQueue, $QueueEvents:$TrawlEvents } = TrawlQueue()

Expand All @@ -29,10 +31,10 @@ let promises,

const addRelaysToCache = async (relayList) => {
const ids = []
relayList.forEach(async (relayObj) => {
for (const relayObj of relayList) {
ids.push(await rcache.relay.insertIfNotExists(relayObj))
})
return ids
}
return ids.filter(id => id !== undefined)
}

const noteInCache = async (ev, relay, lastEvent) => {
Expand Down Expand Up @@ -86,38 +88,54 @@ export const trawl = async function($job){
listCount = 0
$currentJob = $job

const relays = $job.data.relays
const relays = $job.data.relays.filter( relay => !ignore.includes(relay.url) )
const pool = new SimplePool();
const fetcher = NostrFetcher.withCustomPool(simplePoolAdapter(pool));
const kinds = [ 2, 10002 ]

relays.forEach( async (relay) => {

promises.push(new Promise( async (resolve) => {

let lastEvent = 0
let since = await determineSince(relay)
$job.updateProgress({ type: 'resuming', source: relay, since })
try {

const it = await fetcher.allEventsIterator(
[ relay ],
{ kinds: [ 2, 10002 ] },
{ kinds },
{ since },
{ sort: true }
)
for await (const ev of it) {
if(!kinds.includes(ev.kind)) {
continue
}
lastEvent = setLastEvent(ev, since, lastEvent)
if( await noteInCache(ev, relay, lastEvent) ) {
console.log('noteInCache() true')
continue
}
const relayList = await relaysFromRelayList(ev)

const cacheIds = addRelaysToCache(relayList)
if(relayList === false) continue
listCount++

const cacheIds = await addRelaysToCache(relayList)

for( const id of cacheIds ) {
relaysPersisted.add(rcache.relay.get.one(id)?.url)
}

deferPersist[ev.id] = async () => await rcache.note.set.one(ev)

if(cacheIds.length === 0) {
await deferPersist[ev.id]()
delete deferPersist[ev.id]
continue
}

console.log(`found ${cacheIds.length} new relays`)

const roundtrip = {
requestedBy: process.env.DAEMON_PUBKEY,
source: relay,
Expand All @@ -126,6 +144,7 @@ export const trawl = async function($job){
}
if(config?.trawler?.sync?.out?.events)
roundtrip.syncEventsCallback = syncEventsCallback

const data = jobData(relayList, roundtrip)
await sync.relays.out(data)
}
Expand All @@ -139,8 +158,10 @@ export const trawl = async function($job){
}))
})
await Promise.allSettled(Object.values(promises))
console.log(`relays persisted: ${[...relaysPersisted]}`)
return [...relaysPersisted]
if(relaysPersisted.size > 0 ){
console.log(`${relaysPersisted.size} relays persisted: ${[...relaysPersisted]}`)
}
return [...Array.from(relaysPersisted)]
}

const watchQueue = () => {
Expand Down Expand Up @@ -172,5 +193,4 @@ const finish = async (result, roundtrip) => {
}
if(deferPersist?.[eventId])
delete deferPersist[eventId]
}

}
24 changes: 15 additions & 9 deletions internal/announce/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,31 @@ export class AnnounceMonitor {

generate(): any {
const $monReg = new Kind10166()
this.events["10166"] = $monReg.generateEvent({...this.monReg})
$monReg.generateEvent({...this.monReg})
this.events["10166"] = $monReg

const $monRelays = new Kind10002()
if(this.monRelays.length)
this.events["10002"] = $monRelays.generateEvent([...this.monRelays])
if(this.monRelays.length) {
$monRelays.generateEvent([...this.monRelays])
this.events["10002"] = $monRelays
}

const $monProfile = new Kind0()
if(Object.keys(this.monProfile).length)
this.events["0"] = $monProfile.generateEvent({...this.monProfile})
if(Object.keys(this.monProfile).length) {
$monProfile.generateEvent({...this.monProfile})
this.events["0"] = $monProfile
}
return this.events
}

sign(sk: Uint8Array): any {
if(!this.events) throw new Error("Event has not yet been generated (run generate() first)")
const signed: Event[] = []
Object.keys(this.events).forEach( kind => {
this.events[kind] = finalizeEvent(this.events[kind], sk)
Object.values(this.events).forEach( publisher => {
console.log(publisher.signEvent())
this.events[publisher.kind] = publisher.signEvent()
})
return signed
console.log(this.events[publisher.kind])
return this.events[publisher.kind]
}

async publish( relays: string[] ): Promise<string[]> {
Expand Down
2 changes: 1 addition & 1 deletion internal/nwcache/mixins/cachetime.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export default class CacheTimeMixin {
return [...this.db.$.select(IDS).from( this.schema ).where( where )] || []
},
one: (key, select=null) => {
select = this.parseSelect(select)
// select = this.parseSelect(select)
if(!key.includes(this.id()))
key = this.id(key)
return this.db.$.get( key )?.v
Expand Down
2 changes: 1 addition & 1 deletion internal/nwcache/mixins/relay.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export default class RelayMixin {

async patch(RelayFieldsObj) {
this.validate(RelayFieldsObj)
const current = await this.db.$.get(relayId(RelayFieldsObj.url))
const current = this.db.$.get(relayId(RelayFieldsObj.url))
if(!current)
throw new Error(`Cannot patch because ${RelayFieldsObj.url} does not exist`)
RelayFieldsObj.url = new URL(RelayFieldsObj.url).toString()
Expand Down
7 changes: 2 additions & 5 deletions internal/nwcache/mixins/retry.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { operators } from "lmdb-oql";
import { Retry } from "../schemas.js"

const { $eq, $gte } = operators

export default class RetryMixin {
constructor(db) {
this.db = db;
Expand All @@ -29,13 +26,13 @@ export default class RetryMixin {

async increment(key, amt=1){
key = this.inferKey(key)
const current = await this.get(key)
const current = this.get(key)
return this.set(key, current? current + amt: amt)
}

async decrement(key, amt=1){
key = this.inferKey(key)
const current = await this.get(key)
const current = this.get(key)
return this.set(key, current? current - amt: -amt)
}
}
Loading

0 comments on commit b3bd654

Please sign in to comment.