diff --git a/docs/developer/README.md b/docs/developer/README.md index 358df649c..9ddb35352 100644 --- a/docs/developer/README.md +++ b/docs/developer/README.md @@ -97,6 +97,7 @@ While on production (the sandbox referred to as `stable`), an existing analyst o "username": "", "first_name": "", "last_name": "", + "email": "", }, ... ] @@ -121,6 +122,7 @@ Analysts are a variant of the admin role with limited permissions. The process f "username": "", "first_name": "", "last_name": "", + "email": "", }, ... ] @@ -131,6 +133,20 @@ Analysts are a variant of the admin role with limited permissions. The process f Do note that if you wish to have both an analyst and admin account, append `-Analyst` to your first and last name, or use a completely different first/last name to avoid confusion. Example: `Bob-Analyst` +## Adding an email address to the email whitelist (sandboxes only) +On all non-production environments, we use an email whitelist table (called `Allowed emails`). This whitelist is not case sensitive, and it provides an inclusion for +1 emails (like example.person+1@igorville.gov). The content after the `+` can be any _digit_. The whitelist checks for the "base" email (example.person) so even if you only have the +1 email defined, an email will still be sent assuming that it follows those conventions. + +To add yourself to this, you can go about it in three ways. + +Permanent (all sandboxes): +1. In src/registrar/fixtures_users.py, add the "email" field to your user in either the ADMIN or STAFF table. +2. In src/registrar/fixtures_users.py, add the desired email address to the `ADDITIONAL_ALLOWED_EMAILS` list. This route is suggested for product. + +Sandbox specific (wiped when the db is reset): +3. Create a new record on the `Allowed emails` table with your email address. This can be done through django admin. + +More detailed instructions regarding #3 can be found [here](https://docs.google.com/document/d/1ebIz4PcUuoiT7LlVy83EAyHAk_nWPEc99neMp4QjzDs). + ## Adding to CODEOWNERS (optional) The CODEOWNERS file sets the tagged individuals as default reviewers on any Pull Request that changes files that they are marked as owners of. diff --git a/src/registrar/admin.py b/src/registrar/admin.py index 11a41a22d..6b42cf96b 100644 --- a/src/registrar/admin.py +++ b/src/registrar/admin.py @@ -7,6 +7,7 @@ from django.db.models import Value, CharField, Q from django.db.models.functions import Concat, Coalesce from django.http import HttpResponseRedirect +from django.conf import settings from django.shortcuts import redirect from django_fsm import get_available_FIELD_transitions, FSMField from registrar.models.domain_information import DomainInformation @@ -1965,6 +1966,9 @@ def save_model(self, request, obj, form, change): else: obj.action_needed_reason_email = default_email + if obj.status in DomainRequest.get_statuses_that_send_emails() and not settings.IS_PRODUCTION: + self._check_for_valid_email(request, obj) + # == Handle status == # if obj.status == original_obj.status: # If the status hasn't changed, let the base function take care of it @@ -1977,6 +1981,29 @@ def save_model(self, request, obj, form, change): if should_save: return super().save_model(request, obj, form, change) + def _check_for_valid_email(self, request, obj): + """Certain emails are whitelisted in non-production environments, + so we should display that information using this function. + + """ + + # TODO 2574: remove lines 1977-1978 (refactor as needed) + profile_flag = flag_is_active(request, "profile_feature") + if profile_flag and hasattr(obj, "creator"): + recipient = obj.creator + elif not profile_flag and hasattr(obj, "submitter"): + recipient = obj.submitter + else: + recipient = None + + # Displays a warning in admin when an email cannot be sent + if recipient and recipient.email: + email = recipient.email + allowed = models.AllowedEmail.is_allowed_email(email) + error_message = f"Could not send email. The email '{email}' does not exist within the whitelist." + if not allowed: + messages.warning(request, error_message) + def _handle_status_change(self, request, obj, original_obj): """ Checks for various conditions when a status change is triggered. @@ -3253,6 +3280,16 @@ def change_view(self, request, object_id, form_url="", extra_context=None): return super().change_view(request, object_id, form_url, extra_context) +class AllowedEmailAdmin(ListHeaderAdmin): + class Meta: + model = models.AllowedEmail + + list_display = ["email"] + search_fields = ["email"] + search_help_text = "Search by email." + ordering = ["email"] + + admin.site.unregister(LogEntry) # Unregister the default registration admin.site.register(LogEntry, CustomLogEntryAdmin) @@ -3281,6 +3318,7 @@ def change_view(self, request, object_id, form_url="", extra_context=None): admin.site.register(models.Suborganization, SuborganizationAdmin) admin.site.register(models.SeniorOfficial, SeniorOfficialAdmin) admin.site.register(models.UserPortfolioPermission, UserPortfolioPermissionAdmin) +admin.site.register(models.AllowedEmail, AllowedEmailAdmin) # Register our custom waffle implementations admin.site.register(models.WaffleFlag, WaffleFlagAdmin) diff --git a/src/registrar/fixtures_users.py b/src/registrar/fixtures_users.py index 0fc203248..8be3e13a2 100644 --- a/src/registrar/fixtures_users.py +++ b/src/registrar/fixtures_users.py @@ -6,6 +6,7 @@ User, UserGroup, ) +from registrar.models.allowed_email import AllowedEmail fake = Faker() @@ -32,6 +33,7 @@ class UserFixture: "username": "aad084c3-66cc-4632-80eb-41cdf5c5bcbf", "first_name": "Aditi", "last_name": "Green", + "email": "aditidevelops+01@gmail.com", }, { "username": "be17c826-e200-4999-9389-2ded48c43691", @@ -42,11 +44,13 @@ class UserFixture: "username": "5f283494-31bd-49b5-b024-a7e7cae00848", "first_name": "Rachid", "last_name": "Mrad", + "email": "rachid.mrad@associates.cisa.dhs.gov", }, { "username": "eb2214cd-fc0c-48c0-9dbd-bc4cd6820c74", "first_name": "Alysia", "last_name": "Broddrick", + "email": "abroddrick@truss.works", }, { "username": "8f8e7293-17f7-4716-889b-1990241cbd39", @@ -63,6 +67,7 @@ class UserFixture: "username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c", "first_name": "Cameron", "last_name": "Dixon", + "email": "cameron.dixon@cisa.dhs.gov", }, { "username": "0353607a-cbba-47d2-98d7-e83dcd5b90ea", @@ -83,16 +88,19 @@ class UserFixture: "username": "2a88a97b-be96-4aad-b99e-0b605b492c78", "first_name": "Rebecca", "last_name": "Hsieh", + "email": "rebecca.hsieh@truss.works", }, { "username": "fa69c8e8-da83-4798-a4f2-263c9ce93f52", "first_name": "David", "last_name": "Kennedy", + "email": "david.kennedy@ecstech.com", }, { "username": "f14433d8-f0e9-41bf-9c72-b99b110e665d", "first_name": "Nicolle", "last_name": "LeClair", + "email": "nicolle.leclair@ecstech.com", }, { "username": "24840450-bf47-4d89-8aa9-c612fe68f9da", @@ -141,6 +149,7 @@ class UserFixture: "username": "ffec5987-aa84-411b-a05a-a7ee5cbcde54", "first_name": "Aditi-Analyst", "last_name": "Green-Analyst", + "email": "aditidevelops+02@gmail.com", }, { "username": "d6bf296b-fac5-47ff-9c12-f88ccc5c1b99", @@ -183,6 +192,7 @@ class UserFixture: "username": "5dc6c9a6-61d9-42b4-ba54-4beff28bac3c", "first_name": "David-Analyst", "last_name": "Kennedy-Analyst", + "email": "david.kennedy@associates.cisa.dhs.gov", }, { "username": "0eb6f326-a3d4-410f-a521-aa4c1fad4e47", @@ -194,7 +204,7 @@ class UserFixture: "username": "cfe7c2fc-e24a-480e-8b78-28645a1459b3", "first_name": "Nicolle-Analyst", "last_name": "LeClair-Analyst", - "email": "nicolle.leclair@ecstech.com", + "email": "nicolle.leclair@gmail.com", }, { "username": "378d0bc4-d5a7-461b-bd84-3ae6f6864af9", @@ -240,6 +250,9 @@ class UserFixture: }, ] + # Additional emails to add to the AllowedEmail whitelist. + ADDITIONAL_ALLOWED_EMAILS: list[str] = ["davekenn4242@gmail.com", "rachid_mrad@hotmail.com"] + def load_users(cls, users, group_name, are_superusers=False): logger.info(f"Going to load {len(users)} users in group {group_name}") for user_data in users: @@ -264,6 +277,32 @@ def load_users(cls, users, group_name, are_superusers=False): logger.warning(e) logger.info(f"All users in group {group_name} loaded.") + def load_allowed_emails(cls, users, additional_emails): + """Populates a whitelist of allowed emails (as defined in this list)""" + logger.info(f"Going to load allowed emails for {len(users)} users") + if additional_emails: + logger.info(f"Going to load {len(additional_emails)} additional allowed emails") + + # Load user emails + allowed_emails = [] + for user_data in users: + user_email = user_data.get("email") + if user_email and user_email not in allowed_emails: + allowed_emails.append(AllowedEmail(email=user_email)) + else: + first_name = user_data.get("first_name") + last_name = user_data.get("last_name") + logger.warning(f"Could not add email to whitelist for {first_name} {last_name}.") + + # Load additional emails + allowed_emails.extend(additional_emails) + + if allowed_emails: + AllowedEmail.objects.bulk_create(allowed_emails) + logger.info(f"Loaded {len(allowed_emails)} allowed emails") + else: + logger.info("No allowed emails to load") + @classmethod def load(cls): # Lumped under .atomic to ensure we don't make redundant DB calls. @@ -275,3 +314,7 @@ def load(cls): with transaction.atomic(): cls.load_users(cls, cls.ADMINS, "full_access_group", are_superusers=True) cls.load_users(cls, cls.STAFF, "cisa_analysts_group") + + # Combine ADMINS and STAFF lists + all_users = cls.ADMINS + cls.STAFF + cls.load_allowed_emails(cls, all_users, additional_emails=cls.ADDITIONAL_ALLOWED_EMAILS) diff --git a/src/registrar/migrations/0121_allowedemail.py b/src/registrar/migrations/0121_allowedemail.py new file mode 100644 index 000000000..ebed1ac15 --- /dev/null +++ b/src/registrar/migrations/0121_allowedemail.py @@ -0,0 +1,25 @@ +# Generated by Django 4.2.10 on 2024-08-29 18:16 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("registrar", "0120_add_domainrequest_submission_dates"), + ] + + operations = [ + migrations.CreateModel( + name="AllowedEmail", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ("email", models.EmailField(max_length=320, unique=True)), + ], + options={ + "abstract": False, + }, + ), + ] diff --git a/src/registrar/migrations/0122_create_groups_v16.py b/src/registrar/migrations/0122_create_groups_v16.py new file mode 100644 index 000000000..82c750976 --- /dev/null +++ b/src/registrar/migrations/0122_create_groups_v16.py @@ -0,0 +1,37 @@ +# This migration creates the create_full_access_group and create_cisa_analyst_group groups +# It is dependent on 0079 (which populates federal agencies) +# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS +# in the user_group model then: +# [NOT RECOMMENDED] +# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions +# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups +# step 3: fake run the latest migration in the migrations list +# [RECOMMENDED] +# Alternatively: +# step 1: duplicate the migration that loads data +# step 2: docker-compose exec app ./manage.py migrate + +from django.db import migrations +from registrar.models import UserGroup +from typing import Any + + +# For linting: RunPython expects a function reference, +# so let's give it one +def create_groups(apps, schema_editor) -> Any: + UserGroup.create_cisa_analyst_group(apps, schema_editor) + UserGroup.create_full_access_group(apps, schema_editor) + + +class Migration(migrations.Migration): + dependencies = [ + ("registrar", "0121_allowedemail"), + ] + + operations = [ + migrations.RunPython( + create_groups, + reverse_code=migrations.RunPython.noop, + atomic=True, + ), + ] diff --git a/src/registrar/models/__init__.py b/src/registrar/models/__init__.py index c1023cafe..a1738cc76 100644 --- a/src/registrar/models/__init__.py +++ b/src/registrar/models/__init__.py @@ -22,6 +22,7 @@ from .suborganization import Suborganization from .senior_official import SeniorOfficial from .user_portfolio_permission import UserPortfolioPermission +from .allowed_email import AllowedEmail __all__ = [ @@ -48,6 +49,7 @@ "Suborganization", "SeniorOfficial", "UserPortfolioPermission", + "AllowedEmail", ] auditlog.register(Contact) @@ -73,3 +75,4 @@ auditlog.register(Suborganization) auditlog.register(SeniorOfficial) auditlog.register(UserPortfolioPermission) +auditlog.register(AllowedEmail) diff --git a/src/registrar/models/allowed_email.py b/src/registrar/models/allowed_email.py new file mode 100644 index 000000000..6622bcc55 --- /dev/null +++ b/src/registrar/models/allowed_email.py @@ -0,0 +1,52 @@ +from django.db import models +from django.db.models import Q +import re +from .utility.time_stamped_model import TimeStampedModel + + +class AllowedEmail(TimeStampedModel): + """ + AllowedEmail is a whitelist for email addresses that we can send to + in non-production environments. + """ + + email = models.EmailField( + unique=True, + null=False, + blank=False, + max_length=320, + ) + + @classmethod + def is_allowed_email(cls, email): + """Given an email, check if this email exists within our AllowEmail whitelist""" + + if not email: + return False + + # Split the email into a local part and a domain part + local, domain = email.split("@") + + # If the email exists within the whitelist, then do nothing else. + email_exists = cls.objects.filter(email__iexact=email).exists() + if email_exists: + return True + + # Check if there's a '+' in the local part + if "+" in local: + base_local = local.split("+")[0] + base_email_exists = cls.objects.filter(Q(email__iexact=f"{base_local}@{domain}")).exists() + + # Given an example email, such as "joe.smoe+1@igorville.com" + # The full regex statement will be: "^joe.smoe\\+\\d+@igorville.com$" + pattern = f"^{re.escape(base_local)}\\+\\d+@{re.escape(domain)}$" + return base_email_exists and re.match(pattern, email) + else: + # Edge case, the +1 record exists but the base does not, + # and the record we are checking is the base record. + pattern = f"^{re.escape(local)}\\+\\d+@{re.escape(domain)}$" + plus_email_exists = cls.objects.filter(Q(email__iregex=pattern)).exists() + return plus_email_exists + + def __str__(self): + return str(self.email) diff --git a/src/registrar/models/domain_request.py b/src/registrar/models/domain_request.py index 7ee80e43a..b80e063cd 100644 --- a/src/registrar/models/domain_request.py +++ b/src/registrar/models/domain_request.py @@ -594,6 +594,12 @@ def get_action_needed_reason_label(cls, action_needed_reason: str): blank=True, ) + @classmethod + def get_statuses_that_send_emails(cls): + """Returns a list of statuses that send an email to the user""" + excluded_statuses = [cls.DomainRequestStatus.INELIGIBLE, cls.DomainRequestStatus.IN_REVIEW] + return [status for status in cls.DomainRequestStatus if status not in excluded_statuses] + def sync_organization_type(self): """ Updates the organization_type (without saving) to match diff --git a/src/registrar/templates/admin/model_descriptions.html b/src/registrar/templates/admin/model_descriptions.html index 4b61e21bd..9f13245fe 100644 --- a/src/registrar/templates/admin/model_descriptions.html +++ b/src/registrar/templates/admin/model_descriptions.html @@ -32,6 +32,8 @@ {% include "django/admin/includes/descriptions/website_description.html" %} {% elif opts.model_name == 'portfolioinvitation' %} {% include "django/admin/includes/descriptions/portfolio_invitation_description.html" %} + {% elif opts.model_name == 'allowedemail' %} + {% include "django/admin/includes/descriptions/allowed_email_description.html" %} {% else %}

