Redesign the commit stage #398

crusaderky opened this issue Oct 31, 2024 · 0 comments

Downstream of #386, the commit stage - what happens when you exit the stage_version(...) context manager - is by far the slowest part. This is unsurprising, as #386 didn't touch it.

We should extend the redesign of #386 to everything that happens afterwards, which means hashing and writing to disk.
The StagedChangesArray introduced in #386 offers solid infrastructure to build on.

Phases

This rewrite can happen in separate phases, each with an incremental benefit to production:

  1. push into the StagedChangesArray the hashing phase and the writing to hdf5 when writing from an InMemoryDataset or InMemorySparseDataset, leaving the creation of the virtual dataset to the legacy code;
  2. rewrite the creation of the virtual dataset downstream of InMemoryDataset or InMemorySparseDataset in a way that natively interfaces with the metadata of the StagedChangesArray, can be much more aggressively cythonized, and doesn't use the data_dict or ndindex at any point;
  3. rewrite create_dataset and all other machinery on top of the StagedChangesArray so that the legacy code can be retired.

Performance expectations

Committing a 3 GiB dataset with 40,000 chunks of 80 kiB each (all chunks modified and different from the previous values) takes:

  • 12.7s today (as measured on a Linux desktop with NVMe storage)
  • ~9.1s expected after phase 1
  • ~5.8s expected after phase 2

A granular breakdown of these numbers appears later in this document.

Low level design

NOTE: this section expects the reader to be already familiar with the low level design of #386. If you aren't, please stop here and read https://github.com/deshaw/versioned-hdf5/blob/17c687d3e1f1bfb64781a7b9039dbb621f5a46b4/docs/staged_changes.rst first.

Create a new StagedChangesArray.hash_tables list, matching the slabs list.
Every slab has a matching hash_table with dtype=hsize_t and shape=(n, 4), where n is the number of chunks in the slab (so each row holds exactly one sha256 hash: 4 × 8 bytes = 32 bytes).

hash_tables[i][j, :] is the hash for the chunk on the slab at slabs[i][j * chunk_size[0]:(j + 1) * chunk_size[0]].

New requirement: base slabs must be aligned to chunk_size[0]. This is de facto already true for the raw_data slab.

New hash tables are created with np.zeros together with the matching slabs. A hash of all zeros means that it's uninitialised.

The hash_tables entry for the raw_data base slab is an actual np.ndarray loaded from disk, not an h5py.Dataset.
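
For illustration, a minimal numpy sketch of this layout, under the assumption that slab offsets are expressed in rows along the first axis (variable names here are hypothetical):

    import hashlib
    import numpy as np

    chunk_size = (100, 100)   # e.g. the chunks of the benchmark dataset below
    n_chunks = 40             # chunks held by this particular slab
    slab = np.random.random((n_chunks * chunk_size[0], chunk_size[1]))

    # One sha256 digest (32 bytes) fits exactly into one row of 4 uint64 words.
    # np.zeros means every hash starts out as "uninitialised".
    hash_table = np.zeros((n_chunks, 4), dtype=np.uint64)

    # hash_tables[i][j, :] hashes slabs[i][j*chunk_size[0]:(j+1)*chunk_size[0]]
    j = 3
    chunk = slab[j * chunk_size[0]:(j + 1) * chunk_size[0]]
    digest = hashlib.sha256(np.ascontiguousarray(chunk)).digest()
    hash_table[j] = np.frombuffer(digest, dtype=np.uint64)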

Committing changes

To "commit" is the act of taking all chunks that lie on a staged slab and moving them to a new base slab. The new base slab is created by a callback that must replicate the API of numpy.empty. InMemoryDataset, instead of creating a new empty numpy array, will enlarge raw_data and then return an ad-hoc view to it (using a thin wrapper class).

Committing is broken down into two stages: HashPlan and CommitPlan.

HashPlan

This new, very simple plan finds all staged chunks and passes the sorted list of slab indices and slab offsets to a cythonized sha256 hasher, which will feature a vectorized API very similar to read_many_slices.

StagedChangesArray.commit() calls the hasher:

    for slab_idx, instructions in plan.hashes:
        slab = self._get_slab(slab_idx)
        hash_table = self._get_hash_table(slab_idx)
        hasher(slab, hash_table, instructions)  # pure C

The hasher populates StagedChangesArray.hash_tables for all staged chunks (but, crucially, not for deleted chunks and without reading uninitialised memory that may lie on the edge chunks).
It also calculates the hash of the full chunk (the chunk made entirely of fill_value), as if it were C-contiguous.
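
A pure-Python stand-in for what the hasher is expected to compute; the instruction format below (hash table row, slab row offset, rows in chunk) is an assumption for illustration, whereas the real API will be vectorized and closer to read_many_slices:

    import hashlib
    import numpy as np

    def hash_slab_chunks(slab, hash_table, instructions):
        """Sketch only: hash the listed chunks of one slab into its hash table.

        instructions is assumed to be a sequence of
        (hash table row, slab row offset, rows in chunk) triples.
        """
        for row, offset, nrows in instructions:
            # Edge chunks are passed with their actual extent, so
            # uninitialised memory beyond the array edge is never hashed.
            chunk = slab[offset:offset + nrows]
            digest = hashlib.sha256(np.ascontiguousarray(chunk)).digest()
            hash_table[row] = np.frombuffer(digest, dtype=np.uint64)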

CommitPlan

This new plan takes as input the full StagedChangesArray.hash_tables list.

First, it scans hash_tables and uses it to compile a hashmap of {hash key: (slab idx, slab offset)}.

Slabs with lower indices overwrite those with higher indices, meaning that

  • if a real chunk is actually full of fill_value, it will be replaced by the full chunk
  • if a chunk on a staged slab is identical to one on a base slab, it will be replaced by the one on the base slab.

All hashes are parsed - not just those referenced by a chunk - so that a chunk that was committed 2+ versions ago and then deleted, but still exists in raw_data, can be reused in the coming commit.

Once the map is compiled, we can once again scan for all staged chunks. This query is identical to the one we used in the HashPlan moments ago, so the previous output should be reused for efficiency.

For each chunk, we now proceed to update slab_indices and slab_offsets with whatever we find in the hashmap (the slab with the lowest index wins). We have successfully deduplicated our staged chunks among each other and against the base chunks. Whenever a staged chunk already exists on a base slab, it now becomes just a reference to it.
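
A sketch of the two passes above, with hypothetical helper names and slab offsets assumed to be expressed in rows:

    def deduplicate_staged_chunks(hash_tables, slab_indices, slab_offsets,
                                  staged_chunks, chunk_rows):
        """Sketch only: remap every staged chunk onto the lowest-index slab
        that already holds identical data.

        staged_chunks is assumed to be the indices (into the flattened chunk
        grid) of the staged chunks; chunk_rows is chunk_size[0].
        """
        # Compile {hash key: (slab idx, slab offset)}. Iterating the slabs in
        # reverse order makes lower slab indices overwrite higher ones.
        known = {}
        uninitialised = bytes(32)  # an all-zero row means "no hash computed"
        for slab_idx in reversed(range(len(hash_tables))):
            table = hash_tables[slab_idx]
            for row in range(table.shape[0]):
                key = table[row].tobytes()  # the 32-byte sha256 digest
                if key != uninitialised:
                    known[key] = (slab_idx, row * chunk_rows)

        # Point each staged chunk at the winning slab; chunks identical to a
        # base chunk (or to the full chunk) become mere references to it.
        for chunk in staged_chunks:
            slab_idx = slab_indices[chunk]
            row = slab_offsets[chunk] // chunk_rows
            key = hash_tables[slab_idx][row].tobytes()
            slab_indices[chunk], slab_offsets[chunk] = known[key]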

Now we insert the new base slab just after the other base slabs. This is different from what we do normally, which is to append at the end of the slabs list. CommitPlan itself will also create a new hash_tables entry, unlike the other MutatingPlans, which just instruct StagedChangesArray to do so.

We scan slab_indices and slab_offsets again. Same query as before, but different result because of the deduplication pass we just completed.

We then compile a TransferPlan. This is the inverse of LoadPlan, where all the contents of the base slabs are copied to a new staged slab and then dropped. Here, we copy all the contents of the staged slabs (those that survived deduplication) to a new base slab and finally we drop all the staged slabs.

From InMemoryDataset/InMemorySparseDataset's POV

  1. Commit starts: there is 1 base slab, which is raw_data, and 0 or more staged slabs.
  2. Call self.staged_changes.commit(empty=self.resize_raw_data_and_return_view) where the callback resizes self.id.raw_data and returns a view to the new surface.
  3. There are now exactly 3 elements in self.staged_changes.slabs:
    • slabs[0]: the full slab (ignore it)
    • slabs[1]: raw_data (may have been replaced with None)
    • slabs[2]: view to extended raw_data we just created with the callback.

All data - but not the metadata yet - has been saved to disk.

  4. We can now resize the hash table in the hdf5 dataset and append staged_changes.hash_tables[2] to it (see the sketch after this list).
  5. Finally, we hack the metadata:

    self.staged_changes.slab_offsets[
        self.staged_changes.slab_indices == 2
    ] += previous_raw_data_len
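
A minimal sketch of step 4, in the same fragment style as the snippet above; hash_ds stands for the proposed on-disk hash table and is a name invented for this sketch:

    # Sketch only: hash_ds is the proposed on-disk hash table (resizable,
    # shape (n, 4), dtype uint64) living next to raw_data.
    new_hashes = self.staged_changes.hash_tables[2]
    old_len = hash_ds.shape[0]
    hash_ds.resize((old_len + new_hashes.shape[0], 4))
    hash_ds[old_len:] = new_hashes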

Recreating the virtual dataset

At the end of the last stage we can transitionally resume with what we were doing in the legacy design: call self.staged_changes.changes(), compile a data_dict, and pass it to create_virtual_dataset.

This also has the advantage that we don't need to think yet about what happens in the create_dataset function.

In a later phase, however, we can rewrite create_virtual_dataset so that it accepts slab_indices and slab_offsets directly, in a way that mirrors build_slab_indices_and_offsets and doesn't require a data_dict or ndindex.
When this happens, the data_dict and the CommitPlan can be retired.
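
For illustration only, a rough h5py sketch of a create_virtual_dataset driven directly by per-chunk offsets into raw_data, simplified to a dataset chunked along the first axis only (the function name and arguments are hypothetical, not the actual design):

    import h5py

    def create_virtual_dataset_sketch(f, name, shape, dtype, chunk_size,
                                      raw_data_path, raw_data_shape, slab_offsets,
                                      fill_value=0):
        """Sketch only: map chunk i of the virtual dataset to the rows of
        raw_data starting at slab_offsets[i], with no data_dict or ndindex."""
        layout = h5py.VirtualLayout(shape=shape, dtype=dtype)
        source = h5py.VirtualSource(f.filename, raw_data_path, shape=raw_data_shape)
        for chunk_idx, offset in enumerate(slab_offsets):
            start = chunk_idx * chunk_size[0]
            stop = min(start + chunk_size[0], shape[0])   # clip the edge chunk
            layout[start:stop] = source[offset:offset + (stop - start)]
        return f.create_virtual_dataset(name, layout, fillvalue=fill_value)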

create_dataset will need to be reimplemented on top of StagedChangesArray.from_array(as_base=False), which will create staged slabs from views of the original data.

Components

Phase 1:

  1. Add hash table to InMemoryDatasetID, next to raw_data
  2. staged_changes.py: StagedChangesArray.commit(), HashPlan, CommitPlan
  3. Write cythonized hasher
  4. Integration in InMemoryDataset (resize raw_data; append to hashes)

Phase 2:

  1. Rewrite create_virtual_dataset

Phase 3:

  1. Rewrite new_dataset on top of StagedChangesArray (needed to delete legacy code)

Benchmarks

Breakdown of vf.stage_version(...).__exit__() to commit a dataset with

  • shape=(20_000, 20_000) (3 GiB)
  • chunks=(100, 100) (40_000 chunks worth 80 kiB each)

and all chunks modified (write to local NVMe):

TODAY (as of #386)
All timings are measured with pyinstrument:

 1.3s InMemoryDataset.data_dict
 1.6s openssl_sha256
 0.9s Hashtable control code
 5.7s Dataset.__setitem__
 2.7s create_virtual_dataset
 0.5s misc control code
12.7s TOTAL

After phase 1 (move hashing and writing to StagedChangesArray):

 0.4s plans formulation (guesstimate)
 1.6s hash_slabs (measured on POC code)
 3.1s read_many_slices calls H5Dwrite for each chunk (measured on POC code)
 1.3s InMemoryDataset.data_dict (same as prod)
 2.7s create_virtual_dataset (same as prod)
 9.1s TOTAL

Note: early POC benchmarks show no benefit from calling OpenSSL C hashers directly vs. hashlib at 10k points per chunk (80 kiB), but a substantial benefit starts appearing at 1,000 points per chunk (8 kiB).

After phase 2 (rewrite create_virtual_dataset):

 0.4s plans formulation (guesstimate)
 1.6s hash_slabs (measured on POC code)
 3.1s read_many_slices calls H5Dwrite for each chunk (measured on POC code)
 0.7s create_virtual_dataset (guesstimate)
 5.8s TOTAL

Out of scope bonus: cache on read

Downstream of this it becomes straightforward, if we wish, to implement a switch that enables cache-on-read for __getitem__, as was the case before #386.

GetItemPlan becomes a MutatingPlan with the option, if the cache-on-read flag is on, to load a new staged slab from hdf5 and then read from it, just like SetItemPlan loads a new staged slab from hdf5 for the partially modified data and then updates it.

When GetItemPlan creates a new slab, it also appends a new array to hash_tables and populates it with a copy of the hashes of the chunks it just loaded. This means that HashPlan will later be able to skip hashing that slab.

LoadPlan likewise copies over the hashes from the base slabs to the staged slabs.

When SetItemPlan and ResizePlan update an existing slab, they instead zero out the updated chunks to signal that the hash needs to be (re)computed.
astype() completely wipes the hash tables.
refill() could partially wipe the hash tables only where it actually changes points, but for simplicity it's probably best to just wipe everything.
