From 14d6196b4ab10fd62ce84560e146b9639c930a5e Mon Sep 17 00:00:00 2001 From: Rachid Mrad Date: Wed, 18 Dec 2024 18:07:43 -0500 Subject: [PATCH] lint --- .../tests/test_views_member_domains_json.py | 6 + src/registrar/tests/test_views_portfolio.py | 291 +++++++++++++++++- src/registrar/views/member_domains_json.py | 4 +- src/registrar/views/portfolios.py | 169 +++++----- 4 files changed, 395 insertions(+), 75 deletions(-) diff --git a/src/registrar/tests/test_views_member_domains_json.py b/src/registrar/tests/test_views_member_domains_json.py index c9f1e38cc..45b115842 100644 --- a/src/registrar/tests/test_views_member_domains_json.py +++ b/src/registrar/tests/test_views_member_domains_json.py @@ -94,6 +94,12 @@ def setUpClass(cls): DomainInvitation.objects.create( email=cls.invited_member_email, domain=cls.domain2, status=DomainInvitation.DomainInvitationStatus.INVITED ) + DomainInvitation.objects.create( + email=cls.invited_member_email, domain=cls.domain3, status=DomainInvitation.DomainInvitationStatus.CANCELED + ) + DomainInvitation.objects.create( + email=cls.invited_member_email, domain=cls.domain4, status=DomainInvitation.DomainInvitationStatus.RETRIEVED + ) @classmethod def tearDownClass(cls): diff --git a/src/registrar/tests/test_views_portfolio.py b/src/registrar/tests/test_views_portfolio.py index de27b7059..0ebb485c9 100644 --- a/src/registrar/tests/test_views_portfolio.py +++ b/src/registrar/tests/test_views_portfolio.py @@ -14,6 +14,7 @@ Suborganization, AllowedEmail, ) +from registrar.models.domain_invitation import DomainInvitation from registrar.models.portfolio_invitation import PortfolioInvitation from registrar.models.user_group import UserGroup from registrar.models.user_portfolio_permission import UserPortfolioPermission @@ -25,6 +26,7 @@ import boto3_mocking # type: ignore from django.test import Client import logging +import json logger = logging.getLogger(__name__) @@ -1927,7 +1929,7 @@ def setUpClass(cls): cls.portfolio = Portfolio.objects.create(creator=cls.user, organization_name="Test Portfolio") # Assign permissions to the user making requests - UserPortfolioPermission.objects.create( + cls.portfolio_permission = UserPortfolioPermission.objects.create( user=cls.user, portfolio=cls.portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], @@ -2106,10 +2108,15 @@ class TestPortfolioMemberDomainsEditView(TestPortfolioMemberDomainsView): @classmethod def setUpClass(cls): super().setUpClass() + cls.url = reverse("member-domains-edit", kwargs={"pk": cls.portfolio_permission.pk}) + names = ["1.gov", "2.gov", "3.gov"] + Domain.objects.bulk_create([Domain(name=name) for name in names]) @classmethod def tearDownClass(cls): super().tearDownClass() + UserDomainRole.objects.all().delete() + Domain.objects.all().delete() @less_console_noise_decorator @override_flag("organization_feature", active=True) @@ -2162,15 +2169,132 @@ def test_member_domains_edit_not_found(self): # Make sure the response is not found self.assertEqual(response.status_code, 404) + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_valid_added_domains(self): + """Test that domains can be successfully added.""" + self.client.force_login(self.user) + + data = { + "added_domains": json.dumps([1, 2, 3]), # Mock domain IDs + } + response = self.client.post(self.url, data) + + # Check that the UserDomainRole objects were created + self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 3) + + # Check for a success message and a redirect + self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.") + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_valid_removed_domains(self): + """Test that domains can be successfully removed.""" + self.client.force_login(self.user) + + # Create some UserDomainRole objects + domains = [1, 2, 3] + UserDomainRole.objects.bulk_create([UserDomainRole(domain_id=domain, user=self.user) for domain in domains]) + + data = { + "removed_domains": json.dumps([1, 2]), + } + response = self.client.post(self.url, data) + + # Check that the UserDomainRole objects were deleted + self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 1) + self.assertEqual(UserDomainRole.objects.filter(domain_id=3, user=self.user).count(), 1) + + # Check for a success message and a redirect + self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.") + + UserDomainRole.objects.all().delete() + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_invalid_added_domains_data(self): + """Test that an error is returned for invalid added domains data.""" + self.client.force_login(self.user) + + data = { + "added_domains": "json-statham", + } + response = self.client.post(self.url, data) + + # Check that no UserDomainRole objects were created + self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 0) + + # Check for an error message and a redirect + self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), "Invalid data for added domains.") + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_invalid_removed_domains_data(self): + """Test that an error is returned for invalid removed domains data.""" + self.client.force_login(self.user) + + data = { + "removed_domains": "not-a-json", + } + response = self.client.post(self.url, data) + + # Check that no UserDomainRole objects were deleted + self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 0) + + # Check for an error message and a redirect + self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), "Invalid data for removed domains.") + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_no_changes(self): + """Test that no changes message is displayed when no changes are made.""" + self.client.force_login(self.user) + + response = self.client.post(self.url, {}) + + # Check that no UserDomainRole objects were created or deleted + self.assertEqual(UserDomainRole.objects.filter(user=self.user).count(), 0) + + # Check for an info message and a redirect + self.assertRedirects(response, reverse("member-domains", kwargs={"pk": self.portfolio_permission.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), "No changes detected.") + class TestPortfolioInvitedMemberEditDomainsView(TestPortfolioInvitedMemberDomainsView): @classmethod def setUpClass(cls): super().setUpClass() + cls.url = reverse("invitedmember-domains-edit", kwargs={"pk": cls.invitation.pk}) + names = ["1.gov", "2.gov", "3.gov"] + Domain.objects.bulk_create([Domain(name=name) for name in names]) @classmethod def tearDownClass(cls): super().tearDownClass() + Domain.objects.all().delete() + + def tearDown(self): + return super().tearDown() + DomainInvitation.objects.all().delete() @less_console_noise_decorator @override_flag("organization_feature", active=True) @@ -2222,6 +2346,171 @@ def test_member_domains_edit_not_found(self): # Make sure the response is not found self.assertEqual(response.status_code, 404) + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_valid_added_domains(self): + """Test adding new domains successfully.""" + self.client.force_login(self.user) + + data = { + "added_domains": json.dumps([1, 2, 3]), # Mock domain IDs + } + response = self.client.post(self.url, data) + + # Check that the DomainInvitation objects were created + self.assertEqual( + DomainInvitation.objects.filter( + email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + ).count(), + 3, + ) + + # Check for a success message and a redirect + self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), "The domain assignment changes have been saved.") + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_existing_and_new_added_domains(self): + """Test updating existing and adding new invitations.""" + self.client.force_login(self.user) + + # Create existing invitations + DomainInvitation.objects.bulk_create( + [ + DomainInvitation( + domain_id=1, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.CANCELED + ), + DomainInvitation( + domain_id=2, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + ), + ] + ) + + data = { + "added_domains": json.dumps([1, 2, 3]), + } + response = self.client.post(self.url, data) + + # Check that status for domain_id=1 was updated to INVITED + self.assertEqual( + DomainInvitation.objects.get(domain_id=1, email="invited@example.com").status, + DomainInvitation.DomainInvitationStatus.INVITED, + ) + + # Check that domain_id=3 was created as INVITED + self.assertTrue( + DomainInvitation.objects.filter( + domain_id=3, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + ).exists() + ) + + # Check for a success message and a redirect + self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk})) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_valid_removed_domains(self): + """Test removing domains successfully.""" + self.client.force_login(self.user) + + # Create existing invitations + DomainInvitation.objects.bulk_create( + [ + DomainInvitation( + domain_id=1, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + ), + DomainInvitation( + domain_id=2, email="invited@example.com", status=DomainInvitation.DomainInvitationStatus.INVITED + ), + ] + ) + + data = { + "removed_domains": json.dumps([1]), + } + response = self.client.post(self.url, data) + + # Check that the status for domain_id=1 was updated to CANCELED + self.assertEqual( + DomainInvitation.objects.get(domain_id=1, email="invited@example.com").status, + DomainInvitation.DomainInvitationStatus.CANCELED, + ) + + # Check that domain_id=2 remains INVITED + self.assertEqual( + DomainInvitation.objects.get(domain_id=2, email="invited@example.com").status, + DomainInvitation.DomainInvitationStatus.INVITED, + ) + + # Check for a success message and a redirect + self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk})) + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_invalid_added_domains_data(self): + """Test handling of invalid JSON for added domains.""" + self.client.force_login(self.user) + + data = { + "added_domains": "not-a-json", + } + response = self.client.post(self.url, data) + + # Check that no DomainInvitation objects were created + self.assertEqual(DomainInvitation.objects.count(), 0) + + # Check for an error message and a redirect + self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), "Invalid data for added domains.") + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_invalid_removed_domains_data(self): + """Test handling of invalid JSON for removed domains.""" + self.client.force_login(self.user) + + data = { + "removed_domains": "json-sudeikis", + } + response = self.client.post(self.url, data) + + # Check that no DomainInvitation objects were updated + self.assertEqual(DomainInvitation.objects.count(), 0) + + # Check for an error message and a redirect + self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), "Invalid data for removed domains.") + + @less_console_noise_decorator + @override_flag("organization_feature", active=True) + @override_flag("organization_members", active=True) + def test_post_with_no_changes(self): + """Test the case where no changes are made.""" + self.client.force_login(self.user) + + response = self.client.post(self.url, {}) + + # Check that no DomainInvitation objects were created or updated + self.assertEqual(DomainInvitation.objects.count(), 0) + + # Check for an info message and a redirect + self.assertRedirects(response, reverse("invitedmember-domains", kwargs={"pk": self.invitation.pk})) + messages = list(response.wsgi_request._messages) + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), "No changes detected.") + class TestRequestingEntity(WebTest): """The requesting entity page is a domain request form that only exists diff --git a/src/registrar/views/member_domains_json.py b/src/registrar/views/member_domains_json.py index 007730166..3d24336bb 100644 --- a/src/registrar/views/member_domains_json.py +++ b/src/registrar/views/member_domains_json.py @@ -90,7 +90,9 @@ def get_domain_ids_from_request(self, request): domain_info_ids = DomainInformation.objects.filter(portfolio=portfolio).values_list( "domain_id", flat=True ) - domain_invitations = DomainInvitation.objects.filter(email=email, status=DomainInvitation.DomainInvitationStatus.INVITED).values_list("domain_id", flat=True) + domain_invitations = DomainInvitation.objects.filter( + email=email, status=DomainInvitation.DomainInvitationStatus.INVITED + ).values_list("domain_id", flat=True) return domain_info_ids.intersection(domain_invitations) else: domain_infos = DomainInformation.objects.filter(portfolio=portfolio) diff --git a/src/registrar/views/portfolios.py b/src/registrar/views/portfolios.py index 8482f2b01..043d1e4a3 100644 --- a/src/registrar/views/portfolios.py +++ b/src/registrar/views/portfolios.py @@ -219,7 +219,7 @@ def get(self, request, pk): "member": member, }, ) - + def post(self, request, pk): """ Handles adding and removing domains for a portfolio member. @@ -229,41 +229,23 @@ def post(self, request, pk): portfolio_permission = get_object_or_404(UserPortfolioPermission, pk=pk) member = portfolio_permission.user - try: - added_domain_ids = json.loads(added_domains) if added_domains else [] - except json.JSONDecodeError: - messages.error(request, "Invalid data for added domains.") + added_domain_ids = self._parse_domain_ids(added_domains, "added domains") + if added_domain_ids is None: return redirect(reverse("member-domains", kwargs={"pk": pk})) - try: - removed_domain_ids = json.loads(removed_domains) if removed_domains else [] - except json.JSONDecodeError: - messages.error(request, "Invalid data for removed domains.") + removed_domain_ids = self._parse_domain_ids(removed_domains, "removed domains") + if removed_domain_ids is None: return redirect(reverse("member-domains", kwargs={"pk": pk})) if added_domain_ids or removed_domain_ids: try: - if added_domain_ids: - # Bulk create UserDomainRole instances for added domains - UserDomainRole.objects.bulk_create( - [ - UserDomainRole(domain_id=domain_id, user=member) - for domain_id in added_domain_ids - ], - ignore_conflicts=True, # Avoid duplicate entries - ) - - if removed_domain_ids: - # Delete UserDomainRole instances for removed domains - UserDomainRole.objects.filter(domain_id__in=removed_domain_ids, user=member).delete() - + self._process_added_domains(added_domain_ids, member) + self._process_removed_domains(removed_domain_ids, member) messages.success(request, "The domain assignment changes have been saved.") return redirect(reverse("member-domains", kwargs={"pk": pk})) - except IntegrityError: messages.error(request, "A database error occurred while saving changes.") return redirect(reverse("member-domains-edit", kwargs={"pk": pk})) - except Exception as e: messages.error(request, f"An unexpected error occurred: {str(e)}") return redirect(reverse("member-domains-edit", kwargs={"pk": pk})) @@ -271,6 +253,34 @@ def post(self, request, pk): messages.info(request, "No changes detected.") return redirect(reverse("member-domains", kwargs={"pk": pk})) + def _parse_domain_ids(self, domain_data, domain_type): + """ + Parses the domain IDs from the request and handles JSON errors. + """ + try: + return json.loads(domain_data) if domain_data else [] + except json.JSONDecodeError: + messages.error(self.request, f"Invalid data for {domain_type}.") + return None + + def _process_added_domains(self, added_domain_ids, member): + """ + Processes added domains by bulk creating UserDomainRole instances. + """ + if added_domain_ids: + # Bulk create UserDomainRole instances for added domains + UserDomainRole.objects.bulk_create( + [UserDomainRole(domain_id=domain_id, user=member) for domain_id in added_domain_ids], + ignore_conflicts=True, # Avoid duplicate entries + ) + + def _process_removed_domains(self, removed_domain_ids, member): + """ + Processes removed domains by deleting corresponding UserDomainRole instances. + """ + if removed_domain_ids: + # Delete UserDomainRole instances for removed domains + UserDomainRole.objects.filter(domain_id__in=removed_domain_ids, user=member).delete() class PortfolioInvitedMemberView(PortfolioMemberPermissionView, View): @@ -396,72 +406,33 @@ def get(self, request, pk): "portfolio_invitation": portfolio_invitation, }, ) - + def post(self, request, pk): """ Handles adding and removing domains for a portfolio invitee. - - Instead of deleting DomainInvitations, we move their status to CANCELED. - Instead of adding DomainIncitations, we first do a lookup and if the invite exists - we change its status to INVITED, otherwise we do a create. """ added_domains = request.POST.get("added_domains") removed_domains = request.POST.get("removed_domains") portfolio_invitation = get_object_or_404(PortfolioInvitation, pk=pk) email = portfolio_invitation.email - try: - added_domain_ids = json.loads(added_domains) if added_domains else [] - except json.JSONDecodeError: - messages.error(request, "Invalid data for added domains.") - return redirect(reverse("member-domains", kwargs={"pk": pk})) + added_domain_ids = self._parse_domain_ids(added_domains, "added domains") + if added_domain_ids is None: + return redirect(reverse("invitedmember-domains", kwargs={"pk": pk})) - try: - removed_domain_ids = json.loads(removed_domains) if removed_domains else [] - except json.JSONDecodeError: - messages.error(request, "Invalid data for removed domains.") - return redirect(reverse("member-domains", kwargs={"pk": pk})) + removed_domain_ids = self._parse_domain_ids(removed_domains, "removed domains") + if removed_domain_ids is None: + return redirect(reverse("invitedmember-domains", kwargs={"pk": pk})) if added_domain_ids or removed_domain_ids: try: - if added_domain_ids: - # Get existing invitations for the added domains - existing_invitations = DomainInvitation.objects.filter( - domain_id__in=added_domain_ids, email=email - ) - - # Update existing invitations from CANCELED to INVITED - existing_invitations.update(status=DomainInvitation.DomainInvitationStatus.INVITED) - - # Determine which domains need new invitations - existing_domain_ids = existing_invitations.values_list("domain_id", flat=True) - new_domain_ids = set(added_domain_ids) - set(existing_domain_ids) - - # Bulk create new invitations for domains without existing invitations - DomainInvitation.objects.bulk_create( - [ - DomainInvitation( - domain_id=domain_id, - email=email, - status=DomainInvitation.DomainInvitationStatus.INVITED, - ) - for domain_id in new_domain_ids - ] - ) - - if removed_domain_ids: - # Bulk update invitations for removed domains from INVITED to CANCELED - DomainInvitation.objects.filter( - domain_id__in=removed_domain_ids, email=email, status=DomainInvitation.DomainInvitationStatus.INVITED - ).update(status=DomainInvitation.DomainInvitationStatus.CANCELED) - + self._process_added_domains(added_domain_ids, email) + self._process_removed_domains(removed_domain_ids, email) messages.success(request, "The domain assignment changes have been saved.") return redirect(reverse("invitedmember-domains", kwargs={"pk": pk})) - except IntegrityError: messages.error(request, "A database error occurred while saving changes.") return redirect(reverse("invitedmember-domains-edit", kwargs={"pk": pk})) - except Exception as e: messages.error(request, f"An unexpected error occurred: {str(e)}") return redirect(reverse("invitedmember-domains-edit", kwargs={"pk": pk})) @@ -469,6 +440,58 @@ def post(self, request, pk): messages.info(request, "No changes detected.") return redirect(reverse("invitedmember-domains", kwargs={"pk": pk})) + def _parse_domain_ids(self, domain_data, domain_type): + """ + Parses the domain IDs from the request and handles JSON errors. + """ + try: + return json.loads(domain_data) if domain_data else [] + except json.JSONDecodeError: + messages.error(self.request, f"Invalid data for {domain_type}.") + return None + + def _process_added_domains(self, added_domain_ids, email): + """ + Processes added domain invitations by updating existing invitations + or creating new ones. + """ + if not added_domain_ids: + return + + # Update existing invitations from CANCELED to INVITED + existing_invitations = DomainInvitation.objects.filter(domain_id__in=added_domain_ids, email=email) + existing_invitations.update(status=DomainInvitation.DomainInvitationStatus.INVITED) + + # Determine which domains need new invitations + existing_domain_ids = existing_invitations.values_list("domain_id", flat=True) + new_domain_ids = set(added_domain_ids) - set(existing_domain_ids) + + # Bulk create new invitations + DomainInvitation.objects.bulk_create( + [ + DomainInvitation( + domain_id=domain_id, + email=email, + status=DomainInvitation.DomainInvitationStatus.INVITED, + ) + for domain_id in new_domain_ids + ] + ) + + def _process_removed_domains(self, removed_domain_ids, email): + """ + Processes removed domain invitations by updating their status to CANCELED. + """ + if not removed_domain_ids: + return + + # Update invitations from INVITED to CANCELED + DomainInvitation.objects.filter( + domain_id__in=removed_domain_ids, + email=email, + status=DomainInvitation.DomainInvitationStatus.INVITED, + ).update(status=DomainInvitation.DomainInvitationStatus.CANCELED) + class PortfolioNoDomainsView(NoPortfolioDomainsPermissionView, View): """Some users have access to the underlying portfolio, but not any domains.