From ad7584423b0310573c76021d62560d91639bdfff Mon Sep 17 00:00:00 2001 From: Taylor Salo Date: Tue, 28 Nov 2023 10:20:26 -0500 Subject: [PATCH] Add debug parameter. --- fLoc.py | 123 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 70 insertions(+), 53 deletions(-) diff --git a/fLoc.py b/fLoc.py index fc6dca2..aecf74e 100644 --- a/fLoc.py +++ b/fLoc.py @@ -156,7 +156,7 @@ def draw_countdown(win, stim, duration): countdown_sec -= 1 -def draw_until_keypress(win, stim, continueKeys=["5"]): +def draw_until_keypress(win, stim, continueKeys=["5"], debug=False): """Draw a screen until a specific key is pressed. Parameters @@ -168,10 +168,17 @@ def draw_until_keypress(win, stim, continueKeys=["5"]): continueKeys : :obj:`list` of :obj:`str`, optional Keys to accept to stop drawing the stimulus. Default is ["5"]. + debug : :obj:`bool` + If True, then the screen will just wait 5 seconds and then continue. + Default is False. """ response = event.BuilderKeyResponse() win.callOnFlip(response.clock.reset) event.clearEvents(eventType="keyboard") + if debug: + time.wait(5) + return + while True: if isinstance(stim, list): for s in stim: @@ -232,26 +239,27 @@ def draw(win, stim, duration, clock): return response.keys, response.rt -def main(config): +def main(debug=False): """Run the fLoc task.""" + # Ensure that relative paths start from the same directory as this script + try: + script_dir = os.path.dirname(os.path.abspath(__file__)).decode( + sys.getfilesystemencoding() + ) + except AttributeError: + script_dir = os.path.dirname(os.path.abspath(__file__)) + + # Load configuration file + config_file = os.path.join(script_dir, "config.yml") + with open(config_file, "r") as fo: + config = load(fo, Loader=Loader) + constants = config["constants"] trial_duration = constants["IMAGE_DURATION"] + constants["TARGET_ISI"] # Collect user input # ------------------ # Remember to turn fullscr to True for the real deal. - exp_info = { - "Subject": "", - "Session": "", - "Task": ["OneBack", "TwoBack", "Oddball"], - "Image Set": ["default", "alternate", "both"], - "Number of Runs": "4", - } - dlg = DlgFromDict( - exp_info, - title="Functional localizer", - order=["Subject", "Session", "Task", "Image Set", "Number of Runs"], - ) window = visual.Window( fullscr=True, size=(800, 600), @@ -264,9 +272,31 @@ def main(config): blendMode="avg", useFBO=True, ) - if not dlg.OK: - # Quit if user presses "Cancel" or "Close" - core.quit() + + if not debug: + exp_info = { + "Subject": "", + "Session": "", + "Task": ["OneBack", "TwoBack", "Oddball"], + "Image Set": ["default", "alternate", "both"], + "Number of Runs": "4", + } + dlg = DlgFromDict( + exp_info, + title="Functional localizer", + order=["Subject", "Session", "Task", "Image Set", "Number of Runs"], + ) + if not dlg.OK: + # Quit if user presses "Cancel" or "Close" + core.quit() + else: + exp_info = { + "Subject": "01", + "Session": "01", + "Task": "Oddball", + "Image Set": "default", + "Number of Runs": "1", + } output_dir = os.path.join(script_dir, "data") os.makedirs(output_dir, exist_ok=True) @@ -409,13 +439,13 @@ def main(config): # We want to ensure that tasks are not assigned to baseline blocks n_nonbaseline_blocks = int(constants["N_BLOCKS"] * (n_categories - 1) / n_categories) n_dupes = int(np.ceil(n_nonbaseline_blocks / len(grabber_list))) - task_miniblocks = grabber_list * n_dupes + task_blocks = grabber_list * n_dupes # Scanner runtime # --------------- global_clock = core.Clock() # to track the time since experiment started run_clock = core.Clock() # to track time since each run starts (post scanner pulse) - miniblock_clock = core.Clock() # to track duration of each miniblock + block_clock = core.Clock() # to track duration of each block trial_clock = core.Clock() # to track duration of each trial fixation_trial_clock = core.Clock() # to account for fixation time spent loading image @@ -435,15 +465,15 @@ def main(config): run_label = i_run + 1 events_file = os.path.join(output_dir, f"{base_name}_run-{run_label:02d}_events.tsv") - miniblock_categories = randomize_carefully(standard_categories, n_blocks_per_category) - np.random.shuffle(task_miniblocks) + block_categories = randomize_carefully(standard_categories, n_blocks_per_category) + np.random.shuffle(task_blocks) # Scanner runtime # --------------- # Wait for trigger from scanner to start run. if i_run == 0: # Show instructions for the first run until the scanner trigger - draw_until_keypress(win=window, stim=instruction_text_box) + draw_until_keypress(win=window, stim=instruction_text_box, debug=debug) else: # Show performance from the last run until the scanner trigger hit_count = (run_frame["classification"] == "true_positive").sum() @@ -460,7 +490,7 @@ def main(config): ) performance_screen.setText(performance_str) performance_screen.draw() - draw_until_keypress(win=window, stim=performance_screen) + draw_until_keypress(win=window, stim=performance_screen, debug=debug) run_clock.reset() @@ -482,8 +512,8 @@ def main(config): run_response_times = [] nonbaseline_block_counter = 0 - for j_miniblock, category in enumerate(miniblock_categories): - miniblock_clock.reset() + for j_block, category in enumerate(block_categories): + block_clock.reset() if category == "baseline": onset_time = run_clock.getTime() responses, _ = draw( @@ -499,25 +529,25 @@ def main(config): # Log info run_data["onset"].append(onset_time) - run_data["duration"].append(miniblock_clock.getTime()) + run_data["duration"].append(block_clock.getTime()) run_data["trial_type"].append("baseline") run_data["stim_file"].append("n/a") run_data["category"].append("baseline") run_data["subcategory"].append("baseline") - run_data["miniblock_number"].append(j_miniblock + 1) + run_data["miniblock_number"].append(j_block + 1) else: # Block of stimuli - miniblock_stimuli = list( + block_stimuli = list( np.random.choice( stimuli[category], size=constants["N_STIMULI_PER_BLOCK"], replace=False, ) ) - if task_miniblocks[nonbaseline_block_counter] == 1: + if task_blocks[nonbaseline_block_counter] == 1: # Check for last block's target to make sure that two targets don't # occur within the same response window - if (j_miniblock > 0) and (target_idx is not None): + if (j_block > 0) and (target_idx is not None): last_target_onset = ( ((constants["N_STIMULI_PER_BLOCK"] + 1) - target_idx) * trial_duration @@ -533,22 +563,22 @@ def main(config): # Adjust stimuli based on task if exp_info["Task"] == "Oddball": # target is scrambled image - target_idx = np.random.randint(first_viable_trial, len(miniblock_stimuli)) - miniblock_stimuli[target_idx] = np.random.choice(stimuli["scrambled"]) + target_idx = np.random.randint(first_viable_trial, len(block_stimuli)) + block_stimuli[target_idx] = np.random.choice(stimuli["scrambled"]) elif exp_info["Task"] == "OneBack": # target is second stim of same kind first_viable_trial = np.maximum(first_viable_trial, 1) - target_idx = np.random.randint(first_viable_trial, len(miniblock_stimuli)) - miniblock_stimuli[target_idx] = miniblock_stimuli[target_idx - 1] + target_idx = np.random.randint(first_viable_trial, len(block_stimuli)) + block_stimuli[target_idx] = block_stimuli[target_idx - 1] elif exp_info["Task"] == "TwoBack": # target is second stim of same kind first_viable_trial = np.maximum(first_viable_trial, 2) - target_idx = np.random.randint(first_viable_trial, len(miniblock_stimuli)) - miniblock_stimuli[target_idx] = miniblock_stimuli[target_idx - 2] + target_idx = np.random.randint(first_viable_trial, len(block_stimuli)) + block_stimuli[target_idx] = block_stimuli[target_idx - 2] else: target_idx = None - for k_stim, stim_file in enumerate(miniblock_stimuli): + for k_stim, stim_file in enumerate(block_stimuli): fixation_trial_clock.reset() stim_image.image = stim_file trial_clock.reset() @@ -591,12 +621,12 @@ def main(config): run_data["stim_file"].append(relative_stim_file) run_data["category"].append(category) run_data["subcategory"].append(subcategory) - run_data["miniblock_number"].append(j_miniblock + 1) + run_data["miniblock_number"].append(j_block + 1) nonbaseline_block_counter += 1 # Unused duration - # miniblock_duration = miniblock_clock.getTime() + # block_duration = block_clock.getTime() run_frame = pd.DataFrame(run_data) run_frame = allocate_responses( @@ -645,17 +675,4 @@ def main(config): if __name__ == "__main__": - # Ensure that relative paths start from the same directory as this script - try: - script_dir = os.path.dirname(os.path.abspath(__file__)).decode( - sys.getfilesystemencoding() - ) - except AttributeError: - script_dir = os.path.dirname(os.path.abspath(__file__)) - - # Load configuration file - config_file = os.path.join(script_dir, "config.yml") - with open(config_file, "r") as fo: - config = load(fo, Loader=Loader) - - main(config) + main(debug=False)