Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move clientCron onto a separate timer #1387

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ void loadServerConfigFromString(char *config) {
{"list-max-ziplist-value", 2, 2},
{"lua-replicate-commands", 2, 2},
{"io-threads-do-reads", 2, 2},
{"dynamic-hz", 2, 2},
{NULL, 0},
};
char buf[1024];
Expand Down Expand Up @@ -626,8 +627,8 @@ void loadServerConfigFromString(char *config) {
}

/* To ensure backward compatibility and work while hz is out of range */
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;
if (server.hz < CONFIG_MIN_HZ) server.hz = CONFIG_MIN_HZ;
if (server.hz > CONFIG_MAX_HZ) server.hz = CONFIG_MAX_HZ;

sdsfreesplitres(lines, totlines);
reading_config_file = 0;
Expand Down Expand Up @@ -2472,9 +2473,8 @@ static int updateHZ(const char **err) {
UNUSED(err);
/* Hz is more a hint from the user, so we accept values out of range
* but cap them to reasonable values. */
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;
server.hz = server.config_hz;
if (server.hz < CONFIG_MIN_HZ) server.hz = CONFIG_MIN_HZ;
if (server.hz > CONFIG_MAX_HZ) server.hz = CONFIG_MAX_HZ;
return 1;
}

Expand Down Expand Up @@ -3168,7 +3168,6 @@ standardConfig static_configs[] = {
createBoolConfig("activerehashing", NULL, MODIFIABLE_CONFIG, server.activerehashing, 1, NULL, NULL),
createBoolConfig("stop-writes-on-bgsave-error", NULL, MODIFIABLE_CONFIG, server.stop_writes_on_bgsave_err, 1, NULL, NULL),
createBoolConfig("set-proc-title", NULL, IMMUTABLE_CONFIG, server.set_proc_title, 1, NULL, NULL), /* Should setproctitle be used? */
createBoolConfig("dynamic-hz", NULL, MODIFIABLE_CONFIG, server.dynamic_hz, 1, NULL, NULL), /* Adapt hz to # of clients.*/
createBoolConfig("lazyfree-lazy-eviction", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_eviction, 1, NULL, NULL),
createBoolConfig("lazyfree-lazy-expire", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_expire, 1, NULL, NULL),
createBoolConfig("lazyfree-lazy-server-del", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_server_del, 1, NULL, NULL),
Expand Down Expand Up @@ -3302,7 +3301,7 @@ standardConfig static_configs[] = {
createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, INT_MIN, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, INT_MIN, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_replicas_to_write, 0, INTEGER_CONFIG, NULL, updateGoodReplicas),
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_replicas_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodReplicas),
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
Expand Down
108 changes: 57 additions & 51 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) {
size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};

