From 54992a6d832f26c3c32c05d3825bd666445af58c Mon Sep 17 00:00:00 2001
From: Janson Bunce
Date: Thu, 14 Nov 2024 08:34:44 -0800
Subject: [PATCH] Chunk by data size

---
 backend/src/api/sync.ts      |  3 ++-
 backend/src/tasks/vs_sync.ts | 17 +++++++++--------
 backend/src/tools/chunk.ts   | 42 ++++++++++++++++++++++++++++++++++++
 3 files changed, 53 insertions(+), 9 deletions(-)
 create mode 100644 backend/src/tools/chunk.ts

diff --git a/backend/src/api/sync.ts b/backend/src/api/sync.ts
index 2e83a6a3..18b1e345 100644
--- a/backend/src/api/sync.ts
+++ b/backend/src/api/sync.ts
@@ -301,6 +301,7 @@ async function processOrganizations(client: Client, organizations) {
 
 export const ingest = wrapHandler(async (event) => {
   const originalChecksum = event.headers['x-checksum'];
+  const cursor = event.headers['x-cursor'];
   const newChecksum = event.body ? createChecksum(event.body) : '';
   const csvData = event.body;
 
@@ -312,7 +313,7 @@ export const ingest = wrapHandler(async (event) => {
   try {
     const { key } = await s3Client.saveCSV(
       csvData,
-      '',
+      cursor,
       process.env.IS_LOCAL ? 'crossfeed-local-exports' : 'crossfeed-lz-sync'
     );
     uploadKey = key;
diff --git a/backend/src/tasks/vs_sync.ts b/backend/src/tasks/vs_sync.ts
index db1d45f5..e649a27e 100644
--- a/backend/src/tasks/vs_sync.ts
+++ b/backend/src/tasks/vs_sync.ts
@@ -25,11 +25,10 @@ import { plainToClass } from 'class-transformer';
 import savePortScan from './helpers/savePortScan';
 import axios from 'axios';
 import { createChecksum } from '../tools/csv-utils';
-// import { TEST_DATA } from './REMOVE_ME';
 import { getRepository } from 'typeorm';
 import { unparse } from 'papaparse';
-import { chunk } from 'lodash';
 import saveOrganizationToMdl from './helpers/saveOrganizationToMdl';
+import { chunkBySize } from '../tools/chunk';
 /** Removes a value for a given key from the dictionary and then returns it.
  */
 function getValueAndDelete(
@@ -61,9 +60,7 @@ export const handler = async (commandOptions: CommandOptions) => {
   await client.connect();
   const startTime = Date.now();
   const query = 'SELECT * FROM vmtableau.requests;';
-  // const query = 'SELECT * FROM organization;';
   const result = await client.query(query);
-  // const result = { rows: TEST_DATA };
   const endTime = Date.now();
   const durationMs = endTime - startTime;
   const durationSeconds = Math.round(durationMs / 1000);
@@ -278,7 +275,8 @@
   await connectToDatalake();
   const orgRepository = getRepository(DL_Organization);
   const organizations = await orgRepository.find({
-    relations: ['location', 'sectors', 'cidrs', 'parent', 'children']
+    relations: ['location', 'sectors', 'cidrs', 'parent', 'children'],
+    order: { acronym: 'DESC' }
   });
 
   const shapedOrganizations = organizations.map((item) => {
@@ -292,10 +290,12 @@
     };
   });
-  const chunkedOrgs = chunk(shapedOrganizations, 1000);
+  const { chunks, chunkBounds } = chunkBySize(shapedOrganizations, 4194304);
 
-  const requests = chunkedOrgs.map((orgs) => {
+  const requests = chunks.map((orgs, i) => {
     const csvRows = unparse(orgs, { header: true });
+    const start = chunkBounds[i].start;
+    const end = chunkBounds[i].end;
     const checksum = createChecksum(csvRows);
     console.log(
       `Posting CSV file with ${orgs.length} organizations to /sync`
     );
@@ -306,7 +306,8 @@
       headers: {
         'Content-Type': 'text/csv',
         Authorization: process.env.DMZ_API_KEY,
-        'x-checksum': checksum
+        'x-checksum': checksum,
+        'x-cursor': `${start}-${end}`
       },
       data: csvRows
     });
diff --git a/backend/src/tools/chunk.ts b/backend/src/tools/chunk.ts
new file mode 100644
index 00000000..f52f7dc7
--- /dev/null
+++ b/backend/src/tools/chunk.ts
@@ -0,0 +1,42 @@
+export const chunkBySize = <T>(
+  array: T[],
+  maxSize: number
+): { chunks: T[][]; chunkBounds: { start: number; end: number }[] } => {
+  const chunks: T[][] = [];
+  const chunkBounds: { start: number; end: number }[] = [];
+  let currentChunk: T[] = [];
+  let currentSize = 0;
+  let startIndex = 0;
+
+  const calculateSize = (item: T): number => {
+    return Buffer.byteLength(JSON.stringify(item), 'utf8');
+  };
+
+  for (let i = 0; i < array.length; i++) {
+    const item = array[i];
+    const itemSize = calculateSize(item);
+
+    if (currentSize + itemSize > maxSize) {
+      if (currentChunk.length === 0 && itemSize > maxSize) {
+        throw new Error(
+          `Item size (${itemSize} bytes) exceeds the maximum chunk size (${maxSize} bytes).`
+        );
+      }
+      chunks.push(currentChunk);
+      chunkBounds.push({ start: startIndex, end: i - 1 });
+      currentChunk = [];
+      currentSize = 0;
+      startIndex = i;
+    }
+
+    currentChunk.push(item);
+    currentSize += itemSize;
+  }
+
+  if (currentChunk.length > 0) {
+    chunks.push(currentChunk);
+    chunkBounds.push({ start: startIndex, end: array.length - 1 });
+  }
+
+  return { chunks, chunkBounds };
+};
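
Usage sketch for the new helper, assuming only what the patch adds; the sample rows below are hypothetical stand-ins for the shaped organization records, and the import path mirrors the one vs_sync.ts uses:

import { chunkBySize } from '../tools/chunk';

// Hypothetical records standing in for shaped organizations.
const rows = Array.from({ length: 5000 }, (_, i) => ({
  acronym: `ORG-${i}`,
  name: `Organization ${i}`
}));

// 4194304 bytes = 4 MiB, the same per-request cap vs_sync.ts passes.
const { chunks, chunkBounds } = chunkBySize(rows, 4194304);

chunks.forEach((orgs, i) => {
  const { start, end } = chunkBounds[i];
  // `${start}-${end}` mirrors the 'x-cursor' header the task sends;
  // the ingest handler forwards it to s3Client.saveCSV in place of
  // the previous empty string.
  console.log(`chunk ${i}: ${orgs.length} rows, x-cursor=${start}-${end}`);
});

Because chunks are cut by serialized byte size rather than a fixed row count, each chunk's bounds are indices into the original array; tagging every upload with those bounds lets the receiving side key and de-duplicate CSVs deterministically even when chunk lengths vary with row width.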