Skip to content

Commit

Permalink
fixes #1572 nng creates too many threads
Browse files Browse the repository at this point in the history
This further limits some of the thread counts, but principally it
offers a new runtime facility, nng_init_set_parameter(), which can
be used to set certain runtime parameters on the number of threads,
provided it is called before the rest of application start up.

This facility is quite intentionally "undocumented", at least for now,
as we want to limit our commitment to it.  Still this should be helpful
for applications that need to reduce the number of threads that are
created.
  • Loading branch information
gdamore committed Jan 1, 2024
1 parent 07ad78c commit e2ada26
Show file tree
Hide file tree
Showing 12 changed files with 415 additions and 134 deletions.
22 changes: 18 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2023 Staysail Systems, Inc. <[email protected]>
# Copyright 2024 Staysail Systems, Inc. <[email protected]>
# Copyright (c) 2012 Martin Sustrik All rights reserved.
# Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
# Copyright (c) 2015-2016 Jack R. Dunaway. All rights reserved.
Expand Down Expand Up @@ -114,24 +114,32 @@ endif ()

nng_defines_if(NNG_ENABLE_STATS NNG_ENABLE_STATS)

set(NNG_RESOLV_CONCURRENCY 4 CACHE STRING "Resolver (DNS) concurrency.")
mark_as_advanced(NNG_RESOLV_CONCURRENCY)
if (NNG_RESOLV_CONCURRENCY)
add_definitions(-DNNG_RESOLV_CONCURRENCY=${NNG_RESOLV_CONCURRENCY})
endif ()
mark_as_advanced(NNG_RESOLV_CONCURRENCY)

set(NNG_NUM_TASKQ_THREADS 0 CACHE STRING "Fixed number of task threads, 0 for automatic")
mark_as_advanced(NNG_NUM_TASKQ_THREADS)
if (NNG_NUM_TASKQ_THREADS)
add_definitions(-DNNG_NUM_TASKQ_THREADS=${NNG_NUM_TASKQ_THREADS})
endif ()
mark_as_advanced(NNG_NUM_TASKQ_THREADS)

set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on taskq threads, 0 for no limit")
set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on task threads, 0 for no limit")
mark_as_advanced(NNG_MAX_TASKQ_THREADS)
if (NNG_MAX_TASKQ_THREADS)
add_definitions(-DNNG_MAX_TASKQ_THREADS=${NNG_MAX_TASKQ_THREADS})
endif ()

# Expire threads. This runs the timeout handling, and having more of them
# reduces contention on the common locks used for aio expiration.
set(NNG_NUM_EXPIRE_THREADS 0 CACHE STRING "Fixed number of expire threads, 0 for automatic")
mark_as_advanced(NNG_NUM_EXPIRE_THREADS)
if (NNG_NUM_EXPIRE_THREADS)
add_definitions(-DNNG_NUM_EXPIRE_THREADS=${NNG_NUM_EXPIRE_THREADS})
endif ()

set(NNG_MAX_EXPIRE_THREADS 8 CACHE STRING "Upper bound on expire threads, 0 for no limit")
mark_as_advanced(NNG_MAX_EXPIRE_THREADS)
if (NNG_MAX_EXPIRE_THREADS)
Expand All @@ -140,6 +148,12 @@ endif()

# Poller threads. These threads run the pollers. This is mostly used
# on Windows right now, as the POSIX platforms use a single threaded poller.
set(NNG_NUM_POLLER_THREADS 0 CACHE STRING "Fixed number of I/O poller threads, 0 for automatic")
if (NNG_NUM_POLLER_THREADS)
add_definitions(-DNNG_NUM_POLLER_THREADS=${NNG_NUM_POLLER_THREADS})
endif ()
mark_as_advanced(NNG_NUM_POLLER_THREADS)

