Skip to content

Commit

Permalink
Add sshaudit parser #8837 (#8838)
Browse files Browse the repository at this point in the history
* 🎉 added ssh_audit importer, first shot #8837

* 🐛 fix unittest typo

* 🎉 added unittests

* flake8
  • Loading branch information
manuel-sommer authored Oct 25, 2023
1 parent 6ff5506 commit 4c65de5
Show file tree
Hide file tree
Showing 6 changed files with 1,115 additions and 0 deletions.
5 changes: 5 additions & 0 deletions docs/content/en/integrations/parsers/file/ssh_audit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: "SSH Audit"
toc_hide: true
---
Import JSON output of ssh_audit report. See <https://github.com/jtesta/ssh-audit>
1 change: 1 addition & 0 deletions dojo/tools/ssh_audit/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__author__ = "manuel_sommer"
204 changes: 204 additions & 0 deletions dojo/tools/ssh_audit/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import json
from dojo.models import Endpoint, Finding


class SSHAuditParser(object):
def get_scan_types(self):
return ["SSH Audit Importer"]

def get_label_for_scan_types(self, scan_type):
return scan_type # no custom label for now

def get_description_for_scan_types(self, scan_type):
return "Import result of SSH Audit JSON output."

def convert_cvss_score(self, raw_value):
"""According to CVSS official numbers https://nvd.nist.gov/vuln-metrics/cvss
None 0.0
Low 0.0-3.9 Low 0.1-3.9
Medium 4.0-6.9 Medium 4.0-6.9
High 7.0-10.0 High 7.0-8.9
Critical 9.0-10.0"""
val = float(raw_value)
if val == 0.0:
return "Info"
elif val < 4.0:
return "Low"
elif val < 7.0:
return "Medium"
elif val < 9.0:
return "High"
else:
return "Critical"

