Skip to content

Commit

Permalink
Parser for AWS Inspector2 findings (#10829)
Browse files Browse the repository at this point in the history
* Create parser for AWS Inspector2 findings

* Parser cleanup

* Fix ruff

* Fix ruff

* Update hash code fields

---------

Co-authored-by: Cody Maffucci <[email protected]>
  • Loading branch information
siniysv and Maffooch authored Nov 4, 2024
1 parent 8b1242a commit 649528e
Show file tree
Hide file tree
Showing 10 changed files with 1,102 additions and 1 deletion.
24 changes: 24 additions & 0 deletions docs/content/en/integrations/parsers/file/aws_inspector2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
title: "AWS Inspector2 Scanner"
toc_hide: true
---

### File Types
AWS Inspector2 reports can be imported in JSON format. The name "Inspector2" comes from the API of the "modern" Inspector service (`aws inspector2`), as opposed to Classic Inspector (the previous version of the service). For example, such a report can be generated with: `aws inspector2 list-findings --filter-criteria '{"resourceId":[{"comparison":"EQUALS","value":"i-instance_id_here"}]}' --region us-east-1 > inspector2_findings.json`


This parser can import findings obtained from a delegated admin account for AWS Inspector or from a standalone AWS account. It was developed mainly for the scenario where findings are obtained for a specific resource, such as an ECR image or an EC2 instance, and uploaded to a test in a DefectDojo engagement that represents a branch of a git repository.


A minimal valid json file with no findings:

```json
{
"findings": []
}
```

The detailed API response format is documented [here](https://docs.aws.amazon.com/inspector/v2/APIReference/API_Finding.html).

### Sample Scan Data
Sample AWS Inspector2 findings can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/aws_inspector2).
2 changes: 1 addition & 1 deletion dojo/settings/.settings.dist.py.sha256sum
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6a90a111e2b89eb2c400945c80ff76c64b135d78b84fdf6b09a6b83569946904
4b0c6ee05222e622f74d80c8e93504aba986ad0b187aab305ff7ecef89080f11
3 changes: 3 additions & 0 deletions dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,7 @@ def saml2_attrib_map_format(dict):
"Kiuwan SCA Scan": ["description", "severity", "component_name", "component_version", "cwe"],
"Rapplex Scan": ["title", "endpoints", "severity"],
"AppCheck Web Application Scanner": ["title", "severity"],
"AWS Inspector2 Scan": ["title", "severity", "description"],
"Legitify Scan": ["title", "endpoints", "severity"],
"ThreatComposer Scan": ["title", "description"],
"Invicti Scan": ["title", "description", "severity"],
Expand Down Expand Up @@ -1350,6 +1351,7 @@ def saml2_attrib_map_format(dict):
"Wazuh": True,
"Nuclei Scan": True,
"Threagile risks report": True,
"AWS Inspector2 Scan": True,
}

# List of fields that are known to be usable in hash_code computation)
Expand Down Expand Up @@ -1510,6 +1512,7 @@ def saml2_attrib_map_format(dict):
"Kiuwan SCA Scan": DEDUPE_ALGO_HASH_CODE,
"Rapplex Scan": DEDUPE_ALGO_HASH_CODE,
"AppCheck Web Application Scanner": DEDUPE_ALGO_HASH_CODE,
"AWS Inspector2 Scan": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE,
"Legitify Scan": DEDUPE_ALGO_HASH_CODE,
"ThreatComposer Scan": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE,
"Invicti Scan": DEDUPE_ALGO_HASH_CODE,
Expand Down
Empty file.
255 changes: 255 additions & 0 deletions dojo/tools/aws_inspector2/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import json
from datetime import UTC, datetime

from dateutil import parser as date_parser

from dojo.models import Endpoint, Finding


class AWSInspector2Parser:

    """Import AWS Inspector2 findings from a JSON report (``aws inspector2 list-findings``)."""

def get_scan_types(self):
    """Return the names of the scan types this parser handles."""
    scan_types = ["AWS Inspector2 Scan"]
    return scan_types

