From 2540ede5d6718c20dbd65143205e81c9aabaaf1d Mon Sep 17 00:00:00 2001 From: David Hensle <51132108+dhensle@users.noreply.github.com> Date: Mon, 1 Apr 2024 10:00:45 -0700 Subject: [PATCH] BayDAG Contribution #14: Increasing Larch Loading for NMTF (#780) * decrease larch load time for nmtf * nmtf larch av arg --- .../estimation/larch/nonmand_tour_freq.py | 99 ++++++++++++++++++- 1 file changed, 98 insertions(+), 1 deletion(-) diff --git a/activitysim/estimation/larch/nonmand_tour_freq.py b/activitysim/estimation/larch/nonmand_tour_freq.py index 9dfdac73a..c0db830e2 100644 --- a/activitysim/estimation/larch/nonmand_tour_freq.py +++ b/activitysim/estimation/larch/nonmand_tour_freq.py @@ -7,6 +7,8 @@ from larch import DataFrames, Model from larch.log import logger_name from larch.util import Dict +import pickle +from datetime import datetime from .general import ( apply_coefficients, @@ -27,6 +29,7 @@ def interaction_simulate_data( coefficients_files="{segment_name}/{name}_coefficients_{segment_name}.csv", chooser_data_files="{segment_name}/{name}_choosers_combined.csv", alt_values_files="{segment_name}/{name}_interaction_expression_values.csv", + segment_subset=[], ): edb_directory = edb_directory.format(name=name) @@ -46,21 +49,30 @@ def _read_csv(filename, **kwargs): alt_values = {} segment_names = [s["NAME"] for s in settings["SPEC_SEGMENTS"]] + if len(segment_subset) > 0: + assert set(segment_subset).issubset( + set(segment_names) + ), f"{segment_subset} is not a subset of {segment_names}" + segment_names = segment_subset for segment_name in segment_names: + print(f"Loading EDB for {segment_name} segment") coefficients[segment_name] = _read_csv( coefficients_files.format(name=name, segment_name=segment_name), index_col="coefficient_name", + comment="#", ) chooser_data[segment_name] = _read_csv( chooser_data_files.format(name=name, segment_name=segment_name), ) alt_values[segment_name] = _read_csv( alt_values_files.format(name=name, segment_name=segment_name), + comment="#", ) spec = _read_csv( spec_file, + comment="#", ) spec = remove_apostrophes(spec, ["Label"]) # alt_names = list(spec.columns[3:]) @@ -118,10 +130,80 @@ def unavail(model, x_ca): return unav +# FIXME move all this to larch/general.py? 
+# see ActivitySim issue #686
+def _read_feather(filename, name, edb_directory, **kwargs):
+    filename = filename.format(name=name)
+    return pd.read_feather(os.path.join(edb_directory, filename), **kwargs)
+
+
+def _to_feather(df, filename, name, edb_directory, **kwargs):
+    filename = filename.format(name=name)
+    return df.to_feather(os.path.join(edb_directory, filename), **kwargs)
+
+
+def _read_pickle(filename, name, edb_directory, **kwargs):
+    filename = filename.format(name=name)
+    return pd.read_pickle(os.path.join(edb_directory, filename), **kwargs)
+
+
+def _to_pickle(df, filename, name, edb_directory, **kwargs):
+    filename = filename.format(name=name)
+    return df.to_pickle(os.path.join(edb_directory, filename), **kwargs)
+
+
+def _file_exists(filename, name, edb_directory):
+    filename = filename.format(name=name)
+    return os.path.exists(os.path.join(edb_directory, filename))
+
+
+def get_x_ca_df(alt_values, name, edb_directory, num_chunks):
+    def split(a, n):
+        k, m = divmod(len(a), n)
+        return (a[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n))
+
+    # process x_ca with cv_to_ca, with or without chunking
+    x_ca_pickle_file = "{name}_x_ca.pkl"
+    if num_chunks == 1:
+        x_ca = cv_to_ca(alt_values)
+    elif _file_exists(x_ca_pickle_file, name, edb_directory):
+        # if a pickle file from a previous x_ca computation exists, load it to save time
+        time_start = datetime.now()
+        x_ca = _read_pickle(x_ca_pickle_file, name, edb_directory)
+        print(
+            f"x_ca data loaded from {name}_x_ca.pkl - time elapsed {(datetime.now() - time_start).total_seconds()}"
+        )
+    else:
+        time_start = datetime.now()
+        # calculate the chunk size (number of rows per chunk) implied by num_chunks
+        chunking_size = round(len(alt_values) / num_chunks, 3)
+        print(
+            f"Using {num_chunks} chunks results in chunk size of {chunking_size} (of {len(alt_values)} total rows)"
+        )
+        all_chunk_ids = list(alt_values.index.get_level_values(0).unique())
+        split_ids = list(split(all_chunk_ids, num_chunks))
+        x_ca_list = []
+        for i, chunk_ids in enumerate(split_ids):
+            alt_values_i = alt_values.loc[chunk_ids]
+            x_ca_i = cv_to_ca(alt_values_i)
+            x_ca_list.append(x_ca_i)
+            print(
+                f"\rx_ca_i compute done for chunk {i+1}/{num_chunks} - time elapsed {(datetime.now() - time_start).total_seconds()}"
+            )
+        x_ca = pd.concat(x_ca_list, axis=0)
+        # save the final x_ca result as a pickle file to speed up future data loading
+        _to_pickle(x_ca, x_ca_pickle_file, name, edb_directory)
+        print(
+            f"x_ca compute done - time elapsed {(datetime.now() - time_start).total_seconds()}"
+        )
+    return x_ca
+
+
 def nonmand_tour_freq_model(
     edb_directory="output/estimation_data_bundle/{name}/",
     return_data=False,
     condense_parameters=False,
+    segment_subset=[],
+    num_chunks=1,
 ):
     """
     Prepare nonmandatory tour frequency models for estimation.
@@ -141,10 +223,16 @@ def nonmand_tour_freq_model(
     data = interaction_simulate_data(
         name="non_mandatory_tour_frequency",
         edb_directory=edb_directory,
+        segment_subset=segment_subset,
     )
 
     settings = data.settings
     segment_names = [s["NAME"] for s in settings["SPEC_SEGMENTS"]]
+    if len(segment_subset) > 0:
+        assert set(segment_subset).issubset(
+            set(segment_names)
+        ), f"{segment_subset} is not a subset of {segment_names}"
+        segment_names = segment_subset
     if condense_parameters:
         data.relabel_coef = link_same_value_coefficients(
             segment_names, data.coefficients, data.spec
@@ -157,6 +245,7 @@ def nonmand_tour_freq_model(
 
     m = {}
     for segment_name in segment_names:
+        print(f"Creating larch model for {segment_name}")
         segment_model = m[segment_name] = Model()
         # One of the alternatives is coded as 0, so
         # we need to explicitly initialize the MNL nesting graph
@@ -178,7 +267,15 @@ def nonmand_tour_freq_model(
             .set_index("person_id")
             .rename(columns={"TAZ": "HOMETAZ"})
         )
-        x_ca = cv_to_ca(alt_values[segment_name].set_index(["person_id", "variable"]))
+        print("\tperforming cv_to_ca step")
+        # x_ca = cv_to_ca(alt_values[segment_name].set_index(["person_id", "variable"]))
+        x_ca = get_x_ca_df(
+            alt_values=alt_values[segment_name].set_index(["person_id", "variable"]),
+            name=segment_name,
+            edb_directory=edb_directory.format(name="non_mandatory_tour_frequency"),
+            num_chunks=num_chunks,
+        )
+
         d = DataFrames(
             co=x_co,
             ca=x_ca,
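
A minimal usage sketch of the two new keyword arguments, called from an estimation workflow after the EDB has been written. The segment name and chunk count are illustrative assumptions (any subset of the SPEC_SEGMENTS names in the model settings works), and with return_data=True the function is expected to return the larch model group together with the loaded data bundle:

    # illustrative sketch, not part of the patch
    from activitysim.estimation.larch.nonmand_tour_freq import nonmand_tour_freq_model

    model, data = nonmand_tour_freq_model(
        edb_directory="output/estimation_data_bundle/{name}/",  # default; {name} is filled in by the function
        return_data=True,
        segment_subset=["PTYPE_FULL"],  # assumed segment name; must be a subset of the SPEC_SEGMENTS names
        num_chunks=4,  # split the cv_to_ca conversion into 4 chunks; the result is cached as a pickle in the EDB
    )

Loading a single segment and chunking the cv_to_ca conversion both reduce peak memory and load time; rerunning with the same num_chunks setting reuses the cached pickle instead of recomputing x_ca.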