forked from ibp-network/ibp-monitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkers.js
174 lines (157 loc) · 7.84 KB
/
workers.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import
// dotenv.config()
import { config } from './config/config.js'
import { config as configLocal } from './config/config.local.js'
// Effective configuration: the defaults from config.js overlaid with config.local.js.
// NOTE: Object.assign copies configLocal's own properties INTO `config` and returns
// that same object, so `cfg` and the imported `config` are one mutated object
// (a shallow merge: nested objects from configLocal replace, not merge with, the defaults).
// NOTE(review): other modules importing config.js may observe the merged values — confirm
// whether that is relied upon before changing this to a copy.
const cfg = Object.assign(config, configLocal)
import express from 'express'
import { Queue, Worker } from 'bullmq'
import * as pkg1 from '@bull-board/express'
const { ExpressAdapter } = pkg1
import * as pkg2 from '@bull-board/api'
const { createBullBoard } = pkg2
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter.js'
// import axios from 'axios'
import { asyncForeach } from './lib/utils.js'
import { checkService } from './workers/f-check-service.js'
// import { f_1kv_nominations_update } from './workers/1kv-nominations-update.js'
// import { f_1kv_nominators_update } from './workers/1kv-nominators-update.js'
// import { f_w3f_exposures_update } from './workers/w3f-exposures-update.js'
// import { f_w3f_nominators_update } from './workers/w3f-nominators-update.js'
// import { f_w3f_pools_update } from './workers/w3f-pools-update.js'
// import { f_w3f_validator_location_stats_update } from './workers/w3f-validator-location-stats-update.js'
// import { f_w3f_validators_update } from './workers/w3f-validators-update.js'
// import { f_w3f_nominations_update } from './workers/w3f-nominations-update.js'
// import { f_dock_auto_payout } from './functions/f_dock_auto_payout.js'
// Print the Redis settings taken from the merged configuration at startup.
// NOTE(review): if cfg.redis can carry credentials, this writes them to the log — confirm.
console.debug('cfg.redis', cfg.redis)

// Options object handed to every Queue and Worker created in this file.
const qOpts = {
  // Redis connection settings, e.g. { host: "localhost", port: 6379 }
  connection: cfg.redis,
}

// Names of the job types handled by this process. Each name listed here is
// expected to have a matching queue (`q_<name>`) and worker (`w_<name>`) below.
// Currently disabled job types, kept for reference:
//   health_check, 1kv_nominations_update, 1kv_nominators_update,
//   w3f_exposures_update, w3f_nominators_update, w3f_pools_update,
//   w3f_validator_location_stats_update, w3f_validators_update,
//   w3f_nominations_update, dock_auto_payout
const jobs = ['checkService']
/**
 * Log an error reported by a worker, prefixed with the job name.
 *
 * The message has the form `ERROR: <job>: <detail>`, where <detail> is the
 * value itself for strings, the stack (or name/message) for Error instances,
 * and the JSON serialisation for anything else.
 *
 * @param {string} job - name of the job type whose worker reported the error
 * @param {*} err - the error payload (string, Error, or arbitrary value)
 * @returns {Promise<void>}
 */
async function onError(job, err) {
  let detail
  if (typeof err === 'string') {
    detail = err
  } else if (err instanceof Error) {
    // JSON.stringify(new Error(...)) yields "{}", which hides the actual problem
    detail = err.stack ?? `${err.name}: ${err.message}`
  } else {
    try {
      detail = JSON.stringify(err)
    } catch {
      // cyclic structures / BigInt: a logging helper must not throw itself
      detail = String(err)
    }
  }
  // The prefix is interpolated separately: the previous form
  // `prefix + typeof err === 'string' ? ...` compared (prefix + typeof err)
  // with 'string', so the condition was always false and the prefix was lost.
  const errStr = `ERROR: ${job}: ${detail}`
  // await axios.get('http://192.168.1.2:1880/sendToTelegram?text='+ errStr)
  console.log(errStr)
}
/**
 * Log a failure reported by a worker, prefixed with the job name.
 *
 * The message has the form `FAILED: <job>: <detail>`, where <detail> is the
 * value itself for strings, the stack (or name/message) for Error instances,
 * and the JSON serialisation for anything else.
 *
 * @param {string} job - name of the job type whose worker reported the failure
 * @param {*} event - the payload forwarded from the worker's 'failed' listener
 * @returns {Promise<void>}
 */
