From 6f0084be3ab0a30c30d4ce6c07e115f21844e2a6 Mon Sep 17 00:00:00 2001 From: Bruce Schubert Date: Fri, 6 Nov 2020 17:31:46 -0800 Subject: [PATCH 1/4] Work in progress on shutdown synchronization. --- callattendant/app.py | 32 ++++++++++++++++++++++++-------- callattendant/hardware/modem.py | 21 +++++++++++++++------ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/callattendant/app.py b/callattendant/app.py index 6455955..8826d0e 100755 --- a/callattendant/app.py +++ b/callattendant/app.py @@ -27,6 +27,7 @@ import queue import sqlite3 import time +import threading from pprint import pprint from shutil import copyfile @@ -51,6 +52,9 @@ def __init__(self, config): # The application-wide configuration self.config = config + # Thread synchonization object + self._stop_event = threading.Event() + # Open the database if self.config["TESTING"]: self.db = sqlite3.connect(":memory:") @@ -99,7 +103,7 @@ def handle_caller(self, caller): pprint(caller) self._caller_queue.put(caller) - def process_calls(self): + def run(self): """ Processes incoming callers by logging, screening, blocking and/or recording messages. @@ -114,14 +118,15 @@ def process_calls(self): permitted_greeting_file = permitted['greeting_file'] # Instruct the modem to start feeding calls into the caller queue - self.modem.handle_calls(self.handle_caller) + self.modem.start(self.handle_caller) # If testing, allow queue to be filled before processing for clean, readable logs if self.config["TESTING"]: time.sleep(1) # Process incoming calls - while 1: + exit_code = 0 + while not self._stop_event.isSet(): try: # Wait (blocking) for a caller print("Waiting for call...") @@ -197,11 +202,16 @@ def process_calls(self): # Answer the call! if ok_to_answer and len(actions) > 0: self.answer_call(actions, greeting, call_no, caller) - + except KeyboardInterrupt: + print("User requested shutdown") + self._stop_event.set() except Exception as e: pprint(e) - print("** Error running callattendant. Exiting.") - return 1 + print("** Error running callattendant.") + self._stop_event.set() + exit_code = 1 + print("Exiting") + return exit_code def answer_call(self, actions, greeting, call_no, caller): """ @@ -389,8 +399,14 @@ def main(argv): # Create and start the application app = CallAttendant(config) - app.process_calls() - return 0 + exit_code = 0 + try: + exit_code = app.run() + finally: + print("Stopping modem") + app.modem.stop() + print("Bye!") + return exit_code if __name__ == '__main__': diff --git a/callattendant/hardware/modem.py b/callattendant/hardware/modem.py index ae78653..2b2e1e2 100644 --- a/callattendant/hardware/modem.py +++ b/callattendant/hardware/modem.py @@ -139,8 +139,10 @@ def __init__(self, config): self.config = config self.model = None - # Thread synchronization object + # Thread synchronization objects + self._stop_event = threading.Event() self._lock = threading.RLock() + self._thread = None # Ring notifications self.ring_indicator = RingIndicator( @@ -197,7 +199,7 @@ def close_serial_port(self): print("Error: Unable to close the Serial Port.") sys.exit() - def handle_calls(self, handle_caller): + def start(self, handle_caller): """ Starts the thread that processes incoming data. :param handle_caller: @@ -205,11 +207,18 @@ def handle_calls(self, handle_caller): """ self._init_modem() - self.event_thread = threading.Thread( + self._thread = threading.Thread( target=self._call_handler, kwargs={'handle_caller': handle_caller}) - self.event_thread.name = "modem_call_handler" - self.event_thread.start() + self._thread.name = "modem_call_handler" + self._thread.start() + + def stop(self): + """ + Stops the modem thread. Called by the app. + """ + self._stop_event.set() + self._thread.join() def _call_handler(self, handle_caller): """ @@ -242,7 +251,7 @@ def _call_handler(self, handle_caller): # This loop reads incoming data from the serial port and # posts the caller data to the handle_caller function call_record = {} - while 1: + while not self._stop_event.isSet(): modem_data = b'' # Read from the modem From c696987cac72c81a29e10a02baf3f495b0b6b4a8 Mon Sep 17 00:00:00 2001 From: Bruce Schubert Date: Sat, 7 Nov 2020 10:29:11 -0800 Subject: [PATCH 2/4] Fixed comments --- callattendant/userinterface/webapp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/callattendant/userinterface/webapp.py b/callattendant/userinterface/webapp.py index 0c49720..1bec2dd 100644 --- a/callattendant/userinterface/webapp.py +++ b/callattendant/userinterface/webapp.py @@ -984,7 +984,7 @@ def get_pagination(**kwargs): def run_flask(config): ''' Runs the Flask webapp. - :param database: full path to the callattendant database file + :param config: the application-wide master config object ''' app.secret_key = get_random_string() with app.app_context(): @@ -1007,6 +1007,6 @@ def run_flask(config): def start(config): ''' Starts the Flask webapp in a separate thread. - :param database: full path to the callattendant database file + :param config: the application-wide master config object ''' _thread.start_new_thread(run_flask, (config,)) From e6caf7465b032c5b888ac4c84496f129e0d3d00c Mon Sep 17 00:00:00 2001 From: Bruce Schubert Date: Sat, 7 Nov 2020 10:29:32 -0800 Subject: [PATCH 3/4] Refactored thread handling to ensure clean a shutdown when app exits --- callattendant/app.py | 33 ++++++++++++++++++++-------- callattendant/hardware/modem.py | 22 ++++++++++++------- callattendant/messaging/voicemail.py | 20 ++++++++++++----- tests/test_modem.py | 2 +- tests/test_voicemail.py | 5 +++-- 5 files changed, 57 insertions(+), 25 deletions(-) diff --git a/callattendant/app.py b/callattendant/app.py index 8826d0e..73b917d 100755 --- a/callattendant/app.py +++ b/callattendant/app.py @@ -107,6 +107,7 @@ def run(self): """ Processes incoming callers by logging, screening, blocking and/or recording messages. + :returns: exit code 1 on error otherwise 0 """ # Get relevant config settings screening_mode = self.config['SCREENING_MODE'] @@ -126,11 +127,15 @@ def run(self): # Process incoming calls exit_code = 0 - while not self._stop_event.isSet(): + caller = {} + print("Waiting for call...") + while not self._stop_event.is_set(): try: # Wait (blocking) for a caller - print("Waiting for call...") - caller = self._caller_queue.get() + try: + caller = self._caller_queue.get(False, 3.0) + except queue.Empty: + continue # An incoming call has occurred, log it number = caller["NMBR"] @@ -202,17 +207,29 @@ def run(self): # Answer the call! if ok_to_answer and len(actions) > 0: self.answer_call(actions, greeting, call_no, caller) + + print("Waiting for next call...") + except KeyboardInterrupt: - print("User requested shutdown") + print("** User initiated shutdown") self._stop_event.set() except Exception as e: pprint(e) - print("** Error running callattendant.") + print("** Error running callattendant") self._stop_event.set() exit_code = 1 - print("Exiting") return exit_code + def shutdown(self): + print("Shutting down...") + print("-> Stopping modem") + self.modem.stop() + print("-> Stopping voice mail") + self.voice_mail.stop() + print("-> Releasing resources") + self.approved_indicator.close() + self.blocked_indicator.close() + def answer_call(self, actions, greeting, call_no, caller): """ Answer the call with the supplied actions, e.g, voice mail, @@ -403,9 +420,7 @@ def main(argv): try: exit_code = app.run() finally: - print("Stopping modem") - app.modem.stop() - print("Bye!") + app.shutdown() return exit_code diff --git a/callattendant/hardware/modem.py b/callattendant/hardware/modem.py index 2b2e1e2..f9d2fbe 100644 --- a/callattendant/hardware/modem.py +++ b/callattendant/hardware/modem.py @@ -154,7 +154,9 @@ def __init__(self, config): self._serial = serial.Serial() def open_serial_port(self): - """Detects and opens the serial port attached to the modem.""" + """ + Detects and opens the serial port attached to the modem. + """ # List all the Serial COM Ports on Raspberry Pi proc = subprocess.Popen(['ls /dev/tty[A-Za-z]*'], shell=True, stdout=subprocess.PIPE) com_ports = proc.communicate()[0] @@ -188,12 +190,14 @@ def open_serial_port(self): return False def close_serial_port(self): - """Closes the serial port attached to the modem.""" - print("Closing Serial Port") + """ + Closes the serial port attached to the modem. + """ + print("-> Closing Serial Port") try: if self._serial.isOpen(): self._serial.close() - print("Serial Port closed...") + print("-> Serial Port closed") except Exception as e: print(e) print("Error: Unable to close the Serial Port.") @@ -215,10 +219,12 @@ def start(self, handle_caller): def stop(self): """ - Stops the modem thread. Called by the app. + Stops the modem thread and releases hardware resources. """ self._stop_event.set() - self._thread.join() + if self._thread: + self._thread.join() + self.ring_indicator.close() def _call_handler(self, handle_caller): """ @@ -251,7 +257,7 @@ def _call_handler(self, handle_caller): # This loop reads incoming data from the serial port and # posts the caller data to the handle_caller function call_record = {} - while not self._stop_event.isSet(): + while not self._stop_event.is_set(): modem_data = b'' # Read from the modem @@ -317,7 +323,7 @@ def _call_handler(self, handle_caller): finally: if dev_mode: - print("Closing modem log file") + print("-> Closing modem log file") logfile.close() def pick_up(self): diff --git a/callattendant/messaging/voicemail.py b/callattendant/messaging/voicemail.py index 1c96020..a99e6ab 100644 --- a/callattendant/messaging/voicemail.py +++ b/callattendant/messaging/voicemail.py @@ -60,9 +60,10 @@ def __init__(self, db, config, modem): self.messages = Message(db, config) # Start the thread that monitors the message events and updates the indicators - self.event_thread = threading.Thread(target=self._event_handler) - self.event_thread.name = "voice_mail_event_handler" - self.event_thread.start() + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._event_handler) + self._thread.name = "voice_mail_event_handler" + self._thread.start() # Pulse the indicator if an unplayed msg is waiting self.reset_message_indicator() @@ -70,13 +71,22 @@ def __init__(self, db, config, modem): if self.config["DEBUG"]: print("VoiceMail initialized") + def stop(self): + """ + Stops the voice mail thread and releases hardware resources. + """ + self._stop_event.set() + self._thread.join() + self.message_indicator.close() + self.message_count_indicator.close() + def _event_handler(self): """ Thread function that updates the message indicators upon a message event. """ - while 1: + while not self._stop_event.is_set(): # Get the number of unread messages - if self.message_event.wait(): + if self.message_event.wait(2.0): if self.config["DEBUG"]: print("Message Event triggered") self.reset_message_indicator() diff --git a/tests/test_modem.py b/tests/test_modem.py index 0ad1227..43b70ff 100644 --- a/tests/test_modem.py +++ b/tests/test_modem.py @@ -60,7 +60,7 @@ def modem(): yield modem - modem.ring_indicator.close() + modem.stop() def test_profile_reset(modem): diff --git a/tests/test_voicemail.py b/tests/test_voicemail.py index 0b92232..d5e5ec1 100644 --- a/tests/test_voicemail.py +++ b/tests/test_voicemail.py @@ -71,14 +71,15 @@ def modem(db, config): modem = Modem(config) modem.open_serial_port() yield modem - modem.ring_indicator.close() + modem.stop() @pytest.fixture(scope='module') def voicemail(db, config, modem): voicemail = VoiceMail(db, config, modem) - return voicemail + yield voicemail + voicemail.stop() # Skip the test when running under continous integraion From 54f2e5e979ea9f387e8f79bda0d5486aad2f6650 Mon Sep 17 00:00:00 2001 From: Bruce Schubert Date: Sat, 7 Nov 2020 12:38:58 -0800 Subject: [PATCH 4/4] Fixed caller queue handling (restored blocking) --- callattendant/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/callattendant/app.py b/callattendant/app.py index 73b917d..940ce33 100755 --- a/callattendant/app.py +++ b/callattendant/app.py @@ -133,7 +133,7 @@ def run(self): try: # Wait (blocking) for a caller try: - caller = self._caller_queue.get(False, 3.0) + caller = self._caller_queue.get(True, 3.0) except queue.Empty: continue