diff --git a/cis.py b/cis.py index 317b51d..87fa509 100644 --- a/cis.py +++ b/cis.py @@ -26,7 +26,7 @@ def run(command, env={}): while True: line = process.stdout.readline() #line = str(line).encode('utf-8')[:-1] - line=str(line, 'utf-8')[:-1] + line = str(line, 'utf-8')[:-1] print(line) if line == '' and process.poll() is not None: break @@ -49,9 +49,11 @@ def get_parser(): parser.add_argument('--config', required=True, dest='config', help='Path to the config json file.') parser.add_argument('--protocol_check', required=False, action='store_true', - help='Will perform a protocol check to determine if the correct number of scans and TRs are present.') + help='Will perform a protocol check to determine \ + if the correct number of scans and TRs are present.') parser.add_argument('--autocheck', required=False, action='store_true', - help='Will automatically download all scans from XNAT that are not currently in the project folder.') + help='Will automatically download all scans from XNAT \ + that are not currently in the project folder.') parser.add_argument('--xnat_experiment', required=False, dest='xnatexp', default=None, help='XNAT Experiment ID (i.e., XNAT_E*) for single session download.') @@ -72,48 +74,45 @@ def main(argv=None): proj_dir = os.path.dirname(args.bids_dir) if not op.isdir(proj_dir): - raise ValueError('Project directory must be an existing directory!') - + raise ValueError('Project directory must be an existing directory!') + if not op.isfile(args.config): raise ValueError('Argument "config" must be an existing file.') if args.n_procs < 1: raise ValueError('Argument "n_procs" must be positive integer greater ' 'than zero.') - else: - n_procs = int(args.n_procs) - with open(args.config, 'r') as fo: config_options = json.load(fo) if 'project' not in config_options.keys(): raise Exception('Config File must be updated with project field' 'See Sample Config File for More information') - + proj_work_dir = op.join(args.work_dir, - '{0}'.format(config_options['project'])) + config_options['project']) if not proj_work_dir.startswith('/scratch'): raise ValueError('Working directory must be in scratch.') xnatdownload_file = op.join('/home/data/cis/singularity-images/', - config_options['xnatdownload']) + config_options['xnatdownload']) # Additional checks and copying for XNAT Download file if not op.isfile(xnatdownload_file): raise ValueError('XNAT Download image specified in config files must be ' 'an existing file.') - + # Make folders/files if not op.isdir(op.join(proj_dir, 'code/err')): os.makedirs(op.join(proj_dir, 'code/err')) - + if not op.isdir(op.join(proj_dir, 'code/out')): os.makedirs(op.join(proj_dir, 'code/out')) - + if not op.isdir(proj_work_dir): os.makedirs(proj_work_dir) - + raw_dir = op.join(proj_dir, 'raw') if not op.isdir(raw_dir): os.makedirs(raw_dir) @@ -122,10 +121,12 @@ def main(argv=None): #with open(op.join(proj_work_dir, '{0}-processed.txt'.format(config_options['project'])), 'a') as fo: # for tmp_tar_list in tar_list: # fo.write(tmp_tar_list + "\n") - + scans_df = pd.read_csv(op.join(raw_dir, 'scans.tsv'), sep='\t') scans_df = scans_df['file'] - scans_df.to_csv(op.join(proj_work_dir, '{0}-processed.txt'.format(config_options['project'])), index=False) + scans_df.to_csv(op.join(proj_work_dir, + '{0}-processed.txt'.format(config_options['project'])), + index=False) # Copy singularity images to scratch scratch_xnatdownload = op.join(args.work_dir, op.basename(xnatdownload_file)) @@ -135,19 +136,33 @@ def main(argv=None): # Run XNAT Download if args.autocheck: - cmd = ('{sing} -w {work} --project {proj} --autocheck --processed {tar_list}'.format(sing=scratch_xnatdownload, work=proj_work_dir, proj=config_options['project'], tar_list=op.join(proj_work_dir, '{0}-processed.txt'.format(config_options['project'])))) + cmd = ('{sing} -w {work} --project {proj} \ + --autocheck \ + --processed {tar_list}'.format(sing=scratch_xnatdownload, + work=proj_work_dir, + proj=config_options['project'], + tar_list=op.join(proj_work_dir, + '{0}-processed.txt'.format(config_options['project'])))) run(cmd) elif args.xnatexp is not None: - cmd = ('{sing} -w {work} --project {proj} --session {xnat_exp} --processed {tar_list}'.format(sing=scratch_xnatdownload, work=proj_work_dir, proj=config_options['project'], xnat_exp=args.xnatexp, tar_list=op.join(proj_work_dir, '{0}-processed.txt'.format(config_options['project'])))) + cmd = ('{sing} -w {work} --project {proj} \ + --session {xnat_exp} \ + --processed {tar_list}'.format(sing=scratch_xnatdownload, + work=proj_work_dir, + proj=config_options['project'], + xnat_exp=args.xnatexp, + tar_list=op.join(proj_work_dir, + '{0}-processed.txt'.format(config_options['project'])))) run(cmd) else: - raise Exception('A valid XNAT Experiment session was not entered for the project or you are not running autocheck.') - + raise Exception('A valid XNAT Experiment session was not entered \ + for the project or you are not running autocheck.') + os.remove(op.join(proj_work_dir, '{0}-processed.txt'.format(config_options['project']))) os.remove(scratch_xnatdownload) # Temporary raw directory in work_dir raw_work_dir = op.join(proj_work_dir, 'raw') - + if op.isdir(raw_work_dir): # Check if anything was downloaded data_download = False @@ -155,54 +170,99 @@ def main(argv=None): data_download = True if data_download: - sub_list = os.listdir(raw_work_dir) for tmp_sub in sub_list: ses_list = os.listdir(op.join(raw_work_dir, '{0}'.format(tmp_sub))) - + for tmp_ses in ses_list: #run the protocol check if requested if args.protocol_check: - cmd = ('python /home/data/cis/cis-processing/protocol_check.py -w {work} --bids_dir {bids_dir} --sub {sub} --ses {ses}'.format(work=raw_work_dir, bids_dir = args.bids_dir, sub=tmp_sub, ses=tmp_ses)) + cmd = ('python /home/data/cis/cis-processing/protocol_check.py -w {work} \ + --bids_dir {bids_dir} \ + --sub {sub} --ses {ses}'.format(work=raw_work_dir, + bids_dir=args.bids_dir, + sub=tmp_sub, + ses=tmp_ses)) run(cmd) - + #tar the subject and session directory and copy to raw dir - with tarfile.open(op.join(raw_dir, tmp_sub, tmp_ses, '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses)), "w") as tar: - tar.add(op.join(raw_work_dir, '{sub}'.format(sub=tmp_sub)), arcname=os.path.basename(op.join(raw_work_dir, '{sub}'.format(sub=tmp_sub)))) - shutil.rmtree(op.join(raw_work_dir, '{sub}'.format(sub=tmp_sub))) - + with tarfile.open(op.join(raw_dir, + tmp_sub, + tmp_ses, + '{sub}-{ses}.tar'.format(sub=tmp_sub, + ses=tmp_ses)), "w") \ + as tar: + tar.add(op.join(raw_work_dir, '{sub}'.format(sub=tmp_sub)), + arcname=os.path.basename(op.join(raw_work_dir, tmp_sub))) + shutil.rmtree(op.join(raw_work_dir, tmp_sub)) + scans_df = pd.read_csv(op.join(raw_dir, 'scans.tsv'), sep='\t') - cols = scans_df.columns + #cols = scans_df.columns tmp_df = pd.DataFrame() tmp_df = tmp_df.append({'sub': tmp_sub}, ignore_index=True) tmp_df['ses'] = tmp_ses tmp_df['file'] = '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses) - moddate = os.path.getmtime(op.join(raw_dir, tmp_sub, tmp_ses, '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses))) + moddate = op.getmtime(op.join(raw_dir, tmp_sub, tmp_ses, + '{sub}-{ses}.tar'.format(sub=tmp_sub, + ses=tmp_ses))) timedateobj = datetime.datetime.fromtimestamp(moddate) - tmp_df['creation'] = datetime.datetime.strftime(datetimeob, "%m/%d/%Y, %H:%M") + tmp_df['creation'] = datetime.datetime.strftime(timedateobj, "%m/%d/%Y, %H:%M") scans_df = scans_df.append(tmp_df) scans_df.to_csv(op.join(raw_dir, 'scans.tsv'), sep='\t', index=False) - + # run cis_proc.py #cmd = ('python cis_proc.py -t {tarfile} -b {bidsdir} -w {work} --config {config} --sub {sub} --ses {ses} --n_procs {nprocs}'. format(tarfile=op.join(raw_dir, '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses)), bidsdir=args.bids_dir, work=proj_work_dir, config=args.config, sub=tmp_sub.strip('sub-'), ses=tmp_ses.strip('ses-'), nprocs=args.n_procs)) #cmd = ('srun -J cis_proc-{proj}-{sub}-{ses} -e {err_file_loc} -o {out_file_loc} -c {nprocs} -q {hpc_queue} -p investor python /home/data/cis/cis-processing/cis_proc.py -t {tarfile} -b {bidsdir} -w {work} --config {config} --sub {sub} --ses {ses} --n_procs {nprocs}'. format(hpc_queue=config_options['hpc_queue'], proj=config_options['project'], err_file_loc = op.join('/home/data/cis/cis-processing/err', config_options['project'], 'cis_proc-{sub}-{ses}'.format(sub=tmp_sub, ses=tmp_ses)), out_file_loc= op.join('/home/data/cis/cis-processing/out', config_options['project'], 'cis_proc-{sub}-{ses}'.format(sub=tmp_sub, ses=tmp_ses)), tarfile=op.join(raw_dir, '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses)), bidsdir=args.bids_dir, work=proj_work_dir, config=args.config, sub=tmp_sub.strip('sub-'), ses=tmp_ses.strip('ses-'), nprocs=args.n_procs)) - cmd = ('bsub -J cis_proc-{proj}-{sub}-{ses} -eo {err_file_loc} -oo {out_file_loc} -n {nprocs} -q {hpc_queue} python /home/data/cis/cis-processing/cis_proc_mcr.py -t {tarfile} -b {bidsdir} -w {work} --config {config} --sub {sub} --ses {ses} --n_procs {nprocs}'. format(hpc_queue=config_options['hpc_queue'], proj=config_options['project'], err_file_loc = op.join(proj_dir, 'code/err', 'cis_proc-{sub}-{ses}'.format(sub=tmp_sub, ses=tmp_ses)), out_file_loc= op.join(proj_dir, 'code/out', 'cis_proc-{sub}-{ses}'.format(sub=tmp_sub, ses=tmp_ses)), tarfile=op.join(raw_dir, '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses)), bidsdir=args.bids_dir, work=proj_work_dir, config=args.config, sub=tmp_sub.strip('sub-'), ses=tmp_ses.strip('ses-'), nprocs=args.n_procs)) + cmd = ('bsub -J cis_proc-{proj}-{sub}-{ses} \ + -eo {err_file_loc} -oo {out_file_loc} \ + -n {nprocs} -q {hpc_queue} python /home/data/cis/cis-processing/cis_proc_mcr.py \ + -t {tarfile} -b {bidsdir} -w {work} --config {config} \ + --sub {sub} --ses {ses} \ + --n_procs {n_procs}'.format(hpc_queue=config_options['hpc_queue'], + proj=config_options['project'], + err_file_loc=op.join(proj_dir, + 'code/err', + 'cis_proc-{sub}-{ses}'.format(sub=tmp_sub, + ses=tmp_ses)), + out_file_loc=op.join(proj_dir, + 'code/out', + 'cis_proc-{sub}-{ses}'.format(sub=tmp_sub, + ses=tmp_ses)), + tarfile=op.join(raw_dir, + '{sub}-{ses}.tar'.format(sub=tmp_sub, + ses=tmp_ses)), + bidsdir=args.bids_dir, + work=proj_work_dir, + config=args.config, + sub=tmp_sub.strip('sub-'), + ses=tmp_ses.strip('ses-'), + n_procs=args.n_procs)) run(cmd) # get date and time now = datetime.datetime.now() - date_time=now.strftime("%Y-%m-%d %H:%M") + date_time = now.strftime("%Y-%m-%d %H:%M") # append the email message - - with open(op.join(proj_work_dir, '{0}-processed-message.txt'.format(config_options['project'])), 'a') as fo: - fo.write('Data transferred from XNAT to FIU-HPC for Project: {proj} Subject: {sub} Session: {ses} on {datetime}\n'.format(proj=config_options['project'], sub=tmp_sub, ses=tmp_ses, datetime=date_time)) - - + + with open(op.join(proj_work_dir, + '{0}-processed-message.txt'.format(config_options['project'])), 'a') as fo: + fo.write('Data transferred from XNAT to FIU-HPC for Project: {proj} \ + Subject: {sub} \ + Session: {ses} on {datetime}\n'.format(proj=config_options['project'], + sub=tmp_sub, ses=tmp_ses, + datetime=date_time)) + + shutil.rmtree(op.join(raw_work_dir)) - cmd = ("mail -s 'FIU XNAT-HPC Data Transfer Update Project {proj}' {email_list} < {message}".format(proj=config_options['project'], email_list=config_options['email'], message=op.join(proj_work_dir, '{0}-processed-message.txt'.format(config_options['project'])))) + cmd = ("mail -s 'FIU XNAT-HPC Data Transfer Update Project {proj}' \ + {email_list} < {message}".format(proj=config_options['project'], + email_list=config_options['email'], + message=op.join(proj_work_dir, + '{0}-processed-message.txt'.format(config_options['project'])))) run(cmd) - os.remove(op.join(proj_work_dir, '{0}-processed-message.txt'.format(config_options['project']))) + os.remove(op.join(proj_work_dir, + '{0}-processed-message.txt'.format(config_options['project']))) if __name__ == '__main__': main() diff --git a/cis_proc.py b/cis_proc.py index d9f0849..e4776a6 100755 --- a/cis_proc.py +++ b/cis_proc.py @@ -24,7 +24,7 @@ def run(command, env={}): while True: line = process.stdout.readline() #line = str(line).encode('utf-8')[:-1] - line=str(line, 'utf-8')[:-1] + line = str(line, 'utf-8')[:-1] print(line) if line == '' and process.poll() is not None: break @@ -77,8 +77,7 @@ def main(argv=None): if args.n_procs < 1: raise ValueError('Argument "n_procs" must be positive integer greater ' 'than zero.') - else: - n_procs = int(args.n_procs) + n_procs = args.n_procs with open(args.config, 'r') as fo: config_options = json.load(fo) @@ -210,22 +209,22 @@ def main(argv=None): out_ses_dir = op.join(out_sub_dir, 'ses-{0}'.format(args.ses)) if not op.isdir(out_ses_dir): shutil.copytree(scratch_ses_dir, out_ses_dir) - + else: print('Warning: Subject/session directory already exists in ' 'dataset.') else: print('Warning: Subject directory already exists in dataset.') - + if args.ses is not None: tmp_df = pd.read_csv(op.join(out_sub_dir, 'ses-{ses}'.format(ses=args.ses), 'sub-{sub}_ses-{ses}_scans.tsv'.format(sub=args.sub, ses=args.ses)), sep='\t') else: tmp_df = pd.read_csv(op.join(out_sub_dir, 'sub-{sub}_scans.tsv'.format(sub=args.sub)), sep='\t') - + #append scans.tsv file with remove and annot fields tmp_df['remove'] = 0 tmp_df['annotation'] = '' - + #import master scans file if op.isfile(op.join(os.path.dirname(args.bids_dir), 'code/{proj}_scans.tsv'.format(proj=config_options['project']))): master_df = pd.read_csv(op.join(os.path.dirname(args.bids_dir), 'code/{proj}_scans.tsv'.format(proj=config_options['project'])), sep='\t') @@ -266,22 +265,20 @@ def main(argv=None): out=scratch_deriv_dir, mod=tmp_mod, work=mriqc_work_dir, n_procs=n_procs, kwargs=kwargs)) run(cmd) - + # Run MRIQC func for tmp_task in config_options['mriqc_options']['func']['task'].keys(): print(op.join(scratch_bids_dir, 'sub-{sub}'.format(sub=args.sub), 'ses-{ses}'.format(ses=args.ses), 'func/sub-{sub}_ses-{ses}_task-{task}_run-01_bold.json'.format(sub=args.sub, ses=args.ses, task=tmp_task))) - if args.ses is not None: - if op.isfile(op.join(scratch_bids_dir, 'sub-{sub}'.format(sub=args.sub), 'ses-{ses}'.format(ses=args.ses), 'func/sub-{sub}_ses-{ses}_task-{task}_run-01_bold.json'.format(sub=args.sub, ses=args.ses, task=tmp_task))): - run_mriqc=True - else: - run_mriqc=False + if args.ses: + run_mriqc = op.isfile(op.join(scratch_bids_dir, 'sub-{sub}'.format(sub=args.sub), 'ses-{ses}'.format(ses=args.ses), 'func/sub-{sub}_ses-{ses}_task-{task}_run-01_bold.json'.format(sub=args.sub, ses=args.ses, task=tmp_task))) + else: - if op.isfile(op.join(scratch_bids_dir, 'sub-{sub}'.format(sub=args.sub), 'func/sub-{sub}_task-{task}_run-01_bold.json'.format(sub=args.sub, task=tmp_task))): - run_mriqc=True - else: - run_mriqc=False - - if run_mriqc: + run_mriqc = op.isfile(op.join(scratch_bids_dir, 'sub-{sub}'.format(sub=args.sub), + 'func/sub-{sub}_task-{task}_run-01_bold.json'.format(sub=args.sub, + task=tmp_task))) + + + if run_mriqc: kwargs = '' for field in config_options['mriqc_options']['func']['task'][tmp_task]['mriqc_settings'].keys(): if isinstance(config_options['mriqc_options']['func']['task'][tmp_task]['mriqc_settings'][field], list):