-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
511 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from django.http import Http404 | ||
from rest_framework import routers | ||
from rest_framework.viewsets import ReadOnlyModelViewSet | ||
from apis_core.apis_history.serializers import HistoryLogSerializer | ||
from django.contrib.contenttypes.models import ContentType | ||
from drf_spectacular.openapi import OpenApiParameter, OpenApiTypes | ||
from drf_spectacular.utils import extend_schema | ||
from rest_framework import viewsets, mixins | ||
from apis_core.apis_history.utils import get_history | ||
|
||
|
||
class GenericHistoryLogs(viewsets.GenericViewSet, mixins.ListModelMixin): | ||
serializer_class = HistoryLogSerializer | ||
|
||
def get_queryset(self): | ||
id = self.request.query_params.get("id", None) | ||
model = self.request.query_params.get("entity_type", None) | ||
if id is None: | ||
raise Http404("No id provided") | ||
if model is None: | ||
raise Http404("No model provided") | ||
return get_history(model, id) | ||
|
||
@extend_schema( | ||
parameters=[ | ||
OpenApiParameter("id", OpenApiTypes.INT, OpenApiParameter.QUERY), | ||
OpenApiParameter("entity_type", OpenApiTypes.STR, OpenApiParameter.QUERY), | ||
], | ||
) | ||
def list(self, request, *args, **kwargs): | ||
return super().list(request, *args, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from django.apps import AppConfig | ||
|
||
|
||
class HistoryConfig(AppConfig): | ||
name = "apis_core.apis_history" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
from collections.abc import Iterable | ||
from typing import Any | ||
import pytz | ||
from simple_history.models import HistoricalRecords | ||
from django.core.exceptions import AppRegistryNotReady | ||
import inspect | ||
import django | ||
from django.db import models | ||
from datetime import datetime | ||
from simple_history import utils | ||
from django.db.models import Q | ||
|
||
# from apis_core.apis_history.mixins import TempTripleHistoryMixin | ||
# from apis_core.apis_relations.models import TempTriple | ||
|
||
|
||
class APISHistoricalRecords(HistoricalRecords): | ||
def get_m2m_fields_from_model(self, model): | ||
# Change the original simple history function to also return m2m fields | ||
m2m_fields = [] | ||
try: | ||
for field in inspect.getmembers(model): | ||
if isinstance( | ||
field[1], | ||
django.db.models.fields.related_descriptors.ManyToManyDescriptor, | ||
): | ||
m2m_fields.append(getattr(model, field[0]).field) | ||
except AppRegistryNotReady: | ||
pass | ||
return m2m_fields | ||
|
||
def get_prev_record(self): | ||
""" | ||
Get the previous history record for the instance. `None` if first. | ||
""" | ||
history = utils.get_history_manager_from_history(self) | ||
return ( | ||
history.filter(history_date__lt=self.history_date) | ||
.order_by("history_date") | ||
.first() | ||
) | ||
|
||
|
||
class APISHistoryTableBase(models.Model): | ||
version_tag = models.CharField(max_length=255, blank=True, null=True) | ||
|
||
class Meta: | ||
abstract = True | ||
|
||
def get_triples_for_version(self): | ||
from apis_core.apis_relations.models import TempTriple | ||
|
||
triples = TempTriple.history.as_of(self.history_date).filter( | ||
Q(subj=self.instance) | Q(obj=self.instance) | ||
) | ||
return triples | ||
|
||
|
||
class VersionMixin(models.Model): | ||
history = APISHistoricalRecords( | ||
inherit=True, | ||
bases=[ | ||
APISHistoryTableBase, | ||
], | ||
) | ||
__history_date = None | ||
|
||
@property | ||
def _history_date(self): | ||
return self.__history_date | ||
|
||
@_history_date.setter | ||
def _history_date(self, value): | ||
self.__history_date = value | ||
|
||
class Meta: | ||
abstract = True | ||
|
||
def save(self, *args, **kwargs) -> None: | ||
if self._history_date is None: | ||
self._history_date = datetime.now() | ||
return super().save(*args, **kwargs) | ||
|
||
def delete(self, *args, **kwargs) -> tuple[int, dict[str, int]]: | ||
if self._history_date is None: | ||
self._history_date = datetime.now() | ||
return super().delete(*args, **kwargs) | ||
|
||
def __init__(self, *args: Any, **kwargs: Any) -> None: | ||
super().__init__(*args, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
from rest_framework import serializers | ||
from rest_framework.fields import empty | ||
from django.core.exceptions import FieldDoesNotExist | ||
|
||
|
||
class ModelChangeSerializer(serializers.Serializer): | ||
field = serializers.CharField() | ||
old = serializers.SerializerMethodField(method_name="get_field_data_old") | ||
new = serializers.SerializerMethodField(method_name="get_field_data_new") | ||
|
||
def get_field_data(self, obj): | ||
try: | ||
self._parent_object._meta.get_field(obj["field"]) | ||
return obj["value"] | ||
except FieldDoesNotExist: | ||
for field in self._parent_object._history_m2m_fields: | ||
if field.attname == obj["field"]: | ||
sst = [ | ||
str(obj2) | ||
for obj2 in field.related_model.objects.filter( | ||
pk__in=[obj3[obj["field"]] for obj3 in obj["value"]] | ||
) | ||
] | ||
return " | ".join(sst) | ||
return obj | ||
|
||
def get_field_data_new(self, obj): | ||
repr = self.get_field_data({"field": obj.field, "value": obj.new}) | ||
return repr | ||
|
||
def get_field_data_old(self, obj): | ||
repr = self.get_field_data({"field": obj.field, "value": obj.old}) | ||
return repr | ||
|
||
def __init__(self, instance=None, parent_object=None, **kwargs): | ||
self._parent_object = parent_object | ||
super().__init__(instance, **kwargs) | ||
|
||
|
||
class HistoryLogSerializer(serializers.Serializer): | ||
diff = serializers.SerializerMethodField() | ||
timestamp = serializers.DateTimeField(source="history_date") | ||
version_tag = serializers.CharField() | ||
user = serializers.CharField(source="history_user") | ||
|
||
def get_diff(self, obj): | ||
if obj.prev_record is None: | ||
return [] | ||
diff = obj.diff_against(obj.prev_record) | ||
changed_fields = [] | ||
changes = [] | ||
for change in diff.changes: | ||
if change.new == "" and change.old == None: | ||
continue | ||
changed_fields.append(change.field) | ||
changes.append(ModelChangeSerializer(change, obj).data) | ||
return {"changed_fields": changed_fields, "changes": changes} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from apis_core.utils.caching import get_entity_class_of_name | ||
from django.db.models.query import QuerySet | ||
|
||
|
||
def get_history(model_class: str, id: int) -> QuerySet: | ||
"""Get history of a model instance. | ||
Args: | ||
model_class (str): Model class name. | ||
id (int): Model instance id. | ||
Returns: | ||
QuerySet: Queryset of history of a model instance. | ||
""" | ||
cls = get_entity_class_of_name(model_class) | ||
exclude = [] | ||
qs = cls.history.filter(id=id) | ||
for q in qs: | ||
id1 = q.history_id | ||
ts = q.history_date | ||
exclude.extend( | ||
cls.history.filter(history_id__lt=id1, history_date=ts).values_list( | ||
"history_id", flat=True | ||
) | ||
) | ||
return qs.exclude(history_id__in=exclude) |
Oops, something went wrong.