refactor: add representatives_telemetry_index to archive script, improve batching process
mistakia committed Feb 15, 2024
1 parent 11ceb98 commit 3dbdaba
Showing 1 changed file with 227 additions and 61 deletions.
288 changes: 227 additions & 61 deletions scripts/archive-mysql.mjs
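
The core of the "improve batching process" change is visible in each archive function below: instead of paginating with LIMIT/OFFSET, the script now groups stale rows by timestamp and accumulates whole timestamp groups until a batch would exceed batch_size rows. A minimal standalone sketch of that grouping logic, adapted from the diff (the helper name archive_in_timestamp_batches, its parameters, and the process_batch callback are illustrative, not part of the commit; a knex-style db handle is assumed):

// Sketch only: adapted from the batching logic in the diff below.
// `db`, `table`, `hours`, `batch_size`, and `process_batch` are assumed inputs.
const archive_in_timestamp_batches = async ({
  db,
  table,
  hours,
  batch_size = 20000,
  process_batch
}) => {
  // Count how many stale rows exist for each distinct timestamp.
  const stale_timestamps = await db.raw(
    `SELECT timestamp, COUNT(*) as count FROM ${table} WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC`
  )

  let current_batch_size = 0
  let timestamps_for_batch = []
  for (const { timestamp, count } of stale_timestamps[0]) {
    if (current_batch_size + count <= batch_size) {
      // The whole timestamp group still fits in the current batch.
      timestamps_for_batch.push(timestamp)
      current_batch_size += count
    } else {
      // Flush the current batch and start a new one with this timestamp.
      await process_batch(timestamps_for_batch)
      timestamps_for_batch = [timestamp]
      current_batch_size = count
    }
  }
  // Flush whatever is left.
  if (timestamps_for_batch.length > 0) {
    await process_batch(timestamps_for_batch)
  }
}

Batching by whole timestamp groups keeps the later whereIn('timestamp', ...) select and delete aligned with exactly the rows that were uploaded, which the old OFFSET-based loop could not guarantee once rows started being deleted between iterations.
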
@@ -45,17 +45,46 @@ const upload = async (gzFilename) => {
fs.unlinkSync(file)
}

const archive_representatives_uptime = async ({ batch_size = 10000 }) => {
const archive_representatives_uptime = async ({ batch_size = 20000 }) => {
logger('archiving representatives_uptime')
const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
const hours = 12 * 7 * 24 // 12 weeks
let offset = 0
let rows = []
do {
const resp = await db.raw(
`select * from representatives_uptime where timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) limit ${batch_size} offset ${offset}`

// Get a list of stale timestamps and number of rows for each
const stale_timestamps = await db.raw(
`SELECT timestamp, COUNT(*) as count FROM representatives_uptime WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC`
)

let current_batch_size = 0
let timestamps_for_batch = []
for (const { timestamp, count } of stale_timestamps[0]) {
if (current_batch_size + count <= batch_size) {
timestamps_for_batch.push(timestamp)
current_batch_size += count
} else {
// Process the current batch
await processBatch(timestamps_for_batch)
timestamps_for_batch = [timestamp] // Start a new batch with the current timestamp
current_batch_size = count
}
}
// Process the last batch
if (timestamps_for_batch.length > 0) {
await processBatch(timestamps_for_batch)
}

async function processBatch(timestamps) {
logger(
`processing batch of ${timestamps.length} timestamps. Newest: ${dayjs
.unix(timestamps[timestamps.length - 1])
.format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs
.unix(timestamps[0])
.format('YYYY-MM-DD HH:mm:ss')}`
)
rows = resp[0]
const rows = await db('representatives_uptime')
.whereIn('timestamp', timestamps)
.select()

if (!rows.length) {
logger('no rows to archive')
return
@@ -67,8 +96,10 @@ const archive_representatives_uptime = async ({ batch_size = 10000 }) => {
.merge()
logger(`copied ${rows.length} rows to storage_mysql`)

logger(`archving ${rows.length} representatives_uptime entries`)
const filename = `representatives-uptime-archive_${timestamp}`
logger(`archiving ${rows.length} representatives_uptime entries`)
const filename = `representatives-uptime-archive_${timestamp}_${dayjs().format(
'YYYY-MM-DD_HH-mm-ss'
)}`
const csv_filename = `${filename}.csv`
const csv_writer = createCsvWriter({
path: `${dir}/${csv_filename}`,
@@ -80,32 +111,55 @@ const archive_representatives_uptime = async ({ batch_size = 10000 }) => {
})
await csv_writer.writeRecords(rows)

// const gz_filename = `${filename}.tar.gz`
// await zip({ gz_filename, csv_filename })
await upload(csv_filename)

const timestamps = rows.map((r) => r.timestamp)
const uniq_timestamps = [...new Set(timestamps)]
const count = await db('representatives_uptime')
.whereIn('timestamp', uniq_timestamps)
const count_deleted = await db('representatives_uptime')
.whereIn('timestamp', timestamps)
.del()
logger(`removed ${count} rows from representatives_uptime`)

offset += batch_size
} while (rows.length === batch_size)
logger(`removed ${count_deleted} rows from representatives_uptime`)
}
}

const archive_representatives_telemetry = async ({ batch_size = 10000 }) => {
const archive_representatives_telemetry = async ({ batch_size = 20000 }) => {
logger('archiving representatives_telemetry')
const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
const hours = 6 * 7 * 24 // 6 weeks
let offset = 0
let rows = []
do {
const resp = await db.raw(
`select * from representatives_telemetry where timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) limit ${batch_size} offset ${offset}`

// Get a list of stale timestamps and number of rows for each
const stale_timestamps = await db.raw(
`SELECT timestamp, COUNT(*) as count FROM representatives_telemetry WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC`
)

let current_batch_size = 0
let timestamps_for_batch = []
for (const { timestamp, count } of stale_timestamps[0]) {
if (current_batch_size + count <= batch_size) {
timestamps_for_batch.push(timestamp)
current_batch_size += count
} else {
// Process the current batch
await processBatch(timestamps_for_batch)
timestamps_for_batch = [timestamp] // Start a new batch with the current timestamp
current_batch_size = count
}
}
// Process the last batch
if (timestamps_for_batch.length > 0) {
await processBatch(timestamps_for_batch)
}

async function processBatch(timestamps) {
logger(
`processing batch of ${timestamps.length} timestamps. Newest: ${dayjs
.unix(timestamps[timestamps.length - 1])
.format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs
.unix(timestamps[0])
.format('YYYY-MM-DD HH:mm:ss')}`
)
rows = resp[0]
const rows = await db('representatives_telemetry')
.whereIn('timestamp', timestamps)
.select()

if (!rows.length) {
logger('no rows to archive')
return
@@ -117,8 +171,10 @@ const archive_representatives_telemetry = async ({ batch_size = 10000 }) => {
.merge()
logger(`copied ${rows.length} rows to storage_mysql`)

logger(`archving ${rows.length} representatives_telemetry entries`)
const filename = `representatives-telemetry-archive_${timestamp}`
logger(`archiving ${rows.length} representatives_telemetry entries`)
const filename = `representatives-telemetry-archive_${timestamp}_${dayjs().format(
'YYYY-MM-DD_HH-mm-ss'
)}`
const csv_filename = `${filename}.csv`
const csv_writer = createCsvWriter({
path: `${dir}/${csv_filename}`,
@@ -148,35 +204,141 @@ const archive_representatives_telemetry = async ({ batch_size = 10000 }) => {
})
await csv_writer.writeRecords(rows)

// const gz_filename = `${filename}.tar.gz`
// await zip({ gz_filename, csv_filename })
await upload(csv_filename)

const timestamps = rows.map((r) => r.timestamp)
const uniq_timestamps = [...new Set(timestamps)]
const count = await db('representatives_telemetry')
.whereIn('timestamp', uniq_timestamps)
const count_deleted = await db('representatives_telemetry')
.whereIn('timestamp', timestamps)
.del()
logger(`removed ${count} rows from representatives_telemetry`)
logger(`removed ${count_deleted} rows from representatives_telemetry`)
}
}

const archive_representatives_telemetry_index = async ({
batch_size = 20000
}) => {
logger('archiving representatives_telemetry_index')
const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
const hours = 6 * 7 * 24 // 6 weeks

const stale_timestamps = await db.raw(
`SELECT timestamp, COUNT(*) as count FROM representatives_telemetry_index WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC`
)

let current_batch_size = 0
let timestamps_for_batch = []
for (const { timestamp, count } of stale_timestamps[0]) {
if (current_batch_size + count <= batch_size) {
timestamps_for_batch.push(timestamp)
current_batch_size += count
} else {
await processBatch(timestamps_for_batch)
timestamps_for_batch = [timestamp]
current_batch_size = count
}
}
if (timestamps_for_batch.length > 0) {
await processBatch(timestamps_for_batch)
}

async function processBatch(timestamps) {
logger(
`processing batch of ${timestamps.length} timestamps. Newest: ${dayjs
.unix(timestamps[timestamps.length - 1])
.format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs
.unix(timestamps[0])
.format('YYYY-MM-DD HH:mm:ss')}`
)
const rows = await db('representatives_telemetry_index')
.whereIn('timestamp', timestamps)
.select()

if (!rows.length) {
logger('no rows to archive')
return
}

await storage_mysql('representatives_telemetry_index')
.insert(rows)
.onConflict()
.merge()
logger(`copied ${rows.length} rows to storage_mysql`)

logger(`archiving ${rows.length} representatives_telemetry_index entries`)
const filename = `representatives-telemetry-index-archive_${timestamp}_${dayjs().format(
'YYYY-MM-DD_HH-mm-ss'
)}`
const csv_filename = `${filename}.csv`
const csv_writer = createCsvWriter({
path: `${dir}/${csv_filename}`,
header: [
{ id: 'account', title: 'account' },
{ id: 'weight', title: 'weight' },
{ id: 'block_count', title: 'block_count' },
{ id: 'block_behind', title: 'block_behind' },
{ id: 'cemented_count', title: 'cemented_count' },
{ id: 'cemented_behind', title: 'cemented_behind' },
{ id: 'unchecked_count', title: 'unchecked_count' },
{ id: 'bandwidth_cap', title: 'bandwidth_cap' },
{ id: 'peer_count', title: 'peer_count' },
{ id: 'protocol_version', title: 'protocol_version' },
{ id: 'uptime', title: 'uptime' },
{ id: 'major_version', title: 'major_version' },
{ id: 'minor_version', title: 'minor_version' },
{ id: 'patch_version', title: 'patch_version' },
{ id: 'pre_release_version', title: 'pre_release_version' },
{ id: 'maker', title: 'maker' },
{ id: 'node_id', title: 'node_id' },
{ id: 'address', title: 'address' },
{ id: 'port', title: 'port' },
{ id: 'telemetry_timestamp', title: 'telemetry_timestamp' },
{ id: 'timestamp', title: 'timestamp' }
]
})
await csv_writer.writeRecords(rows)

await upload(csv_filename)

offset += batch_size
} while (rows.length === batch_size)
const count_deleted = await db('representatives_telemetry_index')
.whereIn('timestamp', timestamps)
.del()
logger(`removed ${count_deleted} rows from representatives_telemetry_index`)
}
}

const archive_posts = async ({ batch_size = 10000 }) => {
const archive_posts = async ({ batch_size = 20000 }) => {
logger('archiving posts')
const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
const hours = 6 * 7 * 24 // 6 weeks
let offset = 0
let posts = []
do {
posts = await db('posts')
.select('posts.*')
.leftJoin('post_labels', 'posts.id', 'post_labels.post_id')
.whereNull('post_labels.label')
.whereRaw(
`created_at < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) limit ${batch_size} offset ${offset}`
)

const stale_timestamps = await db.raw(
`SELECT created_at as timestamp, COUNT(*) as count FROM posts WHERE created_at < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY created_at ORDER BY created_at ASC`
)

let current_batch_size = 0
let timestamps_for_batch = []
for (const { timestamp, count } of stale_timestamps[0]) {
if (current_batch_size + count <= batch_size) {
timestamps_for_batch.push(timestamp)
current_batch_size += count
} else {
await processBatch(timestamps_for_batch)
timestamps_for_batch = [timestamp]
current_batch_size = count
}
}
if (timestamps_for_batch.length > 0) {
await processBatch(timestamps_for_batch)
}

async function processBatch(timestamps) {
logger(
`processing batch of ${timestamps.length} timestamps. Newest: ${dayjs
.unix(timestamps[timestamps.length - 1])
.format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs
.unix(timestamps[0])
.format('YYYY-MM-DD HH:mm:ss')}`
)
const posts = await db('posts').whereIn('created_at', timestamps).select()

if (!posts.length) {
logger('no posts to archive')
@@ -186,8 +348,10 @@ const archive_posts = async ({ batch_size = 10000 }) => {
await storage_mysql('posts').insert(posts).onConflict().merge()
logger(`copied ${posts.length} rows to storage_mysql`)

logger(`archving ${posts.length} posts`)
const filename = `posts-archive_${timestamp}`
logger(`archiving ${posts.length} posts`)
const filename = `posts-archive_${timestamp}_${dayjs().format(
'YYYY-MM-DD_HH-mm-ss'
)}`
const csv_filename = `${filename}.csv`
const csv_writer = createCsvWriter({
path: `${dir}/${csv_filename}`,
@@ -211,20 +375,16 @@ const archive_posts = async ({ batch_size = 10000 }) => {
})
await csv_writer.writeRecords(posts)

// const gz_filename = `${filename}.tar.gz`
// await zip({ gz_filename, csv_filename })
await upload(csv_filename)

const post_ids = posts.map((r) => r.id)
const uniq_post_ids = [...new Set(post_ids)]
const count = await db('posts').whereIn('id', uniq_post_ids).del()
logger(`removed ${count} rows from posts`)

offset += batch_size
} while (posts.length === batch_size)
const count_deleted = await db('posts')
.whereIn('created_at', timestamps)
.del()
logger(`removed ${count_deleted} rows from posts`)
}
}

const archive_mysql = async ({ batch_size = 10000 }) => {
const archive_mysql = async ({ batch_size = 20000 }) => {
try {
await archive_representatives_uptime({ batch_size })
} catch (err) {
@@ -237,6 +397,12 @@ const archive_mysql = async ({ batch_size = 10000 }) => {
console.log(err)
}

try {
await archive_representatives_telemetry_index({ batch_size })
} catch (err) {
console.log(err)
}

try {
await archive_posts({ batch_size })
} catch (err) {
@@ -247,7 +413,7 @@ const archive_mysql = async ({ batch_size = 10000 }) => {
if (isMain(import.meta.url)) {
const main = async () => {
try {
const batch_size = argv.batch_size || 10000
const batch_size = argv.batch_size || 20000
await archive_mysql({ batch_size })
} catch (err) {
console.log(err)

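With this commit the default batch size rises from 10,000 to 20,000 rows per batch. An explicit size can still be supplied on the command line (assuming the script's existing argv parsing accepts a --batch_size flag), for example:

node scripts/archive-mysql.mjs --batch_size 5000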