int clientsCronTrackExpansiveClients(client *c, int time_idx) {
int clientsCronTrackExpensiveClients(client *c, int time_idx) {
size_t qb_size = c->querybuf ? sdsAllocSize(c->querybuf) : 0;
size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0;
size_t in_usage = qb_size + c->argv_len_sum + argv_size;
Expand Down Expand Up @@ -1035,8 +1035,8 @@ int updateClientMemUsageAndBucket(client *c) {
}

/* Return the max samples in the memory usage of clients tracked by
* the function clientsCronTrackExpansiveClients(). */
void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
* the function clientsCronTrackExpensiveClients(). */
void getExpensiveClientsInfo(size_t *in_usage, size_t *out_usage) {
size_t i = 0, o = 0;
for (int j = 0; j < CLIENTS_PEAK_MEM_USAGE_SLOTS; j++) {
if (ClientsPeakMemInput[j] > i) i = ClientsPeakMemInput[j];
Expand All @@ -1046,38 +1046,23 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
*out_usage = o;
}

/* This function is called by serverCron() and is used in order to perform
/* This function is called by clientsTimerProc() and is used in order to perform
* operations on clients that are important to perform constantly. For instance
* we use this function in order to disconnect clients after a timeout, including
* clients blocked in some blocking command with a non-zero timeout.
*
* The function makes some effort to process all the clients every second, even
* if this cannot be strictly guaranteed, since serverCron() may be called with
* an actual frequency lower than server.hz in case of latency events like slow
* if this cannot be strictly guaranteed, since clientsTimerProc() may be called with
* an actual frequency lower than the intended rate in case of latency events like slow
* commands.
*
* It is very important for this function, and the functions it calls, to be
* very fast: sometimes the server has tens of hundreds of connected clients, and the
* default server.hz value is 10, so sometimes here we need to process thousands
* of clients per second, turning this function into a source of latency.
* very fast. Sometimes the server has tens of thousands of connected clients, and all
* of them need to be processed every second.
*/
#define CLIENTS_CRON_MIN_ITERATIONS 5
void clientsCron(void) {
/* Try to process at least numclients/server.hz of clients
* per call. Since normally (if there are no big latency events) this
* function is called server.hz times per second, in the average case we
* process all the clients in 1 second. */
int numclients = listLength(server.clients);
int iterations = numclients / server.hz;
static void clientsCron(int clients_this_cycle) {
mstime_t now = mstime();

/* Process at least a few clients while we are at it, even if we need
* to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
* of processing each client once per second. */
if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ? numclients : CLIENTS_CRON_MIN_ITERATIONS;


int curr_peak_mem_usage_slot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
/* Always zero the next sample, so that when we switch to that second, we'll
* only register samples that are greater in that second without considering
Expand All @@ -1088,14 +1073,13 @@ void clientsCron(void) {
* some slow command is called taking multiple seconds to execute. In that
* case our array may end containing data which is potentially older
* than CLIENTS_PEAK_MEM_USAGE_SLOTS seconds: however this is not a problem
* since here we want just to track if "recently" there were very expansive
* since here we want just to track if "recently" there were very expensive
* clients from the POV of memory usage. */
int zeroidx = (curr_peak_mem_usage_slot + 1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
ClientsPeakMemInput[zeroidx] = 0;
ClientsPeakMemOutput[zeroidx] = 0;


while (listLength(server.clients) && iterations--) {
while (listLength(server.clients) && clients_this_cycle--) {
client *c;
listNode *head;

Expand All @@ -1105,14 +1089,14 @@ void clientsCron(void) {
c = listNodeValue(head);
listRotateHeadToTail(server.clients);
if (c->io_read_state != CLIENT_IDLE || c->io_write_state != CLIENT_IDLE) continue;

/* The following functions do different service checks on the client.
* The protocol is that they return non-zero if the client was
* terminated. */
if (clientsCronHandleTimeout(c, now)) continue;
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronResizeOutputBuffer(c, now)) continue;

if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;
if (clientsCronTrackExpensiveClients(c, curr_peak_mem_usage_slot)) continue;

/* Iterating all the clients in getMemoryOverheadData() is too slow and
* in turn would make the INFO command too slow. So we perform this
Expand All @@ -1126,6 +1110,40 @@ void clientsCron(void) {
}
}

/* A periodic timer that performs client maintenance.
* This cron task obeys the following rules:
* - All clients need to be checked (at least) once per second
* - To manage latency, we don't check more than MAX_CLIENTS_PER_CLOCK_TICK at a time
* - The minimum rate will be defined by server.hz
* - At least CLIENTS_CRON_MIN_ITERATIONS clients (or all of them, if fewer are
*   connected) will be processed each cycle
*/
#define CLIENTS_CRON_MIN_ITERATIONS 5
long long clientsTimerProc(struct aeEventLoop *eventLoop, long long id, void *clientData) {
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);

int numclients = listLength(server.clients);
int clients_this_cycle = numclients / server.hz; // Initial computation based on standard hz
int delayMs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: delay_ms or delayms.


if (clients_this_cycle < CLIENTS_CRON_MIN_ITERATIONS) {
clients_this_cycle = min(numclients, CLIENTS_CRON_MIN_ITERATIONS);
}

if (clients_this_cycle > MAX_CLIENTS_PER_CLOCK_TICK) {
clients_this_cycle = MAX_CLIENTS_PER_CLOCK_TICK;
float required_hz = (float)numclients / MAX_CLIENTS_PER_CLOCK_TICK;
delayMs = 1000.0 / required_hz;
} else {
delayMs = 1000 / server.hz;
}

clientsCron(clients_this_cycle);

return delayMs;
}

/* This function handles 'background' operations we are required to do
* incrementally in the databases, such as active key expiring, resizing,
* rehashing. */
Expand Down Expand Up @@ -1346,19 +1364,6 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa
* handler if we don't return here fast enough. */
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

server.hz = server.config_hz;
/* Adapt the server.hz value to the number of configured clients. If we have
* many clients, we want to call serverCron() with a higher frequency. */
if (server.dynamic_hz) {
while (listLength(server.clients) / server.hz > MAX_CLIENTS_PER_CLOCK_TICK) {
server.hz *= 2;
if (server.hz > CONFIG_MAX_HZ) {
server.hz = CONFIG_MAX_HZ;
break;
}
}
}

/* for debug purposes: skip actual cron work if pause_cron is on */
if (server.pause_cron) return 1000 / server.hz;

Expand Down Expand Up @@ -1444,9 +1449,6 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa
}
}

/* We need to do a few operations on clients asynchronously. */
clientsCron();

/* Handle background operations on databases. */
databasesCron();

Expand Down Expand Up @@ -2629,7 +2631,6 @@ void initServer(void) {
/* Initialization after setting defaults from the config system. */
server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
server.fsynced_reploff = server.aof_enabled ? 0 : -1;
server.hz = server.config_hz;
server.in_fork_child = CHILD_TYPE_NONE;
server.rdb_pipe_read = -1;
server.rdb_child_exit_pipe = -1;
Expand Down Expand Up @@ -2775,10 +2776,15 @@ void initServer(void) {
server.acl_info.invalid_channel_accesses = 0;

/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
* operations incrementally, like eviction of unaccessed expired keys, etc. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
serverPanic("Can't create serverCron timer.");
exit(1);
}
/* A separate timer for client maintenance. Runs at a variable speed depending
* on the client count. */
if (aeCreateTimeEvent(server.el, 1, clientsTimerProc, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event clientsTimerProc timer.");
exit(1);
}

Expand Down Expand Up @@ -5549,7 +5555,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"uptime_in_seconds:%I\r\n", (int64_t)uptime,
"uptime_in_days:%I\r\n", (int64_t)(uptime / (3600 * 24)),
"hz:%i\r\n", server.hz,
"configured_hz:%i\r\n", server.config_hz,
"configured_hz:%i\r\n", server.hz,
"lru_clock:%u\r\n", server.lruclock,
"executable:%s\r\n", server.executable ? server.executable : "",
"config_file:%s\r\n", server.configfile ? server.configfile : "",
Expand All @@ -5570,7 +5576,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
if (all_sections || (dictFind(section_dict, "clients") != NULL)) {
size_t maxin, maxout;
unsigned long blocking_keys, blocking_keys_on_nokey, watched_keys;
getExpansiveClientsInfo(&maxin, &maxout);
getExpensiveClientsInfo(&maxin, &maxout);
totalNumberOfStatefulKeys(&blocking_keys, &blocking_keys_on_nokey, &watched_keys);
if (sections++) info = sdscat(info, "\r\n");
info = sdscatprintf(
Expand Down
4 changes: 0 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1681,10 +1681,6 @@ struct valkeyServer {
char *configfile; /* Absolute config file path, or NULL */
char *executable; /* Absolute executable file path. */
char **exec_argv; /* Executable argv vector (copy). */
int dynamic_hz; /* Change hz value depending on # of clients. */
int config_hz; /* Configured HZ value. May be different than
the actual 'hz' field value if dynamic-hz
is enabled. */
mode_t umask; /* The umask value of the process on startup */
int hz; /* serverCron() calls frequency in hertz */
int in_fork_child; /* indication that this is a fork child */
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -730,8 +730,6 @@ test {diskless loading short read} {
$replica config set repl-diskless-load swapdb
$master config set hz 500
$replica config set hz 500
$master config set dynamic-hz no
$replica config set dynamic-hz no
# Try to fill the master with all types of data types / encodings
set start [clock clicks -milliseconds]

Expand Down
2 changes: 0 additions & 2 deletions tests/unit/moduleapi/hooks.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ tags "modules" {
}
# set some configs that will cause many loading progress events during aof loading
r config set key-load-delay 500
r config set dynamic-hz no
r config set hz 500
r DEBUG LOADAOF
assert_equal [r hooks.event_last loading-aof-start] 0
Expand All @@ -73,7 +72,6 @@ tags "modules" {
}
}
# undo configs before next test
r config set dynamic-hz yes
r config set key-load-delay 0

test {Test module rdb save hook} {
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/moduleapi/testrdb.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ tags "modules" {
$replica config set repl-diskless-load swapdb
$master config set hz 500
$replica config set hz 500
$master config set dynamic-hz no
$replica config set dynamic-hz no
set start [clock clicks -milliseconds]
for {set k 0} {$k < 30} {incr k} {
r testrdb.set.key key$k [string repeat A [expr {int(rand()*1000000)}]]
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/other.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ start_server {tags {"other external:skip"}} {
}

start_cluster 1 0 {tags {"other external:skip cluster slow"}} {
r config set dynamic-hz no hz 500
r config set hz 500
test "Server can trigger resizing" {
r flushall
# hashslot(foo) is 12182
Expand Down
16 changes: 0 additions & 16 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2270,22 +2270,6 @@ client-output-buffer-limit pubsub 32mb 8mb 60
# 100 only in environments where very low latency is required.
hz 10

# Normally it is useful to have an HZ value which is proportional to the
# number of clients connected. This is useful, for instance, to avoid
# processing too many clients in each background task invocation, which
# would otherwise cause latency spikes.
#
# Since the default HZ value is conservatively set to 10, the server
# offers, and enables by default, the ability to use an adaptive HZ value
# which will temporarily rise when there are many connected clients.
#
# When dynamic HZ is enabled, the actual configured HZ will be used
# as a baseline, but multiples of the configured HZ value will be actually
# used as needed once more clients are connected. In this way an idle
# instance will use very little CPU time while a busy instance will be
# more responsive.
dynamic-hz yes

# When a child rewrites the AOF file, if the following option is enabled
# the file will be fsync-ed every 4 MB of data generated. This is useful
# in order to commit the file to the disk more incrementally and avoid
Expand Down
Loading