Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handle OPDS server root content types. (PP-1959) #718

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions opds.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import json

import flask
Expand All @@ -7,6 +9,7 @@
from authentication_document import AuthenticationDocument
from config import Configuration
from model import ConfigurationSetting, Hyperlink, LibraryType, Validation
from util.http import NormalizedMediaType


class Annotator:
Expand Down Expand Up @@ -38,6 +41,9 @@ class OPDSCatalog:

CACHE_TIME = 3600 * 12

_NORMALIZED_OPDS2_TYPE = NormalizedMediaType(OPDS_TYPE)
_NORMALIZED_OPDS1_TYPE = NormalizedMediaType(OPDS_1_TYPE)

@classmethod
def _strftime(cls, date):
"""
Expand Down Expand Up @@ -87,11 +93,23 @@ def __init__(
url_for=url_for,
include_logo=include_logos,
web_client_uri_template=web_client_uri_template,
include_service_area=include_service_areas
include_service_area=include_service_areas,
)
)
annotator.annotate_catalog(self, live=live)

@classmethod
def is_opds1_type(cls, media_type: str | None) -> bool:
return cls._NORMALIZED_OPDS1_TYPE.min_match(media_type)

@classmethod
def is_opds2_type(cls, media_type: str | None) -> bool:
return cls._NORMALIZED_OPDS2_TYPE.min_match(media_type)

@classmethod
def is_opds_type(cls, media_type: str | None) -> bool:
return cls.is_opds1_type(media_type) or cls.is_opds2_type(media_type)

@classmethod
def _feed_is_large(cls, _db, libraries):
"""Determine whether a prospective feed is 'large' per a sitewide setting.
Expand Down Expand Up @@ -125,7 +143,6 @@ def library_catalog(
web_client_uri_template=None,
include_service_area=False,
):

