diff --git a/bbot/core/helpers/libmagic.py b/bbot/core/helpers/libmagic.py
index 37612f558..535c99c8c 100644
--- a/bbot/core/helpers/libmagic.py
+++ b/bbot/core/helpers/libmagic.py
@@ -20,9 +20,7 @@ def get_compression(mime_type):
         "application/fictionbook2+zip": "zip",  # FictionBook 2.0 (Zip)
         "application/fictionbook3+zip": "zip",  # FictionBook 3.0 (Zip)
         "application/gzip": "gzip",  # Gzip compressed file
-        "application/java-archive": "zip",  # Java Archive (JAR)
         "application/pak": "pak",  # PAK archive
-        "application/vnd.android.package-archive": "zip",  # Android package (APK)
         "application/vnd.comicbook-rar": "rar",  # Comic book archive (RAR)
         "application/vnd.comicbook+zip": "zip",  # Comic book archive (Zip)
         "application/vnd.ms-cab-compressed": "cab",  # Microsoft Cabinet archive
diff --git a/bbot/modules/filedownload.py b/bbot/modules/filedownload.py
index 35287252b..0cb446dee 100644
--- a/bbot/modules/filedownload.py
+++ b/bbot/modules/filedownload.py
@@ -63,6 +63,7 @@ class filedownload(BaseModule):
             "swp",  # Swap File (temporary file, often Vim)
             "sxw",  # OpenOffice.org Writer document
             "tar.gz",  # Gzip-Compressed Tar Archive
+            "tgz",  # Gzip-Compressed Tar Archive
             "tar",  # Tar Archive
             "txt",  # Plain Text Document
             "vbs",  # Visual Basic Script
@@ -74,6 +75,11 @@ class filedownload(BaseModule):
             "yaml",  # YAML Ain't Markup Language
             "yml",  # YAML Ain't Markup Language
             "zip",  # Zip Archive
+            "lzma",  # LZMA Compressed File
+            "rar",  # RAR Compressed File
+            "7z",  # 7-Zip Compressed File
+            "xz",  # XZ Compressed File
+            "bz2",  # Bzip2 Compressed File
         ],
         "max_filesize": "10MB",
         "base_64_encoded_file": "false",
