From 4778c253a5981558c4702fd8a8b3ae577db22192 Mon Sep 17 00:00:00 2001 From: Helmy Giacoman Date: Sun, 8 Dec 2024 21:39:59 +0100 Subject: [PATCH] Move `on_create`, `on_created`, `on_update` & `on_updated` SDESK-7442 --- .../commands/export_to_newsroom_test.py | 1 - server/planning/events/events.py | 25 +- server/planning/events/events_service.py | 368 +++++++++++++++++- .../planning/events/events_sync/__init__.py | 6 +- .../events/events_sync/embedded_planning.py | 6 +- server/planning/events/events_utils.py | 25 +- server/planning/types/__init__.py | 6 +- server/planning/types/event.py | 4 +- server/planning/types/event_dates.py | 2 +- 9 files changed, 398 insertions(+), 45 deletions(-) diff --git a/server/planning/commands/export_to_newsroom_test.py b/server/planning/commands/export_to_newsroom_test.py index 774f891cb..94366995e 100644 --- a/server/planning/commands/export_to_newsroom_test.py +++ b/server/planning/commands/export_to_newsroom_test.py @@ -157,7 +157,6 @@ async def setup_data(self): "type": "event", }, ] - events = [EventResourceModel.from_dict(ev) for ev in events] planning = [ { diff --git a/server/planning/events/events.py b/server/planning/events/events.py index 49b179278..e1362053f 100644 --- a/server/planning/events/events.py +++ b/server/planning/events/events.py @@ -10,14 +10,15 @@ """Superdesk Events""" -from typing import Dict, Any, Optional, List, Tuple + +import re +import pytz import logging import itertools + from copy import deepcopy from datetime import timedelta - -import pytz -import re +from typing import Dict, Any, Optional, List, Tuple from eve.methods.common import resolve_document_etag from eve.utils import date_to_str from dateutil.rrule import ( @@ -35,9 +36,9 @@ SU, ) +import superdesk from superdesk.core import get_app_config, get_current_app from superdesk.resource_fields import ID_FIELD -import superdesk from superdesk import get_resource_service from superdesk.errors import SuperdeskApiError from superdesk.metadata.utils import generate_guid @@ -48,12 +49,8 @@ from apps.auth import get_user, get_user_id from apps.archive.common import get_auth, update_dates_for -from planning.types import ( - Event, - EmbeddedPlanning, - PlanningRelatedEventLink, - PLANNING_RELATED_EVENT_LINK_TYPE, -) +from planning.types import Event, PlanningRelatedEventLink, PLANNING_RELATED_EVENT_LINK_TYPE +from planning.types.event import EmbeddedPlanning from planning.common import ( UPDATE_SINGLE, UPDATE_FUTURE, @@ -281,9 +278,9 @@ def create(self, docs: List[Event], **kwargs): embedded_planning_lists: List[Tuple[Event, List[EmbeddedPlanning]]] = [] for event in docs: - embedded_planning = get_events_embedded_planning(event) - if len(embedded_planning): - embedded_planning_lists.append((event, embedded_planning)) + emb_planning = get_events_embedded_planning(event) + if len(emb_planning): + embedded_planning_lists.append((event, emb_planning)) # type: ignore ids = self.backend.create(self.datasource, docs, **kwargs) diff --git a/server/planning/events/events_service.py b/server/planning/events/events_service.py index 3896dda13..1e32fa0cb 100644 --- a/server/planning/events/events_service.py +++ b/server/planning/events/events_service.py @@ -1,14 +1,20 @@ +from planning.events.events_base_service import EventsBaseService import pytz import itertools +from copy import deepcopy from bson import ObjectId -from typing import Any, AsyncGenerator +from typing import Any, AsyncGenerator, cast from datetime import datetime, timedelta from apps.auth import get_user, get_user_id 
+import superdesk +from superdesk.utc import utcnow from superdesk import get_resource_service +from superdesk.resource_fields import ID_FIELD from superdesk.errors import SuperdeskApiError from superdesk.metadata.item import GUID_NEWSML +from superdesk.notification import push_notification from superdesk.core import get_app_config, get_current_app from superdesk.core.utils import date_to_str, generate_guid @@ -17,11 +23,25 @@ EventResourceModel, PlanningRelatedEventLink, PlanningSchedule, + PostStates, + UpdateMethods, + WorkflowState, ) from planning.types.event import EmbeddedPlanning -from planning.common import WorkflowStates, get_event_max_multi_day_duration, get_max_recurrent_events +from planning.common import ( + WorkflowStates, + get_event_max_multi_day_duration, + get_max_recurrent_events, + remove_lock_information, + set_ingested_event_state, + post_required, +) from planning.core.service import BasePlanningAsyncService -from planning.utils import get_planning_event_link_method, get_related_event_ids_for_planning +from planning.utils import ( + get_planning_event_link_method, + get_related_event_ids_for_planning, + get_related_planning_for_events, +) from .events_sync import sync_event_metadata_with_planning_items from .events_utils import generate_recurring_dates, get_events_embedded_planning @@ -86,6 +106,7 @@ async def create(self, docs: list[EventResourceModel]): And then uses them to synchronise/process the associated Planning item(s) """ + docs = await self._convert_dicts_to_model(docs) ids = await super().create(docs) embedded_planning_lists: list[tuple[EventResourceModel, list[EmbeddedPlanning]]] = [] @@ -130,7 +151,7 @@ async def on_create(self, docs: list[EventResourceModel]) -> None: if event.expired: event.expired = False - self._set_planning_schedule(event) + event.planning_schedule = self._create_planning_schedule(event) original_planning_item = event.planning_item # validate event @@ -161,13 +182,123 @@ async def on_create(self, docs: list[EventResourceModel]) -> None: if generated_events: docs.extend(generated_events) - def validate_event(self, updated_event: EventResourceModel, original_event: EventResourceModel | None = None): - """Validate the event + await super().on_create(docs) - @:param dict event: event created or updated + async def on_created(self, docs: list[EventResourceModel]): + """Send WebSocket Notifications for created Events + + Generate the list of IDs for recurring and non-recurring events + Then send this list off to the clients so they can fetch these events + """ + notifications_sent = [] + history_service = get_resource_service("events_history") + + for doc in docs: + event_id = doc.id + + # If we duplicated this event, update the history + if doc.duplicate_from: + parent_id = doc.duplicate_from + parent_event = await self.find_by_id(parent_id) + + assert parent_event is not None + + history_service.on_item_updated({"duplicate_id": event_id}, parent_event.to_dict(), "duplicate") + history_service.on_item_updated({"duplicate_id": parent_id}, doc.to_dict(), "duplicate_from") + + duplicate_ids = parent_event.duplicate_to or [] + duplicate_ids.append(event_id) + + await super().update(parent_id, {"duplicate_to": duplicate_ids}) + + event_type = "events:created" + user_id = doc.original_creator or "" + + if doc.recurrence_id: + event_type = "events:created:recurring" + event_id = str(doc.recurrence_id) + + # Don't send notification if one has already been sent + # This is to ensure recurring events don't send multiple notifications + if 
event_id in notifications_sent or doc.previous_recurrence_id: + continue + + notifications_sent.append(event_id) + push_notification(event_type, item=event_id, user=user_id) + + async def on_update(self, updates: dict[str, Any], original: EventResourceModel): + """Update single or series of recurring events. + + Determine if the supplied event is a single event or a + series of recurring events, and call the appropriate method + for the event type. """ + if "skip_on_update" in updates: + # this is a recursive update (see below) + del updates["skip_on_update"] + return + + update_method = updates.pop("update_method", UpdateMethods.SINGLE) + + user = get_user() + user_id = user.get(ID_FIELD) if user else None + + if user_id: + updates["version_creator"] = user_id + set_ingested_event_state(updates, original.to_dict()) + + lock_user = original.lock_user or None + str_user_id = str(user.get(ID_FIELD)) if user_id else None + + if lock_user and str(lock_user) != str_user_id: + raise SuperdeskApiError.forbiddenError("The item was locked by another user") + + # If only the `recurring_rule` was provided, then fill in the rest from the original + # This can happen, for example, when converting a single Event to a series of Recurring Events + if list(updates.get("dates") or {}) == ["recurring_rule"]: + new_dates = original.to_dict()["dates"] + new_dates.update(updates["dates"]) + updates["dates"] = new_dates + + # validate event + self.validate_event(updates, original) + + # Run the specific methods based on if the original is a single or a series of recurring events + if not getattr((original.dates or {}), "recurring_rule") or update_method == UpdateMethods.SINGLE: + await self._update_single_event(updates, original) + else: + await self._update_recurring_events(updates, original, update_method) + + return await super().on_update(updates, original) + + async def update(self, event_id: str | ObjectId, updates: dict[str, Any], etag: str | None = None): + """Updates the event and also extracts out the ``embedded_planning`` before saving the Event + And then uses them to synchronise/process the associated Planning item(s) + """ + + updates.setdefault("versioncreated", utcnow()) + original_event = await self.find_by_id(event_id) + + if original_event is None: + raise SuperdeskApiError.badRequestError("Event not found") - assert updated_event is not None + # Extract the ``embedded_planning`` from the updates + embedded_planning = get_events_embedded_planning(updates) + + await super().update(event_id, updates, etag) + + # Process ``embedded_planning`` field, and sync Event metadata with associated Planning/Coverages + sync_event_metadata_with_planning_items(original_event.to_dict(), updates, embedded_planning) + + def validate_event( + self, updated_event: dict[str, Any] | EventResourceModel, original_event: EventResourceModel | None = None + ): + """Validate the event""" + + if isinstance(updated_event, dict): + updated_event = EventResourceModel.from_dict(updated_event) + # mypy complains even when `from_dict` returns a model instance + updated_event = cast(EventResourceModel, updated_event) self._validate_multiday_event_duration(updated_event) self._validate_dates(updated_event, original_event) @@ -220,7 +351,7 @@ def _validate_dates(self, updated_event: EventResourceModel, original_event: Eve raise SuperdeskApiError(message="Recurring event should have an end (until or count)") def _validate_convert_to_recurring( - self, updated_event: EventResourceModel, original: EventResourceModel | None = None + 
        self, updated_event: dict[str, Any] | EventResourceModel, original: EventResourceModel | None = None
     ):
         """Validates if the convert to recurring action is valid.
@@ -231,6 +362,10 @@
         if original is None:
             return
 
+        if isinstance(updated_event, dict):
+            updated_event = EventResourceModel.from_dict(updated_event)
+            updated_event = cast(EventResourceModel, updated_event)
+
         if (
             original.lock_action == "convert_recurring"
             and updated_event.dates
@@ -261,10 +396,225 @@ def _validate_template(updated_event: EventResourceModel, original_event: EventR
                 payload={"template": "This value can't be changed."},
             )
 
+    async def _update_single_event(self, updates: dict[str, Any], original: EventResourceModel):
+        """Updates the metadata of a single event.
+
+        If recurring_rule is provided, we convert this single event into
+        a series of recurring events, otherwise we simply update this event.
+        """
+
+        if post_required(updates, original.to_dict()):
+            merged: EventResourceModel = original.model_copy(update=updates, deep=True)
+
+            # TODO-ASYNC: replace when `event_post` is async
+            get_resource_service("events_post").validate_item(merged.to_dict())
+
+        # Determine if we're to convert this single event to a series of recurring events
+        if (
+            original.lock_action == "convert_recurring"
+            and updates.get("dates", {}).get("recurring_rule", None) is not None
+        ):
+            generated_events = await self._convert_to_recurring_event(updates, original)
+
+            # if the original event was "posted" then post all the generated events
+            # if original.get("pubstatus") in [ POST_STATE.CANCELLED, POST_STATE.USABLE]:
+            if original.pubstatus in [PostStates.CANCELLED, PostStates.USABLE]:
+                post = {
+                    "event": generated_events[0].id,
+                    "etag": generated_events[0].etag,
+                    "update_method": "all",
+                    "pubstatus": original.pubstatus,
+                }
+
+                # TODO-ASYNC: replace when `event_post` is async
+                get_resource_service("events_post").post([post])
+
+            push_notification(
+                "events:updated:recurring",
+                item=str(original.id),
+                user=str(updates.get("version_creator", "")),
+                recurrence_id=str(generated_events[0].recurrence_id),
+            )
+        else:
+            if original.lock_action == "mark_completed" and updates.get("actioned_date"):
+                self.mark_event_complete(updates, original, False)
+
+            # This updates Event metadata only
+            push_notification(
+                "events:updated",
+                item=str(original.id),
+                user=str(updates.get("version_creator", "")),
+            )
+
+    async def _update_recurring_events(
+        self, updates: dict[str, Any], original: EventResourceModel, update_method: UpdateMethods
+    ):
+        """Method to update recurring events.
+
+        If the recurring_rule has been removed for this event, process
+        it separately, otherwise update the event and/or its recurring rules
+        """
+        # This method now only handles updating of Event metadata
+        # So make sure to remove any date information that might be in
+        # the updates
+        updates.pop("dates", None)
+        original_as_dict = original.to_dict()
+
+        if update_method == UpdateMethods.FUTURE:
+            historic, past, future = self._get_recurring_timeline(original_as_dict)
+            events = future
+        else:
+            historic, past, future = self._get_recurring_timeline(original_as_dict)
+            events = historic + past + future
+
+        events_post_service = get_resource_service("events_post")
+
+        # First we want to validate that all events can be posted
+        for e in events:
+            if post_required(updates, e):
+                merged = deepcopy(e)
+                merged.update(updates)
+                events_post_service.validate_item(merged)
+
+        # If this update is from assignToCalendar action
+        # Then we only want to update the calendars of each Event
+        only_calendars = original.lock_action == "assign_calendar"
+        original_calendar_qcodes = [calendar.qcode for calendar in original.calendars]
+
+        # Get the list of calendars added
+        updated_calendars = [
+            calendar for calendar in updates.get("calendars") or [] if calendar["qcode"] not in original_calendar_qcodes
+        ]
+
+        mark_completed = original.lock_action == "mark_completed" and updates.get("actioned_date")
+        mark_complete_validated = False
+        for e in events:
+            event_id = e[ID_FIELD]
+
+            new_updates = deepcopy(updates)
+            new_updates["skip_on_update"] = True
+            new_updates[ID_FIELD] = event_id
+
+            if only_calendars:
+                # Get the original for this item, and add new calendars to it
+                # Skipping calendars already assigned to this item
+                original_event: EventResourceModel = await self.find_by_id(event_id)
+                assert original_event is not None
+                original_qcodes = [calendar.qcode for calendar in original_event.calendars]
+
+                new_updates["calendars"] = deepcopy(original_event.calendars)
+                new_updates["calendars"].extend(
+                    [calendar for calendar in updated_calendars if calendar["qcode"] not in original_qcodes]
+                )
+            elif mark_completed:
+                ev = EventResourceModel.from_dict(e)
+                self.mark_event_complete(updates, ev, mark_complete_validated)
+                # It is validated if the previous function did not raise an error
+                mark_complete_validated = True
+
+            # Remove ``embedded_planning`` before updating this event, as this should only be handled
+            # by the event provided to this update request
+            new_updates.pop("embedded_planning", None)
+
+            app = get_current_app().as_any()
+            app.on_updated_events(new_updates, {"_id": event_id})
+
+        # And finally push a notification to connected clients
+        push_notification(
+            "events:updated:recurring",
+            item=str(original.id),
+            recurrence_id=str(original.recurrence_id),
+            user=str(updates.get("version_creator", "")),
+        )
+
+    def _get_recurring_timeline(self, selected: dict[str, Any], spiked: bool = False):
+        # TODO-ASYNC: replace with an async service
+        events_base_service = EventsBaseService("events", backend=superdesk.get_backend())
+        return events_base_service.get_recurring_timeline(selected, postponed=True, spiked=spiked)
+
+    def mark_event_complete(self, updates: dict[str, Any], event: EventResourceModel, mark_complete_validated: bool):
+        assert event.dates is not None
+        assert event.dates.start is not None
+
+        # If the entire series is in the future, raise an error
+        if event.recurrence_id:
+            if not mark_complete_validated:
+                if event.dates.start.date() > updates["actioned_date"].date():
+                    raise
SuperdeskApiError.badRequestError("Recurring series has not started.")
+
+            # If we are marking an event as completed
+            # Update only those which are behind the 'actioned_date'
+            if event.dates.start < updates["actioned_date"]:
+                return
+
+        for plan in get_related_planning_for_events([event.id], "primary"):
+            if plan.get("state") != WorkflowState.CANCELLED and len(plan.get("coverages", [])) > 0:
+                # TODO-ASYNC: replace when `planning_cancel` is async
+                get_resource_service("planning_cancel").patch(
+                    plan[ID_FIELD],
+                    {
+                        "reason": "Event Completed",
+                        "cancel_all_coverage": True,
+                    },
+                )
+
+    async def _convert_to_recurring_event(self, updates: dict[str, Any], original: EventResourceModel):
+        """Convert a single event to a series of recurring events"""
+
+        self._validate_convert_to_recurring(updates, original)
+        updates["recurrence_id"] = original.id
+
+        merged: EventResourceModel = original.model_copy(update=updates, deep=True)
+
+        # Generated new events will be "draft"
+        merged.state = WorkflowState.DRAFT
+        generated_events = self._generate_recurring_events(merged, updates["recurrence_id"])
+        updated_event = generated_events.pop(0)
+
+        assert updated_event.dates is not None
+        assert updated_event.dates.start is not None
+        assert original.dates is not None
+        assert original.dates.start is not None
+
+        # Check to see if the first generated event is different from original
+        # If yes, mark original as rescheduled with generated recurrence_id
+        if updated_event.dates.start.date() != original.dates.start.date():
+            # Reschedule original event
+            updates["update_method"] = UpdateMethods.SINGLE
+            updates["dates"] = updated_event.dates
+            updates["_planning_schedule"] = [x.to_dict() for x in self._create_planning_schedule(updated_event)]
+
+            event_reschedule_service = get_resource_service("events_reschedule")
+            event_reschedule_service.update_single_event(updates, original)
+
+            if updates.get("state") == WorkflowState.RESCHEDULED:
+                history_service = get_resource_service("events_history")
+                history_service.on_reschedule(updates, original.to_dict())
+        else:
+            # Original event remains part of the series
+            # Remove the first element in the list (the current event being updated)
+            # And update the start/end dates to be in line with the new recurring rules
+            updates["dates"]["start"] = updated_event.dates.start
+            updates["dates"]["end"] = updated_event.dates.end
+            updates["_planning_schedule"] = [x.to_dict() for x in self._create_planning_schedule(updated_event)]
+            remove_lock_information(item=updates)
+
+        # Create the new events and generate their history
+        await self.create(generated_events)
+        app = get_current_app().as_any()
+        app.on_inserted_events(generated_events)
+
+        return generated_events
+
     def _set_planning_schedule(self, event: EventResourceModel):
         if event.dates and event.dates.start:
             event.planning_schedule = [PlanningSchedule(scheduled=event.dates.start)]
 
+    def _create_planning_schedule(self, event: EventResourceModel) -> list[PlanningSchedule]:
+        if event.dates and event.dates.start:
+            return [PlanningSchedule(scheduled=event.dates.start)]
+        return []
+
     def _overwrite_event_expiry_date(self, event: EventResourceModel):
         if event.expiry:
             assert event.dates is not None
diff --git a/server/planning/events/events_sync/__init__.py b/server/planning/events/events_sync/__init__.py
index ff3173f97..d2b405d95 100644
--- a/server/planning/events/events_sync/__init__.py
+++ b/server/planning/events/events_sync/__init__.py
@@ -16,7 +16,7 @@
 
 from superdesk import get_resource_service
 
-from planning.types
import Event, EmbeddedPlanning, StringFieldTranslation +from planning.types import Event, EmbeddedPlanningDict, StringFieldTranslation from planning.common import get_config_event_fields_to_sync_with_planning from planning.content_profiles.utils import AllContentProfileData from planning.utils import get_related_planning_for_events @@ -44,14 +44,14 @@ def get_translated_fields(translations: List[StringFieldTranslation]) -> Dict[st def sync_event_metadata_with_planning_items( original: Optional[Event], updates: Event | EventResourceModel, - embedded_planning: list[EmbeddedPlanning] | list[EmbeddedPlanningModel], + embedded_planning: list[EmbeddedPlanningDict] | list[EmbeddedPlanningModel], ): # TODO-ASYNC: remove these checks after this is migrated if isinstance(updates, EventResourceModel): updates = cast(Event, updates.to_dict()) embedded_planning = [ - cast(EmbeddedPlanning, obj.to_dict()) if isinstance(obj, EmbeddedPlanningModel) else obj + cast(EmbeddedPlanningDict, obj.to_dict()) if isinstance(obj, EmbeddedPlanningModel) else obj for obj in embedded_planning ] diff --git a/server/planning/events/events_sync/embedded_planning.py b/server/planning/events/events_sync/embedded_planning.py index 4a4447e80..7583e9d6b 100644 --- a/server/planning/events/events_sync/embedded_planning.py +++ b/server/planning/events/events_sync/embedded_planning.py @@ -16,7 +16,7 @@ from planning.types import ( Event, - EmbeddedPlanning, + EmbeddedPlanningDict, EmbeddedCoverageItem, Planning, Coverage, @@ -35,7 +35,7 @@ def create_new_plannings_from_embedded_planning( event: Event, event_translations: Dict[str, Dict[str, str]], - embedded_planning: List[EmbeddedPlanning], + embedded_planning: List[EmbeddedPlanningDict], profiles: AllContentProfileData, vocabs: VocabsSyncData, ): @@ -240,7 +240,7 @@ def create_new_coverage_from_event_and_planning( def get_existing_plannings_from_embedded_planning( event: Event, event_translations: Dict[str, Dict[str, str]], - embedded_planning: List[EmbeddedPlanning], + embedded_planning: List[EmbeddedPlanningDict], profiles: AllContentProfileData, vocabs: VocabsSyncData, ) -> Iterator[Tuple[Planning, Planning, bool]]: diff --git a/server/planning/events/events_utils.py b/server/planning/events/events_utils.py index d2d326e36..1ff938289 100644 --- a/server/planning/events/events_utils.py +++ b/server/planning/events/events_utils.py @@ -1,9 +1,8 @@ import re -from planning.types.enums import UpdateMethods import pytz -from datetime import datetime, tzinfo -from typing import AsyncGenerator, Any, Tuple, Literal +from datetime import date, datetime +from typing import AsyncGenerator, Any, Generator, Tuple, Literal, cast, overload from dateutil.rrule import rrule, DAILY, WEEKLY, MONTHLY, YEARLY, MO, TU, WE, TH, FR, SA, SU @@ -13,8 +12,8 @@ from superdesk.metadata.utils import generate_guid from superdesk.core.types import SortParam, SortListParam +from planning.types import EventResourceModel, UpdateMethods from planning.types.event import EmbeddedPlanning, EmbeddedPlanningCoverage -from planning.types import EmbeddedCoverageItem, EventResourceModel from planning.common import TEMP_ID_PREFIX, WORKFLOW_STATE, get_max_recurrent_events @@ -42,12 +41,12 @@ def generate_recurring_dates( start: datetime, frequency: FrequencyType, interval: int = 1, - until: datetime = None, - byday: str = None, + until: datetime | None = None, + byday: str | None = None, count: int = 5, - tz: tzinfo = None, + tz: pytz.BaseTzInfo | None = None, date_only: bool = False, -) -> list[datetime]: +) -> 
Generator[datetime | date]: """ Returns list of dates related to recurring rules @@ -58,7 +57,7 @@ def generate_recurring_dates( :param until datetime: date after which the recurrence rule expires :param byday str or list: "MO TU" :param count int: number of occurrences of the rule - :return list: list of datetime + :return Generator: list of datetime """ # if tz is given, respect the timezone by starting from the local time @@ -90,7 +89,7 @@ def generate_recurring_dates( day_of_month = int(byday[:1]) day_of_week = byday[1:] - byweekday = DAYS.get(day_of_week)(day_of_month) + byweekday = DAYS.get(day_of_week)(day_of_month) # type: ignore[misc] else: # byday uses DAYS constants byweekday = byday and [DAYS.get(d) for d in byday.split()] or None @@ -121,7 +120,11 @@ def generate_recurring_dates( return (date for date in dates) -def get_events_embedded_planning(event: EventResourceModel) -> list[EmbeddedPlanning]: +def get_events_embedded_planning(event: dict[str, Any] | EventResourceModel) -> list[EmbeddedPlanning]: + if isinstance(event, dict): + event = EventResourceModel.from_dict(event) + event = cast(EventResourceModel, event) + def _get_coverage_id(coverage: EmbeddedPlanningCoverage) -> str: if not coverage.coverage_id: coverage.coverage_id = TEMP_ID_PREFIX + "-" + generate_guid(type=GUID_NEWSML) diff --git a/server/planning/types/__init__.py b/server/planning/types/__init__.py index ab2d5f748..11e73fb32 100644 --- a/server/planning/types/__init__.py +++ b/server/planning/types/__init__.py @@ -19,6 +19,7 @@ from .planning import PlanningResourceModel from .assignment import AssignmentResourceModel from .published import PublishedPlanningModel +from .enums import PostStates, UpdateMethods, WorkflowState __all__ = [ @@ -28,6 +29,9 @@ "AssignmentResourceModel", "PublishedPlanningModel", "PlanningSchedule", + "PostStates", + "UpdateMethods", + "WorkflowState", ] @@ -58,7 +62,7 @@ class EmbeddedCoverageItem(TypedDict, total=False): priority: int -class EmbeddedPlanning(TypedDict, total=False): +class EmbeddedPlanningDict(TypedDict, total=False): planning_id: str update_method: UPDATE_METHOD coverages: Dict[str, EmbeddedCoverageItem] diff --git a/server/planning/types/event.py b/server/planning/types/event.py index a597692ef..ded74ed7c 100644 --- a/server/planning/types/event.py +++ b/server/planning/types/event.py @@ -104,7 +104,7 @@ class EmbeddedPlanningCoverage: class EmbeddedPlanning(Dataclass): planning_id: Annotated[str, validate_data_relation_async("planning")] update_method: Annotated[UpdateMethods, fields.keyword_mapping()] | None = None - coverages: list[EmbeddedPlanningCoverage] | None = Field(default_factory=list) + coverages: list[EmbeddedPlanningCoverage] = Field(default_factory=list) @dataclass @@ -214,7 +214,7 @@ class EventResourceModel(BasePlanningModel, LockFieldsMixin): item_type: Annotated[fields.Keyword, Field(alias="type")] = "event" # Named Calendars - calendars: list[KeywordQCodeName] | None = None + calendars: list[KeywordQCodeName] = Field(default_factory=list) # The previous state the item was in before for example being spiked, # when un-spiked it will revert to this state diff --git a/server/planning/types/event_dates.py b/server/planning/types/event_dates.py index faeb2c6a7..a68267363 100644 --- a/server/planning/types/event_dates.py +++ b/server/planning/types/event_dates.py @@ -44,7 +44,7 @@ class OccurStatus: class EventDates(Dataclass): - # TODO-ASYNC: double check which ones are required + # TODO-ASYNC: double check which ones are mandatory start: datetime 
| None = None end: datetime | None = None tz: str | None = None