Skip to content

Commit

Permalink
workflows: add normalizing collaborations to data harvest
Browse files Browse the repository at this point in the history
  • Loading branch information
DonHaul committed Jan 9, 2025
1 parent 9e78629 commit 4354531
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 1 deletion.
44 changes: 43 additions & 1 deletion workflows/dags/data/data_harvest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from airflow.decorators import dag, task, task_group
from airflow.models import Variable
from hooks.generic_http_hook import GenericHttpHook
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from hooks.inspirehep.inspire_http_record_management_hook import (
InspireHTTPRecordManagementHook,
)
from inspire_utils.dedupers import dedupe_list
from tenacity import RetryError

logger = logging.getLogger(__name__)
Expand All @@ -27,10 +29,12 @@ def data_harvest_dag():
1. collect_ids: Obtains all new data ids to process.
2. download_record_versions: fetches a data record and all its previous versions
3. build_record: Build a record that is compatible with the INSPIRE data schema
4. load_record: Creates or Updates the record on INSPIRE.
4. normalize_collaborations: Normalize the collaborations in the record.
5. load_record: Creates or Updates the record on INSPIRE.
"""
generic_http_hook = GenericHttpHook(http_conn_id="hepdata_connection")
inspire_http_record_management_hook = InspireHTTPRecordManagementHook()
inspire_http_hook = InspireHttpHook()

data_schema = Variable.get("data_schema")
url = inspire_http_record_management_hook.get_url()
Expand Down Expand Up @@ -176,6 +180,43 @@ def add_keywords(record, builder):
data["$schema"] = data_schema
return data

@task
def normalize_collaborations(record):
"""Normalize the collaborations in the record.
Args: record (dict): The record to normalize.
Returns: dict: The normalized record.
"""

collaborations = record.get("collaborations", [])

if not collaborations:
return record

payload = {
"collaborations": collaborations,
"workflow_id": record["acquisition_source"]["submission_number"],
}

response = inspire_http_hook.call_api(
endpoint="api/curation/literature/collaborations-normalization",
method="GET",
data=payload,
)
response.raise_for_status()
obj_accelerator_experiments = record.get("accelerator_experiments", [])
normalized_accelerator_experiments = response.json()[
"accelerator_experiments"
]

if normalized_accelerator_experiments or obj_accelerator_experiments:
record["accelerator_experiments"] = dedupe_list(
obj_accelerator_experiments + normalized_accelerator_experiments
)
record["collaborations"] = response.json()["normalized_collaborations"]

return record

@task
def load_record(new_record):
"""Load the record to inspirehep.
Expand Down Expand Up @@ -210,6 +251,7 @@ def load_record(new_record):
record = build_record(
data_schema=data_schema, inspire_url=url, payload=hepdata_record_versions
)
record = normalize_collaborations(record)
load_record(record)

process_record.expand(record_id=collect_ids())
Expand Down
1 change: 1 addition & 0 deletions workflows/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
apache-airflow==2.9.3
inspire-utils~=3.0.63
15 changes: 15 additions & 0 deletions workflows/tests/test_data_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,21 @@ def test_load_record_put(self):
json_response = task.execute(context=self.context)
assert json_response

def test_normalize_collaborations(self):
# test what is returned if collaborations are initally empty
record = {
"collaborations": [{"value": "ETM"}],
"acquisition_source": {"submission_number": "123"},
}
task = self.dag.get_task("process_record.normalize_collaborations")
task.op_args = (record,)
json_response = task.execute(context=self.context)

assert "record" in json_response["collaborations"][0]
assert (
json_response["accelerator_experiments"][0]["legacy_name"] == "LATTICE-ETM"
)

@pytest.mark.vcr
def test_load_record_post(self):
record = {
Expand Down

0 comments on commit 4354531

Please sign in to comment.