🎉 anomalist: CLI for creating anomalies (#3381)
* 🎉 Add CLI for running anomaly detectors

* merge with wizard-anomalist, pass ci/cd

---------

Co-authored-by: lucasrodes <[email protected]>
Marigold and lucasrodes authored Oct 10, 2024
1 parent b3338d8 commit 9a4447b
Showing 7 changed files with 481 additions and 1 deletion.
Empty file added apps/anomalist/__init__.py
24 changes: 24 additions & 0 deletions apps/anomalist/bard_anomaly.py
@@ -0,0 +1,24 @@
import pandas as pd

from etl import grapher_model as gm

from .gp_anomaly import AnomalyDetector


class NaNAnomalyDetector(AnomalyDetector):
    anomaly_type = "nan"

    def get_score_df(self, df: pd.DataFrame, variables: list[gm.Variable]) -> pd.DataFrame:
        return df.isnull().astype(float)


# Placeholder detectors: they declare an anomaly type but do not implement
# get_score_df yet.
class LostAnomalyDetector(AnomalyDetector):
    anomaly_type = "lost"


class VersionChangeAnomalyDetector(AnomalyDetector):
    anomaly_type = "version_change"


class TimeChangeAnomalyDetector(AnomalyDetector):
    anomaly_type = "time_change"
220 changes: 220 additions & 0 deletions apps/anomalist/cli.py
@@ -0,0 +1,220 @@
from typing import Literal, Optional, get_args

import click
import pandas as pd
import structlog
from joblib import Memory
from rich_click.rich_command import RichCommand
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session

from etl import grapher_model as gm
from etl.db import get_engine, read_sql
from etl.grapher_io import variable_data_df_from_s3
from etl.paths import CACHE_DIR

from .bard_anomaly import NaNAnomalyDetector
from .gp_anomaly import GPAnomalyDetector, SampleAnomalyDetector

log = structlog.get_logger()

memory = Memory(CACHE_DIR, verbose=0)

ANOMALY_TYPE = Literal["sample", "gp", "nan"]


@click.command(name="anomalist", cls=RichCommand, help=__doc__)
@click.option(
    "--type",
    type=click.Choice(get_args(ANOMALY_TYPE)),
    multiple=True,
    help="Type of anomaly detection algorithm to use.",
)
@click.option(
    "--dataset-id",
    type=int,
    help="Generate anomalies for a specific dataset ID.",
)
@click.option(
    "--previous-dataset-id",
    type=int,
    help="Dataset ID of the previous version.",
)
@click.option(
    "--variable-id",
    type=int,
    multiple=True,
    help="Generate anomalies for a list of variable IDs.",
)
@click.option(
    "--dry-run/--no-dry-run",
    default=False,
    type=bool,
    help="Do not write to target database.",
)
@click.option(
    "--reset-db/--no-reset-db",
    default=False,
    type=bool,
    help="Drop anomalies table and recreate it. This is useful for development when the schema changes.",
)
def cli(
    type: Optional[ANOMALY_TYPE],
    dataset_id: Optional[int],
    previous_dataset_id: Optional[int],
    variable_id: Optional[list[int]],
    dry_run: bool,
    reset_db: bool,
) -> None:
"""TBD
TBD
**Example 1:** Create random anomaly for a dataset
```
$ etl anomalist --type sample --dataset-id 6369
```
**Example 2:** Create GP anomalies
```
$ etl anomalist --type gp --dataset-id 6369
```
**Example 3:** Create anomalies by comparing dataset to its previous version
```
$ etl anomalist --type gp --previous-dataset-id 6322 --dataset-id 6589
```
"""
    engine = get_engine()

    if reset_db:
        # Drop the 'anomalies' table if it exists
        gm.Anomaly.__table__.drop(engine, checkfirst=True)  # type: ignore

        # Create the 'anomalies' table
        gm.Anomaly.__table__.create(engine)  # type: ignore
        return

    assert type, "Anomaly type must be specified."

    # load metadata
    variables = _load_variables_meta(engine, dataset_id, variable_id)

    # set dataset_id if we're using variables
    if not dataset_id:
        assert set(v.datasetId for v in variables) == {variables[0].datasetId}
        dataset_id = variables[0].datasetId

    log.info("Detecting anomalies")
    anomalies = []

    for typ in type:
        if typ == "gp":
            detector = GPAnomalyDetector()
        elif typ == "sample":
            detector = SampleAnomalyDetector()
        elif typ == "nan":
            detector = NaNAnomalyDetector()
        else:
            raise ValueError(f"Unsupported anomaly type: {typ}")

        # dataframe with (entityName, year) as index and variableId as columns
        log.info("Loading data from S3")
        df = load_data_for_variables(engine, variables)

        # detect anomalies
        log.info("Detecting anomalies")
        # the output has the same shape as the input dataframe, but we should make
        # it possible to return anomalies in a long format (for detectors that return
        # just a few anomalies)
        df_score = detector.get_score_df(df, variables)

        # validate format of the output dataframe
        # TODO

        anomaly = gm.Anomaly(
            datasetId=dataset_id,
            anomalyType=detector.anomaly_type,
        )
        anomaly.dfScore = df_score

        anomalies.append(anomaly)

    if dry_run:
        for anomaly in anomalies:
            log.info(anomaly)
    else:
        with Session(engine) as session:
            log.info("Deleting existing anomalies")
            session.query(gm.Anomaly).filter(
                gm.Anomaly.datasetId == dataset_id,
                gm.Anomaly.anomalyType.in_([a.anomalyType for a in anomalies]),
            ).delete(synchronize_session=False)
            session.commit()

            # Insert new anomalies
            log.info("Writing anomalies to database")
            session.add_all(anomalies)
            session.commit()


# @memory.cache
def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.DataFrame:
    # TODO: cache this on disk & re-validate with etags
    df_long = variable_data_df_from_s3(engine, [v.id for v in variables])

    # pivot dataframe
    df = df_long.pivot(index=["entityName", "year"], columns="variableId", values="value")

    # reorder in the same order as variables
    df = df[[v.id for v in variables]]

    # try converting to numeric
    df = df.astype(float)

    # TODO:
    # remove countries with all nulls or all zeros or constant values
    # df = df.loc[:, df.fillna(0).std(axis=0) != 0]

    return df


@memory.cache
def _load_variables_meta(
    engine: Engine, dataset_id: Optional[int], variable_ids: Optional[list[int]]
) -> list[gm.Variable]:
    if dataset_id and variable_ids:
        raise ValueError("Cannot specify both dataset ID and variable IDs.")

    if variable_ids:
        q = """
        select id from variables
        where id in %(variable_ids)s
        """
    elif dataset_id:
        q = """
        select id from variables
        where datasetId = %(dataset_id)s
        """
    # load all variables from a random dataset
    else:
        q = """
        with t as (
            select id from datasets order by rand() limit 1
        )
        select id from variables
        where datasetId in (select id from t)
        """

    df = read_sql(q, engine, params={"variable_ids": variable_ids, "dataset_id": dataset_id})

    # select all variables using SQLAlchemy
    with Session(engine) as session:
        return gm.Variable.load_variables(session, list(df["id"]))


if __name__ == "__main__":
    cli()
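The key data-shaping step in `load_data_for_variables` above is the long-to-wide pivot: `variable_data_df_from_s3` returns one row per (variable, entity, year) observation, while the detectors expect one column per variable. A minimal sketch of that pivot, with made-up values standing in for what the S3 call returns:

```python
import pandas as pd

# Long format, one row per observation, as variable_data_df_from_s3
# returns it (values here are made up for illustration).
df_long = pd.DataFrame(
    {
        "entityName": ["France", "France", "Spain", "Spain"],
        "year": [2000, 2001, 2000, 2001],
        "variableId": [101, 101, 102, 102],
        "value": ["1.5", "2.0", "0.9", None],
    }
)

# Same pivot as load_data_for_variables: (entityName, year) index,
# one column per variable.
df = df_long.pivot(index=["entityName", "year"], columns="variableId", values="value")

# Mirrors the function's "try converting to numeric" step:
# None and missing (entity, year) combinations become NaN.
df = df.astype(float)
print(df)
```

The float coercion is what makes the NaN detector's `df.isnull()` scoring meaningful on the resulting frame.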
49 changes: 49 additions & 0 deletions apps/anomalist/explore.ipynb
@@ -0,0 +1,49 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# from myml.nbinit import *\n",
    "import pandas as pd\n",
    "from apps.anomalist.gp_anomaly import GPAnomalyDetector\n",
    "from apps.anomalist.cli import load_data_for_variables\n",
    "from etl.db import get_engine\n",
    "from etl import grapher_model as gm\n",
    "from sqlalchemy.orm import Session\n",
    "\n",
    "engine = get_engine()\n",
    "\n",
    "# get random dataset and random variable\n",
    "q = \"\"\"\n",
    "with t as (\n",
    "    select id from datasets order by rand() limit 1\n",
    ")\n",
    "select id from variables\n",
    "where datasetId in (select id from t)\n",
    "order by rand()\n",
    "limit 1\n",
    "\"\"\"\n",
    "\n",
    "mf = pd.read_sql(q, engine)\n",
    "variable_id = mf.id[0]\n",
    "\n",
    "with Session(engine) as session:\n",
    "    variable = gm.Variable.load_variable(session, variable_id)\n",
    "\n",
    "# load_data_for_variables expects a list of variables\n",
    "df = load_data_for_variables(engine, [variable])\n",
    "\n",
    "gp = GPAnomalyDetector()\n",
    "gp.viz(df, variable)"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}