From 403504846be04d101d1980c1c5b736168d1d4358 Mon Sep 17 00:00:00 2001 From: Helmy Giacoman Date: Fri, 6 Dec 2024 19:00:51 +0100 Subject: [PATCH] Migrated `create` & `on_create` methods Along with another set of utils functions SDESK-7442 --- .../planning/commands/delete_spiked_items.py | 2 +- .../commands/export_to_newsroom_test.py | 51 ++- server/planning/events/__init__.py | 2 +- server/planning/events/events.py | 19 +- server/planning/events/events_service.py | 402 ++++++++++++++++++ .../planning/events/events_sync/__init__.py | 17 +- server/planning/events/events_utils.py | 112 ++++- server/planning/events/module.py | 2 +- server/planning/events/service.py | 168 -------- server/planning/events/utils.py | 92 ---- server/planning/tests/__init__.py | 1 + server/planning/types/common.py | 7 +- server/planning/types/enums.py | 6 +- server/planning/types/event.py | 8 +- server/planning/types/event_dates.py | 11 +- 15 files changed, 582 insertions(+), 318 deletions(-) create mode 100644 server/planning/events/events_service.py delete mode 100644 server/planning/events/service.py delete mode 100644 server/planning/events/utils.py diff --git a/server/planning/commands/delete_spiked_items.py b/server/planning/commands/delete_spiked_items.py index 2a0753c5d..6a81e4556 100644 --- a/server/planning/commands/delete_spiked_items.py +++ b/server/planning/commands/delete_spiked_items.py @@ -19,7 +19,7 @@ from superdesk.lock import lock, unlock, remove_locks from planning.common import WORKFLOW_STATE from planning.events import EventsAsyncService -from planning.events.utils import get_recurring_timeline +from planning.events.events_utils import get_recurring_timeline from planning.planning import PlanningAsyncService from planning.assignments import AssignmentsAsyncService from .async_cli import planning_cli diff --git a/server/planning/commands/export_to_newsroom_test.py b/server/planning/commands/export_to_newsroom_test.py index 36ca93889..774f891cb 100644 --- a/server/planning/commands/export_to_newsroom_test.py +++ b/server/planning/commands/export_to_newsroom_test.py @@ -7,12 +7,21 @@ # For the full copyright and license information, please see the # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license + import mock +from bson import ObjectId from datetime import timedelta -from .export_to_newsroom import ExportToNewsroom -from superdesk import get_resource_service + +from planning.types.event import EventResourceModel + +from superdesk.flask import g from superdesk.utc import utcnow +from superdesk import get_resource_service + from planning.tests import TestCase +from planning.events.events_service import EventsAsyncService + +from .export_to_newsroom import ExportToNewsroom class MockTransmitter: @@ -27,17 +36,24 @@ def transmit(self, queue_item): class ExportToNewsroomTest(TestCase): - def setUp(self): - super().setUp() + async def asyncSetUp(self): + await super().asyncSetUp() - self.event_service = get_resource_service("events") + self.event_service = EventsAsyncService() self.planning_service = get_resource_service("planning") - def setUp_data(self): + def setup_user(self): + user = {"_id": ObjectId()} + self.app.data.insert("users", [user]) + g.user = user + + async def setup_data(self): utc_now = utcnow() + self.setup_user() + events = [ { - "_id": "draft", + "id": "draft", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -48,7 +64,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "scheduled", + "id": "scheduled", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -60,7 +76,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "postponed", + "id": "postponed", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -72,7 +88,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "rescheduled", + "id": "rescheduled", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -84,7 +100,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "cancelled", + "id": "cancelled", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -96,7 +112,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "killed", + "id": "killed", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -108,7 +124,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "postponed-not-published", + "id": "postponed-not-published", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -119,7 +135,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "rescheduled-not-published", + "id": "rescheduled-not-published", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -130,7 +146,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "cancelled-not-published", + "id": "cancelled-not-published", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -141,6 +157,7 @@ def setUp_data(self): "type": "event", }, ] + events = [EventResourceModel.from_dict(ev) for ev in events] planning = [ { @@ -213,13 +230,13 @@ def setUp_data(self): }, ] - self.event_service.create(events) + await self.event_service.create(events) self.planning_service.create(planning) @mock.patch("planning.commands.export_to_newsroom.NewsroomHTTPTransmitter") async def test_events_events_planning(self, mock_transmitter): async with self.app.app_context(): - self.setUp_data() + await self.setup_data() mock_transmitter.return_value = MockTransmitter() ExportToNewsroom().run(assets_url="foo", resource_url="bar") diff --git a/server/planning/events/__init__.py b/server/planning/events/__init__.py index fd3d23be1..9072af7c1 100644 --- a/server/planning/events/__init__.py +++ b/server/planning/events/__init__.py @@ -44,7 +44,7 @@ ) from planning.autosave import AutosaveService -from .service import EventsAsyncService +from .events_service import EventsAsyncService from .module import events_resource_config __all__ = [ diff --git a/server/planning/events/events.py b/server/planning/events/events.py index 5866348e8..49b179278 100644 --- a/server/planning/events/events.py +++ b/server/planning/events/events.py @@ -51,7 +51,6 @@ from planning.types import ( Event, EmbeddedPlanning, - EmbeddedCoverageItem, PlanningRelatedEventLink, PLANNING_RELATED_EVENT_LINK_TYPE, ) @@ -74,7 +73,6 @@ set_ingest_version_datetime, is_new_version, update_ingest_on_patch, - TEMP_ID_PREFIX, ) from planning.utils import ( get_planning_event_link_method, @@ -84,6 +82,7 @@ from .events_base_service import EventsBaseService from .events_schema import events_schema from .events_sync import sync_event_metadata_with_planning_items +from .events_utils import get_events_embedded_planning logger = logging.getLogger(__name__) @@ -99,22 +98,6 @@ } -def get_events_embedded_planning(event: Event) -> List[EmbeddedPlanning]: - def get_coverage_id(coverage: EmbeddedCoverageItem) -> str: - if not coverage.get("coverage_id"): - coverage["coverage_id"] = TEMP_ID_PREFIX + "-" + generate_guid(type=GUID_NEWSML) - return coverage["coverage_id"] - - return [ - EmbeddedPlanning( - planning_id=planning.get("planning_id"), - update_method=planning.get("update_method") or "single", - coverages={get_coverage_id(coverage): coverage for coverage in planning.get("coverages") or []}, - ) - for planning in event.pop("embedded_planning", []) - ] - - def get_subject_str(subject: Dict[str, str]) -> str: return ":".join( [ diff --git a/server/planning/events/events_service.py b/server/planning/events/events_service.py new file mode 100644 index 000000000..3896dda13 --- /dev/null +++ b/server/planning/events/events_service.py @@ -0,0 +1,402 @@ +import pytz +import itertools + +from bson import ObjectId +from typing import Any, AsyncGenerator +from datetime import datetime, timedelta +from apps.auth import get_user, get_user_id + +from superdesk import get_resource_service +from superdesk.errors import SuperdeskApiError +from superdesk.metadata.item import GUID_NEWSML +from superdesk.core import get_app_config, get_current_app +from superdesk.core.utils import date_to_str, generate_guid + +from planning.types import ( + PLANNING_RELATED_EVENT_LINK_TYPE, + EventResourceModel, + PlanningRelatedEventLink, + PlanningSchedule, +) +from planning.types.event import EmbeddedPlanning +from planning.common import WorkflowStates, get_event_max_multi_day_duration, get_max_recurrent_events +from planning.core.service import BasePlanningAsyncService +from planning.utils import get_planning_event_link_method, get_related_event_ids_for_planning + +from .events_sync import sync_event_metadata_with_planning_items +from .events_utils import generate_recurring_dates, get_events_embedded_planning + + +class EventsAsyncService(BasePlanningAsyncService[EventResourceModel]): + resource_name = "events" + + async def get_expired_items( + self, expiry_datetime: datetime, spiked_events_only: bool = False + ) -> AsyncGenerator[list[dict[str, Any]], None]: + """Get the expired items + + Where end date is in the past + """ + query: dict[str, Any] = { + "query": { + "bool": { + "must_not": [{"term": {"expired": True}}], + "filter": {"range": {"dates.end": {"lte": date_to_str(expiry_datetime)}}}, + }, + }, + "sort": [{"dates.start": "asc"}], + "size": get_max_recurrent_events(), + } + + if spiked_events_only: + del query["query"]["bool"]["must_not"] + query["query"]["bool"]["must"] = [{"term": {"state": WorkflowStates.SPIKED}}] + + total_received = 0 + total_events = -1 + + while True: + query["from"] = total_received + + results = await self.search(query) + items = await results.to_list_raw() + results_count = len(items) + + # If the total_events has not been set, then this is the first query + # In which case we need to store the total hits from the search + if total_events < 0: + total_events = results_count + + # If the search doesn't contain any results, return here + if total_events < 1: + break + + # If the last query doesn't contain any results, return here + if results_count == 0: + break + + total_received += results_count + + # Yield the results for iteration by the callee + yield items + + async def create(self, docs: list[EventResourceModel]): + """ + Extracts out the ``embedded_planning`` before saving the Event(s) + And then uses them to synchronise/process the associated Planning item(s) + """ + + ids = await super().create(docs) + + embedded_planning_lists: list[tuple[EventResourceModel, list[EmbeddedPlanning]]] = [] + + for event in docs: + embedded_planning = get_events_embedded_planning(event) + if len(embedded_planning): + embedded_planning_lists.append((event.to_dict(), embedded_planning)) + + if len(embedded_planning_lists): + for event, embedded_planning in embedded_planning_lists: + sync_event_metadata_with_planning_items(None, event, embedded_planning) + + return ids + + async def on_create(self, docs: list[EventResourceModel]) -> None: + # events generated by recurring rules + generated_events = [] + for event in docs: + # generates an unique id + if not event.guid: + event.guid = generate_guid(type=GUID_NEWSML) + event.id = event.guid + + if not event.language: + try: + event.language = event.languages[0] + except IndexError: + event.language = get_app_config("DEFAULT_LANGUAGE") + + # TODO-ASYNC: consider moving this into base service later + event.original_creator = ObjectId(get_user_id()) or None + + # overwrite expiry date if needed + self._overwrite_event_expiry_date(event) + + # we ignore the 'update_method' on create + if event.update_method: + event.update_method = None + + # remove the 'expired' flag if it is set, as no new Event can be created as expired + if event.expired: + event.expired = False + + self._set_planning_schedule(event) + original_planning_item = event.planning_item + + # validate event + self.validate_event(event) + + # If _created_externally is true, generate_recurring_events is restricted. + if event.dates and event.dates.recurring_rule and not event.dates.recurring_rule._created_externally: + recurring_events = self._generate_recurring_events(event) + generated_events.extend(recurring_events) + + # Set the current Event to the first Event in the new series + # This will make sure the ID of the Event can be used when + # using 'event' from here on, such as when linking to a Planning item + event = recurring_events[0] + + # And set the Planning Item from the original + # (generate_recurring_events removes this field) + event.planning_item = original_planning_item + + if event.state == WorkflowStates.INGESTED: + events_history = get_resource_service("events_history") + events_history.on_item_created([event.to_dict()]) + + if original_planning_item: + self._link_to_planning(event) + del event["_planning_item"] + + if generated_events: + docs.extend(generated_events) + + def validate_event(self, updated_event: EventResourceModel, original_event: EventResourceModel | None = None): + """Validate the event + + @:param dict event: event created or updated + """ + + assert updated_event is not None + + self._validate_multiday_event_duration(updated_event) + self._validate_dates(updated_event, original_event) + self._validate_convert_to_recurring(updated_event, original_event) + self._validate_template(updated_event, original_event) + + # TODO-ASYNC: migrate `sanitize_input_data` to support new models based on pydantic + # this function below allows both Event and Planning items + # sanitize_input_data(updates) + + def _validate_multiday_event_duration(self, event: EventResourceModel): + """Validate that the multiday event duration is not greater than PLANNING_MAX_MULTI_DAY_DURATION + + @:param dict event: event created or updated + """ + max_duration = get_event_max_multi_day_duration() + if not max_duration > 0: + return + + if not event.dates: + return + + assert event.dates.start is not None + assert event.dates.end is not None + + event_duration = event.dates.end - event.dates.start + if event_duration.days > max_duration: + raise SuperdeskApiError(message="Event duration is greater than {} days.".format(max_duration)) + + def _validate_dates(self, updated_event: EventResourceModel, original_event: EventResourceModel | None = None): + """Validate the dates + + @:param dict event: + """ + # TODO-ASYNC: consider/check if these validations should be in the pydantic model + event = updated_event if updated_event.dates or not original_event else original_event + + assert event.dates is not None + + start_date = event.dates.start + end_date = event.dates.end + + if not start_date or not end_date: + raise SuperdeskApiError(message="Event START DATE and END DATE are mandatory.") + + if end_date < start_date: + raise SuperdeskApiError(message="END TIME should be after START TIME") + + if event.dates.recurring_rule and not event.dates.recurring_rule.until and not event.dates.recurring_rule.count: + raise SuperdeskApiError(message="Recurring event should have an end (until or count)") + + def _validate_convert_to_recurring( + self, updated_event: EventResourceModel, original: EventResourceModel | None = None + ): + """Validates if the convert to recurring action is valid. + + :param updates: + :param original: + :return: + """ + if original is None: + return + + if ( + original.lock_action == "convert_recurring" + and updated_event.dates + and updated_event.dates.recurring_rule is None + ): + raise SuperdeskApiError(message="Event recurring rules are mandatory for convert to recurring action.") + + if original.lock_action == "convert_recurring" and original.recurrence_id: + raise SuperdeskApiError(message="Event is already converted to recurring event.") + + @staticmethod + def _validate_template(updated_event: EventResourceModel, original_event: EventResourceModel | None = None): + """Ensures that event template can't be changed + + :param updates: updates to event that should be saved + :type updates: dict + :param original: original event before update + :type original: dict + :return: + """ + if original_event is None: + return + + # we can't change `template` id + if updated_event.template and updated_event.template != original_event.template: + raise SuperdeskApiError.badRequestError( + message="Request is not valid", + payload={"template": "This value can't be changed."}, + ) + + def _set_planning_schedule(self, event: EventResourceModel): + if event.dates and event.dates.start: + event.planning_schedule = [PlanningSchedule(scheduled=event.dates.start)] + + def _overwrite_event_expiry_date(self, event: EventResourceModel): + if event.expiry: + assert event.dates is not None + assert event.dates.end is not None + + expiry_minutes = get_app_config("PLANNING_EXPIRY_MINUTES", None) + event.expiry = event.dates.end + timedelta(minutes=expiry_minutes or 0) + + def set_recurring_mode(self, event: EventResourceModel): + assert event.dates is not None + assert event.dates.recurring_rule is not None + + end_repeat_mode = event.dates.recurring_rule.end_repeat_mode + + if end_repeat_mode == "count": + event.dates.recurring_rule.until = None + elif end_repeat_mode == "until": + event.dates.recurring_rule.count = None + + def _reset_recurring_event_fields(self, event: EventResourceModel): + """ + Reset fields that are not required by the new (recurring) events + """ + fields_to_reset = ["lock_user", "lock_time", "lock_session", "lock_action"] + + for field in fields_to_reset: + setattr(event, field, None) + + def _generate_recurring_events( + self, event: EventResourceModel, recurrence_id: int | None = None + ) -> list[EventResourceModel]: + """ + Generate recurring events based on the recurrence rules of the given event. + + Args: + event (EventResourceModel): The original event used as a template for recurrence. + recurrence_id (int, optional): The ID of the recurrence group. Defaults to None. + + Returns: + list[EventResourceModel]: A list of newly generated recurring events. + """ + assert event.dates is not None + + self.set_recurring_mode(event) + generated_events = [] + + assert event.dates.start is not None + assert event.dates.end is not None + assert event.dates.recurring_rule is not None + + # compute the difference between start and end in the original event + time_delta = event.dates.end - event.dates.start + + max_recurring_events = get_max_recurrent_events() + recurring_dates = generate_recurring_dates( + start=event.dates.start, + tz=pytz.timezone(event.dates.tz or ""), + **event.dates.recurring_rule.to_dict(), + ) + + # for all the dates based on the recurring rules + # set a limit to prevent too many events to be created + for date in itertools.islice(recurring_dates, 0, max_recurring_events): + # prepare data for new recurring event + new_id = generate_guid(type=GUID_NEWSML) + recurring_event_updates = {"dates": dict(start=date, end=(date + time_delta)), "guid": new_id, "id": new_id} + + if not recurrence_id: + recurring_event_updates["recurrence_id"] = new_id + + # reset fields not required by new events + fields_to_reset = [ + "lock_user", + "lock_time", + "lock_session", + "lock_action", + "planning_schedule", + "reschedule_from_schedule", + "planning_item", + "pubstatus", + "reschedule_from", + ] + for field in fields_to_reset: + recurring_event_updates[field] = None + + # let's finally clone the original event & update it with recurring event data + new_event = event.model_copy(update=recurring_event_updates, deep=True) + + # set expiry date + self._overwrite_event_expiry_date(new_event) + self._set_planning_schedule(new_event) + + generated_events.append(new_event) + + return generated_events + + @staticmethod + def _link_to_planning(event: EventResourceModel): + """ + Links an Event to an existing Planning Item + + The Planning item remains locked, it is up to the client to release this lock + after this operation is complete + """ + # TODO-ASYNC: replace when planning service is async + planning_service = get_resource_service("planning") + plan_id = event.planning_item + + planning_item = planning_service.find_one(req=None, _id=plan_id) + + if not planning_item: + raise SuperdeskApiError.badRequestError("Planning item not found") + + updates = {"related_events": planning_item.get("related_events") or []} + event_link_method = get_planning_event_link_method() + link_type: PLANNING_RELATED_EVENT_LINK_TYPE = ( + "primary" + if not len(get_related_event_ids_for_planning(planning_item, "primary")) + and event_link_method in ("one_primary", "one_primary_many_secondary") + else "secondary" + ) + related_planning = PlanningRelatedEventLink(_id=event.id, link_type=link_type) + updates["related_events"].append(related_planning) + + # Add ``recurrence_id`` if the supplied Event is part of a series + if event.recurrence_id: + related_planning["recurrence_id"] = event.recurrence_id + if not planning_item.get("recurrence_id") and link_type == "primary": + updates["recurrence_id"] = event.recurrence_id + + planning_service.validate_on_update(updates, planning_item, get_user()) + planning_service.system_update(plan_id, updates, planning_item) + + app = get_current_app().as_any() + app.on_updated_planning(updates, planning_item) diff --git a/server/planning/events/events_sync/__init__.py b/server/planning/events/events_sync/__init__.py index 50606aae1..ff3173f97 100644 --- a/server/planning/events/events_sync/__init__.py +++ b/server/planning/events/events_sync/__init__.py @@ -8,7 +8,7 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license -from typing import Dict, Optional, List +from typing import Dict, Optional, List, cast from copy import deepcopy import pytz @@ -20,6 +20,7 @@ from planning.common import get_config_event_fields_to_sync_with_planning from planning.content_profiles.utils import AllContentProfileData from planning.utils import get_related_planning_for_events +from planning.types.event import EmbeddedPlanning as EmbeddedPlanningModel, EventResourceModel from .common import VocabsSyncData, SyncItemData, SyncData from .embedded_planning import ( @@ -39,9 +40,21 @@ def get_translated_fields(translations: List[StringFieldTranslation]) -> Dict[st return fields +# TODO-ASYNC: use resource models instead of typed dicts def sync_event_metadata_with_planning_items( - original: Optional[Event], updates: Event, embedded_planning: List[EmbeddedPlanning] + original: Optional[Event], + updates: Event | EventResourceModel, + embedded_planning: list[EmbeddedPlanning] | list[EmbeddedPlanningModel], ): + # TODO-ASYNC: remove these checks after this is migrated + if isinstance(updates, EventResourceModel): + updates = cast(Event, updates.to_dict()) + + embedded_planning = [ + cast(EmbeddedPlanning, obj.to_dict()) if isinstance(obj, EmbeddedPlanningModel) else obj + for obj in embedded_planning + ] + profiles = AllContentProfileData() if original is None: diff --git a/server/planning/events/events_utils.py b/server/planning/events/events_utils.py index 5c10a5ed9..d2d326e36 100644 --- a/server/planning/events/events_utils.py +++ b/server/planning/events/events_utils.py @@ -1,9 +1,22 @@ import re +from planning.types.enums import UpdateMethods import pytz -from typing import Literal + from datetime import datetime, tzinfo +from typing import AsyncGenerator, Any, Tuple, Literal + from dateutil.rrule import rrule, DAILY, WEEKLY, MONTHLY, YEARLY, MO, TU, WE, TH, FR, SA, SU +from superdesk.utc import utcnow +from superdesk.resource_fields import ID_FIELD +from superdesk.metadata.item import GUID_NEWSML +from superdesk.metadata.utils import generate_guid +from superdesk.core.types import SortParam, SortListParam + +from planning.types.event import EmbeddedPlanning, EmbeddedPlanningCoverage +from planning.types import EmbeddedCoverageItem, EventResourceModel +from planning.common import TEMP_ID_PREFIX, WORKFLOW_STATE, get_max_recurrent_events + FrequencyType = Literal["DAILY", "WEEKLY", "MONTHLY", "YEARLY"] @@ -106,3 +119,100 @@ def generate_recurring_dates( return (date.date() for date in dates) else: return (date for date in dates) + + +def get_events_embedded_planning(event: EventResourceModel) -> list[EmbeddedPlanning]: + def _get_coverage_id(coverage: EmbeddedPlanningCoverage) -> str: + if not coverage.coverage_id: + coverage.coverage_id = TEMP_ID_PREFIX + "-" + generate_guid(type=GUID_NEWSML) + return coverage.coverage_id + + return [ + EmbeddedPlanning( + planning_id=planning.planning_id, + update_method=planning.update_method or UpdateMethods.SINGLE, + coverages={_get_coverage_id(coverage): coverage for coverage in planning.coverages}, + ) + for planning in event.embedded_planning + ] + + +async def get_series( + query: dict, sort: SortParam | None = None, max_results: int = 25 +) -> AsyncGenerator[EventResourceModel, None]: + events_service = EventResourceModel.get_service() + page = 1 + + while True: + # Get the results from mongo + results = await events_service.find(req=query, page=page, max_results=max_results, sort=sort, use_mongo=True) + + docs = await results.to_list() + if not docs: + break + + page += 1 + + # Yield the results for iteration by the callee + for doc in docs: + yield doc + + +async def get_recurring_timeline( + selected: dict[str, Any], + spiked: bool = False, + rescheduled: bool = False, + cancelled: bool = False, + postponed: bool = False, +) -> Tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: + """Utility method to get all events in the series + + This splits up the series of events into 3 separate arrays. + Historic: event.dates.start < utcnow() + Past: utcnow() < event.dates.start < selected.dates.start + Future: event.dates.start > selected.dates.start + """ + excluded_states = [] + + if not spiked: + excluded_states.append(WORKFLOW_STATE.SPIKED) + if not rescheduled: + excluded_states.append(WORKFLOW_STATE.RESCHEDULED) + if not cancelled: + excluded_states.append(WORKFLOW_STATE.CANCELLED) + if not postponed: + excluded_states.append(WORKFLOW_STATE.POSTPONED) + + query = { + "$and": [ + {"recurrence_id": selected["recurrence_id"]}, + {"_id": {"$ne": selected[ID_FIELD]}}, + ] + } + + if excluded_states: + query["$and"].append({"state": {"$nin": excluded_states}}) + + sort: SortListParam = [("dates.start", 1)] + max_results = get_max_recurrent_events() + selected_start = selected.get("dates", {}).get("start", utcnow()) + + # Make sure we are working with a datetime instance + if not isinstance(selected_start, datetime): + selected_start = datetime.strptime(selected_start, "%Y-%m-%dT%H:%M:%S%z") + + historic = [] + past = [] + future = [] + + async for event in get_series(query, sort, max_results): + end = event.dates.end if event.dates else None + start = event.dates.start if event.dates else None + if end and end < utcnow(): + historic.append(event.to_dict()) + elif start and start < selected_start: + past.append(event.to_dict()) + elif start and start > selected_start: + future.append(event.to_dict()) + + return historic, past, future diff --git a/server/planning/events/module.py b/server/planning/events/module.py index bc1ca661c..935345445 100644 --- a/server/planning/events/module.py +++ b/server/planning/events/module.py @@ -6,7 +6,7 @@ ) from planning.types import EventResourceModel -from .service import EventsAsyncService +from .events_service import EventsAsyncService events_resource_config = ResourceConfig( name="events", diff --git a/server/planning/events/service.py b/server/planning/events/service.py deleted file mode 100644 index 2287e7df1..000000000 --- a/server/planning/events/service.py +++ /dev/null @@ -1,168 +0,0 @@ -import pytz -import itertools - -from datetime import timedelta -from apps.auth import get_user_id - -from superdesk.core import get_app_config -from superdesk import get_resource_service -from superdesk.core.utils import generate_guid -from superdesk.metadata.item import GUID_NEWSML - -from planning.types import EventResourceModel, PlanningSchedule -from planning.common import WorkflowStates, get_max_recurrent_events -from planning.core.service import BasePlanningAsyncService - -from .events_utils import generate_recurring_dates - - -class EventsAsyncService(BasePlanningAsyncService[EventResourceModel]): - resource_name = "events" - - async def on_create(self, docs: list[EventResourceModel]) -> None: - # events generated by recurring rules - generated_events = [] - for event in docs: - # generates an unique id - if not event.guid: - event.guid = generate_guid(type=GUID_NEWSML) - event.id = event.guid - - if not event.language: - try: - event.language = event.languages[0] - except IndexError: - event.language = get_app_config("DEFAULT_LANGUAGE") - - # TODO-ASYNC: consider moving this into base service later - event.original_creator = get_user_id() or None - - # overwrite expiry date if needed - self._overwrite_event_expiry_date(event) - - # we ignore the 'update_method' on create - if event.update_method: - event.update_method = None - - # remove the 'expired' flag if it is set, as no new Event can be created as expired - if event.expired: - event.expired = None - - self._set_planning_schedule(event) - original_planning_item = event.planning_item - - # validate event - self.validate_event(event) - - # If _created_externally is true, generate_recurring_events is restricted. - if event.dates.recurring_rule and not event.dates.recurring_rule._created_externally: - recurring_events = self._generate_recurring_events(event) - generated_events.extend(recurring_events) - - # Set the current Event to the first Event in the new series - # This will make sure the ID of the Event can be used when - # using 'event' from here on, such as when linking to a Planning item - event = recurring_events[0] - - # And set the Planning Item from the original - # (generate_recurring_events removes this field) - event.planning_item = original_planning_item - - if event.state == WorkflowStates.INGESTED: - events_history = get_resource_service("events_history") - events_history.on_item_created([event]) - - if original_planning_item: - self._link_to_planning(event) - del event["_planning_item"] - - if generated_events: - docs.extend(generated_events) - - def _set_planning_schedule(self, event: EventResourceModel): - if event.dates.start: - event.planning_schedule = [PlanningSchedule(scheduled=event.dates.start)] - - def _overwrite_event_expiry_date(self, event: EventResourceModel): - if event.expiry: - expiry_minutes = get_app_config("PLANNING_EXPIRY_MINUTES", None) - event.expiry = event.dates.end + timedelta(minutes=expiry_minutes or 0) - - def set_recurring_mode(self, event: EventResourceModel): - end_repeat_mode = event.dates.recurring_rule.end_repeat_mode - - if end_repeat_mode == "count": - event.dates.recurring_rule.until = None - elif end_repeat_mode == "until": - event.dates.recurring_rule.count = None - - def _reset_recurring_event_fields(self, event: EventResourceModel): - """ - Reset fields that are not required by the new (recurring) events - """ - fields_to_reset = ["lock_user", "lock_time", "lock_session", "lock_action"] - - for field in fields_to_reset: - setattr(event, field, None) - - def _generate_recurring_events( - self, event: EventResourceModel, recurrence_id: int = None - ) -> list[EventResourceModel]: - """ - Generate recurring events based on the recurrence rules of the given event. - - Args: - event (EventResourceModel): The original event used as a template for recurrence. - recurrence_id (int, optional): The ID of the recurrence group. Defaults to None. - - Returns: - list[EventResourceModel]: A list of newly generated recurring events. - """ - self.set_recurring_mode(event) - generated_events = [] - - # compute the difference between start and end in the original event - time_delta = event.dates.end - event.dates.start - - max_recurring_events = get_max_recurrent_events() - recurring_dates = generate_recurring_dates( - start=event.dates.start, - tz=pytz.timezone(event.dates.tz or None), - **event.dates.recurring_rule, - ) - - # for all the dates based on the recurring rules - # set a limit to prevent too many events to be created - for date in itertools.islice(recurring_dates, 0, max_recurring_events): - # prepare data for new recurring event - new_id = generate_guid(type=GUID_NEWSML) - recurring_event_updates = {"dates": dict(start=date, end=(date + time_delta)), "guid": new_id, "id": new_id} - - if not recurrence_id: - recurring_event_updates["recurrence_id"] = new_id - - # reset fields not required by new events - fields_to_reset = [ - "lock_user", - "lock_time", - "lock_session", - "lock_action", - "planning_schedule", - "reschedule_from_schedule", - "planning_item", - "pubstatus", - "reschedule_from", - ] - for field in fields_to_reset: - recurring_event_updates[field] = None - - # let's finally clone the original event & update it with recurring event data - new_event = event.model_copy(update=recurring_event_updates, deep=True) - - # set expiry date - self._overwrite_event_expiry_date(new_event) - self._set_planning_schedule(new_event) - - generated_events.append(new_event) - - return generated_events diff --git a/server/planning/events/utils.py b/server/planning/events/utils.py deleted file mode 100644 index 75e118253..000000000 --- a/server/planning/events/utils.py +++ /dev/null @@ -1,92 +0,0 @@ -from typing import AsyncGenerator, Any, Tuple -from datetime import datetime - -from planning.common import ( - WORKFLOW_STATE, - get_max_recurrent_events, -) -from planning.types import EventResourceModel -from superdesk.core.types import SortParam, SortListParam -from superdesk.resource_fields import ID_FIELD -from superdesk.utc import utcnow - - -async def get_series( - query: dict, sort: SortParam | None = None, max_results: int = 25 -) -> AsyncGenerator[EventResourceModel, None]: - events_service = EventResourceModel.get_service() - page = 1 - - while True: - # Get the results from mongo - results = await events_service.find(req=query, page=page, max_results=max_results, sort=sort, use_mongo=True) - - docs = await results.to_list() - if not docs: - break - - page += 1 - - # Yield the results for iteration by the callee - for doc in docs: - yield doc - - -async def get_recurring_timeline( - selected: dict[str, Any], - spiked: bool = False, - rescheduled: bool = False, - cancelled: bool = False, - postponed: bool = False, -) -> Tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: - """Utility method to get all events in the series - - This splits up the series of events into 3 separate arrays. - Historic: event.dates.start < utcnow() - Past: utcnow() < event.dates.start < selected.dates.start - Future: event.dates.start > selected.dates.start - """ - excluded_states = [] - - if not spiked: - excluded_states.append(WORKFLOW_STATE.SPIKED) - if not rescheduled: - excluded_states.append(WORKFLOW_STATE.RESCHEDULED) - if not cancelled: - excluded_states.append(WORKFLOW_STATE.CANCELLED) - if not postponed: - excluded_states.append(WORKFLOW_STATE.POSTPONED) - - query = { - "$and": [ - {"recurrence_id": selected["recurrence_id"]}, - {"_id": {"$ne": selected[ID_FIELD]}}, - ] - } - - if excluded_states: - query["$and"].append({"state": {"$nin": excluded_states}}) - - sort: SortListParam = [("dates.start", 1)] - max_results = get_max_recurrent_events() - selected_start = selected.get("dates", {}).get("start", utcnow()) - - # Make sure we are working with a datetime instance - if not isinstance(selected_start, datetime): - selected_start = datetime.strptime(selected_start, "%Y-%m-%dT%H:%M:%S%z") - - historic = [] - past = [] - future = [] - - async for event in get_series(query, sort, max_results): - end = event.dates.end if event.dates else None - start = event.dates.start if event.dates else None - if end and end < utcnow(): - historic.append(event.to_dict()) - elif start and start < selected_start: - past.append(event.to_dict()) - elif start and start > selected_start: - future.append(event.to_dict()) - - return historic, past, future diff --git a/server/planning/tests/__init__.py b/server/planning/tests/__init__.py index f1b97a352..5d9e0fdc9 100644 --- a/server/planning/tests/__init__.py +++ b/server/planning/tests/__init__.py @@ -4,6 +4,7 @@ class TestCase(BaseTestCase): test_context = None # avoid using test_request_context + app_config: dict[str, Any] = { "INSTALLED_APPS": ["planning"], "MODULES": ["planning.module"], diff --git a/server/planning/types/common.py b/server/planning/types/common.py index 5a9244c30..7bae389e2 100644 --- a/server/planning/types/common.py +++ b/server/planning/types/common.py @@ -3,7 +3,7 @@ from typing import Any, Annotated, Literal, TypeAlias from superdesk.utc import utcnow -from superdesk.core.resources import dataclass, fields +from superdesk.core.resources import dataclass, fields, Dataclass from superdesk.core.elastic.mapping import json_schema_to_elastic_mapping from superdesk.core.resources.validators import validate_data_relation_async @@ -63,9 +63,8 @@ class RelationshipItem: related: str | None = None -@dataclass -class PlanningSchedule: - scheduled: date | None = None +class PlanningSchedule(Dataclass): + scheduled: datetime | None = None coverage_id: fields.Keyword | None = None diff --git a/server/planning/types/enums.py b/server/planning/types/enums.py index 043b9fe95..8d82934d3 100644 --- a/server/planning/types/enums.py +++ b/server/planning/types/enums.py @@ -32,9 +32,9 @@ class PostStates(str, Enum): @unique class UpdateMethods(str, Enum): - UPDATE_SINGLE = "single" - UPDATE_FUTURE = "future" - UPDATE_ALL = "all" + SINGLE = "single" + FUTURE = "future" + ALL = "all" @unique diff --git a/server/planning/types/event.py b/server/planning/types/event.py index eb9dd475b..a597692ef 100644 --- a/server/planning/types/event.py +++ b/server/planning/types/event.py @@ -5,7 +5,7 @@ from content_api.items.model import CVItem, Place from superdesk.utc import utcnow -from superdesk.core.resources import fields, dataclass +from superdesk.core.resources import fields, dataclass, Dataclass from superdesk.core.resources.validators import validate_data_relation_async from .base import BasePlanningModel @@ -101,7 +101,7 @@ class EmbeddedPlanningCoverage: @dataclass -class EmbeddedPlanning: +class EmbeddedPlanning(Dataclass): planning_id: Annotated[str, validate_data_relation_async("planning")] update_method: Annotated[UpdateMethods, fields.keyword_mapping()] | None = None coverages: list[EmbeddedPlanningCoverage] | None = Field(default_factory=list) @@ -123,7 +123,7 @@ class RelatedItem: class EventResourceModel(BasePlanningModel, LockFieldsMixin): - guid: fields.Keyword + guid: fields.Keyword | None = None unique_id: int | None = None unique_name: fields.Keyword | None = None version: int | None = None @@ -171,7 +171,7 @@ class EventResourceModel(BasePlanningModel, LockFieldsMixin): priority: int | None = None # NewsML-G2 Event properties See IPTC-G2-Implementation_Guide 15.4.3 - dates: EventDates | None = Field(default_factory=EventDates) + dates: EventDates | None = None # This is an extra field so that we can sort in the combined view of events and planning. # It will store the dates.start of the event. diff --git a/server/planning/types/event_dates.py b/server/planning/types/event_dates.py index a9039ca6e..faeb2c6a7 100644 --- a/server/planning/types/event_dates.py +++ b/server/planning/types/event_dates.py @@ -3,7 +3,7 @@ from pydantic.fields import Field -from superdesk.core.resources import dataclass, fields +from superdesk.core.resources import dataclass, fields, Dataclass # NewsML-G2 Event properties See IPTC-G2-Implementation_Guide 15.4.3 @@ -11,8 +11,7 @@ RepeatModeType: TypeAlias = Literal["count", "until"] -@dataclass -class RecurringRule: +class RecurringRule(Dataclass): frequency: str | None = None interval: int | None = None end_repeat_mode: RepeatModeType | None = Field(default=None, alias="endRepeatMode") @@ -44,8 +43,8 @@ class OccurStatus: label: fields.Keyword | None = None -@dataclass -class EventDates: +class EventDates(Dataclass): + # TODO-ASYNC: double check which ones are required start: datetime | None = None end: datetime | None = None tz: str | None = None @@ -55,7 +54,7 @@ class EventDates: duration: str | None = None confirmation: str | None = None recurring_date: List[date] | None = None - recurring_rule: RecurringRule | None = Field(default_factory=RecurringRule) + recurring_rule: RecurringRule | None = None occur_status: OccurStatus | None = None ex_date: List[date] = Field(default_factory=list) ex_rule: ExRule | None = None