Skip to content

Commit

Permalink
Adds a command to perform signals testing to the CLI (#5285)
Browse files Browse the repository at this point in the history
* Adds a command to perform signals testing to the CLI

* Adds a command to perform signals testing to the CLI

* Update cli.py
  • Loading branch information
mvilanova authored Oct 3, 2024
1 parent bb0c45b commit f53427d
Showing 1 changed file with 124 additions and 4 deletions.
128 changes: 124 additions & 4 deletions src/dispatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import uvicorn

from dispatch import __version__, config
from dispatch.config import DISPATCH_UI_URL
from dispatch.enums import UserRoles
from dispatch.plugin.models import PluginInstance

Expand Down Expand Up @@ -636,6 +637,7 @@ def revision_database(
def dispatch_scheduler():
"""Container for all dispatch scheduler commands."""
# we need scheduled tasks to be imported
from .case.scheduled import case_close_reminder, case_triage_reminder # noqa
from .case_cost.scheduled import (
calculate_cases_response_cost, # noqa
)
Expand All @@ -656,7 +658,6 @@ def dispatch_scheduler():
)
from .term.scheduled import sync_terms # noqa
from .workflow.scheduled import sync_workflows # noqa
from .case.scheduled import case_triage_reminder, case_close_reminder # noqa


@dispatch_scheduler.command("list")
Expand Down Expand Up @@ -806,10 +807,10 @@ def consume_signals():
None
"""
from dispatch.common.utils.cli import install_plugins
from dispatch.project import service as project_service
from dispatch.plugin import service as plugin_service
from dispatch.database.core import get_organization_session, get_session
from dispatch.organization.service import get_all as get_all_organizations
from dispatch.database.core import get_session, get_organization_session
from dispatch.plugin import service as plugin_service
from dispatch.project import service as project_service

install_plugins()

Expand Down Expand Up @@ -883,6 +884,125 @@ def process_signals():
db_session.close()


@signals_group.command("perf-test")
@click.option("--num-instances", default=1, help="Number of signal instances to send.")
@click.option("--num-workers", default=1, help="Number of threads to use.")
@click.option(
"--api-endpoint",
default=f"{DISPATCH_UI_URL}/api/v1/default/signals/instances",
required=True,
help="API endpoint to send the signal instances to.",
)
@click.option(
"--api-token",
required=True,
help="API token to use.",
)
@click.option(
"--project",
default="Test",
required=True,
help="The Dispatch project to send the instances to.",
)
def perf_test(
num_instances: int, num_workers: int, api_endpoint: str, api_token: str, project: str
) -> None:
"""Performance testing utility for creating signal instances."""

import concurrent.futures
import time
import uuid

import requests
from fastapi import status

NUM_SIGNAL_INSTANCES = num_instances
NUM_WORKERS = num_workers

session = requests.Session()
session.headers.update(
{
"Content-Type": "application/json",
"Authorization": f"Bearer {api_token}",
}
)
start_time = time.time()

def _send_signal_instance(
api_endpoint: str,
api_token: str,
session: requests.Session,
signal_instance: dict[str, str],
) -> None:
try:
r = session.post(
api_endpoint,
json=signal_instance,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {api_token}",
},
)
log.info(f"Response: {r.json()}")
if r.status_code == status.HTTP_401_UNAUTHORIZED:
raise PermissionError(
"Unauthorized. Please check your bearer token. You can find it in the Dev Tools under Request Headers -> Authorization."
)

r.raise_for_status()

except requests.exceptions.RequestException as e:
log.error(f"Unable to send finding. Reason: {e} Response: {r.json() if r else 'N/A'}")
else:
log.info(f"{signal_instance.get('raw', {}).get('id')} created successfully")

def send_signal_instances(
api_endpoint: str, api_token: str, signal_instances: list[dict[str, str]]
):
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = [
executor.submit(
_send_signal_instance,
api_endpoint=api_endpoint,
api_token=api_token,
session=session,
signal_instance=signal_instance,
)
for signal_instance in signal_instances
]
results = [future.result() for future in concurrent.futures.as_completed(futures)]

log.info(f"\nSent {len(results)} of {NUM_SIGNAL_INSTANCES} signal instances")

signal_instances = [
{
"project": {"name": project},
"raw": {
"id": str(uuid.uuid4()),
"name": "Test Signal",
"slug": "test-signal",
"canary": False,
"events": [
{
"original": {
"dateint": 20240930,
"distinct_lookupkey_count": 95,
},
},
],
"created_at": "2024-09-18T19:47:15Z",
"quiet_mode": False,
"external_id": "4ebbab36-c703-495f-ae47-7051bdc8b3ef",
},
},
] * NUM_SIGNAL_INSTANCES

send_signal_instances(api_endpoint, api_token, signal_instances)

elapsed_time = time.time() - start_time
click.echo(f"Elapsed time: {elapsed_time:.2f} seconds")


@dispatch_server.command("slack")
@click.argument("organization")
@click.argument("project")
Expand Down

0 comments on commit f53427d

Please sign in to comment.