Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update #103: Creation of remote hdmicec client #115

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/configs/example_rack_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ rackConfig:
# [ hdmiCECController: optional ] - Specific hdmiCECController for the slot
# supported types:
# [type: "cec-client", adaptor: "/dev/ttycec"]
# [type: "remote-cec-client", adaptor: "/dev/ttycec", address: "192.168.99.1", username(optional): "testuser", password(optional): "testpswd", port(optional): "22"]
- pi2:
ip: "192.168.99.1"
description: "local pi4"
Expand Down
2 changes: 2 additions & 0 deletions framework/core/deviceManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,13 @@ def __init__(self, log:logModule, logPath:str, devices:dict):
# # Telnet
# # outbound
# # remoteController
# # hdmiCECController
self.log = log
self.consoles = dict()
self.powerControl = None
self.outBoundClient = None
self.remoteController = None
self.hdmiCECController = None
self.session = None
self.alive = False

Expand Down
155 changes: 82 additions & 73 deletions framework/core/hdmiCECController.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@
#* ** cec controller type is specified.
#* **
#* ******************************************************************************

from datetime import datetime
from os import path

import sys
MY_PATH = path.realpath(__file__)
MY_DIR = path.dirname(MY_PATH)
sys.path.append(path.join(MY_DIR,'../../'))
from framework.core.logModule import logModule
from framework.core.hdmicecModules import CECClientController, MonitoringType
from framework.core.streamToFile import StreamToFile
from framework.core.hdmicecModules import CECClientController, RemoteCECClient, CECDeviceType

class HDMICECController():
"""
This class provides a high-level interface for controlling and monitoring
This class provides a high-level interface for controlling and monitoring
Consumer Electronics Control (CEC) devices.
"""

Expand All @@ -56,115 +57,123 @@ def __init__(self, log: logModule, config: dict):
self._log = log
self.controllerType = config.get('type')
self.cecAdaptor = config.get('adaptor')
self._streamFile = path.join(self._log.logPath, f'{self.controllerType.lower()}_{str(datetime.now().timestamp())}')
self._stream = StreamToFile(self._streamFile)
if self.controllerType.lower() == 'cec-client':
self.controller = CECClientController(self.cecAdaptor, self._log)
self.controller = CECClientController(self.cecAdaptor,
self._log,
self._stream)
elif self.controllerType.lower() == 'remote-cec-client':
self.controller = RemoteCECClient(self.cecAdaptor,
self._log,
self._stream,
config.get('address'),
username=config.get('username',''),
password=config.get('password',''),
port=config.get('port',22),
prompt=config.get('prompt', ':~'))
self._read_line = 0
self._monitoringLog = path.join(self._log.logPath, 'cecMonitor.log')

def send_message(self, message: str) -> bool:
"""
Sends a CEC message to connected devices using the configured controller.

Args:
message (str): The CEC message to be sent.

Returns:
bool: True if the message was sent successfully, False otherwise.
def sendMessage(self, sourceAddress: str, destAddress: str, opCode: str, payload: list = None) -> None:
"""
self._log.debug('Sending CEC message: [%s]' % message)
return self.controller.sendMessage(message)
Sends an opCode from a specified source and to a specified destination.

def startMonitoring(self, deviceType: MonitoringType = MonitoringType.RECORDER) -> None:
"""
Starts monitoring CEC messages from the adaptor as the specified device type.

Args:
deviceType (MonitoringType, optional): The type of device to monitor (default: MonitoringType.RECORDER).

Raises:
RuntimeError: If monitoring is already running.
sourceAddress (str): The logical address of the source device (0-9 or A-F).
destAddress (str): The logical address of the destination device (0-9 or A-F).
opCode (str): Operation code to send as an hexidecimal string e.g 0x81.
payload (list): List of hexidecimal strings to be sent with the opCode. Optional.
"""
if self.controller.monitoring is False:
self._log.debug('Starting monitoring on adaptor: [%s]' % self.cecAdaptor)
self._log.debug('Monitoring as device type [%s]' % deviceType.name)
return self.controller.startMonitoring(self._monitoringLog, deviceType)
else:
self._log.warn('CEC monitoring is already running')

def stopMonitoring(self):
payload_string = ''
if isinstance(payload, list):
payload_string = ' '.join(payload)
self._log.debug('Sending CEC message: Source=[%s] Dest=[%s] opCode=[%s] payload=[%s]' %
(sourceAddress, destAddress, opCode, payload_string))
self.controller.sendMessage(sourceAddress, destAddress, opCode, payload=payload)

