Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify Copy-On-Write and Spilling #15436

Merged
merged 30 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f2bf171
clean up
madsbk Apr 2, 2024
276dd80
remove redundant methods
madsbk Apr 2, 2024
2413e99
impl. and use BufferOwner.__init__
madsbk Apr 2, 2024
9f4b0e1
SpillableBufferOwner: use __init__
madsbk Apr 2, 2024
834f6d5
remove SpillableBuffer.mark_exposed()
madsbk Apr 2, 2024
68524e4
remove SpillableBuffer.exposed
madsbk Apr 2, 2024
2e43f85
SpillableBuffer(ExposureTrackedBuffer)
madsbk Apr 2, 2024
659e832
mark cow tests as "spilling"
madsbk Apr 3, 2024
c5355a5
impl. SpillableBuffer.copy
madsbk Apr 3, 2024
3581579
option: allow cow and spilling at the same time
madsbk Apr 3, 2024
58f7e8e
test: skip zero-copy and no-copy-on-write test when spilling is enabled
madsbk Apr 3, 2024
6018949
cleanup
madsbk Apr 3, 2024
f48c529
impl. set_spill_on_demand_globally
madsbk Apr 3, 2024
dcff299
impl. spill_on_demand_globally
madsbk Apr 3, 2024
4a8b43e
cleanup
madsbk Apr 3, 2024
7344f9a
impl. test_spilling_and_copy_on_write
madsbk Apr 3, 2024
1e9d061
don't copy spilled data
madsbk Apr 3, 2024
dc715e1
test_get_rmm_memory_resource_stack is safe again
madsbk Apr 3, 2024
3eaf5e3
Apply suggestions from code review
madsbk Apr 4, 2024
a4a3ff4
BufferOwner: forcing keyword-only
madsbk Apr 4, 2024
55027fc
Merge branch 'branch-24.06' of github.com:rapidsai/cudf into cow_and_…
madsbk Apr 4, 2024
504ae9a
typo
madsbk Apr 4, 2024
53106ee
Merge branch 'branch-24.06' of github.com:rapidsai/cudf into cow_and_…
madsbk Apr 11, 2024
ac6831a
doc
madsbk Apr 11, 2024
49625fa
rename _from_*_memory => from_*_memory
madsbk Apr 11, 2024
f4458b8
enable most of test_series_zero_copy_cow_off again
madsbk Apr 11, 2024
30cd8e6
doc
madsbk Apr 11, 2024
580dead
Update python/cudf/cudf/tests/test_spilling.py
madsbk Apr 11, 2024
6fe2d58
more tests
madsbk Apr 11, 2024
51b4b82
Merge branch 'branch-24.06' of github.com:rapidsai/cudf into cow_and_…
madsbk Apr 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 44 additions & 25 deletions python/cudf/cudf/core/buffer/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,25 @@ class BufferOwner(Serializable):
been accessed outside of BufferOwner. In this case, we have no control
over knowing if the data is being modified by a third party.

Use `_from_device_memory` and `_from_host_memory` to create
Use `from_device_memory` and `from_host_memory` to create
a new instance from either device or host memory respectively.

Parameters
----------
ptr
An integer representing a pointer to memory.
size
The size of the memory in nbytes
owner
Python object to which the lifetime of the memory allocation is tied.
This buffer will keep a reference to `owner`.
exposed
Whether the underlying memory has been exposed to third-party access,
in which case we cannot know if the data is being modified externally.

Raises
------
ValueError
If size is negative
"""

_ptr: int
Expand All @@ -117,14 +134,25 @@ class BufferOwner(Serializable):
# The set of buffers that point to this owner.
_slices: weakref.WeakSet[Buffer]

def __init__(self):
raise ValueError(
f"do not create a {self.__class__} directly, please "
"use the factory function `cudf.core.buffer.as_buffer`"
)
def __init__(
self,
*,
ptr: int,
size: int,
owner: object,
exposed: bool,
):
"""Initialize a buffer owner (all arguments are keyword-only).

Parameters
----------
ptr
An integer representing a pointer to the memory.
size
The size of the memory in bytes; must be non-negative.
owner
Python object tied to the lifetime of the memory allocation;
a reference to it is kept for as long as this owner lives.
exposed
Whether the memory has already been exposed to third-party
access (see class description).

Raises
------
ValueError
If size is negative
"""
if size < 0:
raise ValueError("size cannot be negative")

self._ptr = ptr
self._size = size
self._owner = owner
self._exposed = exposed
# Weak set of Buffer slices viewing this owner; entries are added
# elsewhere and disappear automatically when the slices are collected.
self._slices = weakref.WeakSet()

@classmethod
def _from_device_memory(cls, data: Any, exposed: bool) -> Self:
def from_device_memory(cls, data: Any, exposed: bool) -> Self:
"""Create from an object providing a `__cuda_array_interface__`.

No data is being copied.
Expand All @@ -151,24 +179,15 @@ def _from_device_memory(cls, data: Any, exposed: bool) -> Self:
If the resulting buffer has negative size
"""

# Bypass `__init__` and initialize attributes manually
ret = cls.__new__(cls)
ret._owner = data
ret._exposed = exposed
ret._slices = weakref.WeakSet()
if isinstance(data, rmm.DeviceBuffer): # Common case shortcut
ret._ptr = data.ptr
ret._size = data.size
ptr = data.ptr
size = data.size
else:
ret._ptr, ret._size = get_ptr_and_size(
data.__cuda_array_interface__
)
if ret.size < 0:
raise ValueError("size cannot be negative")
return ret
ptr, size = get_ptr_and_size(data.__cuda_array_interface__)
return cls(ptr=ptr, size=size, owner=data, exposed=exposed)

@classmethod
def _from_host_memory(cls, data: Any) -> Self:
def from_host_memory(cls, data: Any) -> Self:
"""Create an owner from a buffer or array like object

Data must implement `__array_interface__`, the buffer protocol, and/or
Expand Down Expand Up @@ -196,7 +215,7 @@ def _from_host_memory(cls, data: Any) -> Self:
# Copy to device memory
buf = rmm.DeviceBuffer(ptr=ptr, size=size)
# Create from device memory
return cls._from_device_memory(buf, exposed=False)
return cls.from_device_memory(buf, exposed=False)

@property
def size(self) -> int:
Expand Down Expand Up @@ -375,7 +394,7 @@ def copy(self, deep: bool = True) -> Self:
)

# Otherwise, we create a new copy of the memory
owner = self._owner._from_device_memory(
owner = self._owner.from_device_memory(
rmm.DeviceBuffer(
ptr=self._owner.get_ptr(mode="read") + self._offset,
size=self.size,
Expand Down Expand Up @@ -439,9 +458,9 @@ def deserialize(cls, header: dict, frames: list) -> Self:

owner_type: BufferOwner = pickle.loads(header["owner-type-serialized"])
if hasattr(frame, "__cuda_array_interface__"):
owner = owner_type._from_device_memory(frame, exposed=False)
owner = owner_type.from_device_memory(frame, exposed=False)
else:
owner = owner_type._from_host_memory(frame)
owner = owner_type.from_host_memory(frame)
return cls(
owner=owner,
offset=0,
Expand Down
18 changes: 6 additions & 12 deletions python/cudf/cudf/core/buffer/exposure_tracked_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,14 @@ class ExposureTrackedBuffer(Buffer):
The size of the slice (in bytes)
"""

_owner: BufferOwner

def __init__(
self,
owner: BufferOwner,
offset: int = 0,
size: Optional[int] = None,
) -> None:
super().__init__(owner=owner, offset=offset, size=size)
self._owner._slices.add(self)

@property
def exposed(self) -> bool:
return self._owner.exposed
self.owner._slices.add(self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: should we assert isinstance(owner, BufferOwner) here? Or is it not necessary because we implicitly require it via the _slices attribute?

Copy link
Member Author

@madsbk madsbk Apr 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess, we could add assert isinstance(owner, BufferOwner) in Buffer.__init__ but I am not sure if that is pythonic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An assertion would basically be there to protect against developer error. There's nothing unpythonic about it, that's definitely what the functionality is there for. The main question I would have is, how would we end up with something other than a BufferOwner here, and how bad would the failure mode be? Unless the assertion makes it markedly easier to debug the code it doesn't add a whole lot of value IMHO.

Copy link
Member Author

@madsbk madsbk Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is arguable a little unpythonic to prevent duck-typing by using isinstance to force a specify type (as opposed to a general ABC/protocol type).
Anyways, in this case as_buffer() is the only one creating ExposureTrackedBuffer and it explicitly sets the owner to SpillableBufferOwner | SpillableBuffer | BufferOwner so I don't think a type error here is likely.


def get_ptr(self, *, mode: Literal["read", "write"]) -> int:
if mode == "write" and cudf.get_option("copy_on_write"):
Expand Down Expand Up @@ -72,7 +66,7 @@ def copy(self, deep: bool = True) -> Self:
copy-on-write option (see above).
"""
if cudf.get_option("copy_on_write"):
return super().copy(deep=deep or self.exposed)
return super().copy(deep=deep or self.owner.exposed)
return super().copy(deep=deep)

@property
Expand All @@ -98,11 +92,11 @@ def make_single_owner_inplace(self) -> None:
Buffer representing the same device memory as `data`
"""

if len(self._owner._slices) > 1:
# If this is not the only slice pointing to `self._owner`, we
# point to a new deep copy of the owner.
if len(self.owner._slices) > 1:
# If this is not the only slice pointing to `self.owner`, we
# point to a new copy of our slice of `self.owner`.
t = self.copy(deep=True)
self._owner = t._owner
self._owner = t.owner
self._offset = t._offset
self._size = t._size
self._owner._slices.add(self)
101 changes: 75 additions & 26 deletions python/cudf/cudf/core/buffer/spill_manager.py
vyasr marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import warnings
import weakref
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass
from functools import partial
from typing import Dict, List, Optional, Tuple
Expand Down Expand Up @@ -201,10 +202,6 @@ class SpillManager:
This class implements tracking of all known spillable buffers, on-demand
spilling of said buffers, and (optionally) maintains a memory usage limit.

When `spill_on_demand=True`, the manager registers an RMM out-of-memory
error handler, which will spill spillable buffers in order to free up
memory.

When `device_memory_limit=<limit-in-bytes>`, the manager will try to keep
the device memory usage below the specified limit by spilling spillable
buffers continuously, which will introduce a modest overhead.
Expand All @@ -213,8 +210,6 @@ class SpillManager:

Parameters
----------
spill_on_demand : bool
Enable spill on demand.
device_memory_limit: int, optional
If not None, this is the device memory limit in bytes that triggers
device to host spilling. The global manager sets this to the value
Expand All @@ -230,30 +225,15 @@ class SpillManager:
def __init__(
self,
*,
spill_on_demand: bool = False,
device_memory_limit: Optional[int] = None,
statistic_level: int = 0,
) -> None:
self._lock = threading.Lock()
self._buffers = weakref.WeakValueDictionary()
self._id_counter = 0
self._spill_on_demand = spill_on_demand
self._device_memory_limit = device_memory_limit
self.statistics = SpillStatistics(statistic_level)

if self._spill_on_demand:
# Set the RMM out-of-memory handle if not already set
mr = rmm.mr.get_current_device_resource()
if all(
not isinstance(m, rmm.mr.FailureCallbackResourceAdaptor)
for m in get_rmm_memory_resource_stack(mr)
):
rmm.mr.set_current_device_resource(
rmm.mr.FailureCallbackResourceAdaptor(
mr, self._out_of_memory_handle
)
)

def _out_of_memory_handle(self, nbytes: int, *, retry_once=True) -> bool:
"""Try to handle an out-of-memory error by spilling

Expand Down Expand Up @@ -408,8 +388,7 @@ def __repr__(self) -> str:
dev_limit = format_bytes(self._device_memory_limit)

return (
f"<SpillManager spill_on_demand={self._spill_on_demand} "
f"device_memory_limit={dev_limit} | "
f"<SpillManager device_memory_limit={dev_limit} | "
f"{format_bytes(spilled)} spilled | "
f"{format_bytes(unspilled)} ({unspillable_ratio:.0%}) "
f"unspilled (unspillable)>"
Expand Down Expand Up @@ -442,12 +421,82 @@ def get_global_manager() -> Optional[SpillManager]:
"""Get the global manager or None if spilling is disabled"""
global _global_manager_uninitialized
if _global_manager_uninitialized:
manager = None
if get_option("spill"):
manager = SpillManager(
spill_on_demand=get_option("spill_on_demand"),
device_memory_limit=get_option("spill_device_limit"),
statistic_level=get_option("spill_stats"),
)
set_global_manager(manager)
set_global_manager(manager)
if get_option("spill_on_demand"):
set_spill_on_demand_globally()
else:
set_global_manager(None)
return _global_manager


def set_spill_on_demand_globally() -> None:
    """Register spill-on-demand with the current global spill manager.

    Warning: this modifies the current RMM memory resource. A memory
    resource that handles out-of-memory errors by spilling is pushed
    onto the RMM memory resource stack.

    Raises
    ------
    ValueError
        If no global spill manager exists (spilling is disabled).
    ValueError
        If a failure callback resource is already in the resource stack.
    """
    manager = get_global_manager()
    if manager is None:
        raise ValueError(
            "Cannot enable spill on demand with no global spill manager"
        )

    current_mr = rmm.mr.get_current_device_resource()
    # Refuse to stack a second failure callback on top of an existing one.
    has_failure_callback = any(
        isinstance(resource, rmm.mr.FailureCallbackResourceAdaptor)
        for resource in get_rmm_memory_resource_stack(current_mr)
    )
    if has_failure_callback:
        raise ValueError(
            "Spill on demand (or another failure callback resource) "
            "is already registered"
        )

    # Wrap the current resource so OOM errors trigger spilling.
    adaptor = rmm.mr.FailureCallbackResourceAdaptor(
        current_mr, manager._out_of_memory_handle
    )
    rmm.mr.set_current_device_resource(adaptor)


@contextmanager
def spill_on_demand_globally():
    """Context manager that enables spill on demand temporarily.

    Warning: this modifies the current RMM memory resource. A memory
    resource that handles out-of-memory errors is pushed onto the RMM
    memory resource stack on entry and popped again on exit.

    Raises
    ------
    ValueError
        If no global spill manager exists (spilling is disabled).
    ValueError
        If a failure callback resource is already in the resource stack.
    ValueError
        If the RMM memory resource stack was changed while in the context.
    """
    set_spill_on_demand_globally()
    # Snapshot the resource stack right after registration so we can
    # detect (and refuse to unwind) external tampering on exit.
    expected_stack = get_rmm_memory_resource_stack(
        rmm.mr.get_current_device_resource()
    )
    try:
        yield
    finally:
        current_mr = rmm.mr.get_current_device_resource()
        if get_rmm_memory_resource_stack(current_mr) != expected_stack:
            raise ValueError(
                "RMM memory source stack was changed while in the context"
            )
        # Pop our failure-callback adaptor by restoring the resource
        # that was current before it was pushed.
        rmm.mr.set_current_device_resource(expected_stack[1])
Loading
Loading