From 6a6aa69bdc5093893292608236e827b2c6359a8f Mon Sep 17 00:00:00 2001 From: OpsWorks user jwalsh Date: Tue, 30 May 2017 09:39:35 +0000 Subject: [PATCH 1/6] code to populate the production schema --- eis/populate_production_schema.py | 165 ++++++++++++++++++ .../load_common_stored_procedures.sql | 61 ++++++- 2 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 eis/populate_production_schema.py diff --git a/eis/populate_production_schema.py b/eis/populate_production_schema.py new file mode 100644 index 00000000..baefa33c --- /dev/null +++ b/eis/populate_production_schema.py @@ -0,0 +1,165 @@ +import numpy as np +import pandas as pd +import yaml +import logging +import sys +import argparse +import pickle +import psycopg2 +import datetime +import time +import os +import pdb +from itertools import product +from joblib import Parallel, delayed + +from triage.storage import InMemoryModelStorageEngine +from . import setup_environment +from . import populate_features, populate_labels +from . import utils +from .run_models import RunModels +from triage.utils import save_experiment_and_get_hash + +from sklearn import linear_model, preprocessing + +log = logging.getLogger(__name__) + + +def main(#config_file_name, labels_config_file, args + ): + + now = datetime.datetime.now().strftime('%d-%m-%y_%H:%M:S') + log_filename = 'logs/{}.log'.format(now) + logging.basicConfig(format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + level=logging.DEBUG, + handlers=[logging.FileHandler(log_filename), logging.StreamHandler()]) + log = logging.getLogger('eis') + + db_engine = setup_environment.get_database() + + test_object = populate_production_schema(db_engine) + test_object.populate_production_predictions(632) + test_object.populate_production_time_delta() + test_object.populate_production_individual_importances('/mnt/data/public_safety/charlotte_eis/triage/matrix_feature_mix_1m_6y/') + + log.info("Done!") + return None + + + +class populate_production_schema(object): + + def __init__(self, db_engine): + self.db_engine = db_engine + + + def populate_production_predictions(self, chosen_model_group_id): + """ + Assumes production.predictions table exists + INPUT: the model group id you want to use and database info + OUTPUT: returns 'true' on completion + """ + + try: + db_engine = setup_environment.get_database() + except: + log.warning('Could not connect to the database') + raise + + db_conn = self.db_engine.raw_connection() + query = "select production.populate_predictions({:d});".format(chosen_model_group_id) + db_conn.cursor().execute(query) + db_conn.commit() + db_conn.close() + + + def populate_production_time_delta(self): + """ + Assumes production.time_delta table exists + INPUT: database info + OUTPUT: returns 'true' on completion + """ + + try: + db_engine = setup_environment.get_database() + except: + log.warning('Could not connect to the database') + raise + + db_conn = self.db_engine.raw_connection() + query = "select production.populate_time_delta();" + db_conn.cursor().execute(query) + db_conn.commit() + db_conn.close() + + + def populate_production_individual_importances(self, matrix_location): + """ + Assumes production.individual_importances table exists + """ + + try: + db_engine = setup_environment.get_database() + except: + log.warning('Could not connect to the database') + raise + + db_conn = db_engine.raw_connection() + query = """ select model_id, as_of_date, entity_id, a.score, b.matrix_uuid + from production.predictions as a + inner join results.predictions as b 
using (model_id, as_of_date, entity_id);""" + #result = db_conn.cursor().execute(query) + #print(type(result)) + #db_conn.close() + #query = "select model_id, entity_id, as_of_date, score from production.predictions where as_of_date = '2015-03-30';" + + + + individual_explanations_object = pd.DataFrame(columns=['model_id', 'as_of_date', 'entity_id', + 'risk_1', 'risk_2', 'risk_3', 'risk_4', 'risk_5']) + y_df = pd.read_sql(query, db_conn) + + for matrix in np.unique(y_df['matrix_uuid']): + file_location = matrix_location + '/' + matrix + '.h5' + try: + X_df = pd.read_hdf(file_location, 'df') + except: + log.warning('Failed to read hdf file') + + del X_df['outcome'] + + data = pd.merge(X_df, y_df, left_on=['officer_id', 'as_of_date'], right_on=['entity_id', 'as_of_date']) + del data['as_of_date'] + del data['model_id'] + y = data['score'] + X = preprocessing.normalize(data.drop('score', axis=1), copy=False) + X_names = data.columns.drop('score') + + reg = linear_model.Ridge() + reg.fit(X, y) + +""" + #daterange = pd.date_range('2015-01-01', '2016-03-31') + + +i = 0 +for single_date in daterange: + + for row in range(data.shape[0]): + score_contribution = X[row,:] * reg.coef_ + score_contribution_abs = np.absolute(score_contribution) + + individual_features = np.argpartition(score_contribution_abs, -5)[-5:] + individual_features_names = X_names[individual_features] + + row_insert = [y_df['model_id'][row], single_date, y_df['entity_id'][row]] + list(individual_features_names) + individual_explanations_object.loc[i,:] = row_insert + + i += 1 +""" + + +if __name__ == '__main__': + main() + + diff --git a/schemas/create_stored_procedures/load_common_stored_procedures.sql b/schemas/create_stored_procedures/load_common_stored_procedures.sql index daebb409..e5a13aa9 100644 --- a/schemas/create_stored_procedures/load_common_stored_procedures.sql +++ b/schemas/create_stored_procedures/load_common_stored_procedures.sql @@ -251,7 +251,7 @@ Returns a Table with: column_original_name | feature_long_name | of_type | time_aggregation | metric_used */ -CREATE OR REPLACE FUNCTION get_feature_complet_description (column_name TEXT, +CREATE OR REPLACE FUNCTION get_feature_complete_description (column_name TEXT, feature_dict JSON, time_agg_dict JSON, metric_dict JSON) @@ -293,4 +293,61 @@ BEGIN on metrics.key = t1.metric_used ; END; $$ -LANGUAGE 'plpgsql';; +LANGUAGE 'plpgsql'; + + + + +/* +FUNCTION THAT POPULATES THE PRODUCTION.PREDICTIONS TABLE + +inserts predictions going a year before the as_of_date and a month into the future +*/ + +create or replace function production.populate_predictions(chosen_model_group_id integer) +returns boolean as +$$ +begin + delete from production.predictions; + insert into production.predictions (model_id, as_of_date, entity_id, score, rank_abs, rank_pct) + select a.model_id, b.as_of_date, b.entity_id, b.score, b.rank_abs, b.rank_pct + from results.models as a + inner join results.predictions as b using (model_id) + inner join staging.officers_hub as c on b.entity_id = c.officer_id + where a.model_group_id = chosen_model_group_id + and a.train_end_time >= now() - interval '366 days' + and b.as_of_date <= a.train_end_time + interval '31 days'; + return true; +end; +$$ +language 'plpgsql'; + + + +/* +FUNCTION THAT POPULATES THE PRODUCTION.TIME_DELTA + +inserts changes in relative rank over the last day, week, month, quarter, and year +*/ +create or replace function production.populate_time_delta() +returns boolean as +$$ +begin + delete from production.time_delta; + insert into 
production.time_delta (model_id, entity_id, as_of_date, + last_day, last_week, last_month, last_quarter, last_year) + select + a.model_id, a.entity_id, a.as_of_date, + a.rank_pct - lag(a.rank_pct, 1) over (partition by a.entity_id order by a.as_of_date) as last_day, + a.rank_pct - lag(a.rank_pct, 7) over (partition by a.entity_id order by a.as_of_date) as last_week, + a.rank_pct - lag(a.rank_pct, 30) over (partition by a.entity_id order by a.as_of_date) as last_month, + a.rank_pct - lag(a.rank_pct, 91) over (partition by a.entity_id order by a.as_of_date) as last_quarter, + a.rank_pct - lag(a.rank_pct, 365) over (partition by a.entity_id order by a.as_of_date) as last_year + from production.predictions as a + inner join staging.officers_hub as b on a.entity_id = b.officer_id; + return true; +end; +$$ +language 'plpgsql'; + + From 3a58073d6c41037c73ca1186824a940a926ee26b Mon Sep 17 00:00:00 2001 From: OpsWorks user jwalsh Date: Tue, 30 May 2017 09:55:00 +0000 Subject: [PATCH 2/6] removed unused portions of the script --- eis/populate_production_schema.py | 69 +------------------------------ 1 file changed, 1 insertion(+), 68 deletions(-) diff --git a/eis/populate_production_schema.py b/eis/populate_production_schema.py index baefa33c..c7972470 100644 --- a/eis/populate_production_schema.py +++ b/eis/populate_production_schema.py @@ -25,8 +25,7 @@ log = logging.getLogger(__name__) -def main(#config_file_name, labels_config_file, args - ): +def main(): now = datetime.datetime.now().strftime('%d-%m-%y_%H:%M:S') log_filename = 'logs/{}.log'.format(now) @@ -93,72 +92,6 @@ def populate_production_time_delta(self): db_conn.close() - def populate_production_individual_importances(self, matrix_location): - """ - Assumes production.individual_importances table exists - """ - - try: - db_engine = setup_environment.get_database() - except: - log.warning('Could not connect to the database') - raise - - db_conn = db_engine.raw_connection() - query = """ select model_id, as_of_date, entity_id, a.score, b.matrix_uuid - from production.predictions as a - inner join results.predictions as b using (model_id, as_of_date, entity_id);""" - #result = db_conn.cursor().execute(query) - #print(type(result)) - #db_conn.close() - #query = "select model_id, entity_id, as_of_date, score from production.predictions where as_of_date = '2015-03-30';" - - - - individual_explanations_object = pd.DataFrame(columns=['model_id', 'as_of_date', 'entity_id', - 'risk_1', 'risk_2', 'risk_3', 'risk_4', 'risk_5']) - y_df = pd.read_sql(query, db_conn) - - for matrix in np.unique(y_df['matrix_uuid']): - file_location = matrix_location + '/' + matrix + '.h5' - try: - X_df = pd.read_hdf(file_location, 'df') - except: - log.warning('Failed to read hdf file') - - del X_df['outcome'] - - data = pd.merge(X_df, y_df, left_on=['officer_id', 'as_of_date'], right_on=['entity_id', 'as_of_date']) - del data['as_of_date'] - del data['model_id'] - y = data['score'] - X = preprocessing.normalize(data.drop('score', axis=1), copy=False) - X_names = data.columns.drop('score') - - reg = linear_model.Ridge() - reg.fit(X, y) - -""" - #daterange = pd.date_range('2015-01-01', '2016-03-31') - - -i = 0 -for single_date in daterange: - - for row in range(data.shape[0]): - score_contribution = X[row,:] * reg.coef_ - score_contribution_abs = np.absolute(score_contribution) - - individual_features = np.argpartition(score_contribution_abs, -5)[-5:] - individual_features_names = X_names[individual_features] - - row_insert = [y_df['model_id'][row], single_date, 
y_df['entity_id'][row]] + list(individual_features_names) - individual_explanations_object.loc[i,:] = row_insert - - i += 1 -""" - - if __name__ == '__main__': main() From 53f613f8ff70be1c2dbd5a0dd8c31d4a577f36ed Mon Sep 17 00:00:00 2001 From: OpsWorks user jwalsh Date: Mon, 12 Jun 2017 16:13:04 +0000 Subject: [PATCH 3/6] edited script to take arguments from the command line --- eis/feature_loader.py | 6 +++--- eis/populate_production_schema.py | 24 +++++++++++++++--------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/eis/feature_loader.py b/eis/feature_loader.py index 2df29656..460b0fd4 100644 --- a/eis/feature_loader.py +++ b/eis/feature_loader.py @@ -255,10 +255,10 @@ def get_dataset(self, as_of_dates_to_use): if 'ND' in table_name: complete_df = complete_df.merge(table, on='officer_id', how='left') else: - complete_df = complete_df.merge(table, on=['officer_id','as_of_date'], how='left') + complete_df = complete_df.merge(table, on=['officer_id','as_of_date'], how='left') - # Set index - complete_df = complete_df.set_index('officer_id') + # Set index + complete_df = complete_df.set_index('officer_id') # Zero imputation complete_df.fillna(0, inplace=True) diff --git a/eis/populate_production_schema.py b/eis/populate_production_schema.py index c7972470..1ba6187f 100644 --- a/eis/populate_production_schema.py +++ b/eis/populate_production_schema.py @@ -25,7 +25,7 @@ log = logging.getLogger(__name__) -def main(): +def main(chosen_model_group_id, matrix_location): now = datetime.datetime.now().strftime('%d-%m-%y_%H:%M:S') log_filename = 'logs/{}.log'.format(now) @@ -36,10 +36,11 @@ def main(): db_engine = setup_environment.get_database() - test_object = populate_production_schema(db_engine) - test_object.populate_production_predictions(632) + test_object = populate_production_schema(db_engine, chosen_model_group_id, + matrix_location) + test_object.populate_production_predictions() test_object.populate_production_time_delta() - test_object.populate_production_individual_importances('/mnt/data/public_safety/charlotte_eis/triage/matrix_feature_mix_1m_6y/') + #test_object.populate_production_individual_importances() log.info("Done!") return None @@ -48,11 +49,13 @@ def main(): class populate_production_schema(object): - def __init__(self, db_engine): + def __init__(self, db_engine, chosen_model_group_id, matrix_location): self.db_engine = db_engine + self.chosen_model_group_id = chosen_model_group_id + self.matrix_location = matrix_location - def populate_production_predictions(self, chosen_model_group_id): + def populate_production_predictions(self): """ Assumes production.predictions table exists INPUT: the model group id you want to use and database info @@ -66,7 +69,7 @@ def populate_production_predictions(self, chosen_model_group_id): raise db_conn = self.db_engine.raw_connection() - query = "select production.populate_predictions({:d});".format(chosen_model_group_id) + query = "select production.populate_predictions({:d});".format(self.chosen_model_group_id) db_conn.cursor().execute(query) db_conn.commit() db_conn.close() @@ -93,6 +96,9 @@ def populate_production_time_delta(self): if __name__ == '__main__': - main() - + parser = argparse.ArgumentParser() + parser.add_argument("--modelgroupid", type=int, help="pass your chosen model group id", default=1) + parser.add_argument("--matrixlocation", type=str, help="pass the path to your stored hdf files") + args = parser.parse_args() + main(args.modelgroupid, args.matrixlocation) From 3b26f97b9de7265f5153f3fd3c03b6189620f535 
Mon Sep 17 00:00:00 2001 From: OpsWorks user jwalsh Date: Mon, 12 Jun 2017 16:19:19 +0000 Subject: [PATCH 4/6] use only one database connection in the class rather than several --- eis/populate_production_schema.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/eis/populate_production_schema.py b/eis/populate_production_schema.py index 1ba6187f..ddab3fd1 100644 --- a/eis/populate_production_schema.py +++ b/eis/populate_production_schema.py @@ -34,7 +34,10 @@ def main(chosen_model_group_id, matrix_location): handlers=[logging.FileHandler(log_filename), logging.StreamHandler()]) log = logging.getLogger('eis') - db_engine = setup_environment.get_database() + try: + db_engine = setup_environment.get_database() + except: + log.warning('Could not connect to the database') test_object = populate_production_schema(db_engine, chosen_model_group_id, matrix_location) @@ -62,12 +65,6 @@ def populate_production_predictions(self): OUTPUT: returns 'true' on completion """ - try: - db_engine = setup_environment.get_database() - except: - log.warning('Could not connect to the database') - raise - db_conn = self.db_engine.raw_connection() query = "select production.populate_predictions({:d});".format(self.chosen_model_group_id) db_conn.cursor().execute(query) @@ -82,12 +79,6 @@ def populate_production_time_delta(self): OUTPUT: returns 'true' on completion """ - try: - db_engine = setup_environment.get_database() - except: - log.warning('Could not connect to the database') - raise - db_conn = self.db_engine.raw_connection() query = "select production.populate_time_delta();" db_conn.cursor().execute(query) From af277721163747d6825556e5c9376a9fdd161307 Mon Sep 17 00:00:00 2001 From: OpsWorks user kackermann Date: Fri, 16 Jun 2017 23:27:21 +0000 Subject: [PATCH 5/6] added individual importances --- eis/populate_production_schema.py | 15 ++++- .../load_common_stored_procedures.sql | 60 +++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/eis/populate_production_schema.py b/eis/populate_production_schema.py index ddab3fd1..37737e44 100644 --- a/eis/populate_production_schema.py +++ b/eis/populate_production_schema.py @@ -43,7 +43,7 @@ def main(chosen_model_group_id, matrix_location): matrix_location) test_object.populate_production_predictions() test_object.populate_production_time_delta() - #test_object.populate_production_individual_importances() + test_object.populate_production_individual_importances() log.info("Done!") return None @@ -85,6 +85,19 @@ def populate_production_time_delta(self): db_conn.commit() db_conn.close() + def populate_production_individual_importances(self): + """ + Assumes production.individual_importances table exists + INPUT: the model group id you want to use and database info + OUTPUT: returns 'true' on completion + """ + + db_conn = self.db_engine.raw_connection() + query = "select production.populate_individual_importances({:d});".format(self.chosen_model_group_id) + db_conn.cursor().execute(query) + db_conn.commit() + db_conn.close() + if __name__ == '__main__': parser = argparse.ArgumentParser() diff --git a/schemas/create_stored_procedures/load_common_stored_procedures.sql b/schemas/create_stored_procedures/load_common_stored_procedures.sql index e5a13aa9..ab542be1 100644 --- a/schemas/create_stored_procedures/load_common_stored_procedures.sql +++ b/schemas/create_stored_procedures/load_common_stored_procedures.sql @@ -323,6 +323,66 @@ $$ language 'plpgsql'; +CREATE OR REPLACE FUNCTION 
production.populate_individual_importances(chosen_model_group_id INTEGER) + RETURNS BOOLEAN AS +$$ +BEGIN + DELETE FROM production.individual_importances; + INSERT INTO production.individual_importances (model_id, as_of_date, entity_id, risk_1, risk_2, risk_3, risk_4, risk_5) + WITH sub AS ( + SELECT + a.model_id, + b.as_of_date, + b.entity_id + FROM results.models AS a + INNER JOIN results.predictions AS b USING (model_id) + INNER JOIN staging.officers_hub AS c ON b.entity_id = c.officer_id + WHERE a.model_group_id = chosen_model_group_id + AND a.train_end_time >= now() - INTERVAL '366 days' + AND b.as_of_date <= a.train_end_time + INTERVAL '31 days' + GROUP BY + model_id, + as_of_date, + entity_id + ), importance_list AS ( + SELECT * + FROM sub, + LATERAL ( + SELECT feature + FROM results.feature_importances f + WHERE sub.model_id = f.model_id + AND rank_abs < 30 + ORDER BY random() + LIMIT 5 + ) a + ), officer_aggregates AS ( + SELECT + model_id, + as_of_date, + entity_id, + array_agg(feature) AS risk_array + FROM importance_list + GROUP BY + model_id, + as_of_date, + entity_id + ) + SELECT + model_id, + as_of_date, + entity_id, + risk_array [1] AS risk_1, + risk_array [2] AS risk_2, + risk_array [3] AS risk_3, + risk_array [4] AS risk_4, + risk_array [5] AS risk_5 + FROM officer_aggregates; + RETURN TRUE; +END; +$$ +LANGUAGE 'plpgsql'; + + /* FUNCTION THAT POPULATES THE PRODUCTION.TIME_DELTA From 752b0985a1e64df11892b062182b7072f380db54 Mon Sep 17 00:00:00 2001 From: OpsWorks user jwalsh Date: Mon, 19 Jun 2017 06:50:30 +0000 Subject: [PATCH 6/6] readable feature names --- eis/populate_production_schema.py | 43 +++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/eis/populate_production_schema.py b/eis/populate_production_schema.py index 37737e44..ae1d3a9b 100644 --- a/eis/populate_production_schema.py +++ b/eis/populate_production_schema.py @@ -1,6 +1,7 @@ import numpy as np import pandas as pd import yaml +import json import logging import sys import argparse @@ -50,6 +51,15 @@ def main(chosen_model_group_id, matrix_location): +def get_query(feature_name): + ## Load the features_config + with open('eis/features/features_descriptions.yaml') as f: + features_config = yaml.load(f) + query = """SELECT * from public.get_feature_complete_description('{feature}', + '{feature_names}'::JSON, '{time_aggregations}'::JSON, '{metrics}'::JSON)""".format(feature=feature_name,feature_names=json.dumps(features_config['feature_names']), time_aggregations = json.dumps(features_config['time_aggregations']), metrics = json.dumps(features_config['metrics_name'])) + return query + + class populate_production_schema(object): def __init__(self, db_engine, chosen_model_group_id, matrix_location): @@ -93,9 +103,42 @@ def populate_production_individual_importances(self): """ db_conn = self.db_engine.raw_connection() + query = "select production.populate_individual_importances({:d});".format(self.chosen_model_group_id) db_conn.cursor().execute(query) db_conn.commit() + + ## Get the feature names you are trying to map + feature_query = "SELECT distinct risk_1 as feature from production.individual_importances order by risk_1;" + feature_names = pd.read_sql(feature_query, db_conn) + + list_of_dfs = [] + for i in range(len(feature_names)): + list_of_dfs.append(pd.read_sql(get_query(feature_names.feature[i]), db_conn)) + + ## Concat the dfs into one df + feature_mapping = pd.concat(list_of_dfs, axis=0, ignore_index=True) + feature_mapping['column_new_name'] = feature_mapping['metric_name'] + ' 
' + feature_mapping['feature_long_name'] + ' ' + feature_mapping['of_type'] + ' ' + feature_mapping['time_aggregation'] + feature_mapping = feature_mapping.drop(['metric_name', 'feature_long_name', 'of_type', 'time_aggregation'], axis=1) + + ## replace old values with new values + individual_importances = pd.read_sql('select * from production.individual_importances;', db_conn) + individual_importances1 = pd.merge(individual_importances[['model_id', 'as_of_date','risk_1']], feature_mapping, left_on='risk_1', right_on='column_original_name', how='left') + individual_importances2 = pd.merge(individual_importances[['model_id', 'as_of_date','risk_2']], feature_mapping, left_on='risk_2', right_on='column_original_name', how='left') + individual_importances3 = pd.merge(individual_importances[['model_id', 'as_of_date','risk_3']], feature_mapping, left_on='risk_3', right_on='column_original_name', how='left') + individual_importances4 = pd.merge(individual_importances[['model_id', 'as_of_date','risk_4']], feature_mapping, left_on='risk_4', right_on='column_original_name', how='left') + individual_importances5 = pd.merge(individual_importances[['model_id', 'as_of_date','risk_5']], feature_mapping, left_on='risk_5', right_on='column_original_name', how='left') + + individual_importances = pd.merge(individual_importances1, individual_importances2, on=['model_id', 'as_of_date']) + individual_importances = pd.merge(individual_importances, individual_importances3, on=['model_id', 'as_of_date']) + individual_importances = pd.merge(individual_importances, individual_importances4, on=['model_id', 'as_of_date']) + individual_importances = pd.merge(individual_importances, individual_importances5, on=['model_id', 'as_of_date']) + + + ## Write to csv + individual_importances.to_sql('production.individual_importances', db_conn, if_exists='replace') + + db_conn.close()
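
Usage note: a minimal sketch of how the pieces added by this series fit together, assuming the patches are applied, the eis package is importable, and setup_environment.get_database() can reach the configured database. The model group id 632 is taken from the hard-coded call in PATCH 1/6; the matrix path is an illustrative placeholder, not a value from the patches.

    # Sketch: drive the class added by this series directly from Python.
    from eis import setup_environment
    from eis.populate_production_schema import populate_production_schema

    db_engine = setup_environment.get_database()
    populator = populate_production_schema(db_engine,
                                           chosen_model_group_id=632,            # illustrative id
                                           matrix_location='/path/to/matrices')  # illustrative path
    populator.populate_production_predictions()             # wraps production.populate_predictions(id)
    populator.populate_production_time_delta()              # wraps production.populate_time_delta()
    populator.populate_production_individual_importances()  # wraps production.populate_individual_importances(id)

The same run is available from the shell through the argparse entry point added in PATCH 3/6, for example:
python -m eis.populate_production_schema --modelgroupid 632 --matrixlocation /path/to/matrices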