Skip to content

Commit

Permalink
Merge pull request #975 from permaweb/jfrain99/optimize-delete-old-crons
Browse files Browse the repository at this point in the history
chore(mu): optimize delete old traces
  • Loading branch information
jfrain99 authored Aug 25, 2024
2 parents 8047a6d + d6b3b44 commit 5b184d0
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 21 deletions.
6 changes: 4 additions & 2 deletions servers/mu/src/domain/clients/cron.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export function updateCronProcessCursorWith ({ db }) {
}
}

export function deleteOldTracesWith ({ db }) {
export function deleteOldTracesWith ({ db, logger }) {
return async () => {
function createQuery ({ overflow }) {
/**
Expand Down Expand Up @@ -123,8 +123,10 @@ export function deleteOldTracesWith ({ db }) {
const maxBytes = 1024 * 1024 * 1024
const overflow = maxBytes / totalBytes
if (overflow >= 1) return
logger(({ log: `Deleting old traces, overflow of ${1 - overflow}` }))
await db.run(createQuery({ overflow: 1 - overflow }))
db.run({ sql: 'VACUUM;', parameters: [] })
await db.run({ sql: 'VACUUM;', parameters: [] })
logger({ log: 'Deleted old traces.' })
}
}

Expand Down
7 changes: 7 additions & 0 deletions servers/mu/src/domain/clients/sqlite.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ const createTracesIndexes = async (db) => db.prepare(
(messageId, processId);`
).run()

const createTracesTimestampIndexes = async (db) => db.prepare(
`CREATE INDEX IF NOT EXISTS idx_${TRACES_TABLE}_timestamp
ON ${TRACES_TABLE}
(timestamp ASC);`
).run()

let internalSqliteDb
export async function createSqliteClient ({ url, bootstrap = false, walLimit = bytes.parse('100mb'), type = 'tasks' }) {
if (internalSqliteDb) return internalSqliteDb
Expand All @@ -71,6 +77,7 @@ export async function createSqliteClient ({ url, bootstrap = false, walLimit = b
await Promise.resolve()
.then(() => createTraces(db))
.then(() => createTracesIndexes(db))
.then(() => createTracesTimestampIndexes(db))
}
}

Expand Down
22 changes: 21 additions & 1 deletion servers/mu/src/domain/clients/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { cond, equals, propOr, tap } from 'ramda'
import cron from 'node-cron'

import { createTaskQueue, enqueueWith, dequeueWith, removeDequeuedTasksWith } from './taskQueue.js'
import { deleteOldTracesWith } from './cron.js'
import { domainConfigSchema, config } from '../../config.js'
// Without this import the worker crashes
import { createResultApis } from '../../domain/index.js'
Expand Down Expand Up @@ -293,6 +294,24 @@ ct = cron.schedule('*/2 * * * * *', async () => {
}
})

let traceCt = null
let traceIsJobRunning = false
const traceDb = await createSqliteClient({ url: workerData.TRACE_DB_URL, bootstrap: false, type: 'traces' })
const deleteOldTraces = deleteOldTracesWith({ db: traceDb, logger: broadcastLogger })
/**
* Create cron to clear out traces, each hour
*/
function startDeleteTraceCron () {
traceCt = cron.schedule('0 * * * *', async () => {
if (!traceIsJobRunning) {
traceIsJobRunning = true
traceCt.stop()
await deleteOldTraces()
traceCt.start()
traceIsJobRunning = false
}
})
}
/**
* Start the processing of results from
* the queue and expose the worker api
Expand All @@ -301,5 +320,6 @@ ct = cron.schedule('*/2 * * * * *', async () => {
processResults()

worker({
enqueueResults
enqueueResults,
startDeleteTraceCron
})
21 changes: 9 additions & 12 deletions servers/mu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import gatewayClient from './clients/gateway.js'
import * as InMemoryClient from './clients/in-memory.js'
import * as MetricsClient from './clients/metrics.js'
import * as SqliteClient from './clients/sqlite.js'
import cronClient, { deleteCronProcessWith, deleteOldTracesWith, getCronProcessCursorWith, saveCronProcessWith, updateCronProcessCursorWith } from './clients/cron.js'
import cronClient, { deleteCronProcessWith, getCronProcessCursorWith, saveCronProcessWith, updateCronProcessCursorWith } from './clients/cron.js'
import { readTracesWith } from './clients/tracer.js'

import { processMsgWith } from './api/processMsg.js'
Expand Down Expand Up @@ -151,6 +151,7 @@ export const createApis = async (ctx) => {
id: workerId,
queueId,
DB_URL,
TRACE_DB_URL,
TASK_QUEUE_MAX_RETRIES: ctx.TASK_QUEUE_MAX_RETRIES,
TASK_QUEUE_RETRY_DELAY: ctx.TASK_QUEUE_RETRY_DELAY
}
Expand Down Expand Up @@ -201,21 +202,18 @@ export const createApis = async (ctx) => {
logger: sendAssignLogger
})

// /**
// * Create cron to clear out traces, each hour
// */
workerPool.exec('startDeleteTraceCron')

const monitorProcessLogger = logger.child('monitorProcess')
const fetchCron = fromPromise(cuClient.fetchCronWith({ fetch, histogram, CU_URL, logger: monitorProcessLogger }))

const saveCronProcess = saveCronProcessWith({ db })
const deleteCronProcess = deleteCronProcessWith({ db })
const updateCronProcessCursor = updateCronProcessCursorWith({ db })
const getCronProcessCursor = getCronProcessCursorWith({ db })
const deleteOldTraces = deleteOldTracesWith({ db: traceDb })

/**
* Create cron to clear out traces, each hour
*/
cron.schedule('* * * * *', async () => {
await deleteOldTraces()
})

async function getCronProcesses () {
function createQuery () {
Expand All @@ -238,6 +236,7 @@ export const createApis = async (ctx) => {

const startProcessMonitor = cronClient.startMonitoredProcessWith({
fetch,
cron,
histogram,
logger: monitorProcessLogger,
PROC_FILE_PATH,
Expand Down Expand Up @@ -281,9 +280,7 @@ export const createApis = async (ctx) => {
traceMsgs,
initCronProcs: cronClient.initCronProcsWith({
startMonitoredProcess: startProcessMonitor,
getCronProcesses,
deleteOldTraces,
cron
getCronProcesses
})
}
}
Expand Down
11 changes: 5 additions & 6 deletions servers/mu/src/domain/lib/handle-worker-metrics-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,14 @@ export function handleWorkerMetricsMessage ({ retriesGauge, stageGauge, maximumQ
return ({ payload }) => {
if (payload.purpose === 'queue-size') {
queueMessageHandler(payload, maximumQueueArray, maximumQueueTimeArray)
}
if (payload.purpose === 'task-retries') {
} else if (payload.purpose === 'task-retries') {
retriesMessageHandler(payload, retriesGauge)
}
if (payload.purpose === 'error-stage') {
} else if (payload.purpose === 'error-stage') {
errorStageMessageHandler(payload, stageGauge)
}
if (payload.purpose === 'log') {
} else if (payload.purpose === 'log') {
logMessageHandler(payload, logger)
} else if (!payload.purpose) {
logger(payload)
}
}
}

0 comments on commit 5b184d0

Please sign in to comment.