Skip to content

Commit

Permalink
Process Signal SQS (#3813)
Browse files Browse the repository at this point in the history
* Initial work

* Fixes entities undefined

* Fixes

* Removes scheduled consumers in favor of long running processes

* Fix name

---------

Co-authored-by: Will Sheldon <[email protected]>
  • Loading branch information
kevgliss and wssheldon authored Oct 2, 2023
1 parent 62c3ff3 commit beec5de
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 92 deletions.
3 changes: 2 additions & 1 deletion requirements-base.in
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ atlassian-python-api==3.32.0
attrs==22.1.0
bcrypt
blockkit
boto3
cachetools
chardet
click
Expand Down Expand Up @@ -44,8 +45,8 @@ schemathesis
sentry-asgi
sentry-sdk
sh
slack-bolt
slack_sdk
slack-bolt
slowapi
spacy
sqlalchemy-filters
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ def run(self):
"dispatch.plugins": [
"dispatch_atlassian_confluence = dispatch.plugins.dispatch_atlassian_confluence.plugin:ConfluencePagePlugin",
"dispatch_atlassian_confluence_document = dispatch.plugins.dispatch_atlassian_confluence.docs.plugin:ConfluencePageDocPlugin",
"dispatch_aws_sqs = dispatch.plugins.dispatch_aws.plugin:AWSSQSSignalConsumerPlugin",
"dispatch_basic_auth = dispatch.plugins.dispatch_core.plugin:BasicAuthProviderPlugin",
"dispatch_contact = dispatch.plugins.dispatch_core.plugin:DispatchContactPlugin",
"dispatch_document_resolver = dispatch.plugins.dispatch_core.plugin:DispatchDocumentResolverPlugin",
Expand Down
112 changes: 72 additions & 40 deletions src/dispatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

import click
import uvicorn

from dispatch import __version__, config
from dispatch.enums import UserRoles
from dispatch.plugin.models import PluginInstance

from .scheduler import scheduler
from .extensions import configure_extensions

from .scheduler import scheduler

os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

Expand Down Expand Up @@ -80,10 +80,10 @@ def list_plugins():
)
def install_plugins(force):
"""Installs all plugins, or only one."""
from dispatch.common.utils.cli import install_plugins
from dispatch.database.core import SessionLocal
from dispatch.plugin import service as plugin_service
from dispatch.plugin.models import Plugin
from dispatch.common.utils.cli import install_plugins
from dispatch.plugins.base import plugins

install_plugins()
Expand Down Expand Up @@ -162,9 +162,9 @@ def dispatch_user():
)
def register_user(email: str, role: str, password: str, organization: str):
"""Registers a new user."""
from dispatch.database.core import refetch_db_session
from dispatch.auth import service as user_service
from dispatch.auth.models import UserRegister, UserOrganization
from dispatch.auth.models import UserOrganization, UserRegister
from dispatch.database.core import refetch_db_session

db_session = refetch_db_session(organization_slug=organization)
user = user_service.get_by_email(email=email, db_session=db_session)
Expand Down Expand Up @@ -198,9 +198,9 @@ def register_user(email: str, role: str, password: str, organization: str):
)
def update_user(email: str, role: str, organization: str):
"""Updates a user's roles."""
from dispatch.database.core import SessionLocal
from dispatch.auth import service as user_service
from dispatch.auth.models import UserUpdate, UserOrganization
from dispatch.auth.models import UserOrganization, UserUpdate
from dispatch.database.core import SessionLocal

db_session = SessionLocal()
user = user_service.get_by_email(email=email, db_session=db_session)
Expand All @@ -222,9 +222,9 @@ def update_user(email: str, role: str, organization: str):
@click.password_option()
def reset_user_password(email: str, password: str):
"""Resets a user's password."""
from dispatch.database.core import SessionLocal
from dispatch.auth import service as user_service
from dispatch.auth.models import UserUpdate
from dispatch.database.core import SessionLocal

db_session = SessionLocal()
user = user_service.get_by_email(email=email, db_session=db_session)
Expand All @@ -249,9 +249,7 @@ def database_init():
"""Initializes a new database."""
click.echo("Initializing new database...")
from .database.core import engine
from .database.manage import (
init_database,
)
from .database.manage import init_database

