Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix single_proc + add support for single_proc for multi-runs & sweeps #315

Merged
merged 7 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 33 additions & 19 deletions cadCAD/configuration/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import pandas as pd
import numpy as np
import pandas as pd # type: ignore
from datetime import datetime, timedelta
from collections import Counter
from copy import deepcopy
from functools import reduce
from funcy import curry
from funcy import curry # type: ignore
from cadCAD.types import *
from typing import Union, Dict, List

from cadCAD.configuration.utils.depreciationHandler import sanitize_partial_state_updates
from cadCAD.utils import dict_filter, contains_type, flatten_tabulated_dict, tabulate_dict
Expand Down Expand Up @@ -161,27 +162,40 @@ def env_update(state_dict, sweep_dict, target_value):
curry(trigger)(end_substep)(trigger_field)(trigger_vals)(funct_list)


def config_sim(config_dict: ConfigurationDict):
    """Validate a simulation configuration and expand parameter sweeps.

    The configuration must contain ``'N'`` (a value > 0) and ``'T'``.
    The optional ``'M'`` entry maps parameter names to values:

    * ``'M'`` absent: ``config_dict['M']`` is set to ``[{}]`` in place and
      the same dictionary is returned.
    * ``'M'`` empty or holding only non-list values: nothing to sweep, the
      dictionary is returned unchanged.
    * ``'M'`` holding lists whose lengths are all equal, or a mix of
      length-1 lists and one other length: a list of configurations is
      returned, one per element produced by
      ``flatten_tabulated_dict(tabulate_dict(params))``, each a copy of
      ``config_dict`` with ``'M'`` replaced by that element.

    Raises:
        KeyError: if ``'N'`` or ``'T'`` is missing.
        ValueError: if ``'N'`` is not strictly positive.
        Exception: if the ``'M'`` list lengths cannot be swept together.
    """
    if "N" not in config_dict:
        raise KeyError("The 'sim_configs' dictionary must contain the key 'N' (# of Monte Carlo Runs)")
    if config_dict["N"] <= 0:
        raise ValueError("'N' must be > 0")
    if "T" not in config_dict:
        raise KeyError("The 'sim_configs' dictionary must contain the key 'T' (Timestep Iterator)")

    if "M" not in config_dict:
        # NOTE: mutates the caller's dictionary, as callers may rely on it.
        config_dict["M"] = [{}]
        return config_dict

    params = config_dict['M']

    # Non-list values count as length 0, i.e. "not swept".
    param_values_length_set = {
        len(value) if isinstance(value, list) else 0
        for value in params.values()
    }
    distinct_param_value_lengths = len(param_values_length_set)

    if distinct_param_value_lengths > 2:
        raise Exception('When sweeping, `M` list lengths should either be 1 and/or equal. More than two distinct lengths are not allowed')

    # No parameters at all, or only scalar parameters: nothing to expand.
    if all(length == 0 for length in param_values_length_set):
        return config_dict

    # Sweepable when length-1 lists are present, or when every parameter
    # is a list of one and the same length (the "equal" case named in the
    # error message below).
    if (1 in param_values_length_set) or (distinct_param_value_lengths == 1):
        return [{**config_dict, "M": M}
                for M in flatten_tabulated_dict(tabulate_dict(params))]

    raise Exception('When sweeping, `M` list lengths should either be 1 and/or equal. ')


def psub_list(psu_block, psu_steps):
Expand Down
77 changes: 51 additions & 26 deletions cadCAD/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs, configs_as_dicts
from cadCAD.engine.simulation import Executor as SimExecutor
from cadCAD.engine.execution import single_proc_exec, parallelize_simulations, local_simulations
from cadCAD.types import *

VarDictType = Dict[str, List[Any]]
StatesListsType = List[Dict[str, Any]]
Expand All @@ -24,6 +25,17 @@ class ExecutionMode:
multi_proc = 'multi_proc'


def auto_mode_switcher(config_amt: int):
    """Pick the execution mode and executor for a number of configurations.

    Returns a ``(mode, executor)`` pair: single mode with
    ``single_proc_exec`` for exactly one configuration, multi mode with
    ``parallelize_simulations`` for more than one.

    Raises:
        ValueError: if ``config_amt`` is less than 1.
    """
    # Validate up front: the previous ``except AttributeError`` guard could
    # never fire, so an invalid amount silently returned ``None`` and the
    # caller failed later while unpacking the result.
    if config_amt < 1:
        raise ValueError('N must be >= 1!')
    if config_amt == 1:
        return ExecutionMode.single_mode, single_proc_exec
    return ExecutionMode.multi_mode, parallelize_simulations


