From 99f60f5e633785a7f037932c33dec6c6c2750c19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eray=20=C3=96zcan?= Date: Fri, 16 Aug 2024 16:46:22 +0200 Subject: [PATCH] feat: introduce job system Refs #128312 --- src/elody/exceptions.py | 2 + src/elody/job.py | 71 +++++++++ src/elody/migration/__init__.py | 0 src/elody/migration/base_object_migrator.py | 18 +++ src/elody/object_configurations/__init__.py | 0 .../base_object_configuration.py | 135 ++++++++++++++++++ .../elody_configuration.py | 102 +++++++++++++ .../job_configuration.py | 59 ++++++++ 8 files changed, 387 insertions(+) create mode 100644 src/elody/job.py create mode 100644 src/elody/migration/__init__.py create mode 100644 src/elody/migration/base_object_migrator.py create mode 100644 src/elody/object_configurations/__init__.py create mode 100644 src/elody/object_configurations/base_object_configuration.py create mode 100644 src/elody/object_configurations/elody_configuration.py create mode 100644 src/elody/object_configurations/job_configuration.py diff --git a/src/elody/exceptions.py b/src/elody/exceptions.py index 6e422a8..5e13548 100644 --- a/src/elody/exceptions.py +++ b/src/elody/exceptions.py @@ -37,8 +37,10 @@ class NonUniqueException(Exception): class NotFoundException(Exception): pass + class NoTenantException(Exception): pass + class UnsupportedVersionException(Exception): pass diff --git a/src/elody/job.py b/src/elody/job.py new file mode 100644 index 0000000..2cf7a62 --- /dev/null +++ b/src/elody/job.py @@ -0,0 +1,71 @@ +from elody.object_configurations.job_configuration import JobConfiguration + + +_config = JobConfiguration() +_create = _config.crud()["creator"] +_post_crud_hook = _config.crud()["post_crud_hook"] + + +def start_job(name, type, *, get_rabbit, parent_id=None, get_user_context=None) -> str: + job = _create( + { + "metadata": [ + {"key": "name", "value": name}, + {"key": "status", "value": "running"}, + {"key": "type", "value": type}, + ], + "relations": ( + [{"key": parent_id, "type": "hasParentJob"}] if parent_id else [] + ), + "type": "job", + }, + get_user_context=get_user_context, + ) + _post_crud_hook( + crud="create", document=job, parent_id=parent_id, get_rabbit=get_rabbit + ) + return job["_id"] + + +def finish_job( + id, + id_of_document_job_was_initiated_for=None, + type_of_document_job_was_initiated_for=None, + *, + get_rabbit +): + document = { + "id": id, + "patch": { + "metadata": [{"key": "status", "value": "finished"}], + "relations": ( + [{"key": id_of_document_job_was_initiated_for, "type": "isJobOf"}] + if id_of_document_job_was_initiated_for + else [] + ), + }, + } + _post_crud_hook(crud="update", document=document, get_rabbit=get_rabbit) + + if id_of_document_job_was_initiated_for and type_of_document_job_was_initiated_for: + document = { + "document_info_job_was_initiated_for": { + "id": id_of_document_job_was_initiated_for, + "type": type_of_document_job_was_initiated_for, + }, + "patch": {"relations": [{"key": id, "type": "hasJob"}]}, + } + _post_crud_hook(crud="update", document=document, get_rabbit=get_rabbit) + + +def fail_job(id, exception_message, *, get_rabbit): + document = { + "id": id, + "patch": { + "metadata": [ + {"key": "info", "value": exception_message}, + {"key": "status", "value": "failed"}, + ] + }, + } + _post_crud_hook(crud="update", document=document, get_rabbit=get_rabbit) diff --git a/src/elody/migration/__init__.py b/src/elody/migration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/elody/migration/base_object_migrator.py b/src/elody/migration/base_object_migrator.py new file mode 100644 index 0000000..9411934 --- /dev/null +++ b/src/elody/migration/base_object_migrator.py @@ -0,0 +1,18 @@ +class BaseObjectMigrator: + def __init__(self, *, status, silent=False): + self._status = status + self._silent = silent + + @property + def status(self): + return self._status + + @property + def silent(self): + return self._silent + + def bulk_migrate(self, *, dry_run=False): # pyright: ignore + pass + + def lazy_migrate(self, item, *, dry_run=False): # pyright: ignore + return item diff --git a/src/elody/object_configurations/__init__.py b/src/elody/object_configurations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/elody/object_configurations/base_object_configuration.py b/src/elody/object_configurations/base_object_configuration.py new file mode 100644 index 0000000..966b491 --- /dev/null +++ b/src/elody/object_configurations/base_object_configuration.py @@ -0,0 +1,135 @@ +from abc import ABC, abstractmethod +from copy import deepcopy +from elody.migration.base_object_migrator import BaseObjectMigrator + + +class BaseObjectConfiguration(ABC): + SCHEMA_TYPE = "elody" + SCHEMA_VERSION = 1 + + @abstractmethod + def crud(self): + return { + "collection": "", + "collection_history": "", + "creator": lambda post_body, **kwargs: post_body, # pyright: ignore + "nested_matcher_builder": lambda object_lists, keys_info, value: self.__build_nested_matcher( + object_lists, keys_info, value + ), + "post_crud_hook": lambda **kwargs: None, # pyright: ignore + "pre_crud_hook": lambda **kwargs: None, # pyright: ignore + } + + @abstractmethod + def document_info(self): + return {"object_lists": {"metadata": "key", "relations": "type"}} + + @abstractmethod + def logging(self, flat_document, **kwargs): + info_labels = { + "uuid": flat_document.get("_id"), + "type": flat_document.get("type"), + "schema": f"{flat_document.get('schema.type')}:{flat_document.get('schema.version')}", + } + try: + user_context = kwargs.get("get_user_context")() # pyright: ignore + info_labels["http_method"] = user_context.bag.get("http_method") + info_labels["requested_endpoint"] = user_context.bag.get( + "requested_endpoint" + ) + info_labels["full_path"] = user_context.bag.get("full_path") + info_labels["preferred_username"] = user_context.preferred_username + info_labels["email"] = user_context.email + info_labels["user_roles"] = ", ".join(user_context.x_tenant.roles) + info_labels["x_tenant"] = user_context.x_tenant.id + except Exception: + pass + return {"info_labels": info_labels, "loki_indexed_info_labels": {}} + + @abstractmethod + def migration(self): + return BaseObjectMigrator(status="disabled") + + @abstractmethod + def serialization(self, from_format, to_format): + def serializer(document, **_): + return document + + return serializer + + @abstractmethod + def validation(self): + def validator(http_method, content, **_): # pyright: ignore + pass + + return "function", validator + + def _get_merged_post_body(self, post_body, document_defaults, object_list_name): + key = self.document_info()["object_lists"][object_list_name] + post_body[object_list_name] = self.__merge_object_lists( + document_defaults.get(object_list_name, []), + post_body.get(object_list_name, []), + key, + ) + return post_body + + def _sanitize_document(self, document, object_list_name, value_field_name): + object_list = deepcopy(document[object_list_name]) + for element in object_list: + if not element[value_field_name]: + document[object_list_name].remove(element) + + def _sort_document_keys(self, document): + def sort_keys(data): + if isinstance(data, dict): + sorted_items = {key: data.pop(key) for key in sorted(data.keys())} + for key, value in sorted_items.items(): + data[key] = sort_keys(value) + return data + elif isinstance(data, list): + if all(isinstance(i, str) for i in data): + data.sort() + return data + else: + for index, item in enumerate(data): + data[index] = sort_keys(item) + return data + else: + return data + + for key, value in self.document_info()["object_lists"].items(): + document[key] = sorted(document[key], key=lambda property: property[value]) + sort_keys(document) + + def __build_nested_matcher(self, object_lists, keys_info, value, index=0): + if index == 0 and not any(info["is_object_list"] for info in keys_info): + if value in ["ANY_MATCH", "NONE_MATCH"]: + value = {"$exists": value == "ANY_MATCH"} + return {".".join(info["key"] for info in keys_info): value} + + info = keys_info[index] + + if info["is_object_list"]: + nested_matcher = self.__build_nested_matcher( + object_lists, keys_info, value, index + 1 + ) + elem_match = { + "$elemMatch": { + object_lists[info["key"]]: info["object_key"], + keys_info[index + 1]["key"]: nested_matcher, + } + } + if value in ["ANY_MATCH", "NONE_MATCH"]: + del elem_match["$elemMatch"][keys_info[index + 1]["key"]] + if value == "NONE_MATCH": + return {"NOR_MATCHER": {info["key"]: {"$all": [elem_match]}}} + return elem_match if index > 0 else {info["key"]: {"$all": [elem_match]}} + + return value + + def __merge_object_lists(self, source, target, key): + for target_item in target: + for source_item in source: + if source_item[key] == target_item[key]: + source.remove(source_item) + return [*source, *target] diff --git a/src/elody/object_configurations/elody_configuration.py b/src/elody/object_configurations/elody_configuration.py new file mode 100644 index 0000000..6020257 --- /dev/null +++ b/src/elody/object_configurations/elody_configuration.py @@ -0,0 +1,102 @@ +from datetime import datetime, timezone +from elody.object_configurations.base_object_configuration import ( + BaseObjectConfiguration, +) +from elody.util import flatten_dict +from uuid import uuid4 + + +class ElodyObjectConfiguration(BaseObjectConfiguration): + SCHEMA_TYPE = "elody" + SCHEMA_VERSION = 1 + + def crud(self): + crud = { + "collection": "entities", + "collection_history": "history", + "creator": lambda post_body, **kwargs: self._creator(post_body, **kwargs), + "post_crud_hook": lambda **kwargs: self._post_crud_hook(**kwargs), + "pre_crud_hook": lambda **kwargs: self._pre_crud_hook(**kwargs), + } + return {**super().crud(), **crud} + + def document_info(self): + return {"object_lists": {"metadata": "key", "relations": "type"}} + + def logging(self, flat_document, **kwargs): + return super().logging(flat_document, **kwargs) + + def migration(self): + return super().migration() + + def serialization(self, from_format, to_format): + return super().serialization(from_format, to_format) + + def validation(self): + return super().validation() + + def _creator( + self, + post_body, + *, + get_user_context, + flat_post_body={}, + document_defaults={}, + ): + if not flat_post_body: + flat_post_body = flatten_dict( + self.document_info()["object_lists"], post_body + ) + _id = document_defaults.get("_id", str(uuid4())) + + identifiers = [] + for property in self.document_info().get("identifier_properties", []): + if identifier := flat_post_body.get(f"metadata.{property}.value"): + identifiers.append(identifier) + + template = { + "_id": _id, + "computed_values": { + "created_at": datetime.now(timezone.utc), + "event": "create", + }, + "identifiers": list( + set([_id, *identifiers, *document_defaults.pop("identifiers", [])]) + ), + "metadata": [], + "relations": [], + "schema": {"type": self.SCHEMA_TYPE, "version": self.SCHEMA_VERSION}, + } + if email := self.__get_email(get_user_context): + template["computed_values"]["created_by"] = email + + for key in self.document_info()["object_lists"].keys(): + post_body = self._get_merged_post_body(post_body, document_defaults, key) + document = {**template, **document_defaults, **post_body} + + self._sanitize_document(document, "metadata", "value") + self._sort_document_keys(document) + return document + + def _post_crud_hook(self, **_): + pass + + def _pre_crud_hook(self, *, crud, document={}, get_user_context=None, **_): + if document: + self._sanitize_document(document, "metadata", "value") + self.__patch_document_computed_values( + crud, document, get_user_context=get_user_context + ) + self._sort_document_keys(document) + + def __get_email(self, get_user_context): + try: + return get_user_context().email + except Exception: + return None + + def __patch_document_computed_values(self, crud, document, **kwargs): + document["computed_values"].update({"event": crud}) + document["computed_values"].update({"modified_at": datetime.now(timezone.utc)}) + if email := self.__get_email(kwargs.get("get_user_context")): + document["computed_values"].update({"modified_by": email}) diff --git a/src/elody/object_configurations/job_configuration.py b/src/elody/object_configurations/job_configuration.py new file mode 100644 index 0000000..3f652fc --- /dev/null +++ b/src/elody/object_configurations/job_configuration.py @@ -0,0 +1,59 @@ +from elody.object_configurations.elody_configuration import ( + ElodyObjectConfiguration, +) +from elody.util import send_cloudevent +from os import getenv + + +class JobConfiguration(ElodyObjectConfiguration): + SCHEMA_TYPE = "elody" + SCHEMA_VERSION = 1 + + def crud(self): + crud = {"collection": "jobs", "collection_history": ""} + return {**super().crud(), **crud} + + def document_info(self): + return super().document_info() + + def logging(self, flat_item, **kwargs): + return super().logging(flat_item, **kwargs) + + def migration(self): + return super().migration() + + def serialization(self, from_format, to_format): + return super().serialization(from_format, to_format) + + def validation(self): + return super().validation() + + def _post_crud_hook(self, *, crud, document, get_rabbit, **kwargs): + if crud == "create": + send_cloudevent( + get_rabbit(), + getenv("MQ_EXCHANGE", "dams"), + "dams.job_created", + document, + ) + if parent_id := kwargs.get("parent_id"): + send_cloudevent( + get_rabbit(), + getenv("MQ_EXCHANGE", "dams"), + "dams.job_changed", + { + "id": parent_id, + "patch": { + "relations": [ + {"key": document["_id"], "type": "isParentJobOf"} + ] + }, + }, + ) + elif crud == "update": + send_cloudevent( + get_rabbit(), + getenv("MQ_EXCHANGE", "dams"), + "dams.job_changed", + document, + )