Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDESK-7444] - Planning: Migrate planning:flag_expired command to async #2149

Merged
merged 11 commits into from
Dec 13, 2024
11 changes: 9 additions & 2 deletions server/planning/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@
from superdesk import register_jinja_filter
from .common import get_formatted_address

from .commands import FlagExpiredItems, DeleteMarkedAssignments, ExportScheduledFilters, delete_spiked_items_handler
from .commands import (
flag_expired_items_handler,
DeleteMarkedAssignments,
ExportScheduledFilters,
delete_spiked_items_handler,
)
import planning.commands # noqa
import planning.feeding_services # noqa
import planning.feed_parsers # noqa
Expand Down Expand Up @@ -321,7 +326,9 @@ def init_scheduled_exports_task(app):

@celery.task(soft_time_limit=600)
def flag_expired():
FlagExpiredItems().run()
import asyncio
BrianMwangi21 marked this conversation as resolved.
Show resolved Hide resolved

asyncio.run(flag_expired_items_handler())


@celery.task(soft_time_limit=600)
Expand Down
2 changes: 1 addition & 1 deletion server/planning/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .flag_expired_items import FlagExpiredItems # noqa
from .flag_expired_items import flag_expired_items_handler # noqa
from .delete_spiked_items import delete_spiked_items_handler # noqa
from .delete_marked_assignments import DeleteMarkedAssignments # noqa
from .export_to_newsroom import ExportToNewsroom # noqa
Expand Down
339 changes: 172 additions & 167 deletions server/planning/commands/flag_expired_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,28 @@

from datetime import timedelta, datetime
from bson.objectid import ObjectId
from contextvars import ContextVar

from planning.events import EventsAsyncService
from planning.planning import PlanningAsyncService
from superdesk.core import get_app_config
from superdesk.resource_fields import ID_FIELD
from superdesk import Command, command, get_resource_service
from superdesk import get_resource_service
from superdesk.logging import logger
from superdesk.utc import utcnow
from superdesk.celery_task_utils import get_lock_id
from superdesk.lock import lock, unlock, remove_locks
from superdesk.notification import push_notification
from .async_cli import planning_cli

from planning.utils import get_related_planning_for_events, get_related_event_ids_for_planning


class FlagExpiredItems(Command):
log_msg_context: ContextVar[str] = ContextVar("log_msg", default="")


@planning_cli.command("planning:flag_expired")
async def flag_expired_items_command():
"""
Flag expired `Events` and `Planning` items with `{'expired': True}`.

Expand All @@ -33,170 +41,167 @@ class FlagExpiredItems(Command):
$ python manage.py planning:flag_expired

"""

log_msg = ""

def run(self):
now = utcnow()
self.log_msg = "Expiry Time: {}.".format(now)
logger.info("{} Starting to remove expired content at.".format(self.log_msg))

expire_interval = get_app_config("PLANNING_EXPIRY_MINUTES", 0)
if expire_interval == 0:
logger.info("{} PLANNING_EXPIRY_MINUTES=0, not flagging items as expired")
return

lock_name = get_lock_id("planning", "flag_expired")
if not lock(lock_name, expire=610):
logger.info("{} Flag expired items task is already running".format(self.log_msg))
return

expiry_datetime = now - timedelta(minutes=expire_interval)

try:
self._flag_expired_events(expiry_datetime)
except Exception as e:
logger.exception(e)

try:
self._flag_expired_planning(expiry_datetime)
except Exception as e:
logger.exception(e)

unlock(lock_name)

logger.info("{} Completed flagging expired items.".format(self.log_msg))
remove_locks()
logger.info("{} Starting to remove expired planning versions.".format(self.log_msg))
self._remove_expired_published_planning()
logger.info("{} Completed removing expired planning versions.".format(self.log_msg))

def _flag_expired_events(self, expiry_datetime):
logger.info("{} Starting to flag expired events".format(self.log_msg))
events_service = get_resource_service("events")
planning_service = get_resource_service("planning")

locked_events = set()
events_in_use = set()
events_expired = set()
plans_expired = set()

# Obtain the full list of Events that we're to process first
# As subsequent queries will change the list of returned items
events = dict()
for items in events_service.get_expired_items(expiry_datetime):
events.update({item[ID_FIELD]: item for item in items})

self._set_event_plans(events)

for event_id, event in events.items():
if event.get("lock_user"):
locked_events.add(event_id)
elif self._get_event_schedule(event) > expiry_datetime:
events_in_use.add(event_id)
else:
events_expired.add(event_id)
events_service.system_update(event_id, {"expired": True}, event)
for plan in event.get("_plans", []):
plan_id = plan[ID_FIELD]
planning_service.system_update(plan_id, {"expired": True}, plan)
plans_expired.add(plan_id)

