Http sensor provider #29

Merged · 3 commits · Jul 5, 2024

9 changes: 5 additions & 4 deletions .pre-commit-config.yaml
@@ -7,10 +7,6 @@ repos:
rev: "v2.7.1"
hooks:
- id: prettier
- repo: https://github.com/pycqa/isort
rev: "5.12.0"
hooks:
- id: isort
- repo: https://github.com/pycqa/flake8
rev: "3.9.2"
hooks:
@@ -22,3 +18,8 @@ repos:
- id: check-merge-conflict
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/pycqa/isort
rev: "5.12.0"
hooks:
- id: isort
args: ["--profile", "black"]
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,4 +1,4 @@
FROM apache/airflow:2.8.2-python3.10
FROM registry.cern.ch/cern-sis/airflow-base:2.8.3

ENV PYTHONBUFFERED=0
ENV AIRFLOW__LOGGING__LOGGING_LEVEL=INFO
@@ -9,4 +9,4 @@ COPY requirements-test.txt ./requirements-test.txt

COPY dags ./dags

RUN pip install --no-cache-dir --user -r requirements-test.txt -r requirements.txt
RUN pip install -r requirements-test.txt -r requirements.txt
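
Since the base image now comes from the internal registry and the pip flags were dropped, a quick local sanity check of the build could look like this (the `bi-dags` tag is only an illustrative placeholder):

```sh
# Build the Airflow image locally; "bi-dags" is a placeholder tag
docker build -t bi-dags .
```
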
43 changes: 42 additions & 1 deletion README.md
@@ -1,3 +1,9 @@
# What is BI-DAGs

BI-DAGs is a component of the monitoring project developed for the RCS-SIS group. It plays an important role in tracking key performance indicators (KPIs) to monitor progress and analyze the current situation.

BI-DAGs operates as an Apache Airflow instance dedicated to managing data harvesting from various sources including CDS, ILS, and others. The harvested data is then processed and pushed into a PostgreSQL database. Subsequently, Apache Superset retrieves this data to present it in the desired format for analysis and reporting.

# BI-DAGs Setup Guide

This README provides a step-by-step guide on setting up your environment for running BI-DAGs with Airflow.
@@ -66,7 +72,7 @@ airflow standalone
If you're using Docker to manage your Postgres database, start the service.

```sh
docker-compose start
docker-compose -f docker-compose.standalone.yaml up
```

### 7. Add Airflow Connections via UI
@@ -87,6 +93,41 @@ More information, how to manage db connections can be found [here](https://airfl

After completing these steps, your environment should be set up and ready for running BI-DAGs with Airflow.
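
As an alternative to step 7 above, the connections referenced by the DAGs (`cds` for HTTP and `superset_qa` for Postgres) can also be created from the command line; a sketch with placeholder hosts and credentials:

```sh
# Placeholder values — replace host, login and password with your own
airflow connections add cds --conn-type http --conn-host https://cds.cern.ch
airflow connections add superset_qa --conn-type postgres \
  --conn-host localhost --conn-port 5432 \
  --conn-login airflow --conn-password airflow --conn-schema airflow
```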

## Running with Docker Compose

1. To start the services using Docker Compose, simply run:

```sh
docker-compose up
```

All the required environment variables are already configured in the `docker-compose.yml` file.
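
To confirm the stack actually came up before creating a user, it can help to check the service status and tail the webserver logs (the `airflow-web` service name matches the one used in the next section):

```sh
# List compose services and their state, then follow the webserver logs
docker-compose ps
docker-compose logs -f airflow-web
```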

### Creating a User in Airflow

Before logging into the Airflow UI, you need to create a user. Follow these steps to create a user in the Airflow web container from the command line:

1. Ensure the Airflow services are running.
2. Access the Airflow web container by running:

```sh
docker-compose exec airflow-web bash
```

3. Create a new Airflow user with the following command (replace `<username>`, `<password>`, `<firstname>`, `<lastname>`, and `<email>` with your desired values):

```sh
airflow users create --username <username> --password <password> --firstname <firstname> --lastname <lastname> --role Admin --email <email>
```

Example:

```sh
airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email [email protected]
```

After creating the user, you can log in to the Airflow UI with the credentials you specified.

## Database Migrations

By following these guidelines, you can seamlessly manage and track database migrations within your Airflow environment.
7 changes: 2 additions & 5 deletions dags/open_access/constants.py
@@ -2,11 +2,8 @@
r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+" + r"not+540__f:Bronze+not+540__3:preprint"
)
BRONZE_ACCESS = r"540__f:'Bronze'"
GREEN_ACCESS = (
r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:"
+ r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'"
)
GOLD_ACCESS = r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')"
GREEN_ACCESS = r""
GOLD_ACCESS = r""

CERN_READ_AND_PUBLISH = r"540__f:'CERN-RP"
CERN_INDIVIDUAL_APCS = r"540__f:'CERN-APC'"
81 changes: 49 additions & 32 deletions dags/open_access/gold_open_access_mechanisms.py
@@ -3,56 +3,78 @@
import open_access.constants as constants
import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from open_access.utils import get_url
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
start_date=pendulum.today("UTC").add(days=-1),
schedule="@monthly",
schedule_interval="@monthly",
params={"year": 2023},
)
def oa_gold_open_access_mechanisms():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
@task(multiple_outputs=True)
def generate_params(query, **kwargs):
year = kwargs["params"].get("year")
current_collection = "Published+Articles"
golden_access_base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter+not+595:'Not+for+annual+report"
)
type_of_query = [*query][0]
url = get_url(f"{golden_access_base_query}+{query[type_of_query]}")
data = request_again_if_failed(url)
total = get_total_results_count(data.text)
return {type_of_query: total}
query_p = rf"{golden_access_base_query}+{query[type_of_query]}"

return {
"endpoint": rf"search?ln=en&cc={current_collection}&p={query_p}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
"type_of_query": type_of_query,
}

@task
def fetch_count(parameters):
http_hook = HttpHook(http_conn_id="cds", method="GET")
Review comment (Contributor Author): BEFORE DEPLOYING, PLEASE MAKE SURE THAT THE ENV VAR FOR CONNECTION IS EXPORTED IN HELM CHART
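
For context, Airflow resolves connections from `AIRFLOW_CONN_<CONN_ID>` environment variables, so the `cds` connection used by the HttpHook above can be supplied that way from the Helm chart's env section. A minimal sketch, assuming the CDS base URL (the actual host, and any credentials, are deployment-specific), using the JSON connection form available since Airflow 2.3:

```sh
# Hypothetical value — set the real host (and credentials, if needed) in the Helm chart env section
export AIRFLOW_CONN_CDS='{"conn_type": "http", "host": "https://cds.cern.ch"}'
```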

response = http_hook.run_with_advanced_retry(
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(Exception),
},
)
count = get_total_results_count(response.text)
return {parameters["type_of_query"]: count}

query_list = [
{"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH},
{"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS},
{"scoap3": constants.SCOAP3},
{"other": constants.OTHER},
{"other_collective_models": constants.OTHER_COLLECTIVE_MODELS},
]

parameters = generate_params.expand(query=query_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
def join_and_add_year(counts, **kwargs):
year = kwargs["params"].get("year")
results = reduce(lambda a, b: {**a, **b}, counts)
results["year"] = year
return results

results = fetch_data_task.expand(
query=[
{"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH},
{"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS},
{"scoap3": constants.SCOAP3},
{"other": constants.OTHER},
{"other_collective_models": constants.OTHER_COLLECTIVE_MODELS},
],
)
unpacked_results = join(results)
results = join_and_add_year(counts)

PostgresOperator(
populate_golden_open_access = PostgresOperator(
task_id="populate_golden_open_access",
postgres_conn_id="superset_qa",
sql="""
INSERT INTO oa_golden_open_access (year, cern_read_and_publish, cern_individual_apcs,
scoap3, other, other_collective_models, created_at, updated_at)
VALUES (%(years)s, %(cern_read_and_publish)s, %(cern_individual_apcs)s,
VALUES (%(year)s, %(cern_read_and_publish)s, %(cern_individual_apcs)s,
%(scoap3)s, %(other)s, %(other_collective_models)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
@@ -64,16 +86,11 @@ def join(values, **kwargs):
other_collective_models = EXCLUDED.other_collective_models,
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"cern_read_and_publish": unpacked_results["cern_read_and_publish"],
"cern_individual_apcs": unpacked_results["cern_individual_apcs"],
"scoap3": unpacked_results["scoap3"],
"other": unpacked_results["other"],
"other_collective_models": unpacked_results["other_collective_models"],
},
parameters=results,
executor_config=kubernetes_executor_config,
)

counts >> results >> populate_golden_open_access


OA_gold_open_access_mechanisms = oa_gold_open_access_mechanisms()
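
Since the DAG keeps `params={"year": 2023}` as its default, a run for a different year can be triggered by overriding the param through the run conf (assuming the default `dag_run_conf_overrides_params` behaviour and that the dag_id matches the decorated function name):

```sh
# Trigger the gold open access DAG for a specific year; the conf payload overrides params.year
airflow dags trigger oa_gold_open_access_mechanisms --conf '{"year": 2022}'
```
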
13 changes: 4 additions & 9 deletions dags/open_access/open_access.py
@@ -1,5 +1,3 @@
import logging
import os
from functools import reduce

import open_access.constants as constants
@@ -20,22 +18,19 @@ def oa_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
year = kwargs["params"].get("year")
cds_token = os.environ.get("CDS_TOKEN")
if not cds_token:
logging.warning("cds token is not set!")
base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter"
)
type_of_query = [*query][0]
url = utils.get_url(query=f"{base_query}+{query[type_of_query]}")
data = request_again_if_failed(url=url, cds_token=cds_token)
url = utils.get_url(query=f"{base_query}")
data = request_again_if_failed(url=url)
total = get_total_results_count(data.text)
if type_of_query == "gold":
total = utils.get_gold_access_count(total, url)
total = utils.get_golden_access_count(total, url)
if type_of_query == "green":
total = total - utils.get_gold_access_count(total, url)
total = utils.get_green_access_count(total, url)
return {type_of_query: total}

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)