Skip to content

Commit

Permalink
remove on start, added new units
Browse files Browse the repository at this point in the history
  • Loading branch information
riley206 committed May 20, 2024
1 parent 521ec54 commit b854b10
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 33 deletions.
21 changes: 14 additions & 7 deletions services/ops/LogStatisticsAgent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,26 @@ which may be an indication of some sort of failure or breach.

The Log Statistics agent has 5 configuration parameters, none of which are required; a default value is used for any parameter that is omitted:

- `file_path`: The file path to the log file. If left as `null`, defaults to "/home/volttron/volttron.log".
- `analysis_interval_secs`: The interval in seconds between publishes of the size delta statistic to the message bus. If left as `null`, defaults to 60 seconds.
- `file_path`: The file path to the log file. If no config provided, defaults to `'volttron.log'` located within your VOLTTRON_HOME environment variable.
- `analysis_interval_sec`: The interval in seconds between publishes of the size delta statistic to the message bus. If no config provided, defaults to 60 seconds.
- `publish_topic`: Used to specify a topic to publish log statistics to which does not get captured by the
historian framework (topics not prefixed by any of: "datalogger", "record", "analysis", "devices"). If left as `null`, defaults to `"platform/log_statistics"`.
historian framework (topics not prefixed by any of: "datalogger", "record", "analysis", "devices"). If no config provided, defaults to `"platform/log_statistics"`.
- `historian_topic`: Can be used to specify a topic to publish log statistics to which gets captured by the
historian framework ("datalogger", "record", "analysis", "devices"). If left as `null`, defaults to `record/log_statistics`.

historian framework ("datalogger", "record", "analysis", "devices"). If no config provided, defaults to `record/log_statistics`.
- `unit`: Can be used to specify units. Defaults to `bytes`.
- "bytes"
- "kb"
- "mb"
- "gb"

Here is an example configuration file named `log_stat_config.json`.
```json
{
"analysis_interval_sec": 60,
"file_path": null,
    "file_path": "path/to/volttron.log",
"historian_topic": "analysis/log_statistics",
"publish_topic": "platform/log_statistics"
"publish_topic": "platform/log_statistics",
"unit": "bytes"
}
```

Expand Down
9 changes: 5 additions & 4 deletions services/ops/LogStatisticsAgent/logstatisticsagent.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"file_path" : "~/volttron/volttron.log",
"analysis_interval_sec" : 60,
"publish_topic" : "platform/log_statistics",
"historian_topic" : "analysis/log_statistics"
    "analysis_interval_sec": 60,
    "file_path": "~/volttron/volttron.log",
    "historian_topic": "analysis/log_statistics",
    "publish_topic": "platform/log_statistics",
    "unit": "mb"
}
58 changes: 36 additions & 22 deletions services/ops/LogStatisticsAgent/logstatisticsagent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
from volttron.platform.vip.agent import Agent, Core
from volttron.platform.agent import utils
from volttron.platform.agent.utils import get_aware_utc_now
from volttron.platform import get_home
import time

utils.setup_logging()
_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,10 +58,11 @@ def __init__(self, config_path=None, **kwargs):
self.last_std_dev_time = get_aware_utc_now()

self.default_config = {
"file_path": f"volttron/volttron.log",
"file_path": "volttron.log",
"analysis_interval_sec": 60,
"publish_topic": "platform/log_statistics",
"historian_topic": "analysis/log_statistics"
"historian_topic": "analysis/log_statistics",
"unit": "bytes"
}
if config_path:
self.default_config.update(utils.load_config(config_path))
Expand All @@ -76,56 +75,58 @@ def configure_main(self, config_name, action, contents):
self.configured = True
if action == "NEW" or "UPDATE":
self.reset_parameters(config)
_log.info("Starting " + self.__class__.__name__ + " agent")
self.publish_analysis()

def reset_parameters(self, config=None):
    """
    Apply configuration values and reset all collected statistics.

    :param config: dict of configuration values; missing keys (or a
        None/empty config) fall back to the same defaults as
        ``self.default_config``, so legacy configs written before the
        ``unit`` option existed do not raise KeyError.
    """
    config = config or {}
    # .get() defaults mirror default_config so a partial config is safe.
    self.analysis_interval_sec = config.get("analysis_interval_sec", 60)
    self.file_path = config.get("file_path", "volttron.log")
    self.publish_topic = config.get("publish_topic", "platform/log_statistics")
    self.historian_topic = config.get("historian_topic", "analysis/log_statistics")
    self.unit = config.get("unit", "bytes")
    # Discard accumulated deltas and any pending scheduled publish so the
    # next analysis cycle starts from a clean baseline.
    self.size_delta_list = []
    self.file_start_size = None
    self.prev_file_size = None
    self._scheduled_event = None
    if self.configured:
        self.publish_analysis()

@Core.receiver('onstart')
def starting(self, sender, **kwargs):
    """Announce agent startup and trigger the first statistics publish."""
    agent_name = self.__class__.__name__
    _log.info("Starting " + agent_name + " agent")
    self.publish_analysis()

