diff --git a/services/ops/LogStatisticsAgent/README.md b/services/ops/LogStatisticsAgent/README.md index b663023012..53ae291eaa 100644 --- a/services/ops/LogStatisticsAgent/README.md +++ b/services/ops/LogStatisticsAgent/README.md @@ -8,23 +8,28 @@ which may be an indication of some sort of failure or breach. ### Configuration -The Log Statistics agent has 4 required configuration values: - -- `file_path`: This should be the path to the "volttron.log" file -- `analysis_interval_secs`: The interval in seconds between publishing the size delta statistic to the message bus -- `publish_topic`: Can be used to specify a topic to publish log statistics to which does not get captured by the - historian framework (topics not prefixed by any of: "datalogger", "record", "analysis", "devices") -- `historian_topic`: Can be used to specify a topic to publish log statistics to which gets captured by the - historian framework ("datalogger", "record", "analysis", "devices") - -The following is an example configuration file: - +The Log Statistics agent has 5 configuration parameters, all of which are optional: + +- `file_path`: The file path to the log file. If no config provided, defaults to `'volttron.log'` located within your VOLTTRON_HOME environment variable. +- `analysis_interval_sec`: The interval in seconds between publishes of the size delta statistic to the message bus. If no config provided, defaults to 60 seconds. +- `publish_topic`: Used to specify a topic to publish log statistics to which does not get captured by the + historian framework (topics not prefixed by any of: "datalogger", "record", "analysis", "devices"). If no config provided, defaults to `"platform/log_statistics"`. +- `historian_topic`: Can be used to specify a topic to publish log statistics to which gets captured by the + historian framework ("datalogger", "record", "analysis", "devices"). If no config provided, defaults to `analysis/log_statistics`. +- `unit`: Can be used to specify units. 
Defaults to `bytes`. + - "bytes" + - "kb" + - "mb" + - "gb" + +Here is an example configuration file named `log_stat_config.json`. ```json { - "file_path" : "~/volttron/volttron.log", - "analysis_interval_min" : 60, - "publish_topic" : "platform/log_statistics", - "historian_topic" : "record/log_statistics" + "analysis_interval_sec": 60, + "file_path": "path/to/volttron.log", + "historian_topic": "analysis/log_statistics", + "publish_topic": "platform/log_statistics", + "unit": "bytes" } ``` diff --git a/services/ops/LogStatisticsAgent/log_stat_config.json b/services/ops/LogStatisticsAgent/log_stat_config.json new file mode 100644 index 0000000000..c5f8813c6e --- /dev/null +++ b/services/ops/LogStatisticsAgent/log_stat_config.json @@ -0,0 +1,7 @@ +{ + "analysis_interval_sec": 20, + "file_path": "~/volttron/volttron.log", + "historian_topic": "analysis/log_statistics", + "publish_topic": "platform/log_statistics", + "unit": "mb" +} diff --git a/services/ops/LogStatisticsAgent/logstatisticsagent.config b/services/ops/LogStatisticsAgent/logstatisticsagent.config deleted file mode 100644 index 176d45ed88..0000000000 --- a/services/ops/LogStatisticsAgent/logstatisticsagent.config +++ /dev/null @@ -1,6 +0,0 @@ -{ - "file_path" : "~/volttron/volttron.log", - "analysis_interval_sec" : 60, - "publish_topic" : "platform/log_statistics", - "historian_topic" : "analysis/log_statistics" -} diff --git a/services/ops/LogStatisticsAgent/logstatisticsagent/agent.py b/services/ops/LogStatisticsAgent/logstatisticsagent/agent.py index 0a624ea1d3..9883894dbf 100644 --- a/services/ops/LogStatisticsAgent/logstatisticsagent/agent.py +++ b/services/ops/LogStatisticsAgent/logstatisticsagent/agent.py @@ -36,20 +36,6 @@ _log = logging.getLogger(__name__) __version__ = '1.0' - -def log_statistics(config_path, **kwargs): - """ - Load the LogStatisticsAgent agent configuration and returns and instance - of the agent created using that configuration. 
- :param config_path: Path to a configuration file. - :type config_path: str - :returns: LogStatisticsAgent agent instance - :rtype: LogStatisticsAgent agent - """ - config = utils.load_config(config_path) - return LogStatisticsAgent(config, **kwargs) - - class LogStatisticsAgent(Agent): """ LogStatisticsAgent reads volttron.log file size every hour, compute the size delta from previous hour and publish @@ -66,39 +52,67 @@ class LogStatisticsAgent(Agent): } """ - def __init__(self, config, **kwargs): + def __init__(self, config_path=None, **kwargs): super(LogStatisticsAgent, self).__init__(**kwargs) + self.configured = False + self.last_std_dev_time = get_aware_utc_now() + + self.default_config = { + "file_path": "volttron.log", + "analysis_interval_sec": 60, + "publish_topic": "platform/log_statistics", + "historian_topic": "analysis/log_statistics", + "unit": "bytes" + } + if config_path: + self.default_config.update(utils.load_config(config_path)) + self.vip.config.set_default("config", self.default_config) + self.vip.config.subscribe(self.configure_main, actions=["NEW", "UPDATE"], pattern="config") + + def configure_main(self, config_name, action, contents): + config = self.default_config.copy() + config.update(contents) + self.configured = True + if action == "NEW" or "UPDATE": + self.reset_parameters(config) + _log.info("Starting " + self.__class__.__name__ + " agent") + + def reset_parameters(self, config=None): self.analysis_interval_sec = config["analysis_interval_sec"] self.file_path = config["file_path"] self.publish_topic = config["publish_topic"] self.historian_topic = config["historian_topic"] + self.unit = config["unit"] self.size_delta_list = [] self.file_start_size = None self.prev_file_size = None self._scheduled_event = None - - @Core.receiver('onstart') - def starting(self, sender, **kwargs): - _log.info("Starting " + self.__class__.__name__ + " agent") - self.publish_analysis() + if self.configured: + self.publish_analysis() def 
publish_analysis(self): """ Publishes file's size increment in previous time interval (60 minutes) with timestamp. Also publishes standard deviation of file's hourly size differences every 24 hour. """ + if not hasattr(self, '_scheduled_event'): + # The settings haven't been initialized, so skip the rest of the method + return + if self._scheduled_event is not None: self._scheduled_event.cancel() if self.prev_file_size is None: self.prev_file_size = self.get_file_size() - _log.debug("init_file_size = {}".format(self.prev_file_size)) + _log.debug(f"init_file_size = {self.convert_bytes(self.prev_file_size, self.unit)} {self.unit}") else: # read file size curr_file_size = self.get_file_size() # calculate size delta size_delta = curr_file_size - self.prev_file_size + size_delta = self.convert_bytes(size_delta, self.unit) + self.prev_file_size = curr_file_size self.size_delta_list.append(size_delta) @@ -107,23 +121,56 @@ def publish_analysis(self): publish_message = {'timestamp': datetime.datetime.utcnow().isoformat() + 'Z', 'log_size_delta': size_delta} - historian_message = [{"log_size_delta ": size_delta}, - {"log_size_delta ": {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}}] - - if len(self.size_delta_list) == 24: - standard_deviation = statistics.stdev(self.size_delta_list) - publish_message['log_std_dev'] = standard_deviation - historian_message[0]['log_std_dev'] = standard_deviation - historian_message[1]['log_std_dev'] = {'units': 'bytes', 'tz': 'UTC', 'type': 'float'} - - _log.debug('publishing message {} with header {} on historian topic {}' - .format(historian_message, headers, self.historian_topic)) - self.vip.pubsub.publish(peer="pubsub", topic=self.historian_topic, headers=headers, - message=historian_message) + historian_message = [{ + "log_size_delta ": size_delta + }, { + "log_size_delta ": { + 'units': f'{self.unit}', + 'tz': 'UTC', + 'type': 'float' + } + }] + + now = get_aware_utc_now() + hours_since_last_std_dev = (now - 
self.last_std_dev_time).total_seconds() / 3600 + + if hours_since_last_std_dev >= 24: + if self.size_delta_list: # make sure it has something in it + if len(self.size_delta_list) >= 2: # make sure it has more than two items + mean = statistics.mean(self.size_delta_list) + standard_deviation = statistics.stdev(self.size_delta_list) + + publish_message['log_mean'] = mean + print(f"Calculated mean: {mean}") + publish_message['log_std_dev'] = standard_deviation + + historian_message[0]['log_mean'] = mean + historian_message[0]['log_std_dev'] = standard_deviation + + historian_message[1]['log_mean'] = {'units': f'{self.unit}', 'tz': 'UTC', 'type': 'float'} + historian_message[1]['log_std_dev'] = {'units': f'{self.unit}', 'tz': 'UTC', + 'type': 'float'} + + else: + _log.info("Not enough data points to calculate standard deviation") + + else: + _log.info("Not enough data points to calculate mean and standard deviation") + + # Reset time + self.last_std_dev_time = now self.size_delta_list = [] - _log.debug('publishing message {} on topic {}'.format(publish_message, self.publish_topic)) + _log.debug(f'publishing message {historian_message}' + f' with header {headers}' + f' on historian topic {self.historian_topic}') + self.vip.pubsub.publish(peer="pubsub", + topic=self.historian_topic, + headers=headers, + message=historian_message) + + _log.debug(f'publishing message {publish_message} {self.unit} on topic {self.publish_topic}') self.vip.pubsub.publish(peer="pubsub", topic=self.publish_topic, message=publish_message) _log.debug('Scheduling next periodic call') @@ -138,13 +185,24 @@ def get_file_size(self): except OSError as e: _log.error(e) + def convert_bytes(self, size, unit): + """ + Converts size from bytes to the specified unit + """ + unit = unit.lower() + if unit == 'kb': + return size / 1024 + elif unit == 'mb': + return size / 1024 ** 2 + elif unit == 'gb': + return size / 1024 ** 3 + return size def main(argv=sys.argv): """ Main method called by the platform. 
""" - utils.vip_main(log_statistics, identity='platform.logstatisticsagent') - + utils.vip_main(LogStatisticsAgent, identity='platform.log_statistics') if __name__ == '__main__': # Entry point for script