Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

24 Hour Reporting and Default Config Implementation for Log Stat Agent #3157

Merged
merged 10 commits into from
Jul 31, 2024
24 changes: 11 additions & 13 deletions services/ops/LogStatisticsAgent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@ which may be an indication of some sort of failure or breach.

### Configuration

The Log Statistics agent has 4 required configuration values:
The Log Statistics agent has 4 configuration parameters; each falls back to a default value when left as `null`:

- `file_path`: This should be the path to the "volttron.log" file
- `analysis_interval_secs`: The interval in seconds between publishing the size delta statistic to the message bus
- `publish_topic`: Can be used to specify a topic to publish log statistics to which does not get captured by the
historian framework (topics not prefixed by any of: "datalogger", "record", "analysis", "devices")
- `historian_topic`: Can be used to specify a topic to publish log statistics to which gets captured by the
historian framework ("datalogger", "record", "analysis", "devices")

The following is an example configuration file:
- `file_path`: The file path to the log file. If left as `null`, defaults to "/home/volttron/volttron.log".
- `analysis_interval_sec`: The interval in seconds between publishes of the size delta statistic to the message bus. If left as `null`, defaults to 60 seconds.
- `publish_topic`: Used to specify a topic to publish log statistics to which does not get captured by the
historian framework (topics not prefixed by any of: "datalogger", "record", "analysis", "devices"). If left as `null`, defaults to `"platform/log_statistics"`.
- `historian_topic`: Can be used to specify a topic to publish log statistics to which gets captured by the
historian framework ("datalogger", "record", "analysis", "devices"). If left as `null`, defaults to `analysis/log_statistics`.

```json
{
"file_path" : "~/volttron/volttron.log",
"analysis_interval_min" : 60,
"publish_topic" : "platform/log_statistics",
"historian_topic" : "record/log_statistics"
"analysis_interval_sec": 60,
"file_path": null,
"historian_topic": "analysis/log_statistics",
"publish_topic": "platform/log_statistics"
}
```

Expand Down
103 changes: 74 additions & 29 deletions services/ops/LogStatisticsAgent/logstatisticsagent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,13 @@
from volttron.platform.vip.agent import Agent, Core
from volttron.platform.agent import utils
from volttron.platform.agent.utils import get_aware_utc_now
from volttron.platform import get_home
import time
riley206 marked this conversation as resolved.
Show resolved Hide resolved

utils.setup_logging()
_log = logging.getLogger(__name__)
__version__ = '1.0'


def log_statistics(config_path, **kwargs):
    """
    Build a LogStatisticsAgent from a configuration file.

    :param config_path: Path to a configuration file.
    :type config_path: str
    :returns: LogStatisticsAgent agent instance created from the
        loaded configuration
    :rtype: LogStatisticsAgent
    """
    loaded = utils.load_config(config_path)
    return LogStatisticsAgent(loaded, **kwargs)


class LogStatisticsAgent(Agent):
"""
LogStatisticsAgent reads volttron.log file size every hour, compute the size delta from previous hour and publish
Expand All @@ -66,8 +54,30 @@ class LogStatisticsAgent(Agent):
}
"""

def __init__(self, config, **kwargs):
def __init__(self, config_path=None, **kwargs):
    """
    Initialize the agent with built-in defaults and subscribe to
    configuration-store updates.

    :param config_path: Optional path to a configuration file whose
        values override the built-in defaults.
    :type config_path: str
    """
    super(LogStatisticsAgent, self).__init__(**kwargs)
    # No usable configuration until the config store delivers one.
    self.configured = False
    # Anchor timestamp for the 24-hour standard-deviation window.
    self.last_std_dev_time = get_aware_utc_now()

    self.default_config = {
        # Plain string literal: the original used an f-string with no
        # placeholders, which is just a normal literal with extra noise.
        "file_path": "volttron/volttron.log",
        "analysis_interval_sec": 60,
        "publish_topic": "platform/log_statistics",
        "historian_topic": "analysis/log_statistics"
    }
    if config_path:
        self.default_config.update(utils.load_config(config_path))
    self.vip.config.set_default("config", self.default_config)
    self.vip.config.subscribe(self.configure_main, actions=["NEW", "UPDATE"], pattern="config")

