Skip to content

Commit

Permalink
Merge pull request #34 from adamkimbler/master
Browse files Browse the repository at this point in the history
Some quick PEPification changes
  • Loading branch information
tsalo authored Jun 27, 2019
2 parents b7e35fb + 365d2b0 commit 086ed55
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 62 deletions.
146 changes: 103 additions & 43 deletions cis.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def run(command, env={}):
while True:
line = process.stdout.readline()
#line = str(line).encode('utf-8')[:-1]
line=str(line, 'utf-8')[:-1]
line = str(line, 'utf-8')[:-1]
print(line)
if line == '' and process.poll() is not None:
break
Expand All @@ -49,9 +49,11 @@ def get_parser():
parser.add_argument('--config', required=True, dest='config',
help='Path to the config json file.')
parser.add_argument('--protocol_check', required=False, action='store_true',
help='Will perform a protocol check to determine if the correct number of scans and TRs are present.')
help='Will perform a protocol check to determine \
if the correct number of scans and TRs are present.')
parser.add_argument('--autocheck', required=False, action='store_true',
help='Will automatically download all scans from XNAT that are not currently in the project folder.')
help='Will automatically download all scans from XNAT \
that are not currently in the project folder.')
parser.add_argument('--xnat_experiment', required=False, dest='xnatexp',
default=None,
help='XNAT Experiment ID (i.e., XNAT_E*) for single session download.')
Expand All @@ -72,48 +74,45 @@ def main(argv=None):

proj_dir = os.path.dirname(args.bids_dir)
if not op.isdir(proj_dir):
raise ValueError('Project directory must be an existing directory!')
raise ValueError('Project directory must be an existing directory!')

if not op.isfile(args.config):
raise ValueError('Argument "config" must be an existing file.')

if args.n_procs < 1:
raise ValueError('Argument "n_procs" must be positive integer greater '
'than zero.')
else:
n_procs = int(args.n_procs)

with open(args.config, 'r') as fo:
config_options = json.load(fo)

if 'project' not in config_options.keys():
raise Exception('Config File must be updated with project field'
'See Sample Config File for More information')

proj_work_dir = op.join(args.work_dir,
'{0}'.format(config_options['project']))
config_options['project'])
if not proj_work_dir.startswith('/scratch'):
raise ValueError('Working directory must be in scratch.')

xnatdownload_file = op.join('/home/data/cis/singularity-images/',
config_options['xnatdownload'])
config_options['xnatdownload'])

# Additional checks and copying for XNAT Download file
if not op.isfile(xnatdownload_file):
raise ValueError('XNAT Download image specified in config files must be '
'an existing file.')


# Make folders/files
if not op.isdir(op.join(proj_dir, 'code/err')):
os.makedirs(op.join(proj_dir, 'code/err'))

if not op.isdir(op.join(proj_dir, 'code/out')):
os.makedirs(op.join(proj_dir, 'code/out'))

if not op.isdir(proj_work_dir):
os.makedirs(proj_work_dir)

raw_dir = op.join(proj_dir, 'raw')
if not op.isdir(raw_dir):
os.makedirs(raw_dir)
Expand All @@ -122,10 +121,12 @@ def main(argv=None):
#with open(op.join(proj_work_dir, '{0}-processed.txt'.format(config_options['project'])), 'a') as fo:
# for tmp_tar_list in tar_list:
# fo.write(tmp_tar_list + "\n")

scans_df = pd.read_csv(op.join(raw_dir, 'scans.tsv'), sep='\t')
scans_df = scans_df['file']
scans_df.to_csv(op.join(proj_work_dir, '{0}-processed.txt'.format(config_options['project'])), index=False)
scans_df.to_csv(op.join(proj_work_dir,
'{0}-processed.txt'.format(config_options['project'])),
index=False)

# Copy singularity images to scratch
scratch_xnatdownload = op.join(args.work_dir, op.basename(xnatdownload_file))
Expand All @@ -135,74 +136,133 @@ def main(argv=None):

# Run XNAT Download
if args.autocheck:
cmd = ('{sing} -w {work} --project {proj} --autocheck --processed {tar_list}'.format(sing=scratch_xnatdownload, work=proj_work_dir, proj=config_options['project'], tar_list=op.join(proj_work_dir, '{0}-processed.txt'.format(config_options['project']))))
cmd = ('{sing} -w {work} --project {proj} \
--autocheck \
--processed {tar_list}'.format(sing=scratch_xnatdownload,
work=proj_work_dir,
proj=config_options['project'],
tar_list=op.join(proj_work_dir,
'{0}-processed.txt'.format(config_options['project']))))
run(cmd)
elif args.xnatexp is not None:
cmd = ('{sing} -w {work} --project {proj} --session {xnat_exp} --processed {tar_list}'.format(sing=scratch_xnatdownload, work=proj_work_dir, proj=config_options['project'], xnat_exp=args.xnatexp, tar_list=op.join(proj_work_dir, '{0}-processed.txt'.format(config_options['project']))))
cmd = ('{sing} -w {work} --project {proj} \
--session {xnat_exp} \
--processed {tar_list}'.format(sing=scratch_xnatdownload,
work=proj_work_dir,
proj=config_options['project'],
xnat_exp=args.xnatexp,
tar_list=op.join(proj_work_dir,
'{0}-processed.txt'.format(config_options['project']))))
run(cmd)
else:
raise Exception('A valid XNAT Experiment session was not entered for the project or you are not running autocheck.')

