Skip to content

Commit

Permalink
Merge pull request #94 from lincc-frameworks/chunked_array
Browse files Browse the repository at this point in the history
  • Loading branch information
hombit authored May 29, 2024
2 parents 0ed3746 + d193826 commit f861313
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 51 deletions.
4 changes: 2 additions & 2 deletions src/nested_pandas/series/accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def to_lists(self, fields: list[str] | None = None) -> pd.DataFrame:
raise ValueError("Cannot convert a struct with no fields to lists")

list_chunks = defaultdict(list)
for chunk in self._series.array._pa_array.iterchunks():
for chunk in self._series.array._chunked_array.iterchunks():
struct_array = cast(pa.StructArray, chunk)
for field in fields:
list_array = cast(pa.ListArray, struct_array.field(field))
Expand Down Expand Up @@ -94,7 +94,7 @@ def to_flat(self, fields: list[str] | None = None) -> pd.DataFrame:
index = pd.Series(self.get_flat_index(), name=self._series.index.name)

flat_chunks = defaultdict(list)
for chunk in self._series.array._pa_array.iterchunks():
for chunk in self._series.array._chunked_array.iterchunks():
struct_array = cast(pa.StructArray, chunk)
for field in fields:
list_array = cast(pa.ListArray, struct_array.field(field))
Expand Down
99 changes: 52 additions & 47 deletions src/nested_pandas/series/ext_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ def __getitem__(self, item):

