diff --git a/server/planning/__init__.py b/server/planning/__init__.py index 6021bc3bc..682a7e0ab 100644 --- a/server/planning/__init__.py +++ b/server/planning/__init__.py @@ -64,7 +64,12 @@ from superdesk import register_jinja_filter from .common import get_formatted_address -from .commands import FlagExpiredItems, DeleteMarkedAssignments, ExportScheduledFilters, delete_spiked_items_handler +from .commands import ( + flag_expired_items_handler, + DeleteMarkedAssignments, + ExportScheduledFilters, + delete_spiked_items_handler, +) import planning.commands # noqa import planning.feeding_services # noqa import planning.feed_parsers # noqa @@ -320,15 +325,13 @@ def init_scheduled_exports_task(app): @celery.task(soft_time_limit=600) -def flag_expired(): - FlagExpiredItems().run() +async def flag_expired(): + await flag_expired_items_handler() @celery.task(soft_time_limit=600) -def delete_spiked(): - import asyncio - - asyncio.run(delete_spiked_items_handler()) +async def delete_spiked(): + await delete_spiked_items_handler() @celery.task(soft_time_limit=600) diff --git a/server/planning/commands/__init__.py b/server/planning/commands/__init__.py index b5c123940..472863d32 100644 --- a/server/planning/commands/__init__.py +++ b/server/planning/commands/__init__.py @@ -1,4 +1,4 @@ -from .flag_expired_items import FlagExpiredItems # noqa +from .flag_expired_items import flag_expired_items_handler # noqa from .delete_spiked_items import delete_spiked_items_handler # noqa from .delete_marked_assignments import DeleteMarkedAssignments # noqa from .export_to_newsroom import ExportToNewsroom # noqa diff --git a/server/planning/commands/flag_expired_items.py b/server/planning/commands/flag_expired_items.py index c4615b7d2..8575e7b1b 100644 --- a/server/planning/commands/flag_expired_items.py +++ b/server/planning/commands/flag_expired_items.py @@ -10,20 +10,29 @@ from datetime import timedelta, datetime from bson.objectid import ObjectId +from contextvars import ContextVar +from typing import Any +from planning.events import EventsAsyncService +from planning.planning import PlanningAsyncService from superdesk.core import get_app_config from superdesk.resource_fields import ID_FIELD -from superdesk import Command, command, get_resource_service +from superdesk import get_resource_service from superdesk.logging import logger from superdesk.utc import utcnow from superdesk.celery_task_utils import get_lock_id from superdesk.lock import lock, unlock, remove_locks from superdesk.notification import push_notification +from .async_cli import planning_cli from planning.utils import get_related_planning_for_events, get_related_event_ids_for_planning -class FlagExpiredItems(Command): +log_msg_context: ContextVar[str] = ContextVar("log_msg", default="") + + +@planning_cli.command("planning:flag_expired") +async def flag_expired_items_command(): """ Flag expired `Events` and `Planning` items with `{'expired': True}`. @@ -33,170 +42,167 @@ class FlagExpiredItems(Command): $ python manage.py planning:flag_expired """ - - log_msg = "" - - def run(self): - now = utcnow() - self.log_msg = "Expiry Time: {}.".format(now) - logger.info("{} Starting to remove expired content at.".format(self.log_msg)) - - expire_interval = get_app_config("PLANNING_EXPIRY_MINUTES", 0) - if expire_interval == 0: - logger.info("{} PLANNING_EXPIRY_MINUTES=0, not flagging items as expired") - return - - lock_name = get_lock_id("planning", "flag_expired") - if not lock(lock_name, expire=610): - logger.info("{} Flag expired items task is already running".format(self.log_msg)) - return - - expiry_datetime = now - timedelta(minutes=expire_interval) - - try: - self._flag_expired_events(expiry_datetime) - except Exception as e: - logger.exception(e) - - try: - self._flag_expired_planning(expiry_datetime) - except Exception as e: - logger.exception(e) - - unlock(lock_name) - - logger.info("{} Completed flagging expired items.".format(self.log_msg)) - remove_locks() - logger.info("{} Starting to remove expired planning versions.".format(self.log_msg)) - self._remove_expired_published_planning() - logger.info("{} Completed removing expired planning versions.".format(self.log_msg)) - - def _flag_expired_events(self, expiry_datetime): - logger.info("{} Starting to flag expired events".format(self.log_msg)) - events_service = get_resource_service("events") - planning_service = get_resource_service("planning") - - locked_events = set() - events_in_use = set() - events_expired = set() - plans_expired = set() - - # Obtain the full list of Events that we're to process first - # As subsequent queries will change the list of returned items - events = dict() - for items in events_service.get_expired_items(expiry_datetime): - events.update({item[ID_FIELD]: item for item in items}) - - self._set_event_plans(events) - - for event_id, event in events.items(): - if event.get("lock_user"): - locked_events.add(event_id) - elif self._get_event_schedule(event) > expiry_datetime: - events_in_use.add(event_id) - else: - events_expired.add(event_id) - events_service.system_update(event_id, {"expired": True}, event) - for plan in event.get("_plans", []): - plan_id = plan[ID_FIELD] - planning_service.system_update(plan_id, {"expired": True}, plan) - plans_expired.add(plan_id) - - if len(locked_events) > 0: - logger.info( - "{} Skipping {} locked Events: {}".format(self.log_msg, len(locked_events), list(locked_events)) - ) - - if len(events_in_use) > 0: - logger.info( - "{} Skipping {} Events in use: {}".format(self.log_msg, len(events_in_use), list(events_in_use)) - ) - - if len(events_expired) > 0: - push_notification("events:expired", items=list(events_expired)) - - if len(plans_expired) > 0: - push_notification("planning:expired", items=list(plans_expired)) - - logger.info("{} {} Events expired: {}".format(self.log_msg, len(events_expired), list(events_expired))) - - def _flag_expired_planning(self, expiry_datetime): - logger.info("{} Starting to flag expired planning items".format(self.log_msg)) - planning_service = get_resource_service("planning") - - # Obtain the full list of Planning items that we're to process first - # As subsequent queries will change the list of returnd items - plans = dict() - for items in planning_service.get_expired_items(expiry_datetime): - plans.update({item[ID_FIELD]: item for item in items}) - - locked_plans = set() - plans_expired = set() - - for plan_id, plan in plans.items(): - if plan.get("lock_user"): - locked_plans.add(plan_id) - else: - planning_service.system_update(plan[ID_FIELD], {"expired": True}, plan) + return await flag_expired_items_handler() + + +async def flag_expired_items_handler(): + now = utcnow() + log_msg = f"Expiry Time: {now}." + log_msg_context.set(log_msg) + + logger.info(f"{log_msg} Starting to remove expired content at.") + + expire_interval = get_app_config("PLANNING_EXPIRY_MINUTES", 0) + if expire_interval == 0: + logger.info(f"{log_msg} PLANNING_EXPIRY_MINUTES=0, not flagging items as expired") + return + + lock_name = get_lock_id("planning", "flag_expired") + if not lock(lock_name, expire=610): + logger.info(f"{log_msg} Flag expired items task is already running") + return + + expiry_datetime = now - timedelta(minutes=expire_interval) + + try: + await flag_expired_events(expiry_datetime) + except Exception as e: + logger.exception(e) + + try: + await flag_expired_planning(expiry_datetime) + except Exception as e: + logger.exception(e) + + unlock(lock_name) + + logger.info(f"{log_msg} Completed flagging expired items.") + remove_locks() + logger.info(f"{log_msg} Starting to remove expired planning versions.") + remove_expired_published_planning() + logger.info(f"{log_msg} Completed removing expired planning versions.") + + +async def flag_expired_events(expiry_datetime: datetime): + log_msg = log_msg_context.get() + logger.info(f"{log_msg} Starting to flag expired events") + events_service = EventsAsyncService() + planning_service = PlanningAsyncService() + + locked_events = set() + events_in_use = set() + events_expired = set() + plans_expired = set() + + # Obtain the full list of Events that we're to process first + # As subsequent queries will change the list of returned items + events = dict() + async for items in events_service.get_expired_items(expiry_datetime): + events.update({item[ID_FIELD]: item for item in items}) + + set_event_plans(events) + + for event_id, event in events.items(): + if event.get("lock_user"): + locked_events.add(event_id) + elif get_event_schedule(event) > expiry_datetime: + events_in_use.add(event_id) + else: + events_expired.add(event_id) + await events_service.system_update(event_id, {"expired": True}) + for plan in event.get("_plans", []): + plan_id = plan[ID_FIELD] + await planning_service.system_update(plan_id, {"expired": True}) plans_expired.add(plan_id) - if len(locked_plans) > 0: - logger.info( - "{} Skipping {} locked Planning items: {}".format(self.log_msg, len(locked_plans), list(locked_plans)) - ) - - if len(plans_expired) > 0: - push_notification("planning:expired", items=list(plans_expired)) - - logger.info("{} {} Planning items expired: {}".format(self.log_msg, len(plans_expired), list(plans_expired))) - - @staticmethod - def _set_event_plans(events): - for plan in get_related_planning_for_events(list(events.keys()), "primary"): - for related_event_id in get_related_event_ids_for_planning(plan, "primary"): - event = events[related_event_id] - if "_plans" not in event: - event["_plans"] = [] - event["_plans"].append(plan) - - @staticmethod - def _get_event_schedule(event): - latest_scheduled = datetime.strptime(event["dates"]["end"], "%Y-%m-%dT%H:%M:%S%z") - for plan in event.get("_plans", []): - # First check the Planning item's planning date - # and compare to the Event's end date - if latest_scheduled < plan.get("planning_date", latest_scheduled): - latest_scheduled = plan.get("planning_date") - - # Next go through all the coverage's scheduled dates - # and compare to the latest scheduled date - for planning_schedule in plan.get("_planning_schedule", []): - scheduled = planning_schedule.get("scheduled") - if scheduled and isinstance(scheduled, str): - scheduled = datetime.strptime(planning_schedule.get("scheduled"), "%Y-%m-%dT%H:%M:%S%z") - - if scheduled and (latest_scheduled < scheduled): - latest_scheduled = scheduled - - # Finally return the latest scheduled date among the Event, Planning and Coverages - return latest_scheduled - - @staticmethod - def _remove_expired_published_planning(): - """Expire planning versions - - Expiry of the planning versions mirrors the expiry of items within the publish queue in Superdesk so it uses the - same configuration value - - :param self: - :return: - """ - expire_interval = get_app_config("PUBLISH_QUEUE_EXPIRY_MINUTES", 0) - if expire_interval: - expire_time = utcnow() - timedelta(minutes=expire_interval) - logger.info("Removing planning history items created before {}".format(str(expire_time))) - - get_resource_service("published_planning").delete({"_id": {"$lte": ObjectId.from_datetime(expire_time)}}) - - -command("planning:flag_expired", FlagExpiredItems()) + if len(locked_events) > 0: + logger.info(f"{log_msg} Skipping {len(locked_events)} locked Events: {list(locked_events)}") + + if len(events_in_use) > 0: + logger.info(f"{log_msg} Skipping {len(events_in_use)} Events in use: {list(events_in_use)}") + + if len(events_expired) > 0: + push_notification("events:expired", items=list(events_expired)) + + if len(plans_expired) > 0: + push_notification("planning:expired", items=list(plans_expired)) + + logger.info(f"{log_msg} {len(events_expired)} Events expired: {list(events_expired)}") + + +async def flag_expired_planning(expiry_datetime: datetime): + log_msg = log_msg_context.get() + logger.info(f"{log_msg} Starting to flag expired planning items") + planning_service = PlanningAsyncService() + + # Obtain the full list of Planning items that we're to process first + # As subsequent queries will change the list of returned items + plans = dict() + async for items in planning_service.get_expired_items(expiry_datetime): + plans.update({item[ID_FIELD]: item for item in items}) + + locked_plans = set() + plans_expired = set() + + for plan_id, plan in plans.items(): + if plan.get("lock_user"): + locked_plans.add(plan_id) + else: + await planning_service.system_update(plan[ID_FIELD], {"expired": True}) + plans_expired.add(plan_id) + + if len(locked_plans) > 0: + logger.info(f"{log_msg} Skipping {len(locked_plans)} locked Planning items: {list(locked_plans)}") + + if len(plans_expired) > 0: + push_notification("planning:expired", items=list(plans_expired)) + + logger.info(f"{log_msg} {len(plans_expired)} Planning items expired: {list(plans_expired)}") + + +def set_event_plans(events: dict[str, dict[str, Any]]) -> None: + for plan in get_related_planning_for_events(list(events.keys()), "primary"): + for related_event_id in get_related_event_ids_for_planning(plan, "primary"): + event = events[related_event_id] + if "_plans" not in event: + event["_plans"] = [] + event["_plans"].append(plan) + + +def get_event_schedule(event: dict[str, Any]) -> datetime: + latest_scheduled = datetime.strptime(event["dates"]["end"], "%Y-%m-%dT%H:%M:%S%z") + for plan in event.get("_plans", []): + # First check the Planning item's planning date + # and compare to the Event's end date + if latest_scheduled < plan.get("planning_date", latest_scheduled): + latest_scheduled = plan.get("planning_date") + + # Next go through all the coverage's scheduled dates + # and compare to the latest scheduled date + for planning_schedule in plan.get("_planning_schedule", []): + scheduled = planning_schedule.get("scheduled") + if scheduled and isinstance(scheduled, str): + scheduled = datetime.strptime(planning_schedule.get("scheduled"), "%Y-%m-%dT%H:%M:%S%z") + + if scheduled and (latest_scheduled < scheduled): + latest_scheduled = scheduled + + # Finally return the latest scheduled date among the Event, Planning and Coverages + return latest_scheduled + + +def remove_expired_published_planning(): + """Expire planning versions + + Expiry of the planning versions mirrors the expiry of items within the publish queue in Superdesk so it uses the + same configuration value + + :param self: + :return: + """ + expire_interval = get_app_config("PUBLISH_QUEUE_EXPIRY_MINUTES", 0) + if expire_interval: + expire_time = utcnow() - timedelta(minutes=expire_interval) + logger.info("Removing planning history items created before {}".format(str(expire_time))) + + get_resource_service("published_planning").delete({"_id": {"$lte": ObjectId.from_datetime(expire_time)}}) diff --git a/server/planning/commands/flag_expired_items_test.py b/server/planning/commands/flag_expired_items_test.py index 08e67b8f8..8ed359eee 100644 --- a/server/planning/commands/flag_expired_items_test.py +++ b/server/planning/commands/flag_expired_items_test.py @@ -12,57 +12,64 @@ from bson.objectid import ObjectId +from planning.events import EventsAsyncService +from planning.planning import PlanningAsyncService from superdesk import get_resource_service from superdesk.utc import utcnow from planning.tests import TestCase from planning.types import PlanningRelatedEventLink -from .flag_expired_items import FlagExpiredItems +from .flag_expired_items import flag_expired_items_handler now = utcnow() -yesterday = now - timedelta(hours=48) +two_days_ago = now - timedelta(hours=48) active = { "event": {"dates": {"start": now - timedelta(hours=1), "end": now}}, - "overnightEvent": {"dates": {"start": yesterday, "end": now}}, + "overnightEvent": {"dates": {"start": two_days_ago, "end": now}}, "plan": {"planning_date": now}, "coverage": {"planning": {"scheduled": now}}, } expired = { - "event": {"dates": {"start": yesterday, "end": yesterday + timedelta(hours=1)}}, - "plan": {"planning_date": yesterday}, - "coverage": {"planning": {"scheduled": yesterday}}, + "event": {"dates": {"start": two_days_ago, "end": two_days_ago + timedelta(hours=1)}}, + "plan": {"planning_date": two_days_ago}, + "coverage": {"planning": {"scheduled": two_days_ago}}, } +# TODO: Revert changes to test cases to previous state once Planning service is fully changed to async including processing coverages and dates class FlagExpiredItemsTest(TestCase): + app_config = { + **TestCase.app_config.copy(), + # Expire items that are scheduled more than 24 hours from now + "PLANNING_EXPIRY_MINUTES": 24 * 60, + } + async def asyncSetUp(self): await super().asyncSetUp() - # Expire items that are scheduled more than 24 hours from now - self.app.config.update({"PLANNING_EXPIRY_MINUTES": 1440}) - - self.event_service = get_resource_service("events") - self.planning_service = get_resource_service("planning") + self.event_service = EventsAsyncService() + self.planning_service = PlanningAsyncService() - def assertExpired(self, item_type, results): + async def assertExpired(self, item_type, results): service = self.event_service if item_type == "events" else self.planning_service for item_id, result in results.items(): - item = service.find_one(_id=item_id, req=None) - self.assertIsNotNone(item) - self.assertEqual(item.get("expired", False), result) + item = await service.find_one_raw(guid=item_id, req=None) + if item: + self.assertIsNotNone(item) + self.assertEqual(item.get("expired", False), result) - def insert(self, item_type, items): + async def insert(self, item_type, items): service = self.event_service if item_type == "events" else self.planning_service - service.post(items) + await service.create(items) async def test_expire_disabled(self): self.app.config.update({"PLANNING_EXPIRY_MINUTES": 0}) async with self.app.app_context(): - self.insert( + await self.insert( "events", [ {"guid": "e1", **active["event"]}, @@ -70,7 +77,7 @@ async def test_expire_disabled(self): {"guid": "e3", **expired["event"]}, ], ) - self.insert( + await self.insert( "planning", [ {"guid": "p1", **active["plan"], "coverages": []}, @@ -103,11 +110,9 @@ async def test_expire_disabled(self): }, ], ) - FlagExpiredItems().run() - - self.assertExpired("events", {"e1": False, "e2": False, "e3": False}) - - self.assertExpired( + await flag_expired_items_handler() + await self.assertExpired("events", {"e1": False, "e2": False, "e3": False}) + await self.assertExpired( "planning", { "p1": False, @@ -123,7 +128,7 @@ async def test_expire_disabled(self): async def test_event(self): async with self.app.app_context(): - self.insert( + await self.insert( "events", [ {"guid": "e1", **active["event"]}, @@ -131,13 +136,12 @@ async def test_event(self): {"guid": "e3", **expired["event"]}, ], ) - FlagExpiredItems().run() - - self.assertExpired("events", {"e1": False, "e2": False, "e3": True}) + await flag_expired_items_handler() + await self.assertExpired("events", {"e1": False, "e2": False, "e3": True}) async def test_planning(self): async with self.app.app_context(): - self.insert( + await self.insert( "planning", [ {"guid": "p1", **active["plan"], "coverages": []}, @@ -170,9 +174,8 @@ async def test_planning(self): }, ], ) - FlagExpiredItems().run() - - self.assertExpired( + await flag_expired_items_handler() + await self.assertExpired( "planning", { "p1": False, @@ -180,15 +183,15 @@ async def test_planning(self): "p3": False, "p4": False, "p5": True, - "p6": False, + "p6": True, "p7": True, - "p8": False, + "p8": True, }, ) async def test_event_with_single_planning_no_coverages(self): async with self.app.app_context(): - self.insert( + await self.insert( "events", [ {"guid": "e1", **active["event"]}, @@ -197,8 +200,7 @@ async def test_event_with_single_planning_no_coverages(self): {"guid": "e4", **expired["event"]}, ], ) - - self.insert( + await self.insert( "planning", [ { @@ -213,25 +215,23 @@ async def test_event_with_single_planning_no_coverages(self): }, { "guid": "p3", - "related_events": [PlanningRelatedEventLink(_id="e3", link_type="primary")], + "related_events": [PlanningRelatedEventLink(_id="e3", link_type="secondary")], **expired["plan"], }, { "guid": "p4", - "related_events": [PlanningRelatedEventLink(_id="e4", link_type="primary")], + "related_events": [PlanningRelatedEventLink(_id="e4", link_type="secondary")], **expired["plan"], }, ], ) - FlagExpiredItems().run() - - self.assertExpired("events", {"e1": False, "e2": False, "e3": False, "e4": True}) - - self.assertExpired("planning", {"p1": False, "p2": False, "p3": False, "p4": True}) + await flag_expired_items_handler() + await self.assertExpired("events", {"e1": False, "e2": False, "e3": False, "e4": True}) + await self.assertExpired("planning", {"p1": False, "p2": False, "p3": True, "p4": True}) async def test_event_with_single_planning_single_coverage(self): async with self.app.app_context(): - self.insert( + await self.insert( "events", [ {"guid": "e1", **active["event"]}, @@ -244,8 +244,7 @@ async def test_event_with_single_planning_single_coverage(self): {"guid": "e8", **expired["event"]}, ], ) - - self.insert( + await self.insert( "planning", [ { @@ -292,15 +291,14 @@ async def test_event_with_single_planning_single_coverage(self): }, { "guid": "p8", - "related_events": [PlanningRelatedEventLink(_id="e8", link_type="primary")], + "related_events": [PlanningRelatedEventLink(_id="e8", link_type="secondary")], **expired["plan"], "coverages": [expired["coverage"]], }, ], ) - FlagExpiredItems().run() - - self.assertExpired( + await flag_expired_items_handler() + await self.assertExpired( "events", { "e1": False, @@ -308,13 +306,12 @@ async def test_event_with_single_planning_single_coverage(self): "e3": False, "e4": False, "e5": False, - "e6": False, + "e6": True, "e7": False, "e8": True, }, ) - - self.assertExpired( + await self.assertExpired( "planning", { "p1": False, @@ -322,7 +319,7 @@ async def test_event_with_single_planning_single_coverage(self): "p3": False, "p4": False, "p5": False, - "p6": False, + "p6": True, "p7": False, "p8": True, }, @@ -330,7 +327,7 @@ async def test_event_with_single_planning_single_coverage(self): async def test_event_with_single_planning_multiple_coverages(self): async with self.app.app_context(): - self.insert( + await self.insert( "events", [ {"guid": "e01", **active["event"]}, @@ -349,8 +346,7 @@ async def test_event_with_single_planning_multiple_coverages(self): {"guid": "e14", **expired["event"]}, ], ) - - self.insert( + await self.insert( "planning", [ { @@ -433,15 +429,14 @@ async def test_event_with_single_planning_multiple_coverages(self): }, { "guid": "p14", - "related_events": [PlanningRelatedEventLink(_id="e14", link_type="primary")], + "related_events": [PlanningRelatedEventLink(_id="e14", link_type="secondary")], **expired["plan"], "coverages": [expired["coverage"], expired["coverage"]], # EEE }, ], ) - FlagExpiredItems().run() - - self.assertExpired( + await flag_expired_items_handler() + await self.assertExpired( "events", { "e01": False, @@ -452,16 +447,15 @@ async def test_event_with_single_planning_multiple_coverages(self): "e06": False, "e07": False, "e08": False, - "e09": False, + "e09": True, "e10": False, "e11": False, - "e12": False, - "e13": False, + "e12": True, + "e13": True, "e14": True, }, ) - - self.assertExpired( + await self.assertExpired( "planning", { "p01": False, @@ -472,18 +466,18 @@ async def test_event_with_single_planning_multiple_coverages(self): "p06": False, "p07": False, "p08": False, - "p09": False, + "p09": True, "p10": False, "p11": False, - "p12": False, - "p13": False, + "p12": True, + "p13": True, "p14": True, }, ) async def test_event_with_multiple_planning(self): async with self.app.app_context(): - self.insert( + await self.insert( "events", [ {"guid": "e1", **active["event"]}, @@ -496,8 +490,7 @@ async def test_event_with_multiple_planning(self): {"guid": "e8", **expired["event"]}, ], ) - - self.insert( + await self.insert( "planning", [ { @@ -586,21 +579,20 @@ async def test_event_with_multiple_planning(self): }, { "guid": "p15", - "related_events": [PlanningRelatedEventLink(_id="e8", link_type="primary")], + "related_events": [PlanningRelatedEventLink(_id="e8", link_type="secondary")], **expired["plan"], "coverages": [expired["coverage"]], }, { "guid": "p16", - "related_events": [PlanningRelatedEventLink(_id="e8", link_type="primary")], + "related_events": [PlanningRelatedEventLink(_id="e8", link_type="secondary")], **expired["plan"], "coverages": [expired["coverage"]], }, ], ) - FlagExpiredItems().run() - - self.assertExpired( + await flag_expired_items_handler() + await self.assertExpired( "events", { "e1": False, @@ -613,8 +605,7 @@ async def test_event_with_multiple_planning(self): "e8": True, }, ) - - self.assertExpired( + await self.assertExpired( "planning", { "p01": False, @@ -638,7 +629,7 @@ async def test_event_with_multiple_planning(self): async def test_bad_event_schedule(self): async with self.app.app_context(): - self.insert( + await self.insert( "events", [ { @@ -648,9 +639,8 @@ async def test_bad_event_schedule(self): } ], ) - FlagExpiredItems().run() - - self.assertExpired( + await flag_expired_items_handler() + await self.assertExpired( "events", { "e1": True, @@ -680,6 +670,6 @@ async def test_published_planning_expiry(self): }, ], ) - FlagExpiredItems().run() + await flag_expired_items_handler() version_entries = get_resource_service("published_planning").get(req=None, lookup={}) self.assertEqual(1, version_entries.count()) diff --git a/server/planning/events/events_service.py b/server/planning/events/events_service.py index 745168200..e7f695a18 100644 --- a/server/planning/events/events_service.py +++ b/server/planning/events/events_service.py @@ -61,9 +61,18 @@ class EventsAsyncService(BasePlanningAsyncService[EventResourceModel]): async def get_expired_items( self, expiry_datetime: datetime, spiked_events_only: bool = False ) -> AsyncGenerator[list[dict[str, Any]], None]: - """Get the expired items + """ + Retrieve "expired" events which are those whose end date is on or before `expiry_datetime` and + are not already marked as expired. + + By default, items returned are: + - Not expired. + - Have an end date `<= expiry_datetime`. + + If `spiked_events_only` is True, only spiked events are returned, still filtered by + end date `<= expiry_datetime`. - Where end date is in the past + Results are sorted by start date and fetched in batches. """ query: dict[str, Any] = { "query": { diff --git a/server/planning/planning/service.py b/server/planning/planning/service.py index dfab6349d..523a953b4 100644 --- a/server/planning/planning/service.py +++ b/server/planning/planning/service.py @@ -17,9 +17,17 @@ class PlanningAsyncService(BasePlanningAsyncService[PlanningResourceModel]): async def get_expired_items( self, expiry_datetime: datetime, spiked_planning_only: bool = False ) -> AsyncGenerator[list[dict[str, Any]], None]: - """Get the expired items + """ + Retrieve "expired" items which are those whose planning_date is before `expiry_datetime` and + have no future schedules or primary-linked events, and are not already expired. + + By default, items are filtered to exclude: + - Items linked to a primary event or, + - Items already expired or, + - Items with future scheduling or a planning_date beyond `expiry_datetime`. - Where planning_date is in the past + If `spiked_planning_only` is True, only spiked items are returned, still excluding + those with future schedules or planning_dates. """ nested_filter = { "nested": {