Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New internal module "extract" #1918

Open
wants to merge 14 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions bbot/core/helpers/libmagic.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ def get_compression(mime_type):
"application/fictionbook2+zip": "zip", # FictionBook 2.0 (Zip)
"application/fictionbook3+zip": "zip", # FictionBook 3.0 (Zip)
"application/gzip": "gzip", # Gzip compressed file
"application/java-archive": "zip", # Java Archive (JAR)
"application/pak": "pak", # PAK archive
"application/vnd.android.package-archive": "zip", # Android package (APK)
"application/vnd.comicbook-rar": "rar", # Comic book archive (RAR)
"application/vnd.comicbook+zip": "zip", # Comic book archive (Zip)
"application/vnd.ms-cab-compressed": "cab", # Microsoft Cabinet archive
Expand Down
6 changes: 6 additions & 0 deletions bbot/modules/filedownload.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class filedownload(BaseModule):
"swp", # Swap File (temporary file, often Vim)
"sxw", # OpenOffice.org Writer document
"tar.gz", # Gzip-Compressed Tar Archive
"tgz", # Gzip-Compressed Tar Archive
"tar", # Tar Archive
"txt", # Plain Text Document
"vbs", # Visual Basic Script
Expand All @@ -74,6 +75,11 @@ class filedownload(BaseModule):
"yaml", # YAML Ain't Markup Language
"yml", # YAML Ain't Markup Language
"zip", # Zip Archive
"lzma", # LZMA Compressed File
"rar", # RAR Compressed File
"7z", # 7-Zip Compressed File
"xz", # XZ Compressed File
"bz2", # Bzip2 Compressed File
],
"max_filesize": "10MB",
"base_64_encoded_file": "false",
Expand Down
74 changes: 74 additions & 0 deletions bbot/modules/internal/extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from pathlib import Path
from bbot.modules.internal.base import BaseInternalModule
from bbot.core.helpers.libmagic import get_magic_info, get_compression


class extract(BaseInternalModule):
    """Unpack archive files announced by FILESYSTEM events.

    Each supported archive is extracted into a sibling folder named after the
    archive (dots replaced by underscores), nested archives found directly in
    that folder are extracted in turn, and a new FILESYSTEM event tagged
    ``folder`` is emitted for the extraction directory.
    """

    watched_events = ["FILESYSTEM"]
    produced_events = ["FILESYSTEM"]
    flags = ["passive"]
    meta = {
        "description": "Extract different types of files into folders on the filesystem",
        "created_date": "2024-12-08",
        "author": "@domwhewell-sage",
    }
    # NOTE(review): "gunzip" looks like a binary name rather than an apt package
    # name -- confirm that every entry here resolves to an installable package.
    deps_apt = ["7zip", "tar", "rar", "unrar", "gunzip"]

    async def setup(self):
        """Build the table mapping a compression format to its extraction command.

        Every command is an argument-list template; ``{filename}`` and
        ``{extract_dir}`` are substituted in ``extract_file``.

        Returns:
            True, unconditionally.
        """
        # NOTE(review): the bzip2/xz/lzma/gzip entries all go through ``tar``, so
        # they presumably only handle tarballs, not bare compressed streams -- confirm.
        self.compression_methods = {
            "zip": ["7z", "x", '-p""', "-aoa", "{filename}", "-o{extract_dir}/"],
            "bzip2": ["tar", "--overwrite", "-xvjf", "{filename}", "-C", "{extract_dir}/"],
            "xz": ["tar", "--overwrite", "-xvJf", "{filename}", "-C", "{extract_dir}/"],
            "7z": ["7z", "x", '-p""', "-aoa", "{filename}", "-o{extract_dir}/"],
            "rar": ["unrar", "x", "-o+", "-p-", "{filename}", "{extract_dir}/"],
            "lzma": ["tar", "--overwrite", "--lzma", "-xvf", "{filename}", "-C", "{extract_dir}/"],
            "tar": ["tar", "--overwrite", "-xvf", "{filename}", "-C", "{extract_dir}/"],
            "gzip": ["tar", "--overwrite", "-xvzf", "{filename}", "-C", "{extract_dir}/"],
        }
        return True

    async def filter_event(self, event):
        """Accept only file events whose compression format has an extraction command.

        Returns:
            True to accept the event, otherwise a ``(False, reason)`` tuple.
        """
        if "file" not in event.tags:
            return False, "Event is not a file"
        # A file event may carry no "compression" entry at all; reject it with a
        # reason instead of letting a KeyError escape from the filter.
        compression = event.data.get("compression")
        if compression is None:
            return False, f"File {event.data.get('path')} does not appear to be compressed"
        if compression not in self.compression_methods:
            return False, f"Extract unable to handle file type: {compression}, {event.data['path']}"
        return True

    async def handle_event(self, event):
        """Extract the archive referenced by ``event`` and emit the resulting folder."""
        import shutil

        path = Path(event.data["path"])
        output_dir = path.parent / path.name.replace(".", "_")
        # Remember whether the folder predates this run so that cleanup never
        # deletes a directory this module did not create.
        already_existed = output_dir.exists()

        # Use the appropriate extraction method based on the file type
        self.info(f"Extracting {path} to {output_dir}")
        success = await self.extract_file(path, output_dir)

        # If the extraction was successful, emit the event
        if success:
            await self.emit_event(
                {"path": str(output_dir)},
                "FILESYSTEM",
                tags="folder",
                parent=event,
                context=f'extracted "{path}" to: {output_dir}',
            )
        elif not already_existed:
            # A failed extraction can leave partial output behind; Path.rmdir()
            # raises on a non-empty directory, so remove the whole tree instead.
            shutil.rmtree(output_dir, ignore_errors=True)

    async def extract_file(self, path, output_dir):
        """Extract ``path`` into ``output_dir``, then recurse into nested archives.

        Args:
            path: Path of the candidate archive.
            output_dir: Path of the directory to extract into; created on demand.

        Returns:
            True when the extraction command ran successfully, False when the
            file type has no extraction command or the extraction failed.
        """
        _, mime_type, _, _ = get_magic_info(path)
        compression_format = get_compression(mime_type)
        cmd_list = self.compression_methods.get(compression_format)
        if not cmd_list:
            # Not a supported archive. Returning before the directory is created
            # keeps the recursion from leaving an empty folder next to every
            # plain file it inspects.
            return False

        if not output_dir.exists():
            self.helpers.mkdir(output_dir)
        command = [s.format(filename=path, extract_dir=output_dir) for s in cmd_list]
        try:
            await self.run_process(command, check=True)
            # Snapshot the files first: the recursive calls create new
            # sub-directories inside output_dir while we walk it.
            nested_candidates = [item for item in output_dir.iterdir() if item.is_file()]
            for item in nested_candidates:
                await self.extract_file(item, output_dir / item.stem)
        except Exception as e:
            self.warning(f"Error extracting {path}. Error: {e}")
            return False
        return True