set(NNG_MAX_POLLER_THREADS 8 CACHE STRING "Upper bound on I/O poller threads, 0 for no limit")
mark_as_advanced(NNG_MAX_POLLER_THREADS)
if (NNG_MAX_POLLER_THREADS)
Expand Down
234 changes: 172 additions & 62 deletions include/nng/nng.h

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2021 Staysail Systems, Inc. <[email protected]>
# Copyright 2024 Staysail Systems, Inc. <[email protected]>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
Expand Down Expand Up @@ -78,6 +78,7 @@ nng_test(aio_test)
nng_test(buf_size_test)
nng_test(errors_test)
nng_test(id_test)
nng_test(init_test)
nng_test(list_test)
nng_test(message_test)
nng_test(reconnect_test)
Expand Down
28 changes: 19 additions & 9 deletions src/core/aio.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -843,18 +843,28 @@ int
nni_aio_sys_init(void)
{
int num_thr;
int max_thr;

#ifndef NNG_NUM_EXPIRE_THREADS
num_thr = nni_plat_ncpu();
#else
num_thr = NNG_NUM_EXPIRE_THREADS;
#ifndef NNG_MAX_EXPIRE_THREADS
#define NNG_MAX_EXPIRE_THREADS 8
#endif
#if NNG_MAX_EXPIRE_THREADS > 0
if (num_thr > NNG_MAX_EXPIRE_THREADS) {
num_thr = NNG_MAX_EXPIRE_THREADS;
}

#ifndef NNG_NUM_EXPIRE_THREADS
#define NNG_NUM_EXPIRE_THREADS nni_plat_ncpu()
#endif

max_thr = (int) nni_init_get_param(
NNG_INIT_MAX_EXPIRE_THREADS, NNG_MAX_EXPIRE_THREADS);

num_thr = (int) nni_init_get_param(
NNG_INIT_NUM_EXPIRE_THREADS, NNG_NUM_EXPIRE_THREADS);

if ((max_thr > 0) && (num_thr > max_thr)) {
num_thr = max_thr;

Check warning on line 863 in src/core/aio.c

View check run for this annotation

Codecov / codecov/patch

src/core/aio.c#L863

Added line #L863 was not covered by tests
}
if (num_thr < 0) {
num_thr = 1;

Check warning on line 866 in src/core/aio.c

View check run for this annotation

Codecov / codecov/patch

src/core/aio.c#L866

Added line #L866 was not covered by tests
}
nni_aio_expire_q_list =
nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr);
nni_aio_expire_q_cnt = num_thr;
Expand Down
62 changes: 61 additions & 1 deletion src/core/init.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -54,10 +54,69 @@ nni_init(void)
return (nni_plat_init(nni_init_helper));
}

// accessing the list of parameters
typedef struct nni_init_param {
nni_list_node node;
nng_init_parameter param;
uint64_t value;
} nni_init_param;

static nni_list nni_init_params =
NNI_LIST_INITIALIZER(nni_init_params, nni_init_param, node);

void
nni_init_set_param(nng_init_parameter p, uint64_t value)
{
if (nni_inited) {
// this is paranoia -- if some library code started already
// then we cannot safely change parameters, and modifying the list is not
// thread safe.
return;

Check warning on line 74 in src/core/init.c

View check run for this annotation

Codecov / codecov/patch

src/core/init.c#L74

Added line #L74 was not covered by tests
}
nni_init_param *item;
NNI_LIST_FOREACH (&nni_init_params, item) {
if (item->param == p) {
item->value = value;
return;

Check warning on line 80 in src/core/init.c

View check run for this annotation

Codecov / codecov/patch

src/core/init.c#L78-L80

Added lines #L78 - L80 were not covered by tests
}
}
if ((item = NNI_ALLOC_STRUCT(item)) != NULL) {
item->param = p;
item->value = value;
nni_list_append(&nni_init_params, item);
}
}

uint64_t
nni_init_get_param(nng_init_parameter p, uint64_t default_value)
{
nni_init_param *item;
NNI_LIST_FOREACH (&nni_init_params, item) {
if (item->param == p) {
return (item->value);
}
}
return (default_value);
}

static void
nni_init_params_fini(void)
{
nni_init_param *item;
printf("FINI\n");
while ((item = nni_list_first(&nni_init_params)) != NULL) {
printf("DOING a removal of %d", (int)item->param);
nni_list_remove(&nni_init_params, item);
NNI_FREE_STRUCT(item);
}
}

void
nni_fini(void)
{
if (!nni_inited) {
// make sure we discard parameters even if we didn't startup
nni_init_params_fini();
return;
}
nni_sp_tran_sys_fini();
Expand All @@ -67,6 +126,7 @@ nni_fini(void)
nni_taskq_sys_fini();
nni_reap_sys_fini(); // must be before timer and aio (expire)
nni_id_map_sys_fini();
nni_init_params_fini();

nni_plat_fini();
nni_inited = false;
Expand Down
11 changes: 9 additions & 2 deletions src/core/init.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2017 Capitar IT Group BV <[email protected]>
// Copyright 2017 Staysail Systems, Inc. <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
Expand All @@ -23,4 +22,12 @@ int nni_init(void);
// that all resources used by the library are released back to the system.
void nni_fini(void);

// nni_init_param is used by applications (via nng_init_param) to configure
// some tunable settings at runtime. It must be called before any other NNG
// functions are called, in order to have any effect at all.
void nni_init_set_param(nng_init_parameter, uint64_t value);

// subsystems can call this to obtain a parameter value.
uint64_t nni_init_get_param(nng_init_parameter parameter, uint64_t default_value);

#endif // CORE_INIT_H
27 changes: 27 additions & 0 deletions src/core/init_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

#include <nuts.h>
#include "init.h"

void
test_init_param(void)
{
NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 456) == 456);
nng_init_set_parameter(NNG_INIT_PARAMETER_NONE, 123);
NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123);
nng_fini();
NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 567);
}


