From fa4fff51b47e21f0f46c1e2309c29f22638148e5 Mon Sep 17 00:00:00 2001
From: mathdugre
Date: Fri, 24 Sep 2021 11:36:55 -0400
Subject: [PATCH 1/5] [ADD] `requirements.txt` file.

---
 requirements.txt | 4 ++++
 1 file changed, 4 insertions(+)
 create mode 100644 requirements.txt

diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..118a54c
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+matplotlib
+numpy
+PyYAML
+requests

From eb076e0c4e4c645aa083404aa2f36bfe55333d7b Mon Sep 17 00:00:00 2001
From: mathdugre
Date: Fri, 24 Sep 2021 11:42:47 -0400
Subject: [PATCH 2/5] [REFACTOR] Remove unused imports and fix style to PEP 8.

---
 NeuroCI.py                | 24 +++++++++++++++---------
 analysesVisualizations.py | 20 +++++++++++---------
 cacheOps.py               | 19 +++++++++++--------
 cbrainAPI.py              |  6 ++----
 4 files changed, 39 insertions(+), 30 deletions(-)

diff --git a/NeuroCI.py b/NeuroCI.py
index f8973dc..7834b3b 100644
--- a/NeuroCI.py
+++ b/NeuroCI.py
@@ -1,15 +1,21 @@
-import requests
-import yaml
-import json
+import datetime
 import sys
-import os
-#from github import Github
-from ast import literal_eval
 import time
-import datetime
 
-from cbrainAPI import *
-from cacheOps import *
+import yaml
+
+from cacheOps import (
+	download_cache,
+	populate_cache_filenames,
+	update_statuses,
+	pipeline_manager,
+	populate_results
+)
+from cbrainAPI import (
+	cbrain_login,
+	cbrain_logout,
+	cbrain_get_all_tasks
+)
 
 
 ##################################################################################
diff --git a/analysesVisualizations.py b/analysesVisualizations.py
index 63402b7..261c43d 100644
--- a/analysesVisualizations.py
+++ b/analysesVisualizations.py
@@ -1,15 +1,17 @@
-import requests
+import csv
 import json
-import yaml
-from csv import reader
-import numpy as np
+import sys
+
 import matplotlib.pyplot as plt
+import numpy as np
 from numpy.polynomial.polynomial import polyfit
-import sys
-import os
+import yaml
 
-from cbrainAPI import *
-from cacheOps import *
+from cbrainAPI import (
+	cbrain_login,
+	cbrain_logout,
+	cbrain_download_file
+)
 
 ###########################################################################################################################
 #General functions
@@ -60,7 +62,7 @@ def preventAD_get_labels_from_filename(filename):
 
 def preventAD_get_measure_from_csv(subject, visit, data_file):
 	with open(data_file, 'r') as read_obj:
-		csv_reader = reader(read_obj)
+		csv_reader = csv.reader(read_obj)
 		for row in csv_reader:
 			if row[1] == subject and row[2]==visit:
 				return row[19] #change this to get a different column in CSV
diff --git a/cacheOps.py b/cacheOps.py
index 92774f8..3d1299f 100644
--- a/cacheOps.py
+++ b/cacheOps.py
@@ -1,13 +1,16 @@
-import requests
-import yaml
-import json
-import sys
-import os
-import csv
-from github import Github
 from ast import literal_eval
+import csv
+import json
+
+import requests
 
-from cbrainAPI import *
+from cbrainAPI import (
+	cbrain_list_data_provider,
+	cbrain_post_task,
+	cbrain_get_task_info_from_list,
+	cbrain_download_text,
+	cbrain_sync_file
+)
 
 
 #############################################
diff --git a/cbrainAPI.py b/cbrainAPI.py
index 5f4d5e7..9dd8e5a 100644
--- a/cbrainAPI.py
+++ b/cbrainAPI.py
@@ -1,8 +1,6 @@
-import requests
 import json
-import sys
-import os
-import csv
+
+import requests
 
 
 ##################################################################################

From 6938b9294a69c19931871981a93bf93f29a58176 Mon Sep 17 00:00:00 2001
From: mathdugre
Date: Fri, 24 Sep 2021 11:45:18 -0400
Subject: [PATCH 3/5] [REMOVE]
Commented code.

---
 NeuroCI.py | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/NeuroCI.py b/NeuroCI.py
index 7834b3b..512a9a4 100644
--- a/NeuroCI.py
+++ b/NeuroCI.py
@@ -46,13 +46,7 @@ def main(cbrain_token, CCI_token, experiment_definition, cbrain_ids, latest_arti
 		populate_results(dataset + '.json', cbrain_token)
 		print('Populated results for ' + dataset + '.json')
 
-	#extract_results()
-	#analysis(expdef[script])
-	
-	#start = time.time()
-	#update_statuses(dataset + '.json', cbrain_token)
-	#end = time.time()
-	#print('Updated statuses in cache for: ' + dataset + '.json in' + str(datetime.timedelta(seconds=(end - start))))
+	
 
 
 ##################################################################################

From 61f2de927c57280f0461c4bba5f548ec8ae1dc8d Mon Sep 17 00:00:00 2001
From: mathdugre
Date: Fri, 24 Sep 2021 12:15:23 -0400
Subject: [PATCH 4/5] [REFACTOR] Move docstrings within the function bodies.

---
 analysesVisualizations.py |  5 ++---
 cacheOps.py               | 23 ++++++++++++++---------
 cbrainAPI.py              | 31 ++++++++++++++++---------------
 3 files changed, 32 insertions(+), 27 deletions(-)

diff --git a/analysesVisualizations.py b/analysesVisualizations.py
index 261c43d..2caf4ec 100644
--- a/analysesVisualizations.py
+++ b/analysesVisualizations.py
@@ -16,8 +16,8 @@
 ###########################################################################################################################
 #General functions
 
-'''Generates a simple boxplot, not used for now.'''
 def boxplot(volume_list, pipeline_name, dataset_name):
+	"""Generates a simple boxplot, not used for now."""
 	data = np.array(volume_list).astype(np.float)
 	fig1, ax1 = plt.subplots()
 	ax1.set_title('Left Hippocampal Volumes (mm3)')
@@ -26,9 +26,8 @@ def boxplot(volume_list, pipeline_name, dataset_name):
 	plt.savefig('./artifacts/' + dataset_name + '_' + pipeline_name + '_box' + '.png') # Saves in artifact directory
 	#plt.show()
 
-'''Scatter plot and line of best fit'''
 def corrplot(volume_list, hearing_loss_list, pipeline_name, dataset_name):
-	
+	"""Scatter plot and line of best fit."""
 	new_hl_list = []
 	new_vol_list = []
 	index = 0
diff --git a/cacheOps.py b/cacheOps.py
index 3d1299f..5519ab2 100644
--- a/cacheOps.py
+++ b/cacheOps.py
@@ -14,8 +14,8 @@
 
 
 #############################################
 
-'''Downloads newest cache file to json, or if it's not found in the circleCI artifacts, creates a new cache file'''
 def download_cache(cache_file, CCI_token, latest_artifacts_url):
+	"""Downloads newest cache file to json, or if it's not found in the circleCI artifacts, creates a new cache file."""
 	headers = {'Circle-Token': CCI_token}
 	response = requests.get(str(latest_artifacts_url), headers=headers) #finds the link to the cache file amongst all the artifacts
@@ -44,8 +44,8 @@ def download_cache(cache_file, CCI_token, latest_artifacts_url):
 	print('written cache to temp file')
 
 
-'''Creates a template for a cache entry (cbrain data provider file), for a specific pipeline. Provides a userfile ID as a starting point for task computations'''
 def generate_cache_subject(nifti_file, cbrain_userfile_ID, pipeline, experiment_definition):
+	"""Creates a template for a cache entry (cbrain data provider file), for a specific pipeline.
Provides a userfile ID as a starting point for task computations.""" data = { nifti_file: { pipeline: {}}} @@ -81,8 +81,9 @@ def generate_cache_subject(nifti_file, cbrain_userfile_ID, pipeline, experiment_ return data -'''Generates the template for every file in a cache, for a specific pipeline''' def populate_cache_filenames(cache_file, cbrain_token, blocklist, pipeline, data_provider_id, experiment_definition): + """Generates the template for every file in a cache, for a specific pipeline.""" + filelist = [] data_provider_browse = cbrain_list_data_provider(str(data_provider_id), cbrain_token) #Query CBRAIN to list all files in data provider. @@ -114,8 +115,9 @@ def populate_cache_filenames(cache_file, cbrain_token, blocklist, pipeline, data -'''Updates a cache file with the newest task statuses from CBRAIN''' def update_statuses(cache_filename, task_list): + """Updates a cache file with the newest task statuses from CBRAIN.""" + with open(cache_filename, "r+") as cache_file: data = json.load(cache_file) @@ -155,8 +157,8 @@ def update_statuses(cache_filename, task_list): cache_file.truncate() -'''Iterates over each component in a pipeline, organizes, and feeds the necessary data to the functions which post tasks on CBRAIN and update the caches''' def pipeline_manager(cbrain_token, experiment_definition, cbrain_ids, pipeline, dataset): + """Iterates over each component in a pipeline, organizes, and feeds the necessary data to the functions which post tasks on CBRAIN and update the caches.""" component_number = 0 #Keeps track of the order of the component (we need to flag the first one) @@ -178,8 +180,8 @@ def pipeline_manager(cbrain_token, experiment_definition, cbrain_ids, pipeline, component_number = component_number + 1 -'''Handles the cache writing for the first task in a pipeline, and calls to post the task to CBRAIN''' def first_task_handler(cbrain_token, parameter_dictionary, tool_config_id, cache_file, pipeline_component, pipeline_name): + """Handles the cache writing for the first task in a pipeline, and calls to post the task to CBRAIN.""" with open(cache_file, "r+") as file: data = json.load(file) @@ -203,8 +205,8 @@ def first_task_handler(cbrain_token, parameter_dictionary, tool_config_id, cache file.truncate() -'''Handles the cache writing and task posting for any pipeline component except the first task''' def nth_task_handler(cbrain_token, parameter_dictionary, tool_config_id, cache_file, pipeline_component, previous_pipeline_component, pipeline_name): + """Handles the cache writing and task posting for any pipeline component except the first task.""" with open(cache_file, "r+") as file: data = json.load(file) @@ -228,8 +230,8 @@ def nth_task_handler(cbrain_token, parameter_dictionary, tool_config_id, cache_f json.dump(data, file, indent=2) file.truncate() -'''Resubmits a task, and sets all subsequent pipeline component dependencies to null in the cache''' def task_resubmission_handler(cbrain_token, parameter_dictionary, tool_config_id, cache_file, pipeline_component, pipeline_name, rerun_ID_list): + """Resubmits a task, and sets all subsequent pipeline component dependencies to null in the cache.""" with open(cache_file, "r+") as file: data = json.load(file) @@ -279,8 +281,11 @@ def task_resubmission_handler(cbrain_token, parameter_dictionary, tool_config_id file.truncate() -'''Fetches the text from a file on CBRAIN and writes it to the cache. 
Originally this designed for extracting a hippocampal volume from an FSL Stats text output''' def populate_results(cache_filename, cbrain_token): + """Fetches the text from a file on CBRAIN and writes it to the cache. + + Originally this is designed to extract a hippocampal volume from an FSL Stats text output. + """ with open(cache_filename, "r+") as cache_file: data = json.load(cache_file) diff --git a/cbrainAPI.py b/cbrainAPI.py index 9dd8e5a..99ce19e 100644 --- a/cbrainAPI.py +++ b/cbrainAPI.py @@ -4,8 +4,8 @@ ################################################################################## -'''Posts API call to CBRAIN to obtain a authentication token given a username and password''' def cbrain_login(username, password): + """Posts API call to CBRAIN to obtain a authentication token given a username and password.""" headers = { 'Content-Type': 'application/x-www-form-urlencoded', @@ -28,8 +28,8 @@ def cbrain_login(username, password): return 1 -'''End a CBRAIN session''' def cbrain_logout(cbrain_token): + """Ends a CBRAIN session.""" headers = { 'Accept': 'application/json', @@ -46,10 +46,10 @@ def cbrain_logout(cbrain_token): else: print("Logout failure") return 1 - -'''Lists all files in a CBRAIN data provider''' + def cbrain_list_data_provider(data_provider_ID, cbrain_token): + """Lists all files in a CBRAIN data provider.""" data_provider_ID = str(data_provider_ID) headers = { @@ -70,8 +70,8 @@ def cbrain_list_data_provider(data_provider_ID, cbrain_token): return 1 -'''Posts a task in CBRAIN''' def cbrain_post_task(cbrain_token, userfile_id, tool_config_id, parameter_dictionary): + """Posts a task in CBRAIN.""" userfile_id = str(userfile_id) @@ -109,8 +109,9 @@ def cbrain_post_task(cbrain_token, userfile_id, tool_config_id, parameter_dictio print(response.content) return 1 -'''Gets the list of all the tasks of the user on CBRAIN''' + def cbrain_get_all_tasks(cbrain_token): + """Gets the list of all the tasks of the user on CBRAIN.""" headers = { 'Accept': 'application/json', @@ -140,16 +141,17 @@ def cbrain_get_all_tasks(cbrain_token): return task_list -'''Obtains info on the progress of a single task, given the list of all tasks for the user''' + def cbrain_get_task_info_from_list(task_list, task_ID): + """Obtains info on the progress of a single task, given the list of all tasks for the user.""" for task in task_list: if task_ID == task['id'] or int(task_ID) == task['id']: return task - -'''Obtains information on the progress of a single task by querying for a single task''' + def cbrain_get_task_info(cbrain_token, task_ID): + """Obtains information on the progress of a single task by querying for a single task.""" task_ID = str(task_ID) headers = { @@ -171,8 +173,9 @@ def cbrain_get_task_info(cbrain_token, task_ID): return 1 -'''Downloads the text from a file on CBRAIN''' + def cbrain_download_text(userfile_ID, cbrain_token): + """Downloads the text from a file on CBRAIN.""" userfile_ID = str(userfile_ID) headers = { @@ -193,9 +196,8 @@ def cbrain_download_text(userfile_ID, cbrain_token): return 1 -'''Downloads a file from CBRAIN and saves it, given a userfile ID''' def cbrain_download_file(userfile_ID, filename, cbrain_token): - + """Downloads a file from CBRAIN and saves it, given a userfile ID.""" fileID = str(userfile_ID) headers = { 'Accept': 'application/json', @@ -216,9 +218,8 @@ def cbrain_download_file(userfile_ID, filename, cbrain_token): return 1 -'''Given a filename and data provider, download the file from the data provider''' def cbrain_download_DP_file(filename, 
data_provider_id, cbrain_token):
-	
+	"""Given a filename and data provider, download the file from the data provider."""
 	data_provider_browse = cbrain_list_data_provider(str(data_provider_id), cbrain_token) #Query CBRAIN to list all files in data provider.
 	print(data_provider_browse)
@@ -237,8 +238,8 @@ def cbrain_download_DP_file(filename, data_provider_id, cbrain_token):
 	return
 
 
-'''Makes sure a file in a data provider is synchronized with CBRAIN'''
 def cbrain_sync_file(userfile_id_list, cbrain_token):
+	"""Makes sure a file in a data provider is synchronized with CBRAIN."""
 	#userfile_id_list can either be a string eg. '3663657', or a list eg. ['3663729', '3663714']
 	headers = {
 		'Content-Type': 'application/json',

From a6cd83dd02238ce7440965c8ff8e174f707e298c Mon Sep 17 00:00:00 2001
From: mathdugre
Date: Fri, 24 Sep 2021 12:27:50 -0400
Subject: [PATCH 5/5] [REFACTOR] Format code base with psf/black.

Makes the code style consistent throughout the project and with common
standards.
---
 NeuroCI.py                | 133 ++++---
 analysesVisualizations.py | 177 ++++----
 cacheOps.py               | 819 +++++++++++++++++++++++---------------
 cbrainAPI.py              | 435 ++++++++++----------
 4 files changed, 909 insertions(+), 655 deletions(-)

diff --git a/NeuroCI.py b/NeuroCI.py
index 512a9a4..8438eef 100644
--- a/NeuroCI.py
+++ b/NeuroCI.py
@@ -5,52 +5,79 @@ import yaml
 
 from cacheOps import (
-	download_cache,
-	populate_cache_filenames,
-	update_statuses,
-	pipeline_manager,
-	populate_results
+    download_cache,
+    populate_cache_filenames,
+    update_statuses,
+    pipeline_manager,
+    populate_results,
 )
-from cbrainAPI import (
-	cbrain_login,
-	cbrain_logout,
-	cbrain_get_all_tasks
-)
+from cbrainAPI import cbrain_login, cbrain_logout, cbrain_get_all_tasks
 
 ##################################################################################
 
-def main(cbrain_token, CCI_token, experiment_definition, cbrain_ids, latest_artifacts_url):
-	
-	for dataset in experiment_definition['Datasets']:
-		
-		download_cache(dataset + '.json', CCI_token, latest_artifacts_url) #Downloads newest cache to json file
-		print('Downloaded newest cache for: ' + dataset + '.json')
-		
-		task_list = cbrain_get_all_tasks(cbrain_token) #Gets the complete list of tasks for the user on CBRAIN
-		print('Fetched the list of tasks for the CBRAIN user')
-		
-		start = time.time()
-		update_statuses(dataset + '.json', task_list) #Updates the contents of a cache to reflect CBRAIN task statuses
-		end = time.time()
-		print('Updated statuses in cache for: ' + dataset + '.json in' + str(datetime.timedelta(seconds=(end - start))))
-		
-		for pipeline in experiment_definition['Pipelines']:
-			
-			start = time.time()
-			populate_cache_filenames(dataset + '.json', cbrain_token, experiment_definition['Datasets'][dataset]['Blocklist'], pipeline, cbrain_ids['Data_Provider_IDs'][dataset], experiment_definition) #Populates a cache with any new files found
-			end = time.time()
-			print('Populated cache filenames for: ' + dataset + '.json' + ', ' + pipeline + " in" + str(datetime.timedelta(seconds=(end - start))))
-			
-			pipeline_manager(cbrain_token, experiment_definition, cbrain_ids, pipeline, dataset)
-			print('Posted tasks for: ' + dataset + '.json' + ', ' + pipeline)
-		
-		populate_results(dataset + '.json', cbrain_token)
-		print('Populated results for ' + dataset + '.json')
+
+def main(
+    cbrain_token, CCI_token, experiment_definition, cbrain_ids, latest_artifacts_url
+):
+
+    for dataset in experiment_definition["Datasets"]:
+
+        download_cache(
+            dataset + ".json", CCI_token, latest_artifacts_url
+        )  # Downloads
newest cache to json file + print("Downloaded newest cache for: " + dataset + ".json") + + task_list = cbrain_get_all_tasks( + cbrain_token + ) # Gets the complete list of tasks for the user on CBRAIN + print("Fetched the list of tasks for the CBRAIN user") + + start = time.time() + update_statuses( + dataset + ".json", task_list + ) # Updates the contents of a cache to reflect CBRAIN task statuses + end = time.time() + print( + "Updated statuses in cache for: " + + dataset + + ".json in" + + str(datetime.timedelta(seconds=(end - start))) + ) + + for pipeline in experiment_definition["Pipelines"]: + + start = time.time() + populate_cache_filenames( + dataset + ".json", + cbrain_token, + experiment_definition["Datasets"][dataset]["Blocklist"], + pipeline, + cbrain_ids["Data_Provider_IDs"][dataset], + experiment_definition, + ) # Populates a cache with any new files found + end = time.time() + print( + "Populated cache filenames for: " + + dataset + + ".json" + + ", " + + pipeline + + " in" + + str(datetime.timedelta(seconds=(end - start))) + ) + + pipeline_manager( + cbrain_token, experiment_definition, cbrain_ids, pipeline, dataset + ) + print("Posted tasks for: " + dataset + ".json" + ", " + pipeline) + + populate_results(dataset + ".json", cbrain_token) + print("Populated results for " + dataset + ".json") ################################################################################## -#Obtain login credentials from args, stored in CI environment variables. +# Obtain login credentials from args, stored in CI environment variables. cbrain_user = sys.argv[1] cbrain_password = sys.argv[2] @@ -60,22 +87,24 @@ def main(cbrain_token, CCI_token, experiment_definition, cbrain_ids, latest_arti ################################################################################## -#Main code execution section +# Main code execution section -with open('Experiment_Definition.yaml') as file: #Load experiment definition - try: - experiment_definition = yaml.safe_load(file) - except yaml.YAMLError as exception: #yaml file not valid - print('The Experiment Definition file is not valid') - print(exception) +with open("Experiment_Definition.yaml") as file: # Load experiment definition + try: + experiment_definition = yaml.safe_load(file) + except yaml.YAMLError as exception: # yaml file not valid + print("The Experiment Definition file is not valid") + print(exception) -with open('./Config_Files/CBRAIN_IDs.yaml') as file: #Load mappings for all CBRAIN DP_IDs and toolconfig IDs - try: - cbrain_ids = yaml.safe_load(file) - except yaml.YAMLError as exception: #yaml file not valid - print('The configuration file is not valid') - print(exception) +with open( + "./Config_Files/CBRAIN_IDs.yaml" +) as file: # Load mappings for all CBRAIN DP_IDs and toolconfig IDs + try: + cbrain_ids = yaml.safe_load(file) + except yaml.YAMLError as exception: # yaml file not valid + print("The configuration file is not valid") + print(exception) print("Using artifacts from : " + latest_artifacts_url) @@ -85,5 +114,3 @@ def main(cbrain_token, CCI_token, experiment_definition, cbrain_ids, latest_arti cbrain_logout(cbrain_token) ################################################################################## - - diff --git a/analysesVisualizations.py b/analysesVisualizations.py index 2caf4ec..1884951 100644 --- a/analysesVisualizations.py +++ b/analysesVisualizations.py @@ -7,106 +7,125 @@ from numpy.polynomial.polynomial import polyfit import yaml -from cbrainAPI import ( - cbrain_login, - cbrain_logout, - cbrain_download_file -) 
+from cbrainAPI import cbrain_login, cbrain_logout, cbrain_download_file ########################################################################################################################### -#General functions +# General functions + def boxplot(volume_list, pipeline_name, dataset_name): """Generates a simple boxplot, not used for now.""" data = np.array(volume_list).astype(np.float) fig1, ax1 = plt.subplots() - ax1.set_title('Left Hippocampal Volumes (mm3)') + ax1.set_title("Left Hippocampal Volumes (mm3)") ax1.boxplot(data) - plt.xticks([1], [dataset_name + '/' + pipeline_name]) - plt.savefig('./artifacts/' + dataset_name + '_' + pipeline_name + '_box' + '.png') # Saves in artifact directory - #plt.show() - + plt.xticks([1], [dataset_name + "/" + pipeline_name]) + plt.savefig( + "./artifacts/" + dataset_name + "_" + pipeline_name + "_box" + ".png" + ) # Saves in artifact directory + # plt.show() + + def corrplot(volume_list, hearing_loss_list, pipeline_name, dataset_name): """Scatter plot and line of best fit.""" new_hl_list = [] new_vol_list = [] index = 0 for elem in hearing_loss_list: - if elem != 'NA': #Append to new list if value is not NA + if elem != "NA": # Append to new list if value is not NA new_hl_list.append(hearing_loss_list[index]) new_vol_list.append(volume_list[index]) index = index + 1 - + x = np.array(new_hl_list).astype(np.float) y = np.array(new_vol_list).astype(np.float) b, m = polyfit(x, y, 1) - plt.plot(x, y, '.') - plt.plot(x, b + m * x, '-') + plt.plot(x, y, ".") + plt.plot(x, b + m * x, "-") plt.ylim(ymin=0) - plt.title('Left Hippocampal Volumes vs Worse_ear_dsi' + '\n' + dataset_name + ' with ' + pipeline_name) - plt.xlabel('Worse_ear_dsi') - plt.ylabel('Hippocampal Volume (mm3)') - plt.savefig('./artifacts/' + dataset_name + '_' + pipeline_name + '_corr' + '.png') # Saves in artifact directory - plt.close() #so we have separate figures and not overlaid. - #plt.show() - + plt.title( + "Left Hippocampal Volumes vs Worse_ear_dsi" + + "\n" + + dataset_name + + " with " + + pipeline_name + ) + plt.xlabel("Worse_ear_dsi") + plt.ylabel("Hippocampal Volume (mm3)") + plt.savefig( + "./artifacts/" + dataset_name + "_" + pipeline_name + "_corr" + ".png" + ) # Saves in artifact directory + plt.close() # so we have separate figures and not overlaid. 
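    # Illustrative note: pyplot keeps drawing into the current figure, so without
    # the close() above a second corrplot call would overlay the previous plot;
    # closing resets the state before the next dataset/pipeline is drawn.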
+ # plt.show() + + #################################################################################################################### -#Prevent-AD and hearing loss +# Prevent-AD and hearing loss + def preventAD_get_labels_from_filename(filename): - subject = filename[4:11] - visit = filename[16:23] - return (subject, visit) + subject = filename[4:11] + visit = filename[16:23] + return (subject, visit) def preventAD_get_measure_from_csv(subject, visit, data_file): - with open(data_file, 'r') as read_obj: - csv_reader = csv.reader(read_obj) - for row in csv_reader: - if row[1] == subject and row[2]==visit: - return row[19] #change this to get a different column in CSV + with open(data_file, "r") as read_obj: + csv_reader = csv.reader(read_obj) + for row in csv_reader: + if row[1] == subject and row[2] == visit: + return row[19] # change this to get a different column in CSV -#Process the cache results for a single pipeline +# Process the cache results for a single pipeline def preventAD_process(data_file, cache_file, pipeline_name): - - hearing_loss_list = [] - volume_list = [] - with open(cache_file, "r") as file: - cache = json.load(file) - for entry in cache: - - if cache[entry][pipeline_name]['Result']['result'] != None: - - volume = cache[entry][pipeline_name]['Result']['result'] - - if volume != 1: #If there is more than one word in the result string - necessary for FSL, but maybe not for other pipelines in future. - volume = volume.partition(' ')[0] #Get the first word - subject, visit = preventAD_get_labels_from_filename(entry) - - try: - hearing_loss = preventAD_get_measure_from_csv(subject, visit, data_file) - except Exception as e: - print("Error getting CSV file measures for Prevent-AD.") - return #skips the plotting - - if hearing_loss != None: #only visualize if we have a hearing loss measure for subject/visit - hearing_loss_list.append(hearing_loss) - volume_list.append(volume) - - if len(volume_list) >= 1 and len(hearing_loss_list)>=1: #If there is at least one data point. - corrplot(volume_list, hearing_loss_list, pipeline_name, 'Prevent-AD') - #boxplot(volume_list, pipeline_name, 'Prevent-AD') - print('Generated plots for ' + cache_file + '/' + pipeline_name) - -preventAD_data_file = 'Auditory_processing_Registered_PREVENTAD.csv' -preventAD_cache_file = 'Prevent-AD.json' + + hearing_loss_list = [] + volume_list = [] + with open(cache_file, "r") as file: + cache = json.load(file) + for entry in cache: + + if cache[entry][pipeline_name]["Result"]["result"] != None: + + volume = cache[entry][pipeline_name]["Result"]["result"] + + if ( + volume != 1 + ): # If there is more than one word in the result string - necessary for FSL, but maybe not for other pipelines in future. + volume = volume.partition(" ")[0] # Get the first word + subject, visit = preventAD_get_labels_from_filename(entry) + + try: + hearing_loss = preventAD_get_measure_from_csv( + subject, visit, data_file + ) + except Exception as e: + print("Error getting CSV file measures for Prevent-AD.") + return # skips the plotting + + if ( + hearing_loss != None + ): # only visualize if we have a hearing loss measure for subject/visit + hearing_loss_list.append(hearing_loss) + volume_list.append(volume) + + if ( + len(volume_list) >= 1 and len(hearing_loss_list) >= 1 + ): # If there is at least one data point. 
+ corrplot(volume_list, hearing_loss_list, pipeline_name, "Prevent-AD") + # boxplot(volume_list, pipeline_name, 'Prevent-AD') + print("Generated plots for " + cache_file + "/" + pipeline_name) + + +preventAD_data_file = "Auditory_processing_Registered_PREVENTAD.csv" +preventAD_cache_file = "Prevent-AD.json" ######################################################################################################### -#Compass-ND +# Compass-ND ######################################################################################################### -#UK-BioBank +# UK-BioBank ######################################################################################################### # Main section of Analyses @@ -115,17 +134,19 @@ def preventAD_process(data_file, cache_file, pipeline_name): cbrain_password = sys.argv[2] cbrain_token = cbrain_login(cbrain_user, cbrain_password) -#cbrain_download_DP_file('Auditory_processing_Registered_PREVENTAD.csv', 318, cbrain_token) #use this if you know the file name but not the ID. Takes a long time though. -cbrain_download_file(3497558, preventAD_data_file, cbrain_token) #use this (quicker) if you know the CBRAIN userfileID - -with open('Experiment_Definition.yaml') as file: #Load experiment definition - try: - experiment_definition = yaml.safe_load(file) - except yaml.YAMLError as exception: #yaml file not valid - print('The Experiment Definition file is not valid') - print(exception) - - for pipeline in experiment_definition['Pipelines']: - preventAD_process(preventAD_data_file, preventAD_cache_file, pipeline) +# cbrain_download_DP_file('Auditory_processing_Registered_PREVENTAD.csv', 318, cbrain_token) #use this if you know the file name but not the ID. Takes a long time though. +cbrain_download_file( + 3497558, preventAD_data_file, cbrain_token +) # use this (quicker) if you know the CBRAIN userfileID + +with open("Experiment_Definition.yaml") as file: # Load experiment definition + try: + experiment_definition = yaml.safe_load(file) + except yaml.YAMLError as exception: # yaml file not valid + print("The Experiment Definition file is not valid") + print(exception) + + for pipeline in experiment_definition["Pipelines"]: + preventAD_process(preventAD_data_file, preventAD_cache_file, pipeline) cbrain_logout(cbrain_token) diff --git a/cacheOps.py b/cacheOps.py index 5519ab2..ace5053 100644 --- a/cacheOps.py +++ b/cacheOps.py @@ -5,332 +5,527 @@ import requests from cbrainAPI import ( - cbrain_list_data_provider, - cbrain_post_task, - cbrain_get_task_info_from_list, - cbrain_download_text, - cbrain_sync_file + cbrain_list_data_provider, + cbrain_post_task, + cbrain_get_task_info_from_list, + cbrain_download_text, + cbrain_sync_file, ) ############################################# -def download_cache(cache_file, CCI_token, latest_artifacts_url): - """Downloads newest cache file to json, or if it's not found in the circleCI artifacts, creates a new cache file.""" - - headers = {'Circle-Token': CCI_token} - response = requests.get(str(latest_artifacts_url), headers=headers) #finds the link to the cache file amongst all the artifacts - #example URL for this repo: https://circleci.com/api/v1.1/project/github/jacobsanz97/NDR-CI/latest/artifacts - - link_to_cache = "http://" - if response.status_code == 200: - literal_list = literal_eval(response.text) #convert text to dictionary so we can browse it - for file in literal_list: - if cache_file in file['url']: - link_to_cache = file['url'] - else: - print("Error loading CircleCI artifacts") - print(response.text) - - try: 
- response = requests.get(link_to_cache, headers=headers) #download the cache file to json - except: - json_cache = json.loads("{}") #Cache file couldn't be loaded, so we create an empty json - print("Cache file not found...Creating a new one.") - else: - json_cache = json.loads(response.text) - - with open(cache_file, 'w') as outfile: #create cache file for CI - json.dump(json_cache, outfile) - print('written cache to temp file') - - -def generate_cache_subject(nifti_file, cbrain_userfile_ID, pipeline, experiment_definition): - """Creates a template for a cache entry (cbrain data provider file), for a specific pipeline. Provides a userfile ID as a starting point for task computations.""" - - data = { nifti_file: { - pipeline: {}}} - - result = {"result": None, "isUsed": None} - - component_number = 0 #Keeps track of the order of the component (we need to flag the first one) - for pipeline_component in experiment_definition['Pipelines'][pipeline]['Components']: - - if component_number == 0: - component_record = { - "inputID": cbrain_userfile_ID, #only do this for first component - "toolConfigID": None, - "taskID": None, - "status": None, - "outputID": None, - "isUsed": None - } - else: - component_record = { - "inputID": None, - "toolConfigID": None, - "taskID": None, - "status": None, - "outputID": None, - "isUsed": None - } - - data[nifti_file][pipeline][pipeline_component] = component_record #add this component to the cache - component_number = component_number + 1 - - data[nifti_file][pipeline]['Result'] = result #add the results section after all the component sections - return data - - -def populate_cache_filenames(cache_file, cbrain_token, blocklist, pipeline, data_provider_id, experiment_definition): - """Generates the template for every file in a cache, for a specific pipeline.""" - - - filelist = [] - data_provider_browse = cbrain_list_data_provider(str(data_provider_id), cbrain_token) #Query CBRAIN to list all files in data provider. - - try: - for entry in data_provider_browse: - if 'userfile_id' in entry: #if it's a registered file, add to filelist. - filelist.append([entry['name'], entry['userfile_id']]) - except Exception as e: - print("Error in browsing data provider, will continue using the filelist from the previous CI run") - return #skips the function without crashing - - with open(cache_file, "r+") as file: - data = json.load(file) - for entry in filelist: - - if entry[0] not in data and entry[0] not in blocklist: #if entry[name] is not in cache AND is not in the blocklist...add to cache - leaf = generate_cache_subject(entry[0], entry[1], pipeline, experiment_definition) - data.update(leaf) - - if entry[0] not in blocklist and pipeline not in data[entry[0]]: #if already in cache, just add entry for new pipeline. 
- leaf = generate_cache_subject(entry[0], entry[1], pipeline, experiment_definition) - data[entry[0]][pipeline] = leaf[entry[0]][pipeline] - - file.seek(0) # rewind - json.dump(data, file, indent=2) - file.truncate() - return data +def download_cache(cache_file, CCI_token, latest_artifacts_url): + """Downloads newest cache file to json, or if it's not found in the circleCI artifacts, creates a new cache file.""" + + headers = {"Circle-Token": CCI_token} + response = requests.get( + str(latest_artifacts_url), headers=headers + ) # finds the link to the cache file amongst all the artifacts + # example URL for this repo: https://circleci.com/api/v1.1/project/github/jacobsanz97/NDR-CI/latest/artifacts + + link_to_cache = "http://" + if response.status_code == 200: + literal_list = literal_eval( + response.text + ) # convert text to dictionary so we can browse it + for file in literal_list: + if cache_file in file["url"]: + link_to_cache = file["url"] + else: + print("Error loading CircleCI artifacts") + print(response.text) + + try: + response = requests.get( + link_to_cache, headers=headers + ) # download the cache file to json + except: + json_cache = json.loads( + "{}" + ) # Cache file couldn't be loaded, so we create an empty json + print("Cache file not found...Creating a new one.") + else: + json_cache = json.loads(response.text) + + with open(cache_file, "w") as outfile: # create cache file for CI + json.dump(json_cache, outfile) + print("written cache to temp file") + + +def generate_cache_subject( + nifti_file, cbrain_userfile_ID, pipeline, experiment_definition +): + """Creates a template for a cache entry (cbrain data provider file), for a specific pipeline. Provides a userfile ID as a starting point for task computations.""" + + data = {nifti_file: {pipeline: {}}} + + result = {"result": None, "isUsed": None} + + component_number = ( + 0 # Keeps track of the order of the component (we need to flag the first one) + ) + for pipeline_component in experiment_definition["Pipelines"][pipeline][ + "Components" + ]: + + if component_number == 0: + component_record = { + "inputID": cbrain_userfile_ID, # only do this for first component + "toolConfigID": None, + "taskID": None, + "status": None, + "outputID": None, + "isUsed": None, + } + else: + component_record = { + "inputID": None, + "toolConfigID": None, + "taskID": None, + "status": None, + "outputID": None, + "isUsed": None, + } + + data[nifti_file][pipeline][ + pipeline_component + ] = component_record # add this component to the cache + component_number = component_number + 1 + + data[nifti_file][pipeline][ + "Result" + ] = result # add the results section after all the component sections + return data + + +def populate_cache_filenames( + cache_file, + cbrain_token, + blocklist, + pipeline, + data_provider_id, + experiment_definition, +): + """Generates the template for every file in a cache, for a specific pipeline.""" + + filelist = [] + data_provider_browse = cbrain_list_data_provider( + str(data_provider_id), cbrain_token + ) # Query CBRAIN to list all files in data provider. + + try: + for entry in data_provider_browse: + if "userfile_id" in entry: # if it's a registered file, add to filelist. 
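                # Illustrative only: a registered browse entry is assumed to look
                # roughly like {"name": "sub-0001_T1w.nii.gz", "userfile_id": 1234567};
                # entries never registered in CBRAIN come back without a
                # "userfile_id" key and are left out of the filelist.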
+ filelist.append([entry["name"], entry["userfile_id"]]) + except Exception as e: + print( + "Error in browsing data provider, will continue using the filelist from the previous CI run" + ) + return # skips the function without crashing + + with open(cache_file, "r+") as file: + data = json.load(file) + for entry in filelist: + + if ( + entry[0] not in data and entry[0] not in blocklist + ): # if entry[name] is not in cache AND is not in the blocklist...add to cache + leaf = generate_cache_subject( + entry[0], entry[1], pipeline, experiment_definition + ) + data.update(leaf) + + if ( + entry[0] not in blocklist and pipeline not in data[entry[0]] + ): # if already in cache, just add entry for new pipeline. + leaf = generate_cache_subject( + entry[0], entry[1], pipeline, experiment_definition + ) + data[entry[0]][pipeline] = leaf[entry[0]][pipeline] + + file.seek(0) # rewind + json.dump(data, file, indent=2) + file.truncate() + return data def update_statuses(cache_filename, task_list): - """Updates a cache file with the newest task statuses from CBRAIN.""" - - - with open(cache_filename, "r+") as cache_file: - data = json.load(cache_file) - for (file, pipeline) in data.items(): #Parse the json - for (pipeline_name, task_name) in pipeline.items(): - for (task_name_str, params) in task_name.items(): - - if task_name_str != "Result" and params["taskID"] != None and params["status"] != "Completed": #If this is a task (not a result) with an existent ID on CBRAIN, and hasn't yet run to completion - - try: - - jayson = cbrain_get_task_info_from_list(task_list, params["taskID"]) - - if jayson['status'] == "Completed": - #Task completed, update status and get output file ID - data[file][pipeline_name][task_name_str]["status"] = jayson["status"] - #differentiate between one and many outputs - if '_cbrain_output_outputs' in jayson['params']: - data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['_cbrain_output_outputs'][0] - if '_cbrain_output_output' in jayson['params']: - data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['_cbrain_output_output'][0] - if '_cbrain_output_outfile' in jayson['params']: - data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['_cbrain_output_outfile'][0] - if 'outfile_id' in jayson['params']: - data[file][pipeline_name][task_name_str]["outputID"] = jayson['params']['outfile_id'] - - else: - #Task not completed, just update status - data[file][pipeline_name][task_name_str]["status"] = jayson["status"] - - except Exception as e: - pass - - - cache_file.seek(0) - json.dump(data, cache_file, indent=2) - cache_file.truncate() - - -def pipeline_manager(cbrain_token, experiment_definition, cbrain_ids, pipeline, dataset): - """Iterates over each component in a pipeline, organizes, and feeds the necessary data to the functions which post tasks on CBRAIN and update the caches.""" - - component_number = 0 #Keeps track of the order of the component (we need to flag the first one) - - for pipeline_component in experiment_definition['Pipelines'][pipeline]['Components']: - - with open(experiment_definition['Pipelines'][pipeline]['Components'][pipeline_component]['Parameter_dictionary'], "r+") as param_file: #Load parameters for current pipeline component - parameter_dictionary = json.load(param_file) - - if component_number == 0: - first_task_handler(cbrain_token, parameter_dictionary, cbrain_ids['Tool_Config_IDs'][pipeline_component], dataset + '.json', pipeline_component, pipeline) - else: - nth_task_handler(cbrain_token, 
parameter_dictionary, cbrain_ids['Tool_Config_IDs'][pipeline_component], dataset + '.json', pipeline_component, previous_pipeline_component, pipeline) - - - if len(experiment_definition['Resubmit_tasks']['taskIDs']) > 0: #if there are any tasks to resubmit... - task_resubmission_handler(cbrain_token, parameter_dictionary, cbrain_ids['Tool_Config_IDs'][pipeline_component], dataset + '.json', pipeline_component, pipeline, experiment_definition['Resubmit_tasks']['taskIDs']) - - previous_pipeline_component = pipeline_component - component_number = component_number + 1 - - -def first_task_handler(cbrain_token, parameter_dictionary, tool_config_id, cache_file, pipeline_component, pipeline_name): - """Handles the cache writing for the first task in a pipeline, and calls to post the task to CBRAIN.""" - - with open(cache_file, "r+") as file: - data = json.load(file) - for filename in data: - if data[filename][pipeline_name][pipeline_component]['isUsed'] == None: - - try: - - userfile_id = data[filename][pipeline_name][pipeline_component]['inputID'] - jayson = cbrain_post_task(cbrain_token, userfile_id, tool_config_id, parameter_dictionary) - data[filename][pipeline_name][pipeline_component]['toolConfigID'] = jayson[0]['tool_config_id'] - data[filename][pipeline_name][pipeline_component]['taskID'] = jayson[0]["id"] - data[filename][pipeline_name][pipeline_component]['status'] = jayson[0]["status"] - data[filename][pipeline_name][pipeline_component]['isUsed'] = True - - except Exception as e: - pass - - file.seek(0) # rewind - json.dump(data, file, indent=2) - file.truncate() - - -def nth_task_handler(cbrain_token, parameter_dictionary, tool_config_id, cache_file, pipeline_component, previous_pipeline_component, pipeline_name): - """Handles the cache writing and task posting for any pipeline component except the first task.""" - - with open(cache_file, "r+") as file: - data = json.load(file) - for filename in data: - if data[filename][pipeline_name][pipeline_component]['isUsed'] == None and data[filename][pipeline_name][previous_pipeline_component]['status'] == "Completed": - - try: - - userfile_id = data[filename][pipeline_name][previous_pipeline_component]['outputID'] #output of last task - jayson = cbrain_post_task(cbrain_token, userfile_id, tool_config_id, parameter_dictionary) - data[filename][pipeline_name][pipeline_component]['inputID'] = userfile_id - data[filename][pipeline_name][pipeline_component]['toolConfigID'] = jayson[0]['tool_config_id'] - data[filename][pipeline_name][pipeline_component]['taskID'] = jayson[0]["id"] - data[filename][pipeline_name][pipeline_component]['status'] = jayson[0]["status"] - data[filename][pipeline_name][pipeline_component]['isUsed'] = True - - except Exception as e: - pass - - file.seek(0) # rewind - json.dump(data, file, indent=2) - file.truncate() - -def task_resubmission_handler(cbrain_token, parameter_dictionary, tool_config_id, cache_file, pipeline_component, pipeline_name, rerun_ID_list): - """Resubmits a task, and sets all subsequent pipeline component dependencies to null in the cache.""" - - with open(cache_file, "r+") as file: - data = json.load(file) - for filename in data: - - if 'taskID' in data[filename][pipeline_name][pipeline_component]: - - if data[filename][pipeline_name][pipeline_component]['taskID'] in rerun_ID_list: - - try: - userfile_id = data[filename][pipeline_name][pipeline_component]['inputID'] - jayson = cbrain_post_task(cbrain_token, userfile_id, tool_config_id, parameter_dictionary) - 
data[filename][pipeline_name][pipeline_component]['toolConfigID'] = jayson[0]['tool_config_id'] - data[filename][pipeline_name][pipeline_component]['taskID'] = jayson[0]["id"] - data[filename][pipeline_name][pipeline_component]['status'] = jayson[0]["status"] - data[filename][pipeline_name][pipeline_component]['isUsed'] = True - print("Reposting " + str(data[filename][pipeline_name][pipeline_component]['taskID'])) - - except Exception as e: - pass - - #The code section sets all the subsequent pipeline components following the reposted task to null - pipeline_length = len(data[filename][pipeline_name].items()) #total number of components in pipeline - curr_index = list(data[filename][pipeline_name].keys()).index(pipeline_component) #index of current (reposted) component - - index_counter = 0 - for component in data[filename][pipeline_name].items(): - - #if we are on a component after the one being submitted, and not the result - if index_counter > curr_index and index_counter < pipeline_length-1: - data[filename][pipeline_name][component[0]]['inputID'] = None - data[filename][pipeline_name][component[0]]['toolConfigID'] = None - data[filename][pipeline_name][component[0]]['taskID'] = None - data[filename][pipeline_name][component[0]]['status'] = None - data[filename][pipeline_name][component[0]]['outputID'] = None - data[filename][pipeline_name][component[0]]['isUsed'] = None - - #if we are on the result component of the pipeline - if index_counter > curr_index and index_counter == pipeline_length-1: - data[filename][pipeline_name][component[0]]['result'] = None - data[filename][pipeline_name][component[0]]['isUsed'] = None - - index_counter += 1 - - file.seek(0) # rewind - json.dump(data, file, indent=2) - file.truncate() + """Updates a cache file with the newest task statuses from CBRAIN.""" + + with open(cache_filename, "r+") as cache_file: + data = json.load(cache_file) + for (file, pipeline) in data.items(): # Parse the json + for (pipeline_name, task_name) in pipeline.items(): + for (task_name_str, params) in task_name.items(): + + if ( + task_name_str != "Result" + and params["taskID"] != None + and params["status"] != "Completed" + ): # If this is a task (not a result) with an existent ID on CBRAIN, and hasn't yet run to completion + + try: + + jayson = cbrain_get_task_info_from_list( + task_list, params["taskID"] + ) + + if jayson["status"] == "Completed": + # Task completed, update status and get output file ID + data[file][pipeline_name][task_name_str][ + "status" + ] = jayson["status"] + # differentiate between one and many outputs + if "_cbrain_output_outputs" in jayson["params"]: + data[file][pipeline_name][task_name_str][ + "outputID" + ] = jayson["params"]["_cbrain_output_outputs"][0] + if "_cbrain_output_output" in jayson["params"]: + data[file][pipeline_name][task_name_str][ + "outputID" + ] = jayson["params"]["_cbrain_output_output"][0] + if "_cbrain_output_outfile" in jayson["params"]: + data[file][pipeline_name][task_name_str][ + "outputID" + ] = jayson["params"]["_cbrain_output_outfile"][0] + if "outfile_id" in jayson["params"]: + data[file][pipeline_name][task_name_str][ + "outputID" + ] = jayson["params"]["outfile_id"] + + else: + # Task not completed, just update status + data[file][pipeline_name][task_name_str][ + "status" + ] = jayson["status"] + + except Exception as e: + pass + + cache_file.seek(0) + json.dump(data, cache_file, indent=2) + cache_file.truncate() + + +def pipeline_manager( + cbrain_token, experiment_definition, cbrain_ids, pipeline, dataset +): + 
"""Iterates over each component in a pipeline, organizes, and feeds the necessary data to the functions which post tasks on CBRAIN and update the caches.""" + + component_number = ( + 0 # Keeps track of the order of the component (we need to flag the first one) + ) + + for pipeline_component in experiment_definition["Pipelines"][pipeline][ + "Components" + ]: + + with open( + experiment_definition["Pipelines"][pipeline]["Components"][ + pipeline_component + ]["Parameter_dictionary"], + "r+", + ) as param_file: # Load parameters for current pipeline component + parameter_dictionary = json.load(param_file) + + if component_number == 0: + first_task_handler( + cbrain_token, + parameter_dictionary, + cbrain_ids["Tool_Config_IDs"][pipeline_component], + dataset + ".json", + pipeline_component, + pipeline, + ) + else: + nth_task_handler( + cbrain_token, + parameter_dictionary, + cbrain_ids["Tool_Config_IDs"][pipeline_component], + dataset + ".json", + pipeline_component, + previous_pipeline_component, + pipeline, + ) + + if ( + len(experiment_definition["Resubmit_tasks"]["taskIDs"]) > 0 + ): # if there are any tasks to resubmit... + task_resubmission_handler( + cbrain_token, + parameter_dictionary, + cbrain_ids["Tool_Config_IDs"][pipeline_component], + dataset + ".json", + pipeline_component, + pipeline, + experiment_definition["Resubmit_tasks"]["taskIDs"], + ) + + previous_pipeline_component = pipeline_component + component_number = component_number + 1 + + +def first_task_handler( + cbrain_token, + parameter_dictionary, + tool_config_id, + cache_file, + pipeline_component, + pipeline_name, +): + """Handles the cache writing for the first task in a pipeline, and calls to post the task to CBRAIN.""" + + with open(cache_file, "r+") as file: + data = json.load(file) + for filename in data: + if data[filename][pipeline_name][pipeline_component]["isUsed"] == None: + + try: + + userfile_id = data[filename][pipeline_name][pipeline_component][ + "inputID" + ] + jayson = cbrain_post_task( + cbrain_token, userfile_id, tool_config_id, parameter_dictionary + ) + data[filename][pipeline_name][pipeline_component][ + "toolConfigID" + ] = jayson[0]["tool_config_id"] + data[filename][pipeline_name][pipeline_component][ + "taskID" + ] = jayson[0]["id"] + data[filename][pipeline_name][pipeline_component][ + "status" + ] = jayson[0]["status"] + data[filename][pipeline_name][pipeline_component]["isUsed"] = True + + except Exception as e: + pass + + file.seek(0) # rewind + json.dump(data, file, indent=2) + file.truncate() + + +def nth_task_handler( + cbrain_token, + parameter_dictionary, + tool_config_id, + cache_file, + pipeline_component, + previous_pipeline_component, + pipeline_name, +): + """Handles the cache writing and task posting for any pipeline component except the first task.""" + + with open(cache_file, "r+") as file: + data = json.load(file) + for filename in data: + if ( + data[filename][pipeline_name][pipeline_component]["isUsed"] == None + and data[filename][pipeline_name][previous_pipeline_component]["status"] + == "Completed" + ): + + try: + + userfile_id = data[filename][pipeline_name][ + previous_pipeline_component + ][ + "outputID" + ] # output of last task + jayson = cbrain_post_task( + cbrain_token, userfile_id, tool_config_id, parameter_dictionary + ) + data[filename][pipeline_name][pipeline_component][ + "inputID" + ] = userfile_id + data[filename][pipeline_name][pipeline_component][ + "toolConfigID" + ] = jayson[0]["tool_config_id"] + data[filename][pipeline_name][pipeline_component][ + 
"taskID" + ] = jayson[0]["id"] + data[filename][pipeline_name][pipeline_component][ + "status" + ] = jayson[0]["status"] + data[filename][pipeline_name][pipeline_component]["isUsed"] = True + + except Exception as e: + pass + + file.seek(0) # rewind + json.dump(data, file, indent=2) + file.truncate() + + +def task_resubmission_handler( + cbrain_token, + parameter_dictionary, + tool_config_id, + cache_file, + pipeline_component, + pipeline_name, + rerun_ID_list, +): + """Resubmits a task, and sets all subsequent pipeline component dependencies to null in the cache.""" + + with open(cache_file, "r+") as file: + data = json.load(file) + for filename in data: + + if "taskID" in data[filename][pipeline_name][pipeline_component]: + + if ( + data[filename][pipeline_name][pipeline_component]["taskID"] + in rerun_ID_list + ): + + try: + userfile_id = data[filename][pipeline_name][pipeline_component][ + "inputID" + ] + jayson = cbrain_post_task( + cbrain_token, + userfile_id, + tool_config_id, + parameter_dictionary, + ) + data[filename][pipeline_name][pipeline_component][ + "toolConfigID" + ] = jayson[0]["tool_config_id"] + data[filename][pipeline_name][pipeline_component][ + "taskID" + ] = jayson[0]["id"] + data[filename][pipeline_name][pipeline_component][ + "status" + ] = jayson[0]["status"] + data[filename][pipeline_name][pipeline_component][ + "isUsed" + ] = True + print( + "Reposting " + + str( + data[filename][pipeline_name][pipeline_component][ + "taskID" + ] + ) + ) + + except Exception as e: + pass + + # The code section sets all the subsequent pipeline components following the reposted task to null + pipeline_length = len( + data[filename][pipeline_name].items() + ) # total number of components in pipeline + curr_index = list(data[filename][pipeline_name].keys()).index( + pipeline_component + ) # index of current (reposted) component + + index_counter = 0 + for component in data[filename][pipeline_name].items(): + + # if we are on a component after the one being submitted, and not the result + if ( + index_counter > curr_index + and index_counter < pipeline_length - 1 + ): + data[filename][pipeline_name][component[0]][ + "inputID" + ] = None + data[filename][pipeline_name][component[0]][ + "toolConfigID" + ] = None + data[filename][pipeline_name][component[0]]["taskID"] = None + data[filename][pipeline_name][component[0]]["status"] = None + data[filename][pipeline_name][component[0]][ + "outputID" + ] = None + data[filename][pipeline_name][component[0]]["isUsed"] = None + + # if we are on the result component of the pipeline + if ( + index_counter > curr_index + and index_counter == pipeline_length - 1 + ): + data[filename][pipeline_name][component[0]]["result"] = None + data[filename][pipeline_name][component[0]]["isUsed"] = None + + index_counter += 1 + + file.seek(0) # rewind + json.dump(data, file, indent=2) + file.truncate() def populate_results(cache_filename, cbrain_token): - """Fetches the text from a file on CBRAIN and writes it to the cache. + """Fetches the text from a file on CBRAIN and writes it to the cache. Originally this is designed to extract a hippocampal volume from an FSL Stats text output. 
""" - - with open(cache_filename, "r+") as cache_file: - data = json.load(cache_file) - for (file, pipeline) in data.items(): - for (pipeline_name, pipeline_component) in pipeline.items(): - previous_string = None - for (pipeline_component_str, params) in pipeline_component.items(): - - if pipeline_component_str == "Result": #Find the task before the result in the json - - if data[file][pipeline_name]['Result']['isUsed'] == None and data[file][pipeline_name][previous_string]['status'] == "Completed": - - fileID = data[file][pipeline_name][previous_string]['outputID'] - print("Streaming text for fileID: " + str(fileID)) - cbrain_sync_file(str(fileID), cbrain_token) - - try: - - #Note that result population is hardcoded, as the pipelines all produce different outputs that need different parsing procedures. - if pipeline_name == "FSL": - vol_string = cbrain_download_text(fileID, cbrain_token) - vol = vol_string.split()[0] #get first word - - if pipeline_name == "FreeSurfer": - asegstats_string = cbrain_download_text(fileID, cbrain_token) - vol = retrieve_FreeSurfer_volume(asegstats_string, "Left-Hippocampus") - - data[file][pipeline_name]['Result']['result'] = vol - data[file][pipeline_name]['Result']['isUsed'] = True - - except Exception as e: - pass - - previous_string = pipeline_component_str - - cache_file.seek(0) # rewind - json.dump(data, cache_file, indent=2) - cache_file.truncate() + + with open(cache_filename, "r+") as cache_file: + data = json.load(cache_file) + for (file, pipeline) in data.items(): + for (pipeline_name, pipeline_component) in pipeline.items(): + previous_string = None + for (pipeline_component_str, params) in pipeline_component.items(): + + if ( + pipeline_component_str == "Result" + ): # Find the task before the result in the json + + if ( + data[file][pipeline_name]["Result"]["isUsed"] == None + and data[file][pipeline_name][previous_string]["status"] + == "Completed" + ): + + fileID = data[file][pipeline_name][previous_string][ + "outputID" + ] + print("Streaming text for fileID: " + str(fileID)) + cbrain_sync_file(str(fileID), cbrain_token) + + try: + + # Note that result population is hardcoded, as the pipelines all produce different outputs that need different parsing procedures. + if pipeline_name == "FSL": + vol_string = cbrain_download_text( + fileID, cbrain_token + ) + vol = vol_string.split()[0] # get first word + + if pipeline_name == "FreeSurfer": + asegstats_string = cbrain_download_text( + fileID, cbrain_token + ) + vol = retrieve_FreeSurfer_volume( + asegstats_string, "Left-Hippocampus" + ) + + data[file][pipeline_name]["Result"]["result"] = vol + data[file][pipeline_name]["Result"]["isUsed"] = True + + except Exception as e: + pass + + previous_string = pipeline_component_str + + cache_file.seek(0) # rewind + json.dump(data, cache_file, indent=2) + cache_file.truncate() def retrieve_FreeSurfer_volume(asegstats_string, structName): -#Take as input the aseg.stats file from the freesurfer output as a string, and the StructName field. - lines = asegstats_string.splitlines() - reader = csv.reader(lines, delimiter=" ") - for row in reader: - if structName in row: - index = row.index(structName) - return row[index-2] #Returns the word which is two before the name of the structure. + # Take as input the aseg.stats file from the freesurfer output as a string, and the StructName field. 
+ lines = asegstats_string.splitlines() + reader = csv.reader(lines, delimiter=" ") + for row in reader: + if structName in row: + index = row.index(structName) + return row[ + index - 2 + ] # Returns the word which is two before the name of the structure. diff --git a/cbrainAPI.py b/cbrainAPI.py index 99ce19e..78691f0 100644 --- a/cbrainAPI.py +++ b/cbrainAPI.py @@ -4,259 +4,270 @@ ################################################################################## + def cbrain_login(username, password): - """Posts API call to CBRAIN to obtain a authentication token given a username and password.""" - - headers = { - 'Content-Type': 'application/x-www-form-urlencoded', - 'Accept': 'application/json', - } - data = { - 'login': username, - 'password': password - } - - response = requests.post('https://portal.cbrain.mcgill.ca/session', headers=headers, data=data) - - if response.status_code == 200: - print("Login success") - print(response.content) - jsonResponse = response.json() - return jsonResponse["cbrain_api_token"] - else: - print("Login failure") - return 1 + """Posts API call to CBRAIN to obtain a authentication token given a username and password.""" + + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", + } + data = {"login": username, "password": password} + + response = requests.post( + "https://portal.cbrain.mcgill.ca/session", headers=headers, data=data + ) + + if response.status_code == 200: + print("Login success") + print(response.content) + jsonResponse = response.json() + return jsonResponse["cbrain_api_token"] + else: + print("Login failure") + return 1 def cbrain_logout(cbrain_token): - """Ends a CBRAIN session.""" - - headers = { - 'Accept': 'application/json', - } - params = ( - ('cbrain_api_token', cbrain_token), - ) - - response = requests.delete('https://portal.cbrain.mcgill.ca/session', headers=headers, params=params) - - if response.status_code == 200: - print("Logout success") - return 0 - else: - print("Logout failure") - return 1 + """Ends a CBRAIN session.""" + + headers = { + "Accept": "application/json", + } + params = (("cbrain_api_token", cbrain_token),) + + response = requests.delete( + "https://portal.cbrain.mcgill.ca/session", headers=headers, params=params + ) + + if response.status_code == 200: + print("Logout success") + return 0 + else: + print("Logout failure") + return 1 def cbrain_list_data_provider(data_provider_ID, cbrain_token): - """Lists all files in a CBRAIN data provider.""" - - data_provider_ID = str(data_provider_ID) - headers = { - 'Accept': 'application/json', - } - params = ( - ('id', data_provider_ID), - ('cbrain_api_token', cbrain_token), - ) - url = 'https://portal.cbrain.mcgill.ca/data_providers/' + data_provider_ID + '/browse' - - response = requests.get(url, headers=headers, params=params, allow_redirects=True) - - if response.status_code == 200: - return response.json() - else: - print('DP browse failure') - return 1 + """Lists all files in a CBRAIN data provider.""" + + data_provider_ID = str(data_provider_ID) + headers = { + "Accept": "application/json", + } + params = ( + ("id", data_provider_ID), + ("cbrain_api_token", cbrain_token), + ) + url = ( + "https://portal.cbrain.mcgill.ca/data_providers/" + data_provider_ID + "/browse" + ) + + response = requests.get(url, headers=headers, params=params, allow_redirects=True) + + if response.status_code == 200: + return response.json() + else: + print("DP browse failure") + return 1 def cbrain_post_task(cbrain_token, userfile_id, 
-	"""Posts a task in CBRAIN."""
-
-	userfile_id = str(userfile_id)
-
-	#Parse the parameter dictionary json, and insert the userfile IDs.
-	parameter_dictionary['interface_userfile_ids'] = [userfile_id]
-	parameter_dictionary['input_file'] = userfile_id
-
-	headers = {
-		'Content-Type': 'application/json',
-		'Accept': 'application/json',
-	}
-	params = (
-		('cbrain_api_token', cbrain_token),
-	)
-	data = {
-		"cbrain_task": {
-			'tool_config_id': tool_config_id,
-			'params': parameter_dictionary,
-			'run_number': None,
-			'results_data_provider_id': 179, #Using Beluga
-			'cluster_workdir_size': None,
-			'workdir_archived': True,
-			'description': ''}
-	}
-
-	y = json.dumps(data) #convert data field to JSON:
-	response = requests.post('https://portal.cbrain.mcgill.ca/tasks', headers=headers, params=params, data=y)
-
-	if response.status_code == 200:
-		print(response.text)
-		jsonResponse = response.json()
-		return jsonResponse
-	else:
-		print("Task posting failed.")
-		print(response.content)
-		return 1
+    """Posts a task in CBRAIN."""
+
+    userfile_id = str(userfile_id)
+
+    # Parse the parameter dictionary json, and insert the userfile IDs.
+    parameter_dictionary["interface_userfile_ids"] = [userfile_id]
+    parameter_dictionary["input_file"] = userfile_id
+
+    headers = {
+        "Content-Type": "application/json",
+        "Accept": "application/json",
+    }
+    params = (("cbrain_api_token", cbrain_token),)
+    data = {
+        "cbrain_task": {
+            "tool_config_id": tool_config_id,
+            "params": parameter_dictionary,
+            "run_number": None,
+            "results_data_provider_id": 179,  # Using Beluga
+            "cluster_workdir_size": None,
+            "workdir_archived": True,
+            "description": "",
+        }
+    }
+
+    y = json.dumps(data)  # convert the data field to a JSON string
+    response = requests.post(
+        "https://portal.cbrain.mcgill.ca/tasks", headers=headers, params=params, data=y
+    )
+
+    if response.status_code == 200:
+        print(response.text)
+        jsonResponse = response.json()
+        return jsonResponse
+    else:
+        print("Task posting failed.")
+        print(response.content)
+        return 1
 
 
 def cbrain_get_all_tasks(cbrain_token):
-	"""Gets the list of all the tasks of the user on CBRAIN."""
-
-	headers = {
-		'Accept': 'application/json',
-	}
-	params = {
-		'cbrain_api_token': cbrain_token,
-		'page': 1,
-		'per_page': 1000
-	}
-	url = 'https://portal.cbrain.mcgill.ca/tasks'
-	task_list = []
-
-	while True:
-
-		response = requests.get(url, headers=headers, params=params)
-
-		if response.status_code == 200:
-			jsonResponse = response.json()
-			task_list += jsonResponse
-			params['page'] += 1
-		else:
-			print("Task list retrieval failed.")
-			return 1
-
-		if len(jsonResponse) < params['per_page']:
-			break
-
-	return task_list
+    """Gets the list of all the tasks of the user on CBRAIN."""
+
+    headers = {
+        "Accept": "application/json",
+    }
+    params = {"cbrain_api_token": cbrain_token, "page": 1, "per_page": 1000}
+    url = "https://portal.cbrain.mcgill.ca/tasks"
+    task_list = []
+
+    while True:
+
+        response = requests.get(url, headers=headers, params=params)
+
+        if response.status_code == 200:
+            jsonResponse = response.json()
+            task_list += jsonResponse
+            params["page"] += 1
+        else:
+            print("Task list retrieval failed.")
+            return 1
+
+        if len(jsonResponse) < params["per_page"]:
+            break
+
+    return task_list
 
 
 def cbrain_get_task_info_from_list(task_list, task_ID):
-	"""Obtains info on the progress of a single task, given the list of all tasks for the user."""
-
-	for task in task_list:
-		if task_ID == task['id'] or int(task_ID) == task['id']:
-			return task
+    """Obtains info on the progress of a single task, given the list of all tasks for the user."""
+
+    for task in task_list:
+        if task_ID == task["id"] or int(task_ID) == task["id"]:
+            return task
 
 
 def cbrain_get_task_info(cbrain_token, task_ID):
-	"""Obtains information on the progress of a single task by querying for a single task."""
-
-	task_ID = str(task_ID)
-	headers = {
-		'Accept': 'application/json',
-	}
-	params = (
-		('id', task_ID),
-		('cbrain_api_token', cbrain_token)
-	)
-	url = 'https://portal.cbrain.mcgill.ca/tasks/' + task_ID
-
-	response = requests.get(url, headers=headers, params=params)
-
-	if response.status_code == 200:
-		jsonResponse = response.json()
-		return jsonResponse
-	else:
-		print("Task Info retrieval failed.")
-		return 1
+    """Obtains information on the progress of a single task by querying for a single task."""
+
+    task_ID = str(task_ID)
+    headers = {
+        "Accept": "application/json",
+    }
+    params = (("id", task_ID), ("cbrain_api_token", cbrain_token))
+    url = "https://portal.cbrain.mcgill.ca/tasks/" + task_ID
+
+    response = requests.get(url, headers=headers, params=params)
+    if response.status_code == 200:
+        jsonResponse = response.json()
+        return jsonResponse
+    else:
+        print("Task Info retrieval failed.")
+        return 1
 
 
 def cbrain_download_text(userfile_ID, cbrain_token):
-	"""Downloads the text from a file on CBRAIN."""
-
-	userfile_ID = str(userfile_ID)
-	headers = {
-		'Accept': 'text',
-	}
-	params = (
-		('cbrain_api_token', cbrain_token),
-	)
-	url = 'https://portal.cbrain.mcgill.ca/userfiles/' + userfile_ID + '/content'
-
-	response = requests.get(url, headers=headers, params=params, allow_redirects=True)
-
-	if response.status_code == 200:
-		return response.text
-	else:
-		print('Download failure')
-		print(response.status_code)
-		return 1
+    """Downloads the text from a file on CBRAIN."""
+
+    userfile_ID = str(userfile_ID)
+    headers = {
+        "Accept": "text",
+    }
+    params = (("cbrain_api_token", cbrain_token),)
+    url = "https://portal.cbrain.mcgill.ca/userfiles/" + userfile_ID + "/content"
+
+    response = requests.get(url, headers=headers, params=params, allow_redirects=True)
+
+    if response.status_code == 200:
+        return response.text
+    else:
+        print("Download failure")
+        print(response.status_code)
+        return 1
 
 
 def cbrain_download_file(userfile_ID, filename, cbrain_token):
     """Downloads a file from CBRAIN and saves it, given a userfile ID."""
 
     fileID = str(userfile_ID)
     headers = {
-        'Accept': 'application/json',
+        "Accept": "application/json",
     }
     params = (
-        ('id', fileID),
-        ('cbrain_api_token', cbrain_token),
+        ("id", fileID),
+        ("cbrain_api_token", cbrain_token),
     )
-    url = 'https://portal.cbrain.mcgill.ca/userfiles/' + fileID + '/content'
-
+    url = "https://portal.cbrain.mcgill.ca/userfiles/" + fileID + "/content"
+
     response = requests.get(url, headers=headers, params=params, allow_redirects=True)
 
     if response.status_code == 200:
-        open(filename, 'wb').write(response.content)
+        with open(filename, "wb") as outfile:  # close the handle once written
+            outfile.write(response.content)
         print("Downloaded file " + filename)
         return 0
     else:
-        print('File download failure: ' + filename)
+        print("File download failure: " + filename)
         return 1
 
 
 def cbrain_download_DP_file(filename, data_provider_id, cbrain_token):
-	"""Given a filename and data provider, download the file from the data provider."""
-	data_provider_browse = cbrain_list_data_provider(str(data_provider_id), cbrain_token) #Query CBRAIN to list all files in data provider.
-	print(data_provider_browse)
-
-	try:
-		for entry in data_provider_browse:
-			if 'userfile_id' in entry and entry['name'] == filename: #if it's a registered file, and filename matches
-				print("Found registered file: " + filename + " in Data Provider with ID " + str(data_provider_id))
-				cbrain_download_file(entry['userfile_id'], filename, cbrain_token)
-				return 0
-			else:
-				print("File " + filename + " not found in Data Provider " + str(data_provider_id))
-				return 1
-
-	except Exception as e:
-		print("Error in browsing data provider or file download")
-		return
+    """Given a filename and data provider, download the file from the data provider."""
+    data_provider_browse = cbrain_list_data_provider(
+        str(data_provider_id), cbrain_token
+    )  # Query CBRAIN to list all files in data provider.
+    print(data_provider_browse)
+
+    try:
+        for entry in data_provider_browse:
+            if (
+                "userfile_id" in entry and entry["name"] == filename
+            ):  # if it's a registered file, and filename matches
+                print(
+                    "Found registered file: "
+                    + filename
+                    + " in Data Provider with ID "
+                    + str(data_provider_id)
+                )
+                cbrain_download_file(entry["userfile_id"], filename, cbrain_token)
+                return 0
+        print(  # report a miss only after the whole listing has been scanned
+            "File "
+            + filename
+            + " not found in Data Provider "
+            + str(data_provider_id)
+        )
+        return 1
+
+    except Exception:
+        print("Error in browsing data provider or file download")
+        return 1
 
 
 def cbrain_sync_file(userfile_id_list, cbrain_token):
-	"""Makes sure a file in a data provider is synchronized with CBRAIN."""
-	#userfile_id_list can either be a string eg. '3663657', or a list eg. ['3663729', '3663714']
-	headers = {
-		'Content-Type': 'application/json',
-		'Accept': 'application/json',
-	}
-
-	params = (
-		('file_ids[]', userfile_id_list),
-		('cbrain_api_token', cbrain_token),
-	)
-
-	response = requests.post('https://portal.cbrain.mcgill.ca/userfiles/sync_multiple', headers=headers, params=params)
-
-	if response.status_code == 200:
-		print("Synchronized userfiles " + str(userfile_id_list))
-		return
-	else:
-		print("Userfile sync failed for IDs: " + str(userfile_id_list))
-		print(response.status_code)
-		return
+    """Makes sure a file in a data provider is synchronized with CBRAIN."""
+    # userfile_id_list can either be a string, e.g. '3663657', or a list, e.g. ['3663729', '3663714']
+    headers = {
+        "Content-Type": "application/json",
+        "Accept": "application/json",
+    }
+
+    params = (
+        ("file_ids[]", userfile_id_list),
+        ("cbrain_api_token", cbrain_token),
+    )
+
+    response = requests.post(
+        "https://portal.cbrain.mcgill.ca/userfiles/sync_multiple",
+        headers=headers,
+        params=params,
+    )
+
+    if response.status_code == 200:
+        print("Synchronized userfiles " + str(userfile_id_list))
+        return 0
+    else:
+        print("Userfile sync failed for IDs: " + str(userfile_id_list))
+        print(response.status_code)
+        return 1
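
A note on the `row[index - 2]` lookup in retrieve_FreeSurfer_volume: with
`csv.reader(..., delimiter=" ")`, each run of two spaces in an aseg.stats row
yields an empty field, so the volume value sits two positions, not one, before
the structure name. A minimal sketch of this quirk, assuming an illustrative
two-space-separated stats line (real FreeSurfer output uses wider, variable
whitespace):

    import csv

    # Hypothetical aseg.stats-style row; exactly two spaces between fields
    # is an assumption made for illustration.
    line = "7  17  4250  4250.0  Left-Hippocampus"

    row = next(csv.reader([line], delimiter=" "))
    # Consecutive delimiters produce empty fields:
    # ['7', '', '17', '', '4250', '', '4250.0', '', 'Left-Hippocampus']
    index = row.index("Left-Hippocampus")
    print(row[index - 2])  # prints '4250.0', the Volume_mm3 column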
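
For trying the refactored module end to end, the helpers compose into a
session like the sketch below. The credentials, userfile ID, and tool-config
ID are placeholders; the shape of the POST response (a list of created tasks)
and the "Completed" status string are assumptions about the CBRAIN payload,
and error handling is elided:

    import time

    from cbrainAPI import (
        cbrain_login,
        cbrain_logout,
        cbrain_post_task,
        cbrain_get_all_tasks,
        cbrain_get_task_info_from_list,
    )

    token = cbrain_login("my_user", "my_password")  # placeholder credentials

    # Launch one task; the userfile and tool-config IDs are hypothetical.
    created = cbrain_post_task(token, 3663657, 721, {})
    task_id = created[0]["id"]  # assumes the POST returns a list of created tasks

    # Poll the account's task list until the task completes.
    while True:
        task = cbrain_get_task_info_from_list(cbrain_get_all_tasks(token), task_id)
        if task["status"] == "Completed":  # failure states elided for brevity
            break
        time.sleep(60)

    print("Task " + str(task_id) + " completed.")
    cbrain_logout(token)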