"""Create an OPDS catalog for a library.

:param distance: The distance, in meters, from the client's
Expand Down
9 changes: 4 additions & 5 deletions registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,9 @@ def register(self, library: Library, library_stage):
opds_url=opds_url,
auth_url=auth_url,
)
elif content_type not in (OPDSCatalog.OPDS_TYPE, OPDSCatalog.OPDS_1_TYPE):
elif not OPDSCatalog.is_opds_type(content_type):
failure_detail = _(
"Supposed root document at %(url)s is not an OPDS document",
url=opds_url,
f"Supposed root document at {opds_url} does not appear to be an OPDS document (content_type={content_type!r}).",
)
elif not self.opds_response_links_to_auth_document(opds_response, auth_url):
failure_detail = _(
Expand Down Expand Up @@ -316,14 +315,14 @@ def opds_response_links(cls, response, rel):
if link:
links.append(link.get("url"))
media_type = response.headers.get("Content-Type")
if media_type == OPDSCatalog.OPDS_TYPE:
if OPDSCatalog.is_opds2_type(media_type):
# Parse as OPDS 2.
catalog = json.loads(response.content)
links = []
for k, v in catalog.get("links", {}).items():
if k == rel:
links.append(v.get("href"))
elif media_type == OPDSCatalog.OPDS_1_TYPE:
elif OPDSCatalog.is_opds1_type(media_type):
# Parse as OPDS 1.
feed = feedparser.parse(response.content)
for link in feed.get("feed", {}).get("links", []):
Expand Down
64 changes: 57 additions & 7 deletions tests/test_controller.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import base64
import datetime
import json
Expand Down Expand Up @@ -1507,30 +1509,78 @@ def test_register_fails_on_start_link_error(
== "Error retrieving OPDS root document at http://circmanager.org/feed/"
)

@pytest.mark.parametrize(
"media_type, expect_success",
[
pytest.param(
"application/atom+xml;profile=opds-catalog;kind=acquisition",
True,
id="opds1-acquisition",
),
pytest.param(
"application/atom+xml;kind=acquisition;profile=opds-catalog",
True,
id="opds1_acquisition-different-order",
),
pytest.param(
"application/atom+xml;profile=opds-catalog;kind=acquisition;api-version=1",
True,
id="opds1-acquisition-apiv1",
),
pytest.param(
"application/atom+xml;api-version=1;kind=acquisition;profile=opds-catalog",
True,
id="opds1_acquisition_apiv1-different-order",
),
pytest.param(
"application/atom+xml;api-version=2;kind=acquisition;profile=opds-catalog",
True,
id="opds1-acquisition-apiv2",
),
pytest.param(
"application/atom+xml;profile=opds-catalog;kind=navigation",
False,
id="opds1-navigation",
),
pytest.param("application/opds+json;api-version=1", True, id="opds2-apiv1"),
pytest.param("application/opds+json;api-version=2", True, id="opds2-apiv2"),
pytest.param("application/epub+zip", False, id="epub+zip"),
pytest.param("application/json", False, id="application-json"),
pytest.param("", False, id="empty-string"),
pytest.param(None, False, id="none-value"),
],
)
def test_register_fails_on_start_link_not_opds_feed(
self, registry_controller_fixture: LibraryRegistryControllerFixture
):
self,
media_type: str | None,
expect_success: bool,
registry_controller_fixture: LibraryRegistryControllerFixture,
) -> None:
fixture = registry_controller_fixture
# An empty string media type results in a no content-type header.
content_type = None if media_type == "" else media_type

"""The request returns an authentication document but an attempt
to retrieve the corresponding OPDS feed gives a server-side error.
"""
auth_document = self._auth_document()
# The start link returns a 200 response code but the media type might be wrong.
fixture.http_client.queue_response(
200, content=json.dumps(auth_document), url=auth_document["id"]
)

# The start link returns a 200 response code but the wrong
# Content-Type.
fixture.http_client.queue_response(200, "text/html")
fixture.http_client.queue_response(200, media_type)
with fixture.app.test_request_context("/", method="POST"):
flask.request.form = fixture.form
response = fixture.controller.register(do_get=fixture.http_client.do_get)
# We expect to get INVALID_INTEGRATION_DOCUMENT problem detail here, in any case,
# since our test is not fully configured.
assert response.uri == INVALID_INTEGRATION_DOCUMENT.uri
# But we should see the `not OPDS` detail only in the case of an invalid media type.
assert (
response.detail
== "Supposed root document at http://circmanager.org/feed/ is not an OPDS document"
)
!= f"Supposed root document at http://circmanager.org/feed/ does not appear to be an OPDS document (content_type={content_type!r})."
) == expect_success

def test_register_fails_if_start_link_does_not_link_back_to_auth_document(
self, registry_controller_fixture: LibraryRegistryControllerFixture
Expand Down
68 changes: 68 additions & 0 deletions tests/test_opds.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from __future__ import annotations

import datetime
import json

import pytest

from authentication_document import AuthenticationDocument
from config import Configuration
from model import (
Expand Down Expand Up @@ -351,3 +355,67 @@ def assert_reservation_status(expect):
# _hyperlink_args stops working.
hyperlink.resource = None
assert m(hyperlink) is None


class TestOpdsMediaTypeChecks:
@pytest.mark.parametrize(
"media_type, expect_is_opds1, expect_is_opds2",
[
pytest.param(
"application/atom+xml;profile=opds-catalog;kind=acquisition",
True,
False,
id="opds1-acquisition",
),
pytest.param(
"application/atom+xml;kind=acquisition;profile=opds-catalog",
True,
False,
id="opds1_acquisition-different-order",
),
pytest.param(
"application/atom+xml;profile=opds-catalog;kind=acquisition;api-version=1",
True,
False,
id="opds1-acquisition-apiv1",
),
pytest.param(
"application/atom+xml;api-version=1;kind=acquisition;profile=opds-catalog",
True,
False,
id="opds1_acquisition_apiv1-different-order",
),
pytest.param(
"application/atom+xml;api-version=2;kind=acquisition;profile=opds-catalog",
True,
False,
id="opds1-acquisition-apiv2",
),
pytest.param(
"application/atom+xml;profile=opds-catalog;kind=navigation",
False,
False,
id="opds1-navigation",
),
pytest.param(
"application/opds+json;api-version=1", False, True, id="opds2-apiv1"
),
pytest.param(
"application/opds+json;api-version=2", False, True, id="opds2-apiv2"
),
pytest.param("application/epub+zip", False, False, id="epub+zip"),
pytest.param("application/json", False, False, id="application-json"),
pytest.param("", False, False, id="empty-string"),
pytest.param(None, False, False, id="none-value"),
],
)
def test_opds_catalog_types(
self, media_type: str | None, expect_is_opds1: bool, expect_is_opds2: bool
) -> None:
is_opds1 = OPDSCatalog.is_opds1_type(media_type)
is_opds2 = OPDSCatalog.is_opds2_type(media_type)
is_opds_catalog_type = OPDSCatalog.is_opds_type(media_type)

assert is_opds1 == expect_is_opds1
assert is_opds2 == expect_is_opds2
assert is_opds_catalog_type == (expect_is_opds1 or expect_is_opds2)
Loading
Loading