-
Notifications
You must be signed in to change notification settings - Fork 0
/
pz-run.py
executable file
·177 lines (141 loc) · 5.54 KB
/
pz-run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import parsl
from condor import get_config
from apps import (
run_zphot, create_galaxy_lib,
create_filter_set, compute_galaxy_mag
)
from utils import (
create_dir, prepare_format_output, set_partitions
)
import time
import yaml
import os
import glob
import logging
import argparse
def _attach_file_logger(log_path):
    """Attach a DEBUG-level file handler to this module's logger.

    Args:
        log_path (str): Path of the log file to write.

    Returns:
        tuple: ``(logger, handler)``. The caller owns ``handler`` and must
        detach and close it when finished; otherwise every call would stack
        another handler on the same logger and duplicate each record.
    """
    logger = logging.getLogger(__name__)
    handler = logging.FileHandler(log_path)
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
    )
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    return logger, handler


def _read_para_file(path):
    """Parse a whitespace-separated ``KEY value...`` parameter file (e.g. zphot.para).

    Text after ``#`` is treated as a comment and blank lines are skipped, so
    neither crashes the parser nor leaks comment text into a value. The value
    tokens of a line are concatenated without a separator (``"a, b"`` becomes
    ``"a,b"``); if a key appears more than once, the last occurrence wins.

    Args:
        path (str): Parameter file to read.

    Returns:
        dict: Mapping of parameter name to its string value.
    """
    params = {}
    with open(path, "r") as para_file:
        for raw_line in para_file:
            tokens = raw_line.split("#", 1)[0].split()
            if tokens:
                params[tokens[0]] = "".join(tokens[1:])
    return params


