Add: don't run the metadata out-of-process and sys.intern() strings
The first change (no longer running out-of-process) avoids needing
twice the memory during any change to the metadata. Instead, we
yield on every page so aiohttp can serve requests in between
analyzing pages. This does increase latency, but not by enough to
be annoying. And rebuilding the metadata is a rare event, so it is
unlikely people will really notice.
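
A minimal sketch of that yielding trick (the same await asyncio.sleep(0)
used in _page_changed() below); rebuild_metadata() and analyze_page() are
hypothetical stand-ins for the real analysis code, not names from the diff:

    import asyncio

    def analyze_page(page):
        """Hypothetical stand-in for the real per-page analysis."""

    async def rebuild_metadata(pages):
        for page in pages:
            analyze_page(page)
            # Hand control back to the event loop so aiohttp can serve
            # requests that arrived while this page was being analyzed.
            await asyncio.sleep(0)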

sys.intern() reduces memory usage by 25% after a reload, and by 10%
when there was no cache.
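
A minimal sketch of what sys.intern() buys us: equal strings collapse into
one shared object, so the many repeated category/template/link names no
longer each keep their own copy ("Category:Trains" is just an example value):

    import sys

    a = sys.intern("Category:" + "Trains")
    b = sys.intern("Category:Trains")
    # Both names now reference the same interned string object, so the
    # duplicate copy can be freed instead of being kept per page.
    assert a is b
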
TrueBrain committed Nov 12, 2020
1 parent b6a42c0 commit 04dc26f
Showing 1 changed file with 58 additions and 44 deletions.
102 changes: 58 additions & 44 deletions truewiki/metadata.py
@@ -5,10 +5,10 @@
import json
import logging
import os
import sys
import time

from collections import defaultdict
from concurrent import futures
from openttd_helpers import click_helper

from . import singleton
@@ -43,13 +43,12 @@ def page():
RELOAD_BUSY = asyncio.Event()
RELOAD_BUSY.set()

METADATA_READY = asyncio.Event()


def translation_callback(wtp, wiki_page, page):
for wikilink in wtp.wikilinks:
if wikilink.target.startswith("Translation:"):
target = wikilink.target[len("Translation:") :].strip()
target = sys.intern(target)
PAGES[page]["translations"].append(target)
TRANSLATIONS[target].append(page)

@@ -58,6 +57,7 @@ def category_callback(wtp, wiki_page, page):
for wikilink in wtp.wikilinks:
if wikilink.target.startswith("Category:"):
target = wikilink.target[len("Category:") :].strip()
target = sys.intern(target)
PAGES[page]["categories"].append(target)
CATEGORIES[target].append(page)

@@ -66,6 +66,7 @@ def file_callback(wtp, wiki_page, page):
for wikilink in wtp.wikilinks:
if wikilink.target.startswith("File:"):
target = wikilink.target[len("File:") :].strip()
target = sys.intern(target)
PAGES[page]["files"].append(target)
FILES[target].append(page)

@@ -78,6 +79,7 @@ def links_callback(wtp, wiki_page, page):
else:
target = wikilink.target
target = target.strip()
target = sys.intern(target)

PAGES[page]["links"].append(target)
LINKS[target].append(page)
@@ -91,6 +93,7 @@ def template_callback(wtp, wiki_page, page):
namespace = "Template"

target = f"{namespace}/{template}".strip()
target = sys.intern(target)
PAGES[page]["templates"].append(target)
TEMPLATES[target].append(page)

@@ -151,7 +154,14 @@ def _analyze_page(page):
callback(wtp, wiki_page, page)


def _page_changed(page, notified=None):
async def _page_changed(page, notified=None):
page = sys.intern(page)

# Allow other tasks to do something now. This fraction is sufficient
# to still return pages (while indexing), although with an increased
# latency.
await asyncio.sleep(0)

if notified is None:
notified = set()

@@ -167,10 +177,10 @@ def _page_changed(page, notified=None):

# Notify all dependencies of a page change.
for dependency in TEMPLATES[page] + dependencies:
_page_changed(dependency, notified)
await _page_changed(dependency, notified)


def _scan_folder(folder, notified=None):
async def _scan_folder(folder, notified=None):
pages_seen = set()

if notified is None:
@@ -179,7 +189,7 @@ def _scan_folder(folder, notified=None):
for node in glob.glob(f"{singleton.STORAGE.folder}/{folder}/*"):
if os.path.isdir(node):
folder = node[len(singleton.STORAGE.folder) + 1 :]
pages_seen.update(_scan_folder(folder, notified))
pages_seen.update(await _scan_folder(folder, notified))
continue

if node.endswith(".mediawiki"):
@@ -199,55 +209,62 @@ def _scan_folder(folder, notified=None):
continue
PAGES[page]["digest"] = digest

_page_changed(page, notified)
await _page_changed(page, notified)

return pages_seen


def load_metadata():
METADATA_READY.clear()

loop = asyncio.get_event_loop()
loop.create_task(out_of_process("load_metadata", None))
loop.create_task(metadata_queue("load_metadata", None))


def page_changed(pages):
loop = asyncio.get_event_loop()
loop.create_task(out_of_process("page_changed", pages))

loop.create_task(metadata_queue("page_changed", pages))

async def out_of_process(func, pages):
global CATEGORIES, FILES, LANGUAGES, LINKS, PAGES, PAGES_LC, TEMPLATES, TRANSLATIONS

async def metadata_queue(func, pages):
await RELOAD_BUSY.wait()
RELOAD_BUSY.clear()

try:
reload_helper = ReloadHelper(pages)

# Run the reload in a new process, so we don't block the rest of the
# server while doing this job.
loop = asyncio.get_event_loop()
with futures.ProcessPoolExecutor(max_workers=1) as executor:
task = loop.run_in_executor(executor, getattr(reload_helper, func))
(
CATEGORIES,
FILES,
LANGUAGES,
LINKS,
PAGES,
PAGES_LC,
TEMPLATES,
TRANSLATIONS,
) = await task
instance = MetadataQueue(pages)
instance_func = getattr(instance, func)
await instance_func()
finally:
RELOAD_BUSY.set()

if func == "load_metadata":
METADATA_READY.set()

def object_pairs_hook(items):
"""
Use sys.intern() over every possible string we can sniff out.
We do this, as we reuse the same string a lot, and this heavily reduces
memory usage.
"""
result = {}
for key, value in items:
key = sys.intern(key)

if isinstance(value, dict):
result[key] = object_pairs_hook(value.items())
elif isinstance(value, list):
result[key] = [sys.intern(v) for v in value]
elif isinstance(value, str):
result[key] = sys.intern(value)
elif isinstance(value, int):
result[key] = value
else:
raise NotImplementedError(f"Unknown type in json.loads() for {value}")

# Validate we didn't change something
assert result[key] == value

return result

class ReloadHelper:

class MetadataQueue:
def __init__(self, pages):
self.pages = pages

Expand All @@ -266,7 +283,7 @@ def _post(self):
set(TRANSLATIONS[translation]), key=lambda name: (name.find("en/") < 0, name)
)

# Ensure thare are no duplicated in PAGES too, and fill the PAGES_LC
# Ensure there are no duplicates in PAGES too, and fill the PAGES_LC
# with a mapping from lowercase to real page name.
PAGES_LC.clear()
for page, page_data in PAGES.items():
@@ -278,17 +295,15 @@ def _post(self):

PAGES_LC[page.lower()] = page

def page_changed(self):
async def page_changed(self):
for page in self.pages:
_page_changed(page)
await _page_changed(page)
self._post()

return CATEGORIES, FILES, LANGUAGES, LINKS, PAGES, PAGES_LC, TEMPLATES, TRANSLATIONS

def _load_metadata_from_cache(self):
with open(CACHE_FILENAME, "r") as fp:
try:
payload = json.loads(fp.read())
payload = json.loads(fp.read(), object_pairs_hook=object_pairs_hook)
except json.JSONDecodeError:
log.info("Cache was corrupted; reloading metadata ...")
return
@@ -303,7 +318,7 @@ def _load_metadata_from_cache(self):
TEMPLATES.update(payload["templates"])
TRANSLATIONS.update(payload["translations"])

def load_metadata(self):
async def load_metadata(self):
start = time.time()
log.info("Loading metadata (this can take a while the first run) ...")

@@ -329,7 +344,7 @@ def load_metadata(self):
if os.path.isdir(node):
language = node.split("/")[-1]
LANGUAGES.add(language)
pages_seen.update(_scan_folder(f"{subfolder}", notified))
pages_seen.update(await _scan_folder(f"{subfolder}", notified))

# If we come from cache, validate that no file got removed; we should
# forget about those.
@@ -359,7 +374,6 @@
)

log.info(f"Loading metadata done; took {time.time() - start:.2f} seconds")
return CATEGORIES, FILES, LANGUAGES, LINKS, PAGES, PAGES_LC, TEMPLATES, TRANSLATIONS


@click_helper.extend
