-
Notifications
You must be signed in to change notification settings - Fork 0
/
make_lags.py
executable file
·176 lines (138 loc) · 5.81 KB
/
make_lags.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python
"""
Compute stimulus orderings for the mnemonic similarity task, described in:
Stark SM, Stevenson R, Wu C, Rutledge S, & Stark CEL (2015). Stability of
age-related deficits in the mnemonic similarity task across task variations.
Behavioral Neuroscience 129(3), 257-268.
Written by Nate Vack <[email protected]> at the Center for Healthy Minds,
University of Wisconsin-Madison, for the mp2 study conducted by Dan Grupe
<[email protected]>. Modified by Dan Fitch <[email protected]>
Copyright 2019 Board of Regents of the University of Wisconsin System
Licensed under the MIT license.
TODO:
trial number
which trial is the pair of this stimulus
subject number + stim set => predictable order
"""
from __future__ import print_function, unicode_literals
import sys
import random
from itertools import count
import pandas as pd
import logging
logging.basicConfig(format='%(message)s', level=logging.INFO)
logger = logging.getLogger()

# Tunable constants: edit these to change the generated data.
#
# Each *_LAGS range lists the lag lengths allowed for one lag class. When a
# pair is placed, the range is turned into a list, a lag is drawn from it at
# random, and any lag that no longer fits in the trial list is dropped from
# that list. Once the list is empty, no further pairs of that class can be
# placed.
ZERO_LAGS = range(1)
SHORT_LAGS = range(1, 10)
MED_LAGS = range(20, 81)
LONG_LAGS = range(120, 181)

# (number of pairs per pair type, allowed lags) for each lag class.
LAG_COUNTS = [
    (8, ZERO_LAGS),
    (28, SHORT_LAGS),
    (14, MED_LAGS),
    (14, LONG_LAGS),
]

FOIL_TRIALS = 64
PAIR_TYPES = ['repeat', 'lure']

PAIR_COUNTS = [pair_count for pair_count, _lags in LAG_COUNTS]
# Every pair occupies two trials, and each pair type gets its own pairs.
TOTAL_PAIR_TRIALS = 2 * len(PAIR_TYPES) * sum(PAIR_COUNTS)
TOTAL_TRIALS = TOTAL_PAIR_TRIALS + FOIL_TRIALS
def main(task_filename, debug_filename=None):
    """
    Generate a trial list and save it to disk.

    task_filename: path that receives the trial list as JSON, written as a
        list of records (one object per trial).
    debug_filename: optional path. When given, the same trial list is also
        written there as CSV, without the index column.
    """
    trial_list = make_trial_list()
    # The full trial list, with all of its columns, is what gets saved.
    trial_list.to_json(task_filename, orient="records")
    logger.info('Saved task data to {}'.format(task_filename))
    if debug_filename:
        trial_list.to_csv(debug_filename, index=False)
        logger.debug('Saved debug data to {}'.format(debug_filename))
def format_trial_list(trial_list):
    """
    Return a new DataFrame built from trial_list with exactly the columns
    'stype' and 'lag', in that order.

    NOTE(review): make_trial_list in this file creates no 'stype' column, so
    for its output that column presumably comes back empty -- confirm the
    intended column name.
    """
    wanted_columns = ['stype', 'lag']
    return pd.DataFrame(trial_list, columns=wanted_columns)
def make_trial_list():
    """
    Build the complete trial list as a DataFrame with one row per trial.

    All TOTAL_TRIALS rows start out blank. The paired trials for each entry
    of LAG_COUNTS are placed first, then every row still blank becomes a
    foil. A 1-based 'trial_number' column is added last.
    """
    logger.info('Generating {} total trials'.format(TOTAL_TRIALS))
    # Let pandas show every row whenever the table is printed or logged.
    pd.set_option('display.max_rows', TOTAL_TRIALS)

    column_names = ['stim_bin_index', 'trial_type', 'repetition', 'lag']
    trials = pd.DataFrame(index=range(TOTAL_TRIALS), columns=column_names)

    # One running counter per pair type, shared across all lag classes, so
    # stimulus bin indexes keep counting up from 1 within each type.
    counters = dict((kind, count(1)) for kind in PAIR_TYPES)
    for pair_count, lags in LAG_COUNTS:
        populate_lags(trials, pair_count, lags, counters)

    fill_foils(trials)

    n_rows = len(trials)
    trials['trial_number'] = range(1, n_rows + 1)
    return trials
