Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a simple SNDPRIO implemention on req0 #1743

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/nng/nng.h
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe);
#define NNG_OPT_RECVMAXSZ "recv-size-max"
#define NNG_OPT_RECONNMINT "reconnect-time-min"
#define NNG_OPT_RECONNMAXT "reconnect-time-max"
#define NNG_OPT_SNDPRIO "send-prio"

// TLS options are only used when the underlying transport supports TLS.

Expand Down
17 changes: 16 additions & 1 deletion src/core/dialer.c
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,15 @@ nni_dialer_setopt(
nni_mtx_unlock(&d->d_mtx);
return (rv);
}
if (strcmp(name, NNG_OPT_SNDPRIO) == 0) {
int rv;

nni_mtx_lock(&d->d_mtx);
rv = nni_copyin_int(&d->d_sndprio, val, sz, 1, 16, t);
nni_mtx_unlock(&d->d_mtx);
return (rv);
}

if (d->d_ops.d_setopt != NULL) {
int rv = d->d_ops.d_setopt(d->d_data, name, val, sz, t);
if (rv != NNG_ENOTSUP) {
Expand Down Expand Up @@ -523,7 +531,14 @@ nni_dialer_getopt(
nni_mtx_unlock(&d->d_mtx);
return (rv);
}

if (strcmp(name, NNG_OPT_SNDPRIO) == 0) {
int rv;
nni_mtx_lock(&d->d_mtx);
rv = nni_copyout_int(d->d_sndprio, valp, szp, t);
nni_mtx_unlock(&d->d_mtx);
return (rv);
}

if (d->d_ops.d_getopt != NULL) {
int rv = d->d_ops.d_getopt(d->d_data, name, valp, szp, t);
if (rv != NNG_ENOTSUP) {
Expand Down
13 changes: 13 additions & 0 deletions src/core/listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,13 @@ nni_listener_setopt(
return (NNG_EREADONLY);
}

if (strcmp(name, NNG_OPT_SNDPRIO) == 0) {
int rv;
rv = nni_copyin_int(&l->l_sndprio, val, sz, 1, 16, t);
return (rv);
}


if (l->l_ops.l_setopt != NULL) {
int rv = l->l_ops.l_setopt(l->l_data, name, val, sz, t);
if (rv != NNG_ENOTSUP) {
Expand Down Expand Up @@ -461,6 +468,12 @@ nni_listener_getopt(
{
nni_option *o;

if (strcmp(name, NNG_OPT_SNDPRIO) == 0) {
int rv;
rv = nni_copyout_int(l->l_sndprio, val, szp, t);
return (rv);
}

if (l->l_ops.l_getopt != NULL) {
int rv = l->l_ops.l_getopt(l->l_data, name, val, szp, t);
if (rv != NNG_ENOTSUP) {
Expand Down
1 change: 0 additions & 1 deletion src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1504,7 +1504,6 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_mtx_unlock(&s->s_mx);
return;
}

nni_list_append(&d->d_pipes, p);
nni_list_append(&s->s_pipes, p);
d->d_pipe = p;
Expand Down
2 changes: 2 additions & 0 deletions src/core/sockimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct nni_dialer {
nni_duration d_currtime; // current time for reconnect
nni_duration d_inirtime; // initial time for reconnect
nni_reap_node d_reap;
int d_sndprio;

#ifdef NNG_ENABLE_STATS
nni_stat_item st_root;
Expand Down Expand Up @@ -73,6 +74,7 @@ struct nni_listener {
nni_aio l_acc_aio;
nni_aio l_tmo_aio;
nni_reap_node l_reap;
int l_sndprio;

#ifdef NNG_ENABLE_STATS
nni_stat_item st_root;
Expand Down
38 changes: 34 additions & 4 deletions src/sp/protocol/reqrep0/req.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "core/nng_impl.h"
#include "nng/protocol/reqrep0/req.h"
#include "core/sockimpl.h"

// Request protocol. The REQ protocol is the "request" side of a
// request-reply pair. This is useful for building RPC clients, for example.
Expand Down Expand Up @@ -76,6 +77,7 @@ struct req0_pipe {
bool closed;
nni_aio aio_send;
nni_aio aio_recv;
int sndprio;
};

static void req0_sock_fini(void *);
Expand Down Expand Up @@ -190,18 +192,45 @@ req0_pipe_init(void *arg, nni_pipe *pipe, void *s)
return (0);
}

static void req0_pipe_insert_by_prio(nni_list *ready_pipes, req0_pipe *p){
req0_pipe *p2 = NULL;
int sndprio = p->sndprio;

if(sndprio < 1){
return nni_list_append(ready_pipes, p);
}

NNI_LIST_FOREACH(ready_pipes, p2) {
if (sndprio > p2->sndprio) {
nni_list_insert_before(ready_pipes, p, p2);
return ;
}
}
if (p2 == NULL) {
nni_list_append(ready_pipes, p);
}
}

static int
req0_pipe_start(void *arg)
{
req0_pipe *p = arg;
req0_sock *s = p->req;

int prio = 0;

if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) {
return (NNG_EPROTO);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This option should be a generic option, not just on dialers, but on end points, and then configured on the pipe by the end point.

I also think that we should expand this to RECV priority as well, so that we can use the same option for REP to give preference to some peers. I would like to think on this a bit.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, in the nanomsg code, I find the ep option when I call the nn_connect function, and then copy the socket option(ep_template) to pipe .
But in the nng code, I can't find the ep . Are the dialer and listener eps? If so, I will add option on the listener also or only add option on the socket, and then copy this option to dialer or listener on creating them.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added SNDPRIO to listener also.

if(p->pipe->p_dialer && nni_dialer_getopt(p->pipe->p_dialer, NNG_OPT_SNDPRIO, &prio, NULL, NNI_TYPE_INT32) == 0 && prio > 0){
// the first time copy sndprio from dialer
p -> sndprio = prio;
}else if(p->pipe->p_listener && nni_listener_getopt(p->pipe->p_listener, NNG_OPT_SNDPRIO, &prio, NULL, NNI_TYPE_INT32) == 0 && prio > 0){
// the first time copy sndprio from listener
p -> sndprio = prio;
}
nni_mtx_lock(&s->mtx);
nni_list_append(&s->ready_pipes, p);
req0_pipe_insert_by_prio(&s->ready_pipes, p);
//nni_list_append(&s->ready_pipes, p);
nni_pollable_raise(&s->writable);
req0_run_send_queue(s, NULL);
nni_mtx_unlock(&s->mtx);
Expand Down Expand Up @@ -294,7 +323,8 @@ req0_send_cb(void *arg)
return;
}
nni_list_remove(&s->busy_pipes, p);
nni_list_append(&s->ready_pipes, p);
req0_pipe_insert_by_prio(&s->ready_pipes, p);
// nni_list_append(&s->ready_pipes, p);
if (nni_list_empty(&s->send_queue)) {
nni_pollable_raise(&s->writable);
}
Expand Down