-
Notifications
You must be signed in to change notification settings - Fork 16
/
collectd.py
238 lines (200 loc) · 7.13 KB
/
collectd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import re
import time
import socket
import struct
import logging
import traceback
from functools import wraps
from Queue import Queue, Empty
from collections import defaultdict
from threading import RLock, Thread, Semaphore
# Public names exported by "from <module> import *".
__all__ = ["Connection", "start_threads"]
# Version tuple and the dotted "major.minor.patch" string derived from it.
__version_info__ = (1, 0, 2, "final", 0)
__version__ = "{0}.{1}.{2}".format(*__version_info__)
# Logger used by swallow_errors() and daemonize() to report unexpected errors.
logger = logging.getLogger("collectd")
# Packed into every message header as the interval part, and also used by
# start_threads() as the pause between snapshot rounds.
SEND_INTERVAL = 10 # seconds
# Upper bound on the size of a single packet built by messages().
MAX_PACKET_SIZE = 1024 # bytes
# Value packed as the TYPE_TYPE part of every message header.
PLUGIN_TYPE = "gauge"
# Part type codes accepted by pack().
# NOTE(review): presumably these mirror the collectd binary protocol part
# identifiers -- confirm against the protocol documentation.
TYPE_HOST = 0x0000
TYPE_TIME = 0x0001
TYPE_PLUGIN = 0x0002
TYPE_PLUGIN_INSTANCE = 0x0003
TYPE_TYPE = 0x0004
TYPE_TYPE_INSTANCE = 0x0005
TYPE_VALUES = 0x0006
TYPE_INTERVAL = 0x0007
# pack() routes codes in LONG_INT_CODES to pack_numeric() and codes in
# STRING_CODES to pack_string().
LONG_INT_CODES = [TYPE_TIME, TYPE_INTERVAL]
STRING_CODES = [TYPE_HOST, TYPE_PLUGIN, TYPE_PLUGIN_INSTANCE, TYPE_TYPE, TYPE_TYPE_INSTANCE]
# Value (data source) type codes; only VALUE_GAUGE is used by pack_value().
VALUE_COUNTER = 0
VALUE_GAUGE = 1
VALUE_DERIVE = 2
VALUE_ABSOLUTE = 3
# struct format string for each value type code. Not referenced elsewhere in
# the visible part of this module.
VALUE_CODES = {
VALUE_COUNTER: "!Q",
VALUE_GAUGE: "<d",
VALUE_DERIVE: "!q",
VALUE_ABSOLUTE: "!Q"
}
def pack_numeric(type_code, number):
    """Pack a numeric part: 2-byte type, 2-byte length, signed 64-bit value.

    Args:
        type_code: Part type identifier (e.g. TYPE_TIME or TYPE_INTERVAL).
        number: Value to encode. Floats such as ``time.time()`` are truncated
            to an integer explicitly, because the ``q`` struct format only
            accepts integers (implicit float conversion is deprecated on
            Python 2.7 and an error on Python 3).

    Returns:
        The 12-byte, network-byte-order encoded part.
    """
    # 12 = 2 (type) + 2 (length) + 8 (value): the length covers the whole part.
    return struct.pack("!HHq", type_code, 12, int(number))
def pack_string(type_code, string):
    """Pack a string part: 2-byte type, 2-byte length, data, trailing NUL.

    Args:
        type_code: Part type identifier (one of STRING_CODES).
        string: Value to encode. Text (unicode) input is encoded as UTF-8
            first so that the length field counts encoded bytes rather than
            characters, and so the byte header is never mixed with text.

    Returns:
        The encoded part as a byte string.
    """
    if not isinstance(string, bytes):
        string = string.encode("utf-8")
    # 5 = 2 (type) + 2 (length) + 1 (NUL terminator) on top of the payload.
    return struct.pack("!HH", type_code, 5 + len(string)) + string + b"\0"
def pack_value(name, value):
    """Encode one named gauge reading.

    The result is the type-instance part for ``name`` followed by a values
    part that carries exactly one gauge value.
    """
    instance_part = pack(TYPE_TYPE_INSTANCE, name)
    # Values part header: type, total part length (15 bytes), value count (1).
    values_header = struct.pack("!HHH", TYPE_VALUES, 15, 1)
    # One type byte followed by the value as a little-endian double.
    gauge_payload = struct.pack("<Bd", VALUE_GAUGE, value)
    return instance_part + values_header + gauge_payload
