From 6c6dc8bea5fe8db81297b6de8e6a6f0be6bbc5ce Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Tue, 16 Apr 2024 09:31:32 +0200 Subject: [PATCH] OA: added postgres operator --- dags/open_access/open_access.py | 35 ++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/dags/open_access/open_access.py b/dags/open_access/open_access.py index a0fe034..fd75577 100644 --- a/dags/open_access/open_access.py +++ b/dags/open_access/open_access.py @@ -3,6 +3,7 @@ import open_access.utils as utils import pendulum from airflow.decorators import dag, task +from airflow.providers.postgres.operators.postgres import PostgresOperator @dag( @@ -26,8 +27,10 @@ def fetch_data_task(query, **kwargs): return {type_of_query: total} @task() - def join(values): - return reduce(lambda a, b: {**a, **b}, values) + def join(values, **kwargs): + results = reduce(lambda a, b: {**a, **b}, values) + results["years"] = kwargs["params"].get("year") + return results results = fetch_data_task.expand( query=[ @@ -37,7 +40,33 @@ def join(values): {"gold": utils.gold_access_query}, ], ) - join(results) + + unpacked_results = join(results) + + PostgresOperator( + task_id="populate_open_access_table", + postgres_conn_id="superset_qa", + sql=""" + INSERT INTO oa.open_access (year, closed_access, bronze_open_access, + green_open_access, gold_open_access, created_at, updated_at) + VALUES (%(year)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (year) + DO UPDATE SET + closed_access = EXCLUDED.closed_access, + bronze_open_access = EXCLUDED.bronze_open_access, + green_open_access = EXCLUDED.green_open_access, + gold_open_access = EXCLUDED.gold_open_access, + updated_at = CURRENT_TIMESTAMP; + """, + parameters={ + "year": unpacked_results["year"], + "closed": unpacked_results["closed"], + "bronze": unpacked_results["bronze"], + "green": unpacked_results["green"], + "gold": unpacked_results["gold"], + }, + ) OA_dag = oa_dag()