Skip to content

Commit

Permalink
feat: introduce job system
Browse files Browse the repository at this point in the history
Refs #128312
  • Loading branch information
eray-inuits committed Aug 16, 2024
1 parent f02b5eb commit 99f60f5
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/elody/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ class NonUniqueException(Exception):
class NotFoundException(Exception):
pass


class NoTenantException(Exception):
pass


class UnsupportedVersionException(Exception):
pass
71 changes: 71 additions & 0 deletions src/elody/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from elody.object_configurations.job_configuration import JobConfiguration


_config = JobConfiguration()
_create = _config.crud()["creator"]
_post_crud_hook = _config.crud()["post_crud_hook"]


def start_job(name, type, *, get_rabbit, parent_id=None, get_user_context=None) -> str:
job = _create(
{
"metadata": [
{"key": "name", "value": name},
{"key": "status", "value": "running"},
{"key": "type", "value": type},
],
"relations": (
[{"key": parent_id, "type": "hasParentJob"}] if parent_id else []
),
"type": "job",
},
get_user_context=get_user_context,
)
_post_crud_hook(
crud="create", document=job, parent_id=parent_id, get_rabbit=get_rabbit
)
return job["_id"]


def finish_job(
id,
id_of_document_job_was_initiated_for=None,
type_of_document_job_was_initiated_for=None,
*,
get_rabbit
):
document = {
"id": id,
"patch": {
"metadata": [{"key": "status", "value": "finished"}],
"relations": (
[{"key": id_of_document_job_was_initiated_for, "type": "isJobOf"}]
if id_of_document_job_was_initiated_for
else []
),
},
}
_post_crud_hook(crud="update", document=document, get_rabbit=get_rabbit)

if id_of_document_job_was_initiated_for and type_of_document_job_was_initiated_for:
document = {
"document_info_job_was_initiated_for": {
"id": id_of_document_job_was_initiated_for,
"type": type_of_document_job_was_initiated_for,
},
"patch": {"relations": [{"key": id, "type": "hasJob"}]},
}
_post_crud_hook(crud="update", document=document, get_rabbit=get_rabbit)


def fail_job(id, exception_message, *, get_rabbit):
document = {
"id": id,
"patch": {
"metadata": [
{"key": "info", "value": exception_message},
{"key": "status", "value": "failed"},
]
},
}
_post_crud_hook(crud="update", document=document, get_rabbit=get_rabbit)
Empty file added src/elody/migration/__init__.py
Empty file.
18 changes: 18 additions & 0 deletions src/elody/migration/base_object_migrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class BaseObjectMigrator:
def __init__(self, *, status, silent=False):
self._status = status
self._silent = silent

@property
def status(self):
return self._status

@property
def silent(self):
return self._silent

def bulk_migrate(self, *, dry_run=False): # pyright: ignore
pass

def lazy_migrate(self, item, *, dry_run=False): # pyright: ignore
return item
Empty file.
135 changes: 135 additions & 0 deletions src/elody/object_configurations/base_object_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from abc import ABC, abstractmethod
from copy import deepcopy
from elody.migration.base_object_migrator import BaseObjectMigrator


class BaseObjectConfiguration(ABC):
SCHEMA_TYPE = "elody"
SCHEMA_VERSION = 1

@abstractmethod
def crud(self):
return {
"collection": "",
"collection_history": "",
"creator": lambda post_body, **kwargs: post_body, # pyright: ignore
"nested_matcher_builder": lambda object_lists, keys_info, value: self.__build_nested_matcher(
object_lists, keys_info, value
),
"post_crud_hook": lambda **kwargs: None, # pyright: ignore
"pre_crud_hook": lambda **kwargs: None, # pyright: ignore
}

@abstractmethod
def document_info(self):
return {"object_lists": {"metadata": "key", "relations": "type"}}

@abstractmethod
def logging(self, flat_document, **kwargs):
info_labels = {
"uuid": flat_document.get("_id"),
"type": flat_document.get("type"),
"schema": f"{flat_document.get('schema.type')}:{flat_document.get('schema.version')}",
}
try:
user_context = kwargs.get("get_user_context")() # pyright: ignore
info_labels["http_method"] = user_context.bag.get("http_method")
info_labels["requested_endpoint"] = user_context.bag.get(
"requested_endpoint"
)
info_labels["full_path"] = user_context.bag.get("full_path")
info_labels["preferred_username"] = user_context.preferred_username
info_labels["email"] = user_context.email
info_labels["user_roles"] = ", ".join(user_context.x_tenant.roles)
info_labels["x_tenant"] = user_context.x_tenant.id
except Exception:
pass
return {"info_labels": info_labels, "loki_indexed_info_labels": {}}

@abstractmethod
def migration(self):
return BaseObjectMigrator(status="disabled")

@abstractmethod
def serialization(self, from_format, to_format):
def serializer(document, **_):
return document

return serializer

@abstractmethod
def validation(self):
def validator(http_method, content, **_): # pyright: ignore
pass

return "function", validator

