Skip to content

Commit

Permalink
✨ indicator-upgrader: store variable mapping (#3389)
Browse files Browse the repository at this point in the history
* Start a new staging server for branch 'variable-mapping'

* add to_sql

* define sqlite db name in variable

* new methods to store variable mapping

* force int if possible

* fix inifinte loop

* save variable mapping

* minor ui tweak

* add undo capabilities

* store var mapping
  • Loading branch information
lucasrodes authored Oct 10, 2024
1 parent 9a4447b commit d09cdbb
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 17 deletions.
47 changes: 39 additions & 8 deletions apps/wizard/app_pages/indicator_upgrade/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
from structlog import get_logger

from apps.wizard import utils
from apps.wizard.app_pages.indicator_upgrade.charts_update import get_affected_charts_and_preview, push_new_charts
from apps.wizard.app_pages.indicator_upgrade.charts_update import (
get_affected_charts_and_preview,
push_new_charts,
save_variable_mapping,
undo_upgrade_dialog,
)
from apps.wizard.app_pages.indicator_upgrade.dataset_selection import build_dataset_form
from apps.wizard.app_pages.indicator_upgrade.indicator_mapping import render_indicator_mapping
from apps.wizard.app_pages.indicator_upgrade.utils import get_datasets
Expand All @@ -40,7 +45,7 @@
page_title="Wizard: Indicator Upgrader",
layout="wide",
page_icon="🪄",
initial_sidebar_state="collapsed",
# initial_sidebar_state="collapsed",
menu_items={
"Report a bug": "https://github.com/owid/etl/issues/new?assignees=marigold%2Clucasrodes&labels=wizard&projects=&template=wizard-issue---.md&title=wizard%3A+meaningful+title+for+the+issue",
"About": """
Expand Down Expand Up @@ -148,14 +153,40 @@
if st.session_state.submitted_charts:
if isinstance(charts, list) and len(charts) > 0:
st.toast("Updating charts...")

# Push charts
push_new_charts(charts)

# Save variable mapping
save_variable_mapping(
indicator_mapping,
int(search_form.dataset_old_id),
int(search_form.dataset_new_id),
comments="Done with indicator-upgrader",
)

# Undo upgrade
st.markdown("Do you want to undo the indicator upgrade?")
st.button(
"Undo upgrade",
on_click=undo_upgrade_dialog,
icon=":material/undo:",
help="Undo all indicator upgrades",
key="btn_undo_upgrade_end",
)


##########################################################################################
# 4 UPDATE CHARTS
# 5 UNDO UPGRADE
#
# TODO: add description
# You may have accidentally upgraded the wrong indicators. Here you can undo the upgrade.
##########################################################################################
# if st.session_state.submitted_datasets and st.session_state.submitted_charts and st.session_state.submitted_indicators:
# if isinstance(charts, list) and len(charts) > 0:
# st.toast("Updating charts...")
# push_new_charts(charts, SCHEMA_CHART_CONFIG)
with st.sidebar:
st.markdown("### Advanced tools")
st.button(
"Undo upgrade",
on_click=undo_upgrade_dialog,
icon=":material/undo:",
help="Undo all indicator upgrades",
key="btn_undo_upgrade_sidebar",
)
57 changes: 57 additions & 0 deletions apps/wizard/app_pages/indicator_upgrade/charts_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import etl.grapher_model as gm
from apps.chart_sync.admin_api import AdminAPI
from apps.wizard.utils import set_states, st_page_link, st_toast_error
from apps.wizard.utils.db import WizardDB
from etl.config import OWID_ENV
from etl.helpers import get_schema_from_url
from etl.indicator_upgrade.indicator_update import find_charts_from_variable_ids, update_chart_config
Expand Down Expand Up @@ -130,3 +131,59 @@ def push_new_charts(charts: List[gm.Chart]) -> None:
"The charts were successfully updated! If indicators from other datasets also need to be upgraded, simply refresh this page, otherwise move on to `chart diff` to review all changes."
)
st_page_link("chart-diff")


def save_variable_mapping(
indicator_mapping: Dict[int, int], dataset_id_new: int, dataset_id_old: int, comments: str = ""
) -> None:
WizardDB.add_variable_mapping(
mapping=indicator_mapping,
dataset_id_new=dataset_id_new,
dataset_id_old=dataset_id_old,
comments=comments,
)


def undo_indicator_upgrade(indicator_mapping):
mapping_inverted = {v: k for k, v in indicator_mapping.items()}
with st.spinner("Undoing upgrade..."):
# Get affected charts
charts = get_affected_charts_and_preview(
mapping_inverted,
)

# TODO: instead of pushing new charts, we should revert the changes!
# To do this, we should have kept a copy or reference to the original revision.
# Idea: when 'push_new_charts' is called, store in a table the original revision of the chart.
push_new_charts(charts)

# Reset variable mapping
WizardDB.delete_variable_mapping()


@st.dialog("Undo upgrade", width="large")
def undo_upgrade_dialog():
mapping = WizardDB.get_variable_mapping()

if mapping != {}:
st.markdown(
"The following table shows the indicator mapping that has been applied to the charts. Undoing means inverting this mapping."
)
data = {
"id_old": list(mapping.keys()),
"id_new": list(mapping.values()),
}
st.dataframe(data)
st.button(
"Undo upgrade",
on_click=lambda m=mapping: undo_indicator_upgrade(m),
icon=":material/undo:",
help="Undo all indicator upgrades",
type="primary",
key="btn_undo_upgrade_2",
)
st.warning(
"Charts will still appear in chart-diff. This is because the chart configs have actually changed (their version has beem bumped). In the future, we do not want to show these charts in chart-diff. For the time being, you should reject these chart diffs."
)
else:
st.markdown("No indicator mapping found. Nothing to undo.")
4 changes: 2 additions & 2 deletions apps/wizard/app_pages/indicator_upgrade/indicator_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def __init__(self, indicator_upgrade: "IndicatorUpgrade"):
@st.fragment
def render(self, indicator_id_to_display, df_data=None):
with st.container(border=True):
cols = [100, 10, 10]
cols = [100, 5, 10]
cols = st.columns(cols, vertical_alignment="bottom")

# Indicators (old, new)
Expand Down Expand Up @@ -450,7 +450,7 @@ def _set_states_checkbox():
st.session_state[k][self.iu.key] = not st.session_state[k][self.iu.key]

st.checkbox(
label="Ignore",
label="Skip",
key=self.iu.key_ignore,
# label_visibility="collapsed",
value=self.iu.skip,
Expand Down
4 changes: 2 additions & 2 deletions apps/wizard/utils/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

@st.cache_data
def load_dataset_uris() -> List[str]:
return load_dataset_uris()
return io.load_dataset_uris()


@st.cache_data
Expand All @@ -19,7 +19,7 @@ def load_variables_in_dataset(
_owid_env: OWIDEnv = OWID_ENV,
) -> List[Variable]:
"""Load Variable objects that belong to a dataset with URI `dataset_uri`."""
return load_variables_in_dataset(dataset_uri, _owid_env)
return io.load_variables_in_dataset(dataset_uri, _owid_env)


@st.cache_data
Expand Down
124 changes: 122 additions & 2 deletions apps/wizard/utils/db.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
"""Interact with database."""
"""Interact with custom interim database.
NOTE: We are currently migrating some of the logic to our actual MySQL database. The goal is to have a tracking of user interaction with Wizard in our main database. For now, we will be storing new tables with names 'wizard__*' in the main database. In the future, we might want to have these tables in another database in staging.
Some of the tools here rely on a local temporary sqlite database. This database is a custom and temporary database used to store data in a server. Not intended for production use.
"""
import datetime as dt
import hashlib
import os
import time
from contextlib import contextmanager
from typing import Any, Generator, Literal, Optional, Tuple
from typing import Any, Dict, Generator, Literal, Optional, Tuple

import pandas as pd
import streamlit as st
from sqlalchemy import text
from sqlalchemy.orm import Session

from apps.wizard.utils.paths import STREAMLIT_SECRETS, WIZARD_DB
from etl.db import get_engine, read_sql, to_sql

# DB is set up
DB_IS_SET_UP = STREAMLIT_SECRETS.exists() & WIZARD_DB.exists()
Expand All @@ -20,6 +27,7 @@
TB_USAGE = "expert_usage"
TB_PR = "pull_requests"
TB_NS = "etl_news"
TB_VARMAP = "wiz__variable_mapping"
# Window accepted values
WND_LITERAL = Literal["1d", "7d"]

Expand Down Expand Up @@ -152,6 +160,84 @@ def get_news_summary(cls, window_type: WND_LITERAL) -> Tuple[str, str, str] | No
data[0][2],
)

@classmethod
def delete_variable_mapping(cls) -> None:
"""Delete variable mapping."""
if cls.table_exists(TB_VARMAP):
query = f"DELETE FROM {TB_VARMAP};"
engine = get_engine()
with Session(engine) as s:
s.execute(text(query))
s.commit()

@classmethod
def add_variable_mapping(
cls, mapping: Dict[int, int], dataset_id_old: int, dataset_id_new: int, comments: str = ""
) -> None:
"""Add a mapping to TB_VARMAP.
This table should have columns 'id_old' (key), 'id_new' (value), 'timestamp', and 'dataset_id_old' and 'dataset_id_new'.
"""
timestamp = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

# Build dataframe
query_params = [
{
"id_old": id_old,
"id_new": id_new,
"timestamp": timestamp,
"dataset_id_old": dataset_id_old,
"dataset_id_new": dataset_id_new,
"comments": comments,
}
for id_old, id_new in mapping.items()
]
df = pd.DataFrame(query_params)

# Insert in table
to_sql(df, TB_VARMAP, if_exists="append", index=False)

@classmethod
def get_variable_mapping_raw(cls) -> pd.DataFrame:
"""Get the mapping from TB_VARMAP."""
if cls.table_exists(TB_VARMAP):
return read_sql(f"SELECT * FROM {TB_VARMAP};")
return pd.DataFrame()

@classmethod
def get_variable_mapping(cls) -> Dict[int, int]:
"""Get variable mapping.
This mapping can be the result of multiple mappings.
Example: you upgrade indicators twice, the mapping will be the result of the two mappings.
First mapping is: 1 -> 4 and 2 -> 5
Second mapping is: 4 -> 2
Then, the resulting mapping is 1 -> 2, 2 -> 5, and 4 -> 2.
"""
df = cls.get_variable_mapping_raw()

if df.empty:
return {}

mapping = simplify_varmap(df)

return mapping

@classmethod
def table_exists(cls, tb_name: str):
"""Check if table exists in the database."""
query = """
SELECT *
FROM information_schema.tables
WHERE table_schema = 'owid';
"""
df = read_sql(query)
return tb_name in set(df["TABLE_NAME"])


@contextmanager
def create_session(db_name: str) -> Generator[Session, None, None]:
Expand All @@ -165,3 +251,37 @@ def _prepare_query_insert(tb_name: str, fields: Tuple[Any]) -> str:
values = ":" + ", :".join(fields)
query = f"INSERT INTO {tb_name} {fields} VALUES ({values});"
return query


def simplify_varmap(df):
groups = df.groupby("timestamp")

mapping = {}
# Iterate over each 'submitted mapping'
for group in groups:
# Get mapping for a certain timestamp
mapping_ = group[1][["id_old", "id_new"]].set_index("id_old")["id_new"].to_dict()

# Initialize the mapping
if mapping == {}:
mapping = mapping_
continue

# Sanity check that: there is no key in mapping_ already present in mapping
if any(k in mapping for k in mapping_):
raise ValueError(
"The variable mapping has an unexpected format. An indicator is being upgraded multiple times."
)

# Update the mapping sequentially
for k, v in mapping.items():
if v in mapping_:
mapping[k] = mapping_[v]

# Update with new mappings
mapping = mapping | mapping_

# Remove self-mappings
mapping_no_identical = {k: v for k, v in mapping.items() if k != v}

return mapping_no_identical
7 changes: 5 additions & 2 deletions apps/wizard/utils/paths.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from etl.paths import BASE_DIR

WIZARD_CFG_NAME = ".wizardcfg"
WIZARD_DB_NAME = "wizard.db"

# PATH TO WIZARD CONFIG FOLDER
WIZARD_CFG = BASE_DIR / ".wizardcfg"
WIZARD_CFG = BASE_DIR / WIZARD_CFG_NAME

# PATH WIZARD DEFAULTS (old)
WIZARD_VARIABLES_DEFAULTS_OLD = BASE_DIR / ".wizard"

# PATH WIZARD DEFAULTS (new)
WIZARD_VARIABLES_DEFAULTS = WIZARD_CFG / "defaults.json"
WIZARD_DB = WIZARD_CFG / "wizard.db"
WIZARD_DB = WIZARD_CFG / WIZARD_DB_NAME

# STREAMLIT SECRETS
STREAMLIT_SECRETS = BASE_DIR / ".streamlit" / "secrets.toml"
16 changes: 16 additions & 0 deletions etl/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,19 @@ def read_sql(sql: str, engine: Optional[Engine | Session] = None, *args, **kwarg
return pd.read_sql(sql, engine.bind, *args, **kwargs)
else:
raise ValueError(f"Unsupported engine type {type(engine)}")


def to_sql(df: pd.DataFrame, name: str, engine: Optional[Engine | Session] = None, *args, **kwargs):
"""Wrapper around pd.to_sql that creates a connection and closes it after reading the data.
This adds overhead, so if you need performance, reuse the same connection and cursor.
"""
engine = engine or get_engine()
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
if isinstance(engine, Engine):
with engine.connect() as con:
return df.to_sql(name, con, *args, **kwargs)
elif isinstance(engine, Session):
return df.to_sql(name, engine.bind, *args, **kwargs)
else:
raise ValueError(f"Unsupported engine type {type(engine)}")
11 changes: 10 additions & 1 deletion etl/grapher_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,16 @@ def load_variable(
id_or_path: str | int,
owid_env: OWIDEnv = OWID_ENV,
) -> Variable:
"""Load variable"""
"""Load variable.
If id_or_path is str, it'll be used as catalog path.
"""
if not isinstance(id_or_path, str):
try:
id_or_path = int(id_or_path)
except Exception:
pass

with Session(owid_env.engine) as session:
variable = Variable.from_id_or_path(
session=session,
Expand Down

0 comments on commit d09cdbb

Please sign in to comment.