Skip to content

Commit

Permalink
Merge pull request #3 from Lightricks/feature/events
Browse files Browse the repository at this point in the history
Events: support events.
  • Loading branch information
ilya-lt authored Sep 15, 2019
2 parents 8a6b4c0 + 4c26430 commit f14f3b8
Show file tree
Hide file tree
Showing 24 changed files with 625 additions and 216 deletions.
1 change: 1 addition & 0 deletions pyformance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
from .registry import dump_metrics, clear
from .decorators import count_calls, meter_calls, hist_calls, time_calls
from .meters.timer import call_too_long
from .mark_int import MarkInt
2 changes: 1 addition & 1 deletion pyformance/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4"
__version__ = "1.0.0"
19 changes: 19 additions & 0 deletions pyformance/mark_int.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class MarkInt:
"""
Mark metric value as and integer.
Reporters such as influx require consistent data types for metrics and require you
to mark integer values with an "i" suffix. This is here to let Influx know it should
do so for the value it's initialized with.
"""
def __init__(self, value):
self.value = int(value)

def __str__(self):
return str(self.value)

def __repr__(self):
return f"MarkInt({self.value})"

def __eq__(self, other):
return isinstance(other, MarkInt) and other.value == self.value
3 changes: 2 additions & 1 deletion pyformance/meters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
from .histogram import Histogram
from .timer import Timer
from .gauge import Gauge, CallbackGauge, SimpleGauge
from .base_metric import BaseMetric
from .base_metric import BaseMetric
from .event import Event, EventPoint
59 changes: 59 additions & 0 deletions pyformance/meters/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import copy
from dataclasses import dataclass
from threading import Lock
from typing import Any, Dict

from .base_metric import BaseMetric


@dataclass
class EventPoint:
time: int
values: Dict[str, Any]


class Event(BaseMetric):
"""
Report events as specific data points in specific timestamps
This meter is outside of DropWizard's models and is here to support a specific use case of
infrequently running cron like operations that trigger once in a while, do a bunch of work
and dump the metric results for a single timestamp. Unlike all the other meter types, this one
doesn't repeat itself if no activity occurs leading you to think everything is running
constantly and producing data when it is not.
The closest you can get to the same effect without this class is by using a Gauge, setting the
value, invoking report_now, than clearing it right after.
Since those operations above are not within a lock shared by scheduled reporters , it can still
report the gauge twice.
Additionally when using gauges you don't have any control over the name of the field writen to
(just metric name and tags), and can't write a bunch of
values at once but resort to writing values to separate Gauges which will make the lack of
lock condition more likely to be an issue.
Another problem that will pop in such usage is that the metric will still be written, it will
just be written with the initial value of 0, so you won't be able to tell when was the last
successful run with ease.
"""

def __init__(self, clock, key, tags=None):
super(Event, self).__init__(key, tags)
self.lock = Lock()
self.points = []
self.clock = clock

def add(self, values: Dict[str, Any]):
with self.lock:
self.points.append(EventPoint(
time=self.clock.time(),
values=values
))

def clear(self):
with self.lock:
self.points = []

def get_events(self):
with self.lock:
return copy.copy(self.points)
69 changes: 56 additions & 13 deletions pyformance/registry.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import re
import time
from .meters import Counter, Histogram, Meter, Timer, Gauge, CallbackGauge, SimpleGauge, BaseMetric
from typing import Dict

from .meters import BaseMetric, CallbackGauge, Counter, Event, Gauge, Histogram, Meter, \
SimpleGauge, Timer

class MetricsRegistry(object):

