Skip to content

Commit

Permalink
Merge pull request #598 from oda-hub/temp_dir-inside-scratch_dir
Browse files Browse the repository at this point in the history
Temp dir inside scratch dir
  • Loading branch information
volodymyrss authored Oct 9, 2023
2 parents a19bb7b + 0d1d191 commit 761973b
Showing 1 changed file with 40 additions and 17 deletions.
57 changes: 40 additions & 17 deletions cdci_data_analysis/flask_app/dispatcher_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def __init__(self, app,

self.app = app

temp_scratch_dir = None

self.set_sentry_sdk(getattr(self.app.config.get('conf'), 'sentry_url', None))

try:
Expand All @@ -151,6 +153,7 @@ def __init__(self, app,
raise RequestNotUnderstood("job_id must be present during a call_back")
if data_server_call_back:
# this can be set since it's a call_back and job_id and session_id are available
self.logger.info(f"before setting scratch_dir: job_id: {self.par_dic['job_id']} callback: {data_server_call_back}, resolve_job_url: {resolve_job_url}")
self.set_scratch_dir(session_id=self.par_dic['session_id'], job_id=self.par_dic['job_id'])
self.set_scws_call_back_related_params()
else:
Expand Down Expand Up @@ -244,7 +247,30 @@ def __init__(self, app,
# TODO is also the case of call_back to handle ?
if not data_server_call_back:
self.set_instrument(self.instrument_name, roles, email)
verbose = self.par_dic.get('verbose', 'False') == 'True'

# TODO: if not callback!
# if 'query_status' not in self.par_dic:
# raise MissingRequestParameter('no query_status!')

verbose = self.par_dic.get('verbose', 'False') == 'True'
if not (data_server_call_back or resolve_job_url):
query_status = self.par_dic['query_status']
self.job_id = None
if query_status == 'new':
# let's generate a temporary job_id used for the creation of the scratch_dir
self.generate_job_id()
else:
if 'job_id' not in self.par_dic:
raise RequestNotUnderstood(
f"job_id must be present if query_status != \"new\" (it is \"{query_status}\")")

self.job_id = self.par_dic['job_id']

# let's generate a temporary scratch_dir using the temporary job_id
self.set_scratch_dir(self.par_dic['session_id'], job_id=self.job_id, verbose=verbose)
# temp_job_id = self.job_id
temp_scratch_dir = self.scratch_dir
if not data_server_call_back:
try:
self.set_temp_dir(self.par_dic['session_id'], verbose=verbose)
except Exception as e:
Expand All @@ -263,24 +289,16 @@ def __init__(self, app,
sentry_dsn=self.sentry_dsn
)
self.par_dic = self.instrument.set_pars_from_dic(self.par_dic, verbose=verbose)

# TODO: if not callback!
# if 'query_status' not in self.par_dic:
# raise MissingRequestParameter('no query_status!')

# update the job_id
if not (data_server_call_back or resolve_job_url):
query_status = self.par_dic['query_status']
self.job_id = None
if query_status == 'new':
# this will overwrite any job_id provided, it should be validated or ignored
#if 'job_id' in self.par_dic:
# self.job_id = self.par_dic['job_id']
# self.validate_job_id()

provided_job_id = self.par_dic.get('job_id', None)
if provided_job_id == "": # frontend sends this
provided_job_id = None

# let's generate the definitive job_id
self.generate_job_id()

if provided_job_id is not None and self.job_id != provided_job_id:
Expand All @@ -295,8 +313,8 @@ def __init__(self, app,

self.job_id = self.par_dic['job_id']

self.set_scratch_dir(
self.par_dic['session_id'], job_id=self.job_id, verbose=verbose)
# let's set the scratch_dir with the updated job_id
self.set_scratch_dir(self.par_dic['session_id'], job_id=self.job_id, verbose=verbose)

self.log_query_progression("before move_temp_content")
self.move_temp_content()
Expand All @@ -319,7 +337,7 @@ def __init__(self, app,
finally:
self.logger.info("==> clean-up temporary directory")
self.log_query_progression("before clear_temp_dir")
self.clear_temp_dir()
self.clear_temp_dir(temp_scratch_dir=temp_scratch_dir)
self.log_query_progression("after clear_temp_dir")

logger.info("constructed %s:%s for data_server_call_back=%s", self.__class__, self, data_server_call_back)
Expand Down Expand Up @@ -744,8 +762,10 @@ def set_temp_dir(self, session_id, job_id=None, verbose=False):

if job_id is not None:
suffix += '_jid_'+job_id

td = tempfile.mkdtemp(suffix=suffix)
temp_parent_dir = '.'
if hasattr(self, 'scratch_dir'):
temp_parent_dir = self.scratch_dir
td = tempfile.mkdtemp(suffix=suffix, dir=temp_parent_dir)
self.temp_dir = td

def move_temp_content(self):
Expand All @@ -755,9 +775,11 @@ def move_temp_content(self):
file_full_path = os.path.join(self.temp_dir, f)
shutil.copy(file_full_path, self.scratch_dir)

def clear_temp_dir(self):
def clear_temp_dir(self, temp_scratch_dir=None):
if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
if temp_scratch_dir is not None and temp_scratch_dir != self.scratch_dir and os.path.exists(temp_scratch_dir):
shutil.rmtree(temp_scratch_dir)

def prepare_download(self, file_list, file_name, scratch_dir):
file_name = file_name.replace(' ', '_')
Expand Down Expand Up @@ -1340,6 +1362,7 @@ def get_existing_job_ID_path(self, wd):

elif len(dir_list) > 1:
sentry.capture_message('Found two non aliased identical job_id')
print(f'Found two non aliased identical job_id, dir_list: {dir_list}')
raise RuntimeError('Found two non aliased identical job_id')

else:
Expand Down

0 comments on commit 761973b

Please sign in to comment.