Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annual Limit daily delivered & failed notification counts and seeding implementation #333

Merged
merged 8 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/waffles/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
docopt==0.6.2
Flask==2.3.3
markupsafe==2.1.5
git+https://github.com/cds-snc/[email protected].6#egg=notifications-utils
git+https://github.com/cds-snc/[email protected].7#egg=notifications-utils
73 changes: 65 additions & 8 deletions notifications_utils/clients/redis/annual_limit.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,27 @@
"""This module stores daily notification counts and annual limit statuses for a service in Redis."""
"""
This module stores daily notification counts and annual limit statuses for a service in Redis using a hash structure:


annual-limit: {
{service_id}: {
notifications: {
sms_delivered: int,
email_delivered: int,
sms_failed: int,
email_failed: int
},
status: {
near_sms_limit: Datetime,
near_email_limit: Datetime,
over_sms_limit: Datetime,
over_email_limit: Datetime
seeded_at: Datetime
}
}
}


"""

from datetime import datetime

Expand All @@ -9,13 +32,18 @@
SMS_FAILED = "sms_failed"
EMAIL_FAILED = "email_failed"

NOTIFICATIONS = [SMS_DELIVERED, EMAIL_DELIVERED, SMS_FAILED, EMAIL_FAILED]

NEAR_SMS_LIMIT = "near_sms_limit"
NEAR_EMAIL_LIMIT = "near_email_limit"
OVER_SMS_LIMIT = "over_sms_limit"
OVER_EMAIL_LIMIT = "over_email_limit"
SEEDED_AT = "seeded_at"

STATUSES = [NEAR_SMS_LIMIT, NEAR_EMAIL_LIMIT, OVER_SMS_LIMIT, OVER_EMAIL_LIMIT]


def notifications_key(service_id):
def annual_limit_notifications_key(service_id):
"""
Generates the Redis hash key for storing daily metrics of a service.
"""
Expand All @@ -29,11 +57,14 @@ def annual_limit_status_key(service_id):
return f"annual-limit:{service_id}:status"


def decode_byte_dict(dict: dict):
def decode_byte_dict(dict: dict, value_type=str):
"""
Redis-py returns byte strings for keys and values. This function decodes them to UTF-8 strings.
"""
return {key.decode("utf-8"): value.decode("utf-8") for key, value in dict.items()}
# Check if expected_value_type is one of the allowed types
if value_type not in {int, float, str}:
raise ValueError("expected_value_type must be int, float, or str")
return {key.decode("utf-8"): value_type(value.decode("utf-8")) for key, value in dict.items()}


class RedisAnnualLimit:
Expand All @@ -44,25 +75,51 @@ def init_app(self, app, *args, **kwargs):
pass

def increment_notification_count(self, service_id: str, field: str):
self._redis_client.increment_hash_value(notifications_key(service_id), field)
self._redis_client.increment_hash_value(annual_limit_notifications_key(service_id), field)

def get_notification_count(self, service_id: str, field: str):
"""
Retrieves the specified daily notification count for a service. (e.g. SMS_DELIVERED, EMAIL_FAILED, etc.)
"""
return int(self._redis_client.get_hash_field(notifications_key(service_id), field))
return int(self._redis_client.get_hash_field(annual_limit_notifications_key(service_id), field))

def get_all_notification_counts(self, service_id: str):
"""
Retrieves all daily notification metrics for a service.
"""
return decode_byte_dict(self._redis_client.get_all_from_hash(notifications_key(service_id)))
return decode_byte_dict(self._redis_client.get_all_from_hash(annual_limit_notifications_key(service_id)), int)

def reset_all_notification_counts(self, service_ids=None):
"""
Resets all daily notification metrics.
:param: service_ids: list of service_ids to reset, if None, resets all services
"""
hashes = (
annual_limit_notifications_key("*")
if not service_ids
else [annual_limit_notifications_key(service_id) for service_id in service_ids]
)

self._redis_client.delete_hash_fields(hashes=hashes)

def seed_annual_limit_notifications(self, service_id: str, mapping: dict):
self._redis_client.bulk_set_hash_fields(key=annual_limit_notifications_key(service_id), mapping=mapping)

def was_seeded_today(self, service_id):
last_seeded_time = self.get_seeded_at(service_id)
return last_seeded_time == datetime.utcnow().strftime("%Y-%m-%d") if last_seeded_time else False

def get_seeded_at(self, service_id: str):
return self._redis_client.get_hash_field(annual_limit_status_key(service_id), SEEDED_AT).decode("utf-8")

def set_seeded_at(self, service_id):
self._redis_client.set_hash_value(annual_limit_status_key(service_id), SEEDED_AT, datetime.utcnow().strftime("%Y-%m-%d"))

def clear_notification_counts(self, service_id: str):
"""
Clears all daily notification metrics for a service.
"""
self._redis_client.expire(notifications_key(service_id), -1)
self._redis_client.expire(annual_limit_notifications_key(service_id), -1)

def set_annual_limit_status(self, service_id: str, field: str, value: datetime):
"""
Expand Down
68 changes: 66 additions & 2 deletions notifications_utils/clients/redis/redis_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import numbers
import uuid
from time import time
from typing import Any, Dict
from typing import Any, Dict, Optional

from flask import current_app
from flask_redis import FlaskRedis
Expand Down Expand Up @@ -81,6 +81,64 @@ def delete_cache_keys_by_pattern(self, pattern):
return self.scripts["delete-keys-by-pattern"](args=[pattern])
return 0

# TODO: Refactor and simplify this to use HEXPIRE when we upgrade Redis to 7.4.0
def delete_hash_fields(self, hashes: (str | list), fields: Optional[list] = None, raise_exception=False):
"""Deletes fields from the specified hashes. if fields is `None`, then all fields from the hashes are deleted, deleting the hash entirely.

