Skip to content

Commit

Permalink
fix: unix domain socket path and examples (#6)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
  • Loading branch information
jy4096 authored Sep 19, 2022
1 parent 043e109 commit fd88178
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 14 deletions.
43 changes: 43 additions & 0 deletions examples/function/forward_message/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
FROM python:3.9.12-slim

ENV PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PIP_NO_CACHE_DIR=off \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
POETRY_VERSION=1.2.0 \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
PYSETUP_PATH="/opt/pysetup" \
VENV_PATH="/opt/pysetup/.venv"

ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
wget \
# deps for building python deps
build-essential

# install poetry - respects $POETRY_VERSION & $POETRY_HOME
RUN curl -sSL https://install.python-poetry.org | python3 -

WORKDIR $PYSETUP_PATH
ADD . $PYSETUP_PATH
WORKDIR $PYSETUP_PATH
RUN poetry install --no-dev --no-root

ADD . /app

# install dumb-init
RUN wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64
RUN chmod +x /dumb-init

WORKDIR /app
RUN chmod +x entry.sh

ENTRYPOINT ["/dumb-init", "--"]
CMD ["/app/entry.sh"]
16 changes: 16 additions & 0 deletions examples/function/forward_message/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Example Python User Defined Function using Kafka sink

1. Install Kafka in the Kubernetes cluster
```shell
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/main/config/apps/kafka/kafka-minimal.yaml
```

2. Build the docker image and import into k3d
```shell
docker build -t test-python-udf:v1 . && k3d image import docker.io/library/test-python-udf:v1
```

3. Apply the pipeline
```shell
kubectl apply -f pipeline-numaflow.yaml
```
4 changes: 4 additions & 0 deletions examples/function/forward_message/entry.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
set -eux

python example.py
33 changes: 33 additions & 0 deletions examples/function/forward_message/pipeline-numaflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: local-kafka
spec:
vertices:
- name: input
source:
kafka:
brokers:
- kafka-broker:9092
topic: input-topic
consumerGroup: test
- name: forward-message
udf:
container:
args:
- python
- example.py
image: docker.io/library/test-python-udf:v1
- name: log-output
sink:
log: {}
- name: log-kafka-output
sink:
log: {}
edges:
- from: input
to: forward-message
- from: input
to: log-kafka-output
- from: forward-message
to: log-output
15 changes: 15 additions & 0 deletions examples/function/forward_message/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[tool.poetry]
name = "example-function"
version = "0.2.2"
description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.9"
pynumaflow = "0.2.2"

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
6 changes: 3 additions & 3 deletions examples/sink/simplesink/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ENV PYTHONFAULTHANDLER=1 \
PIP_NO_CACHE_DIR=off \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
POETRY_VERSION=1.1.13 \
POETRY_VERSION=1.2.0 \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
Expand All @@ -23,12 +23,12 @@ RUN apt-get update \
build-essential

# install poetry - respects $POETRY_VERSION & $POETRY_HOME
RUN curl -sSL https://raw.githubusercontent.com/sdispater/poetry/master/get-poetry.py | python
RUN curl -sSL https://install.python-poetry.org | python3 -

WORKDIR $PYSETUP_PATH
ADD . $PYSETUP_PATH
WORKDIR $PYSETUP_PATH
RUN poetry install --no-dev
RUN poetry install --no-dev --no-root

ADD . /app

Expand Down
2 changes: 1 addition & 1 deletion examples/sink/simplesink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@

2. Apply the pipeline
```shell
kubectl apply -f pipeline-numaflow.yaml
kubectl apply -f pipeline-numaflow.yaml
```
2 changes: 1 addition & 1 deletion examples/sink/simplesink/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pynumaflow.sink import Datum, Responses, Response, UserDefinedSinkServicer


def udsink_handler(datums: List[Datum], __) -> Responses:
def udsink_handler(datums: List[Datum]) -> Responses:
responses = Responses()
for msg in datums:
print("User Defined Sink", msg)
Expand Down
4 changes: 2 additions & 2 deletions examples/sink/simplesink/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[tool.poetry]
name = "example-sink"
version = "0.1.0"
version = "0.2.2"
description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.9"
pynumaflow = "0.1.0"
pynumaflow = "0.2.2"

[tool.poetry.dev-dependencies]

Expand Down
2 changes: 1 addition & 1 deletion pynumaflow/function/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class UserDefinedFunctionServicer(udfunction_pb2_grpc.UserDefinedFunctionService

def __init__(self, map_handler: UDFMapCallable, sock_path=FUNCTION_SOCK_PATH):
self.__map_handler: UDFMapCallable = map_handler
self.sock_path = sock_path
self.sock_path = f"unix://{sock_path}"
self._cleanup_coroutines = []

def MapFn(
Expand Down
8 changes: 3 additions & 5 deletions pynumaflow/sink/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@
import logging
from os import environ


from google.protobuf import empty_pb2 as _empty_pb2

import grpc
from typing import Callable, Any, Iterator, List
from typing import Callable, Any, List

from pynumaflow._constants import (
SINK_SOCK_PATH,
DATUM_KEY,
)
from pynumaflow.sink import Response, Responses, Datum
from pynumaflow.sink import Responses, Datum
from pynumaflow.sink.generated import udsink_pb2_grpc, udsink_pb2
from pynumaflow.types import NumaflowServicerContext

Expand Down Expand Up @@ -48,7 +46,7 @@ class UserDefinedSinkServicer(udsink_pb2_grpc.UserDefinedSinkServicer):

def __init__(self, sink_handler: UDSinkCallable, sock_path=SINK_SOCK_PATH):
self.__sink_handler: UDSinkCallable = sink_handler
self.sock_path = sock_path
self.sock_path = f"unix://{sock_path}"
self._cleanup_coroutines = []

def SinkFn(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pynumaflow"
version = "0.2.1"
version = "0.2.2"
description = "Provides the interfaces of writing Python User Defined Functions and Sinks for NumaFlow."
authors = ["NumaFlow Developers"]
readme = "README.md"
Expand Down

0 comments on commit fd88178

Please sign in to comment.