class ExecutionContext:
def __init__(self, context=ExecutionMode.local_mode, method=None, additional_objs=None) -> None:
self.name = context
Expand All @@ -39,15 +51,15 @@ def distroduce_proc(
ExpIDs,
SubsetIDs,
SubsetWindows,
configured_n, # exec_method,
configured_n, # exec_method,
sc, additional_objs=additional_objs
):
return method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs,
ExpIDs,
SubsetIDs,
SubsetWindows,
configured_n, # exec_method,
configured_n, # exec_method,
sc, additional_objs
)

Expand All @@ -56,8 +68,8 @@ def distroduce_proc(

class Executor:
def __init__(self,
exec_context: ExecutionContext, configs: List[Configuration], sc=None, empty_return=False
) -> None:
exec_context: ExecutionContext, configs: List[Configuration], sc=None, empty_return=False
) -> None:
self.sc = sc
self.SimExecutor = SimExecutor
self.exec_method = exec_context.method
Expand All @@ -70,7 +82,8 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]:
return [], [], []

config_proc = Processor()
create_tensor_field = TensorFieldReport(config_proc).create_tensor_field
create_tensor_field = TensorFieldReport(
config_proc).create_tensor_field

sessions = []
var_dict_list, states_lists = [], []
Expand Down Expand Up @@ -105,18 +118,30 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]:
var_dict_list.append(x.sim_config['M'])
states_lists.append([x.initial_state])
eps.append(list(x.exogenous_states.values()))
configs_structs.append(config_proc.generate_config(x.initial_state, x.partial_state_update_blocks, eps[config_idx]))
configs_structs.append(config_proc.generate_config(
x.initial_state, x.partial_state_update_blocks, eps[config_idx]))
env_processes_list.append(x.env_processes)
partial_state_updates.append(x.partial_state_update_blocks)
sim_executors.append(SimExecutor(x.policy_ops).simulation)

config_idx += 1

def get_final_dist_results(simulations, psus, eps, sessions):
tensor_fields = [create_tensor_field(psu, ep) for psu, ep in list(zip(psus, eps))]
remote_threshold = 100
config_amt = len(self.configs)

def get_final_dist_results(simulations: List[StateHistory],
psus: List[StateUpdateBlocks],
eps,
sessions: List[SessionDict]):
tensor_fields = [create_tensor_field(
psu, ep) for psu, ep in list(zip(psus, eps))]
return simulations, tensor_fields, sessions

def get_final_results(simulations, psus, eps, sessions, remote_threshold):
def get_final_results(simulations: List[StateHistory],
psus: List[StateUpdateBlocks],
eps,
sessions: List[SessionDict],
remote_threshold: int):
flat_timesteps, tensor_fields = [], []
for sim_result, psu, ep in list(zip(simulations, psus, eps)):
flat_timesteps.append(flatten(sim_result))
Expand All @@ -128,40 +153,40 @@ def get_final_results(simulations, psus, eps, sessions, remote_threshold):
elif config_amt > 1:
return flat_simulations, tensor_fields, sessions

remote_threshold = 100
config_amt = len(self.configs)

def auto_mode_switcher(config_amt):
try:
if config_amt == 1:
return ExecutionMode.single_mode, single_proc_exec
elif (config_amt > 1):
return ExecutionMode.multi_mode, parallelize_simulations
except AttributeError:
if config_amt < 1:
raise ValueError('N must be >= 1!')

final_result = None
original_N = len(configs_as_dicts(self.configs))
if self.exec_context != ExecutionMode.distributed:
# Consider Legacy Support
if self.exec_context != ExecutionMode.local_mode:
self.exec_context, self.exec_method = auto_mode_switcher(config_amt)
if self.exec_context == ExecutionMode.local_mode:
self.exec_context, self.exec_method = auto_mode_switcher(
config_amt)
elif self.exec_context == ExecutionMode.single_mode or self.exec_context == ExecutionMode.single_proc:
self.exec_context, self.exec_method = ExecutionMode.single_mode, single_proc_exec
elif self.exec_context == ExecutionMode.multi_mode or self.exec_context == ExecutionMode.multi_proc:
if config_amt == 1:
raise ValueError("Multi mode must have at least 2 configs")
else:
self.exec_context, self.exec_method = ExecutionMode.multi_mode, parallelize_simulations
else:
raise ValueError("Invalid execution mode specified")


print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs,
ExpIDs, SubsetIDs, SubsetWindows, original_N
)

