diff --git a/python/lib/sift_cli.py b/python/lib/sift_cli.py new file mode 100644 index 00000000..c973c1f4 --- /dev/null +++ b/python/lib/sift_cli.py @@ -0,0 +1,122 @@ +from pathlib import Path +from typing import Dict, Optional, cast + +import click +from dotenv import dotenv_values +from sift.ingestion_configs.v1.ingestion_configs_pb2 import IngestionConfig +from sift.ping.v1.ping_pb2 import PingRequest +from sift.ping.v1.ping_pb2_grpc import PingServiceStub +from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel +from sift_py.ingestion._internal.ingestion_config import ( + get_ingestion_config_by_client_key, + get_ingestion_config_flows, + list_ingestion_configs, +) + +DEFUALT_AUTH_PATH = Path.home() / ".sift" / ".auth.env" + + +@click.group() +@click.option( + "--auth_file", + default=DEFUALT_AUTH_PATH, + show_default=True, + type=click.Path(), + help="Path to the auth file.", +) +@click.pass_context +def sift_cli(ctx: click.Context, auth_file: str): + credentials: Dict[str, Optional[str]] = {} + + if auth_file: + credentials = dotenv_values(auth_file) + else: + if DEFUALT_AUTH_PATH.exists(): + credentials = dotenv_values(DEFUALT_AUTH_PATH) + else: + raise click.ClickException(f"Auth file not found at {DEFUALT_AUTH_PATH}") + + if not credentials.get("SIFT_API_KEY"): + raise click.ClickException("Auth file must contain SIFT_API_KEY") + + if not credentials.get("BASE_GRPC_URI"): + raise click.ClickException("Auth file must contain BASE_GRPC_URI") + + validated_credentials = cast(Dict[str, str], credentials) + + channel_config: SiftChannelConfig = { + "apikey": validated_credentials["SIFT_API_KEY"], + "uri": validated_credentials["BASE_GRPC_URI"], + } + + ctx.obj["channel"] = ctx.with_resource(use_sift_channel(channel_config)) + + +################ +# Ping Service # +################ + + +@sift_cli.command() +@click.pass_context +def ping(ctx): + """Test the connection to the Sift API.""" + click.echo(PingServiceStub(ctx.obj["channel"]).Ping(PingRequest())) + + +############################ +# Ingestion Config Service # +############################ + + +@sift_cli.command(name="list-ingestion-configs") +@click.option( + "--filter", + default=None, + help="A Common Expression Language (CEL) filter string.", +) +@click.pass_context +def list_ingestion_configs_command(ctx: click.Context, filter: str): + """List ingestion configs using an optional filter.""" + configs = list_ingestion_configs(ctx.obj["channel"], filter) + if configs: + click.echo("Results:\n") + for config in configs: + click.echo(config) + else: + click.echo("No results found.") + + +@sift_cli.command(name="get-ingestion-config") +@click.argument( + "client_key", +) +@click.option( + "--list_flows", + is_flag=True, + help="List flow information for the ingestion config.", +) +@click.pass_context +def get_ingestion_config_by_client_key_command( + ctx: click.Context, client_key: str, list_flows: bool +): + """Return an ingestion config by its client key.""" + result: Optional[IngestionConfig] = get_ingestion_config_by_client_key( + ctx.obj["channel"], client_key + ) + if result: + click.echo(result) + if list_flows: + for flow in get_ingestion_config_flows(ctx.obj["channel"], result.ingestion_config_id): + click.echo(flow) + + else: + click.echo("No results found.") + + +def main(): + sift_cli(obj={}) + + +if __name__ == "__main__": + main() diff --git a/python/lib/sift_py/ingestion/_internal/ingestion_config.py b/python/lib/sift_py/ingestion/_internal/ingestion_config.py index 14b3c57a..07fbbf3b 100644 --- a/python/lib/sift_py/ingestion/_internal/ingestion_config.py +++ b/python/lib/sift_py/ingestion/_internal/ingestion_config.py @@ -29,19 +29,40 @@ def get_ingestion_config_by_client_key( """ Returns `None` if no ingestion config can be matched with the provided `client_key` """ + results = list_ingestion_configs(channel, f'client_key=="{client_key}"', page_size=1) - svc = IngestionConfigServiceStub(channel) - req = ListIngestionConfigsRequest( - filter=f'client_key=="{client_key}"', - page_token="", - page_size=1, - ) - res = cast(ListIngestionConfigsResponse, svc.ListIngestionConfigs(req)) - - if len(res.ingestion_configs) == 0: + if len(results) == 0: return None else: - return res.ingestion_configs[0] + return results[0] + + +def list_ingestion_configs( + channel: SiftChannel, + filter: str, + page_size: int = 1_000, +) -> List[IngestionConfig]: + """ + Returns a list of ingestion configs that can be matched with the provided `filter`. + """ + svc = IngestionConfigServiceStub(channel) + + results: List[IngestionConfig] = [] + next_page_token = "" + while True: + req = ListIngestionConfigsRequest( + page_token=next_page_token, + filter=filter, + page_size=page_size, + ) + res = cast(ListIngestionConfigsResponse, svc.ListIngestionConfigs(req)) + results.extend(res.ingestion_configs) + + next_page_token = res.next_page_token + if not next_page_token: + break + + return results def create_ingestion_config( diff --git a/python/pyproject.toml b/python/pyproject.toml index 1010474a..46fed83d 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -47,6 +47,7 @@ Changelog = "https://github.com/sift-stack/sift/tree/main/python/CHANGELOG.md" [project.optional-dependencies] development = [ + "click", "grpcio-testing==1.13", "mypy==1.10.0", "pyright==1.1.386", @@ -177,3 +178,6 @@ exclude = [ [tool.ruff.lint] select = ["F", "W", "I", "N", "TID"] + +[project.scripts] +sift-cli = "sift_cli:main" \ No newline at end of file