Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and improvments for cti-kb project #1

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
NEO4J_URL="localhost"
NEO4J_USERNAME="neo4j"
NEO4J_PASSWORD=changemeplease
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.venv
.env
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,18 @@ The CTI database contains up to 2060 nodes, 27411 relationships, and 366 APTs co

4. Download the MITRE Entreprise ATT&CK database in JSON format

`wget https://github.com/mitre/cti/blob/master/enterprise-attack/enterprise-attack.json`
`wget https://github.com/mitre/cti/raw/master/enterprise-attack/enterprise-attack.json`


5. Configure the main.py script

```python
100. # open graph connection`
101. graph_bolt = "bolt://127.0.0.1:7687"
102. graph_auth = ("neo4j","1234")
5. Configure script with .env file
```bash
cp .env.example .env
```
```env
# Change values according to created database
NEO4J_URL=bolt://127.0.0.1:7687
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=1234
```

6. Run the script
Expand Down
258 changes: 163 additions & 95 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,85 +1,151 @@
#!/usr/bin/env python3

import argparse
from os import getenv
from dotenv import load_dotenv

import json
import re
import sys
from py2neo import Graph, Node, Relationship, NodeMatcher, cypher
from py2neo import Graph, Node, Relationship, NodeMatcher


# BUILD_LABEL
def build_label(txt):
if txt.startswith('intrusion-set'): return 'Group'
if txt.startswith('malware'): return 'Software'
if txt.startswith('tool'): return 'Tool'
if txt.startswith('attack-pattern'): return 'Technique'
if txt.startswith('course-of-action'): return 'Technique'
return 'Unknown'
if txt.startswith("intrusion-set"):
return "Group"
if txt.startswith("malware"):
return "Software"
if txt.startswith("tool"):
return "Tool"
if txt.startswith("attack-pattern"):
return "Technique"
if txt.startswith("course-of-action"):
return "Technique"
return "Unknown"


# BUILD ALIASES
def build_objects(obj,key):
label = build_label(obj['type'])

# Add properties
props = {}
props['name'] = obj['name']
props['id'] = obj['id']
props['type'] = obj['type']
if obj.get('description'): props['description'] = obj['description']
if obj.get('created'): props['created'] = obj['created']
if obj.get('modified'): props['modified'] = obj['modified']
if obj.get('x_mitre_version'): props['version'] = obj['x_mitre_version']

# Create node for the group
node_main = Node(label, **props)
# Merge node to graph
graph.merge(node_main,label,'name')
print('%s: "%s"' % (label,obj['name']),end='') if dbg_mode else None

# Dealing with aliases
if obj.get('aliases'):
aliases = obj['aliases']
elif obj.get('x_mitre_aliases'):
aliases = obj['x_mitre_aliases']
else:
aliases = None
if aliases:
for alias in aliases:
if alias != obj['name']:
node_alias = Node('Alias', name=alias, type=obj['type'])
relation = Relationship.type('alias')
graph.merge(relation(node_main,node_alias),label,'name')
print(' -[alias]-> "%s"' % (alias),end='') if dbg_mode else None
print() if dbg_mode else None
def build_objects(obj, key):
label = build_label(obj["type"])

# Add properties
props = {}
props["name"] = obj["name"]
props["id"] = obj["id"]
props["type"] = obj["type"]
if obj.get("description"):
props["description"] = obj["description"]
if obj.get("created"):
props["created"] = obj["created"]
if obj.get("modified"):
props["modified"] = obj["modified"]
if obj.get("x_mitre_version"):
props["version"] = obj["x_mitre_version"]
if obj.get("x_mitre_deprecated"):
props["deprecated"] = obj["x_mitre_deprecated"]
props["external_id"] = next(
filter(
lambda s: s.get("source_name", "") == "mitre-attack"
and s.get("external_id", None),
obj.get("external_references", []),
),
{"external_id": None},
)["external_id"]
# Create node for the group
node_main = Node(label, **props)
# Merge node to graph
graph.merge(node_main, label, "name")
print('%s: "%s"' % (label, obj["name"]), end="") if dbg_mode else None

# Dealing with aliases
if obj.get("aliases"):
aliases = obj["aliases"]
elif obj.get("x_mitre_aliases"):
aliases = obj["x_mitre_aliases"]
else:
aliases = None
if aliases:
for alias in aliases:
if alias != obj["name"]:
node_alias = Node("Alias", name=alias, type=obj["type"])
relation = Relationship.type("alias")
graph.merge(relation(node_main, node_alias), label, "name")
print(' -[alias]-> "%s"' % (alias), end="") if dbg_mode else None
print() if dbg_mode else None


# BUILD RELATIONS
def build_relations(obj):
if not gnames.get(obj["source_ref"]):
return
if not gnames.get(obj["target_ref"]):
return

if not gnames.get(obj['source_ref']): return
if not gnames.get(obj['target_ref']): return

m = NodeMatcher(graph)
m = NodeMatcher(graph)

source = m.match( build_label(obj['source_ref']), name=gnames[obj['source_ref']] ).first()
target = m.match( build_label(obj['target_ref']), name=gnames[obj['target_ref']] ).first()
source = m.match(
build_label(obj["source_ref"]), name=gnames[obj["source_ref"]]
).first()
target = m.match(
build_label(obj["target_ref"]), name=gnames[obj["target_ref"]]
).first()

relation = Relationship.type( obj['relationship_type'] )
relation = Relationship.type(obj["relationship_type"])

