-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdispatcher.py
executable file
·604 lines (533 loc) · 28.8 KB
/
dispatcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
#!/usr/bin/python3 -u
import MDSplus
import redis
import time
import random
import threading
import traceback
# Shared information on Redis
# actionStatus : Dictionary {path: status[:message]} indexed by <experiment>:<shot>:ActionStatus>:<ident>
# abortRequest : Dictionary {path: abort req(1/0) } indexed by <experiment>:<shot>:AbortRequest:<ident>
# phaseInfo : Set indexed by <experiment>:<shot>:Phases
# serverInfo :Dictionary {ident: number of instances}: indexed by ActionServers
# PubSub channels on Redis:
# ident: shared by all instances of action servers sharing the same ident. Messages received:
# DO_SEQUENCE: starts executing the passed sequence number
# DEPEND_UPDATE: check eligibility for local action nids affected by the passed nid
# dispatcher: unique, subscribed by every action server. Messages receives
# BUILD_TABLES: start local information collection based on passed exp+shot
# DO_PHASE: starts execution of the passed phase
treeDEPENDENCY_AND = 10
treeDEPENDENCY_OR = 11
treeDEPENDENCY_OF = 12
treeLOGICAL_AND = 45
treeLOGICAL_OR = 267
treeLOGICAL_OF = 229
ACTION_NOT_DISPATCHED = 0
ACTION_DOING = 1
ACTION_DONE = 2
ACTION_ERROR = 3
ACTION_TIMEOUT = 4
ACTION_ABORT = 5
ACTION_STREAMING= 6
def pathToNid(path, t):
return t.getNode(path).getNid()
def nidToPath(nid, t):
return MDSplus.TreeNode(nid, t).getFullPath()
# return the list of Action nids potentially affected by the execution of this actionNode
def getDepActionNids(actionNode):
action = actionNode.getData()
dispatch = action.getDispatch()
when = dispatch.getWhen()
if isinstance(when, MDSplus.Compound):
opcode = when.getOpcode()
if opcode == treeDEPENDENCY_AND or opcode == treeLOGICAL_AND or opcode == treeDEPENDENCY_OR or opcode == treeLOGICAL_OR:
if isinstance(when.getArgumentAt(0), MDSplus.TreeNode) or isinstance(when.getArgumentAt(0), MDSplus.TreePath):
leftSide = [when.getArgumentAt(0).getNid()]
elif isinstance(when.getArgumentAt(0), MDSplus.Compound):
leftSide = getDepActionNids(when.getArgumentAt(0))
else:
leftSide = []
if isinstance(when.getArgumentAt(1), MDSplus.TreeNode) or isinstance(when.getArgumentAt(1), MDSplus.TreePath):
rightSide = [when.getArgumentAt(1).getNid()]
elif isinstance(when.getArgumentAt(1), MDSplus.Compound):
rightSide = getDepActionNids(when.getArgumentAt(1))
else:
rightSide = []
return leftSide + rightSide
if opcode == treeDEPENDENCY_OF or opcode == treeLOGICAL_OF:
return [when.getArgumentAt(1).getNid()]
if isinstance(when, MDSplus.TreeNode) or isinstance(when, MDSplus.TreePath):
if when.getNid() != None:
return [when.getNid()]
return []
def registerIdent(ident):
red.hincrby('ActionServers', ident, 1)
def getActionServers():
keys = red.hkeys('ActionServes')
retServers = []
for key in keys:
retServers.append(key.decode('utf8'))
return retServers
# add action as not yet dispatched in redis
def addActionInShared(experiment, shot, ident, actionNid, actionPath, phase, t):
red.hset(experiment+':'+str(shot)+':ActionStatus:'+ident, nidToPath(actionNid, t), str(ACTION_NOT_DISPATCHED))
red.hset(experiment+':'+str(shot)+':ActionPathStatus:'+ident, actionPath, str(ACTION_NOT_DISPATCHED))
red.hset(experiment+':'+str(shot)+':ActionInfo:'+ident, actionPath, nidToPath(actionNid, t))
red.hset(experiment+':'+str(shot)+':ActionPhaseInfo:'+ident, nidToPath(actionNid, t), phase)
red.hset(experiment+':'+str(shot)+':AbortRequest:'+ident, nidToPath(actionNid, t), 0)
red.sadd(experiment+':'+str(shot)+':Phases', phase)
# get an undispatched action in the list. -1 is returned if non undispatched
def pickNotYetDispatched(experiment, shot, ident, actionNidList, t):
pending = False
for actionNid in actionNidList:
itemid = experiment+':'+str(shot)+':ActionStatus:'+ident
while True:
try:
pipe = red.pipeline()
pipe.watch(itemid)
status = int(pipe.hget(itemid, nidToPath(actionNid, t)))
if status == ACTION_NOT_DISPATCHED:
pipe.multi()
pipe.hset(itemid, nidToPath(actionNid, t), ACTION_DOING)
pipe.execute()
pipe.unwatch()
return actionNid
pipe.unwatch()
if status == ACTION_DOING:
pending = True
break
except redis.WatchError as exc:
print('*****************Concurrent access for ident '+ident)
# print(exc)
if pending:
return -1 #all dispatched but some not yet finished
else:
return -2 #all dispatched and finished
def updateStatus(experiment, shot, ident, actionNid, actionPath, status, t):
red.hset(experiment+':'+str(shot)+':ActionStatus:'+ident, nidToPath(actionNid, t), str(status))
red.hset(experiment+':'+str(shot)+':ActionPathStatus:'+ident, actionPath, str(status))
red.hset(experiment+':'+str(shot)+':AbortRequest:'+ident, nidToPath(actionNid, t), 0)
def checkAbort(experiment, shot, ident, actionPath, t):
abortReq = int(red.hget(experiment+':'+str(shot)+':AbortRequest:'+ident, actionPath))
if abortReq == 1:
abortReq = red.hset(experiment+':'+str(shot)+':AbortRequest:'+ident, actionPath, 0)
return True
return False
def clearShared(experiment):
keys = red.keys(experiment.upper()+'*')
for key in keys:
red.delete(key)
class ActionServer:
# local information
# seqActions keeps track of the sequential actions for this ident
# seqActions Dictionary{Phase:Dictionary{seqNum:Nid list}}
#
# depActions Dictionary{nid: Dependency}
#
# inDependency keeps track for each local/global action nid the list of local potentially affected action nids
# inDependency Dictionary{str(nid):list of affected nids}
#
# outDependency keeps track for every local action the list of idents hosting potentially affected actions
# outDependency Dictionary{nid: list of affected idents (including self)}
#
# actTimeout keeps track of possible (>0) action timeouts
# actTimeout Dictionary{nid: timeout}
#
# completionEvent keeps track for every action nid for this action server the possible completion event
# completionEvent Dictionary{nid: event name}
#
def __init__(self, ident):
self.seqActions = {}
self.inDependency = {}
self.outDependency = {}
self.actTimeout = {}
self.completionEvent = {}
self.ident = ident
self.pubsub = red.pubsub()
self.pubsub.subscribe('COMMAND:'+ident)
self.actionExecutor = ActionExecutor(self)
self.updateMutex = threading.Lock()
registerIdent(ident)
def buildTables(self, tree):
print("BUILD TABLES "+ tree.name+' '+str(tree.shot))
self.tree = tree
self.experiment = tree.name
self.shot = tree.shot
dd = tree.getNodeWild('***', 'ACTION')
for d in dd:
if d.isOn():
try:
disp = d.getData().getDispatch()
when = disp.getWhen()
phase = disp.getPhase().data()
task = d.getData().getTask()
if not phase in self.seqActions.keys():
self.seqActions[phase] = {}
if disp.getIdent() == self.ident: #Consider only local sequential actions
#Record timeout
try:
self.actTimeout[d.getNid()] = float(task.getTimeout().data())
print("TIMEOUT FOR "+ d.getPath()+': '+str(self.actTimeout[d.getNid()]))
except:
self.actTimeout[d.getNid()] = 0.
#update shared action status EXCEPT FOR ACTION UPDATES
if not isinstance(d.getData().getTask(), MDSplus.String):
updateStatus(self.experiment, self.shot, self.ident, d.getNid(), d.getPath(), ACTION_NOT_DISPATCHED, tree)
if isinstance(when, MDSplus.Scalar):
seqNum = int(when.data())
if not seqNum in self.seqActions[phase].keys():
self.seqActions[phase][seqNum] = []
self.seqActions[phase][seqNum].append(d.getNid())
addActionInShared(self.experiment, self.shot, self.ident,d.getNid(), d.getPath(), phase, self.tree)
# record completion event if any
completionName = disp.getCompletion().getString()
if completionName != '':
self.completionEvent[d.getNid()] = completionName
if not isinstance(when, MDSplus.Scalar): #if it is a dependent action
depNids = getDepActionNids(d) #get Affecting nodes
if disp.getIdent() == self.ident: #if the affected node is local
for depNid in depNids:
if not str(depNid) in self.inDependency.keys():
self.inDependency[str(depNid)] = []
self.inDependency[str(depNid)].append(d.getNid())
depIdent = disp.getIdent()
#if(depIdent != self.ident):
for depNid in depNids:
depNode = MDSplus.TreeNode(depNid, self.tree)
#if the affected node is on another action server and the affecting node is on this server
if(depNode.getData().getDispatch().getIdent().data() == self.ident):
if not depNode.getNid() in self.outDependency.keys():
self.outDependency[depNode.getNid()] = []
if not depIdent in self.outDependency[depNode.getNid()]:
self.outDependency[depNode.getNid()].append(depIdent)
except Exception as e:
print('Error collecting action ' + d.getPath()+ ': '+str(e))
def handleCommands(self):
while True:
while True:
msg = self.pubsub.get_message(timeout=1)
if msg != None:
break
if not isinstance(msg['data'], bytes):
continue
# message = str(msg['data'], 'utf8')
message = msg['data'].decode('utf8')
print('Received Command: '+message)
if message == 'QUIT':
return
if message.startswith('DO_PHASE:'):
phase = message[9:]
t = threading.Thread(target=doPhase, args = (self, phase))
t.start()
elif message.startswith('UPDATE:'): #when any action potentially affecting a local action has been updated
print(message)
items = message[7:].split()
actionNidStr = str(pathToNid(items[0], self.tree))
origIdent = items[1]
newTree = MDSplus.Tree(self.experiment, self.shot)
t1 = threading.Thread(target=doUpdate, args = (self, actionNidStr, newTree, origIdent))
t1.start()
elif message.startswith('BUILD_TABLES:'):
try:
cc = message.split(':')
treeName = cc[1]
shot = int(cc[2])
t = MDSplus.Tree(treeName, shot)
self.buildTables(t)
except :
traceback.print_exc()
# return True if the dispatching confition is satisfied
def checkDispatch(self, actionNode, inIdent):
action = actionNode.getData()
dispatch = action.getDispatch()
when = dispatch.getWhen()
return self.checkDone(when, inIdent)
def checkDone(self, when, inIdent):
if isinstance(when, MDSplus.TreeNode):
nid = when.getNid()
itemId = self.tree.name+':'+str(self.shot)+':ActionStatus:'+inIdent
print('CHEKING '+itemId + ' '+ nidToPath(nid, self.tree))
status = (red.hget(itemId, nidToPath(nid, self.tree))).decode('utf8')
return status == str(ACTION_DONE)
if isinstance(when, MDSplus.Compound):
opcode = when.getOpcode()
if opcode == treeDEPENDENCY_AND or opcode == treeLOGICAL_AND:
return self.checkDone(when.getArgumentAt(0)) and self.checkDone(when.getArgumentAt(1))
if opcode == treeDEPENDENCY_OR or opcode == treeLOGICAL_OR:
return self.checkDone(when.getArgumentAt(0)) or self.checkDone(when.getArgumentAt(1))
if opcode == treeDEPENDENCY_OF or opcode == treeLOGICAL_OF:
nid = when.getArgumentAt(1).getNid()
updateMsg = when.getArgumentAt(0).data()
itemId = self.tree.name+':'+str(self.shot)+':ActionStatus:'+self.ident
statuses = str(red.hget(itemId, nidToPath(nid, self.tree)).decode('utf8')).split(':')
return len(statuses) == 2 and statuses[0] == str(ACTION_STREAMING) and statuses[1] == updateMsg
return False
def doUpdate(dispatchTable, actionNidStr, tree, inIdent):
if actionNidStr in dispatchTable.inDependency.keys():
affectedNids = dispatchTable.inDependency[actionNidStr]
toDoNids = []
actionUpdateNids = []
for affectedNid in affectedNids:
affectedNode = MDSplus.TreeNode(affectedNid, tree)
if dispatchTable.checkDispatch(affectedNode, inIdent):
# Action updates are not picked by the shared redis memory, but are always passed to the action executor
# only the action executor for the action server currently running it will delievr the action update
if isinstance(affectedNode.getData().getTask(), MDSplus.String):
actionUpdateNids.append(affectedNid)
else:
toDoNids.append(affectedNid)
if len(toDoNids) + len(actionUpdateNids) > 0:
while True:
currActionNid = pickNotYetDispatched(dispatchTable.experiment, dispatchTable.shot, dispatchTable.ident, toDoNids, tree)
if currActionNid < 0:
break
########################
currActionNode = MDSplus.TreeNode(currActionNid, dispatchTable.tree)
dispatchTable.actionExecutor.doAction(dispatchTable.ident, dispatchTable.tree, currActionNode, dispatchTable.actTimeout[currActionNid])
###########################
dispatchTable.updateMutex.acquire()
if currActionNid in dispatchTable.outDependency.keys():
for outIdent in dispatchTable.outDependency[currActionNid]:
red.publish('COMMAND:'+str(outIdent), 'UPDATE:'+nidToPath(currActionNid, tree) + ' '+ dispatchTable.ident)
#red.publish('COMMAND:'+dispatchTable.ident, 'UPDATE:'+str(currActionNid))
dispatchTable.updateMutex.release()
# do the same for action updates
for currActionNid in actionUpdateNids:
########################
currActionNode = MDSplus.TreeNode(currActionNid, dispatchTable.tree)
dispatchTable.actionExecutor.doAction(dispatchTable.ident, dispatchTable.tree, currActionNode, dispatchTable.actTimeout[currActionNid])
###########################
# action updates DO NOT TRIGGER other actions
###########################
def doPhase(dispatchTable, phase):
seqNums = []
for seqNum in dispatchTable.seqActions[phase].keys():
seqNums.append(seqNum)
seqNums.sort()
for currSeqNum in seqNums:
actionNids = []
for action in dispatchTable.seqActions[phase][currSeqNum]:
actionNids.append(action)
# actionNids = dispatchTable.seqActions[phase][currSeqNum].copy()
while True:
currActionNid = pickNotYetDispatched(dispatchTable.experiment, dispatchTable.shot, dispatchTable.ident, actionNids, dispatchTable.tree)
if currActionNid == -1: #all actions in the list dispatched but some not yet finished
while currActionNid != -2:
time.sleep(0.1)
currActionNid = pickNotYetDispatched(dispatchTable.experiment, dispatchTable.shot, dispatchTable.ident, actionNids, dispatchTable.tree)
if currActionNid == -2:
break
currActionNode = MDSplus.TreeNode(currActionNid, dispatchTable.tree)
actionNids.remove(currActionNid)
######################
dispatchTable.actionExecutor.doAction(dispatchTable.ident, dispatchTable.tree, currActionNode, dispatchTable.actTimeout[currActionNid])
###########################
dispatchTable.updateMutex.acquire()
if currActionNid in dispatchTable.outDependency.keys():
for outIdent in dispatchTable.outDependency[currActionNid]:
print('ORA PUBBLICO '+'UPDATE:'+str(currActionNid) +' A '+'COMMAND:'+outIdent)
red.publish('COMMAND:'+str(outIdent), 'UPDATE:'+nidToPath(currActionNid, dispatchTable.tree)+' '+dispatchTable.ident)
dispatchTable.updateMutex.release()
#################################################################
# aCTION STUFF
#################################################################
class ActionExecutor:
class Worker(threading.Thread):
def __init__(self, tree, node, dispatchTable):
# super().__init__()
super(ActionExecutor.Worker, self).__init__()
self.tree = tree
self.node = node
self.dispatchTable = dispatchTable
self.treename=tree.name
self.shotnumber = tree.shot
self.path = node.fullpath
def run(self):
self.tree = MDSplus.Tree(self.treename, self.shotnumber)
self.node = self.tree.getNode(self.path)
task = self.node.getData().getTask()
if isinstance(task, MDSplus.Program) or isinstance(task, MDSplus.Procedure or isinstance(task, MDSplus.Routine)):
self.tree.tcl('do '+self.path)
self.status = 1
elif isinstance(task, MDSplus.Method):
try:
print('Doing '+self.node.getFullPath())
self.status = task.getObject().doMethod(task.getMethod())
print('Done '+self.node.getFullPath())
print(self.status)
if self.status == None:
self.status = 1
else:
self.status = self.status.data()
except Exception as exc:
try:
traceback.print_exc(exc)
except:
pass
self.status = 0
else:
try:
self.status = int(task.data())
except Exception as exc:
# traceback.print_exc(exc)
self.status = 0
class StreamedWorker(threading.Thread):
def __init__(self, tree, device, actionName, actionNid, actionPath, dispatchTable, streamedWorkers, objectNid):
super().__init__()
self.tree = tree
self.device = device
self.actionName = actionName
self.actionNid = actionNid
self.actionPath = actionPath
self.dispatchTable = dispatchTable
self.streamedWorkers = streamedWorkers
self.objectNid = objectNid
self.updateAction = ''
def run(self):
updateStatus(self.tree.name, self.tree.shot, self.dispatchTable.ident, self.actionNid, self.actionPath, str(ACTION_STREAMING))
try:
status = self.device.doMethod(self.actionName+'_init')
if status == None:
status = 1
if (status % 2) == 0:
self.dispatchTable.updateMutex.acquire()
updateStatus(self.tree.name, self.tree.shot, self.dispatchTable.ident, self.actionNid, self.actionPath, ACTION_ERROR, self.tree)
self.dispatchTable.updateMutex.release()
return
if self.actionNid in self.dispatchTable.completionEvent.keys():
completionEvent = self.dispatchTable.completionEvent[self.actionNid]
else:
completionEvent = ''
while(True):
if self.updateAction != '':
try:
retDict = self.device.doMethod(self.actionName+'_'+self.updateAction)
if 'status' in retDict.keys() and retDict['status'] % 2 == 0:
self.dispatchTable.updateMutex.acquire()
updateStatus(self.tree.name, self.tree.shot, self.dispatchTable.ident, self.actionNid, self.actionPath, ACTION_ERROR)
self.dispatchTable.updateMutex.release()
return
self.updateAction = ''
except Exception as exc:
print('Error in Action update '+self.updateAction+': '+str(exc))
self.updateAction = ''
traceback.print_exception(exc)
retDict = self.device.doMethod(self.actionName+'_step')
if 'status' in retDict.keys() and retDict['status'] % 2 == 0:
self.dispatchTable.updateMutex.acquire()
updateStatus(self.tree.name, self.tree.shot, self.dispatchTable.ident, self.actionNid, self.actionPath, ACTION_ERROR)
self.dispatchTable.updateMutex.release()
return
if completionEvent != '':
MDSplus.Event.setevent(completionEvent)
if 'update' in retDict.keys():
self.dispatchTable.updateMutex.acquire()
updateStatus(self.tree.name, self.tree.shot, self.dispatchTable.ident, self.actionNid,self.actionPath, str(ACTION_STREAMING)+':'+retDict['update'], self.tree)
if self.actionNid in self.dispatchTable.outDependency.keys():
for outIdent in self.dispatchTable.outDependency[self.actionNid]:
red.publish('COMMAND:'+self.dispatchTable.ident, 'UPDATE:'+nidToPath(self.actionNid, self.tree)+' '+self.dispatchTable.ident)
self.dispatchTable.updateMutex.release()
if 'is_last' in retDict.keys() and retDict['is_last']:
try:
status = self.device.doMethod(self.actionName+'_finish')
if status == None:
status = 1
else:
status = status.data()
except:
status = 0
self.dispatchTable.updateMutex.acquire()
if (status % 2) == 0:
updateStatus(self.tree.name, self.tree.shot, self.dispatchTable.ident, self.actionNid, self.actionPath, ACTION_ERROR, self.tree)
else:
updateStatus(self.tree.name, self.tree.shot, self.dispatchTable.ident, self.actionNid, self.actionPath, ACTION_DONE, self.tree)
if self.actionNid in self.dispatchTable.outDependency.keys():
for outIdent in self.dispatchTable.outDependency[self.actionNid]:
red.publish('COMMAND:'+self.dispatchTable.ident, 'UPDATE:'+nidToPath(self.actionNid, self.tree))
if completionEvent != '':
MDSplus.Event.setevent(completionEvent)
del self.streamedWorkers[int(self.objectNid)]
self.dispatchTable.updateMutex.release()
return
except Exception as exc:
traceback.print_exc(exc)
def __init__(self, dispatchTable):
self.mutex = threading.Lock()
self.dispatchTable = dispatchTable
self.streamedWorkers = {}
def isStreamed(self, tree, actionNode):
if not isinstance(actionNode.getData().getTask(), MDSplus.Method):
return False
if actionNode.getExtendedAttribute('is_streamed') == 'yes':
return True
return False
def doAction(self, ident, tree, node, timeout = 0):
if self.isStreamed(tree, node):
self.doActionStreamed(ident, tree, node)
elif isinstance(node.getData().getTask(), MDSplus.String):
# the action update will be executed only if the target action is currently running
parentAct = node.getParent()
if parentAct.getLength() > 0 and isinstance(parentAct.getData(), MDSplus.Action) and isinstance(parentAct.getData().getTask(), MDSplus.Method):
targetActNid = (parentAct.getData().getTask().getObject()).getNid()
if targetActNid in self.streamedWorkers.keys():
self.streamedWorkers[targetActNid].updateAction = node.getData().getTask().data()
updateStatus(tree.name, tree.shot, ident, node.getNid(), node.getPath(), ACTION_DONE, tree)
# else: --Do Not update status if not found singe another instance of this ident may do the stuff
# updateStatus(tree.name, tree.shot, ident, node.getNid(), node.getPath(), ACTION_ERROR)
else:
return self.doActionNonStreamed(ident, tree, node, timeout)
def doActionStreamed(self, ident, tree, node):
task = node.getData().getTask()
worker = self.StreamedWorker(tree, task.getObject(), task.getMethod().data(), node.getNid(), node.getPath(), self.dispatchTable, self.streamedWorkers, task.getObject().getNid())
self.streamedWorkers[task.getObject().getNid()] = worker
worker.start()
def doActionNonStreamed(self, ident, tree, node, timeout = 0):
self.mutex.acquire()
worker = self.Worker(tree, node, self.dispatchTable)
self.dispatchTable.updateMutex.acquire()
updateStatus(tree.name, tree.shot, ident, node.getNid(), node.getPath(), ACTION_DOING, tree)
self.dispatchTable.updateMutex.release()
worker.start()
actionTime = 0.
abortTime = 0.5
while True:
worker.join(timeout = 0.1)
if not worker.is_alive():
self.dispatchTable.updateMutex.acquire()
if (worker.status % 2) == 0:
updateStatus(tree.name, tree.shot, ident, node.getNid(), node.getPath(), ACTION_ERROR, tree)
else:
updateStatus(tree.name, tree.shot, ident, node.getNid(), node.getPath(), ACTION_DONE, tree)
if node.getNid() in self.dispatchTable.completionEvent.keys():
MDSplus.Event.setevent(self.dispatchTable.completionEvent[node.getNid()])
self.dispatchTable.updateMutex.release()
self.mutex.release()
return worker.status
actionTime += 0.1
if timeout > 0 and actionTime > timeout:
print('TIMEOUT')
self.dispatchTable.updateMutex.acquire()
updateStatus(tree.name, tree.shot, ident, node.getNid(), node.getPath(), ACTION_TIMEOUT, tree)
self.dispatchTable.updateMutex.release()
self.mutex.release()
return 0
if actionTime > abortTime:
abortTime = actionTime + 0.5
if checkAbort(tree.name, tree.shot, ident, node.getFullPath(),tree):
print('ABORT')
self.dispatchTable.updateMutex.acquire()
updateStatus(tree.name, tree.shot, ident, node.getNid(), node.getPath(), ACTION_ABORT, tree)
self.dispatchTable.updateMutex.release()
self.mutex.release()
return 0
import sys
if len(sys.argv) != 2 and len(sys.argv) != 3:
print('usage: python dispatcher.py <server class> [redis server]')
sys.exit(0)
if len(sys.argv) == 2:
red = redis.Redis(host='localhost')
else:
red = redis.Redis(host=sys.argv[2])
act = ActionServer(sys.argv[1])
act.handleCommands()