Skip to content

Commit

Permalink
Security email tests / bug fixes
Browse files Browse the repository at this point in the history
Still running into racing test conditions... Works when you run TestRegistrantContacts on its own, but when running the entire file something is happening
  • Loading branch information
zandercymatics committed Sep 15, 2023
1 parent aec32ca commit c8eca67
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 37 deletions.
13 changes: 12 additions & 1 deletion src/registrar/models/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __set__(self, obj, value):
"""Called during set. Example: `domain.registrant = 'abc123'`."""
super().__set__(obj, value)
# always invalidate cache after sending updates to the registry
logger.debug("cache was invalidateds")
obj._invalidate_cache()

def __delete__(self, obj):
Expand Down Expand Up @@ -650,6 +651,13 @@ def isActive(self):

# Q: I don't like this function name much,
# what would be better here?
# Note for reviewers:
# This can likely be done without passing in
# contact_id and contact_type and instead embedding it inside of
# contact, but the tradeoff for that is that it unnecessarily complicates using this
# (as you'd have to create a custom dictionary), and type checking becomes weaker.
# I'm sure though that there is an easier alternative...
# TLDR: This doesn't look as pretty, but it makes using this function easier
def map_epp_contact_to_public_contact(
self, contact: eppInfo.InfoContactResultData, contact_id, contact_type
):
Expand Down Expand Up @@ -767,6 +775,7 @@ def generic_contact_getter(
# The contact type 'registrant' is stored under a different property
if contact_type_choice == PublicContact.ContactTypeChoices.REGISTRANT:
desired_property = "registrant"
logger.debug(f"generic domain getter was called. Wanting contacts on {contact_type_choice}")
contacts = self._get_property(desired_property)
if contact_type_choice == PublicContact.ContactTypeChoices.REGISTRANT:
contacts = [contacts]
Expand Down Expand Up @@ -873,6 +882,7 @@ def _get_or_create_domain(self):
while not exitEarly and count < 3:
try:
logger.info("Getting domain info from epp")
logger.debug(f"domain info name is... {self.__dict__}")
req = commands.InfoDomain(name=self.name)
domainInfo = registry.send(req, cleaned=True).res_data[0]
exitEarly = True
Expand Down Expand Up @@ -1195,6 +1205,7 @@ def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False):

def _invalidate_cache(self):
"""Remove cache data when updates are made."""
logger.debug(f"cache was cleared! {self.__dict__}")
self._cache = {}

def _get_property(self, property):
Expand All @@ -1206,7 +1217,7 @@ def _get_property(self, property):
)

if property in self._cache:
logger.debug("hit here also!!")
logger.debug(f"hit here also!! {property}")
logger.debug(self._cache[property])
return self._cache[property]
else:
Expand Down
18 changes: 7 additions & 11 deletions src/registrar/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ def __init__(
self.hosts = hosts
self.registrant = registrant

def dummyInfoContactResultData(id, email):
def dummyInfoContactResultData(id, email, cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), pw="thisisnotapassword"):
fake = info.InfoContactResultData(
id=id,
postal_info=common.PostalInfo(
Expand All @@ -575,12 +575,12 @@ def dummyInfoContactResultData(id, email):
voice="+1.8882820870",
fax="+1-212-9876543",
email=email,
auth_info=common.ContactAuthInfo(pw="thisisnotapassword"),
auth_info=common.ContactAuthInfo(pw=pw),
roid=...,
statuses=[],
cl_id=...,
cr_id=...,
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
cr_date=cr_date,
up_id=...,
up_date=...,
tr_date=...,
Expand All @@ -596,8 +596,8 @@ def dummyInfoContactResultData(id, email):
mockAdministrativeContact = dummyInfoContactResultData("administrativeContact", "[email protected]")
mockRegistrantContact = dummyInfoContactResultData("registrantContact", "[email protected]")
mockDataInfoDomain = fakedEppObject(
"fakepw",
cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35),
"lastPw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[common.DomainContact(contact="123", type="security")],
hosts=["fake.host.com"],
)
Expand All @@ -618,11 +618,9 @@ def dummyInfoContactResultData(id, email):
contacts=[],
hosts=["fake.host.com"],
)
mockDataInfoContact = fakedEppObject(
"anotherPw", cr_date=datetime.datetime(2023, 7, 25, 19, 45, 35)
)
mockDataInfoContact = dummyInfoContactResultData("123", "[email protected]", datetime.datetime(2023, 5, 25, 19, 45, 35), "lastPw")
mockDataInfoHosts = fakedEppObject(
"lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35)
"lastPw", cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35)
)

