Skip to content

Commit

Permalink
Chunk by data size
Browse files Browse the repository at this point in the history
  • Loading branch information
Janson Bunce committed Nov 14, 2024
1 parent afad920 commit 54992a6
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 9 deletions.
3 changes: 2 additions & 1 deletion backend/src/api/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ async function processOrganizations(client: Client, organizations) {

export const ingest = wrapHandler(async (event) => {
const originalChecksum = event.headers['x-checksum'];
const cursor = event.headers['x-cursor'];
const newChecksum = event.body ? createChecksum(event.body) : '';
const csvData = event.body;

Expand All @@ -312,7 +313,7 @@ export const ingest = wrapHandler(async (event) => {
try {
const { key } = await s3Client.saveCSV(
csvData,
'',
cursor,
process.env.IS_LOCAL ? 'crossfeed-local-exports' : 'crossfeed-lz-sync'
);
uploadKey = key;
Expand Down
17 changes: 9 additions & 8 deletions backend/src/tasks/vs_sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import { plainToClass } from 'class-transformer';
import savePortScan from './helpers/savePortScan';
import axios from 'axios';
import { createChecksum } from '../tools/csv-utils';
// import { TEST_DATA } from './REMOVE_ME';
import { getRepository } from 'typeorm';
import { unparse } from 'papaparse';
import { chunk } from 'lodash';
import saveOrganizationToMdl from './helpers/saveOrganizationToMdl';
import { chunkBySize } from '../tools/chunk';

/** Removes a value for a given key from the dictionary and then returns it. */
function getValueAndDelete<T>(
Expand Down Expand Up @@ -61,9 +60,7 @@ export const handler = async (commandOptions: CommandOptions) => {
await client.connect();
const startTime = Date.now();
const query = 'SELECT * FROM vmtableau.requests;';
// const query = 'SELECT * FROM organization;';
const result = await client.query(query);
// const result = { rows: TEST_DATA };
const endTime = Date.now();
const durationMs = endTime - startTime;
const durationSeconds = Math.round(durationMs / 1000);
Expand Down Expand Up @@ -278,7 +275,8 @@ export const handler = async (commandOptions: CommandOptions) => {
await connectToDatalake();
const orgRepository = getRepository(DL_Organization);
const organizations = await orgRepository.find({
relations: ['location', 'sectors', 'cidrs', 'parent', 'children']
relations: ['location', 'sectors', 'cidrs', 'parent', 'children'],
order: { acronym: 'DESC' }
});

const shapedOrganizations = organizations.map((item) => {
Expand All @@ -292,10 +290,12 @@ export const handler = async (commandOptions: CommandOptions) => {
};
});

const chunkedOrgs = chunk(shapedOrganizations, 1000);
const { chunks, chunkBounds } = chunkBySize(shapedOrganizations, 4194304);

const requests = chunkedOrgs.map((orgs) => {
const requests = chunks.map((orgs, i) => {
const csvRows = unparse(orgs, { header: true });
const start = chunkBounds[i].start;
const end = chunkBounds[i].end;
const checksum = createChecksum(csvRows);
console.log(
`Posting CSV file with ${orgs.length} organizations to /sync`
Expand All @@ -306,7 +306,8 @@ export const handler = async (commandOptions: CommandOptions) => {
headers: {
'Content-Type': 'text/csv',
Authorization: process.env.DMZ_API_KEY,
'x-checksum': checksum
'x-checksum': checksum,
'x-cursor': `${start}-${end}`
},
data: csvRows
});
Expand Down
42 changes: 42 additions & 0 deletions backend/src/tools/chunk.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
 * Splits `array` into consecutive chunks whose cumulative serialized size
 * stays at or below `maxSize` bytes.
 *
 * Size is estimated per item as the UTF-8 byte length of `JSON.stringify(item)`.
 * NOTE(review): callers in this commit post the chunks as CSV (papaparse
 * `unparse`), so this is an approximation of the real payload size — confirm
 * the JSON estimate is an acceptable upper bound for the CSV encoding.
 *
 * @param array   Items to partition (order is preserved).
 * @param maxSize Maximum allowed chunk size in bytes.
 * @returns `chunks` — the partitioned items — and `chunkBounds`, the inclusive
 *          start/end indices of each chunk within the original `array`.
 * @throws Error if any single item's serialized size exceeds `maxSize`
 *         (such an item could never fit in any chunk).
 */
export const chunkBySize = <T>(
  array: T[],
  maxSize: number
): { chunks: T[][]; chunkBounds: { start: number; end: number }[] } => {
  const chunks: T[][] = [];
  const chunkBounds: { start: number; end: number }[] = [];
  let currentChunk: T[] = [];
  let currentSize = 0;
  let startIndex = 0;

  const calculateSize = (item: T): number => {
    return Buffer.byteLength(JSON.stringify(item), 'utf8');
  };

  for (let i = 0; i < array.length; i++) {
    const item = array[i];
    const itemSize = calculateSize(item);

    // An item that alone exceeds the limit can never fit in any chunk.
    // (Fix: the original only threw when the item happened to fall at the
    // start of a chunk; mid-chunk oversized items were silently emitted in
    // an oversized chunk of their own.)
    if (itemSize > maxSize) {
      throw new Error(
        `Item size (${itemSize} bytes) exceeds the maximum chunk size (${maxSize} bytes).`
      );
    }

    // Adding this item would overflow the current chunk: flush it first.
    if (currentSize + itemSize > maxSize) {
      chunks.push(currentChunk);
      chunkBounds.push({ start: startIndex, end: i - 1 });
      currentChunk = [];
      currentSize = 0;
      startIndex = i;
    }

    currentChunk.push(item);
    currentSize += itemSize;
  }

  // Flush the trailing partial chunk, if any.
  if (currentChunk.length > 0) {
    chunks.push(currentChunk);
    chunkBounds.push({ start: startIndex, end: array.length - 1 });
  }

  return { chunks, chunkBounds };
};

0 comments on commit 54992a6

Please sign in to comment.