Skip to content

Commit

Permalink
Merge branch 'develop' into data-type-op-pd-cat
Browse files Browse the repository at this point in the history
  • Loading branch information
i-am-sijia authored Dec 7, 2023
2 parents 0a34951 + 2ff39b8 commit 37e7ab5
Show file tree
Hide file tree
Showing 30 changed files with 177 additions and 106 deletions.
2 changes: 1 addition & 1 deletion activitysim/abm/models/parking_location_choice.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def parking_location(
if "trip_period" not in trips_merged_df:
# TODO: resolve this to the skim time period index not the label, it will be faster
trips_merged_df["trip_period"] = network_los.skim_time_period_label(
trips_merged_df[proposed_trip_departure_period]
trips_merged_df[proposed_trip_departure_period], as_cat=True
)
model_settings["TRIP_DEPARTURE_PERIOD"] = "trip_period"

Expand Down
1 change: 0 additions & 1 deletion activitysim/abm/models/tour_mode_choice.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import numpy as np
import pandas as pd
from orca import orca

from activitysim.abm.models.util import annotate, school_escort_tours_trips, trip
from activitysim.abm.models.util.mode import run_tour_mode_choice_simulate
Expand Down
12 changes: 5 additions & 7 deletions activitysim/abm/models/tour_od_choice.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@ def tour_od_choice(
Parameters
----------
tours : orca.DataFrameWrapper
tours : pd.DataFrame
lazy-loaded tours table
persons : orca.DataFrameWrapper
persons : pd.DataFrame
lazy-loaded persons table
households : orca.DataFrameWrapper
households : pd.DataFrame
lazy-loaded households table
land_use : orca.DataFrameWrapper
land_use : pd.DataFrame
lazy-loaded land use data table
stop_frequency_alts : orca.DataFrameWrapper
lazy-loaded table of stop frequency alternatives, e.g. "1out2in"
network_los : orca._InjectableFuncWrapper
network_los : los.Network_LOS
lazy-loaded activitysim.los.Network_LOS object
chunk_size
simulation chunk size, set in main settings.yaml
Expand Down
2 changes: 1 addition & 1 deletion activitysim/abm/models/trip_mode_choice.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def trip_mode_choice(
# setup skim keys
assert "trip_period" not in trips_merged
trips_merged["trip_period"] = network_los.skim_time_period_label(
trips_merged.depart
trips_merged.depart, as_cat=True
)

orig_col = "origin"
Expand Down
16 changes: 10 additions & 6 deletions activitysim/abm/models/util/logsums.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ def compute_logsums(
# FIXME - are we ok with altering choosers (so caller doesn't have to set these)?
if (in_period_col is not None) and (out_period_col is not None):
choosers["in_period"] = network_los.skim_time_period_label(
choosers[in_period_col]
choosers[in_period_col], as_cat=True
)
choosers["out_period"] = network_los.skim_time_period_label(
choosers[out_period_col]
choosers[out_period_col], as_cat=True
)
elif ("in_period" not in choosers.columns) and (
"out_period" not in choosers.columns
Expand All @@ -92,17 +92,21 @@ def compute_logsums(
and tour_purpose in model_settings["OUT_PERIOD"]
):
choosers["in_period"] = network_los.skim_time_period_label(
model_settings["IN_PERIOD"][tour_purpose]
model_settings["IN_PERIOD"][tour_purpose],
as_cat=True,
broadcast_to=choosers.index,
)
choosers["out_period"] = network_los.skim_time_period_label(
model_settings["OUT_PERIOD"][tour_purpose]
model_settings["OUT_PERIOD"][tour_purpose],
as_cat=True,
broadcast_to=choosers.index,
)
else:
choosers["in_period"] = network_los.skim_time_period_label(
model_settings["IN_PERIOD"]
model_settings["IN_PERIOD"], as_cat=True, broadcast_to=choosers.index
)
choosers["out_period"] = network_los.skim_time_period_label(
model_settings["OUT_PERIOD"]
model_settings["OUT_PERIOD"], as_cat=True, broadcast_to=choosers.index
)
else:
logger.error("Choosers table already has columns 'in_period' and 'out_period'.")
Expand Down
11 changes: 4 additions & 7 deletions activitysim/abm/models/util/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,12 @@ def run_tour_mode_choice_simulate(
assert ("in_period" not in choosers) and ("out_period" not in choosers)
in_time = skims["in_time_col_name"]
out_time = skims["out_time_col_name"]
time_cat_type = pd.api.types.CategoricalDtype(
list(set(network_los.skim_time_periods["labels"])), ordered=False
)
choosers["in_period"] = network_los.skim_time_period_label(
choosers[in_time]
).astype(time_cat_type)
choosers[in_time], as_cat=True
)
choosers["out_period"] = network_los.skim_time_period_label(
choosers[out_time]
).astype(time_cat_type)
choosers[out_time], as_cat=True
)

expressions.annotate_preprocessors(
state, choosers, locals_dict, skims, model_settings, trace_label
Expand Down
1 change: 0 additions & 1 deletion activitysim/abm/models/util/tour_od.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import numpy as np
import pandas as pd
from orca import orca

from activitysim.abm.models.util import logsums as logsum
from activitysim.abm.models.util import trip
Expand Down
54 changes: 32 additions & 22 deletions activitysim/abm/models/util/vectorize_tour_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ def dedupe_alt_tdd(state: workflow.State, alt_tdd, tour_purpose, trace_label):

logger.info("tdd_alt_segments specified for representative logsums")

if tdd_segments is not None:
# apply categorical dtypes
tdd_segments["time_period"] = tdd_segments["time_period"].astype(
alt_tdd["out_period"].dtype
)

with chunk.chunk_log(
state, tracing.extend_trace_label(trace_label, "dedupe_alt_tdd")
) as chunk_sizer:
Expand Down Expand Up @@ -338,18 +344,11 @@ def compute_tour_scheduling_logsums(
assert "out_period" not in alt_tdd
assert "in_period" not in alt_tdd

# FIXME:MEMORY
# These two lines each generate a massive array of strings,
# using a bunch of RAM and slowing things down.
time_cat_type = pd.api.types.CategoricalDtype(
list(OrderedDict.fromkeys(network_los.skim_time_periods["labels"])),
ordered=True,
)
alt_tdd["out_period"] = network_los.skim_time_period_label(alt_tdd["start"]).astype(
time_cat_type
alt_tdd["out_period"] = network_los.skim_time_period_label(
alt_tdd["start"], as_cat=True
)
alt_tdd["in_period"] = network_los.skim_time_period_label(alt_tdd["end"]).astype(
time_cat_type
alt_tdd["in_period"] = network_los.skim_time_period_label(
alt_tdd["end"], as_cat=True
)

alt_tdd["duration"] = alt_tdd["end"] - alt_tdd["start"]
Expand Down Expand Up @@ -401,17 +400,28 @@ def compute_tour_scheduling_logsums(

# tracing.log_runtime(model_name=trace_label, start_time=t0)

# redupe - join the alt_tdd_period logsums to alt_tdd to get logsums for alt_tdd
logsums = (
pd.merge(
alt_tdd.reset_index(),
deduped_alt_tdds.reset_index(),
on=[index_name] + redupe_columns,
how="left",
)
.set_index(index_name)
.logsums
)
logsums = pd.Series(data=0, index=alt_tdd.index, dtype=np.float64)
left_on = [alt_tdd.index]
right_on = [deduped_alt_tdds.index]
for i in redupe_columns:
if (
alt_tdd[i].dtype == "category"
and alt_tdd[i].dtype.ordered
and alt_tdd[i].dtype == deduped_alt_tdds[i].dtype
):
left_on += [alt_tdd[i].cat.codes]
right_on += [deduped_alt_tdds[i].cat.codes]
else:
left_on += [alt_tdd[i].to_numpy()]
right_on += [deduped_alt_tdds[i].to_numpy()]

logsums.iloc[:] = pd.merge(
pd.DataFrame(index=alt_tdd.index),
deduped_alt_tdds.logsums,
left_on=left_on,
right_on=right_on,
how="left",
).logsums.to_numpy()
chunk_sizer.log_df(trace_label, "logsums", logsums)

del deduped_alt_tdds
Expand Down
13 changes: 6 additions & 7 deletions activitysim/abm/models/vehicle_allocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,12 @@ def vehicle_allocation(
Parameters
----------
state : workflow.State
persons : orca.DataFrameWrapper
households : orca.DataFrameWrapper
vehicles : orca.DataFrameWrapper
vehicles_merged : orca.DataFrameWrapper
tours : orca.DataFrameWrapper
tours_merged : orca.DataFrameWrapper
chunk_size : orca.injectable
persons : pd.DataFrame
households : pd.DataFrame
vehicles : pd.DataFrame
tours : pd.DataFrame
tours_merged : pd.DataFrame
network_los : los.Network_LOS
"""
trace_label = "vehicle_allocation"
model_settings_file_name = "vehicle_allocation.yaml"
Expand Down
8 changes: 4 additions & 4 deletions activitysim/abm/models/vehicle_type_choice.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def iterate_vehicle_type_choice(
locals_dict : dict
additional variables available when writing expressions
estimator : Estimator object
chunk_size : orca.injectable
chunk_size : int
trace_label : str
Returns
Expand Down Expand Up @@ -549,9 +549,9 @@ def vehicle_type_choice(
Parameters
----------
persons : orca.DataFrameWrapper
households : orca.DataFrameWrapper
vehicles : orca.DataFrameWrapper
persons : pd.DataFrame
households : pd.DataFrame
vehicles : pd.DataFrame
vehicles_merged : DataFrame
"""
trace_label = "vehicle_type_choice"
Expand Down
12 changes: 8 additions & 4 deletions activitysim/abm/tables/landuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ def land_use(state: workflow.State):

sharrow_enabled = state.settings.sharrow
if sharrow_enabled:
err_msg = (
"a zero-based land_use index is required for sharrow,\n"
"try adding `recode_pipeline_columns: true` to your settings file."
)
# when using sharrow, the land use file must be organized (either in raw
# form or via recoding) so that the index is zero-based and contiguous
assert df.index.is_monotonic_increasing
assert df.index[0] == 0
assert df.index[-1] == len(df.index) - 1
assert df.index.dtype.kind == "i"
assert df.index.is_monotonic_increasing, err_msg
assert df.index[0] == 0, err_msg
assert df.index[-1] == len(df.index) - 1, err_msg
assert df.index.dtype.kind == "i", err_msg

# try to make life easy for everybody by keeping everything in canonical order
# but as long as coalesce_pipeline doesn't sort tables it coalesces, it might not stay in order
Expand Down
2 changes: 1 addition & 1 deletion activitysim/benchmarking/componentwise.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def run_component(state, component_name):
def teardown_component(state, component_name):
logger.info("teardown_component: %s", component_name)

# use the pipeline module to clear out all the orca tables, so
# use the pipeline module to clear out all the tables, so
# the next benchmark run has a clean slate.
# anything needed should be reloaded from the pipeline checkpoint file
pipeline_tables = state.registered_tables()
Expand Down
3 changes: 2 additions & 1 deletion activitysim/cli/benchmark.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import json
import os
import shutil
Expand Down Expand Up @@ -43,7 +45,6 @@
"pyyaml": [],
"pytables": [],
"toolz": [],
"orca": [],
"psutil": [],
"requests": [],
"numba": ["0.54"],
Expand Down
36 changes: 29 additions & 7 deletions activitysim/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def skims_mapping(
parking_col_name=None,
zone_layer=None,
primary_origin_col_name=None,
predigitized_time_periods=False,
):
logger.info("loading skims_mapping")
logger.info(f"- orig_col_name: {orig_col_name}")
Expand Down Expand Up @@ -337,6 +338,10 @@ def skims_mapping(
),
)
else:
if predigitized_time_periods:
time_rel = "_code ->"
else:
time_rel = " @"
return dict(
# TODO:SHARROW: organize dimensions.
odt_skims=skim_dataset,
Expand All @@ -347,16 +352,16 @@ def skims_mapping(
relationships=(
f"df._orig_col_name -> odt_skims.{odim}",
f"df._dest_col_name -> odt_skims.{ddim}",
"df.out_period @ odt_skims.time_period",
f"df.out_period{time_rel} odt_skims.time_period",
f"df._dest_col_name -> dot_skims.{odim}",
f"df._orig_col_name -> dot_skims.{ddim}",
"df.in_period @ dot_skims.time_period",
f"df.in_period{time_rel} dot_skims.time_period",
f"df._orig_col_name -> odr_skims.{odim}",
f"df._dest_col_name -> odr_skims.{ddim}",
"df.in_period @ odr_skims.time_period",
f"df.in_period{time_rel} odr_skims.time_period",
f"df._dest_col_name -> dor_skims.{odim}",
f"df._orig_col_name -> dor_skims.{ddim}",
"df.out_period @ dor_skims.time_period",
f"df.out_period{time_rel} dor_skims.time_period",
f"df._orig_col_name -> od_skims.{odim}",
f"df._dest_col_name -> od_skims.{ddim}",
),
Expand Down Expand Up @@ -525,6 +530,15 @@ def new_flow(

cache_dir = state.filesystem.get_sharrow_cache_dir()
logger.debug(f"flow.cache_dir: {cache_dir}")
predigitized_time_periods = False
if "out_period" in choosers and "in_period" in choosers:
if (
choosers["out_period"].dtype == "category"
and choosers["in_period"].dtype == "category"
):
choosers["out_period_code"] = choosers["out_period"].cat.codes
choosers["in_period_code"] = choosers["in_period"].cat.codes
predigitized_time_periods = True
skims_mapping_ = skims_mapping(
state,
orig_col_name,
Expand All @@ -534,6 +548,7 @@ def new_flow(
parking_col_name=parking_col_name,
zone_layer=zone_layer,
primary_origin_col_name=primary_origin_col_name,
predigitized_time_periods=predigitized_time_periods,
)
if size_term_mapping is None:
size_term_mapping = {}
Expand Down Expand Up @@ -774,6 +789,9 @@ def apply_flow(
it ever again, but having a reference to it available later can be useful
in debugging and tracing. Flows are cached and reused anyway, so it is
generally not important to delete this at any point to free resources.
tree : sharrow.DataTree
The tree data used to compute the flow result. It is separate from the
flow to prevent it from being cached with the flow.
"""
if sh is None:
return None, None
Expand All @@ -800,7 +818,7 @@ def apply_flow(
logger.error(f"error in apply_flow: {err!s}")
if required:
raise
return None, None
return None, None, None
else:
raise
with logtime(f"{flow.name}.load", trace_label or ""):
Expand All @@ -822,7 +840,9 @@ def apply_flow(
logger.error(f"error in apply_flow: {err!s}")
if required:
raise
return None, flow
tree = flow.tree
flow.tree = None
return None, flow, tree
raise
except Exception as err:
logger.error(f"error in apply_flow: {err!s}")
Expand All @@ -833,4 +853,6 @@ def apply_flow(
# Detecting compilation activity when in production mode is a bug
# that should be investigated.
tracing.timing_notes.add(f"compiled:{flow.name}")
return flow_result, flow
tree = flow.tree
flow.tree = None
return flow_result, flow, tree
Loading

0 comments on commit 37e7ab5

Please sign in to comment.