-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathrepominer.py
executable file
·105 lines (90 loc) · 4.19 KB
/
repominer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import os
import os.path
import sys
import datetime
from core.codechecker import CodeCheckerFactory, Alert
from core.evaluators import LineEvalFactory
from core.ruleparser import load_rules, build_resolved_ruleset
from core.datastore import DataStore, DataStoreException
def check_alerts_in_file(code_checker, file, filename):
    """Run ``code_checker`` over an open file and wrap each hit in an Alert.

    Args:
        code_checker: object whose ``check(lines, context)`` returns an
            iterable of ``(rule, line)`` pairs.
        file: open file-like object; it is consumed with ``readlines()``.
        filename: name stored on every alert and passed to the checker as
            ``{"filename": filename}``.

    Returns:
        A list with one ``Alert`` per ``(rule, line)`` pair. ``repo`` and
        ``commit`` are left as empty strings.

    Raises:
        ValueError: if a reported line is not an element of the lines read
            from ``file`` (``list.index`` fails).
    """
    content = file.readlines()
    check_context = {"filename": filename}
    result = code_checker.check(content, check_context)

    def create_alert(rule, vuln_line):
        # index() returns the FIRST line with this exact text, so a hit on a
        # repeated line is always reported at its first occurrence.
        vuln_line_number = content.index(vuln_line) + 1
        # Use the parameter itself: the previous code read ``line``, which
        # only existed because Python 2 leaks list-comprehension variables
        # into the enclosing function scope.
        return Alert(rule, filename, repo='', commit='', line=vuln_line, line_number=vuln_line_number)

    return [create_alert(rule, line) for rule, line in result]
# Command-line interface, declared as (flags, settings) rows and registered
# in order so the generated help lists the options in the same sequence.
parser = argparse.ArgumentParser(description='Check a sourcecode repo')
_cli_arguments = (
    (('--rule-dir',), {'default': "rules/", 'help': 'Directory of rules'}),
    (('--alerts', '-a'), {
        'default': False,
        'help': 'Limit running only the given alert checks (comma separated list)',
    }),
    (('--store', '-S'), {'default': False, 'help': 'ElasticSearch node (host:port)'}),
    (('files',), {'metavar': 'file', 'nargs': '*', 'default': None, 'help': 'Files to check'}),
)
for _flags, _settings in _cli_arguments:
    parser.add_argument(*_flags, **_settings)

# Parses sys.argv; argparse exits the process itself on invalid options.
args = parser.parse_args()
# Load the rules from --rule-dir and build the resolved ruleset from them.
bare_rules = load_rules(args.rule_dir)
resolved_rules = build_resolved_ruleset(bare_rules)

# Filter for items in the --alerts parameter. Each entry is a prefix of the
# alert ids to run. Empty entries (e.g. the trailing comma in "foo,") are
# dropped: an empty prefix satisfies startswith() for every alert id and
# would silently disable the filter.
enabled_alerts = ([a.strip() for a in args.alerts.split(',') if a.strip()]
                  if args.alerts else False)
applied_alerts = {aid: adata for aid, adata in resolved_rules.iteritems()
                  if not enabled_alerts or any(aid.startswith(ea) for ea in enabled_alerts)}

# Nothing to do without rules or without input; sys.exit() with no argument
# ends the process with status 0.
if not applied_alerts:
    print("No matching alerts")
    sys.exit()

if not args.files:
    print("No files given.")
    parser.print_help()
    sys.exit()
# Checker built from the selected alerts, in single-line evaluation mode.
code_checker = CodeCheckerFactory(applied_alerts).create(LineEvalFactory.MODE_SINGLE)

# Byte values regarded as text: BEL, BS, TAB, LF, FF, CR, ESC and everything
# from 0x20 up to 0xFF.
_text_codes = [7, 8, 9, 10, 12, 13, 27]
_text_codes.extend(range(0x20, 0x100))
textchars = ''.join(chr(code) for code in _text_codes)


def is_binary_string(bytes):
    """Return True when ``bytes`` contains any byte outside ``textchars``.

    ``translate(None, textchars)`` deletes every text byte; whatever is left
    over is non-text, so a non-empty remainder marks the sample as binary.
    An empty sample is reported as not binary.
    """
    leftover = bytes.translate(None, textchars)
    return bool(leftover)
# Optional ElasticSearch sink. It depends only on --store, so it is built
# once up front instead of once per scanned path.
data_store = None
if args.store:
    (host, port) = args.store.split(":")
    data_store = DataStore(host=host, port=port, default_doctype="repoguard", default_index="repoguard")

# Scan every given path (a single file or a directory tree), then print and
# optionally store the alerts found for that path.
for path in args.files:
    print("Checking " + path)
    alerts = []
    if os.path.isdir(path):
        for root, _, files in os.walk(path):
            for fname in files:
                fpath = os.path.join(root, fname)
                if os.path.islink(fpath):
                    continue
                with open(fpath) as f:
                    # Sniff the first 128 bytes and skip files that look binary.
                    if is_binary_string(f.read(128)):
                        continue
                    f.seek(0)
                    # Record the full path, as the single-file branch does;
                    # the bare file name made same-named files in different
                    # directories indistinguishable in the report.
                    alerts.extend(check_alerts_in_file(code_checker, f, fpath))
    else:
        with open(path) as f:
            alerts.extend(check_alerts_in_file(code_checker, f, path))

    for alert in alerts:
        # The matching line is cut to 200 characters and tabs are flattened
        # so one alert stays on one readable output line.
        print('file:\t%s:%s\nrule:\t%s\nline:\t%s\ndescr:\t%s\n' % (
            alert.filename, alert.line_number, alert.rule.name,
            alert.line[0:200].strip().replace("\t", " ").decode('utf-8', 'replace'), alert.rule.description,
        ))
        if data_store is None:
            continue

        # Document stored for this alert; built outside the try so that only
        # the store call itself is guarded.
        body = {
            "check_id": alert.rule.name,
            "description": alert.rule.description,
            "filename": alert.filename,
            "commit_id": alert.commit,
            "matching_line": alert.line[0:200].replace("\t", " ").decode('utf-8', 'replace'),
            "line_number": alert.line_number,
            "diff_line_number": alert.diff_line_number,
            "repo_name": alert.repo,
            "@timestamp": datetime.datetime.utcnow().isoformat() + 'Z',
            "type": "repoguard",
            "false_positive": False,
            "last_reviewer": "repoguard",
            "author": alert.author,
            "commit_description": alert.commit_description
        }
        try:
            data_store.store(body=body)
        except DataStoreException:
            # Best effort: a failed store must not abort the remaining alerts.
            print('Got exception during storing results to ES.')