From 7b0e9884f39772e1a505f7fbb7d66ca21c893ca5 Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Wed, 27 Sep 2023 16:41:00 -0700 Subject: [PATCH] Fixes --- requirements-base.in | 3 +- setup.py | 1 + src/dispatch/cli.py | 31 +++++-------- src/dispatch/plugins/dispatch_aws/__init__.py | 1 + src/dispatch/plugins/dispatch_aws/config.py | 14 +++--- src/dispatch/plugins/dispatch_aws/plugin.py | 44 ++++++++++++------- src/dispatch/signal/models.py | 3 +- src/dispatch/signal/service.py | 31 +++++-------- 8 files changed, 65 insertions(+), 63 deletions(-) create mode 100644 src/dispatch/plugins/dispatch_aws/__init__.py diff --git a/requirements-base.in b/requirements-base.in index e6834e4d73dd..95003484d6f4 100644 --- a/requirements-base.in +++ b/requirements-base.in @@ -6,6 +6,7 @@ atlassian-python-api==3.32.0 attrs==22.1.0 bcrypt blockkit +boto3 cachetools chardet click @@ -44,8 +45,8 @@ schemathesis sentry-asgi sentry-sdk sh -slack-bolt slack_sdk +slack-bolt slowapi spacy sqlalchemy-filters diff --git a/setup.py b/setup.py index 0314fe703c4e..5b31ebe73601 100644 --- a/setup.py +++ b/setup.py @@ -404,6 +404,7 @@ def run(self): "dispatch.plugins": [ "dispatch_atlassian_confluence = dispatch.plugins.dispatch_atlassian_confluence.plugin:ConfluencePagePlugin", "dispatch_atlassian_confluence_document = dispatch.plugins.dispatch_atlassian_confluence.docs.plugin:ConfluencePageDocPlugin", + "dispatch_aws_sns = dispatch.plugins.dispatch_aws.plugin:AWSSQSSignalConsumerPlugin", "dispatch_basic_auth = dispatch.plugins.dispatch_core.plugin:BasicAuthProviderPlugin", "dispatch_contact = dispatch.plugins.dispatch_core.plugin:DispatchContactPlugin", "dispatch_document_resolver = dispatch.plugins.dispatch_core.plugin:DispatchDocumentResolverPlugin", diff --git a/src/dispatch/cli.py b/src/dispatch/cli.py index de62735e8d09..0ccf135ad224 100644 --- a/src/dispatch/cli.py +++ b/src/dispatch/cli.py @@ -775,15 +775,14 @@ def signals_group(): @signals_group.command("consume") @click.argument("organization") @click.argument("project") -@click.argument("plugin") -def consume_signals(organization, project, plugin): # TODO support multiple from one command +@click.argument("plugin_name") +def consume_signals(organization, project, plugin_name): # TODO support multiple from one command """Runs a continuous process that consumes signals from the specified plugin.""" - from sqlalchemy import true - from dispatch.common.utils.cli import install_plugins from dispatch.database.core import refetch_db_session from dispatch.project import service as project_service from dispatch.project.models import ProjectRead + from dispatch.plugin import service as plugin_service install_plugins() @@ -793,26 +792,20 @@ def consume_signals(organization, project, plugin): # TODO support multiple fro db_session=session, project_in=ProjectRead(name=project) ) - instances = ( - session.query(PluginInstance) - .filter(PluginInstance.enabled == true()) - .filter(PluginInstance.project_id == project.id) - .all() + plugins = plugin_service.get_active_instances( + db_session=session, plugin_type="signal-consumer", project_id=project.id ) - instance = None - for i in instances: - if i.plugin.slug == "signal-consumer": - instance: PluginInstance = i - break - - if not instance: - click.secho( - f"No signal consumer plugin has been configured for this organization/plugin. Organization: {organization} Project: {project}", - fg="red", + if not plugins: + log.debug( + "No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}" ) return + for plugin in plugins: + if plugin.plugin.slug == plugin_name: + plugin.instance.consume(db_session=session, project=project) + @signals_group.command("process") def process_signals(): diff --git a/src/dispatch/plugins/dispatch_aws/__init__.py b/src/dispatch/plugins/dispatch_aws/__init__.py new file mode 100644 index 000000000000..ad5cc752c07b --- /dev/null +++ b/src/dispatch/plugins/dispatch_aws/__init__.py @@ -0,0 +1 @@ +from ._version import __version__ # noqa diff --git a/src/dispatch/plugins/dispatch_aws/config.py b/src/dispatch/plugins/dispatch_aws/config.py index 557886e6166b..d14e3f58ae2a 100644 --- a/src/dispatch/plugins/dispatch_aws/config.py +++ b/src/dispatch/plugins/dispatch_aws/config.py @@ -3,16 +3,16 @@ class AWSSQSConfiguration(BaseConfigurationModel): - """SQS configuration description.""" + """Signal SQS configuration""" queue_name: str = Field( - title="SQS Queue Name", - description="SQS Queue Name, not the ARN.", + title="Queue Name", + description="Queue Name, not the ARN.", ) queue_owner: str = Field( - title="SQS Queue Owner", - description="SQS Queue Owner Account ID.", + title="Queue Owner", + description="Queue Owner Account ID.", ) region: str = Field( @@ -22,8 +22,8 @@ class AWSSQSConfiguration(BaseConfigurationModel): ) batch_size: int = Field( - title="SQS Batch Size", - description="SQS Batch Size.", + title="Batch Size", + description="Number of messages to retrieve from SQS.", default=10, le=10, ) diff --git a/src/dispatch/plugins/dispatch_aws/plugin.py b/src/dispatch/plugins/dispatch_aws/plugin.py index 8210e94e1110..f2a1c770b354 100644 --- a/src/dispatch/plugins/dispatch_aws/plugin.py +++ b/src/dispatch/plugins/dispatch_aws/plugin.py @@ -12,15 +12,15 @@ from dispatch.metrics import provider as metrics_provider from dispatch.plugins.bases import SignalConsumerPlugin from dispatch.signal import service as signal_service -from dispatch.signal.models import SignalInstance -from dispatch.plugins.dispatch_aws.config import AWSSQSConfigurationSchema +from dispatch.signal.models import SignalInstanceCreate +from dispatch.plugins.dispatch_aws.config import AWSSQSConfiguration from . import __version__ log = logging.getLogger(__name__) -class SQSSignalConsumerPlugin(SignalConsumerPlugin): +class AWSSQSSignalConsumerPlugin(SignalConsumerPlugin): title = "AWS SQS - Signal Consumer" slug = "aws-sqs-signal-consumer" description = "Uses sqs to consume signals" @@ -30,37 +30,49 @@ class SQSSignalConsumerPlugin(SignalConsumerPlugin): author_url = "https://github.com/netflix/dispatch.git" def __init__(self): - self.configuration_schema = AWSSQSConfigurationSchema + self.configuration_schema = AWSSQSConfiguration - def consume( - self, - ): + def consume(self, db_session, project): client = boto3.client("sqs", region_name=self.configuration.region) - sqs_queue_url: str = client.get_queue_url( - QueueName=self.configuration.queue_name, QueueOwnerAWSAccountId=self.sqs_queue_owner + queue_url: str = client.get_queue_url( + QueueName=self.configuration.queue_name, + QueueOwnerAWSAccountId=self.configuration.queue_owner, )["QueueUrl"] while True: response = client.receive_message( - QueueUrl=sqs_queue_url, + QueueUrl=queue_url, MaxNumberOfMessages=self.configuration.batch_size, - VisibilityTimeout=2 * self.round_length, - WaitTimeSeconds=self.round_length, + VisibilityTimeout=40, + WaitTimeSeconds=20, ) if response.get("Messages") and len(response.get("Messages")) > 0: entries = [] for message in response["Messages"]: try: body = json.loads(message["Body"]) - signal = signal_service.create_signal_instance(SignalInstance(**body)) + signal_data = json.loads(body["Message"]) + + signal_instance = signal_service.create_signal_instance( + db_session=db_session, + signal_instance_in=SignalInstanceCreate( + project=project, raw=signal_data, **signal_data + ), + ) metrics_provider.counter( - "sqs.signal.received", tags={"signalName": signal.name} + "sqs.signal.received", + tags={ + "signalName": signal_instance.signal.name, + "externalId": signal_instance.signal.external_id, + }, + ) + log.debug( + f"Received signal: SignalName: {signal_instance.signal.name} ExernalId: {signal_instance.signal.external_id}" ) - log.debug(f"Received signal: {signal}") entries.append( {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]} ) except Exception as e: log.exception(e) - client.delete_message_batch(QueueUrl=sqs_queue_url, Entries=entries) + client.delete_message_batch(QueueUrl=queue_url, Entries=entries) diff --git a/src/dispatch/signal/models.py b/src/dispatch/signal/models.py index d35154a8c325..a295c02fb486 100644 --- a/src/dispatch/signal/models.py +++ b/src/dispatch/signal/models.py @@ -363,11 +363,12 @@ class AdditionalMetadata(DispatchBase): class SignalInstanceBase(DispatchBase): - project: ProjectRead + project: Optional[ProjectRead] case: Optional[CaseReadMinimal] canary: Optional[bool] = False entities: Optional[List[EntityRead]] = [] raw: dict[str, Any] + external_id: Optional[str] filter_action: SignalFilterAction = None created_at: Optional[datetime] = None diff --git a/src/dispatch/signal/service.py b/src/dispatch/signal/service.py index 8c87b3f1c4bc..eadc7e0924e1 100644 --- a/src/dispatch/signal/service.py +++ b/src/dispatch/signal/service.py @@ -25,7 +25,7 @@ from .exceptions import ( SignalNotDefinedException, SignalNotEnabledException, - SignalNotIdentifiableException, + SignalNotIdentifiedException, ) from .models import ( @@ -119,33 +119,25 @@ def get_signal_engagement_by_name_or_raise( def create_signal_instance(*, db_session: Session, signal_instance_in: SignalInstanceCreate): - project = project_service.get_by_name_or_default( - db_session=db_session, project_in=signal_instance_in.project - ) - if not signal_instance_in.signal: - external_id = signal_instance_in.raw.get("externalId") - variant = signal_instance_in.raw.get("variant") + external_id = signal_instance_in.external_id - if external_id or variant: - signal = get_by_variant_or_external_id( - db_session=db_session, - project_id=project.id, - external_id=external_id, - variant=variant, + # this assumes the external_ids are uuids + if external_id: + signal = ( + db_session.query(Signal).filter(Signal.external_id == external_id).one_or_none() ) - signal_instance_in.signal = signal else: - msg = "An external id or variant must be provided." - raise SignalNotIdentifiableException(msg) + msg = "An externalId must be provided." + raise SignalNotIdentifiedException(msg) if not signal: - msg = f"No signal definition found. External Id: {external_id} Variant: {variant}" + msg = f"No signal definition found. ExternalId: {external_id}" raise SignalNotDefinedException(msg) if not signal.enabled: - msg = f"Signal definition not enabled. Signal Name: {signal.name}" + msg = f"Signal definition not enabled. SignalName: {signal.name} ExternalId: {signal.external_id}" raise SignalNotEnabledException(msg) try: @@ -159,7 +151,7 @@ def create_signal_instance(*, db_session: Session, signal_instance_in: SignalIns signal_instance = update_instance( db_session=db_session, signal_instance_in=signal_instance_in ) - # Note: we can do this because it's still relatively cheap, if we add more logic to the flow + # Note: we can do this because it's still relatively cheap, if we add more logic here # this will need to be moved to a background function (similar to case creation) # fetch `all` entities that should be associated with all signal definitions entity_types = entity_type_service.get_all( @@ -520,6 +512,7 @@ def create_instance( "project", "entities", "raw", + "external_id", } ), raw=json.loads(json.dumps(signal_instance_in.raw)),