Skip to content

Commit

Permalink
pipes: make separate dialer/listener pipe allocators.
Browse files Browse the repository at this point in the history
Also, includes a few fixes for the sockfd transport.
  • Loading branch information
gdamore committed Dec 4, 2024
1 parent a00a9fc commit f0595c8
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 55 deletions.
102 changes: 61 additions & 41 deletions src/core/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,12 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
return (0);
}

int
nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data)
static void
pipe_init_dialer(nni_pipe *p, nni_dialer *d)
{
int rv;
nni_sp_tran *tran = d->d_tran;
nni_pipe *p;

if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) {
return (rv);
}
p->p_dialer = d;
nni_sock_hold(d->d_sock);
nni_dialer_hold(d);
#ifdef NNG_ENABLE_STATS
static const nni_stat_info dialer_info = {
.si_name = "dialer",
Expand All @@ -312,34 +307,38 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data)
pipe_stat_init(p, &p->st_ep_id, &dialer_info);
nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d));
#endif
nni_sock_hold(d->d_sock);
nni_dialer_hold(d);
*pp = p;
return (0);
}

void *
nni_pipe_tran_data(nni_pipe *p)
static void
pipe_init_listener(nni_pipe *p, nni_listener *l)
{
return (p->p_tran_data);
p->p_listener = l;
nni_listener_hold(l);
nni_sock_hold(l->l_sock);
#ifdef NNG_ENABLE_STATS
static const nni_stat_info listener_info = {
.si_name = "listener",
.si_desc = "listener for pipe",
.si_type = NNG_STAT_ID,
};
pipe_stat_init(p, &p->st_ep_id, &listener_info);
nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l));
#endif
}

int
nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l)
nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data)
{
int rv;
nni_pipe *p;
if (d != NULL) {
rv = nni_pipe_create_dialer(&p, d, NULL);
} else if (l != NULL) {
rv = nni_pipe_create_listener(&p, l, NULL);
} else {
rv = NNG_EINVAL;
}
if (rv == 0) {
*datap = nni_pipe_tran_data(p);
int rv;
nni_sp_tran *tran = d->d_tran;
nni_pipe *p;

if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) {
return (rv);
}
return (rv);
pipe_init_dialer(p, d);
*pp = p;
return (0);
}

int
Expand All @@ -352,22 +351,43 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data)
if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) {
return (rv);
}
p->p_listener = l;
#ifdef NNG_ENABLE_STATS
static const nni_stat_info listener_info = {
.si_name = "listener",
.si_desc = "listener for pipe",
.si_type = NNG_STAT_ID,
};
pipe_stat_init(p, &p->st_ep_id, &listener_info);
nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l));
#endif
nni_listener_hold(l);
nni_sock_hold(l->l_sock);
pipe_init_listener(p, l);
*pp = p;
return (0);
}

int
nni_pipe_alloc_dialer(void **datap, nni_dialer *d)
{
int rv;
nni_sp_tran *tran = d->d_tran;
nni_sock *s = d->d_sock;
nni_pipe *p;

if ((rv = pipe_create(&p, s, tran, NULL)) != 0) {
return (rv);
}
pipe_init_dialer(p, d);
*datap = p->p_tran_data;
return (0);
}

int
nni_pipe_alloc_listener(void **datap, nni_listener *l)
{
int rv;
nni_sp_tran *tran = l->l_tran;
nni_sock *s = l->l_sock;
nni_pipe *p;

if ((rv = pipe_create(&p, s, tran, NULL)) != 0) {
return (rv);
}
pipe_init_listener(p, l);
*datap = p->p_tran_data;
return (0);
}

int
nni_pipe_getopt(
nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
Expand Down
4 changes: 3 additions & 1 deletion src/core/pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ extern void nni_pipe_bump_tx(nni_pipe *, size_t);
extern void nni_pipe_bump_error(nni_pipe *, int);

extern char *nni_pipe_peer_addr(nni_pipe *p, char buf[NNG_MAXADDRSTRLEN]);
extern void *nni_pipe_tran_data(nni_pipe *);

extern int nni_pipe_alloc_dialer(void **, nni_dialer *);
extern int nni_pipe_alloc_listener(void **, nni_listener *);

#endif // CORE_PIPE_H
2 changes: 0 additions & 2 deletions src/sp/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,4 @@ extern void nni_sp_tran_sys_init(void);
extern void nni_sp_tran_sys_fini(void);
extern void nni_sp_tran_register(nni_sp_tran *);

extern int nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l);

#endif // PROTOCOL_SP_TRANSPORT_H
19 changes: 8 additions & 11 deletions src/sp/transport/socket/sockfd.c
Original file line number Diff line number Diff line change
Expand Up @@ -661,21 +661,20 @@ sfd_tran_accept_cb(void *arg)
if ((rv = nni_aio_result(aio)) != 0) {
goto error;
}

conn = nni_aio_get_output(aio, 0);
if ((rv = nni_sp_pipe_alloc((void **) &p, NULL, ep->nlistener)) != 0) {
if (ep->closed) {
nng_stream_free(conn);
rv = NNG_ECLOSED;
goto error;
}
p->conn = conn;
p->ep = ep;

if (ep->closed) {
nni_pipe_close(p->npipe);
nni_pipe_rele(p->npipe);
rv = NNG_ECLOSED;
if ((rv = nni_pipe_alloc_listener((void **) &p, ep->nlistener)) != 0) {
nng_stream_free(conn);
goto error;
}
p->conn = conn;
p->ep = ep;

sfd_tran_pipe_start(p, conn, ep);
nng_stream_listener_accept(ep->listener, &ep->connaio);
nni_mtx_unlock(&ep->mtx);
Expand Down Expand Up @@ -829,15 +828,13 @@ sfd_tran_ep_accept(void *arg, nni_aio *aio)
if ((rv = nni_aio_schedule(aio, sfd_tran_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
}
ep->useraio = aio;
if (!ep->started) {
ep->started = true;
nng_stream_listener_accept(ep->listener, &ep->connaio);
} else {
sfd_tran_ep_match(ep);
}
sfd_tran_ep_match(ep);
nni_mtx_unlock(&ep->mtx);
}

Expand Down

0 comments on commit f0595c8

Please sign in to comment.