diff --git a/CHAP/TaskManager.py b/CHAP/TaskManager.py
new file mode 100644
index 0000000..550e530
--- /dev/null
+++ b/CHAP/TaskManager.py
@@ -0,0 +1,214 @@
+"""
+Python thread pool, see
+http://code.activestate.com/recipes/577187-python-thread-pool/
+Author: Valentin Kuznetsov
+"""
+from builtins import range, object
+
+
+# system modules
+import time
+import json
+import hashlib
+import logging
+import threading
+from queue import Queue
+
+
+def genkey(query):
+    """
+    Generate a new key-hash for a given query. We use md5 hash for the
+    query and key is just hex representation of this hash.
+    """
+    if isinstance(query, dict):
+        record = dict(query)
+        query = json.JSONEncoder(sort_keys=True).encode(record)
+    keyhash = hashlib.md5()
+    keyhash.update(query.encode("utf-8", "strict"))
+    return keyhash.hexdigest()
+
+def set_thread_name(ident, name):
+    "Set thread name for given identifier"
+    for thr in threading.enumerate():
+        if thr.ident == ident:
+            thr.name = name
+            break
+
+class StoppableThread(threading.Thread):
+    """Thread class with a stop() method. The thread itself has to check
+    regularly for the stopped() condition."""
+
+    def __init__(self, target, name, args):
+        super(StoppableThread, self).__init__(target=target, name=name, args=args)
+        self._stop_event = threading.Event()
+
+    def stop(self):
+        "Set event to stop the thread"
+        self._stop_event.set()
+
+    def stopped(self):
+        "Return stopped status of the thread"
+        return self._stop_event.is_set()
+
+    def running(self):
+        "Return running status of the thread"
+        return not self._stop_event.is_set()
+
+def start_new_thread(name, func, args, unique=False):
+    "Wrapper around standard thread.start_new_thread call"
+    if unique:
+        threads = sorted(threading.enumerate())
+        for thr in threads:
+            if name == thr.name:
+                return thr
+#    thr = threading.Thread(target=func, name=name, args=args)
+    thr = StoppableThread(target=func, name=name, args=args)
+    thr.daemon = True
+    thr.start()
+    return thr
+
+class UidSet(object):
+    "UID holder keeps track of uid frequency"
+    def __init__(self):
+        self.set = {}
+
+    def add(self, uid):
+        "Add given uid or increment uid occurrence in a set"
+        if not uid:
+            return
+        if uid in self.set:
+            self.set[uid] += 1
+        else:
+            self.set[uid] = 1
+
+    def discard(self, uid):
+        "Either discard or downgrade uid occurrence in a set"
+        if uid in self.set:
+            self.set[uid] -= 1
+        if uid in self.set and not self.set[uid]:
+            del self.set[uid]
+
+    def __contains__(self, uid):
+        "Check if uid is present in a set"
+        if uid in self.set:
+            return True
+        return False
+
+    def get(self, uid):
+        "Get value for given uid"
+        return self.set.get(uid, 0)
+
+class Worker(threading.Thread):
+    """Thread executing worker from a given tasks queue"""
+    def __init__(self, name, taskq, pidq, uidq, logger=None):
+        self.logger = logger if logger else logging.getLogger()
+        threading.Thread.__init__(self, name=name)
+        self.exit = 0
+        self.tasks = taskq
+        self.pids = pidq
+        self.uids = uidq
+        self.daemon = True
+        self.start()
+
+    def force_exit(self):
+        """Force run loop to exit in a hard way"""
+        self.exit = 1
+
+    def run(self):
+        """Run thread loop"""
+        while True:
+            if self.exit:
+                return
+            task = self.tasks.get()
+            if task is None:
+                return
+            if self.exit:
+                return
+            if isinstance(task, str):
+                print(f"Worker daemon run {task}")
+            elif isinstance(task, tuple) and len(task) == 5:
+                evt, pid, func, args, kwargs = task
+                try:
+                    func(*args, **kwargs)
+                    self.pids.discard(pid)
+                except Exception as exc:
+                    self.pids.discard(pid)
+                    msg = "func=%s args=%s kwargs=%s" % (func, args, kwargs)
+                    self.logger.error('error %s, call %s', str(exc), msg)
+                evt.set()
+            else:
+                print(f"Unsupported task {task}")
+
+class TaskManager(object):
+    """
+    Task manager class based on thread module which
+    executes assigned tasks concurrently. It uses a
+    pool of thread workers, queue of tasks and pid
+    set to monitor jobs execution.
+
+    .. doctest::
+
+        Use case:
+        mgr = TaskManager()
+        jobs = []
+        jobs.append(mgr.spawn(func, args))
+        mgr.joinall(jobs)
+
+    """
+    def __init__(self, nworkers=10, name='TaskManager', logger=None):
+        self.logger = logger if logger else logging.getLogger()
+        self.name = name
+        self.pids = set()
+        self.uids = UidSet()
+        self.tasks = Queue()
+        self.workers = [Worker(name, self.tasks, self.pids, self.uids, logger) \
+                        for _ in range(0, nworkers)]
+
+    def status(self):
+        "Return status of task manager queue"
+        info = {'qsize':self.tasks.qsize(), 'full':self.tasks.full(),
+                'unfinished':self.tasks.unfinished_tasks,
+                'nworkers':len(self.workers)}
+        return {self.name: info}
+
+    def nworkers(self):
+        """Return number of workers associated with this manager"""
+        return len(self.workers)
+
+    def spawn(self, func, *args, **kwargs):
+        """Spawn a new task for the given function"""
+        pid = kwargs.get('pid', genkey(str(args) + str(kwargs)))
+        evt = threading.Event()
+        if pid not in self.pids:
+            self.pids.add(pid)
+            task = (evt, pid, func, args, kwargs)
+            self.tasks.put(task)
+        else:
+            # the event was not added to task list, invoke set()
+            # to pass it in wait() call, see joinall
+            evt.set()
+        return evt, pid
+
+    def remove(self, pid):
+        """Remove pid and associated process from the queue"""
+        self.pids.discard(pid)
+
+    def is_alive(self, pid):
+        """Check worker queue if the process with the given pid is still running"""
+        return pid in self.pids
+
+    def clear(self, tasks):
+        """
+        Clear all tasks in a queue. It allows current jobs to run, but will
+        block all new requests till the workers' event flag is set again
+        """
+        _ = [t[0].clear() for t in tasks]  # each task is returned from spawn, i.e. a pair (evt, pid)
+
+    def joinall(self, tasks):
+        """Join all tasks in a queue and quit"""
+        _ = [t[0].wait() for t in tasks]  # each task is returned from spawn, i.e. a pair (evt, pid)
+
+    def quit(self):
+        """Put a None task to all workers and let them quit"""
+        _ = [self.tasks.put(None) for _ in self.workers]
+        time.sleep(1)  # let worker threads cool off and quit
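For reference, a minimal usage sketch of the TaskManager thread pool above, following its docstring. It assumes the CHAP package is importable; square(), the results list and nworkers=4 are illustrative only and not part of this change:

    # illustrative sketch, not part of the diff: exercise TaskManager directly
    from CHAP.TaskManager import TaskManager

    def square(x, results):
        # toy task: compute x*x and store it; list.append is thread-safe in CPython
        results.append(x * x)

    mgr = TaskManager(nworkers=4)
    results = []
    # spawn() returns an (event, pid) pair; collect the pairs so we can wait on the events
    jobs = [mgr.spawn(square, i, results) for i in range(10)]
    mgr.joinall(jobs)   # block until every spawned task has fired its event
    mgr.quit()          # send a None "poison pill" to each worker so it exits
    print(sorted(results))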
diff --git a/CHAP/runner.py b/CHAP/runner.py
index ff0dacd..fe70c92 100755
--- a/CHAP/runner.py
+++ b/CHAP/runner.py
@@ -52,18 +52,35 @@ def runner(opts):
     :param opts: opts is an instance of argparse.Namespace
     which contains all input parameters
     """
+    log_level = opts.log_level.upper()
+    logger, log_handler = setLogger(log_level)
+    config = {}
+    with open(opts.config) as file:
+        config = yaml.safe_load(file)
+    logger.info(f'Input configuration: {config}\n')
+    pipeline_config = config.get('pipeline', [])
+    run(pipeline_config, logger, log_level, log_handler)
+
+def setLogger(log_level="INFO"):
+    """
+    Helper function to set CHAP logger
+
+    :param log_level: logger level, default INFO
+    """
     logger = logging.getLogger(__name__)
-    log_level = getattr(logging, opts.log_level.upper())
+    log_level = getattr(logging, log_level.upper())
     logger.setLevel(log_level)
     log_handler = logging.StreamHandler()
     log_handler.setFormatter(logging.Formatter('{name:20}: {message}', style='{'))
     logger.addHandler(log_handler)
+    return logger, log_handler
 
-    config = {}
-    with open(opts.config) as file:
-        config = yaml.safe_load(file)
-    logger.info(f'Input configuration: {config}\n')
-    pipeline_config = config.get('pipeline', [])
+def run(pipeline_config, logger=None, log_level=None, log_handler=None):
+    """
+    Run given pipeline_config
+
+    :param pipeline_config: CHAP pipeline config
+    """
     objects = []
     kwds = []
     for item in pipeline_config:
@@ -78,16 +95,22 @@ def runner(opts):
         modName, clsName = name.split('.')
         module = __import__(f'CHAP.{modName}', fromlist=[clsName])
         obj = getattr(module, clsName)()
-        obj.logger.setLevel(log_level)
-        obj.logger.addHandler(log_handler)
-        logger.info(f'Loaded {obj}')
+        if log_level:
+            obj.logger.setLevel(log_level)
+        if log_handler:
+            obj.logger.addHandler(log_handler)
+        if logger:
+            logger.info(f'Loaded {obj}')
         objects.append(obj)
         kwds.append(kwargs)
     pipeline = Pipeline(objects, kwds)
-    pipeline.logger.setLevel(log_level)
-    pipeline.logger.addHandler(log_handler)
-    logger.info(f'Loaded {pipeline} with {len(objects)} items\n')
-    logger.info(f'Calling "execute" on {pipeline}')
+    if log_level:
+        pipeline.logger.setLevel(log_level)
+    if log_handler:
+        pipeline.logger.addHandler(log_handler)
+    if logger:
+        logger.info(f'Loaded {pipeline} with {len(objects)} items\n')
+        logger.info(f'Calling "execute" on {pipeline}')
     pipeline.execute()
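The refactoring above splits runner() into setLogger() and run(), so a pipeline can also be driven programmatically (server.py below relies on exactly this). A short sketch, reusing the common.PrintProcessor pipeline from the server.py docstring:

    # illustrative sketch: run a CHAP pipeline from Python instead of the CLI
    from CHAP.runner import run, setLogger

    log_level = "INFO"
    logger, log_handler = setLogger(log_level)
    pipeline_config = [{"common.PrintProcessor": {}}]
    run(pipeline_config, logger, log_level, log_handler)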
diff --git a/CHAP/server.py b/CHAP/server.py
new file mode 100644
index 0000000..df71411
--- /dev/null
+++ b/CHAP/server.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+#-*- coding: utf-8 -*-
+#pylint: disable=
+"""
+File       : server.py
+Author     : Valentin Kuznetsov
+Description: Python server with thread pool and CHAP pipeline
+
+### Client side:
+cat /tmp/chap.json
+{
+    "pipeline": [{"common.PrintProcessor": {}}],
+    "input": 1
+}
+
+### curl call to the server with our CHAP pipeline
+curl -X POST -H "Content-type: application/json" -d@/tmp/chap.json http://localhost:5000/pipeline
+{"pipeline":[{"common.PrintProcessor":{}}],"status":"ok"}
+
+### Server side:
+flask --app server run
+ * Serving Flask app 'server'
+ * Debug mode: off
+WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
+ * Running on http://127.0.0.1:5000
+Press CTRL+C to quit
+...
+
+CHAP.server         : call pipeline args=() kwds={'pipeline': [{'common.PrintProcessor': {}}]}
+CHAP.server         : pipeline
+[{'common.PrintProcessor': {}}]
+CHAP.server         : Loaded
+CHAP.server         : Loaded with 1 items
+
+CHAP.server         : Calling "execute" on
+Pipeline            : Executing "execute"
+
+Pipeline            : Calling "process" on
+PrintProcessor      : Executing "process" with type(data)=
+PrintProcessor data :
+None
+PrintProcessor      : Finished "process" in 0.000 seconds
+
+Pipeline            : Executed "execute" in 0.000 seconds
+127.0.0.1 - - [07/Apr/2023 09:11:22] "POST /pipeline HTTP/1.1" 200 -
+"""
+
+# system modules
+import time
+import logging
+from queue import Queue
+
+# third-party modules
+
+# Flask modules
+from flask import Flask, request, jsonify
+
+# CHAP modules
+from CHAP.TaskManager import TaskManager, start_new_thread
+from CHAP.runner import run, setLogger
+
+
+# Task manager to execute our tasks
+taskManager = TaskManager()
+
+# Flask Server
+app = Flask(__name__)
+
+# daemon task queue
+task_queue = Queue()
+
+@app.route("/")
+def index_route():
+    """
+    Server main end-point
+    """
+    return "CHAP daemon"
+
+@app.route("/run")
+def run_route():
+    """
+    Server /run end-point
+    """
+    task = request.args.get('task')
+    task_queue.put(task)
+    return f"Execute {task}"
+
+@app.route("/pipeline", methods=["POST"])
+def pipeline_route():
+    """
+    Server /pipeline end-point
+    """
+    content = request.json
+    if 'pipeline' in content:
+        # spawn new pipeline task
+        jobs = []
+        jobs.append(taskManager.spawn(task, pipeline=content['pipeline']))
+        taskManager.joinall(jobs)
+        return {"status": "ok", "pipeline": content['pipeline']}
+    else:
+        return {"status": "fail", "reason": "no pipeline in incoming request"}
+
+def task(*args, **kwds):
+    """
+    Helper function to execute CHAP pipeline
+    """
+    log_level = "INFO"
+    logger, log_handler = setLogger(log_level)
+    logger.info(f"call pipeline args={args} kwds={kwds}")
+    pipeline = kwds['pipeline']
+    logger.info(f"pipeline\n{pipeline}")
+    run(pipeline, logger, log_level, log_handler)
+
+def daemon(name, queue, interval):
+    """
+    Daemon example based on Queue
+    """
+    print(f"Daemon {name}")
+    while True:
+        if queue.qsize() == 0:
+            print("Default action")
+            time.sleep(interval)
+        else:
+            task = queue.get()
+            if task == "exit":
+                return
+            print(f"daemon run {task}")
+
+# start daemon thread in addition to Flask server
+start_new_thread("daemon", daemon, ("daemon", task_queue, 3))
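A client-side counterpart of the curl example in the server.py docstring, sketched with the third-party requests package (an assumption; requests is not a dependency introduced here):

    # illustrative sketch: POST a pipeline to the running server
    import requests

    payload = {"pipeline": [{"common.PrintProcessor": {}}], "input": 1}
    resp = requests.post("http://localhost:5000/pipeline", json=payload)
    print(resp.json())   # expect {"pipeline": [...], "status": "ok"}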
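Likewise, the /run end-point only enqueues its task argument for the daemon() loop; the string "exit" makes the daemon thread return (again sketched with requests, an assumed client-side dependency):

    # illustrative sketch: drive the daemon queue through the /run end-point
    import requests

    print(requests.get("http://localhost:5000/run", params={"task": "hello"}).text)  # "Execute hello"
    print(requests.get("http://localhost:5000/run", params={"task": "exit"}).text)   # stops the daemon loop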