forked from iliis/crossword
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwaitable_event.py
42 lines (31 loc) · 1.03 KB
/
waitable_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# source: https://lat.sk/2015/02/multiple-event-waiting-python-3/
import os
import select
class WaitableEvent:
    """
    Event flag backed by an OS pipe, usable inside select() loops.

    The flag is "set" while at least one byte is waiting in the pipe, so the
    read end becomes readable and wakes any select() call that is watching
    it.  The method names mirror threading.Event (wait / is_set / set /
    clear); fileno() lets the object itself be passed to select.select().

    The descriptors are released by close(), by leaving a ``with`` block, or
    as a last resort when the object is finalized.

    Note: select.select() only accepts sockets on Windows, so this class is
    limited to platforms where pipes can be polled.
    """

    # Upper bound for a single drain in clear(); far more than set() ever
    # leaves in the pipe, even when several setters race each other.
    _DRAIN_SIZE = 4096

    def __init__(self):
        """Create the pipe that carries the flag; the event starts cleared."""
        self._read_fd, self._write_fd = os.pipe()

    def __enter__(self):
        """Return the event itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close both pipe ends when the ``with`` block is left."""
        self.close()

    def wait(self, timeout=None):
        """
        Block until the event is set or *timeout* seconds have elapsed.

        timeout=None waits indefinitely, timeout=0 only polls.  Returns True
        if the event is set, False if the timeout expired first.  Raises
        ValueError if the event has been closed.
        """
        read_fd = self.fileno()
        readable, _, _ = select.select([read_fd], [], [], timeout)
        return bool(readable)

    def is_set(self):
        """Return True if the event is currently set (non-blocking poll)."""
        return self.wait(0)

    def isSet(self):
        """Legacy camelCase spelling of is_set(), kept for existing callers."""
        return self.is_set()

    def clear(self):
        """Reset the event so that subsequent wait() calls block again."""
        if self.is_set():
            # set() is a check-then-write and therefore not atomic: racing
            # setters may each have written a byte.  Drain everything that
            # is pending instead of a single byte, otherwise the event would
            # still read as set after being cleared.
            os.read(self._read_fd, self._DRAIN_SIZE)

    def set(self):
        """Set the event, waking every select() watching the read end."""
        # Only write when not already set so the pipe never fills up.
        if not self.is_set():
            os.write(self._write_fd, b'1')

    def fileno(self):
        """
        Return the FD number of the read side of the pipe, allows this object
        to be used with select.select().

        Raises ValueError if the event has been closed.
        """
        read_fd = getattr(self, "_read_fd", None)
        if read_fd is None:
            raise ValueError("operation on closed WaitableEvent")
        return read_fd

    def close(self):
        """
        Close both ends of the pipe.

        Safe to call more than once and on an instance whose __init__ never
        completed.  Raises OSError if the operating system rejects a close.
        """
        read_fd = getattr(self, "_read_fd", None)
        write_fd = getattr(self, "_write_fd", None)
        # Forget the descriptors before closing them: a later call must never
        # close the same number again, since the OS may have reused it.
        self._read_fd = self._write_fd = None
        try:
            if read_fd is not None:
                os.close(read_fd)
        finally:
            # Still release the write end when closing the read end failed.
            if write_fd is not None:
                os.close(write_fd)

    def __del__(self):
        """Release the pipe if the owner never called close()."""
        try:
            self.close()
        except OSError:
            # A finalizer cannot propagate errors; the descriptors have
            # already been forgotten, so there is nothing left to clean up.
            pass