Populate production schema #219

Open · wants to merge 6 commits into base: master
6 changes: 3 additions & 3 deletions eis/feature_loader.py
@@ -255,10 +255,10 @@ def get_dataset(self, as_of_dates_to_use):
            if 'ND' in table_name:
                complete_df = complete_df.merge(table, on='officer_id', how='left')
            else:
                complete_df = complete_df.merge(table, on=['officer_id','as_of_date'], how='left')

        # Set index
        complete_df = complete_df.set_index('officer_id')

        # Zero imputation
        complete_df.fillna(0, inplace=True)
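
For context, the loop being cleaned up here stitches every feature table onto one officer-level frame. A minimal standalone sketch of the two merge paths, assuming 'ND' marks feature tables that carry no as_of_date column (an assumption; the convention is not spelled out in this diff):

import pandas as pd

base = pd.DataFrame({'officer_id': [1, 2], 'as_of_date': ['2017-01-01', '2017-01-01']})
nd_table = pd.DataFrame({'officer_id': [1, 2], 'years_of_service': [9, 4]})   # time-invariant
dated_table = pd.DataFrame({'officer_id': [1, 2], 'as_of_date': ['2017-01-01', '2017-01-01'],
                            'arrests_1y': [3, 0]})                            # time-varying

complete_df = base.merge(nd_table, on='officer_id', how='left')               # 'ND' path
complete_df = complete_df.merge(dated_table, on=['officer_id', 'as_of_date'], how='left')
complete_df = complete_df.set_index('officer_id').fillna(0)                   # zero imputation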
151 changes: 151 additions & 0 deletions eis/populate_production_schema.py
@@ -0,0 +1,151 @@
import pandas as pd
import yaml
import json
import logging
import sys
import argparse
import datetime

from . import setup_environment

log = logging.getLogger(__name__)


def main(chosen_model_group_id, matrix_location):

    now = datetime.datetime.now().strftime('%d-%m-%y_%H:%M:%S')
log_filename = 'logs/{}.log'.format(now)
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
level=logging.DEBUG,
handlers=[logging.FileHandler(log_filename), logging.StreamHandler()])
log = logging.getLogger('eis')

    try:
        db_engine = setup_environment.get_database()
    except Exception:
        log.exception('Could not connect to the database')
        sys.exit(1)

test_object = populate_production_schema(db_engine, chosen_model_group_id,
matrix_location)
test_object.populate_production_predictions()
test_object.populate_production_time_delta()
test_object.populate_production_individual_importances()

log.info("Done!")
return None



def get_query(feature_name):
    ## Load the features config
    with open('eis/features/features_descriptions.yaml') as f:
        features_config = yaml.safe_load(f)
    query = """SELECT * FROM public.get_feature_complete_description('{feature}',
               '{feature_names}'::JSON, '{time_aggregations}'::JSON, '{metrics}'::JSON)""".format(
        feature=feature_name,
        feature_names=json.dumps(features_config['feature_names']),
        time_aggregations=json.dumps(features_config['time_aggregations']),
        metrics=json.dumps(features_config['metrics_name']))
    return query
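
## For reference, get_query assumes features_descriptions.yaml exposes three
## top-level keys -- feature_names, time_aggregations and metrics_name -- each
## mapping a short code used in column names to display text. Illustrative
## (not actual) content:
##
##   feature_names:
##     Arrests: 'Number of arrests'
##   time_aggregations:
##     1y: 'in the past 1 year'
##   metrics_name:
##     sum: 'Total'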


class populate_production_schema(object):

def __init__(self, db_engine, chosen_model_group_id, matrix_location):
self.db_engine = db_engine
self.chosen_model_group_id = chosen_model_group_id
self.matrix_location = matrix_location


    def populate_production_predictions(self):
        """
        Assumes the production.predictions table exists.
        INPUT: the model group id and database engine set on the instance
        OUTPUT: none; the inserted rows are committed to the database
        """

        db_conn = self.db_engine.raw_connection()
        query = "select production.populate_predictions(%s);"
        db_conn.cursor().execute(query, (self.chosen_model_group_id,))
        db_conn.commit()
        db_conn.close()


    def populate_production_time_delta(self):
        """
        Assumes the production.time_delta table exists.
        INPUT: the database engine set on the instance
        OUTPUT: none; the inserted rows are committed to the database
        """

db_conn = self.db_engine.raw_connection()
query = "select production.populate_time_delta();"
db_conn.cursor().execute(query)
db_conn.commit()
db_conn.close()

    def populate_production_individual_importances(self):
        """
        Assumes the production.individual_importances table exists.
        INPUT: the model group id and database engine set on the instance
        OUTPUT: none; the inserted rows are committed to the database
        """

db_conn = self.db_engine.raw_connection()

        query = "select production.populate_individual_importances(%s);"
        db_conn.cursor().execute(query, (self.chosen_model_group_id,))
db_conn.commit()

## Get the feature names you are trying to map
feature_query = "SELECT distinct risk_1 as feature from production.individual_importances order by risk_1;"
feature_names = pd.read_sql(feature_query, db_conn)

        list_of_dfs = []
        for feature in feature_names['feature']:
            list_of_dfs.append(pd.read_sql(get_query(feature), db_conn))

## Concat the dfs into one df
feature_mapping = pd.concat(list_of_dfs, axis=0, ignore_index=True)
feature_mapping['column_new_name'] = feature_mapping['metric_name'] + ' ' + feature_mapping['feature_long_name'] + ' ' + feature_mapping['of_type'] + ' ' + feature_mapping['time_aggregation']
feature_mapping = feature_mapping.drop(['metric_name', 'feature_long_name', 'of_type', 'time_aggregation'], axis=1)

        ## Replace the raw feature names with their readable descriptions,
        ## keeping entity_id in the merge keys so rows stay aligned per officer
        individual_importances = pd.read_sql('select * from production.individual_importances;', db_conn)
        keys = ['model_id', 'as_of_date', 'entity_id']
        mapped = []
        for i in range(1, 6):
            risk = 'risk_{}'.format(i)
            mapped.append(pd.merge(individual_importances[keys + [risk]], feature_mapping,
                                   left_on=risk, right_on='column_original_name', how='left'))

        individual_importances = mapped[0]
        for df in mapped[1:]:
            individual_importances = pd.merge(individual_importances, df, on=keys)


        ## Write the mapped importances back to the database; to_sql needs the
        ## SQLAlchemy engine rather than the raw DBAPI connection
        individual_importances.to_sql('individual_importances', self.db_engine,
                                      schema='production', if_exists='replace')

        db_conn.close()


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--modelgroupid", type=int, help="pass your chosen model group id", default=1)
parser.add_argument("--matrixlocation", type=str, help="pass the path to your stored hdf files")
args = parser.parse_args()
main(args.modelgroupid, args.matrixlocation)
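
A typical invocation, assuming the script is run as a module from the repository
root (the package-relative imports require this) and that a logs/ directory
exists for the log file:

python -m eis.populate_production_schema --modelgroupid 1 --matrixlocation /path/to/matrices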

121 changes: 119 additions & 2 deletions schemas/create_stored_procedures/load_common_stored_procedures.sql
@@ -251,7 +251,7 @@ Returns a Table with:
column_original_name | feature_long_name | of_type | time_aggregation | metric_used

*/
-CREATE OR REPLACE FUNCTION get_feature_complet_description (column_name TEXT,
+CREATE OR REPLACE FUNCTION get_feature_complete_description (column_name TEXT,
feature_dict JSON,
time_agg_dict JSON,
metric_dict JSON)
@@ -293,4 +293,121 @@ BEGIN
on metrics.key = t1.metric_used ;
END; $$

-LANGUAGE 'plpgsql';;
+LANGUAGE 'plpgsql';




/*
FUNCTION THAT POPULATES THE PRODUCTION.PREDICTIONS TABLE

Inserts predictions for the chosen model group, keeping models whose
train_end_time falls within the last year and predictions dated up to a
month past each model's train_end_time.
*/

create or replace function production.populate_predictions(chosen_model_group_id integer)
returns boolean as
$$
begin
delete from production.predictions;
insert into production.predictions (model_id, as_of_date, entity_id, score, rank_abs, rank_pct)
select a.model_id, b.as_of_date, b.entity_id, b.score, b.rank_abs, b.rank_pct
from results.models as a
inner join results.predictions as b using (model_id)
inner join staging.officers_hub as c on b.entity_id = c.officer_id
where a.model_group_id = chosen_model_group_id
and a.train_end_time >= now() - interval '366 days'
and b.as_of_date <= a.train_end_time + interval '31 days';
return true;
end;
$$
language 'plpgsql';
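
-- Illustrative smoke test (a model group id of 1 is assumed here):
-- select production.populate_predictions(1);
-- select count(*) from production.predictions;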


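/*
FUNCTION THAT POPULATES THE PRODUCTION.INDIVIDUAL_IMPORTANCES TABLE

For each model, officer, and as_of_date kept by the same window as
production.populate_predictions, stores five features sampled at random
from that model's top-ranked feature importances (rank_abs < 30).
*/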
CREATE OR REPLACE FUNCTION production.populate_individual_importances(chosen_model_group_id INTEGER)
RETURNS BOOLEAN AS
$$
BEGIN
DELETE FROM production.individual_importances;
INSERT INTO production.individual_importances (model_id, as_of_date, entity_id, risk_1, risk_2, risk_3, risk_4, risk_5)
WITH sub AS (
SELECT
a.model_id,
b.as_of_date,
b.entity_id
FROM results.models AS a
INNER JOIN results.predictions AS b USING (model_id)
INNER JOIN staging.officers_hub AS c ON b.entity_id = c.officer_id
WHERE a.model_group_id = chosen_model_group_id
AND a.train_end_time >= now() - INTERVAL '366 days'
AND b.as_of_date <= a.train_end_time + INTERVAL '31 days'
GROUP BY
model_id,
as_of_date,
entity_id
), importance_list AS (
SELECT *
FROM sub,
LATERAL (
SELECT feature
FROM results.feature_importances f
WHERE sub.model_id = f.model_id
AND rank_abs < 30
ORDER BY random()
LIMIT 5
) a
), officer_aggregates AS (
SELECT
model_id,
as_of_date,
entity_id,
array_agg(feature) AS risk_array
FROM importance_list
GROUP BY
model_id,
as_of_date,
entity_id
)
SELECT
model_id,
as_of_date,
entity_id,
    risk_array[1] AS risk_1,
    risk_array[2] AS risk_2,
    risk_array[3] AS risk_3,
    risk_array[4] AS risk_4,
    risk_array[5] AS risk_5
FROM officer_aggregates;
RETURN TRUE;
END;
$$
LANGUAGE 'plpgsql';
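
-- Example read-back (ids illustrative); the risk_N columns hold raw feature
-- codes until the Python step rewrites them as readable descriptions:
-- select entity_id, risk_1, risk_2, risk_3
-- from production.individual_importances
-- where model_id = 1 and as_of_date = '2017-01-01';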



/*
FUNCTION THAT POPULATES THE PRODUCTION.TIME_DELTA TABLE

Inserts each officer's change in percentile rank relative to 1, 7, 30, 91,
and 365 predictions earlier (roughly the last day, week, month, quarter, and
year, assuming one prediction per officer per day).
*/
create or replace function production.populate_time_delta()
returns boolean as
$$
begin
delete from production.time_delta;
insert into production.time_delta (model_id, entity_id, as_of_date,
last_day, last_week, last_month, last_quarter, last_year)
select
a.model_id, a.entity_id, a.as_of_date,
a.rank_pct - lag(a.rank_pct, 1) over (partition by a.entity_id order by a.as_of_date) as last_day,
a.rank_pct - lag(a.rank_pct, 7) over (partition by a.entity_id order by a.as_of_date) as last_week,
a.rank_pct - lag(a.rank_pct, 30) over (partition by a.entity_id order by a.as_of_date) as last_month,
a.rank_pct - lag(a.rank_pct, 91) over (partition by a.entity_id order by a.as_of_date) as last_quarter,
a.rank_pct - lag(a.rank_pct, 365) over (partition by a.entity_id order by a.as_of_date) as last_year
from production.predictions as a
inner join staging.officers_hub as b on a.entity_id = b.officer_id;
return true;
end;
$$
language 'plpgsql';
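
-- Illustrative end-to-end invocation, mirroring populate_production_schema.py
-- (a model group id of 1 is assumed; note the lag() offsets count rows, not
-- calendar days, so the last_* deltas rely on one prediction per day):
-- select production.populate_predictions(1);
-- select production.populate_time_delta();
-- select production.populate_individual_importances(1);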