Func to find service annual stats and send an email
jzbahrai committed Nov 14, 2024
1 parent 7ff60f6 commit af27ac6
Showing 8 changed files with 311 additions and 3 deletions.
74 changes: 73 additions & 1 deletion app/celery/reporting_tasks.py
@@ -8,14 +8,21 @@
from app import notify_celery
from app.config import QueueNames
from app.cronitor import cronitor
from app.dao.annual_limits_data_dao import get_previous_quarter, insert_quarter_data
from app.dao.annual_limits_data_dao import (
fetch_quarter_cummulative_stats,
get_all_quarters,
get_previous_quarter,
insert_quarter_data,
)
from app.dao.fact_billing_dao import fetch_billing_data_for_day, update_fact_billing
from app.dao.fact_notification_status_dao import (
fetch_notification_status_for_day,
fetch_quarter_data,
update_fact_notification_status,
)
from app.dao.users_dao import get_services_for_all_users
from app.models import Service
from app.user.rest import send_annual_usage_data


@notify_celery.task(name="create-nightly-billing")
@@ -177,3 +184,68 @@ def insert_quarter_data_for_annual_limits(process_day):
start_date, end_date, chunk, e
)
)


def _create_quarterly_email_markdown_list(service_info, service_ids, cummulative_data_dict):
"""
Build the English and French markdown lists of service names with their email and SMS usage.
Example (English):
## Notify
Emails: you've sent 5000 out of 100000 (5.0%)
Text messages: you've sent 1000 out of 2000 (50.0%)
"""
markdown_list_en = ""
markdown_list_fr = ""
for service_id in service_ids:
service_data = cummulative_data_dict.get(str(service_id), {})
service_name, email_annual_limit, sms_annual_limit = service_info[service_id]
email_count = service_data.get("email", 0)
sms_count = service_data.get("sms", 0)

markdown_list_en += f"## {service_name} \n"
markdown_list_fr += f"## {service_name} \n"
if email_count:
email_percentage = round(float(email_count / email_annual_limit), 2) * 100
markdown_list_en += f"you've sent {email_count} out of {email_annual_limit} {email_percentage}\n"
if sms_count:
sms_percentage = round(float(sms_count / sms_annual_limit), 2) * 100
markdown_list_en += f"Text messages: you've sent {sms_count} out of {sms_annual_limit} ({sms_percentage}%)\n"

markdown_list_en += "\n"
markdown_list_fr += "\n"
return markdown_list_en, markdown_list_fr
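
For illustration, a minimal sketch of how this helper could be exercised; the service id, name, limits, and counts below are invented for the example:

from uuid import uuid4

service_id = uuid4()  # hypothetical service
service_info = {service_id: ("Notify", 100000, 2000)}  # (name, email_annual_limit, sms_annual_limit)
cummulative_data_dict = {str(service_id): {"email": 5000, "sms": 1000}}

markdown_en, markdown_fr = _create_quarterly_email_markdown_list(
    service_info, [service_id], cummulative_data_dict
)
# markdown_en reads roughly:
# ## Notify
# Emails: you've sent 5000 out of 100000 (5.0%)
# Text messages: you've sent 1000 out of 2000 (50.0%)
# markdown_fr carries only the "## Notify" heading for now.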


@notify_celery.task(name="send-quarterly-email")
@statsd(namespace="tasks")
def send_quarter_email(process_date):
service_info = {x.id: (x.name, x.email_annual_limit, x.sms_annual_limit) for x in Service.query.all()}

user_service_array = get_services_for_all_users()
quarters_list = get_all_quarters(process_date)
chunk_size = 50
iter_user_service_array = iter(user_service_array)

while True:
chunk = list(islice(iter_user_service_array, chunk_size))

if not chunk:
current_app.logger.info("send_quarter_email completed {} ".format(datetime.now(timezone.utc).date()))
break

try:
all_service_ids = set()
for _, _, services in chunk:
all_service_ids.update(services)
all_service_ids = list(all_service_ids)
cummulative_data = fetch_quarter_cummulative_stats(quarters_list, all_service_ids)
cummulative_data_dict = {str(c_data_id): c_data for c_data_id, c_data in cummulative_data}
for user_id, email_address, service_ids in chunk:
markdown_list_en, markdown_list_fr = _create_quarterly_email_markdown_list(
service_info, service_ids, cummulative_data_dict
)
send_annual_usage_data(user_id, email_address, markdown_list_en, markdown_list_fr)
except Exception as e:
current_app.logger.error("send_quarter_email task failed for for user {} . Error: {}".format(user_id, e))
continue
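
The chunked loop above drains one shared iterator with islice; a standalone sketch of that pattern, with toy rows standing in for the (user_id, email_address, service_ids) tuples:

from itertools import islice

rows = [(i, "user{}@example.com".format(i), []) for i in range(7)]  # illustrative rows
it = iter(rows)
chunk_size = 3
while True:
    chunk = list(islice(it, chunk_size))  # take up to chunk_size rows per pass
    if not chunk:
        break  # iterator exhausted, loop ends
    # each pass handles one batch: rows 0-2, then 3-5, then 6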
52 changes: 52 additions & 0 deletions app/config.py
@@ -336,6 +336,15 @@ class Config(object):
NEAR_DAILY_EMAIL_LIMIT_TEMPLATE_ID = "9aa60ad7-2d7f-46f0-8cbe-2bac3d4d77d8"
REACHED_DAILY_EMAIL_LIMIT_TEMPLATE_ID = "ee036547-e51b-49f1-862b-10ea982cfceb"
DAILY_EMAIL_LIMIT_UPDATED_TEMPLATE_ID = "97dade64-ea8d-460f-8a34-900b74ee5eb0"

REACHED_ANNUAL_SMS_LIMIT_TEMPLATE_ID = "ca6d9205-d923-4198-acdd-d0aa37725c37"
ANNUAL_SMS_LIMIT_UPDATED_TEMPLATE_ID = "8381fdc3-95ad-4219-b07c-93aa808b67fa"
NEAR_ANNUAL_SMS_LIMIT_TEMPLATE_ID = "1a7a1f01-7fd0-43e5-93a4-982e25a78816"
REACHED_ANNUAL_EMAIL_LIMIT_TEMPLATE_ID = "d27cd502-4e92-4aae-b55f-758fd22f0e79"
ANNUAL_EMAIL_LIMIT_UPDATED_TEMPLATE_ID = "47e764eb-b313-4833-a4a7-ae920a8f52f3"
NEAR_ANNUAL_EMAIL_LIMIT_TEMPLATE_ID = "b04123de-5cff-486d-903c-8afe7219950d"
ANNUAL_LIMIT_QUARTERLY_USAGE_TEMPLATE_ID = "f66a1025-17f5-471c-a7ab-37d6b9e9d304"

APIKEY_REVOKE_TEMPLATE_ID = "a0a4e7b8-8a6a-4eaa-9f4e-9c3a5b2dbcf3"
HEARTBEAT_TEMPLATE_EMAIL_LOW = "73079cb9-c169-44ea-8cf4-8d397711cc9d"
HEARTBEAT_TEMPLATE_EMAIL_MEDIUM = "c75c4539-3014-4c4c-96b5-94d326758a74"
@@ -496,6 +505,49 @@ class Config(object):
"schedule": crontab(hour=9, minute=0), # 4:00 EST in UTC
"options": {"queue": QueueNames.PERIODIC},
},
# quarterly queue
"insert-quarter-data-for-annual-limits-q1": {
"task": "insert-quarter-data-for-annual-limits",
"schedule": crontab(
minute=0, hour=23, day_of_month=1, month_of_year=7
), # Running this at the end of the day on 1st July
"options": {"queue": QueueNames.PERIODIC},
},
"insert-quarter-data-for-annual-limits-q2": {
"task": "insert-quarter-data-for-annual-limits",
"schedule": crontab(
minute=0, hour=23, day_of_month=1, month_of_year=10
), # Running this at the end of the day on 1st Oct
"options": {"queue": QueueNames.PERIODIC},
},
"insert-quarter-data-for-annual-limits-q3": {
"task": "insert-quarter-data-for-annual-limits",
"schedule": crontab(
minute=0, hour=23, day_of_month=1, month_of_year=1
), # Running this at the end of the day on 1st Jan
"options": {"queue": QueueNames.PERIODIC},
},
"send-quarterly-email-q1": {
"task": "send-quarterly-email",
"schedule": crontab(
minute=0, hour=23, day_of_month=2, month_of_year=7
), # Running this at the end of the day on 2nd July
"options": {"queue": QueueNames.PERIODIC},
},
"send-quarterly-email-q2": {
"task": "send-quarterly-email",
"schedule": crontab(
minute=0, hour=23, day_of_month=2, month_of_year=10
), # Running this at the end of the day on 2nd Oct
"options": {"queue": QueueNames.PERIODIC},
},
"send-quarterly-email-q3": {
"task": "send-quarterly-email",
"schedule": crontab(
minute=0, hour=23, day_of_month=3, month_of_year=1
), # Running this at the end of the day on 3rd Jan
"options": {"queue": QueueNames.PERIODIC},
},
}
CELERY_QUEUES: List[Any] = []
CELERY_DELIVER_SMS_RATE_LIMIT = os.getenv("CELERY_DELIVER_SMS_RATE_LIMIT", "1/s")
45 changes: 45 additions & 0 deletions app/dao/annual_limits_data_dao.py
@@ -1,5 +1,6 @@
from datetime import datetime

from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert

from app import db
@@ -36,6 +37,20 @@ def get_previous_quarter(date_to_check):
return quarter_name, (start_date, end_date)


def get_all_quarters(process_day):
previous_quarter, _ = get_previous_quarter(process_day)
quarter, year = previous_quarter.split("-")

quarter_mapping = {
"Q1": [f"Q1-{year}"],
"Q2": [f"Q1-{year}", f"Q2-{year}"],
"Q3": [f"Q1-{year}", f"Q2-{year}", f"Q3-{year}"],
"Q4": [f"Q1-{year}", f"Q2-{year}", f"Q3-{year}", f"Q4-{year}"],
}

return quarter_mapping[quarter]
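
To make the mapping concrete: if get_previous_quarter resolves process_day to "Q3-2023" (the label itself is an assumed example), the function returns the quarters accumulated so far for that year:

get_all_quarters(process_day)  # -> ["Q1-2023", "Q2-2023", "Q3-2023"]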


def insert_quarter_data(data, quarter, service_info):
"""
Insert data for each quarter into the database.
@@ -70,3 +85,33 @@ def insert_quarter_data(data, quarter, service_info):
)
db.session.connection().execute(stmt)
db.session.commit()


def fetch_quarter_cummulative_stats(quarters, service_ids):
"""
Fetch cumulative notification counts for a list of quarters and service_ids.
Returns one row per service with the fields:
- service_id
- notification_counts: a mapping of notification_type to the summed count across the given quarters
"""
subquery = (
db.session.query(
AnnualLimitsData.service_id,
AnnualLimitsData.notification_type,
func.sum(AnnualLimitsData.notification_count).label("notification_count"),
)
.filter(AnnualLimitsData.service_id.in_(service_ids), AnnualLimitsData.time_period.in_(quarters))
.group_by(AnnualLimitsData.service_id, AnnualLimitsData.notification_type)
.subquery()
)

return (
db.session.query(
subquery.c.service_id,
func.json_object_agg(subquery.c.notification_type, subquery.c.notification_count).label("notification_counts"),
)
.group_by(subquery.c.service_id)
.all()
)
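
Since json_object_agg folds the per-type sums into one mapping per service, each row unpacks as a (service_id, counts) pair. A sketch of the shape, assuming two seeded services as in the DAO test further down:

rows = fetch_quarter_cummulative_stats(["Q1-2018", "Q2-2018"], [service_1.id, service_2.id])
# e.g. [(service_1.id, {"sms": 9, "email": 2}), (service_2.id, {"sms": 1100})]
for service_id, counts in rows:
    ...  # counts maps notification_type -> total across the requested quarters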
24 changes: 23 additions & 1 deletion app/dao/users_dao.py
@@ -10,7 +10,7 @@
from app.dao.permissions_dao import permission_dao
from app.dao.service_user_dao import dao_get_service_users_by_user_id
from app.errors import InvalidRequest
from app.models import EMAIL_AUTH_TYPE, User, VerifyCode
from app.models import EMAIL_AUTH_TYPE, Service, ServiceUser, User, VerifyCode
from app.utils import escape_special_characters


@@ -200,3 +200,25 @@ def user_can_be_archived(user):
def get_archived_email_address(email_address):
date = datetime.utcnow().strftime("%Y-%m-%d")
return "_archived_{}_{}".format(date, email_address)


def get_services_for_all_users():
"""
Return (user_id, email_address, [service_id1, service_id2, ...]) rows for all users
where the user is active and the service is active, live (not restricted) and not in research mode.
"""
result = (
db.session.query(
User.id.label("user_id"),
User.email_address.label("email_address"),
func.array_agg(Service.id).label("service_ids"),
)
.join(ServiceUser, User.id == ServiceUser.user_id)
.join(Service, Service.id == ServiceUser.service_id)
.filter(User.state == "active", Service.active.is_(True), Service.restricted.is_(False), Service.research_mode.is_(False))
.group_by(User.id, User.email_address)
.all()
)

return result
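
Each row from this query unpacks the way send_quarter_email consumes it; roughly:

for user_id, email_address, service_ids in get_services_for_all_users():
    # user_id: id of an active user
    # email_address: that user's email
    # service_ids: ids of the active, live, non-research-mode services they belong to
    ...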
31 changes: 31 additions & 0 deletions app/user/rest.py
@@ -886,3 +886,34 @@ def _update_alert(user_to_update, changes=None):
)

send_notification_to_queue(saved_notification, False, queue=QueueNames.NOTIFY)


def send_annual_usage_data(user_id, email_address, markdown_en, markdown_fr):
"""
Send a notification informing the user of their annual usage per service.
"""
user = get_user_by_id(user_id=user_id)
template = dao_get_template_by_id(current_app.config["ANNUAL_LIMIT_QUARTERLY_USAGE_TEMPLATE_ID"])
service = Service.query.get(current_app.config["NOTIFY_SERVICE_ID"])

saved_notification = persist_notification(
template_id=template.id,
template_version=template.version,
recipient=email_address,
service=service,
personalisation={
"name": user.name,
"markdown_en": markdown_en,
"markdown_fr": markdown_fr,
},
notification_type=template.template_type,
api_key_id=None,
key_type=KEY_TYPE_NORMAL,
reply_to_text=service.get_default_reply_to_email_address(),
)

send_notification_to_queue(saved_notification, False, queue=QueueNames.NOTIFY)

return jsonify({}), 204
29 changes: 29 additions & 0 deletions tests/app/celery/test_reporting_tasks.py
@@ -11,6 +11,7 @@
create_nightly_notification_status,
create_nightly_notification_status_for_day,
insert_quarter_data_for_annual_limits,
send_quarter_email,
)
from app.dao.fact_billing_dao import get_rate
from app.models import (
@@ -605,3 +606,31 @@ def test_insert_quarter_data(self, notify_db_session):
# Data for Q1 2018
insert_quarter_data_for_annual_limits(datetime(2018, 7, 1))
assert AnnualLimitsData.query.filter_by(service_id=service_1.id, time_period="Q1-2018").first().notification_count == 10


class TestSendQuarterEmail:
def test_send_quarter_email(self, sample_user, mocker, notify_db_session):
service_1 = create_service(service_name="service_1")
service_2 = create_service(service_name="service_2")

create_ft_notification_status(date(2018, 1, 1), "sms", service_1, count=4)
create_ft_notification_status(date(2018, 5, 2), "sms", service_1, count=10)
create_ft_notification_status(date(2018, 3, 20), "sms", service_2, count=100)
create_ft_notification_status(date(2018, 2, 1), "sms", service_2, count=1000)

# Data for Q4 2017
insert_quarter_data_for_annual_limits(datetime(2018, 4, 1))

service_1.users = [sample_user]
service_2.users = [sample_user]
send_mock = mocker.patch("app.celery.reporting_tasks.send_annual_usage_data")

markdown_list_en = "## service_1 \nText messages: you've sent 4 out of 25000 (0.0%)\n\n## service_2 \nText messages: you've sent 1100 out of 25000 (4.0%)\n\n"
markdown_list_fr = "## service_1 \n\n## service_2 \n\n"
send_quarter_email(datetime(2018, 4, 1))
send_mock.assert_called_with(
sample_user.id,
sample_user.email_address,
markdown_list_en,
markdown_list_fr,
)
32 changes: 31 additions & 1 deletion tests/app/dao/test_annual_limits_data_dao.py
@@ -3,7 +3,11 @@

import pytest

from app.dao.annual_limits_data_dao import get_previous_quarter, insert_quarter_data
from app.dao.annual_limits_data_dao import (
fetch_quarter_cummulative_stats,
get_previous_quarter,
insert_quarter_data,
)
from app.models import AnnualLimitsData, Service
from tests.app.db import create_service

@@ -53,3 +57,29 @@ def test_insert_quarter_data(self, notify_db_session):
service_info,
)
assert AnnualLimitsData.query.filter_by(service_id=service_2.id).first().notification_count == 500


class TestFetchCummulativeStats:
def test_fetch_quarter_cummulative_stats(self, notify_db_session):
service_1 = create_service(service_name="service_1")
service_2 = create_service(service_name="service_2")

service_info = {x.id: (x.email_annual_limit, x.sms_annual_limit) for x in Service.query.all()}
NotificationData = namedtuple("NotificationData", ["service_id", "notification_type", "notification_count"])

data = [
NotificationData(service_1.id, "sms", 4),
NotificationData(service_2.id, "sms", 1100),
NotificationData(service_1.id, "email", 2),
]
insert_quarter_data(data, "Q1-2018", service_info)

data2 = [NotificationData(service_1.id, "sms", 5)]
insert_quarter_data(data2, "Q2-2018", service_info)

result = fetch_quarter_cummulative_stats(["Q1-2018", "Q2-2018"], [service_1.id, service_2.id])
for service_id, counts in result:
if service_id == service_1.id:
assert counts == {"sms": 9, "email": 2}
if service_id == service_2.id:
assert counts == {"sms": 1100}
