diff --git a/src/lgdo/lh5_store.py b/src/lgdo/lh5_store.py
index 342a00ff..da02f5ac 100644
--- a/src/lgdo/lh5_store.py
+++ b/src/lgdo/lh5_store.py
@@ -169,6 +169,7 @@ def read_object(
         start_row: int = 0,
         n_rows: int = sys.maxsize,
         idx: np.ndarray | list | tuple | list[np.ndarray | list | tuple] = None,
+        use_h5idx: bool = False,
         field_mask: dict[str, bool] | list[str] | tuple[str] = None,
         obj_buf: LGDO = None,
         obj_buf_start: int = 0,
@@ -176,6 +177,14 @@ def read_object(
     ) -> tuple[LGDO, int]:
         """Read LH5 object data from a file.
 
+        Use the ``idx`` parameter to read out particular rows of the data. The ``use_h5idx`` flag
+        controls whether *only* those rows are read from disk or whether the rows are indexed after
+        reading the entire object. Reading individual rows can be orders of magnitude slower than
+        reading the whole object and then indexing the desired rows. The default behavior
+        (``use_h5idx=False``) is to use slightly more memory for a much faster read. See
+        `legend-pydataobj #29 <https://github.com/legend-exp/legend-pydataobj/issues/29>`_
+        for additional information.
+
         Parameters
         ----------
         name
@@ -192,16 +201,27 @@ def read_object(
             actual number of rows read will be returned as one of the return
             values (see below).
         idx
-            For NumPy-style "fancying indexing" for the read. Used to read out
-            rows that pass some selection criteria. Only selection along the first
-            axis is supported, so tuple arguments must be one-tuples. If `n_rows`
-            is not false, `idx` will be truncated to `n_rows` before reading. To use
-            with a list of files, can pass in a list of `idx`'s (one for each
-            file) or use a long contiguous list (e.g. built from a previous
+            For NumPy-style "fancy indexing" for the read to select only some
+            rows, e.g. after applying some cuts to particular columns.
+            Only selection along the first axis is supported, so tuple arguments
+            must be one-tuples. If `n_rows` is not false, `idx` will be truncated to
+            `n_rows` before reading. To use with a list of files, can pass in a list of
+            `idx`'s (one for each file) or use a long contiguous list (e.g. built from a previous
             identical read). If used in conjunction with `start_row` and `n_rows`,
             will be sliced to obey those constraints, where `n_rows` is
             interpreted as the (max) number of *selected* values (in `idx`) to be
-            read out.
+            read out. Note that the ``use_h5idx`` parameter controls how the read is
+            performed and that the default behavior (``use_h5idx=False``) prioritizes
+            speed over a small memory penalty.
+        use_h5idx
+            ``True`` will directly pass the ``idx`` parameter to the underlying
+            ``h5py`` call such that only the selected rows are read into memory,
+            which conserves memory at the cost of speed. There can be a significant
+            speed penalty for larger files (1-2 orders of magnitude longer read times).
+            ``False`` (default) will read the entire object into memory before
+            performing the indexing. The default is much faster but requires additional memory,
+            though a relatively small amount in the typical use case. It is recommended to
+            leave this parameter as its default.
         field_mask
             For tables and structs, determines which fields get written out.
             Only applies to immediate fields of the requested objects. If a dict
@@ -223,6 +243,7 @@ def read_object(
             after reading. The option has no effect on data encoded with HDF5
             built-in filters, which is always decompressed upstream by HDF5.
 
+
         Returns
         -------
         (object, n_rows_read)
@@ -236,6 +257,14 @@ def read_object(
         if not isinstance(lh5_file, (str, h5py.File)):
             lh5_file = list(lh5_file)
             n_rows_read = 0
+
+            # to know whether we are reading in a list of files.
+            # this is part of the fix for reading data by idx
+            # (see https://github.com/legend-exp/legend-pydataobj/issues/29)
+            # so that we only make a copy of the data if absolutely necessary
+            # or if we can read the data from file without having to make a copy
+            self.in_file_loop = True
+
             for i, h5f in enumerate(lh5_file):
                 if isinstance(idx, list) and len(idx) > 0 and not np.isscalar(idx[0]):
                     # a list of lists: must be one per file
@@ -255,22 +284,32 @@ def read_object(
                 else:
                     idx_i = None
                 n_rows_i = n_rows - n_rows_read
+
+                # maybe someone passed in a list of len==1?
+                if i == (len(lh5_file) - 1):
+                    self.in_file_loop = False
+
                 obj_buf, n_rows_read_i = self.read_object(
                     name,
                     lh5_file[i],
                     start_row=start_row,
                     n_rows=n_rows_i,
                     idx=idx_i,
+                    use_h5idx=use_h5idx,
                     field_mask=field_mask,
                     obj_buf=obj_buf,
                     obj_buf_start=obj_buf_start,
                     decompress=decompress,
                 )
+
                 n_rows_read += n_rows_read_i
                 if n_rows_read >= n_rows or obj_buf is None:
                     return obj_buf, n_rows_read
                 start_row = 0
                 obj_buf_start += n_rows_read_i
+
+            self.in_file_loop = False
+
             return obj_buf, n_rows_read
 
         # get the file from the store
@@ -358,6 +397,7 @@ def read_object(
                     start_row=start_row,
                     n_rows=n_rows,
                     idx=idx,
+                    use_h5idx=use_h5idx,
                     decompress=decompress,
                 )
             # modify datatype in attrs if a field_mask was used
@@ -404,6 +444,7 @@ def read_object(
                     start_row=start_row,
                     n_rows=n_rows,
                     idx=idx,
+                    use_h5idx=use_h5idx,
                     obj_buf=fld_buf,
                     obj_buf_start=obj_buf_start,
                     decompress=decompress,
@@ -497,6 +538,7 @@ def read_object(
                 start_row=start_row,
                 n_rows=n_rows,
                 idx=idx,
+                use_h5idx=use_h5idx,
                 obj_buf=None if decompress else decoded_size_buf,
                 obj_buf_start=0 if decompress else obj_buf_start,
             )
@@ -508,6 +550,7 @@ def read_object(
                 start_row=start_row,
                 n_rows=n_rows,
                 idx=idx,
+                use_h5idx=use_h5idx,
                 obj_buf=None if decompress else encoded_data_buf,
                 obj_buf_start=0 if decompress else obj_buf_start,
             )
@@ -573,6 +616,7 @@ def read_object(
                 start_row=start_row,
                 n_rows=n_rows,
                 idx=idx,
+                use_h5idx=use_h5idx,
                 obj_buf=cumulen_buf,
                 obj_buf_start=obj_buf_start,
             )
@@ -597,6 +641,7 @@ def read_object(
                     start_row=start_row,
                     n_rows=n_rows,
                     idx=idx2,
+                    use_h5idx=use_h5idx,
                 )
                 fd_starts = fd_starts.nda  # we just need the nda
                 if fd_start is None:
@@ -679,6 +724,7 @@ def read_object(
                 start_row=fd_start,
                 n_rows=fd_n_rows,
                 idx=fd_idx,
+                use_h5idx=use_h5idx,
                 obj_buf=fd_buf,
                 obj_buf_start=fd_buf_start,
             )
@@ -722,9 +768,22 @@ def read_object(
 
             if n_rows_to_read > n_rows:
                 n_rows_to_read = n_rows
+            # if idx is passed, check if we can make it a slice instead (faster)
+            change_idx_to_slice = False
 
+            # prepare the selection for the read. Use idx if available
             if idx is not None:
-                source_sel = idx
+                # check if idx is empty and convert to slice instead
+                if len(idx[0]) == 0:
+                    source_sel = np.s_[0:0]
+                    change_idx_to_slice = True
+                # check if idx is contiguous and increasing
+                # if so, convert it to a slice instead (faster)
+                elif np.all(np.diff(idx[0]) == 1):
+                    source_sel = np.s_[idx[0][0] : idx[0][-1] + 1]
+                    change_idx_to_slice = True
+                else:
+                    source_sel = idx
             else:
                 source_sel = np.s_[start_row : start_row + n_rows_to_read]
 
@@ -734,14 +793,34 @@ def read_object(
                 if len(obj_buf) < buf_size:
                     obj_buf.resize(buf_size)
                 dest_sel = np.s_[obj_buf_start:buf_size]
-                h5f[name].read_direct(obj_buf.nda, source_sel, dest_sel)
+
+                # this is required to make the read of multiple files faster
+                # until a better solution is found.
+                if change_idx_to_slice or idx is None or use_h5idx:
+                    h5f[name].read_direct(obj_buf.nda, source_sel, dest_sel)
+                else:
+                    # it is faster to read the whole object and then do fancy indexing
+                    obj_buf.nda[dest_sel] = h5f[name][...][source_sel]
+
                 nda = obj_buf.nda
             else:
                 if n_rows == 0:
                     tmp_shape = (0,) + h5f[name].shape[1:]
                     nda = np.empty(tmp_shape, h5f[name].dtype)
                 else:
-                    nda = h5f[name][source_sel]
+                    if change_idx_to_slice or idx is None or use_h5idx:
+                        nda = h5f[name][source_sel]
+                    else:
+                        # it is faster to read the whole object and then do fancy indexing
+                        nda = h5f[name][...][source_sel]
+
+                # if reading a list of files recursively, this is given to obj_buf on
+                # the first file read. obj_buf needs to be resized and therefore
+                # it needs to hold the data itself (not a view of the data).
+                # a view is returned by the source_sel indexing, which cannot be resized
+                # by ndarray.resize().
+                if hasattr(self, "in_file_loop") and self.in_file_loop:
+                    nda = np.copy(nda)
 
             # special handling for bools
             # (c and Julia store as uint8 so cast to bool)
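For reference (not part of the patch), a minimal usage sketch of the new keyword, assuming the LH5Store class exposed by this module; the file name "data.lh5", the object path "my_group/my_array", and the index values are placeholders:

    import numpy as np
    from lgdo.lh5_store import LH5Store

    store = LH5Store()

    # rows selected by some earlier cut (values are only illustrative)
    idx = np.array([0, 3, 7, 42])

    # default (use_h5idx=False): read the whole dataset, then apply the fancy
    # index in memory; fast, at the cost of temporarily holding the full object
    obj, n_read = store.read_object("my_group/my_array", "data.lh5", idx=idx)

    # use_h5idx=True: hand idx straight to h5py so only the selected rows are
    # read from disk; lower memory use, but potentially much slower
    obj, n_read = store.read_object(
        "my_group/my_array", "data.lh5", idx=idx, use_h5idx=True
    )

Per the hunk above, an empty or contiguous, increasing idx is converted to a slice internally, so both paths remain fast in that case.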