forked from CMSCompOps/WmAgentScripts
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkillWorkflowAgent.py
executable file
·80 lines (65 loc) · 2.84 KB
/
killWorkflowAgent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/env python
"""
Script for killing a workflow: delete from LQ, delete in WMBS and set cancelled in inbox
This has to be executed in each agent the workflow is running in (Agent Level)
"""
import os, sys
from WMCore.Configuration import loadConfigurationFile
from WMCore.WorkQueue.WorkQueueBackend import WorkQueueBackend
from WMCore.WMInit import connectToDB
from WMCore.WorkQueue.WMBSHelper import killWorkflow
def killWorkflowAgent(WorkflowName):
    """
    Cancel the work of one workflow on the local agent.

    Steps, in order:
      1. load the agent configuration (path taken from $WMAGENT_CONFIG,
         which is set to the default install location when missing);
      2. fetch the workflow's elements from the local workqueue and the
         matching inbox elements;
      3. call killWorkflow for every request name found;
      4. set the inbox elements to 'Canceled', save them, and delete the
         workqueue elements whose parent inbox element was saved.

    :param WorkflowName: request name of the workflow to cancel.
    :return: None. Progress and failures are reported on stdout.
    """
    # get configuration file path, falling back to the default location
    if "WMAGENT_CONFIG" not in os.environ:
        os.environ['WMAGENT_CONFIG'] = '/data/srv/wmagent/current/config/wmagent/config.py'

    # load config
    wmConfig = loadConfigurationFile(os.environ['WMAGENT_CONFIG'])
    wqManager = wmConfig.section_('WorkQueueManager')

    # Creates backend
    backend = WorkQueueBackend(wqManager.couchurl,
                               wqManager.dbname,
                               wqManager.inboxDatabase,
                               wqManager.queueParams['ParentQueueCouchUrl'])

    elements = backend.getElements(RequestName=WorkflowName)

    # take wf from args in case no elements exist for workflow
    # (i.e. work was negotiating); an empty name is not a workflow
    requestNames = set(x['RequestName'] for x in elements)
    if WorkflowName:
        requestNames.add(WorkflowName)
    if not requestNames:
        print('Workflow is not at the backend')
        return

    inbox_elements = []
    for wf in requestNames:
        inbox_elements.extend(backend.getInboxElements(WorkflowName=wf))

    print("Canceling work for workflow: %s" % (requestNames,))
    for workflow in requestNames:
        try:
            connectToDB()
            # killWorkflow takes a job-dump config and a BossAir config;
            # the full agent configuration is passed for both.
            killWorkflow(workflow, wmConfig, wmConfig)
        except Exception as ex:
            # best effort: report the failure and still clean up the queue
            print('Aborting %s wmbs subscription failed: %s' % (workflow, str(ex)))

    # NOTE(review): the comment that used to sit here said "Don't update as
    # fails sometimes due to conflicts (#3856)", yet the elements are saved
    # right below - confirm which behaviour is intended.
    for inbox_element in inbox_elements:
        if inbox_element['Status'] != 'Canceled':
            inbox_element.load()['Status'] = 'Canceled'
    updated_inbox_ids = set(x.id for x in backend.saveElements(*inbox_elements))

    # delete elements - no longer need them
    backend.deleteElements(*[x for x in elements
                             if x['ParentQueueId'] in updated_inbox_ids])
    print("Aborted worqueue elements:")
    print([x.id for x in elements])
def main():
    """
    Command-line entry point: killWorkflowAgent.py workflowname

    Expects exactly one argument, the workflow name, and cancels that
    workflow in the current agent. Prints the usage line and exits with
    status 1 when the argument count is wrong; exits with status 0 once
    killWorkflowAgent has returned.
    """
    args = sys.argv[1:]
    if len(args) != 1:
        print("usage:killWorkflowAgent.py workflowname")
        # a wrong invocation is an error, so signal it in the exit status
        sys.exit(1)
    # kill a given workflow in current agent
    killWorkflowAgent(args[0])
    sys.exit(0)


if __name__ == "__main__":
    main()