diff --git a/callattendant/hardware/modem.py b/callattendant/hardware/modem.py index aac7747..4fcb223 100644 --- a/callattendant/hardware/modem.py +++ b/callattendant/hardware/modem.py @@ -3,7 +3,7 @@ # # file: modem.py # -# Copyright 2018 Bruce Schubert +# Copyright 2018-2020 Bruce Schubert # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -57,21 +57,29 @@ # Modem AT commands: # See http://support.usr.com/support/5637/5637-ug/ref_data.html RESET = "ATZ" -FACTORY_RESET = "ATZ3" +RESET_PROFILE = "ATZ0" GET_MODEM_PRODUCT_CODE = "ATI0" -DISPLAY_MODEM_SETTINGS = "ATI4" -ENABLE_ECHO_COMMANDS = "ATE1" +GET_MODEM_SETTINGS = "AT&V" DISABLE_ECHO_COMMANDS = "ATE0" +ENABLE_ECHO_COMMANDS = "ATE1" ENABLE_FORMATTED_CID = "AT+VCID=1" ENABLE_VERBOSE_CODES = "ATV1" DISABLE_SILENCE_DETECTION = "AT+VSD=128,0" +DISABLE_SILENCE_DETECTION_ZOOM = "AT+VSD=0,0" ENABLE_SILENCE_DETECTION_5_SECS = "AT+VSD=128,50" +ENABLE_SILENCE_DETECTION_5_SECS_ZOOM = "AT+VSD=0,50" ENABLE_SILENCE_DETECTION_10_SECS = "AT+VSD=128,100" +ENABLE_SILENCE_DETECTION_10_SECS_ZOOM = "AT+VSD=0,100" ENTER_VOICE_MODE = "AT+FCLASS=8" -ENTER_TELEPHONE_ANSWERING_DEVICE_OFF_HOOK = "AT+VLS=1" # DCE off-hook, connected to telco. -ENTER_VOICE_TRANSMIT_DATA_STATE = "AT+VTX" ENTER_VOICE_RECIEVE_DATA_STATE = "AT+VRX" -SET_VOICE_COMPRESSION_8BIT_SAMPLING_8K = "AT+VSM=128,8000" # 128 = 8-bit linear, 8.0 kHz +ENTER_VOICE_TRANSMIT_DATA_STATE = "AT+VTX" +SEND_VOICE_TONE_BEEP = "AT+VTS=[933,900,120]" # 1.2 second beep +GET_VOICE_COMPRESSION_SETTING = "AT+VSM?" +GET_VOICE_COMPRESSION_OPTIONS = "AT+VSM=?" +SET_VOICE_COMPRESSION = "AT+VSM=128,8000" # USR 5637: 128 = 8-bit linear, 8.0 kHz +SET_VOICE_COMPRESSION_ZOOM = "AT+VSM=1,8000,0,0" # Zoom 3095: 1 = 8-bit unsigned pcm, 8.0 kHz +TELEPHONE_ANSWERING_DEVICE_OFF_HOOK = "AT+VLS=1" # TAD (DCE) off-hook, connected to telco +TELEPHONE_ANSWERING_DEVICE_ON_HOOK = "AT+VLS=0" # TAD (DCE) on-hook GO_OFF_HOOK = "ATH1" GO_ON_HOOK = "ATH0" TERMINATE_CALL = "ATH" @@ -82,17 +90,23 @@ DCE_FAX_CALLING_TONE = (chr(16) + chr(99)).encode() # -c DCE_DIAL_TONE = (chr(16) + chr(100)).encode() # -d DCE_DATA_CALLING_TONE = (chr(16) + chr(101)).encode() # -e +DCE_LINE_REVERSAL = (chr(16) + chr(108)).encode() # -l DCE_PHONE_ON_HOOK = (chr(16) + chr(104)).encode() # -h DCE_PHONE_OFF_HOOK = (chr(16) + chr(72)).encode() # -H +DCE_PHONE_OFF_HOOK2 = (chr(16) + chr(80)).encode() # -P Zoom +DCE_QUIET_DETECTED = (chr(16) + chr(113)).encode() # -q Zoom DCE_RING = (chr(16) + chr(82)).encode() # -R DCE_SILENCE_DETECTED = (chr(16) + chr(115)).encode() # -s +DCE_TX_BUFFER_UNDERRUN = (chr(16) + chr(117)).encode() # -u DCE_END_VOICE_DATA_TX = (chr(16) + chr(3)).encode() # -# System DLE shielded codes - DTE to DCE commands -DTE_RAISE_VOLUME = (chr(16) + chr(117)) # -u -DTE_LOWER_VOLUME = (chr(16) + chr(100)) # -d -DTE_END_VOICE_DATA_TX = (chr(16) + chr(3)) # -DTE_END_RECIEVE_DATA_STATE = (chr(16) + chr(33)) # -! +# System DLE shielded codes (single DLE) - DTE to DCE commands (used by USR 5637 modem) +DTE_RAISE_VOLUME = (chr(16) + chr(117)) # -u +DTE_LOWER_VOLUME = (chr(16) + chr(100)) # -d +DTE_END_VOICE_DATA_RX = (chr(16) + chr(33)) # -! +DTE_END_VOICE_DATA_RX2 = (chr(16) + chr(94)) # -^ Zoom +DTE_END_VOICE_DATA_TX = (chr(16) + chr(3)) # +DTE_CLEAR_TRANSMIT_BUFFER = (chr(16) + chr(24)) # # Return codes CRLF = (chr(13) + chr(10)).encode() @@ -102,9 +116,10 @@ TEST_DATA = [ b"RING", b"DATE=0801", b"TIME=1801", b"NMBR=8055554567", b"NAME=Test1 - Permitted", b"RING", b"RING", b"RING", b"RING", - b"RING", b"DATE=0801", b"TIME=1800", b"NMBR=5551234567", b"NAME=Test2 - Spammer", - b"RING", b"DATE=0801", b"TIME=1802", b"NMBR=3605554567", b"NAME=Test3 - Blocked", - b"RING", b"DATE=0801", b"TIME=1802", b"NMBR=8005554567", b"NAME=V123456789012345", + b"RING", b"DATE=0802", b"TIME=1802", b"NMBR=5551234567", b"NAME=Test2 - Spammer", + b"RING", b"DATE=0803", b"TIME=1803", b"NMBR=3605554567", b"NAME=Test3 - Blocked", + b"RING", b"DATE=0804", b"TIME=1804", b"NMBR=8005554567", b"NAME=V123456789012345", + b"RING", b"DATE = 0805", b"TIME = 1805", b"NMBR = 8055554567", b"NAME = Test5 - Permitted", ] @@ -306,8 +321,8 @@ def pick_up(self): if not self._send(DISABLE_SILENCE_DETECTION): raise RuntimeError("Failed to disable silence detection.") - if not self._send(ENTER_TELEPHONE_ANSWERING_DEVICE_OFF_HOOK): - raise RuntimeError("Unable put modem into TAD mode.") + if not self._send(TELEPHONE_ANSWERING_DEVICE_OFF_HOOK): + raise RuntimeError("Unable put modem into telephone answering device mode.") # Flush any existing input outout data from the buffers # self._serial.flushInput() @@ -359,8 +374,8 @@ def hang_up(self): def play_audio(self, audio_file_name): """ Play the given audio file. - :param audio_file_name: a wav file with 8-bit linear - compression recored at 8.0 kHz sampling rate + :param audio_file_name: + a wav file with 8-bit linear compression recored at 8.0 kHz sampling rate """ if self.config["DEBUG"]: print("> Playing {}...".format(audio_file_name)) @@ -372,39 +387,36 @@ def play_audio(self, audio_file_name): if not self._send(ENTER_VOICE_MODE): print("* Error: Failed to put modem into voice mode.") return False - if not self._send(SET_VOICE_COMPRESSION_8BIT_SAMPLING_8K): + if not self._send(SET_VOICE_COMPRESSION): print("* Error: Failed to set compression method and sampling rate specifications.") return False - if not self._send(ENTER_TELEPHONE_ANSWERING_DEVICE_OFF_HOOK): - print("* Error: Unable put modem into TAD mode.") + if not self._send(TELEPHONE_ANSWERING_DEVICE_OFF_HOOK): + print("* Error: Unable put modem into telephone answering device mode.") return False if not self._send(ENTER_VOICE_TRANSMIT_DATA_STATE, "CONNECT"): - print("* Error: Unable put modem into TAD data transmit state.") + print("* Error: Unable put modem into voice data transmit state.") return False # Play Audio File - with wave.open(audio_file_name, 'rb') as wf: + with wave.open(audio_file_name, 'rb') as wavefile: sleep_interval = .12 # 120ms; You may need to change to smooth-out audio chunk = 1024 - data = wf.readframes(chunk) + data = wavefile.readframes(chunk) while data != b'': self._serial.write(data) - data = wf.readframes(chunk) - time.sleep(sleep_interval) + data = wavefile.readframes(chunk) + # ~ time.sleep(sleep_interval) - # self._serial.flushInput() - # self._serial.flushOutput() - - self._send(DTE_END_VOICE_DATA_TX) + self._send(DTE_END_VOICE_DATA_TX) return True def record_audio(self, audio_file_name): """ Records audio from the model to the given audio file. - :param audio_file_name: the wav file to be created with the - recorded audio; recored with 8-bit linear compression - at 8.0 kHz sampling rate + :param audio_file_name: + the wav file to be created with the recorded audio; + recorded with 8-bit linear compression at 8.0 kHz sampling rate """ if self.config["DEBUG"]: print("> Recording {}...".format(audio_file_name)) @@ -417,20 +429,16 @@ def record_audio(self, audio_file_name): if not self._send(ENTER_VOICE_MODE): raise RuntimeError("Failed to put modem into voice mode.") - if not self._send("AT+VGT=128"): - raise RuntimeError("Failed to set speaker volume to normal.") - - if not self._send(SET_VOICE_COMPRESSION_8BIT_SAMPLING_8K): + if not self._send(SET_VOICE_COMPRESSION): raise RuntimeError("Failed to set compression method and sampling rate specifications.") if not self._send(DISABLE_SILENCE_DETECTION): raise RuntimeError("Failed to disable silence detection.") - if not self._send(ENTER_TELEPHONE_ANSWERING_DEVICE_OFF_HOOK): - raise RuntimeError("Unable put modem into TAD mode.") + if not self._send(TELEPHONE_ANSWERING_DEVICE_OFF_HOOK): + raise RuntimeError("Unable put modem (TAD) off hook.") - # Play 1.2 beep - if not self._send("AT+VTS=[933,900,120]"): + if not self._send(SEND_VOICE_TONE_BEEP): raise RuntimeError("Failed to play 1.2 second beep.") if not self._send(ENABLE_SILENCE_DETECTION_5_SECS): @@ -452,29 +460,34 @@ def record_audio(self, audio_file_name): audio_data = self._serial.read(CHUNK) + # Check if is in the stream + if (DCE_END_VOICE_DATA_TX in audio_data): + print(">> Char Recieved... Stop recording.") + break + # Check if s is in the stream + if (DCE_SILENCE_DETECTED in audio_data): + print(">> Silence Detected... Stop recording.") + break + # Check if q is in the stream + if (DCE_QUIET_DETECTED in audio_data): + print(">> Silence Detected... Stop recording.") + break + # Check if H is in the stream if (DCE_PHONE_OFF_HOOK in audio_data): print(">> Local phone off hook... Stop recording") break - - if (DCE_RING in audio_data): - print(">> Ring detected... Stop recording; new call coming in") - break - + # ~ # Check if P is in the stream + # ~ if (DCE_PHONE_OFF_HOOK2 in audio_data): + # ~ print(">> Local extension off hook... Stop recording") + # ~ break + # ~ # Check if l is in the stream + # ~ if (DCE_LINE_REVERSAL in audio_data): + # ~ print(">> Local phone off hook... Stop recording") + # ~ break # Check if b is in the stream if (DCE_BUSY_TONE in audio_data): print(">> Busy Tone... Stop recording.") break - - # Check if s is in the stream - if (DCE_SILENCE_DETECTED in audio_data): - print(">> Silence Detected... Stop recording.") - break - - # Check if is in the stream - if (DCE_END_VOICE_DATA_TX in audio_data): - print(">> Char Recieved... Stop recording.") - break - # Timeout if ((datetime.now() - start_time).seconds) > REC_VM_MAX_DURATION: print(">> Stop recording: max time limit reached.") @@ -489,6 +502,7 @@ def record_audio(self, audio_file_name): wf.setsampwidth(1) wf.setframerate(8000) wf.writeframes(b''.join(audio_frames)) + print(">> Recording stopped after {} seconds".format((datetime.now() - start_time).seconds)) # Clear input buffer before sending commands else its @@ -496,9 +510,10 @@ def record_audio(self, audio_file_name): self._serial.reset_input_buffer() # Send End of Recieve Data state by passing "!" - # Note: the command returns , but the DLE is stripped + # USR-5637 note: The command returns , but the DLE is stripped # from the response during the test, so we only test for the ETX. - if not self._send(DTE_END_RECIEVE_DATA_STATE, ETX_CODE): + response = lambda model: "OK" if model == "ZOOM" else ETX_CODE + if not self._send(DTE_END_VOICE_DATA_RX, response(self.model)): print("* Error: Unable to signal end of data receive state") return True @@ -506,8 +521,10 @@ def record_audio(self, audio_file_name): def wait_for_keypress(self, wait_time_secs=15): """ Waits n seconds for a key-press. - :params wait_time_secs: the number of seconds to wait for a keypress - :return: success (bool), key-press value (str) + :params wait_time_secs: + the number of seconds to wait for a keypress + :return: + success (bool), key-press value (str) """ print("> Waiting for key-press...") @@ -523,8 +540,8 @@ def wait_for_keypress(self, wait_time_secs=15): if not self._send(ENABLE_SILENCE_DETECTION_10_SECS): raise RuntimeError("Failed to enable silence detection.") - if not self._send(ENTER_TELEPHONE_ANSWERING_DEVICE_OFF_HOOK): - raise RuntimeError("Unable put modem into TAD mode.") + if not self._send(TELEPHONE_ANSWERING_DEVICE_OFF_HOOK): + raise RuntimeError("Unable put modem into Telephone Answering Device mode.") # Wait for keypress start_time = datetime.now() @@ -602,12 +619,12 @@ def _send_and_read(self, command, expected_response="OK", response_timeout=5): self._serial.write((command + '\r').encode()) self._serial.flush() # Get the execution status plus any preceeding result(s) from the modem - success, result = self._read_response(expected_response, response_timeout) + success, result = self._read_response(expected_response, response_timeout) return (success, result) except Exception as e: print(e) print("Error: Failed to execute the command: {}".format(command)) - return False, None + return False, None def _read_response(self, expected_response, response_timeout_secs): """ @@ -633,7 +650,7 @@ def _read_response(self, expected_response, response_timeout_secs): pprint(modem_data) response = decode(modem_data) # strips DLE_CODE - if expected_response == None: + if expected_response is None: return (True, None) elif expected_response == response: @@ -652,30 +669,61 @@ def _read_response(self, expected_response, response_timeout_secs): except Exception as e: print("Error in read_response function...") print(e) - return (False, None) + return (False, None) + + def _init_serial_port(self, com_port): + """Initializes the given COM port for communications with the modem.""" + self._serial.port = com_port + self._serial.baudrate = 57600 # 9600 + self._serial.bytesize = serial.EIGHTBITS # number of bits per bytes + self._serial.parity = serial.PARITY_NONE # set parity check: no parity + self._serial.stopbits = serial.STOPBITS_ONE # number of stop bits + self._serial.timeout = 3 # non-block read + self._serial.writeTimeout = 3 # timeout for write + self._serial.xonxoff = False # disable software flow control + self._serial.rtscts = False # disable hardware (RTS/CTS) flow control + self._serial.dsrdtr = False # disable hardware (DSR/DTR) flow control def _detect_modem(self): - global SET_VOICE_COMPRESSION, ENABLE_SILENCE_DETECTION_5_SECS, \ - DTE_RAISE_VOLUME, DTE_LOWER_VOLUME, DTE_END_VOICE_DATA_TX, \ - DTE_END_VOICE_DATA_RX, DTE_CLEAR_TRASMIT_BUFFER + global SET_VOICE_COMPRESSION, DISABLE_SILENCE_DETECTION, \ + ENABLE_SILENCE_DETECTION_5_SECS, ENABLE_SILENCE_DETECTION_10_SECS, \ + DTE_RAISE_VOLUME, DTE_LOWER_VOLUME, DTE_END_VOICE_DATA_TX, \ + DTE_END_VOICE_DATA_RX, DTE_CLEAR_TRANSMIT_BUFFER # Attempt to identify the modem success, result = self._send_and_read(GET_MODEM_PRODUCT_CODE) + if success: if USR_5637_PRODUCT_CODE in result: print("******* US Robotics Model 5637 detected **********") self.model = "USR" + elif ZOOM_3905_PRODUCT_CODE in result: + print("******* Zoom Model 3905 Detected **********") + self.model = "ZOOM" + # Define the settings for the Zoom3905 where they differ from the USR5637 + SET_VOICE_COMPRESSION = SET_VOICE_COMPRESSION_ZOOM + DISABLE_SILENCE_DETECTION = DISABLE_SILENCE_DETECTION_ZOOM + ENABLE_SILENCE_DETECTION_5_SECS = ENABLE_SILENCE_DETECTION_5_SECS_ZOOM + ENABLE_SILENCE_DETECTION_10_SECS = ENABLE_SILENCE_DETECTION_10_SECS_ZOOM + # System DLE shielded codes (double DLE) - DTE to DCE commands + DTE_RAISE_VOLUME = (chr(16) + chr(16) + chr(117)) # -u + DTE_LOWER_VOLUME = (chr(16) + chr(16) + chr(100)) # -d + DTE_END_VOICE_DATA_RX = (chr(16) + chr(16) + chr(16) + chr(33)) # -! + DTE_END_VOICE_DATA_TX = (chr(16) + chr(16) + chr(16) + chr(3)) # + DTE_CLEAR_TRANSMIT_BUFFER = (chr(16) + chr(16) + chr(16) + chr(24)) # + else: print("******* Unknown modem detected **********") - # We'll try to use it with the defined AT commands if it supports VOICE mode - # Validate modem selection by trying to put it in Voice Mode + # We'll try to use it with the predefined AT commands if it supports VOICE mode. if self._send(ENTER_VOICE_MODE): self.model = "UNKNOWN" + # Use the default settings (used by the USR 5637 modem) else: print("Error: Failed to put modem into voice mode.") success = False + return success def _init_modem(self): @@ -697,7 +745,7 @@ def _init_modem(self): # Test Modem connection, using basic AT command. if not self._send("AT"): print("Error: Unable to access the Modem") - if not self._send(FACTORY_RESET): + if not self._send(RESET): print("Error: Unable reset to factory default") if not self._send(ENABLE_VERBOSE_CODES): print("Error: Unable set response in verbose form") @@ -710,7 +758,7 @@ def _init_modem(self): if not self._send("AT&W0"): print("Error: Failed to store profile.") - self._send(DISPLAY_MODEM_SETTINGS) + self._send(GET_MODEM_SETTINGS) # Flush any existing input outout data from the buffers self._serial.flushInput() @@ -724,21 +772,9 @@ def _init_modem(self): print("Error: unable to Initialize the Modem") sys.exit() - def _init_serial_port(self, com_port): - """Initializes the given COM port for communications with the modem.""" - self._serial.port = com_port - self._serial.baudrate = 57600 # 9600 - self._serial.bytesize = serial.EIGHTBITS # number of bits per bytes - self._serial.parity = serial.PARITY_NONE # set parity check: no parity - self._serial.stopbits = serial.STOPBITS_ONE # number of stop bits - self._serial.timeout = 3 # non-block read - self._serial.writeTimeout = 3 # timeout for write - self._serial.xonxoff = False # disable software flow control - self._serial.rtscts = False # disable hardware (RTS/CTS) flow control - self._serial.dsrdtr = False # disable hardware (DSR/DTR) flow control - def decode(bytestr): # Remove non-printable chars before decoding. - string = re.sub(b'[^\x00-\x7f]', b'', bytestr).decode("utf-8").strip(' \t\n\r' + DLE_CODE) + # ~ string = re.sub(b'[^\x00-\x7f]', b'', bytestr).decode("utf-8").strip(' \t\n\r' + DLE_CODE) + string = bytestr.decode("utf-8", "ignore").strip(' \t\n\r' + DLE_CODE) return string diff --git a/tests/test_modem.py b/tests/test_modem.py index 8b75b9e..b8ecdc8 100644 --- a/tests/test_modem.py +++ b/tests/test_modem.py @@ -23,6 +23,9 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. + +global SET_VOICE_COMPRESSION + import os import sys import tempfile @@ -32,10 +35,13 @@ import pytest from callattendant.config import Config -from callattendant.hardware.modem import Modem, FACTORY_RESET, RESET, DISPLAY_MODEM_SETTINGS, \ - ENTER_VOICE_MODE, SET_VOICE_COMPRESSION_8BIT_SAMPLING_8K, ENTER_TELEPHONE_ANSWERING_DEVICE_OFF_HOOK, \ - ENTER_VOICE_TRANSMIT_DATA_STATE, DTE_END_VOICE_DATA_TX, ENTER_VOICE_RECIEVE_DATA_STATE, \ - DTE_END_RECIEVE_DATA_STATE, TERMINATE_CALL, ETX_CODE +from callattendant.hardware.modem import Modem, RESET, \ + GET_MODEM_PRODUCT_CODE, GET_MODEM_SETTINGS, \ + ENTER_VOICE_MODE, TELEPHONE_ANSWERING_DEVICE_OFF_HOOK, \ + ENTER_VOICE_TRANSMIT_DATA_STATE, DTE_END_VOICE_DATA_TX, \ + ENTER_VOICE_RECIEVE_DATA_STATE, DTE_END_VOICE_DATA_RX, \ + TERMINATE_CALL, ETX_CODE, DLE_CODE, \ + SET_VOICE_COMPRESSION, SET_VOICE_COMPRESSION_ZOOM # Skip the test when running under continous integraion pytestmark = pytest.mark.skipif(os.getenv("CI")=="true", reason="Hardware not installed") @@ -58,16 +64,12 @@ def modem(): modem.ring_indicator.close() -def test_factory_reset(modem): - assert modem._send(FACTORY_RESET) - - def test_profile_reset(modem): assert modem._send(RESET) -def test_display_modem_settings(modem): - assert modem._send(DISPLAY_MODEM_SETTINGS) +def test_get_modem_settings(modem): + assert modem._send(GET_MODEM_SETTINGS) def test_put_modem_into_voice_mode(modem): @@ -75,11 +77,13 @@ def test_put_modem_into_voice_mode(modem): def test_set_compression_method_and_sampling_rate_specifications(modem): - assert modem._send(SET_VOICE_COMPRESSION_8BIT_SAMPLING_8K) + assert modem._send( + SET_VOICE_COMPRESSION_ZOOM if modem.model == "ZOOM" else SET_VOICE_COMPRESSION + ) def test_put_modem_into_TAD_mode(modem): - assert modem._send(ENTER_TELEPHONE_ANSWERING_DEVICE_OFF_HOOK) + assert modem._send(TELEPHONE_ANSWERING_DEVICE_OFF_HOOK) def test_put_modem_into_voice_transmit_data_state(modem): @@ -94,8 +98,9 @@ def test_put_modem_into_voice_recieve_data_state(modem): assert modem._send(ENTER_VOICE_RECIEVE_DATA_STATE, "CONNECT") -def test_cancel_data_transmit_state(modem): - assert modem._send(DTE_END_RECIEVE_DATA_STATE, ETX_CODE) +def test_cancel_data_receive_state(modem): + response = lambda model: "OK" if model == "ZOOM" else ETX_CODE + assert modem._send(DTE_END_VOICE_DATA_RX, response(modem.model)) def test_terminate_call(modem):