Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored threads to ensure clean shutdown #125

Merged
merged 4 commits into from
Nov 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions callattendant/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import queue
import sqlite3
import time
import threading
from pprint import pprint
from shutil import copyfile

Expand All @@ -51,6 +52,9 @@ def __init__(self, config):
# The application-wide configuration
self.config = config

# Thread synchonization object
self._stop_event = threading.Event()

# Open the database
if self.config["TESTING"]:
self.db = sqlite3.connect(":memory:")
Expand Down Expand Up @@ -99,10 +103,11 @@ def handle_caller(self, caller):
pprint(caller)
self._caller_queue.put(caller)

def process_calls(self):
def run(self):
"""
Processes incoming callers by logging, screening, blocking
and/or recording messages.
:returns: exit code 1 on error otherwise 0
"""
# Get relevant config settings
screening_mode = self.config['SCREENING_MODE']
Expand All @@ -114,18 +119,23 @@ def process_calls(self):
permitted_greeting_file = permitted['greeting_file']

# Instruct the modem to start feeding calls into the caller queue
self.modem.handle_calls(self.handle_caller)
self.modem.start(self.handle_caller)

# If testing, allow queue to be filled before processing for clean, readable logs
if self.config["TESTING"]:
time.sleep(1)

# Process incoming calls
while 1:
exit_code = 0
caller = {}
print("Waiting for call...")
while not self._stop_event.is_set():
try:
# Wait (blocking) for a caller
print("Waiting for call...")
caller = self._caller_queue.get()
try:
caller = self._caller_queue.get(True, 3.0)
except queue.Empty:
continue

# An incoming call has occurred, log it
number = caller["NMBR"]
Expand Down Expand Up @@ -198,10 +208,27 @@ def process_calls(self):
if ok_to_answer and len(actions) > 0:
self.answer_call(actions, greeting, call_no, caller)

print("Waiting for next call...")

except KeyboardInterrupt:
print("** User initiated shutdown")
self._stop_event.set()
except Exception as e:
pprint(e)
print("** Error running callattendant. Exiting.")
return 1
print("** Error running callattendant")
self._stop_event.set()
exit_code = 1
return exit_code

def shutdown(self):
print("Shutting down...")
print("-> Stopping modem")
self.modem.stop()
print("-> Stopping voice mail")
self.voice_mail.stop()
print("-> Releasing resources")
self.approved_indicator.close()
self.blocked_indicator.close()

def answer_call(self, actions, greeting, call_no, caller):
"""
Expand Down Expand Up @@ -389,8 +416,12 @@ def main(argv):

# Create and start the application
app = CallAttendant(config)
app.process_calls()
return 0
exit_code = 0
try:
exit_code = app.run()
finally:
app.shutdown()
return exit_code


if __name__ == '__main__':
Expand Down
37 changes: 26 additions & 11 deletions callattendant/hardware/modem.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ def __init__(self, config):
self.config = config
self.model = None

# Thread synchronization object
# Thread synchronization objects
self._stop_event = threading.Event()
self._lock = threading.RLock()
self._thread = None

# Ring notifications
self.ring_indicator = RingIndicator(
Expand All @@ -152,7 +154,9 @@ def __init__(self, config):
self._serial = serial.Serial()

def open_serial_port(self):
"""Detects and opens the serial port attached to the modem."""
"""
Detects and opens the serial port attached to the modem.
"""
# List all the Serial COM Ports on Raspberry Pi
proc = subprocess.Popen(['ls /dev/tty[A-Za-z]*'], shell=True, stdout=subprocess.PIPE)
com_ports = proc.communicate()[0]
Expand Down Expand Up @@ -186,30 +190,41 @@ def open_serial_port(self):
return False

def close_serial_port(self):
"""Closes the serial port attached to the modem."""
print("Closing Serial Port")
"""
Closes the serial port attached to the modem.
"""
print("-> Closing Serial Port")
try:
if self._serial.isOpen():
self._serial.close()
print("Serial Port closed...")
print("-> Serial Port closed")
except Exception as e:
print(e)
print("Error: Unable to close the Serial Port.")
sys.exit()

def handle_calls(self, handle_caller):
def start(self, handle_caller):
"""
Starts the thread that processes incoming data.
:param handle_caller:
A callback function that takes a caller dict object.
"""
self._init_modem()

self.event_thread = threading.Thread(
self._thread = threading.Thread(
target=self._call_handler,
kwargs={'handle_caller': handle_caller})
self.event_thread.name = "modem_call_handler"
self.event_thread.start()
self._thread.name = "modem_call_handler"
self._thread.start()

def stop(self):
"""
Stops the modem thread and releases hardware resources.
"""
self._stop_event.set()
if self._thread:
self._thread.join()
self.ring_indicator.close()

def _call_handler(self, handle_caller):
"""
Expand Down Expand Up @@ -242,7 +257,7 @@ def _call_handler(self, handle_caller):
# This loop reads incoming data from the serial port and
# posts the caller data to the handle_caller function
call_record = {}
while 1:
while not self._stop_event.is_set():
modem_data = b''