diff --git a/bbot/modules/internal/extract.py b/bbot/modules/internal/extract.py
new file mode 100644
index 000000000..259e03822
--- /dev/null
+++ b/bbot/modules/internal/extract.py
@@ -0,0 +1,95 @@
+import shutil
+from pathlib import Path
+from bbot.modules.internal.base import BaseInternalModule
+from bbot.core.helpers.libmagic import get_magic_info, get_compression
+
+
+class extract(BaseInternalModule):
+    """Unpack archives announced by FILESYSTEM events and emit the resulting folders."""
+
+    watched_events = ["FILESYSTEM"]
+    produced_events = ["FILESYSTEM"]
+    flags = ["passive"]
+    meta = {
+        "description": "Extract different types of files into folders on the filesystem",
+        "created_date": "2024-12-08",
+        "author": "@domwhewell-sage",
+    }
+    # "gzip" is the package that ships gunzip; "gunzip" itself is not an apt package name
+    deps_apt = ["7zip", "tar", "rar", "unrar", "gzip"]
+
+    async def setup(self):
+        """Build the table mapping each compression format to its extraction command template."""
+        # {filename} and {extract_dir} are substituted per archive in extract_file()
+        self.compression_methods = {
+            "zip": ["7z", "x", '-p""', "-aoa", "{filename}", "-o{extract_dir}/"],
+            "bzip2": ["tar", "--overwrite", "-xvjf", "{filename}", "-C", "{extract_dir}/"],
+            "xz": ["tar", "--overwrite", "-xvJf", "{filename}", "-C", "{extract_dir}/"],
+            "7z": ["7z", "x", '-p""', "-aoa", "{filename}", "-o{extract_dir}/"],
+            "rar": ["unrar", "x", "-o+", "-p-", "{filename}", "{extract_dir}/"],
+            "lzma": ["tar", "--overwrite", "--lzma", "-xvf", "{filename}", "-C", "{extract_dir}/"],
+            "tar": ["tar", "--overwrite", "-xvf", "{filename}", "-C", "{extract_dir}/"],
+            "gzip": ["tar", "--overwrite", "-xvzf", "{filename}", "-C", "{extract_dir}/"],
+        }
+        return True
+
+    async def filter_event(self, event):
+        """Accept only "file" events whose compression format has an extraction command.
+
+        Returns True to accept the event, or a (False, reason) tuple to reject it.
+        """
+        if "file" not in event.tags:
+            return False, "Event is not a file"
+        # .get(): a file event without a "compression" key is rejected instead of raising KeyError
+        compression = event.data.get("compression")
+        if compression not in self.compression_methods:
+            return False, f"Extract unable to handle file type: {compression}, {event.data['path']}"
+        return True
+
+    async def handle_event(self, event):
+        """Extract the archive at event.data["path"] into a sibling folder and emit that folder."""
+        path = Path(event.data["path"])
+        # "archive.tar.gz" -> "archive_tar_gz", created next to the archive
+        output_dir = path.parent / path.name.replace(".", "_")
+
+        # Use the appropriate extraction method based on the file type
+        self.info(f"Extracting {path} to {output_dir}")
+        success = await self.extract_file(path, output_dir)
+
+        # If the extraction was successful, emit the event
+        if success:
+            await self.emit_event(
+                {"path": str(output_dir)},
+                "FILESYSTEM",
+                tags="folder",
+                parent=event,
+                context=f'extracted "{path}" to: {output_dir}',
+            )
+        else:
+            # a failed run can leave partial output behind; Path.rmdir() raises on a non-empty directory
+            shutil.rmtree(output_dir, ignore_errors=True)
+
+    async def extract_file(self, path, output_dir):
+        """Extract path into output_dir, then recurse into any archives that were unpacked.
+
+        Returns True if path was extracted, False if its format is unsupported or the command failed.
+        """
+        _extension, mime_type, _description, _confidence = get_magic_info(path)
+        compression_format = get_compression(mime_type)
+        cmd_list = self.compression_methods.get(compression_format, [])
+        if not cmd_list:
+            # nothing to run; bail out before mkdir so plain files do not leave empty folders behind
+            return False
+        if not output_dir.exists():
+            self.helpers.mkdir(output_dir)
+        command = [s.format(filename=path, extract_dir=output_dir) for s in cmd_list]
+        try:
+            await self.run_process(command, check=True)
+            # snapshot the listing: nested extractions add folders to output_dir while we loop
+            for item in list(output_dir.iterdir()):
+                if item.is_file():
+                    await self.extract_file(item, output_dir / item.stem)
+        except Exception as e:
+            self.warning(f"Error extracting {path}. Error: {e}")
+            return False
+        return True
diff --git a/bbot/test/test_step_1/test_cli.py b/bbot/test/test_step_1/test_cli.py
index e48040e98..26aca1064 100644
--- a/bbot/test/test_step_1/test_cli.py
+++ b/bbot/test/test_step_1/test_cli.py
@@ -326,17 +326,17 @@ async def test_cli_args(monkeypatch, caplog, capsys, clean_default_config):
     monkeypatch.setattr("sys.argv", ["bbot", "-y"])
     result = await cli._main()
     assert result is True
-    assert "Loaded 5/5 internal modules (aggregate,cloudcheck,dnsresolve,excavate,speculate)" in caplog.text
+    assert "Loaded 6/6 internal modules (aggregate,cloudcheck,dnsresolve,excavate,extract,speculate)" in caplog.text
     caplog.clear()
     monkeypatch.setattr("sys.argv", ["bbot", "-em", "excavate", "speculate", "-y"])
     result = await cli._main()
     assert result is True
