✨ wizard: anomalist (first draft) (#3363)
* ✨ wizard: anomalies

* wip

* bump streamlit

* wip

* wip: chart

* wip

* todo

* plot indicator

* re-structure

* wip: loading indicators

* fix API grapher_chart

* deprecate chart_html

* chart_html -> grapher_chart

* clean

* ci/cd

* wip

* wip

* changed module name

* custom components module

* add methods to get uris

* new alias

* get dataset uris

* update import

* update gpt pricing

* update import

* wip

* provide entity-context for anomaly

* wip: anomalist v2

* wip

* wip

* lock

* ✨ anomalist: improve utils (#3385)

* wip

* db -> db_utils

* io -> db

* move things db_utils -> db

* db -> grapher_io

* db -> grapher_io, db_utils -> db

* docstring

* db_utils -> db

* wip

* remove indicator

* add overloads

* ci/cd

* wip

* cicd

* wip

* deprecation warnings

* missing import

* hide anomalist in wizard
lucasrodes authored Oct 9, 2024
1 parent 0e894b2 commit d0afe39
Showing 36 changed files with 1,794 additions and 685 deletions.
2 changes: 1 addition & 1 deletion api/v1/__init__.py
@@ -102,7 +102,7 @@ def _load_and_validate_indicator(catalog_path: str) -> gm.Variable:
     # update YAML file
     with Session(engine) as session:
         try:
-            db_indicator = gm.Variable.load_from_catalog_path(session, catalog_path)
+            db_indicator = gm.Variable.from_id_or_path(session, catalog_path)
         except NoResultFound:
             raise HTTPException(
                 404,
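Both this call site and the one in apps/metadata_migrate/cli.py below now go through the same helper. A minimal sketch of how the renamed method can be used, assuming from_id_or_path accepts either a numeric variable id or a catalog path (the two call sites in this commit pass one of each); the id and path values below are placeholders.

from sqlalchemy.orm import Session

from etl import grapher_model as gm
from etl.db import get_engine

engine = get_engine()

with Session(engine) as session:
    # Lookup by catalog path (placeholder path), as api/v1 does ...
    by_path = gm.Variable.from_id_or_path(session, "grapher/namespace/2024-01-01/dataset/table#indicator")
    # ... or by numeric variable id (placeholder id), as apps/metadata_migrate does.
    by_id = gm.Variable.from_id_or_path(session, 123456)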
2 changes: 1 addition & 1 deletion apps/backport/backport.py
@@ -14,14 +14,14 @@
 from apps.backport.datasync.data_metadata import (
     _variable_metadata,
     variable_data,
-    variable_data_df_from_s3,
 )
 from apps.backport.datasync.datasync import upload_gzip_dict
 from etl import config, paths
 from etl import grapher_model as gm
 from etl.backport_helpers import GrapherConfig
 from etl.db import get_engine, read_sql
 from etl.files import checksum_str
+from etl.grapher_io import variable_data_df_from_s3
 from etl.snapshot import Snapshot, SnapshotMeta

 from . import utils
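The S3 helpers now live in etl.grapher_io, so callers only change their import. A minimal usage sketch, assuming the relocated function keeps the signature of the copy removed from apps/backport/datasync/data_metadata.py below (engine, variable_ids, workers); the variable ids are placeholders.

from etl.db import get_engine
from etl.grapher_io import variable_data_df_from_s3

engine = get_engine()

# Fetch values for a couple of indicators in parallel (placeholder ids).
df = variable_data_df_from_s3(engine, variable_ids=[123, 456], workers=2)

# One row per (variableId, entityId, year), with entity name/code merged in.
print(df[["variableId", "entityName", "year", "value"]].head())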
134 changes: 1 addition & 133 deletions apps/backport/datasync/data_metadata.py
@@ -1,148 +1,16 @@
import concurrent.futures
import json
from copy import deepcopy
from http.client import RemoteDisconnected
from typing import Any, Dict, List, Union, cast
from urllib.error import HTTPError, URLError
from typing import Any, Dict, List, Union

import numpy as np
import pandas as pd
import requests
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from structlog import get_logger
from tenacity import Retrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed

from etl import config
from etl.config import OWIDEnv
from etl.db import read_sql

log = get_logger()


def _fetch_data_df_from_s3(variable_id: int):
try:
# Cloudflare limits us to 600 requests per minute, retry in case we hit the limit
# NOTE: increase wait time or attempts if we hit the limit too often
for attempt in Retrying(
wait=wait_fixed(2),
stop=stop_after_attempt(3),
retry=retry_if_exception_type((URLError, RemoteDisconnected)),
):
with attempt:
return (
pd.read_json(config.variable_data_url(variable_id))
.rename(
columns={
"entities": "entityId",
"values": "value",
"years": "year",
}
)
.assign(variableId=variable_id)
)
# no data on S3
except HTTPError:
return pd.DataFrame(columns=["variableId", "entityId", "year", "value"])


def variable_data_df_from_s3(
engine: Engine,
variable_ids: List[int] = [],
workers: int = 1,
value_as_str: bool = True,
) -> pd.DataFrame:
"""Fetch data from S3 and add entity code and name from DB."""
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
results = list(executor.map(_fetch_data_df_from_s3, variable_ids))

if isinstance(results, list) and all(isinstance(df, pd.DataFrame) for df in results):
df = pd.concat(cast(List[pd.DataFrame], results))
else:
raise TypeError(f"results must be a list of pd.DataFrame, got {type(results)}")

# we work with strings and convert to specific types later
if value_as_str:
df["value"] = df["value"].astype("string")

with Session(engine) as session:
res = add_entity_code_and_name(session, df)
return res


def _fetch_metadata_from_s3(variable_id: int, env: OWIDEnv | None = None) -> Dict[str, Any] | None:
try:
# Cloudflare limits us to 600 requests per minute, retry in case we hit the limit
# NOTE: increase wait time or attempts if we hit the limit too often
for attempt in Retrying(
wait=wait_fixed(2),
stop=stop_after_attempt(3),
retry=retry_if_exception_type((URLError, RemoteDisconnected)),
):
with attempt:
if env is not None:
url = env.indicator_metadata_url(variable_id)
else:
url = config.variable_metadata_url(variable_id)
return requests.get(url).json()
# no data on S3
except HTTPError:
return {}


def variable_metadata_df_from_s3(
variable_ids: List[int] = [],
workers: int = 1,
env: OWIDEnv | None = None,
) -> List[Dict[str, Any]]:
"""Fetch data from S3 and add entity code and name from DB."""
args = [variable_ids]
if env:
args += [[env for _ in range(len(variable_ids))]]

with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
results = list(executor.map(_fetch_metadata_from_s3, *args))

if not (isinstance(results, list) and all(isinstance(res, dict) for res in results)):
raise TypeError(f"results must be a list of dictionaries, got {type(results)}")

return results # type: ignore


def _fetch_entities(session: Session, entity_ids: List[int]) -> pd.DataFrame:
# Query entities from the database
q = """
SELECT
id AS entityId,
name AS entityName,
code AS entityCode
FROM entities
WHERE id in %(entity_ids)s
"""
return read_sql(q, session, params={"entity_ids": entity_ids})


def add_entity_code_and_name(session: Session, df: pd.DataFrame) -> pd.DataFrame:
if df.empty:
df["entityName"] = []
df["entityCode"] = []
return df

unique_entities = df["entityId"].unique()

entities = _fetch_entities(session, list(unique_entities))

if set(unique_entities) - set(entities.entityId):
missing_entities = set(unique_entities) - set(entities.entityId)
raise ValueError(f"Missing entities in the database: {missing_entities}")

return pd.merge(df, entities.astype({"entityName": "category", "entityCode": "category"}), on="entityId")


def variable_data(data_df: pd.DataFrame) -> Dict[str, Any]:
data_df = data_df.rename(
columns={
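Most of the block above is the copy of the S3 and entity helpers being dropped from this module; the other diffs in this commit import variable_data_df_from_s3 and get_variables_data from etl.grapher_io instead. One behavioural detail worth keeping in mind is the merge done by add_entity_code_and_name. The sketch below replays the same pandas logic on in-memory frames with placeholder values; the real helper reads the entities table from the database via _fetch_entities.

import pandas as pd

# Values as they come back from S3: one row per (variableId, entityId, year).
df = pd.DataFrame(
    {"variableId": [1, 1], "entityId": [13, 21], "year": [2020, 2020], "value": ["10", "20"]}
)

# Stand-in for the rows normally read from the entities table (placeholder data).
entities = pd.DataFrame(
    {"entityId": [13, 21], "entityName": ["United States", "France"], "entityCode": ["USA", "FRA"]}
)

# Fail loudly if the data references an entity the database does not know about.
missing = set(df["entityId"]) - set(entities["entityId"])
if missing:
    raise ValueError(f"Missing entities in the database: {missing}")

# Same merge as add_entity_code_and_name: attach name/code as categorical columns.
out = pd.merge(df, entities.astype({"entityName": "category", "entityCode": "category"}), on="entityId")
print(out)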
2 changes: 1 addition & 1 deletion apps/explorer_update/cli.py
@@ -6,7 +6,7 @@
 from structlog import get_logger
 from tqdm.auto import tqdm

-from etl.db import get_variables_data
+from etl.grapher_io import get_variables_data
 from etl.paths import EXPLORERS_DIR
 from etl.version_tracker import VersionTracker

2 changes: 1 addition & 1 deletion apps/metadata_migrate/cli.py
@@ -120,7 +120,7 @@ def cli(
     var_id = grapher_config["dimensions"][0]["variableId"]

     with Session(engine) as session:
-        variable = gm.Variable.load_variable(session, var_id)
+        variable = gm.Variable.from_id_or_path(session, var_id)

     assert variable.catalogPath, f"Variable {var_id} does not come from ETL. Migrate it there first."

18 changes: 17 additions & 1 deletion apps/utils/gpt.py
@@ -24,7 +24,9 @@
"gpt-3.5-turbo": "gpt-3.5-turbo-0125",
"gpt-4-turbo-preview": "gpt-4-0125-preview",
"gpt-4-turbo": "gpt-4-turbo-2024-04-09",
"gpt-4o": "gpt-4o-2024-05-13",
"gpt-4o": "gpt-4o-2024-08-06",
"o1-preview": "o1-preview-2024-09-12",
"gpt-4o-mini": "gpt-4o-mini-2024-07-18",
}
MODEL_RATES_1000_TOKEN = {
# GPT 3.5
@@ -59,6 +61,20 @@
"in": 5 / 1000,
"out": 15 / 1000,
},
"gpt-4o-2024-08-06": {
"in": 2.5 / 1000,
"out": 10 / 1000,
},
# GPTO 4o mini
"gpt-4o-mini-2024-07-18": {
"in": 0.150 / 1000,
"out": 0.600 / 1000,
},
# GPT o1
"o1-preview-2024-09-12": {
"in": 15 / 1000,
"out": 60 / 1000,
},
}
MODEL_RATES_1000_TOKEN = {
**MODEL_RATES_1000_TOKEN,
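Since the table stores USD rates per 1,000 tokens, the cost of a request can be estimated by scaling the prompt and completion token counts. A minimal sketch under that assumption, using the new gpt-4o rate from the diff above; the token counts are placeholders and apps/utils/gpt.py may already expose its own cost helper.

# USD per 1,000 tokens, shaped like MODEL_RATES_1000_TOKEN above.
RATES = {
    "gpt-4o-2024-08-06": {"in": 2.5 / 1000, "out": 10 / 1000},
}


def estimate_cost(model: str, tokens_in: int, tokens_out: int) -> float:
    """Estimated USD cost of one request for the given token counts."""
    rate = RATES[model]
    return (tokens_in / 1000) * rate["in"] + (tokens_out / 1000) * rate["out"]


# e.g. 12,000 prompt tokens and 1,500 completion tokens -> $0.045
print(round(estimate_cost("gpt-4o-2024-08-06", 12_000, 1_500), 6))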