def run(phz_config, parsl_config):
    """Run the LePhare photo-z pipeline, using the current directory as sandbox.

    The current working directory is treated as the LePhare sandbox:
    ``pipeline.log``, Parsl's ``runinfo`` directory, the LePhare work
    directories (``filt``, ``lib_bin``, ``lib_mag``) and the output tree are
    all created under it.

    Steps 1-3 (SED library, filter set, theoretical magnitudes) run one after
    another, each waiting for the previous to finish. Step 4 submits one
    ``run_zphot`` task per (input file, row range) pair and blocks until all
    of them complete.

    Args:
        phz_config (dict): Photo-z pipeline configuration (the loaded config.yml).
            Keys read: ``inputs`` (``zphot``, ``photometric_data``),
            ``output_dir``, ``settings`` (``lephare_bin``, ``photo_corr``,
            ``photo_type``, ``err_type``, ``bands``, ``index``, ``shifts``,
            ``partitions``) and ``test_environment`` (``turn_on``,
            ``limit_sample``).
        parsl_config: Parsl configuration object; its ``run_dir`` attribute is
            overwritten to point inside the sandbox.

    Raises:
        ValueError: If ``output_dir`` is missing or empty in ``phz_config``.
        KeyError: If the zphot parameter file has no ``CAT_FMT`` entry.
    """
    lephare_sandbox = os.getcwd()
    logger, handler = _attach_file_logger(os.path.join(lephare_sandbox, 'pipeline.log'))
    start_time_full = time.time()
    logger.info('LePhare pipeline')
    logger.info('-> Loading configurations')

    try:
        inputs = phz_config.get('inputs', {})
        settings = phz_config.get('settings', {})
        test_env = phz_config.get("test_environment", {})
        output_dir = phz_config.get('output_dir')
        if not output_dir:
            # Fail before the expensive library steps instead of at the first
            # os.path.join()/create_dir() on the output path.
            raise ValueError("phz_config['output_dir'] must be a non-empty path")
        zphot_para = inputs.get('zphot')
        lephare_dir = settings.get("lephare_bin")

        # Keep Parsl's run info (and therefore the child jobs) inside the sandbox.
        parsl_config.run_dir = os.path.join(lephare_sandbox, "runinfo")
        parsl.clear()
        parsl.load(parsl_config)

        # LePhare work directories; reuse them if they already exist.
        for dirname in ('filt', 'lib_bin', 'lib_mag'):
            os.makedirs(os.path.join(lephare_sandbox, dirname), exist_ok=True)

        start_time = time.time()
        # Each future is awaited before the next step is submitted
        # (presumably each step consumes the previous step's products).
        library_steps = (
            ("-> Step 1: creating SED library", create_galaxy_lib, 'sedtolib.log'),
            ("-> Step 2: creating filter transmission files", create_filter_set, 'filter.log'),
            ("-> Step 3: theoretical magnitudes library", compute_galaxy_mag, 'mag_gal.log'),
        )
        for message, app, log_name in library_steps:
            logger.info(message)
            app(zphot_para, lephare_dir, lephare_sandbox, stdout=log_name).result()
        logger.info(" steps 1,2 and 3 completed: %s seconds", int(time.time() - start_time))

        logger.info("-> Step 4: run the photo-z code on the input catalog")
        start_time = time.time()

        # LePhare run parameters.
        apply_corr = settings.get('photo_corr')
        photo_type = settings.get('photo_type')
        err_type = settings.get('err_type')
        bands_list = settings.get('bands')
        id_col = settings.get("index")
        shifts = settings.get("shifts")
        limit_sample = test_env.get("limit_sample") if test_env.get("turn_on", False) else None
        npartition = int(settings.get("partitions", 50))

        para = _read_para_file(zphot_para)
        paraout = para.get('PARA_OUT')
        cat_fmt = str(para['CAT_FMT'])
        idxs, namephotoz = prepare_format_output(bands_list, paraout)

        # glob() returns matches in arbitrary order; sort so that file selection
        # under limit_sample and the job counter (hence output names) are
        # reproducible between runs.
        photo_files = sorted(glob.glob(inputs.get("photometric_data")))

        # Test mode: keep only the first `nfiles` files and, per file, the
        # first `ninterval` row ranges (0 means "all ranges").
        ninterval = 0
        if limit_sample:
            nfiles, ninterval = limit_sample
            photo_files = photo_files[:nfiles]

        create_dir(output_dir)
        partitions_list = set_partitions(photo_files, npartition, id_col)

        counter, procs = 1, []
        for item in partitions_list:
            filename = item.get("path")
            ranges = item.get("ranges")
            if ninterval:
                ranges = ranges[:ninterval]
            # One output sub-directory per input file, named after its basename
            # without ".parquet"; created once per file rather than per range.
            output_dir_file = os.path.join(
                lephare_sandbox, output_dir, os.path.basename(filename).replace(".parquet", "")
            )
            create_dir(output_dir_file)
            for interval in ranges:
                phot_out = os.path.join(
                    output_dir_file, f'photz-{str(counter).zfill(5)}.parquet'
                )
                procs.append(run_zphot(
                    counter, filename, interval, shifts, phot_out, photo_type,
                    err_type, apply_corr, bands_list, zphot_para, id_col, cat_fmt,
                    idxs, namephotoz, lephare_dir, lephare_sandbox,
                    stdout=f'zphot-{counter}.log',
                ))
                counter += 1

        logger.info(' number of parallel jobs: %s', len(procs))
        for proc in procs:
            proc.result()
        logger.info(" step 4 completed: %s seconds", int(time.time() - start_time))
        logger.info("Full runtime: %s seconds", int(time.time() - start_time_full))
    finally:
        # Always release the Parsl DFK reference and the log file, even when a
        # step fails, so repeated calls don't leak handlers or stale state.
        parsl.clear()
        logger.removeHandler(handler)
        handler.close()
if __name__ == '__main__':
    # Command-line interface: a positional YAML config path plus an optional
    # run directory that defaults to the directory the script was launched from.
    cli = argparse.ArgumentParser()
    cli.add_argument(dest='config_path', help="yaml config path")
    cli.add_argument(
        "-w", "--working_dir",
        dest="working_dir",
        default=os.getcwd(),
        help="run directory",
    )
    cli_args = cli.parse_args()

    # Load the pipeline configuration before leaving the launch directory, so
    # a relative config path still resolves.
    # NOTE(review): yaml.safe_load is the usual recommendation for config files;
    # FullLoader is kept here to preserve current behavior.
    with open(cli_args.config_path) as config_file:
        pipeline_config = yaml.load(config_file, Loader=yaml.FullLoader)

    # Prepare the sandbox; create_dir is called with chdir=True/rmtree=True
    # (presumably: wipe any previous sandbox and cd into it). run() uses the
    # current working directory as its sandbox, and get_config() is
    # deliberately called only after this point.
    sandbox_dir = f'{cli_args.working_dir}/sandbox/'
    create_dir(sandbox_dir, chdir=True, rmtree=True)

    run(pipeline_config, get_config(pipeline_config))