-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkflow.py
140 lines (105 loc) · 3.94 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python
"""Workflow to parse and compute polygenic scores
gwf workflow to parse all GWAS sumstats in data folder and generate PRS from them
using LDpred2-auto and lassosum as implemented in LDpred2
@Author: Ole S Hansen
@Date:
"""
import os, json
from gwf import Workflow, AnonymousTarget
from gwf.workflow import collect
from datetime import date
from code.functions.modpath import modpath
gwf = Workflow()
paths = json.load(open('data/paths.json'))
### Defining gwf templates
def munge_sumstats(inputfile):
'''
Template for running the r script "munge_sumstats.R" which munges GWAS summary statistics and performs quality control
'''
# Name of folder to be created in steps and results
folder_name = os.path.split(inputfile)[0].split("/")[-1]
# Using modpath() to create name of output file from inputfile - keeps basename,
# but gets another path and another suffix
base_name = modpath(inputfile, parent='', suffix='')
res_folder = f'results/{folder_name}/{base_name}'
munged_folder = f'steps/munged_sumstats/{folder_name}'
munged_sumstats = f'{munged_folder}/{base_name}_munged.rds'
# Defining inputs, outputs and ressources
inputs = {'sumstats': inputfile}
outputs = [munged_sumstats]
working_dir = paths['work_dir']
options = {
'memory': '60g',
'walltime': '00:30:00',
'cores': '2',
'account': 'NCRR-PRS'
}
# Command to be run in the terminal
spec = f'''
mkdir -p {res_folder}
mkdir -p {munged_folder}
Rscript code/munge_sumstats.R {inputfile} {munged_sumstats} {res_folder}
'''
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
def compute_pgs(inputfile):
'''
Template for running the r script "pgs_model.R" which computes PGS models using munged sumstats
'''
# Name of folder to be created in steps and results
folder_name = os.path.split(inputfile)[0].split("/")[-1]
base_name = modpath(inputfile, parent=(''), suffix=('_munged.rds', '')) # Getting the base name from the inputfile
output_path = f'results/{folder_name}/{base_name}/{base_name}' # New path with sumstat-specific folder (and filename without suffix)
model_info = f'{output_path}_model_info.csv'
working_dir = paths['work_dir']
inputs = {'munged_sumstats': inputfile}
outputs = {
'models': f'{output_path}_raw_models.rds',
'scores': f'{output_path}_scores.rds',
'auto_parameters': f'{output_path}_auto_parameters.rds',
'model_info': model_info
}
options = {
'memory': '40g',
'walltime': '12:00:00',
'cores': '23',
'account': 'NCRR-PRS'
}
spec = f'''
Rscript code/pgs_model.R {inputfile} {output_path} {model_info}
'''
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
def concat_files(paths):
folder_name = os.path.split(paths[0])[0].split('/')[-2]
output_path = f'results/{folder_name}/{folder_name}_model_info.csv'
list_str = ' '.join(paths)
command_string = "awk 'FNR == 1 && NR != 1 {next} {print}'"
inputs = {'paths': paths}
outputs = {'concatenated_model_info': output_path}
options = {
'memory': '2g',
'walltime': '00:05:00'
}
spec = f'''
{command_string} {list_str} > {output_path}
'''
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
### Defining targets
# Naming functions
def get_munge_name(idx, target):
filename = modpath(target.inputs['sumstats'], parent='', suffix='')
return f'munge_{filename}'
def get_pgs_name(idx, target):
filename = modpath(target.inputs['munged_sumstats'], parent='', suffix=('_munged.rds', ''))
return f'pgs_{filename}'
# Input
sumstats = gwf.glob(paths['sumstat_folder'])
# Mapping over the inputs
munge_sumstats = gwf.map(munge_sumstats, sumstats, name=get_munge_name)
compute_pgs = gwf.map(compute_pgs, munge_sumstats.outputs, name=get_pgs_name)
gwf.target_from_template(
name='concatenate_model_info',
template=concat_files(
paths=collect(compute_pgs.outputs, ['model_info'])['model_infos']
)
)