-
Notifications
You must be signed in to change notification settings - Fork 225
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: converted hash plugin to new base class
also moved methods from helperFunctions.hash that were used exclusively in the plugin into the plugin
- Loading branch information
Showing
5 changed files
with
163 additions
and
174 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,108 +1,39 @@ | ||
from __future__ import annotations | ||
|
||
import contextlib | ||
import logging | ||
import sys | ||
from hashlib import md5, new | ||
from typing import TYPE_CHECKING | ||
from hashlib import new | ||
|
||
import lief | ||
import ssdeep | ||
import tlsh | ||
|
||
from helperFunctions.data_conversion import make_bytes | ||
|
||
if TYPE_CHECKING: | ||
from objects.file import FileObject | ||
|
||
ELF_MIME_TYPES = [ | ||
'application/x-executable', | ||
'application/x-object', | ||
'application/x-pie-executable', | ||
'application/x-sharedlib', | ||
] | ||
|
||
|
||
def get_hash(hash_function, binary): | ||
def get_hash(hash_function: str, binary: bytes | str) -> str: | ||
""" | ||
Hashes binary with hash_function. | ||
:param hash_function: The hash function to use. See hashlib for more | ||
:param binary: The data to hash, either as string or array of Integers | ||
:return: The hash as hexstring | ||
:return: The hash as hex string | ||
""" | ||
binary = make_bytes(binary) | ||
raw_hash = new(hash_function) | ||
raw_hash.update(binary) | ||
raw_hash.update(make_bytes(binary)) | ||
return raw_hash.hexdigest() | ||
|
||
|
||
def get_sha256(code): | ||
def get_sha256(code: bytes | str) -> str: | ||
return get_hash('sha256', code) | ||
|
||
|
||
def get_md5(code): | ||
def get_md5(code: bytes | str) -> str: | ||
return get_hash('md5', code) | ||
|
||
|
||
def get_ssdeep(code): | ||
binary = make_bytes(code) | ||
raw_hash = ssdeep.Hash() | ||
raw_hash.update(binary) | ||
return raw_hash.digest() | ||
|
||
|
||
def get_tlsh(code): | ||
tlsh_hash = tlsh.hash(make_bytes(code)) | ||
return tlsh_hash if tlsh_hash != 'TNULL' else '' | ||
|
||
|
||
def get_tlsh_comparison(first, second): | ||
return tlsh.diff(first, second) | ||
|
||
|
||
def get_imphash(file_object: FileObject) -> str | None: | ||
""" | ||
Generates and returns the md5 hash of the (sorted) imported functions of an ELF file represented by `file_object`. | ||
Returns `None` if there are no imports or if an exception occurs. | ||
:param file_object: The FileObject of which the imphash shall be computed | ||
""" | ||
if _is_elf_file(file_object): | ||
try: | ||
with _suppress_stdout(): | ||
functions = [f.name for f in lief.ELF.parse(file_object.file_path).imported_functions] | ||
if functions: | ||
return md5(','.join(sorted(functions)).encode()).hexdigest() | ||
except Exception: | ||
logging.exception(f'Could not compute imphash for {file_object.file_path}') | ||
return None | ||
|
||
|
||
def _is_elf_file(file_object: FileObject) -> bool: | ||
return file_object.processed_analysis['file_type']['result']['mime'] in ELF_MIME_TYPES | ||
|
||
|
||
def normalize_lief_items(functions): | ||
""" | ||
Shorthand to convert a list of objects to a list of strings | ||
""" | ||
return [str(function) for function in functions] | ||
|
||
|
||
class _StandardOutWriter: | ||
def write(self, _): | ||
pass | ||
|
||
|
||
@contextlib.contextmanager | ||
def _suppress_stdout(): | ||
"""A context manager that suppresses any output to stdout and stderr.""" | ||
writer = _StandardOutWriter() | ||
|
||
stdout, stderr = sys.stdout, sys.stderr | ||
sys.stdout, sys.stderr = writer, writer | ||
|
||
yield | ||
|
||
sys.stdout, sys.stderr = stdout, stderr |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,43 +1,113 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
from hashlib import algorithms_guaranteed | ||
from typing import TYPE_CHECKING, Optional | ||
|
||
import lief | ||
import ssdeep | ||
import tlsh | ||
from pydantic import BaseModel, Field | ||
from semver import Version | ||
|
||
import config | ||
from analysis.PluginBase import AnalysisBasePlugin | ||
from helperFunctions.hash import get_hash, get_imphash, get_ssdeep, get_tlsh | ||
from analysis.plugin import AnalysisPluginV0 | ||
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin | ||
from helperFunctions.hash import get_hash, get_md5 | ||
|
||
if TYPE_CHECKING: | ||
from io import FileIO | ||
|
||
ELF_MIME_TYPES = [ | ||
'application/x-executable', | ||
'application/x-object', | ||
'application/x-pie-executable', | ||
'application/x-sharedlib', | ||
] | ||
|
||
|
||
class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin): | ||
class Schema(BaseModel): | ||
# The supported hashes are the ones from helperFunctions.hash and hashlib (except "shake" which is of | ||
# little use considering its variable length). | ||
# If they are not supported on the platform or not selected in the configuration of the plugin, the value will | ||
# be `None`. | ||
# Only the md5 and sha256 hashes are guaranteed to be available (since they are required down the line) | ||
|
||
# from hashlib | ||
md5: str = Field(description="md5 hash of the file's content") | ||
sha256: str = Field(description="sha256 hash of the file's content") | ||
sha1: Optional[str] = Field(description="sha1 hash of the file's content", default=None) | ||
sha224: Optional[str] = Field(description="sha224 hash of the file's content", default=None) | ||
sha384: Optional[str] = Field(description="sha384 hash of the file's content", default=None) | ||
sha512: Optional[str] = Field(description="sha512 hash of the file's content", default=None) | ||
blake2b: Optional[str] = Field(description="blake2b hash of the file's content", default=None) | ||
blake2s: Optional[str] = Field(description="blake2s hash of the file's content", default=None) | ||
sha3_224: Optional[str] = Field(description="sha3_224 hash of the file's content", default=None) | ||
sha3_256: Optional[str] = Field(description="sha3_256 hash of the file's content", default=None) | ||
sha3_384: Optional[str] = Field(description="sha3_384 hash of the file's content", default=None) | ||
sha3_512: Optional[str] = Field(description="sha3_512 hash of the file's content", default=None) | ||
|
||
ssdeep: Optional[str] = Field(description="ssdeep hash of the file's content", default=None) | ||
tlsh: Optional[str] = Field(description="tlsh hash of the file's content", default=None) | ||
imphash: Optional[str] = Field( | ||
description='import hash: the MD5 hash of the sorted imported functions (ELF files only)', | ||
default=None, | ||
) | ||
|
||
def __init__(self): | ||
super().__init__( | ||
metadata=self.MetaData( | ||
name='file_hashes', | ||
description='calculate different hash values of the file', | ||
version=Version(1, 3, 0), | ||
dependencies=['file_type'], | ||
Schema=self.Schema, | ||
), | ||
) | ||
configured_hashes = getattr(config.backend.plugin.get(self.NAME, None), 'hashes', []) | ||
self.hashes_to_create = set(configured_hashes).union({'sha256', 'md5'}) | ||
|
||
def analyze(self, file_handle: FileIO, virtual_file_path: str, analyses: dict) -> Schema: | ||
del virtual_file_path | ||
result = {} | ||
|
||
class AnalysisPlugin(AnalysisBasePlugin): | ||
file_handle.seek(0) | ||
file_contents = file_handle.read() | ||
for hash_ in self.hashes_to_create.intersection(algorithms_guaranteed): | ||
result[hash_] = get_hash(hash_, file_contents) | ||
result['ssdeep'] = get_ssdeep(file_contents) | ||
result['imphash'] = get_imphash(file_handle, analyses.get('file_type')) | ||
result['tlsh'] = get_tlsh(file_contents) | ||
|
||
return self.Schema(**result) | ||
|
||
|
||
def get_imphash(file: FileIO, type_analysis: BaseModel | None) -> str | None: | ||
""" | ||
This Plugin creates several hashes of the file | ||
Generates and returns the md5 hash for the (sorted) imported functions of an ELF file. | ||
Returns `None` if there are no imports or if an exception occurs. | ||
""" | ||
if type_analysis is not None and _is_elf_file(type_analysis): | ||
try: | ||
if (parsed_elf := lief.ELF.parse(file.name)) is not None and len(parsed_elf.imported_functions) > 0: | ||
functions = [f.name for f in parsed_elf.imported_functions] | ||
return get_md5(','.join(sorted(functions))) | ||
except Exception as error: | ||
logging.warning(f'Could not compute imphash for {file}: {error}') | ||
return None | ||
|
||
|
||
def _is_elf_file(type_analysis: BaseModel) -> bool: | ||
return type_analysis.mime in ELF_MIME_TYPES | ||
|
||
|
||
def get_ssdeep(file_contents: bytes) -> str: | ||
raw_hash = ssdeep.Hash() | ||
raw_hash.update(file_contents) | ||
return raw_hash.digest() | ||
|
||
|
||
NAME = 'file_hashes' | ||
DEPENDENCIES = ['file_type'] # noqa: RUF012 | ||
DESCRIPTION = 'calculate different hash values of the file' | ||
VERSION = '1.2' | ||
FILE = __file__ | ||
|
||
def additional_setup(self): | ||
hashes = getattr(config.backend.plugin.get(self.NAME, None), 'hashes', ['sha256']) | ||
self.hashes_to_create = hashes | ||
|
||
def process_object(self, file_object): | ||
""" | ||
This function must be implemented by the plugin. | ||
Analysis result must be a dict stored in file_object.processed_analysis[self.NAME] | ||
If you want to propagate results to parent objects store a list of strings 'summary' entry of your result dict | ||
""" | ||
file_object.processed_analysis[self.NAME] = {} | ||
for hash_ in self.hashes_to_create: | ||
if hash_ in algorithms_guaranteed: | ||
file_object.processed_analysis[self.NAME][hash_] = get_hash(hash_, file_object.binary) | ||
else: | ||
logging.debug(f'algorithm {hash_} not available') | ||
file_object.processed_analysis[self.NAME]['ssdeep'] = get_ssdeep(file_object.binary) | ||
file_object.processed_analysis[self.NAME]['imphash'] = get_imphash(file_object) | ||
|
||
tlsh_hash = get_tlsh(file_object.binary) | ||
if tlsh_hash: | ||
file_object.processed_analysis[self.NAME]['tlsh'] = get_tlsh(file_object.binary) | ||
|
||
return file_object | ||
def get_tlsh(file_contents: bytes) -> str | None: | ||
tlsh_hash = tlsh.hash(file_contents) | ||
return tlsh_hash if tlsh_hash != 'TNULL' else None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
{% extends "analysis_plugins/general_information.html" %} | ||
|
||
{% block analysis_result_details %} | ||
|
||
{% for key, value in analysis_result.items() | sort %} | ||
{% if value %} | ||
<tr> | ||
<td>{{ key }}</td> | ||
<td style="font-family: monospace">{{ value }}</td> | ||
</tr> | ||
{% endif %} | ||
{% endfor %} | ||
|
||
{% endblock %} |
Oops, something went wrong.