if len(locked_events) > 0:
logger.info(
"{} Skipping {} locked Events: {}".format(self.log_msg, len(locked_events), list(locked_events))
)

if len(events_in_use) > 0:
logger.info(
"{} Skipping {} Events in use: {}".format(self.log_msg, len(events_in_use), list(events_in_use))
)

if len(events_expired) > 0:
push_notification("events:expired", items=list(events_expired))

if len(plans_expired) > 0:
push_notification("planning:expired", items=list(plans_expired))

logger.info("{} {} Events expired: {}".format(self.log_msg, len(events_expired), list(events_expired)))

def _flag_expired_planning(self, expiry_datetime):
logger.info("{} Starting to flag expired planning items".format(self.log_msg))
planning_service = get_resource_service("planning")

# Obtain the full list of Planning items that we're to process first
# As subsequent queries will change the list of returnd items
plans = dict()
for items in planning_service.get_expired_items(expiry_datetime):
plans.update({item[ID_FIELD]: item for item in items})

locked_plans = set()
plans_expired = set()

for plan_id, plan in plans.items():
if plan.get("lock_user"):
locked_plans.add(plan_id)
else:
planning_service.system_update(plan[ID_FIELD], {"expired": True}, plan)
return await flag_expired_items_handler()


async def flag_expired_items_handler():
now = utcnow()
log_msg = f"Expiry Time: {now}."
log_msg_context.set(log_msg)

logger.info(f"{log_msg} Starting to remove expired content at.")

expire_interval = get_app_config("PLANNING_EXPIRY_MINUTES", 0)
if expire_interval == 0:
logger.info(f"{log_msg} PLANNING_EXPIRY_MINUTES=0, not flagging items as expired")
return

lock_name = get_lock_id("planning", "flag_expired")
if not lock(lock_name, expire=610):
logger.info(f"{log_msg} Flag expired items task is already running")
return

expiry_datetime = now - timedelta(minutes=expire_interval)

try:
await flag_expired_events(expiry_datetime)
except Exception as e:
logger.exception(e)

try:
await flag_expired_planning(expiry_datetime)
except Exception as e:
logger.exception(e)

unlock(lock_name)

logger.info(f"{log_msg} Completed flagging expired items.")
remove_locks()
logger.info(f"{log_msg} Starting to remove expired planning versions.")
remove_expired_published_planning()
logger.info(f"{log_msg} Completed removing expired planning versions.")


async def flag_expired_events(expiry_datetime: datetime):
log_msg = log_msg_context.get()
logger.info(f"{log_msg} Starting to flag expired events")
events_service = EventsAsyncService()
planning_service = PlanningAsyncService()

locked_events = set()
events_in_use = set()
events_expired = set()
plans_expired = set()

# Obtain the full list of Events that we're to process first
# As subsequent queries will change the list of returned items
events = dict()
async for items in events_service.get_expired_items(expiry_datetime):
events.update({item[ID_FIELD]: item for item in items})

set_event_plans(events)

for event_id, event in events.items():
if event.get("lock_user"):
locked_events.add(event_id)
elif get_event_schedule(event) > expiry_datetime:
events_in_use.add(event_id)
else:
events_expired.add(event_id)
await events_service.system_update(event_id, {"expired": True})
for plan in event.get("_plans", []):
plan_id = plan[ID_FIELD]
await planning_service.system_update(plan_id, {"expired": True})
plans_expired.add(plan_id)

if len(locked_plans) > 0:
logger.info(
"{} Skipping {} locked Planning items: {}".format(self.log_msg, len(locked_plans), list(locked_plans))
)

if len(plans_expired) > 0:
push_notification("planning:expired", items=list(plans_expired))

logger.info("{} {} Planning items expired: {}".format(self.log_msg, len(plans_expired), list(plans_expired)))

@staticmethod
def _set_event_plans(events):
for plan in get_related_planning_for_events(list(events.keys()), "primary"):
for related_event_id in get_related_event_ids_for_planning(plan, "primary"):
event = events[related_event_id]
if "_plans" not in event:
event["_plans"] = []
event["_plans"].append(plan)

@staticmethod
def _get_event_schedule(event):
latest_scheduled = datetime.strptime(event["dates"]["end"], "%Y-%m-%dT%H:%M:%S%z")
for plan in event.get("_plans", []):
# First check the Planning item's planning date
# and compare to the Event's end date
if latest_scheduled < plan.get("planning_date", latest_scheduled):
latest_scheduled = plan.get("planning_date")