graph.merge(relation(source,target),build_label(obj['source_ref']),'name')
print('Relation: "%s" -[%s]-> "%s"' % (gnames[obj['source_ref']],obj['relationship_type'],gnames[obj['target_ref']]) ) if dbg_mode else None
graph.merge(relation(source, target), build_label(obj["source_ref"]), "name")
print(
'Relation: "%s" -[%s]-> "%s"'
% (
gnames[obj["source_ref"]],
obj["relationship_type"],
gnames[obj["target_ref"]],
)
) if dbg_mode else None


# Set command-line arguments and parsing options
parser = argparse.ArgumentParser()
parser.add_argument('-d','--debug', help='enter debug mode', default=False, action='store_true')
parser.add_argument('-f', help='input file name', metavar='<filename>', action='store', required=True)
parser.add_argument('-g','--groups', help='import Groups objects (type:intrusion-set)', default=False, action='store_true')
parser.add_argument('-s','--softwares', help='import Softwares objects (type:malware)', default=False, action='store_true')
parser.add_argument('-o','--tools', help='import Tools objects (type:tool)', default=False, action='store_true')
parser.add_argument('-t','--techniques', help='import Techniques objects (type:attack-pattern and type:course-of-action)', default=False, action='store_true')
parser.add_argument('-r','--relations', help='import Relations objects (type:relationship)', default=False, action='store_true')
parser.add_argument(
"-d", "--debug", help="enter debug mode", default=False, action="store_true"
)
parser.add_argument(
"-f", help="input file name", metavar="<filename>", action="store", required=True
)
parser.add_argument(
"-g",
"--groups",
help="import Groups objects (type:intrusion-set)",
default=False,
action="store_true",
)
parser.add_argument(
"-s",
"--softwares",
help="import Softwares objects (type:malware)",
default=False,
action="store_true",
)
parser.add_argument(
"-o",
"--tools",
help="import Tools objects (type:tool)",
default=False,
action="store_true",
)
parser.add_argument(
"-t",
"--techniques",
help="import Techniques objects (type:attack-pattern and type:course-of-action)",
default=False,
action="store_true",
)
parser.add_argument(
"-r",
"--relations",
help="import Relations objects (type:relationship)",
default=False,
action="store_true",
)
args = parser.parse_args()

# Checks arguments and options
Expand All @@ -88,54 +154,56 @@ def build_relations(obj):

# Load JSON data from file
try:
with open(json_file, encoding="utf8") as fh:
data = json.load(fh)
fh.close()
with open(json_file, encoding="utf8") as fh:
data = json.load(fh)
fh.close()
except Exception as e:
sys.stderr.write( '[ERROR] reading configuration file %s\n' % json_file )
sys.stderr.write( '[ERROR] %s\n' % str(e) )
sys.exit(1)
sys.stderr.write("[ERROR] reading configuration file %s\n" % json_file)
sys.stderr.write("[ERROR] %s\n" % str(e))
sys.exit(1)

# Open graph connection
graph_bolt = "bolt://127.0.0.1:7687"
graph_auth = ("neo4j","12345678")
load_dotenv()

graph_bolt = getenv("NEO4J_URL")
graph_auth = (getenv("NEO4J_USERNAME"), getenv("NEO4J_PASSWORD"))

graph = Graph(graph_bolt,auth=graph_auth)
graph = Graph(graph_bolt, auth=graph_auth)

# Delete existing nodes and edges
graph.delete_all()

# Global names
gnames = {}

# Walk through JSON objects to create nodes
for obj in data['objects']:

# if JSON object is about Groups
if args.groups and obj['type']=='intrusion-set':
gnames[ obj['id'] ] = obj['name']
build_objects(obj,'aliases')

# if JSON object is about Softwares
if args.softwares and obj['type']=='malware':
gnames[ obj['id'] ] = obj['name']
build_objects(obj,'x_mitre_aliases')

# if JSON object is about Tools
if args.tools and obj['type']=='tool':
gnames[ obj['id'] ] = obj['name']
build_objects(obj,'x_mitre_aliases')

# if JSON object is about Techniques
if args.techniques and (obj['type']=='attack-pattern' or obj['type']=='course-of-action'):
gnames[ obj['id'] ] = obj['name']
build_objects(obj,None)
for obj in data["objects"]:
# if JSON object is about Groups
if args.groups and obj["type"] == "intrusion-set":
gnames[obj["id"]] = obj["name"]
build_objects(obj, "aliases")

# if JSON object is about Softwares
if args.softwares and obj["type"] == "malware":
gnames[obj["id"]] = obj["name"]
build_objects(obj, "x_mitre_aliases")

# if JSON object is about Tools
if args.tools and obj["type"] == "tool":
gnames[obj["id"]] = obj["name"]
build_objects(obj, "x_mitre_aliases")

# if JSON object is about Techniques
if args.techniques and (
obj["type"] == "attack-pattern" or obj["type"] == "course-of-action"
):
gnames[obj["id"]] = obj["name"]
build_objects(obj, None)

# Walk through JSON objects to create edges
for obj in data['objects']:

# if JSON object is about Relationships
if args.relations and obj['type']=='relationship':
build_relations(obj)
for obj in data["objects"]:
# if JSON object is about Relationships
if args.relations and obj["type"] == "relationship":
build_relations(obj)

# EOF
# EOF
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pytz
six
urllib3
wcwidth
python-dotenv