def pack(id, value):
    """Encode a single part, choosing the encoder from ``id``.

    A string ``id`` is treated as a stat name and packed with pack_value();
    a numeric ``id`` must be one of the known part type codes.

    Raises:
        AssertionError: if ``id`` is not a string and not a known type code.
    """
    if isinstance(id, basestring):
        return pack_value(id, value)
    # Try each family of type codes in turn and use its matching packer.
    for codes, packer in ((LONG_INT_CODES, pack_numeric), (STRING_CODES, pack_string)):
        if id in codes:
            return packer(id, value)
    raise AssertionError("invalid type code " + str(id))
def message_start(when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any"):
    """Build the header shared by every packet of one message.

    The header consists of the host, time, plugin, plugin instance, type and
    interval parts, in that order. A falsy ``when`` means "now".
    """
    header_fields = [
        (TYPE_HOST, host),
        (TYPE_TIME, when or time.time()),
        (TYPE_PLUGIN, plugin_name),
        (TYPE_PLUGIN_INSTANCE, plugin_inst),
        (TYPE_TYPE, PLUGIN_TYPE),
        (TYPE_INTERVAL, SEND_INTERVAL),
    ]
    return "".join([pack(code, field) for code, field in header_fields])
def messages(counts, when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any"):
    """Split ``counts`` into packets no larger than MAX_PACKET_SIZE.

    Every packet begins with the same header from message_start(). A stat
    whose encoded form cannot fit in a packet even on its own is dropped.

    Returns:
        A list of encoded packets; empty when no stat fits.
    """
    start = message_start(when, host, plugin_inst, plugin_name)
    # Room left for value parts once the header has been accounted for.
    budget = MAX_PACKET_SIZE - len(start)
    encoded = [pack(name, count) for name, count in counts.items()]
    fitting = [part for part in encoded if len(part) <= budget]
    if not fitting:
        return []

    packets = []
    body, body_len = [], 0
    for part in fitting:
        if body_len + len(part) > budget:
            # The current packet is full: emit it and start a fresh one.
            packets.append(start + "".join(body))
            body, body_len = [], 0
        body.append(part)
        body_len += len(part)
    packets.append(start + "".join(body))
    return packets
def sanitize(s):
    """Collapse each run of non-alphanumeric characters in ``s`` to a single
    underscore, then trim underscores from both ends."""
    underscored = re.sub(r"[^a-zA-Z0-9]+", "_", s)
    return underscored.strip("_")
def swallow_errors(func):
    """Decorator that logs, and then suppresses, errors raised by ``func``.

    Recording statistics is best-effort and must not break the caller, so any
    ``Exception`` raised by ``func`` is logged and the wrapper returns None.
    Only ``Exception`` subclasses are caught: ``KeyboardInterrupt`` and
    ``SystemExit`` still propagate so the host application can be interrupted
    or shut down while inside a decorated call.

    Args:
        func: The callable to protect.

    Returns:
        A wrapper with the same call signature as ``func``.
    """
    @wraps(func)
    def wrapped(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            try:
                logger.error("unexpected error", exc_info=True)
            except Exception:
                # Logging itself failed; stay silent rather than break the caller.
                pass
    return wrapped
def synchronized(method):
    """Decorator that runs ``method`` while holding ``self._lock``.

    The lock is looked up on the first positional argument at call time and
    is released again whether the method returns or raises.
    """
    @wraps(method)
    def wrapped(self, *args, **kwargs):
        lock = self._lock
        lock.acquire()
        try:
            return method(self, *args, **kwargs)
        finally:
            lock.release()
    return wrapped
class Counter(object):
    """Accumulates named statistics for one category.

    ``counts`` maps a "specific" label to a mapping of stat name to float
    value; the empty label "" holds the totals for the whole category.
    """

    def __init__(self, category):
        self.category = category
        self._lock = RLock()
        self.counts = defaultdict(lambda: defaultdict(float))

    @swallow_errors
    @synchronized
    def record(self, *args, **kwargs):
        """Add each keyword value to its stat, once per label in ``args``
        and once more under the category-wide "" label."""
        labels = list(args)
        labels.append("")
        for specific in labels:
            assert isinstance(specific, basestring)
            for stat, value in kwargs.items():
                assert isinstance(value, (int, float))
                self.counts[str(specific)][str(stat)] += value

    @swallow_errors
    @synchronized
    def set_exact(self, **kwargs):
        """Overwrite (instead of add to) the category-wide value of each stat."""
        for stat, value in kwargs.items():
            assert isinstance(value, (int, float))
            self.counts[""][str(stat)] = value

    @synchronized
    def snapshot(self):
        """Return ``{name: value}`` for every stat and reset each to 0.0.

        Names are the sanitized category, label and stat joined with "-";
        the doubled dash left by an empty label is collapsed to one.
        """
        totals = {}
        for specific, per_stat in self.counts.items():
            for stat in per_stat:
                pieces = [sanitize(piece) for piece in (self.category, specific, stat)]
                totals["-".join(pieces).replace("--", "-")] = per_stat[stat]
                per_stat[stat] = 0.0
        return totals
class Connection(object):
    """Handle for sending statistics to one collectd destination.

    Instances are cached: constructing a Connection with the same
    (hostname, collectd_host, collectd_port, plugin_inst, plugin_name)
    returns the existing object. Reading any attribute whose name does not
    start with "_" yields the Counter for that category, created on first use.
    """

    _lock = RLock() # class-level lock, only used for __new__
    instances = {}

    @synchronized
    def __new__(cls, hostname=socket.gethostname(),
                collectd_host="localhost", collectd_port=25826,
                plugin_inst="", plugin_name="any"):
        key = (hostname, collectd_host, collectd_port, plugin_inst, plugin_name)
        if key not in cls.instances:
            cls.instances[key] = object.__new__(cls)
        return cls.instances[key]

    def __init__(self, hostname=socket.gethostname(),
                 collectd_host="localhost", collectd_port=25826,
                 plugin_inst="", plugin_name="any"):
        # __init__ runs again every time a cached instance is returned by
        # __new__, so only set the instance up the first time.
        if "_counters" in self.__dict__:
            return
        self._lock = RLock()
        self._counters = {}
        self._plugin_inst = plugin_inst
        self._plugin_name = plugin_name
        self._hostname = hostname
        self._collectd_addr = (collectd_host, collectd_port)

    @synchronized
    def __getattr__(self, name):
        # Private names are never counters; report them as missing.
        if name.startswith("_"):
            raise AttributeError("{0} object has no attribute {1!r}".format(self.__class__.__name__, name))
        counters = self._counters
        if name not in counters:
            counters[name] = Counter(name)
        return counters[name]

    @synchronized
    def _snapshot(self):
        """Snapshot (and thereby reset) every counter that holds any data."""
        return [counter.snapshot() for counter in self._counters.values() if counter.counts]
# Queue of [timestamp, stats, connection] entries: filled by take_snapshots()
# and drained by send_stats().
snaps = Queue()
# UDP socket shared by all connections; send_stats() sends every packet on it.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def take_snapshots():
    """Snapshot every known Connection and queue the merged stats.

    A connection with nothing to report is skipped. Otherwise one
    ``[timestamp, stats, connection]`` entry is put on ``snaps``.
    """
    for conn in Connection.instances.values():
        per_counter = conn._snapshot()
        if not per_counter:
            continue
        merged = {}
        for snap in per_counter:
            merged.update(snap)
        snaps.put([int(time.time()), merged, conn])
def send_stats(raise_on_empty=False):
    """Send one queued snapshot to its connection's collectd address.

    Waits up to 0.1 seconds for a snapshot to become available.

    Raises:
        Empty: only when ``raise_on_empty`` is true and nothing was queued.
    """
    try:
        when, stats, conn = snaps.get(timeout=0.1)
    except Empty:
        if raise_on_empty:
            raise
        return
    packets = messages(stats, when, conn._hostname, conn._plugin_inst, conn._plugin_name)
    for packet in packets:
        sock.sendto(packet, conn._collectd_addr)
def daemonize(func, sleep_for=0):
    """Run ``func`` forever on a daemon thread.

    ``func`` is called repeatedly, with a pause of ``sleep_for`` seconds
    after each call. Any error it raises is logged (or printed when logging
    itself fails) and the loop carries on.
    """
    @wraps(func)
    def wrapped():
        while True:
            try:
                func()
            except:
                # Deliberately broad: this loop must never die.
                try:
                    logger.error("unexpected error", exc_info=True)
                except:
                    traceback.print_exc()
            time.sleep(sleep_for)

    worker = Thread(target=wrapped)
    worker.daemon = True
    worker.start()
# Guards start_threads() so the background threads are only started once.
single_start = Semaphore()


def start_threads():
    """Start the background snapshot and sender threads, at most once.

    One daemon thread takes snapshots every SEND_INTERVAL seconds and another
    continuously sends queued snapshots.

    Raises:
        AssertionError: if the threads have already been started.
    """
    # The acquire() must not live inside an ``assert`` statement: under
    # ``python -O`` asserts are removed, which would skip the acquire and
    # let the threads be started repeatedly.
    if not single_start.acquire(blocking=False):
        raise AssertionError("start_threads() may only be called once")
    daemonize(take_snapshots, sleep_for=SEND_INTERVAL)
    daemonize(send_stats)