diff --git a/libs/libcommon/src/libcommon/queue/jobs.py b/libs/libcommon/src/libcommon/queue/jobs.py index 5c0bf9e48..cfbdea208 100644 --- a/libs/libcommon/src/libcommon/queue/jobs.py +++ b/libs/libcommon/src/libcommon/queue/jobs.py @@ -30,7 +30,7 @@ ) from libcommon.dtos import FlatJobInfo, JobInfo, Priority, Status, WorkerSize from libcommon.queue.dataset_blockages import DATASET_STATUS_BLOCKED, DATASET_STATUS_NORMAL, get_blocked_datasets -from libcommon.queue.lock import lock, release_lock, release_locks +from libcommon.queue.lock import lock, release_lock from libcommon.queue.metrics import ( decrease_metric, decrease_worker_size_metrics, @@ -147,7 +147,7 @@ class JobDocument(Document): config (`str`, *optional*): The config on which to apply the job. split (`str`, *optional*): The split on which to apply the job. unicity_id (`str`): A string that identifies the job uniquely. Only one job with the same unicity_id can be in - the started state. The revision is not part of the unicity_id. + the started state. namespace (`str`): The dataset namespace (user or organization) if any, else the dataset name (canonical name). priority (`Priority`, *optional*): The priority of the job. Defaults to Priority.LOW. status (`Status`, *optional*): The status of the job. Defaults to Status.WAITING. @@ -260,7 +260,7 @@ class Queue: the jobs. You can create multiple Queue objects, it has no effect on the database. It's a FIFO queue, with the following properties: - - a job is identified by its input arguments: unicity_id (type, dataset, config and split, NOT revision) + - a job is identified by its input arguments: unicity_id (type, dataset, config and split, revision) - a job can be in one of the following states: waiting, started - a job can be in the queue only once (unicity_id) in the "started" state - a job can be in the queue multiple times in the other states @@ -698,8 +698,7 @@ def finish_job(self, job_id: str) -> Optional[Priority]: ) job_priority = job.priority job.delete() - release_locks(owner=job_id) - # ^ bug: the lock owner is not set to the job id anymore when calling start_job()! + release_lock(key=job.unicity_id) if was_blocked: pending_jobs = self.get_pending_jobs_df(dataset=job.dataset) for _, pending_job in pending_jobs.iterrows(): diff --git a/libs/libcommon/src/libcommon/queue/lock.py b/libs/libcommon/src/libcommon/queue/lock.py index 7f0486e3a..7b583a896 100644 --- a/libs/libcommon/src/libcommon/queue/lock.py +++ b/libs/libcommon/src/libcommon/queue/lock.py @@ -180,21 +180,6 @@ def git_branch( return cls(key=key, owner=owner, sleeps=sleeps, ttl=_TTL.LOCK_TTL_SECONDS_TO_WRITE_ON_GIT_BRANCH) -def release_locks(owner: str) -> None: - """ - Release all locks owned by the given owner - - Args: - owner (`str`): the current owner that holds the locks - """ - Lock.objects(owner=owner).update( - write_concern={"w": "majority", "fsync": True}, - read_concern={"level": "majority"}, - owner=None, - updated_at=get_datetime(), - ) - - def release_lock(key: str) -> None: """ Release the lock for a specific key