Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make encryption work with zip files #196

Merged
merged 10 commits into from
Oct 24, 2024
2 changes: 1 addition & 1 deletion acquire/crypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def tell(self):
return self.fh.tell()

def seek(self, pos, whence=io.SEEK_CUR):
raise TypeError("seeking is not allowed")
raise io.UnsupportedOperation("seeking is not allowed")

def close(self):
self.finalize()
Expand Down
2 changes: 1 addition & 1 deletion acquire/outputs/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(

if encrypt:
self._fh = EncryptedStream(self.path.open("wb"), public_key)
self.archive = zipfile.ZipFile(fileobj=self._fh, mode="w", compression=self.compression, allowZip64=True)
self.archive = zipfile.ZipFile(self._fh, mode="w", compression=self.compression, allowZip64=True)
else:
self.archive = zipfile.ZipFile(self.path, mode="w", compression=self.compression, allowZip64=True)

Expand Down
62 changes: 38 additions & 24 deletions acquire/tools/decrypter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import argparse
import base64
import contextlib
Expand All @@ -14,6 +16,9 @@
from datetime import datetime, timezone
from pathlib import Path
from queue import Empty as QueueEmptyError
from queue import Queue
from threading import Event
from typing import BinaryIO, Iterator
from urllib import request
from urllib.error import HTTPError
from urllib.parse import urljoin
Expand Down Expand Up @@ -73,7 +78,7 @@


class EncryptedFile(AlignedStream):
def __init__(self, fh, key_file=None, key_server=None):
def __init__(self, fh: BinaryIO, key_file: Path | None = None, key_server: str | None = None) -> None:
self.fh = fh
self.key_file = key_file
self.key_server = key_server
Expand Down Expand Up @@ -116,10 +121,10 @@
def seekable(self):
return False

def seek(self, pos, whence=io.SEEK_CUR):
def seek(self, pos: int, whence: int = io.SEEK_CUR) -> int:
raise io.UnsupportedOperation("seeking is not allowed")

def _read(self, offset, length):
def _read(self, offset: int, length: int) -> bytes:
if not self.size:
result = []

Expand Down Expand Up @@ -162,25 +167,25 @@
read_size = max(0, min(length, self.size - offset))
return self.cipher.decrypt(self.fh.read(read_size))

def chunks(self, chunk_size=CHUNK_SIZE):
def chunks(self, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
while True:
chunk = self.read(chunk_size)
if not chunk:
break
yield chunk

def verify(self):
def verify(self) -> None:
try:
self.cipher.verify(self.digest)
except ValueError:
raise VerifyError("Digest check failed")

@property
def file_header(self):
def file_header(self) -> c_acquire.file:
return self._file_header

@file_header.setter
def file_header(self, file_header):
def file_header(self, file_header: c_acquire.file) -> None:
if file_header.magic != FILE_MAGIC:
raise ValueError(f"Invalid file magic: {file_header.magic}")

Expand All @@ -193,31 +198,31 @@
self._file_header = file_header

@property
def header(self):
def header(self) -> c_acquire.header:
return self._header

@header.setter
def header(self, header):
def header(self, header: c_acquire.header) -> None:
if header.magic != HEADER_MAGIC:
raise ValueError(f"Invalid header magic: {header.magic}")
self._header = header

@property
def footer(self):
def footer(self) -> c_acquire.footer:
return self._footer

@footer.setter
def footer(self, footer):
def footer(self, footer: c_acquire.footer) -> None:
if footer.magic != FOOTER_MAGIC:
raise ValueError(f"Invalid footer magic: {footer}")
self._footer = footer

@property
def timestamp(self):
def timestamp(self) -> datetime:
return datetime.fromtimestamp(self.file_header.timestamp, timezone.utc)


def decrypt_header(header, fingerprint, key_file=None, key_server=None):
def decrypt_header(header, fingerprint: bytes, key_file: Path | None = None, key_server: str | None = None) -> bytes:
if not key_file and not key_server:
raise ValueError("Need either key file or key server")

Expand Down Expand Up @@ -264,7 +269,16 @@
return False


def worker(task_id, stop_event, status_queue, in_path, out_path, key_file=None, key_server=None, clobber=False):
def worker(
task_id: int,
stop_event: Event,
status_queue: Queue,
in_path: Path,
out_path: Path,
key_file: Path | None = None,
key_server: str | None = None,
clobber: bool = False,
) -> None:
success = False
message = "An unknown error occurred"

Expand Down Expand Up @@ -325,23 +339,23 @@
_exit(status_queue, task_id, str(in_path), message, success)


def _start(queue, task_id):
def _start(queue: Queue, task_id: int) -> None:
queue.put_nowait((STATUS_START, task_id))


def _update(queue, task_id, *args, **kwargs):
def _update(queue: Queue, task_id: int, *args, **kwargs) -> None:
queue.put_nowait((STATUS_UPDATE, (task_id, args, kwargs)))


def _info(queue, msg):
def _info(queue: Queue, msg: str) -> None:
queue.put_nowait((STATUS_INFO, msg))


def _exit(queue: multiprocessing.Queue, task_id: int, in_path: str, message: str, success: bool):
def _exit(queue: multiprocessing.Queue, task_id: int, in_path: str, message: str, success: bool) -> None:
queue.put_nowait((STATUS_EXIT, (task_id, in_path, message, success)))


def setup_logging(logger, verbosity):
def setup_logging(logger: logging.Logger, verbosity: int) -> None:
if verbosity == 1:
level = logging.ERROR
elif verbosity == 2:
Expand All @@ -360,7 +374,7 @@
logger.setLevel(level)


def main():
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("files", nargs="+", type=Path, help="paths to encrypted files")
parser.add_argument("-o", "--output", type=Path, help="optional path to output file")
Expand Down Expand Up @@ -476,12 +490,12 @@
# If no successful results, return 1
if not any(successes):
exit_code = 1
# Else, if some results were successful return 2
# Else, if some results but not all were successful return 2
elif not all(successes):
exit_code = 2
# Else, if all were successful but there were still tasks to handle, return 2
elif all(success) and tasks:
exit_code = 2
# Else, if all were successful but there were still tasks to handle, return 3
elif tasks:
exit_code = 3

Check warning on line 498 in acquire/tools/decrypter.py

View check run for this annotation

Codecov / codecov/patch

acquire/tools/decrypter.py#L497-L498

Added lines #L497 - L498 were not covered by tests
exit(exit_code)


Expand Down
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ def mock_target(mock_fs: VirtualFilesystem) -> Target:
target.filesystems.add(mock_fs)
target.os = "mock"
return target


@pytest.fixture
def public_key() -> bytes:
with open("tests/data/public_key.pem", "r") as f:
return f.read()
28 changes: 28 additions & 0 deletions tests/data/private_key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDDd7qXEe7cR4b0
DFmszQm5MNFPkf/uJd5i6Y4ya0prts9s4mDdS90oDUt7bvLl42i/eB+S1fw9UpmK
r8OBV51VoFPswLVSwHho7LjbGvBSgKCW31skalgOmCAiCl6BqqjI7eIqxJx3V2+G
tW19v0EvbPJNs4gWlLIzn0WZJ65HWNZxToD9NcM2uZeIFxziUFvQYv1Nzkih/HWA
8Fw8Sdrq3JN3RG9jikJSH/MisU25IP5ehP5g66akPCufBCEc6Y2OE4wAQ7sbBMpf
/6PLZHGCVMnE8TXGBUl7m9UVwdyhvfIFBNhGilpqtSQhlqymrko7R7uZ6x5gQIz/
YzUW/Ho9AgMBAAECggEABqDTvKBeDNgxalXU4KLfS96s6lmAEo+e1+nQPu4byu/w
XrfaeFvvNuwkfXNeBzpL6K+RGoXpFMdCmk0AKy2mZytATUyc7skaDCzNeUM+QlNG
9CFvfMT3vB71JVJcBrebxkcoHofQ6ncWOrzXkVEKoSoSRAeXe3SKtRdsi8H9teuX
uXzs8fyk+Xrp9qBE3y541HcZCh8oLypQgTFoV3cZJgcsrnRaLQUooU2n1lvl+EZx
xoZnL1LBMmX/teVICE20NJOlJN25Z+Q26tNM6ADMFmmN0hDaHUEv/tlV+MSB01Mq
nBvC1/q6pHODHW1AsLfxwT2f+VeEz4Hpbxpu1fCkXQKBgQDyhzJgVkkKctCcMlcB
4fck+mxvQlRuWHs94RdMd28bZay5BQhd/rqicDzqIQ2NIhaPDDbUR5ElqLXQEDZD
6s+FeHbT+iXTOtWA6qQJL0/alcEL22Nxxv098nFrjUBeinaw+PbOZq5DOT4NVL/Q
i0lGzQs+6jEH9aYc24Tu9Gi5EwKBgQDOU1KQhAea1rztLkAbEI0giKM/6vf0g6im
1/UlUy8TjyIaE4Cwgsy/H6LuvY1KOiV/6boO3jBl5OyZZBFqIEbmEd3MH75XC0XP
bLtI00EVHU6jCf/dLE5wNhxhEuAw0KB12ecR7fZv1Wg9ltj/IR6dFBJ+Q7uuxufk
yq9R9QU5bwKBgB/Qdl5G01wIha8Ht3wqvTXfl9vccqDrAHe0kE7al/ubEdZPf7J8
2NS4LnV0EogCAb2QF50vKi4rfHYnukachc53Z/cUqGOWIy2/GfeOekYtQN6iT+A7
/zpiFFjMdbYxKbK7ZfzbYV62IpqzFFpx+xHLkf8Vz4rAwaKldUG3VAl7AoGASlef
gk7wZoxFWri1hIr8LuLM37UMTuA5npRl0mMcrVF/miG41uDqYVtG2/sUs9ArvuE6
lyzcB3rq/YIe/DxRD4kUf/5YGQkIyGqHOQBVjQQYV4q81LaoNKpqo1enzC8AAjbX
mZBCoZ0liDuYSKVoYHThDPne4GTvHXMipMdCcKUCgYEA5h0686KZxIHgxPx7A4Br
zHKigjFGda4C9xPWBdpjvLcbFgyc+ULzP61q+h2LnE+HEuFzVqMj56dlIJl73ooI
FyjJR9ceNDyZ37XzBF3IM5AaxdvPfB/OIjMOGS5yV+1hijcGdy+RgBGVADv4lgUd
soFdGXF8UUBPdYczZR2R7jQ=
-----END PRIVATE KEY-----
9 changes: 9 additions & 0 deletions tests/data/public_key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAw3e6lxHu3EeG9AxZrM0J
uTDRT5H/7iXeYumOMmtKa7bPbOJg3UvdKA1Le27y5eNov3gfktX8PVKZiq/DgVed
VaBT7MC1UsB4aOy42xrwUoCglt9bJGpYDpggIgpegaqoyO3iKsScd1dvhrVtfb9B
L2zyTbOIFpSyM59FmSeuR1jWcU6A/TXDNrmXiBcc4lBb0GL9Tc5Iofx1gPBcPEna
6tyTd0RvY4pCUh/zIrFNuSD+XoT+YOumpDwrnwQhHOmNjhOMAEO7GwTKX/+jy2Rx
glTJxPE1xgVJe5vVFcHcob3yBQTYRopaarUkIZaspq5KO0e7meseYECM/2M1Fvx6
PQIDAQAB
-----END PUBLIC KEY-----
18 changes: 18 additions & 0 deletions tests/test_outputs_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dissect.target.filesystem import VirtualFilesystem

from acquire.outputs import TarOutput
from acquire.tools.decrypter import EncryptedFile


@pytest.fixture(params=[(True, "gzip"), (True, "bzip2"), (True, "xz"), (False, None)])
Expand Down Expand Up @@ -41,3 +42,20 @@ def test_tar_output_write_entry(mock_fs: VirtualFilesystem, tar_output: TarOutpu
assert file.issym()
elif entry.is_file():
assert file.isfile()


def test_tar_output_encrypt(mock_fs: VirtualFilesystem, public_key: bytes, tmp_path: Path) -> None:
entry_name = "/foo/bar/some-file"
entry = mock_fs.get(entry_name)
tar_output = TarOutput(tmp_path, compress=True, compression_method="gzip", encrypt=True, public_key=public_key)
tar_output.write_entry(entry_name, entry)
tar_output.close()

encrypted_stream = EncryptedFile(tar_output.path.open("rb"), Path("tests/data/private_key.pem"))
decrypted_path = tmp_path / "decrypted.tar"
# Direct streaming is not an option because tarfile needs seek when reading from encrypted files directly
with open(decrypted_path, "wb") as f:
f.write(encrypted_stream.read())

tar_file = tarfile.open(name=decrypted_path, mode="r")
assert entry.open().read() == tar_file.extractfile(entry_name).read()
18 changes: 18 additions & 0 deletions tests/test_outputs_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dissect.target.filesystem import VirtualFilesystem

from acquire.outputs import ZipOutput
from acquire.tools.decrypter import EncryptedFile


@pytest.fixture(params=[(True, "deflate"), (True, "bzip2"), (True, "lzma"), (False, None)])
Expand Down Expand Up @@ -45,3 +46,20 @@ def test_zip_output_write_entry(mock_fs: VirtualFilesystem, zip_output: ZipOutpu
assert stat.S_ISLNK(file_type)
elif entry.is_file():
assert stat.S_ISREG(file_type)


def test_zip_output_encrypt(mock_fs: VirtualFilesystem, public_key: bytes, tmp_path: Path) -> None:
entry_name = "/foo/bar/some-file"
entry = mock_fs.get(entry_name)
zip_output = ZipOutput(tmp_path, compress=True, compression_method="bzip2", encrypt=True, public_key=public_key)
zip_output.write_entry(entry_name, entry)
zip_output.close()

encrypted_stream = EncryptedFile(zip_output.path.open("rb"), Path("tests/data/private_key.pem"))
decrypted_path = tmp_path / "decrypted.zip"
# Direct streaming is not an option because zipfile needs seek when reading from encrypted files directly
with open(decrypted_path, "wb") as f:
f.write(encrypted_stream.read())

zip_file = zipfile.ZipFile(decrypted_path, mode="r")
assert entry.open().read() == zip_file.open(entry_name).read()
Loading