Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add switch for toggling deepcopy off #316

Merged
merged 10 commits into from
Dec 14, 2023
6 changes: 4 additions & 2 deletions cadCAD/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from time import time
from typing import Callable, Dict, List, Any, Tuple
from typing import Callable, Dict, List, Any, Tuple, Union

from cadCAD.utils import flatten
from cadCAD.utils.execution import print_exec_info
Expand Down Expand Up @@ -39,6 +39,7 @@ def auto_mode_switcher(config_amt: int):
class ExecutionContext:
def __init__(self, context=ExecutionMode.local_mode, method=None, additional_objs=None) -> None:
self.name = context
self.additional_objs = additional_objs
if context == 'local_proc':
self.method = local_simulations
elif context == 'single_proc':
Expand Down Expand Up @@ -74,6 +75,7 @@ def __init__(self,
self.SimExecutor = SimExecutor
self.exec_method = exec_context.method
self.exec_context = exec_context.name
self.additional_objs = exec_context.additional_objs
self.configs = configs
self.empty_return = empty_return

Expand Down Expand Up @@ -174,7 +176,7 @@ def get_final_results(simulations: List[StateHistory],
print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs,
ExpIDs, SubsetIDs, SubsetWindows, original_N
ExpIDs, SubsetIDs, SubsetWindows, original_N, self.additional_objs
)

final_result = get_final_results(
Expand Down
17 changes: 10 additions & 7 deletions cadCAD/engine/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def single_proc_exec(
ExpIDs: List[int],
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs]
configured_n: List[N_Runs],
additional_objs=None
):

# HACK for making it run with N_Runs=1
Expand All @@ -38,7 +39,7 @@ def single_proc_exec(
map(lambda x: x.pop(), raw_params)
)
result = simulation_exec(
var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n
var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n, additional_objs
)
return flatten(result)

Expand All @@ -58,7 +59,8 @@ def parallelize_simulations(
ExpIDs: List[int],
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs]
configured_n: List[N_Runs],
additional_objs=None
):

print(f'Execution Mode: parallelized')
Expand Down Expand Up @@ -96,7 +98,7 @@ def process_executor(params):
if len_configs_structs > 1:
pp = PPool(processes=len_configs_structs)
results = pp.map(
lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n), params
lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n, additional_objs), params
)
pp.close()
pp.join()
Expand All @@ -123,18 +125,19 @@ def local_simulations(
ExpIDs: List[int],
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs]
configured_n: List[N_Runs],
additional_objs=None
):
config_amt = len(configs_structs)

if config_amt == 1: # and configured_n != 1
return single_proc_exec(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs
)
elif config_amt > 1: # and configured_n != 1
return parallelize_simulations(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs
)
# elif config_amt > 1 and configured_n == 1:
50 changes: 31 additions & 19 deletions cadCAD/engine/simulation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from typing import Any, Callable, Dict, List, Tuple
from copy import deepcopy
from types import MappingProxyType
from functools import reduce
from funcy import curry
from funcy import curry # type: ignore

from cadCAD.utils import flatten
from cadCAD.engine.utils import engine_exception
from cadCAD.types import *

id_exception: Callable = curry(engine_exception)(KeyError)(KeyError)(None)

Expand Down Expand Up @@ -102,20 +104,30 @@ def env_composition(target_field, state_dict, target_value):
# mech_step
def partial_state_update(
self,
sweep_dict: Dict[str, List[Any]],
sub_step: int,
sL,
sH,
state_funcs: List[Callable],
policy_funcs: List[Callable],
env_processes: Dict[str, Callable],
sweep_dict: Parameters,
sub_step: Substep,
sL: list[State],
sH: StateHistory,
state_funcs: List[StateUpdateFunction],
policy_funcs: List[PolicyFunction],
env_processes: EnvProcesses,
time_step: int,
run: int,
additional_objs
) -> List[Dict[str, Any]]:

# last_in_obj: Dict[str, Any] = MappingProxyType(sL[-1])
last_in_obj: Dict[str, Any] = deepcopy(sL[-1])
if type(additional_objs) == dict:
if additional_objs.get('deepcopy_off', False) == True:
last_in_obj = MappingProxyType(sL[-1])
if len(additional_objs) == 1:
additional_objs = None
# XXX: drop the additional objects if only used for deepcopy
# toggling.
else:
last_in_obj = deepcopy(sL[-1])
else:
last_in_obj = deepcopy(sL[-1])

