-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
✨ merge acunetix and acunetix360 (#9522)
* ✨ merge acunetix and acunetix360 * move unittests together * merge unittests * fix unittests * remove acunetix360 * update docs * update get_description_for_scan_types * flake8 * 🐛 fix unittests * add db migrations * flake8,rufflinter * resolve db migrations problem in advance * fix db migrations according to latest dev * 🐛 fix, see PR 9606 * basic structure update * update acunetix xml * update acunetix360 json * 🐛 fix * flake8 * update * 🚧 db migration revert option * revert last commit * remove deduplication setting * update db migrations * adapt db migrations * fix db migrations according to latest dev
- Loading branch information
1 parent
aafdc41
commit 150b4b4
Showing
16 changed files
with
368 additions
and
351 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
from django.db import migrations | ||
import logging | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
# Test_Type / scan_type names that identify data produced by the retired
# Acunetix360 parser.
PARSER_REFERENCES = ['Acunetix360 Scan']


def update_parser_test(test, parser_test_type) -> None:
    """Repoint a Test at the merged Acunetix parser type.

    If the test was produced by the Acunetix360 parser (matched either via
    its Test_Type name or its scan_type string), rewrite both references to
    the supplied replacement Test_Type and persist the change. Tests from
    any other parser are left untouched.
    """
    came_from_acunetix360 = (
        test.test_type.name in PARSER_REFERENCES
        or test.scan_type in PARSER_REFERENCES
    )
    if not came_from_acunetix360:
        return
    test.test_type = parser_test_type
    test.scan_type = parser_test_type.name
    test.save()
|
||
|
||
# Update the found_by field to remove Acunetix360 and add Acunetix
def update_parser_finding(finding, newparser_test_type, parser_test_type) -> None:
    # Drop the legacy Acunetix360 test type from found_by, if it is present
    # (parser_test_type may be None when the legacy type never existed; the
    # membership check is then False and .id is never dereferenced)
    if parser_test_type in finding.found_by.all():
        finding.found_by.remove(parser_test_type.id)
    # Only add the merged Acunetix test type if it is not already listed
    if newparser_test_type not in finding.found_by.all():
        finding.found_by.add(newparser_test_type.id)
    finding.save()
|
||
|
||
# Data migration: rewrite every finding produced by the retired Acunetix360
# parser so that it references the merged "Acunetix Scan" test type instead.
def forward_merge_parser(apps, schema_editor):
    Finding = apps.get_model('dojo', 'Finding')
    Test_Type = apps.get_model('dojo', 'Test_Type')
    # The merged type is created on demand; the legacy type may be absent.
    acunetix_type, _ = Test_Type.objects.get_or_create(
        name="Acunetix Scan", defaults={"active": True})
    acunetix360_type = Test_Type.objects.filter(name="Acunetix360 Scan").first()
    # Every finding whose test came from an Acunetix360 report
    affected = Finding.objects.filter(test__scan_type__in=PARSER_REFERENCES)
    logger.warning(
        'We identified %d Acunetix360 Scan findings to migrate to Acunetix Scan findings',
        affected.count())
    for finding in affected:
        # Rewrite found_by on the finding first, then the owning test itself.
        update_parser_finding(finding, acunetix_type, acunetix360_type)
        update_parser_test(finding.test, acunetix_type)
|
||
|
||
class Migration(migrations.Migration):
    # Data-only migration: merges findings/tests created by the removed
    # "Acunetix360 Scan" parser into the unified "Acunetix Scan" parser.
    # NOTE(review): no reverse callable is supplied to RunPython, so this
    # migration cannot be unapplied.

    dependencies = [
        ('dojo', '0207_alter_sonarqube_issue_key'),
    ]

    operations = [
        migrations.RunPython(forward_merge_parser),
    ]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
import hashlib | ||
import dateutil | ||
import html2text | ||
import logging | ||
import hyperlink | ||
from cvss import parser as cvss_parser | ||
from defusedxml.ElementTree import parse | ||
from dojo.models import Endpoint, Finding | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class AcunetixXMLParser(object):
    """This parser is written for Acunetix XML reports."""

    def get_findings(self, filename, test):
        """Parse an Acunetix XML report and return the findings it contains.

        Report items sharing the same title/impact/mitigation are collapsed
        into one finding whose endpoints, request/response pairs, extra
        details and nb_occurences are accumulated.

        :param filename: path or file-like object holding the XML report
        :param test: the Test object the findings will belong to
        :return: list of unsaved Finding objects
        """
        dupes = dict()
        root = parse(filename).getroot()
        for scan in root.findall("Scan"):
            start_url = scan.findtext("StartURL")
            if ":" not in start_url:
                # make the URL parseable by hyperlink when no scheme is given
                start_url = "//" + start_url
            # get report date; default to None so that a scan without a
            # StartTime element does not leave report_date unbound
            # (BUGFIX: previously raised NameError at "if report_date:" below)
            report_date = None
            if scan.findtext("StartTime") and "" != scan.findtext("StartTime"):
                report_date = dateutil.parser.parse(
                    scan.findtext("StartTime")
                ).date()
            for item in scan.findall("ReportItems/ReportItem"):
                finding = Finding(
                    test=test,
                    title=item.findtext("Name"),
                    severity=self.get_severity(item.findtext("Severity")),
                    description=html2text.html2text(
                        item.findtext("Description")
                    ).strip(),
                    false_p=self.get_false_positive(
                        item.findtext("IsFalsePositive")
                    ),
                    static_finding=True,
                    dynamic_finding=False,
                    nb_occurences=1,
                )
                if item.findtext("Impact") and "" != item.findtext("Impact"):
                    finding.impact = item.findtext("Impact")
                if item.findtext("Recommendation") and "" != item.findtext(
                    "Recommendation"
                ):
                    finding.mitigation = item.findtext("Recommendation")
                if report_date:
                    finding.date = report_date
                if item.findtext("CWEList/CWE"):
                    finding.cwe = self.get_cwe_number(
                        item.findtext("CWEList/CWE")
                    )
                # collect external references as a markdown bullet list
                references = []
                for reference in item.findall("References/Reference"):
                    url = reference.findtext("URL")
                    db = reference.findtext("Database") or url
                    references.append(" * [{}]({})".format(db, url))
                if len(references) > 0:
                    finding.references = "\n".join(references)
                if item.findtext("CVSS3/Descriptor"):
                    cvss_objects = cvss_parser.parse_cvss_from_text(
                        item.findtext("CVSS3/Descriptor")
                    )
                    if len(cvss_objects) > 0:
                        finding.cvssv3 = cvss_objects[0].clean_vector()
                # more description are in "Details"
                if (
                    item.findtext("Details")
                    and len(item.findtext("Details").strip()) > 0
                ):
                    finding.description += "\n\n**Details:**\n{}".format(
                        html2text.html2text(item.findtext("Details"))
                    )
                if (
                    item.findtext("TechnicalDetails")
                    and len(item.findtext("TechnicalDetails").strip()) > 0
                ):
                    finding.description += (
                        "\n\n**TechnicalDetails:**\n\n{}".format(
                            item.findtext("TechnicalDetails")
                        )
                    )
                # add requests
                finding.unsaved_req_resp = list()
                if len(item.findall("TechnicalDetails/Request")):
                    finding.dynamic_finding = (
                        True  # if there is some requests it's dynamic
                    )
                    finding.static_finding = (
                        False  # if there is some requests it's dynamic
                    )
                    for request in item.findall("TechnicalDetails/Request"):
                        finding.unsaved_req_resp.append(
                            {"req": (request.text or ""), "resp": ""}
                        )
                # manage the endpoint
                url = hyperlink.parse(start_url)
                endpoint = Endpoint(
                    host=url.host,
                    port=url.port,
                    path=item.findtext("Affects"),
                )
                if url.scheme is not None and "" != url.scheme:
                    endpoint.protocol = url.scheme
                finding.unsaved_endpoints = [endpoint]
                # deduplicate on title + impact + mitigation
                dupe_key = hashlib.sha256(
                    "|".join(
                        [
                            finding.title,
                            str(finding.impact),
                            str(finding.mitigation),
                        ]
                    ).encode("utf-8")
                ).hexdigest()
                if dupe_key in dupes:
                    find = dupes[dupe_key]
                    # add details for the duplicate finding
                    if (
                        item.findtext("Details")
                        and len(item.findtext("Details").strip()) > 0
                    ):
                        find.description += (
                            "\n-----\n\n**Details:**\n{}".format(
                                html2text.html2text(item.findtext("Details"))
                            )
                        )
                    find.unsaved_endpoints.extend(finding.unsaved_endpoints)
                    find.unsaved_req_resp.extend(finding.unsaved_req_resp)
                    find.nb_occurences += finding.nb_occurences
                    logger.debug(
                        "Duplicate finding : {defectdojo_title}".format(
                            defectdojo_title=finding.title
                        )
                    )
                else:
                    dupes[dupe_key] = finding
        return list(dupes.values())

    def get_cwe_number(self, cwe):
        """
        Returns cwe number.
        :param cwe: CWE identifier string such as "CWE-79", or None
        :return: cwe number as int, or None when no CWE was supplied
        """
        if cwe is None:
            return None
        else:
            return int(cwe.split("-")[1])

    def get_severity(self, severity):
        """
        Returns Severity as per DefectDojo standards.
        :param severity: lowercase Acunetix severity string
        :return: DefectDojo severity; anything unrecognized maps to Critical
        """
        if severity == "high":
            return "High"
        elif severity == "medium":
            return "Medium"
        elif severity == "low":
            return "Low"
        elif severity == "informational":
            return "Info"
        else:
            return "Critical"

    def get_false_positive(self, false_p):
        """
        Returns True, False for false positive as per DefectDojo standards.
        NOTE(review): any non-empty string (including "false") is truthy —
        confirm the report only emits this element for real false positives.
        :param false_p: raw IsFalsePositive text from the report, or None
        :return: bool
        """
        if false_p:
            return True
        else:
            return False
Oops, something went wrong.