
Feature request: bulk distributed access #30

Open
tomwhite opened this issue Jul 23, 2020 · 35 comments

Comments

@tomwhite

Thank you for all your work on this library! We are using bgen-reader-py to implement a Dask-based BGEN reader in sgkit-dev/sgkit-bgen#1. There is already some support for Dask, but (as far as I can tell) the delayed object that is created is for a single row (variant), rather than a chunk of rows. (See https://github.com/limix/bgen-reader-py/blob/master/bgen_reader/_genotype.py#L46.) This is a good choice for random access, but may be prohibitive for bulk access.

Would it be possible to support bulk access with an API that returns the genotype probability array as a Dask array?
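
For concreteness, here is a rough sketch of the kind of thing I mean, built on the per-variant delayed objects above (so not efficient, just illustrating the desired shape of the result); it assumes every variant has the same number of samples and probability columns, which won't hold when ploidy varies:

import dask
import dask.array as da
import numpy as np
from bgen_reader import read_bgen


def read_probs_chunk(path, start, stop):
    # Naive chunk reader built on the existing per-variant delayed objects:
    # compute each variant in the range and stack its probabilities.
    bgen = read_bgen(path)
    genotype = bgen["genotype"]
    return np.stack([genotype[i].compute()["probs"] for i in range(start, stop)])


def probs_as_dask_array(path, n_variants, n_samples, n_probs, chunk_size=1000):
    # One delayed task per chunk of variants rather than one per variant.
    blocks = []
    for start in range(0, n_variants, chunk_size):
        stop = min(start + chunk_size, n_variants)
        chunk = dask.delayed(read_probs_chunk)(path, start, stop)
        blocks.append(
            da.from_delayed(chunk, shape=(stop - start, n_samples, n_probs), dtype="float64")
        )
    return da.concatenate(blocks, axis=0)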

@CarlKCarlK
Collaborator

CarlKCarlK commented Jul 23, 2020 via email

@tomwhite
Author

@CarlKCarlK that's great to know! That looks very useful. I will dig in and try it out soon.

@horta
Collaborator

horta commented Aug 1, 2020

That should be done by the end of this month (August).

@horta
Collaborator

horta commented Aug 10, 2020

@tomwhite I'm trying to figure out a nice way to provide bulk access.

As you probably know, the probabilities matrix of genotype 0 may have a different number of columns than that of genotype 1, since the ploidy is allowed to vary. Not only that, but a genotype is (in bgen terms) defined by its probabilities as well as by phasedness, ploidy, and missingness.

In [6]: genotype[0].compute()
Out[6]:
{'probs': array([[1., 0., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        ...,
        [1., 0., 0.],
        [1., 0., 0.],
        [1., 0., 0.]]),
 'phased': False,
 'ploidy': array([2, 2, 2, ..., 2, 2, 2]),
 'missing': array([False, False, False, ..., False, False, False])}

In [7]: genotype[1].compute()
Out[7]:
{'probs': array([[nan, nan, nan],
        [nan, nan, nan],
        [nan, nan, nan],
        ...,
        [ 1.,  0.,  0.],
        [ 1.,  0.,  0.],
        [nan, nan, nan]]),
 'phased': False,
 'ploidy': array([2, 2, 2, ..., 2, 2, 2]),
 'missing': array([ True,  True,  True, ..., False, False,  True])}

Without trying to be fancy, a quick way to implement bulk access is by indexing genotype by its "bulk index" instead of its genotype index. Something like this:

In [6]: bulk[0].compute().genotype[0]
Out[6]:
{'probs': array([[1., 0., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        ...,
        [1., 0., 0.],
        [1., 0., 0.],
        [1., 0., 0.]]),
 'phased': False,
 'ploidy': array([2, 2, 2, ..., 2, 2, 2]),
 'missing': array([False, False, False, ..., False, False, False])}

In [7]: bulk[0].compute().genotype[1]
Out[7]:
{'probs': array([[nan, nan, nan],
        [nan, nan, nan],
        [nan, nan, nan],
        ...,
        [ 1.,  0.,  0.],
        [ 1.,  0.,  0.],
        [nan, nan, nan]]),
 'phased': False,
 'ploidy': array([2, 2, 2, ..., 2, 2, 2]),
 'missing': array([ True,  True,  True, ..., False, False,  True])}

Does that appeal to you?

@horta
Collaborator

horta commented Aug 10, 2020

@CarlKCarlK pointed out that he already has bulk access implemented (in the numpy-like interface, accessed via open_bgen). I'm also interested in having that in the Dask interface, via read_bgen.

@hammer

hammer commented Aug 10, 2020

Just want to ensure @eric-czech is in this conversation too as he and Carl have been in discussion separately.

@eric-czech

eric-czech commented Aug 10, 2020

Hey @CarlKCarlK, I tested this on a single UKB file and had some results to share.

I ran a script for pybgen, bgen_reader, and the new beta to compare times / resource usage on a single XY PAR file scan. The code and the times I recorded are at https://github.com/eric-czech/sgkit-dev/tree/master/io/bgen/ukb-test.

The file is only 4.6G and here are the times I got on a single core:

  • PyBGEN: ~28 minutes (the .bgi file already exists)
  • bgen_reader: I stopped it after an hour but it was on track to take ~4 hours
  • bgen_reader2: ~34 minutes not including the metadata construction time

I did a little more profiling on the new beta in bgen_reader2_prof.ipynb, though it only loads 10k of the 45,906 variants to speed things up. When reading those 10k variants after deleting the metafile, I see resource utilization like this:

[image: CPU and memory utilization profile for the 10k-variant read]

The metafile apparently takes about 2m 40s to create, which would be approximately 8% of the overall time. It seems to take a fairly marginal amount of time compared to the scan, and the CPU/memory usage profiles are otherwise very stable.

Is this about what you would have expected? Our use case would be to traverse the file once and convert it to another format -- is there any relatively simple way to speed up a scan like that?

A few smaller questions:

  • Have you ever used tqdm? Assuming the custom progress prints are coming from Python and not C++, that could make things easier. I noticed a small bug with each print where the end interval seems to be off, i.e.
metadata -- time=0:02:41.61, step 3: part 45,000 of 45,906
Found 10000 variants
reading -- time=0:00:34.72, part 990 of 1,000 # --> Never makes it to 1,000 
reading -- time=0:00:37.07, part 990 of 1,000
  • I was doing my scan by grabbing row slices in batches of 1000 variants but it looked like the code was actually pulling them in smaller batches of 10. Each "part N" message was incrementing by 10 from 0 up to 1000. Does it work that way regardless of the file size?

@hammer

hammer commented Aug 10, 2020

If the progress prints are coming from C++, https://github.com/p-ranav/indicators is an option.

@CarlKCarlK
Collaborator

CarlKCarlK commented Aug 10, 2020 via email

@CarlKCarlK
Collaborator

CarlKCarlK commented Aug 10, 2020 via email

@horta
Collaborator

horta commented Aug 10, 2020

Thanks for the benchmark, @eric-czech!

I have found some bottlenecks in bgen-reader. Here is a small comparison now:

% python pybgen_exec.py run --path=$HOME/carl/merged_487400x220000.bgen
  0%|          | 778/220000 [00:18<1:28:18, 41.37it/s]
% python bgen_reader_exec.py run --path=$HOME/carl/merged_487400x220000.bgen
  0%|          | 483/220000 [00:10<1:22:19, 44.44it/s]

I stopped each run after roughly 10 seconds, just to get an idea.

I modified your pybgen_exec.py only to use tqdm:

import fire
import tqdm
from pybgen import PyBGEN


def run(path):
    ct = 0
    with PyBGEN(path) as bgen:
        n = bgen.nb_variants
        for info, dosage in tqdm.tqdm(bgen, total=n):
            assert dosage.ndim == 1
            ct += dosage.size
    print(f"Number of entries read: {ct}")


fire.Fire()

I will try to release version 4.0.5 of bgen-reader with that speed-up tonight. It would be great if you could find time to run the benchmarks again on the new version (I will come back here as soon as I release it).

It will come with Carl's API as well, which will still be experimental, but I think that is fine for now.

Note that I have not used bulk reading (nor implemented it yet), although reading the variants in order using bgen_reader.read_bgen is faster than reading them in random order.

@horta
Collaborator

horta commented Aug 10, 2020

@hammer Thanks for the pointer. I didn't know about that library.

I wrote the low-level bgen code in C, and at the time I could not find a nice progress-bar library for plain C, so I wrote my own: https://github.com/horta/almosthere
It is very humble compared to the one you linked: ASCII only.

@CarlKCarlK
Collaborator

CarlKCarlK commented Aug 10, 2020 via email

@horta
Collaborator

horta commented Aug 11, 2020

bgen-reader 4.0.5 has been released: https://pypi.org/project/bgen-reader/

It should be a lot faster now. I will be working on the bulk idea or on Carl's interface.

@eric-czech

The Open, Read, and Close methods are @Danilo Horta's C/C++ code.

Good to know, thanks @CarlKCarlK.

It would be great if you could find time to run the benchmarks again on the new version (I will come back here as soon as I release it).
bgen-reader 4.0.5 has been released: https://pypi.org/project/bgen-reader/

Beautiful, appreciate the quick turnaround @horta! I'll give that a try again soon.

@eric-czech

eric-czech commented Aug 17, 2020

It should be a lot faster now. I will be working on the bulk idea or on Carl's interface.

Definitely! Here are the times I got with the new code in context with the previous times:

  • PyBGEN: ~28 minutes (the .bgi file already exists)
  • bgen_reader 4.0.4: I stopped it after an hour but it was on track to take ~4 hours
  • bgen_reader 4.0.5: ~21 minutes including metafile construction time
  • bgen_reader2 in pysnptools: ~34 minutes not including the metadata construction time

Here is the resource utilization profile I saw for bgen_reader 4.0.5 too:

[screenshot: resource utilization profile for bgen_reader 4.0.5]

The low memory usage there is great! I know that plot seems to imply CPU usage was also at 100%, but it's actually 100% out of 400% (4 cores on the VM), so it seems there wasn't any real benefit from the threading via dask delayed, assuming that's how it still works. I verified this by running a big random matrix multiplication, which produces a similar graph at 400% CPU utilization across the board. All the related code is in bgen_reader_prof.ipynb.

I'm assuming the GIL is the culprit with the default Dask scheduler, though I'm curious, @horta, whether you did anything special to get higher concurrency when you switched to Dask in the past. I could try scheduler='processes', but my experience is that it usually ends up being pretty close to a wash with scheduler='threads' because of all the serialization for inter-process communication. Let me know if you have any advice.

EDIT: I tried it with the multiprocessing scheduler instead and it was very slow (on track for ~11 hrs), but that's not entirely fair because I'm asking it to send all the data back to the driver process rather than computing some reduction over it. It would be great if Dask didn't have this inter-process communication bottleneck -- it keeps coming up!
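
For reference, a minimal, self-contained sketch of the scheduler comparison I mean, using a random Dask array as a stand-in for the genotype probabilities:

import dask.array as da

# Stand-in for the genotype probabilities array discussed above.
probs = da.random.random((1000, 10000, 3), chunks=(100, 10000, 3))

# Default threaded scheduler: limited by the GIL when the per-chunk work is
# mostly pure Python rather than NumPy/C code that releases the GIL.
mean_threads = probs.mean().compute(scheduler="threads")

# Multiprocessing scheduler: sidesteps the GIL but pays to pickle results back
# to the driver process, which is the "wash" described above.
mean_processes = probs.mean().compute(scheduler="processes")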

@eric-czech

Also, out of curiosity @horta, I was trying to find the commits related to your changes that made such a huge improvement in 4.0.5. Is there one you think would be good to look at to highlight the difference? I was just hoping to get a bit more familiar with how it all works internally. Thanks again for whatever you did!

@horta
Collaborator

horta commented Aug 18, 2020

This one: b40fc36
Before, with a dataset having 500k samples, it would read one variant at a time, i.e. pass a single variant offset to the function read_genotype_partition, called from here:

g = read_genotype_partition(bgen_filepath, vaddrs)

Every time the function read_genotype_partition is called, a bgen_file is opened (which opens the bgen file, reads the header, and calls ftell). So minimizing those calls minimizes that overhead.

There are basically only two overheads that, if minimized, would make everything even faster:

  • Opening the metafile bgen_metafile
  • Opening the bgen file bgen_file

If you have a single core to use and want to process the whole bgen file as fast as possible, it would suffice to do something like:

with bgen_metafile(metafile_filepath) as mf:
    with bgen_file(bgen_filepath) as bgen:
        # Loop over variants in sequence

You can do the above very easily by using this package I released last week which will be used by bgen-reader very soon: https://github.com/limix/cbgen-py

I believe Carl's interface tries to work along those lines, but using memmap from numpy.
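
To make that pattern concrete, here is a rough sketch of a single-core sequential scan with cbgen; I'm assuming cbgen's bgen_file exposes a read_genotype(offset) method (see the cbgen tests for the actual API), and the per-variant offsets are treated as given, since obtaining them from the metafile is a separate step:

from cbgen import bgen_file


def scan_bgen(bgen_filepath, variant_offsets):
    # variant_offsets: per-variant file offsets, obtained once from the
    # metafile/index (how exactly depends on the metafile API).
    # The bgen file is opened once and the variants are read in sequence,
    # so the open/read-header/ftell overhead is paid only once.
    with bgen_file(bgen_filepath) as bgen:
        for offset in variant_offsets:
            # read_genotype is assumed to be cbgen's per-variant reader
            # taking the variant's file offset.
            genotype = bgen.read_genotype(offset)
            yield genotype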

@horta
Collaborator

horta commented Aug 18, 2020

You will find examples of using cbgen here: https://github.com/limix/cbgen-py/blob/master/cbgen/test/test_bgen.py
Or you can wait until we make new releases of bgen-reader.

@eric-czech

Every time the function read_genotype_partition is called, a bgen_file is opened (which opens the bgen file, reads the header, and calls ftell). So minimizing those calls minimizes that overhead

👍

You can do the above very easily by using this package I released last week which will be used by bgen-reader very soon: https://github.com/limix/cbgen-py

Awesome! Ok, I will give that a spin -- thanks for the examples!

@eric-czech

Hey @horta I tried using cbgen rather than bgen-reader and everything worked nicely!

As an experiment I was going to try to lift the file open/close logic out of _bgen_file.py#L70-L71 and into my own loop, but that's a bit out of my depth. I wasn't sure what the offset should be in:

def read_genotype(self, offset: int) -> Genotype:
    gt: CData = lib.bgen_file_open_genotype(self._bgen_file, offset)
    ...
    lib.bgen_genotype_close(gt)

Is there a fairly easy way to do that?

@eric-czech

Oh, and FYI, here is a resource profile I saw when using cbgen to read a UKB contig:

[image: cbgen resource utilization profile showing sawtooth memory growth]

This is from using a cbgen_reader.py wrapper I made to potentially replace the sgkit-bgen#bgen_reader.py wrapper, and I thought the sawtooth memory usage was odd. I didn't see that earlier in #30 (comment) when reading the variants in order, but I'm not sure what is being "built up" in this case. The wrapper pre-allocates result arrays, so the memory usage shouldn't be so incremental.

Might there be anything in cbgen or bgen_reader that would cause this gradual increase in memory used when reading blocks of variants out-of-order?

@horta
Collaborator

horta commented Aug 19, 2020

Hey @eric-czech, the offset for each variant is the result of ftell at the start of the variant block in the bgen file: https://github.com/limix/bgen/blob/a311bcfd32ece93b92d9c2cd9bf964e8d55cb1a6/src/file.c#L145

There is no easy way to get all those offsets. As the bgen file specification stands, you don't know where the variant block for variant n starts without knowing where variant n-1 started (and knowing the size of variant block n-1).

That is one of the reasons index files exist for bgen files.
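
To illustrate why, here is a rough sketch of what skipping from one variant to the next looks like for a layout-2 bgen file; the field sizes are from my own reading of the spec, so treat this as illustrative rather than authoritative:

import struct


def next_variant_offset(f, offset):
    # Skip one layout-2 variant block starting at `offset` and return the
    # offset of the next one. There is no way to jump straight to variant n
    # without having walked past variants 0..n-1 like this.
    f.seek(offset)
    for _ in range(3):  # variant id, rsid, chromosome: 2-byte length + string
        (length,) = struct.unpack("<H", f.read(2))
        f.seek(length, 1)
    f.seek(4, 1)  # variant position (uint32)
    (n_alleles,) = struct.unpack("<H", f.read(2))
    for _ in range(n_alleles):  # each allele: 4-byte length + string
        (length,) = struct.unpack("<I", f.read(4))
        f.seek(length, 1)
    # C: total length of the (possibly compressed) genotype data block.
    (genotype_block_length,) = struct.unpack("<I", f.read(4))
    return f.tell() + genotype_block_length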

@horta
Collaborator

horta commented Aug 19, 2020

Regarding memory, I don't know. Isn't that just garbage collection by Python? (You can try calling gc.collect() to rule that out.)
Also, if the access is out-of-order, and assuming you are reading in a uniformly random fashion, why would that pattern be so regular? It looks odd to me too, but I have to admit I don't have much experience with how garbage collection tends to behave. The cbgen library does not do anything fancy: it allocates numpy arrays, uses the C bgen library to copy the requested data into them, and that is it.

@eric-czech
Copy link

There is no easy way to get all those offsets. As the bgen file specification stands, you don't know where the variant block for variant n starts without knowing where variant n-1 started (and knowing the size of variant block n-1).

Is it not possible to:

  • Use the metafile to get the offset to the first variant in a block of variants to be read (block meaning some number of sequential variants)
  • Do a lib.bgen_file_open_genotype(self._bgen_file, offset) with that offset
  • Read data for the first variant
  • "Advance" the open file to the next offset without closing it
  • Read data for the next variant
  • ... and so on

I have no idea how the "Advance the open file" part would work in cbgen -- is that more difficult than it sounds?

Also, if the access is out-of-order, and assuming you are reading in a uniformly random fashion

Mm, well, what I mean is that Dask asks us to fetch some block of adjacent variants (say 100 of them), and there are no guarantees on the order in which it will ask for the blocks, but they do typically go in order. I.e. for two workers, process/thread 1 fetches variants 0 to 100 while process/thread 2 fetches variants 100 to 200, then worker 1 does 200 to 300, etc. "Arbitrary blocks of some size" is probably what I should have called it rather than random.

The cbgen library does not do anything fancy: it allocates numpy arrays, uses the C bgen library to copy the requested data into them, and that is it.

Makes sense. It's probably something in Dask or our wrapper then -- I saw a similar pattern when making a Dask wrapper around pybgen too.

@horta
Collaborator

horta commented Aug 20, 2020

I've created an issue for "advancing the open file". I think it won't change anything: opening variant genotypes in sequence should have the same effect as "advancing the open file". The important bit of the C library for that: https://github.com/limix/bgen/blob/a311bcfd32ece93b92d9c2cd9bf964e8d55cb1a6/src/file.c#L145

I do perform an fseek each time a variant genotype is opened, but I believe the underlying implementation is smart enough to figure out when we are already at the right file position. (I just couldn't find an explicit explanation of that on Google.)

@horta
Collaborator

horta commented Aug 20, 2020

Just to clarify: after you close an opened variant genotype n, the file stream position remains at the start of variant genotype n+1. This means that subsequently opening variant genotype n+1 and reading it should have zero overhead regarding file stream position (I'm assuming here that an fseek to the current position has zero overhead).

@eric-czech

Just to clarify: after you close an opened variant genotype n, the file stream position remains at the start of variant genotype n+1. This means that subsequently opening variant genotype n+1 and reading it should have zero overhead regarding file stream position

Ah ok, thanks @horta.

The biggest killer in reading into a Dask array then, AFAICT, is the lack of sample slicing for chunks. Right now, the way we're reading the data in might say that chunks are of size, say, 1,000 variants x 10,000 samples, but that still means we have to read all the samples for each variant in each chunk (so I'm reading the same variant data upwards of 10 times for UKB).

Do you know if there's some relatively simple way to have cbgen/bgen-reader not deserialize all the unwanted samples into memory? In other words, is it possible to do something like this:

  • Take a single variant and a subset of adjacent samples to get data for
  • Seek to the start of the data for the variant
  • Further seek to the start of the data for the sample block
  • Scan and read all probabilities until you get to the end of the sample block

Let me know if that doesn't make sense.

@tomwhite
Author

The biggest killer in reading into a Dask array then, AFAICT, is the lack of sample slicing for chunks.

Another way of tackling this would be to convert to an intermediate chunked format (like zarr). This would be done once, then analysis would be performed on the intermediate format, which would have efficient, chunked read patterns. Perhaps this could be something we recommend for large numbers of samples?

This is the way I am doing it for VCF, which is the only real option as we don't have offset information for the n-th variant, so we need to convert to something like zarr anyway.

@eric-czech

Another way of tackling this would be to convert to an intermediate chunked format (like zarr).

That's basically what I'm after, a more efficient way to do that conversion. Are you talking about some process for it that doesn't use Dask?

I'd like to think of this as a one-time conversion whose cost we eat, but even now, when I'm using ridiculously wide chunks (~1,000 x ~100,000) to avoid some of the repetitive sample reads, it looks like it will cost roughly $300 in vCPU+memory hours (storage aside). That isn't so bad, I suppose, but I'd like to get those chunks down to more like 10k x 10k, which would hypothetically cost ~5x as much if the write and read workloads are fairly balanced (IO/network bandwidth aside).

@tomwhite
Author

Are you talking about some process for it that doesn't use Dask?

No, it still uses Dask, but instead of using dask.array.from_array to create a Dask array and then write it to Zarr, use Dask delayed to read ranges of variants in parallel (and all their samples) and write to Zarr directly. This should avoid repetitive sample reads, while still getting chunking in the samples dimension (by setting the Zarr chunks appropriately).

I do something similar for VCF, but it should be simpler for BGEN since you can read variant ranges that align with Zarr chunks, which means you can avoid the concat and rechunk operations that are needed for VCF. See https://github.com/tomwhite/sgkit-vcf/blob/master/sgkit_vcf/vcf_reader.py#L206-L221
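
A minimal sketch of that idea for BGEN, assuming a hypothetical read_variant_range(path, start, stop) that returns the probabilities for a contiguous range of variants (and all their samples) as a NumPy array; each delayed write covers whole Zarr chunks in the variants dimension, so no concat or rechunk is needed:

import dask
import zarr


def bgen_to_zarr(bgen_path, store, n_variants, n_samples, n_probs,
                 variant_chunk=1000, sample_chunk=10000):
    # Output array chunked in both dimensions.
    z = zarr.open_array(
        store,
        mode="w",
        shape=(n_variants, n_samples, n_probs),
        chunks=(variant_chunk, sample_chunk, n_probs),
        dtype="float64",
    )

    @dask.delayed
    def write_range(start, stop):
        # read_variant_range is hypothetical: it reads variants [start, stop)
        # with all their samples in a single pass over that file region.
        z[start:stop] = read_variant_range(bgen_path, start, stop)

    tasks = [
        write_range(start, min(start + variant_chunk, n_variants))
        for start in range(0, n_variants, variant_chunk)
    ]
    dask.compute(*tasks)
    return z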

@eric-czech

use Dask delayed to read ranges of variants in parallel (and all their samples) and write to Zarr directly

Ahhh I see what you mean. I would like to think that there is some way to seek between blocks of sample data for different variants in bgen, but I can't imagine how that's possible if compression is on. I'll try a similar bgen_to_zarr function instead.

@tomwhite
Author

I would like to think that there is some way to seek between blocks of sample data for different variants in bgen, but I can't imagine how that's possible if compression is on.

Exactly. The genotype blocks in BGEN are compressed using zlib or zstd, and so there's no way to avoid decompressing the whole block to only pull out a subset of samples.

@horta
Collaborator

horta commented Aug 24, 2020

Just to clarify: after you close an opened variant genotype n, the file stream position remains at the start of variant genotype n+1. This means that subsequently opening variant genotype n+1 and reading it should have zero overhead regarding file stream position

Ah ok, thanks @horta.

The biggest killer in reading into a Dask array then, AFAICT, is the lack of sample slicing for chunks. Right now, the way we're reading the data in might say that chunks are of size, say, 1,000 variants x 10,000 samples, but that still means we have to read all the samples for each variant in each chunk (so I'm reading the same variant data upwards of 10 times for UKB).

Do you know if there's some relatively simple way to have cbgen/bgen-reader not deserialize all the unwanted samples into memory? In other words, is it possible to do something like this:

* Take a single variant and a subset of _adjacent_ samples to get data for

* Seek to the start of the data for the variant

* Further seek to the start of the data for the sample block

* Scan and read all probabilities until you get to the end of the sample block

Let me know if that doesn't make sense.

If compressed, the whole structure of probabilities, ploidy, phasedness, and missingness for a single variant (which includes all samples) has to be decompressed first. So I think the answer to your question is no whenever the bgen file uses compression (which I believe will be the case most of the time).

If it is not compressed, then it seems possible to read only the probabilities of the samples you are interested in (https://www.well.ox.ac.uk/~gav/bgen_format/spec/latest.html).

@horta
Collaborator

horta commented Aug 24, 2020

Oops, you already figured out the answer. Sorry, I didn't read it before answering!
