Skip to content

Commit

Permalink
Feat: Added analyze logsource command line
Browse files Browse the repository at this point in the history
  • Loading branch information
andurin committed Jul 17, 2024
1 parent 42d2b9d commit e33ebea
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 2 deletions.
55 changes: 55 additions & 0 deletions sigma/analyze/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import copy
from typing import Dict, List
from sigma.rule import SigmaRule, SigmaLevel
from sigma.collection import SigmaCollection


rule_level_mapping = {
None: "None",
SigmaLevel.INFORMATIONAL: "Informational",
SigmaLevel.LOW: "Low",
SigmaLevel.MEDIUM: "Medium",
SigmaLevel.HIGH: "High",
SigmaLevel.CRITICAL: "Critical",
}

template_stat_detail = {
"Overall": 0,
"Critical": 0,
"High": 0,
"Medium": 0,
"Low": 0,
"Informational": 0,
"None": 0,
}


def format_row(row: str, column_widths: List) -> str:
"""Format rows for table."""
return " | ".join(
f"{str(item).ljust(width)}" for item, width in zip(row, column_widths)
)


def get_rulelevel_mapping(rule: SigmaRule) -> int:
"""Calculate rule score according to rule_level_scores."""
return rule_level_mapping[rule.level]


def create_logsourcestats(rules: SigmaCollection) -> Dict[str, int]:
"""
Iterate through all the rules and count SigmaLevel grouped by
Logsource Category Name.
"""
stats = {}

for rule in rules:
if hasattr(rule, "logsource"):
# Create stats key for logsource category.
if not rule.logsource.category in stats:
stats[rule.logsource.category] = copy.deepcopy(template_stat_detail)

stats[rule.logsource.category]["Overall"] += 1
stats[rule.logsource.category][get_rulelevel_mapping(rule)] += 1

return stats
56 changes: 56 additions & 0 deletions sigma/cli/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
mitre_attack_techniques_tactics_mapping,
mitre_attack_version,
)
from sigma.analyze.stats import create_logsourcestats, format_row


@click.group(name="analyze", help="Analyze Sigma rule sets")
Expand Down Expand Up @@ -126,3 +127,58 @@ def analyze_attack(
"techniques": layer_techniques,
}
json.dump(layer, output, indent=2)

@analyze_group.command(name="logsource", help="Create stats about logsources.")
@click.option(
"--file-pattern",
"-P",
default="*.yml",
show_default=True,
help="Pattern for file names to be included in recursion into directories.",
)
@click.option(
"--sort-by",
"-k",
type=str,
default="Overall",
show_default=True,
help="Sort by column.",
)
@click.argument(
"output",
type=click.File("w"),
)
@click.argument(
"input",
nargs=-1,
required=True,
type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path),
)
def analyze_logsource(
file_pattern,
sort_by,
output,
input,
):
rules = load_rules(input, file_pattern)
stats = create_logsourcestats(rules)

# Extract column header
headers = ["Logsource"] + list(next(iter(stats.values())).keys())

# Prepare rows
rows = [[key] + list(value.values()) for key, value in stats.items()]
sort_index = headers.index(sort_by)
rows.sort(key=lambda x: x[sort_index], reverse=True)

# Determine col width
column_widths = [
max(len(str(item)) for item in column) for column in zip(*([headers] + rows))
]

# Print table
print("-+-".join("-" * width for width in column_widths), file=output)
print(format_row(headers, column_widths), file=output)
print("-+-".join("-" * width for width in column_widths), file=output)
for row in rows:
print(format_row(row, column_widths), file=output)
31 changes: 29 additions & 2 deletions tests/test_analyze.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest
from click.testing import CliRunner
from sigma.cli.analyze import analyze_group, analyze_attack
from sigma.cli.analyze import analyze_group, analyze_attack, analyze_logsource
from sigma.rule import (
SigmaRule,
SigmaLogSource,
Expand All @@ -18,6 +18,7 @@
rule_level_scores,
score_functions,
)
from sigma.analyze.stats import create_logsourcestats, get_rulelevel_mapping, format_row


def test_analyze_group():
Expand Down Expand Up @@ -120,7 +121,7 @@ def sigma_rules():
title="Low severity rule",
logsource=logsource,
detection=detections,
level=SigmaLevel.LOW
level=SigmaLevel.LOW,
),
SigmaRule(
title="Critical severity rule",
Expand Down Expand Up @@ -170,3 +171,29 @@ def test_generate_attack_scores_no_subtechniques(sigma_rules):
"T1234": 2,
"T4321": 3,
}


def test_logsource_help():
cli = CliRunner()
result = cli.invoke(analyze_logsource, ["--help"])
assert result.exit_code == 0
assert len(result.stdout.split()) > 8


def test_logsource_get_rulelevel_mapping(sigma_rules):
for sigma_rule in sigma_rules:
if sigma_rule.level:
assert (
str(get_rulelevel_mapping(sigma_rule)).lower()
== sigma_rule.level.name.lower()
)
else:
assert str(get_rulelevel_mapping(sigma_rule)).lower() == "none"


def test_logsource_create_logsourcestats(sigma_rules):
ret = create_logsourcestats(sigma_rules)

assert 'test' in ret
assert ret['test'].get("Overall") == len(sigma_rules)

0 comments on commit e33ebea

Please sign in to comment.