Skip to content

Commit

Permalink
updated threading test code and added README
Browse files Browse the repository at this point in the history
  • Loading branch information
reece committed Sep 13, 2023
1 parent 8e94914 commit 00d8cda
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 37 deletions.
91 changes: 91 additions & 0 deletions misc/threading-tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Threading Tests

This directory contains seqrepo tests for file descriptor exhaustion, especially in threading context
The idea: make it easy to test threading and cache size combinations.


See https://github.com/biocommons/biocommons.seqrepo/issues/112



## Examples

### single thread, without fd caching

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1
2023-09-13 15:25:56 snafu biocommons.seqrepo.fastadir.fastadir[2274974] INFO File descriptor caching disabled
2023-09-13 15:25:57 snafu root[2274974] INFO Queued 1000 accessions
2023-09-13 15:25:57 snafu root[2274974] INFO Starting run with 1 threads
2023-09-13 15:26:01 snafu root[2274974] INFO <Worker(Thread-1, started 139822207334080)>: Done; processed 1000 accessions
2023-09-13 15:26:01 snafu root[2274974] INFO Fetched 1000 sequences in 4.281685499 s with 1 threads; 234 seq/sec
```

### single thread, with fd caching

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1 -f 100
2023-09-13 15:26:07 snafu biocommons.seqrepo.fastadir.fastadir[2275006] WARNING File descriptor caching enabled (size=100)
2023-09-13 15:26:08 snafu root[2275006] INFO Queued 1000 accessions
2023-09-13 15:26:08 snafu root[2275006] INFO Starting run with 1 threads
2023-09-13 15:26:08 snafu root[2275006] INFO <Worker(Thread-1, started 140250961671872)>: Done; processed 1000 accessions
2023-09-13 15:26:08 snafu root[2275006] INFO Fetched 1000 sequences in 0.41264548700000003 s with 1 threads; 2423 seq/sec
CacheInfo(hits=934, misses=66, maxsize=100, currsize=66)
```

### five threads, without caching

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 5
2023-09-13 15:26:16 snafu biocommons.seqrepo.fastadir.fastadir[2275039] INFO File descriptor caching disabled
2023-09-13 15:26:17 snafu root[2275039] INFO Queued 1000 accessions
2023-09-13 15:26:17 snafu root[2275039] INFO Starting run with 5 threads
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-5, started 139965979674304)>: Done; processed 197 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-3, started 139965996459712)>: Done; processed 200 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-4, started 139965988067008)>: Done; processed 210 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-2, started 139966004852416)>: Done; processed 198 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-1, started 139966088738496)>: Done; processed 195 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO Fetched 1000 sequences in 5.946146807 s with 5 threads; 168 seq/sec
```

### five threads, with caching :-(

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 5 -f 10
2023-09-13 15:26:32 snafu biocommons.seqrepo.fastadir.fastadir[2275104] WARNING File descriptor caching enabled (size=10)
2023-09-13 15:26:33 snafu root[2275104] INFO Queued 1000 accessions
2023-09-13 15:26:33 snafu root[2275104] INFO Starting run with 5 threads
[E::bgzf_uncompress] Inflate operation failed: invalid distance too far back
[E::fai_retrieve] Failed to retrieve block. (Seeking in a compressed, .gzi unindexed, file?)
Exception in thread Thread-5:
Traceback (most recent call last):
File "/usr/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
self.run()
```


### 1 thread, cache_size < # available fds

Same as above successful run, but Limit the process to 50 open file descriptors causes failure

```
snafu$ (ulimit -n 50; ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1 -f 100)
2023-09-13 15:31:21 snafu biocommons.seqrepo.fastadir.fastadir[2275776] WARNING File descriptor caching enabled (size=100)
2023-09-13 15:31:21 snafu root[2275776] INFO Queued 1000 accessions
2023-09-13 15:31:21 snafu root[2275776] INFO Starting run with 1 threads
[E::fai_load3_core] Failed to open FASTA index /usr/local/share/seqrepo/2021-01-29/sequences/2020/0412/1420/1586701238.5306098.fa.bgz.gzi: Too many open files
Exception in thread Thread-1:
Traceback (most recent call last):
```


## Other useful commands

```
# dynamic (/2s) list of open files in seqrepo instance directory
watch lsof +D '/usr/local/share/seqrepo/'
# arbitrarily
(ulimit -n 200; ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -a archive/accessions.gz -f 128)
```
25 changes: 12 additions & 13 deletions misc/threading-test → misc/threading-tests/threading-test
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
"""test seqrepo for fd-exhaustion in threading context
"""test seqrepo for file descriptor exhaustion, especially in threading context
https://github.com/biocommons/biocommons.seqrepo/issues/112
Expand All @@ -11,9 +11,12 @@ import argparse
import logging
import queue
import pathlib
import random
import threading
import time

from smart_open import open

from biocommons.seqrepo import SeqRepo

