Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cve_lookup: added support for CVSS metrics v4.0+ #1233

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/plugins/analysis/cve_lookup/code/cve_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from lookup import Lookup

DB_PATH = str(Path(__file__).parent / '../internal/database/cve_cpe.db')
MINIMUM_CRITICAL_SCORE = 9.0


class AnalysisPlugin(AnalysisBasePlugin):
Expand All @@ -31,7 +32,7 @@ class AnalysisPlugin(AnalysisBasePlugin):
DESCRIPTION = 'lookup CVE vulnerabilities'
MIME_BLACKLIST = MIME_BLACKLIST_NON_EXECUTABLE
DEPENDENCIES = ['software_components'] # noqa: RUF012
VERSION = '0.1.0'
VERSION = '0.2.0'
FILE = __file__

def process_object(self, file_object: FileObject) -> FileObject:
Expand Down Expand Up @@ -86,8 +87,8 @@ def add_tags(self, cve_results: dict[str, dict[str, dict[str, str]]], file_objec
return

@staticmethod
def _entry_has_critical_rating(entry: dict[str, str]) -> bool:
def _entry_has_critical_rating(entry: dict[str, dict[str, str]]) -> bool:
"""
Check if the given entry has a critical rating.
"""
return any(entry[key] != 'N/A' and float(entry[key]) >= 9.0 for key in ['score2', 'score3']) # noqa: PLR2004
return any(value != 'N/A' and float(value) >= MINIMUM_CRITICAL_SCORE for value in entry['scores'].values())
43 changes: 22 additions & 21 deletions src/plugins/analysis/cve_lookup/internal/data_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import lzma
import re
from typing import TYPE_CHECKING

import requests
Expand Down Expand Up @@ -42,35 +43,35 @@ def extract_english_summary(descriptions: list) -> str:

def extract_cve_impact(metrics: dict) -> dict[str, str]:
impact = {}
for version in [2, 30, 31]:
cvss_key = f'cvssMetricV{version}'
if cvss_key in metrics:
for entry in metrics[cvss_key]:
if entry['type'] == 'Primary':
impact.setdefault(cvss_key, entry['cvssData']['baseScore'])
elif cvss_key not in impact:
impact[cvss_key] = entry['cvssData']['baseScore']
for cvss_type, cvss_data in metrics.items():
key = cvss_type.replace('cvssMetric', '')
if re.match(r'V\d\d', key):
# V30 / V31 / V40 -> V3.0 / V3.1 / V4.0
key = f'{key[:2]}.{key[2:]}'
for cvss_dict in cvss_data:
score = str(cvss_dict['cvssData']['baseScore'])
if cvss_dict['type'] == 'Primary' or key not in impact:
impact[key] = score
return impact


def extract_cpe_data(configurations: list) -> list[tuple[str, str, str, str, str]]:
unique_criteria = {}
cpe_entries = []
for configuration in configurations:
for node in configuration['nodes']:
if 'cpeMatch' in node:
for cpe in node['cpeMatch']:
if 'criteria' in cpe and cpe['vulnerable'] and cpe['criteria'] not in unique_criteria:
cpe_entries.append(
(
cpe['criteria'],
cpe.get('versionStartIncluding', ''),
cpe.get('versionStartExcluding', ''),
cpe.get('versionEndIncluding', ''),
cpe.get('versionEndExcluding', ''),
)
for node in configuration.get('nodes', []):
for cpe in node.get('cpeMatch', []):
if 'criteria' in cpe and cpe['vulnerable'] and cpe['criteria'] not in unique_criteria:
cpe_entries.append(
(
cpe['criteria'],
cpe.get('versionStartIncluding', ''),
cpe.get('versionStartExcluding', ''),
cpe.get('versionEndIncluding', ''),
cpe.get('versionEndExcluding', ''),
)
unique_criteria[cpe['criteria']] = True
)
unique_criteria[cpe['criteria']] = True
return cpe_entries


Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations

import logging
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING

from sqlalchemy.exc import OperationalError

from ..database.schema import Association, Cpe, Cve

if TYPE_CHECKING:
Expand Down Expand Up @@ -38,5 +41,12 @@ def get_cves(self, cve_ids: list[str]) -> dict[str, Cve]:
"""
Retrieve a dictionary of CVE objects for the given CVE IDs.
"""
cves = self.session.query(Cve).filter(Cve.cve_id.in_(cve_ids)).all()
try:
cves = self.session.query(Cve).filter(Cve.cve_id.in_(cve_ids)).all()
except OperationalError as error:
if 'no such column' in str(error):
logging.error(
"The DB schema of the cve_lookup plugin has changed. Please re-run the plugin's installation."
)
raise
return {cve.cve_id: cve for cve in cves}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@ def create_cve(self, cve_item: CveEntry) -> Cve:
Create a Cve object from a CveEntry object.
"""
year = cve_item.cve_id.split('-')[1]
score_v2 = cve_item.impact.get('cvssMetricV2', 'N/A')
score_v30 = cve_item.impact.get('cvssMetricV30', 'N/A')
score_v3 = cve_item.impact.get('cvssMetricV31', score_v30)
return Cve(
cve_id=cve_item.cve_id,
year=year,
summary=cve_item.summary,
cvss_v2_score=score_v2,
cvss_v3_score=score_v3,
cvss_score=cve_item.impact,
)

def create_cpe(self, cpe_id: str):
Expand Down
5 changes: 2 additions & 3 deletions src/plugins/analysis/cve_lookup/internal/database/schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sqlalchemy import Column, ForeignKey, String
from sqlalchemy import JSON, Column, ForeignKey, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()
Expand Down Expand Up @@ -27,8 +27,7 @@ class Cve(Base):
cve_id = Column(String(), primary_key=True)
year = Column(String())
summary = Column(String())
cvss_v2_score = Column(String())
cvss_v3_score = Column(String())
cvss_score = Column(JSON())
cpes = relationship(Association, back_populates='cve')

def __repr__(self) -> str:
Expand Down
3 changes: 1 addition & 2 deletions src/plugins/analysis/cve_lookup/internal/lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ def lookup_vulnerabilities(
if cve:
cpe = cpe_matches.get(association.cpe_id)
vulnerabilities[cve.cve_id] = {
'score2': cve.cvss_v2_score,
'score3': cve.cvss_v3_score,
'scores': cve.cvss_score,
'cpe_version': self._build_version_string(association, cpe),
}

Expand Down
39 changes: 13 additions & 26 deletions src/plugins/analysis/cve_lookup/test/test_busybox_cve_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
"A use-after-free in Busybox's awk applet leads to denial of service and possibly code execution when "
'processing a crafted awk pattern in the evaluate function'
),
cvss_v2_score='6.5',
cvss_v3_score='7.2',
cvss_score={'V2': '6.5', 'V3.0': '7.2'},
),
'CVE-2021-42379': Cve(
cve_id='CVE-2021-42379',
Expand All @@ -21,8 +20,7 @@
"A use-after-free in Busybox's awk applet leads to denial of service and possibly code execution when "
' processing a crafted awk pattern in the next_input_file function'
),
cvss_v2_score='6.5',
cvss_v3_score='7.2',
cvss_score={'V2': '6.5', 'V3.0': '7.2'},
),
'CVE-2021-42381': Cve(
cve_id='CVE-2021-42381',
Expand All @@ -31,8 +29,7 @@
"A use-after-free in Busybox's awk applet leads to denial of service and possibly code execution when "
'processing a crafted awk pattern in the hash_init function'
),
cvss_v2_score='6.5',
cvss_v3_score='7.2',
cvss_score={'V2': '6.5', 'V3.0': '7.2'},
),
'CVE-2021-28831': Cve(
cve_id='CVE-2021-28831',
Expand All @@ -41,8 +38,7 @@
'decompress_gunzip.c in BusyBox through 1.32.1 mishandles the error bit on the huft_build result pointer, '
'with a resultant invalid free or segmentation fault, via malformed gzip data.'
),
cvss_v2_score='5.0',
cvss_v3_score='7.5',
cvss_score={'V2': '5.0', 'V3.0': '7.5'},
),
'CVE-2021-42386': Cve(
cve_id='CVE-2021-42386',
Expand All @@ -51,8 +47,7 @@
"A use-after-free in Busybox's awk applet leads to denial of service and possibly code execution when "
'processing a crafted awk pattern in the nvalloc function'
),
cvss_v2_score='6.5',
cvss_v3_score='7.2',
cvss_score={'V2': '6.5', 'V3.0': '7.2'},
),
'CVE-2021-42380': Cve(
cve_id='CVE-2021-42380',
Expand All @@ -61,8 +56,7 @@
"A use-after-free in Busybox's awk applet leads to denial of service and possibly code execution when "
'processing a crafted awk pattern in the clrvar function'
),
cvss_v2_score='6.5',
cvss_v3_score='7.2',
cvss_score={'V2': '6.5', 'V3.0': '7.2'},
),
'CVE-2021-42376': Cve(
cve_id='CVE-2021-42376',
Expand All @@ -72,8 +66,7 @@
'crafted shell command, due to missing validation after a \\x03 delimiter character. This may be used '
'for DoS under very rare conditions of filtered command input.'
),
cvss_v2_score='1.9',
cvss_v3_score='5.5',
cvss_score={'V2': '1.9', 'V3.0': '5.5'},
),
'CVE-2022-28391': Cve(
cve_id='CVE-2022-28391',
Expand All @@ -83,8 +76,7 @@
"DNS PTR record's value to a VT compatible terminal. Alternatively, the attacker could choose to change "
"the terminal's colors."
),
cvss_v2_score='6.8',
cvss_v3_score='8.8',
cvss_score={'V2': '6.8', 'V3.0': '8.8'},
),
'CVE-2021-42384': Cve(
cve_id='CVE-2021-42384',
Expand All @@ -93,8 +85,7 @@
"A use-after-free in Busybox's awk applet leads to denial of service and possibly code execution when "
'processing a crafted awk pattern in the handle_special function'
),
cvss_v2_score='6.5',
cvss_v3_score='7.2',
cvss_score={'V2': '6.5', 'V3.0': '7.2'},
),
'CVE-2021-42374': Cve(
cve_id='CVE-2021-42374',
Expand All @@ -104,8 +95,7 @@
'when crafted LZMA-compressed input is decompressed. This can be triggered by any '
'applet/format that'
),
cvss_v2_score='3.3',
cvss_v3_score='5.3',
cvss_score={'V2': '3.3', 'V3.0': '5.3'},
),
'CVE-2021-42378': Cve(
cve_id='CVE-2021-42378',
Expand All @@ -114,8 +104,7 @@
"A use-after-free in Busybox's awk applet leads to denial of service and possibly code execution when "
'processing a crafted awk pattern in the getvar_i function'
),
cvss_v2_score='6.5',
cvss_v3_score='7.2',
cvss_score={'V2': '6.5', 'V3.0': '7.2'},
),
'CVE-2021-42382': Cve(
cve_id='CVE-2021-42382',
Expand All @@ -124,8 +113,7 @@
"A use-after-free in Busybox's awk applet leads to denial of service and possibly code execution when "
'processing a crafted awk pattern in the getvar_s function'
),
cvss_v2_score='6.5',
cvss_v3_score='7.2',
cvss_score={'V2': '6.5', 'V3.0': '7.2'},
),
'CVE-2022-30065': Cve(
cve_id='CVE-2022-30065',
Expand All @@ -134,8 +122,7 @@
"A use-after-free in Busybox 1.35-x's awk applet leads to denial of service and possibly code execution "
'when processing a crafted awk pattern in the copyvar function.'
),
cvss_v2_score='6.8',
cvss_v3_score='7.8',
cvss_score={'V2': '6.8', 'V3.0': '7.8'},
),
}

Expand Down
10 changes: 5 additions & 5 deletions src/plugins/analysis/cve_lookup/test/test_cve_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def test_process_object(self, analysis_plugin):
@pytest.mark.parametrize(('cve_score', 'should_be_tagged'), [('9.9', True), ('5.5', False)])
def test_add_tags(self, analysis_plugin, cve_score, should_be_tagged):
TEST_FW.processed_analysis['cve_lookup'] = {}
cve_results = {'component': {'cve_id': {'score2': cve_score, 'score3': 'N/A'}}}
cve_results = {'component': {'cve_id': {'scores': {'V2': cve_score, 'V3.1': 'N/A'}}}}
analysis_plugin.add_tags(cve_results, TEST_FW)
if should_be_tagged:
assert 'tags' in TEST_FW.processed_analysis['cve_lookup']
Expand All @@ -96,13 +96,13 @@ def test_add_tags(self, analysis_plugin, cve_score, should_be_tagged):
('cve_results_dict', 'expected_output'),
[
({}, []),
({'component': {'cve_id': {'score2': '6.4', 'score3': 'N/A'}}}, ['component']),
({'component': {'cve_id': {'score2': '9.4', 'score3': 'N/A'}}}, ['component (CRITICAL)']),
({'component': {'cve_id': {'scores': {'V2': '6.4', 'V3.1': 'N/A'}}}}, ['component']),
({'component': {'cve_id': {'scores': {'V2': '9.4', 'V3.1': 'N/A'}}}}, ['component (CRITICAL)']),
(
{
'component': {
'cve_id': {'score2': '1.1', 'score3': '9.9'},
'cve_id2': {'score2': '1.1', 'score3': '0.0'},
'cve_id': {'scores': {'V2': '1.1', 'V3.1': '9.9'}},
'cve_id2': {'scores': {'V2': '1.1', 'V3.1': '0.0'}},
}
},
['component (CRITICAL)'],
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/analysis/cve_lookup/test/test_data_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
CVE_ENTRY = CveEntry(
cve_id='CVE-2012-0010',
summary='Microsoft Internet Explorer 6 through 9 does not properly perform copy-and-paste operations, which allows user-assisted remote attackers to read content from a different (1) domain or (2) zone via a crafted web site, aka "Copy and Paste Information Disclosure Vulnerability."', # noqa: E501
impact={'cvssMetricV2': 4.3},
impact={'V2': '4.3'},
cpe_entries=[
('cpe:2.3:a:microsoft:internet_explorer:6:*:*:*:*:*:*:*', '', '', '', ''),
('cpe:2.3:a:microsoft:internet_explorer:9:*:*:*:*:*:*:*', '', '', '', ''),
Expand Down
5 changes: 2 additions & 3 deletions src/plugins/analysis/cve_lookup/test/test_db_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
CPE_ID = 'cpe:2.3:o:vendor:product:version:update:edition:language:sw_edition:target_sw:target_hw:other'
CVE_ENTRY = CveEntry(
cve_id='CVE-2023-1234',
impact={'cvssMetricV2': '5.0', 'cvssMetricV30': '6.0', 'cvssMetricV31': '7.0'},
impact={'V2': '5.0', 'V3.0': '6.0', 'V3.1': '7.0'},
summary='This is a test CVE',
cpe_entries=[
(
Expand Down Expand Up @@ -35,8 +35,7 @@ def test_create_cve(self):
assert cve.cve_id == 'CVE-2023-1234'
assert cve.year == '2023'
assert cve.summary == 'This is a test CVE'
assert cve.cvss_v2_score == '5.0'
assert cve.cvss_v3_score == '7.0'
assert cve.cvss_score == {'V2': '5.0', 'V3.0': '6.0', 'V3.1': '7.0'}

def test_create_cpe(self):
cpe_id = CPE_ID
Expand Down
27 changes: 19 additions & 8 deletions src/plugins/analysis/cve_lookup/view/cve_lookup.html
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,33 @@
{% else %}
<td class="p-0 m-0">
<table class="table table-bordered m-0" style="width: 100%;">
{% set cvss_versions = result | get_cvss_versions %}
<tr>
<th>CVE ID</th>
<th style="width: 130px">CVSS v2 score</th>
<th style="width: 130px">CVSS v3 score</th>
<th style="width: 200px">Affected versions</th>
<th class="text-right" rowspan="2" style="min-width: 200px">CVE ID</th>
<th class="text-center" colspan="{{ cvss_versions | length | max(1) }}" style="min-width: 200px">CVSS scores</th>
<th class="text-left" rowspan="2" style="width: 100%;">Affected versions</th>
</tr>
<tr>
{% if (cvss_versions | length) > 0 %}
{% for version in cvss_versions | sort %}
<th class="text-center" style="width: 75px;">{{ version }}</th>
{% endfor %}
{% else %}
{# if there is no entry with a CVSS score, display a row of "N/A" #}
<th class="text-center" style="width: 75px;">N/A</th>
{% endif %}
</tr>
{% for cve_id, entry in result | sort_cve() %}
<tr>
<td>
<td class="text-right">
<a href="https://nvd.nist.gov/vuln/detail/{{ cve_id }}" target="_blank" rel="noopener noreferrer">{{ cve_id }} </a>
</td>
{% for score in [entry.get("score2", "N/A"), entry.get("score3", "N/A")] %}
{% for version in (cvss_versions or ["N/A"]) | sort %}
{% set score = entry.scores.get(version, "N/A") %}
{% set class = "secondary" if score == "N/A" else ("success" if score|float < 4 else ("warning" if score|float < 7 else "danger")) %}
<td><span class="badge badge-{{ class }}" style="font-size: 100%;">{{ score }}</span></td>
<td class="text-center"><span class="badge badge-{{ class }}" style="font-size: 100%;">{{ score }}</span></td>
{% endfor %}
<td>{{ entry.get("cpe_version", "N/A") }}</td>
<td class="text-left">{{ entry.get("cpe_version", "N/A") }}</td>
</tr>
{% endfor %}
</table>
Expand Down
Loading