From 9acbd0904330fd89087b332527c46f33f8c28d5f Mon Sep 17 00:00:00 2001
From: Kevin Glisson
Date: Mon, 16 Oct 2023 12:06:25 -0700
Subject: [PATCH] Ensures that signals are actually consumed forever

---
 src/dispatch/cli.py                         | 14 ++---
 src/dispatch/plugins/dispatch_aws/plugin.py | 69 +++++++++++----------
 2 files changed, 42 insertions(+), 41 deletions(-)

diff --git a/src/dispatch/cli.py b/src/dispatch/cli.py
index 8ed30c41518d..ca2a9609d9ee 100644
--- a/src/dispatch/cli.py
+++ b/src/dispatch/cli.py
@@ -836,13 +836,13 @@ def consume_signals():
 
             for plugin in plugins:
                 log.debug(f"Consuming signals for plugin: {plugin.plugin.slug}")
-                p = multiprocessing.Process(
-                    target=_run_consume,
-                    args=(plugin.plugin.slug, organization.slug, project.id, running),
-                )
-                p.start()
-                workers.append(p)
-    print(workers)
+                for _ in range(5):  # TODO add plugin.instance.concurrency:
+                    p = multiprocessing.Process(
+                        target=_run_consume,
+                        args=(plugin.plugin.slug, organization.slug, project.id, running),
+                    )
+                    p.start()
+                    workers.append(p)
 
     def terminate_processes(signum, frame):
         print("Terminating main process...")
diff --git a/src/dispatch/plugins/dispatch_aws/plugin.py b/src/dispatch/plugins/dispatch_aws/plugin.py
index 7d15fec5ab8d..fb6902e83cc1 100644
--- a/src/dispatch/plugins/dispatch_aws/plugin.py
+++ b/src/dispatch/plugins/dispatch_aws/plugin.py
@@ -39,39 +39,40 @@ def consume(self, db_session, project):
             QueueOwnerAWSAccountId=self.configuration.queue_owner,
         )["QueueUrl"]
 
-        response = client.receive_message(
-            QueueUrl=queue_url,
-            MaxNumberOfMessages=self.configuration.batch_size,
-            VisibilityTimeout=40,
-            WaitTimeSeconds=20,
-        )
-        if response.get("Messages") and len(response.get("Messages")) > 0:
-            entries = []
-            for message in response["Messages"]:
-                try:
-                    body = json.loads(message["Body"])
-                    signal_data = json.loads(body["Message"])
+        while True:
+            response = client.receive_message(
+                QueueUrl=queue_url,
+                MaxNumberOfMessages=self.configuration.batch_size,
+                VisibilityTimeout=40,
+                WaitTimeSeconds=20,
+            )
+            if response.get("Messages") and len(response.get("Messages")) > 0:
+                entries = []
+                for message in response["Messages"]:
+                    try:
+                        body = json.loads(message["Body"])
+                        signal_data = json.loads(body["Message"])
 
-                    signal_instance = signal_service.create_signal_instance(
-                        db_session=db_session,
-                        signal_instance_in=SignalInstanceCreate(
-                            project=project, raw=signal_data, **signal_data
-                        ),
-                    )
-                    metrics_provider.counter(
-                        "aws-sqs-signal-consumer.signal.received",
-                        tags={
-                            "signalName": signal_instance.signal.name,
-                            "externalId": signal_instance.signal.external_id,
-                        },
-                    )
-                    log.debug(
-                        f"Received signal: SignalName: {signal_instance.signal.name} ExernalId: {signal_instance.signal.external_id}"
-                    )
-                    entries.append(
-                        {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
-                    )
-                except Exception as e:
-                    log.exception(e)
+                        signal_instance = signal_service.create_signal_instance(
+                            db_session=db_session,
+                            signal_instance_in=SignalInstanceCreate(
+                                project=project, raw=signal_data, **signal_data
+                            ),
+                        )
+                        metrics_provider.counter(
+                            "aws-sqs-signal-consumer.signal.received",
+                            tags={
+                                "signalName": signal_instance.signal.name,
+                                "externalId": signal_instance.signal.external_id,
+                            },
+                        )
+                        log.debug(
+                            f"Received signal: SignalName: {signal_instance.signal.name} ExernalId: {signal_instance.signal.external_id}"
+                        )
+                        entries.append(
+                            {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
+                        )
+                    except Exception as e:
+                        log.exception(e)
 
-            client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
+                client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
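
The `for _ in range(5)` loop above hard-codes five consumer processes per plugin and leaves a TODO to read the count from `plugin.instance.concurrency`. A minimal sketch of that follow-up, assuming such a `concurrency` attribute is added later (it does not exist in this patch), could look like:

    import multiprocessing

    def start_consumer_workers(plugin, organization, project, running, target, workers):
        """Spawn `concurrency` worker processes for a single signal-consumer plugin."""
        # The `concurrency` attribute on plugin.instance is assumed per the TODO;
        # fall back to the hard-coded 5 used by this patch when it is absent.
        concurrency = getattr(plugin.instance, "concurrency", 5)
        for _ in range(concurrency):
            p = multiprocessing.Process(
                target=target,  # e.g. the _run_consume entry point in cli.py
                args=(plugin.plugin.slug, organization.slug, project.id, running),
            )
            p.start()
            workers.append(p)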