-
Notifications
You must be signed in to change notification settings - Fork 0
/
classify_crts_data.py
168 lines (138 loc) · 6.69 KB
/
classify_crts_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import luigi
from pipeline_utils import utils_dir, data_dir
from transient_url_extraction import transient_url_extraction
from transient_lc_extraction import transient_lc_extraction
from transient_metadata_extraction import transient_metadata_extraction
from tag_merge_metadata import tag_merge_metadata
from fats_feature_extraction import fats_feature_extraction
from fats_features_preprocessing import fats_features_preprocessing
from first_stage_classification import first_stage_classification
from first_stage_classificationDMDT import first_stage_classificationDMDT
from second_stage_classification import second_stage_classification
from second_stage_classificationDMDT import second_stage_classificationDMDT
from dmdt_mappings import dmdt_mappings
import csv
class TransientURLExtraction(luigi.Task):
    """Luigi task that saves the transient light-curve URL list as a CSV file.

    The URLs are obtained from ``transient_url_extraction()`` and written,
    one per row, to ``transient_lc_urls.csv`` under ``utils_dir``.
    """

    def requires(self):
        """Return an empty list: this task has no upstream dependencies."""
        return []

    def output(self):
        """Return the local CSV target that holds the URL list."""
        return luigi.LocalTarget(utils_dir+"transient_lc_urls.csv")

    def run(self):
        """Collect the URLs and write them as a single-column CSV."""
        # Collect first so the output is only opened once the list exists.
        urls = transient_url_extraction()
        with self.output().open('w') as csv_file:
            writer = csv.writer(csv_file, lineterminator='\n')
            # Each URL becomes its own one-field row.
            writer.writerows([url] for url in urls)
class TransientLCExtraction(luigi.Task):
    """Luigi task that feeds the URL list file to ``transient_lc_extraction``.

    The task is considered complete once the ``raw/transients/`` directory
    under ``data_dir`` exists.
    """

    def requires(self):
        """Depend on the task that produces the URL list CSV."""
        return [TransientURLExtraction()]

    def output(self):
        """Return the directory target checked for completeness."""
        # NOTE(review): unlike the other paths in this file, this one starts
        # with "/" after data_dir -- confirm whether that is intentional.
        return luigi.LocalTarget(data_dir+"/raw/transients/")

    def run(self):
        """Open the URL list and hand the open file to the extractor."""
        url_list_target = self.input()[0]
        with url_list_target.open() as url_file:
            transient_lc_extraction(url_file)
class TransientMetaDataExtraction(luigi.Task):
    """Luigi task that writes the transient metadata table to a CSV file.

    Rows come from ``transient_metadata_extraction()``; a fixed header row is
    written first.
    """

    def requires(self):
        """Depend on the URL list and on the light-curve extraction."""
        return [TransientURLExtraction(), TransientLCExtraction()]

    def output(self):
        """Return the CSV target for the transient metadata."""
        return luigi.LocalTarget(data_dir+"metadata/transient_lc_metadata.csv")

    def run(self):
        """Fetch the metadata rows and write header plus rows to the CSV."""
        # Fetch before opening the output, as the original code does.
        rows = transient_metadata_extraction()
        header = [
            'CRTS ID', 'RA (J2000)', 'Dec (J2000)', 'UT Date', 'Mag',
            'CSS images', 'SDSS', 'Others', 'Followed', 'Last', 'LC', 'FC',
            'Classification', 'SubClassification',
        ]
        with self.output().open('w') as csv_file:
            writer = csv.writer(csv_file, lineterminator='\n')
            writer.writerow(header)
            writer.writerows(rows)
class VariableMetaDataExtraction(luigi.ExternalTask):
    """External Luigi task for the variables metadata file.

    This class defines no ``run``; it only declares where the file is
    expected to be.
    """

    def output(self):
        """Return the target for ``metadata/variables_lc_metadata.dat``."""
        metadata_path = data_dir+'metadata/variables_lc_metadata.dat'
        return luigi.LocalTarget(metadata_path)
