Skip to content

Commit

Permalink
feat: add better error handling, improved iter and call
Browse files Browse the repository at this point in the history
  • Loading branch information
flavioschneider committed Jun 7, 2023
1 parent 466e474 commit c9c8184
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 77 deletions.
60 changes: 20 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ for item in pipe:

_Iterate over the pipeline one item at a time_
```py
next(pipe)
it = iter(pipe)
next(it)
next(it)
```

_Use a meta pipeline (i.e. functional only)_
Expand All @@ -40,18 +42,13 @@ pipe([1, 2, 3, 4, 5]).list() == [2, 3, 4, 5, 6]
from pipd import Pipe

pipe = Pipe(1, 2, 3).map(lambda x: x * 2)

print(next(pipe))
print(next(pipe))
print(next(pipe))
print(list(pipe))
```

<details> <summary> Show output </summary>

```py
2
4
6
[2, 4, 6]
```

</details>
Expand All @@ -61,18 +58,13 @@ _Map items to parallel workers_
from pipd import Pipe

pipe = Pipe(1, 2, 3).map(lambda x: x * 2, num_workers=2) # parallel map (note: order is not guaranteed)

print(next(pipe))
print(next(pipe))
print(next(pipe))
print(list(pipe))
```

<details> <summary> Show output </summary>

```py
4
2
6
[2, 4, 6]
```

</details>
Expand All @@ -83,16 +75,13 @@ print(next(pipe))
from pipd import Pipe

pipe = Pipe(1, 2, 3).filter(lambda x: x != 1)

print(next(pipe))
print(next(pipe))
print(list(pipe))
```

<details> <summary> Show output </summary>

```py
2
3
[2, 3]
```

</details>
Expand All @@ -104,9 +93,9 @@ Applies a function on each item in the pipeline without changing the item, usefu
from pipd import Pipe

pipe = Pipe(1, 2, 3).side(lambda x: print('side', x))

print(next(pipe))
print(next(pipe))
it = iter(pipe)
print(next(it))
print(next(it))
```

<details> <summary> Show output </summary>
Expand All @@ -125,10 +114,10 @@ side 2
from pipd import Pipe

pipe = Pipe(1, 2, 3, 4, 5).batch(2)

print(next(pipe))
print(next(pipe))
print(next(pipe))
it = iter(pipe)
print(next(it))
print(next(it))
print(next(it))
```

<details> <summary> Show output </summary>
Expand All @@ -147,22 +136,13 @@ print(next(pipe))
from pipd import Pipe

pipe = Pipe([1, 2], [3], [4, 5]).unbatch()

print(next(pipe))
print(next(pipe))
print(next(pipe))
print(next(pipe))
print(next(pipe))
print(list(pipe))
```

<details> <summary> Show output </summary>

```py
1
2
3
4
5
[1, 2, 3, 4, 5]
```

</details>
Expand All @@ -173,7 +153,7 @@ print(next(pipe))
from pipd import Pipe

pipe = Pipe(range(10)).log()
pipe() # run the pipeline
list(pipe) # runs the pipeline
```

<details> <summary> Show output </summary>
Expand All @@ -198,7 +178,7 @@ pipe() # run the pipeline
from pipd import Pipe

pipe = Pipe(range(10)).limit(5).log()
pipe() # run the pipeline
list(pipe) # runs the pipeline
```

<details> <summary> Show output </summary>
Expand Down
2 changes: 1 addition & 1 deletion pipd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# isort: skip_file
from .pipe import Function, Pipe # noqa F403
from .pipe import Function, Pipe, log_traceback_and_continue # noqa F403
from .functions import * # noqa F403
8 changes: 2 additions & 6 deletions pipd/functions/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,20 @@
)
from typing import Callable, Iterable, Iterator, Optional, TypeVar

from pipd import Function, Pipe
from pipd import Function, Pipe, log_traceback_and_continue

T = TypeVar("T")
U = TypeVar("U")


def log_and_continue(exception: Exception):
print(repr(exception))


class Map(Function):
def __init__(
self,
fn: Callable[[T], U],
num_workers: int = 0,
buffer: Optional[int] = None,
mode: str = "multithread",
handler: Callable = log_and_continue,
handler: Callable = log_traceback_and_continue,
) -> None:
assert mode in ["multithread", "multiprocess"]
self.fn = fn
Expand Down
8 changes: 2 additions & 6 deletions pipd/functions/side.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

