Skip to content

Commit

Permalink
pipe: allocate and destroy pipe transport data with common pipe data
Browse files Browse the repository at this point in the history
This updates the pipe to use contiguous data for the transport data
as well as the pipe protocol data.  It updates sockfd to use this, and
eliminates the need for the sockfd transport to do its own asynchronous
reaping, thereby hopefully closing a shutdown race.

The other transports will shortly get the same treatment.
  • Loading branch information
gdamore committed Dec 2, 2024
1 parent 635be5c commit 2e14f13
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 111 deletions.
62 changes: 51 additions & 11 deletions src/core/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

#include <nng/nng.h>

#include "core/defs.h"
#include "core/nng_impl.h"

#include "core/pipe.h"
#include "dialer.h"
#include "listener.h"
#include "sockimpl.h"
Expand Down Expand Up @@ -233,28 +235,42 @@ pipe_stats_init(nni_pipe *p)
static int
pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
{
nni_pipe *p;
int rv;
void *sock_data = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
size_t sz;
nni_pipe *p;
int rv;
void *sock_data = nni_sock_proto_data(sock);
const nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
const nni_sp_pipe_ops *tops = tran->tran_pipe;
size_t sz;

sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size;
sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) +
NNI_ALIGN_UP(tops->p_size);

if ((p = nni_zalloc(sz)) == NULL) {
// In this case we just toss the pipe...
tran->tran_pipe->p_fini(tran_data);
// TODO: remove when all transports converted
// to use p_size.
if (tran_data != NULL) {
tops->p_fini(tran_data);
}

Check warning on line 254 in src/core/pipe.c

View check run for this annotation

Codecov / codecov/patch

src/core/pipe.c#L253-L254

Added lines #L253 - L254 were not covered by tests
return (NNG_ENOMEM);
}

uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p));

if (tran_data == NULL) {
tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size);
}

p->p_size = sz;
p->p_proto_data = p + 1;
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tran_data;
p->p_proto_data = proto_data;
p->p_proto_ops = *pops;
p->p_tran_ops = *tops;
p->p_tran_data = tran_data;
p->p_sock = sock;
p->p_cbs = false;

// Two references - one for our caller, and
// one to be dropped when the pipe is closed.
nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy);

nni_atomic_init_bool(&p->p_closed);
Expand All @@ -272,7 +288,7 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)

nni_sock_hold(sock);

if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) ||
if ((rv != 0) || ((rv = tops->p_init(tran_data, p)) != 0) ||
((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) {
nni_pipe_close(p);
nni_pipe_rele(p);
Expand Down Expand Up @@ -308,6 +324,30 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data)
return (0);
}

void *
nni_pipe_tran_data(nni_pipe *p)
{
return (p->p_tran_data);
}

int
nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l)
{
int rv;
nni_pipe *p;
if (d != NULL) {
rv = nni_pipe_create_dialer(&p, d, NULL);

Check warning on line 339 in src/core/pipe.c

View check run for this annotation

Codecov / codecov/patch

src/core/pipe.c#L339

Added line #L339 was not covered by tests
} else if (l != NULL) {
rv = nni_pipe_create_listener(&p, l, NULL);
} else {
rv = NNG_EINVAL;

Check warning on line 343 in src/core/pipe.c

View check run for this annotation

Codecov / codecov/patch

src/core/pipe.c#L343

Added line #L343 was not covered by tests
}
if (rv == 0) {
*datap = nni_pipe_tran_data(p);
}
return (rv);
}

int
nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data)
{
Expand Down
4 changes: 3 additions & 1 deletion src/core/pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
// OUTSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
// TRANSPORTS.

#include "nng/nng.h"

#include "core/defs.h"
#include "core/thread.h"
#include "nng/nng.h"
#include "sp/transport.h"

// AIO
Expand Down Expand Up @@ -62,5 +63,6 @@ extern void nni_pipe_bump_tx(nni_pipe *, size_t);
extern void nni_pipe_bump_error(nni_pipe *, int);

extern char *nni_pipe_peer_addr(nni_pipe *p, char buf[NNG_MAXADDRSTRLEN]);
extern void *nni_pipe_tran_data(nni_pipe *);

#endif // CORE_PIPE_H
10 changes: 7 additions & 3 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1272,9 +1272,13 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_pipe *p;

nni_mtx_lock(&s->s_mx);
if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
nni_mtx_unlock(&s->s_mx);
return;
if (l->l_tran->tran_pipe->p_size != 0) {
p = tpipe;
} else {
if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
nni_mtx_unlock(&s->s_mx);
return;

Check warning on line 1280 in src/core/socket.c

View check run for this annotation

Codecov / codecov/patch

src/core/socket.c#L1279-L1280

Added lines #L1279 - L1280 were not covered by tests
}
}

nni_list_append(&l->l_pipes, p);
Expand Down
17 changes: 13 additions & 4 deletions src/sp/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#ifndef PROTOCOL_SP_TRANSPORT_H
#define PROTOCOL_SP_TRANSPORT_H

#include "core/defs.h"
#include "core/list.h"
#include "core/options.h"

// Endpoint operations are called by the socket in a
Expand Down Expand Up @@ -118,9 +120,14 @@ struct nni_sp_listener_ops {
// pointers back to socket or even enclosing pipe state, are not
// provided.)
struct nni_sp_pipe_ops {
// p_init initializes the pipe data structures. The main
// purpose of this is so that the pipe will see the upper
// layer nni_pipe and get a chance to register stats and such.
// p_size specifies the size of the transport private pipe data.
size_t p_size;

// p_init initializes the transport's pipe data structure.
// The pipe MUST be left in a state that p_fini can be safely
// called on it, even if it does not succeed. (The upper layers
// will call p_fini as part of the cleanup of a failure.)
// This function should not acquire any locks.
int (*p_init)(void *, nni_pipe *);

// p_fini destroys the pipe. This should clean up all local
Expand All @@ -134,7 +141,7 @@ struct nni_sp_pipe_ops {
// resources with p_fini.
void (*p_stop)(void *);

// p_aio_send queues the message for transmit. If this fails,
// p_send queues the message for transmit. If this fails,
// then the caller may try again with the same message (or free
// it). If the call succeeds, then the transport has taken
// ownership of the message, and the caller may not use it
Expand Down Expand Up @@ -194,4 +201,6 @@ extern void nni_sp_tran_sys_init(void);
extern void nni_sp_tran_sys_fini(void);
extern void nni_sp_tran_register(nni_sp_tran *);

extern int nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l);

#endif // PROTOCOL_SP_TRANSPORT_H
Loading

0 comments on commit 2e14f13

Please sign in to comment.