Skip to content

Commit

Permalink
use a lock and loop to attempt to acquire the lock to check and creat…
Browse files Browse the repository at this point in the history
…e the scratch dir
  • Loading branch information
burnout87 committed Oct 7, 2024
1 parent 2d00c76 commit 294594c
Showing 1 changed file with 30 additions and 17 deletions.
47 changes: 30 additions & 17 deletions cdci_data_analysis/flask_app/dispatcher_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import glob
import string
import random
import fcntl

from flask import jsonify, send_from_directory, make_response
from flask import request, g
Expand Down Expand Up @@ -886,9 +887,13 @@ def get_request_files_dir(self):
return request_files_dir.path

def set_scratch_dir(self, session_id, job_id=None, verbose=False):
if verbose == True:
print('SETSCRATCH ---->', session_id,
type(session_id), job_id, type(job_id))

lock_file = f".lock_{self.job_id}"
scratch_dir_retry_attempts = 5
scratch_dir_retry_delay = 0.2

if verbose:
print('SETSCRATCH ---->', session_id, type(session_id), job_id, type(job_id))

wd = 'scratch'

Expand All @@ -898,14 +903,24 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False):
if job_id is not None:
wd += '_jid_'+job_id

alias_workdir = self.get_existing_job_ID_path(
wd=FilePath(file_dir=wd).path)
if alias_workdir is not None:
wd = wd+'_aliased'

wd = FilePath(file_dir=wd)
wd.mkdir()
self.scratch_dir = wd.path
for attempt in range(scratch_dir_retry_attempts):
try:
with open(lock_file, 'w') as lock:
fcntl.flock(lock, fcntl.LOCK_EX)
alias_workdir = self.get_existing_job_ID_path(wd=FilePath(file_dir=wd).path)
if alias_workdir is not None:
wd = wd + '_aliased'

wd = FilePath(file_dir=wd)
wd.mkdir()
self.scratch_dir = wd.path
except IOError as io_e:
self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts} - {attempt + 1} left), sleeping {scratch_dir_retry_delay} seconds until retry.\n{str(io_e)}')
time.sleep(scratch_dir_retry_delay)

dir_list = glob.glob(f"*_jid_{job_id}*")
sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts. dir_list: {dir_list}")
raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.")

def set_temp_dir(self, session_id, job_id=None, verbose=False):
if verbose:
Expand Down Expand Up @@ -1659,9 +1674,7 @@ def set_config(self):
def get_existing_job_ID_path(self, wd):
# exist same job_ID, different session ID
dir_list = glob.glob(f'*_jid_{self.job_id}')
# print('dirs',dir_list)
if dir_list:
dir_list = [d for d in dir_list if 'aliased' not in d]
dir_list = [d for d in dir_list if 'aliased' not in d]

if len(dir_list) == 1:
if dir_list[0] != wd:
Expand All @@ -1670,9 +1683,8 @@ def get_existing_job_ID_path(self, wd):
alias_dir = None

elif len(dir_list) > 1:
sentry.capture_message(f'Found two non aliased identical job_id, dir_list: {dir_list}')
self.logger.warning(f'Found two non aliased identical job_id, dir_list: {dir_list}')

sentry.capture_message(f'Found two or more non aliased identical job_id, dir_list: {dir_list}')
self.logger.warning(f'Found two or more non aliased identical job_id, dir_list: {dir_list}')
raise InternalError("We have encountered an internal error! "
"Our team is notified and is working on it. We are sorry! "
"When we find a solution we will try to reach you",
Expand All @@ -1683,6 +1695,7 @@ def get_existing_job_ID_path(self, wd):

return alias_dir


def get_file_mtime(self, file):
return os.path.getmtime(file)

Expand Down

0 comments on commit 294594c

Please sign in to comment.