Skip to content

Commit

Permalink
Merge pull request #1295 from TheHive-Project/capa-improvements-1
Browse files Browse the repository at this point in the history
Capa Analyzer - Code improvements
  • Loading branch information
nusantara-self authored Nov 25, 2024
2 parents 2da00c4 + b0107af commit 6d6eb50
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 65 deletions.
2 changes: 1 addition & 1 deletion analyzers/Capa/Capa.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "Capa",
"version": "1.0",
"author": "Wes Lambert",
"author": "Wes Lambert; nusantara-self, StrangeBee",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Analyze files with Capa",
Expand Down
159 changes: 95 additions & 64 deletions analyzers/Capa/CapaAnalyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
from cortexutils.analyzer import Analyzer
import os
import subprocess
import argparse
import json
import re
from collections import defaultdict

class CapaAnalyzer(Analyzer):
def __init__(self):
Expand All @@ -26,66 +23,100 @@ def summary(self, raw):
return {"taxonomies": taxonomies}

def run(self):
parser = argparse.ArgumentParser(description='exec capa.')
parser.add_argument('filepath', type=str, help='file path')
args = parser.parse_args()

if os.path.exists(self.filepath):
f = subprocess.check_output([self.capa_path, '-j', self.filepath])
process = json.loads(f)
rules = process['rules']
tactics = []
techniques = []
subtechniques = []
ids = []
capabilities = {}

for rule in rules:
try:
# Metadata
meta = process['rules'][rule]['meta']

# ATT&CK details
attack = meta['att&ck'][0]

# ID
id = attack['id']

# Technique
technique = attack['technique'] + " - " + id

# Subtechnique
subtechnique = attack['subtechnique']

# Tactic
tactic = attack['tactic']

# Capability
capability_name = process['rules'][rule]['meta']['name']

if tactic not in tactics:
tactics.append(tactic)

if subtechnique != "":
if subtechnique not in subtechniques:
subtechniques.append(attack['subtechnique'])

if technique not in techniques:
techniques.append(attack['technique'])

if id not in ids:
ids.append(id)

if tactic not in capabilities:
capabilities[tactic] = {}

if technique not in capabilities[tactic]:
capabilities[tactic][technique] = []

if capability_name not in capabilities[tactic][technique]:
capabilities[tactic][technique].append(capability_name)
except:
continue
self.report({ 'capabilities': capabilities, 'tactics': tactics, 'techniques': techniques, 'subtechniques': subtechniques, 'ids': ids, 'rules': rules })
if not os.path.isfile(self.capa_path) or not os.access(self.capa_path, os.X_OK):
self.error(f"capa binary not found or not executable at path: {self.capa_path}")
return

if not os.path.exists(self.filepath):
self.error(f"File not found: {self.filepath}")
return

try:
result = subprocess.run(
[self.capa_path, '-j', self.filepath],
capture_output=True,
text=True
)
if result.returncode != 0:
self.error(f"capa execution failed with return code {result.returncode}: {result.stderr}")
return
except Exception as e:
self.error(f"An error occurred while executing capa: {e}")
return

try:
process = json.loads(result.stdout)
except json.JSONDecodeError as e:
self.error(f"Failed to parse capa output as JSON: {e}")
return

rules = process.get('rules', {})
tactics = []
techniques = []
subtechniques = []
ids = []
capabilities = {}

for rule_key, rule_value in rules.items():
try:
# Metadata
meta = rule_value['meta']

# ATT&CK details
attack = meta['att&ck'][0]

# ID
attack_id = attack['id']

# Technique
technique = f"{attack['technique']} - {attack_id}"

# Subtechnique
subtechnique = attack.get('subtechnique', '')

# Tactic
tactic = attack['tactic']

# Capability
capability_name = meta['name']

# Collect data
if tactic not in tactics:
tactics.append(tactic)

if subtechnique:
if subtechnique not in subtechniques:
subtechniques.append(subtechnique)

if technique not in techniques:
techniques.append(technique)

if attack_id not in ids:
ids.append(attack_id)

if tactic not in capabilities:
capabilities[tactic] = {}

if technique not in capabilities[tactic]:
capabilities[tactic][technique] = []

if capability_name not in capabilities[tactic][technique]:
capabilities[tactic][technique].append(capability_name)
except KeyError as e:
#self.error(f"KeyError processing rule {rule_key}: {e}")
continue
except Exception as e:
#self.error(f"Unexpected error processing rule {rule_key}: {e}")
continue

self.report({
'capabilities': capabilities,
'tactics': tactics,
'techniques': techniques,
'subtechniques': subtechniques,
'ids': ids,
'rules': rules
})

if __name__ == '__main__':
CapaAnalyzer().run()

0 comments on commit 6d6eb50

Please sign in to comment.