Batch up job archiving - not crashy this time (#2143)
sastels authored Mar 20, 2024
1 parent 604ec61 commit 1d576ce
Showing 6 changed files with 107 additions and 51 deletions.
19 changes: 17 additions & 2 deletions app/aws/s3.py
@@ -1,12 +1,15 @@
import uuid
from datetime import datetime, timedelta
from typing import List

import botocore
import pytz
from boto3 import client, resource
from flask import current_app
from notifications_utils.s3 import s3upload as utils_s3upload

from app.models import Job

FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv"


@@ -60,8 +63,20 @@ def get_job_metadata_from_s3(service_id, job_id):
return obj.get()["Metadata"]


def remove_job_from_s3(service_id, job_id):
return remove_s3_object(*get_job_location(service_id, job_id))
def remove_jobs_from_s3(jobs: List[Job], batch_size=1000):
"""
Remove the files from S3 for the given jobs.
Args:
jobs (List[Job]): The jobs whose files need to be removed from S3.
batch_size (int, optional): The number of jobs to process in each boto call. Defaults to the AWS maximum of 1000.
"""

bucket = resource("s3").Bucket(current_app.config["CSV_UPLOAD_BUCKET_NAME"])

for start in range(0, len(jobs), batch_size):
object_keys = [FILE_LOCATION_STRUCTURE.format(job.service_id, job.id) for job in jobs[start : start + batch_size]]
bucket.delete_objects(Delete={"Objects": [{"Key": key} for key in object_keys]})


def get_s3_bucket_objects(bucket_name, subfolder="", older_than=7, limit_days=2):
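S3's DeleteObjects API accepts at most 1,000 keys per request, which is why batch_size defaults to 1000 and the new function chunks the job list before each call. A minimal standalone sketch of the same pattern, assuming only a configured boto3 session (the bucket name and helper are illustrative, not part of this commit):

    import boto3

    def delete_keys_in_batches(bucket_name, keys, batch_size=1000):
        # DeleteObjects takes at most 1000 keys per call, so send the list in chunks.
        bucket = boto3.resource("s3").Bucket(bucket_name)
        for start in range(0, len(keys), batch_size):
            chunk = keys[start : start + batch_size]
            bucket.delete_objects(Delete={"Objects": [{"Key": k} for k in chunk]})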
33 changes: 22 additions & 11 deletions app/celery/nightly_tasks.py
@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
from typing import List

import pytz
from flask import current_app
@@ -12,7 +13,7 @@
from app.config import QueueNames
from app.cronitor import cronitor
from app.dao.inbound_sms_dao import delete_inbound_sms_older_than_retention
from app.dao.jobs_dao import dao_archive_job, dao_get_jobs_older_than_data_retention
from app.dao.jobs_dao import dao_archive_jobs, dao_get_jobs_older_than_data_retention
from app.dao.notifications_dao import (
dao_timeout_notifications,
delete_notifications_older_than_retention_by_type,
@@ -37,27 +38,37 @@
@notify_celery.task(name="remove_sms_email_jobs")
@cronitor("remove_sms_email_jobs")
@statsd(namespace="tasks")
def remove_sms_email_csv_files():
_remove_csv_files([EMAIL_TYPE, SMS_TYPE])
def remove_sms_email_jobs():
"""
Remove CSV files from S3 and archive email and SMS jobs older than the data retention period.
"""

_archive_jobs([EMAIL_TYPE, SMS_TYPE])


@notify_celery.task(name="remove_letter_jobs")
@cronitor("remove_letter_jobs")
@statsd(namespace="tasks")
def remove_letter_csv_files():
_remove_csv_files([LETTER_TYPE])
def remove_letter_jobs():
_archive_jobs([LETTER_TYPE])


def _archive_jobs(job_types: List[str]):
"""
Remove CSV files from S3 and archive jobs older than the data retention period.
Args:
job_types (List[str]): list of job types to remove csv files and archive jobs for
"""

def _remove_csv_files(job_types):
while True:
jobs = dao_get_jobs_older_than_data_retention(notification_types=job_types, limit=20000)
jobs = dao_get_jobs_older_than_data_retention(notification_types=job_types, limit=100)
if len(jobs) == 0:
break
current_app.logger.info("Archiving {} jobs.".format(len(jobs)))
for job in jobs:
s3.remove_job_from_s3(job.service_id, job.id)
dao_archive_job(job)
current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
s3.remove_jobs_from_s3(jobs)
dao_archive_jobs(jobs)
current_app.logger.info(f"Jobs archived: {[job.id for job in jobs]}")


@notify_celery.task(name="delete-sms-notifications")
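The rewritten task drains eligible jobs in bounded rounds: fetch at most 100, delete their CSV files with a single batched S3 call, archive them in one commit, and loop until the query comes back empty. Dropping the per-round limit from 20000 to 100 keeps memory use and transaction size small, which is presumably the "not crashy" part of the commit title. A hedged sketch of the loop shape, with placeholder callables rather than functions from this repo:

    def drain_in_rounds(fetch_batch, process_batch, round_size=100):
        # Process a bounded batch per iteration until the source is empty.
        while True:
            batch = fetch_batch(limit=round_size)
            if not batch:
                break
            process_batch(batch)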
15 changes: 11 additions & 4 deletions app/dao/jobs_dao.py
@@ -1,5 +1,6 @@
import uuid
from datetime import datetime, timedelta
from typing import Iterable

from flask import current_app
from notifications_utils.letter_timings import (
@@ -71,9 +72,15 @@ def dao_get_job_by_id(job_id) -> Job:
return Job.query.filter_by(id=job_id).one()


def dao_archive_job(job):
job.archived = True
db.session.add(job)
def dao_archive_jobs(jobs: Iterable[Job]):
"""
Archive the given jobs.
Args:
jobs (Iterable[Job]): The jobs to archive.
"""
for job in jobs:
job.archived = True
db.session.add(job)
db.session.commit()


@@ -148,7 +155,7 @@ def dao_get_jobs_older_than_data_retention(notification_types, limit=None):
.order_by(desc(Job.created_at))
)
if limit:
query = query.limit(limit)
query = query.limit(limit - len(jobs))
jobs.extend(query.all())

end_date = today - timedelta(days=7)
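dao_get_jobs_older_than_data_retention accumulates its result across several per-retention queries, so capping each query at the raw limit could return up to limit rows per retention setting. Shrinking the cap to limit - len(jobs) bounds the combined total instead. An illustrative sketch of the arithmetic (the bucket contents are invented):

    # With limit=100: the first bucket yields 60, so the next query is capped
    # at 40, and any later ones at 0.
    jobs, limit = [], 100
    for bucket in ([1] * 60, [2] * 70, [3] * 10):
        jobs.extend(bucket[: limit - len(jobs)])
    assert len(jobs) == 100  # 60 + 40 + 0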
30 changes: 29 additions & 1 deletion tests/app/aws/test_s3.py
@@ -1,6 +1,6 @@
import uuid
from datetime import datetime, timedelta
from unittest.mock import call
from unittest.mock import Mock, call

import pytest
import pytz
@@ -12,6 +12,7 @@
get_list_of_files_by_suffix,
get_s3_bucket_objects,
get_s3_file,
remove_jobs_from_s3,
remove_transformed_dvla_file,
upload_job_to_s3,
)
@@ -214,3 +215,30 @@ def test_upload_job_to_s3(notify_api, mocker):
bucket_name=current_app.config["CSV_UPLOAD_BUCKET_NAME"],
file_location=f"service-{service_id}-notify/{upload_id}.csv",
)


def test_remove_jobs_from_s3(notify_api, mocker):
mock = Mock()
mocker.patch("app.aws.s3.resource", return_value=mock)
jobs = [
type("Job", (object,), {"service_id": "foo", "id": "j1"}),
type("Job", (object,), {"service_id": "foo", "id": "j2"}),
type("Job", (object,), {"service_id": "foo", "id": "j3"}),
type("Job", (object,), {"service_id": "foo", "id": "j4"}),
type("Job", (object,), {"service_id": "foo", "id": "j5"}),
]

remove_jobs_from_s3(jobs, batch_size=2)

mock.assert_has_calls(
[
call.Bucket(current_app.config["CSV_UPLOAD_BUCKET_NAME"]),
call.Bucket().delete_objects(
Delete={"Objects": [{"Key": "service-foo-notify/j1.csv"}, {"Key": "service-foo-notify/j2.csv"}]}
),
call.Bucket().delete_objects(
Delete={"Objects": [{"Key": "service-foo-notify/j3.csv"}, {"Key": "service-foo-notify/j4.csv"}]}
),
call.Bucket().delete_objects(Delete={"Objects": [{"Key": "service-foo-notify/j5.csv"}]}),
]
)
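The stand-in jobs here are built with type(...), which creates throwaway classes whose class attributes supply the only fields remove_jobs_from_s3 reads (service_id and id). An equivalent and arguably clearer idiom, noted here only as an option, is types.SimpleNamespace:

    from types import SimpleNamespace

    jobs = [SimpleNamespace(service_id="foo", id=f"j{n}") for n in range(1, 6)]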
43 changes: 17 additions & 26 deletions tests/app/celery/test_nightly_tasks.py
@@ -17,8 +17,8 @@
delete_sms_notifications_older_than_retention,
letter_raise_alert_if_no_ack_file_for_zip,
raise_alert_if_letter_notifications_still_sending,
remove_letter_csv_files,
remove_sms_email_csv_files,
remove_letter_jobs,
remove_sms_email_jobs,
remove_transformed_dvla_files,
s3,
send_daily_performance_platform_stats,
@@ -72,11 +72,11 @@ def mock_s3_get_list_diff(bucket_name, subfolder="", suffix="", last_modified=None):


@freeze_time("2016-10-18T10:00:00")
def test_will_remove_csv_files_for_jobs_older_than_seven_days(notify_db, notify_db_session, mocker, sample_template):
def test_will_archive_jobs_older_than_seven_days(notify_db, notify_db_session, mocker, sample_template):
"""
Jobs older than seven days are deleted, but only two days' worth (two-day window)
"""
mocker.patch("app.celery.nightly_tasks.s3.remove_job_from_s3")
mocker.patch("app.celery.nightly_tasks.s3.remove_jobs_from_s3")

seven_days_ago = datetime.utcnow() - timedelta(days=7)
just_under_seven_days = seven_days_ago + timedelta(seconds=1)
@@ -91,22 +91,20 @@ def test_will_remove_csv_files_for_jobs_older_than_seven_days(notify_db, notify_db_session, mocker, sample_template):
dont_delete_me_1 = create_job(sample_template, created_at=seven_days_ago)
create_job(sample_template, created_at=just_under_seven_days)

remove_sms_email_csv_files()
remove_sms_email_jobs()

assert s3.remove_job_from_s3.call_args_list == [
call(job1_to_delete.service_id, job1_to_delete.id),
call(job2_to_delete.service_id, job2_to_delete.id),
]
args = s3.remove_jobs_from_s3.call_args.args[0]
assert sorted(args, key=lambda x: x.id) == sorted([job1_to_delete, job2_to_delete], key=lambda x: x.id)
assert job1_to_delete.archived is True
assert dont_delete_me_1.archived is False


@freeze_time("2016-10-18T10:00:00")
def test_will_remove_csv_files_for_jobs_older_than_retention_period(notify_db, notify_db_session, mocker):
def test_will_archive_jobs_older_than_retention_period(notify_db, notify_db_session, mocker):
"""
Jobs older than the retention period are deleted, but only two days' worth (two-day window)
"""
mocker.patch("app.celery.nightly_tasks.s3.remove_job_from_s3")
mocker.patch("app.celery.nightly_tasks.s3.remove_jobs_from_s3")
service_1 = create_service(service_name="service 1")
service_2 = create_service(service_name="service 2")
create_service_data_retention(service=service_1, notification_type=SMS_TYPE, days_of_retention=3)
@@ -129,22 +127,17 @@ def test_will_remove_csv_files_for_jobs_older_than_retention_period(notify_db, notify_db_session, mocker):
job3_to_delete = create_job(email_template_service_2, created_at=thirty_one_days_ago)
job4_to_delete = create_job(sms_template_service_2, created_at=eight_days_ago)

remove_sms_email_csv_files()
remove_sms_email_jobs()

s3.remove_job_from_s3.assert_has_calls(
[
call(job1_to_delete.service_id, job1_to_delete.id),
call(job2_to_delete.service_id, job2_to_delete.id),
call(job3_to_delete.service_id, job3_to_delete.id),
call(job4_to_delete.service_id, job4_to_delete.id),
],
any_order=True,
args = s3.remove_jobs_from_s3.call_args.args[0]
assert sorted(args, key=lambda x: x.id) == sorted(
[job1_to_delete, job2_to_delete, job3_to_delete, job4_to_delete], key=lambda x: x.id
)


@freeze_time("2017-01-01 10:00:00")
def test_remove_csv_files_filters_by_type(mocker, sample_service):
mocker.patch("app.celery.nightly_tasks.s3.remove_job_from_s3")
def test_archive_jobs_by_type(mocker, sample_service):
mocker.patch("app.celery.nightly_tasks.s3.remove_jobs_from_s3")
"""
Jobs older than seven days are deleted, but only two days' worth (two-day window)
"""
@@ -156,11 +149,9 @@ def test_remove_csv_files_filters_by_type(mocker, sample_service):
job_to_delete = create_job(template=letter_template, created_at=eight_days_ago)
create_job(template=sms_template, created_at=eight_days_ago)

remove_letter_csv_files()
remove_letter_jobs()

assert s3.remove_job_from_s3.call_args_list == [
call(job_to_delete.service_id, job_to_delete.id),
]
assert s3.remove_jobs_from_s3.call_args.args[0] == [job_to_delete]


def test_should_call_delete_sms_notifications_more_than_week_in_task(notify_api, mocker):
18 changes: 11 additions & 7 deletions tests/app/dao/test_jobs_dao.py
@@ -351,17 +351,21 @@ def test_should_get_jobs_seven_days_old_by_scheduled_for_date(sample_service):

@freeze_time("2016-10-31 10:00:00")
def test_should_get_limited_number_of_jobs(sample_template):
flexable_retention_service = create_service(service_name="Another service")
insert_service_data_retention(flexable_retention_service.id, sample_template.template_type, 3)
flexable_template = create_template(flexable_retention_service, template_type=sample_template.template_type)
flexible_retention_service1 = create_service(service_name="Another service 1")
insert_service_data_retention(flexible_retention_service1.id, sample_template.template_type, 3)
flexible_template1 = create_template(flexible_retention_service1, template_type=sample_template.template_type)

flexible_retention_service2 = create_service(service_name="Another service 2")
insert_service_data_retention(flexible_retention_service2.id, sample_template.template_type, 2)
flexible_template2 = create_template(flexible_retention_service2, template_type=sample_template.template_type)

eight_days_ago = datetime.utcnow() - timedelta(days=8)
four_days_ago = datetime.utcnow() - timedelta(days=4)

create_job(flexable_template, created_at=four_days_ago)
create_job(flexable_template, created_at=four_days_ago)
create_job(sample_template, created_at=eight_days_ago)
create_job(sample_template, created_at=eight_days_ago)
for _ in range(4):
create_job(flexible_template1, created_at=four_days_ago)
create_job(flexible_template2, created_at=four_days_ago)
create_job(sample_template, created_at=eight_days_ago)

jobs = dao_get_jobs_older_than_data_retention(notification_types=[sample_template.template_type], limit=3)

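All three services in this test now hold jobs past their retention period, and the call asks for limit=3. Under the old code the limit applied to each per-retention query separately, so the buckets together could return more than three jobs; with the limit - len(jobs) fix the combined result is capped at three. The second flexible-retention service was added precisely to exercise that cross-bucket cap.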
