diff --git a/CI/SCRIPTS/test_archives.py b/CI/SCRIPTS/test_archives.py
new file mode 100644
index 0000000..3415618
--- /dev/null
+++ b/CI/SCRIPTS/test_archives.py
@@ -0,0 +1,147 @@
+import os
+import shutil
+
+import pytest
+from lxml import etree, html
+
+from CI.SCRIPTS.script_utils import files_path, s3_env
+from sertit import archives, ci, files, path, s3, vectors
+
+
+def test_archive(tmp_path):
+ """Test extracting functions"""
+ # Archives
+ zip_file = files_path().joinpath("test_zip.zip")
+ zip2_file = files_path().joinpath("test_zip.zip") # For overwrite
+ zip_without_directory = files_path().joinpath("test_zip_without_directory.zip")
+ tar_file = files_path().joinpath("test_tar.tar")
+ tar_gz_file = files_path().joinpath("test_targz.tar.gz")
+
+ # Core dir
+ core_dir = files_path().joinpath("core")
+ folder = core_dir
+ arch = [
+ zip_file,
+ tar_file,
+ tar_gz_file,
+ folder,
+ zip2_file,
+ zip_without_directory,
+ ]
+
+ # Extract
+ extracted_dirs = archives.extract_files(arch, tmp_path, overwrite=True)
+ archives.extract_files([zip2_file], tmp_path, overwrite=False) # Already existing
+
+ # Test
+ for ex_dir in extracted_dirs:
+ ci.assert_dir_equal(core_dir, ex_dir)
+
+ # Archive
+ archive_base = os.path.join(tmp_path, "archive")
+ for fmt in ["zip", "tar", "gztar"]:
+ archive_fn = archives.archive(
+ folder_path=core_dir, archive_path=archive_base, fmt=fmt
+ )
+ out = archives.extract_file(archive_fn, tmp_path)
+ # an additional folder is created
+ out_dir = path.listdir_abspath(out)[0]
+ ci.assert_dir_equal(core_dir, out_dir)
+
+ # Remove out directory in order to avoid any interferences
+ files.remove(out)
+
+ # Add to zip
+ zip_out = zip2_file if path.is_cloud_path(zip2_file) else archive_base + ".zip"
+ core_copy = files.copy(core_dir, os.path.join(tmp_path, "core2"))
+ zip_out = archives.add_to_zip(zip_out, core_copy)
+
+ # Extract
+ unzip_out = os.path.join(tmp_path, "out")
+ unzip_out = archives.extract_file(zip_out, unzip_out)
+
+ # Test
+ unzip_dirs = path.listdir_abspath(unzip_out)
+
+ assert len(unzip_dirs) == 2
+ ci.assert_dir_equal(unzip_dirs[0], unzip_dirs[1])
+
+
+@s3_env
+def test_archived_files(tmp_path):
+ landsat_name = "LM05_L1TP_200030_20121230_20200820_02_T2_CI"
+ ok_folder = files_path().joinpath(landsat_name)
+ zip_file = files_path().joinpath(f"{landsat_name}.zip")
+ tar_file = files_path().joinpath(f"{landsat_name}.tar")
+ targz_file = files_path().joinpath(f"{landsat_name}.tar.gz")
+ sz_file = files_path().joinpath(f"{landsat_name}.7z")
+
+ # VECTORS
+ vect_name = "map-overlay.kml"
+ vec_ok_path = ok_folder.joinpath(vect_name)
+ if shutil.which("ogr2ogr"): # Only works if ogr2ogr can be found.
+ vect_regex = f".*{vect_name}"
+ vect_zip = vectors.read(zip_file, archive_regex=vect_regex)
+ vect_tar = vectors.read(tar_file, archive_regex=r".*overlay\.kml")
+ vect_ok = vectors.read(vec_ok_path)
+ assert not vect_ok.empty
+ ci.assert_geom_equal(vect_ok, vect_zip)
+ ci.assert_geom_equal(vect_ok, vect_tar)
+
+ # XML
+ xml_name = "LM05_L1TP_200030_20121230_20200820_02_T2_MTL.xml"
+ xml_ok_path = ok_folder.joinpath(xml_name)
+ xml_ok_path = str(s3.download(xml_ok_path, tmp_path))
+
+ xml_regex = f".*{xml_name}"
+ xml_zip = archives.read_archived_xml(zip_file, xml_regex)
+ xml_tar = archives.read_archived_xml(tar_file, r".*_MTL\.xml")
+ xml_ok = etree.parse(xml_ok_path).getroot()
+ ci.assert_xml_equal(xml_ok, xml_zip)
+ ci.assert_xml_equal(xml_ok, xml_tar)
+
+ # FILE + HTML
+ html_zip_file = files_path().joinpath("productPreview.zip")
+ html_tar_file = files_path().joinpath("productPreview.tar")
+ html_name = "productPreview.html"
+ html_ok_path = files_path().joinpath(html_name)
+ html_ok_path = str(s3.download(html_ok_path, tmp_path))
+
+ html_regex = f".*{html_name}"
+
+ # FILE
+ file_zip = archives.read_archived_file(html_zip_file, html_regex)
+ file_tar = archives.read_archived_file(html_tar_file, html_regex)
+ html_ok = html.parse(html_ok_path).getroot()
+ ci.assert_html_equal(html_ok, html.fromstring(file_zip))
+ ci.assert_html_equal(html_ok, html.fromstring(file_tar))
+
+ file_list = archives.get_archived_file_list(html_zip_file)
+ ci.assert_html_equal(
+ html_ok,
+ html.fromstring(
+ archives.read_archived_file(html_zip_file, html_regex, file_list=file_list)
+ ),
+ )
+
+ # HTML
+ html_zip = archives.read_archived_html(html_zip_file, html_regex)
+ html_tar = archives.read_archived_html(html_tar_file, html_regex)
+ ci.assert_html_equal(html_ok, html_zip)
+ ci.assert_html_equal(html_ok, html_tar)
+ ci.assert_html_equal(
+ html_ok,
+ archives.read_archived_html(
+ html_tar_file,
+ html_regex,
+ file_list=archives.get_archived_file_list(html_tar_file),
+ ),
+ )
+
+ # ERRORS
+ with pytest.raises(TypeError):
+ archives.read_archived_file(targz_file, xml_regex)
+ with pytest.raises(TypeError):
+ archives.read_archived_file(sz_file, xml_regex)
+ with pytest.raises(FileNotFoundError):
+ archives.read_archived_file(zip_file, "cdzeferf")
diff --git a/CI/SCRIPTS/test_files.py b/CI/SCRIPTS/test_files.py
index 04015b7..a0f9889 100644
--- a/CI/SCRIPTS/test_files.py
+++ b/CI/SCRIPTS/test_files.py
@@ -16,160 +16,18 @@
"""Script testing the files"""
import os
-import shutil
import tempfile
from datetime import date, datetime
import numpy as np
import pytest
-from lxml import etree, html
-from CI.SCRIPTS.script_utils import Polarization, files_path, s3_env
-from sertit import AnyPath, ci, files, path, s3, vectors
+from CI.SCRIPTS.script_utils import Polarization
+from sertit import AnyPath, ci, files
ci.reduce_verbosity()
-def test_archive():
- """Test extracting functions"""
- with tempfile.TemporaryDirectory() as tmp_dir:
- # Archives
- zip_file = files_path().joinpath("test_zip.zip")
- zip2_file = files_path().joinpath("test_zip.zip") # For overwrite
- zip_without_directory = files_path().joinpath("test_zip_without_directory.zip")
- tar_file = files_path().joinpath("test_tar.tar")
- tar_gz_file = files_path().joinpath("test_targz.tar.gz")
-
- # Core dir
- core_dir = files_path().joinpath("core")
- folder = core_dir
- archives = [
- zip_file,
- tar_file,
- tar_gz_file,
- folder,
- zip2_file,
- zip_without_directory,
- ]
-
- # Extract
- extracted_dirs = files.extract_files(archives, tmp_dir, overwrite=True)
- files.extract_files([zip2_file], tmp_dir, overwrite=False) # Already existing
-
- # Test
- for ex_dir in extracted_dirs:
- ci.assert_dir_equal(core_dir, ex_dir)
-
- # Archive
- archive_base = os.path.join(tmp_dir, "archive")
- for fmt in ["zip", "tar", "gztar"]:
- archive_fn = files.archive(
- folder_path=core_dir, archive_path=archive_base, fmt=fmt
- )
- out = files.extract_file(archive_fn, tmp_dir)
- # an additional folder is created
- out_dir = path.listdir_abspath(out)[0]
- ci.assert_dir_equal(core_dir, out_dir)
-
- # Remove out directory in order to avoid any interferences
- files.remove(out)
-
- # Add to zip
- zip_out = zip2_file if path.is_cloud_path(zip2_file) else archive_base + ".zip"
- core_copy = files.copy(core_dir, os.path.join(tmp_dir, "core2"))
- zip_out = files.add_to_zip(zip_out, core_copy)
-
- # Extract
- unzip_out = os.path.join(tmp_dir, "out")
- unzip_out = files.extract_file(zip_out, unzip_out)
-
- # Test
- unzip_dirs = path.listdir_abspath(unzip_out)
-
- assert len(unzip_dirs) == 2
- ci.assert_dir_equal(unzip_dirs[0], unzip_dirs[1])
-
-
-@s3_env
-def test_archived_files(tmp_path):
- landsat_name = "LM05_L1TP_200030_20121230_20200820_02_T2_CI"
- ok_folder = files_path().joinpath(landsat_name)
- zip_file = files_path().joinpath(f"{landsat_name}.zip")
- tar_file = files_path().joinpath(f"{landsat_name}.tar")
- targz_file = files_path().joinpath(f"{landsat_name}.tar.gz")
- sz_file = files_path().joinpath(f"{landsat_name}.7z")
-
- # VECTORS
- vect_name = "map-overlay.kml"
- vec_ok_path = ok_folder.joinpath(vect_name)
- if shutil.which("ogr2ogr"): # Only works if ogr2ogr can be found.
- vect_regex = f".*{vect_name}"
- vect_zip = vectors.read(zip_file, archive_regex=vect_regex)
- vect_tar = vectors.read(tar_file, archive_regex=r".*overlay\.kml")
- vect_ok = vectors.read(vec_ok_path)
- assert not vect_ok.empty
- ci.assert_geom_equal(vect_ok, vect_zip)
- ci.assert_geom_equal(vect_ok, vect_tar)
-
- # XML
- xml_name = "LM05_L1TP_200030_20121230_20200820_02_T2_MTL.xml"
- xml_ok_path = ok_folder.joinpath(xml_name)
- xml_ok_path = str(s3.download(xml_ok_path, tmp_path))
-
- xml_regex = f".*{xml_name}"
- xml_zip = files.read_archived_xml(zip_file, xml_regex)
- xml_tar = files.read_archived_xml(tar_file, r".*_MTL\.xml")
- xml_ok = etree.parse(xml_ok_path).getroot()
- ci.assert_xml_equal(xml_ok, xml_zip)
- ci.assert_xml_equal(xml_ok, xml_tar)
-
- # FILE + HTML
- html_zip_file = files_path().joinpath("productPreview.zip")
- html_tar_file = files_path().joinpath("productPreview.tar")
- html_name = "productPreview.html"
- html_ok_path = files_path().joinpath(html_name)
- html_ok_path = str(s3.download(html_ok_path, tmp_path))
-
- html_regex = f".*{html_name}"
-
- # FILE
- file_zip = files.read_archived_file(html_zip_file, html_regex)
- file_tar = files.read_archived_file(html_tar_file, html_regex)
- html_ok = html.parse(html_ok_path).getroot()
- ci.assert_html_equal(html_ok, html.fromstring(file_zip))
- ci.assert_html_equal(html_ok, html.fromstring(file_tar))
-
- file_list = path.get_archived_file_list(html_zip_file)
- ci.assert_html_equal(
- html_ok,
- html.fromstring(
- files.read_archived_file(html_zip_file, html_regex, file_list=file_list)
- ),
- )
-
- # HTML
- html_zip = files.read_archived_html(html_zip_file, html_regex)
- html_tar = files.read_archived_html(html_tar_file, html_regex)
- ci.assert_html_equal(html_ok, html_zip)
- ci.assert_html_equal(html_ok, html_tar)
- ci.assert_html_equal(
- html_ok,
- files.read_archived_html(
- html_tar_file,
- html_regex,
- file_list=path.get_archived_file_list(html_tar_file),
- ),
- )
-
- # ERRORS
- with pytest.raises(TypeError):
- files.read_archived_file(targz_file, xml_regex)
- with pytest.raises(TypeError):
- files.read_archived_file(sz_file, xml_regex)
- with pytest.raises(FileNotFoundError):
- files.read_archived_file(zip_file, "cdzeferf")
-
-
def test_cp_rm():
"""Test CP/RM functions"""
with tempfile.TemporaryDirectory() as tmp_dir:
diff --git a/CI/SCRIPTS/test_path.py b/CI/SCRIPTS/test_path.py
index bf335ec..d173bc3 100644
--- a/CI/SCRIPTS/test_path.py
+++ b/CI/SCRIPTS/test_path.py
@@ -16,13 +16,12 @@
"""Script testing the files"""
import os
-import shutil
import tempfile
import pytest
-from CI.SCRIPTS.script_utils import files_path, get_s3_ci_path, s3_env
-from sertit import AnyPath, ci, misc, path, vectors
+from CI.SCRIPTS.script_utils import get_s3_ci_path
+from sertit import AnyPath, ci, misc, path
ci.reduce_verbosity()
@@ -65,58 +64,6 @@ def test_paths():
assert not path.is_writable("cvfgbherth") # Non-existing
-@s3_env
-def test_archived_paths():
- landsat_name = "LM05_L1TP_200030_20121230_20200820_02_T2_CI"
- ok_folder = files_path().joinpath(landsat_name)
- zip_file = files_path().joinpath(f"{landsat_name}.zip")
- tar_file = files_path().joinpath(f"{landsat_name}.tar")
- targz_file = files_path().joinpath(f"{landsat_name}.tar.gz")
- sz_file = files_path().joinpath(f"{landsat_name}.7z")
-
- # Archive file
- tif_name = "LM05_L1TP_200030_20121230_20200820_02_T2_QA_RADSAT.TIF"
- tif_ok = f"{ok_folder.name}/{tif_name}"
- tif_regex = f".*{tif_name}"
- assert tif_ok == path.get_archived_path(zip_file, tif_regex)
- assert tif_ok == path.get_archived_path(zip_file, tif_regex, as_list=True)[0]
- assert tif_ok == path.get_archived_path(tar_file, ".*RADSAT")
-
- # RASTERIO
- tif_zip = path.get_archived_rio_path(zip_file, tif_regex)
- tif_list = path.get_archived_rio_path(zip_file, tif_regex, as_list=True)
- tif_tar = path.get_archived_rio_path(tar_file, ".*RADSAT")
- tif_ok = ok_folder.joinpath(tif_name)
- ci.assert_raster_equal(tif_ok, tif_zip)
- ci.assert_raster_equal(tif_ok, tif_list[0])
- ci.assert_raster_equal(tif_ok, tif_tar)
-
- file_list = path.get_archived_file_list(zip_file)
- ci.assert_raster_equal(
- tif_ok, path.get_archived_rio_path(zip_file, tif_regex, file_list=file_list)
- )
-
- # VECTORS
- vect_name = "map-overlay.kml"
- vec_ok_path = ok_folder.joinpath(vect_name)
- if shutil.which("ogr2ogr"): # Only works if ogr2ogr can be found.
- vect_regex = f".*{vect_name}"
- vect_zip = vectors.read(zip_file, archive_regex=vect_regex)
- vect_tar = vectors.read(tar_file, archive_regex=r".*overlay\.kml")
- vect_ok = vectors.read(vec_ok_path)
- assert not vect_ok.empty
- ci.assert_geom_equal(vect_ok, vect_zip)
- ci.assert_geom_equal(vect_ok, vect_tar)
-
- # ERRORS
- with pytest.raises(TypeError):
- path.get_archived_rio_path(targz_file, tif_regex)
- with pytest.raises(TypeError):
- path.get_archived_rio_path(sz_file, tif_regex)
- with pytest.raises(FileNotFoundError):
- path.get_archived_rio_path(zip_file, "cdzeferf")
-
-
def test_get_file_name():
"""Test get_file_name"""
file_name = path.get_filename(__file__)
diff --git a/CI/SCRIPTS/test_vectors.py b/CI/SCRIPTS/test_vectors.py
index 5f9bd92..5a79272 100644
--- a/CI/SCRIPTS/test_vectors.py
+++ b/CI/SCRIPTS/test_vectors.py
@@ -25,7 +25,7 @@
from shapely import wkt
from CI.SCRIPTS.script_utils import KAPUT_KWARGS, files_path, s3_env, vectors_path
-from sertit import ci, files, path, vectors
+from sertit import archives, ci, files, path, vectors
from sertit.vectors import EPSG_4326, DataSourceError
ci.reduce_verbosity()
@@ -294,7 +294,7 @@ def test_read_archived():
vectors.read(tar_landsat, archive_regex=map_overlay_regex),
)
- file_list = path.get_archived_file_list(tar_landsat)
+ file_list = archives.get_archived_file_list(tar_landsat)
ci.assert_geom_equal(
map_overlay_extracted,
vectors.read(tar_landsat, archive_regex=map_overlay_regex, file_list=file_list),
diff --git a/sertit/archives.py b/sertit/archives.py
new file mode 100644
index 0000000..2115071
--- /dev/null
+++ b/sertit/archives.py
@@ -0,0 +1,558 @@
+import logging
+import os
+import re
+import shutil
+import tarfile
+import tempfile
+import zipfile
+from contextlib import contextmanager
+from pathlib import Path
+from typing import Union
+
+from lxml import etree, html
+from tqdm import tqdm
+
+from sertit import AnyPath, logs, path, s3
+from sertit.logs import SU_NAME
+from sertit.types import AnyPathStrType, AnyPathType
+
+LOGGER = logging.getLogger(SU_NAME)
+
+
+@contextmanager
+def open_zipfile(file_path, mode="r"):
+ if path.is_cloud_path(file_path):
+ file_path = s3.read(file_path)
+
+ with zipfile.ZipFile(file_path, mode) as zip_file:
+ yield zip_file
+
+
+@contextmanager
+def open_tarfile(file_path, mode="r"):
+ if path.is_cloud_path(file_path):
+ args = {"fileobj": s3.read(file_path), "mode": mode}
+ else:
+ args = {"name": file_path, "mode": mode}
+ with tarfile.open(**args) as tar_file:
+ yield tar_file
+
+
+def extract_file(
+ file_path: AnyPathStrType,
+ output: AnyPathStrType,
+ overwrite: bool = False,
+) -> AnyPathType:
+ """
+ Extract an archived file (zip or others). Overwrites if specified.
+    If the archive doesn't contain a root directory named after the archive (without its extension), it will be created
+
+ Args:
+ file_path (str): Archive file path
+ output (str): Output where to put the extracted directory
+ overwrite (bool): Overwrite found extracted directory
+
+ Returns:
+ AnyPathType: Extracted directory paths
+
+ Example:
+ >>> file_path = 'D:/path/to/zip.zip'
+ >>> output = 'D:/path/to/output'
+ >>> extract_file(file_path, output, overwrite=True)
+        D:/path/to/output/zip
+ """
+ # Convert to path
+ file_path = AnyPath(file_path)
+ output = AnyPath(output)
+
+ # In case a folder is given, returns it (this means that the file is already extracted)
+ if file_path.is_dir():
+ return file_path
+
+ # Beware with .SEN3 and .SAFE extensions
+ archive_output = output.joinpath(path.get_filename(file_path))
+
+ # In case not overwrite and the extracted directory already exists
+ if not overwrite and archive_output.exists():
+ LOGGER.debug(
+ "Already existing extracted %s. It won't be overwritten.",
+ archive_output,
+ )
+ return archive_output
+
+ def extract_sub_dir(arch, filename_list):
+ top_level_files = list({item.split("/")[0] for item in filename_list})
+
+ # When the only root directory in the archive has the right name, we don't have to create it
+ if len(top_level_files) == 1 and archive_output.name == path.get_filename(
+ top_level_files[0]
+ ):
+ arch.extractall(archive_output.parent)
+ archive_output.parent.joinpath(top_level_files[0]).rename(archive_output)
+ else:
+ arch.extractall(archive_output)
+
+ # Manage archive type
+ if file_path.suffix == ".zip":
+ with open_zipfile(file_path) as zip_file:
+ extract_sub_dir(zip_file, zip_file.namelist())
+ elif file_path.suffix == ".tar" or file_path.suffixes == [".tar", ".gz"]:
+ with open_tarfile(file_path) as tar_file:
+ extract_sub_dir(tar_file, tar_file.getnames())
+ elif file_path.suffix == ".7z":
+ try:
+ import py7zr
+
+ with py7zr.SevenZipFile(file_path, "r") as z7_file:
+ extract_sub_dir(z7_file, z7_file.getnames())
+ except ModuleNotFoundError as exc:
+ raise TypeError("Please install 'py7zr' to extract .7z files") from exc
+ else:
+ raise TypeError(
+ f"Only .zip, .tar, .tar.gz and .7z files can be extracted, not {file_path}"
+ )
+
+ return archive_output
+
+
+def extract_files(
+ archives: list, output: AnyPathStrType, overwrite: bool = False
+) -> list:
+ """
+ Extract all archived files. Overwrites if specified.
+
+ Example:
+ >>> file_path = ['D:/path/to/zip1.zip', 'D:/path/to/zip2.zip']
+ >>> output = 'D:/path/to/output'
+ >>> extract_files(file_path, output, overwrite=True)
+        ['D:/path/to/output/zip1', 'D:/path/to/output/zip2']
+
+ Args:
+ archives (list of str): List of archives to be extracted
+ output (str): Output folder where extracted files will be written
+ overwrite (bool): Overwrite found extracted files
+
+ Returns:
+ list: Extracted files (even pre-existing ones)
+ """
+ LOGGER.info("Extracting products in %s", output)
+ progress_bar = tqdm(archives)
+ extracts = []
+ for arch in progress_bar:
+ progress_bar.set_description(f"Extracting product {os.path.basename(arch)}")
+ extracts.append(extract_file(arch, output, overwrite))
+
+ return extracts
+
+
+def read_archived_file(
+ archive_path: AnyPathStrType, regex: str, file_list: list = None
+) -> bytes:
+ """
+ Read archived file (in bytes) from :code:`zip` or :code:`tar` archives.
+
+    You can use this `site <https://regex101.com/>`_ to build your regex.
+
+ Args:
+ archive_path (AnyPathStrType): Archive path
+ regex (str): Regex (used by re) as it can be found in the getmembers() list
+ file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed.
+
+ Returns:
+ bytes: Archived file in bytes
+ """
+ archive_path = AnyPath(archive_path)
+
+ # Compile regex
+ regex = re.compile(regex)
+
+ # Open tar and zip XML
+ try:
+ if archive_path.suffix == ".tar":
+ with open_tarfile(archive_path) as tar_ds:
+ # file_list is not very useful for TAR files...
+ if file_list is None:
+ tar_mb = tar_ds.getmembers()
+ file_list = [mb.name for mb in tar_mb]
+ name = list(filter(regex.match, file_list))[0]
+ tarinfo = tar_ds.getmember(name)
+ file_str = tar_ds.extractfile(tarinfo).read()
+ elif archive_path.suffix == ".zip":
+ with open_zipfile(archive_path) as zip_ds:
+ if file_list is None:
+ file_list = [f.filename for f in zip_ds.filelist]
+ name = list(filter(regex.match, file_list))[0]
+ file_str = zip_ds.read(name)
+
+ elif archive_path.suffix == ".tar.gz":
+ raise TypeError(
+ ".tar.gz files are too slow to read from inside the archive. Please extract them instead."
+ )
+ else:
+ raise TypeError(
+ "Only .zip and .tar files can be read from inside its archive."
+ )
+ except IndexError as exc:
+ raise FileNotFoundError(
+ f"Impossible to find file {regex} in {path.get_filename(archive_path)}"
+ ) from exc
+
+ return file_str
+
+
+def read_archived_xml(
+ archive_path: AnyPathStrType, regex: str = None, file_list: list = None, **kwargs
+) -> etree._Element:
+ """
+ Read archived XML from :code:`zip` or :code:`tar` archives.
+
+    You can use this `site <https://regex101.com/>`_ to build your regex.
+
+ Args:
+ archive_path (AnyPathStrType): Archive path
+ regex (str): XML regex (used by re) as it can be found in the getmembers() list
+ file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed.
+
+ Returns:
+ etree._Element: XML file
+
+ Example:
+ >>> arch_path = 'D:/path/to/zip.zip'
+ >>> file_regex = '.*dir.*file_name' # Use .* for any character
+ >>> read_archived_xml(arch_path, file_regex)
+
+ """
+ if regex is None:
+ logs.deprecation_warning(
+ "'xml_regex' is deprecated, please use 'regex' instead."
+ )
+ regex = kwargs.pop("xml_regex")
+
+ xml_bytes = read_archived_file(archive_path, regex=regex, file_list=file_list)
+
+ return etree.fromstring(xml_bytes)
+
+
+def read_archived_html(
+ archive_path: AnyPathStrType, regex: str, file_list: list = None
+) -> html.HtmlElement:
+ """
+ Read archived HTML from :code:`zip` or :code:`tar` archives.
+
+    You can use this `site <https://regex101.com/>`_ to build your regex.
+
+ Args:
+ archive_path (AnyPathStrType): Archive path
+ regex (str): HTML regex (used by re) as it can be found in the getmembers() list
+ file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed.
+
+ Returns:
+        html.HtmlElement: HTML file
+
+ Example:
+ >>> arch_path = 'D:/path/to/zip.zip'
+ >>> file_regex = '.*dir.*file_name' # Use .* for any character
+ >>> read_archived_html(arch_path, file_regex)
+
+ """
+ html_bytes = read_archived_file(archive_path, regex, file_list=file_list)
+
+ return html.fromstring(html_bytes)
+
+
+def archive(
+ folder_path: AnyPathStrType,
+ archive_path: AnyPathStrType,
+ fmt: str = "zip",
+) -> AnyPathType:
+ """
+ Archives a folder recursively.
+
+ Args:
+ folder_path (AnyPathStrType): Folder to archive
+ archive_path (AnyPathStrType): Archive path, with or without extension
+ fmt (str): Format of the archive, used by :code:`shutil.make_archive`. Choose between [zip, tar, gztar, bztar, xztar]
+
+ Returns:
+ str: Archive filename
+
+ Example:
+ >>> folder_path = 'D:/path/to/folder_to_archive'
+ >>> archive_path = 'D:/path/to/output'
+ >>> archive = archive(folder_path, archive_path, fmt="gztar")
+ 'D:/path/to/output/folder_to_archive.tar.gz'
+ """
+ archive_path = AnyPath(archive_path)
+ folder_path = AnyPath(folder_path)
+
+ tmp_dir = None
+ if path.is_cloud_path(folder_path):
+ tmp_dir = tempfile.TemporaryDirectory()
+ folder_path = folder_path.download_to(tmp_dir.name)
+
+ # Shutil make_archive needs a path without extension
+ archive_base = os.path.splitext(archive_path)[0]
+
+ # Archive the folder
+ archive_fn = shutil.make_archive(
+ archive_base,
+ format=fmt,
+ root_dir=folder_path.parent,
+ base_dir=folder_path.name,
+ )
+
+ if tmp_dir is not None:
+ tmp_dir.cleanup()
+
+ return AnyPath(archive_fn)
+
+
+def add_to_zip(
+ zip_path: AnyPathStrType,
+ dirs_to_add: Union[list, AnyPathStrType],
+) -> AnyPathType:
+ """
+ Add folders to an already existing zip file (recursively).
+
+ Args:
+ zip_path (AnyPathStrType): Already existing zip file
+ dirs_to_add (Union[list, AnyPathStrType]): Directories to add
+
+ Returns:
+ AnyPathType: Updated zip_path
+
+ Example:
+ >>> zip_path = 'D:/path/to/zip.zip'
+ >>> dirs_to_add = ['D:/path/to/dir1', 'D:/path/to/dir2']
+ >>> add_to_zip(zip_path, dirs_to_add)
+ zip.zip contains 2 more folders, dir1 and dir2
+ """
+ zip_path = AnyPath(zip_path)
+
+ # If the zip is on the cloud, cache it (zipfile doesn't like cloud paths)
+ if path.is_cloud_path(zip_path):
+ zip_path = AnyPath(zip_path.fspath)
+
+ # Check if existing zipfile
+ if not zip_path.is_file():
+ raise FileNotFoundError(f"Non existing {zip_path}")
+
+ # Convert to list if needed
+ if not isinstance(dirs_to_add, list):
+ dirs_to_add = [dirs_to_add]
+
+ # Add all folders to the existing zip
+ # Forced to use ZipFile because make_archive only works with one folder and not existing zipfile
+ with open_zipfile(zip_path, "a") as zip_file:
+ progress_bar = tqdm(dirs_to_add)
+ for dir_to_add_path in progress_bar:
+ # Just to be sure, use str instead of Paths
+ if isinstance(dir_to_add_path, Path):
+ dir_to_add = str(dir_to_add_path)
+ elif path.is_cloud_path(dir_to_add_path):
+ dir_to_add = dir_to_add_path.fspath
+ else:
+ dir_to_add = dir_to_add_path
+
+ progress_bar.set_description(
+ f"Adding {os.path.basename(dir_to_add)} to {os.path.basename(zip_path)}"
+ )
+ tmp = tempfile.TemporaryDirectory()
+ if os.path.isfile(dir_to_add):
+ dir_to_add = extract_file(dir_to_add, tmp.name)
+
+ for root, _, files in os.walk(dir_to_add):
+ base_path = os.path.join(dir_to_add, "..")
+
+ # Write dir (in namelist at least)
+ zip_file.write(root, os.path.relpath(root, base_path))
+
+ # Write files
+ for file in files:
+ zip_file.write(
+ os.path.join(root, file),
+ os.path.relpath(
+ os.path.join(root, file), os.path.join(dir_to_add, "..")
+ ),
+ )
+
+ # Clean tmp
+ tmp.cleanup()
+
+ return zip_path
+
+
+def get_archived_file_list(archive_path: AnyPathStrType) -> list:
+ """
+ Get the list of all the files contained in an archive.
+
+ Args:
+ archive_path (AnyPathStrType): Archive path
+
+ Returns:
+ list: All files contained in the given archive
+
+ Example:
+ >>> arch_path = 'D:/path/to/zip.zip'
+        >>> get_archived_file_list(arch_path)
+ ['file_1.txt', 'file_2.tif', 'file_3.xml', 'file_4.geojson']
+ """
+ archive_path = AnyPath(archive_path)
+
+ is_zip = archive_path.suffix == ".zip"
+ archive_fn = path.get_filename(archive_path)
+ if is_zip:
+ with open_zipfile(archive_path) as zip_ds:
+ file_list = [f.filename for f in zip_ds.filelist]
+ else:
+ try:
+ with open_tarfile(archive_path) as tar_ds:
+ tar_mb = tar_ds.getmembers()
+ file_list = [mb.name for mb in tar_mb]
+ except tarfile.ReadError as ex:
+ raise tarfile.ReadError(f"Impossible to open archive: {archive_fn}") from ex
+
+ return file_list
+
+
+def get_archived_path(
+ archive_path: AnyPathStrType,
+ regex: str,
+ as_list: bool = False,
+ case_sensitive: bool = False,
+ file_list: list = None,
+ **kwargs,
+) -> Union[list, AnyPathType]:
+ """
+ Get archived file path from inside the archive.
+
+ .. WARNING::
+ If :code:`as_list` is :code:`False`, it will only return the first file matched !
+
+    You can use this `site <https://regex101.com/>`_ to build your regex.
+
+ Args:
+ archive_path (AnyPathStrType): Archive path
+ regex (str): File regex (used by re) as it can be found in the getmembers() list
+ as_list (bool): If true, returns a list (including all found files). If false, returns only the first match
+ case_sensitive (bool): If true, the regex is case-sensitive.
+ file_list (list): List of files to get archived from. Optional, if not given it will be re-computed.
+
+ Returns:
+ Union[list, str]: Path from inside the zipfile
+
+ Example:
+ >>> arch_path = 'D:/path/to/zip.zip'
+ >>> file_regex = '.*dir.*file_name' # Use .* for any character
+ >>> path = get_archived_path(arch_path, file_regex)
+ 'dir/filename.tif'
+ """
+ if regex is None:
+ logs.deprecation_warning(
+ "'file_regex' is deprecated, please use 'regex' instead."
+ )
+ regex = kwargs.pop("file_regex")
+
+ # Get file list
+ archive_path = AnyPath(archive_path)
+
+ # Offer the ability to give the file list directly, as this operation is expensive when done with large archives stored on the cloud
+ if file_list is None:
+ file_list = get_archived_file_list(archive_path)
+
+ # Search for file
+ re_rgx = re.compile(regex) if case_sensitive else re.compile(regex, re.IGNORECASE)
+ archived_band_paths = list(filter(re_rgx.match, file_list))
+ if not archived_band_paths:
+ raise FileNotFoundError(
+ f"Impossible to find file {regex} in {path.get_filename(archive_path)}"
+ )
+
+ # Convert to str if needed
+ if not as_list:
+ archived_band_paths = archived_band_paths[0]
+
+ return archived_band_paths
+
+
+def get_archived_rio_path(
+ archive_path: AnyPathStrType,
+ regex: str,
+ as_list: bool = False,
+ file_list: list = None,
+ **kwargs,
+) -> Union[list, AnyPathType]:
+ """
+ Get archived file path from inside the archive, to be read with rasterio:
+
+ - :code:`zip+file://{zip_path}!{file_name}`
+ - :code:`tar+file://{tar_path}!{file_name}`
+
+
+ See `here `_
+ for more information.
+
+ .. WARNING::
+ It wont be readable by pandas, geopandas or xmltree !
+
+ .. WARNING::
+ If :code:`as_list` is :code:`False`, it will only return the first file matched !
+
+    You can use this `site <https://regex101.com/>`_ to build your regex.
+
+ Args:
+ archive_path (AnyPathStrType): Archive path
+ regex (str): File regex (used by re) as it can be found in the getmembers() list
+ as_list (bool): If true, returns a list (including all found files). If false, returns only the first match
+ file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed.
+
+ Returns:
+ Union[list, str]: Band path that can be read by rasterio
+
+ Example:
+ >>> arch_path = 'D:/path/to/zip.zip'
+ >>> file_regex = '.*dir.*file_name' # Use .* for any character
+        >>> path = get_archived_rio_path(arch_path, file_regex)
+ 'zip+file://D:/path/to/output.zip!dir/filename.tif'
+ >>> rasterio.open(path)
+
+ """
+ if regex is None:
+ logs.deprecation_warning(
+ "'file_regex' is deprecated, please use 'regex' instead."
+ )
+ regex = kwargs.pop("file_regex")
+
+ archive_path = AnyPath(archive_path)
+ if archive_path.suffix in [".tar", ".zip"]:
+ prefix = archive_path.suffix[-3:]
+ elif archive_path.suffix == ".tar.gz":
+ raise TypeError(
+ ".tar.gz files are too slow to be read from inside the archive. Please extract them instead."
+ )
+ else:
+ raise TypeError("Only .zip and .tar files can be read from inside its archive.")
+
+ # Search for file
+ archived_band_paths = get_archived_path(
+ archive_path, regex=regex, as_list=True, file_list=file_list
+ )
+
+ # Convert to rio path
+ if path.is_cloud_path(archive_path):
+ archived_band_paths = [
+ f"{prefix}+file+{archive_path}!{p}" for p in archived_band_paths
+ ]
+ else:
+ # archived_band_paths = [
+ # f"{prefix}+file://{archive_path}!{path}" for path in archived_band_paths
+ # ]
+ archived_band_paths = [
+ f"/vsi{prefix}/{archive_path}/{p}" for p in archived_band_paths
+ ]
+
+ # Convert to str if needed
+ if not as_list:
+ archived_band_paths = archived_band_paths[0]
+
+ return archived_band_paths
diff --git a/sertit/files.py b/sertit/files.py
index 4bdb6c2..c2ec5a3 100644
--- a/sertit/files.py
+++ b/sertit/files.py
@@ -19,11 +19,7 @@
import json
import logging
import os
-import re
import shutil
-import tarfile
-import tempfile
-import zipfile
from datetime import date, datetime
from enum import Enum
from json import JSONDecoder, JSONEncoder
@@ -32,10 +28,8 @@
import dill
import numpy as np
-from lxml import etree, html
-from tqdm import tqdm
-from sertit import AnyPath, logs, path
+from sertit import AnyPath, logs, path, s3
from sertit.logs import SU_NAME
from sertit.strings import DATE_FORMAT
from sertit.types import AnyPathStrType, AnyPathType
@@ -165,454 +159,6 @@ def real_rel_path(raw_path: AnyPathStrType, start: AnyPathStrType) -> AnyPathTyp
return path.real_rel_path(raw_path, start)
-def extract_file(
- file_path: AnyPathStrType,
- output: AnyPathStrType,
- overwrite: bool = False,
-) -> AnyPathType:
- """
- Extract an archived file (zip or others). Overwrites if specified.
- If the archive don't contain a root directory with the name of the archive without the extension, create it
-
- Args:
- file_path (str): Archive file path
- output (str): Output where to put the extracted directory
- overwrite (bool): Overwrite found extracted directory
-
- Returns:
- AnyPathType: Extracted directory paths
-
- Example:
- >>> file_path = 'D:/path/to/zip.zip'
- >>> output = 'D:/path/to/output'
- >>> extract_file(file_path, output, overwrite=True)
- D:/path/to/output/zip'
- """
- # Convert to path
- file_path = AnyPath(file_path)
- output = AnyPath(output)
-
- # In case a folder is given, returns it (this means that the file is already extracted)
- if file_path.is_dir():
- return file_path
-
- # Beware with .SEN3 and .SAFE extensions
- archive_output = output.joinpath(path.get_filename(file_path))
-
- # In case not overwrite and the extracted directory already exists
- if not overwrite and archive_output.exists():
- LOGGER.debug(
- "Already existing extracted %s. It won't be overwritten.",
- archive_output,
- )
- return archive_output
-
- def extract_sub_dir(arch, filename_list):
- top_level_files = list({item.split("/")[0] for item in filename_list})
-
- # When the only root directory in the archive has the right name, we don't have to create it
- if len(top_level_files) == 1 and archive_output.name == path.get_filename(
- top_level_files[0]
- ):
- arch.extractall(archive_output.parent)
- archive_output.parent.joinpath(top_level_files[0]).rename(archive_output)
- else:
- arch.extractall(archive_output)
-
- # Manage archive type
- if file_path.suffix == ".zip":
- with zipfile.ZipFile(file_path, "r") as zip_file:
- extract_sub_dir(zip_file, zip_file.namelist())
- elif file_path.suffix == ".tar" or file_path.suffixes == [".tar", ".gz"]:
- with tarfile.open(file_path, "r") as tar_file:
- extract_sub_dir(tar_file, tar_file.getnames())
- elif file_path.suffix == ".7z":
- try:
- import py7zr
-
- with py7zr.SevenZipFile(file_path, "r") as z7_file:
- extract_sub_dir(z7_file, z7_file.getnames())
- except ModuleNotFoundError as exc:
- raise TypeError("Please install 'py7zr' to extract .7z files") from exc
- else:
- raise TypeError(
- f"Only .zip, .tar, .tar.gz and .7z files can be extracted, not {file_path}"
- )
-
- return archive_output
-
-
-def extract_files(
- archives: list, output: AnyPathStrType, overwrite: bool = False
-) -> list:
- """
- Extract all archived files. Overwrites if specified.
-
- Example:
- >>> file_path = ['D:/path/to/zip1.zip', 'D:/path/to/zip2.zip']
- >>> output = 'D:/path/to/output'
- >>> extract_files(file_path, output, overwrite=True)
- ['D:/path/to/output.zip1', 'D:/path/to/output.zip2']
-
- Args:
- archives (list of str): List of archives to be extracted
- output (str): Output folder where extracted files will be written
- overwrite (bool): Overwrite found extracted files
-
- Returns:
- list: Extracted files (even pre-existing ones)
- """
- LOGGER.info("Extracting products in %s", output)
- progress_bar = tqdm(archives)
- extracts = []
- for arch in progress_bar:
- progress_bar.set_description(f"Extracting product {os.path.basename(arch)}")
- extracts.append(extract_file(arch, output, overwrite))
-
- return extracts
-
-
-def get_archived_file_list(archive_path: AnyPathStrType) -> list:
- """
- .. deprecated:: 1.30.0
- Import it from :py:mod:`sertit.path` instead of :py:mod:`sertit.files`
-
- Get the list of all the files contained in an archive.
-
- Args:
- archive_path (AnyPathStrType): Archive path
-
- Returns:
- list: All files contained in the given archive
-
- Example:
- >>> arch_path = 'D:/path/to/zip.zip'
- >>> get_archived_file_list(arch_path, file_regex)
- ['file_1.txt', 'file_2.tif', 'file_3.xml', 'file_4.geojson']
- """
- logs.deprecation_warning(
- "This function is deprecated. Import it from 'sertit.path' instead of 'sertit.files'"
- )
- return path.get_archived_file_list(archive_path)
-
-
-def get_archived_path(
- archive_path: AnyPathStrType, file_regex: str, as_list: bool = False
-) -> Union[list, AnyPathType]:
- """
- .. deprecated:: 1.30.0
- Import it from :py:mod:`sertit.path` instead of :py:mod:`sertit.files`
-
- Get archived file path from inside the archive.
-
- .. WARNING::
- If :code:`as_list` is :code:`False`, it will only return the first file matched !
-
- You can use this `site `_ to build your regex.
-
- Example:
- >>> arch_path = 'D:/path/to/zip.zip'
- >>> file_regex = '.*dir.*file_name' # Use .* for any character
- >>> path = get_archived_path(arch_path, file_regex)
- 'dir/filename.tif'
-
- Args:
- archive_path (AnyPathStrType): Archive path
- file_regex (str): File regex (used by re) as it can be found in the getmembers() list
- as_list (bool): If true, returns a list (including all found files). If false, returns only the first match
-
- Returns:
- Union[list, str]: Path from inside the zipfile
- """
- logs.deprecation_warning(
- "This function is deprecated. Import it from 'sertit.path' instead of 'sertit.files'"
- )
- return path.get_archived_path(archive_path, file_regex, as_list)
-
-
-def get_archived_rio_path(
- archive_path: AnyPathStrType, file_regex: str, as_list: bool = False
-) -> Union[list, AnyPathType]:
- """
- .. deprecated:: 1.30.0
- Import it from :py:mod:`sertit.path` instead of :py:mod:`sertit.files`
-
- Get archived file path from inside the archive, to be read with rasterio:
-
- - :code:`zip+file://{zip_path}!{file_name}`
- - :code:`tar+file://{tar_path}!{file_name}`
-
-
- See `here `_
- for more information.
-
- .. WARNING::
- It won't be readable by pandas, geopandas or xmltree !
-
- .. WARNING::
- If :code:`as_list` is :code:`False`, it will only return the first file matched !
-
- You can use this `site `_ to build your regex.
-
- Args:
- archive_path (AnyPathStrType): Archive path
- file_regex (str): File regex (used by re) as it can be found in the getmembers() list
- as_list (bool): If true, returns a list (including all found files). If false, returns only the first match
-
- Returns:
- Union[list, str]: Band path that can be read by rasterio
-
- Example:
- >>> arch_path = 'D:/path/to/zip.zip'
- >>> file_regex = '.*dir.*file_name' # Use .* for any character
- >>> path = get_archived_tif_path(arch_path, file_regex)
- 'zip+file://D:/path/to/output.zip!dir/filename.tif'
- >>> rasterio.open(path)
-
- """
- logs.deprecation_warning(
- "This function is deprecated. Import it from 'sertit.path' instead of 'sertit.files'"
- )
- return path.get_archived_rio_path(archive_path, file_regex, as_list)
-
-
-def read_archived_file(
- archive_path: AnyPathStrType, regex: str, file_list: list = None
-) -> bytes:
- """
- Read archived file (in bytes) from :code:`zip` or :code:`tar` archives.
-
- You can use this `site `_ to build your regex.
-
- Args:
- archive_path (AnyPathStrType): Archive path
- regex (str): Regex (used by re) as it can be found in the getmembers() list
- file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed.
-
- Returns:
- bytes: Archived file in bytes
- """
- archive_path = AnyPath(archive_path)
-
- # Compile regex
- regex = re.compile(regex)
-
- # Open tar and zip XML
- try:
- if archive_path.suffix == ".tar":
- with tarfile.open(archive_path) as tar_ds:
- # file_list is not very useful for TAR files...
- if file_list is None:
- tar_mb = tar_ds.getmembers()
- file_list = [mb.name for mb in tar_mb]
- name = list(filter(regex.match, file_list))[0]
- tarinfo = tar_ds.getmember(name)
- file_str = tar_ds.extractfile(tarinfo).read()
- elif archive_path.suffix == ".zip":
- with zipfile.ZipFile(archive_path) as zip_ds:
- if file_list is None:
- file_list = [f.filename for f in zip_ds.filelist]
- name = list(filter(regex.match, file_list))[0]
- file_str = zip_ds.read(name)
-
- elif archive_path.suffix == ".tar.gz":
- raise TypeError(
- ".tar.gz files are too slow to read from inside the archive. Please extract them instead."
- )
- else:
- raise TypeError(
- "Only .zip and .tar files can be read from inside its archive."
- )
- except IndexError as exc:
- raise FileNotFoundError(
- f"Impossible to find file {regex} in {path.get_filename(archive_path)}"
- ) from exc
-
- return file_str
-
-
-def read_archived_xml(
- archive_path: AnyPathStrType, regex: str = None, file_list: list = None, **kwargs
-) -> etree._Element:
- """
- Read archived XML from :code:`zip` or :code:`tar` archives.
-
- You can use this `site `_ to build your regex.
-
- Args:
- archive_path (AnyPathStrType): Archive path
- regex (str): XML regex (used by re) as it can be found in the getmembers() list
- file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed.
-
- Returns:
- etree._Element: XML file
-
- Example:
- >>> arch_path = 'D:/path/to/zip.zip'
- >>> file_regex = '.*dir.*file_name' # Use .* for any character
- >>> read_archived_xml(arch_path, file_regex)
-
- """
- if regex is None:
- logs.deprecation_warning(
- "'xml_regex' is deprecated, please use 'regex' instead."
- )
- regex = kwargs.pop("xml_regex")
-
- xml_bytes = read_archived_file(archive_path, regex=regex, file_list=file_list)
-
- return etree.fromstring(xml_bytes)
-
-
-def read_archived_html(
- archive_path: AnyPathStrType, regex: str, file_list: list = None
-) -> html.HtmlElement:
- """
- Read archived HTML from :code:`zip` or :code:`tar` archives.
-
- You can use this `site `_ to build your regex.
-
- Args:
- archive_path (AnyPathStrType): Archive path
- regex (str): HTML regex (used by re) as it can be found in the getmembers() list
- file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed.
-
- Returns:
- html._Element: HTML file
-
- Example:
- >>> arch_path = 'D:/path/to/zip.zip'
- >>> file_regex = '.*dir.*file_name' # Use .* for any character
- >>> read_archived_html(arch_path, file_regex)
-
- """
- html_bytes = read_archived_file(archive_path, regex, file_list=file_list)
-
- return html.fromstring(html_bytes)
-
-
-def archive(
- folder_path: AnyPathStrType,
- archive_path: AnyPathStrType,
- fmt: str = "zip",
-) -> AnyPathType:
- """
- Archives a folder recursively.
-
- Args:
- folder_path (AnyPathStrType): Folder to archive
- archive_path (AnyPathStrType): Archive path, with or without extension
- fmt (str): Format of the archive, used by :code:`shutil.make_archive`. Choose between [zip, tar, gztar, bztar, xztar]
-
- Returns:
- str: Archive filename
-
- Example:
- >>> folder_path = 'D:/path/to/folder_to_archive'
- >>> archive_path = 'D:/path/to/output'
- >>> archive = archive(folder_path, archive_path, fmt="gztar")
- 'D:/path/to/output/folder_to_archive.tar.gz'
- """
- archive_path = AnyPath(archive_path)
- folder_path = AnyPath(folder_path)
-
- tmp_dir = None
- if path.is_cloud_path(folder_path):
- tmp_dir = tempfile.TemporaryDirectory()
- folder_path = folder_path.download_to(tmp_dir.name)
-
- # Shutil make_archive needs a path without extension
- archive_base = os.path.splitext(archive_path)[0]
-
- # Archive the folder
- archive_fn = shutil.make_archive(
- archive_base,
- format=fmt,
- root_dir=folder_path.parent,
- base_dir=folder_path.name,
- )
-
- if tmp_dir is not None:
- tmp_dir.cleanup()
-
- return AnyPath(archive_fn)
-
-
-def add_to_zip(
- zip_path: AnyPathStrType,
- dirs_to_add: Union[list, AnyPathStrType],
-) -> AnyPathType:
- """
- Add folders to an already existing zip file (recursively).
-
- Args:
- zip_path (AnyPathStrType): Already existing zip file
- dirs_to_add (Union[list, AnyPathStrType]): Directories to add
-
- Returns:
- AnyPathType: Updated zip_path
-
- Example:
- >>> zip_path = 'D:/path/to/zip.zip'
- >>> dirs_to_add = ['D:/path/to/dir1', 'D:/path/to/dir2']
- >>> add_to_zip(zip_path, dirs_to_add)
- zip.zip contains 2 more folders, dir1 and dir2
- """
- zip_path = AnyPath(zip_path)
-
- # If the zip is on the cloud, cache it (zipfile doesn't like cloud paths)
- if path.is_cloud_path(zip_path):
- zip_path = AnyPath(zip_path.fspath)
-
- # Check if existing zipfile
- if not zip_path.is_file():
- raise FileNotFoundError(f"Non existing {zip_path}")
-
- # Convert to list if needed
- if not isinstance(dirs_to_add, list):
- dirs_to_add = [dirs_to_add]
-
- # Add all folders to the existing zip
- # Forced to use ZipFile because make_archive only works with one folder and not existing zipfile
- with zipfile.ZipFile(zip_path, "a") as zip_file:
- progress_bar = tqdm(dirs_to_add)
- for dir_to_add_path in progress_bar:
- # Just to be sure, use str instead of Paths
- if isinstance(dir_to_add_path, Path):
- dir_to_add = str(dir_to_add_path)
- elif path.is_cloud_path(dir_to_add_path):
- dir_to_add = dir_to_add_path.fspath
- else:
- dir_to_add = dir_to_add_path
-
- progress_bar.set_description(
- f"Adding {os.path.basename(dir_to_add)} to {os.path.basename(zip_path)}"
- )
- tmp = tempfile.TemporaryDirectory()
- if os.path.isfile(dir_to_add):
- dir_to_add = extract_file(dir_to_add, tmp.name)
-
- for root, _, files in os.walk(dir_to_add):
- base_path = os.path.join(dir_to_add, "..")
-
- # Write dir (in namelist at least)
- zip_file.write(root, os.path.relpath(root, base_path))
-
- # Write files
- for file in files:
- zip_file.write(
- os.path.join(root, file),
- os.path.relpath(
- os.path.join(root, file), os.path.join(dir_to_add, "..")
- ),
- )
-
- # Clean tmp
- tmp.cleanup()
-
- return zip_path
-
-
def get_filename(file_path: AnyPathStrType, other_exts: Union[list, str] = None) -> str:
"""
.. deprecated:: 1.30.0
@@ -754,7 +300,7 @@ def copy(src: AnyPathStrType, dst: AnyPathStrType) -> AnyPathType:
src = AnyPath(src)
if path.is_cloud_path(src):
- out = src.download_to(dst)
+ out = s3.download(src, dst)
else:
out = None
try:
diff --git a/sertit/path.py b/sertit/path.py
index 48e9b90..30451e1 100644
--- a/sertit/path.py
+++ b/sertit/path.py
@@ -19,13 +19,10 @@
import logging
import os
import pprint
-import re
-import tarfile
import tempfile
-import zipfile
from typing import Any, Union
-from sertit import AnyPath, logs, s3
+from sertit import AnyPath
from sertit.logs import SU_NAME
from sertit.types import AnyPathStrType, AnyPathType
@@ -150,189 +147,6 @@ def real_rel_path(raw_path: AnyPathStrType, start: AnyPathStrType) -> AnyPathTyp
return rel_path
-def get_archived_file_list(archive_path: AnyPathStrType) -> list:
- """
- Get the list of all the files contained in an archive.
-
- Args:
- archive_path (AnyPathStrType): Archive path
-
- Returns:
- list: All files contained in the given archive
-
- Example:
- >>> arch_path = 'D:/path/to/zip.zip'
- >>> get_archived_file_list(arch_path, file_regex)
- ['file_1.txt', 'file_2.tif', 'file_3.xml', 'file_4.geojson']
- """
- archive_path = AnyPath(archive_path)
-
- is_zip = archive_path.suffix == ".zip"
- archive_fn = get_filename(archive_path)
- if is_zip:
- if is_cloud_path(archive_path):
- archive_path = s3.read(archive_path)
-
- with zipfile.ZipFile(archive_path) as zip_ds:
- file_list = [f.filename for f in zip_ds.filelist]
- else:
- try:
- if is_cloud_path(archive_path):
- args = {"fileobj": s3.read(archive_path), "mode": "r"}
- else:
- args = {"name": archive_path, "mode": "r"}
- with tarfile.open(**args) as tar_ds:
- tar_mb = tar_ds.getmembers()
- file_list = [mb.name for mb in tar_mb]
- except tarfile.ReadError as ex:
- raise tarfile.ReadError(f"Impossible to open archive: {archive_fn}") from ex
-
- return file_list
-
-
-def get_archived_path(
- archive_path: AnyPathStrType,
- regex: str,
- as_list: bool = False,
- case_sensitive: bool = False,
- file_list: list = None,
- **kwargs,
-) -> Union[list, AnyPathType]:
- """
- Get archived file path from inside the archive.
-
- .. WARNING::
- If :code:`as_list` is :code:`False`, it will only return the first file matched !
-
- You can use this `site `_ to build your regex.
-
- Args:
- archive_path (AnyPathStrType): Archive path
- regex (str): File regex (used by re) as it can be found in the getmembers() list
- as_list (bool): If true, returns a list (including all found files). If false, returns only the first match
- case_sensitive (bool): If true, the regex is case-sensitive.
- file_list (list): List of files to get archived from. Optional, if not given it will be re-computed.
-
- Returns:
- Union[list, str]: Path from inside the zipfile
-
- Example:
- >>> arch_path = 'D:/path/to/zip.zip'
- >>> file_regex = '.*dir.*file_name' # Use .* for any character
- >>> path = get_archived_path(arch_path, file_regex)
- 'dir/filename.tif'
- """
- if regex is None:
- logs.deprecation_warning(
- "'file_regex' is deprecated, please use 'regex' instead."
- )
- regex = kwargs.pop("file_regex")
-
- # Get file list
- archive_path = AnyPath(archive_path)
-
- # Offer the ability to give the file list directly, as this operation is expensive when done with large archives stored on the cloud
- if file_list is None:
- file_list = get_archived_file_list(archive_path)
-
- # Search for file
- re_rgx = re.compile(regex) if case_sensitive else re.compile(regex, re.IGNORECASE)
- archived_band_paths = list(filter(re_rgx.match, file_list))
- if not archived_band_paths:
- raise FileNotFoundError(
- f"Impossible to find file {regex} in {get_filename(archive_path)}"
- )
-
- # Convert to str if needed
- if not as_list:
- archived_band_paths = archived_band_paths[0]
-
- return archived_band_paths
-
-
-def get_archived_rio_path(
- archive_path: AnyPathStrType,
- regex: str,
- as_list: bool = False,
- file_list: list = None,
- **kwargs,
-) -> Union[list, AnyPathType]:
- """
- Get archived file path from inside the archive, to be read with rasterio:
-
- - :code:`zip+file://{zip_path}!{file_name}`
- - :code:`tar+file://{tar_path}!{file_name}`
-
-
- See `here `_
- for more information.
-
- .. WARNING::
- It wont be readable by pandas, geopandas or xmltree !
-
- .. WARNING::
- If :code:`as_list` is :code:`False`, it will only return the first file matched !
-
- You can use this `site `_ to build your regex.
-
- Args:
- archive_path (AnyPathStrType): Archive path
- regex (str): File regex (used by re) as it can be found in the getmembers() list
- as_list (bool): If true, returns a list (including all found files). If false, returns only the first match
- file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed.
-
- Returns:
- Union[list, str]: Band path that can be read by rasterio
-
- Example:
- >>> arch_path = 'D:/path/to/zip.zip'
- >>> file_regex = '.*dir.*file_name' # Use .* for any character
- >>> path = get_archived_tif_path(arch_path, file_regex)
- 'zip+file://D:/path/to/output.zip!dir/filename.tif'
- >>> rasterio.open(path)
-
- """
- if regex is None:
- logs.deprecation_warning(
- "'file_regex' is deprecated, please use 'regex' instead."
- )
- regex = kwargs.pop("file_regex")
-
- archive_path = AnyPath(archive_path)
- if archive_path.suffix in [".tar", ".zip"]:
- prefix = archive_path.suffix[-3:]
- elif archive_path.suffix == ".tar.gz":
- raise TypeError(
- ".tar.gz files are too slow to be read from inside the archive. Please extract them instead."
- )
- else:
- raise TypeError("Only .zip and .tar files can be read from inside its archive.")
-
- # Search for file
- archived_band_paths = get_archived_path(
- archive_path, regex=regex, as_list=True, file_list=file_list
- )
-
- # Convert to rio path
- if is_cloud_path(archive_path):
- archived_band_paths = [
- f"{prefix}+file+{archive_path}!{path}" for path in archived_band_paths
- ]
- else:
- # archived_band_paths = [
- # f"{prefix}+file://{archive_path}!{path}" for path in archived_band_paths
- # ]
- archived_band_paths = [
- f"/vsi{prefix}/{archive_path}/{path}" for path in archived_band_paths
- ]
-
- # Convert to str if needed
- if not as_list:
- archived_band_paths = archived_band_paths[0]
-
- return archived_band_paths
-
-
def get_filename(file_path: AnyPathStrType, other_exts: Union[list, str] = None) -> str:
"""
Get file name (without extension) from file path, ie:
diff --git a/sertit/vectors.py b/sertit/vectors.py
index 9b214b2..87856e9 100644
--- a/sertit/vectors.py
+++ b/sertit/vectors.py
@@ -23,9 +23,7 @@
import os
import re
import shutil
-import tarfile
import tempfile
-import zipfile
from collections.abc import Generator
from contextlib import contextmanager
from typing import Any, Union
@@ -36,7 +34,7 @@
from cloudpathlib.exceptions import AnyPathTypeError
from shapely import Polygon, wkt
-from sertit import AnyPath, files, geometry, logs, misc, path, strings
+from sertit import AnyPath, archives, files, geometry, logs, misc, path, s3, strings
from sertit.logs import SU_NAME
from sertit.types import AnyPathStrType, AnyPathType
@@ -256,8 +254,11 @@ def get_aoi_wkt(aoi_path: AnyPathStrType, as_str: bool = True) -> Union[str, Pol
if aoi_path.suffix == ".wkt":
try:
- with open(aoi_path) as aoi_f:
- aoi = wkt.load(aoi_f)
+ if path.is_cloud_path(aoi_path):
+ aoi = wkt.load(s3.read(aoi_path))
+ else:
+ with open(aoi_path) as aoi_f:
+ aoi = wkt.load(aoi_f)
except Exception as ex:
raise ValueError("AOI WKT cannot be read") from ex
else:
@@ -471,13 +472,17 @@ def read(
if "!" in str(vector_path):
split_vect = str(vector_path).split("!")
archive_regex = ".*{}".format(split_vect[1].replace(".", r"\."))
- vector_path = AnyPath(split_vect[0])
+ try:
+ vector_path = AnyPath(split_vect[0], **vector_path.storage_options)
+ except Exception:
+ # Cloudpathlib
+ vector_path = AnyPath(split_vect[0])
# Manage archive case
if vector_path.suffix in [".tar", ".zip"]:
prefix = vector_path.suffix[-3:]
file_list = kwargs.pop(
- "file_list", path.get_archived_file_list(vector_path)
+ "file_list", archives.get_archived_file_list(vector_path)
)
try:
@@ -710,16 +715,16 @@ def ogr2geojson(
# archived vector_path are extracted in a tmp folder so no need to be downloaded
if vector_path.suffix == ".zip":
- with zipfile.ZipFile(vector_path, "r") as zip_ds:
+ with archives.open_zipfile(vector_path, "r") as zip_ds:
vect_path = zip_ds.extract(arch_vect_path, out_dir)
elif vector_path.suffix == ".tar":
- with tarfile.open(vector_path, "r") as tar_ds:
+ with archives.open_tarfile(vector_path, "r") as tar_ds:
tar_ds.extract(arch_vect_path, out_dir)
vect_path = os.path.join(out_dir, arch_vect_path)
else:
# vector_path should be downloaded to work with 'ogr2ogr'
if path.is_cloud_path(vector_path):
- vector_path = AnyPath(vector_path).fspath
+ vector_path = s3.download(vector_path, out_dir)
vect_path = vector_path
vect_path_gj = os.path.join(
diff --git a/sertit/xml.py b/sertit/xml.py
index 9ddc44a..8d334ef 100644
--- a/sertit/xml.py
+++ b/sertit/xml.py
@@ -30,7 +30,7 @@
)
from lxml.html.builder import E
-from sertit import AnyPath, files, path
+from sertit import AnyPath, archives, logs, path, s3
from sertit.logs import SU_NAME
from sertit.misc import ListEnum
from sertit.types import AnyPathStrType
@@ -61,7 +61,7 @@ def read(xml_path: AnyPathStrType) -> _Element:
# Slower but works with:
# {ValueError}Unicode strings with encoding declaration are not supported.
# Please use bytes input or XML fragments without declaration.
- root = fromstring(xml_path.read_bytes())
+ root = fromstring(s3.read(xml_path).read())
else:
# pylint: disable=I1101:
# Module 'lxml.etree' has no 'parse' member, but source is unavailable.
@@ -75,7 +75,10 @@ def read(xml_path: AnyPathStrType) -> _Element:
def read_archive(
- path: AnyPathStrType, regex: str = None, file_list: list = None
+ archive_path: AnyPathStrType = None,
+ regex: str = None,
+ file_list: list = None,
+ **kwargs,
) -> _Element:
"""
Read an XML file from inside an archive (zip or tar)
@@ -87,25 +90,40 @@ def read_archive(
- path to the archive plus a regex looking inside the archive. Duplicate behaviour to :py:func:`files.read_archived_xml`
Args:
- path (AnyPathStrType): Path to the XML file, stored inside an archive or path to the archive itself
+ archive_path (AnyPathStrType): Path to the XML file, stored inside an archive or path to the archive itself
regex (str): Optional. If specified, the path should be the archive path and the regex should be the key to find the XML file inside the archive.
file_list (list): List of files contained in the archive. Optional, if not given it will be re-computed.
Returns:
_Element: XML Root
"""
+ if archive_path is None:
+ logs.deprecation_warning(
+ "'path' argument is deprecated, use 'archive_path' instead."
+ )
+ archive_path = kwargs.pop("path")
try:
if not regex:
- path, basename = str(path).split("!")
+ archive_base_path, basename = str(archive_path).split("!")
regex = basename
- if path.startswith("zip://") or path.startswith("tar://"):
- path = path[5:]
+ if archive_base_path.startswith("zip://") or archive_base_path.startswith(
+ "tar://"
+ ):
+ archive_base_path = archive_base_path[5:]
+
+ # For UPath
+ with contextlib.suppress(Exception):
+ archive_base_path = AnyPath(
+ archive_base_path, **archive_path.storage_options
+ )
+ else:
+ archive_base_path = archive_path
- return files.read_archived_xml(path, regex, file_list=file_list)
+ return archives.read_archived_xml(archive_base_path, regex, file_list=file_list)
except XMLSyntaxError as exc:
- raise ValueError(f"Invalid metadata XML for {path}!") from exc
+ raise ValueError(f"Invalid metadata XML for {archive_path}!") from exc
def write(xml: _Element, path: str) -> None: