diff --git a/.github/actions/django-security-check/README.md b/.github/actions/django-security-check/README.md index 4eddcf74c..94f02a97c 100644 --- a/.github/actions/django-security-check/README.md +++ b/.github/actions/django-security-check/README.md @@ -38,7 +38,7 @@ jobs: id: check uses: victoriadrake/django-security-check@master - name: Upload output - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: security-check-output path: output.txt diff --git a/.github/workflows/security-check.yaml b/.github/workflows/security-check.yaml index dd75d5c98..aea700613 100644 --- a/.github/workflows/security-check.yaml +++ b/.github/workflows/security-check.yaml @@ -44,7 +44,7 @@ jobs: id: check uses: ./.github/actions/django-security-check - name: Upload output - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: security-check-output path: ./src/output.txt diff --git a/docs/operations/data_migration.md b/docs/operations/data_migration.md index 4301ca878..5e1aa688a 100644 --- a/docs/operations/data_migration.md +++ b/docs/operations/data_migration.md @@ -860,3 +860,55 @@ Example: `cf ssh getgov-za` ### Running locally ```docker-compose exec app ./manage.py populate_domain_request_dates``` + +## Create federal portfolio +This script takes the name of a `FederalAgency` (like 'AMTRAK') and does the following: +1. Creates the portfolio record based off of data on the federal agency object itself. +2. Creates suborganizations from existing DomainInformation records. +3. Associates the SeniorOfficial record (if it exists). +4. Adds this portfolio to DomainInformation / DomainRequests or both. + +Errors: +1. ValueError: Federal agency not found in database. +2. Logged Warning: No senior official found for portfolio +3. Logged Error: No suborganizations found for portfolio. +4. Logged Warning: No new suborganizations to add. +5. Logged Warning: No valid DomainRequest records to update. +6. Logged Warning: No valid DomainInformation records to update. + +### Running on sandboxes + +#### Step 1: Login to CloudFoundry +```cf login -a api.fr.cloud.gov --sso``` + +#### Step 2: SSH into your environment +```cf ssh getgov-{space}``` + +Example: `cf ssh getgov-za` + +#### Step 3: Create a shell instance +```/tmp/lifecycle/shell``` + +#### Step 4: Upload your csv to the desired sandbox +[Follow these steps](#use-scp-to-transfer-data-to-sandboxes) to upload the federal_cio csv to a sandbox of your choice. + +#### Step 5: Running the script +```./manage.py create_federal_portfolio "{federal_agency_name}" --both``` + +Example (only requests): `./manage.py create_federal_portfolio "AMTRAK" --parse_requests` + +### Running locally + +#### Step 1: Running the script +```docker-compose exec app ./manage.py create_federal_portfolio "{federal_agency_name}" --both``` + +##### Parameters +| | Parameter | Description | +|:-:|:-------------------------- |:-------------------------------------------------------------------------------------------| +| 1 | **federal_agency_name** | Name of the FederalAgency record surrounded by quotes. For instance,"AMTRAK". | +| 2 | **both** | If True, runs parse_requests and parse_domains. | +| 3 | **parse_requests** | If True, then the created portfolio is added to all related DomainRequests. | +| 4 | **parse_domains** | If True, then the created portfolio is added to all related Domains. | + +Note: Regarding parameters #2-#3, you cannot use `--both` while using these. You must specify either `--parse_requests` or `--parse_domains` seperately. While all of these parameters are optional in that you do not need to specify all of them, +you must specify at least one to run this script. diff --git a/src/epplibwrapper/cert.py b/src/epplibwrapper/cert.py index 15ff16c06..589736a04 100644 --- a/src/epplibwrapper/cert.py +++ b/src/epplibwrapper/cert.py @@ -1,7 +1,7 @@ import os import tempfile -from django.conf import settings +from django.conf import settings # type: ignore class Cert: @@ -12,7 +12,7 @@ class Cert: variable but Python's ssl library requires a file. """ - def __init__(self, data=settings.SECRET_REGISTRY_CERT) -> None: + def __init__(self, data=settings.SECRET_REGISTRY_CERT) -> None: # type: ignore self.filename = self._write(data) def __del__(self): @@ -31,4 +31,4 @@ class Key(Cert): """Location of private key as written to disk.""" def __init__(self) -> None: - super().__init__(data=settings.SECRET_REGISTRY_KEY) + super().__init__(data=settings.SECRET_REGISTRY_KEY) # type: ignore diff --git a/src/registrar/admin.py b/src/registrar/admin.py index e3bd5c9f7..fb830378c 100644 --- a/src/registrar/admin.py +++ b/src/registrar/admin.py @@ -963,7 +963,9 @@ def change_view(self, request, object_id, form_url="", extra_context=None): domain_ids = user_domain_roles.values_list("domain_id", flat=True) domains = Domain.objects.filter(id__in=domain_ids).exclude(state=Domain.State.DELETED) - extra_context = {"domain_requests": domain_requests, "domains": domains} + portfolio_ids = obj.get_portfolios().values_list("portfolio", flat=True) + portfolios = models.Portfolio.objects.filter(id__in=portfolio_ids) + extra_context = {"domain_requests": domain_requests, "domains": domains, "portfolios": portfolios} return super().change_view(request, object_id, form_url, extra_context) diff --git a/src/registrar/assets/js/get-gov-admin.js b/src/registrar/assets/js/get-gov-admin.js index 27ff1470b..7ff02ba1f 100644 --- a/src/registrar/assets/js/get-gov-admin.js +++ b/src/registrar/assets/js/get-gov-admin.js @@ -748,7 +748,10 @@ function initializeWidgetOnList(list, parentId) { //------ Requested Domains const requestedDomainElement = document.getElementById('id_requested_domain'); - const requestedDomain = requestedDomainElement.options[requestedDomainElement.selectedIndex].text; + // We have to account for different superuser and analyst markups + const requestedDomain = requestedDomainElement.options + ? requestedDomainElement.options[requestedDomainElement.selectedIndex].text + : requestedDomainElement.text; //------ Submitter // Function to extract text by ID and handle missing elements @@ -762,7 +765,10 @@ function initializeWidgetOnList(list, parentId) { // Extract the submitter name, title, email, and phone number const submitterDiv = document.querySelector('.form-row.field-submitter'); const submitterNameElement = document.getElementById('id_submitter'); - const submitterName = submitterNameElement.options[submitterNameElement.selectedIndex].text; + // We have to account for different superuser and analyst markups + const submitterName = submitterNameElement + ? submitterNameElement.options[submitterNameElement.selectedIndex].text + : submitterDiv.querySelector('a').text; const submitterTitle = extractTextById('contact_info_title', submitterDiv); const submitterEmail = extractTextById('contact_info_email', submitterDiv); const submitterPhone = extractTextById('contact_info_phone', submitterDiv); diff --git a/src/registrar/config/settings.py b/src/registrar/config/settings.py index 7965424bc..96740a15c 100644 --- a/src/registrar/config/settings.py +++ b/src/registrar/config/settings.py @@ -23,6 +23,9 @@ from pathlib import Path from typing import Final from botocore.config import Config +import json +import logging +from django.utils.log import ServerFormatter # # # ### # Setup code goes here # @@ -57,7 +60,7 @@ env_debug = env.bool("DJANGO_DEBUG", default=False) env_is_production = env.bool("IS_PRODUCTION", default=False) env_log_level = env.str("DJANGO_LOG_LEVEL", "DEBUG") -env_base_url = env.str("DJANGO_BASE_URL") +env_base_url: str = env.str("DJANGO_BASE_URL") env_getgov_public_site_url = env.str("GETGOV_PUBLIC_SITE_URL", "") env_oidc_active_provider = env.str("OIDC_ACTIVE_PROVIDER", "identity sandbox") @@ -192,7 +195,7 @@ "registrar.registrar_middleware.CheckPortfolioMiddleware", ] -# application object used by Django’s built-in servers (e.g. `runserver`) +# application object used by Django's built-in servers (e.g. `runserver`) WSGI_APPLICATION = "registrar.config.wsgi.application" # endregion @@ -415,7 +418,7 @@ # and to interpret datetimes entered in forms TIME_ZONE = "UTC" -# enable Django’s translation system +# enable Django's translation system USE_I18N = True # enable localized formatting of numbers and dates @@ -450,6 +453,40 @@ # logger.error("Can't do this important task. Something is very wrong.") # logger.critical("Going to crash now.") + +class JsonFormatter(logging.Formatter): + """Formats logs into JSON for better parsing""" + + def __init__(self): + super().__init__(datefmt="%d/%b/%Y %H:%M:%S") + + def format(self, record): + log_record = { + "timestamp": self.formatTime(record, self.datefmt), + "level": record.levelname, + "name": record.name, + "lineno": record.lineno, + "message": record.getMessage(), + } + return json.dumps(log_record) + + +class JsonServerFormatter(ServerFormatter): + """Formats server logs into JSON for better parsing""" + + def format(self, record): + formatted_record = super().format(record) + log_entry = {"server_time": record.server_time, "level": record.levelname, "message": formatted_record} + return json.dumps(log_entry) + + +# default to json formatted logs +server_formatter, console_formatter = "json.server", "json" + +# don't use json format locally, it makes logs hard to read in console +if "localhost" in env_base_url: + server_formatter, console_formatter = "django.server", "verbose" + LOGGING = { "version": 1, # Don't import Django's existing loggers @@ -469,6 +506,12 @@ "format": "[{server_time}] {message}", "style": "{", }, + "json.server": { + "()": JsonServerFormatter, + }, + "json": { + "()": JsonFormatter, + }, }, # define where log messages will be sent; # each logger can have one or more handlers @@ -476,12 +519,12 @@ "console": { "level": env_log_level, "class": "logging.StreamHandler", - "formatter": "verbose", + "formatter": console_formatter, }, "django.server": { "level": "INFO", "class": "logging.StreamHandler", - "formatter": "django.server", + "formatter": server_formatter, }, # No file logger is configured, # because containerized apps diff --git a/src/registrar/fixtures_users.py b/src/registrar/fixtures_users.py index 1b8eda9ab..7fbf41223 100644 --- a/src/registrar/fixtures_users.py +++ b/src/registrar/fixtures_users.py @@ -56,6 +56,7 @@ class UserFixture: "username": "8f8e7293-17f7-4716-889b-1990241cbd39", "first_name": "Katherine", "last_name": "Osos", + "email": "kosos@truss.works", }, { "username": "70488e0a-e937-4894-a28c-16f5949effd4", @@ -171,7 +172,7 @@ class UserFixture: "username": "91a9b97c-bd0a-458d-9823-babfde7ebf44", "first_name": "Katherine-Analyst", "last_name": "Osos-Analyst", - "email": "kosos@truss.works", + "email": "kosos+1@truss.works", }, { "username": "2cc0cde8-8313-4a50-99d8-5882e71443e8", diff --git a/src/registrar/management/commands/create_federal_portfolio.py b/src/registrar/management/commands/create_federal_portfolio.py new file mode 100644 index 000000000..d05a2911b --- /dev/null +++ b/src/registrar/management/commands/create_federal_portfolio.py @@ -0,0 +1,255 @@ +"""Loads files from /tmp into our sandboxes""" + +import argparse +import logging +from django.core.management import BaseCommand, CommandError +from registrar.management.commands.utility.terminal_helper import TerminalColors, TerminalHelper +from registrar.models import DomainInformation, DomainRequest, FederalAgency, Suborganization, Portfolio, User + + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Creates a federal portfolio given a FederalAgency name" + + def add_arguments(self, parser): + """Add three arguments: + 1. agency_name => the value of FederalAgency.agency + 2. --parse_requests => if true, adds the given portfolio to each related DomainRequest + 3. --parse_domains => if true, adds the given portfolio to each related DomainInformation + """ + parser.add_argument( + "agency_name", + help="The name of the FederalAgency to add", + ) + parser.add_argument( + "--parse_requests", + action=argparse.BooleanOptionalAction, + help="Adds portfolio to DomainRequests", + ) + parser.add_argument( + "--parse_domains", + action=argparse.BooleanOptionalAction, + help="Adds portfolio to DomainInformation", + ) + parser.add_argument( + "--both", + action=argparse.BooleanOptionalAction, + help="Adds portfolio to both requests and domains", + ) + + def handle(self, agency_name, **options): + parse_requests = options.get("parse_requests") + parse_domains = options.get("parse_domains") + both = options.get("both") + + if not both: + if not parse_requests and not parse_domains: + raise CommandError("You must specify at least one of --parse_requests or --parse_domains.") + else: + if parse_requests or parse_domains: + raise CommandError("You cannot pass --parse_requests or --parse_domains when passing --both.") + + federal_agency = FederalAgency.objects.filter(agency__iexact=agency_name).first() + if not federal_agency: + raise ValueError( + f"Cannot find the federal agency '{agency_name}' in our database. " + "The value you enter for `agency_name` must be " + "prepopulated in the FederalAgency table before proceeding." + ) + + portfolio = self.create_or_modify_portfolio(federal_agency) + self.create_suborganizations(portfolio, federal_agency) + + if parse_requests or both: + self.handle_portfolio_requests(portfolio, federal_agency) + + if parse_domains or both: + self.handle_portfolio_domains(portfolio, federal_agency) + + def create_or_modify_portfolio(self, federal_agency): + """Creates or modifies a portfolio record based on a federal agency.""" + portfolio_args = { + "federal_agency": federal_agency, + "organization_name": federal_agency.agency, + "organization_type": DomainRequest.OrganizationChoices.FEDERAL, + "creator": User.get_default_user(), + "notes": "Auto-generated record", + } + + if federal_agency.so_federal_agency.exists(): + portfolio_args["senior_official"] = federal_agency.so_federal_agency.first() + + portfolio, created = Portfolio.objects.get_or_create( + organization_name=portfolio_args.get("organization_name"), defaults=portfolio_args + ) + + if created: + message = f"Created portfolio '{portfolio}'" + TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message) + + if portfolio_args.get("senior_official"): + message = f"Added senior official '{portfolio_args['senior_official']}'" + TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message) + else: + message = ( + f"No senior official added to portfolio '{portfolio}'. " + "None was returned for the reverse relation `FederalAgency.so_federal_agency.first()`" + ) + TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message) + else: + proceed = TerminalHelper.prompt_for_execution( + system_exit_on_terminate=False, + prompt_message=f""" + The given portfolio '{federal_agency.agency}' already exists in our DB. + If you cancel, the rest of the script will still execute but this record will not update. + """, + prompt_title="Do you wish to modify this record?", + ) + if proceed: + + # Don't override the creator and notes fields + if portfolio.creator: + portfolio_args.pop("creator") + + if portfolio.notes: + portfolio_args.pop("notes") + + # Update everything else + for key, value in portfolio_args.items(): + setattr(portfolio, key, value) + + portfolio.save() + message = f"Modified portfolio '{portfolio}'" + TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) + + if portfolio_args.get("senior_official"): + message = f"Added/modified senior official '{portfolio_args['senior_official']}'" + TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) + + return portfolio + + def create_suborganizations(self, portfolio: Portfolio, federal_agency: FederalAgency): + """Create Suborganizations tied to the given portfolio based on DomainInformation objects""" + valid_agencies = DomainInformation.objects.filter( + federal_agency=federal_agency, organization_name__isnull=False + ) + org_names = set(valid_agencies.values_list("organization_name", flat=True)) + + if not org_names: + message = ( + "Could not add any suborganizations." + f"\nNo suborganizations were found for '{federal_agency}' when filtering on this name, " + "and excluding null organization_name records." + ) + TerminalHelper.colorful_logger(logger.warning, TerminalColors.FAIL, message) + return + + # Check if we need to update any existing suborgs first. This step is optional. + existing_suborgs = Suborganization.objects.filter(name__in=org_names) + if existing_suborgs.exists(): + self._update_existing_suborganizations(portfolio, existing_suborgs) + + # Create new suborgs, as long as they don't exist in the db already + new_suborgs = [] + for name in org_names - set(existing_suborgs.values_list("name", flat=True)): + # Stored in variables due to linter wanting type information here. + portfolio_name: str = portfolio.organization_name if portfolio.organization_name is not None else "" + if name is not None and name.lower() == portfolio_name.lower(): + # You can use this to populate location information, when this occurs. + # However, this isn't needed for now so we can skip it. + message = ( + f"Skipping suborganization create on record '{name}'. " + "The federal agency name is the same as the portfolio name." + ) + TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, message) + else: + new_suborgs.append(Suborganization(name=name, portfolio=portfolio)) # type: ignore + + if new_suborgs: + Suborganization.objects.bulk_create(new_suborgs) + TerminalHelper.colorful_logger( + logger.info, TerminalColors.OKGREEN, f"Added {len(new_suborgs)} suborganizations" + ) + else: + TerminalHelper.colorful_logger(logger.warning, TerminalColors.YELLOW, "No suborganizations added") + + def _update_existing_suborganizations(self, portfolio, orgs_to_update): + """ + Update existing suborganizations with new portfolio. + Prompts for user confirmation before proceeding. + """ + proceed = TerminalHelper.prompt_for_execution( + system_exit_on_terminate=False, + prompt_message=f"""Some suborganizations already exist in our DB. + If you cancel, the rest of the script will still execute but these records will not update. + + ==Proposed Changes== + The following suborgs will be updated: {[org.name for org in orgs_to_update]} + """, + prompt_title="Do you wish to modify existing suborganizations?", + ) + if proceed: + for org in orgs_to_update: + org.portfolio = portfolio + + Suborganization.objects.bulk_update(orgs_to_update, ["portfolio"]) + message = f"Updated {len(orgs_to_update)} suborganizations." + TerminalHelper.colorful_logger(logger.info, TerminalColors.MAGENTA, message) + + def handle_portfolio_requests(self, portfolio: Portfolio, federal_agency: FederalAgency): + """ + Associate portfolio with domain requests for a federal agency. + Updates all relevant domain request records. + """ + invalid_states = [ + DomainRequest.DomainRequestStatus.STARTED, + DomainRequest.DomainRequestStatus.INELIGIBLE, + DomainRequest.DomainRequestStatus.REJECTED, + ] + domain_requests = DomainRequest.objects.filter(federal_agency=federal_agency).exclude(status__in=invalid_states) + if not domain_requests.exists(): + message = f""" + Portfolios not added to domain requests: no valid records found. + This means that a filter on DomainInformation for the federal_agency '{federal_agency}' returned no results. + Excluded statuses: STARTED, INELIGIBLE, REJECTED. + """ + TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message) + return None + + # Get all suborg information and store it in a dict to avoid doing a db call + suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name") + for domain_request in domain_requests: + domain_request.portfolio = portfolio + if domain_request.organization_name in suborgs: + domain_request.sub_organization = suborgs.get(domain_request.organization_name) + + DomainRequest.objects.bulk_update(domain_requests, ["portfolio", "sub_organization"]) + message = f"Added portfolio '{portfolio}' to {len(domain_requests)} domain requests." + TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message) + + def handle_portfolio_domains(self, portfolio: Portfolio, federal_agency: FederalAgency): + """ + Associate portfolio with domains for a federal agency. + Updates all relevant domain information records. + """ + domain_infos = DomainInformation.objects.filter(federal_agency=federal_agency) + if not domain_infos.exists(): + message = f""" + Portfolios not added to domains: no valid records found. + This means that a filter on DomainInformation for the federal_agency '{federal_agency}' returned no results. + """ + TerminalHelper.colorful_logger(logger.info, TerminalColors.YELLOW, message) + return None + + # Get all suborg information and store it in a dict to avoid doing a db call + suborgs = Suborganization.objects.filter(portfolio=portfolio).in_bulk(field_name="name") + for domain_info in domain_infos: + domain_info.portfolio = portfolio + if domain_info.organization_name in suborgs: + domain_info.sub_organization = suborgs.get(domain_info.organization_name) + + DomainInformation.objects.bulk_update(domain_infos, ["portfolio", "sub_organization"]) + message = f"Added portfolio '{portfolio}' to {len(domain_infos)} domains" + TerminalHelper.colorful_logger(logger.info, TerminalColors.OKGREEN, message) diff --git a/src/registrar/management/commands/transfer_transition_domains_to_domains.py b/src/registrar/management/commands/transfer_transition_domains_to_domains.py index 615df50a5..727db6dab 100644 --- a/src/registrar/management/commands/transfer_transition_domains_to_domains.py +++ b/src/registrar/management/commands/transfer_transition_domains_to_domains.py @@ -423,7 +423,7 @@ def create_new_domain_info( valid_fed_type = fed_type in fed_choices valid_fed_agency = fed_agency in agency_choices - default_creator, _ = User.objects.get_or_create(username="System") + default_creator = User.get_default_user() new_domain_info_data = { "domain": domain, diff --git a/src/registrar/models/user.py b/src/registrar/models/user.py index 12bfaa370..929a63525 100644 --- a/src/registrar/models/user.py +++ b/src/registrar/models/user.py @@ -131,6 +131,12 @@ def __str__(self): else: return self.username + @classmethod + def get_default_user(cls): + """Returns the default "system" user""" + default_creator, _ = User.objects.get_or_create(username="System") + return default_creator + def restrict_user(self): self.status = self.RESTRICTED self.save() @@ -300,6 +306,9 @@ def portfolio_role_summary(self, portfolio): return roles + def get_portfolios(self): + return self.portfolio_permissions.all() + @classmethod def needs_identity_verification(cls, email, uuid): """A method used by our oidc classes to test whether a user needs email/uuid verification diff --git a/src/registrar/templates/django/admin/includes/detail_table_fieldset.html b/src/registrar/templates/django/admin/includes/detail_table_fieldset.html index 5e1057139..db156b033 100644 --- a/src/registrar/templates/django/admin/includes/detail_table_fieldset.html +++ b/src/registrar/templates/django/admin/includes/detail_table_fieldset.html @@ -107,7 +107,7 @@ {% endif %} {% elif field.field.name == "requested_domain" %} {% with current_path=request.get_full_path %} - {{ original.requested_domain }} + {{ original.requested_domain }} {% endwith%} {% elif field.field.name == "current_websites" %} {% comment %} diff --git a/src/registrar/templates/django/admin/user_change_form.html b/src/registrar/templates/django/admin/user_change_form.html index c0ddd8caf..736f12ba4 100644 --- a/src/registrar/templates/django/admin/user_change_form.html +++ b/src/registrar/templates/django/admin/user_change_form.html @@ -17,6 +17,26 @@ {% endblock %} {% block after_related_objects %} + {% if portfolios %} +
+

