Skip to content

Commit

Permalink
Python Ingestion Service (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
solidiquis authored Jun 14, 2024
1 parent 4e9f25c commit d6993b0
Show file tree
Hide file tree
Showing 156 changed files with 1,097 additions and 14 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ examples/python/annotations/build/**/*
examples/python/annotations/dist/**/*
examples/rust/annotations/target/**/*
examples/rust/ingestion_with_config/target/**/*

rust/target/**/*
rust/protos/**/*

go/protos/**/*
python/protos/**/*
.DS_Store

python/build
*.egg-info/
2 changes: 2 additions & 0 deletions makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
SHELL := /bin/zsh

gen:
bash scripts/gen.sh

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
5 changes: 5 additions & 0 deletions python/lib/sift_internal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
INTERNAL MODULE
This module is for internal use only.
"""
Empty file.
40 changes: 40 additions & 0 deletions python/lib/sift_internal/convert/protobuf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from abc import ABC, abstractmethod
from google.protobuf.message import Message
from typing import cast, Type, TypeVar

ProtobufMessage = Message


class AsProtobuf(ABC):
"""
Flexible abstract class to create classes that can be converted into different protobuf
targets. All conversion logic goes inside of `as_pb` and should do a runtime check for
class name, module name, and/or things of the like using the `klass` argument available
to generate the appropriate protobuf target.
The `as_pb` method should rarely be used directly since it returns the super-type. Prefer
to use the `cast_pb` function to convert sub-types of `AsProtobuf` to the concrete protobuf type.
"""

@abstractmethod
def as_pb(self, klass: Type[Message]) -> Message:
pass


T = TypeVar("T", bound=ProtobufMessage)


def try_convert_pb(val: AsProtobuf, target_type: Type[T]) -> T:
"""
Utility to convert sub-types of `AsProtobuf` to its concrete protobuf type.
Will raise a `TypeError` if the underlying type of `val` is not `target_type`.
"""

pb_value = val.as_pb(target_type)

if isinstance(pb_value, target_type):
return cast(target_type, pb_value)
else:
raise TypeError(
f"Expected `val` to be a '{target_type.__module__}.{target_type.__name__}' but it is a '{val.__module__}.{val.__class__.__name__}'."
)
15 changes: 15 additions & 0 deletions python/lib/sift_internal/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Any, Optional, Type, TypeVar

T = TypeVar("T")
def any_as(val: Any, target_type: Type[T]) -> Optional[T]:
"""
Tries to cast `val` into `target_type` otherwise returns `None`.
"""

if val is None:
return None

if isinstance(val, target_type):
return val
else:
return None
Empty file added python/lib/sift_py/__init__.py
Empty file.
Empty file.
43 changes: 43 additions & 0 deletions python/lib/sift_py/grpc/interceptors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""
Internal Module: This module contains implementation details and is not intended for external use.
This module is concerned with defining interceptors for unary and streaming RPCs. Any sub-class
of `ClientInterceptor` will be invoked for all types of RPCs: unary-unary, unary-stream, stream-unary,
and stream-stream. To create interceptors for particular kinds of RPCs you'll need to create a sub-class
for the particular types of interceptors found in the base `grpc` module.
"""

from __future__ import annotations
from grpc_interceptor import ClientInterceptor, ClientCallDetails
from typing import Any, Callable, cast, List, Tuple

import grpc

Metadata = List[Tuple[str, str]]


class MetadataInterceptor(ClientInterceptor):
"""
Interceptor to add metadata to all unary and streaming RPCs
"""

def __init__(self, metadata: Metadata):
self.metadata = metadata

def intercept(
self,
method: Callable,
request_or_iterator: Any,
call_details: grpc.ClientCallDetails,
):
call_details = cast(ClientCallDetails, call_details)
new_details = ClientCallDetails(
call_details.method,
call_details.timeout,
self.metadata,
call_details.credentials,
call_details.wait_for_ready,
call_details.compression,
)

return method(request_or_iterator, new_details)
67 changes: 67 additions & 0 deletions python/lib/sift_py/grpc/transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
This module is concerned with creating a gRPC transport channel specifically for
interacting with Sift's gRPC API. the `use_sift_channel` method creates said channel
and should generally be used within a with-block for correct resource management.
"""

from __future__ import annotations
from grpc_interceptor import ClientInterceptor
from .interceptors import Metadata, MetadataInterceptor
from typing import List, TypedDict

import grpc

SiftChannel = grpc.Channel


def use_sift_channel(config: SiftChannelConfig) -> SiftChannel:
"""
Returns an intercepted channel that is meant to be used across all services that
make RPCs to Sift's API. It is highly encouraged to use this within a with-block
for correct resouce clean-up as to ensure no long-lived idle channels.
"""
credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(config["uri"], credentials)
interceptors = _compute_sift_interceptors(config)
return grpc.intercept_channel(channel, *interceptors)


def use_insecure_sift_channel(config: SiftChannelConfig) -> SiftChannel:
"""
FOR DEVELOPMENT PURPOSES ONLY
"""
channel = grpc.insecure_channel(config["uri"])
interceptors = _compute_sift_interceptors(config)
return grpc.intercept_channel(channel, *interceptors)


def _compute_sift_interceptors(config: SiftChannelConfig) -> List[ClientInterceptor]:
"""
Initialized all interceptors here.
"""
return [
_metadata_interceptor(config),
]


def _metadata_interceptor(config: SiftChannelConfig) -> ClientInterceptor:
"""
Any new metadata goes here.
"""
metadata: Metadata = [
("authorization", f"Bearer {config["apikey"]}"),
]
return MetadataInterceptor(metadata)


class SiftChannelConfig(TypedDict):
"""
Config class used to instantiate a `SiftChannel` via `use_sift_channel`.
Attributes:
uri: The URI of Sift's gRPC API. The scheme portion of the URI i.e. `https://` should be ommitted.
apikey: User-generated API key generated via the Sift application.
"""

uri: str
apikey: str
Empty file.
Loading

0 comments on commit d6993b0

Please sign in to comment.