From 257391b981e197372b16725e434af2e3eb58d5d0 Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Tue, 16 Jan 2024 17:01:02 -0800 Subject: [PATCH 1/3] Add EnsembleFrame.repartition --- src/tape/ensemble_frame.py | 62 +++++++++++++++++++++++++ tests/tape_tests/test_ensemble_frame.py | 7 +++ 2 files changed, 69 insertions(+) diff --git a/src/tape/ensemble_frame.py b/src/tape/ensemble_frame.py index 352285ca..03da4580 100644 --- a/src/tape/ensemble_frame.py +++ b/src/tape/ensemble_frame.py @@ -666,6 +666,68 @@ def compute(self, **kwargs): self.ensemble._lazy_sync_tables_from_frame(self) return super().compute(**kwargs) + def repartition(self, **kwargs): + """Repartition dataframe along new divisions + + Doc string below derived from dask.dataframe.DataFrame + + Parameters + ---------- + divisions : list, optional + The "dividing lines" used to split the dataframe into partitions. + For ``divisions=[0, 10, 50, 100]``, there would be three output partitions, + where the new index contained [0, 10), [10, 50), and [50, 100), respectively. + See https://docs.dask.org/en/latest/dataframe-design.html#partitions. + Only used if npartitions and partition_size isn't specified. + For convenience if given an integer this will defer to npartitions + and if given a string it will defer to partition_size (see below) + npartitions : int, optional + Approximate number of partitions of output. Only used if partition_size + isn't specified. The number of partitions used may be slightly + lower than npartitions depending on data distribution, but will never be + higher. + partition_size: int or string, optional + Max number of bytes of memory for each partition. Use numbers or + strings like 5MB. If specified npartitions and divisions will be + ignored. Note that the size reflects the number of bytes used as + computed by ``pandas.DataFrame.memory_usage``, which will not + necessarily match the size when storing to disk. + + .. warning:: + + This keyword argument triggers computation to determine + the memory size of each partition, which may be expensive. + + freq : str, pd.Timedelta + A period on which to partition timeseries data like ``'7D'`` or + ``'12h'`` or ``pd.Timedelta(hours=12)``. Assumes a datetime index. + force : bool, default False + Allows the expansion of the existing divisions. + If False then the new divisions' lower and upper bounds must be + the same as the old divisions'. + + Notes + ----- + Exactly one of `divisions`, `npartitions`, `partition_size`, or `freq` + should be specified. A ``ValueError`` will be raised when that is + not the case. + + Also note that ``len(divisons)`` is equal to ``npartitions + 1``. This is because ``divisions`` + represents the upper and lower bounds of each partition. The first item is the + lower bound of the first partition, the second item is the lower bound of the + second partition and the upper bound of the first partition, and so on. + The second-to-last item is the lower bound of the last partition, and the last + (extra) item is the upper bound of the last partition. + + Examples + -------- + >>> df = df.repartition(npartitions=10) # doctest: +SKIP + >>> df = df.repartition(divisions=[0, 5, 10, 20]) # doctest: +SKIP + >>> df = df.repartition(freq='7d') # doctest: +SKIP + """ + result = super().repartition(**kwargs) + return self._propagate_metadata(result) + class TapeSeries(pd.Series): """A barebones extension of a Pandas series to be used for underlying Ensemble data. diff --git a/tests/tape_tests/test_ensemble_frame.py b/tests/tape_tests/test_ensemble_frame.py index 962f9b2c..5ed01488 100644 --- a/tests/tape_tests/test_ensemble_frame.py +++ b/tests/tape_tests/test_ensemble_frame.py @@ -143,6 +143,13 @@ def test_ensemble_frame_propagation(data_fixture, request): assert merged_frame.ensemble == ens assert merged_frame.is_dirty() + # Test that frame metadata is preserved after repartitioning + repartitioned_frame = ens_frame.copy().repartition(npartitions=10) + assert isinstance(repartitioned_frame, EnsembleFrame) + assert repartitioned_frame.label == TEST_LABEL + assert repartitioned_frame.ensemble == ens + assert repartitioned_frame.is_dirty() + # Test that head returns a subset of the underlying TapeFrame. h = ens_frame.head(5) assert isinstance(h, TapeFrame) From 61b7734bb6ad5441cf2236f1ea0544c989d5e892 Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Tue, 16 Jan 2024 17:13:00 -0800 Subject: [PATCH 2/3] Repartition source frame with update_ensemble --- src/tape/ensemble.py | 2 +- src/tape/ensemble_frame.py | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 44d963c7..51c812fd 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -336,7 +336,7 @@ def insert_sources( if all(prev_div): self.update_frame(self.source.repartition(divisions=prev_div)) elif self.source.npartitions != prev_num: - self.source = self.source.repartition(npartitions=prev_num) + self.update_frame(self.source.repartition(npartitions=prev_num)) return self diff --git a/src/tape/ensemble_frame.py b/src/tape/ensemble_frame.py index 03da4580..4b67213f 100644 --- a/src/tape/ensemble_frame.py +++ b/src/tape/ensemble_frame.py @@ -666,7 +666,14 @@ def compute(self, **kwargs): self.ensemble._lazy_sync_tables_from_frame(self) return super().compute(**kwargs) - def repartition(self, **kwargs): + def repartition( + self, + divisions=None, + npartitions=None, + partition_size=None, + freq=None, + force=False, + ): """Repartition dataframe along new divisions Doc string below derived from dask.dataframe.DataFrame @@ -725,7 +732,13 @@ def repartition(self, **kwargs): >>> df = df.repartition(divisions=[0, 5, 10, 20]) # doctest: +SKIP >>> df = df.repartition(freq='7d') # doctest: +SKIP """ - result = super().repartition(**kwargs) + result = super().repartition( + divisions=divisions, + npartitions=npartitions, + partition_size=partition_size, + freq=freq, + force=force, + ) return self._propagate_metadata(result) From 2a2c27212d96714708c5610a9e901109e6b6e546 Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Tue, 16 Jan 2024 17:24:10 -0800 Subject: [PATCH 3/3] lint fix --- src/tape/ensemble_frame.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tape/ensemble_frame.py b/src/tape/ensemble_frame.py index 4b67213f..7a24b203 100644 --- a/src/tape/ensemble_frame.py +++ b/src/tape/ensemble_frame.py @@ -675,7 +675,7 @@ def repartition( force=False, ): """Repartition dataframe along new divisions - + Doc string below derived from dask.dataframe.DataFrame Parameters @@ -733,10 +733,10 @@ def repartition( >>> df = df.repartition(freq='7d') # doctest: +SKIP """ result = super().repartition( - divisions=divisions, - npartitions=npartitions, + divisions=divisions, + npartitions=npartitions, partition_size=partition_size, - freq=freq, + freq=freq, force=force, ) return self._propagate_metadata(result)