Skip to content

Commit

Permalink
intercom backend refactoring: make data flow comprehensible
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke committed Dec 2, 2024
1 parent 4b87ffa commit 6e948f0
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 117 deletions.
130 changes: 50 additions & 80 deletions src/intercom/back_end_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,22 @@

import difflib
import logging
import os
from multiprocessing import Process, Value
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING

import config
from helperFunctions.process import stop_processes
from helperFunctions.yara_binary_search import YaraBinarySearchScanner
from intercom.common_redis_binding import InterComListener, InterComListenerAndResponder, InterComRedisInterface
from intercom.common_redis_binding import (
InterComListener,
InterComListenerAndResponder,
publish_available_analysis_plugins,
)
from storage.binary_service import BinaryService
from storage.db_interface_common import DbInterfaceCommon
from storage.fsorganizer import FSOrganizer

if TYPE_CHECKING:
from collections.abc import Callable

from objects.firmware import Firmware
from storage.unpacking_locks import UnpackingLockManager

Expand All @@ -34,90 +33,65 @@ def __init__(
compare_service=None,
unpacking_service=None,
unpacking_locks=None,
testing=False, # noqa: ARG002
):
self.analysis_service = analysis_service
self.compare_service = compare_service
self.unpacking_service = unpacking_service
self.unpacking_locks = unpacking_locks
self.poll_delay = config.backend.intercom_poll_delay

self.stop_condition = Value('i', 0)
self.process_list = []
self.listeners = [
InterComBackEndAnalysisTask(self.unpacking_service.add_task),
InterComBackEndReAnalyzeTask(self.unpacking_service.add_task),
InterComBackEndCompareTask(self.compare_service.add_task),
InterComBackEndRawDownloadTask(),
InterComBackEndFileDiffTask(),
InterComBackEndTarRepackTask(),
InterComBackEndBinarySearchTask(),
InterComBackEndUpdateTask(self.analysis_service.update_analysis_of_object_and_children),
InterComBackEndDeleteFile(
unpacking_locks=self.unpacking_locks,
db_interface=DbInterfaceCommon(),
),
InterComBackEndSingleFileTask(self.analysis_service.update_analysis_of_single_object),
InterComBackEndPeekBinaryTask(),
InterComBackEndLogsTask(),
]

def start(self):
InterComBackEndAnalysisPlugInsPublisher(analysis_service=self.analysis_service)
self._start_listener(InterComBackEndAnalysisTask, self.unpacking_service.add_task)
self._start_listener(InterComBackEndReAnalyzeTask, self.unpacking_service.add_task)
self._start_listener(InterComBackEndCompareTask, self.compare_service.add_task)
self._start_listener(InterComBackEndRawDownloadTask)
self._start_listener(InterComBackEndFileDiffTask)
self._start_listener(InterComBackEndTarRepackTask)
self._start_listener(InterComBackEndBinarySearchTask)
self._start_listener(InterComBackEndUpdateTask, self.analysis_service.update_analysis_of_object_and_children)

self._start_listener(
InterComBackEndDeleteFile,
unpacking_locks=self.unpacking_locks,
db_interface=DbInterfaceCommon(),
)
self._start_listener(InterComBackEndSingleFileTask, self.analysis_service.update_analysis_of_single_object)
self._start_listener(InterComBackEndPeekBinaryTask)
self._start_listener(InterComBackEndLogsTask)
publish_available_analysis_plugins(self.analysis_service.get_plugin_dict())
for listener in self.listeners:
listener.start()
logging.info('Intercom online')

def shutdown(self):
self.stop_condition.value = 1
stop_processes(self.process_list, config.backend.intercom_poll_delay + 1)
for listener in self.listeners:
listener.shutdown()
stop_processes(
[listener.process for listener in self.listeners if listener],
config.backend.intercom_poll_delay + 1,
)
logging.info('Intercom offline')

def _start_listener(self, listener: type[InterComListener], do_after_function: Callable | None = None, **kwargs):
    """
    Spawn a worker process running the given listener class and remember it so
    it can be stopped during shutdown.

    :param listener: the ``InterComListener`` subclass to instantiate in the worker
    :param do_after_function: optional callback applied to each received task
    :param kwargs: extra constructor arguments forwarded to the listener
    """
    worker = Process(target=self._backend_worker, args=(listener, do_after_function, kwargs))
    worker.start()
    self.process_list.append(worker)

