diff --git a/.codecov.yml b/.codecov.yml index 9f40c590..9b15c177 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -14,3 +14,6 @@ ignore: - "pynumaflow/function/generated/udfunction_pb2.py" - "pynumaflow/function/_udfunction_pb2.pyi" - "pynumaflow/function/generated/udfunction_pb2_grpc.py" +- "pynumaflow/sink/generated/udsink_pb2.py" +- "pynumaflow/sink/_udsink_pb2.pyi" +- "pynumaflow/sink/generated/udsink_pb2_grpc.py" diff --git a/README.md b/README.md index d6fc18c8..99ffe7ff 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,7 @@ and [UDSinks](https://numaproj.github.io/numaflow/sinks/user-defined-sinks/) in ```python -from pynumaflow.function import Messages, Message, Datum -from pynumaflow.function.server import UserDefinedFunctionServicer +from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer def map_handler(key: str, datum: Datum) -> Messages: diff --git a/examples/function/forward_message/example.py b/examples/function/forward_message/example.py index 5d03d0c3..8571524d 100644 --- a/examples/function/forward_message/example.py +++ b/examples/function/forward_message/example.py @@ -1,5 +1,4 @@ -from pynumaflow.function import Messages, Message, Datum -from pynumaflow.function.server import UserDefinedFunctionServicer +from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer def map_handler(key: str, datum: Datum) -> Messages: diff --git a/examples/sink/simplesink/example.py b/examples/sink/simplesink/example.py index e26aa864..2d7f4422 100644 --- a/examples/sink/simplesink/example.py +++ b/examples/sink/simplesink/example.py @@ -1,16 +1,15 @@ from typing import List +from pynumaflow.sink import Datum, Responses, Response, UserDefinedSinkServicer -from pynumaflow.sink import Message, Responses, Response, HTTPSinkHandler - -def udsink_handler(messages: List[Message], __) -> Responses: +def udsink_handler(datums: List[Datum], __) -> Responses: responses = Responses() - for msg in messages: - print("Msg", msg) + for msg in datums: + print("User Defined Sink", msg) responses.append(Response.as_success(msg.id)) return responses if __name__ == "__main__": - handler = HTTPSinkHandler(udsink_handler) - handler.start() + grpc_server = UserDefinedSinkServicer(udsink_handler) + grpc_server.start() diff --git a/pynumaflow/_tools.py b/pynumaflow/_tools.py deleted file mode 100644 index 3776cb92..00000000 --- a/pynumaflow/_tools.py +++ /dev/null @@ -1,7 +0,0 @@ -class Singleton(type): - _instances = {} - - def __call__(cls, *args, **kwargs): - if cls not in cls._instances: - cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) - return cls._instances[cls] diff --git a/pynumaflow/exceptions.py b/pynumaflow/exceptions.py deleted file mode 100644 index 14cddfc8..00000000 --- a/pynumaflow/exceptions.py +++ /dev/null @@ -1,19 +0,0 @@ -class MarshalError(Exception): - """Exception raised for errors for unsupported UDF Content-type. - - Attributes: - udf_content_type -- the udf content type - """ - - def __init__(self, udf_content_type): - self.udf_content_type = udf_content_type - self.message = f"Unsupported UDF Content-type: {udf_content_type}" - super().__init__(self.message) - - -class MissingHeaderError(Exception): - pass - - -class InvalidContentTypeError(Exception): - pass diff --git a/pynumaflow/function/server.py b/pynumaflow/function/server.py index 552b0a8b..1338742f 100644 --- a/pynumaflow/function/server.py +++ b/pynumaflow/function/server.py @@ -14,8 +14,7 @@ FUNCTION_SOCK_PATH, DATUM_KEY, ) -from pynumaflow.function import Messages -from pynumaflow.function._dtypes import Datum +from pynumaflow.function import Messages, Datum from pynumaflow.types import NumaflowServicerContext if environ.get("PYTHONDEBUG"): @@ -36,8 +35,7 @@ class UserDefinedFunctionServicer(udfunction_pb2_grpc.UserDefinedFunctionService sock_path: Path to the UNIX Domain Socket Example invocation: - >>> from pynumaflow.function import Messages, Message, Datum - >>> from pynumaflow.function.server import UserDefinedFunctionServicer + >>> from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer >>> def map_handler(key: str, datum: Datum) -> Messages: ... val = datum.value ... _ = datum.event_time @@ -59,7 +57,7 @@ def MapFn( ) -> udfunction_pb2.DatumList: """ Applies a function to each datum element. - The camel case function name comes from the generated udfunction_pb2_grpc.py file. + The pascal case function name comes from the generated udfunction_pb2_grpc.py file. """ key = "" for metadata_key, metadata_value in context.invocation_metadata(): @@ -75,18 +73,18 @@ def MapFn( ), ) - datum_list = [] + datums = [] for msg in msgs.items(): - datum_list.append(udfunction_pb2.Datum(key=msg.key, value=msg.value)) + datums.append(udfunction_pb2.Datum(key=msg.key, value=msg.value)) - return udfunction_pb2.DatumList(elements=datum_list) + return udfunction_pb2.DatumList(elements=datums) def ReduceFn( self, request_iterator: Iterator[Datum], context: NumaflowServicerContext ) -> udfunction_pb2.DatumList: """ Applies a reduce function to a datum stream. - The camel case function name comes from the generated udfunction_pb2_grpc.py file. + The pascal case function name comes from the generated udfunction_pb2_grpc.py file. """ # TODO: implement Reduce function context.set_code(grpc.StatusCode.UNIMPLEMENTED) @@ -98,7 +96,7 @@ def IsReady( ) -> udfunction_pb2.ReadyResponse: """ IsReady is the heartbeat endpoint for gRPC. - The camel case function name comes from the generated udfunction_pb2_grpc.py file. + The pascal case function name comes from the generated udfunction_pb2_grpc.py file. """ return udfunction_pb2.ReadyResponse(ready=True) diff --git a/pynumaflow/sink/__init__.py b/pynumaflow/sink/__init__.py index 5314629b..422be7fb 100644 --- a/pynumaflow/sink/__init__.py +++ b/pynumaflow/sink/__init__.py @@ -1,4 +1,4 @@ -from pynumaflow.sink.handler import HTTPSinkHandler -from pynumaflow.sink._dtypes import Message, Response, Responses +from pynumaflow.sink._dtypes import Response, Responses, Datum +from pynumaflow.sink.server import UserDefinedSinkServicer -__all__ = ["HTTPSinkHandler", "Message", "Response", "Responses"] +__all__ = ["Response", "Responses", "Datum", "UserDefinedSinkServicer"] diff --git a/pynumaflow/sink/_dtypes.py b/pynumaflow/sink/_dtypes.py index 846b85bd..f95ba6ff 100644 --- a/pynumaflow/sink/_dtypes.py +++ b/pynumaflow/sink/_dtypes.py @@ -1,23 +1,11 @@ -import json from dataclasses import dataclass +from datetime import datetime from typing import List, TypeVar, Type, Optional -import msgpack - -from pynumaflow._constants import APPLICATION_JSON, APPLICATION_MSG_PACK -from pynumaflow.sink.encoder import NumaflowJSONEncoder, msgpack_encoding -from pynumaflow.exceptions import MarshalError - R = TypeVar("R", bound="Response") Rs = TypeVar("Rs", bound="Responses") -@dataclass -class Message: - id: str - payload: bytes - - @dataclass class Response: id: str @@ -49,9 +37,60 @@ def append(self, response: R) -> None: def items(self) -> List[R]: return self._responses - def dumps(self, udf_content_type: str) -> str: - if udf_content_type == APPLICATION_JSON: - return json.dumps(self._responses, cls=NumaflowJSONEncoder, separators=(",", ":")) - elif udf_content_type == APPLICATION_MSG_PACK: - return msgpack.dumps(self._responses, default=msgpack_encoding) - raise MarshalError(udf_content_type) + def dumps(self) -> str: + return str(self) + + +class Datum: + """ + Class to define the important information for the event. + Args: + value: the payload of the event. + event_time: the event time of the event. + watermark: the watermark of the event. + >>> # Example usage + >>> from pynumaflow.function import Datum + >>> from datetime import datetime, timezone + >>> payload = bytes("test_mock_message", encoding="utf-8") + >>> t1 = datetime.fromtimestamp(1662998400, timezone.utc) + >>> t2 = datetime.fromtimestamp(1662998460, timezone.utc) + >>> msg_id = "test_id" + >>> d = Datum(sink_msg_id=msg_id, value=payload, event_time=t1, watermark=t2) + """ + + def __init__(self, sink_msg_id: str, value: bytes, event_time: datetime, watermark: datetime): + self._id = sink_msg_id or "" + self._value = value or b"" + if not isinstance(event_time, datetime): + raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time") + self._event_time = event_time + if not isinstance(watermark, datetime): + raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark") + self._watermark = watermark + + def __str__(self): + value_string = self._value.decode("utf-8") + return f"id: {self._id}, value: {value_string}, event_time: {str(self._event_time)}, watermark: {str(self._watermark)}" + + def __repr__(self): + return str(self) + + @property + def id(self) -> str: + """Returns the id of the event.""" + return self._id + + @property + def value(self) -> bytes: + """Returns the value of the event.""" + return self._value + + @property + def event_time(self) -> datetime: + """Returns the event time of the event.""" + return self._event_time + + @property + def watermark(self) -> datetime: + """Returns the watermark of the event.""" + return self._watermark diff --git a/pynumaflow/sink/_udfunction_pb2.pyi b/pynumaflow/sink/_udfunction_pb2.pyi new file mode 100644 index 00000000..c3549c47 --- /dev/null +++ b/pynumaflow/sink/_udfunction_pb2.pyi @@ -0,0 +1,79 @@ +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import timestamp_pb2 as _timestamp_pb2 +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 +from typing import ( + ClassVar as _ClassVar, + Mapping as _Mapping, + Optional as _Optional, + Union as _Union, + List, +) + +DESCRIPTOR: _descriptor.FileDescriptor + +class ReadyResponse(_message.Message): + __slots__ = ["ready"] + READY_FIELD_NUMBER: _ClassVar[int] + ready: bool + def __init__(self, ready: _Optional[bool] = ...) -> None: ... + +class EventTime(_message.Message): + __slots__ = ["event_time"] + EVENT_TIME_FIELD_NUMBER: _ClassVar[int] + event_time: _timestamp_pb2.Timestamp + def __init__(self, event_time: _Optional[_timestamp_pb2.Timestamp] = ...) -> None: ... + +class Watermark(_message.Message): + __slots__ = ["watermark"] + WATERMARK_FIELD_NUMBER: _ClassVar[int] + watermark: _timestamp_pb2.Timestamp + def __init__(self, watermark: _Optional[_timestamp_pb2.Timestamp] = ...) -> None: ... + +class Datum(_message.Message): + __slots__ = ["key", "value", "event_time", "watermark", "id"] + KEY_FIELD_NUMBER: _ClassVar[int] + VALUE_FIELD_NUMBER: _ClassVar[int] + ID_FIELD_NUMBER: _ClassVar[int] + EVENT_TIME_FIELD_NUMBER: _ClassVar[int] + WATERMARK_FIELD_NUMBER: _ClassVar[int] + key: str + value: bytes + id: str + event_time: _timestamp_pb2.Timestamp + watermark: _timestamp_pb2.Timestamp + def __init__( + self, + key: _Optional[str], + value: _Optional[bytes], + id: _Optional[str], + event_time: _Optional[_timestamp_pb2.Timestamp] = ..., + watermark: _Optional[_timestamp_pb2.Timestamp] = ..., + ) -> None: ... + +class DatumList(_message.Message): + __slots__ = ["elements"] + ELEMENTS_FIELD_NUMBER: _ClassVar[int] + elements: List[Datum] + def __init__(self, elements: _Optional[List[Datum]]) -> None: ... + +class Response(_message.Message): + __slots__ = ["id", "success", "err_msg"] + ID_FIELD_NUMBER: _ClassVar[int] + SUCCESS_FIELD_NUMBER: _ClassVar[int] + ERR_MSG_FIELD_NUMBER: _ClassVar[int] + id: str + success: bool + err_msg: str + def __init__( + self, + id: _Optional[str], + success: _Optional[bool], + err_msg: _Optional[str], + ) -> None: ... + +class ResponseList(_message.Message): + __slots__ = ["responses"] + RESPONSES_FIELD_NUMBER: _ClassVar[int] + responses: List[Response] + def __init__(self, responses: _Optional[List[Response]]) -> None: ... diff --git a/pynumaflow/sink/decoder.py b/pynumaflow/sink/decoder.py deleted file mode 100644 index af4794cf..00000000 --- a/pynumaflow/sink/decoder.py +++ /dev/null @@ -1,38 +0,0 @@ -import base64 -import json -from typing import Dict, Any, List, Tuple - - -def msgpack_decoder(obj_pairs: List[Tuple[str, Any]]) -> Dict[str, Any]: - """ - Custom decoder for msgpack. This is needed to convert golang struct fields - which follow PascalCase to lower case Python fields. - - Args: - obj_pairs: List of object pairs for the object being unpacked - - Returns: A dictionary containing key value pairs - """ - result = {} - for key, value in obj_pairs: - result[key.lower()] = value - return result - - -class NumaflowJSONDecoder(json.JSONDecoder): - """ - Custom JSON decoder to deal with the payload field of Sink Messages object. - """ - - def __init__(self, *args, **kwargs): - json.JSONDecoder.__init__(self, object_pairs_hook=self.object_pairs_hook, *args, **kwargs) - - @staticmethod - def object_pairs_hook(elements: tuple) -> Dict[str, Any]: - obj_pairs = {} - for key, item in elements: - if key == "payload": - obj_pairs[key] = base64.b64decode(item.encode()) - else: - obj_pairs[key] = item - return obj_pairs diff --git a/pynumaflow/sink/encoder.py b/pynumaflow/sink/encoder.py deleted file mode 100644 index 4e369a31..00000000 --- a/pynumaflow/sink/encoder.py +++ /dev/null @@ -1,40 +0,0 @@ -import base64 -import json -from typing import Dict, Any - - -def msgpack_encoding(obj) -> Dict[str, Any]: - """ - Custom callable for msgpack to deal with types - involving User Defined Sinks. - - Args: - obj: Object to encode - - Returns: - Dictionary representation of the object - """ - from pynumaflow.sink import Response, Message as UDSinkMessage - - if isinstance(obj, UDSinkMessage): - obj = {"ID": obj.id, "Payload": obj.payload} - if isinstance(obj, Response): - return {"ID": obj.id, "Success": obj.success, "Err": obj.err} - return obj - - -class NumaflowJSONEncoder(json.JSONEncoder): - """ - Custom JSON encoder to deal with types involving User Defined Sinks. - """ - - def default(self, obj): - from pynumaflow.sink import Response, Message as UDSinkMessage - - if isinstance(obj, bytes): - return base64.b64encode(obj).decode("utf-8") - if isinstance(obj, Response): - return {"id": obj.id, "success": obj.success, "err": obj.err} - if isinstance(obj, UDSinkMessage): - return {"id": obj.id, "payload": obj.payload} - return json.JSONEncoder.default(self, obj) diff --git a/pynumaflow/sink/generated/__init__.py b/pynumaflow/sink/generated/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pynumaflow/sink/generated/udsink_pb2.py b/pynumaflow/sink/generated/udsink_pb2.py new file mode 100644 index 00000000..216e9142 --- /dev/null +++ b/pynumaflow/sink/generated/udsink_pb2.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: udsink.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x0cudsink.proto\x12\x07sink.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto";\n\tEventTime\x12.\n\nevent_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp":\n\tWatermark\x12-\n\twatermark\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp"~\n\x05\x44\x61tum\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12&\n\nevent_time\x18\x03 \x01(\x0b\x32\x12.sink.v1.EventTime\x12%\n\twatermark\x18\x04 \x01(\x0b\x32\x12.sink.v1.Watermark\x12\n\n\x02id\x18\x05 \x01(\t"-\n\tDatumList\x12 \n\x08\x65lements\x18\x01 \x03(\x0b\x32\x0e.sink.v1.Datum"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08"8\n\x08Response\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x0f\n\x07\x65rr_msg\x18\x03 \x01(\t"4\n\x0cResponseList\x12$\n\tresponses\x18\x01 \x03(\x0b\x32\x11.sink.v1.Response2\x81\x01\n\x0fUserDefinedSink\x12\x33\n\x06SinkFn\x12\x12.sink.v1.DatumList\x1a\x15.sink.v1.ResponseList\x12\x39\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x16.sink.v1.ReadyResponseB9Z7github.com/numaproj/numaflow-go/pkg/apis/protos/sink/v1b\x06proto3' +) + + +_EVENTTIME = DESCRIPTOR.message_types_by_name["EventTime"] +_WATERMARK = DESCRIPTOR.message_types_by_name["Watermark"] +_DATUM = DESCRIPTOR.message_types_by_name["Datum"] +_DATUMLIST = DESCRIPTOR.message_types_by_name["DatumList"] +_READYRESPONSE = DESCRIPTOR.message_types_by_name["ReadyResponse"] +_RESPONSE = DESCRIPTOR.message_types_by_name["Response"] +_RESPONSELIST = DESCRIPTOR.message_types_by_name["ResponseList"] +EventTime = _reflection.GeneratedProtocolMessageType( + "EventTime", + (_message.Message,), + { + "DESCRIPTOR": _EVENTTIME, + "__module__": "udsink_pb2" + # @@protoc_insertion_point(class_scope:sink.v1.EventTime) + }, +) +_sym_db.RegisterMessage(EventTime) + +Watermark = _reflection.GeneratedProtocolMessageType( + "Watermark", + (_message.Message,), + { + "DESCRIPTOR": _WATERMARK, + "__module__": "udsink_pb2" + # @@protoc_insertion_point(class_scope:sink.v1.Watermark) + }, +) +_sym_db.RegisterMessage(Watermark) + +Datum = _reflection.GeneratedProtocolMessageType( + "Datum", + (_message.Message,), + { + "DESCRIPTOR": _DATUM, + "__module__": "udsink_pb2" + # @@protoc_insertion_point(class_scope:sink.v1.Datum) + }, +) +_sym_db.RegisterMessage(Datum) + +DatumList = _reflection.GeneratedProtocolMessageType( + "DatumList", + (_message.Message,), + { + "DESCRIPTOR": _DATUMLIST, + "__module__": "udsink_pb2" + # @@protoc_insertion_point(class_scope:sink.v1.DatumList) + }, +) +_sym_db.RegisterMessage(DatumList) + +ReadyResponse = _reflection.GeneratedProtocolMessageType( + "ReadyResponse", + (_message.Message,), + { + "DESCRIPTOR": _READYRESPONSE, + "__module__": "udsink_pb2" + # @@protoc_insertion_point(class_scope:sink.v1.ReadyResponse) + }, +) +_sym_db.RegisterMessage(ReadyResponse) + +Response = _reflection.GeneratedProtocolMessageType( + "Response", + (_message.Message,), + { + "DESCRIPTOR": _RESPONSE, + "__module__": "udsink_pb2" + # @@protoc_insertion_point(class_scope:sink.v1.Response) + }, +) +_sym_db.RegisterMessage(Response) + +ResponseList = _reflection.GeneratedProtocolMessageType( + "ResponseList", + (_message.Message,), + { + "DESCRIPTOR": _RESPONSELIST, + "__module__": "udsink_pb2" + # @@protoc_insertion_point(class_scope:sink.v1.ResponseList) + }, +) +_sym_db.RegisterMessage(ResponseList) + +_USERDEFINEDSINK = DESCRIPTOR.services_by_name["UserDefinedSink"] +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b"Z7github.com/numaproj/numaflow-go/pkg/apis/protos/sink/v1" + _EVENTTIME._serialized_start = 87 + _EVENTTIME._serialized_end = 146 + _WATERMARK._serialized_start = 148 + _WATERMARK._serialized_end = 206 + _DATUM._serialized_start = 208 + _DATUM._serialized_end = 334 + _DATUMLIST._serialized_start = 336 + _DATUMLIST._serialized_end = 381 + _READYRESPONSE._serialized_start = 383 + _READYRESPONSE._serialized_end = 413 + _RESPONSE._serialized_start = 415 + _RESPONSE._serialized_end = 471 + _RESPONSELIST._serialized_start = 473 + _RESPONSELIST._serialized_end = 525 + _USERDEFINEDSINK._serialized_start = 528 + _USERDEFINEDSINK._serialized_end = 657 +# @@protoc_insertion_point(module_scope) diff --git a/pynumaflow/sink/generated/udsink_pb2_grpc.py b/pynumaflow/sink/generated/udsink_pb2_grpc.py new file mode 100644 index 00000000..6092a399 --- /dev/null +++ b/pynumaflow/sink/generated/udsink_pb2_grpc.py @@ -0,0 +1,125 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 +from pynumaflow.sink.generated import udsink_pb2 as udsink__pb2 + + +class UserDefinedSinkStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SinkFn = channel.unary_unary( + "/sink.v1.UserDefinedSink/SinkFn", + request_serializer=udsink__pb2.DatumList.SerializeToString, + response_deserializer=udsink__pb2.ResponseList.FromString, + ) + self.IsReady = channel.unary_unary( + "/sink.v1.UserDefinedSink/IsReady", + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=udsink__pb2.ReadyResponse.FromString, + ) + + +class UserDefinedSinkServicer(object): + """Missing associated documentation comment in .proto file.""" + + def SinkFn(self, request, context): + """SinkFn writes the Datum to a user defined sink.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def IsReady(self, request, context): + """IsReady is the heartbeat endpoint for gRPC.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_UserDefinedSinkServicer_to_server(servicer, server): + rpc_method_handlers = { + "SinkFn": grpc.unary_unary_rpc_method_handler( + servicer.SinkFn, + request_deserializer=udsink__pb2.DatumList.FromString, + response_serializer=udsink__pb2.ResponseList.SerializeToString, + ), + "IsReady": grpc.unary_unary_rpc_method_handler( + servicer.IsReady, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=udsink__pb2.ReadyResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "sink.v1.UserDefinedSink", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class UserDefinedSink(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def SinkFn( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/sink.v1.UserDefinedSink/SinkFn", + udsink__pb2.DatumList.SerializeToString, + udsink__pb2.ResponseList.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def IsReady( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/sink.v1.UserDefinedSink/IsReady", + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + udsink__pb2.ReadyResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/pynumaflow/sink/handler.py b/pynumaflow/sink/handler.py deleted file mode 100644 index dd683ec4..00000000 --- a/pynumaflow/sink/handler.py +++ /dev/null @@ -1,151 +0,0 @@ -import asyncio -import json -import logging -import os -from concurrent.futures import ThreadPoolExecutor -from typing import Callable, Any, Tuple, List - -import msgpack -from aiohttp import web - -from pynumaflow._constants import ( - SINK_SOCK_PATH, - APPLICATION_JSON, - APPLICATION_MSG_PACK, -) -from pynumaflow._tools import Singleton -from pynumaflow.sink.decoder import NumaflowJSONDecoder, msgpack_decoder -from pynumaflow.exceptions import InvalidContentTypeError -from pynumaflow.sink._dtypes import Responses, Message - -UDSinkCallable = Callable[[List[Message], Any], Responses] -_LOGGER = logging.getLogger(__name__) - - -class HTTPSinkHandler(metaclass=Singleton): - """ - Provides an interface to write a User Defined Sink (UDSink) - which will be exposed over HTTP. - - Args: - handler: Function callable following the type signature of UDSinkCallable - sock_path: Path to the UNIX Domain Socket - max_workers: Max number of threads to be spawned - - Example invocation: - >>> from pynumaflow.sink import Responses, Response, Message - >>> # mysinkfunc is a callable following the UDSinkCallable signature - >>> def mysinkfunc(messages: List[Message], __) -> Responses: - ... responses = Responses() - ... for msg in messages: - ... responses.append(Response.as_success(msg.id)) - ... return responses - ... - >>> http_handler = HTTPSinkHandler(mysinkfunc, sock_path="/tmp/uds.sock") - >>> http_handler.start() - """ - - app = web.Application() - - def __init__( - self, handler: UDSinkCallable, sock_path: str = SINK_SOCK_PATH, max_workers: int = 50 - ): - self.__handler = handler - self.sock_path = sock_path - self.max_workers = max_workers - self.executor = ThreadPoolExecutor(max_workers=max_workers) - - self.app.add_routes( - [ - web.get("/ready", self.ready_handler), - web.post("/messages", self.messages_handler), - ] - ) - self.app.on_shutdown.append(self.__on_shutdown) - - @staticmethod - async def ready_handler(*_) -> web.Response: - """ - End point for readiness check. - - Returns: - An aiohttp response object - """ - _LOGGER.info("READY") - return web.Response(status=204) - - @staticmethod - def decode(data: bytes, content_type: str) -> List[Message]: - """ - Decode the bytes array into a list of Sink Message instances. - Args: - data: encoded bytes data of Message instances - content_type: type of content used in the HTTP request - - Returns: - List of Sink Message objects. - - Raises: - InvalidContentTypeError: if the content type is not one of json or msgpack - """ - if content_type == APPLICATION_JSON: - messages = json.loads(data.decode("utf-8"), cls=NumaflowJSONDecoder) - elif content_type == APPLICATION_MSG_PACK: - messages = msgpack.unpackb(data, object_pairs_hook=msgpack_decoder) - else: - raise InvalidContentTypeError(f"Invalid Content-Type given: {content_type}") - return [Message(**params) for params in messages] - - async def messages_handler(self, request: web.Request) -> web.Response: - """ - End point for passing data through the User Defined Sink callable. - Args: - request: aiohttp Request object - - Returns: - aiohttp Response object - """ - content_type = request.headers.get("Content-Type") - try: - data: bytes = await request.content.read() - messages = self.decode(data, content_type) - - loop = asyncio.get_running_loop() - responses: Responses = await loop.run_in_executor( - self.executor, self.__handler, messages, {} - ) - - out_msgs = responses.dumps(content_type) - return web.Response(body=out_msgs, status=200, content_type=content_type) - - except Exception as err: - exception_type = type(err).__name__ - error_msg = f"Got an unexpected exception: {exception_type}, {repr(err)}" - _LOGGER.exception(error_msg) - return web.Response(body=error_msg, status=500) - - async def __on_shutdown(self, *_) -> None: - _LOGGER.info("SDK Server shutting down.") - self.executor.shutdown() - - async def __start_app(self) -> Tuple[web.AppRunner, web.UnixSite]: - runner = web.AppRunner(self.app) - await runner.setup() - site = web.UnixSite(runner, self.sock_path) - await site.start() - _LOGGER.info("Server starting at socket: %s with pid %s", site.name, os.getpid()) - return runner, site - - def start(self) -> None: - """ - Starts the server on the given UNIX socket. - """ - loop = asyncio.get_event_loop() - runner, site = loop.run_until_complete(self.__start_app()) - - _LOGGER.info("Running ThreadPool with max workers: %s", self.max_workers) - - try: - loop.run_forever() - except KeyboardInterrupt: - loop.run_until_complete(runner.cleanup()) diff --git a/pynumaflow/sink/server.py b/pynumaflow/sink/server.py new file mode 100644 index 00000000..935a603d --- /dev/null +++ b/pynumaflow/sink/server.py @@ -0,0 +1,105 @@ +import asyncio +import logging +from os import environ + + +from google.protobuf import empty_pb2 as _empty_pb2 + +import grpc +from typing import Callable, Any, Iterator, List + +from pynumaflow._constants import ( + SINK_SOCK_PATH, + DATUM_KEY, +) +from pynumaflow.sink import Response, Responses, Datum +from pynumaflow.sink.generated import udsink_pb2_grpc, udsink_pb2 +from pynumaflow.types import NumaflowServicerContext + +if environ.get("PYTHONDEBUG"): + logging.basicConfig(level=logging.DEBUG) + +_LOGGER = logging.getLogger(__name__) + +UDSinkCallable = Callable[[List[Datum], Any], Responses] + + +class UserDefinedSinkServicer(udsink_pb2_grpc.UserDefinedSinkServicer): + """ + Provides an interface to write a User Defined Sink (UDSink) + which will be exposed over gRPC. + + Args: + sink_handler: Function callable following the type signature of UDSinkCallable + sock_path: Path to the UNIX Domain Socket + + Example invocation: + >>> from typing import List + >>> from pynumaflow.sink import Datum, Responses, Response, UserDefinedSinkServicer + >>> def udsink_handler(datums: List[Datum]) -> Responses: + ... responses = Responses() + ... for msg in datums: + ... print("User Defined Sink", msg) + ... responses.append(Response.as_success(msg.id)) + ... return responses + >>> grpc_server = UserDefinedSinkServicer(udsink_handler) + >>> grpc_server.start() + """ + + def __init__(self, sink_handler: UDSinkCallable, sock_path=SINK_SOCK_PATH): + self.__sink_handler: UDSinkCallable = sink_handler + self.sock_path = sock_path + self._cleanup_coroutines = [] + + def SinkFn( + self, request: udsink_pb2.DatumList, context: NumaflowServicerContext + ) -> udsink_pb2.ResponseList: + """ + Applies a sink function to a list of datum elements. + The pascal case function name comes from the generated udsink_pb2_grpc.py file. + """ + + msgs = self.__sink_handler(request.elements) + + responses = [] + for msg in msgs.items(): + responses.append(udsink_pb2.Response(id=msg.id, success=msg.success, err_msg=msg.err)) + + return udsink_pb2.ResponseList(responses=responses) + + def IsReady( + self, request: _empty_pb2.Empty, context: NumaflowServicerContext + ) -> udsink_pb2.ReadyResponse: + """ + IsReady is the heartbeat endpoint for gRPC. + The pascal case function name comes from the generated udsink_pb2_grpc.py file. + """ + return udsink_pb2.ReadyResponse(ready=True) + + async def __serve(self) -> None: + server = grpc.aio.server() + udsink_pb2_grpc.add_UserDefinedSinkServicer_to_server( + UserDefinedSinkServicer(self.__sink_handler), server + ) + server.add_insecure_port(self.sock_path) + _LOGGER.info("Server listening on: %s", self.sock_path) + await server.start() + + async def server_graceful_shutdown(): + logging.info("Starting graceful shutdown...") + # Shuts down the server with 5 seconds of grace period. During the + # grace period, the server won't accept new connections and allow + # existing RPCs to continue within the grace period. + await server.stop(5) + + self._cleanup_coroutines.append(server_graceful_shutdown()) + await server.wait_for_termination() + + def start(self) -> None: + """Starts the server on the given UNIX socket.""" + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(self.__serve()) + finally: + loop.run_until_complete(*self._cleanup_coroutines) + loop.close() diff --git a/pynumaflow/tests/function/test_messages.py b/pynumaflow/tests/function/test_messages.py index ecb219f9..e586aa32 100644 --- a/pynumaflow/tests/function/test_messages.py +++ b/pynumaflow/tests/function/test_messages.py @@ -63,6 +63,11 @@ def test_items(self): self.assertEqual(len(mock_obj), len(msgs.items())) self.assertEqual(mock_obj[0]["Key"], msgs.items()[0]["Key"]) self.assertEqual(mock_obj[0]["Value"], msgs.items()[0]["Value"]) + self.assertEqual( + "[{'Key': b'U+005C__ALL__', 'Value': b'test_mock_message'}, " + "{'Key': b'U+005C__ALL__', 'Value': b'test_mock_message'}]", + repr(msgs), + ) def test_append(self): msgs = Messages() @@ -102,6 +107,11 @@ def test_dump(self): msgs.dumps(), ) + def test_load(self): + # to improve codecov + msgs = Messages() + msgs.loads() + if __name__ == "__main__": unittest.main() diff --git a/pynumaflow/tests/sink/test_datatypes.py b/pynumaflow/tests/sink/test_datatypes.py new file mode 100644 index 00000000..27ba32bd --- /dev/null +++ b/pynumaflow/tests/sink/test_datatypes.py @@ -0,0 +1,99 @@ +import unittest +from datetime import datetime, timezone +from google.protobuf import timestamp_pb2 as _timestamp_pb2 + + +from pynumaflow.sink._dtypes import ( + Datum, +) + + +def mock_message(): + msg = bytes("test_mock_message", encoding="utf-8") + return msg + + +def mock_event_time(): + t = datetime.fromtimestamp(1662998400, timezone.utc) + return t + + +def mock_watermark(): + t = datetime.fromtimestamp(1662998460, timezone.utc) + return t + + +class TestDatum(unittest.TestCase): + def test_err_event_time(self): + ts = _timestamp_pb2.Timestamp() + ts.GetCurrentTime() + with self.assertRaises(Exception) as context: + Datum(sink_msg_id="test_id_0", value=mock_message(), event_time=ts, watermark=ts) + self.assertEqual( + "Wrong data type: for Datum.event_time", + str(context.exception), + ) + + def test_err_watermark(self): + ts = _timestamp_pb2.Timestamp() + ts.GetCurrentTime() + with self.assertRaises(Exception) as context: + Datum( + sink_msg_id="test_id_0", + value=mock_message(), + event_time=mock_event_time(), + watermark=ts, + ) + self.assertEqual( + "Wrong data type: for Datum.watermark", + str(context.exception), + ) + + def test_value(self): + d = Datum( + sink_msg_id="test_id_0", + value=mock_message(), + event_time=mock_event_time(), + watermark=mock_watermark(), + ) + self.assertEqual(mock_message(), d.value) + self.assertEqual( + "id: test_id_0, value: test_mock_message, " + "event_time: 2022-09-12 16:00:00+00:00, watermark: 2022-09-12 16:01:00+00:00", + str(d), + ) + self.assertEqual( + "id: test_id_0, value: test_mock_message, event_time: 2022-09-12 16:00:00+00:00, watermark: 2022-09-12 16:01:00+00:00", + repr(d), + ) + + def test_id(self): + d = Datum( + sink_msg_id="test_id_0", + value=mock_message(), + event_time=mock_event_time(), + watermark=mock_watermark(), + ) + self.assertEqual("test_id_0", d.id) + + def test_event_time(self): + d = Datum( + sink_msg_id="test_id_0", + value=mock_message(), + event_time=mock_event_time(), + watermark=mock_watermark(), + ) + self.assertEqual(mock_event_time(), d.event_time) + + def test_watermark(self): + d = Datum( + sink_msg_id="test_id_0", + value=mock_message(), + event_time=mock_event_time(), + watermark=mock_watermark(), + ) + self.assertEqual(mock_watermark(), d.watermark) + + +if __name__ == "__main__": + unittest.main() diff --git a/pynumaflow/tests/sink/test_handler.py b/pynumaflow/tests/sink/test_handler.py deleted file mode 100644 index e4fef200..00000000 --- a/pynumaflow/tests/sink/test_handler.py +++ /dev/null @@ -1,84 +0,0 @@ -import json -import tempfile -import unittest -from typing import List - -import msgpack -from aiohttp.test_utils import AioHTTPTestCase - -from pynumaflow._constants import APPLICATION_JSON, APPLICATION_MSG_PACK -from pynumaflow.sink.decoder import msgpack_decoder -from pynumaflow.sink.encoder import msgpack_encoding, NumaflowJSONEncoder -from pynumaflow.sink import HTTPSinkHandler, Responses, Response, Message - - -def udsink_handler(messages: List[Message], __) -> Responses: - responses = Responses() - for msg in messages: - responses.append(Response.as_success(msg.id)) - return responses - - -class TestHTTPSinkHandler(AioHTTPTestCase): - async def tearDownAsync(self) -> None: - await self.handler.app.shutdown() - - async def get_application(self): - with tempfile.TemporaryDirectory() as tmp_dir: - self.handler = HTTPSinkHandler(udsink_handler, sock_path=tmp_dir) - return self.handler.app - - async def test_handler_routes(self): - async with self.client.request("GET", "/ready") as resp: - self.assertEqual(resp.status, 204) - - dummy_data = [ - Message(id="1993", payload=b"random 1"), - Message(id="1994", payload=b"random 2"), - ] - - # with msgpack content - msgpack_data = msgpack.dumps(dummy_data, default=msgpack_encoding) - async with self.client.request( - "POST", "/messages", data=msgpack_data, headers={"Content-Type": APPLICATION_MSG_PACK} - ) as resp: - self.assertEqual(resp.status, 200) - binary_content = await resp.read() - self.assertListEqual( - [ - {"id": "1993", "success": True, "err": None}, - {"id": "1994", "success": True, "err": None}, - ], - msgpack.unpackb(binary_content, object_pairs_hook=msgpack_decoder), - ) - - # with json content - json_data = json.dumps(dummy_data, cls=NumaflowJSONEncoder, separators=(",", ":")) - async with self.client.request( - "POST", "/messages", data=json_data, headers={"Content-Type": APPLICATION_JSON} - ) as resp: - self.assertEqual(resp.status, 200) - text = await resp.text() - self.assertListEqual( - [ - {"id": "1993", "success": True, "err": None}, - {"id": "1994", "success": True, "err": None}, - ], - json.loads(text), - ) - - # without header - json_data = json.dumps(dummy_data, cls=NumaflowJSONEncoder, separators=(",", ":")) - async with self.client.request("POST", "/messages", data=json_data) as resp: - self.assertEqual(resp.status, 500) - - -class TestHTTPSinkHandlerSingleton(unittest.TestCase): - def test_singleton(self): - handler_1 = HTTPSinkHandler(udsink_handler) - handler_2 = HTTPSinkHandler(lambda x: x + 1) - self.assertEqual(id(handler_1), id(handler_2)) - - -if __name__ == "__main__": - unittest.main() diff --git a/pynumaflow/tests/sink/test_responses.py b/pynumaflow/tests/sink/test_responses.py index 8e4126b3..d392da42 100644 --- a/pynumaflow/tests/sink/test_responses.py +++ b/pynumaflow/tests/sink/test_responses.py @@ -1,7 +1,4 @@ import unittest - -from pynumaflow._constants import APPLICATION_JSON, APPLICATION_MSG_PACK -from pynumaflow.exceptions import MarshalError from pynumaflow.sink import Response, Responses @@ -24,18 +21,20 @@ def setUp(self) -> None: def test_responses(self): self.resps.append(Response.as_success("4")) self.assertEqual(3, len(self.resps.items())) + self.assertEqual( + "[Response(id='2', success=True, err=None), " + "Response(id='3', success=False, err='RuntimeError encountered!'), " + "Response(id='4', success=True, err=None)]", + repr(self.resps), + ) - def test_dumps_json(self): - resps_json = self.resps.dumps(APPLICATION_JSON) - self.assertTrue(resps_json) - - def test_dumps_msgpack(self): - resps_msgpack = self.resps.dumps(APPLICATION_MSG_PACK) - self.assertTrue(resps_msgpack) - - def test_dumps_err(self): - with self.assertRaises(MarshalError): - self.resps.dumps("random_content_type") + def test_dumps(self): + dump_str = self.resps.dumps() + self.assertEqual( + "[Response(id='2', success=True, err=None), " + "Response(id='3', success=False, err='RuntimeError encountered!')]", + dump_str, + ) if __name__ == "__main__": diff --git a/pynumaflow/tests/sink/test_server.py b/pynumaflow/tests/sink/test_server.py new file mode 100644 index 00000000..b3581039 --- /dev/null +++ b/pynumaflow/tests/sink/test_server.py @@ -0,0 +1,114 @@ +import tempfile +import unittest +from datetime import datetime, timezone +from typing import List + +from grpc import StatusCode +from grpc_testing import server_from_dictionary, strict_real_time +from google.protobuf import timestamp_pb2 as _timestamp_pb2 +from google.protobuf import empty_pb2 as _empty_pb2 + + +from pynumaflow._constants import DATUM_KEY +from pynumaflow.sink import Responses, Datum, Response, UserDefinedSinkServicer +from pynumaflow.sink.generated import udsink_pb2 + + +def udsink_handler(datums: List[Datum]) -> Responses: + responses = Responses() + for msg in datums: + if "err" in msg.value.decode("utf-8"): + responses.append(Response.as_failure(msg.id, "mock sink message error")) + else: + responses.append(Response.as_success(msg.id)) + return responses + + +def mock_message(): + msg = bytes("test_mock_message", encoding="utf-8") + return msg + + +def mock_err_message(): + msg = bytes("test_mock_err_message", encoding="utf-8") + return msg + + +def mock_event_time(): + t = datetime.fromtimestamp(1662998400, timezone.utc) + return t + + +def mock_watermark(): + t = datetime.fromtimestamp(1662998460, timezone.utc) + return t + + +class TestServer(unittest.TestCase): + def __init__(self, method_name) -> None: + super().__init__(method_name) + + my_servicer = UserDefinedSinkServicer(udsink_handler) + services = {udsink_pb2.DESCRIPTOR.services_by_name["UserDefinedSink"]: my_servicer} + self.test_server = server_from_dictionary(services, strict_real_time()) + + def test_is_ready(self): + method = self.test_server.invoke_unary_unary( + method_descriptor=( + udsink_pb2.DESCRIPTOR.services_by_name["UserDefinedSink"].methods_by_name["IsReady"] + ), + invocation_metadata={}, + request=_empty_pb2.Empty(), + timeout=1, + ) + + response, metadata, code, details = method.termination() + expected = udsink_pb2.ReadyResponse(ready=True) + self.assertEqual(expected, response) + self.assertEqual(code, StatusCode.OK) + + def test_forward_message(self): + event_time_timestamp = _timestamp_pb2.Timestamp() + event_time_timestamp.FromDatetime(dt=mock_event_time()) + watermark_timestamp = _timestamp_pb2.Timestamp() + watermark_timestamp.FromDatetime(dt=mock_watermark()) + + test_datums = [ + udsink_pb2.Datum( + id="test_id_0", + value=mock_message(), + event_time=udsink_pb2.EventTime(event_time=event_time_timestamp), + watermark=udsink_pb2.Watermark(watermark=watermark_timestamp), + ), + udsink_pb2.Datum( + id="test_id_1", + value=mock_err_message(), + event_time=udsink_pb2.EventTime(event_time=event_time_timestamp), + watermark=udsink_pb2.Watermark(watermark=watermark_timestamp), + ), + ] + + request = udsink_pb2.DatumList(elements=test_datums) + + method = self.test_server.invoke_unary_unary( + method_descriptor=( + udsink_pb2.DESCRIPTOR.services_by_name["UserDefinedSink"].methods_by_name["SinkFn"] + ), + invocation_metadata={}, + request=request, + timeout=1, + ) + + response, metadata, code, details = method.termination() + self.assertEqual(2, len(response.responses)) + self.assertEqual("test_id_0", response.responses[0].id) + self.assertEqual("test_id_1", response.responses[1].id) + self.assertTrue(response.responses[0].success) + self.assertFalse(response.responses[1].success) + self.assertEqual("", response.responses[0].err_msg) + self.assertEqual("mock sink message error", response.responses[1].err_msg) + self.assertEqual(code, StatusCode.OK) + + +if __name__ == "__main__": + unittest.main() diff --git a/pyproject.toml b/pyproject.toml index 9f585621..f24f0fc7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pynumaflow" -version = "0.2.0" +version = "0.2.1" description = "Provides the interfaces of writing Python User Defined Functions and Sinks for NumaFlow." authors = ["NumaFlow Developers"] readme = "README.md"