Skip to content

Commit

Permalink
(refactor): try maybe open?
Browse files Browse the repository at this point in the history
  • Loading branch information
ilan-gold committed Apr 17, 2024
1 parent aa1006e commit dda7d83
Showing 1 changed file with 24 additions and 19 deletions.
43 changes: 24 additions & 19 deletions src/anndata/_io/specs/lazy_methods.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

from contextlib import contextmanager
from pathlib import Path

import h5py
import numpy as np
from scipy import sparse
Expand Down Expand Up @@ -46,42 +49,44 @@ def make_index(is_csc, stride, shape, block_id):
return index


@contextmanager
def maybe_open_h5(filename_or_elem: str | ZarrGroup, elem_name: str):
if isinstance(filename_or_elem, str):
file = h5py.File(filename_or_elem, "r")
try:
yield file[elem_name]
finally:
file.close()
else:
try:
yield filename_or_elem
finally:
pass


@_LAZY_REGISTRY.register_read(H5Group, IOSpec("csc_matrix", "0.1.0"))
@_LAZY_REGISTRY.register_read(H5Group, IOSpec("csr_matrix", "0.1.0"))
@_LAZY_REGISTRY.register_read(ZarrGroup, IOSpec("csc_matrix", "0.1.0"))
@_LAZY_REGISTRY.register_read(ZarrGroup, IOSpec("csr_matrix", "0.1.0"))
def read_sparse_as_dask_h5(elem, _reader):
filename = elem.file.filename
elem_name = elem.name
filename_or_elem = elem.file.filename if isinstance(elem, H5Group) else elem
elem_name = elem.name if isinstance(elem, H5Group) else Path(elem.path).name
shape = elem.attrs["shape"]
dtype = elem["data"].dtype
is_csc = elem.attrs["encoding-type"] == "csc_matrix"

def make_dask_chunk(block_id=None):
# We need to open the file in each task since `dask` cannot share h5py objects when using `dask.distributed`
# https://github.com/scverse/anndata/issues/1105
with h5py.File(filename, "r") as f:
mtx = ad.experimental.sparse_dataset(f[elem_name])
with maybe_open_h5(filename_or_elem, elem_name) as f:
mtx = ad.experimental.sparse_dataset(f)
index = make_index(is_csc, stride, shape, block_id)
chunk = mtx[index]
return chunk

return make_dask_array(is_csc, shape, make_dask_chunk, dtype)


@_LAZY_REGISTRY.register_read(ZarrGroup, IOSpec("csc_matrix", "0.1.0"))
@_LAZY_REGISTRY.register_read(ZarrGroup, IOSpec("csr_matrix", "0.1.0"))
def read_sparse_as_dask_zarr(elem, _reader):
shape = elem.attrs["shape"]
dtype = elem["data"].dtype
is_csc = elem.attrs["encoding-type"] == "csc_matrix"

def make_dask_chunk(block_id=None):
mtx = ad.experimental.sparse_dataset(elem)
index = make_index(is_csc, stride, shape, block_id)
return mtx[index]

return make_dask_array(is_csc, shape, make_dask_chunk, dtype)


@_LAZY_REGISTRY.register_read(H5Array, IOSpec("array", "0.2.0"))
def read_h5_array(elem, _reader):
import dask.array as da
Expand Down

0 comments on commit dda7d83

Please sign in to comment.