Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

intercom backend refactoring: make data flow comprehensible #1251

Merged
merged 1 commit on Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 50 additions & 80 deletions src/intercom/back_end_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,22 @@

import difflib
import logging
import os
from multiprocessing import Process, Value
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING

import config
from helperFunctions.process import stop_processes
from helperFunctions.yara_binary_search import YaraBinarySearchScanner
from intercom.common_redis_binding import InterComListener, InterComListenerAndResponder, InterComRedisInterface
from intercom.common_redis_binding import (
InterComListener,
InterComListenerAndResponder,
publish_available_analysis_plugins,
)
from storage.binary_service import BinaryService
from storage.db_interface_common import DbInterfaceCommon
from storage.fsorganizer import FSOrganizer

if TYPE_CHECKING:
from collections.abc import Callable

from objects.firmware import Firmware
from storage.unpacking_locks import UnpackingLockManager

Expand All @@ -34,90 +33,65 @@ def __init__(
compare_service=None,
unpacking_service=None,
unpacking_locks=None,
testing=False, # noqa: ARG002
):
self.analysis_service = analysis_service
self.compare_service = compare_service
self.unpacking_service = unpacking_service
self.unpacking_locks = unpacking_locks
self.poll_delay = config.backend.intercom_poll_delay

self.stop_condition = Value('i', 0)
self.process_list = []
self.listeners = [
InterComBackEndAnalysisTask(self.unpacking_service.add_task),
InterComBackEndReAnalyzeTask(self.unpacking_service.add_task),
InterComBackEndCompareTask(self.compare_service.add_task),
InterComBackEndRawDownloadTask(),
InterComBackEndFileDiffTask(),
InterComBackEndTarRepackTask(),
InterComBackEndBinarySearchTask(),
InterComBackEndUpdateTask(self.analysis_service.update_analysis_of_object_and_children),
InterComBackEndDeleteFile(
unpacking_locks=self.unpacking_locks,
db_interface=DbInterfaceCommon(),
),
InterComBackEndSingleFileTask(self.analysis_service.update_analysis_of_single_object),
InterComBackEndPeekBinaryTask(),
InterComBackEndLogsTask(),
]

def start(self):
    """Publish the available analysis plugins and start all intercom listeners.

    Publishing happens first so the frontend can already see the plugin
    metadata when the listeners come online.
    """
    publish_available_analysis_plugins(self.analysis_service.get_plugin_dict())
    for listener in self.listeners:
        listener.start()
    logging.info('Intercom online')

def shutdown(self):
    """Signal all listeners to stop and wait for their worker processes to exit.

    Each worker is given slightly more than one poll interval to notice the
    stop flag before ``stop_processes`` escalates.
    """
    for listener in self.listeners:
        listener.shutdown()
    stop_processes(
        [listener.process for listener in self.listeners if listener],
        config.backend.intercom_poll_delay + 1,
    )
    logging.info('Intercom offline')

def _start_listener(self, listener: type[InterComListener], do_after_function: Callable | None = None, **kwargs):
    """Spawn a worker process for the given listener class and register it.

    :param listener: The listener class to instantiate inside the worker.
    :param do_after_function: Optional callback applied to every received task.
    :param kwargs: Extra keyword arguments forwarded to the listener constructor.
    """
    worker = Process(target=self._backend_worker, args=(listener, do_after_function, kwargs))
    worker.start()
    # Keep a handle so shutdown() can terminate the process later.
    self.process_list.append(worker)

def _backend_worker(self, listener: type[InterComListener], do_after_function: Callable | None, additional_args):
interface = listener(**additional_args)
logging.debug(f'{listener.__name__} listener started (pid={os.getpid()})')
while self.stop_condition.value == 0:
task = interface.get_next_task()
if task is None:
sleep(self.poll_delay)
elif do_after_function is not None:
do_after_function(task)
logging.debug(f'{listener.__name__} listener stopped')


class InterComBackEndAnalysisPlugInsPublisher(InterComRedisInterface):
    """Publishes the dict of available analysis plugins to Redis on construction."""

    def __init__(self, analysis_service=None):
        super().__init__()
        self.publish_available_analysis_plugins(analysis_service)

    def publish_available_analysis_plugins(self, analysis_service):
        # Store the plugin metadata under a well-known key so the frontend can read it.
        self.redis.set('analysis_plugins', analysis_service.get_plugin_dict())