def get_label_for_scan_types(self, scan_type):
    """Return the display label; it is the same for every ``scan_type``."""
    label = "AWS Inspector2 Scan"
    return label

def get_description_for_scan_types(self, scan_type):
    """Return the help text describing the accepted report; ``scan_type`` is not consulted."""
    return (
        "AWS Inspector2 report file can be imported in JSON format "
        "(aws inspector2 list-findings)."
    )

def get_findings(self, file, test):
    """
    Parse an Inspector2 JSON report into a list of findings.

    Args:
        file: Readable file-like object holding the JSON document.
        test: Test the findings belong to; stored on ``self.test`` and read
            back when the base finding is built.

    Returns:
        list: One finding per entry of the report's ``findings`` array, in
        report order.

    Raises:
        TypeError: If the document is not a JSON object with a ``findings``
            list, or an entry has an unsupported ``type``.

    """
    tree = json.load(file)
    # A top-level array or scalar has no ``.get``; report it as a format
    # error instead of leaking an AttributeError.
    raw_findings = tree.get("findings") if isinstance(tree, dict) else None
    if not isinstance(raw_findings, list):
        msg = "Incorrect Inspector2 report format"
        raise TypeError(msg)
    self.test = test
    # Finding type -> method that adds the type-specific details.
    type_handlers = {
        "PACKAGE_VULNERABILITY": self.get_package_vulnerability,
        "CODE_VULNERABILITY": self.get_code_vulnerability,
        "NETWORK_REACHABILITY": self.get_network_reachability,
    }
    findings = []
    for raw_finding in raw_findings:
        finding = self.get_base_finding(raw_finding)
        finding_type = raw_finding.get("type", None)
        # Guard with isinstance so an unhashable value cannot break the lookup.
        handler = type_handlers.get(finding_type) if isinstance(finding_type, str) else None
        if handler is None:
            msg = f"Incorrect Inspector2 report format: unsupported finding type {finding_type!r}"
            raise TypeError(msg)
        finding = handler(finding, raw_finding)
        # Endpoints and impact come from the "resources" section, whatever the type.
        findings.append(self.process_endpoints(finding, raw_finding))

    return findings

def get_severity(self, severity_string):
    """
    Translate an Inspector2 severity into a finding severity.

    Inspector2 reports INFORMATIONAL, LOW, MEDIUM, HIGH, CRITICAL or
    UNTRIAGED. Plain title-casing turns INFORMATIONAL into "Informational",
    so every value is mapped explicitly.
    NOTE(review): the target set Critical/High/Medium/Low/Info is assumed
    from DefectDojo's Finding model — confirm.

    Args:
        severity_string: Severity as found in the report (any letter case).

    Returns:
        str: "Critical", "High", "Medium", "Low" or "Info". Unknown or
        missing values fall back to "Info".

    """
    severity_map = {
        "CRITICAL": "Critical",
        "HIGH": "High",
        "MEDIUM": "Medium",
        "LOW": "Low",
        "INFO": "Info",
        "INFORMATIONAL": "Info",
        "UNTRIAGED": "Info",
    }
    # str() keeps a null or non-string severity from raising here.
    return severity_map.get(str(severity_string).upper(), "Info")

