From 23c4a2745b353a4710c1a192e2b4c53c510dd376 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 15 Apr 2024 16:49:56 +0200 Subject: [PATCH 1/9] read and store return-progress output on a dedicated output file --- .../flask_app/dispatcher_query.py | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 3c9244d0..7249b0df 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -1452,8 +1452,16 @@ def build_dispatcher_response(self, if query_new_status is not None: out_dict['query_status'] = query_new_status if query_out is not None: - out_dict['products'] = query_out.prod_dictionary - out_dict['exit_status'] = query_out.status_dictionary + if self.return_progress: + out_dict['return_progress_products'] = query_out.prod_dictionary + out_dict['return_progress_exit_status'] = query_out.status_dictionary + with open(self.response_filename, "r") as query_output_f: + query_output_file_content = json.load(query_output_f) + out_dict['products'] = query_output_file_content['prod_dictionary'] + out_dict['exit_status'] = query_output_file_content['status_dictionary'] + else: + out_dict['products'] = query_out.prod_dictionary + out_dict['exit_status'] = query_out.status_dictionary if getattr(self.instrument, 'unknown_arguments_name_list', []): if len(self.instrument.unknown_arguments_name_list) == 1: comment = f'Please note that argument {self.instrument.unknown_arguments_name_list[0]} is not used' @@ -1820,13 +1828,16 @@ def store_response(self, query_out, job_monitor): if os.path.exists(self.response_filename): if not os.path.exists(self.query_log_dir): os.makedirs(self.query_log_dir) - os.rename(self.response_filename, self.response_log_filename) - self.logger.info("renamed query log log %s => %s", - self.response_filename, self.response_log_filename) + if not self.return_progress: + os.rename(self.response_filename, self.response_log_filename) + self.logger.info("renamed query log log %s => %s", + self.response_filename, self.response_log_filename) - query_out.serialize(open(self.response_filename, "w")) - json.dump(job_monitor, open( - self.response_filename + ".job-monitor", "w")) + if self.return_progress: + query_out.serialize(open(self.response_filename + ".return-progress-query-output", "w")) + else: + query_out.serialize(open(self.response_filename, "w")) + json.dump(job_monitor, open(self.response_filename + ".job-monitor", "w")) def load_config(self): try: From ecf3c9df3e39923c50f47bd288a56ae756354423 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 15 Apr 2024 17:13:04 +0200 Subject: [PATCH 2/9] dedicated output --- cdci_data_analysis/flask_app/dispatcher_query.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 7249b0df..33f361cd 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -1455,10 +1455,6 @@ def build_dispatcher_response(self, if self.return_progress: out_dict['return_progress_products'] = query_out.prod_dictionary out_dict['return_progress_exit_status'] = query_out.status_dictionary - with open(self.response_filename, "r") as query_output_f: - query_output_file_content = json.load(query_output_f) - out_dict['products'] = query_output_file_content['prod_dictionary'] - out_dict['exit_status'] = query_output_file_content['status_dictionary'] else: out_dict['products'] = query_out.prod_dictionary out_dict['exit_status'] = query_out.status_dictionary @@ -1467,8 +1463,9 @@ def build_dispatcher_response(self, comment = f'Please note that argument {self.instrument.unknown_arguments_name_list[0]} is not used' else: comment = f'Please note that arguments {", ".join(self.instrument.unknown_arguments_name_list)} are not used' - out_dict['exit_status']['comment'] = \ - out_dict['exit_status']['comment'] + ' ' + comment if out_dict['exit_status']['comment'] else comment + if not self.return_progress: + out_dict['exit_status']['comment'] = \ + out_dict['exit_status']['comment'] + ' ' + comment if out_dict['exit_status']['comment'] else comment if job_monitor is not None: out_dict['job_monitor'] = job_monitor From 9201baae7818b746b69e5f246ec9465807bacf90 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 15 Apr 2024 17:13:10 +0200 Subject: [PATCH 3/9] adapted test --- tests/test_server_basic.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/test_server_basic.py b/tests/test_server_basic.py index 535a87c7..97dfbd2f 100644 --- a/tests/test_server_basic.py +++ b/tests/test_server_basic.py @@ -1651,11 +1651,12 @@ def test_empty_async_return_progress_instrument_request(dispatcher_live_fixture, logger.info("Json output content") logger.info(json.dumps(jdata, indent=4)) - assert jdata["exit_status"]["debug_message"] == "" - assert jdata["exit_status"]["error_message"] == "" - assert jdata["exit_status"]["message"] == "" + assert 'return_progress_exit_status' in jdata + assert jdata["return_progress_exit_status"]["debug_message"] == "" + assert jdata["return_progress_exit_status"]["error_message"] == "" + assert jdata["return_progress_exit_status"]["message"] == "" - assert jdata["products"]["p"] == 5 + assert jdata["return_progress_products"]["p"] == 5 params.pop("return_progress", None) ReturnProgressProductQuery.set_p_value(15) From d9a080d1a8c5af82625c8e3611b5bfd1313e71e4 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 15 Apr 2024 18:27:17 +0200 Subject: [PATCH 4/9] no update job_monitor if return_progress --- .../flask_app/dispatcher_query.py | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 33f361cd..3be37d29 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -1467,12 +1467,19 @@ def build_dispatcher_response(self, out_dict['exit_status']['comment'] = \ out_dict['exit_status']['comment'] + ' ' + comment if out_dict['exit_status']['comment'] else comment - if job_monitor is not None: - out_dict['job_monitor'] = job_monitor - out_dict['job_status'] = job_monitor['status'] + if not self.return_progress: + if job_monitor is not None: + out_dict['job_monitor'] = job_monitor + out_dict['job_status'] = job_monitor['status'] + else: + job_monitor_path = self.response_filename + ".job-monitor" + with open(job_monitor_path, 'r') as f: + job_monitor_content = json.load(f) + out_dict['job_monitor'] = job_monitor_content + out_dict['job_status'] = job_monitor_content['status'] - if job_monitor is not None: - out_dict['job_monitor'] = job_monitor + # if job_monitor is not None: + # out_dict['job_monitor'] = job_monitor out_dict['session_id'] = self.par_dic['session_id'] @@ -2294,8 +2301,8 @@ def run_query(self, off_line=False, disp_conf=None): else: query_new_status = 'failed' job.set_failed() - - job.write_dataserver_status() + if not self.return_progress: + job.write_dataserver_status() print('-----------------> query status update for done/ready: ', query_new_status) @@ -2354,7 +2361,7 @@ def run_query(self, off_line=False, disp_conf=None): self.logger.info( '==============================> query done <==============================') - if not job_is_aliased and query_status != query_new_status: + if not job_is_aliased and query_status != query_new_status and not self.return_progress: job.write_dataserver_status() if not self.async_dispatcher: From 4333cfd6ce21e96ce2be110cb521c10c48b30395 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 15 Apr 2024 18:38:30 +0200 Subject: [PATCH 5/9] no job monitor alteration in case return_progress --- cdci_data_analysis/flask_app/dispatcher_query.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 3be37d29..b5e47810 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -1473,10 +1473,11 @@ def build_dispatcher_response(self, out_dict['job_status'] = job_monitor['status'] else: job_monitor_path = self.response_filename + ".job-monitor" - with open(job_monitor_path, 'r') as f: - job_monitor_content = json.load(f) - out_dict['job_monitor'] = job_monitor_content - out_dict['job_status'] = job_monitor_content['status'] + if os.path.exists(job_monitor_path): + with open(job_monitor_path, 'r') as f: + job_monitor_content = json.load(f) + out_dict['job_monitor'] = job_monitor_content + out_dict['job_status'] = job_monitor_content['status'] # if job_monitor is not None: # out_dict['job_monitor'] = job_monitor From 6ecfebcc3509cc68555dbbd02cab7d2db8ba60a7 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 15 Apr 2024 21:29:55 +0200 Subject: [PATCH 6/9] dedicated output job status --- .../flask_app/dispatcher_query.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index b5e47810..4326a6eb 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -1472,15 +1472,15 @@ def build_dispatcher_response(self, out_dict['job_monitor'] = job_monitor out_dict['job_status'] = job_monitor['status'] else: - job_monitor_path = self.response_filename + ".job-monitor" - if os.path.exists(job_monitor_path): - with open(job_monitor_path, 'r') as f: - job_monitor_content = json.load(f) - out_dict['job_monitor'] = job_monitor_content - out_dict['job_status'] = job_monitor_content['status'] - - # if job_monitor is not None: - # out_dict['job_monitor'] = job_monitor + if job_monitor is not None: + out_dict['return_progress_job_monitor'] = job_monitor + out_dict['return_progress_job_status'] = job_monitor['status'] + # job_monitor_path = self.response_filename + ".job-monitor" + # if os.path.exists(job_monitor_path): + # with open(job_monitor_path, 'r') as f: + # job_monitor_content = json.load(f) + # out_dict['job_monitor'] = job_monitor_content + # out_dict['job_status'] = job_monitor_content['status'] out_dict['session_id'] = self.par_dic['session_id'] @@ -1840,6 +1840,7 @@ def store_response(self, query_out, job_monitor): if self.return_progress: query_out.serialize(open(self.response_filename + ".return-progress-query-output", "w")) + json.dump(job_monitor, open(self.response_filename + ".return-progress-job-monitor", "w")) else: query_out.serialize(open(self.response_filename, "w")) json.dump(job_monitor, open(self.response_filename + ".job-monitor", "w")) From 9a347ed7c6e174a8bf27b30de0c89c4ee090a7d6 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 15 Apr 2024 22:09:03 +0200 Subject: [PATCH 7/9] no comments --- cdci_data_analysis/flask_app/dispatcher_query.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 4326a6eb..c0116cf1 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -1475,12 +1475,6 @@ def build_dispatcher_response(self, if job_monitor is not None: out_dict['return_progress_job_monitor'] = job_monitor out_dict['return_progress_job_status'] = job_monitor['status'] - # job_monitor_path = self.response_filename + ".job-monitor" - # if os.path.exists(job_monitor_path): - # with open(job_monitor_path, 'r') as f: - # job_monitor_content = json.load(f) - # out_dict['job_monitor'] = job_monitor_content - # out_dict['job_status'] = job_monitor_content['status'] out_dict['session_id'] = self.par_dic['session_id'] From f189228e0fdbc1f535f78b1d274e46d193f3baf9 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 22 Apr 2024 18:20:06 +0200 Subject: [PATCH 8/9] restablished previous behavior query_output --- .../flask_app/dispatcher_query.py | 36 +++++++------------ 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index c0116cf1..f93a6856 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -1452,29 +1452,19 @@ def build_dispatcher_response(self, if query_new_status is not None: out_dict['query_status'] = query_new_status if query_out is not None: - if self.return_progress: - out_dict['return_progress_products'] = query_out.prod_dictionary - out_dict['return_progress_exit_status'] = query_out.status_dictionary - else: - out_dict['products'] = query_out.prod_dictionary - out_dict['exit_status'] = query_out.status_dictionary + out_dict['products'] = query_out.prod_dictionary + out_dict['exit_status'] = query_out.status_dictionary if getattr(self.instrument, 'unknown_arguments_name_list', []): if len(self.instrument.unknown_arguments_name_list) == 1: comment = f'Please note that argument {self.instrument.unknown_arguments_name_list[0]} is not used' else: comment = f'Please note that arguments {", ".join(self.instrument.unknown_arguments_name_list)} are not used' - if not self.return_progress: - out_dict['exit_status']['comment'] = \ - out_dict['exit_status']['comment'] + ' ' + comment if out_dict['exit_status']['comment'] else comment + out_dict['exit_status']['comment'] = \ + out_dict['exit_status']['comment'] + ' ' + comment if out_dict['exit_status']['comment'] else comment - if not self.return_progress: - if job_monitor is not None: - out_dict['job_monitor'] = job_monitor - out_dict['job_status'] = job_monitor['status'] - else: - if job_monitor is not None: - out_dict['return_progress_job_monitor'] = job_monitor - out_dict['return_progress_job_status'] = job_monitor['status'] + if job_monitor is not None: + out_dict['job_monitor'] = job_monitor + out_dict['job_status'] = job_monitor['status'] out_dict['session_id'] = self.par_dic['session_id'] @@ -1824,20 +1814,20 @@ def request_query_out(self, overwrite=False): def store_response(self, query_out, job_monitor): self.logger.info("storing query output: %s, %s", self.response_filename, self.response_log_filename) + if os.path.exists(self.response_filename): if not os.path.exists(self.query_log_dir): os.makedirs(self.query_log_dir) - if not self.return_progress: - os.rename(self.response_filename, self.response_log_filename) - self.logger.info("renamed query log log %s => %s", - self.response_filename, self.response_log_filename) + # if not self.return_progress: + os.rename(self.response_filename, self.response_log_filename) + self.logger.info("renamed query log log %s => %s", + self.response_filename, self.response_log_filename) if self.return_progress: - query_out.serialize(open(self.response_filename + ".return-progress-query-output", "w")) json.dump(job_monitor, open(self.response_filename + ".return-progress-job-monitor", "w")) else: - query_out.serialize(open(self.response_filename, "w")) json.dump(job_monitor, open(self.response_filename + ".job-monitor", "w")) + query_out.serialize(open(self.response_filename, "w")) def load_config(self): try: From 53dd71aa5b5d67bd696534293eeb82edc9130e73 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 22 Apr 2024 18:25:44 +0200 Subject: [PATCH 9/9] re-adapted test --- tests/test_server_basic.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/test_server_basic.py b/tests/test_server_basic.py index 97dfbd2f..535a87c7 100644 --- a/tests/test_server_basic.py +++ b/tests/test_server_basic.py @@ -1651,12 +1651,11 @@ def test_empty_async_return_progress_instrument_request(dispatcher_live_fixture, logger.info("Json output content") logger.info(json.dumps(jdata, indent=4)) - assert 'return_progress_exit_status' in jdata - assert jdata["return_progress_exit_status"]["debug_message"] == "" - assert jdata["return_progress_exit_status"]["error_message"] == "" - assert jdata["return_progress_exit_status"]["message"] == "" + assert jdata["exit_status"]["debug_message"] == "" + assert jdata["exit_status"]["error_message"] == "" + assert jdata["exit_status"]["message"] == "" - assert jdata["return_progress_products"]["p"] == 5 + assert jdata["products"]["p"] == 5 params.pop("return_progress", None) ReturnProgressProductQuery.set_p_value(15)