Skip to content

Commit

Permalink
Merge branch 'master' into fix/grouper-selection
Browse files Browse the repository at this point in the history
  • Loading branch information
fsbraun authored Jan 15, 2025
2 parents 16727e0 + 59e06b9 commit e0bd858
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 59 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
python-version: '3.11'
cache: 'pip'
- name: Cache dependencies
uses: actions/cache@v4.1.2
uses: actions/cache@v4.2.0
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('docs/requirements.txt') }}
Expand All @@ -44,7 +44,7 @@ jobs:
python-version: '3.11'
cache: 'pip'
- name: Cache dependencies
uses: actions/cache@v4.1.2
uses: actions/cache@v4.2.0
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('docs/requirements.txt') }}
Expand Down
4 changes: 2 additions & 2 deletions djangocms_versioning/cms_toolbars.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def get_page_content(self, language: Optional[str] = None) -> PageContent:
return toolbar_obj
else:
# Get it from the DB
return get_latest_admin_viewable_content(self.page, language=language)
return get_latest_admin_viewable_content(self.page, language=language, include_unpublished_archived=True)

def populate(self):
self.page = self.request.current_page
Expand All @@ -335,7 +335,7 @@ def override_language_menu(self):

for code, name in get_language_tuple(self.current_site.pk):
# Get the page content, it could be draft too!
page_content = self.get_page_content(language=code)
page_content = self.page.get_admin_content(language=code)
if page_content:
url = get_object_preview_url(page_content, code)
language_menu.add_link_item(name, url=url, active=self.current_lang == code)
Expand Down
114 changes: 68 additions & 46 deletions djangocms_versioning/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def is_editable(content_obj, request):
"""Check of content_obj is editable"""
from .models import Version

return Version.objects.get_for_content(content_obj).check_modify.as_bool(request.user)
return Version.objects.get_for_content(content_obj).check_modify.as_bool(
request.user
)


def versioning_admin_factory(admin_class, mixin):
Expand Down Expand Up @@ -98,7 +100,7 @@ def register_versionadmin_proxy(versionable, admin_site=None):
warnings.warn(
f"{versionable.version_model_proxy!r} is already registered with admin.",
UserWarning,
stacklevel=2
stacklevel=2,
)
return

Expand Down Expand Up @@ -136,7 +138,9 @@ def manager_factory(manager, prefix, mixin):
def replace_manager(model, manager, mixin, **kwargs):
if hasattr(model, manager) and isinstance(getattr(model, manager), mixin):
return
original_manager = getattr(model, manager).__class__ if hasattr(model, manager) else models.Manager
original_manager = (
getattr(model, manager).__class__ if hasattr(model, manager) else models.Manager
)
manager_object = manager_factory(original_manager, "Versioned", mixin)()
for key, value in kwargs.items():
setattr(manager_object, key, value)
Expand All @@ -146,15 +150,19 @@ def replace_manager(model, manager, mixin, **kwargs):
model.add_to_class(manager, manager_object)
if manager == "objects":
# only safe the original default manager
model.add_to_class(f'_original_{"manager" if manager == "objects" else manager}', original_manager())
model.add_to_class(
f'_original_{"manager" if manager == "objects" else manager}',
original_manager(),
)


def inject_generic_relation_to_version(model):
from .models import Version

related_query_name = f"{model._meta.app_label}_{model._meta.model_name}"
model.add_to_class("versions", GenericRelation(
Version, related_query_name=related_query_name))
model.add_to_class(
"versions", GenericRelation(Version, related_query_name=related_query_name)
)
if not hasattr(model, "is_editable"):
model.add_to_class("is_editable", is_editable)

Expand Down Expand Up @@ -187,18 +195,18 @@ def nonversioned_manager(model):
def _version_list_url(versionable, **params):
proxy = versionable.version_model_proxy
return add_url_parameters(
admin_reverse(
f"{proxy._meta.app_label}_{proxy._meta.model_name}_changelist"
),
**params
admin_reverse(f"{proxy._meta.app_label}_{proxy._meta.model_name}_changelist"),
**params,
)


