hsload fix for large contiguous datasets
jreadey committed Nov 7, 2024
1 parent 0f8721c commit deec334
Showing 4 changed files with 157 additions and 14 deletions.
8 changes: 7 additions & 1 deletion h5pyd/_apps/hsload.py
@@ -23,12 +23,18 @@
    sys.exit(1)

try:
    # this package is used when reading HDF5 files on S3
    import s3fs

    S3FS_IMPORT = True
except ImportError:
    S3FS_IMPORT = False

try:
    # this package is useful for HDF5 compressed with blosc, etc.
    import hdf5plugin
except ImportError:
    pass

if __name__ == "__main__":
    from config import Config
    from utillib import load_file, load_h5image
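For readers unfamiliar with how these optional imports are consumed later: the S3FS_IMPORT flag lets hsload fail gracefully when s3fs is missing, while a bare import of hdf5plugin is sufficient on its own because it registers the extra HDF5 compression filters as a side effect. A minimal sketch of how such a flag is typically consulted (the helper name, path check, and error message are illustrative assumptions, not code from this commit; it relies on the sys, s3fs, and h5py imports already present in hsload.py):

def open_source_file(filepath):
    # hypothetical helper: open either an S3-hosted or a local HDF5 file
    if filepath.startswith("s3://"):
        if not S3FS_IMPORT:
            sys.exit("ERROR: the s3fs package is required to read HDF5 files on S3")
        fs = s3fs.S3FileSystem()
        return h5py.File(fs.open(filepath, "rb"), mode="r")
    return h5py.File(filepath, mode="r")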
148 changes: 144 additions & 4 deletions h5pyd/_apps/utillib.py
@@ -26,7 +26,8 @@

# adjust chunk shape to fit between min and max chunk sizes when possible
MIN_CHUNK_SIZE = 1 * 1024 * 1024
MAC_CHUNK_SIZE = 8 * 1024 * 1024
MAX_CHUNK_SIZE = 8 * 1024 * 1024
CHUNK_BASE = 64 * 1024 # Multiplier by which chunks are adjusted

H5Z_FILTER_MAP = {
32001: "blosclz",
@@ -235,6 +236,139 @@ def convert_dtype(srcdt, ctx):
return tgt_dt


def guess_chunk(shape, typesize):
    """ Guess an appropriate chunk layout for a dataset, given its shape and
    the size of each element in bytes.  Will allocate chunks only as large
    as MAX_CHUNK_SIZE.  Chunks are generally close to some power-of-2 fraction of
    each axis, slightly favoring bigger values for the last index.
    Undocumented and subject to change without warning.
    """

    # For unlimited dimensions we have to guess 1024
    shape = tuple((x if x != 0 else 1024) for i, x in enumerate(shape))

    ndims = len(shape)
    if ndims == 0:
        raise ValueError("Chunks not allowed for scalar datasets.")

    chunks = np.array(shape, dtype='=f8')
    if not np.all(np.isfinite(chunks)):
        raise ValueError("Illegal value in chunk tuple")

    # Determine the optimal chunk size in bytes using a PyTables expression.
    # This is kept as a float.
    dset_size = np.prod(chunks) * typesize
    target_size = CHUNK_BASE * (2 ** np.log10(dset_size / (1024. * 1024)))

    if target_size > MIN_CHUNK_SIZE:
        target_size = MAX_CHUNK_SIZE
    elif target_size < MIN_CHUNK_SIZE:
        target_size = MIN_CHUNK_SIZE

    idx = 0
    while True:
        # Repeatedly loop over the axes, dividing them by 2.  Stop when:
        # 1a. We're smaller than the target chunk size, OR
        # 1b. We're within 50% of the target chunk size, AND
        # 2. The chunk is smaller than the maximum chunk size

        chunk_bytes = np.prod(chunks) * typesize

        if (chunk_bytes < target_size or abs(chunk_bytes - target_size) / target_size < 0.5) and \
           chunk_bytes < MAX_CHUNK_SIZE:
            break

        if np.prod(chunks) == 1:
            break  # Element size larger than MAX_CHUNK_SIZE
        chunks[idx % ndims] = np.ceil(chunks[idx % ndims] / 2.0)
        idx += 1

    return tuple(int(x) for x in chunks)
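As a quick sanity check of the helper (not part of the commit; the shape below is arbitrary), guess_chunk can be exercised directly. For this ~8 GB float64 dataset the PyTables-style expression above lands just under the 1 MiB floor, so the returned pseudo-chunk works out to roughly 1 MiB per block:

import numpy as np

shape = (1000, 1000, 1000)                 # arbitrary example: ~8 GB of float64
typesize = np.dtype("f8").itemsize         # 8 bytes per element
chunks = guess_chunk(shape, typesize)
chunk_bytes = int(np.prod(chunks)) * typesize
print(chunks, chunk_bytes)                 # chunk_bytes is close to MIN_CHUNK_SIZE
                                           # and no larger than MAX_CHUNK_SIZE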


class ChunkIterator(object):
    """
    Class to iterate through list of chunks of a given dataset
    """

    def __init__(self, dset, source_sel=None):
        self._shape = dset.shape
        rank = len(dset.shape)

        if not dset.chunks:
            # contiguous layout - create some pseudo-chunks so we don't
            # try to read too much data in one selection
            self._layout = guess_chunk(dset.shape, dset.dtype.itemsize)
        elif isinstance(dset.chunks, dict):
            self._layout = dset.chunks["dims"]
        else:
            self._layout = dset.chunks

        if source_sel is None:
            # select over entire dataset
            slices = []
            for dim in range(rank):
                slices.append(slice(0, self._shape[dim]))
            self._sel = tuple(slices)
        else:
            if isinstance(source_sel, slice):
                self._sel = (source_sel,)
            else:
                self._sel = source_sel
        if len(self._sel) != rank:
            raise ValueError(
                "Invalid selection - selection region must have same rank as dataset"
            )
        self._chunk_index = []
        for dim in range(rank):
            s = self._sel[dim]
            if s.start < 0 or s.stop > self._shape[dim] or s.stop <= s.start:
                msg = "Invalid selection - selection region must be within dataset space"
                raise ValueError(msg)
            index = s.start // self._layout[dim]
            self._chunk_index.append(index)

    def __iter__(self):
        return self

    def __next__(self):
        rank = len(self._shape)
        slices = []
        if rank == 0 or self._chunk_index[0] * self._layout[0] >= self._sel[0].stop:
            # ran past the last chunk, end iteration
            raise StopIteration()

        for dim in range(rank):
            s = self._sel[dim]
            start = self._chunk_index[dim] * self._layout[dim]
            stop = (self._chunk_index[dim] + 1) * self._layout[dim]
            # adjust the start if this is an edge chunk
            if start < s.start:
                start = s.start
            if stop > s.stop:
                stop = s.stop  # trim to end of the selection
            s = slice(start, stop, 1)
            slices.append(s)

        # bump up the last index and carry forward if we run outside the selection
        dim = rank - 1
        while dim >= 0:
            s = self._sel[dim]
            self._chunk_index[dim] += 1

            chunk_end = self._chunk_index[dim] * self._layout[dim]
            if chunk_end < s.stop:
                # we still have room to extend along this dimension
                return tuple(slices)

            if dim > 0:
                # reset to the start and continue iterating with higher dimension
                self._chunk_index[dim] = 0
            dim -= 1
        return tuple(slices)
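A minimal usage sketch for the new iterator (not from the commit; the file and dataset names are made up). Because the example dataset is created without a chunks argument, dset.chunks is None and ChunkIterator falls back to guess_chunk() for its block size. A dataset this small yields a single selection covering everything; a large contiguous dataset yields many block-sized selections, which is what keeps hsload from reading the whole dataset in one request:

import h5py
import numpy as np

# hypothetical file with one contiguous (unchunked) 2-D dataset
with h5py.File("example.h5", "w") as f:
    f.create_dataset("data", data=np.arange(10000, dtype="f8").reshape(100, 100))

with h5py.File("example.h5", "r") as f:
    dset = f["data"]                 # dset.chunks is None for this dataset
    for s in ChunkIterator(dset):
        block = dset[s]              # s is a tuple of slices selecting one block
        # ... copy or process `block` here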


# ----------------------------------------------------------------------------------
def copy_element(val, src_dt, tgt_dt, ctx):
msg = f"copy_element, val: {val} "
@@ -1262,8 +1396,12 @@ def create_dataset(dobj, ctx):
# supported non-standard compressor
kwargs["compression"] = filter_name
logging.info(f"using compressor: {filter_name} for {dobj.name}")
kwargs["compression_opts"] = filter_opts
logging.info(f"compression_opts: {filter_opts}")
if isinstance(filter_opts, int) or isinstance(filter_opts, dict):
kwargs["compression_opts"] = filter_opts
logging.info(f"compression_opts: {filter_opts}")
else:
msg = f"ignoring compression_opts for filter: {filter_name}"
logging.warn(msg)
else:
logging.warning(f"filter id {filter_id} for {dobj.name} not supported")

@@ -1402,6 +1540,7 @@ def write_dataset(src, tgt, ctx):
if ctx["verbose"]:
print(msg)
try:
"""
if src.chunks is None:
# contiguous dataset, fake an iterator by creating a list
# with one slice
@@ -1417,11 +1556,12 @@
it = [slices,]
else:
it = src.iter_chunks()

"""
logging.debug(f"src dtype: {src.dtype}")
logging.debug(f"des dtype: {tgt.dtype}")

empty_arr = None
it = ChunkIterator(src)
for src_s in it:
logging.debug(f"src selection: {src_s}")
if rank == 1 and isinstance(src_s, slice):
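Taken together, this hunk comments out the old contiguous-vs-chunked branching and drives the copy loop from the new ChunkIterator for every layout. A minimal sketch of the resulting loop (illustrative only, not the literal body of write_dataset, which also handles dtype conversion and, judging from the empty_arr variable, skips blocks that contain nothing but fill values):

it = ChunkIterator(src)
for src_s in it:
    arr = src[src_s]        # read one pseudo-chunk sized block from the source
    tgt[src_s] = arr        # write the same selection to the target dataset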
2 changes: 1 addition & 1 deletion h5pyd/_hl/dataset.py
@@ -265,7 +265,7 @@ def make_new_dset(
maxshape = tuple(m if m is not None else 0 for m in maxshape)
body["maxdims"] = maxshape
else:
print("Warning: maxshape provided but no shape")
print("maxshape provided but no shape")

req = "/datasets"

13 changes: 5 additions & 8 deletions h5pyd/_hl/filters.py
@@ -255,14 +255,11 @@ def rq_tuple(tpl, name):
for k in compression_opts:
filter[k] = compression_opts[k]
else:
if compression_opts in range(10):
level = compression_opts
else:
raise ValueError(
"compression setting must be a dict or integer from 0-9, not {}".format(
compression_opts
)
)
if compression_opts not in range(10):
msg = "compression setting must be a dict or integer from 0-9, "
msg += f"not {compression_opts}"
raise ValueError(msg)
level = compression_opts
filter["level"] = level
filters.append(filter)
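With this change the error path is flattened but the accepted values stay the same: compression_opts must be either a dict of filter options or an integer level from 0 to 9. A hedged illustration of the two forms (the file handle, dataset names, compressor names, and dict keys are assumptions for illustration; the dict keys are copied into the filter verbatim, and the named compressors must be advertised by the target HSDS server):

# integer form: a gzip-style level in the range 0-9
f.create_dataset("a", shape=(1000,), dtype="f8",
                 compression="gzip", compression_opts=4)

# dict form: option names are passed through as-is, so the key shown
# here is only an example
f.create_dataset("b", shape=(1000,), dtype="f8",
                 compression="lz4", compression_opts={"level": 5})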
