From f53427d7676a90b8a4eddb1fe81c403a1d30d8b0 Mon Sep 17 00:00:00 2001 From: Marc Vilanova <39573146+mvilanova@users.noreply.github.com> Date: Thu, 3 Oct 2024 11:42:09 -0700 Subject: [PATCH] Adds a command to perform signals testing to the CLI (#5285) * Adds a command to perform signals testing to the CLI * Adds a command to perform signals testing to the CLI * Update cli.py --- src/dispatch/cli.py | 128 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 124 insertions(+), 4 deletions(-) diff --git a/src/dispatch/cli.py b/src/dispatch/cli.py index 9fe023421546..753020726107 100644 --- a/src/dispatch/cli.py +++ b/src/dispatch/cli.py @@ -5,6 +5,7 @@ import uvicorn from dispatch import __version__, config +from dispatch.config import DISPATCH_UI_URL from dispatch.enums import UserRoles from dispatch.plugin.models import PluginInstance @@ -636,6 +637,7 @@ def revision_database( def dispatch_scheduler(): """Container for all dispatch scheduler commands.""" # we need scheduled tasks to be imported + from .case.scheduled import case_close_reminder, case_triage_reminder # noqa from .case_cost.scheduled import ( calculate_cases_response_cost, # noqa ) @@ -656,7 +658,6 @@ def dispatch_scheduler(): ) from .term.scheduled import sync_terms # noqa from .workflow.scheduled import sync_workflows # noqa - from .case.scheduled import case_triage_reminder, case_close_reminder # noqa @dispatch_scheduler.command("list") @@ -806,10 +807,10 @@ def consume_signals(): None """ from dispatch.common.utils.cli import install_plugins - from dispatch.project import service as project_service - from dispatch.plugin import service as plugin_service + from dispatch.database.core import get_organization_session, get_session from dispatch.organization.service import get_all as get_all_organizations - from dispatch.database.core import get_session, get_organization_session + from dispatch.plugin import service as plugin_service + from dispatch.project import service as project_service install_plugins() @@ -883,6 +884,125 @@ def process_signals(): db_session.close() +@signals_group.command("perf-test") +@click.option("--num-instances", default=1, help="Number of signal instances to send.") +@click.option("--num-workers", default=1, help="Number of threads to use.") +@click.option( + "--api-endpoint", + default=f"{DISPATCH_UI_URL}/api/v1/default/signals/instances", + required=True, + help="API endpoint to send the signal instances to.", +) +@click.option( + "--api-token", + required=True, + help="API token to use.", +) +@click.option( + "--project", + default="Test", + required=True, + help="The Dispatch project to send the instances to.", +) +def perf_test( + num_instances: int, num_workers: int, api_endpoint: str, api_token: str, project: str +) -> None: + """Performance testing utility for creating signal instances.""" + + import concurrent.futures + import time + import uuid + + import requests + from fastapi import status + + NUM_SIGNAL_INSTANCES = num_instances + NUM_WORKERS = num_workers + + session = requests.Session() + session.headers.update( + { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_token}", + } + ) + start_time = time.time() + + def _send_signal_instance( + api_endpoint: str, + api_token: str, + session: requests.Session, + signal_instance: dict[str, str], + ) -> None: + try: + r = session.post( + api_endpoint, + json=signal_instance, + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {api_token}", + }, + ) + log.info(f"Response: {r.json()}") + if r.status_code == status.HTTP_401_UNAUTHORIZED: + raise PermissionError( + "Unauthorized. Please check your bearer token. You can find it in the Dev Tools under Request Headers -> Authorization." + ) + + r.raise_for_status() + + except requests.exceptions.RequestException as e: + log.error(f"Unable to send finding. Reason: {e} Response: {r.json() if r else 'N/A'}") + else: + log.info(f"{signal_instance.get('raw', {}).get('id')} created successfully") + + def send_signal_instances( + api_endpoint: str, api_token: str, signal_instances: list[dict[str, str]] + ): + with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor: + futures = [ + executor.submit( + _send_signal_instance, + api_endpoint=api_endpoint, + api_token=api_token, + session=session, + signal_instance=signal_instance, + ) + for signal_instance in signal_instances + ] + results = [future.result() for future in concurrent.futures.as_completed(futures)] + + log.info(f"\nSent {len(results)} of {NUM_SIGNAL_INSTANCES} signal instances") + + signal_instances = [ + { + "project": {"name": project}, + "raw": { + "id": str(uuid.uuid4()), + "name": "Test Signal", + "slug": "test-signal", + "canary": False, + "events": [ + { + "original": { + "dateint": 20240930, + "distinct_lookupkey_count": 95, + }, + }, + ], + "created_at": "2024-09-18T19:47:15Z", + "quiet_mode": False, + "external_id": "4ebbab36-c703-495f-ae47-7051bdc8b3ef", + }, + }, + ] * NUM_SIGNAL_INSTANCES + + send_signal_instances(api_endpoint, api_token, signal_instances) + + elapsed_time = time.time() - start_time + click.echo(f"Elapsed time: {elapsed_time:.2f} seconds") + + @dispatch_server.command("slack") @click.argument("organization") @click.argument("project")