diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index eb511a834..f003d7562 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -68,8 +68,6 @@ nng_sources( tcp.h thread.c thread.h - timer.c - timer.h url.c url.h ) diff --git a/src/core/aio.c b/src/core/aio.c index e849b33dc..3d4a56c19 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -115,7 +115,7 @@ void nni_aio_fini(nni_aio *aio) { nni_aio_cancel_fn fn; - void *arg; + void *arg; nni_aio_expire_q *eq = aio->a_expire_q; // This is like aio_close, but we don't want to dispatch @@ -247,7 +247,21 @@ nni_aio_close(nni_aio *aio) void nni_aio_set_timeout(nni_aio *aio, nni_duration when) { - aio->a_timeout = when; + aio->a_timeout = when; + aio->a_use_expire = false; +} + +void +nni_aio_set_expire(nni_aio *aio, nni_time expire) +{ + aio->a_expire = expire; + aio->a_use_expire = true; +} + +nng_duration +nni_aio_get_timeout(nni_aio *aio) +{ + return (aio->a_timeout); } void @@ -369,7 +383,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) { nni_aio_expire_q *eq = aio->a_expire_q; - if (!aio->a_sleep) { + if ((!aio->a_sleep) && (!aio->a_use_expire)) { // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { case NNG_DURATION_ZERO: @@ -411,7 +425,7 @@ void nni_aio_abort(nni_aio *aio, int rv) { nni_aio_cancel_fn fn; - void *arg; + void *arg; nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); @@ -447,8 +461,9 @@ nni_aio_finish_impl( aio->a_msg = msg; } - aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; + aio->a_expire = NNI_TIME_NEVER; + aio->a_sleep = false; + aio->a_use_expire = false; nni_mtx_unlock(&eq->eq_mtx); if (sync) { @@ -518,13 +533,14 @@ nni_aio_completions_init(nni_aio_completions *clp) } void -nni_aio_completions_add(nni_aio_completions *clp, nni_aio *aio, int result, size_t count) +nni_aio_completions_add( + nni_aio_completions *clp, nni_aio *aio, int result, size_t count) { NNI_ASSERT(!nni_aio_list_active(aio)); aio->a_reap_node.rn_next = *clp; - aio->a_result = result; - aio->a_count = count; - *clp = aio; + aio->a_result = result; + aio->a_count = count; + *clp = aio; } void @@ -532,10 +548,10 @@ nni_aio_completions_run(nni_aio_completions *clp) { nni_aio *aio; nni_aio *cl = *clp; - *clp = NULL; + *clp = NULL; while ((aio = cl) != NULL) { - cl = (void *)aio->a_reap_node.rn_next; + cl = (void *) aio->a_reap_node.rn_next; aio->a_reap_node.rn_next = NULL; nni_aio_finish_sync(aio, aio->a_result, aio->a_count); } diff --git a/src/core/aio.h b/src/core/aio.h index a2ebf70a9..cae8610f8 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -149,10 +149,12 @@ extern size_t nni_aio_iov_count(nni_aio *); extern int nni_aio_set_iov(nni_aio *, unsigned, const nni_iov *); -extern void nni_aio_set_timeout(nni_aio *, nng_duration); -extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **); -extern void nni_aio_normalize_timeout(nni_aio *, nng_duration); -extern void nni_aio_bump_count(nni_aio *, size_t); +extern void nni_aio_set_timeout(nni_aio *, nng_duration); +extern void nni_aio_set_expire(nni_aio *, nni_time); +extern nng_duration nni_aio_get_timeout(nni_aio *); +extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **); +extern void nni_aio_normalize_timeout(nni_aio *, nng_duration); +extern void nni_aio_bump_count(nni_aio *, size_t); // nni_aio_schedule indicates that the AIO has begun, and is scheduled for // asynchronous completion. This also starts the expiration timer. Note that @@ -187,8 +189,8 @@ extern void nni_aio_completions_run(nni_aio_completions *); // nni_aio_completions_add adds an aio (with the result code and length as // appropriate) to the completion list. This should be done while the // appropriate lock is held. The aio must not be scheduled. -extern void nni_aio_completions_add(nni_aio_completions *, nni_aio *, - int, size_t); +extern void nni_aio_completions_add( + nni_aio_completions *, nni_aio *, int, size_t); extern int nni_aio_sys_init(void); extern void nni_aio_sys_fini(void); @@ -202,14 +204,15 @@ typedef struct nni_aio_expire_q nni_aio_expire_q; // any of these members -- the definition is provided here to facilitate // inlining, but that should be the only use. struct nng_aio { - size_t a_count; // Bytes transferred (I/O only) - nni_time a_expire; // Absolute timeout - nni_duration a_timeout; // Relative timeout - int a_result; // Result code (nng_errno) - bool a_stop; // Shutting down (no new operations) - bool a_sleep; // Sleeping with no action - bool a_expire_ok; // Expire from sleep is ok - bool a_expiring; // Expiration in progress + size_t a_count; // Bytes transferred (I/O only) + nni_time a_expire; // Absolute timeout + nni_duration a_timeout; // Relative timeout + int a_result; // Result code (nng_errno) + bool a_stop; // Shutting down (no new operations) + bool a_sleep; // Sleeping with no action + bool a_expire_ok; // Expire from sleep is ok + bool a_expiring; // Expiration in progress + bool a_use_expire; // Use expire instead of timeout nni_task a_task; // Read/write operations. @@ -227,8 +230,8 @@ struct nng_aio { // Provider-use fields. nni_aio_cancel_fn a_cancel_fn; - void *a_cancel_arg; - void *a_prov_data; + void *a_cancel_arg; + void *a_prov_data; nni_list_node a_prov_node; // Linkage on provider list. nni_aio_expire_q *a_expire_q; nni_list_node a_expire_node; // Expiration node diff --git a/src/core/init.c b/src/core/init.c index f8ecb385e..f2195bcb4 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -34,7 +34,6 @@ nni_init_helper(void) if (((rv = nni_taskq_sys_init()) != 0) || ((rv = nni_reap_sys_init()) != 0) || - ((rv = nni_timer_sys_init()) != 0) || ((rv = nni_aio_sys_init()) != 0) || ((rv = nni_tls_sys_init()) != 0)) { nni_fini(); @@ -65,7 +64,6 @@ nni_fini(void) nni_tls_sys_fini(); nni_reap_drain(); nni_aio_sys_fini(); - nni_timer_sys_fini(); nni_taskq_sys_fini(); nni_reap_sys_fini(); // must be before timer and aio (expire) nni_id_map_sys_fini(); diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 15db8d160..0eceaf0bd 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -1,5 +1,5 @@ // -// Copyright 2021 Garrett D'Amore +// Copyright 2023 Staysail Systems, Inc. // Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -45,7 +45,6 @@ #include "core/strs.h" #include "core/taskq.h" #include "core/thread.h" -#include "core/timer.h" #include "core/url.h" // transport needs to come after url diff --git a/src/core/timer.c b/src/core/timer.c deleted file mode 100644 index 360248175..000000000 --- a/src/core/timer.c +++ /dev/null @@ -1,177 +0,0 @@ -// -// Copyright 2020 Staysail Systems, Inc. -// Copyright 2018 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#include -#include - -static void nni_timer_loop(void *); - -// XXX: replace this timer list with a minHeap based priority queue. -struct nni_timer { - nni_mtx t_mx; - nni_cv t_wait_cv; - nni_cv t_sched_cv; - nni_list t_entries; - nni_thr t_thr; - int t_run; - int t_waiting; - nni_timer_node *t_active; // Must never ever be dereferenced! -}; - -typedef struct nni_timer nni_timer; - -static nni_timer nni_global_timer; - -int -nni_timer_sys_init(void) -{ - int rv; - nni_timer *timer = &nni_global_timer; - - memset(timer, 0, sizeof(*timer)); - NNI_LIST_INIT(&timer->t_entries, nni_timer_node, t_node); - - nni_mtx_init(&timer->t_mx); - nni_cv_init(&timer->t_sched_cv, &timer->t_mx); - nni_cv_init(&timer->t_wait_cv, &timer->t_mx); - - if ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0) { - nni_timer_sys_fini(); - return (rv); - } - timer->t_run = 1; - nni_thr_run(&timer->t_thr); - return (0); -} - -void -nni_timer_sys_fini(void) -{ - nni_timer *timer = &nni_global_timer; - - if (timer->t_run) { - nni_mtx_lock(&timer->t_mx); - timer->t_run = 0; - nni_cv_wake(&timer->t_sched_cv); - nni_mtx_unlock(&timer->t_mx); - } - - nni_thr_fini(&timer->t_thr); - nni_cv_fini(&timer->t_wait_cv); - nni_cv_fini(&timer->t_sched_cv); - nni_mtx_fini(&timer->t_mx); -} - -void -nni_timer_init(nni_timer_node *node, nni_cb cb, void *arg) -{ - node->t_cb = cb; - node->t_arg = arg; -} - -void -nni_timer_fini(nni_timer_node *node) -{ - NNI_ARG_UNUSED(node); -} - -void -nni_timer_cancel(nni_timer_node *node) -{ - nni_timer *timer = &nni_global_timer; - - nni_mtx_lock(&timer->t_mx); - while (timer->t_active == node) { - timer->t_waiting = 1; - nni_cv_wait(&timer->t_wait_cv); - } - if (nni_list_active(&timer->t_entries, node)) { - nni_list_remove(&timer->t_entries, node); - } - nni_mtx_unlock(&timer->t_mx); -} - -void -nni_timer_schedule(nni_timer_node *node, nni_time when) -{ - nni_timer *timer = &nni_global_timer; - - nni_mtx_lock(&timer->t_mx); - node->t_expire = when; - - if (nni_list_active(&timer->t_entries, node)) { - nni_list_remove(&timer->t_entries, node); - } - - if (when != NNI_TIME_NEVER) { - nni_timer_node *srch = nni_list_first(&timer->t_entries); - while ((srch != NULL) && (srch->t_expire < node->t_expire)) { - srch = nni_list_next(&timer->t_entries, srch); - } - if (srch != NULL) { - nni_list_insert_before(&timer->t_entries, node, srch); - } else { - nni_list_append(&timer->t_entries, node); - } - if (nni_list_first(&timer->t_entries) == node) { - nni_cv_wake1(&timer->t_sched_cv); - } - } - nni_mtx_unlock(&timer->t_mx); -} - -static void -nni_timer_loop(void *arg) -{ - nni_timer * timer = arg; - nni_time now; - nni_timer_node *node; - - nni_thr_set_name(NULL, "nng:timer"); - - for (;;) { - nni_mtx_lock(&timer->t_mx); - timer->t_active = NULL; - if (timer->t_waiting) { - timer->t_waiting = 0; - nni_cv_wake(&timer->t_wait_cv); - } - if (!timer->t_run) { - nni_mtx_unlock(&timer->t_mx); - break; - } - - now = nni_clock(); - if ((node = nni_list_first(&timer->t_entries)) == NULL) { - nni_cv_wait(&timer->t_sched_cv); - nni_mtx_unlock(&timer->t_mx); - continue; - } - if (now < node->t_expire) { - // End of run, we have to wait for next. - nni_cv_until(&timer->t_sched_cv, node->t_expire); - nni_mtx_unlock(&timer->t_mx); - continue; - } - - nni_list_remove(&timer->t_entries, node); - - // Save the active node. Note that the timer callback can - // free this memory or do something else with it, so it is - // important that we never dereference this pointer, but - // just compare the value of the pointer itself. - timer->t_active = node; - nni_mtx_unlock(&timer->t_mx); - - node->t_cb(node->t_arg); - } -} diff --git a/src/core/timer.h b/src/core/timer.h deleted file mode 100644 index a8108d5f2..000000000 --- a/src/core/timer.h +++ /dev/null @@ -1,34 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#ifndef CORE_TIMER_H -#define CORE_TIMER_H - -#include "core/defs.h" -#include "core/list.h" - -// For the sake of simplicity, we just maintain a single global timer thread. - -struct nni_timer_node { - nni_time t_expire; - nni_cb t_cb; - void * t_arg; - nni_list_node t_node; -}; - -typedef struct nni_timer_node nni_timer_node; - -extern void nni_timer_init(nni_timer_node *, nni_cb, void *); -extern void nni_timer_fini(nni_timer_node *); -extern void nni_timer_schedule(nni_timer_node *, nni_time); -extern void nni_timer_cancel(nni_timer_node *); -extern int nni_timer_sys_init(void); -extern void nni_timer_sys_fini(void); - -#endif // CORE_TIMER_H diff --git a/src/sp/protocol/survey0/survey.c b/src/sp/protocol/survey0/survey.c index 5c52d8f8e..18074016e 100644 --- a/src/sp/protocol/survey0/survey.c +++ b/src/sp/protocol/survey0/survey.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -25,17 +25,15 @@ typedef struct surv0_ctx surv0_ctx; static void surv0_pipe_send_cb(void *); static void surv0_pipe_recv_cb(void *); -static void surv0_ctx_timeout(void *); struct surv0_ctx { surv0_sock * sock; uint32_t survey_id; // survey id - nni_timer_node timer; - nni_time expire; nni_lmq recv_lmq; nni_list recv_queue; nni_atomic_int recv_buf; nni_atomic_int survey_time; + nni_time expire; int err; }; @@ -99,7 +97,6 @@ surv0_ctx_fini(void *arg) surv0_ctx *ctx = arg; surv0_ctx_close(ctx); - nni_timer_cancel(&ctx->timer); nni_lmq_fini(&ctx->recv_lmq); } @@ -129,7 +126,6 @@ surv0_ctx_init(void *c, void *s) ctx->sock = sock; nni_lmq_init(&ctx->recv_lmq, len); - nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx); } static void @@ -155,17 +151,28 @@ surv0_ctx_recv(void *arg, nni_aio *aio) surv0_ctx * ctx = arg; surv0_sock *sock = ctx->sock; nni_msg * msg; + nni_time now; + nni_duration timeout; if (nni_aio_begin(aio) != 0) { return; } + now = nni_clock(); + nni_mtx_lock(&sock->mtx); - if (ctx->survey_id == 0) { + if ((ctx->survey_id == 0) || (now >= ctx->expire)) { nni_mtx_unlock(&sock->mtx); nni_aio_finish_error(aio, NNG_ESTATE); return; } + + timeout = nni_aio_get_timeout(aio); + if ((timeout < 1) || ((now + timeout) > ctx->expire)) { + // limit the timeout to the survey time + nni_aio_set_expire(aio, ctx->expire); + } + again: if (nni_lmq_get(&ctx->recv_lmq, &msg) != 0) { int rv; @@ -190,23 +197,6 @@ surv0_ctx_recv(void *arg, nni_aio *aio) nni_aio_finish_msg(aio, msg); } -void -surv0_ctx_timeout(void *arg) -{ - surv0_ctx * ctx = arg; - surv0_sock *sock = ctx->sock; - - nni_mtx_lock(&sock->mtx); - if (nni_clock() < ctx->expire) { - nni_mtx_unlock(&sock->mtx); - return; - } - - // Abort any pending receives. - surv0_ctx_abort(ctx, NNG_ETIMEDOUT); - nni_mtx_unlock(&sock->mtx); -} - static void surv0_ctx_send(void *arg, nni_aio *aio) { @@ -215,7 +205,6 @@ surv0_ctx_send(void *arg, nni_aio *aio) surv0_pipe * pipe; nni_msg * msg = nni_aio_get_msg(aio); size_t len = nni_msg_len(msg); - nni_time now = nni_clock(); nng_duration survey_time; int rv; @@ -229,7 +218,6 @@ surv0_ctx_send(void *arg, nni_aio *aio) // Abort everything outstanding. surv0_ctx_abort(ctx, NNG_ECANCELED); - nni_timer_cancel(&ctx->timer); // Allocate the new ID. if ((rv = nni_id_alloc(&sock->surveys, &ctx->survey_id, ctx)) != 0) { @@ -258,8 +246,9 @@ surv0_ctx_send(void *arg, nni_aio *aio) } } - ctx->expire = now + survey_time; - nni_timer_schedule(&ctx->timer, ctx->expire); + // save the survey time, so we know the maximum timeout to use when + // waiting for receive + ctx->expire = nni_clock() + survey_time; nni_mtx_unlock(&sock->mtx); nni_msg_free(msg); diff --git a/src/sp/protocol/survey0/survey_test.c b/src/sp/protocol/survey0/survey_test.c index 1e1b8635c..94a7922a2 100644 --- a/src/sp/protocol/survey0/survey_test.c +++ b/src/sp/protocol/survey0/survey_test.c @@ -188,7 +188,7 @@ test_surv_cancel(void) // will be canceled before it gets to the peer.) NUTS_SLEEP(100); - // Send the next next request ("def"). Note that + // Send the next request ("def"). Note that // the RESP side server will have already buffered the receive // request, and should simply be waiting for us to reply to abc. NUTS_SEND(surv, "def"); @@ -246,7 +246,7 @@ test_surv_cancel_abort_recv(void) // Give time for this recv to post properly. NUTS_SLEEP(100); - // Send the next next request ("def"). Note that + // Send the next request ("def"). Note that // the respondent side server will have already buffered the receive // request, and should simply be waiting for us to reply to // abc.