diff --git a/requirements-base.in b/requirements-base.in index e6834e4d73dd..95003484d6f4 100644 --- a/requirements-base.in +++ b/requirements-base.in @@ -6,6 +6,7 @@ atlassian-python-api==3.32.0 attrs==22.1.0 bcrypt blockkit +boto3 cachetools chardet click @@ -44,8 +45,8 @@ schemathesis sentry-asgi sentry-sdk sh -slack-bolt slack_sdk +slack-bolt slowapi spacy sqlalchemy-filters diff --git a/requirements-base.txt b/requirements-base.txt index 0775b5d9bc6e..c6599d9ec3fa 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -358,7 +358,7 @@ rsa==4.9 # google-auth # oauth2client # python-jose -schedule==1.2.0 +schedule==1.2.1 # via -r requirements-base.in schemathesis==3.19.7 # via -r requirements-base.in diff --git a/requirements-dev.txt b/requirements-dev.txt index c8d639b2ce32..e348450adb31 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -44,7 +44,7 @@ identify==2.5.27 # via pre-commit iniconfig==2.0.0 # via pytest -ipython==8.16.0 +ipython==8.16.1 # via -r requirements-dev.in jedi==0.19.0 # via ipython diff --git a/setup.py b/setup.py index 0314fe703c4e..ef838342801a 100644 --- a/setup.py +++ b/setup.py @@ -404,6 +404,7 @@ def run(self): "dispatch.plugins": [ "dispatch_atlassian_confluence = dispatch.plugins.dispatch_atlassian_confluence.plugin:ConfluencePagePlugin", "dispatch_atlassian_confluence_document = dispatch.plugins.dispatch_atlassian_confluence.docs.plugin:ConfluencePageDocPlugin", + "dispatch_aws_sqs = dispatch.plugins.dispatch_aws.plugin:AWSSQSSignalConsumerPlugin", "dispatch_basic_auth = dispatch.plugins.dispatch_core.plugin:BasicAuthProviderPlugin", "dispatch_contact = dispatch.plugins.dispatch_core.plugin:DispatchContactPlugin", "dispatch_document_resolver = dispatch.plugins.dispatch_core.plugin:DispatchDocumentResolverPlugin", diff --git a/src/dispatch/case/flows.py b/src/dispatch/case/flows.py index 6c06cb88113d..86b04e400096 100644 --- a/src/dispatch/case/flows.py +++ b/src/dispatch/case/flows.py @@ -177,7 +177,7 @@ def case_new_create_flow( conversation_target: str = None, service_id: int = None, db_session: Session, - create_resources: bool = True, + create_all_resources: bool = True, ): """Runs the case new creation flow.""" # we get the case @@ -186,18 +186,21 @@ def case_new_create_flow( # we create the ticket ticket_flows.create_case_ticket(case=case, db_session=db_session) + # we resolve participants individual_participants, team_participants = get_case_participants( case=case, db_session=db_session ) - if create_resources: - case_create_resources_flow( - db_session=db_session, - case_id=case.id, - individual_participants=individual_participants, - team_participants=team_participants, - conversation_target=conversation_target, - ) + # NOTE: we create all external resources for a Case unless it's + # created from a Signal, as it gets expensive when we have lots of them. + case_create_resources_flow( + db_session=db_session, + case_id=case.id, + individual_participants=individual_participants, + team_participants=team_participants, + conversation_target=conversation_target, + create_all_resources=create_all_resources, + ) if case.case_priority.page_assignee: if not service_id: @@ -341,8 +344,9 @@ def case_update_flow( db_session=db_session, ) - # we send the case updated notification - update_conversation(case, db_session) + if case.conversation: + # we send the case updated notification + update_conversation(case, db_session) def case_delete_flow(case: Case, db_session: SessionLocal): @@ -608,7 +612,7 @@ def case_create_resources_flow( individual_participants: List[str], team_participants: List[str], conversation_target: str = None, - create_resources: bool = True, + create_all_resources: bool = True, ) -> None: """Runs the case resource creation flow.""" case = get(db_session=db_session, case_id=case_id) @@ -616,7 +620,7 @@ def case_create_resources_flow( if case.assignee: individual_participants.append((case.assignee.individual, None)) - if create_resources: + if create_all_resources: # we create the tactical group direct_participant_emails = [i.email for i, _ in individual_participants] @@ -654,9 +658,6 @@ def case_create_resources_flow( db_session=db_session, ) - # we update the ticket - ticket_flows.update_case_ticket(case=case, db_session=db_session) - # we update the case document document_flows.update_document( document=case.case_document, project_id=case.project.id, db_session=db_session @@ -706,3 +707,6 @@ def case_create_resources_flow( case_id=case.id, ) log.exception(e) + + # we update the ticket + ticket_flows.update_case_ticket(case=case, db_session=db_session) diff --git a/src/dispatch/case/messaging.py b/src/dispatch/case/messaging.py new file mode 100644 index 000000000000..9cb7780f6e1d --- /dev/null +++ b/src/dispatch/case/messaging.py @@ -0,0 +1,87 @@ +""" +.. module: dispatch.case.messaging + :platform: Unix + :copyright: (c) 2019 by Netflix Inc., see AUTHORS for more + :license: Apache, see LICENSE for more details. +""" +import logging + +from dispatch.database.core import SessionLocal +from dispatch.case.models import Case +from dispatch.messaging.strings import ( + CASE_CLOSE_REMINDER, + CASE_TRIAGE_REMINDER, + MessageType, +) +from dispatch.plugin import service as plugin_service + + +log = logging.getLogger(__name__) + + +def send_case_close_reminder(case: Case, db_session: SessionLocal): + """ + Sends a direct message to the assignee reminding them to close the case if possible. + """ + message_text = "Case Close Reminder" + message_template = CASE_CLOSE_REMINDER + + plugin = plugin_service.get_active_instance( + db_session=db_session, project_id=case.project.id, plugin_type="conversation" + ) + if not plugin: + log.warning("Case close reminder message not sent. No conversation plugin enabled.") + return + + items = [ + { + "name": case.name, + "ticket_weblink": case.ticket.weblink, + "title": case.title, + "status": case.status, + } + ] + + plugin.instance.send_direct( + case.assignee.individual.email, + message_text, + message_template, + MessageType.case_status_reminder, + items=items, + ) + + log.debug(f"Case close reminder sent to {case.assignee.individual.email}.") + + +def send_case_triage_reminder(case: Case, db_session: SessionLocal): + """ + Sends a direct message to the assignee reminding them to triage the case if possible. + """ + message_text = "Case Triage Reminder" + message_template = CASE_TRIAGE_REMINDER + + plugin = plugin_service.get_active_instance( + db_session=db_session, project_id=case.project.id, plugin_type="conversation" + ) + if not plugin: + log.warning("Case triage reminder message not sent. No conversation plugin enabled.") + return + + items = [ + { + "name": case.name, + "ticket_weblink": case.ticket.weblink, + "title": case.title, + "status": case.status, + } + ] + + plugin.instance.send_direct( + case.assignee.individual.email, + message_text, + message_template, + MessageType.case_status_reminder, + items=items, + ) + + log.debug(f"Case triage reminder sent to {case.assignee.individual.email}.") diff --git a/src/dispatch/case/scheduled.py b/src/dispatch/case/scheduled.py new file mode 100644 index 000000000000..e3f419cc5265 --- /dev/null +++ b/src/dispatch/case/scheduled.py @@ -0,0 +1,47 @@ +from datetime import datetime, date +from schedule import every + +from dispatch.database.core import SessionLocal +from dispatch.decorators import scheduled_project_task, timer +from dispatch.project.models import Project +from dispatch.scheduler import scheduler + +from .enums import CaseStatus +from .messaging import send_case_close_reminder, send_case_triage_reminder +from .service import ( + get_all_by_status, +) + + +@scheduler.add(every(1).day.at("18:00"), name="case-close-reminder") +@timer +@scheduled_project_task +def case_close_reminder(db_session: SessionLocal, project: Project): + """Sends a reminder to the case assignee to close out their case.""" + cases = get_all_by_status( + db_session=db_session, project_id=project.id, status=CaseStatus.triage + ) + + for case in cases: + span = datetime.utcnow() - case.triage_at + q, r = divmod(span.days, 7) + if q >= 1 and date.today().isoweekday() == 1: + # we only send the reminder for cases that have been triaging + # longer than a week and only on Mondays + send_case_close_reminder(case, db_session) + + +@scheduler.add(every(1).day.at("18:00"), name="case-triage-reminder") +@timer +@scheduled_project_task +def case_triage_reminder(db_session: SessionLocal, project: Project): + """Sends a reminder to the case assignee to triage their case.""" + cases = get_all_by_status(db_session=db_session, project_id=project.id, status=CaseStatus.new) + + # if we want more specific SLA reminders, we would need to add additional data model + for case in cases: + span = datetime.utcnow() - case.created_at + q, r = divmod(span.days, 1) + if q >= 1: + # we only send one reminder per case per day + send_case_triage_reminder(case, db_session) diff --git a/src/dispatch/case/service.py b/src/dispatch/case/service.py index 70499d0b5acc..e46c16da28f5 100644 --- a/src/dispatch/case/service.py +++ b/src/dispatch/case/service.py @@ -318,7 +318,7 @@ def update(*, db_session, case: Case, case_in: CaseUpdate, current_user: Dispatc db_session=db_session, source="Dispatch Core App", description=( - f"Case status changed to {case_in.status.lower()} " f"by {current_user.email}" + f"Case status changed to {case_in.status.lower()} by {current_user.email}" ), dispatch_user_id=current_user.id, case_id=case.id, diff --git a/src/dispatch/cli.py b/src/dispatch/cli.py index 95ae7d5a3f96..8ed30c41518d 100644 --- a/src/dispatch/cli.py +++ b/src/dispatch/cli.py @@ -3,13 +3,13 @@ import click import uvicorn + from dispatch import __version__, config from dispatch.enums import UserRoles from dispatch.plugin.models import PluginInstance -from .scheduler import scheduler from .extensions import configure_extensions - +from .scheduler import scheduler os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" @@ -80,10 +80,10 @@ def list_plugins(): ) def install_plugins(force): """Installs all plugins, or only one.""" + from dispatch.common.utils.cli import install_plugins from dispatch.database.core import SessionLocal from dispatch.plugin import service as plugin_service from dispatch.plugin.models import Plugin - from dispatch.common.utils.cli import install_plugins from dispatch.plugins.base import plugins install_plugins() @@ -162,9 +162,9 @@ def dispatch_user(): ) def register_user(email: str, role: str, password: str, organization: str): """Registers a new user.""" - from dispatch.database.core import refetch_db_session from dispatch.auth import service as user_service - from dispatch.auth.models import UserRegister, UserOrganization + from dispatch.auth.models import UserOrganization, UserRegister + from dispatch.database.core import refetch_db_session db_session = refetch_db_session(organization_slug=organization) user = user_service.get_by_email(email=email, db_session=db_session) @@ -198,9 +198,9 @@ def register_user(email: str, role: str, password: str, organization: str): ) def update_user(email: str, role: str, organization: str): """Updates a user's roles.""" - from dispatch.database.core import SessionLocal from dispatch.auth import service as user_service - from dispatch.auth.models import UserUpdate, UserOrganization + from dispatch.auth.models import UserOrganization, UserUpdate + from dispatch.database.core import SessionLocal db_session = SessionLocal() user = user_service.get_by_email(email=email, db_session=db_session) @@ -222,9 +222,9 @@ def update_user(email: str, role: str, organization: str): @click.password_option() def reset_user_password(email: str, password: str): """Resets a user's password.""" - from dispatch.database.core import SessionLocal from dispatch.auth import service as user_service from dispatch.auth.models import UserUpdate + from dispatch.database.core import SessionLocal db_session = SessionLocal() user = user_service.get_by_email(email=email, db_session=db_session) @@ -249,9 +249,7 @@ def database_init(): """Initializes a new database.""" click.echo("Initializing new database...") from .database.core import engine - from .database.manage import ( - init_database, - ) + from .database.manage import init_database init_database(engine) click.secho("Success.", fg="green") @@ -265,12 +263,13 @@ def database_init(): ) def restore_database(dump_file): """Restores the database via psql.""" - from sh import psql, createdb, ErrorReturnCode_1 + from sh import ErrorReturnCode_1, createdb, psql + from dispatch.config import ( + DATABASE_CREDENTIALS, DATABASE_HOSTNAME, DATABASE_NAME, DATABASE_PORT, - DATABASE_CREDENTIALS, ) username, password = str(DATABASE_CREDENTIALS).split(":") @@ -318,11 +317,12 @@ def restore_database(dump_file): def dump_database(dump_file): """Dumps the database via pg_dump.""" from sh import pg_dump + from dispatch.config import ( + DATABASE_CREDENTIALS, DATABASE_HOSTNAME, DATABASE_NAME, DATABASE_PORT, - DATABASE_CREDENTIALS, ) username, password = str(DATABASE_CREDENTIALS).split(":") @@ -345,7 +345,7 @@ def dump_database(dump_file): @click.option("--yes", is_flag=True, help="Silences all confirmation prompts.") def drop_database(yes): """Drops all data in database.""" - from sqlalchemy_utils import drop_database, database_exists + from sqlalchemy_utils import database_exists, drop_database if database_exists(str(config.SQLALCHEMY_DATABASE_URI)): if yes: @@ -378,10 +378,10 @@ def drop_database(yes): def upgrade_database(tag, sql, revision, revision_type): """Upgrades database schema to newest version.""" import sqlalchemy - from sqlalchemy import inspect - from sqlalchemy_utils import database_exists from alembic import command as alembic_command from alembic.config import Config as AlembicConfig + from sqlalchemy import inspect + from sqlalchemy_utils import database_exists from .database.core import engine from .database.manage import init_database @@ -570,6 +570,7 @@ def revision_database( ): """Create new database revision.""" import types + from alembic import command as alembic_command from alembic.config import Config as AlembicConfig @@ -623,23 +624,19 @@ def dispatch_scheduler(): from .evergreen.scheduled import create_evergreen_reminders # noqa from .feedback.incident.scheduled import feedback_report_daily # noqa from .feedback.service.scheduled import oncall_shift_feedback # noqa - from .incident_cost.scheduled import calculate_incidents_response_cost # noqa - from .incident.scheduled import ( # noqa - incident_auto_tagger, - incident_close_reminder, - incident_report_daily, + from .incident.scheduled import ( + incident_auto_tagger, # noqa ) + from .incident_cost.scheduled import calculate_incidents_response_cost # noqa from .monitor.scheduled import sync_active_stable_monitors # noqa from .report.scheduled import incident_report_reminders # noqa - from .signal.scheduled import consume_signals # noqa - from .tag.scheduled import sync_tags, build_tag_models # noqa - from .task.scheduled import ( # noqa - create_incident_tasks_reminders, - sync_incident_tasks_daily, - sync_active_stable_incident_tasks, + from .tag.scheduled import build_tag_models, sync_tags # noqa + from .task.scheduled import ( + create_incident_tasks_reminders, # noqa ) from .term.scheduled import sync_terms # noqa from .workflow.scheduled import sync_workflows # noqa + from .case.scheduled import case_triage_reminder, case_close_reminder # noqa @dispatch_scheduler.command("list") @@ -661,6 +658,7 @@ def list_tasks(): def start_tasks(tasks, exclude, eager): """Starts the scheduler.""" import signal + from dispatch.common.utils.cli import install_plugins install_plugins() @@ -704,6 +702,7 @@ def dispatch_server(): def show_routes(): """Prints all available routes.""" from tabulate import tabulate + from dispatch.main import api_router table = [] @@ -716,9 +715,11 @@ def show_routes(): @dispatch_server.command("config") def show_config(): """Prints the current config as dispatch sees it.""" - import sys import inspect + import sys + from tabulate import tabulate + from dispatch import config func_members = inspect.getmembers(sys.modules[config.__name__]) @@ -771,15 +772,105 @@ def signals_group(): pass +def _run_consume(plugin_slug: str, organization_slug: str, project_id: int, running: bool): + from dispatch.database.core import refetch_db_session + from dispatch.plugin import service as plugin_service + from dispatch.project import service as project_service + from dispatch.common.utils.cli import install_plugins + + install_plugins() + + db_session = refetch_db_session(organization_slug=organization_slug) + plugin = plugin_service.get_active_instance_by_slug( + db_session=db_session, slug=plugin_slug, project_id=project_id + ) + project = project_service.get(db_session=db_session, project_id=project_id) + while True: + if not running["is_running"]: + break + plugin.instance.consume(db_session=db_session, project=project) + + +@signals_group.command("consume") +def consume_signals(): + """Runs a continuous process that consumes signals from the specified plugin.""" + import time + from multiprocessing import Manager + import multiprocessing + + import signal + + from dispatch.common.utils.cli import install_plugins + from dispatch.project import service as project_service + from dispatch.plugin import service as plugin_service + + from dispatch.organization.service import get_all as get_all_organizations + from dispatch.database.core import SessionLocal, engine, sessionmaker + + install_plugins() + organizations = get_all_organizations(db_session=SessionLocal()) + + with Manager() as manager: + running = manager.dict() + running["is_running"] = True + workers = [] + + for organization in organizations: + schema_engine = engine.execution_options( + schema_translate_map={ + None: f"dispatch_organization_{organization.slug}", + } + ) + session = sessionmaker(bind=schema_engine)() + + projects = project_service.get_all(db_session=session) + for project in projects: + plugins = plugin_service.get_active_instances( + db_session=session, plugin_type="signal-consumer", project_id=project.id + ) + + if not plugins: + log.warning( + f"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}" + ) + + for plugin in plugins: + log.debug(f"Consuming signals for plugin: {plugin.plugin.slug}") + p = multiprocessing.Process( + target=_run_consume, + args=(plugin.plugin.slug, organization.slug, project.id, running), + ) + p.start() + workers.append(p) + print(workers) + + def terminate_processes(signum, frame): + print("Terminating main process...") + running["is_running"] = False # noqa + for worker in workers: + worker.join() + + signal.signal(signal.SIGINT, terminate_processes) + signal.signal(signal.SIGTERM, terminate_processes) + + # Keep the main thread running + while True: + if not running["is_running"]: + print("Main process terminating.") + break + time.sleep(1) + + @signals_group.command("process") def process_signals(): """Runs a continuous process that does additional processing on newly created signals.""" from sqlalchemy import asc - from dispatch.database.core import sessionmaker, engine, SessionLocal - from dispatch.signal.models import SignalInstance + + from dispatch.common.utils.cli import install_plugins + from dispatch.database.core import SessionLocal, engine, sessionmaker from dispatch.organization.service import get_all as get_all_organizations from dispatch.signal import flows as signal_flows - from dispatch.common.utils.cli import install_plugins + from dispatch.signal.models import SignalInstance install_plugins() @@ -818,20 +909,15 @@ def process_signals(): @click.argument("project") def run_slack_websocket(organization: str, project: str): """Runs the slack websocket process.""" - from sqlalchemy import true - from slack_bolt.adapter.socket_mode import SocketModeHandler + from sqlalchemy import true - from dispatch.database.core import refetch_db_session from dispatch.common.utils.cli import install_plugins + from dispatch.database.core import refetch_db_session from dispatch.plugins.dispatch_slack.bolt import app + from dispatch.plugins.dispatch_slack.case.interactive import configure as case_configure from dispatch.plugins.dispatch_slack.incident.interactive import configure as incident_configure - from dispatch.plugins.dispatch_slack.feedback.interactive import ( # noqa - configure as feedback_configure, - ) from dispatch.plugins.dispatch_slack.workflow import configure as workflow_configure - from dispatch.plugins.dispatch_slack.case.interactive import configure as case_configure - from dispatch.project import service as project_service from dispatch.project.models import ProjectRead @@ -883,6 +969,7 @@ def run_slack_websocket(organization: str, project: str): def shell(ipython_args): """Starts an ipython shell importing our app. Useful for debugging.""" import sys + import IPython from IPython.terminal.ipapp import load_default_config diff --git a/src/dispatch/messaging/strings.py b/src/dispatch/messaging/strings.py index 0facbef81ce4..877dac1ce69b 100644 --- a/src/dispatch/messaging/strings.py +++ b/src/dispatch/messaging/strings.py @@ -5,6 +5,7 @@ from dispatch.messaging.email.filters import env from dispatch.conversation.enums import ConversationButtonActions from dispatch.incident.enums import IncidentStatus +from dispatch.case.enums import CaseStatus from dispatch.enums import Visibility from dispatch import config @@ -34,6 +35,7 @@ class MessageType(DispatchEnum): incident_tactical_report = "incident-tactical-report" incident_task_list = "incident-task-list" incident_task_reminder = "incident-task-reminder" + case_status_reminder = "case-status-reminder" service_feedback = "service-feedback" @@ -43,6 +45,13 @@ class MessageType(DispatchEnum): IncidentStatus.closed: "This no longer requires additional involvement, long term incident action items have been assigned to their respective owners.", } +CASE_STATUS_DESCRIPTIONS = { + CaseStatus.new: "This case is new and needs triaging.", + CaseStatus.triage: "This case is being triaged.", + CaseStatus.escalated: "This case has been escalated.", + CaseStatus.closed: "This case has been closed.", +} + INCIDENT_VISIBILITY_DESCRIPTIONS = { Visibility.open: "We ask that you use your best judgment while sharing details about this incident outside of the dedicated channels of communication. Please reach out to the Incident Commander if you have any questions.", Visibility.restricted: "This incident is restricted to immediate participants of this incident. We ask that you exercise extra caution and discretion while talking about this incident outside of the dedicated channels of communication. Only invite new participants that are strictly necessary. Please reach out to the Incident Commander if you have any questions.", @@ -236,6 +245,16 @@ class MessageType(DispatchEnum): "\n", " " ).strip() +CASE_TRIAGE_REMINDER_DESCRIPTION = """The status of this case hasn't been updated recently. +Please ensure you triage the case based on its priority.""".replace( + "\n", " " +).strip() + +CASE_CLOSE_REMINDER_DESCRIPTION = """The status of this case hasn't been updated recently. +You can use the case 'Resolve' button if it has been resolved and can be closed.""".replace( + "\n", " " +).strip() + INCIDENT_TASK_NEW_DESCRIPTION = """ The following incident task has been created and assigned to you by {{task_creator}}: {{task_description}}""" @@ -390,6 +409,14 @@ class MessageType(DispatchEnum): INCIDENT_TITLE = {"title": "Title", "text": "{{title}}"} +CASE_TITLE = {"title": "Title", "text": "{{title}}"} + +CASE_STATUS = { + "title": "Status - {{status}}", + "status_mapping": CASE_STATUS_DESCRIPTIONS, +} + + if config.DISPATCH_MARKDOWN_IN_INCIDENT_DESC: INCIDENT_DESCRIPTION = {"title": "Description", "text": "{{description | markdown}}"} else: @@ -596,6 +623,28 @@ class MessageType(DispatchEnum): INCIDENT_STATUS, ] + +CASE_CLOSE_REMINDER = [ + { + "title": "{{name}} Case - Close Reminder", + "title_link": "{{ticket_weblink}}", + "text": CASE_CLOSE_REMINDER_DESCRIPTION, + }, + CASE_TITLE, + CASE_STATUS, +] + +CASE_TRIAGE_REMINDER = [ + { + "title": "{{name}} Case - Triage Reminder", + "title_link": "{{ticket_weblink}}", + "text": CASE_TRIAGE_REMINDER_DESCRIPTION, + }, + CASE_TITLE, + CASE_STATUS, +] + + INCIDENT_TASK_REMINDER = [ {"title": "Incident - {{ name }}", "text": "{{ title }}"}, {"title": "Creator", "text": "{{ creator }}"}, diff --git a/src/dispatch/plugins/dispatch_aws/__init__.py b/src/dispatch/plugins/dispatch_aws/__init__.py new file mode 100644 index 000000000000..ad5cc752c07b --- /dev/null +++ b/src/dispatch/plugins/dispatch_aws/__init__.py @@ -0,0 +1 @@ +from ._version import __version__ # noqa diff --git a/src/dispatch/plugins/dispatch_aws/_version.py b/src/dispatch/plugins/dispatch_aws/_version.py new file mode 100644 index 000000000000..3dc1f76bc69e --- /dev/null +++ b/src/dispatch/plugins/dispatch_aws/_version.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/src/dispatch/plugins/dispatch_aws/config.py b/src/dispatch/plugins/dispatch_aws/config.py new file mode 100644 index 000000000000..d14e3f58ae2a --- /dev/null +++ b/src/dispatch/plugins/dispatch_aws/config.py @@ -0,0 +1,29 @@ +from pydantic import Field +from dispatch.config import BaseConfigurationModel + + +class AWSSQSConfiguration(BaseConfigurationModel): + """Signal SQS configuration""" + + queue_name: str = Field( + title="Queue Name", + description="Queue Name, not the ARN.", + ) + + queue_owner: str = Field( + title="Queue Owner", + description="Queue Owner Account ID.", + ) + + region: str = Field( + title="AWS Region", + description="AWS Region.", + default="us-east-1", + ) + + batch_size: int = Field( + title="Batch Size", + description="Number of messages to retrieve from SQS.", + default=10, + le=10, + ) diff --git a/src/dispatch/plugins/dispatch_aws/plugin.py b/src/dispatch/plugins/dispatch_aws/plugin.py new file mode 100644 index 000000000000..7d15fec5ab8d --- /dev/null +++ b/src/dispatch/plugins/dispatch_aws/plugin.py @@ -0,0 +1,77 @@ +""" +.. module: dispatch.plugins.dispatchaws.plugin + :platform: Unix + :copyright: (c) 2023 by Netflix Inc., see AUTHORS for more + :license: Apache, see LICENSE for more details. +.. moduleauthor:: Kevin Glisson +""" +import boto3 +import json +import logging + +from dispatch.metrics import provider as metrics_provider +from dispatch.plugins.bases import SignalConsumerPlugin +from dispatch.signal import service as signal_service +from dispatch.signal.models import SignalInstanceCreate +from dispatch.plugins.dispatch_aws.config import AWSSQSConfiguration + +from . import __version__ + +log = logging.getLogger(__name__) + + +class AWSSQSSignalConsumerPlugin(SignalConsumerPlugin): + title = "AWS SQS - Signal Consumer" + slug = "aws-sqs-signal-consumer" + description = "Uses sqs to consume signals" + version = __version__ + + author = "Netflix" + author_url = "https://github.com/netflix/dispatch.git" + + def __init__(self): + self.configuration_schema = AWSSQSConfiguration + + def consume(self, db_session, project): + client = boto3.client("sqs", region_name=self.configuration.region) + queue_url: str = client.get_queue_url( + QueueName=self.configuration.queue_name, + QueueOwnerAWSAccountId=self.configuration.queue_owner, + )["QueueUrl"] + + response = client.receive_message( + QueueUrl=queue_url, + MaxNumberOfMessages=self.configuration.batch_size, + VisibilityTimeout=40, + WaitTimeSeconds=20, + ) + if response.get("Messages") and len(response.get("Messages")) > 0: + entries = [] + for message in response["Messages"]: + try: + body = json.loads(message["Body"]) + signal_data = json.loads(body["Message"]) + + signal_instance = signal_service.create_signal_instance( + db_session=db_session, + signal_instance_in=SignalInstanceCreate( + project=project, raw=signal_data, **signal_data + ), + ) + metrics_provider.counter( + "aws-sqs-signal-consumer.signal.received", + tags={ + "signalName": signal_instance.signal.name, + "externalId": signal_instance.signal.external_id, + }, + ) + log.debug( + f"Received signal: SignalName: {signal_instance.signal.name} ExernalId: {signal_instance.signal.external_id}" + ) + entries.append( + {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]} + ) + except Exception as e: + log.exception(e) + + client.delete_message_batch(QueueUrl=queue_url, Entries=entries) diff --git a/src/dispatch/plugins/dispatch_slack/case/interactive.py b/src/dispatch/plugins/dispatch_slack/case/interactive.py index a49ad567497b..dd43e16ca54b 100644 --- a/src/dispatch/plugins/dispatch_slack/case/interactive.py +++ b/src/dispatch/plugins/dispatch_slack/case/interactive.py @@ -1519,17 +1519,39 @@ def engagement_button_approve_click( ).build() return client.views_open(trigger_id=body["trigger_id"], view=modal) + engagement = signal_service.get_signal_engagement( + db_session=db_session, + signal_engagement_id=context["subject"].engagement_id, + ) + + mfa_plugin = plugin_service.get_active_instance( + db_session=db_session, project_id=context["subject"].project_id, plugin_type="auth-mfa" + ) + mfa_enabled = True if mfa_plugin and engagement.require_mfa else False + + blocks = [ + Section(text="Confirm that this is expected and that it is not suspicious behavior."), + Divider(), + description_input(label="Additional Context", optional=False), + ] + + if mfa_enabled: + blocks.append(Section(text=" ")) + blocks.append( + Context( + elements=[ + "After submission, you will be asked to confirm a Multi-Factor Authentication (MFA) prompt, please have your MFA device ready." + ] + ), + ) + modal = Modal( submit="Submit", close="Cancel", title="Confirmation", callback_id=SignalEngagementActions.approve_submit, private_metadata=context["subject"].json(), - blocks=[ - Section(text="Confirm that this is expected and that it is not suspicious behavior."), - Divider(), - description_input(label="Additional Context", optional=False), - ], + blocks=blocks, ).build() client.views_open(trigger_id=body["trigger_id"], view=modal) diff --git a/src/dispatch/signal/exceptions.py b/src/dispatch/signal/exceptions.py new file mode 100644 index 000000000000..867412acd0d4 --- /dev/null +++ b/src/dispatch/signal/exceptions.py @@ -0,0 +1,13 @@ +from dispatch.exceptions import DispatchException + + +class SignalNotIdentifiedException(DispatchException): + pass + + +class SignalNotDefinedException(DispatchException): + pass + + +class SignalNotEnabledException(DispatchException): + pass diff --git a/src/dispatch/signal/flows.py b/src/dispatch/signal/flows.py index 680d10a75ed8..4891abf2dfad 100644 --- a/src/dispatch/signal/flows.py +++ b/src/dispatch/signal/flows.py @@ -147,7 +147,7 @@ def signal_instance_create_flow( service_id=None, conversation_target=conversation_target, case_id=case.id, - create_resources=False, + create_all_resources=False, ) if signal_instance.signal.engagements and entities: diff --git a/src/dispatch/signal/models.py b/src/dispatch/signal/models.py index d35154a8c325..a295c02fb486 100644 --- a/src/dispatch/signal/models.py +++ b/src/dispatch/signal/models.py @@ -363,11 +363,12 @@ class AdditionalMetadata(DispatchBase): class SignalInstanceBase(DispatchBase): - project: ProjectRead + project: Optional[ProjectRead] case: Optional[CaseReadMinimal] canary: Optional[bool] = False entities: Optional[List[EntityRead]] = [] raw: dict[str, Any] + external_id: Optional[str] filter_action: SignalFilterAction = None created_at: Optional[datetime] = None diff --git a/src/dispatch/signal/scheduled.py b/src/dispatch/signal/scheduled.py deleted file mode 100644 index f09d5415f396..000000000000 --- a/src/dispatch/signal/scheduled.py +++ /dev/null @@ -1,50 +0,0 @@ -""" -.. module: dispatch.signal.scheduled - :platform: Unix - :copyright: (c) 2022 by Netflix Inc., see AUTHORS for more - :license: Apache, see LICENSE for more details. -""" -import logging - -from schedule import every - -from dispatch.database.core import SessionLocal -from dispatch.decorators import scheduled_project_task, timer -from dispatch.plugin import service as plugin_service -from dispatch.project.models import Project -from dispatch.scheduler import scheduler -from dispatch.signal import flows as signal_flows - -log = logging.getLogger(__name__) - - -# TODO do we want per signal source flexibility? -@scheduler.add(every(1).minutes, name="signal-consume") -@timer -@scheduled_project_task -def consume_signals(db_session: SessionLocal, project: Project): - """Consume signals from external sources.""" - plugins = plugin_service.get_active_instances( - db_session=db_session, plugin_type="signal-consumer", project_id=project.id - ) - - if not plugins: - log.debug( - "No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}" - ) - return - - for plugin in plugins: - log.debug(f"Consuming signals using signal-consumer plugin: {plugin.plugin.slug}") - signal_instances = plugin.instance.consume() - for signal_instance_data in signal_instances: - log.info(f"Attempting to process the following signal: {signal_instance_data}") - try: - signal_flows.create_signal_instance( - db_session=db_session, - project=project, - signal_instance_data=signal_instance_data, - ) - except Exception as e: - log.debug(signal_instance_data) - log.exception(e) diff --git a/src/dispatch/signal/service.py b/src/dispatch/signal/service.py index f445763af842..eadc7e0924e1 100644 --- a/src/dispatch/signal/service.py +++ b/src/dispatch/signal/service.py @@ -18,6 +18,15 @@ from dispatch.tag import service as tag_service from dispatch.workflow import service as workflow_service from dispatch.entity.models import Entity +from sqlalchemy.exc import IntegrityError +from dispatch.entity_type.models import EntityScopeEnum +from dispatch.entity import service as entity_service + +from .exceptions import ( + SignalNotDefinedException, + SignalNotEnabledException, + SignalNotIdentifiedException, +) from .models import ( Signal, @@ -109,6 +118,58 @@ def get_signal_engagement_by_name_or_raise( return signal_engagement +def create_signal_instance(*, db_session: Session, signal_instance_in: SignalInstanceCreate): + if not signal_instance_in.signal: + external_id = signal_instance_in.external_id + + # this assumes the external_ids are uuids + if external_id: + signal = ( + db_session.query(Signal).filter(Signal.external_id == external_id).one_or_none() + ) + signal_instance_in.signal = signal + else: + msg = "An externalId must be provided." + raise SignalNotIdentifiedException(msg) + + if not signal: + msg = f"No signal definition found. ExternalId: {external_id}" + raise SignalNotDefinedException(msg) + + if not signal.enabled: + msg = f"Signal definition not enabled. SignalName: {signal.name} ExternalId: {signal.external_id}" + raise SignalNotEnabledException(msg) + + try: + signal_instance = create_instance( + db_session=db_session, signal_instance_in=signal_instance_in + ) + signal_instance.signal = signal + db_session.commit() + except IntegrityError: + db_session.rollback() + signal_instance = update_instance( + db_session=db_session, signal_instance_in=signal_instance_in + ) + # Note: we can do this because it's still relatively cheap, if we add more logic here + # this will need to be moved to a background function (similar to case creation) + # fetch `all` entities that should be associated with all signal definitions + entity_types = entity_type_service.get_all( + db_session=db_session, scope=EntityScopeEnum.all + ).all() + entity_types = signal_instance.signal.entity_types + entity_types + + if entity_types: + entities = entity_service.find_entities( + db_session=db_session, + signal_instance=signal_instance, + entity_types=entity_types, + ) + signal_instance.entities = entities + db_session.commit() + return signal_instance + + def create_signal_filter( *, db_session: Session, creator: DispatchUser, signal_filter_in: SignalFilterCreate ) -> SignalFilter: @@ -451,6 +512,7 @@ def create_instance( "project", "entities", "raw", + "external_id", } ), raw=json.loads(json.dumps(signal_instance_in.raw)), diff --git a/src/dispatch/static/dispatch/package-lock.json b/src/dispatch/static/dispatch/package-lock.json index 85d77f4e0cc1..dadfccd0d4e6 100644 --- a/src/dispatch/static/dispatch/package-lock.json +++ b/src/dispatch/static/dispatch/package-lock.json @@ -757,9 +757,9 @@ "optional": true }, "node_modules/@monaco-editor/loader": { - "version": "1.3.3", - "resolved": "https://registry.npmjs.org/@monaco-editor/loader/-/loader-1.3.3.tgz", - "integrity": "sha512-6KKF4CTzcJiS8BJwtxtfyYt9shBiEv32ateQ9T4UVogwn4HM/uPo9iJd2Dmbkpz8CM6Y0PDUpjnZzCwC+eYo2Q==", + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/@monaco-editor/loader/-/loader-1.4.0.tgz", + "integrity": "sha512-00ioBig0x642hytVspPl7DbQyaSWRaolYie/UFNjoTdvoKPzo6xrXLhTk9ixgIKcLH5b5vDOjVNiGyY+uDCUlg==", "dependencies": { "state-local": "^1.0.6" }, @@ -1386,9 +1386,9 @@ } }, "node_modules/apexcharts": { - "version": "3.42.0", - "resolved": "https://registry.npmjs.org/apexcharts/-/apexcharts-3.42.0.tgz", - "integrity": "sha512-hYhzZqh2Efny9uiutkGU2M/EarJ4Nn8s6dxZ0C7E7N+SV4d1xjTioXi2NLn4UKVJabZkb3HnpXDoumXgtAymwg==", + "version": "3.43.0", + "resolved": "https://registry.npmjs.org/apexcharts/-/apexcharts-3.43.0.tgz", + "integrity": "sha512-YPw1aLatPQMUqVLMp5d+LDaXFi4QrRQND72/XO7/2NJdg+R5MjE9sifJ0GzOfgoZM7ltBUTjwfSxIvwR/9V8yw==", "dependencies": { "@yr/monotone-cubic-spline": "^1.0.3", "svg.draggable.js": "^2.2.2", @@ -7201,9 +7201,9 @@ "optional": true }, "@monaco-editor/loader": { - "version": "1.3.3", - "resolved": "https://registry.npmjs.org/@monaco-editor/loader/-/loader-1.3.3.tgz", - "integrity": "sha512-6KKF4CTzcJiS8BJwtxtfyYt9shBiEv32ateQ9T4UVogwn4HM/uPo9iJd2Dmbkpz8CM6Y0PDUpjnZzCwC+eYo2Q==", + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/@monaco-editor/loader/-/loader-1.4.0.tgz", + "integrity": "sha512-00ioBig0x642hytVspPl7DbQyaSWRaolYie/UFNjoTdvoKPzo6xrXLhTk9ixgIKcLH5b5vDOjVNiGyY+uDCUlg==", "requires": { "state-local": "^1.0.6" } @@ -7743,9 +7743,9 @@ } }, "apexcharts": { - "version": "3.42.0", - "resolved": "https://registry.npmjs.org/apexcharts/-/apexcharts-3.42.0.tgz", - "integrity": "sha512-hYhzZqh2Efny9uiutkGU2M/EarJ4Nn8s6dxZ0C7E7N+SV4d1xjTioXi2NLn4UKVJabZkb3HnpXDoumXgtAymwg==", + "version": "3.43.0", + "resolved": "https://registry.npmjs.org/apexcharts/-/apexcharts-3.43.0.tgz", + "integrity": "sha512-YPw1aLatPQMUqVLMp5d+LDaXFi4QrRQND72/XO7/2NJdg+R5MjE9sifJ0GzOfgoZM7ltBUTjwfSxIvwR/9V8yw==", "requires": { "@yr/monotone-cubic-spline": "^1.0.3", "svg.draggable.js": "^2.2.2", diff --git a/src/dispatch/static/dispatch/src/components/AppToolbar.vue b/src/dispatch/static/dispatch/src/components/AppToolbar.vue index 78aaad9c1a61..f13d2926d180 100644 --- a/src/dispatch/static/dispatch/src/components/AppToolbar.vue +++ b/src/dispatch/static/dispatch/src/components/AppToolbar.vue @@ -120,7 +120,7 @@ color="blue" @change="updateExperimentalFeatures()" :label="currentUser().experimental_features ? 'Enabled' : 'Disabled'" - > + /> Organizations @@ -196,7 +196,11 @@ export default { UserApi.getUserInfo() .then((response) => { let userId = response.data.id - UserApi.update(userId, { id: userId, experimental_features: this.experimental_features }) + let newUserExperimentalFeatures = this.currentUser().experimental_features + UserApi.update(userId, { + id: userId, + experimental_features: newUserExperimentalFeatures, + }) }) .catch((error) => { console.error("Error occurred while updating experimental features: ", error) diff --git a/src/dispatch/static/dispatch/src/signal/Table.vue b/src/dispatch/static/dispatch/src/signal/Table.vue index a582a69dc8d5..296e737c64d5 100644 --- a/src/dispatch/static/dispatch/src/signal/Table.vue +++ b/src/dispatch/static/dispatch/src/signal/Table.vue @@ -42,6 +42,9 @@ :loading="loading" loading-text="Loading... Please wait" > + @@ -118,6 +121,7 @@ export default { { text: "Name", value: "name", align: "left", width: "10%" }, { text: "Variant", value: "variant", sortable: true }, { text: "Description", value: "description", sortable: false }, + { text: "Create Case", value: "create_case", sortable: true, width: "100px" }, { text: "Enabled", value: "enabled", sortable: true }, { text: "Owner", value: "owner" }, { text: "Case Type", value: "case_type" }, diff --git a/src/dispatch/ticket/flows.py b/src/dispatch/ticket/flows.py index c17d22836a81..94a96313d489 100644 --- a/src/dispatch/ticket/flows.py +++ b/src/dispatch/ticket/flows.py @@ -196,7 +196,7 @@ def update_case_ticket( case: Case, db_session: SessionLocal, ): - """Updates an case ticket.""" + """Updates a case ticket.""" plugin = plugin_service.get_active_instance( db_session=db_session, project_id=case.project.id, plugin_type="ticket" ) @@ -215,6 +215,14 @@ def update_case_ticket( case_type_in=case.case_type, ).get_meta(plugin.plugin.slug) + case_document_weblink = "" + if case.case_document: + case_document_weblink = resolve_attr(case, "case_document.weblink") + + case_storage_weblink = "" + if case.storage: + case_storage_weblink = resolve_attr(case, "storage.weblink") + # we update the external case ticket try: plugin.instance.update_case_ticket( @@ -227,8 +235,8 @@ def update_case_ticket( case.case_priority.name, case.status.lower(), case.assignee.individual.email, - resolve_attr(case, "case_document.weblink"), - resolve_attr(case, "storage.weblink"), + case_document_weblink, + case_storage_weblink, case_type_plugin_metadata=case_type_plugin_metadata, ) except Exception as e: