Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

24 Hour Reporting and Default Config Implementation for Log Stat Agent #3157

Merged
merged 10 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions services/ops/LogStatisticsAgent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,28 @@ which may be an indication of some sort of failure or breach.

### Configuration

The Log Statistics agent has 4 required configuration values:

- `file_path`: This should be the path to the "volttron.log" file
- `analysis_interval_secs`: The interval in seconds between publishing the size delta statistic to the message bus
- `publish_topic`: Can be used to specify a topic to publish log statistics to which does not get captured by the
historian framework (topics not prefixed by any of: "datalogger", "record", "analysis", "devices")
- `historian_topic`: Can be used to specify a topic to publish log statistics to which gets captured by the
historian framework ("datalogger", "record", "analysis", "devices")

The following is an example configuration file:

The Log Statistics agent has 5 configuration parameters. Each is optional; the default value shown below is used for any parameter that is omitted:

- `file_path`: The file path to the log file. If no config provided, defaults to `'volttron.log'` located within your VOLTTRON_HOME environment variable.
- `analysis_interval_secs`: The interval in seconds between publishes of the size delta statistic to the message bus. If no config provided, defaults to 60 seconds.
- `publish_topic`: Used to specify a topic to publish log statistics to which does not get captured by the
historian framework (topics not prefixed by any of: "datalogger", "record", "analysis", "devices"). If no config provided, defaults to `"platform/log_statistics"`.
- `historian_topic`: Can be used to specify a topic to publish log statistics to which gets captured by the
historian framework ("datalogger", "record", "analysis", "devices"). If no config provided, defaults to `analysis/log_statistics`.
- `unit`: Can be used to specify units. Defaults to `bytes`.
- "bytes"
- "kb"
- "mb"
- "gb"

Here is an example configuration file named `log_stat_config.json`.
```json
{
"file_path" : "~/volttron/volttron.log",
"analysis_interval_min" : 60,
"publish_topic" : "platform/log_statistics",
"historian_topic" : "record/log_statistics"
"analysis_interval_sec": 60,
"file_path": "path/to/.log/",
"historian_topic": "analysis/log_statistics",
"publish_topic": "platform/log_statistics",
"unit": "bytes"
}
```

Expand Down
7 changes: 7 additions & 0 deletions services/ops/LogStatisticsAgent/log_stat_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"analysis_interval_sec": 20,
"file_path": "/home/riley/DRIVERWORK/PORTS/rileysVOLTTRON/volttron.log",
"historian_topic": "analysis/log_statistics",
"publish_topic": "platform/log_statistics",
"unit": "mb"
}
6 changes: 0 additions & 6 deletions services/ops/LogStatisticsAgent/logstatisticsagent.config

This file was deleted.

132 changes: 95 additions & 37 deletions services/ops/LogStatisticsAgent/logstatisticsagent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,6 @@
_log = logging.getLogger(__name__)
__version__ = '1.0'


def log_statistics(config_path, **kwargs):
    """
    Load the LogStatisticsAgent configuration and return an instance of
    the agent created using that configuration.

    :param config_path: Path to a configuration file.
    :type config_path: str
    :returns: LogStatisticsAgent agent instance
    :rtype: LogStatisticsAgent
    """
    loaded_config = utils.load_config(config_path)
    return LogStatisticsAgent(loaded_config, **kwargs)


class LogStatisticsAgent(Agent):
"""
LogStatisticsAgent reads volttron.log file size every hour, compute the size delta from previous hour and publish
Expand All @@ -66,39 +52,67 @@ class LogStatisticsAgent(Agent):
}
"""

def __init__(self, config, **kwargs):
def __init__(self, config_path=None, **kwargs):
    """
    Initialise bookkeeping state, register default configuration values,
    and subscribe to config-store changes.

    :param config_path: Optional path to a configuration file whose
        contents override the built-in defaults.
    """
    super(LogStatisticsAgent, self).__init__(**kwargs)
    self.configured = False
    self.last_std_dev_time = get_aware_utc_now()

    # Built-in fallbacks; values loaded from a config file (if any)
    # take precedence over these.
    defaults = {
        "file_path": "volttron.log",
        "analysis_interval_sec": 60,
        "publish_topic": "platform/log_statistics",
        "historian_topic": "analysis/log_statistics",
        "unit": "bytes",
    }
    if config_path:
        defaults.update(utils.load_config(config_path))
    self.default_config = defaults
    self.vip.config.set_default("config", self.default_config)
    self.vip.config.subscribe(self.configure_main, actions=["NEW", "UPDATE"], pattern="config")

def configure_main(self, config_name, action, contents):
    """
    Config-store callback: merge the incoming contents over the defaults
    and (re)apply the agent's runtime parameters.

    :param config_name: Name of the configuration entry that changed.
    :param action: Store action, one of "NEW" or "UPDATE" (the actions
        this agent subscribed to).
    :param contents: Mapping of configuration values from the store.
    """
    config = self.default_config.copy()
    config.update(contents)
    self.configured = True
    # Bug fix: the original condition was `action == "NEW" or "UPDATE"`,
    # which is always truthy because the non-empty string "UPDATE" is
    # evaluated on its own.  Test membership explicitly instead.
    if action in ("NEW", "UPDATE"):
        self.reset_parameters(config)
        _log.info("Starting " + self.__class__.__name__ + " agent")