init_database(engine)
click.secho("Success.", fg="green")
Expand All @@ -265,12 +263,13 @@ def database_init():
)
def restore_database(dump_file):
"""Restores the database via psql."""
from sh import psql, createdb, ErrorReturnCode_1
from sh import ErrorReturnCode_1, createdb, psql

from dispatch.config import (
DATABASE_CREDENTIALS,
DATABASE_HOSTNAME,
DATABASE_NAME,
DATABASE_PORT,
DATABASE_CREDENTIALS,
)

username, password = str(DATABASE_CREDENTIALS).split(":")
Expand Down Expand Up @@ -318,11 +317,12 @@ def restore_database(dump_file):
def dump_database(dump_file):
"""Dumps the database via pg_dump."""
from sh import pg_dump

from dispatch.config import (
DATABASE_CREDENTIALS,
DATABASE_HOSTNAME,
DATABASE_NAME,
DATABASE_PORT,
DATABASE_CREDENTIALS,
)

username, password = str(DATABASE_CREDENTIALS).split(":")
Expand All @@ -345,7 +345,7 @@ def dump_database(dump_file):
@click.option("--yes", is_flag=True, help="Silences all confirmation prompts.")
def drop_database(yes):
"""Drops all data in database."""
from sqlalchemy_utils import drop_database, database_exists
from sqlalchemy_utils import database_exists, drop_database

if database_exists(str(config.SQLALCHEMY_DATABASE_URI)):
if yes:
Expand Down Expand Up @@ -378,10 +378,10 @@ def drop_database(yes):
def upgrade_database(tag, sql, revision, revision_type):
"""Upgrades database schema to newest version."""
import sqlalchemy
from sqlalchemy import inspect
from sqlalchemy_utils import database_exists
from alembic import command as alembic_command
from alembic.config import Config as AlembicConfig
from sqlalchemy import inspect
from sqlalchemy_utils import database_exists

from .database.core import engine
from .database.manage import init_database
Expand Down Expand Up @@ -570,6 +570,7 @@ def revision_database(
):
"""Create new database revision."""
import types

from alembic import command as alembic_command
from alembic.config import Config as AlembicConfig

