From 81667efed3760fc91080ceacb0c47c0806caa131 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 18 Jul 2024 11:32:39 +0000 Subject: [PATCH 01/32] Deflake test "Psync established after RDB load - beyond grace period" *** [err]: Psync established after RDB load - beyond grace period in tests/integration/dual-channel-replication.tcl log message of '"*Replica main channel failed to establish PSYNC within the grace period*"' not found in ./tests/tmp/server.7063.182/stdout after line: 0 till line: 196 Signed-off-by: naglera --- tests/helpers/bg_server_sleep.tcl | 2 +- tests/integration/dual-channel-replication.tcl | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/helpers/bg_server_sleep.tcl b/tests/helpers/bg_server_sleep.tcl index 7e46fe9c90..1a1feb0be2 100644 --- a/tests/helpers/bg_server_sleep.tcl +++ b/tests/helpers/bg_server_sleep.tcl @@ -4,7 +4,7 @@ source tests/support/util.tcl proc bg_server_sleep {host port sec} { set r [valkey $host $port 0] $r client setname SLEEP_HANDLER - $r debug sleep $sec + catch {$r debug sleep $sec} } bg_server_sleep [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index 7344d0d3b1..98036819ea 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -631,9 +631,9 @@ start_server {tags {"dual-channel-replication external:skip"}} { set loglines [lindex $res 1] incr $loglines # At this point rdb is loaded but psync hasn't been established yet. - # Force the replica to sleep for 8 seconds so the primary main process will wake up, while the replica is unresponsive. + # Force the replica to sleep so the primary main process will wake up, while the replica is unresponsive. # We expect the grace time to be over before the replica wake up, so sync will fail. - set sleep_handle [start_bg_server_sleep $replica_host $replica_port 8] + set sleep_handle [start_bg_server_sleep $replica_host $replica_port 1000] wait_for_condition 50 100 { [string match {*replicas_waiting_psync:1*} [$primary info replication]] } else { @@ -642,15 +642,12 @@ start_server {tags {"dual-channel-replication external:skip"}} { # Sync should fail once the replica ask for PSYNC using main channel set res [wait_for_log_messages -1 {"*Replica main channel failed to establish PSYNC within the grace period*"} 0 4000 1] - - # Should succeed on retry - verify_replica_online $primary 0 500 wait_for_condition 50 100 { [string match {*replicas_waiting_psync:0*} [$primary info replication]] } else { - fail "Primary did not free repl buf block after psync establishment" + fail "Primary did not free waiting psync replica after grace period" } - $replica replicaof no one + stop_bg_server_sleep $sleep_handle } stop_write_load $load_handle0 } From 2ce775611acf11e2acab03c04536e3f6b53db65a Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 18 Jul 2024 12:26:38 +0000 Subject: [PATCH 02/32] Deflake Dual channel replication test batch Deflaked the following tests: 1. dual-channel-replication with multiple replicas 2. Test diverse replica sync: dual-channel on/off 3. Test replica's buffer limit reached 4. dual-channel-replication fails when primary diskless disabled First check that replica is online and then wait for value to propegate. test failed in https://github.com/valkey-io/valkey/actions/runs/9986538309/job/27599242506 Signed-off-by: naglera --- .../integration/dual-channel-replication.tcl | 35 ++++++------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index 98036819ea..835b48fd37 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -284,23 +284,13 @@ start_server {tags {"dual-channel-replication external:skip"}} { test "dual-channel-replication with multiple replicas" { $replica1 replicaof $primary_host $primary_port $replica2 replicaof $primary_host $primary_port + verify_replica_online $primary 0 500 + verify_replica_online $primary 1 500 wait_for_value_to_propegate_to_replica $primary $replica1 "key1" wait_for_value_to_propegate_to_replica $primary $replica2 "key1" - wait_for_condition 100 100 { - [s 0 total_forks] eq "1" - } else { - fail "Primary <-> Replica didn't start the full sync" - } - - verify_replica_online $primary 0 500 - verify_replica_online $primary 1 500 - wait_for_condition 50 1000 { - [status $replica1 master_link_status] == "up" - } else { - fail "Replica is not synced" - } + assert {[s 0 total_forks] eq "1" } } $replica1 replicaof no one @@ -314,12 +304,10 @@ start_server {tags {"dual-channel-replication external:skip"}} { test "Test diverse replica sync: dual-channel on/off" { $replica1 replicaof $primary_host $primary_port $replica2 replicaof $primary_host $primary_port - - wait_for_value_to_propegate_to_replica $primary $replica1 "key2" - wait_for_value_to_propegate_to_replica $primary $replica2 "key2" - verify_replica_online $primary 0 500 verify_replica_online $primary 1 500 + wait_for_value_to_propegate_to_replica $primary $replica1 "key2" + wait_for_value_to_propegate_to_replica $primary $replica2 "key2" wait_for_condition 50 1000 { [status $replica1 master_link_status] == "up" } else { @@ -328,7 +316,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } $replica1 replicaof no one - $primary set key4 val4 + $primary set key3 val3 test "Test replica's buffer limit reached" { $primary config set repl-diskless-sync-delay 0 @@ -358,18 +346,18 @@ start_server {tags {"dual-channel-replication external:skip"}} { assert {[s -2 replicas_replication_buffer_size] <= 16385*2} # Wait for sync to succeed - wait_for_value_to_propegate_to_replica $primary $replica1 "key4" wait_for_condition 50 1000 { [status $replica1 master_link_status] == "up" } else { fail "Replica is not synced" } + wait_for_value_to_propegate_to_replica $primary $replica1 "key3" } $replica1 replicaof no one $replica1 config set client-output-buffer-limit "replica 256mb 256mb 0"; # remove repl buffer limitation - $primary set key5 val5 + $primary set key4 val4 test "dual-channel-replication fails when primary diskless disabled" { set cur_psync [status $primary sync_partial_ok] @@ -379,13 +367,12 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica1 replicaof $primary_host $primary_port # Wait for mitigation and resync - wait_for_value_to_propegate_to_replica $primary $replica1 "key5" - wait_for_condition 50 1000 { [status $replica1 master_link_status] == "up" } else { fail "Replica is not synced" } + wait_for_value_to_propegate_to_replica $primary $replica1 "key4" # Verify that we did not use dual-channel-replication sync assert {[status $primary sync_partial_ok] == $cur_psync} @@ -1107,7 +1094,5 @@ start_server {tags {"dual-channel-replication external:skip"}} { fail "Replica offset didn't catch up with the primary after too long time" } } - - } -} \ No newline at end of file +} From 309a83e4d44bdf17873ccd8de7e0c087897efb2a Mon Sep 17 00:00:00 2001 From: naglera Date: Sun, 21 Jul 2024 15:07:07 +0000 Subject: [PATCH 03/32] Use pause process instead of brute force sleep Signed-off-by: naglera --- src/debug.c | 18 +-- src/replication.c | 4 +- src/server.h | 5 +- tests/helpers/bg_server_sleep.tcl | 10 -- .../integration/dual-channel-replication.tcl | 115 ++++++++++-------- 5 files changed, 82 insertions(+), 70 deletions(-) delete mode 100644 tests/helpers/bg_server_sleep.tcl diff --git a/src/debug.c b/src/debug.c index 2ea0c1a3e8..f752bad26b 100644 --- a/src/debug.c +++ b/src/debug.c @@ -495,8 +495,8 @@ void debugCommand(client *c) { " In case RESET is provided the peak reset time will be restored to the default value", "REPLYBUFFER RESIZING <0|1>", " Enable or disable the reply buffer resize cron job", - "SLEEP-AFTER-FORK-SECONDS ", - " Stop the server's main process for after forking.", + "PAUSE-AFTER-FORK <0|1>", + " Stop the server's main process after fork.", "DELAY-RDB-CLIENT-FREE-SECOND ", " Grace period in seconds for replica main channel to establish psync.", "DICT-RESIZING <0|1>", @@ -995,12 +995,8 @@ void debugCommand(client *c) { return; } addReply(c, shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr, "sleep-after-fork-seconds") && c->argc == 3) { - double sleep_after_fork_seconds; - if (getDoubleFromObjectOrReply(c, c->argv[2], &sleep_after_fork_seconds, NULL) != C_OK) { - return; - } - server.debug_sleep_after_fork_us = (int)(sleep_after_fork_seconds * 1e6); + } else if (!strcasecmp(c->argv[1]->ptr, "pause-after-fork") && c->argc == 3) { + server.debug_pause_after_fork = atoi(c->argv[2]->ptr); addReply(c, shared.ok); } else if (!strcasecmp(c->argv[1]->ptr, "delay-rdb-client-free-seconds") && c->argc == 3) { server.wait_before_rdb_client_free = atoi(c->argv[2]->ptr); @@ -2305,6 +2301,12 @@ void applyWatchdogPeriod(void) { } } +void debugPauseProcess(void) { + serverLog(LL_DEBUG, "Process is about to stop."); + raise(SIGSTOP); + serverLog(LL_DEBUG, "Process has been continued."); +} + /* Positive input is sleep time in microseconds. Negative input is fractions * of microseconds, i.e. -10 means 100 nanoseconds. */ void debugDelay(int usec) { diff --git a/src/replication.c b/src/replication.c index b00da525bf..b3220e8a44 100644 --- a/src/replication.c +++ b/src/replication.c @@ -961,7 +961,7 @@ int startBgsaveForReplication(int mincapa, int req) { /* Keep the page cache since it'll get used soon */ retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE); } - if (server.debug_sleep_after_fork_us) usleep(server.debug_sleep_after_fork_us); + if (server.debug_pause_after_fork) debugPauseProcess(); } else { serverLog(LL_WARNING, "BGSAVE for replication: replication information not available, can't generate the RDB " "file right now. Try later."); @@ -3181,7 +3181,7 @@ void setupMainConnForPsync(connection *conn) { } if (server.repl_state == REPL_STATE_SEND_PSYNC) { - if (server.debug_sleep_after_fork_us) usleep(server.debug_sleep_after_fork_us); + if (server.debug_pause_after_fork) debugPauseProcess(); if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { serverLog(LL_WARNING, "Aborting dual channel sync. Write error."); cancelReplicationHandshake(1); diff --git a/src/server.h b/src/server.h index 50ab8e6f30..e767deaa8b 100644 --- a/src/server.h +++ b/src/server.h @@ -2005,8 +2005,8 @@ struct valkeyServer { * use dual channel replication for full syncs. */ int wait_before_rdb_client_free; /* Grace period in seconds for replica main channel * to establish psync. */ - int debug_sleep_after_fork_us; /* Debug param that force the main process to - * sleep for N microseconds after fork() in repl. */ + int debug_pause_after_fork; /* Debug param that pause the main process + * after a replication fork() (for bgsave). */ size_t repl_buffer_mem; /* The memory of replication buffer. */ list *repl_buffer_blocks; /* Replication buffers blocks list * (serving replica clients and repl backlog) */ @@ -3981,6 +3981,7 @@ void killThreads(void); void makeThreadKillable(void); void swapMainDbWithTempDb(serverDb *tempDb); sds getVersion(void); +void debugPauseProcess(void); /* Use macro for checking log level to avoid evaluating arguments in cases log * should be ignored due to low level. */ diff --git a/tests/helpers/bg_server_sleep.tcl b/tests/helpers/bg_server_sleep.tcl deleted file mode 100644 index 1a1feb0be2..0000000000 --- a/tests/helpers/bg_server_sleep.tcl +++ /dev/null @@ -1,10 +0,0 @@ -source tests/support/valkey.tcl -source tests/support/util.tcl - -proc bg_server_sleep {host port sec} { - set r [valkey $host $port 0] - $r client setname SLEEP_HANDLER - catch {$r debug sleep $sec} -} - -bg_server_sleep [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index 835b48fd37..e076020ad1 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -5,15 +5,6 @@ proc log_file_matches {log pattern} { string match $pattern $content } -proc start_bg_server_sleep {host port sec} { - set tclsh [info nameofexecutable] - exec $tclsh tests/helpers/bg_server_sleep.tcl $host $port $sec & -} - -proc stop_bg_server_sleep {handle} { - catch {exec /bin/kill -9 $handle} -} - proc get_client_id_by_last_cmd {r cmd} { set client_list [$r client list] set client_id "" @@ -390,12 +381,12 @@ start_server {tags {"dual-channel-replication external:skip"}} { set primary [srv 0 client] set primary_host [srv 0 host] set primary_port [srv 0 port] - set loglines [count_log_lines -1] + set primary_pid [srv 0 pid] # Create small enough db to be loaded before replica establish psync connection $primary set key1 val1 $primary config set repl-diskless-sync yes - $primary debug sleep-after-fork-seconds 5;# Stop primary after fork for 5 seconds + $primary debug pause-after-fork 1 $primary config set dual-channel-replication-enabled yes $replica config set dual-channel-replication-enabled yes @@ -403,6 +394,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { test "Test dual-channel-replication sync- psync established after rdb load" { $replica replicaof $primary_host $primary_port + wait_for_log_messages -1 {"*Done loading RDB*"} 0 2000 1 + resume_process $primary_pid verify_replica_online $primary 0 500 wait_for_condition 50 1000 { @@ -412,9 +405,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } wait_for_value_to_propegate_to_replica $primary $replica "key1" # Confirm the occurrence of a race condition. - set res [wait_for_log_messages -1 {"*Dual channel sync - psync established after rdb load*"} $loglines 2000 1] - set loglines [lindex $res 1] - incr $loglines + wait_for_log_messages -1 {"*Dual channel sync - psync established after rdb load*"} 0 2000 1 } } } @@ -424,6 +415,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] + set replica_pid [srv 0 pid] start_server {} { set primary [srv 0 client] set primary_host [srv 0 host] @@ -446,8 +438,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica config set dual-channel-replication-enabled yes $replica config set loglevel debug $replica config set repl-timeout 10 - # Stop replica after primary fork for 5 seconds - $replica debug sleep-after-fork-seconds 5 + # Pause replica after primary fork + $replica debug pause-after-fork 1 test "dual-channel-replication: Primary COB growth with inactive replica" { $replica replicaof $primary_host $primary_port @@ -457,6 +449,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } else { fail "Primary should allow backlog to grow beyond its limits during dual-channel-replication sync handshake" } + resume_process $replica_pid verify_replica_online $primary 0 500 wait_for_condition 50 1000 { @@ -478,11 +471,13 @@ start_server {tags {"dual-channel-replication external:skip"}} { set replica1_host [srv 0 host] set replica1_port [srv 0 port] set replica1_log [srv 0 stdout] + set replica1_pid [srv 0 pid] start_server {} { set replica2 [srv 0 client] set replica2_host [srv 0 host] set replica2_port [srv 0 port] set replica2_log [srv 0 stdout] + set replica2_pid [srv 0 pid] start_server {} { set primary [srv 0 client] set primary_host [srv 0 host] @@ -507,15 +502,16 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica1 config set repl-timeout 10 $replica2 config set repl-timeout 10 - # Stop replica after primary fork for 2 seconds - $replica1 debug sleep-after-fork-seconds 2 - $replica2 debug sleep-after-fork-seconds 2 + # Pause replicas after primary forks for + $replica1 debug pause-after-fork 1 + $replica2 debug pause-after-fork 1 test "Test dual-channel: primary tracking replica backlog refcount - start with empty backlog" { $replica1 replicaof $primary_host $primary_port set res [wait_for_log_messages 0 {"*Add rdb replica * no repl-backlog to track*"} $loglines 2000 1] set res [wait_for_log_messages 0 {"*Attach replica rdb client*"} $loglines 2000 1] set loglines [lindex $res 1] - incr $loglines + incr $loglines + resume_process $replica1_pid verify_replica_online $primary 0 700 wait_for_condition 50 1000 { [status $replica1 master_link_status] == "up" @@ -530,7 +526,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica2 replicaof $primary_host $primary_port set res [wait_for_log_messages 0 {"*Add rdb replica * tracking repl-backlog tail*"} $loglines 2000 1] set loglines [lindex $res 1] - incr $loglines + incr $loglines + resume_process $replica2_pid verify_replica_online $primary 0 700 wait_for_condition 50 1000 { [status $replica2 master_link_status] == "up" @@ -549,6 +546,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set primary [srv 0 client] set primary_host [srv 0 host] set primary_port [srv 0 port] + set primary_pid [srv 0 pid] $primary config set repl-diskless-sync yes $primary config set dual-channel-replication-enabled yes @@ -557,36 +555,43 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary config set repl-timeout 10 # generate small db populate 10 primary 10 - # Stop primary main process after fork for 2 seconds - $primary debug sleep-after-fork-seconds 2 - $primary debug delay-rdb-client-free-seconds 5 + # Pause primary main process after fork + $primary debug pause-after-fork 1 + # Give replica two second grace period before disconnection + $primary debug delay-rdb-client-free-seconds 2 start_server {} { set replica [srv 0 client] set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] + set replica_pid [srv 0 pid] set loglines [count_log_lines 0] set load_handle0 [start_write_load $primary_host $primary_port 20] $replica config set dual-channel-replication-enabled yes $replica config set loglevel debug - $replica config set repl-timeout 10 + $replica config set repl-timeout 10 test "Psync established after rdb load - within grace period" { + # Test Sequence: + # 1. Replica initiates synchronization via RDB channel. + # 2. Primary's main process is suspended. + # 3. Replica completes RDB loading and pauses before establishing PSYNC connection. + # 4. Primary resumes operation and detects closed RDB channel. + # 5. Replica resumes operation. + # Expected outcome: Primary maintains RDB channel until replica establishes PSYNC connection. $replica replicaof $primary_host $primary_port - set res [wait_for_log_messages 0 {"*Done loading RDB*"} $loglines 2000 1] - set loglines [lindex $res 1] - incr $loglines - # At this point rdb is loaded but psync hasn't been established yet. - # Force the replica to sleep for 3 seconds so the primary main process will wake up, while the replica is unresponsive. - set sleep_handle [start_bg_server_sleep $replica_host $replica_port 3] + wait_for_log_messages 0 {"*Done loading RDB*"} $loglines 2000 1 + pause_process $replica_pid + resume_process $primary_pid wait_for_condition 50 100 { [string match {*replicas_waiting_psync:1*} [$primary info replication]] } else { fail "Primary freed RDB client before psync was established" } + resume_process $replica_pid verify_replica_online $primary 0 500 wait_for_condition 50 100 { @@ -604,6 +609,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] + set replica_pid [srv 0 pid] set loglines [count_log_lines 0] set load_handle0 [start_write_load $primary_host $primary_port 20] @@ -613,14 +619,16 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica config set repl-timeout 10 test "Psync established after RDB load - beyond grace period" { + # Test Sequence: + # 1. Replica initiates synchronization via RDB channel. + # 2. Primary's main process is suspended. + # 3. Replica completes RDB loading and pauses before establishing PSYNC connection. + # 4. Primary resumes operation and detects closed RDB channel. + # Expected outcome: Primary drops the RDB channel after grace period is done. $replica replicaof $primary_host $primary_port - set res [wait_for_log_messages 0 {"*Done loading RDB*"} $loglines 2000 1] - set loglines [lindex $res 1] - incr $loglines - # At this point rdb is loaded but psync hasn't been established yet. - # Force the replica to sleep so the primary main process will wake up, while the replica is unresponsive. - # We expect the grace time to be over before the replica wake up, so sync will fail. - set sleep_handle [start_bg_server_sleep $replica_host $replica_port 1000] + wait_for_log_messages 0 {"*Done loading RDB*"} $loglines 2000 1 + pause_process $replica_pid + resume_process $primary_pid wait_for_condition 50 100 { [string match {*replicas_waiting_psync:1*} [$primary info replication]] } else { @@ -634,7 +642,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } else { fail "Primary did not free waiting psync replica after grace period" } - stop_bg_server_sleep $sleep_handle + resume_process $replica_pid } stop_write_load $load_handle0 } @@ -645,6 +653,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set primary_host [srv 0 host] set primary_port [srv 0 port] set loglines [count_log_lines 0] + set primary_pid [srv 0 pid] $primary config set repl-diskless-sync yes $primary config set dual-channel-replication-enabled yes @@ -652,13 +661,12 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary config set loglevel debug # generate small db populate 10 primary 10 - # Stop primary main process after fork for 1 seconds - $primary debug sleep-after-fork-seconds 2 start_server {} { set replica [srv 0 client] set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] + set replica_pid [srv 0 pid] set load_handle0 [start_write_load $primary_host $primary_port 20] set load_handle1 [start_write_load $primary_host $primary_port 20] @@ -669,31 +677,39 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica config set repl-timeout 10 test "Test dual-channel-replication primary gets cob overrun before established psync" { + # Stop primary main process after fork for 1 seconds + $primary debug pause-after-fork 1 $replica replicaof $primary_host $primary_port wait_for_log_messages 0 {"*Done loading RDB*"} 0 2000 1 # At this point rdb is loaded but psync hasn't been established yet. # Force the replica to sleep for 5 seconds so the primary main process will wake up while the # replica is unresponsive. We expect the main process to fill the COB before the replica wakes. - set sleep_handle [start_bg_server_sleep $replica_host $replica_port 5] + pause_process $replica_pid + resume_process $primary_pid + $primary debug pause-after-fork 0 wait_for_log_messages -1 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 2000 1 wait_for_condition 50 100 { [string match {*replicas_waiting_psync:0*} [$primary info replication]] } else { fail "Primary did not free repl buf block after sync failure" } + resume_process $replica_pid set res [wait_for_log_messages -1 {"*Unable to partial resync with replica * for lack of backlog*"} $loglines 20000 1] set loglines [lindex $res 1] } - $replica replicaof no one - $replica debug sleep-after-fork-seconds 2 - + wait_for_condition 500 1000 { + [s -1 rdb_bgsave_in_progress] eq 0 + } else { + fail "Primary should abort sync" + } + + $replica debug pause-after-fork 1 $primary debug populate 1000 primary 100000 # Set primary with a slow rdb generation, so that we can easily intercept loading # 10ms per key, with 1000 keys is 10 seconds $primary config set rdb-key-save-delay 10000 - $primary debug sleep-after-fork-seconds 0 test "Test dual-channel-replication primary gets cob overrun during replica rdb load" { set cur_client_closed_count [s -1 client_output_buffer_limit_disconnections] @@ -709,6 +725,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } else { fail "Primary did not free repl buf block after sync failure" } + resume_process $replica_pid set res [wait_for_log_messages -1 {"*Unable to partial resync with replica * for lack of backlog*"} $loglines 20000 1] set loglines [lindex $res 0] } @@ -834,7 +851,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] - + set replica_pid [srv 0 pid] + set load_handle [start_write_load $primary_host $primary_port 20] $replica config set dual-channel-replication-enabled yes @@ -912,7 +930,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { test "Test dual-channel-replication primary reject set-rdb-client after client killed" { # Ensure replica main channel will not handshake before rdb client is killed - $replica debug sleep-after-fork-seconds 10 + $replica debug pause-after-fork 1 $replica replicaof $primary_host $primary_port # Wait for sync session to start wait_for_condition 500 1000 { @@ -935,6 +953,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { # Verify primary reject replconf set-rdb-client-id set res [catch {$primary replconf set-rdb-client-id $replica_rdb_channel_id} err] assert [string match *ERR* $err] + resume_process $replica_pid } stop_write_load $load_handle } From 4d3001a40e609ebe1db5c79ca46efe553f9514d5 Mon Sep 17 00:00:00 2001 From: naglera Date: Sun, 21 Jul 2024 15:46:37 +0000 Subject: [PATCH 04/32] test documentation fix Signed-off-by: naglera --- tests/integration/dual-channel-replication.tcl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index e076020ad1..0f46382cdc 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -677,14 +677,14 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica config set repl-timeout 10 test "Test dual-channel-replication primary gets cob overrun before established psync" { - # Stop primary main process after fork for 1 seconds + # Pause primary main process after fork $primary debug pause-after-fork 1 $replica replicaof $primary_host $primary_port wait_for_log_messages 0 {"*Done loading RDB*"} 0 2000 1 # At this point rdb is loaded but psync hasn't been established yet. - # Force the replica to sleep for 5 seconds so the primary main process will wake up while the - # replica is unresponsive. We expect the main process to fill the COB before the replica wakes. + # Pause the replica so the primary main process will wake up while the + # replica is unresponsive. We expect the main process to fill the COB and disconnect the replica. pause_process $replica_pid resume_process $primary_pid $primary debug pause-after-fork 0 From 81355d015a0ef5822545ed1824d4c98ca76edcb2 Mon Sep 17 00:00:00 2001 From: naglera Date: Mon, 22 Jul 2024 07:47:03 +0000 Subject: [PATCH 05/32] Deflake- Test dual-channel-replication primary reject set-rdb-client after client killed we should wait before asserting replconf will fail Signed-off-by: naglera --- tests/integration/dual-channel-replication.tcl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index 0f46382cdc..609476ca4c 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -946,7 +946,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary client kill id $replica_rdb_channel_id # Wait for primary to abort the sync wait_for_condition 10000000 10 { - [s -1 rdb_bgsave_in_progress] eq 0 + [s -1 rdb_bgsave_in_progress] eq 0 && + [string match {*replicas_waiting_psync:0*} [$primary info replication]] } else { fail "Primary should abort sync" } From 773ad9c6f0c95c11dc904c09a8af8db3f059aab4 Mon Sep 17 00:00:00 2001 From: naglera Date: Mon, 22 Jul 2024 08:29:41 +0000 Subject: [PATCH 06/32] Use smaller RDB for "Test replica's buffer limit reached" Signed-off-by: naglera --- tests/integration/dual-channel-replication.tcl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index 609476ca4c..7d996c2507 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -258,7 +258,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set primary_port [srv 0 port] set loglines [count_log_lines -1] - populate 10000 primary 10000 + populate 10000 primary 10 $primary set key1 val1 $primary config set repl-diskless-sync yes @@ -317,7 +317,6 @@ start_server {tags {"dual-channel-replication external:skip"}} { # It will give us enough time to fill the replica buffer. $replica1 config set dual-channel-replication-enabled yes $replica1 config set client-output-buffer-limit "replica 16383 16383 0" - $replica1 config set loglevel debug $replica1 replicaof $primary_host $primary_port # Wait for replica to establish psync using main channel @@ -327,7 +326,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { fail "replica didn't start sync session in time" } - populate 10000 primary 10000 + populate 10000 primary 10; # set ~ 100kb # Wait for replica's buffer limit reached wait_for_condition 50 1000 { [log_file_matches $replica1_log "*Replication buffer limit reached, stopping buffering*"] From e0696c518bd23cfb8f9dd5a6006d4f7e2023e884 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Mon, 22 Jul 2024 17:46:37 -0700 Subject: [PATCH 07/32] Update src/server.h Signed-off-by: Madelyn Olson Signed-off-by: naglera --- src/server.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.h b/src/server.h index e767deaa8b..10dc52ba3a 100644 --- a/src/server.h +++ b/src/server.h @@ -2005,7 +2005,7 @@ struct valkeyServer { * use dual channel replication for full syncs. */ int wait_before_rdb_client_free; /* Grace period in seconds for replica main channel * to establish psync. */ - int debug_pause_after_fork; /* Debug param that pause the main process + int debug_pause_after_fork; /* Debug param that pauses the main process * after a replication fork() (for bgsave). */ size_t repl_buffer_mem; /* The memory of replication buffer. */ list *repl_buffer_blocks; /* Replication buffers blocks list From 021975d7a81b8db433f04d164f86411e7047354e Mon Sep 17 00:00:00 2001 From: naglera Date: Tue, 23 Jul 2024 10:48:46 +0000 Subject: [PATCH 08/32] Fix race condition in rdb client free Signed-off-by: Ubuntu Signed-off-by: naglera --- src/networking.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index 501476e35d..45c1f5f61b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4332,9 +4332,9 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) { if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(), c); /* Remove RDB connection protection on COB overrun */ - c->flag.protected_rdb_channel = 0; - if (async) { + if (async || c->flag.protected_rdb_channel) { + c->flag.protected_rdb_channel = 0; freeClientAsync(c); serverLog(LL_WARNING, "Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); From 67dd350bdd8979ef8a5fe7910c778af08a6df11b Mon Sep 17 00:00:00 2001 From: naglera Date: Tue, 23 Jul 2024 14:05:52 +0000 Subject: [PATCH 09/32] Fix race condition dualChannelSync abort after local replication buffer stream Signed-off-by: Ubuntu Signed-off-by: naglera --- src/replication.c | 2 ++ tests/integration/replication-buffer.tcl | 3 +++ 2 files changed, 5 insertions(+) diff --git a/src/replication.c b/src/replication.c index b3220e8a44..56f784f817 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2862,6 +2862,8 @@ void dualChannelSyncSuccess(void) { if (streamReplDataBufToDb(server.primary) == C_ERR) { /* Sync session aborted during repl data streaming. */ serverLog(LL_WARNING, "Failed to stream local replication buffer into memory"); + replicationAbortDualChannelSyncTransfer(); + replicationUnsetPrimary(); return; } freePendingReplDataBuf(); diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl index 2303d01273..18f8aa7eb5 100644 --- a/tests/integration/replication-buffer.tcl +++ b/tests/integration/replication-buffer.tcl @@ -27,6 +27,9 @@ start_server {} { # Make sure replica3 is synchronized with master $replica3 replicaof $master_host $master_port wait_for_sync $replica3 + if {$dualchannel == "yes"} { + wait_for_ofs_sync $master $replica3 + } # Generating RDB will take some 100 seconds $master config set rdb-key-save-delay 1000000 From 2931ecc33098e6283ac2633d0d9de10e8f7dfbcb Mon Sep 17 00:00:00 2001 From: naglera Date: Tue, 23 Jul 2024 16:01:22 +0000 Subject: [PATCH 10/32] Complete dualChannelSync abort after local buffer stream fix: Handle cleanup Signed-off-by: Ubuntu Signed-off-by: naglera --- src/replication.c | 2 +- tests/integration/dual-channel-replication.tcl | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 56f784f817..baaf37491d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2859,7 +2859,7 @@ void dualChannelSyncSuccess(void) { server.primary_initial_offset = server.repl_provisional_primary.reploff; replicationResurrectProvisionalPrimary(); /* Wait for the accumulated buffer to be processed before reading any more replication updates */ - if (streamReplDataBufToDb(server.primary) == C_ERR) { + if (server.pending_repl_data.blocks && streamReplDataBufToDb(server.primary) == C_ERR) { /* Sync session aborted during repl data streaming. */ serverLog(LL_WARNING, "Failed to stream local replication buffer into memory"); replicationAbortDualChannelSyncTransfer(); diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index 7d996c2507..e986a4cd8d 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -28,6 +28,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] + $replica config set loglevel debug start_server {} { set primary [srv 0 client] set primary_host [srv 0 host] @@ -38,6 +39,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary config set repl-diskless-sync yes $primary config set repl-diskless-sync-delay 1000 $primary config set dual-channel-replication-enabled yes + $primary config set loglevel debug # Start the replication process... $replica config set dual-channel-replication-enabled yes From caa0c1a963295b6fdbb471c7185d3195a6ce529d Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Tue, 23 Jul 2024 14:01:45 -0700 Subject: [PATCH 11/32] Update src/debug.c Signed-off-by: Madelyn Olson Signed-off-by: naglera --- src/debug.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/debug.c b/src/debug.c index f752bad26b..27bc481767 100644 --- a/src/debug.c +++ b/src/debug.c @@ -2302,9 +2302,9 @@ void applyWatchdogPeriod(void) { } void debugPauseProcess(void) { - serverLog(LL_DEBUG, "Process is about to stop."); + serverLog(LL_NOTICE, "Process is about to stop."); raise(SIGSTOP); - serverLog(LL_DEBUG, "Process has been continued."); + serverLog(LL_NOTICE, "Process has been continued."); } /* Positive input is sleep time in microseconds. Negative input is fractions From c4f2ecb5c274d7b41e5a748032a0ac1351529529 Mon Sep 17 00:00:00 2001 From: naglera Date: Wed, 24 Jul 2024 08:11:43 +0000 Subject: [PATCH 12/32] Move repl_rdb_channel_state check to abortDualChannelSync Signed-off-by: naglera --- src/replication.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/replication.c b/src/replication.c index baaf37491d..89d77381a4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2514,7 +2514,7 @@ void freePendingReplDataBuf(void) { * Upon dual-channel sync failure, close rdb-connection, reset repl-state, reset * provisional primary struct, and free local replication buffer. */ void replicationAbortDualChannelSyncTransfer(void) { - serverAssert(server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE); + if(server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) return; serverLog(LL_NOTICE, "Aborting dual channel sync"); if (server.repl_rdb_transfer_s) { connClose(server.repl_rdb_transfer_s); @@ -3688,9 +3688,7 @@ void replicationAbortSyncTransfer(void) { * * Otherwise zero is returned and no operation is performed at all. */ int cancelReplicationHandshake(int reconnect) { - if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { - replicationAbortDualChannelSyncTransfer(); - } + replicationAbortDualChannelSyncTransfer(); if (server.repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(); server.repl_state = REPL_STATE_CONNECT; From 605ee5f5b5a202ee5f48e201c97ded9e8308515c Mon Sep 17 00:00:00 2001 From: naglera Date: Wed, 24 Jul 2024 08:29:33 +0000 Subject: [PATCH 13/32] clang format Signed-off-by: naglera --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 89d77381a4..8069b0f0f9 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2514,7 +2514,7 @@ void freePendingReplDataBuf(void) { * Upon dual-channel sync failure, close rdb-connection, reset repl-state, reset * provisional primary struct, and free local replication buffer. */ void replicationAbortDualChannelSyncTransfer(void) { - if(server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) return; + if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) return; serverLog(LL_NOTICE, "Aborting dual channel sync"); if (server.repl_rdb_transfer_s) { connClose(server.repl_rdb_transfer_s); From 92855ac87a371a5de8ac922c420ce7c1a817308a Mon Sep 17 00:00:00 2001 From: naglera Date: Wed, 24 Jul 2024 09:31:23 +0000 Subject: [PATCH 14/32] Revert "Move repl_rdb_channel_state check to abortDualChannelSync" This reverts commit fc2465d8a380b9b9220ae64a401ef5df6ff3c28b. Signed-off-by: naglera --- src/replication.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/replication.c b/src/replication.c index 8069b0f0f9..baaf37491d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2514,7 +2514,7 @@ void freePendingReplDataBuf(void) { * Upon dual-channel sync failure, close rdb-connection, reset repl-state, reset * provisional primary struct, and free local replication buffer. */ void replicationAbortDualChannelSyncTransfer(void) { - if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) return; + serverAssert(server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE); serverLog(LL_NOTICE, "Aborting dual channel sync"); if (server.repl_rdb_transfer_s) { connClose(server.repl_rdb_transfer_s); @@ -3688,7 +3688,9 @@ void replicationAbortSyncTransfer(void) { * * Otherwise zero is returned and no operation is performed at all. */ int cancelReplicationHandshake(int reconnect) { - replicationAbortDualChannelSyncTransfer(); + if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { + replicationAbortDualChannelSyncTransfer(); + } if (server.repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(); server.repl_state = REPL_STATE_CONNECT; From a68231966c0c4d42d941e02d344dbf8670921c4b Mon Sep 17 00:00:00 2001 From: naglera Date: Wed, 24 Jul 2024 14:04:49 +0000 Subject: [PATCH 15/32] Fix uint64 on 32bit machine bug Signed-off-by: naglera --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index baaf37491d..f1ed49101a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3162,7 +3162,7 @@ void setupMainConnForPsync(connection *conn) { char *err = NULL; if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { /* We already have an initialized connection at primary side, we only need to associate it with RDB connection */ - ll2string(llstr, sizeof(llstr), server.rdb_client_id); + ull2string(llstr, sizeof(llstr), server.rdb_client_id); err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL); if (err) goto error; server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; From ab211b4785aca812dcf951d046e7fefc13edd97a Mon Sep 17 00:00:00 2001 From: naglera Date: Wed, 24 Jul 2024 14:56:52 +0000 Subject: [PATCH 16/32] Verify sync is still in progress when sync aborted during local replicaition buffer streaming Signed-off-by: naglera --- src/replication.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/replication.c b/src/replication.c index f1ed49101a..3acd29d32e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2862,8 +2862,11 @@ void dualChannelSyncSuccess(void) { if (server.pending_repl_data.blocks && streamReplDataBufToDb(server.primary) == C_ERR) { /* Sync session aborted during repl data streaming. */ serverLog(LL_WARNING, "Failed to stream local replication buffer into memory"); - replicationAbortDualChannelSyncTransfer(); - replicationUnsetPrimary(); + /* Verify sync is still in progress */ + if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { + replicationAbortDualChannelSyncTransfer(); + replicationUnsetPrimary(); + } return; } freePendingReplDataBuf(); From a8ef15db98a552a4b575e09e2eeb82ae0c6664c1 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 07:39:16 +0000 Subject: [PATCH 17/32] NOT FOR MERGE - debug logs Signed-off-by: naglera --- src/networking.c | 3 ++- src/replication.c | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/networking.c b/src/networking.c index 45c1f5f61b..5fbe9ac3d8 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1770,6 +1770,7 @@ void freeClient(client *c) { void freeClientAsync(client *c) { if (c->flag.close_asap || c->flag.script) return; c->flag.close_asap = 1; + serverAssert(listSearchKey(server.clients_to_close,c) != NULL); // TODO remove listAddNodeTail(server.clients_to_close, c); } @@ -1894,7 +1895,7 @@ int freeClientsInAsyncFreeQueue(void) { if (!c->rdb_client_disconnect_time) { if (c->conn) connSetReadHandler(c->conn, NULL); c->rdb_client_disconnect_time = server.unixtime; - serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id, + serverLog(LL_NOTICE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id, replicationGetReplicaName(c), server.wait_before_rdb_client_free); continue; } diff --git a/src/replication.c b/src/replication.c index 3acd29d32e..4a12ed5755 100644 --- a/src/replication.c +++ b/src/replication.c @@ -225,7 +225,7 @@ void addRdbReplicaToPsyncWait(client *replica_rdb_client) { tail->refcount++; } } - serverLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ", + serverLog(LL_NOTICE, "Add rdb replica %s to waiting psync, with cid %llu, %s ", replicationGetReplicaName(replica_rdb_client), (unsigned long long)replica_rdb_client->id, tail ? "tracking repl-backlog tail" : "no repl-backlog to track"); replica_rdb_client->ref_repl_buf_node = tail ? ln : NULL; @@ -250,7 +250,7 @@ void backfillRdbReplicasToPsyncWait(void) { if (replica_rdb_client->ref_repl_buf_node) continue; replica_rdb_client->ref_repl_buf_node = ln; head->refcount++; - serverLog(LL_DEBUG, "Attach replica rdb client %llu to repl buf block", + serverLog(LL_NOTICE, "Attach replica rdb client %llu to repl buf block", (long long unsigned int)replica_rdb_client->id); } raxStop(&iter); @@ -269,7 +269,7 @@ void removeReplicaFromPsyncWait(client *replica_main_client) { } replica_rdb_client->ref_repl_buf_node = NULL; replica_rdb_client->flag.protected_rdb_channel = 0; - serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", + serverLog(LL_NOTICE, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", replicationGetReplicaName(replica_main_client), (long long unsigned int)replica_main_client->associated_rdb_client_id, o ? "ref count decreased" : "doesn't exist"); @@ -389,7 +389,7 @@ void freeReplicaReferencedReplBuffer(client *replica) { if (replica->flag.repl_rdb_channel) { uint64_t rdb_cid = htonu64(replica->id); if (raxRemove(server.replicas_waiting_psync, (unsigned char *)&rdb_cid, sizeof(rdb_cid), NULL)) { - serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.", + serverLog(LL_NOTICE, "Remove psync waiting replica %s with cid %llu from replicas rax.", replicationGetReplicaName(replica), (long long unsigned int)replica->associated_rdb_client_id); } } From b32505fc1940773ead6971648bb17de00ec88ef3 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 08:01:30 +0000 Subject: [PATCH 18/32] NOT FOR MERGE - debug logs Signed-off-by: naglera --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 5fbe9ac3d8..36d3b846ed 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1770,7 +1770,7 @@ void freeClient(client *c) { void freeClientAsync(client *c) { if (c->flag.close_asap || c->flag.script) return; c->flag.close_asap = 1; - serverAssert(listSearchKey(server.clients_to_close,c) != NULL); // TODO remove + serverAssert(listSearchKey(server.clients_to_close,c) == NULL); // TODO remove listAddNodeTail(server.clients_to_close, c); } From 5d3baf9d4ff0a59d4dfb61bf9c62beb1430e236d Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 09:23:43 +0000 Subject: [PATCH 19/32] Fix freeClientAsync double free Signed-off-by: naglera --- src/networking.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/networking.c b/src/networking.c index 36d3b846ed..fd7a162f53 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1897,15 +1897,13 @@ int freeClientsInAsyncFreeQueue(void) { c->rdb_client_disconnect_time = server.unixtime; serverLog(LL_NOTICE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id, replicationGetReplicaName(c), server.wait_before_rdb_client_free); - continue; - } - if (server.unixtime - c->rdb_client_disconnect_time > server.wait_before_rdb_client_free) { - serverLog(LL_NOTICE, - "Replica main channel failed to establish PSYNC within the grace period (%ld seconds). " - "Freeing RDB client %llu.", - (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); - c->flag.protected_rdb_channel = 0; } + if (server.unixtime - c->rdb_client_disconnect_time <= server.wait_before_rdb_client_free) continue; + serverLog(LL_NOTICE, + "Replica main channel failed to establish PSYNC within the grace period (%ld seconds). " + "Freeing RDB client %llu.", + (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); + c->flag.protected_rdb_channel = 0; } if (c->flag.protected) continue; From dbac48bd94fbe5390d4315c77b0fb91d73ef42e3 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 11:14:54 +0000 Subject: [PATCH 20/32] NOT FOR MERGE - debug logs Signed-off-by: naglera --- src/replication.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 4a12ed5755..96a2273aab 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1392,6 +1392,14 @@ void replconfCommand(client *c) { /* REPLCONF identify is used to identify the current replica main channel with existing * rdb-connection with the given id. */ long long client_id = 0; + robj *o = c->argv[j + 1]; + if (sdsEncodedObject(o)) { + serverLog(LL_NOTICE, "replconfCommand got from replica (char*)o->ptr %s", (char*)o->ptr); + } else if (o->encoding == OBJ_ENCODING_INT) { + serverLog(LL_NOTICE, "replconfCommand got from replica (long)o->ptr %ld", (long)o->ptr); + } else { + serverPanic("Unknown string encoding"); + } if (getLongLongFromObjectOrReply(c, c->argv[j + 1], &client_id, NULL) != C_OK) { return; } @@ -2532,6 +2540,7 @@ void replicationAbortDualChannelSyncTransfer(void) { server.repl_provisional_primary.conn = NULL; server.repl_provisional_primary.dbid = -1; server.rdb_client_id = -1; + serverLog(LL_NOTICE, "replicationAbortDualChannelSyncTransfer ull2string server.rdb_client_id = -1"); freePendingReplDataBuf(); return; } @@ -2673,6 +2682,7 @@ static void fullSyncWithPrimary(connection *conn) { } sdsfree(err); server.rdb_client_id = rdb_client_id; + serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string server.rdb_client_id %lu",server.rdb_client_id); server.primary_initial_offset = reploffset; /* Initiate repl_provisional_primary to act as this replica temp primary until RDB is loaded */ @@ -2874,6 +2884,7 @@ void dualChannelSyncSuccess(void) { /* We can resume reading from the primary connection once the local replication buffer has been loaded. */ replicationSteadyStateInit(); replicationSendAck(); /* Send ACK to notify primary that replica is synced */ + serverLog(LL_NOTICE, "dualChannelSyncSuccess ull2string server.rdb_client_id = -1"); server.rdb_client_id = -1; server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_STATE_NONE; } @@ -3165,7 +3176,8 @@ void setupMainConnForPsync(connection *conn) { char *err = NULL; if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { /* We already have an initialized connection at primary side, we only need to associate it with RDB connection */ - ull2string(llstr, sizeof(llstr), server.rdb_client_id); + int k = ull2string(llstr, sizeof(llstr), server.rdb_client_id); + serverLog(3, "setupMainConnForPsync ull2string %s %i",llstr,k); err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL); if (err) goto error; server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; From ff5bd6841553da98fd31169f73970fb29a4b5f0f Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 11:24:06 +0000 Subject: [PATCH 21/32] NOT FOR MERGE - debug logs Signed-off-by: naglera --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 96a2273aab..31b743eeee 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2682,7 +2682,7 @@ static void fullSyncWithPrimary(connection *conn) { } sdsfree(err); server.rdb_client_id = rdb_client_id; - serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string server.rdb_client_id %lu",server.rdb_client_id); + serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string server.rdb_client_id %llu",server.rdb_client_id); server.primary_initial_offset = reploffset; /* Initiate repl_provisional_primary to act as this replica temp primary until RDB is loaded */ From 2ad0e0d94e22d2768509e48add3b7f158aab62b2 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 11:28:01 +0000 Subject: [PATCH 22/32] NOT FOR MERGE - debug logs Signed-off-by: naglera --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 31b743eeee..53d67c44a9 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2682,7 +2682,7 @@ static void fullSyncWithPrimary(connection *conn) { } sdsfree(err); server.rdb_client_id = rdb_client_id; - serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string server.rdb_client_id %llu",server.rdb_client_id); + serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string server.rdb_client_id %llu",(long long unsigned int)server.rdb_client_id); server.primary_initial_offset = reploffset; /* Initiate repl_provisional_primary to act as this replica temp primary until RDB is loaded */ From 5b0e4476bdc8a645fa2c48efa785f9e8bbffd9c0 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 12:17:13 +0000 Subject: [PATCH 23/32] NOT FOR MERGE - debug logs Signed-off-by: naglera --- src/replication.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/replication.c b/src/replication.c index 53d67c44a9..653d1d52f3 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2672,6 +2672,7 @@ static void fullSyncWithPrimary(connection *conn) { sdsfree(err); return; } + serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string message got from primary: %s",err); long long reploffset; char primary_replid[CONFIG_RUN_ID_SIZE + 1]; int dbid; From 5b9cb0e72a584b4c1cf8c7a8658eb54588feb992 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 12:17:42 +0000 Subject: [PATCH 24/32] fix endoffset format Signed-off-by: naglera --- src/replication.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/replication.c b/src/replication.c index 653d1d52f3..3ca4e54e24 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2663,7 +2663,7 @@ static void fullSyncWithPrimary(connection *conn) { } /* Receive end offset response */ if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_ENDOFF) { - int64_t rdb_client_id; + uint64_t rdb_client_id; err = receiveSynchronousResponse(conn); if (err == NULL) goto error; if (err[0] == '\0') { @@ -2677,7 +2677,7 @@ static void fullSyncWithPrimary(connection *conn) { char primary_replid[CONFIG_RUN_ID_SIZE + 1]; int dbid; /* Parse end offset response */ - char *endoff_format = "$ENDOFF:%lld %40s %d %ld"; + char *endoff_format = "$ENDOFF:%lld %40s %d %"SCNu64; if (sscanf(err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) { goto error; } From 4a5ec3c172617a9e8550ac3761b2f5206e024366 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 13:40:08 +0000 Subject: [PATCH 25/32] fix endoffset format 2 Signed-off-by: naglera --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 3ca4e54e24..5e39900b3b 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2677,7 +2677,7 @@ static void fullSyncWithPrimary(connection *conn) { char primary_replid[CONFIG_RUN_ID_SIZE + 1]; int dbid; /* Parse end offset response */ - char *endoff_format = "$ENDOFF:%lld %40s %d %"SCNu64; + char *endoff_format = "$ENDOFF:%lld %40s %d %llu"; if (sscanf(err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) { goto error; } From 1bf7dbe411eebe1bfd05d16451446e52e92c7c81 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 14:00:31 +0000 Subject: [PATCH 26/32] Revert "NOT FOR MERGE - debug logs" This reverts commit a967cb49a169fc00f842afaf0f9cd1d01deaf910. Signed-off-by: naglera --- src/replication.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 5e39900b3b..1676ce3ba5 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2672,7 +2672,6 @@ static void fullSyncWithPrimary(connection *conn) { sdsfree(err); return; } - serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string message got from primary: %s",err); long long reploffset; char primary_replid[CONFIG_RUN_ID_SIZE + 1]; int dbid; From 0d59c7f5fa4264700856ef3426bd7609f6af8652 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 14:00:34 +0000 Subject: [PATCH 27/32] Revert "NOT FOR MERGE - debug logs" This reverts commit 00e62bd5336d624571cfd0f754f0f61668339916. Signed-off-by: naglera --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 1676ce3ba5..5ac3a918bc 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2682,7 +2682,7 @@ static void fullSyncWithPrimary(connection *conn) { } sdsfree(err); server.rdb_client_id = rdb_client_id; - serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string server.rdb_client_id %llu",(long long unsigned int)server.rdb_client_id); + serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string server.rdb_client_id %llu",server.rdb_client_id); server.primary_initial_offset = reploffset; /* Initiate repl_provisional_primary to act as this replica temp primary until RDB is loaded */ From d3eac4b4493a5407e177e88b16c77596b7c56e02 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 14:00:37 +0000 Subject: [PATCH 28/32] Revert "NOT FOR MERGE - debug logs" This reverts commit 4b4795918bcf5900141b46f29d29a50921f1f7f2. Signed-off-by: naglera --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 5ac3a918bc..87a5492aa8 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2682,7 +2682,7 @@ static void fullSyncWithPrimary(connection *conn) { } sdsfree(err); server.rdb_client_id = rdb_client_id; - serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string server.rdb_client_id %llu",server.rdb_client_id); + serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string server.rdb_client_id %lu",server.rdb_client_id); server.primary_initial_offset = reploffset; /* Initiate repl_provisional_primary to act as this replica temp primary until RDB is loaded */ From eebf226ebaa6f31ed5a880cac1a03a795d0598b0 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 14:00:38 +0000 Subject: [PATCH 29/32] Revert "NOT FOR MERGE - debug logs" This reverts commit a88ef2d9b8c516172e6d0033744d73a74797707c. Signed-off-by: naglera --- src/replication.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/replication.c b/src/replication.c index 87a5492aa8..237b7a95b7 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1392,14 +1392,6 @@ void replconfCommand(client *c) { /* REPLCONF identify is used to identify the current replica main channel with existing * rdb-connection with the given id. */ long long client_id = 0; - robj *o = c->argv[j + 1]; - if (sdsEncodedObject(o)) { - serverLog(LL_NOTICE, "replconfCommand got from replica (char*)o->ptr %s", (char*)o->ptr); - } else if (o->encoding == OBJ_ENCODING_INT) { - serverLog(LL_NOTICE, "replconfCommand got from replica (long)o->ptr %ld", (long)o->ptr); - } else { - serverPanic("Unknown string encoding"); - } if (getLongLongFromObjectOrReply(c, c->argv[j + 1], &client_id, NULL) != C_OK) { return; } @@ -2540,7 +2532,6 @@ void replicationAbortDualChannelSyncTransfer(void) { server.repl_provisional_primary.conn = NULL; server.repl_provisional_primary.dbid = -1; server.rdb_client_id = -1; - serverLog(LL_NOTICE, "replicationAbortDualChannelSyncTransfer ull2string server.rdb_client_id = -1"); freePendingReplDataBuf(); return; } @@ -2682,7 +2673,6 @@ static void fullSyncWithPrimary(connection *conn) { } sdsfree(err); server.rdb_client_id = rdb_client_id; - serverLog(LL_NOTICE, "fullSyncWithPrimary ull2string server.rdb_client_id %lu",server.rdb_client_id); server.primary_initial_offset = reploffset; /* Initiate repl_provisional_primary to act as this replica temp primary until RDB is loaded */ @@ -2884,7 +2874,6 @@ void dualChannelSyncSuccess(void) { /* We can resume reading from the primary connection once the local replication buffer has been loaded. */ replicationSteadyStateInit(); replicationSendAck(); /* Send ACK to notify primary that replica is synced */ - serverLog(LL_NOTICE, "dualChannelSyncSuccess ull2string server.rdb_client_id = -1"); server.rdb_client_id = -1; server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_STATE_NONE; } @@ -3176,8 +3165,7 @@ void setupMainConnForPsync(connection *conn) { char *err = NULL; if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { /* We already have an initialized connection at primary side, we only need to associate it with RDB connection */ - int k = ull2string(llstr, sizeof(llstr), server.rdb_client_id); - serverLog(3, "setupMainConnForPsync ull2string %s %i",llstr,k); + ull2string(llstr, sizeof(llstr), server.rdb_client_id); err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL); if (err) goto error; server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; From 08d0b3e646e61657f4ec134802c387259f362766 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 14:00:40 +0000 Subject: [PATCH 30/32] Revert "NOT FOR MERGE - debug logs" This reverts commit bb53f859ea100b1a33386fe824e7579cf815440c. Signed-off-by: naglera --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index fd7a162f53..f8607dacce 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1770,7 +1770,7 @@ void freeClient(client *c) { void freeClientAsync(client *c) { if (c->flag.close_asap || c->flag.script) return; c->flag.close_asap = 1; - serverAssert(listSearchKey(server.clients_to_close,c) == NULL); // TODO remove + serverAssert(listSearchKey(server.clients_to_close,c) != NULL); // TODO remove listAddNodeTail(server.clients_to_close, c); } From b2f739fb28b0e9bf7ccf791624a1502a13772168 Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 14:00:42 +0000 Subject: [PATCH 31/32] Revert "NOT FOR MERGE - debug logs" This reverts commit 9e3ad13eeaf89ed951146af1f4c964ed4479e49d. Signed-off-by: naglera --- src/networking.c | 3 +-- src/replication.c | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/networking.c b/src/networking.c index f8607dacce..c2e986bbb2 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1770,7 +1770,6 @@ void freeClient(client *c) { void freeClientAsync(client *c) { if (c->flag.close_asap || c->flag.script) return; c->flag.close_asap = 1; - serverAssert(listSearchKey(server.clients_to_close,c) != NULL); // TODO remove listAddNodeTail(server.clients_to_close, c); } @@ -1895,7 +1894,7 @@ int freeClientsInAsyncFreeQueue(void) { if (!c->rdb_client_disconnect_time) { if (c->conn) connSetReadHandler(c->conn, NULL); c->rdb_client_disconnect_time = server.unixtime; - serverLog(LL_NOTICE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id, + serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id, replicationGetReplicaName(c), server.wait_before_rdb_client_free); } if (server.unixtime - c->rdb_client_disconnect_time <= server.wait_before_rdb_client_free) continue; diff --git a/src/replication.c b/src/replication.c index 237b7a95b7..1da616d801 100644 --- a/src/replication.c +++ b/src/replication.c @@ -225,7 +225,7 @@ void addRdbReplicaToPsyncWait(client *replica_rdb_client) { tail->refcount++; } } - serverLog(LL_NOTICE, "Add rdb replica %s to waiting psync, with cid %llu, %s ", + serverLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ", replicationGetReplicaName(replica_rdb_client), (unsigned long long)replica_rdb_client->id, tail ? "tracking repl-backlog tail" : "no repl-backlog to track"); replica_rdb_client->ref_repl_buf_node = tail ? ln : NULL; @@ -250,7 +250,7 @@ void backfillRdbReplicasToPsyncWait(void) { if (replica_rdb_client->ref_repl_buf_node) continue; replica_rdb_client->ref_repl_buf_node = ln; head->refcount++; - serverLog(LL_NOTICE, "Attach replica rdb client %llu to repl buf block", + serverLog(LL_DEBUG, "Attach replica rdb client %llu to repl buf block", (long long unsigned int)replica_rdb_client->id); } raxStop(&iter); @@ -269,7 +269,7 @@ void removeReplicaFromPsyncWait(client *replica_main_client) { } replica_rdb_client->ref_repl_buf_node = NULL; replica_rdb_client->flag.protected_rdb_channel = 0; - serverLog(LL_NOTICE, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", + serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", replicationGetReplicaName(replica_main_client), (long long unsigned int)replica_main_client->associated_rdb_client_id, o ? "ref count decreased" : "doesn't exist"); @@ -389,7 +389,7 @@ void freeReplicaReferencedReplBuffer(client *replica) { if (replica->flag.repl_rdb_channel) { uint64_t rdb_cid = htonu64(replica->id); if (raxRemove(server.replicas_waiting_psync, (unsigned char *)&rdb_cid, sizeof(rdb_cid), NULL)) { - serverLog(LL_NOTICE, "Remove psync waiting replica %s with cid %llu from replicas rax.", + serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.", replicationGetReplicaName(replica), (long long unsigned int)replica->associated_rdb_client_id); } } From 468c969b173f1285d19ece8929ebb122df1fb6fb Mon Sep 17 00:00:00 2001 From: naglera Date: Thu, 25 Jul 2024 14:02:09 +0000 Subject: [PATCH 32/32] clang format Signed-off-by: naglera --- src/networking.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/networking.c b/src/networking.c index c2e986bbb2..59b894367c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1899,9 +1899,9 @@ int freeClientsInAsyncFreeQueue(void) { } if (server.unixtime - c->rdb_client_disconnect_time <= server.wait_before_rdb_client_free) continue; serverLog(LL_NOTICE, - "Replica main channel failed to establish PSYNC within the grace period (%ld seconds). " - "Freeing RDB client %llu.", - (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); + "Replica main channel failed to establish PSYNC within the grace period (%ld seconds). " + "Freeing RDB client %llu.", + (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); c->flag.protected_rdb_channel = 0; }