class TagMergeMetadata(luigi.Task):
    """Luigi task that calls ``tag_merge_metadata`` to build ``lc_metadata.pkl``."""

    def requires(self):
        """Depend on both the transient and the variable metadata."""
        return [TransientMetaDataExtraction(),VariableMetaDataExtraction()]

    def output(self):
        """Return the pickle target for the merged metadata."""
        return luigi.LocalTarget(data_dir+"metadata/lc_metadata.pkl")

    def run(self):
        """Pass the declared output path to ``tag_merge_metadata``."""
        # Same string as the literal in output(); read it from the target so
        # the path is spelled in only one place.
        merged_path = self.output().path
        tag_merge_metadata(merged_path)
class FatsFeatureExtraction(luigi.Task):
    """Luigi task that runs ``fats_feature_extraction``.

    The helper receives the features pickle path and an errors pickle path;
    only the features pickle is tracked as this task's output.
    """

    def requires(self):
        """Depend on the merged metadata."""
        return [TagMergeMetadata()]

    def output(self):
        """Return the pickle target for the tagged FATS features."""
        return luigi.LocalTarget(data_dir+"features/fats_features/tagged_features.pkl")

    def run(self):
        """Call the extractor with the features path and the errors path."""
        features_path = self.output().path
        errors_path = data_dir+"features/fats_features/errors.pkl"
        fats_feature_extraction(features_path, errors_path)
class DMDTMappings(luigi.Task):
    """Luigi task that runs ``dmdt_mappings`` on the merged metadata."""

    def requires(self):
        """Depend on the merged metadata."""
        return [TagMergeMetadata()]

    def output(self):
        """Return the target used to decide whether the mappings exist."""
        # NOTE(review): run() passes the prefix ".../tagged_features" while
        # this target is ".../tagged_features1.pkl"; presumably the helper
        # appends a suffix to the prefix -- confirm against dmdt_mappings.
        return luigi.LocalTarget(data_dir+"features/dmdt_mappings/tagged_features1.pkl")

    def run(self):
        """Call ``dmdt_mappings`` with the metadata pickle and output prefix."""
        metadata_path = data_dir+"metadata/lc_metadata.pkl"
        mappings_prefix = data_dir+"features/dmdt_mappings/tagged_features"
        dmdt_mappings(metadata_path, mappings_prefix)
class FatsFeaturesPreprocessing(luigi.Task):
    """Luigi task that runs ``fats_features_preprocessing`` on the FATS features."""

    def requires(self):
        """Depend on the FATS feature extraction."""
        return [FatsFeatureExtraction()]

    def output(self):
        """Return the pickle target for the cleaned features."""
        return luigi.LocalTarget(data_dir+"features/fats_features/clean_tagged_features.pkl")

    def run(self):
        """Call the preprocessing helper with output path, then input path."""
        raw_features_path = data_dir+"features/fats_features/tagged_features.pkl"
        clean_features_path = self.output().path
        # The helper takes the OUTPUT path first, unlike the classifiers.
        fats_features_preprocessing(clean_features_path, raw_features_path)
class FirstStageClassification(luigi.Task):
    """Luigi task that runs ``first_stage_classification`` on the cleaned FATS features."""

    def requires(self):
        """Depend on the cleaned FATS features."""
        return [FatsFeaturesPreprocessing()]

    def output(self):
        """Return the text target for the first-stage scores."""
        return luigi.LocalTarget(data_dir+"results/fats_features/first_stage_scores.txt")

    def run(self):
        """Call the classifier with the features path and the scores path."""
        features_path = data_dir+"features/fats_features/clean_tagged_features.pkl"
        scores_path = self.output().path
        first_stage_classification(features_path, scores_path)
class SecondStageClassification(luigi.Task):
    """Luigi task that runs ``second_stage_classification`` on the cleaned FATS features."""

    def requires(self):
        """Depend on the cleaned FATS features."""
        return [FatsFeaturesPreprocessing()]

    def output(self):
        """Return the text target for the second-stage scores."""
        return luigi.LocalTarget(data_dir+"results/fats_features/second_stage_scores.txt")

    def run(self):
        """Call the classifier with the features path and the scores path."""
        features_path = data_dir+"features/fats_features/clean_tagged_features.pkl"
        scores_path = self.output().path
        second_stage_classification(features_path, scores_path)
