Skip to content

Commit

Permalink
feat: Update Taxonomy Tags permissions
Browse files Browse the repository at this point in the history
Utilize Taxonomy permissions to determine permissions of Taxonomy Tags
  • Loading branch information
yusuf-musleh committed Oct 16, 2023
1 parent ed0a7e2 commit f677f3f
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 44 deletions.
23 changes: 12 additions & 11 deletions openedx_tagging/core/tagging/rest_api/v1/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,21 @@ class ObjectTagObjectPermissions(DjangoObjectPermissions):
}


class TagListPermissions(DjangoObjectPermissions):
class TagObjectPermissions(DjangoObjectPermissions):
"""
Permissions for Tag object views.
Maps each REST API methods to its corresponding Tag permission.
"""
def has_permission(self, request, view):
"""
Returns True if the user on the given request is allowed the given view.
"""
if not request.user or (
not request.user.is_authenticated and self.authenticated_users_only
):
return False
return True
perms_map = {
"GET": ["%(app_label)s.view_%(model_name)s"],
"OPTIONS": [],
"HEAD": ["%(app_label)s.view_%(model_name)s"],
"POST": ["%(app_label)s.add_%(model_name)s"],
"PUT": ["%(app_label)s.change_%(model_name)s"],
"PATCH": ["%(app_label)s.change_%(model_name)s"],
"DELETE": ["%(app_label)s.delete_%(model_name)s"],
}

# This is to handle the special case for GET list of Taxonomy Tags
def has_object_permission(self, request, view, obj):
"""
Returns True if the user on the given request is allowed the given view for the given object.
Expand Down
26 changes: 22 additions & 4 deletions openedx_tagging/core/tagging/rest_api/v1/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from ...models import Taxonomy
from ...rules import ChangeObjectTagPermissionItem
from ..paginators import SEARCH_TAGS_THRESHOLD, TAGS_THRESHOLD, DisabledTagsPagination, TagsPagination
from .permissions import ObjectTagObjectPermissions, TagListPermissions, TaxonomyObjectPermissions
from .permissions import ObjectTagObjectPermissions, TagObjectPermissions, TaxonomyObjectPermissions
from .serializers import (
ObjectTagListQueryParamsSerializer,
ObjectTagSerializer,
Expand Down Expand Up @@ -480,7 +480,7 @@ class TaxonomyTagsView(ListAPIView, RetrieveUpdateDestroyAPIView):
"""

permission_classes = [TagListPermissions]
permission_classes = [TagObjectPermissions]
pagination_enabled = True