def get_base_finding(self, raw_finding: dict) -> Finding:
    """
    Build the part of a finding that is common to all Inspector2 finding types.

    Sets title, description, severity, ``unique_id_from_tool`` (the finding
    ARN), the active/mitigated state derived from ``status``, and the EPSS
    score. The finding is attached to ``self.test``.

    Args:
        raw_finding: One entry of the report's ``findings`` array.

    Returns:
        Finding: The partially populated, unsaved finding.

    """
    finding_id = raw_finding.get("findingArn")
    title = raw_finding.get("title", "The title could not be identified...")
    # (label, value) pairs in output order; a field that is absent adds no line.
    description_fields = (
        ("**AWS Account**: ", raw_finding.get("awsAccountId")),
        ("**Finding ARN**: ", finding_id),
        ("Inspector score: ", raw_finding.get("inspectorScore")),
        ("Discovered at: ", raw_finding.get("firstObservedAt")),
        ("Last seen: ", raw_finding.get("lastObservedAt")),
        ("Original description: \n", raw_finding.get("description")),
    )
    description = "".join(
        f"{label}{value}\n" for label, value in description_fields if value is not None
    )
    finding = Finding(
        title=title,
        test=self.test,
        description=description,
        severity=self.get_severity(raw_finding.get("severity", "Info")),
        unique_id_from_tool=finding_id,
        static_finding=True,
        dynamic_finding=False,
    )
    # Anything other than ACTIVE is treated as mitigated.
    if raw_finding.get("status", "ACTIVE") == "ACTIVE":
        finding.active = True
        finding.is_mitigated = False
        finding.mitigated = None
    else:
        finding.active = False
        finding.is_mitigated = True
        last_observed = raw_finding.get("lastObservedAt")
        if isinstance(last_observed, str):
            # ``date_parser`` is the dateutil.parser *module*; the callable is
            # its ``parse`` function (calling the module raises TypeError).
            finding.mitigated = date_parser.parse(last_observed)
        elif isinstance(last_observed, int | float):
            # NOTE(review): presumably epoch seconds, as emitted when the
            # export uses the wire timestamp format — confirm.
            finding.mitigated = datetime.fromtimestamp(last_observed, UTC)
        else:
            # No usable timestamp in the report: fall back to import time.
            finding.mitigated = datetime.now(UTC)
    # ``or {}`` also covers an explicit ``"epss": null``.
    finding.epss_score = (raw_finding.get("epss") or {}).get("score", None)

    return finding

def get_package_vulnerability(self, finding: Finding, raw_finding: dict) -> Finding:
    """
    Add PACKAGE_VULNERABILITY details to a base finding.

    Reads ``packageVulnerabilityDetails``: records the vulnerability id,
    sets ``finding.url`` when both source and source URL are present, and
    appends the source and the affected packages to the description.

    Args:
        finding: Base finding to enrich; modified in place.
        raw_finding: The report entry the finding was built from.

    Returns:
        Finding: The same ``finding`` object.

    """
    # ``or`` also covers keys that are present but null.
    details = raw_finding.get("packageVulnerabilityDetails") or {}
    packages_text = "\n".join(
        (
            f'*Vulnerable package*: {package.get("name", "N/A")}\n'
            f'\tpackage manager: {package.get("packageManager", "N/A")}\n'
            f'\tversion: {package.get("version", "N/A")}\n'
            f'\tfixed version: {package.get("fixedInVersion", "N/A")}\n'
            f'\tremediation: {package.get("remediation", "N/A")}\n'
        )
        for package in details.get("vulnerablePackages") or []
    )
    vulnerability_id = details.get("vulnerabilityId")
    if vulnerability_id is not None:
        finding.unsaved_vulnerability_ids = [vulnerability_id]
    source = details.get("source")
    source_url = details.get("sourceUrl")
    if source is not None and source_url is not None:
        finding.url = source_url
    # Show "N/A" rather than the literal text "None" for missing values.
    shown_source = "N/A" if source is None else source
    shown_source_url = "N/A" if source_url is None else source_url
    finding.description += (
        "\n**Additional info**\n"
        f"Vulnerability info from: {shown_source} {shown_source_url}\n"
        "Affected packages:\n"
        f"{packages_text}\n"
    )

    return finding

