diff --git a/backend/env.yml b/backend/env.yml index 8c062466..013e1c23 100644 --- a/backend/env.yml +++ b/backend/env.yml @@ -50,17 +50,13 @@ staging: REPORTS_BUCKET_NAME: cisa-crossfeed-staging-reports CLOUDWATCH_BUCKET_NAME: cisa-crossfeed-staging-cloudwatch STAGE: staging - PE_CLUSTER_NAME: pe-staging-worker + PE_FARGATE_CLUSTER_NAME: pe-staging-worker + PE_FARGATE_TASK_DEFINITION_NAME: pe-staging-worker SHODAN_QUEUE_URL: ${ssm:/crossfeed/staging/SHODAN_QUEUE_URL} - SHODAN_SERVICE_NAME: pe-staging-shodan DNSTWIST_QUEUE_URL: ${ssm:/crossfeed/staging/DNSTWIST_QUEUE_URL} - DNSTWIST_SERVICE_NAME: pe-staging-dnstwist HIBP_QUEUE_URL: ${ssm:/crossfeed/staging/HIBP_QUEUE_URL} - HIBP_SERVICE_NAME: pe-staging-hibp INTELX_QUEUE_URL: ${ssm:/crossfeed/staging/INTELX_QUEUE_URL} - INTELX_SERVICE_NAME: pe-staging-intelx CYBERSIXGILL_QUEUE_URL: ${ssm:/crossfeed/staging/CYBERSIXGILL_QUEUE_URL} - CYBERSIXGILL_SERVICE_NAME: pe-staging-cybersixgill EMAIL_BUCKET_NAME: cisa-crossfeed-staging-html-email prod: @@ -103,9 +99,13 @@ prod: REPORTS_BUCKET_NAME: cisa-crossfeed-prod-reports CLOUDWATCH_BUCKET_NAME: cisa-crossfeed-prod-cloudwatch STAGE: prod - PE_CLUSTER_NAME: pe-prod-worker + PE_FARGATE_CLUSTER_NAME: pe-prod-worker + PE_FARGATE_TASK_DEFINITION_NAME: pe-prod-worker SHODAN_QUEUE_URL: ${ssm:/crossfeed/prod/SHODAN_QUEUE_URL} - SHODAN_SERVICE_NAME: pe-prod-shodan + DNSTWIST_QUEUE_URL: ${ssm:/crossfeed/prod/DNSTWIST_QUEUE_URL} + HIBP_QUEUE_URL: ${ssm:/crossfeed/prod/HIBP_QUEUE_URL} + INTELX_QUEUE_URL: ${ssm:/crossfeed/prod/INTELX_QUEUE_URL} + CYBERSIXGILL_QUEUE_URL: ${ssm:/crossfeed/prod/CYBERSIXGILL_QUEUE_URL} EMAIL_BUCKET_NAME: cisa-crossfeed-staging-html-email dev-vpc: diff --git a/backend/package.json b/backend/package.json index d9e669fe..bc563f50 100644 --- a/backend/package.json +++ b/backend/package.json @@ -111,6 +111,8 @@ "lint": "eslint '**/*.{ts,tsx,js,jsx}'", "lint:fix": "eslint '**/*.{ts,tsx,js,jsx}' --fix", "pesyncdb": "docker-compose exec -T backend npx ts-node src/tools/run-pesyncdb.ts", + "scan-exec": "docker-compose exec -T backend npx ts-node src/tools/run-scanExecution.ts", + "send-message": "node sendMessage.js", "syncdb": "docker-compose exec -T backend npx ts-node src/tools/run-syncdb.ts", "syncmdl": "docker-compose exec -T backend npx ts-node src/tools/run-syncmdl.ts", "test": "jest --detectOpenHandles", diff --git a/backend/sendMessage.js b/backend/sendMessage.js index 6e875286..e868adb8 100644 --- a/backend/sendMessage.js +++ b/backend/sendMessage.js @@ -1,19 +1,18 @@ // sendMessage.js const amqp = require('amqplib'); -async function sendMessageToControlQueue(message) { +async function sendMessageToQueue(message, queue) { const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); - const controlQueue = 'ControlQueue'; - await channel.assertQueue(controlQueue, { durable: true }); + await channel.assertQueue(queue, { durable: true }); - // Simulate sending a message to the ControlQueue - channel.sendToQueue(controlQueue, Buffer.from(JSON.stringify(message)), { + // Simulate sending a message to the queue + channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), { persistent: true }); - console.log('Message sent to ControlQueue:', message); + console.log('Message sent:', message); setTimeout(() => { connection.close(); @@ -22,7 +21,12 @@ async function sendMessageToControlQueue(message) { // Simulate sending a message const message = { - scriptType: 'shodan', + scriptType: 'dnstwist', org: 'DHS' }; 
-sendMessageToControlQueue(message); +const queue = 'dnstwistQueue'; +sendMessageToQueue(message, queue); +sendMessageToQueue(message, queue); +sendMessageToQueue(message, queue); +sendMessageToQueue(message, queue); +sendMessageToQueue(message, queue); diff --git a/backend/serverless.yml b/backend/serverless.yml index b94a238f..caf161d3 100644 --- a/backend/serverless.yml +++ b/backend/serverless.yml @@ -108,13 +108,6 @@ provider: resources: Resources: - WorkerControlQueue: - Type: AWS::SQS::Queue - Properties: - QueueName: ${self:provider.stage}-worker-control-queue - VisibilityTimeout: 300 # Should match or exceed function timeout - MaximumMessageSize: 262144 # 256 KB - MessageRetentionPeriod: 604800 # 7 days ShodanQueue: Type: AWS::SQS::Queue Properties: diff --git a/backend/src/tasks/ecs-client.ts b/backend/src/tasks/ecs-client.ts index 3a4a88ef..ff9efc1e 100644 --- a/backend/src/tasks/ecs-client.ts +++ b/backend/src/tasks/ecs-client.ts @@ -81,7 +81,7 @@ class ECSClient { // In order to use the host name "db" to access the database from the // crossfeed-worker image, we must launch the Docker container with // the Crossfeed backend network. - NetworkMode: 'crossfeed_backend', + NetworkMode: 'xfd_backend', Memory: 4000000000 // Limit memory to 4 GB. We do this locally to better emulate fargate memory conditions. TODO: In the future, we could read the exact memory from SCAN_SCHEMA to better emulate memory requirements for each scan. }, Env: [ diff --git a/backend/src/tasks/functions.yml b/backend/src/tasks/functions.yml index 3109c32f..bd5ee23b 100644 --- a/backend/src/tasks/functions.yml +++ b/backend/src/tasks/functions.yml @@ -39,17 +39,10 @@ checkUserExpiration: handler: src/tasks/checkUserExpiration.handler events: - schedule: cron(0 0 * * ? 
*) # Runs every day at midnight + scanExecution: + timeout: 900 # 15 minutes handler: src/tasks/scanExecution.handler - timeout: 300 # 5 minutes - environment: - SQS_QUEUE_NAME: ${self:provider.stage}-worker-control-queue - events: - - sqs: - arn: - Fn::GetAtt: - - WorkerControlQueue - - Arn memorySize: 4096 updateScanTaskStatus: diff --git a/backend/src/tasks/scanExecution.ts b/backend/src/tasks/scanExecution.ts index dd21893c..3bbfa35e 100644 --- a/backend/src/tasks/scanExecution.ts +++ b/backend/src/tasks/scanExecution.ts @@ -1,101 +1,87 @@ -import { Handler, SQSRecord } from 'aws-lambda'; +import { Handler } from 'aws-lambda'; import * as AWS from 'aws-sdk'; import { integer } from 'aws-sdk/clients/cloudfront'; -import { connect } from 'amqplib'; const ecs = new AWS.ECS(); -const sqs = new AWS.SQS(); -let docker; +let docker: any; + if (process.env.IS_LOCAL) { - docker = require('dockerode'); + const Docker = require('dockerode'); + docker = new Docker(); } const toSnakeCase = (input) => input.replace(/ /g, '-'); -async function updateServiceAndQueue( - queueUrl: string, - serviceName: string, +async function startDesiredTasks( + scanType: string, desiredCount: integer, - message_body: any, // Add this parameter - clusterName: string // Add this parameter -) { - // Place message in scan specific queue - if (process.env.IS_LOCAL) { - // If running locally, use RabbitMQ instead of SQS - console.log('Publishing to rabbitMQ'); - await publishToRabbitMQ(queueUrl, message_body); - console.log('Done publishing to rabbitMQ'); - } else { - // Place in AWS SQS queue - console.log('Publishing to scan specific queue'); - await placeMessageInQueue(queueUrl, message_body); - } - - // Check if Fargate is running desired count and start if not - await updateServiceDesiredCount( - clusterName, - serviceName, - desiredCount, - queueUrl - ); - console.log('Done'); -} - -export async function updateServiceDesiredCount( - clusterName: string, - serviceName: string, - desiredCountNum: integer, queueUrl: string ) { try { - if (process.env.IS_LOCAL) { - console.log('starting local containers'); - await startLocalContainers(desiredCountNum, serviceName, queueUrl); - } else { - const describeServiceParams = { - cluster: clusterName, - services: [serviceName] - }; - const serviceDescription = await ecs - .describeServices(describeServiceParams) - .promise(); - if ( - serviceDescription && - serviceDescription.services && - serviceDescription.services.length > 0 - ) { - const service = serviceDescription.services[0]; - - // Check if the desired task count is less than # provided - if (service.desiredCount !== desiredCountNum) { - console.log('Setting desired count.'); - const updateServiceParams = { - cluster: clusterName, - service: serviceName, - desiredCount: desiredCountNum // Set to desired # of Fargate tasks - }; - - await ecs.updateService(updateServiceParams).promise(); - } else { - console.log('Desired count already set.'); - } + // ECS can only start 10 tasks at a time. 
Split up into batches + const batchSize = 10; + let remainingCount = desiredCount; + while (remainingCount > 0) { + const currentBatchCount = Math.min(remainingCount, batchSize); + + if (process.env.IS_LOCAL) { + // If running locally, use RabbitMQ and Docker instead of SQS and ECS + console.log('Starting local containers'); + await startLocalContainers(currentBatchCount, scanType, queueUrl); + } else { + await ecs + .runTask({ + cluster: process.env.PE_FARGATE_CLUSTER_NAME!, + taskDefinition: process.env.PE_FARGATE_TASK_DEFINITION_NAME!, + networkConfiguration: { + awsvpcConfiguration: { + assignPublicIp: 'ENABLED', + securityGroups: [process.env.FARGATE_SG_ID!], + subnets: [process.env.FARGATE_SUBNET_ID!] + } + }, + platformVersion: '1.4.0', + launchType: 'FARGATE', + count: currentBatchCount, + overrides: { + containerOverrides: [ + { + name: 'main', + environment: [ + { + name: 'SERVICE_TYPE', + value: scanType + }, + { + name: 'SERVICE_QUEUE_URL', + value: queueUrl + } + ] + } + ] + } + }) + .promise(); } + console.log('Tasks started:', currentBatchCount); + remainingCount -= currentBatchCount; } } catch (error) { - console.error('Error: ', error); + console.error('Error starting tasks:', error); + throw error; } } async function startLocalContainers( count: number, - serviceName: string, + scanType: string, queueUrl: string ) { // Start 'count' number of local Docker containers for (let i = 0; i < count; i++) { try { const containerName = toSnakeCase( - `crossfeed_worker_${serviceName}_${i}_` + + `crossfeed_worker_${scanType}_${i}_` + Math.floor(Math.random() * 10000000) ); const container = await docker!.createContainer({ @@ -106,7 +92,7 @@ async function startLocalContainers( // In order to use the host name "db" to access the database from the // crossfeed-worker image, we must launch the Docker container with // the Crossfeed backend network. - NetworkMode: 'crossfeed_backend', + NetworkMode: 'xfd_backend', Memory: 4000000000 // Limit memory to 4 GB. We do this locally to better emulate fargate memory conditions. TODO: In the future, we could read the exact memory from SCAN_SCHEMA to better emulate memory requirements for each scan. 
}, Env: [ @@ -141,93 +127,64 @@ async function startLocalContainers( `LG_API_KEY=${process.env.LG_API_KEY}`, `LG_WORKSPACE_NAME=${process.env.LG_WORKSPACE_NAME}`, `SERVICE_QUEUE_URL=${queueUrl}`, - `SERVICE_TYPE=${serviceName}` + `SERVICE_TYPE=${scanType}` ] } as any); await container.start(); - console.log(`done starting container ${i}`); + console.log(`Done starting container ${i}`); } catch (e) { console.error(e); } } } -// Place message in AWS SQS Queue -async function placeMessageInQueue(queueUrl: string, message: any) { - const sendMessageParams = { - QueueUrl: queueUrl, - MessageBody: JSON.stringify(message) - }; - - await sqs.sendMessage(sendMessageParams).promise(); -} - -// Function to connect to RabbitMQ and publish a message -async function publishToRabbitMQ(queue: string, message: any) { - const connection = await connect('amqp://rabbitmq'); - const channel = await connection.createChannel(); - - await channel.assertQueue(queue, { durable: true }); - await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message))); - - await channel.close(); - await connection.close(); -} - export const handler: Handler = async (event) => { - try { - let desiredCount; - const clusterName = process.env.PE_CLUSTER_NAME!; + let desiredCount: integer; + let scanType: string; + if (event.desiredCount) { + desiredCount = event.desiredCount; + } else { + console.log('Desired count not found. Setting to 1.'); + desiredCount = 1; + } - // Get the Control SQS record and message body - const sqsRecord: SQSRecord = event.Records[0]; - const message_body = JSON.parse(sqsRecord.body); - console.log(message_body); + if (event.scanType) { + scanType = event.scanType; + } else { + console.error('scanType must be provided.'); + return 'Failed no scanType'; + } - if (message_body.scriptType === 'shodan') { - desiredCount = 5; - await updateServiceAndQueue( - process.env.SHODAN_QUEUE_URL!, - process.env.SHODAN_SERVICE_NAME!, + try { + if (scanType === 'shodan') { + await startDesiredTasks( + scanType, desiredCount, - message_body, - clusterName + process.env.SHODAN_QUEUE_URL! ); - } else if (message_body.scriptType === 'dnstwist') { - desiredCount = 30; - await updateServiceAndQueue( - process.env.DNSTWIST_QUEUE_URL!, - process.env.DNSTWIST_SERVICE_NAME!, + } else if (scanType === 'dnstwist') { + await startDesiredTasks( + scanType, desiredCount, - message_body, - clusterName + process.env.DNSTWIST_QUEUE_URL! ); - } else if (message_body.scriptType === 'hibp') { - desiredCount = 20; - await updateServiceAndQueue( - process.env.HIBP_QUEUE_URL!, - process.env.HIBP_SERVICE_NAME!, + } else if (scanType === 'hibp') { + await startDesiredTasks( + scanType, desiredCount, - message_body, - clusterName + process.env.HIBP_QUEUE_URL! ); - } else if (message_body.scriptType === 'intelx') { - desiredCount = 10; - await updateServiceAndQueue( - process.env.INTELX_QUEUE_URL!, - process.env.INTELX_SERVICE_NAME!, + } else if (scanType === 'intelx') { + await startDesiredTasks( + scanType, desiredCount, - message_body, - clusterName + process.env.INTELX_QUEUE_URL! ); - } else if (message_body.scriptType === 'cybersixgill') { - desiredCount = 10; - await updateServiceAndQueue( - process.env.CYBERSIXGILL_QUEUE_URL!, - process.env.CYBERSIXGILL_SERVICE_NAME!, + } else if (scanType === 'cybersixgill') { + await startDesiredTasks( + scanType, desiredCount, - message_body, - clusterName + process.env.CYBERSIXGILL_QUEUE_URL! 
); } else { console.log( diff --git a/backend/src/tools/consumeControlQueue.ts b/backend/src/tools/consumeControlQueue.ts deleted file mode 100644 index 87289464..00000000 --- a/backend/src/tools/consumeControlQueue.ts +++ /dev/null @@ -1,42 +0,0 @@ -// Script to setup Control Queue locally so when messages are sent to it, -// the scanExecution lambda is triggered -import { handler as scanExecution } from '../tasks/scanExecution'; -const amqp = require('amqplib'); -import * as dotenv from 'dotenv'; -import * as path from 'path'; - -async function consumeControlQueue() { - // Load the environment variables from the .env file - const envPath = path.resolve(__dirname, '../../.env'); - dotenv.config({ path: envPath }); - console.log(process.env.SHODAN_QUEUE_URL); - - // Connect to RabbitMQ - const connection = await amqp.connect('amqp://rabbitmq'); - const channel = await connection.createChannel(); - const controlQueue = 'ControlQueue'; - - await channel.assertQueue(controlQueue, { durable: true }); - - console.log('Waiting for messages from ControlQueue...'); - - channel.consume(controlQueue, (message) => { - if (message !== null) { - const payload = JSON.parse(message.content.toString()); - - // Trigger your local Lambda function here - console.log('Received message:', payload); - - // Call scanExecution with the payload from message - scanExecution( - { Records: [{ body: JSON.stringify(payload) }] }, - {} as any, - () => null - ); - - channel.ack(message); - } - }); -} - -consumeControlQueue(); diff --git a/backend/src/tools/run-scanExecution.ts b/backend/src/tools/run-scanExecution.ts new file mode 100644 index 00000000..8e0fccab --- /dev/null +++ b/backend/src/tools/run-scanExecution.ts @@ -0,0 +1,10 @@ +// Script to execute the scanExecution function +import { handler as scanExecution } from '../tasks/scanExecution'; + +async function localScanExecution() { + console.log('Starting...'); + const payload = { scanType: 'dnstwist', desiredCount: 3 }; + await scanExecution(payload, {} as any, () => null); +} + +localScanExecution();
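The new run-scanExecution.ts above hardcodes its test payload. As a usage illustration, here is a minimal sketch (hypothetical, not part of this diff) of the same trigger reading the scan type and task count from the command line; it assumes the file sits next to run-scanExecution.ts in src/tools so the relative import resolves:

```typescript
// Hypothetical CLI wrapper around the scanExecution handler, e.g.:
//   npx ts-node src/tools/run-scanExecution.ts hibp 5
import { handler as scanExecution } from '../tasks/scanExecution';

async function localScanExecution() {
  // Default to a single dnstwist task when no arguments are given.
  const [scanType = 'dnstwist', countArg = '1'] = process.argv.slice(2);
  const desiredCount = parseInt(countArg, 10) || 1;
  console.log(`Starting ${desiredCount} ${scanType} task(s)...`);
  // The refactored handler takes a bare { scanType, desiredCount } event
  // rather than an SQS Records array.
  await scanExecution({ scanType, desiredCount }, {} as any, () => null);
}

localScanExecution().catch(console.error);
```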
diff --git a/dev.env.example b/dev.env.example index 970f359c..b0f65672 100644 --- a/dev.env.example +++ b/dev.env.example @@ -96,15 +96,11 @@ PE_DB_PASSWORD=password SHODAN_QUEUE_URL=shodanQueue -SHODAN_SERVICE_NAME=pe-shodan PE_SHODAN_API_KEYS= DNSTWIST_QUEUE_URL=dnstwistQueue -DNSTWIST_SERVICE_NAME=pe-dnstwist HIBP_QUEUE_URL=hibpQueue -HIBP_SERVICE_NAME=pe-hibp INTELX_QUEUE_URL=intelxQueue -INTELX_SERVICE_NAME=pe-intelx CYBERSIXGILL_QUEUE_URL=cybersixgillQueue -CYBERSIXGILL_SERVICE_NAME=pe-cybersixgill -PE_CLUSTER_NAME=pe-staging-worker +PE_FARGATE_CLUSTER_NAME=pe-staging-worker +PE_FARGATE_TASK_DEFINITION_NAME=pe-staging-worker diff --git a/docs/src/documentation-pages/dev/quickstart.md b/docs/src/documentation-pages/dev/quickstart.md index c6744d18..03335c7c 100644 --- a/docs/src/documentation-pages/dev/quickstart.md +++ b/docs/src/documentation-pages/dev/quickstart.md @@ -62,22 +62,23 @@ This quickstart describes the initial setup required to run an instance of Cross npm run pesyncdb ``` -4. Start the RabbitMQ listener. This will listen for any messages sent to the queue and - trigger the scanExecution.ts function. This will stay running with this message: "Waiting for messages from ControlQueue..." +4. Send messages to the RabbitMQ queue. First, edit backend/sendMessage.js to run the desired scan and + organization. Then run the command below: ```bash cd backend - npm run control-queue + npm run send-message ``` -5. Run sendMessage.js to send a sample message to the queue. Feel free to edit this file - while testing. +5. Invoke scans by running the command below. You can edit backend/src/tools/run-scanExecution.ts to run the desired scan type. ```bash cd backend - node sendMessage.js + npm run scan-exec ``` +6. Observe the logs in the Docker containers. ### Running tests To run tests, first make sure you have already started Crossfeed with `npm start` (or, at bare minimum, that the database container is running). Then run: diff --git a/infrastructure/database.tf b/infrastructure/database.tf index 00c6a83f..aae38b31 100644 --- a/infrastructure/database.tf +++ b/infrastructure/database.tf @@ -160,6 +160,13 @@ resource "aws_iam_role_policy" "db_accessor_s3_policy" { "s3:*" ], "Resource": ["${aws_s3_bucket.reports_bucket.arn}", "${aws_s3_bucket.reports_bucket.arn}/*", "${aws_s3_bucket.pe_db_backups_bucket.arn}", "${aws_s3_bucket.pe_db_backups_bucket.arn}/*"] + }, + { + "Effect": "Allow", + "Action": [ + "lambda:InvokeFunction" + ], + "Resource": ["*"] } ] } diff --git a/infrastructure/pe.tf b/infrastructure/pe.tf deleted file mode 100644 index 8256102e..00000000 --- a/infrastructure/pe.tf +++ /dev/null @@ -1,333 +0,0 @@ - -resource "aws_cloudwatch_event_rule" "scheduled_pe_task" { - - name = "scheduled-pe-event-rule" - schedule_expression = "cron(0 0 1,16 * ? *)" -} - -resource "aws_cloudwatch_event_rule" "scheduled_pe_cybersixgill_task" { - - name = "scheduled-pe-event-cybersixgill-rule" - schedule_expression = "cron(0 0 1,16 * ? *)" -} - -resource "aws_iam_role" "cloudwatch_scheduled_task_execution" { - name = "crossfeed-pe-cloudwatch-role-${var.stage}" - assume_role_policy = <
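Since the WorkerControlQueue is removed, the deployed scanExecution Lambda is now triggered by direct invocation with a bare { scanType, desiredCount } event; the lambda:InvokeFunction statement added to database.tf above suggests the accessor instance does this through the AWS SDK. Below is a minimal sketch of such an invocation, assuming the Serverless default function name crossfeed-{stage}-scanExecution (an assumption; verify the name against the actual deployment):

```typescript
// Hypothetical direct invocation of the deployed scanExecution Lambda.
// The function name below is assumed from Serverless's {service}-{stage}-{fn}
// convention; check it with `aws lambda list-functions` before relying on it.
import * as AWS from 'aws-sdk';

const lambda = new AWS.Lambda();

async function triggerScan(scanType: string, desiredCount: number) {
  const response = await lambda
    .invoke({
      FunctionName: `crossfeed-${process.env.STAGE || 'staging'}-scanExecution`,
      // 'Event' = asynchronous fire-and-forget; the handler itself may run
      // for up to its 900-second timeout while it batches ecs.runTask calls.
      InvocationType: 'Event',
      Payload: JSON.stringify({ scanType, desiredCount })
    })
    .promise();
  console.log('Invocation accepted, status code:', response.StatusCode);
}

triggerScan('dnstwist', 3).catch(console.error);
```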