diff --git a/src/plugins/analysis/cve_lookup/code/cve_lookup.py b/src/plugins/analysis/cve_lookup/code/cve_lookup.py index 489422832..d6e661c17 100644 --- a/src/plugins/analysis/cve_lookup/code/cve_lookup.py +++ b/src/plugins/analysis/cve_lookup/code/cve_lookup.py @@ -20,6 +20,7 @@ from lookup import Lookup DB_PATH = str(Path(__file__).parent / '../internal/database/cve_cpe.db') +MINIMUM_CRITICAL_SCORE = 9.0 class AnalysisPlugin(AnalysisBasePlugin): @@ -31,7 +32,7 @@ class AnalysisPlugin(AnalysisBasePlugin): DESCRIPTION = 'lookup CVE vulnerabilities' MIME_BLACKLIST = MIME_BLACKLIST_NON_EXECUTABLE DEPENDENCIES = ['software_components'] # noqa: RUF012 - VERSION = '0.1.0' + VERSION = '0.2.0' FILE = __file__ def process_object(self, file_object: FileObject) -> FileObject: @@ -86,8 +87,8 @@ def add_tags(self, cve_results: dict[str, dict[str, dict[str, str]]], file_objec return @staticmethod - def _entry_has_critical_rating(entry: dict[str, str]) -> bool: + def _entry_has_critical_rating(entry: dict[str, dict[str, str]]) -> bool: """ Check if the given entry has a critical rating. """ - return any(entry[key] != 'N/A' and float(entry[key]) >= 9.0 for key in ['score2', 'score3']) # noqa: PLR2004 + return any(value != 'N/A' and float(value) >= MINIMUM_CRITICAL_SCORE for value in entry['scores'].values()) diff --git a/src/plugins/analysis/cve_lookup/internal/data_parsing.py b/src/plugins/analysis/cve_lookup/internal/data_parsing.py index d500959c2..4068883ab 100644 --- a/src/plugins/analysis/cve_lookup/internal/data_parsing.py +++ b/src/plugins/analysis/cve_lookup/internal/data_parsing.py @@ -2,6 +2,7 @@ import json import lzma +import re from typing import TYPE_CHECKING import requests @@ -42,14 +43,16 @@ def extract_english_summary(descriptions: list) -> str: def extract_cve_impact(metrics: dict) -> dict[str, str]: impact = {} - for version in [2, 30, 31]: - cvss_key = f'cvssMetricV{version}' - if cvss_key in metrics: - for entry in metrics[cvss_key]: - if entry['type'] == 'Primary': - impact.setdefault(cvss_key, entry['cvssData']['baseScore']) - elif cvss_key not in impact: - impact[cvss_key] = entry['cvssData']['baseScore'] + for cvss_type, cvss_data in metrics.items(): + key = cvss_type.replace('cvssMetric', '') + if re.match(r'V\d\d', key): + # V30 / V31 / V40 -> V3.0 / V3.1 / V4.0 + key = f'{key[:2]}.{key[2:]}' + for cvss_dict in cvss_data: + if cvss_dict['type'] == 'Primary': + impact.setdefault(key, cvss_dict['cvssData']['baseScore']) + elif key not in impact: + impact[key] = cvss_dict['cvssData']['baseScore'] return impact @@ -57,20 +60,19 @@ def extract_cpe_data(configurations: list) -> list[tuple[str, str, str, str, str unique_criteria = {} cpe_entries = [] for configuration in configurations: - for node in configuration['nodes']: - if 'cpeMatch' in node: - for cpe in node['cpeMatch']: - if 'criteria' in cpe and cpe['vulnerable'] and cpe['criteria'] not in unique_criteria: - cpe_entries.append( - ( - cpe['criteria'], - cpe.get('versionStartIncluding', ''), - cpe.get('versionStartExcluding', ''), - cpe.get('versionEndIncluding', ''), - cpe.get('versionEndExcluding', ''), - ) + for node in configuration.get('nodes', []): + for cpe in node.get('cpeMatch', []): + if 'criteria' in cpe and cpe['vulnerable'] and cpe['criteria'] not in unique_criteria: + cpe_entries.append( + ( + cpe['criteria'], + cpe.get('versionStartIncluding', ''), + cpe.get('versionStartExcluding', ''), + cpe.get('versionEndIncluding', ''), + cpe.get('versionEndExcluding', ''), ) - unique_criteria[cpe['criteria']] = True + ) + unique_criteria[cpe['criteria']] = True return cpe_entries diff --git a/src/plugins/analysis/cve_lookup/internal/database/db_interface.py b/src/plugins/analysis/cve_lookup/internal/database/db_interface.py index 556edc25c..403ae3ca6 100644 --- a/src/plugins/analysis/cve_lookup/internal/database/db_interface.py +++ b/src/plugins/analysis/cve_lookup/internal/database/db_interface.py @@ -1,9 +1,12 @@ from __future__ import annotations +import logging from collections import defaultdict from pathlib import Path from typing import TYPE_CHECKING +from sqlalchemy.exc import OperationalError + from ..database.schema import Association, Cpe, Cve if TYPE_CHECKING: @@ -38,5 +41,12 @@ def get_cves(self, cve_ids: list[str]) -> dict[str, Cve]: """ Retrieve a dictionary of CVE objects for the given CVE IDs. """ - cves = self.session.query(Cve).filter(Cve.cve_id.in_(cve_ids)).all() + try: + cves = self.session.query(Cve).filter(Cve.cve_id.in_(cve_ids)).all() + except OperationalError as error: + if 'no such column' in str(error): + logging.error( + "The DB schema of the cve_lookup plugin has changed. Please re-run the plugin's installation." + ) + raise return {cve.cve_id: cve for cve in cves} diff --git a/src/plugins/analysis/cve_lookup/internal/database/db_setup.py b/src/plugins/analysis/cve_lookup/internal/database/db_setup.py index b2db805a1..84a1d2125 100644 --- a/src/plugins/analysis/cve_lookup/internal/database/db_setup.py +++ b/src/plugins/analysis/cve_lookup/internal/database/db_setup.py @@ -26,15 +26,11 @@ def create_cve(self, cve_item: CveEntry) -> Cve: Create a Cve object from a CveEntry object. """ year = cve_item.cve_id.split('-')[1] - score_v2 = cve_item.impact.get('cvssMetricV2', 'N/A') - score_v30 = cve_item.impact.get('cvssMetricV30', 'N/A') - score_v3 = cve_item.impact.get('cvssMetricV31', score_v30) return Cve( cve_id=cve_item.cve_id, year=year, summary=cve_item.summary, - cvss_v2_score=score_v2, - cvss_v3_score=score_v3, + cvss_score=cve_item.impact, ) def create_cpe(self, cpe_id: str): diff --git a/src/plugins/analysis/cve_lookup/internal/database/schema.py b/src/plugins/analysis/cve_lookup/internal/database/schema.py index 2f66d8d0f..ba34dab95 100644 --- a/src/plugins/analysis/cve_lookup/internal/database/schema.py +++ b/src/plugins/analysis/cve_lookup/internal/database/schema.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, ForeignKey, String +from sqlalchemy import JSON, Column, ForeignKey, String from sqlalchemy.orm import declarative_base, relationship Base = declarative_base() @@ -27,8 +27,7 @@ class Cve(Base): cve_id = Column(String(), primary_key=True) year = Column(String()) summary = Column(String()) - cvss_v2_score = Column(String()) - cvss_v3_score = Column(String()) + cvss_score = Column(JSON()) cpes = relationship(Association, back_populates='cve') def __repr__(self) -> str: diff --git a/src/plugins/analysis/cve_lookup/internal/lookup.py b/src/plugins/analysis/cve_lookup/internal/lookup.py index e8dad5ee9..cbc6f609e 100644 --- a/src/plugins/analysis/cve_lookup/internal/lookup.py +++ b/src/plugins/analysis/cve_lookup/internal/lookup.py @@ -56,8 +56,7 @@ def lookup_vulnerabilities( if cve: cpe = cpe_matches.get(association.cpe_id) vulnerabilities[cve.cve_id] = { - 'score2': cve.cvss_v2_score, - 'score3': cve.cvss_v3_score, + 'scores': cve.cvss_score, 'cpe_version': self._build_version_string(association, cpe), } diff --git a/src/plugins/analysis/cve_lookup/view/cve_lookup.html b/src/plugins/analysis/cve_lookup/view/cve_lookup.html index 9d9e8a0d6..a6088533f 100644 --- a/src/plugins/analysis/cve_lookup/view/cve_lookup.html +++ b/src/plugins/analysis/cve_lookup/view/cve_lookup.html @@ -9,22 +9,33 @@ {% else %} + {% set cvss_versions = result | get_cvss_versions %} - - - - + + + + + + {% if (cvss_versions | length) > 0 %} + {% for version in cvss_versions | sort %} + + {% endfor %} + {% else %} + {# if there is no entry with a CVSS score, display a row of "N/A" #} + + {% endif %} {% for cve_id, entry in result | sort_cve() %} - - {% for score in [entry.get("score2", "N/A"), entry.get("score3", "N/A")] %} + {% for version in (cvss_versions or ["N/A"]) | sort %} + {% set score = entry.scores.get(version, "N/A") %} {% set class = "secondary" if score == "N/A" else ("success" if score|float < 4 else ("warning" if score|float < 7 else "danger")) %} - + {% endfor %} - + {% endfor %}
CVE IDCVSS v2 scoreCVSS v3 scoreAffected versionsCVE IDCVSS scoresAffected versions
{{ version }}N/A
+ {{ cve_id }} {{ score }}{{ score }}{{ entry.get("cpe_version", "N/A") }}{{ entry.get("cpe_version", "N/A") }}
diff --git a/src/web_interface/components/jinja_filter.py b/src/web_interface/components/jinja_filter.py index e7fbd2754..484d84335 100644 --- a/src/web_interface/components/jinja_filter.py +++ b/src/web_interface/components/jinja_filter.py @@ -189,6 +189,7 @@ def _setup_filters(self): 'format_duration': flt.format_duration, 'format_string_list_with_offset': flt.filter_format_string_list_with_offset, 'get_canvas_height': flt.get_canvas_height, + 'get_cvss_versions': flt.get_cvss_versions, 'get_searchable_crypto_block': flt.get_searchable_crypto_block, 'get_unique_keys_from_list_of_dicts': flt.get_unique_keys_from_list_of_dicts, 'hex': hex, @@ -203,6 +204,7 @@ def _setup_filters(self): 'list_to_line_break_string': flt.list_to_line_break_string, 'list_to_line_break_string_no_sort': flt.list_to_line_break_string_no_sort, 'md5_hash': get_md5, + 'max': max, 'min': min, 'nice_generic': flt.generic_nice_representation, 'nice_number': flt.nice_number_filter, diff --git a/src/web_interface/filter.py b/src/web_interface/filter.py index 236e9ac9e..747a43431 100644 --- a/src/web_interface/filter.py +++ b/src/web_interface/filter.py @@ -415,15 +415,16 @@ def sort_cve_results(cve_result: dict[str, dict[str, str]]) -> list[tuple[str, d return sorted(cve_result.items(), key=_cve_sort_key) -def _cve_sort_key(item: tuple[str, dict[str, str]]) -> tuple[float, float, str]: +def _cve_sort_key(item: tuple[str, dict]) -> tuple[float, float, str]: """ - primary sorting key: -max(v2 score, v3 score) - secondary sorting key: -min(v2 score, v3 score) + primary sorting key: -max(v2 score, v3.0 score, v3.1 score, v4.0 score, ...) + secondary sorting key: -min(v2 score, v3.0 score, v3.1 score, v4.0 score, ...) tertiary sorting key: CVE ID use negative values so that highest scores come first, and we can also sort by CVE ID """ - v2_score, v3_score = (_cve_score_to_float(item[1].get(key, 0.0)) for key in ['score2', 'score3']) - return -max(v2_score, v3_score), -min(v2_score, v3_score), item[0] + score_dict = item[1]['scores'] + scores = {_cve_score_to_float(value) for value in score_dict.values()} + return -max(scores or [0.0]), -min(scores or [0.0]), item[0] def _cve_score_to_float(score: float | str) -> float: @@ -433,6 +434,10 @@ def _cve_score_to_float(score: float | str) -> float: return 0.0 +def get_cvss_versions(cve_result: dict) -> set[str]: + return {score_version for entry in cve_result.values() for score_version in entry['scores']} + + def linter_reformat_issues(issues) -> dict[str, list[dict[str, str]]]: reformatted = defaultdict(list, {}) for issue in issues: