diff --git a/docs/tutorials/loading_data.ipynb b/docs/tutorials/loading_data.ipynb index 1c24d8c..7df11a9 100644 --- a/docs/tutorials/loading_data.ipynb +++ b/docs/tutorials/loading_data.ipynb @@ -9,12 +9,17 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ + "import os\n", + "import tempfile\n", + "\n", + "import nested_dask as nd\n", "import nested_pandas as npd\n", - "import nested_dask as nd" + "from nested_dask import read_parquet\n", + "from nested_dask.datasets import generate_parquet_file" ] }, { @@ -28,9 +33,60 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/html": [ + "
Dask DataFrame Structure:
\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
abnested
npartitions=1
0int64int64nested<c: [int64], d: [int64]>
2.........
\n", + "
Dask Name: nestedframe, 2 expressions
" + ], + "text/plain": [ + "Dask NestedFrame Structure:\n", + " a b nested\n", + "npartitions=1 \n", + "0 int64 int64 nested\n", + "2 ... ... ...\n", + "Dask Name: nestedframe, 2 expressions\n", + "Expr=MapPartitions(NestedFrame)" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "# Create a Nested-Pandas NestedFrame\n", "nf = npd.NestedFrame(data={\"a\": [1, 2, 3], \"b\": [2, 4, 6]}, index=[0, 1, 2])\n", @@ -51,8 +107,681 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Loading from Parquet Files" + "## Loading from Parquet \n", + "\n", + "For larger datasets, we support loading data from parquet files.\n", + "\n", + "In the following cell, we generate a series of temporary parquet files with random data, and ingest them with the `read_parquet` method.\n", + "\n", + "Then we use `read_parquet` to read each layer's parquet files into their own `NestedFrame`. Then we again use `add_nested` to pack these into a single `NestedFrame`, `nf`.\n", + "\n", + "Note that for each layer of our `NestedFrame` we expect a directory of parquet files where each file will be its own [Dask partition](https://docs.dask.org/en/stable/dataframe-design.html#dataframe-design-partitions)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Directory /var/folders/d7/c6jq9xpd2pz66frp8d20n2w00000gn/T/tmph8n4is_0/base has the following parquet files ['part.4.parquet', 'part.3.parquet', 'part.2.parquet', 'part.0.parquet', 'part.1.parquet'].\n", + "Directory /var/folders/d7/c6jq9xpd2pz66frp8d20n2w00000gn/T/tmph8n4is_0/nested1 has the following parquet files ['part.4.parquet', 'part.3.parquet', 'part.2.parquet', 'part.0.parquet', 'part.1.parquet'].\n", + "Directory /var/folders/d7/c6jq9xpd2pz66frp8d20n2w00000gn/T/tmph8n4is_0/nested2 has the following parquet files ['part.4.parquet', 'part.3.parquet', 'part.2.parquet', 'part.0.parquet', 'part.1.parquet'].\n" + ] + }, + { + "data": { + "text/html": [ + "
Dask DataFrame Structure:
\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
abnested1nested2
npartitions=5
float64float64nested<t: [double], flux: [double], band: [large_string]>nested<t: [double], flux: [double], band: [large_string]>
............
...............
............
............
\n", + "
Dask Name: getitem-fused-blockwisemerge, 1 expression
" + ], + "text/plain": [ + "Dask NestedFrame Structure:\n", + " a b nested1 nested2\n", + "npartitions=5 \n", + " float64 float64 nested nested\n", + " ... ... ... ...\n", + "... ... ... ... ...\n", + " ... ... ... ...\n", + " ... ... ... ...\n", + "Dask Name: getitem-fused-blockwisemerge, 1 expression\n", + "Expr=FromGraph(73b15af)" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "\n", + "nf = None\n", + "\n", + "# Note: that we use the `tempfile` module to create and then cleanup a temporary directory.\n", + "# You can of course remove this and use your own directory and real files on your system.\n", + "with tempfile.TemporaryDirectory() as temp_path:\n", + " # Generates parquet files with random data within our temporary directory.\n", + " generate_parquet_file(\n", + " 10, # The number of rows to generated in the base layer\n", + " {\"nested1\": 100, # Generate a nested layer named 'nested1' with 100 rows.\n", + " \"nested2\": 10}, # Generate a nested layer nameed 'nested2' with 10 rows.\n", + " temp_path, # The root temporary directory to store our generated parquet files.\n", + " npartitions=5, # The number of Dask partitions for each layer.\n", + " file_per_layer=True, # Generates a unique directory of parquet files for each layer\n", + " )\n", + "\n", + " # Note that each layer of our NestedFrame will be in its own directory,\n", + " # with a parquet file for each Dask partition.\n", + " parquet_dirs = [\n", + " os.path.join(temp_path, \"base\"),\n", + " os.path.join(temp_path, \"nested1\"),\n", + " os.path.join(temp_path, \"nested2\"),\n", + " ]\n", + " for path in parquet_dirs:\n", + " print(f'Directory {path} has the following parquet files {os.listdir(path)}.')\n", + "\n", + "\n", + " # Create a single NestedFrame for our base layer from the directory containing the parquet files\n", + " # for each of its partitions.\n", + " nf = read_parquet(path=os.path.join(temp_path, \"base\"))\n", + 
"\n", + " # Read the nested layers from their respective directories.\n", + " nested1 = read_parquet(os.path.join(temp_path, \"nested1\"))\n", + " nested1 = nested1.persist()\n", + " nested2 = read_parquet(os.path.join(temp_path, \"nested2\"))\n", + " nested2 = nested2.persist()\n", + "\n", + " # Add the nested layers to the NestedFrame.\n", + " nf = nf.add_nested(nested1, \"nested1\")\n", + " nf = nf.add_nested(nested2, \"nested2\")\n", + "\n", + " # Here we have Dask 'persist' the data in memory now so that we don't have to read it from\n", + " # the source parquet files again (as it may try to do due to lazy evaluation).\n", + " # This is particularly useful since it forces Dask to read the data\n", + " # from the temporary parquet files before they are deleted rather than\n", + " nf = nf.persist()\n", + "\n", + "nf\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Note that we can use Dask's `compute()` to see the fully evaluate our dataframe." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
abnested1nested2
00.7149510.420148t flux band\n", + "0 19.353302 34....t flux band\n", + "0 6.712161 19.92...
00.7149510.420148t flux band\n", + "0 19.353302 34....t flux band\n", + "0 18.370752 36....
00.7149510.420148t flux band\n", + "0 19.353302 34....t flux band\n", + "0 18.885416 71....
00.7149510.420148t flux band\n", + "0 19.353302 34....t flux band\n", + "0 9.353898 20.14...
00.7149510.420148t flux band\n", + "0 19.353302 34....t flux band\n", + "0 9.162827 80.84...
...............
268NaNNaNt flux band\n", + "0 18.694918 77....None
270NaNNaNt flux band\n", + "0 13.198498 68....None
283NaNNaNt flux band\n", + "0 13.030202 9.20...None
286NaNNaNt flux band\n", + "0 14.276824 99....None
298NaNNaNt flux band\n", + "0 2.894067 56.75...None
\n", + "

1525 rows × 4 columns

\n", + "
" + ], + "text/plain": [ + " a b nested1 \\\n", + "0 0.714951 0.420148 t flux band\n", + "0 19.353302 34.... \n", + "0 0.714951 0.420148 t flux band\n", + "0 19.353302 34.... \n", + "0 0.714951 0.420148 t flux band\n", + "0 19.353302 34.... \n", + "0 0.714951 0.420148 t flux band\n", + "0 19.353302 34.... \n", + "0 0.714951 0.420148 t flux band\n", + "0 19.353302 34.... \n", + ".. ... ... ... \n", + "268 NaN NaN t flux band\n", + "0 18.694918 77.... \n", + "270 NaN NaN t flux band\n", + "0 13.198498 68.... \n", + "283 NaN NaN t flux band\n", + "0 13.030202 9.20... \n", + "286 NaN NaN t flux band\n", + "0 14.276824 99.... \n", + "298 NaN NaN t flux band\n", + "0 2.894067 56.75... \n", + "\n", + " nested2 \n", + "0 t flux band\n", + "0 6.712161 19.92... \n", + "0 t flux band\n", + "0 18.370752 36.... \n", + "0 t flux band\n", + "0 18.885416 71.... \n", + "0 t flux band\n", + "0 9.353898 20.14... \n", + "0 t flux band\n", + "0 9.162827 80.84... \n", + ".. ... \n", + "268 None \n", + "270 None \n", + "283 None \n", + "286 None \n", + "298 None \n", + "\n", + "[1525 rows x 4 columns]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "nf.compute()" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Saving NestedFrames to Parquet Files\n", + "\n", + "Additionally we can save an existing `NestedFrame` as a collection of parquet files using `NestedFrame.to_parquet`\n", + "\n", + "We save each layer to its own directory, and each Dask partition for that layer to its own parquet file within that directory.\n", + "\n", + "The base layer will be outputted to a directory named \"base\", and each nested layer will be written to a directory based on its respective column name. \n", + "\n", + "So the nested layer in column `nested1` will be written to directory \"nested1\"." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "The NestedFrame was saved to the following directories : ['nested1', 'nested2', 'base']\n" + ] + }, + { + "data": { + "text/html": [ + "
Dask DataFrame Structure:
\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
abnested1nested2
npartitions=5
float64float64nested<t: [double], flux: [double], band: [large_string]>nested<t: [double], flux: [double], band: [large_string]>
............
...............
............
............
\n", + "
Dask Name: getitem-fused-blockwisemerge, 1 expression
" + ], + "text/plain": [ + "Dask NestedFrame Structure:\n", + " a b nested1 nested2\n", + "npartitions=5 \n", + " float64 float64 nested nested\n", + " ... ... ... ...\n", + "... ... ... ... ...\n", + " ... ... ... ...\n", + " ... ... ... ...\n", + "Dask Name: getitem-fused-blockwisemerge, 1 expression\n", + "Expr=FromGraph(99d16c1)" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "restored_nf = None\n", + "\n", + "# Note: that we use the `tempfile` module to create and then cleanup a temporary directory.\n", + "# You can of course remove this and use your own directory and real files on your system.\n", + "with tempfile.TemporaryDirectory() as temp_path:\n", + " nf.to_parquet(\n", + " temp_path, # The directory to save our output parquet files.\n", + " by_layer=True, # Each layer will be saved in its own sub directory.\n", + " )\n", + " # List the files in temp_path to ensure they were saved correctly.\n", + " print(\"The NestedFrame was saved to the following directories :\", os.listdir(temp_path))\n", + "\n", + " # Read the NestedFrame back in from our saved parquet files.\n", + " restored_nf = read_parquet(os.path.join(temp_path, \"base\"))\n", + "\n", + " # Read the nested layers from their respective directories.\n", + " nested1 = read_parquet(os.path.join(temp_path, \"nested1\"))\n", + " nested2 = read_parquet(os.path.join(temp_path, \"nested2\"))\n", + "\n", + " # Add the nested layers to the NestedFrame.\n", + " restored_nf = restored_nf.add_nested(nested1, \"nested1\")\n", + " restored_nf = restored_nf.add_nested(nested2, \"nested2\")\n", + "\n", + " # Here we have Dask 'persist' the data in memory now so that we don't have to read it from\n", + " # the source parquet files again (as it may try to do due to lazy evaluation).\n", + " # This is particularly useful since it forces Dask to read the data\n", + " # from the temporary parquet files before they are deleted rather than\n", + " 
restored_nf = restored_nf.persist()\n", + "\n", + "restored_nf # our dataframe is restored from our saved parquet files" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
abnested1nested2
00.7149510.420148t flux band\n", + "0 19.353302 ...t flux band\n", + "0 6.712161 ...
00.7149510.420148t flux band\n", + "0 19.353302 ...t flux band\n", + "0 6.712161 ...
00.7149510.420148t flux band\n", + "0 19.353302 ...t flux band\n", + "0 6.712161 ...
00.7149510.420148t flux band\n", + "0 19.353302 ...t flux band\n", + "0 6.712161 ...
00.7149510.420148t flux band\n", + "0 19.353302 ...t flux band\n", + "0 6.712161 ...
...............
268NaNNaNt flux band\n", + "0 18.694918 77....None
270NaNNaNt flux band\n", + "0 13.198498 68....None
283NaNNaNt flux band\n", + "0 13.030202 9.20...None
286NaNNaNt flux band\n", + "0 14.276824 99....None
298NaNNaNt flux band\n", + "0 2.894067 56.75...None
\n", + "

1525 rows × 4 columns

\n", + "
" + ], + "text/plain": [ + " a b nested1 \\\n", + "0 0.714951 0.420148 t flux band\n", + "0 19.353302 ... \n", + "0 0.714951 0.420148 t flux band\n", + "0 19.353302 ... \n", + "0 0.714951 0.420148 t flux band\n", + "0 19.353302 ... \n", + "0 0.714951 0.420148 t flux band\n", + "0 19.353302 ... \n", + "0 0.714951 0.420148 t flux band\n", + "0 19.353302 ... \n", + ".. ... ... ... \n", + "268 NaN NaN t flux band\n", + "0 18.694918 77.... \n", + "270 NaN NaN t flux band\n", + "0 13.198498 68.... \n", + "283 NaN NaN t flux band\n", + "0 13.030202 9.20... \n", + "286 NaN NaN t flux band\n", + "0 14.276824 99.... \n", + "298 NaN NaN t flux band\n", + "0 2.894067 56.75... \n", + "\n", + " nested2 \n", + "0 t flux band\n", + "0 6.712161 ... \n", + "0 t flux band\n", + "0 6.712161 ... \n", + "0 t flux band\n", + "0 6.712161 ... \n", + "0 t flux band\n", + "0 6.712161 ... \n", + "0 t flux band\n", + "0 6.712161 ... \n", + ".. ... \n", + "268 None \n", + "270 None \n", + "283 None \n", + "286 None \n", + "298 None \n", + "\n", + "[1525 rows x 4 columns]" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "restored_nf.compute()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { @@ -71,7 +800,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.11" + "version": "3.12.3" } }, "nbformat": 4, diff --git a/src/nested_dask/datasets/generation.py b/src/nested_dask/datasets/generation.py index 2d4b9ee..f5ef970 100644 --- a/src/nested_dask/datasets/generation.py +++ b/src/nested_dask/datasets/generation.py @@ -40,3 +40,33 @@ def generate_data(n_base, n_layer, npartitions=1, seed=None) -> nd.NestedFrame: base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=npartitions) return base_nf + + +def generate_parquet_file(n_base, n_layer, path, file_per_layer=True, npartitions=1, 
seed=None): + """Generates a toy dataset and outputs it to one or more parquet files. + + Parameters + ---------- + n_base : int + The number of rows to generate for the base layer + n_layer : int or dict + The number of rows per n_base row to generate for a nested layer. + Alternatively, a dictionary of layer label, layer_size pairs may be + specified to create multiple nested columns with custom sizing. + path : str + The path to the parquet file to write to if `file_per_layer` is `False`, + and otherwise the path to the directory to write the parquet file for + each layer. + file_per_layer : bool, default=True + TODO: Currently only True is supported. + If True, write each layer to its own parquet file. Otherwise, write + the generated data to a single parquet file representing a nested dataset. + seed : int, default=None + A seed to use for random generation of data + + Returns + ------- + None + """ + nf = generate_data(n_base, n_layer, npartitions, seed) + nf.to_parquet(path, by_layer=file_per_layer, write_index=False)