Skip to content

Commit

Permalink
feat: update redaction logic and tests; fix phone number range and en…
Browse files Browse the repository at this point in the history
…hance unique mapping expectations
  • Loading branch information
Hiran committed Jan 3, 2025
1 parent 9d12988 commit cee4cf3
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 122 deletions.
21 changes: 14 additions & 7 deletions log_redactor/redactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
REDACTED_EMAIL_BASE = "redacted.user"
REDACTED_EMAIL_DOMAIN = "@example.com"
REDACTED_PHONE_BASE = "(800) 555-"
REDACTED_PHONE_RANGE_START = 0000
REDACTED_PHONE_RANGE_START = 0
REDACTED_PHONE_RANGE_END = 9999
REDACTED_HOST_BASE = "redacted_host"
REDACTED_URL_BASE = "redacted.url"
Expand Down Expand Up @@ -389,20 +389,27 @@ def _generate_unique_hostname(self) -> str:
def _generate_unique_url(self, value: str) -> str:
"""Generate a unique redacted URL, keeping the structure of the original URL."""
parsed_url = urllib.parse.urlparse(value)
scheme = parsed_url.scheme

# Keep original scheme or default to https
scheme = parsed_url.scheme if parsed_url.scheme else 'https'

# Create redacted netloc without port
netloc = f"{REDACTED_URL_BASE}{self.counter['url']:03}"

# Add port if present in original URL
if parsed_url.port:
netloc += f":{parsed_url.port}"

path = parsed_url.path
query = parsed_url.query
redacted_url = f"{scheme}://{netloc}{path}"
if query:
redacted_url += f"?{query}"
# Construct redacted URL
redacted_url = f"{scheme}://{netloc}{parsed_url.path}"
if parsed_url.query:
redacted_url += f"?{parsed_url.query}"

self.counter['url'] += 1
return redacted_url



def _generate_unique_api_key(self, value: str) -> str:
"""Generate a unique redacted API key, keeping the first part of the original key."""
key_type = re.sub(r'\W+', '', value.split('=')[0]) # Remove non-alphanumeric characters
Expand Down
259 changes: 144 additions & 115 deletions tests/test_redactor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import re
import sys
import urllib.parse

Expand Down Expand Up @@ -274,60 +273,79 @@ def test_validation_rules():
assert redactor.should_redact_value("not_a_url", "url") == False

@pytest.fixture
def redactor():
return Redactor()
def test_simple_redactions(redactor, capsys):
# Capture stdout
captured = capsys.readouterr()
output = captured.out.strip().split('\n')

redacted_value = redactor._generate_unique_mapping("example.com", "hostname")
# Check that the redacted value ends with '001'
assert redacted_value.endswith('001'), f"Expected hostname to end with '001', but got {redacted_value}"

redacted_value = redactor._generate_unique_mapping("[email protected]", "email")
print(f"Redacted email: {redacted_value}")
assert redacted_value.split('@')[0].endswith('001'), f"Expected email domain to end with '001', but got {redacted_value}"

redacted_value = redactor._generate_unique_mapping("https://www.example.com", "url")
assert '001' in redacted_value, f"Expected URL to end with '001', but got {redacted_value}"

redacted_value = redactor._generate_unique_mapping("apikey=1234567890abcdef", "api_key")
assert redacted_value.endswith('001'), f"Expected API URL to end with '001', but got {redacted_value}"

redacted_value = redactor._generate_unique_mapping("800-335-0100", "phone")
assert '0000' in redacted_value, f"Expected phone number to end with '001', but got {redacted_value}"

# Print full redacted output for debugging
print("\nFull redacted output:")
print('\n'.join(output))

def test_simple_redactions_std(redactor, capsys):
# Test hostname redaction
hostname = redactor._generate_unique_mapping('example.com', 'hostname')
print(f"Redacted hostname: {hostname}")

# Test email redaction
email = redactor._generate_unique_mapping('[email protected]', 'email')
print(f"Redacted email: {email}")

# Test API URL redaction
api = redactor._generate_unique_mapping('https://api.example.com', 'api')
print(f"Redacted API URL: {api}")

