Skip to content
This repository has been archived by the owner on Jan 14, 2025. It is now read-only.

Commit

Permalink
Have head() traverse all partitions (#419)
Browse files Browse the repository at this point in the history
* Have head() iterate across all partitions

* lint fix

* Test compute=False

* Give warning if npartitions is provided to head
  • Loading branch information
wilsonbb authored Apr 4, 2024
1 parent d0af3ce commit d72d1e7
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 2 deletions.
4 changes: 2 additions & 2 deletions docs/tutorials/common_data_operations.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
"Often, you'll want to peek at your data even though the full-size is too large for memory.\n",
"\n",
"> **_Note:_**\n",
"By default this only looks at the first partition of data, so any operations that remove all data from the first partition will produce an empty head result. Specify `npartitions=-1` to grab from all partitions.\n"
"some partitions may be empty and `head` will have to traverse these empty partitions to find enough rows for your result. An empty table with many partitions (O(100)k) might be costly even for an ultimately empty result. "
]
},
{
Expand All @@ -119,7 +119,7 @@
"metadata": {},
"outputs": [],
"source": [
"ens.source.head(5, npartitions=-1) # grabs the first 5 rows\n",
"ens.source.head(5) # grabs the first 5 rows\n",
"\n",
"# can also use tail to grab the last 5 rows"
]
Expand Down
40 changes: 40 additions & 0 deletions src/tape/ensemble_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,46 @@ def repartition(
)
return self._propagate_metadata(result)

def head(self, n=5, compute=True, npartitions=None):
    """Return up to `n` rows of data for previewing purposes.

    Partitions are visited in order until `n` rows have been collected, so
    empty leading partitions do not by themselves produce an empty preview.

    Parameters
    ----------
    n : int, optional
        The number of desired rows. Default is 5. When `compute` is True and
        `n` is zero or negative, an empty result is returned.
    compute : bool, optional
        Whether to compute the result immediately. Default is True.
    npartitions : int, optional
        `npartitions` is not supported and if provided will be ignored (a
        warning is emitted). Instead all partitions may be used.

    Returns
    -------
    result
        If `compute` is True, the in-memory concatenation of the per-partition
        heads, holding up to `n` rows. If `compute` is False, the lazy
        (uncomputed) result of the parent class's `head`.
    """
    if npartitions is not None:
        # stacklevel=2 attributes the warning to the caller's line rather than this method.
        warnings.warn(
            "The 'npartitions' parameter is not supported for TAPE dataframes. All partitions may be used.",
            UserWarning,
            stacklevel=2,
        )

    if not compute:
        # Defer to the Dask head method, but let it draw from every partition
        # (npartitions=-1) so the lazy result agrees with the eager path below
        # instead of only looking at the first partition.
        return super().head(n, npartitions=-1, compute=False)

    if n <= 0:
        return super().head(0)

    # Iterate over the partitions until we have enough rows
    dfs = []
    remaining_rows = n
    for partition in self.partitions:
        if remaining_rows <= 0:
            break
        # Note that partition is itself a _Frame object, so we need to compute
        # it first; calling its head() directly would recurse into this method.
        partition_head = partition.compute().head(remaining_rows)
        dfs.append(partition_head)
        remaining_rows -= len(partition_head)

    return pd.concat(dfs)


class EnsembleSeries(_Frame, dd.Series):
"""A barebones extension of a Dask Series for Ensemble data."""
Expand Down
59 changes: 59 additions & 0 deletions tests/tape_tests/test_ensemble_frame.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
""" Test EnsembleFrame (inherited from Dask.DataFrame) creation and manipulations. """

from math import floor
import numpy as np
import pandas as pd
from tape import (
Expand Down Expand Up @@ -470,3 +471,61 @@ def test_partition_slicing(parquet_ensemble_with_divisions):

assert ens.source.npartitions == 2 # should return exactly 2 partitions
assert len(ens.object) < prior_src_len # should affect objects


@pytest.mark.parametrize(
    "data_fixture",
    [
        "parquet_ensemble",
        "parquet_ensemble_with_divisions",
    ],
)
def test_head(data_fixture, request):
    """
    Tests that head returns the correct number of rows, the expected frame
    type, and the full set of columns, across a repartitioned source frame.
    """
    ens = request.getfixturevalue(data_fixture)

    # Test with repartitioning the source frame
    frame = ens.source
    frame = frame.repartition(npartitions=10)

    assert frame.npartitions == 10

    # Check that a warning is raised when npartitions are requested.
    with pytest.warns(UserWarning):
        frame.head(5, npartitions=5)

    # Test inputs that should return an empty frame
    assert len(frame.head(-100)) == 0
    assert len(frame.head(0)) == 0
    assert len(frame.head(-1)) == 0

    # The lazy path should yield the requested rows once computed.
    assert len(frame.head(100, compute=False).compute()) == 100

    one_res = frame.head(1)
    assert len(one_res) == 1
    assert isinstance(one_res, TapeFrame)
    assert set(one_res.columns) == set(frame.columns)

    # Default row count; validate this result itself, not the earlier one_res.
    def_result = frame.head()
    assert len(def_result) == 5
    assert isinstance(def_result, TapeFrame)
    assert set(def_result.columns) == set(frame.columns)

    def_result = frame.head(24)
    assert len(def_result) == 24
    assert isinstance(def_result, TapeFrame)
    assert set(def_result.columns) == set(frame.columns)

    # Test that we have sane behavior even when the number of rows requested is larger than the number of rows in the frame.
    assert len(frame.head(2 * len(frame))) == len(frame)

    # Choose a value that will be guaranteed to hit every partition for this data.
    # Note that with parquet_ensemble_with_divisions some of the partitions are empty
    # testing that as well.
    rows = floor(len(frame.compute()) * 0.98)
    result = frame.head(rows)
    assert len(result) == rows
    assert isinstance(result, TapeFrame)
    assert set(result.columns) == set(frame.columns)

0 comments on commit d72d1e7

Please sign in to comment.