Skip to content

Commit

Permalink
Fix ruff
Browse files Browse the repository at this point in the history
  • Loading branch information
Maffooch committed Nov 1, 2024
1 parent fd21ab7 commit d4d5a60
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 30 deletions.
63 changes: 37 additions & 26 deletions dojo/tools/aws_inspector2/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@


class AWSInspector2Parser:
"""
Import AWS Inspector2 json
"""

"""Import AWS Inspector2 json."""

def get_scan_types(self):
return ["AWS Inspector2 Scan"]
Expand Down Expand Up @@ -166,7 +165,7 @@ def get_network_reachability(self, finding: Finding, raw_finding: dict) -> Findi
network_path_steps = network_path_info.get("steps", [])
steps_descriptions = "\n".join(
[
("steps:\n" f'{step_number}: {step.get("componentId", "N/A")} {step.get("componentType", "N/A")}')
f'steps:\n{step_number}: {step.get("componentId", "N/A")} {step.get("componentType", "N/A")}'
for step_number, step in enumerate(network_path_steps)
],
)
Expand Down Expand Up @@ -196,27 +195,35 @@ def process_endpoints(self, finding: Finding, raw_finding: dict) -> Finding:
endpoint_host = resource_id
ec2_instance_details = resource_details.get("awsEc2Instance", None)
if ec2_instance_details:
impact.append(f"ARN: {resource_id}")
impact.append(f"Image ID: {ec2_instance_details.get('imageId', 'N/A')}")
impact.append(f"IPv4 address: {ec2_instance_details.get('ipV4Addresses', 'N/A')}")
impact.append(f"Subnet: {ec2_instance_details.get('subnetId', 'N/A')}")
impact.append(f"VPC: {ec2_instance_details.get('vpcId', 'N/A')}")
impact.append(f"Region: {resource_region}")
impact.append(f"AWS Account: {aws_account}")
impact.append(f"Launched at: {ec2_instance_details.get('launchedAt', 'N/A')}")
impact.append("---")
impact.extend(
(
f"ARN: {resource_id}",
f"Image ID: {ec2_instance_details.get('imageId', 'N/A')}",
f"IPv4 address: {ec2_instance_details.get('ipV4Addresses', 'N/A')}",
f"Subnet: {ec2_instance_details.get('subnetId', 'N/A')}",
f"VPC: {ec2_instance_details.get('vpcId', 'N/A')}",
f"Region: {resource_region}",
f"AWS Account: {aws_account}",
f"Launched at: {ec2_instance_details.get('launchedAt', 'N/A')}",
"---",
),
)
elif resource_type == "AWS_ECR_CONTAINER_IMAGE":
image_id = resource_id.split("repository/")[1].replace("sha256:", "").replace("/", "-")
endpoint_host = image_id
ecr_image_details = resource_details.get("awsEcrContainerImage", None)
if ecr_image_details:
impact.append(f"ARN: {resource_id}")
impact.append(f"Registry: {ecr_image_details.get('registry', 'N/A')}")
impact.append(f"Repository: {ecr_image_details.get('repositoryName', 'N/A')}")
impact.append(f"Hash: {ecr_image_details.get('imageHash', 'N/A')}")
impact.append(f"Author: {ecr_image_details.get('author', 'N/A')}")
impact.append(f"Pushed at: {ecr_image_details.get('pushedAt', 'N/A')}")
impact.append("---")
impact.extend(
(
f"ARN: {resource_id}",
f"Registry: {ecr_image_details.get('registry', 'N/A')}",
f"Repository: {ecr_image_details.get('repositoryName', 'N/A')}",
f"Hash: {ecr_image_details.get('imageHash', 'N/A')}",
f"Author: {ecr_image_details.get('author', 'N/A')}",
f"Pushed at: {ecr_image_details.get('pushedAt', 'N/A')}",
"---",
),
)
elif resource_type == "AWS_ECR_REPOSITORY":
# no corresponding
# key present in
Expand All @@ -227,12 +234,16 @@ def process_endpoints(self, finding: Finding, raw_finding: dict) -> Finding:
endpoint_host = lambda_id
lambda_details = resource_details.get("awsLambdaFunction", None)
if lambda_details:
impact.append(f"ARN: {resource_id}")
impact.append(f"Name: {lambda_details.get('functionName', 'N/A')}")
impact.append(f"Version: {lambda_details.get('version', 'N/A')}")
impact.append(f"Runtime: {lambda_details.get('runtime', 'N/A')}")
impact.append(f"Hash: {lambda_details.get('codeSha256', 'N/A')}")
impact.append(f"Pushed at: {lambda_details.get('lastModifiedAt', 'N/A')}")
impact.extend(
(
f"ARN: {resource_id}",
f"Name: {lambda_details.get('functionName', 'N/A')}",
f"Version: {lambda_details.get('version', 'N/A')}",
f"Runtime: {lambda_details.get('runtime', 'N/A')}",
f"Hash: {lambda_details.get('codeSha256', 'N/A')}",
f"Pushed at: {lambda_details.get('lastModifiedAt', 'N/A')}",
),
)
else:
msg = "Incorrect Inspector2 report format"
raise TypeError(msg)
Expand Down
8 changes: 4 additions & 4 deletions unittests/tools/test_aws_inspector2_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
class TestAWSInspector2Parser(TestCase):

def test_aws_inspector2_parser_with_no_vuln_has_no_findings(self):
with open("unittests/scans/aws_inspector2/aws_inspector2_zero_vul.json") as testfile:
with open("unittests/scans/aws_inspector2/aws_inspector2_zero_vul.json", encoding="utf-8") as testfile:
parser = AWSInspector2Parser()
findings = parser.get_findings(testfile, Test())
testfile.close()
self.assertEqual(0, len(findings))

def test_aws_inspector2_parser_with_one_vuln_has_one_findings(self):
with open("unittests/scans/aws_inspector2/aws_inspector2_one_vul.json") as testfile:
with open("unittests/scans/aws_inspector2/aws_inspector2_one_vul.json", encoding="utf-8") as testfile:
parser = AWSInspector2Parser()
findings = parser.get_findings(testfile, Test())
testfile.close()
Expand All @@ -26,7 +26,7 @@ def test_aws_inspector2_parser_with_one_vuln_has_one_findings(self):
self.assertEqual("Medium", findings[0].severity)

def test_aws_inspector2_parser_with_many_vuln_has_many_findings(self):
with open("unittests/scans/aws_inspector2/aws_inspector2_many_vul.json") as testfile:
with open("unittests/scans/aws_inspector2/aws_inspector2_many_vul.json", encoding="utf-8") as testfile:
parser = AWSInspector2Parser()
findings = parser.get_findings(testfile, Test())
testfile.close()
Expand All @@ -37,7 +37,7 @@ def test_aws_inspector2_parser_with_many_vuln_has_many_findings(self):

def test_aws_inspector2_parser_empty_with_error(self):
with self.assertRaises(TypeError) as context:
with open("unittests/scans/aws_inspector2/empty_with_error.json") as testfile:
with open("unittests/scans/aws_inspector2/empty_with_error.json", encoding="utf-8") as testfile:
parser = AWSInspector2Parser()
parser.get_findings(testfile, Test())
testfile.close()
Expand Down

0 comments on commit d4d5a60

Please sign in to comment.