def receiveMessage(self, sourceAddress: str, destAddress: str, opCode: str, timeout: int = 10, payload: list = None) -> bool:
"""
Stops the CEC monitoring process.

Delegates the stop task to the underlying `CECClientController`.
"""
return self.controller.stopMonitoring()

def readUntil(self, message: str, retries: int = 5) -> bool:
"""
Reads the monitoring log until the specified message is found.

Opens the monitoring log file and checks for the message within a specified retry limit.
This function checks to see if a specified opCode has been received.

Args:
message (str): The message to search for in the monitoring log.
retries (int, optional): The maximum number of retries before giving up (default: 5).
sourceAddress (str): The logical address of the source device (0-9 or A-F).
destAddress (str): The logical address of the destination device (0-9 or A-F).
opCode (str): Operation code to send as an hexidecimal string e.g 0x81.
timeout (int): The maximum amount of time, in seconds, that the method will
wait for the message to be received. Defaults to 10.
payload (list): List of hexidecimal strings to be sent with the opCode. Optional.

Returns:
bool: True if the message was found, False otherwise.
boolean: True if message is received. False otherwise.
"""
self._log.debug('Starting readUntil for message as [%s] with [%s] retries' % (message,retries))
result = False
retry = 0
max_retries = retries
while retry != max_retries and not result:
with open(self._monitoringLog, 'r') as logFile:
logLines = logFile.readlines()
read_line = self._read_line
write_line = len(logLines)
while read_line != write_line:
if message in logLines[read_line]:
result = True
break
read_line+=1
retry += 1
self._read_line = read_line
return result
payload_string = ''
if isinstance(payload, list):
payload_string = ' '.join(payload)
self._log.debug('Expecting CEC message: Source=[%s] Dest=[%s] opCode=[%s] payload=[%s]' %
(sourceAddress, destAddress, opCode, payload_string))
return self.controller.receiveMessage(sourceAddress, destAddress, opCode, timeout=timeout, payload=payload)

def listDevices(self) -> list:
"""
Retrieves a list of discovered CEC devices with their OSD names (if available).

List CEC devices on CEC network.

The list returned contains dicts in the following format:
{'active source': False,
'vendor': 'Unknown',
'osd string': 'TV',
'CEC version': '1.3a',
'power status': 'on',
'language': 'eng',
'physical address': '0.0.0.0',
'name': 'TV',
'logical address': '0'}
Returns:
list: A list of dictionaries representing discovered devices.
"""
self._log.debug('Listing devices on CEC network')
return self.controller.listDevices()

def start(self):
"""Start the CECContoller.
"""
self.controller.start()

def stop(self):
"""Stop the CECController.
"""
self.controller.stop()

if __name__ == "__main__":
import time
import json
LOG = logModule('CECTEST', logModule.DEBUG)
CONFIGS = [
# {
# 'type': 'cec-client',
# 'adaptor': '/dev/ttyACM0' # This is default for pulse 8
# },
{
'type': 'cec-client',
'adaptor': '/dev/ttyACM0'
},
'type': 'remote-cec-client',
'adaptor': '/dev/cec0', # This is default for Raspberry Pi
'address': '', # Needs to be be filled out with IP address
'username': '', # Needs to be filled out with login username
'password': '', # Needs to be filled out with login password
'prompt' : ''
}
]
for config in CONFIGS:
LOG.setFilename('./logs/','CECTEST%s.log' % config.get('type'))
LOG.setFilename(path.abspath('./logs/'),'CECTEST%s.log' % config.get('type'))
LOG.stepStart('Testing with %s' % json.dumps(config))
CEC = HDMICECController(LOG, config)
DEVICES = CEC.listDevices()
LOG.info(json.dumps(DEVICES))
CEC.sendMessage('0', '2', '0x8f', ['0x21','0x85'])
# The user will need to check all the devices expected from their
# cec network are shown in this output.
CEC.startMonitoring()
# It's is expected that a user will send a standby command on their cec
# network during this 2 minutes.
time.sleep(120)
result = CEC.readUntil('standby')
CEC.stopMonitoring()
result = CEC.receiveMessage('2', '0', '0x8f')
LOG.stepResult(result, 'The readUntil result is: [%s]' % result)
CEC.stop()
# The user should check here the monitoring log for thier type contains
# the expected information.
3 changes: 2 additions & 1 deletion framework/core/hdmicecModules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@
#* ******************************************************************************

