From d4d5a60e10c0716c3071263000cc862e4a3aeacb Mon Sep 17 00:00:00 2001
From: Cody Maffucci <46459665+Maffooch@users.noreply.github.com>
Date: Fri, 1 Nov 2024 16:17:49 -0500
Subject: [PATCH] Fix ruff
---
 dojo/tools/aws_inspector2/parser.py | 63 +++++++++++--------
 unittests/tools/test_aws_inspector2_parser.py | 8 +--
 2 files changed, 41 insertions(+), 30 deletions(-)
diff --git a/dojo/tools/aws_inspector2/parser.py b/dojo/tools/aws_inspector2/parser.py
index 2f615c67fbe..863c9ebc18a 100644
--- a/dojo/tools/aws_inspector2/parser.py
+++ b/dojo/tools/aws_inspector2/parser.py
@@ -7,9 +7,8 @@
 class AWSInspector2Parser:
- """
- Import AWS Inspector2 json
- """
+
+ """Import AWS Inspector2 json."""
 def get_scan_types(self):
 return ["AWS Inspector2 Scan"]
@@ -166,7 +165,7 @@ def get_network_reachability(self, finding: Finding, raw_finding: dict) -> Findi
 network_path_steps = network_path_info.get("steps", [])
 steps_descriptions = "\n".join(
 [
- ("steps:\n" f'{step_number}: {step.get("componentId", "N/A")} {step.get("componentType", "N/A")}')
+ f'steps:\n{step_number}: {step.get("componentId", "N/A")} {step.get("componentType", "N/A")}'
 for step_number, step in enumerate(network_path_steps)
 ],
 )
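Review bot: In the `get_network_reachability` hunk the `steps:\n` prefix is part of every comprehension element, so the merged f-string still prints a `steps:` heading before each path step instead of once. If one heading is intended, build the elements as `f'{step_number}: {step.get("componentId", "N/A")} {step.get("componentType", "N/A")}'` and prepend the heading once with `"steps:\n" + "\n".join(...)`. The function starts and ends outside this hunk, so how `steps_descriptions` is used afterwards is not visible here.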
@@ -196,27 +195,35 @@ def process_endpoints(self, finding: Finding, raw_finding: dict) -> Finding:
 endpoint_host = resource_id
 ec2_instance_details = resource_details.get("awsEc2Instance", None)
 if ec2_instance_details:
- impact.append(f"ARN: {resource_id}")
- impact.append(f"Image ID: {ec2_instance_details.get('imageId', 'N/A')}")
- impact.append(f"IPv4 address: {ec2_instance_details.get('ipV4Addresses', 'N/A')}")
- impact.append(f"Subnet: {ec2_instance_details.get('subnetId', 'N/A')}")
- impact.append(f"VPC: {ec2_instance_details.get('vpcId', 'N/A')}")
- impact.append(f"Region: {resource_region}")
- impact.append(f"AWS Account: {aws_account}")
- impact.append(f"Launched at: {ec2_instance_details.get('launchedAt', 'N/A')}")
- impact.append("---")
+ impact.extend(
+ (
+ f"ARN: {resource_id}",
+ f"Image ID: {ec2_instance_details.get('imageId', 'N/A')}",
+ f"IPv4 address: {ec2_instance_details.get('ipV4Addresses', 'N/A')}",
+ f"Subnet: {ec2_instance_details.get('subnetId', 'N/A')}",
+ f"VPC: {ec2_instance_details.get('vpcId', 'N/A')}",
+ f"Region: {resource_region}",
+ f"AWS Account: {aws_account}",
+ f"Launched at: {ec2_instance_details.get('launchedAt', 'N/A')}",
+ "---",
+ ),
+ )
 elif resource_type == "AWS_ECR_CONTAINER_IMAGE":
 image_id = resource_id.split("repository/")[1].replace("sha256:", "").replace("/", "-")
 endpoint_host = image_id
 ecr_image_details = resource_details.get("awsEcrContainerImage", None)
 if ecr_image_details:
- impact.append(f"ARN: {resource_id}")
- impact.append(f"Registry: {ecr_image_details.get('registry', 'N/A')}")
- impact.append(f"Repository: {ecr_image_details.get('repositoryName', 'N/A')}")
- impact.append(f"Hash: {ecr_image_details.get('imageHash', 'N/A')}")
- impact.append(f"Author: {ecr_image_details.get('author', 'N/A')}")
- impact.append(f"Pushed at: {ecr_image_details.get('pushedAt', 'N/A')}")
- impact.append("---")
+ impact.extend(
+ (
+ f"ARN: {resource_id}",
+ f"Registry: {ecr_image_details.get('registry', 'N/A')}",
+ f"Repository: {ecr_image_details.get('repositoryName', 'N/A')}",
+ f"Hash: {ecr_image_details.get('imageHash', 'N/A')}",
+ f"Author: {ecr_image_details.get('author', 'N/A')}",
+ f"Pushed at: {ecr_image_details.get('pushedAt', 'N/A')}",
+ "---",
+ ),
+ )
 elif resource_type == "AWS_ECR_REPOSITORY":
 # no corresponding
 # key present in
@@ -227,12 +234,16 @@ def process_endpoints(self, finding: Finding, raw_finding: dict) -> Finding:
 endpoint_host = lambda_id
 lambda_details = resource_details.get("awsLambdaFunction", None)
 if lambda_details:
- impact.append(f"ARN: {resource_id}")
- impact.append(f"Name: {lambda_details.get('functionName', 'N/A')}")
- impact.append(f"Version: {lambda_details.get('version', 'N/A')}")
- impact.append(f"Runtime: {lambda_details.get('runtime', 'N/A')}")
- impact.append(f"Hash: {lambda_details.get('codeSha256', 'N/A')}")
- impact.append(f"Pushed at: {lambda_details.get('lastModifiedAt', 'N/A')}")
+ impact.extend(
+ (
+ f"ARN: {resource_id}",
+ f"Name: {lambda_details.get('functionName', 'N/A')}",
+ f"Version: {lambda_details.get('version', 'N/A')}",
+ f"Runtime: {lambda_details.get('runtime', 'N/A')}",
+ f"Hash: {lambda_details.get('codeSha256', 'N/A')}",
+ f"Pushed at: {lambda_details.get('lastModifiedAt', 'N/A')}",
+ ),
+ )
 else:
 msg = "Incorrect Inspector2 report format"
 raise TypeError(msg)
diff --git a/unittests/tools/test_aws_inspector2_parser.py b/unittests/tools/test_aws_inspector2_parser.py
index 5883dc86272..f023bec88a2 100644
--- a/unittests/tools/test_aws_inspector2_parser.py
+++ b/unittests/tools/test_aws_inspector2_parser.py
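Review bot: In the `AWS_ECR_CONTAINER_IMAGE` branch of the `-196` hunk, the context line `image_id = resource_id.split("repository/")[1]...` raises `IndexError` when a report's resource id has no `repository/` segment. The report is uploaded input, so a malformed file would surface as an unhandled exception rather than the parser's own `TypeError` with "Incorrect Inspector2 report format", unless a caller outside this hunk catches it. A guard before the split would keep the failure mode consistent: `if "repository/" not in resource_id:` followed by `msg = "Incorrect Inspector2 report format"` and `raise TypeError(msg)`. `process_endpoints` begins and ends outside the hunk.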
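Review bot: In the Lambda tuple of the `-227` hunk, `lastModifiedAt` is labelled `Pushed at:`, which looks carried over from the ECR block; `f"Last modified at: {lambda_details.get('lastModifiedAt', 'N/A')}"` would match the field. The tuple also has no closing `"---"` element, unlike the EC2 and ECR tuples in the previous hunk. If one finding can list several resources, the Lambda entries would presumably run into the next resource's lines in `impact`; confirm whether that omission is intended.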
@@ -7,14 +7,14 @@ class TestAWSInspector2Parser(TestCase):
 def test_aws_inspector2_parser_with_no_vuln_has_no_findings(self):
- with open("unittests/scans/aws_inspector2/aws_inspector2_zero_vul.json") as testfile:
+ with open("unittests/scans/aws_inspector2/aws_inspector2_zero_vul.json", encoding="utf-8") as testfile:
 parser = AWSInspector2Parser()
 findings = parser.get_findings(testfile, Test())
 testfile.close()
 self.assertEqual(0, len(findings))
 def test_aws_inspector2_parser_with_one_vuln_has_one_findings(self):
- with open("unittests/scans/aws_inspector2/aws_inspector2_one_vul.json") as testfile:
+ with open("unittests/scans/aws_inspector2/aws_inspector2_one_vul.json", encoding="utf-8") as testfile:
 parser = AWSInspector2Parser()
 findings = parser.get_findings(testfile, Test())
 testfile.close()
@@ -26,7 +26,7 @@ def test_aws_inspector2_parser_with_one_vuln_has_one_findings(self):
 self.assertEqual("Medium", findings[0].severity)
 def test_aws_inspector2_parser_with_many_vuln_has_many_findings(self):
- with open("unittests/scans/aws_inspector2/aws_inspector2_many_vul.json") as testfile:
+ with open("unittests/scans/aws_inspector2/aws_inspector2_many_vul.json", encoding="utf-8") as testfile:
 parser = AWSInspector2Parser()
 findings = parser.get_findings(testfile, Test())
 testfile.close()
@@ -37,7 +37,7 @@ def test_aws_inspector2_parser_with_many_vuln_has_many_findings(self):
 def test_aws_inspector2_parser_empty_with_error(self):
 with self.assertRaises(TypeError) as context:
- with open("unittests/scans/aws_inspector2/empty_with_error.json") as testfile:
+ with open("unittests/scans/aws_inspector2/empty_with_error.json", encoding="utf-8") as testfile:
 parser = AWSInspector2Parser()
 parser.get_findings(testfile, Test())
 testfile.close()
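Review bot: The `testfile.close()` call that follows each updated `with open(..., encoding="utf-8") as testfile:` line is redundant, because the context manager already closes the file when the block exits. This applies to all four tests touched here (the `-7`, `-26` and `-37` hunks), and the `testfile.close()` lines can simply be dropped. Indentation is not recoverable from this page, so whether the call sits inside or just after the `with` body is inferred; either way it has no effect.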