class InterComBackEndAnalysisTask(InterComListener):
    """Listener for new analysis tasks.

    Stores the received file in the file system before the task is handed on
    (via the processing function) to the unpacking scheduler.
    """

    CONNECTION_TYPE = 'analysis_task'

    def __init__(self, *args):
        super().__init__(*args)
        self.fs_organizer = FSOrganizer()

    def pre_process(self, task, task_id):  # noqa: ARG002
        # Persist the uploaded binary so later processing stages can read it from disk.
        self.fs_organizer.store_file(task)
        return task


class InterComBackEndReAnalyzeTask(InterComListener):
    """Listener for re-analysis tasks.

    Restores the firmware's binary from storage so the object can be unpacked
    and analyzed again.
    """

    CONNECTION_TYPE = 're_analyze_task'

    def __init__(self, *args):
        super().__init__(*args)
        self.fs_organizer = FSOrganizer()

    def pre_process(self, task: Firmware, task_id):  # noqa: ARG002
        # Recompute where the binary is stored and reload its contents from disk.
        task.file_path = self.fs_organizer.generate_path(task)
        task.create_binary_from_path()
        return task
Expand All @@ -139,8 +113,8 @@ class InterComBackEndRawDownloadTask(InterComListenerAndResponder):
CONNECTION_TYPE = 'raw_download_task'
OUTGOING_CONNECTION_TYPE = 'raw_download_task_resp'

def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self.binary_service = BinaryService()

def get_response(self, task):
Expand All @@ -151,8 +125,8 @@ class InterComBackEndFileDiffTask(InterComListenerAndResponder):
CONNECTION_TYPE = 'file_diff_task'
OUTGOING_CONNECTION_TYPE = 'file_diff_task_resp'

def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self.binary_service = BinaryService()

def get_response(self, task: tuple[str, str]) -> str | None:
Expand All @@ -174,8 +148,8 @@ class InterComBackEndPeekBinaryTask(InterComListenerAndResponder):
CONNECTION_TYPE = 'binary_peek_task'
OUTGOING_CONNECTION_TYPE = 'binary_peek_task_resp'

def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self.binary_service = BinaryService()

def get_response(self, task: tuple[str, int, int]) -> bytes:
Expand All @@ -186,8 +160,8 @@ class InterComBackEndTarRepackTask(InterComListenerAndResponder):
CONNECTION_TYPE = 'tar_repack_task'
OUTGOING_CONNECTION_TYPE = 'tar_repack_task_resp'

def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self.binary_service = BinaryService()

def get_response(self, task):
Expand All @@ -204,16 +178,16 @@ def get_response(self, task):
return search_result, task


class InterComBackEndDeleteFile(InterComListenerAndResponder):
class InterComBackEndDeleteFile(InterComListener):
CONNECTION_TYPE = 'file_delete_task'

def __init__(self, unpacking_locks=None, db_interface=None):
super().__init__()
def __init__(self, *args, unpacking_locks: UnpackingLockManager, db_interface: DbInterfaceCommon):
super().__init__(*args)
self.fs_organizer = FSOrganizer()
self.db = db_interface
self.unpacking_locks: UnpackingLockManager = unpacking_locks
self.unpacking_locks = unpacking_locks

def post_processing(self, task: set[str], task_id): # noqa: ARG002
def pre_process(self, task: set[str], task_id): # noqa: ARG002
# task is a set of UIDs
uids_in_db = self.db.uid_list_exists(task)
deleted = 0
Expand All @@ -228,10 +202,6 @@ def post_processing(self, task: set[str], task_id): # noqa: ARG002
logging.warning(f'File not removed, because database entry exists: {uid}')
if deleted:
logging.info(f'Deleted {deleted} file(s)')
return task

def get_response(self, task): # noqa: ARG002
return True # we only want to know when the deletion is completed and not actually return something


class InterComBackEndLogsTask(InterComListenerAndResponder):
Expand Down
53 changes: 42 additions & 11 deletions src/intercom/common_redis_binding.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,63 @@
from __future__ import annotations

import logging
import os
import pickle
from time import time
from typing import Any
from multiprocessing import Process, Value
from time import sleep, time
from typing import TYPE_CHECKING, Any, Callable

from redis.exceptions import RedisError

