Skip to content

Commit

Permalink
Add debug parameter.
Browse files Browse the repository at this point in the history
  • Loading branch information
tsalo committed Nov 28, 2023
1 parent c2b7e91 commit ad75844
Showing 1 changed file with 70 additions and 53 deletions.
123 changes: 70 additions & 53 deletions fLoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def draw_countdown(win, stim, duration):
countdown_sec -= 1


def draw_until_keypress(win, stim, continueKeys=["5"]):
def draw_until_keypress(win, stim, continueKeys=["5"], debug=False):
"""Draw a screen until a specific key is pressed.
Parameters
Expand All @@ -168,10 +168,17 @@ def draw_until_keypress(win, stim, continueKeys=["5"]):
continueKeys : :obj:`list` of :obj:`str`, optional
Keys to accept to stop drawing the stimulus.
Default is ["5"].
debug : :obj:`bool`
If True, then the screen will just wait 5 seconds and then continue.
Default is False.
"""
response = event.BuilderKeyResponse()
win.callOnFlip(response.clock.reset)
event.clearEvents(eventType="keyboard")
if debug:
time.wait(5)
return

while True:
if isinstance(stim, list):
for s in stim:
Expand Down Expand Up @@ -232,26 +239,27 @@ def draw(win, stim, duration, clock):
return response.keys, response.rt


def main(config):
def main(debug=False):
"""Run the fLoc task."""
# Ensure that relative paths start from the same directory as this script
try:
script_dir = os.path.dirname(os.path.abspath(__file__)).decode(
sys.getfilesystemencoding()
)
except AttributeError:
script_dir = os.path.dirname(os.path.abspath(__file__))

# Load configuration file
config_file = os.path.join(script_dir, "config.yml")
with open(config_file, "r") as fo:
config = load(fo, Loader=Loader)

constants = config["constants"]
trial_duration = constants["IMAGE_DURATION"] + constants["TARGET_ISI"]

# Collect user input
# ------------------
# Remember to turn fullscr to True for the real deal.
exp_info = {
"Subject": "",
"Session": "",
"Task": ["OneBack", "TwoBack", "Oddball"],
"Image Set": ["default", "alternate", "both"],
"Number of Runs": "4",
}
dlg = DlgFromDict(
exp_info,
title="Functional localizer",
order=["Subject", "Session", "Task", "Image Set", "Number of Runs"],
)
window = visual.Window(
fullscr=True,
size=(800, 600),
Expand All @@ -264,9 +272,31 @@ def main(config):
blendMode="avg",
useFBO=True,
)
if not dlg.OK:
# Quit if user presses "Cancel" or "Close"
core.quit()

if not debug:
exp_info = {
"Subject": "",
"Session": "",
"Task": ["OneBack", "TwoBack", "Oddball"],
"Image Set": ["default", "alternate", "both"],
"Number of Runs": "4",
}
dlg = DlgFromDict(
exp_info,
title="Functional localizer",
order=["Subject", "Session", "Task", "Image Set", "Number of Runs"],
)
if not dlg.OK:
# Quit if user presses "Cancel" or "Close"
core.quit()
else:
exp_info = {
"Subject": "01",
"Session": "01",
"Task": "Oddball",
"Image Set": "default",
"Number of Runs": "1",
}

output_dir = os.path.join(script_dir, "data")
os.makedirs(output_dir, exist_ok=True)
Expand Down Expand Up @@ -409,13 +439,13 @@ def main(config):
# We want to ensure that tasks are not assigned to baseline blocks
n_nonbaseline_blocks = int(constants["N_BLOCKS"] * (n_categories - 1) / n_categories)
n_dupes = int(np.ceil(n_nonbaseline_blocks / len(grabber_list)))
task_miniblocks = grabber_list * n_dupes
task_blocks = grabber_list * n_dupes

# Scanner runtime
# ---------------
global_clock = core.Clock() # to track the time since experiment started
run_clock = core.Clock() # to track time since each run starts (post scanner pulse)
miniblock_clock = core.Clock() # to track duration of each miniblock
block_clock = core.Clock() # to track duration of each block
trial_clock = core.Clock() # to track duration of each trial
fixation_trial_clock = core.Clock() # to account for fixation time spent loading image

Expand All @@ -435,15 +465,15 @@ def main(config):
run_label = i_run + 1
events_file = os.path.join(output_dir, f"{base_name}_run-{run_label:02d}_events.tsv")

miniblock_categories = randomize_carefully(standard_categories, n_blocks_per_category)
np.random.shuffle(task_miniblocks)
block_categories = randomize_carefully(standard_categories, n_blocks_per_category)
np.random.shuffle(task_blocks)

