-
Notifications
You must be signed in to change notification settings - Fork 4
/
registrar.py
406 lines (366 loc) · 15.3 KB
/
registrar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
from __future__ import annotations
import json
import logging
import re
from io import BytesIO
from typing import TYPE_CHECKING
from urllib.parse import urljoin
import feedparser
from flask_babel import lazy_gettext as _
from PIL import Image
from authentication_document import AuthenticationDocument
from model import Hyperlink
from opds import OPDSCatalog
from problem_details import (
ERROR_RETRIEVING_DOCUMENT,
INTEGRATION_DOCUMENT_NOT_FOUND,
INVALID_CONTACT_URI,
INVALID_INTEGRATION_DOCUMENT,
LIBRARY_ALREADY_IN_PRODUCTION,
TIMEOUT,
)
from util.file_storage import LibraryLogoStore
from util.http import HTTP, RequestTimedOut
from util.problem_detail import ProblemDetail
if TYPE_CHECKING:
from model import Library
class VerifyLinkRegexes:
    """Regular-expression patterns used to check the scheme of a link URI.

    The patterns are anchored at the start of the string and are meant to be
    passed to ``re.match``.
    """

    # Matches URIs that begin with ``mailto:``.
    MAILTO = r"^mailto:"

    # Matches URIs that begin with ``http:``, ``https:`` or ``mailto:``.
    HTTP_OR_MAILTO = r"^(http[s]?:|mailto:)"
