From f9553582c7fa51fde33a50b370cabf7241455b80 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 9 Oct 2024 13:28:32 +0200 Subject: [PATCH 1/8] extracting and decoding token from analysis_parameters.json file --- cdci_data_analysis/flask_app/dispatcher_query.py | 11 ++++++++--- tests/test_job_management.py | 5 ++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 4dfad4e0..2743408e 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -386,7 +386,7 @@ def free_up_space(app): hard_minimum_folder_age_days = app_config.hard_minimum_folder_age_days # let's pass the minimum age the folders to be deleted should have soft_minimum_folder_age_days = request.args.get('soft_minimum_age_days', None) - if soft_minimum_folder_age_days is None or isinstance(soft_minimum_folder_age_days, int): + if soft_minimum_folder_age_days is None: soft_minimum_folder_age_days = app_config.soft_minimum_folder_age_days else: soft_minimum_folder_age_days = int(soft_minimum_folder_age_days) @@ -404,8 +404,13 @@ def free_up_space(app): dict_analysis_parameters = json.load(analysis_parameters_file) token = dict_analysis_parameters.get('token', None) token_expired = False - if token is not None and token['exp'] < current_time_secs: - token_expired = True + # if token is not None and token['exp'] < current_time_secs: + # token_expired = True + if token is not None: + try: + tokenHelper.get_decoded_token(token, secret_key) + except jwt.exceptions.ExpiredSignatureError: + token_expired = True job_monitor_path = os.path.join(scratch_dir, 'job_monitor.json') with open(job_monitor_path, 'r') as jm_file: diff --git a/tests/test_job_management.py b/tests/test_job_management.py index 4e1bf1d9..d23e391a 100644 --- a/tests/test_job_management.py +++ b/tests/test_job_management.py @@ -2622,6 +2622,8 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m "exp": int(time.time()) - 15 } + expired_token_encoded = jwt.encode(expired_token, secret_key, algorithm='HS256') + params = { 'query_status': 'new', 'product_type': 'dummy', @@ -2654,7 +2656,8 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m analysis_parameters_path = os.path.join(scratch_dir, 'analysis_parameters.json') with open(analysis_parameters_path) as analysis_parameters_file: dict_analysis_parameters = json.load(analysis_parameters_file) - dict_analysis_parameters['token'] = expired_token + # dict_analysis_parameters['token'] = expired_token + dict_analysis_parameters['token'] = expired_token_encoded with open(analysis_parameters_path, 'w') as dict_analysis_parameters_outfile: my_json_str = json.dumps(dict_analysis_parameters, indent=4) dict_analysis_parameters_outfile.write(u'%s' % my_json_str) From b43562c1dd5ac72d18a0186ba1356cfdb296a95a Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 9 Oct 2024 16:02:32 +0200 Subject: [PATCH 2/8] delete lock files when too old --- .../flask_app/dispatcher_query.py | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 2743408e..c908add5 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -394,6 +394,14 @@ def free_up_space(app): list_scratch_dir = sorted(glob.glob("scratch_sid_*_jid_*"), key=os.path.getmtime) list_scratch_dir_to_delete = [] + list_lock_files = sorted(glob.glob(".lock_*")) + list_lock_files_to_delete = [] + + for l in list_lock_files: + time_from_last_access = ((current_time_secs - os.path.getatime(l)) / 60 * 60 * 24) + if time_from_last_access >= hard_minimum_folder_age_days: + list_lock_files_to_delete.append(l) + for scratch_dir in list_scratch_dir: scratch_dir_age_days = (current_time_secs - os.path.getmtime(scratch_dir)) / (60 * 60 * 24) if scratch_dir_age_days >= hard_minimum_folder_age_days: @@ -419,6 +427,8 @@ def free_up_space(app): job_id = monitor['job_id'] if job_status == 'done' and (token is None or token_expired): list_scratch_dir_to_delete.append(scratch_dir) + if os.path.exists(f".lock_{job_id}"): + list_lock_files_to_delete.append(f".lock_{job_id}") else: incomplete_job_alert_message = f"The job {job_id} is yet to complete despite being older "\ f"than {soft_minimum_folder_age_days} days. This has been detected "\ @@ -438,13 +448,19 @@ def free_up_space(app): for d in list_scratch_dir_to_delete: shutil.rmtree(d) + for l in list_lock_files_to_delete: + os.remove(l) + post_clean_space_space = shutil.disk_usage(os.getcwd()) post_clean_available_space = format_size(post_clean_space_space.free, format_returned='M') list_scratch_dir = sorted(glob.glob("scratch_sid_*_jid_*")) - logger.info(f"Number of scratch folder after clean-up: {len(list_scratch_dir)}.\n" - f"Removed {len(list_scratch_dir_to_delete)} scratch directories, " - f"and now the available amount of space is {post_clean_available_space}") + list_lock_files = sorted(glob.glob(".lock_*")) + logger.info(f"Number of scratch folder after clean-up: {len(list_scratch_dir)}, " + f"number of lock files after clean-up: {len(list_lock_files)}.\n" + f"Removed {len(list_scratch_dir_to_delete)} scratch directories " + f"and {len(list_lock_files_to_delete)} lock files.\n" + f"Now the available amount of space is {post_clean_available_space}") result_scratch_dir_deletion = f"Removed {len(list_scratch_dir_to_delete)} scratch directories" logger.info(result_scratch_dir_deletion) From 61bd4e4e20b1820ec9a0cfdd69009a0cbfe487f0 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 9 Oct 2024 17:31:18 +0200 Subject: [PATCH 3/8] remove temp lock file --- .../flask_app/dispatcher_query.py | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index c908add5..819c3c2e 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -135,6 +135,7 @@ def __init__(self, app, self.app = app temp_scratch_dir = None + temp_job_id = None self.set_sentry_sdk(getattr(self.app.config.get('conf'), 'sentry_url', None)) @@ -283,6 +284,7 @@ def __init__(self, app, self.set_scratch_dir(self.par_dic['session_id'], job_id=self.job_id, verbose=verbose) # temp_job_id = self.job_id temp_scratch_dir = self.scratch_dir + temp_job_id = self.job_id if not data_server_call_back: try: self.set_temp_dir(self.par_dic['session_id'], verbose=verbose) @@ -363,7 +365,7 @@ def __init__(self, app, finally: self.logger.info("==> clean-up temporary directory") self.log_query_progression("before clear_temp_dir") - self.clear_temp_dir(temp_scratch_dir=temp_scratch_dir) + self.clear_temp_dir(temp_scratch_dir=temp_scratch_dir, temp_job_id=temp_job_id) self.log_query_progression("after clear_temp_dir") logger.info("constructed %s:%s for data_server_call_back=%s", self.__class__, self, data_server_call_back) @@ -394,13 +396,13 @@ def free_up_space(app): list_scratch_dir = sorted(glob.glob("scratch_sid_*_jid_*"), key=os.path.getmtime) list_scratch_dir_to_delete = [] - list_lock_files = sorted(glob.glob(".lock_*")) - list_lock_files_to_delete = [] + list_lock_files = sorted(glob.glob(".lock_*"), key=os.path.getatime) + list_lock_files_to_delete = set() for l in list_lock_files: time_from_last_access = ((current_time_secs - os.path.getatime(l)) / 60 * 60 * 24) if time_from_last_access >= hard_minimum_folder_age_days: - list_lock_files_to_delete.append(l) + list_lock_files_to_delete.add(l) for scratch_dir in list_scratch_dir: scratch_dir_age_days = (current_time_secs - os.path.getmtime(scratch_dir)) / (60 * 60 * 24) @@ -427,8 +429,9 @@ def free_up_space(app): job_id = monitor['job_id'] if job_status == 'done' and (token is None or token_expired): list_scratch_dir_to_delete.append(scratch_dir) - if os.path.exists(f".lock_{job_id}"): - list_lock_files_to_delete.append(f".lock_{job_id}") + lock_file_name = f".lock_{job_id}" + if os.path.exists(lock_file_name): + list_lock_files_to_delete.add(lock_file_name) else: incomplete_job_alert_message = f"The job {job_id} is yet to complete despite being older "\ f"than {soft_minimum_folder_age_days} days. This has been detected "\ @@ -462,7 +465,8 @@ def free_up_space(app): f"and {len(list_lock_files_to_delete)} lock files.\n" f"Now the available amount of space is {post_clean_available_space}") - result_scratch_dir_deletion = f"Removed {len(list_scratch_dir_to_delete)} scratch directories" + result_scratch_dir_deletion = f"Removed {len(list_scratch_dir_to_delete)} scratch directories, " \ + f"and {len(list_lock_files_to_delete)} lock files. " logger.info(result_scratch_dir_deletion) return jsonify(dict(output_status=result_scratch_dir_deletion)) @@ -972,11 +976,14 @@ def move_temp_content(self): file_full_path = os.path.join(self.temp_dir, f) shutil.copy(file_full_path, self.scratch_dir) - def clear_temp_dir(self, temp_scratch_dir=None): + def clear_temp_dir(self, temp_scratch_dir=None, temp_job_id=None): if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) if temp_scratch_dir is not None and temp_scratch_dir != self.scratch_dir and os.path.exists(temp_scratch_dir): shutil.rmtree(temp_scratch_dir) + if temp_job_id is not None and os.path.exists(f".lock_{temp_job_id}"): + os.remove(f".lock_{temp_job_id}") + @staticmethod def validated_download_file_path(basepath, filename, should_exist=True): From 0d30cc84ee358b8008c765d66760ac7f014ac1a8 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 9 Oct 2024 18:26:46 +0200 Subject: [PATCH 4/8] deleting lock file only if no scratch dirs for that job_id are left --- .../flask_app/dispatcher_query.py | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 819c3c2e..0d904c96 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -399,11 +399,6 @@ def free_up_space(app): list_lock_files = sorted(glob.glob(".lock_*"), key=os.path.getatime) list_lock_files_to_delete = set() - for l in list_lock_files: - time_from_last_access = ((current_time_secs - os.path.getatime(l)) / 60 * 60 * 24) - if time_from_last_access >= hard_minimum_folder_age_days: - list_lock_files_to_delete.add(l) - for scratch_dir in list_scratch_dir: scratch_dir_age_days = (current_time_secs - os.path.getmtime(scratch_dir)) / (60 * 60 * 24) if scratch_dir_age_days >= hard_minimum_folder_age_days: @@ -429,9 +424,6 @@ def free_up_space(app): job_id = monitor['job_id'] if job_status == 'done' and (token is None or token_expired): list_scratch_dir_to_delete.append(scratch_dir) - lock_file_name = f".lock_{job_id}" - if os.path.exists(lock_file_name): - list_lock_files_to_delete.add(lock_file_name) else: incomplete_job_alert_message = f"The job {job_id} is yet to complete despite being older "\ f"than {soft_minimum_folder_age_days} days. This has been detected "\ @@ -451,8 +443,13 @@ def free_up_space(app): for d in list_scratch_dir_to_delete: shutil.rmtree(d) - for l in list_lock_files_to_delete: - os.remove(l) + num_lock_files_removed = 0 + for l in list_lock_files: + lock_file_job_id = l.split('_')[-1] + list_job_id_scratch_dir = glob.glob(f"scratch_sid_*_jid_{lock_file_job_id}*") + if len(list_job_id_scratch_dir) == 0: + os.remove(l) + num_lock_files_removed += 1 post_clean_space_space = shutil.disk_usage(os.getcwd()) post_clean_available_space = format_size(post_clean_space_space.free, format_returned='M') @@ -462,11 +459,11 @@ def free_up_space(app): logger.info(f"Number of scratch folder after clean-up: {len(list_scratch_dir)}, " f"number of lock files after clean-up: {len(list_lock_files)}.\n" f"Removed {len(list_scratch_dir_to_delete)} scratch directories " - f"and {len(list_lock_files_to_delete)} lock files.\n" + f"and {num_lock_files_removed} lock files.\n" f"Now the available amount of space is {post_clean_available_space}") result_scratch_dir_deletion = f"Removed {len(list_scratch_dir_to_delete)} scratch directories, " \ - f"and {len(list_lock_files_to_delete)} lock files. " + f"and {num_lock_files_removed} lock files." logger.info(result_scratch_dir_deletion) return jsonify(dict(output_status=result_scratch_dir_deletion)) From 8f088af9d6457639ffd667db6509f48db36142fd Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 9 Oct 2024 18:26:55 +0200 Subject: [PATCH 5/8] extended test --- cdci_data_analysis/pytest_fixtures.py | 9 +++++++++ tests/test_job_management.py | 16 +++++++++------- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/cdci_data_analysis/pytest_fixtures.py b/cdci_data_analysis/pytest_fixtures.py index 74b91092..0695158f 100644 --- a/cdci_data_analysis/pytest_fixtures.py +++ b/cdci_data_analysis/pytest_fixtures.py @@ -1568,6 +1568,15 @@ def remove_scratch_folders(job_id=None): for d in dir_list: shutil.rmtree(d) + @staticmethod + def remove_lock_files(job_id=None): + if job_id is None: + lock_files = glob.glob('.lock_*') + else: + lock_files = glob.glob(f'.lock_{job_id}') + for f in lock_files: + os.remove(f) + @staticmethod def remove_download_folders(id=None): if id is None: diff --git a/tests/test_job_management.py b/tests/test_job_management.py index d23e391a..2d5441dd 100644 --- a/tests/test_job_management.py +++ b/tests/test_job_management.py @@ -2605,6 +2605,7 @@ def test_email_t1_t2(dispatcher_long_living_fixture, ("hard_minimum_folder_age_days", 60)], indirect=True) def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_minimum_age_days): DispatcherJobState.remove_scratch_folders() + DispatcherJobState.remove_lock_files() server = dispatcher_live_fixture @@ -2635,11 +2636,11 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m number_analysis_to_run = 8 for i in range(number_analysis_to_run): - ask(server, - params, - expected_query_status=["done"], - max_time_s=150 - ) + jdata = ask(server, + params, + expected_query_status=["done"], + max_time_s=150 + ) list_scratch_dir = sorted(glob.glob("scratch_sid_*_jid_*"), key=os.path.getmtime) @@ -2662,7 +2663,6 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m my_json_str = json.dumps(dict_analysis_parameters, indent=4) dict_analysis_parameters_outfile.write(u'%s' % my_json_str) - params = { 'token': encoded_token, 'soft_minimum_age_days': soft_minimum_age_days @@ -2677,7 +2677,9 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m assert 'output_status' in jdata - assert jdata['output_status'] == f"Removed {number_folders_to_delete} scratch directories" + number_lock_files_deleted = 0 if number_folders_to_delete < number_analysis_to_run else 1 + assert jdata['output_status'] == (f"Removed {number_folders_to_delete} scratch directories, " + f"and {number_lock_files_deleted} lock files.") assert len(glob.glob("scratch_sid_*_jid_*")) == number_analysis_to_run - number_folders_to_delete From e8161ff661477f00f6a716ffce3629694c07c39b Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 10 Oct 2024 14:05:47 +0200 Subject: [PATCH 6/8] not needed code --- cdci_data_analysis/flask_app/dispatcher_query.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 0d904c96..785466f9 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -396,9 +396,6 @@ def free_up_space(app): list_scratch_dir = sorted(glob.glob("scratch_sid_*_jid_*"), key=os.path.getmtime) list_scratch_dir_to_delete = [] - list_lock_files = sorted(glob.glob(".lock_*"), key=os.path.getatime) - list_lock_files_to_delete = set() - for scratch_dir in list_scratch_dir: scratch_dir_age_days = (current_time_secs - os.path.getmtime(scratch_dir)) / (60 * 60 * 24) if scratch_dir_age_days >= hard_minimum_folder_age_days: @@ -443,6 +440,7 @@ def free_up_space(app): for d in list_scratch_dir_to_delete: shutil.rmtree(d) + list_lock_files = sorted(glob.glob(".lock_*"), key=os.path.getatime) num_lock_files_removed = 0 for l in list_lock_files: lock_file_job_id = l.split('_')[-1] From b53c6f32e76fed1a6bc2966f9daf6c2c988f98e7 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Fri, 11 Oct 2024 11:22:35 +0200 Subject: [PATCH 7/8] not needed code --- tests/test_job_management.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_job_management.py b/tests/test_job_management.py index 2d5441dd..1e09ed2c 100644 --- a/tests/test_job_management.py +++ b/tests/test_job_management.py @@ -2636,11 +2636,11 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m number_analysis_to_run = 8 for i in range(number_analysis_to_run): - jdata = ask(server, - params, - expected_query_status=["done"], - max_time_s=150 - ) + ask(server, + params, + expected_query_status=["done"], + max_time_s=150 + ) list_scratch_dir = sorted(glob.glob("scratch_sid_*_jid_*"), key=os.path.getmtime) From 9b47089412286a4a0e342af74111361f7545a3b5 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 14 Oct 2024 12:27:49 +0200 Subject: [PATCH 8/8] not needed code --- cdci_data_analysis/flask_app/dispatcher_query.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 785466f9..11baccce 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -406,8 +406,6 @@ def free_up_space(app): dict_analysis_parameters = json.load(analysis_parameters_file) token = dict_analysis_parameters.get('token', None) token_expired = False - # if token is not None and token['exp'] < current_time_secs: - # token_expired = True if token is not None: try: tokenHelper.get_decoded_token(token, secret_key)