diff --git a/docs/man/nng_req.7.adoc b/docs/man/nng_req.7.adoc index 287650832..64426846d 100644 --- a/docs/man/nng_req.7.adoc +++ b/docs/man/nng_req.7.adoc @@ -1,6 +1,6 @@ = nng_req(7) // -// Copyright 2018 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This document is supplied under the terms of the MIT License, a @@ -100,10 +100,27 @@ The following protocol-specific option is available. (xref:nng_duration.5.adoc[`nng_duration`]) When a new request is started, a timer of this duration is also started. If no reply is received before this timer expires, then the request will - be resent. - (Requests are also automatically resent if the peer to whom - the original request was sent disconnects, or if a peer becomes available - while the requester is waiting for an available peer.) + be resent. + ++ +(Requests are also automatically resent if the peer to whom +the original request was sent disconnects, or if a peer becomes available +while the requester is waiting for an available peer.) + ++ +Resending may be deferred up to the value of the `NNG_OPT_RESENDTICK` parameter. + +((`NNG_OPT_REQ_RESENDTICK`)):: + + (xref:nng_duration.5.adoc[`nng_duration`]) + This is the granularity of the clock that is used to check for resending. + The default is a second. Setting this to a higher rate will allow for + more timely resending to occur, but may incur significant additional + overhead when the socket has many outstanding requests (contexts). + ++ +When there are no requests outstanding that have a resend set, then +the clock does not tick at all. + ++ +This option is shared for all contexts on a socket, and is only available for the socket itself. + === Protocol Headers diff --git a/include/nng/protocol/reqrep0/req.h b/include/nng/protocol/reqrep0/req.h index 3ed802165..0c9fde3ff 100644 --- a/include/nng/protocol/reqrep0/req.h +++ b/include/nng/protocol/reqrep0/req.h @@ -31,6 +31,7 @@ NNG_DECL int nng_req0_open_raw(nng_socket *); #define NNG_REQ0_PEER_NAME "rep" #define NNG_OPT_REQ_RESENDTIME "req:resend-time" +#define NNG_OPT_REQ_RESENDTICK "req:resend-tick" #ifdef __cplusplus } diff --git a/src/sp/protocol/reqrep0/req.c b/src/sp/protocol/reqrep0/req.c index fdac7d5cd..c388a5126 100644 --- a/src/sp/protocol/reqrep0/req.c +++ b/src/sp/protocol/reqrep0/req.c @@ -21,34 +21,36 @@ typedef struct req0_ctx req0_ctx; static void req0_run_send_queue(req0_sock *, nni_aio_completions *); static void req0_ctx_reset(req0_ctx *); -static void req0_ctx_timeout(void *); static void req0_pipe_fini(void *); static void req0_ctx_fini(void *); static void req0_ctx_init(void *, void *); +static void req0_retry_cb(void *); // A req0_ctx is a "context" for the request. It uses most of the // socket, but keeps track of its own outstanding replays, the request ID, // and so forth. struct req0_ctx { - req0_sock *sock; - nni_list_node sock_node; // node on the socket context list - nni_list_node send_node; // node on the send_queue - nni_list_node pipe_node; // node on the pipe list - uint32_t request_id; // request ID, without high bit set - nni_aio *recv_aio; // user aio waiting to recv - only one! - nni_aio *send_aio; // user aio waiting to send - nng_msg *req_msg; // request message (owned by protocol) - size_t req_len; // length of request message (for stats) - nng_msg *rep_msg; // reply message - nni_timer_node timer; - nni_duration retry; - bool conn_reset; // sent message w/o retry, peer disconnect + req0_sock *sock; + nni_list_node sock_node; // node on the socket context list + nni_list_node send_node; // node on the send_queue + nni_list_node pipe_node; // node on the pipe list + nni_list_node retry_node; // node on the socket retry list + uint32_t request_id; // request ID, without high bit set + nni_aio *recv_aio; // user aio waiting to recv - only one! + nni_aio *send_aio; // user aio waiting to send + nng_msg *req_msg; // request message (owned by protocol) + size_t req_len; // length of request message (for stats) + nng_msg *rep_msg; // reply message + nni_duration retry; + nni_time retry_time; // retry after this expires + bool conn_reset; // sent message w/o retry, peer disconnect }; // A req0_sock is our per-socket protocol private structure. struct req0_sock { nni_duration retry; bool closed; + bool retry_active; // true if retry aio running nni_atomic_int ttl; req0_ctx master; // base socket master nni_list ready_pipes; @@ -56,9 +58,12 @@ struct req0_sock { nni_list stop_pipes; nni_list contexts; nni_list send_queue; // contexts waiting to send. - nni_id_map requests; // contexts by request ID + nni_list retry_queue; + nni_aio retry_aio; // retry timer + nni_id_map requests; // contexts by request ID nni_pollable readable; nni_pollable writable; + nni_duration retry_tick; // clock interval for retry timer nni_mtx mtx; }; @@ -95,16 +100,20 @@ req0_sock_init(void *arg, nni_sock *sock) NNI_LIST_INIT(&s->busy_pipes, req0_pipe, node); NNI_LIST_INIT(&s->stop_pipes, req0_pipe, node); NNI_LIST_INIT(&s->send_queue, req0_ctx, send_node); + NNI_LIST_INIT(&s->retry_queue, req0_ctx, retry_node); NNI_LIST_INIT(&s->contexts, req0_ctx, sock_node); // this is "semi random" start for request IDs. - s->retry = NNI_SECOND * 60; + s->retry = NNI_SECOND * 60; + s->retry_tick = NNI_SECOND; // how often we check for retries req0_ctx_init(&s->master, s); nni_pollable_init(&s->writable); nni_pollable_init(&s->readable); + nni_aio_init(&s->retry_aio, req0_retry_cb, s); + nni_atomic_init(&s->ttl); nni_atomic_set(&s->ttl, 8); } @@ -130,6 +139,7 @@ req0_sock_fini(void *arg) { req0_sock *s = arg; + nni_aio_stop(&s->retry_aio); nni_mtx_lock(&s->mtx); NNI_ASSERT(nni_list_empty(&s->busy_pipes)); NNI_ASSERT(nni_list_empty(&s->stop_pipes)); @@ -140,6 +150,7 @@ req0_sock_fini(void *arg) nni_pollable_fini(&s->readable); nni_pollable_fini(&s->writable); nni_id_map_fini(&s->requests); + nni_aio_fini(&s->retry_aio); nni_mtx_fini(&s->mtx); } @@ -236,12 +247,9 @@ req0_pipe_close(void *arg) ctx->conn_reset = true; } } else { - // Reset the timer on this so it expires immediately. - // This is actually easier than canceling the timer and - // running the send_queue separately. (In particular, - // it avoids a potential deadlock on cancelling the - // timer.) - nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO); + // Reset the retry time to make it expire immediately. + // The timer should already be running. + ctx->retry_time = nni_clock(); } } nni_mtx_unlock(&s->mtx); @@ -363,16 +371,41 @@ req0_recv_cb(void *arg) } static void -req0_ctx_timeout(void *arg) +req0_retry_cb(void *arg) { - req0_ctx *ctx = arg; - req0_sock *s = ctx->sock; - + req0_sock *s = arg; + req0_ctx *ctx; + nni_time now; + bool reschedule = false; + + // The design of this is that retries are infrequent, because + // we should normally be succeeding. We also hope that we are not + // executing this linear scan of all requests too often, once + // per clock tick is all we want. + now = nni_clock(); nni_mtx_lock(&s->mtx); - if ((ctx->req_msg != NULL) && (!s->closed)) { + if (s->closed || (nni_aio_result(&s->retry_aio) != 0)) { + nni_mtx_unlock(&s->mtx); + return; + } + + NNI_LIST_FOREACH (&s->retry_queue, ctx) { + if (ctx->retry_time > now || (ctx->req_msg == NULL)) { + continue; + } if (!nni_list_node_active(&ctx->send_node)) { nni_list_append(&s->send_queue, ctx); } + reschedule = true; + } + if (!nni_list_empty(&s->retry_queue)) { + // if there are still jobs in the queue waiting to be + // retried, do them. + nni_sleep_aio(s->retry_tick, &s->retry_aio); + } else { + s->retry_active = false; + } + if (reschedule) { req0_run_send_queue(s, NULL); } nni_mtx_unlock(&s->mtx); @@ -384,8 +417,6 @@ req0_ctx_init(void *arg, void *sock) req0_sock *s = sock; req0_ctx *ctx = arg; - nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx); - nni_mtx_lock(&s->mtx); ctx->sock = s; ctx->recv_aio = NULL; @@ -415,9 +446,6 @@ req0_ctx_fini(void *arg) req0_ctx_reset(ctx); nni_list_remove(&s->contexts, ctx); nni_mtx_unlock(&s->mtx); - - nni_timer_cancel(&ctx->timer); - nni_timer_fini(&ctx->timer); } static int @@ -448,20 +476,20 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list) return; } - // We have a place to send it, so do the send. + // We have a place to send it, so send it. // If a sending error occurs that causes the message to // be dropped, we rely on the resend timer to pick it up. // We also notify the completion callback if this is the // first send attempt. nni_list_remove(&s->send_queue, ctx); - // Schedule a resubmit timer. We only do this if we got + // Schedule a retry. We only do this if we got // a pipe to send to. Otherwise, we should get handled // the next time that the send_queue is run. We don't do this // if the retry is "disabled" with NNG_DURATION_INFINITE. if (ctx->retry > 0) { - nni_timer_schedule( - &ctx->timer, nni_clock() + ctx->retry); + nni_list_node_remove(&ctx->retry_node); + nni_list_append(&s->retry_queue, ctx); } // Put us on the pipe list of active contexts. @@ -489,7 +517,7 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list) } // At this point, we will never give this message back to - // to the user, so we don't have to worry about making it + // the user, so we don't have to worry about making it // unique. We can freely clone it. nni_msg_clone(ctx->req_msg); nni_aio_set_msg(&p->aio_send, ctx->req_msg); @@ -503,16 +531,7 @@ req0_ctx_reset(req0_ctx *ctx) req0_sock *s = ctx->sock; // Call with sock lock held! - // We cannot safely "wait" using nni_timer_cancel, but this removes - // any scheduled timer activation. If the timeout is already running - // concurrently, it will still run. It should do nothing, because - // we toss the request. There is still a very narrow race if the - // timeout fires, but doesn't actually start running before we - // both finish this function, *and* manage to reschedule another - // request. The consequence of that occurring is that the request - // will be emitted on the wire twice. This is not actually tragic. - nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER); - + nni_list_node_remove(&ctx->retry_node); nni_list_node_remove(&ctx->pipe_node); nni_list_node_remove(&ctx->send_node); if (ctx->request_id != 0) { @@ -561,7 +580,7 @@ req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv) // entire state machine. This allows us to preserve the // semantic of exactly one receive operation per send // operation, and should be the least surprising for users. The - // main consequence is that if a receive operation is completed + // main consequence is that if the operation is completed // (in error or otherwise), the user must submit a new send // operation to restart the state machine. req0_ctx_reset(ctx); @@ -713,6 +732,15 @@ req0_ctx_send(void *arg, nni_aio *aio) ctx->send_aio = aio; nni_aio_set_msg(aio, NULL); + if (ctx->retry > 0) { + ctx->retry_time = nni_clock() + ctx->retry; + nni_list_append(&s->retry_queue, ctx); + if (!s->retry_active) { + s->retry_active = true; + nni_sleep_aio(s->retry_tick, &s->retry_aio); + } + } + // Stick us on the send_queue list. nni_list_append(&s->send_queue, ctx); @@ -771,6 +799,34 @@ req0_sock_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t) return (req0_ctx_get_resend_time(&s->master, buf, szp, t)); } +static int +req0_sock_set_resend_tick( + void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + req0_sock *s = arg; + nng_duration tick; + int rv; + + if ((rv = nni_copyin_ms(&tick, buf, sz, t)) == 0) { + nni_mtx_lock(&s->mtx); + s->retry_tick = tick; + nni_mtx_unlock(&s->mtx); + } + return (rv); +} + +static int +req0_sock_get_resend_tick(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + req0_sock *s = arg; + nng_duration tick; + + nni_mtx_lock(&s->mtx); + tick = s->retry_tick; + nni_mtx_unlock(&s->mtx); + return (nni_copyout_ms(tick, buf, szp, t)); +} + static int req0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) { @@ -846,6 +902,12 @@ static nni_option req0_sock_options[] = { .o_name = NNG_OPT_SENDFD, .o_get = req0_sock_get_send_fd, }, + { + .o_name = NNG_OPT_REQ_RESENDTICK, + .o_get = req0_sock_get_resend_tick, + .o_set = req0_sock_set_resend_tick, + }, + // terminate list { .o_name = NULL, diff --git a/src/sp/protocol/reqrep0/req_test.c b/src/sp/protocol/reqrep0/req_test.c index 40a6fe6bc..ae670ed3a 100644 --- a/src/sp/protocol/reqrep0/req_test.c +++ b/src/sp/protocol/reqrep0/req_test.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -15,7 +15,7 @@ test_req_identity(void) { nng_socket s; int p; - char * n; + char *n; NUTS_PASS(nng_req0_open(&s)); NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); @@ -74,7 +74,29 @@ test_req_resend_option(void) nng_duration d; bool b; size_t sz = sizeof(b); - const char * opt = NNG_OPT_REQ_RESENDTIME; + const char *opt = NNG_OPT_REQ_RESENDTIME; + + NUTS_PASS(nng_req0_open(&req)); + + NUTS_TRUE(nng_socket_set_ms(req, opt, 10) == 0); + NUTS_FAIL(nng_socket_set(req, opt, "", 1), NNG_EINVAL); + NUTS_FAIL(nng_socket_get(req, opt, &b, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(req, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(req, opt, &b), NNG_EBADTYPE); + + NUTS_PASS(nng_socket_get_ms(req, opt, &d)); + NUTS_TRUE(d == 10); + NUTS_CLOSE(req); +} + +static void +test_req_resend_tick_option(void) +{ + nng_socket req; + nng_duration d; + bool b; + size_t sz = sizeof(b); + const char *opt = NNG_OPT_REQ_RESENDTICK; NUTS_PASS(nng_req0_open(&req)); @@ -93,7 +115,7 @@ void test_req_recv_bad_state(void) { nng_socket req; - nng_msg * msg = NULL; + nng_msg *msg = NULL; NUTS_TRUE(nng_req0_open(&req) == 0); NUTS_TRUE(nng_recvmsg(req, &msg, 0) == NNG_ESTATE); @@ -106,7 +128,7 @@ test_req_recv_garbage(void) { nng_socket rep; nng_socket req; - nng_msg * m; + nng_msg *m; uint32_t req_id; NUTS_PASS(nng_rep0_open_raw(&rep)); @@ -176,7 +198,8 @@ test_req_resend(void) NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, SECOND)); NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, SECOND)); NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, SECOND)); - NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, 10)); + NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTICK, 10)); + NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, 100)); NUTS_MARRY(rep, req); @@ -246,6 +269,8 @@ test_req_resend_disconnect(void) // We intentionally set the retry time long; that way we only see // the retry from loss of our original peer. NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, 60 * SECOND)); + // And make sure the tick runs faster than our timeout! + NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTICK, SECOND/10)); NUTS_MARRY(rep1, req); NUTS_SEND(req, "ping"); @@ -303,7 +328,7 @@ test_req_disconnect_abort(void) nng_socket req; nng_socket rep1; nng_socket rep2; - nng_aio * aio; + nng_aio *aio; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_rep0_open(&rep1)); @@ -352,6 +377,7 @@ test_req_cancel(void) NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 5 * SECOND)); NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 5 * SECOND)); NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, retry)); + NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTICK, SECOND / 10)); NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 16)); NUTS_MARRY(rep, req); @@ -392,7 +418,7 @@ test_req_cancel(void) void test_req_cancel_abort_recv(void) { - nng_aio * aio; + nng_aio *aio; nng_duration retry = SECOND * 10; // 10s (kind of never) nng_socket req; nng_socket rep; @@ -521,11 +547,11 @@ test_req_poll_contention(void) int fd; nng_socket req; nng_socket rep; - nng_aio * aio; + nng_aio *aio; nng_ctx ctx[5]; - nng_aio * ctx_aio[5]; - nng_msg * ctx_msg[5]; - nng_msg * msg; + nng_aio *ctx_aio[5]; + nng_msg *ctx_msg[5]; + nng_msg *msg; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_rep0_open(&rep)); @@ -623,7 +649,7 @@ test_req_poll_readable(void) int fd; nng_socket req; nng_socket rep; - nng_msg * msg; + nng_msg *msg; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_rep0_open(&rep)); @@ -681,8 +707,8 @@ test_req_ctx_send_queued(void) nng_socket req; nng_socket rep; nng_ctx ctx[3]; - nng_aio * aio[3]; - nng_msg * msg[3]; + nng_aio *aio[3]; + nng_msg *msg[3]; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_rep0_open(&rep)); @@ -723,8 +749,8 @@ test_req_ctx_send_close(void) { nng_socket req; nng_ctx ctx[3]; - nng_aio * aio[3]; - nng_msg * msg[3]; + nng_aio *aio[3]; + nng_msg *msg[3]; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000)); @@ -758,8 +784,8 @@ test_req_ctx_send_abort(void) { nng_socket req; nng_ctx ctx[3]; - nng_aio * aio[3]; - nng_msg * msg[3]; + nng_aio *aio[3]; + nng_msg *msg[3]; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000)); @@ -793,8 +819,8 @@ test_req_ctx_send_twice(void) { nng_socket req; nng_ctx ctx; - nng_aio * aio[2]; - nng_msg * msg[2]; + nng_aio *aio[2]; + nng_msg *msg[2]; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000)); @@ -831,8 +857,8 @@ test_req_ctx_send_recv_abort(void) { nng_socket req; nng_ctx ctx; - nng_aio * aio[2]; - nng_msg * msg; + nng_aio *aio[2]; + nng_msg *msg; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, 100)); @@ -867,8 +893,8 @@ test_req_ctx_recv_nonblock(void) nng_socket req; nng_socket rep; nng_ctx ctx; - nng_aio * aio; - nng_msg * msg; + nng_aio *aio; + nng_msg *msg; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_rep0_open(&rep)); @@ -897,8 +923,8 @@ test_req_ctx_send_nonblock(void) { nng_socket req; nng_ctx ctx; - nng_aio * aio; - nng_msg * msg; + nng_aio *aio; + nng_msg *msg; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_ctx_open(&ctx, req)); @@ -921,8 +947,8 @@ test_req_ctx_recv_close_socket(void) nng_socket req; nng_socket rep; nng_ctx ctx; - nng_aio * aio; - nng_msg * m; + nng_aio *aio; + nng_msg *m; NUTS_PASS(nng_req0_open(&req)); NUTS_PASS(nng_rep0_open(&rep)); @@ -947,9 +973,9 @@ static void test_req_validate_peer(void) { nng_socket s1, s2; - nng_stat * stats; - nng_stat * reject; - char * addr; + nng_stat *stats; + nng_stat *reject; + char *addr; NUTS_ADDR(addr, "inproc"); @@ -978,6 +1004,7 @@ NUTS_TESTS = { { "req identity", test_req_identity }, { "req ttl option", test_req_ttl_option }, { "req resend option", test_req_resend_option }, + { "req resend tick option", test_req_resend_tick_option }, { "req recv bad state", test_req_recv_bad_state }, { "req recv garbage", test_req_recv_garbage }, { "req rep exchange", test_req_rep_exchange },