raise Exception('A valid XNAT Experiment session was not entered \
for the project or you are not running autocheck.')

os.remove(op.join(proj_work_dir, '{0}-processed.txt'.format(config_options['project'])))
os.remove(scratch_xnatdownload)
# Temporary raw directory in work_dir
raw_work_dir = op.join(proj_work_dir, 'raw')

if op.isdir(raw_work_dir):
# Check if anything was downloaded
data_download = False
if os.listdir(raw_work_dir):
data_download = True

if data_download:

sub_list = os.listdir(raw_work_dir)
for tmp_sub in sub_list:
ses_list = os.listdir(op.join(raw_work_dir, '{0}'.format(tmp_sub)))

for tmp_ses in ses_list:
#run the protocol check if requested
if args.protocol_check:
cmd = ('python /home/data/cis/cis-processing/protocol_check.py -w {work} --bids_dir {bids_dir} --sub {sub} --ses {ses}'.format(work=raw_work_dir, bids_dir = args.bids_dir, sub=tmp_sub, ses=tmp_ses))
cmd = ('python /home/data/cis/cis-processing/protocol_check.py -w {work} \
--bids_dir {bids_dir} \
--sub {sub} --ses {ses}'.format(work=raw_work_dir,
bids_dir=args.bids_dir,
sub=tmp_sub,
ses=tmp_ses))
run(cmd)