def _get_merged_post_body(self, post_body, document_defaults, object_list_name):
key = self.document_info()["object_lists"][object_list_name]
post_body[object_list_name] = self.__merge_object_lists(
document_defaults.get(object_list_name, []),
post_body.get(object_list_name, []),
key,
)
return post_body

def _sanitize_document(self, document, object_list_name, value_field_name):
object_list = deepcopy(document[object_list_name])
for element in object_list:
if not element[value_field_name]:
document[object_list_name].remove(element)

def _sort_document_keys(self, document):
def sort_keys(data):
if isinstance(data, dict):
sorted_items = {key: data.pop(key) for key in sorted(data.keys())}
for key, value in sorted_items.items():
data[key] = sort_keys(value)
return data
elif isinstance(data, list):
if all(isinstance(i, str) for i in data):
data.sort()
return data
else:
for index, item in enumerate(data):
data[index] = sort_keys(item)
return data
else:
return data

for key, value in self.document_info()["object_lists"].items():
document[key] = sorted(document[key], key=lambda property: property[value])
sort_keys(document)

def __build_nested_matcher(self, object_lists, keys_info, value, index=0):
if index == 0 and not any(info["is_object_list"] for info in keys_info):
if value in ["ANY_MATCH", "NONE_MATCH"]:
value = {"$exists": value == "ANY_MATCH"}
return {".".join(info["key"] for info in keys_info): value}

info = keys_info[index]

if info["is_object_list"]:
nested_matcher = self.__build_nested_matcher(
object_lists, keys_info, value, index + 1
)
elem_match = {
"$elemMatch": {
object_lists[info["key"]]: info["object_key"],
keys_info[index + 1]["key"]: nested_matcher,
}
}
if value in ["ANY_MATCH", "NONE_MATCH"]:
del elem_match["$elemMatch"][keys_info[index + 1]["key"]]
if value == "NONE_MATCH":
return {"NOR_MATCHER": {info["key"]: {"$all": [elem_match]}}}
return elem_match if index > 0 else {info["key"]: {"$all": [elem_match]}}

return value

def __merge_object_lists(self, source, target, key):
for target_item in target:
for source_item in source:
if source_item[key] == target_item[key]:
source.remove(source_item)
return [*source, *target]
102 changes: 102 additions & 0 deletions src/elody/object_configurations/elody_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from datetime import datetime, timezone
from elody.object_configurations.base_object_configuration import (
BaseObjectConfiguration,
)
from elody.util import flatten_dict
from uuid import uuid4


class ElodyObjectConfiguration(BaseObjectConfiguration):
SCHEMA_TYPE = "elody"
SCHEMA_VERSION = 1

def crud(self):
crud = {
"collection": "entities",
"collection_history": "history",
"creator": lambda post_body, **kwargs: self._creator(post_body, **kwargs),
"post_crud_hook": lambda **kwargs: self._post_crud_hook(**kwargs),
"pre_crud_hook": lambda **kwargs: self._pre_crud_hook(**kwargs),
}
return {**super().crud(), **crud}

def document_info(self):
return {"object_lists": {"metadata": "key", "relations": "type"}}

def logging(self, flat_document, **kwargs):
return super().logging(flat_document, **kwargs)

def migration(self):
return super().migration()

def serialization(self, from_format, to_format):
return super().serialization(from_format, to_format)

def validation(self):
return super().validation()

def _creator(
self,
post_body,
*,
get_user_context,
flat_post_body={},
document_defaults={},
):
if not flat_post_body:
flat_post_body = flatten_dict(
self.document_info()["object_lists"], post_body
)
_id = document_defaults.get("_id", str(uuid4()))

identifiers = []
for property in self.document_info().get("identifier_properties", []):
if identifier := flat_post_body.get(f"metadata.{property}.value"):
identifiers.append(identifier)

template = {
"_id": _id,
"computed_values": {
"created_at": datetime.now(timezone.utc),
"event": "create",
},
"identifiers": list(
set([_id, *identifiers, *document_defaults.pop("identifiers", [])])
),
"metadata": [],
"relations": [],
"schema": {"type": self.SCHEMA_TYPE, "version": self.SCHEMA_VERSION},
}
if email := self.__get_email(get_user_context):
template["computed_values"]["created_by"] = email

for key in self.document_info()["object_lists"].keys():
post_body = self._get_merged_post_body(post_body, document_defaults, key)
document = {**template, **document_defaults, **post_body}

self._sanitize_document(document, "metadata", "value")
self._sort_document_keys(document)
return document

def _post_crud_hook(self, **_):
pass

def _pre_crud_hook(self, *, crud, document={}, get_user_context=None, **_):
if document:
self._sanitize_document(document, "metadata", "value")
self.__patch_document_computed_values(
crud, document, get_user_context=get_user_context
)
self._sort_document_keys(document)

def __get_email(self, get_user_context):
try:
return get_user_context().email
except Exception:
return None

def __patch_document_computed_values(self, crud, document, **kwargs):
document["computed_values"].update({"event": crud})
document["computed_values"].update({"modified_at": datetime.now(timezone.utc)})
if email := self.__get_email(kwargs.get("get_user_context")):
document["computed_values"].update({"modified_by": email})
Loading

0 comments on commit 99f60f5

Please sign in to comment.