diff --git a/ohmu_common_py/fileutil.py b/ohmu_common_py/fileutil.py new file mode 100644 index 0000000..b7f060d --- /dev/null +++ b/ohmu_common_py/fileutil.py @@ -0,0 +1,38 @@ +""" +ohmu_common_py - json file handling utility functions + +Copyright (c) 2015 Ohmu Ltd +See LICENSE for details +""" +import datetime +import json +import os +import tempfile + + +def default_json_serialization(obj): + if isinstance(obj, datetime.datetime): + if obj.tzinfo: + return obj.isoformat().replace("+00:00", "Z") + # assume UTC for datetime objects without a timezone + return obj.isoformat() + "Z" + + +def json_encode(obj, compact=True, binary=False): + res = json.dumps(obj, + sort_keys=not compact, + indent=None if compact else 4, + separators=(",", ":") if compact else None, + default=default_json_serialization) + return res.encode("utf-8") if binary else res + + +def write_json_file(filename, obj, *, compact=False): + json_data = json_encode(obj, compact=compact) + dirname, basename = os.path.dirname(filename), os.path.basename(filename) + fd, tempname = tempfile.mkstemp(dir=dirname or ".", prefix=basename, suffix=".tmp") + with os.fdopen(fd, "w") as fp: + fp.write(json_data) + if not compact: + fp.write("\n") + os.rename(tempname, filename) diff --git a/ohmu_common_py/logutil.py b/ohmu_common_py/logutil.py index 7defec1..50ba18c 100644 --- a/ohmu_common_py/logutil.py +++ b/ohmu_common_py/logutil.py @@ -15,30 +15,43 @@ daemon = None -LOG_FORMAT = "%(asctime)s\t%(name)s\t%(threadName)s\t%(levelname)s\t%(message)s" +LOG_FORMAT_BASIC = "%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s" +LOG_FORMAT_BASIC_MT = "%(asctime)s\t%(name)s\t%(threadName)s\t%(levelname)s\t%(message)s" LOG_FORMAT_SHORT = "%(levelname)s\t%(message)s" -LOG_FORMAT_SYSLOG = "%(name)s %(threadName)s %(levelname)s: %(message)s" +LOG_FORMAT_SYSLOG = "%(name)s %(levelname)s: %(message)s" +LOG_FORMAT_SYSLOG_MT = "%(name)s %(threadName)s %(levelname)s: %(message)s" +_multi_threaded = False -def set_syslog_handler(address, facility, logger): + +def set_syslog_handler(address, facility, logger, multi_threaded=None): + if multi_threaded is None: + multi_threaded = _multi_threaded syslog_handler = logging.handlers.SysLogHandler(address=address, facility=facility) logger.addHandler(syslog_handler) - formatter = logging.Formatter(LOG_FORMAT_SYSLOG) + formatter = logging.Formatter(LOG_FORMAT_SYSLOG_MT if multi_threaded else LOG_FORMAT_SYSLOG) syslog_handler.setFormatter(formatter) return syslog_handler -def configure_logging(level=logging.DEBUG, short_log=False): +def configure_logging(level=logging.DEBUG, short_log=False, multi_threaded=False): + global _multi_threaded # pylint: disable=global-statement + _multi_threaded = multi_threaded + # Are we running under systemd? if os.getenv("NOTIFY_SOCKET"): - logging.basicConfig(level=level, format=LOG_FORMAT_SYSLOG) + format_string = LOG_FORMAT_SYSLOG_MT if multi_threaded else LOG_FORMAT_SYSLOG if not daemon: print( "WARNING: Running under systemd but python-systemd not available, " "systemd won't see our notifications" ) + elif short_log: + format_string = LOG_FORMAT_SHORT else: - logging.basicConfig(level=level, format=LOG_FORMAT_SHORT if short_log else LOG_FORMAT) + format_string = LOG_FORMAT_BASIC_MT if multi_threaded else LOG_FORMAT_BASIC + + logging.basicConfig(level=level, format=format_string) def notify_systemd(status): diff --git a/ohmu_common_py/statsd.py b/ohmu_common_py/statsd.py new file mode 100644 index 0000000..2061840 --- /dev/null +++ b/ohmu_common_py/statsd.py @@ -0,0 +1,56 @@ +""" +ohmu_common_py - StatsD client + +Copyright (c) 2015 Ohmu Ltd +See LICENSE for details + +Supports telegraf's statsd protocol extension for 'key=value' tags: + + https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd +""" + +import logging +import socket + + +class StatsClient(object): + def __init__(self, host="127.0.0.1", port=8125, tags=None): + self.log = logging.getLogger("StatsClient") + self._dest_addr = (host, port) + self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._tags = tags or {} + + def gauge(self, metric, value, tags=None): + self._send(metric, b"g", value, tags) + + def increase(self, metric, inc_value=1, tags=None): + self._send(metric, b"c", inc_value, tags) + + def timing(self, metric, value, tags=None): + self._send(metric, b"ms", value, tags) + + def unexpected_exception(self, ex, where, tags=None): + all_tags = { + "exception": ex.__class__.__name__, + "where": where, + } + all_tags.update(tags or {}) + self.increase("exception", tags=all_tags) + + def _send(self, metric, metric_type, value, tags): + if None in self._dest_addr: + # stats sending is disabled + return + + try: + # format: "user.logins,service=payroll,region=us-west:1|c" + parts = [metric.encode("utf-8"), b":", str(value).encode("utf-8"), b"|", metric_type] + send_tags = self._tags.copy() + send_tags.update(tags or {}) + for tag, value in send_tags.items(): + parts.insert(1, ",{}={}".format(tag, value).encode("utf-8")) + + self._socket.sendto(b"".join(parts), self._dest_addr) + except Exception as ex: # pylint: disable=broad-except + self.log.error("Unexpected exception in statsd send: %s: %s", + ex.__class__.__name__, ex) diff --git a/sync.py b/sync.py index 2b8de38..255ae0b 100644 --- a/sync.py +++ b/sync.py @@ -5,30 +5,60 @@ See LICENSE for details """ +import logging import os import version import sys FILES = [ + "ohmu_common_py/fileutil.py", "ohmu_common_py/logutil.py", "ohmu_common_py/pgutil.py", + "ohmu_common_py/statsd.py", + "test/test_fileutil.py", + "test/test_logutil.py", "test/test_pgutil.py", "version.py", ] -def main(target): - ver = version.get_project_version("ohmu_common_py/version.py") - curdir = os.path.dirname(__file__) +def sync_files(source_dir, target_name, target_dir=None, common_dir=None, test_dir=None): + ver = version.get_project_version(os.path.join(os.path.dirname(__file__), "ohmu_common_py/version.py")) for src_f in FILES: - with open(os.path.join(curdir, src_f), "r") as fp: + with open(os.path.join(source_dir, src_f), "r") as fp: source = fp.read() - dst_f = src_f.replace("ohmu_common_py", target) - dst = os.path.join(curdir, "..", target, dst_f) + source = source.replace("ohmu_common_py", target_name) + if target_dir: + dst = os.path.join(target_dir, src_f.replace("ohmu_common_py", target_name)) + elif src_f.startswith("ohmu_common_py/") and common_dir: + dst = os.path.join(common_dir, src_f.replace("ohmu_common_py/", "")) + elif src_f.startswith("test/") and test_dir: + dst = os.path.join(test_dir, src_f.replace("test/", "")) + else: + logging.info("%r: skipping", src_f) + continue + + # check existing file for changes + if os.path.exists(dst): + with open(dst, "r") as fp: + existing_data = fp.read() + existing_data = "\n".join(existing_data.splitlines()[1:]) + "\n" + if existing_data == source: + logging.info("%r: no update required", dst) + continue with open(dst, "w") as fp: fp.write("# Copied from https://github.com/ohmu/ohmu_common_py {} version {}\n".format(src_f, ver)) - fp.write(source.replace("ohmu_common_py", target)) + fp.write(source) + logging.info("%r: UPDATED", dst) + + +def main(target): + from ohmu_common_py import logutil + logutil.configure_logging() + curdir = os.path.dirname(__file__) + target_dir = os.path.join(curdir, "..", target) + sync_files(source_dir=curdir, target_name=target, target_dir=target_dir) if __name__ == "__main__": diff --git a/test/conftest.py b/test/conftest.py index e7f98ff..fe18c72 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -5,7 +5,7 @@ See LICENSE for details """ -from ohmu_common_py import logutil +from ohmu_common_py import logutil, version # noqa pylint: disable=unused-import logutil.configure_logging() diff --git a/test/test_fileutil.py b/test/test_fileutil.py new file mode 100644 index 0000000..fe187c9 --- /dev/null +++ b/test/test_fileutil.py @@ -0,0 +1,43 @@ +""" +ohmu_common_py - fileutil tests + +Copyright (c) 2016 Ohmu Ltd +See LICENSE for details +""" +from ohmu_common_py import fileutil +import datetime +import json + + +def test_json_serialization(tmpdir): + ob = { + "foo": [ + "bar", + "baz", + 42, + ], + "t": datetime.datetime(2015, 9, 1, 4, 0, 0), + "f": 0.42, + } + res = json.dumps(ob, default=fileutil.default_json_serialization, separators=(",", ":"), sort_keys=True) + assert res == '{"f":0.42,"foo":["bar","baz",42],"t":"2015-09-01T04:00:00Z"}' + + assert isinstance(fileutil.json_encode(ob), str) + assert isinstance(fileutil.json_encode(ob, binary=True), bytes) + assert "\n" not in fileutil.json_encode(ob) + assert "\n" in fileutil.json_encode(ob, compact=False) + + output_file = tmpdir.join("test.json").strpath + fileutil.write_json_file(output_file, ob) + with open(output_file, "r") as fp: + ob2 = json.load(fp) + ob_ = dict(ob, t=ob["t"].isoformat() + "Z") + assert ob2 == ob_ + + fileutil.write_json_file(output_file, ob, compact=True) + with open(output_file, "r") as fp: + output_data = fp.read() + assert "\n" not in output_data + ob2_ = json.loads(output_data) + + assert ob2 == ob2_ diff --git a/test/test_logutil.py b/test/test_logutil.py new file mode 100644 index 0000000..708baec --- /dev/null +++ b/test/test_logutil.py @@ -0,0 +1,78 @@ +""" +ohmu_common_py - logutil tests + +Copyright (c) 2016 Ohmu Ltd +See LICENSE for details +""" +# pylint: disable=protected-access +from ohmu_common_py import logutil +import logging +import logging.handlers +import os +import pytest + + +@pytest.yield_fixture +def reset_logging(): + """reset root log handler to the default value after running the test""" + notify_socket = os.environ.pop("NOTIFY_SOCKET", None) + roothandlers = logging.getLogger().handlers + oldhandlers = roothandlers.copy() + roothandlers.clear() + try: + yield + finally: + roothandlers.clear() + roothandlers.extend(oldhandlers) + if notify_socket is not None: + os.environ["NOTIFY_SOCKET"] = notify_socket + + +def test_configure_logging(reset_logging): # pylint: disable=redefined-outer-name,unused-argument + roothandlers = logging.getLogger().handlers + roothandlers.clear() + logutil.configure_logging(short_log=True) + assert "(name)" not in roothandlers[0].formatter._fmt + + roothandlers.clear() + logutil.configure_logging(short_log=False) + assert "(name)" in roothandlers[0].formatter._fmt + assert "(threadName)" not in roothandlers[0].formatter._fmt + + roothandlers.clear() + logutil.configure_logging(multi_threaded=True) + assert "(threadName)" in roothandlers[0].formatter._fmt + assert "(asctime)" in roothandlers[0].formatter._fmt + + os.environ["NOTIFY_SOCKET"] = "/dev/null" + roothandlers.clear() + logutil.configure_logging(multi_threaded=True) + assert "(threadName)" in roothandlers[0].formatter._fmt + assert "(asctime)" not in roothandlers[0].formatter._fmt + + +def test_syslog_handler(reset_logging): # pylint: disable=redefined-outer-name,unused-argument + rootlogger = logging.getLogger() + roothandlers = rootlogger.handlers + roothandlers.clear() + logutil.configure_logging() + logutil.set_syslog_handler("/dev/log", "local2", rootlogger) + assert len(roothandlers) == 2 + + assert isinstance(roothandlers[0], logging.StreamHandler) + assert "(asctime)" in roothandlers[0].formatter._fmt + assert "(threadName)" not in roothandlers[0].formatter._fmt + + assert isinstance(roothandlers[1], logging.handlers.SysLogHandler) + assert "(asctime)" not in roothandlers[1].formatter._fmt + assert "(threadName)" not in roothandlers[1].formatter._fmt + + roothandlers.clear() + logutil.configure_logging(multi_threaded=True) + logutil.set_syslog_handler("/dev/log", "local2", rootlogger) + assert len(roothandlers) == 2 + + assert isinstance(roothandlers[0], logging.StreamHandler) + assert "(threadName)" in roothandlers[0].formatter._fmt + assert isinstance(roothandlers[1], logging.handlers.SysLogHandler) + assert "(threadName)" in roothandlers[1].formatter._fmt