Skip to content

Commit

Permalink
Merge pull request #716 from oda-hub/extend-free-up-space
Browse files Browse the repository at this point in the history
Extend free-up-space and properly check token expiration
  • Loading branch information
burnout87 authored Oct 15, 2024
2 parents a4068ea + 9b47089 commit f3e0e20
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 13 deletions.
41 changes: 31 additions & 10 deletions cdci_data_analysis/flask_app/dispatcher_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def __init__(self, app,
self.app = app

temp_scratch_dir = None
temp_job_id = None

self.set_sentry_sdk(getattr(self.app.config.get('conf'), 'sentry_url', None))

Expand Down Expand Up @@ -283,6 +284,7 @@ def __init__(self, app,
self.set_scratch_dir(self.par_dic['session_id'], job_id=self.job_id, verbose=verbose)
# temp_job_id = self.job_id
temp_scratch_dir = self.scratch_dir
temp_job_id = self.job_id
if not data_server_call_back:
try:
self.set_temp_dir(self.par_dic['session_id'], verbose=verbose)
Expand Down Expand Up @@ -363,7 +365,7 @@ def __init__(self, app,
finally:
self.logger.info("==> clean-up temporary directory")
self.log_query_progression("before clear_temp_dir")
self.clear_temp_dir(temp_scratch_dir=temp_scratch_dir)
self.clear_temp_dir(temp_scratch_dir=temp_scratch_dir, temp_job_id=temp_job_id)
self.log_query_progression("after clear_temp_dir")

logger.info("constructed %s:%s for data_server_call_back=%s", self.__class__, self, data_server_call_back)
Expand All @@ -386,7 +388,7 @@ def free_up_space(app):
hard_minimum_folder_age_days = app_config.hard_minimum_folder_age_days
# let's pass the minimum age the folders to be deleted should have
soft_minimum_folder_age_days = request.args.get('soft_minimum_age_days', None)
if soft_minimum_folder_age_days is None or isinstance(soft_minimum_folder_age_days, int):
if soft_minimum_folder_age_days is None:
soft_minimum_folder_age_days = app_config.soft_minimum_folder_age_days
else:
soft_minimum_folder_age_days = int(soft_minimum_folder_age_days)
Expand All @@ -404,8 +406,11 @@ def free_up_space(app):
dict_analysis_parameters = json.load(analysis_parameters_file)
token = dict_analysis_parameters.get('token', None)
token_expired = False
if token is not None and token['exp'] < current_time_secs:
token_expired = True
if token is not None:
try:
tokenHelper.get_decoded_token(token, secret_key)
except jwt.exceptions.ExpiredSignatureError:
token_expired = True

job_monitor_path = os.path.join(scratch_dir, 'job_monitor.json')
with open(job_monitor_path, 'r') as jm_file:
Expand Down Expand Up @@ -433,15 +438,28 @@ def free_up_space(app):
for d in list_scratch_dir_to_delete:
shutil.rmtree(d)

list_lock_files = sorted(glob.glob(".lock_*"), key=os.path.getatime)
num_lock_files_removed = 0
for l in list_lock_files:
lock_file_job_id = l.split('_')[-1]
list_job_id_scratch_dir = glob.glob(f"scratch_sid_*_jid_{lock_file_job_id}*")
if len(list_job_id_scratch_dir) == 0:
os.remove(l)
num_lock_files_removed += 1

post_clean_space_space = shutil.disk_usage(os.getcwd())
post_clean_available_space = format_size(post_clean_space_space.free, format_returned='M')

list_scratch_dir = sorted(glob.glob("scratch_sid_*_jid_*"))
logger.info(f"Number of scratch folder after clean-up: {len(list_scratch_dir)}.\n"
f"Removed {len(list_scratch_dir_to_delete)} scratch directories, "
f"and now the available amount of space is {post_clean_available_space}")

result_scratch_dir_deletion = f"Removed {len(list_scratch_dir_to_delete)} scratch directories"
list_lock_files = sorted(glob.glob(".lock_*"))
logger.info(f"Number of scratch folder after clean-up: {len(list_scratch_dir)}, "
f"number of lock files after clean-up: {len(list_lock_files)}.\n"
f"Removed {len(list_scratch_dir_to_delete)} scratch directories "
f"and {num_lock_files_removed} lock files.\n"
f"Now the available amount of space is {post_clean_available_space}")

result_scratch_dir_deletion = f"Removed {len(list_scratch_dir_to_delete)} scratch directories, " \
f"and {num_lock_files_removed} lock files."
logger.info(result_scratch_dir_deletion)

return jsonify(dict(output_status=result_scratch_dir_deletion))
Expand Down Expand Up @@ -951,11 +969,14 @@ def move_temp_content(self):
file_full_path = os.path.join(self.temp_dir, f)
shutil.copy(file_full_path, self.scratch_dir)

def clear_temp_dir(self, temp_scratch_dir=None):
def clear_temp_dir(self, temp_scratch_dir=None, temp_job_id=None):
    """Delete leftover per-request artifacts from the working directory.

    Removes, in order:
      * ``self.temp_dir`` when the attribute is set and the directory exists;
      * ``temp_scratch_dir`` when it is given, differs from the current
        ``self.scratch_dir`` (i.e. the request was re-homed) and still exists;
      * the ``.lock_<temp_job_id>`` file when a job id is given and the
        lock file is present on disk.
    """
    if hasattr(self, 'temp_dir'):
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
    # only drop the old scratch dir if it is not the one currently in use
    superseded_scratch = (temp_scratch_dir is not None
                          and temp_scratch_dir != self.scratch_dir)
    if superseded_scratch and os.path.exists(temp_scratch_dir):
        shutil.rmtree(temp_scratch_dir)
    if temp_job_id is not None:
        lock_file_path = f".lock_{temp_job_id}"
        if os.path.exists(lock_file_path):
            os.remove(lock_file_path)


@staticmethod
def validated_download_file_path(basepath, filename, should_exist=True):
Expand Down
9 changes: 9 additions & 0 deletions cdci_data_analysis/pytest_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,15 @@ def remove_scratch_folders(job_id=None):
for d in dir_list:
shutil.rmtree(d)

@staticmethod
def remove_lock_files(job_id=None):
if job_id is None:
lock_files = glob.glob('.lock_*')
else:
lock_files = glob.glob(f'.lock_{job_id}')
for f in lock_files:
os.remove(f)

@staticmethod
def remove_download_folders(id=None):
if id is None:
Expand Down
11 changes: 8 additions & 3 deletions tests/test_job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -2605,6 +2605,7 @@ def test_email_t1_t2(dispatcher_long_living_fixture,
("hard_minimum_folder_age_days", 60)], indirect=True)
def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_minimum_age_days):
DispatcherJobState.remove_scratch_folders()
DispatcherJobState.remove_lock_files()

server = dispatcher_live_fixture

Expand All @@ -2622,6 +2623,8 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m
"exp": int(time.time()) - 15
}

