Skip to content

Commit

Permalink
cve_lookup: added support for CVSS metrics v4.0+
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke committed Nov 27, 2024
1 parent 00dc1f3 commit 24bdc3e
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 48 deletions.
7 changes: 4 additions & 3 deletions src/plugins/analysis/cve_lookup/code/cve_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from lookup import Lookup

DB_PATH = str(Path(__file__).parent / '../internal/database/cve_cpe.db')
MINIMUM_CRITICAL_SCORE = 9.0


class AnalysisPlugin(AnalysisBasePlugin):
Expand All @@ -31,7 +32,7 @@ class AnalysisPlugin(AnalysisBasePlugin):
DESCRIPTION = 'lookup CVE vulnerabilities'
MIME_BLACKLIST = MIME_BLACKLIST_NON_EXECUTABLE
DEPENDENCIES = ['software_components'] # noqa: RUF012
VERSION = '0.1.0'
VERSION = '0.2.0'
FILE = __file__

def process_object(self, file_object: FileObject) -> FileObject:
Expand Down Expand Up @@ -86,8 +87,8 @@ def add_tags(self, cve_results: dict[str, dict[str, dict[str, str]]], file_objec
return

@staticmethod
def _entry_has_critical_rating(entry: dict[str, str]) -> bool:
def _entry_has_critical_rating(entry: dict[str, dict[str, str]]) -> bool:
"""
Check if the given entry has a critical rating.
"""
return any(entry[key] != 'N/A' and float(entry[key]) >= 9.0 for key in ['score2', 'score3']) # noqa: PLR2004
return any(value != 'N/A' and float(value) >= MINIMUM_CRITICAL_SCORE for value in entry['scores'].values())
44 changes: 23 additions & 21 deletions src/plugins/analysis/cve_lookup/internal/data_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import lzma
import re
from typing import TYPE_CHECKING

import requests
Expand Down Expand Up @@ -42,35 +43,36 @@ def extract_english_summary(descriptions: list) -> str:

def extract_cve_impact(metrics: dict) -> dict[str, str]:
impact = {}
for version in [2, 30, 31]:
cvss_key = f'cvssMetricV{version}'
if cvss_key in metrics:
for entry in metrics[cvss_key]:
if entry['type'] == 'Primary':
impact.setdefault(cvss_key, entry['cvssData']['baseScore'])
elif cvss_key not in impact:
impact[cvss_key] = entry['cvssData']['baseScore']
for cvss_type, cvss_data in metrics.items():
key = cvss_type.replace('cvssMetric', '')
if re.match(r'V\d\d', key):
# V30 / V31 / V40 -> V3.0 / V3.1 / V4.0
key = f'{key[:2]}.{key[2:]}'
for cvss_dict in cvss_data:
if cvss_dict['type'] == 'Primary':
impact.setdefault(key, cvss_dict['cvssData']['baseScore'])
elif key not in impact:
impact[key] = cvss_dict['cvssData']['baseScore']
return impact


def extract_cpe_data(configurations: list) -> list[tuple[str, str, str, str, str]]:
unique_criteria = {}
cpe_entries = []
for configuration in configurations:
for node in configuration['nodes']:
if 'cpeMatch' in node:
for cpe in node['cpeMatch']:
if 'criteria' in cpe and cpe['vulnerable'] and cpe['criteria'] not in unique_criteria:
cpe_entries.append(
(
cpe['criteria'],
cpe.get('versionStartIncluding', ''),
cpe.get('versionStartExcluding', ''),
cpe.get('versionEndIncluding', ''),
cpe.get('versionEndExcluding', ''),
)
for node in configuration.get('nodes', []):
for cpe in node.get('cpeMatch', []):
if 'criteria' in cpe and cpe['vulnerable'] and cpe['criteria'] not in unique_criteria:
cpe_entries.append(
(
cpe['criteria'],
cpe.get('versionStartIncluding', ''),
cpe.get('versionStartExcluding', ''),
cpe.get('versionEndIncluding', ''),
cpe.get('versionEndExcluding', ''),
)
unique_criteria[cpe['criteria']] = True
)
unique_criteria[cpe['criteria']] = True
return cpe_entries


Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations

import logging
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING

from sqlalchemy.exc import OperationalError

from ..database.schema import Association, Cpe, Cve

if TYPE_CHECKING:
Expand Down Expand Up @@ -38,5 +41,12 @@ def get_cves(self, cve_ids: list[str]) -> dict[str, Cve]:
"""
Retrieve a dictionary of CVE objects for the given CVE IDs.
"""
cves = self.session.query(Cve).filter(Cve.cve_id.in_(cve_ids)).all()
try:
cves = self.session.query(Cve).filter(Cve.cve_id.in_(cve_ids)).all()
except OperationalError as error:
if 'no such column' in str(error):
logging.error(
"The DB schema of the cve_lookup plugin has changed. Please re-run the plugin's installation."
)
raise
return {cve.cve_id: cve for cve in cves}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@ def create_cve(self, cve_item: CveEntry) -> Cve:
Create a Cve object from a CveEntry object.
"""
year = cve_item.cve_id.split('-')[1]
score_v2 = cve_item.impact.get('cvssMetricV2', 'N/A')
score_v30 = cve_item.impact.get('cvssMetricV30', 'N/A')
score_v3 = cve_item.impact.get('cvssMetricV31', score_v30)
return Cve(
cve_id=cve_item.cve_id,
year=year,
summary=cve_item.summary,
cvss_v2_score=score_v2,
cvss_v3_score=score_v3,
cvss_score=cve_item.impact,
)

def create_cpe(self, cpe_id: str):
Expand Down
5 changes: 2 additions & 3 deletions src/plugins/analysis/cve_lookup/internal/database/schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sqlalchemy import Column, ForeignKey, String
from sqlalchemy import JSON, Column, ForeignKey, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()
Expand Down Expand Up @@ -27,8 +27,7 @@ class Cve(Base):
cve_id = Column(String(), primary_key=True)
year = Column(String())
summary = Column(String())
cvss_v2_score = Column(String())
cvss_v3_score = Column(String())
cvss_score = Column(JSON())
cpes = relationship(Association, back_populates='cve')

def __repr__(self) -> str:
Expand Down
3 changes: 1 addition & 2 deletions src/plugins/analysis/cve_lookup/internal/lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ def lookup_vulnerabilities(
if cve:
cpe = cpe_matches.get(association.cpe_id)
vulnerabilities[cve.cve_id] = {
'score2': cve.cvss_v2_score,
'score3': cve.cvss_v3_score,
'scores': cve.cvss_score,
'cpe_version': self._build_version_string(association, cpe),
}

Expand Down
27 changes: 19 additions & 8 deletions src/plugins/analysis/cve_lookup/view/cve_lookup.html
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,33 @@
{% else %}
<td class="p-0 m-0">
<table class="table table-bordered m-0" style="width: 100%;">
{% set cvss_versions = result | get_cvss_versions %}
<tr>
<th>CVE ID</th>
<th style="width: 130px">CVSS v2 score</th>
<th style="width: 130px">CVSS v3 score</th>
<th style="width: 200px">Affected versions</th>
<th class="text-right" rowspan="2" style="min-width: 200px">CVE ID</th>
<th class="text-center" colspan="{{ cvss_versions | length | max(1) }}" style="min-width: 200px">CVSS scores</th>
<th class="text-left" rowspan="2" style="width: 100%;">Affected versions</th>
</tr>
<tr>
{% if (cvss_versions | length) > 0 %}
{% for version in cvss_versions | sort %}
<th class="text-center" style="width: 75px;">{{ version }}</th>
{% endfor %}
{% else %}
{# if there is no entry with a CVSS score, display a row of "N/A" #}
<th class="text-center" style="width: 75px;">N/A</th>
{% endif %}
</tr>
{% for cve_id, entry in result | sort_cve() %}
<tr>
<td>
<td class="text-right">
<a href="https://nvd.nist.gov/vuln/detail/{{ cve_id }}" target="_blank" rel="noopener noreferrer">{{ cve_id }} </a>
</td>
{% for score in [entry.get("score2", "N/A"), entry.get("score3", "N/A")] %}
{% for version in (cvss_versions or ["N/A"]) | sort %}
{% set score = entry.scores.get(version, "N/A") %}
{% set class = "secondary" if score == "N/A" else ("success" if score|float < 4 else ("warning" if score|float < 7 else "danger")) %}
<td><span class="badge badge-{{ class }}" style="font-size: 100%;">{{ score }}</span></td>
<td class="text-center"><span class="badge badge-{{ class }}" style="font-size: 100%;">{{ score }}</span></td>
{% endfor %}
<td>{{ entry.get("cpe_version", "N/A") }}</td>
<td class="text-left">{{ entry.get("cpe_version", "N/A") }}</td>
</tr>
{% endfor %}
</table>
Expand Down
2 changes: 2 additions & 0 deletions src/web_interface/components/jinja_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def _setup_filters(self):
'format_duration': flt.format_duration,
'format_string_list_with_offset': flt.filter_format_string_list_with_offset,
'get_canvas_height': flt.get_canvas_height,
'get_cvss_versions': flt.get_cvss_versions,
'get_searchable_crypto_block': flt.get_searchable_crypto_block,
'get_unique_keys_from_list_of_dicts': flt.get_unique_keys_from_list_of_dicts,
'hex': hex,
Expand All @@ -203,6 +204,7 @@ def _setup_filters(self):
'list_to_line_break_string': flt.list_to_line_break_string,
'list_to_line_break_string_no_sort': flt.list_to_line_break_string_no_sort,
'md5_hash': get_md5,
'max': max,
'min': min,
'nice_generic': flt.generic_nice_representation,
'nice_number': flt.nice_number_filter,
Expand Down
15 changes: 10 additions & 5 deletions src/web_interface/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,15 +415,16 @@ def sort_cve_results(cve_result: dict[str, dict[str, str]]) -> list[tuple[str, d
return sorted(cve_result.items(), key=_cve_sort_key)


def _cve_sort_key(item: tuple[str, dict[str, str]]) -> tuple[float, float, str]:
def _cve_sort_key(item: tuple[str, dict]) -> tuple[float, float, str]:
"""
primary sorting key: -max(v2 score, v3 score)
secondary sorting key: -min(v2 score, v3 score)
primary sorting key: -max(v2 score, v3.0 score, v3.1 score, v4.0 score, ...)
secondary sorting key: -min(v2 score, v3.0 score, v3.1 score, v4.0 score, ...)
tertiary sorting key: CVE ID
use negative values so that highest scores come first, and we can also sort by CVE ID
"""
v2_score, v3_score = (_cve_score_to_float(item[1].get(key, 0.0)) for key in ['score2', 'score3'])
return -max(v2_score, v3_score), -min(v2_score, v3_score), item[0]
score_dict = item[1]['scores']
scores = {_cve_score_to_float(value) for value in score_dict.values()}
return -max(scores or [0.0]), -min(scores or [0.0]), item[0]


def _cve_score_to_float(score: float | str) -> float:
Expand All @@ -433,6 +434,10 @@ def _cve_score_to_float(score: float | str) -> float:
return 0.0


def get_cvss_versions(cve_result: dict) -> set[str]:
return {score_version for entry in cve_result.values() for score_version in entry['scores']}


def linter_reformat_issues(issues) -> dict[str, list[dict[str, str]]]:
reformatted = defaultdict(list, {})
for issue in issues:
Expand Down

0 comments on commit 24bdc3e

Please sign in to comment.