-    assert "Loaded 3/3 internal modules (aggregate,cloudcheck,dnsresolve)" in caplog.text
+    assert "Loaded 4/4 internal modules (aggregate,cloudcheck,dnsresolve,extract)" in caplog.text
     caplog.clear()
     monkeypatch.setattr("sys.argv", ["bbot", "-c", "speculate=false", "-y"])
     result = await cli._main()
     assert result is True
-    assert "Loaded 4/4 internal modules (aggregate,cloudcheck,dnsresolve,excavate)" in caplog.text
+    assert "Loaded 5/5 internal modules (aggregate,cloudcheck,dnsresolve,excavate,extract)" in caplog.text
 
     # custom target type
     out, err = capsys.readouterr()
diff --git a/bbot/test/test_step_1/test_presets.py
b/bbot/test/test_step_1/test_presets.py
index 5b1564f12..43f571e13 100644
--- a/bbot/test/test_step_1/test_presets.py
+++ b/bbot/test/test_step_1/test_presets.py
@@ -493,7 +493,14 @@ def test_preset_module_resolution(clean_default_config):
     # make sure we have the expected defaults
     assert not preset.scan_modules
     assert set(preset.output_modules) == {"python", "csv", "txt", "json"}
-    assert set(preset.internal_modules) == {"aggregate", "excavate", "speculate", "cloudcheck", "dnsresolve"}
+    assert set(preset.internal_modules) == {
+        "aggregate",
+        "excavate",
+        "extract",
+        "speculate",
+        "cloudcheck",
+        "dnsresolve",
+    }
     assert preset.modules == set(preset.output_modules).union(set(preset.internal_modules))
 
     # make sure dependency resolution works as expected
@@ -553,6 +560,7 @@ def test_preset_module_resolution(clean_default_config):
         "dnsresolve",
         "aggregate",
         "excavate",
+        "extract",
         "txt",
         "httpx",
         "csv",