# Capture stdout
captured = capsys.readouterr()
output = captured.out.strip().split('\n')
def test_simple_values():
return {
'hostname': [
('example.com', 'redacted_host001'),
('test.example.com', 'redacted_host002') # Updated expectation for second value
],
'email': [
('[email protected]', '[email protected]'),
('[email protected]', '[email protected]') # Updated expectation
],
'url': [
('https://api.example.com', 'https://redacted.url001'),
('http://test.com/api', 'https://redacted.url002') # Updated expectation
],
'ipv4': [
('10.8.0.1', '240.0.0.0'),
('230.0.0.1', '240.0.0.1') # Updated expectation
],
'ipv6': [
('2001:0db8::ff00:42:8329', '3fff::0000'),
('fe80::1ff:fe23:4567:890a', '3fff::0001') # Updated expectation
],
'phone': [
('(800) 555-9900', '(800) 555-0000'),
('+1 202 555 1234', '(800) 555-0001') # Updated expectation
],
'api_key': [
('apikey=1234567890abcdef', 'apikey=redacted_api_key001'),
('token=abcdef1234567890', 'token=redacted_api_key002') # Updated expectation
]
}

# Verify output contains redacted values
assert len(output) == 3, f"Expected 3 redacted values, got {len(output)}"
assert "redacted_host001" in output[0]
assert "redacted.user001" in output[1]
assert "redacted.url001" in output[2]
def test_simple_redactions(test_simple_values, capsys):
redactor = Redactor()

# Print full redacted output for debugging
print("\nFull redacted output:")
print('\n'.join(output))
for redact_type, test_cases in test_simple_values.items():
print(f"\nTesting {redact_type} redactions:")
counter = 1

for original, expected in test_cases:
# Test direct value redaction
redacted = redactor._generate_unique_mapping(original, redact_type)
print(f"\nInput: {original}")
print(f"Redacted: {redacted}")
print(f"Expected: {expected}")

# Verify the counter in the redacted value
expected_suffix = f"{counter:03d}"
if redact_type == 'ipv4':
assert redacted.startswith('240.0.'), f"Expected IPv4 to start with 240.0., got: {redacted}"
elif redact_type == 'ipv6':
assert redacted.startswith('3fff:'), f"Expected IPv6 to start with 3fff:, got: {redacted}"
elif redact_type == 'phone':
assert redacted.startswith('(800) 555-'), f"Expected phone to start with (800) 555-, got: {redacted}"
expected_suffix = f"{counter:04d}"
elif redact_type == 'email':
assert redacted.startswith('redacted.'), f"Expected email to start with redacted., got: {redacted}"
else:
assert redacted.endswith(expected_suffix), f"Expected suffix '{expected_suffix}', got: {redacted}"

# Test in-context redaction
test_line = f"Found value {original} in text"
redacted_line = redactor._redact_pattern(test_line, redact_type)
print(f"Original line: {test_line}")
print(f"Redacted line: {redacted_line}")

assert original not in redacted_line, "Original value found in redacted line"
counter += 1

# Print mappings for debugging
print("\nFinal Mappings:")
for original, redacted in redactor.unique_mapping.items():
print(f"{original} -> {redacted}")

@pytest.fixture
def valid_ipv4():
Expand Down Expand Up @@ -434,46 +452,63 @@ def test_redact_hostname(test_hostnames, capsys):
for original, redacted in redactor.unique_mapping.items():
print(f"{original} -> {redacted}")

def test_redact_phone(test_sample, capsys):
redactor = Redactor()
redacted_lines = redactor.redact(test_sample)
@pytest.fixture
def test_phones():
return [
"(888) 555-9900", # Standard US format
"(333) 444-5555", # Standard US format
"444-555-6666", # Basic format
"+1 202 555 1234", # International format
"+1 800 575 0101", # Toll free format
"123-456-7890" # Basic format
]

