🎉 wizard: anomalist (v2) #3388

Merged · 36 commits · Oct 21, 2024
Commits
1835b2a
🎉 wizard: anomalist (v2)
lucasrodes Oct 9, 2024
b382d6c
show anomalist
lucasrodes Oct 9, 2024
b3338d8
Merge branch 'master' into wizard-anomalist
lucasrodes Oct 10, 2024
9a4447b
🎉 anomalist: CLI for creating anomalies (#3381)
Marigold Oct 10, 2024
d09cdbb
✨ indicator-upgrader: store variable mapping (#3389)
lucasrodes Oct 10, 2024
925936e
✨ wizard: anomalist ui (#3394)
lucasrodes Oct 10, 2024
4e6aaf5
use large blob for dfScore
Marigold Oct 11, 2024
6333bf4
🎉 anomalist: Detect new datasets automatically (#3396)
pabloarosado Oct 11, 2024
c224fb7
Merge branch 'master' of github.com:owid/etl into wizard-anomalist
pabloarosado Oct 11, 2024
4ed14b3
🎉 anomalist: Detect abrupt changes in consecutive versions of an indi…
pabloarosado Oct 11, 2024
5f3e9b6
Merge branch 'master' into wizard-anomalist
lucasrodes Oct 11, 2024
a8d77ea
Merge branch 'wizard-anomalist' of https://github.com/owid/etl into w…
lucasrodes Oct 11, 2024
b50c1a1
🎉 anomalist: Improve anomalist CLI (#3399)
pabloarosado Oct 11, 2024
f9f5668
✨ wizard: anomalist ui (2) (#3395)
lucasrodes Oct 13, 2024
a552776
Merge branch 'master' into wizard-anomalist
lucasrodes Oct 14, 2024
991d295
✨ anomalist: ui flow (#3402)
lucasrodes Oct 14, 2024
47ac33e
🎉 anomalist: Improve Anomalist backend (#3405)
pabloarosado Oct 15, 2024
8a6569b
✨ wizard: improve app flow (#3407)
lucasrodes Oct 15, 2024
09184fc
Merge branch 'master' into wizard-anomalist
lucasrodes Oct 15, 2024
42bab09
✨ Integrate GP outlier detector (#3411)
Marigold Oct 16, 2024
ad5aa7e
✨ anomalist: stop using mock data (#3410)
lucasrodes Oct 16, 2024
2d98d3f
🐛 anomalist: Fix unknown variable ids (#3413)
pabloarosado Oct 16, 2024
a163c35
✨ anomalist: minor tweaks
lucasrodes Oct 16, 2024
786f81a
fix type
lucasrodes Oct 16, 2024
aab0dc2
Merge branch 'master' into wizard-anomalist
lucasrodes Oct 16, 2024
fe3027b
✨ anomalist: GP support, refactor functions, add dfReduced (#3416)
lucasrodes Oct 16, 2024
000d0c5
🎉 anomalist: Add population and analytics scores (#3412)
pabloarosado Oct 16, 2024
f5b58a9
✨ anomalist: test llms for summary (#3417)
lucasrodes Oct 16, 2024
885a6b5
✨ Add max_time and n_jobs to GP (#3422)
Marigold Oct 16, 2024
9e1caac
🐛 Fix anomalist bugs (#3427)
Marigold Oct 18, 2024
45ed79a
🎉 anomalist: Experiment with different anomaly detection methods (#3420)
pabloarosado Oct 18, 2024
0476593
✨ Add anomalist to owidbot (#3431)
Marigold Oct 21, 2024
8caeb49
🐛 anomalist: Fix bug with unknown indicators and long loading time (#…
pabloarosado Oct 21, 2024
92adc85
✨ anomalist: Small improvement in Anomalist filters (#3437)
pabloarosado Oct 21, 2024
f0c131c
✨ Persist filter values in the URL for Anomalist (#3441)
Marigold Oct 21, 2024
b00bf99
Merge remote-tracking branch 'origin/master' into wizard-anomalist
Marigold Oct 21, 2024
Empty file added: apps/anomalist/__init__.py
452 changes: 452 additions & 0 deletions apps/anomalist/anomalist_api.py

Large diffs are not rendered by default.

127 changes: 127 additions & 0 deletions apps/anomalist/cli.py
@@ -0,0 +1,127 @@
import json
from typing import Optional, Tuple, get_args

import click
import structlog
from joblib import Memory
from rich_click.rich_command import RichCommand

from apps.anomalist.anomalist_api import ANOMALY_TYPE, anomaly_detection
from etl.db import get_engine, read_sql
from etl.paths import CACHE_DIR

log = structlog.get_logger()

memory = Memory(CACHE_DIR, verbose=0)


@click.command(name="anomalist", cls=RichCommand, help=anomaly_detection.__doc__)
@click.option(
    "--anomaly-types",
    type=click.Choice(list(get_args(ANOMALY_TYPE))),
    multiple=True,
    default=None,
    help="Type (or types) of anomaly detection algorithm to use.",
)
@click.option(
    "--dataset-ids",
    type=int,
    multiple=True,
    default=None,
    help="Generate anomalies for the variables of a specific dataset ID (or multiple dataset IDs).",
)
@click.option(
    "--variable-mapping",
    type=str,
    default="",
    help="Optional JSON dictionary mapping variable IDs from a previous to a new version (where at least some of the new variable IDs must belong to the datasets whose IDs were given).",
)
@click.option(
    "--variable-ids",
    type=int,
    multiple=True,
    default=None,
    help="Generate anomalies for a list of variable IDs (in addition to the ones from dataset ID, if any dataset was given).",
)
@click.option(
    "--dry-run/--no-dry-run",
    default=False,
    type=bool,
    help="Do not write to target database.",
)
@click.option(
    "--force",
    "-f",
    is_flag=True,
    help="TBD",
)
@click.option(
    "--reset-db/--no-reset-db",
    default=False,
    type=bool,
    help="Drop anomalies table and recreate it. This is useful for development when the schema changes.",
)
def cli(
    anomaly_types: Optional[Tuple[str, ...]],
    dataset_ids: Optional[list[int]],
    variable_mapping: str,
    variable_ids: Optional[list[int]],
    dry_run: bool,
    force: bool,
    reset_db: bool,
) -> None:
"""TBD

TBD

**Example 1:** Create random anomaly for a dataset

```
$ etl anomalist --anomaly-type sample --dataset-ids 6369
```

**Example 2:** Create GP anomalies

```
$ etl anomalist --anomaly-type gp --dataset-ids 6369
```

**Example 3:** Create anomalies by comparing dataset to its previous version

```
$ etl anomalist --anomaly-type gp --dataset-ids 6589
```
"""
    # Convert variable mapping from JSON to dictionary.
    if variable_mapping:
        try:
            variable_mapping_dict = {
                int(variable_old): int(variable_new)
                for variable_old, variable_new in json.loads(variable_mapping).items()
            }
        except json.JSONDecodeError:
            raise ValueError("Invalid JSON format for variable_mapping.")
    else:
        variable_mapping_dict = {}

    # Load all variables from given datasets.
    if dataset_ids:
        assert not variable_ids, "Cannot specify both dataset IDs and variable IDs."
        q = """
        select id from variables
        where datasetId in %(dataset_ids)s
        """
        variable_ids = list(read_sql(q, get_engine(), params={"dataset_ids": dataset_ids})["id"])

    anomaly_detection(
        anomaly_types=anomaly_types,
        variable_mapping=variable_mapping_dict,
        variable_ids=list(variable_ids) if variable_ids else None,
        dry_run=dry_run,
        force=force,
        reset_db=reset_db,
    )


if __name__ == "__main__":
    cli()
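
A hypothetical combined invocation, sketched from the options above (the dataset ID comes from Example 3; the variable IDs in the mapping are invented for illustration):

```
$ etl anomalist \
    --anomaly-types gp \
    --dataset-ids 6589 \
    --variable-mapping '{"901": 902, "903": 904}'
```

Note that `--variable-mapping` takes a JSON dictionary mapping old variable IDs to new ones; keys and values are cast to `int`, and invalid JSON raises a `ValueError`.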
232 changes: 232 additions & 0 deletions apps/anomalist/detectors.py
@@ -0,0 +1,232 @@
from typing import Dict, List

import numpy as np
import pandas as pd
import structlog
from sklearn.ensemble import IsolationForest
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM
from tqdm.auto import tqdm

from etl.data_helpers.misc import bard

log = structlog.get_logger()

# Name of index columns for dataframe.
INDEX_COLUMNS = ["entity_name", "year"]


def estimate_bard_epsilon(series: pd.Series) -> float:
    # Make all values positive, and ignore zeros.
    positive_values = abs(series.dropna())
    # Ignore zeros, since they can lead to epsilon being zero, hence allowing division by zero in BARD.
    positive_values = positive_values.loc[positive_values > 0]
    # Estimate epsilon as the absolute range of values divided by 10.
    # eps = (positive_values.max() - positive_values.min()) / 10
    # Instead of just taking maximum and minimum, take 95th percentile and 5th percentile.
    eps = (positive_values.quantile(0.95) - positive_values.quantile(0.05)) / 10

    return eps


def get_long_format_score_df(df_score: pd.DataFrame) -> pd.DataFrame:
    # Output is already in long format.
    if set(df_score.columns) == {"entity_name", "year", "variable_id", "anomaly_score"}:
        df_score_long = df_score
    else:
        # Create a reduced score dataframe.
        df_score_long = df_score.melt(
            id_vars=["entity_name", "year"], var_name="variable_id", value_name="anomaly_score"
        )

    # Drop NaN anomalies.
    df_score_long = df_score_long.dropna(subset=["anomaly_score"])

    # Drop zero anomalies.
    df_score_long = df_score_long[df_score_long["anomaly_score"] != 0]

    # Save memory by converting to categoricals.
    df_score_long = df_score_long.astype({"entity_name": "category", "year": "category", "variable_id": "category"})

    return df_score_long


class AnomalyDetector:
    anomaly_type: str

    @staticmethod
    def get_text(entity: str, year: int) -> str:
        return f"Anomaly happened in {entity} in {year}!"

    def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
        raise NotImplementedError

    def get_zeros_df(self, df: pd.DataFrame, variable_ids: List[int]) -> pd.DataFrame:
        # Create a dataframe of zeros.
        df_zeros = pd.DataFrame(np.zeros_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids]
        df_zeros[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy()
        return df_zeros

    def get_nans_df(self, df: pd.DataFrame, variable_ids: List[int]) -> pd.DataFrame:
        # Create a dataframe of nans.
        df_nans = pd.DataFrame(np.empty_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids]
        df_nans[variable_ids] = np.nan
        df_nans[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy()
        return df_nans


class AnomalyUpgradeMissing(AnomalyDetector):
    """New data misses entity-years that used to exist in the old version."""

    anomaly_type = "upgrade_missing"

    @staticmethod
    def get_text(entity: str, year: int) -> str:
        return f"There are missing values for {entity}! There might be other data points affected."

    def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
        # Create a dataframe of zeros.
        df_lost = self.get_zeros_df(df, variable_ids)

        # Set to 1 all cells that had data in the old version but are missing in the new version.
        for variable_id_old, variable_id_new in variable_mapping.items():
            affected_rows = df[(df[variable_id_old].notnull()) & (df[variable_id_new].isnull())].index
            df_lost.loc[affected_rows, variable_id_new] = 1

        return df_lost


class AnomalyUpgradeChange(AnomalyDetector):
    """New dataframe has changed abruptly with respect to the old version."""

    anomaly_type = "upgrade_change"

    @staticmethod
    def get_text(entity: str, year: int) -> str:
        return f"There are abrupt changes for {entity} in {year}! There might be other data points affected."

    def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
        # Create a dataframe of zeros.
        df_version_change = self.get_zeros_df(df, variable_ids)

        for variable_id_old, variable_id_new in variable_mapping.items():
            # Calculate the BARD epsilon for each variable.
            eps = estimate_bard_epsilon(series=df[variable_id_new])
            # Calculate the BARD for each variable.
            variable_bard = bard(a=df[variable_id_old], b=df[variable_id_new], eps=eps)
            # Add BARD to the dataframe.
            df_version_change[variable_id_new] = variable_bard

        return df_version_change


class AnomalyTimeChange(AnomalyDetector):
    """New dataframe has abrupt changes in its time series."""

    anomaly_type = "time_change"

    @staticmethod
    def get_text(entity: str, year: int) -> str:
        return f"There are significant changes for {entity} in {year} compared to the previous data point. There might be other data points affected."

    def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
        # Create a dataframe of zeros.
        df_time_change = self.get_zeros_df(df, variable_ids)

        # Sanity check.
        error = "The function that detects abrupt time changes assumes the data is sorted by entity_name and year. But this is not the case. Either ensure the data is sorted, or fix the function."
        assert (df.sort_values(by=INDEX_COLUMNS).index == df.index).all(), error
        for variable_id in variable_ids:
            series = df[variable_id].copy()
            # Calculate the BARD epsilon for this variable.
            eps = estimate_bard_epsilon(series=series)
            # Calculate the BARD for this variable.
            _bard = bard(series, series.shift(), eps).fillna(0)

            # Add BARD to the dataframe.
            df_time_change[variable_id] = _bard
        # The previous procedure also computes the deviation between the last point of one entity
        # and the first point of the next, which is meaningless and can lead to a spuriously high BARD.
        # Therefore, set the first point of each entity_name to zero for all columns.
        df_time_change.loc[df_time_change["entity_name"] != df_time_change["entity_name"].shift(), variable_ids] = 0

        return df_time_change


class AnomalyIsolationForest(AnomalyDetector):
    """Anomaly detection using Isolation Forest, applied separately to each country-variable time series."""

    anomaly_type = "isolation_forest"

    def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
        # Initialize a dataframe of zeros.
        df_anomalies = self.get_zeros_df(df, variable_ids)

        # Initialize an imputer to handle missing values.
        imputer = SimpleImputer(strategy="mean")

        for variable_id in tqdm(variable_ids):
            for country, group in df.groupby("entity_name", observed=True):
                # Get the time series for the current country and variable.
                series = group[[variable_id]].copy()

                # Skip if the series is all zeros or NaNs.
                if ((series == 0).all().values) or (series.dropna().shape[0] == 0):
                    continue

                # Impute missing values for this country's time series.
                series_imputed = imputer.fit_transform(series)

                # Scale the data.
                scaler = StandardScaler()
                series_scaled = scaler.fit_transform(series_imputed)

                # Initialize the Isolation Forest model.
                isolation_forest = IsolationForest(contamination=0.05, random_state=1)  # type: ignore

                # Fit the model and calculate anomaly scores.
                isolation_forest.fit(series_scaled)
                scores = isolation_forest.decision_function(series_scaled)
                df_anomalies.loc[df["entity_name"] == country, variable_id] = scores

        return df_anomalies


class AnomalyOneClassSVM(AnomalyDetector):
    """Anomaly detection using One-Class SVM, applied separately to each country-variable time series."""

    anomaly_type = "one_class_svm"

    def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
        # Initialize a dataframe of zeros.
        df_anomalies = self.get_zeros_df(df, variable_ids)

        # Initialize an imputer to handle missing values.
        imputer = SimpleImputer(strategy="mean")

        for variable_id in tqdm(variable_ids):
            for country, group in df.groupby("entity_name", observed=True):
                # Get the time series for the current country and variable.
                series = group[[variable_id]].copy()

                # Skip if the series is all zeros or NaNs.
                if ((series == 0).all().values) or (series.dropna().shape[0] == 0):
                    continue

                # Impute missing values for this country's time series.
                series_imputed = imputer.fit_transform(series)

                # Scale the data for better performance.
                scaler = StandardScaler()
                series_scaled = scaler.fit_transform(series_imputed)

                # Initialize the One-Class SVM model for this country's time series.
                svm = OneClassSVM(kernel="rbf", gamma="scale", nu=0.05)

                # Fit the model and calculate anomaly scores.
                svm.fit(series_scaled)
                scores = svm.decision_function(series_scaled)
                df_anomalies.loc[df["entity_name"] == country, variable_id] = scores

        return df_anomalies
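
To make the score-dataframe contract concrete, here is a minimal sketch (not part of the diff) of how a wide score dataframe, as produced by the `get_score_df` implementations above, is reshaped by `get_long_format_score_df`. The entities, years, and variable IDs 101 and 102 are invented for illustration, and the import assumes this PR's module is on the path:

```python
import pandas as pd

from apps.anomalist.detectors import get_long_format_score_df

# Wide format: one column per variable ID, plus the entity_name/year index columns.
df_score = pd.DataFrame(
    {
        "entity_name": ["France", "France", "Spain"],
        "year": [2000, 2001, 2000],
        101: [0.0, 0.8, 0.1],  # hypothetical anomaly scores for variable 101
        102: [0.2, 0.0, None],  # hypothetical anomaly scores for variable 102
    }
)

# Long format: NaN and zero scores are dropped, and entity_name, year, and
# variable_id are converted to categoricals to save memory.
df_long = get_long_format_score_df(df_score)
print(df_long)
# Expected rows: (France, 2001, 101, 0.8), (Spain, 2000, 101, 0.1), (France, 2000, 102, 0.2).
```

Dropping zero scores and using categoricals presumably keeps the persisted score table small, which matters given that scores are stored as a blob (see the "use large blob for dfScore" commit).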