Skip to content

Commit

Permalink
Bug fixed: unmatched types. (#430)
Browse files Browse the repository at this point in the history
  • Loading branch information
ATATC committed Oct 3, 2024
1 parent 72a05e0 commit 376f2e8
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
1 change: 0 additions & 1 deletion leads/data_persistence/_computational/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
minimum: type[_np.min] = _np.min
maximum: type[_np.max] = _np.max
diff: type[_np.diff] = _np.diff
sum_up: type[_np.sum] = _np.sum

read_csv: type[_pandas.read_csv] = _pandas.read_csv
DataFrame: type[_pandas.DataFrame] = _pandas.DataFrame
Expand Down
16 changes: 9 additions & 7 deletions leads/data_persistence/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@
from leads.types import Compressor as _Compressor, VisualHeader as _VisualHeader, \
VisualHeaderFull as _VisualHeaderFull, DefaultHeaderFull as _DefaultHeaderFull, DefaultHeader as _DefaultHeader
from ._computational import array as _array, norm as _norm, read_csv as _read_csv, DataFrame as _DataFrame, \
TextFileReader as _TextFileReader, sum_up as _sum_up, diff as _diff
TextFileReader as _TextFileReader, diff as _diff, ndarray as _ndarray

T = _TypeVar("T", bound=_SupportsFloat)


def weighed_sum(elements: _Sequence[T], indexes: _Sequence[float], a: int = 0, b: int | None = None) -> T:
return _sum_up(_array(elements[a: b]) * _diff(_array(indexes[a: b + 1]))) if b else (_sum_up(_array(
elements[a: -1]) * _diff(_array(indexes[a:]))) + elements[-1])
def weighed_sum(elements: tuple[T, ...], indexes: tuple[float, ...], a: int = 0, b: int | None = None) -> T:
e = _array(elements[a: b if b else -1])
w = _diff(_array(indexes[a: b + 1] if b else indexes[a:] + (1,)))
if len(e.shape) > 1:
w = w.reshape((-1, 1))
return Vector(*tuple(s)) if isinstance(s := (e * w).sum(0, keepdims=True)[0], _ndarray) else float(s)


def weighed_mean(elements: _Sequence[T], indexes: _Sequence[float], a: int = 0, b: int | None = None) -> T:
def weighed_mean(elements: tuple[T, ...], indexes: tuple[float, ...], a: int = 0, b: int | None = None) -> T:
return weighed_sum(elements, indexes, a, b) / (indexes[b] - indexes[a]) if b else weighed_sum(
elements, indexes, a, b) / (indexes[-1] - indexes[a] + 1)

Expand All @@ -37,8 +40,7 @@ def mean_compressor(sequence: dict[T, float], target_size: int) -> dict[T, float
return sequence
r = {weighed_mean(elements, indexes, i * chunk_size, (i + 1) * chunk_size): indexes[i * chunk_size] for i in range(
target_size - 1)}
k = weighed_mean(elements, indexes, (target_size - 1) * chunk_size)
r[Vector(*k) if isinstance(k, Vector) else k] = indexes[(target_size - 1) * chunk_size]
r[weighed_mean(elements, indexes, (target_size - 1) * chunk_size)] = indexes[(target_size - 1) * chunk_size]
return r


Expand Down

0 comments on commit 376f2e8

Please sign in to comment.