From 401262f5d2a4c86efb6771633d7dfbc06bea5ebc Mon Sep 17 00:00:00 2001 From: Alona Kotenieva Date: Wed, 31 May 2023 16:21:11 +0200 Subject: [PATCH] Change Exception types --- serveradmin/nessus/models.py | 61 ++++++++++--------- .../nessus/templates/nessus/nessus.html | 8 +-- serveradmin/nessus/urls.py | 2 +- serveradmin/nessus/views.py | 39 ++++++------ 4 files changed, 57 insertions(+), 53 deletions(-) diff --git a/serveradmin/nessus/models.py b/serveradmin/nessus/models.py index 4a69cedb..22450e50 100644 --- a/serveradmin/nessus/models.py +++ b/serveradmin/nessus/models.py @@ -10,7 +10,7 @@ import logging import requests import certifi -from ipaddress import IPv4Address, IPv4Network, ip_address, ip_network +from ipaddress import IPv4Address, IPv4Network, ip_address, ip_network, AddressValueError ca_certificates = certifi.where() @@ -132,12 +132,11 @@ def login(self): if "Invalid Credentials" in response.text: self.logger.error('Invalid credentials provided! Cannot authenticate to Nessus.') raise Exception('[FAIL] Invalid credentials provided! Cannot authenticate to Nessus.') - elif response.status_code != 200: + if response.status_code != 200: self.logger.error('Couldn\'t authenticate! Error returned by Nessus: %s' % (json.loads(response.text)['error'])) raise Exception('[FAIL] Couldn\'t authenticate! Error returned by Nessus: %s' % (json.loads(response.text)['error'])) - else: - self.logger.info('Logged in to Nessus using password authentication and X-Api-Token - %s' % (self.api_token)) - return json.loads(response.text)['token'] + self.logger.info('Logged in to Nessus using password authentication and X-Api-Token - %s' % (self.api_token)) + return json.loads(response.text)['token'] def get_api_token(self) -> None: """Refresh X-Api-Token value.""" @@ -145,13 +144,12 @@ def get_api_token(self) -> None: offset = response.text.index('return g(a,[{key:"getApiToken",value:function(){') token = re.findall(r'return"(.*?)"\}\}', response.text[offset:offset + 100]) - if token[0]: - self.api_token = token[0] - self.session.headers['X-Api-Token'] = self.api_token - self.logger.info('Got new X-Api-Token from Nessus - %s' % (self.api_token)) - else: + if not token[0]: self.logger.error('Could not get new X-Api-Token from Nessus') raise Exception('Could not get new X-Api-Token from Nessus') + self.api_token = token[0] + self.session.headers['X-Api-Token'] = self.api_token + self.logger.info('Got new X-Api-Token from Nessus - %s' % (self.api_token)) def request(self, url, data=None, method='POST', download=False, json_output=False): """ @@ -175,7 +173,7 @@ def request(self, url, data=None, method='POST', download=False, json_output=Fal try: response = getattr(self.session, method)(url, data=data, verify=ca_certificates) break - except Exception as e: + except requests.RequestException as e: self.logger.error("[!] [CONNECTION ERROR] - Run into connection issue: %s" % (e)) self.logger.error("[!] Retrying in 10 seconds") time.sleep(10) @@ -191,7 +189,7 @@ def request(self, url, data=None, method='POST', download=False, json_output=Fal continue self.login() self.logger.info('Session token refreshed') - except Exception as e: + except requests.RequestException as e: self.logger.error('Could not refresh session token. Reason: %s' % (str(e))) else: success = True @@ -255,25 +253,28 @@ def check_if_running(self, new_targets): scan_ids = set() if not running_scans['scans']: return [] - else: - for scan in running_scans['scans']: - existing_targets = self.get_scan_targets(scan['scan_id']).split(',') - existing_targets = [ element.strip() for element in existing_targets ] - for existing_target in existing_targets: - ip = None - network = None - for new_target in new_targets: + for scan in running_scans['scans']: + existing_targets = self.get_scan_targets(scan['scan_id']).split(',') + existing_targets = [ element.strip() for element in existing_targets ] + for existing_target in existing_targets: + ip = None + network = None + for new_target in new_targets: + try: + ip = IPv4Address(existing_target) + except AddressValueError: + network = IPv4Network(existing_target) + + if ip and ip_address(new_target) and ip == new_target: + scan_ids.add(str(scan['scan_id'])) + elif network and ip_address(new_target) and new_target in network: + scan_ids.add(str(scan['scan_id'])) + else: try: - ip = IPv4Address(existing_target) - except Exception: - network = IPv4Network(existing_target) - - if ip and ip_address(new_target) and ip == new_target: - scan_ids.add(str(scan['scan_id'])) - elif network and ip_address(new_target) and new_target in network: - scan_ids.add(str(scan['scan_id'])) - elif network and ip_network(new_target) and network.overlaps(new_target): - scan_ids.add(str(scan['scan_id'])) + if network and ip_network(new_target) and network.overlaps(new_target): + scan_ids.add(str(scan['scan_id'])) + except TypeError: + continue scan_ids = list(scan_ids) return scan_ids diff --git a/serveradmin/nessus/templates/nessus/nessus.html b/serveradmin/nessus/templates/nessus/nessus.html index 39abbe91..addad8b9 100644 --- a/serveradmin/nessus/templates/nessus/nessus.html +++ b/serveradmin/nessus/templates/nessus/nessus.html @@ -1,14 +1,14 @@ {% extends "base.html" %} {% block title %} - Scan configuration + Nessus Scan configuration {% endblock %} {% block content %}
-

Scan configuration

+

Nessus Scan configuration

@@ -34,11 +34,11 @@

Scan configuration

- +
- +
diff --git a/serveradmin/nessus/urls.py b/serveradmin/nessus/urls.py index e65abd87..4435ad47 100644 --- a/serveradmin/nessus/urls.py +++ b/serveradmin/nessus/urls.py @@ -1,6 +1,6 @@ """Serveradmin - Nessus Integration -Copyright (c) 2020 InnoGames GmbH +Copyright (c) 2023 InnoGames GmbH """ from django.urls import path diff --git a/serveradmin/nessus/views.py b/serveradmin/nessus/views.py index 8280ac21..0d53377a 100644 --- a/serveradmin/nessus/views.py +++ b/serveradmin/nessus/views.py @@ -26,38 +26,41 @@ def nessus_config(request): :return: """ - hostnames = [h for h in request.GET.get('hosts', '').split(', ')] - object_ids = [o.strip() for o in request.GET.getlist('object_id', []) if o] + object_ids = request.GET.getlist('object_id', []) + email = request.user.email - if len(hostnames) == 0 and len(object_ids) == 0: - return HttpResponseBadRequest('No hostname or object_id provided') - - servers = {s['hostname']: s for s in - Query({'hostname': Any(*hostnames)}, None)} - servers.update({s['hostname']: s for s in - Query({'object_id': Any(*object_ids)}, None)}) - - if request.GET.get('action') == 'Submit': - user_email = request.GET['email'] + if request.GET.get('action') == 'Start Scan': + email = request.GET['email'] scan_type = request.GET['type'] + hostnames = request.GET['hosts'].replace(', ', ' ').replace(',',' ').split(' ') try: nessus = NessusAPI(username=settings.NESSUS_USER, password=settings.NESSUS_PASSWORD, url=settings.NESSUS_URL) policy_id = settings.NESSUS_POLICIES[scan_type] uuid = settings.NESSUS_UUID folder_id = settings.NESSUS_FOLDER - ips = [ s['intern_ip'] for s in Query({'hostname': Any(*hostnames)}, None) ] + ips = [ s['intern_ip'] for s in Query({'hostname': Any(*hostnames)}, ['intern_ip']) ] scan_ids = nessus.check_if_running(ips) if not scan_ids: try: - nessus.create_scan(scan_name=', '.join(hostnames), uuid=uuid, folder_id=folder_id, target=ips, policy_id=policy_id, receiver=user_email) - messages.info(request, str('Scan started.')) + nessus.create_scan(scan_name=', '.join(hostnames), uuid=uuid, folder_id=folder_id, target=ips, policy_id=policy_id, receiver=email) + messages.info(request, 'Scan started.') except Exception as error: - messages.error(request, str('Scan could not be started. %s' % (error))) + messages.error(request, 'Scan could not be started. %s' % (error)) else: - messages.error(request, str('Scan for at least one of the targets is already running with scan id: %s.' % (', '.join(scan_ids)))) + messages.error(request, 'Scan for at least one of the targets is already running with scan id: %s.' % (', '.join(scan_ids))) except IOError as error: return HttpResponseServerError("Communication with nessus failed.") + else: + if len(object_ids) == 0: + return HttpResponseBadRequest('No hostname or object_id provided') + + servers = Query({'object_id': Any(*object_ids)}, ['hostname', 'intern_ip']) + hostnames = [ s['hostname'] for s in servers ] + for server in servers: + if not server['intern_ip']: + return HttpResponseBadRequest('Submitted object does not have intern_ip') return TemplateResponse(request, 'nessus/nessus.html', { - 'hostnames': servers.keys(), + 'hostnames': hostnames, + 'email': email })