Skip to content

Commit

Permalink
Merge pull request #23 from lincc-frameworks/nits
Browse files Browse the repository at this point in the history
Add parquet handling to data loading tutorial
  • Loading branch information
wilsonbb authored May 30, 2024
2 parents dac5188 + c14142e commit e83baef
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 4 deletions.
5 changes: 4 additions & 1 deletion docs/gettingstarted/quickstart.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@
"metadata": {},
"source": [
"The above is a Nested-Dask `NestedFrame` object. It's currently a \"lazy\" representation of the data, meaning that no data has actually been brought into memory yet. This lazy view gives us some useful information on the structure of the data, with notable pieces of information being:\n",
"\n",
"* Shows us which columns are in the dataset and their respective dtypes.\n",
"\n",
"* `npartitions=1` indicates how many partitions the dataset has been split into.\n",
"\n",
"* The `0` and `9` tell us the \"divisions\" of the partitions. When the dataset is sorted by the index, these divisions are ranges to show which index values reside in each partition.\n",
"\n",
"We can signal to Dask that we'd like to actually obtain the data as `nested_pandas.NestedFrame` by using `compute`."
Expand Down Expand Up @@ -183,7 +186,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
"version": "3.12.3"
}
},
"nbformat": 4,
Expand Down
160 changes: 157 additions & 3 deletions docs/tutorials/loading_data.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import tempfile\n",
"\n",
"import nested_dask as nd\n",
"import nested_pandas as npd\n",
"import nested_dask as nd"
"from nested_dask import read_parquet\n",
"from nested_dask.datasets import generate_parquet_file"
]
},
{
Expand Down Expand Up @@ -51,8 +56,157 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Loading from Parquet Files"
"## Loading from Parquet \n",
"\n",
"For larger datasets, we support loading data from parquet files.\n",
"\n",
"In the following cell, we generate a series of temporary parquet files with random data, and ingest them with the `read_parquet` method.\n",
"\n",
"Then we use `read_parquet` to read each layer's parquet files into their own `NestedFrame`. Then we again use `add_nested` to pack these into a single `NestedFrame`, `nf`.\n",
"\n",
"Note that for each layer of our `NestedFrame` we expect a directory of parquet files where each file will be its own [Dask partition](https://docs.dask.org/en/stable/dataframe-design.html#dataframe-design-partitions)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"nf = None\n",
"\n",
"# Note: that we use the `tempfile` module to create and then cleanup a temporary directory.\n",
"# You can of course remove this and use your own directory and real files on your system.\n",
"with tempfile.TemporaryDirectory() as temp_path:\n",
" # Generates parquet files with random data within our temporary directory.\n",
" generate_parquet_file(\n",
" 10, # The number of rows to generated in the base layer\n",
" {\n",
" \"nested1\": 100, # Generate a nested layer named 'nested1' with 100 rows.\n",
" \"nested2\": 10,\n",
" }, # Generate a nested layer nameed 'nested2' with 10 rows.\n",
" temp_path, # The root temporary directory to store our generated parquet files.\n",
" npartitions=5, # The number of Dask partitions for each layer.\n",
" file_per_layer=True, # Generates a unique directory of parquet files for each layer\n",
" )\n",
"\n",
" # Note that each layer of our NestedFrame will be in its own directory,\n",
" # with a parquet file for each Dask partition.\n",
" parquet_dirs = [\n",
" os.path.join(temp_path, \"base\"),\n",
" os.path.join(temp_path, \"nested1\"),\n",
" os.path.join(temp_path, \"nested2\"),\n",
" ]\n",
" for path in parquet_dirs:\n",
" print(f\"Directory {path} has the following parquet files {os.listdir(path)}.\")\n",
"\n",
" # Create a single NestedFrame for our base layer from the directory containing the parquet files\n",
" # for each of its partitions.\n",
" nf = read_parquet(path=os.path.join(temp_path, \"base\"))\n",
"\n",
" # Read the nested layers from their respective directories.\n",
" nested1 = read_parquet(os.path.join(temp_path, \"nested1\"))\n",
" nested1 = nested1.persist()\n",
" nested2 = read_parquet(os.path.join(temp_path, \"nested2\"))\n",
" nested2 = nested2.persist()\n",
"\n",
" # Add the nested layers to the NestedFrame.\n",
" nf = nf.add_nested(nested1, \"nested1\")\n",
" nf = nf.add_nested(nested2, \"nested2\")\n",
"\n",
" # Here we have Dask 'persist' the data in memory now so that we don't have to read it from\n",
" # the source parquet files again (as it may try to do due to lazy evaluation).\n",
" # This is particularly useful since it forces Dask to read the data\n",
" # from the temporary parquet files before they are deleted rather than\n",
" nf = nf.persist()\n",
"\n",
"nf"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that we can use Dask's `compute()` to fully evaluate our dataframe."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"nf.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Saving NestedFrames to Parquet Files\n",
"\n",
"Additionally we can save an existing `NestedFrame` as a collection of parquet files using `NestedFrame.to_parquet`\n",
"\n",
"We save each layer to its own directory, and each Dask partition for that layer to its own parquet file within that directory.\n",
"\n",
"The base layer will be outputted to a directory named \"base\", and each nested layer will be written to a directory based on its respective column name. \n",
"\n",
"So the nested layer in column `nested1` will be written to directory \"nested1\"."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"restored_nf = None\n",
"\n",
"# Note: that we use the `tempfile` module to create and then cleanup a temporary directory.\n",
"# You can of course remove this and use your own directory and real files on your system.\n",
"with tempfile.TemporaryDirectory() as temp_path:\n",
" nf.to_parquet(\n",
" temp_path, # The directory to save our output parquet files.\n",
" by_layer=True, # Each layer will be saved in its own sub directory.\n",
" )\n",
" # List the files in temp_path to ensure they were saved correctly.\n",
" print(\"The NestedFrame was saved to the following directories :\", os.listdir(temp_path))\n",
"\n",
" # Read the NestedFrame back in from our saved parquet files.\n",
" restored_nf = read_parquet(os.path.join(temp_path, \"base\"))\n",
"\n",
" # Read the nested layers from their respective directories.\n",
" nested1 = read_parquet(os.path.join(temp_path, \"nested1\"))\n",
" nested2 = read_parquet(os.path.join(temp_path, \"nested2\"))\n",
"\n",
" # Add the nested layers to the NestedFrame.\n",
" restored_nf = restored_nf.add_nested(nested1, \"nested1\")\n",
" restored_nf = restored_nf.add_nested(nested2, \"nested2\")\n",
"\n",
" # Here we have Dask 'persist' the data in memory now so that we don't have to read it from\n",
" # the source parquet files again (as it may try to do due to lazy evaluation).\n",
" # This is particularly useful since it forces Dask to read the data\n",
" # from the temporary parquet files before they are deleted rather than\n",
" restored_nf = restored_nf.persist()\n",
"\n",
"restored_nf # our dataframe is restored from our saved parquet files"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"restored_nf.compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand All @@ -71,7 +225,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
"version": "3.12.3"
}
},
"nbformat": 4,
Expand Down
32 changes: 32 additions & 0 deletions src/nested_dask/datasets/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,35 @@ def generate_data(n_base, n_layer, npartitions=1, seed=None) -> nd.NestedFrame:
base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=npartitions)

return base_nf


def generate_parquet_file(n_base, n_layer, path, file_per_layer=True, npartitions=1, seed=None):
"""Generates a toy dataset and outputs it to one or more parquet files.
Parameters
----------
n_base : int
The number of rows to generate for the base layer
n_layer : int, or dict
The number of rows per n_base row to generate for a nested layer.
Alternatively, a dictionary of layer label, layer_size pairs may be
specified to created multiple nested columns with custom sizing.
path : str,
The path to the parquet file to write to if `file_per_layer` is `False`,
and otherwise the path to the directory to write the parquet file for
each layer.
file_per_layer : bool, default=True
TODO: Currently only True is supported.
If True, write each layer to its own parquet file. Otherwise, write
the generated to a single parquet file representing a nested dataset.
npartitions : int, default=1
The number of Dask partitions to split the generated data into for each layer.
seed : int, default=None
A seed to use for random generation of data
Returns
-------
None
"""
nf = generate_data(n_base, n_layer, npartitions, seed)
nf.to_parquet(path, by_layer=file_per_layer, write_index=False)

0 comments on commit e83baef

Please sign in to comment.