diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 724ae6e..a7dc495 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,7 +14,22 @@ repos: - id: check-symlinks - id: end-of-file-fixer - id: trailing-whitespace -- repo: https://gitlab.com/pycqa/flake8 +- repo: https://github.com/pycqa/flake8 rev: 3.8.4 hooks: - id: flake8 +- repo: https://github.com/sirosen/check-jsonschema + rev: 8a14ffa1d4c81a56057f55e1da308daeccc3bcd6 + hooks: + - id: check-jsonschema + name: "Check schemas" + language: python + files: ^villas/controller/schemas/.*\.yaml$ + types: [yaml] + args: ["--schemafile", "https://json-schema.org/draft/2020-12/schema"] + - id: check-jsonschema + name: "Check OpenAPI doc" + language: python + files: ^doc/openapi.yaml$ + types: [yaml] + args: ["--schemafile", "https://raw.githubusercontent.com/OAI/OpenAPI-Specification/main/schemas/v3.1/schema.json"] diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..fbc8a14 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +recursive-include villas/controller * diff --git a/doc/openapi.yaml b/doc/openapi.yaml new file mode 100644 index 0000000..cbddd06 --- /dev/null +++ b/doc/openapi.yaml @@ -0,0 +1,327 @@ +--- +openapi: 3.0.0 +info: + title: VILLAScontroller API + description: 'A HTTP/REST API for controlling VILLAScontroller remotely for querying component status as well as issuing control actions.' + version: 0.0.1 + contact: + name: "Steffen Vogel" + email: "svogel2@eonerc.rwth-aachen.de" + license: + name: Apache-2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 + +servers: +- url: https://villas.k8s.eonerc.rwth-aachen.de/controller/api/v1 + description: Demo instance at RWTH Aachen + +paths: + + /: + get: + summary: 'Get status of VILLAScontroller daemon' + operationId: getStatus + tags: + - status + responses: + '200': + description: '' + content: + application/json: + schema: + type: object + properties: + components: + type: array + items: + type: string + format: uuid + status: + type: object + properties: + version: + type: string + uptime: + type: number + host: + type: string + kernel: + type: object + properties: + sysname: + type: string + nodename: + type: string + release: + type: string + version: + type: string + machine: + type: string + + example: + components: + - f4751894-205e-11eb-aefb-0741ff98abca + - 3ddd318e-fee1-46d7-bff4-7c064d640d4e + status: + version: 0.3.2 + uptime: 15.38102650642395 + host: lat.0l.de + kernel: + sysname: Linux + nodename: lat.0l.de + release: 5.13.14-200.fc34.x86_64 + version: '#1 SMP Fri Sep 3 15:33:01 UTC 2021' + machine: x86_64 + + /health: + get: + operationId: getHealth + summary: Query health of daemon. + tags: + - status + responses: + '200': + description: The daemon is healthy + content: + application/json: + schema: + type: object + properties: + status: + type: string + example: + status: ok + + + /component/{uuid}: + get: + summary: 'Get the current status of a component' + operationId: getComponentStatus + + parameters: + - name: uuid + in: path + required: true + schema: + type: string + format: uuid + + responses: + '200': + description: '' + content: + application/json: + schema: + type: object + properties: + components: + type: array + items: + type: string + format: uuid + + status: + type: object + properties: + managed_by: + type: string + format: uuid + + state: + type: string + enum: + - idle + - starting + - running + - stopping + - pausing + - paused + - resuming + - error + - resetting + - shuttingdown + - shutdown + - gone + + version: + type: string + uptime: + type: number + host: + type: string + kernel: + type: object + properties: + sysname: + type: string + nodename: + type: string + release: + type: string + version: + type: string + machine: + type: string + + properties: + type: object + properties: + category: + type: string + enum: + - manager + - simulator + - gateway + type: + type: string + pattern: '[a-z-]+' + name: + type: string + realm: + type: string + pattern: '[a-z0-9-.]+' + uuid: + type: string + format: uuid + + additionalProperties: true + + schema: + type: object + additionalProperties: + $ref: 'https://json-schema.org/draft/2020-12/schema' + + example: + components: [] + status: + state: idle + version: 0.3.2 + uptime: 480.25064611434937 + host: lat.0l.de + kernel: + sysname: Linux + nodename: lat.0l.de + release: 5.13.14-200.fc34.x86_64 + version: '#1 SMP Fri Sep 3 15:33:01 UTC 2021' + machine: x86_64 + managed_by: f4751894-205e-11eb-aefb-0741ff98abca + properties: + category: manager + type: generic + name: Standard Controller + realm: de.rwth-aachen.eonerc.acs + uuid: f4751894-205e-11eb-aefb-0741ff98abca + schema: + create: + # $schema: 'http://json-schema.org/draft-07/schema' + type: object + default: {} + required: + - name + - category + - location + - owner + - realm + - type + - api_url + - ws_url + properties: + name: + type: string + title: Component name + default: New Component + examples: 'Generic Simulator #1' + owner: + type: string + title: Component owner + examples: + - rmr + - svg + realm: + type: string + title: Component realm + default: '' + examples: + - de.rwth-aachen.eonerc.acs + category: + type: string + title: Component category + examples: + - simulator + location: + type: string + title: Component location + examples: + - Richard's PC + type: + type: string + default: generic + uuid: + type: 'string' + format: uuid + ws_url: + type: string + examples: + - 'https://villas.k8s.eonerc.rwth-aachen.de/ws/relay/generic_1' + api_url: + type: string + examples: + - 'https://villas.k8s.eonerc.rwth-aachen.de/api/ic/generic_1' + shell: + type: boolean + default: false + examples: + - true + whitelist: + type: array + title: The whitelist schema + default: [] + examples: + - - /sbin/ping + - ^echo + additionalItems: true + items: + anyOf: + - type: string + examples: + - /sbin/ping + - ^echo + + post: + operationId: executeComponentAction + summary: 'Send a control action to the component' + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + action: + type: string + enum: + - start + - stop + - pause + - resume + - create + - delete + - shutdown + - reset + parameters: + oneOf: + - $ref: ../villas/controller/schemas/manager/generic/create.yaml + - $ref: ../villas/controller/schemas/manager/kubernetes/create.yaml + - $ref: ../villas/controller/schemas/simulator/dpsim/start.yaml + - $ref: ../villas/controller/schemas/simulator/dummy/start.yaml + + responses: + '200': + description: '' + content: + application/json: {} + + + # example: + # runtime: 10.2 diff --git a/etc/config_simplekub.yaml b/etc/config_simplekub.yaml new file mode 100644 index 0000000..9653b03 --- /dev/null +++ b/etc/config_simplekub.yaml @@ -0,0 +1,14 @@ +--- +broker: + url: amqp://villas:Haegiethu0rohtee@kubernetes-master-1.os-cloud.eonerc.rwth-aachen.de:30809/%2F + +components: +- type: generic + category: manager + uuid: ebbbbba0-557b-4848-ac7a-faa3e7c51fa3 + +- category: manager + type: kubernetes-simple + uuid: 4bbbb73e-7e74-11eb-8f63-f3a5b3ab82f6 + + namespace: villas-controller diff --git a/etc/params_k8s_dpsim.yaml b/etc/params_k8s_dpsim.yaml index 5ce3b84..fa0ed9e 100644 --- a/etc/params_k8s_dpsim.yaml +++ b/etc/params_k8s_dpsim.yaml @@ -13,9 +13,9 @@ properties: name: dpsim spec: suspend: true - activeDeadlineSeconds: 120 # kill the Job after 1h + activeDeadlineSeconds: 3600 # kill the Job after 1h backoffLimit: 0 # only try to run pod once, no retries - ttlSecondsAfterFinished: 120 # delete the Job resources 1h after completion + ttlSecondsAfterFinished: 3600 # delete the Job resources 1h after completion template: spec: restartPolicy: Never diff --git a/requirements.txt b/requirements.txt index 7f3a823..3d3ebfe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,7 @@ requests villas-node>=0.10.2 kubernetes xdg +dotmap PyYAML +tornado +jsonschema>=4.1.0 diff --git a/setup.py b/setup.py index 76c310a..6519bbe 100644 --- a/setup.py +++ b/setup.py @@ -23,6 +23,7 @@ ], packages=find_namespace_packages(include=['villas.*']), install_requires=[ + 'dotmap', 'kombu', 'termcolor', 'psutil', @@ -30,7 +31,9 @@ 'villas-node>=0.10.2', 'kubernetes', 'xdg', - 'PyYAML' + 'PyYAML', + 'tornado', + 'jsonschema>=4.1.0' ], data_files=[ ('/etc/villas/controller', glob('etc/*.{json,yaml}')), @@ -41,5 +44,6 @@ 'villas-ctl=villas.controller.main:main', 'villas-controller=villas.controller.main:main' ], - } + }, + include_package_data=True ) diff --git a/villas/controller/api.py b/villas/controller/api.py new file mode 100644 index 0000000..e6a684e --- /dev/null +++ b/villas/controller/api.py @@ -0,0 +1,56 @@ +import threading +import logging +import asyncio +from tornado.ioloop import IOLoop +import tornado.web + +LOGGER = logging.getLogger(__name__) +REGEX_UUID = r'\b[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}-[0-9a-fA-F]' \ + r'{4}-[0-9a-fA-F]{4}-\b[0-9a-fA-F]{12}\b' + +BASE = '/api/v1' + + +class RequestHandler(tornado.web.RequestHandler): + + def initialize(self, controller): + self.controller = controller + + +class Api(threading.Thread): + + def __init__(self, controller): + super().__init__() + + self.controller = controller + + def run(self): + self.app = tornado.web.Application(self.handlers) + + # Create new event loop for this thread + aio_loop = asyncio.new_event_loop() + asyncio.set_event_loop(aio_loop) + + port = self.controller.config.api.port + self.app.listen(port) + + LOGGER.info('Starting API at http://localhost:%d%s', port, BASE) + + self.loop = IOLoop.current(instance=True) + self.loop.start() + + @property + def handlers(self): + from villas.controller.handlers.component import ComponentRequestHandler # noqa E501 + from villas.controller.handlers.main import MainRequestHandler + from villas.controller.handlers.health import HealthRequestHandler + + args = { + 'controller': self.controller + } + + return [ + (BASE + r'/?', MainRequestHandler, args), + (BASE + r'/health', HealthRequestHandler, args), + (BASE + r'/component/('+REGEX_UUID+r')', ComponentRequestHandler, args) # noqa E501 + ] diff --git a/villas/controller/commands/daemon.py b/villas/controller/commands/daemon.py index 731a00d..82f18db 100644 --- a/villas/controller/commands/daemon.py +++ b/villas/controller/commands/daemon.py @@ -13,15 +13,14 @@ class DaemonCommand(Command): def add_parser(subparsers): parser = subparsers.add_parser('daemon', help='Run VILLAScontroller as a daemon') - parser.set_defaults(func=DaemonCommand.run) + parser.set_defaults(func=DaemonCommand.start) @staticmethod - def run(connection, args): - components = args.config.components + def start(connection, args): try: - d = ControllerMixin(connection, components) - d.run() + d = ControllerMixin(connection, args) + d.start() except KeyboardInterrupt: d.shutdown() except ConnectionError: diff --git a/villas/controller/commands/simulator.py b/villas/controller/commands/simulator.py index 0dcdeee..ff16e96 100644 --- a/villas/controller/commands/simulator.py +++ b/villas/controller/commands/simulator.py @@ -15,8 +15,8 @@ def _get_parameters(params, params_file): try: if params is not None: - parameters.update(yaml.loads(params, - Loader=yaml.FullLoader)) + parameters.update(yaml.load(params, + Loader=yaml.FullLoader)) if params_file is not None: with open(params_file) as f: parameters.update(yaml.load(f, Loader=yaml.FullLoader)) diff --git a/villas/controller/component.py b/villas/controller/component.py index 863e842..6055b2e 100644 --- a/villas/controller/component.py +++ b/villas/controller/component.py @@ -1,12 +1,16 @@ import logging -import kombu +import os.path import time -import socket -import os +import kombu import uuid import threading +import yaml +import jsonschema +from jsonschema import Draft202012Validator + +import importlib +import importlib.resources as resources -from villas.controller import __version__ as version from villas.controller.exceptions import SimulationException @@ -18,6 +22,7 @@ def __init__(self, **props): self.name = props.get('name') self.category = props.get('category') self.enabled = props.get('enabled', True) + self.location = props.get('location', '') self.uuid = props.get('uuid') # The manager component which manages this instances @@ -36,14 +41,15 @@ def __init__(self, **props): self.logger = logging.getLogger( f'villas.controller.{self.category}.{self.type}:{self.uuid}') - self.publish_status_interval = 2 + self._schema = self.load_schema() + + self.publish_status_interval = 30 self.publish_status_thread_stop = threading.Event() self.publish_status_thread = threading.Thread( target=self.publish_status_periodically) def on_ready(self): self.publish_status_thread.start() - pass def on_shutdown(self): if self.publish_status_thread.is_alive(): @@ -57,6 +63,8 @@ def set_mixin(self, mixin): self.mixin = mixin self.connection = mixin.connection + self.workdir = os.path.join(self.mixin.config.workdir, str(self.uuid)) + def get_consumer(self, channel): self.channel = channel @@ -69,12 +77,54 @@ def get_consumer(self, channel): 'x-match': 'any', **self.headers }, - durable=False + durable=False, + exclusive=True, + auto_delete=True ), no_ack=True, accept={'application/json'} ) + @property + def schema(self): + return self._schema + + def load_schema(self): + schema = {} + + try: + pkg_name = f'villas.controller.schemas.{self.category}.{self.type}' + pkg = importlib.import_module(pkg_name) + except ModuleNotFoundError: + self.logger.warn('Missing schemas!') + + return schema + + for res in resources.contents(pkg): + name, ext = os.path.splitext(res) + if resources.is_resource(pkg, res) and ext in ['.yaml', '.json']: + + fo = resources.open_text(pkg, res) + loadedschema = yaml.load(fo, yaml.SafeLoader) + + try: + Draft202012Validator.check_schema(loadedschema) + schema[name] = loadedschema + except jsonschema.exceptions.SchemaError: + self.logger.warn("Schema is invalid!") + + return schema + + def validate_parameters(self, action, parameters): + if action in self.schema: + validator = self.schema[action] + + validator.validate(parameters) + + else: + self.logger.warn('missing schema for action: %s', action) + return True # we really should fail here... + @property def headers(self): return { @@ -84,26 +134,12 @@ def headers(self): 'type': self.type } - @property - def schema(self): - return self.properties.get('schema', {}) - @property def status(self): - u = os.uname() status = { 'state': self._state, - 'version': version, - 'uptime': time.time() - self.started, - 'host': socket.gethostname(), - 'kernel': { - 'sysname': u.sysname, - 'nodename': u.nodename, - 'release': u.release, - 'version': u.version, - 'machine': u.machine - }, + **self.mixin.status, **self._status_fields } @@ -116,51 +152,77 @@ def status(self): **self.properties, **self.headers }, - 'schema': self.schema + 'schema': { + **self.schema + } } def on_message(self, message): self.logger.debug('Received message: %s', message.payload) if 'action' in message.payload: - self.run_action(message.payload['action'], message) + try: + self.run_action(message.payload['action'], message.payload) + except SimulationException: + pass + + message.ack() - def run_action(self, action, message): + def run_action(self, action, payload): if action == 'ping': self.logger.debug('Received action: %s', action) else: self.logger.info('Received action: %s', action) + parameters = payload.get('parameters', {}) + + try: + self.validate_parameters(action, parameters) + except jsonschema.ValidationError as ve: + e = { + 'instance': ve.instance, + 'path': ve.json_path, + } + + se = SimulationException(self, 'Failed to validate parameters', + **e) + + self.logger.error('Failed to validate action parameters against ' + 'schema: %s', ve.message) + self.change_to_error(ve.message, **e) + + raise se + try: if action == 'ping': - self.ping(message) + self.ping(payload) elif action == 'start': self.change_state('starting') - self.start(message) + self.start(payload) elif action == 'stop': self.change_state('stopping') - self.stop(message) + self.stop(payload) elif action == 'pause': self.change_state('pausing') - self.pause(message) + self.pause(payload) elif action == 'resume': self.change_state('resuming') - self.resume(message) + self.resume(payload) elif action == 'shutdown': self.change_state('shuttingdown') - self.shutdown(message) + self.shutdown(payload) elif action == 'reset': self.change_state('resetting') - self.reset(message) + self.reset(payload) else: raise SimulationException(self, 'Unknown action', action=action) except SimulationException as se: self.logger.error('SimulationException: %s', str(se)) - self.change_state('error', msg=se.msg, **se.info) - finally: - message.ack() + self.change_to_error(se.msg, **se.info) + + raise se def change_state(self, state, **kwargs): if self._state == state: @@ -173,26 +235,33 @@ def change_state(self, state, **kwargs): self.publish_status() + def change_to_error(self, msg, **details): + self.change_state('error', + error={ + 'msg': msg, + **details + }) + # Actions - def ping(self, message): + def ping(self, payload): self.publish_status() - def start(self, message): + def start(self, payload): raise SimulationException('The component can not be started') - def stop(self, message): + def stop(self, payload): raise SimulationException('The component can not be stopped') - def pause(self, message): + def pause(self, payload): raise SimulationException('The component can not be paused') - def resume(self, message): + def resume(self, payload): raise SimulationException('The component can not be resumed') - def shutdown(self, message): + def shutdown(self, payload): raise SimulationException('The component can not be shut down') - def reset(self, message): + def reset(self, payload): self.started = time.time() @staticmethod @@ -213,12 +282,15 @@ def from_dict(dict): def publish_status(self): if not self.mixin: + self.logger.warn('No mixin!') return self.mixin.publish(self.status, headers=self.headers) def publish_status_periodically(self): - self.logger.info('Start state publish thread') + self.logger.info('Start state publish thread, initial status: %s', + self.status) + self.publish_status() # publish the first update immediately while not self.publish_status_thread_stop.wait( self.publish_status_interval): diff --git a/villas/controller/components/gateways/villas_node.py b/villas/controller/components/gateways/villas_node.py index f5d46c5..19ff2cb 100644 --- a/villas/controller/components/gateways/villas_node.py +++ b/villas/controller/components/gateways/villas_node.py @@ -20,35 +20,35 @@ def __init__(self, manager, args): super().__init__(manager, **props) - def start(self, message): + def start(self, payload): try: self.manager.node.request('node.start', {'node': self.name}) self.manager.reconcile() except Exception as e: self.logger.warn('Failed to start node: %s', e) - def stop(self, message): + def stop(self, payload): try: self.manager.node.request('node.stop', {'node': self.name}) self.manager.reconcile() except Exception as e: self.logger.warn('Failed to stop node: %s', e) - def pause(self, message): + def pause(self, payload): try: self.manager.node.request('node.pause', {'node': self.name}) self.manager.reconcile() except Exception as e: self.logger.warn('Failed to pause node: %s', e) - def resume(self, message): + def resume(self, payload): try: self.manager.node.request('node.resume', {'node': self.name}) self.manager.reconcile() except Exception as e: self.logger.warn('Failed to resume node: %s', e) - def reset(self, message): + def reset(self, payload): try: self.manager.node.reset('node.restart', {'node': self.name}) self.manager.reconcile() diff --git a/villas/controller/components/manager.py b/villas/controller/components/manager.py index fb503ad..16db32c 100644 --- a/villas/controller/components/manager.py +++ b/villas/controller/components/manager.py @@ -1,4 +1,5 @@ from villas.controller.component import Component +from villas.controller.exceptions import SimulationException class Manager(Component): @@ -17,17 +18,6 @@ def status(self): **super().status } - @property - def schema(self): - return { - 'create': self.create_schema, - **super().schema - } - - @property - def create_schema(self): - return {} - @staticmethod def from_dict(dict): type = dict.get('type', 'generic') @@ -38,6 +28,9 @@ def from_dict(dict): if type == 'kubernetes': from villas.controller.components.managers import kubernetes return kubernetes.KubernetesManager(**dict) + if type == 'kubernetes-simple': + from villas.controller.components.managers import kubernetes_simple + return kubernetes_simple.KubernetesManagerSimple(**dict) if type == 'villas-node': from villas.controller.components.managers import villas_node # noqa E501 return villas_node.VILLASnodeManager(**dict) @@ -49,7 +42,12 @@ def from_dict(dict): def add_component(self, comp): if comp.uuid in self.mixin.components: - raise KeyError + existing_comp = self.mixin.components[comp.uuid] +# self.logger.error('UUID %s already exists, not added', comp.uuid) +# return + raise SimulationException(self, 'Component with same UUID ' + + 'already exists!', + component=existing_comp) comp.set_manager(self) @@ -66,18 +64,20 @@ def remove_component(self, comp): self.logger.info('Removed component %s', comp) - def run_action(self, action, message): + def run_action(self, action, payload): if action == 'create': - self.create(message) + self.create(payload) +# print(message.payload) +# self.create(message) elif action == 'delete': - self.delete(message) + self.delete(payload) else: - super().run_action(action, message) + super().run_action(action, payload) - def create(self, message): + def create(self, payload): raise NotImplementedError() - def delete(self, message): + def delete(self, payload): raise NotImplementedError() def on_shutdown(self): diff --git a/villas/controller/components/managers/generic.py b/villas/controller/components/managers/generic.py index 2e2dcb6..084592c 100644 --- a/villas/controller/components/managers/generic.py +++ b/villas/controller/components/managers/generic.py @@ -4,147 +4,8 @@ class GenericManager(Manager): - create_schema = { - '$schema': 'http://json-schema.org/draft-07/schema', - '$id': 'http://example.com/example.json', - 'type': 'object', - 'default': {}, - 'required': [ - 'name', - 'category', - 'location', - 'owner', - 'realm', - 'type', - 'api_url', - 'ws_url' - ], - 'properties': { - 'name': { - '$id': '#/properties/name', - 'type': 'string', - 'title': 'Component name', - 'default': 'New Component', - 'examples': [ - 'Generic Simulator #1' - ] - }, - 'owner': { - '$id': '#/properties/owner', - 'type': 'string', - 'title': 'Component owner', - 'default': '', - 'examples': [ - 'rmr', - 'svg' - ] - }, - 'realm': { - '$id': '#/properties/realm', - 'type': 'string', - 'title': 'Component realm', - 'default': '', - 'examples': [ - 'de.rwth-aachen.eonerc.acs' - ] - }, - 'category': { - '$id': '#/properties/category', - 'type': 'string', - 'title': 'Component category', - 'default': '', - 'examples': [ - 'simulator' - ] - }, - 'location': { - '$id': '#/properties/location', - 'type': 'string', - 'title': 'Component location', - 'default': '', - 'examples': [ - 'Richard\'s PC' - ] - }, - 'type': { - '$id': '#/properties/type', - 'type': 'string', - 'title': 'The type schema', - 'default': '', - 'examples': [ - 'generic' - ] - }, - 'uuid': { - '$id': '#/properties/uuid', - 'type': 'null', - 'title': 'The uuid schema', - 'default': None, - }, - - 'ws_url': { - '$id': '#/properties/ws_url', - 'type': 'string', - 'title': 'The ws_url schema', - 'default': '', - 'examples': [ - 'https://villas.k8s.eonerc.rwth-aachen.de/' - 'ws/relay/generic_1' - ] - }, - 'api_url': { - '$id': '#/properties/api_url', - 'type': 'string', - 'title': 'The api_url schema', - 'default': '', - 'examples': [ - 'https://villas.k8s.eonerc.rwth-aachen.de/api/ic/generic_1' - ] - }, - - 'shell': { - '$id': '#/properties/shell', - 'type': 'boolean', - 'title': 'The shell schema', - 'default': False, - 'examples': [ - True - ] - }, - 'whitelist': { - '$id': '#/properties/whitelist', - 'type': 'array', - 'title': 'The whitelist schema', - 'default': [], - 'examples': [ - [ - '/sbin/ping', - '^echo' - ] - ], - 'additionalItems': True, - 'items': { - '$id': '#/properties/whitelist/items', - 'anyOf': [ - { - '$id': '#/properties/whitelist/items/anyOf/0', - 'type': 'string', - 'title': 'The first anyOf schema', - 'default': '', - 'examples': [ - '/sbin/ping', - '^echo' - ] - } - ] - } - } - }, - 'additionalProperties': True - } - - def create(self, message): - component = Component.from_dict(message.payload.get('parameters')) + def create(self, payload): + component = Component.from_dict(payload.get('parameters')) try: self.add_component(component) @@ -152,8 +13,8 @@ def create(self, message): self.logger.error('A component with the UUID %s already exists', component.uuid) - def delete(self, message): - parameters = message.payload.get('parameters') + def delete(self, payload): + parameters = payload.get('parameters') uuid = parameters.get('uuid') try: diff --git a/villas/controller/components/managers/kubernetes.py b/villas/controller/components/managers/kubernetes.py index ed1500a..57f0b48 100644 --- a/villas/controller/components/managers/kubernetes.py +++ b/villas/controller/components/managers/kubernetes.py @@ -8,32 +8,17 @@ from villas.controller.components.simulators.kubernetes import KubernetesJob -def _match(stringA, stringB): - if stringA == stringB: +def _match(a, b): + if a == b: return True - elif len(stringA) < len(stringB): - return stringA in stringB - elif len(stringB) < len(stringA): - return stringB in stringA + elif len(a) < len(b): + return a in b + elif len(b) < len(a): + return b in a class KubernetesManager(Manager): - create_schema = { - '$schema': 'http://json-schema.org/draft-04/schema#', - 'properties': { - 'job': { - '$ref': 'https://kubernetesjsonschema.dev/v1.18.1/job.json' - }, - 'schema': { - 'type': 'object', - 'additionalProperties': { - '$ref': 'https://json-schema.org/draft-04/schema' - } - } - } - } - def __init__(self, **args): super().__init__(**args) @@ -51,7 +36,14 @@ def __init__(self, **args): else: k8s.config.load_incluster_config() - self.namespace = args.get('namespace', 'default') + self.namespace = os.environ.get('NAMESPACE') + if self.namespace: + self.namespace = self.namespace + '-controller' + else: + self.namespace = 'villas-controller' + + self.my_pod_name = os.environ.get('POD_NAME') + self.my_pod_uid = os.environ.get('POD_UID') self._check_namespace(self.namespace) @@ -118,17 +110,22 @@ def _run_event_watcher(self): if _match(comp.job.metadata.name, eo.involved_object.name): + if comp._state == 'stopping': + # incoming events are old repetitions + continue + if eo.reason == 'Completed': comp.change_state('stopping', True) elif eo.reason == 'Started': comp.pods.add(eo.involved_object.name) - comp.properties['pod_names'] = list(comp.pods) comp.change_state('running', True) elif eo.reason == 'BackoffLimitExceeded': - comp.change_state('error', error=eo.reason) + comp.change_to_error('failed to start job', + reason=eo.reason) elif eo.reason == 'Failed': if comp._state == 'running': - comp.change_state('error', error=eo.reason) + comp.change_to_error('failed to start job', + error=eo.reason) elif comp._state == 'starting': # wait for BackoffLimitExceeded event continue @@ -142,14 +139,14 @@ def _run_event_watcher(self): attempting reconnect..') time.sleep(1) - def create(self, message): - parameters = message.payload.get('parameters', {}) + def create(self, payload): + parameters = payload.get('parameters', {}) comp = KubernetesJob(self, **parameters) self.add_component(comp) - def delete(self, message): - parameters = message.payload.get('parameters') + def delete(self, payload): + parameters = payload.get('parameters') uuid = parameters.get('uuid') try: diff --git a/villas/controller/components/managers/kubernetes_simple.py b/villas/controller/components/managers/kubernetes_simple.py new file mode 100644 index 0000000..1fb27a7 --- /dev/null +++ b/villas/controller/components/managers/kubernetes_simple.py @@ -0,0 +1,79 @@ +from villas.controller.components.managers.kubernetes import KubernetesManager +from villas.controller.components.simulators.kubernetes import KubernetesJob + +parameters_simple = { + 'type': 'kubernetes', + 'category': 'simulator', + 'uuid': None, + 'name': '', + 'properties': { + 'job': { + 'apiVersion': 'batch/v1', + 'kind': 'Job', + 'metadata': { + 'name': '' + }, + 'spec': { + 'activeDeadlineSeconds': 3600, + 'backoffLimit': 2, + 'template': { + 'spec': { + 'restartPolicy': 'Never', + 'containers': [ + { + 'image': '', + 'imagePullPolicy': 'Always', + 'name': 'jobcontainer', + 'securityContext': { + 'privileged': True + } + } + ] + } + } + } + } + } +} + + +class KubernetesManagerSimple(KubernetesManager): + + def __init__(self, **args): + super().__init__(**args) + + def create(self, payload): + params = payload.get('parameters', {}) + sim_name = payload.get('name', 'Kubernetes Simulator') + jobname = params.get('jobname', 'noname') + adls = params.get('activeDeadlineSeconds', 3600) + if type(adls) is str: + adls = int(adls) + image = params.get('image') + name = params.get('name') + uuid = params.get('uuid') + self.logger.info('uuid:') + self.logger.info(uuid) + + if image is None: + self.logger.error('No image given, will try super.create') + super().create(payload) + return + + parameters = parameters_simple + parameters['name'] = sim_name + job = parameters['properties']['job'] + job['metadata']['name'] = jobname + job['spec']['activeDeadlineSeconds'] = adls + job['spec']['template']['spec']['containers'][0]['image'] = image + + parameters['job'] = job + + if name: + parameters['name'] = name + + if uuid: + parameters['uuid'] = uuid + + comp = KubernetesJob(self, **parameters) + self.add_component(comp) diff --git a/villas/controller/components/managers/villas_node.py b/villas/controller/components/managers/villas_node.py index 3fd9111..3f25b45 100644 --- a/villas/controller/components/managers/villas_node.py +++ b/villas/controller/components/managers/villas_node.py @@ -10,11 +10,14 @@ class VILLASnodeManager(Manager): def __init__(self, **args): self.autostart = args.get('autostart', False) - self.api_url = args.get('url', 'http://localhost:8080') + '/api/v1' - self.api_url_external = args.get('url_external', self.api_url) + self.api_url = args.get('api_url', 'http://localhost:8080') + self.api_url_external = args.get('api_url_external', self.api_url) args['api_url'] = self.api_url + self.thread_stop = threading.Event() + self.thread = threading.Thread(target=self.reconcile_periodically) + self.node = Node(**args) self._status = self.node.status @@ -23,17 +26,16 @@ def __init__(self, **args): super().__init__(**args) - self.thread_stop = threading.Event() - self.thread = threading.Thread(target=self.reconcile_periodically) - self.thread.start() - def reconcile_periodically(self): while not self.thread_stop.wait(2): self.reconcile() def reconcile(self): try: - for node in self.node.nodes: + self._status = self.node.status + self._nodes = self.node.nodes + + for node in self._nodes: self.logger.debug('Found node %s on gateway: %s', node['name'], node) @@ -50,23 +52,23 @@ def reconcile(self): self.change_state('running') except Exception as e: - self.change_state('error', error=str(e)) + self.change_to_error('failed to reconcile', + exception=str(e), + args=str(e.args)) @property def status(self): - return { - **super().status, - 'villas_node_version': self.node.get_version() - } + status = super().status + + status['status']['villas_node_version'] = self._status.get('version') + + return status def on_ready(self): if self.autostart and not self.node.is_running(): self.start() - try: - self._status = self.node.status - except Exception: - self.change_state('error', error='VILLASnode not installed') + self.thread.start() super().on_ready() @@ -76,12 +78,12 @@ def on_shutdown(self): return super().on_shutdown() - def start(self, message): + def start(self, payload): self.node.start() self.change_state('starting') - def stop(self, message): + def stop(self, payload): if self.node.is_running(): self.node.stop() @@ -91,7 +93,7 @@ def stop(self, message): for node in self.nodes: node.change_state('shutdown') - def pause(self, message): + def pause(self, payload): self.node.pause() self.change_state('paused') @@ -100,8 +102,8 @@ def pause(self, message): for node in self.nodes: node.change_state('paused') - def resume(self, message): + def resume(self, payload): self.node.resume() - def reset(self, message): + def reset(self, payload): self.node.restart() diff --git a/villas/controller/components/managers/villas_relay.py b/villas/controller/components/managers/villas_relay.py index eb76556..5ba69b4 100644 --- a/villas/controller/components/managers/villas_relay.py +++ b/villas/controller/components/managers/villas_relay.py @@ -12,7 +12,9 @@ def __init__(self, **args): self.autostart = args.get('autostart', False) self.api_url = args.get('api_url', 'http://localhost:8088') + '/api/v1' self.api_url_external = args.get('api_url_external', self.api_url) - self._version = '-1' + + self.thread_stop = threading.Event() + self.thread = threading.Thread(target=self.reconcile_periodically) uuid = self.get_uuid() if uuid is not None: @@ -22,14 +24,6 @@ def __init__(self, **args): self.properties['api_url'] = self.api_url_external - self.thread_stop = threading.Event() - self.thread = threading.Thread(target=self.reconcile_periodically) - self.thread.start() - - def reconcile_periodically(self): - while not self.thread_stop.wait(2): - self.reconcile() - def get_uuid(self): try: r = requests.get(self.api_url) @@ -46,55 +40,68 @@ def get_status(self): return r.json() except requests.exceptions.RequestException: - self.change_state('error', error='Failed to contact VILLASrelay') + self.change_to_error('Failed to contact VILLASrelay') return None + def reconcile_periodically(self): + while not self.thread_stop.wait(2): + self.reconcile() + def reconcile(self): - status = self.get_status() - if status is None: - return + try: + self._status = self.get_status() - if self._state == 'error': - self.change_state('idle') + active_sessions = self._status['sessions'] + active_uuids = {session['uuid'] for session in active_sessions} + existing_uuids = set(self.components.keys()) - self._status = status - self._version = self._status['version'] + # Add new sessions and update existing ones + for session in active_sessions: + uuid = session['uuid'] - active_sessions = self._status['sessions'] - active_uuids = {session['uuid'] for session in active_sessions} - existing_uuids = set(self.components.keys()) + if uuid in self.components: + comp = self.components[uuid] + else: + comp = VILLASrelayGateway(self, session) + self.add_component(comp) - # Add new sessions and update existing ones - for session in active_sessions: - uuid = session['uuid'] + comp.change_state('running') - if uuid in self.components: + # Find vanished sessions + for uuid in existing_uuids - active_uuids: comp = self.components[uuid] - comp.change_state('running') - else: - comp = VILLASrelayGateway(self, session) - - self.add_component(comp) + comp.change_state('stopped') - # Find vanished sessions - for uuid in existing_uuids - active_uuids: - comp = self.components[uuid] + # We dont remove the components here + # So that they dont get removed from the backend + # and get recreated with the same UUID later + # self.remove_component(comp) - self.remove_component(comp) + if len(active_sessions) > 0: + self.change_state('running') + else: + self.change_state('paused') - if len(self.components) > 0: - self.change_state('running') - else: - self.change_state('paused') + except Exception as e: + self.change_to_error('failed to reconcile', + exception=str(e), + args=e.args) @property def status(self): - return { - 'villas_relay_version': self._version, - **super().status - } + status = super().status + + try: + version = status.get('version') + status['status']['villas_relay_version'] = version + except Exception as e: + self.change_to_error('failed to get version from VILLASrelay', + exception=str(e), + args=str(e.args)) + + return status def on_shutdown(self): self.thread_stop.set() @@ -106,12 +113,6 @@ def on_ready(self): if self.autostart: os.system('villas-relay') - try: - status = self.get_status() - - self._version = status['version'] - - except Exception: - self.change_state('error', error='Failed to contact VILLASrelay') + self.thread.start() super().on_ready() diff --git a/villas/controller/components/simulator.py b/villas/controller/components/simulator.py index 17ba7c0..573d985 100644 --- a/villas/controller/components/simulator.py +++ b/villas/controller/components/simulator.py @@ -18,23 +18,12 @@ def __init__(self, **args): self.results = None @property - def schema(self): - return { - 'start': self.start_schema, - **super().schema - } - - @property - def start_schema(self): - return {} - - @property - def state(self): + def status(self): return { 'model': self.model, 'results': self.results, - **super().state + **super().status } @staticmethod @@ -98,32 +87,27 @@ def change_state(self, state, force=False, **kwargs): super().change_state(state, **kwargs) # Actions - def start(self, message): + def start(self, payload): self.started = time.time() self.simuuid = uuid.uuid4() - if 'parameters' in message.payload: - self.params = message.payload['parameters'] - - if 'model' in message.payload: - self.model = message.payload['model'] - - if 'results' in message.payload: - self.results = message.payload['results'] + self.params = payload.get('parameters', {}) + self.model = payload.get('model') + self.results = payload.get('results') - self.workdir = '/var/villas/controller/simulators/' + \ - str(self.uuid) + '/simulation/' + str(self.simuuid) + self.sim_workdir = os.path.join(self.workdir, 'simulation', + str(self.simuuid)) - self.logdir = self.workdir + '/Logs/' - self.logger.info('Target working directory: %s' % self.workdir) + self.sim_logdir = self.sim_workdir + '/Logs/' + self.logger.info('Simulation working directory: %s' % self.sim_workdir) try: - os.makedirs(self.logdir) - os.chdir(self.logdir) + os.makedirs(self.sim_logdir) + os.chdir(self.sim_logdir) except Exception as e: raise SimulationException(self, 'Failed to create and change to ' 'working directory: %s ( %s )' % - (self.logdir, e)) + (self.sim_logdir, e)) def _upload(self, filename): url = self.results['url'] @@ -157,9 +141,9 @@ def _unzip_files(self, filename): def upload_results(self): try: - filename = self.workdir + '/results.zip' + filename = os.path.join(self.sim_workdir, 'results.zip') with zipfile.ZipFile(filename, 'w') as results_zip: - for sub in os.scandir(self.logdir): + for sub in os.scandir(self.sim_logdir): results_zip.write(sub) results_zip.close() diff --git a/villas/controller/components/simulators/dpsim.py b/villas/controller/components/simulators/dpsim.py index 8eb954b..5ce2b01 100644 --- a/villas/controller/components/simulators/dpsim.py +++ b/villas/controller/components/simulators/dpsim.py @@ -1,6 +1,4 @@ import dpsim -import time -import socket import os from villas.controller.components.simulator import Simulator @@ -8,103 +6,7 @@ class DPsimSimulator(Simulator): - start_schema = { - '$schema': 'http://json-schema.org/draft-04/schema#', - 'properties': { - 'blocking': { - 'title': 'Block execution of each time-step until the ' - 'arrival of new data on the interfaces', - 'type': 'boolean' - }, - 'duration': { - 'examples': [ - 3600.0 - ], - 'title': 'Simulation duration [s]', - 'type': 'number' - }, - 'log-level': { - 'enum': ['NONE', 'INFO', 'DEBUG', 'WARN', 'ERR'], - 'title': 'Logging level', - 'type': 'string' - }, - 'name': { - 'examples': [ - 'Simulation_1' - ], - 'title': 'Name of log files', - 'type': 'string' - }, - 'options': { - 'additionalProperties': { - 'type': 'number' - }, - 'examples': [ - { - 'Ld': 0.2299, - 'Lq': 0.0 - } - ], - 'title': 'User-definable options', - 'type': 'object' - }, - 'scenario': { - 'title': 'Scenario selection', 'type': 'integer' - }, - 'solver-domain': { - 'enum': ['SP', 'DP', 'EMT'], - 'title': 'Domain of solver', - 'type': 'string' - }, - 'solver-type': { - 'enum': ['NRP', 'MNA'], - 'title': 'Type of solver', - 'type': 'string' - }, - 'start-at': { - 'description': 'The date must be given as an ISO8601 ' - 'formatted string', - 'examples': ['2004-06-14T23:34:30'], - 'format': 'date-time', - 'title': 'Start time of real-time simulation', - 'type': 'string' - }, - 'start-in': { - 'title': 'Start simulation relative to current time [s]', - 'type': 'number' - }, - 'start-synch': { - 'title': 'Sychronize start of simulation ' - 'with external interfaces', - 'type': 'boolean' - }, - 'steady-init': { - 'title': 'Perform a steady-state initialization prior ' - 'to the simulation', - 'type': 'boolean' - }, - 'system-freq': { - 'examples': [ - 50.0, - 60.0 - ], - 'title': 'System frequency [Hz]', - 'type': 'number' - }, - 'timestep': { - 'examples': [5e-05], - 'title': 'Simulation time-step [s]', - 'type': 'number' - } - }, - 'required': ['name'], - 'type': 'object' - } - def __init__(self, **args): - args['type'] = 'dpsim' - - self.started = time.time() self.sim = None super().__init__(**args) @@ -118,25 +20,14 @@ def headers(self): return headers - @property - def state(self): - state = super().state - - state['uptime'] = time.time() - self.started - state['version'] = '0.1.0' - state['host'] = socket.getfqdn() - state['kernel'] = os.uname() - - return state - def load_cim(self, fp): if fp is not None: self.sim = dpsim.load_cim(fp.name) self.logger.info(self.sim) os.unlink(fp.name) - def start(self, message): - fp = self.download_model(message) + def start(self, payload): + fp = self.download_model(payload) if fp: self.load_cim(fp) @@ -147,14 +38,14 @@ def start(self, message): if self.sim.start() is None: self.change_state('running') else: - self.change_state('error') + self.change_to_error('failed to start simulation') self.logger.warn('Attempt to start simulator failed.' 'State is %s', self._state) else: self.logger.warn('Attempted to start non-stopped simulator.' 'State is %s', self._state) - def stop(self, message): + def stop(self, payload): if self._state == 'running': self.logger.info('Stopping simulation...') @@ -169,7 +60,7 @@ def stop(self, message): self.logger.warn('Attempted to stop non-stopped simulator.' 'State is %s', self._state) - def pause(self, message): + def pause(self, payload): if self._state == 'running': self.logger.info('Pausing simulation...') @@ -192,7 +83,7 @@ def pause(self, message): self.logger.warn('Attempted to pause non-running simulator.' 'State is ' + self._state) - def resume(self, message): + def resume(self, payload): if self._state == 'paused': self.logger.info('Resuming simulation...') diff --git a/villas/controller/components/simulators/dummy.py b/villas/controller/components/simulators/dummy.py index 06597b0..028ed72 100644 --- a/villas/controller/components/simulators/dummy.py +++ b/villas/controller/components/simulators/dummy.py @@ -5,26 +5,6 @@ class DummySimulator(Simulator): - start_schema = { - '$schema': 'http://json-schema.org/draft-07/schema', - 'type': 'object', - 'default': {}, - 'required': [ - 'runtime' - ], - 'properties': { - 'runtime': { - '$id': '#/properties/runtime', - 'description': 'The run time of the simulation', - 'type': 'number', - 'default': 1.0, - 'examples': [ - 3.0 - ] - } - } - } - def __init__(self, **args): super().__init__(**args) @@ -38,34 +18,34 @@ def _schedule_state_transition(self, state, time=1.0): self.timer = threading.Timer(time, self.change_state, args=[state]) self.timer.start() - def start(self, message): - super().start(message) + def start(self, payload): + super().start(payload) runtime = self.params.get('runtime', 1.0) self._schedule_state_transition('running', runtime) - def stop(self, message): - super().stop(message) + def stop(self, payload): + super().stop(payload) self._schedule_state_transition('idle') - def pause(self, message): - super().pause(message) + def pause(self, payload): + super().pause(payload) self._schedule_state_transition('paused') - def resume(self, message): - super().resume(message) + def resume(self, payload): + super().resume(payload) self._schedule_state_transition('running') - def shutdown(self, message): - super().shutdown(message) + def shutdown(self, payload): + super().shutdown(payload) self._schedule_state_transition('shutdown') - def reset(self, message): - super().reset(message) + def reset(self, payload): + super().reset(payload) self._schedule_state_transition('idle') diff --git a/villas/controller/components/simulators/generic.py b/villas/controller/components/simulators/generic.py index 130b21b..174ff41 100644 --- a/villas/controller/components/simulators/generic.py +++ b/villas/controller/components/simulators/generic.py @@ -24,19 +24,16 @@ def __del__(self): if self.timer: self.timer.cancel() - if self.thread: - self - @property - def state(self): - state = super().state + def status(self): + status = super().status - state['return_code'] = self.return_code + status['status']['return_code'] = self.return_code - return state + return status - def start(self, message): - super().start(message) + def start(self, payload): + super().start(payload) self.logger.info('Working directory: %s', os.getcwd()) path = self.download_model() @@ -45,7 +42,7 @@ def start(self, message): raise SimulationException(self, 'Child process is already running') try: - params = message.payload['parameters'] + params = payload['parameters'] self.thread = threading.Thread(target=GenericSimulator.run, args=(self, params, path)) @@ -57,8 +54,7 @@ def start(self, message): def check_state(self, state): if self._state != state: - self.change_state('error', - msg=f'Failed to transition to state "{state}"!') + self.change_to_error('Failed to transition to state', state=state) def check_state_deferred(self, state, timeout=5): self.timer = threading.Timer(timeout, self.check_state, args=[state]) @@ -146,11 +142,11 @@ def run(self, params, path): # GenericSimulator.run() is executed in a separate thread. # We therefore want to catch exceptions here. except SimulationException as se: - self.change_state('error', msg=se.msg, **se.info) + self.change_to_error(se.msg, **se.info) self.child = None - def reset(self, message): + def reset(self, payload): # Don't send a signal if the child does not exist if self.child is None: return @@ -173,7 +169,7 @@ def reset(self, message): # we will transition into the error state self.check_state_deferred('idle', 5) - def stop(self, message): + def stop(self, payload): send_cont = False if self.child is None: @@ -196,7 +192,7 @@ def stop(self, message): # we will transition into the error state self.check_state_deferred('stopped', 5) - def pause(self, message): + def pause(self, payload): # Suspend command if self.child is None: raise SimulationException(self, 'No child process is running') @@ -206,7 +202,7 @@ def pause(self, message): self.change_state('paused') self.logger.info('Child process has been paused') - def resume(self, message): + def resume(self, payload): # Let process run if self.child is None: raise SimulationException(self, 'No child process is running') diff --git a/villas/controller/components/simulators/kubernetes.py b/villas/controller/components/simulators/kubernetes.py index e0decee..9a6373e 100644 --- a/villas/controller/components/simulators/kubernetes.py +++ b/villas/controller/components/simulators/kubernetes.py @@ -1,33 +1,25 @@ +from __future__ import annotations +from typing import TYPE_CHECKING import json import signal -from copy import deepcopy -import collections +# from copy import deepcopy +# import collections +import time import kubernetes as k8s from villas.controller.components.simulator import Simulator from villas.controller.exceptions import SimulationException +from villas.controller.util import merge - -def merge(dict1, dict2): - ''' Return a new dictionary by merging two dictionaries recursively. ''' - - result = deepcopy(dict1) - - for key, value in dict2.items(): - if isinstance(value, collections.Mapping): - result[key] = merge(result.get(key, {}), value) - elif value is None: - del result[key] - else: - result[key] = deepcopy(dict2[key]) - - return result +if TYPE_CHECKING: + from villas.controller.components.managers.kubernetes \ + import KubernetesManager class KubernetesJob(Simulator): - def __init__(self, manager, **args): + def __init__(self, manager: KubernetesManager, **args): super().__init__(**args) self.manager = manager @@ -39,19 +31,45 @@ def __init__(self, manager, **args): self.job = None self.pods = set() + self.cm_name = '' self.custom_schema = props.get('schema', {}) + @property + def status(self): + status = super().status + + status['status']['pod_names'] = list(self.pods) + + return status + @property def schema(self): - return { - **self.custom_schema, - **super().schema - } + if (super().schema): + return { + **self.custom_schema, + **super().schema + } + else: + return { + **self.custom_schema + } + + def _owner(self): + # if self.manager.my_pod_name and self.manager.my_pod_uid: + # return k8s.client.V1OwnerReference( + # kind='Pod', + # name=self.manager.my_pod_name, + # uid=self.manager.my_pod_uid, + # api_version='v1' + # ) + + return None def _prepare_job(self, job, payload): # Create config map cm = self._create_config_map(payload) + self.cm_name = cm.metadata.name # Create volumes v = k8s.client.V1Volume( @@ -93,13 +111,28 @@ def _prepare_job(self, job, payload): job.metadata.generate_name = name + '-' job.metadata.name = None + if o := self._owner(): + job.metadata.owner_references = [o] + if job.metadata.labels is None: job.metadata.labels = {} job.metadata.labels.update({ - 'controller': 'villas', - 'controller-uuid': self.manager.uuid, - 'uuid': self.uuid + 'app.kubernetes.io/part-of': 'villas', + 'app.kubernetes.io/managed-by': 'villas-controller', + 'app.kubernetes.io/component': 'infrastructure-component', + + 'villas.fein-aachen.org/ic-manager-uuid': self.manager.uuid, + 'villas.fein-aachen.org/ic-uuid': self.uuid + }) + + if job.metadata.annotations is None: + job.metadata.annotations = {} + + job.metadata.annotations.update({ + 'villas.fein-aachen.org/name': self.name, + 'villas.fein-aachen.org/location': self.location, + 'villas.fein-aachen.org/realm': self.realm }) return job @@ -116,6 +149,9 @@ def _create_config_map(self, payload): } ) + if o := self._owner(): + self.cm.metadata.owner_references = [o] + return c.create_namespaced_config_map( namespace=self.manager.namespace, body=self.cm @@ -142,11 +178,14 @@ def _delete_job(self): self.job = None self.properties['job_name'] = None self.properties['pod_names'] = [] + # job isn't immediately deleted + # let the user see something is happening + time.sleep(7) - def start(self, message): + def start(self, payload): + # Delete prior job self._delete_job() - payload = message.payload job = payload.get('job', {}) job = merge(self.jobdict, job) v1job = self._prepare_job(job, payload) @@ -164,8 +203,8 @@ def start(self, message): self.properties['namespace'] = self.manager.namespace def stop(self, message): + self.change_state('stopping', True) self._delete_job() - self.change_state('idle') def _send_signal(self, sig): @@ -187,17 +226,19 @@ def _send_signal_to_pod(self, sig, podname): self.logger.debug('Sent signal %d to container: %s', sig, resp) - def pause(self, message): + def pause(self, payload): self._send_signal(signal.SIGSTOP) self.change_state('paused') - def resume(self, message): + def resume(self, payload): self._send_signal(signal.SIGCONT) self.change_state('running') - def reset(self, message): + def reset(self, payload): + self.change_state('resetting', True) + self.mixin.drain_publish_queue() self._delete_job() - super().reset(message) + super().reset(payload) self.change_state('idle') diff --git a/villas/controller/components/simulators/rscad.py b/villas/controller/components/simulators/rscad.py index 6c0bb74..6c4e3b3 100644 --- a/villas/controller/components/simulators/rscad.py +++ b/villas/controller/components/simulators/rscad.py @@ -1,5 +1,4 @@ import socket -import time from villas.controller.components.simulator import Simulator @@ -14,25 +13,26 @@ def __init__(self, host, number): self.name = f'{host}({number})' @property - def state(self): + def status(self): try: user, case = self.ping() if len(user) > 0: - state = { + status = { 'status': 'running', 'user': user, 'case': case } else: - state = { + status = { 'status': 'free' } except socket.timeout: - state = { + status = { 'status': 'offline' } - state['time'] = int(round(time.time() * 1000)) - - return state + return { + **super().status, + **status + } diff --git a/villas/controller/config.py b/villas/controller/config.py index 0424e85..cf0f50b 100644 --- a/villas/controller/config.py +++ b/villas/controller/config.py @@ -2,6 +2,9 @@ import argparse import logging import os +import dotmap +import uuid +from typing import List from os import getcwd from xdg import ( @@ -10,6 +13,7 @@ ) from villas.controller.component import Component +from villas.controller.util import merge LOGGER = logging.getLogger(__name__) @@ -28,6 +32,20 @@ def __call__(self, arg): class Config: + DEFAULT_CONFIG = { + 'broker': { + 'url': 'amqp://localhost:5672/%2F' + }, + 'api': { + 'enabled': True, + 'port': 8089 + }, + 'components': [], + # 'workdir': '/var/villas/controller/simulators/' + 'workdir': os.getcwd(), + 'uuid': str(uuid.uuid4()) + } + DEFAULT_PATHS = xdg_config_dirs() + [ xdg_config_home(), getcwd(), @@ -39,11 +57,17 @@ def __init__(self, fp=None): fn = self.find_default_path() if fn: with open(fn) as fp: - self.dict = yaml.load(fp, Loader=yaml.FullLoader) + self.load(fp) else: - pass # Start without config + self.config = {} # Start without config else: - self.dict = yaml.load(fp, Loader=yaml.FullLoader) + self.load(fp) + + def load(self, fp): + config = yaml.load(fp, Loader=yaml.FullLoader) + merged = merge(self.DEFAULT_CONFIG, config) + + self.config = dotmap.DotMap(merged) def find_default_path(self, filename='config', suffixes=['json', 'yaml', 'yml']): @@ -54,8 +78,11 @@ def find_default_path(self, filename='config', return fn @property - def components(self): - return [Component.from_dict(c) for c in self.dict['components']] + def components(self) -> List[Component]: + return [Component.from_dict(c) for c in self.config.components] + + def __getattr__(self, attr): + return self.config.get(attr) def check(self): uuids = [c.uuid for c in self.components] diff --git a/villas/controller/controller.py b/villas/controller/controller.py index d9c6481..95f058a 100644 --- a/villas/controller/controller.py +++ b/villas/controller/controller.py @@ -1,17 +1,25 @@ +import time import logging import socket import queue +import os import kombu.mixins +from villas.controller.api import Api +from villas.controller import __version__ as version +from villas.controller.config import Config from villas.controller.components.managers.generic import GenericManager + LOGGER = logging.getLogger(__name__) class ControllerMixin(kombu.mixins.ConsumerProducerMixin): - def __init__(self, connection, components): - self.components = {c.uuid: c for c in components if c.enabled} + def __init__(self, connection, args): + self.args = args + self.config: Config = args.config + self.connection = connection self.exchange = kombu.Exchange(name='villas', type='headers', @@ -19,16 +27,18 @@ def __init__(self, connection, components): self.publish_queue = queue.Queue() + # Components are activated by first call to on_iteration() + self.components = {} + self.active_components = {} + manager = self.add_managers() - for _, comp in self.components.items(): + comps = [c for c in self.config.components if c.enabled] + for comp in comps: LOGGER.info('Adding %s', comp) - comp.set_manager(manager) + manager.add_component(comp) - # Components are activated by first call to on_iteration() - self.active_components = {} - - def get_consumers(self, Consumer, channel): + def get_consumers(self, _, channel): return map(lambda comp: comp.get_consumer(channel), self.active_components.values()) @@ -47,9 +57,12 @@ def add_managers(self): type='generic', category='manager', name='Generic Manager', - location=socket.gethostname() + location=socket.gethostname(), + uuid=self.config.uuid ) + mgr.mixin = self + self.components[mgr.uuid] = mgr else: mgr = mgrs[0] @@ -59,7 +72,7 @@ def add_managers(self): def publish(self, body, **kwargs): self.publish_queue.put((body, kwargs)) - def _drain_publish_queue(self): + def drain_publish_queue(self): try: while msg := self.publish_queue.get(False): body = msg[0] @@ -71,10 +84,12 @@ def _drain_publish_queue(self): self.producer.publish(body, **kwargs) except queue.Empty: pass + except TimeoutError: + LOGGER.warn('TimeoutError, let kombu reconnect..') def on_iteration(self): # Drain publish queue - self._drain_publish_queue() + self.drain_publish_queue() # Update components added = self.components.keys() - self.active_components.keys() @@ -100,12 +115,18 @@ def on_iteration(self): self.active_components = self.components.copy() - def run(self): + def start(self): + self.started = time.time() + + if self.config.api.enabled: + self.api = Api(self) + self.api.start() + self.should_terminate = False while not self.should_terminate: self.should_stop = False - LOGGER.info('Startig mixing for %d components', + LOGGER.info('Starting mixing for %d components', len(self.active_components)) super().run() @@ -119,3 +140,20 @@ def shutdown(self): # Publish last status updates before shutdown self._drain_publish_queue() self.should_terminate = True + + @property + def status(self): + u = os.uname() + + return { + 'version': version, + 'uptime': time.time() - self.started, + 'host': socket.gethostname(), + 'kernel': { + 'sysname': u.sysname, + 'nodename': u.nodename, + 'release': u.release, + 'version': u.version, + 'machine': u.machine + }, + } diff --git a/villas/controller/handlers/component.py b/villas/controller/handlers/component.py new file mode 100644 index 0000000..cb91e60 --- /dev/null +++ b/villas/controller/handlers/component.py @@ -0,0 +1,51 @@ +import json +from tornado.web import HTTPError +from http import HTTPStatus +from functools import wraps + + +from villas.controller.api import RequestHandler +from villas.controller.exceptions import SimulationException + + +def with_component(f): + + @wraps(f) + def wrapper(self, uuid): + try: + component = self.controller.components[uuid] + + f(self, component) + except KeyError: + raise HTTPError(HTTPStatus.NOT_FOUND) + + return wrapper + + +class ComponentRequestHandler(RequestHandler): + + @with_component + def get(self, component): + self.write(component.status) + + @with_component + def post(self, component): + payload = json.loads(self.request.body) + + action = payload.get('action') + if action is None: + raise HTTPError(HTTPStatus.BAD_REQUEST) + + try: + component.run_action(action, payload) + self.write(component.status) + + except SimulationException as se: + self.write({ + 'exception': { + 'msg': se.msg, + 'args': se.args + }, + **component.status + }) + self.send_error(HTTPStatus.BAD_REQUEST) diff --git a/villas/controller/handlers/health.py b/villas/controller/handlers/health.py new file mode 100644 index 0000000..d8be63e --- /dev/null +++ b/villas/controller/handlers/health.py @@ -0,0 +1,9 @@ +from villas.controller.api import RequestHandler + + +class HealthRequestHandler(RequestHandler): + + def get(self): + self.write({ + 'status': 'ok' + }) diff --git a/villas/controller/handlers/main.py b/villas/controller/handlers/main.py new file mode 100644 index 0000000..cbc07d7 --- /dev/null +++ b/villas/controller/handlers/main.py @@ -0,0 +1,12 @@ +from villas.controller.api import RequestHandler + + +class MainRequestHandler(RequestHandler): + + def get(self): + self.write({ + 'components': list(self.controller.components.keys()), + 'status': { + **self.controller.status + } + }) diff --git a/villas/controller/main.py b/villas/controller/main.py index bbd9c57..4f362df 100644 --- a/villas/controller/main.py +++ b/villas/controller/main.py @@ -103,12 +103,12 @@ def main(): setup_logging(args) - if args.broker: - broker_url = args.broker - else: - broker = args.config.dict.get('broker', {}) - broker_url = broker.get('url', 'amqp://guest:guest@localhost/%2F') - + try: + broker_url = args.broker or args.config.broker.url + except AttributeError: + LOGGER.error('A broker URL must be provided either via a command line ' + 'parameter or a configuration file') + return -1 try: with kombu.Connection(broker_url, connect_timeout=3) as c: LOGGER.info(f'Connecting to: {broker_url}') diff --git a/villas/controller/schemas/__init__.py b/villas/controller/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/villas/controller/schemas/manager/__init__.py b/villas/controller/schemas/manager/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/villas/controller/schemas/manager/generic/__init__.py b/villas/controller/schemas/manager/generic/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/villas/controller/schemas/manager/generic/create.yaml b/villas/controller/schemas/manager/generic/create.yaml new file mode 100644 index 0000000..c936127 --- /dev/null +++ b/villas/controller/schemas/manager/generic/create.yaml @@ -0,0 +1,80 @@ +--- +$schema: http://json-schema.org/draft-07/schema +additionalProperties: true + +type: object +required: + - name + - category + - location + - owner + - realm + - type + - api_url + - ws_url + +properties: + api_url: + examples: + - https://villas.k8s.eonerc.rwth-aachen.de/api/ic/generic_1 + title: The api_url schema + type: string + category: + examples: + - simulator + title: Component category + type: string + location: + examples: + - Richard's PC + title: Component location + type: string + name: + default: New Component + examples: + - 'Generic Simulator #1' + title: Component name + type: string + owner: + examples: + - rmr + - svg + title: Component owner + type: string + realm: + examples: + - de.rwth-aachen.eonerc.acs + title: Component realm + type: string + shell: + examples: + - true + title: The shell schema + type: boolean + type: + examples: + - generic + title: The type schema + type: string + uuid: + title: The uuid schema + type: 'null' + whitelist: + additionalItems: true + examples: + - - /sbin/ping + - ^echo + items: + anyOf: + - examples: + - /sbin/ping + - ^echo + title: The first anyOf schema + type: string + title: The whitelist schema + type: array + ws_url: + examples: + - https://villas.k8s.eonerc.rwth-aachen.de/ws/relay/generic_1 + title: The ws_url schema + type: string diff --git a/villas/controller/schemas/manager/kubernetes-simple/__init__.py b/villas/controller/schemas/manager/kubernetes-simple/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/villas/controller/schemas/manager/kubernetes-simple/create.yaml b/villas/controller/schemas/manager/kubernetes-simple/create.yaml new file mode 100644 index 0000000..9747f77 --- /dev/null +++ b/villas/controller/schemas/manager/kubernetes-simple/create.yaml @@ -0,0 +1,28 @@ +--- +$schema: http://json-schema.org/draft-04/schema# + +type: object +title: 'Simple Kubernetes Job' +required: + - image +properties: + name: + type: string + title: 'Simulator Name' + default: 'Kubernetes Simulator' + uuid: + type: string + title: UUID + default: 8dfd03b2-1c78-11ec-9621-0242ac130002 + jobname: + type: string + title: 'Jobname' + default: myjob + activeDeadlineSeconds: + type: number + title: activeDeadlineSeconds + default: 3600 + image: + type: string + title: Image + default: perl diff --git a/villas/controller/schemas/manager/kubernetes/create.yaml b/villas/controller/schemas/manager/kubernetes/create.yaml new file mode 100644 index 0000000..60c9bac --- /dev/null +++ b/villas/controller/schemas/manager/kubernetes/create.yaml @@ -0,0 +1,10 @@ +--- +$schema: http://json-schema.org/draft-04/schema# + +type: object +properties: + job: + $ref: https://kubernetesjsonschema.dev/v1.18.1/job.json + schema: + additionalProperties: + $ref: https://json-schema.org/draft-04/schema diff --git a/villas/controller/schemas/simulator/dpsim/__init__.py b/villas/controller/schemas/simulator/dpsim/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/villas/controller/schemas/simulator/dpsim/start.yaml b/villas/controller/schemas/simulator/dpsim/start.yaml new file mode 100644 index 0000000..20ceb41 --- /dev/null +++ b/villas/controller/schemas/simulator/dpsim/start.yaml @@ -0,0 +1,81 @@ +--- +$schema: http://json-schema.org/draft-04/schema# + +type: object +required: +- name +properties: + blocking: + title: Block execution of each time-step until the arrival of new data on the + interfaces + type: boolean + duration: + examples: + - 3600.0 + title: Simulation duration [s] + type: number + log-level: + enum: + - NONE + - INFO + - DEBUG + - WARN + - ERR + title: Logging level + type: string + name: + examples: + - Simulation_1 + title: Name of log files + type: string + options: + additionalProperties: + type: number + examples: + - Ld: 0.2299 + Lq: 0.0 + title: User-definable options + type: object + scenario: + title: Scenario selection + type: integer + solver-domain: + enum: + - SP + - DP + - EMT + title: Domain of solver + type: string + solver-type: + enum: + - NRP + - MNA + title: Type of solver + type: string + start-at: + description: The date must be given as an ISO8601 formatted string + examples: + - '2004-06-14T23:34:30' + format: date-time + title: Start time of real-time simulation + type: string + start-in: + title: Start simulation relative to current time [s] + type: number + start-synch: + title: Sychronize start of simulation with external interfaces + type: boolean + steady-init: + title: Perform a steady-state initialization prior to the simulation + type: boolean + system-freq: + examples: + - 50.0 + - 60.0 + title: System frequency [Hz] + type: number + timestep: + examples: + - 5.0e-05 + title: Simulation time-step [s] + type: number diff --git a/villas/controller/schemas/simulator/dummy/__init__.py b/villas/controller/schemas/simulator/dummy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/villas/controller/schemas/simulator/dummy/start.yaml b/villas/controller/schemas/simulator/dummy/start.yaml new file mode 100644 index 0000000..6da689e --- /dev/null +++ b/villas/controller/schemas/simulator/dummy/start.yaml @@ -0,0 +1,12 @@ +--- +$schema: http://json-schema.org/draft-07/schema + +type: object +required: +- runtime +properties: + runtime: + type: number + default: 1.0 + description: The runtime of the simulation in seconds + example: 3.0 diff --git a/villas/controller/util.py b/villas/controller/util.py new file mode 100644 index 0000000..a51e144 --- /dev/null +++ b/villas/controller/util.py @@ -0,0 +1,18 @@ +from copy import deepcopy +import collections + + +def merge(dict1, dict2): + ''' Return a new dictionary by merging two dictionaries recursively. ''' + + result = deepcopy(dict1) + + for key, value in dict2.items(): + if isinstance(value, collections.Mapping): + result[key] = merge(result.get(key, {}), value) + elif value is None: + del result[key] + else: + result[key] = deepcopy(dict2[key]) + + return result