Expand Down Expand Up @@ -623,20 +624,15 @@ def dispatch_scheduler():
from .evergreen.scheduled import create_evergreen_reminders # noqa
from .feedback.incident.scheduled import feedback_report_daily # noqa
from .feedback.service.scheduled import oncall_shift_feedback # noqa
from .incident_cost.scheduled import calculate_incidents_response_cost # noqa
from .incident.scheduled import ( # noqa
incident_auto_tagger,
incident_close_reminder,
incident_report_daily,
from .incident.scheduled import (
incident_auto_tagger, # noqa
)
from .incident_cost.scheduled import calculate_incidents_response_cost # noqa
from .monitor.scheduled import sync_active_stable_monitors # noqa
from .report.scheduled import incident_report_reminders # noqa
from .signal.scheduled import consume_signals # noqa
from .tag.scheduled import sync_tags, build_tag_models # noqa
from .task.scheduled import ( # noqa
create_incident_tasks_reminders,
sync_incident_tasks_daily,
sync_active_stable_incident_tasks,
from .tag.scheduled import build_tag_models, sync_tags # noqa
from .task.scheduled import (
create_incident_tasks_reminders, # noqa
)
from .term.scheduled import sync_terms # noqa
from .workflow.scheduled import sync_workflows # noqa
Expand All @@ -662,6 +658,7 @@ def list_tasks():
def start_tasks(tasks, exclude, eager):
"""Starts the scheduler."""
import signal

from dispatch.common.utils.cli import install_plugins

install_plugins()
Expand Down Expand Up @@ -705,6 +702,7 @@ def dispatch_server():
def show_routes():
"""Prints all available routes."""
from tabulate import tabulate

from dispatch.main import api_router

table = []
Expand All @@ -717,9 +715,11 @@ def show_routes():
@dispatch_server.command("config")
def show_config():
"""Prints the current config as dispatch sees it."""
import sys
import inspect
import sys

from tabulate import tabulate

from dispatch import config

func_members = inspect.getmembers(sys.modules[config.__name__])
Expand Down Expand Up @@ -772,15 +772,51 @@ def signals_group():
pass


@signals_group.command("consume")
@click.argument("organization")
@click.argument("project")
@click.argument("plugin_name")
def consume_signals(organization, project, plugin_name): # TODO support multiple from one command
"""Runs a continuous process that consumes signals from the specified plugin."""
from dispatch.common.utils.cli import install_plugins
from dispatch.database.core import refetch_db_session
from dispatch.project import service as project_service
from dispatch.project.models import ProjectRead
from dispatch.plugin import service as plugin_service

install_plugins()

session = refetch_db_session(organization)

project = project_service.get_by_name_or_raise(
db_session=session, project_in=ProjectRead(name=project)
)

plugins = plugin_service.get_active_instances(
db_session=session, plugin_type="signal-consumer", project_id=project.id
)

if not plugins:
log.debug(
"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}"
)
return

for plugin in plugins:
if plugin.plugin.slug == plugin_name:
plugin.instance.consume(db_session=session, project=project)


@signals_group.command("process")
def process_signals():
"""Runs a continuous process that does additional processing on newly created signals."""
from sqlalchemy import asc
from dispatch.database.core import sessionmaker, engine, SessionLocal
from dispatch.signal.models import SignalInstance

from dispatch.common.utils.cli import install_plugins
from dispatch.database.core import SessionLocal, engine, sessionmaker
from dispatch.organization.service import get_all as get_all_organizations
from dispatch.signal import flows as signal_flows
from dispatch.common.utils.cli import install_plugins
from dispatch.signal.models import SignalInstance

install_plugins()

Expand Down Expand Up @@ -819,20 +855,15 @@ def process_signals():
@click.argument("project")
def run_slack_websocket(organization: str, project: str):
"""Runs the slack websocket process."""
from sqlalchemy import true

from slack_bolt.adapter.socket_mode import SocketModeHandler
from sqlalchemy import true

from dispatch.database.core import refetch_db_session
from dispatch.common.utils.cli import install_plugins
from dispatch.database.core import refetch_db_session
from dispatch.plugins.dispatch_slack.bolt import app
from dispatch.plugins.dispatch_slack.case.interactive import configure as case_configure
from dispatch.plugins.dispatch_slack.incident.interactive import configure as incident_configure
from dispatch.plugins.dispatch_slack.feedback.interactive import ( # noqa
configure as feedback_configure,
)
from dispatch.plugins.dispatch_slack.workflow import configure as workflow_configure
from dispatch.plugins.dispatch_slack.case.interactive import configure as case_configure

from dispatch.project import service as project_service
from dispatch.project.models import ProjectRead

Expand Down Expand Up @@ -884,6 +915,7 @@ def run_slack_websocket(organization: str, project: str):
def shell(ipython_args):
"""Starts an ipython shell importing our app. Useful for debugging."""
import sys

import IPython
from IPython.terminal.ipapp import load_default_config

Expand Down
1 change: 1 addition & 0 deletions src/dispatch/plugins/dispatch_aws/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from ._version import __version__ # noqa
1 change: 1 addition & 0 deletions src/dispatch/plugins/dispatch_aws/_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "0.1.0"
29 changes: 29 additions & 0 deletions src/dispatch/plugins/dispatch_aws/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pydantic import Field
from dispatch.config import BaseConfigurationModel


class AWSSQSConfiguration(BaseConfigurationModel):
"""Signal SQS configuration"""

queue_name: str = Field(
title="Queue Name",
description="Queue Name, not the ARN.",
)

queue_owner: str = Field(
title="Queue Owner",
description="Queue Owner Account ID.",
)

region: str = Field(
title="AWS Region",
description="AWS Region.",
default="us-east-1",
)

batch_size: int = Field(
title="Batch Size",
description="Number of messages to retrieve from SQS.",
default=10,
le=10,
)
Loading

0 comments on commit beec5de

Please sign in to comment.