

Change Exception types
LindseyDurst authored and kofrezo committed Sep 29, 2023
1 parent 66e3693 commit 8685e74
Showing 4 changed files with 57 additions and 53 deletions.
61 changes: 31 additions & 30 deletions serveradmin/nessus/models.py
Expand Up @@ -10,7 +10,7 @@
import logging
import requests
import certifi
from ipaddress import IPv4Address, IPv4Network, ip_address, ip_network
from ipaddress import IPv4Address, IPv4Network, ip_address, ip_network, AddressValueError

ca_certificates = certifi.where()

Expand Down Expand Up @@ -132,26 +132,24 @@ def login(self):
if "Invalid Credentials" in response.text:
self.logger.error('Invalid credentials provided! Cannot authenticate to Nessus.')
raise Exception('[FAIL] Invalid credentials provided! Cannot authenticate to Nessus.')
elif response.status_code != 200:
if response.status_code != 200:
self.logger.error('Couldn\'t authenticate! Error returned by Nessus: %s' % (json.loads(response.text)['error']))
raise Exception('[FAIL] Couldn\'t authenticate! Error returned by Nessus: %s' % (json.loads(response.text)['error']))
else:
self.logger.info('Logged in to Nessus using password authentication and X-Api-Token - %s' % (self.api_token))
return json.loads(response.text)['token']
self.logger.info('Logged in to Nessus using password authentication and X-Api-Token - %s' % (self.api_token))
return json.loads(response.text)['token']

def get_api_token(self) -> None:
"""Refresh X-Api-Token value."""
response = self.request('/nessus6.js?v=1642551183681', method='get')
offset = response.text.index('return g(a,[{key:"getApiToken",value:function(){')
token = re.findall(r'return"(.*?)"\}\}', response.text[offset:offset + 100])

if token[0]:
self.api_token = token[0]
self.session.headers['X-Api-Token'] = self.api_token
self.logger.info('Got new X-Api-Token from Nessus - %s' % (self.api_token))
else:
if not token[0]:
self.logger.error('Could not get new X-Api-Token from Nessus')
raise Exception('Could not get new X-Api-Token from Nessus')
self.api_token = token[0]
self.session.headers['X-Api-Token'] = self.api_token
self.logger.info('Got new X-Api-Token from Nessus - %s' % (self.api_token))

def request(self, url, data=None, method='POST', download=False, json_output=False):
"""
Expand All @@ -175,7 +173,7 @@ def request(self, url, data=None, method='POST', download=False, json_output=Fal
try:
response = getattr(self.session, method)(url, data=data, verify=ca_certificates)
break
except Exception as e:
except requests.RequestException as e:
self.logger.error("[!] [CONNECTION ERROR] - Run into connection issue: %s" % (e))
self.logger.error("[!] Retrying in 10 seconds")
time.sleep(10)
Expand All @@ -191,7 +189,7 @@ def request(self, url, data=None, method='POST', download=False, json_output=Fal
continue
self.login()
self.logger.info('Session token refreshed')
except Exception as e:
except requests.RequestException as e:
self.logger.error('Could not refresh session token. Reason: %s' % (str(e)))
else:
success = True
Expand Down Expand Up @@ -255,25 +253,28 @@ def check_if_running(self, new_targets):
scan_ids = set()
if not running_scans['scans']:
return []
else:
for scan in running_scans['scans']:
existing_targets = self.get_scan_targets(scan['scan_id']).split(',')
existing_targets = [ element.strip() for element in existing_targets ]
for existing_target in existing_targets:
ip = None
network = None
for new_target in new_targets:
for scan in running_scans['scans']:
existing_targets = self.get_scan_targets(scan['scan_id']).split(',')
existing_targets = [ element.strip() for element in existing_targets ]
for existing_target in existing_targets:
ip = None
network = None
for new_target in new_targets:
try:
ip = IPv4Address(existing_target)
except AddressValueError:
network = IPv4Network(existing_target)

if ip and ip_address(new_target) and ip == new_target:
scan_ids.add(str(scan['scan_id']))
elif network and ip_address(new_target) and new_target in network:
scan_ids.add(str(scan['scan_id']))
else:
try:
ip = IPv4Address(existing_target)
except Exception:
network = IPv4Network(existing_target)

if ip and ip_address(new_target) and ip == new_target:
scan_ids.add(str(scan['scan_id']))
elif network and ip_address(new_target) and new_target in network:
scan_ids.add(str(scan['scan_id']))
elif network and ip_network(new_target) and network.overlaps(new_target):
scan_ids.add(str(scan['scan_id']))
if network and ip_network(new_target) and network.overlaps(new_target):
scan_ids.add(str(scan['scan_id']))
except TypeError:
continue
scan_ids = list(scan_ids)
return scan_ids

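Review bot: The narrowed `except requests.RequestException as e:` after `self.login()` in the session-refresh hunk no longer matches what `login()` raises — both failure paths in the first hunk are plain `raise Exception('[FAIL] ...')` — so an invalid-credentials or non-200 login during a retry now propagates out of `request()` instead of being logged as 'Could not refresh session token'. Give the module one error type and use it on both sides, e.g. `class NessusError(Exception): ...`, `raise NessusError('[FAIL] Invalid credentials provided! Cannot authenticate to Nessus.')` in `login()` and `except (requests.RequestException, NessusError) as e:` here. In `get_api_token`, the guard `if not token[0]:` still indexes before testing, so an empty `re.findall` result raises IndexError rather than the intended 'Could not get new X-Api-Token' error; `if not token or not token[0]:` makes the check match its message. The `request()` body and the `login()` header lie outside these hunks. Both success-path log lines also write the X-Api-Token value to the info log; if that value is treated as an authentication secret in this deployment, consider logging only that a token was obtained.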
8 changes: 4 additions & 4 deletions serveradmin/nessus/templates/nessus/nessus.html
@@ -1,14 +1,14 @@
{% extends "base.html" %}

{% block title %}
Scan configuration
Nessus Scan configuration
{% endblock %}

{% block content %}
<div class="row">
<div class="col-md-1"></div>
<div class="col-md-9">
<h3>Scan configuration</h3>
<h3>Nessus Scan configuration</h3>
</div>
</div>
<div class="row">
Expand All @@ -34,11 +34,11 @@ <h3>Scan configuration</h3>
<div class="form-group row input-controls">
<label for="email" class="col-sm-1 col-form-label">Receiving email:</label>
<div class="col-sm-8">
<input id="email" name="email" type="text" value="user@email.com" class="form-control form-control-sm" />
<input id="email" name="email" type="text" value="{{ email }}" class="form-control form-control-sm" />
</div>
</div>
<div class="form-group row input-controls buttons">
<input class="btn btn-success" name="action" type="submit" value="Submit" action="/nessus" />
<input class="btn-success btn-lg" name="action" type="submit" value="Start Scan" action="/nessus" />
</div>
</form>
</div>
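Review bot: The rewritten submit button `<input class="btn-success btn-lg" name="action" type="submit" value="Start Scan" action="/nessus" />` drops the Bootstrap base class `btn` that the old line had, so `btn-success btn-lg` alone will not render as a button, and it keeps an `action` attribute that is not valid on `<input>` (the per-button override is `formaction`; the real target comes from the enclosing `<form>`, whose opening tag is outside this hunk). Suggested line: `<input class="btn btn-success btn-lg" name="action" type="submit" value="Start Scan" />`. The new `value="{{ email }}"` relies on Django's template auto-escaping for the attribute value, which is fine as long as this template is never rendered with autoescape turned off.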
2 changes: 1 addition & 1 deletion serveradmin/nessus/urls.py
@@ -1,6 +1,6 @@
"""Serveradmin - Nessus Integration
Copyright (c) 2020 InnoGames GmbH
Copyright (c) 2023 InnoGames GmbH
"""

from django.urls import path
39 changes: 21 additions & 18 deletions serveradmin/nessus/views.py
Expand Up @@ -26,38 +26,41 @@ def nessus_config(request):
:return:
"""

hostnames = [h for h in request.GET.get('hosts', '').split(', ')]
object_ids = [o.strip() for o in request.GET.getlist('object_id', []) if o]
object_ids = request.GET.getlist('object_id', [])
email = request.user.email

if len(hostnames) == 0 and len(object_ids) == 0:
return HttpResponseBadRequest('No hostname or object_id provided')

servers = {s['hostname']: s for s in
Query({'hostname': Any(*hostnames)}, None)}
servers.update({s['hostname']: s for s in
Query({'object_id': Any(*object_ids)}, None)})

if request.GET.get('action') == 'Submit':
user_email = request.GET['email']
if request.GET.get('action') == 'Start Scan':
email = request.GET['email']
scan_type = request.GET['type']
hostnames = request.GET['hosts'].replace(', ', ' ').replace(',',' ').split(' ')
try:
nessus = NessusAPI(username=settings.NESSUS_USER, password=settings.NESSUS_PASSWORD, url=settings.NESSUS_URL)
policy_id = settings.NESSUS_POLICIES[scan_type]
uuid = settings.NESSUS_UUID
folder_id = settings.NESSUS_FOLDER
ips = [ s['intern_ip'] for s in Query({'hostname': Any(*hostnames)}, None) ]
ips = [ s['intern_ip'] for s in Query({'hostname': Any(*hostnames)}, ['intern_ip']) ]
scan_ids = nessus.check_if_running(ips)
if not scan_ids:
try:
nessus.create_scan(scan_name=', '.join(hostnames), uuid=uuid, folder_id=folder_id, target=ips, policy_id=policy_id, receiver=user_email)
messages.info(request, str('Scan started.'))
nessus.create_scan(scan_name=', '.join(hostnames), uuid=uuid, folder_id=folder_id, target=ips, policy_id=policy_id, receiver=email)
messages.info(request, 'Scan started.')
except Exception as error:
messages.error(request, str('Scan could not be started. %s' % (error)))
messages.error(request, 'Scan could not be started. %s' % (error))
else:
messages.error(request, str('Scan for at least one of the targets is already running with scan id: %s.' % (', '.join(scan_ids))))
messages.error(request, 'Scan for at least one of the targets is already running with scan id: %s.' % (', '.join(scan_ids)))
except IOError as error:
return HttpResponseServerError("Communication with nessus failed.")
else:
if len(object_ids) == 0:
return HttpResponseBadRequest('No hostname or object_id provided')

servers = Query({'object_id': Any(*object_ids)}, ['hostname', 'intern_ip'])
hostnames = [ s['hostname'] for s in servers ]
for server in servers:
if not server['intern_ip']:
return HttpResponseBadRequest('Submitted object does not have intern_ip')

return TemplateResponse(request, 'nessus/nessus.html', {
'hostnames': servers.keys(),
'hostnames': hostnames,
'email': email
})

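Review bot: `email = request.GET['email']` replaces the logged-in user's address with an unvalidated query parameter that is then passed straight through as `receiver=email` to `create_scan`, so the scan report destination is whatever the client submits rather than necessarily the requesting user; if the report is meant for that user, use `request.user.email` here, or at least run the submitted value through `django.core.validators.validate_email` before using it. The scan is also started from `request.GET.get('action') == 'Start Scan'`, i.e. a state-changing action on a GET request, which Django's CSRF middleware does not cover; reading `request.POST` (with the template form switched to `method="post"`) would be the conventional fix, though the form tag itself is not in this diff. In the `else` branch the message 'No hostname or object_id provided' no longer reflects the check, which now only tests `object_ids`; `'No object_id provided'` would be accurate. `nessus_config` begins before the hunk, and the `except IOError as error` path discards the caught error without logging it, which makes the generic 'Communication with nessus failed.' response hard to diagnose.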