Downstream of #386, the commit stage - when you exit the stage_version(...) context manager - is by far the slowest part. This is unsurprising, as #386 didn't touch it.
We should extend the redesign of #386 to everything that happens afterwards, which means hashing and writing to disk.
The StagedChangesArray introduced in #386 offers solid infrastructure to build on.
Phases
This rewrite can happen in separate phases, each with an incremental benefit to production:
1. push into the StagedChangesArray the hashing phase and the writing to hdf5 when writing from an InMemoryDataset or InMemorySparseDataset, leaving the creation of the virtual dataset to the legacy code;
2. rewrite the creation of the virtual dataset downstream of InMemoryDataset or InMemorySparseDataset in a way that natively interfaces with the metadata of the StagedChangesArray, can be much more aggressively cythonized, and doesn't use the data_dict or ndindex at any point;
3. rewrite create_dataset and all other machinery on top of the StagedChangesArray so that the legacy code can be retired.
Performance expectations
Committing a 3 GiB dataset with 40,000 × 80 kiB chunks (all chunks modified and different from the previous values) takes
12.7s today (as measured on an NVMe Linux desktop)
~9.1s expected after phase 1
~5.8s expected after phase 2
A granular breakdown of these numbers appears later in this document.
Low level design
NOTE: this section expects the reader to be already familiar with the low level design of #386. If you aren't, please stop here and read https://github.com/deshaw/versioned-hdf5/blob/17c687d3e1f1bfb64781a7b9039dbb621f5a46b4/docs/staged_changes.rst first.
Create a new StagedChangesArray.hash_tables list, matching the slabs list.
Every slab has a matching hash_table with dtype=hsize_t and shape=(n, 4), where n is the number of chunks in the slab, so every row can hold exactly one sha256 hash (256 bits as 4 × 64-bit words).
hash_tables[i][j, :] is the hash for the chunk on the slab at slabs[i][j * chunk_size[0]:(j + 1) * chunk_size[0]].
New requirement: base slabs must be aligned to chunk_size[0]. This is de facto already true for the raw_data slab.
New hash tables are created with np.zeros together with the matching slabs. A hash of all zeros means it is uninitialised.
The hash_tables entry for the raw_data base slab is an actual np.ndarray loaded from disk, not an h5py.Dataset.
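For illustration only, a minimal sketch of how a hash table could be allocated alongside a slab (chunk_size0 stands for chunk_size[0]; np.uint64 matches hsize_t):

import numpy as np

def new_hash_table(slab: np.ndarray, chunk_size0: int) -> np.ndarray:
    # One row per chunk of the slab; each row holds a sha256 digest as
    # 4 uint64 words. An all-zeros row means "hash not computed yet".
    n_chunks = slab.shape[0] // chunk_size0  # slabs are chunk-aligned on axis 0
    return np.zeros((n_chunks, 4), dtype=np.uint64)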
Committing changes
To "commit" means taking all chunks that lie on a staged slab and moving them to a new base slab. The new base slab is created by a callback that must replicate the API of numpy.empty. InMemoryDataset, instead of creating a new empty numpy array, will enlarge raw_data and then return an ad-hoc view to it (using a thin wrapper class).
Committing is broken down into two stages: HashPlan and CommitPlan.
HashPlan
This new, very simple plan finds all staged chunks and passes the sorted list of their slab indices and slab offsets to a cythonized sha256 hasher, which will feature a vectorized API very similar to read_many_slices.
StagedChangesArray.commit() calls the hasher:
for slab_idx, instructions in plan.hashes:
    slab = self._get_slab(slab_idx)
    hash_table = self._get_hash_table(slab_idx)
    hasher(slab, hash_table, instructions)  # pure C
The hasher populates StagedChangesArray.hash_tables for all staged chunks (but, crucially, not for deleted chunks, and without including in the hash the uninitialised memory that may be present on edge chunks).
It also calculates the hash of the full chunk (the all-fill_value chunk on the full slab), as if it were C-contiguous.
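As a pure-Python reference for what the cythonized hasher computes per chunk (a sketch only; the real code works directly in C):

import hashlib
import numpy as np

def hash_chunk(chunk: np.ndarray) -> np.ndarray:
    # Hash the chunk as if it were C-contiguous, so that views with unusual
    # strides (e.g. the broadcast full chunk) hash the same as their
    # materialised copies. The digest is returned as 4 uint64 words, ready
    # to be stored into a hash_tables row.
    digest = hashlib.sha256(np.ascontiguousarray(chunk).tobytes()).digest()
    return np.frombuffer(digest, dtype=np.uint64)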
CommitPlan
This new plan takes in input the full StagedChangesArray.hash_tables list.
First, it scans hash_tables and uses them to compile a hashmap of {hash key: (slab idx, slab offset)}.
Slabs with lower indices overwrite those with higher indices, meaning that
if a real chunk is actually full of fill_value, it will be replaced by the full chunk
if a chunk on a staged slab is identical to one on a base slab, it will be replaced by the one on the base slab.
All hashes are parsed - not just the ones referenced by a chunk - so that a chunk that was committed 2+ versions ago and then deleted, but still exists in raw_data, can be reused in the coming commit.
Once the map is compiled, we can once again scan for all staged chunks. This query is identical to the one we ran in the HashPlan moments ago, and its output should be reused for efficiency.
For each chunk, we then update slab_indices and slab_offsets with whatever we find in the hashmap (the slab with the lowest index wins). At this point the staged chunks have been deduplicated both among each other and against the base chunks. Whenever a staged chunk already exists on the base slab, it now becomes just a reference to it.
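A sketch of the deduplication pass, using hypothetical names (n_base_slabs, chunk_size0) and plain Python loops where the real plan would operate on whole arrays:

# Build {hash bytes: (slab idx, slab offset)}. Scanning slabs from the
# highest index down to the lowest means lower indices overwrite higher
# ones in the dict, i.e. the lowest index wins.
dedup = {}
for slab_idx in reversed(range(len(hash_tables))):
    table = hash_tables[slab_idx]
    for row in range(len(table)):
        if table[row].any():  # skip uninitialised (all-zero) hashes
            dedup[table[row].tobytes()] = (slab_idx, row * chunk_size0)

# Retarget every staged chunk to the lowest-index slab holding identical data.
for flat in range(slab_indices.size):
    slab_idx = slab_indices.flat[flat]
    if slab_idx > n_base_slabs:  # staged chunk (base slabs assumed at indices 1..n_base_slabs)
        offset = slab_offsets.flat[flat]
        key = hash_tables[slab_idx][offset // chunk_size0].tobytes()
        slab_indices.flat[flat], slab_offsets.flat[flat] = dedup[key]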
Now we insert the new base slab just after the other base slabs. This is different from what we normally do, which is to append at the end of the slabs list. CommitPlan itself will also create a new hash_tables entry, unlike the other MutatingPlans, which just instruct StagedChangesArray to do so.
We scan slab_indices and slab_offsets again. Same query as before, but different result because of the deduplication pass we just completed.
We then compile a TransferPlan. This is the inverse of LoadPlan, which copies all the contents of the base slabs to a new staged slab and then drops them. Here, we copy the contents of the staged slabs (the chunks that survived deduplication) to the new base slab and finally drop all the staged slabs.
From InMemoryDataset/InMemorySparseDataset's POV
Commit starts: there is 1 base slab, which is raw_data, and 0 or more staged slabs.
Call self.staged_changes.commit(empty=self.resize_raw_data_and_return_view), where the callback resizes self.id.raw_data and returns a view onto the newly appended region (sketched after this list).
There are now exactly 3 elements in self.staged_changes.slabs:
slabs[0]: the full slab (ignore it)
slabs[1]: raw_data (may have been replaced with None)
slabs[2]: view to extended raw_data we just created with the callback.
All data - but not the metadata yet - has been saved to disk.
We can now resize the on-disk hash table dataset and append staged_changes.hash_tables[2] to it.
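A minimal sketch of the empty-like callback, written as a free function for self-containment (RawDataView is a hypothetical stand-in for the thin wrapper class mentioned earlier):

class RawDataView:
    # Hypothetical write-through window onto the tail of raw_data,
    # supporting just enough of the ndarray interface for the commit
    # path (shape, dtype, and __setitem__ with a slice along axis 0).
    def __init__(self, dset, start, shape, dtype):
        self.dset, self.start, self.shape, self.dtype = dset, start, shape, dtype

    def __setitem__(self, rows, value):
        # Shift the rows of the view into the coordinates of the full dataset.
        self.dset[rows.start + self.start : rows.stop + self.start] = value

def resize_raw_data_and_return_view(raw_data, shape, dtype):
    # Same signature as numpy.empty(shape, dtype), apart from the dataset
    # argument: enlarge raw_data on disk and return a view-like object over
    # the newly appended rows instead of allocating memory.
    old_rows = raw_data.shape[0]
    raw_data.resize((old_rows + shape[0],) + raw_data.shape[1:])
    return RawDataView(raw_data, start=old_rows, shape=shape, dtype=dtype)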
Recreating the virtual dataset
At the end of the last stage we can transitionally resume with what we were doing in the legacy design: call self.staged_changes.changes(), compile a data_dict, and pass it to create_virtual_dataset.
This also has the advantage that we don't need to think yet about what happens in the create_dataset function.
In a later phase, however, we can rewrite create_virtual_dataset so that it accepts slab_indices and slab_offsets directly, in a way that mirrors build_slab_indices_and_offsets and doesn't require a data_dict or ndindex.
When this happens, the data_dict and the CommitPlan can be retired.
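A hedged sketch of what the rewritten create_virtual_dataset could look like, driven directly by slab_indices and slab_offsets. All names here are assumptions (including the "raw_data" path and the convention that offsets are rows into raw_data); the real function would also deal with datasets of arbitrary dimensionality, and the 2D case is shown for brevity:

import h5py
import numpy as np

def create_virtual_dataset(f, name, shape, dtype, chunk_size, fill_value,
                           slab_indices, slab_offsets):
    # Map every chunk of the virtual dataset to a region of raw_data using
    # only the commit metadata (no data_dict, no ndindex).
    layout = h5py.VirtualLayout(shape=shape, dtype=dtype)
    source = h5py.VirtualSource(f["raw_data"])
    it = np.nditer(slab_indices, flags=["multi_index"])
    for slab_idx in it:
        if slab_idx == 0:
            continue  # full chunk: left to the fill value
        i, j = it.multi_index
        offset = int(slab_offsets[i, j])
        rows = slice(i * chunk_size[0], min((i + 1) * chunk_size[0], shape[0]))
        cols = slice(j * chunk_size[1], min((j + 1) * chunk_size[1], shape[1]))
        layout[rows, cols] = source[
            offset : offset + (rows.stop - rows.start),
            : cols.stop - cols.start,
        ]
    return f.create_virtual_dataset(name, layout, fillvalue=fill_value)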
create_dataset will need to be reimplemented on top of StagedChangesArray.from_array(as_base=False), which will create staged slabs from views of the original data.
Components
Phase 1:
Add hash table to InMemoryDatasetID, next to raw_data
Phase 2:
Phase 3:
Benchmarks
Breakdown of vf.stage_version(...).__exit__() to commit a dataset with shape=(20_000, 20_000) (3 GiB), chunks=(100, 100) (40,000 chunks worth 80 kiB each), and all chunks modified (write to local NVMe):
TODAY (as of #386)
All timings are measured with pyinstrument:
1.3s InMemoryDataset.data_dict
1.6s openssl_sha256
0.9s Hashtable control code
5.7s Dataset.__setitem__
2.7s create_virtual_dataset
0.5s misc control code
12.7s TOTAL
After phase 1 (move hashing and writing to StagedChangesArray):
0.4s plans formulation (guesstimate)
1.6s hash_slabs (measured on POC code)
3.1s read_many_slices calls H5Dwrite for each chunk (measured on POC code)
1.3s InMemoryDataset.data_dict (same as prod)
2.7s create_virtual_dataset (same as prod)
9.1s TOTAL
Note: early POC benchmarks show no benefit from calling OpenSSL C hashers directly vs. hashlib at 10k points per chunk (80 kiB), with a substantial benefit starting to appear at 1,000 points per chunk (8 kiB).
After phase 2 (rewrite create_virtual_dataset):
0.4s plans formulation (guesstimate)
1.6s hash_slabs (measured on POC code)
3.1s read_many_slices calls H5Dwrite for each chunk (measured on POC code)
0.7s create_virtual_dataset (guesstimate)
5.8s TOTAL
Out of scope bonus: cache on read
Downstream of this it becomes straightforward, if we wish to, to implement a switch that enables cache-on-read for __getitem__, as was the case before #386.
GetItemPlan becomes a MutatingPlan with the option, if the cache-on-read flag is on, to load a new staged slab from hdf5 and then read from it, just like SetItemPlan loads a new staged slab from hdf5 for the partially modified data and then updates it.
When GetItemPlan creates a new slab, it also appends a new array to hash_tables and populates it with a copy of the hashes of the chunks it just loaded. This means that HashPlan will later have an easy time skipping the hashing of that slab.
LoadPlan likewise copies over the hashes from the base slabs to the staged slabs.
When SetItemPlan and ResizePlan update an existing slab, they instead zero out the hashes of the updated chunks to signal that they need to be (re)computed.
astype() completely wipes the hash tables.
refill() could partially wipe the hash tables only where it actually changes points, but for simplicity it's likely better to just wipe everything.
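A sketch of that invalidation rule (hypothetical helper; chunk_size0 again stands for chunk_size[0]):

import numpy as np

def invalidate_hashes(hash_table: np.ndarray, start_row: int, stop_row: int,
                      chunk_size0: int) -> None:
    # Zero out the hash row of every chunk overlapping slab rows
    # [start_row, stop_row); an all-zero row means "recompute me".
    first = start_row // chunk_size0
    last = -(-stop_row // chunk_size0)  # ceil division to include partial chunks
    hash_table[first:last, :] = 0

# astype() and refill() would instead reset whole tables: hash_table[:] = 0.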