Skip to content

Commit

Permalink
avoid storing too much in arrangements (#7592)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 9b36302c51750955938be4f20d6bb93ec487910f
  • Loading branch information
KamilPiechowiak authored and Manul from Pathway committed Nov 20, 2024
1 parent 976cc83 commit 25325a6
Show file tree
Hide file tree
Showing 8 changed files with 729 additions and 201 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Changed
- `pw.Table.concat`, `pw.Table.with_id`, `pw.Table.with_id_from` no longer perform checks if ids are unique. It improves memory usage.
- table operations that store values (like `pw.Table.join`, `pw.Table.update_cells`) no longer store columns that are not used downstream.

### Fixed
- `query_as_of_now` of `pw.stdlib.indexing.DataIndex` and `pw.stdlib.indexing.HybridIndex` now work in constant memory for infinite query stream (no query-related data is kept after query is answered).
Expand Down
78 changes: 78 additions & 0 deletions integration_tests/common/test_unused_columns_removal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import multiprocessing
import resource
import time

import numpy as np

import pathway as pw


@pw.udf(deterministic=True)
def embedder(x: str) -> np.ndarray:
return np.arange(100_000) + ord(x[0])


@pw.udf(deterministic=True)
def anti_embedder(x: np.ndarray) -> str:
return chr(x[0])


class QuerySchema(pw.Schema):
query: str


class QuerySubject(pw.io.python.ConnectorSubject):
def __init__(self, n: int) -> None:
super().__init__()
self.n = n

def run(self):
for i in range(self.n):
time.sleep(0.001)
self.next(query=f"{chr(i%26 + 97)}")


class DocSchema(pw.Schema):
doc: str


class DocsSubject(pw.io.python.ConnectorSubject):
def __init__(self, n: int) -> None:
super().__init__()
self.n = n

def run(self):
for doc in ["a", "b", "c", "d", "x", "z"]:
self.next(doc=doc)
time.sleep(0.001 * self.n + 10)


def run(n: int) -> None:
query = pw.io.python.read(
QuerySubject(n), schema=QuerySchema, autocommit_duration_ms=100
)
max_depth_2 = query.with_columns(vec=embedder(pw.this.query))
max_depth_3 = max_depth_2.with_columns(c=anti_embedder(pw.this.vec))
docs = pw.io.python.read(DocsSubject(n), schema=DocSchema)
res = max_depth_3.join(docs, pw.left.query == pw.right.doc).select(
pw.left.query, pw.left.c, pw.right.doc
)

pw.io.null.write(res)
pw.run(monitoring_level=pw.MonitoringLevel.NONE)
assert resource.getrusage(resource.RUSAGE_SELF).ru_maxrss < 1_500_000


def test_big_columns_are_not_stored_if_not_needed():
n = 100_000
p = multiprocessing.Process(
target=run,
args=(n,),
)
p.start()
try:
p.join(timeout=400)
assert p.exitcode == 0
finally:
p.terminate()
p.join()
51 changes: 51 additions & 0 deletions python/pathway/internals/dtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import collections
import datetime
import functools
import math
import typing
from abc import ABC, abstractmethod
from enum import Enum
Expand Down Expand Up @@ -55,6 +56,11 @@ def equivalent_to(self, other: DType) -> bool:
@abstractmethod
def typehint(self) -> typing.Any: ...

@abstractmethod
def max_size(self) -> float:
"""The maximal size of a DType measured in the number of Value enums"""
...


class _SimpleDType(DType):
wrapped: type
Expand Down Expand Up @@ -101,6 +107,12 @@ def to_engine(self) -> api.PathwayType:
def typehint(self) -> type:
return self.wrapped

def max_size(self) -> float:
if self.wrapped in {str, bytes}:
return math.inf
else:
return 1


INT: DType = _SimpleDType(int)
BOOL: DType = _SimpleDType(bool)
Expand Down Expand Up @@ -129,6 +141,9 @@ def to_engine(self) -> api.PathwayType:
def typehint(self) -> None:
return None

def max_size(self) -> float:
return 1


NONE: DType = _NoneDType()

Expand All @@ -153,6 +168,9 @@ def is_value_compatible(self, arg):
def typehint(self) -> typing.Any:
return typing.Any

def max_size(self) -> float:
return math.inf


ANY: DType = _AnyDType()

Expand Down Expand Up @@ -197,6 +215,9 @@ def typehint(self) -> typing.Any:
self.return_type.typehint,
]

def max_size(self) -> float:
return math.inf


class Array(DType):
n_dim: int | None
Expand Down Expand Up @@ -254,6 +275,9 @@ def strip_dimension(self) -> DType:
else:
return self.wrapped

def max_size(self) -> float:
return math.inf


T = typing.TypeVar("T")

Expand Down Expand Up @@ -294,6 +318,9 @@ def typehint(self) -> type[api.Pointer]:
else:
return api.Pointer[tuple(arg.typehint for arg in self.args)] # type: ignore[misc]

def max_size(self) -> float:
return 1


ANY_POINTER: DType = Pointer(...)

Expand Down Expand Up @@ -328,6 +355,9 @@ def is_value_compatible(self, arg):
def typehint(self) -> type[UnionType]:
return self.wrapped.typehint | None

def max_size(self) -> float:
return self.wrapped.max_size()


class Tuple(DType):
args: tuple[DType, ...]
Expand Down Expand Up @@ -368,6 +398,9 @@ def is_value_compatible(self, arg):
def typehint(self) -> type[tuple]:
return tuple[tuple(arg.typehint for arg in self.args)] # type: ignore[misc]

def max_size(self) -> float:
return sum(arg.max_size() for arg in self.args) + 1


class Json(DType):
def __new__(cls) -> Json:
Expand All @@ -389,6 +422,9 @@ def is_value_compatible(self, arg):
def typehint(self) -> type[js.Json]:
return js.Json

def max_size(self) -> float:
return math.inf


JSON: DType = Json()

Expand Down Expand Up @@ -417,6 +453,9 @@ def is_value_compatible(self, arg):
def typehint(self) -> type[list]:
return list[self.wrapped.typehint] # type: ignore[name-defined]

def max_size(self) -> float:
return math.inf


class _DateTimeNaive(DType):
def __repr__(self):
Expand All @@ -438,6 +477,9 @@ def is_value_compatible(self, arg):
def typehint(self) -> type[datetime_types.DateTimeNaive]:
return datetime_types.DateTimeNaive

def max_size(self) -> float:
return 1


DATE_TIME_NAIVE = _DateTimeNaive()

Expand All @@ -462,6 +504,9 @@ def is_value_compatible(self, arg):
def typehint(self) -> type[datetime_types.DateTimeUtc]:
return datetime_types.DateTimeUtc

def max_size(self) -> float:
return 1


DATE_TIME_UTC = _DateTimeUtc()

Expand All @@ -486,6 +531,9 @@ def is_value_compatible(self, arg):
def typehint(self) -> type[datetime_types.Duration]:
return datetime_types.Duration

def max_size(self) -> float:
return 1


DURATION = _Duration()

Expand Down Expand Up @@ -514,6 +562,9 @@ def is_value_compatible(self, arg):
def typehint(self) -> type[api.PyObjectWrapper]:
return api.PyObjectWrapper[self.wrapped] # type: ignore[name-defined]

def max_size(self) -> float:
return math.inf


ANY_PY_OBJECT_WRAPPER: DType = PyObjectWrapper(object)

Expand Down
Loading

0 comments on commit 25325a6

Please sign in to comment.