def version_list_url(content):
"""Returns a URL to list of content model versions,
filtered by `content`'s grouper
"""
versionable = versionables._cms_extension().versionables_by_content[content.__class__]
versionable = versionables._cms_extension().versionables_by_content[
content.__class__
]
return _version_list_url(
versionable, **versionable.grouping_values(content, relation_suffix=False)
)
Expand All @@ -208,7 +216,9 @@ def version_list_url_for_grouper(grouper):
"""Returns a URL to list of content model versions,
filtered by `grouper`
"""
versionable = versionables._cms_extension().versionables_by_grouper[grouper.__class__]
versionable = versionables._cms_extension().versionables_by_grouper[
grouper.__class__
]
return _version_list_url(
versionable, **{versionable.grouper_field_name: str(grouper.pk)}
)
Expand All @@ -235,7 +245,7 @@ def is_content_editable(placeholder, user):

def get_editable_url(content_obj, force_admin=False):
"""If the object is editable the cms editable view should be used, with the toolbar.
This method provides the URL for it.
This method provides the URL for it.
"""
if is_editable_model(content_obj.__class__) and not force_admin:
language = getattr(content_obj, "language", None)
Expand Down Expand Up @@ -264,10 +274,12 @@ def get_content_types_with_subclasses(models, using=None):
return content_types


def get_preview_url(content_obj: models.Model, language: typing.Union[str, None] = None) -> str:
def get_preview_url(
content_obj: models.Model, language: typing.Union[str, None] = None
) -> str:
"""If the object is editable the cms preview view should be used, with the toolbar.
This method provides the URL for it. It falls back the standard change view
should the object not be frontend editable.
This method provides the URL for it. It falls back the standard change view
should the object not be frontend editable.
"""
versionable = versionables.for_content(content_obj)
if versionable.preview_url:
Expand Down Expand Up @@ -300,7 +312,9 @@ def remove_published_where(queryset):
that are published are returned. If you need to return the full queryset
use the "admin_manager" instead of "objects"
"""
raise NotImplementedError("remove_published_where has been replaced by ContentObj.admin_manager")
raise NotImplementedError(
"remove_published_where has been replaced by ContentObj.admin_manager"
)


def get_latest_admin_viewable_content(
Expand All @@ -314,9 +328,15 @@ def get_latest_admin_viewable_content(
versionable = versionables.for_grouper(grouper)

# Check if all required grouping fields are given to be able to select the latest admin viewable content
missing_fields = [field for field in versionable.extra_grouping_fields if field not in extra_grouping_fields]
missing_fields = [
field
for field in versionable.extra_grouping_fields
if field not in extra_grouping_fields
]
if missing_fields:
raise ValueError(f"Grouping field(s) {missing_fields} required for {versionable.grouper_model}.")
raise ValueError(
f"Grouping field(s) {missing_fields} required for {versionable.grouper_model}."
)

# Get the name of the content_set (e.g., "pagecontent_set") from the versionable
content_set = versionable.grouper_field.remote_field.get_accessor_name()
Expand All @@ -331,10 +351,15 @@ def get_latest_admin_viewable_content(
return qs.filter(**extra_grouping_fields).current_content().first()


def get_latest_admin_viewable_page_content(page: Page, language: str) -> PageContent: # pragma: no cover
warnings.warn("get_latst_admin_viewable_page_content has ben deprecated. "
"Use get_latest_admin_viewable_content(page, language=language) instead.",
DeprecationWarning, stacklevel=2)
def get_latest_admin_viewable_page_content(
page: Page, language: str
) -> PageContent: # pragma: no cover
warnings.warn(
"get_latst_admin_viewable_page_content has ben deprecated. "
"Use get_latest_admin_viewable_content(page, language=language) instead.",
DeprecationWarning,
stacklevel=2,
)
return get_latest_admin_viewable_content(page, language=language)


Expand Down Expand Up @@ -378,14 +403,14 @@ def version_is_locked(version) -> settings.AUTH_USER_MODEL:


def version_is_unlocked_for_user(version, user: settings.AUTH_USER_MODEL) -> bool:
"""Check if lock doesn't exist for a version object or is locked to provided user.
"""
"""Check if lock doesn't exist for a version object or is locked to provided user."""
return version.locked_by is None or version.locked_by == user


