Skip to content

Commit

Permalink
Remove JobListReadLock and simplify JobQueue
Browse files Browse the repository at this point in the history
* Remove unused JobListReadLock
* Initialize struct members
* Combine alloc and set_driver functions
* Update signatures for pybind
* Update test_job_queue test
  • Loading branch information
andreas-el authored Sep 22, 2023
1 parent 8a15332 commit 8b6b6c5
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 55 deletions.
4 changes: 1 addition & 3 deletions src/clib/lib/include/ert/job_queue/job_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
#include <ert/job_queue/queue_driver.hpp>

typedef struct job_queue_struct job_queue_type;
extern "C" void job_queue_set_driver(job_queue_type *queue,
queue_driver_type *driver);
extern "C" job_queue_type *job_queue_alloc();
extern "C" job_queue_type *job_queue_alloc(queue_driver_type *driver);
extern "C" void job_queue_free(job_queue_type *);
extern "C" PY_USED int job_queue_add_job_node(job_queue_type *queue,
job_queue_node_type *node);
Expand Down
2 changes: 1 addition & 1 deletion src/clib/lib/job_queue/job_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <ert/job_queue/job_node.hpp>

struct job_list_struct {
std::vector<job_queue_node_type *> vec_jobs;
std::vector<job_queue_node_type *> vec_jobs{};
pthread_rwlock_t lock{};
};

Expand Down
32 changes: 5 additions & 27 deletions src/clib/lib/job_queue/job_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,18 @@
#include <ert/job_queue/job_queue.hpp>

struct job_queue_struct {
job_list_type *job_list;
job_queue_status_type *status;
job_list_type *job_list = nullptr;
job_queue_status_type *status = nullptr;
/** A pointer to a driver instance (LSF|LOCAL) which actually 'does it'. */
queue_driver_type *driver;
queue_driver_type *driver = nullptr;
};

class JobListReadLock {
/* This is just a trick to make sure list is unlocked when exiting scope,
* also when exiting due to exceptions */
public:
JobListReadLock(job_list_type *job_list) : job_list(job_list) {
job_list_get_rdlock(this->job_list);
}
~JobListReadLock() { job_list_unlock(this->job_list); }

private:
job_list_type *job_list;
};

/**
Observe that the job_queue returned by this function is NOT ready
for use; a driver must be set explicitly with a call to
job_queue_set_driver() first.
*/
job_queue_type *job_queue_alloc() {
job_queue_type *job_queue_alloc(queue_driver_type *driver) {
auto queue = new job_queue_type;
queue->driver = NULL;
queue->job_list = job_list_alloc();
queue->status = job_queue_status_alloc();
return queue;
}

void job_queue_set_driver(job_queue_type *queue, queue_driver_type *driver) {
queue->driver = driver;
return queue;
}

void job_queue_free(job_queue_type *queue) {
Expand Down
4 changes: 1 addition & 3 deletions src/clib/lib/job_queue/job_queue_status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
#include <ert/job_queue/queue_driver.hpp>

job_queue_status_type *job_queue_status_alloc() {
auto status = new job_queue_status_struct;
pthread_rwlock_init(&status->rw_lock, nullptr);
return status;
return new job_queue_status_struct;
}

void job_queue_status_free(job_queue_status_type *status) { delete status; }
Expand Down
5 changes: 1 addition & 4 deletions src/clib/old_tests/job_queue/test_job_queue_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@
#include <ert/job_queue/torque_driver.hpp>

void job_queue_set_driver_(job_driver_type driver_type) {
job_queue_type *queue = job_queue_alloc();
queue_driver_type *driver = queue_driver_alloc(driver_type);

job_queue_set_driver(queue, driver);

job_queue_type *queue = job_queue_alloc(driver);
job_queue_free(queue);
queue_driver_free(driver);
}
Expand Down
7 changes: 2 additions & 5 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,8 @@ def _queue_state_event_type(state: str) -> str:
# pylint: disable=too-many-public-methods
class JobQueue(BaseCClass): # type: ignore
TYPE_NAME = "job_queue"
_alloc = ResPrototype("void* job_queue_alloc()", bind=False)
_alloc = ResPrototype("void* job_queue_alloc(void*)", bind=False)
_free = ResPrototype("void job_queue_free( job_queue )")
_set_driver = ResPrototype("void job_queue_set_driver( job_queue , void* )")

_add_job = ResPrototype("int job_queue_add_job_node(job_queue, job_queue_node)")

def __repr__(self) -> str:
Expand All @@ -112,11 +110,10 @@ def __init__(self, driver: "Driver", max_submit: int = 2):

self.job_list: List[JobQueueNode] = []
self._stopped = False
c_ptr = self._alloc(STATUS_file, ERROR_file)
c_ptr = self._alloc(driver.from_param(driver))
super().__init__(c_ptr)

self.driver = driver
self._set_driver(driver.from_param(driver))
self._differ = QueueDiffer()
self._max_job_duration = 0
self._max_submit = max_submit
Expand Down
20 changes: 8 additions & 12 deletions tests/unit_tests/job_queue/test_job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path
from threading import BoundedSemaphore
from typing import Any, Callable, Dict, Optional
from unittest.mock import MagicMock, patch
from unittest.mock import patch

import ert.callbacks
from ert.config import QueueSystem
Expand Down Expand Up @@ -331,17 +331,13 @@ def test_stop_long_running():
job_list[i]._start_time = 0
job_list[i]._end_time = 5

# The driver is of no consequence, so resolving it in the c layer is
# uninteresting and mocked out.
with patch("ert.job_queue.JobQueue._set_driver"):
queue = JobQueue(MagicMock())

# We don't need the c layer call here, we only need it added to
# the queue's job_list.
with patch("ert.job_queue.JobQueue._add_job") as _add_job:
for idx, job in enumerate(job_list):
_add_job.return_value = idx
queue.add_job(job, idx)
driver = Driver(driver_type=QueueSystem.LOCAL)
queue = JobQueue(driver)

with patch("ert.job_queue.JobQueue._add_job") as _add_job:
for idx, job in enumerate(job_list):
_add_job.return_value = idx
queue.add_job(job, idx)

queue.stop_long_running_jobs(5)
queue._differ.transition(queue.job_list)
Expand Down

0 comments on commit 8b6b6c5

Please sign in to comment.