6 changes: 3 additions & 3 deletions bbot/test/test_step_1/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,17 +326,17 @@ async def test_cli_args(monkeypatch, caplog, capsys, clean_default_config):
monkeypatch.setattr("sys.argv", ["bbot", "-y"])
result = await cli._main()
assert result is True
assert "Loaded 5/5 internal modules (aggregate,cloudcheck,dnsresolve,excavate,speculate)" in caplog.text
assert "Loaded 6/6 internal modules (aggregate,cloudcheck,dnsresolve,excavate,extract,speculate)" in caplog.text
caplog.clear()
monkeypatch.setattr("sys.argv", ["bbot", "-em", "excavate", "speculate", "-y"])
result = await cli._main()
assert result is True
assert "Loaded 3/3 internal modules (aggregate,cloudcheck,dnsresolve)" in caplog.text
assert "Loaded 4/4 internal modules (aggregate,cloudcheck,dnsresolve,extract)" in caplog.text
caplog.clear()
monkeypatch.setattr("sys.argv", ["bbot", "-c", "speculate=false", "-y"])
result = await cli._main()
assert result is True
assert "Loaded 4/4 internal modules (aggregate,cloudcheck,dnsresolve,excavate)" in caplog.text
assert "Loaded 5/5 internal modules (aggregate,cloudcheck,dnsresolve,excavate,extract)" in caplog.text

# custom target type
out, err = capsys.readouterr()
Expand Down
10 changes: 9 additions & 1 deletion bbot/test/test_step_1/test_presets.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,14 @@ def test_preset_module_resolution(clean_default_config):
# make sure we have the expected defaults
assert not preset.scan_modules
assert set(preset.output_modules) == {"python", "csv", "txt", "json"}
assert set(preset.internal_modules) == {"aggregate", "excavate", "speculate", "cloudcheck", "dnsresolve"}
assert set(preset.internal_modules) == {
"aggregate",
"excavate",
"extract",
"speculate",
"cloudcheck",
"dnsresolve",
}
assert preset.modules == set(preset.output_modules).union(set(preset.internal_modules))

# make sure dependency resolution works as expected
Expand Down Expand Up @@ -553,6 +560,7 @@ def test_preset_module_resolution(clean_default_config):
"dnsresolve",
"aggregate",
"excavate",
"extract",
"txt",
"httpx",
"csv",
Expand Down
210 changes: 210 additions & 0 deletions bbot/test/test_step_2/module_tests/test_module_extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import subprocess

from pathlib import Path
from .base import ModuleTestBase


