Skip to content

Commit

Permalink
Replace jobs array with job_queue_node_type vector
Browse files Browse the repository at this point in the history
* Replace **jobs array with std::vector<job_queue_node_type *>
* Move initialization to struct and replace NULL with nullptr
* Inline job_list_reset and remove job_list_get_size
* Initialize struct contents brace style
  • Loading branch information
andreas-el authored Sep 19, 2023
1 parent 6e7d720 commit bfbd5f3
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 130 deletions.
3 changes: 1 addition & 2 deletions src/clib/lib/include/ert/job_queue/job_list.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ typedef struct job_list_struct job_list_type;

job_list_type *job_list_alloc();
void job_list_free(job_list_type *job_list);
int job_list_get_size(const job_list_type *job_list);
void job_list_add_job(job_list_type *job_list, job_queue_node_type *job_node);
void job_list_reset(job_list_type *job_list);
size_t job_list_get_size(const job_list_type *job_list);
void job_list_get_wrlock(job_list_type *list);
void job_list_get_rdlock(job_list_type *list);
void job_list_unlock(job_list_type *list);
Expand Down
30 changes: 15 additions & 15 deletions src/clib/lib/include/ert/job_queue/job_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,36 @@
*/
struct job_queue_node_struct {
/** How many cpu's will this job need - the driver is free to ignore if not relevant. */
int num_cpu;
int num_cpu = 0;
/** The path to the actual executable. */
char *run_cmd;
/** The queue will look for the occurence of this file to detect a failure. */
char *exit_file;
char *run_cmd = nullptr;
/** The queue will look for the occurrence of this file to detect a failure. */
char *exit_file = nullptr;
/** The queue will look for this file to verify that the job is running or
* has run. */
char *status_file;
char *status_file = nullptr;
/** The name of the job. */
char *job_name;
char *job_name = nullptr;
/** Where the job is run - absolute path. */
char *run_path;
char *run_path = nullptr;
/** The number of commandline arguments to pass when starting the job. */
int argc;
int argc = 0;
/** The commandline arguments. */
char **argv;
int queue_index;
int queue_index = 0;

std::optional<std::string> fail_message;
std::optional<std::string> fail_message{};

/** Which attempt is this ... */
int submit_attempt;
int submit_attempt = 0;
/** The current status of the job. */
job_status_type job_status;
job_status_type job_status = JOB_QUEUE_NOT_ACTIVE;
/** Protecting the access to the job_data pointer. */
pthread_mutex_t data_mutex;
pthread_mutex_t data_mutex{};
/** Driver specific data about this job - fully handled by the driver. */
void *job_data;
void *job_data = nullptr;
/** When did the job change status -> RUNNING - the LAST TIME. */
time_t sim_start;
time_t sim_start = 0;
};

typedef bool(job_callback_ftype)(void *);
Expand Down
62 changes: 13 additions & 49 deletions src/clib/lib/job_queue/job_list.cpp
Original file line number Diff line number Diff line change
@@ -1,66 +1,30 @@
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <ert/util/util.hpp>
#include <vector>

#include <ert/job_queue/job_list.hpp>
#include <ert/job_queue/job_node.hpp>

struct job_list_struct {
int active_size;
int alloc_size;
job_queue_node_type **jobs;
pthread_rwlock_t lock;
std::vector<job_queue_node_type *> vec_jobs;
pthread_rwlock_t lock{};
};

job_list_type *job_list_alloc() {
auto job_list = new job_list_type;
job_list->active_size = 0;
job_list->alloc_size = 0;
job_list->jobs = NULL;
pthread_rwlock_init(&job_list->lock, NULL);
return job_list;
}

void job_list_reset(job_list_type *job_list) {
for (int queue_index = 0; queue_index < job_list->active_size;
queue_index++) {
job_queue_node_type *node = job_list->jobs[queue_index];
job_queue_node_free(node);
job_list->jobs[queue_index] = NULL;
}
job_list->active_size = 0;
}

int job_list_get_size(const job_list_type *job_list) {
return job_list->active_size;
}
job_list_type *job_list_alloc() { return new job_list_type; }

/**
This takes ownership to the job node instance.
*/
void job_list_add_job(job_list_type *job_list, job_queue_node_type *job_node) {
if (job_list->alloc_size == job_list->active_size) {
int new_alloc_size = util_int_max(16, job_list->alloc_size * 2);
job_list->jobs = (job_queue_node_type **)util_realloc(
job_list->jobs, sizeof *job_list->jobs * new_alloc_size);
job_list->alloc_size = new_alloc_size;
}

int queue_index = job_list->active_size;
job_queue_node_set_queue_index(job_node, queue_index);
job_list->jobs[queue_index] = job_node;
unsigned long queue_index = job_list->vec_jobs.size();
job_queue_node_set_queue_index(job_node, static_cast<int>(queue_index));
job_list->vec_jobs.push_back(job_node);
}

job_list->active_size++;
size_t job_list_get_size(const job_list_type *job_list) {
return job_list->vec_jobs.size();
}

void job_list_free(job_list_type *job_list) {
if (job_list->alloc_size > 0) {
job_list_reset(job_list);
free(job_list->jobs);
}
for (auto &vec_job : job_list->vec_jobs)
job_queue_node_free(vec_job);
job_list->vec_jobs.clear();
delete job_list;
}