def get_findings(self, filename, test):
items = []
try:
data = json.load(filename)
except ValueError as err:
data = {}
if data != {}:
title = data['banner']['raw']
for cve in data['cves']:
cvename = cve['name']
description = [f"**CVE**: {cvename}"]
description.append(f"**Description**: {cve['description']}")
description.append(f"**Banner**: {title}")
severity = self.convert_cvss_score(raw_value=cve['cvssv2'])
finding = Finding(title=str(title) + "_" + str(cvename),
test=test,
description="\n".join(description),
severity=severity,
static_finding=False)
items.append(finding)
finding.unsaved_endpoints = list()
endpoint = Endpoint(host=data['target'].split(':')[0], port=data['target'].split(':')[1])
finding.unsaved_endpoints.append(endpoint)
for kex in data['kex']:
if 'fail' in kex['notes'] and 'warn' in kex['notes']:
kexname = kex['algorithm']
description = [f"**Algorithm**: {kexname}"]
description.append(f"**Description Failure**: {kex['notes']['fail']}")
description.append(f"**Description Warning**: {kex['notes']['warn']}")
description.append(f"**Info**: {kex['notes']['info']}")
severity = "High"
finding = Finding(title=str(title) + "_" + str(kexname),
test=test,
description="\n".join(description),
severity=severity,
static_finding=False)
items.append(finding)
finding.unsaved_endpoints = list()
endpoint = Endpoint(host=data['target'].split(':')[0], port=data['target'].split(':')[1])
finding.unsaved_endpoints.append(endpoint)
elif 'fail' in kex['notes']:
kexname = kex['algorithm']
description = [f"**Algorithm**: {kexname}"]
description.append(f"**Description Failure**: {kex['notes']['fail']}")
description.append(f"**Info**: {kex['notes']['info']}")
severity = "High"
finding = Finding(title=str(title) + "_" + str(kexname),
test=test,
description="\n".join(description),
severity=severity,
static_finding=False)
items.append(finding)
finding.unsaved_endpoints = list()
endpoint = Endpoint(host=data['target'].split(':')[0], port=data['target'].split(':')[1])
finding.unsaved_endpoints.append(endpoint)
elif 'warn' in kex['notes']:
kexname = kex['algorithm']
description = [f"**Algorithm**: {kexname}"]
description.append(f"**Description Warning**: {kex['notes']['warn']}")
description.append(f"**Info**: {kex['notes']['info']}")
severity = "Medium"
finding = Finding(title=str(title) + "_" + str(kexname),
test=test,
description="\n".join(description),
severity=severity,
static_finding=False)
items.append(finding)
finding.unsaved_endpoints = list()
endpoint = Endpoint(host=data['target'].split(':')[0], port=data['target'].split(':')[1])
finding.unsaved_endpoints.append(endpoint)
for key in data['key']:
if 'fail' in key['notes'] and 'warn' in key['notes']:
keyname = key['algorithm']
description = [f"**Algorithm**: {keyname}"]
description.append(f"**Description Failure**: {key['notes']['fail']}")
description.append(f"**Description Warning**: {key['notes']['warn']}")
if 'keysize' in key:
description.append(f"**KeySize**: {key['keysize']}")
description.append(f"**Info**: {key['notes']['info']}")
severity = "High"
finding = Finding(title=str(title) + "_" + str(keyname),
test=test,
description="\n".join(description),
severity=severity,
static_finding=False)
items.append(finding)
finding.unsaved_endpoints = list()
endpoint = Endpoint(host=data['target'].split(':')[0], port=data['target'].split(':')[1])
finding.unsaved_endpoints.append(endpoint)
elif 'fail' in key['notes']:
keyname = key['algorithm']
description = [f"**Algorithm**: {keyname}"]
description.append(f"**Description Failure**: {key['notes']['fail']}")
if 'keysize' in key:
description.append(f"**KeySize**: {key['keysize']}")
description.append(f"**Info**: {key['notes']['info']}")
severity = "High"
finding = Finding(title=str(title) + "_" + str(keyname),
test=test,
description="\n".join(description),
severity=severity,
static_finding=False)
items.append(finding)
finding.unsaved_endpoints = list()
endpoint = Endpoint(host=data['target'].split(':')[0], port=data['target'].split(':')[1])
finding.unsaved_endpoints.append(endpoint)
elif 'warn' in key['notes']:
keyname = key['algorithm']
description = [f"**Algorithm**: {keyname}"]
description.append(f"**Description Warning**: {key['notes']['warn']}")
if 'keysize' in key:
description.append(f"**KeySize**: {key['keysize']}")
description.append(f"**Info**: {key['notes']['info']}")
severity = "Medium"
finding = Finding(title=str(title) + "_" + str(keyname),
test=test,
description="\n".join(description),
severity=severity,
static_finding=False)
items.append(finding)
finding.unsaved_endpoints = list()
endpoint = Endpoint(host=data['target'].split(':')[0], port=data['target'].split(':')[1])
finding.unsaved_endpoints.append(endpoint)
for mac in data['mac']:
if 'fail' in mac['notes'] and 'warn' in mac['notes']:
macname = mac['algorithm']
description = [f"**Algorithm**: {macname}"]
description.append(f"**Description Failure**: {mac['notes']['fail']}")
description.append(f"**Description Warning**: {mac['notes']['warn']}")
description.append(f"**Info**: {mac['notes']['info']}")
severity = "High"
finding = Finding(title=str(title) + "_" + str(macname),
test=test,
description="\n".join(description),
severity=severity,
static_finding=False)
items.append(finding)
finding.unsaved_endpoints = list()
endpoint = Endpoint(host=data['target'].split(':')[0], port=data['target'].split(':')[1])
finding.unsaved_endpoints.append(endpoint)
elif 'fail' in mac['notes']:
macname = mac['algorithm']
description = [f"**Algorithm**: {macname}"]
description.append(f"**Description Failure**: {mac['notes']['fail']}")
description.append(f"**Info**: {mac['notes']['info']}")
severity = "High"
finding = Finding(title=str(title) + "_" + str(macname),
test=test,
description="\n".join(description),
severity=severity,
static_finding=False)
items.append(finding)
finding.unsaved_endpoints = list()
endpoint = Endpoint(host=data['target'].split(':')[0], port=data['target'].split(':')[1])
finding.unsaved_endpoints.append(endpoint)
elif 'warn' in mac['notes']:
macname = mac['algorithm']
description = [f"**Algorithm**: {macname}"]
description.append(f"**Description Warning**: {mac['notes']['warn']}")
description.append(f"**Info**: {mac['notes']['info']}")
severity = "Medium"
finding = Finding(title=str(title) + "_" + str(macname),
test=test,
description="\n".join(description),
severity=severity,
static_finding=False)
items.append(finding)
finding.unsaved_endpoints = list()
endpoint = Endpoint(host=data['target'].split(':')[0], port=data['target'].split(':')[1])
finding.unsaved_endpoints.append(endpoint)
return items
Loading

0 comments on commit 4c65de5

Please sign in to comment.