diff --git a/requirements.txt b/requirements.txt index 0a87ab9..45b29c0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ tqdm==4.64.1 Pymem==1.8.5 python-magic==0.4.24 pyinstaller # dont set a version, not compatible -yara-python==4.3.1 \ No newline at end of file +yara-python==4.3.1 +lz4==4.3.3 \ No newline at end of file diff --git a/tests/test_import.py b/tests/test_import.py index b7c3062..ef442ec 100644 --- a/tests/test_import.py +++ b/tests/test_import.py @@ -8,12 +8,11 @@ class TestImport(unittest.TestCase): system: BaseSystem - zip_path: str @classmethod def setUpClass(cls) -> None: cls.system = acquire_system() - cls.zip_path = cls.system.acquire_volatile() + cls.system.acquire_volatile() @classmethod def tearDownClass(cls) -> None: @@ -29,5 +28,5 @@ def test_get_network(self) -> None: def test_got_files(self) -> None: # Check we got atleast 10 files - with ZipFile(self.zip_path) as z: + with ZipFile(self.system.output_path) as z: self.assertGreater(len(z.namelist()), 10) diff --git a/tests/test_linux/test_base.py b/tests/test_linux/test_base.py index 1d26c06..0cd73fd 100644 --- a/tests/test_linux/test_base.py +++ b/tests/test_linux/test_base.py @@ -5,18 +5,13 @@ class TestBaseCases(unittest.TestCase): system: BaseSystem - zip_path: str @classmethod def setUpClass(cls) -> None: cls.system = acquire_system() - cls.zip_path = cls.system.acquire_volatile() + cls.system.acquire_volatile() - @classmethod - def tearDownClass(cls) -> None: - pass - def test_some_processes(self) -> None: processes = self.system.get_processes() self.assertTrue(len(processes) > 0) @@ -28,7 +23,7 @@ def test_dump_files(self) -> None: open_files = self.system.dump_loaded_files() self.assertTrue(len(open_files) > 0) # Check we pulled at least one file from /bin/ - with ZipFile(self.zip_path) as z: + with ZipFile(self.system.output_path) as z: binary_files = [binary for binary in z.namelist() if ("/bin/" in binary in binary.lower())] self.assertGreater(len(binary_files), 0) \ No newline at end of file diff --git a/tests/test_shared.py b/tests/test_shared.py index 252582b..a4781a6 100644 --- a/tests/test_shared.py +++ b/tests/test_shared.py @@ -6,12 +6,11 @@ class TestBaseCases(unittest.TestCase): system: BaseSystem - zip_path: str @classmethod def setUpClass(cls) -> None: cls.system = acquire_system() - cls.zip_path = cls.system.acquire_volatile() + cls.system.acquire_volatile() @classmethod def tearDownClass(cls) -> None: @@ -27,5 +26,5 @@ def test_get_network(self) -> None: def test_got_files(self) -> None: # Check we got atleast 10 files - with ZipFile(self.zip_path) as z: + with ZipFile(self.system.output_path) as z: self.assertGreater(len(z.namelist()), 10) diff --git a/varc_core/systems/__init__.py b/varc_core/systems/__init__.py index 4b59744..b1b0870 100644 --- a/varc_core/systems/__init__.py +++ b/varc_core/systems/__init__.py @@ -10,7 +10,8 @@ def acquire_system( include_memory: bool = True, include_open: bool = True, extract_dumps: bool = False, - yara_file: Optional[str] = None + yara_file: Optional[str] = None, + output_path: Optional[str] = None ) -> BaseSystem: """Returns the either a windows or linux system or osx system @@ -20,12 +21,12 @@ def acquire_system( logging.info(f"Operating System is: {platform}") if platform == "linux" or platform == "linux2": from varc_core.systems.linux import LinuxSystem - return LinuxSystem(include_memory, include_open, extract_dumps, yara_file) + return LinuxSystem(include_memory, include_open, extract_dumps, yara_file, output_path=output_path) elif platform == "darwin": from varc_core.systems.osx import OsxSystem - return OsxSystem(include_memory, include_open, extract_dumps) + return OsxSystem(include_memory, include_open, extract_dumps, output_path=output_path) elif platform == "win32": from varc_core.systems.windows import WindowsSystem - return WindowsSystem(include_memory, include_open, extract_dumps, yara_file) + return WindowsSystem(include_memory, include_open, extract_dumps, yara_file, output_path=output_path) else: raise MissingOperatingSystemInfo() \ No newline at end of file diff --git a/varc_core/systems/base_system.py b/varc_core/systems/base_system.py index 091bd28..a9b7a92 100644 --- a/varc_core/systems/base_system.py +++ b/varc_core/systems/base_system.py @@ -7,17 +7,20 @@ If it can't work cross-platform, put any platform specific code in the class that inherits this base e.g. In linux.py """ +import io import json import logging import os import os.path import socket +import tarfile import time import zipfile from base64 import b64encode from datetime import datetime -from typing import Any, List, Optional +from typing import Any, List, Optional, Union +import lz4.frame # type: ignore import mss import psutil from tqdm import tqdm @@ -33,6 +36,28 @@ _MAX_OPEN_FILE_SIZE = 10000000 # 10 Mb max dumped filesize +class _TarLz4Wrapper: + + def __init__(self, path: str) -> None: + self._lz4 = lz4.frame.open(path, 'wb') + self._tar = tarfile.open(fileobj=self._lz4, mode="w") + + def writestr(self, path: str, value: Union[str, bytes]) -> None: + info = tarfile.TarInfo(path) + info.size = len(value) + self._tar.addfile(info, io.BytesIO(value if isinstance(value, bytes) else value.encode())) + + def write(self, path: str, arcname: str) -> None: + self._tar.add(path, arcname) + + def __enter__(self) -> "_TarLz4Wrapper": + return self + + def __exit__(self, type: Any, value: Any, traceback: Any) -> None: + self._tar.close() + self._lz4.close() + + class BaseSystem: """A @@ -52,7 +77,8 @@ def __init__( include_memory: bool = True, include_open: bool = True, extract_dumps: bool = False, - yara_file: Optional[str] = None + yara_file: Optional[str] = None, + output_path: Optional[str] = None ) -> None: self.todays_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") logging.info(f'Acquiring system: {self.get_machine_name()}, at {self.todays_date}') @@ -66,11 +92,13 @@ def __init__( self.yara_file = yara_file self.yara_results: List[dict] = [] self.yara_hit_pids: List[int] = [] + self.output_path = output_path or os.path.join("", f"{self.get_machine_name()}-{self.timestamp}.zip") if self.process_name and self.process_id: raise ValueError( "Only one of Process name or Process ID (PID) can be used. Please re-run using one or the other.") - self.zip_path = self.acquire_volatile() + + self.acquire_volatile() if self.yara_file: if not _YARA_AVAILABLE: @@ -273,11 +301,9 @@ def take_screenshot(self) -> Optional[bytes]: logging.error("Unable to take screenshot") return None - def acquire_volatile(self, output_path: Optional[str] = None) -> str: + def acquire_volatile(self) -> None: """Acquire volatile data into a zip file This is called by all OS's - - :return: The filepath of the zip """ self.process_info = self.get_processes() self.network_log = self.get_network() @@ -290,21 +316,15 @@ def acquire_volatile(self, output_path: Optional[str] = None) -> str: screenshot_image = self.take_screenshot() else: screenshot_image = None - if not output_path: - output_path = os.path.join("", f"{self.get_machine_name()}-{self.timestamp}.zip") - # strip .zip if in filename as shutil appends to end - archive_out = output_path + ".zip" if not output_path.endswith(".zip") else output_path - self.output_path = output_path - with zipfile.ZipFile(archive_out, 'a', compression=zipfile.ZIP_DEFLATED) as zip_file: + + with self._open_output() as output_file: if screenshot_image: - zip_file.writestr(f"{self.get_machine_name()}-{self.timestamp}.png", screenshot_image) + output_file.writestr(f"{self.get_machine_name()}-{self.timestamp}.png", screenshot_image) for key, value in table_data.items(): - with zip_file.open(f"{key}.json", 'w') as json_file: - json_file.write(value.encode()) + output_file.writestr(f"{key}.json", value.encode()) if self.network_log: logging.info("Adding Netstat Data") - with zip_file.open("netstat.log", 'w') as network_file: - network_file.write("\r\n".join(self.network_log).encode()) + output_file.writestr("netstat.log", "\r\n".join(self.network_log).encode()) if self.include_open and self.dumped_files: for file_path in self.dumped_files: logging.info(f"Adding open file {file_path}") @@ -313,14 +333,17 @@ def acquire_volatile(self, output_path: Optional[str] = None) -> str: logging.warning(f"Skipping file as too large {file_path}") else: try: - zip_file.write(file_path, strip_drive(f"./collected_files/{file_path}")) + output_file.write(file_path, strip_drive(f"./collected_files/{file_path}")) except PermissionError: logging.warn(f"Permission denied copying {file_path}") except FileNotFoundError: logging.warning(f"Could not open {file_path} for reading") - return archive_out - + def _open_output(self) -> Union[zipfile.ZipFile, _TarLz4Wrapper]: + if self.output_path.endswith('.tar.lz4'): + return _TarLz4Wrapper(self.output_path) + else: + return zipfile.ZipFile(self.output_path, 'a', compression=zipfile.ZIP_DEFLATED) def yara_scan(self) -> None: def yara_hit_callback(hit: dict) -> Any: @@ -357,3 +380,4 @@ def yara_hit_callback(hit: dict) -> Any: logging.info("YARA scan results written to yara_results.json in output archive.") else: logging.info("No YARA rules were triggered. Nothing will be written to the output archive.") +