Skip to content

Commit

Permalink
Added types to functions
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianMwangi21 committed Dec 18, 2024
1 parent f978459 commit 6576d9a
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 19 deletions.
8 changes: 4 additions & 4 deletions server/planning/events/events_history_async_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ async def on_item_created(self, items: list[EventResourceModel | Any], operation
await super().on_item_created(created_from_planning, "created_from_planning")
await super().on_item_created(regular_events)

async def on_item_deleted(self, doc):
async def on_item_deleted(self, doc: dict[str, Any]):
lookup = {"event_id": doc[ID_FIELD]}
await self.delete_many(lookup=lookup)

async def on_item_updated(self, updates: dict[str, Any], original, operation: str | None = None):
async def on_item_updated(self, updates: dict[str, Any], original: dict[str, Any], operation: str | None = None):
item = deepcopy(original)
if list(item.keys()) == ["_id"]:
diff = self._remove_unwanted_fields(updates)
Expand Down Expand Up @@ -76,8 +76,8 @@ async def _save_history(self, item, update: dict[str, Any], operation: str | Non
history["operation"] = "ingested"
await self.create([history])

async def on_update_repetitions(self, updates: dict[str, Any], event_id, operation: str | None = None):
async def on_update_repetitions(self, updates: dict[str, Any], event_id: str, operation: str | None = None):
await self.on_item_updated(updates, {"_id": event_id}, operation or "update_repetitions")

async def on_update_time(self, updates: dict[str, Any], original):
async def on_update_time(self, updates: dict[str, Any], original: dict[str, Any]):
await self.on_item_updated(updates, original, "update_time")
4 changes: 2 additions & 2 deletions server/planning/history_async_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
class HistoryAsyncService(AsyncResourceService[Generic[HistoryResourceModelType]]):
"""Provide common async methods for tracking history of Creation, Updates and Spiking to collections"""

async def on_item_created(self, items, operation: str | None = None):
async def on_item_created(self, items: list[Any], operation: str | None = None):
for item in items:
if not item.get("duplicate_from"):
await self._save_history(
Expand Down Expand Up @@ -92,7 +92,7 @@ async def get_user_id(self):
if user:
return user.get("_id")

async def _changes(self, original, updates):
async def _changes(self, original: dict[str, Any], updates: dict[str, Any]):
"""
Given the original record and the updates calculate what has changed and what is new
Expand Down
26 changes: 13 additions & 13 deletions server/planning/planning/planning_history_async_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def on_item_created(self, items: list[PlanningResourceModel], operation=No
async def _save_history(self, item, update: dict[str, Any], operation: str | None = None):
user = await self.get_user_id()
# confirmation could be from external fulfillment, so set the user to the assignor
if operation == ASSIGNMENT_HISTORY_ACTIONS.CONFIRM and user is None:
if operation == ASSIGNMENT_HISTORY_ACTIONS.CONFIRM and user is None: # type: ignore
assigned_to = update.get("assigned_to")
if assigned_to is not None:
user = update.get(
Expand All @@ -70,7 +70,7 @@ async def on_item_updated(
if list(item.keys()) == ["_id"]:
diff = self._remove_unwanted_fields(updates)
else:
diff = await self._changes(original, updates)
diff = await self._changes(original.to_dict(), updates)
diff.pop("coverages", None)
if updates:
item.update(updates)
Expand All @@ -87,7 +87,7 @@ async def on_item_updated(

await self._save_history(item, diff, operation)

await self._save_coverage_history(updates, original)
await self._save_coverage_history(updates, original.to_dict())

async def on_cancel(self, updates: dict[str, Any], original):
await self.on_item_updated(
Expand All @@ -96,9 +96,9 @@ async def on_cancel(self, updates: dict[str, Any], original):
"planning_cancel" if original.get("lock_action") in ["planning_cancel", "edit"] else "events_cancel",
)

async def _get_coverage_diff(self, updates: dict[str, Any], original):
async def _get_coverage_diff(self, updates: dict[str, Any], original: dict[str, Any]):
diff = {"coverage_id": original.get("coverage_id")}
cov_plan_diff = await self._changes(original.get("planning"), updates.get("planning"))
cov_plan_diff = await self._changes(original.get("planning", {}), updates.get("planning", {}))

if cov_plan_diff:
diff["planning"] = cov_plan_diff
Expand All @@ -108,7 +108,7 @@ async def _get_coverage_diff(self, updates: dict[str, Any], original):

return diff

async def _save_coverage_history(self, updates: dict[str, Any], original):
async def _save_coverage_history(self, updates: dict[str, Any], original: dict[str, Any]):
"""Save the coverage history for the planning item"""
item = deepcopy(original)
original_coverages = {c.get("coverage_id"): c for c in (original or {}).get("coverages") or []}
Expand All @@ -128,7 +128,7 @@ async def _save_coverage_history(self, updates: dict[str, Any], original):
deleted = [coverage for cid, coverage in original_coverages.items() if cid not in updates_coverages]

for cov in added:
if cov.get("assigned_to", {}).get("state") == ASSIGNMENT_WORKFLOW_STATE.ASSIGNED:
if cov.get("assigned_to", {}).get("state") == ASSIGNMENT_WORKFLOW_STATE.ASSIGNED: # type: ignore
diff = {"coverage_id": cov.get("coverage_id")}
diff.update(cov)
await self._save_history(
Expand All @@ -142,7 +142,7 @@ async def _save_coverage_history(self, updates: dict[str, Any], original):
await self._save_history(item, cov, "coverage_created")

for cov in updated:
original_coverage = original_coverages.get(cov.get("coverage_id"))
original_coverage = original_coverages.get(cov.get("coverage_id"), {})
diff = await self._get_coverage_diff(cov, original_coverage)
if len(diff.keys()) > 1:
await self._save_history(item, diff, "coverage_edited")
Expand All @@ -160,7 +160,7 @@ async def _save_coverage_history(self, updates: dict[str, Any], original):
if not original.get(LOCK_ACTION):
operation = "events_cancel"
elif (
original.get(LOCK_ACTION) == ITEM_ACTIONS.PLANNING_CANCEL
original.get(LOCK_ACTION) == ITEM_ACTIONS.PLANNING_CANCEL # type: ignore
or updates.get("state") == WORKFLOW_STATE.CANCELLED
):
# If cancelled through item action or through editor
Expand All @@ -181,7 +181,7 @@ async def _save_coverage_history(self, updates: dict[str, Any], original):
for cov in deleted:
await self._save_history(item, {"coverage_id": cov.get("coverage_id")}, "coverage_deleted")

async def on_spike(self, updates: dict[str, Any], original):
async def on_spike(self, updates: dict[str, Any], original: PlanningResourceModel):
"""Spike event
On spike of a planning item the history of any agendas that the item belongs to will have an entry added to
Expand All @@ -192,17 +192,17 @@ async def on_spike(self, updates: dict[str, Any], original):
"""
await super().on_spike(updates, original)

async def on_unspike(self, updates: dict[str, Any], original):
async def on_unspike(self, updates: dict[str, Any], original: PlanningResourceModel):
await super().on_unspike(updates, original)

async def on_duplicate(self, parent, duplicate):
async def on_duplicate(self, parent: dict[str, Any], duplicate: dict[str, Any]):
await self._save_history(
{ID_FIELD: str(parent[ID_FIELD])},
{"duplicate_id": str(duplicate[ID_FIELD])},
"duplicate",
)

async def on_duplicate_from(self, item: dict[str, Any], duplicate_id):
async def on_duplicate_from(self, item: dict[str, Any], duplicate_id: str):
new_plan = deepcopy(item)
new_plan["duplicate_id"] = duplicate_id
await self._save_history({ID_FIELD: str(item[ID_FIELD])}, new_plan, "duplicate_from")

0 comments on commit 6576d9a

Please sign in to comment.