final_result = get_final_results(simulations_results, partial_state_updates, eps, sessions, remote_threshold)
final_result = get_final_results(
simulations_results, partial_state_updates, eps, sessions, remote_threshold)
elif self.exec_context == ExecutionMode.distributed:
print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts,
SimIDs, RunIDs, ExpIDs, SubsetIDs, SubsetWindows, original_N, self.sc
)
final_result = get_final_dist_results(simulations_results, partial_state_updates, eps, sessions)
final_result = get_final_dist_results(
simulations_results, partial_state_updates, eps, sessions)

t2 = time()
print(f"Total execution time: {t2 - t1 :.2f}s")
Expand Down
91 changes: 48 additions & 43 deletions cadCAD/engine/execution.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Callable, Dict, List, Any, Tuple
from pathos.multiprocessing import ProcessPool as PPool
from pathos.multiprocessing import ProcessPool as PPool # type: ignore
from collections import Counter

from cadCAD.types import *
from cadCAD.utils import flatten

VarDictType = Dict[str, List[Any]]
Expand All @@ -11,46 +11,54 @@


def single_proc_exec(
    simulation_execs: List[ExecutorFunction],
    var_dict_list: List[Parameters],
    states_lists: List[StateHistory],
    configs_structs: List[StateUpdateBlocks],
    env_processes_list: List[EnvProcesses],
    Ts: List[TimeSeq],
    SimIDs: List[SimulationID],
    Ns: List[Run],
    ExpIDs: List[int],
    SubsetIDs: List[SubsetID],
    SubsetWindows: List[SubsetWindow],
    configured_n: List[N_Runs]
):
    """Run a single configuration in the current process.

    One element is taken from each per-configuration list and passed to the
    selected simulation executor; the executor's result is returned
    flattened. ``ExpIDs`` is accepted for signature parity with the other
    executors but is not used here.

    The input lists are read, not modified.
    """
    # HACK for making it run with N_Runs=1
    if isinstance(var_dict_list, list):
        # NOTE(review): this takes the FIRST parameter set while the other
        # lists below contribute their LAST element; the two only agree when
        # there is exactly one configuration -- confirm against callers.
        var_dict_list = var_dict_list[0]

    print('Execution Mode: single_threaded')
    raw_params: List[List] = [
        simulation_execs, states_lists, configs_structs, env_processes_list,
        Ts, SimIDs, Ns, SubsetIDs, SubsetWindows
    ]
    # Read the last element of each list without popping it, so the
    # caller's lists are left intact.
    (simulation_exec, states_list, config, env_processes,
     T, sim_id, N, subset_id, subset_window) = [seq[-1] for seq in raw_params]

    result = simulation_exec(
        var_dict_list, states_list, config, env_processes, T, sim_id, N,
        subset_id, subset_window, configured_n
    )
    return flatten(result)





def parallelize_simulations(
simulation_execs: List[Callable],
var_dict_list: List[VarDictType],
states_lists: List[StatesListsType],
configs_structs: List[ConfigsType],
env_processes_list: List[EnvProcessesType],
Ts: List[range],
SimIDs,
Ns: List[int],
simulation_execs: List[ExecutorFunction],
var_dict_list: List[Parameters],
states_lists: List[StateHistory],
configs_structs: List[StateUpdateBlocks],
env_processes_list: List[EnvProcesses],
Ts: List[TimeSeq],
SimIDs: List[SimulationID],
Ns: List[Run],
ExpIDs: List[int],
SubsetIDs,
SubsetWindows,
configured_n
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs]
):

print(f'Execution Mode: parallelized')
Expand Down Expand Up @@ -104,32 +112,29 @@ def process_executor(params):


def local_simulations(
    simulation_execs: List[ExecutorFunction],
    var_dict_list: List[Parameters],
    states_lists: List[StateHistory],
    configs_structs: List[StateUpdateBlocks],
    env_processes_list: List[EnvProcesses],
    Ts: List[TimeSeq],
    SimIDs: List[SimulationID],
    Ns: List[Run],
    ExpIDs: List[int],
    SubsetIDs: List[SubsetID],
    SubsetWindows: List[SubsetWindow],
    configured_n: List[N_Runs]
):
    """Dispatch to the single-process or parallel executor.

    The choice depends only on how many entries ``configs_structs`` holds:
    one entry goes to ``single_proc_exec``, more than one goes to
    ``parallelize_simulations``. All arguments are forwarded unchanged.
    With no entries, nothing is run and ``None`` is returned.
    """
    forwarded = (
        simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
        Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n
    )
    n_configs = len(configs_structs)

    if n_configs > 1:  # and configured_n != 1
        return parallelize_simulations(*forwarded)
    if n_configs == 1:  # and configured_n != 1
        return single_proc_exec(*forwarded)
    return None
# elif config_amt > 1 and configured_n == 1:
Loading