
Commit

fix: update archive-mysql to not remove rows from `representatives_telemetry_index` in production server
mistakia committed Mar 23, 2024
1 parent df5e022 commit 61308ab
Showing 1 changed file with 94 additions and 94 deletions.
188 changes: 94 additions & 94 deletions scripts/archive-mysql.mjs
@@ -213,97 +213,97 @@ const archive_representatives_telemetry = async ({ batch_size = 20000 }) => {
  }
}

const archive_representatives_telemetry_index = async ({
  batch_size = 20000
}) => {
  logger('archiving representatives_telemetry_index')
  const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
  const hours = 6 * 7 * 24 // 6 weeks

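  // collect the distinct timestamps older than the six-week cutoff, with a
  // row count per timestamp, oldest first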
  const stale_timestamps = await db.raw(
    `SELECT timestamp, COUNT(*) as count FROM representatives_telemetry_index WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC`
  )

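  // build batches out of whole timestamp buckets, flushing whenever adding
  // the next bucket would push the batch past batch_size rows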
  let current_batch_size = 0
  let timestamps_for_batch = []
  for (const { timestamp, count } of stale_timestamps[0]) {
    if (current_batch_size + count <= batch_size) {
      timestamps_for_batch.push(timestamp)
      current_batch_size += count
    } else {
      await processBatch(timestamps_for_batch)
      timestamps_for_batch = [timestamp]
      current_batch_size = count
    }
  }
  if (timestamps_for_batch.length > 0) {
    await processBatch(timestamps_for_batch)
  }

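  // for each batch: copy the rows to the archive database, write them to a
  // CSV, upload the CSV, then delete the rows from the production table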
  async function processBatch(timestamps) {
    logger(
      `processing batch of ${timestamps.length} timestamps. Newest: ${dayjs
        .unix(timestamps[timestamps.length - 1])
        .format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs
        .unix(timestamps[0])
        .format('YYYY-MM-DD HH:mm:ss')}`
    )
    const rows = await db('representatives_telemetry_index')
      .whereIn('timestamp', timestamps)
      .select()

    if (!rows.length) {
      logger('no rows to archive')
      return
    }

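    // upsert into the archive database so re-running a batch is safe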
    await storage_mysql('representatives_telemetry_index')
      .insert(rows)
      .onConflict()
      .merge()
    logger(`copied ${rows.length} rows to storage_mysql`)

    logger(`archiving ${rows.length} representatives_telemetry_index entries`)
    const filename = `representatives-telemetry-index-archive_${timestamp}_${dayjs().format(
      'YYYY-MM-DD_HH-mm-ss'
    )}`
    const csv_filename = `${filename}.csv`
    const csv_writer = createCsvWriter({
      path: `${dir}/${csv_filename}`,
      header: [
        { id: 'account', title: 'account' },
        { id: 'weight', title: 'weight' },
        { id: 'block_count', title: 'block_count' },
        { id: 'block_behind', title: 'block_behind' },
        { id: 'cemented_count', title: 'cemented_count' },
        { id: 'cemented_behind', title: 'cemented_behind' },
        { id: 'unchecked_count', title: 'unchecked_count' },
        { id: 'bandwidth_cap', title: 'bandwidth_cap' },
        { id: 'peer_count', title: 'peer_count' },
        { id: 'protocol_version', title: 'protocol_version' },
        { id: 'uptime', title: 'uptime' },
        { id: 'major_version', title: 'major_version' },
        { id: 'minor_version', title: 'minor_version' },
        { id: 'patch_version', title: 'patch_version' },
        { id: 'pre_release_version', title: 'pre_release_version' },
        { id: 'maker', title: 'maker' },
        { id: 'node_id', title: 'node_id' },
        { id: 'address', title: 'address' },
        { id: 'port', title: 'port' },
        { id: 'telemetry_timestamp', title: 'telemetry_timestamp' },
        { id: 'timestamp', title: 'timestamp' }
      ]
    })
    await csv_writer.writeRecords(rows)

    await upload(csv_filename)

    const count_deleted = await db('representatives_telemetry_index')
      .whereIn('timestamp', timestamps)
      .del()
    logger(`removed ${count_deleted} rows from representatives_telemetry_index`)
  }
}

// const archive_representatives_telemetry_index = async ({
//   batch_size = 20000
// }) => {
//   logger('archiving representatives_telemetry_index')
//   const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
//   const hours = 6 * 7 * 24 // 6 weeks

//   const stale_timestamps = await db.raw(
//     `SELECT timestamp, COUNT(*) as count FROM representatives_telemetry_index WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC`
//   )

//   let current_batch_size = 0
//   let timestamps_for_batch = []
//   for (const { timestamp, count } of stale_timestamps[0]) {
//     if (current_batch_size + count <= batch_size) {
//       timestamps_for_batch.push(timestamp)
//       current_batch_size += count
//     } else {
//       await processBatch(timestamps_for_batch)
//       timestamps_for_batch = [timestamp]
//       current_batch_size = count
//     }
//   }
//   if (timestamps_for_batch.length > 0) {
//     await processBatch(timestamps_for_batch)
//   }

//   async function processBatch(timestamps) {
//     logger(
//       `processing batch of ${timestamps.length} timestamps. Newest: ${dayjs
//         .unix(timestamps[timestamps.length - 1])
//         .format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs
//         .unix(timestamps[0])
//         .format('YYYY-MM-DD HH:mm:ss')}`
//     )
//     const rows = await db('representatives_telemetry_index')
//       .whereIn('timestamp', timestamps)
//       .select()

//     if (!rows.length) {
//       logger('no rows to archive')
//       return
//     }

//     await storage_mysql('representatives_telemetry_index')
//       .insert(rows)
//       .onConflict()
//       .merge()
//     logger(`copied ${rows.length} rows to storage_mysql`)

//     logger(`archiving ${rows.length} representatives_telemetry_index entries`)
//     const filename = `representatives-telemetry-index-archive_${timestamp}_${dayjs().format(
//       'YYYY-MM-DD_HH-mm-ss'
//     )}`
//     const csv_filename = `${filename}.csv`
//     const csv_writer = createCsvWriter({
//       path: `${dir}/${csv_filename}`,
//       header: [
//         { id: 'account', title: 'account' },
//         { id: 'weight', title: 'weight' },
//         { id: 'block_count', title: 'block_count' },
//         { id: 'block_behind', title: 'block_behind' },
//         { id: 'cemented_count', title: 'cemented_count' },
//         { id: 'cemented_behind', title: 'cemented_behind' },
//         { id: 'unchecked_count', title: 'unchecked_count' },
//         { id: 'bandwidth_cap', title: 'bandwidth_cap' },
//         { id: 'peer_count', title: 'peer_count' },
//         { id: 'protocol_version', title: 'protocol_version' },
//         { id: 'uptime', title: 'uptime' },
//         { id: 'major_version', title: 'major_version' },
//         { id: 'minor_version', title: 'minor_version' },
//         { id: 'patch_version', title: 'patch_version' },
//         { id: 'pre_release_version', title: 'pre_release_version' },
//         { id: 'maker', title: 'maker' },
//         { id: 'node_id', title: 'node_id' },
//         { id: 'address', title: 'address' },
//         { id: 'port', title: 'port' },
//         { id: 'telemetry_timestamp', title: 'telemetry_timestamp' },
//         { id: 'timestamp', title: 'timestamp' }
//       ]
//     })
//     await csv_writer.writeRecords(rows)

//     await upload(csv_filename)

//     const count_deleted = await db('representatives_telemetry_index')
//       .whereIn('timestamp', timestamps)
//       .del()
//     logger(`removed ${count_deleted} rows from representatives_telemetry_index`)
//   }
// }

const archive_posts = async ({ batch_size = 20000 }) => {
  logger('archiving posts')

@@ -397,11 +397,11 @@ const archive_mysql = async ({ batch_size = 20000 }) => {
    console.log(err)
  }

  try {
    await archive_representatives_telemetry_index({ batch_size })
  } catch (err) {
    console.log(err)
  }
  // try {
  //   await archive_representatives_telemetry_index({ batch_size })
  // } catch (err) {
  //   console.log(err)
  // }

  try {
    await archive_posts({ batch_size })
