diff --git a/src/sp/protocol/reqrep0/req.c b/src/sp/protocol/reqrep0/req.c index fdac7d5cd..37278de3a 100644 --- a/src/sp/protocol/reqrep0/req.c +++ b/src/sp/protocol/reqrep0/req.c @@ -21,10 +21,10 @@ typedef struct req0_ctx req0_ctx; static void req0_run_send_queue(req0_sock *, nni_aio_completions *); static void req0_ctx_reset(req0_ctx *); -static void req0_ctx_timeout(void *); static void req0_pipe_fini(void *); static void req0_ctx_fini(void *); static void req0_ctx_init(void *, void *); +static void req0_retry_cb(void *); // A req0_ctx is a "context" for the request. It uses most of the // socket, but keeps track of its own outstanding replays, the request ID, @@ -34,14 +34,15 @@ struct req0_ctx { nni_list_node sock_node; // node on the socket context list nni_list_node send_node; // node on the send_queue nni_list_node pipe_node; // node on the pipe list + nni_list_node retry_node; // node on the socket retry list uint32_t request_id; // request ID, without high bit set nni_aio *recv_aio; // user aio waiting to recv - only one! nni_aio *send_aio; // user aio waiting to send nng_msg *req_msg; // request message (owned by protocol) size_t req_len; // length of request message (for stats) nng_msg *rep_msg; // reply message - nni_timer_node timer; nni_duration retry; + nni_time retry_time; // retry after this expires bool conn_reset; // sent message w/o retry, peer disconnect }; @@ -49,6 +50,7 @@ struct req0_ctx { struct req0_sock { nni_duration retry; bool closed; + bool retry_active; // true if retry aio running nni_atomic_int ttl; req0_ctx master; // base socket master nni_list ready_pipes; @@ -56,9 +58,12 @@ struct req0_sock { nni_list stop_pipes; nni_list contexts; nni_list send_queue; // contexts waiting to send. + nni_list retry_queue; + nni_aio retry_aio; // retry timer nni_id_map requests; // contexts by request ID nni_pollable readable; nni_pollable writable; + nni_duration retry_tick; // clock interval for retry timer nni_mtx mtx; }; @@ -95,16 +100,20 @@ req0_sock_init(void *arg, nni_sock *sock) NNI_LIST_INIT(&s->busy_pipes, req0_pipe, node); NNI_LIST_INIT(&s->stop_pipes, req0_pipe, node); NNI_LIST_INIT(&s->send_queue, req0_ctx, send_node); + NNI_LIST_INIT(&s->retry_queue, req0_ctx, retry_node); NNI_LIST_INIT(&s->contexts, req0_ctx, sock_node); // this is "semi random" start for request IDs. s->retry = NNI_SECOND * 60; + s->retry_tick = NNI_SECOND; // how often we check for retries req0_ctx_init(&s->master, s); nni_pollable_init(&s->writable); nni_pollable_init(&s->readable); + nni_aio_init(&s->retry_aio, req0_retry_cb, s); + nni_atomic_init(&s->ttl); nni_atomic_set(&s->ttl, 8); } @@ -130,6 +139,7 @@ req0_sock_fini(void *arg) { req0_sock *s = arg; + nni_aio_stop(&s->retry_aio); nni_mtx_lock(&s->mtx); NNI_ASSERT(nni_list_empty(&s->busy_pipes)); NNI_ASSERT(nni_list_empty(&s->stop_pipes)); @@ -140,6 +150,7 @@ req0_sock_fini(void *arg) nni_pollable_fini(&s->readable); nni_pollable_fini(&s->writable); nni_id_map_fini(&s->requests); + nni_aio_fini(&s->retry_aio); nni_mtx_fini(&s->mtx); } @@ -236,12 +247,9 @@ req0_pipe_close(void *arg) ctx->conn_reset = true; } } else { - // Reset the timer on this so it expires immediately. - // This is actually easier than canceling the timer and - // running the send_queue separately. (In particular, - // it avoids a potential deadlock on cancelling the - // timer.) - nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO); + // Reset the retry time to make it expire immediately. + // The timer should already be running. + ctx->retry_time = nni_clock(); } } nni_mtx_unlock(&s->mtx); @@ -363,16 +371,41 @@ req0_recv_cb(void *arg) } static void -req0_ctx_timeout(void *arg) +req0_retry_cb(void *arg) { - req0_ctx *ctx = arg; - req0_sock *s = ctx->sock; - + req0_sock *s = arg; + req0_ctx *ctx; + nni_time now; + bool reschedule = false; + + // The design of this is that retries are infrequent, because + // we should normally be succeeding. We also hope that we are not + // executing this linear scan of all requests too often, once + // per clock tick is all we want. + now = nni_clock(); nni_mtx_lock(&s->mtx); - if ((ctx->req_msg != NULL) && (!s->closed)) { + if (s->closed || (nni_aio_result(&s->retry_aio) != 0)) { + nni_mtx_unlock(&s->mtx); + return; + } + + NNI_LIST_FOREACH (&s->retry_queue, ctx) { + if (ctx->retry_time > now || (ctx->req_msg == NULL)) { + continue; + } if (!nni_list_node_active(&ctx->send_node)) { nni_list_append(&s->send_queue, ctx); } + reschedule = true; + } + if (!nni_list_empty(&s->retry_queue)) { + // if there are still jobs in the queue waiting to be + // retried, do them. + nni_sleep_aio(s->retry_tick, &s->retry_aio); + } else { + s->retry_active = false; + } + if (reschedule) { req0_run_send_queue(s, NULL); } nni_mtx_unlock(&s->mtx); @@ -384,8 +417,6 @@ req0_ctx_init(void *arg, void *sock) req0_sock *s = sock; req0_ctx *ctx = arg; - nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx); - nni_mtx_lock(&s->mtx); ctx->sock = s; ctx->recv_aio = NULL; @@ -415,9 +446,6 @@ req0_ctx_fini(void *arg) req0_ctx_reset(ctx); nni_list_remove(&s->contexts, ctx); nni_mtx_unlock(&s->mtx); - - nni_timer_cancel(&ctx->timer); - nni_timer_fini(&ctx->timer); } static int @@ -448,20 +476,20 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list) return; } - // We have a place to send it, so do the send. + // We have a place to send it, so send it. // If a sending error occurs that causes the message to // be dropped, we rely on the resend timer to pick it up. // We also notify the completion callback if this is the // first send attempt. nni_list_remove(&s->send_queue, ctx); - // Schedule a resubmit timer. We only do this if we got + // Schedule a retry. We only do this if we got // a pipe to send to. Otherwise, we should get handled // the next time that the send_queue is run. We don't do this // if the retry is "disabled" with NNG_DURATION_INFINITE. if (ctx->retry > 0) { - nni_timer_schedule( - &ctx->timer, nni_clock() + ctx->retry); + nni_list_node_remove(&ctx->retry_node); + nni_list_append(&s->retry_queue, ctx); } // Put us on the pipe list of active contexts. @@ -489,7 +517,7 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list) } // At this point, we will never give this message back to - // to the user, so we don't have to worry about making it + // the user, so we don't have to worry about making it // unique. We can freely clone it. nni_msg_clone(ctx->req_msg); nni_aio_set_msg(&p->aio_send, ctx->req_msg); @@ -503,16 +531,7 @@ req0_ctx_reset(req0_ctx *ctx) req0_sock *s = ctx->sock; // Call with sock lock held! - // We cannot safely "wait" using nni_timer_cancel, but this removes - // any scheduled timer activation. If the timeout is already running - // concurrently, it will still run. It should do nothing, because - // we toss the request. There is still a very narrow race if the - // timeout fires, but doesn't actually start running before we - // both finish this function, *and* manage to reschedule another - // request. The consequence of that occurring is that the request - // will be emitted on the wire twice. This is not actually tragic. - nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER); - + nni_list_node_remove(&ctx->retry_node); nni_list_node_remove(&ctx->pipe_node); nni_list_node_remove(&ctx->send_node); if (ctx->request_id != 0) { @@ -561,7 +580,7 @@ req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv) // entire state machine. This allows us to preserve the // semantic of exactly one receive operation per send // operation, and should be the least surprising for users. The - // main consequence is that if a receive operation is completed + // main consequence is that if the operation is completed // (in error or otherwise), the user must submit a new send // operation to restart the state machine. req0_ctx_reset(ctx); @@ -713,6 +732,15 @@ req0_ctx_send(void *arg, nni_aio *aio) ctx->send_aio = aio; nni_aio_set_msg(aio, NULL); + if (ctx->retry > 0) { + ctx->retry_time = nni_clock() + ctx->retry; + nni_list_append(&s->retry_queue, ctx); + if (!s->retry_active) { + s->retry_active = true; + nni_sleep_aio(s->retry_tick, &s->retry_aio); + } + } + // Stick us on the send_queue list. nni_list_append(&s->send_queue, ctx);