From 7affd57ecb375f91d095e88ba57cbe6dcdad62aa Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Tue, 7 Jul 2020 16:59:51 -0300 Subject: [PATCH 01/11] Include improvements from the parallel branch one by one --- synthpop/synthesizer.py | 114 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/synthpop/synthesizer.py b/synthpop/synthesizer.py index f5532b1..48e8fdd 100644 --- a/synthpop/synthesizer.py +++ b/synthpop/synthesizer.py @@ -5,6 +5,11 @@ import numpy as np import pandas as pd from scipy.stats import chisquare +from tqdm import tqdm +from concurrent.futures import ProcessPoolExecutor, as_completed +import multiprocessing +from multiprocessing import Pool +from itertools import repeat from . import categorizer as cat from . import draw @@ -89,6 +94,115 @@ def synthesize(h_marg, p_marg, h_jd, p_jd, h_pums, p_pums, num_households, h_pums, p_pums, household_freq, h_constraint, p_constraint, best_weights, hh_index_start=hh_index_start) +def geog_preprocessing(geog_id, recipe, marginal_zero_sub, jd_zero_sub, + hh_index_start): + h_marg = recipe.get_household_marginal_for_geography(geog_id) + logger.debug("Household marginal") + logger.debug(h_marg) + + p_marg = recipe.get_person_marginal_for_geography(geog_id) + logger.debug("Person marginal") + logger.debug(p_marg) + + h_pums, h_jd = recipe.\ + get_household_joint_dist_for_geography(geog_id) + logger.debug("Household joint distribution") + logger.debug(h_jd) + + p_pums, p_jd = recipe.get_person_joint_dist_for_geography(geog_id) + logger.debug("Person joint distribution") + logger.debug(p_jd) + + return h_marg, p_marg, h_jd, p_jd, h_pums, p_pums, marginal_zero_sub,\ + jd_zero_sub, hh_index_start + +def synthesize_all_in_parallel( + recipe, num_geogs=None, indexes=None, marginal_zero_sub=.01, + jd_zero_sub=.001, hh_index_start = 0): + # , max_workers=None, hh_index_start=0): + """ + Returns + ------- + households, people : pandas.DataFrame + fit_quality : dict of FitQuality + Keys are geographic IDs, values are namedtuples with attributes + ``.household_chisq``, ``household_p``, ``people_chisq``, + and ``people_p``. + ignore_max_iters: boolean which indicates to ignore the max iterations in the ipu. Default, False. + """ + # cluster = LocalCluster() + # client = Client(cluster) + with ProcessPoolExecutor(max_workers=5) as ex: + + if indexes is None: + indexes = recipe.get_available_geography_ids() + + hh_list = [] + people_list = [] + cnt = 0 + fit_quality = {} + # hh_index_start = 0 + geog_synth_args = [] + finished_args = [] + geog_ids = [] + futures = [] + + print('Submitting function args for parallel processing:') + for i, geog_id in enumerate(indexes): + geog_synth_args.append(ex.submit( + geog_preprocessing, geog_id, recipe, marginal_zero_sub, + jd_zero_sub, hh_index_start)) + geog_ids.append(geog_id) + cnt += 1 + if num_geogs is not None and cnt >= num_geogs: + break + + print('Processing function args in parallel:') + for finished_arg in tqdm( + as_completed(geog_synth_args), total=len(geog_synth_args)): + finished_args.append(finished_arg.result()) + + print('Submitting {0} geographies for parallel processing.'.format( + len(finished_args))) + futures = [ + ex.submit(synthesize, *geog_args) for geog_args in finished_args] + + print('Beginning population synthesis in parallel:') + for f in tqdm(as_completed(futures), total=len(futures)): + pass + + print('Processing results:') + for i, future in tqdm(enumerate(futures), total=len(futures)): + try: + households, people, people_chisq, people_p = future.result() + except Exception as e: + print('Generated an exception: {0}'.format(e)) + else: + geog_id = geog_ids[i] + + # Append location identifiers to the synthesized households + for geog_cat in geog_id.keys(): + households[geog_cat] = geog_id[geog_cat] + + # update the household_ids since we can't do it in the call to + # synthesize when we execute in parallel + households.index += hh_index_start + people.hh_id += hh_index_start + + hh_list.append(households) + people_list.append(people) + key = BlockGroupID( + geog_id['state'], geog_id['county'], geog_id['tract'], + geog_id['block group']) + fit_quality[key] = FitQuality(people_chisq, people_p) + + if len(households) > 0: + hh_index_start = households.index.values[-1] + 1 + + all_households = pd.concat(hh_list) + all_persons = pd.concat(people_list, ignore_index=True) + + return (all_households, all_persons, fit_quality) def synthesize_all(recipe, num_geogs=None, indexes=None, marginal_zero_sub=.01, jd_zero_sub=.001): From 333bc78418f42df16d71e51da213c9282b9c04da Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Fri, 10 Jul 2020 16:48:56 -0300 Subject: [PATCH 02/11] Fix ordering bug --- synthpop/synthesizer.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/synthpop/synthesizer.py b/synthpop/synthesizer.py index 48e8fdd..d75eb60 100644 --- a/synthpop/synthesizer.py +++ b/synthpop/synthesizer.py @@ -157,19 +157,19 @@ def synthesize_all_in_parallel( if num_geogs is not None and cnt >= num_geogs: break - print('Processing function args in parallel:') - for finished_arg in tqdm( - as_completed(geog_synth_args), total=len(geog_synth_args)): - finished_args.append(finished_arg.result()) + # print('Processing function args in parallel:') + # for finished_arg in tqdm( + # as_completed(geog_synth_args), total=len(geog_synth_args)): + # finished_args.append(finished_arg.result()) print('Submitting {0} geographies for parallel processing.'.format( len(finished_args))) futures = [ - ex.submit(synthesize, *geog_args) for geog_args in finished_args] + ex.submit(synthesize, *geog_args) for geog_args in geog_synth_args] - print('Beginning population synthesis in parallel:') - for f in tqdm(as_completed(futures), total=len(futures)): - pass + # print('Beginning population synthesis in parallel:') + # for f in tqdm(as_completed(futures), total=len(futures)): + # pass print('Processing results:') for i, future in tqdm(enumerate(futures), total=len(futures)): From 8fc5b7d65dc482edeee91bae58cd9b05ff405ad7 Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Fri, 10 Jul 2020 18:37:43 -0300 Subject: [PATCH 03/11] provide arguments instead of a future obj --- synthpop/synthesizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synthpop/synthesizer.py b/synthpop/synthesizer.py index d75eb60..a32ac6b 100644 --- a/synthpop/synthesizer.py +++ b/synthpop/synthesizer.py @@ -165,7 +165,7 @@ def synthesize_all_in_parallel( print('Submitting {0} geographies for parallel processing.'.format( len(finished_args))) futures = [ - ex.submit(synthesize, *geog_args) for geog_args in geog_synth_args] + ex.submit(synthesize, *geog_args.result()) for geog_args in geog_synth_args] # print('Beginning population synthesis in parallel:') # for f in tqdm(as_completed(futures), total=len(futures)): From 4755e8ccc447d6610104bd11346de1dcd764d7e6 Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Fri, 10 Jul 2020 19:39:01 -0300 Subject: [PATCH 04/11] re organize code to first run, then continue --- synthpop/synthesizer.py | 74 ++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 41 deletions(-) diff --git a/synthpop/synthesizer.py b/synthpop/synthesizer.py index a32ac6b..46f7368 100644 --- a/synthpop/synthesizer.py +++ b/synthpop/synthesizer.py @@ -157,52 +157,44 @@ def synthesize_all_in_parallel( if num_geogs is not None and cnt >= num_geogs: break - # print('Processing function args in parallel:') - # for finished_arg in tqdm( - # as_completed(geog_synth_args), total=len(geog_synth_args)): - # finished_args.append(finished_arg.result()) - + with ProcessPoolExecutor(max_workers=5) as ex: print('Submitting {0} geographies for parallel processing.'.format( len(finished_args))) futures = [ ex.submit(synthesize, *geog_args.result()) for geog_args in geog_synth_args] - # print('Beginning population synthesis in parallel:') - # for f in tqdm(as_completed(futures), total=len(futures)): - # pass - - print('Processing results:') - for i, future in tqdm(enumerate(futures), total=len(futures)): - try: - households, people, people_chisq, people_p = future.result() - except Exception as e: - print('Generated an exception: {0}'.format(e)) - else: - geog_id = geog_ids[i] - - # Append location identifiers to the synthesized households - for geog_cat in geog_id.keys(): - households[geog_cat] = geog_id[geog_cat] - - # update the household_ids since we can't do it in the call to - # synthesize when we execute in parallel - households.index += hh_index_start - people.hh_id += hh_index_start - - hh_list.append(households) - people_list.append(people) - key = BlockGroupID( - geog_id['state'], geog_id['county'], geog_id['tract'], - geog_id['block group']) - fit_quality[key] = FitQuality(people_chisq, people_p) - - if len(households) > 0: - hh_index_start = households.index.values[-1] + 1 - - all_households = pd.concat(hh_list) - all_persons = pd.concat(people_list, ignore_index=True) - - return (all_households, all_persons, fit_quality) + print('Processing results:') + for i, future in tqdm(enumerate(futures), total=len(futures)): + try: + households, people, people_chisq, people_p = future.result() + except Exception as e: + print('Generated an exception: {0}'.format(e)) + else: + geog_id = geog_ids[i] + + # Append location identifiers to the synthesized households + for geog_cat in geog_id.keys(): + households[geog_cat] = geog_id[geog_cat] + + # update the household_ids since we can't do it in the call to + # synthesize when we execute in parallel + households.index += hh_index_start + people.hh_id += hh_index_start + + hh_list.append(households) + people_list.append(people) + key = BlockGroupID( + geog_id['state'], geog_id['county'], geog_id['tract'], + geog_id['block group']) + fit_quality[key] = FitQuality(people_chisq, people_p) + + if len(households) > 0: + hh_index_start = households.index.values[-1] + 1 + + all_households = pd.concat(hh_list) + all_persons = pd.concat(people_list, ignore_index=True) + + return (all_households, all_persons, fit_quality) def synthesize_all(recipe, num_geogs=None, indexes=None, marginal_zero_sub=.01, jd_zero_sub=.001): From a53935913c8d94009c5660e64ffee2767af5f52c Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Mon, 13 Jul 2020 19:08:26 -0300 Subject: [PATCH 05/11] If there is an exception, we let the model fail --- synthpop/synthesizer.py | 52 ++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/synthpop/synthesizer.py b/synthpop/synthesizer.py index 46f7368..f8b6463 100644 --- a/synthpop/synthesizer.py +++ b/synthpop/synthesizer.py @@ -130,10 +130,9 @@ def synthesize_all_in_parallel( and ``people_p``. ignore_max_iters: boolean which indicates to ignore the max iterations in the ipu. Default, False. """ - # cluster = LocalCluster() - # client = Client(cluster) - with ProcessPoolExecutor(max_workers=5) as ex: + with ProcessPoolExecutor(max_workers=5) as ex: + if indexes is None: indexes = recipe.get_available_geography_ids() @@ -141,7 +140,6 @@ def synthesize_all_in_parallel( people_list = [] cnt = 0 fit_quality = {} - # hh_index_start = 0 geog_synth_args = [] finished_args = [] geog_ids = [] @@ -165,31 +163,27 @@ def synthesize_all_in_parallel( print('Processing results:') for i, future in tqdm(enumerate(futures), total=len(futures)): - try: - households, people, people_chisq, people_p = future.result() - except Exception as e: - print('Generated an exception: {0}'.format(e)) - else: - geog_id = geog_ids[i] - - # Append location identifiers to the synthesized households - for geog_cat in geog_id.keys(): - households[geog_cat] = geog_id[geog_cat] - - # update the household_ids since we can't do it in the call to - # synthesize when we execute in parallel - households.index += hh_index_start - people.hh_id += hh_index_start - - hh_list.append(households) - people_list.append(people) - key = BlockGroupID( - geog_id['state'], geog_id['county'], geog_id['tract'], - geog_id['block group']) - fit_quality[key] = FitQuality(people_chisq, people_p) - - if len(households) > 0: - hh_index_start = households.index.values[-1] + 1 + households, people, people_chisq, people_p = future.result() + geog_id = geog_ids[i] + + # Append location identifiers to the synthesized households + for geog_cat in geog_id.keys(): + households[geog_cat] = geog_id[geog_cat] + + # update the household_ids since we can't do it in the call to + # synthesize when we execute in parallel + households.index += hh_index_start + people.hh_id += hh_index_start + + hh_list.append(households) + people_list.append(people) + key = BlockGroupID( + geog_id['state'], geog_id['county'], geog_id['tract'], + geog_id['block group']) + fit_quality[key] = FitQuality(people_chisq, people_p) + + if len(households) > 0: + hh_index_start = households.index.values[-1] + 1 all_households = pd.concat(hh_list) all_persons = pd.concat(people_list, ignore_index=True) From 7e41e67391cf1b64da149ff8f56161543745ee4c Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Mon, 13 Jul 2020 19:23:30 -0300 Subject: [PATCH 06/11] delete unnecessary packages --- synthpop/synthesizer.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/synthpop/synthesizer.py b/synthpop/synthesizer.py index f8b6463..62be5fa 100644 --- a/synthpop/synthesizer.py +++ b/synthpop/synthesizer.py @@ -7,9 +7,6 @@ from scipy.stats import chisquare from tqdm import tqdm from concurrent.futures import ProcessPoolExecutor, as_completed -import multiprocessing -from multiprocessing import Pool -from itertools import repeat from . import categorizer as cat from . import draw @@ -132,7 +129,7 @@ def synthesize_all_in_parallel( """ with ProcessPoolExecutor(max_workers=5) as ex: - + if indexes is None: indexes = recipe.get_available_geography_ids() From 49b89e7b37af87944d8bc5ff409dbf8fa32cb52d Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Mon, 13 Jul 2020 19:33:38 -0300 Subject: [PATCH 07/11] Add reference to geoid we are processing --- synthpop/synthesizer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synthpop/synthesizer.py b/synthpop/synthesizer.py index 62be5fa..b8a58bc 100644 --- a/synthpop/synthesizer.py +++ b/synthpop/synthesizer.py @@ -160,6 +160,7 @@ def synthesize_all_in_parallel( print('Processing results:') for i, future in tqdm(enumerate(futures), total=len(futures)): + print ('Processing results for {}'.format(geog_id[geog_cat])) households, people, people_chisq, people_p = future.result() geog_id = geog_ids[i] From f5c68df158296677b53c2fcf4e3942dafd72d9ad Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Tue, 14 Jul 2020 16:07:41 -0300 Subject: [PATCH 08/11] correct the geog id to print --- synthpop/synthesizer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synthpop/synthesizer.py b/synthpop/synthesizer.py index b8a58bc..7cc0c2a 100644 --- a/synthpop/synthesizer.py +++ b/synthpop/synthesizer.py @@ -160,9 +160,10 @@ def synthesize_all_in_parallel( print('Processing results:') for i, future in tqdm(enumerate(futures), total=len(futures)): - print ('Processing results for {}'.format(geog_id[geog_cat])) - households, people, people_chisq, people_p = future.result() geog_id = geog_ids[i] + print ('Processing results for: ', geog_id) + households, people, people_chisq, people_p = future.result() + # Append location identifiers to the synthesized households for geog_cat in geog_id.keys(): From a7199edf63c59c4e80375f2d46daf4545e067b28 Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Tue, 9 Nov 2021 16:59:18 -0300 Subject: [PATCH 09/11] update parallel branch --- synthpop/synthesizer.py | 62 +++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/synthpop/synthesizer.py b/synthpop/synthesizer.py index 7cc0c2a..42716f0 100644 --- a/synthpop/synthesizer.py +++ b/synthpop/synthesizer.py @@ -6,7 +6,7 @@ import pandas as pd from scipy.stats import chisquare from tqdm import tqdm -from concurrent.futures import ProcessPoolExecutor, as_completed +from concurrent.futures import ProcessPoolExecutor from . import categorizer as cat from . import draw @@ -73,8 +73,8 @@ def synthesize(h_marg, p_marg, h_jd, p_jd, h_pums, p_pums, person_freq, h_constraint, p_constraint) - logger.info("Time to run ipu: %.3fs" % (time.time()-t1)) + logger.info("Time to run ipu: %.3fs" % (time.time() - t1)) logger.debug("IPU weights:") logger.debug(best_weights.describe()) logger.debug("Fit quality:") @@ -115,8 +115,7 @@ def geog_preprocessing(geog_id, recipe, marginal_zero_sub, jd_zero_sub, def synthesize_all_in_parallel( recipe, num_geogs=None, indexes=None, marginal_zero_sub=.01, - jd_zero_sub=.001, hh_index_start = 0): - # , max_workers=None, hh_index_start=0): + jd_zero_sub=.001, max_workers=5, hh_index_start=0): """ Returns ------- @@ -125,11 +124,8 @@ def synthesize_all_in_parallel( Keys are geographic IDs, values are namedtuples with attributes ``.household_chisq``, ``household_p``, ``people_chisq``, and ``people_p``. - ignore_max_iters: boolean which indicates to ignore the max iterations in the ipu. Default, False. """ - - with ProcessPoolExecutor(max_workers=5) as ex: - + with ProcessPoolExecutor(max_workers) as ex: if indexes is None: indexes = recipe.get_available_geography_ids() @@ -152,9 +148,8 @@ def synthesize_all_in_parallel( if num_geogs is not None and cnt >= num_geogs: break + with ProcessPoolExecutor(max_workers=5) as ex: - print('Submitting {0} geographies for parallel processing.'.format( - len(finished_args))) futures = [ ex.submit(synthesize, *geog_args.result()) for geog_args in geog_synth_args] @@ -162,27 +157,29 @@ def synthesize_all_in_parallel( for i, future in tqdm(enumerate(futures), total=len(futures)): geog_id = geog_ids[i] print ('Processing results for: ', geog_id) - households, people, people_chisq, people_p = future.result() - - - # Append location identifiers to the synthesized households - for geog_cat in geog_id.keys(): - households[geog_cat] = geog_id[geog_cat] - - # update the household_ids since we can't do it in the call to - # synthesize when we execute in parallel - households.index += hh_index_start - people.hh_id += hh_index_start - - hh_list.append(households) - people_list.append(people) - key = BlockGroupID( - geog_id['state'], geog_id['county'], geog_id['tract'], - geog_id['block group']) - fit_quality[key] = FitQuality(people_chisq, people_p) - - if len(households) > 0: - hh_index_start = households.index.values[-1] + 1 + try: + households, people, people_chisq, people_p = future.result() + except: + raise ValueError('The synthesis failed for geog_id: {}'.format(geog_id)) + else: + # Append location identifiers to the synthesized households + for geog_cat in geog_id.keys(): + households[geog_cat] = geog_id[geog_cat] + + # update the household_ids since we can't do it in the call to + # synthesize when we execute in parallel + households.index += hh_index_start + people.hh_id += hh_index_start + + hh_list.append(households) + people_list.append(people) + key = BlockGroupID( + geog_id['state'], geog_id['county'], geog_id['tract'], + geog_id['block group']) + fit_quality[key] = FitQuality(people_chisq, people_p) + + if len(households) > 0: + hh_index_start = households.index.values[-1] + 1 all_households = pd.concat(hh_list) all_persons = pd.concat(people_list, ignore_index=True) @@ -213,8 +210,7 @@ def synthesize_all(recipe, num_geogs=None, indexes=None, fit_quality = {} hh_index_start = 0 - # TODO will parallelization work here? - for geog_id in indexes: + for geog_id in tqdm(indexes, total=num_geogs): print("Synthesizing geog id:\n", geog_id) h_marg = recipe.get_household_marginal_for_geography(geog_id) From c353ef3df8680be147901a734a17c8a7300b6fb9 Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Wed, 10 Nov 2021 16:38:26 -0300 Subject: [PATCH 10/11] update --- synthpop/ipu/ipu.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synthpop/ipu/ipu.py b/synthpop/ipu/ipu.py index 4fdf10f..dc3a437 100644 --- a/synthpop/ipu/ipu.py +++ b/synthpop/ipu/ipu.py @@ -25,8 +25,8 @@ def for_each_col(col): nz = col.nonzero()[0] return col[nz], nz - for (col_idx, (col, nz)) in df.apply(for_each_col, axis=0, raw=True).items(): - yield (col_idx, col, nz) + for (col_idx, (col, nz)) in df.apply(for_each_col, axis=0, raw=False).items(): + yield (col_idx, col.values, nz) class _FrequencyAndConstraints(object): From 16360ce7232c688f1d25cd9891b37f9bdf807e0a Mon Sep 17 00:00:00 2001 From: msoltadeo Date: Thu, 11 Nov 2021 10:27:44 -0300 Subject: [PATCH 11/11] update --- synthpop/ipu/ipu.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synthpop/ipu/ipu.py b/synthpop/ipu/ipu.py index dc3a437..520b646 100644 --- a/synthpop/ipu/ipu.py +++ b/synthpop/ipu/ipu.py @@ -22,8 +22,8 @@ def _drop_zeros(df): """ def for_each_col(col): - nz = col.nonzero()[0] - return col[nz], nz + nz = col.values.nonzero()[0] + return col.iloc[nz], nz for (col_idx, (col, nz)) in df.apply(for_each_col, axis=0, raw=False).items(): yield (col_idx, col.values, nz)