diff --git a/invenio_rdm_records/records/api.py b/invenio_rdm_records/records/api.py index ce6f4ba7d..44580383f 100644 --- a/invenio_rdm_records/records/api.py +++ b/invenio_rdm_records/records/api.py @@ -460,7 +460,6 @@ def next_latest_published_record_by_parent(cls, parent): :param parent: parent record. :param excluded_latest: latest record to exclude find next published version """ - with db.session.no_autoflush: rec_model_query = ( cls.model_cls.query.filter_by(parent_id=parent.id) @@ -488,7 +487,6 @@ def get_latest_published_by_parent(cls, parent): It might return None if there is no latest published version i.e not published yet or all versions are deleted. """ - latest_record = cls.get_latest_by_parent(parent) if latest_record.deletion_status != RecordDeletionStatusEnum.PUBLISHED.value: return None diff --git a/tests/services/test_record_deletion.py b/tests/services/test_record_deletion.py index f0c63116e..40fd2d29f 100644 --- a/tests/services/test_record_deletion.py +++ b/tests/services/test_record_deletion.py @@ -7,20 +7,31 @@ """Service-level tests for record deletion.""" +from copy import deepcopy from datetime import datetime import arrow import pytest -from invenio_access.permissions import system_identity from invenio_rdm_records.proxies import current_rdm_records +from invenio_rdm_records.records.api import RDMRecord from invenio_rdm_records.records.systemfields.deletion_status import ( RecordDeletionStatusEnum, ) from invenio_rdm_records.services.errors import DeletionStatusException -def test_record_deletion(running_app, minimal_record): +def assert_parent_resolved_to_record( + service, identity, parent_pid, record, status=RecordDeletionStatusEnum.PUBLISHED +): + """.""" + # resolve parent pid to latest + resolved_rec = service.pids.resolve(identity, parent_pid, "recid") + assert resolved_rec._record.pid.pid_value == record._record.pid.pid_value + assert resolved_rec._obj.deletion_status == status + + +def test_record_deletion(running_app, minimal_record, search_clear): """Test simple record deletion of a record.""" superuser_identity = running_app.superuser_identity service = current_rdm_records.records_service @@ -62,7 +73,7 @@ def test_record_deletion(running_app, minimal_record): assert record._obj.tombstone is None -def test_invalid_record_deletion_workflows(running_app, minimal_record): +def test_invalid_record_deletion_workflows(running_app, minimal_record, search_clear): """Test the wrong order of deletion operations.""" superuser_identity = running_app.superuser_identity service = current_rdm_records.records_service @@ -96,3 +107,160 @@ def test_invalid_record_deletion_workflows(running_app, minimal_record): # we cannot directly restore a record marked for purge with pytest.raises(DeletionStatusException): service.restore_record(superuser_identity, record.id) + + +def test_record_deletion_of_specific_version(running_app, minimal_record, search_clear): + """Test record deletion of a specific version.""" + superuser_identity = running_app.superuser_identity + service = current_rdm_records.records_service + draft = service.create(superuser_identity, minimal_record) + record_v1 = service.publish(superuser_identity, draft.id) + published_parent_pid = record_v1._record.parent.pid.pid_value + + # publish v2 + minimal_record_v2 = deepcopy(minimal_record) + minimal_record_v2["metadata"]["title"] = f"{minimal_record['metadata']['title']} v2" + draft_v2 = service.new_version(superuser_identity, record_v1.id) + service.update_draft(superuser_identity, draft_v2.id, minimal_record_v2) + record_v2 = service.publish(superuser_identity, draft_v2.id) + + assert record_v2._obj.deletion_status == RecordDeletionStatusEnum.PUBLISHED + + # resolve parent pid to record v2 + assert_parent_resolved_to_record( + service, superuser_identity, published_parent_pid, record_v2 + ) + + # search + res = service.search( + superuser_identity, + params={ + "status": RecordDeletionStatusEnum.PUBLISHED.value, + "allversions": True, + }, + ) + assert res.total == 2 + + # delete the record v1 + tombstone_info = {"note": "no specific reason, tbh"} + record = service.delete_record(superuser_identity, record_v1.id, tombstone_info) + + # resolve parent pid to record v1 + assert_parent_resolved_to_record( + service, superuser_identity, published_parent_pid, record_v2 + ) + + RDMRecord.index.refresh() + + # search + res = service.search( + superuser_identity, + params={ + "status": RecordDeletionStatusEnum.PUBLISHED.value, + "allversions": True, + }, + ) + assert res.total == 1 + + # delete the record v2 + tombstone_info = {"note": "no specific reason, tbh"} + record = service.delete_record(superuser_identity, record_v2.id, tombstone_info) + tombstone = record._obj.tombstone + + # resolve parent pid to record v2 but deleted (v2 was the last version) + assert_parent_resolved_to_record( + service, + superuser_identity, + published_parent_pid, + record_v2, # resolved rec + status=RecordDeletionStatusEnum.DELETED, + ) + assert tombstone.note == tombstone_info["note"] + + RDMRecord.index.refresh() + + # search + res = service.search( + superuser_identity, + params={ + "status": RecordDeletionStatusEnum.PUBLISHED.value, + "allversions": True, + }, + ) + assert res.total == 0 + + # restore the record v1 + record = service.restore_record(superuser_identity, record_v1.id) + assert record._obj.deletion_status == RecordDeletionStatusEnum.PUBLISHED + assert not record._obj.deletion_status.is_deleted + assert record._obj.tombstone is None + + # resolve parent pid to record v1 + assert_parent_resolved_to_record( + service, superuser_identity, published_parent_pid, record_v1 + ) + + RDMRecord.index.refresh() + + # search + res = service.search( + superuser_identity, + params={ + "status": RecordDeletionStatusEnum.PUBLISHED.value, + "allversions": True, + }, + ) + assert res.total == 1 + + # create new version from restored v1 + minimal_record_v2 = deepcopy(minimal_record) + minimal_record_v2["metadata"]["title"] = f"{minimal_record['metadata']['title']} v3" + draft_v3 = service.new_version(superuser_identity, record_v1.id) + service.update_draft(superuser_identity, draft_v3.id, minimal_record_v2) + record_v3 = service.publish(superuser_identity, draft_v3.id) + + # resolve parent pid to record v3 + assert_parent_resolved_to_record( + service, superuser_identity, published_parent_pid, record_v3 + ) + + RDMRecord.index.refresh() + + # search + res = service.search( + superuser_identity, + params={ + "status": RecordDeletionStatusEnum.PUBLISHED.value, + "allversions": True, + }, + ) + assert res.total == 2 + + # restore the record v2 + record = service.restore_record(superuser_identity, record_v2.id) + assert record._obj.deletion_status == RecordDeletionStatusEnum.PUBLISHED + assert not record._obj.deletion_status.is_deleted + assert record._obj.tombstone is None + + # resolve parent pid to record v3 + assert_parent_resolved_to_record( + service, superuser_identity, published_parent_pid, record_v3 + ) + + RDMRecord.index.refresh() + + # search + res = service.search( + superuser_identity, + params={ + "status": RecordDeletionStatusEnum.PUBLISHED.value, + "allversions": True, + }, + ) + + assert res.total == 3 + hits = [(hit["id"], hit["versions"]["index"]) for hit in res.hits] + # assert the correct order of (id, version.index) + assert hits[0] == (record_v3.id, 3) + assert hits[1] == (record_v2.id, 2) + assert hits[2] == (record_v1.id, 1)