# Scanner runtime
# ---------------
# Wait for trigger from scanner to start run.
if i_run == 0:
# Show instructions for the first run until the scanner trigger
draw_until_keypress(win=window, stim=instruction_text_box)
draw_until_keypress(win=window, stim=instruction_text_box, debug=debug)
else:
# Show performance from the last run until the scanner trigger
hit_count = (run_frame["classification"] == "true_positive").sum()
Expand All @@ -460,7 +490,7 @@ def main(config):
)
performance_screen.setText(performance_str)
performance_screen.draw()
draw_until_keypress(win=window, stim=performance_screen)
draw_until_keypress(win=window, stim=performance_screen, debug=debug)

run_clock.reset()

Expand All @@ -482,8 +512,8 @@ def main(config):

run_response_times = []
nonbaseline_block_counter = 0
for j_miniblock, category in enumerate(miniblock_categories):
miniblock_clock.reset()
for j_block, category in enumerate(block_categories):
block_clock.reset()
if category == "baseline":
onset_time = run_clock.getTime()
responses, _ = draw(
Expand All @@ -499,25 +529,25 @@ def main(config):

# Log info
run_data["onset"].append(onset_time)
run_data["duration"].append(miniblock_clock.getTime())
run_data["duration"].append(block_clock.getTime())
run_data["trial_type"].append("baseline")
run_data["stim_file"].append("n/a")
run_data["category"].append("baseline")
run_data["subcategory"].append("baseline")
run_data["miniblock_number"].append(j_miniblock + 1)
run_data["miniblock_number"].append(j_block + 1)
else:
# Block of stimuli
miniblock_stimuli = list(
block_stimuli = list(
np.random.choice(
stimuli[category],
size=constants["N_STIMULI_PER_BLOCK"],
replace=False,
)
)
if task_miniblocks[nonbaseline_block_counter] == 1:
if task_blocks[nonbaseline_block_counter] == 1:
# Check for last block's target to make sure that two targets don't
# occur within the same response window
if (j_miniblock > 0) and (target_idx is not None):
if (j_block > 0) and (target_idx is not None):
last_target_onset = (
((constants["N_STIMULI_PER_BLOCK"] + 1) - target_idx)
* trial_duration
Expand All @@ -533,22 +563,22 @@ def main(config):
# Adjust stimuli based on task
if exp_info["Task"] == "Oddball":
# target is scrambled image
target_idx = np.random.randint(first_viable_trial, len(miniblock_stimuli))
miniblock_stimuli[target_idx] = np.random.choice(stimuli["scrambled"])
target_idx = np.random.randint(first_viable_trial, len(block_stimuli))
block_stimuli[target_idx] = np.random.choice(stimuli["scrambled"])
elif exp_info["Task"] == "OneBack":
# target is second stim of same kind
first_viable_trial = np.maximum(first_viable_trial, 1)
target_idx = np.random.randint(first_viable_trial, len(miniblock_stimuli))
miniblock_stimuli[target_idx] = miniblock_stimuli[target_idx - 1]
target_idx = np.random.randint(first_viable_trial, len(block_stimuli))
block_stimuli[target_idx] = block_stimuli[target_idx - 1]
elif exp_info["Task"] == "TwoBack":
# target is second stim of same kind
first_viable_trial = np.maximum(first_viable_trial, 2)
target_idx = np.random.randint(first_viable_trial, len(miniblock_stimuli))
miniblock_stimuli[target_idx] = miniblock_stimuli[target_idx - 2]
target_idx = np.random.randint(first_viable_trial, len(block_stimuli))
block_stimuli[target_idx] = block_stimuli[target_idx - 2]
else:
target_idx = None

for k_stim, stim_file in enumerate(miniblock_stimuli):
for k_stim, stim_file in enumerate(block_stimuli):
fixation_trial_clock.reset()
stim_image.image = stim_file
trial_clock.reset()
Expand Down Expand Up @@ -591,12 +621,12 @@ def main(config):
run_data["stim_file"].append(relative_stim_file)
run_data["category"].append(category)
run_data["subcategory"].append(subcategory)
run_data["miniblock_number"].append(j_miniblock + 1)
run_data["miniblock_number"].append(j_block + 1)

nonbaseline_block_counter += 1

# Unused duration
# miniblock_duration = miniblock_clock.getTime()
# block_duration = block_clock.getTime()

run_frame = pd.DataFrame(run_data)
run_frame = allocate_responses(
Expand Down Expand Up @@ -645,17 +675,4 @@ def main(config):


if __name__ == "__main__":
# Ensure that relative paths start from the same directory as this script
try:
script_dir = os.path.dirname(os.path.abspath(__file__)).decode(
sys.getfilesystemencoding()
)
except AttributeError:
script_dir = os.path.dirname(os.path.abspath(__file__))

# Load configuration file
config_file = os.path.join(script_dir, "config.yml")
with open(config_file, "r") as fo:
config = load(fo, Loader=Loader)

main(config)
main(debug=False)

0 comments on commit ad75844

Please sign in to comment.