-
Notifications
You must be signed in to change notification settings - Fork 0
/
threadQueueProvider.py
73 lines (60 loc) · 1.84 KB
/
threadQueueProvider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import threading, Queue, random, time
class ThreadQueueProvider(threading.Thread):
def __init__(self, maxSize=1, valueFunction=None):
threading.Thread.__init__(self)
self.maxSize = maxSize
self.queue = Queue.Queue(maxsize=self.maxSize)
self._isRunning = False
self.flushQueue = True
self.queuePushTimeout = 1
if valueFunction is not None:
self._getVal = valueFunction
def run(self):
"""Thread start override"""
print "Starting ThreadQueueProvider"
self._isRunning = True
self._initQueue()
self._loop()
if self.flushQueue:
self._flushQueue()
#
# Internal Methods
#
def _initQueue(self):
"""stuff queue full of valid vals so it provides"""
for i in range(self.maxSize):
try:
self.queue.put(self._getVal(), False)
except Queue.Full: #ignore full exception, just stuff it (this shouldnt ever get hit)
pass
def _loop(self):
nextVal = self._getVal()
while self._isRunning:
try:
self.queue.put(nextVal, True, self.queuePushTimeout)
except Queue.Full:
pass
else:
nextVal = self._getVal()
def _flushQueue(self):
"""stuff queue full of empty vals to avoid blocking queue listening on a dead thread"""
for i in range(self.maxSize):
try:
self.queue.put(0, False)
except Queue.Full: #ignore full exception, just stuff it
pass
def _getVal(self):
"""Internal method to be overwritten in subclass, or re-set using init arg valueFunction"""
return None
#
# External Methods
#
def getVal(self, block=True, timeout=None):
"""
Gets the latest value from the Queue and returns it, saves requiring queue referencing in other thread
Allows similar arguments as Queue.get()
"""
return self.queue.get(block, timeout)
def close(self):
"""Close the running of the thread - in its own time, which may be up to self.queuePushTimeout seconds long"""
self._isRunning = False