#tar the subject and session directory and copy to raw dir
with tarfile.open(op.join(raw_dir, tmp_sub, tmp_ses, '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses)), "w") as tar:
tar.add(op.join(raw_work_dir, '{sub}'.format(sub=tmp_sub)), arcname=os.path.basename(op.join(raw_work_dir, '{sub}'.format(sub=tmp_sub))))
shutil.rmtree(op.join(raw_work_dir, '{sub}'.format(sub=tmp_sub)))

with tarfile.open(op.join(raw_dir,
tmp_sub,
tmp_ses,
'{sub}-{ses}.tar'.format(sub=tmp_sub,
ses=tmp_ses)), "w") \
as tar:
tar.add(op.join(raw_work_dir, '{sub}'.format(sub=tmp_sub)),
arcname=os.path.basename(op.join(raw_work_dir, tmp_sub)))
shutil.rmtree(op.join(raw_work_dir, tmp_sub))

scans_df = pd.read_csv(op.join(raw_dir, 'scans.tsv'), sep='\t')
cols = scans_df.columns
#cols = scans_df.columns
tmp_df = pd.DataFrame()
tmp_df = tmp_df.append({'sub': tmp_sub}, ignore_index=True)
tmp_df['ses'] = tmp_ses
tmp_df['file'] = '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses)
moddate = os.path.getmtime(op.join(raw_dir, tmp_sub, tmp_ses, '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses)))
moddate = op.getmtime(op.join(raw_dir, tmp_sub, tmp_ses,
'{sub}-{ses}.tar'.format(sub=tmp_sub,
ses=tmp_ses)))
timedateobj = datetime.datetime.fromtimestamp(moddate)
tmp_df['creation'] = datetime.datetime.strftime(datetimeob, "%m/%d/%Y, %H:%M")
tmp_df['creation'] = datetime.datetime.strftime(timedateobj, "%m/%d/%Y, %H:%M")
scans_df = scans_df.append(tmp_df)
scans_df.to_csv(op.join(raw_dir, 'scans.tsv'), sep='\t', index=False)

# run cis_proc.py
#cmd = ('python cis_proc.py -t {tarfile} -b {bidsdir} -w {work} --config {config} --sub {sub} --ses {ses} --n_procs {nprocs}'. format(tarfile=op.join(raw_dir, '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses)), bidsdir=args.bids_dir, work=proj_work_dir, config=args.config, sub=tmp_sub.strip('sub-'), ses=tmp_ses.strip('ses-'), nprocs=args.n_procs))
#cmd = ('srun -J cis_proc-{proj}-{sub}-{ses} -e {err_file_loc} -o {out_file_loc} -c {nprocs} -q {hpc_queue} -p investor python /home/data/cis/cis-processing/cis_proc.py -t {tarfile} -b {bidsdir} -w {work} --config {config} --sub {sub} --ses {ses} --n_procs {nprocs}'. format(hpc_queue=config_options['hpc_queue'], proj=config_options['project'], err_file_loc = op.join('/home/data/cis/cis-processing/err', config_options['project'], 'cis_proc-{sub}-{ses}'.format(sub=tmp_sub, ses=tmp_ses)), out_file_loc= op.join('/home/data/cis/cis-processing/out', config_options['project'], 'cis_proc-{sub}-{ses}'.format(sub=tmp_sub, ses=tmp_ses)), tarfile=op.join(raw_dir, '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses)), bidsdir=args.bids_dir, work=proj_work_dir, config=args.config, sub=tmp_sub.strip('sub-'), ses=tmp_ses.strip('ses-'), nprocs=args.n_procs))
cmd = ('bsub -J cis_proc-{proj}-{sub}-{ses} -eo {err_file_loc} -oo {out_file_loc} -n {nprocs} -q {hpc_queue} python /home/data/cis/cis-processing/cis_proc_mcr.py -t {tarfile} -b {bidsdir} -w {work} --config {config} --sub {sub} --ses {ses} --n_procs {nprocs}'. format(hpc_queue=config_options['hpc_queue'], proj=config_options['project'], err_file_loc = op.join(proj_dir, 'code/err', 'cis_proc-{sub}-{ses}'.format(sub=tmp_sub, ses=tmp_ses)), out_file_loc= op.join(proj_dir, 'code/out', 'cis_proc-{sub}-{ses}'.format(sub=tmp_sub, ses=tmp_ses)), tarfile=op.join(raw_dir, '{sub}-{ses}.tar'.format(sub=tmp_sub, ses=tmp_ses)), bidsdir=args.bids_dir, work=proj_work_dir, config=args.config, sub=tmp_sub.strip('sub-'), ses=tmp_ses.strip('ses-'), nprocs=args.n_procs))
cmd = ('bsub -J cis_proc-{proj}-{sub}-{ses} \
-eo {err_file_loc} -oo {out_file_loc} \
-n {nprocs} -q {hpc_queue} python /home/data/cis/cis-processing/cis_proc_mcr.py \
-t {tarfile} -b {bidsdir} -w {work} --config {config} \
--sub {sub} --ses {ses} \
--n_procs {n_procs}'.format(hpc_queue=config_options['hpc_queue'],
proj=config_options['project'],
err_file_loc=op.join(proj_dir,
'code/err',
'cis_proc-{sub}-{ses}'.format(sub=tmp_sub,
ses=tmp_ses)),
out_file_loc=op.join(proj_dir,
'code/out',
'cis_proc-{sub}-{ses}'.format(sub=tmp_sub,
ses=tmp_ses)),
tarfile=op.join(raw_dir,
'{sub}-{ses}.tar'.format(sub=tmp_sub,
ses=tmp_ses)),
bidsdir=args.bids_dir,
work=proj_work_dir,
config=args.config,
sub=tmp_sub.strip('sub-'),
ses=tmp_ses.strip('ses-'),
n_procs=args.n_procs))
run(cmd)

# get date and time
now = datetime.datetime.now()
date_time=now.strftime("%Y-%m-%d %H:%M")
date_time = now.strftime("%Y-%m-%d %H:%M")
# append the email message

with open(op.join(proj_work_dir, '{0}-processed-message.txt'.format(config_options['project'])), 'a') as fo:
fo.write('Data transferred from XNAT to FIU-HPC for Project: {proj} Subject: {sub} Session: {ses} on {datetime}\n'.format(proj=config_options['project'], sub=tmp_sub, ses=tmp_ses, datetime=date_time))



with open(op.join(proj_work_dir,
'{0}-processed-message.txt'.format(config_options['project'])), 'a') as fo:
fo.write('Data transferred from XNAT to FIU-HPC for Project: {proj} \
Subject: {sub} \
Session: {ses} on {datetime}\n'.format(proj=config_options['project'],
sub=tmp_sub, ses=tmp_ses,
datetime=date_time))


shutil.rmtree(op.join(raw_work_dir))

cmd = ("mail -s 'FIU XNAT-HPC Data Transfer Update Project {proj}' {email_list} < {message}".format(proj=config_options['project'], email_list=config_options['email'], message=op.join(proj_work_dir, '{0}-processed-message.txt'.format(config_options['project']))))
cmd = ("mail -s 'FIU XNAT-HPC Data Transfer Update Project {proj}' \
{email_list} < {message}".format(proj=config_options['project'],
email_list=config_options['email'],
message=op.join(proj_work_dir,
'{0}-processed-message.txt'.format(config_options['project']))))
run(cmd)
os.remove(op.join(proj_work_dir, '{0}-processed-message.txt'.format(config_options['project'])))
os.remove(op.join(proj_work_dir,
'{0}-processed-message.txt'.format(config_options['project'])))

if __name__ == '__main__':
main()
Loading

0 comments on commit 086ed55

Please sign in to comment.