From 4e6d924eee6d3fafdd3c868f5e0425db30d27ed6 Mon Sep 17 00:00:00 2001 From: Ofir Date: Tue, 26 Mar 2024 18:04:56 +0200 Subject: [PATCH 1/6] add alerts history with filters --- CHANGES | 4 + README.md | 14 +- examples/get_alerts_history.py | 21 +++ intezer_sdk/alerts.py | 221 ++++++++++++++++++++++++++++++++ intezer_sdk/alerts_results.py | 27 ++++ intezer_sdk/analyses_results.py | 63 +-------- intezer_sdk/history_results.py | 79 ++++++++++++ tests/unit/test_results.py | 21 +++ 8 files changed, 390 insertions(+), 60 deletions(-) create mode 100644 examples/get_alerts_history.py create mode 100644 intezer_sdk/alerts_results.py create mode 100644 intezer_sdk/history_results.py diff --git a/CHANGES b/CHANGES index 65903dd..2e40803 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,7 @@ +1.20 +_______ +- Support getting alerts history with filters from server. + 1.19.17 _______ - Add sandbox_machine_type to FileAnalysis diff --git a/README.md b/README.md index 712270b..e41cf70 100644 --- a/README.md +++ b/README.md @@ -242,13 +242,25 @@ for analyse in history_results: print(analyse) ``` -### Get alert by id +### Alerts +#### Get alert by id ```python alert = Alert.from_id(alert_id=alert_id, fetch_scans=False, wait=False) ``` +#### Alerts History + +```python +history_results = query_file_analyses_history( + api = , + filters** +) +for analyse in history_results: + print(analyse) +``` + ## Code examples You can find more code examples under [analyze-python-sdk/examples/](https://github.com/intezer/analyze-python-sdk/tree/master/examples) directory diff --git a/examples/get_alerts_history.py b/examples/get_alerts_history.py new file mode 100644 index 0000000..9e782d0 --- /dev/null +++ b/examples/get_alerts_history.py @@ -0,0 +1,21 @@ +import datetime +import sys +from pprint import pprint + +from intezer_sdk.alerts import query_alerts_history +from intezer_sdk import api + + +def alerts_history_example(start_date: datetime.datetime, end_date: datetime.datetime): + results = query_alerts_history(start_time=start_date, end_time=end_date) + for result in results: + pprint(result) + + +def main(args): + api.set_global_api('') + alerts_history_example(**args) + + +if __name__ == '__main__': + main(sys.argv[1:]) \ No newline at end of file diff --git a/intezer_sdk/alerts.py b/intezer_sdk/alerts.py index d6a6cef..8d3716c 100644 --- a/intezer_sdk/alerts.py +++ b/intezer_sdk/alerts.py @@ -3,6 +3,7 @@ import time import datetime from io import BytesIO +from typing import Any from typing import BinaryIO import requests @@ -14,6 +15,7 @@ from typing import Optional from intezer_sdk._api import IntezerApi +from intezer_sdk.alerts_results import AlertsHistoryResult from intezer_sdk.analysis import FileAnalysis from intezer_sdk.analysis import UrlAnalysis from intezer_sdk.endpoint_analysis import EndpointAnalysis @@ -23,6 +25,10 @@ from intezer_sdk import errors from intezer_sdk import consts +DEFAULT_LIMIT = 100 +DEFAULT_OFFSET = 0 +ALERTS_SEARCH_REQUEST = '/alerts/search' + def get_alerts_by_alert_ids(alert_ids: List[str], environments: List[str] = None, @@ -40,6 +46,221 @@ def get_alerts_by_alert_ids(alert_ids: List[str], return result['alerts_count'], result['alerts'] +def generate_alerts_history_search_filters(*, + start_time: datetime.datetime = None, + end_time: datetime.datetime = None, + environments: List[str] = None, + offset: int = None, + limit: int = None, + sources: List[str] = None, + risk_categories: List[str] = None, + alert_verdicts: List[str] = None, + family_names: List[str] = None, + response_statuses: List[str] = None, + hostnames: List[str] = None, + free_text: str = None, + site_name: str = None, + account_name: str = None, + exclude_alert_ids: List[str] = None, + usernames: List[str] = None, + file_hashes: List[str] = None, + process_commandlines: List[str] = None, + sort_by: List[str] = None, + is_mitigated: bool = None, + email_sender: str = None, + email_recipient: str = None, + email_subject: str = None, + email_cc: str = None, + email_bcc: str = None, + email_message_id: str = None, + email_reported_by: str = None, + device_private_ips: List[str] = None, + device_external_ips: List[str] = None, + device_ids: List[str] = None, + time_filter_type: List[str] = None, + sort_order: str = None) -> Dict[str, Any]: + filters = {} + if start_time: + filters['start_time'] = start_time.timestamp() + if end_time: + filters['end_time'] = end_time.timestamp() + if environments: + filters['environments'] = environments + if offset is not None: + filters['offset'] = offset + if limit: + filters['limit'] = limit + if sources: + filters['sources'] = sources + if risk_categories: + filters['risk_categories'] = risk_categories + if alert_verdicts: + filters['alert_verdicts'] = alert_verdicts + if family_names: + filters['family_names'] = family_names + if response_statuses: + filters['response_statuses'] = response_statuses + if hostnames: + filters['hostnames'] = hostnames + if free_text: + filters['free_text'] = free_text + if site_name: + filters['site_name'] = site_name + if account_name: + filters['account_name'] = account_name + if exclude_alert_ids: + filters['exclude_alert_ids'] = exclude_alert_ids + if usernames: + filters['usernames'] = usernames + if file_hashes: + filters['file_hashes'] = file_hashes + if process_commandlines: + filters['process_commandlines'] = process_commandlines + if sort_by: + filters['sort_by'] = sort_by + if is_mitigated: + filters['is_mitigated'] = is_mitigated + if email_sender: + filters['email_sender'] = email_sender + if email_recipient: + filters['email_recipient'] = email_recipient + if email_subject: + filters['email_subject'] = email_subject + if email_cc: + filters['email_cc'] = email_cc + if email_bcc: + filters['email_bcc'] = email_bcc + if email_message_id: + filters['email_message_id'] = email_message_id + if email_reported_by: + filters['email_reported_by'] = email_reported_by + if device_private_ips: + filters['device_private_ips'] = device_private_ips + if device_external_ips: + filters['device_external_ips'] = device_external_ips + if device_ids: + filters['device_ids'] = device_ids + if time_filter_type: + filters['time_filter_type'] = time_filter_type + if sort_order: + filters['sort_order'] = sort_order + + return filters + + +def query_alerts_history(*, + start_time: datetime.datetime = None, + end_time: datetime.datetime = None, + api: IntezerApiClient = None, + environments: List[str] = None, + offset: int = DEFAULT_OFFSET, + limit: int = DEFAULT_LIMIT, + sources: List[str] = None, + risk_categories: List[str] = None, + alert_verdicts: List[str] = None, + family_names: List[str] = None, + response_statuses: List[str] = None, + hostnames: List[str] = None, + free_text: str = None, + site_name: str = None, + account_name: str = None, + exclude_alert_ids: List[str] = None, + usernames: List[str] = None, + file_hashes: List[str] = None, + process_commandlines: List[str] = None, + sort_by: List[str] = None, + is_mitigated: bool = None, + email_sender: str = None, + email_recipient: str = None, + email_subject: str = None, + email_cc: str = None, + email_bcc: str = None, + email_message_id: str = None, + email_reported_by: str = None, + device_private_ips: List[str] = None, + device_external_ips: List[str] = None, + device_ids: List[str] = None, + time_filter_type: List[str] = None, + sort_order: str = None) -> AlertsHistoryResult: + """ + Query for alerts history with query param. + + :param environments: Query alerts only from these environments. + :param offset: Offset to start querying from - used for pagination. + :param limit: Maximum number of alerts to return - used for pagination. + :param start_time: Query alerts that were created after this timestamp (in UTC). + :param end_time: Query alerts that were created before this timestamp (in UTC). + :param api: Instance of Intezer API for request server. + :param sources: Query alerts only with these sources (Valid source can be taken from AlertSource). + :param risk_categories: Query alerts only with these risk categories. + :param alert_verdicts: Query alerts only with these alert verdicts. + :param family_names: Query alerts only with these family names. + :param response_statuses: Query alerts only with these response statuses. + :param hostnames: Query alerts only with these hostnames. + :param free_text: Query alerts that contain this text in the following fields: family name, hostname, alert verdict. + :param site_name: Query alerts only with this site name. + :param account_name: Query alerts only with this account name. + :param exclude_alert_ids: Query alerts that do not have these alert ids. + :param usernames: Query alerts only with these usernames. + :param file_hashes: Query alerts only with these file hashes. + :param process_commandlines: Query alerts only with these process commandlines. + :param sort_by: Sort alerts only with this sort_by_key value. + :param is_mitigated: Query alerts only with this is_mitigated value. + :param email_sender: Query alerts only with these email sender. + :param email_recipient: Query alerts only with these email recipient. + :param email_subject: Query alerts only with this email subject. + :param email_cc: Sort alerts only with this email cc. + :param email_bcc: Sort alerts only with this email bcc. + :param email_message_id: Sort alerts only with this email message id. + :param email_reported_by: Sort alerts only with this email reported by. + :param device_private_ips: Query alerts only with these private ips. + :param device_external_ips: Query alerts only with these external ips. + :param device_ids: Query alerts only with these device ids. + :param time_filter_type: The time value to filter alerts by. + :param sort_order: The order to sort the alerts by. + + :return: Alert query result from server as Results iterator. + """ + api = api or get_global_api() + api.assert_any_on_premise() + filters = generate_alerts_history_search_filters( + start_time=start_time, + end_time=end_time, + environments=environments, + offset=offset, + limit=limit, + sources=sources, + risk_categories=risk_categories, + alert_verdicts=alert_verdicts, + family_names=family_names, + response_statuses=response_statuses, + hostnames=hostnames, + free_text=free_text, + site_name=site_name, + account_name=account_name, + exclude_alert_ids=exclude_alert_ids, + usernames=usernames, + file_hashes=file_hashes, + process_commandlines=process_commandlines, + sort_by=sort_by, + is_mitigated=is_mitigated, + email_sender=email_sender, + email_recipient=email_recipient, + email_subject=email_subject, + email_cc=email_cc, + email_bcc=email_bcc, + email_message_id=email_message_id, + email_reported_by=email_reported_by, + device_private_ips=device_private_ips, + device_external_ips=device_external_ips, + device_ids=device_ids, + time_filter_type=time_filter_type, + sort_order=sort_order + ) + + return AlertsHistoryResult(ALERTS_SEARCH_REQUEST, api, filters) + + class Alert: """ The Alert class is used to represent an alert from the Intezer Analyze API. diff --git a/intezer_sdk/alerts_results.py b/intezer_sdk/alerts_results.py new file mode 100644 index 0000000..78f0a00 --- /dev/null +++ b/intezer_sdk/alerts_results.py @@ -0,0 +1,27 @@ +from typing import List +from typing import Dict +from typing import Tuple + +from intezer_sdk.api import IntezerApiClient +from intezer_sdk.api import raise_for_status +from intezer_sdk.history_results import HistoryResult + + +class AlertsHistoryResult(HistoryResult): + def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): + super().__init__(request_url_path, api, filters) + + def _fetch_history(self, url_path: str, data: Dict + ) -> Tuple[int, List]: + """ + Request from server filtered alerts history. + :param url_path: Url to request new data from. + :param data: filtered data. + :return: Count of all results exits in filtered request and amount + analyses as requested. + """ + response = self._api.request_with_refresh_expired_access_token( + path=url_path, method='POST', data=data) + raise_for_status(response) + data_response = response.json()['result'] + return data_response['alerts_count'], data_response['alerts'] diff --git a/intezer_sdk/analyses_results.py b/intezer_sdk/analyses_results.py index b94223f..af57cb1 100644 --- a/intezer_sdk/analyses_results.py +++ b/intezer_sdk/analyses_results.py @@ -5,69 +5,14 @@ from intezer_sdk.api import IntezerApiClient from intezer_sdk.api import raise_for_status +from intezer_sdk.history_results import HistoryResult -class AnalysesHistoryResult: +class AnalysesHistoryResult(HistoryResult): def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): - """ - Fetch all analyses history results from server. - - :param request_url_path: Url to request new filter from. - :param api: Instance of Intezer API for request server. - :param filters: Filters requested from server. - """ - self._api = api - self.filters: Dict = filters - self._pages: List[Any] = [] - self._current_page: List[Any] = [] - self._request_url_path: str = request_url_path - self._total_count: int = 0 - self._current_offset: int = 0 - - def __iter__(self): - """Iterate between page.""" - if not self._current_page: - self._fetch_page() - if self._current_page: - yield from self._current_page - if len(self._pages) * self.filters['limit'] < self._total_count: - self._fetch_page() - yield from iter(self) - - def __len__(self) -> int: - """Amount of results fetched currently.""" - return self._total_count - - @property - def current_page(self) -> list: - """Get current page, if not exits, ask a new one from server.""" - return self._current_page or self._fetch_page() - - def all(self) -> list: - """List all remaining and exists analysis's from server.""" - results = list(self) - if self._pages: - self._current_page = self._pages[0] - return results - - def _fetch_page(self) -> list: - """Request for new page from server.""" - self.filters['offset'] = self._current_offset - self._total_count, new_page = self._fetch_analyses_history( - self._request_url_path, self.filters) - - if new_page: - self._current_page = new_page - self._update_current_page_metadata() - return self._current_page - - def _update_current_page_metadata(self): - """Update all metadata about new current page.""" - self._current_offset += len(self._current_page) - self._pages.append(self._current_page) + super().__init__(request_url_path, api, filters) - def _fetch_analyses_history(self, url_path: str, data: Dict - ) -> Tuple[int, List]: + def _fetch_history(self, url_path: str, data: Dict) -> Tuple[int, List]: """ Request from server filtered analyses history. :param url_path: Url to request new data from. diff --git a/intezer_sdk/history_results.py b/intezer_sdk/history_results.py new file mode 100644 index 0000000..e66b334 --- /dev/null +++ b/intezer_sdk/history_results.py @@ -0,0 +1,79 @@ +import abc +from typing import List +from typing import Any +from typing import Dict +from typing import Tuple + +from intezer_sdk.api import IntezerApiClient + + +class HistoryResult: + def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): + """ + Fetch all analyses history results from server. + + :param request_url_path: Url to request new filter from. + :param api: Instance of Intezer API for request server. + :param filters: Filters requested from server. + """ + self._api = api + self.filters: Dict = filters + self._pages: List[Any] = [] + self._current_page: List[Any] = [] + self._request_url_path: str = request_url_path + self._total_count: int = 0 + self._current_offset: int = 0 + + def __iter__(self): + """Iterate between page.""" + if not self._current_page: + self._fetch_page() + if self._current_page: + yield from self._current_page + if len(self._pages) * self.filters['limit'] < self._total_count: + self._fetch_page() + yield from iter(self) + + def __len__(self) -> int: + """Amount of results fetched currently.""" + return self._total_count + + @property + def current_page(self) -> list: + """Get current page, if not exits, ask a new one from server.""" + return self._current_page or self._fetch_page() + + def all(self) -> list: + """List all remaining and exists analysis's from server.""" + results = list(self) + if self._pages: + self._current_page = self._pages[0] + return results + + def _fetch_page(self) -> list: + """Request for new page from server.""" + self.filters['offset'] = self._current_offset + self._total_count, new_page = self._fetch_history( + self._request_url_path, self.filters) + + if new_page: + self._current_page = new_page + self._update_current_page_metadata() + return self._current_page + + def _update_current_page_metadata(self): + """Update all metadata about new current page.""" + self._current_offset += len(self._current_page) + self._pages.append(self._current_page) + + + @abc.abstractmethod + def _fetch_history(self, url_path: str, data: Dict) -> Tuple[int, List]: + """ + Request from server filtered history results. + :param url_path: Url to request new data from. + :param data: filtered data. + :return: Count of all results exits in filtered request and amount + analyses as requested. + """ + raise NotImplementedError() diff --git a/tests/unit/test_results.py b/tests/unit/test_results.py index a3b1563..5a8dc2f 100644 --- a/tests/unit/test_results.py +++ b/tests/unit/test_results.py @@ -7,6 +7,8 @@ import responses +from intezer_sdk.alerts import query_alerts_history +from intezer_sdk.alerts import ALERTS_SEARCH_REQUEST from intezer_sdk.analyses_history import ENDPOINT_ANALYSES_REQUEST from intezer_sdk.analyses_history import URL_ANALYSES_REQUEST from intezer_sdk.analyses_history import generate_analyses_history_filter @@ -31,6 +33,12 @@ def setUp(self): 'total_count': 2, 'analyses': [{'account_id': '123'}, {'account_id': '456'}] } + self.alerts_history_result = { + 'result': { + 'alerts_count': 2, + 'alerts': [{'account_id': '123'}, {'account_id': '456'}] + } + } self.no_result = {'total_count': 0, 'analyses': []} self.expected_result = copy.deepcopy(self.normal_result['analyses']) @@ -186,3 +194,16 @@ def test_url_analyses_history_happy_flow(self): ) for result in results: assert result + + def test_query_alerts_history(self): + """Simple usage of alerts history request using the SDK.""" + end_time = datetime.datetime.utcnow() + start_time = end_time - datetime.timedelta(days=3) + with self.add_mock_response(self.alerts_history_result, request_url_path=ALERTS_SEARCH_REQUEST): + results = query_alerts_history( + start_time=start_time, + end_time=end_time, + sources=["xsoar"] + ) + for result in results: + assert result From dae99667fff51dfed70dd83e4ebc97501314cccb Mon Sep 17 00:00:00 2001 From: Ofir Date: Tue, 26 Mar 2024 18:49:10 +0200 Subject: [PATCH 2/6] fix --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e41cf70..30901d2 100644 --- a/README.md +++ b/README.md @@ -255,7 +255,7 @@ alert = Alert.from_id(alert_id=alert_id, ```python history_results = query_file_analyses_history( api = , - filters** + **filters ) for analyse in history_results: print(analyse) From 98c1c6eb6620cee2036b88ebc57f545a06a50c30 Mon Sep 17 00:00:00 2001 From: Ofir Date: Tue, 26 Mar 2024 18:54:01 +0200 Subject: [PATCH 3/6] fix some text --- intezer_sdk/alerts_results.py | 6 +++++- intezer_sdk/analyses_results.py | 3 +++ intezer_sdk/history_results.py | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/intezer_sdk/alerts_results.py b/intezer_sdk/alerts_results.py index 78f0a00..7f94cf5 100644 --- a/intezer_sdk/alerts_results.py +++ b/intezer_sdk/alerts_results.py @@ -8,7 +8,11 @@ class AlertsHistoryResult(HistoryResult): + def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): + """ + Fetch all alerts history results from server. + """ super().__init__(request_url_path, api, filters) def _fetch_history(self, url_path: str, data: Dict @@ -18,7 +22,7 @@ def _fetch_history(self, url_path: str, data: Dict :param url_path: Url to request new data from. :param data: filtered data. :return: Count of all results exits in filtered request and amount - analyses as requested. + alerts as requested. """ response = self._api.request_with_refresh_expired_access_token( path=url_path, method='POST', data=data) diff --git a/intezer_sdk/analyses_results.py b/intezer_sdk/analyses_results.py index af57cb1..7aac5be 100644 --- a/intezer_sdk/analyses_results.py +++ b/intezer_sdk/analyses_results.py @@ -10,6 +10,9 @@ class AnalysesHistoryResult(HistoryResult): def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): + """ + Fetch all analyses history results from server. + """ super().__init__(request_url_path, api, filters) def _fetch_history(self, url_path: str, data: Dict) -> Tuple[int, List]: diff --git a/intezer_sdk/history_results.py b/intezer_sdk/history_results.py index e66b334..829c304 100644 --- a/intezer_sdk/history_results.py +++ b/intezer_sdk/history_results.py @@ -10,7 +10,7 @@ class HistoryResult: def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): """ - Fetch all analyses history results from server. + Fetch all history results from server. :param request_url_path: Url to request new filter from. :param api: Instance of Intezer API for request server. From 9ef99056b0fbd77d8b88a3f705af49a07754df38 Mon Sep 17 00:00:00 2001 From: Ofir Date: Wed, 27 Mar 2024 13:33:54 +0200 Subject: [PATCH 4/6] add add_filter func --- intezer_sdk/alerts.py | 107 ++++++++++++-------------------- intezer_sdk/analyses_history.py | 32 ++++------ intezer_sdk/util.py | 6 ++ 3 files changed, 56 insertions(+), 89 deletions(-) diff --git a/intezer_sdk/alerts.py b/intezer_sdk/alerts.py index 8d3716c..aed147a 100644 --- a/intezer_sdk/alerts.py +++ b/intezer_sdk/alerts.py @@ -24,6 +24,7 @@ from intezer_sdk.api import get_global_api from intezer_sdk import errors from intezer_sdk import consts +from intezer_sdk.util import add_filter DEFAULT_LIMIT = 100 DEFAULT_OFFSET = 0 @@ -84,66 +85,36 @@ def generate_alerts_history_search_filters(*, filters['start_time'] = start_time.timestamp() if end_time: filters['end_time'] = end_time.timestamp() - if environments: - filters['environments'] = environments - if offset is not None: - filters['offset'] = offset - if limit: - filters['limit'] = limit - if sources: - filters['sources'] = sources - if risk_categories: - filters['risk_categories'] = risk_categories - if alert_verdicts: - filters['alert_verdicts'] = alert_verdicts - if family_names: - filters['family_names'] = family_names - if response_statuses: - filters['response_statuses'] = response_statuses - if hostnames: - filters['hostnames'] = hostnames - if free_text: - filters['free_text'] = free_text - if site_name: - filters['site_name'] = site_name - if account_name: - filters['account_name'] = account_name - if exclude_alert_ids: - filters['exclude_alert_ids'] = exclude_alert_ids - if usernames: - filters['usernames'] = usernames - if file_hashes: - filters['file_hashes'] = file_hashes - if process_commandlines: - filters['process_commandlines'] = process_commandlines - if sort_by: - filters['sort_by'] = sort_by - if is_mitigated: - filters['is_mitigated'] = is_mitigated - if email_sender: - filters['email_sender'] = email_sender - if email_recipient: - filters['email_recipient'] = email_recipient - if email_subject: - filters['email_subject'] = email_subject - if email_cc: - filters['email_cc'] = email_cc - if email_bcc: - filters['email_bcc'] = email_bcc - if email_message_id: - filters['email_message_id'] = email_message_id - if email_reported_by: - filters['email_reported_by'] = email_reported_by - if device_private_ips: - filters['device_private_ips'] = device_private_ips - if device_external_ips: - filters['device_external_ips'] = device_external_ips - if device_ids: - filters['device_ids'] = device_ids - if time_filter_type: - filters['time_filter_type'] = time_filter_type - if sort_order: - filters['sort_order'] = sort_order + add_filter(filters, 'environments', environments) + add_filter(filters, 'offset', offset) + add_filter(filters, 'limit', limit) + add_filter(filters, 'sources', sources) + add_filter(filters, 'risk_categories', risk_categories) + add_filter(filters, 'alert_verdicts', alert_verdicts) + add_filter(filters, 'family_names', family_names) + add_filter(filters, 'response_statuses', response_statuses) + add_filter(filters, 'hostnames', hostnames) + add_filter(filters, 'free_text', free_text) + add_filter(filters, 'site_name', site_name) + add_filter(filters, 'account_name', account_name) + add_filter(filters, 'exclude_alert_ids', exclude_alert_ids) + add_filter(filters, 'usernames', usernames) + add_filter(filters, 'file_hashes', file_hashes) + add_filter(filters, 'process_commandlines', process_commandlines) + add_filter(filters, 'sort_by', sort_by) + add_filter(filters, 'is_mitigated', is_mitigated) + add_filter(filters, 'email_sender', email_sender) + add_filter(filters, 'email_recipient', email_recipient) + add_filter(filters, 'email_subject', email_subject) + add_filter(filters, 'email_cc', email_cc) + add_filter(filters, 'email_bcc', email_bcc) + add_filter(filters, 'email_message_id', email_message_id) + add_filter(filters, 'email_reported_by', email_reported_by) + add_filter(filters, 'device_private_ips', device_private_ips) + add_filter(filters, 'device_external_ips', device_external_ips) + add_filter(filters, 'device_ids', device_ids) + add_filter(filters, 'time_filter_type', time_filter_type) + add_filter(filters, 'sort_order', sort_order) return filters @@ -191,7 +162,7 @@ def query_alerts_history(*, :param start_time: Query alerts that were created after this timestamp (in UTC). :param end_time: Query alerts that were created before this timestamp (in UTC). :param api: Instance of Intezer API for request server. - :param sources: Query alerts only with these sources (Valid source can be taken from AlertSource). + :param sources: Query alerts only with these sources. :param risk_categories: Query alerts only with these risk categories. :param alert_verdicts: Query alerts only with these alert verdicts. :param family_names: Query alerts only with these family names. @@ -204,20 +175,20 @@ def query_alerts_history(*, :param usernames: Query alerts only with these usernames. :param file_hashes: Query alerts only with these file hashes. :param process_commandlines: Query alerts only with these process commandlines. - :param sort_by: Sort alerts only with this sort_by_key value. :param is_mitigated: Query alerts only with this is_mitigated value. :param email_sender: Query alerts only with these email sender. :param email_recipient: Query alerts only with these email recipient. :param email_subject: Query alerts only with this email subject. - :param email_cc: Sort alerts only with this email cc. - :param email_bcc: Sort alerts only with this email bcc. - :param email_message_id: Sort alerts only with this email message id. - :param email_reported_by: Sort alerts only with this email reported by. + :param email_cc: Query alerts only with this email cc. + :param email_bcc: Query alerts only with this email bcc. + :param email_message_id: Query alerts only with this email message id. + :param email_reported_by: Query alerts only with this email reported by. :param device_private_ips: Query alerts only with these private ips. :param device_external_ips: Query alerts only with these external ips. :param device_ids: Query alerts only with these device ids. - :param time_filter_type: The time value to filter alerts by. - :param sort_order: The order to sort the alerts by. + :param time_filter_type: The time value to filter alerts by (creation_time / triage_time / triage_change_time / triage_or_triage_change_time). + :param sort_order: The order to sort the alerts by (asc / desc). + :param sort_by: Sort alerts only with this sort_by_key value (CREATION_TIME / TRIAGE_TIME / TRIAGE_CHANGE_TIME ). :return: Alert query result from server as Results iterator. """ diff --git a/intezer_sdk/analyses_history.py b/intezer_sdk/analyses_history.py index dfa7e63..ebd8b40 100644 --- a/intezer_sdk/analyses_history.py +++ b/intezer_sdk/analyses_history.py @@ -6,6 +6,7 @@ from intezer_sdk.analyses_results import AnalysesHistoryResult from intezer_sdk.api import IntezerApiClient from intezer_sdk.api import get_global_api +from intezer_sdk.util import add_filter DEFAULT_LIMIT = 100 DEFAULT_OFFSET = 0 @@ -54,12 +55,9 @@ def query_file_analyses_history(*, limit=limit, offset=offset ) - if file_hash: - filters['hash'] = file_hash - if family_names: - filters['family_names'] = family_names - if file_name: - filters['file_name'] = file_name + add_filter(filters, 'hash', file_hash) + add_filter(filters, 'family_names', family_names) + add_filter(filters, 'file_name', file_name) return AnalysesHistoryResult(FILE_ANALYSES_REQUEST, api, filters) @@ -143,13 +141,9 @@ def query_url_analyses_history(*, limit=limit, offset=offset ) - - if did_download_file: - filters['did_download_file'] = did_download_file - if submitted_url: - filters['submitted_url'] = submitted_url - if sub_verdicts: - filters['sub_verdicts'] = sub_verdicts + add_filter(filters, 'did_download_file', did_download_file) + add_filter(filters, 'submitted_url', submitted_url) + add_filter(filters, 'sub_verdicts', sub_verdicts) return AnalysesHistoryResult(URL_ANALYSES_REQUEST, api, filters) @@ -170,12 +164,8 @@ def generate_analyses_history_filter(*, 'limit': limit, 'offset': offset } - if aggregated_view is not None: - base_filter['aggregated_view'] = aggregated_view - if sources: - base_filter['sources'] = sources - if verdicts: - base_filter['verdicts'] = verdicts - if computer_names: - base_filter['computer_names'] = computer_names + add_filter(base_filter, 'aggregated_view', aggregated_view) + add_filter(base_filter, 'sources', sources) + add_filter(base_filter, 'verdicts', verdicts) + add_filter(base_filter, 'computer_names', computer_names) return base_filter diff --git a/intezer_sdk/util.py b/intezer_sdk/util.py index 3a14c34..98b0b1c 100644 --- a/intezer_sdk/util.py +++ b/intezer_sdk/util.py @@ -8,6 +8,7 @@ from typing import List from typing import Optional from typing import Tuple +from typing import Any from intezer_sdk.analysis import FileAnalysis from intezer_sdk.consts import ANALYZE_URL @@ -222,3 +223,8 @@ def human_readable_size(num: int) -> str: def get_emoji(key: str): return emojis_by_key[key] + + +def add_filter(filters: dict, key: str, value: Any): + if value is not None: + filters[key] = value From 151596bd89bb8963c9328f70f772ba90d5ce79ff Mon Sep 17 00:00:00 2001 From: Ofir Date: Wed, 27 Mar 2024 13:47:20 +0200 Subject: [PATCH 5/6] fix dependencies --- intezer_sdk/analyses_history.py | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/intezer_sdk/analyses_history.py b/intezer_sdk/analyses_history.py index ebd8b40..dfa7e63 100644 --- a/intezer_sdk/analyses_history.py +++ b/intezer_sdk/analyses_history.py @@ -6,7 +6,6 @@ from intezer_sdk.analyses_results import AnalysesHistoryResult from intezer_sdk.api import IntezerApiClient from intezer_sdk.api import get_global_api -from intezer_sdk.util import add_filter DEFAULT_LIMIT = 100 DEFAULT_OFFSET = 0 @@ -55,9 +54,12 @@ def query_file_analyses_history(*, limit=limit, offset=offset ) - add_filter(filters, 'hash', file_hash) - add_filter(filters, 'family_names', family_names) - add_filter(filters, 'file_name', file_name) + if file_hash: + filters['hash'] = file_hash + if family_names: + filters['family_names'] = family_names + if file_name: + filters['file_name'] = file_name return AnalysesHistoryResult(FILE_ANALYSES_REQUEST, api, filters) @@ -141,9 +143,13 @@ def query_url_analyses_history(*, limit=limit, offset=offset ) - add_filter(filters, 'did_download_file', did_download_file) - add_filter(filters, 'submitted_url', submitted_url) - add_filter(filters, 'sub_verdicts', sub_verdicts) + + if did_download_file: + filters['did_download_file'] = did_download_file + if submitted_url: + filters['submitted_url'] = submitted_url + if sub_verdicts: + filters['sub_verdicts'] = sub_verdicts return AnalysesHistoryResult(URL_ANALYSES_REQUEST, api, filters) @@ -164,8 +170,12 @@ def generate_analyses_history_filter(*, 'limit': limit, 'offset': offset } - add_filter(base_filter, 'aggregated_view', aggregated_view) - add_filter(base_filter, 'sources', sources) - add_filter(base_filter, 'verdicts', verdicts) - add_filter(base_filter, 'computer_names', computer_names) + if aggregated_view is not None: + base_filter['aggregated_view'] = aggregated_view + if sources: + base_filter['sources'] = sources + if verdicts: + base_filter['verdicts'] = verdicts + if computer_names: + base_filter['computer_names'] = computer_names return base_filter From 1774cbb1f7a7638632e9b76b9b7db8401831d8f2 Mon Sep 17 00:00:00 2001 From: Ofir Date: Wed, 27 Mar 2024 14:25:19 +0200 Subject: [PATCH 6/6] add version --- intezer_sdk/__init__.py | 2 +- intezer_sdk/alerts_results.py | 3 +-- intezer_sdk/analyses_results.py | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/intezer_sdk/__init__.py b/intezer_sdk/__init__.py index 69532f3..0f2616a 100644 --- a/intezer_sdk/__init__.py +++ b/intezer_sdk/__init__.py @@ -1 +1 @@ -__version__ = '1.19.18' +__version__ = '1.20' diff --git a/intezer_sdk/alerts_results.py b/intezer_sdk/alerts_results.py index 7f94cf5..046ab21 100644 --- a/intezer_sdk/alerts_results.py +++ b/intezer_sdk/alerts_results.py @@ -15,8 +15,7 @@ def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): """ super().__init__(request_url_path, api, filters) - def _fetch_history(self, url_path: str, data: Dict - ) -> Tuple[int, List]: + def _fetch_history(self, url_path: str, data: Dict) -> Tuple[int, List]: """ Request from server filtered alerts history. :param url_path: Url to request new data from. diff --git a/intezer_sdk/analyses_results.py b/intezer_sdk/analyses_results.py index 7aac5be..6e0ae53 100644 --- a/intezer_sdk/analyses_results.py +++ b/intezer_sdk/analyses_results.py @@ -1,5 +1,4 @@ from typing import List -from typing import Any from typing import Dict from typing import Tuple