def mockSend(self, _request, cleaned):
Expand All @@ -639,8 +637,6 @@ def mockSend(self, _request, cleaned):
# Default contact return
mocked_result = self.mockDataInfoContact
# For testing contact types...
l = getattr(_request, "id", None)
logger.debug(f"get l'd {l}")
match getattr(_request, "id", None):
case "securityContact":
mocked_result = self.mockSecurityContact
Expand Down
189 changes: 164 additions & 25 deletions src/registrar/tests/test_models_domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
class TestDomainCache(MockEppLib):
def test_cache_sets_resets(self):
"""Cache should be set on getter and reset on setter calls"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
domain, _ = Domain.objects.get_or_create(name="freeman.gov")
# trigger getter
_ = domain.creation_date

domain._get_property("contacts")
# getter should set the domain cache with a InfoDomain object
# (see InfoDomainResult)
self.assertEquals(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
self.assertEquals(domain._cache["cr_date"], self.mockDataInfoDomain.cr_date)
self.assertEquals(domain._cache["auth_info"], self.InfoDomainWithContacts.auth_info)
self.assertEquals(domain._cache["cr_date"], self.InfoDomainWithContacts.cr_date)
self.assertFalse("avail" in domain._cache.keys())

# using a setter should clear the cache
Expand All @@ -47,10 +47,13 @@ def test_cache_sets_resets(self):
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoDomain(name="igorville.gov", auth_info=None),
commands.InfoDomain(name="freeman.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoContact(id="123", auth_info=None), cleaned=True),
call(commands.InfoContact(id='registrantContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='securityContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='administrativeContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='technicalContact', auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
]
)
Expand Down Expand Up @@ -80,30 +83,57 @@ def test_cache_used_when_avail(self):

def test_cache_nested_elements(self):
"""Cache works correctly with the nested objects cache and hosts"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
domain, _ = Domain.objects.get_or_create(name="freeman.gov")

# the cached contacts and hosts should be dictionaries of what is passed to them
expectedContactsDict = {
"id": self.mockDataInfoDomain.contacts[0].contact,
"type": self.mockDataInfoDomain.contacts[0].type,
"auth_info": self.mockDataInfoContact.auth_info,
"cr_date": self.mockDataInfoContact.cr_date,
}
self.maxDiff = None
# The contact list will initally contain objects of type 'DomainContact'
# this is then transformed into PublicContact, and cache should NOT
# hold onto the DomainContact object
expectedUnfurledContactsList = [
common.DomainContact(contact="securityContact", type="security"),
common.DomainContact(contact="administrativeContact", type="admin"),
common.DomainContact(contact="technicalContact", type="tech"),
]
expectedContactsList = [
domain.map_epp_contact_to_public_contact(
self.mockSecurityContact, "securityContact", "security"
),
domain.map_epp_contact_to_public_contact(
self.mockAdministrativeContact, "administrativeContact", "admin"
),
domain.map_epp_contact_to_public_contact(
self.mockTechnicalContact, "technicalContact", "tech"
),
]
expectedHostsDict = {
"name": self.mockDataInfoDomain.hosts[0],
"cr_date": self.mockDataInfoHosts.cr_date,
"name": self.InfoDomainWithContacts.hosts[0],
"cr_date": self.InfoDomainWithContacts.cr_date,
}

# this can be changed when the getter for contacts is implemented
domain._get_property("contacts")

# check domain info is still correct and not overridden
self.assertEqual(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
self.assertEqual(domain._cache["cr_date"], self.mockDataInfoDomain.cr_date)
self.assertEqual(domain._cache["auth_info"], self.InfoDomainWithContacts.auth_info)
self.assertEqual(domain._cache["cr_date"], self.InfoDomainWithContacts.cr_date)

# check contacts
self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts)
self.assertEqual(domain._cache["contacts"], [expectedContactsDict])
self.assertEqual(domain._cache["_contacts"], self.InfoDomainWithContacts.contacts)
# The contact list should not contain what is sent by the registry by default,
# as _fetch_cache will transform the type to PublicContact
self.assertNotEqual(domain._cache["contacts"], expectedUnfurledContactsList)
# Assert that what we get from cache is inline with our mock
# Since our cache creates new items inside of our contact list,
# as we need to map DomainContact -> PublicContact, our mocked items
# will point towards a different location in memory (as they are different objects).
# This should be a problem only exclusive to our mocks, since we are not
# replicating the same item twice outside this context. That said, we want to check
# for data integrity, but do not care if they are of the same _state or not
for cached_contact, expected_contact in zip(domain._cache["contacts"], expectedContactsList):
self.assertEqual(
{k: v for k, v in vars(cached_contact).items() if k != '_state'},
{k: v for k, v in vars(expected_contact).items() if k != '_state'}
)

