forked from 20142995/Goby
-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
229 lines (209 loc) · 7.86 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import hashlib
import json
import os
import re
import shutil
import subprocess
import tempfile
import time
import traceback

import requests
requests.packages.urllib3.disable_warnings()
class GithubClient:
    """Minimal GitHub REST v3 API client used to search for Goby PoC code.

    Only the handful of endpoints this script needs are wrapped; every
    wrapper degrades to an empty result on failure so the caller can keep
    iterating over other repositories.
    """

    def __init__(self, token):
        self.url = 'https://api.github.com'
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Connection': 'close',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.119 Safari/537.36'
        }
        # Remaining rate-limit quota; refreshed from response headers on every call.
        self.limit = 0
        # Warm-up request that also primes self.limit with the current quota.
        self.users_octocat()

    def connect(self, method, resource, data=None):
        """Issue one API request; return (status_code, headers, body).

        body is the parsed JSON when the response decodes, otherwise the
        raw bytes. Raises ValueError for an unsupported HTTP method
        (previously this fell through and crashed with UnboundLocalError).
        """
        time.sleep(0.1)  # crude client-side throttle between calls
        full_url = '{0}{1}'.format(self.url, resource)
        if method == 'GET':
            r = requests.get(full_url, params=data, headers=self.headers,
                             verify=False, allow_redirects=False)
        elif method == 'POST':
            r = requests.post(full_url, data=data, headers=self.headers,
                              verify=False, allow_redirects=False)
        else:
            raise ValueError(f'unsupported HTTP method: {method}')
        r.encoding = r.apparent_encoding
        # Track how much of the hourly rate-limit quota is left.
        if 'X-RateLimit-Remaining' in r.headers:
            self.limit = int(r.headers['X-RateLimit-Remaining'])
        try:
            return r.status_code, r.headers, r.json()
        except ValueError:
            # Body was not valid JSON; hand back the raw bytes instead.
            return r.status_code, r.headers, r.content

    def search_code(self, keyword, page=1, per_page=10):
        """Search code via /search/code; return the JSON dict or {} on error."""
        try:
            time.sleep(2)  # the search API is rate-limited more aggressively
            data = {'q': keyword, 'sort': 'indexed',
                    'order': 'desc', 'page': page, 'per_page': per_page}
            _, _, rs = self.connect("GET", '/search/code', data=data)
            return rs
        except Exception:
            return {}

    def search_repositories(self, keyword, page=1, per_page=10):
        """Search repositories via /search/repositories; {} on error."""
        try:
            time.sleep(2)
            data = {'q': keyword, 'sort': 'updated',
                    'order': 'desc', 'page': page, 'per_page': per_page}
            _, _, rs = self.connect("GET", '/search/repositories', data=data)
            return rs
        except Exception:
            return {}

    def repos(self, author, repo):
        """Fetch repository metadata; {} on error."""
        try:
            _, _, rs = self.connect("GET", f'/repos/{author}/{repo}')
            return rs
        except Exception:
            return {}

    def repos_commits(self, author, repo):
        """Fetch the commit list for a repository; [] on error.

        A renamed/moved repo answers with a 'Moved Permanently' dict whose
        'url' points at the new location; follow it once.
        """
        try:
            _, _, rs = self.connect(
                "GET", f'/repos/{author}/{repo}/commits')
            if isinstance(rs, dict):
                if rs.get('message', '') == 'Moved Permanently' and 'url' in rs:
                    # [18:] strips the 'https://api.github' ... host prefix so
                    # only the resource path is passed back to connect()
                    # — NOTE(review): assumes the redirect URL always starts
                    # with the API host; confirm against a moved repo.
                    _, _, rs1 = self.connect("GET", rs['url'][18:])
                    if isinstance(rs1, list):
                        return rs1
            elif isinstance(rs, list):
                return rs
        except Exception:
            pass
        return []

    def repos_releases_latest(self, author, repo):
        """Fetch the latest release of a repository; {} on error."""
        try:
            _, _, rs = self.connect(
                "GET", f'/repos/{author}/{repo}/releases/latest')
            return rs
        except Exception:
            return {}

    def users_octocat(self):
        """Hit a cheap endpoint purely to refresh the rate-limit counter."""
        try:
            _, _, _ = self.connect(
                "GET", '/users/octocat')
        except Exception:
            pass
def clone_repo(url):
    """Clone a GitHub repository into a fresh temporary directory.

    Returns the expected local checkout path ('<tmpdir>/<repo-name>'); the
    caller must check that it exists, because the clone itself may fail.
    Side effect: changes the process working directory to the temp dir.
    """
    # mkdtemp keeps the directory alive; the previous
    # tempfile.TemporaryDirectory().name is deleted again as soon as the
    # TemporaryDirectory object is garbage-collected, racing the clone.
    temp_dir = tempfile.mkdtemp()
    os.chdir(temp_dir)
    # Argument-list form avoids shell injection: the URL comes from GitHub
    # search results, i.e. attacker-controlled data.
    subprocess.run(['git', 'clone', url], check=False)
    # url looks like 'https://github.com/<owner>/<repo>'; [19:] strips the
    # scheme+host, split drops the owner, leaving the checkout dir name.
    return os.path.join(temp_dir, url[19:].split('/', 1)[1])
