Skip to content

Commit

Permalink
Fix/get note to sdk (#24)
Browse files Browse the repository at this point in the history
* Add analysis summary
* Change S1 script
  • Loading branch information
matany90 authored Jan 3, 2022
1 parent 1a43607 commit d6487d8
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 116 deletions.
5 changes: 5 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
1.6
------
- Add analysis summary utility function
- Handle no ttps correctly

1.5
------
- Add family search
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ You can find more code examples under [analyze-python-sdk/examples/](https://git

## Changelog

### 1.6
- Feat: Add analysis summary utility function
- Fix: Handle no ttps correctly

### 1.5
- Feat: Add family search
- Feat: Support for zip password
- Feat: Add iocs and dynamic ttps to analysis
- Feat: Add capabilities to sub analysis

### 1.4.5
- Feat: Add a timeout option when waiting for operation completion

Expand Down
113 changes: 9 additions & 104 deletions examples/sentinel_one_integration.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
#!/usr/bin/env python3

import argparse
import collections
import datetime
import io
import logging
import secrets
import time
import urllib.parse
from http import HTTPStatus
from typing import List
from typing import Optional
from typing import Tuple

Expand All @@ -18,6 +16,7 @@
from intezer_sdk import api
from intezer_sdk import errors
from intezer_sdk.analysis import Analysis
from intezer_sdk.util import get_analysis_summary

_s1_session: Optional[requests.Session] = None
_logger = logging.getLogger('intezer')
Expand Down Expand Up @@ -140,105 +139,8 @@ def filter_threat(threat_info: dict) -> bool:
return threat_info['agentDetectionInfo']['agentOsName'].lower().startswith(('linux', 'windows'))


def get_analysis_family(analysis: Analysis, software_type_priorities: List[str]) -> Tuple[Optional[str], Optional[int]]:
result = analysis.result()
family_name = result.get('family_name')
if family_name:
reused_gene_count = get_analysis_family_by_family_id(analysis, result['family_id'])
return family_name, reused_gene_count

largest_family_by_software_type = find_largest_family(analysis)
for software_type in software_type_priorities:
if software_type in largest_family_by_software_type:
family = largest_family_by_software_type[software_type]
return family['family_name'], family['reused_gene_count']

return None, None


def get_analysis_family_by_family_id(analysis: Analysis, family_id: str) -> Optional[int]:
reused_gene_count = None
for sub_analysis in analysis.get_sub_analyses():
if not sub_analysis.code_reuse:
continue
for family in sub_analysis.code_reuse['families']:
if family['family_id'] == family_id:
reused_gene_count = family['reused_gene_count']
if reused_gene_count:
break
return reused_gene_count


def find_largest_family(analysis: Analysis) -> dict:
largest_family_by_software_type = collections.defaultdict(lambda: {'reused_gene_count': 0})
for sub_analysis in analysis.get_sub_analyses():
for family in sub_analysis.code_reuse['families']:
software_type = family['family_type']
if family['reused_gene_count'] > largest_family_by_software_type[software_type]['reused_gene_count']:
largest_family_by_software_type[software_type] = family
return largest_family_by_software_type


def human_readable_size(num: int) -> str:
for unit in ['', 'KB', 'MB', 'GB']:
if abs(num) < 1024.0:
return f'{num:3.1f}{unit}'
num /= 1024.0
return f'{num:.1f}GB'


def get_note(analysis: Analysis) -> str:
result = analysis.result()
metadata = analysis.get_root_analysis().metadata
verdict = result['verdict'].lower()
sub_verdict = result['sub_verdict'].lower()

note = (f'Intezer Analyze File Scan\n'
f'=========================\n\n')

if verdict == 'malicious':
emoji = '🧨'
main_family, gene_count = get_analysis_family(analysis, ['malware', 'malicious_packer'])
elif verdict == 'trusted':
emoji = '🟢'
main_family, gene_count = get_analysis_family(analysis, ['application', 'library', 'interpreter', 'installer'])
elif verdict == 'suspicious':
emoji = '⚠'
main_family, gene_count = get_analysis_family(analysis, ['administration_tool', 'packer'])
else:
emoji = '❔'
main_family = None
gene_count = None

note = f'{note}{emoji} {verdict.capitalize()}'

if verdict == 'suspicious' or verdict == 'unknown':
note = f'{note} - {sub_verdict.replace("_", " ").title()}'
if main_family:
note = f'{note} - {main_family}'
if gene_count:
note = f'{note} ({gene_count} shared code genes)'
note = f'{note}\n\nSize: {human_readable_size(metadata["size_in_bytes"])}\n'
if 'file_type' in metadata:
note = f'{note}File type: {metadata["file_type"]}\n'
if verdict == 'malicious':
iocs = len(analysis.iocs['files']) + len(analysis.iocs['network']) - 1

if iocs:
note = f'{note}IOCs: {iocs} IOCs\n'
dynamic_ttps = len(analysis.dynamic_ttps)

if dynamic_ttps:
note = f'{note}TTPs: {dynamic_ttps} techniques\n'

note = (f'{note}\nFull report\n'
f'👉{result["analysis_url"]}')

return note


def send_note(threat_id: str, analysis: Analysis):
note = get_note(analysis)
def send_note(threat_id: str, analysis: Analysis, no_emojis: bool):
note = get_analysis_summary(analysis, no_emojis)
response = _s1_session.post('/web/api/v2.1/threats/notes',
json={'data': {'text': note}, 'filter': {'ids': [threat_id]}})
assert_s1_response(response)
Expand All @@ -250,7 +152,7 @@ def send_failure_note(note: str, threat_id: str):
assert_s1_response(response)


def analyze_threat(intezer_api_key: str, s1_api_key: str, s1_base_address: str, threat_id: str, skip_ssl_verification: bool=True):
def analyze_threat(intezer_api_key: str, s1_api_key: str, s1_base_address: str, threat_id: str, skip_ssl_verification: bool=True, no_emojis: bool=False):
api.set_global_api(intezer_api_key)
init_s1_requests_session(s1_api_key, s1_base_address, skip_ssl_verification)
_logger.info(f'incoming threat: {threat_id}')
Expand Down Expand Up @@ -281,7 +183,7 @@ def analyze_threat(intezer_api_key: str, s1_api_key: str, s1_base_address: str,
analysis.wait_for_completion()
_logger.debug('analysis completed')

send_note(threat_id, analysis)
send_note(threat_id, analysis, no_emojis)
except Exception as ex:
send_failure_note(str(ex), threat_id)

Expand All @@ -297,6 +199,7 @@ def parse_argparse_args():
parser.add_argument('-t', '--threat-id', help='S1 threat id', required=True)
parser.add_argument('-sv', '--skip-ssl-verification', action='store_false',
help='Skipping SSL verification on S1 request')
parser.add_argument('-ne', '--no-emojis', action='store_true', help="Don't show emojis")

return parser.parse_args()

Expand All @@ -308,5 +211,7 @@ def parse_argparse_args():
args.s1_api_key,
args.s1_base_address,
args.threat_id,
args.skip_ssl_verification)
args.skip_ssl_verification,
args.no_emojis)


2 changes: 1 addition & 1 deletion intezer_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.5'
__version__ = '1.6'
10 changes: 9 additions & 1 deletion intezer_sdk/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import typing
from http import HTTPStatus

import requests

from intezer_sdk import consts
from intezer_sdk import errors
from intezer_sdk.api import IntezerApi
Expand Down Expand Up @@ -191,7 +193,13 @@ def iocs(self) -> dict:
def dynamic_ttps(self) -> dict:
self._assert_analysis_finished()
if not self._dynamic_ttps_report:
self._dynamic_ttps_report = self._api.get_dynamic_ttps(self.analysis_id).json()['result']
try:
self._dynamic_ttps_report = self._api.get_dynamic_ttps(self.analysis_id).json()['result']
except requests.HTTPError as e:
if e.response.status_code == HTTPStatus.NOT_FOUND:
self._dynamic_ttps_report = None
else:
raise

return self._dynamic_ttps_report

Expand Down
118 changes: 118 additions & 0 deletions intezer_sdk/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import collections
from typing import List
from typing import Optional
from typing import Tuple

from intezer_sdk.analysis import Analysis

emojis_by_key = {
'trusted': '✅',
'malicious': '🧨',
'suspicious': '⚠️',
'unknown': '❔',
'result_url': '👉'
}


def get_analysis_summary(analysis: Analysis, no_emojis: bool=False) -> str:
result = analysis.result()
metadata = analysis.get_root_analysis().metadata
verdict = result['verdict'].lower()
sub_verdict = result['sub_verdict'].lower()
emoji = ''

note = (f'Intezer Analyze File Scan\n'
f'=========================\n\n')

if not no_emojis:
emoji = get_emoji(verdict)

print(emoji)
if verdict == 'malicious':
main_family, gene_count = get_analysis_family(analysis, ['malware', 'malicious_packer'])
elif verdict == 'trusted':
main_family, gene_count = get_analysis_family(analysis, ['application', 'library', 'interpreter', 'installer'])
elif verdict == 'suspicious':
main_family, gene_count = get_analysis_family(analysis, ['administration_tool', 'packer'])
else:
main_family = None
gene_count = None

note = f'{note}{emoji} {verdict.capitalize()}'

if verdict in ('suspicious', 'unknown'):
note = f'{note} - {sub_verdict.replace("_", " ").title()}'
if main_family:
note = f'{note} - {main_family}'
if gene_count:
note = f'{note} ({gene_count} shared code genes)'
note = f'{note}\n\nSize: {human_readable_size(metadata["size_in_bytes"])}\n'
if 'file_type' in metadata:
note = f'{note}File type: {metadata["file_type"]}\n'

if verdict == 'malicious' or verdict == 'suspicious':
iocs = len(analysis.iocs['files']) + len(analysis.iocs['network']) - 1

if iocs:
note = f'{note}IOCs: {iocs} IOCs\n'

if analysis.dynamic_ttps:
note = f'{note}TTPs: {len(analysis.dynamic_ttps)} techniques\n'

note = (f'{note}\nFull report:\n'
f'{"" if no_emojis else get_emoji("result_url")} {result["analysis_url"]}')

return note


def get_analysis_family(analysis: Analysis, software_type_priorities: List[str]) -> Tuple[Optional[str], Optional[int]]:
result = analysis.result()
family_name = result.get('family_name')
if family_name:
reused_gene_count = get_analysis_family_by_family_id(analysis, result['family_id'])
return family_name, reused_gene_count

largest_family_by_software_type = find_largest_family(analysis)
for software_type in software_type_priorities:
if software_type in largest_family_by_software_type:
family = largest_family_by_software_type[software_type]
return family['family_name'], family['reused_gene_count']

return None, None


def get_analysis_family_by_family_id(analysis: Analysis, family_id: str) -> int:
reused_gene_count = 0

for sub_analysis in analysis.get_sub_analyses():
if not sub_analysis.code_reuse:
continue

for family in sub_analysis.code_reuse['families']:
if family['family_id'] == family_id:
if family['reused_gene_count'] > reused_gene_count:
reused_gene_count = family['reused_gene_count']

return reused_gene_count


def find_largest_family(analysis: Analysis) -> dict:
largest_family_by_software_type = collections.defaultdict(lambda: {'reused_gene_count': 0})
for sub_analysis in analysis.get_sub_analyses():
for family in sub_analysis.code_reuse['families']:
software_type = family['family_type']
if family['reused_gene_count'] > largest_family_by_software_type[software_type]['reused_gene_count']:
largest_family_by_software_type[software_type] = family
return largest_family_by_software_type


def human_readable_size(num: int) -> str:
for unit in ['', 'KB', 'MB', 'GB']:
if abs(num) < 1024.0:
return f'{num:3.1f}{unit}'
num /= 1024.0
return f'{num:.1f}GB'


def get_emoji(key: str):
return emojis_by_key[key]
Loading

0 comments on commit d6487d8

Please sign in to comment.