From 25325a60859a456cbe8653a515b52675cc798a8f Mon Sep 17 00:00:00 2001 From: Kamil Piechowiak <32928185+KamilPiechowiak@users.noreply.github.com> Date: Wed, 20 Nov 2024 21:04:04 +0100 Subject: [PATCH] avoid storing too much in arrangements (#7592) GitOrigin-RevId: 9b36302c51750955938be4f20d6bb93ec487910f --- CHANGELOG.md | 1 + .../common/test_unused_columns_removal.py | 78 ++++ python/pathway/internals/dtype.py | 51 +++ .../graph_runner/expression_evaluator.py | 165 ++++----- .../internals/graph_runner/path_evaluator.py | 345 ++++++++++++++---- .../internals/graph_runner/path_storage.py | 95 ++++- python/pathway/tests/test_column_paths.py | 167 +++++++++ src/engine/dataflow.rs | 28 +- 8 files changed, 729 insertions(+), 201 deletions(-) create mode 100644 integration_tests/common/test_unused_columns_removal.py create mode 100644 python/pathway/tests/test_column_paths.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ebccb6fb..1aa90ae8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - `pw.Table.concat`, `pw.Table.with_id`, `pw.Table.with_id_from` no longer perform checks if ids are unique. It improves memory usage. +- table operations that store values (like `pw.Table.join`, `pw.Table.update_cells`) no longer store columns that are not used downstream. ### Fixed - `query_as_of_now` of `pw.stdlib.indexing.DataIndex` and `pw.stdlib.indexing.HybridIndex` now work in constant memory for infinite query stream (no query-related data is kept after query is answered). diff --git a/integration_tests/common/test_unused_columns_removal.py b/integration_tests/common/test_unused_columns_removal.py new file mode 100644 index 00000000..c15f4a44 --- /dev/null +++ b/integration_tests/common/test_unused_columns_removal.py @@ -0,0 +1,78 @@ +import multiprocessing +import resource +import time + +import numpy as np + +import pathway as pw + + +@pw.udf(deterministic=True) +def embedder(x: str) -> np.ndarray: + return np.arange(100_000) + ord(x[0]) + + +@pw.udf(deterministic=True) +def anti_embedder(x: np.ndarray) -> str: + return chr(x[0]) + + +class QuerySchema(pw.Schema): + query: str + + +class QuerySubject(pw.io.python.ConnectorSubject): + def __init__(self, n: int) -> None: + super().__init__() + self.n = n + + def run(self): + for i in range(self.n): + time.sleep(0.001) + self.next(query=f"{chr(i%26 + 97)}") + + +class DocSchema(pw.Schema): + doc: str + + +class DocsSubject(pw.io.python.ConnectorSubject): + def __init__(self, n: int) -> None: + super().__init__() + self.n = n + + def run(self): + for doc in ["a", "b", "c", "d", "x", "z"]: + self.next(doc=doc) + time.sleep(0.001 * self.n + 10) + + +def run(n: int) -> None: + query = pw.io.python.read( + QuerySubject(n), schema=QuerySchema, autocommit_duration_ms=100 + ) + max_depth_2 = query.with_columns(vec=embedder(pw.this.query)) + max_depth_3 = max_depth_2.with_columns(c=anti_embedder(pw.this.vec)) + docs = pw.io.python.read(DocsSubject(n), schema=DocSchema) + res = max_depth_3.join(docs, pw.left.query == pw.right.doc).select( + pw.left.query, pw.left.c, pw.right.doc + ) + + pw.io.null.write(res) + pw.run(monitoring_level=pw.MonitoringLevel.NONE) + assert resource.getrusage(resource.RUSAGE_SELF).ru_maxrss < 1_500_000 + + +def test_big_columns_are_not_stored_if_not_needed(): + n = 100_000 + p = multiprocessing.Process( + target=run, + args=(n,), + ) + p.start() + try: + p.join(timeout=400) + assert p.exitcode == 0 + finally: + p.terminate() + 
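+        # join once more after terminate() so the child process is always
+        # reaped, even when the assertions above fail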
p.join() diff --git a/python/pathway/internals/dtype.py b/python/pathway/internals/dtype.py index 8451a468..48ee7c6a 100644 --- a/python/pathway/internals/dtype.py +++ b/python/pathway/internals/dtype.py @@ -5,6 +5,7 @@ import collections import datetime import functools +import math import typing from abc import ABC, abstractmethod from enum import Enum @@ -55,6 +56,11 @@ def equivalent_to(self, other: DType) -> bool: @abstractmethod def typehint(self) -> typing.Any: ... + @abstractmethod + def max_size(self) -> float: + """The maximal size of a DType measured in the number of Value enums""" + ... + class _SimpleDType(DType): wrapped: type @@ -101,6 +107,12 @@ def to_engine(self) -> api.PathwayType: def typehint(self) -> type: return self.wrapped + def max_size(self) -> float: + if self.wrapped in {str, bytes}: + return math.inf + else: + return 1 + INT: DType = _SimpleDType(int) BOOL: DType = _SimpleDType(bool) @@ -129,6 +141,9 @@ def to_engine(self) -> api.PathwayType: def typehint(self) -> None: return None + def max_size(self) -> float: + return 1 + NONE: DType = _NoneDType() @@ -153,6 +168,9 @@ def is_value_compatible(self, arg): def typehint(self) -> typing.Any: return typing.Any + def max_size(self) -> float: + return math.inf + ANY: DType = _AnyDType() @@ -197,6 +215,9 @@ def typehint(self) -> typing.Any: self.return_type.typehint, ] + def max_size(self) -> float: + return math.inf + class Array(DType): n_dim: int | None @@ -254,6 +275,9 @@ def strip_dimension(self) -> DType: else: return self.wrapped + def max_size(self) -> float: + return math.inf + T = typing.TypeVar("T") @@ -294,6 +318,9 @@ def typehint(self) -> type[api.Pointer]: else: return api.Pointer[tuple(arg.typehint for arg in self.args)] # type: ignore[misc] + def max_size(self) -> float: + return 1 + ANY_POINTER: DType = Pointer(...) @@ -328,6 +355,9 @@ def is_value_compatible(self, arg): def typehint(self) -> type[UnionType]: return self.wrapped.typehint | None + def max_size(self) -> float: + return self.wrapped.max_size() + class Tuple(DType): args: tuple[DType, ...] 
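These max_size values feed the flattening heuristic added to path_evaluator.py later in this patch: types whose engine representation is a single Value enum report 1, unbounded ones (str, bytes, Json, lists, arrays) report math.inf, and composites aggregate their fields. A minimal sketch of the intended semantics, not part of the patch, assuming dt.Tuple accepts its element types as positional arguments:

    import math
    from pathway.internals import dtype as dt

    assert dt.INT.max_size() == 1          # a single Value enum
    assert dt.STR.max_size() == math.inf   # unbounded payload
    # a tuple costs its fields plus one Value for the tuple itself,
    # so one unbounded field makes the whole tuple unbounded
    assert dt.Tuple(dt.INT, dt.BOOL).max_size() == 3
    assert dt.Tuple(dt.INT, dt.STR).max_size() == math.inf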
@@ -368,6 +398,9 @@ def is_value_compatible(self, arg): def typehint(self) -> type[tuple]: return tuple[tuple(arg.typehint for arg in self.args)] # type: ignore[misc] + def max_size(self) -> float: + return sum(arg.max_size() for arg in self.args) + 1 + class Json(DType): def __new__(cls) -> Json: @@ -389,6 +422,9 @@ def is_value_compatible(self, arg): def typehint(self) -> type[js.Json]: return js.Json + def max_size(self) -> float: + return math.inf + JSON: DType = Json() @@ -417,6 +453,9 @@ def is_value_compatible(self, arg): def typehint(self) -> type[list]: return list[self.wrapped.typehint] # type: ignore[name-defined] + def max_size(self) -> float: + return math.inf + class _DateTimeNaive(DType): def __repr__(self): @@ -438,6 +477,9 @@ def is_value_compatible(self, arg): def typehint(self) -> type[datetime_types.DateTimeNaive]: return datetime_types.DateTimeNaive + def max_size(self) -> float: + return 1 + DATE_TIME_NAIVE = _DateTimeNaive() @@ -462,6 +504,9 @@ def is_value_compatible(self, arg): def typehint(self) -> type[datetime_types.DateTimeUtc]: return datetime_types.DateTimeUtc + def max_size(self) -> float: + return 1 + DATE_TIME_UTC = _DateTimeUtc() @@ -486,6 +531,9 @@ def is_value_compatible(self, arg): def typehint(self) -> type[datetime_types.Duration]: return datetime_types.Duration + def max_size(self) -> float: + return 1 + DURATION = _Duration() @@ -514,6 +562,9 @@ def is_value_compatible(self, arg): def typehint(self) -> type[api.PyObjectWrapper]: return api.PyObjectWrapper[self.wrapped] # type: ignore[name-defined] + def max_size(self) -> float: + return math.inf + ANY_PY_OBJECT_WRAPPER: DType = PyObjectWrapper(object) diff --git a/python/pathway/internals/graph_runner/expression_evaluator.py b/python/pathway/internals/graph_runner/expression_evaluator.py index 01ed9ee7..32739736 100644 --- a/python/pathway/internals/graph_runner/expression_evaluator.py +++ b/python/pathway/internals/graph_runner/expression_evaluator.py @@ -84,44 +84,33 @@ def expression_type(self, expression: expr.ColumnExpression) -> dt.DType: def run(self, output_storage: Storage) -> api.Table: raise NotImplementedError() - def _flatten_table_storage( + def maybe_flatten_table( self, output_storage: Storage, - input_storage: Storage, ) -> api.Table: - paths = [ - input_storage.get_path(column) for column in output_storage.get_columns() - ] - - engine_flattened_table = self.scope.flatten_table_storage( - self.state.get_table(input_storage._universe), paths - ) - return engine_flattened_table + input_storage = self.state.get_storage(output_storage._universe) + table = self.state.get_table(output_storage._universe) + if output_storage is input_storage: + return table + assert output_storage.is_flat + paths = [] + for i, column in enumerate(output_storage.get_columns()): + assert output_storage.get_path(column) == ColumnPath((i,)) + paths.append(input_storage.get_path(column)) + return self.scope.flatten_table_storage(table, paths) def flatten_table_storage_if_needed(self, output_storage: Storage): if output_storage.flattened_output is not None: - flattened_storage = self._flatten_table_storage( - output_storage.flattened_output, output_storage + flattened_storage = self.maybe_flatten_table( + output_storage.flattened_output ) self.state.set_table(output_storage.flattened_output, flattened_storage) - def flatten_tables( - self, output_storage: Storage, *input_storages: Storage - ) -> tuple[api.Table, ...]: - if output_storage.flattened_inputs is not None: - assert len(input_storages) == 
len(output_storage.flattened_inputs) - engine_input_tables = [] - for input_storage, flattened_storage in zip( - input_storages, output_storage.flattened_inputs - ): - flattened_engine_storage = self._flatten_table_storage( - flattened_storage, input_storage - ) - engine_input_tables.append(flattened_engine_storage) - else: - engine_input_tables = [ - self.state.get_table(storage._universe) for storage in input_storages - ] + def maybe_flatten_tables(self, output_storage: Storage) -> tuple[api.Table, ...]: + engine_input_tables = [] + for flattened_storage in output_storage.maybe_flattened_inputs.values(): + flattened_engine_storage = self.maybe_flatten_table(flattened_storage) + engine_input_tables.append(flattened_engine_storage) return tuple(engine_input_tables) def _table_properties(self, storage: Storage) -> api.TableProperties: @@ -924,7 +913,11 @@ def run(self, output_storage: Storage) -> api.Table: engine_tables = self.state.get_tables(self.context.universe_dependencies()) properties = self._table_properties(output_storage) return self.scope.intersect_tables( - engine_tables[0], engine_tables[1:], properties + self.maybe_flatten_table( + output_storage.maybe_flattened_inputs["input_storage"] + ), + engine_tables[1:], + properties, ) @@ -934,8 +927,12 @@ class RestrictEvaluator(ExpressionEvaluator, context_type=clmn.RestrictContext): def run(self, output_storage: Storage) -> api.Table: properties = self._table_properties(output_storage) return self.scope.restrict_table( - self.state.get_table(self.context.orig_id_column.universe), - self.state.get_table(self.context.universe), + self.maybe_flatten_table( + output_storage.maybe_flattened_inputs["orig_storage"] + ), + self.maybe_flatten_table( + output_storage.maybe_flattened_inputs["new_storage"] + ), properties, ) @@ -946,7 +943,9 @@ class DifferenceEvaluator(ExpressionEvaluator, context_type=clmn.DifferenceConte def run(self, output_storage: Storage) -> api.Table: properties = self._table_properties(output_storage) return self.scope.subtract_table( - self.state.get_table(self.context.left.universe), + self.maybe_flatten_table( + output_storage.maybe_flattened_inputs["input_storage"] + ), self.state.get_table(self.context.right.universe), properties, ) @@ -970,12 +969,14 @@ class IxEvaluator(ExpressionEvaluator, context_type=clmn.IxContext): context: clmn.IxContext def run(self, output_storage: Storage) -> api.Table: - key_storage = self.state.get_storage(self.context.universe) + key_storage = output_storage.maybe_flattened_inputs["new_storage"] key_column_path = key_storage.get_path(self.context.key_column) properties = self._table_properties(output_storage) return self.scope.ix_table( - self.state.get_table(self.context.orig_id_column.universe), - self.state.get_table(self.context.universe), + self.maybe_flatten_table( + output_storage.maybe_flattened_inputs["orig_storage"] + ), + self.maybe_flatten_table(key_storage), key_column_path, optional=self.context.optional, strict=True, @@ -991,8 +992,12 @@ class PromiseSameUniverseEvaluator( def run(self, output_storage: Storage) -> api.Table: properties = self._table_properties(output_storage) return self.scope.override_table_universe( - self.state.get_table(self.context.orig_id_column.universe), - self.state.get_table(self.context.universe), + self.maybe_flatten_table( + output_storage.maybe_flattened_inputs["orig_storage"] + ), + self.maybe_flatten_table( + output_storage.maybe_flattened_inputs["new_storage"] + ), properties, ) @@ -1003,15 +1008,15 @@ class 
PromiseSameUniverseAsOfNowEvaluator( context: clmn.PromiseSameUniverseContext def run(self, output_storage: Storage) -> api.Table: - orig_universe = self.context.orig_id_column.universe + orig_storage = output_storage.maybe_flattened_inputs["orig_storage"] original_table_with_forgetting = self.scope.forget_immediately( - self.state.get_table(orig_universe), - self._table_properties(self.state.get_storage(orig_universe)), + self.maybe_flatten_table(orig_storage), + self._table_properties(orig_storage), ) - destination_universe = self.context.universe + new_universe_storage = output_storage.maybe_flattened_inputs["new_storage"] destination_universe_table_with_forgetting = self.scope.forget_immediately( - self.state.get_table(destination_universe), - self._table_properties(self.state.get_storage(destination_universe)), + self.maybe_flatten_table(new_universe_storage), + self._table_properties(new_universe_storage), ) properties = self._table_properties(output_storage) @@ -1034,7 +1039,9 @@ def run(self, output_storage: Storage) -> api.Table: key_column_path = key_storage.get_path(self.context.key_column) properties = self._table_properties(output_storage) return self.scope.ix_table( - self.state.get_table(self.context.orig_id_column.universe), + self.maybe_flatten_table( + output_storage.maybe_flattened_inputs["input_storage"] + ), self.state.get_table(key_storage._universe), key_column_path, optional=False, @@ -1046,38 +1053,10 @@ def run(self, output_storage: Storage) -> api.Table: class JoinEvaluator(ExpressionEvaluator, context_type=clmn.JoinContext): context: clmn.JoinContext - def get_join_storage( - self, - universe: univ.Universe, - left_input_storage: Storage, - right_input_storage: Storage, - ) -> Storage: - left_id_storage = Storage( - self.context.left_table._universe, - { - self.context.left_table._id_column: ColumnPath.EMPTY, - }, - ) - right_id_storage = Storage( - self.context.right_table._universe, - { - self.context.right_table._id_column: ColumnPath.EMPTY, - }, - ) - return Storage.merge_storages( - universe, - left_id_storage, - left_input_storage.remove_columns_from_table(self.context.right_table), - right_id_storage, - right_input_storage.restrict_to_table(self.context.right_table), - ) - - def run_join(self, universe: univ.Universe) -> None: - left_input_storage = self.state.get_storage(self.context.left_table._universe) - right_input_storage = self.state.get_storage(self.context.right_table._universe) - output_storage = self.get_join_storage( - universe, left_input_storage, right_input_storage - ) + def run_join(self, universe: univ.Universe, output_storage: Storage) -> None: + left_input_storage = output_storage.maybe_flattened_inputs["left_storage"] + right_input_storage = output_storage.maybe_flattened_inputs["right_storage"] + join_storage = output_storage.maybe_flattened_inputs["join_storage"] left_paths = [ left_input_storage.get_path(column) for column in self.context.on_left.columns @@ -1086,10 +1065,10 @@ def run_join(self, universe: univ.Universe) -> None: right_input_storage.get_path(column) for column in self.context.on_right.columns ] - properties = self._table_properties(output_storage) + properties = self._table_properties(join_storage) output_engine_table = self.scope.join_tables( - self.state.get_table(left_input_storage._universe), - self.state.get_table(right_input_storage._universe), + self.maybe_flatten_table(left_input_storage), + self.maybe_flatten_table(right_input_storage), left_paths, right_paths, 
last_column_is_instance=self.context.last_column_is_instance, @@ -1098,10 +1077,10 @@ def run_join(self, universe: univ.Universe) -> None: left_ear=self.context.left_ear, right_ear=self.context.right_ear, ) - self.state.set_table(output_storage, output_engine_table) + self.state.set_table(join_storage, output_engine_table) def run(self, output_storage: Storage) -> api.Table: - self.run_join(self.context.universe) + self.run_join(self.context.universe, output_storage) rowwise_evaluator = RowwiseEvaluator( clmn.RowwiseContext(self.context.id_column), self.scope, @@ -1258,8 +1237,7 @@ class UpdateRowsEvaluator(ExpressionEvaluator, context_type=clmn.UpdateRowsConte context: clmn.UpdateRowsContext def run(self, output_storage: Storage) -> api.Table: - input_storages = self.state.get_storages(self.context.universe_dependencies()) - engine_input_tables = self.flatten_tables(output_storage, *input_storages) + engine_input_tables = self.maybe_flatten_tables(output_storage) [input_table, update_input_table] = engine_input_tables properties = self._table_properties(output_storage) return self.scope.update_rows_table(input_table, update_input_table, properties) @@ -1269,28 +1247,26 @@ class UpdateCellsEvaluator(ExpressionEvaluator, context_type=clmn.UpdateCellsCon context: clmn.UpdateCellsContext def run(self, output_storage: Storage) -> api.Table: - input_storage = self.state.get_storage(self.context.left.universe) - update_input_storage = self.state.get_storage(self.context.right.universe) + left_storage = output_storage.maybe_flattened_inputs["left_storage"] + right_storage = output_storage.maybe_flattened_inputs["right_storage"] paths = [] update_paths = [] for column in output_storage.get_columns(): - if column in input_storage.get_columns(): + if column in left_storage.get_columns(): continue assert isinstance(column, clmn.ColumnWithReference) if column.expression.name in self.context.updates: - paths.append(input_storage.get_path(column.expression._column)) + paths.append(left_storage.get_path(column.expression._column)) update_paths.append( - update_input_storage.get_path( - self.context.updates[column.expression.name] - ) + right_storage.get_path(self.context.updates[column.expression.name]) ) properties = self._table_properties(output_storage) return self.scope.update_cells_table( - self.state.get_table(input_storage._universe), - self.state.get_table(update_input_storage._universe), + self.maybe_flatten_table(left_storage), + self.maybe_flatten_table(right_storage), paths, update_paths, properties, @@ -1301,8 +1277,7 @@ class ConcatUnsafeEvaluator(ExpressionEvaluator, context_type=clmn.ConcatUnsafeC context: clmn.ConcatUnsafeContext def run(self, output_storage: Storage) -> api.Table: - input_storages = self.state.get_storages(self.context.universe_dependencies()) - engine_input_tables = self.flatten_tables(output_storage, *input_storages) + engine_input_tables = self.maybe_flatten_tables(output_storage) properties = self._table_properties(output_storage) engine_table = self.scope.concat_tables(engine_input_tables, properties) return engine_table diff --git a/python/pathway/internals/graph_runner/path_evaluator.py b/python/pathway/internals/graph_runner/path_evaluator.py index 64207439..c8f88e85 100644 --- a/python/pathway/internals/graph_runner/path_evaluator.py +++ b/python/pathway/internals/graph_runner/path_evaluator.py @@ -3,6 +3,7 @@ from __future__ import annotations import itertools +import math from abc import ABC, abstractmethod from collections.abc import Iterable from typing 
import ClassVar
@@ -12,6 +13,7 @@ import pathway.internals.operator as op
 from pathway.internals.column_path import ColumnPath
 from pathway.internals.graph_runner.path_storage import Storage
+from pathway.internals.helpers import StableSet
 from pathway.internals.universe import Universe
@@ -33,7 +35,34 @@ def compute_paths(
         raise ValueError(
             f"Operator {operator} in update_storage() but it shouldn't produce tables."
         )
-    return evaluator.compute(output_columns, input_storages)
+    output_columns = list(output_columns)
+    return evaluator.compute(output_columns, input_storages).restrict_to(
+        output_columns, require_all=True
+    )
+
+
+def maybe_flatten_input_storage(
+    storage: Storage, columns: Iterable[clmn.Column]
+) -> Storage:
+    columns = StableSet(columns)
+    paths = set()
+    for column in columns:
+        paths.add(storage.get_path(column))
+
+    removable_size = 0.0
+    for path, column in storage.get_all_columns():
+        # Using get_all_columns (instead of get_columns) is needed to keep track of
+        # columns that are not in use but are still present in the tuple. Only when
+        # we are aware of all fields in a tuple can we make an informed decision
+        # about whether to flatten it (and drop columns) or not.
+        if path not in paths:
+            removable_size += column.dtype.max_size()
+    if math.isinf(removable_size) or removable_size > len(columns):
+        # Flatten the storage if we can remove a potentially large column, or if
+        # removable_size exceeds the number of columns we keep (which is the cost
+        # of flattening).
+        return Storage.flat(storage._universe, columns)
+    else:
+        return storage


 class PathEvaluator(ABC):
@@ -100,7 +129,7 @@ class AddNewColumnsPathEvaluator(
 ):
     def compute_if_all_new_are_references(
         self,
-        output_columns: Iterable[clmn.Column],
+        output_columns: list[clmn.Column],
         input_storage: Storage,
     ) -> Storage | None:
         paths = {}
@@ -115,27 +144,27 @@ def compute_if_all_new_are_references(
                 paths[column] = input_storage.get_path(column.expression._column)
             else:
                 return None
-        return Storage(self.context.universe, paths, has_only_references=True)
+        return input_storage.with_updated_paths(paths).with_only_references()

     def compute_if_old_are_not_required(
         self,
-        output_columns: Iterable[clmn.Column],
+        output_columns: list[clmn.Column],
         input_storage: Storage,
     ) -> Storage | None:
-        paths = {}
-        for i, column in enumerate(output_columns):
+        for column in output_columns:
             if input_storage.has_column(column):
                 return None
-            else:
-                paths[column] = ColumnPath((i,))
-        return Storage(self.context.universe, paths, has_only_new_columns=True)
+        return Storage.flat(
+            self.context.universe, output_columns
+        ).with_only_new_columns()

     def compute(
         self,
         output_columns: Iterable[clmn.Column],
         input_storages: dict[Universe, Storage],
     ) -> Storage:
-        input_storage = input_storages.get(self.context.universe)
+        input_storage = input_storages[self.context.universe]
+        output_columns = list(output_columns)
         if input_storage is not None and isinstance(self.context, clmn.RowwiseContext):
             maybe_storage = self.compute_if_all_new_are_references(
                 output_columns, input_storage
             )
@@ -146,14 +175,14 @@ def compute(
             )
             if maybe_storage is not None:
                 return maybe_storage
-        paths = {}
+        paths: dict[clmn.Column, ColumnPath] = {}
         counter = itertools.count(start=1)
         for column in output_columns:
-            if input_storage is not None and input_storage.has_column(column):
-                paths[column] = (0,) + input_storage.get_path(column)
-            else:
+            if not input_storage.has_column(column):
                 paths[column] = ColumnPath((next(counter),))
-        return
Storage(self.context.universe, paths) + return input_storage.with_prefix((0,)).with_updated_paths( + paths, universe=self.context.universe + ) class SortingPathEvaluator(PathEvaluator, context_types=[clmn.SortingContext]): @@ -165,15 +194,12 @@ def compute( input_storages: dict[Universe, Storage], ) -> Storage: input_storage = input_storages[self.context.universe] - paths = {} - for column in output_columns: - if column == self.context.prev_column: - paths[column] = ColumnPath((1,)) - elif column == self.context.next_column: - paths[column] = ColumnPath((2,)) - else: - paths[column] = (0,) + input_storage.get_path(column) - return Storage(self.context.universe, paths) + return Storage.merge_storages( + self.context.universe, + input_storage, + Storage.one_column_storage(self.context.prev_column), + Storage.one_column_storage(self.context.next_column), + ) class NoNewColumnsMultipleSourcesPathEvaluator( @@ -209,28 +235,36 @@ def compute( else: updates = (context.updates,) - if not keep_structure: - names = [] - source_columns: list[list[clmn.Column]] = [[]] - for column in output_columns_list: - assert isinstance(column, clmn.ColumnWithExpression) - assert isinstance(column.expression, expr.ColumnReference) - names.append(column.expression.name) - source_columns[0].append(column.dereference()) - for columns in updates: - source_columns.append([columns[name] for name in names]) - - flattened_inputs = [] - assert len(list(self.context.universe_dependencies())) == len( - source_columns - ) + names = [] + source_columns: list[list[clmn.Column]] = [[]] + for column in output_columns_list: + assert isinstance(column, clmn.ColumnWithExpression) + assert isinstance(column.expression, expr.ColumnReference) + names.append(column.expression.name) + source_columns[0].append(column.dereference()) + for columns in updates: + source_columns.append([columns[name] for name in names]) + + if keep_structure and isinstance(context, clmn.UpdateRowsContext): for universe, cols in zip( - self.context.universe_dependencies(), source_columns + self.context.universe_dependencies(), source_columns, strict=True ): + input_storage = input_storages[universe] + maybe_flat_storage = maybe_flatten_input_storage(input_storage, cols) + if maybe_flat_storage is not input_storage: + keep_structure = False + break + + flattened_inputs = {} + for i, (universe, cols) in enumerate( + zip(self.context.universe_dependencies(), source_columns, strict=True) + ): + if keep_structure: + flattened_storage = input_storages[universe] + else: flattened_storage = Storage.flat(universe, cols) - flattened_inputs.append(flattened_storage) - else: - flattened_inputs = None + + flattened_inputs[f"{i}"] = flattened_storage evaluator: PathEvaluator if keep_structure: @@ -243,15 +277,12 @@ def compute( {source_universe: input_storages[source_universe]}, ) - return storage.with_flattened_inputs(flattened_inputs) + return storage.with_maybe_flattened_inputs(flattened_inputs) NoNewColumnsContext = ( clmn.FilterContext | clmn.ReindexContext - | clmn.IntersectContext - | clmn.DifferenceContext - | clmn.HavingContext | clmn.ForgetContext | clmn.ForgetImmediatelyContext | clmn.FilterOutForgettingContext @@ -269,9 +300,6 @@ class NoNewColumnsPathEvaluator( context_types=[ clmn.FilterContext, clmn.ReindexContext, - clmn.IntersectContext, - clmn.DifferenceContext, - clmn.HavingContext, clmn.ForgetContext, clmn.ForgetImmediatelyContext, clmn.FilterOutForgettingContext, @@ -297,54 +325,190 @@ def compute( ): source_column = column.expression._column 
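                # this column is a reference created by the current context,
                # so it reuses the path of its source column in the input storage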
paths[column] = input_storage.get_path(source_column) + return input_storage.with_updated_paths(paths, universe=self.context.universe) + + +class NoNewColumnsWithDataStoredPathEvaluator( + PathEvaluator, + context_types=[ + clmn.IntersectContext, + clmn.DifferenceContext, + clmn.HavingContext, + ], +): + context: clmn.IntersectContext | clmn.DifferenceContext | clmn.HavingContext + + def compute( + self, + output_columns: Iterable[clmn.Column], + input_storages: dict[Universe, Storage], + ) -> Storage: + input_storage = input_storages[self.context.input_universe()] + required_columns: StableSet[clmn.Column] = StableSet() + for column in output_columns: + if ( + isinstance(column, clmn.ColumnWithReference) + and column.context == self.context + ): + required_columns.add(column.expression._column) else: # column from the same universe, but not the current table - paths[column] = input_storage.get_path(column) - return Storage(self.context.universe, paths) + required_columns.add(column) + input_storage = maybe_flatten_input_storage(input_storage, required_columns) + paths: dict[clmn.Column, ColumnPath] = {} + for column in output_columns: + if ( + isinstance(column, clmn.ColumnWithReference) + and column.context == self.context + ): + source_column = column.expression._column + paths[column] = input_storage.get_path(source_column) + return input_storage.with_updated_paths( + paths, universe=self.context.universe + ).with_maybe_flattened_inputs({"input_storage": input_storage}) class UpdateCellsPathEvaluator(PathEvaluator, context_types=[clmn.UpdateCellsContext]): context: clmn.UpdateCellsContext + def maybe_flatten_input_storages( + self, + output_columns: Iterable[clmn.Column], + input_storages: dict[Universe, Storage], + ) -> tuple[Storage, Storage]: + input_storage = input_storages[self.context.universe] + left_columns: StableSet[clmn.Column] = StableSet() + right_columns: StableSet[clmn.Column] = StableSet() + for column in output_columns: + if input_storage.has_column(column): + left_columns.add(column) + else: + assert isinstance(column, clmn.ColumnWithReference) + left_columns.add(column.expression._column) + if column.expression.name in self.context.updates: + right_columns.add(self.context.updates[column.expression.name]) + left_storage = maybe_flatten_input_storage(input_storage, left_columns) + right_storage = maybe_flatten_input_storage( + input_storages[self.context.right.universe], right_columns + ) + + return (left_storage, right_storage) + def compute( self, output_columns: Iterable[clmn.Column], input_storages: dict[Universe, Storage], ) -> Storage: - input_storage = input_storages[self.context.universe] + left_storage, right_storage = self.maybe_flatten_input_storages( + output_columns, input_storages + ) + prefixed_left_storage = left_storage.with_prefix((0,)) counter = itertools.count(start=1) - paths = {} + paths: dict[clmn.Column, ColumnPath] = {} for column in output_columns: - if column in input_storage.get_columns(): - paths[column] = (0,) + input_storage.get_path(column) - elif ( + if ( isinstance(column, clmn.ColumnWithReference) - and column.expression.name not in self.context.updates + and column.context == self.context ): - source_column = column.expression._column - paths[column] = (0,) + input_storage.get_path(source_column) - else: - paths[column] = ColumnPath((next(counter),)) - return Storage(self.context.universe, paths) + if column.expression.name in self.context.updates: + paths[column] = ColumnPath((next(counter),)) + else: + source_column = 
column.expression._column + paths[column] = prefixed_left_storage.get_path(source_column) + return prefixed_left_storage.with_updated_paths( + paths + ).with_maybe_flattened_inputs( + {"left_storage": left_storage, "right_storage": right_storage} + ) class JoinPathEvaluator(PathEvaluator, context_types=[clmn.JoinContext]): context: clmn.JoinContext + def maybe_flatten_input_storages( + self, + output_columns: Iterable[clmn.Column], + input_storages: dict[Universe, Storage], + ) -> tuple[Storage, Storage]: + exclusive_right_columns = list( + itertools.chain( + self.context.right_table._columns.values(), + self.context.on_right.columns, + ) + ) + left_input_storage = input_storages[self.context.left_table._universe].remove( + exclusive_right_columns + ) + right_input_storage = input_storages[ + self.context.right_table._universe + ].restrict_to(exclusive_right_columns) + + required_input_columns: list[clmn.Column] = [] + for column in itertools.chain( + output_columns, self.context.column_dependencies() + ): + if ( + isinstance(column, clmn.ColumnWithExpression) + and column.context == self.context + ): + required_input_columns.extend(column.column_dependencies()) + else: + required_input_columns.append(column) + + left_columns: StableSet[clmn.Column] = StableSet() + right_columns: StableSet[clmn.Column] = StableSet() + for column in required_input_columns: + if left_input_storage.has_column(column): + left_columns.add(column) + else: + assert right_input_storage.has_column(column) + right_columns.add(column) + left_input_storage = maybe_flatten_input_storage( + input_storages[self.context.left_table._universe], left_columns + ) + right_input_storage = maybe_flatten_input_storage( + input_storages[self.context.right_table._universe], right_columns + ) + return (left_input_storage, right_input_storage) + + def merge_storages(self, left_storage: Storage, right_storage: Storage) -> Storage: + left_id_storage = Storage.one_column_storage(self.context.left_table._id_column) + right_id_storage = Storage.one_column_storage( + self.context.right_table._id_column + ) + right_columns = list(self.context.right_table._columns.values()) + return Storage.merge_storages( + self.context.universe, + left_id_storage, + left_storage.remove(right_columns), + right_id_storage, + right_storage.restrict_to(right_columns), + ) + def compute( self, output_columns: Iterable[clmn.Column], input_storages: dict[Universe, Storage], ) -> Storage: + output_columns = list(output_columns) + left_input_storage, right_input_storage = self.maybe_flatten_input_storages( + output_columns, input_storages + ) + join_storage = self.merge_storages(left_input_storage, right_input_storage) if self.context.assign_id: - left_universe = self.context.left_table._universe - input_storage = input_storages[left_universe] - return AddNewColumnsPathEvaluator(self.context).compute( - output_columns, {left_universe: input_storage} + output_storage = AddNewColumnsPathEvaluator(self.context).compute( + output_columns, {self.context.universe: left_input_storage} ) else: - return FlatStoragePathEvaluator(self.context).compute( - output_columns, input_storages + output_storage = FlatStoragePathEvaluator(self.context).compute( + output_columns, {} ) + return output_storage.with_maybe_flattened_inputs( + { + "left_storage": left_input_storage, + "right_storage": right_input_storage, + "join_storage": join_storage, + } + ) class FlattenPathEvaluator(PathEvaluator, context_types=[clmn.FlattenContext]): @@ -355,7 +519,9 @@ def compute( output_columns: 
Iterable[clmn.Column],
         input_storages: dict[Universe, Storage],
     ) -> Storage:
-        input_storage = input_storages[self.context.orig_universe]
+        prefixed_input_storage = input_storages[self.context.orig_universe].with_prefix(
+            (0,)
+        )
         paths = {}
         for column in output_columns:
             if column == self.context.flatten_result_column:
@@ -363,8 +529,10 @@ def compute(
             else:
                 assert isinstance(column, clmn.ColumnWithReference)
                 original_column = column.expression._column
-                paths[column] = (0,) + input_storage.get_path(original_column)
-        return Storage(self.context.universe, paths)
+                paths[column] = prefixed_input_storage.get_path(original_column)
+        return prefixed_input_storage.with_updated_paths(
+            paths, universe=self.context.universe
+        )


 class PromiseSameUniversePathEvaluator(
@@ -388,15 +556,36 @@ def compute(
         output_columns: Iterable[clmn.Column],
         input_storages: dict[Universe, Storage],
     ) -> Storage:
-        paths: dict[clmn.Column, ColumnPath] = {}
-        orig_storage = input_storages[self.context.orig_id_column.universe]
-        new_storage = input_storages[self.context.universe]
+        orig_storage_columns: StableSet[clmn.Column] = StableSet()
+        newly_created_columns: StableSet[clmn.ColumnWithReference] = StableSet()
+        new_storage_columns: StableSet[clmn.Column] = StableSet()
         for column in output_columns:
             if (
                 isinstance(column, clmn.ColumnWithReference)
                 and column.context == self.context
             ):
-                paths[column] = (1,) + orig_storage.get_path(column.expression._column)
+                newly_created_columns.add(column)
+                orig_storage_columns.add(column.expression._column)
             else:
-                paths[column] = (0,) + new_storage.get_path(column)
-        return Storage(self.context.universe, paths)
+                new_storage_columns.add(column)
+
+        if isinstance(self.context, clmn.IxContext):
+            new_storage_columns.add(self.context.key_column)
+
+        orig_storage = input_storages[self.context.orig_id_column.universe]
+        orig_storage = maybe_flatten_input_storage(orig_storage, orig_storage_columns)
+        new_storage = input_storages[self.context.universe]
+        new_storage = maybe_flatten_input_storage(new_storage, new_storage_columns)
+
+        paths: dict[clmn.Column, ColumnPath] = {}
+        for column in newly_created_columns:
+            paths[column] = (1,) + orig_storage.get_path(column.expression._column)
+        for column in new_storage_columns:
+            paths[column] = (0,) + new_storage.get_path(column)
+        return (
+            Storage.merge_storages(self.context.universe, new_storage, orig_storage)
+            .with_updated_paths(paths)
+            .with_maybe_flattened_inputs(
+                {"orig_storage": orig_storage, "new_storage": new_storage}
+            )
+        )
diff --git a/python/pathway/internals/graph_runner/path_storage.py b/python/pathway/internals/graph_runner/path_storage.py
index c7e268b0..65a707f7 100644
--- a/python/pathway/internals/graph_runner/path_storage.py
+++ b/python/pathway/internals/graph_runner/path_storage.py
@@ -9,7 +9,6 @@
 from pathway.internals.column import Column, IdColumn
 from pathway.internals.column_path import ColumnPath
-from pathway.internals.table import Table
 from pathway.internals.universe import Universe


@@ -17,14 +16,44 @@ class Storage:
     _universe: Universe
     _column_paths: dict[Column, ColumnPath]
-    flattened_inputs: list[Storage] | None = None
+    """Paths to the columns needed downstream. Not every column present in the tuple appears here."""
+
+    _all_columns: dict[ColumnPath, Column]
+    """Maps each path to the column stored there; every column present in the tuple is included.
+    Used to evaluate the total memory usage of a tuple or its stream properties (append-only).
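+    Invariant: every path in _column_paths also appears here (checked in __post_init__).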
+ """ + + maybe_flattened_inputs: dict[str, Storage] = dataclasses.field(default_factory=dict) flattened_output: Storage | None = None has_only_references: bool = False has_only_new_columns: bool = False + is_flat: bool = False + + def __post_init__(self) -> None: + for path in self._column_paths.values(): + assert path in self._all_columns + + @classmethod + def new( + cls, universe: Universe, column_paths: dict[Column, ColumnPath], **kwargs + ) -> Storage: + return cls( + universe, + column_paths, + {path: column for column, path in column_paths.items()}, + **kwargs, + ) + + @classmethod + def one_column_storage(cls, column: Column) -> Storage: + return cls.new(column.universe, {column: ColumnPath.EMPTY}) def get_columns(self) -> Iterable[Column]: return self._column_paths.keys() + def get_all_columns(self) -> Iterable[tuple[ColumnPath, Column]]: + return self._all_columns.items() + def has_column(self, column: Column) -> bool: return column in self._column_paths or ( isinstance(column, IdColumn) and column.universe == self._universe @@ -42,30 +71,48 @@ def get_path(self, column: Column) -> ColumnPath: def max_depth(self) -> int: return max((len(path) for path in self._column_paths.values()), default=0) - def validate(self) -> None: - assert len(set(self._column_paths.values())) == len( - self._column_paths - ), "Some columns have the same path." - - def with_updated_paths(self, paths: dict[Column, ColumnPath]) -> Storage: - return dataclasses.replace(self, _column_paths=self._column_paths | paths) + def with_updated_paths( + self, paths: dict[Column, ColumnPath], *, universe: Universe | None = None + ) -> Storage: + if universe is None: + universe = self._universe + new_column_paths = self._column_paths | paths + new_all_columns = self._all_columns | { + path: column for column, path in paths.items() + } + return Storage(universe, new_column_paths, new_all_columns) def with_flattened_output(self, storage: Storage) -> Storage: return dataclasses.replace(self, flattened_output=storage) - def with_flattened_inputs(self, storages: list[Storage] | None = None) -> Storage: - return dataclasses.replace(self, flattened_inputs=storages) + def with_maybe_flattened_inputs(self, storages: dict[str, Storage]) -> Storage: + return dataclasses.replace(self, maybe_flattened_inputs=storages) + + def with_only_references(self) -> Storage: + return dataclasses.replace(self, has_only_references=True) + + def with_only_new_columns(self) -> Storage: + return dataclasses.replace(self, has_only_new_columns=True) - def restrict_to_table(self, table: Table) -> Storage: - table_columns = set(table._columns.values()) + def restrict_to( + self, columns: Iterable[Column], require_all: bool = False + ) -> Storage: + table_columns = set(columns) new_column_paths = {} for column, path in self._column_paths.items(): if column in table_columns: new_column_paths[column] = path + if require_all: + assert len(table_columns) == len(new_column_paths) + assert all(column.universe == self._universe for column in new_column_paths) + # restrict_to doesn't change the _all_columns because these columns are still present in a tuple. + # It only affects _column_paths which is used to set which columns are visible + # externally (from methods like get_path). 
+ # restrict_to performs no operation on the underlying data return dataclasses.replace(self, _column_paths=new_column_paths) - def remove_columns_from_table(self, table: Table) -> Storage: - table_columns = set(table._columns.values()) + def remove(self, columns: Iterable[Column]) -> Storage: + table_columns = set(columns) new_column_paths = {} for column, path in self._column_paths.items(): if column not in table_columns: @@ -75,11 +122,14 @@ def remove_columns_from_table(self, table: Table) -> Storage: @classmethod def merge_storages(cls, universe: Universe, *storages: Storage) -> Storage: column_paths = {} + all_columns = {} for i, storage in enumerate(storages): for column, path in storage._column_paths.items(): assert path is not None column_paths[column] = (i,) + path - return cls(universe, column_paths) + for path, column in storage._all_columns.items(): + all_columns[(i,) + path] = column + return cls(universe, column_paths, all_columns) @classmethod def flat( @@ -88,4 +138,15 @@ def flat( paths = {} for i, column in enumerate(columns): paths[column] = ColumnPath((i + shift,)) - return cls(universe, paths) + return cls.new(universe, paths, is_flat=True) + + def with_prefix(self, prefix: tuple[int, ...]) -> Storage: + new_paths = {} + for column, path in self._column_paths.items(): + new_paths[column] = prefix + path + all_columns = {} + for path, column in self._all_columns.items(): + all_columns[prefix + path] = column + return Storage( + self._universe, _column_paths=new_paths, _all_columns=all_columns + ) diff --git a/python/pathway/tests/test_column_paths.py b/python/pathway/tests/test_column_paths.py new file mode 100644 index 00000000..76658428 --- /dev/null +++ b/python/pathway/tests/test_column_paths.py @@ -0,0 +1,167 @@ +from pathway.internals import column, dtype as dt +from pathway.internals.column_path import ColumnPath +from pathway.internals.column_properties import ColumnProperties +from pathway.internals.graph_runner.path_evaluator import maybe_flatten_input_storage +from pathway.internals.graph_runner.path_storage import Storage + + +def assert_storages_have_same_paths(a: Storage, b: Storage) -> None: + assert set(a.get_columns()) == set(b.get_columns()) + for col in a.get_columns(): + assert a.get_path(col) == b.get_path(col) + + +def test_removal_of_potentially_big_columns_from_storage_1(): + universe = column.Universe() + str_column: column.Column = column.MaterializedColumn( + universe=universe, properties=ColumnProperties(dtype=dt.STR) + ) + int_column: column.Column = column.MaterializedColumn( + universe=universe, properties=ColumnProperties(dtype=dt.INT) + ) + input_storage = Storage.new( + universe, + { + str_column: ColumnPath((0,)), + int_column: ColumnPath((1,)), + }, + ) + maybe_flat_storage = maybe_flatten_input_storage(input_storage, [int_column]) + expected_storage = Storage.new( + universe, {int_column: ColumnPath((0,))}, is_flat=True + ) + assert_storages_have_same_paths(maybe_flat_storage, expected_storage) + + +def test_removal_of_potentially_big_columns_from_storage_2(): + universe = column.Universe() + int_column_1: column.Column = column.MaterializedColumn( + universe=universe, properties=ColumnProperties(dtype=dt.INT) + ) + int_column_2: column.Column = column.MaterializedColumn( + universe=universe, properties=ColumnProperties(dtype=dt.INT) + ) + int_column_3: column.Column = column.MaterializedColumn( + universe=universe, properties=ColumnProperties(dtype=dt.INT) + ) + input_storage = Storage.new( + universe, + { + int_column_1: 
ColumnPath((0,)),
+            int_column_2: ColumnPath((1,)),
+            int_column_3: ColumnPath((2,)),
+        },
+    )
+    maybe_flat_storage = maybe_flatten_input_storage(input_storage, [int_column_1])
+    expected_storage = Storage.new(
+        universe, {int_column_1: ColumnPath((0,))}, is_flat=True
+    )
+    assert_storages_have_same_paths(maybe_flat_storage, expected_storage)
+
+
+def test_removal_of_potentially_big_columns_from_storage_3():
+    universe = column.Universe()
+    int_column_1: column.Column = column.MaterializedColumn(
+        universe=universe, properties=ColumnProperties(dtype=dt.INT)
+    )
+    int_column_2: column.Column = column.MaterializedColumn(
+        universe=universe, properties=ColumnProperties(dtype=dt.INT)
+    )
+    input_storage = Storage.new(
+        universe,
+        {
+            int_column_1: ColumnPath((0,)),
+            int_column_2: ColumnPath((1,)),
+        },
+    )
+    maybe_flat_storage = maybe_flatten_input_storage(input_storage, [int_column_1])
+    expected_storage = Storage.new(
+        universe,
+        {
+            int_column_1: ColumnPath((0,)),
+            int_column_2: ColumnPath((1,)),
+        },
+    )
+    assert_storages_have_same_paths(maybe_flat_storage, expected_storage)
+
+
+def test_removal_of_potentially_big_columns_from_storage_4():
+    universe = column.Universe()
+    str_column: column.Column = column.MaterializedColumn(
+        universe=universe, properties=ColumnProperties(dtype=dt.STR)
+    )
+    int_column: column.Column = column.MaterializedColumn(
+        universe=universe, properties=ColumnProperties(dtype=dt.INT)
+    )
+    input_storage = Storage.new(
+        universe,
+        {
+            str_column: ColumnPath((0,)),
+            int_column: ColumnPath((1,)),
+        },
+    ).restrict_to([int_column])
+    # make sure it also works on the restricted version:
+    # str_column is still in the tuple structure
+    maybe_flat_storage = maybe_flatten_input_storage(input_storage, [int_column])
+    expected_storage = Storage.new(
+        universe, {int_column: ColumnPath((0,))}, is_flat=True
+    )
+    assert_storages_have_same_paths(maybe_flat_storage, expected_storage)
+
+
+def test_removal_of_potentially_big_columns_from_storage_5():
+    universe = column.Universe()
+    str_column_1: column.Column = column.MaterializedColumn(
+        universe=universe, properties=ColumnProperties(dtype=dt.STR)
+    )
+    str_column_2: column.Column = column.MaterializedColumn(
+        universe=universe, properties=ColumnProperties(dtype=dt.STR)
+    )
+    int_column: column.Column = column.MaterializedColumn(
+        universe=universe, properties=ColumnProperties(dtype=dt.INT)
+    )
+    input_storage = Storage.new(
+        universe,
+        {
+            str_column_1: ColumnPath((0,)),
+            str_column_2: ColumnPath((1,)),
+            int_column: ColumnPath((2,)),
+        },
+    )
+    maybe_flat_storage = maybe_flatten_input_storage(
+        input_storage, [int_column, str_column_2]
+    )
+    expected_storage = Storage.new(
+        universe,
+        {int_column: ColumnPath((0,)), str_column_2: ColumnPath((1,))},
+        is_flat=True,
+    )
+    assert_storages_have_same_paths(maybe_flat_storage, expected_storage)
+
+
+def test_removal_of_potentially_big_columns_from_storage_6():
+    universe = column.Universe()
+    str_column_1: column.Column = column.MaterializedColumn(
+        universe=universe, properties=ColumnProperties(dtype=dt.STR)
+    )
+    str_column_2: column.Column = column.MaterializedColumn(
+        universe=universe, properties=ColumnProperties(dtype=dt.STR)
+    )
+    int_column: column.Column = column.MaterializedColumn(
+        universe=universe, properties=ColumnProperties(dtype=dt.INT)
+    )
+    input_storage = Storage.new(
+        universe,
+        {
+            str_column_1: ColumnPath((0,)),
+            str_column_2: ColumnPath((0,)),
+            int_column: ColumnPath((2,)),
+        },
+    )
+    # two columns with the same path; one can be a
reference to the other
+    # expect no flattening, as all fields are kept
+    maybe_flat_storage = maybe_flatten_input_storage(
+        input_storage, [int_column, str_column_2]
+    )
+    expected_storage = input_storage
+    assert_storages_have_same_paths(maybe_flat_storage, expected_storage)
diff --git a/src/engine/dataflow.rs b/src/engine/dataflow.rs
index f3876cda..2e22ec95 100644
--- a/src/engine/dataflow.rs
+++ b/src/engine/dataflow.rs
@@ -2169,7 +2169,7 @@ impl DataflowGraphInner {
         });
         let error_reporter = self.error_reporter.clone();

-        let values_to_keys_arranged: ArrangedByKey = match ix_key_policy {
+        let values_to_keys = match ix_key_policy {
             IxKeyPolicy::FailMissing => key_table_extracted.map_named(
                 "ix_table unwrapping pointers",
                 move |(key, (values, value))| {
@@ -2185,16 +2185,22 @@ impl DataflowGraphInner {
                     Some((pointer, (key, values)))
                 }
             }),
-        }
-        .arrange();
-        let new_table = match ix_key_policy {
-            IxKeyPolicy::SkipMissing => values_to_keys_arranged.join_core(
+        };
+        let new_table = if ix_key_policy == IxKeyPolicy::SkipMissing {
+            let values_to_keys_arranged: ArrangedByKey = values_to_keys
+                .map_named(
+                    "ix_skip_missing_arrange_keys",
+                    |(source_key, (result_key, _result_value))| (source_key, result_key),
+                )
+                .arrange();
+            values_to_keys_arranged.join_core(
                 to_ix_table.values_arranged(),
-                |_source_key, (result_key, _result_row), to_ix_row| {
-                    once((*result_key, to_ix_row.clone()))
-                },
-            ),
-            _ => values_to_keys_arranged.join_core(
+                |_source_key, result_key, to_ix_row| once((*result_key, to_ix_row.clone())),
+            )
+        } else {
+            let values_to_keys_arranged: ArrangedByKey =
+                values_to_keys.arrange();
+            values_to_keys_arranged.join_core(
                 to_ix_table.values_arranged(),
                 |_source_key, (result_key, result_row), to_ix_row| {
                     once((
@@ -2202,7 +2208,7 @@ impl DataflowGraphInner {
                         *result_key,
                         Value::from([result_row.clone(), to_ix_row.clone()].as_slice()),
                     ))
                 },
-            ),
+            )
         };
         let new_table = match ix_key_policy {
             IxKeyPolicy::ForwardNone => {
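Taken together, these changes mean that operators which materialize their inputs (joins, cell/row updates, ix) keep only the columns that are actually consumed downstream. A user-level sketch of the effect, with illustrative data and a hypothetical embed udf standing in for a large embedding (only query and doc survive the join, so vec no longer needs to be stored in the join arrangement):

    import pathway as pw

    @pw.udf
    def embed(q: str) -> str:
        # stand-in for a large embedding
        return q * 1000

    queries = pw.debug.table_from_markdown(
        """
        query
        a
        b
        """
    )
    docs = pw.debug.table_from_markdown(
        """
        doc
        a
        c
        """
    )
    # vec is unused downstream of the join, so the planner can now drop it
    # from the stored join inputs instead of arranging it
    with_vec = queries.with_columns(vec=embed(pw.this.query))
    res = with_vec.join(docs, pw.left.query == pw.right.doc).select(
        pw.left.query, pw.right.doc
    )
    pw.debug.compute_and_print(res)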