BayDAG Contribution #14: Increasing Larch Loading for NMTF (ActivitySim#780)

* decrease larch load time for nmtf

* nmtf larch av arg
dhensle authored Apr 1, 2024
1 parent 0a119ad commit 2540ede
Showing 1 changed file with 98 additions and 1 deletion.
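The commit threads two new keyword arguments through the estimation helpers: segment_subset, which restricts loading and model building to a subset of the non-mandatory tour frequency segments, and num_chunks, which converts the alternative-values table to Larch idca format in pieces and caches the combined result as a pickle for faster reloads. A minimal usage sketch, assuming the standard estimation data bundle layout; the segment names are illustrative, not taken from this commit:

from activitysim.estimation.larch.nonmand_tour_freq import nonmand_tour_freq_model

# build models for two person-type segments only, converting the
# interaction expression values in 4 chunks; a {segment}_x_ca.pkl
# cache is written into the EDB directory for faster future loads
m = nonmand_tour_freq_model(
    segment_subset=["PTYPE_FULL", "PTYPE_PART"],  # illustrative segment names
    num_chunks=4,
)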
99 changes: 98 additions & 1 deletion activitysim/estimation/larch/nonmand_tour_freq.py
@@ -7,6 +7,8 @@
from larch import DataFrames, Model
from larch.log import logger_name
from larch.util import Dict
import pickle
from datetime import datetime

from .general import (
apply_coefficients,
@@ -27,6 +29,7 @@ def interaction_simulate_data(
coefficients_files="{segment_name}/{name}_coefficients_{segment_name}.csv",
chooser_data_files="{segment_name}/{name}_choosers_combined.csv",
alt_values_files="{segment_name}/{name}_interaction_expression_values.csv",
segment_subset=[],
):
edb_directory = edb_directory.format(name=name)

@@ -46,21 +49,30 @@ def _read_csv(filename, **kwargs):
alt_values = {}

segment_names = [s["NAME"] for s in settings["SPEC_SEGMENTS"]]
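    # optionally restrict loading to a subset of the configured segments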
if len(segment_subset) > 0:
assert set(segment_subset).issubset(
set(segment_names)
), f"{segment_subset} is not a subset of {segment_names}"
segment_names = segment_subset

for segment_name in segment_names:
print(f"Loading EDB for {segment_name} segment")
coefficients[segment_name] = _read_csv(
coefficients_files.format(name=name, segment_name=segment_name),
index_col="coefficient_name",
comment="#",
)
chooser_data[segment_name] = _read_csv(
chooser_data_files.format(name=name, segment_name=segment_name),
)
alt_values[segment_name] = _read_csv(
alt_values_files.format(name=name, segment_name=segment_name),
comment="#",
)

spec = _read_csv(
spec_file,
comment="#",
)
spec = remove_apostrophes(spec, ["Label"])
# alt_names = list(spec.columns[3:])
@@ -118,10 +130,80 @@ def unavail(model, x_ca):
return unav


# FIXME move all this to larch/general.py? see ActivitySim issue #686
def _read_feather(filename, name, edb_directory, **kwargs):
filename = filename.format(name=name)
return pd.read_feather(os.path.join(edb_directory, filename), **kwargs)


def _to_feather(df, filename, name, edb_directory, **kwargs):
filename = filename.format(name=name)
return df.to_feather(os.path.join(edb_directory, filename), **kwargs)


def _read_pickle(filename, name, edb_directory, **kwargs):
filename = filename.format(name=name)
return pd.read_pickle(os.path.join(edb_directory, filename), **kwargs)


def _to_pickle(df, filename, name, edb_directory, **kwargs):
filename = filename.format(name=name)
return df.to_pickle(os.path.join(edb_directory, filename), **kwargs)


def _file_exists(filename, name, edb_directory):
filename = filename.format(name=name)
return os.path.exists(os.path.join(edb_directory, filename))


def get_x_ca_df(alt_values, name, edb_directory, num_chunks):
def split(a, n):
k, m = divmod(len(a), n)
return (a[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n))
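    # e.g. split(list(range(10)), 3) -> [0, 1, 2, 3], [4, 5, 6], [7, 8, 9];
    # chunk lengths differ by at most one element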

    # convert alt_values to an x_ca DataFrame via cv_to_ca, with or without chunking
x_ca_pickle_file = "{name}_x_ca.pkl"
if num_chunks == 1:
x_ca = cv_to_ca(alt_values)
elif _file_exists(x_ca_pickle_file, name, edb_directory):
        # if a pickle file from a previous x_ca computation exists, load it to save time
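        # note: only the file's existence is checked, not its freshness;
        # delete the pickle to force recomputation after the EDB changes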
time_start = datetime.now()
x_ca = _read_pickle(x_ca_pickle_file, name, edb_directory)
        print(
            f"x_ca data loaded from {name}_x_ca.pkl - time elapsed {(datetime.now() - time_start).total_seconds()}"
        )
else:
time_start = datetime.now()
        # calculate chunk size (approximate rows per chunk) from num_chunks
chunking_size = round(len(alt_values) / num_chunks, 3)
print(
f"Using {num_chunks} chunks results in chunk size of {chunking_size} (of {len(alt_values)} total rows)"
)
all_chunk_ids = list(alt_values.index.get_level_values(0).unique())
split_ids = list(split(all_chunk_ids, num_chunks))
x_ca_list = []
for i, chunk_ids in enumerate(split_ids):
alt_values_i = alt_values.loc[chunk_ids]
x_ca_i = cv_to_ca(alt_values_i)
x_ca_list.append(x_ca_i)
print(
f"\rx_ca_i compute done for chunk {i+1}/{num_chunks} - time elapsed {(datetime.now() - time_start).total_seconds()}"
)
x_ca = pd.concat(x_ca_list, axis=0)
        # save the final x_ca result as a pickle file to speed up future loads
_to_pickle(x_ca, x_ca_pickle_file, name, edb_directory)
print(
f"x_ca compute done - time elapsed {(datetime.now() - time_start).total_seconds()}"
)
return x_ca


def nonmand_tour_freq_model(
edb_directory="output/estimation_data_bundle/{name}/",
return_data=False,
condense_parameters=False,
segment_subset=[],
num_chunks=1,
):
"""
Prepare nonmandatory tour frequency models for estimation.
@@ -141,10 +223,16 @@
data = interaction_simulate_data(
name="non_mandatory_tour_frequency",
edb_directory=edb_directory,
segment_subset=segment_subset,
)

settings = data.settings
segment_names = [s["NAME"] for s in settings["SPEC_SEGMENTS"]]
if len(segment_subset) > 0:
assert set(segment_subset).issubset(
set(segment_names)
), f"{segment_subset} is not a subset of {segment_names}"
segment_names = segment_subset
if condense_parameters:
data.relabel_coef = link_same_value_coefficients(
segment_names, data.coefficients, data.spec
@@ -157,6 +245,7 @@

m = {}
for segment_name in segment_names:
print(f"Creating larch model for {segment_name}")
segment_model = m[segment_name] = Model()
# One of the alternatives is coded as 0, so
# we need to explicitly initialize the MNL nesting graph
@@ -178,7 +267,15 @@
.set_index("person_id")
.rename(columns={"TAZ": "HOMETAZ"})
)
x_ca = cv_to_ca(alt_values[segment_name].set_index(["person_id", "variable"]))
print("\t performing cv to ca step")
# x_ca = cv_to_ca(alt_values[segment_name].set_index(["person_id", "variable"]))
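        # with num_chunks=1 this matches the direct cv_to_ca call above;
        # larger values convert the alt values piecewise and cache the result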
x_ca = get_x_ca_df(
alt_values=alt_values[segment_name].set_index(["person_id", "variable"]),
name=segment_name,
edb_directory=edb_directory.format(name="non_mandatory_tour_frequency"),
num_chunks=num_chunks,
)

d = DataFrames(
co=x_co,
ca=x_ca,

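Because get_x_ca_df only checks whether the cache file exists, a stale {name}_x_ca.pkl will be loaded even after the underlying estimation data bundle has been regenerated. A small hypothetical helper (not part of this commit) to clear the cache before re-running:

import os

def clear_x_ca_cache(name, edb_directory):
    # remove the cached idca pickle so get_x_ca_df recomputes from alt values
    path = os.path.join(edb_directory, f"{name}_x_ca.pkl")
    if os.path.exists(path):
        os.remove(path)
        print(f"removed {path}")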