Skip to content

Commit

Permalink
OA: added postgres operator
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Apr 16, 2024
1 parent 30c8bfd commit 6c6dc8b
Showing 1 changed file with 32 additions and 3 deletions.
35 changes: 32 additions & 3 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator


@dag(
Expand All @@ -26,8 +27,10 @@ def fetch_data_task(query, **kwargs):
return {type_of_query: total}

@task()
def join(values):
return reduce(lambda a, b: {**a, **b}, values)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
return results

results = fetch_data_task.expand(
query=[
Expand All @@ -37,7 +40,33 @@ def join(values):
{"gold": utils.gold_access_query},
],
)
join(results)

unpacked_results = join(results)

PostgresOperator(
task_id="populate_open_access_table",
postgres_conn_id="superset_qa",
sql="""
INSERT INTO oa.open_access (year, closed_access, bronze_open_access,
green_open_access, gold_open_access, created_at, updated_at)
VALUES (%(year)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
closed_access = EXCLUDED.closed_access,
bronze_open_access = EXCLUDED.bronze_open_access,
green_open_access = EXCLUDED.green_open_access,
gold_open_access = EXCLUDED.gold_open_access,
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"year": unpacked_results["year"],
"closed": unpacked_results["closed"],
"bronze": unpacked_results["bronze"],
"green": unpacked_results["green"],
"gold": unpacked_results["gold"],
},
)


OA_dag = oa_dag()

0 comments on commit 6c6dc8b

Please sign in to comment.