expired_token_encoded = jwt.encode(expired_token, secret_key, algorithm='HS256')

params = {
'query_status': 'new',
'product_type': 'dummy',
Expand Down Expand Up @@ -2654,12 +2657,12 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m
analysis_parameters_path = os.path.join(scratch_dir, 'analysis_parameters.json')
with open(analysis_parameters_path) as analysis_parameters_file:
dict_analysis_parameters = json.load(analysis_parameters_file)
dict_analysis_parameters['token'] = expired_token
# dict_analysis_parameters['token'] = expired_token
dict_analysis_parameters['token'] = expired_token_encoded
with open(analysis_parameters_path, 'w') as dict_analysis_parameters_outfile:
my_json_str = json.dumps(dict_analysis_parameters, indent=4)
dict_analysis_parameters_outfile.write(u'%s' % my_json_str)


params = {
'token': encoded_token,
'soft_minimum_age_days': soft_minimum_age_days
Expand All @@ -2674,7 +2677,9 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m

assert 'output_status' in jdata

assert jdata['output_status'] == f"Removed {number_folders_to_delete} scratch directories"
number_lock_files_deleted = 0 if number_folders_to_delete < number_analysis_to_run else 1
assert jdata['output_status'] == (f"Removed {number_folders_to_delete} scratch directories, "
f"and {number_lock_files_deleted} lock files.")

assert len(glob.glob("scratch_sid_*_jid_*")) == number_analysis_to_run - number_folders_to_delete

Expand Down

0 comments on commit f3e0e20

Please sign in to comment.