LZ to DMZ Sync #709

Open
wants to merge 16 commits into base: develop
13 changes: 13 additions & 0 deletions backend/db-init/initmdl.sql
@@ -0,0 +1,13 @@
-- Create the user (role)
CREATE USER mdl_local WITH PASSWORD 'mini_data_lake';

SELECT CURRENT_USER;

-- Create the database and grant the new role full access
CREATE DATABASE mini_data_lake_local;
GRANT ALL PRIVILEGES ON DATABASE mini_data_lake_local TO mdl_local;

-- Connect to the new database so the schema and extension changes land there
\c mini_data_lake_local
ALTER SCHEMA public OWNER TO mdl_local;
ALTER DATABASE mini_data_lake_local OWNER TO mdl_local;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp" WITH SCHEMA public;

SET ROLE mdl_local;
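
For reference, the backend reaches this database through the MDL_* environment variables used in sync.ts below. A minimal connection check, assuming the local values mirror the init script above (mdl_local / mini_data_lake / mini_data_lake_local):

import { Client } from 'pg';

// Assumed local values; these must match initmdl.sql and your env config.
const client = new Client({
  user: process.env.MDL_USERNAME, // e.g. mdl_local
  host: process.env.MDL_HOST, // e.g. localhost
  database: process.env.MDL_DATABASE, // e.g. mini_data_lake_local
  password: process.env.MDL_PASSWORD // e.g. mini_data_lake
});

async function checkMdlConnection() {
  await client.connect();
  const { rows } = await client.query('SELECT current_user, current_database()');
  console.log(rows[0]); // expect mdl_local / mini_data_lake_local
  await client.end();
}

checkMdlConnection().catch(console.error);
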
24 changes: 12 additions & 12 deletions backend/package-lock.json

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions backend/package.json
@@ -115,6 +115,7 @@
"scan-exec": "docker compose exec -T backend npx ts-node src/tools/run-scanExecution.ts",
"syncdb": "docker compose exec -T backend npx ts-node src/tools/run-syncdb.ts",
"syncmdl": "docker compose exec -T backend npx ts-node src/tools/run-syncmdl.ts",
"syncnewmdl": "docker compose exec -T backend npx ts-node src/tools/run-syncnewmdl.ts",
"test": "jest --detectOpenHandles",
"test-python": "pytest"
},
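
The new syncnewmdl script points at src/tools/run-syncnewmdl.ts, which this diff does not show. A hypothetical skeleton, assuming it follows the same ts-node entry-point pattern as the sibling run-syncdb and run-syncmdl tools:

// Hypothetical skeleton only; the real run-syncnewmdl.ts is not in this diff.
async function main(): Promise<void> {
  // The real tool presumably drives the new MDL sync from here.
  console.log('Running new MDL sync...');
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
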
49 changes: 48 additions & 1 deletion backend/src/api/app.ts
@@ -10,6 +10,7 @@ import * as cves from './cves';
import * as domains from './domains';
import * as notifications from './notifications';
import * as search from './search';
import * as sync from './sync';
import * as vulnerabilities from './vulnerabilities';
import * as organizations from './organizations';
import * as scans from './scans';
@@ -30,6 +31,7 @@ import { Request, Response, NextFunction } from 'express';
import fetch from 'node-fetch';
import * as searchOrganizations from './organizationSearch';
import { Logger, RecordMessage } from '../tools/logger';
import { parse } from 'papaparse';

const sanitizer = require('sanitizer');

@@ -52,7 +54,10 @@ const handlerToExpress =
pathParameters: req.params,
query: req.query,
requestContext: req.requestContext,
body: JSON.stringify(req.body || '{}'),
body:
typeof req.body !== 'string'
? JSON.stringify(req.body || '{}')
: req.body,
headers: req.headers,
path: req.originalUrl
},
@@ -89,6 +94,39 @@ app.use(
})
); // limit 1000 requests per 15 minutes

app.use((req, res, next) => {
// Middleware to validate raw CSV bodies before they reach any handler
if (req.headers['content-type']?.startsWith('text/csv')) {
let data = '';

req.on('data', (chunk) => {
data += chunk;
});
req.on('end', () => {
const parsedData = parse(data, {
header: true,
skipEmptyLines: true
});

if (parsedData.errors.length > 0) {
return res
.status(400)
.json({ message: 'CSV Parsing Error', errors: parsedData.errors });
}
// The parse above is for validation only; the handler re-parses the raw CSV
req.body = data;
next();
});

req.on('error', () => {
res.status(500).json({ message: 'Error reading CSV data' });
});
} else {
next();
}
});

app.use(express.json({ strict: false }));
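
A quick way to exercise the CSV middleware above (the port is an assumption for local development; adjust to wherever the API listens). Malformed CSV should be rejected with a 400 before any handler runs, while valid CSV passes through as the raw string body:

// Requires Node 18+ for the global fetch; port 3000 is assumed here.
async function testCsvValidation() {
  const res = await fetch('http://localhost:3000/sync', {
    method: 'POST',
    headers: { 'Content-Type': 'text/csv' },
    body: '"unterminated,row\n' // malformed on purpose
  });
  console.log(res.status); // expect 400 with a CSV Parsing Error payload
}

testCsvValidation().catch(console.error);
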

// These CORS origins work in all Crossfeed environments
@@ -762,6 +800,15 @@ authenticatedRoute.post(
handlerToExpress(reports.list_reports)
);

if (process.env.IS_LOCAL) {
console.log('Unauthenticated local route');
app.post('/sync', handlerToExpress(sync.ingest));
} else {
authenticatedRoute.post('/sync', handlerToExpress(sync.ingest));
}

//Authenticated Registration Routes
authenticatedRoute.put(
'/users/:userId/register/approve',
2 changes: 1 addition & 1 deletion backend/src/api/domains.ts
@@ -233,7 +233,7 @@ export const export_ = wrapHandler(async (event) => {
res.products = Object.values(products).join(', ');
return res;
});
const url = await client.saveCSV(
const { url } = await client.saveCSV(
Papa.unparse({
fields: [
'name',
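
The destructuring change above implies S3Client.saveCSV now resolves to an object rather than a bare URL string; sync.ts below relies on the key property of the same result. The s3-client change itself is not part of this diff, so the shape assumed here is:

// Assumed return shape; the actual s3-client.ts diff is not shown here.
interface SaveCSVResult {
  key: string; // S3 object key, used by sync.ts to re-fetch the upload
  url: string; // download URL, used by the domains export above
}

declare function saveCSV(
  body: string,
  filename?: string,
  bucket?: string
): Promise<SaveCSVResult>;
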
175 changes: 175 additions & 0 deletions backend/src/api/sync.ts
@@ -0,0 +1,175 @@
import { parse } from 'papaparse';
import S3Client from '../tasks/s3-client';
import { createChecksum } from '../tools/csv-utils';
import { REGION_STATE_MAP, wrapHandler } from './helpers';
import { Client } from 'pg';
import { v4 } from 'uuid';
import { getCidrInfo } from '../tools/cidr-utils';

interface ShapedOrg {
networks: string[];
name: string;
report_types: string;
scan_types: string;
stakeholder: string;
retired: string;
period_start: string;
enrolled: string;
acronym: string;
country: string;
country_name: string;
state: string;
state_name: string;
state_fips: string;
county: string;
county_fips: string;
agency_type: string;
}
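
One CSV record, parsed the way the ingest handler below does it, shapes up roughly like this (values are illustrative only; the headers must match the ShapedOrg fields above):

import { parse } from 'papaparse';

// Illustrative row; note the quoted, comma-separated networks field.
const sample =
  'networks,name,report_types,scan_types,stakeholder,retired,period_start,enrolled,acronym,country,country_name,state,state_name,state_fips,county,county_fips,agency_type\n' +
  '"192.0.2.0/24,198.51.100.0/24",Example County,vuln_scan,cyhy,true,,,,EXCO,US,United States,VA,Virginia,51,Example,51001,County';

const parsed = parse<ShapedOrg>(sample, {
  header: true,
  transform: (v, f) => (f === 'networks' ? v.split(',') : v)
});
console.log(parsed.data[0].networks); // ['192.0.2.0/24', '198.51.100.0/24']
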

const persistOrgAndCidrs = async (client: Client, org: ShapedOrg) => {
// String#split returns a single-element array when no comma is present
const report_types = org.report_types.split(',');
const scan_types = org.scan_types.split(',');

try {
const insertOrgText = `
INSERT INTO public.organization (
id, name, report_types, scan_types, stakeholder, retired, acronym, country,
country_name, state, state_name, state_fips, county, county_fips, agency_type,
created_at, updated_at, ip_blocks, is_passive, enrolled_in_vs_timestamp,
period_start_vs_timestamp, region_id
)
VALUES (
uuid_generate_v4(), $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13,
$14, $15, $16, $17, $18, $19, $20, $21
)
RETURNING id;
`;
const result = await client.query(insertOrgText, [
org.name,
JSON.stringify(report_types),
JSON.stringify(scan_types),
org.stakeholder,
org.retired === '' ? false : org.retired,
org.acronym,
org.country,
org.country_name,
org.state,
org.state_name,
org.state_fips,
org.county,
org.county_fips,
org.agency_type,
'now', // Postgres accepts the special input 'now' for bound timestamp parameters; 'now()' is not valid input
'now',
'N/A',
'false',
org.enrolled === '' ? 'now' : org.enrolled,
org.period_start === '' ? 'now' : org.period_start,
REGION_STATE_MAP[org.state_name]
]);
const organizationId = result?.rows[0].id;

// Collect CIDR info and batch insert
const cidrValues = org.networks
.map((network) => {
const cidrInfo = getCidrInfo(network);
return cidrInfo
? `('${v4()}', '${cidrInfo.network}', '${cidrInfo.startIp}', '${
cidrInfo.endIp
}', now(), now())` // now() unquoted so Postgres evaluates it as a function
: null;
})
.filter(Boolean);

// Guard: an empty VALUES list is a SQL syntax error, so stop before the CIDR inserts
if (cidrValues.length === 0) return;

const insertCidrText = `
INSERT INTO public.cidr (id, network, start_ip, end_ip, created_date, updated_at)
VALUES ${cidrValues.join(', ')}
RETURNING id;
`;

const cidrResults = await client.query(insertCidrText);
const cidrIds = cidrResults.rows.map((row) => row.id);

// Batch insert CIDR-to-Organization links
const cidrOrgLinkValues = cidrIds
.map((id) => `('${id}', '${organizationId}')`)
.join(', ');

const insertCidrOrgLinkText = `
INSERT INTO public.cidr_organizations (cidr_id, organization_id)
VALUES ${cidrOrgLinkValues};
`;

await client.query(insertCidrOrgLinkText);
} catch (error) {
console.log(`Error while saving organization - ${org.name} ${error}`);
}
};
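
getCidrInfo comes from ../tools/cidr-utils, which this diff does not include. Based on how network, startIp, and endIp are used above, a rough IPv4-only sketch of the helper (the real implementation may differ, e.g. IPv6 handling):

// Rough IPv4-only sketch; not the actual cidr-utils implementation.
const getCidrInfoSketch = (cidr: string) => {
  const match = cidr.trim().match(/^(\d{1,3}(?:\.\d{1,3}){3})\/(\d{1,2})$/);
  if (!match) return null; // persistOrgAndCidrs filters these out
  const [, ip, prefixStr] = match;
  const prefix = Number(prefixStr);
  if (prefix > 32) return null;
  const toInt = (addr: string) =>
    addr.split('.').reduce((acc, octet) => (acc << 8) + Number(octet), 0) >>> 0;
  const toIp = (n: number) =>
    [24, 16, 8, 0].map((shift) => (n >>> shift) & 255).join('.');
  const mask = prefix === 0 ? 0 : (~0 << (32 - prefix)) >>> 0;
  const start = (toInt(ip) & mask) >>> 0;
  const end = (start | (~mask >>> 0)) >>> 0;
  return { network: cidr.trim(), startIp: toIp(start), endIp: toIp(end) };
};

console.log(getCidrInfoSketch('192.0.2.0/24'));
// { network: '192.0.2.0/24', startIp: '192.0.2.0', endIp: '192.0.2.255' }
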

export const ingest = wrapHandler(async (event) => {
console.time('IngestTimer');
const originalChecksum = event.headers['x-checksum'];
const newChecksum = event.body ? createChecksum(event.body) : '';
const csvData = event.body;

if (originalChecksum === newChecksum) {
// Checksums match, upload the file to S3
let uploadKey: string = '';
const s3Client = new S3Client(false);
if (csvData) {
try {
const { key } = await s3Client.saveCSV(
csvData,
'',
'crossfeed-lz-sync'
);
uploadKey = key;
console.log('Uploaded CSV data to S3');
} catch (error) {
console.error(`Error occurred pushing data to S3: ${error}`);
}
try {
const data = await s3Client.getObject(uploadKey, 'crossfeed-lz-sync');
const fileContents = (await data?.promise())?.Body?.toString('utf-8');
if (fileContents) {
const parsed = parse<ShapedOrg>(fileContents, {
header: true,
transform: (v, f) => {
if (f === 'networks') {
return v.split(',');
}
return v;
}
});
const client = new Client({
user: process.env.MDL_USERNAME,
host: process.env.MDL_HOST,
database: process.env.MDL_DATABASE,
password: process.env.MDL_PASSWORD
});
await client.connect();

const persists = parsed.data.map((org) => {
return persistOrgAndCidrs(client, org);
});

await Promise.all(persists);
await client.end(); // Release the pg connection once all inserts settle
console.timeEnd('IngestTimer');
} else {
console.log('File contents empty');
}
} catch (error) {
console.error(`Error occurred fetching object from S3: ${error} `);
}
}
} else {
console.log('Checksum mismatch; skipping S3 upload and ingest');
}

return {
statusCode: 200,
body: ''
};
});
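
An end-to-end sketch of calling the new endpoint. createChecksum lives in ../tools/csv-utils, which is not shown in this diff; a SHA-256 hex digest is assumed below, so if the real helper computes something else, the x-checksum header must match that instead:

import { createHash } from 'crypto';

// Assumption: createChecksum is a SHA-256 hex digest of the raw CSV body.
const checksum = (body: string) =>
  createHash('sha256').update(body).digest('hex');

// Requires Node 18+ global fetch; the local port is an assumption.
async function pushToSync(csv: string) {
  const res = await fetch('http://localhost:3000/sync', {
    method: 'POST',
    headers: {
      'Content-Type': 'text/csv',
      'x-checksum': checksum(csv)
    },
    body: csv
  });
  console.log(res.status); // 200 once checksums agree and ingest runs
}
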
3 changes: 2 additions & 1 deletion backend/src/tasks/ecs-client.ts
@@ -116,7 +116,8 @@ class ECSClient {
`AWS_ACCESS_KEY_ID=${process.env.AWS_ACCESS_KEY_ID}`,
`AWS_SECRET_ACCESS_KEY=${process.env.AWS_SECRET_ACCESS_KEY}`,
`LG_API_KEY=${process.env.LG_API_KEY}`,
`LG_WORKSPACE_NAME=${process.env.LG_WORKSPACE_NAME}`
`LG_WORKSPACE_NAME=${process.env.LG_WORKSPACE_NAME}`,
`DMZ_API_KEY=${process.env.DMZ_API_KEY}`
]
} as any);
await container.start();