From 733ba1c3bb846e9d3c06426163e99ab533369e57 Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Mon, 19 Aug 2024 15:02:41 +0200 Subject: [PATCH] DAGs: fixes * Inspire matomo record update fix. * Annual Reports scheduler date fix to every 15 days. --- dags/annual_reports/annual_reports_categories.py | 2 +- .../annual_reports/anuual_reports_publications.py | 5 ++++- .../models/inspire_matomo/inspire_matomo.py | 2 +- dags/inspire_matomo/inspire_visits_per_day_dag.py | 15 +++++++-------- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/dags/annual_reports/annual_reports_categories.py b/dags/annual_reports/annual_reports_categories.py index d48fb86..0291b42 100644 --- a/dags/annual_reports/annual_reports_categories.py +++ b/dags/annual_reports/annual_reports_categories.py @@ -17,7 +17,7 @@ @dag( start_date=pendulum.today("UTC").add(days=-1), - schedule="@monthly", + schedule="0 0 */15 * *", ) def annual_reports_categories_dag(): @task(executor_config=kubernetes_executor_config) diff --git a/dags/annual_reports/anuual_reports_publications.py b/dags/annual_reports/anuual_reports_publications.py index 9a84183..bb97ce8 100644 --- a/dags/annual_reports/anuual_reports_publications.py +++ b/dags/annual_reports/anuual_reports_publications.py @@ -15,7 +15,10 @@ years = list(range(2004, current_year + 1)) -@dag(start_date=pendulum.today("UTC").add(days=-1), schedule="@monthly") +@dag( + start_date=pendulum.today("UTC").add(days=-1), + schedule="0 0 */15 * *", +) def annual_reports_publications_dag(): @task(executor_config=kubernetes_executor_config) def fetch_publication_report_count(year, **kwargs): diff --git a/dags/common/models/inspire_matomo/inspire_matomo.py b/dags/common/models/inspire_matomo/inspire_matomo.py index e23162f..b8514a8 100644 --- a/dags/common/models/inspire_matomo/inspire_matomo.py +++ b/dags/common/models/inspire_matomo/inspire_matomo.py @@ -12,4 +12,4 @@ class MatomoData(Base): visits = Column(Integer) unique_visitors = Column(Integer) created_at = Column(DateTime, default=func.now()) - updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) + updated_at = Column(DateTime, default=func.now()) diff --git a/dags/inspire_matomo/inspire_visits_per_day_dag.py b/dags/inspire_matomo/inspire_visits_per_day_dag.py index 48866ce..922beab 100644 --- a/dags/inspire_matomo/inspire_visits_per_day_dag.py +++ b/dags/inspire_matomo/inspire_visits_per_day_dag.py @@ -10,6 +10,7 @@ from common.operators.sqlalchemy_operator import sqlalchemy_task from executor_config import kubernetes_executor_config from inspire_matomo.utils import get_parameters +from sqlalchemy import func from tenacity import retry_if_exception_type, stop_after_attempt now = datetime.now() @@ -64,20 +65,18 @@ def fetch_unique_visitors_per_day(**kwargs): @sqlalchemy_task(conn_id="superset") def populate_database(visits_per_day, unique_visitors_per_day, session, **kwargs): + print(json.loads(visits_per_day)) visits_per_day_json = json.loads(visits_per_day) unique_visitors_per_day_json = json.loads(unique_visitors_per_day) date = kwargs["params"].get("date") + parsed_date = datetime.strptime(date, "%Y-%m-%d").date() - record = ( - session.query(MatomoData) - .filter_by(date=visits_per_day_json.get("date")) - .first() - ) + record = session.query(MatomoData).filter_by(date=parsed_date).first() if record: - record.visits = int(visits_per_day_json.visits) - record.unique_visitors = int(unique_visitors_per_day_json.unique_visitors) + record.visits = int(visits_per_day_json.get("value")) + record.unique_visitors = int(unique_visitors_per_day_json.get("value")) + record.updated_at = func.now() else: - parsed_date = datetime.strptime(date, "%Y-%m-%d").date() new_record = MatomoData( visits=int(visits_per_day_json.get("value")), unique_visitors=int(unique_visitors_per_day_json.get("value")),