Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add responder functionality to TheHive4Py #108

Closed
wants to merge 10 commits into from
86 changes: 84 additions & 2 deletions thehive4py/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def update_case(self, case, fields=[]):
# Choose which attributes to send
update_keys = [
'title', 'description', 'severity', 'startDate', 'owner', 'flag', 'tlp', 'tags', 'status', 'resolutionStatus',
'impactStatus', 'summary', 'endDate', 'metrics', 'customFields'
'impactStatus', 'summary', 'endDate', 'metrics', 'customFields', 'artifacts'
]
data = {k: v for k, v in case.__dict__.items() if (len(fields) > 0 and k in fields) or (len(fields) == 0 and k in update_keys)}
try:
Expand Down Expand Up @@ -415,7 +415,7 @@ def update_alert(self, alert_id, alert, fields=[]):
req = self.url + "/api/alert/{}".format(alert_id)

# update only the alert attributes that are not read-only
update_keys = ['tlp', 'severity', 'tags', 'caseTemplate', 'title', 'description']
update_keys = ['tlp', 'severity', 'tags', 'caseTemplate', 'title', 'description', 'customFields', 'artifacts']

data = {k: v for k, v in alert.__dict__.items() if
(len(fields) > 0 and k in fields) or (len(fields) == 0 and k in update_keys)}
Expand Down Expand Up @@ -489,6 +489,88 @@ def run_analyzer(self, cortex_id, artifact_id, analyzer_id):
except requests.exceptions.RequestException as e:
raise TheHiveException("Analyzer run error: {}".format(e))

def run_responder(self, object_id, object_type, responder_name=None, responder_id=None):

"""
Run a responder by name or id. Must have either name or ID provided.
:param object_id: identifier of the object to run the responder on
:param object_type: type of the object the responder is running on
:param responder_name: Name of the responder to run (optional)
:param responder_id: identifier of the Cortex responder (optional)
:rtype: json
"""
if not responder_id and not responder_name:
raise TheHiveException("Responder run error: No responder ID or Name provided")

data = {
"objectType": object_type,
"objectId": object_id
}

if responder_name:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of all this if statement?

if not self.verify_responder_name(responder_name):
resp_id = self.search_responder_by_name(responder_name)
if resp_id:
data['responderId'] = resp_id
else:
data['responderName'] = responder_name
else:
data['responderId'] = responder_id

req = self.url + "/api/connector/cortex/action"

try:
return requests.post(req, headers={'Content-Type': 'application/json'}, json=data, proxies=self.proxies,
auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise TheHiveException("Responder run error: {}".format(e))

def verify_responder_name(self, responder_name):
"""
Verify the responder name.
:param responder_name: Cortex responder name
:return: bool
"""
data = {
"query": {
"name": responder_name
}
}

req = self.url + "/api/connector/cortex/responder/_search"
try:
responder_list = requests.post(req, headers={'Content-Type': 'application/json'}, json=data,
proxies=self.proxies,
auth=self.auth, verify=self.cert).json()
if len(responder_list) == 1:
return True
return False
except requests.exceptions.RequestException as e:
raise TheHiveException("Responder verify error: {}".format(e))

def search_responder_by_name(self, responder_name):
"""
Find a responder by short name (without version)
:param responder_name: Cortex responder name
:return: str
"""
data = {
"query": {
"_string": responder_name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _string is no longer recommended with ES6

}
}

req = self.url + "/api/connector/cortex/responder/_search"
try:
responder_list = requests.post(req, headers={'Content-Type': 'application/json'}, json=data,
proxies=self.proxies,
auth=self.auth, verify=self.cert).json()
if len(responder_list) == 1:
return responder_list[0].get('id')
return None
except requests.exceptions.RequestException as e:
raise TheHiveException("Responder verify error: {}".format(e))

def find_tasks(self, **attributes):
"""
:return: list of Tasks
Expand Down