From a9db0db1e2e263754a48b97a5df5f07430b4d78a Mon Sep 17 00:00:00 2001 From: Christophe Taton Date: Thu, 22 May 2014 18:49:34 -0700 Subject: [PATCH] REST-128. Allow extra tags in KijiREST/TSDB collector - Add flag: --tags="key=value,key=value" - Fix typo in KijiREST client issue: https://jira.kiji.org/browse/REST-128 review: https://review.kiji.org/r/1880 --- clients/python/kiji/rest/kiji_rest.py | 2 +- clients/python/kiji/rest/tsdb_collector.py | 90 ++++++++++++++++------ clients/python/setup.py | 2 +- 3 files changed, 70 insertions(+), 24 deletions(-) diff --git a/clients/python/kiji/rest/kiji_rest.py b/clients/python/kiji/rest/kiji_rest.py index 5d0ae28..1bb429d 100755 --- a/clients/python/kiji/rest/kiji_rest.py +++ b/clients/python/kiji/rest/kiji_rest.py @@ -115,7 +115,7 @@ def address(self): @property def admin_address(self): """Returns: 'host:port' admin address of the KijiREST server, as a string.""" - return self._admiin_address + return self._admin_address def Request( self, diff --git a/clients/python/kiji/rest/tsdb_collector.py b/clients/python/kiji/rest/tsdb_collector.py index 4f11778..551d14e 100755 --- a/clients/python/kiji/rest/tsdb_collector.py +++ b/clients/python/kiji/rest/tsdb_collector.py @@ -31,7 +31,27 @@ class Error(Exception): # ------------------------------------------------------------------------------ -def Push(tsdb, hostname, values, path=None, mkmetric=False, **kwargs): +def ListMetrics(values, path=None): + """Lists the metrics that can be populated from a KijiREST metrics report. + + Args: + values: KijiREST metrics report (JSON decoded object). + path: metric name prefix. + Yield: + OpenTSDB metric name. + """ + for key, value in values.items(): + vpath = key if path is None else '%s.%s' % (path, key) + if isinstance(value, collections.Iterable) and not isinstance(value, str): + yield from ListMetrics(values=value, path=vpath) + elif isinstance(value, (int, float)): + yield vpath + else: + logging.log( + LogLevel.DEBUG_VERBOSE, 'Not a numeric metric: %r = %r', vpath, value) + + +def Push(tsdb, hostname, values, path=None, **kwargs): """Recursively converts a Python value (nested dicts) into a set of TSDB puts. Args: @@ -39,7 +59,6 @@ def Push(tsdb, hostname, values, path=None, mkmetric=False, **kwargs): hostname: Name of the host to report metrics for. values: Python value to convert (nested dicts). path: Optional path prefix for the metrics to write. - mkmetric: When set, lists the metrics instead of sending puts to TSDB. **kwargs: Optional set of tags to set on the metrics. """ for key, value in values.items(): @@ -51,14 +70,10 @@ def Push(tsdb, hostname, values, path=None, mkmetric=False, **kwargs): hostname=hostname, values=value, path=vpath, - mkmetric=mkmetric, **kwargs ) elif isinstance(value, (int, float)): - if mkmetric: - yield vpath - else: - tsdb.Put(metric=vpath, value=value, host=hostname, **kwargs) + tsdb.Put(metric=vpath, value=value, host=hostname, **kwargs) else: logging.log( LogLevel.DEBUG_VERBOSE, 'Not a numeric metric: %r = %r', vpath, value) @@ -102,6 +117,13 @@ def RegisterFlags(self): 'None or empty means use localhost fully-qualified name.'), ) + self.flags.AddString( + name='tags', + default=None, + help=('Comma-separated list of extra tags (key=value) ' + 'to put on the reported metrics.'), + ) + @property def hostname(self): """Returns: the hostname of the KijiREST server to report metrics of.""" @@ -120,6 +142,23 @@ def Run(self, args): hostname = socket.getfqdn() self._hostname = hostname + self._tags = {} + + tags = self.flags.tags + if (tags is not None) and (len(tags) > 0): + for tag in tags.split(','): + key, value = tag.split('=') + key = key.strip() + value = value.strip() + self._tags[key] = value + + logging.info('Using extra tags: %r', self._tags) + + self._rest_client = kiji_rest.KijiRestClient( + address = self.flags.kiji_rest_address, + admin_address = self.flags.kiji_rest_admin_address, + ) + while True: logging.info( 'Starting KijiREST metrics collection loop for host %s', @@ -128,24 +167,12 @@ def Run(self, args): def _CollectLoop(self): """Internal loop that collects KijiREST metrics and pushes them to TSDB.""" - rest_client = kiji_rest.KijiRestClient( - address = self.flags.kiji_rest_address, - admin_address = self.flags.kiji_rest_admin_address, - ) - tsdb = tsdb_client.TSDB(self.flags.tsdb_address) + self._tsdb = tsdb_client.TSDB(self.flags.tsdb_address) try: while True: timestamp = int(time.time()) try: - logging.debug( - 'Pushing KijiREST metrics for hostname=%s timestamp=%s', - self.hostname, timestamp) - Push( - tsdb = tsdb, - hostname = self.hostname, - timestamp = timestamp, - values = rest_client.GetMetrics(), - ) + self._CollectAndPush(timestamp) except ConnectionResetError: logging.error( 'Error while pushing KijiREST metrics for hostname=%s timestamp=%s', @@ -162,7 +189,26 @@ def _CollectLoop(self): time.sleep(self.flags.interval_secs) finally: - tsdb.Close() + self._tsdb.Close() + + def _CollectAndPush(self, timestamp): + """Collects and pushes one monitoring sample. + + Args: + timestamp: Timestamp of the sample. + """ + logging.debug( + 'Collecting metrics from KijiREST server %r (admin %r)', + self._rest_client.address, self._rest_client.admin_address) + metrics = self._rest_client.GetMetrics() + logging.debug('Pushing KijiREST metrics: %r', metrics) + Push( + tsdb = self._tsdb, + hostname = self.hostname, + timestamp = timestamp, + values = metrics, + **self._tags + ) # ------------------------------------------------------------------------------ diff --git a/clients/python/setup.py b/clients/python/setup.py index b0ebe51..5145729 100755 --- a/clients/python/setup.py +++ b/clients/python/setup.py @@ -32,7 +32,7 @@ def Main(): setup( name = 'kiji-rest', - version = '1.0.4', + version = '1.0.5', packages = ['kiji.rest'], package_dir = { 'kiji': 'kiji',