diff --git a/.gitignore b/.gitignore index dadbf521..761600cf 100644 --- a/.gitignore +++ b/.gitignore @@ -164,4 +164,5 @@ workers/current_evaluation *.pid -*.jsonl \ No newline at end of file +*.jsonl +*.pem \ No newline at end of file diff --git a/AWS_ECS_CLUSTER_W_EC2.md b/AWS_ECS_CLUSTER_W_EC2.md new file mode 100644 index 00000000..bf6bc275 --- /dev/null +++ b/AWS_ECS_CLUSTER_W_EC2.md @@ -0,0 +1,7 @@ +# Things to take note of + +- EC2 that is provisioned needs to have more memory then the task definition within the ECS service + - EC2 4vCPU 16GB + - EC2 Task Definition in ECS 4vCPU 12GB + - Need to check how small the difference can be +- EC2 Needs to have a public IPV4 Address diff --git a/pv-validation-hub-client b/pv-validation-hub-client index adbd5225..c144e18d 160000 --- a/pv-validation-hub-client +++ b/pv-validation-hub-client @@ -1 +1 @@ -Subproject commit adbd5225148b0514ebde8d9190571136a99beac3 +Subproject commit c144e18d982ed4274dc990512d4ad5e902b65a09 diff --git a/workers/src/submission_worker.py b/workers/src/submission_worker.py index 2afd81e2..9f0ba8b8 100644 --- a/workers/src/submission_worker.py +++ b/workers/src/submission_worker.py @@ -514,6 +514,7 @@ def get_or_create_sqs_queue(queue_name): Returns: Returns the SQS Queue object """ + logger.info(f"Getting or creating SQS queue: {queue_name}") # Use the Docker endpoint URL for local development if IS_LOCAL: sqs: SQSServiceResource = boto3.resource( @@ -530,10 +531,11 @@ def get_or_create_sqs_queue(queue_name): "sqs", # type: ignore region_name=os.environ.get("AWS_DEFAULT_REGION", "us-west-2"), ) - + logger.info(f"Retrieved SQS resource") if queue_name == "": queue_name = "valhub_submission_queue.fifo" # Check if the queue exists. If no, then create one + logger.info(f"Getting queue by name: {queue_name}") try: queue = sqs.get_queue_by_name(QueueName=queue_name) except botocore.exceptions.ClientError as ex: @@ -546,6 +548,7 @@ def get_or_create_sqs_queue(queue_name): QueueName=queue_name, Attributes={"FifoQueue": "true"} ) + logger.info(f"Queue retrieved") return queue @@ -664,18 +667,22 @@ def main(): f'Starting submission worker to process messages from "valhub_submission_queue.fifo"' ) queue = get_or_create_sqs_queue("valhub_submission_queue.fifo") - # print(queue) + logger.info(f'Retrieved queue "valhub_submission_queue.fifo"') + logger.info(f"SQS queue URL: {queue.url}") is_finished = False + logger.info("Listening for messages...") # infinite loop to listen and process messages while not is_finished: + logger.info("Polling for messages...") messages = queue.receive_messages( MaxNumberOfMessages=1, VisibilityTimeout=43200 ) + logger.info(f"Received {len(messages)} messages") + logger.info(f"Processing messages...") for message in messages: - logger.info(f"Received message: {message.body}") json_message: dict[str, Any] = json.loads(message.body)