Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process Signal SQS #3813

Merged
merged 12 commits into from
Oct 2, 2023
3 changes: 2 additions & 1 deletion requirements-base.in
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ atlassian-python-api==3.32.0
attrs==22.1.0
bcrypt
blockkit
boto3
cachetools
chardet
click
Expand Down Expand Up @@ -44,8 +45,8 @@ schemathesis
sentry-asgi
sentry-sdk
sh
slack-bolt
slack_sdk
slack-bolt
slowapi
spacy
sqlalchemy-filters
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ def run(self):
"dispatch.plugins": [
"dispatch_atlassian_confluence = dispatch.plugins.dispatch_atlassian_confluence.plugin:ConfluencePagePlugin",
"dispatch_atlassian_confluence_document = dispatch.plugins.dispatch_atlassian_confluence.docs.plugin:ConfluencePageDocPlugin",
"dispatch_aws_sqs = dispatch.plugins.dispatch_aws.plugin:AWSSQSSignalConsumerPlugin",
"dispatch_basic_auth = dispatch.plugins.dispatch_core.plugin:BasicAuthProviderPlugin",
"dispatch_contact = dispatch.plugins.dispatch_core.plugin:DispatchContactPlugin",
"dispatch_document_resolver = dispatch.plugins.dispatch_core.plugin:DispatchDocumentResolverPlugin",
Expand Down
112 changes: 72 additions & 40 deletions src/dispatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

import click
import uvicorn

from dispatch import __version__, config
from dispatch.enums import UserRoles
from dispatch.plugin.models import PluginInstance

from .scheduler import scheduler
from .extensions import configure_extensions

from .scheduler import scheduler

os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

Expand Down Expand Up @@ -80,10 +80,10 @@ def list_plugins():
)
def install_plugins(force):
"""Installs all plugins, or only one."""
from dispatch.common.utils.cli import install_plugins
from dispatch.database.core import SessionLocal
from dispatch.plugin import service as plugin_service
from dispatch.plugin.models import Plugin
from dispatch.common.utils.cli import install_plugins
from dispatch.plugins.base import plugins

install_plugins()
Expand Down Expand Up @@ -162,9 +162,9 @@ def dispatch_user():
)
def register_user(email: str, role: str, password: str, organization: str):
"""Registers a new user."""
from dispatch.database.core import refetch_db_session
from dispatch.auth import service as user_service
from dispatch.auth.models import UserRegister, UserOrganization
from dispatch.auth.models import UserOrganization, UserRegister
from dispatch.database.core import refetch_db_session

db_session = refetch_db_session(organization_slug=organization)
user = user_service.get_by_email(email=email, db_session=db_session)
Expand Down Expand Up @@ -198,9 +198,9 @@ def register_user(email: str, role: str, password: str, organization: str):
)
def update_user(email: str, role: str, organization: str):
"""Updates a user's roles."""
from dispatch.database.core import SessionLocal
from dispatch.auth import service as user_service
from dispatch.auth.models import UserUpdate, UserOrganization
from dispatch.auth.models import UserOrganization, UserUpdate
from dispatch.database.core import SessionLocal

db_session = SessionLocal()
user = user_service.get_by_email(email=email, db_session=db_session)
Expand All @@ -222,9 +222,9 @@ def update_user(email: str, role: str, organization: str):
@click.password_option()
def reset_user_password(email: str, password: str):
"""Resets a user's password."""
from dispatch.database.core import SessionLocal
from dispatch.auth import service as user_service
from dispatch.auth.models import UserUpdate
from dispatch.database.core import SessionLocal

db_session = SessionLocal()
user = user_service.get_by_email(email=email, db_session=db_session)
Expand All @@ -249,9 +249,7 @@ def database_init():
"""Initializes a new database."""
click.echo("Initializing new database...")
from .database.core import engine
from .database.manage import (
init_database,
)
from .database.manage import init_database

