diff --git a/src/registrar/templates/home.html b/src/registrar/templates/home.html
index a79065f50..93f1243ea 100644
--- a/src/registrar/templates/home.html
+++ b/src/registrar/templates/home.html
@@ -15,6 +15,7 @@
{% endblock %}
Manage your domains
+
diff --git a/src/registrar/tests/test_reports.py b/src/registrar/tests/test_reports.py
index 630904218..011c60b93 100644
--- a/src/registrar/tests/test_reports.py
+++ b/src/registrar/tests/test_reports.py
@@ -7,13 +7,14 @@
from registrar.models.public_contact import PublicContact
from registrar.models.user import User
from django.contrib.auth import get_user_model
+from registrar.models.user_domain_role import UserDomainRole
from registrar.tests.common import MockEppLib
from registrar.utility.csv_export import (
- write_header,
- write_body,
+ write_csv,
get_default_start_date,
get_default_end_date,
)
+
from django.core.management import call_command
from unittest.mock import MagicMock, call, mock_open, patch
from api.views import get_current_federal, get_current_full
@@ -336,11 +337,30 @@ def setUp(self):
federal_agency="Armed Forces Retirement Home",
)
+ meoward_user = get_user_model().objects.create(
+ username="meoward_username", first_name="first_meoward", last_name="last_meoward", email="meoward@rocks.com"
+ )
+
+ # Test for more than 1 domain manager
+ _, created = UserDomainRole.objects.get_or_create(
+ user=meoward_user, domain=self.domain_1, role=UserDomainRole.Roles.MANAGER
+ )
+
+ _, created = UserDomainRole.objects.get_or_create(
+ user=self.user, domain=self.domain_1, role=UserDomainRole.Roles.MANAGER
+ )
+
+ # Test for just 1 domain manager
+ _, created = UserDomainRole.objects.get_or_create(
+ user=meoward_user, domain=self.domain_2, role=UserDomainRole.Roles.MANAGER
+ )
+
def tearDown(self):
PublicContact.objects.all().delete()
Domain.objects.all().delete()
DomainInformation.objects.all().delete()
User.objects.all().delete()
+ UserDomainRole.objects.all().delete()
super().tearDown()
def test_export_domains_to_writer_security_emails(self):
@@ -383,8 +403,10 @@ def test_export_domains_to_writer_security_emails(self):
}
self.maxDiff = None
# Call the export functions
- write_header(writer, columns)
- write_body(writer, columns, sort_fields, filter_condition)
+ write_csv(
+ writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
+ )
+
# Reset the CSV file's position to the beginning
csv_file.seek(0)
# Read the content into a variable
@@ -405,7 +427,7 @@ def test_export_domains_to_writer_security_emails(self):
expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
self.assertEqual(csv_content, expected_content)
- def test_write_body(self):
+ def test_write_csv(self):
"""Test that write_body returns the
existing domain, test that sort by domain name works,
test that filter works"""
@@ -440,8 +462,9 @@ def test_write_body(self):
],
}
# Call the export functions
- write_header(writer, columns)
- write_body(writer, columns, sort_fields, filter_condition)
+ write_csv(
+ writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
+ )
# Reset the CSV file's position to the beginning
csv_file.seek(0)
# Read the content into a variable
@@ -489,8 +512,9 @@ def test_write_body_additional(self):
],
}
# Call the export functions
- write_header(writer, columns)
- write_body(writer, columns, sort_fields, filter_condition)
+ write_csv(
+ writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True
+ )
# Reset the CSV file's position to the beginning
csv_file.seek(0)
# Read the content into a variable
@@ -567,20 +591,22 @@ def test_write_body_with_date_filter_pulls_domains_in_range(self):
}
# Call the export functions
- write_header(writer, columns)
- write_body(
+ write_csv(
writer,
columns,
sort_fields,
filter_condition,
+ get_domain_managers=False,
+ should_write_header=True,
)
- write_body(
+ write_csv(
writer,
columns,
sort_fields_for_deleted_domains,
filter_conditions_for_deleted_domains,
+ get_domain_managers=False,
+ should_write_header=False,
)
-
# Reset the CSV file's position to the beginning
csv_file.seek(0)
@@ -606,6 +632,64 @@ def test_write_body_with_date_filter_pulls_domains_in_range(self):
self.assertEqual(csv_content, expected_content)
+ def test_export_domains_to_writer_domain_managers(self):
+ """Test that export_domains_to_writer returns the
+ expected domain managers"""
+ with less_console_noise():
+ # Create a CSV file in memory
+ csv_file = StringIO()
+ writer = csv.writer(csv_file)
+ # Define columns, sort fields, and filter condition
+
+ columns = [
+ "Domain name",
+ "Status",
+ "Expiration date",
+ "Domain type",
+ "Agency",
+ "Organization name",
+ "City",
+ "State",
+ "AO",
+ "AO email",
+ "Security contact email",
+ ]
+ sort_fields = ["domain__name"]
+ filter_condition = {
+ "domain__state__in": [
+ Domain.State.READY,
+ Domain.State.DNS_NEEDED,
+ Domain.State.ON_HOLD,
+ ],
+ }
+ self.maxDiff = None
+ # Call the export functions
+ write_csv(
+ writer, columns, sort_fields, filter_condition, get_domain_managers=True, should_write_header=True
+ )
+
+ # Reset the CSV file's position to the beginning
+ csv_file.seek(0)
+ # Read the content into a variable
+ csv_content = csv_file.read()
+ # We expect READY domains,
+ # sorted alphabetially by domain name
+ expected_content = (
+ "Domain name,Status,Expiration date,Domain type,Agency,"
+ "Organization name,City,State,AO,AO email,"
+ "Security contact email,Domain manager email 1,Domain manager email 2,\n"
+ "adomain10.gov,Ready,,Federal,Armed Forces Retirement Home,,,, , ,\n"
+ "adomain2.gov,Dns needed,,Interstate,,,,, , , ,meoward@rocks.com\n"
+ "cdomain1.gov,Ready,,Federal - Executive,World War I Centennial Commission,,,"
+ ", , , ,meoward@rocks.com,info@example.com\n"
+ "ddomain3.gov,On hold,,Federal,Armed Forces Retirement Home,,,, , , ,,\n"
+ )
+ # Normalize line endings and remove commas,
+ # spaces and leading/trailing whitespace
+ csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip()
+ expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
+ self.assertEqual(csv_content, expected_content)
+
class HelperFunctions(TestCase):
"""This asserts that 1=1. Its limited usefulness lies in making sure the helper methods stay healthy."""
diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py
index a188fb91c..90e80f551 100644
--- a/src/registrar/utility/csv_export.py
+++ b/src/registrar/utility/csv_export.py
@@ -17,8 +17,9 @@
def write_header(writer, columns):
"""
Receives params from the parent methods and outputs a CSV with a header row.
- Works with write_header as longas the same writer object is passed.
+ Works with write_header as long as the same writer object is passed.
"""
+
writer.writerow(columns)
@@ -43,7 +44,7 @@ def get_domain_infos(filter_condition, sort_fields):
return domain_infos_cleaned
-def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None):
+def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None, get_domain_managers=False):
"""Given a set of columns, generate a new row from cleaned column data"""
# Domain should never be none when parsing this information
@@ -77,6 +78,8 @@ def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None
# create a dictionary of fields which can be included in output
FIELDS = {
"Domain name": domain.name,
+ "Status": domain.get_state_display(),
+ "Expiration date": domain.expiration_date,
"Domain type": domain_type,
"Agency": domain_info.federal_agency,
"Organization name": domain_info.organization_name,
@@ -85,39 +88,27 @@ def parse_row(columns, domain_info: DomainInformation, security_emails_dict=None
"AO": domain_info.ao, # type: ignore
"AO email": domain_info.authorizing_official.email if domain_info.authorizing_official else " ",
"Security contact email": security_email,
- "Status": domain.get_state_display(),
- "Expiration date": domain.expiration_date,
"Created at": domain.created_at,
"First ready": domain.first_ready,
"Deleted": domain.deleted,
}
- # user_emails = [user.email for user in domain.permissions]
+ if get_domain_managers:
+ # Get each domain managers email and add to list
+ dm_emails = [dm.user.email for dm in domain.permissions.all()]
- # Dynamically add user emails to the FIELDS dictionary
- # for i, user_email in enumerate(user_emails, start=1):
- # FIELDS[f"User{i} email"] = user_email
+ # Set up the "matching header" + row field data
+ for i, dm_email in enumerate(dm_emails, start=1):
+ FIELDS[f"Domain manager email {i}"] = dm_email
row = [FIELDS.get(column, "") for column in columns]
return row
-def write_body(
- writer,
- columns,
- sort_fields,
- filter_condition,
-):
+def _get_security_emails(sec_contact_ids):
"""
- Receives params from the parent methods and outputs a CSV with fltered and sorted domains.
- Works with write_header as longas the same writer object is passed.
+ Retrieve security contact emails for the given security contact IDs.
"""
-
- # Get the domainInfos
- all_domain_infos = get_domain_infos(filter_condition, sort_fields)
-
- # Store all security emails to avoid epp calls or excessive filters
- sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)
security_emails_dict = {}
public_contacts = (
PublicContact.objects.only("email", "domain__name")
@@ -133,24 +124,55 @@ def write_body(
else:
logger.warning("csv_export -> Domain was none for PublicContact")
- # all_user_nums = 0
- # for domain_info in all_domain_infos:
- # user_num = len(domain_info.domain.permissions)
- # all_user_nums.append(user_num)
+ return security_emails_dict
+
+
+def update_columns_with_domain_managers(columns, max_dm_count):
+ """
+ Update the columns list to include "Domain manager email {#}" headers
+ based on the maximum domain manager count.
+ """
+ for i in range(1, max_dm_count + 1):
+ columns.append(f"Domain manager email {i}")
+
- # if user_num > highest_user_nums:
- # highest_user_nums = user_num
+def write_csv(
+ writer,
+ columns,
+ sort_fields,
+ filter_condition,
+ get_domain_managers=False,
+ should_write_header=True,
+):
+ """
+ Receives params from the parent methods and outputs a CSV with fltered and sorted domains.
+ Works with write_header as longas the same writer object is passed.
+ get_domain_managers: Conditional bc we only use domain manager info for export_data_full_to_csv
+ should_write_header: Conditional bc export_data_growth_to_csv calls write_body twice
+ """
- # Build the header here passing to it highest_user_nums
+ all_domain_infos = get_domain_infos(filter_condition, sort_fields)
+
+ # Store all security emails to avoid epp calls or excessive filters
+ sec_contact_ids = all_domain_infos.values_list("domain__security_contact_registry_id", flat=True)
+
+ security_emails_dict = _get_security_emails(sec_contact_ids)
# Reduce the memory overhead when performing the write operation
paginator = Paginator(all_domain_infos, 1000)
+
+ if get_domain_managers and len(all_domain_infos) > 0:
+ # We want to get the max amont of domain managers an
+ # account has to set the column header dynamically
+ max_dm_count = max(len(domain_info.domain.permissions.all()) for domain_info in all_domain_infos)
+ update_columns_with_domain_managers(columns, max_dm_count)
+
for page_num in paginator.page_range:
page = paginator.page(page_num)
rows = []
for domain_info in page.object_list:
try:
- row = parse_row(columns, domain_info, security_emails_dict)
+ row = parse_row(columns, domain_info, security_emails_dict, get_domain_managers)
rows.append(row)
except ValueError:
# This should not happen. If it does, just skip this row.
@@ -158,7 +180,10 @@ def write_body(
logger.error("csv_export -> Error when parsing row, domain was None")
continue
- writer.writerows(rows)
+ if should_write_header:
+ write_header(writer, columns)
+
+ writer.writerows(rows)
def export_data_type_to_csv(csv_file):
@@ -168,6 +193,8 @@ def export_data_type_to_csv(csv_file):
# define columns to include in export
columns = [
"Domain name",
+ "Status",
+ "Expiration date",
"Domain type",
"Agency",
"Organization name",
@@ -176,9 +203,9 @@ def export_data_type_to_csv(csv_file):
"AO",
"AO email",
"Security contact email",
- "Status",
- "Expiration date",
+ # For domain manager we are pass it in as a parameter below in write_body
]
+
# Coalesce is used to replace federal_type of None with ZZZZZ
sort_fields = [
"organization_type",
@@ -193,8 +220,7 @@ def export_data_type_to_csv(csv_file):
Domain.State.ON_HOLD,
],
}
- write_header(writer, columns)
- write_body(writer, columns, sort_fields, filter_condition)
+ write_csv(writer, columns, sort_fields, filter_condition, get_domain_managers=True, should_write_header=True)
def export_data_full_to_csv(csv_file):
@@ -225,8 +251,7 @@ def export_data_full_to_csv(csv_file):
Domain.State.ON_HOLD,
],
}
- write_header(writer, columns)
- write_body(writer, columns, sort_fields, filter_condition)
+ write_csv(writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True)
def export_data_federal_to_csv(csv_file):
@@ -258,8 +283,7 @@ def export_data_federal_to_csv(csv_file):
Domain.State.ON_HOLD,
],
}
- write_header(writer, columns)
- write_body(writer, columns, sort_fields, filter_condition)
+ write_csv(writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True)
def get_default_start_date():
@@ -326,6 +350,12 @@ def export_data_growth_to_csv(csv_file, start_date, end_date):
"domain__deleted__gte": start_date_formatted,
}
- write_header(writer, columns)
- write_body(writer, columns, sort_fields, filter_condition)
- write_body(writer, columns, sort_fields_for_deleted_domains, filter_condition_for_deleted_domains)
+ write_csv(writer, columns, sort_fields, filter_condition, get_domain_managers=False, should_write_header=True)
+ write_csv(
+ writer,
+ columns,
+ sort_fields_for_deleted_domains,
+ filter_condition_for_deleted_domains,
+ get_domain_managers=False,
+ should_write_header=False,
+ )