Skip to content

Commit

Permalink
REST-128. Allow extra tags in KijiREST/TSDB collector
Browse files Browse the repository at this point in the history
 - Add flag: --tags="key=value,key=value"
 - Fix typo in KijiREST client

issue: https://jira.kiji.org/browse/REST-128
review: https://review.kiji.org/r/1880
  • Loading branch information
Christophe Taton committed May 23, 2014
1 parent 885a7d7 commit a9db0db
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 24 deletions.
2 changes: 1 addition & 1 deletion clients/python/kiji/rest/kiji_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def address(self):
@property
def admin_address(self):
"""Returns: 'host:port' admin address of the KijiREST server, as a string."""
return self._admiin_address
return self._admin_address

def Request(
self,
Expand Down
90 changes: 68 additions & 22 deletions clients/python/kiji/rest/tsdb_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,34 @@ class Error(Exception):
# ------------------------------------------------------------------------------


def Push(tsdb, hostname, values, path=None, mkmetric=False, **kwargs):
def ListMetrics(values, path=None):
"""Lists the metrics that can be populated from a KijiREST metrics report.
Args:
values: KijiREST metrics report (JSON decoded object).
path: metric name prefix.
Yield:
OpenTSDB metric name.
"""
for key, value in values.items():
vpath = key if path is None else '%s.%s' % (path, key)
if isinstance(value, collections.Iterable) and not isinstance(value, str):
yield from ListMetrics(values=value, path=vpath)
elif isinstance(value, (int, float)):
yield vpath
else:
logging.log(
LogLevel.DEBUG_VERBOSE, 'Not a numeric metric: %r = %r', vpath, value)


def Push(tsdb, hostname, values, path=None, **kwargs):
"""Recursively converts a Python value (nested dicts) into a set of TSDB puts.
Args:
tsdb: TSDB client to use.
hostname: Name of the host to report metrics for.
values: Python value to convert (nested dicts).
path: Optional path prefix for the metrics to write.
mkmetric: When set, lists the metrics instead of sending puts to TSDB.
**kwargs: Optional set of tags to set on the metrics.
"""
for key, value in values.items():
Expand All @@ -51,14 +70,10 @@ def Push(tsdb, hostname, values, path=None, mkmetric=False, **kwargs):
hostname=hostname,
values=value,
path=vpath,
mkmetric=mkmetric,
**kwargs
)
elif isinstance(value, (int, float)):
if mkmetric:
yield vpath
else:
tsdb.Put(metric=vpath, value=value, host=hostname, **kwargs)
tsdb.Put(metric=vpath, value=value, host=hostname, **kwargs)
else:
logging.log(
LogLevel.DEBUG_VERBOSE, 'Not a numeric metric: %r = %r', vpath, value)
Expand Down Expand Up @@ -102,6 +117,13 @@ def RegisterFlags(self):
'None or empty means use localhost fully-qualified name.'),
)

self.flags.AddString(
name='tags',
default=None,
help=('Comma-separated list of extra tags (key=value) '
'to put on the reported metrics.'),
)

@property
def hostname(self):
"""Returns: the hostname of the KijiREST server to report metrics of."""
Expand All @@ -120,6 +142,23 @@ def Run(self, args):
hostname = socket.getfqdn()
self._hostname = hostname

self._tags = {}

tags = self.flags.tags
if (tags is not None) and (len(tags) > 0):
for tag in tags.split(','):
key, value = tag.split('=')
key = key.strip()
value = value.strip()
self._tags[key] = value

logging.info('Using extra tags: %r', self._tags)

self._rest_client = kiji_rest.KijiRestClient(
address = self.flags.kiji_rest_address,
admin_address = self.flags.kiji_rest_admin_address,
)

while True:
logging.info(
'Starting KijiREST metrics collection loop for host %s',
Expand All @@ -128,24 +167,12 @@ def Run(self, args):

def _CollectLoop(self):
"""Internal loop that collects KijiREST metrics and pushes them to TSDB."""
rest_client = kiji_rest.KijiRestClient(
address = self.flags.kiji_rest_address,
admin_address = self.flags.kiji_rest_admin_address,
)
tsdb = tsdb_client.TSDB(self.flags.tsdb_address)
self._tsdb = tsdb_client.TSDB(self.flags.tsdb_address)
try:
while True:
timestamp = int(time.time())
try:
logging.debug(
'Pushing KijiREST metrics for hostname=%s timestamp=%s',
self.hostname, timestamp)
Push(
tsdb = tsdb,
hostname = self.hostname,
timestamp = timestamp,
values = rest_client.GetMetrics(),
)
self._CollectAndPush(timestamp)
except ConnectionResetError:
logging.error(
'Error while pushing KijiREST metrics for hostname=%s timestamp=%s',
Expand All @@ -162,7 +189,26 @@ def _CollectLoop(self):

time.sleep(self.flags.interval_secs)
finally:
tsdb.Close()
self._tsdb.Close()

def _CollectAndPush(self, timestamp):
"""Collects and pushes one monitoring sample.
Args:
timestamp: Timestamp of the sample.
"""
logging.debug(
'Collecting metrics from KijiREST server %r (admin %r)',
self._rest_client.address, self._rest_client.admin_address)
metrics = self._rest_client.GetMetrics()
logging.debug('Pushing KijiREST metrics: %r', metrics)
Push(
tsdb = self._tsdb,
hostname = self.hostname,
timestamp = timestamp,
values = metrics,
**self._tags
)


# ------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion clients/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def Main():

setup(
name = 'kiji-rest',
version = '1.0.4',
version = '1.0.5',
packages = ['kiji.rest'],
package_dir = {
'kiji': 'kiji',
Expand Down

0 comments on commit a9db0db

Please sign in to comment.