Skip to content

Commit

Permalink
Merge pull request #11 from andreas-amlabs/nad_telnet_filter_response
Browse files Browse the repository at this point in the history
Wait for the requested response
  • Loading branch information
joopert authored Jan 20, 2018
2 parents f80ffba + f4d3fdf commit 4f72e83
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# nad_receiver
Simple python API to connect to NAD receivers through the RS232 interface, and to the NAD D 7050 amplifier via tcp/ip. For the RS232 interface use the NADReceiver class, for the D 7050 use the D7050 class.
Simple python API to connect to NAD receivers through the RS232 interface, Telnet interface or the TCP/IP interface.
For the RS232 interface use the NADReceiver class
For the Telnet interface use the NADReceiverTelnet class
For the TCP/IP interface use the D7050 class

Note that the RS232 interface is only tested with the NAD T748v2. Commands are implemented based on the T748v2. Those commands should work with more NAD receivers.
The Telnet interface is only tested with the NAD T787. The Telnet interface share documentation with the RS232 interface and supports the same commands
The supported commands can easily be extended for receivers which support more commands.

For more information see the official documentation at http://nadelectronics.com/software
Expand All @@ -26,6 +30,13 @@ D7050.select_source('Optical 1')
D7050.mute()
D7050.unmute()
D7050.power_off()
receiver = NADReceiverTelnet(my_nad.local)
receiver.main_volume('+') # will increase volume with 1 and return new value
receiver.main_volume('-') # will decrease volume with 1 and return new value
receiver.main_volume('=', '-40') # specify dB, will return new value
print(receiver.main_volume('?')) # will return current value
```

supported commands with supported operators for the RS232 interface
Expand Down
72 changes: 55 additions & 17 deletions nad_receiver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def mute(self):
def unmute(self):
"""Unmute the device."""
self._send(self.CMD_UNMUTE)

def select_source(self, source):
"""Select a source from the list of sources."""
status = self.status()
Expand All @@ -267,20 +267,39 @@ class NADReceiverTelnet(NADReceiver):
Known supported model: Nad T787.
"""
def _open_connection(self):
if not self.telnet:
try:
self.telnet = telnetlib.Telnet(self.host, self.port, 3)
# Some versions of the firmware report Main.Model=T787.
# some versions do not, we want to clear that line
self.telnet.read_until('\n'.encode(), self.timeout)
# Could raise eg. EOFError, UnicodeError
except:
return False

return True

def _close_connection(self):
"""
Close any telnet session
"""
if self.telnet:
self.telnet.close()

def __init__(self, host, port=23, timeout=DEFAULT_TIMEOUT):
"""Create Telnet connection."""
self.telnet = telnetlib.Telnet(host, port, 5)
"""Create NADTelnet."""
self.telnet = None
self.host = host
self.port = port
self.timeout = timeout
# Some versions of the firmware report Main.Model=T787.
# some versions do not, we want to clear that line
msg = self.telnet.read_until('\n'.encode(), self.timeout)
#msg.decode().strip().split('=')[1]
# __init__ must never raise

def __del__(self):
"""
Close any telnet session
"""
self.telnet.close()
self._close_connection()

def exec_command(self, domain, function, operator, value=None):
"""
Expand All @@ -298,14 +317,33 @@ def exec_command(self, domain, function, operator, value=None):
else:
raise ValueError('Invalid operator provided %s' % operator)

# Not possible to test for open Telnet connection
# let is raise if any issues
# Yes, for telnet the first \r / \n is recommended only
self.telnet.write((''.join([cmd, '\n']).encode()))
if self._open_connection():
# For telnet the first \r / \n is recommended only
self.telnet.write((''.join(['\r', cmd, '\n']).encode()))
# Could raise eg. socket.error, UnicodeError, let the client handle it

msg = self.telnet.read_until('\n'.encode(), self.timeout)
msg = msg.decode().strip('\r\n')
#print("NAD reponded with '%s'" % msg)
# Test 3 x buffer is completely empty
# With the default timeout that means a delay at
# about 3+ seconds
loop = 3
while loop:
msg = self.telnet.read_until('\n'.encode(), self.timeout)
# Could raise eg. EOFError, UnicodeError, let the client handle it

return msg.strip().split('=')[1]
# b'Main.Volume=-12\r will return -12
if msg == "":
# Nothing in buffer
loop -= 1
continue

msg = msg.decode().strip('\r\n')
# Could raise eg. UnicodeError, let the client handle it

#print("NAD reponded with '%s'" % msg)
# Wait for the response that equals the requested domain.function
if msg.strip().split('=')[0].lower() == '.'.join([domain, function]).lower():
# b'Main.Volume=-12\r will return -12
return msg.strip().split('=')[1]

raise RuntimeError('Failed to read response')

raise RuntimeError('Failed to open connection')

0 comments on commit 4f72e83

Please sign in to comment.