Skip to content

Commit

Permalink
using LOCK_NB constants to make the request non-blocking.
Browse files Browse the repository at this point in the history
  • Loading branch information
burnout87 committed Oct 7, 2024
1 parent 238c4d4 commit 90e5286
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions cdci_data_analysis/flask_app/dispatcher_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,6 @@ def get_request_files_dir(self):
return request_files_dir.path

def set_scratch_dir(self, session_id, job_id=None, verbose=False):

lock_file = f".lock_{self.job_id}"
scratch_dir_retry_attempts = 5
scratch_dir_retry_delay = 0.2
Expand All @@ -907,7 +906,7 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False):
for attempt in range(scratch_dir_retry_attempts):
try:
with open(lock_file, 'w') as lock:
fcntl.flock(lock, fcntl.LOCK_EX)
fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
alias_workdir = self.get_existing_job_ID_path(wd=FilePath(file_dir=wd).path)
if alias_workdir is not None:
wd = wd + '_aliased'
Expand All @@ -917,15 +916,15 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False):
self.scratch_dir = wd_path_obj.path
scratch_dir_created = True
break
except IOError as io_e:
except (OSError, IOError) as io_e:
scratch_dir_created = False
self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts - attempt + 1} left), sleeping {scratch_dir_retry_delay} seconds until retry.\n{str(io_e)}')
self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts - (attempt + 1)} left), sleeping {scratch_dir_retry_delay} seconds until retry.\nError: {str(io_e)}')
time.sleep(scratch_dir_retry_delay)

if not scratch_dir_created:
dir_list = glob.glob(f"*_jid_{job_id}*")
sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts. dir_list: {dir_list}")
raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.")
sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts.\njob_id: {self.job_id}\ndir_list: {dir_list}")
raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.", status_code=500)

def set_temp_dir(self, session_id, job_id=None, verbose=False):
if verbose:
Expand Down

0 comments on commit 90e5286

Please sign in to comment.