Merge pull request #65 from UDST/parallel_sim
parallel iterative lottery choices
mxndrwgrdnr authored Apr 23, 2019
2 parents 5cf353e + 458bcd4 commit 54c936d
Showing 8 changed files with 289 additions and 19 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,7 @@
# ChoiceModels change log
### 0.2.2dev0 (2019-04-23)

- adds a function `choicemodels.tools.parallel_lottery_choices()` to run iterative lottery choice batches in parallel rather than sequentially.

### 0.2.1 (2019-01-30)

2 changes: 1 addition & 1 deletion choicemodels/__init__.py
@@ -3,4 +3,4 @@

from .mnl import MultinomialLogit, MultinomialLogitResults

version = __version__ = '0.2.1'
version = __version__ = '0.2.2dev0'
3 changes: 0 additions & 3 deletions choicemodels/mnl.py
@@ -505,18 +505,15 @@ def mnl_probs(data, beta, numalts):

    # https://stats.stackexchange.com/questions/304758/softmax-overflow
    utilities = utilities.subtract(utilities.max(0))

    exponentiated_utility = utilities.exp(inplace=True)
    if clamp:
        exponentiated_utility.inftoval(1e20)
    if clamp:
        exponentiated_utility.clamptomin(1e-300)
    sum_exponentiated_utility = exponentiated_utility.sum(axis=0)
    probs = exponentiated_utility.divide_by_row(
        sum_exponentiated_utility, inplace=True)
    if clamp:
        probs.nantoval(1e-300)
    if clamp:
        probs.clamptomin(1e-300)

    logging.debug('finish: calculate MNL probabilities')
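
For context, the `utilities.subtract(utilities.max(0))` line above is the standard numerical-stability trick for softmax: shifting each column of utilities so its maximum is zero leaves the resulting probabilities unchanged but keeps the exponentiation from overflowing. A minimal sketch of the same idea in plain numpy -- illustrative only, since the code above operates on choicemodels' internal matrix wrapper rather than numpy arrays:

# Illustrative sketch, not part of this commit: stable softmax in plain numpy.
import numpy as np

def stable_softmax(utilities):
    # utilities has shape (numalts, numobs); each column is one choice scenario
    shifted = utilities - utilities.max(axis=0)   # max utility in each column becomes 0
    exp_u = np.exp(shifted)                       # no overflow even for large utilities
    return exp_u / exp_u.sum(axis=0)              # each column sums to 1
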
268 changes: 259 additions & 9 deletions choicemodels/tools/simulation.py
@@ -4,6 +4,9 @@
"""
import numpy as np
import pandas as pd
from multiprocessing import Process, Manager, Array, cpu_count
from tqdm import tqdm
import warnings


def monte_carlo_choices(probabilities):
@@ -186,40 +189,287 @@ def iterative_lottery_choices(choosers, alternatives, mct_callable, probs_callab
        if len(mct.to_frame()) == 0:
            print("No valid alternatives for the remaining choosers")
            break

        probs = probs_callable(mct)
        choices = pd.DataFrame(monte_carlo_choices(probs))

        # join capacities and sizes
        oid, aid = (mct.observation_id_col, mct.alternative_id_col)
        c = choices.join(alts[capacity], on=aid).join(choosers[size], on=oid)
        c.loc[:, '_cumsize'] = c.groupby(aid)[size].cumsum()

        # save valid choices
        c_valid = (c._cumsize <= c[capacity])
        valid_choices = pd.concat([valid_choices, c[aid].loc[c_valid]])

        print("Iteration {}: {} of {} valid choices".format(iter, len(valid_choices),
              len_choosers))

        # update choosers and alternatives
        choosers = choosers.drop(c.loc[c_valid].index.values)
        # print("{} remaining choosers".format(len(choosers)))

        placed_capacity = c.loc[c_valid].groupby(aid)._cumsize.max()
        alts.loc[:, capacity] = alts[capacity].subtract(placed_capacity, fill_value=0)

        full = alts.loc[alts[capacity] == 0]
        alts = alts.drop(full.index)
        # print("{} remaining alternatives".format(len(alts)))

    # retain original index names
    valid_choices.index.name = choosers.index.name
    valid_choices.name = alts.index.name

    return valid_choices
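
The core of the loop above is the groupby/cumsum capacity check: within each batch of simulated choices, chooser sizes are accumulated per alternative, and only the choosers whose running total still fits within the alternative's capacity keep their choice; everyone else goes back into the pool for the next iteration. A toy sketch of that mask with made-up data, reusing the reserved '_size', '_capacity', and '_cumsize' column names (illustrative only, not part of this commit):

# Illustrative sketch, not part of this commit.
import pandas as pd

c = pd.DataFrame({
    'alt_id': ['a', 'a', 'b', 'a'],   # simulated choices, in draw order
    '_size': [1, 1, 1, 1],            # every chooser has size 1
    '_capacity': [2, 2, 1, 2],        # capacity of the chosen alternative
}, index=[10, 11, 12, 13])            # chooser (observation) ids

c['_cumsize'] = c.groupby('alt_id')['_size'].cumsum()
c_valid = c['_cumsize'] <= c['_capacity']
print(c.loc[c_valid, 'alt_id'])
# choosers 10, 11 and 12 keep their choices; chooser 13 was third to pick
# alternative 'a' (capacity 2), so it is dropped and re-simulated next round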


def _parallel_lottery_choices_worker(
        choosers, alternatives, choices_dict, chosen_alts,
        mct_callable, probs_callable, alt_capacity=None,
        chooser_size=None, proc_num=0, batch_size=0):  # pragma: no cover
    """
    Worker process called only by the parallel_lottery_choices() method.

    Parameters
    ----------
    choosers : pd.DataFrame
        Table with one row for each chooser or choice scenario, with unique ID's in the
        index field. Additional columns can contain fixed attributes of the choosers.
        (Reserved column names: '_size'.)
    alternatives : pd.DataFrame
        Table with one row for each alternative, with unique ID's in the index field.
        Additional columns can contain fixed attributes of the alternatives. (Reserved
        column names: '_capacity'.)
    choices_dict : multiprocessing.managers.SyncManager.dict
        A dictionary proxy allocated from shared memory
    chosen_alts : multiprocessing.Array
        A ctypes array allocated from shared memory
    mct_callable : callable
        Callable that samples alternatives to generate a table of choice scenarios. It
        should accept subsets of the choosers and alternatives tables and return a
        choicemodels.tools.MergedChoiceTable.
    probs_callable : callable
        Callable that generates predicted probabilities for a table of choice scenarios.
        It should accept a choicemodels.tools.MergedChoiceTable and return a pd.Series
        with indexes matching the input.
    alt_capacity : str, optional
        Name of a column in the alternatives table that expresses the capacity of
        alternatives. If not provided, each alternative is interpreted as accommodating a
        single chooser.
    chooser_size : str, optional
        Name of a column in the choosers table that expresses the size of choosers.
        Choosers might have varying sizes if the alternative capacities are amounts
        rather than counts -- e.g. square footage or employment capacity. Chooser sizes
        must be in the same units as alternative capacities. If not provided, each chooser
        has a size of 1.
    proc_num : int
        Integer representing the sequential order in which the worker was spawned
    batch_size : int
        Integer representing the chooser batch size
    """

    st_choice_idx = proc_num * batch_size
    if alt_capacity is None:
        alt_capacity = '_capacity'
    if chooser_size is None:
        chooser_size = '_size'
        choosers.loc[:, chooser_size] = 1

    capacity, size = (alt_capacity, chooser_size)

    len_choosers = len(choosers)
    alts_name = alternatives.index.name
    valid_choices = pd.Series()
    max_mct_size = 0

    iter = 0
    while (len(valid_choices) < len_choosers):
        chosen_alts_list = list(chosen_alts.get_obj())
        alternatives = alternatives[~alternatives.index.isin(chosen_alts_list)]
        iter += 1

        if alternatives[capacity].max() < choosers[size].min():
            print("{} choosers cannot be allocated.".format(len(choosers)))
            print("\nRemaining capacity on alternatives but "
                  "not enough to accommodate choosers' sizes")
            break

        mct = mct_callable(choosers.sample(frac=1), alternatives)
        mct_size = len(mct.to_frame())
        max_mct_size = max(max_mct_size, mct_size)
        if mct_size == 0:
            # print("No valid alternatives for the remaining choosers")
            break

        probs = probs_callable(mct)
        choices = pd.DataFrame(monte_carlo_choices(probs))

        # join capacities and sizes
        oid, aid = (mct.observation_id_col, mct.alternative_id_col)
        c = choices.join(
            alternatives[capacity], on=aid).join(
            choosers[size], on=oid)

        # when size==1, _cumsize counts the cumulative number of times
        # each alternative appears. thus, below, c_valid creates a
        # mask that retains just the choice of the chooser who chose
        # their alternative first, provided that choice hasn't been
        # made elsewhere, as documented by the shared chosen_alts list
        c.loc[:, '_cumsize'] = c.groupby(aid)[size].cumsum()

        # the shared array of chosen alts must stay locked by the
        # current worker between the time the worker converts it
        # to a list to make sure the worker's choices haven't been
        # chosen already and the time the worker updates the array
        with chosen_alts.get_lock():

            chosen_alts_list = list(chosen_alts.get_obj())
            c_valid = (c._cumsize <= c[capacity]) & (
                ~c[alts_name].isin(chosen_alts_list))
            iter_valid_choices = c[aid].loc[c_valid]
            if len(iter_valid_choices) == 0:
                continue

            num_valid_choices = len(iter_valid_choices.values)
            chosen_alts[st_choice_idx:st_choice_idx + num_valid_choices] = \
                iter_valid_choices.values

            st_choice_idx += num_valid_choices
            alternatives.drop(iter_valid_choices.values, inplace=True)

        iter_valid_choices.index.name = choosers.index.name
        iter_valid_choices.name = alternatives.index.name
        choices_dict.update(iter_valid_choices.to_dict())
        valid_choices = pd.concat([valid_choices, iter_valid_choices])

        choosers = choosers.drop(c.loc[c_valid].index.values)
    return
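
The coordination mechanism above is a multiprocessing.Array guarded by its built-in lock: a worker must read the shared list of already-claimed alternatives and write its own new claims inside a single get_lock() block, otherwise two workers could claim the same unit between the read and the write. A stripped-down sketch of that pattern -- illustrative only, using a simple claim counter rather than the alternative ids the worker above stores:

# Illustrative sketch, not part of this commit: read-then-write under one lock.
from multiprocessing import Process, Array

def claim(shared, proc_num, n_claims):
    with shared.get_lock():                       # hold the lock for the read and the write
        start = shared[0]                         # slot 0 holds the next free index
        for i in range(n_claims):
            shared[start + 1 + i] = proc_num + 1  # record who claimed each slot
        shared[0] = start + n_claims              # advance the cursor atomically

if __name__ == '__main__':
    shared = Array('i', 9)                        # slot 0 = cursor, slots 1-8 = claims
    procs = [Process(target=claim, args=(shared, p, 2)) for p in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(list(shared))                           # each slot claimed by exactly one worker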


def parallel_lottery_choices(
        choosers, alternatives, mct_callable, probs_callable,
        alt_capacity=None, chooser_size=None, chooser_batch_size=None):
    """
    A parallelized version of the iterative_lottery_choices method. Chooser
    batches are processed in parallel rather than sequentially.

    NOTE: In its current form, this method is only supported for simulating
    choices where every alternative has a capacity of 1.

    Parameters
    ----------
    choosers : pd.DataFrame
        Table with one row for each chooser or choice scenario, with unique ID's in the
        index field. Additional columns can contain fixed attributes of the choosers.
        (Reserved column names: '_size'.)
    alternatives : pd.DataFrame
        Table with one row for each alternative, with unique ID's in the index field.
        Additional columns can contain fixed attributes of the alternatives. (Reserved
        column names: '_capacity'.)
    mct_callable : callable
        Callable that samples alternatives to generate a table of choice scenarios. It
        should accept subsets of the choosers and alternatives tables and return a
        choicemodels.tools.MergedChoiceTable.
    probs_callable : callable
        Callable that generates predicted probabilities for a table of choice scenarios.
        It should accept a choicemodels.tools.MergedChoiceTable and return a pd.Series
        with indexes matching the input.
    alt_capacity : str, optional
        Name of a column in the alternatives table that expresses the capacity of
        alternatives. If not provided, each alternative is interpreted as accommodating a
        single chooser.
    chooser_size : str, optional
        Name of a column in the choosers table that expresses the size of choosers.
        Choosers might have varying sizes if the alternative capacities are amounts
        rather than counts -- e.g. square footage or employment capacity. Chooser sizes
        must be in the same units as alternative capacities. If not provided, each chooser
        has a size of 1.
    chooser_batch_size : int or None, optional
        Size of the batches for processing smaller groups of choosers one at a time. Useful
        when the anticipated size of the merged choice tables (choosers X alternatives
        X covariates) will be too large for python/pandas to handle.

    Returns
    -------
    pd.Series
        Series of chosen alternative id's, indexed with the chooser (observation) id.
    """

    choosers = choosers.copy()
    alternatives = alternatives.copy()

    if alt_capacity is None:
        alt_capacity = '_capacity'
        alternatives.loc[:, alt_capacity] = 1

    if chooser_size is None:
        chooser_size = '_size'
        choosers.loc[:, chooser_size] = 1

    if chooser_batch_size is None or chooser_batch_size > len(choosers):
        obs_batches = [choosers.index.values]
    else:
        obs_batches = [
            choosers.index.values[x:x + chooser_batch_size] for
            x in range(0, len(choosers), chooser_batch_size)]

    num_cpus = cpu_count()
    num_batches = len(obs_batches)
    if num_batches > num_cpus:
        warnings.warn(
            "The specified batch size yields more batches than there "
            "are vCPUs for parallel processing on this computer "
            "({0} vs. {1}). To optimize this code, choose a larger "
            "batch size or consider using the iterative_lottery_choices() "
            "method instead.".format(num_batches, num_cpus))

    manager = Manager()
    shared_choices_dict = manager.dict()
    alternatives[alt_capacity] = 1
    shared_chosen_alts = Array('i', len(choosers))
    jobs = []
    for b, batch in enumerate(obs_batches):
        obs = choosers.loc[batch]
        proc = Process(
            target=_parallel_lottery_choices_worker,
            args=(
                obs, alternatives, shared_choices_dict, shared_chosen_alts,
                mct_callable, probs_callable, alt_capacity, chooser_size,
                b, chooser_batch_size)
        )
        proc.start()
        jobs.append(proc)

    for j, job in tqdm(enumerate(jobs), total=len(jobs)):
        job.join()

    choices_dict = shared_choices_dict._getvalue()

    # convert choices dict to series with original index names
    out_choices = pd.Series(choices_dict)
    out_choices.index.name = choosers.index.name
    out_choices.name = alternatives.index.name

    return out_choices
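
For orientation, a sketch of how this function is meant to be called. The `households` and `dwelling_units` tables, the fitted `results` object, the sample size, and the batch size below are placeholders standing in for the caller's own data and model; only the callable signatures are dictated by the code above:

# Illustrative usage only -- tables, fitted model, and sizes are placeholders.
from choicemodels.tools import MergedChoiceTable, parallel_lottery_choices

def mct_callable(obs, alts):
    # sample 10 alternatives per remaining chooser into a merged choice table
    return MergedChoiceTable(obs, alts, sample_size=10)

def probs_callable(mct):
    # `results` is a previously fitted choicemodels.MultinomialLogitResults
    return results.probabilities(mct)

choices = parallel_lottery_choices(
    households, dwelling_units, mct_callable, probs_callable,
    chooser_batch_size=50000)   # one worker process per batch of 50,000 choosers
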
2 changes: 1 addition & 1 deletion docs/source/index.rst
@@ -8,7 +8,7 @@ ChoiceModels

ChoiceModels is a Python library for discrete choice modeling, with utilities for sampling, simulation, and other ancillary tasks. It's part of the `Urban Data Science Toolkit <https://docs.udst.org>`__ (UDST).

v0.2.1, released January 30, 2019
v0.2.2dev0, released April 23, 2019


Contents
10 changes: 9 additions & 1 deletion docs/source/simulation-utilities.rst
@@ -7,6 +7,7 @@ ChoiceModels provides general-purpose tools for Monte Carlo simulation of choice

``iterative_lottery_choices()`` is for cases where the alternatives have limited capacities, requiring multiple passes to match choosers and alternatives. Effectively, choices are simulated sequentially, each time removing the chosen alternative or reducing its available capacity. (It's actually done in batches for better performance.)

``parallel_lottery_choices()`` works the same way as the above, but the chooser batches run in parallel rather than sequentially.

Independent choices
-------------------
@@ -17,4 +18,11 @@ Independent choices
Capacity-constrained choices
----------------------------

.. autofunction:: choicemodels.tools.iterative_lottery_choices

.. autofunction:: choicemodels.tools.iterative_lottery_choices


Parallelized capacity-constrained choices
-----------------------------------------

.. autofunction:: choicemodels.tools.parallel_lottery_choices
4 changes: 2 additions & 2 deletions setup.py
@@ -10,8 +10,8 @@

setup(
    name='choicemodels',
    version='0.2.1',
    description='Tools for discrete choice modeling',
    version='0.2.2dev0',
    description='Tools for discrete choice estimation',
    long_description=long_description,
    author='UDST',
    url='https://github.com/udst/choicemodels',