Skip to content

Commit

Permalink
Merge pull request #669 from sandwichfarm/refactor/nocapd
Browse files Browse the repository at this point in the history
nocapd refactor
  • Loading branch information
dskvr authored Mar 15, 2024
2 parents 37e18c0 + 120ffa1 commit da5086f
Show file tree
Hide file tree
Showing 24 changed files with 299 additions and 579 deletions.
16 changes: 1 addition & 15 deletions apps/nocapd/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,21 +1,7 @@
# Use an official Node.js runtime as a parent image, based on Alpine
FROM node:20-alpine

RUN mkdir /app

# Set the working directory in the container
WORKDIR /app

# Copy the current directory contents into the container at /usr/src/app
COPY . .

# RUN echo "registry=http://host.docker.internal:4873" > .npmrc

# Install any needed packages specified in package.json
RUN yarn install --network-timeout 1000000

# Make port 3000 available to the world outside this container
# EXPOSE 3000

# Run the app when the container launches
RUN yarn install
CMD yarn launch
8 changes: 4 additions & 4 deletions apps/nocapd/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
"license": "MIT",
"dependencies": {
"@nostr-fetch/adapter-nostr-tools": "0.14.1",
"@nostrwatch/announce": "^0.0.5",
"@nostrwatch/announce": "^0.0.6",
"@nostrwatch/controlflow": "^0.0.2",
"@nostrwatch/logger": "^0.0.3",
"@nostrwatch/nocap": "^0.2.3",
"@nostrwatch/logger": "^0.0.4",
"@nostrwatch/nocap": "^0.2.4",
"@nostrwatch/nwcache": "^0.0.2",
"@nostrwatch/publisher": "^0.3.1",
"@nostrwatch/publisher": "^0.3.2",
"@nostrwatch/seed": "^0.0.1",
"@nostrwatch/utils": "^0.0.1",
"chalk": "5.3.0",
Expand Down
106 changes: 30 additions & 76 deletions apps/nocapd/src/classes/NocapdQueues.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,25 @@
export class NocapdQueues {
constructor(config){
/** @type {object} */
// this.checks = {}
/** @type {BullQueue} */
this.queue = null
/** @type {BullQueueEvents} */
this.events = null
/** @type {BullWorker} */
this.worker = null
// /** @type {WorkerManager} */
this.checks = null
/** @type {Scheduler} */
this.scheduler = null
/** @type {object} */
this.cb = {}
/** @type {object} */
this.pubkey = config?.pubkey? config.pubkey: null

/** @type {array} */
this.worker_events = ['completed', 'failed', 'progress', 'stalled', 'waiting', 'active', 'delayed', 'drained', 'paused', 'resumed']
import Logger from '@nostrwatch/logger'

export class NocapdQueues {
constructor(opts){
this.setup(opts)
if(!this.pubkey)
throw new Error(`NocapdQueues requires a pubkey`)
}

route(job){
const { name } = job
const daemonManager = name.split('@')[0]
const daemonPubkey = name.split('@')[1]

// if(daemonPubkey !== this.pubkey)
// console.warn(`[route] ${daemonPubkey} !== ${this.pubkey}`)

if(!this.checks[daemonManager])
throw new Error(`No manager found for ${daemonManager}`)

return this.checks[daemonManager].work(job)
setup(opts){
this.pubkey = opts?.pubkey? opts.pubkey: null
this.log = this.opts?.logger? this.opts.logger: new Logger('nocap/$NocapdQueues')
this.cb = {}
this.queue = null
this.events = null
this.worker = null
this.checker = null
this.worker_events = ['completed', 'failed', 'progress', 'stalled', 'waiting', 'active', 'delayed', 'drained', 'paused', 'resumed']
}

route_event(event, ...args){
// this.log.info(`route_event(): ${event}, ${args || "no event args"}`)
const job = args[0]
let name = null

Expand All @@ -49,37 +30,26 @@ export class NocapdQueues {
name = job.split(':')[0]

if(name) {
const daemonManager = name.split('@')[0]
const daemonPubkey = name.split('@')[1]

// if(daemonPubkey !== this.pubkey)
// return this.log.warn(`[route_event] ${daemonPubkey} !== ${this.pubkey}`)

if(!this.checks[daemonManager])
return
// this.log.warn(`No manager found for ${daemonManager} to handle ${event} event for pubkey: ${daemonPubkey}`)

return this.checks[daemonManager].cbcall(event, ...args)
return this.checker.cbcall(event, ...args)
}
//these events apply to the worker not the manager an d don't have any parameters,
//so cannot be routed like completions and failures.
else if( event === 'drained' ) {
for( const manager in this.checks ) {
this.checks[manager].cbcall(event)
}
this.checker.cbcall(event)
}
else {
this.cbcall(event, ...args)
}
}

setWorker($worker){
this.worker = $worker
this.bind_events()
set(key, fn){
// this.log.info(`set('${key}'): with ${typeof fn}`)
this[key] = fn
if(key === 'worker') this.bind_events()
return this
}

bind_events(){
this.worker_events.forEach(handler => {
//this.log.debug(`bind_events(): binding ${handler} to ${this.worker.name}`)
this.worker.on(handler, (...args) => this.route_event(handler, ...args))
})
}
Expand All @@ -96,35 +66,19 @@ export class NocapdQueues {
this.cb[handler](...args)
}

async populateAll(){
const mkeys = Object.keys(this.checks)
for await ( const mkey of mkeys ){
// this.log.debug(`populateAll() -> ${mkey}:populator()`)
await this.checks[mkey].populator()
}
}

pause(q){
if(q)
return this.queue?.[q].pause()
Object.keys(this.queue).forEach(q => this.queue[q].pause())
async pause(){
return await this.queue.pause()
}

start(q){
if(q)
return this.queue?.[q].start()
Object.keys(this.queue).forEach(q => this.queue[q].start())
async resume(){
return await this.queue.resume()
}

drain(q){
if(q)
return this.queue?.[q].drain()
Object.keys(this.queue).forEach(q => this.queue[q].drain())
async drain(){
return await this.queue.drain()
}

obliterate(q){
if(q)
return this.queue?.[q].obliterate()
Object.keys(this.queue).forEach(q => this.queue[q].obliterate())
async obliterate(){
return await this.queue.obliterate()
}
}
Loading

0 comments on commit da5086f

Please sign in to comment.