class LibraryRegistrar:
    """Encapsulates the logic of the library registration process."""

    def __init__(self, _db, do_get=HTTP.debuggable_get):
        """Create a registrar.

        :param _db: Database handle; passed to
            ``AuthenticationDocument.from_string`` when parsing documents.
        :param do_get: Callable used for every outbound GET request.
            Defaults to ``HTTP.debuggable_get``.
        """
        self._db = _db
        self.do_get = do_get
        self.log = logging.getLogger("Library registrar")

    def reregister(self, library):
        """Re-register the given Library by fetching its authentication
        document and updating its record appropriately.

        This process will not be as thorough as one initiated manually
        by the library administrator, but it can be used to
        automatically keep us up to date on minor changes to a
        library's description, logo, etc.

        :param library: A Library.
        :return: A ProblemDetail if there's a problem. Otherwise, None.
        """
        result = self.register(library, library.library_stage)
        if isinstance(result, ProblemDetail):
            return result

        # The return value may include new settings for contact
        # hyperlinks, but we will not be changing any Hyperlink
        # objects, since that might result in emails being sent out
        # unexpectedly. The library admin must explicitly re-register
        # for that to happen.
        #
        # Basically, we don't actually use any of the items returned
        # by register() -- only the controller uses that stuff.
        return None

    def register(self, library: Library, library_stage):
        """Register the given Library with this registry, if possible.

        The method fetches the library's authentication document, validates
        it, verifies the required contact links, cross-checks the OPDS root
        document against the authentication document, and only then updates
        fields on ``library``.

        :param library: A Library to register or re-register.
        :param library_stage: The library administrator's proposed value for
            Library.library_stage.
        :return: A ProblemDetail if there's a problem. Otherwise, a 2-tuple
            (auth_document, new_hyperlinks).

            `auth_document` is an AuthenticationDocument corresponding to
            the library's authentication document, as found at auth_url.

            `new_hyperlinks` is a list of (rel, uris) 2-tuples describing
            the Hyperlinks that ought to be created for registration to be
            complete.
        """
        hyperlinks_to_create = []

        auth_url = library.authentication_url
        auth_response = self._make_request(
            auth_url,
            auth_url,
            _("No Authentication For OPDS document present at %(url)s", url=auth_url),
            _("Timeout retrieving auth document %(url)s", url=auth_url),
            _("Error retrieving auth document %(url)s", url=auth_url),
        )
        if isinstance(auth_response, ProblemDetail):
            return auth_response

        try:
            auth_document = AuthenticationDocument.from_string(
                self._db, auth_response.content
            )
        except Exception as e:
            self.log.error(
                "Registration of %s failed: invalid auth document.",
                auth_url,
                exc_info=e,
            )
            return INVALID_INTEGRATION_DOCUMENT

        # Validate the document. The checks form a single chain so that one
        # failure cannot silently overwrite another; in particular a missing
        # id is reported as such rather than as "id (None) doesn't match".
        failure_detail = None
        if not auth_document.id:
            failure_detail = _("The OPDS authentication document is missing an id.")
        elif auth_document.id != auth_response.url:
            failure_detail = _(
                "The OPDS authentication document's id (%(id)s) doesn't match its url (%(url)s).",
                id=auth_document.id,
                url=auth_response.url,
            )
        elif not auth_document.root:
            failure_detail = _(
                "The OPDS authentication document is missing a 'start' link to the root OPDS feed."
            )
        elif not auth_document.title:
            failure_detail = _("The OPDS authentication document is missing a title.")

        if failure_detail:
            self.log.error("Registration of %s failed: %s", auth_url, failure_detail)
            return INVALID_INTEGRATION_DOCUMENT.detailed(failure_detail)

        opds_url = auth_document.root["href"]

        # Make sure the authentication document includes a way for
        # patrons to get help or file a copyright complaint. These
        # links must be stored in the database as Hyperlink objects.
        links = auth_document.links or []
        for rel, problem_title, regexes in [
            (
                "help",
                "Invalid or missing patron support email address or website",
                VerifyLinkRegexes.HTTP_OR_MAILTO,
            ),
            (
                Hyperlink.COPYRIGHT_DESIGNATED_AGENT_REL,
                "Invalid or missing copyright designated agent email address",
                VerifyLinkRegexes.MAILTO,
            ),
        ]:
            uris = self._verify_links(rel, links, problem_title, link_regex=regexes)
            if isinstance(uris, ProblemDetail):
                return uris
            hyperlinks_to_create.append((rel, uris))

        # Cross-check the opds_url to make sure it links back to the
        # authentication document.
        opds_response = self._make_request(
            auth_url,
            opds_url,
            _("No OPDS root document present at %(url)s", url=opds_url),
            _("Timeout retrieving OPDS root document at %(url)s", url=opds_url),
            _("Error retrieving OPDS root document at %(url)s", url=opds_url),
            allow_401=True,
        )
        if isinstance(opds_response, ProblemDetail):
            return opds_response

        content_type = opds_response.headers.get("Content-Type")
        failure_detail = None
        if opds_response.status_code == 401:
            # This is only acceptable if the server returned a copy of
            # the Authentication For OPDS document we just got.
            if content_type != AuthenticationDocument.MEDIA_TYPE:
                failure_detail = _(
                    "401 response at %(url)s did not yield an Authentication For OPDS document",
                    url=opds_url,
                )
            elif not self.opds_response_links_to_auth_document(opds_response, auth_url):
                failure_detail = _(
                    "Authentication For OPDS document guarding %(opds_url)s does not match the one at %(auth_url)s",
                    opds_url=opds_url,
                    auth_url=auth_url,
                )
        elif not OPDSCatalog.is_opds_type(content_type):
            # Use gettext placeholders rather than an f-string so the
            # message id stays constant and can be found in a catalog.
            failure_detail = _(
                "Supposed root document at %(opds_url)s does not appear to be an OPDS document (content_type=%(content_type)r).",
                opds_url=opds_url,
                content_type=content_type,
            )
        elif not self.opds_response_links_to_auth_document(opds_response, auth_url):
            failure_detail = _(
                "OPDS root document at %(opds_url)s does not link back to authentication document %(auth_url)s",
                opds_url=opds_url,
                auth_url=auth_url,
            )

        if failure_detail:
            self.log.error("Registration of %s failed: %s", auth_url, failure_detail)
            return INVALID_INTEGRATION_DOCUMENT.detailed(failure_detail)

        # From here on, refer to the URL the auth document was actually
        # served from.
        auth_url = auth_response.url

        try:
            library.library_stage = library_stage
        except ValueError:
            return LIBRARY_ALREADY_IN_PRODUCTION

        library.name = auth_document.title

        if auth_document.website:
            url = auth_document.website.get("href")
            if url:
                url = urljoin(opds_url, url)
            # Store the resolved URL, not the raw (possibly relative) href.
            library.web_url = url
        else:
            library.web_url = None

        if auth_document.logo:
            # Write this data to the storage
            logo_url = LibraryLogoStore.write_from_b64(library, auth_document.logo)
            if logo_url:
                library.logo_url = logo_url
            else:
                return INVALID_INTEGRATION_DOCUMENT.detailed(
                    _("Could not upload the logo image to the file storage")
                )
        elif auth_document.logo_link:
            url = auth_document.logo_link.get("href")
            if url:
                url = urljoin(opds_url, url)
                try:
                    # The fetch is inside the try block so that a network
                    # failure is reported as a problem detail instead of
                    # escaping as an unhandled exception.
                    logo_response = self.do_get(url, stream=True)
                    image = Image.open(logo_response.raw)
                except Exception:
                    image_url = auth_document.logo_link.get("href")
                    self.log.error(
                        "Registration of %s failed: could not read logo image %s",
                        auth_url,
                        image_url,
                    )
                    return INVALID_INTEGRATION_DOCUMENT.detailed(
                        _("Could not read logo image %(image_url)s", image_url=image_url)
                    )

                # Convert to PNG.
                buffer = BytesIO()
                image.save(buffer, format="PNG")

                # Upload to the file store
                logo_url = LibraryLogoStore.write(library, buffer)
                if not logo_url:
                    return INVALID_INTEGRATION_DOCUMENT.detailed(
                        _("Could not upload the logo image to the file storage")
                    )
                library.logo_url = logo_url
                # Rewind the buffer; nothing later in this method reads it.
                buffer.seek(0)

        problem = auth_document.update_library(library)
        if problem:
            self.log.error(
                "Registration of %s failed: problem during registration: %s/%s/%s/%s",
                auth_url,
                problem.uri,
                problem.title,
                problem.detail,
                problem.debug_message,
            )
            return problem

        return auth_document, hyperlinks_to_create

    def _make_request(
        self, registration_url, url, on_404, on_timeout, on_exception, allow_401=False
    ):
        """GET `url` and translate failures into ProblemDetail documents.

        :param registration_url: The URL being registered; used only in
            log messages.
        :param url: The URL to retrieve.
        :param on_404: Detail message to use if the server answers 404.
        :param on_timeout: Detail message to use if the request times out.
        :param on_exception: Detail message to use for any other error.
        :param allow_401: If true, a 401 response is returned to the caller
            instead of being treated as an error.
        :return: The response object on success, otherwise a ProblemDetail.
        """
        allowed_codes = ["2xx", "3xx", 404]
        if allow_401:
            allowed_codes.append(401)
        try:
            response = self.do_get(
                url, allowed_response_codes=allowed_codes, timeout=30
            )
            # We only allowed 404 above so that we could return a more
            # specific problem detail document if it happened.
            if response.status_code == 404:
                return INTEGRATION_DOCUMENT_NOT_FOUND.detailed(on_404)
            if not allow_401 and response.status_code == 401:
                self.log.error(
                    "Registration of %s failed: %s is behind authentication gateway",
                    registration_url,
                    url,
                )
                return ERROR_RETRIEVING_DOCUMENT.detailed(
                    _("%(url)s is behind an authentication gateway", url=url)
                )
        except RequestTimedOut as e:
            self.log.error(
                "Registration of %s failed: timeout retrieving %s",
                registration_url,
                url,
                exc_info=e,
            )
            return TIMEOUT.detailed(on_timeout)
        except Exception as e:
            self.log.error(
                "Registration of %s failed: error retrieving %s",
                registration_url,
                url,
                exc_info=e,
            )
            return ERROR_RETRIEVING_DOCUMENT.detailed(on_exception)
        return response

    @classmethod
    def opds_response_links(cls, response, rel):
        """Find all the links in the given response for the given
        link relation.

        Links are collected from the response's ``Link`` header and, depending
        on the Content-Type, from the body of an OPDS 2 catalog, an OPDS 1
        feed, or an Authentication For OPDS document (whose ``id`` is used).

        :param response: An HTTP response exposing ``links``, ``headers``,
            ``content`` and ``url``.
        :param rel: The link relation to look for.
        :return: A list of URLs, each resolved against ``response.url``.
        :raises ValueError: If a JSON body cannot be parsed.
        """
        # Look in the response itself for a Link header.
        links = []
        link = response.links.get(rel)
        if link:
            links.append(link.get("url"))

        media_type = response.headers.get("Content-Type")
        if OPDSCatalog.is_opds2_type(media_type):
            # Parse as OPDS 2. Any link found in the header is kept, as it is
            # for OPDS 1 feeds.
            catalog = json.loads(response.content)
            catalog_links = catalog.get("links") if isinstance(catalog, dict) else None
            if isinstance(catalog_links, dict):
                # Mapping form: {rel: {"href": ...}}.
                for link_rel, link_obj in catalog_links.items():
                    if link_rel == rel and isinstance(link_obj, dict):
                        links.append(link_obj.get("href"))
            elif isinstance(catalog_links, list):
                # List form: [{"rel": ..., "href": ...}, ...]; "rel" may be a
                # single string or a list of strings.
                for link_obj in catalog_links:
                    if not isinstance(link_obj, dict):
                        continue
                    link_rel = link_obj.get("rel")
                    if link_rel == rel or (
                        isinstance(link_rel, list) and rel in link_rel
                    ):
                        links.append(link_obj.get("href"))
        elif OPDSCatalog.is_opds1_type(media_type):
            # Parse as OPDS 1.
            feed = feedparser.parse(response.content)
            for feed_link in feed.get("feed", {}).get("links", []):
                if feed_link.get("rel") == rel:
                    links.append(feed_link.get("href"))
        elif media_type == AuthenticationDocument.MEDIA_TYPE:
            document = json.loads(response.content)
            if isinstance(document, dict):
                links.append(document.get("id"))

        return [urljoin(response.url, url) for url in links if url]

    @classmethod
    def opds_response_links_to_auth_document(cls, opds_response, auth_url):
        """Verify that the given response links to the given URL as its
        Authentication For OPDS document.

        The link might happen in the `Link` header or in the body of
        an OPDS feed.

        :param opds_response: The HTTP response to inspect.
        :param auth_url: The expected authentication document URL.
        :return: True if `auth_url` is among the response's authentication
            document links; False otherwise, including when the response
            body is malformed.
        """
        try:
            links = cls.opds_response_links(
                opds_response, AuthenticationDocument.AUTHENTICATION_DOCUMENT_REL
            )
        except ValueError:
            # The response itself is malformed.
            return False
        return auth_url in links

    @classmethod
    def _verify_links(
        cls, rel, links, problem_title, link_regex=VerifyLinkRegexes.MAILTO
    ):
        """Find one or more valid URIs in a list of links, all with
        a given `rel`.

        :param rel: The rel for this type of link.
        :param links: A list of dictionaries with keys 'rel' and 'href'
        :param problem_title: The title to use in a ProblemDetail if no
            valid links are found.
        :param link_regex: Pattern an href must match (via ``re.match``) to
            be accepted.
        :return: Either a list of candidate links or a customized ProblemDetail.
        """
        candidates = []
        for link in links:
            if link.get("rel") != rel:
                # Wrong kind of link.
                continue
            uri = link.get("href")
            value = cls._required_link_type(uri, problem_title, link_regex)
            # Invalid links come back as a ProblemDetail and are skipped.
            if isinstance(value, str):
                candidates.append(value)

        # There were no relevant links.
        if not candidates:
            problem = INVALID_CONTACT_URI.detailed(
                f"No valid '{link_regex}' links found with rel={rel}"
            )
            problem.title = problem_title
            return problem

        return candidates

    @classmethod
    def _required_link_type(cls, uri, problem_title, link_regex):
        """Verify that `uri` is a particular type URI.

        :param uri: The URI to check; may be None or empty.
        :param problem_title: Title to set on the ProblemDetail on failure.
        :param link_regex: Pattern the URI must match (via ``re.match``).
        :return: Either a verified type URI or a customized ProblemDetail.
        """
        problem = None
        on_error = INVALID_CONTACT_URI
        if not uri:
            problem = on_error.detailed("No link href was provided")
        elif not re.match(link_regex, uri):
            problem = on_error.detailed(
                _("URI must match '%s' (got: %s)") % (link_regex, uri)
            )
        if problem:
            problem.title = problem_title
            return problem
        return uri