From 1507d7cd58c93b9d71644e2a6657d7b8eaf91c05 Mon Sep 17 00:00:00 2001 From: hoytluo Date: Tue, 26 Dec 2023 20:31:25 +0800 Subject: [PATCH 1/6] Update nng.h --- include/nng/nng.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/nng/nng.h b/include/nng/nng.h index ce58fb3aa..613a27983 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -706,6 +706,7 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe); #define NNG_OPT_RECVMAXSZ "recv-size-max" #define NNG_OPT_RECONNMINT "reconnect-time-min" #define NNG_OPT_RECONNMAXT "reconnect-time-max" +#define NNG_OPT_SNDPRIO "send-prio" // TLS options are only used when the underlying transport supports TLS. From 1637530104af62f4d488e2cfa06c59dcfec27633 Mon Sep 17 00:00:00 2001 From: hoytluo Date: Tue, 26 Dec 2023 20:32:49 +0800 Subject: [PATCH 2/6] Update sockimpl.h --- src/core/sockimpl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index e568f37fa..1e675098f 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -37,7 +37,7 @@ struct nni_dialer { nni_duration d_currtime; // current time for reconnect nni_duration d_inirtime; // initial time for reconnect nni_reap_node d_reap; - + int sndprio; #ifdef NNG_ENABLE_STATS nni_stat_item st_root; nni_stat_item st_id; From bf69392f7a8f9a6a86f8d7b18587f41de6e0bdf7 Mon Sep 17 00:00:00 2001 From: hoytluo Date: Tue, 26 Dec 2023 20:35:18 +0800 Subject: [PATCH 3/6] Update dialer.c add SNDPRIO option --- src/core/dialer.c | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/core/dialer.c b/src/core/dialer.c index 55a46efb3..d7f8aac5a 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -481,7 +481,15 @@ nni_dialer_setopt( nni_mtx_unlock(&d->d_mtx); return (rv); } + if (strcmp(name, NNG_OPT_SNDPRIO) == 0) { + int rv; + nni_mtx_lock(&d->d_mtx); + rv = nni_copyin_int(&d->sndprio, val, sz, 1, 16, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + if (d->d_ops.d_setopt != NULL) { int rv = d->d_ops.d_setopt(d->d_data, name, val, sz, t); if (rv != NNG_ENOTSUP) { @@ -523,7 +531,14 @@ nni_dialer_getopt( nni_mtx_unlock(&d->d_mtx); return (rv); } - + if (strcmp(name, NNG_OPT_SNDPRIO) == 0) { + int rv; + nni_mtx_lock(&d->d_mtx); + rv = nni_copyout_int(d->sndprio, valp, szp, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + if (d->d_ops.d_getopt != NULL) { int rv = d->d_ops.d_getopt(d->d_data, name, valp, szp, t); if (rv != NNG_ENOTSUP) { From 0b809919c37c8920bdf1485fa9227c9a6b01d4a9 Mon Sep 17 00:00:00 2001 From: hoytluo Date: Tue, 26 Dec 2023 20:39:19 +0800 Subject: [PATCH 4/6] Update req.c add SNDPRO on req0 --- src/sp/protocol/reqrep0/req.c | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/src/sp/protocol/reqrep0/req.c b/src/sp/protocol/reqrep0/req.c index 7e3e3db81..7a4592263 100644 --- a/src/sp/protocol/reqrep0/req.c +++ b/src/sp/protocol/reqrep0/req.c @@ -76,6 +76,7 @@ struct req0_pipe { bool closed; nni_aio aio_send; nni_aio aio_recv; + int sndprio; }; static void req0_sock_fini(void *); @@ -190,18 +191,42 @@ req0_pipe_init(void *arg, nni_pipe *pipe, void *s) return (0); } +static void req0_pipe_insert_by_prio(nni_list *ready_pipes, req0_pipe *p){ + req0_pipe *p2 = NULL; + int sndprio = p->sndprio; + + if(sndprio < 1){ + return nni_list_append(ready_pipes, p); + } + + NNI_LIST_FOREACH(ready_pipes, p2) { + if (sndprio > p2->sndprio) { + nni_list_insert_before(ready_pipes, p, p2); + return ; + } + } + if (p2 == NULL) { + nni_list_append(ready_pipes, p); + } +} + static int req0_pipe_start(void *arg) { req0_pipe *p = arg; req0_sock *s = p->req; - + int prio = 0; + if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) { return (NNG_EPROTO); } - + if(p->pipe->p_dialer && nni_dialer_getopt(p->pipe->p_dialer, NNG_OPT_SNDPRIO, &prio, NULL, NNI_TYPE_INT32) == 0 && prio > 0){ + // the first time copy sndprio from dialer + p -> sndprio = prio; + } nni_mtx_lock(&s->mtx); - nni_list_append(&s->ready_pipes, p); + req0_pipe_insert_by_prio(&s->ready_pipes, p); + //nni_list_append(&s->ready_pipes, p); nni_pollable_raise(&s->writable); req0_run_send_queue(s, NULL); nni_mtx_unlock(&s->mtx); @@ -294,7 +319,8 @@ req0_send_cb(void *arg) return; } nni_list_remove(&s->busy_pipes, p); - nni_list_append(&s->ready_pipes, p); + req0_pipe_insert_by_prio(&s->ready_pipes, p); + // nni_list_append(&s->ready_pipes, p); if (nni_list_empty(&s->send_queue)) { nni_pollable_raise(&s->writable); } From a00325dfdb1ce12c7ff89469410e657ebc49212f Mon Sep 17 00:00:00 2001 From: hoytluo Date: Tue, 26 Dec 2023 20:34:15 +0800 Subject: [PATCH 5/6] add header file --- src/sp/protocol/reqrep0/req.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sp/protocol/reqrep0/req.c b/src/sp/protocol/reqrep0/req.c index 7a4592263..0f942e608 100644 --- a/src/sp/protocol/reqrep0/req.c +++ b/src/sp/protocol/reqrep0/req.c @@ -11,6 +11,7 @@ #include "core/nng_impl.h" #include "nng/protocol/reqrep0/req.h" +#include "core/sockimpl.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. From 7a43d2f198f289f9c18ebdcec6be58caafb5163e Mon Sep 17 00:00:00 2001 From: hoytluo Date: Mon, 8 Jan 2024 18:01:20 +0800 Subject: [PATCH 6/6] add on listener --- src/core/dialer.c | 4 ++-- src/core/listener.c | 13 +++++++++++++ src/core/socket.c | 1 - src/core/sockimpl.h | 4 +++- src/sp/protocol/reqrep0/req.c | 3 +++ 5 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/core/dialer.c b/src/core/dialer.c index d7f8aac5a..60d7d2228 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -485,7 +485,7 @@ nni_dialer_setopt( int rv; nni_mtx_lock(&d->d_mtx); - rv = nni_copyin_int(&d->sndprio, val, sz, 1, 16, t); + rv = nni_copyin_int(&d->d_sndprio, val, sz, 1, 16, t); nni_mtx_unlock(&d->d_mtx); return (rv); } @@ -534,7 +534,7 @@ nni_dialer_getopt( if (strcmp(name, NNG_OPT_SNDPRIO) == 0) { int rv; nni_mtx_lock(&d->d_mtx); - rv = nni_copyout_int(d->sndprio, valp, szp, t); + rv = nni_copyout_int(d->d_sndprio, valp, szp, t); nni_mtx_unlock(&d->d_mtx); return (rv); } diff --git a/src/core/listener.c b/src/core/listener.c index 7d3e3e0d4..4eca95db6 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -434,6 +434,13 @@ nni_listener_setopt( return (NNG_EREADONLY); } + if (strcmp(name, NNG_OPT_SNDPRIO) == 0) { + int rv; + rv = nni_copyin_int(&l->l_sndprio, val, sz, 1, 16, t); + return (rv); + } + + if (l->l_ops.l_setopt != NULL) { int rv = l->l_ops.l_setopt(l->l_data, name, val, sz, t); if (rv != NNG_ENOTSUP) { @@ -461,6 +468,12 @@ nni_listener_getopt( { nni_option *o; + if (strcmp(name, NNG_OPT_SNDPRIO) == 0) { + int rv; + rv = nni_copyout_int(l->l_sndprio, val, szp, t); + return (rv); + } + if (l->l_ops.l_getopt != NULL) { int rv = l->l_ops.l_getopt(l->l_data, name, val, szp, t); if (rv != NNG_ENOTSUP) { diff --git a/src/core/socket.c b/src/core/socket.c index 1f44ebfa3..38f5009c4 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1504,7 +1504,6 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_mtx_unlock(&s->s_mx); return; } - nni_list_append(&d->d_pipes, p); nni_list_append(&s->s_pipes, p); d->d_pipe = p; diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 1e675098f..c084f5a44 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -37,7 +37,8 @@ struct nni_dialer { nni_duration d_currtime; // current time for reconnect nni_duration d_inirtime; // initial time for reconnect nni_reap_node d_reap; - int sndprio; + int d_sndprio; + #ifdef NNG_ENABLE_STATS nni_stat_item st_root; nni_stat_item st_id; @@ -73,6 +74,7 @@ struct nni_listener { nni_aio l_acc_aio; nni_aio l_tmo_aio; nni_reap_node l_reap; + int l_sndprio; #ifdef NNG_ENABLE_STATS nni_stat_item st_root; diff --git a/src/sp/protocol/reqrep0/req.c b/src/sp/protocol/reqrep0/req.c index 0f942e608..d5bd5dc61 100644 --- a/src/sp/protocol/reqrep0/req.c +++ b/src/sp/protocol/reqrep0/req.c @@ -224,6 +224,9 @@ req0_pipe_start(void *arg) if(p->pipe->p_dialer && nni_dialer_getopt(p->pipe->p_dialer, NNG_OPT_SNDPRIO, &prio, NULL, NNI_TYPE_INT32) == 0 && prio > 0){ // the first time copy sndprio from dialer p -> sndprio = prio; + }else if(p->pipe->p_listener && nni_listener_getopt(p->pipe->p_listener, NNG_OPT_SNDPRIO, &prio, NULL, NNI_TYPE_INT32) == 0 && prio > 0){ + // the first time copy sndprio from listener + p -> sndprio = prio; } nni_mtx_lock(&s->mtx); req0_pipe_insert_by_prio(&s->ready_pipes, p);