From 294594c59f3cdcd8eef2008d2c46a3fea98b025c Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 7 Oct 2024 12:12:30 +0200 Subject: [PATCH 1/6] use a lock and loop to attempt to acquire the lock to check and create the scratch dir --- .../flask_app/dispatcher_query.py | 47 ++++++++++++------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index e086245f..1e7ed546 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -22,6 +22,7 @@ import glob import string import random +import fcntl from flask import jsonify, send_from_directory, make_response from flask import request, g @@ -886,9 +887,13 @@ def get_request_files_dir(self): return request_files_dir.path def set_scratch_dir(self, session_id, job_id=None, verbose=False): - if verbose == True: - print('SETSCRATCH ---->', session_id, - type(session_id), job_id, type(job_id)) + + lock_file = f".lock_{self.job_id}" + scratch_dir_retry_attempts = 5 + scratch_dir_retry_delay = 0.2 + + if verbose: + print('SETSCRATCH ---->', session_id, type(session_id), job_id, type(job_id)) wd = 'scratch' @@ -898,14 +903,24 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False): if job_id is not None: wd += '_jid_'+job_id - alias_workdir = self.get_existing_job_ID_path( - wd=FilePath(file_dir=wd).path) - if alias_workdir is not None: - wd = wd+'_aliased' - - wd = FilePath(file_dir=wd) - wd.mkdir() - self.scratch_dir = wd.path + for attempt in range(scratch_dir_retry_attempts): + try: + with open(lock_file, 'w') as lock: + fcntl.flock(lock, fcntl.LOCK_EX) + alias_workdir = self.get_existing_job_ID_path(wd=FilePath(file_dir=wd).path) + if alias_workdir is not None: + wd = wd + '_aliased' + + wd = FilePath(file_dir=wd) + wd.mkdir() + self.scratch_dir = wd.path + except IOError as io_e: + self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts} - {attempt + 1} left), sleeping {scratch_dir_retry_delay} seconds until retry.\n{str(io_e)}') + time.sleep(scratch_dir_retry_delay) + + dir_list = glob.glob(f"*_jid_{job_id}*") + sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts. dir_list: {dir_list}") + raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.") def set_temp_dir(self, session_id, job_id=None, verbose=False): if verbose: @@ -1659,9 +1674,7 @@ def set_config(self): def get_existing_job_ID_path(self, wd): # exist same job_ID, different session ID dir_list = glob.glob(f'*_jid_{self.job_id}') - # print('dirs',dir_list) - if dir_list: - dir_list = [d for d in dir_list if 'aliased' not in d] + dir_list = [d for d in dir_list if 'aliased' not in d] if len(dir_list) == 1: if dir_list[0] != wd: @@ -1670,9 +1683,8 @@ def get_existing_job_ID_path(self, wd): alias_dir = None elif len(dir_list) > 1: - sentry.capture_message(f'Found two non aliased identical job_id, dir_list: {dir_list}') - self.logger.warning(f'Found two non aliased identical job_id, dir_list: {dir_list}') - + sentry.capture_message(f'Found two or more non aliased identical job_id, dir_list: {dir_list}') + self.logger.warning(f'Found two or more non aliased identical job_id, dir_list: {dir_list}') raise InternalError("We have encountered an internal error! " "Our team is notified and is working on it. We are sorry! " "When we find a solution we will try to reach you", @@ -1683,6 +1695,7 @@ def get_existing_job_ID_path(self, wd): return alias_dir + def get_file_mtime(self, file): return os.path.getmtime(file) From a370115736b8f717af303500369e82cca0298542 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 7 Oct 2024 12:34:57 +0200 Subject: [PATCH 2/6] implemented interrupting the loop --- .../flask_app/dispatcher_query.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 1e7ed546..56554f27 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -891,6 +891,7 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False): lock_file = f".lock_{self.job_id}" scratch_dir_retry_attempts = 5 scratch_dir_retry_delay = 0.2 + scratch_dir_created = True if verbose: print('SETSCRATCH ---->', session_id, type(session_id), job_id, type(job_id)) @@ -911,16 +912,20 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False): if alias_workdir is not None: wd = wd + '_aliased' - wd = FilePath(file_dir=wd) - wd.mkdir() - self.scratch_dir = wd.path + wd_path_obj = FilePath(file_dir=wd) + wd_path_obj.mkdir() + self.scratch_dir = wd_path_obj.path + scratch_dir_created = True + break except IOError as io_e: + scratch_dir_created = False self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts} - {attempt + 1} left), sleeping {scratch_dir_retry_delay} seconds until retry.\n{str(io_e)}') time.sleep(scratch_dir_retry_delay) - dir_list = glob.glob(f"*_jid_{job_id}*") - sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts. dir_list: {dir_list}") - raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.") + if not scratch_dir_created: + dir_list = glob.glob(f"*_jid_{job_id}*") + sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts. dir_list: {dir_list}") + raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.") def set_temp_dir(self, session_id, job_id=None, verbose=False): if verbose: From 59caad58491e3c11f5f30ed4a23d5181d873285c Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 7 Oct 2024 14:20:39 +0200 Subject: [PATCH 3/6] log --- cdci_data_analysis/flask_app/dispatcher_query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 56554f27..5c48c601 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -919,7 +919,7 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False): break except IOError as io_e: scratch_dir_created = False - self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts} - {attempt + 1} left), sleeping {scratch_dir_retry_delay} seconds until retry.\n{str(io_e)}') + self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts - attempt + 1} left), sleeping {scratch_dir_retry_delay} seconds until retry.\n{str(io_e)}') time.sleep(scratch_dir_retry_delay) if not scratch_dir_created: From 238c4d474286b4d796d5dff14671f35f89099b09 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 7 Oct 2024 15:07:48 +0200 Subject: [PATCH 4/6] dedicated test --- tests/test_server_basic.py | 52 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/test_server_basic.py b/tests/test_server_basic.py index 6612e4b2..c833c1d1 100644 --- a/tests/test_server_basic.py +++ b/tests/test_server_basic.py @@ -2,6 +2,8 @@ import shutil import urllib import io +from os import close + import requests import time import uuid @@ -11,6 +13,7 @@ import jwt import glob import pytest +import fcntl from datetime import datetime, timedelta from dateutil import parser, tz from functools import reduce @@ -320,6 +323,55 @@ def test_error_two_scratch_dir_same_job_id(dispatcher_live_fixture): os.rmdir(fake_scratch_dir) +@pytest.mark.not_safe_parallel +@pytest.mark.fast +def test_scratch_dir_creation_lock_error(dispatcher_live_fixture): + DispatcherJobState.remove_scratch_folders() + server = dispatcher_live_fixture + logger.info("constructed server: %s", server) + + encoded_token = jwt.encode(default_token_payload, secret_key, algorithm='HS256') + # issuing a request each, with the same set of parameters + params = dict( + query_status="new", + query_type="Real", + instrument="empty-async", + product_type="dummy", + token=encoded_token + ) + DataServerQuery.set_status('submitted') + # let's generate a fake scratch dir + jdata = ask(server, + params, + expected_query_status=["submitted"], + max_time_s=50, + ) + + job_id = jdata['job_monitor']['job_id'] + session_id = jdata['session_id'] + fake_scratch_dir = f'scratch_sid_01234567890_jid_{job_id}' + os.makedirs(fake_scratch_dir) + + params['job_id'] = job_id + params['session_id'] = session_id + + lock_file = f".lock_{job_id}" + + with open(lock_file, 'w') as f_lock: + fcntl.flock(f_lock, fcntl.LOCK_EX) + + jdata = ask(server, + params, + expected_status_code=500, + expected_query_status=None, + ) + scratch_dir_retry_attempts = 5 + assert jdata['error'] == f"InternalError():Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts." + assert jdata['error_message'] == f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts." + os.rmdir(fake_scratch_dir) + os.remove(lock_file) + + @pytest.mark.fast def test_same_request_different_users(dispatcher_live_fixture): server = dispatcher_live_fixture From 90e52866d2e32cdf6b0937421f33d507e29cc5ef Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 7 Oct 2024 15:08:03 +0200 Subject: [PATCH 5/6] using LOCK_NB constants to make the request non-blocking. --- cdci_data_analysis/flask_app/dispatcher_query.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 5c48c601..4dfad4e0 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -887,7 +887,6 @@ def get_request_files_dir(self): return request_files_dir.path def set_scratch_dir(self, session_id, job_id=None, verbose=False): - lock_file = f".lock_{self.job_id}" scratch_dir_retry_attempts = 5 scratch_dir_retry_delay = 0.2 @@ -907,7 +906,7 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False): for attempt in range(scratch_dir_retry_attempts): try: with open(lock_file, 'w') as lock: - fcntl.flock(lock, fcntl.LOCK_EX) + fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB) alias_workdir = self.get_existing_job_ID_path(wd=FilePath(file_dir=wd).path) if alias_workdir is not None: wd = wd + '_aliased' @@ -917,15 +916,15 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False): self.scratch_dir = wd_path_obj.path scratch_dir_created = True break - except IOError as io_e: + except (OSError, IOError) as io_e: scratch_dir_created = False - self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts - attempt + 1} left), sleeping {scratch_dir_retry_delay} seconds until retry.\n{str(io_e)}') + self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts - (attempt + 1)} left), sleeping {scratch_dir_retry_delay} seconds until retry.\nError: {str(io_e)}') time.sleep(scratch_dir_retry_delay) if not scratch_dir_created: dir_list = glob.glob(f"*_jid_{job_id}*") - sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts. dir_list: {dir_list}") - raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.") + sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts.\njob_id: {self.job_id}\ndir_list: {dir_list}") + raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.", status_code=500) def set_temp_dir(self, session_id, job_id=None, verbose=False): if verbose: From 4919457cbad98d1715442c13e4f0f05277109c93 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 7 Oct 2024 15:12:11 +0200 Subject: [PATCH 6/6] not needed import --- tests/test_server_basic.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_server_basic.py b/tests/test_server_basic.py index c833c1d1..26a21e9a 100644 --- a/tests/test_server_basic.py +++ b/tests/test_server_basic.py @@ -2,7 +2,6 @@ import shutil import urllib import io -from os import close import requests import time