def __init__(self):
Expand Down Expand Up @@ -613,7 +613,7 @@ def get_matching_tags(

return result

def get_queryset(self) -> list[Tag]: # type: ignore[override]
def get_queryset(self) -> models.QuerySet[Tag]: # type: ignore[override]
"""
Builds and returns the queryset to be paginated.
Expand All @@ -631,10 +631,26 @@ def get_queryset(self) -> list[Tag]: # type: ignore[override]
search_term=search_term,
)

# Convert the results back to a QuerySet for permissions to apply
# Due to the conversion we lose the populated `sub_tags` attribute,
# in the case of using the special search serializer so we
# need to repopulate it again
if self.serializer_class == TagsForSearchSerializer:
results_dict = {tag.id: tag for tag in result}

result_queryset = Tag.objects.filter(id__in=results_dict.keys())

for tag in result_queryset:
sub_tags = results_dict[tag.id].sub_tags
tag.sub_tags = sub_tags

else:
result_queryset = Tag.objects.filter(id__in=[tag.id for tag in result])

# This function is not called automatically
self.pagination_class = self.get_pagination_class()

return result
return result_queryset

def post(self, request, *args, **kwargs):
"""
Expand All @@ -659,6 +675,7 @@ def post(self, request, *args, **kwargs):
except ValueError as e:
raise ValidationError(e) from e

self.serializer_class = TagsSerializer
serializer_context = self.get_serializer_context()
return Response(
self.serializer_class(new_tag, context=serializer_context).data,
Expand Down Expand Up @@ -686,6 +703,7 @@ def update(self, request, *args, **kwargs):
except ValueError as e:
raise ValidationError(e) from e

self.serializer_class = TagsSerializer
serializer_context = self.get_serializer_context()
return Response(
self.serializer_class(updated_tag, context=serializer_context).data,
Expand Down
29 changes: 21 additions & 8 deletions openedx_tagging/core/tagging/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,28 @@ def can_change_taxonomy(user: UserType, taxonomy: Taxonomy | None = None) -> boo
)


@rules.predicate
def can_view_tag(user: UserType, tag: Tag | None = None) -> bool:
"""
User can view tags for any taxonomy they can view.
"""
taxonomy = tag.taxonomy.cast() if (tag and tag.taxonomy) else None
has_perm_thing = user.has_perm(
"oel_tagging.view_taxonomy",
taxonomy,
)
return has_perm_thing


@rules.predicate
def can_change_tag(user: UserType, tag: Tag | None = None) -> bool:
"""
Even taxonomy admins cannot add tags to system taxonomies (their tags are system-defined), or free-text taxonomies
(these don't have predefined tags).
Users can change tags for any taxonomy they can modify.
"""
taxonomy = tag.taxonomy.cast() if (tag and tag.taxonomy) else None
return is_taxonomy_admin(user) and (
not tag
or not taxonomy
or (taxonomy and not taxonomy.allow_free_text and not taxonomy.system_defined)
return user.has_perm(
"oel_tagging.change_taxonomy",
taxonomy,
)


Expand Down Expand Up @@ -114,8 +125,10 @@ def can_change_object_tag(
# Tag
rules.add_perm("oel_tagging.add_tag", can_change_tag)
rules.add_perm("oel_tagging.change_tag", can_change_tag)
rules.add_perm("oel_tagging.delete_tag", is_taxonomy_admin)
rules.add_perm("oel_tagging.view_tag", rules.always_allow)
rules.add_perm("oel_tagging.delete_tag", can_change_tag)
rules.add_perm("oel_tagging.view_tag", can_view_tag)
# Special Case for listing Tags, we check if we can view the Taxonomy since
# that is what is passed in rather than a Tag object
rules.add_perm("oel_tagging.list_tag", can_view_taxonomy)

# ObjectTag
Expand Down
91 changes: 70 additions & 21 deletions tests/openedx_tagging/core/tagging/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1204,7 +1204,7 @@ def test_create_tag_in_taxonomy_while_loggedout(self):

assert response.status_code == status.HTTP_401_UNAUTHORIZED

def test_create_tag_in_taxonomy(self):
def test_create_tag_in_taxonomy_without_permission(self):
self.client.force_authenticate(user=self.user)
new_tag_value = "New Tag"

Expand All @@ -1216,6 +1216,20 @@ def test_create_tag_in_taxonomy(self):
self.small_taxonomy_url, create_data, format="json"
)

assert response.status_code == status.HTTP_403_FORBIDDEN

def test_create_tag_in_taxonomy(self):
self.client.force_authenticate(user=self.staff)
new_tag_value = "New Tag"

create_data = {
"tag": new_tag_value
}

response = self.client.post(
self.small_taxonomy_url, create_data, format="json"
)

assert response.status_code == status.HTTP_201_CREATED

data = response.data
Expand All @@ -1229,7 +1243,7 @@ def test_create_tag_in_taxonomy(self):
self.assertEqual(data.get("children_count"), 0)

def test_create_tag_in_taxonomy_with_parent_id(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)
parent_tag = self.small_taxonomy.tag_set.filter(parent=None).first()
new_tag_value = "New Child Tag"
new_external_id = "extId"
Expand Down Expand Up @@ -1257,7 +1271,7 @@ def test_create_tag_in_taxonomy_with_parent_id(self):
self.assertEqual(data.get("children_count"), 0)

def test_create_tag_in_invalid_taxonomy(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)
new_tag_value = "New Tag"

create_data = {
Expand All @@ -1272,7 +1286,7 @@ def test_create_tag_in_invalid_taxonomy(self):
assert response.status_code == status.HTTP_404_NOT_FOUND

def test_create_tag_in_free_text_taxonomy(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)
new_tag_value = "New Tag"

create_data = {
Expand All @@ -1290,7 +1304,7 @@ def test_create_tag_in_free_text_taxonomy(self):
assert response.status_code == status.HTTP_400_BAD_REQUEST

def test_create_tag_in_system_defined_taxonomy(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)
new_tag_value = "New Tag"

create_data = {
Expand All @@ -1308,7 +1322,7 @@ def test_create_tag_in_system_defined_taxonomy(self):
assert response.status_code == status.HTTP_400_BAD_REQUEST

def test_create_tag_in_taxonomy_with_invalid_parent_tag_id(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)
invalid_parent_tag_id = 91919
new_tag_value = "New Child Tag"

Expand All @@ -1324,7 +1338,7 @@ def test_create_tag_in_taxonomy_with_invalid_parent_tag_id(self):
assert response.status_code == status.HTTP_400_BAD_REQUEST

def test_create_tag_in_taxonomy_with_parent_tag_id_in_other_taxonomy(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)
invalid_parent_tag_id = 1
new_tag_value = "New Child Tag"

Expand All @@ -1340,7 +1354,7 @@ def test_create_tag_in_taxonomy_with_parent_tag_id_in_other_taxonomy(self):
assert response.status_code == status.HTTP_404_NOT_FOUND

def test_create_tag_in_taxonomy_with_already_existing_value(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)
new_tag_value = "New Tag"

create_data = {
Expand Down Expand Up @@ -1378,9 +1392,28 @@ def test_update_tag_in_taxonomy_while_loggedout(self):

assert response.status_code == status.HTTP_401_UNAUTHORIZED

def test_update_tag_in_taxonomy_with_different_methods(self):
def test_update_tag_in_taxonomy_without_permission(self):
self.client.force_authenticate(user=self.user)
updated_tag_value = "Updated Tag"

# Existing Tag that will be updated
existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first()

update_data = {
"tag": existing_tag.id,
"tag_value": updated_tag_value
}

# Test updating using the PUT method
response = self.client.put(
self.small_taxonomy_url, update_data, format="json"
)

assert response.status_code == status.HTTP_403_FORBIDDEN

def test_update_tag_in_taxonomy_with_different_methods(self):
self.client.force_authenticate(user=self.staff)
updated_tag_value = "Updated Tag"
updated_tag_value_2 = "Updated Tag 2"

# Existing Tag that will be updated
Expand Down Expand Up @@ -1425,7 +1458,7 @@ def test_update_tag_in_taxonomy_with_different_methods(self):
self.assertEqual(data.get("external_id"), existing_tag.external_id)

def test_update_tag_in_taxonomy_reflects_changes_in_object_tags(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)

existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first()

Expand Down Expand Up @@ -1476,7 +1509,7 @@ def test_update_tag_in_taxonomy_reflects_changes_in_object_tags(self):
self.assertEqual(object_tag_3.value, updated_tag_value)

def test_update_tag_in_taxonomy_with_invalid_tag_id(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)
updated_tag_value = "Updated Tag"

update_data = {
Expand All @@ -1491,7 +1524,7 @@ def test_update_tag_in_taxonomy_with_invalid_tag_id(self):
assert response.status_code == status.HTTP_400_BAD_REQUEST

def test_update_tag_in_taxonomy_with_tag_id_in_other_taxonomy(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)
updated_tag_value = "Updated Tag"

update_data = {
Expand All @@ -1506,7 +1539,7 @@ def test_update_tag_in_taxonomy_with_tag_id_in_other_taxonomy(self):
assert response.status_code == status.HTTP_404_NOT_FOUND

def test_update_tag_in_taxonomy_with_no_tag_value_provided(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)

# Existing Tag that will be updated
existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first()
Expand All @@ -1522,7 +1555,7 @@ def test_update_tag_in_taxonomy_with_no_tag_value_provided(self):
assert response.status_code == status.HTTP_400_BAD_REQUEST

def test_update_tag_in_invalid_taxonomy(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)

# Existing Tag that will be updated
existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first()
Expand Down Expand Up @@ -1555,8 +1588,24 @@ def test_delete_single_tag_from_taxonomy_while_loggedout(self):

assert response.status_code == status.HTTP_401_UNAUTHORIZED

def test_delete_single_tag_from_taxonomy(self):
def test_delete_single_tag_from_taxonomy_without_permission(self):
self.client.force_authenticate(user=self.user)
# Get Tag that will be deleted
existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first()

delete_data = {
"tag_ids": [existing_tag.id],
"with_subtags": True
}

response = self.client.delete(
self.small_taxonomy_url, delete_data, format="json"
)

assert response.status_code == status.HTTP_403_FORBIDDEN

def test_delete_single_tag_from_taxonomy(self):
self.client.force_authenticate(user=self.staff)

# Get Tag that will be deleted
existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first()
Expand All @@ -1577,7 +1626,7 @@ def test_delete_single_tag_from_taxonomy(self):
existing_tag.refresh_from_db()

def test_delete_multiple_tags_from_taxonomy(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)

# Get Tags that will be deleted
existing_tags = self.small_taxonomy.tag_set.filter(parent=None)[:3]
Expand All @@ -1599,7 +1648,7 @@ def test_delete_multiple_tags_from_taxonomy(self):
existing_tag.refresh_from_db()

def test_delete_tag_with_subtags_should_fail_without_flag_passed(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)

# Get Tag that will be deleted
existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first()
Expand All @@ -1615,7 +1664,7 @@ def test_delete_tag_with_subtags_should_fail_without_flag_passed(self):
assert response.status_code == status.HTTP_400_BAD_REQUEST

def test_delete_tag_in_invalid_taxonomy(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)

# Get Tag that will be deleted
existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first()
Expand All @@ -1632,7 +1681,7 @@ def test_delete_tag_in_invalid_taxonomy(self):
assert response.status_code == status.HTTP_404_NOT_FOUND

def test_delete_tag_in_taxonomy_with_invalid_tag_id(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)

delete_data = {
"tag_ids": [91919]
Expand All @@ -1645,7 +1694,7 @@ def test_delete_tag_in_taxonomy_with_invalid_tag_id(self):
assert response.status_code == status.HTTP_400_BAD_REQUEST

def test_delete_tag_with_tag_id_in_other_taxonomy(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)

# Get Tag in other Taxonomy
tag_in_other_taxonomy = self.small_taxonomy.tag_set.filter(parent=None).first()
Expand All @@ -1661,7 +1710,7 @@ def test_delete_tag_with_tag_id_in_other_taxonomy(self):
assert response.status_code == status.HTTP_400_BAD_REQUEST

def test_delete_tag_in_taxonomy_without_subtags(self):
self.client.force_authenticate(user=self.user)
self.client.force_authenticate(user=self.staff)

# Get Tag that will be deleted
existing_tag = self.small_taxonomy.tag_set.filter(children__isnull=True).first()
Expand Down

0 comments on commit f677f3f

Please sign in to comment.