diff --git a/app/aws/s3.py b/app/aws/s3.py
index 524eb876a9..9ebeb6d137 100644
--- a/app/aws/s3.py
+++ b/app/aws/s3.py
@@ -1,5 +1,6 @@
 import uuid
 from datetime import datetime, timedelta
+from typing import List
 
 import botocore
 import pytz
@@ -7,6 +8,8 @@
 from flask import current_app
 from notifications_utils.s3 import s3upload as utils_s3upload
 
+from app.models import Job
+
 FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv"
 
 
@@ -60,8 +63,20 @@ def get_job_metadata_from_s3(service_id, job_id):
     return obj.get()["Metadata"]
 
 
-def remove_job_from_s3(service_id, job_id):
-    return remove_s3_object(*get_job_location(service_id, job_id))
+def remove_jobs_from_s3(jobs: List[Job], batch_size=1000):
+    """
+    Remove the files from S3 for the given jobs.
+
+    Args:
+        jobs (List[Job]): The jobs whose files need to be removed from S3.
+        batch_size (int, optional): The number of jobs to process in each boto call. Defaults to the AWS maximum of 1000.
+    """
+
+    bucket = resource("s3").Bucket(current_app.config["CSV_UPLOAD_BUCKET_NAME"])
+
+    for start in range(0, len(jobs), batch_size):
+        object_keys = [FILE_LOCATION_STRUCTURE.format(job.service_id, job.id) for job in jobs[start : start + batch_size]]
+        bucket.delete_objects(Delete={"Objects": [{"Key": key} for key in object_keys]})
 
 
 def get_s3_bucket_objects(bucket_name, subfolder="", older_than=7, limit_days=2):
diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py
index 61a358fecc..4c3e5832d1 100644
--- a/app/celery/nightly_tasks.py
+++ b/app/celery/nightly_tasks.py
@@ -1,4 +1,5 @@
 from datetime import datetime, timedelta
+from typing import List
 
 import pytz
 from flask import current_app
@@ -12,7 +13,7 @@
 from app.config import QueueNames
 from app.cronitor import cronitor
 from app.dao.inbound_sms_dao import delete_inbound_sms_older_than_retention
-from app.dao.jobs_dao import dao_archive_job, dao_get_jobs_older_than_data_retention
+from app.dao.jobs_dao import dao_archive_jobs, dao_get_jobs_older_than_data_retention
 from app.dao.notifications_dao import (
     dao_timeout_notifications,
     delete_notifications_older_than_retention_by_type,
@@ -37,27 +38,37 @@
 @notify_celery.task(name="remove_sms_email_jobs")
 @cronitor("remove_sms_email_jobs")
 @statsd(namespace="tasks")
-def remove_sms_email_csv_files():
-    _remove_csv_files([EMAIL_TYPE, SMS_TYPE])
+def remove_sms_email_jobs():
+    """
+    Remove csv files from s3 and archive email and sms jobs older than data retention period.
+    """
+
+    _archive_jobs([EMAIL_TYPE, SMS_TYPE])
 
 
 @notify_celery.task(name="remove_letter_jobs")
 @cronitor("remove_letter_jobs")
 @statsd(namespace="tasks")
-def remove_letter_csv_files():
-    _remove_csv_files([LETTER_TYPE])
+def remove_letter_jobs():
+    _archive_jobs([LETTER_TYPE])
+
+
+def _archive_jobs(job_types: List[str]):
+    """
+    Remove csv files from s3 and archive jobs older than data retention period.
+    Args:
+        job_types (List[str]): list of job types to remove csv files and archive jobs for
+    """
 
 
-def _remove_csv_files(job_types):
     while True:
-        jobs = dao_get_jobs_older_than_data_retention(notification_types=job_types, limit=20000)
+        jobs = dao_get_jobs_older_than_data_retention(notification_types=job_types, limit=100)
         if len(jobs) == 0:
             break
         current_app.logger.info("Archiving {} jobs.".format(len(jobs)))
-        for job in jobs:
-            s3.remove_job_from_s3(job.service_id, job.id)
-            dao_archive_job(job)
-            current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
+        s3.remove_jobs_from_s3(jobs)
+        dao_archive_jobs(jobs)
+        current_app.logger.info(f"Jobs archived: {[job.id for job in jobs]}")
 
 
 @notify_celery.task(name="delete-sms-notifications")
diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py
index ec3b80f1ae..28a8b1f15d 100644
--- a/app/dao/jobs_dao.py
+++ b/app/dao/jobs_dao.py
@@ -1,5 +1,6 @@
 import uuid
 from datetime import datetime, timedelta
+from typing import Iterable
 
 from flask import current_app
 from notifications_utils.letter_timings import (
@@ -71,9 +72,15 @@ def dao_get_job_by_id(job_id) -> Job:
     return Job.query.filter_by(id=job_id).one()
 
 
-def dao_archive_job(job):
-    job.archived = True
-    db.session.add(job)
+def dao_archive_jobs(jobs: Iterable[Job]):
+    """
+    Archive the given jobs.
+    Args:
+        jobs (Iterable[Job]): The jobs to archive.
+    """
+    for job in jobs:
+        job.archived = True
+        db.session.add(job)
     db.session.commit()
 
 
@@ -148,7 +155,7 @@ def dao_get_jobs_older_than_data_retention(notification_types, limit=None):
             .order_by(desc(Job.created_at))
         )
         if limit:
-            query = query.limit(limit)
+            query = query.limit(limit - len(jobs))
         jobs.extend(query.all())
 
     end_date = today - timedelta(days=7)
diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py
index bae56c3f45..02de33cbba 100644
--- a/tests/app/aws/test_s3.py
+++ b/tests/app/aws/test_s3.py
@@ -1,6 +1,6 @@
 import uuid
 from datetime import datetime, timedelta
-from unittest.mock import call
+from unittest.mock import Mock, call
 
 import pytest
 import pytz
@@ -12,6 +12,7 @@
     get_list_of_files_by_suffix,
     get_s3_bucket_objects,
     get_s3_file,
+    remove_jobs_from_s3,
     remove_transformed_dvla_file,
     upload_job_to_s3,
 )