class TestExtract(ModuleTestBase):
    """Serve one archive per supported format and verify that each one is unpacked.

    The archive fixtures are created while the class body executes, using the
    external ``7z``, ``tar`` and ``rar`` tools.
    """

    targets = ["http://127.0.0.1:8888"]
    modules_overrides = ["filedownload", "httpx", "excavate", "speculate", "extract"]
    temp_path = Path("/tmp/.bbot_test")
    # The fixtures below are written at class-creation time, so make sure the
    # directory is there instead of relying on another test having created it.
    temp_path.mkdir(parents=True, exist_ok=True)

    # Create a text file to compress
    text_file = temp_path / "test.txt"
    with open(text_file, "w") as f:
        f.write("This is a test file")
    zip_file = temp_path / "test.zip"
    zip_zip_file = temp_path / "test_zip.zip"
    bz2_file = temp_path / "test.bz2"
    xz_file = temp_path / "test.xz"
    zip7_file = temp_path / "test.7z"
    rar_file = temp_path / "test.rar"
    lzma_file = temp_path / "test.lzma"
    tar_file = temp_path / "test.tar"
    tgz_file = temp_path / "test.tgz"
    # The commands run without a shell, so the quote characters in '-p""' are
    # passed to 7z literally.
    commands = [
        ("7z", "a", '-p""', "-aoa", f"{zip_file}", f"{text_file}"),
        ("7z", "a", '-p""', "-aoa", f"{zip_zip_file}", f"{zip_file}"),
        ("tar", "-C", f"{temp_path}", "-cvjf", f"{bz2_file}", f"{text_file.name}"),
        ("tar", "-C", f"{temp_path}", "-cvJf", f"{xz_file}", f"{text_file.name}"),
        ("7z", "a", '-p""', "-aoa", f"{zip7_file}", f"{text_file}"),
        # -ep stores the bare file name; check() expects test.txt directly
        # inside the extraction folder, not under the fixture's absolute path.
        ("rar", "a", "-ep", f"{rar_file}", f"{text_file}"),
        ("tar", "-C", f"{temp_path}", "--lzma", "-cvf", f"{lzma_file}", f"{text_file.name}"),
        ("tar", "-C", f"{temp_path}", "-cvf", f"{tar_file}", f"{text_file.name}"),
        ("tar", "-C", f"{temp_path}", "-cvzf", f"{tgz_file}", f"{text_file.name}"),
    ]

    for command in commands:
        subprocess.run(command, check=True)

    async def setup_after_prep(self, module_test):
        """Register the index page and one download endpoint per archive fixture."""
        module_test.set_expect_requests(
            dict(uri="/"),
            dict(
                response_data="""<a href="/test.zip">
<a href="/test-zip.zip">
<a href="/test.bz2">
<a href="/test.xz">
<a href="/test.7z">
<a href="/test.rar">
<a href="/test.lzma">
<a href="/test.tar">
<a href="/test.tgz">""",
            ),
        )
        # (request URI, fixture on disk, Content-Type header)
        served_archives = (
            ("/test.zip", self.zip_file, "application/zip"),
            ("/test-zip.zip", self.zip_zip_file, "application/zip"),
            ("/test.bz2", self.bz2_file, "application/x-bzip2"),
            ("/test.xz", self.xz_file, "application/x-xz"),
            ("/test.7z", self.zip7_file, "application/x-7z-compressed"),
            # Serve the real RAR fixture here (this endpoint used to return the
            # 7z bytes, so RAR extraction was never exercised).
            ("/test.rar", self.rar_file, "application/vnd.rar"),
            ("/test.lzma", self.lzma_file, "application/x-lzma"),
            ("/test.tar", self.tar_file, "application/x-tar"),
            ("/test.tgz", self.tgz_file, "application/x-tgz"),
        )
        for uri, archive, content_type in served_archives:
            module_test.set_expect_requests(
                dict(uri=uri),
                dict(
                    response_data=archive.read_bytes(),
                    headers={"Content-Type": content_type},
                ),
            )

    def check(self, module_test, events):
        """Assert that every archive was downloaded and extracted exactly once."""
        filesystem_events = [e for e in events if e.type == "FILESYSTEM"]

        # (downloaded file name, label in the "not found" message, label in the
        #  "failed to extract" message, location of test.txt inside the folder)
        expectations = (
            ("test.zip", "zip", "zip", ("test.txt",)),
            # Recursive ZIP: the inner test.zip is unpacked into a "test" sub-folder
            ("test-zip.zip", "recursive", "zip", ("test", "test.txt")),
            ("test.bz2", "bz2", "bz2", ("test.txt",)),
            ("test.xz", "xz", "xz", ("test.txt",)),
            ("test.7z", "7z", "7z", ("test.txt",)),
            ("test.rar", "rar", "rar", ("test.txt",)),
            ("test.lzma", "lzma", "lzma", ("test.txt",)),
            ("test.tar", "tar", "tar", ("test.txt",)),
            ("test.tgz", "tgz", "tgz", ("test.txt",)),
        )

        for file_name, missing_label, failed_label, inner_path in expectations:
            download_events = [e for e in filesystem_events if file_name in e.data["path"]]
            assert 1 == len(download_events), f"No {missing_label} file found"
            file = Path(download_events[0].data["path"])
            assert file.is_file(), f"File not found at {file}"

            # The extraction folder is the archive name with dots replaced by underscores
            folder_name = file_name.replace(".", "_")
            extract_event = [e for e in filesystem_events if folder_name in e.data["path"] and "folder" in e.tags]
            assert 1 == len(extract_event), f"Failed to extract {failed_label}"
            extract_path = Path(extract_event[0].data["path"]).joinpath(*inner_path)
            assert extract_path.is_file(), "Failed to extract the test file"
Loading