def publish_analysis(self):
"""
Publishes file's size increment in previous time interval (60 minutes) with timestamp.
Also publishes standard deviation of file's hourly size differences every 24 hour.
"""
if not self.configured:
if not hasattr(self, '_scheduled_event'):
# The settings haven't been initialized, so skip the rest of the method
return

if self._scheduled_event is not None:
self._scheduled_event.cancel()

if self.prev_file_size is None:
self.prev_file_size = self.get_file_size()
_log.debug("init_file_size = {}".format(self.prev_file_size))
_log.debug(f"init_file_size = {self.convert_bytes(self.prev_file_size, self.unit)} {self.unit}")
else:
# read file size
curr_file_size = self.get_file_size()

# calculate size delta
size_delta = curr_file_size - self.prev_file_size
size_delta = self.convert_bytes(size_delta, self.unit)

self.prev_file_size = curr_file_size

self.size_delta_list.append(size_delta)

headers = {'Date': datetime.datetime.utcnow().isoformat() + 'Z'}

publish_message = {'timestamp': datetime.datetime.utcnow().isoformat() + 'Z', 'log_size_delta': size_delta}
publish_message = {'timestamp': datetime.datetime.utcnow().isoformat() + 'Z',
'log_size_delta': size_delta}
historian_message = [{
"log_size_delta ": size_delta
}, {
"log_size_delta ": {
'units': 'bytes',
'units': f'{self.unit}',
'tz': 'UTC',
'type': 'float'
}
Expand All @@ -135,8 +136,8 @@ def publish_analysis(self):
hours_since_last_std_dev = (now - self.last_std_dev_time).total_seconds() / 3600

if hours_since_last_std_dev >= 24:
if self.size_delta_list: # make sure it has something in it
if len(self.size_delta_list) >= 2: # make sure it has more than two items
if self.size_delta_list: # make sure it has something in it
if len(self.size_delta_list) >= 2: # make sure it has more than two items
mean = statistics.mean(self.size_delta_list)
standard_deviation = statistics.stdev(self.size_delta_list)

Expand All @@ -147,8 +148,9 @@ def publish_analysis(self):
historian_message[0]['log_mean'] = mean
historian_message[0]['log_std_dev'] = standard_deviation

historian_message[1]['log_mean'] = {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}
historian_message[1]['log_std_dev'] = {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}
historian_message[1]['log_mean'] = {'units': f'{self.unit}', 'tz': 'UTC', 'type': 'float'}
historian_message[1]['log_std_dev'] = {'units': f'{self.unit}', 'tz': 'UTC',
'type': 'float'}

else:
_log.info("Not enough data points to calculate standard deviation")
Expand All @@ -161,14 +163,15 @@ def publish_analysis(self):

self.size_delta_list = []

_log.debug('publishing message {} with header {} on historian topic {}'.format(
historian_message, headers, self.historian_topic))
_log.debug(f'publishing message {historian_message}'
f' with header {headers}'
f' on historian topic {self.historian_topic}')
self.vip.pubsub.publish(peer="pubsub",
topic=self.historian_topic,
headers=headers,
message=historian_message)

_log.debug('publishing message {} on topic {}'.format(publish_message, self.publish_topic))
_log.debug(f'publishing message {publish_message} {self.unit} on topic {self.publish_topic}')
self.vip.pubsub.publish(peer="pubsub", topic=self.publish_topic, message=publish_message)

_log.debug('Scheduling next periodic call')
Expand All @@ -183,14 +186,25 @@ def get_file_size(self):
except OSError as e:
_log.error(e)

def convert_bytes(self, size, unit):
    """
    Convert a size in bytes to the requested unit.

    :param size: size in bytes (numeric)
    :param unit: target unit, one of 'bytes', 'kb', 'mb', 'gb'
        (case-insensitive)
    :return: the converted size; 'bytes' and any unrecognized unit
        return ``size`` unchanged, so a typo in the configured unit
        degrades gracefully instead of crashing
    """
    # Dict dispatch on powers of 1024 replaces the if/elif ladder.
    divisor = {'kb': 1024, 'mb': 1024 ** 2, 'gb': 1024 ** 3}.get(unit.lower())
    # Return size as-is (preserving its type) when no divisor applies.
    return size if divisor is None else size / divisor

def main(argv=sys.argv):
    """
    Entry point called by the VOLTTRON platform.

    :param argv: command-line arguments; unused here, kept for the
        platform's expected entry-point signature.
    """
    # NOTE(review): the default binds sys.argv at import time; harmless
    # since argv is never read, but worth confirming if it is ever used.
    # Delegate startup to the platform helper under a fixed VIP identity.
    utils.vip_main(LogStatisticsAgent, identity='platform.log_statistics')


if __name__ == '__main__':
# Entry point for script
try:
Expand Down

0 comments on commit b854b10

Please sign in to comment.