class MetricsRegistry(object):
"""
A single interface used to gather metrics on a service. It keeps track of
all the relevant Counters, Meters, Histograms, and Timers. It does not have
Expand All @@ -21,6 +23,7 @@ def __init__(self, clock=time):
self._counters = {}
self._histograms = {}
self._gauges = {}
self._events = {}
self._clock = clock

def add(self, key, metric, tags=None):
Expand All @@ -42,6 +45,7 @@ def add(self, key, metric, tags=None):
(Gauge, self._gauges),
(Timer, self._timers),
(Counter, self._counters),
(Event, self._events),
)
for cls, registry in class_map:
if isinstance(metric, cls):
Expand Down Expand Up @@ -144,11 +148,28 @@ def timer(self, key, tags=None):
)
return self._timers[metric_key]

def event(self, key: str, tags: Dict[str, str] = None) -> Event:
"""
Gets an event reporter based on key and tags
:param key: The metric name / measurement name
:param tags: Tags to attach to the metric
:return: Event object you can add readings to
"""
metric_key = BaseMetric(key, tags)
if metric_key not in self._events:
self._events[metric_key] = Event(
clock=self._clock,
key=key,
tags=tags
)
return self._events[metric_key]

def clear(self):
self._meters.clear()
self._counters.clear()
self._gauges.clear()
self._timers.clear()
self._events.clear()
self._histograms.clear()

def _get_counter_metrics(self, metric_key):
Expand Down Expand Up @@ -196,6 +217,19 @@ def _get_meter_metrics(self, metric_key):
return res
return {}

def _get_event_metrics(self, metric_key):
if metric_key in self._events:
_event = self._events[metric_key]
points = _event.get_events()

if points:
res = {
"events": points
}

return res
return {}

def _get_timer_metrics(self, metric_key):
if metric_key in self._timers:
timer = self._timers[metric_key]
Expand Down Expand Up @@ -237,11 +271,12 @@ def get_metrics(self, key, tags=None):
def _get_metrics_by_metric_key(self, metric_key):
metrics = {}
for getter in (
self._get_counter_metrics,
self._get_histogram_metrics,
self._get_meter_metrics,
self._get_timer_metrics,
self._get_gauge_metrics,
self._get_counter_metrics,
self._get_histogram_metrics,
self._get_meter_metrics,
self._get_timer_metrics,
self._get_gauge_metrics,
self._get_event_metrics,
):
metrics.update(getter(metric_key))
return metrics
Expand All @@ -257,11 +292,12 @@ def dump_metrics(self, key_is_metric=False):
"""
metrics = {}
for metric_type in (
self._counters,
self._histograms,
self._meters,
self._timers,
self._gauges,
self._counters,
self._histograms,
self._meters,
self._timers,
self._gauges,
self._events,
):
for metric_key in metric_type.keys():
if key_is_metric:
Expand All @@ -271,12 +307,15 @@ def dump_metrics(self, key_is_metric=False):

metrics[key] = self._get_metrics_by_metric_key(metric_key)

# Don't repeat events, that's the whole point of events
for _event in self._events.values():
_event.clear()

return metrics


# TODO make sure tags are supported properly
class RegexRegistry(MetricsRegistry):