from .cecClient import CECClientController
from .cecTypes import MonitoringType
from .remoteCECClient import RemoteCECClient
from .cecTypes import CECDeviceType
94 changes: 62 additions & 32 deletions framework/core/hdmicecModules/abstractCECController.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,31 @@
#* ******************************************************************************

from abc import ABCMeta, abstractmethod
from datetime import datetime
import os

from framework.core.logModule import logModule
from .cecTypes import MonitoringType
from framework.core.streamToFile import StreamToFile
from .cecTypes import CECDeviceType

class CECInterface(metaclass=ABCMeta):

def __init__(self, adaptor_path:str, logger:logModule):
def __init__(self, adaptor_path:str, logger:logModule, streamLogger: StreamToFile):
self.adaptor = adaptor_path
self._log = logger
self._monitoring = False

@property
def monitoring(self) -> bool:
return self._monitoring
self._proc = None
self._stream = streamLogger

@abstractmethod
def sendMessage(cls, message:str) -> bool:
def sendMessage(cls, sourceAddress: str, destAddress: str, opCode: str, payload: list = None, deviceType: CECDeviceType=None) -> None:
"""
Send a CEC message to the CEC network.

Sends an opCode from a specified source and to a specified destination.
Args:
message (str): The CEC message to be sent.

Returns:
bool: True if the message was sent successfully, False otherwise.
sourceAddress (str): The logical address of the source device (0-9 or A-F).
destAddress (str): The logical address of the destination device (0-9 or A-F).
opCode (str): Operation code to send as an hexidecimal string e.g 0x81.
payload (list): List of hexidecimal strings to be sent with the opCode. Optional.
"""
pass

Expand All @@ -64,35 +64,65 @@ def listDevices(cls) -> list:
List CEC devices on CEC network.

The list returned contains dicts in the following format:
{
'name': 'TV'
'address': '0.0.0.0',
'active source': True,
'vendor': 'Unknown',
'osd string': 'TV',
'CEC version': '1.3a',
'power status': 'on',
'language': 'eng',
}
{'active source': False,
'vendor': 'Unknown',
'osd string': 'TV',
'CEC version': '1.3a',
'power status': 'on',
'language': 'eng',
'physical address': '0.0.0.0',
'name': 'TV',
'logical address': '0'}
Returns:
list: A list of dictionaries representing discovered devices.
"""
pass

@abstractmethod
def startMonitoring(cls, monitoringLog: str, deviceType: MonitoringType=MonitoringType.RECORDER) -> None:
def start(cls):
"""Start the CECContoller.
"""
Starts monitoring CEC messages with a specified device type.
pass

@abstractmethod
def stop(cls):
"""Stop the CECController.
"""
pass

def formatMessage(cls, sourceAddress: str, destAddress: str, opCode:str, payload: list = None) -> str:
"""Format the input information into the required message string
for the CECController.

Args:
deviceType (MonitoringType, optional): The type of device to monitor (default: MonitoringType.RECORDER).
monitoringLog (str) : Path to write the monitoring log out
sourceAddress (str): The logical address of the source device (0-9 or A-F).
destAddress (str): The logical address of the destination device (0-9 or A-F).
opCode (str): Operation code to send as an hexidecimal string e.g 0x81.
payload (list): List of hexidecimal strings to be sent with the opCode. Optional

Returns:
str: Formatted message for CECController.
"""
pass

@abstractmethod
def stopMonitoring(cls) -> None:
def receiveMessage(self,sourceAddress: str, destAddress: str, opCode: str, timeout: int = 10, payload: list = None) -> bool:
"""
Stops the CEC monitoring process.
This function checks to see if a specified opCode has been received.

Args:
sourceAddress (str): The logical address of the source device (0-9 or A-F).
destAddress (str): The logical address of the destination device (0-9 or A-F).
opCode (str): Operation code to send as an hexidecimal string e.g 0x81.
timeout (int): The maximum amount of time, in seconds, that the method will
wait for the message to be received. Defaults to 10.
payload (list): List of hexidecimal strings to be sent with the opCode. Optional.

Returns:
boolean: True if message is received. False otherwise.
"""
pass
end = datetime.now().timestamp() + timeout
result = False
while datetime.now().timestamp() < end and result is False:
message = self.formatMessage(sourceAddress, destAddress, opCode, payload)
result = self._stream.readUntil(message)
return result
Loading