Skip to content

Commit

Permalink
Elsevier: reprocess tar file
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Jul 23, 2024
1 parent 8e7ee7e commit 6466cbf
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 14 deletions.
46 changes: 35 additions & 11 deletions dags/common/pull_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def reprocess_files(repo, logger, **kwargs):
logger.msg("Processing specified filenames.")
filenames_pull_params = kwargs["params"]["filenames_pull"]
filenames = filenames_pull_params["filenames"]
return _find_files_in_zip(filenames, repo)
return _find_files_in_extracted_dir(filenames, repo, logger)


def _force_pull(
Expand All @@ -110,20 +110,44 @@ def _filenames_pull(
return migrate_files(filenames, s_ftp, repo, logger)


def _find_files_in_zip(filenames, repo):
def _find_files_in_tar(tar_filename, repo):
extracted_filenames = []
for zipped_filename in filenames:
zipped_file = repo.get_by_id(f"raw/{zipped_filename}")
with zipfile.ZipFile(zipped_file) as zip:
for zip_filename in zip.namelist():
if repo.is_meta(zip_filename):
filename_without_extension = zipped_filename.split(".")[0]
extracted_filenames.append(
f"extracted/{filename_without_extension}/{zip_filename}"
)
tar_file = repo.get_by_id(f"raw/{tar_filename}")
with tarfile.open(fileobj=tar_file, mode="r") as tar:
for tar_filename in tar.getnames():
if repo.is_meta(tar_filename):
filename_without_extension = tar_filename.split(".")[0]
extracted_filenames.append(
f"extracted/{filename_without_extension}/{tar_filename}"
)
return extracted_filenames


def _find_files_in_zip(zipped_filename, repo):
extracted_filenames = []
zipped_file = repo.get_by_id(f"raw/{zipped_filename}")
with zipfile.ZipFile(zipped_file) as zip:
for zip_filename in zip.namelist():
if repo.is_meta(zip_filename):
filename_without_extension = zipped_filename.split(".")[0]
extracted_filenames.append(
f"extracted/{filename_without_extension}/{zip_filename}"
)
return extracted_filenames


def _find_files_in_extracted_dir(filenames, repo, logger):
all_found_files = []
for filename in filenames:
if filename.endswith(".tar"):
all_found_files.extend(_find_files_in_tar(filename, repo))
elif filename.endswith(".zip"):
all_found_files.extend(_find_files_in_zip(filename, repo))
else:
logger.msg("This file will be not processed. File is not an archive", filename=filename)
return all_found_files


def _differential_pull(
s_ftp,
repo,
Expand Down
44 changes: 43 additions & 1 deletion tests/integration/elsevier/test_elsevier_dag_pull_sftp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from airflow.models import DagBag
from common.pull_ftp import migrate_from_ftp

from common.pull_ftp import migrate_from_ftp, reprocess_files
from elsevier.repository import ElsevierRepository
from elsevier.sftp_service import ElsevierSFTPService
from pytest import fixture
Expand Down Expand Up @@ -52,3 +53,44 @@ def test_dag_migrate_from_FTP(elsevier_empty_repo):
"CERNQ000000010669A.tar",
"vtex00403986_a-2b_CLEANED.zip",
]


def test_dag_migrate_from_FTP_specific_folder(elsevier_empty_repo):
    """Pull one tar by name from the SFTP server, then reprocess it from the repo.

    After both steps the repo must still hold exactly that raw tar, and
    ``find_all`` must list the article pdf/xml pair and the dataset xml.
    """
    archive = "CERNQ000000010669A.tar"

    def pull_params(force_from_ftp):
        # Both calls share the same DAG params except for ``force_from_ftp``.
        return {
            "excluded_directories": [],
            "force_pull": False,
            "filenames_pull": {
                "enabled": True,
                "filenames": [archive],
                "force_from_ftp": force_from_ftp,
            },
        }

    assert len(elsevier_empty_repo.find_all()) == 0
    with ElsevierSFTPService() as ftp:
        migrate_from_ftp(
            ftp,
            elsevier_empty_repo,
            get_logger().bind(class_name="test_logger"),
            publisher="elsevier",
            params=pull_params(True),
        )
        assert elsevier_empty_repo.get_all_raw_filenames() == [archive]

    reprocess_files(
        elsevier_empty_repo,
        get_logger().bind(class_name="test_logger"),
        params=pull_params(False),
    )
    assert elsevier_empty_repo.get_all_raw_filenames() == [archive]

    extracted_root = "extracted/CERNQ000000010669A/CERNQ000000010669"
    assert elsevier_empty_repo.find_all() == [
        {
            "pdf": f"{extracted_root}/S0370269323005105/main.pdf",
            "xml": f"{extracted_root}/S0370269323005105/main.xml",
        },
        {"xml": f"{extracted_root}/dataset.xml"},
    ]
93 changes: 91 additions & 2 deletions tests/units/common/test_common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import zipfile
from io import BytesIO
from typing import List
from unittest.mock import Mock, patch
from unittest.mock import Mock, MagicMock, patch

import pytest
from common.pull_ftp import migrate_from_ftp, reprocess_files, trigger_file_processing
Expand All @@ -14,13 +14,26 @@
"file1.zip",
"file2.zip",
]
SFTP_LIST_FILES_RETURN_VALUE: List[str] = SFTP_NOT_ZIP_FILES + SFTP_ZIP_FILES
SFTP_TAR_FILES: List[str] = [
"file1.tar",
]
SFTP_LIST_FILES_RETURN_VALUE: List[str] = SFTP_NOT_ZIP_FILES + SFTP_ZIP_FILES + SFTP_TAR_FILES

REPO_FIND_ALL_RETURN_VALUE: List[dict] = [
{"xml": f, "pdf": f} for f in SFTP_LIST_FILES_RETURN_VALUE
]


@pytest.fixture
def tar_fixture():
    """Patch ``tarfile.open`` so its context manager yields a mock archive.

    The mock archive lists ``SFTP_TAR_FILES`` as its member names, and any
    extracted member reads back as ``b"file content"``. The patch object is
    yielded to the test.
    """
    archive = MagicMock()
    archive.getnames.return_value = SFTP_TAR_FILES
    archive.extractfile.return_value.read.return_value = b"file content"
    with patch("tarfile.open", autospec=True) as tar_patch:
        tar_patch.return_value.__enter__.return_value = archive
        yield tar_patch


@pytest.fixture
def zip_fixture():
with patch("zipfile.ZipFile", autospec=True) as zip_patch:
Expand Down Expand Up @@ -227,6 +240,82 @@ def test_migrate_from_ftp_specified_file(
assert repo_is_meta.call_count == 2


@patch.object(SFTPService, attribute="__init__", return_value=None)
@patch.object(
    SFTPService, attribute="list_files", return_value=SFTP_LIST_FILES_RETURN_VALUE
)
@patch.object(IRepository, attribute="get_by_id")
@patch.object(IRepository, attribute="is_meta")
@patch.object(IRepository, attribute="get_all_raw_filenames")
@patch.object(IRepository, attribute="save")
def test_migrate_from_ftp_specified_file_tar(
    repo_save,
    repo_get_all,
    repo_is_meta,
    repo_find_by_id,
    sftp_list_files,
    ftp_init,
    ftp_get_file_fixture,
    tar_fixture,
):
    """Reprocessing a named tar saves nothing; it calls get_by_id and is_meta once each."""
    repo_get_all.return_value = SFTP_ZIP_FILES[0:-1]
    params = {
        "force_pull": False,
        "excluded_directories": [],
        "filenames_pull": {
            "enabled": True,
            "filenames": ["file1.tar"],
            "force_from_ftp": False,
        },
    }

    reprocess_files(
        IRepository(),
        get_logger().bind(class_name="test_logger"),
        params=params,
    )

    assert repo_save.call_count == 0
    assert repo_find_by_id.call_count == 1
    assert repo_is_meta.call_count == 1


@patch.object(
    SFTPService, attribute="list_files", return_value=SFTP_LIST_FILES_RETURN_VALUE
)
@patch.object(IRepository, attribute="is_meta")
@patch.object(IRepository, attribute="get_all_raw_filenames")
@patch.object(IRepository, attribute="save")
def test_migrate_from_ftp_specified_file_force_from_ftp(
    repo_save,
    repo_get_all,
    repo_is_meta,
    sftp_list_files,
    ftp_get_file_fixture,
    tar_fixture,
):
    """Pulling a named tar with ``force_from_ftp`` enabled calls ``repo.save`` twice."""
    repo_get_all.return_value = SFTP_ZIP_FILES[0:-1]
    params = {
        "force_pull": False,
        "excluded_directories": [],
        "filenames_pull": {
            "enabled": True,
            "filenames": ["file1.tar"],
            "force_from_ftp": True,
        },
    }

    with SFTPService() as sftp:
        migrate_from_ftp(
            sftp,
            IRepository(),
            get_logger().bind(class_name="test_logger"),
            params=params,
        )
        assert repo_save.call_count == 2


@patch("common.pull_ftp.trigger_dag.trigger_dag")
@patch.object(IRepository, attribute="get_by_id", return_value=BytesIO())
@patch.object(
Expand Down

0 comments on commit 6466cbf

Please sign in to comment.