@@ -214,3 +215,30 @@
         bucket_name=current_app.config["CSV_UPLOAD_BUCKET_NAME"],
         file_location=f"service-{service_id}-notify/{upload_id}.csv",
     )
+
+
+def test_remove_jobs_from_s3(notify_api, mocker):
+    mock = Mock()
+    mocker.patch("app.aws.s3.resource", return_value=mock)
+    jobs = [
+        type("Job", (object,), {"service_id": "foo", "id": "j1"}),
+        type("Job", (object,), {"service_id": "foo", "id": "j2"}),
+        type("Job", (object,), {"service_id": "foo", "id": "j3"}),
+        type("Job", (object,), {"service_id": "foo", "id": "j4"}),
+        type("Job", (object,), {"service_id": "foo", "id": "j5"}),
+    ]
+
+    remove_jobs_from_s3(jobs, batch_size=2)
+
+    mock.assert_has_calls(
+        [
+            call.Bucket(current_app.config["CSV_UPLOAD_BUCKET_NAME"]),
+            call.Bucket().delete_objects(
+                Delete={"Objects": [{"Key": "service-foo-notify/j1.csv"}, {"Key": "service-foo-notify/j2.csv"}]}
+            ),
+            call.Bucket().delete_objects(
+                Delete={"Objects": [{"Key": "service-foo-notify/j3.csv"}, {"Key": "service-foo-notify/j4.csv"}]}
+            ),
+            call.Bucket().delete_objects(Delete={"Objects": [{"Key": "service-foo-notify/j5.csv"}]}),
+        ]
+    )
diff --git a/tests/app/celery/test_nightly_tasks.py b/tests/app/celery/test_nightly_tasks.py
index 4cdc277db9..7de3d47b74 100644
--- a/tests/app/celery/test_nightly_tasks.py
+++ b/tests/app/celery/test_nightly_tasks.py
@@ -17,8 +17,8 @@
     delete_sms_notifications_older_than_retention,
     letter_raise_alert_if_no_ack_file_for_zip,
     raise_alert_if_letter_notifications_still_sending,
-    remove_letter_csv_files,
-    remove_sms_email_csv_files,
+    remove_letter_jobs,
+    remove_sms_email_jobs,
     remove_transformed_dvla_files,
     s3,
     send_daily_performance_platform_stats,
