Skip to content
This repository has been archived by the owner on Nov 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #21 from john-cado/jlugton/lz4
Browse files Browse the repository at this point in the history
Allow varc to output as tar.lz4
  • Loading branch information
john-cado authored Sep 20, 2024
2 parents fb44b36 + 2122d03 commit b8102a9
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 38 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ tqdm==4.64.1
Pymem==1.8.5
python-magic==0.4.24
pyinstaller # dont set a version, not compatible
yara-python==4.3.1
yara-python==4.3.1
lz4==4.3.3
5 changes: 2 additions & 3 deletions tests/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@

class TestImport(unittest.TestCase):
system: BaseSystem
zip_path: str

@classmethod
def setUpClass(cls) -> None:
cls.system = acquire_system()
cls.zip_path = cls.system.acquire_volatile()
cls.system.acquire_volatile()

@classmethod
def tearDownClass(cls) -> None:
Expand All @@ -29,5 +28,5 @@ def test_get_network(self) -> None:

def test_got_files(self) -> None:
# Check we got atleast 10 files
with ZipFile(self.zip_path) as z:
with ZipFile(self.system.output_path) as z:
self.assertGreater(len(z.namelist()), 10)
9 changes: 2 additions & 7 deletions tests/test_linux/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@

class TestBaseCases(unittest.TestCase):
system: BaseSystem
zip_path: str

@classmethod
def setUpClass(cls) -> None:
cls.system = acquire_system()
cls.zip_path = cls.system.acquire_volatile()
cls.system.acquire_volatile()


@classmethod
def tearDownClass(cls) -> None:
pass

def test_some_processes(self) -> None:
processes = self.system.get_processes()
self.assertTrue(len(processes) > 0)
Expand All @@ -28,7 +23,7 @@ def test_dump_files(self) -> None:
open_files = self.system.dump_loaded_files()
self.assertTrue(len(open_files) > 0)
# Check we pulled at least one file from /bin/
with ZipFile(self.zip_path) as z:
with ZipFile(self.system.output_path) as z:
binary_files = [binary for binary in z.namelist() if ("/bin/" in binary in binary.lower())]
self.assertGreater(len(binary_files), 0)

5 changes: 2 additions & 3 deletions tests/test_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@

class TestBaseCases(unittest.TestCase):
system: BaseSystem
zip_path: str

@classmethod
def setUpClass(cls) -> None:
cls.system = acquire_system()
cls.zip_path = cls.system.acquire_volatile()
cls.system.acquire_volatile()

@classmethod
def tearDownClass(cls) -> None:
Expand All @@ -27,5 +26,5 @@ def test_get_network(self) -> None:

def test_got_files(self) -> None:
# Check we got atleast 10 files
with ZipFile(self.zip_path) as z:
with ZipFile(self.system.output_path) as z:
self.assertGreater(len(z.namelist()), 10)
9 changes: 5 additions & 4 deletions varc_core/systems/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ def acquire_system(
include_memory: bool = True,
include_open: bool = True,
extract_dumps: bool = False,
yara_file: Optional[str] = None
yara_file: Optional[str] = None,
output_path: Optional[str] = None
) -> BaseSystem:
"""Returns the either a windows or linux system or osx system
Expand All @@ -20,12 +21,12 @@ def acquire_system(
logging.info(f"Operating System is: {platform}")
if platform == "linux" or platform == "linux2":
from varc_core.systems.linux import LinuxSystem
return LinuxSystem(include_memory, include_open, extract_dumps, yara_file)
return LinuxSystem(include_memory, include_open, extract_dumps, yara_file, output_path=output_path)
elif platform == "darwin":
from varc_core.systems.osx import OsxSystem
return OsxSystem(include_memory, include_open, extract_dumps)
return OsxSystem(include_memory, include_open, extract_dumps, output_path=output_path)
elif platform == "win32":
from varc_core.systems.windows import WindowsSystem
return WindowsSystem(include_memory, include_open, extract_dumps, yara_file)
return WindowsSystem(include_memory, include_open, extract_dumps, yara_file, output_path=output_path)
else:
raise MissingOperatingSystemInfo()
64 changes: 44 additions & 20 deletions varc_core/systems/base_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@
If it can't work cross-platform, put any platform specific code in the class that inherits this base
e.g. In linux.py
"""
import io
import json
import logging
import os
import os.path
import socket
import tarfile
import time
import zipfile
from base64 import b64encode
from datetime import datetime
from typing import Any, List, Optional
from typing import Any, List, Optional, Union

import lz4.frame # type: ignore
import mss
import psutil
from tqdm import tqdm
Expand All @@ -33,6 +36,28 @@
_MAX_OPEN_FILE_SIZE = 10000000 # 10 Mb max dumped filesize


class _TarLz4Wrapper:

