-
Notifications
You must be signed in to change notification settings - Fork 2
/
dataFlowMacroMergerInLine
executable file
·81 lines (61 loc) · 2.78 KB
/
dataFlowMacroMergerInLine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/usr/bin/env python
#configfile = "/opt/merger/dataFlowMerger.conf"
#configfile = "dataFlowMergerMini.conf"
configfile = "dataFlowMergerMacro.conf"
import sys, os, socket, signal
from configobj import ConfigObj
from Logging import getLogger
import cmsDataFlowMerger
log = getLogger()
def getMergeParams(mergeConfigFileName):
try:
if os.path.isfile(mergeConfigFileName):
config = ConfigObj(mergeConfigFileName)
else:
log.error("Configuration file not found: {0}!".format(mergeConfigFileName))
sys.exit(1)
except IOError, e:
log.error("Unable to open configuration file: {0}!".format(mergeConfigFileName))
sys.exit(1)
return config
def get_params():
params = getMergeParams(configfile)
try:
in_path = params['Input']['dataPath']
out_path = params['Output']['dataPath']
eol_path = params['Input']['eolPath']
merge_type = params['Misc']['mergeType']
stream_type = params['Misc']['streamType']
sm_path = params['Output']['smPath']
dqm_path = params['Output']['dqmPath']
doCheckSum_option = params['Output']['doCheckSum']
merge_option = params['Misc']['mergeOption']
es_server_url = params['Misc']['esServerUrl']
es_index_name = params['Misc']['esIndexName']
number_of_shards = params['Misc']['numberOfShards']
number_of_replicas = params['Misc']['numberOfReplicas']
except KeyError, e:
log.error("At least one non-optional parameter missing from the config file {0}:".format(configfile))
log.error("{0}".format(e))
exit(1)
try:
additional_label = params['Output']['outputEndName']
except KeyError, e:
additional_label = socket.gethostname()
try:
delete_files = params['Misc']['deleteFiles']
except KeyError, e:
delete_files = "True"
try:
debug_level = params['Misc']['debugLevel']
except KeyError, e:
debug_level = 10
if (merge_option != "optionA"):
delete_files = "True"
return in_path, eol_path, out_path, sm_path, dqm_path, doCheckSum_option, additional_label, merge_type, stream_type, delete_files, merge_option, es_server_url, es_index_name, number_of_shards, number_of_replicas, debug_level
def DataFlowMerger():
[inPath, eolPath, outPath, smPath, dqmPath, doCheckSumOption, adLabel, mType, sType, delOrigFiles, mergeOption, esServerUrl, esIndexName, numberOfShards, numberOfReplicas, debugLevel] = get_params()
log.info("delFiles: {0}".format(delOrigFiles))
cmsDataFlowMerger.start_merging(inPath, eolPath, mType, sType, outPath, smPath, dqmPath, doCheckSumOption, adLabel, delOrigFiles, mergeOption, esServerUrl, esIndexName, numberOfShards, numberOfReplicas, debugLevel)
if __name__ == "__main__":
DataFlowMerger()