# Read from the modem
Expand Down Expand Up @@ -308,7 +323,7 @@ def _call_handler(self, handle_caller):

finally:
if dev_mode:
print("Closing modem log file")
print("-> Closing modem log file")
logfile.close()

def pick_up(self):
Expand Down
20 changes: 15 additions & 5 deletions callattendant/messaging/voicemail.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,33 @@ def __init__(self, db, config, modem):
self.messages = Message(db, config)

# Start the thread that monitors the message events and updates the indicators
self.event_thread = threading.Thread(target=self._event_handler)
self.event_thread.name = "voice_mail_event_handler"
self.event_thread.start()
self._stop_event = threading.Event()
self._thread = threading.Thread(target=self._event_handler)
self._thread.name = "voice_mail_event_handler"
self._thread.start()

# Pulse the indicator if an unplayed msg is waiting
self.reset_message_indicator()

if self.config["DEBUG"]:
print("VoiceMail initialized")

def stop(self):
"""
Stops the voice mail thread and releases hardware resources.
"""
self._stop_event.set()
self._thread.join()
self.message_indicator.close()
self.message_count_indicator.close()

def _event_handler(self):
"""
Thread function that updates the message indicators upon a message event.
"""
while 1:
while not self._stop_event.is_set():
# Get the number of unread messages
if self.message_event.wait():
if self.message_event.wait(2.0):
if self.config["DEBUG"]:
print("Message Event triggered")
self.reset_message_indicator()
Expand Down
4 changes: 2 additions & 2 deletions callattendant/userinterface/webapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ def get_pagination(**kwargs):
def run_flask(config):
'''
Runs the Flask webapp.
:param database: full path to the callattendant database file
:param config: the application-wide master config object
'''
app.secret_key = get_random_string()
with app.app_context():
Expand All @@ -1007,6 +1007,6 @@ def run_flask(config):
def start(config):
'''
Starts the Flask webapp in a separate thread.
:param database: full path to the callattendant database file
:param config: the application-wide master config object
'''
_thread.start_new_thread(run_flask, (config,))
2 changes: 1 addition & 1 deletion tests/test_modem.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def modem():

yield modem

modem.ring_indicator.close()
modem.stop()


def test_profile_reset(modem):
Expand Down
5 changes: 3 additions & 2 deletions tests/test_voicemail.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ def modem(db, config):
modem = Modem(config)
modem.open_serial_port()
yield modem
modem.ring_indicator.close()
modem.stop()


@pytest.fixture(scope='module')
def voicemail(db, config, modem):

voicemail = VoiceMail(db, config, modem)
return voicemail
yield voicemail
voicemail.stop()


# Skip the test when running under continous integraion
Expand Down