From c3cc48f3d2cbd271e6b48d8840072c18ab56dc08 Mon Sep 17 00:00:00 2001 From: Oskari Saarenmaa Date: Fri, 27 May 2016 14:08:46 +0300 Subject: [PATCH 1/4] statsd: enhanced version from pglookout --- ohmu_common_py/statsd.py | 56 ++++++++++++++++++++++++++++++++++++++++ sync.py | 1 + 2 files changed, 57 insertions(+) create mode 100644 ohmu_common_py/statsd.py diff --git a/ohmu_common_py/statsd.py b/ohmu_common_py/statsd.py new file mode 100644 index 0000000..2061840 --- /dev/null +++ b/ohmu_common_py/statsd.py @@ -0,0 +1,56 @@ +""" +ohmu_common_py - StatsD client + +Copyright (c) 2015 Ohmu Ltd +See LICENSE for details + +Supports telegraf's statsd protocol extension for 'key=value' tags: + + https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd +""" + +import logging +import socket + + +class StatsClient(object): + def __init__(self, host="127.0.0.1", port=8125, tags=None): + self.log = logging.getLogger("StatsClient") + self._dest_addr = (host, port) + self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._tags = tags or {} + + def gauge(self, metric, value, tags=None): + self._send(metric, b"g", value, tags) + + def increase(self, metric, inc_value=1, tags=None): + self._send(metric, b"c", inc_value, tags) + + def timing(self, metric, value, tags=None): + self._send(metric, b"ms", value, tags) + + def unexpected_exception(self, ex, where, tags=None): + all_tags = { + "exception": ex.__class__.__name__, + "where": where, + } + all_tags.update(tags or {}) + self.increase("exception", tags=all_tags) + + def _send(self, metric, metric_type, value, tags): + if None in self._dest_addr: + # stats sending is disabled + return + + try: + # format: "user.logins,service=payroll,region=us-west:1|c" + parts = [metric.encode("utf-8"), b":", str(value).encode("utf-8"), b"|", metric_type] + send_tags = self._tags.copy() + send_tags.update(tags or {}) + for tag, value in send_tags.items(): + parts.insert(1, ",{}={}".format(tag, value).encode("utf-8")) + + self._socket.sendto(b"".join(parts), self._dest_addr) + except Exception as ex: # pylint: disable=broad-except + self.log.error("Unexpected exception in statsd send: %s: %s", + ex.__class__.__name__, ex) diff --git a/sync.py b/sync.py index 2b8de38..77d1971 100644 --- a/sync.py +++ b/sync.py @@ -13,6 +13,7 @@ FILES = [ "ohmu_common_py/logutil.py", "ohmu_common_py/pgutil.py", + "ohmu_common_py/statsd.py", "test/test_pgutil.py", "version.py", ] From 5e883efa81a38da1ec031a67870d1f3bdeb5e4b5 Mon Sep 17 00:00:00 2001 From: Oskari Saarenmaa Date: Fri, 27 May 2016 23:27:02 +0300 Subject: [PATCH 2/4] sync: make sync usable from outside ohmu_common_py directory And don't resync files with no changes plus log what we do. --- sync.py | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/sync.py b/sync.py index 77d1971..ffd39c5 100644 --- a/sync.py +++ b/sync.py @@ -5,6 +5,7 @@ See LICENSE for details """ +import logging import os import version import sys @@ -19,17 +20,42 @@ ] -def main(target): - ver = version.get_project_version("ohmu_common_py/version.py") - curdir = os.path.dirname(__file__) +def sync_files(source_dir, target_name, target_dir=None, common_dir=None, test_dir=None): + ver = version.get_project_version(os.path.join(os.path.dirname(__file__), "ohmu_common_py/version.py")) for src_f in FILES: - with open(os.path.join(curdir, src_f), "r") as fp: + with open(os.path.join(source_dir, src_f), "r") as fp: source = fp.read() - dst_f = src_f.replace("ohmu_common_py", target) - dst = os.path.join(curdir, "..", target, dst_f) + source = source.replace("ohmu_common_py", target_name) + if target_dir: + dst = os.path.join(target_dir, src_f.replace("ohmu_common_py", target_name)) + elif src_f.startswith("ohmu_common_py/") and common_dir: + dst = os.path.join(common_dir, src_f.replace("ohmu_common_py/", "")) + elif src_f.startswith("test/") and test_dir: + dst = os.path.join(test_dir, src_f.replace("test/", "")) + else: + logging.info("%r: skipping", src_f) + continue + + # check existing file for changes + if os.path.exists(dst): + with open(dst, "r") as fp: + existing_data = fp.read() + existing_data = "\n".join(existing_data.splitlines()[1:]) + "\n" + if existing_data == source: + logging.info("%r: no update required", dst) + continue with open(dst, "w") as fp: fp.write("# Copied from https://github.com/ohmu/ohmu_common_py {} version {}\n".format(src_f, ver)) - fp.write(source.replace("ohmu_common_py", target)) + fp.write(source) + logging.info("%r: UPDATED", dst) + + +def main(target): + from ohmu_common_py import logutil + logutil.configure_logging() + curdir = os.path.dirname(__file__) + target_dir = os.path.join(curdir, "..", target) + sync_files(source_dir=curdir, target_name=target, target_dir=target_dir) if __name__ == "__main__": From 138a7b7d48ac42b628e28429ec1e5f64db4929ce Mon Sep 17 00:00:00 2001 From: Oskari Saarenmaa Date: Sun, 29 May 2016 22:27:18 +0300 Subject: [PATCH 3/4] logutil: add multi / single threaded formats and test cases --- ohmu_common_py/logutil.py | 27 ++++++++++---- sync.py | 1 + test/conftest.py | 2 +- test/test_logutil.py | 78 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 8 deletions(-) create mode 100644 test/test_logutil.py diff --git a/ohmu_common_py/logutil.py b/ohmu_common_py/logutil.py index 7defec1..50ba18c 100644 --- a/ohmu_common_py/logutil.py +++ b/ohmu_common_py/logutil.py @@ -15,30 +15,43 @@ daemon = None -LOG_FORMAT = "%(asctime)s\t%(name)s\t%(threadName)s\t%(levelname)s\t%(message)s" +LOG_FORMAT_BASIC = "%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s" +LOG_FORMAT_BASIC_MT = "%(asctime)s\t%(name)s\t%(threadName)s\t%(levelname)s\t%(message)s" LOG_FORMAT_SHORT = "%(levelname)s\t%(message)s" -LOG_FORMAT_SYSLOG = "%(name)s %(threadName)s %(levelname)s: %(message)s" +LOG_FORMAT_SYSLOG = "%(name)s %(levelname)s: %(message)s" +LOG_FORMAT_SYSLOG_MT = "%(name)s %(threadName)s %(levelname)s: %(message)s" +_multi_threaded = False -def set_syslog_handler(address, facility, logger): + +def set_syslog_handler(address, facility, logger, multi_threaded=None): + if multi_threaded is None: + multi_threaded = _multi_threaded syslog_handler = logging.handlers.SysLogHandler(address=address, facility=facility) logger.addHandler(syslog_handler) - formatter = logging.Formatter(LOG_FORMAT_SYSLOG) + formatter = logging.Formatter(LOG_FORMAT_SYSLOG_MT if multi_threaded else LOG_FORMAT_SYSLOG) syslog_handler.setFormatter(formatter) return syslog_handler -def configure_logging(level=logging.DEBUG, short_log=False): +def configure_logging(level=logging.DEBUG, short_log=False, multi_threaded=False): + global _multi_threaded # pylint: disable=global-statement + _multi_threaded = multi_threaded + # Are we running under systemd? if os.getenv("NOTIFY_SOCKET"): - logging.basicConfig(level=level, format=LOG_FORMAT_SYSLOG) + format_string = LOG_FORMAT_SYSLOG_MT if multi_threaded else LOG_FORMAT_SYSLOG if not daemon: print( "WARNING: Running under systemd but python-systemd not available, " "systemd won't see our notifications" ) + elif short_log: + format_string = LOG_FORMAT_SHORT else: - logging.basicConfig(level=level, format=LOG_FORMAT_SHORT if short_log else LOG_FORMAT) + format_string = LOG_FORMAT_BASIC_MT if multi_threaded else LOG_FORMAT_BASIC + + logging.basicConfig(level=level, format=format_string) def notify_systemd(status): diff --git a/sync.py b/sync.py index ffd39c5..dc568f2 100644 --- a/sync.py +++ b/sync.py @@ -15,6 +15,7 @@ "ohmu_common_py/logutil.py", "ohmu_common_py/pgutil.py", "ohmu_common_py/statsd.py", + "test/test_logutil.py", "test/test_pgutil.py", "version.py", ] diff --git a/test/conftest.py b/test/conftest.py index e7f98ff..fe18c72 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -5,7 +5,7 @@ See LICENSE for details """ -from ohmu_common_py import logutil +from ohmu_common_py import logutil, version # noqa pylint: disable=unused-import logutil.configure_logging() diff --git a/test/test_logutil.py b/test/test_logutil.py new file mode 100644 index 0000000..708baec --- /dev/null +++ b/test/test_logutil.py @@ -0,0 +1,78 @@ +""" +ohmu_common_py - logutil tests + +Copyright (c) 2016 Ohmu Ltd +See LICENSE for details +""" +# pylint: disable=protected-access +from ohmu_common_py import logutil +import logging +import logging.handlers +import os +import pytest + + +@pytest.yield_fixture +def reset_logging(): + """reset root log handler to the default value after running the test""" + notify_socket = os.environ.pop("NOTIFY_SOCKET", None) + roothandlers = logging.getLogger().handlers + oldhandlers = roothandlers.copy() + roothandlers.clear() + try: + yield + finally: + roothandlers.clear() + roothandlers.extend(oldhandlers) + if notify_socket is not None: + os.environ["NOTIFY_SOCKET"] = notify_socket + + +def test_configure_logging(reset_logging): # pylint: disable=redefined-outer-name,unused-argument + roothandlers = logging.getLogger().handlers + roothandlers.clear() + logutil.configure_logging(short_log=True) + assert "(name)" not in roothandlers[0].formatter._fmt + + roothandlers.clear() + logutil.configure_logging(short_log=False) + assert "(name)" in roothandlers[0].formatter._fmt + assert "(threadName)" not in roothandlers[0].formatter._fmt + + roothandlers.clear() + logutil.configure_logging(multi_threaded=True) + assert "(threadName)" in roothandlers[0].formatter._fmt + assert "(asctime)" in roothandlers[0].formatter._fmt + + os.environ["NOTIFY_SOCKET"] = "/dev/null" + roothandlers.clear() + logutil.configure_logging(multi_threaded=True) + assert "(threadName)" in roothandlers[0].formatter._fmt + assert "(asctime)" not in roothandlers[0].formatter._fmt + + +def test_syslog_handler(reset_logging): # pylint: disable=redefined-outer-name,unused-argument + rootlogger = logging.getLogger() + roothandlers = rootlogger.handlers + roothandlers.clear() + logutil.configure_logging() + logutil.set_syslog_handler("/dev/log", "local2", rootlogger) + assert len(roothandlers) == 2 + + assert isinstance(roothandlers[0], logging.StreamHandler) + assert "(asctime)" in roothandlers[0].formatter._fmt + assert "(threadName)" not in roothandlers[0].formatter._fmt + + assert isinstance(roothandlers[1], logging.handlers.SysLogHandler) + assert "(asctime)" not in roothandlers[1].formatter._fmt + assert "(threadName)" not in roothandlers[1].formatter._fmt + + roothandlers.clear() + logutil.configure_logging(multi_threaded=True) + logutil.set_syslog_handler("/dev/log", "local2", rootlogger) + assert len(roothandlers) == 2 + + assert isinstance(roothandlers[0], logging.StreamHandler) + assert "(threadName)" in roothandlers[0].formatter._fmt + assert isinstance(roothandlers[1], logging.handlers.SysLogHandler) + assert "(threadName)" in roothandlers[1].formatter._fmt From ed788ed2a104d7789b2bfc06fb35078574aafa4e Mon Sep 17 00:00:00 2001 From: Oskari Saarenmaa Date: Tue, 31 May 2016 21:33:39 +0300 Subject: [PATCH 4/4] fileutil: add json writing utility functions --- ohmu_common_py/fileutil.py | 38 +++++++++++++++++++++++++++++++++ sync.py | 2 ++ test/test_fileutil.py | 43 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+) create mode 100644 ohmu_common_py/fileutil.py create mode 100644 test/test_fileutil.py diff --git a/ohmu_common_py/fileutil.py b/ohmu_common_py/fileutil.py new file mode 100644 index 0000000..b7f060d --- /dev/null +++ b/ohmu_common_py/fileutil.py @@ -0,0 +1,38 @@ +""" +ohmu_common_py - json file handling utility functions + +Copyright (c) 2015 Ohmu Ltd +See LICENSE for details +""" +import datetime +import json +import os +import tempfile + + +def default_json_serialization(obj): + if isinstance(obj, datetime.datetime): + if obj.tzinfo: + return obj.isoformat().replace("+00:00", "Z") + # assume UTC for datetime objects without a timezone + return obj.isoformat() + "Z" + + +def json_encode(obj, compact=True, binary=False): + res = json.dumps(obj, + sort_keys=not compact, + indent=None if compact else 4, + separators=(",", ":") if compact else None, + default=default_json_serialization) + return res.encode("utf-8") if binary else res + + +def write_json_file(filename, obj, *, compact=False): + json_data = json_encode(obj, compact=compact) + dirname, basename = os.path.dirname(filename), os.path.basename(filename) + fd, tempname = tempfile.mkstemp(dir=dirname or ".", prefix=basename, suffix=".tmp") + with os.fdopen(fd, "w") as fp: + fp.write(json_data) + if not compact: + fp.write("\n") + os.rename(tempname, filename) diff --git a/sync.py b/sync.py index dc568f2..255ae0b 100644 --- a/sync.py +++ b/sync.py @@ -12,9 +12,11 @@ FILES = [ + "ohmu_common_py/fileutil.py", "ohmu_common_py/logutil.py", "ohmu_common_py/pgutil.py", "ohmu_common_py/statsd.py", + "test/test_fileutil.py", "test/test_logutil.py", "test/test_pgutil.py", "version.py", diff --git a/test/test_fileutil.py b/test/test_fileutil.py new file mode 100644 index 0000000..fe187c9 --- /dev/null +++ b/test/test_fileutil.py @@ -0,0 +1,43 @@ +""" +ohmu_common_py - fileutil tests + +Copyright (c) 2016 Ohmu Ltd +See LICENSE for details +""" +from ohmu_common_py import fileutil +import datetime +import json + + +def test_json_serialization(tmpdir): + ob = { + "foo": [ + "bar", + "baz", + 42, + ], + "t": datetime.datetime(2015, 9, 1, 4, 0, 0), + "f": 0.42, + } + res = json.dumps(ob, default=fileutil.default_json_serialization, separators=(",", ":"), sort_keys=True) + assert res == '{"f":0.42,"foo":["bar","baz",42],"t":"2015-09-01T04:00:00Z"}' + + assert isinstance(fileutil.json_encode(ob), str) + assert isinstance(fileutil.json_encode(ob, binary=True), bytes) + assert "\n" not in fileutil.json_encode(ob) + assert "\n" in fileutil.json_encode(ob, compact=False) + + output_file = tmpdir.join("test.json").strpath + fileutil.write_json_file(output_file, ob) + with open(output_file, "r") as fp: + ob2 = json.load(fp) + ob_ = dict(ob, t=ob["t"].isoformat() + "Z") + assert ob2 == ob_ + + fileutil.write_json_file(output_file, ob, compact=True) + with open(output_file, "r") as fp: + output_data = fp.read() + assert "\n" not in output_data + ob2_ = json.loads(output_data) + + assert ob2 == ob2_