def _backend_worker(self, listener: type[InterComListener], do_after_function: Callable | None, additional_args):
    """
    Worker-process entry point: instantiate the listener and poll it for tasks
    until the shared stop flag is set.

    :param listener: the listener class to instantiate (one instance per worker)
    :param do_after_function: optional callback invoked with every received task
    :param additional_args: keyword arguments for the listener constructor
    """
    interface = listener(**additional_args)
    logging.debug(f'{listener.__name__} listener started (pid={os.getpid()})')
    while not self.stop_condition.value:
        task = interface.get_next_task()
        if task is None:
            # queue was empty -> back off before polling again
            sleep(self.poll_delay)
            continue
        if do_after_function is not None:
            do_after_function(task)
    logging.debug(f'{listener.__name__} listener stopped')


class InterComBackEndAnalysisPlugInsPublisher(InterComRedisInterface):
    """Publishes the dict of available analysis plugins to Redis upon construction."""

    def __init__(self, analysis_service=None):
        super().__init__()
        self.publish_available_analysis_plugins(analysis_service)

    def publish_available_analysis_plugins(self, analysis_service):
        # store the plugin metadata under a fixed key where the frontend can look it up
        self.redis.set('analysis_plugins', analysis_service.get_plugin_dict())


class InterComBackEndAnalysisTask(InterComListener):
CONNECTION_TYPE = 'analysis_task'

def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self.fs_organizer = FSOrganizer()

def post_processing(self, task, task_id): # noqa: ARG002
def pre_process(self, task, task_id): # noqa: ARG002
self.fs_organizer.store_file(task)
return task


class InterComBackEndReAnalyzeTask(InterComListener):
CONNECTION_TYPE = 're_analyze_task'

def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self.fs_organizer = FSOrganizer()

def post_processing(self, task: Firmware, task_id): # noqa: ARG002
def pre_process(self, task: Firmware, task_id): # noqa: ARG002
task.file_path = self.fs_organizer.generate_path(task)
task.create_binary_from_path()
return task
Expand All @@ -139,8 +113,8 @@ class InterComBackEndRawDownloadTask(InterComListenerAndResponder):
CONNECTION_TYPE = 'raw_download_task'
OUTGOING_CONNECTION_TYPE = 'raw_download_task_resp'

def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self.binary_service = BinaryService()

def get_response(self, task):
Expand All @@ -151,8 +125,8 @@ class InterComBackEndFileDiffTask(InterComListenerAndResponder):
CONNECTION_TYPE = 'file_diff_task'
OUTGOING_CONNECTION_TYPE = 'file_diff_task_resp'

def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self.binary_service = BinaryService()

def get_response(self, task: tuple[str, str]) -> str | None:
Expand All @@ -174,8 +148,8 @@ class InterComBackEndPeekBinaryTask(InterComListenerAndResponder):
CONNECTION_TYPE = 'binary_peek_task'
OUTGOING_CONNECTION_TYPE = 'binary_peek_task_resp'

def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self.binary_service = BinaryService()

def get_response(self, task: tuple[str, int, int]) -> bytes:
Expand All @@ -186,8 +160,8 @@ class InterComBackEndTarRepackTask(InterComListenerAndResponder):
CONNECTION_TYPE = 'tar_repack_task'
OUTGOING_CONNECTION_TYPE = 'tar_repack_task_resp'

def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self.binary_service = BinaryService()

def get_response(self, task):
Expand All @@ -204,16 +178,16 @@ def get_response(self, task):
return search_result, task


class InterComBackEndDeleteFile(InterComListenerAndResponder):
class InterComBackEndDeleteFile(InterComListener):
CONNECTION_TYPE = 'file_delete_task'

def __init__(self, unpacking_locks=None, db_interface=None):
super().__init__()
def __init__(self, *args, unpacking_locks: UnpackingLockManager, db_interface: DbInterfaceCommon):
super().__init__(*args)
self.fs_organizer = FSOrganizer()
self.db = db_interface
self.unpacking_locks: UnpackingLockManager = unpacking_locks
self.unpacking_locks = unpacking_locks

def post_processing(self, task: set[str], task_id): # noqa: ARG002
def pre_process(self, task: set[str], task_id): # noqa: ARG002
# task is a set of UIDs
uids_in_db = self.db.uid_list_exists(task)
deleted = 0
Expand All @@ -228,10 +202,6 @@ def post_processing(self, task: set[str], task_id): # noqa: ARG002
logging.warning(f'File not removed, because database entry exists: {uid}')
if deleted:
logging.info(f'Deleted {deleted} file(s)')
return task

def get_response(self, task): # noqa: ARG002
return True # we only want to know when the deletion is completed and not actually return something


class InterComBackEndLogsTask(InterComListenerAndResponder):
Expand Down
53 changes: 42 additions & 11 deletions src/intercom/common_redis_binding.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,63 @@
from __future__ import annotations

