-
Notifications
You must be signed in to change notification settings - Fork 0
/
runKerasTensorflowClassifierOnATLASImagesMultiprocess.py
162 lines (120 loc) · 6.17 KB
/
runKerasTensorflowClassifierOnATLASImagesMultiprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python
"""Run the Keras/Tensorflow classifier.
Usage:
%s <configFile> [<candidate>...] [--hkoclassifier=<hkoclassifier>] [--mloclassifier=<mloclassifier>] [--ps1classifier=<ps1classifier>] [--outputcsv=<outputcsv>] [--listid=<listid>] [--imageroot=<imageroot>] [--update]
%s (-h | --help)
%s --version
Options:
-h --help Show this screen.
--version Show version.
--listid=<listid> List ID [default: 4].
--hkoclassifier=<hkoclassifier> HKO Classifier file.
--mloclassifier=<mloclassifier> MLO Classifier file.
--ps1classifier=<mloclassifier> PS1 Classifier file. This option will cause the HKO and MLO classifiers to be ignored.
--outputcsv=<outputcsv> Output file.
--imageroot=<imageroot> Root location of the actual images [default: /psdb3/images/].
--update Update the database.
Example:
python %s ~/config.pso3.gw.warp.yaml --ps1classifier=/data/db4data1/scratch/kws/training/ps1/20190115/ps1_20190115_400000_1200000.best.hdf5 --listid=4 --outputcsv=/tmp/pso3_list_4.csv
python %s ../ps13pi/config/config.yaml --ps1classifier=/data/db4data1/scratch/kws/training/ps1/20190115/ps1_20190115_400000_1200000.best.hdf5 --listid=4 --outputcsv=/tmp/ps13pi_list_4.csv
"""
import sys
__doc__ = __doc__ % (sys.argv[0], sys.argv[0], sys.argv[0], sys.argv[0], sys.argv[0])
from docopt import docopt
from gkutils import Struct, cleanOptions, readGenericDataFile, dbConnect
import sys, csv, os
from runKerasTensorflowClassifierOnATLASImages import getObjectsByList, runKerasTensorflowClassifier, updateTransientRBValue
from gkmultiprocessingUtils import *
# Per-host log directory: '/<short hostname>/tc_logs/', where the short hostname
# is the node name up to its first '.'.
LOG_FILE_LOCATION = '/%s/tc_logs/' % os.uname()[1].partition('.')[0]
# Filename prefix for the per-process worker log files.
LOG_PREFIX = 'ml_keras_'
def worker(num, db, objectListFragment, dateAndTime, firstPass, miscParameters, q):
    """Classify one chunk of candidates and put the results on a queue.

    Passed to ``parallelProcess`` as the per-chunk worker function. It is
    presumably run in a child process; TODO confirm against gkmultiprocessingUtils.

    :param num: worker number. Used in the log file name and passed on as
        ``processNumber``.
    :param db: not used here; accepted to match the parallelProcess calling
        convention.
    :param objectListFragment: list of dicts, each with an ``'id'`` key.
    :param dateAndTime: timestamp string embedded in the log file name.
    :param firstPass: not used here.
    :param miscParameters: sequence whose first element is the options object.
    :param q: queue that receives the classifier's result list.
    :return: 0
    """
    logPath = '%s%s_%s_%d.log' % (LOG_FILE_LOCATION, LOG_PREFIX, dateAndTime, num)
    originalStdout = sys.stdout
    # Redirect this worker's output to its own log file. The with-block flushes
    # and closes the file; the finally clause restores the original stdout.
    with open(logPath, "w") as logFile:
        sys.stdout = logFile
        try:
            options = miscParameters[0]
            # Override the full candidate list with this worker's sublist of candidates.
            options.candidate = [str(x['id']) for x in objectListFragment]
            # NOTE(review): if this raises, nothing is put on the queue. Check
            # whether parallelProcess can then block waiting for a result.
            objectsForUpdate = runKerasTensorflowClassifier(options, processNumber = num)
            # Write the objects for update onto a Queue object.
            print ("Adding %d objects onto the queue." % len(objectsForUpdate))
            q.put(objectsForUpdate)
            print ("Process complete.")
            # NOTE(review): this function opens and closes no DB connection.
            # The message is kept unchanged for log compatibility.
            print ("DB Connection Closed - exiting")
        finally:
            sys.stdout = originalStdout
    return 0
def _loadLocalDatabaseConfig(configFile):
    """Return (username, password, database, hostname) from the config's databases.local section."""
    import yaml
    with open(configFile) as yamlFile:
        # safe_load is enough for a plain config file. yaml.load() without a
        # Loader is deprecated in PyYAML 5.1 and an error in PyYAML 6.
        config = yaml.safe_load(yamlFile)
    local = config['databases']['local']
    return local['username'], local['password'], local['database'], local['hostname']


