Skip to content

Commit

Permalink
Potential fix for visibility timeout error. Added logging for the public metric return to better identify how to fix it.
Browse files Browse the repository at this point in the history

Signed-off-by: Duncan Ragsdale <[email protected]>
  • Loading branch information
Thistleman committed Dec 8, 2023
1 parent 32cdd7f commit b006323
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,24 @@
"file_name",
"run_time",
"data_requirements",
"mean_absolute_error",
"mean_absolute_error_time_series",
"data_sampling_frequency",
"issue"
],
"plots": [
{
"type": "histogram",
"x_val": "mean_absolute_error",
"x_val": "mean_absolute_error_time_series",
"color_code": "issue",
"title": "Time Series MAE Distribution by Issue",
"save_file_path": "mean_absolute_error_dist.png"
"save_file_path": "mean_absolute_error_time_series_dist.png"
},
{
"type": "histogram",
"x_val": "mean_absolute_error",
"x_val": "mean_absolute_error_time_series",
"color_code": "data_sampling_frequency",
"title": "Time Series MAE Distribution by Sampling Frequency",
"save_file_path": "mean_absolute_error_dist.png"
"save_file_path": "mean_absolute_error_time_series_dist.png"
},
{
"type": "histogram",
Expand Down
5 changes: 3 additions & 2 deletions workers/pvinsight-validation-runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ def run(module_to_import_s3_path,
with open(os.path.join(results_dir, config_data['public_results_table']),
'w') as fp:
json.dump(public_metrics_dict, fp)

logger.info(f"public_metrics_dict: {public_metrics_dict}")
# Now generate private results. These will be more specific to the
# type of analysis being run as results will be color-coded by certain
# parameters. These params will be available as columns in the
Expand All @@ -374,8 +376,7 @@ def run(module_to_import_s3_path,
file_metadata,
on='file_name')
# Filter to only the necessary columns (available via the config)
results_df_private = results_df_private[config_data
["private_results_columns"]]
results_df_private = results_df_private[config_data["private_results_columns"]]
results_df_private.to_csv(
os.path.join(results_dir,
module_name + "_full_results.csv"))
Expand Down
26 changes: 25 additions & 1 deletion workers/submission_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import time
import urllib.request
import inspect
import threading
import time

def is_local():
"""
Expand Down Expand Up @@ -673,6 +675,16 @@ def get_analysis_pk():
return int(tag['Value'])
return 1

# Periodically extend an SQS message's visibility timeout so long-running
# submissions do not hit the error "ReceiptHandle is invalid. Reason: The
# receipt handle has expired." while still being processed.
def update_visibility_timeout(queue, message, timeout, interval=60):
    """Keep refreshing *message*'s visibility timeout until told to stop.

    Intended to run in a background thread (see ``main()``). The owner
    signals shutdown by setting ``do_run = False`` on the Thread object;
    this loop reads that flag via ``threading.current_thread()``.

    Parameters
    ----------
    queue : boto3 SQS Queue resource
        Unused; kept so the existing call site's argument list still works.
    message : boto3 SQS Message resource
        The in-flight message whose visibility is being extended.
    timeout : int
        New visibility timeout (seconds) applied on each refresh.
    interval : int, optional
        Seconds between refreshes (default 60, matching the original sleep).
    """
    current = threading.current_thread()
    # Bug fix: the original looped on `while True`, so the caller's
    # `t.do_run = False` was ignored and `t.join()` blocked forever.
    while getattr(current, "do_run", True):
        # Bug fix: the boto3 Queue *resource* has no
        # `change_message_visibility` method (that signature belongs to the
        # low-level client), so the original call raised AttributeError.
        # The Message resource's `change_visibility` is the correct
        # per-message action and already knows its own receipt handle.
        # NOTE(review): SQS caps a visibility timeout at 43200 s and total
        # extension at 12 h per receive — refreshes beyond that will be
        # rejected by SQS; confirm job runtimes stay under that bound.
        message.change_visibility(VisibilityTimeout=timeout)
        # Sleep in short slices so a stop request is honored promptly
        # instead of waiting out a full `interval`-second sleep.
        slept = 0.0
        while slept < interval and getattr(current, "do_run", True):
            time.sleep(0.5)
            slept += 0.5

def main():
killer = GracefulKiller()
Expand All @@ -688,21 +700,33 @@ def main():
while True:
messages = queue.receive_messages(
MaxNumberOfMessages=1,
VisibilityTimeout=28800
VisibilityTimeout=43200
)

for message in messages:
logger.info(
"{} Processing message body: {}".format(
WORKER_LOGS_PREFIX, message.body
)
)
print(message.body)

# start a thread to refresh the timeout
t = threading.Thread(target=update_visibility_timeout, args=(queue, message, 43200))
t.start()

process_submission_callback(message.body)

# Let the queue know that the message is processed
message.delete()
logger.info(
"{} Message processed successfully".format(WORKER_LOGS_PREFIX)
)

# stop the thread
t.do_run = False
t.join()
break

if killer.kill_now:
break
Expand Down

0 comments on commit b006323

Please sign in to comment.