diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 7d60f1a2..7bf4e0e2 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -21,6 +21,7 @@ variables:
   SOMENERGIA_REGISTRY: $SOM_HARBOR_DADES_URL
   DBT_PROJECT_DIR_NAME: dbt_jardiner
   SOM_PROJECT_NAME: somenergia-jardiner
+  DBT_SKIP_STATE: "false"
   IMAGE_NAME_PREFIX: $SOM_HARBOR_DADES_URL/$CI_PROJECT_NAME
   IMAGE_NAME_APP: $IMAGE_NAME_PREFIX-app
   IMAGE_NAME_APP_PRE_RELEASE: $IMAGE_NAME_APP:pre-release
@@ -135,7 +136,8 @@ dbt-build:
     DBUSER: $SOM_JARDINER_DB_USER
     DBPASSWORD: $SOM_JARDINER_DB_PASSWORD
     DBNAME: $SOM_JARDINER_DB_DBNAME
-    DBT_MODELS_SELECTOR: "--models state:modified+"
+    DBT_MODELS_SELECTOR: "--models state:modified+ --defer"
+    DBT_STATE: "--state state/prod"
     DBT_MANIFEST_ARTIFACT_URL: "https://$CI_SERVER_HOST/api/v4/projects/$CI_PROJECT_ID/jobs/artifacts/${CI_DEFAULT_BRANCH}/download?job=pages&job_token=$CI_JOB_TOKEN"
   image: ${SOM_HARBOR_DADES_URL}/${SOM_PROJECT_NAME}-dbt-docs:latest
   script:
@@ -146,7 +148,7 @@ dbt-build:
     - curl --location --output /tmp/artifacts.zip ${DBT_MANIFEST_ARTIFACT_URL}
     - unzip -o /tmp/artifacts.zip -d /tmp/artifacts
     - cp /tmp/artifacts/public/dbt_docs/manifest.json ${CI_PROJECT_DIR}/${DBT_PROJECT_DIR_NAME}/state/prod/manifest.json
-    - dbt build --target ${DBT_TARGET_NAME} --store-failures --threads 4 ${DBT_MODELS_SELECTOR} --state state/prod
+    - dbt build --target ${DBT_TARGET_NAME} --store-failures --threads 4 ${DBT_MODELS_SELECTOR} ${DBT_STATE}
   tags:
     - somenergia-et
   rules:
@@ -154,23 +156,19 @@ dbt-build:
       when: always
       variables:
         DBT_TARGET_NAME: prod
-        DBT_MODELS_SELECTOR: "--models state:modified+"
       changes:
         paths: *dbt-build-changes-paths
       allow_failure: false
     - if: $CI_PIPELINE_SOURCE == "merge_request_event"
       when: always
       variables:
-        DBT_MODELS_SELECTOR: "--models state:modified+ tag:dset_responses_fresh"
+        DBT_MODELS_SELECTOR: "--select tag:dset_responses_fresh state:modified+ +state:modified+,config.materialized:incremental +state:modified+,config.materialized:table --defer"
         DBT_TARGET_NAME: pre
         DBT_FAIL_FAST: "True"
       changes:
         compare_to: "refs/heads/main"
         paths: *dbt-build-changes-paths
       allow_failure: false
-  artifacts:
-    paths:
-      - ${CI_PROJECT_DIR}/${DBT_PROJECT_DIR_NAME}/target

 release-app-image:
   stage: release
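The net effect on the `dbt-build` job, once GitLab substitutes the variables above: both pipeline flavours now defer unselected upstream references to the production manifest that the `script` section downloads into `state/prod/`. A sketch of the expanded commands, assuming the variables resolve exactly as declared in the two rules (illustrative, not captured from a real run):

```bash
# Default branch (DBT_TARGET_NAME=prod): build modified models and their
# children, deferring any unbuilt parents to the downloaded prod manifest.
dbt build --target prod --store-failures --threads 4 \
  --models state:modified+ --defer \
  --state state/prod

# Merge request (DBT_TARGET_NAME=pre): additionally rebuild incremental- and
# table-materialized parents of the modified nodes, plus the
# dset_responses_fresh models, still deferring everything else.
dbt build --target pre --store-failures --threads 4 \
  --select tag:dset_responses_fresh state:modified+ \
  +state:modified+,config.materialized:incremental \
  +state:modified+,config.materialized:table \
  --defer \
  --state state/prod
```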
diff --git a/dags/dag_jardiner_dbt_snapshot.py b/dags/dag_jardiner_dbt_snapshot.py
index 3f86ac11..339690cd 100644
--- a/dags/dag_jardiner_dbt_snapshot.py
+++ b/dags/dag_jardiner_dbt_snapshot.py
@@ -71,7 +71,7 @@ def dbapi_to_dict(dbapi: str) -> dict:
     start_date=datetime(2024, 1, 28),
     schedule_interval="30 22 * * *",
     catchup=False,
-    tags=["jardiner", "dbt", "snapshot"],
+    tags=["project:jardiner", "dbt", "dbt-snapshot"],
     max_active_runs=1,
     default_args=args,
     doc_md=__doc__,
diff --git a/dags/dag_simel_mhcil_nas_mirror_from_minio.py b/dags/dag_simel_mhcil_nas_mirror_from_minio.py
index 2eb09c54..60bd2fc2 100644
--- a/dags/dag_simel_mhcil_nas_mirror_from_minio.py
+++ b/dags/dag_simel_mhcil_nas_mirror_from_minio.py
@@ -49,13 +49,13 @@ def get_random_moll():
     schedule_interval="@daily",
     catchup=False,
     tags=[
-        "curve:MHCIL",
-        "scope:Jardiner",
-        "scope:Mercat",
-        "ingesta:historic",
-        "ingesta:minio",
-        "source:SIMEL",
-        "source:NAS",
+        "ingesta",
+        "project:jardiner",
+        "project:mercat",
+        "source:minio",
+        "source:mhcil",
+        "source:simel",
+        "source:nas",
     ],
     default_args=args,
 ) as dag:
diff --git a/dags/dbt_test_quality_dset.py b/dags/dbt_test_quality_dset.py
index 28ed40cf..aaad2fad 100644
--- a/dags/dbt_test_quality_dset.py
+++ b/dags/dbt_test_quality_dset.py
@@ -69,7 +69,13 @@ def dbapi_to_dict(dbapi: str):
     start_date=datetime(2023, 12, 1, 0, 0, 0),
     schedule_interval="0 3 * * *",
     catchup=False,
-    tags=["Plantmonitor", "Jardiner", "test", "dbt", "data_quality"],
+    tags=[
+        "project:plantmonitor",
+        "project:jardiner",
+        "dbt-test",
+        "dbt",
+        "data-quality",
+    ],
     default_args=args,
     doc_md=__doc__,
 ) as dag:
diff --git a/dags/dset_materialize_dag.py b/dags/dset_materialize_dag.py
index 06de6e80..816584d5 100644
--- a/dags/dset_materialize_dag.py
+++ b/dags/dset_materialize_dag.py
@@ -67,7 +67,13 @@ def dbapi_to_dict(dbapi: str):
     start_date=datetime(2023, 1, 10),
     schedule_interval="3/10 * * * *",
     catchup=False,
-    tags=["scope:Plantmonitor", "scope:Jardiner", "dbt", "source:DSET"],
+    tags=[
+        "project:plantmonitor",
+        "project:jardiner",
+        "dbt",
+        "dbt-run",
+        "source:dset-api",
+    ],
     max_active_runs=1,
     default_args=args,
     doc_md=__doc__,
diff --git a/dags/generate_gitlab_ci_pages.py b/dags/generate_gitlab_ci_pages.py
index a4fabd88..e7353dca 100644
--- a/dags/generate_gitlab_ci_pages.py
+++ b/dags/generate_gitlab_ci_pages.py
@@ -29,7 +29,12 @@
     start_date=dt.datetime(2023, 9, 27),
     catchup=False,
     dagrun_timeout=dt.timedelta(minutes=10),
-    tags=["jardiner", "gitlab-ci", "pages"],
+    tags=[
+        "project:jardiner",
+        "gitlab-ci",
+        "gitlab-pages",
+        "experimental",
+    ],
     max_active_runs=1,
 ) as dag:
     TOKEN = Variable.get("GITLAB_CI_TOKEN_JARDINER_PAGES")
diff --git a/dags/jardineria_dag.py b/dags/jardineria_dag.py
deleted file mode 100644
index dc87807f..00000000
--- a/dags/jardineria_dag.py
+++ /dev/null
@@ -1,81 +0,0 @@
-import random
-from datetime import datetime, timedelta
-
-from airflow import DAG
-from airflow.models import Variable
-from airflow.providers.docker.operators.docker import DockerOperator
-from docker.types import DriverConfig, Mount
-
-my_email = Variable.get("fail_email")
-addr = Variable.get("repo_server_url")
-
-smtp = dict(
-    host=Variable.get("notifier_smtp_url"),
-    port=Variable.get("notifier_smtp_port"),
-    user=Variable.get("notifier_smtp_user"),
-    password=Variable.get("notifier_smtp_password"),
-)
-
-args = {
-    "email": my_email,
-    "email_on_failure": True,
-    "email_on_retry": False,
-    "retries": 5,
-    "retry_delay": timedelta(minutes=5),
-}
-
-
-nfs_config = {
-    "type": "nfs",
-    "o": f"addr={addr},nfsvers=4",
-    "device": ":/opt/airflow/repos",
-}
-
-
-def get_random_moll():
-    available_molls = Variable.get("available_molls").split()
-    # trunk-ignore(bandit/B311)
-    return random.choice(available_molls)
-
-
-driver_config = DriverConfig(name="local", options=nfs_config)
-mount_nfs = Mount(
-    source="local", target="/repos", type="volume", driver_config=driver_config
-)
-
-
-with DAG(
-    dag_id="jardiner_dag_v2",
-    start_date=datetime(2022, 10, 21),
-    schedule_interval="@daily",
-    catchup=False,
-    tags=["Plantmonitor", "Jardiner"],
-    max_active_runs=1,
-    default_args=args,
-) as dag:
-    repo_name = "somenergia-jardiner"
-
-    sampled_moll = get_random_moll()
-
-    notify_alarms_task = DockerOperator(
-        api_version="auto",
-        task_id="jardineria",
-        docker_conn_id="somenergia_harbor_dades_registry",
-        image="{}/{}-app:latest".format(
-            "{{ conn.somenergia_harbor_dades_registry.host }}", repo_name
-        ),
-        working_dir=f"/repos/{repo_name}",
-        command=(
-            "python3 -m scripts.notify_alarms"
-            ' "{{ var.value.plantmonitor_db }}"'
-            ' "{{ var.value.novu_base_url }}"'
-            ' "{{ var.value.novu_api_key }}"'
-            ' "{{ var.value.plantmonitor_db_prod_schema }}"'
-        ),
-        docker_url=sampled_moll,
-        mounts=[mount_nfs],
-        mount_tmp_dir=False,
-        auto_remove=True,
-        retrieve_output=True,
-        trigger_rule="none_failed",
-    )
diff --git a/dags/plant_alerts.py b/dags/plant_alerts.py
deleted file mode 100644
index 071051b0..00000000
--- a/dags/plant_alerts.py
+++ /dev/null
@@ -1,179 +0,0 @@
-import random
-from datetime import datetime, timedelta
-
-from airflow import DAG
-from airflow.models import Variable
-from airflow.providers.docker.operators.docker import DockerOperator
-from docker.types import DriverConfig, Mount
-
-my_email = Variable.get("fail_email")
-addr = Variable.get("repo_server_url")
-
-smtp = dict(
-    host=Variable.get("notifier_smtp_url"),
-    port=Variable.get("notifier_smtp_port"),
-    user=Variable.get("notifier_smtp_user"),
-    password=Variable.get("notifier_smtp_password"),
-)
-
-args = {
-    "email": my_email,
-    "email_on_failure": True,
-    "email_on_retry": False,
-    "retries": 0,
-    "retry_delay": timedelta(minutes=5),
-}
-
-
-nfs_config = {
-    "type": "nfs",
-    "o": f"addr={addr},nfsvers=4",
-    "device": ":/opt/airflow/repos",
-}
-
-
-def get_random_moll():
-    available_molls = Variable.get("available_molls").split()
-    # trunk-ignore(bandit/B311)
-    return random.choice(available_molls)
-
-
-driver_config = DriverConfig(name="local", options=nfs_config)
-mount_nfs = Mount(
-    source="local", target="/repos", type="volume", driver_config=driver_config
-)
-
-
-with DAG(
-    dag_id="plant_alerts",
-    start_date=datetime(2022, 11, 17),
-    schedule_interval="3-59/5 * * * *",
-    catchup=False,
-    tags=["Plantmonitor", "Jardiner"],
-    max_active_runs=1,
-    default_args=args,
-) as dag:
-    repo_name = "somenergia-jardiner"
-
-    sampled_moll = get_random_moll()
-
-    alert_meter_no_readings_task = DockerOperator(
-        api_version="auto",
-        task_id="alert_meter_no_readings",
-        docker_conn_id="somenergia_harbor_dades_registry",
-        image="{}/{}-app:latest".format(
-            "{{ conn.somenergia_harbor_dades_registry.host }}", repo_name
-        ),
-        working_dir=f"/repos/{repo_name}",
-        command=(
-            "python3 -m scripts.notify_alert"
-            " {{ var.value.plantmonitor_db }}"
-            " {{ var.value.novu_base_url }}"
-            " {{ var.value.novu_api_key }}"
-            " {{ var.value.plantmonitor_db_prod_schema }}"
-            " alert_meter_no_readings True"
-        ),
-        docker_url=sampled_moll,
-        mounts=[mount_nfs],
-        mount_tmp_dir=False,
-        auto_remove=True,
-        retrieve_output=True,
-        trigger_rule="none_failed",
-    )
-
-    alert_inverter_zero_power_task = DockerOperator(
-        api_version="auto",
-        task_id="alert_inverter_zero_power",
-        docker_conn_id="somenergia_harbor_dades_registry",
-        image="{}/{}-app:latest".format(
-            "{{ conn.somenergia_harbor_dades_registry.host }}", repo_name
-        ),
-        working_dir=f"/repos/{repo_name}",
-        command=(
-            "python3 -m scripts.notify_alert "
-            " {{ var.value.plantmonitor_db }}"
-            " {{ var.value.novu_base_url }}"
-            " {{ var.value.novu_api_key }}"
-            " {{ var.value.plantmonitor_db_prod_schema }}"
-            " alert_inverter_zero_power_at_daylight True"
-        ),
-        docker_url=sampled_moll,
-        mounts=[mount_nfs],
-        mount_tmp_dir=False,
-        auto_remove=True,
-        retrieve_output=True,
-        trigger_rule="none_failed",
-    )
-
-    alert_meter_zero_energy_task = DockerOperator(
-        api_version="auto",
-        task_id="alert_meter_zero_energy",
-        docker_conn_id="somenergia_harbor_dades_registry",
-        image="{}/{}-app:latest".format(
-            "{{ conn.somenergia_harbor_dades_registry.host }}", repo_name
-        ),
-        working_dir=f"/repos/{repo_name}",
-        command=(
-            "python3 -m scripts.notify_alert"
-            " {{ var.value.plantmonitor_db }}"
-            " {{ var.value.novu_base_url }}"
-            " {{ var.value.novu_api_key }}"
-            " {{ var.value.plantmonitor_db_prod_schema }}"
-            " alert_meter_zero_energy True"
-        ),
-        docker_url=sampled_moll,
-        mounts=[mount_nfs],
-        mount_tmp_dir=False,
-        auto_remove=True,
-        retrieve_output=True,
-        trigger_rule="none_failed",
-    )
-
-    alert_inverter_temperature_task = DockerOperator(
-        api_version="auto",
-        task_id="alert_inverter_temperature",
-        docker_conn_id="somenergia_harbor_dades_registry",
-        image="{}/{}-app:latest".format(
-            "{{ conn.somenergia_harbor_dades_registry.host }}", repo_name
-        ),
-        working_dir=f"/repos/{repo_name}",
-        command=(
-            "python3 -m scripts.notify_alert"
-            " {{ var.value.plantmonitor_db }}"
-            " {{ var.value.novu_base_url }}"
-            " {{ var.value.novu_api_key }}"
-            " {{ var.value.plantmonitor_db_prod_schema }}"
-            " alert_inverter_temperature False"
-        ),
-        docker_url=sampled_moll,
-        mounts=[mount_nfs],
-        mount_tmp_dir=False,
-        auto_remove=True,
-        retrieve_output=True,
-        trigger_rule="none_failed",
-    )
-
-    alert_inverter_interinverter_relative_temperature_task = DockerOperator(
-        api_version="auto",
-        task_id="alert_inverter_interinverter_relative_temperature",
-        docker_conn_id="somenergia_harbor_dades_registry",
-        image="{}/{}-app:latest".format(
-            "{{ conn.somenergia_harbor_dades_registry.host }}",
-            repo_name,
-        ),
-        working_dir=f"/repos/{repo_name}",
-        command=(
-            "python3 -m scripts.notify_alert "
-            " {{ var.value.plantmonitor_db }}"
-            " {{ var.value.novu_base_url }}"
-            " {{ var.value.novu_api_key }}"
-            " {{ var.value.plantmonitor_db_prod_schema }}"
-            " alert_inverter_interinverter_relative_temperature False"
-        ),
-        docker_url=sampled_moll,
-        mounts=[mount_nfs],
-        mount_tmp_dir=False,
-        auto_remove=True,
-        retrieve_output=True,
-        trigger_rule="none_failed",
-    )
diff --git a/dags/plant_production_datasets_dag.py b/dags/plant_production_datasets_dag.py
index cd583225..dcb0e018 100644
--- a/dags/plant_production_datasets_dag.py
+++ b/dags/plant_production_datasets_dag.py
@@ -58,7 +58,7 @@ def dbapi_to_dict(dbapi: str):
     start_date=datetime(2023, 1, 10),
     schedule_interval="@daily",
     catchup=False,
-    tags=["Plantmonitor", "Jardiner", "Transform", "DBT"],
+    tags=["project:plantmonitor", "project:jardiner", "dbt", "dbt-run"],
     default_args=args,
 ) as dag:
     repo_name = "somenergia-jardiner"
@@ -91,7 +91,7 @@ def dbapi_to_dict(dbapi: str):
             "dbt run"
             " --profiles-dir config"
             " --target prod"
-            " --select tag:legacy,plant_production_hourly+"
+            " --select tag:legacy,plant_production_hourly+ elementary"
         ),
         docker_url=sampled_moll,
         mounts=[mount_nfs],
diff --git a/dags/plant_production_datasets_jardiner_dag.py b/dags/plant_production_datasets_jardiner_dag.py
index 8cc061f7..c301d6b2 100644
--- a/dags/plant_production_datasets_jardiner_dag.py
+++ b/dags/plant_production_datasets_jardiner_dag.py
@@ -44,9 +44,11 @@ def dbapi_to_dict(dbapi: str):
     return {
         "provider": parsed_string.scheme,
         "user": parsed_string.username,
-        "password": urllib.parse.unquote(parsed_string.password)
-        if parsed_string.password
-        else None,
+        "password": (
+            urllib.parse.unquote(parsed_string.password)
+            if parsed_string.password
+            else None
+        ),
         "host": parsed_string.hostname,
         "port": parsed_string.port,
         "database": parsed_string.path[1:],
@@ -58,7 +60,7 @@ def dbapi_to_dict(dbapi: str):
     start_date=datetime(2023, 1, 10),
     schedule_interval="30 * * * *",
     catchup=False,
-    tags=["Plantmonitor", "Jardiner", "Transform", "dbt"],
+    tags=["project:plantmonitor", "project:jardiner", "dbt", "dbt-run"],
     max_active_runs=1,
     default_args=args,
 ) as dag:
@@ -94,6 +96,7 @@ def dbapi_to_dict(dbapi: str):
             " --target prod"
             " --models tag:jardiner,config.materialized:table+"
             " tag:jardiner,config.materialized:incremental+"
+            " elementary"
             " --exclude tag:dset_responses_fresh"
         ),
         docker_url=sampled_moll,
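Both DAG commands above gain a trailing `elementary` in the dbt selection. dbt unions space-separated selection arguments, so the Elementary package's own models (which persist run and test results) are rebuilt in the same invocation as the selected project models. A hedged manual equivalent of the first DAG's command:

```bash
# Union selection: the tagged project models and their children, plus every
# model in the elementary package, in a single dbt invocation.
dbt run --profiles-dir config --target prod \
  --select tag:legacy,plant_production_hourly+ elementary
```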
diff --git a/dbt_jardiner/config/profiles.yml b/dbt_jardiner/config/profiles.yml
index 9ae6567e..ebed5652 100644
--- a/dbt_jardiner/config/profiles.yml
+++ b/dbt_jardiner/config/profiles.yml
@@ -21,7 +21,17 @@ dbt_jardiner:
       dbname: "{{ env_var('DBNAME') }}"
       schema: dbt_pre

-  target: pre
+    test:
+      type: postgres
+      threads: 1
+      host: "{{ env_var('DBHOST') }}"
+      port: "{{ env_var('DBPORT') | as_number }}"
+      user: "{{ env_var('DBUSER') }}"
+      password: "{{ env_var('DBPASSWORD') }}"
+      dbname: "{{ env_var('DBNAME') }}"
+      schema: dbt_test
+
+  target: test

 elementary:
   outputs:
@@ -47,6 +57,17 @@ elementary:
       threads: 4
       keepalives_idle: 0 # default 0 seconds
       connect_timeout: 10 # default 10 seconds
+    test:
+      type: "postgres"
+      host: "{{ env_var('DBHOST') }}"
+      port: "{{ env_var('DBPORT') | as_number }}"
+      user: "{{ env_var('DBUSER') }}"
+      password: "{{ env_var('DBPASSWORD') }}"
+      dbname: "{{ env_var('DBNAME') }}"
+      schema: "dbt_test_elementary"
+      threads: 4
+      keepalives_idle: 0 # default 0 seconds
+      connect_timeout: 10 # default 10 seconds

 config:
   send_anonymous_usage_stats: false
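The new `test` outputs give both profiles an isolated pair of schemas (`dbt_test` and `dbt_test_elementary`), and `target: test` makes them the default for local runs. A minimal sketch of exercising them, assuming the same connection env vars the profile reads through `env_var()`; the exported values are placeholders, not real credentials:

```bash
# Placeholder connection settings; the profile resolves each via env_var().
export DBHOST=localhost DBPORT=5432 DBUSER=dbt DBPASSWORD=changeme DBNAME=jardiner

# Builds into the dbt_test schema by default; pass --target pre or prod to
# aim at the other outputs instead.
dbt build --profiles-dir config --target test
```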
diff --git a/scripts/.gitkeep b/scripts/.gitkeep
new file mode 100644
index 00000000..e69de29b
diff --git a/scripts/notify_alert.py b/scripts/notify_alert.py
deleted file mode 100644
index f94831ed..00000000
--- a/scripts/notify_alert.py
+++ /dev/null
@@ -1,230 +0,0 @@
-import json
-import logging
-from typing import List
-
-import numpy as np
-import pandas as pd
-import requests
-import sqlalchemy
-import typer
-
-logging.basicConfig(
-    level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
-)
-
-app = typer.Typer()
-
-
-def notify_subscriber(url, api_key, payload, email, alert):
-    alert = alert.replace("_", "")
-    headers = {"Authorization": f"ApiKey {api_key}"}
-    data = {
-        "name": alert,
-        "to": {"subscriberId": "recipient_lucia", "email": email},
-        "payload": {"alerts": payload},
-    }
-
-    response = requests.post(url, headers=headers, json=data)
-    logging.info(response.text)
-    response.raise_for_status()
-
-    return response
-
-
-def notify_topic(url, api_key, payload, topic_ids, alert):
-    alert = alert.replace("_", "")
-    headers = {"Authorization": f"ApiKey {api_key}"}
-    data = {
-        "name": alert,
-        "to": [{"type": "Topic", "topicKey": topic_id} for topic_id in topic_ids],
-        "payload": {"alerts": payload},
-    }
-
-    topic_url = f"{url}/events/trigger"
-
-    response = requests.post(topic_url, headers=headers, json=data)
-    logging.info(response.text)
-    response.raise_for_status()
-
-    return response
-
-
-def refresh_notification_table(con, schema, alertdf, alert_name):
-    alertdf_new = alertdf
-
-    table_name = alert_name + "_status"
-
-    table_exists = True
-    try:
-        alert_status_df_old = pd.read_sql_table(
-            con=con, table_name=table_name, schema=schema
-        )
-    except ValueError:
-        table_exists = False
-
-    if table_exists:
-        alertdf_new_clean = alertdf_new.drop(columns=["time"])
-        alert_status_df_old_clean = alert_status_df_old.copy()
-
-        alertdf_new_clean_merged = alertdf_new_clean.merge(
-            alert_status_df_old_clean,
-            how="left",
-            on=["plant_id", "plant_name", "device_type", "device_name", "alarm_name"],
-        )
-        alertdf_new_clean["is_alarmed"] = np.where(
-            alertdf_new_clean_merged["is_alarmed_x"].isnull(),
-            alertdf_new_clean_merged["is_alarmed_y"],
-            alertdf_new_clean_merged["is_alarmed_x"],
-        )
-        # If still NULL then False (rare case without readings 30 days before):
-        alertdf_new_clean["is_alarmed"] = np.where(
-            alertdf_new_clean["is_alarmed"].isnull(),
-            False,
-            alertdf_new_clean["is_alarmed"],
-        )
-
-        # TODO: to solve the futurewarning of bool reduction we might have to
-        # cast is_alarmed column to bool
-        # (since it can't have nulls at this point)
-
-        alert_status_df_old_clean["xgroupby"] = "old"
-        alertdf_new_clean["xgroupby"] = "new"
-        df = pd.concat([alert_status_df_old_clean, alertdf_new_clean]).reset_index(
-            drop=True
-        )
-        groupby_df = df.groupby(
-            ["plant_id", "plant_name", "device_type", "device_name", "is_alarmed"]
-        )
-        difference_rows = [x[0] for x in groupby_df.groups.values() if len(x) == 1]
-        df = df.reindex(difference_rows)
-        alertdf_diff = df[df["xgroupby"] == "new"].drop("xgroupby", axis=1)
-        alertdf_new_clean.drop("xgroupby", axis=1, inplace=True)
-
-    else:
-        alertdf_new["is_alarmed"] = False
-        alertdf_new_clean = alertdf_new.copy()
-        alertdf_new_clean.drop(columns=["time"], inplace=True)
-        alertdf_diff = alertdf_new.copy()
-        alertdf_diff.drop(columns=["time"], inplace=True)
-
-    alertdf_diff["notification_time"] = pd.Timestamp.utcnow()
-
-    alertdf_diff.to_sql(
-        con=con,
-        name=alert_name + "_historic",
-        if_exists="append",
-        schema=schema,
-        index=False,
-    )
-    alertdf_new_clean.to_sql(
-        con=con, name=table_name, if_exists="replace", schema=schema, index=False
-    )
-
-    return alertdf_new, alertdf_diff
-
-
-def df_notify_topic_if_diff(alertdf_diff, novu_base_url, api_key, topic_ids, alert):
-    alertjson = alertdf_diff.to_json(orient="table")
-    alertdata = json.loads(alertjson)["data"]
-
-    if len(alertdf_diff) > 0:
-        notify_topic(novu_base_url, api_key, alertdata, topic_ids, alert)
-        logging.info(f"Alert {alert} notifed.")
-        return topic_ids
-    else:
-        logging.info(f"No alert {alert} to notify.")
-        return []
-
-
-def evaluate_and_notify_alarm(
-    conn,
-    novu_base_url: str,
-    api_key: str,
-    schema: str,
-    alert: str,
-    to_notify: str,
-    default_topic_ids: List[str] = None,
-):
-    topic_ids = default_topic_ids or ["dades", "gestio_dactius"]
-
-    alertdf = pd.read_sql_table(alert, conn, schema=schema)
-    alertdf = alertdf.filter(
-        items=[
-            "time",
-            "plant_id",
-            "plant_name",
-            "device_type",
-            "device_name",
-            "alarm_name",
-            "is_alarmed",
-        ]
-    )
-    if len(alertdf) == 0:
-        logging.info(f"No alert {alert} returned..")
-        return []
-
-    alertdf, alertdf_diff = refresh_notification_table(
-        con=conn, schema=schema, alertdf=alertdf, alert_name=alert
-    )
-    alertdf_diff["color"] = np.where(
-        alertdf_diff["is_alarmed"].astype(bool),
-        "#FD0D0D",
-        "#2F9905",
-    )
-
-    if not to_notify:
-        logging.info(f"Alert {alert} would notify but to_notify is False.")
-        return []
-
-    notified = df_notify_topic_if_diff(
-        alertdf_diff, novu_base_url, api_key, topic_ids, alert
-    )
-
-    notification_topics_df = pd.read_sql_table(
-        "plant_topic_association", conn, schema=schema
-    )
-
-    plants_by_topic = (
-        notification_topics_df.groupby("notification_topic")["plant_id"]
-        .apply(list)
-        .to_dict()
-    )
-
-    for notification_topic, plants in plants_by_topic.items():
-        sub_alertdf_diff = alertdf_diff[alertdf_diff["plant_id"].isin(plants)]
-
-        sub_notified = df_notify_topic_if_diff(
-            sub_alertdf_diff, novu_base_url, api_key, [notification_topic], alert
-        )
-
-        notified = notified + sub_notified
-
-    return notified
-
-
-@app.command()
-def refresh_alert(
-    plantmonitor_db: str,
-    novu_base_url: str,
-    api_key: str,
-    schema: str,
-    alert: str,
-    to_notify: bool,
-):
-    logging.info(f"Got {novu_base_url} and {api_key}")
-    dbapi = (
-        plantmonitor_db  # pending implement custom function jardiner.utils get_dbapi
-    )
-    db_engine = sqlalchemy.create_engine(dbapi)
-    with db_engine.begin() as conn:
-        notified = evaluate_and_notify_alarm(
-            conn, novu_base_url, api_key, schema, alert, to_notify
-        )
-
-    logging.info(f"Notified topics: {notified}")
-
-    return True
-
-
-if __name__ == "__main__":
-    app()