_logger = logging.getLogger()
Expand Down Expand Up @@ -42,9 +45,8 @@ def parse_args():
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("-n", "--n-threads", type=int, default=1)
ap.add_argument("-s", "--seqrepo-path", type=pathlib.Path, required=True)
ap.add_argument("-a", "--accessions-path", type=pathlib.Path, required=True)
ap.add_argument("-m", "--max-accessions", type=int)
ap.add_argument("-l", "--lru-cache-size", type=int, default=0)
ap.add_argument("-f", "--fd-cache-size", type=int, default=0)
opts = ap.parse_args()
return opts

Expand All @@ -56,20 +58,15 @@ if __name__ == "__main__":

opts = parse_args()

sr = SeqRepo(opts.seqrepo_path)
sr.sequences._lru_cache_size = opts.lru_cache_size
sr.sequences._define_ofrc(opts.lru_cache_size)
_logger.info(f"{sr.sequences._lru_cache_size=}")
sr = SeqRepo(opts.seqrepo_path, fd_cache_size=opts.fd_cache_size)

acs = [ac.strip() for ac in opts.accessions_path.open()]
if opts.max_accessions is not None:
acs = acs[:opts.max_accessions]
acs = set(a["alias"] for a in sr.aliases.find_aliases(namespace="RefSeq", alias="NM_%"))
acs = random.sample(sorted(acs), opts.max_accessions or len(acs))
q = queue.Queue()
for ac in acs:
q.put(ac)
qs = q.qsize()

_logger.info(f"Queued {qs} accessions from {opts.accessions_path}")
_logger.info(f"Queued {qs} accessions")

_logger.info(f"Starting run with {opts.n_threads} threads")
t0 = time.process_time()
Expand All @@ -81,4 +78,6 @@ if __name__ == "__main__":
rate = float(qs) / td
_logger.info(f"Fetched {qs} sequences in {td} s with {opts.n_threads} threads; {rate:.0f} seq/sec")

print(sr.sequences._open_for_reading_cached.cache_info())
if hasattr(sr.sequences._open_for_reading, "cache_info"):
print(sr.sequences._open_for_reading.cache_info())

File renamed without changes.
36 changes: 12 additions & 24 deletions src/biocommons/seqrepo/fastadir/fastadir.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,14 @@ class FastaDir(BaseReader, BaseWriter):
"""

def __init__(self, root_dir, writeable=False, check_same_thread=True):
def __init__(self, root_dir, writeable=False, check_same_thread=True, fd_cache_size=0):
"""Creates a new sequence repository if necessary, and then opens it"""

self._root_dir = root_dir
self._db_path = os.path.join(self._root_dir, "db.sqlite3")
self._writing = None
self._db = None
self._writeable = writeable
self._lru_cache_size = 0

if self._writeable:
os.makedirs(self._root_dir, exist_ok=True)
Expand All @@ -80,6 +79,17 @@ def __init__(self, root_dir, writeable=False, check_same_thread=True):
schema_version, expected_schema_version
)
)

if fd_cache_size == 0:
_logger.info(f"File descriptor caching disabled")
self._open_for_reading = self._open_for_reading_uncached
else:
_logger.warning(f"File descriptor caching enabled (size={fd_cache_size})")
@functools.lru_cache(maxsize=fd_cache_size)
def _open_for_reading_cached(path):
return self._open_for_reading_uncached(path)
self._open_for_reading = _open_for_reading_cached


# ############################################################################
# Special methods
Expand Down Expand Up @@ -210,31 +220,9 @@ def _upgrade_db(self):
migrations_to_apply = backend.to_apply(migrations)
backend.apply_migrations(migrations_to_apply)


## =========
## ↓↓ UNDER CONSTRUCTION ↓↓

def _open_for_reading(self, path):
if self._lru_cache_size:
return self._open_for_reading_cached(path)
return self._open_for_reading_uncached(path)

# enable run-time setting of cache size
def _define_ofrc(self, cache_size):
@functools.lru_cache(maxsize=cache_size)
def ofrc(path):
return self._open_for_reading_uncached(path)
self._open_for_reading_cached = ofrc

@functools.lru_cache()
def _open_for_reading_cached_orig(self, path):
return self._open_for_reading_uncached(path)

def _open_for_reading_uncached(self, path):
_logger.debug("Opening for reading: " + path)
return FabgzReader(path)

## ==== END

def _dump_aliases(self):
import prettytable
Expand Down
2 changes: 2 additions & 0 deletions src/biocommons/seqrepo/seqrepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def __init__(
translate_ncbi_namespace=None,
check_same_thread=False,
use_sequenceproxy=True,
fd_cache_size=0
):
self._root_dir = root_dir
self._upcase = upcase
Expand All @@ -122,6 +123,7 @@ def __init__(
self._seq_path,
writeable=self._writeable,
check_same_thread=self._check_same_thread,
fd_cache_size=fd_cache_size
)
self.aliases = SeqAliasDB(
self._db_path,
Expand Down

0 comments on commit 00d8cda

Please sign in to comment.