import collections
import csv
import io
import logging

# Module-level logger: the original patch called self._LOGGER, which this
# class never defines, so the bad-header path crashed with AttributeError.
logger = logging.getLogger(__name__)


class IntSightsCSVParser(object):
    def _parse_csv(self, csv_file) -> list:
        """
        Parse entries from the CSV file object into a list of alerts.

        Args:
            csv_file: The CSV file object to parse (text or bytes)
        Returns:
            A list of alerts [dict()]; empty if the header row does not
            match the expected IntSights export columns.
        """
        # Exact set of columns an IntSights CSV export is expected to have.
        default_keys = [
            "Alert ID",
            "Title",
            "Description",
            "Severity",
            "Type",
            "Source Date (UTC)",
            "Report Date (UTC)",
            "Network Type",
            "Source URL",
            "Source Name",
            "Assets",
            "Tags",
            "Assignees",
            "Remediation",
            "Status",
            "Closed Reason",
            "Additional Info",
            "Rating",
            "Alert Link",
        ]

        # These keys require a value. If one or more of the values is null or
        # empty, the entire alert is ignored. This is to avoid attempting to
        # import incomplete Findings.
        required_keys = ["alert_id", "title", "severity", "status"]

        alerts = []
        invalid_alerts = []

        content = csv_file.read()
        if isinstance(content, bytes):
            content = content.decode("utf-8")
        csv_reader = csv.DictReader(
            io.StringIO(content), delimiter=",", quotechar='"'
        )

        # Don't bother parsing if the keys don't match exactly what's expected
        if collections.Counter(default_keys) == collections.Counter(
            csv_reader.fieldnames
        ):
            default_value = "None provided"  # was misspelled "default_valud"
            for alert in csv_reader:
                # Re-key each row from the export's column names to the
                # internal alert keys shared with the JSON handler.
                alert["alert_id"] = alert.pop("Alert ID")
                alert["title"] = alert.pop("Title")
                alert["description"] = alert.pop("Description")
                alert["severity"] = alert.pop("Severity")
                alert["type"] = alert.pop("Type")
                alert["source_date"] = alert.pop(
                    "Source Date (UTC)", default_value
                )
                alert["report_date"] = alert.pop(
                    "Report Date (UTC)", default_value
                )
                alert["network_type"] = alert.pop(
                    "Network Type", default_value
                )
                alert["source_url"] = alert.pop("Source URL", default_value)
                alert["assets"] = alert.pop("Assets", default_value)
                alert["tags"] = alert.pop("Tags", default_value)
                alert["status"] = alert.pop("Status", default_value)
                alert["alert_link"] = alert.pop("Alert Link")
                # Columns that are not carried over into the alert dict.
                alert.pop("Assignees")
                alert.pop("Remediation")
                alert.pop("Closed Reason")
                alert.pop("Rating")
                for key in required_keys:
                    if not alert[key]:
                        invalid_alerts.append(alert)

                if alert not in invalid_alerts:
                    alerts.append(alert)
        else:
            logger.error(
                "The CSV file has one or more missing or unexpected header values"
            )

        return alerts
import json


class IntSightsJSONParser(object):
    def _parse_json(self, json_file) -> list:
        """
        Parse entries from the JSON report object into a list of alerts.

        Args:
            json_file: The JSON file object to parse
        Returns:
            A list of alerts [dict()]; empty if the report has no "Alerts".
        """
        alerts = []

        original_alerts = json.load(json_file)
        for original_alert in original_alerts.get("Alerts", []):
            details = original_alert["Details"]
            source = details["Source"]
            alert = {
                "alert_id": original_alert["_id"],
                "title": details["Title"],
                "description": details["Description"],
                "severity": details["Severity"],
                "type": details["Type"],
                "source_date": source.get("Date", "None provided"),
                "report_date": original_alert.get(
                    "FoundDate", "None provided"
                ),
                "network_type": source.get("NetworkType"),
                "source_url": source.get("URL"),
                # Robustness: tolerate alerts without "Assets" and asset
                # entries whose "Value" is missing/null (previously a
                # KeyError/TypeError in the join).
                "assets": ",".join(
                    item.get("Value") or ""
                    for item in original_alert.get("Assets", [])
                ),
                "tags": details.get("Tags"),
                "status": (
                    "Closed"
                    if original_alert.get("Closed", {}).get("IsClosed")
                    else "Open"
                ),
                "alert_link": (
                    "https://dashboard.intsights.com/#/threat-command/alerts?search="
                    f'{original_alert["_id"]}'
                ),
            }
            alerts.append(alert)

        return alerts