NUTS_TESTS = {
{ "init parameter", test_init_param },
{ NULL, NULL },
};
31 changes: 21 additions & 10 deletions src/core/taskq.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2022 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -245,20 +245,31 @@ nni_task_fini(nni_task *task)
int
nni_taskq_sys_init(void)
{
int nthrs;
int num_thr;
int max_thr;

#ifndef NNG_NUM_TASKQ_THREADS
nthrs = nni_plat_ncpu() * 2;
#else
nthrs = NNG_NUM_TASKQ_THREADS;
#define NNG_NUM_TASKQ_THREADS (nni_plat_ncpu() * 2)
#endif
#if NNG_MAX_TASKQ_THREADS > 0
if (nthrs > NNG_MAX_TASKQ_THREADS) {
nthrs = NNG_MAX_TASKQ_THREADS;
}

#ifndef NNG_MAX_TASKQ_THREADS
#define NNG_MAX_TASKQ_THREADS 16
#endif

return (nni_taskq_init(&nni_taskq_systq, nthrs));
max_thr = (int) nni_init_get_param(
NNG_INIT_MAX_TASK_THREADS, NNG_MAX_TASKQ_THREADS);

num_thr = (int) nni_init_get_param(
NNG_INIT_NUM_TASK_THREADS, NNG_NUM_TASKQ_THREADS);

if (num_thr > max_thr) {
num_thr = max_thr;

Check warning on line 266 in src/core/taskq.c

View check run for this annotation

Codecov / codecov/patch

src/core/taskq.c#L266

Added line #L266 was not covered by tests
}
if (num_thr < 2) {
num_thr = 2;

Check warning on line 269 in src/core/taskq.c

View check run for this annotation

Codecov / codecov/patch

src/core/taskq.c#L269

Added line #L269 was not covered by tests
}

return (nni_taskq_init(&nni_taskq_systq, num_thr));
}

void
Expand Down
6 changes: 6 additions & 0 deletions src/nng.c
Original file line number Diff line number Diff line change
Expand Up @@ -2011,3 +2011,9 @@ nng_version(void)
return (xstr(NNG_MAJOR_VERSION) "." xstr(NNG_MINOR_VERSION) "." xstr(
NNG_PATCH_VERSION) NNG_RELEASE_SUFFIX);
}

void
nng_init_set_parameter(nng_init_parameter p, uint64_t value)
{
nni_init_set_param(p, value);
}
37 changes: 26 additions & 11 deletions src/platform/posix/posix_resolv_gai.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand All @@ -25,14 +25,10 @@
// for it to ensure that names can be looked up concurrently. This isn't
// as elegant or scalable as a true asynchronous resolver would be, but
// it has the advantage of being fairly portable, and concurrent enough for
// the vast, vast majority of use cases. The total thread count can be
// the vast majority of use cases. The total thread count can be
// changed with this define. Note that some platforms may not have a
// thread-safe getaddrinfo(). In that case they should set this to 1.

#ifndef NNG_RESOLV_CONCURRENCY
#define NNG_RESOLV_CONCURRENCY 4
#endif

#ifndef AI_NUMERICSERV
#define AI_NUMERICSERV 0
#endif
Expand All @@ -41,7 +37,8 @@ static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER;
static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx);
static bool resolv_fini = false;
static nni_list resolv_aios;
static nni_thr resolv_thrs[NNG_RESOLV_CONCURRENCY];
static nni_thr *resolv_thrs;
static int resolv_num_thr;

typedef struct resolv_item resolv_item;
struct resolv_item {
Expand Down Expand Up @@ -450,14 +447,29 @@ nni_posix_resolv_sysinit(void)
resolv_fini = false;
nni_aio_list_init(&resolv_aios);

for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) {
#ifndef NNG_RESOLV_CONCURRENCY
#define NNG_RESOLV_CONCURRENCY 4
#endif

resolv_num_thr = nni_init_get_param(
NNG_INIT_NUM_RESOLVER_THREADS, NNG_RESOLV_CONCURRENCY);
if (resolv_num_thr < 1) {
resolv_num_thr = 1;

Check warning on line 457 in src/platform/posix/posix_resolv_gai.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_resolv_gai.c#L457

Added line #L457 was not covered by tests
}
// no limit on the maximum for now
resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr);
if (resolv_thrs == NULL) {
return (NNG_ENOMEM);

Check warning on line 462 in src/platform/posix/posix_resolv_gai.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_resolv_gai.c#L462

Added line #L462 was not covered by tests
}

for (int i = 0; i < resolv_num_thr; i++) {
int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
if (rv != 0) {
nni_posix_resolv_sysfini();
return (rv);
}
}
for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) {
for (int i = 0; i < resolv_num_thr; i++) {
nni_thr_run(&resolv_thrs[i]);
}

Expand All @@ -472,8 +484,11 @@ nni_posix_resolv_sysfini(void)
nni_cv_wake(&resolv_cv);
nni_mtx_unlock(&resolv_mtx);

for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) {
nni_thr_fini(&resolv_thrs[i]);
if (resolv_thrs != NULL) {
for (int i = 0; i < resolv_num_thr; i++) {
nni_thr_fini(&resolv_thrs[i]);
}
NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr);
}
}

Expand Down
Loading

0 comments on commit e2ada26

Please sign in to comment.