import logging
import os
import pickle
from time import time
from typing import Any
from multiprocessing import Process, Value
from time import sleep, time
from typing import TYPE_CHECKING, Any, Callable

from redis.exceptions import RedisError

import config
from helperFunctions.hash import get_sha256
from storage.redis_interface import RedisInterface

if TYPE_CHECKING:
from objects.file import FileObject


def generate_task_id(input_data: Any) -> str:
    """
    Derive a (practically) unique task id from the SHA-256 hash of the pickled
    input data combined with the current timestamp.

    :param input_data: arbitrary picklable task payload
    :return: id string of the form ``<sha256>_<unix time>``
    """
    return f'{get_sha256(pickle.dumps(input_data))}_{time()}'


class InterComRedisInterface:
def __init__(self):
self.redis = RedisInterface()
def publish_available_analysis_plugins(plugin_dict: dict[str, tuple]):
    """
    Make the analysis plugin metadata available to other components (e.g. the
    frontend) by storing it in Redis under the fixed key ``analysis_plugins``.

    :param plugin_dict: mapping of plugin name to its metadata tuple
    """
    RedisInterface().set('analysis_plugins', plugin_dict)


class InterComListener(InterComRedisInterface):
class InterComListener:
"""
InterCom Listener Base Class
"""

CONNECTION_TYPE = 'test' # unique for each listener

def __init__(self, processing_function: Callable[[FileObject], None] | None = None):
    """
    :param processing_function: optional callback invoked for each task received
        from the Redis queue (see ``_worker``)
    """
    super().__init__()
    self.redis = RedisInterface()
    # worker process handle; created lazily in start()
    self.process = None
    self.processing_function = processing_function
    # shared int flag: set to 1 to signal the worker loop to stop
    self.stop_condition = Value('i', 0)

def start(self):
    """Spawn the worker process that polls the Redis queue for new tasks."""
    self.process = Process(target=self._worker)
    self.process.start()

def shutdown(self):
    """Signal the worker loop to exit; the process itself is stopped separately."""
    self.stop_condition.value = 1

def _worker(self):
    """Poll the queue until the stop flag is set, passing each task to the callback."""
    logging.debug(f'{self.CONNECTION_TYPE} listener started (pid={os.getpid()})')
    while self.stop_condition.value == 0:
        task = self.get_next_task()
        if task is None:
            # queue empty: wait before polling again
            sleep(config.backend.intercom_poll_delay)
        elif self.processing_function is not None:
            self.processing_function(task)
    logging.debug(f'{self.CONNECTION_TYPE} listener stopped')

def get_next_task(self):
try:
task_obj = self.redis.queue_get(self.CONNECTION_TYPE)
Expand All @@ -34,14 +66,14 @@ def get_next_task(self):
return None
if task_obj is not None:
task, task_id = task_obj
task = self.post_processing(task, task_id)
task = self.pre_process(task, task_id)
logging.debug(f'{self.CONNECTION_TYPE}: New task received: {task}')
return task
return None

def post_processing(self, task, task_id): # noqa: ARG002
def pre_process(self, task, task_id): # noqa: ARG002
"""
optional post-processing of a task
optional pre-processing of a task
"""
return task

Expand All @@ -51,10 +83,9 @@ class InterComListenerAndResponder(InterComListener):
CONNECTION_TYPE and OUTGOING_CONNECTION_TYPE must be implemented by the sub_class
"""

CONNECTION_TYPE = 'test'
OUTGOING_CONNECTION_TYPE = 'test'

def post_processing(self, task, task_id):
def pre_process(self, task, task_id):
logging.debug(f'request received: {self.CONNECTION_TYPE} -> {task_id}')
response = self.get_response(task)
self.redis.set(task_id, response)
Expand Down
8 changes: 6 additions & 2 deletions src/intercom/front_end_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
from typing import Any

import config
from intercom.common_redis_binding import InterComRedisInterface, generate_task_id
from intercom.common_redis_binding import generate_task_id
from storage.redis_interface import RedisInterface


class InterComFrontEndBinding(InterComRedisInterface):
class InterComFrontEndBinding:
"""
Internal Communication FrontEnd Binding
"""

def __init__(self):
    # Redis connection used for all intercom queue operations
    self.redis = RedisInterface()

def add_analysis_task(self, fw):
    """Put *fw* on the ``analysis_task`` queue (keyed by its uid) for the backend."""
    self._add_to_redis_queue('analysis_task', fw, fw.uid)

Expand Down
Loading

0 comments on commit 6e948f0

Please sign in to comment.