From fd4a5870e4dda41e978726e8e2a9bfc156e4b09e Mon Sep 17 00:00:00 2001
From: Thomas Lemoine <43831409+Thomas-Lemoine@users.noreply.github.com>
Date: Fri, 1 Sep 2023 13:39:14 -0400
Subject: [PATCH] Arbital refactor (#174)

* first commit

* refactor markdownify_text with summaries

* added test for new arbital summary behaviour

* minor refactor of parse_arbital_link

* added edge cases to parse_arbital_link
---
 align_data/sources/arbital/arbital.py    | 195 +++++++++++++++--------
 tests/align_data/sources/test_arbital.py |  29 ++--
 2 files changed, 143 insertions(+), 81 deletions(-)

diff --git a/align_data/sources/arbital/arbital.py b/align_data/sources/arbital/arbital.py
index ab19aab7..f6087937 100644
--- a/align_data/sources/arbital/arbital.py
+++ b/align_data/sources/arbital/arbital.py
@@ -1,5 +1,9 @@
 import re
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
 import logging
+from typing import List, Tuple, Iterator, Dict, Union, Any, TypedDict
+
 import requests
 from datetime import datetime, timezone
 from dateutil.parser import parse
@@ -10,81 +14,132 @@
 logger = logging.getLogger(__name__)


-def parse_arbital_link(contents):
-    text = contents[1].split(" ")
-    url = f"https://arbital.com/p/{text[0]}"
-    if len(text) > 1:
-        title = " ".join(text[1:])
-    else:
-        title = url
-    return f"[{title}]({url})"
+class Page(TypedDict, total=False):
+    text: str
+    likeableId: str
+    likeableType: str
+    title: str
+    editCreatedAt: str
+    pageCreatedAt: str
+    alias: str
+    userId: str
+    tagIds: str
+    changeLogs: List[Dict[str, Any]]


-def flatten(val):
-    if isinstance(val, (list, tuple)):
-        return [item for i in val for item in flatten(i)]
-    return [val]
+def parse_arbital_link(internal_link: str) -> str:
+    """
+    Parses the Arbital internal link.
+    :param str internal_link: The internal link to parse.
+    :return: The parsed link.
+    :rtype: str

-def markdownify_text(current, view):
-    """Recursively parse the text parts in `view` to create a markdown AST from them.
+    Typical format: `123 Some title` -> `[Some title](https://arbital.com/p/123)`
+    Special cases:
+    `toc:` -> `toc:`
+    `https://www.gwern.net/ Gwern Branwen` -> `[Gwern Branwen](https://www.gwern.net/)`
+    """
+    page_id, *title_parts = internal_link.split(" ")
+    if not page_id or page_id.startswith("toc:"):
+        # could be a regular text bracket, ignore it
+        return internal_link
+    if page_id.startswith("http"):
+        # could be a regular link, ignore it
+        return f"[{' '.join(title_parts)}]({page_id})"
+    url = f"https://arbital.com/p/{page_id}"
+    title = " ".join(title_parts) if title_parts else url
+    return f"[{title}]({url})"

-    Arbital adds some funky extra stuff to markdown. The known things are:
-    * "[summary: ]" blocks to add summaries
-    * "[123 ]" are internal links to `<123>`
-    The `view` parameter should be a generator, so recursive calls can iterate over it without needing
-    to mess about with indexes etc.
+def flatten(val: Union[List[str], Tuple[str], str]) -> List[str]:
+    """Flattens a nested list."""
+    if isinstance(val, (list, tuple)):
+        return [item for sublist in val for item in flatten(sublist)]
+    return [val]

-    :param List[str] current: the list of parsed items. Should generally be passed in as `[]`
-    :param generator(str, str) view: a generator that returns `part` and `next_part`, where `part` is the current item
-        and `next_part` is a lookahead
-    :returns: a tuple of `(<summary string>, <markdown contents>)`
+def markdownify_text(current: List[str], view: Iterator[Tuple[str, str]]) -> Tuple[str, str]:
+    """
+    Recursively parse text segments from `view` to generate a markdown Abstract Syntax Tree (AST).
+
+    This function helps in transitioning from Arbital's specific markdown extensions to standard markdown. It specifically
+    handles two main features:
+    - "[summary: <contents>]" blocks, which are used in Arbital to add summaries.
+    - "[123 <title>]" which are Arbital's internal links pointing to https://arbital.com/p/123, with link title <title>.
+
+    Args:
+    :param List[str] current: A list of parsed items. Should generally be initialized as an empty list.
+    :param Iterator[Tuple[str, str]] view: An iterator that returns pairs of `part` and `next_part`, where `part` is the
+        current segment and `next_part` provides a lookahead.
+
+    :return: <summary>, <text>, where <summary> is the summary extracted from the text, and <text> is the text with all
+        Arbital-specific markdown extensions replaced with standard markdown.
+    :rtype: Tuple[str, str]
+
+    Example:
+        From the text: "[summary: A behaviorist [6w genie]]"
+        We get the input:
+            current = []
+            view = iter([('[', 'summary: A behaviorist '), ('summary: A behaviorist ', '['), ('[', '6w genie'), ('6w genie', ']'), (']', ']'), (']', None)])
+        The function should return:
+            `('A behaviorist [genie](https://arbital.com/p/6w)', '')`
+
+    Note:
+        This function assumes that `view` provides a valid Arbital markdown sequence. Malformed sequences might lead to
+        unexpected results.
     """
     in_link = False
+    summary = ""
     for part, next_part in view:
         if part == "[":
             # Recursively try to parse this new section - it's probably a link, but can be something else
-            current.append(markdownify_text([part], view))
-        elif part == "]" and next_part == "(":
-            # mark that it's now in the url part of a markdown link
-            current.append("]")
-            in_link = True
+            sub_summary, text = markdownify_text([part], view)
+            summary += sub_summary + "\n\n"
+            current.append(text)
+
         elif part == "]":
-            # this is the arbital summary - just join it for now, but it'll have to be handled later
-            if current[1].startswith("summary"):
-                return "".join(current[1:])
-            # if this was a TODO section, then ignore it
-            if current[1].startswith("todo"):
-                return ""
-            # Otherwise it's an arbital link
-            return parse_arbital_link(current)
+            if next_part == "(":
+                # Indicate that it's in the URL part of a markdown link.
+                current.append(part)
+                in_link = True
+            else:
+                # Extract the descriptor, which might be a summary tag, TODO tag, or an Arbital internal link's "<page_id> <title>".
+                descriptor = current[1]
+
+                # Handle Arbital summary.
+                if descriptor.startswith("summary"):
+                    summary_tag, summary_content = "".join(current[1:]).split(":", 1)
+                    return f"{summary_tag}: {summary_content.strip()}", ""
+
+                # Handle TODO section (ignore it).
+                if descriptor.startswith("todo"):
+                    return "", ""
+
+                # Handle Arbital link (e.g., "6w genie" -> "[genie](https://arbital.com/p/6w)").
+                return "", parse_arbital_link(descriptor)
+
         elif in_link and part == ")":
             # this is the end of a markdown link - just join the contents, as they're already correct
-            return "".join(current + [part])
+            return "", "".join(current + [part])
+
         elif in_link and current[-1] == "(" and next_part != ")":
             # This link is strange... looks like it could be malformed?
             # Assuming that it's malformed and missing a closing `)`
             # This will remove any additional info in the link, but that seems a reasonable price?
             words = part.split(" ")
-            return "".join(current + [words[0], ") ", " ".join(words[1:])])
+            return "", "".join(current + [words[0], ") ", " ".join(words[1:])])
+
         else:
             # Just your basic text - add it to the processed parts and go on your merry way
             current.append(part)

-    # Check if the first item is the summary - if so, extract it
-    summary = ""
-    if current[0].startswith("summary"):
-        _, summary = re.split(r"summary[()\w]*:", current[0], 1)
-        current = current[1:]
-
-    # Otherwise just join all the parts back together
     return summary.strip(), "".join(flatten(current)).strip()


-def extract_text(text):
+def extract_text(text: str) -> Tuple[str, str]:
     parts = [i for i in re.split(r"([\[\]()])", text) if i]
     return markdownify_text([], zip(parts, parts[1:] + [None]))
@@ -106,10 +161,10 @@ class Arbital(AlignmentDataset):
         "sec-fetch-dest": "empty",
         "accept-language": "en-US,en;q=0.9",
     }
-    titles_map = {}
+    titles_map: Dict[str, str] = field(default_factory=dict)

     @property
-    def items_list(self):
+    def items_list(self) -> List[str]:
         logger.info("Getting page aliases")
         items = [
             alias
@@ -122,7 +177,7 @@ def items_list(self):
     def get_item_key(self, item: str) -> str:
         return item

-    def process_entry(self, alias):
+    def process_entry(self, alias: str):
         try:
             page = self.get_page(alias)
             summary, text = extract_text(page["text"])
@@ -144,33 +199,37 @@ def process_entry(self, alias):
         except Exception as e:
             logger.error(f"Error getting page {alias}: {e}")
             return None
-
-    def get_arbital_page_aliases(self, subspace):
+
+    def send_post_request(self, url: str, page_alias: str, referer_base: str) -> requests.Response:
         headers = self.headers.copy()
-        headers["referer"] = f"https://arbital.com/explore/{subspace}/"
-        data = f'{{"pageAlias":"{subspace}"}}'
-        response = requests.post(
-            "https://arbital.com/json/explore/", headers=headers, data=data
-        ).json()
-        return list(response["pages"].keys())
+        headers['referer'] = f"{referer_base}{page_alias}/"
+        data = f'{{"pageAlias":"{page_alias}"}}'
+        return requests.post(url, headers=headers, data=data)
+
+    def get_arbital_page_aliases(self, subspace: str) -> List[str]:
+        response = self.send_post_request(
+            url='https://arbital.com/json/explore/',
+            page_alias=subspace,
+            referer_base='https://arbital.com/explore/'
+        )
+        return list(response.json()['pages'].keys())
+
+    def get_page(self, alias: str) -> Page:
+        response = self.send_post_request(
+            url='https://arbital.com/json/primaryPage/',
+            page_alias=alias,
+            referer_base='https://arbital.com/p/'
+        )
+        return response.json()['pages'][alias]

     @staticmethod
-    def _get_published_date(page):
+    def _get_published_date(page: Page) -> datetime | None:
         date_published = page.get("editCreatedAt") or page.get("pageCreatedAt")
         if date_published:
             return parse(date_published).astimezone(timezone.utc)
         return None

-    def get_page(self, alias):
-        headers = self.headers.copy()
-        headers["referer"] = "https://arbital.com/"
-        data = f'{{"pageAlias":"{alias}"}}'
-        response = requests.post(
-            "https://arbital.com/json/primaryPage/", headers=headers, data=data
-        )
-        return response.json()["pages"][alias]
-
-    def get_title(self, itemId):
+    def get_title(self, itemId: str) -> str | None:
         if title := self.titles_map.get(itemId):
             return title

@@ -186,7 +245,7 @@ def get_title(self, itemId):
             return title
         return None

-    def extract_authors(self, page):
+    def extract_authors(self, page: Page) -> List[str]:
         """Get all authors of this page.

         This will work faster the more its used, as it only fetches info for authors it hasn't yet seen.
diff --git a/tests/align_data/sources/test_arbital.py b/tests/align_data/sources/test_arbital.py
index af65ed05..19ad8e97 100644
--- a/tests/align_data/sources/test_arbital.py
+++ b/tests/align_data/sources/test_arbital.py
@@ -15,12 +15,14 @@
 @pytest.mark.parametrize(
     "contents, expected",
     (
-        (["[", "123"], "[https://arbital.com/p/123](https://arbital.com/p/123)"),
-        (["[", "123 Some title"], "[Some title](https://arbital.com/p/123)"),
+        ("123", "[https://arbital.com/p/123](https://arbital.com/p/123)"),
+        ("123 Some title", "[Some title](https://arbital.com/p/123)"),
         (
-            ["[", "123 Some title with multiple words"],
+            "123 Some title with multiple words",
             "[Some title with multiple words](https://arbital.com/p/123)",
         ),
+        ("https://www.gwern.net/ Gwern Branwen", "[Gwern Branwen](https://www.gwern.net/)"),
+        ("toc:", "toc:"),  # `toc:` is a mysterious thing
     ),
 )
 def test_parse_arbital_link(contents, expected):
@@ -84,37 +86,38 @@ def test_markdownify_text_contents_arbital_markdown(text, expected):
     (
         (
             "[summary: summaries should be extracted] bla bla bla",
-            "summaries should be extracted",
+            ("summary: summaries should be extracted", "bla bla bla"),
         ),
         (
             "[summary: \n whitespace should be stripped \n] bla bla bla",
-            "whitespace should be stripped",
+            ("summary: whitespace should be stripped", "bla bla bla"),
         ),
         (
             "[summary(Bold): special summaries should be extracted] bla bla bla",
-            "special summaries should be extracted",
+            ("summary(Bold): special summaries should be extracted", "bla bla bla"),
         ),
         (
             "[summary(Markdown): special summaries should be extracted] bla bla bla",
-            "special summaries should be extracted",
+            ("summary(Markdown): special summaries should be extracted", "bla bla bla"),
         ),
         (
             "[summary(BLEEEE): special summaries should be extracted] bla bla bla",
-            "special summaries should be extracted",
+            ("summary(BLEEEE): special summaries should be extracted", "bla bla bla"),
        ),
         (
             "[summary: markdown is handled: [bla](https://bla.bla)] bla bla bla",
-            "markdown is handled: [bla](https://bla.bla)",
+            ("summary: markdown is handled: [bla](https://bla.bla)", "bla bla bla"),
         ),
         (
             "[summary: markdown is handled: [123 ble ble]] bla bla bla",
-            "markdown is handled: [ble ble](https://arbital.com/p/123)",
+            ("summary: markdown is handled: [ble ble](https://arbital.com/p/123)", "bla bla bla"),
         ),
     ),
 )
-def test_markdownify_text_summary(text, expected):
-    summary, _ = extract_text(text)
-    assert summary == expected
+def test_markdownify_text_summary_and_content(text, expected):
+    summary, text = extract_text(text)
+    assert summary == expected[0]
+    assert text == expected[1]


 @pytest.fixture
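
As a quick sanity check of the refactored behaviour, the sketch below exercises the new helpers with the same cases as the updated tests above; the import path is assumed from the file touched in this diff.

    from align_data.sources.arbital.arbital import extract_text, parse_arbital_link

    # Arbital-style internal links are rewritten as standard markdown links.
    assert parse_arbital_link("123 Some title") == "[Some title](https://arbital.com/p/123)"

    # extract_text now returns a (summary, text) tuple, and the summary keeps its tag prefix.
    summary, text = extract_text("[summary: markdown is handled: [123 ble ble]] bla bla bla")
    assert summary == "summary: markdown is handled: [ble ble](https://arbital.com/p/123)"
    assert text == "bla bla bla"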