diff --git a/openedx_learning/__init__.py b/openedx_learning/__init__.py index aeffa12d..13243d24 100644 --- a/openedx_learning/__init__.py +++ b/openedx_learning/__init__.py @@ -1,4 +1,4 @@ """ Open edX Learning ("Learning Core"). """ -__version__ = "0.4.3" +__version__ = "0.4.4" diff --git a/openedx_tagging/core/tagging/rest_api/paginators.py b/openedx_tagging/core/tagging/rest_api/paginators.py index 2ebce12d..5831e8b0 100644 --- a/openedx_tagging/core/tagging/rest_api/paginators.py +++ b/openedx_tagging/core/tagging/rest_api/paginators.py @@ -1,14 +1,59 @@ """ Paginators uses by the REST API """ +from typing import Type from edx_rest_framework_extensions.paginators import DefaultPagination # type: ignore[import] +from rest_framework.request import Request +from rest_framework.response import Response + +from openedx_tagging.core.tagging.models import Tag, Taxonomy + +from .utils import UserPermissionsHelper # From this point, the tags begin to be paginated MAX_FULL_DEPTH_THRESHOLD = 10_000 -class TagsPagination(DefaultPagination): +class CanAddPermissionMixin(UserPermissionsHelper): # pylint: disable=abstract-method + """ + This mixin inserts a boolean "can_add_" field at the top level of the paginated response. + + The value of the field indicates whether request user may create new instances of the current model. + """ + @property + def _request(self) -> Request: + """ + Returns the current request. + """ + return self.request # type: ignore[attr-defined] + + def get_paginated_response(self, data) -> Response: + """ + Injects the user's model-level permissions into the paginated response. + """ + response_data = super().get_paginated_response(data).data # type: ignore[misc] + field_name = f"can_add_{self.model_name}" + response_data[field_name] = self.get_can_add() + return Response(response_data) + + +class TaxonomyPagination(CanAddPermissionMixin, DefaultPagination): + """ + Inserts permissions data for Taxonomies into the top level of the paginated response. + """ + page_size = 500 + max_page_size = 500 + + @property + def _model(self) -> Type: + """ + Returns the model that is being paginated. + """ + return Taxonomy + + +class TagsPagination(CanAddPermissionMixin, DefaultPagination): """ Custom pagination configuration for taxonomies with a large number of tags. Used on the get tags API view. @@ -16,8 +61,15 @@ class TagsPagination(DefaultPagination): page_size = 10 max_page_size = 300 + @property + def _model(self) -> Type: + """ + Returns the model that is being paginated. + """ + return Tag + -class DisabledTagsPagination(DefaultPagination): +class DisabledTagsPagination(CanAddPermissionMixin, DefaultPagination): """ Custom pagination configuration for taxonomies with a small number of tags. Used on the get tags API view @@ -28,3 +80,10 @@ class DisabledTagsPagination(DefaultPagination): """ page_size = MAX_FULL_DEPTH_THRESHOLD max_page_size = MAX_FULL_DEPTH_THRESHOLD + 1 + + @property + def _model(self) -> Type: + """ + Returns the model that is being paginated. + """ + return Tag diff --git a/openedx_tagging/core/tagging/rest_api/utils.py b/openedx_tagging/core/tagging/rest_api/utils.py new file mode 100644 index 00000000..ed68de50 --- /dev/null +++ b/openedx_tagging/core/tagging/rest_api/utils.py @@ -0,0 +1,109 @@ +""" +Utilities for the API +""" +from typing import Optional, Type + +from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication # type: ignore[import] +from edx_rest_framework_extensions.auth.session.authentication import ( # type: ignore[import] + SessionAuthenticationAllowInactiveUser, +) +from rest_framework.request import Request + + +def view_auth_classes(func_or_class): + """ + Function and class decorator that abstracts the authentication classes for api views. + """ + def _decorator(func_or_class): + """ + Requires either OAuth2 or Session-based authentication; + are the same authentication classes used on edx-platform + """ + func_or_class.authentication_classes = ( + JwtAuthentication, + SessionAuthenticationAllowInactiveUser, + ) + return func_or_class + return _decorator(func_or_class) + + +class UserPermissionsHelper: + """ + Provides helper methods for serializing user permissions. + """ + @property + def _request(self) -> Request: + """ + Returns the current request. + """ + raise NotImplementedError # pragma: no cover + + @property + def _model(self) -> Type: + """ + Returns the model used when checking permissions. + """ + raise NotImplementedError # pragma: no cover + + @property + def app_label(self) -> Type: + """ + Returns the app_label for the model used when checking permissions. + """ + return self._model._meta.app_label + + @property + def model_name(self) -> Type: + """ + Returns the name of the model used when checking permissions. + """ + return self._model._meta.model_name + + def _get_permission_name(self, action: str) -> str: + """ + Returns the fully-qualified permission name corresponding to the current model and `action`. + """ + assert action in ("add", "view", "change", "delete") + return f'{self.app_label}.{action}_{self.model_name}' + + def _can(self, perm_name: str, instance=None) -> Optional[bool]: + """ + Does the current `request.user` have the given `perm` on the `instance` object? + + Returns None if no permissions were requested. + Returns True if they may. + Returns False if they may not. + """ + request = self._request + assert request and request.user + return request.user.has_perm(perm_name, instance) + + def get_can_add(self, _instance=None) -> Optional[bool]: + """ + Returns True if the current user is allowed to add new instances. + + Note: we omit the actual instance from the permissions check; most tagging models prefer this. + """ + perm_name = self._get_permission_name('add') + return self._can(perm_name) + + def get_can_view(self, instance) -> Optional[bool]: + """ + Returns True if the current user is allowed to view/see this instance. + """ + perm_name = self._get_permission_name('view') + return self._can(perm_name, instance) + + def get_can_change(self, instance) -> Optional[bool]: + """ + Returns True if the current user is allowed to edit/change this instance. + """ + perm_name = self._get_permission_name('change') + return self._can(perm_name, instance) + + def get_can_delete(self, instance) -> Optional[bool]: + """ + Returns True if the current user is allowed to delete this instance. + """ + perm_name = self._get_permission_name('change') + return self._can(perm_name, instance) diff --git a/openedx_tagging/core/tagging/rest_api/v1/serializers.py b/openedx_tagging/core/tagging/rest_api/v1/serializers.py index a875c98b..a3af0bbb 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/serializers.py +++ b/openedx_tagging/core/tagging/rest_api/v1/serializers.py @@ -3,15 +3,19 @@ """ from __future__ import annotations -from typing import Any +from typing import Any, Type from urllib.parse import urlencode from rest_framework import serializers +from rest_framework.request import Request from rest_framework.reverse import reverse from openedx_tagging.core.tagging.data import TagData from openedx_tagging.core.tagging.import_export.parsers import ParserFormat from openedx_tagging.core.tagging.models import ObjectTag, Tag, TagImportTask, Taxonomy +from openedx_tagging.core.tagging.rules import ObjectTagPermissionItem + +from ..utils import UserPermissionsHelper class TaxonomyListQueryParamsSerializer(serializers.Serializer): # pylint: disable=abstract-method @@ -30,11 +34,44 @@ class TaxonomyExportQueryParamsSerializer(serializers.Serializer): # pylint: di output_format = serializers.RegexField(r"(?i)^(json|csv)$", allow_blank=False) -class TaxonomySerializer(serializers.ModelSerializer): +class UserPermissionsSerializerMixin(UserPermissionsHelper): + """ + Provides methods for serializing user permissions. + + To use this mixin: + + 1. Add it to your serializer's list of subclasses + 2. Add `can_` fields for each permission/action you want to serialize. + + and this mixin will take care of the rest. + + Notes: + * Assumes the serialized model should be used to check permissions (override _model to change). + * Requires the current request to be passed into the serializer context (override _request to change). + """ + @property + def _model(self) -> Type: + """ + Returns the model that is being serialized + """ + return self.Meta.model # type: ignore[attr-defined] + + @property + def _request(self) -> Request: + """ + Returns the current request from the serialize context. + """ + return self.context.get('request') # type: ignore[attr-defined] + + +class TaxonomySerializer(UserPermissionsSerializerMixin, serializers.ModelSerializer): """ Serializer for the Taxonomy model. """ tags_count = serializers.SerializerMethodField() + can_change_taxonomy = serializers.SerializerMethodField(method_name='get_can_change') + can_delete_taxonomy = serializers.SerializerMethodField(method_name='get_can_delete') + can_tag_object = serializers.SerializerMethodField() class Meta: model = Taxonomy @@ -48,6 +85,9 @@ class Meta: "system_defined", "visible_to_authors", "tags_count", + "can_change_taxonomy", + "can_delete_taxonomy", + "can_tag_object", ] def to_representation(self, instance): @@ -60,6 +100,16 @@ def to_representation(self, instance): def get_tags_count(self, instance): return instance.tag_set.count() + def get_can_tag_object(self, instance) -> bool | None: + """ + Returns True if the current request user may create object tags on this taxonomy. + + (The object_id test is necessarily skipped because we don't have an object_id to check.) + """ + perm_name = f'{self.app_label}.can_tag_object' + perm_object = ObjectTagPermissionItem(taxonomy=instance, object_id="") + return self._can(perm_name, perm_object) + class ObjectTagListQueryParamsSerializer(serializers.Serializer): # pylint: disable=abstract-method """ @@ -71,16 +121,17 @@ class ObjectTagListQueryParamsSerializer(serializers.Serializer): # pylint: dis ) -class ObjectTagMinimalSerializer(serializers.ModelSerializer): +class ObjectTagMinimalSerializer(UserPermissionsSerializerMixin, serializers.ModelSerializer): """ Minimal serializer for the ObjectTag model. """ class Meta: model = ObjectTag - fields = ["value", "lineage"] + fields = ["value", "lineage", "can_delete_objecttag"] lineage = serializers.ListField(child=serializers.CharField(), source="get_lineage", read_only=True) + can_delete_objecttag = serializers.SerializerMethodField(method_name='get_can_delete') class ObjectTagSerializer(ObjectTagMinimalSerializer): @@ -98,14 +149,18 @@ class Meta: ] -class ObjectTagsByTaxonomySerializer(serializers.ModelSerializer): +class ObjectTagsByTaxonomySerializer(UserPermissionsSerializerMixin, serializers.ModelSerializer): """ Serialize a group of ObjectTags, grouped by taxonomy """ + class Meta: + model = ObjectTag + def to_representation(self, instance: list[ObjectTag]) -> dict: """ Convert this list of ObjectTags to the serialized dictionary, grouped by Taxonomy """ + can_tag_object_perm = f"{self.app_label}.can_tag_object" by_object: dict[str, dict[str, Any]] = {} for obj_tag in instance: if obj_tag.object_id not in by_object: @@ -118,11 +173,11 @@ def to_representation(self, instance: list[ObjectTag]) -> dict: tax_entry = { "name": obj_tag.name, "taxonomy_id": obj_tag.taxonomy_id, - "editable": (not obj_tag.taxonomy.cast().system_defined) if obj_tag.taxonomy else False, + "can_tag_object": self._can(can_tag_object_perm, obj_tag), "tags": [] } taxonomies.append(tax_entry) - tax_entry["tags"].append(ObjectTagMinimalSerializer(obj_tag).data) + tax_entry["tags"].append(ObjectTagMinimalSerializer(obj_tag, context=self.context).data) return by_object @@ -144,7 +199,7 @@ class ObjectTagUpdateQueryParamsSerializer(serializers.Serializer): # pylint: d ) -class TagDataSerializer(serializers.Serializer): +class TagDataSerializer(UserPermissionsSerializerMixin, serializers.Serializer): # pylint: disable=abstract-method """ Serializer for TagData dicts. Also can serialize Tag instances. @@ -161,6 +216,8 @@ class TagDataSerializer(serializers.Serializer): _id = serializers.IntegerField(allow_null=True) sub_tags_url = serializers.SerializerMethodField() + can_change_tag = serializers.SerializerMethodField() + can_delete_tag = serializers.SerializerMethodField() def get_sub_tags_url(self, obj: TagData | Tag): """ @@ -184,6 +241,30 @@ def get_sub_tags_url(self, obj: TagData | Tag): return request.build_absolute_uri(url) return None + @property + def _model(self) -> Type: + """ + Returns the model used when checking permissions. + """ + return Tag + + def get_can_change_tag(self, _instance) -> bool | None: + """ + Returns True if the current user is allowed to edit/change this Tag instance. + + Because we're serializing TagData (not Tags), the view stores these permissions in the serializer + context. + """ + return self.context.get('can_change_tag') + + def get_can_delete_tag(self, _instance) -> bool | None: + """ + Returns True if the current user is allowed to delete this Tag instance. + + Because we're serializing TagData (not Tags), the view stores these permissions in the serializer + """ + return self.context.get('can_delete_tag') + def to_representation(self, instance: TagData | Tag) -> dict: """ Convert this TagData (or Tag model instance) to the serialized dictionary @@ -194,12 +275,6 @@ def to_representation(self, instance: TagData | Tag) -> dict: data["parent_value"] = instance.parent.value if instance.parent else None return data - def update(self, instance, validated_data): - raise RuntimeError('`update()` is not supported by the TagData serializer.') - - def create(self, validated_data): - raise RuntimeError('`create()` is not supported by the TagData serializer.') - class TaxonomyTagCreateBodySerializer(serializers.Serializer): # pylint: disable=abstract-method """ @@ -273,7 +348,7 @@ class Meta: ] -class TaxonomyImportPlanResponseSerializer(serializers.Serializer): +class TaxonomyImportPlanResponseSerializer(serializers.Serializer): # pylint: disable=abstract-method """ Serializer for the response of the Taxonomy Import Plan request """ @@ -290,9 +365,3 @@ def get_plan(self, obj): return plan.plan() return None - - def update(self, instance, validated_data): - raise RuntimeError('`update()` is not supported by the TagImportTask serializer.') - - def create(self, validated_data): - raise RuntimeError('`create()` is not supported by the TagImportTask serializer.') diff --git a/openedx_tagging/core/tagging/rest_api/v1/utils.py b/openedx_tagging/core/tagging/rest_api/v1/utils.py deleted file mode 100644 index 0667fb3b..00000000 --- a/openedx_tagging/core/tagging/rest_api/v1/utils.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Utilities for the API -""" -from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication # type: ignore[import] -from edx_rest_framework_extensions.auth.session.authentication import ( # type: ignore[import] - SessionAuthenticationAllowInactiveUser, -) - - -def view_auth_classes(func_or_class): - """ - Function and class decorator that abstracts the authentication classes for api views. - """ - def _decorator(func_or_class): - """ - Requires either OAuth2 or Session-based authentication; - are the same authentication classes used on edx-platform - """ - func_or_class.authentication_classes = ( - JwtAuthentication, - SessionAuthenticationAllowInactiveUser, - ) - return func_or_class - return _decorator(func_or_class) diff --git a/openedx_tagging/core/tagging/rest_api/v1/views.py b/openedx_tagging/core/tagging/rest_api/v1/views.py index 6d685d4e..3abf4f0d 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/views.py +++ b/openedx_tagging/core/tagging/rest_api/v1/views.py @@ -28,9 +28,10 @@ from ...data import TagDataQuerySet from ...import_export.api import export_tags, import_tags from ...import_export.parsers import ParserFormat -from ...models import Taxonomy +from ...models import Tag, Taxonomy from ...rules import ObjectTagPermissionItem -from ..paginators import MAX_FULL_DEPTH_THRESHOLD, DisabledTagsPagination, TagsPagination +from ..paginators import MAX_FULL_DEPTH_THRESHOLD, DisabledTagsPagination, TagsPagination, TaxonomyPagination +from ..utils import view_auth_classes from .permissions import ObjectTagObjectPermissions, TagObjectPermissions, TaxonomyObjectPermissions from .serializers import ( ObjectTagListQueryParamsSerializer, @@ -49,7 +50,6 @@ TaxonomyTagDeleteBodySerializer, TaxonomyTagUpdateBodySerializer, ) -from .utils import view_auth_classes @view_auth_classes @@ -210,6 +210,7 @@ class TaxonomyView(ModelViewSet): lookup_value_regex = r'-?\d+' serializer_class = TaxonomySerializer permission_classes = [TaxonomyObjectPermissions] + pagination_class = TaxonomyPagination def get_object(self) -> Taxonomy: """ @@ -365,8 +366,6 @@ class ObjectTagView( **Retrieve Parameters** * object_id (required): - The Object ID to retrieve ObjectTags for. - - **Retrieve Query Parameters** * taxonomy (optional) - PK of taxonomy to filter ObjectTags for. **Retrieve Example Requests** @@ -378,6 +377,27 @@ class ObjectTagView( * 400 - Invalid query parameter * 403 - Permission denied + Response: + { + ${object_id}: { + taxonomies: [ + { + taxonomy_id: str, + can_tag_object: bool, + tags: [ + { + value: str, + lineage: list[str], + can_delete_objecttag: bool, + }, + ] + }, + ... + ], + }, + ... + } + **Update Parameters** * object_id (required): - The Object ID to add ObjectTags for. @@ -444,7 +464,7 @@ def retrieve(self, request, *args, **kwargs) -> Response: behavior we want. """ object_tags = self.filter_queryset(self.get_queryset()) - serializer = ObjectTagsByTaxonomySerializer(list(object_tags)) + serializer = ObjectTagsByTaxonomySerializer(list(object_tags), context=self.get_serializer_context()) response_data = serializer.data if self.kwargs["object_id"] not in response_data: # For consistency, the key with the object_id should always be present in the response, even if there @@ -488,7 +508,7 @@ def update(self, request, *args, **kwargs) -> Response: taxonomy = query_params.validated_data.get("taxonomy", None) taxonomy = taxonomy.cast() - perm = "oel_tagging.change_objecttag" + perm = "oel_tagging.can_tag_object" object_id = kwargs.pop('object_id') perm_obj = ObjectTagPermissionItem( @@ -600,6 +620,24 @@ class TaxonomyTagsView(ListAPIView, RetrieveUpdateDestroyAPIView): * 403 - Permission denied * 404 - Taxonomy not found + Response: + + can_add_tag: bool, + results: [ + { + value: str, + external_id: str, + child_count: int, + depth: int, + parent_value: str, + usage_count: int, + _id: int, + sub_tags_url: str, + can_delete_tag: bool|null, + }, + ... + ], + **Create Query Parameters** * id (required) - The ID of the taxonomy to create a Tag for @@ -673,21 +711,45 @@ class TaxonomyTagsView(ListAPIView, RetrieveUpdateDestroyAPIView): pagination_class = TagsPagination serializer_class = TagDataSerializer - def get_taxonomy(self, pk: int) -> Taxonomy: + def __init__(self, *args, **kwargs): + """ + Initializes the local variables. + """ + super().__init__(*args, **kwargs) + self._taxonomy = None + + def get_taxonomy(self) -> Taxonomy: """ Get the taxonomy from `pk` or raise 404. + + The current taxonomy is cached in the view. """ - taxonomy = get_taxonomy(pk) - if not taxonomy: - raise Http404("Taxonomy not found") - self.check_object_permissions(self.request, taxonomy) - return taxonomy + if not self._taxonomy: + taxonomy_id = int(self.kwargs["pk"]) + taxonomy = get_taxonomy(taxonomy_id) + if not taxonomy: + raise Http404("Taxonomy not found") + self.check_object_permissions(self.request, taxonomy) + self._taxonomy = taxonomy + return self._taxonomy def get_serializer_context(self): + """ + Passes data from the view to the serializer. + """ + # Use our serializer to check permissions if requested -- requires a request in the context. context = super().get_serializer_context() + context['request'] = self.request + serializer = self.serializer_class(self, context=context) + + # Instead of checking permissions for each TagData instance, we just check them once for the whole taxonomy + # (since that's currently how our rules work). This might change if Tag-specific permissions are needed. + taxonomy = self.get_taxonomy() + dummy_tag = Tag(taxonomy=taxonomy) context.update({ - "request": self.request, - "taxonomy_id": int(self.kwargs["pk"]), + "taxonomy_id": taxonomy.id, + "can_change_tag": serializer.get_can_change(dummy_tag), + "can_delete_tag": serializer.get_can_delete(dummy_tag), }) return context @@ -695,8 +757,7 @@ def get_queryset(self) -> TagDataQuerySet: """ Builds and returns the queryset to be paginated. """ - taxonomy_id = int(self.kwargs.get("pk")) - taxonomy = self.get_taxonomy(taxonomy_id) + taxonomy = self.get_taxonomy() parent_tag_value = self.request.query_params.get("parent_tag", None) include_counts = "include_counts" in self.request.query_params search_term = self.request.query_params.get("search_term", None) @@ -739,8 +800,7 @@ def post(self, request, *args, **kwargs): """ Creates new Tag in Taxonomy and returns the newly created Tag. """ - pk = self.kwargs.get("pk") - taxonomy = self.get_taxonomy(pk) + taxonomy = self.get_taxonomy() body = TaxonomyTagCreateBodySerializer(data=request.data) body.is_valid(raise_exception=True) @@ -769,8 +829,7 @@ def update(self, request, *args, **kwargs): Updates a Tag that belongs to the Taxonomy and returns it. Currently only updating the Tag value is supported. """ - pk = self.kwargs.get("pk") - taxonomy = self.get_taxonomy(pk) + taxonomy = self.get_taxonomy() body = TaxonomyTagUpdateBodySerializer(data=request.data) body.is_valid(raise_exception=True) @@ -797,8 +856,7 @@ def delete(self, request, *args, **kwargs): the `with_subtags` is not set to `True` it will fail, otherwise the sub-tags will be deleted as well. """ - pk = self.kwargs.get("pk") - taxonomy = self.get_taxonomy(pk) + taxonomy = self.get_taxonomy() body = TaxonomyTagDeleteBodySerializer(data=request.data) body.is_valid(raise_exception=True) diff --git a/openedx_tagging/core/tagging/rules.py b/openedx_tagging/core/tagging/rules.py index 76139244..36f67ffe 100644 --- a/openedx_tagging/core/tagging/rules.py +++ b/openedx_tagging/core/tagging/rules.py @@ -186,6 +186,8 @@ def can_change_object_tag( rules.add_perm("oel_tagging.change_objecttag", can_change_object_tag) rules.add_perm("oel_tagging.delete_objecttag", can_change_object_tag) rules.add_perm("oel_tagging.view_objecttag", can_view_object_tag) +# If a user "can tag object", they can delete or create ObjectTags using the given Taxonomy + object_id. +rules.add_perm("oel_tagging.can_tag_object", can_change_object_tag) # Users can tag objects using tags from any taxonomy that they have permission to view rules.add_perm("oel_tagging.view_objecttag_objectid", can_view_object_tag_objectid) diff --git a/tests/openedx_tagging/core/tagging/test_views.py b/tests/openedx_tagging/core/tagging/test_views.py index 72352bcc..c2ccd320 100644 --- a/tests/openedx_tagging/core/tagging/test_views.py +++ b/tests/openedx_tagging/core/tagging/test_views.py @@ -53,6 +53,9 @@ def check_taxonomy( allow_free_text=False, system_defined=False, visible_to_authors=True, + can_change_taxonomy=None, + can_delete_taxonomy=None, + can_tag_object=None, ): """ Check taxonomy data @@ -65,6 +68,9 @@ def check_taxonomy( assert data["allow_free_text"] == allow_free_text assert data["system_defined"] == system_defined assert data["visible_to_authors"] == visible_to_authors + assert data["can_change_taxonomy"] == can_change_taxonomy + assert data["can_delete_taxonomy"] == can_delete_taxonomy + assert data["can_tag_object"] == can_tag_object class TestTaxonomyViewMixin(APITestCase): @@ -129,10 +135,10 @@ def test_list_taxonomy_queryparams(self, enabled, expected_status: int, expected @ddt.data( (None, status.HTTP_401_UNAUTHORIZED, 0), ("user", status.HTTP_200_OK, 10), - ("staff", status.HTTP_200_OK, 20), + ("staff", status.HTTP_200_OK, 20, True), ) @ddt.unpack - def test_list_taxonomy(self, user_attr: str | None, expected_status: int, tags_count: int): + def test_list_taxonomy(self, user_attr: str | None, expected_status: int, tags_count: int, is_admin=False): taxonomy = api.create_taxonomy(name="Taxonomy enabled 1", enabled=True) for i in range(tags_count): tag = Tag( @@ -150,8 +156,8 @@ def test_list_taxonomy(self, user_attr: str | None, expected_status: int, tags_c response = self.client.get(url) assert response.status_code == expected_status - # Check results - if tags_count: + # Check response + if status.is_success(expected_status): assert response.data["results"] == [ { "id": -1, @@ -163,6 +169,10 @@ def test_list_taxonomy(self, user_attr: str | None, expected_status: int, tags_c "system_defined": True, "visible_to_authors": True, "tags_count": 0, + # System taxonomy cannot be modified + "can_change_taxonomy": False, + "can_delete_taxonomy": False, + "can_tag_object": False, }, { "id": taxonomy.id, @@ -174,8 +184,15 @@ def test_list_taxonomy(self, user_attr: str | None, expected_status: int, tags_c "system_defined": False, "visible_to_authors": True, "tags_count": tags_count, + # Enabled taxonomy can be modified by taxonomy admins + "can_change_taxonomy": is_admin, + "can_delete_taxonomy": is_admin, + # can_tag_object is False because we default to not allowing users to tag arbitrary objects. + # But specific uses of this code (like content_tagging) will override this perm for their use cases. + "can_tag_object": False, }, ] + assert response.data.get("can_add_taxonomy") == is_admin def test_list_taxonomy_pagination(self) -> None: url = TAXONOMY_LIST_URL @@ -198,6 +215,55 @@ def test_list_taxonomy_pagination(self) -> None: next_page = parse_qs(parsed_url.query).get("page", [""])[0] assert next_page == "3" + @ddt.data( + ('user', False), + ('staff', True), + ) + @ddt.unpack + def test_list_taxonomy_empty(self, user_attr, expected_can_add) -> None: + # Delete the language taxonomy so we can get an empty list + language_taxonomy = api.get_taxonomy(LANGUAGE_TAXONOMY_ID) + if language_taxonomy: + language_taxonomy.delete() + + url = TAXONOMY_LIST_URL + user = getattr(self, user_attr) + self.client.force_authenticate(user=user) + response = self.client.get(url, format="json") + + assert response.status_code == status.HTTP_200_OK + assert response.data == { + "count": 0, + "current_page": 1, + "next": None, + "num_pages": 1, + "previous": None, + "start": 0, + "results": [], + "can_add_taxonomy": expected_can_add, + } + + def test_list_taxonomy_query_count(self): + """ + Test how many queries are used when retrieving taxonomies and permissions + """ + api.create_taxonomy(name="T1", enabled=True) + api.create_taxonomy(name="T2", enabled=True) + + url = TAXONOMY_LIST_URL + + self.client.force_authenticate(user=self.user) + with self.assertNumQueries(5): + response = self.client.get(url) + + assert response.status_code == 200 + assert not response.data["can_add_taxonomy"] + assert len(response.data["results"]) == 3 + for taxonomy in response.data["results"]: + assert not taxonomy["can_change_taxonomy"] + assert not taxonomy["can_delete_taxonomy"] + assert not taxonomy["can_tag_object"] + def test_list_invalid_page(self) -> None: url = TAXONOMY_LIST_URL @@ -225,6 +291,9 @@ def test_language_taxonomy(self): description="Languages that are enabled on this system.", allow_multiple=False, # We may change this in the future to allow multiple language tags system_defined=True, + can_change_taxonomy=False, + can_delete_taxonomy=False, + can_tag_object=False, ) @ddt.data( @@ -232,11 +301,13 @@ def test_language_taxonomy(self): (None, {"enabled": False}, status.HTTP_401_UNAUTHORIZED), ("user", {"enabled": True}, status.HTTP_200_OK), ("user", {"enabled": False}, status.HTTP_404_NOT_FOUND), - ("staff", {"enabled": True}, status.HTTP_200_OK), - ("staff", {"enabled": False}, status.HTTP_200_OK), + ("staff", {"enabled": True}, status.HTTP_200_OK, True), + ("staff", {"enabled": False}, status.HTTP_200_OK, True), ) @ddt.unpack - def test_detail_taxonomy(self, user_attr: str | None, taxonomy_data: dict[str, bool], expected_status: int): + def test_detail_taxonomy( + self, user_attr: str | None, taxonomy_data: dict[str, bool], expected_status: int, is_admin=False, + ): create_data = {"name": "taxonomy detail test", **taxonomy_data} taxonomy = api.create_taxonomy(**create_data) # type: ignore[arg-type] url = TAXONOMY_DETAIL_URL.format(pk=taxonomy.pk) @@ -249,7 +320,11 @@ def test_detail_taxonomy(self, user_attr: str | None, taxonomy_data: dict[str, b assert response.status_code == expected_status if status.is_success(expected_status): - check_taxonomy(response.data, taxonomy.pk, **create_data) + expected_data = create_data + expected_data["can_change_taxonomy"] = is_admin + expected_data["can_delete_taxonomy"] = is_admin + expected_data["can_tag_object"] = False + check_taxonomy(response.data, taxonomy.pk, **expected_data) def test_detail_system_taxonomy(self): url = TAXONOMY_DETAIL_URL.format(pk=LANGUAGE_TAXONOMY_ID) @@ -297,6 +372,9 @@ def test_create_taxonomy(self, user_attr: str | None, expected_status: int): # If we were able to create the taxonomy, check if it was created if status.is_success(expected_status): + create_data["can_change_taxonomy"] = True + create_data["can_delete_taxonomy"] = True + create_data["can_tag_object"] = False check_taxonomy(response.data, response.data["id"], **create_data) url = TAXONOMY_DETAIL_URL.format(pk=response.data["id"]) @@ -358,6 +436,9 @@ def test_update_taxonomy(self, user_attr, expected_status): "name": "new name", "description": "taxonomy description", "enabled": True, + "can_change_taxonomy": True, + "can_delete_taxonomy": True, + "can_tag_object": False, }, ) @@ -414,6 +495,9 @@ def test_patch_taxonomy(self, user_attr, expected_status): **{ "name": "new name", "enabled": True, + "can_change_taxonomy": True, + "can_delete_taxonomy": True, + "can_tag_object": False, }, ) @@ -580,7 +664,7 @@ def _change_object_permission(user, object_id: str) -> bool: """ For testing, let everyone have edit object permission on object_id "abc" and "limit_tag_count" """ - if object_id in ("abc", "limit_tag_count"): + if object_id in ("abc", "limit_tag_count", "problem7", "problem15", "html7"): return True return can_change_object_tag_objectid(user, object_id) @@ -640,28 +724,31 @@ def test_retrieve_object_tags(self, user_attr, expected_status): { "name": "Life on Earth", "taxonomy_id": 1, - "editable": True, + "can_tag_object": True, "tags": [ # Note: based on tree order (Animalia before Fungi), this tag comes first even though it # starts with "M" and Fungi starts with "F" { "value": "Mammalia", "lineage": ["Eukaryota", "Animalia", "Chordata", "Mammalia"], + "can_delete_objecttag": True, }, { "value": "Fungi", "lineage": ["Eukaryota", "Fungi"], + "can_delete_objecttag": True, }, ] }, { "name": "User Authors", "taxonomy_id": 3, - "editable": False, + "can_tag_object": True, "tags": [ { "value": "test_user_1", "lineage": ["test_user_1"], + "can_delete_objecttag": True, }, ], } @@ -669,7 +756,7 @@ def test_retrieve_object_tags(self, user_attr, expected_status): }, } - def prepare_for_sort_test(self) -> tuple[str, list[dict]]: + def prepare_for_sort_test(self, expected_perm=False) -> tuple[str, list[dict]]: """ Tag an object with tags from the "sort test" taxonomy """ @@ -691,6 +778,11 @@ def prepare_for_sort_test(self) -> tuple[str, list[dict]]: taxonomy = self.create_sort_test_taxonomy() api.tag_object(object_id=object_id, taxonomy=taxonomy, tags=sort_test_tags) + # Properties shared by all returned object tags + shared_props = { + "can_delete_objecttag": expected_perm, + } + # The result we expect to see when retrieving the object tags, after applying the list above. # Note: the full taxonomy looks like the following, so this is the order we # expect, although not all of these tags were included. @@ -713,15 +805,15 @@ def prepare_for_sort_test(self) -> tuple[str, list[dict]]: # ANVIL # azure sort_test_applied_result = [ - {"value": "1 A", "lineage": ["1", "1 A"]}, - {"value": "1111-grandchild", "lineage": ["1", "11111", "1111-grandchild"]}, - {"value": "111", "lineage": ["111"]}, - {"value": "11111111", "lineage": ["111", "11111111"]}, - {"value": "123", "lineage": ["111", "123"]}, - {"value": "abstract", "lineage": ["abstract"]}, - {"value": "azores islands", "lineage": ["abstract", "azores islands"]}, - {"value": "Android", "lineage": ["ALPHABET", "Android"]}, - {"value": "ANVIL", "lineage": ["ALPHABET", "ANVIL"]}, + {"value": "1 A", "lineage": ["1", "1 A"], **shared_props}, + {"value": "1111-grandchild", "lineage": ["1", "11111", "1111-grandchild"], **shared_props}, + {"value": "111", "lineage": ["111"], **shared_props}, + {"value": "11111111", "lineage": ["111", "11111111"], **shared_props}, + {"value": "123", "lineage": ["111", "123"], **shared_props}, + {"value": "abstract", "lineage": ["abstract"], **shared_props}, + {"value": "azores islands", "lineage": ["abstract", "azores islands"], **shared_props}, + {"value": "Android", "lineage": ["ALPHABET", "Android"], **shared_props}, + {"value": "ANVIL", "lineage": ["ALPHABET", "ANVIL"], **shared_props}, ] return object_id, sort_test_applied_result @@ -730,7 +822,7 @@ def test_retrieve_object_tags_sorted(self): Test the sort order of the object tags retrieved from the get object tags API. """ - object_id, sort_test_applied_result = self.prepare_for_sort_test() + object_id, sort_test_applied_result = self.prepare_for_sort_test(expected_perm=True) url = OBJECT_TAGS_RETRIEVE_URL.format(object_id=object_id) self.client.force_authenticate(user=self.user_1) @@ -741,16 +833,20 @@ def test_retrieve_object_tags_sorted(self): def test_retrieve_object_tags_query_count(self): """ - Test how many queries are used when retrieving object tags + Test how many queries are used when retrieving object tags and permissions """ - object_id, sort_test_applied_result = self.prepare_for_sort_test() - + expected_perm = True + object_id, sort_test_applied_result = self.prepare_for_sort_test(expected_perm) url = OBJECT_TAGS_RETRIEVE_URL.format(object_id=object_id) + self.client.force_authenticate(user=self.user_1) with self.assertNumQueries(1): response = self.client.get(url) - assert response.status_code == 200 - assert response.data[object_id]["taxonomies"][0]["tags"] == sort_test_applied_result + + assert response.status_code == 200 + assert len(response.data[object_id]["taxonomies"]) == 1 + assert response.data[object_id]["taxonomies"][0]["can_tag_object"] == expected_perm + assert response.data[object_id]["taxonomies"][0]["tags"] == sort_test_applied_result def test_retrieve_object_tags_unauthorized(self): """ @@ -797,11 +893,12 @@ def test_retrieve_object_tags_taxonomy_queryparam( { "name": "User Authors", "taxonomy_id": 3, - "editable": False, + "can_tag_object": True, "tags": [ { "value": "test_user_1", "lineage": ["test_user_1"], + "can_delete_objecttag": True, }, ], } @@ -1190,6 +1287,7 @@ def test_small_taxonomy_root(self): assert data.get("count") == root_count assert data.get("num_pages") == 1 assert data.get("current_page") == 1 + assert data.get("can_add_tag") def test_small_taxonomy(self): """ @@ -1320,6 +1418,23 @@ def test_small_search_shallow(self): " Euryarchaeida (children: 0)", ] + def test_small_query_count(self): + """ + Test how many queries are used when retrieving small taxonomies+tags and permissions + """ + url = f"{self.small_taxonomy_url}?search_term=eU" + + self.client.force_authenticate(user=self.staff) + with self.assertNumQueries(5): + response = self.client.get(url) + + assert response.status_code == status.HTTP_200_OK + assert response.data["can_add_tag"] + assert len(response.data["results"]) == 3 + for taxonomy in response.data["results"]: + assert taxonomy["can_change_tag"] + assert taxonomy["can_delete_tag"] + def test_empty_results(self): """ Test that various queries return an empty list @@ -1348,7 +1463,11 @@ def test_large_taxonomy(self): """ self._build_large_taxonomy() self.client.force_authenticate(user=self.staff) - response = self.client.get(self.large_taxonomy_url + "?include_counts") + + url = self.large_taxonomy_url + "?include_counts" + with self.assertNumQueries(3): + response = self.client.get(url) + assert response.status_code == status.HTTP_200_OK data = response.data @@ -1379,6 +1498,8 @@ def test_large_taxonomy(self): f"rest_api/v1/taxonomies/{self.large_taxonomy.id}" f"/tags/?parent_tag={quote_plus(results[0]['value'])}" ) + assert results[0].get("can_change_tag") + assert results[0].get("can_delete_tag") # Checking pagination values assert data.get("next") == ( @@ -1389,6 +1510,7 @@ def test_large_taxonomy(self): assert data.get("count") == self.root_tags_count assert data.get("num_pages") == 6 assert data.get("current_page") == 1 + assert data.get("can_add_tag") def test_next_page_large_taxonomy(self): self._build_large_taxonomy()