Skip to content

Commit

Permalink
WIP: Investigate balancing threading with fd exhaustion
Browse files Browse the repository at this point in the history
  • Loading branch information
reece committed Sep 13, 2023
1 parent 87e8b8e commit 8e94914
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 1 deletion.
84 changes: 84 additions & 0 deletions misc/threading-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""test seqrepo for fd-exhaustion in threading context
https://github.com/biocommons/biocommons.seqrepo/issues/112
The idea: read a bunch of NMs on stdin. Fetch the sequence for each in a threading context.
"""

import argparse
import logging
import queue
import pathlib
import threading
import time

from biocommons.seqrepo import SeqRepo

_logger = logging.getLogger()


class Worker(threading.Thread):
    """Thread that drains accessions from *q*, fetching each via *sr*.

    The worker exits when the queue is empty and records the number of
    accessions it processed in ``self.n``.
    """

    def __init__(self, q: queue.Queue, sr: "SeqRepo"):
        self.q = q
        self.sr = sr
        self.n = 0  # accessions processed by this worker
        super().__init__()

    def run(self):
        try:
            while True:
                # Non-blocking get: queue.Empty is the termination signal
                # once all accessions have been claimed.
                ac = self.q.get(False)
                # BUG FIX: was `sr.fetch(...)`, which silently relied on the
                # module-level `sr` global instead of the SeqRepo instance
                # passed to this worker.
                self.sr.fetch(ac, 0, 5)
                self.q.task_done()
                self.n += 1
        except queue.Empty:
            _logger.info(f"{self}: Done; processed {self.n} accessions")
            return


def parse_args(argv=None):
    """Parse command-line options.

    :param argv: optional argument list to parse; defaults to ``sys.argv[1:]``
        (passing an explicit list makes this function unit-testable).
    :return: the parsed ``argparse.Namespace``
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("-n", "--n-threads", type=int, default=1)
    ap.add_argument("-s", "--seqrepo-path", type=pathlib.Path, required=True)
    ap.add_argument("-a", "--accessions-path", type=pathlib.Path, required=True)
    ap.add_argument("-m", "--max-accessions", type=int)
    ap.add_argument("-l", "--lru-cache-size", type=int, default=0)
    opts = ap.parse_args(argv)
    return opts

if __name__ == "__main__":
    import coloredlogs
    import sys

    coloredlogs.install(level="INFO")

    opts = parse_args()

    sr = SeqRepo(opts.seqrepo_path)
    # Reach into the FastaDir internals to configure the run-time LRU cache
    # under test (see biocommons.seqrepo issue #112).
    sr.sequences._lru_cache_size = opts.lru_cache_size
    sr.sequences._define_ofrc(opts.lru_cache_size)
    _logger.info(f"{sr.sequences._lru_cache_size=}")

    acs = [ac.strip() for ac in opts.accessions_path.open()]
    if opts.max_accessions is not None:
        acs = acs[:opts.max_accessions]
    q = queue.Queue()
    for ac in acs:
        q.put(ac)
    qs = q.qsize()

    _logger.info(f"Queued {qs} accessions from {opts.accessions_path}")

    _logger.info(f"Starting run with {opts.n_threads} threads")
    # BUG FIX: time.process_time() sums CPU time across all threads and
    # excludes time spent blocked on I/O, so it is the wrong clock for a
    # wall-clock throughput benchmark; perf_counter() measures elapsed time.
    t0 = time.perf_counter()
    for _ in range(opts.n_threads):
        Worker(q=q, sr=sr).start()
    q.join()
    t1 = time.perf_counter()
    td = t1 - t0
    # Guard against a zero-duration measurement on tiny inputs.
    rate = float(qs) / td if td else float("inf")
    _logger.info(f"Fetched {qs} sequences in {td} s with {opts.n_threads} threads; {rate:.0f} seq/sec")

    print(sr.sequences._open_for_reading_cached.cache_info())
24 changes: 23 additions & 1 deletion src/biocommons/seqrepo/fastadir/fastadir.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, root_dir, writeable=False, check_same_thread=True):
self._writing = None
self._db = None
self._writeable = writeable
self._lru_cache_size = 0

if self._writeable:
os.makedirs(self._root_dir, exist_ok=True)
Expand Down Expand Up @@ -209,10 +210,31 @@ def _upgrade_db(self):
migrations_to_apply = backend.to_apply(migrations)
backend.apply_migrations(migrations_to_apply)


## =========
## ↓↓ UNDER CONSTRUCTION ↓↓

# BUG FIX: removed the stray `@functools.lru_cache()` that preceded this
# dispatcher (comments between a decorator and its `def` are legal, so it
# applied here).  With it, the dispatcher itself was cached on (self, path):
# the first call pinned whichever branch was taken, so toggling
# `self._lru_cache_size` at run time could never switch between cached and
# uncached reads.  It also kept `self` alive in the cache (bugbear B019).
def _open_for_reading(self, path):
    """Open *path* for reading, using the run-time-sized LRU cache installed
    by `_define_ofrc` when `self._lru_cache_size` is non-zero, else a fresh
    uncached open."""
    if self._lru_cache_size:
        return self._open_for_reading_cached(path)
    return self._open_for_reading_uncached(path)

def _define_ofrc(self, cache_size):
    """Install `self._open_for_reading_cached`: an LRU-cached wrapper around
    `_open_for_reading_uncached` whose size is chosen at run time (a plain
    `@functools.lru_cache` decorator would fix the size at import time)."""
    def _cached_open(path):
        # Late-bound lookup of the uncached opener on self.
        return self._open_for_reading_uncached(path)

    self._open_for_reading_cached = functools.lru_cache(maxsize=cache_size)(_cached_open)

# NOTE(review): legacy cached reader, apparently kept for comparison with the
# run-time-sized variant installed by `_define_ofrc`.  `lru_cache` on an
# instance method caches on (self, path) with unbounded maxsize, keeping every
# entry -- and `self` -- alive for the life of the process (flake8-bugbear
# B019); this is the fd-exhaustion hazard under investigation.
@functools.lru_cache()
def _open_for_reading_cached_orig(self, path):
    return self._open_for_reading_uncached(path)

def _open_for_reading_uncached(self, path):
    """Open the fabgz file at *path*, returning a new FabgzReader (no caching)."""
    # BUG FIX: `"..." + path` raises TypeError when `path` is a pathlib.Path;
    # lazy %-style logging args handle any type and defer formatting until
    # DEBUG is actually enabled.
    _logger.debug("Opening for reading: %s", path)
    return FabgzReader(path)

## ==== END

def _dump_aliases(self):
import prettytable
Expand Down

0 comments on commit 8e94914

Please sign in to comment.