Data Streaming

The Pipeline supports streaming using asynchronous programming. To utilize streaming, a Transformation Adapter receives each wired input as an Async Generator. These wired inputs can be iterated only once to consume the stream.

This page first covers the basics of Generators and Async Generators, and then works its way up to streaming Transformation Adapters.

Generators

A Generator is a function which returns an Iterator (an object that can be iterated, or looped over). For the purposes of this page, you can think of a Generator and an Iterator as the same thing. In reality there are differences: every generator is an iterator, but not vice versa. You can refer here for more details.

Creation

We use the keyword yield instead of a return statement to create Generators. yield pauses the function, saving all of its state, and resumes from that point on successive calls. For example:

# A simple generator function
def my_gen():
    for i in range(5):
        yield i

Usage

There are 3 ways to use a Generator/Iterator:

  1. A for loop
    for i in my_gen():
        print(i)
    
  2. Built-in function next()
    gen = my_gen()
    print(next(gen))
    print(next(gen))
    print(next(gen))
    print(next(gen))
    print(next(gen))
    

Note: If you try next(gen) again, it'll raise a StopIteration exception because a generator can only be iterated once.

  3. Iterator special method __next__()
     gen = my_gen()
     while True:
         try:
             print(gen.__next__())
         except StopIteration:
             break
    

Note: The built-in function next() simply calls __next__(), and the for loop is essentially a shorthand for the above code.

There are more advanced Generator features, but they are not relevant for the topic of this Page.

Async Generators

Async Generators are Generators that support asynchronous programming. Asynchronous programming is a form of concurrent programming in which a unit of work is allowed to run separately from the primary application flow. Python provides the standard library asyncio and the keywords async/await for writing asynchronous programs. You can refer here and here for more details.

Creation

We add the keyword async in front of a Generator function, i.e. we define it with async def, to create Async Generators. For example:

# A simple async generator function
async def my_asyncgen():
    for i in range(5):
        yield i

Usage

There are 2 ways to use an Async Generator/Iterator:

  1. An async for loop
    async for i in my_asyncgen():
        print(i)
    
  2. Async Iterator special method __anext__()
    gen = my_asyncgen()
    print(await gen.__anext__())
    print(await gen.__anext__())
    print(await gen.__anext__())
    print(await gen.__anext__())
    print(await gen.__anext__())
    

    Note: If you await gen.__anext__() again, it'll raise a StopAsyncIteration exception because an async generator can only be iterated once. There is no built-in function anext() at the time of writing (one was added later, in Python 3.10). The async for loop is a shorthand for the following:

    gen = my_asyncgen()
    while True:
        try:
            print(await gen.__anext__())
        except StopAsyncIteration:
            break
    

In Python, all async code needs to be executed in an event loop. async for and await can only be used inside an async def function, which in turn needs an event loop to run it. The details of event loops are not relevant for the topic of this page.
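
For completeness, here is a minimal runnable sketch of driving my_asyncgen from an event loop with asyncio.run (available since Python 3.7):

import asyncio

# A minimal sketch: asyncio.run() creates an event loop, runs the given
# coroutine to completion, and then closes the loop.
async def main():
    async for i in my_asyncgen():
        print(i)

asyncio.run(main())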

Transformation Adapters

Transformation Adapters can be of 4 types, based on whether their exec function is:

  1. An Async Generator
  2. A regular function
  3. A Generator
  4. An Async function

1. An Async Generator

If the Adapter's exec function is an Async Generator, i.e. a function with async def syntax and a yield statement, then it will receive each wired input as a stream (Async Generator object). For all other types of Adapters, each wired input is a single value of the expected ArgType. Here is a sample Adapter:

from typing import AsyncGenerator, Generator, Union

# IFunc and ArgType are provided by the Pipeline framework.
class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: AsyncGenerator[str, None]):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        async for input in self.input:
            yield {"output": input.lower()}

Notice how input is an Async Generator object which is looped over with an async for statement. Here is a more complicated example:

class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input1": ArgType.String, "input2": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input1: AsyncGenerator[str, None], input2: AsyncGenerator[str, None]):
        self.input1 = input1
        self.input2 = input2

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        while True:
            try:
                input1 = await self.input1.__anext__()
                input2 = await self.input2.__anext__()
                yield {"output": input1.lower() + " " + input2.lower()}
            except StopAsyncIteration:
                break

This is a classic zipping of two streams. It finishes when either of the streams finishes. If we change the exec function in the above code to:

async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
    input1_list = []
    async for input1 in self.input1:
        input1_list.append(input1)
    async for input2 in self.input2:
        for input1 in input1_list:
            yield {"output": input1.lower() + " " + input2.lower()}

it becomes a classic crossing (Cartesian product) of two streams. Notice how the stream input1 had to be stored in memory in the list input1_list.

2. A regular function

If the Adapter's exec function is a regular function, then it will receive each wired input as a single value of the expected ArgType. Here is a sample Adapter:

class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        return {"output": self.input.lower()}

Notice how input is simply a String.

3. A Generator

If the Adapter's exec function is a Generator, i.e. a regular function (defined without async def) with a yield statement, then it will receive each wired input as a single value of the expected ArgType. Here is a sample Adapter:

class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        for i in range(5):
            yield {"output": self.input.lower() + str(i)}

4. An Async function

If the Adapter's exec function is an Async function, i.e. a function with async def syntax and a return statement, then it will receive each wired input as a single value of the expected ArgType. Here is a sample Adapter:

class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        return {"output": self.input.lower()}

Streaming Pipeline

Out of the four types of Transformation Adapters mentioned above, only the first type is compatible with streaming. The Pipeline only deals in streams and assumes each Adapter's exec function is an Async Generator. It uses a Wrapper to support the other three types of Adapters. The Wrapper employs the same zipping approach for wired input streams as mentioned previously. We are currently adding support for writing quick Wrappers and a way to specify them in the Configuration file.
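
The Wrapper itself is internal to the Pipeline, but for intuition, here is a minimal hypothetical sketch of how a regular exec function could be adapted into an Async Generator by zipping its wired input streams (wrap_exec and its streams argument are illustrative names, not the Pipeline's actual API):

# Hypothetical sketch: adapt an Adapter class whose exec is a regular
# function into an Async Generator by zipping its wired input streams.
# `streams` maps input names to Async Generator objects.
async def wrap_exec(func_cls, streams):
    while True:
        kwargs = {}
        try:
            # Pull the next value from every wired input stream (zipping).
            for name, gen in streams.items():
                kwargs[name] = await gen.__anext__()
        except StopAsyncIteration:
            break  # one stream finished, so the zipped stream finishes
        # Call the regular exec once per zipped set of input values.
        yield func_cls(**kwargs).exec()

Under this scheme, the wrapped Adapter runs its regular exec once per zipped set of stream values, which matches the zipping behaviour of the streaming MyStreamFunc shown earlier.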

This section will cover the more advanced details about Pipeline and streaming inputs.

Processing Inputs

We have covered how to handle input streams in the section above. However, we have yet to see the dual nature of an input: it may or may not be a stream, depending on whether or not it is wired. Here is a sample adapter that handles both cases:

class MyStreamFunc(IFunc):
    id = "my_stream_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: Union[str, AsyncGenerator[str, None]]):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        if isinstance(self.input, str):
            yield {"output": self.input.lower()}
        else:
            async for input in self.input:
                yield {"output": input.lower()}

Notice how we check whether input is a String before deciding how to consume it.
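
To make this dual behaviour concrete, here is a standalone sketch of driving the Adapter by hand outside the Pipeline (assuming IFunc and ArgType from the Pipeline framework are importable; the stream helper below is purely illustrative, since normally the Pipeline supplies the wired stream):

import asyncio

async def demo():
    # Unwired: input arrives as a plain String.
    async for out in MyStreamFunc("HELLO").exec():
        print(out)  # {'output': 'hello'}

    # Wired: input arrives as an Async Generator (a stream).
    async def stream():
        for s in ["FOO", "BAR"]:
            yield s

    async for out in MyStreamFunc(stream()).exec():
        print(out)  # {'output': 'foo'}, then {'output': 'bar'}

asyncio.run(demo())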

Partially Consuming Streams

In all previous examples, we consumed an input stream completely. If an Adapter exits cleanly before all its input streams finish, then the Adapters producing those partially consumed streams stay unaffected and still run till completion. For example, if you have the following Adapters:

class MyProducerFunc(IFunc):
    id = "my_producer_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        for i in range(5):
            yield {"output": self.input.lower() + str(i)}

class MyConsumerFunc(IFunc):
    id = "my_consumer_func"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: AsyncGenerator[str, None]):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        async for input in self.input:
            yield {"output": input.lower()}
            break

Notice how we break out of the async for loop without completely consuming the stream input. What is the expected behaviour if we wire MyProducerFunc's output to MyConsumerFunc's input? MyProducerFunc is executed till completion irrespective of its consumers. Here is a more complicated example:

class MyProducerFunc1(IFunc):
    id = "my_producer_func1"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        for i in range(5):
            yield {"output": self.input.lower() + str(i)}

class MyProducerFunc2(IFunc):
    id = "my_producer_func2"
    inputs = {"input": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        for i in range(10):
            yield {"output": self.input.lower() + str(i)}

class MyConsumerFunc(IFunc):
    id = "my_consumer_func"
    inputs = {"input1": ArgType.String, "input2": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input1: AsyncGenerator[str, None], input2: AsyncGenerator[str, None]):
        self.input1 = input1
        self.input2 = input2

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        while True:
            try:
                input1 = await self.input1.__anext__()
                input2 = await self.input2.__anext__()
                yield {"output": input1.lower() + " " + input2.lower()}
            except StopAsyncIteration:
                break

Notice how MyProducerFunc1 yields 5 times, but MyProducerFunc2 yields 10 times. What is the expected behaviour if we wire MyProducerFunc1's output to MyConsumerFunc's input1 and MyProducerFunc2's output to MyConsumerFunc's input2? MyConsumerFunc exits cleanly when the stream input1 finishes earlier than the stream input2, but MyProducerFunc2 is executed till completion. Here MyConsumerFunc employs the same zipping approach as the Wrapper that the Pipeline uses for the three non-streaming types of Adapters. Therefore, we will get the exact same behaviour if we use the following non-streaming MyConsumerFunc:

class MyConsumerFunc(IFunc):
    id = "my_consumer_func"
    inputs = {"input1": ArgType.String, "input2": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input1: str, input2: str):
        self.input1 = input1
        self.input2 = input2

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        yield {"output": self.input1.lower() + " " + self.input2.lower()}

Note: If an Adapter exits with an error, however, then all the other Adapters are also terminated.

Deadlocking

Used incorrectly, streaming can lead to a deadlock in the Pipeline. We have implemented deadlock detection in the Pipeline, which prevents it from getting stuck. However, users should be aware of the cases that can lead to a deadlock. Here is an example:

class MyProducerFunc(IFunc):
    id = "my_producer_func"
    inputs = {"input": ArgType.String}
    outputs = {"output1": ArgType.String, "output2": ArgType.String}

    def __init__(self, input: str):
        self.input = input

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        for i in range(5):
            yield {"output1": self.input.lower() + str(i), "output2": self.input.upper() + str(i)}

class MyConsumerFunc(IFunc):
    id = "my_consumer_func"
    inputs = {"input1": ArgType.String, "input2": ArgType.String}
    outputs = {"output": ArgType.String}

    def __init__(self, input1: AsyncGenerator[str, None], input2: AsyncGenerator[str, None]):
        self.input1 = input1
        self.input2 = input2

    async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
        input1_list = []
        async for input1 in self.input1:
            input1_list.append(input1)
        async for input2 in self.input2:
            for input1 in input1_list:
                yield {"output": input1.lower() + " " + input2.lower()}

If we wire MyProducerFunc's output1 to MyConsumerFunc's input1 and MyProducerFunc's output2 to MyConsumerFunc's input2, then it'll lead to a deadlock. This happens because MyConsumerFunc consumes the two streams input1 and input2, which come from the same source MyProducerFunc, at different rates: it fully drains input1 before reading anything from input2, while MyProducerFunc cannot make progress until both of its previous outputs have been consumed. We would get the exact same behaviour if input1 and input2 had different immediate sources but could be traced back to a common source. Such diamond patterns are a common cause of deadlocks.
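
One way to avoid this particular deadlock is to consume both streams at the same rate. Here is a sketch of a deadlock-free exec for the diamond case above, under the assumption that pulling from the two streams in lockstep keeps the shared producer unblocked:

# A sketch of a deadlock-free exec for the diamond case: pull from input1
# and input2 at the same rate, buffering both, and only cross the buffered
# values once the streams are exhausted. The trade-off is that both streams
# are now held in memory.
async def exec(self) -> Union[dict, Generator[dict, None, None], AsyncGenerator[dict, None]]:
    input1_list, input2_list = [], []
    while True:
        try:
            input1_list.append(await self.input1.__anext__())
            input2_list.append(await self.input2.__anext__())
        except StopAsyncIteration:
            break
    for input2 in input2_list:
        for input1 in input1_list:
            yield {"output": input1.lower() + " " + input2.lower()}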