Skip to content

Commit

Permalink
workflows: fix data harvesting
Browse files Browse the repository at this point in the history
- correctly format observables
- add date parameter to dag
- fetch collection ids relative to the logical date

* ref: cern-sis/issues-inspire/issues/633
  • Loading branch information
DonHaul committed Dec 20, 2024
1 parent 9e78629 commit 33a1957
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 10 deletions.
16 changes: 10 additions & 6 deletions workflows/dags/data/data_harvest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import datetime
import logging
from datetime import timedelta

from airflow.decorators import dag, task, task_group
from airflow.macros import ds_add
from airflow.models import Variable
from airflow.models.param import Param
from hooks.generic_http_hook import GenericHttpHook
from hooks.inspirehep.inspire_http_record_management_hook import (
InspireHTTPRecordManagementHook,
Expand All @@ -19,6 +20,7 @@
catchup=False,
tags=["data"],
max_active_runs=5,
params={"last_updated": Param(type=["null", "string"], default="")},
)
def data_harvest_dag():
"""Defines the DAG for the HEPData harvest workflow.
Expand All @@ -36,14 +38,15 @@ def data_harvest_dag():
url = inspire_http_record_management_hook.get_url()

@task(task_id="collect_ids")
def collect_ids():
def collect_ids(**context):
        """Collects the ids of the records updated since ``last_updated`` (or, if unset, the day before the logical date).
Returns: list of ids
"""

from_date = (datetime.datetime.now().date() - timedelta(days=1)).strftime(
"%Y-%m-%d"
from_date = (
context["params"]["last_updated"]
if context["params"]["last_updated"]
else ds_add(context["ds"], -1)
)
payload = {"inspire_ids": True, "last_updated": from_date, "sort_by": "latest"}
hepdata_response = generic_http_hook.call_api(
Expand Down Expand Up @@ -115,7 +118,8 @@ def add_keywords(record, builder):
f"{keyword}: {item[0]['lte']}-{item[0]['gte']}"
)
elif keyword == "observables":
builder.add_keyword(f"{keyword}: {','.join(item)}")
for value in item:
builder.add_keyword(f"observables: {value}")
else:
for value in item:
builder.add_keyword(value)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
interactions:
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
method: GET
uri: https://www.hepdata.net/search/ids?inspire_ids=True&last_updated=2024-12-15&sort_by=latest
response:
body:
string: '[2693068,2807749,2809112]
'
headers:
access-control-allow-origin:
- '*'
alt-svc:
- h3=":443";ma=60;
content-length:
- '26'
content-type:
- application/json
date:
- Fri, 20 Dec 2024 13:31:10 GMT
permissions-policy:
- interest-cohort=()
referrer-policy:
- strict-origin-when-cross-origin
retry-after:
- '60'
server:
- gunicorn
set-cookie:
- session=90cb32b42632028e_6765719e.Nlm50tE_qaIh5AoWIDrxXj38HSg; Domain=.www.hepdata.net;
Expires=Sat, 21 Dec 2024 01:31:10 GMT; HttpOnly; Path=/; SameSite=Lax
transfer-encoding:
- chunked
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- sameorigin
x-proxy-backend:
- hepdata-prod_hepdata-web_http
x-ratelimit-limit:
- '60'
x-ratelimit-remaining:
- '59'
x-ratelimit-reset:
- '1734701531'
x-xss-protection:
- 1; mode=block
status:
code: 200
message: OK
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
interactions:
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
method: GET
uri: https://www.hepdata.net/search/ids?inspire_ids=True&last_updated=2024-12-15&sort_by=latest
response:
body:
string: '[2693068,2807749,2809112]
'
headers:
access-control-allow-origin:
- '*'
alt-svc:
- h3=":443";ma=60;
content-length:
- '26'
content-type:
- application/json
date:
- Fri, 20 Dec 2024 13:27:17 GMT
permissions-policy:
- interest-cohort=()
referrer-policy:
- strict-origin-when-cross-origin
retry-after:
- '29'
server:
- gunicorn
set-cookie:
- session=350679c09a0fcb67_676570b5.rCaGWwHUs2OzQ84fS-iSUDafDik; Domain=.www.hepdata.net;
Expires=Sat, 21 Dec 2024 01:27:17 GMT; HttpOnly; Path=/; SameSite=Lax
transfer-encoding:
- chunked
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- sameorigin
x-proxy-backend:
- hepdata-prod_hepdata-web_http
x-ratelimit-limit:
- '60'
x-ratelimit-remaining:
- '58'
x-ratelimit-reset:
- '1734701267'
x-xss-protection:
- 1; mode=block
status:
code: 200
message: OK
version: 1
2 changes: 1 addition & 1 deletion workflows/tests/data_records/ins1906174_version3.json

Large diffs are not rendered by default.

15 changes: 12 additions & 3 deletions workflows/tests/test_data_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,18 @@ class TestDataHarvest:
context = Context()

@pytest.mark.vcr
def test_collect_ids(self):
def test_collect_ids_param(self):
task = self.dag.get_task("collect_ids")
res = task.execute(context=self.context)
assert res == [2807680, 2779339]
res = task.execute(context=Context({"params": {"last_updated": "2024-12-15"}}))
assert res == [2693068, 2807749, 2809112]

@pytest.mark.vcr
def test_collect_ids_logical_date(self):
task = self.dag.get_task("collect_ids")
res = task.execute(
context=Context({"ds": "2024-12-16", "params": {"last_updated": ""}})
)
assert res == [2693068, 2807749, 2809112]

@pytest.mark.vcr
def test_download_record_versions(self):
Expand Down Expand Up @@ -50,6 +58,7 @@ def test_build_record(self):

assert res["keywords"][0]["value"] == "cmenergies: 13000.0-13000.0"
assert res["keywords"][1]["value"] == "observables: m_MMC"
        assert res["keywords"][2]["value"] == "observables: e=mc2"

assert (
res["urls"][0]["value"] == payload["base"]["record"]["resources"][0]["url"]
Expand Down

0 comments on commit 33a1957

Please sign in to comment.