class FirstStageClassificationDMDT(luigi.Task):
    """Luigi task that runs ``first_stage_classificationDMDT`` on the DMDT mappings."""

    def requires(self):
        """Depend on the DMDT mappings."""
        return [DMDTMappings()]

    def output(self):
        """Return the text target for the first-stage DMDT scores."""
        return luigi.LocalTarget(data_dir+"results/dmdt_mappings/first_stage_scores.txt")

    def run(self):
        """Call the classifier with the mappings prefix and the scores path."""
        # The input is a path prefix without extension, as in DMDTMappings.run.
        mappings_prefix = data_dir+"features/dmdt_mappings/tagged_features"
        scores_path = self.output().path
        first_stage_classificationDMDT(mappings_prefix, scores_path)
class SecondStageClassificationDMDT(luigi.Task):
    """Luigi task that runs ``second_stage_classificationDMDT`` on the DMDT mappings.

    The declared output is the scores file that ``run`` passes to the
    classifier, so Luigi's completeness check matches what is produced.
    """

    def requires(self):
        """Depend on the DMDT mappings."""
        return [DMDTMappings()]

    def output(self):
        """Return the text target for the second-stage DMDT scores."""
        # Fixed: this used to point at "results/tagged_features/...", a path
        # run() never wrote to, so the task could never be seen as complete.
        return luigi.LocalTarget(data_dir+"results/dmdt_mappings/second_stage_scores.txt")

    def run(self):
        """Call the classifier with the mappings prefix and the scores path."""
        # The input is a path prefix without extension, as in DMDTMappings.run.
        mappings_prefix = data_dir+"features/dmdt_mappings/tagged_features"
        # Take the path from output() so the two cannot drift apart again.
        scores_path = self.output().path
        second_stage_classificationDMDT(mappings_prefix, scores_path)
class ClassifyCRTSData(luigi.Task):
    """Luigi task that runs all four classifiers in one go.

    It runs the first- and second-stage classifiers on the cleaned FATS
    features, then the first- and second-stage DMDT classifiers on the DMDT
    mappings, in that order.

    The declared outputs are the four score files passed to the classifiers.
    Previously ``output`` pointed at a misspelled ``resuts/`` directory that
    nothing creates, so the task could never be reported as complete.
    """

    def requires(self):
        """Depend on the cleaned FATS features and on the DMDT mappings."""
        return [FatsFeaturesPreprocessing(), DMDTMappings()]

    def _jobs(self):
        """Return ``(classifier, input path, output path)`` triples in run order."""
        fats_features = data_dir+"features/fats_features/clean_tagged_features.pkl"
        # Path prefix without extension, as passed to dmdt_mappings elsewhere.
        dmdt_prefix = data_dir+"features/dmdt_mappings/tagged_features"
        return [
            (first_stage_classification, fats_features,
             data_dir+"results/fats_features/first_stage_scores.txt"),
            (second_stage_classification, fats_features,
             data_dir+"results/fats_features/second_stage_scores.txt"),
            (first_stage_classificationDMDT, dmdt_prefix,
             data_dir+"results/dmdt_mappings/first_stage_scores.txt"),
            (second_stage_classificationDMDT, dmdt_prefix,
             data_dir+"results/dmdt_mappings/second_stage_scores.txt"),
        ]

    def output(self):
        """Return one local target per score file handed to a classifier."""
        # NOTE(review): assumes each classifier writes its output path --
        # confirm against the classifier modules.
        return [luigi.LocalTarget(scores_path) for _, _, scores_path in self._jobs()]

    def run(self):
        """Run every classifier with its ``(input, output)`` path pair."""
        for classify, input_path, scores_path in self._jobs():
            classify(input_path, scores_path)
# Script entry point: hand control to Luigi's command-line runner.
if __name__ == "__main__":
    luigi.run()