Portfolio information

+
+
+

Portfolios

+ +
+
+
+ {% endif %} +

Associated requests and domains

diff --git a/src/registrar/tests/common.py b/src/registrar/tests/common.py index bcd45f103..5e418da2b 100644 --- a/src/registrar/tests/common.py +++ b/src/registrar/tests/common.py @@ -911,6 +911,7 @@ def completed_domain_request( # noqa federal_type=None, action_needed_reason=None, portfolio=None, + organization_name=None, ): """A completed domain request.""" if not user: @@ -954,7 +955,7 @@ def completed_domain_request( # noqa federal_type="executive", purpose="Purpose of the site", is_policy_acknowledged=True, - organization_name="Testorg", + organization_name=organization_name if organization_name else "Testorg", address_line1="address 1", address_line2="address 2", state_territory="NY", diff --git a/src/registrar/tests/test_admin.py b/src/registrar/tests/test_admin.py index 25d7e5fd2..83114b3b3 100644 --- a/src/registrar/tests/test_admin.py +++ b/src/registrar/tests/test_admin.py @@ -2,6 +2,7 @@ from django.utils import timezone from django.test import TestCase, RequestFactory, Client from django.contrib.admin.sites import AdminSite +from django_webtest import WebTest # type: ignore from api.tests.common import less_console_noise_decorator from django.urls import reverse from registrar.admin import ( @@ -41,13 +42,12 @@ TransitionDomain, Portfolio, Suborganization, + UserPortfolioPermission, + UserDomainRole, + SeniorOfficial, + PortfolioInvitation, + VerifiedByStaff, ) -from registrar.models.portfolio_invitation import PortfolioInvitation -from registrar.models.senior_official import SeniorOfficial -from registrar.models.user_domain_role import UserDomainRole -from registrar.models.user_portfolio_permission import UserPortfolioPermission -from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices -from registrar.models.verified_by_staff import VerifiedByStaff from .common import ( MockDbForSharedTests, AuditedAdminMockData, @@ -60,10 +60,11 @@ multiple_unalphabetical_domain_objects, GenericTestHelper, ) +from registrar.models.utility.portfolio_helper import UserPortfolioPermissionChoices, UserPortfolioRoleChoices from django.contrib.sessions.backends.db import SessionStore from django.contrib.auth import get_user_model from unittest.mock import ANY, patch, Mock -from django_webtest import WebTest # type: ignore + import logging @@ -973,7 +974,7 @@ def test_get_filters(self): ) -class TestMyUserAdmin(MockDbForSharedTests): +class TestMyUserAdmin(MockDbForSharedTests, WebTest): """Tests for the MyUserAdmin class as super or staff user Notes: @@ -993,6 +994,7 @@ def setUpClass(cls): def setUp(self): super().setUp() + self.app.set_user(self.superuser.username) self.client = Client(HTTP_HOST="localhost:8080") def tearDown(self): @@ -1227,6 +1229,20 @@ def test_analyst_cannot_see_selects_for_portfolio_role_and_permissions_in_user_f self.assertNotContains(response, "Portfolio roles:") self.assertNotContains(response, "Portfolio additional permissions:") + @less_console_noise_decorator + def test_user_can_see_related_portfolios(self): + """Tests if a user can see the portfolios they are associated with on the user page""" + portfolio, _ = Portfolio.objects.get_or_create(organization_name="test", creator=self.superuser) + permission, _ = UserPortfolioPermission.objects.get_or_create( + user=self.superuser, portfolio=portfolio, roles=[UserPortfolioRoleChoices.ORGANIZATION_ADMIN] + ) + response = self.app.get(reverse("admin:registrar_user_change", args=[self.superuser.pk])) + expected_href = reverse("admin:registrar_portfolio_change", args=[portfolio.pk]) + self.assertContains(response, expected_href) + self.assertContains(response, str(portfolio)) + permission.delete() + portfolio.delete() + class AuditedAdminTest(TestCase): diff --git a/src/registrar/tests/test_management_scripts.py b/src/registrar/tests/test_management_scripts.py index 1958454f5..cbdc2c034 100644 --- a/src/registrar/tests/test_management_scripts.py +++ b/src/registrar/tests/test_management_scripts.py @@ -1,4 +1,5 @@ import copy +import boto3_mocking # type: ignore from datetime import date, datetime, time from django.core.management import call_command from django.test import TestCase, override_settings @@ -8,6 +9,7 @@ from django.utils.module_loading import import_string import logging import pyzipper +from django.core.management.base import CommandError from registrar.management.commands.clean_tables import Command as CleanTablesCommand from registrar.management.commands.export_tables import Command as ExportTablesCommand from registrar.models import ( @@ -23,14 +25,17 @@ VerifiedByStaff, PublicContact, FederalAgency, + Portfolio, + Suborganization, ) import tablib from unittest.mock import patch, call, MagicMock, mock_open from epplibwrapper import commands, common -from .common import MockEppLib, less_console_noise, completed_domain_request +from .common import MockEppLib, less_console_noise, completed_domain_request, MockSESClient from api.tests.common import less_console_noise_decorator + logger = logging.getLogger(__name__) @@ -1408,3 +1413,137 @@ def test_populate_federal_agency_initials_and_fceb_missing_agency(self): missing_agency.refresh_from_db() self.assertIsNone(missing_agency.initials) self.assertIsNone(missing_agency.is_fceb) + + +class TestCreateFederalPortfolio(TestCase): + + @less_console_noise_decorator + def setUp(self): + self.mock_client = MockSESClient() + self.user = User.objects.create(username="testuser") + self.federal_agency = FederalAgency.objects.create(agency="Test Federal Agency") + self.senior_official = SeniorOfficial.objects.create( + first_name="first", last_name="last", email="testuser@igorville.gov", federal_agency=self.federal_agency + ) + with boto3_mocking.clients.handler_for("sesv2", self.mock_client): + self.domain_request = completed_domain_request( + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + generic_org_type=DomainRequest.OrganizationChoices.CITY, + federal_agency=self.federal_agency, + user=self.user, + ) + self.domain_request.approve() + self.domain_info = DomainInformation.objects.filter(domain_request=self.domain_request).get() + + self.domain_request_2 = completed_domain_request( + name="sock@igorville.org", + status=DomainRequest.DomainRequestStatus.IN_REVIEW, + generic_org_type=DomainRequest.OrganizationChoices.CITY, + federal_agency=self.federal_agency, + user=self.user, + organization_name="Test Federal Agency", + ) + self.domain_request_2.approve() + self.domain_info_2 = DomainInformation.objects.filter(domain_request=self.domain_request_2).get() + + def tearDown(self): + DomainInformation.objects.all().delete() + DomainRequest.objects.all().delete() + Suborganization.objects.all().delete() + Portfolio.objects.all().delete() + SeniorOfficial.objects.all().delete() + FederalAgency.objects.all().delete() + User.objects.all().delete() + + @less_console_noise_decorator + def run_create_federal_portfolio(self, agency_name, parse_requests=False, parse_domains=False): + with patch( + "registrar.management.commands.utility.terminal_helper.TerminalHelper.query_yes_no_exit", + return_value=True, + ): + call_command( + "create_federal_portfolio", agency_name, parse_requests=parse_requests, parse_domains=parse_domains + ) + + def test_create_or_modify_portfolio(self): + """Test portfolio creation and modification with suborg and senior official.""" + self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True) + + portfolio = Portfolio.objects.get(federal_agency=self.federal_agency) + self.assertEqual(portfolio.organization_name, self.federal_agency.agency) + self.assertEqual(portfolio.organization_type, DomainRequest.OrganizationChoices.FEDERAL) + self.assertEqual(portfolio.creator, User.get_default_user()) + self.assertEqual(portfolio.notes, "Auto-generated record") + + # Test the suborgs + suborganizations = Suborganization.objects.filter(portfolio__federal_agency=self.federal_agency) + self.assertEqual(suborganizations.count(), 1) + self.assertEqual(suborganizations.first().name, "Testorg") + + # Test the senior official + self.assertEqual(portfolio.senior_official, self.senior_official) + + def test_handle_portfolio_requests(self): + """Verify portfolio association with domain requests.""" + self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True) + + self.domain_request.refresh_from_db() + self.assertIsNotNone(self.domain_request.portfolio) + self.assertEqual(self.domain_request.portfolio.federal_agency, self.federal_agency) + self.assertEqual(self.domain_request.sub_organization.name, "Testorg") + + def test_handle_portfolio_domains(self): + """Check portfolio association with domain information.""" + self.run_create_federal_portfolio("Test Federal Agency", parse_domains=True) + + self.domain_info.refresh_from_db() + self.assertIsNotNone(self.domain_info.portfolio) + self.assertEqual(self.domain_info.portfolio.federal_agency, self.federal_agency) + self.assertEqual(self.domain_info.sub_organization.name, "Testorg") + + def test_handle_parse_both(self): + """Ensure correct parsing of both requests and domains.""" + self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True, parse_domains=True) + + self.domain_request.refresh_from_db() + self.domain_info.refresh_from_db() + self.assertIsNotNone(self.domain_request.portfolio) + self.assertIsNotNone(self.domain_info.portfolio) + self.assertEqual(self.domain_request.portfolio, self.domain_info.portfolio) + + def test_command_error_no_parse_options(self): + """Verify error when no parse options are provided.""" + with self.assertRaisesRegex( + CommandError, "You must specify at least one of --parse_requests or --parse_domains." + ): + self.run_create_federal_portfolio("Test Federal Agency") + + def test_command_error_agency_not_found(self): + """Check error handling for non-existent agency.""" + expected_message = ( + "Cannot find the federal agency 'Non-existent Agency' in our database. " + "The value you enter for `agency_name` must be prepopulated in the FederalAgency table before proceeding." + ) + with self.assertRaisesRegex(ValueError, expected_message): + self.run_create_federal_portfolio("Non-existent Agency", parse_requests=True) + + def test_update_existing_portfolio(self): + """Test updating an existing portfolio.""" + # Create an existing portfolio + existing_portfolio = Portfolio.objects.create( + federal_agency=self.federal_agency, + organization_name="Test Federal Agency", + organization_type=DomainRequest.OrganizationChoices.CITY, + creator=self.user, + notes="Old notes", + ) + + self.run_create_federal_portfolio("Test Federal Agency", parse_requests=True) + + existing_portfolio.refresh_from_db() + self.assertEqual(existing_portfolio.organization_name, self.federal_agency.agency) + self.assertEqual(existing_portfolio.organization_type, DomainRequest.OrganizationChoices.FEDERAL) + + # Notes and creator should be untouched + self.assertEqual(existing_portfolio.notes, "Old notes") + self.assertEqual(existing_portfolio.creator, self.user) diff --git a/src/registrar/views/domain.py b/src/registrar/views/domain.py index 511c55228..f6c87d659 100644 --- a/src/registrar/views/domain.py +++ b/src/registrar/views/domain.py @@ -837,6 +837,23 @@ def _send_domain_invitation_email(self, email: str, requestor: User, add_success ) return None + # Check to see if an invite has already been sent + try: + invite = DomainInvitation.objects.get(email=email, domain=self.object) + # check if the invite has already been accepted + if invite.status == DomainInvitation.DomainInvitationStatus.RETRIEVED: + add_success = False + messages.warning( + self.request, + f"{email} is already a manager for this domain.", + ) + else: + add_success = False + # else if it has been sent but not accepted + messages.warning(self.request, f"{email} has already been invited to this domain") + except Exception: + logger.error("An error occured") + try: send_templated_email( "emails/domain_invitation.txt", @@ -862,24 +879,13 @@ def _send_domain_invitation_email(self, email: str, requestor: User, add_success def _make_invitation(self, email_address: str, requestor: User): """Make a Domain invitation for this email and redirect with a message.""" - # Check to see if an invite has already been sent (NOTE: we do not want to create an invite just yet.) try: - invite = DomainInvitation.objects.get(email=email_address, domain=self.object) - # that invitation already existed - if invite is not None: - messages.warning( - self.request, - f"{email_address} has already been invited to this domain.", - ) - except DomainInvitation.DoesNotExist: - # Try to send the invitation. If it succeeds, add it to the DomainInvitation table. - try: - self._send_domain_invitation_email(email=email_address, requestor=requestor) - except EmailSendingError: - messages.warning(self.request, "Could not send email invitation.") - else: - # (NOTE: only create a domainInvitation if the e-mail sends correctly) - DomainInvitation.objects.get_or_create(email=email_address, domain=self.object) + self._send_domain_invitation_email(email=email_address, requestor=requestor) + except EmailSendingError: + messages.warning(self.request, "Could not send email invitation.") + else: + # (NOTE: only create a domainInvitation if the e-mail sends correctly) + DomainInvitation.objects.get_or_create(email=email_address, domain=self.object) return redirect(self.get_success_url()) def form_valid(self, form): @@ -919,11 +925,9 @@ def form_valid(self, form): role=UserDomainRole.Roles.MANAGER, ) except IntegrityError: - # User already has the desired role! Do nothing?? - pass - - messages.success(self.request, f"Added user {requested_email}.") - + messages.warning(self.request, f"{requested_email} is already a manager for this domain") + else: + messages.success(self.request, f"Added user {requested_email}.") return redirect(self.get_success_url())