diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 2fc4a58..6e6a4b1 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -7,10 +7,6 @@ repos:
     rev: "v2.7.1"
     hooks:
       - id: prettier
-  - repo: https://github.com/pycqa/isort
-    rev: "5.12.0"
-    hooks:
-      - id: isort
   - repo: https://github.com/pycqa/flake8
     rev: "3.9.2"
     hooks:
@@ -22,3 +18,8 @@ repos:
       - id: check-merge-conflict
       - id: end-of-file-fixer
       - id: trailing-whitespace
+  - repo: https://github.com/pycqa/isort
+    rev: "5.12.0"
+    hooks:
+      - id: isort
+        args: ["--profile", "black"]
diff --git a/Dockerfile b/Dockerfile
index b51ef49..cf24a24 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM apache/airflow:2.8.2-python3.10
+FROM registry.cern.ch/cern-sis/airflow-base:2.8.3
 ENV PYTHONBUFFERED=0
 ENV AIRFLOW__LOGGING__LOGGING_LEVEL=INFO
@@ -9,4 +9,4 @@ COPY requirements-test.txt ./requirements-test.txt
 COPY dags ./dags

-RUN pip install --no-cache-dir --user -r requirements-test.txt -r requirements.txt
+RUN pip install -r requirements-test.txt -r requirements.txt
diff --git a/README.md b/README.md
index 6e60172..56ae385 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,9 @@
+# What is BI-DAGs
+
+BI-DAGs is a component of the monitoring project developed for the RCS-SIS group. It plays an important role in tracking the key performance indicators (KPIs) used to monitor progress and analyze the current situation.
+
+BI-DAGs operates as an Apache Airflow instance dedicated to managing data harvesting from various sources, including CDS, ILS, and others. The harvested data is then processed and pushed into a PostgreSQL database. Subsequently, Apache Superset retrieves this data to present it in the desired format for analysis and reporting.
+
 # BI-DAGs Setup Guide

 This README provides a step-by-step guide on setting up your environment for running BI-DAGs with Airflow.
@@ -66,7 +72,7 @@ airflow standalone
 ```

 If you're using Docker to manage your Postgres database, start the service.

 ```sh
-docker-compose start
+docker-compose -f docker-compose.standalone.yaml up
 ```

 ### 7. Add Airflow Connections via UI
@@ -87,6 +93,41 @@ More information, how to manage db connections can be found [here](https://airfl
 After completing these steps, your environment should be set up and ready for running BI-DAGs with Airflow.

+## Running with Docker Compose
+
+1. To start the services using Docker Compose, simply run:
+
+```sh
+docker-compose up
+```
+
+All the required environment variables are already configured in the `docker-compose.yaml` file.
+
+### Creating a User in Airflow
+
+Before logging into the Airflow UI, you need to create a user. Follow these steps to create a user in the Airflow web container from the command line:
+
+1. Ensure the Airflow services are running.
+2. Access the Airflow web container by running:
+
+```sh
+docker-compose exec airflow-webserver bash
+```
+
+3. Create a new Airflow user with the following command (replace `<username>`, `<password>`, `<firstname>`, `<lastname>`, and `<email>` with your desired values):
+
+```sh
+airflow users create --username <username> --password <password> --firstname <firstname> --lastname <lastname> --role Admin --email <email>
+```
+
+Example:
+
+```sh
+airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com
+```
+
+After creating the user, you can log in to the Airflow UI with the credentials you specified.
+
 ## Database Migrations

 By following these guidelines, you can seamlessly manage and track database migrations within your Airflow environment.
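The DAGs in this repository expect two Airflow connections, `cds` (HTTP) and `superset_qa` (Postgres). As an alternative to adding them through the UI, they could also be created from the CLI; the following is a minimal sketch that mirrors the values `docker-compose.yaml` sets via `AIRFLOW_CONN_CDS` and `AIRFLOW_CONN_SUPERSET_QA` (the `postgres_local` host only resolves inside the Compose network, so adjust host and credentials for a standalone setup):

```sh
# Create the HTTP connection used by HttpHook(http_conn_id="cds")
airflow connections add cds --conn-uri 'http://cds.cern.ch'

# Create the Postgres connection used by PostgresOperator(postgres_conn_id="superset_qa")
airflow connections add superset_qa \
  --conn-uri 'postgresql://bi_local:bi_local@postgres_local:5432/bi_local'
```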
diff --git a/dags/open_access/constants.py b/dags/open_access/constants.py index 294509c..1b68abe 100644 --- a/dags/open_access/constants.py +++ b/dags/open_access/constants.py @@ -2,11 +2,8 @@ r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+" + r"not+540__f:Bronze+not+540__3:preprint" ) BRONZE_ACCESS = r"540__f:'Bronze'" -GREEN_ACCESS = ( - r"not+540__a:'CC+BY'+not+540__a:'CC-BY'+not+540__a:" - + r"'arXiv+nonexclusive-distrib'+not+540__f:'Bronze'" -) -GOLD_ACCESS = r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')" +GREEN_ACCESS = r"" +GOLD_ACCESS = r"" CERN_READ_AND_PUBLISH = r"540__f:'CERN-RP" CERN_INDIVIDUAL_APCS = r"540__f:'CERN-APC'" diff --git a/dags/open_access/gold_open_access_mechanisms.py b/dags/open_access/gold_open_access_mechanisms.py index 1887e60..fb5b91f 100644 --- a/dags/open_access/gold_open_access_mechanisms.py +++ b/dags/open_access/gold_open_access_mechanisms.py @@ -3,56 +3,78 @@ import open_access.constants as constants import pendulum from airflow.decorators import dag, task +from airflow.providers.http.hooks.http import HttpHook from airflow.providers.postgres.operators.postgres import PostgresOperator -from common.utils import get_total_results_count, request_again_if_failed +from common.utils import get_total_results_count from executor_config import kubernetes_executor_config -from open_access.utils import get_url +from tenacity import retry_if_exception_type, stop_after_attempt @dag( start_date=pendulum.today("UTC").add(days=-1), - schedule="@monthly", + schedule_interval="@monthly", params={"year": 2023}, ) def oa_gold_open_access_mechanisms(): - @task(executor_config=kubernetes_executor_config) - def fetch_data_task(query, **kwargs): + @task(multiple_outputs=True) + def generate_params(query, **kwargs): year = kwargs["params"].get("year") + current_collection = "Published+Articles" golden_access_base_query = ( r"(affiliation:CERN+or+595:'For+annual+report')" + rf"and+year:{year}+not+980:ConferencePaper+" + r"not+980:BookChapter+not+595:'Not+for+annual+report" ) type_of_query = [*query][0] - url = get_url(f"{golden_access_base_query}+{query[type_of_query]}") - data = request_again_if_failed(url) - total = get_total_results_count(data.text) - return {type_of_query: total} + query_p = rf"{golden_access_base_query}+{query[type_of_query]}" + + return { + "endpoint": rf"search?ln=en&cc={current_collection}&p={query_p}" + + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" + + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm", + "type_of_query": type_of_query, + } + + @task + def fetch_count(parameters): + http_hook = HttpHook(http_conn_id="cds", method="GET") + response = http_hook.run_with_advanced_retry( + endpoint=parameters["endpoint"], + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(Exception), + }, + ) + count = get_total_results_count(response.text) + return {parameters["type_of_query"]: count} + + query_list = [ + {"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH}, + {"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS}, + {"scoap3": constants.SCOAP3}, + {"other": constants.OTHER}, + {"other_collective_models": constants.OTHER_COLLECTIVE_MODELS}, + ] + + parameters = generate_params.expand(query=query_list) + counts = fetch_count.expand(parameters=parameters) @task(multiple_outputs=True, executor_config=kubernetes_executor_config) - def join(values, **kwargs): - results = reduce(lambda a, b: {**a, **b}, values) - results["years"] = kwargs["params"].get("year") + def join_and_add_year(counts, 
**kwargs):
+        year = kwargs["params"].get("year")
+        results = reduce(lambda a, b: {**a, **b}, counts)
+        results["year"] = year
         return results

-    results = fetch_data_task.expand(
-        query=[
-            {"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH},
-            {"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS},
-            {"scoap3": constants.SCOAP3},
-            {"other": constants.OTHER},
-            {"other_collective_models": constants.OTHER_COLLECTIVE_MODELS},
-        ],
-    )
-    unpacked_results = join(results)
+    results = join_and_add_year(counts)

-    PostgresOperator(
+    populate_golden_open_access = PostgresOperator(
         task_id="populate_golden_open_access",
         postgres_conn_id="superset_qa",
         sql="""
         INSERT INTO oa_golden_open_access (year, cern_read_and_publish,
         cern_individual_apcs, scoap3, other, other_collective_models,
         created_at, updated_at)
-        VALUES (%(years)s, %(cern_read_and_publish)s, %(cern_individual_apcs)s,
+        VALUES (%(year)s, %(cern_read_and_publish)s, %(cern_individual_apcs)s,
         %(scoap3)s, %(other)s, %(other_collective_models)s,
         CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
         ON CONFLICT (year)
@@ -64,16 +86,11 @@ def join(values, **kwargs):
             other_collective_models = EXCLUDED.other_collective_models,
             updated_at = CURRENT_TIMESTAMP;
         """,
-        parameters={
-            "years": unpacked_results["years"],
-            "cern_read_and_publish": unpacked_results["cern_read_and_publish"],
-            "cern_individual_apcs": unpacked_results["cern_individual_apcs"],
-            "scoap3": unpacked_results["scoap3"],
-            "other": unpacked_results["other"],
-            "other_collective_models": unpacked_results["other_collective_models"],
-        },
+        parameters=results,
         executor_config=kubernetes_executor_config,
     )

+    counts >> results >> populate_golden_open_access
+

 OA_gold_open_access_mechanisms = oa_gold_open_access_mechanisms()
diff --git a/dags/open_access/open_access.py b/dags/open_access/open_access.py
index 9d221de..b2731b4 100644
--- a/dags/open_access/open_access.py
+++ b/dags/open_access/open_access.py
@@ -1,5 +1,3 @@
-import logging
-import os
 from functools import reduce

 import open_access.constants as constants
@@ -20,22 +18,19 @@ def oa_dag():
     @task(executor_config=kubernetes_executor_config)
     def fetch_data_task(query, **kwargs):
         year = kwargs["params"].get("year")
-        cds_token = os.environ.get("CDS_TOKEN")
-        if not cds_token:
-            logging.warning("cds token is not set!")
         base_query = (
             r"(affiliation:CERN+or+595:'For+annual+report')"
             + rf"and+year:{year}+not+980:ConferencePaper+"
             + r"not+980:BookChapter"
         )
         type_of_query = [*query][0]
         url = utils.get_url(query=f"{base_query}+{query[type_of_query]}")
-        data = request_again_if_failed(url=url, cds_token=cds_token)
+        data = request_again_if_failed(url=url)
         total = get_total_results_count(data.text)
         if type_of_query == "gold":
-            total = utils.get_gold_access_count(total, url)
+            total = utils.get_golden_access_count(total, url)
         if type_of_query == "green":
-            total = total - utils.get_gold_access_count(total, url)
+            total = utils.get_green_access_count(total, url)
         return {type_of_query: total}

     @task(multiple_outputs=True, executor_config=kubernetes_executor_config)
diff --git a/dags/open_access/parsers.py b/dags/open_access/parsers.py
index 55da0ca..7230322 100644
--- a/dags/open_access/parsers.py
+++ b/dags/open_access/parsers.py
@@ -1,3 +1,4 @@
+import re
 import xml.etree.ElementTree as ET
 from io import StringIO

@@ -13,25 +14,123 @@ def parse_without_names_spaces(xml):
     return root


-def get_golden_access_records_ids(data):
-    xml = parse_without_names_spaces(data)
-    records = xml.findall(".record")
-    golden_access = []
+def is_correct_value(value):
+    match value.text.lower():
+        case "accepted manuscript":
+            return True
+        case "preprint":
+            return True
+        case _:
+            return False
+
+
+def field_has_cc_by(field_value):
+    pattern = re.compile(r"CC(\s|-)?BY(\s|-)?.*", flags=re.I)
+    return bool(pattern.match(field_value))
+
+
+def is_subset_856_for_green_access(datafields_856):
+    at_least_one_found = False
+    for datafield in datafields_856:
+        subfield = datafield.find("subfield[@code='y']")
+        try:
+            is_subfield_y_wanted_value = is_correct_value(subfield)
+            if not at_least_one_found:
+                at_least_one_found = is_subfield_y_wanted_value
+        except AttributeError:
+            pass
+    return at_least_one_found
+
+
+def is_subset_540_preprint_green_access(datafields_540):
+    at_least_one_found = False
+    for datafield in datafields_540:
+        subfield_3 = datafield.find("subfield[@code='3']")
+        try:
+            is_subfield_3_wanted_value = subfield_3.text.lower() == "preprint"
+            if not at_least_one_found:
+                at_least_one_found = is_subfield_3_wanted_value
+        except AttributeError:
+            pass
+    return at_least_one_found
+
+
+def is_subset_540_publication_golden_access(datafields_540):
+    at_least_one_found = False
+    for datafield in datafields_540:
+        subfield_3 = datafield.find("subfield[@code='3']")
+        subfield_a = datafield.find("subfield[@code='a']")
+        try:
+            is_subfield_wanted_3_value = subfield_3.text.lower() == "publication"
+            is_subfield_a_wanted_value = field_has_cc_by(subfield_a.text)
+            if not at_least_one_found:
+                at_least_one_found = bool(
+                    is_subfield_wanted_3_value and is_subfield_a_wanted_value
+                )
+        except AttributeError:
+            pass
+    return at_least_one_found
+
+
+def parse_subset_green_access(records):
+    filtered_records = []
+    for record in records:
+        datafields_856 = record.findall("datafield[@tag='856'][@ind1='4'][@ind2=' ']")
+        datafields_540 = record.findall("datafield/[@tag='540']")
+        if datafields_856 is None:
+            continue
+        if datafields_540 is None:
+            continue
+        is_it_wanted_record_by_856 = is_subset_856_for_green_access(datafields_856)
+        is_it_wanted_record_by_540_preprint = is_subset_540_preprint_green_access(
+            datafields_540
+        )
+        is_it_wanted_record_by_540_publication = (
+            not is_subset_540_publication_golden_access(datafields_540)
+        )
+
+        if (
+            is_it_wanted_record_by_856
+            or is_it_wanted_record_by_540_preprint
+            or is_it_wanted_record_by_540_publication
+        ):
+            filtered_records.append(record)
+
+    return filtered_records
+
+
+def parse_subset_golden_access(records):
+    filtered_records = []
     for record in records:
-        datafields = record.findall("datafield/[@tag='540']")
-        if datafields is None:
+        datafields_540 = record.findall("datafield/[@tag='540']")
+        if datafields_540 is None:
             continue
-        for datafield in datafields:
-            record_type = datafield.find("subfield/[@code='3']")
-            license = datafield.find("subfield/[@code='a']")
-            if record_type is not None and license is not None:
-                if (
-                    "CC" in license.text
-                    and "BY" in license.text
-                    and record_type.text == "publication"
-                ):
-                    record_id = record.find("controlfield/[@tag='001']")
-                    if record_id is not None:
-                        doi = record_id.text
-                        golden_access.append(doi)
-    return golden_access
+        is_it_wanted_record_by_540_publication = (
+            is_subset_540_publication_golden_access(datafields_540)
+        )
+
+        if is_it_wanted_record_by_540_publication:
+            filtered_records.append(record)
+    return filtered_records
+
+
+def get_records_ids(data, record_filter):
+    xml = parse_without_names_spaces(data)
+    records = 
xml.findall(".record") + filtered_records = record_filter(records) + green_access = [] + for record in filtered_records: + record_id = record.find("controlfield/[@tag='001']") + if record_id is not None: + doi = record_id.text + green_access.append(doi) + return green_access + + +def get_golden_access_records_ids(data): + return get_records_ids(data, parse_subset_golden_access) + + +def get_green_access_records_ids(data): + return get_records_ids(data, parse_subset_green_access) diff --git a/dags/open_access/utils.py b/dags/open_access/utils.py index e2d101f..1f7bc97 100644 --- a/dags/open_access/utils.py +++ b/dags/open_access/utils.py @@ -2,23 +2,32 @@ import math from common.utils import request_again_if_failed -from open_access.parsers import get_golden_access_records_ids +from open_access.parsers import ( + get_golden_access_records_ids, + get_green_access_records_ids, +) -def get_gold_access_count(total, url): +def get_count(total, url, record_extractor): iterations = math.ceil(total / 100.0) records_ids_count = 0 for i in range(0, iterations): jrec = (i * 100) + 1 full_url = f"{url}&jrec={jrec}" response = request_again_if_failed(full_url) - records_ids_count = records_ids_count + len( - get_golden_access_records_ids(response.text) - ) + records_ids_count = records_ids_count + len(record_extractor(response.text)) logging.info(f"In total was found {records_ids_count} golden access records") return records_ids_count +def get_golden_access_count(total, url): + return get_count(total, url, get_golden_access_records_ids) + + +def get_green_access_count(total, url): + return get_count(total, url, get_green_access_records_ids) + + def get_url(query, current_collection="Published+Articles"): url = ( rf"https://cds.cern.ch/search?ln=en&cc={current_collection}&p={query}" diff --git a/docker-compose.standalone.yaml b/docker-compose.standalone.yaml new file mode 100644 index 0000000..6e923ab --- /dev/null +++ b/docker-compose.standalone.yaml @@ -0,0 +1,22 @@ +services: + postgres: + image: docker.io/library/postgres:13 + environment: + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + volumes: + - postgres-db-volume:/var/lib/postgresql/data + expose: + - 5432 + ports: + - 5432:5432 + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 10s + retries: 5 + start_period: 5s + restart: always + +volumes: + postgres-db-volume: diff --git a/docker-compose.yaml b/docker-compose.yaml index 6e923ab..1bdf9f0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,16 +1,95 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL. +# +# WARNING: This configuration is for local development. Do not use it in a production deployment. 
+# +# This configuration supports basic configuration using environment variables or an .env file +# The following variables are supported: +# +# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. +# Default: apache/airflow:2.8.3 +# AIRFLOW_UID - User ID in Airflow containers +# Default: 50000 +# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. +# Default: . +# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode +# +# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). +# Default: airflow +# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested). +# Default: airflow +# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers. +# Use this option ONLY for quick checks. Installing requirements at container +# startup is done EVERY TIME the service is started. +# A better way is to build a custom image or extend the official image +# as described in https://airflow.apache.org/docs/docker-stack/build.html. +# Default: '' +# +# Feel free to modify this file to suit your needs. +--- +x-airflow-common: &airflow-common + # In order to add custom dependencies or upgrade provider packages you can use your extended image. + # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml + # and uncomment the "build" line below, Then run `docker-compose build` to build the images. + # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.3} + build: . + environment: &airflow-common-env + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 + AIRFLOW__CORE__FERNET_KEY: "" + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true" + AIRFLOW__CORE__LOAD_EXAMPLES: "true" + AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session" + AIRFLOW_CONN_CDS: "http://cds.cern.ch" + AIRFLOW_CONN_SUPERSET_QA: "postgresql://bi_local:bi_local@postgres_local:5432/bi_local" + # yamllint disable rule:line-length + # Use simple http server on scheduler for health checks + # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server + # yamllint enable rule:line-length + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true" + # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks + # for other purpose (development, test and especially production usage) build/extend Airflow image. 
+ _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} + volumes: + - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config + - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins + user: "${AIRFLOW_UID:-50000}:0" + depends_on: &airflow-common-depends-on + redis: + condition: service_healthy + postgres: + condition: service_healthy + services: postgres: - image: docker.io/library/postgres:13 + image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-db-volume:/var/lib/postgresql/data - expose: - - 5432 - ports: - - 5432:5432 healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] interval: 10s @@ -18,5 +97,215 @@ services: start_period: 5s restart: always + postgres_local: + image: postgres:13 + environment: + POSTGRES_DB: bi_local + POSTGRES_USER: bi_local + POSTGRES_PASSWORD: bi_local + ports: + - "5433:5432" # Use a different port to avoid conflict + healthcheck: + test: ["CMD", "pg_isready", "-U", "bi_local"] + interval: 10s + retries: 5 + start_period: 5s + restart: always + command: > + bash -c " + set -e + docker-entrypoint.sh postgres & + until pg_isready -U bi_local; do + sleep 1; + done; + psql -v ON_ERROR_STOP=1 --username bi_local --dbname bi_local -c 'GRANT ALL PRIVILEGES ON DATABASE bi_local TO bi_local;' + wait + " + redis: + image: redis:latest + expose: + - 6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 30s + retries: 50 + start_period: 30s + restart: always + + airflow-webserver: + <<: *airflow-common + command: webserver + ports: + - "8080:8080" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-scheduler: + <<: *airflow-common + command: scheduler + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-worker: + <<: *airflow-common + command: celery worker + healthcheck: + # yamllint disable rule:line-length + test: + - "CMD-SHELL" + - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + environment: + <<: *airflow-common-env + # Required to handle warm shutdown of the celery workers properly + # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation + DUMB_INIT_SETSID: "0" + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-triggerer: + <<: *airflow-common + command: triggerer + healthcheck: + test: + [ + "CMD-SHELL", + 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"', + ] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-init: + <<: *airflow-common + entrypoint: /bin/bash + # yamllint disable rule:line-length + command: + 
- -c + - | + if [[ -z "${AIRFLOW_UID}" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" + echo "If you are on Linux, you SHOULD follow the instructions below to set " + echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." + echo "For other operating systems you can get rid of the warning with manually created .env file:" + echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" + echo + fi + one_meg=1048576 + mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) + cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) + disk_available=$$(df / | tail -1 | awk '{print $$4}') + warning_resources="false" + if (( mem_available < 4000 )) ; then + echo + echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" + echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" + echo + warning_resources="true" + fi + if (( cpus_available < 2 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" + echo "At least 2 CPUs recommended. You have $${cpus_available}" + echo + warning_resources="true" + fi + if (( disk_available < one_meg * 10 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" + echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" + echo + warning_resources="true" + fi + if [[ $${warning_resources} == "true" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" + echo "Please follow the instructions to increase amount of resources available:" + echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin" + echo + fi + mkdir -p /sources/logs /sources/dags /sources/plugins + chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins} + exec /entrypoint airflow version + # yamllint enable rule:line-length + environment: + <<: *airflow-common-env + _AIRFLOW_DB_MIGRATE: "true" + _AIRFLOW_WWW_USER_CREATE: "true" + _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} + _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} + _PIP_ADDITIONAL_REQUIREMENTS: "" + user: "0:0" + volumes: + - ${AIRFLOW_PROJ_DIR:-.}:/sources + + airflow-cli: + <<: *airflow-common + profiles: + - debug + environment: + <<: *airflow-common-env + CONNECTION_CHECK_MAX_COUNT: "0" + # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252 + command: + - bash + - -c + - airflow + + # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up + # or by explicitly targeted on the command line e.g. docker-compose up flower. 
+ # See: https://docs.docker.com/compose/profiles/ + flower: + <<: *airflow-common + command: celery flower + profiles: + - flower + ports: + - "5555:5555" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:5555/"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + volumes: postgres-db-volume: + postgres_data: diff --git a/requirements-test.txt b/requirements-test.txt index c0cf4b9..96bd19a 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,5 +1,5 @@ -pre-commit==3.6.2 -pytest==7.4.4 -coverage==7.4.3 -pytest-cov==4.1.0 +pre-commit +pytest +coverage +pytest-cov pytest-datadir==1.5.0 diff --git a/requirements.txt b/requirements.txt index 4e15ccb..f1a1f06 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.3/constraints-3.10.txt apache-airflow[celery, postgres, redis, cncf.kubernetes]==2.8.3 -alembic==1.13.1 +alembic airflow-provider-alembic==1.0.0 +apache-airflow-providers-http==4.10.0 elementpath==4.4.0 diff --git a/tests/open_access/test_parser.py b/tests/open_access/test_parser.py index 599a76d..f2dc7c7 100644 --- a/tests/open_access/test_parser.py +++ b/tests/open_access/test_parser.py @@ -1,6 +1,13 @@ -from open_access.parsers import get_golden_access_records_ids +from open_access.parsers import ( + get_golden_access_records_ids, + get_green_access_records_ids, + is_subset_540_preprint_green_access, + is_subset_540_publication_golden_access, + is_subset_856_for_green_access, + parse_without_names_spaces, +) -expected = [ +expected_golden = [ "2894668", "2891488", "2888511", @@ -18,8 +25,86 @@ "2882298", ] +expected_green = [ + "2894668", + "2891489", + "2891488", + "2891487", + "2888511", + "2888151", + "2886038", + "2884472", + "2884471", + "2884470", + "2884469", + "2883672", + "2882429", + "2882335", + "2882328", + "2882327", + "2882324", + "2882322", + "2882311", + "2882298", +] + def test_get_golden_access_records_dois(shared_datadir): with open(shared_datadir / "search.xml") as file: records_ids = get_golden_access_records_ids(file.read()) - assert records_ids == expected + assert records_ids == expected_golden + + +def test_parse_subset_856(shared_datadir): + with open(shared_datadir / "search.xml") as file: + filtered_records_count = 0 + parsed_records = parse_without_names_spaces(file.read()) + records = parsed_records.findall(".record") + for record in records: + datafields_856 = record.findall( + "datafield[@tag='856'][@ind1='4'][@ind2=' ']" + ) + is_it_wanted_record_by_856 = is_subset_856_for_green_access(datafields_856) + if is_it_wanted_record_by_856: + filtered_records_count = filtered_records_count + 1 + assert filtered_records_count == 0 + + +def test_parse_subset_540_preprint(shared_datadir): + with open(shared_datadir / "search.xml") as file: + filtered_records_count = 0 + parsed_records = parse_without_names_spaces(file.read()) + records = parsed_records.findall(".record") + for record in records: + datafields_540 = record.findall( + "datafield[@tag='540'][@ind1=' '][@ind2=' ']" + ) + is_it_wanted_record_by_540 = is_subset_540_preprint_green_access( + datafields_540 + ) + if is_it_wanted_record_by_540: + filtered_records_count = filtered_records_count + 1 + assert filtered_records_count == 20 + + +def test_parse_subset_540_publications(shared_datadir): + with open(shared_datadir / "search.xml") as file: + 
filtered_records_count = 0 + parsed_records = parse_without_names_spaces(file.read()) + records = parsed_records.findall(".record") + for record in records: + datafields_540 = record.findall( + "datafield[@tag='540'][@ind1=' '][@ind2=' ']" + ) + is_it_wanted_record_by_540 = is_subset_540_publication_golden_access( + datafields_540 + ) + if is_it_wanted_record_by_540: + filtered_records_count = filtered_records_count + 1 + assert filtered_records_count == 15 + + +def test_get_green_access_records_dois(shared_datadir): + with open(shared_datadir / "search.xml") as file: + records_ids = get_green_access_records_ids(file.read()) + assert records_ids == expected_green
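The new green/golden parser behaviour is covered by the tests above. A quick way to run just this module locally is sketched below, assuming the packages from `requirements.txt` and `requirements-test.txt` are installed and the shared `search.xml` fixture is available to pytest-datadir:

```sh
pip install -r requirements.txt -r requirements-test.txt
pytest tests/open_access/test_parser.py -v
```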