init_database(engine)
click.secho("Success.", fg="green")
Expand All @@ -265,12 +263,13 @@ def database_init():
)
def restore_database(dump_file):
"""Restores the database via psql."""
from sh import psql, createdb, ErrorReturnCode_1
from sh import ErrorReturnCode_1, createdb, psql

from dispatch.config import (
DATABASE_CREDENTIALS,
DATABASE_HOSTNAME,
DATABASE_NAME,
DATABASE_PORT,
DATABASE_CREDENTIALS,
)

username, password = str(DATABASE_CREDENTIALS).split(":")
Expand Down Expand Up @@ -318,11 +317,12 @@ def restore_database(dump_file):
def dump_database(dump_file):
"""Dumps the database via pg_dump."""
from sh import pg_dump

from dispatch.config import (
DATABASE_CREDENTIALS,
DATABASE_HOSTNAME,
DATABASE_NAME,
DATABASE_PORT,
DATABASE_CREDENTIALS,
)

username, password = str(DATABASE_CREDENTIALS).split(":")
Expand All @@ -345,7 +345,7 @@ def dump_database(dump_file):
@click.option("--yes", is_flag=True, help="Silences all confirmation prompts.")
def drop_database(yes):
"""Drops all data in database."""
from sqlalchemy_utils import drop_database, database_exists
from sqlalchemy_utils import database_exists, drop_database

if database_exists(str(config.SQLALCHEMY_DATABASE_URI)):
if yes:
Expand Down Expand Up @@ -378,10 +378,10 @@ def drop_database(yes):
def upgrade_database(tag, sql, revision, revision_type):
"""Upgrades database schema to newest version."""
import sqlalchemy
from sqlalchemy import inspect
from sqlalchemy_utils import database_exists
from alembic import command as alembic_command
from alembic.config import Config as AlembicConfig
from sqlalchemy import inspect
from sqlalchemy_utils import database_exists

from .database.core import engine
from .database.manage import init_database
Expand Down Expand Up @@ -570,6 +570,7 @@ def revision_database(
):
"""Create new database revision."""
import types

from alembic import command as alembic_command
from alembic.config import Config as AlembicConfig

