diff --git a/docs/developer/README.md b/docs/developer/README.md index 9ddb35352..0fa9d9a8c 100644 --- a/docs/developer/README.md +++ b/docs/developer/README.md @@ -173,7 +173,7 @@ You can change the logging verbosity, if needed. Do a web search for "django log ## Mock data -[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures_users.py) and [fixtures_domain_requests.py](../../src/registrar/fixtures_domain_requests.py), giving you some test data to play with while developing. +[load.py](../../src/registrar/management/commands/load.py) called from docker-compose (locally) and reset-db.yml (upper) loads the fixtures from [fixtures_user.py](../../src/registrar/fixtures/fixtures_users.py) and the rest of the data-loading fixtures in that fixtures folder, giving you some test data to play with while developing. See the [database-access README](./database-access.md) for information on how to pull data to update these fixtures. diff --git a/src/registrar/fixtures/fixtures_domains.py b/src/registrar/fixtures/fixtures_domains.py new file mode 100644 index 000000000..98f13cd43 --- /dev/null +++ b/src/registrar/fixtures/fixtures_domains.py @@ -0,0 +1,138 @@ +from datetime import timedelta +from django.utils import timezone +import logging +import random +from faker import Faker +from django.db import transaction + +from registrar.fixtures.fixtures_requests import DomainRequestFixture +from registrar.fixtures.fixtures_users import UserFixture +from registrar.models import User, DomainRequest +from registrar.models.domain import Domain + +fake = Faker() +logger = logging.getLogger(__name__) + + +class DomainFixture(DomainRequestFixture): + """Create two domains and permissions on them for each user. + One domain will have a past expiration date. + + Depends on fixtures_requests. + + Make sure this class' `load` method is called from `handle` + in management/commands/load.py, then use `./manage.py load` + to run this code. + """ + + @classmethod + def load(cls): + # Lumped under .atomic to ensure we don't make redundant DB calls. + # This bundles them all together, and then saves it in a single call. + with transaction.atomic(): + try: + # Get the usernames of users created in the UserFixture + created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF] + + # Filter users to only include those created by the fixture + users = list(User.objects.filter(username__in=created_usernames)) + except Exception as e: + logger.warning(e) + return + + # Approve each user associated with `in review` status domains + cls._approve_domain_requests(users) + + @staticmethod + def _generate_fake_expiration_date(days_in_future=365): + """Generates a fake expiration date between 1 and 365 days in the future.""" + current_date = timezone.now().date() # nosec + return current_date + timedelta(days=random.randint(1, days_in_future)) # nosec + + @staticmethod + def _generate_fake_expiration_date_in_past(): + """Generates a fake expiration date up to 365 days in the past.""" + current_date = timezone.now().date() # nosec + return current_date + timedelta(days=random.randint(-365, -1)) # nosec + + @classmethod + def _approve_request(cls, domain_request, users): + """Helper function to approve a domain request.""" + if not domain_request: + return None + + if domain_request.investigator is None: + # Assign random investigator if not already assigned + domain_request.investigator = random.choice(users) # nosec + + # Approve the domain request + domain_request.approve(send_email=False) + + return domain_request + + @classmethod + def _approve_domain_requests(cls, users): + """Approves one current and one expired request per user.""" + domain_requests_to_update = [] + expired_requests = [] + + for user in users: + # Get the latest and second-to-last domain requests + domain_requests = DomainRequest.objects.filter( + creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW + ).order_by("-id")[:2] + + # Latest domain request + domain_request = domain_requests[0] if domain_requests else None + # Second-to-last domain request (expired) + domain_request_expired = domain_requests[1] if len(domain_requests) > 1 else None + + # Approve the current domain request + if domain_request: + cls._approve_request(domain_request, users) + domain_requests_to_update.append(domain_request) + + # Approve the expired domain request + if domain_request_expired: + cls._approve_request(domain_request_expired, users) + domain_requests_to_update.append(domain_request_expired) + expired_requests.append(domain_request_expired) + + # Perform bulk update for the domain requests + cls._bulk_update_requests(domain_requests_to_update) + + # Retrieve all domains associated with the domain requests + domains_to_update = Domain.objects.filter(domain_info__domain_request__in=domain_requests_to_update) + + # Loop through and update expiration dates for domains + for domain in domains_to_update: + domain_request = domain.domain_info.domain_request + + # Set the expiration date based on whether the request is expired + if domain_request in expired_requests: + domain.expiration_date = cls._generate_fake_expiration_date_in_past() + else: + domain.expiration_date = cls._generate_fake_expiration_date() + + # Perform bulk update for the domains + cls._bulk_update_domains(domains_to_update) + + @classmethod + def _bulk_update_requests(cls, domain_requests_to_update): + """Bulk update domain requests.""" + if domain_requests_to_update: + try: + DomainRequest.objects.bulk_update(domain_requests_to_update, ["status", "investigator"]) + logger.info(f"Successfully updated {len(domain_requests_to_update)} requests.") + except Exception as e: + logger.error(f"Unexpected error during requests bulk update: {e}") + + @classmethod + def _bulk_update_domains(cls, domains_to_update): + """Bulk update domains with expiration dates.""" + if domains_to_update: + try: + Domain.objects.bulk_update(domains_to_update, ["expiration_date"]) + logger.info(f"Successfully updated {len(domains_to_update)} domains.") + except Exception as e: + logger.error(f"Unexpected error during domains bulk update: {e}") diff --git a/src/registrar/fixtures/fixtures_portfolios.py b/src/registrar/fixtures/fixtures_portfolios.py new file mode 100644 index 000000000..2a391fb16 --- /dev/null +++ b/src/registrar/fixtures/fixtures_portfolios.py @@ -0,0 +1,125 @@ +import logging +import random +from faker import Faker +from django.db import transaction + +from registrar.models import User, DomainRequest, FederalAgency +from registrar.models.portfolio import Portfolio +from registrar.models.senior_official import SeniorOfficial + + +fake = Faker() +logger = logging.getLogger(__name__) + + +class PortfolioFixture: + """ + Creates 2 pre-defined portfolios with the infrastructure to add more. + + Make sure this class' `load` method is called from `handle` + in management/commands/load.py, then use `./manage.py load` + to run this code. + """ + + PORTFOLIOS = [ + { + "organization_name": "Hotel California", + }, + { + "organization_name": "Wish You Were Here", + }, + ] + + @classmethod + def fake_so(cls): + return { + "first_name": fake.first_name(), + "last_name": fake.last_name(), + "title": fake.job(), + "email": fake.ascii_safe_email(), + "phone": "201-555-5555", + } + + @classmethod + def _set_non_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict): + """Helper method used by `load`.""" + portfolio.organization_type = ( + portfolio_dict["organization_type"] + if "organization_type" in portfolio_dict + else DomainRequest.OrganizationChoices.FEDERAL + ) + portfolio.notes = portfolio_dict["notes"] if "notes" in portfolio_dict else None + portfolio.address_line1 = ( + portfolio_dict["address_line1"] if "address_line1" in portfolio_dict else fake.street_address() + ) + portfolio.address_line2 = portfolio_dict["address_line2"] if "address_line2" in portfolio_dict else None + portfolio.city = portfolio_dict["city"] if "city" in portfolio_dict else fake.city() + portfolio.state_territory = ( + portfolio_dict["state_territory"] if "state_territory" in portfolio_dict else fake.state_abbr() + ) + portfolio.zipcode = portfolio_dict["zipcode"] if "zipcode" in portfolio_dict else fake.postalcode() + portfolio.urbanization = portfolio_dict["urbanization"] if "urbanization" in portfolio_dict else None + portfolio.security_contact_email = ( + portfolio_dict["security_contact_email"] if "security_contact_email" in portfolio_dict else fake.email() + ) + + @classmethod + def _set_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict, user: User): + """Helper method used by `load`.""" + if not portfolio.senior_official: + if portfolio_dict.get("senior_official") is not None: + portfolio.senior_official, _ = SeniorOfficial.objects.get_or_create(**portfolio_dict["senior_official"]) + else: + portfolio.senior_official = SeniorOfficial.objects.create(**cls.fake_so()) + + if not portfolio.federal_agency: + if portfolio_dict.get("federal_agency") is not None: + portfolio.federal_agency, _ = FederalAgency.objects.get_or_create(name=portfolio_dict["federal_agency"]) + else: + federal_agencies = FederalAgency.objects.all() + # Random choice of agency for selects, used as placeholders for testing. + portfolio.federal_agency = random.choice(federal_agencies) # nosec + + @classmethod + def load(cls): + """Creates portfolios.""" + logger.info("Going to load %s portfolios" % len(cls.PORTFOLIOS)) + + # Lumped under .atomic to ensure we don't make redundant DB calls. + # This bundles them all together, and then saves it in a single call. + with transaction.atomic(): + try: + user = User.objects.all().last() + except Exception as e: + logger.warning(e) + return + + portfolios_to_create = [] + for portfolio_data in cls.PORTFOLIOS: + organization_name = portfolio_data["organization_name"] + + # Check if portfolio with the organization name already exists + if Portfolio.objects.filter(organization_name=organization_name).exists(): + logger.info( + f"Portfolio with organization name '{organization_name}' already exists, skipping creation." + ) + continue + + try: + portfolio = Portfolio( + creator=user, + organization_name=portfolio_data["organization_name"], + ) + cls._set_non_foreign_key_fields(portfolio, portfolio_data) + cls._set_foreign_key_fields(portfolio, portfolio_data, user) + portfolios_to_create.append(portfolio) + except Exception as e: + logger.warning(e) + + # Bulk create domain requests + if len(portfolios_to_create) > 0: + try: + Portfolio.objects.bulk_create(portfolios_to_create) + logger.info(f"Successfully created {len(portfolios_to_create)} portfolios") + except Exception as e: + logger.warning(f"Error bulk creating portfolios: {e}") diff --git a/src/registrar/fixtures/fixtures_requests.py b/src/registrar/fixtures/fixtures_requests.py new file mode 100644 index 000000000..f5b57491e --- /dev/null +++ b/src/registrar/fixtures/fixtures_requests.py @@ -0,0 +1,325 @@ +from datetime import timedelta +from django.utils import timezone +import logging +import random +from faker import Faker +from django.db import transaction + +from registrar.fixtures.fixtures_portfolios import PortfolioFixture +from registrar.fixtures.fixtures_users import UserFixture +from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency +from registrar.models.portfolio import Portfolio +from registrar.models.suborganization import Suborganization + + +fake = Faker() +logger = logging.getLogger(__name__) + + +class DomainRequestFixture: + """ + Creates domain requests for each user in the database, + assign portfolios and suborgs. + + Creates 3 in_review requests, one for approving with an expired domain, + one for approving with a non-expired domain, and one for leaving in in_review. + + Depends on fixtures_portfolios and fixtures_suborganizations. + + Make sure this class' `load` method is called from `handle` + in management/commands/load.py, then use `./manage.py load` + to run this code. + """ + + # any fields not specified here will be filled in with fake data or defaults + # NOTE BENE: each fixture must have `organization_name` for uniqueness! + # Here is a more complete example as a template: + # { + # "status": "started", + # "organization_name": "Example - Just started", + # "generic_org_type": "federal", + # "federal_agency": None, + # "federal_type": None, + # "address_line1": None, + # "address_line2": None, + # "city": None, + # "state_territory": None, + # "zipcode": None, + # "urbanization": None, + # "purpose": None, + # "anything_else": None, + # "is_policy_acknowledged": None, + # "senior_official": None, + # "other_contacts": [], + # "current_websites": [], + # "alternative_domains": [], + # }, + DOMAINREQUESTS = [ + { + "status": DomainRequest.DomainRequestStatus.STARTED, + "organization_name": "Example - Finished but not submitted", + }, + { + "status": DomainRequest.DomainRequestStatus.SUBMITTED, + "organization_name": "Example - Submitted but pending investigation", + }, + { + "status": DomainRequest.DomainRequestStatus.IN_REVIEW, + "organization_name": "Example - In investigation", + }, + { + "status": DomainRequest.DomainRequestStatus.IN_REVIEW, + "organization_name": "Example - Approved", + }, + { + "status": DomainRequest.DomainRequestStatus.IN_REVIEW, + "organization_name": "Example - Approved, domain expired", + }, + { + "status": DomainRequest.DomainRequestStatus.WITHDRAWN, + "organization_name": "Example - Withdrawn", + }, + { + "status": DomainRequest.DomainRequestStatus.ACTION_NEEDED, + "organization_name": "Example - Action needed", + }, + { + "status": "rejected", + "organization_name": "Example - Rejected", + }, + ] + + @classmethod + def fake_contact(cls): + return { + "first_name": fake.first_name(), + "middle_name": None, + "last_name": fake.last_name(), + "title": fake.job(), + "email": fake.ascii_safe_email(), + "phone": "201-555-5555", + } + + @classmethod + def fake_dot_gov(cls): + return f"{fake.slug()}.gov" + + @classmethod + def fake_expiration_date(cls): + """Generates a fake expiration date between 0 and 1 year in the future.""" + current_date = timezone.now().date() + days_in_future = random.randint(0, 365) # nosec + return current_date + timedelta(days=days_in_future) + + @classmethod + def _set_non_foreign_key_fields(cls, request: DomainRequest, request_dict: dict): + """Helper method used by `load`.""" + request.status = request_dict["status"] if "status" in request_dict else "started" + + # TODO for a future ticket: Allow for more than just "federal" here + request.generic_org_type = request_dict["generic_org_type"] if "generic_org_type" in request_dict else "federal" + if request.status != "started": + request.last_submitted_date = fake.date() + request.federal_type = ( + request_dict["federal_type"] + if "federal_type" in request_dict + else random.choice(["executive", "judicial", "legislative"]) # nosec + ) + request.address_line1 = ( + request_dict["address_line1"] if "address_line1" in request_dict else fake.street_address() + ) + request.address_line2 = request_dict["address_line2"] if "address_line2" in request_dict else None + request.city = request_dict["city"] if "city" in request_dict else fake.city() + request.state_territory = ( + request_dict["state_territory"] if "state_territory" in request_dict else fake.state_abbr() + ) + request.zipcode = request_dict["zipcode"] if "zipcode" in request_dict else fake.postalcode() + request.urbanization = request_dict["urbanization"] if "urbanization" in request_dict else None + request.purpose = request_dict["purpose"] if "purpose" in request_dict else fake.paragraph() + request.has_cisa_representative = ( + request_dict["has_cisa_representative"] if "has_cisa_representative" in request_dict else True + ) + request.cisa_representative_email = ( + request_dict["cisa_representative_email"] if "cisa_representative_email" in request_dict else fake.email() + ) + request.cisa_representative_first_name = ( + request_dict["cisa_representative_first_name"] + if "cisa_representative_first_name" in request_dict + else fake.first_name() + ) + request.cisa_representative_last_name = ( + request_dict["cisa_representative_last_name"] + if "cisa_representative_last_name" in request_dict + else fake.last_name() + ) + request.has_anything_else_text = ( + request_dict["has_anything_else_text"] if "has_anything_else_text" in request_dict else True + ) + request.anything_else = request_dict["anything_else"] if "anything_else" in request_dict else fake.paragraph() + request.is_policy_acknowledged = ( + request_dict["is_policy_acknowledged"] if "is_policy_acknowledged" in request_dict else True + ) + + @classmethod + def _set_foreign_key_fields(cls, request: DomainRequest, request_dict: dict, user: User): + """Helper method used by `load`.""" + request.investigator = cls._get_investigator(request, request_dict, user) + request.senior_official = cls._get_senior_official(request, request_dict) + request.requested_domain = cls._get_requested_domain(request, request_dict) + request.federal_agency = cls._get_federal_agency(request, request_dict) + request.portfolio = cls._get_portfolio(request, request_dict) + request.sub_organization = cls._get_sub_organization(request, request_dict) + + @classmethod + def _get_investigator(cls, request: DomainRequest, request_dict: dict, user: User): + if not request.investigator: + return User.objects.get(username=user.username) if "investigator" in request_dict else None + return request.investigator + + @classmethod + def _get_senior_official(cls, request: DomainRequest, request_dict: dict): + if not request.senior_official: + if "senior_official" in request_dict and request_dict["senior_official"] is not None: + return Contact.objects.get_or_create(**request_dict["senior_official"])[0] + return Contact.objects.create(**cls.fake_contact()) + return request.senior_official + + @classmethod + def _get_requested_domain(cls, request: DomainRequest, request_dict: dict): + if not request.requested_domain: + if "requested_domain" in request_dict and request_dict["requested_domain"] is not None: + return DraftDomain.objects.get_or_create(name=request_dict["requested_domain"])[0] + return DraftDomain.objects.create(name=cls.fake_dot_gov()) + return request.requested_domain + + @classmethod + def _get_federal_agency(cls, request: DomainRequest, request_dict: dict): + if not request.federal_agency: + if "federal_agency" in request_dict and request_dict["federal_agency"] is not None: + return FederalAgency.objects.get_or_create(name=request_dict["federal_agency"])[0] + return random.choice(FederalAgency.objects.all()) # nosec + return request.federal_agency + + @classmethod + def _get_portfolio(cls, request: DomainRequest, request_dict: dict): + if not request.portfolio: + if "portfolio" in request_dict and request_dict["portfolio"] is not None: + return Portfolio.objects.get_or_create(name=request_dict["portfolio"])[0] + return cls._get_random_portfolio() + return request.portfolio + + @classmethod + def _get_sub_organization(cls, request: DomainRequest, request_dict: dict): + if not request.sub_organization: + if "sub_organization" in request_dict and request_dict["sub_organization"] is not None: + return Suborganization.objects.get_or_create(name=request_dict["sub_organization"])[0] + return cls._get_random_sub_organization() + return request.sub_organization + + @classmethod + def _get_random_portfolio(cls): + try: + organization_names = [portfolio["organization_name"] for portfolio in PortfolioFixture.PORTFOLIOS] + + portfolio_options = Portfolio.objects.filter(organization_name__in=organization_names) + return random.choice(portfolio_options) if portfolio_options.exists() else None # nosec + except Exception as e: + logger.warning(f"Expected fixture portfolio, did not find it: {e}") + return None + + @classmethod + def _get_random_sub_organization(cls): + try: + suborg_options = [Suborganization.objects.first(), Suborganization.objects.last()] + return random.choice(suborg_options) # nosec + except Exception as e: + logger.warning(f"Expected fixture sub_organization, did not find it: {e}") + return None + + @classmethod + def _set_many_to_many_relations(cls, request: DomainRequest, request_dict: dict): + """Helper method used by `load`.""" + if "other_contacts" in request_dict: + for contact in request_dict["other_contacts"]: + request.other_contacts.add(Contact.objects.get_or_create(**contact)[0]) + elif not request.other_contacts.exists(): + other_contacts = [ + Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(1, 3)) # nosec + ] + request.other_contacts.add(*other_contacts) + + if "current_websites" in request_dict: + for website in request_dict["current_websites"]: + request.current_websites.add(Website.objects.get_or_create(website=website)[0]) + elif not request.current_websites.exists(): + current_websites = [ + Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec + ] + request.current_websites.add(*current_websites) + + if "alternative_domains" in request_dict: + for domain in request_dict["alternative_domains"]: + request.alternative_domains.add(Website.objects.get_or_create(website=domain)[0]) + elif not request.alternative_domains.exists(): + alternative_domains = [ + Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec + ] + request.alternative_domains.add(*alternative_domains) + + @classmethod + def load(cls): + """Creates domain requests for each user in the database.""" + logger.info("Going to load %s domain requests" % len(cls.DOMAINREQUESTS)) + + # Lumped under .atomic to ensure we don't make redundant DB calls. + # This bundles them all together, and then saves it in a single call. + with transaction.atomic(): + try: + # Get the usernames of users created in the UserFixture + created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF] + + # Filter users to only include those created by the fixture + users = list(User.objects.filter(username__in=created_usernames)) + except Exception as e: + logger.warning(e) + return + + cls._create_domain_requests(users) + + @classmethod + def _create_domain_requests(cls, users): + """Creates DomainRequests given a list of users.""" + domain_requests_to_create = [] + for user in users: + for request_data in cls.DOMAINREQUESTS: + # Prepare DomainRequest objects + try: + domain_request = DomainRequest( + creator=user, + organization_name=request_data["organization_name"], + ) + cls._set_non_foreign_key_fields(domain_request, request_data) + cls._set_foreign_key_fields(domain_request, request_data, user) + domain_requests_to_create.append(domain_request) + except Exception as e: + logger.warning(e) + + # Bulk create domain requests + cls._bulk_create_requests(domain_requests_to_create) + + # Now many-to-many relationships + for domain_request in domain_requests_to_create: + try: + cls._set_many_to_many_relations(domain_request, request_data) + except Exception as e: + logger.warning(e) + + @classmethod + def _bulk_create_requests(cls, domain_requests_to_create): + """Bulk create domain requests.""" + if len(domain_requests_to_create) > 0: + try: + DomainRequest.objects.bulk_create(domain_requests_to_create) + logger.info(f"Successfully created {len(domain_requests_to_create)} requests.") + except Exception as e: + logger.error(f"Unexpected error during requests bulk creation: {e}") diff --git a/src/registrar/fixtures/fixtures_suborganizations.py b/src/registrar/fixtures/fixtures_suborganizations.py new file mode 100644 index 000000000..af7e02804 --- /dev/null +++ b/src/registrar/fixtures/fixtures_suborganizations.py @@ -0,0 +1,87 @@ +import logging +from faker import Faker +from django.db import transaction + +from registrar.models.portfolio import Portfolio +from registrar.models.suborganization import Suborganization + + +fake = Faker() +logger = logging.getLogger(__name__) + + +class SuborganizationFixture: + """ + Creates 2 pre-defined suborg with the infrastructure to add more. + + Depends on fixtures_portfolios. + + Make sure this class' `load` method is called from `handle` + in management/commands/load.py, then use `./manage.py load` + to run this code. + """ + + SUBORGS = [ + { + "name": "Take it Easy", + }, + { + "name": "Welcome to the Machine", + }, + ] + + @classmethod + def load(cls): + """Creates suborganizations.""" + logger.info(f"Going to load {len(cls.SUBORGS)} suborgs") + + with transaction.atomic(): + portfolios = cls._get_portfolios() + if not portfolios: + return + + suborgs_to_create = cls._prepare_suborgs_to_create(portfolios) + cls._bulk_create_suborgs(suborgs_to_create) + + @classmethod + def _get_portfolios(cls): + """Fetches portfolios with organization_name 'Hotel California' and 'Wish You Were Here' + and logs warnings if not found.""" + try: + portfolio1 = Portfolio.objects.filter(organization_name="Hotel California").first() + portfolio2 = Portfolio.objects.filter(organization_name="Wish You Were Here").first() + + if not portfolio1 or not portfolio2: + logger.warning("One or both portfolios not found.") + return None + return portfolio1, portfolio2 + except Exception as e: + logger.warning(f"Error fetching portfolios: {e}") + return None + + @classmethod + def _prepare_suborgs_to_create(cls, portfolios): + """Prepares a list of suborganizations to create, avoiding duplicates.""" + portfolio1, portfolio2 = portfolios + suborgs_to_create = [] + + try: + if not Suborganization.objects.filter(name=cls.SUBORGS[0]["name"]).exists(): + suborgs_to_create.append(Suborganization(portfolio=portfolio1, name=cls.SUBORGS[0]["name"])) + + if not Suborganization.objects.filter(name=cls.SUBORGS[1]["name"]).exists(): + suborgs_to_create.append(Suborganization(portfolio=portfolio2, name=cls.SUBORGS[1]["name"])) + except Exception as e: + logger.warning(f"Error creating suborg objects: {e}") + + return suborgs_to_create + + @classmethod + def _bulk_create_suborgs(cls, suborgs_to_create): + """Bulk creates suborganizations and logs success or errors.""" + if suborgs_to_create: + try: + Suborganization.objects.bulk_create(suborgs_to_create) + logger.info(f"Successfully created {len(suborgs_to_create)} suborgs") + except Exception as e: + logger.warning(f"Error bulk creating suborgs: {e}") diff --git a/src/registrar/fixtures/fixtures_user_portfolio_permissions.py b/src/registrar/fixtures/fixtures_user_portfolio_permissions.py new file mode 100644 index 000000000..3c64eb6b5 --- /dev/null +++ b/src/registrar/fixtures/fixtures_user_portfolio_permissions.py @@ -0,0 +1,86 @@ +import logging +from faker import Faker +from django.db import transaction + +from registrar.fixtures.fixtures_portfolios import PortfolioFixture +from registrar.fixtures.fixtures_users import UserFixture +from registrar.models import User +from registrar.models.portfolio import Portfolio +from registrar.models.user_portfolio_permission import UserPortfolioPermission +from registrar.models.utility.portfolio_helper import UserPortfolioRoleChoices + +fake = Faker() +logger = logging.getLogger(__name__) + + +class UserPortfolioPermissionFixture: + """Create user portfolio permissions for each user. + Each user will be admin on 2 portfolios. + + Depends on fixture_portfolios""" + + @classmethod + def load(cls): + logger.info("Going to set user portfolio permissions") + + # Lumped under .atomic to ensure we don't make redundant DB calls. + # This bundles them all together, and then saves it in a single call. + with transaction.atomic(): + try: + # Get the usernames of users created in the UserFixture + created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF] + + # Filter users to only include those created by the fixture + users = list(User.objects.filter(username__in=created_usernames)) + + organization_names = [portfolio["organization_name"] for portfolio in PortfolioFixture.PORTFOLIOS] + + portfolios = list(Portfolio.objects.filter(organization_name__in=organization_names)) + + if not users: + logger.warning("User fixtures missing.") + return + + if not portfolios: + logger.warning("Portfolio fixtures missing.") + return + + except Exception as e: + logger.warning(e) + return + + user_portfolio_permissions_to_create = [] + for user in users: + for portfolio in portfolios: + try: + if not UserPortfolioPermission.objects.filter(user=user, portfolio=portfolio).exists(): + user_portfolio_permission = UserPortfolioPermission( + user=user, + portfolio=portfolio, + roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN], + ) + user_portfolio_permissions_to_create.append(user_portfolio_permission) + else: + logger.info( + f"Permission exists for user '{user.username}' " + f"on portfolio '{portfolio.organization_name}'." + ) + except Exception as e: + logger.warning(e) + + # Bulk create permissions + cls._bulk_create_permissions(user_portfolio_permissions_to_create) + + @classmethod + def _bulk_create_permissions(cls, user_portfolio_permissions_to_create): + """Bulk creates permissions and logs success or errors.""" + if user_portfolio_permissions_to_create: + try: + UserPortfolioPermission.objects.bulk_create(user_portfolio_permissions_to_create) + logger.info( + f"Successfully created {len(user_portfolio_permissions_to_create)} user portfolio permissions." + ) + except Exception as e: + logger.error(f"Unexpected error during portfolio permission bulk creation: {e}") + else: + logger.info("No new user portfolio permissions to create.") diff --git a/src/registrar/fixtures_users.py b/src/registrar/fixtures/fixtures_users.py similarity index 67% rename from src/registrar/fixtures_users.py rename to src/registrar/fixtures/fixtures_users.py index 7fbf41223..91b35f854 100644 --- a/src/registrar/fixtures_users.py +++ b/src/registrar/fixtures/fixtures_users.py @@ -23,129 +23,123 @@ class UserFixture: """ ADMINS = [ - { - "username": "43a7fa8d-0550-4494-a6fe-81500324d590", - "first_name": "Jyoti", - "last_name": "Bock", - "email": "jyotibock@truss.works", - }, { "username": "aad084c3-66cc-4632-80eb-41cdf5c5bcbf", "first_name": "Aditi", "last_name": "Green", "email": "aditidevelops+01@gmail.com", + "title": "Positive vibes", }, { "username": "be17c826-e200-4999-9389-2ded48c43691", "first_name": "Matthew", "last_name": "Spence", + "title": "Hollywood hair", }, { "username": "5f283494-31bd-49b5-b024-a7e7cae00848", "first_name": "Rachid", "last_name": "Mrad", "email": "rachid.mrad@associates.cisa.dhs.gov", + "title": "Common pirate", }, { "username": "eb2214cd-fc0c-48c0-9dbd-bc4cd6820c74", "first_name": "Alysia", "last_name": "Broddrick", "email": "abroddrick@truss.works", + "title": "I drink coffee and know things", }, { "username": "8f8e7293-17f7-4716-889b-1990241cbd39", "first_name": "Katherine", "last_name": "Osos", "email": "kosos@truss.works", + "title": "Grove keeper", }, { "username": "70488e0a-e937-4894-a28c-16f5949effd4", "first_name": "Gaby", "last_name": "DiSarli", "email": "gaby@truss.works", + "title": "De Stijl", }, { "username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c", "first_name": "Cameron", "last_name": "Dixon", "email": "cameron.dixon@cisa.dhs.gov", - }, - { - "username": "0353607a-cbba-47d2-98d7-e83dcd5b90ea", - "first_name": "Ryan", - "last_name": "Brooks", + "title": "Product owner", }, { "username": "30001ee7-0467-4df2-8db2-786e79606060", "first_name": "Zander", "last_name": "Adkinson", + "title": "ACME specialist", }, { "username": "2bf518c2-485a-4c42-ab1a-f5a8b0a08484", "first_name": "Paul", "last_name": "Kuykendall", + "title": "Dr. Silvertongue", }, { "username": "2a88a97b-be96-4aad-b99e-0b605b492c78", "first_name": "Rebecca", "last_name": "Hsieh", "email": "rebecca.hsieh@truss.works", + "title": "Catlady", }, { "username": "fa69c8e8-da83-4798-a4f2-263c9ce93f52", "first_name": "David", "last_name": "Kennedy", "email": "david.kennedy@ecstech.com", + "title": "Mean lean coding machine", }, { "username": "f14433d8-f0e9-41bf-9c72-b99b110e665d", "first_name": "Nicolle", "last_name": "LeClair", "email": "nicolle.leclair@ecstech.com", + "title": "Nightowl", }, { "username": "24840450-bf47-4d89-8aa9-c612fe68f9da", "first_name": "Erin", "last_name": "Song", + "title": "Catlady 2", }, { "username": "e0ea8b94-6e53-4430-814a-849a7ca45f21", "first_name": "Kristina", "last_name": "Yin", + "title": "Hufflepuff prefect", }, { "username": "ac49d7c1-368a-4e6b-8f1d-60250e20a16f", "first_name": "Vicky", "last_name": "Chin", "email": "szu.chin@associates.cisa.dhs.gov", + "title": "Ze whip", }, { "username": "66bb1a5a-a091-4d7f-a6cf-4d772b4711c7", "first_name": "Christina", "last_name": "Burnett", "email": "christina.burnett@cisa.dhs.gov", - }, - { - "username": "012f844d-8a0f-4225-9d82-cbf87bff1d3e", - "first_name": "Riley", - "last_name": "Orr", - "email": "riley+320@truss.works", + "title": "Groovy", }, { "username": "76612d84-66b0-4ae9-9870-81e98b9858b6", "first_name": "Anna", "last_name": "Gingle", "email": "annagingle@truss.works", + "title": "Sweetwater sailor", }, ] STAFF = [ - { - "username": "a5906815-dd80-4c64-aebe-2da6a4c9d7a4", - "first_name": "Jyoti-Analyst", - "last_name": "Bock-Analyst", - "email": "jyotibock+1@truss.works", - }, { "username": "ffec5987-aa84-411b-a05a-a7ee5cbcde54", "first_name": "Aditi-Analyst", @@ -231,18 +225,6 @@ class UserFixture: "last_name": "Burnett-Analyst", "email": "christina.burnett@gwe.cisa.dhs.gov", }, - { - "username": "d9839768-0c17-4fa2-9c8e-36291eef5c11", - "first_name": "Alex-Analyst", - "last_name": "Mcelya-Analyst", - "email": "ALEXANDER.MCELYA@cisa.dhs.gov", - }, - { - "username": "082a066f-e0a4-45f6-8672-4343a1208a36", - "first_name": "Riley-Analyst", - "last_name": "Orr-Analyst", - "email": "riley+321@truss.works", - }, { "username": "e1e350b1-cfc1-4753-a6cb-3ae6d912f99c", "first_name": "Anna-Analyst", @@ -254,29 +236,61 @@ class UserFixture: # Additional emails to add to the AllowedEmail whitelist. ADDITIONAL_ALLOWED_EMAILS: list[str] = ["davekenn4242@gmail.com", "rachid_mrad@hotmail.com"] + @classmethod def load_users(cls, users, group_name, are_superusers=False): - logger.info(f"Going to load {len(users)} users in group {group_name}") - for user_data in users: + """Loads the users into the database and assigns them to the specified group.""" + logger.info(f"Going to load {len(users)} users for group {group_name}") + + group = UserGroup.objects.get(name=group_name) + + # Prepare sets of existing usernames and IDs in one query + user_identifiers = [(user.get("username"), user.get("id")) for user in users] + existing_users = User.objects.filter( + username__in=[user[0] for user in user_identifiers] + [user[1] for user in user_identifiers] + ).values_list("username", "id") + + existing_usernames = set(user[0] for user in existing_users) + existing_user_ids = set(user[1] for user in existing_users) + + # Filter out users with existing IDs or usernames + new_users = [ + User( + id=user_data.get("id"), + first_name=user_data.get("first_name"), + last_name=user_data.get("last_name"), + username=user_data.get("username"), + email=user_data.get("email", ""), + title=user_data.get("title", "Peon"), + phone=user_data.get("phone", "2022222222"), + is_active=user_data.get("is_active", True), + is_staff=True, + is_superuser=are_superusers, + ) + for user_data in users + if user_data.get("username") not in existing_usernames and user_data.get("id") not in existing_user_ids + ] + + # Perform bulk creation for new users + if new_users: try: - user, _ = User.objects.get_or_create(username=user_data["username"]) - user.is_superuser = are_superusers - user.first_name = user_data["first_name"] - user.last_name = user_data["last_name"] - if "email" in user_data: - user.email = user_data["email"] - user.is_staff = True - user.is_active = True - # This verification type will get reverted to "regular" (or whichever is applicables) - # once the user logs in for the first time (as they then got verified through different means). - # In the meantime, we can still describe how the user got here in the first place. - user.verification_type = User.VerificationTypeChoices.FIXTURE_USER - group = UserGroup.objects.get(name=group_name) - user.groups.add(group) - user.save() - logger.debug(f"User object created for {user_data['first_name']}") + User.objects.bulk_create(new_users) + logger.info(f"Created {len(new_users)} new users.") except Exception as e: - logger.warning(e) - logger.info(f"All users in group {group_name} loaded.") + logger.error(f"Unexpected error during user bulk creation: {e}") + else: + logger.info("No new users to create.") + + # Get all users to be updated (both new and existing) + created_or_existing_users = User.objects.filter(username__in=[user.get("username") for user in users]) + + # Filter out users who are already in the group + users_not_in_group = created_or_existing_users.exclude(groups__id=group.id) + + # Add only users who are not already in the group + if users_not_in_group.exists(): + group.user_set.add(*users_not_in_group) + + logger.info(f"Users loaded for group {group_name}.") def load_allowed_emails(cls, users, additional_emails): """Populates a whitelist of allowed emails (as defined in this list)""" @@ -284,37 +298,33 @@ def load_allowed_emails(cls, users, additional_emails): if additional_emails: logger.info(f"Going to load {len(additional_emails)} additional allowed emails") - # Load user emails - allowed_emails = [] + existing_emails = set(AllowedEmail.objects.values_list("email", flat=True)) + new_allowed_emails = [] + for user_data in users: user_email = user_data.get("email") - if user_email and user_email not in allowed_emails: - allowed_emails.append(AllowedEmail(email=user_email)) - else: - first_name = user_data.get("first_name") - last_name = user_data.get("last_name") - logger.warning(f"Could not add email to whitelist for {first_name} {last_name}.") + if user_email and user_email not in existing_emails: + new_allowed_emails.append(AllowedEmail(email=user_email)) - # Load additional emails - allowed_emails.extend([AllowedEmail(email=email) for email in additional_emails]) + # Load additional emails, only if they don't exist already + for email in additional_emails: + if email not in existing_emails: + new_allowed_emails.append(AllowedEmail(email=email)) - if allowed_emails: - AllowedEmail.objects.bulk_create(allowed_emails) - logger.info(f"Loaded {len(allowed_emails)} allowed emails") + if new_allowed_emails: + try: + AllowedEmail.objects.bulk_create(new_allowed_emails) + logger.info(f"Loaded {len(new_allowed_emails)} allowed emails") + except Exception as e: + logger.error(f"Unexpected error during allowed emails bulk creation: {e}") else: logger.info("No allowed emails to load") @classmethod def load(cls): - # Lumped under .atomic to ensure we don't make redundant DB calls. - # This bundles them all together, and then saves it in a single call. - # This is slightly different then bulk_create or bulk_update, in that - # you still get the same behaviour of .save(), but those incremental - # steps now do not need to close/reopen a db connection, - # instead they share one. with transaction.atomic(): - cls.load_users(cls, cls.ADMINS, "full_access_group", are_superusers=True) - cls.load_users(cls, cls.STAFF, "cisa_analysts_group") + cls.load_users(cls.ADMINS, "full_access_group", are_superusers=True) + cls.load_users(cls.STAFF, "cisa_analysts_group") # Combine ADMINS and STAFF lists all_users = cls.ADMINS + cls.STAFF diff --git a/src/registrar/fixtures_domain_requests.py b/src/registrar/fixtures_domain_requests.py deleted file mode 100644 index 44dd13e4c..000000000 --- a/src/registrar/fixtures_domain_requests.py +++ /dev/null @@ -1,236 +0,0 @@ -import logging -import random -from faker import Faker -from django.db import transaction - -from registrar.models import User, DomainRequest, DraftDomain, Contact, Website, FederalAgency - -fake = Faker() -logger = logging.getLogger(__name__) - - -class DomainRequestFixture: - """ - Load domain requests into the database. - - Make sure this class' `load` method is called from `handle` - in management/commands/load.py, then use `./manage.py load` - to run this code. - """ - - # any fields not specified here will be filled in with fake data or defaults - # NOTE BENE: each fixture must have `organization_name` for uniqueness! - # Here is a more complete example as a template: - # { - # "status": "started", - # "organization_name": "Example - Just started", - # "generic_org_type": "federal", - # "federal_agency": None, - # "federal_type": None, - # "address_line1": None, - # "address_line2": None, - # "city": None, - # "state_territory": None, - # "zipcode": None, - # "urbanization": None, - # "purpose": None, - # "anything_else": None, - # "is_policy_acknowledged": None, - # "senior_official": None, - # "other_contacts": [], - # "current_websites": [], - # "alternative_domains": [], - # }, - DA = [ - { - "status": DomainRequest.DomainRequestStatus.STARTED, - "organization_name": "Example - Finished but not submitted", - }, - { - "status": DomainRequest.DomainRequestStatus.SUBMITTED, - "organization_name": "Example - Submitted but pending investigation", - }, - { - "status": DomainRequest.DomainRequestStatus.IN_REVIEW, - "organization_name": "Example - In investigation", - }, - { - "status": DomainRequest.DomainRequestStatus.IN_REVIEW, - "organization_name": "Example - Approved", - }, - { - "status": DomainRequest.DomainRequestStatus.WITHDRAWN, - "organization_name": "Example - Withdrawn", - }, - { - "status": DomainRequest.DomainRequestStatus.ACTION_NEEDED, - "organization_name": "Example - Action needed", - }, - { - "status": "rejected", - "organization_name": "Example - Rejected", - }, - ] - - @classmethod - def fake_contact(cls): - return { - "first_name": fake.first_name(), - "middle_name": None, - "last_name": fake.last_name(), - "title": fake.job(), - "email": fake.ascii_safe_email(), - "phone": "201-555-5555", - } - - @classmethod - def fake_dot_gov(cls): - return f"{fake.slug()}.gov" - - @classmethod - def _set_non_foreign_key_fields(cls, da: DomainRequest, app: dict): - """Helper method used by `load`.""" - da.status = app["status"] if "status" in app else "started" - - # TODO for a future ticket: Allow for more than just "federal" here - da.generic_org_type = app["generic_org_type"] if "generic_org_type" in app else "federal" - da.last_submitted_date = fake.date() - da.federal_type = ( - app["federal_type"] - if "federal_type" in app - else random.choice(["executive", "judicial", "legislative"]) # nosec - ) - da.address_line1 = app["address_line1"] if "address_line1" in app else fake.street_address() - da.address_line2 = app["address_line2"] if "address_line2" in app else None - da.city = app["city"] if "city" in app else fake.city() - da.state_territory = app["state_territory"] if "state_territory" in app else fake.state_abbr() - da.zipcode = app["zipcode"] if "zipcode" in app else fake.postalcode() - da.urbanization = app["urbanization"] if "urbanization" in app else None - da.purpose = app["purpose"] if "purpose" in app else fake.paragraph() - da.anything_else = app["anything_else"] if "anything_else" in app else None - da.is_policy_acknowledged = app["is_policy_acknowledged"] if "is_policy_acknowledged" in app else True - - @classmethod - def _set_foreign_key_fields(cls, da: DomainRequest, app: dict, user: User): - """Helper method used by `load`.""" - if not da.investigator: - da.investigator = User.objects.get(username=user.username) if "investigator" in app else None - - if not da.senior_official: - if "senior_official" in app and app["senior_official"] is not None: - da.senior_official, _ = Contact.objects.get_or_create(**app["senior_official"]) - else: - da.senior_official = Contact.objects.create(**cls.fake_contact()) - - if not da.requested_domain: - if "requested_domain" in app and app["requested_domain"] is not None: - da.requested_domain, _ = DraftDomain.objects.get_or_create(name=app["requested_domain"]) - else: - da.requested_domain = DraftDomain.objects.create(name=cls.fake_dot_gov()) - if not da.federal_agency: - if "federal_agency" in app and app["federal_agency"] is not None: - da.federal_agency, _ = FederalAgency.objects.get_or_create(name=app["federal_agency"]) - else: - federal_agencies = FederalAgency.objects.all() - # Random choice of agency for selects, used as placeholders for testing. - da.federal_agency = random.choice(federal_agencies) # nosec - - @classmethod - def _set_many_to_many_relations(cls, da: DomainRequest, app: dict): - """Helper method used by `load`.""" - if "other_contacts" in app: - for contact in app["other_contacts"]: - da.other_contacts.add(Contact.objects.get_or_create(**contact)[0]) - elif not da.other_contacts.exists(): - other_contacts = [ - Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(0, 3)) # nosec - ] - da.other_contacts.add(*other_contacts) - - if "current_websites" in app: - for website in app["current_websites"]: - da.current_websites.add(Website.objects.get_or_create(website=website)[0]) - elif not da.current_websites.exists(): - current_websites = [ - Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec - ] - da.current_websites.add(*current_websites) - - if "alternative_domains" in app: - for domain in app["alternative_domains"]: - da.alternative_domains.add(Website.objects.get_or_create(website=domain)[0]) - elif not da.alternative_domains.exists(): - alternative_domains = [ - Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec - ] - da.alternative_domains.add(*alternative_domains) - - @classmethod - def load(cls): - """Creates domain requests for each user in the database.""" - logger.info("Going to load %s domain requests" % len(cls.DA)) - try: - users = list(User.objects.all()) # force evaluation to catch db errors - except Exception as e: - logger.warning(e) - return - - # Lumped under .atomic to ensure we don't make redundant DB calls. - # This bundles them all together, and then saves it in a single call. - with transaction.atomic(): - cls._create_domain_requests(users) - - @classmethod - def _create_domain_requests(cls, users): - """Creates DomainRequests given a list of users""" - for user in users: - logger.debug("Loading domain requests for %s" % user) - for app in cls.DA: - try: - da, _ = DomainRequest.objects.get_or_create( - creator=user, - organization_name=app["organization_name"], - ) - cls._set_non_foreign_key_fields(da, app) - cls._set_foreign_key_fields(da, app, user) - da.save() - cls._set_many_to_many_relations(da, app) - except Exception as e: - logger.warning(e) - - -class DomainFixture(DomainRequestFixture): - """Create one domain and permissions on it for each user.""" - - @classmethod - def load(cls): - try: - users = list(User.objects.all()) # force evaluation to catch db errors - except Exception as e: - logger.warning(e) - return - - # Lumped under .atomic to ensure we don't make redundant DB calls. - # This bundles them all together, and then saves it in a single call. - with transaction.atomic(): - # approve each user associated with `in review` status domains - DomainFixture._approve_domain_requests(users) - - @staticmethod - def _approve_domain_requests(users): - """Approves all provided domain requests if they are in the state in_review""" - for user in users: - domain_request = DomainRequest.objects.filter( - creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW - ).last() - logger.debug(f"Approving {domain_request} for {user}") - - # All approvals require an investigator, so if there is none, - # assign one. - if domain_request.investigator is None: - # All "users" in fixtures have admin perms per prior config. - # No need to check for that. - domain_request.investigator = random.choice(users) # nosec - - domain_request.approve(send_email=False) - domain_request.save() diff --git a/src/registrar/management/commands/load.py b/src/registrar/management/commands/load.py index aac5eade1..a4cdb1ed5 100644 --- a/src/registrar/management/commands/load.py +++ b/src/registrar/management/commands/load.py @@ -1,11 +1,13 @@ import logging from django.core.management.base import BaseCommand -from auditlog.context import disable_auditlog # type: ignore - - -from registrar.fixtures_users import UserFixture -from registrar.fixtures_domain_requests import DomainRequestFixture, DomainFixture +from auditlog.context import disable_auditlog +from registrar.fixtures.fixtures_domains import DomainFixture +from registrar.fixtures.fixtures_portfolios import PortfolioFixture +from registrar.fixtures.fixtures_requests import DomainRequestFixture +from registrar.fixtures.fixtures_suborganizations import SuborganizationFixture +from registrar.fixtures.fixtures_user_portfolio_permissions import UserPortfolioPermissionFixture +from registrar.fixtures.fixtures_users import UserFixture # type: ignore logger = logging.getLogger(__name__) @@ -16,6 +18,9 @@ def handle(self, *args, **options): # https://github.com/jazzband/django-auditlog/issues/17 with disable_auditlog(): UserFixture.load() + PortfolioFixture.load() + SuborganizationFixture.load() DomainRequestFixture.load() DomainFixture.load() + UserPortfolioPermissionFixture.load() logger.info("All fixtures loaded.") diff --git a/src/registrar/models/user_group.py b/src/registrar/models/user_group.py index 76657fe29..182d16e95 100644 --- a/src/registrar/models/user_group.py +++ b/src/registrar/models/user_group.py @@ -113,7 +113,6 @@ def create_cisa_analyst_group(apps, schema_editor): + cisa_analysts_group.name ) - cisa_analysts_group.save() logger.debug("CISA Analyst permissions added to group " + cisa_analysts_group.name) except Exception as e: logger.error(f"Error creating analyst permissions group: {e}") @@ -135,7 +134,6 @@ def create_full_access_group(apps, schema_editor): # Assign all permissions to the group full_access_group.permissions.add(*all_permissions) - full_access_group.save() logger.debug("All permissions added to group " + full_access_group.name) except Exception as e: logger.error(f"Error creating full access group: {e}")