def __init__(self, path: str) -> None:
self._lz4 = lz4.frame.open(path, 'wb')
self._tar = tarfile.open(fileobj=self._lz4, mode="w")

def writestr(self, path: str, value: Union[str, bytes]) -> None:
info = tarfile.TarInfo(path)
info.size = len(value)
self._tar.addfile(info, io.BytesIO(value if isinstance(value, bytes) else value.encode()))

def write(self, path: str, arcname: str) -> None:
self._tar.add(path, arcname)

def __enter__(self) -> "_TarLz4Wrapper":
return self

def __exit__(self, type: Any, value: Any, traceback: Any) -> None:
self._tar.close()
self._lz4.close()


class BaseSystem:
"""A
Expand All @@ -52,7 +77,8 @@ def __init__(
include_memory: bool = True,
include_open: bool = True,
extract_dumps: bool = False,
yara_file: Optional[str] = None
yara_file: Optional[str] = None,
output_path: Optional[str] = None
) -> None:
self.todays_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logging.info(f'Acquiring system: {self.get_machine_name()}, at {self.todays_date}')
Expand All @@ -66,11 +92,13 @@ def __init__(
self.yara_file = yara_file
self.yara_results: List[dict] = []
self.yara_hit_pids: List[int] = []
self.output_path = output_path or os.path.join("", f"{self.get_machine_name()}-{self.timestamp}.zip")

if self.process_name and self.process_id:
raise ValueError(
"Only one of Process name or Process ID (PID) can be used. Please re-run using one or the other.")
self.zip_path = self.acquire_volatile()

self.acquire_volatile()

if self.yara_file:
if not _YARA_AVAILABLE:
Expand Down Expand Up @@ -273,11 +301,9 @@ def take_screenshot(self) -> Optional[bytes]:
logging.error("Unable to take screenshot")
return None

def acquire_volatile(self, output_path: Optional[str] = None) -> str:
def acquire_volatile(self) -> None:
"""Acquire volatile data into a zip file
This is called by all OS's
:return: The filepath of the zip
"""
self.process_info = self.get_processes()
self.network_log = self.get_network()
Expand All @@ -290,21 +316,15 @@ def acquire_volatile(self, output_path: Optional[str] = None) -> str:
screenshot_image = self.take_screenshot()
else:
screenshot_image = None
if not output_path:
output_path = os.path.join("", f"{self.get_machine_name()}-{self.timestamp}.zip")
# strip .zip if in filename as shutil appends to end
archive_out = output_path + ".zip" if not output_path.endswith(".zip") else output_path
self.output_path = output_path
with zipfile.ZipFile(archive_out, 'a', compression=zipfile.ZIP_DEFLATED) as zip_file:

with self._open_output() as output_file:
if screenshot_image:
zip_file.writestr(f"{self.get_machine_name()}-{self.timestamp}.png", screenshot_image)
output_file.writestr(f"{self.get_machine_name()}-{self.timestamp}.png", screenshot_image)
for key, value in table_data.items():
with zip_file.open(f"{key}.json", 'w') as json_file:
json_file.write(value.encode())
output_file.writestr(f"{key}.json", value.encode())
if self.network_log:
logging.info("Adding Netstat Data")
with zip_file.open("netstat.log", 'w') as network_file:
network_file.write("\r\n".join(self.network_log).encode())
output_file.writestr("netstat.log", "\r\n".join(self.network_log).encode())
if self.include_open and self.dumped_files:
for file_path in self.dumped_files:
logging.info(f"Adding open file {file_path}")
Expand All @@ -313,14 +333,17 @@ def acquire_volatile(self, output_path: Optional[str] = None) -> str:
logging.warning(f"Skipping file as too large {file_path}")
else:
try:
zip_file.write(file_path, strip_drive(f"./collected_files/{file_path}"))
output_file.write(file_path, strip_drive(f"./collected_files/{file_path}"))
except PermissionError:
logging.warn(f"Permission denied copying {file_path}")
except FileNotFoundError:
logging.warning(f"Could not open {file_path} for reading")

return archive_out

def _open_output(self) -> Union[zipfile.ZipFile, _TarLz4Wrapper]:
if self.output_path.endswith('.tar.lz4'):
return _TarLz4Wrapper(self.output_path)
else:
return zipfile.ZipFile(self.output_path, 'a', compression=zipfile.ZIP_DEFLATED)

def yara_scan(self) -> None:
def yara_hit_callback(hit: dict) -> Any:
Expand Down Expand Up @@ -357,3 +380,4 @@ def yara_hit_callback(hit: dict) -> Any:
logging.info("YARA scan results written to yara_results.json in output archive.")
else:
logging.info("No YARA rules were triggered. Nothing will be written to the output archive.")

0 comments on commit b8102a9

Please sign in to comment.