This table does not have a description yet.

{% endif %} diff --git a/src/registrar/templates/django/admin/includes/descriptions/allowed_email_description.html b/src/registrar/templates/django/admin/includes/descriptions/allowed_email_description.html new file mode 100644 index 000000000..5ec5a4906 --- /dev/null +++ b/src/registrar/templates/django/admin/includes/descriptions/allowed_email_description.html @@ -0,0 +1,6 @@ +

This table is an email whitelist for non-production environments.

+

+ If an email is sent out and the email does not exist within this table (or is not a subset of it), + then no email will be sent. +

+

If this table is populated in a production environment, no change will occur as it will simply be ignored.

diff --git a/src/registrar/tests/test_admin_request.py b/src/registrar/tests/test_admin_request.py index b1169a9ef..15cc18404 100644 --- a/src/registrar/tests/test_admin_request.py +++ b/src/registrar/tests/test_admin_request.py @@ -23,6 +23,7 @@ Website, SeniorOfficial, Portfolio, + AllowedEmail, ) from .common import ( MockSESClient, @@ -70,6 +71,8 @@ def setUpClass(self): model=DomainRequest, ) self.mock_client = MockSESClient() + allowed_emails = [AllowedEmail(email="mayor@igorville.gov"), AllowedEmail(email="help@get.gov")] + AllowedEmail.objects.bulk_create(allowed_emails) def tearDown(self): super().tearDown() @@ -86,6 +89,7 @@ def tearDown(self): def tearDownClass(self): super().tearDownClass() User.objects.all().delete() + AllowedEmail.objects.all().delete() @less_console_noise_decorator def test_domain_request_senior_official_is_alphabetically_sorted(self): @@ -599,7 +603,8 @@ def transition_state_and_send_email( ): """Helper method for the email test cases.""" - with boto3_mocking.clients.handler_for("sesv2", self.mock_client): + with boto3_mocking.clients.handler_for("sesv2", self.mock_client), ExitStack() as stack: + stack.enter_context(patch.object(messages, "warning")) # Create a mock request request = self.factory.post("/admin/registrar/domainrequest/{}/change/".format(domain_request.pk)) @@ -626,7 +631,8 @@ def assert_email_is_accurate( ): """Helper method for the email test cases. email_index is the index of the email in mock_client.""" - + AllowedEmail.objects.get_or_create(email=email_address) + AllowedEmail.objects.get_or_create(email=bcc_email_address) with less_console_noise(): # Access the arguments passed to send_email call_args = self.mock_client.EMAILS_SENT @@ -1174,6 +1180,7 @@ def test_transition_to_rejected_without_rejection_reason_does_trigger_error(self with ExitStack() as stack: stack.enter_context(patch.object(messages, "error")) + stack.enter_context(patch.object(messages, "warning")) domain_request.status = DomainRequest.DomainRequestStatus.REJECTED self.admin.save_model(request, domain_request, None, True) @@ -1202,6 +1209,7 @@ def test_transition_to_rejected_with_rejection_reason_does_not_trigger_error(sel with ExitStack() as stack: stack.enter_context(patch.object(messages, "error")) + stack.enter_context(patch.object(messages, "warning")) domain_request.status = DomainRequest.DomainRequestStatus.REJECTED domain_request.rejection_reason = DomainRequest.RejectionReasons.CONTACTS_OR_ORGANIZATION_LEGITIMACY @@ -1253,11 +1261,13 @@ def test_save_model_sets_approved_domain(self): request = self.factory.post("/admin/registrar/domainrequest/{}/change/".format(domain_request.pk)) with boto3_mocking.clients.handler_for("sesv2", self.mock_client): - # Modify the domain request's property - domain_request.status = DomainRequest.DomainRequestStatus.APPROVED + with ExitStack() as stack: + stack.enter_context(patch.object(messages, "warning")) + # Modify the domain request's property + domain_request.status = DomainRequest.DomainRequestStatus.APPROVED - # Use the model admin's save_model method - self.admin.save_model(request, domain_request, form=None, change=True) + # Use the model admin's save_model method + self.admin.save_model(request, domain_request, form=None, change=True) # Test that approved domain exists and equals requested domain self.assertEqual(domain_request.requested_domain.name, domain_request.approved_domain.name) @@ -1717,6 +1727,8 @@ def custom_is_active(self): # Patch Domain.is_active and django.contrib.messages.error simultaneously stack.enter_context(patch.object(Domain, "is_active", custom_is_active)) stack.enter_context(patch.object(messages, "error")) + stack.enter_context(patch.object(messages, "warning")) + stack.enter_context(patch.object(messages, "success")) domain_request.status = another_state diff --git a/src/registrar/tests/test_emails.py b/src/registrar/tests/test_emails.py index 8cf707004..e699d9b75 100644 --- a/src/registrar/tests/test_emails.py +++ b/src/registrar/tests/test_emails.py @@ -2,11 +2,12 @@ from unittest.mock import MagicMock -from django.test import TestCase +from django.test import TestCase, override_settings from waffle.testutils import override_flag from registrar.utility import email from registrar.utility.email import send_templated_email from .common import completed_domain_request +from registrar.models import AllowedEmail from api.tests.common import less_console_noise_decorator from datetime import datetime @@ -14,10 +15,33 @@ class TestEmails(TestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + allowed_emails = [ + AllowedEmail(email="doesnotexist@igorville.com"), + AllowedEmail(email="testy@town.com"), + AllowedEmail(email="mayor@igorville.gov"), + AllowedEmail(email="testy2@town.com"), + AllowedEmail(email="cisaRep@igorville.gov"), + AllowedEmail(email="sender@example.com"), + AllowedEmail(email="recipient@example.com"), + ] + AllowedEmail.objects.bulk_create(allowed_emails) + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + AllowedEmail.objects.all().delete() + def setUp(self): self.mock_client_class = MagicMock() self.mock_client = self.mock_client_class.return_value + def tearDown(self): + super().tearDown() + @boto3_mocking.patching @override_flag("disable_email_sending", active=True) @less_console_noise_decorator @@ -235,3 +259,59 @@ def test_send_email_with_attachment(self): self.assertIn("Content-Transfer-Encoding: base64", call_args["RawMessage"]["Data"]) self.assertIn("Content-Disposition: attachment;", call_args["RawMessage"]["Data"]) self.assertNotIn("Attachment file content", call_args["RawMessage"]["Data"]) + + +class TestAllowedEmail(TestCase): + """Tests our allowed email whitelist""" + + def setUp(self): + self.mock_client_class = MagicMock() + self.mock_client = self.mock_client_class.return_value + self.email = "mayor@igorville.gov" + self.email_2 = "cake@igorville.gov" + self.plus_email = "mayor+1@igorville.gov" + self.invalid_plus_email = "1+mayor@igorville.gov" + + def tearDown(self): + super().tearDown() + AllowedEmail.objects.all().delete() + + @boto3_mocking.patching + @override_settings(IS_PRODUCTION=True) + @less_console_noise_decorator + def test_email_whitelist_disabled_in_production(self): + """Tests if the whitelist is disabled in production""" + + # Ensure that the given email isn't in the whitelist + is_in_whitelist = AllowedEmail.objects.filter(email=self.email).exists() + self.assertFalse(is_in_whitelist) + + # The submit should work as normal + domain_request = completed_domain_request(has_anything_else=False) + with boto3_mocking.clients.handler_for("sesv2", self.mock_client_class): + domain_request.submit() + _, kwargs = self.mock_client.send_email.call_args + body = kwargs["Content"]["Simple"]["Body"]["Text"]["Data"] + self.assertNotIn("Anything else", body) + # spacing should be right between adjacent elements + self.assertRegex(body, r"5557\n\n----") + + @boto3_mocking.patching + @override_settings(IS_PRODUCTION=False) + @less_console_noise_decorator + def test_email_whitelist(self): + """Tests the email whitelist is enabled elsewhere""" + with boto3_mocking.clients.handler_for("sesv2", self.mock_client_class): + expected_message = "Could not send email. " + "The email 'doesnotexist@igorville.com' does not exist within the whitelist." + with self.assertRaisesRegex(email.EmailSendingError, expected_message): + send_templated_email( + "test content", + "test subject", + "doesnotexist@igorville.com", + context={"domain_request": self}, + bcc_address=None, + ) + + # Assert that an email wasn't sent + self.assertFalse(self.mock_client.send_email.called) diff --git a/src/registrar/tests/test_models.py b/src/registrar/tests/test_models.py index 9f55fced1..3a1b21ee2 100644 --- a/src/registrar/tests/test_models.py +++ b/src/registrar/tests/test_models.py @@ -18,6 +18,7 @@ UserDomainRole, FederalAgency, UserPortfolioPermission, + AllowedEmail, ) import boto3_mocking @@ -236,7 +237,7 @@ def check_email_sent( self, domain_request, msg, action, expected_count, expected_content=None, expected_email="mayor@igorville.com" ): """Check if an email was sent after performing an action.""" - + email_allowed, _ = AllowedEmail.objects.get_or_create(email=expected_email) with self.subTest(msg=msg, action=action): with boto3_mocking.clients.handler_for("sesv2", self.mock_client): # Perform the specified action @@ -255,6 +256,8 @@ def check_email_sent( email_content = sent_emails[0]["kwargs"]["Content"]["Simple"]["Body"]["Text"]["Data"] self.assertIn(expected_content, email_content) + email_allowed.delete() + @override_flag("profile_feature", active=False) @less_console_noise_decorator def test_submit_from_started_sends_email(self): @@ -2445,3 +2448,103 @@ def test_can_add_urbanization_field(self): self.assertEqual(portfolio.urbanization, "test123") self.assertEqual(portfolio.state_territory, DomainRequest.StateTerritoryChoices.PUERTO_RICO) + + +class TestAllowedEmail(TestCase): + """Tests our allowed email whitelist""" + + @less_console_noise_decorator + def setUp(self): + self.email = "mayor@igorville.gov" + self.email_2 = "cake@igorville.gov" + self.plus_email = "mayor+1@igorville.gov" + self.invalid_plus_email = "1+mayor@igorville.gov" + + def tearDown(self): + super().tearDown() + AllowedEmail.objects.all().delete() + + def test_email_in_whitelist(self): + """Test for a normal email defined in the whitelist""" + AllowedEmail.objects.create(email=self.email) + is_allowed = AllowedEmail.is_allowed_email(self.email) + self.assertTrue(is_allowed) + + def test_email_not_in_whitelist(self): + """Test for a normal email NOT defined in the whitelist""" + # Check a email not in the list + is_allowed = AllowedEmail.is_allowed_email(self.email_2) + self.assertFalse(AllowedEmail.objects.filter(email=self.email_2).exists()) + self.assertFalse(is_allowed) + + def test_plus_email_in_whitelist(self): + """Test for a +1 email defined in the whitelist""" + AllowedEmail.objects.create(email=self.plus_email) + plus_email_allowed = AllowedEmail.is_allowed_email(self.plus_email) + self.assertTrue(plus_email_allowed) + + def test_plus_email_not_in_whitelist(self): + """Test for a +1 email not defined in the whitelist""" + # This email should not be allowed. + # Checks that we do more than just a regex check on the record. + plus_email_allowed = AllowedEmail.is_allowed_email(self.plus_email) + self.assertFalse(plus_email_allowed) + + def test_plus_email_not_in_whitelist_but_base_email_is(self): + """ + Test for a +1 email NOT defined in the whitelist, but the normal one is defined. + Example: + normal (in whitelist) - joe@igorville.com + +1 email (not in whitelist) - joe+1@igorville.com + """ + AllowedEmail.objects.create(email=self.email) + base_email_allowed = AllowedEmail.is_allowed_email(self.email) + self.assertTrue(base_email_allowed) + + # The plus email should also be allowed + plus_email_allowed = AllowedEmail.is_allowed_email(self.plus_email) + self.assertTrue(plus_email_allowed) + + # This email shouldn't exist in the DB + self.assertFalse(AllowedEmail.objects.filter(email=self.plus_email).exists()) + + def test_plus_email_in_whitelist_but_base_email_is_not(self): + """ + Test for a +1 email defined in the whitelist, but the normal is NOT defined. + Example: + normal (not in whitelist) - joe@igorville.com + +1 email (in whitelist) - joe+1@igorville.com + """ + AllowedEmail.objects.create(email=self.plus_email) + plus_email_allowed = AllowedEmail.is_allowed_email(self.plus_email) + self.assertTrue(plus_email_allowed) + + # The base email should also be allowed + base_email_allowed = AllowedEmail.is_allowed_email(self.email) + self.assertTrue(base_email_allowed) + + # This email shouldn't exist in the DB + self.assertFalse(AllowedEmail.objects.filter(email=self.email).exists()) + + def test_invalid_regex_for_plus_email(self): + """ + Test for an invalid email that contains a '+'. + This base email should still pass, but the regex rule should not. + + Our regex should only pass for emails that end with a '+' + Example: + Invalid email - 1+joe@igorville.com + Valid email: - joe+1@igorville.com + """ + AllowedEmail.objects.create(email=self.invalid_plus_email) + invalid_plus_email = AllowedEmail.is_allowed_email(self.invalid_plus_email) + # We still expect that this will pass, it exists in the db + self.assertTrue(invalid_plus_email) + + # The base email SHOULD NOT pass, as it doesn't match our regex + base_email = AllowedEmail.is_allowed_email(self.email) + self.assertFalse(base_email) + + # For good measure, also check the other plus email + regular_plus_email = AllowedEmail.is_allowed_email(self.plus_email) + self.assertFalse(regular_plus_email) diff --git a/src/registrar/tests/test_views_domain.py b/src/registrar/tests/test_views_domain.py index 5ac24fd69..273adfba0 100644 --- a/src/registrar/tests/test_views_domain.py +++ b/src/registrar/tests/test_views_domain.py @@ -27,6 +27,7 @@ Domain, DomainInformation, DomainInvitation, + AllowedEmail, Contact, PublicContact, Host, @@ -349,6 +350,22 @@ def test_domain_readonly_on_detail_page(self): class TestDomainManagers(TestDomainOverview): + @classmethod + def setUpClass(cls): + super().setUpClass() + allowed_emails = [ + AllowedEmail(email=""), + AllowedEmail(email="testy@town.com"), + AllowedEmail(email="mayor@igorville.gov"), + AllowedEmail(email="testy2@town.com"), + ] + AllowedEmail.objects.bulk_create(allowed_emails) + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + AllowedEmail.objects.all().delete() + def tearDown(self): """Ensure that the user has its original permissions""" super().tearDown() @@ -465,6 +482,7 @@ def test_domain_invitation_email_sent(self): """Inviting a non-existent user sends them an email.""" # make sure there is no user with this email email_address = "mayor@igorville.gov" + allowed_email, _ = AllowedEmail.objects.get_or_create(email=email_address) User.objects.filter(email=email_address).delete() self.domain_information, _ = DomainInformation.objects.get_or_create(creator=self.user, domain=self.domain) @@ -484,6 +502,7 @@ def test_domain_invitation_email_sent(self): Destination={"ToAddresses": [email_address]}, Content=ANY, ) + allowed_email.delete() @boto3_mocking.patching @less_console_noise_decorator @@ -569,6 +588,7 @@ def test_domain_invitation_email_has_email_as_requestor_staff(self): """Inviting a user sends them an email, with email as the name.""" # Create a fake user object email_address = "mayor@igorville.gov" + AllowedEmail.objects.get_or_create(email=email_address) User.objects.get_or_create(email=email_address, username="fakeuser@fakeymail.com") # Make sure the user is staff diff --git a/src/registrar/utility/email.py b/src/registrar/utility/email.py index e274893a2..1fe1be596 100644 --- a/src/registrar/utility/email.py +++ b/src/registrar/utility/email.py @@ -4,6 +4,7 @@ import logging import textwrap from datetime import datetime +from django.apps import apps from django.conf import settings from django.template.loader import get_template from email.mime.application import MIMEApplication @@ -27,7 +28,7 @@ def send_templated_email( to_address: str, bcc_address="", context={}, - attachment_file: str = None, + attachment_file=None, wrap_email=False, ): """Send an email built from a template to one email address. @@ -39,9 +40,11 @@ def send_templated_email( Raises EmailSendingError if SES client could not be accessed """ - if flag_is_active(None, "disable_email_sending") and not settings.IS_PRODUCTION: # type: ignore - message = "Could not send email. Email sending is disabled due to flag 'disable_email_sending'." - raise EmailSendingError(message) + if not settings.IS_PRODUCTION: # type: ignore + # Split into a function: C901 'send_templated_email' is too complex. + # Raises an error if we cannot send an email (due to restrictions). + # Does nothing otherwise. + _can_send_email(to_address, bcc_address) template = get_template(template_name) email_body = template.render(context=context) @@ -71,7 +74,7 @@ def send_templated_email( destination["BccAddresses"] = [bcc_address] try: - if attachment_file is None: + if not attachment_file: # Wrap the email body to a maximum width of 80 characters per line. # Not all email clients support CSS to do this, and our .txt files require parsing. if wrap_email: @@ -102,6 +105,24 @@ def send_templated_email( raise EmailSendingError("Could not send SES email.") from exc +def _can_send_email(to_address, bcc_address): + """Raises an EmailSendingError if we cannot send an email. Does nothing otherwise.""" + + if flag_is_active(None, "disable_email_sending"): # type: ignore + message = "Could not send email. Email sending is disabled due to flag 'disable_email_sending'." + raise EmailSendingError(message) + else: + # Raise an email sending error if these doesn't exist within our whitelist. + # If these emails don't exist, this function can handle that elsewhere. + AllowedEmail = apps.get_model("registrar", "AllowedEmail") + message = "Could not send email. The email '{}' does not exist within the whitelist." + if to_address and not AllowedEmail.is_allowed_email(to_address): + raise EmailSendingError(message.format(to_address)) + + if bcc_address and not AllowedEmail.is_allowed_email(bcc_address): + raise EmailSendingError(message.format(bcc_address)) + + def wrap_text_and_preserve_paragraphs(text, width): """ Wraps text to `width` preserving newlines; splits on '\n', wraps segments, rejoins with '\n'.