- def _parse_json(self, json_file) -> [dict]: - """ - Parses entries from the JSON object into a list of alerts - Args: - json_file: The JSON file object to parse - Returns: - A list of alerts [dict()] - """ - alerts = [] - - original_alerts = json.load(json_file) - for original_alert in original_alerts.get("Alerts", []): - alert = dict() - alert["alert_id"] = original_alert["_id"] - alert["title"] = original_alert["Details"]["Title"] - alert["description"] = original_alert["Details"]["Description"] - alert["severity"] = original_alert["Details"]["Severity"] - alert["type"] = original_alert["Details"]["Type"] - alert["source_date"] = original_alert["Details"]["Source"].get( - "Date", "None provided" - ) - alert["report_date"] = original_alert.get( - "FoundDate", "None provided" - ) - alert["network_type"] = original_alert["Details"]["Source"].get( - "NetworkType" - ) - alert["source_url"] = original_alert["Details"]["Source"].get( - "URL" - ) - alert["assets"] = ",".join( - [item.get("Value") for item in original_alert["Assets"]] - ) - alert["tags"] = original_alert["Details"].get("Tags") - alert["status"] = ( - "Closed" - if original_alert["Closed"].get("IsClosed") - else "Open" - ) - alert["alert_link"] = ( - f"https://dashboard.intsights.com/#/threat-command/alerts?search=" - f'{original_alert["_id"]}' - ) - - alerts.append(alert) - - return alerts - - def _parse_csv(self, csv_file) -> [dict]: - """ - - Parses entries from the CSV file object into a list of alerts - Args: - csv_file: The JSON file object to parse - Returns: - A list of alerts [dict()] - - """ - default_keys = [ - "Alert ID", - "Title", - "Description", - "Severity", - "Type", - "Source Date (UTC)", - "Report Date (UTC)", - "Network Type", - "Source URL", - "Source Name", - "Assets", - "Tags", - "Assignees", - "Remediation", - "Status", - "Closed Reason", - "Additional Info", - "Rating", - "Alert Link" - ] - - # These keys require a value. 
If one ore more of the values is null or empty, the entire Alert is ignored. - # This is to avoid attempting to import incomplete Findings. - required_keys = ["alert_id", "title", "severity", "status"] - - alerts = [] - invalid_alerts = [] - - content = csv_file.read() - if isinstance(content, bytes): - content = content.decode("utf-8") - csv_reader = csv.DictReader( - io.StringIO(content), delimiter=",", quotechar='"' - ) - - # Don't bother parsing if the keys don't match exactly what's expected - if collections.Counter(default_keys) == collections.Counter( - csv_reader.fieldnames - ): - default_valud = "None provided" - for alert in csv_reader: - alert["alert_id"] = alert.pop("Alert ID") - alert["title"] = alert.pop("Title") - alert["description"] = alert.pop("Description") - alert["severity"] = alert.pop("Severity") - alert["type"] = alert.pop( - "Type", - ) - alert["source_date"] = alert.pop( - "Source Date (UTC)", default_valud - ) - alert["report_date"] = alert.pop( - "Report Date (UTC)", default_valud - ) - alert["network_type"] = alert.pop( - "Network Type", default_valud - ) - alert["source_url"] = alert.pop("Source URL", default_valud) - alert["assets"] = alert.pop("Assets", default_valud) - alert["tags"] = alert.pop("Tags", default_valud) - alert["status"] = alert.pop("Status", default_valud) - alert["alert_link"] = alert.pop("Alert Link") - alert.pop("Assignees") - alert.pop("Remediation") - alert.pop("Closed Reason") - alert.pop("Rating") - for key in required_keys: - if not alert[key]: - invalid_alerts.append(alert) - - if alert not in invalid_alerts: - alerts.append(alert) - else: - self._LOGGER.error( - "The CSV file has one or more missing or unexpected header values" - ) - - return alerts - def _build_finding_description(self, alert: dict) -> str: """ Builds an IntSights Finding description from various pieces of information. 
@@ -185,21 +44,18 @@ def _build_finding_description(self, alert: dict) -> str: def get_findings(self, file, test): duplicates = dict() - if file.name.lower().endswith(".json"): - alerts = self._parse_json( + alerts = IntSightsJSONParser()._parse_json( file, ) elif file.name.lower().endswith(".csv"): - alerts = self._parse_csv(file) + alerts = IntSightsCSVParser()._parse_csv(file) else: raise ValueError( "Filename extension not recognized. Use .json or .csv" ) - for alert in alerts: dupe_key = alert["alert_id"] - alert = Finding( title=alert["title"], test=test, @@ -212,10 +68,7 @@ def get_findings(self, file, test): dynamic_finding=True, unique_id_from_tool=alert["alert_id"] ) - duplicates[dupe_key] = alert - if dupe_key not in duplicates: duplicates[dupe_key] = True - return duplicates.values()