Skip to content

Commit

Permalink
fix: async source init (#120)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Nov 21, 2023
1 parent b24f87d commit 2f0d4c4
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 5 deletions.
54 changes: 54 additions & 0 deletions examples/source/async-source/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
####################################################################################################
# builder: install needed dependencies
####################################################################################################

FROM python:3.10-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PIP_NO_CACHE_DIR=on \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
POETRY_VERSION=1.2.2 \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
PYSETUP_PATH="/opt/pysetup" \
VENV_PATH="/opt/pysetup/.venv"

ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
wget \
# deps for building python deps
build-essential \
&& apt-get install -y git \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
\
# install dumb-init
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
&& chmod +x /dumb-init \
&& curl -sSL https://install.python-poetry.org | python3 -

####################################################################################################
# udf: used for running the udf vertices
####################################################################################################
FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY pyproject.toml ./
RUN poetry install --no-cache --no-root && \
rm -rf ~/.cache/pypoetry/

ADD . /app
WORKDIR /app

RUN chmod +x entry.sh

ENTRYPOINT ["/dumb-init", "--"]
CMD ["/app/entry.sh"]

EXPOSE 5000
8 changes: 8 additions & 0 deletions examples/source/async-source/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
.PHONY: image
image:
docker build -t "quay.io/numaio/numaflow-python/async-source:v0.5.4" .
# Github CI runner uses platform linux/amd64. If your local environment don't, the image built by command above might not work
# under the CI E2E test environment.
# To build an image that supports multiple platforms(linux/amd64,linux/arm64) and push to quay.io, use the following command
# docker buildx build -t "quay.io/numaio/numaflow-python/async-source:v0.5.4" --platform linux/amd64,linux/arm64 . --push
# If command failed, refer to https://billglover.me/notes/build-multi-arch-docker-images/ to fix
6 changes: 6 additions & 0 deletions examples/source/async-source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Example Python User Defined Source
A simple example of a user-defined source. The source maintains an array of messages and implements the Read,
Ack, and Pending methods.
The Read(x) method returns the next x number of messages in the array.
The Ack() method acknowledges the last batch of messages returned by Read().
The Pending() method returns 0 to indicate that the simple source always has 0 pending messages.
4 changes: 4 additions & 0 deletions examples/source/async-source/entry.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
set -eux

python example.py
69 changes: 69 additions & 0 deletions examples/source/async-source/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from datetime import datetime
from collections.abc import AsyncIterable

import aiorun

from pynumaflow.sourcer import (
ReadRequest,
Message,
AckRequest,
PendingResponse,
Offset,
AsyncSourcer,
)


class AsyncSource:
"""
AsyncSource is a class for User Defined Source implementation.
"""

def __init__(self):
"""
to_ack_set: Set to maintain a track of the offsets yet to be acknowledged
read_idx : the offset idx till where the messages have been read
"""
self.to_ack_set = set()
self.read_idx = 0

async def read_handler(self, datum: ReadRequest) -> AsyncIterable[Message]:
"""
read_handler is used to read the data from the source and send the data forward
for each read request we process num_records and increment the read_idx to indicate that
the message has been read and the same is added to the ack set
"""
if self.to_ack_set:
return

for x in range(datum.num_records):
yield Message(
payload=str(self.read_idx).encode(),
offset=Offset(offset=str(self.read_idx).encode(), partition_id="0"),
event_time=datetime.now(),
)
self.to_ack_set.add(str(self.read_idx))
self.read_idx += 1

async def ack_handler(self, ack_request: AckRequest):
"""
The ack handler is used acknowledge the offsets that have been read, and remove them
from the to_ack_set
"""
for offset in ack_request.offset:
self.to_ack_set.remove(str(offset.offset, "utf-8"))

async def pending_handler(self) -> PendingResponse:
"""
The simple source always returns zero to indicate there is no pending record.
"""
return PendingResponse(count=0)


if __name__ == "__main__":
ud_source = AsyncSource()
grpc_server = AsyncSourcer(
read_handler=ud_source.read_handler,
ack_handler=ud_source.ack_handler,
pending_handler=ud_source.pending_handler,
)
aiorun.run(grpc_server.start())
21 changes: 21 additions & 0 deletions examples/source/async-source/pipeline-numaflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: async-source
spec:
vertices:
- name: in
source:
udsource:
container:
# A simple user-defined async source
image: quay.io/numaio/numaflow-python/async-source:v0.5.4
imagePullPolicy: Always
limits:
readBatchSize: 2
- name: out
sink:
log: {}
edges:
- from: in
to: out
17 changes: 17 additions & 0 deletions examples/source/async-source/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[tool.poetry]
name = "simple-source"
version = "0.2.4"
description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
pynumaflow = "~0.5.4"
aiorun = "^2023.7"


[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
12 changes: 8 additions & 4 deletions pynumaflow/sourcer/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,20 @@ async def AckFn(
for offset in request.request.offsets:
offsets.append(Offset(offset.offset, offset.partition_id))
try:
await self.__invoke_ack(ack_req=request)
await self.__invoke_ack(ack_req=offsets)
except Exception as e:
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(str(e))
raise e

return source_pb2.AckResponse()

async def __invoke_ack(self, ack_req: AckRequest):
async def __invoke_ack(self, ack_req: list[Offset]):
"""
Invokes the Source Ack Function.
"""
try:
await self.__source_ack_handler(ack_req)
await self.__source_ack_handler(AckRequest(offsets=ack_req))
except Exception as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
raise err
Expand Down Expand Up @@ -182,7 +182,11 @@ async def PendingFn(

async def __serve_async(self, server) -> None:
source_pb2_grpc.add_SourceServicer_to_server(
AsyncSourcer(read_handler=self.__source_read_handler),
AsyncSourcer(
read_handler=self.__source_read_handler,
ack_handler=self.__source_ack_handler,
pending_handler=self.__source_pending_handler,
),
server,
)
server.add_insecure_port(self.sock_path)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pynumaflow"
version = "0.5.3"
version = "0.5.4"
description = "Provides the interfaces of writing Python User Defined Functions and Sinks for NumaFlow."
authors = ["NumaFlow Developers"]
readme = "README.md"
Expand Down

0 comments on commit 2f0d4c4

Please sign in to comment.