def reset_parameters(self, config=None):
    """
    Apply configuration values and reset all accumulated statistics.

    :param config: Mapping with the keys "analysis_interval_sec",
        "file_path", "publish_topic", "historian_topic" and "unit".
        When None, the agent's ``default_config`` is used.  (The original
        implementation subscripted None and raised TypeError when called
        with the default argument.)
    """
    if config is None:
        # Fall back to the defaults rather than crashing on None.
        config = self.default_config
    self.analysis_interval_sec = config["analysis_interval_sec"]
    self.file_path = config["file_path"]
    self.publish_topic = config["publish_topic"]
    self.historian_topic = config["historian_topic"]
    self.unit = config["unit"]
    # Reset rolling statistics / scheduling state.
    self.size_delta_list = []
    self.file_start_size = None
    self.prev_file_size = None
    self._scheduled_event = None

@Core.receiver('onstart')
def starting(self, sender, **kwargs):
_log.info("Starting " + self.__class__.__name__ + " agent")
self.publish_analysis()
if self.configured:
self.publish_analysis()

def publish_analysis(self):
"""
Publishes file's size increment in previous time interval (60 minutes) with timestamp.
Also publishes standard deviation of file's hourly size differences every 24 hour.
"""
if not hasattr(self, '_scheduled_event'):
# The settings haven't been initialized, so skip the rest of the method
return

if self._scheduled_event is not None:
self._scheduled_event.cancel()

if self.prev_file_size is None:
self.prev_file_size = self.get_file_size()
_log.debug("init_file_size = {}".format(self.prev_file_size))
_log.debug(f"init_file_size = {self.convert_bytes(self.prev_file_size, self.unit)} {self.unit}")
else:
# read file size
curr_file_size = self.get_file_size()

# calculate size delta
size_delta = curr_file_size - self.prev_file_size
size_delta = self.convert_bytes(size_delta, self.unit)

self.prev_file_size = curr_file_size

self.size_delta_list.append(size_delta)
Expand All @@ -107,23 +121,56 @@ def publish_analysis(self):

publish_message = {'timestamp': datetime.datetime.utcnow().isoformat() + 'Z',
'log_size_delta': size_delta}
historian_message = [{"log_size_delta ": size_delta},
{"log_size_delta ": {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}}]

if len(self.size_delta_list) == 24:
standard_deviation = statistics.stdev(self.size_delta_list)
publish_message['log_std_dev'] = standard_deviation
historian_message[0]['log_std_dev'] = standard_deviation
historian_message[1]['log_std_dev'] = {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}

_log.debug('publishing message {} with header {} on historian topic {}'
.format(historian_message, headers, self.historian_topic))
self.vip.pubsub.publish(peer="pubsub", topic=self.historian_topic, headers=headers,
message=historian_message)
historian_message = [{
"log_size_delta ": size_delta
}, {
"log_size_delta ": {
'units': f'{self.unit}',
'tz': 'UTC',
'type': 'float'
}
}]

now = get_aware_utc_now()
hours_since_last_std_dev = (now - self.last_std_dev_time).total_seconds() / 3600

if hours_since_last_std_dev >= 24:
if self.size_delta_list: # make sure it has something in it
if len(self.size_delta_list) >= 2: # make sure it has more than two items
mean = statistics.mean(self.size_delta_list)
standard_deviation = statistics.stdev(self.size_delta_list)

publish_message['log_mean'] = mean
print(f"Calculated mean: {mean}")
publish_message['log_std_dev'] = standard_deviation

historian_message[0]['log_mean'] = mean
historian_message[0]['log_std_dev'] = standard_deviation

historian_message[1]['log_mean'] = {'units': f'{self.unit}', 'tz': 'UTC', 'type': 'float'}
historian_message[1]['log_std_dev'] = {'units': f'{self.unit}', 'tz': 'UTC',
'type': 'float'}

else:
_log.info("Not enough data points to calculate standard deviation")

else:
_log.info("Not enough data points to calculate mean and standard deviation")

# Reset time
self.last_std_dev_time = now

self.size_delta_list = []

_log.debug('publishing message {} on topic {}'.format(publish_message, self.publish_topic))
_log.debug(f'publishing message {historian_message}'
f' with header {headers}'
f' on historian topic {self.historian_topic}')
self.vip.pubsub.publish(peer="pubsub",
topic=self.historian_topic,
headers=headers,
message=historian_message)

_log.debug(f'publishing message {publish_message} {self.unit} on topic {self.publish_topic}')
self.vip.pubsub.publish(peer="pubsub", topic=self.publish_topic, message=publish_message)

_log.debug('Scheduling next periodic call')
Expand All @@ -138,13 +185,24 @@ def get_file_size(self):
except OSError as e:
_log.error(e)

def convert_bytes(self, size, unit):
    """
    Convert *size* (in bytes) to the requested unit.

    :param size: Size in bytes.
    :param unit: One of "bytes", "kb", "mb" or "gb" (case-insensitive).
        Any unrecognised value leaves the size unchanged (i.e. bytes).
    :returns: The converted size.
    """
    divisors = {'kb': 1024, 'mb': 1024 ** 2, 'gb': 1024 ** 3}
    factor = divisors.get(unit.lower())
    return size if factor is None else size / factor

def main(argv=sys.argv):
"""
Main method called by the platform.
"""
utils.vip_main(log_statistics, identity='platform.logstatisticsagent')

utils.vip_main(LogStatisticsAgent, identity='platform.log_statistics')

if __name__ == '__main__':
# Entry point for script
Expand Down
Loading