diff --git a/backend/src/models/connection.ts b/backend/src/models/connection.ts
index a5147ebf..8f2cfdbf 100644
--- a/backend/src/models/connection.ts
+++ b/backend/src/models/connection.ts
@@ -53,6 +53,59 @@
 let connection: Connection | null = null;
 
 let dl_connection: Connection | null = null;
+let dl2_connection: Connection | null = null;
+
+const connectDl2 = async (logging?: boolean) => {
+  const dl2_connection = createConnection({
+    type: 'postgres',
+    host: process.env.DB_HOST,
+    port: parseInt(process.env.DB_PORT ?? ''),
+    username: process.env.MDL_USERNAME,
+    password: process.env.MDL_PASSWORD,
+    database: process.env.MDL_NAME,
+    entities: [
+      CertScan,
+      Cidr,
+      Contact,
+      DL_Cpe,
+      DL_Cve,
+      DL_Domain,
+      HostScan,
+      Host,
+      Ip,
+      Kev,
+      Location,
+      DL_Organization,
+      PortScan,
+      PrecertScan,
+      Report,
+      Sector,
+      Snapshot,
+      SslyzeScan,
+      Tag,
+      Tally,
+      TicketEvent,
+      Ticket,
+      TrustymailScan,
+      VulnScan
+    ],
+    synchronize: false,
+    name: 'default2',
+    dropSchema: false,
+    logging: logging ?? false,
+    cache: true
+  });
+  return dl2_connection;
+};
+
+export const connectToDatalake2 = async (logging?: boolean) => {
+  if (!dl2_connection?.isConnected) {
+    console.log('Connecting to datalake2');
+    dl2_connection = await connectDl2(logging);
+  }
+  return dl2_connection;
+};
+
 const connectDl = async (logging?: boolean) => {
   // process.env.DB_HOST = 'db';
   // process.env.MDL_USERNAME = 'mdl';
@@ -108,8 +161,6 @@ export const connectToDatalake = async (logging?: boolean) => {
   if (!dl_connection?.isConnected) {
     console.log('Connected to datalake');
     dl_connection = await connectDl(logging);
-  } else {
-    console.log("didn't connect");
   }
   return dl_connection;
 };
diff --git a/backend/src/models/mini_data_lake/ticket_events.ts b/backend/src/models/mini_data_lake/ticket_events.ts
index e9d662fa..3c339d8c 100644
--- a/backend/src/models/mini_data_lake/ticket_events.ts
+++ b/backend/src/models/mini_data_lake/ticket_events.ts
@@ -6,21 +6,22 @@
 import {
   Column,
   PrimaryGeneratedColumn,
   BaseEntity,
-  ManyToOne
+  ManyToOne,
+  Unique
 } from 'typeorm';
 import { Ticket } from './tickets';
 import { VulnScan } from './vuln_scans';
 
 @Entity()
+@Unique(['eventTimestamp', 'ticket', 'action'])
 export class TicketEvent extends BaseEntity {
   @PrimaryGeneratedColumn('uuid')
   id: string;
 
   @Column({
     nullable: true,
-    type: 'varchar',
-    unique: true
+    type: 'varchar'
   })
   reference: string | null;
 
diff --git a/backend/src/tasks/helpers/saveHost.ts b/backend/src/tasks/helpers/saveHost.ts
new file mode 100644
index 00000000..77c52eb0
--- /dev/null
+++ b/backend/src/tasks/helpers/saveHost.ts
@@ -0,0 +1,28 @@
+import { plainToClass } from 'class-transformer';
+import { Host, connectToDatalake } from '../../models';
+
+export default async (host: Host): Promise<string> => {
+  console.log(`Starting to save host ${host.ipString} to datalake`);
+  await connectToDatalake();
+  const hostUpdatedValues = Object.keys(host)
+    .map((key) => {
+      if (['id'].indexOf(key) > -1) return '';
+      else if (key === 'organization') return 'organizationId';
+      else if (key === 'ip') return 'ipId';
+      return host[key] !== null ? key : '';
+    })
+    .filter((key) => key !== '');
+  const host_id: string = (
+    await Host.createQueryBuilder()
+      .insert()
+      .values(host)
+      .orUpdate({
+        conflict_target: ['id'],
+        overwrite: hostUpdatedValues
+      })
+      .returning('id')
+      .execute()
+  ).identifiers[0].id;
+
+  return host_id;
+};
diff --git a/backend/src/tasks/helpers/saveIpToMdl.ts b/backend/src/tasks/helpers/saveIpToMdl.ts
index 70a35ee3..fd42b52f 100644
--- a/backend/src/tasks/helpers/saveIpToMdl.ts
+++ b/backend/src/tasks/helpers/saveIpToMdl.ts
@@ -2,7 +2,7 @@ import { plainToClass } from 'class-transformer';
 import { Ip, connectToDatalake } from '../../models';
 
 export default async (ipObj: Ip): Promise<string> => {
-  console.log('Starting to save IP to datalake');
+  console.log(`Starting to save IP to datalake: ${ipObj.ip}`);
   await connectToDatalake();
   const ipUpdatedValues = Object.keys(ipObj)
     .map((key) => {
diff --git a/backend/src/tasks/helpers/saveOrganizationToMdl.ts b/backend/src/tasks/helpers/saveOrganizationToMdl.ts
index 4980af7c..06cf6311 100644
--- a/backend/src/tasks/helpers/saveOrganizationToMdl.ts
+++ b/backend/src/tasks/helpers/saveOrganizationToMdl.ts
@@ -11,7 +11,7 @@ export default async (
   cidrs: Cidr[],
   location: Location | null
 ): Promise<string> => {
-  console.log('Starting to save Org to datalake');
+  console.log(`Saving org ${organization.acronym} to datalake`);
   await connectToDatalake();
 
   const cidr_entities: Cidr[] = [];
diff --git a/backend/src/tasks/helpers/saveTicket.ts b/backend/src/tasks/helpers/saveTicket.ts
new file mode 100644
index 00000000..29499f04
--- /dev/null
+++ b/backend/src/tasks/helpers/saveTicket.ts
@@ -0,0 +1,35 @@
+import { plainToClass } from 'class-transformer';
+import {
+  Ticket,
+  DL_Organization,
+  Cidr,
+  Location,
+  connectToDatalake
+} from '../../models';
+
+export default async (ticket: Ticket): Promise<string> => {
+  console.log(`Starting to save Ticket to datalake`);
+  await connectToDatalake();
+  const ticketUpdatedValues = Object.keys(ticket)
+    .map((key) => {
+      if (['id'].indexOf(key) > -1) return '';
+      else if (key === 'organization') return 'organizationId';
+      else if (key === 'ip') return 'ipId';
+      else if (key === 'cve') return 'cveId';
+      return ticket[key] !== null ? key : '';
+    })
+    .filter((key) => key !== '');
+  const ticket_id: string = (
+    await Ticket.createQueryBuilder()
+      .insert()
+      .values(ticket)
+      .orUpdate({
+        conflict_target: ['id'],
+        overwrite: ticketUpdatedValues
+      })
+      .returning('id')
+      .execute()
+  ).identifiers[0].id;
+
+  return ticket_id;
+};
diff --git a/backend/src/tasks/helpers/saveTicketEvent.ts b/backend/src/tasks/helpers/saveTicketEvent.ts
new file mode 100644
index 00000000..67cd4017
--- /dev/null
+++ b/backend/src/tasks/helpers/saveTicketEvent.ts
@@ -0,0 +1,32 @@
+import { plainToClass } from 'class-transformer';
+import {
+  TicketEvent,
+  DL_Organization,
+  Cidr,
+  Location,
+  connectToDatalake
+} from '../../models';
+
+export default async (ticket_event: TicketEvent): Promise<string> => {
+  await connectToDatalake();
+  const ticketEventUpdatedValues = Object.keys(ticket_event)
+    .map((key) => {
+      if (['eventTimestamp', 'action', 'ticket'].indexOf(key) > -1) return '';
+      else if (key === 'vulnScan') return 'vulnScanId';
+      return ticket_event[key] !== null ? 
key : ''; + }) + .filter((key) => key !== ''); + const ticket_event_id: string = ( + await TicketEvent.createQueryBuilder() + .insert() + .values(ticket_event) + .orUpdate({ + conflict_target: ['eventTimestamp', 'action', 'ticketId'], + overwrite: ticketEventUpdatedValues + }) + .returning('id') + .execute() + ).identifiers[0].id; + + return ticket_event_id; +}; diff --git a/backend/src/tasks/syncmdl.ts b/backend/src/tasks/syncmdl.ts index c405822f..5c9ac128 100644 --- a/backend/src/tasks/syncmdl.ts +++ b/backend/src/tasks/syncmdl.ts @@ -1,5 +1,5 @@ import { Handler } from 'aws-lambda'; -import { connectToDatalake, connectToDatabase } from '../models'; +import { connectToDatalake2, connectToDatabase } from '../models'; export const handler: Handler = async (event) => { const connection = await connectToDatabase(); @@ -42,7 +42,7 @@ export const handler: Handler = async (event) => { ); } - const mdl_connection = await connectToDatalake(true); + const mdl_connection = await connectToDatalake2(true); const type = event?.type || event; const dangerouslyforce = type === 'dangerouslyforce'; diff --git a/backend/src/tasks/vs_sync.ts b/backend/src/tasks/vs_sync.ts index 57dbddcf..b56bd042 100644 --- a/backend/src/tasks/vs_sync.ts +++ b/backend/src/tasks/vs_sync.ts @@ -5,15 +5,21 @@ import { Location, Sector, VulnScan, + Host, Ip, - DL_Cve + DL_Cve, + Ticket, + TicketEvent } from '../models'; const { Client } = require('pg'); import { CommandOptions } from './ecs-client'; import saveVulnScan from './helpers/saveVulnScan'; import saveOrganizationToMdl from './helpers/saveOrganizationToMdl'; +import saveHost from './helpers/saveHost'; import saveIpToMdl from './helpers/saveIpToMdl'; import saveCveToMdl from './helpers/saveCveToMdl'; +import saveTicket from './helpers/saveTicket'; +import saveTicketEvent from './helpers/saveTicketEvent'; import { plainToClass } from 'class-transformer'; /** Removes a value for a given key from the dictionary and then returns it. 
*/
@@ -86,7 +92,6 @@ export const handler = async (commandOptions: CommandOptions) => {
 
       // Sectors don't have types can use the type property to determine if its a sector
       if (!request.agency.hasOwnProperty('type')) {
-        console.log('In sector section');
         // Do Sector Category and Tag logic here
         // If the sector is in the non_sector_list skip it, it doesn't link to any orgs just other sectors
         if (non_sector_list.includes(request._id)) {
@@ -135,7 +140,6 @@
         Array.isArray(request.children) &&
         request.children.length > 0
       ) {
-        console.log('in parentChild link section');
         parent_child_dict[request._id] = request.children;
       }
       // Loop through the networks and create network objects
@@ -229,7 +233,6 @@
 
       // Do the same thing to link Sectors and Orgs
       for (const [key, value] of Object.entries(sector_child_dict)) {
-        console.log(`Key: ${key}, Value: ${value}`);
         const sector_promise = await Sector.findOne(key, {
           relations: ['organizations']
         });
@@ -256,7 +259,7 @@
         }
       }
     }
   } catch (error) {
-    console.error('Error reading requests file:', error);
+    console.error('Error reading requests:', error);
     throw error;
   }
@@ -276,7 +279,7 @@
     vulnScansArray = result.rows;
   } catch (error) {
     console.error(
-      'Error connecting to redshift and getting requests data:',
+      'Error connecting to redshift and getting vuln_scan data:',
       error
     );
     return error;
@@ -382,4 +385,185 @@
     console.error('Error parsing data', error);
     throw error;
   }
+  let hostsArray;
+  try {
+    const startTime = Date.now();
+    const query =
+      'SELECT * FROM vmtableau.hosts WHERE last_change >= DATE_SUB(NOW(), INTERVAL 2 DAY);';
+    const result = await client.query(query);
+    const endTime = Date.now();
+    const durationMs = endTime - startTime;
+    const durationSeconds = Math.round(durationMs / 1000);
+    console.log(
+      `[Redshift] [${durationMs}ms] [${durationSeconds}s] [${result.rows.length.toLocaleString()} records] ${query}`
+    );
+    hostsArray = result.rows;
+  } catch (error) {
+    console.error(
+      'Error connecting to redshift and getting hosts data:',
+      error
+    );
+    return error;
+  }
+
+  try {
+    if (hostsArray && Array.isArray(hostsArray)) {
+      const host_list: Host[] = [];
+      for (const host of hostsArray) {
+        host.latest_scan = JSON.parse(host.latest_scan);
+        host.loc = JSON.parse(host.loc);
+        host.state = JSON.parse(host.state);
+        let ip_id: string | null = null;
+        if (!(host.owner in org_id_dict)) {
+          console.log(
+            `${host.owner} is not a recognized organization, skipping host.`
+          );
+          continue;
+        }
+        if (host.ip != null) {
+          ip_id = await saveIpToMdl(
+            plainToClass(Ip, {
+              ip: host.ip,
+              organization: { id: org_id_dict[host.owner] }
+            })
+          );
+        }
+        const hostObj: Host = plainToClass(Host, {
+          id: host._id,
+          ipString: host.ip,
+          ip: ip_id == null ? 
null : { id: ip_id }, + updatedTimestamp: host.last_change, + latestNetscan1Timestamp: host.latest_scan.NETSCAN1, + latestNetscan2Timestamp: host.latest_scan.NETSCAN2, + latestVulnscanTimestamp: host.latest_scan.VULNSCAN, + latestPortscanTimestamp: host.latest_scan.PORTSCAN, + latestScanCompletionTimestamp: host.latest_scan.DONE, + locationLongitude: host.loc[1], + locationLatitude: host.loc[0], + priority: host.priority, + nextScanTimestamp: host.next_scan, + rand: host.r, + currStage: host.stage, + hostLive: host.state.up, + hostLiveReason: host.state.reason, + status: host.status, + organization: { id: org_id_dict[host.owner] } + }); + + await saveHost(hostObj); + } + } + } catch (error) { + console.error('Error saving hosts:', error); + throw error; + } + + let ticketsArray; + try { + const startTime = Date.now(); + const query = + 'SELECT * FROM vmtableau.tickets WHERE last_change >= DATE_SUB(NOW(), INTERVAL 2 DAY);'; + const result = await client.query(query); + const endTime = Date.now(); + const durationMs = endTime - startTime; + const durationSeconds = Math.round(durationMs / 1000); + console.log( + `[Redshift] [${durationMs}ms] [${durationSeconds}s] [${result.rows.length.toLocaleString()} records] ${query}` + ); + ticketsArray = result.rows; + } catch (error) { + console.error( + 'Error connecting to redshift and getting tickets data:', + error + ); + return error; + } + + try { + if (ticketsArray && Array.isArray(ticketsArray)) { + const ticket_list: Ticket[] = []; + for (const ticket of ticketsArray) { + ticket.details = JSON.parse(ticket.details); + ticket.events = JSON.parse(ticket.events); + ticket.loc = JSON.parse(ticket.loc); + + let ip_id: string | null = null; + if (ticket.ip != null) { + ip_id = await saveIpToMdl( + plainToClass(Ip, { + ip: ticket.ip, + organization: { id: org_id_dict[ticket.owner] } + }) + ); + } + + let cve_id: string | null = null; + if (ticket.details.cve != null) { + cve_id = await saveCveToMdl( + plainToClass(DL_Cve, { + name: ticket.details.cve + }) + ); + } + + const ticketObj: Ticket = plainToClass(Ticket, { + id: ticket._id.replace("ObjectId('", '').replace("')", ''), + cveString: ticket.details.cve, + cve: cve_id == null ? null : { id: cve_id }, + cvss_base_score: ticket.details.cvss_base_score, + cvss_version: null, + // TODO Link Kev once they are added + kev: null, + vulnName: ticket.details.name, + cvssScoreSource: ticket.details.score_source, + cvssSeverity: ticket.details.severity, + vprScore: null, + falsePositive: ticket.false_positive, + ipString: ticket.ip, + ip: ip_id == null ? null : { id: ip_id }, + updatedTimestamp: ticket.last_change, + locationLongitude: ticket.loc[1], + locationLatitude: ticket.loc[0], + foundInLatestHostScan: ticket.open, + organization: { id: org_id_dict[ticket.owner] }, + vulnPort: ticket.port, + portProtocol: ticket.protocol, + snapshotsBool: + ticket.snapshots == null || ticket.snapshots.length > 0 + ? false + : true, + vulnSource: ticket.source, + vulnSourceId: ticket.source_id, + closedTimestamp: ticket.time_closed, + openedTimestamp: ticket.time_opened + + // TODO link snapshots, once added + // snapshots: Snapshot[]; + }); + + const ticket_id = await saveTicket(ticketObj); + for (const event of ticket.events ?? 
[]) {
+          try {
+            await saveTicketEvent(
+              plainToClass(TicketEvent, {
+                reference: event.reference,
+                vulnScan: { id: event.reference },
+                action: event.action,
+                reason: event.reason,
+                eventTimestamp: event.time,
+                delta: event.delta,
+                ticket: { id: ticket_id }
+              })
+            );
+          } catch (error) {
+            console.error('Invalid event, could not save:', error.message);
+            continue;
+          }
+        }
+      }
+    }
+  } catch (error) {
+    console.error('Error saving tickets:', error);
+    throw error;
+  }
 };
diff --git a/backend/worker/requirements.txt b/backend/worker/requirements.txt
index c15afd3f..db5e4877 100644
--- a/backend/worker/requirements.txt
+++ b/backend/worker/requirements.txt
@@ -7,7 +7,7 @@ dnstwist==20230509
 docopt==0.6.2
 git+https://github.com/LeapBeyond/scrubadub.git@d0e12c5d922631af3532d044196b05fb1b7c8c1c
 git+https://github.com/mitmproxy/mitmproxy@e0e46f4
-idna==3.4
+# idna==3.4
 joblib==1.2.0
 mitmproxy_wireguard==0.1.23
 numpy==1.24.3
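
Note for reviewers: the new saveHost, saveTicket, and saveTicketEvent helpers all follow the same upsert pattern as the existing saveIpToMdl and saveOrganizationToMdl helpers in this patch: connect to the data lake, build the entity with plainToClass, insert with createQueryBuilder().insert().orUpdate(...), and return the row id. Below is a rough usage sketch for a task under backend/src/tasks; the function name and the literal field values are illustrative placeholders, not taken from the patch.

import { plainToClass } from 'class-transformer';
import { Ticket, TicketEvent } from '../models';
import saveTicket from './helpers/saveTicket';
import saveTicketEvent from './helpers/saveTicketEvent';

// Hypothetical caller (not part of this patch): upsert one ticket, then
// attach a single event to it, mirroring the loop added in vs_sync.ts.
const syncExampleTicket = async () => {
  const ticketObj: Ticket = plainToClass(Ticket, {
    id: 'example-ticket-id', // placeholder id
    vulnName: 'Example finding', // placeholder value
    falsePositive: false
  });
  // saveTicket upserts on the `id` conflict target and returns the row id.
  const ticket_id = await saveTicket(ticketObj);

  // saveTicketEvent upserts on (eventTimestamp, action, ticketId), matching
  // the new @Unique constraint on TicketEvent.
  await saveTicketEvent(
    plainToClass(TicketEvent, {
      action: 'opened', // placeholder action
      eventTimestamp: new Date(),
      ticket: { id: ticket_id }
    })
  );
};

Because the ticket-event conflict target is (eventTimestamp, action, ticketId), re-running the sync should be idempotent for events that have already been recorded.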