From 3dea29ff3dc0af37546587c5c4b994161162b00d Mon Sep 17 00:00:00 2001
From: Wilson Beebe
Date: Thu, 16 May 2024 15:23:14 -0700
Subject: [PATCH] Support writing nested frames to parquet files (#83)

* Have read_parquet accept an already packed file
* Add support for generating parquet files
* Update io testing
* Ruff fixes
* Lint fixes
* Pre-commit fixes
---
 src/nested_pandas/datasets/generation.py   | 29 ++++++++
 src/nested_pandas/nestedframe/core.py      | 49 ++++++++++++++
 src/nested_pandas/nestedframe/io.py        | 10 +--
 tests/nested_pandas/nestedframe/test_io.py | 78 +++++++++++++++++++++-
 4 files changed, 161 insertions(+), 5 deletions(-)

diff --git a/src/nested_pandas/datasets/generation.py b/src/nested_pandas/datasets/generation.py
index f50bfe1..d305418 100644
--- a/src/nested_pandas/datasets/generation.py
+++ b/src/nested_pandas/datasets/generation.py
@@ -53,3 +53,32 @@ def generate_data(n_base, n_layer, seed=None) -> NestedFrame:
         return base_nf
     else:
         raise TypeError("Input to n_layer is not an int or dict.")
+
+
+def generate_parquet_file(n_base, n_layer, path, file_per_layer=False, seed=None):
+    """Generates a toy dataset and outputs it to one or more parquet files.
+
+    Parameters
+    ----------
+    n_base : int
+        The number of rows to generate for the base layer.
+    n_layer : int, or dict
+        The number of rows per n_base row to generate for a nested layer.
+        Alternatively, a dictionary of layer label, layer_size pairs may be
+        specified to create multiple nested columns with custom sizing.
+    path : str
+        The path to the parquet file to write to if `file_per_layer` is `False`,
+        and otherwise the path to the directory to write the parquet file for
+        each layer.
+    file_per_layer : bool, default=False
+        If True, write each layer to its own parquet file. Otherwise, write
+        the generated data to a single parquet file representing a nested
+        dataset.
+    seed : int, default=None
+        A seed to use for random generation of data.
+
+    Returns
+    -------
+    None
+    """
+    nf = generate_data(n_base, n_layer, seed)
+    nf.to_parquet(path, by_layer=file_per_layer)
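A minimal usage sketch of the new generator, not part of the patch itself: the file and directory names are illustrative, and `generate_parquet_file` is assumed to be re-exported from `nested_pandas.datasets` (if not, import it from `nested_pandas.datasets.generation`).

    import os

    from nested_pandas.datasets import generate_parquet_file

    # One packed file: 10 base rows, two nested layers sized via a dict.
    generate_parquet_file(10, {"nested1": 100, "nested2": 100}, "toy.parquet", seed=1)

    # One file per layer; the target directory must already exist
    # because to_parquet(by_layer=True) does not create it.
    os.makedirs("toy_layers", exist_ok=True)
    generate_parquet_file(10, 100, "toy_layers", file_per_layer=True, seed=1)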
diff --git a/src/nested_pandas/nestedframe/core.py b/src/nested_pandas/nestedframe/core.py
index 71dd33f..0358811 100644
--- a/src/nested_pandas/nestedframe/core.py
+++ b/src/nested_pandas/nestedframe/core.py
@@ -1,6 +1,8 @@
 # typing.Self and "|" union syntax don't exist in Python 3.9
 from __future__ import annotations
 
+import os
+
 import numpy as np
 import pandas as pd
 import pyarrow as pa
@@ -438,3 +440,50 @@ def translate_cols(frame, layer, col):
             axis=1,  # to apply func on each row of our nested frame
         )
         return result
+
+    def to_parquet(self, path, by_layer=False, **kwargs) -> None:
+        """Creates parquet file(s) with the data of a NestedFrame, either
+        as a single parquet file where each nested dataset is packed into its
+        own column, or as an individual parquet file for each layer.
+
+        Note that we always use the pyarrow engine for writing parquet files.
+
+        Parameters
+        ----------
+        path : str
+            The path to the parquet file to be written if 'by_layer' is False.
+            If 'by_layer' is True, this should be the path to an existing
+            directory.
+        by_layer : bool, default False
+            If False, writes the entire NestedFrame to a single parquet file.
+
+            If True, writes each layer to a separate parquet file within the
+            directory specified by path. Each output file is named after its
+            layer, followed by the ".parquet" extension; for the base layer
+            this is always "base.parquet".
+        kwargs : keyword arguments, optional
+            Keyword arguments passed through to the pandas to_parquet call.
+
+        Returns
+        -------
+        None
+        """
+        if not by_layer:
+            # We just defer to the pandas to_parquet method if we're not
+            # writing by layer.
+            super().to_parquet(path, engine="pyarrow", **kwargs)
+        else:
+            # If we're writing by layer, path must be an existing directory
+            if not os.path.isdir(path):
+                raise ValueError("The provided path must be an existing directory if by_layer=True")
+
+            # Write the base layer to a parquet file
+            base_frame = self.drop(columns=self.nested_columns, inplace=False)
+            base_frame.to_parquet(os.path.join(path, "base.parquet"), by_layer=False, **kwargs)
+
+            # Write each nested layer to a parquet file
+            for layer in self.all_columns:
+                if layer != "base":
+                    path_layer = os.path.join(path, f"{layer}.parquet")
+                    self[layer].nest.to_flat().to_parquet(path_layer, engine="pyarrow", **kwargs)
+        return None
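A sketch of the two write modes, constructing a NestedFrame the same way the tests below do; the file and directory names are illustrative only.

    import os

    import pandas as pd
    from nested_pandas import NestedFrame

    base = pd.DataFrame({"a": [1, 2, 3]}, index=[0, 1, 2])
    layer = pd.DataFrame({"c": [0, 2, 4, 1, 4, 3]}, index=[0, 0, 1, 1, 2, 2])
    nf = NestedFrame(base).add_nested(layer, name="nested1")

    # Single file: "nested1" is packed into one column of packed.parquet.
    nf.to_parquet("packed.parquet")

    # One file per layer: writes layers/base.parquet and layers/nested1.parquet.
    os.makedirs("layers", exist_ok=True)
    nf.to_parquet("layers", by_layer=True)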
diff --git a/src/nested_pandas/nestedframe/io.py b/src/nested_pandas/nestedframe/io.py
index e0f773f..ebf2f5e 100644
--- a/src/nested_pandas/nestedframe/io.py
+++ b/src/nested_pandas/nestedframe/io.py
@@ -14,7 +14,7 @@
 def read_parquet(
     data: FilePath | ReadBuffer[bytes],
-    to_pack: dict,
+    to_pack: dict | None = None,
     columns: list[str] | None = None,
     pack_columns: dict | None = None,
     dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
@@ -40,10 +40,11 @@ def read_parquet(
         partitioned parquet files. Both pyarrow and fastparquet support paths
         to directories as well as file URLs. A directory path could be:
         ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
-    to_pack: dict,
+    to_pack: dict, default=None
         A dictionary of parquet data paths (same criteria as `data`), where
         each key reflects the desired column name to pack the data into and
-        each value reflects the parquet data to pack.
+        each value reflects the parquet data to pack. If None, it is assumed
+        that any data to pack is already packed as a column within `data`.
     columns : list, default=None
         If not None, only these columns will be read from the file.
     pack_columns: dict, default=None
@@ -64,7 +65,8 @@
     """
     df = NestedFrame(pd.read_parquet(data, engine="pyarrow", columns=columns, dtype_backend=dtype_backend))
-
+    if to_pack is None:
+        return df
     for pack_key in to_pack:
         col_subset = pack_columns.get(pack_key, None) if pack_columns is not None else None
         packed = pd.read_parquet(
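Correspondingly, both shapes can be read back; a sketch continuing the illustrative paths from above:

    from nested_pandas import read_parquet

    # A packed file (written with by_layer=False) round-trips with no to_pack.
    nf = read_parquet("packed.parquet")

    # Separate layer files can still be packed together at read time, as before.
    nf = read_parquet(
        "layers/base.parquet",
        to_pack={"nested1": "layers/nested1.parquet"},
    )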
"base.parquet"), + to_pack={ + "nested1": os.path.join(tmp_dir.name, "nested1.parquet"), + "nested2": os.path.join(tmp_dir.name, "nested2.parquet"), + }, + ) + + assert_frame_equal(nf, entire_nf)