From e6ba4387856b440291bb3142d4e7dcc21ba7ea10 Mon Sep 17 00:00:00 2001 From: BRAUN REMI Date: Fri, 13 Dec 2024 14:30:27 +0100 Subject: [PATCH] Create archives module (to avoid circular imports between path and files) + remove some depr functions + fix erroneous merge --- CI/SCRIPTS/test_archives.py | 147 ++++++++++ CI/SCRIPTS/test_files.py | 146 +--------- CI/SCRIPTS/test_path.py | 57 +--- CI/SCRIPTS/test_vectors.py | 4 +- sertit/archives.py | 558 ++++++++++++++++++++++++++++++++++++ sertit/files.py | 458 +---------------------------- sertit/path.py | 188 +----------- sertit/vectors.py | 25 +- sertit/xml.py | 36 ++- 9 files changed, 756 insertions(+), 863 deletions(-) create mode 100644 CI/SCRIPTS/test_archives.py create mode 100644 sertit/archives.py diff --git a/CI/SCRIPTS/test_archives.py b/CI/SCRIPTS/test_archives.py new file mode 100644 index 0000000..3415618 --- /dev/null +++ b/CI/SCRIPTS/test_archives.py @@ -0,0 +1,147 @@ +import os +import shutil + +import pytest +from lxml import etree, html + +from CI.SCRIPTS.script_utils import files_path, s3_env +from sertit import archives, ci, files, path, s3, vectors + + +def test_archive(tmp_path): + """Test extracting functions""" + # Archives + zip_file = files_path().joinpath("test_zip.zip") + zip2_file = files_path().joinpath("test_zip.zip") # For overwrite + zip_without_directory = files_path().joinpath("test_zip_without_directory.zip") + tar_file = files_path().joinpath("test_tar.tar") + tar_gz_file = files_path().joinpath("test_targz.tar.gz") + + # Core dir + core_dir = files_path().joinpath("core") + folder = core_dir + arch = [ + zip_file, + tar_file, + tar_gz_file, + folder, + zip2_file, + zip_without_directory, + ] + + # Extract + extracted_dirs = archives.extract_files(arch, tmp_path, overwrite=True) + archives.extract_files([zip2_file], tmp_path, overwrite=False) # Already existing + + # Test + for ex_dir in extracted_dirs: + ci.assert_dir_equal(core_dir, ex_dir) + + # Archive + archive_base = 
os.path.join(tmp_path, "archive") + for fmt in ["zip", "tar", "gztar"]: + archive_fn = archives.archive( + folder_path=core_dir, archive_path=archive_base, fmt=fmt + ) + out = archives.extract_file(archive_fn, tmp_path) + # an additional folder is created + out_dir = path.listdir_abspath(out)[0] + ci.assert_dir_equal(core_dir, out_dir) + + # Remove out directory in order to avoid any interferences + files.remove(out) + + # Add to zip + zip_out = zip2_file if path.is_cloud_path(zip2_file) else archive_base + ".zip" + core_copy = files.copy(core_dir, os.path.join(tmp_path, "core2")) + zip_out = archives.add_to_zip(zip_out, core_copy) + + # Extract + unzip_out = os.path.join(tmp_path, "out") + unzip_out = archives.extract_file(zip_out, unzip_out) + + # Test + unzip_dirs = path.listdir_abspath(unzip_out) + + assert len(unzip_dirs) == 2 + ci.assert_dir_equal(unzip_dirs[0], unzip_dirs[1]) + + +@s3_env +def test_archived_files(tmp_path): + landsat_name = "LM05_L1TP_200030_20121230_20200820_02_T2_CI" + ok_folder = files_path().joinpath(landsat_name) + zip_file = files_path().joinpath(f"{landsat_name}.zip") + tar_file = files_path().joinpath(f"{landsat_name}.tar") + targz_file = files_path().joinpath(f"{landsat_name}.tar.gz") + sz_file = files_path().joinpath(f"{landsat_name}.7z") + + # VECTORS + vect_name = "map-overlay.kml" + vec_ok_path = ok_folder.joinpath(vect_name) + if shutil.which("ogr2ogr"): # Only works if ogr2ogr can be found. 
+ vect_regex = f".*{vect_name}" + vect_zip = vectors.read(zip_file, archive_regex=vect_regex) + vect_tar = vectors.read(tar_file, archive_regex=r".*overlay\.kml") + vect_ok = vectors.read(vec_ok_path) + assert not vect_ok.empty + ci.assert_geom_equal(vect_ok, vect_zip) + ci.assert_geom_equal(vect_ok, vect_tar) + + # XML + xml_name = "LM05_L1TP_200030_20121230_20200820_02_T2_MTL.xml" + xml_ok_path = ok_folder.joinpath(xml_name) + xml_ok_path = str(s3.download(xml_ok_path, tmp_path)) + + xml_regex = f".*{xml_name}" + xml_zip = archives.read_archived_xml(zip_file, xml_regex) + xml_tar = archives.read_archived_xml(tar_file, r".*_MTL\.xml") + xml_ok = etree.parse(xml_ok_path).getroot() + ci.assert_xml_equal(xml_ok, xml_zip) + ci.assert_xml_equal(xml_ok, xml_tar) + + # FILE + HTML + html_zip_file = files_path().joinpath("productPreview.zip") + html_tar_file = files_path().joinpath("productPreview.tar") + html_name = "productPreview.html" + html_ok_path = files_path().joinpath(html_name) + html_ok_path = str(s3.download(html_ok_path, tmp_path)) + + html_regex = f".*{html_name}" + + # FILE + file_zip = archives.read_archived_file(html_zip_file, html_regex) + file_tar = archives.read_archived_file(html_tar_file, html_regex) + html_ok = html.parse(html_ok_path).getroot() + ci.assert_html_equal(html_ok, html.fromstring(file_zip)) + ci.assert_html_equal(html_ok, html.fromstring(file_tar)) + + file_list = archives.get_archived_file_list(html_zip_file) + ci.assert_html_equal( + html_ok, + html.fromstring( + archives.read_archived_file(html_zip_file, html_regex, file_list=file_list) + ), + ) + + # HTML + html_zip = archives.read_archived_html(html_zip_file, html_regex) + html_tar = archives.read_archived_html(html_tar_file, html_regex) + ci.assert_html_equal(html_ok, html_zip) + ci.assert_html_equal(html_ok, html_tar) + ci.assert_html_equal( + html_ok, + archives.read_archived_html( + html_tar_file, + html_regex, + file_list=archives.get_archived_file_list(html_tar_file), + ), + 
) + + # ERRORS + with pytest.raises(TypeError): + archives.read_archived_file(targz_file, xml_regex) + with pytest.raises(TypeError): + archives.read_archived_file(sz_file, xml_regex) + with pytest.raises(FileNotFoundError): + archives.read_archived_file(zip_file, "cdzeferf") diff --git a/CI/SCRIPTS/test_files.py b/CI/SCRIPTS/test_files.py index 04015b7..a0f9889 100644 --- a/CI/SCRIPTS/test_files.py +++ b/CI/SCRIPTS/test_files.py @@ -16,160 +16,18 @@ """Script testing the files""" import os -import shutil import tempfile from datetime import date, datetime import numpy as np import pytest -from lxml import etree, html -from CI.SCRIPTS.script_utils import Polarization, files_path, s3_env -from sertit import AnyPath, ci, files, path, s3, vectors +from CI.SCRIPTS.script_utils import Polarization +from sertit import AnyPath, ci, files ci.reduce_verbosity() -def test_archive(): - """Test extracting functions""" - with tempfile.TemporaryDirectory() as tmp_dir: - # Archives - zip_file = files_path().joinpath("test_zip.zip") - zip2_file = files_path().joinpath("test_zip.zip") # For overwrite - zip_without_directory = files_path().joinpath("test_zip_without_directory.zip") - tar_file = files_path().joinpath("test_tar.tar") - tar_gz_file = files_path().joinpath("test_targz.tar.gz") - - # Core dir - core_dir = files_path().joinpath("core") - folder = core_dir - archives = [ - zip_file, - tar_file, - tar_gz_file, - folder, - zip2_file, - zip_without_directory, - ] - - # Extract - extracted_dirs = files.extract_files(archives, tmp_dir, overwrite=True) - files.extract_files([zip2_file], tmp_dir, overwrite=False) # Already existing - - # Test - for ex_dir in extracted_dirs: - ci.assert_dir_equal(core_dir, ex_dir) - - # Archive - archive_base = os.path.join(tmp_dir, "archive") - for fmt in ["zip", "tar", "gztar"]: - archive_fn = files.archive( - folder_path=core_dir, archive_path=archive_base, fmt=fmt - ) - out = files.extract_file(archive_fn, tmp_dir) - # an additional folder is 
created - out_dir = path.listdir_abspath(out)[0] - ci.assert_dir_equal(core_dir, out_dir) - - # Remove out directory in order to avoid any interferences - files.remove(out) - - # Add to zip - zip_out = zip2_file if path.is_cloud_path(zip2_file) else archive_base + ".zip" - core_copy = files.copy(core_dir, os.path.join(tmp_dir, "core2")) - zip_out = files.add_to_zip(zip_out, core_copy) - - # Extract - unzip_out = os.path.join(tmp_dir, "out") - unzip_out = files.extract_file(zip_out, unzip_out) - - # Test - unzip_dirs = path.listdir_abspath(unzip_out) - - assert len(unzip_dirs) == 2 - ci.assert_dir_equal(unzip_dirs[0], unzip_dirs[1]) - - -@s3_env -def test_archived_files(tmp_path): - landsat_name = "LM05_L1TP_200030_20121230_20200820_02_T2_CI" - ok_folder = files_path().joinpath(landsat_name) - zip_file = files_path().joinpath(f"{landsat_name}.zip") - tar_file = files_path().joinpath(f"{landsat_name}.tar") - targz_file = files_path().joinpath(f"{landsat_name}.tar.gz") - sz_file = files_path().joinpath(f"{landsat_name}.7z") - - # VECTORS - vect_name = "map-overlay.kml" - vec_ok_path = ok_folder.joinpath(vect_name) - if shutil.which("ogr2ogr"): # Only works if ogr2ogr can be found. 
- vect_regex = f".*{vect_name}" - vect_zip = vectors.read(zip_file, archive_regex=vect_regex) - vect_tar = vectors.read(tar_file, archive_regex=r".*overlay\.kml") - vect_ok = vectors.read(vec_ok_path) - assert not vect_ok.empty - ci.assert_geom_equal(vect_ok, vect_zip) - ci.assert_geom_equal(vect_ok, vect_tar) - - # XML - xml_name = "LM05_L1TP_200030_20121230_20200820_02_T2_MTL.xml" - xml_ok_path = ok_folder.joinpath(xml_name) - xml_ok_path = str(s3.download(xml_ok_path, tmp_path)) - - xml_regex = f".*{xml_name}" - xml_zip = files.read_archived_xml(zip_file, xml_regex) - xml_tar = files.read_archived_xml(tar_file, r".*_MTL\.xml") - xml_ok = etree.parse(xml_ok_path).getroot() - ci.assert_xml_equal(xml_ok, xml_zip) - ci.assert_xml_equal(xml_ok, xml_tar) - - # FILE + HTML - html_zip_file = files_path().joinpath("productPreview.zip") - html_tar_file = files_path().joinpath("productPreview.tar") - html_name = "productPreview.html" - html_ok_path = files_path().joinpath(html_name) - html_ok_path = str(s3.download(html_ok_path, tmp_path)) - - html_regex = f".*{html_name}" - - # FILE - file_zip = files.read_archived_file(html_zip_file, html_regex) - file_tar = files.read_archived_file(html_tar_file, html_regex) - html_ok = html.parse(html_ok_path).getroot() - ci.assert_html_equal(html_ok, html.fromstring(file_zip)) - ci.assert_html_equal(html_ok, html.fromstring(file_tar)) - - file_list = path.get_archived_file_list(html_zip_file) - ci.assert_html_equal( - html_ok, - html.fromstring( - files.read_archived_file(html_zip_file, html_regex, file_list=file_list) - ), - ) - - # HTML - html_zip = files.read_archived_html(html_zip_file, html_regex) - html_tar = files.read_archived_html(html_tar_file, html_regex) - ci.assert_html_equal(html_ok, html_zip) - ci.assert_html_equal(html_ok, html_tar) - ci.assert_html_equal( - html_ok, - files.read_archived_html( - html_tar_file, - html_regex, - file_list=path.get_archived_file_list(html_tar_file), - ), - ) - - # ERRORS - with 
pytest.raises(TypeError): - files.read_archived_file(targz_file, xml_regex) - with pytest.raises(TypeError): - files.read_archived_file(sz_file, xml_regex) - with pytest.raises(FileNotFoundError): - files.read_archived_file(zip_file, "cdzeferf") - - def test_cp_rm(): """Test CP/RM functions""" with tempfile.TemporaryDirectory() as tmp_dir: diff --git a/CI/SCRIPTS/test_path.py b/CI/SCRIPTS/test_path.py index bf335ec..d173bc3 100644 --- a/CI/SCRIPTS/test_path.py +++ b/CI/SCRIPTS/test_path.py @@ -16,13 +16,12 @@ """Script testing the files""" import os -import shutil import tempfile import pytest -from CI.SCRIPTS.script_utils import files_path, get_s3_ci_path, s3_env -from sertit import AnyPath, ci, misc, path, vectors +from CI.SCRIPTS.script_utils import get_s3_ci_path +from sertit import AnyPath, ci, misc, path ci.reduce_verbosity() @@ -65,58 +64,6 @@ def test_paths(): assert not path.is_writable("cvfgbherth") # Non-existing -@s3_env -def test_archived_paths(): - landsat_name = "LM05_L1TP_200030_20121230_20200820_02_T2_CI" - ok_folder = files_path().joinpath(landsat_name) - zip_file = files_path().joinpath(f"{landsat_name}.zip") - tar_file = files_path().joinpath(f"{landsat_name}.tar") - targz_file = files_path().joinpath(f"{landsat_name}.tar.gz") - sz_file = files_path().joinpath(f"{landsat_name}.7z") - - # Archive file - tif_name = "LM05_L1TP_200030_20121230_20200820_02_T2_QA_RADSAT.TIF" - tif_ok = f"{ok_folder.name}/{tif_name}" - tif_regex = f".*{tif_name}" - assert tif_ok == path.get_archived_path(zip_file, tif_regex) - assert tif_ok == path.get_archived_path(zip_file, tif_regex, as_list=True)[0] - assert tif_ok == path.get_archived_path(tar_file, ".*RADSAT") - - # RASTERIO - tif_zip = path.get_archived_rio_path(zip_file, tif_regex) - tif_list = path.get_archived_rio_path(zip_file, tif_regex, as_list=True) - tif_tar = path.get_archived_rio_path(tar_file, ".*RADSAT") - tif_ok = ok_folder.joinpath(tif_name) - ci.assert_raster_equal(tif_ok, tif_zip) - 
ci.assert_raster_equal(tif_ok, tif_list[0]) - ci.assert_raster_equal(tif_ok, tif_tar) - - file_list = path.get_archived_file_list(zip_file) - ci.assert_raster_equal( - tif_ok, path.get_archived_rio_path(zip_file, tif_regex, file_list=file_list) - ) - - # VECTORS - vect_name = "map-overlay.kml" - vec_ok_path = ok_folder.joinpath(vect_name) - if shutil.which("ogr2ogr"): # Only works if ogr2ogr can be found. - vect_regex = f".*{vect_name}" - vect_zip = vectors.read(zip_file, archive_regex=vect_regex) - vect_tar = vectors.read(tar_file, archive_regex=r".*overlay\.kml") - vect_ok = vectors.read(vec_ok_path) - assert not vect_ok.empty - ci.assert_geom_equal(vect_ok, vect_zip) - ci.assert_geom_equal(vect_ok, vect_tar) - - # ERRORS - with pytest.raises(TypeError): - path.get_archived_rio_path(targz_file, tif_regex) - with pytest.raises(TypeError): - path.get_archived_rio_path(sz_file, tif_regex) - with pytest.raises(FileNotFoundError): - path.get_archived_rio_path(zip_file, "cdzeferf") - - def test_get_file_name(): """Test get_file_name""" file_name = path.get_filename(__file__) diff --git a/CI/SCRIPTS/test_vectors.py b/CI/SCRIPTS/test_vectors.py index 5f9bd92..5a79272 100644 --- a/CI/SCRIPTS/test_vectors.py +++ b/CI/SCRIPTS/test_vectors.py @@ -25,7 +25,7 @@ from shapely import wkt from CI.SCRIPTS.script_utils import KAPUT_KWARGS, files_path, s3_env, vectors_path -from sertit import ci, files, path, vectors +from sertit import archives, ci, files, path, vectors from sertit.vectors import EPSG_4326, DataSourceError ci.reduce_verbosity() @@ -294,7 +294,7 @@ def test_read_archived(): vectors.read(tar_landsat, archive_regex=map_overlay_regex), ) - file_list = path.get_archived_file_list(tar_landsat) + file_list = archives.get_archived_file_list(tar_landsat) ci.assert_geom_equal( map_overlay_extracted, vectors.read(tar_landsat, archive_regex=map_overlay_regex, file_list=file_list), diff --git a/sertit/archives.py b/sertit/archives.py new file mode 100644 index 0000000..2115071 
--- /dev/null +++ b/sertit/archives.py @@ -0,0 +1,558 @@ +import logging +import os +import re +import shutil +import tarfile +import tempfile +import zipfile +from contextlib import contextmanager +from pathlib import Path +from typing import Union + +from lxml import etree, html +from tqdm import tqdm + +from sertit import AnyPath, logs, path, s3 +from sertit.logs import SU_NAME +from sertit.types import AnyPathStrType, AnyPathType + +LOGGER = logging.getLogger(SU_NAME) + + +@contextmanager +def open_zipfile(file_path, mode="r"): + if path.is_cloud_path(file_path): + file_path = s3.read(file_path) + + with zipfile.ZipFile(file_path, mode) as zip_file: + yield zip_file + + +@contextmanager +def open_tarfile(file_path, mode="r"): + if path.is_cloud_path(file_path): + args = {"fileobj": s3.read(file_path), "mode": mode} + else: + args = {"name": file_path, "mode": mode} + with tarfile.open(**args) as tar_file: + yield tar_file + + +def extract_file( + file_path: AnyPathStrType, + output: AnyPathStrType, + overwrite: bool = False, +) -> AnyPathType: + """ + Extract an archived file (zip or others). Overwrites if specified. 
+ If the archive doesn't contain a root directory with the name of the archive without the extension, create it + + Args: + file_path (str): Archive file path + output (str): Output where to put the extracted directory + overwrite (bool): Overwrite found extracted directory + + Returns: + AnyPathType: Extracted directory path + + Example: + >>> file_path = 'D:/path/to/zip.zip' + >>> output = 'D:/path/to/output' + >>> extract_file(file_path, output, overwrite=True) + 'D:/path/to/output/zip' + """ + # Convert to path + file_path = AnyPath(file_path) + output = AnyPath(output) + + # In case a folder is given, returns it (this means that the file is already extracted) + if file_path.is_dir(): + return file_path + + # Beware with .SEN3 and .SAFE extensions + archive_output = output.joinpath(path.get_filename(file_path)) + + # In case not overwrite and the extracted directory already exists + if not overwrite and archive_output.exists(): + LOGGER.debug( + "Already existing extracted %s. It won't be overwritten.", + archive_output, + ) + return archive_output + + def extract_sub_dir(arch, filename_list): + top_level_files = list({item.split("/")[0] for item in filename_list}) + + # When the only root directory in the archive has the right name, we don't have to create it + if len(top_level_files) == 1 and archive_output.name == path.get_filename( + top_level_files[0] + ): + arch.extractall(archive_output.parent) + archive_output.parent.joinpath(top_level_files[0]).rename(archive_output) + else: + arch.extractall(archive_output) + + # Manage archive type + if file_path.suffix == ".zip": + with open_zipfile(file_path) as zip_file: + extract_sub_dir(zip_file, zip_file.namelist()) + elif file_path.suffix == ".tar" or file_path.suffixes == [".tar", ".gz"]: + with open_tarfile(file_path) as tar_file: + extract_sub_dir(tar_file, tar_file.getnames()) + elif file_path.suffix == ".7z": + try: + import py7zr + + with py7zr.SevenZipFile(file_path, "r") as z7_file: + 
extract_sub_dir(z7_file, z7_file.getnames()) + except ModuleNotFoundError as exc: + raise TypeError("Please install 'py7zr' to extract .7z files") from exc + else: + raise TypeError( + f"Only .zip, .tar, .tar.gz and .7z files can be extracted, not {file_path}" + ) + + return archive_output + + +def extract_files( + archives: list, output: AnyPathStrType, overwrite: bool = False +) -> list: + """ + Extract all archived files. Overwrites if specified. + + Example: + >>> file_path = ['D:/path/to/zip1.zip', 'D:/path/to/zip2.zip'] + >>> output = 'D:/path/to/output' + >>> extract_files(file_path, output, overwrite=True) + ['D:/path/to/output/zip1', 'D:/path/to/output/zip2'] + + Args: + archives (list of str): List of archives to be extracted + output (str): Output folder where extracted files will be written + overwrite (bool): Overwrite found extracted files + + Returns: + list: Extracted files (even pre-existing ones) + """ + LOGGER.info("Extracting products in %s", output) + progress_bar = tqdm(archives) + extracts = [] + for arch in progress_bar: + progress_bar.set_description(f"Extracting product {os.path.basename(arch)}") + extracts.append(extract_file(arch, output, overwrite)) + + return extracts + + +def read_archived_file( + archive_path: AnyPathStrType, regex: str, file_list: list = None +) -> bytes: + """ + Read archived file (in bytes) from :code:`zip` or :code:`tar` archives. + + You can use this `site `_ to build your regex. + + Args: + archive_path (AnyPathStrType): Archive path + regex (str): Regex (used by re) as it can be found in the getmembers() list + file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed. 
+ + Returns: + bytes: Archived file in bytes + """ + archive_path = AnyPath(archive_path) + + # Compile regex + regex = re.compile(regex) + + # Open tar and zip XML + try: + if archive_path.suffix == ".tar": + with open_tarfile(archive_path) as tar_ds: + # file_list is not very useful for TAR files... + if file_list is None: + tar_mb = tar_ds.getmembers() + file_list = [mb.name for mb in tar_mb] + name = list(filter(regex.match, file_list))[0] + tarinfo = tar_ds.getmember(name) + file_str = tar_ds.extractfile(tarinfo).read() + elif archive_path.suffix == ".zip": + with open_zipfile(archive_path) as zip_ds: + if file_list is None: + file_list = [f.filename for f in zip_ds.filelist] + name = list(filter(regex.match, file_list))[0] + file_str = zip_ds.read(name) + + elif archive_path.suffix == ".tar.gz": + raise TypeError( + ".tar.gz files are too slow to read from inside the archive. Please extract them instead." + ) + else: + raise TypeError( + "Only .zip and .tar files can be read from inside its archive." + ) + except IndexError as exc: + raise FileNotFoundError( + f"Impossible to find file {regex} in {path.get_filename(archive_path)}" + ) from exc + + return file_str + + +def read_archived_xml( + archive_path: AnyPathStrType, regex: str = None, file_list: list = None, **kwargs +) -> etree._Element: + """ + Read archived XML from :code:`zip` or :code:`tar` archives. + + You can use this `site `_ to build your regex. + + Args: + archive_path (AnyPathStrType): Archive path + regex (str): XML regex (used by re) as it can be found in the getmembers() list + file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed. 
+ + Returns: + etree._Element: XML file + + Example: + >>> arch_path = 'D:/path/to/zip.zip' + >>> file_regex = '.*dir.*file_name' # Use .* for any character + >>> read_archived_xml(arch_path, file_regex) + + """ + if regex is None: + logs.deprecation_warning( + "'xml_regex' is deprecated, please use 'regex' instead." + ) + regex = kwargs.pop("xml_regex") + + xml_bytes = read_archived_file(archive_path, regex=regex, file_list=file_list) + + return etree.fromstring(xml_bytes) + + +def read_archived_html( + archive_path: AnyPathStrType, regex: str, file_list: list = None +) -> html.HtmlElement: + """ + Read archived HTML from :code:`zip` or :code:`tar` archives. + + You can use this `site `_ to build your regex. + + Args: + archive_path (AnyPathStrType): Archive path + regex (str): HTML regex (used by re) as it can be found in the getmembers() list + file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed. + + Returns: + html._Element: HTML file + + Example: + >>> arch_path = 'D:/path/to/zip.zip' + >>> file_regex = '.*dir.*file_name' # Use .* for any character + >>> read_archived_html(arch_path, file_regex) + + """ + html_bytes = read_archived_file(archive_path, regex, file_list=file_list) + + return html.fromstring(html_bytes) + + +def archive( + folder_path: AnyPathStrType, + archive_path: AnyPathStrType, + fmt: str = "zip", +) -> AnyPathType: + """ + Archives a folder recursively. + + Args: + folder_path (AnyPathStrType): Folder to archive + archive_path (AnyPathStrType): Archive path, with or without extension + fmt (str): Format of the archive, used by :code:`shutil.make_archive`. 
Choose between [zip, tar, gztar, bztar, xztar] + + Returns: + AnyPathType: Archive path + + Example: + >>> folder_path = 'D:/path/to/folder_to_archive' + >>> archive_path = 'D:/path/to/output' + >>> archive(folder_path, archive_path, fmt="gztar") + 'D:/path/to/output.tar.gz' + """ + archive_path = AnyPath(archive_path) + folder_path = AnyPath(folder_path) + + tmp_dir = None + if path.is_cloud_path(folder_path): + tmp_dir = tempfile.TemporaryDirectory() + folder_path = folder_path.download_to(tmp_dir.name) + + # Shutil make_archive needs a path without extension + archive_base = os.path.splitext(archive_path)[0] + + # Archive the folder + archive_fn = shutil.make_archive( + archive_base, + format=fmt, + root_dir=folder_path.parent, + base_dir=folder_path.name, + ) + + if tmp_dir is not None: + tmp_dir.cleanup() + + return AnyPath(archive_fn) + + +def add_to_zip( + zip_path: AnyPathStrType, + dirs_to_add: Union[list, AnyPathStrType], +) -> AnyPathType: + """ + Add folders to an already existing zip file (recursively). 
+ + Args: + zip_path (AnyPathStrType): Already existing zip file + dirs_to_add (Union[list, AnyPathStrType]): Directories to add + + Returns: + AnyPathType: Updated zip_path + + Example: + >>> zip_path = 'D:/path/to/zip.zip' + >>> dirs_to_add = ['D:/path/to/dir1', 'D:/path/to/dir2'] + >>> add_to_zip(zip_path, dirs_to_add) + zip.zip contains 2 more folders, dir1 and dir2 + """ + zip_path = AnyPath(zip_path) + + # If the zip is on the cloud, cache it (zipfile doesn't like cloud paths) + if path.is_cloud_path(zip_path): + zip_path = AnyPath(zip_path.fspath) + + # Check if existing zipfile + if not zip_path.is_file(): + raise FileNotFoundError(f"Non existing {zip_path}") + + # Convert to list if needed + if not isinstance(dirs_to_add, list): + dirs_to_add = [dirs_to_add] + + # Add all folders to the existing zip + # Forced to use ZipFile because make_archive only works with one folder and not existing zipfile + with open_zipfile(zip_path, "a") as zip_file: + progress_bar = tqdm(dirs_to_add) + for dir_to_add_path in progress_bar: + # Just to be sure, use str instead of Paths + if isinstance(dir_to_add_path, Path): + dir_to_add = str(dir_to_add_path) + elif path.is_cloud_path(dir_to_add_path): + dir_to_add = dir_to_add_path.fspath + else: + dir_to_add = dir_to_add_path + + progress_bar.set_description( + f"Adding {os.path.basename(dir_to_add)} to {os.path.basename(zip_path)}" + ) + tmp = tempfile.TemporaryDirectory() + if os.path.isfile(dir_to_add): + dir_to_add = extract_file(dir_to_add, tmp.name) + + for root, _, files in os.walk(dir_to_add): + base_path = os.path.join(dir_to_add, "..") + + # Write dir (in namelist at least) + zip_file.write(root, os.path.relpath(root, base_path)) + + # Write files + for file in files: + zip_file.write( + os.path.join(root, file), + os.path.relpath( + os.path.join(root, file), os.path.join(dir_to_add, "..") + ), + ) + + # Clean tmp + tmp.cleanup() + + return zip_path + + +def get_archived_file_list(archive_path: AnyPathStrType) -> 
list: + """ + Get the list of all the files contained in an archive. + + Args: + archive_path (AnyPathStrType): Archive path + + Returns: + list: All files contained in the given archive + + Example: + >>> arch_path = 'D:/path/to/zip.zip' + >>> get_archived_file_list(arch_path) + ['file_1.txt', 'file_2.tif', 'file_3.xml', 'file_4.geojson'] + """ + archive_path = AnyPath(archive_path) + + is_zip = archive_path.suffix == ".zip" + archive_fn = path.get_filename(archive_path) + if is_zip: + with open_zipfile(archive_path) as zip_ds: + file_list = [f.filename for f in zip_ds.filelist] + else: + try: + with open_tarfile(archive_path) as tar_ds: + tar_mb = tar_ds.getmembers() + file_list = [mb.name for mb in tar_mb] + except tarfile.ReadError as ex: + raise tarfile.ReadError(f"Impossible to open archive: {archive_fn}") from ex + + return file_list + + +def get_archived_path( + archive_path: AnyPathStrType, + regex: str, + as_list: bool = False, + case_sensitive: bool = False, + file_list: list = None, + **kwargs, +) -> Union[list, AnyPathType]: + """ + Get archived file path from inside the archive. + + .. WARNING:: + If :code:`as_list` is :code:`False`, it will only return the first file matched ! + + You can use this `site `_ to build your regex. + + Args: + archive_path (AnyPathStrType): Archive path + regex (str): File regex (used by re) as it can be found in the getmembers() list + as_list (bool): If true, returns a list (including all found files). If false, returns only the first match + case_sensitive (bool): If true, the regex is case-sensitive. + file_list (list): List of files to get archived from. Optional, if not given it will be re-computed. 
+ + Returns: + Union[list, str]: Path from inside the zipfile + + Example: + >>> arch_path = 'D:/path/to/zip.zip' + >>> file_regex = '.*dir.*file_name' # Use .* for any character + >>> path = get_archived_path(arch_path, file_regex) + 'dir/filename.tif' + """ + if regex is None: + logs.deprecation_warning( + "'file_regex' is deprecated, please use 'regex' instead." + ) + regex = kwargs.pop("file_regex") + + # Get file list + archive_path = AnyPath(archive_path) + + # Offer the ability to give the file list directly, as this operation is expensive when done with large archives stored on the cloud + if file_list is None: + file_list = get_archived_file_list(archive_path) + + # Search for file + re_rgx = re.compile(regex) if case_sensitive else re.compile(regex, re.IGNORECASE) + archived_band_paths = list(filter(re_rgx.match, file_list)) + if not archived_band_paths: + raise FileNotFoundError( + f"Impossible to find file {regex} in {path.get_filename(archive_path)}" + ) + + # Convert to str if needed + if not as_list: + archived_band_paths = archived_band_paths[0] + + return archived_band_paths + + +def get_archived_rio_path( + archive_path: AnyPathStrType, + regex: str, + as_list: bool = False, + file_list: list = None, + **kwargs, +) -> Union[list, AnyPathType]: + """ + Get archived file path from inside the archive, to be read with rasterio: + + - :code:`zip+file://{zip_path}!{file_name}` + - :code:`tar+file://{tar_path}!{file_name}` + + + See `here `_ + for more information. + + .. WARNING:: + It wont be readable by pandas, geopandas or xmltree ! + + .. WARNING:: + If :code:`as_list` is :code:`False`, it will only return the first file matched ! + + You can use this `site `_ to build your regex. + + Args: + archive_path (AnyPathStrType): Archive path + regex (str): File regex (used by re) as it can be found in the getmembers() list + as_list (bool): If true, returns a list (including all found files). 
If false, returns only the first match + file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed. + + Returns: + Union[list, str]: Band path that can be read by rasterio + + Example: + >>> arch_path = 'D:/path/to/zip.zip' + >>> file_regex = '.*dir.*file_name' # Use .* for any character + >>> path = get_archived_rio_path(arch_path, file_regex) + '/vsizip/D:/path/to/zip.zip/dir/filename.tif' + >>> rasterio.open(path) + + """ + if regex is None: + logs.deprecation_warning( + "'file_regex' is deprecated, please use 'regex' instead." + ) + regex = kwargs.pop("file_regex") + + archive_path = AnyPath(archive_path) + if archive_path.suffix in [".tar", ".zip"]: + prefix = archive_path.suffix[-3:] + elif archive_path.suffix == ".tar.gz": + raise TypeError( + ".tar.gz files are too slow to be read from inside the archive. Please extract them instead." + ) + else: + raise TypeError("Only .zip and .tar files can be read from inside its archive.") + + # Search for file + archived_band_paths = get_archived_path( + archive_path, regex=regex, as_list=True, file_list=file_list + ) + + # Convert to rio path + if path.is_cloud_path(archive_path): + archived_band_paths = [ + f"{prefix}+file+{archive_path}!{p}" for p in archived_band_paths + ] + else: + # archived_band_paths = [ + # f"{prefix}+file://{archive_path}!{path}" for path in archived_band_paths + # ] + archived_band_paths = [ + f"/vsi{prefix}/{archive_path}/{p}" for p in archived_band_paths + ] + + # Convert to str if needed + if not as_list: + archived_band_paths = archived_band_paths[0] + + return archived_band_paths diff --git a/sertit/files.py b/sertit/files.py index 4bdb6c2..c2ec5a3 100644 --- a/sertit/files.py +++ b/sertit/files.py @@ -19,11 +19,7 @@ import json import logging import os -import re import shutil -import tarfile -import tempfile -import zipfile from datetime import date, datetime from enum import Enum from json import JSONDecoder, JSONEncoder @@ -32,10 
+28,8 @@ import dill import numpy as np -from lxml import etree, html -from tqdm import tqdm -from sertit import AnyPath, logs, path +from sertit import AnyPath, logs, path, s3 from sertit.logs import SU_NAME from sertit.strings import DATE_FORMAT from sertit.types import AnyPathStrType, AnyPathType @@ -165,454 +159,6 @@ def real_rel_path(raw_path: AnyPathStrType, start: AnyPathStrType) -> AnyPathTyp return path.real_rel_path(raw_path, start) -def extract_file( - file_path: AnyPathStrType, - output: AnyPathStrType, - overwrite: bool = False, -) -> AnyPathType: - """ - Extract an archived file (zip or others). Overwrites if specified. - If the archive don't contain a root directory with the name of the archive without the extension, create it - - Args: - file_path (str): Archive file path - output (str): Output where to put the extracted directory - overwrite (bool): Overwrite found extracted directory - - Returns: - AnyPathType: Extracted directory paths - - Example: - >>> file_path = 'D:/path/to/zip.zip' - >>> output = 'D:/path/to/output' - >>> extract_file(file_path, output, overwrite=True) - D:/path/to/output/zip' - """ - # Convert to path - file_path = AnyPath(file_path) - output = AnyPath(output) - - # In case a folder is given, returns it (this means that the file is already extracted) - if file_path.is_dir(): - return file_path - - # Beware with .SEN3 and .SAFE extensions - archive_output = output.joinpath(path.get_filename(file_path)) - - # In case not overwrite and the extracted directory already exists - if not overwrite and archive_output.exists(): - LOGGER.debug( - "Already existing extracted %s. 
It won't be overwritten.", - archive_output, - ) - return archive_output - - def extract_sub_dir(arch, filename_list): - top_level_files = list({item.split("/")[0] for item in filename_list}) - - # When the only root directory in the archive has the right name, we don't have to create it - if len(top_level_files) == 1 and archive_output.name == path.get_filename( - top_level_files[0] - ): - arch.extractall(archive_output.parent) - archive_output.parent.joinpath(top_level_files[0]).rename(archive_output) - else: - arch.extractall(archive_output) - - # Manage archive type - if file_path.suffix == ".zip": - with zipfile.ZipFile(file_path, "r") as zip_file: - extract_sub_dir(zip_file, zip_file.namelist()) - elif file_path.suffix == ".tar" or file_path.suffixes == [".tar", ".gz"]: - with tarfile.open(file_path, "r") as tar_file: - extract_sub_dir(tar_file, tar_file.getnames()) - elif file_path.suffix == ".7z": - try: - import py7zr - - with py7zr.SevenZipFile(file_path, "r") as z7_file: - extract_sub_dir(z7_file, z7_file.getnames()) - except ModuleNotFoundError as exc: - raise TypeError("Please install 'py7zr' to extract .7z files") from exc - else: - raise TypeError( - f"Only .zip, .tar, .tar.gz and .7z files can be extracted, not {file_path}" - ) - - return archive_output - - -def extract_files( - archives: list, output: AnyPathStrType, overwrite: bool = False -) -> list: - """ - Extract all archived files. Overwrites if specified. 
- - Example: - >>> file_path = ['D:/path/to/zip1.zip', 'D:/path/to/zip2.zip'] - >>> output = 'D:/path/to/output' - >>> extract_files(file_path, output, overwrite=True) - ['D:/path/to/output.zip1', 'D:/path/to/output.zip2'] - - Args: - archives (list of str): List of archives to be extracted - output (str): Output folder where extracted files will be written - overwrite (bool): Overwrite found extracted files - - Returns: - list: Extracted files (even pre-existing ones) - """ - LOGGER.info("Extracting products in %s", output) - progress_bar = tqdm(archives) - extracts = [] - for arch in progress_bar: - progress_bar.set_description(f"Extracting product {os.path.basename(arch)}") - extracts.append(extract_file(arch, output, overwrite)) - - return extracts - - -def get_archived_file_list(archive_path: AnyPathStrType) -> list: - """ - .. deprecated:: 1.30.0 - Import it from :py:mod:`sertit.path` instead of :py:mod:`sertit.files` - - Get the list of all the files contained in an archive. - - Args: - archive_path (AnyPathStrType): Archive path - - Returns: - list: All files contained in the given archive - - Example: - >>> arch_path = 'D:/path/to/zip.zip' - >>> get_archived_file_list(arch_path, file_regex) - ['file_1.txt', 'file_2.tif', 'file_3.xml', 'file_4.geojson'] - """ - logs.deprecation_warning( - "This function is deprecated. Import it from 'sertit.path' instead of 'sertit.files'" - ) - return path.get_archived_file_list(archive_path) - - -def get_archived_path( - archive_path: AnyPathStrType, file_regex: str, as_list: bool = False -) -> Union[list, AnyPathType]: - """ - .. deprecated:: 1.30.0 - Import it from :py:mod:`sertit.path` instead of :py:mod:`sertit.files` - - Get archived file path from inside the archive. - - .. WARNING:: - If :code:`as_list` is :code:`False`, it will only return the first file matched ! - - You can use this `site `_ to build your regex. 
- - Example: - >>> arch_path = 'D:/path/to/zip.zip' - >>> file_regex = '.*dir.*file_name' # Use .* for any character - >>> path = get_archived_path(arch_path, file_regex) - 'dir/filename.tif' - - Args: - archive_path (AnyPathStrType): Archive path - file_regex (str): File regex (used by re) as it can be found in the getmembers() list - as_list (bool): If true, returns a list (including all found files). If false, returns only the first match - - Returns: - Union[list, str]: Path from inside the zipfile - """ - logs.deprecation_warning( - "This function is deprecated. Import it from 'sertit.path' instead of 'sertit.files'" - ) - return path.get_archived_path(archive_path, file_regex, as_list) - - -def get_archived_rio_path( - archive_path: AnyPathStrType, file_regex: str, as_list: bool = False -) -> Union[list, AnyPathType]: - """ - .. deprecated:: 1.30.0 - Import it from :py:mod:`sertit.path` instead of :py:mod:`sertit.files` - - Get archived file path from inside the archive, to be read with rasterio: - - - :code:`zip+file://{zip_path}!{file_name}` - - :code:`tar+file://{tar_path}!{file_name}` - - - See `here `_ - for more information. - - .. WARNING:: - It won't be readable by pandas, geopandas or xmltree ! - - .. WARNING:: - If :code:`as_list` is :code:`False`, it will only return the first file matched ! - - You can use this `site `_ to build your regex. - - Args: - archive_path (AnyPathStrType): Archive path - file_regex (str): File regex (used by re) as it can be found in the getmembers() list - as_list (bool): If true, returns a list (including all found files). 
If false, returns only the first match - - Returns: - Union[list, str]: Band path that can be read by rasterio - - Example: - >>> arch_path = 'D:/path/to/zip.zip' - >>> file_regex = '.*dir.*file_name' # Use .* for any character - >>> path = get_archived_tif_path(arch_path, file_regex) - 'zip+file://D:/path/to/output.zip!dir/filename.tif' - >>> rasterio.open(path) - - """ - logs.deprecation_warning( - "This function is deprecated. Import it from 'sertit.path' instead of 'sertit.files'" - ) - return path.get_archived_rio_path(archive_path, file_regex, as_list) - - -def read_archived_file( - archive_path: AnyPathStrType, regex: str, file_list: list = None -) -> bytes: - """ - Read archived file (in bytes) from :code:`zip` or :code:`tar` archives. - - You can use this `site `_ to build your regex. - - Args: - archive_path (AnyPathStrType): Archive path - regex (str): Regex (used by re) as it can be found in the getmembers() list - file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed. - - Returns: - bytes: Archived file in bytes - """ - archive_path = AnyPath(archive_path) - - # Compile regex - regex = re.compile(regex) - - # Open tar and zip XML - try: - if archive_path.suffix == ".tar": - with tarfile.open(archive_path) as tar_ds: - # file_list is not very useful for TAR files... - if file_list is None: - tar_mb = tar_ds.getmembers() - file_list = [mb.name for mb in tar_mb] - name = list(filter(regex.match, file_list))[0] - tarinfo = tar_ds.getmember(name) - file_str = tar_ds.extractfile(tarinfo).read() - elif archive_path.suffix == ".zip": - with zipfile.ZipFile(archive_path) as zip_ds: - if file_list is None: - file_list = [f.filename for f in zip_ds.filelist] - name = list(filter(regex.match, file_list))[0] - file_str = zip_ds.read(name) - - elif archive_path.suffix == ".tar.gz": - raise TypeError( - ".tar.gz files are too slow to read from inside the archive. Please extract them instead." 
- ) - else: - raise TypeError( - "Only .zip and .tar files can be read from inside its archive." - ) - except IndexError as exc: - raise FileNotFoundError( - f"Impossible to find file {regex} in {path.get_filename(archive_path)}" - ) from exc - - return file_str - - -def read_archived_xml( - archive_path: AnyPathStrType, regex: str = None, file_list: list = None, **kwargs -) -> etree._Element: - """ - Read archived XML from :code:`zip` or :code:`tar` archives. - - You can use this `site `_ to build your regex. - - Args: - archive_path (AnyPathStrType): Archive path - regex (str): XML regex (used by re) as it can be found in the getmembers() list - file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed. - - Returns: - etree._Element: XML file - - Example: - >>> arch_path = 'D:/path/to/zip.zip' - >>> file_regex = '.*dir.*file_name' # Use .* for any character - >>> read_archived_xml(arch_path, file_regex) - - """ - if regex is None: - logs.deprecation_warning( - "'xml_regex' is deprecated, please use 'regex' instead." - ) - regex = kwargs.pop("xml_regex") - - xml_bytes = read_archived_file(archive_path, regex=regex, file_list=file_list) - - return etree.fromstring(xml_bytes) - - -def read_archived_html( - archive_path: AnyPathStrType, regex: str, file_list: list = None -) -> html.HtmlElement: - """ - Read archived HTML from :code:`zip` or :code:`tar` archives. - - You can use this `site `_ to build your regex. - - Args: - archive_path (AnyPathStrType): Archive path - regex (str): HTML regex (used by re) as it can be found in the getmembers() list - file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed. 
- - Returns: - html._Element: HTML file - - Example: - >>> arch_path = 'D:/path/to/zip.zip' - >>> file_regex = '.*dir.*file_name' # Use .* for any character - >>> read_archived_html(arch_path, file_regex) - - """ - html_bytes = read_archived_file(archive_path, regex, file_list=file_list) - - return html.fromstring(html_bytes) - - -def archive( - folder_path: AnyPathStrType, - archive_path: AnyPathStrType, - fmt: str = "zip", -) -> AnyPathType: - """ - Archives a folder recursively. - - Args: - folder_path (AnyPathStrType): Folder to archive - archive_path (AnyPathStrType): Archive path, with or without extension - fmt (str): Format of the archive, used by :code:`shutil.make_archive`. Choose between [zip, tar, gztar, bztar, xztar] - - Returns: - str: Archive filename - - Example: - >>> folder_path = 'D:/path/to/folder_to_archive' - >>> archive_path = 'D:/path/to/output' - >>> archive = archive(folder_path, archive_path, fmt="gztar") - 'D:/path/to/output/folder_to_archive.tar.gz' - """ - archive_path = AnyPath(archive_path) - folder_path = AnyPath(folder_path) - - tmp_dir = None - if path.is_cloud_path(folder_path): - tmp_dir = tempfile.TemporaryDirectory() - folder_path = folder_path.download_to(tmp_dir.name) - - # Shutil make_archive needs a path without extension - archive_base = os.path.splitext(archive_path)[0] - - # Archive the folder - archive_fn = shutil.make_archive( - archive_base, - format=fmt, - root_dir=folder_path.parent, - base_dir=folder_path.name, - ) - - if tmp_dir is not None: - tmp_dir.cleanup() - - return AnyPath(archive_fn) - - -def add_to_zip( - zip_path: AnyPathStrType, - dirs_to_add: Union[list, AnyPathStrType], -) -> AnyPathType: - """ - Add folders to an already existing zip file (recursively). 
- - Args: - zip_path (AnyPathStrType): Already existing zip file - dirs_to_add (Union[list, AnyPathStrType]): Directories to add - - Returns: - AnyPathType: Updated zip_path - - Example: - >>> zip_path = 'D:/path/to/zip.zip' - >>> dirs_to_add = ['D:/path/to/dir1', 'D:/path/to/dir2'] - >>> add_to_zip(zip_path, dirs_to_add) - zip.zip contains 2 more folders, dir1 and dir2 - """ - zip_path = AnyPath(zip_path) - - # If the zip is on the cloud, cache it (zipfile doesn't like cloud paths) - if path.is_cloud_path(zip_path): - zip_path = AnyPath(zip_path.fspath) - - # Check if existing zipfile - if not zip_path.is_file(): - raise FileNotFoundError(f"Non existing {zip_path}") - - # Convert to list if needed - if not isinstance(dirs_to_add, list): - dirs_to_add = [dirs_to_add] - - # Add all folders to the existing zip - # Forced to use ZipFile because make_archive only works with one folder and not existing zipfile - with zipfile.ZipFile(zip_path, "a") as zip_file: - progress_bar = tqdm(dirs_to_add) - for dir_to_add_path in progress_bar: - # Just to be sure, use str instead of Paths - if isinstance(dir_to_add_path, Path): - dir_to_add = str(dir_to_add_path) - elif path.is_cloud_path(dir_to_add_path): - dir_to_add = dir_to_add_path.fspath - else: - dir_to_add = dir_to_add_path - - progress_bar.set_description( - f"Adding {os.path.basename(dir_to_add)} to {os.path.basename(zip_path)}" - ) - tmp = tempfile.TemporaryDirectory() - if os.path.isfile(dir_to_add): - dir_to_add = extract_file(dir_to_add, tmp.name) - - for root, _, files in os.walk(dir_to_add): - base_path = os.path.join(dir_to_add, "..") - - # Write dir (in namelist at least) - zip_file.write(root, os.path.relpath(root, base_path)) - - # Write files - for file in files: - zip_file.write( - os.path.join(root, file), - os.path.relpath( - os.path.join(root, file), os.path.join(dir_to_add, "..") - ), - ) - - # Clean tmp - tmp.cleanup() - - return zip_path - - def get_filename(file_path: AnyPathStrType, other_exts: 
Union[list, str] = None) -> str: """ .. deprecated:: 1.30.0 @@ -754,7 +300,7 @@ def copy(src: AnyPathStrType, dst: AnyPathStrType) -> AnyPathType: src = AnyPath(src) if path.is_cloud_path(src): - out = src.download_to(dst) + out = s3.download(src, dst) else: out = None try: diff --git a/sertit/path.py b/sertit/path.py index 48e9b90..30451e1 100644 --- a/sertit/path.py +++ b/sertit/path.py @@ -19,13 +19,10 @@ import logging import os import pprint -import re -import tarfile import tempfile -import zipfile from typing import Any, Union -from sertit import AnyPath, logs, s3 +from sertit import AnyPath from sertit.logs import SU_NAME from sertit.types import AnyPathStrType, AnyPathType @@ -150,189 +147,6 @@ def real_rel_path(raw_path: AnyPathStrType, start: AnyPathStrType) -> AnyPathTyp return rel_path -def get_archived_file_list(archive_path: AnyPathStrType) -> list: - """ - Get the list of all the files contained in an archive. - - Args: - archive_path (AnyPathStrType): Archive path - - Returns: - list: All files contained in the given archive - - Example: - >>> arch_path = 'D:/path/to/zip.zip' - >>> get_archived_file_list(arch_path, file_regex) - ['file_1.txt', 'file_2.tif', 'file_3.xml', 'file_4.geojson'] - """ - archive_path = AnyPath(archive_path) - - is_zip = archive_path.suffix == ".zip" - archive_fn = get_filename(archive_path) - if is_zip: - if is_cloud_path(archive_path): - archive_path = s3.read(archive_path) - - with zipfile.ZipFile(archive_path) as zip_ds: - file_list = [f.filename for f in zip_ds.filelist] - else: - try: - if is_cloud_path(archive_path): - args = {"fileobj": s3.read(archive_path), "mode": "r"} - else: - args = {"name": archive_path, "mode": "r"} - with tarfile.open(**args) as tar_ds: - tar_mb = tar_ds.getmembers() - file_list = [mb.name for mb in tar_mb] - except tarfile.ReadError as ex: - raise tarfile.ReadError(f"Impossible to open archive: {archive_fn}") from ex - - return file_list - - -def get_archived_path( - archive_path: 
AnyPathStrType, - regex: str, - as_list: bool = False, - case_sensitive: bool = False, - file_list: list = None, - **kwargs, -) -> Union[list, AnyPathType]: - """ - Get archived file path from inside the archive. - - .. WARNING:: - If :code:`as_list` is :code:`False`, it will only return the first file matched ! - - You can use this `site `_ to build your regex. - - Args: - archive_path (AnyPathStrType): Archive path - regex (str): File regex (used by re) as it can be found in the getmembers() list - as_list (bool): If true, returns a list (including all found files). If false, returns only the first match - case_sensitive (bool): If true, the regex is case-sensitive. - file_list (list): List of files to get archived from. Optional, if not given it will be re-computed. - - Returns: - Union[list, str]: Path from inside the zipfile - - Example: - >>> arch_path = 'D:/path/to/zip.zip' - >>> file_regex = '.*dir.*file_name' # Use .* for any character - >>> path = get_archived_path(arch_path, file_regex) - 'dir/filename.tif' - """ - if regex is None: - logs.deprecation_warning( - "'file_regex' is deprecated, please use 'regex' instead." 
- ) - regex = kwargs.pop("file_regex") - - # Get file list - archive_path = AnyPath(archive_path) - - # Offer the ability to give the file list directly, as this operation is expensive when done with large archives stored on the cloud - if file_list is None: - file_list = get_archived_file_list(archive_path) - - # Search for file - re_rgx = re.compile(regex) if case_sensitive else re.compile(regex, re.IGNORECASE) - archived_band_paths = list(filter(re_rgx.match, file_list)) - if not archived_band_paths: - raise FileNotFoundError( - f"Impossible to find file {regex} in {get_filename(archive_path)}" - ) - - # Convert to str if needed - if not as_list: - archived_band_paths = archived_band_paths[0] - - return archived_band_paths - - -def get_archived_rio_path( - archive_path: AnyPathStrType, - regex: str, - as_list: bool = False, - file_list: list = None, - **kwargs, -) -> Union[list, AnyPathType]: - """ - Get archived file path from inside the archive, to be read with rasterio: - - - :code:`zip+file://{zip_path}!{file_name}` - - :code:`tar+file://{tar_path}!{file_name}` - - - See `here `_ - for more information. - - .. WARNING:: - It wont be readable by pandas, geopandas or xmltree ! - - .. WARNING:: - If :code:`as_list` is :code:`False`, it will only return the first file matched ! - - You can use this `site `_ to build your regex. - - Args: - archive_path (AnyPathStrType): Archive path - regex (str): File regex (used by re) as it can be found in the getmembers() list - as_list (bool): If true, returns a list (including all found files). If false, returns only the first match - file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed. 
- - Returns: - Union[list, str]: Band path that can be read by rasterio - - Example: - >>> arch_path = 'D:/path/to/zip.zip' - >>> file_regex = '.*dir.*file_name' # Use .* for any character - >>> path = get_archived_tif_path(arch_path, file_regex) - 'zip+file://D:/path/to/output.zip!dir/filename.tif' - >>> rasterio.open(path) - - """ - if regex is None: - logs.deprecation_warning( - "'file_regex' is deprecated, please use 'regex' instead." - ) - regex = kwargs.pop("file_regex") - - archive_path = AnyPath(archive_path) - if archive_path.suffix in [".tar", ".zip"]: - prefix = archive_path.suffix[-3:] - elif archive_path.suffix == ".tar.gz": - raise TypeError( - ".tar.gz files are too slow to be read from inside the archive. Please extract them instead." - ) - else: - raise TypeError("Only .zip and .tar files can be read from inside its archive.") - - # Search for file - archived_band_paths = get_archived_path( - archive_path, regex=regex, as_list=True, file_list=file_list - ) - - # Convert to rio path - if is_cloud_path(archive_path): - archived_band_paths = [ - f"{prefix}+file+{archive_path}!{path}" for path in archived_band_paths - ] - else: - # archived_band_paths = [ - # f"{prefix}+file://{archive_path}!{path}" for path in archived_band_paths - # ] - archived_band_paths = [ - f"/vsi{prefix}/{archive_path}/{path}" for path in archived_band_paths - ] - - # Convert to str if needed - if not as_list: - archived_band_paths = archived_band_paths[0] - - return archived_band_paths - - def get_filename(file_path: AnyPathStrType, other_exts: Union[list, str] = None) -> str: """ Get file name (without extension) from file path, ie: diff --git a/sertit/vectors.py b/sertit/vectors.py index 9b214b2..87856e9 100644 --- a/sertit/vectors.py +++ b/sertit/vectors.py @@ -23,9 +23,7 @@ import os import re import shutil -import tarfile import tempfile -import zipfile from collections.abc import Generator from contextlib import contextmanager from typing import Any, Union @@ -36,7 +34,7 
@@ from cloudpathlib.exceptions import AnyPathTypeError from shapely import Polygon, wkt -from sertit import AnyPath, files, geometry, logs, misc, path, strings +from sertit import AnyPath, archives, files, geometry, logs, misc, path, s3, strings from sertit.logs import SU_NAME from sertit.types import AnyPathStrType, AnyPathType @@ -256,8 +254,11 @@ def get_aoi_wkt(aoi_path: AnyPathStrType, as_str: bool = True) -> Union[str, Pol if aoi_path.suffix == ".wkt": try: - with open(aoi_path) as aoi_f: - aoi = wkt.load(aoi_f) + if path.is_cloud_path(aoi_path): + aoi = wkt.load(s3.read(aoi_path)) + else: + with open(aoi_path) as aoi_f: + aoi = wkt.load(aoi_f) except Exception as ex: raise ValueError("AOI WKT cannot be read") from ex else: @@ -471,13 +472,17 @@ def read( if "!" in str(vector_path): split_vect = str(vector_path).split("!") archive_regex = ".*{}".format(split_vect[1].replace(".", r"\.")) - vector_path = AnyPath(split_vect[0]) + try: + vector_path = AnyPath(split_vect[0], **vector_path.storage_options) + except Exception: + # Cloudpathlib + vector_path = AnyPath(split_vect[0]) # Manage archive case if vector_path.suffix in [".tar", ".zip"]: prefix = vector_path.suffix[-3:] file_list = kwargs.pop( - "file_list", path.get_archived_file_list(vector_path) + "file_list", archives.get_archived_file_list(vector_path) ) try: @@ -710,16 +715,16 @@ def ogr2geojson( # archived vector_path are extracted in a tmp folder so no need to be downloaded if vector_path.suffix == ".zip": - with zipfile.ZipFile(vector_path, "r") as zip_ds: + with archives.open_zipfile(vector_path, "r") as zip_ds: vect_path = zip_ds.extract(arch_vect_path, out_dir) elif vector_path.suffix == ".tar": - with tarfile.open(vector_path, "r") as tar_ds: + with archives.open_tarfile(vector_path, "r") as tar_ds: tar_ds.extract(arch_vect_path, out_dir) vect_path = os.path.join(out_dir, arch_vect_path) else: # vector_path should be downloaded to work with 'ogr2ogr' if path.is_cloud_path(vector_path): - 
vector_path = AnyPath(vector_path).fspath + vector_path = s3.download(vector_path, out_dir) vect_path = vector_path vect_path_gj = os.path.join( diff --git a/sertit/xml.py b/sertit/xml.py index 9ddc44a..8d334ef 100644 --- a/sertit/xml.py +++ b/sertit/xml.py @@ -30,7 +30,7 @@ ) from lxml.html.builder import E -from sertit import AnyPath, files, path +from sertit import AnyPath, archives, logs, path, s3 from sertit.logs import SU_NAME from sertit.misc import ListEnum from sertit.types import AnyPathStrType @@ -61,7 +61,7 @@ def read(xml_path: AnyPathStrType) -> _Element: # Slower but works with: # {ValueError}Unicode strings with encoding declaration are not supported. # Please use bytes input or XML fragments without declaration. - root = fromstring(xml_path.read_bytes()) + root = fromstring(s3.read(xml_path).read()) else: # pylint: disable=I1101: # Module 'lxml.etree' has no 'parse' member, but source is unavailable. @@ -75,7 +75,10 @@ def read(xml_path: AnyPathStrType) -> _Element: def read_archive( - path: AnyPathStrType, regex: str = None, file_list: list = None + archive_path: AnyPathStrType, + regex: str = None, + file_list: list = None, + **kwargs, ) -> _Element: """ Read an XML file from inside an archive (zip or tar) @@ -87,25 +90,40 @@ def read_archive( - path to the archive plus a regex looking inside the archive. Duplicate behaviour to :py:func:`files.read_archived_xml` Args: - path (AnyPathStrType): Path to the XML file, stored inside an archive or path to the archive itself + archive_path (AnyPathStrType): Path to the XML file, stored inside an archive or path to the archive itself regex (str): Optional. If specified, the path should be the archive path and the regex should be the key to find the XML file inside the archive. file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed. 
Returns: _Element: XML Root """ + if archive_path is None: + logs.deprecation_warning( + "'path' argument is deprecated, use 'archive_path' instead." + ) + archive_path = kwargs.pop("path") try: if not regex: - path, basename = str(path).split("!") + archive_base_path, basename = str(archive_path).split("!") regex = basename - if path.startswith("zip://") or path.startswith("tar://"): - path = path[5:] + if archive_base_path.startswith("zip://") or archive_base_path.startswith( + "tar://" + ): + archive_base_path = archive_base_path[5:] + + # For UPath + with contextlib.suppress(Exception): + archive_base_path = AnyPath( + archive_base_path, **archive_path.storage_options + ) + else: + archive_base_path = archive_path - return files.read_archived_xml(path, regex, file_list=file_list) + return archives.read_archived_xml(archive_base_path, regex, file_list=file_list) except XMLSyntaxError as exc: - raise ValueError(f"Invalid metadata XML for {path}!") from exc + raise ValueError(f"Invalid metadata XML for {archive_path}!") from exc def write(xml: _Element, path: str) -> None: