From e33ebeac348915a9f6b1da3e4f5a9545e79bebcf Mon Sep 17 00:00:00 2001 From: Hendrik Baecker Date: Wed, 17 Jul 2024 15:50:11 +0200 Subject: [PATCH] Feat: Added analyze logsource command line --- sigma/analyze/stats.py | 55 +++++++++++++++++++++++++++++++++++++++++ sigma/cli/analyze.py | 56 ++++++++++++++++++++++++++++++++++++++++++ tests/test_analyze.py | 31 +++++++++++++++++++++-- 3 files changed, 140 insertions(+), 2 deletions(-) create mode 100644 sigma/analyze/stats.py diff --git a/sigma/analyze/stats.py b/sigma/analyze/stats.py new file mode 100644 index 0000000..89217e6 --- /dev/null +++ b/sigma/analyze/stats.py @@ -0,0 +1,55 @@ +import copy +from typing import Dict, List +from sigma.rule import SigmaRule, SigmaLevel +from sigma.collection import SigmaCollection + + +rule_level_mapping = { + None: "None", + SigmaLevel.INFORMATIONAL: "Informational", + SigmaLevel.LOW: "Low", + SigmaLevel.MEDIUM: "Medium", + SigmaLevel.HIGH: "High", + SigmaLevel.CRITICAL: "Critical", +} + +template_stat_detail = { + "Overall": 0, + "Critical": 0, + "High": 0, + "Medium": 0, + "Low": 0, + "Informational": 0, + "None": 0, +} + + +def format_row(row: str, column_widths: List) -> str: + """Format rows for table.""" + return " | ".join( + f"{str(item).ljust(width)}" for item, width in zip(row, column_widths) + ) + + +def get_rulelevel_mapping(rule: SigmaRule) -> int: + """Calculate rule score according to rule_level_scores.""" + return rule_level_mapping[rule.level] + + +def create_logsourcestats(rules: SigmaCollection) -> Dict[str, int]: + """ + Iterate through all the rules and count SigmaLevel grouped by + Logsource Category Name. + """ + stats = {} + + for rule in rules: + if hasattr(rule, "logsource"): + # Create stats key for logsource category. + if not rule.logsource.category in stats: + stats[rule.logsource.category] = copy.deepcopy(template_stat_detail) + + stats[rule.logsource.category]["Overall"] += 1 + stats[rule.logsource.category][get_rulelevel_mapping(rule)] += 1 + + return stats diff --git a/sigma/cli/analyze.py b/sigma/cli/analyze.py index 1a6ac4f..07c0281 100644 --- a/sigma/cli/analyze.py +++ b/sigma/cli/analyze.py @@ -8,6 +8,7 @@ mitre_attack_techniques_tactics_mapping, mitre_attack_version, ) +from sigma.analyze.stats import create_logsourcestats, format_row @click.group(name="analyze", help="Analyze Sigma rule sets") @@ -126,3 +127,58 @@ def analyze_attack( "techniques": layer_techniques, } json.dump(layer, output, indent=2) + +@analyze_group.command(name="logsource", help="Create stats about logsources.") +@click.option( + "--file-pattern", + "-P", + default="*.yml", + show_default=True, + help="Pattern for file names to be included in recursion into directories.", +) +@click.option( + "--sort-by", + "-k", + type=str, + default="Overall", + show_default=True, + help="Sort by column.", +) +@click.argument( + "output", + type=click.File("w"), +) +@click.argument( + "input", + nargs=-1, + required=True, + type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path), +) +def analyze_logsource( + file_pattern, + sort_by, + output, + input, +): + rules = load_rules(input, file_pattern) + stats = create_logsourcestats(rules) + + # Extract column header + headers = ["Logsource"] + list(next(iter(stats.values())).keys()) + + # Prepare rows + rows = [[key] + list(value.values()) for key, value in stats.items()] + sort_index = headers.index(sort_by) + rows.sort(key=lambda x: x[sort_index], reverse=True) + + # Determine col width + column_widths = [ + max(len(str(item)) for item in column) for column in zip(*([headers] + rows)) + ] + + # Print table + print("-+-".join("-" * width for width in column_widths), file=output) + print(format_row(headers, column_widths), file=output) + print("-+-".join("-" * width for width in column_widths), file=output) + for row in rows: + print(format_row(row, column_widths), file=output) diff --git a/tests/test_analyze.py b/tests/test_analyze.py index 0f1984b..28e5d11 100644 --- a/tests/test_analyze.py +++ b/tests/test_analyze.py @@ -1,6 +1,6 @@ import pytest from click.testing import CliRunner -from sigma.cli.analyze import analyze_group, analyze_attack +from sigma.cli.analyze import analyze_group, analyze_attack, analyze_logsource from sigma.rule import ( SigmaRule, SigmaLogSource, @@ -18,6 +18,7 @@ rule_level_scores, score_functions, ) +from sigma.analyze.stats import create_logsourcestats, get_rulelevel_mapping, format_row def test_analyze_group(): @@ -120,7 +121,7 @@ def sigma_rules(): title="Low severity rule", logsource=logsource, detection=detections, - level=SigmaLevel.LOW + level=SigmaLevel.LOW, ), SigmaRule( title="Critical severity rule", @@ -170,3 +171,29 @@ def test_generate_attack_scores_no_subtechniques(sigma_rules): "T1234": 2, "T4321": 3, } + + +def test_logsource_help(): + cli = CliRunner() + result = cli.invoke(analyze_logsource, ["--help"]) + assert result.exit_code == 0 + assert len(result.stdout.split()) > 8 + + +def test_logsource_get_rulelevel_mapping(sigma_rules): + for sigma_rule in sigma_rules: + if sigma_rule.level: + assert ( + str(get_rulelevel_mapping(sigma_rule)).lower() + == sigma_rule.level.name.lower() + ) + else: + assert str(get_rulelevel_mapping(sigma_rule)).lower() == "none" + + +def test_logsource_create_logsourcestats(sigma_rules): + ret = create_logsourcestats(sigma_rules) + + assert 'test' in ret + assert ret['test'].get("Overall") == len(sigma_rules) +