diff --git a/examples/configs/example_rack_config.yml b/examples/configs/example_rack_config.yml index 9a2cfb5..b0d9372 100644 --- a/examples/configs/example_rack_config.yml +++ b/examples/configs/example_rack_config.yml @@ -96,6 +96,7 @@ rackConfig: # [ hdmiCECController: optional ] - Specific hdmiCECController for the slot # supported types: # [type: "cec-client", adaptor: "/dev/ttycec"] + # [type: "remote-cec-client", adaptor: "/dev/ttycec", address: "192.168.99.1", username(optional): "testuser", password(optional): "testpswd", port(optional): "22"] - pi2: ip: "192.168.99.1" description: "local pi4" diff --git a/framework/core/deviceManager.py b/framework/core/deviceManager.py index 51820e7..6a78269 100644 --- a/framework/core/deviceManager.py +++ b/framework/core/deviceManager.py @@ -150,11 +150,13 @@ def __init__(self, log:logModule, logPath:str, devices:dict): # # Telnet # # outbound # # remoteController + # # hdmiCECController self.log = log self.consoles = dict() self.powerControl = None self.outBoundClient = None self.remoteController = None + self.hdmiCECController = None self.session = None self.alive = False diff --git a/framework/core/hdmiCECController.py b/framework/core/hdmiCECController.py index 0c8048a..d16a0ea 100644 --- a/framework/core/hdmiCECController.py +++ b/framework/core/hdmiCECController.py @@ -29,7 +29,7 @@ #* ** cec controller type is specified. #* ** #* ****************************************************************************** - +from datetime import datetime from os import path import sys @@ -37,11 +37,12 @@ MY_DIR = path.dirname(MY_PATH) sys.path.append(path.join(MY_DIR,'../../')) from framework.core.logModule import logModule -from framework.core.hdmicecModules import CECClientController, MonitoringType +from framework.core.streamToFile import StreamToFile +from framework.core.hdmicecModules import CECClientController, RemoteCECClient, CECDeviceType class HDMICECController(): """ - This class provides a high-level interface for controlling and monitoring + This class provides a high-level interface for controlling and monitoring Consumer Electronics Control (CEC) devices. """ @@ -56,90 +57,95 @@ def __init__(self, log: logModule, config: dict): self._log = log self.controllerType = config.get('type') self.cecAdaptor = config.get('adaptor') + self._streamFile = path.join(self._log.logPath, f'{self.controllerType.lower()}_{str(datetime.now().timestamp())}') + self._stream = StreamToFile(self._streamFile) if self.controllerType.lower() == 'cec-client': - self.controller = CECClientController(self.cecAdaptor, self._log) + self.controller = CECClientController(self.cecAdaptor, + self._log, + self._stream) + elif self.controllerType.lower() == 'remote-cec-client': + self.controller = RemoteCECClient(self.cecAdaptor, + self._log, + self._stream, + config.get('address'), + username=config.get('username',''), + password=config.get('password',''), + port=config.get('port',22), + prompt=config.get('prompt', ':~')) self._read_line = 0 - self._monitoringLog = path.join(self._log.logPath, 'cecMonitor.log') - - def send_message(self, message: str) -> bool: - """ - Sends a CEC message to connected devices using the configured controller. - - Args: - message (str): The CEC message to be sent. - Returns: - bool: True if the message was sent successfully, False otherwise. - """ - self._log.debug('Sending CEC message: [%s]' % message) - return self.controller.sendMessage(message) - - def startMonitoring(self, deviceType: MonitoringType = MonitoringType.RECORDER) -> None: + def sendMessage(self, sourceAddress: str, destAddress: str, opCode: str, payload: list = None) -> None: """ - Starts monitoring CEC messages from the adaptor as the specified device type. + Sends an opCode from a specified source and to a specified destination. Args: - deviceType (MonitoringType, optional): The type of device to monitor (default: MonitoringType.RECORDER). - - Raises: - RuntimeError: If monitoring is already running. - """ - if self.controller.monitoring is False: - self._log.debug('Starting monitoring on adaptor: [%s]' % self.cecAdaptor) - self._log.debug('Monitoring as device type [%s]' % deviceType.name) - return self.controller.startMonitoring(self._monitoringLog, deviceType) - else: - self._log.warn('CEC monitoring is already running') - - def stopMonitoring(self): + sourceAddress (str): The logical address of the source device (0-9 or A-F). + destAddress (str): The logical address of the destination device (0-9 or A-F). + opCode (str): Operation code to send as an hexidecimal string e.g 0x81. + payload (list): List of hexidecimal strings to be sent with the opCode. Optional. """ - Stops the CEC monitoring process. - - Delegates the stop task to the underlying `CECClientController`. - """ - return self.controller.stopMonitoring() - - def readUntil(self, message: str, retries: int = 5) -> bool: + payload_string = '' + if isinstance(payload, list): + payload_string = ' '.join(payload) + self._log.debug('Sending CEC message: Source=[%s] Dest=[%s] opCode=[%s] payload=[%s]' % + (sourceAddress, destAddress, opCode, payload_string)) + self.controller.sendMessage(sourceAddress, destAddress, opCode, payload=payload) + + def checkMessageReceived(self, sourceAddress: str, destAddress: str, opCode: str, timeout: int = 10, payload: list = None) -> bool: """ - Reads the monitoring log until the specified message is found. - - Opens the monitoring log file and checks for the message within a specified retry limit. + This function checks to see if a specified opCode has been received. Args: - message (str): The message to search for in the monitoring log. - retries (int, optional): The maximum number of retries before giving up (default: 5). + sourceAddress (str): The logical address of the source device (0-9 or A-F). + destAddress (str): The logical address of the destination device (0-9 or A-F). + opCode (str): Operation code to send as an hexidecimal string e.g 0x81. + timeout (int): The maximum amount of time, in seconds, that the method will + wait for the message to be received. Defaults to 10. + payload (list): List of hexidecimal strings to be sent with the opCode. Optional. Returns: - bool: True if the message was found, False otherwise. + boolean: True if message is received. False otherwise. """ - self._log.debug('Starting readUntil for message as [%s] with [%s] retries' % (message,retries)) result = False - retry = 0 - max_retries = retries - while retry != max_retries and not result: - with open(self._monitoringLog, 'r') as logFile: - logLines = logFile.readlines() - read_line = self._read_line - write_line = len(logLines) - while read_line != write_line: - if message in logLines[read_line]: - result = True - break - read_line+=1 - retry += 1 - self._read_line = read_line + payload_string = '' + if isinstance(payload, list): + payload_string = ' '.join(payload) + self._log.debug('Expecting CEC message: Source=[%s] Dest=[%s] opCode=[%s] payload=[%s]' % + (sourceAddress, destAddress, opCode, payload_string)) + received_message = self.controller.receiveMessage(sourceAddress, destAddress, opCode, timeout=timeout, payload=payload) + if len(received_message) > 0: + result = True return result def listDevices(self) -> list: """ - Retrieves a list of discovered CEC devices with their OSD names (if available). - + List CEC devices on CEC network. + + The list returned contains dicts in the following format: + {'active source': False, + 'vendor': 'Unknown', + 'osd string': 'TV', + 'CEC version': '1.3a', + 'power status': 'on', + 'language': 'eng', + 'physical address': '0.0.0.0', + 'name': 'TV', + 'logical address': '0'} Returns: list: A list of dictionaries representing discovered devices. """ self._log.debug('Listing devices on CEC network') return self.controller.listDevices() + def start(self): + """Start the CECContoller. + """ + self.controller.start() + + def stop(self): + """Stop the CECController. + """ + self.controller.stop() if __name__ == "__main__": import time @@ -148,23 +154,25 @@ def listDevices(self) -> list: CONFIGS = [ { 'type': 'cec-client', - 'adaptor': '/dev/ttyACM0' - }, + 'adaptor': '/dev/ttyACM0' # This is default for pulse 8 + }, + { + 'type': 'remote-cec-client', + 'adaptor': '/dev/cec0', # This is default for Raspberry Pi + 'address': '', # Needs to be be filled out with IP address + 'username': '', # Needs to be filled out with login username + 'password': '', # Needs to be filled out with login password + 'prompt' : '' + } ] for config in CONFIGS: - LOG.setFilename('./logs/','CECTEST%s.log' % config.get('type')) + LOG.setFilename(path.abspath('./logs/'),'CECTEST%s.log' % config.get('type')) LOG.stepStart('Testing with %s' % json.dumps(config)) CEC = HDMICECController(LOG, config) DEVICES = CEC.listDevices() LOG.info(json.dumps(DEVICES)) - # The user will need to check all the devices expected from their - # cec network are shown in this output. - CEC.startMonitoring() - # It's is expected that a user will send a standby command on their cec - # network during this 2 minutes. - time.sleep(120) - result = CEC.readUntil('standby') - CEC.stopMonitoring() + CEC.sendMessage('0', '2', '0x8f') + result = CEC.receiveMessage('2', '0', '0x90', payload=['0x00']) LOG.stepResult(result, 'The readUntil result is: [%s]' % result) - # The user should check here the monitoring log for thier type contains - # the expected information. + CEC.stop() + diff --git a/framework/core/hdmicecModules/__init__.py b/framework/core/hdmicecModules/__init__.py index df8f28f..1eea55a 100644 --- a/framework/core/hdmicecModules/__init__.py +++ b/framework/core/hdmicecModules/__init__.py @@ -27,4 +27,5 @@ #* ****************************************************************************** from .cecClient import CECClientController -from .cecTypes import MonitoringType +from .remoteCECClient import RemoteCECClient +from .cecTypes import CECDeviceType diff --git a/framework/core/hdmicecModules/abstractCECController.py b/framework/core/hdmicecModules/abstractCECController.py index 5a00a14..89e5cf3 100644 --- a/framework/core/hdmicecModules/abstractCECController.py +++ b/framework/core/hdmicecModules/abstractCECController.py @@ -30,31 +30,31 @@ #* ****************************************************************************** from abc import ABCMeta, abstractmethod +from datetime import datetime +import os from framework.core.logModule import logModule -from .cecTypes import MonitoringType +from framework.core.streamToFile import StreamToFile +from .cecTypes import CECDeviceType class CECInterface(metaclass=ABCMeta): - def __init__(self, adaptor_path:str, logger:logModule): + def __init__(self, adaptor_path:str, logger:logModule, streamLogger: StreamToFile): self.adaptor = adaptor_path self._log = logger - self._monitoring = False - - @property - def monitoring(self) -> bool: - return self._monitoring + self._console = None + self._stream = streamLogger @abstractmethod - def sendMessage(cls, message:str) -> bool: + def sendMessage(cls, sourceAddress: str, destAddress: str, opCode: str, payload: list = None, deviceType: CECDeviceType=None) -> None: """ - Send a CEC message to the CEC network. - + Sends an opCode from a specified source and to a specified destination. + Args: - message (str): The CEC message to be sent. - - Returns: - bool: True if the message was sent successfully, False otherwise. + sourceAddress (str): The logical address of the source device (0-9 or A-F). + destAddress (str): The logical address of the destination device (0-9 or A-F). + opCode (str): Operation code to send as an hexidecimal string e.g 0x81. + payload (list): List of hexidecimal strings to be sent with the opCode. Optional. """ pass @@ -64,35 +64,62 @@ def listDevices(cls) -> list: List CEC devices on CEC network. The list returned contains dicts in the following format: - { - 'name': 'TV' - 'address': '0.0.0.0', - 'active source': True, - 'vendor': 'Unknown', - 'osd string': 'TV', - 'CEC version': '1.3a', - 'power status': 'on', - 'language': 'eng', - } + {'active source': False, + 'vendor': 'Unknown', + 'osd string': 'TV', + 'CEC version': '1.3a', + 'power status': 'on', + 'language': 'eng', + 'physical address': '0.0.0.0', + 'name': 'TV', + 'logical address': '0'} Returns: list: A list of dictionaries representing discovered devices. """ pass @abstractmethod - def startMonitoring(cls, monitoringLog: str, deviceType: MonitoringType=MonitoringType.RECORDER) -> None: + def start(cls): + """Start the CECContoller. """ - Starts monitoring CEC messages with a specified device type. + pass + + @abstractmethod + def stop(cls): + """Stop the CECController. + """ + pass + + def formatMessage(cls, sourceAddress: str, destAddress: str, opCode:str, payload: list = None) -> str: + """Format the input information into the required message string + for the CECController. Args: - deviceType (MonitoringType, optional): The type of device to monitor (default: MonitoringType.RECORDER). - monitoringLog (str) : Path to write the monitoring log out + sourceAddress (str): The logical address of the source device (0-9 or A-F). + destAddress (str): The logical address of the destination device (0-9 or A-F). + opCode (str): Operation code to send as an hexidecimal string e.g 0x81. + payload (list): List of hexidecimal strings to be sent with the opCode. Optional + + Returns: + str: Formatted message for CECController. """ pass - @abstractmethod - def stopMonitoring(cls) -> None: + def receiveMessage(self,sourceAddress: str, destAddress: str, opCode: str, timeout: int = 10, payload: list = None) -> bool: """ - Stops the CEC monitoring process. + This function checks to see if a specified opCode has been received. + + Args: + sourceAddress (str): The logical address of the source device (0-9 or A-F). + destAddress (str): The logical address of the destination device (0-9 or A-F). + opCode (str): Operation code to send as an hexidecimal string e.g 0x81. + timeout (int): The maximum amount of time, in seconds, that the method will + wait for the message to be received. Defaults to 10. + payload (list): List of hexidecimal strings to be sent with the opCode. Optional. + + Returns: + list: list of strings containing found message. Empty list if message isn't found. """ - pass + message = self.formatMessage(sourceAddress, destAddress, opCode, payload) + output = self._stream.readUntil(message, timeout) + return output diff --git a/framework/core/hdmicecModules/cecClient.py b/framework/core/hdmicecModules/cecClient.py index 0e2c758..9596f17 100644 --- a/framework/core/hdmicecModules/cecClient.py +++ b/framework/core/hdmicecModules/cecClient.py @@ -31,23 +31,25 @@ #* ** #* ****************************************************************************** +from datetime import datetime from io import IOBase import re import subprocess from threading import Thread from framework.core.logModule import logModule +from framework.core.streamToFile import StreamToFile from .abstractCECController import CECInterface -from .cecTypes import MonitoringType +from .cecTypes import CECDeviceType class CECClientController(CECInterface): """ - This class provides an interface for controlling Consumer Electronics Control (CEC) + This class provides an interface for controlling Consumer Electronics Control (CEC) devices through the `cec-client` command-line tool. """ - def __init__(self, adaptor_path:str, logger:logModule): + def __init__(self, adaptor_path:str, logger:logModule, streamLogger: StreamToFile): """ Initializes the CECClientController instance. @@ -58,43 +60,34 @@ def __init__(self, adaptor_path:str, logger:logModule): Raises: AttributeError: If the specified CEC adaptor is not found. """ - - self._log = logger - self.adaptor = adaptor_path + + super().__init__(adaptor_path=adaptor_path, logger=logger, streamLogger=streamLogger) self._log.debug('Initialising CECClientController for [%s]' % self.adaptor) if self.adaptor not in map(lambda x: x.get('com port'),self._getAdaptors()): raise AttributeError('CEC Adaptor specified not found') - self._monitoring = False - self._m_proc = None - self._m_stdout_thread = None - - def sendMessage(self,message: str) -> bool: - exit_code, stdout = self._sendMessage(message, 0) - self._log.debug('Output of message sent: [%s]' % stdout) - if exit_code != 0: - return False - return True - - def _sendMessage(self, message: str, debug: int = 1) -> tuple: - """ - Internal method for sending a CEC message using `subprocess`. - - Args: - message (str): The CEC message to be sent. - debug (int, optional): Debug level for `cec-client` (default: 1). + self.start() - Returns: - tuple: A tuple containing the exit code of the subprocess call and the standard output. - """ - result = subprocess.run(f'echo "{message}" | cec-client {self.adaptor} -s -d {debug}', - shell=True, - check=True, + def start(self): + self._console = subprocess.Popen(f'cec-client {self.adaptor}'.split(), stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout = result.stdout.decode('utf-8') - stderr = result.stderr.decode('utf-8') - exit_code = result.returncode - return exit_code, stdout + stderr=subprocess.STDOUT, + stdin=subprocess.PIPE, + text=True) + self._stream.writeStreamToFile(self._console.stdout) + + def stop(self): + self._console.stdin.write('q\n') + self._console.stdin.flush() + try: + self._console.wait() + except subprocess.CalledProcessError: + self._console.terminate() + self._stream.stopStreamedLog() + + def sendMessage(self, sourceAddress: str, destAddress: str, opCode: str, payload: list = None) -> None: + message = self.formatMessage(sourceAddress, destAddress, opCode, payload=payload) + self._console.stdin.write(f'tx {message}\n') + self._console.stdin.flush() def _getAdaptors(self) -> list: """ @@ -103,7 +96,7 @@ def _getAdaptors(self) -> list: Returns: list: A list of dictionaries representing available adaptors with details like COM port. """ - result = subprocess.run(f'cec-client -l', + result = subprocess.run(f'cec-client -l', shell=True, text=True, capture_output=True, @@ -122,21 +115,33 @@ def _scanCECNetwork(self) -> list: Returns: list: A list of dictionaries representing discovered devices with details. """ - _, result = self._sendMessage('scan') - self._log.debug('Output of scan on CEC Network: [%s]' % result) - devicesOnNetwork = self._splitDeviceSectionsToDicts(result) + devicesOnNetwork = [] + self._console.stdin.write('scan') + self._console.stdin.flush() + output = self._stream.readUntil('currently active source',30) + if len(output) > 0: + output = '\n'.join(output) + self._log.debug('Output of scan on CEC Network: [%s]' % output) + devicesOnNetwork = self._splitDeviceSectionsToDicts(output) return devicesOnNetwork def listDevices(self) -> list: devices = self._scanCECNetwork() for device_dict in devices: + # Remove the 'address' from the dict and change it to 'physical address' + device_dict['physical address'] = device_dict.pop('address') device_dict['name'] = device_dict.get('osd string') + for key in device_dict.keys(): + if 'device' in key.lower(): + device_dict['logical address'] = key.rsplit('#')[-1] + device_dict.pop(key) + break if device_dict.get('active source') == 'yes': device_dict['active source'] = True else: device_dict['active source'] = False return devices - + def _splitDeviceSectionsToDicts(self,command_output:str) -> list: """ Splits the output of a `cec-client` command into individual device sections and parses them into dictionaries. @@ -148,7 +153,7 @@ def _splitDeviceSectionsToDicts(self,command_output:str) -> list: list: A list of dictionaries, each representing a single CEC device with its attributes. """ devices = [] - device_sections = re.findall(r'^device[ #0-9]{0,}:[\s\S]+?(?:type|language): +[\S ]+$', + device_sections = re.findall(r'^device[ #0-9A-F]{0,}:[\s\S]+?(?:type|language): +[\S ]+$', command_output, re.M) if device_sections: @@ -161,47 +166,15 @@ def _splitDeviceSectionsToDicts(self,command_output:str) -> list: devices.append(device_dict) return devices - def startMonitoring(self, monitoringLog: str, device_type: MonitoringType = MonitoringType.RECORDER) -> None: - self._monitoring = True - try: - self._m_proc = subprocess.Popen(f'cec-client {self.adaptor} -m -d 0 -t {device_type.value}'.split(), - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True) - self._m_stdout_thread = Thread(target=self._write_monitoring_log, - args=[self._m_proc.stdout, monitoringLog], - daemon=True) - self._m_stdout_thread.start() - except Exception as e: - self.stopMonitoring() - raise - - def _write_monitoring_log(self,streamIn: IOBase, logFilePath: str) -> None: - """ - Writes the output of the monitoring process to a log file. - - Args: - stream_in (IOBase): The input stream from the monitoring process. - logFilePath (str): File path to write the monitoring log out to. - """ - while True: - chunk = streamIn.readline() - if chunk == '': - break - with open(logFilePath, 'a+',) as out: - out.write(chunk) - - def stopMonitoring(self) -> None: - self._log.debug('Stopping monitoring of adaptor [%s]' % self.adaptor) - if self.monitoring is False: - return - self._m_proc.terminate() - exit_code = self._m_proc.wait() - self._m_stdout_thread.join() - self._monitoring = False + def formatMessage(self, sourceAddress: str, destAddress: str, opCode:str, payload: list = None) -> str: + message_string = f'{sourceAddress}{destAddress}:{opCode[2:]}' + if payload: + payload_string = ':'.join(map(lambda x: x[2:], payload)) + message_string += ':' + payload_string + return message_string.lower() def __del__(self): """ Destructor for the class, ensures monitoring is stopped. """ - self.stopMonitoring() + self.stop() diff --git a/framework/core/hdmicecModules/cecTypes.py b/framework/core/hdmicecModules/cecTypes.py index 62e81f4..9e3568e 100644 --- a/framework/core/hdmicecModules/cecTypes.py +++ b/framework/core/hdmicecModules/cecTypes.py @@ -31,7 +31,7 @@ from enum import Enum -class MonitoringType(Enum): +class CECDeviceType(Enum): PLAYBACK = "p" RECORDER = "r" TUNER = "t" diff --git a/framework/core/hdmicecModules/remoteCECClient.py b/framework/core/hdmicecModules/remoteCECClient.py new file mode 100644 index 0000000..399ebce --- /dev/null +++ b/framework/core/hdmicecModules/remoteCECClient.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +#** ***************************************************************************** +# * +# * If not stated otherwise in this file or this component's LICENSE file the +# * following copyright and licenses apply: +# * +# * Copyright 2023 RDK Management +# * +# * Licensed under the Apache License, Version 2.0 (the "License"); +# * you may not use this file except in compliance with the License. +# * You may obtain a copy of the License at +# * +# * +# http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * +#* ****************************************************************************** +#* +#* ** Project : RAFT +#* ** @addtogroup : core +#* ** @date : 02/10/2024 +#* ** +#* ** @brief : Abstract class for CEC controller types. +#* ** +#* ****************************************************************************** + +import re + +from framework.core.logModule import logModule +from framework.core.streamToFile import StreamToFile +from framework.core.commandModules.sshConsole import sshConsole +from .abstractCECController import CECInterface +from .cecTypes import CECDeviceType + +class RemoteCECClient(CECInterface): + + def __init__(self, adaptor: str,logger: logModule, streamLogger: StreamToFile, address: str, port: int = 22, username: str = '', password: str = '', prompt = ':~'): + super().__init__(adaptor, logger, streamLogger) + self._console = sshConsole(self._log,address, username, password, port=port, prompt=prompt) + self._log.debug('Initialising RemoteCECClient controller') + try: + self._console.open() + except: + self._log.critical('Could not open connection to RemoteCECClient controller') + raise + if self.adaptor not in map(lambda x: x.get('com port'),self._getAdaptors()): + raise AttributeError('CEC Adaptor specified not found') + self.start() + + def start(self): + self._console.write(f'cec-client {self.adaptor}') + self._stream.writeStreamToFile(self._console.shell.makefile()) + + def stop(self): + self._console.write('q') + self._stream.stopStreamedLog() + + def _getAdaptors(self) -> list: + """ + Retrieves a list of available CEC adaptors using `cec-client`. + + Returns: + list: A list of dictionaries representing available adaptors with details like COM port. + """ + self._console.write(f'cec-client -l') + stdout = self._console.read() + stdout = stdout.replace('\r\n','\n') + adaptor_count = re.search(r'Found devices: ([0-9]+)',stdout, re.M).group(1) + adaptors = self._splitDeviceSectionsToDicts(stdout) + return adaptors + + def sendMessage(self, sourceAddress: str, destAddress: str, opCode: str, payload: list = None) -> None: + message = self.formatMessage(sourceAddress, destAddress, opCode, payload=payload) + self._console.write(f'tx {message}') + + def listDevices(self) -> list: + self._console.write(f'scan') + output = self._stream.readUntil('currently active source',30) + devices = [] + if len(output) > 0: + output = '\n'.join(output) + devices = self._splitDeviceSectionsToDicts(output) + for device in devices: + device['physical address'] = device.pop('address') + device['name'] = device.get('osd string') + for key in device.keys(): + if 'device' in key.lower(): + device['logical address'] = key.rsplit('#')[-1] + device.pop(key) + break + if device.get('active source') == 'yes': + device['active source'] = True + else: + device['active source'] = False + return devices + + def _splitDeviceSectionsToDicts(self,command_output:str) -> list: + """ + Splits the output of a `cec-client` command into individual device sections and parses them into dictionaries. + + Args: + command_output (str): The output string from the `cec-client` command. + + Returns: + list: A list of dictionaries, each representing a single CEC device with its attributes. + """ + devices = [] + device_sections = re.findall(r'^device[ #0-9A-F]{0,}:[\s\S]+?(?:type|language): +[\S ]+$', + command_output, + re.M) + if device_sections: + for section in device_sections: + device_dict = {} + for line in section.split('\n'): + line_split = re.search(r'^([\w #]+): +?(\S[\S ]{0,})$',line) + if line_split: + device_dict[line_split.group(1)] = line_split.group(2) + devices.append(device_dict) + return devices + + def formatMessage(self, sourceAddress, destAddress, opCode, payload = None): + message_string = f'{sourceAddress}{destAddress}:{opCode[2:]}' + if payload: + payload_string = ':'.join(map(lambda x: x[2:], payload)) + message_string += ':' + payload_string + return message_string.lower() \ No newline at end of file diff --git a/framework/core/logModule.py b/framework/core/logModule.py index f6ab68e..d1e76ca 100755 --- a/framework/core/logModule.py +++ b/framework/core/logModule.py @@ -31,7 +31,9 @@ #* ****************************************************************************** import os +from io import IOBase import logging +from threading import Thread import time import datetime #from datetime import datetime @@ -110,6 +112,8 @@ def __init__(self, moduleName, level=INFO): self.path = None self.logFile = None self.csvLogFile = None + self.failedSteps = {} + def __del__(self): """Deletes the logger instance. @@ -486,4 +490,3 @@ def stepResult(self, result, message): message = "[{}]: RESULT : [{}]: {}".format(self.stepNum,resultMessage, message) self.step("=====================Step End======================",showStepNumber=False) self.stepResultMessage(message) - diff --git a/framework/core/streamToFile.py b/framework/core/streamToFile.py new file mode 100644 index 0000000..f0df434 --- /dev/null +++ b/framework/core/streamToFile.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 + +from io import IOBase, SEEK_CUR +from threading import Thread +from os import path +import time + + +class StreamToFile(): + + def __init__(self, outputPath): + self._filePath = outputPath + self._fileHandle = None + self._activeThread = None + self._readLine = 0 + self._stopThread = False + + def writeStreamToFile(self, inputStream: IOBase) -> None: + """ + Starts a new thread to write the contents of an input stream to a file. + + Args: + inputStream (IOBase): The input stream to be read from. + outFileName (str): The path of the output file where the stream data will be written. + If only a file name is given, the file will be written in the current tests log directory. + """ + self._fileHandle = open(self._filePath, 'a+', encoding='utf-8') + self._stopThread = False + newThread = Thread(target=self._writeLogFile, + args=[inputStream, self._fileHandle], + daemon=True) + self._activeThread = newThread + newThread.start() + + def stopStreamedLog(self) -> None: + """ + Stops a previously started thread that is writing to a log file. + + Args: + outFileName (str): The path of the output file associated with the thread to be stopped. + + Raises: + AttributeError: If the specified thread cannot be found. + """ + self._stopThread = True + while self._activeThread.is_alive(): + self._activeThread.join() + + def _writeLogFile(self,streamIn: IOBase, ioOut: IOBase) -> None: + """ + Writes the input stream to a log file. + + Args: + stream_in (IOBase): The stream from a process. + logFilePath (str): File path to write the log out to. + """ + while self._stopThread is False: + chunk = streamIn.readline() + if chunk == '': + break + ioOut.write(chunk) + + def readUntil(self, searchString:str, retries: int = 5) -> None: + """ + Read lines from a file until a specific search string is found, with a specified + number of retries. + + Args: + searchString (str): The string that will be search for. + retries (int): The maximum number of times the method will attempt to find the `searchString`. + Defaults to 5 + + Returns: + list : list of strings including the search line. Empty list when search not found. + """ + result = [] + retry = 0 + max_retries = retries + while retry != max_retries and len(result) == 0: + read_line = self._readLine + self._fileHandle.seek(0) + out_lines = self._fileHandle.readlines() + write_line = len(out_lines) + if read_line == write_line: + time.sleep(1) + else: + while read_line < write_line: + if searchString in out_lines[read_line]: + result = out_lines[:read_line] + break + read_line+=1 + retry += 1 + self._readLine = read_line + return result + + def __del__(self): + self.stopStreamedLog() + if self._fileHandle: + self._fileHandle.close() diff --git a/framework/core/testControl.py b/framework/core/testControl.py index 6388251..cbe230b 100644 --- a/framework/core/testControl.py +++ b/framework/core/testControl.py @@ -155,6 +155,7 @@ def __init__(self, testName="", qcId="", maxRunTime=TEST_MAX_RUN_TIME, level=log self.outboundClient = self.dut.outBoundClient self.powerControl = self.dut.powerControl self.commonRemote = self.dut.remoteController + self.hdmiCECController = self.dut.hdmiCECController self.utils = utilities(self.log) # For UI tests Initialising Video capture and decode the screen_regions.yml for the platform cpePlatform = self.slotInfo.getPlatform() @@ -300,6 +301,8 @@ def testEndFunction(self, powerOff=True): self.webpageController.closeBrowser() if self.capture is not None: self.capture.stop() + if self.hdmiCECController: + self.hdmiCECController.stop() return True def testExceptionCleanUp(self):