-
Notifications
You must be signed in to change notification settings - Fork 38
/
burp_csp_bypass.py
318 lines (281 loc) · 12.1 KB
/
burp_csp_bypass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
"""
@author: moloch
This is a Burp plugin to parse Content-Security-Policy headers and detect
weaknesses and possibly bypasses in the policy.
"""
# pylint: disable=E0602,C0103,W0621,R0903,R0201
from httplib import HTTPResponse
from StringIO import StringIO
from urlparse import urlparse
from burp import IBurpExtender, IScannerCheck
class HttpDummySocket(object):
"""
A dummy socket object so we can use httplib to parse the response bytearray
"""
def __init__(self, byteResponse):
self._file = StringIO(byteResponse)
def makefile(self, *args, **kwargs):
""" API compatability with `socket' """
return self._file
class ContentSecurityPolicyScan(IScannerCheck):
""" Implements the actual passive scan """
def __init__(self, callbacks):
"""
WARNING: Only one IScannerCheck object is ever created, because of this
you effectively cannot use Python's `self' have to pass around any
variables as method args if you want access to them, fml.
"""
self._helpers = callbacks.getHelpers()
# Checks must return a list of IScanIssue objects
self._checks = [
self.deprecatedHeaderCheck,
self.reportOnlyHeaderCheck,
self.unsafeContentSourceCheck,
self.wildcardContentSourceCheck,
self.wildcardSubdomainContentSourceCheck,
self.insecureContentSourceCheck,
self.nonceSourceCheck,
self.missingDirectiveCheck,
self.weakDefaultSourceCheck,
self.knownBypassCheck,
]
def _getUrl(self, burpHttpReqResp):
"""
Uses the Burp helper APIs to get the URL from the HttpReqResp object
"""
return self._helpers.analyzeRequest(burpHttpReqResp).getUrl()
def doPassiveScan(self, burpHttpReqResp):
"""
This is a callback method for Burp, its called for each HTTP req/resp.
Returns a list of IScanIssue(s)
"""
if len(burpHttpReqResp.getResponse()):
return self.proccessHttpResponse(burpHttpReqResp)
else:
return []
def consolidateDuplicateIssues(self, existingIssue, newIssue):
"""
This is a callback method for Burp, and is used to cleanup duplicate
findings.
@return An indication of which issue(s) should be reported in the main
Scanner results.
<code>-1</code> to report the existing issue only
<code>0</code> to report both issues
<code>1</code> to report the new issue only
"""
if existingIssue.getIssueName() == newIssue.getIssueName():
return -1
else:
return 0
def proccessHttpResponse(self, burpHttpReqResp):
""" Processes only the HTTP repsonses with a CSP header """
byteResponse = burpHttpReqResp.getResponse()
httpSocket = HttpDummySocket(bytearray(byteResponse))
response = HTTPResponse(httpSocket)
response.begin()
issues = []
for header in response.getheaders():
if header[0].lower() in ContentSecurityPolicy.HEADERS:
findings = self.parseContentSecurityPolicy(header, burpHttpReqResp)
issues.extend(findings)
return issues
def parseContentSecurityPolicy(self, cspHeader, burpHttpReqResp):
""" Parses the CSP response header and searches for issues """
csp = ContentSecurityPolicy(cspHeader[0], cspHeader[1])
issues = []
for check in self._checks:
issues.extend(check(csp, burpHttpReqResp))
return issues
def deprecatedHeaderCheck(self, csp, burpHttpReqResp):
"""
Checks for the use of a deprecated header such as `X-WebKit-CSP'
"""
issues = []
if csp.is_deprecated_header():
deprecatedHeader = DeprecatedHeader(
httpService=burpHttpReqResp.getHttpService(),
url=self._getUrl(burpHttpReqResp),
httpMessages=burpHttpReqResp,
severity="Medium",
confidence="Certain")
issues.append(deprecatedHeader)
return issues
def reportOnlyHeaderCheck(self, csp, burpHttpReqResp):
"""
Checks for the use of a report-only CSP header
"""
issues = []
if csp.is_report_only_mode():
reportOnly = ReportOnlyHeader(
httpService=burpHttpReqResp.getHttpService(),
url=self._getUrl(burpHttpReqResp),
httpMessages=burpHttpReqResp,
severity="High",
confidence="Certain")
issues.append(reportOnly)
return issues
def unsafeContentSourceCheck(self, csp, burpHttpReqResp):
""" Checks the current CSP header for unsafe content sources """
issues = []
for directive in [SCRIPT_SRC, STYLE_SRC]:
if UNSAFE_EVAL in csp[directive] or UNSAFE_INLINE in csp[directive]:
unsafeContent = UnsafeContentSource(
httpService=burpHttpReqResp.getHttpService(),
url=self._getUrl(burpHttpReqResp),
httpMessages=burpHttpReqResp,
severity="High",
confidence="Certain",
directive=directive)
issues.append(unsafeContent)
return issues
def wildcardContentSourceCheck(self, csp, burpHttpReqResp):
"""
Check content sources for wildcards '*' note that wilcard subdomains
are checked by `wildcardSubdomainContentSourceCheck'
"""
issues = []
for directive, sources in csp.iteritems():
if sources is None:
continue # Skip unspecified directives in NO_FALLBACK
if any(src == "*" for src in sources):
wildcardContent = WildcardContentSource(
httpService=burpHttpReqResp.getHttpService(),
url=self._getUrl(burpHttpReqResp),
httpMessages=burpHttpReqResp,
severity="Medium",
confidence="Certain",
directive=directive)
issues.append(wildcardContent)
return issues
def wildcardSubdomainContentSourceCheck(self, csp, burpHttpReqResp):
""" Check content sources for wildcards subdomains '*.foo.com' """
issues = []
for directive, sources in csp.iteritems():
if sources is None:
continue
# This check is a little hacky but should work well
# the shortest subdomain string should be like *.a.bc
if any("*" in src and 5 <= len(src) for src in sources):
wilcardSubdomain = WildcardSubdomainContentSource(
httpService=burpHttpReqResp.getHttpService(),
url=self._getUrl(burpHttpReqResp),
httpMessages=burpHttpReqResp,
severity="Low",
confidence="Certain",
directive=directive)
issues.append(wilcardSubdomain)
return issues
def nonceSourceCheck(self, csp, burpHttpReqResp):
"""
Check content sources for wildcards '*' note that wilcard subdomains
are checked by `wildcardSubdomainContentSourceCheck'
"""
issues = []
for directive, sources in csp.iteritems():
if sources is None:
continue
if any(src.startswith("'nonce-") for src in sources):
nonceContent = NonceContentSource(
httpService=burpHttpReqResp.getHttpService(),
url=self._getUrl(burpHttpReqResp),
httpMessages=burpHttpReqResp,
severity="Informational",
confidence="Certain",
directive=directive)
issues.append(nonceContent)
return issues
def insecureContentSourceCheck(self, csp, burpHttpReqResp):
""" Check content sources that allow insecure network protocols """
issues = []
for directive, sources in csp.iteritems():
if sources is None:
continue
for src in sources:
if src == HTTP or urlparse(src).scheme in ["http", "ws"]:
insecureContent = InsecureContentDirective(
httpService=burpHttpReqResp.getHttpService(),
url=self._getUrl(burpHttpReqResp),
httpMessages=burpHttpReqResp,
severity="High",
confidence="Certain",
directive=directive)
issues.append(insecureContent)
return issues
def missingDirectiveCheck(self, csp, burpHttpReqResp):
"""
Check for missing directives that do not inherit from `default-src'
"""
issues = []
for directive in ContentSecurityPolicy.NO_FALLBACK:
if directive not in csp:
missingDirective = MissingDirective(
httpService=burpHttpReqResp.getHttpService(),
url=self._getUrl(burpHttpReqResp),
httpMessages=burpHttpReqResp,
severity="Medium",
confidence="Certain",
directive=directive)
issues.append(missingDirective)
return issues
def weakDefaultSourceCheck(self, csp, burpHttpReqResp):
"""
Any `default-src' that is not 'none'/'self'/https: is considered weak
"""
issues = []
for contentSource in csp[DEFAULT_SRC]:
if contentSource not in [SELF, NONE, HTTPS]:
weakDefault = WeakDefaultSource(
httpService=burpHttpReqResp.getHttpService(),
url=self._getUrl(burpHttpReqResp),
httpMessages=burpHttpReqResp,
severity="Medium",
confidence="Certain")
issues.append(weakDefault)
break
return issues
def knownBypassCheck(self, csp, burpHttpReqResp):
"""
Parses the CSP for known bypasses, this check is a little more
complicated, and calls into other subroutines.
"""
issues = []
for directive, knownBypasses in CSP_KNOWN_BYPASSES.iteritems():
bypasses = self._bypassCheckDirective(csp, directive, knownBypasses)
for bypass in bypasses:
bypassIssue = self._createBypassIssue(directive, bypass, burpHttpReqResp)
issues.append(bypassIssue)
return issues
def _createBypassIssue(self, directive, bypass, burpHttpReqResp):
""" Creates the KnownCSPBypass issue object """
knownBypass = KnownCSPBypass(
httpService=burpHttpReqResp.getHttpService(),
url=self._getUrl(burpHttpReqResp),
httpMessages=burpHttpReqResp,
severity="High",
confidence="Certain",
directive=directive,
bypass=bypass)
return knownBypass
def _bypassCheckDirective(self, csp, directive, knownBypasses):
"""
Check an individual directive (e.g. `script-src') to see if it contains
any domains that host known CSP bypasses.
"""
bypasses = []
for src in csp[directive]:
if src.startswith("'") or src in [HTTP, HTTPS, DATA, BLOB]:
continue # We only care about domains
# Iterate over all bypasses and check if `src' allows loading
# content from `domain' if so, we have a bypass!
for domain, payload in knownBypasses:
if csp_match_domains(src, domain):
bypasses.append((domain, payload,))
return bypasses
class BurpExtender(IBurpExtender):
""" Burp extension object """
NAME = "CSP Bypass"
def registerExtenderCallbacks(self, callbacks):
""" Entrypoint and setup """
callbacks.setExtensionName(self.NAME)
callbacks.registerScannerCheck(ContentSecurityPolicyScan(callbacks))
print '[*] CSP-Bypass extension loaded successfully.'