Skip to content

Commit

Permalink
aio: significant rework
Browse files Browse the repository at this point in the history
The attempt to use nni_task_abort() was completely misguided.
In fact this function isn't needed, and is a relic of a design that
predates the nni_aio_begin / nni_aio_schedule split.

Additionally, nni_aio_abort needed a fix to prevent a hang if
it was called between the calls to nni_aio_prep and nni_aio_schedule.
(Essentially a canceled operation should fail in scheduling.)
  • Loading branch information
gdamore committed Dec 4, 2024
1 parent a94acc7 commit 80d2283
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 96 deletions.
94 changes: 32 additions & 62 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,32 +89,14 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_timeout = NNG_DURATION_INFINITE;
aio->a_expire_q =
nni_aio_expire_q_list[nni_random() % nni_aio_expire_q_cnt];
#ifndef NDEBUG
nni_atomic_init_bool(&aio->a_started);
#endif
nni_atomic_init_bool(&aio->a_stopped);
aio->a_init = true;
}

#ifndef NDEBUG
// Debug-only helper: true once nni_aio_begin has marked the aio as
// started (a_started is set there).  Used only from NNI_ASSERT checks.
static bool
aio_started(nni_aio *aio)
{
return (nni_atomic_get_bool(&aio->a_started));
}
#endif

// True once the aio has been permanently stopped; a_stopped is set by
// nni_aio_stop (and on the canceled path of nni_aio_begin).  Atomic
// read, so it may be checked without holding the expire-queue lock.
static bool
aio_stopped(nni_aio *aio)
{
return (nni_atomic_get_bool(&aio->a_stopped));
}

