{% if system_settings.enable_product_tracking_files %}
{% if prod|has_object_permission:"Product_Tracking_Files_Add" %}
diff --git a/dojo/tools/appcheck_web_application_scanner/engines/appcheck.py b/dojo/tools/appcheck_web_application_scanner/engines/appcheck.py
index ffcfa4b563..ba29a780bc 100644
--- a/dojo/tools/appcheck_web_application_scanner/engines/appcheck.py
+++ b/dojo/tools/appcheck_web_application_scanner/engines/appcheck.py
@@ -27,7 +27,7 @@ def extract_request_response(self, finding: Finding, value: dict[str, [str]]) ->
value.pop("Messages")
finding.unsaved_request, finding.unsaved_response = (d.strip() for d in rr_details[0])
- def parse_details(self, finding: Finding, value: dict[str, Union[str, dict[str, [str]]]]) -> None:
+ def parse_details(self, finding: Finding, value: dict[str, Union[str, dict[str, list[str]]]]) -> None:
self.extract_request_response(finding, value)
# super's version adds everything else to the description field
return super().parse_details(finding, value)
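# Annotation (not part of the patch): `[str]` is not a valid typing expression,
# which is why these hints are corrected to the PEP 585 builtin-generic form, e.g.:
#
#   def parse_details(self, finding: Finding, value: dict[str, Union[str, dict[str, list[str]]]]) -> None: ...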
diff --git a/dojo/tools/appcheck_web_application_scanner/engines/base.py b/dojo/tools/appcheck_web_application_scanner/engines/base.py
index 2b2f1cc189..f45fd50669 100644
--- a/dojo/tools/appcheck_web_application_scanner/engines/base.py
+++ b/dojo/tools/appcheck_web_application_scanner/engines/base.py
@@ -5,6 +5,7 @@
import cvss.parser
import dateutil.parser
from cpe import CPE
+from cvss.exceptions import CVSSError
from django.core.exceptions import ImproperlyConfigured
from dojo.models import Endpoint, Finding
@@ -41,6 +42,35 @@ def escape_if_needed(x):
return "".join([escape_if_needed(c) for c in s])
+def cvss_score_to_severity(score: float, version: int) -> str:
+ """
+ Maps a CVSS score with a given version to a severity level.
+ Mapping from https://nvd.nist.gov/vuln-metrics/cvss (modified slightly to have "Info" in range [0.0, 0.1) for CVSS
+ v3/v4)
+ """
+ cvss_score = float(score)
+ if version == 2:
+ if cvss_score >= 7.0:
+ severity = "High"
+ elif cvss_score >= 4.0:
+ severity = "Medium"
+ else:
+ severity = "Low"
+ else:
+ if cvss_score >= 9.0:
+ severity = "Critical"
+ elif cvss_score >= 7.0:
+ severity = "High"
+ elif cvss_score >= 4.0:
+ severity = "Medium"
+ elif cvss_score >= 0.1:
+ severity = "Low"
+ else:
+ severity = "Info"
+
+ return severity
+
+
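# A minimal usage sketch (annotation, not part of the patch) of the mapping above:
#
#   cvss_score_to_severity(9.8, 3)   # -> "Critical"
#   cvss_score_to_severity(5.0, 2)   # -> "Medium" (the v2 scale has no Critical/Info bands)
#   cvss_score_to_severity(0.05, 4)  # -> "Info"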
#######
# Field parsing helper classes
#######
@@ -122,7 +152,6 @@ class BaseEngineParser:
* status -> active/false_p/risk_accepted (depending on value)
* cves -> unsaved_vulnerability_ids (vulnerability_ids)
* cpe -> component name/version
- * cvss_vector -> severity (determined using CVSS package)
* notes -> appended to Finding description
* details -> appended to Finding description
@@ -143,7 +172,6 @@ class BaseEngineParser:
"status": Method("parse_status"),
"cves": Method("parse_cves"),
"cpe": Method("parse_components"),
- "cvss_vector": Method("parse_severity"),
# These should be listed after the 'description' entry; they append to it
"notes": Method("parse_notes"),
"details": Method("parse_details")}
@@ -176,7 +204,7 @@ def parse_initial_date(self, finding: Finding, value: str) -> None:
def is_cve(self, c: str) -> bool:
return bool(c and isinstance(c, str) and self.CVE_PATTERN.fullmatch(c))
- def parse_cves(self, finding: Finding, value: [str]) -> None:
+ def parse_cves(self, finding: Finding, value: list[str]) -> None:
finding.unsaved_vulnerability_ids = [c.upper() for c in value if self.is_cve(c)]
#####
@@ -192,19 +220,6 @@ def parse_status(self, finding: Finding, value: str) -> None:
elif value == "acceptable_risk":
finding.risk_accepted = True
- #####
- # For severity (extracted from cvss vector)
- #####
- def get_severity(self, value: str) -> Optional[str]:
- if cvss_obj := cvss.parser.parse_cvss_from_text(value):
- if (severity := cvss_obj[0].severities()[0].title()) in Finding.SEVERITIES:
- return severity
- return None
-
- def parse_severity(self, finding: Finding, value: str) -> None:
- if severity := self.get_severity(value):
- finding.severity = severity
-
#####
# For parsing component data
#####
@@ -217,7 +232,7 @@ def parse_cpe(self, cpe_str: str) -> (Optional[str], Optional[str]):
(cpe_obj.get_version() and cpe_obj.get_version()[0]) or None,
)
- def parse_components(self, finding: Finding, value: [str]) -> None:
+ def parse_components(self, finding: Finding, value: list[str]) -> None:
# Only use the first entry
finding.component_name, finding.component_version = self.parse_cpe(value[0])
@@ -236,12 +251,12 @@ def append_description(self, finding: Finding, addendum: dict[str, str]) -> None
def parse_notes(self, finding: Finding, value: str) -> None:
self.append_description(finding, {"Notes": value})
- def extract_details(self, value: Union[str, dict[str, Union[str, dict[str, [str]]]]]) -> dict[str, str]:
+ def extract_details(self, value: Union[str, dict[str, Union[str, dict[str, list[str]]]]]) -> dict[str, str]:
if isinstance(value, dict):
return {k: v for k, v in value.items() if k != "_meta"}
return {"Details": str(value)}
- def parse_details(self, finding: Finding, value: dict[str, Union[str, dict[str, [str]]]]) -> None:
+ def parse_details(self, finding: Finding, value: dict[str, Union[str, dict[str, list[str]]]]) -> None:
self.append_description(finding, self.extract_details(value))
#####
@@ -282,6 +297,44 @@ def set_endpoints(self, finding: Finding, item: Any) -> None:
endpoints = self.parse_endpoints(item)
finding.unsaved_endpoints.extend(endpoints)
+ #####
+ # For severity (extracted from various cvss vectors)
+ #####
+ def parse_cvss_vector(self, value: str) -> Optional[str]:
+ # CVSS4 vectors don't parse with the handy-dandy parse method :(
+ try:
+ if (severity := cvss.CVSS4(value).severity) in Finding.SEVERITIES:
+ return severity
+ except CVSSError:
+ pass
+
+ if cvss_obj := cvss.parser.parse_cvss_from_text(value):
+ if (severity := cvss_obj[0].severities()[0].title()) in Finding.SEVERITIES:
+ return severity
+ return None
+
+ def set_severity(self, finding: Finding, item: Any) -> None:
+ for base_score_entry, cvss_version in [
+ ("cvss_v4_base_score", 4),
+ ("cvss_v3_base_score", 3),
+ ("cvss_base_score", 2),
+ ]:
+ if base_score := item.get(base_score_entry):
+ finding.severity = cvss_score_to_severity(base_score, cvss_version)
+ return
+
+ for vector_type in ["cvss_v4_vector", "cvss_v3_vector", "cvss_vector"]:
+ if vector := item.get(vector_type):
+ if severity := self.parse_cvss_vector(vector):
+ finding.severity = severity
+ return
+
+ finding.severity = "Info"
+
+ def process_whole_item(self, finding: Finding, item: Any) -> None:
+ self.set_severity(finding, item)
+ self.set_endpoints(finding, item)
+
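# Annotation (not part of the patch): severity resolution above prefers explicit
# base scores, newest CVSS version first, then falls back to parsing vectors.
# For example, with a hypothetical item
#
#   item = {"cvss_v3_base_score": 7.5, "cvss_vector": "AV:N/AC:L/Au:N/C:P/I:P/A:P"}
#
# set_severity() assigns "High" from the v3 score and never parses the v2 vector.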
# Returns the complete field processing map: common fields plus any engine-specific
def get_engine_fields(self) -> dict[str, FieldType]:
return {
@@ -302,7 +355,7 @@ def parse_finding(self, item: dict[str, Any]) -> Tuple[Finding, Tuple]:
# Check first whether the field even exists on this item entry; if not, skip it
if value := item.get(field):
field_handler(self, finding, value)
- self.set_endpoints(finding, item)
+ self.process_whole_item(finding, item)
# Make a note of what scanning engine was used for this Finding
self.append_description(finding, {"Scanning Engine": self.SCANNING_ENGINE})
return finding, self.get_finding_key(finding)
diff --git a/dojo/tools/fortify/fpr_parser.py b/dojo/tools/fortify/fpr_parser.py
index d0d62e2aa9..a5a1105135 100644
--- a/dojo/tools/fortify/fpr_parser.py
+++ b/dojo/tools/fortify/fpr_parser.py
@@ -1,9 +1,10 @@
import re
import zipfile
+from xml.etree.ElementTree import Element
from defusedxml import ElementTree
-from dojo.models import Finding
+from dojo.models import Finding, Test
class FortifyFPRParser:
@@ -12,70 +13,156 @@ def parse_fpr(self, filename, test):
input_zip = zipfile.ZipFile(filename.name, "r")
else:
input_zip = zipfile.ZipFile(filename, "r")
- zipdata = {name: input_zip.read(name) for name in input_zip.namelist()}
- root = ElementTree.fromstring(zipdata["audit.fvdl"].decode("utf-8"))
+ # Read each file from the zip artifact into a dict with the format of
+ # filename: file_content
+ zip_data = {name: input_zip.read(name) for name in input_zip.namelist()}
+ root = self.identify_root(zip_data)
+ return self.parse_vulnerabilities_and_convert_to_findings(root, test)
+
+ def identify_root(self, zip_data: dict) -> Element:
+ """Iterate through the zip data to determine which file in the zip could be the XML to be parsed."""
+ # Determine where the "audit.fvdl" could be
+ audit_file = None
+ for file_name in zip_data:
+ if file_name.endswith("audit.fvdl"):
+ audit_file = file_name
+ break
+ # Make sure we have an audit file
+ if audit_file is None:
+ msg = 'A search for an "audit.fvdl" file was not successful. '
+ raise ValueError(msg)
+ # Parse the XML file and determine the namespace, if present
+ root = ElementTree.fromstring(zip_data.get(audit_file).decode("utf-8"))
+ self.identify_namespace(root)
+ return root
+
+ def identify_namespace(self, root: Element) -> None:
+ """Determine what the namespace could be, then store it in the class attribute labeled `namespace`."""
regex = r"{.*}"
matches = re.match(regex, root.tag)
try:
- namespace = matches.group(0)
+ self.namespace = matches.group(0)
except BaseException:
- namespace = ""
+ self.namespace = ""
+
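# Annotation (not part of the patch): ElementTree renders namespaced tags as
# "{uri}LocalName", so the regex above recovers the "{uri}" prefix, e.g.
# (the URI shown is illustrative):
#
#   import re
#   re.match(r"{.*}", "{xmlns://www.fortifysoftware.com/schema/fvdl}FVDL").group(0)
#   # -> "{xmlns://www.fortifysoftware.com/schema/fvdl}"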
+ def parse_vulnerabilities_and_convert_to_findings(self, root: Element, test: Test) -> list[Finding]:
+ """Parse the XML and generate a list of findings."""
items = []
for child in root:
if "Vulnerabilities" in child.tag:
for vuln in child:
- ClassID = vuln.find(f"{namespace}ClassInfo").find(f"{namespace}ClassID").text
- Kingdom = vuln.find(f"{namespace}ClassInfo").find(f"{namespace}Kingdom").text
- Type = vuln.find(f"{namespace}ClassInfo").find(f"{namespace}Type").text
- AnalyzerName = vuln.find(f"{namespace}ClassInfo").find(f"{namespace}AnalyzerName").text
- DefaultSeverity = vuln.find(f"{namespace}ClassInfo").find(f"{namespace}DefaultSeverity").text
- InstanceID = vuln.find(f"{namespace}InstanceInfo").find(f"{namespace}InstanceID").text
- InstanceSeverity = vuln.find(f"{namespace}InstanceInfo").find(f"{namespace}InstanceSeverity").text
- Confidence = vuln.find(f"{namespace}InstanceInfo").find(f"{namespace}Confidence").text
- SourceLocationpath = vuln.find(f"{namespace}AnalysisInfo").find(f"{namespace}Unified").find(f"{namespace}Trace").find(f"{namespace}Primary").find(f"{namespace}Entry").find(f"{namespace}Node").find(f"{namespace}SourceLocation").attrib.get("path")
- SourceLocationline = vuln.find(f"{namespace}AnalysisInfo").find(f"{namespace}Unified").find(f"{namespace}Trace").find(f"{namespace}Primary").find(f"{namespace}Entry").find(f"{namespace}Node").find(f"{namespace}SourceLocation").attrib.get("line")
- SourceLocationlineEnd = vuln.find(f"{namespace}AnalysisInfo").find(f"{namespace}Unified").find(f"{namespace}Trace").find(f"{namespace}Primary").find(f"{namespace}Entry").find(f"{namespace}Node").find(f"{namespace}SourceLocation").attrib.get("lineEnd")
- SourceLocationcolStart = vuln.find(f"{namespace}AnalysisInfo").find(f"{namespace}Unified").find(f"{namespace}Trace").find(f"{namespace}Primary").find(f"{namespace}Entry").find(f"{namespace}Node").find(f"{namespace}SourceLocation").attrib.get("colStart")
- SourceLocationcolEnd = vuln.find(f"{namespace}AnalysisInfo").find(f"{namespace}Unified").find(f"{namespace}Trace").find(f"{namespace}Primary").find(f"{namespace}Entry").find(f"{namespace}Node").find(f"{namespace}SourceLocation").attrib.get("colEnd")
- SourceLocationsnippet = vuln.find(f"{namespace}AnalysisInfo").find(f"{namespace}Unified").find(f"{namespace}Trace").find(f"{namespace}Primary").find(f"{namespace}Entry").find(f"{namespace}Node").find(f"{namespace}SourceLocation").attrib.get("snippet")
- description = Type + "\n"
- severity = self.fpr_severity(Confidence, InstanceSeverity)
- description += "**ClassID:** " + ClassID + "\n"
- description += "**Kingdom:** " + Kingdom + "\n"
- description += "**AnalyzerName:** " + AnalyzerName + "\n"
- description += "**DefaultSeverity:** " + DefaultSeverity + "\n"
- description += "**InstanceID:** " + InstanceID + "\n"
- description += "**InstanceSeverity:** " + InstanceSeverity + "\n"
- description += "**Confidence:** " + Confidence + "\n"
- description += "**SourceLocationpath:** " + str(SourceLocationpath) + "\n"
- description += "**SourceLocationline:** " + str(SourceLocationline) + "\n"
- description += "**SourceLocationlineEnd:** " + str(SourceLocationlineEnd) + "\n"
- description += "**SourceLocationcolStart:** " + str(SourceLocationcolStart) + "\n"
- description += "**SourceLocationcolEnd:** " + str(SourceLocationcolEnd) + "\n"
- description += "**SourceLocationsnippet:** " + str(SourceLocationsnippet) + "\n"
- items.append(
- Finding(
- title=Type + " " + ClassID,
- severity=severity,
- static_finding=True,
- test=test,
- description=description,
- unique_id_from_tool=ClassID,
- file_path=SourceLocationpath,
- line=SourceLocationline,
- ),
- )
+ finding_context = {
+ "title": "",
+ "description": "",
+ "static_finding": True,
+ "test": test,
+ }
+ self.parse_class_information(vuln, finding_context)
+ self.parse_instance_information(vuln, finding_context)
+ self.parse_analysis_information(vuln, finding_context)
+ self.parse_severity_and_convert(vuln, finding_context)
+ items.append(Finding(**finding_context))
return items
- def fpr_severity(self, Confidence, InstanceSeverity):
- if float(Confidence) >= 2.5 and float(InstanceSeverity) >= 2.5:
- severity = "Critical"
- elif float(Confidence) >= 2.5 and float(InstanceSeverity) < 2.5:
- severity = "High"
- elif float(Confidence) < 2.5 and float(InstanceSeverity) >= 2.5:
- severity = "Medium"
- elif float(Confidence) < 2.5 and float(InstanceSeverity) < 2.5:
- severity = "Low"
- else:
- severity = "Info"
- return severity
+ def parse_severity_and_convert(self, vulnerability: Element, finding_context: dict) -> None:
+ """Convert the float representations of severity and confidence to a string severity."""
+ # Default to Info severity in the case of an error
+ severity = "Info"
+ instance_severity = None
+ confidence = None
+ # Attempt to fetch the confidence and instance severity
+ if (instance_info := vulnerability.find(f"{self.namespace}InstanceInfo")) is not None:
+ instance_severity = getattr(instance_info.find(f"{self.namespace}InstanceSeverity"), "text", None)
+ confidence = getattr(instance_info.find(f"{self.namespace}Confidence"), "text", None)
+ # Make sure we have something to work with
+ if confidence is not None and instance_severity is not None:
+ if float(confidence) >= 2.5 and float(instance_severity) >= 2.5:
+ severity = "Critical"
+ elif float(confidence) >= 2.5 and float(instance_severity) < 2.5:
+ severity = "High"
+ elif float(confidence) < 2.5 and float(instance_severity) >= 2.5:
+ severity = "Medium"
+ elif float(confidence) < 2.5 and float(instance_severity) < 2.5:
+ severity = "Low"
+ # Record either Info or the calculated severity
+ finding_context["severity"] = severity
+
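# Annotation (not part of the patch): the 2.5 thresholds above form a quadrant
# map over (confidence, instance_severity), e.g.:
#
#   (4.0, 3.0) -> "Critical"    (4.0, 2.0) -> "High"
#   (2.0, 3.0) -> "Medium"      (2.0, 2.0) -> "Low"
#   missing either value        -> "Info"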
+ def parse_class_information(self, vulnerability: Element, finding_context: dict) -> None:
+ """Appends the description with any class information that can be extracted."""
+ if (class_info := vulnerability.find(f"{self.namespace}ClassInfo")) is not None:
+ if (namespace_type := class_info.find(f"{self.namespace}Type")) is not None:
+ finding_context["description"] += f"{namespace_type.text}\n"
+ finding_context["title"] += f"{namespace_type.text}"
+ if (class_id := class_info.find(f"{self.namespace}ClassID")) is not None:
+ finding_context["description"] += f"**ClassID:** {class_id.text}\n"
+ finding_context["unique_id_from_tool"] = class_id.text
+ finding_context["title"] += f" {class_id.text}"
+ if (kingdom := class_info.find(f"{self.namespace}Kingdom")) is not None:
+ finding_context["description"] += f"**Kingdom:** {kingdom.text}\n"
+ if (analyzer_name := class_info.find(f"{self.namespace}AnalyzerName")) is not None:
+ finding_context["description"] += f"**AnalyzerName:** {analyzer_name.text}\n"
+ if (default_severity := class_info.find(f"{self.namespace}DefaultSeverity")) is not None:
+ finding_context["description"] += f"**DefaultSeverity:** {default_severity.text}\n"
+
+ def parse_instance_information(self, vulnerability: Element, finding_context: dict) -> None:
+ """Appends the description with any instance information that can be extracted."""
+ if (instance_info := vulnerability.find(f"{self.namespace}InstanceInfo")) is not None:
+ if (instance_id := instance_info.find(f"{self.namespace}InstanceID")) is not None:
+ finding_context["description"] += f"**InstanceID:** {instance_id.text}\n"
+ if (instance_severity := instance_info.find(f"{self.namespace}InstanceSeverity")) is not None:
+ finding_context["description"] += f"**InstanceSeverity:** {instance_severity.text}\n"
+ if (confidence := instance_info.find(f"{self.namespace}Confidence")) is not None:
+ finding_context["description"] += f"**Confidence:** {confidence.text}\n"
+
+ def parse_analysis_information(self, vulnerability: Element, finding_context: dict) -> None:
+ """Appends the description with any analysis information that can be extracted."""
+ if (analysis_info := vulnerability.find(f"{self.namespace}AnalysisInfo")) is not None:
+ # See if we can get a SourceLocation from this
+ if (source_location := self.get_source_location(analysis_info)) is not None:
+ path = source_location.attrib.get("path")
+ line = source_location.attrib.get("line")
+ # Build the description
+ finding_context["description"] += f"**SourceLocationPath:** {path}\n"
+ finding_context["description"] += f"**SourceLocationLine:** {line}\n"
+ finding_context["description"] += (
+ f"**SourceLocationLineEnd:** {source_location.attrib.get('lineEnd')}\n"
+ )
+ finding_context["description"] += (
+ f"**SourceLocationColStart:** {source_location.attrib.get('colStart')}\n"
+ )
+ finding_context["description"] += f"**SourceLocationColEnd:** {source_location.attrib.get('colEnd')}\n"
+ finding_context["description"] += (
+ f"**SourceLocationSnippet:** {source_location.attrib.get('snippet')}\n"
+ )
+ # Manage the other metadata
+ finding_context["file_path"] = path
+ finding_context["line"] = line
+
+ def get_source_location(self, analysis_info: Element) -> Element | None:
+ """Return the SourceLocation element if we are able to reach it."""
+ # The order of this list is very important. Do not reorder it!
+ key_path = [
+ "Unified",
+ "Trace",
+ "Primary",
+ "Entry",
+ "Node",
+ "SourceLocation",
+ ]
+ # Iterate over the keys until we hit one that cannot be resolved
+ current_element = analysis_info
+ # Traverse the key path up to "Entry" to fetch all Entry elements
+ for key in key_path[:-3]: # stop before "Entry" level
+ if (next_current_element := current_element.find(f"{self.namespace}{key}")) is not None:
+ current_element = next_current_element
+ else:
+ return None
+ # Iterate over all "Entry" elements
+ entries = current_element.findall(f"{self.namespace}Entry")
+ for entry in entries:
+ # Continue the search for "Node" and "SourceLocation" within each entry
+ if (node := entry.find(f"{self.namespace}Node")) is not None:
+ if (source_location := node.find(f"{self.namespace}SourceLocation")) is not None:
+ return source_location
+ # Return None if no SourceLocation was found in any Entry
+ return None
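# A runnable sketch (annotation, not part of the patch) of the namespaced
# traversal above, using a hand-built FVDL-like fragment (the namespace URI and
# element data are illustrative):
#
#   from xml.etree.ElementTree import fromstring
#   ns = "{http://example/fvdl}"
#   doc = fromstring(
#       "<AnalysisInfo xmlns='http://example/fvdl'><Unified><Trace><Primary>"
#       "<Entry><Node><SourceLocation path='a.py' line='3'/></Node></Entry>"
#       "</Primary></Trace></Unified></AnalysisInfo>"
#   )
#   loc = doc.find(f"{ns}Unified/{ns}Trace/{ns}Primary/{ns}Entry/{ns}Node/{ns}SourceLocation")
#   print(loc.attrib["path"], loc.attrib["line"])  # -> a.py 3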
diff --git a/dojo/tools/fortify/parser.py b/dojo/tools/fortify/parser.py
index 2b1f3e21e3..b6f7e5185c 100644
--- a/dojo/tools/fortify/parser.py
+++ b/dojo/tools/fortify/parser.py
@@ -17,3 +17,6 @@ def get_findings(self, filename, test):
return FortifyXMLParser().parse_xml(filename, test)
elif str(filename.name).endswith(".fpr"):
return FortifyFPRParser().parse_fpr(filename, test)
+ else:
+ msg = "Filename extension not recognized. Use .xml or .fpr"
+ raise ValueError(msg)
diff --git a/dojo/tools/invicti/__init__.py b/dojo/tools/invicti/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/dojo/tools/invicti/parser.py b/dojo/tools/invicti/parser.py
new file mode 100644
index 0000000000..93854d9a2d
--- /dev/null
+++ b/dojo/tools/invicti/parser.py
@@ -0,0 +1,20 @@
+from dojo.tools.netsparker.parser import NetsparkerParser
+
+
+class InvictiParser(NetsparkerParser):
+ def get_scan_types(self):
+ return ["Invicti Scan"]
+
+ def get_label_for_scan_types(self, scan_type):
+ return "Invicti Scan"
+
+ def get_description_for_scan_types(self, scan_type):
+ return "Invicti JSON format."
+
+ def get_findings(self, filename, test):
+ """Extend the Netsparker parser, since Invicti is a renamed version of Netsparker.
+
+ If the two report formats deviate in the future, this method can be
+ implemented independently at that point.
+ """
+ return super().get_findings(filename, test)
diff --git a/dojo/tools/threat_composer/parser.py b/dojo/tools/threat_composer/parser.py
index 1babba06fd..f1099641b0 100644
--- a/dojo/tools/threat_composer/parser.py
+++ b/dojo/tools/threat_composer/parser.py
@@ -70,12 +70,12 @@ def get_findings(self, file, test):
if "threatAction" in threat:
title = threat["threatAction"]
- severity, impact, comments = self.parse_threat_metadata(threat["metadata"])
+ severity, impact, comments = self.parse_threat_metadata(threat.get("metadata", []))
description = self.to_description_text(threat, comments, assumption_threat_links[threat["id"]])
mitigation = self.to_mitigation_text(mitigation_links[threat["id"]])
unique_id_from_tool = threat["id"]
vuln_id_from_tool = threat["numericId"]
- tags = threat["tags"] if "tags" in threat else []
+ tags = threat.get("tags", [])
finding = Finding(
title=title,
@@ -112,14 +112,12 @@ def to_mitigation_text(self, mitigations):
counti = i + 1
text += f"**Mitigation {counti} (ID: {mitigation['numericId']}, Status: {mitigation.get('status', 'Not defined')})**: {mitigation['content']}"
- for item in mitigation["metadata"]:
+ for item in mitigation.get("metadata", []):
if item["key"] == "Comments":
text += f"\n*Comments*: {item['value'].replace(linesep, ' ')} "
break
- for j, assumption in enumerate(assumption_links):
- countj = j + 1
- text += f"\n- *Assumption {countj} (ID: {assumption['numericId']})*: {assumption['content'].replace(linesep, ' ')}"
+ text += self.to_assumption_text(assumption_links)
text += "\n"
@@ -145,8 +143,19 @@ def to_description_text(self, threat, comments, assumption_links):
if comments:
text += f"\n*Comments*: {comments}"
+ text += self.to_assumption_text(assumption_links)
+
+ return text
+
+ def to_assumption_text(self, assumption_links):
+ text = ""
for i, assumption in enumerate(assumption_links):
counti = i + 1
text += f"\n- *Assumption {counti} (ID: {assumption['numericId']})*: {assumption['content'].replace(linesep, ' ')}"
+ for item in assumption.get("metadata", []):
+ if item["key"] == "Comments":
+ text += f"\n *Comments*: {item['value'].replace(linesep, ' ')} "
+ break
+
return text
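# Annotation (not part of the patch): with hypothetical data, the shared helper
# above renders each assumption plus its first "Comments" metadata entry, e.g.
# (indentation approximate):
#
#   links = [{"numericId": 7, "content": "TLS everywhere",
#             "metadata": [{"key": "Comments", "value": "reviewed"}]}]
#   to_assumption_text(links)
#   # -> "\n- *Assumption 1 (ID: 7)*: TLS everywhere\n    *Comments*: reviewed "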
diff --git a/dojo/tools/wiz/parser.py b/dojo/tools/wiz/parser.py
index a68ecae2bb..f312554474 100644
--- a/dojo/tools/wiz/parser.py
+++ b/dojo/tools/wiz/parser.py
@@ -1,91 +1,209 @@
import csv
import io
+import json
+import logging
import sys
-from dojo.models import Finding
+from dateutil import parser as date_parser
+from dojo.models import SEVERITIES, Finding, Test
-class WizParser:
- def get_scan_types(self):
- return ["Wiz Scan"]
+logger = logging.getLogger(__name__)
- def get_label_for_scan_types(self, scan_type):
- return "Wiz Scan"
- def get_description_for_scan_types(self, scan_type):
- return "Wiz scan results in csv file format."
+class WizParserByTitle:
+ """Parse CSVs where the "Title" field supplies the finding title."""
- def get_findings(self, filename, test):
- content = filename.read()
- if isinstance(content, bytes):
- content = content.decode("utf-8")
- csv.field_size_limit(int(sys.maxsize / 10)) # the request/resp are big
- reader = csv.DictReader(io.StringIO(content))
+ def parse_findings(self, test: Test, reader: csv.DictReader) -> list[Finding]:
+ """Parse the CSV with the assumed format of the link below.
+
+ test file: https://github.com/DefectDojo/django-DefectDojo/blob/master/unittests/scans/wiz/multiple_findings.csv
+ """
findings = []
+ description_fields = [
+ "Description",
+ "Resource Type",
+ "Resource external ID",
+ "Subscription ID",
+ "Project IDs",
+ "Project Names",
+ "Control ID",
+ "Resource Name",
+ "Resource Region",
+ "Resource Status",
+ "Resource Platform",
+ "Resource OS",
+ "Resource original JSON",
+ "Issue ID",
+ "Resource vertex ID",
+ "Ticket URLs",
+ "Note",
+ "Due At",
+ "Subscription Name",
+ "Wiz URL",
+ "Cloud Provider URL",
+ "Resource Tags",
+ "Kubernetes Cluster",
+ "Kubernetes Namespace",
+ "Container Service",
+ ]
+ # Iterate over the rows to create findings
for row in reader:
if row.get("Status").lower() == "open":
- Title = row.get("Title")
- Severity = row.get("Severity")
- Description = row.get("Description")
- Resource_Type = row.get("Resource Type")
- Resource_external_ID = row.get("Resource external ID")
- Subscription_ID = row.get("Subscription ID")
- Project_IDs = row.get("Project IDs")
- Project_Names = row.get("Project Names")
- Control_ID = row.get("Control ID")
- Resource_Name = row.get("Resource Name")
- Resource_Region = row.get("Resource Region")
- Resource_Status = row.get("Resource Status")
- Resource_Platform = row.get("Resource Platform")
- Resource_OS = row.get("Resource OS")
- Resource_original_JSON = row.get("Resource original JSON")
- Issue_ID = row.get("Issue ID")
- Resource_vertex_ID = row.get("Resource vertex ID")
- Ticket_URLs = row.get("Ticket URLs")
- Note = row.get("Note")
- Due_At = row.get("Due At")
- Subscription_Name = row.get("Subscription Name")
- Wiz_URL = row.get("Wiz URL")
- Cloud_Provider_URL = row.get("Cloud Provider URL")
- Resource_Tags = row.get("Resource Tags")
- Kubernetes_Cluster = row.get("Kubernetes Cluster")
- Kubernetes_Namespace = row.get("Kubernetes Namespace")
- Container_Service = row.get("Container Service")
+ title = row.get("Title")
+ severity = row.get("Severity")
+ mitigation = row.get("Remediation Recommendation")
description = ""
- description += "**Description**: " + Description + "\n"
- description += "**Resource Type**: " + Resource_Type + "\n"
- description += "**external ID**: " + Resource_external_ID + "\n"
- description += "**Subscription ID**: " + Subscription_ID + "\n"
- description += "**Project IDs**: " + Project_IDs + "\n"
- description += "**Project Names**: " + Project_Names + "\n"
- description += "**Control ID**: " + Control_ID + "\n"
- description += "**Resource Name**: " + Resource_Name + "\n"
- description += "**Resource Region**: " + Resource_Region + "\n"
- description += "**Resource Status**: " + Resource_Status + "\n"
- description += "**Resource Platform**: " + Resource_Platform + "\n"
- description += "**Resource OS**: " + Resource_OS + "\n"
- description += "**original JSON**: " + Resource_original_JSON + "\n"
- description += "**Issue ID**: " + Issue_ID + "\n"
- description += "**vertex ID**: " + Resource_vertex_ID + "\n"
- description += "**Ticket URLs**: " + Ticket_URLs + "\n"
- description += "**Note**: " + Note + "\n"
- description += "**Due At**: " + Due_At + "\n"
- description += "**Subscription Name**: " + Subscription_Name + "\n"
- description += "**Wiz URL**: " + Wiz_URL + "\n"
- description += "**Provider URL**: " + Cloud_Provider_URL + "\n"
- description += "**Resource Tags**: " + Resource_Tags + "\n"
- description += "**Kubernetes Cluster**: " + Kubernetes_Cluster + "\n"
- description += "**Kubernetes Namespace**: " + Kubernetes_Namespace + "\n"
- description += "**Container Service**: " + Container_Service + "\n"
+ # Iterate over the description fields to create the description
+ for field in description_fields:
+ if (field_value := row.get(field)) is not None and len(field_value) > 0:
+ description += f"**{field}**: {field_value}\n"
+ # Create the finding object
findings.append(
Finding(
- title=Title,
+ title=title,
description=description,
- severity=Severity.lower().capitalize(),
+ severity=severity.lower().capitalize(),
static_finding=False,
dynamic_finding=True,
- mitigation=row.get("Remediation Recommendation"),
+ mitigation=mitigation,
test=test,
),
)
return findings
+
+
+class WizParserByDetailedName:
+ """Parse CSVs where the "Name" and "DetailedName" fields supply the finding title."""
+
+ def parse_findings(self, test: Test, reader: csv.DictReader) -> list[Finding]:
+ """Parse the CSV with the assumed format of the link below.
+
+ test file: Coming soon!
+ """
+ findings = []
+ description_fields = {
+ "WizURL": "Wiz URL",
+ "HasExploit": "Has Exploit",
+ "HasCisaKevExploit": "Has Cisa Kev Exploit",
+ "LocationPath": "Location Path",
+ "Version": "Version",
+ "DetectionMethod": "Detection Method",
+ "Link": "Link",
+ "Projects": "Projects",
+ "AssetID": "Asset ID",
+ "AssetName": "Asset Name",
+ "AssetRegion": "Asset Region",
+ "ProviderUniqueId": "Provider Unique Id",
+ "CloudProviderURL": "Cloud Provider URL",
+ "CloudPlatform": "Cloud Platform",
+ "SubscriptionExternalId": "Subscription External Id",
+ "SubscriptionId": "Subscription Id",
+ "SubscriptionName": "Subscription Name",
+ "ExecutionControllers": "Execution Controllers",
+ "ExecutionControllersSubscriptionExternalIds": "Execution Controllers Subscription External Ids",
+ "ExecutionControllersSubscriptionNames": "Execution Controllers Subscription Names",
+ "OperatingSystem": "Operating System",
+ "IpAddresses": "Ip Addresses",
+ }
+ mitigation_fields = {
+ "LocationPath": "Location Path",
+ "FixedVersion": "Fixed Version",
+ "Remediation": "Remediation",
+ }
+
+ for row in reader:
+ # Common fields
+ vulnerability_id = row.get("Name")
+ package_name = row.get("DetailedName")
+ package_version = row.get("Version")
+ severity = row.get("VendorSeverity")
+ finding_id = row.get("ID")
+
+ description = self._construct_string_field(description_fields, row)
+ mitigation = self._construct_string_field(mitigation_fields, row)
+ status_dict = self._convert_status(row)
+ # Create the finding object
+ finding = Finding(
+ title=f"{package_name}: {vulnerability_id}",
+ description=description,
+ mitigation=mitigation,
+ severity=self._validate_severities(severity),
+ static_finding=True,
+ unique_id_from_tool=finding_id,
+ component_name=package_name,
+ component_version=package_version,
+ date=date_parser.parse(row.get("FirstDetected")),
+ test=test,
+ **status_dict,
+ )
+ finding.unsaved_vulnerability_ids = [vulnerability_id]
+ finding.unsaved_tags = self._parse_tags(row.get("Tags", "[]"))
+ findings.append(finding)
+ return findings
+
+ def _construct_string_field(self, fields: dict[str, str], row: dict) -> str:
+ """Construct a formatted string based on the fields dict supplied."""
+ return_string = ""
+ for field, pretty_field in fields.items():
+ if (field_value := row.get(field)) is not None and len(field_value) > 0:
+ return_string += f"**{pretty_field}**: `{field_value}`\n"
+ return return_string
+
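# Annotation (not part of the patch): e.g., with a hypothetical row,
#
#   _construct_string_field({"Version": "Version"}, {"Version": "1.2.3"})
#   # -> "**Version**: `1.2.3`\n"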
+ def _parse_tags(self, tags: str) -> list[str]:
+ """Parse the Tags string and convert it to a list of strings.
+
+ The tags arrive as a JSON object string in "{""key"":""value""}" (CSV-escaped) format
+ """
+ # Convert the string to a dict
+ tag_dict = json.loads(tags)
+ return [f"{key}: {value}" for key, value in tag_dict.items()]
+
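# Annotation (not part of the patch): once the CSV layer has unescaped the
# doubled quotes, the cell is plain JSON, e.g.:
#
#   _parse_tags('{"env": "prod", "team": "core"}')
#   # -> ["env: prod", "team: core"]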
+ def _validate_severities(self, severity: str) -> str:
+ """Ensure the supplied severity fits what DefectDojo is expecting."""
+ if severity not in SEVERITIES:
+ logger.error(f"Severity is not supported: {severity}")
+ # Default to Info severity
+ return "Info"
+ return severity
+
+ def _convert_status(self, row: dict) -> dict:
+ """Convert the "FindingStatus" column to a dict of Finding statuses.
+
+ - Open -> active = True
+ - Other statuses that may exist...
+ """
+ if (status := row.get("FindingStatus")) is not None:
+ if status == "Open":
+ return {"active": True}
+ # Return the default status of active
+ return {"active": True}
+
+
+class WizParser(
+ WizParserByTitle,
+ WizParserByDetailedName,
+):
+ def get_scan_types(self):
+ return ["Wiz Scan"]
+
+ def get_label_for_scan_types(self, scan_type):
+ return "Wiz Scan"
+
+ def get_description_for_scan_types(self, scan_type):
+ return "Wiz scan results in csv file format."
+
+ def get_findings(self, filename, test):
+ content = filename.read()
+ if isinstance(content, bytes):
+ content = content.decode("utf-8")
+ csv.field_size_limit(int(sys.maxsize / 10)) # the request/resp are big
+ reader = csv.DictReader(io.StringIO(content))
+ # Determine which parser to use
+ if "Title" in reader.fieldnames:
+ return WizParserByTitle().parse_findings(test, reader)
+ if all(field in reader.fieldnames for field in ["Name", "DetailedName"]):
+ return WizParserByDetailedName().parse_findings(test, reader)
+ else:
+ msg = "This CSV format of Wiz is not supported"
+ raise ValueError(msg)
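# Annotation (not part of the patch): dispatch is driven entirely by the CSV
# header row, e.g. a file starting with "Title,Severity,Status,..." routes to
# WizParserByTitle, while one starting with "ID,Name,DetailedName,..." routes to
# WizParserByDetailedName (column names beyond those checked are illustrative).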
diff --git a/helm/defectdojo/Chart.yaml b/helm/defectdojo/Chart.yaml
index 61744bdfbd..9bd09f45fa 100644
--- a/helm/defectdojo/Chart.yaml
+++ b/helm/defectdojo/Chart.yaml
@@ -2,7 +2,7 @@ apiVersion: v2
appVersion: "2.39.0-dev"
description: A Helm chart for Kubernetes to install DefectDojo
name: defectdojo
-version: 1.6.150-dev
+version: 1.6.151-dev
icon: https://www.defectdojo.org/img/favicon.ico
maintainers:
- name: madchap
diff --git a/helm/defectdojo/templates/network-policy.yaml b/helm/defectdojo/templates/network-policy.yaml
index 251128004e..80c55ddcfa 100644
--- a/helm/defectdojo/templates/network-policy.yaml
+++ b/helm/defectdojo/templates/network-policy.yaml
@@ -13,7 +13,7 @@ spec:
podSelector:
matchLabels:
app.kubernetes.io/instance: {{ .Release.Name }}
- {{- if .Value.networkPolicy.ingress}}
+ {{- if .Values.networkPolicy.ingress}}
ingress:
{{- toYaml .Values.networkPolicy.ingress | nindent 4 }}
{{- else }}
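# Annotation (not part of the patch): the `.Value` typo meant the ingress branch
# could never evaluate correctly; with `.Values`, a hypothetical values.yaml block
# such as
#
#   networkPolicy:
#     ingress:
#       - from:
#           - podSelector: {}
#
# is rendered into the policy as intended.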
diff --git a/unittests/scans/appcheck_web_application_scanner/appcheck_web_application_scanner_many_vul.json b/unittests/scans/appcheck_web_application_scanner/appcheck_web_application_scanner_many_vul.json
index ee12493a84..052de39077 100644
--- a/unittests/scans/appcheck_web_application_scanner/appcheck_web_application_scanner_many_vul.json
+++ b/unittests/scans/appcheck_web_application_scanner/appcheck_web_application_scanner_many_vul.json
@@ -514,7 +514,7 @@
"cvss_score": 0.0,
"type": "WEB_APP",
"web_app": "https://example.x73zjffz.com",
- "cvss_v4_vector": "CVSS:4.0/AV:N/AC:L/AT:N/PR:N/UI:N/VC:N/VI:N/VA:N/SC:N/SI:N/SA:N",
+ "cvss_v4_vector": "CVSS:4.0/AV:L/AC:H/AT:P/PR:L/UI:A/VC:N/VI:H/VA:N/SC:N/SI:N/SA:N",
"mss_confirmed": false,
"category": "web_app",
"description": "[[markup]]This is simply a report of HTTP request methods supported by the web application.",
diff --git a/unittests/scans/invicti/invicti_many_findings.json b/unittests/scans/invicti/invicti_many_findings.json
new file mode 100644
index 0000000000..c1a1bef778
--- /dev/null
+++ b/unittests/scans/invicti/invicti_many_findings.json
@@ -0,0 +1,4681 @@
+{
+ "Generated": "25/06/2021 10:00 AM",
+ "Target": {
+ "Duration": "00:12:24.8161163",
+ "Initiated": "25/06/2021 01:46 AM",
+ "ScanId": "ee9136920f6243486d12ad5104e2f745",
+ "Url": "http://php.testsparker.com/"
+ },
+ "Vulnerabilities": [
+ {
+ "Certainty": 100,
+ "Classification": {
+ "Iso27001": "A.14.2.5",
+ "Capec": "107",
+ "Cvss": {
+ "BaseScore": {
+ "Severity": 2,
+ "Type": "Base",
+ "Value": "5.7"
+ },
+ "EnvironmentalScore": {
+ "Severity": 2,
+ "Type": "Environmental",
+ "Value": "5.5"
+ },
+ "TemporalScore": {
+ "Severity": 2,
+ "Type": "Temporal",
+ "Value": "5.5"
+ },
+ "Vector": "CVSS:3.0/AV:N/AC:L/PR:L/UI:R/S:U/C:H/I:N/A:N/E:H/RL:O/RC:C"
+ },
+ "Cvss31": null,
+ "Cwe": "16",
+ "Hipaa": "",
+ "Owasp": "A5",
+ "OwaspProactiveControls": "",
+ "Pci32": "",
+ "Wasc": "15"
+ },
+ "Confirmed": true,
+ "Description": "Netsparker Enterprise identified a cookie not marked as HTTPOnly.\nHTTPOnly cookies cannot be read by client-side scripts, therefore marking a cookie as HTTPOnly can provide an additional layer of protection against cross-site scripting attacks. Consider marking all of the cookies used by the application as HTTPOnly. (After these changes javascript code will not be able to read cookies.)\n\n",
+ "RemedialProcedure": "Mark the cookie as HTTPOnly. This will be an extra layer of defense against XSS. However this is not a silver bullet and will not protect the system against cross-site scripting attacks. An attacker can use a tool such as XSS Tunnel to bypass HTTPOnly protection.\nNetsparker Enterprise identified a Boolean-Based SQL Injection, which occurs when data input by a user is interpreted as a SQL command rather than as normal data by the backend database.\nThis is an extremely common vulnerability and its successful exploitation can have critical implications.\nNetsparker Enterprise confirmed the vulnerability by executing a test SQL query on the backend database. In these tests, SQL injection was not obvious, but the different responses from the page based on the injection test allowed Netsparker Enterprise to identify and confirm the SQL injection.\nProof of Exploit\nIdentified Database Name: sqlibench\nIdentified Database User: root@localhost\nIdentified Database Version: 5.0.51b-community-nt-log",
+ "ExploitationSkills": "There are numerous freely available tools to exploit SQL injection vulnerabilities. This is a complex area with many dependencies; however, it should be noted that the numerous resources available in this area have raised both attacker awareness of the issues and their ability to discover and leverage them.\nDepending on the backend database, the database connection settings and the operating system, an attacker can mount one or more of the following type of attacks successfully:\nReading, updating and deleting arbitrary data/tables from the database\nExecuting commands on the underlying operating system\nIf you are not using a database access layer (DAL), consider using one. This will help you centralize the issue. You can also use ORM (object relational mapping). Most of the ORM systems use only parameterized queries and this can solve the whole SQL injection problem.\nLocate all of the dynamically generated SQL queries and convert them to parameterized queries. (If you decide to use a DAL/ORM, change all legacy code to use these new libraries.)\nUse your weblogs and application logs to see if there were any previous but undetected attacks to this resource.\n\n",
+ "RemedialProcedure": "The best way to protect your code against SQL injections is using parameterized queries (prepared statements). Almost all modern languages provide built-in libraries for this. Wherever possible, do not create dynamic SQL queries or SQL queries with string concatenation.\nNetsparker Enterprise identified a version disclosure (Apache) in the target web server's HTTP response.\nThis information might help an attacker gain a greater understanding of the systems in use and potentially develop further attacks targeted at the specific version of Apache.\nNetsparker Enterprise identified a version disclosure (PHP) in target web server's HTTP response.\nThis information can help an attacker gain a greater understanding of the systems in use and potentially develop further attacks targeted at the specific version of PHP.\nNetsparker Enterprise detected backup source code on your web server.\n<?php\nrequire(\"auth.php\");\nini_set(\"display_errors\",\"0\");\n\n//global configuration area\n$globals[\"title\"] = \"Netsparker Test Web Site - PHP\";\nfunction EndsWith($FullStr, $EndStr)\n{\n // Get the length of the end string\n $StrLen = strlen($EndStr);\n // Look at the end of FullStr for the substring the size of EndStr\n $FullStrEnd = substr($FullStr, strlen($FullStr) - $StrLen);\n // If it matches, it does end with EndStr\n return $FullStrEnd == $EndStr;\n}\n?>\n…\n<?php include \"Internals/header.php\"?>\n…\n<?php include \"Internals/upmenu.php\"?>\n…\n<?php\n $file = $_REQUEST[\"file\"];\n if(EndsWith($file,\".nsp\"))\n include $_REQUEST[\"file\"];\n ?>\n…\n<?php include \"Internals/footer.php\"?>",
+ "ExploitationSkills": "This is dependent on the information obtained from source code. Uncovering these forms of vulnerabilities does not require high levels of skills. However, a highly skilled attacker could leverage this form of vulnerability to obtain account information for databases or administrative panels, ultimately leading to control of the application or even the host the application resides on.",