diff --git a/scripts/archive-mysql.mjs b/scripts/archive-mysql.mjs
index 51504f29..b68fa52a 100644
--- a/scripts/archive-mysql.mjs
+++ b/scripts/archive-mysql.mjs
@@ -45,17 +45,46 @@ const upload = async (gzFilename) => {
   fs.unlinkSync(file)
 }
 
-const archive_representatives_uptime = async ({ batch_size = 10000 }) => {
+const archive_representatives_uptime = async ({ batch_size = 20000 }) => {
   logger('archiving representatives_uptime')
   const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
   const hours = 12 * 7 * 24 // 12 weeks
-  let offset = 0
-  let rows = []
-  do {
-    const resp = await db.raw(
-      `select * from representatives_uptime where timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) limit ${batch_size} offset ${offset}`
+
+  // Get a list of stale timestamps and number of rows for each
+  const stale_timestamps = await db.raw(
+    `SELECT timestamp, COUNT(*) as count FROM representatives_uptime WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC`
+  )
+
+  let current_batch_size = 0
+  let timestamps_for_batch = []
+  for (const { timestamp, count } of stale_timestamps[0]) {
+    if (current_batch_size + count <= batch_size) {
+      timestamps_for_batch.push(timestamp)
+      current_batch_size += count
+    } else {
+      // Process the current batch
+      await processBatch(timestamps_for_batch)
+      timestamps_for_batch = [timestamp] // Start a new batch with the current timestamp
+      current_batch_size = count
+    }
+  }
+  // Process the last batch
+  if (timestamps_for_batch.length > 0) {
+    await processBatch(timestamps_for_batch)
+  }
+
+  async function processBatch(timestamps) {
+    logger(
+      `processing batch of ${timestamps.length} timestamps. Newest: ${dayjs
+        .unix(timestamps[timestamps.length - 1])
+        .format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs
+        .unix(timestamps[0])
+        .format('YYYY-MM-DD HH:mm:ss')}`
     )
-    rows = resp[0]
+    const rows = await db('representatives_uptime')
+      .whereIn('timestamp', timestamps)
+      .select()
+
     if (!rows.length) {
       logger('no rows to archive')
       return
@@ -67,8 +96,10 @@ const archive_representatives_uptime = async ({ batch_size = 10000 }) => {
       .merge()
     logger(`copied ${rows.length} rows to storage_mysql`)
 
-    logger(`archving ${rows.length} representatives_uptime entries`)
-    const filename = `representatives-uptime-archive_${timestamp}`
+    logger(`archiving ${rows.length} representatives_uptime entries`)
+    const filename = `representatives-uptime-archive_${timestamp}_${dayjs().format(
+      'YYYY-MM-DD_HH-mm-ss'
+    )}`
     const csv_filename = `${filename}.csv`
     const csv_writer = createCsvWriter({
       path: `${dir}/${csv_filename}`,
@@ -80,32 +111,55 @@ const archive_representatives_uptime = async ({ batch_size = 10000 }) => {
     })
     await csv_writer.writeRecords(rows)
 
-    // const gz_filename = `${filename}.tar.gz`
-    // await zip({ gz_filename, csv_filename })
     await upload(csv_filename)
 
-    const timestamps = rows.map((r) => r.timestamp)
-    const uniq_timestamps = [...new Set(timestamps)]
-    const count = await db('representatives_uptime')
-      .whereIn('timestamp', uniq_timestamps)
+    const count_deleted = await db('representatives_uptime')
+      .whereIn('timestamp', timestamps)
      .del()
-    logger(`removed ${count} rows from representatives_uptime`)
-
-    offset += batch_size
-  } while (rows.length === batch_size)
+    logger(`removed ${count_deleted} rows from representatives_uptime`)
+  }
 }
 
-const archive_representatives_telemetry = async ({ batch_size = 10000 }) => {
+const archive_representatives_telemetry = async ({ batch_size = 20000 }) => {
   logger('archiving representatives_telemetry')
   const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
   const hours = 6 * 7 * 24 // 6 weeks
-  let offset = 0
-  let rows = []
-  do {
-    const resp = await db.raw(
-      `select * from representatives_telemetry where timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) limit ${batch_size} offset ${offset}`
+
+  // Get a list of stale timestamps and number of rows for each
+  const stale_timestamps = await db.raw(
+    `SELECT timestamp, COUNT(*) as count FROM representatives_telemetry WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC`
+  )
+
+  let current_batch_size = 0
+  let timestamps_for_batch = []
+  for (const { timestamp, count } of stale_timestamps[0]) {
+    if (current_batch_size + count <= batch_size) {
+      timestamps_for_batch.push(timestamp)
+      current_batch_size += count
+    } else {
+      // Process the current batch
+      await processBatch(timestamps_for_batch)
+      timestamps_for_batch = [timestamp] // Start a new batch with the current timestamp
+      current_batch_size = count
+    }
+  }
+  // Process the last batch
+  if (timestamps_for_batch.length > 0) {
+    await processBatch(timestamps_for_batch)
+  }
+
+  async function processBatch(timestamps) {
+    logger(
+      `processing batch of ${timestamps.length} timestamps. Newest: ${dayjs
+        .unix(timestamps[timestamps.length - 1])
+        .format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs
+        .unix(timestamps[0])
+        .format('YYYY-MM-DD HH:mm:ss')}`
     )
-    rows = resp[0]
+    const rows = await db('representatives_telemetry')
+      .whereIn('timestamp', timestamps)
+      .select()
+
     if (!rows.length) {
       logger('no rows to archive')
       return
@@ -117,8 +171,10 @@ const archive_representatives_telemetry = async ({ batch_size = 10000 }) => {
       .merge()
     logger(`copied ${rows.length} rows to storage_mysql`)
 
-    logger(`archving ${rows.length} representatives_telemetry entries`)
-    const filename = `representatives-telemetry-archive_${timestamp}`
+    logger(`archiving ${rows.length} representatives_telemetry entries`)
+    const filename = `representatives-telemetry-archive_${timestamp}_${dayjs().format(
+      'YYYY-MM-DD_HH-mm-ss'
+    )}`
     const csv_filename = `${filename}.csv`
     const csv_writer = createCsvWriter({
       path: `${dir}/${csv_filename}`,
@@ -148,35 +204,141 @@ const archive_representatives_telemetry = async ({ batch_size = 10000 }) => {
     })
     await csv_writer.writeRecords(rows)
 
-    // const gz_filename = `${filename}.tar.gz`
-    // await zip({ gz_filename, csv_filename })
     await upload(csv_filename)
 
-    const timestamps = rows.map((r) => r.timestamp)
-    const uniq_timestamps = [...new Set(timestamps)]
-    const count = await db('representatives_telemetry')
-      .whereIn('timestamp', uniq_timestamps)
+    const count_deleted = await db('representatives_telemetry')
+      .whereIn('timestamp', timestamps)
       .del()
-    logger(`removed ${count} rows from representatives_telemetry`)
+    logger(`removed ${count_deleted} rows from representatives_telemetry`)
+  }
+}
+
+const archive_representatives_telemetry_index = async ({
+  batch_size = 20000
+}) => {
+  logger('archiving representatives_telemetry_index')
+  const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
+  const hours = 6 * 7 * 24 // 6 weeks
+
+  const stale_timestamps = await db.raw(
+    `SELECT timestamp, COUNT(*) as count FROM representatives_telemetry_index WHERE timestamp < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY timestamp ORDER BY timestamp ASC`
+  )
+
+  let current_batch_size = 0
+  let timestamps_for_batch = []
+  for (const { timestamp, count } of stale_timestamps[0]) {
+    if (current_batch_size + count <= batch_size) {
+      timestamps_for_batch.push(timestamp)
+      current_batch_size += count
+    } else {
+      await processBatch(timestamps_for_batch)
+      timestamps_for_batch = [timestamp]
+      current_batch_size = count
+    }
+  }
+  if (timestamps_for_batch.length > 0) {
+    await processBatch(timestamps_for_batch)
+  }
+
+  async function processBatch(timestamps) {
+    logger(
+      `processing batch of ${timestamps.length} timestamps. Newest: ${dayjs
+        .unix(timestamps[timestamps.length - 1])
+        .format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs
+        .unix(timestamps[0])
+        .format('YYYY-MM-DD HH:mm:ss')}`
+    )
+    const rows = await db('representatives_telemetry_index')
+      .whereIn('timestamp', timestamps)
+      .select()
+
+    if (!rows.length) {
+      logger('no rows to archive')
+      return
+    }
+
+    await storage_mysql('representatives_telemetry_index')
+      .insert(rows)
+      .onConflict()
+      .merge()
+    logger(`copied ${rows.length} rows to storage_mysql`)
+
+    logger(`archiving ${rows.length} representatives_telemetry_index entries`)
+    const filename = `representatives-telemetry-index-archive_${timestamp}_${dayjs().format(
+      'YYYY-MM-DD_HH-mm-ss'
+    )}`
+    const csv_filename = `${filename}.csv`
+    const csv_writer = createCsvWriter({
+      path: `${dir}/${csv_filename}`,
+      header: [
+        { id: 'account', title: 'account' },
+        { id: 'weight', title: 'weight' },
+        { id: 'block_count', title: 'block_count' },
+        { id: 'block_behind', title: 'block_behind' },
+        { id: 'cemented_count', title: 'cemented_count' },
+        { id: 'cemented_behind', title: 'cemented_behind' },
+        { id: 'unchecked_count', title: 'unchecked_count' },
+        { id: 'bandwidth_cap', title: 'bandwidth_cap' },
+        { id: 'peer_count', title: 'peer_count' },
+        { id: 'protocol_version', title: 'protocol_version' },
+        { id: 'uptime', title: 'uptime' },
+        { id: 'major_version', title: 'major_version' },
+        { id: 'minor_version', title: 'minor_version' },
+        { id: 'patch_version', title: 'patch_version' },
+        { id: 'pre_release_version', title: 'pre_release_version' },
+        { id: 'maker', title: 'maker' },
+        { id: 'node_id', title: 'node_id' },
+        { id: 'address', title: 'address' },
+        { id: 'port', title: 'port' },
+        { id: 'telemetry_timestamp', title: 'telemetry_timestamp' },
+        { id: 'timestamp', title: 'timestamp' }
+      ]
+    })
+    await csv_writer.writeRecords(rows)
+
+    await upload(csv_filename)
 
-    offset += batch_size
-  } while (rows.length === batch_size)
+    const count_deleted = await db('representatives_telemetry_index')
+      .whereIn('timestamp', timestamps)
+      .del()
+    logger(`removed ${count_deleted} rows from representatives_telemetry_index`)
+  }
 }
 
-const archive_posts = async ({ batch_size = 10000 }) => {
+const archive_posts = async ({ batch_size = 20000 }) => {
   logger('archiving posts')
   const timestamp = dayjs().format('YYYY-MM-DD_HH-mm-ss')
   const hours = 6 * 7 * 24 // 6 weeks
-  let offset = 0
-  let posts = []
-  do {
-    posts = await db('posts')
-      .select('posts.*')
-      .leftJoin('post_labels', 'posts.id', 'post_labels.post_id')
-      .whereNull('post_labels.label')
-      .whereRaw(
-        `created_at < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) limit ${batch_size} offset ${offset}`
-      )
+
+  const stale_timestamps = await db.raw(
+    `SELECT created_at as timestamp, COUNT(*) as count FROM posts WHERE created_at < UNIX_TIMESTAMP(NOW() - INTERVAL ${hours} HOUR) GROUP BY created_at ORDER BY created_at ASC`
+  )
+
+  let current_batch_size = 0
+  let timestamps_for_batch = []
+  for (const { timestamp, count } of stale_timestamps[0]) {
+    if (current_batch_size + count <= batch_size) {
+      timestamps_for_batch.push(timestamp)
+      current_batch_size += count
+    } else {
+      await processBatch(timestamps_for_batch)
+      timestamps_for_batch = [timestamp]
+      current_batch_size = count
+    }
+  }
+  if (timestamps_for_batch.length > 0) {
+    await processBatch(timestamps_for_batch)
+  }
+
+  async function processBatch(timestamps) {
+    logger(
+      `processing batch of ${timestamps.length} timestamps. Newest: ${dayjs
+        .unix(timestamps[timestamps.length - 1])
+        .format('YYYY-MM-DD HH:mm:ss')}, oldest: ${dayjs
+        .unix(timestamps[0])
+        .format('YYYY-MM-DD HH:mm:ss')}`
+    )
+    const posts = await db('posts').whereIn('created_at', timestamps).select()
 
     if (!posts.length) {
       logger('no posts to archive')
@@ -186,8 +348,10 @@ const archive_posts = async ({ batch_size = 10000 }) => {
     await storage_mysql('posts').insert(posts).onConflict().merge()
     logger(`copied ${posts.length} rows to storage_mysql`)
 
-    logger(`archving ${posts.length} posts`)
-    const filename = `posts-archive_${timestamp}`
+    logger(`archiving ${posts.length} posts`)
+    const filename = `posts-archive_${timestamp}_${dayjs().format(
+      'YYYY-MM-DD_HH-mm-ss'
+    )}`
     const csv_filename = `${filename}.csv`
     const csv_writer = createCsvWriter({
       path: `${dir}/${csv_filename}`,
@@ -211,20 +375,16 @@ const archive_posts = async ({ batch_size = 10000 }) => {
     })
     await csv_writer.writeRecords(posts)
 
-    // const gz_filename = `${filename}.tar.gz`
-    // await zip({ gz_filename, csv_filename })
     await upload(csv_filename)
 
-    const post_ids = posts.map((r) => r.id)
-    const uniq_post_ids = [...new Set(post_ids)]
-    const count = await db('posts').whereIn('id', uniq_post_ids).del()
-    logger(`removed ${count} rows from posts`)
-
-    offset += batch_size
-  } while (posts.length === batch_size)
+    const count_deleted = await db('posts')
+      .whereIn('created_at', timestamps)
+      .del()
+    logger(`removed ${count_deleted} rows from posts`)
+  }
 }
 
-const archive_mysql = async ({ batch_size = 10000 }) => {
+const archive_mysql = async ({ batch_size = 20000 }) => {
   try {
     await archive_representatives_uptime({ batch_size })
   } catch (err) {
@@ -237,6 +397,12 @@ const archive_mysql = async ({ batch_size = 10000 }) => {
     console.log(err)
   }
 
+  try {
+    await archive_representatives_telemetry_index({ batch_size })
+  } catch (err) {
+    console.log(err)
+  }
+
   try {
     await archive_posts({ batch_size })
   } catch (err) {
@@ -247,7 +413,7 @@ if (isMain(import.meta.url)) {
   const main = async () => {
     try {
-      const batch_size = argv.batch_size || 10000
+      const batch_size = argv.batch_size || 20000
       await archive_mysql({ batch_size })
     } catch (err) {
       console.log(err)
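
Note on the batching approach used above: each archive function now groups stale timestamps into batches whose combined row counts stay at or below batch_size, then loads, copies, exports, and deletes one batch of timestamps at a time via its local processBatch helper. A minimal standalone sketch of that grouping step is shown below for reference; the helper name group_timestamps_into_batches and its usage are illustrative assumptions, not part of this diff.

// Hypothetical helper (not part of this diff): packs { timestamp, count } rows,
// ordered oldest to newest, into batches whose total row count stays <= batch_size.
// A single timestamp with more rows than batch_size ends up in a batch of its own.
const group_timestamps_into_batches = (stale_timestamps, batch_size) => {
  const batches = []
  let current_batch = []
  let current_batch_size = 0

  for (const { timestamp, count } of stale_timestamps) {
    // Flush the current batch before it would overflow
    if (current_batch.length && current_batch_size + count > batch_size) {
      batches.push(current_batch)
      current_batch = []
      current_batch_size = 0
    }
    current_batch.push(timestamp)
    current_batch_size += count
  }

  // Flush any remaining partial batch
  if (current_batch.length) {
    batches.push(current_batch)
  }

  return batches
}

Each archive function could then iterate the returned batches (for example, for (const timestamps of group_timestamps_into_batches(stale_timestamps[0], batch_size)) await processBatch(timestamps)), keeping the per-table query, CSV schema, and delete logic unchanged.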