from pipd import Function, Pipe
from pipd import Function, Pipe, log_traceback_and_continue

T = TypeVar("T")
U = TypeVar("U")


def log_and_continue(exception: Exception):
print(repr(exception))


class Side(Function):
def __init__(
self,
fn: Callable[[T], U],
num_workers: int = 0,
mode: str = "multithread",
handler: Callable = log_and_continue,
handler: Callable = log_traceback_and_continue,
) -> None:
assert mode in ["multithread", "multiprocess"]
self.fn = fn
Expand Down
39 changes: 24 additions & 15 deletions pipd/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import random
import re
import traceback
from typing import Any, Callable, Dict, Iterator, Optional, Sequence, Type


Expand All @@ -13,6 +14,14 @@ def is_iterable(obj):
return False


def log_traceback_and_continue(exception: Exception):
message = "Exception in Pipe, logging traceback continuing:\n"
message += "".join(
traceback.format_exception(type(exception), exception, exception.__traceback__)
)
print(message)


def camelcase_to_snakecase(name):
return re.sub(r"([a-z])([A-Z])", r"\1_\2", name).lower()

Expand Down Expand Up @@ -59,17 +68,24 @@ def method(*args, **kwargs):
class Pipe(metaclass=PipeMeta):
functions: Dict[str, Type[Function]] = {}

def __init__(self, *iterable, function: Optional[Callable] = identity):
def __init__(
self,
*iterable,
function: Optional[Callable] = identity,
handler: Optional[Callable] = log_traceback_and_continue,
):
if len(iterable) == 1 and is_iterable(iterable[0]):
iterable = iterable[0]
self.iterator = iter(iterable)
self.iterable = iterable
self.function = function
self.handler = handler

def __iter__(self):
yield from self.function(self.iterator)

def __next__(self):
return next(self.function(self.iterator))
for item in self.function(iter(self.iterable)):
try:
yield item
except Exception as e:
self.handler(e)

def close(self):
# Necessary to use `yield from` on a Pipe object
Expand All @@ -82,21 +98,14 @@ def __getattr__(self, name):
def method(*args, **kwargs):
cls = self.__class__
function = self.functions[name](*args, **kwargs)
return cls(self.iterator, function=compose(self.function, function))
return cls(self.iterable, function=compose(self.function, function))

return method

def __call__(self, *iterable):
if len(iterable) == 1 and is_iterable(iterable[0]):
iterable = iterable[0]
self.iterator = iter(iterable)
return self
return self.__class__(*iterable, function=self.function)

@classmethod
def add_fn(cls, fn: Type[Function], name: Optional[str] = None):
fn_name = camelcase_to_snakecase(fn.__name__) if name is None else name
cls.functions[fn_name] = fn

@classmethod
def merge(cls, *pipes, weights: Optional[Sequence[float]] = None) -> Pipe:
return cls(merge_rand(*pipes, weights=weights))
17 changes: 9 additions & 8 deletions pipd/tests/test_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ def __getitem__(self, key):
return key

pipe = Pipe(ClassWithGetItem())
assert next(pipe) == 0
assert next(pipe) == 1
assert next(pipe) == 2
it = iter(pipe)
assert next(it) == 0
assert next(it) == 1
assert next(it) == 2

pipe = Pipe(0, 1, 2, 3)

Expand All @@ -36,8 +37,8 @@ def test_metaclass():
assert list(pipe(0, 1, 2, 3)) == [1, 2, 3, 4]


def test_merge():
pipe0 = Pipe(0, 0, 0, 0, 0, 0)
pipe1 = Pipe(1, 1, 1, 1, 1, 1)
pipe = Pipe.merge(pipe0, pipe1, weights=[3, 1])
print(list(pipe))
# def test_merge():
# pipe0 = Pipe(0, 0, 0, 0, 0, 0)
# pipe1 = Pipe(1, 1, 1, 1, 1, 1)
# pipe = Pipe.merge(pipe0, pipe1, weights=[3, 1])
# print(list(pipe))
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
setup(
name="pipd",
packages=find_packages(exclude=[]),
version="0.1.5",
version="0.1.6",
description="Utility functions for python data pipelines.",
long_description_content_type="text/markdown",
author="ElevenLabs",
Expand Down

0 comments on commit c9c8184

Please sign in to comment.