Skip to content

Commit

Permalink
Merge pull request #118 from biocommons/112-make-fastadir-thread-safe
Browse files Browse the repository at this point in the history
112 make fastadir thread safe
  • Loading branch information
theferrit32 authored Nov 13, 2023
2 parents 48958d7 + 7ef1a79 commit fc76b79
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 32 deletions.
91 changes: 91 additions & 0 deletions misc/threading-tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Threading Tests

This directory contains seqrepo tests for file descriptor exhaustion, especially in threading context
The idea: make it easy to test threading and cache size combinations.


See https://github.com/biocommons/biocommons.seqrepo/issues/112



## Examples

### single thread, without fd caching

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1
2023-09-13 15:25:56 snafu biocommons.seqrepo.fastadir.fastadir[2274974] INFO File descriptor caching disabled
2023-09-13 15:25:57 snafu root[2274974] INFO Queued 1000 accessions
2023-09-13 15:25:57 snafu root[2274974] INFO Starting run with 1 threads
2023-09-13 15:26:01 snafu root[2274974] INFO <Worker(Thread-1, started 139822207334080)>: Done; processed 1000 accessions
2023-09-13 15:26:01 snafu root[2274974] INFO Fetched 1000 sequences in 4.281685499 s with 1 threads; 234 seq/sec
```

### single thread, with fd caching

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1 -f 100
2023-09-13 15:26:07 snafu biocommons.seqrepo.fastadir.fastadir[2275006] WARNING File descriptor caching enabled (size=100)
2023-09-13 15:26:08 snafu root[2275006] INFO Queued 1000 accessions
2023-09-13 15:26:08 snafu root[2275006] INFO Starting run with 1 threads
2023-09-13 15:26:08 snafu root[2275006] INFO <Worker(Thread-1, started 140250961671872)>: Done; processed 1000 accessions
2023-09-13 15:26:08 snafu root[2275006] INFO Fetched 1000 sequences in 0.41264548700000003 s with 1 threads; 2423 seq/sec
CacheInfo(hits=934, misses=66, maxsize=100, currsize=66)
```

### five threads, without caching

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 5
2023-09-13 15:26:16 snafu biocommons.seqrepo.fastadir.fastadir[2275039] INFO File descriptor caching disabled
2023-09-13 15:26:17 snafu root[2275039] INFO Queued 1000 accessions
2023-09-13 15:26:17 snafu root[2275039] INFO Starting run with 5 threads
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-5, started 139965979674304)>: Done; processed 197 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-3, started 139965996459712)>: Done; processed 200 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-4, started 139965988067008)>: Done; processed 210 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-2, started 139966004852416)>: Done; processed 198 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-1, started 139966088738496)>: Done; processed 195 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO Fetched 1000 sequences in 5.946146807 s with 5 threads; 168 seq/sec
```

### five threads, with caching :-(

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 5 -f 10
2023-09-13 15:26:32 snafu biocommons.seqrepo.fastadir.fastadir[2275104] WARNING File descriptor caching enabled (size=10)
2023-09-13 15:26:33 snafu root[2275104] INFO Queued 1000 accessions
2023-09-13 15:26:33 snafu root[2275104] INFO Starting run with 5 threads
[E::bgzf_uncompress] Inflate operation failed: invalid distance too far back
[E::fai_retrieve] Failed to retrieve block. (Seeking in a compressed, .gzi unindexed, file?)
Exception in thread Thread-5:
Traceback (most recent call last):
File "/usr/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
self.run()
```


### 1 thread, cache_size < # available fds

Same as above successful run, but Limit the process to 50 open file descriptors causes failure

```
snafu$ (ulimit -n 50; ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1 -f 100)
2023-09-13 15:31:21 snafu biocommons.seqrepo.fastadir.fastadir[2275776] WARNING File descriptor caching enabled (size=100)
2023-09-13 15:31:21 snafu root[2275776] INFO Queued 1000 accessions
2023-09-13 15:31:21 snafu root[2275776] INFO Starting run with 1 threads
[E::fai_load3_core] Failed to open FASTA index /usr/local/share/seqrepo/2021-01-29/sequences/2020/0412/1420/1586701238.5306098.fa.bgz.gzi: Too many open files
Exception in thread Thread-1:
Traceback (most recent call last):
```


## Other useful commands

```
# dynamic (/2s) list of open files in seqrepo instance directory
watch lsof +D '/usr/local/share/seqrepo/'
# arbitrarily
(ulimit -n 200; ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -a archive/accessions.gz -f 128)
```
83 changes: 83 additions & 0 deletions misc/threading-tests/threading-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python3
"""test seqrepo for file descriptor exhaustion, especially in threading context
https://github.com/biocommons/biocommons.seqrepo/issues/112
The idea: read a bunch of NMs on stdin. Fetch the sequence for each in a threading context.
"""

import argparse
import logging
import queue
import pathlib
import random
import threading
import time

from smart_open import open

from biocommons.seqrepo import SeqRepo

_logger = logging.getLogger()


class Worker(threading.Thread):
def __init__(self, q: queue.Queue, sr: SeqRepo):
self.q = q
self.sr = sr
self.n = 0
super().__init__()

def run(self):
try:
while True:
ac = self.q.get(False)
sr.fetch(ac, 0, 5)
self.q.task_done()
self.n += 1
except queue.Empty:
_logger.info(f"{self}: Done; processed {self.n} accessions")
return


