diff --git a/scraper/src/libretexts2zim/client.py b/scraper/src/libretexts2zim/client.py
index a88c8d7..13ebe71 100644
--- a/scraper/src/libretexts2zim/client.py
+++ b/scraper/src/libretexts2zim/client.py
@@ -1,6 +1,9 @@
import datetime
+import json
import re
from collections.abc import Callable
+from pathlib import Path
+from typing import Any
import requests
from bs4 import BeautifulSoup, NavigableString
@@ -57,7 +60,7 @@ def placeholders(
class LibreTextsClient:
"""Utility functions to read data from libretexts."""
- def __init__(self, library_slug: str) -> None:
+ def __init__(self, library_slug: str, cache_folder: Path) -> None:
"""Initializes LibreTextsClient.
Paremters:
@@ -65,41 +68,146 @@ def __init__(self, library_slug: str) -> None:
e.g. `https://geo.libretexts.org/`.
"""
self.library_slug = library_slug
+ self.deki_token = None
+ self.cache_folder = cache_folder
@property
def library_url(self) -> str:
- return f"https://{self.library_slug}.libretexts.org/"
+ return f"https://{self.library_slug}.libretexts.org"
- def _get_text(self, url: str) -> str:
+ @property
+ def api_url(self) -> str:
+ return f"{self.library_url}/@api/deki"
+
+ def _get_cache_file(self, url_subpath_and_query: str) -> Path:
+ """Get location where HTTP result should be cached"""
+ if url_subpath_and_query.startswith("/"):
+ url_subpath_and_query = url_subpath_and_query[1:]
+ if url_subpath_and_query.endswith("/"):
+ url_subpath_and_query += "index"
+ return self.cache_folder / url_subpath_and_query
+
+ def _get_text(self, url_subpath_and_query: str) -> str:
"""Perform a GET request and return the response as decoded text."""
- logger.debug(f"Fetching {url}")
+ cache_file = self._get_cache_file(f"text{url_subpath_and_query}")
+ if cache_file.exists():
+ return cache_file.read_text()
+ cache_file.parent.mkdir(parents=True, exist_ok=True)
+
+ full_url = f"{self.library_url}{url_subpath_and_query}"
+ logger.debug(f"Fetching {full_url}")
resp = requests.get(
- url=url,
+ url=full_url,
allow_redirects=True,
timeout=HTTP_TIMEOUT_SECONDS,
)
resp.raise_for_status()
+ cache_file.write_text(resp.text)
return resp.text
+ def _get_api_resp(
+ self, api_sub_path_and_query: str, timeout: float
+ ) -> requests.Response:
+ api_url = f"{self.api_url}{api_sub_path_and_query}"
+ logger.debug(f"Calling API at {api_url}")
+ resp = requests.get(
+ url=api_url,
+ headers={"x-deki-token": self.deki_token},
+ timeout=timeout,
+ )
+ resp.raise_for_status()
+ return resp
+
+ def _get_api_json(
+ self, api_sub_path: str, timeout: float = HTTP_TIMEOUT_SECONDS
+ ) -> Any:
+ cache_file = self._get_cache_file(f"api_json{api_sub_path}")
+ if cache_file.exists():
+ return json.loads(cache_file.read_text())
+ cache_file.parent.mkdir(parents=True, exist_ok=True)
+ resp = self._get_api_resp(
+ f"{api_sub_path}?dream.out.format=json", timeout=timeout
+ )
+ result = resp.json()
+ cache_file.write_text(json.dumps(result))
+ return result
+
+ def _get_api_content(
+ self, api_sub_path: str, timeout: float = HTTP_TIMEOUT_SECONDS
+ ) -> bytes | Any:
+ cache_file = self._get_cache_file(f"api_content{api_sub_path}")
+ if cache_file.exists():
+            return cache_file.read_bytes()
+ cache_file.parent.mkdir(parents=True, exist_ok=True)
+ resp = self._get_api_resp(api_sub_path, timeout=timeout)
+ result = resp.content
+ cache_file.write_bytes(result)
+ return result
+
def get_home(self) -> LibreTextsHome:
- home_content = self._get_text(self.library_url)
+ """Retrieves data about home page by crawling home page"""
+ home_content = self._get_text("/")
soup = _get_soup(home_content)
+ self.deki_token = _get_deki_token_from_home(soup)
return LibreTextsHome(
welcome_text_paragraphs=_get_welcome_text_from_home(soup),
welcome_image_url=_get_welcome_image_url_from_home(soup),
shelves=[],
)
+ def get_deki_token(self) -> str:
+ """Retrieves the API token to use to query the website API"""
+ if self.deki_token:
+ return self.deki_token
+
+ home_content = self._get_text("/")
+
+ soup = _get_soup(home_content)
+ self.deki_token = _get_deki_token_from_home(soup)
+ return self.deki_token
+
+    def get_all_pages_ids(self) -> list[str]:
+ """Returns the IDs of all pages on current website, exploring the whole tree"""
+
+ tree = self._get_api_json("/pages/home/tree", timeout=HTTP_TIMEOUT_SECONDS * 2)
+
+ page_ids: list[str] = []
+
+ def _get_page_ids(page_node: Any) -> None:
+ page_ids.append(page_node["@id"])
+ if not page_node["subpages"]:
+ return
+ if "@id" in page_node["subpages"]["page"]:
+ _get_page_ids(page_node["subpages"]["page"])
+ else:
+ for page in page_node["subpages"]["page"]:
+ _get_page_ids(page)
+
+ _get_page_ids(tree["page"])
+
+ return page_ids
+
+ def get_root_page_id(self) -> str:
+ """Returns the ID the root of the tree of pages"""
+
+ tree = self._get_api_json("/pages/home/tree", timeout=HTTP_TIMEOUT_SECONDS * 2)
+ return tree["page"]["@id"]
+
def _get_soup(content: str) -> BeautifulSoup:
+ """Return a BeautifulSoup soup from textual content
+
+ This is a utility function to ensure same parser is used in the whole codebase
+ """
return BeautifulSoup(content, "html.parser")
def _get_welcome_image_url_from_home(soup: BeautifulSoup) -> str:
+ """Return the URL of the image found on home header"""
branding_div = soup.find("div", class_="LTBranding")
if not branding_div:
        raise LibreTextsParsingError("<div> with class 'LTBranding' not found")
@@ -119,6 +227,7 @@ def _get_welcome_image_url_from_home(soup: BeautifulSoup) -> str:
def _get_welcome_text_from_home(soup: BeautifulSoup) -> list[str]:
+ """Returns the text found on home page"""
content_section = soup.find("section", class_="mt-content-container")
if not content_section or isinstance(content_section, NavigableString):
raise LibreTextsParsingError(
@@ -133,3 +242,22 @@ def _get_welcome_text_from_home(soup: BeautifulSoup) -> list[str]:
if paragraph_text := paragraph.text:
welcome_text.append(paragraph_text)
return welcome_text
+
+
+def _get_deki_token_from_home(soup: BeautifulSoup) -> str:
+ global_settings = soup.find("script", id="mt-global-settings")
+ if not global_settings:
+ logger.debug("home content:")
+ logger.debug(soup)
+        raise LibreTextsParsingError(
+ "Failed to retrieve API token to query website API, missing "
+ "mt-global-settings script"
+ )
+ x_deki_token = json.loads(global_settings.text).get("apiToken", None)
+ if not x_deki_token:
+ logger.debug("mt-global-settings script content:")
+ logger.debug(global_settings.text)
+        raise LibreTextsParsingError(
+ "Failed to retrieve API token to query website API, missing apiToken."
+ )
+ return x_deki_token
diff --git a/scraper/src/libretexts2zim/entrypoint.py b/scraper/src/libretexts2zim/entrypoint.py
index 2ed9c37..e01cf7c 100644
--- a/scraper/src/libretexts2zim/entrypoint.py
+++ b/scraper/src/libretexts2zim/entrypoint.py
@@ -1,6 +1,9 @@
import argparse
import logging
import os
+from pathlib import Path
+
+from zimscraperlib.zim.filesystem import validate_zimfile_creatable
from libretexts2zim.client import LibreTextsClient
from libretexts2zim.constants import (
@@ -46,11 +49,18 @@ def main() -> None:
parser.add_argument(
"--output",
- help="Output folder for ZIMs. Default: /output",
- default="/output",
+ help="Output folder for ZIMs. Default: output",
+ default="output",
dest="output_folder",
)
+ parser.add_argument(
+ "--tmp",
+ help="Temporary folder for cache, intermediate files, ... Default: tmp",
+ default="tmp",
+ dest="tmp_folder",
+ )
+
parser.add_argument(
"--zimui-dist",
type=str,
@@ -84,21 +94,41 @@ def main() -> None:
required=True,
)
+ parser.add_argument(
+ "--keep-cache",
+ help="Keep cache of website responses",
+ action="store_true",
+ default=False,
+ )
+
args = parser.parse_args()
logger.setLevel(level=logging.DEBUG if args.debug else logging.INFO)
+ output_folder = Path(args.output_folder)
+    output_folder.mkdir(parents=True, exist_ok=True)
+ validate_zimfile_creatable(output_folder, "test.txt")
+
+ tmp_folder = Path(args.tmp_folder)
+    tmp_folder.mkdir(parents=True, exist_ok=True)
+ validate_zimfile_creatable(tmp_folder, "test.txt")
+
try:
zim_config = ZimConfig.of(args)
doc_filter = ContentFilter.of(args)
+
+ cache_folder = tmp_folder / "cache"
+        cache_folder.mkdir(parents=True, exist_ok=True)
+
libretexts_client = LibreTextsClient(
library_slug=args.library_slug,
+ cache_folder=cache_folder,
)
Generator(
libretexts_client=libretexts_client,
zim_config=zim_config,
- output_folder=args.output_folder,
+ output_folder=output_folder,
zimui_dist=args.zimui_dist,
content_filter=doc_filter,
overwrite_existing_zim=args.overwrite,
diff --git a/scraper/src/libretexts2zim/generator.py b/scraper/src/libretexts2zim/generator.py
index 314e03c..e3a4417 100644
--- a/scraper/src/libretexts2zim/generator.py
+++ b/scraper/src/libretexts2zim/generator.py
@@ -1,6 +1,5 @@
import argparse
import datetime
-import os
import re
from io import BytesIO
from pathlib import Path
@@ -11,6 +10,7 @@
)
from zimscraperlib.image import resize_image
from zimscraperlib.zim import Creator
+from zimscraperlib.zim.filesystem import validate_zimfile_creatable
from zimscraperlib.zim.indexing import IndexData
from libretexts2zim.client import LibreTextsClient, LibreTextsMetadata
@@ -107,7 +107,7 @@ def __init__(
libretexts_client: LibreTextsClient,
zim_config: ZimConfig,
content_filter: ContentFilter,
- output_folder: str,
+ output_folder: Path,
zimui_dist: str,
*,
overwrite_existing_zim: bool,
@@ -129,8 +129,6 @@ def __init__(
self.zimui_dist = Path(zimui_dist)
self.overwrite_existing_zim = overwrite_existing_zim
- os.makedirs(self.output_folder, exist_ok=True)
-
self.zim_illustration_path = self.libretexts_newsite_path(
"header_logo_mini.png"
)
@@ -157,11 +155,17 @@ def run(self) -> Path:
name=self.zim_config.library_name, slug=self.libretexts_client.library_slug
)
formatted_config = self.zim_config.format(metadata.placeholders())
- zim_path = Path(self.output_folder, f"{formatted_config.file_name_format}.zim")
+ zim_file_name = f"{formatted_config.file_name_format}.zim"
+ zim_path = self.output_folder / zim_file_name
+
+ if zim_path.exists():
+ if self.overwrite_existing_zim:
+ zim_path.unlink()
+ else:
+ logger.error(f" {zim_path} already exists, aborting.")
+ raise SystemExit(f"ZIM file already exists at {zim_path}")
- if zim_path.exists() and not self.overwrite_existing_zim:
- logger.error(f" {zim_path} already exists, aborting.")
- raise SystemExit(f"ZIM file already exists at {zim_path}")
+ validate_zimfile_creatable(self.output_folder, zim_file_name)
logger.info(f" Writing to: {zim_path}")
diff --git a/scraper/tests-integration/conftest.py b/scraper/tests-integration/conftest.py
index 98d237a..250300d 100644
--- a/scraper/tests-integration/conftest.py
+++ b/scraper/tests-integration/conftest.py
@@ -1,3 +1,8 @@
+import tempfile
+from collections.abc import Generator
+from pathlib import Path
+from typing import Any
+
import pytest
@@ -6,6 +11,12 @@ def libretexts_slug() -> str:
return "geo"
+@pytest.fixture(scope="module")
+def cache_folder() -> Generator[Path, Any, Any]:
+ with tempfile.TemporaryDirectory() as tmpdir:
+ yield Path(tmpdir)
+
+
@pytest.fixture(scope="module")
def libretexts_url(libretexts_slug: str) -> str:
return f"https://{libretexts_slug}.libretexts.org"
diff --git a/scraper/tests-integration/test_client.py b/scraper/tests-integration/test_client.py
index 2e4889c..5cf9ebf 100644
--- a/scraper/tests-integration/test_client.py
+++ b/scraper/tests-integration/test_client.py
@@ -1,4 +1,5 @@
import io
+from pathlib import Path
import pytest
from zimscraperlib.download import (
@@ -10,8 +11,8 @@
@pytest.fixture(scope="module")
-def client(libretexts_slug: str) -> LibreTextsClient:
- return LibreTextsClient(library_slug=libretexts_slug)
+def client(libretexts_slug: str, cache_folder: Path) -> LibreTextsClient:
+ return LibreTextsClient(library_slug=libretexts_slug, cache_folder=cache_folder)
@pytest.fixture(scope="module")
@@ -19,6 +20,43 @@ def home(client: LibreTextsClient) -> LibreTextsHome:
return client.get_home()
+@pytest.fixture(scope="module")
+def deki_token(client: LibreTextsClient) -> str:
+ return client.get_deki_token()
+
+
+@pytest.fixture(scope="module")
+def minimum_number_of_pages() -> int:
+ return 8000
+
+
+@pytest.fixture(scope="module")
+def root_page_id() -> str:
+ return "34"
+
+
+def test_get_deki_token(deki_token: str):
+ """Ensures we achieve to get a deki_token"""
+ assert deki_token
+
+
+def test_get_all_pages_ids(
+ client: LibreTextsClient,
+ minimum_number_of_pages: int,
+ deki_token: str, # noqa: ARG001
+):
+ pages_ids = client.get_all_pages_ids()
+ assert len(pages_ids) > minimum_number_of_pages
+
+
+def test_get_root_page_id(
+ client: LibreTextsClient,
+ root_page_id: str,
+ deki_token: str, # noqa: ARG001
+):
+ assert client.get_root_page_id() == root_page_id
+
+
def test_get_home_image_url(home: LibreTextsHome):
"""Ensures proper image url is retrieved"""
assert home.welcome_image_url == "https://cdn.libretexts.net/Logos/geo_full.png"