async function onFailed(job, event) {
  let detail
  if (typeof event === 'string') {
    detail = event
  } else if (event instanceof Error) {
    // JSON.stringify(new Error(...)) yields "{}", which hides the actual problem
    detail = event.stack ?? `${event.name}: ${event.message}`
  } else {
    try {
      detail = JSON.stringify(event)
    } catch {
      // cyclic structures / BigInt: a logging helper must not throw itself
      detail = String(event)
    }
  }
  // The prefix is interpolated separately: the previous form
  // `prefix + typeof event === 'string' ? ...` compared (prefix + typeof event)
  // with 'string', so the condition was always false and the prefix was lost.
  const errStr = `FAILED: ${job}: ${detail}`
  // await axios.get('http://192.168.1.2:1880/sendToTelegram?text='+ errStr)
  console.log(errStr)
}
// One BullMQ queue per job type (disabled job types are kept for reference).
const q_checkService = new Queue('checkService', qOpts)
// const q_health_check = new Queue('health_check', qOpts)
// const q_1kv_nominators_update = new Queue('1kv_nominators_update', qOpts)
// const q_w3f_exposures_update = new Queue('w3f_exposures_update', qOpts)
// const q_w3f_nominators_update = new Queue('w3f_nominators_update', qOpts)
// const q_w3f_pools_update = new Queue('w3f_pools_update', qOpts)
// const q_w3f_validator_location_stats_update = new Queue('w3f_validator_location_stats_update', qOpts)
// const q_w3f_validators_update = new Queue('w3f_validators_update', qOpts)
// const q_w3f_nominations_update = new Queue('w3f_nominations_update', qOpts)
// const q_dock_auto_payout = new Queue('dock_auto_payout', qOpts)

// One BullMQ worker per job type, bound to the function that processes it.
const w_checkService = new Worker('checkService', checkService, qOpts)
// const w_health_check = new Worker('health_check', f_health_check, qOpts)
// const w_1kv_nominators_update = new Worker('1kv_nominators_update', f_1kv_nominators_update, qOpts)
// const w_w3f_exposures_update = new Worker('w3f_exposures_update', f_w3f_exposures_update, qOpts)
// const w_w3f_nominators_update = new Worker('w3f_nominators_update', f_w3f_nominators_update, qOpts)
// const w_w3f_pools_update = new Worker('w3f_pools_update', f_w3f_pools_update, qOpts)
// const w_w3f_validator_location_stats_update = new Worker('w3f_validator_location_stats_update', f_w3f_validator_location_stats_update, qOpts)
// const w_w3f_validators_update = new Worker('w3f_validators_update', f_w3f_validators_update, qOpts)
// const w_w3f_nominations_update = new Worker('w3f_nominations_update', f_w3f_nominations_update, qOpts)
// const w_dock_auto_payout = new Worker('dock_auto_payout', f_dock_auto_payout, qOpts)

// Explicit job-name -> instance registries. These replace the former
// eval(`q_${job}`) / eval(`w_${job}`) lookups: when a job type is enabled in
// `jobs`, add its queue and worker here as well.
const queuesByJob = new Map([['checkService', q_checkService]])
const workersByJob = new Map([['checkService', w_checkService]])

// handle all error/failed
jobs.forEach((job) => {
  const worker = workersByJob.get(job)
  if (!worker) {
    throw new Error(`No worker registered for job "${job}"`)
  }
  worker.on('error', (err) => onError(job, err))
  // Only the first argument of the 'failed' listener is forwarded.
  // NOTE(review): per the BullMQ docs that argument is the failed Job and the
  // Error comes second — confirm whether the error should be logged too.
  worker.on('failed', (event) => onFailed(job, event))
})

// const jobRetention = {
//   removeOnComplete: {
//     age: 24 * 60 * 60, // keep up to 24 hours (BullMQ expects seconds, not millis)
//     count: 1000, // keep up to 1000 jobs
//   },
//   removeOnFail: {
//     age: 48 * 60 * 60, // keep up to 48 hours (BullMQ expects seconds, not millis)
//   }
// }

/**
 * Pause and immediately resume the queue registered for a job type.
 *
 * Despite the name, no jobs are removed: the drain step is commented out and
 * nothing here obliterates the queue.
 *
 * @param {string} jobname - a job name from `jobs`
 * @returns {Promise<void>}
 * @throws {Error} when no queue is registered for `jobname`
 */
