From 830f5b3c2dadfc65225a98ea9a3a27505a651db8 Mon Sep 17 00:00:00 2001 From: Delsin Van Grembergen Date: Wed, 4 Sep 2024 13:41:09 +0200 Subject: [PATCH] feat: Add mediafile_download_policy --- .../mediafile_download_policy.py | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 src/elody/policies/authorization/mediafile_download_policy.py diff --git a/src/elody/policies/authorization/mediafile_download_policy.py b/src/elody/policies/authorization/mediafile_download_policy.py new file mode 100644 index 0000000..8164dbb --- /dev/null +++ b/src/elody/policies/authorization/mediafile_download_policy.py @@ -0,0 +1,70 @@ +import re as regex + +from elody.policies.permission_handler import ( + get_permissions, + handle_single_item_request, +) +from flask import Request # pyright: ignore +from flask_restful import abort # pyright: ignore +from inuits_policy_based_auth import BaseAuthorizationPolicy # pyright: ignore +from inuits_policy_based_auth.contexts.policy_context import ( # pyright: ignore + PolicyContext, +) +from inuits_policy_based_auth.contexts.user_context import ( # pyright: ignore + UserContext, +) +from storage.storagemanager import StorageManager # pyright: ignore + + +class MediafileDownloadPolicy(BaseAuthorizationPolicy): + def authorize( + self, policy_context: PolicyContext, user_context: UserContext, request_context + ): + request: Request = request_context.http_request + if not regex.match(r"^/mediafiles/(.+)/download$", request.path): + return policy_context + + view_args = request.view_args or {} + collection = view_args.get("collection", request.path.split("/")[-3]) + id = view_args.get("id") + item = ( + StorageManager() + .get_db_engine() + .get_item_from_collection_by_id(collection, id) + ) + if not item: + abort( + 404, + message=f"Item with id {id} doesn't exist in collection {collection}", + ) + + for role in user_context.x_tenant.roles: + permissions = get_permissions(role, user_context) + if not permissions: + continue + + rules = [ + GetRequestRules, + ] + access_verdict = None + for rule in rules: + access_verdict = rule().apply(item, user_context, request, permissions) + if access_verdict != None: + policy_context.access_verdict = access_verdict + if not policy_context.access_verdict: + return policy_context + + if policy_context.access_verdict: + return policy_context + + return policy_context + + +class GetRequestRules: + def apply( + self, item, user_context: UserContext, request: Request, permissions + ) -> bool | None: + if request.method != "GET": + return None + + return handle_single_item_request(user_context, item, permissions, "read")