Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
kevgliss committed Sep 27, 2023
1 parent aca9f98 commit 7b0e988
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 63 deletions.
3 changes: 2 additions & 1 deletion requirements-base.in
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ atlassian-python-api==3.32.0
attrs==22.1.0
bcrypt
blockkit
boto3
cachetools
chardet
click
Expand Down Expand Up @@ -44,8 +45,8 @@ schemathesis
sentry-asgi
sentry-sdk
sh
slack-bolt
slack_sdk
slack-bolt
slowapi
spacy
sqlalchemy-filters
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ def run(self):
"dispatch.plugins": [
"dispatch_atlassian_confluence = dispatch.plugins.dispatch_atlassian_confluence.plugin:ConfluencePagePlugin",
"dispatch_atlassian_confluence_document = dispatch.plugins.dispatch_atlassian_confluence.docs.plugin:ConfluencePageDocPlugin",
"dispatch_aws_sns = dispatch.plugins.dispatch_aws.plugin:AWSSQSSignalConsumerPlugin",
"dispatch_basic_auth = dispatch.plugins.dispatch_core.plugin:BasicAuthProviderPlugin",
"dispatch_contact = dispatch.plugins.dispatch_core.plugin:DispatchContactPlugin",
"dispatch_document_resolver = dispatch.plugins.dispatch_core.plugin:DispatchDocumentResolverPlugin",
Expand Down
31 changes: 12 additions & 19 deletions src/dispatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,15 +775,14 @@ def signals_group():
@signals_group.command("consume")
@click.argument("organization")
@click.argument("project")
@click.argument("plugin")
def consume_signals(organization, project, plugin): # TODO support multiple from one command
@click.argument("plugin_name")
def consume_signals(organization, project, plugin_name): # TODO support multiple from one command
"""Runs a continuous process that consumes signals from the specified plugin."""
from sqlalchemy import true

from dispatch.common.utils.cli import install_plugins
from dispatch.database.core import refetch_db_session
from dispatch.project import service as project_service
from dispatch.project.models import ProjectRead
from dispatch.plugin import service as plugin_service

install_plugins()

Expand All @@ -793,26 +792,20 @@ def consume_signals(organization, project, plugin): # TODO support multiple fro
db_session=session, project_in=ProjectRead(name=project)
)