async function clearQueue(jobname) {
  const queue = queuesByJob.get(jobname)
  if (!queue) {
    throw new Error(`No queue registered for job "${jobname}"`)
  }
  await queue.pause()
  // // Removes all jobs that are waiting or delayed, but not active, completed or failed
  // await queue.drain()
  await queue.resume()
}
// Startup sequence: run clearQueue for every job type, then serve the
// bull-board UI under /admin/queues on port 3000.
;(async () => {
  // on startup, run clearQueue for each entry of `jobs`
  async function clearQueues() {
    await asyncForeach(jobs, clearQueue)
  }
  // jobs will be added by server.js
  // async function addJobs() {
  //   // asyncForEach(chains, async (CHAIN, idx, arr) => {
  //   //   const jOpts = { CHAIN }
  //   //   await q_health_check.add(`1kv_candidates_${CHAIN}`, jOpts,
  //   //     { repeat: { pattern: '00,30 * * * *' }, ...jobRetention })
  //   //   // await q_1kv_nominations_update.add(`1kv_nominations_${CHAIN}`, jOpts,
  //   //   //   { repeat: { pattern: '01,31 * * * *' }, ...jobRetention })
  //   //   // await q_1kv_nominators_update.add(`1kv_nominators_${CHAIN}`, jOpts,
  //   //   //   { repeat: { pattern: '02,32 * * * *' }, ...jobRetention })
  //   //   // await q_w3f_exposures_update.add(`w3f_exposures_${CHAIN}`, jOpts,
  //   //   //   { repeat: { pattern: '03,33 * * * *' }, ...jobRetention })
  //   //   // await q_w3f_nominators_update.add(`w3f_nominators_${CHAIN}`, jOpts,
  //   //   //   // once per hour
  //   //   //   { repeat: { pattern: '04 * * * *' }, ...jobRetention })
  //   //   // await q_w3f_pools_update.add(`w3f_pools_${CHAIN}`, jOpts,
  //   //   //   { repeat: { pattern: '05,35 * * * *' }, ...jobRetention })
  //   //   // await q_w3f_validator_location_stats_update.add(`w3f_validator_location_stats_${CHAIN}`, jOpts,
  //   //   //   { repeat: { pattern: '06,36 * * * *' }, ...jobRetention })
  //   //   // await q_w3f_validators_update.add(`w3f_validators_${CHAIN}`, jOpts,
  //   //   //   { repeat: { pattern: '07,37 * * * *' }, ...jobRetention })
  //   //   // await q_w3f_nominations_update.add(`w3f_nominations_${CHAIN}`, jOpts,
  //   //   //   { repeat: { pattern: '07,37 * * * *' }, ...jobRetention })
  //   // })
  // }
  await clearQueues()
  // await addJobs()

  // bull-board dashboard, mounted on the same path that the adapter is told about
  const serverAdapter = new ExpressAdapter()
  serverAdapter.setBasePath('/admin/queues')
  // const queueMQ = new QueueMQ()
  // The object returned by createBullBoard was only destructured into unused
  // locals before, so it is no longer captured.
  createBullBoard({
    queues: [
      // new BullMQAdapter(q_health_check, { readOnlyMode: false }),
      new BullMQAdapter(q_checkService, { readOnlyMode: false }),
      // new BullMQAdapter(q_1kv_nominators_update, { readOnlyMode: false }),
      // new BullMQAdapter(q_w3f_exposures_update, { readOnlyMode: false }),
      // new BullMQAdapter(q_w3f_nominators_update, { readOnlyMode: false }),
      // new BullMQAdapter(q_w3f_pools_update, { readOnlyMode: false }),
      // new BullMQAdapter(q_w3f_validator_location_stats_update, { readOnlyMode: false }),
      // new BullMQAdapter(q_w3f_validators_update, { readOnlyMode: false }),
      // new BullMQAdapter(q_w3f_nominations_update, { readOnlyMode: false }),
      // new BullMQAdapter(q_dock_auto_payout, { readOnlyMode: false }),
    ],
    serverAdapter: serverAdapter,
  })

  const app = express()
  app.use('/admin/queues', serverAdapter.getRouter())
  app.listen(3000, () => {
    console.log('Running on 3000...')
    console.log('For the UI, open http://localhost:3000/admin/queues')
    // console.log('Make sure Redis is running on port 6379 by default');
  })
})().catch((err) => {
  // Without a handler a startup failure surfaces only as an unhandled
  // rejection; report it explicitly and exit with a failure code.
  console.error('workers.js startup failed:', err)
  process.exit(1)
})