diff --git a/pyproject.toml b/pyproject.toml index 12786b85..4dc5e597 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,7 +93,7 @@ s3 = "e3.event.handler.s3:S3Handler" [project.entry-points."e3.store"] http-simple-store = "e3.store.backends.http_simple_store:HTTPSimpleStore" - + [project.entry-points."e3.store.cache.backend"] file-cache = "e3.store.cache.backends.filecache:FileCache" @@ -124,7 +124,7 @@ addopts = "--failed-first --disable-socket --e3" [tool.mypy] # Ensure mypy works with namespace in which there is no toplevel -# __init__.py. Explicit_package_bases means that mypy_path +# __init__.py. Explicit_package_bases means that that mypy_path # will define which directory is the toplevel directory of the # namespace. mypy_path = "src" diff --git a/src/e3/archive.py b/src/e3/archive.py index 870a412b..5e3783ab 100644 --- a/src/e3/archive.py +++ b/src/e3/archive.py @@ -8,9 +8,7 @@ import tarfile import tempfile import zipfile - from contextlib import closing -from pathlib import Path from typing import TYPE_CHECKING, cast import e3 @@ -46,7 +44,7 @@ if sys.platform == "win32": # On Windows force the executable bit on all files. This ensures that when - # using cygwin unzipped files get an executable bit set (the executable + # using cygwin unziped files get an executable bit set (the executable # does not exist in win32 but is simulated in Cygwin). class E3ZipInfo(zipfile.ZipInfo): @@ -68,7 +66,6 @@ def _extract_member( path: str | PathLike[str] | None, pwd: bytes | None, ) -> str: - # noinspection PyProtectedMember result = super()._extract_member(member, path, pwd) # type: ignore if sys.platform != "win32": @@ -229,7 +226,7 @@ def unpack_archive( ext = check_type(filename, force_extension=force_extension) # If remove_root_dir is set then extract to a temp directory first. - # Otherwise, extract directly to the final destination + # Otherwise extract directly to the final destination if remove_root_dir: if tmp_dir_root is None: tmp_dir_root = os.path.dirname(os.path.abspath(dest)) @@ -258,62 +255,6 @@ def unpack_archive( ) as fd: check_selected = set(selected_files) - def filter_invalid_paths( - destination: str, members: list[tarfile.TarInfo] - ) -> list[tarfile.TarInfo]: - """Filter invalid archive members. - - As per CWE-22 (and tracked by bandit's B202 report), - using TarFile.extractall() should at least filter - members on invalid paths (like absolute paths, or - traversal path - containing ".."). - - :param destination: The destination path of the - uncompressed archive content. This is used to make - sure no element may be uncompressed outside this - path. - :param members: The list of elements of the archive to - perform path filtering for. - - :return: The list of elements from the archive which are - allowed to be uncompressed. - """ - filtered: list[tarfile.TarInfo] = [] - if hasattr(tarfile, "data_filter"): - # Use the tarfile filter. - for member in members: - try: - # The call to data_filter() raises an - # exception on any problematic member. We - # may build the list of members from - # unfiltered. - tarfile.data_filter(member, destination) - filtered.append(member) - except Exception: - logger.error( - f"Ignoring invalid member {member.name}" - " from tarball" - ) - else: - # At least remove some problematic paths. - for member in members: - member_path: Path = Path(member.name) - if member_path.is_absolute(): - logger.error( - f"Ignoring absolute path {member.name}" - ) - elif ".." in member_path.parts: - logger.error( - f"Ignoring traversal path {member.name}" - ) - elif member_path.is_reserved(): - logger.error( - f"Ignoring reserved path {member.name}" - ) - else: - filtered.append(member) - return filtered - def is_match(name: str, files: Sequence[str]) -> bool: """check if name match any of the expression in files. @@ -331,12 +272,12 @@ def is_match(name: str, files: Sequence[str]) -> bool: dirs: list[str] = [] # IMPORTANT: don't use the method extract. Always use the - # extractall function. Indeed, extractall will set file + # extractall function. Indeed extractall will set file # permissions only once all selected members are unpacked. # Using extract can lead to permission denied for example # if a read-only directory is created. - member_list: list[tarfile.TarInfo] = [] if selected_files: + member_list = [] for tinfo in fd: if is_match( tinfo.name, selected_files @@ -352,16 +293,10 @@ def is_match(name: str, files: Sequence[str]) -> bool: raise ArchiveError( "unpack_archive", f"Cannot untar {filename} " ) + + fd.extractall(path=tmp_dest, members=member_list) else: - member_list = fd.getmembers() - - fd.extractall( - path=tmp_dest, - members=filter_invalid_paths( - destination=tmp_dest, - members=member_list, - ), - ) + fd.extractall(path=tmp_dest) except tarfile.TarError as e: raise ArchiveError( @@ -374,12 +309,8 @@ def is_match(name: str, files: Sequence[str]) -> bool: with closing( E3ZipFile(fileobj if fileobj is not None else filename, mode="r") ) as zip_fd: - # Note that the ZipFile.extractall() already takes care - # of file names with "." or "..". Still bandit thinks this - # is a call to TarFile.extractall(), adding a flag to let - # `bandit` ignore this. zip_fd.extractall( - tmp_dest, selected_files if selected_files else None # nosec + tmp_dest, selected_files if selected_files else None ) except zipfile.BadZipfile as e: raise ArchiveError( @@ -387,7 +318,6 @@ def is_match(name: str, files: Sequence[str]) -> bool: message=f"Cannot unzip {filename} ({e})", ) from e else: - # noinspection PyArgumentList assert_never() if remove_root_dir: @@ -519,7 +449,6 @@ def create_archive( elif ext == "tar.xz": tar_format = "w:xz" else: - # noinspection PyArgumentList assert_never() with closing( tarfile.open(filepath, fileobj=fileobj, mode=tar_format) diff --git a/src/e3/fs.py b/src/e3/fs.py index fbb60154..fe9b60d5 100644 --- a/src/e3/fs.py +++ b/src/e3/fs.py @@ -501,7 +501,7 @@ def onerror( # And continue to delete the subdir if Version(python_version()) >= Version("3.12"): - shutil.rmtree(error_path, onexc=onerror) # type: ignore[call-arg] + shutil.rmtree(error_path, onexc=onerror) else: shutil.rmtree(error_path, onerror=onerror) @@ -523,7 +523,7 @@ def onerror( # directory, not a symbolic link to a directory if recursive and os.path.isdir(f) and not os.path.islink(f): if Version(python_version()) >= Version("3.12"): - shutil.rmtree(f, onexc=onerror) # type: ignore[call-arg] + shutil.rmtree(f, onexc=onerror) else: shutil.rmtree(f, onerror=onerror) else: diff --git a/tests/tests_e3/archive/main_test.py b/tests/tests_e3/archive/main_test.py index 863c5c3d..f5c51bbb 100644 --- a/tests/tests_e3/archive/main_test.py +++ b/tests/tests_e3/archive/main_test.py @@ -12,47 +12,7 @@ from unittest.mock import patch -E3_ARCHIVE_EXTENSIONS: list[str] = [".tar.gz", ".tar.bz2", ".tar.xz", ".tar", ".zip"] - - -@pytest.mark.parametrize("ext", E3_ARCHIVE_EXTENSIONS) -def test_bandit_b202(ext) -> None: - archive_name: str = f"test_b202{ext}" - - # Create a tarball with ".." everywhere - e3.fs.mkdir("archive") - e3.fs.mkdir("archive/dir1") - with open("archive/dir1/file1", "w") as f: - f.write("file1") - e3.fs.mkdir("archive/dir2/") - with open("archive/dir2/file2", "w") as f: - f.write("file2") - dest = os.getcwd() - os.chdir("archive/dir1") - e3.archive.create_archive(archive_name, "..", dest) - os.chdir(dest) - - # Make sure e3.archive.unpack_archive() unpacks it with caring about the - # ".." issues. - os.chdir(dest) - e3.fs.mkdir("unpack/level1") - e3.archive.unpack_archive( - os.path.join(dest, archive_name), os.path.join(dest, "unpack/level1") - ) - # As we packed with (for instance) "../dir1/file1", make sure that none of - # - unpack/dir1 - # - unpack/dir2 - # exist. - - assert ( - os.path.exists("unpack/dir1") is False - ), "Directory unpack/dir1 should not exist" - assert ( - os.path.exists("unpack/dir2") is False - ), "Directory unpack/dir2 should not exist" - - -@pytest.mark.parametrize("ext", E3_ARCHIVE_EXTENSIONS) +@pytest.mark.parametrize("ext", (".tar.gz", ".tar.bz2", ".tar.xz", ".tar", ".zip")) def test_unpack(ext): dir_to_pack = os.path.dirname(__file__) diff --git a/tox.ini b/tox.ini index 3f1de4f9..b09b4eff 100644 --- a/tox.ini +++ b/tox.ini @@ -33,8 +33,8 @@ commands = # is needed by e3. There is also e3.env.tmp_dir that returns the TMPDIR # environment variable. Don't check for that. # Ignore B324 that is no longer similar to B303 since Python3.9. - bandit -r {toxinidir}/src -ll -ii -s B102,B108,B301,B506,B303,B324 - mypy {toxinidir}/src + bandit -r {toxinidir}/src -ll -ii -s B102,B108,B301,B506,B303,B324,B202 + mypy {toxinidir}/src [testenv:docs] changedir = docs