From 61308ab48f1df0d12cdcec5cf4803ebf9efabfda Mon Sep 17 00:00:00 2001 From: mistakia <1823355+mistakia@users.noreply.github.com> Date: Fri, 22 Mar 2024 21:47:42 -0400 Subject: [PATCH] fix: update `archive-mysql` to not remove rows from `representatives_telemetry_index` in production server --- scripts/archive-mysql.mjs | 188 +++++++++++++++++++------------------- 1 file changed, 94 insertions(+), 94 deletions(-) diff --git a/scripts/archive-mysql.mjs b/scripts/archive-mysql.mjs index b68fa52a..5e3eae7c 100644 --- a/scripts/archive-mysql.mjs +++ b/scripts/archive-mysql.mjs @@ -213,97 +213,97 @@ const archive_representatives_telemetry = async ({ batch_size = 20000 }) => { } } -const archive_representatives_telemetry_index = async ({ - batch_size = 20000 -}) => { - logger('archiving representatives_telemetry_index') - const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss') - const hours = 6 * 7 * 24 // 6 weeks - - const stale_timestamps = await db.raw( - `SELECT timestamp, COUNT(*) as count FROM representatives_telemetry_index WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC` - ) - - let current_batch_size = 0 - let timestamps_for_batch = [] - for (const { timestamp, count } of stale_timestamps[0]) { - if (current_batch_size + count <= batch_size) { - timestamps_for_batch.push(timestamp) - current_batch_size += count - } else { - await processBatch(timestamps_for_batch) - timestamps_for_batch = [timestamp] - current_batch_size = count - } - } - if (timestamps_for_batch.length > 0) { - await processBatch(timestamps_for_batch) - } - - async function processBatch(timestamps) { - logger( - `processing batch of ${timestamps.length} timestamps. Newest: ${dayjs - .unix(timestamps[timestamps.length - 1]) - .format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs - .unix(timestamps[0]) - .format('YYYY-MM-DD HH:mm:ss')}` - ) - const rows = await db('representatives_telemetry_index') - .whereIn('timestamp', timestamps) - .select() - - if (!rows.length) { - logger('no rows to archive') - return - } - - await storage_mysql('representatives_telemetry_index') - .insert(rows) - .onConflict() - .merge() - logger(`copied ${rows.length} rows to storage_mysql`) - - logger(`archiving ${rows.length} representatives_telemetry_index entries`) - const filename = `representatives-telemetry-index-archive_${timestamp}_${dayjs().format( - 'YYYY-MM-DD_HH-mm-ss' - )}` - const csv_filename = `${filename}.csv` - const csv_writer = createCsvWriter({ - path: `${dir}/${csv_filename}`, - header: [ - { id: 'account', title: 'account' }, - { id: 'weight', title: 'weight' }, - { id: 'block_count', title: 'block_count' }, - { id: 'block_behind', title: 'block_behind' }, - { id: 'cemented_count', title: 'cemented_count' }, - { id: 'cemented_behind', title: 'cemented_behind' }, - { id: 'unchecked_count', title: 'unchecked_count' }, - { id: 'bandwidth_cap', title: 'bandwidth_cap' }, - { id: 'peer_count', title: 'peer_count' }, - { id: 'protocol_version', title: 'protocol_version' }, - { id: 'uptime', title: 'uptime' }, - { id: 'major_version', title: 'major_version' }, - { id: 'minor_version', title: 'minor_version' }, - { id: 'patch_version', title: 'patch_version' }, - { id: 'pre_release_version', title: 'pre_release_version' }, - { id: 'maker', title: 'maker' }, - { id: 'node_id', title: 'node_id' }, - { id: 'address', title: 'address' }, - { id: 'port', title: 'port' }, - { id: 'telemetry_timestamp', title: 'telemetry_timestamp' }, - { id: 'timestamp', title: 'timestamp' } - ] - }) - await csv_writer.writeRecords(rows) +// const archive_representatives_telemetry_index = async ({ +// batch_size = 20000 +// }) => { +// logger('archiving representatives_telemetry_index') +// const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss') +// const hours = 6 * 7 * 24 // 6 weeks + +// const stale_timestamps = await db.raw( +// `SELECT timestamp, COUNT(*) as count FROM representatives_telemetry_index WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC` +// ) - await upload(csv_filename) +// let current_batch_size = 0 +// let timestamps_for_batch = [] +// for (const { timestamp, count } of stale_timestamps[0]) { +// if (current_batch_size + count <= batch_size) { +// timestamps_for_batch.push(timestamp) +// current_batch_size += count +// } else { +// await processBatch(timestamps_for_batch) +// timestamps_for_batch = [timestamp] +// current_batch_size = count +// } +// } +// if (timestamps_for_batch.length > 0) { +// await processBatch(timestamps_for_batch) +// } - const count_deleted = await db('representatives_telemetry_index') - .whereIn('timestamp', timestamps) - .del() - logger(`removed ${count_deleted} rows from representatives_telemetry_index`) - } -} +// async function processBatch(timestamps) { +// logger( +// `processing batch of ${timestamps.length} timestamps. Newest: ${dayjs +// .unix(timestamps[timestamps.length - 1]) +// .format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs +// .unix(timestamps[0]) +// .format('YYYY-MM-DD HH:mm:ss')}` +// ) +// const rows = await db('representatives_telemetry_index') +// .whereIn('timestamp', timestamps) +// .select() + +// if (!rows.length) { +// logger('no rows to archive') +// return +// } + +// await storage_mysql('representatives_telemetry_index') +// .insert(rows) +// .onConflict() +// .merge() +// logger(`copied ${rows.length} rows to storage_mysql`) + +// logger(`archiving ${rows.length} representatives_telemetry_index entries`) +// const filename = `representatives-telemetry-index-archive_${timestamp}_${dayjs().format( +// 'YYYY-MM-DD_HH-mm-ss' +// )}` +// const csv_filename = `${filename}.csv` +// const csv_writer = createCsvWriter({ +// path: `${dir}/${csv_filename}`, +// header: [ +// { id: 'account', title: 'account' }, +// { id: 'weight', title: 'weight' }, +// { id: 'block_count', title: 'block_count' }, +// { id: 'block_behind', title: 'block_behind' }, +// { id: 'cemented_count', title: 'cemented_count' }, +// { id: 'cemented_behind', title: 'cemented_behind' }, +// { id: 'unchecked_count', title: 'unchecked_count' }, +// { id: 'bandwidth_cap', title: 'bandwidth_cap' }, +// { id: 'peer_count', title: 'peer_count' }, +// { id: 'protocol_version', title: 'protocol_version' }, +// { id: 'uptime', title: 'uptime' }, +// { id: 'major_version', title: 'major_version' }, +// { id: 'minor_version', title: 'minor_version' }, +// { id: 'patch_version', title: 'patch_version' }, +// { id: 'pre_release_version', title: 'pre_release_version' }, +// { id: 'maker', title: 'maker' }, +// { id: 'node_id', title: 'node_id' }, +// { id: 'address', title: 'address' }, +// { id: 'port', title: 'port' }, +// { id: 'telemetry_timestamp', title: 'telemetry_timestamp' }, +// { id: 'timestamp', title: 'timestamp' } +// ] +// }) +// await csv_writer.writeRecords(rows) + +// await upload(csv_filename) + +// const count_deleted = await db('representatives_telemetry_index') +// .whereIn('timestamp', timestamps) +// .del() +// logger(`removed ${count_deleted} rows from representatives_telemetry_index`) +// } +// } const archive_posts = async ({ batch_size = 20000 }) => { logger('archiving posts') @@ -397,11 +397,11 @@ const archive_mysql = async ({ batch_size = 20000 }) => { console.log(err) } - try { - await archive_representatives_telemetry_index({ batch_size }) - } catch (err) { - console.log(err) - } + // try { + // await archive_representatives_telemetry_index({ batch_size }) + // } catch (err) { + // console.log(err) + // } try { await archive_posts({ batch_size })