Skip to content

Commit

Permalink
workflows: fix data harvesting
Browse files Browse the repository at this point in the history
- correctly format observables
- add date parameter to dag
- fetch collection ids relative to first run

* ref: cern-sis/issues-inspire/issues/633
  • Loading branch information
DonHaul committed Dec 20, 2024
1 parent 9e78629 commit 2e952a4
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
16 changes: 10 additions & 6 deletions workflows/dags/data/data_harvest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import datetime
import logging
from datetime import timedelta

from airflow.decorators import dag, task, task_group
from airflow.macros import ds_add
from airflow.models import Variable
from airflow.models.param import Param
from hooks.generic_http_hook import GenericHttpHook
from hooks.inspirehep.inspire_http_record_management_hook import (
InspireHTTPRecordManagementHook,
Expand All @@ -19,6 +20,7 @@
catchup=False,
tags=["data"],
max_active_runs=5,
params={"last_updated": Param(type=["null", "string"], default="")},
)
def data_harvest_dag():
"""Defines the DAG for the HEPData harvest workflow.
Expand All @@ -36,14 +38,15 @@ def data_harvest_dag():
url = inspire_http_record_management_hook.get_url()

@task(task_id="collect_ids")
def collect_ids():
def collect_ids(**context):
"""Collects the ids of the records that have been updated in the last two days.
Returns: list of ids
"""

from_date = (datetime.datetime.now().date() - timedelta(days=1)).strftime(
"%Y-%m-%d"
from_date = (
context["params"]["last_updated"]
if context["params"]["last_updated"]
else ds_add(context["ds"], -1)
)
payload = {"inspire_ids": True, "last_updated": from_date, "sort_by": "latest"}
hepdata_response = generic_http_hook.call_api(
Expand Down Expand Up @@ -115,7 +118,8 @@ def add_keywords(record, builder):
f"{keyword}: {item[0]['lte']}-{item[0]['gte']}"
)
elif keyword == "observables":
builder.add_keyword(f"{keyword}: {','.join(item)}")
for value in item:
builder.add_keyword(f"observables: {value}")
else:
for value in item:
builder.add_keyword(value)
Expand Down
2 changes: 1 addition & 1 deletion workflows/tests/data_records/ins1906174_version3.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions workflows/tests/test_data_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def test_build_record(self):

assert res["keywords"][0]["value"] == "cmenergies: 13000.0-13000.0"
assert res["keywords"][1]["value"] == "observables: m_MMC"
assert res["keywords"][1]["value"] == "observables: e=mc2"

assert (
res["urls"][0]["value"] == payload["base"]["record"]["resources"][0]["url"]
Expand Down

0 comments on commit 2e952a4

Please sign in to comment.