def get_code_vulnerability(self, finding: Finding, raw_finding: dict) -> Finding:
    """
    Add CODE_VULNERABILITY details to a base finding.

    Sets CWE, source file path and line on the finding and appends detector,
    rule, line range, tags, reference URLs and Lambda layer ARN to the
    description.

    Args:
        finding: Base finding to enrich; modified in place.
        raw_finding: The report entry the finding was built from.

    Returns:
        Finding: The same ``finding`` object.

    """
    # The Inspector2 API nests these fields under "codeVulnerabilityDetails".
    # Fall back to the top level so reports in the flat layout this parser
    # used to read keep working.
    details = raw_finding.get("codeVulnerabilityDetails") or raw_finding
    cwes = [str(cwe) for cwe in details.get("cwes") or []]
    detector_id = details.get("detectorId", "N/A")
    detector_name = details.get("detectorName", "N/A")
    file_path_info = details.get("filePath") or {}
    file_name = file_path_info.get("fileName")
    file_path = file_path_info.get("filePath")
    start_line = file_path_info.get("startLine")
    end_line = file_path_info.get("endLine")
    detector_tags = ", ".join(str(tag) for tag in details.get("detectorTags") or [])
    reference_urls = ", ".join(str(url) for url in details.get("referenceUrls") or [])
    rule_id = details.get("ruleId", "N/A")
    layer_arn = details.get("sourceLambdaLayerArn", "N/A")

    # Entries look like "CWE-798"; keep only the number.
    # NOTE(review): assumes Finding.cwe is a numeric field — confirm.
    cwe_number = None
    if cwes:
        digits = cwes[0].upper().removeprefix("CWE-")
        if digits.isdigit():
            cwe_number = int(digits)

    # NOTE(review): it is unclear whether the report's "filePath" already
    # ends with the file name; append the name only when it does not.
    if file_path and file_name:
        path_text, name_text = str(file_path), str(file_name)
        source_path = path_text if path_text.endswith(name_text) else f"{path_text}{name_text}"
    else:
        source_path = file_path or file_name

    # Only a real line number is stored; a missing one stays None.
    line = start_line if isinstance(start_line, int) else None

    finding.cwe = cwe_number
    finding.file_path = source_path
    finding.sast_source_file_path = source_path
    finding.line = line
    finding.sast_source_line = line
    shown_start = "N/A" if start_line is None else start_line
    shown_end = "N/A" if end_line is None else end_line
    finding.description += (
        "\n**Additional info**\n"
        f"CWEs: {', '.join(cwes)}\n"
        f"Vulnerability info from: {detector_id} {detector_name}\n"
        f"Rule: {rule_id}\n"
        f"Lines: {shown_start} - {shown_end}\n"
        f"Tags: {detector_tags or 'N/A'}\n"
        f"URLs: {reference_urls or 'N/A'}\n"
        f"Lambda layer ARN: {layer_arn}\n"
    )

    return finding

def get_network_reachability(self, finding: Finding, raw_finding: dict) -> Finding:
    """
    Add NETWORK_REACHABILITY details to a base finding.

    Appends the protocol, the open port range and the numbered network path
    steps to the description.

    Args:
        finding: Base finding to enrich; modified in place.
        raw_finding: The report entry the finding was built from.

    Returns:
        Finding: The same ``finding`` object.

    """
    # The Inspector2 API nests these fields under "networkReachabilityDetails".
    # Fall back to the top level so reports in the flat layout this parser
    # used to read keep working.
    details = raw_finding.get("networkReachabilityDetails") or raw_finding
    network_path_steps = (details.get("networkPath") or {}).get("steps") or []
    step_lines = [
        f'{step_number}: {step.get("componentId", "N/A")} {step.get("componentType", "N/A")}'
        for step_number, step in enumerate(network_path_steps)
    ]
    open_port_range_info = details.get("openPortRange") or {}
    port_range_start = open_port_range_info.get("begin", "N/A")
    port_range_end = open_port_range_info.get("end", "N/A")
    protocol = details.get("protocol", "N/A")
    finding.description += (
        "\n**Additional info**\n"
        f"protocol {protocol}, port range {port_range_start} - {port_range_end}\n"
    )
    if step_lines:
        # One "steps:" header on its own line, then one line per step.
        finding.description += "steps:\n" + "\n".join(step_lines) + "\n"

    return finding

