diff --git a/src/palace/manager/api/admin/controller/work_editor.py b/src/palace/manager/api/admin/controller/work_editor.py index 1ef68f412..bff10ce8c 100644 --- a/src/palace/manager/api/admin/controller/work_editor.py +++ b/src/palace/manager/api/admin/controller/work_editor.py @@ -1,10 +1,14 @@ import json +from collections.abc import Mapping +from typing import Any import flask from flask import Response from flask_babel import lazy_gettext as _ +from pydantic import TypeAdapter, ValidationError from palace.manager.api.admin.controller.base import AdminPermissionsControllerMixin +from palace.manager.api.admin.model.work_editor import CustomListResponse from palace.manager.api.admin.problem_details import ( EROTICA_FOR_ADULTS_ONLY, GENRE_NOT_FOUND, @@ -30,6 +34,7 @@ from palace.manager.api.util.flask import get_request_library from palace.manager.core.classifier import NO_NUMBER, NO_VALUE, genres from palace.manager.core.classifier.simplified import SimplifiedGenreClassifier +from palace.manager.core.problem_details import INVALID_INPUT from palace.manager.feed.acquisition import OPDSAcquisitionFeed from palace.manager.feed.annotator.admin import AdminAnnotator from palace.manager.sqlalchemy.model.classification import ( @@ -46,6 +51,7 @@ from palace.manager.sqlalchemy.model.licensing import RightsStatus from palace.manager.sqlalchemy.model.measurement import Measurement from palace.manager.sqlalchemy.model.resource import Hyperlink +from palace.manager.sqlalchemy.model.work import Work from palace.manager.sqlalchemy.presentation import PresentationCalculationPolicy from palace.manager.sqlalchemy.util import create, get_one, get_one_or_create from palace.manager.util.datetime_helpers import strptime_utc, utc_now @@ -675,78 +681,100 @@ def edit_classifications(self, identifier_type, identifier): return Response("", 200) - def custom_lists(self, identifier_type, identifier): + @staticmethod + def _existing_custom_lists(library: Library, work: Work) -> list[CustomList]: + return [ + entry.customlist + for entry in work.custom_list_entries + if entry.customlist and entry.customlist.library == library + ] + + def _custom_lists_get(self, library: Library, work: Work) -> dict[str, Any]: + lists = [ + CustomListResponse(id=cl.id, name=cl.name).api_dict() + for cl in self._existing_custom_lists(library, work) + ] + return dict(custom_lists=lists) + + def _custom_lists_post(self, library: Library, work: Work) -> Response: + ta = TypeAdapter(list[CustomListResponse]) + try: + lists = ta.validate_json(flask.request.form.get("lists", "[]", str)) + except ValidationError as ex: + self.log.debug("Invalid custom list data: %s", ex) + raise ProblemDetailException( + INVALID_INPUT.detailed("Invalid form data", debug_message=str(ex)) + ) + + staff_data_source = DataSource.lookup(self._db, DataSource.LIBRARY_STAFF) + affected_lanes = set() + + # Remove entries for lists that were not in the submitted form. + submitted_ids = {l.id for l in lists} + for custom_list in self._existing_custom_lists(library, work): + if custom_list.id not in submitted_ids: + custom_list.remove_entry(work) + for lane in Lane.affected_by_customlist(custom_list): + affected_lanes.add(lane) + + # Add entries for any new lists. + for list_response in lists: + if list_response.id is not None: + custom_list_or_none = get_one( + self._db, + CustomList, + id=list_response.id, + name=list_response.name, + library=library, + data_source=staff_data_source, + ) + if not custom_list_or_none: + self._db.rollback() + raise ProblemDetailException( + MISSING_CUSTOM_LIST.detailed( + _( + 'Could not find list "%(list_name)s"', + list_name=list_response.name, + ) + ) + ) + custom_list = custom_list_or_none + else: + custom_list, __ = create( + self._db, + CustomList, + name=list_response.name, + data_source=staff_data_source, + library=library, + ) + custom_list.created = utc_now() + entry, was_new = custom_list.add_entry(work, featured=True) + if was_new: + for lane in Lane.affected_by_customlist(custom_list): + affected_lanes.add(lane) + + # If any list changes affected lanes, update their sizes. + # NOTE: This may not make a difference until the + # works are actually re-indexed. + for lane in affected_lanes: + lane.update_size(self._db, search_engine=self.search_engine) + + return Response(str(_("Success")), 200) + + def custom_lists( + self, identifier_type: str, identifier: str + ) -> Mapping[str, Any] | Response: library = get_request_library() self.require_librarian(library) work = self.load_work(library, identifier_type, identifier) if isinstance(work, ProblemDetail): - return work - - staff_data_source = DataSource.lookup(self._db, DataSource.LIBRARY_STAFF) + raise ProblemDetailException(work) if flask.request.method == "GET": - lists = [] - for entry in work.custom_list_entries: - list = entry.customlist - lists.append(dict(id=list.id, name=list.name)) - return dict(custom_lists=lists) - - if flask.request.method == "POST": - lists = flask.request.form.get("lists") - if lists: - lists = json.loads(lists) - else: - lists = [] - - affected_lanes = set() - - # Remove entries for lists that were not in the submitted form. - submitted_ids = [l.get("id") for l in lists if l.get("id")] - for entry in work.custom_list_entries: - if entry.list_id not in submitted_ids: - list = entry.customlist - list.remove_entry(work) - for lane in Lane.affected_by_customlist(list): - affected_lanes.add(lane) - - # Add entries for any new lists. - for list_info in lists: - id = list_info.get("id") - name = list_info.get("name") - - if id: - is_new = False - list = get_one( - self._db, - CustomList, - id=int(id), - name=name, - library=library, - data_source=staff_data_source, - ) - if not list: - self._db.rollback() - return MISSING_CUSTOM_LIST.detailed( - _('Could not find list "%(list_name)s"', list_name=name) - ) - else: - list, is_new = create( - self._db, - CustomList, - name=name, - data_source=staff_data_source, - library=library, - ) - list.created = utc_now() - entry, was_new = list.add_entry(work, featured=True) - if was_new: - for lane in Lane.affected_by_customlist(list): - affected_lanes.add(lane) - - # If any list changes affected lanes, update their sizes. - # NOTE: This may not make a difference until the - # works are actually re-indexed. - for lane in affected_lanes: - lane.update_size(self._db, search_engine=self.search_engine) - - return Response(str(_("Success")), 200) + return self._custom_lists_get(library, work) + + elif flask.request.method == "POST": + return self._custom_lists_post(library, work) + + else: + raise RuntimeError("Unsupported method") diff --git a/src/palace/manager/api/admin/model/work_editor.py b/src/palace/manager/api/admin/model/work_editor.py new file mode 100644 index 000000000..2f13c764f --- /dev/null +++ b/src/palace/manager/api/admin/model/work_editor.py @@ -0,0 +1,15 @@ +from pydantic import ConfigDict, NonNegativeInt + +from palace.manager.util.flask_util import CustomBaseModel + + +class CustomListResponse(CustomBaseModel): + name: str + id: NonNegativeInt | None = None + + model_config = ConfigDict( + # TODO: circulation-admin includes extra fields in its response that we don't + # need / use. It should be updated to just send the data we need, then we + # can forbid extras like we do in our other models. + extra="ignore", + ) diff --git a/src/palace/manager/sqlalchemy/model/customlist.py b/src/palace/manager/sqlalchemy/model/customlist.py index 99decb5e7..b3e388534 100644 --- a/src/palace/manager/sqlalchemy/model/customlist.py +++ b/src/palace/manager/sqlalchemy/model/customlist.py @@ -65,7 +65,7 @@ class CustomList(Base): size = Column(Integer, nullable=False, default=0) entries: Mapped[list[CustomListEntry]] = relationship( - "CustomListEntry", backref="customlist", uselist=True + "CustomListEntry", back_populates="customlist", uselist=True ) # List sharing mechanisms @@ -364,6 +364,10 @@ class CustomListEntry(Base): __tablename__ = "customlistentries" id = Column(Integer, primary_key=True) list_id = Column(Integer, ForeignKey("customlists.id"), index=True) + customlist: Mapped[CustomList] = relationship( + "CustomList", back_populates="entries" + ) + edition_id = Column(Integer, ForeignKey("editions.id"), index=True) work_id = Column(Integer, ForeignKey("works.id"), index=True) featured = Column(Boolean, nullable=False, default=False) diff --git a/tests/manager/api/admin/controller/test_work_editor.py b/tests/manager/api/admin/controller/test_work_editor.py index 9b69800a3..2915d723a 100644 --- a/tests/manager/api/admin/controller/test_work_editor.py +++ b/tests/manager/api/admin/controller/test_work_editor.py @@ -7,7 +7,6 @@ import pytest from werkzeug.datastructures import ImmutableMultiDict -from palace.manager.api.admin.controller.custom_lists import CustomListsController from palace.manager.api.admin.exceptions import AdminNotAuthorized from palace.manager.api.admin.problem_details import ( EROTICA_FOR_ADULTS_ONLY, @@ -22,7 +21,7 @@ UNKNOWN_MEDIUM, UNKNOWN_ROLE, ) -from palace.manager.api.problem_details import LIBRARY_NOT_FOUND +from palace.manager.api.problem_details import LIBRARY_NOT_FOUND, NO_LICENSES from palace.manager.core.classifier.simplified import SimplifiedGenreClassifier from palace.manager.feed.annotator.admin import AdminAnnotator from palace.manager.sqlalchemy.constants import IdentifierType @@ -43,7 +42,8 @@ from palace.manager.util.problem_detail import ProblemDetail, ProblemDetailException from tests.fixtures.api_admin import AdminControllerFixture from tests.fixtures.api_controller import ControllerFixture -from tests.mocks.flask import add_request_context +from tests.fixtures.database import DatabaseTransactionFixture +from tests.fixtures.problem_detail import raises_problem_detail from tests.mocks.mock import ( AlwaysSuccessfulCoverageProvider, NeverSuccessfulCoverageProvider, @@ -927,90 +927,183 @@ def test_classifications(self, work_fixture: WorkFixture): lp.identifier.identifier, ) - def test_custom_lists_get(self, work_fixture: WorkFixture): - staff_data_source = DataSource.lookup( - work_fixture.ctrl.db.session, DataSource.LIBRARY_STAFF + def test_custom_lists_get( + self, work_fixture: WorkFixture, db: DatabaseTransactionFixture + ) -> None: + # Test with non-existent identifier. + with ( + work_fixture.request_context_with_library_and_admin("/"), + raises_problem_detail( + pd=NO_LICENSES.detailed( + "The item you're asking about (URI/http://non-existent-id) isn't in this collection." + ) + ), + ): + work_fixture.manager.admin_work_controller.custom_lists( + IdentifierType.URI.value, "http://non-existent-id" + ) + + # Test normal case. Only custom lists for the current library are returned. + staff_data_source = DataSource.lookup(db.session, DataSource.LIBRARY_STAFF) + custom_list, ignore = create( + db.session, + CustomList, + name=db.fresh_str(), + library=db.default_library(), + data_source=staff_data_source, ) - list, ignore = create( - work_fixture.ctrl.db.session, + other_library_custom_list, ignore = create( + db.session, CustomList, - name=work_fixture.ctrl.db.fresh_str(), - library=work_fixture.ctrl.db.default_library(), + name=db.fresh_str(), + library=db.library(), data_source=staff_data_source, ) - work = work_fixture.ctrl.db.work(with_license_pool=True) - list.add_entry(work) + work = db.work(with_license_pool=True) + custom_list.add_entry(work) + other_library_custom_list.add_entry(work) identifier = work.presentation_edition.primary_identifier with work_fixture.request_context_with_library_and_admin("/"): response = work_fixture.manager.admin_work_controller.custom_lists( identifier.type, identifier.identifier ) + assert isinstance(response, dict) lists = response.get("custom_lists") + assert isinstance(lists, list) assert 1 == len(lists) - assert list.id == lists[0].get("id") - assert list.name == lists[0].get("name") - - work_fixture.admin.remove_role( - AdminRole.LIBRARIAN, work_fixture.ctrl.db.default_library() - ) - with work_fixture.request_context_with_library_and_admin("/"): - pytest.raises( - AdminNotAuthorized, - work_fixture.manager.admin_work_controller.custom_lists, + [custom_list_response] = lists + assert custom_list.id == custom_list_response.get("id") + assert custom_list.name == custom_list_response.get("name") + + # Test lack permissions. + work_fixture.admin.remove_role(AdminRole.LIBRARIAN, db.default_library()) + with ( + work_fixture.request_context_with_library_and_admin("/"), + pytest.raises(AdminNotAuthorized), + ): + work_fixture.manager.admin_work_controller.custom_lists( identifier.type, identifier.identifier, ) - def test_custom_lists_edit_with_missing_list(self, work_fixture: WorkFixture): - work = work_fixture.ctrl.db.work(with_license_pool=True) + def test_custom_lists_post( + self, work_fixture: WorkFixture, db: DatabaseTransactionFixture + ) -> None: + staff_data_source = DataSource.lookup(db.session, DataSource.LIBRARY_STAFF) + custom_list, _ = create( + db.session, + CustomList, + name=db.fresh_str(), + library=db.default_library(), + data_source=staff_data_source, + ) + work = db.work(with_license_pool=True) identifier = work.presentation_edition.primary_identifier - with work_fixture.request_context_with_library_and_admin("/", method="POST"): - form = ImmutableMultiDict( + # Create a Lane that depends on this CustomList for its membership. + lane = db.lane() + lane.customlists.append(custom_list) + lane.size = 300 + + # Try adding the work to a list that doesn't exist. + deleted_custom_list, _ = create( + db.session, + CustomList, + name=db.fresh_str(), + library=db.default_library(), + data_source=staff_data_source, + ) + deleted_list_id = deleted_custom_list.id + db.session.delete(deleted_custom_list) + with ( + work_fixture.request_context_with_library_and_admin("/", method="POST"), + raises_problem_detail( + pd=MISSING_CUSTOM_LIST.detailed( + 'Could not find list "non-existent list"' + ) + ), + ): + flask.request.form = ImmutableMultiDict( [ - ("id", "4"), - ("name", "name"), + ( + "lists", + json.dumps( + [{"name": "non-existent list", "id": deleted_list_id}] + ), + ) ] ) - add_request_context( - flask.request, CustomListsController.CustomListPostRequest, form=form + work_fixture.manager.admin_work_controller.custom_lists( + identifier.type, identifier.identifier ) - response = work_fixture.manager.admin_custom_lists_controller.custom_lists() - assert MISSING_CUSTOM_LIST == response + # Try sending bad data + with ( + work_fixture.request_context_with_library_and_admin("/", method="POST"), + raises_problem_detail(detail="Invalid form data"), + ): + flask.request.form = ImmutableMultiDict( + [("lists", json.dumps("Complete garbage 🗑️"))] + ) + work_fixture.manager.admin_work_controller.custom_lists( + identifier.type, identifier.identifier + ) - def test_custom_lists_edit_success(self, work_fixture: WorkFixture): - staff_data_source = DataSource.lookup( - work_fixture.ctrl.db.session, DataSource.LIBRARY_STAFF - ) - list, ignore = create( - work_fixture.ctrl.db.session, + # Try adding work to a list that the library doesn't have access to. + other_libraries_list, _ = create( + db.session, CustomList, - name=work_fixture.ctrl.db.fresh_str(), - library=work_fixture.ctrl.db.default_library(), + name=db.fresh_str(), + library=db.library(), data_source=staff_data_source, ) - work = work_fixture.ctrl.db.work(with_license_pool=True) - identifier = work.presentation_edition.primary_identifier - - # Create a Lane that depends on this CustomList for its membership. - lane = work_fixture.ctrl.db.lane() - lane.customlists.append(list) - lane.size = 300 + with ( + work_fixture.request_context_with_library_and_admin("/", method="POST"), + raises_problem_detail( + pd=MISSING_CUSTOM_LIST.detailed( + f'Could not find list "{other_libraries_list.name}"' + ) + ), + ): + flask.request.form = ImmutableMultiDict( + [ + ( + "lists", + json.dumps( + [ + { + "id": str(other_libraries_list.id), + "name": other_libraries_list.name, + } + ] + ), + ) + ] + ) + work_fixture.manager.admin_work_controller.custom_lists( + identifier.type, identifier.identifier + ) # Add the list to the work. with work_fixture.request_context_with_library_and_admin("/", method="POST"): flask.request.form = ImmutableMultiDict( - [("lists", json.dumps([{"id": str(list.id), "name": list.name}]))] + [ + ( + "lists", + json.dumps( + [{"id": str(custom_list.id), "name": custom_list.name}] + ), + ) + ] ) response = work_fixture.manager.admin_work_controller.custom_lists( identifier.type, identifier.identifier ) assert 200 == response.status_code assert 1 == len(work.custom_list_entries) - assert 1 == len(list.entries) - assert list == work.custom_list_entries[0].customlist + assert 1 == len(custom_list.entries) + assert custom_list == work.custom_list_entries[0].customlist assert True == work.custom_list_entries[0].featured # Now remove the work from the list. @@ -1025,7 +1118,7 @@ def test_custom_lists_edit_success(self, work_fixture: WorkFixture): ) assert 200 == response.status_code assert 0 == len(work.custom_list_entries) - assert 0 == len(list.entries) + assert 0 == len(custom_list.entries) # Add a list that didn't exist before. with work_fixture.request_context_with_library_and_admin("/", method="POST"): @@ -1038,24 +1131,23 @@ def test_custom_lists_edit_success(self, work_fixture: WorkFixture): assert 200 == response.status_code assert 1 == len(work.custom_list_entries) new_list = CustomList.find( - work_fixture.ctrl.db.session, + db.session, "new list", staff_data_source, - work_fixture.ctrl.db.default_library(), + db.default_library(), ) assert new_list == work.custom_list_entries[0].customlist assert True == work.custom_list_entries[0].featured - work_fixture.admin.remove_role( - AdminRole.LIBRARIAN, work_fixture.ctrl.db.default_library() - ) - with work_fixture.request_context_with_library_and_admin("/", method="POST"): + work_fixture.admin.remove_role(AdminRole.LIBRARIAN, db.default_library()) + with ( + work_fixture.request_context_with_library_and_admin("/", method="POST"), + pytest.raises(AdminNotAuthorized), + ): flask.request.form = ImmutableMultiDict( [("lists", json.dumps([{"name": "another new list"}]))] ) - pytest.raises( - AdminNotAuthorized, - work_fixture.manager.admin_work_controller.custom_lists, + work_fixture.manager.admin_work_controller.custom_lists( identifier.type, identifier.identifier, )