Skip to content

Commit

Permalink
feat: udsink gRPC (#5)
Browse files Browse the repository at this point in the history
Co-authored-by: jyu6 <[email protected]>
Co-authored-by: Avik Basu <[email protected]>
  • Loading branch information
3 people authored Sep 19, 2022
1 parent 496543c commit 043e109
Show file tree
Hide file tree
Showing 23 changed files with 756 additions and 397 deletions.
3 changes: 3 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ ignore:
- "pynumaflow/function/generated/udfunction_pb2.py"
- "pynumaflow/function/_udfunction_pb2.pyi"
- "pynumaflow/function/generated/udfunction_pb2_grpc.py"
- "pynumaflow/sink/generated/udsink_pb2.py"
- "pynumaflow/sink/_udsink_pb2.pyi"
- "pynumaflow/sink/generated/udsink_pb2_grpc.py"
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ and [UDSinks](https://numaproj.github.io/numaflow/sinks/user-defined-sinks/) in

```python

from pynumaflow.function import Messages, Message, Datum
from pynumaflow.function.server import UserDefinedFunctionServicer
from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer


def map_handler(key: str, datum: Datum) -> Messages:
Expand Down
3 changes: 1 addition & 2 deletions examples/function/forward_message/example.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from pynumaflow.function import Messages, Message, Datum
from pynumaflow.function.server import UserDefinedFunctionServicer
from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer


def map_handler(key: str, datum: Datum) -> Messages:
Expand Down
13 changes: 6 additions & 7 deletions examples/sink/simplesink/example.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
from typing import List
from pynumaflow.sink import Datum, Responses, Response, UserDefinedSinkServicer

from pynumaflow.sink import Message, Responses, Response, HTTPSinkHandler


def udsink_handler(messages: List[Message], __) -> Responses:
def udsink_handler(datums: List[Datum], __) -> Responses:
responses = Responses()
for msg in messages:
print("Msg", msg)
for msg in datums:
print("User Defined Sink", msg)
responses.append(Response.as_success(msg.id))
return responses


if __name__ == "__main__":
handler = HTTPSinkHandler(udsink_handler)
handler.start()
grpc_server = UserDefinedSinkServicer(udsink_handler)
grpc_server.start()
7 changes: 0 additions & 7 deletions pynumaflow/_tools.py

This file was deleted.

19 changes: 0 additions & 19 deletions pynumaflow/exceptions.py

This file was deleted.

18 changes: 8 additions & 10 deletions pynumaflow/function/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
FUNCTION_SOCK_PATH,
DATUM_KEY,
)
from pynumaflow.function import Messages
from pynumaflow.function._dtypes import Datum
from pynumaflow.function import Messages, Datum
from pynumaflow.types import NumaflowServicerContext

if environ.get("PYTHONDEBUG"):
Expand All @@ -36,8 +35,7 @@ class UserDefinedFunctionServicer(udfunction_pb2_grpc.UserDefinedFunctionService
sock_path: Path to the UNIX Domain Socket
Example invocation:
>>> from pynumaflow.function import Messages, Message, Datum
>>> from pynumaflow.function.server import UserDefinedFunctionServicer
>>> from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer
>>> def map_handler(key: str, datum: Datum) -> Messages:
... val = datum.value
... _ = datum.event_time
Expand All @@ -59,7 +57,7 @@ def MapFn(
) -> udfunction_pb2.DatumList:
"""
Applies a function to each datum element.
The camel case function name comes from the generated udfunction_pb2_grpc.py file.
The pascal case function name comes from the generated udfunction_pb2_grpc.py file.
"""
key = ""
for metadata_key, metadata_value in context.invocation_metadata():
Expand All @@ -75,18 +73,18 @@ def MapFn(
),
)

datum_list = []
datums = []
for msg in msgs.items():
datum_list.append(udfunction_pb2.Datum(key=msg.key, value=msg.value))
datums.append(udfunction_pb2.Datum(key=msg.key, value=msg.value))

return udfunction_pb2.DatumList(elements=datum_list)
return udfunction_pb2.DatumList(elements=datums)

def ReduceFn(
self, request_iterator: Iterator[Datum], context: NumaflowServicerContext
) -> udfunction_pb2.DatumList:
"""
Applies a reduce function to a datum stream.
The camel case function name comes from the generated udfunction_pb2_grpc.py file.
The pascal case function name comes from the generated udfunction_pb2_grpc.py file.
"""
# TODO: implement Reduce function
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
Expand All @@ -98,7 +96,7 @@ def IsReady(
) -> udfunction_pb2.ReadyResponse:
"""
IsReady is the heartbeat endpoint for gRPC.
The camel case function name comes from the generated udfunction_pb2_grpc.py file.
The pascal case function name comes from the generated udfunction_pb2_grpc.py file.
"""
return udfunction_pb2.ReadyResponse(ready=True)

Expand Down
6 changes: 3 additions & 3 deletions pynumaflow/sink/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pynumaflow.sink.handler import HTTPSinkHandler
from pynumaflow.sink._dtypes import Message, Response, Responses
from pynumaflow.sink._dtypes import Response, Responses, Datum
from pynumaflow.sink.server import UserDefinedSinkServicer

__all__ = ["HTTPSinkHandler", "Message", "Response", "Responses"]
__all__ = ["Response", "Responses", "Datum", "UserDefinedSinkServicer"]
77 changes: 58 additions & 19 deletions pynumaflow/sink/_dtypes.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,11 @@
import json
from dataclasses import dataclass
from datetime import datetime
from typing import List, TypeVar, Type, Optional

import msgpack

from pynumaflow._constants import APPLICATION_JSON, APPLICATION_MSG_PACK
from pynumaflow.sink.encoder import NumaflowJSONEncoder, msgpack_encoding
from pynumaflow.exceptions import MarshalError

R = TypeVar("R", bound="Response")
Rs = TypeVar("Rs", bound="Responses")


@dataclass
class Message:
id: str
payload: bytes


@dataclass
class Response:
id: str
Expand Down Expand Up @@ -49,9 +37,60 @@ def append(self, response: R) -> None:
def items(self) -> List[R]:
return self._responses

def dumps(self, udf_content_type: str) -> str:
if udf_content_type == APPLICATION_JSON:
return json.dumps(self._responses, cls=NumaflowJSONEncoder, separators=(",", ":"))
elif udf_content_type == APPLICATION_MSG_PACK:
return msgpack.dumps(self._responses, default=msgpack_encoding)
raise MarshalError(udf_content_type)
def dumps(self) -> str:
return str(self)


class Datum:
"""
Class to define the important information for the event.
Args:
value: the payload of the event.
event_time: the event time of the event.
watermark: the watermark of the event.
>>> # Example usage
>>> from pynumaflow.function import Datum
>>> from datetime import datetime, timezone
>>> payload = bytes("test_mock_message", encoding="utf-8")
>>> t1 = datetime.fromtimestamp(1662998400, timezone.utc)
>>> t2 = datetime.fromtimestamp(1662998460, timezone.utc)
>>> msg_id = "test_id"
>>> d = Datum(sink_msg_id=msg_id, value=payload, event_time=t1, watermark=t2)
"""

def __init__(self, sink_msg_id: str, value: bytes, event_time: datetime, watermark: datetime):
self._id = sink_msg_id or ""
self._value = value or b""
if not isinstance(event_time, datetime):
raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time")
self._event_time = event_time
if not isinstance(watermark, datetime):
raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
self._watermark = watermark

def __str__(self):
value_string = self._value.decode("utf-8")
return f"id: {self._id}, value: {value_string}, event_time: {str(self._event_time)}, watermark: {str(self._watermark)}"

def __repr__(self):
return str(self)

@property
def id(self) -> str:
"""Returns the id of the event."""
return self._id

@property
def value(self) -> bytes:
"""Returns the value of the event."""
return self._value

@property
def event_time(self) -> datetime:
"""Returns the event time of the event."""
return self._event_time

@property
def watermark(self) -> datetime:
"""Returns the watermark of the event."""
return self._watermark
79 changes: 79 additions & 0 deletions pynumaflow/sink/_udfunction_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import timestamp_pb2 as _timestamp_pb2
from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
from typing import (
ClassVar as _ClassVar,
Mapping as _Mapping,
Optional as _Optional,
Union as _Union,
List,
)

DESCRIPTOR: _descriptor.FileDescriptor

class ReadyResponse(_message.Message):
__slots__ = ["ready"]
READY_FIELD_NUMBER: _ClassVar[int]
ready: bool
def __init__(self, ready: _Optional[bool] = ...) -> None: ...

class EventTime(_message.Message):
__slots__ = ["event_time"]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
event_time: _timestamp_pb2.Timestamp
def __init__(self, event_time: _Optional[_timestamp_pb2.Timestamp] = ...) -> None: ...

class Watermark(_message.Message):
__slots__ = ["watermark"]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
watermark: _timestamp_pb2.Timestamp
def __init__(self, watermark: _Optional[_timestamp_pb2.Timestamp] = ...) -> None: ...

class Datum(_message.Message):
__slots__ = ["key", "value", "event_time", "watermark", "id"]
KEY_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
key: str
value: bytes
id: str
event_time: _timestamp_pb2.Timestamp
watermark: _timestamp_pb2.Timestamp
def __init__(
self,
key: _Optional[str],
value: _Optional[bytes],
id: _Optional[str],
event_time: _Optional[_timestamp_pb2.Timestamp] = ...,
watermark: _Optional[_timestamp_pb2.Timestamp] = ...,
) -> None: ...

class DatumList(_message.Message):
__slots__ = ["elements"]
ELEMENTS_FIELD_NUMBER: _ClassVar[int]
elements: List[Datum]
def __init__(self, elements: _Optional[List[Datum]]) -> None: ...

class Response(_message.Message):
__slots__ = ["id", "success", "err_msg"]
ID_FIELD_NUMBER: _ClassVar[int]
SUCCESS_FIELD_NUMBER: _ClassVar[int]
ERR_MSG_FIELD_NUMBER: _ClassVar[int]
id: str
success: bool
err_msg: str
def __init__(
self,
id: _Optional[str],
success: _Optional[bool],
err_msg: _Optional[str],
) -> None: ...

class ResponseList(_message.Message):
__slots__ = ["responses"]
RESPONSES_FIELD_NUMBER: _ClassVar[int]
responses: List[Response]
def __init__(self, responses: _Optional[List[Response]]) -> None: ...
38 changes: 0 additions & 38 deletions pynumaflow/sink/decoder.py

This file was deleted.

40 changes: 0 additions & 40 deletions pynumaflow/sink/encoder.py

This file was deleted.

Empty file.
Loading

0 comments on commit 043e109

Please sign in to comment.