@@ -72,11 +72,11 @@ def mock_s3_get_list_diff(bucket_name, subfolder="", suffix="", last_modified=No
 
 
 @freeze_time("2016-10-18T10:00:00")
-def test_will_remove_csv_files_for_jobs_older_than_seven_days(notify_db, notify_db_session, mocker, sample_template):
+def test_will_archive_jobs_older_than_seven_days(notify_db, notify_db_session, mocker, sample_template):
     """
     Jobs older than seven days are deleted, but only two day's worth (two-day window)
     """
-    mocker.patch("app.celery.nightly_tasks.s3.remove_job_from_s3")
+    mocker.patch("app.celery.nightly_tasks.s3.remove_jobs_from_s3")
 
     seven_days_ago = datetime.utcnow() - timedelta(days=7)
     just_under_seven_days = seven_days_ago + timedelta(seconds=1)
@@ -91,22 +91,20 @@ def test_will_remove_csv_files_for_jobs_older_than_seven_days(notify_db, notify_
     dont_delete_me_1 = create_job(sample_template, created_at=seven_days_ago)
     create_job(sample_template, created_at=just_under_seven_days)
 
-    remove_sms_email_csv_files()
+    remove_sms_email_jobs()
 
-    assert s3.remove_job_from_s3.call_args_list == [
-        call(job1_to_delete.service_id, job1_to_delete.id),
-        call(job2_to_delete.service_id, job2_to_delete.id),
-    ]
+    args = s3.remove_jobs_from_s3.call_args.args[0]
+    assert sorted(args, key=lambda x: x.id) == sorted([job1_to_delete, job2_to_delete], key=lambda x: x.id)
     assert job1_to_delete.archived is True
     assert dont_delete_me_1.archived is False
 
 
 @freeze_time("2016-10-18T10:00:00")
-def test_will_remove_csv_files_for_jobs_older_than_retention_period(notify_db, notify_db_session, mocker):
+def test_will_archive_jobs_older_than_retention_period(notify_db, notify_db_session, mocker):
     """
     Jobs older than retention period are deleted, but only two day's worth (two-day window)
     """
-    mocker.patch("app.celery.nightly_tasks.s3.remove_job_from_s3")
+    mocker.patch("app.celery.nightly_tasks.s3.remove_jobs_from_s3")
     service_1 = create_service(service_name="service 1")
     service_2 = create_service(service_name="service 2")
     create_service_data_retention(service=service_1, notification_type=SMS_TYPE, days_of_retention=3)
@@ -129,22 +127,17 @@ def test_will_remove_csv_files_for_jobs_older_than_retention_period(notify_db, n
     job3_to_delete = create_job(email_template_service_2, created_at=thirty_one_days_ago)
     job4_to_delete = create_job(sms_template_service_2, created_at=eight_days_ago)
 
-    remove_sms_email_csv_files()
+    remove_sms_email_jobs()
 
-    s3.remove_job_from_s3.assert_has_calls(
-        [
-            call(job1_to_delete.service_id, job1_to_delete.id),
-            call(job2_to_delete.service_id, job2_to_delete.id),
-            call(job3_to_delete.service_id, job3_to_delete.id),
-            call(job4_to_delete.service_id, job4_to_delete.id),
-        ],
-        any_order=True,
+    args = s3.remove_jobs_from_s3.call_args.args[0]
+    assert sorted(args, key=lambda x: x.id) == sorted(
+        [job1_to_delete, job2_to_delete, job3_to_delete, job4_to_delete], key=lambda x: x.id
     )
 
 
 @freeze_time("2017-01-01 10:00:00")
-def test_remove_csv_files_filters_by_type(mocker, sample_service):
-    mocker.patch("app.celery.nightly_tasks.s3.remove_job_from_s3")
+def test_archive_jobs_by_type(mocker, sample_service):
+    mocker.patch("app.celery.nightly_tasks.s3.remove_jobs_from_s3")
     """
    Jobs older than seven days are deleted, but only two day's worth (two-day window)
     """
@@ -156,11 +149,9 @@ def test_remove_csv_files_filters_by_type(mocker, sample_service):
     job_to_delete = create_job(template=letter_template, created_at=eight_days_ago)
     create_job(template=sms_template, created_at=eight_days_ago)
 
-    remove_letter_csv_files()
+    remove_letter_jobs()
 
-    assert s3.remove_job_from_s3.call_args_list == [
-        call(job_to_delete.service_id, job_to_delete.id),
-    ]
+    assert s3.remove_jobs_from_s3.call_args.args[0] == [job_to_delete]
 
 
 def test_should_call_delete_sms_notifications_more_than_week_in_task(notify_api, mocker):
diff --git a/tests/app/dao/test_jobs_dao.py b/tests/app/dao/test_jobs_dao.py
index ebb8f50f01..58e3007739 100644
--- a/tests/app/dao/test_jobs_dao.py
+++ b/tests/app/dao/test_jobs_dao.py
@@ -351,17 +351,21 @@ def test_should_get_jobs_seven_days_old_by_scheduled_for_date(sample_service):
 
 @freeze_time("2016-10-31 10:00:00")
 def test_should_get_limited_number_of_jobs(sample_template):
-    flexable_retention_service = create_service(service_name="Another service")
-    insert_service_data_retention(flexable_retention_service.id, sample_template.template_type, 3)
-    flexable_template = create_template(flexable_retention_service, template_type=sample_template.template_type)
+    flexible_retention_service1 = create_service(service_name="Another service 1")
+    insert_service_data_retention(flexible_retention_service1.id, sample_template.template_type, 3)
+    flexible_template1 = create_template(flexible_retention_service1, template_type=sample_template.template_type)
+
+    flexible_retention_service2 = create_service(service_name="Another service 2")
+    insert_service_data_retention(flexible_retention_service2.id, sample_template.template_type, 2)
+    flexible_template2 = create_template(flexible_retention_service2, template_type=sample_template.template_type)
 
     eight_days_ago = datetime.utcnow() - timedelta(days=8)
     four_days_ago = datetime.utcnow() - timedelta(days=4)
 
-    create_job(flexable_template, created_at=four_days_ago)
-    create_job(flexable_template, created_at=four_days_ago)
-    create_job(sample_template, created_at=eight_days_ago)
-    create_job(sample_template, created_at=eight_days_ago)
+    for _ in range(4):
+        create_job(flexible_template1, created_at=four_days_ago)
+        create_job(flexible_template2, created_at=four_days_ago)
+        create_job(sample_template, created_at=eight_days_ago)
 
     jobs = dao_get_jobs_older_than_data_retention(notification_types=[sample_template.template_type], limit=3)