"""
A single interface used to gather metrics on a service. This class uses a regex to combine
measures that match a pattern. For example, if you have a REST API, instead of defining
Expand Down Expand Up @@ -348,6 +387,10 @@ def timer(key, tags=None):
return _global_registry.timer(key, tags)


def event(key, tags=None):
return _global_registry.event(key, tags)


def gauge(key, gauge=None, tags=None):
return _global_registry.gauge(key=key, gauge=gauge, tags=tags)

Expand Down
60 changes: 36 additions & 24 deletions pyformance/reporters/carbon_reporter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# -*- coding: utf-8 -*-
import contextlib
import pickle
import socket
import sys
import struct
import pickle
import contextlib
import sys

from six import iteritems

from .reporter import Reporter
Expand All @@ -13,21 +14,20 @@


class CarbonReporter(Reporter):

"""
Carbon is the network daemon to collect metrics for Graphite
"""

def __init__(
self,
registry=None,
reporting_interval=5,
prefix="",
server=DEFAULT_CARBON_SERVER,
port=DEFAULT_CARBON_PORT,
socket_factory=socket.socket,
clock=None,
pickle_protocol=False,
self,
registry=None,
reporting_interval=5,
prefix="",
server=DEFAULT_CARBON_SERVER,
port=DEFAULT_CARBON_PORT,
socket_factory=socket.socket,
clock=None,
pickle_protocol=False,
):
super(CarbonReporter, self).__init__(registry, reporting_interval, clock)
self.prefix = prefix
Expand Down Expand Up @@ -55,7 +55,7 @@ def _collect_metrics(self, registry, timestamp=None):
(timestamp, metric_value),
)
for metric_name, metric in iteritems(metrics)
for metric_key, metric_value in iteritems(metric)
for metric_key, metric_value in iteritems(metric) if metric_key != "events"
],
protocol=2,
)
Expand All @@ -65,22 +65,34 @@ def _collect_metrics(self, registry, timestamp=None):
metrics_data = []
for metric_name, metric in iteritems(metrics):
for metric_key, metric_value in iteritems(metric):
metric_line = "%s%s.%s %s %s\n" % (
self.prefix,
metric_name,
metric_key,
metric_value,
timestamp,
)
metrics_data.append(metric_line)
if metric_key != "events":
metric_line = "%s%s.%s %s %s\n" % (
self.prefix,
metric_name,
metric_key,
metric_value,
timestamp,
)
metrics_data.append(metric_line)
else:
for event in metric_value:
for field, value in event.values.items():
metric_line = "%s%s.%s %s %s\n" % (
self.prefix,
metric_name,
field,
value,
event.time,
)

metrics_data.append(metric_line)
result = "".join(metrics_data)
if sys.version_info[0] > 2:
return result.encode()
return result


class UdpCarbonReporter(CarbonReporter):

"""
The default CarbonReporter uses TCP.
This sub-class uses UDP instead which might be unreliable but it is faster
Expand All @@ -90,6 +102,6 @@ def report_now(self, registry=None, timestamp=None):
metrics = self._collect_metrics(registry or self.registry, timestamp)
if metrics:
with contextlib.closing(
self.socket_factory(socket.AF_INET, socket.SOCK_DGRAM)
self.socket_factory(socket.AF_INET, socket.SOCK_DGRAM)
) as sock:
sock.sendto(metrics, (self.server, self.port))
30 changes: 24 additions & 6 deletions pyformance/reporters/console_reporter.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys

import datetime
import sys

from .reporter import Reporter


class ConsoleReporter(Reporter):

"""
Show metrics in a human readable form.
This is useful for debugging if you want to read the current state on the console.
"""

def __init__(
self, registry=None, reporting_interval=30, stream=sys.stderr, clock=None
self, registry=None, reporting_interval=30, stream=sys.stderr, clock=None
):
super(ConsoleReporter, self).__init__(registry, reporting_interval, clock)
self.stream = stream
Expand All @@ -33,8 +34,25 @@ def _collect_metrics(self, registry, timestamp=None):
]
for key in metrics.keys():
values = metrics[key]
metrics_data.append("%s:" % key)
for value_key in values.keys():
metrics_data.append("%20s = %s" % (value_key, values[value_key]))
if values.keys() != {"events"}:
metrics_data.append("%s:" % key)
for value_key in values.keys():
if value_key != "events":
metrics_data.append("%20s = %s" % (value_key, values[value_key]))
metrics_data.append("")

# Add events
for key in metrics.keys():
for event in metrics[key].get("events", []):
dt = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=event.time)
metrics_data.append("== %s ===================================" %
dt.strftime("%Y-%m-%d %H:%M:%S"))

metrics_data.append("%s:" % key)

for field, value in event.values.items():
metrics_data.append("%20s = %s" % (field, value))

metrics_data.append("")

return metrics_data
Loading

0 comments on commit f14f3b8

Please sign in to comment.