diff --git a/analyzers/Capa/Capa.json b/analyzers/Capa/Capa.json index 439128882..3d1e45658 100755 --- a/analyzers/Capa/Capa.json +++ b/analyzers/Capa/Capa.json @@ -1,7 +1,7 @@ { "name": "Capa", "version": "1.0", - "author": "Wes Lambert", + "author": "Wes Lambert; nusantara-self, StrangeBee", "url": "https://github.com/TheHive-Project/Cortex-Analyzers", "license": "AGPL-V3", "description": "Analyze files with Capa", diff --git a/analyzers/Capa/CapaAnalyze.py b/analyzers/Capa/CapaAnalyze.py index 22e5fac96..0d7f273e2 100755 --- a/analyzers/Capa/CapaAnalyze.py +++ b/analyzers/Capa/CapaAnalyze.py @@ -4,10 +4,7 @@ from cortexutils.analyzer import Analyzer import os import subprocess -import argparse import json -import re -from collections import defaultdict class CapaAnalyzer(Analyzer): def __init__(self): @@ -26,66 +23,100 @@ def summary(self, raw): return {"taxonomies": taxonomies} def run(self): - parser = argparse.ArgumentParser(description='exec capa.') - parser.add_argument('filepath', type=str, help='file path') - args = parser.parse_args() - - if os.path.exists(self.filepath): - f = subprocess.check_output([self.capa_path, '-j', self.filepath]) - process = json.loads(f) - rules = process['rules'] - tactics = [] - techniques = [] - subtechniques = [] - ids = [] - capabilities = {} - - for rule in rules: - try: - # Metadata - meta = process['rules'][rule]['meta'] - - # ATT&CK details - attack = meta['att&ck'][0] - - # ID - id = attack['id'] - - # Technique - technique = attack['technique'] + " - " + id - - # Subtechnique - subtechnique = attack['subtechnique'] - - # Tactic - tactic = attack['tactic'] - - # Capability - capability_name = process['rules'][rule]['meta']['name'] - - if tactic not in tactics: - tactics.append(tactic) - - if subtechnique != "": - if subtechnique not in subtechniques: - subtechniques.append(attack['subtechnique']) - - if technique not in techniques: - techniques.append(attack['technique']) - - if id not in ids: - ids.append(id) - - if tactic not in capabilities: - capabilities[tactic] = {} - - if technique not in capabilities[tactic]: - capabilities[tactic][technique] = [] - - if capability_name not in capabilities[tactic][technique]: - capabilities[tactic][technique].append(capability_name) - except: - continue - self.report({ 'capabilities': capabilities, 'tactics': tactics, 'techniques': techniques, 'subtechniques': subtechniques, 'ids': ids, 'rules': rules }) + if not os.path.isfile(self.capa_path) or not os.access(self.capa_path, os.X_OK): + self.error(f"capa binary not found or not executable at path: {self.capa_path}") + return + + if not os.path.exists(self.filepath): + self.error(f"File not found: {self.filepath}") + return + + try: + result = subprocess.run( + [self.capa_path, '-j', self.filepath], + capture_output=True, + text=True + ) + if result.returncode != 0: + self.error(f"capa execution failed with return code {result.returncode}: {result.stderr}") + return + except Exception as e: + self.error(f"An error occurred while executing capa: {e}") + return + + try: + process = json.loads(result.stdout) + except json.JSONDecodeError as e: + self.error(f"Failed to parse capa output as JSON: {e}") + return + + rules = process.get('rules', {}) + tactics = [] + techniques = [] + subtechniques = [] + ids = [] + capabilities = {} + + for rule_key, rule_value in rules.items(): + try: + # Metadata + meta = rule_value['meta'] + + # ATT&CK details + attack = meta['att&ck'][0] + + # ID + attack_id = attack['id'] + + # Technique + technique = f"{attack['technique']} - {attack_id}" + + # Subtechnique + subtechnique = attack.get('subtechnique', '') + + # Tactic + tactic = attack['tactic'] + + # Capability + capability_name = meta['name'] + + # Collect data + if tactic not in tactics: + tactics.append(tactic) + + if subtechnique: + if subtechnique not in subtechniques: + subtechniques.append(subtechnique) + + if technique not in techniques: + techniques.append(technique) + + if attack_id not in ids: + ids.append(attack_id) + + if tactic not in capabilities: + capabilities[tactic] = {} + + if technique not in capabilities[tactic]: + capabilities[tactic][technique] = [] + + if capability_name not in capabilities[tactic][technique]: + capabilities[tactic][technique].append(capability_name) + except KeyError as e: + #self.error(f"KeyError processing rule {rule_key}: {e}") + continue + except Exception as e: + #self.error(f"Unexpected error processing rule {rule_key}: {e}") + continue + + self.report({ + 'capabilities': capabilities, + 'tactics': tactics, + 'techniques': techniques, + 'subtechniques': subtechniques, + 'ids': ids, + 'rules': rules + }) + if __name__ == '__main__': CapaAnalyzer().run()