diff --git a/docs/cudf/source/developer_guide/library_design.md b/docs/cudf/source/developer_guide/library_design.md
index 016c2c1d281..0b37de00f6b 100644
--- a/docs/cudf/source/developer_guide/library_design.md
+++ b/docs/cudf/source/developer_guide/library_design.md
@@ -325,26 +325,26 @@ This section describes the internal implementation details of the copy-on-write
 It is recommended that developers familiarize themselves with [the user-facing documentation](copy-on-write-user-doc) of this functionality before reading through the internals below.
 
-The core copy-on-write implementation relies on the factory function `as_exposure_tracked_buffer` and the two classes `ExposureTrackedBuffer` and `BufferSlice`.
+The core copy-on-write implementation relies on `ExposureTrackedBuffer` and the tracking features of `BufferOwner`.
 
-An `ExposureTrackedBuffer` is a subclass of the regular `Buffer` that tracks internal and external references to its underlying memory. Internal references are tracked by maintaining [weak references](https://docs.python.org/3/library/weakref.html) to every `BufferSlice` of the underlying memory. External references are tracked through "exposure" status of the underlying memory. A buffer is considered exposed if the device pointer (integer or void*) has been handed out to a library outside of cudf. In this case, we have no way of knowing if the data are being modified by a third party.
+`BufferOwner` tracks internal and external references to its underlying memory. Internal references are tracked by maintaining [weak references](https://docs.python.org/3/library/weakref.html) to every `ExposureTrackedBuffer` of the underlying memory. External references are tracked through the "exposure" status of the underlying memory. A buffer is considered exposed if the device pointer (integer or void*) has been handed out to a library outside of cudf. In this case, we have no way of knowing if the data are being modified by a third party.
 
-`BufferSlice` is a subclass of `ExposureTrackedBuffer` that represents a _slice_ of the memory underlying a exposure tracked buffer.
+`ExposureTrackedBuffer` is a subclass of `Buffer` that represents a _slice_ of the memory underlying an exposure tracked buffer.
 
-When the cudf option `"copy_on_write"` is `True`, `as_buffer` calls `as_exposure_tracked_buffer`, which always returns a `BufferSlice`. It is then the slices that determine whether or not to make a copy when a write operation is performed on a `Column` (see below). If multiple slices point to the same underlying memory, then a copy must be made whenever a modification is attempted.
+When the cudf option `"copy_on_write"` is `True`, `as_buffer` returns an `ExposureTrackedBuffer`. It is this class that determines whether or not to make a copy when a write operation is performed on a `Column` (see below). If multiple slices point to the same underlying memory, then a copy must be made whenever a modification is attempted.
 
 ### Eager copies when exposing to third-party libraries
 
-If a `Column`/`BufferSlice` is exposed to a third-party library via `__cuda_array_interface__`, we are no longer able to track whether or not modification of the buffer has occurred. Hence whenever
+If a `Column`/`ExposureTrackedBuffer` is exposed to a third-party library via `__cuda_array_interface__`, we are no longer able to track whether or not modification of the buffer has occurred. Hence whenever
 someone accesses data through the `__cuda_array_interface__`, we eagerly trigger the copy by calling
-`.make_single_owner_inplace` which ensures a true copy of underlying data is made and that the slice is the sole owner. Any future copy requests must also trigger a true physical copy (since we cannot track the lifetime of the third-party object). To handle this we also mark the `Column`/`BufferSlice` as exposed thus indicating that any future shallow-copy requests will trigger a true physical copy rather than a copy-on-write shallow copy.
+`.make_single_owner_inplace`, which ensures a true copy of the underlying data is made and that the slice is the sole owner. Any future copy requests must also trigger a true physical copy (since we cannot track the lifetime of the third-party object). To handle this we also mark the `Column`/`ExposureTrackedBuffer` as exposed, thus indicating that any future shallow-copy requests will trigger a true physical copy rather than a copy-on-write shallow copy.
 
 ### Obtaining a read-only object
 
 A read-only object can be quite useful for operations that will not mutate the data. This can be achieved by calling `.get_ptr(mode="read")`, and using `cuda_array_interface_wrapper` to wrap a `__cuda_array_interface__` object around it.
-This will not trigger a deep copy even if multiple `BufferSlice` points to the same `ExposureTrackedBuffer`. This API should only be used when the lifetime of the proxy object is restricted to cudf's internal code execution. Handing this out to external libraries or user-facing APIs will lead to untracked references and undefined copy-on-write behavior. We currently use this API for device to host
+This will not trigger a deep copy even if multiple `ExposureTrackedBuffer`s point to the same `BufferOwner`. This API should only be used when the lifetime of the proxy object is restricted to cudf's internal code execution. Handing this out to external libraries or user-facing APIs will lead to untracked references and undefined copy-on-write behavior. We currently use this API for device-to-host
 copies like in `ColumnBase.data_array_view(mode="read")` which is used for `Column.values_host`.
diff --git a/python/cudf/cudf/core/abc.py b/python/cudf/cudf/core/abc.py
index adf9fe39e4f..ce6bb83bc77 100644
--- a/python/cudf/cudf/core/abc.py
+++ b/python/cudf/cudf/core/abc.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2022, NVIDIA CORPORATION.
+# Copyright (c) 2020-2024, NVIDIA CORPORATION.
 """Common abstract base classes for cudf."""
 
 import pickle
@@ -89,7 +89,13 @@ def device_serialize(self):
         """
         header, frames = self.serialize()
         assert all(
-            isinstance(f, (cudf.core.buffer.Buffer, memoryview))
+            isinstance(
+                f,
+                (
+                    cudf.core.buffer.Buffer,
+                    memoryview,
+                ),
+            )
             for f in frames
         )
         header["type-serialized"] = pickle.dumps(type(self))
diff --git a/python/cudf/cudf/core/buffer/__init__.py b/python/cudf/cudf/core/buffer/__init__.py
index d8883bd97e5..9b9774c12be 100644
--- a/python/cudf/cudf/core/buffer/__init__.py
+++ b/python/cudf/cudf/core/buffer/__init__.py
@@ -1,6 +1,10 @@
-# Copyright (c) 2022-2023, NVIDIA CORPORATION.
+# Copyright (c) 2022-2024, NVIDIA CORPORATION.
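The ownership model described in the documentation above can be mimicked without cudf or a GPU. The following self-contained sketch uses hypothetical `Owner`/`View` classes and a host-side `bytearray` in place of device memory; it only illustrates the documented rule that a view detaches onto a private copy when other views (or an exposed pointer) can still see the memory.

```python
import weakref
from typing import Optional


class Owner:
    """Stand-in for BufferOwner: owns the memory and weakly tracks its views."""

    def __init__(self, mem: bytearray) -> None:
        self.mem = mem
        self.exposed = False             # set permanently once a raw pointer escapes
        self.slices = weakref.WeakSet()  # views currently alive


class View:
    """Stand-in for ExposureTrackedBuffer: a byte slice that copies on write."""

    def __init__(self, owner: Owner, offset: int = 0, size: Optional[int] = None):
        self.owner, self.offset = owner, offset
        self.size = len(owner.mem) - offset if size is None else size
        owner.slices.add(self)

    def write(self, pos: int, value: int) -> None:
        # Copy-on-write rule: if another view (or an exposed pointer) can still
        # see this memory, detach onto a private copy of our slice first.
        if len(self.owner.slices) > 1 or self.owner.exposed:
            start = self.offset
            self.owner = Owner(bytearray(self.owner.mem[start : start + self.size]))
            self.owner.slices.add(self)
            self.offset = 0
        self.owner.mem[self.offset + pos] = value


a = View(Owner(bytearray(b"hello")))
b = View(a.owner)        # shares the owner; nothing is copied yet
b.write(0, ord("H"))     # the write detaches `b` onto its own copy
assert a.owner.mem == b"hello" and b.owner.mem == b"Hello"
assert a.owner is not b.owner
```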
-from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper +from cudf.core.buffer.buffer import ( + Buffer, + BufferOwner, + cuda_array_interface_wrapper, +) from cudf.core.buffer.exposure_tracked_buffer import ExposureTrackedBuffer from cudf.core.buffer.spillable_buffer import SpillableBuffer, SpillLock from cudf.core.buffer.utils import ( diff --git a/python/cudf/cudf/core/buffer/buffer.py b/python/cudf/cudf/core/buffer/buffer.py index 59d20a2784d..8d278c9c065 100644 --- a/python/cudf/cudf/core/buffer/buffer.py +++ b/python/cudf/cudf/core/buffer/buffer.py @@ -1,9 +1,10 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. from __future__ import annotations import math import pickle +import weakref from types import SimpleNamespace from typing import Any, Dict, Literal, Mapping, Optional, Sequence, Tuple @@ -90,15 +91,31 @@ def cuda_array_interface_wrapper( ) -class Buffer(Serializable): - """A Buffer represents device memory. +class BufferOwner(Serializable): + """An owning buffer that represents device memory. - Use the factory function `as_buffer` to create a Buffer instance. + This class isn't meant to be used throughout cuDF. Instead, it + standardizes data owning by wrapping any data object that + represents device memory. Multiple `Buffer` instances, which are + the ones used throughout cuDF, can then refer to the same + `BufferOwner` instance. + + In order to implement copy-on-write and spillable buffers, we need the + ability to detect external access to the underlying memory. We say that + the buffer has been exposed if the device pointer (integer or void*) has + been accessed outside of BufferOwner. In this case, we have no control + over knowing if the data is being modified by a third party. + + Use `_from_device_memory` and `_from_host_memory` to create + a new instance from either device or host memory respectively. """ _ptr: int _size: int _owner: object + _exposed: bool + # The set of buffers that point to this owner. + _slices: weakref.WeakSet[Buffer] def __init__(self): raise ValueError( @@ -107,8 +124,8 @@ def __init__(self): ) @classmethod - def _from_device_memory(cls, data: Any) -> Self: - """Create a Buffer from an object exposing `__cuda_array_interface__`. + def _from_device_memory(cls, data: Any, exposed: bool) -> Self: + """Create from an object providing a `__cuda_array_interface__`. No data is being copied. @@ -116,16 +133,29 @@ def _from_device_memory(cls, data: Any) -> Self: ---------- data : device-buffer-like An object implementing the CUDA Array Interface. + exposed : bool + Mark the buffer as permanently exposed. This is used by + ExposureTrackedBuffer to determine when a deep copy is required + and by SpillableBuffer to mark the buffer unspillable. 
Returns ------- - Buffer - Buffer representing the same device memory as `data` + BufferOwner + BufferOwner wrapping `data` + + Raises + ------ + AttributeError + If data does not support the cuda array interface + ValueError + If the resulting buffer has negative size """ # Bypass `__init__` and initialize attributes manually ret = cls.__new__(cls) ret._owner = data + ret._exposed = exposed + ret._slices = weakref.WeakSet() if isinstance(data, rmm.DeviceBuffer): # Common case shortcut ret._ptr = data.ptr ret._size = data.size @@ -139,7 +169,7 @@ def _from_device_memory(cls, data: Any) -> Self: @classmethod def _from_host_memory(cls, data: Any) -> Self: - """Create a Buffer from a buffer or array like object + """Create an owner from a buffer or array like object Data must implement `__array_interface__`, the buffer protocol, and/or be convertible to a buffer object using `numpy.array()` @@ -155,8 +185,8 @@ def _from_host_memory(cls, data: Any) -> Self: Returns ------- - Buffer - Buffer representing a copy of `data`. + BufferOwner + BufferOwner wrapping a device copy of `data`. """ # Convert to numpy array, this will not copy data in most cases. @@ -166,54 +196,7 @@ def _from_host_memory(cls, data: Any) -> Self: # Copy to device memory buf = rmm.DeviceBuffer(ptr=ptr, size=size) # Create from device memory - return cls._from_device_memory(buf) - - def _getitem(self, offset: int, size: int) -> Self: - """ - Sub-classes can overwrite this to implement __getitem__ - without having to handle non-slice inputs. - """ - return self._from_device_memory( - cuda_array_interface_wrapper( - ptr=self.get_ptr(mode="read") + offset, - size=size, - owner=self.owner, - ) - ) - - def __getitem__(self, key: slice) -> Self: - """Create a new slice of the buffer.""" - if not isinstance(key, slice): - raise TypeError( - "Argument 'key' has incorrect type " - f"(expected slice, got {key.__class__.__name__})" - ) - start, stop, step = key.indices(self.size) - if step != 1: - raise ValueError("slice must be C-contiguous") - return self._getitem(offset=start, size=stop - start) - - def copy(self, deep: bool = True) -> Self: - """ - Return a copy of Buffer. - - Parameters - ---------- - deep : bool, default True - If True, returns a deep copy of the underlying Buffer data. - If False, returns a shallow copy of the Buffer pointing to - the same underlying data. - - Returns - ------- - Buffer - """ - if deep: - return self._from_device_memory( - rmm.DeviceBuffer(ptr=self.get_ptr(mode="read"), size=self.size) - ) - else: - return self[:] + return cls._from_device_memory(buf, exposed=False) @property def size(self) -> int: @@ -226,20 +209,29 @@ def nbytes(self) -> int: return self._size @property - def owner(self) -> Any: + def owner(self) -> object: """Object owning the memory of the buffer.""" return self._owner @property - def __cuda_array_interface__(self) -> Mapping: - """Implementation of the CUDA Array Interface.""" - return { - "data": (self.get_ptr(mode="write"), False), - "shape": (self.size,), - "strides": None, - "typestr": "|u1", - "version": 0, - } + def exposed(self) -> bool: + """The current exposure status of the buffer + + This is used by ExposureTrackedBuffer to determine when a deep copy + is required and by SpillableBuffer to mark the buffer unspillable. 
+ """ + return self._exposed + + def mark_exposed(self) -> None: + """Mark the buffer as "exposed" permanently + + This is used by ExposureTrackedBuffer to determine when a deep copy + is required and by SpillableBuffer to mark the buffer unspillable. + + Notice, once the exposure status becomes True, it will never change + back. + """ + self._exposed = True def get_ptr(self, *, mode: Literal["read", "write"]) -> int: """Device pointer to the start of the buffer. @@ -277,20 +269,148 @@ def memoryview( ) return memoryview(host_buf).toreadonly() + def __str__(self) -> str: + return ( + f"<{self.__class__.__name__} size={format_bytes(self._size)} " + f"ptr={hex(self._ptr)} owner={self._owner!r}>" + ) + + +class Buffer(Serializable): + """A buffer that represents a slice or view of a `BufferOwner`. + + Use the factory function `as_buffer` to create a Buffer instance. + + Note + ---- + This buffer is untyped, so all indexing and sizes are in bytes. + + Parameters + ---------- + owner + The owning exposure buffer this refers to. + offset + The offset relative to the start memory of owner (in bytes). + size + The size of the buffer (in bytes). If None, use the size of owner. + """ + + def __init__( + self, + *, + owner: BufferOwner, + offset: int = 0, + size: Optional[int] = None, + ) -> None: + size = owner.size if size is None else size + if size < 0: + raise ValueError("size cannot be negative") + if offset < 0: + raise ValueError("offset cannot be negative") + if offset + size > owner.size: + raise ValueError( + "offset+size cannot be greater than the size of owner" + ) + self._owner = owner + self._offset = offset + self._size = size + + @property + def size(self) -> int: + """Size of the buffer in bytes.""" + return self._size + + @property + def nbytes(self) -> int: + """Size of the buffer in bytes.""" + return self._size + + @property + def owner(self) -> BufferOwner: + """Object owning the memory of the buffer.""" + return self._owner + + def __getitem__(self, key: slice) -> Self: + """Create a new slice of the buffer.""" + if not isinstance(key, slice): + raise TypeError( + "Argument 'key' has incorrect type " + f"(expected slice, got {key.__class__.__name__})" + ) + start, stop, step = key.indices(self.size) + if step != 1: + raise ValueError("slice must be C-contiguous") + return self.__class__( + owner=self._owner, offset=self._offset + start, size=stop - start + ) + + def get_ptr(self, *, mode: Literal["read", "write"]) -> int: + return self._owner.get_ptr(mode=mode) + self._offset + + def memoryview(self) -> memoryview: + return self._owner.memoryview(offset=self._offset, size=self._size) + + def copy(self, deep: bool = True) -> Self: + """Return a copy of Buffer. + + Parameters + ---------- + deep : bool, default True + - If deep=True, returns a deep copy of the underlying data. + - If deep=False, returns a new `Buffer` instance that refers + to the same `BufferOwner` as this one. Thus, no device + data are being copied. + + Returns + ------- + Buffer + A new buffer that either refers to either a new or an existing + `BufferOwner` depending on the `deep` argument (see above). 
+ """ + + # When doing a shallow copy, we just return a new slice + if not deep: + return self.__class__( + owner=self._owner, offset=self._offset, size=self._size + ) + + # Otherwise, we create a new copy of the memory + owner = self._owner._from_device_memory( + rmm.DeviceBuffer( + ptr=self._owner.get_ptr(mode="read") + self._offset, + size=self.size, + ), + exposed=False, + ) + return self.__class__(owner=owner, offset=0, size=owner.size) + + @property + def __cuda_array_interface__(self) -> Mapping: + """Implementation of the CUDA Array Interface.""" + return { + "data": (self.get_ptr(mode="write"), False), + "shape": (self.size,), + "strides": None, + "typestr": "|u1", + "version": 0, + } + def serialize(self) -> Tuple[dict, list]: """Serialize the buffer into header and frames. - The frames can be a mixture of memoryview and Buffer objects. + The frames can be a mixture of memoryview, Buffer, and BufferOwner + objects. Returns ------- Tuple[dict, List] The first element of the returned tuple is a dict containing any serializable metadata required to reconstruct the object. The - second element is a list containing Buffers and memoryviews. + second element is a list containing single frame. """ header: Dict[str, Any] = {} header["type-serialized"] = pickle.dumps(type(self)) + header["owner-type-serialized"] = pickle.dumps(type(self._owner)) header["frame_count"] = 1 frames = [self] return header, frames @@ -317,16 +437,27 @@ def deserialize(cls, header: dict, frames: list) -> Self: if isinstance(frame, cls): return frame # The frame is already deserialized + owner_type: BufferOwner = pickle.loads(header["owner-type-serialized"]) if hasattr(frame, "__cuda_array_interface__"): - return cls._from_device_memory(frame) - return cls._from_host_memory(frame) + owner = owner_type._from_device_memory(frame, exposed=False) + else: + owner = owner_type._from_host_memory(frame) + return cls( + owner=owner, + offset=0, + size=owner.size, + ) def __repr__(self) -> str: - klass = self.__class__ - name = f"{klass.__module__}.{klass.__qualname__}" return ( - f"<{name} size={format_bytes(self._size)} " - f"ptr={hex(self._ptr)} owner={repr(self._owner)}>" + f"{self.__class__.__name__}(owner={self._owner!r}, " + f"offset={self._offset!r}, size={self._size!r})" + ) + + def __str__(self) -> str: + return ( + f"<{self.__class__.__name__} size={format_bytes(self._size)} " + f"offset={format_bytes(self._offset)} of {self._owner}>" ) diff --git a/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py b/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py index f2ac6301944..4c08016adbb 100644 --- a/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py +++ b/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py @@ -1,241 +1,47 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. from __future__ import annotations -import weakref -from typing import ( - Any, - Container, - Literal, - Mapping, - Optional, - Type, - TypeVar, - cast, -) +from typing import Literal, Mapping, Optional from typing_extensions import Self import cudf -from cudf.core.buffer.buffer import Buffer, get_ptr_and_size -from cudf.utils.string import format_bytes - -T = TypeVar("T", bound="ExposureTrackedBuffer") - - -def get_owner(data, klass: Type[T]) -> Optional[T]: - """Get the owner of `data`, if any exist - - Search through the stack of data owners in order to find an - owner of type `klass` (not subclasses). 
- - Parameters - ---------- - data - The data object - - Return - ------ - klass or None - The owner of `data` if `klass` or None. - """ - - if type(data) is klass: - return data - if hasattr(data, "owner"): - return get_owner(data.owner, klass) - return None - - -def as_exposure_tracked_buffer( - data, exposed: bool, subclass: Optional[Type[T]] = None -) -> BufferSlice: - """Factory function to wrap `data` in a slice of an exposure tracked buffer - - If `subclass` is None, a new ExposureTrackedBuffer that points to the - memory of `data` is created and a BufferSlice that points to all of the - new ExposureTrackedBuffer is returned. - - If `subclass` is not None, a new `subclass` is created instead. Still, - a BufferSlice that points to all of the new `subclass` is returned - - It is illegal for an exposure tracked buffer to own another exposure - tracked buffer. When representing the same memory, we should have a single - exposure tracked buffer and multiple buffer slices. - - Developer Notes - --------------- - This function always returns slices thus all buffers in cudf will use - `BufferSlice` when copy-on-write is enabled. The slices implement - copy-on-write by trigging deep copies when write access is detected - and multiple slices points to the same exposure tracked buffer. - - Parameters - ---------- - data : buffer-like or array-like - A buffer-like or array-like object that represents C-contiguous memory. - exposed - Mark the buffer as permanently exposed. - subclass - If not None, a subclass of ExposureTrackedBuffer to wrap `data`. - - Return - ------ - BufferSlice - A buffer slice that points to a ExposureTrackedBuffer (or `subclass`), - which in turn wraps `data`. - """ - - if not hasattr(data, "__cuda_array_interface__"): - if exposed: - raise ValueError("cannot created exposed host memory") - return cast( - BufferSlice, ExposureTrackedBuffer._from_host_memory(data)[:] - ) - - owner = get_owner(data, subclass or ExposureTrackedBuffer) - if owner is None: - return cast( - BufferSlice, - ExposureTrackedBuffer._from_device_memory(data, exposed=exposed)[ - : - ], - ) - - # At this point, we know that `data` is owned by a exposure tracked buffer - ptr, size = get_ptr_and_size(data.__cuda_array_interface__) - if size > 0 and owner._ptr == 0: - raise ValueError("Cannot create a non-empty slice of a null buffer") - return BufferSlice(base=owner, offset=ptr - owner._ptr, size=size) +from cudf.core.buffer.buffer import Buffer, BufferOwner class ExposureTrackedBuffer(Buffer): - """A Buffer that tracks its "expose" status. - - In order to implement copy-on-write and spillable buffers, we need the - ability to detect external access to the underlying memory. We say that - the buffer has been exposed if the device pointer (integer or void*) has - been accessed outside of ExposureTrackedBuffer. In this case, we have no - control over knowing if the data is being modified by a third-party. - - Attributes - ---------- - _exposed - The current exposure status of the buffer. Notice, once the exposure - status becomes True, it should never change back. - _slices - The set of BufferSlice instances that point to this buffer. 
- """ - - _exposed: bool - _slices: weakref.WeakSet[BufferSlice] - - @property - def exposed(self) -> bool: - return self._exposed - - def mark_exposed(self) -> None: - """Mark the buffer as "exposed" permanently""" - self._exposed = True - - @classmethod - def _from_device_memory(cls, data: Any, *, exposed: bool = False) -> Self: - """Create an exposure tracked buffer from device memory. - - No data is being copied. - - Parameters - ---------- - data : device-buffer-like - An object implementing the CUDA Array Interface. - exposed : bool, optional - Mark the buffer as permanently exposed. - - Returns - ------- - ExposureTrackedBuffer - Buffer representing the same device memory as `data` - """ - ret = super()._from_device_memory(data) - ret._exposed = exposed - ret._slices = weakref.WeakSet() - return ret - - def _getitem(self, offset: int, size: int) -> BufferSlice: - return BufferSlice(base=self, offset=offset, size=size) - - @property - def __cuda_array_interface__(self) -> Mapping: - self.mark_exposed() - return super().__cuda_array_interface__ - - def __repr__(self) -> str: - return ( - f"" - ) - - -class BufferSlice(ExposureTrackedBuffer): - """A slice (aka. a view) of a exposure tracked buffer. + """An exposure tracked buffer. Parameters ---------- - base - The exposure tracked buffer this slice refers to. + owner + The owning exposure tracked buffer this refers to. offset - The offset relative to the start memory of base (in bytes). + The offset relative to the start memory of owner (in bytes). size The size of the slice (in bytes) - passthrough_attributes - Name of attributes that are passed through to the base as-is. """ + _owner: BufferOwner + def __init__( self, - base: ExposureTrackedBuffer, - offset: int, - size: int, - *, - passthrough_attributes: Container[str] = ("exposed",), + owner: BufferOwner, + offset: int = 0, + size: Optional[int] = None, ) -> None: - if size < 0: - raise ValueError("size cannot be negative") - if offset < 0: - raise ValueError("offset cannot be negative") - if offset + size > base.size: - raise ValueError( - "offset+size cannot be greater than the size of base" - ) - self._base = base - self._offset = offset - self._size = size - self._owner = base - self._passthrough_attributes = passthrough_attributes - base._slices.add(self) - - def __getattr__(self, name): - if name in self._passthrough_attributes: - return getattr(self._base, name) - raise AttributeError( - f"{self.__class__.__name__} object has no attribute {name}" - ) + super().__init__(owner=owner, offset=offset, size=size) + self._owner._slices.add(self) - def _getitem(self, offset: int, size: int) -> BufferSlice: - return BufferSlice( - base=self._base, offset=offset + self._offset, size=size - ) + @property + def exposed(self) -> bool: + return self._owner.exposed def get_ptr(self, *, mode: Literal["read", "write"]) -> int: if mode == "write" and cudf.get_option("copy_on_write"): self.make_single_owner_inplace() - return self._base.get_ptr(mode=mode) + self._offset - - def memoryview( - self, *, offset: int = 0, size: Optional[int] = None - ) -> memoryview: - return self._base.memoryview(offset=self._offset + offset, size=size) + return super().get_ptr(mode=mode) def copy(self, deep: bool = True) -> Self: """Return a copy of Buffer. 
@@ -260,16 +66,14 @@ def copy(self, deep: bool = True) -> Self: Returns ------- - BufferSlice - A slice pointing to either a new or the existing base buffer - depending on the expose status of the base buffer and the + ExposureTrackedBuffer + A slice pointing to either a new or the existing owner + depending on the expose status of the owner and the copy-on-write option (see above). """ if cudf.get_option("copy_on_write"): - base_copy = self._base.copy(deep=deep or self.exposed) - else: - base_copy = self._base.copy(deep=deep) - return cast(Self, base_copy[self._offset : self._offset + self._size]) + return super().copy(deep=deep or self.exposed) + return super().copy(deep=deep) @property def __cuda_array_interface__(self) -> Mapping: @@ -278,7 +82,7 @@ def __cuda_array_interface__(self) -> Mapping: return super().__cuda_array_interface__ def make_single_owner_inplace(self) -> None: - """Make sure this slice is the only one pointing to the base. + """Make sure this slice is the only one pointing to the owner. This is used by copy-on-write to trigger a deep copy when write access is detected. @@ -294,18 +98,11 @@ def make_single_owner_inplace(self) -> None: Buffer representing the same device memory as `data` """ - if len(self._base._slices) > 1: - # If this is not the only slice pointing to `self._base`, we - # point to a new deep copy of the base. + if len(self._owner._slices) > 1: + # If this is not the only slice pointing to `self._owner`, we + # point to a new deep copy of the owner. t = self.copy(deep=True) - self._base = t._base + self._owner = t._owner self._offset = t._offset self._size = t._size - self._owner = t._base - self._base._slices.add(self) - - def __repr__(self) -> str: - return ( - f"" - ) + self._owner._slices.add(self) diff --git a/python/cudf/cudf/core/buffer/spill_manager.py b/python/cudf/cudf/core/buffer/spill_manager.py index 91f3b2cd544..3e654e01401 100644 --- a/python/cudf/cudf/core/buffer/spill_manager.py +++ b/python/cudf/cudf/core/buffer/spill_manager.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. from __future__ import annotations @@ -16,7 +16,7 @@ import rmm.mr -from cudf.core.buffer.spillable_buffer import SpillableBuffer +from cudf.core.buffer.spillable_buffer import SpillableBufferOwner from cudf.options import get_option from cudf.utils.nvtx_annotation import _cudf_nvtx_annotate from cudf.utils.string import format_bytes @@ -128,7 +128,7 @@ def log_spill(self, src: str, dst: str, nbytes: int, time: float) -> None: total_time + time, ) - def log_expose(self, buf: SpillableBuffer) -> None: + def log_expose(self, buf: SpillableBufferOwner) -> None: """Log an expose event We track logged exposes by grouping them by their traceback such @@ -224,7 +224,7 @@ class SpillManager: SpillStatistics for the different levels. 
""" - _buffers: weakref.WeakValueDictionary[int, SpillableBuffer] + _buffers: weakref.WeakValueDictionary[int, SpillableBufferOwner] statistics: SpillStatistics def __init__( @@ -298,14 +298,14 @@ def _out_of_memory_handle(self, nbytes: int, *, retry_once=True) -> bool: ) return False # Since we didn't find anything to spill, we give up - def add(self, buffer: SpillableBuffer) -> None: + def add(self, buffer: SpillableBufferOwner) -> None: """Add buffer to the set of managed buffers The manager keeps a weak reference to the buffer Parameters ---------- - buffer : SpillableBuffer + buffer : SpillableBufferOwner The buffer to manage """ if buffer.size > 0 and not buffer.exposed: @@ -316,7 +316,7 @@ def add(self, buffer: SpillableBuffer) -> None: def buffers( self, order_by_access_time: bool = False - ) -> Tuple[SpillableBuffer, ...]: + ) -> Tuple[SpillableBufferOwner, ...]: """Get all managed buffers Parameters diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index 1856bec1876..aeac4b76e58 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. from __future__ import annotations @@ -16,8 +16,8 @@ from cudf.core.buffer.buffer import ( Buffer, + BufferOwner, cuda_array_interface_wrapper, - get_ptr_and_size, host_memory_allocation, ) from cudf.utils.nvtx_annotation import _get_color_for_nvtx, annotate @@ -27,86 +27,6 @@ from cudf.core.buffer.spill_manager import SpillManager -def get_spillable_owner(data) -> Optional[SpillableBuffer]: - """Get the spillable owner of `data`, if any exist - - Search through the stack of data owners in order to find an - owner of type `SpillableBuffer` (not subclasses). - - Parameters - ---------- - data : buffer-like or array-like - A buffer-like or array-like object that represent C-contiguous memory. - - Return - ------ - SpillableBuffer or None - The owner of `data` if spillable or None. - """ - - if type(data) is SpillableBuffer: - return data - if hasattr(data, "owner"): - return get_spillable_owner(data.owner) - return None - - -def as_spillable_buffer(data, exposed: bool) -> SpillableBuffer: - """Factory function to wrap `data` in a SpillableBuffer object. - - If `data` isn't a buffer already, a new buffer that points to the memory of - `data` is created. If `data` represents host memory, it is copied to a new - `rmm.DeviceBuffer` device allocation. Otherwise, the memory of `data` is - **not** copied, instead the new buffer keeps a reference to `data` in order - to retain its lifetime. - - If `data` is owned by a spillable buffer, a "slice" of the buffer is - returned. In this case, the spillable buffer must either be "exposed" or - spilled locked (called within an acquire_spill_lock context). This is to - guarantee that the memory of `data` isn't spilled before this function gets - to calculate the offset of the new slice. - - It is illegal for a spillable buffer to own another spillable buffer. - - Parameters - ---------- - data : buffer-like or array-like - A buffer-like or array-like object that represent C-contiguous memory. - exposed : bool, optional - Mark the buffer as permanently exposed (unspillable). - - Return - ------ - SpillableBuffer - A spillabe buffer instance that represents the device memory of `data`. 
- """ - - from cudf.core.buffer.utils import get_spill_lock - - if not hasattr(data, "__cuda_array_interface__"): - if exposed: - raise ValueError("cannot created exposed host memory") - return SpillableBuffer._from_host_memory(data) - - spillable_owner = get_spillable_owner(data) - if spillable_owner is None: - return SpillableBuffer._from_device_memory(data, exposed=exposed) - - if not spillable_owner.exposed and get_spill_lock() is None: - raise ValueError( - "A owning spillable buffer must " - "either be exposed or spilled locked." - ) - - # At this point, we know that `data` is owned by a spillable buffer, - # which is exposed or spilled locked. - ptr, size = get_ptr_and_size(data.__cuda_array_interface__) - base_ptr = spillable_owner.memory_info()[0] - return SpillableBufferSlice( - spillable_owner, offset=ptr - base_ptr, size=size - ) - - class SpillLock: pass @@ -141,7 +61,7 @@ def __getitem__(self, i): raise IndexError("tuple index out of range") -class SpillableBuffer(Buffer): +class SpillableBufferOwner(BufferOwner): """A Buffer that supports spilling memory off the GPU to avoid OOMs. This buffer supports spilling the represented data to host memory. @@ -150,9 +70,9 @@ class SpillableBuffer(Buffer): device memory usage see `cudf.core.buffer.spill_manager.SpillManager`. Unspill is triggered automatically when accessing the data of the buffer. - The buffer might not be spillable, which is based on the "expose" status - of the buffer. We say that the buffer has been exposed if the device - pointer (integer or void*) has been accessed outside of SpillableBuffer. + The buffer might not be spillable, which is based on the "expose" status of + the buffer. We say that the buffer has been exposed if the device pointer + (integer or void*) has been accessed outside of SpillableBufferOwner. In this case, we cannot invalidate the device pointer by moving the data to host. @@ -160,17 +80,17 @@ class SpillableBuffer(Buffer): property. To avoid this, one can use `.get_ptr()` instead, which support exposing the buffer temporarily. - Use the factory function `as_buffer` to create a SpillableBuffer instance. + Use the factory function `as_buffer` to create a SpillableBufferOwner + instance. """ lock: RLock _spill_locks: weakref.WeakSet _last_accessed: float _ptr_desc: Dict[str, Any] - _exposed: bool _manager: SpillManager - def _finalize_init(self, ptr_desc: Dict[str, Any], exposed: bool) -> None: + def _finalize_init(self, ptr_desc: Dict[str, Any]) -> None: """Finish initialization of the spillable buffer This implements the common initialization that `_from_device_memory` @@ -180,8 +100,6 @@ def _finalize_init(self, ptr_desc: Dict[str, Any], exposed: bool) -> None: ---------- ptr_desc : dict Description of the memory. - exposed : bool, optional - Mark the buffer as permanently exposed (unspillable). """ from cudf.core.buffer.spill_manager import get_global_manager @@ -190,7 +108,6 @@ def _finalize_init(self, ptr_desc: Dict[str, Any], exposed: bool) -> None: self._spill_locks = weakref.WeakSet() self._last_accessed = time.monotonic() self._ptr_desc = ptr_desc - self._exposed = exposed manager = get_global_manager() if manager is None: raise ValueError( @@ -202,7 +119,7 @@ def _finalize_init(self, ptr_desc: Dict[str, Any], exposed: bool) -> None: self._manager.add(self) @classmethod - def _from_device_memory(cls, data: Any, *, exposed: bool = False) -> Self: + def _from_device_memory(cls, data: Any, exposed: bool) -> Self: """Create a spillabe buffer from device memory. No data is being copied. 
@@ -211,16 +128,16 @@ def _from_device_memory(cls, data: Any, *, exposed: bool = False) -> Self: ---------- data : device-buffer-like An object implementing the CUDA Array Interface. - exposed : bool, optional + exposed : bool Mark the buffer as permanently exposed (unspillable). Returns ------- - SpillableBuffer + SpillableBufferOwner Buffer representing the same device memory as `data` """ - ret = super()._from_device_memory(data) - ret._finalize_init(ptr_desc={"type": "gpu"}, exposed=exposed) + ret = super()._from_device_memory(data, exposed=exposed) + ret._finalize_init(ptr_desc={"type": "gpu"}) return ret @classmethod @@ -241,7 +158,7 @@ def _from_host_memory(cls, data: Any) -> Self: Returns ------- - SpillableBuffer + SpillableBufferOwner Buffer representing a copy of `data`. """ @@ -257,20 +174,14 @@ def _from_host_memory(cls, data: Any) -> Self: ret._owner = None ret._ptr = 0 ret._size = data.nbytes - ret._finalize_init( - ptr_desc={"type": "cpu", "memoryview": data}, exposed=False - ) + ret._exposed = False + ret._finalize_init(ptr_desc={"type": "cpu", "memoryview": data}) return ret @property def is_spilled(self) -> bool: return self._ptr_desc["type"] != "gpu" - def copy(self, deep: bool = True) -> Self: - spill_lock = SpillLock() - self.spill_lock(spill_lock=spill_lock) - return super().copy(deep=deep) - def spill(self, target: str = "cpu") -> None: """Spill or un-spill this buffer in-place @@ -343,10 +254,10 @@ def mark_exposed(self) -> None: self._manager.spill_to_device_limit() with self.lock: - if not self._exposed: + if not self.exposed: self._manager.statistics.log_expose(self) self.spill(target="gpu") - self._exposed = True + super().mark_exposed() self._last_accessed = time.monotonic() def spill_lock(self, spill_lock: SpillLock) -> None: @@ -415,25 +326,9 @@ def memory_info(self) -> Tuple[int, int, str]: ).__array_interface__["data"][0] return (ptr, self.nbytes, self._ptr_desc["type"]) - @property - def owner(self) -> Any: - return self._owner - - @property - def exposed(self) -> bool: - return self._exposed - @property def spillable(self) -> bool: - return not self._exposed and len(self._spill_locks) == 0 - - @property - def size(self) -> int: - return self._size - - @property - def nbytes(self) -> int: - return self._size + return not self.exposed and len(self._spill_locks) == 0 @property def last_accessed(self) -> float: @@ -465,148 +360,114 @@ def memoryview( ) return ret - def _getitem(self, offset: int, size: int) -> SpillableBufferSlice: - return SpillableBufferSlice(base=self, offset=offset, size=size) - - def serialize(self) -> Tuple[dict, list]: - """Serialize the Buffer - - Normally, we would use `[self]` as the frames. This would work but - also mean that `self` becomes exposed permanently if the frames are - later accessed through `__cuda_array_interface__`, which is exactly - what libraries like Dask+UCX would do when communicating! - - The sound solution is to modify Dask et al. so that they access the - frames through `.get_ptr()` and holds on to the `spill_lock` until - the frame has been transferred. However, until this adaptation we - use a hack where the frame is a `Buffer` with a `spill_lock` as the - owner, which makes `self` unspillable while the frame is alive but - doesn't expose `self` when `__cuda_array_interface__` is accessed. - - Warning, this hack means that the returned frame must be copied before - given to `.deserialize()`, otherwise we would have a `Buffer` pointing - to memory already owned by an existing `SpillableBuffer`. 
- """ - header: Dict[Any, Any] - frames: List[Buffer | memoryview] - with self.lock: - header = {} - header["type-serialized"] = pickle.dumps(self.__class__) - header["frame_count"] = 1 - if self.is_spilled: - frames = [self.memoryview()] - else: - # TODO: Use `frames=[self]` instead of this hack, see doc above - spill_lock = SpillLock() - self.spill_lock(spill_lock) - ptr, size, _ = self.memory_info() - frames = [ - Buffer._from_device_memory( - cuda_array_interface_wrapper( - ptr=ptr, - size=size, - owner=(self._owner, spill_lock), - ) - ) - ] - return header, frames - - def __repr__(self) -> str: + def __str__(self) -> str: if self._ptr_desc["type"] != "gpu": ptr_info = str(self._ptr_desc) else: ptr_info = str(hex(self._ptr)) return ( - f"" ) -class SpillableBufferSlice(SpillableBuffer): +class SpillableBuffer(Buffer): """A slice of a spillable buffer This buffer applies the slicing and then delegates all - operations to its base buffer. + operations to its owning buffer. Parameters ---------- - base : SpillableBuffer - The base of the view + owner : SpillableBufferOwner + The owner of the view offset : int - Memory offset into the base buffer + Memory offset into the owning buffer size : int Size of the view (in bytes) """ - def __init__(self, base: SpillableBuffer, offset: int, size: int) -> None: - if size < 0: - raise ValueError("size cannot be negative") - if offset < 0: - raise ValueError("offset cannot be negative") - if offset + size > base.size: - raise ValueError( - "offset+size cannot be greater than the size of base" - ) - self._base = base - self._offset = offset - self._size = size - self._owner = base - self.lock = base.lock - - def get_ptr(self, *, mode: Literal["read", "write"]) -> int: - """ - A passthrough method to `SpillableBuffer.get_ptr` - with factoring in the `offset`. - """ - return self._base.get_ptr(mode=mode) + self._offset - - def _getitem(self, offset: int, size: int) -> SpillableBufferSlice: - return SpillableBufferSlice( - base=self._base, offset=offset + self._offset, size=size - ) + _owner: SpillableBufferOwner - @classmethod - def deserialize(cls, header: dict, frames: list): - # TODO: because of the hack in `SpillableBuffer.serialize()` where - # frames are of type `Buffer`, we always deserialize as if they are - # `SpillableBuffer`. In the future, we should be able to - # deserialize into `SpillableBufferSlice` when the frames hasn't been - # copied. 
- return SpillableBuffer.deserialize(header, frames) - - def memoryview( - self, *, offset: int = 0, size: Optional[int] = None - ) -> memoryview: - size = self._size if size is None else size - return self._base.memoryview(offset=self._offset + offset, size=size) - - def __repr__(self) -> str: - return ( - f" None: - return self._base.spill(target=target) + return self._owner.spill(target=target) @property def is_spilled(self) -> bool: - return self._base.is_spilled + return self._owner.is_spilled @property def exposed(self) -> bool: - return self._base.exposed + return self._owner.exposed @property def spillable(self) -> bool: - return self._base.spillable + return self._owner.spillable def spill_lock(self, spill_lock: SpillLock) -> None: - self._base.spill_lock(spill_lock=spill_lock) + self._owner.spill_lock(spill_lock=spill_lock) def memory_info(self) -> Tuple[int, int, str]: - (ptr, _, device_type) = self._base.memory_info() + (ptr, _, device_type) = self._owner.memory_info() return (ptr + self._offset, self.nbytes, device_type) + + def mark_exposed(self) -> None: + self._owner.mark_exposed() + + def serialize(self) -> Tuple[dict, list]: + """Serialize the Buffer + + Normally, we would use `[self]` as the frames. This would work but + also mean that `self` becomes exposed permanently if the frames are + later accessed through `__cuda_array_interface__`, which is exactly + what libraries like Dask+UCX would do when communicating! + + The sound solution is to modify Dask et al. so that they access the + frames through `.get_ptr()` and holds on to the `spill_lock` until + the frame has been transferred. However, until this adaptation we + use a hack where the frame is a `Buffer` with a `spill_lock` as the + owner, which makes `self` unspillable while the frame is alive but + doesn't expose `self` when `__cuda_array_interface__` is accessed. + + Warning, this hack means that the returned frame must be copied before + given to `.deserialize()`, otherwise we would have a `Buffer` pointing + to memory already owned by an existing `SpillableBufferOwner`. + """ + header: Dict[str, Any] = {} + frames: List[Buffer | memoryview] + with self._owner.lock: + header["type-serialized"] = pickle.dumps(self.__class__) + header["owner-type-serialized"] = pickle.dumps(type(self._owner)) + header["frame_count"] = 1 + if self.is_spilled: + frames = [self.memoryview()] + else: + # TODO: Use `frames=[self]` instead of this hack, see doc above + spill_lock = SpillLock() + self.spill_lock(spill_lock) + ptr, size, _ = self.memory_info() + frames = [ + Buffer( + owner=BufferOwner._from_device_memory( + cuda_array_interface_wrapper( + ptr=ptr, + size=size, + owner=(self._owner, spill_lock), + ), + exposed=False, + ) + ) + ] + return header, frames + + @property + def __cuda_array_interface__(self) -> dict: + return { + "data": DelayedPointerTuple(self), + "shape": (self.size,), + "strides": None, + "typestr": "|u1", + "version": 0, + } diff --git a/python/cudf/cudf/core/buffer/utils.py b/python/cudf/cudf/core/buffer/utils.py index 373be99ec96..c2ec7effd13 100644 --- a/python/cudf/cudf/core/buffer/utils.py +++ b/python/cudf/cudf/core/buffer/utils.py @@ -1,18 +1,51 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. 
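The `__cuda_array_interface__` implementation above keeps the `DelayedPointerTuple` indirection so that merely building the interface dict does not expose the buffer; exposure happens only when a consumer reads `data[0]`. A dependency-free sketch of that pattern, with hypothetical names:

```python
from collections.abc import Sequence


class LazyDataField(Sequence):
    """Mimics DelayedPointerTuple: defer the pointer until it is actually read."""

    def __init__(self, get_ptr):
        self._get_ptr = get_ptr  # callable with side effects (expose/unspill)

    def __len__(self) -> int:
        return 2

    def __getitem__(self, i: int):
        if i == 0:
            return self._get_ptr()  # the side effect happens here, not earlier
        if i == 1:
            return False  # the read-only flag of the CUDA Array Interface
        raise IndexError("tuple index out of range")


events = []
iface = {"data": LazyDataField(lambda: events.append("exposed") or 0xDEADBEEF)}
assert events == []                    # building the dict exposes nothing
assert iface["data"][0] == 0xDEADBEEF  # reading the pointer triggers the side effect
assert events == ["exposed"]
```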
from __future__ import annotations import threading from contextlib import ContextDecorator -from typing import Any, Dict, Optional, Tuple, Union - -from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper -from cudf.core.buffer.exposure_tracked_buffer import as_exposure_tracked_buffer +from typing import Any, Dict, Optional, Tuple, Type, Union + +from cudf.core.buffer.buffer import ( + Buffer, + BufferOwner, + cuda_array_interface_wrapper, + get_ptr_and_size, +) +from cudf.core.buffer.exposure_tracked_buffer import ExposureTrackedBuffer from cudf.core.buffer.spill_manager import get_global_manager -from cudf.core.buffer.spillable_buffer import SpillLock, as_spillable_buffer +from cudf.core.buffer.spillable_buffer import ( + SpillableBuffer, + SpillableBufferOwner, + SpillLock, +) from cudf.options import get_option +def get_buffer_owner(data: Any) -> Optional[BufferOwner]: + """Get the owner of `data`, if one exists + + Search through the stack of data owners in order to find an + owner BufferOwner (incl. subclasses). + + Parameters + ---------- + data + The data object to search for a BufferOwner instance + + Return + ------ + BufferOwner or None + The owner of `data` if found otherwise None. + """ + + if isinstance(data, BufferOwner): + return data + if hasattr(data, "owner"): + return get_buffer_owner(data.owner) + return None + + def as_buffer( data: Union[int, Any], *, @@ -30,7 +63,17 @@ def as_buffer( If `data` is an integer, it is assumed to point to device memory. - Raises ValueError if data isn't C-contiguous. + Raises ValueError if `data` isn't C-contiguous. + + If copy-on-write is enabled, an ExposureTrackedBuffer is returned. + + If spilling is enabled, a SpillableBuffer that refers to a + SpillableBufferOwner is returned. If `data` is owned by a spillable buffer, + it must either be "exposed" or spill locked (called within an + acquire_spill_lock context). This is to guarantee that the memory of `data` + isn't spilled before this function gets to calculate the offset of the new + SpillableBuffer. 
+ Parameters ---------- @@ -73,13 +116,49 @@ def as_buffer( "`data` is a buffer-like or array-like object" ) - if get_option("copy_on_write"): - return as_exposure_tracked_buffer(data, exposed=exposed) + # Find the buffer types to return based on the current config + owner_class: Type[BufferOwner] + buffer_class: Type[Buffer] if get_global_manager() is not None: - return as_spillable_buffer(data, exposed=exposed) - if hasattr(data, "__cuda_array_interface__"): - return Buffer._from_device_memory(data) - return Buffer._from_host_memory(data) + owner_class = SpillableBufferOwner + buffer_class = SpillableBuffer + elif get_option("copy_on_write"): + owner_class = BufferOwner + buffer_class = ExposureTrackedBuffer + else: + owner_class = BufferOwner + buffer_class = Buffer + + # Handle host memory, + if not hasattr(data, "__cuda_array_interface__"): + if exposed: + raise ValueError("cannot created exposed host memory") + return buffer_class(owner=owner_class._from_host_memory(data)) + + # Check if `data` is owned by a known class + owner = get_buffer_owner(data) + if owner is None: # `data` is new device memory + return buffer_class( + owner=owner_class._from_device_memory(data, exposed=exposed) + ) + + # At this point, we know that `data` is owned by a known class, which + # should be the same class as specified by the current config (see above) + assert owner.__class__ is owner_class + if ( + isinstance(owner, SpillableBufferOwner) + and not owner.exposed + and get_spill_lock() is None + ): + raise ValueError( + "An owning spillable buffer must " + "either be exposed or spill locked." + ) + ptr, size = get_ptr_and_size(data.__cuda_array_interface__) + base_ptr = owner.get_ptr(mode="read") + if size > 0 and base_ptr == 0: + raise ValueError("Cannot create a non-empty slice of a null buffer") + return buffer_class(owner=owner, offset=ptr - base_ptr, size=size) _thread_spill_locks: Dict[int, Tuple[Optional[SpillLock], int]] = {} diff --git a/python/cudf/cudf/tests/test_buffer.py b/python/cudf/cudf/tests/test_buffer.py index 1c9e7475080..03637e05eae 100644 --- a/python/cudf/cudf/tests/test_buffer.py +++ b/python/cudf/cudf/tests/test_buffer.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. import cupy as cp import pytest @@ -64,7 +64,14 @@ def test_buffer_creation_from_any(): assert isinstance(b, Buffer) assert ary.data.ptr == b.get_ptr(mode="read") assert ary.nbytes == b.size - assert b.owner.owner is ary + assert b.owner.owner.owner is ary + + +@pytest.mark.parametrize("size", [10, 2**10 + 500, 2**20]) +def test_buffer_str(size): + ary = cp.arange(size, dtype="uint8") + buf = as_buffer(ary) + assert f"size={size}" in repr(buf) @pytest.mark.parametrize( @@ -73,7 +80,7 @@ def test_buffer_creation_from_any(): def test_buffer_repr(size, expect): ary = cp.arange(size, dtype="uint8") buf = as_buffer(ary) - assert f"size={expect}" in repr(buf) + assert f"size={expect}" in str(buf) @pytest.mark.parametrize( diff --git a/python/cudf/cudf/tests/test_copying.py b/python/cudf/cudf/tests/test_copying.py index 085774e9dbc..e737a73e86b 100644 --- a/python/cudf/cudf/tests/test_copying.py +++ b/python/cudf/cudf/tests/test_copying.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. 
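For reference, the `as_buffer` dispatch added in `utils.py` above can be exercised roughly as follows. It assumes a GPU, spilling disabled, and copy-on-write enabled at call time; the assertions mirror the updated tests rather than adding new guarantees.

```python
# Illustrative only: requires a GPU, spilling disabled, copy-on-write enabled.
import cupy as cp

import cudf
from cudf.core.buffer import BufferOwner, ExposureTrackedBuffer, as_buffer

cudf.set_option("copy_on_write", True)

buf = as_buffer(cp.arange(4, dtype="u1"))
assert isinstance(buf, ExposureTrackedBuffer)  # buffer_class chosen from config
assert type(buf.owner) is BufferOwner          # COW uses the plain owner class

# Wrapping memory that is already owned reuses the owner instead of nesting:
view = as_buffer(buf.get_ptr(mode="read") + 1, size=2, owner=buf)
assert view.owner is buf.owner and view.size == 2
```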
import cupy as cp import numpy as np @@ -113,11 +113,8 @@ def test_series_setitem_partial_slice_cow_on(): assert_eq(new_copy, cudf.Series([1, 2, 300, 300, 5])) new_slice = actual[2:] - # TODO: when COW and spilling has been unified, find a clean way to - # test this without accessing the internal attributes _base and _ptr assert ( - new_slice._column.base_data._base._ptr - == actual._column.base_data._base._ptr + new_slice._column.base_data.owner == actual._column.base_data.owner ) new_slice[0:2] = 10 assert_eq(new_slice, cudf.Series([10, 10, 5], index=[2, 3, 4])) @@ -134,9 +131,11 @@ def test_series_setitem_partial_slice_cow_off(): assert_eq(new_copy, cudf.Series([1, 2, 300, 300, 5])) new_slice = actual[2:] - assert ( - new_slice._column.base_data._ptr == actual._column.base_data._ptr - ) + # Since COW is off, a slice should point to the same memory + ptr1 = new_slice._column.base_data.get_ptr(mode="read") + ptr2 = actual._column.base_data.get_ptr(mode="read") + assert ptr1 == ptr2 + new_slice[0:2] = 10 assert_eq(new_slice, cudf.Series([10, 10, 5], index=[2, 3, 4])) assert_eq(actual, cudf.Series([1, 2, 10, 10, 5])) diff --git a/python/cudf/cudf/tests/test_spilling.py b/python/cudf/cudf/tests/test_spilling.py index 88ce908aa5f..7e66a7ab4ba 100644 --- a/python/cudf/cudf/tests/test_spilling.py +++ b/python/cudf/cudf/tests/test_spilling.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. import importlib import random @@ -34,7 +34,7 @@ ) from cudf.core.buffer.spillable_buffer import ( SpillableBuffer, - SpillableBufferSlice, + SpillableBufferOwner, SpillLock, ) from cudf.testing._utils import assert_eq @@ -196,10 +196,10 @@ def test_creations(manager: SpillManager): def test_spillable_df_groupby(manager: SpillManager): df = cudf.DataFrame({"a": [1, 1, 1]}) gb = df.groupby("a") - assert len(single_column_df_base_data(df)._spill_locks) == 0 + assert len(single_column_df_base_data(df).owner._spill_locks) == 0 gb._groupby # `gb._groupby`, which is cached on `gb`, holds a spill lock - assert len(single_column_df_base_data(df)._spill_locks) == 1 + assert len(single_column_df_base_data(df).owner._spill_locks) == 1 assert not single_column_df_data(df).spillable del gb assert single_column_df_data(df).spillable @@ -375,7 +375,7 @@ def test_get_ptr(manager: SpillManager, target): mem = np.empty(10, dtype="u1") buf = as_buffer(data=mem, exposed=False) assert buf.spillable - assert len(buf._spill_locks) == 0 + assert len(buf.owner._spill_locks) == 0 with acquire_spill_lock(): buf.get_ptr(mode="read") assert not buf.spillable @@ -496,8 +496,8 @@ def test_serialize_cuda_dataframe(manager: SpillManager): header, frames = protocol.serialize( df1, serializers=("cuda",), on_error="raise" ) - buf: SpillableBufferSlice = single_column_df_data(df1) - assert len(buf._base._spill_locks) == 1 + buf: SpillableBuffer = single_column_df_data(df1) + assert len(buf.owner._spill_locks) == 1 assert len(frames) == 1 assert isinstance(frames[0], Buffer) assert frames[0].get_ptr(mode="read") == buf.get_ptr(mode="read") @@ -543,13 +543,14 @@ def test_as_buffer_of_spillable_buffer(manager: SpillManager): data = cupy.arange(10, dtype="u1") b1 = as_buffer(data, exposed=False) assert isinstance(b1, SpillableBuffer) - assert b1.owner is data + assert isinstance(b1.owner, SpillableBufferOwner) + assert b1.owner.owner is data b2 = as_buffer(b1) assert b1 is b2 with pytest.raises( ValueError, - match="buffer must either be exposed or spilled locked", + match="owning 
spillable buffer must either be exposed or spill locked", ): # Use `memory_info` to access device point _without_ making # the buffer unspillable. @@ -557,21 +558,21 @@ def test_as_buffer_of_spillable_buffer(manager: SpillManager): with acquire_spill_lock(): b3 = as_buffer(b1.get_ptr(mode="read"), size=b1.size, owner=b1) - assert isinstance(b3, SpillableBufferSlice) - assert b3.owner is b1 + assert isinstance(b3, SpillableBuffer) + assert b3.owner is b1.owner b4 = as_buffer( b1.get_ptr(mode="write") + data.itemsize, size=b1.size - data.itemsize, owner=b3, ) - assert isinstance(b4, SpillableBufferSlice) - assert b4.owner is b1 + assert isinstance(b4, SpillableBuffer) + assert b4.owner is b1.owner assert all(cupy.array(b4.memoryview()) == data[1:]) b5 = as_buffer(b4.get_ptr(mode="write"), size=b4.size - 1, owner=b4) - assert isinstance(b5, SpillableBufferSlice) - assert b5.owner is b1 + assert isinstance(b5, SpillableBuffer) + assert b5.owner is b1.owner assert all(cupy.array(b5.memoryview()) == data[1:-1])
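Finally, the updated tests rely on the new ownership chain and on the `repr`/`str` split introduced in `buffer.py`. A compact illustration, assuming a GPU and default options (no spilling, no copy-on-write):

```python
# Illustrative only: requires a GPU; default options (no spilling, no COW).
import cupy as cp

from cudf.core.buffer import as_buffer

ary = cp.arange(100, dtype="uint8")
buf = as_buffer(ary)

# The buffer is now a view: Buffer -> BufferOwner -> the wrapped device object.
assert buf.owner.owner is ary
assert buf.get_ptr(mode="read") == ary.data.ptr

# repr() identifies the view (owner, offset, size in bytes), while str() prints
# human-readable sizes; test_buffer_str/test_buffer_repr above check exactly that.
print(repr(buf))
print(str(buf))
```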