From 43d1ec94794d4aefd852dda31b62faf385e20dfb Mon Sep 17 00:00:00 2001 From: Benji Nguyen <45523555+solidiquis@users.noreply.github.com> Date: Fri, 23 Aug 2024 13:42:19 -0700 Subject: [PATCH] python(feature): support keepalive (#86) --- python/lib/sift_py/grpc/keepalive.py | 34 ++++++++++++++++++++++++++++ python/lib/sift_py/grpc/transport.py | 32 ++++++++++++++++++++++---- 2 files changed, 61 insertions(+), 5 deletions(-) create mode 100644 python/lib/sift_py/grpc/keepalive.py diff --git a/python/lib/sift_py/grpc/keepalive.py b/python/lib/sift_py/grpc/keepalive.py new file mode 100644 index 00000000..42cef821 --- /dev/null +++ b/python/lib/sift_py/grpc/keepalive.py @@ -0,0 +1,34 @@ +from typing import TypedDict + +DEFAULT_KEEPALIVE_TIME_MS = 20_000 +"""Interval with which to send keepalive pings""" + +DEFAULT_KEEPALIVE_TIMEOUT_MS = 60_000 +"""Timeout while waiting for server to acknowledge keepalive ping""" + +DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS = 0 +"""Disabled""" + +DEFAULT_MAX_PINGS_WITHOUT_DATA = 0 +"""Disabled""" + + +# https://github.com/grpc/grpc/blob/master/doc/keepalive.md +class KeepaliveConfig(TypedDict): + """ + Make make this public in the future to allow folks to configure their own keepalive settings + if there is demand for it. + """ + + keepalive_time_ms: int + keepalive_timeout_ms: int + keepalive_permit_without_calls: int + max_pings_without_data: int + + +DEFAULT_KEEPALIVE_CONFIG: KeepaliveConfig = { + "keepalive_time_ms": DEFAULT_KEEPALIVE_TIME_MS, + "keepalive_timeout_ms": DEFAULT_KEEPALIVE_TIMEOUT_MS, + "keepalive_permit_without_calls": DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS, + "max_pings_without_data": DEFAULT_MAX_PINGS_WITHOUT_DATA, +} diff --git a/python/lib/sift_py/grpc/transport.py b/python/lib/sift_py/grpc/transport.py index 1ea5a5cf..2aa51a24 100644 --- a/python/lib/sift_py/grpc/transport.py +++ b/python/lib/sift_py/grpc/transport.py @@ -6,7 +6,7 @@ from __future__ import annotations -from typing import Any, List, Tuple, TypedDict +from typing import Any, List, Optional, Tuple, TypedDict, Union from urllib.parse import ParseResult, urlparse import grpc @@ -18,6 +18,7 @@ from sift_py.grpc._interceptors.base import ClientInterceptor from sift_py.grpc._interceptors.metadata import Metadata, MetadataInterceptor from sift_py.grpc._retry import RetryPolicy +from sift_py.grpc.keepalive import DEFAULT_KEEPALIVE_CONFIG, KeepaliveConfig SiftChannel: TypeAlias = grpc.Channel SiftAsyncChannel: TypeAlias = grpc_aio.Channel @@ -39,7 +40,7 @@ def use_sift_channel(config: SiftChannelConfig) -> SiftChannel: return _use_insecure_sift_channel(config) credentials = grpc.ssl_channel_credentials() - options = _compute_channel_options() + options = _compute_channel_options(config) api_uri = _clean_uri(config["uri"], use_ssl) channel = grpc.secure_channel(api_uri, credentials, options) interceptors = _compute_sift_interceptors(config) @@ -59,7 +60,7 @@ def use_sift_async_channel(config: SiftChannelConfig) -> SiftAsyncChannel: return grpc_aio.secure_channel( target=_clean_uri(config["uri"], use_ssl), credentials=grpc.ssl_channel_credentials(), - options=_compute_channel_options(), + options=_compute_channel_options(config), interceptors=_compute_sift_async_interceptors(config), ) @@ -101,11 +102,28 @@ def _compute_sift_async_interceptors(config: SiftChannelConfig) -> List[grpc_aio ] -def _compute_channel_options() -> List[Tuple[str, Any]]: +def _compute_channel_options(opts: Optional[SiftChannelConfig] = None) -> List[Tuple[str, Any]]: """ Initialize all [channel options](https://github.com/grpc/grpc/blob/v1.64.x/include/grpc/impl/channel_arg_names.h) here. """ - return [("grpc.enable_retries", 1), ("grpc.service_config", RetryPolicy.default().as_json())] + + options = [("grpc.enable_retries", 1), ("grpc.service_config", RetryPolicy.default().as_json())] + + if opts is None: + return options + + if keepalive := opts.get("enable_keepalive"): + config = DEFAULT_KEEPALIVE_CONFIG if isinstance(keepalive, bool) else keepalive + options.extend( + [ + ("grpc.keepalive_time_ms", config["keepalive_time_ms"]), + ("grpc.keepalive_timeout_ms", config["keepalive_timeout_ms"]), + ("grpc.http2.max_pings_without_data", config["max_pings_without_data"]), + ("grpc.keepalive_permit_without_calls", config["keepalive_permit_without_calls"]), + ] + ) + + return options def _metadata_interceptor(config: SiftChannelConfig) -> ClientInterceptor: @@ -150,9 +168,13 @@ class SiftChannelConfig(TypedDict): Config class used to instantiate a `SiftChannel` via `use_sift_channel`. - `uri`: The URI of Sift's gRPC API. The scheme portion of the URI i.e. `https://` should be ommitted. - `apikey`: User-generated API key generated via the Sift application. + - `enable_keepalive`: Enable HTTP/2 PING-based keepalive to allow long-lived connections with idle long periods. If + set to `True`, it will use the default values configured in `sift_py.grpc.keepalive` to configure keepalive. A custom + `sift_py.grpc.keepalive.KeepaliveConfig` may also be provided. Default disabled. - `use_ssl`: INTERNAL USE. Meant to be used for local development. """ uri: str apikey: str + enable_keepalive: NotRequired[Union[bool, KeepaliveConfig]] use_ssl: NotRequired[bool]