def _timestamp(fmt="%Y:%m:%d:%H:%M:%S"):
    """Return the current local time formatted with ``fmt``."""
    import datetime
    return datetime.datetime.now().strftime(fmt)


def runKerasTensorflowClassifierMultiprocess(opts):
    """Run the Keras/Tensorflow classifier over many candidates in parallel.

    Candidates are taken from ``opts.candidate`` if any were given. Otherwise
    they come from detection list ``opts.listid`` via getObjectsByList.

    Processing:
    - Lists of more than 100 candidates are split into 16 sequential batches.
    - Each batch is split into 28 chunks and handed to ``worker`` through
      ``parallelProcess``.
    - Each result row is indexed as row[0] (object id) and row[1] (a float
      score).
    - If ``opts.update`` is set, each batch's rows are written back with
      updateTransientRBValue as soon as that batch finishes.
    - If ``opts.outputcsv`` is set, the rows from all batches, sorted by score,
      are written to it as ``id,score`` lines.

    :param opts: dict or Struct of options (configFile, candidate, listid,
        ps1classifier, outputcsv, update, plus whatever the classifier reads).
    :return: 1 if the database connection fails or the list ID is outside
        0..8, otherwise None. A non-integer list ID calls sys.exit().
    """
    # Use utils.Struct to convert the dict into an object for compatibility with old optparse code.
    options = Struct(**opts) if isinstance(opts, dict) else opts

    username, password, database, hostname = _loadLocalDatabaseConfig(options.configFile)

    # Passed through parallelProcess to worker(), which does not use it.
    db = []

    conn = dbConnect(hostname, username, password, database)
    if not conn:
        print("Cannot connect to the database")
        return 1

    # Close the connection on every exit path: early returns, sys.exit() and
    # exceptions raised while classifying.
    try:
        if options.listid is not None:
            try:
                detectionList = int(options.listid)
            except ValueError:
                sys.exit("Detection list must be an integer")
            if detectionList < 0 or detectionList > 8:
                print ("Detection list must be between 0 and 8")
                return 1

        # 2018-07-31 KWS We have PS1 data. Don't bother with the HKO/MLO ATLAS data.
        ps1Data = bool(options.ps1classifier)

        # If candidates are specified in the options, they override the list.
        if options.candidate:
            objectList = [{'id': int(candidate)} for candidate in options.candidate]
        else:
            objectList = getObjectsByList(conn, database, listId = int(options.listid), ps1Data = ps1Data)

        # 2019-06-07 KWS For reasons not entirely clear, Tensorflow seems to
        # exhaust every last bit of CPU and memory, so lists of more than 100
        # objects are processed in 16 sequential batches.
        if len(objectList) > 100:
            _, batches = splitList(objectList, bins=16)
        else:
            batches = [objectList]

        allObjectsForUpdate = []
        for batch in batches:
            # Nothing to classify (e.g. an empty candidate list).
            if not batch:
                continue

            # YYYYMMDD_HHMMSS, embedded in the workers' log file names.
            dateAndTime = _timestamp("%Y%m%d_%H%M%S")

            # 2019-08-24 KWS Hard-wire the number of workers.
            nProcessors, listChunks = splitList(batch, bins=28)

            print ("%s Parallel Processing..." % _timestamp())
            objectsForUpdate = parallelProcess(db, dateAndTime, nProcessors, listChunks, worker, miscParameters = [options])
            print ("%s Done Parallel Processing" % _timestamp())
            print ("TOTAL OBJECTS TO UPDATE = %d" % len(objectsForUpdate))

            # Update the database after each batch so completed batches are
            # kept even if a later batch fails.
            if options.update:
                for row in sorted(objectsForUpdate, key = lambda x: x[1]):
                    updateTransientRBValue(conn, row[0], row[1], ps1Data = ps1Data)

            # Keep every batch's results for the CSV, not only the last batch's.
            allObjectsForUpdate.extend(objectsForUpdate)

        # Sort the combined list by score.
        allObjectsForUpdate.sort(key = lambda x: x[1])

        if options.outputcsv is not None:
            with open(options.outputcsv, 'w') as f:
                for row in allObjectsForUpdate:
                    print(row[0], row[1])
                    f.write('%s,%f\n' % (row[0], row[1]))
    finally:
        conn.close()
def main():
    """Command-line entry point: parse the arguments and run the classifier.

    :return: the value returned by runKerasTensorflowClassifierMultiprocess
        (1 on a database connection or list ID range error, otherwise None).
    """
    opts = docopt(__doc__, version='0.1')
    opts = cleanOptions(opts)
    # Use utils.Struct to convert the dict into an object for compatibility with old optparse code.
    options = Struct(**opts)
    return runKerasTensorflowClassifierMultiprocess(options)


if __name__ == '__main__':
    # Pass the status on so that failures give a non-zero exit code.
    # sys.exit(None) exits with status 0.
    sys.exit(main())