def populate_lags(trial_list, lag_count, lag_range, pair_type_counters):
    """
    Place lag_count pairs of every pair type, drawing lags from lag_range.

    This adds lag_count * pair_types * 2 elements to trial_list. Each pair
    takes its stimulus bin index from the next value of its pair type's
    counter in pair_type_counters.

    WARNING: Actively modifies trial_list.

    A RuntimeError from place_lagged_trials propagates if a pair can't be
    placed with any lag in the specified range.
    """
    # One shared working copy: lags found not to fit are removed from it by
    # place_lagged_trials and stay removed for later pairs.
    remaining_lags = list(lag_range)
    for _ in range(lag_count):
        for kind, id_source in pair_type_counters.items():
            bin_index = next(id_source)
            message = 'Trying to place {} #{}'.format(kind, bin_index)
            logger.debug(message)
            place_lagged_trials(trial_list, remaining_lags, kind, bin_index)
def fill_foils(trial_list):
    """
    Turn every still-blank row of trial_list into a foil trial.

    Blank rows are those whose stim_bin_index is null. They receive the
    numbers 1..FOIL_TRIALS in shuffled order as stim_bin_index, trial_type
    'foil' and repetition 'a'. The shuffled list has FOIL_TRIALS entries, so
    this expects exactly that many blank rows.

    WARNING: Actively modifies trial_list.
    """
    foil_ids = list(range(1, FOIL_TRIALS + 1))
    random.shuffle(foil_ids)

    is_blank = trial_list['stim_bin_index'].isnull()
    blank_rows = trial_list.loc[is_blank].index

    trial_list.loc[blank_rows, 'stim_bin_index'] = foil_ids
    trial_list.loc[blank_rows, 'repetition'] = 'a'
    trial_list.loc[blank_rows, 'trial_type'] = 'foil'
def place_lagged_trials(trial_list, possible_lags, pair_type, stim_bin_index):
    """
    Puts both parts of a pair of trials into trial_list.

    A lag is drawn at random from possible_lags. The first trial of the pair
    (repetition 'a') goes into a randomly chosen blank row i and the second
    (repetition 'b') into row i + lag + 1, so `lag` rows sit between them.
    A lag that no longer fits anywhere is removed from possible_lags and
    another one is drawn.

    WARNING: Actively modifies trial_list and possible_lags.

    Raises a RuntimeError if no possible_lags can fit in trial_list.
    """
    while possible_lags:
        lag = random.choice(possible_lags)
        starts = potential_start_indexes(trial_list, lag)
        if not starts:
            logger.debug('No room for pair with lag {}'.format(lag))
            possible_lags.remove(lag)
            continue
        start_index = random.choice(starts)
        end_index = start_index + lag + 1
        logger.debug('Placed at {} and {}, lag {}'.format(
            start_index, end_index, lag))
        # Write each cell directly on the frame. Chained assignment
        # (trial_list.loc[row].col = value) only works when .loc[row] happens
        # to return a view and is silently lost otherwise.
        for row, repetition in ((start_index, 'a'), (end_index, 'b')):
            trial_list.at[row, 'stim_bin_index'] = stim_bin_index
            trial_list.at[row, 'trial_type'] = pair_type
            trial_list.at[row, 'repetition'] = repetition
            trial_list.at[row, 'lag'] = lag
        return

    # Every candidate lag has been ruled out: dump the table for debugging.
    logger.error(trial_list)
    free_count = trial_list.stim_bin_index.isnull().sum()
    raise RuntimeError(
        'Out of possible lags: {} slots remain'.format(free_count))
def potential_start_indexes(trial_list, lag):
    """
    Find all the indexes (i) of trial_list where trial_list[i] and
    trial_list[i+lag+1] are blank. Returns them as a list, which is empty if
    there are no candidate indexes.

    A row is blank when its stim_bin_index is null.

    We can find this out by looking at the blank spaces in trial_list and
    a copy of the trial list shifted up by lag+1 slots, and seeing where
    both lists are blank.
    """
    start_blanks = trial_list.stim_bin_index.isnull()
    # Rows shifted in past the end of the list have no partner slot, so they
    # count as not blank. An explicit fill keeps the Series boolean instead
    # of padding it with NaN.
    end_blanks = start_blanks.shift(-(lag + 1), fill_value=False)
    matches = start_blanks & end_blanks
    return list(trial_list.index[matches.values])
if __name__ == '__main__':
    # Usage: make_lags.py task_filename [debug_filename]
    if len(sys.argv) < 2:
        # Exit with a readable message instead of an IndexError traceback.
        sys.exit('usage: {} task_filename [debug_filename]'.format(
            sys.argv[0]))
    out_filename = sys.argv[1]
    debug_filename = None
    if len(sys.argv) > 2:
        # Asking for a debug file also turns on debug-level logging.
        debug_filename = sys.argv[2]
        logger.setLevel(logging.DEBUG)
    main(out_filename, debug_filename)