// Release the aio's underlying task resources.  Only aio objects that
// completed nni_aio_init (a_init set) are torn down; NULL is a no-op.
void
nni_aio_fini(nni_aio *aio)
{
if (aio != NULL && aio->a_init) {
// NOTE(review): the two assertions below look like the old and
// new variants from a diff rendering of this function; only one
// form is expected in the actual source file.
NNI_ASSERT(aio_stopped(aio) || !aio_started(aio));
NNI_ASSERT(!nni_aio_busy(aio));
nni_task_fini(&aio->a_task);
}
}
Expand Down Expand Up @@ -178,7 +160,7 @@ nni_aio_set_iov(nni_aio *aio, unsigned nio, const nni_iov *iov)
void
nni_aio_stop(nni_aio *aio)
{
if (aio != NULL && aio->a_init && !aio_stopped(aio)) {
if (aio != NULL && aio->a_init) {
nni_aio_cancel_fn fn;
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
Expand All @@ -194,20 +176,17 @@ nni_aio_stop(nni_aio *aio)

if (fn != NULL) {
fn(aio, arg, NNG_ECANCELED);
} else {
nni_task_abort(&aio->a_task);
}

nni_aio_wait(aio);

nni_atomic_set_bool(&aio->a_stopped, true);
NNI_ASSERT(!nni_aio_busy(aio));
}
}

void
nni_aio_close(nni_aio *aio)
{
if (aio != NULL && aio->a_init && !aio_stopped(aio)) {
if (aio != NULL && aio->a_init) {
nni_aio_cancel_fn fn;
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
Expand All @@ -223,8 +202,6 @@ nni_aio_close(nni_aio *aio)

if (fn != NULL) {
fn(aio, arg, NNG_ECLOSED);
} else {
nni_task_abort(&aio->a_task);
}
}
}
Expand Down Expand Up @@ -310,7 +287,7 @@ nni_aio_count(nni_aio *aio)
void
nni_aio_wait(nni_aio *aio)
{
if (aio != NULL && aio->a_init && !aio_stopped(aio)) {
if (aio != NULL && aio->a_init) {
nni_task_wait(&aio->a_task);
}
}
Expand All @@ -324,35 +301,21 @@ nni_aio_busy(nni_aio *aio)
int
nni_aio_begin(nni_aio *aio)
{
// If any of these triggers then the caller has a defect because
// it means that the aio is already in use. This is always
// a bug in the caller. These checks are not technically thread
safe in the event that they are false. Users of race detectors
may wish to ignore or suppress these checks.
nni_aio_expire_q *eq = aio->a_expire_q;

if (aio_stopped(aio)) {
NNI_ASSERT(!nni_list_node_active(&aio->a_expire_node));
aio->a_result = NNG_ECANCELED;
aio->a_cancel_fn = NULL;
return (NNG_ECANCELED);
}
#ifndef NDEBUG
nni_atomic_set_bool(&aio->a_started, true);
#endif
nni_mtx_lock(&eq->eq_mtx);
NNI_ASSERT(!nni_aio_list_active(aio));
NNI_ASSERT(aio->a_cancel_fn == NULL);
NNI_ASSERT(!nni_list_node_active(&aio->a_expire_node));

// Some initialization can be done outside the lock, because
// we must have exclusive access to the aio.
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
aio->a_result = 0;
aio->a_count = 0;
aio->a_cancel_fn = NULL;
aio->a_result = 0;
aio->a_count = 0;
aio->a_cancel_fn = NULL;
aio->a_abort = false;
aio->a_abort_result = 0;

// We should not reschedule anything at this point.
if (aio->a_stop) {
Expand All @@ -361,7 +324,6 @@ nni_aio_begin(nni_aio *aio)
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
aio->a_expire_ok = false;
nni_atomic_set_bool(&aio->a_stopped, true);
nni_mtx_unlock(&eq->eq_mtx);

return (NNG_ECANCELED);
Expand All @@ -380,7 +342,6 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
nni_task_abort(&aio->a_task);
return (NNG_ETIMEDOUT);
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
Expand All @@ -393,11 +354,18 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}

nni_mtx_lock(&eq->eq_mtx);
NNI_ASSERT(nni_task_busy(&aio->a_task));
if (aio->a_stop) {
nni_task_abort(&aio->a_task);
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ECLOSED);
}
if (aio->a_abort) {
int rv = aio->a_abort_result;
aio->a_abort = false;
aio->a_abort_result = 0;
nni_mtx_unlock(&eq->eq_mtx);
return (rv);
}

NNI_ASSERT(aio->a_cancel_fn == NULL);
aio->a_cancel_fn = cancel;
Expand All @@ -421,19 +389,26 @@ nni_aio_abort(nni_aio *aio, int rv)
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;

NNI_ASSERT(rv > 0);
nni_mtx_lock(&eq->eq_mtx);
nni_aio_expire_rm(aio);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
aio->a_cancel_arg = NULL;
if (fn != NULL) {
aio->a_abort = true;
aio->a_abort_result = rv;
}
nni_mtx_unlock(&eq->eq_mtx);

// Stop any I/O at the provider level.
// If this doesn't catch it, it will be reported
// at nni_aio_schedule (if this is to be scheduled),
// or else we have proceeded too far to cancel this operation.
// (In which case it should complete shortly.)
if (fn != NULL) {
fn(aio, arg, rv);
} else {
nni_task_abort(&aio->a_task);
}
}

Expand All @@ -445,13 +420,6 @@ nni_aio_finish_impl(
{
nni_aio_expire_q *eq = aio->a_expire_q;

if (eq == NULL) {
// This happens if we have stopped I/O
// and some caller has not noticed yet.
// (The caller should own the aio.)
return;
}

nni_mtx_lock(&eq->eq_mtx);

nni_aio_expire_rm(aio);
Expand All @@ -463,9 +431,11 @@ nni_aio_finish_impl(
aio->a_msg = msg;
}

aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
aio->a_use_expire = false;
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
aio->a_use_expire = false;
aio->a_abort = false;
aio->a_abort_result = 0;
nni_mtx_unlock(&eq->eq_mtx);

if (sync) {
Expand Down
27 changes: 12 additions & 15 deletions src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,18 @@ typedef struct nni_aio_expire_q nni_aio_expire_q;
// any of these members -- the definition is provided here to facilitate
// inlining, but that should be the only use.
struct nng_aio {
size_t a_count; // Bytes transferred (I/O only)
nni_time a_expire; // Absolute timeout
nni_duration a_timeout; // Relative timeout
int a_result; // Result code (nng_errno)
bool a_stop; // Shutting down (no new operations)
bool a_sleep; // Sleeping with no action
bool a_expire_ok; // Expire from sleep is ok
bool a_expiring; // Expiration in progress
bool a_use_expire; // Use expire instead of timeout
bool a_init; // This is initialized
size_t a_count; // Bytes transferred (I/O only)
nni_time a_expire; // Absolute timeout
nni_duration a_timeout; // Relative timeout
int a_result; // Result code (nng_errno)
int a_abort_result; // Reason for abort (result code)
bool a_stop; // Shutting down (no new operations)
bool a_abort; // Aborted (after begin, before schedule)
bool a_sleep; // Sleeping with no action
bool a_expire_ok; // Expire from sleep is ok
bool a_expiring; // Expiration in progress
bool a_use_expire; // Use expire instead of timeout
bool a_init; // This is initialized

nni_task a_task;

Expand All @@ -236,11 +238,6 @@ struct nng_aio {
void *a_inputs[4];
void *a_outputs[4];

#ifndef NDEBUG
nni_atomic_bool a_started; // Used only in NNI_ASSERT
#endif

nni_atomic_bool a_stopped;
nni_aio_expire_q *a_expire_q;
nni_list_node a_expire_node; // Expiration node
nni_reap_node a_reap_node;
Expand Down
24 changes: 10 additions & 14 deletions src/core/taskq.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ nni_taskq_thread(void *self)
for (;;) {
if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {

NNI_ASSERT(!task->task_dead);
nni_list_remove(&tq->tq_tasks, task);

nni_mtx_unlock(&tq->tq_mtx);
Expand All @@ -50,6 +51,7 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&task->task_mtx);
task->task_busy--;
if (task->task_busy == 0) {
NNI_ASSERT(!task->task_prep);
nni_cv_wake(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
Expand Down Expand Up @@ -131,6 +133,8 @@ void
nni_task_exec(nni_task *task)
{
nni_mtx_lock(&task->task_mtx);
NNI_ASSERT(!task->task_dead);
NNI_ASSERT(task->task_busy > 0);
if (task->task_prep) {
task->task_prep = false;
} else {
Expand All @@ -155,13 +159,15 @@ nni_task_dispatch(nni_task *task)
{
nni_taskq *tq = task->task_tq;

NNI_ASSERT(!task->task_dead);
// If there is no callback to perform, then do nothing!
// The user will be none the wiser.
if (task->task_cb == NULL) {
nni_task_exec(task);
return;
}
nni_mtx_lock(&task->task_mtx);
NNI_ASSERT(task->task_busy > 0);
if (task->task_prep) {
task->task_prep = false;
} else {
Expand All @@ -179,29 +185,17 @@ void
nni_task_prep(nni_task *task)
{
// Mark the task as "prepped": bump the busy count so that waiters in
// nni_task_wait see the operation as outstanding before it is
// actually dispatched or executed.
nni_mtx_lock(&task->task_mtx);
// Must not prep a task that nni_task_fini has already marked dead.
NNI_ASSERT(!task->task_dead);
task->task_busy++;
task->task_prep = true;
nni_mtx_unlock(&task->task_mtx);
}

// nni_task_abort undoes the effect of nni_task_prep: if the task is
// still in the prepped state it clears the flag, drops the busy count,
// and wakes any waiter blocked in nni_task_wait once the count reaches
// zero.  If the prep was already consumed (the task was dispatched or
// executed), this is a no-op.
void
nni_task_abort(nni_task *task)
{
// This is called when unscheduling the task.
nni_mtx_lock(&task->task_mtx);
if (task->task_prep) {
task->task_prep = false;
task->task_busy--;
if (task->task_busy == 0) {
nni_cv_wake(&task->task_cv);
}
}
nni_mtx_unlock(&task->task_mtx);
}
void
nni_task_wait(nni_task *task)
{
nni_mtx_lock(&task->task_mtx);
NNI_ASSERT(!task->task_dead);
while (task->task_busy) {
nni_cv_wait(&task->task_cv);
}
Expand Down Expand Up @@ -234,6 +228,8 @@ nni_task_init(nni_task *task, nni_taskq *tq, nni_cb cb, void *arg)
// Destroy the task's synchronization resources.  The caller must
// guarantee the task is no longer in use (not queued, not busy).
void
nni_task_fini(nni_task *task)
{
// Mark dead so that later assertions catch any use-after-fini.
task->task_dead = true;
// The task must not still be linked on a taskq run list.
NNI_ASSERT(!nni_list_node_active(&task->task_node));
nni_cv_fini(&task->task_cv);
nni_mtx_fini(&task->task_mtx);
}
Expand Down
7 changes: 2 additions & 5 deletions src/core/taskq.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ extern void nni_task_exec(nni_task *);
// nni_task_exec).
extern void nni_task_prep(nni_task *);

// nni_task_abort is called to undo the effect of nni_task_prep,
// basically. The aio framework uses this when nni_aio_schedule()
// returns an error.
extern void nni_task_abort(nni_task *);

// nni_task_busy checks to see if a task is still busy.
// This uses the same check that nni_task_wait uses.
extern bool nni_task_busy(nni_task *);
Expand All @@ -53,6 +48,7 @@ extern bool nni_task_busy(nni_task *);
// work is scheduled on the task then it will not return until that
// work (or any other work subsequently scheduled) is complete.
extern void nni_task_wait(nni_task *);
extern void nni_task_stop(nni_task *);
extern void nni_task_init(nni_task *, nni_taskq *, nni_cb, void *);

// nni_task_fini destroys the task. It will reap resources asynchronously
Expand All @@ -74,6 +70,7 @@ struct nni_task {
nni_taskq *task_tq;
unsigned task_busy;
bool task_prep;
bool task_dead;
nni_mtx task_mtx;
nni_cv task_cv;
};
Expand Down

0 comments on commit 80d2283

Please sign in to comment.