Args:
hashes (str|list): The hash pattern or list of hash keys to delete fields from.
fields (list): A list of fields to delete from the hashes. If `None`, then all fields are deleted.

Returns:
_type_: _description_
"""
if self.active:
try:
hashes = [prepare_value(h) for h in hashes] if isinstance(hashes, list) else prepare_value(hashes)
# When fields are passed in, use the list as is
# When hashes is a list, and no fields are passed in, fetch the fields from the first hash in the list
# otherwise we know we're going scan iterate over a pattern so we'll fetch the fields on the first pass in the loop below
fields = (
[prepare_value(f) for f in fields]
if fields is not None
else self.redis_store.hkeys(hashes[0])
if isinstance(hashes, list)
else None
)
# Use a pipeline to atomically delete fields from each hash.
pipe = self.redis_store.pipeline()
# if hashes is not a list, we're scan iterating over keys matching a pattern
for key in hashes if isinstance(hashes, list) else self.redis_store.scan_iter(hashes):
if not fields:
fields = self.redis_store.hkeys(key)
key = prepare_value(key)
pipe.hdel(key, *fields)
result = pipe.execute()
# TODO: May need to double check that the pipeline result count matches the number of hashes deleted
# and retry any failures
return result
except Exception as e:
self.__handle_exception(e, raise_exception, "expire_hash_fields", hashes)
return False

def bulk_set_hash_fields(self, mapping, pattern=None, key=None, raise_exception=False):
"""
Bulk set hash fields.
:param pattern: the pattern to match keys
:param mappting: the mapping of fields to set
:param raise_exception: True if we should allow the exception to bubble up
"""
if self.active:
try:
if pattern:
for key in self.redis_store.scan_iter(pattern):
self.redis_store.hmset(key, mapping)
if key:
return self.redis_store.hmset(key, mapping)
except Exception as e:
self.__handle_exception(e, raise_exception, "bulk_set_hash_fields", pattern)
return False

def exceeded_rate_limit(self, cache_key, limit, interval, raise_exception=False):
"""
Rate limiting.
Expand Down Expand Up @@ -235,10 +293,12 @@ def set_hash_value(self, key, field, value, raise_exception=False):

if self.active:
try:
self.redis_store.hset(key, field, value)
return self.redis_store.hset(key, field, value)
except Exception as e:
self.__handle_exception(e, raise_exception, "set_hash_value", key)

return None

def decrement_hash_value(self, key, value, raise_exception=False):
return self.increment_hash_value(key, value, raise_exception, incr_by=-1)

Expand Down Expand Up @@ -273,6 +333,8 @@ def get_all_from_hash(self, key, raise_exception=False):
except Exception as e:
self.__handle_exception(e, raise_exception, "get_all_from_hash", key)

return None

def set_hash_and_expire(self, key, values, expire_in_seconds, raise_exception=False):
key = prepare_value(key)
values = {prepare_value(k): prepare_value(v) for k, v in values.items()}
Expand All @@ -283,6 +345,8 @@ def set_hash_and_expire(self, key, values, expire_in_seconds, raise_exception=Fa
except Exception as e:
self.__handle_exception(e, raise_exception, "set_hash_and_expire", key)

return None

def expire(self, key, expire_in_seconds, raise_exception=False):
key = prepare_value(key)
if self.active:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "notifications-utils"
version = "52.3.6"
version = "52.3.7"
description = "Shared python code for Notification - Provides logging utils etc."
authors = ["Canadian Digital Service"]
license = "MIT license"
Expand Down
27 changes: 25 additions & 2 deletions tests/test_annual_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
SMS_DELIVERED,
SMS_FAILED,
RedisAnnualLimit,
annual_limit_notifications_key,
annual_limit_status_key,
notifications_key,
)
from notifications_utils.clients.redis.redis_client import RedisClient

Expand Down Expand Up @@ -79,7 +79,7 @@ def mocked_service_id():

def test_notifications_key(mocked_service_id):
expected_key = f"annual-limit:{mocked_service_id}:notifications"
assert notifications_key(mocked_service_id) == expected_key
assert annual_limit_notifications_key(mocked_service_id) == expected_key


def test_annual_limits_key(mocked_service_id):
Expand Down Expand Up @@ -127,6 +127,29 @@ def test_clear_notification_counts(mock_annual_limit_client, mock_notification_c
assert len(mock_annual_limit_client.get_all_notification_counts(mocked_service_id)) == 0


@pytest.mark.parametrize(
"service_ids",
[
[
str(uuid.uuid4()),
str(uuid.uuid4()),
str(uuid.uuid4()),
str(uuid.uuid4()),
]
],
)
def test_bulk_reset_notification_counts(mock_annual_limit_client, mock_notification_count_types, service_ids):
for service_id in service_ids:
for field in mock_notification_count_types:
mock_annual_limit_client.increment_notification_count(service_id, field)
assert len(mock_annual_limit_client.get_all_notification_counts(service_id)) == 4

mock_annual_limit_client.reset_all_notification_counts()

for service_id in service_ids:
assert len(mock_annual_limit_client.get_all_notification_counts(service_id)) == 0


def test_set_annual_limit_status(mock_annual_limit_client, mocked_service_id):
mock_annual_limit_client.set_annual_limit_status(mocked_service_id, NEAR_SMS_LIMIT, datetime.utcnow())
result = mock_annual_limit_client.get_annual_limit_status(mocked_service_id, NEAR_SMS_LIMIT)
Expand Down
Loading
Loading