Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDESK-7163] improve: Use expiry datetime for purge_expired_locks command #1929

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions server/planning/assignments/assignments.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def assignee_details_changed(self, updates: Dict[str, Any], original: Dict[str,
return False

def system_update(self, id, updates, original, **kwargs):
super().system_update(id, updates, original, **kwargs)
rtn = super().system_update(id, updates, original, **kwargs)
if self.is_assignment_being_activated(updates, original):
doc = deepcopy(original)
doc.update(updates)
Expand All @@ -310,6 +310,7 @@ def system_update(self, id, updates, original, **kwargs):
and updates.get("assigned_to").get("state") != ASSIGNMENT_WORKFLOW_STATE.CANCELLED
):
app.on_updated_assignments(updates, original)
return rtn

def is_assignment_modified(self, updates, original):
"""Checks whether the assignment is modified or not"""
Expand Down Expand Up @@ -1247,10 +1248,9 @@ def is_assignment_draft(self, updates, original):
return updates.get("assigned_to", original.get("assigned_to")).get("state") == ASSIGNMENT_WORKFLOW_STATE.DRAFT

def is_assignment_being_activated(self, updates, original):
return (
original.get("assigned_to").get("state") == ASSIGNMENT_WORKFLOW_STATE.DRAFT
and updates.get("assigned_to", {}).get("state") == ASSIGNMENT_WORKFLOW_STATE.ASSIGNED
)
return (original.get("assigned_to") or {}).get("state") == ASSIGNMENT_WORKFLOW_STATE.DRAFT and (
updates.get("assigned_to") or {}
).get("state") == ASSIGNMENT_WORKFLOW_STATE.ASSIGNED

def is_text_assignment(self, assignment):
# scheduled_update is always for text coverages
Expand Down
63 changes: 33 additions & 30 deletions server/planning/commands/purge_expired_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# at https://www.sourcefabric.org/superdesk/license

import logging
from datetime import timedelta

from flask import current_app as app
from eve.utils import date_to_str
Expand All @@ -18,6 +19,7 @@
from superdesk.lock import lock, unlock
from superdesk.celery_task_utils import get_lock_id
from planning.item_lock import LOCK_ACTION, LOCK_SESSION, LOCK_TIME, LOCK_USER
from planning.utils import try_cast_object_id

logger = logging.getLogger(__name__)

Expand All @@ -26,36 +28,51 @@ class PurgeExpiredLocks(Command):
"""
Purge item locks that are linked to a non-existing session

resource: The name of the resource to purge item locks for
--resource, -r: The name of the resource to purge item locks for
--expire-hours, -e: Purges locks that are older than this many hours

Example:
::

$ python manage.py planning:purge_expired_locks -r events
$ python manage.py planning:purge_expired_locks -r planning
$ python manage.py planning:purge_expired_locks -r assignments
$ python manage.py planning:purge_expired_locks -r all
$ python manage.py planning:purge_expired_locks -r all -e 48
"""

option_list = [Option("--resource", "-r", required=True)]
option_list = [
Option("--resource", "-r", required=True),
Option("--expire-hours", "-e", dest="expire_hours", required=False, type=int, default=24),
]

def run(self, resource: str):
def run(self, resource: str, expire_hours: int = 24) -> None:
logger.info("Starting to purge expired item locks")

if resource == "all":
resources = ["events", "planning", "assignments"]
elif resource not in ["events", "planning", "assignments"]:
raise ValueError(f"Invalid resource: {resource}")
else:
resources = [resource]

lock_name = get_lock_id("purge_expired_locks", resource)
if not lock(lock_name, expire=600):
logger.info("purge expired locks task is already running")
return

try:
self._purge_item_locks(resource)
except Exception as err:
logger.exception(f"Failed to purge item locks ({err})")
finally:
unlock(lock_name)
expiry_datetime = date_to_str(utcnow() - timedelta(hours=expire_hours))
for resource_name in resources:
try:
self._purge_item_locks(resource_name, expiry_datetime)
except Exception as err:
logger.exception(f"Failed to purge item locks ({err})")

unlock(lock_name)
logger.info("Completed purging expired item locks")

def _purge_item_locks(self, resource: str):
def _purge_item_locks(self, resource: str, expiry_datetime: str):
logger.info(f"Purging expired locks for {resource}")
resource_service = get_resource_service(resource)
try:
autosave_service = get_resource_service(
Expand All @@ -64,11 +81,11 @@ def _purge_item_locks(self, resource: str):
except KeyError:
autosave_service = None

for items in self.get_locked_items(resource):
for items in self.get_locked_items(resource, expiry_datetime):
failed_ids = []
for item in items:
try:
item_id = item["_id"]
item_id = try_cast_object_id(item["_id"])
except KeyError:
logger.exception("Item ID not found, unable to purge its lock")
continue
Expand Down Expand Up @@ -103,29 +120,15 @@ def _purge_item_locks(self, resource: str):
num_items = len(items)
num_success = num_items - len(failed_ids)
if num_success != num_items:
logger.warning(f"{num_success}/{num_items} item locks purged. Failed IDs: {failed_ids}")
logger.warning(f"{num_success}/{num_items} {resource} locks purged. Failed IDs: {failed_ids}")
else:
logger.info(f"{num_items} item locks purged")
logger.info(f"{num_items} {resource} locks purged")

def get_locked_items(self, resource: str):
now = utcnow()
active_sessions = [str(session["_id"]) for session in get_resource_service("auth").get(req=None, lookup={})]
def get_locked_items(self, resource: str, expiry_datetime: str):
service = get_resource_service(resource)
total_received = 0
query = {
"query": {
"bool": {
"filter": [
{"exists": {"field": LOCK_SESSION}},
# Use a range filter for lock time, so if this task takes a while
# it will exclude any newer item locks and/or sessions
{"range": {LOCK_TIME: {"lt": date_to_str(now)}}},
],
"must_not": [
{"terms": {LOCK_SESSION: active_sessions}},
],
},
},
"query": {"bool": {"filter": [{"range": {LOCK_TIME: {"lt": expiry_datetime}}}]}},
"size": app.config["MAX_EXPIRY_QUERY_LIMIT"],
"sort": [{LOCK_TIME: "asc"}],
}
Expand Down
182 changes: 182 additions & 0 deletions server/planning/commands/purge_expired_locks_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# -*- coding: utf-8; -*-
#
# This file is part of Superdesk.
#
# Copyright 2024 Sourcefabric z.u. and contributors.
#
# For the full copyright and license information, please see the
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

from typing import List, Tuple, Union
from datetime import timedelta
from bson import ObjectId

from superdesk.utc import utcnow
from planning.tests import TestCase

from .purge_expired_locks import PurgeExpiredLocks

now = utcnow()
assignment_1_id = ObjectId()
assignment_2_id = ObjectId()


# TODO: Add Assignments
class PurgeExpiredLocksTest(TestCase):
def setUp(self) -> None:
super().setUp()
self.app.data.insert(
"events",
[
{
"_id": "active_event_1",
"dates": {"start": now, "end": now + timedelta(days=1)},
"lock_user": "user1",
"lock_session": "session1",
"lock_time": now - timedelta(hours=23),
"lock_action": "edit",
},
{
"_id": "expired_event_1",
"dates": {"start": now, "end": now + timedelta(days=1)},
"lock_user": "user2",
"lock_session": "session2",
"lock_time": now - timedelta(hours=25),
"lock_action": "edit",
},
],
)
self.app.data.insert(
"planning",
[
{
"_id": "active_plan_1",
"planning_date": now,
"lock_user": "user3",
"lock_session": "session3",
"lock_time": now - timedelta(hours=23),
"lock_action": "edit",
},
{
"_id": "expired_plan_1",
"planning_date": now,
"lock_user": "user4",
"lock_session": "session4",
"lock_time": now - timedelta(hours=25),
"lock_action": "edit",
},
],
)
self.app.data.insert(
"assignments",
[
{
"_id": assignment_1_id,
"lock_user": "user5",
"lock_session": "session5",
"lock_time": now - timedelta(hours=23),
"lock_action": "edit",
},
{
"_id": assignment_2_id,
"lock_user": "user6",
"lock_session": "session6",
"lock_time": now - timedelta(hours=25),
"lock_action": "edit",
},
],
)
self.assertLockState(
[
("events", "active_event_1", True),
("events", "expired_event_1", True),
("planning", "active_plan_1", True),
("planning", "expired_plan_1", True),
("assignments", assignment_1_id, True),
("assignments", assignment_2_id, True),
]
)

def test_invalid_resource(self):
with self.assertRaises(ValueError):
PurgeExpiredLocks().run("blah")

def assertLockState(self, item_tests: List[Tuple[str, Union[str, ObjectId], bool]]):
for resource, item_id, is_locked in item_tests:
item = self.app.data.find_one(resource, req=None, _id=item_id)
if is_locked:
self.assertIsNotNone(item["lock_user"], f"{resource} item {item_id} is NOT locked, item={item}")
self.assertIsNotNone(item["lock_session"], f"{resource} item {item_id} is NOT locked, item={item}")
self.assertIsNotNone(item["lock_time"], f"{resource} item {item_id} is NOT locked, item={item}")
self.assertIsNotNone(item["lock_action"], f"{resource} item {item_id} is NOT locked, item={item}")
else:
self.assertIsNone(item.get("lock_user"), f"{resource} item {item_id} is locked, item={item}")
self.assertIsNone(item.get("lock_session"), f"{resource} item {item_id} is locked, item={item}")
self.assertIsNone(item.get("lock_time"), f"{resource} item {item_id} is locked, item={item}")
self.assertIsNone(item.get("lock_action"), f"{resource} item {item_id} is locked, item={item}")

def test_purge_event_locks(self):
PurgeExpiredLocks().run("events")
self.assertLockState(
[
("events", "active_event_1", True),
("events", "expired_event_1", False),
("planning", "active_plan_1", True),
("planning", "expired_plan_1", True),
("assignments", assignment_1_id, True),
("assignments", assignment_2_id, True),
]
)

def test_purge_planning_locks(self):
PurgeExpiredLocks().run("planning")
self.assertLockState(
[
("events", "active_event_1", True),
("events", "expired_event_1", True),
("planning", "active_plan_1", True),
("planning", "expired_plan_1", False),
("assignments", assignment_1_id, True),
("assignments", assignment_2_id, True),
]
)

def test_purge_assignment_locks(self):
PurgeExpiredLocks().run("assignments")
self.assertLockState(
[
("events", "active_event_1", True),
("events", "expired_event_1", True),
("planning", "active_plan_1", True),
("planning", "expired_plan_1", True),
("assignments", assignment_1_id, True),
("assignments", assignment_2_id, False),
]
)

def test_purge_all_locks(self):
PurgeExpiredLocks().run("all")
self.assertLockState(
[
("events", "active_event_1", True),
("events", "expired_event_1", False),
("planning", "active_plan_1", True),
("planning", "expired_plan_1", False),
("assignments", assignment_1_id, True),
("assignments", assignment_2_id, False),
]
)

def test_purge_all_locks_with_custom_expiry(self):
PurgeExpiredLocks().run("all", 2)
self.assertLockState(
[
("events", "active_event_1", False),
("events", "expired_event_1", False),
("planning", "active_plan_1", False),
("planning", "expired_plan_1", False),
("assignments", assignment_1_id, False),
("assignments", assignment_2_id, False),
]
)
3 changes: 2 additions & 1 deletion server/planning/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Union
from bson.objectid import ObjectId
from bson.errors import InvalidId


def try_cast_object_id(value):
def try_cast_object_id(value: str) -> Union[ObjectId, str]:
try:
return ObjectId(value)
except InvalidId:
Expand Down
Loading