def content_is_unlocked_for_user(content: models.Model, user: settings.AUTH_USER_MODEL) -> bool:
"""Check if lock doesn't exist or object is locked to provided user.
"""
def content_is_unlocked_for_user(
content: models.Model, user: settings.AUTH_USER_MODEL
) -> bool:
"""Check if lock doesn't exist or object is locked to provided user."""
try:
if hasattr(content, "prefetched_versions"):
version = content.prefetched_versions[0]
Expand All @@ -396,7 +421,9 @@ def content_is_unlocked_for_user(content: models.Model, user: settings.AUTH_USER
return True


def placeholder_content_is_unlocked_for_user(placeholder: Placeholder, user: settings.AUTH_USER_MODEL) -> bool:
def placeholder_content_is_unlocked_for_user(
placeholder: Placeholder, user: settings.AUTH_USER_MODEL
) -> bool:
"""Check if lock doesn't exist or placeholder source object
is locked to provided user.
"""
Expand All @@ -405,10 +432,7 @@ def placeholder_content_is_unlocked_for_user(placeholder: Placeholder, user: set


def send_email(
recipients: list,
subject: str,
template: str,
template_context: dict
recipients: list, subject: str, template: str, template_context: dict
) -> int:
"""
Send emails using locking templates
Expand All @@ -423,22 +447,20 @@ def send_email(
from_email=settings.DEFAULT_FROM_EMAIL,
to=recipients,
)
return message.send(
fail_silently=EMAIL_NOTIFICATIONS_FAIL_SILENTLY
)
return message.send(fail_silently=EMAIL_NOTIFICATIONS_FAIL_SILENTLY)


def get_latest_draft_version(version):
def get_latest_draft_version(version: models.Model) -> models.Model:
"""Get latest draft version of version object and caches it in the
content object"""
from djangocms_versioning.constants import DRAFT
from djangocms_versioning.models import Version

if not hasattr(version.content, "_latest_draft_version"):
drafts = (
Version.objects
.filter_by_content_grouping_values(version.content)
.filter(state=DRAFT)
)
from .models import Version

if (
not hasattr(version.content, "_latest_draft_version")
or getattr(version.content._latest_draft_version, "state", DRAFT) != DRAFT
):
drafts = Version.objects.filter_by_content_grouping_values(
version.content
).filter(state=DRAFT)
version.content._latest_draft_version = drafts.first()
return version.content._latest_draft_version
36 changes: 27 additions & 9 deletions djangocms_versioning/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,25 @@ def with_user(self, user):


class AdminQuerySetMixin:
# Annotation for latest pk of draft or published version
_DraftOrPublished = models.Max(
models.Case(
models.When(versions__state__in=(constants.DRAFT, constants.PUBLISHED),
then="versions__pk"),
default=models.Value(0),
)
)

# Annotation for latest pk of any other version
_AnyOther = models.Max(
models.Case(
models.When(
~models.Q(versions__state__in=(constants.DRAFT, constants.PUBLISHED)),
then="versions__pk"),
default=models.Value(0),
)
)

def _chain(self):
# Also clone group by key when chaining querysets!
clone = super()._chain()
Expand All @@ -65,6 +84,7 @@ def current_content(self, **kwargs):
versions or published versions (in that order). This optimized query assumes that
draft versions always have a higher pk than any other version type. This is true as long as
no other version type can be converted to draft without creating a new version."""

pk_filter = self.filter(versions__state__in=(constants.DRAFT, constants.PUBLISHED))\
.values(*self._group_by_key)\
.annotate(vers_pk=models.Max("versions__pk"))\
Expand All @@ -80,15 +100,13 @@ def latest_content(self, **kwargs):
This filter assumes that there can only be one draft created and that the draft as
the highest pk of all versions (should it exist).
"""
current = self.filter(versions__state__in=(constants.DRAFT, constants.PUBLISHED))\
.values(*self._group_by_key)\
.annotate(vers_pk=models.Max("versions__pk"))
pk_current = current.values("vers_pk")
pk_other = self.exclude(**{key + "__in": current.values(key) for key in self._group_by_key})\
.values(*self._group_by_key)\
.annotate(vers_pk=models.Max("versions__pk"))\
.values("vers_pk")
return self.filter(versions__pk__in=pk_current | pk_other, **kwargs)

latest = (self.values(*self._group_by_key)
.annotate(h1=self._DraftOrPublished, h2=self._AnyOther)
.annotate(vers_pk=models.Case(models.When(h1__gt=0, then="h1"), default="h2"))
.values("vers_pk")
)
return self.filter(versions__pk__in=latest, **kwargs)


class AdminManagerMixin:
Expand Down
52 changes: 52 additions & 0 deletions tests/test_managers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from cms.test_utils.testcases import CMSTestCase

from djangocms_versioning import constants
from djangocms_versioning.test_utils import factories
from djangocms_versioning.test_utils.polls.models import PollContent


class TestLatestContentCurrentContent(CMSTestCase):
def setUp(self):
poll1 = factories.PollFactory()
factories.PollVersionFactory(state=constants.PUBLISHED, content__language="de")

factories.PollVersionFactory(state=constants.ARCHIVED, content__poll=poll1, content__language="de")
v1 = factories.PollVersionFactory(state=constants.UNPUBLISHED, content__poll=poll1, content__language="de")
v2 = factories.PollVersionFactory(state=constants.ARCHIVED, content__poll=poll1, content__language="en")
v3 = factories.PollVersionFactory(state=constants.DRAFT, content__poll=poll1, content__language="en")
v4 = factories.PollVersionFactory(state=constants.UNPUBLISHED, content__poll=poll1, content__language="fr")

self.poll = poll1
self.poll_content1 = v1.content
self.poll_content2 = v2.content
self.poll_content3 = v3.content
self.poll_content4 = v4.content

def test_latest_content(self):
"""only one version per grouper and grouping field (language) returned."""
latest_content = PollContent.admin_manager.latest_content(poll=self.poll)
self.assertEqual(latest_content.count(), 3)
self.assertIn(self.poll_content1, latest_content)
self.assertIn(self.poll_content3, latest_content)
self.assertIn(self.poll_content4, latest_content)

def test_latest_content_by_language(self):
"""only one version per grouper and grouping field (language) returned. Additional
filter before or after latest_content() should **not** affect the result."""

latest_content = PollContent.admin_manager.latest_content().filter(poll=self.poll, language="en")
self.assertEqual(latest_content.count(), 1)
self.assertIn(self.poll_content3, latest_content)

latest_content = PollContent.admin_manager.filter(poll=self.poll, language="en").latest_content()
self.assertEqual(latest_content.count(), 1)
self.assertIn(self.poll_content3, latest_content)

latest_content = PollContent.admin_manager.latest_content().filter(poll=self.poll, language="de")
self.assertEqual(latest_content.count(), 1)
self.assertIn(self.poll_content1, latest_content)

latest_content = PollContent.admin_manager.filter(poll=self.poll, language="de").latest_content()
self.assertEqual(latest_content.count(), 1)
self.assertIn(self.poll_content1, latest_content)

0 comments on commit e0bd858

Please sign in to comment.