if isinstance(item, np.ndarray):
if len(item) == 0:
return type(self)(pa.chunked_array([], type=self._pa_array.type), validate=False)
return type(self)(pa.chunked_array([], type=self._chunked_array.type), validate=False)
pa_item = pa.array(item)
if item.dtype.kind in "iu":
return type(self)(self._pa_array.take(pa_item), validate=False)
return type(self)(self._chunked_array.take(pa_item), validate=False)
if item.dtype.kind == "b":
return type(self)(self._pa_array.filter(pa_item), validate=False)
return type(self)(self._chunked_array.filter(pa_item), validate=False)
# It should be covered by check_array_indexer above
raise IndexError(
"Only integers, slices and integer or " "boolean arrays are valid indices."
Expand All @@ -159,7 +159,7 @@ def __getitem__(self, item):
if item is Ellipsis:
item = slice(None)

scalar_or_array = self._pa_array[item]
scalar_or_array = self._chunked_array[item]
if isinstance(scalar_or_array, pa.StructScalar):
return self._convert_struct_scalar_to_df(scalar_or_array, copy=False)
# Logically, it must be a pa.ChunkedArray if it is not a scalar
Expand Down Expand Up @@ -212,14 +212,14 @@ def __setitem__(self, key, value) -> None:

# We cannot use pa.compute.replace_with_mask(), it is not implemented for struct arrays:
# https://github.com/apache/arrow/issues/29558
# self._pa_array = pa.compute.replace_with_mask(self._pa_array, pa_mask, value)
self._pa_array = replace_with_mask(self._pa_array, pa_mask, value)
# self._chunked_array = pa.compute.replace_with_mask(self._chunked_array, pa_mask, value)
self._chunked_array = replace_with_mask(self._chunked_array, pa_mask, value)

def __len__(self) -> int:
return len(self._pa_array)
return len(self._chunked_array)

def __iter__(self) -> Iterator[pd.DataFrame]:
for value in self._pa_array:
for value in self._chunked_array:
yield self._convert_struct_scalar_to_df(value, copy=False)

# We do not implement it yet, because pa.compute.equal does not work for struct arrays
Expand Down Expand Up @@ -255,7 +255,7 @@ def to_numpy(self, dtype: None = None, copy: bool = False, na_value: Any = no_de
# Hack with np.empty is the only way to force numpy to create 1-d array of objects
result = np.empty(shape=len(self), dtype=object)

for i, value in enumerate(self._pa_array):
for i, value in enumerate(self._chunked_array):
result[i] = self._convert_struct_scalar_to_df(value, copy=copy, na_value=na_value)

return result
Expand All @@ -268,22 +268,22 @@ def dtype(self) -> NestedDtype:
@property
def nbytes(self) -> int:
"""Number of bytes consumed by the data in memory."""
return self._pa_array.nbytes
return self._chunked_array.nbytes

def isna(self) -> np.ndarray:
"""Boolean NumPy array indicating if each value is missing."""
# Fast paths adopted from ArrowExtensionArray
null_count = self._pa_array.null_count
null_count = self._chunked_array.null_count
if null_count == 0:
return np.zeros(len(self), dtype=bool)
if null_count == len(self):
return np.ones(len(self), dtype=bool)

return self._pa_array.is_null().to_numpy()
return self._chunked_array.is_null().to_numpy()

@property
def _hasna(self) -> bool:
return self._pa_array.null_count > 0
return self._chunked_array.null_count > 0

# We do not implement it yet, neither ArrowExtensionArray does for struct arrays
def interpolate(
Expand Down Expand Up @@ -355,9 +355,9 @@ def take(

indices_array = np.asanyarray(indices)

if len(self._pa_array) == 0 and (indices_array >= 0).any():
if len(self._chunked_array) == 0 and (indices_array >= 0).any():
raise IndexError("cannot do a non-empty take from empty array")
if indices_array.size > 0 and indices_array.max() >= len(self._pa_array):
if indices_array.size > 0 and indices_array.max() >= len(self._chunked_array):
raise IndexError("out of bounds value in 'indices'.")

if allow_fill:
Expand All @@ -366,28 +366,28 @@ def take(
fill_mask = indices_array < 0
if not fill_mask.any():
# Nothing to fill
return type(self)(self._pa_array.take(indices))
validate_indices(indices_array, len(self._pa_array))
return type(self)(self._chunked_array.take(indices))
validate_indices(indices_array, len(self._chunked_array))
indices_array = pa.array(indices_array, mask=fill_mask)

result = self._pa_array.take(indices_array)
result = self._chunked_array.take(indices_array)
if not pa.compute.is_null(fill_value).as_py():
result = pa.compute.if_else(fill_mask, fill_value, result)
return type(self)(result)

if (indices_array < 0).any():
# Don't modify in-place
indices_array = np.copy(indices_array)
indices_array[indices_array < 0] += len(self._pa_array)
return type(self)(self._pa_array.take(indices_array))
indices_array[indices_array < 0] += len(self._chunked_array)
return type(self)(self._chunked_array.take(indices_array))

def copy(self) -> Self: # type: ignore[name-defined] # noqa: F821
"""Return a copy of the extension array.
This implementation returns a shallow copy of the extension array,
because the underlying PyArrow array is immutable.
"""
return type(self)(self._pa_array)
return type(self)(self._chunked_array)

def _formatter(self, boxed: bool = False) -> Callable[[Any], str | None]:
# TODO: make formatted strings more pretty
Expand All @@ -405,7 +405,7 @@ def box_formatter(value):

@classmethod
def _concat_same_type(cls, to_concat: Sequence[Self]) -> Self: # type: ignore[name-defined] # noqa: F821
chunks = [chunk for array in to_concat for chunk in array._pa_array.iterchunks()]
chunks = [chunk for array in to_concat for chunk in array._chunked_array.iterchunks()]
pa_array = pa.chunked_array(chunks)
return cls(pa_array)

Expand All @@ -425,14 +425,14 @@ def equals(self, other) -> bool:
"""
if not isinstance(other, type(self)):
return False
return self._pa_array == other._pa_array
return self._chunked_array == other._chunked_array

def dropna(self) -> Self:
"""Return a new ExtensionArray with missing values removed.
Note that this applies to the top-level struct array, not to the list arrays.
"""
return type(self)(pa.compute.drop_null(self._pa_array))
return type(self)(pa.compute.drop_null(self._chunked_array))

# End of ExtensionArray overrides #

Expand All @@ -441,8 +441,8 @@ def dropna(self) -> Self:
def __arrow_array__(self, type=None):
"""Convert the extension array to a PyArrow array."""
if type is None:
return self._pa_array
return self._pa_array.cast(type)
return self._chunked_array
return self._chunked_array.cast(type)

def __array__(self, dtype=None):
"""Convert the extension array to a numpy array."""
Expand All @@ -451,12 +451,12 @@ def __array__(self, dtype=None):
# Adopted from ArrowExtensionArray
def __getstate__(self):
state = self.__dict__.copy()
state["_pa_array"] = self._pa_array.combine_chunks()
state["_chunked_array"] = self._chunked_array.combine_chunks()
return state

# Adopted from ArrowExtensionArray
def __setstate__(self, state):
state["_pa_array"] = pa.chunked_array(state["_pa_array"])
state["_chunked_array"] = pa.chunked_array(state["_chunked_array"])
self.__dict__.update(state)

# End of Additional magic methods #
Expand All @@ -478,7 +478,7 @@ def _box_pa_scalar(cls, value, *, pa_type: pa.DataType | None) -> pa.Scalar:
def _box_pa_array(cls, value, *, pa_type: pa.DataType | None) -> pa.Array | pa.ChunkedArray:
"""Convert a value to a PyArrow array with the specified type."""
if isinstance(value, cls):
pa_array = value._pa_array
pa_array = value._chunked_array
elif isinstance(value, (pa.Array, pa.ChunkedArray)):
pa_array = value
else:
Expand Down Expand Up @@ -520,7 +520,7 @@ def _convert_struct_scalar_to_df(cls, value: pa.StructScalar, *, copy: bool, na_
d = {name: pd.Series(list_scalar.values, copy=copy) for name, list_scalar in value.items()}
return pd.DataFrame(d, copy=False)

_pa_array: pa.ChunkedArray
_chunked_array: pa.ChunkedArray
_dtype: NestedDtype

def __init__(self, values: pa.Array | pa.ChunkedArray, *, validate: bool = True) -> None:
Expand All @@ -530,7 +530,7 @@ def __init__(self, values: pa.Array | pa.ChunkedArray, *, validate: bool = True)
if validate:
self._validate(values)

self._pa_array = values
self._chunked_array = values
self._dtype = NestedDtype(values.type)

@classmethod
Expand Down Expand Up @@ -558,6 +558,11 @@ def _pyarrow_dtype(self) -> pa.DataType:
"""PyArrow data type of the extension array"""
return self._dtype.pyarrow_dtype

@property
def chunked_array(self) -> pa.ChunkedArray:
"""The underlying PyArrow ChunkedArray"""
return self._chunked_array

@staticmethod
def _validate(array: pa.ChunkedArray) -> None:
"""Raises ValueError if the input array is not a struct array with all fields being
Expand Down Expand Up @@ -589,12 +594,12 @@ def from_arrow_ext_array(cls, array: ArrowExtensionArray) -> Self: # type: igno

def to_arrow_ext_array(self) -> ArrowExtensionArray:
"""Convert the extension array to pandas' ArrowExtensionArray"""
return ArrowExtensionArray(self._pa_array)
return ArrowExtensionArray(self._chunked_array)

def _replace_pa_array(self, pa_array: pa.ChunkedArray, *, validate: bool) -> None:
def _replace_chunked_array(self, pa_array: pa.ChunkedArray, *, validate: bool) -> None:
if validate:
self._validate(pa_array)
self._pa_array = pa_array
self._chunked_array = pa_array
self._dtype = NestedDtype(pa_array.chunk(0).type)

@property
Expand All @@ -610,16 +615,16 @@ def list_offsets(self) -> pa.Array:
The list offsets of the field arrays.
"""
# Quick and cheap path for a single chunk
if self._pa_array.num_chunks == 1:
struct_array = cast(pa.StructArray, self._pa_array.chunk(0))
if self._chunked_array.num_chunks == 1:
struct_array = cast(pa.StructArray, self._chunked_array.chunk(0))
return cast(pa.ListArray, struct_array.field(0)).offsets

chunks = []
# The offset of the current chunk in the flat array.
# It is 0 for the first chunk, and the last offset of the previous chunk for the next chunks,
# as a pa.Scalar.
chunk_offset: pa.Scalar | int = 0
for chunk in self._pa_array.iterchunks():
for chunk in self._chunked_array.iterchunks():
list_array = cast(pa.ListArray, chunk.field(0))
if chunk_offset == 0:
offsets = list_array.offsets
Expand All @@ -632,17 +637,17 @@ def list_offsets(self) -> pa.Array:
@property
def field_names(self) -> list[str]:
"""Names of the nested columns"""
return [field.name for field in self._pa_array.chunk(0).type]
return [field.name for field in self._chunked_array.chunk(0).type]

@property
def flat_length(self) -> int:
"""Length of the flat arrays"""
return sum(chunk.field(0).value_lengths().sum().as_py() for chunk in self._pa_array.iterchunks())
return sum(chunk.field(0).value_lengths().sum().as_py() for chunk in self._chunked_array.iterchunks())

@property
def num_chunks(self) -> int:
"""Number of chunks in underlying pyarrow.ChunkedArray"""
return self._pa_array.num_chunks
return self._chunked_array.num_chunks

def view_fields(self, fields: str | list[str]) -> Self: # type: ignore[name-defined] # noqa: F821
"""Get a view of the series with only the specified fields
Expand All @@ -665,7 +670,7 @@ def view_fields(self, fields: str | list[str]) -> Self: # type: ignore[name-def
raise ValueError(f"Some fields are not found, given: {fields}, available: {self.field_names}")

chunks = []
for chunk in self._pa_array.iterchunks():
for chunk in self._chunked_array.iterchunks():
chunk = cast(pa.StructArray, chunk)
struct_dict = {}
for field in fields:
Expand Down Expand Up @@ -703,7 +708,7 @@ def set_flat_field(self, field: str, value: ArrayLike, *, keep_dtype: bool = Fal
f"Got: {field}, available: {self.field_names}"
)
# Get the current element type of list-array
pa_type = self._pa_array.chunk(0).field(field).type.value_type
pa_type = self._chunked_array.chunk(0).field(field).type.value_type
else:
pa_type = None

Expand Down Expand Up @@ -752,7 +757,7 @@ def set_list_field(self, field: str, value: ArrayLike, *, keep_dtype: bool = Fal
"If keep_dtype is True, the field must exist in the series. "
f"Got: {field}, available: {self.field_names}"
)
pa_type = self._pa_array.chunk(0).field(field).type
pa_type = self._chunked_array.chunk(0).field(field).type
else:
pa_type = None

Expand All @@ -772,7 +777,7 @@ def set_list_field(self, field: str, value: ArrayLike, *, keep_dtype: bool = Fal
raise ValueError("The length of the list-array must be equal to the length of the series")

chunks = []
for sl, chunk in enumerate_chunks(self._pa_array):
for sl, chunk in enumerate_chunks(self._chunked_array):
chunk = cast(pa.StructArray, chunk)

# Build a new struct array. We collect all existing fields and add/replace the new one.
Expand All @@ -785,7 +790,7 @@ def set_list_field(self, field: str, value: ArrayLike, *, keep_dtype: bool = Fal
chunks.append(struct_array)
chunked_array = pa.chunked_array(chunks)

self._replace_pa_array(chunked_array, validate=True)
self._replace_chunked_array(chunked_array, validate=True)

def pop_fields(self, fields: Iterable[str]):
"""Delete fields from the struct array
Expand All @@ -806,7 +811,7 @@ def pop_fields(self, fields: Iterable[str]):
raise ValueError("Cannot delete all fields")

chunks = []
for chunk in self._pa_array.iterchunks():
for chunk in self._chunked_array.iterchunks():
chunk = cast(pa.StructArray, chunk)
struct_dict = {}
for pa_field in chunk.type:
Expand All @@ -816,4 +821,4 @@ def pop_fields(self, fields: Iterable[str]):
chunks.append(struct_array)
pa_array = pa.chunked_array(chunks)

self._replace_pa_array(pa_array, validate=False)
self._replace_chunked_array(pa_array, validate=False)
19 changes: 17 additions & 2 deletions tests/nested_pandas/series/test_ext_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def test_from_sequence_with_arrow_array_and_dtype():
type=pa_type,
)

actual = NestedExtensionArray.from_sequence(pa_array, dtype=new_pa_type)._pa_array
actual = NestedExtensionArray.from_sequence(pa_array, dtype=new_pa_type).chunked_array
desired = pa.chunked_array([pa_array.cast(new_pa_type)])
# pyarrow doesn't convert pandas boxed missing values to nulls in nested arrays
assert actual == desired
Expand Down Expand Up @@ -525,7 +525,7 @@ def test___setitem___series_of_dfs():
)
desired = NestedExtensionArray(desired_struct_array)

assert ext_array._pa_array == desired._pa_array
assert ext_array.chunked_array == desired.chunked_array
assert ext_array.equals(desired)


Expand Down Expand Up @@ -588,6 +588,21 @@ def test_series_built_raises(data):
_array = NestedExtensionArray(pa_array)


def test_chunked_array():
"""Test that the .chunked_array property is correct."""
struct_array = pa.StructArray.from_arrays(
arrays=[
pa.array([np.array([1, 2, 3]), np.array([1, 2, 1])]),
pa.array([-np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])]),
],
names=["a", "b"],
)
ext_array = NestedExtensionArray(struct_array)

# pyarrow returns a single bool for ==
assert ext_array.chunked_array == pa.chunked_array(struct_array)


def test_list_offsets_single_chunk():
"""Test that the .list_offset property is correct for a single chunk."""
struct_array = pa.StructArray.from_arrays(
Expand Down

0 comments on commit f861313

Please sign in to comment.