_input: Dict[str, Any] = self.policy_update_exception(
self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs, additional_objs)
)
Expand Down Expand Up @@ -206,18 +218,18 @@ def run_pipeline(

def simulation(
self,
sweep_dict: Dict[str, List[Any]],
states_list: List[Dict[str, Any]],
sweep_dict: SweepableParameters,
states_list: StateHistory,
configs,
env_processes: Dict[str, Callable],
time_seq: range,
simulation_id: int,
env_processes: EnvProcesses,
time_seq: TimeSeq,
simulation_id: SimulationID,
run: int,
subset_id,
subset_window,
configured_N,
subset_id: SubsetID,
subset_window: SubsetWindow,
configured_N: int,
# remote_ind
additional_objs=None
additional_objs: Union[None, Dict]=None
):
run += 1
subset_window.appendleft(subset_id)
Expand Down
4 changes: 3 additions & 1 deletion cadCAD/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ class ConfigurationDict(TypedDict):
M: Union[Parameters, SweepableParameters] # Parameters / List of Parameter to Sweep


EnvProcesses = object
TargetValue = object
# Type alias for a single environment process: a callable taking
# (State, SweepableParameters, TargetValue) and returning a TargetValue.
# This must be an assignment: a bare annotation (`EnvProcess: Callable[...]`)
# only declares a variable and never binds the name, so the alias would not
# exist at runtime.
EnvProcess = Callable[[State, SweepableParameters, TargetValue], TargetValue]
EnvProcesses = dict[str, Callable]
TimeSeq = Iterator
SimulationID = int
Run = int
Expand Down
187 changes: 187 additions & 0 deletions testing/test_additional_objs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
from typing import Dict, List
from cadCAD.engine import Executor, ExecutionContext, ExecutionMode
from cadCAD.configuration import Experiment
from cadCAD.configuration.utils import env_trigger, var_substep_trigger, config_sim, psub_list
from cadCAD.types import *
import pandas as pd # type: ignore
import types
import inspect
import pytest

def describe_or_return(v: object) -> object:
    """
    Return a short textual description of a Python function, or `v` unchanged.

    Thanks @LinuxIsCool!

    - lambdas are rendered as ``'lambda: <signature>'``
    - any other Python-level function is rendered as ``'function: <name>'``
    - every other object (including builtins such as ``len``) is returned as is
    """
    if isinstance(v, types.FunctionType):
        # `types.LambdaType` is just an alias of `types.FunctionType`, so a
        # lambda can only be told apart by its name; this check has to come
        # first or the lambda case is never reached.
        if v.__name__ == '<lambda>':
            return f'lambda: {inspect.signature(v)}'
        return f'function: {v.__name__}'
    return v


def select_M_dict(M_dict: Dict[str, object], keys: set) -> Dict[str, object]:
    """
    Pick the entries of `M_dict` whose key is in `keys`.

    Each selected value is passed through `describe_or_return` before being
    stored in the result.

    Thanks @LinuxIsCool!
    """
    selected: Dict[str, object] = {}
    for name, value in M_dict.items():
        if name in keys:
            selected[name] = describe_or_return(value)
    return selected


def select_config_M_dict(configs: list, i: int, keys: set) -> Dict[str, object]:
    """Return the `keys` subset of the ``'M'`` parameters of the `i`-th configuration."""
    config_params = configs[i].sim_config['M']
    return select_M_dict(config_params, keys)


def drop_substeps(_df):
    """
    Reduce a per-substep results frame to one row per timestep.

    The rows kept are the initial-state rows (``substep == 0`` and
    ``timestep == 0``) and the rows of the highest substep found in the frame.
    The ``substep`` column is removed from the result. The input frame is not
    modified; an empty input yields an empty result.
    """
    substep = _df['substep']
    is_initial_row = (substep == 0) & (_df['timestep'] == 0)
    # Series.max() returns NaN for an empty column (so no row matches), where
    # the builtin max() would raise ValueError.
    is_last_substep = substep == substep.max()
    rows_to_keep = is_initial_row | is_last_substep
    # Boolean-mask selection already returns a new frame, so the input does
    # not need to be copied in full beforehand.
    return _df.loc[rows_to_keep].drop(columns=['substep'])


def assign_params(_df: pd.DataFrame, configs) -> pd.DataFrame:
    """
    Return a copy of `_df` with one extra column per simulation parameter.

    Based on `cadCAD-tools` package codebase, by @danlessa

    The parameter names are taken from the ``'M'`` dict of the first
    configuration. Rows are grouped by (simulation, subset, run), and the
    i-th group receives the described parameter values of ``configs[i]``.
    NOTE(review): this relies on the group order matching the order of
    `configs` — confirm against the executor.
    """
    param_names = set(configs[0].sim_config['M'].keys())

    # Seed every row with the first configuration's parameters so that the
    # parameter columns exist before the per-group assignment below.
    seed_params = select_config_M_dict(configs, 0, param_names)
    df = _df.assign(**seed_params).copy()

    # Overwrite each (simulation, subset, run) group with its own parameters.
    grouped = df.groupby(['simulation', 'subset', 'run'])
    for config_index, (_, group_df) in enumerate(grouped):
        group_params = select_config_M_dict(configs, config_index, param_names)
        df.loc[group_df.index] = group_df.assign(**group_params)
    return df




# Parameter set in which every name maps to a list of candidate values.
# This is the default `params` argument of `create_experiment`.
# NOTE(review): presumably cadCAD sweeps over the multi-valued entries
# ('beta' and 'gamma') — confirm against `config_sim`.
SWEEP_PARAMS: Dict[str, List] = {
    'alpha': [1],
    'beta': [lambda x: 2 * x, lambda x: x],
    'gamma': [3, 4],
    'omega': [7]
}

# Parameter set with a single plain value per name (no lists).
# NOTE(review): not referenced anywhere else in the visible part of this file.
SINGLE_PARAMS: Dict[str, object] = {
    'alpha': 1,
    'beta': lambda x: x,
    'gamma': 3,
    'omega': 5
}


def create_experiment(N_RUNS=2, N_TIMESTEPS=3, params: dict=SWEEP_PARAMS):
    """
    Build an `Experiment` holding a single model used by the tests.

    The model has three partial state update blocks ('m1', 'm2', 'm3'). Every
    block uses the same two policies and the same six state update functions,
    all of which simply echo values from the parameter dict they receive.

    N_RUNS is passed to `config_sim` as "N", ``range(N_TIMESTEPS)`` as "T",
    and `params` as "M".
    """
    psu_steps = ['m1', 'm2', 'm3']
    n_substeps = len(psu_steps)
    var_timestep_trigger = var_substep_trigger([0, n_substeps])
    env_timestep_trigger = env_trigger(n_substeps)

    # --- Policies: each one reports a single parameter value ---------------
    def gamma(params: Parameters, substep: Substep, history: StateHistory, state: State, **kwargs):
        return {'gamma': params['gamma']}

    def omega(params: Parameters, substep: Substep, history: StateHistory, state: State, **kwargs):
        return {'omega': params['omega']}

    # --- State update functions: each returns (variable name, new value) ---
    def alpha(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
        return 'alpha_var', params['alpha']

    def beta(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
        return 'beta_var', params['beta']

    def gamma_var(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
        return 'gamma_var', params['gamma']

    def omega_var(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
        return 'omega_var', params['omega']

    def policies(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
        return 'policies', _input

    def sweeped(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
        return 'sweeped', {'beta': params['beta'], 'gamma': params['gamma']}

    # Every substep gets the same policies and state updates; the trigger
    # wrapper for 'sweeped' is created once per substep.
    psu_block: dict = {}
    for step in psu_steps:
        psu_block[step] = {
            "policies": {
                'gamma': gamma,
                'omega': omega,
            },
            "states": {
                'alpha_var': alpha,
                'beta_var': beta,
                'gamma_var': gamma_var,
                'omega_var': omega_var,
                'policies': policies,
                'sweeped': var_timestep_trigger(y='sweeped', f=sweeped),
            },
        }

    # Genesis States
    genesis_states = {
        'alpha_var': 0,
        'beta_var': 0,
        'gamma_var': 0,
        'omega_var': 0,
        'policies': {},
        'sweeped': {}
    }

    # Environment Process
    env_process = {
        'sweeped': env_timestep_trigger(
            trigger_field='timestep',
            trigger_vals=[5],
            funct_list=[lambda _g, x: _g['beta']],
        ),
    }

    sim_config = config_sim(
        {
            "N": N_RUNS,
            "T": range(N_TIMESTEPS),
            "M": params,  # Optional
        }
    )

    # New Convention
    partial_state_update_blocks = psub_list(psu_block, psu_steps)

    experiment = Experiment()
    experiment.append_model(
        sim_configs=sim_config,
        initial_state=genesis_states,
        env_processes=env_process,
        partial_state_update_blocks=partial_state_update_blocks
    )
    return experiment


def test_deepcopy_off():
    """
    Run the experiment in local mode with ``deepcopy_off`` enabled and check
    that, on every row after timestep 0, each '<name>_var' state variable
    equals the '<name>' parameter assigned to that row and has the expected
    type.
    """
    experiment = create_experiment()
    context = ExecutionContext(
        ExecutionMode().local_mode,
        additional_objs={'deepcopy_off': True},
    )
    executor = Executor(exec_context=context, configs=experiment.configs)
    records, _tensor_field, _sessions = executor.execute()

    raw_df = pd.DataFrame(records)
    df = drop_substeps(assign_params(raw_df, experiment.configs))

    # XXX: parameters should always be of the same type. Else, the test will fail
    reference_params = experiment.configs[0].sim_config['M']

    for _, row in df.iterrows():
        # Rows at timestep 0 hold the genesis state, not parameter values.
        if not row.timestep > 0:
            continue
        for name in ('alpha', 'gamma', 'omega'):
            state_value = row[f'{name}_var']
            assert state_value == row[name]
            assert type(state_value) is type(reference_params[name])

Loading