def configure_main(self, config_name, action, contents):
    """
    Configuration-store callback: merge incoming contents over the
    defaults and (re)apply the agent's parameters.

    :param config_name: Name of the configuration entry that changed.
    :param action: Config store action; "NEW" or "UPDATE" per the
        subscription in __init__.
    :param contents: Configuration values delivered by the store.
    """
    config = self.default_config.copy()
    config.update(contents)
    self.configured = True
    # BUG FIX: the original condition was `action == "NEW" or "UPDATE"`,
    # which is always truthy because the non-empty string "UPDATE" is the
    # second operand of `or`. Test membership explicitly instead.
    if action in ("NEW", "UPDATE"):
        self.reset_parameters(config)

def reset_parameters(self, config=None):
self.analysis_interval_sec = config["analysis_interval_sec"]
self.file_path = config["file_path"]
self.publish_topic = config["publish_topic"]
Expand All @@ -76,6 +86,8 @@ def __init__(self, config, **kwargs):
self.file_start_size = None
self.prev_file_size = None
self._scheduled_event = None
if self.configured:
self.publish_analysis()

@Core.receiver('onstart')
def starting(self, sender, **kwargs):
Expand All @@ -87,6 +99,9 @@ def publish_analysis(self):
Publishes file's size increment in previous time interval (60 minutes) with timestamp.
Also publishes standard deviation of file's hourly size differences every 24 hour.
"""
if not self.configured:
riley206 marked this conversation as resolved.
Show resolved Hide resolved
return

if self._scheduled_event is not None:
self._scheduled_event.cancel()

Expand All @@ -105,24 +120,54 @@ def publish_analysis(self):

headers = {'Date': datetime.datetime.utcnow().isoformat() + 'Z'}

publish_message = {'timestamp': datetime.datetime.utcnow().isoformat() + 'Z',
'log_size_delta': size_delta}
historian_message = [{"log_size_delta ": size_delta},
{"log_size_delta ": {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}}]
publish_message = {'timestamp': datetime.datetime.utcnow().isoformat() + 'Z', 'log_size_delta': size_delta}
historian_message = [{
"log_size_delta ": size_delta
}, {
"log_size_delta ": {
'units': 'bytes',
'tz': 'UTC',
'type': 'float'
}
}]

if len(self.size_delta_list) == 24:
standard_deviation = statistics.stdev(self.size_delta_list)
publish_message['log_std_dev'] = standard_deviation
historian_message[0]['log_std_dev'] = standard_deviation
historian_message[1]['log_std_dev'] = {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}
now = get_aware_utc_now()
hours_since_last_std_dev = (now - self.last_std_dev_time).total_seconds() / 3600

_log.debug('publishing message {} with header {} on historian topic {}'
.format(historian_message, headers, self.historian_topic))
self.vip.pubsub.publish(peer="pubsub", topic=self.historian_topic, headers=headers,
message=historian_message)
if hours_since_last_std_dev >= 24:
if self.size_delta_list: # make sure it has something in it
if len(self.size_delta_list) >= 2: # make sure it has more than two items
mean = statistics.mean(self.size_delta_list)
standard_deviation = statistics.stdev(self.size_delta_list)

publish_message['log_mean'] = mean
print(f"Calculated mean: {mean}")
publish_message['log_std_dev'] = standard_deviation

historian_message[0]['log_mean'] = mean
historian_message[0]['log_std_dev'] = standard_deviation

historian_message[1]['log_mean'] = {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}
historian_message[1]['log_std_dev'] = {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}

else:
_log.info("Not enough data points to calculate standard deviation")

else:
_log.info("Not enough data points to calculate mean and standard deviation")

# Reset time
self.last_std_dev_time = now

self.size_delta_list = []

_log.debug('publishing message {} with header {} on historian topic {}'.format(
historian_message, headers, self.historian_topic))
self.vip.pubsub.publish(peer="pubsub",
topic=self.historian_topic,
headers=headers,
message=historian_message)

_log.debug('publishing message {} on topic {}'.format(publish_message, self.publish_topic))
self.vip.pubsub.publish(peer="pubsub", topic=self.publish_topic, message=publish_message)

Expand All @@ -143,7 +188,7 @@ def main(argv=sys.argv):
"""
Main method called by the platform.
"""
utils.vip_main(log_statistics, identity='platform.logstatisticsagent')
utils.vip_main(LogStatisticsAgent, identity='platform.log_statistics')


if __name__ == '__main__':
Expand Down
Loading