def chr_len2(s):
    """Display width of *s*: ASCII chars count 1, multi-byte (CJK) chars 2."""
    byte_len = len(s.encode('utf-8'))
    char_len = len(s)
    # Each multi-byte char inflates the byte length; halving the excess and
    # adding the char count approximates a 2-column width per wide char.
    return int((byte_len - char_len) / 2 + char_len)
def _md5(file):
with open(file,'rb') as f:
s = re.sub('\s+','',f.read().decode('utf8',errors='ignore'))
return hashlib.md5(s.encode('utf8')).hexdigest()
def parse(x, y):
    """Wrap *x* with '<br>' whenever accumulated display width reaches *y*.

    Runs of 2+ whitespace characters are removed first; None or '' yields
    ''. Width is measured with chr_len2 (wide chars count 2 columns), so
    Markdown table cells in README.md stay roughly y columns wide.
    """
    out = ''
    width = 0
    # Raw string: '\s' in a plain literal is an invalid escape (deprecated).
    for ch in re.sub(r'\s{2,}', '', x if x else ''):
        width += chr_len2(ch)
        if width >= y:
            out += '<br>'
            width = 0
        out += ch
    return out
if __name__ == '__main__':
    # --- Load the update history (md5 -> PoC metadata), creating data.json
    # if it is missing or corrupt.
    data = {}
    data_file = 'data.json'
    if os.path.exists(data_file):
        try:
            # 'with' closes the handle; the old open(...).read() leaked it.
            with open(data_file, 'r', encoding='utf8') as f:
                data = json.loads(f.read())
        except Exception:
            # Corrupt history: reset it to an empty JSON object.
            with open(data_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
    else:
        with open(data_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
    # --- Collect candidate repository home pages.
    html_urls = []
    gc = GithubClient(os.getenv('GH_TOKEN'))
    # Repository search: anything named/tagged "goby".
    try:
        rs = gc.search_repositories("goby", page=1, per_page=100)
        html_urls += [item['html_url']
                      for item in rs.get('items', []) if item.get('html_url')]
    except Exception:
        traceback.print_exc()
    # Directory of this script; used as the base for poc/ and README.md.
    root_path = os.path.dirname(os.path.abspath(__file__))
    # Code search: files containing the Goby PoC marker field, in both the
    # Go and JSON PoC formats.
    try:
        rs = gc.search_code("GobyQuery+language:Go",
                            page=1, per_page=100)
        html_urls += [item['repository']['html_url']
                      for item in rs.get('items', []) if item.get('repository', {}).get('html_url')]
    except Exception:
        traceback.print_exc()
    try:
        rs = gc.search_code("GobyQuery+language:Json",
                            page=1, per_page=100)
        html_urls += [item['repository']['html_url']
                      for item in rs.get('items', []) if item.get('repository', {}).get('html_url')]
    except Exception:
        traceback.print_exc()
    html_urls = set(html_urls)  # de-duplicate across the three searches
    print(f'[+] html_urls: {len(html_urls)}')
    # --- Clone each repository and copy any new PoC files into poc/.
    for url in html_urls:
        print(url)
        try:
            repo_path = clone_repo(url)
            if not os.path.exists(repo_path):
                continue  # clone failed; skip this repo
            for root, _, files in os.walk(repo_path):
                for file in files:
                    if not file.endswith('.go') and not file.endswith('.json'):
                        continue
                    file_path = os.path.join(root, file)
                    try:
                        with open(file_path, 'r', encoding='utf8') as f:
                            content = f.read()
                        # Heuristic PoC fingerprint: both Goby fields present.
                        if 'GobyQuery' in content and 'ScanSteps' in content:
                            md5 = _md5(file_path)
                            # Whitespace-insensitive md5 keyed history means
                            # a reformatted copy is not collected twice.
                            if md5 not in data:
                                shutil.copyfile(file_path, os.path.join(
                                    root_path, 'poc', file))
                                data[md5] = {'name': file, 'from': url, "up_time": time.strftime(
                                    "%Y-%m-%d %H:%M:%S")}
                    except Exception:
                        traceback.print_exc()
        except Exception:
            traceback.print_exc()
    # clone_repo chdir'd into temp dirs; come back before writing outputs.
    os.chdir(root_path)
    # --- Drop history entries whose PoC file no longer exists in poc/.
    md5s = []
    for file in os.listdir(os.path.join(root_path, 'poc')):
        if not file.endswith('.go') and not file.endswith('.json'):
            continue
        md5 = _md5(os.path.join(root_path, 'poc', file))
        md5s.append(md5)
    for md5 in [md5 for md5 in data.keys() if md5 not in md5s]:
        del data[md5]
    # --- Regenerate README.md: newest PoCs first.
    readme_md = '## goby poc (共{}个) 最近一次检查时间 {}\n'.format(
        len(data.keys()), time.strftime("%Y-%m-%d %H:%M:%S"))
    readme_md += '### 收集记录\n| 文件名称 | 收录时间 |\n| :----| :---- |\n'
    _data = sorted(data.values(), key=lambda x: x['up_time'], reverse=True)
    for item in _data:
        readme_md += '| [{}]({}) | {} |\n'.format(parse(item['name'], 50),
                                                  item['from'], item['up_time'])
    with open('README.md', 'w', encoding='utf8') as f:
        f.write(readme_md)
    # --- Persist the updated history.
    with open(data_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)