
🎉 merge OpenVAS XML and CSV parsers (#9322)
* 🎉 merge OpenVAS XML and CSV formats to have only one parser for OpenVAS

* Update docs/content/en/integrations/parsers/file/openvas.md

Co-authored-by: Charles Neill <[email protected]>

---------

Co-authored-by: Charles Neill <[email protected]>
manuel-sommer and cneill authored Jan 19, 2024
1 parent 67c8f9f commit ca23b91
Showing 14 changed files with 154 additions and 181 deletions.
5 changes: 5 additions & 0 deletions docs/content/en/integrations/parsers/file/openvas.md
@@ -0,0 +1,5 @@
+---
+title: "OpenVAS Parser"
+toc_hide: true
+---
+You can upload the exported results of an OpenVAS scan in either .csv or .xml format.
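A report exported from Greenbone/OpenVAS can then be imported through the DefectDojo UI or its REST API. The sketch below is a hypothetical illustration of an API upload: the endpoint path, multipart field names, and Token header are assumptions about DefectDojo's v2 API rather than part of this commit; only the scan type string "OpenVAS Parser" comes from the parser change itself.

import requests  # hypothetical upload sketch; endpoint and field names are assumptions

DOJO_URL = "https://defectdojo.example.com"   # assumed instance URL
API_KEY = "<api-v2-token>"                    # assumed API token

with open("openvas_report.xml", "rb") as report:    # a .csv export works the same way
    response = requests.post(
        f"{DOJO_URL}/api/v2/import-scan/",           # assumed import endpoint
        headers={"Authorization": f"Token {API_KEY}"},
        data={
            "scan_type": "OpenVAS Parser",           # value returned by get_scan_types() below
            "engagement": 1,                         # assumed engagement id
        },
        files={"file": report},
    )
    response.raise_for_status()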
5 changes: 0 additions & 5 deletions docs/content/en/integrations/parsers/file/openvas_csv.md

This file was deleted.

5 changes: 0 additions & 5 deletions docs/content/en/integrations/parsers/file/openvas_xml.md

This file was deleted.

File renamed without changes.
159 changes: 106 additions & 53 deletions dojo/tools/openvas_csv/parser.py → dojo/tools/openvas/parser.py
100644 → 100755
@@ -1,10 +1,10 @@
import csv
import hashlib
import io

from dateutil.parser import parse

-from dojo.models import Endpoint, Finding
+from xml.dom import NamespaceErr
+from defusedxml import ElementTree as ET
+from dojo.models import Finding, Endpoint


class ColumnMappingStrategy(object):
@@ -194,7 +194,7 @@ def map_column_value(self, finding, column_value):
finding.duplicate = self.evaluate_bool_value(column_value)


-class OpenVASCsvParser(object):
+class OpenVASParser(object):
    def create_chain(self):
        date_column_strategy = DateColumnMappingStrategy()
        title_column_strategy = TitleColumnMappingStrategy()
@@ -240,62 +240,115 @@ def read_column_names(self, row):
        return column_names

    def get_scan_types(self):
-        return ["OpenVAS CSV"]
+        return ["OpenVAS Parser"]

    def get_label_for_scan_types(self, scan_type):
        return scan_type  # no custom label for now

    def get_description_for_scan_types(self, scan_type):
-        return "Import OpenVAS Scan in CSV format. Export as CSV Results on OpenVAS."
+        return "Import CSV or XML output of Greenbone OpenVAS report."

+    def convert_cvss_score(self, raw_value):
+        val = float(raw_value)
+        if val == 0.0:
+            return "Info"
+        elif val < 4.0:
+            return "Low"
+        elif val < 7.0:
+            return "Medium"
+        elif val < 9.0:
+            return "High"
+        else:
+            return "Critical"

    def get_findings(self, filename, test):
-        column_names = dict()
-        dupes = dict()
-        chain = self.create_chain()
-
-        content = filename.read()
-        if isinstance(content, bytes):
-            content = content.decode("utf-8")
-        reader = csv.reader(io.StringIO(content), delimiter=",", quotechar='"')
-
-        row_number = 0
-        for row in reader:
-            finding = Finding(test=test)
-            finding.unsaved_endpoints = [Endpoint()]
-
-            if row_number == 0:
-                column_names = self.read_column_names(row)
-                row_number += 1
-                continue
-
-            column_number = 0
-            for column in row:
-                chain.process_column(
-                    column_names[column_number], column, finding
-                )
-                column_number += 1
-
-            if finding is not None and row_number > 0:
-                if finding.title is None:
-                    finding.title = ""
-                if finding.description is None:
-                    finding.description = ""
-
-                key = hashlib.sha256(
-                    (
-                        str(finding.unsaved_endpoints[0])
-                        + "|"
-                        + finding.severity
-                        + "|"
-                        + finding.title
-                        + "|"
-                        + finding.description
-                    ).encode("utf-8")
-                ).hexdigest()
-
-                if key not in dupes:
-                    dupes[key] = finding
-
-            row_number += 1
-
-        return list(dupes.values())
+        if str(filename.name).endswith('.csv'):
+            column_names = dict()
+            dupes = dict()
+            chain = self.create_chain()
+
+            content = filename.read()
+            if isinstance(content, bytes):
+                content = content.decode("utf-8")
+            reader = csv.reader(io.StringIO(content), delimiter=",", quotechar='"')
+
+            row_number = 0
+            for row in reader:
+                finding = Finding(test=test)
+                finding.unsaved_endpoints = [Endpoint()]
+
+                if row_number == 0:
+                    column_names = self.read_column_names(row)
+                    row_number += 1
+                    continue
+
+                column_number = 0
+                for column in row:
+                    chain.process_column(
+                        column_names[column_number], column, finding
+                    )
+                    column_number += 1
+
+                if finding is not None and row_number > 0:
+                    if finding.title is None:
+                        finding.title = ""
+                    if finding.description is None:
+                        finding.description = ""
+
+                    key = hashlib.sha256(
+                        (
+                            str(finding.unsaved_endpoints[0])
+                            + "|"
+                            + finding.severity
+                            + "|"
+                            + finding.title
+                            + "|"
+                            + finding.description
+                        ).encode("utf-8")
+                    ).hexdigest()
+
+                    if key not in dupes:
+                        dupes[key] = finding
+
+                row_number += 1
+
+            return list(dupes.values())
+        elif str(filename.name).endswith('.xml'):
+            findings = []
+            tree = ET.parse(filename)
+            root = tree.getroot()
+            if "report" not in root.tag:
+                raise NamespaceErr(
+                    "This doesn't seem to be a valid Greenbone OpenVAS XML file."
+                )
+            report = root.find("report")
+            results = report.find("results")
+            for result in results:
+                for finding in result:
+                    if finding.tag == "name":
+                        title = finding.text
+                        description = [f"**Name**: {finding.text}"]
+                    if finding.tag == "host":
+                        title = title + "_" + finding.text
+                        description.append(f"**Host**: {finding.text}")
+                    if finding.tag == "port":
+                        title = title + "_" + finding.text
+                        description.append(f"**Port**: {finding.text}")
+                    if finding.tag == "nvt":
+                        description.append(f"**NVT**: {finding.text}")
+                    if finding.tag == "severity":
+                        severity = self.convert_cvss_score(finding.text)
+                        description.append(f"**Severity**: {finding.text}")
+                    if finding.tag == "qod":
+                        description.append(f"**QOD**: {finding.text}")
+                    if finding.tag == "description":
+                        description.append(f"**Description**: {finding.text}")
+
+                finding = Finding(
+                    title=str(title),
+                    description="\n".join(description),
+                    severity=severity,
+                    dynamic_finding=True,
+                    static_finding=False
+                )
+                findings.append(finding)
+            return findings
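For reference, a minimal sketch of the XML shape the new .xml branch appears to expect: a root whose tag contains "report", wrapping a nested report, results, result hierarchy whose child elements (name, host, port, nvt, severity, qod, description) are turned into the finding title, description, and severity. The sample below is derived from the parser code above, not a fixture from this commit; real GMP exports carry far more detail.

# Illustration only: element layout inferred from the XML branch above.
from defusedxml import ElementTree as ET

SAMPLE = """
<report id="example">
  <report>
    <results>
      <result>
        <name>Example vulnerability</name>
        <host>10.0.0.5</host>
        <port>443/tcp</port>
        <severity>9.8</severity>
        <qod>80</qod>
        <description>Example description</description>
      </result>
    </results>
  </report>
</report>
"""

root = ET.fromstring(SAMPLE)
assert "report" in root.tag                         # mirrors the parser's validity check
results = root.find("report").find("results")
for result in results:
    fields = {child.tag: child.text for child in result}
    print(fields["name"], fields["host"], fields["severity"])

With these values the parser above would title the finding "Example vulnerability_10.0.0.5_443/tcp" and bucket the 9.8 score as "Critical" via convert_cvss_score.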
Empty file removed dojo/tools/openvas_csv/__init__.py
68 changes: 0 additions & 68 deletions dojo/tools/openvas_xml/parser.py

This file was deleted.

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -1,16 +1,15 @@
from ..dojo_test_case import DojoTestCase
-from dojo.tools.openvas_csv.parser import OpenVASCsvParser
+from dojo.tools.openvas.parser import OpenVASParser
from dojo.models import Test, Engagement, Product


-class TestOpenVASUploadCsvParser(DojoTestCase):
-
+class TestOpenVASParser(DojoTestCase):
    def test_openvas_csv_one_vuln(self):
-        with open("unittests/scans/openvas_csv/one_vuln.csv") as f:
+        with open("unittests/scans/openvas/one_vuln.csv") as f:
            test = Test()
            test.engagement = Engagement()
            test.engagement.product = Product()
-            parser = OpenVASCsvParser()
+            parser = OpenVASParser()
            findings = parser.get_findings(f, test)
            for finding in findings:
                for endpoint in finding.unsaved_endpoints:
@@ -27,11 +26,11 @@ def test_openvas_csv_one_vuln(self):
self.assertEqual(22, findings[0].unsaved_endpoints[0].port)

    def test_openvas_csv_many_vuln(self):
-        with open("unittests/scans/openvas_csv/many_vuln.csv") as f:
+        with open("unittests/scans/openvas/many_vuln.csv") as f:
            test = Test()
            test.engagement = Engagement()
            test.engagement.product = Product()
-            parser = OpenVASCsvParser()
+            parser = OpenVASParser()
            findings = parser.get_findings(f, test)
            for finding in findings:
                for endpoint in finding.unsaved_endpoints:
@@ -48,3 +47,40 @@
self.assertEqual("LOGSRV", endpoint.host)
self.assertEqual("tcp", endpoint.protocol)
self.assertEqual(9200, endpoint.port)

+    def test_openvas_xml_no_vuln(self):
+        with open("unittests/scans/openvas/no_vuln.xml") as f:
+            test = Test()
+            test.engagement = Engagement()
+            test.engagement.product = Product()
+            parser = OpenVASParser()
+            findings = parser.get_findings(f, test)
+            self.assertEqual(0, len(findings))
+
+    def test_openvas_xml_one_vuln(self):
+        with open("unittests/scans/openvas/one_vuln.xml") as f:
+            test = Test()
+            test.engagement = Engagement()
+            test.engagement.product = Product()
+            parser = OpenVASParser()
+            findings = parser.get_findings(f, test)
+            for finding in findings:
+                for endpoint in finding.unsaved_endpoints:
+                    endpoint.clean()
+            self.assertEqual(1, len(findings))
+            with self.subTest(i=0):
+                finding = findings[0]
+                self.assertEqual("Mozilla Firefox Security Update (mfsa_2023-32_2023-36) - Windows_10.0.101.2_general/tcp", finding.title)
+                self.assertEqual("Critical", finding.severity)
+
+    def test_openvas_xml_many_vuln(self):
+        with open("unittests/scans/openvas/many_vuln.xml") as f:
+            test = Test()
+            test.engagement = Engagement()
+            test.engagement.product = Product()
+            parser = OpenVASParser()
+            findings = parser.get_findings(f, test)
+            for finding in findings:
+                for endpoint in finding.unsaved_endpoints:
+                    endpoint.clean()
+            self.assertEqual(44, len(findings))
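The "Critical" severity asserted in test_openvas_xml_one_vuln follows from the CVSS bucketing in convert_cvss_score earlier in the diff. A standalone mirror of those thresholds, for quick reference (illustration only, not part of the commit):

# Mirrors OpenVASParser.convert_cvss_score from the parser diff above.
def cvss_to_severity(raw_value: str) -> str:
    val = float(raw_value)
    if val == 0.0:
        return "Info"
    elif val < 4.0:
        return "Low"
    elif val < 7.0:
        return "Medium"
    elif val < 9.0:
        return "High"
    return "Critical"

assert cvss_to_severity("0.0") == "Info"
assert cvss_to_severity("3.9") == "Low"
assert cvss_to_severity("6.9") == "Medium"
assert cvss_to_severity("8.9") == "High"
assert cvss_to_severity("9.8") == "Critical"   # any score of 9.0 or above is Critical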
43 changes: 0 additions & 43 deletions unittests/tools/test_openvas_xml_parser.py

This file was deleted.
