feat: batch-map implementation (numaproj#177)
Signed-off-by: Sidhant Kohli <[email protected]>
kohlisid authored Jul 16, 2024
1 parent 32a9d26 commit 921b7c0
Showing 51 changed files with 1,770 additions and 90 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/build-push.yaml
@@ -20,9 +20,10 @@ jobs:
example_directories: [
"examples/map/even_odd", "examples/map/flatmap", "examples/map/forward_message",
"examples/map/multiproc_map", "examples/mapstream/flatmap_stream", "examples/reduce/counter",
"examples/reducestream/counter", "examples/reducestream/sum", "examples/sideinput/simple_sideinput",
"examples/sideinput/simple_sideinput/udf", "examples/sink/async_log", "examples/sink/log",
"examples/source/async_source", "examples/source/simple_source", "examples/sourcetransform/event_time_filter"
"examples/reducestream/counter", "examples/reducestream/sum", "examples/sideinput/simple-sideinput",
"examples/sideinput/simple-sideinput/udf", "examples/sink/async_log", "examples/sink/log",
"examples/source/async-source", "examples/source/simple-source", "examples/sourcetransform/event_time_filter",
"examples/batchmap/flatmap"
]

steps:
1 change: 1 addition & 0 deletions Makefile
@@ -34,6 +34,7 @@ proto:
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sourcetransformer --python_out=pynumaflow/proto/sourcetransformer --grpc_python_out=pynumaflow/proto/sourcetransformer pynumaflow/proto/sourcetransformer/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sideinput --python_out=pynumaflow/proto/sideinput --grpc_python_out=pynumaflow/proto/sideinput pynumaflow/proto/sideinput/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sourcer --python_out=pynumaflow/proto/sourcer --grpc_python_out=pynumaflow/proto/sourcer pynumaflow/proto/sourcer/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/batchmapper --python_out=pynumaflow/proto/batchmapper --grpc_python_out=pynumaflow/proto/batchmapper pynumaflow/proto/batchmapper/*.proto


sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/proto/*/*.py
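
With this addition, the batchmapper stubs are regenerated along with all the others by the existing target:

```shell
make proto
```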
7 changes: 6 additions & 1 deletion README.md
@@ -47,6 +47,7 @@ pre-commit install
- [Map](https://github.com/numaproj/numaflow-python/tree/main/examples/map)
- [Reduce](https://github.com/numaproj/numaflow-python/tree/main/examples/reduce)
- [Map Stream](https://github.com/numaproj/numaflow-python/tree/main/examples/mapstream)
- [Batch Map](https://github.com/numaproj/numaflow-python/tree/main/examples/batchmap)
- [Implement User Defined Sinks](https://github.com/numaproj/numaflow-python/tree/main/examples/sink)
- [Implement User Defined SideInputs](https://github.com/numaproj/numaflow-python/tree/main/examples/sideinput)

@@ -95,7 +96,7 @@ This could be an alternative to creating multiple replicas of the same UDF container.
Thus this server type is useful for UDFs which are CPU intensive.
```
-grpc_server = MapMultiProcServer(handler)
+grpc_server = MapMultiProcServer(mapper_instance=handler, server_count=2)
```
#### Currently Supported Server Types for each functionality
@@ -111,6 +112,8 @@ These are the class names for the server types supported by each of the functionalities
- ReduceAsyncServer
- MapStream
- MapStreamAsyncServer
- BatchMap
- BatchMapAsyncServer
- Source Transform
- SourceTransformServer
- SourceTransformMultiProcServer
@@ -147,6 +150,8 @@ The list of base handler classes for each of the functionalities is given below
- MapStreamer
- Source Transform
- SourceTransformer
- Batch Map
- BatchMapper
- UDSource
- Sourcer
- UDSink
66 changes: 66 additions & 0 deletions examples/batchmap/README.md
@@ -0,0 +1,66 @@
## BatchMap Interface
The BatchMap interface allows developers to
process multiple data items together in a single UDF handler.


### What is BatchMap?
BatchMap is an interface that allows developers to process multiple data items
in a single UDF call, rather than one call per item.


The BatchMap interface can be helpful in scenarios
where performing an operation on a group of data items at once is more efficient
than processing them one at a time.


### Understanding the User Interface
The BatchMap interface requires developers to implement a handler with a specific signature.
Here is the signature of the BatchMap handler:

```python
async def handler(datums: AsyncIterable[Datum]) -> BatchResponses:
```
The handler takes an iterable of `Datum` objects and returns
`BatchResponses`.
The `BatchResponses` object is a list of the *same length* as the input
datums, with each item corresponding to the response for one request datum.

To clarify, let's say we have three data items:

```
data_1 = {"name": "John", "age": 25}
data_2 = {"name": "Jane", "age": 30}
data_3 = {"name": "Bob", "age": 45}
```

These data items will be grouped together by Numaflow and
passed to the handler as an iterable:

```python
result = await handler([data_1, data_2, data_3])
```

The result will be a `BatchResponses` object: a list containing one response for each input data item.

### Important Considerations
When using BatchMap, there are a few important considerations to keep in mind:

- Ensure that each `BatchResponse` is tagged with the *correct request ID*.
Every `Datum` has a unique ID, which Numaflow uses to match responses to their requests.

```python
async for datum in datums:
batch_response = BatchResponse.from_id(datum.id)
```


- Ensure that the length of the `BatchResponses`
list is equal to the number of requests received.
**This means that for every input data item**, there must be a corresponding
response in the `BatchResponses` list; the sketch below satisfies both rules.
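
Putting both rules together, here is a minimal sketch of a conforming handler. It mirrors the flatmap example shipped with this commit, but performs a plain identity map for brevity:

```python
from collections.abc import AsyncIterable

from pynumaflow.batchmapper import (
    BatchResponse,
    BatchResponses,
    Datum,
    Message,
)


async def handler(datums: AsyncIterable[Datum]) -> BatchResponses:
    batch_responses = BatchResponses()
    async for datum in datums:
        # One response per datum, tagged with that datum's request ID.
        batch_response = BatchResponse.from_id(datum.id)
        batch_response.append(Message(datum.value))
        batch_responses.append(batch_response)
    # Exactly one BatchResponse was appended per input datum, so the
    # response count matches the request count.
    return batch_responses
```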

Use batch processing only when it makes sense. In some
scenarios, batch processing may not be the most
efficient approach, and processing data items one by one
could be a better option.
In that case, the burden of processing the data concurrently
falls on the UDF implementation.
54 changes: 54 additions & 0 deletions examples/batchmap/flatmap/Dockerfile
@@ -0,0 +1,54 @@
####################################################################################################
# builder: install needed dependencies
####################################################################################################

FROM python:3.10-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PIP_NO_CACHE_DIR=on \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
POETRY_VERSION=1.2.2 \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
PYSETUP_PATH="/opt/pysetup"

ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/batchmap/flatmap"
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
wget \
# deps for building python deps
build-essential \
&& apt-get install -y git \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
\
# install dumb-init
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
&& chmod +x /dumb-init \
&& curl -sSL https://install.python-poetry.org | python3 -

####################################################################################################
# udf: used for running the udf vertices
####################################################################################################
FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY ./ ./

WORKDIR $EXAMPLE_PATH
RUN poetry install --no-cache --no-root && \
rm -rf ~/.cache/pypoetry/

RUN chmod +x entry.sh

ENTRYPOINT ["/dumb-init", "--"]
CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]

EXPOSE 5000
22 changes: 22 additions & 0 deletions examples/batchmap/flatmap/Makefile
@@ -0,0 +1,22 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/batch-map-flatmap:${TAG}
DOCKER_FILE_PATH = examples/batchmap/flatmap/Dockerfile

.PHONY: update
update:
poetry update -vv

.PHONY: image-push
image-push: update
cd ../../../ && docker buildx build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} \
--platform linux/amd64,linux/arm64 . --push

.PHONY: image
image: update
cd ../../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
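
As a usage note (the tag value here is only illustrative), a multi-platform build and push would look like:

```shell
make image-push TAG=v0.1.0
```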
4 changes: 4 additions & 0 deletions examples/batchmap/flatmap/entry.sh
@@ -0,0 +1,4 @@
#!/bin/sh
set -eux

python example.py
46 changes: 46 additions & 0 deletions examples/batchmap/flatmap/example.py
@@ -0,0 +1,46 @@
from collections.abc import AsyncIterable

from pynumaflow.batchmapper import (
Message,
Datum,
BatchMapper,
BatchMapAsyncServer,
BatchResponses,
BatchResponse,
)


class Flatmap(BatchMapper):
"""
This is a class that inherits from the BatchMapper class.
It implements a flatmap operation over a batch of input messages
"""

async def handler(
self,
datums: AsyncIterable[Datum],
) -> BatchResponses:
batch_responses = BatchResponses()
async for datum in datums:
val = datum.value
_ = datum.event_time
_ = datum.watermark
strs = val.decode("utf-8").split(",")
batch_response = BatchResponse.from_id(datum.id)
if len(strs) == 0:
batch_response.append(Message.to_drop())
else:
for s in strs:
batch_response.append(Message(str.encode(s)))
batch_responses.append(batch_response)

return batch_responses


if __name__ == "__main__":
"""
This example shows how to use the Batch Map Flatmap.
We use a class as handler, but a function can be used as well.
"""
grpc_server = BatchMapAsyncServer(Flatmap())
grpc_server.start()
33 changes: 33 additions & 0 deletions examples/batchmap/flatmap/pipeline.yaml
@@ -0,0 +1,33 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: flatmap
spec:
vertices:
- name: in
source:
# A self data generating source
generator:
rpu: 500
duration: 1s
- name: batch-flatmap
partitions: 2
metadata:
annotations:
numaflow.numaproj.io/batch-map: "true"
scale:
min: 1
udf:
container:
image: quay.io/numaio/numaflow-python/batch-map-flatmap:stable
imagePullPolicy: Always
- name: sink
scale:
min: 1
sink:
log: {}
edges:
- from: in
to: batch-flatmap
- from: batch-flatmap
to: sink
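
Assuming Numaflow is already installed in the cluster, the pipeline can be created with the usual kubectl workflow:

```shell
kubectl apply -f examples/batchmap/flatmap/pipeline.yaml
```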
15 changes: 15 additions & 0 deletions examples/batchmap/flatmap/pyproject.toml
@@ -0,0 +1,15 @@
[tool.poetry]
name = "batch-map-flatmap"
version = "0.1.0"
description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
pynumaflow = { path = "../../../"}

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
7 changes: 6 additions & 1 deletion pynumaflow/_constants.py
@@ -17,6 +17,7 @@
SOURCE_SOCK_PATH = "/var/run/numaflow/source.sock"
MULTIPROC_MAP_SOCK_ADDR = "/var/run/numaflow/multiproc"
FALLBACK_SINK_SOCK_PATH = "/var/run/numaflow/fb-sink.sock"
BATCH_MAP_SOCK_PATH = "/var/run/numaflow/batchmap.sock"

# Server information file configs
MAP_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info"
@@ -28,6 +29,7 @@
SIDE_INPUT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info"
SOURCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcer-server-info"
FALLBACK_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info"
BATCH_MAP_SERVER_INFO_FILE_PATH = "/var/run/numaflow/batchmapper-server-info"

ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
UD_CONTAINER_FALLBACK_SINK = "fb-udsink"
@@ -43,7 +45,10 @@
DROP = "U+005C__DROP__"

_PROCESS_COUNT = os.cpu_count()
-MAX_THREADS = int(os.getenv("MAX_THREADS", "4"))
+# Cap max value to 16
+MAX_NUM_THREADS = 16
+# If the MAX_THREADS env var is not set, default to 4
+NUM_THREADS_DEFAULT = int(os.getenv("MAX_THREADS", "4"))

_LOGGER = setup_logging(__name__)
if os.getenv("PYTHONDEBUG"):
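As a hedged illustration (not code from this commit), a server consuming these constants would presumably clamp the configured thread count to the cap:

```python
from pynumaflow._constants import MAX_NUM_THREADS, NUM_THREADS_DEFAULT

# Hypothetical sketch: never exceed the hard cap of 16 threads.
max_threads = min(NUM_THREADS_DEFAULT, MAX_NUM_THREADS)
```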
20 changes: 20 additions & 0 deletions pynumaflow/batchmapper/__init__.py
@@ -0,0 +1,20 @@
from pynumaflow._constants import DROP

from pynumaflow.batchmapper._dtypes import (
Message,
Datum,
BatchMapper,
BatchResponses,
BatchResponse,
)
from pynumaflow.batchmapper.async_server import BatchMapAsyncServer

__all__ = [
"Message",
"Datum",
"DROP",
"BatchMapAsyncServer",
"BatchMapper",
"BatchResponses",
"BatchResponse",
]