Merge branch 'master' into ivoa_helper-endpoint
burnout87 authored Oct 15, 2024
2 parents ebf7b24 + f3e0e20 commit f069dad
Showing 5 changed files with 136 additions and 30 deletions.
1 change: 1 addition & 0 deletions cdci_data_analysis/analysis/drupal_helper.py
@@ -1559,6 +1559,7 @@ def resolve_name(local_name_resolver_url: str, external_name_resolver_url: str,
resolved_obj['RA'] = float(ra_tag.text)
resolved_obj['DEC'] = float(dec_tag.text)
resolved_obj['entity_portal_link'] = entities_portal_url.format(quoted_name)
resolved_obj['message'] = f'{name} successfully resolved'

try:
Simbad.add_votable_fields("otype")
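For orientation, here is a minimal sketch of how a caller might consume the resolver payload extended above; the helper function and the fallback text are hypothetical, only the key names come from the hunk.

# Hypothetical consumer of the resolve_name payload; key names follow the hunk above.
def describe_resolution(resolved_obj: dict) -> str:
    ra = resolved_obj.get('RA')
    dec = resolved_obj.get('DEC')
    portal_link = resolved_obj.get('entity_portal_link')
    # 'message' is the field added in this commit, built as f'{name} successfully resolved'.
    message = resolved_obj.get('message', 'no resolver message available')
    return f"{message} (RA={ra}, DEC={dec}, portal: {portal_link})"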
92 changes: 65 additions & 27 deletions cdci_data_analysis/flask_app/dispatcher_query.py
@@ -22,6 +22,7 @@
import glob
import string
import random
import fcntl

from flask import jsonify, send_from_directory, make_response
from flask import request, g
@@ -134,6 +135,7 @@ def __init__(self, app,
self.app = app

temp_scratch_dir = None
temp_job_id = None

self.set_sentry_sdk(getattr(self.app.config.get('conf'), 'sentry_url', None))

@@ -282,6 +284,7 @@ def __init__(self, app,
self.set_scratch_dir(self.par_dic['session_id'], job_id=self.job_id, verbose=verbose)
# temp_job_id = self.job_id
temp_scratch_dir = self.scratch_dir
temp_job_id = self.job_id
if not data_server_call_back:
try:
self.set_temp_dir(self.par_dic['session_id'], verbose=verbose)
@@ -362,7 +365,7 @@ def __init__(self, app,
finally:
self.logger.info("==> clean-up temporary directory")
self.log_query_progression("before clear_temp_dir")
self.clear_temp_dir(temp_scratch_dir=temp_scratch_dir)
self.clear_temp_dir(temp_scratch_dir=temp_scratch_dir, temp_job_id=temp_job_id)
self.log_query_progression("after clear_temp_dir")

logger.info("constructed %s:%s for data_server_call_back=%s", self.__class__, self, data_server_call_back)
@@ -385,7 +388,7 @@ def free_up_space(app):
hard_minimum_folder_age_days = app_config.hard_minimum_folder_age_days
# let's pass the minimum age the folders to be deleted should have
soft_minimum_folder_age_days = request.args.get('soft_minimum_age_days', None)
if soft_minimum_folder_age_days is None or isinstance(soft_minimum_folder_age_days, int):
if soft_minimum_folder_age_days is None:
soft_minimum_folder_age_days = app_config.soft_minimum_folder_age_days
else:
soft_minimum_folder_age_days = int(soft_minimum_folder_age_days)
@@ -403,8 +406,11 @@ def free_up_space(app):
dict_analysis_parameters = json.load(analysis_parameters_file)
token = dict_analysis_parameters.get('token', None)
token_expired = False
if token is not None and token['exp'] < current_time_secs:
token_expired = True
if token is not None:
try:
tokenHelper.get_decoded_token(token, secret_key)
except jwt.exceptions.ExpiredSignatureError:
token_expired = True

job_monitor_path = os.path.join(scratch_dir, 'job_monitor.json')
with open(job_monitor_path, 'r') as jm_file:
@@ -432,15 +438,28 @@ def free_up_space(app):
for d in list_scratch_dir_to_delete:
shutil.rmtree(d)

list_lock_files = sorted(glob.glob(".lock_*"), key=os.path.getatime)
num_lock_files_removed = 0
for l in list_lock_files:
lock_file_job_id = l.split('_')[-1]
list_job_id_scratch_dir = glob.glob(f"scratch_sid_*_jid_{lock_file_job_id}*")
if len(list_job_id_scratch_dir) == 0:
os.remove(l)
num_lock_files_removed += 1

post_clean_space_space = shutil.disk_usage(os.getcwd())
post_clean_available_space = format_size(post_clean_space_space.free, format_returned='M')

list_scratch_dir = sorted(glob.glob("scratch_sid_*_jid_*"))
logger.info(f"Number of scratch folder after clean-up: {len(list_scratch_dir)}.\n"
f"Removed {len(list_scratch_dir_to_delete)} scratch directories, "
f"and now the available amount of space is {post_clean_available_space}")

result_scratch_dir_deletion = f"Removed {len(list_scratch_dir_to_delete)} scratch directories"
list_lock_files = sorted(glob.glob(".lock_*"))
logger.info(f"Number of scratch folder after clean-up: {len(list_scratch_dir)}, "
f"number of lock files after clean-up: {len(list_lock_files)}.\n"
f"Removed {len(list_scratch_dir_to_delete)} scratch directories "
f"and {num_lock_files_removed} lock files.\n"
f"Now the available amount of space is {post_clean_available_space}")

result_scratch_dir_deletion = f"Removed {len(list_scratch_dir_to_delete)} scratch directories, " \
f"and {num_lock_files_removed} lock files."
logger.info(result_scratch_dir_deletion)

return jsonify(dict(output_status=result_scratch_dir_deletion))
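The expiry check above now relies on actually decoding the stored token instead of comparing its raw 'exp' field against the current time. A self-contained sketch of that behaviour, using PyJWT directly in place of the project's tokenHelper wrapper (the secret and payload values are illustrative assumptions; HS256 matches the algorithm used in the tests below):

import time
import jwt  # PyJWT

def is_token_expired(encoded_token: str, secret_key: str) -> bool:
    # jwt.decode() validates the 'exp' claim and raises ExpiredSignatureError for
    # an expired token, which is the exception free_up_space catches above.
    try:
        jwt.decode(encoded_token, secret_key, algorithms=["HS256"])
    except jwt.exceptions.ExpiredSignatureError:
        return True
    return False

# A token that expired 15 seconds ago is reported as expired.
expired = jwt.encode({"sub": "user", "exp": int(time.time()) - 15}, "some-secret", algorithm="HS256")
assert is_token_expired(expired, "some-secret")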
@@ -886,9 +905,13 @@ def get_request_files_dir(self):
return request_files_dir.path

def set_scratch_dir(self, session_id, job_id=None, verbose=False):
if verbose == True:
print('SETSCRATCH ---->', session_id,
type(session_id), job_id, type(job_id))
lock_file = f".lock_{self.job_id}"
scratch_dir_retry_attempts = 5
scratch_dir_retry_delay = 0.2
scratch_dir_created = True

if verbose:
print('SETSCRATCH ---->', session_id, type(session_id), job_id, type(job_id))

wd = 'scratch'

@@ -898,14 +921,28 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False):
if job_id is not None:
wd += '_jid_'+job_id

alias_workdir = self.get_existing_job_ID_path(
wd=FilePath(file_dir=wd).path)
if alias_workdir is not None:
wd = wd+'_aliased'

wd = FilePath(file_dir=wd)
wd.mkdir()
self.scratch_dir = wd.path
for attempt in range(scratch_dir_retry_attempts):
try:
with open(lock_file, 'w') as lock:
fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
alias_workdir = self.get_existing_job_ID_path(wd=FilePath(file_dir=wd).path)
if alias_workdir is not None:
wd = wd + '_aliased'

wd_path_obj = FilePath(file_dir=wd)
wd_path_obj.mkdir()
self.scratch_dir = wd_path_obj.path
scratch_dir_created = True
break
except (OSError, IOError) as io_e:
scratch_dir_created = False
self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts - (attempt + 1)} left), sleeping {scratch_dir_retry_delay} seconds until retry.\nError: {str(io_e)}')
time.sleep(scratch_dir_retry_delay)

if not scratch_dir_created:
dir_list = glob.glob(f"*_jid_{job_id}*")
sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts.\njob_id: {self.job_id}\ndir_list: {dir_list}")
raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.", status_code=500)

def set_temp_dir(self, session_id, job_id=None, verbose=False):
if verbose:
@@ -932,11 +969,14 @@ def move_temp_content(self):
file_full_path = os.path.join(self.temp_dir, f)
shutil.copy(file_full_path, self.scratch_dir)

def clear_temp_dir(self, temp_scratch_dir=None):
def clear_temp_dir(self, temp_scratch_dir=None, temp_job_id=None):
if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
if temp_scratch_dir is not None and temp_scratch_dir != self.scratch_dir and os.path.exists(temp_scratch_dir):
shutil.rmtree(temp_scratch_dir)
if temp_job_id is not None and os.path.exists(f".lock_{temp_job_id}"):
os.remove(f".lock_{temp_job_id}")


@staticmethod
def validated_download_file_path(basepath, filename, should_exist=True):
@@ -1659,9 +1699,7 @@ def set_config(self):
def get_existing_job_ID_path(self, wd):
# exist same job_ID, different session ID
dir_list = glob.glob(f'*_jid_{self.job_id}')
# print('dirs',dir_list)
if dir_list:
dir_list = [d for d in dir_list if 'aliased' not in d]
dir_list = [d for d in dir_list if 'aliased' not in d]

if len(dir_list) == 1:
if dir_list[0] != wd:
Expand All @@ -1670,9 +1708,8 @@ def get_existing_job_ID_path(self, wd):
alias_dir = None

elif len(dir_list) > 1:
sentry.capture_message(f'Found two non aliased identical job_id, dir_list: {dir_list}')
self.logger.warning(f'Found two non aliased identical job_id, dir_list: {dir_list}')

sentry.capture_message(f'Found two or more non aliased identical job_id, dir_list: {dir_list}')
self.logger.warning(f'Found two or more non aliased identical job_id, dir_list: {dir_list}')
raise InternalError("We have encountered an internal error! "
"Our team is notified and is working on it. We are sorry! "
"When we find a solution we will try to reach you",
@@ -1683,6 +1720,7 @@

return alias_dir


def get_file_mtime(self, file):
return os.path.getmtime(file)

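Taken together, the set_scratch_dir and clear_temp_dir changes above introduce a small per-job lock protocol around scratch-directory creation. Below is a self-contained sketch of the pattern: a non-blocking exclusive flock, retried a few times before giving up. The function name and the work done under the lock are illustrative, not the dispatcher's actual internals; the attempt count and delay mirror the values in the hunk.

import fcntl
import os
import time

def create_dir_under_lock(lock_path: str, target_dir: str, attempts: int = 5, delay: float = 0.2) -> str:
    for attempt in range(attempts):
        try:
            with open(lock_path, 'w') as lock:
                # LOCK_EX | LOCK_NB raises BlockingIOError (an OSError) if another
                # process already holds the lock on this file.
                fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
                os.makedirs(target_dir, exist_ok=True)
                # The lock is released when the file object is closed at the end
                # of the with-block.
                return target_dir
        except OSError:
            time.sleep(delay)
    raise RuntimeError(f"could not acquire {lock_path} after {attempts} attempts")

Retrying with a short sleep keeps concurrent requests for the same job_id from racing on directory creation, while still failing (via InternalError in the dispatcher) if the lock never becomes available; clear_temp_dir then removes the per-job lock file once the request is finished.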
9 changes: 9 additions & 0 deletions cdci_data_analysis/pytest_fixtures.py
@@ -1568,6 +1568,15 @@ def remove_scratch_folders(job_id=None):
for d in dir_list:
shutil.rmtree(d)

@staticmethod
def remove_lock_files(job_id=None):
if job_id is None:
lock_files = glob.glob('.lock_*')
else:
lock_files = glob.glob(f'.lock_{job_id}')
for f in lock_files:
os.remove(f)

@staticmethod
def remove_download_folders(id=None):
if id is None:
11 changes: 8 additions & 3 deletions tests/test_job_management.py
@@ -2605,6 +2605,7 @@ def test_email_t1_t2(dispatcher_long_living_fixture,
("hard_minimum_folder_age_days", 60)], indirect=True)
def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_minimum_age_days):
DispatcherJobState.remove_scratch_folders()
DispatcherJobState.remove_lock_files()

server = dispatcher_live_fixture

@@ -2622,6 +2623,8 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m
"exp": int(time.time()) - 15
}

expired_token_encoded = jwt.encode(expired_token, secret_key, algorithm='HS256')

params = {
'query_status': 'new',
'product_type': 'dummy',
@@ -2654,12 +2657,12 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m
analysis_parameters_path = os.path.join(scratch_dir, 'analysis_parameters.json')
with open(analysis_parameters_path) as analysis_parameters_file:
dict_analysis_parameters = json.load(analysis_parameters_file)
dict_analysis_parameters['token'] = expired_token
# dict_analysis_parameters['token'] = expired_token
dict_analysis_parameters['token'] = expired_token_encoded
with open(analysis_parameters_path, 'w') as dict_analysis_parameters_outfile:
my_json_str = json.dumps(dict_analysis_parameters, indent=4)
dict_analysis_parameters_outfile.write(u'%s' % my_json_str)


params = {
'token': encoded_token,
'soft_minimum_age_days': soft_minimum_age_days
@@ -2674,7 +2677,9 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m

assert 'output_status' in jdata

assert jdata['output_status'] == f"Removed {number_folders_to_delete} scratch directories"
number_lock_files_deleted = 0 if number_folders_to_delete < number_analysis_to_run else 1
assert jdata['output_status'] == (f"Removed {number_folders_to_delete} scratch directories, "
f"and {number_lock_files_deleted} lock files.")

assert len(glob.glob("scratch_sid_*_jid_*")) == number_analysis_to_run - number_folders_to_delete

53 changes: 53 additions & 0 deletions tests/test_server_basic.py
@@ -2,6 +2,7 @@
import shutil
import urllib
import io

import requests
import time
import uuid
@@ -11,6 +12,7 @@
import jwt
import glob
import pytest
import fcntl
from datetime import datetime, timedelta
from dateutil import parser, tz
from functools import reduce
@@ -320,6 +322,55 @@ def test_error_two_scratch_dir_same_job_id(dispatcher_live_fixture):
os.rmdir(fake_scratch_dir)


@pytest.mark.not_safe_parallel
@pytest.mark.fast
def test_scratch_dir_creation_lock_error(dispatcher_live_fixture):
DispatcherJobState.remove_scratch_folders()
server = dispatcher_live_fixture
logger.info("constructed server: %s", server)

encoded_token = jwt.encode(default_token_payload, secret_key, algorithm='HS256')
# issuing a request each, with the same set of parameters
params = dict(
query_status="new",
query_type="Real",
instrument="empty-async",
product_type="dummy",
token=encoded_token
)
DataServerQuery.set_status('submitted')
# let's generate a fake scratch dir
jdata = ask(server,
params,
expected_query_status=["submitted"],
max_time_s=50,
)

job_id = jdata['job_monitor']['job_id']
session_id = jdata['session_id']
fake_scratch_dir = f'scratch_sid_01234567890_jid_{job_id}'
os.makedirs(fake_scratch_dir)

params['job_id'] = job_id
params['session_id'] = session_id

lock_file = f".lock_{job_id}"

with open(lock_file, 'w') as f_lock:
fcntl.flock(f_lock, fcntl.LOCK_EX)

jdata = ask(server,
params,
expected_status_code=500,
expected_query_status=None,
)
scratch_dir_retry_attempts = 5
assert jdata['error'] == f"InternalError():Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts."
assert jdata['error_message'] == f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts."
os.rmdir(fake_scratch_dir)
os.remove(lock_file)
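For reference, the contention that test_scratch_dir_creation_lock_error sets up can be reproduced in isolation: one handle takes the lock with a blocking flock, and any non-blocking attempt on the same lock file then fails until the first handle is closed. A minimal sketch (the lock path is illustrative):

import fcntl

lock_path = ".lock_example_job"

holder = open(lock_path, 'w')
fcntl.flock(holder, fcntl.LOCK_EX)  # this handle now owns the lock

try:
    with open(lock_path, 'w') as contender:
        # A second, non-blocking attempt fails with BlockingIOError while the
        # lock is held, which is what drives set_scratch_dir into its retries.
        fcntl.flock(contender, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
    print("lock is busy, as the dispatcher observes during the test")
finally:
    holder.close()  # closing the handle releases the lock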


@pytest.mark.fast
def test_same_request_different_users(dispatcher_live_fixture):
server = dispatcher_live_fixture
@@ -2711,6 +2762,7 @@ def test_source_resolver(dispatcher_live_fixture_with_gallery, dispatcher_test_c
assert 'entity_portal_link' in resolved_obj
assert 'object_ids' in resolved_obj
assert 'object_type' in resolved_obj
assert 'message' in resolved_obj

assert resolved_obj['name'] == source_to_resolve.replace('_', ' ')
assert resolved_obj['entity_portal_link'] == dispatcher_test_conf_with_gallery["product_gallery_options"]["entities_portal_url"]\
Expand Down Expand Up @@ -2758,6 +2810,7 @@ def test_source_resolver_invalid_local_resolver(dispatcher_live_fixture_with_gal
assert 'entity_portal_link' in resolved_obj
assert 'object_ids' in resolved_obj
assert 'object_type' in resolved_obj
assert 'message' in resolved_obj

assert resolved_obj['name'] == source_to_resolve.replace('_', ' ')
assert resolved_obj['entity_portal_link'] == dispatcher_test_conf_with_gallery_invalid_local_resolver["product_gallery_options"]["entities_portal_url"]\
