From d4fad0d895b993f16b1f9f0ecdea36c577673c51 Mon Sep 17 00:00:00 2001 From: davidt99 Date: Thu, 14 Apr 2022 11:15:04 +0300 Subject: [PATCH] feat: add url analysis support (#33) * feat: add url analysis support Co-authored-by: Jonathan Abrahamy --- CHANGES | 6 + README.md | 73 +++++- examples/analyze_by_file.py | 6 +- examples/analyze_by_hash.py | 8 +- examples/analyze_folder.py | 6 +- examples/analyze_url.py | 24 ++ examples/sentinel_one_integration.py | 8 +- intezer_sdk/__init__.py | 2 +- intezer_sdk/_util.py | 11 + intezer_sdk/analysis.py | 214 ++++++++--------- intezer_sdk/api.py | 42 +++- intezer_sdk/base_analysis.py | 117 ++++++++++ intezer_sdk/consts.py | 3 +- intezer_sdk/errors.py | 5 + intezer_sdk/util.py | 20 +- setup.py | 6 +- tests/unit/base_test.py | 18 +- tests/unit/test_analysis.py | 329 ++++++++++++++++++++------- tests/unit/test_family.py | 12 - tests/unit/test_index.py | 22 +- 20 files changed, 662 insertions(+), 270 deletions(-) create mode 100644 examples/analyze_url.py create mode 100644 intezer_sdk/_util.py create mode 100644 intezer_sdk/base_analysis.py diff --git a/CHANGES b/CHANGES index b8e1a41..235ff96 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,9 @@ +1.7.0 +------- +- Add UrlAnalysis +- `Analysis` was renamed to `FileAnalysis` +- Drop support for python 3.5, add support for python 3.10 + 1.6.4 - 1.6.10 ------- - Analysis summary utility improvements diff --git a/README.md b/README.md index ff7cc23..8b7e77c 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ Currently, the following options are available in the SDK: - Analyze by file - Analyze by SHA256 +- Analyze Url - Index by file - Index by SHA256 - Get Latest Analysis @@ -33,7 +34,7 @@ api.set_global_api('') ### Analyze By File ```python -analysis = Analysis(file_path=, +analysis = FileAnalysis(file_path=, dynamic_unpacking=, # optional static_unpacking=) # optional analysis.send(wait=True) @@ -41,12 +42,12 @@ result = analysis.result() ``` ### Analyze By SHA256 ```python -analysis = Analysis(file_hash=) +analysis = FileAnalysis(file_hash=) analysis.send(wait=True) result = analysis.result() ``` -### Analysis result example +### File Analysis result example ```python { 'analysis_id': '00000000-0000-0000-0000-000000000000', @@ -59,6 +60,66 @@ result = analysis.result() 'verdict': 'malicious' } ``` +### Analyze Url +```python +analysis = UrlAnalysis(url=) +analysis.send(wait=True) +result = analysis.result() +``` +### Url Analysis result example +```python +{ + 'analysis_id': '70d09f68-c7a3-43a3-a8de-07ec31fbf4ed', + 'domain_info': { + 'creation_date': '1997-08-13 04:00:00.000000', + 'domain_name': 'foo.com', + 'registrar': 'TUCOWS, INC.' + }, + 'indicators': [ + { + 'classification': 'informative', + 'text': 'URL is accessible' + }, + { + 'classification': 'informative', + 'text': 'Assigned IPv4 domain' + }, + { + 'classification': 'informative', + 'text': 'Vaild IPv4 domain' + } + ], + 'ip': '34.206.39.153', + 'redirect_chain': [ + { + 'response_status': 301, + 'url': 'https://foo.com/' + }, + { + 'response_status': 200, + 'url': 'http://www.foo.com/' + } + ], + 'scanned_url': 'http://www.foo.com/', + 'submitted_url': 'foo.com', + 'downloaded_file': { + 'analysis_id': '8db9a401-a142-41be-9a31-8e5f3642db62', + 'analysis_summary': { + 'verdict_description': 'This file contains code from malicious software, therefore it's very likely that it's malicious.', + 'verdict_name': 'malicious', + 'verdict_title': 'Malicious', + 'verdict_type': 'malicious' + }, + 'sha256': '4293c1d8574dc87c58360d6bac3daa182f64f7785c9d41da5e0741d2b1817fc7' + }, + 'summary': { + 'description': 'No suspicious activity was detected for this URL', + 'title': 'No Threats', + 'verdict_name': 'no_threats', + 'verdict_type': 'no_threats' + } +} +``` ### Index By File ```python from intezer_sdk import consts @@ -79,14 +140,14 @@ index.send(wait=True) index_id = index.index_id ``` -### Get Latest Analysis +### Get Latest File Analysis ```python analysis = get_latest_analysis(file_hash: ) result = analysis.result() ``` ### Get Sub Analyses -#### Root Analysis +#### Root File Analysis ```python root_analysis = analysis.get_root_analysis() ``` @@ -130,7 +191,7 @@ string_related_samples = operation.get_result() #### Wait with timeout ```python -analysis = Analysis(file_hash=) +analysis = FileAnalysis(file_hash=) analysis.send(wait=True, wait_timeout=datetime.timedelta(minutes=1)) ``` diff --git a/examples/analyze_by_file.py b/examples/analyze_by_file.py index 12d9c60..f6ef036 100644 --- a/examples/analyze_by_file.py +++ b/examples/analyze_by_file.py @@ -2,19 +2,19 @@ from pprint import pprint from intezer_sdk import api -from intezer_sdk.analysis import Analysis +from intezer_sdk.analysis import FileAnalysis def send_file_with_wait(file_path): api.set_global_api('') - analysis = Analysis(file_path=file_path) + analysis = FileAnalysis(file_path=file_path) analysis.send(wait=True) pprint(analysis.result()) def send_file_without_wait(file_path): api.set_global_api('') - analysis = Analysis(file_path=file_path) + analysis = FileAnalysis(file_path=file_path) analysis.send() analysis.wait_for_completion() pprint(analysis.result()) diff --git a/examples/analyze_by_hash.py b/examples/analyze_by_hash.py index 8b593dd..433cd4a 100644 --- a/examples/analyze_by_hash.py +++ b/examples/analyze_by_hash.py @@ -3,26 +3,26 @@ from pprint import pprint from intezer_sdk import api -from intezer_sdk.analysis import Analysis +from intezer_sdk.analysis import FileAnalysis def analysis_by_hash_with_wait(file_hash): # type: (str) -> None api.set_global_api('') - analysis = Analysis(file_hash=file_hash) + analysis = FileAnalysis(file_hash=file_hash) analysis.send(wait=True) pprint(analysis.result()) def analysis_by_hash_with_wait_timeout(file_hash): # type: (str) -> None api.set_global_api('') - analysis = Analysis(file_hash=file_hash) + analysis = FileAnalysis(file_hash=file_hash) analysis.send(wait=True, wait_timeout=datetime.timedelta(minutes=1)) pprint(analysis.result()) def analysis_by_hash_without_wait(file_hash): # type: (str) -> None api.set_global_api('') - analysis = Analysis(file_hash=file_hash) + analysis = FileAnalysis(file_hash=file_hash) analysis.send() analysis.wait_for_completion() pprint(analysis.result()) diff --git a/examples/analyze_folder.py b/examples/analyze_folder.py index d34285d..b8c5b09 100644 --- a/examples/analyze_folder.py +++ b/examples/analyze_folder.py @@ -2,13 +2,13 @@ import sys from intezer_sdk import api -from intezer_sdk.analysis import Analysis +from intezer_sdk.analysis import FileAnalysis API_KEY = os.environ.get('INTEZER_API_KEY') DIRECTORY_PATH = '' -def send_analysis(analysis: Analysis): +def send_analysis(analysis: FileAnalysis): analysis.send(wait=True) return analysis.result() @@ -16,7 +16,7 @@ def send_analysis(analysis: Analysis): def collect_suspicious_and_malicious_analyses() -> list: malicious_and_suspicious_analyses_results = [] file_paths = [file for file in os.listdir(DIRECTORY_PATH)] - analyses = [Analysis(os.path.join(DIRECTORY_PATH, path)) for path in file_paths if + analyses = [FileAnalysis(os.path.join(DIRECTORY_PATH, path)) for path in file_paths if os.path.isfile(os.path.join(DIRECTORY_PATH, path))] for analysis in analyses: diff --git a/examples/analyze_url.py b/examples/analyze_url.py new file mode 100644 index 0000000..170f7f1 --- /dev/null +++ b/examples/analyze_url.py @@ -0,0 +1,24 @@ +import sys +from pprint import pprint + +from intezer_sdk import api +from intezer_sdk.analysis import UrlAnalysis + + +def send_url_with_wait(url): + api.set_global_api('') + analysis = UrlAnalysis(url=url) + analysis.send(wait=True) + pprint(analysis.result()) + + +def send_url_without_wait(url): + api.set_global_api('') + analysis = UrlAnalysis(url=url) + analysis.send() + analysis.wait_for_completion() + pprint(analysis.result()) + + +if __name__ == '__main__': + send_file_with_wait(*sys.argv[1:]) diff --git a/examples/sentinel_one_integration.py b/examples/sentinel_one_integration.py index f9142c7..48a1198 100755 --- a/examples/sentinel_one_integration.py +++ b/examples/sentinel_one_integration.py @@ -19,7 +19,7 @@ from intezer_sdk import api from intezer_sdk import errors from intezer_sdk import util -from intezer_sdk.analysis import Analysis +from intezer_sdk.analysis import FileAnalysis _s1_session: Optional[requests.Session] = None _logger = logging.getLogger('intezer') @@ -65,7 +65,7 @@ def analyze_by_file(threat_id: str): download_url, zipp_password = fetch_file(threat_id) file = download_file(download_url) _logger.debug('starting to analyze file') - analysis = Analysis(file_stream=file, file_name=f'{threat_id}.zip', zip_password=zipp_password) + analysis = FileAnalysis(file_stream=file, file_name=f'{threat_id}.zip', zip_password=zipp_password) return analysis @@ -143,7 +143,7 @@ def filter_threat(threat_info: dict) -> bool: return threat_info['agentDetectionInfo']['agentOsName'].lower().startswith(('linux', 'windows')) -def send_note(threat_id: str, analysis: Analysis): +def send_note(threat_id: str, analysis: FileAnalysis): note = util.get_analysis_summary(analysis) response = _s1_session.post('/web/api/v2.1/threats/notes', @@ -174,7 +174,7 @@ def analyze_threat(threat_id: str, threat: dict = None): if file_hash: _logger.debug(f'trying to analyze by hash {file_hash}') try: - analysis = Analysis(file_hash=file_hash) + analysis = FileAnalysis(file_hash=file_hash) analysis.send() except errors.HashDoesNotExistError: _logger.debug(f'hash {file_hash} not found on server, fetching the file from endpoint') diff --git a/intezer_sdk/__init__.py b/intezer_sdk/__init__.py index 970bb6f..0e1a38d 100644 --- a/intezer_sdk/__init__.py +++ b/intezer_sdk/__init__.py @@ -1 +1 @@ -__version__ = '1.6.10' +__version__ = '1.7.0' diff --git a/intezer_sdk/_util.py b/intezer_sdk/_util.py new file mode 100644 index 0000000..df3b378 --- /dev/null +++ b/intezer_sdk/_util.py @@ -0,0 +1,11 @@ +import warnings + + +def deprecated(message: str): + def wrapper(func): + warnings.warn(message, + DeprecationWarning, + stacklevel=2) + return func + + return wrapper diff --git a/intezer_sdk/analysis.py b/intezer_sdk/analysis.py index 008e42f..5bdfcb9 100644 --- a/intezer_sdk/analysis.py +++ b/intezer_sdk/analysis.py @@ -1,45 +1,45 @@ -import datetime import logging import os -import time -import typing from http import HTTPStatus +from typing import BinaryIO +from typing import Optional import requests +from requests import Response from intezer_sdk import consts from intezer_sdk import errors +from intezer_sdk._util import deprecated from intezer_sdk.api import IntezerApi from intezer_sdk.api import get_global_api -from intezer_sdk.consts import AnalysisStatusCode -from intezer_sdk.consts import CodeItemType +from intezer_sdk.base_analysis import BaseAnalysis from intezer_sdk.sub_analysis import SubAnalysis logger = logging.getLogger(__name__) -class Analysis: +class FileAnalysis(BaseAnalysis): def __init__(self, file_path: str = None, file_hash: str = None, - file_stream: typing.BinaryIO = None, + file_stream: BinaryIO = None, disable_dynamic_unpacking: bool = None, disable_static_unpacking: bool = None, api: IntezerApi = None, file_name: str = None, code_item_type: str = None, - zip_password: str = None) -> None: + zip_password: str = None): + super().__init__(api) + if [file_path, file_hash, file_stream].count(None) != 2: raise ValueError('Choose between file hash, file stream or file path analysis') if file_hash and code_item_type: logger.warning('Analyze by hash ignores code item type') - if code_item_type and code_item_type not in [c.value for c in CodeItemType]: + if code_item_type and code_item_type not in [c.value for c in consts.CodeItemType]: raise ValueError('Invalid code item type, possible code item types are: file, memory module') - self.status = None - self.analysis_id = None self._file_hash = file_hash self._disable_dynamic_unpacking = disable_dynamic_unpacking self._disable_static_unpacking = disable_static_unpacking @@ -48,8 +48,6 @@ def __init__(self, self._file_name = file_name self._code_item_type = code_item_type self._zip_password = zip_password - self._report = None - self._api = api or get_global_api() self._sub_analyses = None self._root_analysis = None self._iocs_report = None @@ -65,94 +63,24 @@ def __init__(self, else: self._file_name = 'file.zip' - def send(self, - wait: typing.Union[bool, int] = False, - wait_timeout: typing.Optional[datetime.timedelta] = None, - **additional_parameters) -> None: - if self.analysis_id: - raise errors.AnalysisHasAlreadyBeenSent() + def _query_status_from_api(self) -> Response: + return self._api.get_file_analysis_response(self.analysis_id, False) + def _send_analyze_to_api(self, **additional_parameters) -> str: if self._file_hash: - self.analysis_id = self._api.analyze_by_hash(self._file_hash, - self._disable_dynamic_unpacking, - self._disable_static_unpacking, - **additional_parameters) - else: - self.analysis_id = self._api.analyze_by_file(self._file_path, - self._file_stream, - disable_dynamic_unpacking=self._disable_dynamic_unpacking, - disable_static_unpacking=self._disable_static_unpacking, - file_name=self._file_name, - code_item_type=self._code_item_type, - zip_password=self._zip_password, - **additional_parameters) - - self.status = consts.AnalysisStatusCode.CREATED - - if wait: - if isinstance(wait, int): - self.wait_for_completion(wait, sleep_before_first_check=True, timeout=wait_timeout) - else: - self.wait_for_completion(sleep_before_first_check=True, timeout=wait_timeout) - - def wait_for_completion(self, - interval: int = None, - sleep_before_first_check=False, - timeout: typing.Optional[datetime.timedelta] = None): - """ - Blocks until the analysis is completed - :param interval: The interval to wait between checks - :param sleep_before_first_check: Whether to sleep before the first status check - :param timeout: Maximum duration to wait for analysis completion - """ - start_time = datetime.datetime.utcnow() - if not interval: - interval = consts.CHECK_STATUS_INTERVAL - if self._is_analysis_running(): - if sleep_before_first_check: - time.sleep(interval) - status_code = self.check_status() - - while status_code != consts.AnalysisStatusCode.FINISH: - timeout_passed = timeout and datetime.datetime.utcnow() - start_time > timeout - if timeout_passed: - raise TimeoutError - time.sleep(interval) - status_code = self.check_status() - - def check_status(self): - if not self._is_analysis_running(): - raise errors.IntezerError('Analysis is not running') - - response = self._api.get_analysis_response(self.analysis_id) - if response.status_code == HTTPStatus.OK: - self._report = response.json()['result'] - self.status = consts.AnalysisStatusCode.FINISH - elif response.status_code == HTTPStatus.ACCEPTED: - self.status = consts.AnalysisStatusCode.IN_PROGRESS + return self._api.analyze_by_hash(self._file_hash, + self._disable_dynamic_unpacking, + self._disable_static_unpacking, + **additional_parameters) else: - raise errors.IntezerError('Error in response status code:{}'.format(response.status_code)) - - return self.status - - def result(self): - if self._is_analysis_running(): - raise errors.AnalysisIsStillRunning() - if not self._report: - raise errors.ReportDoesNotExistError() - - return self._report - - def set_report(self, report: dict): - if not report: - raise ValueError('Report can not be None') - - self.analysis_id = report['analysis_id'] - self._report = report - self.status = consts.AnalysisStatusCode.FINISH - - def _is_analysis_running(self): - return self.status in (consts.AnalysisStatusCode.CREATED, consts.AnalysisStatusCode.IN_PROGRESS) + return self._api.analyze_by_file(self._file_path, + self._file_stream, + disable_dynamic_unpacking=self._disable_dynamic_unpacking, + disable_static_unpacking=self._disable_static_unpacking, + file_name=self._file_name, + code_item_type=self._code_item_type, + zip_password=self._zip_password, + **additional_parameters) def get_sub_analyses(self): if self._sub_analyses is None and self.analysis_id: @@ -209,39 +137,99 @@ def dynamic_ttps(self) -> dict: return self._dynamic_ttps_report - def _assert_analysis_finished(self): - if self._is_analysis_running(): - raise errors.AnalysisIsStillRunning() - if self.status != AnalysisStatusCode.FINISH: - raise errors.IntezerError('Analysis not finished successfully') +def get_latest_analysis(file_hash: str, + api: IntezerApi = None, + private_only: bool = False, + **additional_parameters) -> Optional[FileAnalysis]: + api = api or get_global_api() + analysis_report = api.get_latest_analysis(file_hash, private_only, **additional_parameters) -def get_latest_analysis(file_hash: str, api: IntezerApi = None, private_only: bool = False) -> typing.Optional[Analysis]: + if not analysis_report: + return None + + analysis = FileAnalysis(file_hash=file_hash, api=api) + analysis.set_report(analysis_report) + + return analysis + + +def get_file_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> Optional[FileAnalysis]: api = api or get_global_api() - analysis_report = api.get_latest_analysis(file_hash, private_only) + response = api.get_file_analysis_response(analysis_id, True) + if response.status_code == HTTPStatus.NOT_FOUND: + return None + response_json = response.json() + _assert_analysis_status(response_json) + + analysis_report = response_json.get('result') if not analysis_report: return None - analysis = Analysis(file_hash=file_hash, api=api) + analysis = FileAnalysis(file_hash=analysis_report['sha256'], api=api) analysis.set_report(analysis_report) return analysis -def get_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> typing.Optional[Analysis]: +@deprecated('This method is deprecated, use get_file_analysis_by_id instead to be explict') +def get_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> Optional[FileAnalysis]: + return get_file_analysis_by_id(analysis_id, api) + + +Analysis = FileAnalysis + + +class UrlAnalysis(BaseAnalysis): + def __init__(self, url: str, api: IntezerApi = None): + super().__init__(api) + self.url = url + self._file_analysis: Optional[FileAnalysis] = None + + def _query_status_from_api(self) -> Response: + return self._api.get_url_analysis_response(self.analysis_id, False) + + def _send_analyze_to_api(self, **additional_parameters) -> str: + return self._api.analyze_url(self.url) + + @property + def downloaded_file_analysis(self) -> Optional[FileAnalysis]: + if self.status != consts.AnalysisStatusCode.FINISH: + raise + if self._file_analysis: + return self._file_analysis + + if 'downloaded_file' not in self._report: + return None + + file_analysis_id = self._report['downloaded_file']['analysis_id'] + self._file_analysis = get_file_analysis_by_id(file_analysis_id) + return self._file_analysis + + +def get_url_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> Optional[UrlAnalysis]: api = api or get_global_api() - response = api.get_analysis_response(analysis_id).json() + response = api.get_url_analysis_response(analysis_id, True) + if response.status_code == HTTPStatus.NOT_FOUND: + return None - if response['status'] in (AnalysisStatusCode.IN_PROGRESS.value, - AnalysisStatusCode.QUEUED.value): - raise errors.AnalysisIsStillRunning() + response_json = response.json() + _assert_analysis_status(response_json) - analysis_report = response.get('result') + analysis_report = response_json.get('result') if not analysis_report: return None - analysis = Analysis(file_hash=analysis_report['sha256'], api=api) + analysis = UrlAnalysis(analysis_report['submitted_url'], api=api) analysis.set_report(analysis_report) return analysis + + +def _assert_analysis_status(response: dict): + if response['status'] in (consts.AnalysisStatusCode.IN_PROGRESS.value, + consts.AnalysisStatusCode.QUEUED.value): + raise errors.AnalysisIsStillRunning() + if response['status'] == consts.AnalysisStatusCode.FAILED.value: + raise errors.AnalysisFailedError() diff --git a/intezer_sdk/api.py b/intezer_sdk/api.py index cd8f036..7c806ef 100644 --- a/intezer_sdk/api.py +++ b/intezer_sdk/api.py @@ -9,9 +9,10 @@ from intezer_sdk import consts from intezer_sdk import errors +from intezer_sdk._util import deprecated from intezer_sdk.consts import IndexType -_global_api = None +_global_api: typing.Optional['IntezerApi'] = None def raise_for_status(response: requests.Response, @@ -149,8 +150,11 @@ def analyze_by_file(self, with open(file_path, 'rb') as file_to_upload: return self._analyze_file_stream(file_to_upload, file_name, options) - def get_latest_analysis(self, file_hash: str, private_only: bool = False) -> typing.Optional[dict]: - options = {'should_get_only_private_analysis': private_only} + def get_latest_analysis(self, + file_hash: str, + private_only: bool = False, + **additional_parameters) -> typing.Optional[dict]: + options = {'should_get_only_private_analysis': private_only, **additional_parameters} response = self.request_with_refresh_expired_access_token(path='/files/{}'.format(file_hash), method='GET', data=options) @@ -162,10 +166,21 @@ def get_latest_analysis(self, file_hash: str, private_only: bool = False) -> typ return response.json()['result'] - def get_analysis_response(self, analyses_id: str) -> Response: + def get_file_analysis_response(self, analyses_id: str, ignore_not_found: bool) -> Response: response = self.request_with_refresh_expired_access_token(path='/analyses/{}'.format(analyses_id), method='GET') - raise_for_status(response) + self._assert_result_response(ignore_not_found, response) + + return response + + @deprecated('This method is deprecated, use get_file_analysis_response instead to be explict') + def get_analysis_response(self, analyses_id: str) -> Response: + return self.get_file_analysis_response(analyses_id, False) + + def get_url_analysis_response(self, analyses_id: str, ignore_not_found: bool) -> Response: + response = self.request_with_refresh_expired_access_token(path='/url/{}'.format(analyses_id), + method='GET') + self._assert_result_response(ignore_not_found, response) return response @@ -372,6 +387,19 @@ def set_session(self): self._session.headers['Authorization'] = 'Bearer {}'.format(self._access_token) self._session.headers['User-Agent'] = consts.USER_AGENT + def analyze_url(self, url: str, **additional_parameters) -> typing.Optional[str]: + response = self.request_with_refresh_expired_access_token(method='POST', + path='/url/', + data=dict(url=url, **additional_parameters)) + self._assert_analysis_response_status_code(response) + + return self._get_analysis_id_from_response(response) + + @staticmethod + def _assert_result_response(ignore_not_found: bool, response: Response): + statuses_to_ignore = [HTTPStatus.NOT_FOUND] if ignore_not_found else None + raise_for_status(response, statuses_to_ignore=statuses_to_ignore) + @staticmethod def _param_initialize(disable_dynamic_unpacking: bool, disable_static_unpacking: bool, @@ -401,6 +429,10 @@ def _assert_analysis_response_status_code(response: Response): raise errors.AnalysisIsAlreadyRunning(response) elif response.status_code == HTTPStatus.FORBIDDEN: raise errors.InsufficientQuota(response) + elif response.status_code == HTTPStatus.BAD_REQUEST: + data = response.json() + error = data.get('error', '') + raise errors.ServerError('Server returned bad request error: {}'.format(error), response) elif response.status_code != HTTPStatus.CREATED: raise errors.ServerError('Error in response status code:{}'.format(response.status_code), response) diff --git a/intezer_sdk/base_analysis.py b/intezer_sdk/base_analysis.py new file mode 100644 index 0000000..8c808e4 --- /dev/null +++ b/intezer_sdk/base_analysis.py @@ -0,0 +1,117 @@ +import abc +import datetime +import time +from http import HTTPStatus +from typing import Any +from typing import Dict +from typing import Optional +from typing import Union + +from requests import Response + +from intezer_sdk import consts +from intezer_sdk import errors +from intezer_sdk.api import IntezerApi +from intezer_sdk.api import get_global_api + + +class BaseAnalysis: + def __init__(self, api: IntezerApi = None): + self.status = None + self.analysis_id = None + self._api = api or get_global_api() + self._report: Optional[Dict[str, Any]] = None + + @abc.abstractmethod + def _query_status_from_api(self) -> Response: + raise NotImplementedError() + + @abc.abstractmethod + def _send_analyze_to_api(self, **additional_parameters) -> str: + raise NotImplementedError() + + def wait_for_completion(self, + interval: int = None, + sleep_before_first_check=False, + timeout: Optional[datetime.timedelta] = None): + """ + Blocks until the analysis is completed + :param interval: The interval to wait between checks + :param sleep_before_first_check: Whether to sleep before the first status check + :param timeout: Maximum duration to wait for analysis completion + """ + start_time = datetime.datetime.utcnow() + if not interval: + interval = consts.CHECK_STATUS_INTERVAL + if self._is_analysis_running(): + if sleep_before_first_check: + time.sleep(interval) + status_code = self.check_status() + + while status_code != consts.AnalysisStatusCode.FINISH: + timeout_passed = timeout and datetime.datetime.utcnow() - start_time > timeout + if timeout_passed: + raise TimeoutError + time.sleep(interval) + status_code = self.check_status() + + def _is_analysis_running(self) -> bool: + return self.status in (consts.AnalysisStatusCode.CREATED, consts.AnalysisStatusCode.IN_PROGRESS) + + def send(self, + wait: Union[bool, int] = False, + wait_timeout: Optional[datetime.timedelta] = None, + **additional_parameters) -> None: + if self.analysis_id: + raise errors.AnalysisHasAlreadyBeenSent() + + self.analysis_id = self._send_analyze_to_api(**additional_parameters) + + self.status = consts.AnalysisStatusCode.CREATED + + if wait: + if isinstance(wait, int): + self.wait_for_completion(wait, sleep_before_first_check=True, timeout=wait_timeout) + else: + self.wait_for_completion(sleep_before_first_check=True, timeout=wait_timeout) + + def check_status(self) -> consts.AnalysisStatusCode: + if not self._is_analysis_running(): + raise errors.IntezerError('Analysis is not running') + + response = self._query_status_from_api() + if response.status_code == HTTPStatus.OK: + result = response.json() + if result['status'] == consts.AnalysisStatusCode.FAILED.value: + self.status = consts.AnalysisStatusCode.FAILED + raise errors.IntezerError('Analysis failed') + self._report = result['result'] + self.status = consts.AnalysisStatusCode.FINISH + elif response.status_code == HTTPStatus.ACCEPTED: + self.status = consts.AnalysisStatusCode.IN_PROGRESS + else: + raise errors.IntezerError('Error in response status code:{}'.format(response.status_code)) + + return self.status + + def result(self) -> dict: + if self._is_analysis_running(): + raise errors.AnalysisIsStillRunning() + if not self._report: + raise errors.ReportDoesNotExistError() + + return self._report + + def set_report(self, report: dict): + if not report: + raise ValueError('Report can not be None') + + self.analysis_id = report['analysis_id'] + self._report = report + self.status = consts.AnalysisStatusCode.FINISH + + def _assert_analysis_finished(self): + if self._is_analysis_running(): + raise errors.AnalysisIsStillRunning() + if self.status != consts.AnalysisStatusCode.FINISH: + raise errors.IntezerError('Analysis not finished successfully') diff --git a/intezer_sdk/consts.py b/intezer_sdk/consts.py index 2d4729b..26b28de 100644 --- a/intezer_sdk/consts.py +++ b/intezer_sdk/consts.py @@ -7,6 +7,7 @@ class AnalysisStatusCode(Enum): CREATED = 'created' IN_PROGRESS = 'in_progress' QUEUED = 'queued' + FAILED = 'failed' FINISH = 'finished' @@ -36,7 +37,7 @@ class CodeItemType(Enum): ANALYZE_URL = 'https://analyze.intezer.com' -BASE_URL = f'{ANALYZE_URL}/api/' +BASE_URL = '{}/api/'.format(ANALYZE_URL) API_VERSION = 'v2-0' USER_AGENT = 'intezer-python-sdk-{}'.format(__version__) CHECK_STATUS_INTERVAL = 1 diff --git a/intezer_sdk/errors.py b/intezer_sdk/errors.py index db29337..f75a156 100644 --- a/intezer_sdk/errors.py +++ b/intezer_sdk/errors.py @@ -67,6 +67,11 @@ def __init__(self): super().__init__('Analysis is still running') +class AnalysisFailedError(IntezerError): + def __init__(self): + super().__init__('Analysis failed') + + class InvalidApiKey(ServerError): def __init__(self, response: requests.Response): super().__init__('Invalid api key', response) diff --git a/intezer_sdk/util.py b/intezer_sdk/util.py index c07a5da..cdeb889 100644 --- a/intezer_sdk/util.py +++ b/intezer_sdk/util.py @@ -4,7 +4,7 @@ from typing import Optional from typing import Tuple -from intezer_sdk.analysis import Analysis +from intezer_sdk.analysis import FileAnalysis from intezer_sdk.consts import ANALYZE_URL emojis_by_key = { @@ -18,12 +18,15 @@ def _get_title(short: bool) -> str: if short: - return 'Intezer Analysis: \n' - return (f'Intezer Analysis\n' - f'=========================\n\n') + return 'Intezer FileAnalysis: \n' + return ('Intezer FileAnalysis\n' + '=========================\n\n') -def get_analysis_summary(analysis: Analysis, no_emojis: bool = False, short: bool = False, use_hash_link=False) -> str: +def get_analysis_summary(analysis: FileAnalysis, + no_emojis: bool = False, + short: bool = False, + use_hash_link=False) -> str: result = analysis.result() metadata = analysis.get_root_analysis().metadata @@ -103,7 +106,8 @@ def get_analysis_summary(analysis: Analysis, no_emojis: bool = False, short: boo return note -def get_analysis_family(analysis: Analysis, software_type_priorities: List[str]) -> Tuple[Optional[str], Optional[int]]: +def get_analysis_family(analysis: FileAnalysis, + software_type_priorities: List[str]) -> Tuple[Optional[str], Optional[int]]: result = analysis.result() family_name = result.get('family_name') if family_name: @@ -119,7 +123,7 @@ def get_analysis_family(analysis: Analysis, software_type_priorities: List[str]) return None, None -def get_analysis_family_by_family_id(analysis: Analysis, family_id: str) -> int: +def get_analysis_family_by_family_id(analysis: FileAnalysis, family_id: str) -> int: reused_gene_count = 0 for sub_analysis in itertools.chain([analysis.get_root_analysis()], analysis.get_sub_analyses()): @@ -135,7 +139,7 @@ def get_analysis_family_by_family_id(analysis: Analysis, family_id: str) -> int: return reused_gene_count -def find_largest_family(analysis: Analysis) -> dict: +def find_largest_family(analysis: FileAnalysis) -> dict: largest_family_by_software_type = collections.defaultdict(lambda: {'reused_gene_count': 0}) for sub_analysis in itertools.chain([analysis.get_root_analysis()], analysis.get_sub_analyses()): if not sub_analysis.code_reuse: diff --git a/setup.py b/setup.py index d5b72b8..c3d3179 100644 --- a/setup.py +++ b/setup.py @@ -40,11 +40,11 @@ def rel(*xs): 'responses == 0.16.0', 'pytest == 6.2.5' ], - python_requires='!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*', + python_requires='!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*', classifiers=[ - 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9'] + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10'] ) diff --git a/tests/unit/base_test.py b/tests/unit/base_test.py index 0db19df..83515af 100644 --- a/tests/unit/base_test.py +++ b/tests/unit/base_test.py @@ -1,16 +1,22 @@ -import sys import unittest +import responses + from intezer_sdk import consts +from intezer_sdk.api import get_global_api +from intezer_sdk.api import set_global_api class BaseTest(unittest.TestCase): def setUp(self): self.full_url = consts.BASE_URL + consts.API_VERSION consts.CHECK_STATUS_INTERVAL = 0 + self.patch_prop = 'builtins.open' - # Python 2 support - if sys.version_info[0] < 3: - self.patch_prop = '__builtin__.open' - else: - self.patch_prop = 'builtins.open' + with responses.RequestsMock() as mock: + mock.add('POST', + url=self.full_url + '/get-access-token', + status=200, + json={'result': 'access-token'}) + set_global_api() + get_global_api().set_session() diff --git a/tests/unit/test_analysis.py b/tests/unit/test_analysis.py index 4f12350..8950566 100644 --- a/tests/unit/test_analysis.py +++ b/tests/unit/test_analysis.py @@ -1,5 +1,6 @@ import datetime import json +import uuid from http import HTTPStatus from unittest.mock import mock_open from unittest.mock import patch @@ -9,28 +10,17 @@ from intezer_sdk import consts from intezer_sdk import errors -from intezer_sdk.analysis import Analysis -from intezer_sdk.analysis import get_analysis_by_id +from intezer_sdk.analysis import FileAnalysis +from intezer_sdk.analysis import UrlAnalysis +from intezer_sdk.analysis import get_file_analysis_by_id from intezer_sdk.analysis import get_latest_analysis -from intezer_sdk.api import get_global_api -from intezer_sdk.api import set_global_api +from intezer_sdk.analysis import get_url_analysis_by_id from intezer_sdk.consts import AnalysisStatusCode from intezer_sdk.sub_analysis import SubAnalysis from tests.unit.base_test import BaseTest -class AnalysisSpec(BaseTest): - def setUp(self): - super(AnalysisSpec, self).setUp() - - with responses.RequestsMock() as mock: - mock.add('POST', - url=self.full_url + '/get-access-token', - status=200, - json={'result': 'access-token'}) - set_global_api() - get_global_api().set_session() - +class FileAnalysisSpec(BaseTest): def test_send_analysis_by_sha256_sent_analysis_and_sets_status(self): # Arrange with responses.RequestsMock() as mock: @@ -38,7 +28,7 @@ def test_send_analysis_by_sha256_sent_analysis_and_sets_status(self): url=self.full_url + '/analyze-by-hash', status=201, json={'result_url': 'a/sd/asd'}) - analysis = Analysis(file_hash='a' * 64) + analysis = FileAnalysis(file_hash='a' * 64) # Act analysis.send() @@ -53,7 +43,7 @@ def test_send_analysis_by_file_sent_analysis_and_sets_status(self): url=self.full_url + '/analyze', status=201, json={'result_url': 'a/sd/asd'}) - analysis = Analysis(file_path='a') + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act @@ -65,7 +55,7 @@ def test_send_analysis_by_file_sent_analysis_and_sets_status(self): def test_analysis_by_file_wrong_code_item_type(self): # Act + Assert with self.assertRaises(ValueError): - Analysis(file_path='a', code_item_type='anderson_paak') + FileAnalysis(file_path='a', code_item_type='anderson_paak') def test_analysis_by_file_correct_code_item_type(self): # Arrange @@ -74,8 +64,8 @@ def test_analysis_by_file_correct_code_item_type(self): url=self.full_url + '/analyze', status=201, json={'result_url': 'a/sd/asd'}) - analysis = Analysis(file_path='a', - code_item_type='memory_module') + analysis = FileAnalysis(file_path='a', + code_item_type='memory_module') with patch(self.patch_prop, mock_open(read_data='data')): # Act @@ -91,7 +81,7 @@ def test_send_analysis_by_file_with_file_stream_sent_analysis(self): url=self.full_url + '/analyze', status=201, json={'result_url': 'a/sd/asd'}) - analysis = Analysis(file_stream=__file__) + analysis = FileAnalysis(file_stream=__file__) # Act analysis.send() @@ -109,8 +99,8 @@ def test_send_analysis_by_file_sends_analysis_with_waits_to_compilation_when_req mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) - analysis = Analysis(file_path='a') + json={'result': 'report', 'status': 'succeeded'}) + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act @@ -129,8 +119,8 @@ def test_send_analysis_by_file_sends_analysis_and_waits_specific_time_until_comp mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) - analysis = Analysis(file_path='a') + json={'result': 'report', 'status': 'succeeded'}) + analysis = FileAnalysis(file_path='a') wait = 1 with patch(self.patch_prop, mock_open(read_data='data')): @@ -153,8 +143,8 @@ def test_send_analysis_by_file_sent_analysis_without_wait_and_get_status_finish( mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) - analysis = Analysis(file_path='a') + json={'result': 'report', 'status': 'succeeded'}) + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act @@ -180,8 +170,8 @@ def test_send_analysis_by_file_sent_analysis_with_pulling_and_get_status_finish( mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) - analysis = Analysis(file_path='a') + json={'result': 'report', 'status': 'succeeded'}) + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act @@ -203,8 +193,8 @@ def test_send_analysis_by_file_and_get_report(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) - analysis = Analysis(file_path='a') + json={'result': 'report', 'status': 'succeeded'}) + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act analysis.send(wait=True) @@ -223,12 +213,12 @@ def test_send_analysis_by_file_and_get_iocs(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) + json={'result': 'report', 'status': 'succeeded'}) mock.add('GET', url=self.full_url + '/analyses/asd/iocs', status=200, json={'result': 'ioc_report'}) - analysis = Analysis(file_path='a') + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act analysis.send(wait=True) @@ -248,12 +238,12 @@ def test_send_analysis_by_file_and_get_dynamic_ttps(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) + json={'result': 'report', 'status': 'succeeded'}) mock.add('GET', url=self.full_url + '/analyses/asd/dynamic-ttps', status=200, json={'result': 'ttps_report'}) - analysis = Analysis(file_path='a') + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act analysis.send(wait=True) @@ -273,11 +263,11 @@ def test_send_analysis_by_file_and_get_dynamic_ttps_handle_no_ttps(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) + json={'result': 'report', 'status': 'succeeded'}) mock.add('GET', url=self.full_url + '/analyses/asd/dynamic-ttps', status=404) - analysis = Analysis(file_path='a') + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act analysis.send(wait=True) @@ -293,11 +283,11 @@ def test_send_analysis_by_file_and_get_dynamic_ttps_handle_no_ttps2(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) + json={'result': 'report', 'status': 'succeeded'}) mock.add('GET', url=self.full_url + '/analyses/asd/dynamic-ttps', status=405) - analysis = Analysis(file_path='a') + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act analysis.send(wait=True) @@ -314,12 +304,12 @@ def test_send_analysis_by_file_and_get_dynamic_ttps_handle_no_iocs(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) + json={'result': 'report', 'status': 'succeeded'}) mock.add('GET', url=self.full_url + '/analyses/asd/iocs', status=405, json={'result': 'ioc_report'}) - analysis = Analysis(file_path='a') + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act analysis.send(wait=True) @@ -336,18 +326,17 @@ def test_send_analysis_by_file_and_get_dynamic_ttps_handle_no_iocs2(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) + json={'result': 'report', 'status': 'succeeded'}) mock.add('GET', url=self.full_url + '/analyses/asd/iocs', status=404, json={'result': 'ioc_report'}) - analysis = Analysis(file_path='a') + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act analysis.send(wait=True) self.assertIsNone(analysis.iocs) - def test_send_analysis_by_file_with_disable_unpacking(self): # Arrange with responses.RequestsMock() as mock: @@ -358,10 +347,10 @@ def test_send_analysis_by_file_with_disable_unpacking(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) - analysis = Analysis(file_path='a', - disable_dynamic_unpacking=True, - disable_static_unpacking=True) + json={'result': 'report', 'status': 'succeeded'}) + analysis = FileAnalysis(file_path='a', + disable_dynamic_unpacking=True, + disable_static_unpacking=True) with patch(self.patch_prop, mock_open(read_data='data')): # Act analysis.send(wait=True) @@ -385,10 +374,10 @@ def test_send_analysis_by_file_with_zip_password(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) - analysis = Analysis(file_path='a', - file_name='b.zip', - zip_password='asd') + json={'result': 'report', 'status': 'succeeded'}) + analysis = FileAnalysis(file_path='a', + file_name='b.zip', + zip_password='asd') with patch(self.patch_prop, mock_open(read_data='data')): # Act @@ -413,9 +402,9 @@ def test_send_analysis_by_file_with_zip_password_set_filename_to_generic_one(sel mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) - analysis = Analysis(file_stream=__file__, - zip_password='asd') + json={'result': 'report', 'status': 'succeeded'}) + analysis = FileAnalysis(file_stream=__file__, + zip_password='asd') with patch(self.patch_prop, mock_open(read_data='data')): # Act @@ -440,9 +429,9 @@ def test_send_analysis_by_file_with_zip_password_adds_zip_extension(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) - analysis = Analysis(file_path='a', - zip_password='asd') + json={'result': 'report', 'status': 'succeeded'}) + analysis = FileAnalysis(file_path='a', + zip_password='asd') with patch(self.patch_prop, mock_open(read_data='data')): # Act @@ -463,16 +452,16 @@ def test_send_analysis_by_sha256_that_dont_exist_raise_error(self): mock.add('POST', url=self.full_url + '/analyze-by-hash', status=404) - analysis = Analysis(file_hash='a' * 64) + analysis = FileAnalysis(file_hash='a' * 64) # Act + Assert with self.assertRaises(errors.HashDoesNotExistError): analysis.send() def test_send_analysis_by_sha256_with_expired_jwt_token_gets_new_token(self): # Arrange - analysis = Analysis(file_hash='a' * 64) + analysis = FileAnalysis(file_hash='a' * 64) - # Analysis attempt will initiate an access-token refresh by getting UNAUTHORIZED 401 + # FileAnalysis attempt will initiate an access-token refresh by getting UNAUTHORIZED 401 with responses.RequestsMock() as mock: def request_callback(request): if request.headers['Authorization'] == 'Bearer newer-access-token': @@ -501,7 +490,7 @@ def test_send_analysis_by_sha256_with_expired_jwt_token_doesnt_loop_indefinitley status=HTTPStatus.OK, json={'result': 'newer-access-token'}) - analysis = Analysis(file_hash='a' * 64) + analysis = FileAnalysis(file_hash='a' * 64) # Act & Assert with self.assertRaises(errors.IntezerError): @@ -517,7 +506,7 @@ def test_send_analysis_while_running_raise_error(self): url=self.full_url + '/analyze-by-hash', status=201, json={'result_url': 'a/sd/asd'}) - analysis = Analysis(file_hash='a' * 64) + analysis = FileAnalysis(file_hash='a' * 64) # Act + Assert with self.assertRaises(errors.AnalysisHasAlreadyBeenSent): analysis.send() @@ -536,7 +525,7 @@ def test_send_analysis_and_get_sub_analyses(self): json={'sub_analyses': [{'source': 'root', 'sub_analysis_id': 'ab', 'sha256': 'axaxaxax'}, {'source': 'static_extraction', 'sub_analysis_id': 'ac', 'sha256': 'ba'}]}) - analysis = Analysis(file_hash='a' * 64) + analysis = FileAnalysis(file_hash='a' * 64) # Act analysis.send() @@ -561,7 +550,7 @@ def test_send_analysis_and_get_root_analyses(self): json={'sub_analyses': [{'source': 'root', 'sub_analysis_id': 'ab', 'sha256': 'axaxaxax'}, {'source': 'static_extraction', 'sub_analysis_id': 'ac', 'sha256': 'ba'}]}) - analysis = Analysis(file_hash='a' * 64) + analysis = FileAnalysis(file_hash='a' * 64) # Act analysis.send() @@ -598,7 +587,7 @@ def test_send_analysis_and_sub_analyses_metadata_and_code_reuse(self): url=self.full_url + '/analyses/asd/sub-analyses/ac/metadata', status=200, json={}) - analysis = Analysis(file_hash='a' * 64) + analysis = FileAnalysis(file_hash='a' * 64) # Act analysis.send() @@ -688,7 +677,7 @@ def test_send_analysis_that_running_on_server_raise_error(self): url=self.full_url + '/analyze-by-hash', status=409, json={'result_url': 'a/sd/asd'}) - analysis = Analysis(file_hash='a' * 64) + analysis = FileAnalysis(file_hash='a' * 64) # Act + Assert with self.assertRaises(errors.AnalysisIsAlreadyRunning): analysis.send() @@ -696,33 +685,33 @@ def test_send_analysis_that_running_on_server_raise_error(self): def test_analysis_raise_value_error_when_no_file_option_given(self): # Assert with self.assertRaises(ValueError): - Analysis() + FileAnalysis() def test_analysis_by_sha256_and_file_sent_analysis_and_raise_value_error(self): # Assert with self.assertRaises(ValueError): - Analysis(file_hash='a', file_path='/test/test') + FileAnalysis(file_hash='a', file_path='/test/test') def test_analysis_by_sha256_raise_value_error_when_file_path_and_file_stream_given(self): # Assert with self.assertRaises(ValueError): - Analysis(file_stream=__file__, file_path='/test/test') + FileAnalysis(file_stream=__file__, file_path='/test/test') def test_analysis_by_sha256_raise_value_error_when_sha256_file_path_and_file_stream_given(self): # Assert with self.assertRaises(ValueError): - Analysis(file_hash='a', file_stream=__file__, file_path='/test/test') + FileAnalysis(file_hash='a', file_stream=__file__, file_path='/test/test') def test_analysis_get_report_for_not_finish_analyze_raise_error(self): # Arrange - analysis = Analysis(file_hash='a') + analysis = FileAnalysis(file_hash='a') # Act + Assert with self.assertRaises(errors.ReportDoesNotExistError): analysis.result() def test_analysis_check_status_before_send_raise_error(self): # Arrange - analysis = Analysis(file_hash='a') + analysis = FileAnalysis(file_hash='a') # Act + Assert with self.assertRaises(errors.IntezerError): @@ -738,8 +727,8 @@ def test_analysis_check_status_after_analysis_finish_raise_error(self): mock.add('GET', url=self.full_url + '/analyses/asd', status=200, - json={'result': 'report'}) - analysis = Analysis(file_path='a') + json={'result': 'report', 'status': 'succeeded'}) + analysis = FileAnalysis(file_path='a') with patch(self.patch_prop, mock_open(read_data='data')): # Act @@ -793,7 +782,7 @@ def test_get_analysis_by_id_analysis_object_when_latest_analysis_found(self): json={'result': analysis_report, 'status': 'succeeded'}) # Act - analysis = get_analysis_by_id(analysis_id) + analysis = get_file_analysis_by_id(analysis_id) self.assertIsNotNone(analysis) self.assertEqual(analysis_id, analysis.analysis_id) @@ -803,7 +792,6 @@ def test_get_analysis_by_id_analysis_object_when_latest_analysis_found(self): def test_get_analysis_by_id_raises_when_analysis_is_not_finished(self): # Arrange analysis_id = 'analysis_id' - analysis_report = {'analysis_id': analysis_id, 'sha256': 'hash'} with responses.RequestsMock() as mock: mock.add('GET', @@ -813,12 +801,11 @@ def test_get_analysis_by_id_raises_when_analysis_is_not_finished(self): # Act with self.assertRaises(errors.AnalysisIsStillRunning): - _ = get_analysis_by_id(analysis_id) + get_file_analysis_by_id(analysis_id) def test_get_analysis_by_id_raises_when_analysis_is_queued(self): # Arrange analysis_id = 'analysis_id' - analysis_report = {'analysis_id': analysis_id, 'sha256': 'hash'} with responses.RequestsMock() as mock: mock.add('GET', @@ -828,4 +815,184 @@ def test_get_analysis_by_id_raises_when_analysis_is_queued(self): # Act with self.assertRaises(errors.AnalysisIsStillRunning): - analysis = get_analysis_by_id(analysis_id) + get_file_analysis_by_id(analysis_id) + + +class UrlAnalysisSpec(BaseTest): + def test_get_analysis_by_id_analysis_object_when_latest_analysis_found(self): + # Arrange + analysis_id = 'analysis_id' + analysis_report = {'analysis_id': analysis_id, 'submitted_url': 'https://url.com'} + + with responses.RequestsMock() as mock: + mock.add('GET', + url='{}/url/{}'.format(self.full_url, analysis_id), + status=200, + json={'result': analysis_report, 'status': 'succeeded'}) + + # Act + analysis = get_url_analysis_by_id(analysis_id) + + self.assertIsNotNone(analysis) + self.assertEqual(analysis_id, analysis.analysis_id) + self.assertEqual(consts.AnalysisStatusCode.FINISH, analysis.status) + self.assertDictEqual(analysis_report, analysis.result()) + + def test_get_analysis_by_id_raises_when_analysis_is_not_finished(self): + # Arrange + analysis_id = 'analysis_id' + + with responses.RequestsMock() as mock: + mock.add('GET', + url='{}/url/{}'.format(self.full_url, analysis_id), + status=202, + json={'status': AnalysisStatusCode.IN_PROGRESS.value}) + + # Act + with self.assertRaises(errors.AnalysisIsStillRunning): + get_url_analysis_by_id(analysis_id) + + def test_get_analysis_by_id_raises_when_analysis_failed(self): + # Arrange + analysis_id = 'analysis_id' + + with responses.RequestsMock() as mock: + mock.add('GET', + url='{}/url/{}'.format(self.full_url, analysis_id), + status=200, + json={'status': AnalysisStatusCode.FAILED.value}) + + # Act + with self.assertRaises(errors.AnalysisFailedError): + get_url_analysis_by_id(analysis_id) + + def test_send_perform_request_and_sets_analysis_status(self): + # Arrange + analysis_id = str(uuid.uuid4()) + with responses.RequestsMock() as mock: + mock.add('POST', + url=self.full_url + '/url/', + status=201, + json={'result_url': '/url/{}'.format(analysis_id)}) + analysis = UrlAnalysis(url='https://intezer.com') + + # Act + analysis.send() + + # Assert + self.assertEqual(analysis.status, consts.AnalysisStatusCode.CREATED) + self.assertEqual(analysis_id, analysis.analysis_id) + + def test_send_fail_when_invalid_url(self): + # Arrange + with responses.RequestsMock() as mock: + mock.add('POST', + url=self.full_url + '/url/', + status=400, + json={'error': 'Some error description'}) + analysis = UrlAnalysis(url='httpdddds://intezer.com') + + # Act + with self.assertRaises(errors.ServerError): + analysis.send() + + def test_send_waits_to_compilation_when_requested(self): + # Arrange + analysis_id = str(uuid.uuid4()) + + with responses.RequestsMock() as mock: + mock.add('POST', + url=self.full_url + '/url/', + status=201, + json={'result_url': '/url/{}'.format(analysis_id)}) + result = {'analysis_id': analysis_id} + mock.add('GET', + url='{}/url/{}'.format(self.full_url, analysis_id), + status=200, + json={'result': result, 'status': 'succeeded'}) + analysis = UrlAnalysis('https://intezer.com') + + # Act + analysis.send(wait=True) + + # Assert + self.assertEqual(analysis.status, consts.AnalysisStatusCode.FINISH) + self.assertDictEqual(analysis.result(), result) + + def test_send_waits_to_compilation_when_requested_and_handles_failure(self): + # Arrange + analysis_id = str(uuid.uuid4()) + + with responses.RequestsMock() as mock: + mock.add('POST', + url=self.full_url + '/url/', + status=201, + json={'result_url': '/url/{}'.format(analysis_id)}) + result = {'analysis_id': analysis_id} + mock.add('GET', + url='{}/url/{}'.format(self.full_url, analysis_id), + status=200, + json={'result': result, 'status': 'failed'}) + analysis = UrlAnalysis('https://intezer.com') + + with self.assertRaises(errors.IntezerError): + # Act + analysis.send(wait=True) + + # Assert + self.assertEqual(consts.AnalysisStatusCode.FAILED, analysis.status) + + def test_url_analysis_references_file_analysis(self): + # Arrange + url_analysis_id = str(uuid.uuid4()) + file_analysis_id = str(uuid.uuid4()) + url_result = {'analysis_id': url_analysis_id, + 'downloaded_file': { + 'analysis_id': file_analysis_id + }} + file_analysis_report = {'analysis_id': file_analysis_id, 'sha256': 'hash'} + + with responses.RequestsMock() as mock: + mock.add('POST', + url=self.full_url + '/url/', + status=201, + json={'result_url': '/url/{}'.format(url_analysis_id)}) + mock.add('GET', + url='{}/url/{}'.format(self.full_url, url_analysis_id), + status=200, + json={'result': url_result, 'status': 'succeeded'}) + mock.add('GET', + url='{}/analyses/{}'.format(self.full_url, file_analysis_id), + status=200, + json={'result': file_analysis_report, 'status': 'succeeded'}) + analysis = UrlAnalysis('https://intezer.com') + + # Act + analysis.send(wait=True) + file_analysis = analysis.downloaded_file_analysis + + # Assert + self.assertEqual(file_analysis.analysis_id, file_analysis_id) + self.assertEqual(file_analysis.result(), file_analysis_report) + + def test_url_analysis_doesnt_reference_file_analysis_when_not_exists(self): + # Arrange + analysis_id = str(uuid.uuid4()) + result = {'analysis_id': analysis_id} + + with responses.RequestsMock() as mock: + mock.add('POST', + url=self.full_url + '/url/', + status=201, + json={'result_url': '/url/{}'.format(analysis_id)}) + mock.add('GET', + url='{}/url/{}'.format(self.full_url, analysis_id), + status=200, + json={'result': result, 'status': 'succeeded'}) + analysis = UrlAnalysis('https://intezer.com') + + # Act + analysis.send(wait=True) + + # Assert + self.assertIsNone(analysis.downloaded_file_analysis) diff --git a/tests/unit/test_family.py b/tests/unit/test_family.py index d386332..c367374 100644 --- a/tests/unit/test_family.py +++ b/tests/unit/test_family.py @@ -4,24 +4,12 @@ import responses from intezer_sdk import errors -from intezer_sdk.api import get_global_api -from intezer_sdk.api import set_global_api from intezer_sdk.family import Family from intezer_sdk.family import get_family_by_name from tests.unit.base_test import BaseTest class FamilySpec(BaseTest): - def setUp(self): - super(FamilySpec, self).setUp() - - with responses.RequestsMock() as mock: - mock.add('POST', - url=self.full_url + '/get-access-token', - status=200, - json={'result': 'access-token'}) - set_global_api() - get_global_api().set_session() def test_access_to_family_name_fetches_the_data_from_cloud(self): # Arrange diff --git a/tests/unit/test_index.py b/tests/unit/test_index.py index d97e08a..c74edc5 100644 --- a/tests/unit/test_index.py +++ b/tests/unit/test_index.py @@ -1,34 +1,16 @@ import datetime +from unittest.mock import mock_open +from unittest.mock import patch import responses from intezer_sdk import consts from intezer_sdk import errors -from intezer_sdk.api import get_global_api -from intezer_sdk.api import set_global_api from intezer_sdk.index import Index from tests.unit.base_test import BaseTest -try: - from unittest.mock import mock_open - from unittest.mock import patch -except ImportError: - from mock import mock_open - from mock import patch - class IndexSpec(BaseTest): - def setUp(self): - super(IndexSpec, self).setUp() - - with responses.RequestsMock() as mock: - mock.add('POST', - url=self.full_url + '/get-access-token', - status=200, - json={'result': 'access-token'}) - set_global_api() - get_global_api().set_session() - def test_index_malicious_without_family_name_raise_value_error(self): # Act + Assert with self.assertRaises(ValueError):