diff --git a/bbot/test/test_step_2/module_tests/test_module_extract.py b/bbot/test/test_step_2/module_tests/test_module_extract.py
new file mode 100644
index 000000000..15d1e785b
--- /dev/null
+++ b/bbot/test/test_step_2/module_tests/test_module_extract.py
@@ -0,0 +1,96 @@
+import subprocess
+
+from pathlib import Path
+from .base import ModuleTestBase
+
+
+class TestExtract(ModuleTestBase):
+    """Serve one archive per supported format and check that each one is downloaded and extracted."""
+
+    targets = ["http://127.0.0.1:8888"]
+    modules_overrides = ["filedownload", "httpx", "excavate", "speculate", "extract"]
+    temp_path = Path("/tmp/.bbot_test")
+    # the class body runs at import time, so make sure the scratch directory exists first
+    temp_path.mkdir(parents=True, exist_ok=True)
+
+    # Create a text file to compress
+    text_file = temp_path / "test.txt"
+    with open(text_file, "w") as f:
+        f.write("This is a test file")
+    zip_file = temp_path / "test.zip"
+    zip_zip_file = temp_path / "test_zip.zip"
+    bz2_file = temp_path / "test.bz2"
+    xz_file = temp_path / "test.xz"
+    zip7_file = temp_path / "test.7z"
+    rar_file = temp_path / "test.rar"
+    lzma_file = temp_path / "test.lzma"
+    tar_file = temp_path / "test.tar"
+    tgz_file = temp_path / "test.tgz"
+    commands = [
+        ("7z", "a", '-p""', "-aoa", f"{zip_file}", f"{text_file}"),
+        ("7z", "a", '-p""', "-aoa", f"{zip_zip_file}", f"{zip_file}"),
+        ("tar", "-C", f"{temp_path}", "-cvjf", f"{bz2_file}", f"{text_file.name}"),
+        ("tar", "-C", f"{temp_path}", "-cvJf", f"{xz_file}", f"{text_file.name}"),
+        ("7z", "a", '-p""', "-aoa", f"{zip7_file}", f"{text_file}"),
+        ("rar", "a", f"{rar_file}", f"{text_file}"),
+        ("tar", "-C", f"{temp_path}", "--lzma", "-cvf", f"{lzma_file}", f"{text_file.name}"),
+        ("tar", "-C", f"{temp_path}", "-cvf", f"{tar_file}", f"{text_file.name}"),
+        ("tar", "-C", f"{temp_path}", "-cvzf", f"{tgz_file}", f"{text_file.name}"),
+    ]
+
+    for command in commands:
+        subprocess.run(command, check=True)
+
+    # (uri, archive on disk, Content-Type header) for every archive the mock server hands out
+    served_archives = [
+        ("/test.zip", zip_file, "application/zip"),
+        ("/test-zip.zip", zip_zip_file, "application/zip"),
+        ("/test.bz2", bz2_file, "application/x-bzip2"),
+        ("/test.xz", xz_file, "application/x-xz"),
+        ("/test.7z", zip7_file, "application/x-7z-compressed"),
+        ("/test.rar", rar_file, "application/vnd.rar"),
+        ("/test.lzma", lzma_file, "application/x-lzma"),
+        ("/test.tar", tar_file, "application/x-tar"),
+        ("/test.tgz", tgz_file, "application/x-tgz"),
+    ]
+
+    # (label, text found in the downloaded file's path, text found in the extraction folder's path,
+    # location of test.txt inside that folder)
+    expected_extractions = [
+        ("zip", "test.zip", "test_zip", Path("test.txt")),
+        ("recursive zip", "test-zip.zip", "test-zip_zip", Path("test") / "test.txt"),
+        ("bz2", "test.bz2", "test_bz2", Path("test.txt")),
+        ("xz", "test.xz", "test_xz", Path("test.txt")),
+        ("7z", "test.7z", "test_7z", Path("test.txt")),
+        ("rar", "test.rar", "test_rar", Path("test.txt")),
+        ("lzma", "test.lzma", "test_lzma", Path("test.txt")),
+        ("tar", "test.tar", "test_tar", Path("test.txt")),
+        ("tgz", "test.tgz", "test_tgz", Path("test.txt")),
+    ]
+
+    async def setup_after_prep(self, module_test):
+        """Register an index page linking to every archive, plus one response per archive."""
+        # the scan only targets the root URL, so the index page is where the archive URLs are advertised
+        links = "\n".join(f'<a href="{uri}"/>' for uri, _, _ in self.served_archives)
+        module_test.set_expect_requests(dict(uri="/"), dict(response_data=links))
+        for uri, archive, content_type in self.served_archives:
+            module_test.set_expect_requests(
+                dict(uri=uri),
+                dict(response_data=archive.read_bytes(), headers={"Content-Type": content_type}),
+            )
+
+    def check(self, module_test, events):
+        """Assert that each archive was saved as a file and unpacked into its own folder."""
+        filesystem_events = [e for e in events if e.type == "FILESYSTEM"]
+
+        for label, file_marker, folder_marker, inner_path in self.expected_extractions:
+            file_events = [e for e in filesystem_events if file_marker in e.data["path"]]
+            assert 1 == len(file_events), f"No {label} file found"
+            file = Path(file_events[0].data["path"])
+            assert file.is_file(), f"File not found at {file}"
+            extract_events = [
+                e for e in filesystem_events if folder_marker in e.data["path"] and "folder" in e.tags
+            ]
+            assert 1 == len(extract_events), f"Failed to extract {label}"
+            extract_path = Path(extract_events[0].data["path"]) / inner_path
+            assert extract_path.is_file(), f"Failed to extract the test file from the {label} archive"