# get and check hosts is set correctly
domain._get_property("hosts")
Expand Down Expand Up @@ -207,6 +237,7 @@ def tearDown(self):
DomainInformation.objects.all().delete()
DomainApplication.objects.all().delete()
Domain.objects.all().delete()
self.domain._cache = {}
# self.contactMailingAddressPatch.stop()
# self.createContactPatch.stop()

Expand Down Expand Up @@ -468,28 +499,94 @@ def test_epp_public_contact_mapper(self):
def test_contact_getter_security(self):
domain_contacts, _ = Domain.objects.get_or_create(name="freeman.gov")

self.maxDiff = None
security = PublicContact.get_default_security()
security.email = "[email protected]"
security.domain = domain_contacts
security.save()
domain_contacts.security_contact = security

expected_security_contact = security
expected_security_contact = domain_contacts.map_epp_contact_to_public_contact(
self.mockSecurityContact, "securityContact", "security"
)


contact_dict = domain_contacts.security_contact.__dict__
expected_dict = expected_security_contact.__dict__

contact_dict.pop('_state')
expected_dict.pop('_state')

self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoDomain(name="freeman.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoContact(id='registrantContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='securityContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='administrativeContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='technicalContact', auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
]
)

self.assertEqual(contact_dict, expected_dict)

def test_setter_getter_security_email(self):
domain_contacts, _ = Domain.objects.get_or_create(name="freeman.gov")

expected_security_contact = domain_contacts.map_epp_contact_to_public_contact(
self.mockSecurityContact, "securityContact", "security"
)

domain_contacts.security_contact = security

contact_dict = domain_contacts.security_contact.__dict__
expected_dict = expected_security_contact.__dict__

contact_dict.pop('_state')
expected_dict.pop('_state')

# Getter functions properly...
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoDomain(name="freeman.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoContact(id='registrantContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='securityContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='administrativeContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='technicalContact', auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
]
)

self.assertEqual(contact_dict, expected_dict)


# Setter functions properly...
domain_contacts.security_contact.email = "[email protected]"
expected_security_contact.email = "[email protected]"
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoDomain(name="freeman.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoContact(id='registrantContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='securityContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='administrativeContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='technicalContact', auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
]
)
self.assertEqual(domain_contacts.security_contact.email, expected_security_contact.email)

@skip("not implemented yet")
def test_setter_getter_security_email_mock_user(self):
# TODO - grab the HTML content of the page,
# and verify that things have changed as expected
raise

def test_contact_getter_technical(self):
domain_contacts, _ = Domain.objects.get_or_create(name="freeman.gov")

Expand All @@ -514,6 +611,20 @@ def test_contact_getter_technical(self):
contact_dict.pop('_state')
expected_dict.pop('_state')

self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoDomain(name="freeman.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoContact(id='registrantContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='securityContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='administrativeContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='technicalContact', auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
]
)

self.assertEqual(contact_dict, expected_dict)

def test_contact_getter_administrative(self):
Expand All @@ -537,6 +648,20 @@ def test_contact_getter_administrative(self):
contact_dict.pop('_state')
expected_dict.pop('_state')

self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoDomain(name="freeman.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoContact(id='registrantContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='securityContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='administrativeContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='technicalContact', auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
]
)

self.assertEqual(contact_dict, expected_dict)

def test_contact_getter_registrant(self):
Expand All @@ -562,6 +687,20 @@ def test_contact_getter_registrant(self):
contact_dict.pop('_state')
expected_dict.pop('_state')

self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoDomain(name="freeman.gov", auth_info=None),
cleaned=True,
),
call(commands.InfoContact(id='registrantContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='securityContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='administrativeContact', auth_info=None), cleaned=True),
call(commands.InfoContact(id='technicalContact', auth_info=None), cleaned=True),
call(commands.InfoHost(name="fake.host.com"), cleaned=True),
]
)

self.assertEqual(contact_dict, expected_dict)


Expand Down

0 comments on commit c8eca67

Please sign in to comment.