def parse_args():
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("-n", "--n-threads", type=int, default=1)
ap.add_argument("-s", "--seqrepo-path", type=pathlib.Path, required=True)
ap.add_argument("-m", "--max-accessions", type=int)
ap.add_argument("-f", "--fd-cache-size", type=int, default=0)
opts = ap.parse_args()
return opts

if __name__ == "__main__":
import coloredlogs
import sys

coloredlogs.install(level="INFO")

opts = parse_args()

sr = SeqRepo(opts.seqrepo_path, fd_cache_size=opts.fd_cache_size)

acs = set(a["alias"] for a in sr.aliases.find_aliases(namespace="RefSeq", alias="NM_%"))
acs = random.sample(sorted(acs), opts.max_accessions or len(acs))
q = queue.Queue()
for ac in acs:
q.put(ac)
qs = q.qsize()
_logger.info(f"Queued {qs} accessions")

_logger.info(f"Starting run with {opts.n_threads} threads")
t0 = time.process_time()
for _ in range(opts.n_threads):
Worker(q=q, sr=sr).start()
q.join()
t1 = time.process_time()
td = t1 - t0
rate = float(qs) / td
_logger.info(f"Fetched {qs} sequences in {td} s with {opts.n_threads} threads; {rate:.0f} seq/sec")

if hasattr(sr.sequences._open_for_reading, "cache_info"):
print(sr.sequences._open_for_reading.cache_info())

File renamed without changes.
13 changes: 13 additions & 0 deletions src/biocommons/seqrepo/fastadir/fabgz.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import shutil
import stat
import subprocess
import threading

import six
from pysam import FastaFile
Expand Down Expand Up @@ -70,12 +71,24 @@ def _find_bgzip():


class FabgzReader(object):
"""
Class that implements ContextManager and wraps a FabgzReader.
The FabgzReader is returned when acquired in a contextmanager with statement.
"""
def __init__(self, filename):
self.lock = threading.Lock()
self._fh = FastaFile(filename)

def __del__(self):
self._fh.close()

def __enter__(self):
self.lock.acquire()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.lock.release()

def fetch(self, seq_id, start=None, end=None):
return self._fh.fetch(seq_id.encode("ascii"), start, end)

Expand Down
45 changes: 13 additions & 32 deletions src/biocommons/seqrepo/fastadir/fastadir.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,6 @@
expected_schema_version = 1


class LockableFabgzReader(contextlib.AbstractContextManager):
"""
Class that implements ContextManager and wraps a FabgzReader.
The FabgzReader is returned when acquired in a contextmanager with statement.
"""

def __init__(self, path):
self.lock = threading.Lock()
self.fabgz_reader = FabgzReader(path)

def __enter__(self):
self.lock.acquire()
return self.fabgz_reader

def __exit__(self, exc_type, exc_value, traceback):
self.lock.release()


class FastaDir(BaseReader, BaseWriter):
"""This class provides simple a simple key-value interface to a
directory of compressed fasta files.
Expand All @@ -70,7 +52,7 @@ class FastaDir(BaseReader, BaseWriter):
"""

def __init__(self, root_dir, writeable=False, check_same_thread=True):
def __init__(self, root_dir, writeable=False, check_same_thread=True, fd_cache_size=0):
"""Creates a new sequence repository if necessary, and then opens it"""

self._root_dir = root_dir
Expand Down Expand Up @@ -99,6 +81,18 @@ def __init__(self, root_dir, writeable=False, check_same_thread=True):
schema_version, expected_schema_version
)
)

if fd_cache_size == 0:
_logger.info(f"File descriptor caching disabled")
def _open_for_reading(path):
_logger.debug("Opening for reading uncached: " + path)
return FabgzReader(path)
else:
_logger.warning(f"File descriptor caching enabled (size={fd_cache_size})")
@functools.lru_cache(maxsize=fd_cache_size)
def _open_for_reading(path):
return FabgzReader(path)
self._open_for_reading = _open_for_reading

def __del__(self):
self._db.close()
Expand Down Expand Up @@ -238,19 +232,6 @@ def _upgrade_db(self):
migrations_to_apply = backend.to_apply(migrations)
backend.apply_migrations(migrations_to_apply)

@functools.lru_cache()
def _open_for_reading(self, path):
"""
Opens a FabgzReader to path, wraps in a LockableFabgzReader for use in context managers.
Places it in an LRU cache so file is only opened once per FastaDir object. Caller must
lock the LockableFabgzReader or otherwise handle concurrent access if sharing between
in-process concurrent execution threads, such as asyncio (e.g. WSGI/ASGI web servers)
"""
_logger.debug("Opening for reading: %s", path)
if not os.path.exists(path):
_logger.error("_open_for_reading path does not exist: %s", path)
return LockableFabgzReader(path)

def _dump_aliases(self):
import prettytable

Expand Down
2 changes: 2 additions & 0 deletions src/biocommons/seqrepo/seqrepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def __init__(
translate_ncbi_namespace=None,
check_same_thread=False,
use_sequenceproxy=True,
fd_cache_size=0
):
self._root_dir = root_dir
self._upcase = upcase
Expand All @@ -122,6 +123,7 @@ def __init__(
self._seq_path,
writeable=self._writeable,
check_same_thread=self._check_same_thread,
fd_cache_size=fd_cache_size
)
self.aliases = SeqAliasDB(
self._db_path,
Expand Down

0 comments on commit fc76b79

Please sign in to comment.