Data Streaming
Pipeline supports streaming using asynchronous programming. To utilize streaming, a Transformation Adapter receives each wired input as an Async Generator. These wired inputs can be iterated only once to consume the stream.
This page first covers the basics of Generators and Async Generators, then works its way up to Streaming Transformation Adapters.
A Generator is a function which returns an Iterator (an object that can be iterated or looped over). For the purposes of this page, you can think of a Generator and an Iterator as the same thing. In reality there are differences: every Generator is an Iterator, but not vice versa. You can refer here for more details.
We use the keyword yield instead of a return statement to create Generators. yield pauses the function, saving all of its state, and later continues from that point on successive calls. For example:
# A simple generator function
def my_gen():
    for i in range(5):
        yield i
There are 3 ways to use a Generator/Iterator:
- A for loop
for i in my_gen():
    print(i)
- Built-in function next()
gen = my_gen()
print(next(gen))
print(next(gen))
print(next(gen))
print(next(gen))
print(next(gen))
Note: If you try next(gen) again, it'll raise a StopIteration exception because the generator is exhausted; Generators can only be iterated once.
- Iterator special method __next__()
gen = my_gen()
while True:
    try:
        print(gen.__next__())
    except StopIteration:
        break
Note: The built-in function next() simply calls __next__(), and the for loop is basically a shorthand for the above code.
There are more advanced Generator features, but they are not relevant to the topic of this page.
Async Generators are Generators that support asynchronous programming. Asynchronous programming is a form of concurrent programming in which a unit of work runs separately from the main application thread. Python provides the standard library asyncio and the keywords async/await to write asynchronous programs. You can refer here and here for more details.
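As a quick illustration, here is a minimal coroutine sketch (the delay and message are arbitrary) that suspends itself with await so the event loop can run other work in the meantime:
import asyncio

# A minimal coroutine: await suspends it without blocking the thread
async def say_after(delay, message):
    await asyncio.sleep(delay)
    print(message)

asyncio.run(say_after(1, "hello"))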
We use the keyword async in front of a Generator function to create Async Generators. For example:
# A simple async generator function
async def my_asyncgen():
    for i in range(5):
        yield i
There are 2 ways to use an Async Generator/Iterator:
- An async for loop
async for i in my_asyncgen():
    print(i)
- Async Iterator special method __anext__()
gen = my_asyncgen()
print(await gen.__anext__())
print(await gen.__anext__())
print(await gen.__anext__())
print(await gen.__anext__())
print(await gen.__anext__())
Note: If you await gen.__anext__() again, it'll raise a StopAsyncIteration exception because the generator is exhausted; Async Generators can likewise only be iterated once. A built-in anext() only exists from Python 3.10 onwards. The async for loop is a shorthand for the following:
gen = my_asyncgen()
while True:
    try:
        print(await gen.__anext__())
    except StopAsyncIteration:
        break
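On Python 3.10 and later, the built-in anext() can replace the explicit __anext__() calls; a small sketch (using asyncio.run(), covered next, to drive it):
import asyncio

async def main():
    gen = my_asyncgen()
    # anext() is the async counterpart of next() (Python 3.10+)
    print(await anext(gen))
    print(await anext(gen))

asyncio.run(main())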
In Python, all async code needs to be executed in an event loop. async for or await can only be used inside an async def function, which in turn needs an event loop to run it. The details of event loops are not relevant to the topic of this page.
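Still, for completeness, here is a minimal sketch of driving the async for loop from above with asyncio.run(), which creates the event loop for you:
import asyncio

async def main():
    # async for is only valid inside an async def function
    async for i in my_asyncgen():
        print(i)

# asyncio.run() creates an event loop and runs main() to completion
asyncio.run(main())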
Transformation Adapters can be of 4 types based on whether their exec function is:
- An Async Generator
- A regular function
- A Generator
- An Async function
If the Adapter's exec function is an Async Generator, i.e. a function with async def syntax and a yield statement, then it will receive each wired input as a stream (an Async Generator object). For all other types of Adapters, each wired input is a single value of the expected ArgType. Here is a sample Adapter:
class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: AsyncGenerator[str, None]):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        async for input in self.input:
            yield {"output": input.lower()}
Notice how input is an Async Generator object which is looped over with an async for statement. Here is a more complicated example:
class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input1": ArgType.String, "input2": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input1: AsyncGenerator[str, None], input2: AsyncGenerator[str, None]):
        self.input1 = input1
        self.input2 = input2

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        while True:
            try:
                input1 = await self.input1.__anext__()
                input2 = await self.input2.__anext__()
                yield {"output": input1.lower() + " " + input2.lower()}
            except StopAsyncIteration:
                break
This is a classic zipping of two streams: it finishes as soon as either stream finishes. If we change the exec function in the above code to:
async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
    input1_list = []
    async for input1 in self.input1:
        input1_list.append(input1)
    async for input2 in self.input2:
        for input1 in input1_list:
            yield {"output": input1.lower() + " " + input2.lower()}
then it becomes a classic crossing of two streams. Notice how the stream input1 had to be stored in memory in the list input1_list.
If the Adapter's exec function is a regular function, then it will receive each wired input as a single value of the expected ArgType. Here is a sample Adapter:
class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        return {"output": self.input.lower()}
Notice how input is simply a String.
If the Adapter's exec function is a Generator, i.e. a regular function with a yield statement, then it will receive each wired input as a single value of the expected ArgType. Here is a sample Adapter:
class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        for i in range(5):
            yield {"output": self.input.lower() + str(i)}
If the Adapter's exec function is an Async function, i.e. a function with async def syntax and a return statement, then it will receive each wired input as a single value of the expected ArgType. Here is a sample Adapter:
class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        return {"output": self.input.lower()}
Out of the four types of Transformation Adapters mentioned above, only the first type is compatible with streaming. The Pipeline only deals in streams and assumes each Adapter's exec function to be an Async Generator. It uses a Wrapper to support the other three types of Adapters. The Wrapper employs the same zipping approach for wired input streams as mentioned previously. We are currently adding support for writing quick Wrappers and a way to specify them in the Configuration file.
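To make the Wrapper's zipping behaviour concrete, here is a hypothetical sketch (the name wrap_regular_exec and its signature are illustrative, not the Pipeline's actual API; for simplicity it assumes every input is wired as a stream and exec is a regular function):
from typing import AsyncGenerator

# Hypothetical sketch, not the Pipeline's actual Wrapper
async def wrap_regular_exec(adapter_cls, wired_inputs: dict) -> AsyncGenerator[dict, None]:
    while True:
        try:
            # Zipping: take the next value from every wired input stream
            kwargs = {name: await stream.__anext__() for name, stream in wired_inputs.items()}
        except StopAsyncIteration:
            break  # finish as soon as any input stream finishes
        # Call the regular exec once per zipped set of inputs
        yield adapter_cls(**kwargs).exec()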
This section covers the more advanced details about the Pipeline and streaming inputs. We have covered how to handle input streams above. However, we have yet to see the dual nature of an input, which may or may not be a stream depending on whether it is wired or not. Here is a sample Adapter:
class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: Union[str, AsyncGenerator[str, None]]):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        if isinstance(self.input, str):
            yield {"output": self.input.lower()}
        else:
            async for input in self.input:
                yield {"output": input.lower()}
Notice how we checked whether input is a String or a stream.
In all previous examples, we consumed each input stream completely. If an Adapter exits cleanly before all of its input streams finish, then the Adapters producing those partially consumed streams stay unaffected and still run till completion. For example, if you have the following Adapters:
class MyProducerFunc(IFunc):
    id = "my_producer_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        for i in range(5):
            yield {"output": self.input.lower() + str(i)}

class MyConsumerFunc(IFunc):
    id = "my_consumer_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: AsyncGenerator[str, None]):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        async for input in self.input:
            yield {"output": input.lower()}
            break
Notice how we break out of the async for loop without completely consuming the stream input. What is the expected behaviour if we wire MyProducerFunc's output to MyConsumerFunc's input? MyProducerFunc is executed till completion irrespective of its consumers. Here is a more complicated example:
class MyProducerFunc1(IFunc):
    id = "my_producer_func1"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        for i in range(5):
            yield {"output": self.input.lower() + str(i)}

class MyProducerFunc2(IFunc):
    id = "my_producer_func2"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        for i in range(10):
            yield {"output": self.input.lower() + str(i)}

class MyConsumerFunc(IFunc):
    id = "my_consumer_func"
    inputs = {"input1": ArgType.String, "input2": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input1: AsyncGenerator[str, None], input2: AsyncGenerator[str, None]):
        self.input1 = input1
        self.input2 = input2

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        while True:
            try:
                input1 = await self.input1.__anext__()
                input2 = await self.input2.__anext__()
                yield {"output": input1.lower() + " " + input2.lower()}
            except StopAsyncIteration:
                break
Notice how MyProducerFunc1 yields 5 times, but MyProducerFunc2 yields 10 times. What is the expected behaviour if we wire MyProducerFunc1's output to MyConsumerFunc's input1 and MyProducerFunc2's output to MyConsumerFunc's input2? MyConsumerFunc exits cleanly when the stream input1 finishes earlier than the stream input2, but MyProducerFunc2 is executed till completion. Here MyConsumerFunc employs the same zipping approach as the Wrapper that the Pipeline uses for the three non-streaming types of Adapters. Therefore, we will get the exact same behaviour if we use the following non-streaming MyConsumerFunc:
class MyConsumerFunc(IFunc):
    id = "my_consumer_func"
    inputs = {"input1": ArgType.String, "input2": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input1: str, input2: str):
        self.input1 = input1
        self.input2 = input2

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        return {"output": self.input1.lower() + " " + self.input2.lower()}
Note: However, if an Adapter exits with an error, then all the other Adapters are also terminated.
Used incorrectly, streaming can lead to a deadlock in the Pipeline. We have implemented deadlock detection in the Pipeline, which prevents it from getting stuck. However, users should be aware of the cases which can lead to a deadlock. Here is an example:
class MyProducerFunc(IFunc):
    id = "my_producer_func"
    inputs = {"input": ArgType.String}
    outputs = {"output1": ArgType.String, "output2": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        for i in range(5):
            yield {"output1": self.input.lower() + str(i), "output2": self.input.upper() + str(i)}

class MyConsumerFunc(IFunc):
    id = "my_consumer_func"
    inputs = {"input1": ArgType.String, "input2": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input1: AsyncGenerator[str, None], input2: AsyncGenerator[str, None]):
        self.input1 = input1
        self.input2 = input2

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        input1_list = []
        async for input1 in self.input1:
            input1_list.append(input1)
        async for input2 in self.input2:
            for input1 in input1_list:
                yield {"output": input1.lower() + " " + input2.lower()}
If we wire MyProducerFunc's output1 to MyConsumerFunc's input1 and MyProducerFunc's output2 to MyConsumerFunc's input2, then it'll lead to a deadlock. This happens because MyConsumerFunc consumes the two streams input1 and input2, which come from the same source MyProducerFunc, at different rates: it drains input1 completely before reading anything from input2, while MyProducerFunc yields to both outputs in lockstep, so the producer and the consumer end up waiting on each other. We will get the exact same behaviour if input1 and input2 had different immediate sources but could be traced back to a common source. Such diamond patterns are a common cause of deadlocks.
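If you do need a cross of two streams that share a common source, one way to avoid this particular deadlock (a sketch, at the cost of buffering both streams in memory) is to consume both streams at the same rate first and cross the buffered values afterwards:
async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
    input1_list, input2_list = [], []
    while True:
        try:
            # Drain both streams in lockstep so the common producer
            # never blocks on an unread output
            input1_list.append(await self.input1.__anext__())
            input2_list.append(await self.input2.__anext__())
        except StopAsyncIteration:
            break
    # Cross the fully buffered streams
    for input1 in input1_list:
        for input2 in input2_list:
            yield {"output": input1.lower() + " " + input2.lower()}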