diff --git a/bbot/core/helpers/libmagic.py b/bbot/core/helpers/libmagic.py
index 37612f558..535c99c8c 100644
--- a/bbot/core/helpers/libmagic.py
+++ b/bbot/core/helpers/libmagic.py
@@ -20,9 +20,7 @@ def get_compression(mime_type):
"application/fictionbook2+zip": "zip", # FictionBook 2.0 (Zip)
"application/fictionbook3+zip": "zip", # FictionBook 3.0 (Zip)
"application/gzip": "gzip", # Gzip compressed file
- "application/java-archive": "zip", # Java Archive (JAR)
"application/pak": "pak", # PAK archive
- "application/vnd.android.package-archive": "zip", # Android package (APK)
"application/vnd.comicbook-rar": "rar", # Comic book archive (RAR)
"application/vnd.comicbook+zip": "zip", # Comic book archive (Zip)
"application/vnd.ms-cab-compressed": "cab", # Microsoft Cabinet archive
diff --git a/bbot/modules/filedownload.py b/bbot/modules/filedownload.py
index 35287252b..0cb446dee 100644
--- a/bbot/modules/filedownload.py
+++ b/bbot/modules/filedownload.py
@@ -63,6 +63,7 @@ class filedownload(BaseModule):
"swp", # Swap File (temporary file, often Vim)
"sxw", # OpenOffice.org Writer document
"tar.gz", # Gzip-Compressed Tar Archive
+ "tgz", # Gzip-Compressed Tar Archive
"tar", # Tar Archive
"txt", # Plain Text Document
"vbs", # Visual Basic Script
@@ -74,6 +75,11 @@ class filedownload(BaseModule):
"yaml", # YAML Ain't Markup Language
"yml", # YAML Ain't Markup Language
"zip", # Zip Archive
+ "lzma", # LZMA Compressed File
+ "rar", # RAR Compressed File
+ "7z", # 7-Zip Compressed File
+ "xz", # XZ Compressed File
+ "bz2", # Bzip2 Compressed File
],
"max_filesize": "10MB",
"base_64_encoded_file": "false",
diff --git a/bbot/modules/internal/extract.py b/bbot/modules/internal/extract.py
new file mode 100644
index 000000000..259e03822
--- /dev/null
+++ b/bbot/modules/internal/extract.py
@@ -0,0 +1,122 @@
+import shutil
+from pathlib import Path
+from bbot.modules.internal.base import BaseInternalModule
+from bbot.core.helpers.libmagic import get_magic_info, get_compression
+
+
+class extract(BaseInternalModule):
+    """Unpack archive files announced by FILESYSTEM events.
+
+    A watched event tagged ``file`` whose ``compression`` value has an entry in
+    ``self.compression_methods`` is extracted into a sibling directory, and a new
+    FILESYSTEM event tagged ``folder`` is emitted for that directory. Archives
+    found directly inside an extracted directory are extracted in turn.
+    """
+
+    watched_events = ["FILESYSTEM"]
+    produced_events = ["FILESYSTEM"]
+    flags = ["passive"]
+    meta = {
+        "description": "Extract different types of files into folders on the filesystem",
+        "created_date": "2024-12-08",
+        "author": "@domwhewell-sage",
+    }
+    # NOTE(review): confirm these apt package names resolve on the supported distros
+    # ("gunzip" is usually shipped by the "gzip" package, not as a package of its own).
+    deps_apt = ["7zip", "tar", "rar", "unrar", "gunzip"]
+
+    async def setup(self):
+        """Build the table of extraction commands, keyed by compression format.
+
+        Every value is an argv template; ``{filename}`` and ``{extract_dir}`` are
+        substituted by ``extract_file`` before the command is run.
+        """
+        self.compression_methods = {
+            "zip": ["7z", "x", '-p""', "-aoa", "{filename}", "-o{extract_dir}/"],
+            "bzip2": ["tar", "--overwrite", "-xvjf", "{filename}", "-C", "{extract_dir}/"],
+            "xz": ["tar", "--overwrite", "-xvJf", "{filename}", "-C", "{extract_dir}/"],
+            "7z": ["7z", "x", '-p""', "-aoa", "{filename}", "-o{extract_dir}/"],
+            "rar": ["unrar", "x", "-o+", "-p-", "{filename}", "{extract_dir}/"],
+            "lzma": ["tar", "--overwrite", "--lzma", "-xvf", "{filename}", "-C", "{extract_dir}/"],
+            "tar": ["tar", "--overwrite", "-xvf", "{filename}", "-C", "{extract_dir}/"],
+            "gzip": ["tar", "--overwrite", "-xvzf", "{filename}", "-C", "{extract_dir}/"],
+        }
+        return True
+
+    async def filter_event(self, event):
+        """Accept only ``file`` events whose compression format has an extraction command.
+
+        Returns ``True`` to accept the event, otherwise a ``(False, reason)`` tuple.
+        """
+        if "file" not in event.tags:
+            return False, "Event is not a file"
+        # A file event may lack the "compression" key; .get() avoids a KeyError here
+        compression = event.data.get("compression")
+        if compression not in self.compression_methods:
+            return False, f"Extract unable to handle file type: {compression}, {event.data['path']}"
+        return True
+
+    async def handle_event(self, event):
+        """Extract the archive referenced by ``event`` and emit a ``folder`` event on success."""
+        path = Path(event.data["path"])
+        # e.g. "archive.tar.gz" is extracted into the sibling directory "archive_tar_gz"
+        output_dir = path.parent / path.name.replace(".", "_")
+
+        # Use the appropriate extraction method based on the file type
+        self.info(f"Extracting {path} to {output_dir}")
+        success = await self.extract_file(path, output_dir)
+
+        # If the extraction was successful, emit the event
+        if success:
+            await self.emit_event(
+                {"path": str(output_dir)},
+                "FILESYSTEM",
+                tags="folder",
+                parent=event,
+                context=f'extracted "{path}" to: {output_dir}',
+            )
+        else:
+            # A failed extraction may leave partial output behind, or no directory at all;
+            # Path.rmdir() raises in both cases, so remove the tree and ignore errors.
+            shutil.rmtree(output_dir, ignore_errors=True)
+
+    async def extract_file(self, path, output_dir):
+        """Extract ``path`` into ``output_dir``, then any archives found directly inside it.
+
+        The archive format is taken from ``get_magic_info(path)`` rather than from the
+        file name or the triggering event.
+
+        Args:
+            path: ``Path`` of the file to extract.
+            output_dir: ``Path`` of the directory to extract into; created if missing.
+
+        Returns:
+            True if ``path`` is a supported archive and its extraction command succeeded,
+            False otherwise.
+        """
+        extension, mime_type, description, confidence = get_magic_info(path)
+        compression_format = get_compression(mime_type)
+        cmd_list = self.compression_methods.get(compression_format, [])
+        if not cmd_list:
+            # Not a supported archive. Bail out before creating output_dir, otherwise the
+            # recursion below would leave an empty directory for every ordinary file.
+            return False
+
+        if not output_dir.exists():
+            self.helpers.mkdir(output_dir)
+        command = [s.format(filename=path, extract_dir=output_dir) for s in cmd_list]
+        try:
+            await self.run_process(command, check=True)
+        except Exception as e:
+            self.warning(f"Error extracting {path}. Error: {e}")
+            return False
+
+        # Snapshot the listing: the recursive calls create new directories inside output_dir
+        for item in list(output_dir.iterdir()):
+            if item.is_file():
+                try:
+                    await self.extract_file(item, output_dir / item.stem)
+                except Exception as e:
+                    # A problem with a nested file must not discard the outer extraction
+                    self.warning(f"Error extracting {item}. Error: {e}")
+        return True
diff --git a/bbot/test/test_step_1/test_cli.py b/bbot/test/test_step_1/test_cli.py
index e48040e98..26aca1064 100644
--- a/bbot/test/test_step_1/test_cli.py
+++ b/bbot/test/test_step_1/test_cli.py
@@ -326,17 +326,17 @@ async def test_cli_args(monkeypatch, caplog, capsys, clean_default_config):
monkeypatch.setattr("sys.argv", ["bbot", "-y"])
result = await cli._main()
assert result is True
- assert "Loaded 5/5 internal modules (aggregate,cloudcheck,dnsresolve,excavate,speculate)" in caplog.text
+ assert "Loaded 6/6 internal modules (aggregate,cloudcheck,dnsresolve,excavate,extract,speculate)" in caplog.text
caplog.clear()
monkeypatch.setattr("sys.argv", ["bbot", "-em", "excavate", "speculate", "-y"])
result = await cli._main()
assert result is True
- assert "Loaded 3/3 internal modules (aggregate,cloudcheck,dnsresolve)" in caplog.text
+ assert "Loaded 4/4 internal modules (aggregate,cloudcheck,dnsresolve,extract)" in caplog.text
caplog.clear()
monkeypatch.setattr("sys.argv", ["bbot", "-c", "speculate=false", "-y"])
result = await cli._main()
assert result is True
- assert "Loaded 4/4 internal modules (aggregate,cloudcheck,dnsresolve,excavate)" in caplog.text
+ assert "Loaded 5/5 internal modules (aggregate,cloudcheck,dnsresolve,excavate,extract)" in caplog.text
# custom target type
out, err = capsys.readouterr()
diff --git a/bbot/test/test_step_1/test_presets.py b/bbot/test/test_step_1/test_presets.py
index 5b1564f12..43f571e13 100644
--- a/bbot/test/test_step_1/test_presets.py
+++ b/bbot/test/test_step_1/test_presets.py
@@ -493,7 +493,14 @@ def test_preset_module_resolution(clean_default_config):
# make sure we have the expected defaults
assert not preset.scan_modules
assert set(preset.output_modules) == {"python", "csv", "txt", "json"}
- assert set(preset.internal_modules) == {"aggregate", "excavate", "speculate", "cloudcheck", "dnsresolve"}
+ assert set(preset.internal_modules) == {
+ "aggregate",
+ "excavate",
+ "extract",
+ "speculate",
+ "cloudcheck",
+ "dnsresolve",
+ }
assert preset.modules == set(preset.output_modules).union(set(preset.internal_modules))
# make sure dependency resolution works as expected
@@ -553,6 +560,7 @@ def test_preset_module_resolution(clean_default_config):
"dnsresolve",
"aggregate",
"excavate",
+ "extract",
"txt",
"httpx",
"csv",
diff --git a/bbot/test/test_step_2/module_tests/test_module_extract.py b/bbot/test/test_step_2/module_tests/test_module_extract.py
new file mode 100644
index 000000000..15d1e785b
--- /dev/null
+++ b/bbot/test/test_step_2/module_tests/test_module_extract.py
@@ -0,0 +1,98 @@
+import subprocess
+
+from pathlib import Path
+from .base import ModuleTestBase
+
+
+class TestExtract(ModuleTestBase):
+    """Download one archive of every supported format and check that each gets extracted.
+
+    The fixture archives are built when this class body is executed, using the 7z, rar
+    and tar command-line tools.
+    """
+
+    targets = ["http://127.0.0.1:8888"]
+    modules_overrides = ["filedownload", "httpx", "excavate", "speculate", "extract"]
+    temp_path = Path("/tmp/.bbot_test")
+    # This class body runs at import time, so make sure the directory exists before writing to it
+    temp_path.mkdir(parents=True, exist_ok=True)
+
+    # Create a text file to compress
+    text_file = temp_path / "test.txt"
+    with open(text_file, "w") as f:
+        f.write("This is a test file")
+    zip_file = temp_path / "test.zip"
+    zip_zip_file = temp_path / "test_zip.zip"
+    bz2_file = temp_path / "test.bz2"
+    xz_file = temp_path / "test.xz"
+    zip7_file = temp_path / "test.7z"
+    rar_file = temp_path / "test.rar"
+    lzma_file = temp_path / "test.lzma"
+    tar_file = temp_path / "test.tar"
+    tgz_file = temp_path / "test.tgz"
+    commands = [
+        ("7z", "a", '-p""', "-aoa", f"{zip_file}", f"{text_file}"),
+        ("7z", "a", '-p""', "-aoa", f"{zip_zip_file}", f"{zip_file}"),
+        ("tar", "-C", f"{temp_path}", "-cvjf", f"{bz2_file}", f"{text_file.name}"),
+        ("tar", "-C", f"{temp_path}", "-cvJf", f"{xz_file}", f"{text_file.name}"),
+        ("7z", "a", '-p""', "-aoa", f"{zip7_file}", f"{text_file}"),
+        ("rar", "a", f"{rar_file}", f"{text_file}"),
+        ("tar", "-C", f"{temp_path}", "--lzma", "-cvf", f"{lzma_file}", f"{text_file.name}"),
+        ("tar", "-C", f"{temp_path}", "-cvf", f"{tar_file}", f"{text_file.name}"),
+        ("tar", "-C", f"{temp_path}", "-cvzf", f"{tgz_file}", f"{text_file.name}"),
+    ]
+
+    for command in commands:
+        subprocess.run(command, check=True)
+
+    async def setup_after_prep(self, module_test):
+        """Serve an index page that links to every archive, plus the archives themselves."""
+        # (request URI, archive on disk, Content-Type header)
+        served = [
+            ("/test.zip", self.zip_file, "application/zip"),
+            ("/test-zip.zip", self.zip_zip_file, "application/zip"),
+            ("/test.bz2", self.bz2_file, "application/x-bzip2"),
+            ("/test.xz", self.xz_file, "application/x-xz"),
+            ("/test.7z", self.zip7_file, "application/x-7z-compressed"),
+            # Serve the real RAR archive so the rar code path is what gets exercised
+            ("/test.rar", self.rar_file, "application/vnd.rar"),
+            ("/test.lzma", self.lzma_file, "application/x-lzma"),
+            ("/test.tar", self.tar_file, "application/x-tar"),
+            ("/test.tgz", self.tgz_file, "application/x-tgz"),
+        ]
+        # Link every archive from the index page (the links are presumably how the scan finds them)
+        index_page = "\n".join(f'<a href="{uri}"/>' for uri, _, _ in served)
+        module_test.set_expect_requests(dict(uri="/"), dict(response_data=index_page))
+        for uri, archive, content_type in served:
+            module_test.set_expect_requests(
+                dict(uri=uri),
+                dict(response_data=archive.read_bytes(), headers={"Content-Type": content_type}),
+            )
+
+    def check(self, module_test, events):
+        """Assert that every archive was downloaded and extracted into the expected folder."""
+        filesystem_events = [e for e in events if e.type == "FILESYSTEM"]
+
+        # (label, name of the downloaded file, name of the extraction folder, test file inside it)
+        cases = [
+            ("zip", "test.zip", "test_zip", Path("test.txt")),
+            ("recursive zip", "test-zip.zip", "test-zip_zip", Path("test") / "test.txt"),
+            ("bz2", "test.bz2", "test_bz2", Path("test.txt")),
+            ("xz", "test.xz", "test_xz", Path("test.txt")),
+            ("7z", "test.7z", "test_7z", Path("test.txt")),
+            ("rar", "test.rar", "test_rar", Path("test.txt")),
+            ("lzma", "test.lzma", "test_lzma", Path("test.txt")),
+            ("tar", "test.tar", "test_tar", Path("test.txt")),
+            ("tgz", "test.tgz", "test_tgz", Path("test.txt")),
+        ]
+        for label, file_name, folder_name, inner_path in cases:
+            file_events = [e for e in filesystem_events if file_name in e.data["path"]]
+            assert 1 == len(file_events), f"No {label} file found"
+            file = Path(file_events[0].data["path"])
+            assert file.is_file(), f"File not found at {file}"
+            extract_events = [
+                e for e in filesystem_events if folder_name in e.data["path"] and "folder" in e.tags
+            ]
+            assert 1 == len(extract_events), f"Failed to extract {label}"
+            extract_path = Path(extract_events[0].data["path"]) / inner_path
+            assert extract_path.is_file(), f"Failed to extract the test file from the {label} archive"