From 6466cbf058e8e160e819fdaeda96c9b637eda712 Mon Sep 17 00:00:00 2001
From: ErnestaP
Date: Tue, 23 Jul 2024 13:21:52 +0200
Subject: [PATCH] Elsevier: reprocess tar file

---
 dags/common/pull_ftp.py                            | 46 ++++++---
 .../elsevier/test_elsevier_dag_pull_sftp.py        | 44 ++++++++-
 tests/units/common/test_common.py                  | 93 ++++++++++++++++++-
 3 files changed, 169 insertions(+), 14 deletions(-)

diff --git a/dags/common/pull_ftp.py b/dags/common/pull_ftp.py
index 08b7c16c..bd5e9a3e 100644
--- a/dags/common/pull_ftp.py
+++ b/dags/common/pull_ftp.py
@@ -83,7 +83,7 @@ def reprocess_files(repo, logger, **kwargs):
     logger.msg("Processing specified filenames.")
     filenames_pull_params = kwargs["params"]["filenames_pull"]
     filenames = filenames_pull_params["filenames"]
-    return _find_files_in_zip(filenames, repo)
+    return _find_files_in_extracted_dir(filenames, repo, logger)
 
 
 def _force_pull(
@@ -110,20 +110,44 @@ def _filenames_pull(
     return migrate_files(filenames, s_ftp, repo, logger)
 
 
-def _find_files_in_zip(filenames, repo):
+def _find_files_in_tar(tar_filename, repo):
     extracted_filenames = []
-    for zipped_filename in filenames:
-        zipped_file = repo.get_by_id(f"raw/{zipped_filename}")
-        with zipfile.ZipFile(zipped_file) as zip:
-            for zip_filename in zip.namelist():
-                if repo.is_meta(zip_filename):
-                    filename_without_extension = zipped_filename.split(".")[0]
-                    extracted_filenames.append(
-                        f"extracted/{filename_without_extension}/{zip_filename}"
-                    )
+    tar_file = repo.get_by_id(f"raw/{tar_filename}")
+    with tarfile.open(fileobj=tar_file, mode="r") as tar:
+        for member_name in tar.getnames():
+            if repo.is_meta(member_name):
+                filename_without_extension = tar_filename.split(".")[0]
+                extracted_filenames.append(
+                    f"extracted/{filename_without_extension}/{member_name}"
+                )
     return extracted_filenames
 
 
+def _find_files_in_zip(zipped_filename, repo):
+    extracted_filenames = []
+    zipped_file = repo.get_by_id(f"raw/{zipped_filename}")
+    with zipfile.ZipFile(zipped_file) as zip_archive:
+        for zip_filename in zip_archive.namelist():
+            if repo.is_meta(zip_filename):
+                filename_without_extension = zipped_filename.split(".")[0]
+                extracted_filenames.append(
+                    f"extracted/{filename_without_extension}/{zip_filename}"
+                )
+    return extracted_filenames
+
+
+def _find_files_in_extracted_dir(filenames, repo, logger):
+    all_found_files = []
+    for filename in filenames:
+        if filename.endswith(".tar"):
+            all_found_files.extend(_find_files_in_tar(filename, repo))
+        elif filename.endswith(".zip"):
+            all_found_files.extend(_find_files_in_zip(filename, repo))
+        else:
+            logger.msg("This file will not be processed. File is not an archive", filename=filename)
+    return all_found_files
+
+
 def _differential_pull(
     s_ftp,
     repo,
diff --git a/tests/integration/elsevier/test_elsevier_dag_pull_sftp.py b/tests/integration/elsevier/test_elsevier_dag_pull_sftp.py
index 67825c11..11674b3f 100644
--- a/tests/integration/elsevier/test_elsevier_dag_pull_sftp.py
+++ b/tests/integration/elsevier/test_elsevier_dag_pull_sftp.py
@@ -1,5 +1,6 @@
 from airflow.models import DagBag
-from common.pull_ftp import migrate_from_ftp
+
+from common.pull_ftp import migrate_from_ftp, reprocess_files
 from elsevier.repository import ElsevierRepository
 from elsevier.sftp_service import ElsevierSFTPService
 from pytest import fixture
@@ -52,3 +53,44 @@ def test_dag_migrate_from_FTP(elsevier_empty_repo):
         "CERNQ000000010669A.tar",
         "vtex00403986_a-2b_CLEANED.zip",
     ]
+
+
+def test_dag_migrate_from_FTP_specific_folder(elsevier_empty_repo):
+    assert len(elsevier_empty_repo.find_all()) == 0
+    with ElsevierSFTPService() as ftp:
+        migrate_from_ftp(
+            ftp,
+            elsevier_empty_repo,
+            get_logger().bind(class_name="test_logger"),
+            publisher="elsevier",
+            **{
+                "params": {
+                    "excluded_directories": [],
+                    "force_pull": False,
+                    "filenames_pull": {
+                        "enabled": True,
+                        "filenames": ["CERNQ000000010669A.tar"],
+                        "force_from_ftp": True,
+                    },
+                }
+            },
+        )
+    assert elsevier_empty_repo.get_all_raw_filenames() == ["CERNQ000000010669A.tar"]
+
+    reprocess_files(
+        elsevier_empty_repo,
+        get_logger().bind(class_name="test_logger"),
+        **{
+            "params": {
+                "excluded_directories": [],
+                "force_pull": False,
+                "filenames_pull": {
+                    "enabled": True,
+                    "filenames": ["CERNQ000000010669A.tar"],
+                    "force_from_ftp": False,
+                },
+            }
+        },
+    )
+    assert elsevier_empty_repo.get_all_raw_filenames() == ["CERNQ000000010669A.tar"]
+    assert elsevier_empty_repo.find_all() == [{'pdf': 'extracted/CERNQ000000010669A/CERNQ000000010669/S0370269323005105/main.pdf', 'xml': 'extracted/CERNQ000000010669A/CERNQ000000010669/S0370269323005105/main.xml'}, {'xml': 'extracted/CERNQ000000010669A/CERNQ000000010669/dataset.xml'}]
diff --git a/tests/units/common/test_common.py b/tests/units/common/test_common.py
index 326cb6c6..ed8cf6d8 100644
--- a/tests/units/common/test_common.py
+++ b/tests/units/common/test_common.py
@@ -1,7 +1,7 @@
 import zipfile
 from io import BytesIO
 from typing import List
-from unittest.mock import Mock, patch
+from unittest.mock import Mock, MagicMock, patch
 
 import pytest
 from common.pull_ftp import migrate_from_ftp, reprocess_files, trigger_file_processing
@@ -14,13 +14,26 @@
     "file1.zip",
     "file2.zip",
 ]
-SFTP_LIST_FILES_RETURN_VALUE: List[str] = SFTP_NOT_ZIP_FILES + SFTP_ZIP_FILES
+SFTP_TAR_FILES: List[str] = [
+    "file1.tar",
+]
+SFTP_LIST_FILES_RETURN_VALUE: List[str] = SFTP_NOT_ZIP_FILES + SFTP_ZIP_FILES + SFTP_TAR_FILES
 
 REPO_FIND_ALL_RETURN_VALUE: List[dict] = [
     {"xml": f, "pdf": f} for f in SFTP_LIST_FILES_RETURN_VALUE
 ]
 
 
+@pytest.fixture
+def tar_fixture():
+    with patch("tarfile.open", autospec=True) as tar_patch:
+        mock_tararchive = MagicMock()
+        mock_tararchive.getnames.return_value = SFTP_TAR_FILES
+        mock_tararchive.extractfile.return_value.read.return_value = BytesIO(b"file content").read()
+        tar_patch.return_value.__enter__.return_value = mock_tararchive
+        yield tar_patch
+
+
 @pytest.fixture
 def zip_fixture():
     with patch("zipfile.ZipFile", autospec=True) as zip_patch:
@@ -227,6 +240,82 @@ def test_migrate_from_ftp_specified_file(
     assert repo_is_meta.call_count == 2
 
 
+@patch.object(SFTPService, attribute="__init__", return_value=None)
+@patch.object(
+    SFTPService, attribute="list_files", return_value=SFTP_LIST_FILES_RETURN_VALUE
+)
+@patch.object(IRepository, attribute="get_by_id")
+@patch.object(IRepository, attribute="is_meta")
+@patch.object(IRepository, attribute="get_all_raw_filenames")
+@patch.object(IRepository, attribute="save")
+def test_migrate_from_ftp_specified_file_tar(
+    repo_save,
+    repo_get_all,
+    repo_is_meta,
+    repo_find_by_id,
+    sftp_list_files,
+    ftp_init,
+    ftp_get_file_fixture,
+    tar_fixture,
+):
+    repo_get_all.return_value = SFTP_ZIP_FILES[0:-1]
+    repo = IRepository()
+    reprocess_files(
+        repo,
+        get_logger().bind(class_name="test_logger"),
+        **{
+            "params": {
+                "force_pull": False,
+                "excluded_directories": [],
+                "filenames_pull": {
+                    "enabled": True,
+                    "filenames": ["file1.tar"],
+                    "force_from_ftp": False,
+                },
+            }
+        }
+    )
+    assert repo_save.call_count == 0
+    assert repo_find_by_id.call_count == 1
+    assert repo_is_meta.call_count == 1
+
+
+@patch.object(
+    SFTPService, attribute="list_files", return_value=SFTP_LIST_FILES_RETURN_VALUE
+)
+@patch.object(IRepository, attribute="is_meta")
+@patch.object(IRepository, attribute="get_all_raw_filenames")
+@patch.object(IRepository, attribute="save")
+def test_migrate_from_ftp_specified_file_force_from_ftp(
+    repo_save,
+    repo_get_all,
+    repo_is_meta,
+    sftp_list_files,
+    ftp_get_file_fixture,
+    tar_fixture,
+):
+    repo_get_all.return_value = SFTP_ZIP_FILES[0:-1]
+    with SFTPService() as sftp:
+        repo = IRepository()
+        migrate_from_ftp(
+            sftp,
+            repo,
+            get_logger().bind(class_name="test_logger"),
+            **{
+                "params": {
+                    "force_pull": False,
+                    "excluded_directories": [],
+                    "filenames_pull": {
+                        "enabled": True,
+                        "filenames": ["file1.tar"],
+                        "force_from_ftp": True,
+                    },
+                }
+            }
+        )
+    assert repo_save.call_count == 2
+
+
 @patch("common.pull_ftp.trigger_dag.trigger_dag")
 @patch.object(IRepository, attribute="get_by_id", return_value=BytesIO())
 @patch.object(