Skip to content

Commit

Permalink
Change Exception types
Browse files Browse the repository at this point in the history
  • Loading branch information
LindseyDurst committed Jun 1, 2023
1 parent 4e3fc74 commit b1a4f8f
Showing 1 changed file with 32 additions and 31 deletions.
63 changes: 32 additions & 31 deletions serveradmin/nessus/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import logging
import requests
import certifi
from ipaddress import IPv4Address, IPv4Network, ip_address, ip_network
from ipaddress import IPv4Address, IPv4Network, ip_address, ip_network, AddressValueError

ca_certificates = certifi.where()

Expand Down Expand Up @@ -132,26 +132,24 @@ def login(self):
if "Invalid Credentials" in response.text:
self.logger.error('Invalid credentials provided! Cannot authenticate to Nessus.')
raise Exception('[FAIL] Invalid credentials provided! Cannot authenticate to Nessus.')
elif response.status_code != 200:
if response.status_code != 200:
self.logger.error('Couldn\'t authenticate! Error returned by Nessus: %s' % (json.loads(response.text)['error']))
raise Exception('[FAIL] Couldn\'t authenticate! Error returned by Nessus: %s' % (json.loads(response.text)['error']))
else:
self.logger.info('Logged in to Nessus using password authentication and X-Api-Token - %s' % (self.api_token))
return json.loads(response.text)['token']
self.logger.info('Logged in to Nessus using password authentication and X-Api-Token - %s' % (self.api_token))
return json.loads(response.text)['token']

def get_api_token(self) -> None:
"""Refresh X-Api-Token value."""
response = self.request('/nessus6.js?v=1642551183681', method='get')
offset = response.text.index('return g(a,[{key:"getApiToken",value:function(){')
token = re.findall(r'return"(.*?)"\}\}', response.text[offset:offset + 100])

if token[0]:
self.api_token = token[0]
self.session.headers['X-Api-Token'] = self.api_token
self.logger.info('Got new X-Api-Token from Nessus - %s' % (self.api_token))
else:
if not token[0]:
self.logger.error('Could not get new X-Api-Token from Nessus')
raise Exception('Could not get new X-Api-Token from Nessus')
self.api_token = token[0]
self.session.headers['X-Api-Token'] = self.api_token
self.logger.info('Got new X-Api-Token from Nessus - %s' % (self.api_token))

def request(self, url, data=None, method='POST', download=False, json_output=False):
"""
Expand All @@ -175,7 +173,7 @@ def request(self, url, data=None, method='POST', download=False, json_output=Fal
try:
response = getattr(self.session, method)(url, data=data, verify=ca_certificates)
break
except Exception as e:
except requests.RequestException as e:
self.logger.error("[!] [CONNECTION ERROR] - Run into connection issue: %s" % (e))
self.logger.error("[!] Retrying in 10 seconds")
time.sleep(10)
Expand All @@ -191,7 +189,7 @@ def request(self, url, data=None, method='POST', download=False, json_output=Fal
continue
self.login()
self.logger.info('Session token refreshed')
except Exception as e:
except requests.RequestException as e:
self.logger.error('Could not refresh session token. Reason: %s' % (str(e)))
else:
success = True
Expand Down Expand Up @@ -255,29 +253,32 @@ def check_if_running(self, new_targets):
scan_ids = set()
if not running_scans['scans']:
return []
else:
for scan in running_scans['scans']:
existing_targets = self.get_scan_targets(scan['scan_id']).split(',')
existing_targets = [ element.strip() for element in existing_targets ]
for existing_target in existing_targets:
ip = None
network = None
for new_target in new_targets:
for scan in running_scans['scans']:
existing_targets = self.get_scan_targets(scan['scan_id']).split(',')
existing_targets = [ element.strip() for element in existing_targets ]
for existing_target in existing_targets:
ip = None
network = None
for new_target in new_targets:
try:
ip = IPv4Address(existing_target)
except AddressValueError:
network = IPv4Network(existing_target)

if ip and ip_address(new_target) and ip == new_target:
scan_ids.add(str(scan['scan_id']))
elif network and ip_address(new_target) and new_target in network:
scan_ids.add(str(scan['scan_id']))
else:
try:
ip = IPv4Address(existing_target)
except Exception:
network = IPv4Network(existing_target)

if ip and ip_address(new_target) and ip == new_target:
scan_ids.add(str(scan['scan_id']))
elif network and ip_address(new_target) and new_target in network:
scan_ids.add(str(scan['scan_id']))
elif network and ip_network(new_target) and network.overlaps(new_target):
scan_ids.add(str(scan['scan_id']))
if network and ip_network(new_target) and network.overlaps(new_target):
scan_ids.add(str(scan['scan_id']))
except TypeError:
continue
scan_ids = list(scan_ids)
return scan_ids

def stop_scan(self, scan_id):
def stop_scan(self, scan_id)
"""
Stop scan.
Expand Down

0 comments on commit b1a4f8f

Please sign in to comment.