From e623dedab28a1fec6270c05f9643e68bfb98b7c3 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 25 Nov 2023 17:35:35 -0800 Subject: [PATCH] fixes #1523 rare SEGV in sub nni_list_remove Credit goes to Wu Xuan (@willwu1217) for diagnosing and proposing a fix as part of #1695. This change takes a revised approach to avoid adding extra memory, and it also is slightly faster as we do not need to update both pointers in the linked list, by reusing the reap node. As part of this a new internal API, nni_aio_completions, is introduced. In all likelihood we will be able to use this to solve some similar crashes in other areas of the code. --- src/core/aio.c | 33 +++++++++++++++++ src/core/aio.h | 26 +++++++++++++- src/sp/protocol/pubsub0/sub.c | 67 +++++++++++++++++------------------ 3 files changed, 90 insertions(+), 36 deletions(-) diff --git a/src/core/aio.c b/src/core/aio.c index 564e91a30..e849b33dc 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -508,6 +508,39 @@ nni_aio_list_active(nni_aio *aio) return (nni_list_node_active(&aio->a_prov_node)); } +// completions list. +// Implementation note: in order to avoid wasting space, we +// reuse the reap node -- which will be inactive here. 
+void +nni_aio_completions_init(nni_aio_completions *clp) +{ + *clp = NULL; +} + +void +nni_aio_completions_add(nni_aio_completions *clp, nni_aio *aio, int result, size_t count) +{ + NNI_ASSERT(!nni_aio_list_active(aio)); + aio->a_reap_node.rn_next = *clp; + aio->a_result = result; + aio->a_count = count; + *clp = aio; +} + +void +nni_aio_completions_run(nni_aio_completions *clp) +{ + nni_aio *aio; + nni_aio *cl = *clp; + *clp = NULL; + + while ((aio = cl) != NULL) { + cl = (void *)aio->a_reap_node.rn_next; + aio->a_reap_node.rn_next = NULL; + nni_aio_finish_sync(aio, aio->a_result, aio->a_count); + } +} + static void nni_aio_expire_add(nni_aio *aio) { diff --git a/src/core/aio.h b/src/core/aio.h index 6315e90c9..a2ebf70a9 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -1,5 +1,5 @@ // -// Copyright 2022 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -166,6 +166,30 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *); extern void nni_sleep_aio(nni_duration, nni_aio *); +// nni_aio_completions is used after removing the aio from an +// active work queue, and keeping them so that the completions can +// be run in a deferred manner. These lists are simple, and intended +// to be used as local variables. It's important to initialize the +// list before using it. Also, any AIO added to a completion list must +// not be in active use anywhere. +typedef void *nni_aio_completions; + +// nni_aio_completions_init just initializes a completions list. +// This just sets the pointed value to NULL. +extern void nni_aio_completions_init(nni_aio_completions *); + +// nni_aio_completions_run runs nni_aio_finish_sync for all the aio objects +// that have been added to the completions. The result code and count used +// are those supplied in nni_aio_completions_add. Callers should not hold +// locks when calling this. 
+extern void nni_aio_completions_run(nni_aio_completions *); + +// nni_aio_completions_add adds an aio (with the result code and length as +// appropriate) to the completion list. This should be done while the +// appropriate lock is held. The aio must not be scheduled. +extern void nni_aio_completions_add(nni_aio_completions *, nni_aio *, + int, size_t); + extern int nni_aio_sys_init(void); extern void nni_aio_sys_fini(void); diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index 10f42724d..e7540deea 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // Copyright 2019 Nathan Kent // @@ -44,14 +44,14 @@ static void sub0_pipe_fini(void *); struct sub0_topic { nni_list_node node; size_t len; - void * buf; + void *buf; }; // sub0_ctx is a context for a SUB socket. The advantage of contexts is // that different contexts can maintain different subscriptions. struct sub0_ctx { nni_list_node node; - sub0_sock * sock; + sub0_sock *sock; nni_list topics; // TODO: Consider patricia trie nni_list recv_queue; // can have multiple pending receives nni_lmq lmq; @@ -71,7 +71,7 @@ struct sub0_sock { // sub0_pipe is our per-pipe protocol private structure. 
struct sub0_pipe { - nni_pipe * pipe; + nni_pipe *pipe; sub0_sock *sub; nni_aio aio_recv; }; @@ -79,7 +79,7 @@ struct sub0_pipe { static void sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; nni_mtx_lock(&sock->lk); if (nni_list_active(&ctx->recv_queue, aio)) { @@ -92,9 +92,9 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) static void sub0_ctx_recv(void *arg, nni_aio *aio) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; - nni_msg * msg; + nni_msg *msg; if (nni_aio_begin(aio) != 0) { return; @@ -140,9 +140,9 @@ sub0_ctx_send(void *arg, nni_aio *aio) static void sub0_ctx_close(void *arg) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; - nni_aio * aio; + nni_aio *aio; nni_mtx_lock(&sock->lk); while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { @@ -155,8 +155,8 @@ sub0_ctx_close(void *arg) static void sub0_ctx_fini(void *arg) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; + sub0_ctx *ctx = arg; + sub0_sock *sock = ctx->sock; sub0_topic *topic; sub0_ctx_close(ctx); @@ -179,7 +179,7 @@ static void sub0_ctx_init(void *ctx_arg, void *sock_arg) { sub0_sock *sock = sock_arg; - sub0_ctx * ctx = ctx_arg; + sub0_ctx *ctx = ctx_arg; size_t len; bool prefer_new; @@ -311,22 +311,22 @@ sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len) static void sub0_recv_cb(void *arg) { - sub0_pipe *p = arg; - sub0_sock *sock = p->sub; - sub0_ctx * ctx; - nni_msg * msg; - size_t len; - uint8_t * body; - nni_list finish; - nng_aio * aio; - nni_msg * dup_msg; + sub0_pipe *p = arg; + sub0_sock *sock = p->sub; + sub0_ctx *ctx; + nni_msg *msg; + size_t len; + uint8_t *body; + nng_aio *aio; + nni_msg *dup_msg; + nni_aio_completions finish; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - nni_aio_list_init(&finish); + nni_aio_completions_init(&finish); msg = nni_aio_get_msg(&p->aio_recv); 
nni_aio_set_msg(&p->aio_recv, NULL); @@ -370,7 +370,7 @@ sub0_recv_cb(void *arg) nni_aio_set_msg(aio, dup_msg); // Save for synchronous completion - nni_list_append(&finish, aio); + nni_aio_completions_add(&finish, aio, 0, len); } else if (nni_lmq_full(&ctx->lmq)) { // Make space for the new message. nni_msg *old; @@ -401,10 +401,7 @@ sub0_recv_cb(void *arg) nni_msg_free(msg); } - while ((aio = nni_list_first(&finish)) != NULL) { - nni_list_remove(&finish, aio); - nni_aio_finish_sync(aio, 0, len); - } + nni_aio_completions_run(&finish); nni_pipe_recv(p->pipe, &p->aio_recv); } @@ -412,7 +409,7 @@ sub0_recv_cb(void *arg) static int sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; int val; nni_mtx_lock(&sock->lk); @@ -425,7 +422,7 @@ sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) static int sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; int val; int rv; @@ -456,8 +453,8 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) static int sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; + sub0_ctx *ctx = arg; + sub0_sock *sock = ctx->sock; sub0_topic *topic; sub0_topic *new_topic; NNI_ARG_UNUSED(t); @@ -494,8 +491,8 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) static int sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; + sub0_ctx *ctx = arg; + sub0_sock *sock = ctx->sock; sub0_topic *topic; size_t len; NNI_ARG_UNUSED(t); @@ -540,7 +537,7 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) static int sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx 
*ctx = arg; sub0_sock *sock = ctx->sock; bool val; @@ -554,7 +551,7 @@ sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) static int sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; bool val; int rv;