Expand Down
31 changes: 5 additions & 26 deletions src/clib/lib/job_queue/job_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
namespace fs = std::filesystem;
static auto logger = ert::get_logger("job_queue");

#define INVALID_QUEUE_INDEX -999
const time_t MAX_CONFIRMED_WAIT = 10 * 60;

/*
Expand Down Expand Up @@ -112,20 +111,12 @@ void job_queue_node_fscanf_EXIT(job_queue_node_type *node) {
}

int job_queue_node_get_queue_index(const job_queue_node_type *node) {
if (node->queue_index == INVALID_QUEUE_INDEX)
util_abort("%s: internal error: asked for not-yet-initialized "
"node->queue_index\n",
__func__);
return node->queue_index;
}

void job_queue_node_set_queue_index(job_queue_node_type *node,
int queue_index) {
if (node->queue_index == INVALID_QUEUE_INDEX)
node->queue_index = queue_index;
else
util_abort("%s: internal error: attempt to reset queue_index \n",
__func__);
node->queue_index = queue_index;
}

/*
Expand Down Expand Up @@ -164,17 +155,15 @@ job_queue_node_type *job_queue_node_alloc(const char *job_name,
const char *exit_file) {
char **argv = stringlist_alloc_char_ref(arguments);
if (!util_is_directory(run_path))
return NULL;
return nullptr;

auto node = new job_queue_node_type;

/* The data initialized in this block should *NEVER* change. */
std::string path = job_name;
std::string basename = path.substr(path.find_last_of("/\\") + 1);
node->job_name = util_alloc_string_copy(basename.data());

node->run_path = util_alloc_realpath(run_path);

node->run_cmd = util_alloc_string_copy(run_cmd);
node->argc = argc;
node->argv = util_alloc_stringlist_copy(
Expand All @@ -183,22 +172,12 @@ job_queue_node_type *job_queue_node_alloc(const char *job_name,

if (status_file)
node->status_file =
util_alloc_filename(node->run_path, status_file, NULL);
else
node->status_file = NULL;
util_alloc_filename(node->run_path, status_file, nullptr);

if (exit_file)
node->exit_file = util_alloc_filename(node->run_path, exit_file, NULL);
else
node->exit_file = NULL;

node->job_status = JOB_QUEUE_NOT_ACTIVE;
node->queue_index = INVALID_QUEUE_INDEX;
node->submit_attempt = 0;
node->job_data = NULL; // assume allocation is run in single thread mode
node->sim_start = 0;
node->exit_file =
util_alloc_filename(node->run_path, exit_file, nullptr);

pthread_mutex_init(&node->data_mutex, NULL);
free(argv);
return node;
}
Expand Down
23 changes: 1 addition & 22 deletions src/clib/old_tests/job_queue/test_job_list.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
#include <stdbool.h>
#include <stdlib.h>

#include <ert/util/test_util.hpp>

#include <ert/job_queue/job_list.hpp>
#include <ert/job_queue/job_node.hpp>
#include <ert/util/test_util.hpp>

void test_create() {
job_list_type *list = job_list_alloc();
Expand All @@ -19,27 +15,10 @@ void test_add_job() {
job_list_add_job(list, node);
test_assert_int_equal(job_list_get_size(list), 1);
test_assert_int_equal(job_queue_node_get_queue_index(node), 0);

struct data_t {
job_list_type *list;
job_queue_node_type *node;
} data{list, node};

test_assert_util_abort(
"job_queue_node_set_queue_index",
[](void *data_) {
auto data = reinterpret_cast<data_t *>(data_);
job_list_add_job(data->list, data->node);
},
&data);

job_list_reset(list);
test_assert_int_equal(0, job_list_get_size(list));
job_list_free(list);
}

int main(int argc, char **argv) {
util_install_signals();
test_create();
test_add_job();
}
18 changes: 2 additions & 16 deletions src/clib/old_tests/job_queue/test_job_node.cpp
Original file line number Diff line number Diff line change
@@ -1,36 +1,22 @@
#include <cassert>
#include <stdlib.h>

#include <ert/job_queue/job_node.hpp>
#include <ert/util/test_util.hpp>

void test_create() {
job_queue_node_type *node = job_queue_node_alloc(
"name", "/tmp", "ls", 0, stringlist_alloc_new(), 1, NULL, NULL);
job_queue_node_free(node);
}

void call_get_queue_index(void *arg) {
auto node = static_cast<job_queue_node_type *>(arg);
job_queue_node_get_queue_index(node);
}

void test_queue_index() {
job_queue_node_type *node = job_queue_node_alloc(
"name", "/tmp", "ls", 0, stringlist_alloc_new(), 1, NULL, NULL);
test_assert_util_abort("job_queue_node_get_queue_index",
call_get_queue_index, node);
}

void test_path_does_not_exist() {
job_queue_node_type *node =
job_queue_node_alloc("name", "does-not-exist", "ls", 0,
stringlist_alloc_new(), 1, NULL, NULL);
test_assert_NULL(node);
assert(node == nullptr);
}

int main(int argc, char **argv) {
util_install_signals();
test_create();
test_queue_index();
test_path_does_not_exist();
}

0 comments on commit bfbd5f3

Please sign in to comment.