def process_endpoints(self, finding: Finding, raw_finding: dict) -> Finding:
    """
    Derive endpoints and the impact text from the finding's resources.

    Each entry of ``resources`` becomes one ``Endpoint``. For EC2 instances,
    ECR container images and Lambda functions the resource details are also
    appended to the impact text.

    Args:
        finding: Finding to enrich; modified in place.
        raw_finding: The report entry the finding was built from.

    Returns:
        Finding: The same ``finding`` object, with ``impact`` and
        ``unsaved_endpoints`` set.

    Raises:
        TypeError: If a resource has an unsupported ``type``.

    """
    impact = []
    endpoints = []
    # ``or`` also covers keys that are present but null.
    for resource_info in raw_finding.get("resources") or []:
        resource_type = resource_info.get("type", None)
        resource_id = str(resource_info.get("id", "N/A"))
        resource_details = resource_info.get("details") or {}
        # Generic host label, kept whenever no better one can be derived.
        endpoint_host = f"{resource_type} - {resource_id}"
        if resource_type == "AWS_EC2_INSTANCE":
            aws_account = raw_finding.get("awsAccountId")
            resource_region = resource_info.get("region", "N/A")
            endpoint_host = resource_id
            ec2_instance_details = resource_details.get("awsEc2Instance", None)
            if ec2_instance_details:
                impact.extend(
                    (
                        f"ARN: {resource_id}",
                        f"Image ID: {ec2_instance_details.get('imageId', 'N/A')}",
                        f"IPv4 address: {ec2_instance_details.get('ipV4Addresses', 'N/A')}",
                        f"Subnet: {ec2_instance_details.get('subnetId', 'N/A')}",
                        f"VPC: {ec2_instance_details.get('vpcId', 'N/A')}",
                        f"Region: {resource_region}",
                        f"AWS Account: {aws_account}",
                        f"Launched at: {ec2_instance_details.get('launchedAt', 'N/A')}",
                        "---",
                    ),
                )
        elif resource_type == "AWS_ECR_CONTAINER_IMAGE":
            # partition() keeps everything after the first marker (split()[1]
            # stopped at a second occurrence) and cannot raise IndexError
            # when the marker is missing.
            _, marker, image_ref = resource_id.partition("repository/")
            if marker:
                endpoint_host = image_ref.replace("sha256:", "").replace("/", "-")
            ecr_image_details = resource_details.get("awsEcrContainerImage", None)
            if ecr_image_details:
                impact.extend(
                    (
                        f"ARN: {resource_id}",
                        f"Registry: {ecr_image_details.get('registry', 'N/A')}",
                        f"Repository: {ecr_image_details.get('repositoryName', 'N/A')}",
                        f"Hash: {ecr_image_details.get('imageHash', 'N/A')}",
                        f"Author: {ecr_image_details.get('author', 'N/A')}",
                        f"Pushed at: {ecr_image_details.get('pushedAt', 'N/A')}",
                        "---",
                    ),
                )
        elif resource_type == "AWS_ECR_REPOSITORY":
            # no corresponding
            # key present in
            # https://docs.aws.amazon.com/inspector/v2/APIReference/API_ResourceDetails.html
            pass
        elif resource_type == "AWS_LAMBDA_FUNCTION":
            # partition() so a name that itself contains "function:" (for
            # example "...:function:myfunction:1") is not cut short, and a
            # missing marker does not raise IndexError.
            _, marker, function_ref = resource_id.partition("function:")
            if marker:
                endpoint_host = function_ref.replace(":", "-").replace("/", "-")
            lambda_details = resource_details.get("awsLambdaFunction", None)
            if lambda_details:
                impact.extend(
                    (
                        f"ARN: {resource_id}",
                        f"Name: {lambda_details.get('functionName', 'N/A')}",
                        f"Version: {lambda_details.get('version', 'N/A')}",
                        f"Runtime: {lambda_details.get('runtime', 'N/A')}",
                        f"Hash: {lambda_details.get('codeSha256', 'N/A')}",
                        f"Pushed at: {lambda_details.get('lastModifiedAt', 'N/A')}",
                        # Separator, as for the other resource types.
                        "---",
                    ),
                )
        else:
            msg = f"Incorrect Inspector2 report format: unsupported resource type {resource_type!r}"
            raise TypeError(msg)
        endpoints.append(Endpoint(host=endpoint_host))
    finding.impact = "\n".join(impact)
    finding.unsaved_endpoints = endpoints

    return finding
Loading

0 comments on commit 649528e

Please sign in to comment.