Skip to content

Commit

Permalink
Updating logging for AWS SQS queue processing
Browse files Browse the repository at this point in the history
  • Loading branch information
MitchellAV committed Aug 10, 2024
1 parent dff4ffe commit 040c8b4
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,5 @@ workers/current_evaluation


*.pid
*.jsonl
*.jsonl
*.pem
7 changes: 7 additions & 0 deletions AWS_ECS_CLUSTER_W_EC2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Things to take note of

- EC2 that is provisioned needs to have more memory then the task definition within the ECS service
- EC2 4vCPU 16GB
- EC2 Task Definition in ECS 4vCPU 12GB
- Need to check how small the difference can be
- EC2 Needs to have a public IPV4 Address
13 changes: 10 additions & 3 deletions workers/src/submission_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ def get_or_create_sqs_queue(queue_name):
Returns:
Returns the SQS Queue object
"""
logger.info(f"Getting or creating SQS queue: {queue_name}")
# Use the Docker endpoint URL for local development
if IS_LOCAL:
sqs: SQSServiceResource = boto3.resource(
Expand All @@ -530,10 +531,11 @@ def get_or_create_sqs_queue(queue_name):
"sqs", # type: ignore
region_name=os.environ.get("AWS_DEFAULT_REGION", "us-west-2"),
)

logger.info(f"Retrieved SQS resource")
if queue_name == "":
queue_name = "valhub_submission_queue.fifo"
# Check if the queue exists. If no, then create one
logger.info(f"Getting queue by name: {queue_name}")
try:
queue = sqs.get_queue_by_name(QueueName=queue_name)
except botocore.exceptions.ClientError as ex:
Expand All @@ -546,6 +548,7 @@ def get_or_create_sqs_queue(queue_name):
QueueName=queue_name, Attributes={"FifoQueue": "true"}
)

logger.info(f"Queue retrieved")
return queue


Expand Down Expand Up @@ -664,18 +667,22 @@ def main():
f'Starting submission worker to process messages from "valhub_submission_queue.fifo"'
)
queue = get_or_create_sqs_queue("valhub_submission_queue.fifo")
# print(queue)
logger.info(f'Retrieved queue "valhub_submission_queue.fifo"')
logger.info(f"SQS queue URL: {queue.url}")

is_finished = False

logger.info("Listening for messages...")
# infinite loop to listen and process messages
while not is_finished:
logger.info("Polling for messages...")
messages = queue.receive_messages(
MaxNumberOfMessages=1, VisibilityTimeout=43200
)
logger.info(f"Received {len(messages)} messages")

logger.info(f"Processing messages...")
for message in messages:

logger.info(f"Received message: {message.body}")

json_message: dict[str, Any] = json.loads(message.body)
Expand Down

0 comments on commit 040c8b4

Please sign in to comment.