Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Nov 15, 2023
1 parent fb37938 commit 04c42b8
Showing 2 changed files with 25 additions and 24 deletions.
22 changes: 12 additions & 10 deletions dags/common/utils.py
Original file line number Diff line number Diff line change
@@ -233,22 +233,24 @@ def process_zip_file_elsevier(file_bytes, file_name):
     file_bytes.seek(0)
     with zipfile.ZipFile(file_bytes) as zip:
         for filename in zip.namelist():
-            if "dataset.xml" in filename:
-                file_prefix = ".".join(file_name.split(".")[:-1])
-                zip_file_content = zip.read(filename)
-                s3_filename = os.path.join(file_prefix, filename)
-                return (zip_file_content, s3_filename)
+            if "dataset.xml" not in filename:
+                continue
+            file_prefix = ".".join(file_name.split(".")[:-1])
+            zip_file_content = zip.read(filename)
+            s3_filename = os.path.join(file_prefix, filename)
+            return (zip_file_content, s3_filename)


 def process_tar_file_elsevier(file_bytes, file_name):
     file_bytes.seek(0)
     with tarfile.open(fileobj=file_bytes, mode="r") as tar:
         for filename in tar.getnames():
-            if "dataset.xml" in filename:
-                file_prefix = ".".join(file_name.split(".")[:-1])
-                tar_file_content = tar.extractfile(filename).read()
-                s3_filename = os.path.join(file_prefix, filename)
-                return (tar_file_content, s3_filename)
+            if "dataset.xml" not in filename:
+                continue
+            file_prefix = ".".join(file_name.split(".")[:-1])
+            tar_file_content = tar.extractfile(filename).read()
+            s3_filename = os.path.join(file_prefix, filename)
+            return (tar_file_content, s3_filename)


 def process_archive_elsevier(file_bytes, file_name):
27 changes: 13 additions & 14 deletions dags/elsevier/trigger_file_processing.py
Original file line number Diff line number Diff line change
@@ -44,18 +44,17 @@ def trigger_file_processing_elsevier(
         file_bytes_extracted = repo.get_by_id(
             os.path.join(*["extracted", parsed_article["files"]["xml"]])
         )
-        for article in article_splitter_function(file_bytes_extracted):
-            _id = _generate_id(publisher)
-            encoded_article = base64.b64encode(article.getvalue()).decode()
-            trigger_dag.trigger_dag(
-                dag_id=f"{publisher}_process_file",
-                run_id=_id,
-                conf={
-                    "file": encoded_article,
-                    "file_name": full_file_path,
-                    "metadata": parsed_article,
-                },
-                replace_microseconds=False,
-            )
-            files.append(full_file_path)
+        _id = _generate_id(publisher)
+        encoded_article = base64.b64encode(file_bytes_extracted.getvalue()).decode()
+        trigger_dag.trigger_dag(
+            dag_id=f"{publisher}_process_file",
+            run_id=_id,
+            conf={
+                "file": encoded_article,
+                "file_name": full_file_path,
+                "metadata": parsed_article,
+            },
+            replace_microseconds=False,
+        )
+        files.append(full_file_path)
     return files

0 comments on commit 04c42b8

Please sign in to comment.