diff --git a/src/dispatch/plugins/dispatch_aws/plugin.py b/src/dispatch/plugins/dispatch_aws/plugin.py index fb6902e83cc1..518d301ffea6 100644 --- a/src/dispatch/plugins/dispatch_aws/plugin.py +++ b/src/dispatch/plugins/dispatch_aws/plugin.py @@ -39,17 +39,17 @@ def consume(self, db_session, project): QueueOwnerAWSAccountId=self.configuration.queue_owner, )["QueueUrl"] - while True: - response = client.receive_message( - QueueUrl=queue_url, - MaxNumberOfMessages=self.configuration.batch_size, - VisibilityTimeout=40, - WaitTimeSeconds=20, - ) - if response.get("Messages") and len(response.get("Messages")) > 0: - entries = [] - for message in response["Messages"]: - try: + try: + while True: + response = client.receive_message( + QueueUrl=queue_url, + MaxNumberOfMessages=self.configuration.batch_size, + VisibilityTimeout=40, + WaitTimeSeconds=20, + ) + if response.get("Messages") and len(response.get("Messages")) > 0: + entries = [] + for message in response["Messages"]: body = json.loads(message["Body"]) signal_data = json.loads(body["Message"]) @@ -72,7 +72,7 @@ def consume(self, db_session, project): entries.append( {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]} ) - except Exception as e: - log.exception(e) - - client.delete_message_batch(QueueUrl=queue_url, Entries=entries) + if entries: + client.delete_message_batch(QueueUrl=queue_url, Entries=entries) + except Exception as e: + log.exception(e)