diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..62fbcbb --- /dev/null +++ b/.env.example @@ -0,0 +1,3 @@ +NEO4J_URL="localhost" +NEO4J_USERNAME="neo4j" +NEO4J_PASSWORD=changemeplease diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8d19362 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.venv +.env diff --git a/README.md b/README.md index feffe92..ce6a2cf 100644 --- a/README.md +++ b/README.md @@ -57,15 +57,18 @@ The CTI database contains up to 2060 nodes, 27411 relationships, and 366 APTs co 4. Download the MITRE Entreprise ATT&CK database in JSON format - `wget https://github.com/mitre/cti/blob/master/enterprise-attack/enterprise-attack.json` + `wget https://github.com/mitre/cti/raw/master/enterprise-attack/enterprise-attack.json` -5. Configure the main.py script - - ```python - 100. # open graph connection` - 101. graph_bolt = "bolt://127.0.0.1:7687" - 102. graph_auth = ("neo4j","1234") +5. Configure script with .env file + ```bash + cp .env.example .env + ``` + ```env + # Change values according to created database + NEO4J_URL=bolt://127.0.0.1:7687 + NEO4J_USERNAME=neo4j + NEO4J_PASSWORD=1234 ``` 6. Run the script diff --git a/main.py b/main.py index d50adb5..4af43cd 100644 --- a/main.py +++ b/main.py @@ -1,85 +1,151 @@ #!/usr/bin/env python3 import argparse +from os import getenv +from dotenv import load_dotenv + import json -import re import sys -from py2neo import Graph, Node, Relationship, NodeMatcher, cypher +from py2neo import Graph, Node, Relationship, NodeMatcher # BUILD_LABEL def build_label(txt): - if txt.startswith('intrusion-set'): return 'Group' - if txt.startswith('malware'): return 'Software' - if txt.startswith('tool'): return 'Tool' - if txt.startswith('attack-pattern'): return 'Technique' - if txt.startswith('course-of-action'): return 'Technique' - return 'Unknown' + if txt.startswith("intrusion-set"): + return "Group" + if txt.startswith("malware"): + return "Software" + if txt.startswith("tool"): + return "Tool" + if txt.startswith("attack-pattern"): + return "Technique" + if txt.startswith("course-of-action"): + return "Technique" + return "Unknown" # BUILD ALIASES -def build_objects(obj,key): - label = build_label(obj['type']) - - # Add properties - props = {} - props['name'] = obj['name'] - props['id'] = obj['id'] - props['type'] = obj['type'] - if obj.get('description'): props['description'] = obj['description'] - if obj.get('created'): props['created'] = obj['created'] - if obj.get('modified'): props['modified'] = obj['modified'] - if obj.get('x_mitre_version'): props['version'] = obj['x_mitre_version'] - - # Create node for the group - node_main = Node(label, **props) - # Merge node to graph - graph.merge(node_main,label,'name') - print('%s: "%s"' % (label,obj['name']),end='') if dbg_mode else None - - # Dealing with aliases - if obj.get('aliases'): - aliases = obj['aliases'] - elif obj.get('x_mitre_aliases'): - aliases = obj['x_mitre_aliases'] - else: - aliases = None - if aliases: - for alias in aliases: - if alias != obj['name']: - node_alias = Node('Alias', name=alias, type=obj['type']) - relation = Relationship.type('alias') - graph.merge(relation(node_main,node_alias),label,'name') - print(' -[alias]-> "%s"' % (alias),end='') if dbg_mode else None - print() if dbg_mode else None +def build_objects(obj, key): + label = build_label(obj["type"]) + + # Add properties + props = {} + props["name"] = obj["name"] + props["id"] = obj["id"] + props["type"] = obj["type"] + if obj.get("description"): + props["description"] = obj["description"] + if obj.get("created"): + props["created"] = obj["created"] + if obj.get("modified"): + props["modified"] = obj["modified"] + if obj.get("x_mitre_version"): + props["version"] = obj["x_mitre_version"] + if obj.get("x_mitre_deprecated"): + props["deprecated"] = obj["x_mitre_deprecated"] + props["external_id"] = next( + filter( + lambda s: s.get("source_name", "") == "mitre-attack" + and s.get("external_id", None), + obj.get("external_references", []), + ), + {"external_id": None}, + )["external_id"] + # Create node for the group + node_main = Node(label, **props) + # Merge node to graph + graph.merge(node_main, label, "name") + print('%s: "%s"' % (label, obj["name"]), end="") if dbg_mode else None + + # Dealing with aliases + if obj.get("aliases"): + aliases = obj["aliases"] + elif obj.get("x_mitre_aliases"): + aliases = obj["x_mitre_aliases"] + else: + aliases = None + if aliases: + for alias in aliases: + if alias != obj["name"]: + node_alias = Node("Alias", name=alias, type=obj["type"]) + relation = Relationship.type("alias") + graph.merge(relation(node_main, node_alias), label, "name") + print(' -[alias]-> "%s"' % (alias), end="") if dbg_mode else None + print() if dbg_mode else None # BUILD RELATIONS def build_relations(obj): + if not gnames.get(obj["source_ref"]): + return + if not gnames.get(obj["target_ref"]): + return - if not gnames.get(obj['source_ref']): return - if not gnames.get(obj['target_ref']): return - - m = NodeMatcher(graph) + m = NodeMatcher(graph) - source = m.match( build_label(obj['source_ref']), name=gnames[obj['source_ref']] ).first() - target = m.match( build_label(obj['target_ref']), name=gnames[obj['target_ref']] ).first() + source = m.match( + build_label(obj["source_ref"]), name=gnames[obj["source_ref"]] + ).first() + target = m.match( + build_label(obj["target_ref"]), name=gnames[obj["target_ref"]] + ).first() - relation = Relationship.type( obj['relationship_type'] ) + relation = Relationship.type(obj["relationship_type"]) - graph.merge(relation(source,target),build_label(obj['source_ref']),'name') - print('Relation: "%s" -[%s]-> "%s"' % (gnames[obj['source_ref']],obj['relationship_type'],gnames[obj['target_ref']]) ) if dbg_mode else None + graph.merge(relation(source, target), build_label(obj["source_ref"]), "name") + print( + 'Relation: "%s" -[%s]-> "%s"' + % ( + gnames[obj["source_ref"]], + obj["relationship_type"], + gnames[obj["target_ref"]], + ) + ) if dbg_mode else None # Set command-line arguments and parsing options parser = argparse.ArgumentParser() -parser.add_argument('-d','--debug', help='enter debug mode', default=False, action='store_true') -parser.add_argument('-f', help='input file name', metavar='', action='store', required=True) -parser.add_argument('-g','--groups', help='import Groups objects (type:intrusion-set)', default=False, action='store_true') -parser.add_argument('-s','--softwares', help='import Softwares objects (type:malware)', default=False, action='store_true') -parser.add_argument('-o','--tools', help='import Tools objects (type:tool)', default=False, action='store_true') -parser.add_argument('-t','--techniques', help='import Techniques objects (type:attack-pattern and type:course-of-action)', default=False, action='store_true') -parser.add_argument('-r','--relations', help='import Relations objects (type:relationship)', default=False, action='store_true') +parser.add_argument( + "-d", "--debug", help="enter debug mode", default=False, action="store_true" +) +parser.add_argument( + "-f", help="input file name", metavar="", action="store", required=True +) +parser.add_argument( + "-g", + "--groups", + help="import Groups objects (type:intrusion-set)", + default=False, + action="store_true", +) +parser.add_argument( + "-s", + "--softwares", + help="import Softwares objects (type:malware)", + default=False, + action="store_true", +) +parser.add_argument( + "-o", + "--tools", + help="import Tools objects (type:tool)", + default=False, + action="store_true", +) +parser.add_argument( + "-t", + "--techniques", + help="import Techniques objects (type:attack-pattern and type:course-of-action)", + default=False, + action="store_true", +) +parser.add_argument( + "-r", + "--relations", + help="import Relations objects (type:relationship)", + default=False, + action="store_true", +) args = parser.parse_args() # Checks arguments and options @@ -88,54 +154,56 @@ def build_relations(obj): # Load JSON data from file try: - with open(json_file, encoding="utf8") as fh: - data = json.load(fh) - fh.close() + with open(json_file, encoding="utf8") as fh: + data = json.load(fh) + fh.close() except Exception as e: - sys.stderr.write( '[ERROR] reading configuration file %s\n' % json_file ) - sys.stderr.write( '[ERROR] %s\n' % str(e) ) - sys.exit(1) + sys.stderr.write("[ERROR] reading configuration file %s\n" % json_file) + sys.stderr.write("[ERROR] %s\n" % str(e)) + sys.exit(1) # Open graph connection -graph_bolt = "bolt://127.0.0.1:7687" -graph_auth = ("neo4j","12345678") +load_dotenv() + +graph_bolt = getenv("NEO4J_URL") +graph_auth = (getenv("NEO4J_USERNAME"), getenv("NEO4J_PASSWORD")) -graph = Graph(graph_bolt,auth=graph_auth) +graph = Graph(graph_bolt, auth=graph_auth) # Delete existing nodes and edges graph.delete_all() - + # Global names gnames = {} # Walk through JSON objects to create nodes -for obj in data['objects']: - - # if JSON object is about Groups - if args.groups and obj['type']=='intrusion-set': - gnames[ obj['id'] ] = obj['name'] - build_objects(obj,'aliases') - - # if JSON object is about Softwares - if args.softwares and obj['type']=='malware': - gnames[ obj['id'] ] = obj['name'] - build_objects(obj,'x_mitre_aliases') - - # if JSON object is about Tools - if args.tools and obj['type']=='tool': - gnames[ obj['id'] ] = obj['name'] - build_objects(obj,'x_mitre_aliases') - - # if JSON object is about Techniques - if args.techniques and (obj['type']=='attack-pattern' or obj['type']=='course-of-action'): - gnames[ obj['id'] ] = obj['name'] - build_objects(obj,None) +for obj in data["objects"]: + # if JSON object is about Groups + if args.groups and obj["type"] == "intrusion-set": + gnames[obj["id"]] = obj["name"] + build_objects(obj, "aliases") + + # if JSON object is about Softwares + if args.softwares and obj["type"] == "malware": + gnames[obj["id"]] = obj["name"] + build_objects(obj, "x_mitre_aliases") + + # if JSON object is about Tools + if args.tools and obj["type"] == "tool": + gnames[obj["id"]] = obj["name"] + build_objects(obj, "x_mitre_aliases") + + # if JSON object is about Techniques + if args.techniques and ( + obj["type"] == "attack-pattern" or obj["type"] == "course-of-action" + ): + gnames[obj["id"]] = obj["name"] + build_objects(obj, None) # Walk through JSON objects to create edges -for obj in data['objects']: - - # if JSON object is about Relationships - if args.relations and obj['type']=='relationship': - build_relations(obj) +for obj in data["objects"]: + # if JSON object is about Relationships + if args.relations and obj["type"] == "relationship": + build_relations(obj) -# EOF \ No newline at end of file +# EOF diff --git a/requirements.txt b/requirements.txt index 36d7163..24eeb97 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,3 +10,4 @@ pytz six urllib3 wcwidth +python-dotenv