import config
from helperFunctions.hash import get_sha256
from storage.redis_interface import RedisInterface

if TYPE_CHECKING:
from objects.file import FileObject


def generate_task_id(input_data: Any) -> str:
    """Build a unique task id: SHA-256 of the pickled payload plus the current time."""
    return f'{get_sha256(pickle.dumps(input_data))}_{time()}'


class InterComRedisInterface:
    """Base class giving subclasses a shared Redis connection as ``self.redis``."""

    def __init__(self):
        # One Redis handle per instance; used for intercom queues and key/value storage.
        self.redis = RedisInterface()
def publish_available_analysis_plugins(plugin_dict: dict[str, tuple]):
    """Store the dict of available analysis plugins in Redis for the frontend to read."""
    RedisInterface().set('analysis_plugins', plugin_dict)


class InterComListener:
    """
    InterCom listener base class.

    Each listener consumes tasks of one ``CONNECTION_TYPE`` from a Redis queue
    in its own worker process and optionally forwards each task to a
    processing function.
    """

    CONNECTION_TYPE = 'test'  # unique for each listener

    def __init__(self, processing_function: Callable[[FileObject], None] | None = None):
        super().__init__()
        self.redis = RedisInterface()
        self.process = None  # worker process, created in start()
        self.processing_function = processing_function
        self.stop_condition = Value('i', 0)  # shared flag telling the worker loop to exit

    def start(self):
        """Start the worker process that polls the queue."""
        self.process = Process(target=self._worker)
        self.process.start()

    def shutdown(self):
        """Signal the worker loop to stop (the process itself is joined by the caller)."""
        self.stop_condition.value = 1
dorpvom marked this conversation as resolved.
Show resolved Hide resolved

def _worker(self):
    """Poll the Redis queue for tasks until shutdown is requested."""
    logging.debug(f'{self.CONNECTION_TYPE} listener started (pid={os.getpid()})')
    while not self.stop_condition.value:
        next_task = self.get_next_task()
        if next_task is None:
            # Queue was empty: back off for one poll interval before retrying.
            sleep(config.backend.intercom_poll_delay)
            continue
        if self.processing_function is not None:
            self.processing_function(next_task)
    logging.debug(f'{self.CONNECTION_TYPE} listener stopped')

def get_next_task(self):
try:
task_obj = self.redis.queue_get(self.CONNECTION_TYPE)
Expand All @@ -34,14 +66,14 @@ def get_next_task(self):
return None
if task_obj is not None:
task, task_id = task_obj
task = self.post_processing(task, task_id)
task = self.pre_process(task, task_id)
logging.debug(f'{self.CONNECTION_TYPE}: New task received: {task}')
return task
return None

def pre_process(self, task, task_id):  # noqa: ARG002
    """
    Optional pre-processing of a task before it is returned to the caller.

    The base implementation is a no-op; subclasses override this to e.g.
    store or restore files belonging to the task.

    :param task: The deserialized task payload from the queue.
    :param task_id: The unique id of the task (unused in the base implementation).
    :return: The (possibly modified) task.
    """
    return task

Expand All @@ -51,10 +83,9 @@ class InterComListenerAndResponder(InterComListener):
CONNECTION_TYPE and OUTGOING_CONNECTION_TYPE must be implemented by the sub_class
"""

CONNECTION_TYPE = 'test'
OUTGOING_CONNECTION_TYPE = 'test'

def post_processing(self, task, task_id):
def pre_process(self, task, task_id):
logging.debug(f'request received: {self.CONNECTION_TYPE} -> {task_id}')
response = self.get_response(task)
self.redis.set(task_id, response)
Expand Down
8 changes: 6 additions & 2 deletions src/intercom/front_end_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
from typing import Any

import config
from intercom.common_redis_binding import InterComRedisInterface, generate_task_id
from intercom.common_redis_binding import generate_task_id
from storage.redis_interface import RedisInterface


class InterComFrontEndBinding:
    """
    Internal Communication FrontEnd Binding
    """

    def __init__(self):
        # The frontend talks to the backend exclusively through Redis queues.
        self.redis = RedisInterface()

def add_analysis_task(self, fw):
    """Queue firmware object *fw* for analysis in the backend (keyed by its UID)."""
    self._add_to_redis_queue('analysis_task', fw, fw.uid)

Expand Down
Loading