Skip to content

Commit

Permalink
[SDESK-7163] improve: Use expiry datetime for purge_expired_locks com…
Browse files Browse the repository at this point in the history
…mand
  • Loading branch information
MarkLark86 committed Mar 7, 2024
1 parent 875f1cb commit a473ee0
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 36 deletions.
10 changes: 5 additions & 5 deletions server/planning/assignments/assignments.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def assignee_details_changed(self, updates: Dict[str, Any], original: Dict[str,
return False

def system_update(self, id, updates, original, **kwargs):
super().system_update(id, updates, original, **kwargs)
rtn = super().system_update(id, updates, original, **kwargs)
if self.is_assignment_being_activated(updates, original):
doc = deepcopy(original)
doc.update(updates)
Expand All @@ -310,6 +310,7 @@ def system_update(self, id, updates, original, **kwargs):
and updates.get("assigned_to").get("state") != ASSIGNMENT_WORKFLOW_STATE.CANCELLED
):
app.on_updated_assignments(updates, original)
return rtn

def is_assignment_modified(self, updates, original):
"""Checks whether the assignment is modified or not"""
Expand Down Expand Up @@ -1247,10 +1248,9 @@ def is_assignment_draft(self, updates, original):
return updates.get("assigned_to", original.get("assigned_to")).get("state") == ASSIGNMENT_WORKFLOW_STATE.DRAFT

def is_assignment_being_activated(self, updates, original):
return (
original.get("assigned_to").get("state") == ASSIGNMENT_WORKFLOW_STATE.DRAFT
and updates.get("assigned_to", {}).get("state") == ASSIGNMENT_WORKFLOW_STATE.ASSIGNED
)
return (original.get("assigned_to") or {}).get("state") == ASSIGNMENT_WORKFLOW_STATE.DRAFT and (
updates.get("assigned_to") or {}
).get("state") == ASSIGNMENT_WORKFLOW_STATE.ASSIGNED

def is_text_assignment(self, assignment):
# scheduled_update is always for text coverages
Expand Down
63 changes: 33 additions & 30 deletions server/planning/commands/purge_expired_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# at https://www.sourcefabric.org/superdesk/license

import logging
from datetime import timedelta

from flask import current_app as app
from eve.utils import date_to_str
Expand All @@ -18,6 +19,7 @@
from superdesk.lock import lock, unlock
from superdesk.celery_task_utils import get_lock_id
from planning.item_lock import LOCK_ACTION, LOCK_SESSION, LOCK_TIME, LOCK_USER
from planning.utils import try_cast_object_id

logger = logging.getLogger(__name__)

Expand All @@ -26,36 +28,51 @@ class PurgeExpiredLocks(Command):
"""
Purge item locks that are linked to a non-existing session
resource: The name of the resource to purge item locks for
--resource, -r: The name of the resource to purge item locks for
--expire-hours, -e: Purges locks that are older than this many hours
Example:
::
$ python manage.py planning:purge_expired_locks -r events
$ python manage.py planning:purge_expired_locks -r planning
$ python manage.py planning:purge_expired_locks -r assignments
$ python manage.py planning:purge_expired_locks -r all
$ python manage.py planning:purge_expired_locks -r all -e 48
"""

option_list = [Option("--resource", "-r", required=True)]
option_list = [
Option("--resource", "-r", required=True),
Option("--expire-hours", "-e", dest="expire_hours", required=False, type=int, default=24),
]

def run(self, resource: str):
def run(self, resource: str, expire_hours: int = 24) -> None:
logger.info("Starting to purge expired item locks")

if resource == "all":
resources = ["events", "planning", "assignments"]
elif resource not in ["events", "planning", "assignments"]:
raise ValueError(f"Invalid resource: {resource}")
else:
resources = [resource]

lock_name = get_lock_id("purge_expired_locks", resource)
if not lock(lock_name, expire=600):
logger.info("purge expired locks task is already running")
return

try:
self._purge_item_locks(resource)
except Exception as err:
logger.exception(f"Failed to purge item locks ({err})")
finally:
unlock(lock_name)
expiry_datetime = date_to_str(utcnow() - timedelta(hours=expire_hours))
for resource_name in resources:
try:
self._purge_item_locks(resource_name, expiry_datetime)
except Exception as err:
logger.exception(f"Failed to purge item locks ({err})")

unlock(lock_name)
logger.info("Completed purging expired item locks")

def _purge_item_locks(self, resource: str):
def _purge_item_locks(self, resource: str, expiry_datetime: str):
logger.info(f"Purging expired locks for {resource}")
resource_service = get_resource_service(resource)
try:
autosave_service = get_resource_service(
Expand All @@ -64,11 +81,11 @@ def _purge_item_locks(self, resource: str):
except KeyError:
autosave_service = None

for items in self.get_locked_items(resource):
for items in self.get_locked_items(resource, expiry_datetime):
failed_ids = []
for item in items:
try:
item_id = item["_id"]
item_id = try_cast_object_id(item["_id"])
except KeyError:
logger.exception("Item ID not found, unable to purge its lock")
continue
Expand Down Expand Up @@ -103,29 +120,15 @@ def _purge_item_locks(self, resource: str):
num_items = len(items)
num_success = num_items - len(failed_ids)
if num_success != num_items:
logger.warning(f"{num_success}/{num_items} item locks purged. Failed IDs: {failed_ids}")
logger.warning(f"{num_success}/{num_items} {resource} locks purged. Failed IDs: {failed_ids}")
else:
logger.info(f"{num_items} item locks purged")
logger.info(f"{num_items} {resource} locks purged")

def get_locked_items(self, resource: str):
now = utcnow()
active_sessions = [str(session["_id"]) for session in get_resource_service("auth").get(req=None, lookup={})]
def get_locked_items(self, resource: str, expiry_datetime: str):
service = get_resource_service(resource)
total_received = 0
query = {
"query": {
"bool": {
"filter": [
{"exists": {"field": LOCK_SESSION}},
# Use a range filter for lock time, so if this task takes a while
# it will exclude any newer item locks and/or sessions
{"range": {LOCK_TIME: {"lt": date_to_str(now)}}},
],
"must_not": [
{"terms": {LOCK_SESSION: active_sessions}},
],
},
},
"query": {"bool": {"filter": [{"range": {LOCK_TIME: {"lt": expiry_datetime}}}]}},
"size": app.config["MAX_EXPIRY_QUERY_LIMIT"],
"sort": [{LOCK_TIME: "asc"}],
}
Expand Down
182 changes: 182 additions & 0 deletions server/planning/commands/purge_expired_locks_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# -*- coding: utf-8; -*-
#
# This file is part of Superdesk.
#
# Copyright 2024 Sourcefabric z.u. and contributors.
#
# For the full copyright and license information, please see the
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

from typing import List, Tuple, Union
from datetime import timedelta
from bson import ObjectId

from superdesk.utc import utcnow
from planning.tests import TestCase

from .purge_expired_locks import PurgeExpiredLocks

now = utcnow()
assignment_1_id = ObjectId()
assignment_2_id = ObjectId()


# TODO: Add Assignments
class PurgeExpiredLocksTest(TestCase):
def setUp(self) -> None:
super().setUp()
self.app.data.insert(
"events",
[
{
"_id": "active_event_1",
"dates": {"start": now, "end": now + timedelta(days=1)},
"lock_user": "user1",
"lock_session": "session1",
"lock_time": now - timedelta(hours=23),
"lock_action": "edit",
},
{
"_id": "expired_event_1",
"dates": {"start": now, "end": now + timedelta(days=1)},
"lock_user": "user2",
"lock_session": "session2",
"lock_time": now - timedelta(hours=25),
"lock_action": "edit",
},
],
)
self.app.data.insert(
"planning",
[
{
"_id": "active_plan_1",
"planning_date": now,
"lock_user": "user3",
"lock_session": "session3",
"lock_time": now - timedelta(hours=23),
"lock_action": "edit",
},
{
"_id": "expired_plan_1",
"planning_date": now,
"lock_user": "user4",
"lock_session": "session4",
"lock_time": now - timedelta(hours=25),
"lock_action": "edit",
},
],
)
self.app.data.insert(
"assignments",
[
{
"_id": assignment_1_id,
"lock_user": "user5",
"lock_session": "session5",
"lock_time": now - timedelta(hours=23),
"lock_action": "edit",
},
{
"_id": assignment_2_id,
"lock_user": "user6",
"lock_session": "session6",
"lock_time": now - timedelta(hours=25),
"lock_action": "edit",
},
],
)
self.assertLockState(
[
("events", "active_event_1", True),
("events", "expired_event_1", True),
("planning", "active_plan_1", True),
("planning", "expired_plan_1", True),
("assignments", assignment_1_id, True),
("assignments", assignment_2_id, True),
]
)

def test_invalid_resource(self):
with self.assertRaises(ValueError):
PurgeExpiredLocks().run("blah")

def assertLockState(self, item_tests: List[Tuple[str, Union[str, ObjectId], bool]]):
for resource, item_id, is_locked in item_tests:
item = self.app.data.find_one(resource, req=None, _id=item_id)
if is_locked:
self.assertIsNotNone(item["lock_user"], f"{resource} item {item_id} is NOT locked, item={item}")
self.assertIsNotNone(item["lock_session"], f"{resource} item {item_id} is NOT locked, item={item}")
self.assertIsNotNone(item["lock_time"], f"{resource} item {item_id} is NOT locked, item={item}")
self.assertIsNotNone(item["lock_action"], f"{resource} item {item_id} is NOT locked, item={item}")
else:
self.assertIsNone(item.get("lock_user"), f"{resource} item {item_id} is locked, item={item}")
self.assertIsNone(item.get("lock_session"), f"{resource} item {item_id} is locked, item={item}")
self.assertIsNone(item.get("lock_time"), f"{resource} item {item_id} is locked, item={item}")
self.assertIsNone(item.get("lock_action"), f"{resource} item {item_id} is locked, item={item}")

def test_purge_event_locks(self):
PurgeExpiredLocks().run("events")
self.assertLockState(
[
("events", "active_event_1", True),
("events", "expired_event_1", False),
("planning", "active_plan_1", True),
("planning", "expired_plan_1", True),
("assignments", assignment_1_id, True),
("assignments", assignment_2_id, True),
]
)

def test_purge_planning_locks(self):
PurgeExpiredLocks().run("planning")
self.assertLockState(
[
("events", "active_event_1", True),
("events", "expired_event_1", True),
("planning", "active_plan_1", True),
("planning", "expired_plan_1", False),
("assignments", assignment_1_id, True),
("assignments", assignment_2_id, True),
]
)

def test_purge_assignment_locks(self):
PurgeExpiredLocks().run("assignments")
self.assertLockState(
[
("events", "active_event_1", True),
("events", "expired_event_1", True),
("planning", "active_plan_1", True),
("planning", "expired_plan_1", True),
("assignments", assignment_1_id, True),
("assignments", assignment_2_id, False),
]
)

def test_purge_all_locks(self):
PurgeExpiredLocks().run("all")
self.assertLockState(
[
("events", "active_event_1", True),
("events", "expired_event_1", False),
("planning", "active_plan_1", True),
("planning", "expired_plan_1", False),
("assignments", assignment_1_id, True),
("assignments", assignment_2_id, False),
]
)

def test_purge_all_locks_with_custom_expiry(self):
PurgeExpiredLocks().run("all", 2)
self.assertLockState(
[
("events", "active_event_1", False),
("events", "expired_event_1", False),
("planning", "active_plan_1", False),
("planning", "expired_plan_1", False),
("assignments", assignment_1_id, False),
("assignments", assignment_2_id, False),
]
)
3 changes: 2 additions & 1 deletion server/planning/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Union
from bson.objectid import ObjectId
from bson.errors import InvalidId


def try_cast_object_id(value):
def try_cast_object_id(value: str) -> Union[ObjectId, str]:
try:
return ObjectId(value)
except InvalidId:
Expand Down

0 comments on commit a473ee0

Please sign in to comment.