diff --git a/docker/setup_configuration/data.yaml b/docker/setup_configuration/data.yaml index d5776648..b96ed6ef 100644 --- a/docker/setup_configuration/data.yaml +++ b/docker/setup_configuration/data.yaml @@ -42,20 +42,33 @@ objecttypes: name: Object Type 1 service_identifier: objecttypes-api + - uuid: b0e8553f-8b1a-4d55-ab90-6d02f1bcf2c2 + name: Object Type 2 + service_identifier: objecttypes-api + tokenauth_config_enable: true tokenauth: items: - identifier: token-1 - token: 18b2b74ef994314b84021d47b9422e82b685d82f + token: ba9d233e95e04c4a8a661a27daffe7c9bd019067 contact_person: Person 1 email: person-1@example.com organization: Organization 1 application: Application 1 administration: Administration 1 - is_superuser: true - - + permissions: + - object_type: b427ef84-189d-43aa-9efd-7bb2c459e281 + mode: read_and_write + - object_type: b0e8553f-8b1a-4d55-ab90-6d02f1bcf2c2 + mode: read_only + use_fields: true + fields: + '1': + - record__data__leeftijd + - record__data__kiemjaar + + oidc_db_config_enable: true oidc_db_config_admin_auth: items: diff --git a/docs/installation/config_cli.rst b/docs/installation/config_cli.rst index 4445262a..40799877 100644 --- a/docs/installation/config_cli.rst +++ b/docs/installation/config_cli.rst @@ -37,6 +37,7 @@ Objecttypes configuration To configure objecttypes the following configuration could be used: .. code-block:: yaml + ... zgw_consumers_config_enable: true zgw_consumers: @@ -68,6 +69,7 @@ To configure objecttypes the following configuration could be used: name: Object Type 2 service_identifier: objecttypen-bar ... + .. note:: The ``uuid`` field will be used to lookup existing ``ObjectType``'s. Objecttypes require a corresponding ``Service`` to work correctly. Creating @@ -81,8 +83,8 @@ In order to be able to retrieve objecttypes, a corresponding ``Service`` should created. An example of a configuration could be seen below: .. code-block:: yaml - ... + ... zgw_consumers_config_enable: true zgw_consumers: services: @@ -102,7 +104,8 @@ created. An example of a configuration could be seen below: auth_type: api_key header_key: Authorization header_value: Token b9f100590925b529664ed9d370f5f8da124b2c20 - .... + ... + Tokens configuration -------------------- @@ -121,14 +124,28 @@ Create or update the (single) YAML configuration file with your settings: organization: Organization XYZ # optional application: Application XYZ # optional administration: Administration XYZ # optional - is_superuser: true # optional + permissions: + - object_type: b427ef84-189d-43aa-9efd-7bb2c459e281 + mode: read_and_write - identifier: token-2 token: 7b2b212d9f16d171a70a1d927cdcfbd5ca7a4799 contact_person: Person 2 email: person-2@example.com + permissions: + - object_type: b0e8553f-8b1a-4d55-ab90-6d02f1bcf2c2 + mode: read_only + use_fields: true + fields: + '1': + - record__data__leeftijd + - record__data__kiemjaar ... +.. note:: To ensure the proper functioning of the tokens, it is essential to first configure the ``objecttypes``. + Then, the token configuration must be completed to guarantee the correct configuration of the ``Permissions``. + + Mozilla-django-oidc-db ---------------------- @@ -158,16 +175,32 @@ can be found at the _`documentation`: https://mozilla-django-oidc-db.readthedocs Sites configuration ------------------- +.. code-block:: yaml + + ... + sites_config_enable: true + sites_config: + items: + - domain: example.com + name: Example site + - domain: test.example.com + name: Test site + ... + +More details about sites configuration through ``setup_configuration`` +can be found at the _`site documentation`: https://github.com/maykinmedia/django-setup-configuration/blob/main/docs/sites_config.rst + + Notifications configuration -------------------------- +--------------------------- To configure sending notifications for the application ensure there is a ``services`` item present that matches the ``notifications_api_service_identifier`` in the ``notifications_config`` namespace: .. code-block:: yaml - ... + ... zgw_consumers_config_enable: true zgw_consumers: services: @@ -184,7 +217,7 @@ item present that matches the ``notifications_api_service_identifier`` in the notification_delivery_max_retries: 1 notification_delivery_retry_backoff: 2 notification_delivery_retry_backoff_max: 3 - .... + ... Execution diff --git a/src/objects/conf/base.py b/src/objects/conf/base.py index f7ab7c12..84be265d 100644 --- a/src/objects/conf/base.py +++ b/src/objects/conf/base.py @@ -88,5 +88,6 @@ "zgw_consumers.contrib.setup_configuration.steps.ServiceConfigurationStep", "notifications_api_common.contrib.setup_configuration.steps.NotificationConfigurationStep", "mozilla_django_oidc_db.setup_configuration.steps.AdminOIDCConfigurationStep", + "objects.setup_configuration.steps.objecttypes.ObjectTypesConfigurationStep", "objects.setup_configuration.steps.token_auth.TokenAuthConfigurationStep", ) diff --git a/src/objects/setup_configuration/models/token_auth.py b/src/objects/setup_configuration/models/token_auth.py index 5adf5746..23fae8f0 100644 --- a/src/objects/setup_configuration/models/token_auth.py +++ b/src/objects/setup_configuration/models/token_auth.py @@ -1,9 +1,30 @@ +from django_setup_configuration.fields import DjangoModelRef from django_setup_configuration.models import ConfigurationModel +from pydantic import UUID4, Field -from objects.token.models import TokenAuth +from objects.token.models import Permission, TokenAuth + + +class TokenAuthPermissionConfigurationModel(ConfigurationModel): + object_type: UUID4 + fields: dict[str, list[str]] | None = DjangoModelRef( + Permission, "fields", default=None + ) + + class Meta: + django_model_refs = { + Permission: ( + "mode", + "use_fields", + ), + } class TokenAuthConfigurationModel(ConfigurationModel): + permissions: list[TokenAuthPermissionConfigurationModel] | None = Field( + default_factory=list, + ) + class Meta: django_model_refs = { TokenAuth: ( diff --git a/src/objects/setup_configuration/steps/token_auth.py b/src/objects/setup_configuration/steps/token_auth.py index 09577370..abe41e2b 100644 --- a/src/objects/setup_configuration/steps/token_auth.py +++ b/src/objects/setup_configuration/steps/token_auth.py @@ -1,15 +1,17 @@ import logging +from typing import Any -from django.core.exceptions import ValidationError +from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.db import IntegrityError from django_setup_configuration.configuration import BaseConfigurationStep from django_setup_configuration.exceptions import ConfigurationRunFailed +from objects.core.models import ObjectType from objects.setup_configuration.models.token_auth import ( TokenAuthGroupConfigurationModel, ) -from objects.token.models import TokenAuth +from objects.token.models import Permission, TokenAuth logger = logging.getLogger(__name__) @@ -18,7 +20,7 @@ class TokenAuthConfigurationStep( BaseConfigurationStep[TokenAuthGroupConfigurationModel] ): """ - Configure tokens for other applications to access Objects API + Configure tokens with permissions for other applications to access Objects API """ namespace = "tokenauth" @@ -27,14 +29,61 @@ class TokenAuthConfigurationStep( verbose_name = "Configuration to set up authentication tokens for objects" config_model = TokenAuthGroupConfigurationModel + def _full_clean(self, instance: Any) -> None: + try: + instance.full_clean(exclude=("id",), validate_unique=False) + except ValidationError as exception: + raise ConfigurationRunFailed( + ("Validation error(s) during instance cleaning: %s" % type(instance)) + ) from exception + + def _configure_permissions(self, token: TokenAuth, permissions: list) -> None: + if len(permissions) == 0: + logger.warning("No permissions provided for %s", token.identifier) + + for permission in permissions: + try: + permission_kwargs = { + "token_auth": token, + "object_type": ObjectType.objects.get(uuid=permission.object_type), + "mode": permission.mode, + "use_fields": permission.use_fields, + "fields": permission.fields, + } + except ObjectDoesNotExist as exception: + raise ConfigurationRunFailed( + ("Object type with %s does not exist" % permission.object_type) + ) from exception + + permission_instance = Permission(**permission_kwargs) + self._full_clean(permission_instance) + + try: + Permission.objects.update_or_create( + token_auth=permission_kwargs["token_auth"], + object_type=permission_kwargs["object_type"], + defaults={ + "mode": permission_kwargs["mode"], + "use_fields": permission_kwargs["use_fields"], + "fields": permission_kwargs["fields"], + }, + ) + except IntegrityError as exception: + raise ConfigurationRunFailed( + ( + "Failed configuring permission for token %s and object type %s" + % (token.identifier, permission.object_type) + ) + ) from exception + def execute(self, model: TokenAuthGroupConfigurationModel) -> None: if len(model.items) == 0: logger.warning("No tokens provided for configuration") for item in model.items: - logger.info(f"Configuring {item.identifier}") + logger.info("Configuring %s", item.identifier) - model_kwargs = { + token_kwargs = { "identifier": item.identifier, "token": item.token, "contact_person": item.contact_person, @@ -45,31 +94,24 @@ def execute(self, model: TokenAuthGroupConfigurationModel) -> None: "is_superuser": item.is_superuser, } - token_instance = TokenAuth(**model_kwargs) - + token_instance = TokenAuth(**token_kwargs) + self._full_clean(token_instance) try: - token_instance.full_clean(exclude=("id",), validate_unique=False) - except ValidationError as exception: - exception_message = ( - f"Validation error(s) occured for {item.identifier}." - ) - raise ConfigurationRunFailed(exception_message) from exception - - logger.debug(f"No validation errors found for {item.identifier}") - - try: - logger.debug(f"Saving {item.identifier}") - - TokenAuth.objects.update_or_create( + logger.debug("Saving %s", item.identifier) + token, _ = TokenAuth.objects.update_or_create( identifier=item.identifier, defaults={ key: value - for key, value in model_kwargs.items() + for key, value in token_kwargs.items() if key != "identifier" }, ) + + self._configure_permissions(token, item.permissions) + except IntegrityError as exception: - exception_message = f"Failed configuring token {item.identifier}." - raise ConfigurationRunFailed(exception_message) from exception + raise ConfigurationRunFailed( + "Failed configuring token %s" % item.identifier + ) from exception - logger.info(f"Configured {item.identifier}") + logger.info("Configured %s", item.identifier) diff --git a/src/objects/setup_configuration/tests/files/token_auth/valid_setup_complete.yaml b/src/objects/setup_configuration/tests/files/token_auth/valid_setup_complete.yaml index 6252d846..67557d7e 100644 --- a/src/objects/setup_configuration/tests/files/token_auth/valid_setup_complete.yaml +++ b/src/objects/setup_configuration/tests/files/token_auth/valid_setup_complete.yaml @@ -8,7 +8,17 @@ tokenauth: organization: Organization 1 application: Application 1 administration: Administration 1 - is_superuser: True + permissions: + - object_type: 3a82fb7f-fc9b-4104-9804-993f639d6d0d + mode: read_only + use_fields: true + fields: + '1': + - record__data__leeftijd + - record__data__kiemjaar + + - object_type: ca754b52-3f37-4c49-837c-130e8149e337 + mode: read_and_write - identifier: token-2 token: e882642bd0ec2482adcdc97258c2e6f98cb06d85 @@ -17,4 +27,15 @@ tokenauth: organization: Organization 2 application: Application 2 administration: Administration 2 + permissions: + - object_type: feeaa795-d212-4fa2-bb38-2c34996e5702 + mode: read_only + + - identifier: token-3 + token: ff835859ecf8df4d541aab09f2d0854d17b41a77 + contact_person: Person 3 + email: person-3@example.com + organization: Organization 3 + application: Application 3 + administration: Administration 3 is_superuser: True diff --git a/src/objects/setup_configuration/tests/test_token_auth_config.py b/src/objects/setup_configuration/tests/test_token_auth_config.py index a7c8676e..4fd20949 100644 --- a/src/objects/setup_configuration/tests/test_token_auth_config.py +++ b/src/objects/setup_configuration/tests/test_token_auth_config.py @@ -7,15 +7,39 @@ PrerequisiteFailed, ) from django_setup_configuration.test_utils import execute_single_step +from zgw_consumers.models import Service +from zgw_consumers.test.factories import ServiceFactory +from objects.core.models import ObjectType +from objects.core.tests.factories import ObjectTypeFactory from objects.setup_configuration.steps.token_auth import TokenAuthConfigurationStep -from objects.token.models import TokenAuth +from objects.token.models import Permission, TokenAuth from objects.token.tests.factories import TokenAuthFactory DIR_FILES = (Path(__file__).parent / "files/token_auth").resolve() -class TokenAuthConfigurationStepTests(TestCase): +class TokenTestCase(TestCase): + def setUp(self): + self.service = ServiceFactory(slug="service") + ObjectTypeFactory( + service=self.service, + uuid="3a82fb7f-fc9b-4104-9804-993f639d6d0d", + _name="Object Type 001", + ) + ObjectTypeFactory( + service=self.service, + uuid="ca754b52-3f37-4c49-837c-130e8149e337", + _name="Object Type 002", + ) + ObjectTypeFactory( + service=self.service, + uuid="feeaa795-d212-4fa2-bb38-2c34996e5702", + _name="Object Type 003", + ) + + +class TokenAuthConfigurationStepTests(TokenTestCase): def test_valid_setup_default(self): execute_single_step( TokenAuthConfigurationStep, @@ -50,9 +74,8 @@ def test_valid_setup_complete(self): ) tokens = TokenAuth.objects.all() - self.assertEqual(tokens.count(), 2) + self.assertEqual(tokens.count(), 3) - # Same as configuration token = tokens.get(identifier="token-1") self.assertEqual(token.token, "18b2b74ef994314b84021d47b9422e82b685d82f") self.assertEqual(token.contact_person, "Person 1") @@ -60,9 +83,8 @@ def test_valid_setup_complete(self): self.assertEqual(token.organization, "Organization 1") self.assertEqual(token.application, "Application 1") self.assertEqual(token.administration, "Administration 1") - self.assertTrue(token.is_superuser) + self.assertFalse(token.is_superuser) - # Token data updated token = tokens.get(identifier="token-2") self.assertEqual(token.contact_person, "Person 2") self.assertEqual(token.token, "e882642bd0ec2482adcdc97258c2e6f98cb06d85") @@ -70,6 +92,15 @@ def test_valid_setup_complete(self): self.assertEqual(token.organization, "Organization 2") self.assertEqual(token.application, "Application 2") self.assertEqual(token.administration, "Administration 2") + self.assertFalse(token.is_superuser) + + token = tokens.get(identifier="token-3") + self.assertEqual(token.contact_person, "Person 3") + self.assertEqual(token.token, "ff835859ecf8df4d541aab09f2d0854d17b41a77") + self.assertEqual(token.email, "person-3@example.com") + self.assertEqual(token.organization, "Organization 3") + self.assertEqual(token.application, "Application 3") + self.assertEqual(token.administration, "Administration 3") self.assertTrue(token.is_superuser) def test_valid_update_existing_tokens(self): @@ -95,7 +126,7 @@ def test_valid_update_existing_tokens(self): ) tokens = TokenAuth.objects.all() - self.assertEqual(tokens.count(), 2) + self.assertEqual(tokens.count(), 3) # Same as configuration token = tokens.get(identifier="token-1") @@ -105,7 +136,7 @@ def test_valid_update_existing_tokens(self): self.assertEqual(token.organization, "Organization 1") self.assertEqual(token.application, "Application 1") self.assertEqual(token.administration, "Administration 1") - self.assertTrue(token.is_superuser) + self.assertFalse(token.is_superuser) # Token data updated token = tokens.get(identifier="token-2") @@ -115,7 +146,7 @@ def test_valid_update_existing_tokens(self): self.assertEqual(token.organization, "Organization 2") self.assertEqual(token.application, "Application 2") self.assertEqual(token.administration, "Administration 2") - self.assertTrue(token.is_superuser) + self.assertFalse(token.is_superuser) self.assertNotEqual(token.token, "1cad42916dfa439af8c69000bf7b6af6a66782af") self.assertNotEqual(token.contact_person, "Person 3") @@ -128,7 +159,7 @@ def test_valid_idempotent_step(self): ) tokens = TokenAuth.objects.all() - self.assertEqual(tokens.count(), 2) + self.assertEqual(tokens.count(), 3) old_token_a = tokens.get(identifier="token-1") self.assertEqual(old_token_a.identifier, "token-1") @@ -138,7 +169,7 @@ def test_valid_idempotent_step(self): self.assertEqual(old_token_a.organization, "Organization 1") self.assertEqual(old_token_a.application, "Application 1") self.assertEqual(old_token_a.administration, "Administration 1") - self.assertTrue(old_token_a.is_superuser) + self.assertFalse(old_token_a.is_superuser) old_token_b = tokens.get(identifier="token-2") self.assertEqual(old_token_b.identifier, "token-2") @@ -148,7 +179,7 @@ def test_valid_idempotent_step(self): self.assertEqual(old_token_b.organization, "Organization 2") self.assertEqual(old_token_b.application, "Application 2") self.assertEqual(old_token_b.administration, "Administration 2") - self.assertTrue(old_token_b.is_superuser) + self.assertFalse(old_token_b.is_superuser) execute_single_step( TokenAuthConfigurationStep, @@ -156,7 +187,7 @@ def test_valid_idempotent_step(self): ) tokens = TokenAuth.objects.all() - self.assertEqual(tokens.count(), 2) + self.assertEqual(tokens.count(), 3) new_token_a = tokens.get(identifier="token-1") self.assertEqual(new_token_a.identifier, old_token_a.identifier) @@ -207,7 +238,8 @@ def test_invalid_setup_email(self): execute_single_step(TokenAuthConfigurationStep, object_source=object_source) self.assertTrue( - "Validation error(s) occured for token-1" in str(command_error.exception) + "Validation error(s) during instance cleaning" + in str(command_error.exception) ) self.assertEqual(TokenAuth.objects.count(), 0) @@ -232,7 +264,8 @@ def test_invalid_setup_token(self): execute_single_step(TokenAuthConfigurationStep, object_source=object_source) self.assertTrue( - "Validation error(s) occured for token-1" in str(command_error.exception) + "Validation error(s) during instance cleaning" + in str(command_error.exception) ) self.assertEqual(TokenAuth.objects.count(), 0) @@ -255,9 +288,9 @@ def test_invalid_empty_token(self): } with self.assertRaises(ConfigurationRunFailed) as command_error: execute_single_step(TokenAuthConfigurationStep, object_source=object_source) - self.assertTrue( - "Validation error(s) occured for token-1" in str(command_error.exception) + "Validation error(s) during instance cleaning" + in str(command_error.exception) ) self.assertEqual(TokenAuth.objects.count(), 0) @@ -338,7 +371,8 @@ def test_invalid_setup_contact_person(self): execute_single_step(TokenAuthConfigurationStep, object_source=object_source) self.assertTrue( - "Validation error(s) occured for token-1" in str(command_error.exception) + "Validation error(s) during instance cleaning" + in str(command_error.exception) ) self.assertEqual(TokenAuth.objects.count(), 0) @@ -385,3 +419,388 @@ def test_invalid_empty_identifier(self): execute_single_step(TokenAuthConfigurationStep, object_source=object_source) self.assertTrue("String should match pattern" in str(command_error.exception)) self.assertEqual(TokenAuth.objects.count(), 0) + + +class TokenAuthConfigurationStepWithPermissionsTests(TokenTestCase): + def test_valid_setup_default_without_permissions(self): + self.assertEqual(TokenAuth.objects.count(), 0) + self.assertEqual(Permission.objects.count(), 0) + self.assertEqual(Service.objects.count(), 1) + self.assertEqual(ObjectType.objects.count(), 3) + + execute_single_step( + TokenAuthConfigurationStep, + yaml_source=str(DIR_FILES / "valid_setup_default.yaml"), + ) + tokens = TokenAuth.objects.all() + self.assertEqual(tokens.count(), 2) + + token = tokens.get(identifier="token-1") + self.assertEqual(token.token, "18b2b74ef994314b84021d47b9422e82b685d82f") + self.assertEqual(token.contact_person, "Person 1") + self.assertEqual(token.email, "person-1@example.com") + self.assertEqual(token.organization, "") + self.assertEqual(token.application, "") + self.assertEqual(token.administration, "") + self.assertFalse(token.is_superuser) + self.assertEqual(token.permissions.count(), 0) + self.assertEqual(token.object_types.count(), 0) + + token = tokens.get(identifier="token-2") + self.assertEqual(token.contact_person, "Person 2") + self.assertEqual(token.token, "e882642bd0ec2482adcdc97258c2e6f98cb06d85") + self.assertEqual(token.email, "person-2@example.com") + self.assertEqual(token.organization, "") + self.assertEqual(token.application, "") + self.assertEqual(token.administration, "") + self.assertFalse(token.is_superuser) + self.assertEqual(token.permissions.count(), 0) + self.assertEqual(token.object_types.count(), 0) + + def test_valid_setup_complete(self): + self.assertEqual(TokenAuth.objects.count(), 0) + self.assertEqual(Permission.objects.count(), 0) + self.assertEqual(Service.objects.count(), 1) + self.assertEqual(ObjectType.objects.count(), 3) + + execute_single_step( + TokenAuthConfigurationStep, + yaml_source=str(DIR_FILES / "valid_setup_complete.yaml"), + ) + tokens = TokenAuth.objects.all() + self.assertEqual(tokens.count(), 3) + self.assertEqual(Permission.objects.count(), 3) + + token = tokens.get(identifier="token-1") + token_permissions = token.permissions.all() + self.assertEqual(token.token, "18b2b74ef994314b84021d47b9422e82b685d82f") + self.assertEqual(token.contact_person, "Person 1") + self.assertEqual(token.email, "person-1@example.com") + self.assertEqual(token.organization, "Organization 1") + self.assertEqual(token.application, "Application 1") + self.assertEqual(token.administration, "Administration 1") + self.assertFalse(token.is_superuser) + self.assertEqual(token.object_types.count(), 2) + self.assertEqual(token_permissions.count(), 2) + object_type = ObjectType.objects.get( + uuid="3a82fb7f-fc9b-4104-9804-993f639d6d0d", service=self.service + ) + permission = token_permissions.get(object_type=object_type) + self.assertTrue(object_type in token.object_types.all()) + self.assertTrue(permission in token.permissions.all()) + self.assertEqual(permission.mode, "read_only") + self.assertTrue(permission.use_fields) + self.assertTrue(isinstance(permission.fields, dict)) + self.assertTrue(isinstance(permission.fields["1"], list)) + self.assertEqual(len(permission.fields.keys()), 1) + self.assertTrue("1" in permission.fields.keys()) + self.assertTrue("record__data__leeftijd" in permission.fields["1"]) + self.assertTrue("record__data__kiemjaar" in permission.fields["1"]) + object_type = ObjectType.objects.get( + uuid="ca754b52-3f37-4c49-837c-130e8149e337", service=self.service + ) + permission = token_permissions.get(object_type=object_type) + self.assertTrue(object_type in token.object_types.all()) + self.assertTrue(permission in token.permissions.all()) + self.assertEqual(permission.mode, "read_and_write") + self.assertFalse(permission.use_fields) + self.assertIsNone(permission.fields) + + token = tokens.get(identifier="token-2") + token_permissions = token.permissions.all() + self.assertEqual(token.contact_person, "Person 2") + self.assertEqual(token.token, "e882642bd0ec2482adcdc97258c2e6f98cb06d85") + self.assertEqual(token.email, "person-2@example.com") + self.assertEqual(token.organization, "Organization 2") + self.assertEqual(token.application, "Application 2") + self.assertEqual(token.administration, "Administration 2") + self.assertFalse(token.is_superuser) + self.assertEqual(token.permissions.count(), 1) + self.assertEqual(token.object_types.count(), 1) + object_type = ObjectType.objects.get( + uuid="feeaa795-d212-4fa2-bb38-2c34996e5702", service=self.service + ) + permission = token_permissions.get(object_type=object_type) + self.assertTrue(object_type in token.object_types.all()) + self.assertTrue(permission in token.permissions.all()) + self.assertEqual(permission.mode, "read_only") + self.assertFalse(permission.use_fields) + self.assertIsNone(permission.fields) + + token = tokens.get(identifier="token-3") + self.assertEqual(token.contact_person, "Person 3") + self.assertEqual(token.token, "ff835859ecf8df4d541aab09f2d0854d17b41a77") + self.assertEqual(token.email, "person-3@example.com") + self.assertEqual(token.organization, "Organization 3") + self.assertEqual(token.application, "Application 3") + self.assertEqual(token.administration, "Administration 3") + self.assertTrue(token.is_superuser) + self.assertEqual(token.permissions.count(), 0) + self.assertEqual(token.object_types.count(), 0) + + def test_valid_update_permissions(self): + object_source = { + "tokenauth_config_enable": True, + "tokenauth": { + "items": [ + { + "identifier": "token-1", + "token": "18b2b74ef994314b84021d47b9422e82b685d82f", + "contact_person": "Person 1", + "email": "person-1@example.com", + "organization": "Organization 1", + "application": "Application 1", + "administration": "Administration 1", + "permissions": [ + { + "object_type": "3a82fb7f-fc9b-4104-9804-993f639d6d0d", + "mode": "read_and_write", + }, + ], + }, + ], + }, + } + + execute_single_step(TokenAuthConfigurationStep, object_source=object_source) + + token = TokenAuth.objects.get(identifier="token-1") + self.assertEqual(token.contact_person, "Person 1") + self.assertEqual(token.token, "18b2b74ef994314b84021d47b9422e82b685d82f") + self.assertEqual(token.email, "person-1@example.com") + self.assertEqual(token.organization, "Organization 1") + self.assertEqual(token.application, "Application 1") + self.assertEqual(token.administration, "Administration 1") + self.assertEqual(token.permissions.count(), 1) + self.assertEqual(token.object_types.count(), 1) + object_type = ObjectType.objects.get( + uuid="3a82fb7f-fc9b-4104-9804-993f639d6d0d", service=self.service + ) + permission = token.permissions.get(object_type=object_type) + self.assertTrue(object_type in token.object_types.all()) + self.assertTrue(permission in token.permissions.all()) + self.assertEqual(permission.mode, "read_and_write") + self.assertFalse(permission.use_fields) + self.assertIsNone(permission.fields) + + # Update token permissions + execute_single_step( + TokenAuthConfigurationStep, + yaml_source=str(DIR_FILES / "valid_setup_complete.yaml"), + ) + token = TokenAuth.objects.get(identifier="token-1") + self.assertEqual(token.contact_person, "Person 1") + self.assertEqual(token.token, "18b2b74ef994314b84021d47b9422e82b685d82f") + self.assertEqual(token.email, "person-1@example.com") + self.assertEqual(token.organization, "Organization 1") + self.assertEqual(token.application, "Application 1") + self.assertEqual(token.administration, "Administration 1") + self.assertEqual(token.permissions.count(), 2) + self.assertEqual(token.object_types.count(), 2) + + permission = token.permissions.get(object_type=object_type) + self.assertTrue(object_type in token.object_types.all()) + self.assertTrue(permission in token.permissions.all()) + self.assertEqual(permission.mode, "read_only") + self.assertTrue(permission.use_fields) + self.assertTrue(isinstance(permission.fields, dict)) + self.assertTrue(isinstance(permission.fields["1"], list)) + self.assertEqual(len(permission.fields.keys()), 1) + self.assertTrue("1" in permission.fields.keys()) + self.assertTrue("record__data__leeftijd" in permission.fields["1"]) + self.assertTrue("record__data__kiemjaar" in permission.fields["1"]) + + def test_valid_idempotent_step(self): + self.assertEqual(TokenAuth.objects.count(), 0) + self.assertEqual(Permission.objects.count(), 0) + self.assertEqual(Service.objects.count(), 1) + self.assertEqual(ObjectType.objects.count(), 3) + + execute_single_step( + TokenAuthConfigurationStep, + yaml_source=str(DIR_FILES / "valid_setup_complete.yaml"), + ) + + tokens = TokenAuth.objects.all() + self.assertEqual(tokens.count(), 3) + self.assertEqual(Permission.objects.count(), 3) + + old_token = tokens.get(identifier="token-1") + old_token_permissions = old_token.permissions.all() + self.assertEqual(old_token.token, "18b2b74ef994314b84021d47b9422e82b685d82f") + self.assertEqual(old_token.contact_person, "Person 1") + self.assertEqual(old_token.email, "person-1@example.com") + self.assertEqual(old_token.organization, "Organization 1") + self.assertEqual(old_token.application, "Application 1") + self.assertEqual(old_token.administration, "Administration 1") + self.assertFalse(old_token.is_superuser) + self.assertEqual(old_token.object_types.count(), 2) + self.assertEqual(old_token_permissions.count(), 2) + object_type = ObjectType.objects.get( + uuid="3a82fb7f-fc9b-4104-9804-993f639d6d0d", service=self.service + ) + old_permission = old_token_permissions.get(object_type=object_type) + self.assertTrue(object_type in old_token.object_types.all()) + self.assertTrue(old_permission in old_token.permissions.all()) + self.assertEqual(old_permission.mode, "read_only") + self.assertTrue(old_permission.use_fields) + self.assertTrue(isinstance(old_permission.fields, dict)) + self.assertTrue(isinstance(old_permission.fields["1"], list)) + self.assertEqual(len(old_permission.fields.keys()), 1) + self.assertTrue("1" in old_permission.fields.keys()) + self.assertTrue("record__data__leeftijd" in old_permission.fields["1"]) + self.assertTrue("record__data__kiemjaar" in old_permission.fields["1"]) + + execute_single_step( + TokenAuthConfigurationStep, + yaml_source=str(DIR_FILES / "valid_setup_complete.yaml"), + ) + + tokens = TokenAuth.objects.all() + self.assertEqual(tokens.count(), 3) + self.assertEqual(Permission.objects.count(), 3) + new_token = tokens.get(identifier="token-1") + new_token_permissions = new_token.permissions.all() + self.assertEqual(new_token.token, old_token.token) + self.assertEqual(new_token.contact_person, old_token.contact_person) + self.assertEqual(new_token.email, old_token.email) + self.assertEqual(new_token.organization, old_token.organization) + self.assertEqual(new_token.application, old_token.application) + self.assertEqual(new_token.administration, old_token.administration) + self.assertFalse(new_token.is_superuser) + self.assertEqual(new_token.object_types.count(), 2) + self.assertEqual(new_token_permissions.count(), 2) + new_permission = new_token_permissions.get(object_type=object_type) + self.assertTrue(object_type in new_token.object_types.all()) + self.assertTrue(new_permission in new_token.permissions.all()) + self.assertEqual(new_permission.mode, "read_only") + self.assertTrue(new_permission.use_fields) + self.assertTrue(isinstance(new_permission.fields, dict)) + self.assertTrue(isinstance(new_permission.fields["1"], list)) + self.assertEqual(len(new_permission.fields.keys()), 1) + self.assertTrue("1" in new_permission.fields.keys()) + self.assertTrue("record__data__leeftijd" in new_permission.fields["1"]) + self.assertTrue("record__data__kiemjaar" in new_permission.fields["1"]) + + def test_invalid_permissions_object_type_does_not_exist(self): + self.assertFalse( + ObjectType.objects.filter( + uuid="69feca90-6c3d-4628-ace8-19e4b0ae4065", service=self.service + ).exists() + ) + object_source = { + "tokenauth_config_enable": True, + "tokenauth": { + "items": [ + { + "identifier": "token-1", + "token": "ba9d233e95e04c4a8a661a27daffe7c9bd019067", + "contact_person": "Person 1", + "email": "person-1@example.com", + "organization": "Organization 1", + "application": "Application 1", + "administration": "Administration 1", + "permissions": [ + { + "object_type": "69feca90-6c3d-4628-ace8-19e4b0ae4065", + "mode": "read_only", + "use_fields": True, + "fields": { + "1": [ + "record__data__leeftijd", + "record__data__kiemjaar", + ] + }, + }, + ], + }, + ], + }, + } + + with self.assertRaises(ConfigurationRunFailed) as command_error: + execute_single_step(TokenAuthConfigurationStep, object_source=object_source) + self.assertTrue( + "Object type with 69feca90-6c3d-4628-ace8-19e4b0ae4065 does not exist" + in str(command_error.exception) + ) + # Token was created without permissions + self.assertEqual(TokenAuth.objects.count(), 1) + self.assertEqual(Permission.objects.count(), 0) + + def test_invalid_permissions_mode_not_valid(self): + object_source = { + "tokenauth_config_enable": True, + "tokenauth": { + "items": [ + { + "identifier": "token-1", + "token": "ba9d233e95e04c4a8a661a27daffe7c9bd019067", + "contact_person": "Person 1", + "email": "person-1@example.com", + "organization": "Organization 1", + "application": "Application 1", + "administration": "Administration 1", + "permissions": [ + { + "object_type": "3a82fb7f-fc9b-4104-9804-993f639d6d0d", + "mode": "test", + "use_fields": True, + "fields": { + "1": [ + "record__data__leeftijd", + "record__data__kiemjaar", + ] + }, + }, + ], + }, + ], + }, + } + with self.assertRaises(PrerequisiteFailed) as command_error: + execute_single_step(TokenAuthConfigurationStep, object_source=object_source) + self.assertTrue( + "Input should be 'read_only' or 'read_and_write'" + in str(command_error.exception) + ) + self.assertEqual(TokenAuth.objects.count(), 0) + self.assertEqual(Permission.objects.count(), 0) + + def test_invalid_permissions_field_based_authorization(self): + object_source = { + "tokenauth_config_enable": True, + "tokenauth": { + "items": [ + { + "identifier": "token-1", + "token": "ba9d233e95e04c4a8a661a27daffe7c9bd019067", + "contact_person": "Person 1", + "email": "person-1@example.com", + "organization": "Organization 1", + "application": "Application 1", + "administration": "Administration 1", + "permissions": [ + { + "object_type": "3a82fb7f-fc9b-4104-9804-993f639d6d0d", + "mode": "read_and_write", + "use_fields": True, + "fields": { + "1": [ + "record__data__leeftijd", + "record__data__kiemjaar", + ] + }, + }, + ], + }, + ], + }, + } + with self.assertRaises(ConfigurationRunFailed) as command_error: + execute_single_step(TokenAuthConfigurationStep, object_source=object_source) + + self.assertTrue( + "Validation error(s) during instance cleaning" + in str(command_error.exception) + )