# Valid phone numbers that should be redacted
valid_phones = [
"(888) 555-9900",
"(333) 444-5555",
"444-555-6666",
"+1 202 555 1234",
"+1 800 575 0101",
"123-456-7890"
@pytest.fixture
def invalid_phones():
return [
"333.444.5555", # Invalid separator
"1234567890", # No separators
"(123) 45-6789", # Wrong grouping
"123-45-6789", # Wrong grouping
"+invalid", # Non-numeric
"(800)", # Incomplete
]

# Check valid phones are redacted
for line in redacted_lines:
# Skip comment lines
if line.strip().startswith("#"):
continue

# Check each valid phone format
for phone in valid_phones:
if phone in line:
assert phone not in line, f"Phone {phone} was not redacted in line: {line}"
assert "(800) 555-" in line, f"Expected redacted format not found in line: {line}"

# Verify redacted format matches expectation
redacted_pattern = re.compile(r'\(800\) 555-\d{4}')
for line in redacted_lines:
if "(800) 555-" in line:
matches = redacted_pattern.findall(line)
assert all(redacted_pattern.match(match) for match in matches), \
f"Invalid redacted phone format in line: {line}"
def test_redact_phone(test_phones, invalid_phones, capsys):
redactor = Redactor()
print("\nTesting Valid Phone Numbers:")

# Optional: Print redacted content for debugging
if capsys.readouterr().out:
print("\nRedacted lines:")
for line in redacted_lines:
if "(800) 555-" in line:
print(line.strip())
for phone in test_phones:
redacted = redactor._generate_unique_mapping(phone, 'phone')
print(f"\nInput: {phone}")
print(f"Redacted: {redacted}")

# Verify redaction
assert phone not in redacted, f"Phone {phone} was not redacted"
assert redacted.startswith("(800) 555-"), f"Expected (800) 555- prefix, got: {redacted}"

# Extract and verify suffix is 4 digits between 0000-9999
suffix = redacted.split('-')[1]
assert len(suffix) == 4, f"Expected 4-digit suffix, got: {suffix}"
assert suffix.isdigit(), f"Expected numeric suffix, got: {suffix}"
assert 0 <= int(suffix) <= 9999, f"Suffix {suffix} out of range (0000-9999)"

print(f"Verified redaction format: {redacted} (suffix: {suffix})")

# Test in context
test_line = f"Contact support at {phone}"
redacted_line = redactor._redact_pattern(test_line, "phone")
print(f"Original: {test_line}")
print(f"Redacted: {redacted_line}")

print("\nTesting Invalid Phone Numbers:")
for phone in invalid_phones:
print(f"\nTesting invalid: {phone}")
assert not redactor.should_redact_value(phone, "phone"), f"Incorrectly validated invalid phone: {phone}"

print("\nFinal Phone Mappings:")
for original, redacted in redactor.unique_mapping.items():
print(f"{original} -> {redacted}")

def test_redact_email(test_sample, capsys):
redactor = Redactor()
Expand All @@ -490,46 +525,40 @@ def test_redact_email(test_sample, capsys):
])


def test_redact_url():
def test_redact_url(capsys):
redactor = Redactor()
REDACTED_URL_BASE = "redacted.url"
test_urls = [
"https://api.example.com",
"http://subdomain.example.com:8080/path",
"https://example.com/api/v1?key=value",
"ftp://files.example.com" # Invalid URL
("https://api.example.com", "https://redacted.url001"),
("http://subdomain.example.com:8080/path", "http://redacted.url001:8080/path"),
("https://example.com/api/v1?key=value", "https://redacted.url001/api/v1?key=value"),
]

redacted_urls = []
for url in test_urls:
if not re.match(r'https?://', url):
print(f"\nSkipping invalid URL: {url}")
continue

redacted = redactor._generate_unique_mapping(url, "url")
redacted_urls.append(redacted)
print(f"\nTesting URL: {url}")
for original, expected_base in test_urls:
redacted = redactor._generate_unique_mapping(original, "url")
print(f"\nTesting URL: {original}")
print(f"Redacted as: {redacted}")

# Check basic URL structure
assert redacted.startswith("https://") or redacted.startswith("http://"), f"URL should start with https:// or http://: {redacted}"
assert REDACTED_URL_BASE in redacted, f"URL should contain {REDACTED_URL_BASE}: {redacted}"

# Check that original URL components are replaced
original_parts = urllib.parse.urlparse(url)
# Parse both URLs
original_parts = urllib.parse.urlparse(original)
redacted_parts = urllib.parse.urlparse(redacted)

# The hostname should be redacted
assert original_parts.netloc not in redacted_parts.netloc
# Verify schema
assert redacted_parts.scheme == original_parts.scheme

# Path and query should be preserved if present
# Verify redacted domain
assert "redacted.url" in redacted_parts.netloc

# Verify path preserved exactly
if original_parts.path:
assert redacted_parts.path == original_parts.path

# Verify query preserved exactly
if original_parts.query:
assert redacted_parts.query == original_parts.query

# Verify each URL gets a unique redaction
assert len(set(redacted_urls)) == len(redacted_urls), "Each URL should have a unique redaction"
# Verify port preserved if present
if original_parts.port:
assert str(original_parts.port) in redacted_parts.netloc

def test_redact_api_key(capsys):
"""Test API key redaction with various formats."""
Expand Down

0 comments on commit cee4cf3

Please sign in to comment.