# Next go through all the coverage's scheduled dates
# and compare to the latest scheduled date
for planning_schedule in plan.get("_planning_schedule", []):
scheduled = planning_schedule.get("scheduled")
if scheduled and isinstance(scheduled, str):
scheduled = datetime.strptime(planning_schedule.get("scheduled"), "%Y-%m-%dT%H:%M:%S%z")

if scheduled and (latest_scheduled < scheduled):
latest_scheduled = scheduled

# Finally return the latest scheduled date among the Event, Planning and Coverages
return latest_scheduled

@staticmethod
def _remove_expired_published_planning():
"""Expire planning versions

Expiry of the planning versions mirrors the expiry of items within the publish queue in Superdesk so it uses the
same configuration value

:param self:
:return:
"""
expire_interval = get_app_config("PUBLISH_QUEUE_EXPIRY_MINUTES", 0)
if expire_interval:
expire_time = utcnow() - timedelta(minutes=expire_interval)
logger.info("Removing planning history items created before {}".format(str(expire_time)))

get_resource_service("published_planning").delete({"_id": {"$lte": ObjectId.from_datetime(expire_time)}})


command("planning:flag_expired", FlagExpiredItems())
if len(locked_events) > 0:
logger.info(f"{log_msg} Skipping {len(locked_events)} locked Events: {list(locked_events)}")

if len(events_in_use) > 0:
logger.info(f"{log_msg} Skipping {len(events_in_use)} Events in use: {list(events_in_use)}")

if len(events_expired) > 0:
push_notification("events:expired", items=list(events_expired))

if len(plans_expired) > 0:
push_notification("planning:expired", items=list(plans_expired))

logger.info(f"{log_msg} {len(events_expired)} Events expired: {list(events_expired)}")


async def flag_expired_planning(expiry_datetime: datetime):
log_msg = log_msg_context.get()
logger.info(f"{log_msg} Starting to flag expired planning items")
planning_service = PlanningAsyncService()

# Obtain the full list of Planning items that we're to process first
# As subsequent queries will change the list of returnd items
BrianMwangi21 marked this conversation as resolved.
Show resolved Hide resolved
plans = dict()
async for items in planning_service.get_expired_items(expiry_datetime):
plans.update({item[ID_FIELD]: item for item in items})

locked_plans = set()
plans_expired = set()

for plan_id, plan in plans.items():
if plan.get("lock_user"):
locked_plans.add(plan_id)
else:
await planning_service.system_update(plan[ID_FIELD], {"expired": True})
plans_expired.add(plan_id)

if len(locked_plans) > 0:
logger.info(f"{log_msg} Skipping {len(locked_plans)} locked Planning items: {list(locked_plans)}")

if len(plans_expired) > 0:
push_notification("planning:expired", items=list(plans_expired))

logger.info(f"{log_msg} {len(plans_expired)} Planning items expired: {list(plans_expired)}")


def set_event_plans(events):
BrianMwangi21 marked this conversation as resolved.
Show resolved Hide resolved
for plan in get_related_planning_for_events(list(events.keys()), "primary"):
for related_event_id in get_related_event_ids_for_planning(plan, "primary"):
event = events[related_event_id]
if "_plans" not in event:
event["_plans"] = []
event["_plans"].append(plan)


def get_event_schedule(event):
BrianMwangi21 marked this conversation as resolved.
Show resolved Hide resolved
latest_scheduled = datetime.strptime(event["dates"]["end"], "%Y-%m-%dT%H:%M:%S%z")
for plan in event.get("_plans", []):
# First check the Planning item's planning date
# and compare to the Event's end date
if latest_scheduled < plan.get("planning_date", latest_scheduled):
latest_scheduled = plan.get("planning_date")

# Next go through all the coverage's scheduled dates
# and compare to the latest scheduled date
for planning_schedule in plan.get("_planning_schedule", []):
scheduled = planning_schedule.get("scheduled")
if scheduled and isinstance(scheduled, str):
scheduled = datetime.strptime(planning_schedule.get("scheduled"), "%Y-%m-%dT%H:%M:%S%z")

if scheduled and (latest_scheduled < scheduled):
latest_scheduled = scheduled

# Finally return the latest scheduled date among the Event, Planning and Coverages
return latest_scheduled


def remove_expired_published_planning():
"""Expire planning versions

Expiry of the planning versions mirrors the expiry of items within the publish queue in Superdesk so it uses the
same configuration value

:param self:
:return:
"""
expire_interval = get_app_config("PUBLISH_QUEUE_EXPIRY_MINUTES", 0)
if expire_interval:
expire_time = utcnow() - timedelta(minutes=expire_interval)
logger.info("Removing planning history items created before {}".format(str(expire_time)))

get_resource_service("published_planning").delete({"_id": {"$lte": ObjectId.from_datetime(expire_time)}})
Loading
Loading