instances = (
session.query(PluginInstance)
.filter(PluginInstance.enabled == true())
.filter(PluginInstance.project_id == project.id)
.all()
plugins = plugin_service.get_active_instances(
db_session=session, plugin_type="signal-consumer", project_id=project.id
)

instance = None
for i in instances:
if i.plugin.slug == "signal-consumer":
instance: PluginInstance = i
break

if not instance:
click.secho(
f"No signal consumer plugin has been configured for this organization/plugin. Organization: {organization} Project: {project}",
fg="red",
if not plugins:
log.debug(
"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}"
)
return

for plugin in plugins:
if plugin.plugin.slug == plugin_name:
plugin.instance.consume(db_session=session, project=project)


@signals_group.command("process")
def process_signals():
Expand Down
1 change: 1 addition & 0 deletions src/dispatch/plugins/dispatch_aws/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from ._version import __version__ # noqa
14 changes: 7 additions & 7 deletions src/dispatch/plugins/dispatch_aws/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@


class AWSSQSConfiguration(BaseConfigurationModel):
"""SQS configuration description."""
"""Signal SQS configuration"""

queue_name: str = Field(
title="SQS Queue Name",
description="SQS Queue Name, not the ARN.",
title="Queue Name",
description="Queue Name, not the ARN.",
)

queue_owner: str = Field(
title="SQS Queue Owner",
description="SQS Queue Owner Account ID.",
title="Queue Owner",
description="Queue Owner Account ID.",
)

region: str = Field(
Expand All @@ -22,8 +22,8 @@ class AWSSQSConfiguration(BaseConfigurationModel):
)

batch_size: int = Field(
title="SQS Batch Size",
description="SQS Batch Size.",
title="Batch Size",
description="Number of messages to retrieve from SQS.",
default=10,
le=10,
)
44 changes: 28 additions & 16 deletions src/dispatch/plugins/dispatch_aws/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
from dispatch.metrics import provider as metrics_provider
from dispatch.plugins.bases import SignalConsumerPlugin
from dispatch.signal import service as signal_service
from dispatch.signal.models import SignalInstance
from dispatch.plugins.dispatch_aws.config import AWSSQSConfigurationSchema
from dispatch.signal.models import SignalInstanceCreate
from dispatch.plugins.dispatch_aws.config import AWSSQSConfiguration

from . import __version__

log = logging.getLogger(__name__)


class SQSSignalConsumerPlugin(SignalConsumerPlugin):
class AWSSQSSignalConsumerPlugin(SignalConsumerPlugin):
title = "AWS SQS - Signal Consumer"
slug = "aws-sqs-signal-consumer"
description = "Uses sqs to consume signals"
Expand All @@ -30,37 +30,49 @@ class SQSSignalConsumerPlugin(SignalConsumerPlugin):
author_url = "https://github.com/netflix/dispatch.git"

def __init__(self):
self.configuration_schema = AWSSQSConfigurationSchema
self.configuration_schema = AWSSQSConfiguration

def consume(
self,
):
def consume(self, db_session, project):
client = boto3.client("sqs", region_name=self.configuration.region)
sqs_queue_url: str = client.get_queue_url(
QueueName=self.configuration.queue_name, QueueOwnerAWSAccountId=self.sqs_queue_owner
queue_url: str = client.get_queue_url(
QueueName=self.configuration.queue_name,
QueueOwnerAWSAccountId=self.configuration.queue_owner,
)["QueueUrl"]

while True:
response = client.receive_message(
QueueUrl=sqs_queue_url,
QueueUrl=queue_url,
MaxNumberOfMessages=self.configuration.batch_size,
VisibilityTimeout=2 * self.round_length,
WaitTimeSeconds=self.round_length,
VisibilityTimeout=40,
WaitTimeSeconds=20,
)
if response.get("Messages") and len(response.get("Messages")) > 0:
entries = []
for message in response["Messages"]:
try:
body = json.loads(message["Body"])
signal = signal_service.create_signal_instance(SignalInstance(**body))
signal_data = json.loads(body["Message"])

signal_instance = signal_service.create_signal_instance(
db_session=db_session,
signal_instance_in=SignalInstanceCreate(
project=project, raw=signal_data, **signal_data
),
)
metrics_provider.counter(
"sqs.signal.received", tags={"signalName": signal.name}
"sqs.signal.received",
tags={
"signalName": signal_instance.signal.name,
"externalId": signal_instance.signal.external_id,
},
)
log.debug(
f"Received signal: SignalName: {signal_instance.signal.name} ExernalId: {signal_instance.signal.external_id}"
)
log.debug(f"Received signal: {signal}")
entries.append(
{"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
)
except Exception as e:
log.exception(e)

client.delete_message_batch(QueueUrl=sqs_queue_url, Entries=entries)
client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
3 changes: 2 additions & 1 deletion src/dispatch/signal/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,12 @@ class AdditionalMetadata(DispatchBase):


class SignalInstanceBase(DispatchBase):
project: ProjectRead
project: Optional[ProjectRead]
case: Optional[CaseReadMinimal]
canary: Optional[bool] = False
entities: Optional[List[EntityRead]] = []
raw: dict[str, Any]
external_id: Optional[str]
filter_action: SignalFilterAction = None
created_at: Optional[datetime] = None

Expand Down
31 changes: 12 additions & 19 deletions src/dispatch/signal/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .exceptions import (
SignalNotDefinedException,
SignalNotEnabledException,
SignalNotIdentifiableException,
SignalNotIdentifiedException,
)

from .models import (
Expand Down Expand Up @@ -119,33 +119,25 @@ def get_signal_engagement_by_name_or_raise(


def create_signal_instance(*, db_session: Session, signal_instance_in: SignalInstanceCreate):
project = project_service.get_by_name_or_default(
db_session=db_session, project_in=signal_instance_in.project
)

if not signal_instance_in.signal:
external_id = signal_instance_in.raw.get("externalId")
variant = signal_instance_in.raw.get("variant")
external_id = signal_instance_in.external_id

if external_id or variant:
signal = get_by_variant_or_external_id(
db_session=db_session,
project_id=project.id,
external_id=external_id,
variant=variant,
# this assumes the external_ids are uuids
if external_id:
signal = (
db_session.query(Signal).filter(Signal.external_id == external_id).one_or_none()
)

signal_instance_in.signal = signal
else:
msg = "An external id or variant must be provided."
raise SignalNotIdentifiableException(msg)
msg = "An externalId must be provided."
raise SignalNotIdentifiedException(msg)

if not signal:
msg = f"No signal definition found. External Id: {external_id} Variant: {variant}"
msg = f"No signal definition found. ExternalId: {external_id}"
raise SignalNotDefinedException(msg)

if not signal.enabled:
msg = f"Signal definition not enabled. Signal Name: {signal.name}"
msg = f"Signal definition not enabled. SignalName: {signal.name} ExternalId: {signal.external_id}"
raise SignalNotEnabledException(msg)

try:
Expand All @@ -159,7 +151,7 @@ def create_signal_instance(*, db_session: Session, signal_instance_in: SignalIns
signal_instance = update_instance(
db_session=db_session, signal_instance_in=signal_instance_in
)
# Note: we can do this because it's still relatively cheap, if we add more logic to the flow
# Note: we can do this because it's still relatively cheap, if we add more logic here
# this will need to be moved to a background function (similar to case creation)
# fetch `all` entities that should be associated with all signal definitions
entity_types = entity_type_service.get_all(
Expand Down Expand Up @@ -520,6 +512,7 @@ def create_instance(
"project",
"entities",
"raw",
"external_id",
}
),
raw=json.loads(json.dumps(signal_instance_in.raw)),
Expand Down

0 comments on commit 7b0e988

Please sign in to comment.