Skip to content

Commit

Permalink
Ensures that signals are actually consumed forever (#3874)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevgliss authored Oct 16, 2023
1 parent 2ed3927 commit 2a7e051
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 41 deletions.
14 changes: 7 additions & 7 deletions src/dispatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,13 +836,13 @@ def consume_signals():

for plugin in plugins:
log.debug(f"Consuming signals for plugin: {plugin.plugin.slug}")
p = multiprocessing.Process(
target=_run_consume,
args=(plugin.plugin.slug, organization.slug, project.id, running),
)
p.start()
workers.append(p)
print(workers)
for _ in range(5): # TODO add plugin.instance.concurrency:
p = multiprocessing.Process(
target=_run_consume,
args=(plugin.plugin.slug, organization.slug, project.id, running),
)
p.start()
workers.append(p)

def terminate_processes(signum, frame):
print("Terminating main process...")
Expand Down
69 changes: 35 additions & 34 deletions src/dispatch/plugins/dispatch_aws/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,39 +39,40 @@ def consume(self, db_session, project):
QueueOwnerAWSAccountId=self.configuration.queue_owner,
)["QueueUrl"]

response = client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=self.configuration.batch_size,
VisibilityTimeout=40,
WaitTimeSeconds=20,
)
if response.get("Messages") and len(response.get("Messages")) > 0:
entries = []
for message in response["Messages"]:
try:
body = json.loads(message["Body"])
signal_data = json.loads(body["Message"])
while True:
response = client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=self.configuration.batch_size,
VisibilityTimeout=40,
WaitTimeSeconds=20,
)
if response.get("Messages") and len(response.get("Messages")) > 0:
entries = []
for message in response["Messages"]:
try:
body = json.loads(message["Body"])
signal_data = json.loads(body["Message"])

signal_instance = signal_service.create_signal_instance(
db_session=db_session,
signal_instance_in=SignalInstanceCreate(
project=project, raw=signal_data, **signal_data
),
)
metrics_provider.counter(
"aws-sqs-signal-consumer.signal.received",
tags={
"signalName": signal_instance.signal.name,
"externalId": signal_instance.signal.external_id,
},
)
log.debug(
f"Received signal: SignalName: {signal_instance.signal.name} ExernalId: {signal_instance.signal.external_id}"
)
entries.append(
{"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
)
except Exception as e:
log.exception(e)
signal_instance = signal_service.create_signal_instance(
db_session=db_session,
signal_instance_in=SignalInstanceCreate(
project=project, raw=signal_data, **signal_data
),
)
metrics_provider.counter(
"aws-sqs-signal-consumer.signal.received",
tags={
"signalName": signal_instance.signal.name,
"externalId": signal_instance.signal.external_id,
},
)
log.debug(
f"Received signal: SignalName: {signal_instance.signal.name} ExernalId: {signal_instance.signal.external_id}"
)
entries.append(
{"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
)
except Exception as e:
log.exception(e)

client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
client.delete_message_batch(QueueUrl=queue_url, Entries=entries)

0 comments on commit 2a7e051

Please sign in to comment.