From 25130cdf2d82ba436f8523a32fc884e96861c448 Mon Sep 17 00:00:00 2001 From: Antoine Ruffino <138585151+a-ruff@users.noreply.github.com> Date: Fri, 21 Jun 2024 23:25:11 +0200 Subject: [PATCH] Enhance Kubescape parser (#10369) * Enhance Kubescape parser * Fix typo. * Update settings check-sum * Update settings check-sum --- dojo/settings/.settings.dist.py.sha256sum | 2 +- dojo/settings/settings.dist.py | 4 +- dojo/tools/kubescape/parser.py | 113 +++++++++++++++++----- unittests/tools/test_kubescape_parser.py | 6 +- 4 files changed, 97 insertions(+), 28 deletions(-) diff --git a/dojo/settings/.settings.dist.py.sha256sum b/dojo/settings/.settings.dist.py.sha256sum index 4de58bdb1b..95d13b9ae0 100644 --- a/dojo/settings/.settings.dist.py.sha256sum +++ b/dojo/settings/.settings.dist.py.sha256sum @@ -1 +1 @@ -e9aab91c011f6aa1933791c57e7c37b165e5369606c459f772c4269c56212b53 +ed4d321ce9ae47f9500965e8494a069fb464a9bd4ea3edf994020523f0dea085 diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index 4362b84405..af42b07d84 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -1267,7 +1267,8 @@ def saml2_attrib_map_format(dict): 'Snyk Code Scan': ['vuln_id_from_tool', 'file_path'], 'Bearer CLI': ['title', 'severity'], 'Nancy Scan': ['title', 'vuln_id_from_tool'], - 'Wiz Scan': ['title', 'description', 'severity'] + 'Wiz Scan': ['title', 'description', 'severity'], + 'Kubescape JSON Importer': ['title', 'component_name'] } # Override the hardcoded settings here via the env var @@ -1485,6 +1486,7 @@ def saml2_attrib_map_format(dict): 'Nosey Parker Scan': DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE, 'Bearer CLI': DEDUPE_ALGO_HASH_CODE, 'Wiz Scan': DEDUPE_ALGO_HASH_CODE, + 'Kubescape JSON Importer': DEDUPE_ALGO_HASH_CODE } # Override the hardcoded settings here via the env var diff --git a/dojo/tools/kubescape/parser.py b/dojo/tools/kubescape/parser.py index a5797e8402..be9cd6d741 100644 --- a/dojo/tools/kubescape/parser.py +++ b/dojo/tools/kubescape/parser.py @@ -1,4 +1,5 @@ import json +import textwrap from dojo.models import Finding @@ -13,13 +14,36 @@ def get_label_for_scan_types(self, scan_type): def get_description_for_scan_types(self, scan_type): return "Import result of Kubescape JSON output." + def find_control_summary_by_id(self, data, control_id): + # Browse summaryDetails to look for matching control id. If the Control id is matching, return the first occurence. + try: + controls = data.get("summaryDetails", {}).get("controls", {}) + return controls.get(control_id, None) + except ValueError: + return None + + @staticmethod + def __hyperlink(link: str) -> str: + return "[" + link + "](" + link + ")" + def severity_mapper(self, input): - if input == 1: + if input <= 4: return "Low" - elif input == 2: + elif input <= 7: return "Medium" - elif input == 3: + elif input <= 9: return "High" + elif input <= 10: + return "Critical" + + def parse_resource_id(self, resource_id): + try: + parts = resource_id.split("/") + resource_type = parts[-2] + resource_name = parts[-1] + return resource_type, resource_name + except IndexError: + return None, None def get_findings(self, filename, test): findings = [] @@ -29,27 +53,70 @@ def get_findings(self, filename, test): data = {} for resource in data["resources"]: resourceid = resource["resourceID"] + resource_type, resource_name = self.parse_resource_id(resourceid) results = ([each for each in data["results"] if each.get('resourceID') == resourceid]) controls = results[0].get("controls", []) - try: - prioritizedResource = results[0]["prioritizedResource"]["severity"] - except KeyError: - prioritizedResource = "Info" + for control in controls: - controlID = control['controlID'] - description = control["name"] + "\n\n" - description += "**resourceID:** " + resourceid + "\n" - description += "**resource object:** " + str(resource["object"]) + "\n" - description += "**controlID:** " + controlID + "\n" - description += "**Rules:** " + str(control["rules"]) + "\n" - if self.severity_mapper(prioritizedResource) is None: - severity = "Info" - else: - severity = self.severity_mapper(prioritizedResource) - find = Finding(title=str(controlID), - test=test, - description=description, - severity=severity, - static_finding=True) - findings.append(find) + # This condition is true if the result doesn't contain the status for each control (old format) + retrocompatibility_condition = 'status' not in control or 'status' not in control['status'] + if retrocompatibility_condition or control["status"]["status"] == "failed": + control_name = control["name"] + if resource_type and resource_name and control_name: + title = f"{control_name} - {resource_type} {resource_name}" + else: + title = f"{control_name} - {resourceid}" + controlID = control['controlID'] + + # Find control details + controlSummary = self.find_control_summary_by_id(data, controlID) + if controlSummary is None: + severity = "Info" + mitigation = "" + else: + severity = self.severity_mapper(controlSummary.get("scoreFactor", 0)) + # Define mitigation if available + if "mitigation" in controlSummary: + mitigation = controlSummary["mitigation"] + else: + mitigation = "" + + armoLink = f"https://hub.armosec.io/docs/{controlID.lower()}" + description = "**Summary:** " + f"The ressource '{resourceid}' has failed the control '{control_name}'." + "\n" + if controlSummary is not None and "description" in controlSummary: + description += "**Description:** " + controlSummary["description"] + "\n" + + # Define category if available + if controlSummary is not None and "category" in controlSummary and "subCategory" in controlSummary["category"]: + category_name = controlSummary["category"]["name"] + category_subname = controlSummary["category"]["subCategory"]["name"] + category = f"{category_name} > {category_subname}" + description += "**Category:** " + category + "\n" + elif controlSummary is not None and "category" in controlSummary and "name" in controlSummary["category"]: + category = controlSummary["category"]["name"] + description += "**Category:** " + category + "\n" + + description += "View control details here: " + self.__hyperlink(armoLink) + + steps_to_reproduce = "The following rules have failed :" + "\n" + steps_to_reproduce += "\t**Rules:** " + str(json.dumps(control["rules"], indent=4)) + "\n" + + steps_to_reproduce += "Resource object may contain evidence:" + "\n" + steps_to_reproduce += "\t**Resource object:** " + str(json.dumps(resource["object"], indent=4)) + + references = armoLink + + find = Finding( + title=textwrap.shorten(title, 150), + test=test, + description=description, + mitigation=mitigation, + steps_to_reproduce=steps_to_reproduce, + references=references, + severity=severity, + component_name=resourceid, + static_finding=True, + dynamic_finding=False + ) + findings.append(find) return findings diff --git a/unittests/tools/test_kubescape_parser.py b/unittests/tools/test_kubescape_parser.py index bccbed220a..c68cb2f1f7 100644 --- a/unittests/tools/test_kubescape_parser.py +++ b/unittests/tools/test_kubescape_parser.py @@ -4,7 +4,7 @@ from ..dojo_test_case import DojoTestCase, get_unit_tests_path -class TestOrtParser(DojoTestCase): +class TestKubescapeParser(DojoTestCase): def test_parse_file_has_many_findings(self): with open(get_unit_tests_path() + "/scans/kubescape/many_findings.json") as testfile: parser = KubescapeParser() @@ -15,10 +15,10 @@ def test_parse_file_has_many_results(self): with open(get_unit_tests_path() + "/scans/kubescape/results.json") as testfile: parser = KubescapeParser() findings = parser.get_findings(testfile, Test()) - self.assertEqual(20, len(findings)) + self.assertEqual(0, len(findings)) def test_parse_file_with_a_failure(self): with open(get_unit_tests_path() + "/scans/kubescape/with_a_failure.json") as testfile: parser = KubescapeParser() findings = parser.get_findings(testfile, Test()) - self.assertEqual(18, len(findings)) + self.assertEqual(3, len(findings))