Expand Down Expand Up @@ -623,20 +624,15 @@ def dispatch_scheduler():
from .evergreen.scheduled import create_evergreen_reminders # noqa
from .feedback.incident.scheduled import feedback_report_daily # noqa
from .feedback.service.scheduled import oncall_shift_feedback # noqa
from .incident_cost.scheduled import calculate_incidents_response_cost # noqa
from .incident.scheduled import ( # noqa
incident_auto_tagger,
incident_close_reminder,
incident_report_daily,
from .incident.scheduled import (
incident_auto_tagger, # noqa
)
from .incident_cost.scheduled import calculate_incidents_response_cost # noqa
from .monitor.scheduled import sync_active_stable_monitors # noqa
from .report.scheduled import incident_report_reminders # noqa
from .signal.scheduled import consume_signals # noqa
from .tag.scheduled import sync_tags, build_tag_models # noqa
from .task.scheduled import ( # noqa
create_incident_tasks_reminders,
sync_incident_tasks_daily,
sync_active_stable_incident_tasks,
from .tag.scheduled import build_tag_models, sync_tags # noqa
from .task.scheduled import (
create_incident_tasks_reminders, # noqa
)
from .term.scheduled import sync_terms # noqa
from .workflow.scheduled import sync_workflows # noqa
Expand All @@ -662,6 +658,7 @@ def list_tasks():
def start_tasks(tasks, exclude, eager):
"""Starts the scheduler."""
import signal

from dispatch.common.utils.cli import install_plugins

install_plugins()
Expand Down Expand Up @@ -705,6 +702,7 @@ def dispatch_server():
def show_routes():
"""Prints all available routes."""
from tabulate import tabulate

from dispatch.main import api_router

table = []
Expand All @@ -717,9 +715,11 @@ def show_routes():
@dispatch_server.command("config")
def show_config():
"""Prints the current config as dispatch sees it."""
import sys
import inspect
import sys

from tabulate import tabulate

from dispatch import config

func_members = inspect.getmembers(sys.modules[config.__name__])
Expand Down Expand Up @@ -772,15 +772,51 @@ def signals_group():
pass


@signals_group.command("consume")
@click.argument("organization")
@click.argument("project")
@click.argument("plugin_name")
def consume_signals(organization, project, plugin_name): # TODO support multiple from one command
"""Runs a continuous process that consumes signals from the specified plugin."""
from dispatch.common.utils.cli import install_plugins
from dispatch.database.core import refetch_db_session
from dispatch.project import service as project_service
from dispatch.project.models import ProjectRead
from dispatch.plugin import service as plugin_service

install_plugins()

session = refetch_db_session(organization)

project = project_service.get_by_name_or_raise(
db_session=session, project_in=ProjectRead(name=project)
)

plugins = plugin_service.get_active_instances(
db_session=session, plugin_type="signal-consumer", project_id=project.id
)

if not plugins:
log.debug(
"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}"
)
return

for plugin in plugins:
if plugin.plugin.slug == plugin_name:
plugin.instance.consume(db_session=session, project=project)


@signals_group.command("process")
def process_signals():
"""Runs a continuous process that does additional processing on newly created signals."""
from sqlalchemy import asc
from dispatch.database.core import sessionmaker, engine, SessionLocal
from dispatch.signal.models import SignalInstance

from dispatch.common.utils.cli import install_plugins
from dispatch.database.core import SessionLocal, engine, sessionmaker
from dispatch.organization.service import get_all as get_all_organizations
from dispatch.signal import flows as signal_flows
from dispatch.common.utils.cli import install_plugins
from dispatch.signal.models import SignalInstance

install_plugins()

Expand Down Expand Up @@ -819,20 +855,15 @@ def process_signals():
@click.argument("project")
def run_slack_websocket(organization: str, project: str):
"""Runs the slack websocket process."""
from sqlalchemy import true

from slack_bolt.adapter.socket_mode import SocketModeHandler
from sqlalchemy import true

from dispatch.database.core import refetch_db_session
from dispatch.common.utils.cli import install_plugins
from dispatch.database.core import refetch_db_session
from dispatch.plugins.dispatch_slack.bolt import app
from dispatch.plugins.dispatch_slack.case.interactive import configure as case_configure
from dispatch.plugins.dispatch_slack.incident.interactive import configure as incident_configure
from dispatch.plugins.dispatch_slack.feedback.interactive import ( # noqa
configure as feedback_configure,
)
from dispatch.plugins.dispatch_slack.workflow import configure as workflow_configure
from dispatch.plugins.dispatch_slack.case.interactive import configure as case_configure

from dispatch.project import service as project_service
from dispatch.project.models import ProjectRead

Expand Down Expand Up @@ -884,6 +915,7 @@ def run_slack_websocket(organization: str, project: str):
def shell(ipython_args):
"""Starts an ipython shell importing our app. Useful for debugging."""
import sys

import IPython
from IPython.terminal.ipapp import load_default_config

Expand Down
1 change: 1 addition & 0 deletions src/dispatch/plugins/dispatch_aws/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from ._version import __version__ # noqa
1 change: 1 addition & 0 deletions src/dispatch/plugins/dispatch_aws/_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "0.1.0"
29 changes: 29 additions & 0 deletions src/dispatch/plugins/dispatch_aws/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pydantic import Field
from dispatch.config import BaseConfigurationModel


class AWSSQSConfiguration(BaseConfigurationModel):
"""Signal SQS configuration"""

queue_name: str = Field(
title="Queue Name",
description="Queue Name, not the ARN.",
)

queue_owner: str = Field(
title="Queue Owner",
description="Queue Owner Account ID.",
)

region: str = Field(
title="AWS Region",
description="AWS Region.",
default="us-east-1",
)

batch_size: int = Field(
title="Batch Size",
description="Number of messages to retrieve from SQS.",
default=10,
le=10,
)
Loading
Loading