workflows: add normalizing collaborations to data harvest
DonHaul committed Jan 8, 2025
1 parent 9e78629 commit 62a935d
Showing 2 changed files with 52 additions and 1 deletion.
52 changes: 51 additions & 1 deletion workflows/dags/data/data_harvest.py
@@ -5,9 +5,13 @@
from airflow.decorators import dag, task, task_group
from airflow.models import Variable
from hooks.generic_http_hook import GenericHttpHook
from hooks.inspirehep.inspire_http_hook import (
InspireHttpHook,
)
from hooks.inspirehep.inspire_http_record_management_hook import (
InspireHTTPRecordManagementHook,
)
from inspire_utils.dedupers import dedupe_list
from tenacity import RetryError

logger = logging.getLogger(__name__)
@@ -27,10 +31,12 @@ def data_harvest_dag():
1. collect_ids: Obtains all new data ids to process.
2. download_record_versions: fetches a data record and all its previous versions
3. build_record: Build a record that is compatible with the INSPIRE data schema
4. load_record: Creates or Updates the record on INSPIRE.
4. normalize_collaborations: Normalize the collaborations in the record.
5. load_record: Creates or Updates the record on INSPIRE.
"""
generic_http_hook = GenericHttpHook(http_conn_id="hepdata_connection")
inspire_http_record_management_hook = InspireHTTPRecordManagementHook()
inspire_http_hook = InspireHttpHook()

data_schema = Variable.get("data_schema")
url = inspire_http_record_management_hook.get_url()
@@ -176,6 +182,49 @@ def add_keywords(record, builder):
data["$schema"] = data_schema
return data

@task
def normalize_collaborations(record):
"""Normalize the collaborations in the record.
Args: record (dict): The record to normalize.
Returns: dict: The normalized record.
"""

collaborations = record.get("collaborations", [])

if not collaborations:
return record

payload = {
"collaborations": collaborations,
"workflow_id": record.id,
}

# "accept": "application/json",
# "content-type": "application/json",
# "Authorization": "Bearer {token}".format(
# token=current_app.config["AUTHENTICATION_TOKEN"]
# ),

response = inspire_http_hook.call_api(
endpoint="/curation/literature/collaborations-normalization",
method="GET",
params=payload,
)
response.raise_for_status()

response_json = response.json()
obj_accelerator_experiments = record.get("accelerator_experiments", [])
normalized_accelerator_experiments = response_json["accelerator_experiments"]

if normalized_accelerator_experiments or obj_accelerator_experiments:
    record["accelerator_experiments"] = dedupe_list(
        obj_accelerator_experiments + normalized_accelerator_experiments
    )
record["collaborations"] = response_json["normalized_collaborations"]
return record

@task
def load_record(new_record):
"""Load the record to inspirehep.
@@ -210,6 +259,7 @@ def load_record(new_record):
record = build_record(
data_schema=data_schema, inspire_url=url, payload=hepdata_record_versions
)
record = normalize_collaborations(record)
load_record(record)

process_record.expand(record_id=collect_ids())
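
For context, a minimal sketch of what the new task does to a record. The record and response_json values below are hypothetical; the response shape is inferred from the keys the task reads (normalized_collaborations, accelerator_experiments), since the actual contract of /curation/literature/collaborations-normalization is not part of this diff:

from inspire_utils.dedupers import dedupe_list

# Hypothetical record and endpoint response, for illustration only.
record = {
    "collaborations": [{"value": "CMS Collaboration"}],
    "accelerator_experiments": [{"legacy_name": "CERN-LHC-CMS"}],
}
response_json = {
    "normalized_collaborations": [{"value": "CMS"}],
    "accelerator_experiments": [{"legacy_name": "CERN-LHC-CMS"}],
}

# Same merge as in normalize_collaborations: concatenate both
# accelerator_experiments lists, drop duplicates, and replace the
# collaborations with their normalized form.
merged = (
    record.get("accelerator_experiments", [])
    + response_json["accelerator_experiments"]
)
record["accelerator_experiments"] = dedupe_list(merged)  # duplicate kept once
record["collaborations"] = response_json["normalized_collaborations"]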
1 change: 1 addition & 0 deletions workflows/requirements.txt
@@ -1 +1,2 @@
apache-airflow==2.9.3
inspire-utils~=3.0.63
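
The new dependency is pulled in for dedupe_list alone. A quick sketch of its behavior, assuming the order-preserving, equality-based implementation in inspire_utils.dedupers:

from inspire_utils.dedupers import dedupe_list

# Keeps the first occurrence of each item and preserves order; comparison
# is by equality, so unhashable items such as dicts are supported.
assert dedupe_list([1, 2, 1, 3]) == [1, 2, 3]
assert dedupe_list([{"a": 1}, {"a": 1}]) == [{"a": 1}]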
