diff --git a/src/debug.c b/src/debug.c index 2ea0c1a3e8..27bc481767 100644 --- a/src/debug.c +++ b/src/debug.c @@ -495,8 +495,8 @@ void debugCommand(client *c) { " In case RESET is provided the peak reset time will be restored to the default value", "REPLYBUFFER RESIZING <0|1>", " Enable or disable the reply buffer resize cron job", - "SLEEP-AFTER-FORK-SECONDS ", - " Stop the server's main process for after forking.", + "PAUSE-AFTER-FORK <0|1>", + " Stop the server's main process after fork.", "DELAY-RDB-CLIENT-FREE-SECOND ", " Grace period in seconds for replica main channel to establish psync.", "DICT-RESIZING <0|1>", @@ -995,12 +995,8 @@ void debugCommand(client *c) { return; } addReply(c, shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr, "sleep-after-fork-seconds") && c->argc == 3) { - double sleep_after_fork_seconds; - if (getDoubleFromObjectOrReply(c, c->argv[2], &sleep_after_fork_seconds, NULL) != C_OK) { - return; - } - server.debug_sleep_after_fork_us = (int)(sleep_after_fork_seconds * 1e6); + } else if (!strcasecmp(c->argv[1]->ptr, "pause-after-fork") && c->argc == 3) { + server.debug_pause_after_fork = atoi(c->argv[2]->ptr); addReply(c, shared.ok); } else if (!strcasecmp(c->argv[1]->ptr, "delay-rdb-client-free-seconds") && c->argc == 3) { server.wait_before_rdb_client_free = atoi(c->argv[2]->ptr); @@ -2305,6 +2301,12 @@ void applyWatchdogPeriod(void) { } } +void debugPauseProcess(void) { + serverLog(LL_NOTICE, "Process is about to stop."); + raise(SIGSTOP); + serverLog(LL_NOTICE, "Process has been continued."); +} + /* Positive input is sleep time in microseconds. Negative input is fractions * of microseconds, i.e. -10 means 100 nanoseconds. */ void debugDelay(int usec) { diff --git a/src/networking.c b/src/networking.c index 501476e35d..59b894367c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1896,15 +1896,13 @@ int freeClientsInAsyncFreeQueue(void) { c->rdb_client_disconnect_time = server.unixtime; serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id, replicationGetReplicaName(c), server.wait_before_rdb_client_free); - continue; - } - if (server.unixtime - c->rdb_client_disconnect_time > server.wait_before_rdb_client_free) { - serverLog(LL_NOTICE, - "Replica main channel failed to establish PSYNC within the grace period (%ld seconds). " - "Freeing RDB client %llu.", - (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); - c->flag.protected_rdb_channel = 0; } + if (server.unixtime - c->rdb_client_disconnect_time <= server.wait_before_rdb_client_free) continue; + serverLog(LL_NOTICE, + "Replica main channel failed to establish PSYNC within the grace period (%ld seconds). " + "Freeing RDB client %llu.", + (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); + c->flag.protected_rdb_channel = 0; } if (c->flag.protected) continue; @@ -4332,9 +4330,9 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) { if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(), c); /* Remove RDB connection protection on COB overrun */ - c->flag.protected_rdb_channel = 0; - if (async) { + if (async || c->flag.protected_rdb_channel) { + c->flag.protected_rdb_channel = 0; freeClientAsync(c); serverLog(LL_WARNING, "Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); diff --git a/src/replication.c b/src/replication.c index b00da525bf..1da616d801 100644 --- a/src/replication.c +++ b/src/replication.c @@ -961,7 +961,7 @@ int startBgsaveForReplication(int mincapa, int req) { /* Keep the page cache since it'll get used soon */ retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE); } - if (server.debug_sleep_after_fork_us) usleep(server.debug_sleep_after_fork_us); + if (server.debug_pause_after_fork) debugPauseProcess(); } else { serverLog(LL_WARNING, "BGSAVE for replication: replication information not available, can't generate the RDB " "file right now. Try later."); @@ -2654,7 +2654,7 @@ static void fullSyncWithPrimary(connection *conn) { } /* Receive end offset response */ if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_ENDOFF) { - int64_t rdb_client_id; + uint64_t rdb_client_id; err = receiveSynchronousResponse(conn); if (err == NULL) goto error; if (err[0] == '\0') { @@ -2667,7 +2667,7 @@ static void fullSyncWithPrimary(connection *conn) { char primary_replid[CONFIG_RUN_ID_SIZE + 1]; int dbid; /* Parse end offset response */ - char *endoff_format = "$ENDOFF:%lld %40s %d %ld"; + char *endoff_format = "$ENDOFF:%lld %40s %d %llu"; if (sscanf(err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) { goto error; } @@ -2859,9 +2859,14 @@ void dualChannelSyncSuccess(void) { server.primary_initial_offset = server.repl_provisional_primary.reploff; replicationResurrectProvisionalPrimary(); /* Wait for the accumulated buffer to be processed before reading any more replication updates */ - if (streamReplDataBufToDb(server.primary) == C_ERR) { + if (server.pending_repl_data.blocks && streamReplDataBufToDb(server.primary) == C_ERR) { /* Sync session aborted during repl data streaming. */ serverLog(LL_WARNING, "Failed to stream local replication buffer into memory"); + /* Verify sync is still in progress */ + if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { + replicationAbortDualChannelSyncTransfer(); + replicationUnsetPrimary(); + } return; } freePendingReplDataBuf(); @@ -3160,7 +3165,7 @@ void setupMainConnForPsync(connection *conn) { char *err = NULL; if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { /* We already have an initialized connection at primary side, we only need to associate it with RDB connection */ - ll2string(llstr, sizeof(llstr), server.rdb_client_id); + ull2string(llstr, sizeof(llstr), server.rdb_client_id); err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL); if (err) goto error; server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; @@ -3181,7 +3186,7 @@ void setupMainConnForPsync(connection *conn) { } if (server.repl_state == REPL_STATE_SEND_PSYNC) { - if (server.debug_sleep_after_fork_us) usleep(server.debug_sleep_after_fork_us); + if (server.debug_pause_after_fork) debugPauseProcess(); if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { serverLog(LL_WARNING, "Aborting dual channel sync. Write error."); cancelReplicationHandshake(1); diff --git a/src/server.h b/src/server.h index 50ab8e6f30..10dc52ba3a 100644 --- a/src/server.h +++ b/src/server.h @@ -2005,8 +2005,8 @@ struct valkeyServer { * use dual channel replication for full syncs. */ int wait_before_rdb_client_free; /* Grace period in seconds for replica main channel * to establish psync. */ - int debug_sleep_after_fork_us; /* Debug param that force the main process to - * sleep for N microseconds after fork() in repl. */ + int debug_pause_after_fork; /* Debug param that pauses the main process + * after a replication fork() (for bgsave). */ size_t repl_buffer_mem; /* The memory of replication buffer. */ list *repl_buffer_blocks; /* Replication buffers blocks list * (serving replica clients and repl backlog) */ @@ -3981,6 +3981,7 @@ void killThreads(void); void makeThreadKillable(void); void swapMainDbWithTempDb(serverDb *tempDb); sds getVersion(void); +void debugPauseProcess(void); /* Use macro for checking log level to avoid evaluating arguments in cases log * should be ignored due to low level. */ diff --git a/tests/helpers/bg_server_sleep.tcl b/tests/helpers/bg_server_sleep.tcl deleted file mode 100644 index 7e46fe9c90..0000000000 --- a/tests/helpers/bg_server_sleep.tcl +++ /dev/null @@ -1,10 +0,0 @@ -source tests/support/valkey.tcl -source tests/support/util.tcl - -proc bg_server_sleep {host port sec} { - set r [valkey $host $port 0] - $r client setname SLEEP_HANDLER - $r debug sleep $sec -} - -bg_server_sleep [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index 7344d0d3b1..e986a4cd8d 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -5,15 +5,6 @@ proc log_file_matches {log pattern} { string match $pattern $content } -proc start_bg_server_sleep {host port sec} { - set tclsh [info nameofexecutable] - exec $tclsh tests/helpers/bg_server_sleep.tcl $host $port $sec & -} - -proc stop_bg_server_sleep {handle} { - catch {exec /bin/kill -9 $handle} -} - proc get_client_id_by_last_cmd {r cmd} { set client_list [$r client list] set client_id "" @@ -37,6 +28,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] + $replica config set loglevel debug start_server {} { set primary [srv 0 client] set primary_host [srv 0 host] @@ -47,6 +39,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary config set repl-diskless-sync yes $primary config set repl-diskless-sync-delay 1000 $primary config set dual-channel-replication-enabled yes + $primary config set loglevel debug # Start the replication process... $replica config set dual-channel-replication-enabled yes @@ -267,7 +260,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set primary_port [srv 0 port] set loglines [count_log_lines -1] - populate 10000 primary 10000 + populate 10000 primary 10 $primary set key1 val1 $primary config set repl-diskless-sync yes @@ -284,23 +277,13 @@ start_server {tags {"dual-channel-replication external:skip"}} { test "dual-channel-replication with multiple replicas" { $replica1 replicaof $primary_host $primary_port $replica2 replicaof $primary_host $primary_port + verify_replica_online $primary 0 500 + verify_replica_online $primary 1 500 wait_for_value_to_propegate_to_replica $primary $replica1 "key1" wait_for_value_to_propegate_to_replica $primary $replica2 "key1" - wait_for_condition 100 100 { - [s 0 total_forks] eq "1" - } else { - fail "Primary <-> Replica didn't start the full sync" - } - - verify_replica_online $primary 0 500 - verify_replica_online $primary 1 500 - wait_for_condition 50 1000 { - [status $replica1 master_link_status] == "up" - } else { - fail "Replica is not synced" - } + assert {[s 0 total_forks] eq "1" } } $replica1 replicaof no one @@ -314,12 +297,10 @@ start_server {tags {"dual-channel-replication external:skip"}} { test "Test diverse replica sync: dual-channel on/off" { $replica1 replicaof $primary_host $primary_port $replica2 replicaof $primary_host $primary_port - - wait_for_value_to_propegate_to_replica $primary $replica1 "key2" - wait_for_value_to_propegate_to_replica $primary $replica2 "key2" - verify_replica_online $primary 0 500 verify_replica_online $primary 1 500 + wait_for_value_to_propegate_to_replica $primary $replica1 "key2" + wait_for_value_to_propegate_to_replica $primary $replica2 "key2" wait_for_condition 50 1000 { [status $replica1 master_link_status] == "up" } else { @@ -328,7 +309,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } $replica1 replicaof no one - $primary set key4 val4 + $primary set key3 val3 test "Test replica's buffer limit reached" { $primary config set repl-diskless-sync-delay 0 @@ -338,7 +319,6 @@ start_server {tags {"dual-channel-replication external:skip"}} { # It will give us enough time to fill the replica buffer. $replica1 config set dual-channel-replication-enabled yes $replica1 config set client-output-buffer-limit "replica 16383 16383 0" - $replica1 config set loglevel debug $replica1 replicaof $primary_host $primary_port # Wait for replica to establish psync using main channel @@ -348,7 +328,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { fail "replica didn't start sync session in time" } - populate 10000 primary 10000 + populate 10000 primary 10; # set ~ 100kb # Wait for replica's buffer limit reached wait_for_condition 50 1000 { [log_file_matches $replica1_log "*Replication buffer limit reached, stopping buffering*"] @@ -358,18 +338,18 @@ start_server {tags {"dual-channel-replication external:skip"}} { assert {[s -2 replicas_replication_buffer_size] <= 16385*2} # Wait for sync to succeed - wait_for_value_to_propegate_to_replica $primary $replica1 "key4" wait_for_condition 50 1000 { [status $replica1 master_link_status] == "up" } else { fail "Replica is not synced" } + wait_for_value_to_propegate_to_replica $primary $replica1 "key3" } $replica1 replicaof no one $replica1 config set client-output-buffer-limit "replica 256mb 256mb 0"; # remove repl buffer limitation - $primary set key5 val5 + $primary set key4 val4 test "dual-channel-replication fails when primary diskless disabled" { set cur_psync [status $primary sync_partial_ok] @@ -379,13 +359,12 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica1 replicaof $primary_host $primary_port # Wait for mitigation and resync - wait_for_value_to_propegate_to_replica $primary $replica1 "key5" - wait_for_condition 50 1000 { [status $replica1 master_link_status] == "up" } else { fail "Replica is not synced" } + wait_for_value_to_propegate_to_replica $primary $replica1 "key4" # Verify that we did not use dual-channel-replication sync assert {[status $primary sync_partial_ok] == $cur_psync} @@ -403,12 +382,12 @@ start_server {tags {"dual-channel-replication external:skip"}} { set primary [srv 0 client] set primary_host [srv 0 host] set primary_port [srv 0 port] - set loglines [count_log_lines -1] + set primary_pid [srv 0 pid] # Create small enough db to be loaded before replica establish psync connection $primary set key1 val1 $primary config set repl-diskless-sync yes - $primary debug sleep-after-fork-seconds 5;# Stop primary after fork for 5 seconds + $primary debug pause-after-fork 1 $primary config set dual-channel-replication-enabled yes $replica config set dual-channel-replication-enabled yes @@ -416,6 +395,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { test "Test dual-channel-replication sync- psync established after rdb load" { $replica replicaof $primary_host $primary_port + wait_for_log_messages -1 {"*Done loading RDB*"} 0 2000 1 + resume_process $primary_pid verify_replica_online $primary 0 500 wait_for_condition 50 1000 { @@ -425,9 +406,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } wait_for_value_to_propegate_to_replica $primary $replica "key1" # Confirm the occurrence of a race condition. - set res [wait_for_log_messages -1 {"*Dual channel sync - psync established after rdb load*"} $loglines 2000 1] - set loglines [lindex $res 1] - incr $loglines + wait_for_log_messages -1 {"*Dual channel sync - psync established after rdb load*"} 0 2000 1 } } } @@ -437,6 +416,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] + set replica_pid [srv 0 pid] start_server {} { set primary [srv 0 client] set primary_host [srv 0 host] @@ -459,8 +439,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica config set dual-channel-replication-enabled yes $replica config set loglevel debug $replica config set repl-timeout 10 - # Stop replica after primary fork for 5 seconds - $replica debug sleep-after-fork-seconds 5 + # Pause replica after primary fork + $replica debug pause-after-fork 1 test "dual-channel-replication: Primary COB growth with inactive replica" { $replica replicaof $primary_host $primary_port @@ -470,6 +450,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } else { fail "Primary should allow backlog to grow beyond its limits during dual-channel-replication sync handshake" } + resume_process $replica_pid verify_replica_online $primary 0 500 wait_for_condition 50 1000 { @@ -491,11 +472,13 @@ start_server {tags {"dual-channel-replication external:skip"}} { set replica1_host [srv 0 host] set replica1_port [srv 0 port] set replica1_log [srv 0 stdout] + set replica1_pid [srv 0 pid] start_server {} { set replica2 [srv 0 client] set replica2_host [srv 0 host] set replica2_port [srv 0 port] set replica2_log [srv 0 stdout] + set replica2_pid [srv 0 pid] start_server {} { set primary [srv 0 client] set primary_host [srv 0 host] @@ -520,15 +503,16 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica1 config set repl-timeout 10 $replica2 config set repl-timeout 10 - # Stop replica after primary fork for 2 seconds - $replica1 debug sleep-after-fork-seconds 2 - $replica2 debug sleep-after-fork-seconds 2 + # Pause replicas after primary forks for + $replica1 debug pause-after-fork 1 + $replica2 debug pause-after-fork 1 test "Test dual-channel: primary tracking replica backlog refcount - start with empty backlog" { $replica1 replicaof $primary_host $primary_port set res [wait_for_log_messages 0 {"*Add rdb replica * no repl-backlog to track*"} $loglines 2000 1] set res [wait_for_log_messages 0 {"*Attach replica rdb client*"} $loglines 2000 1] set loglines [lindex $res 1] - incr $loglines + incr $loglines + resume_process $replica1_pid verify_replica_online $primary 0 700 wait_for_condition 50 1000 { [status $replica1 master_link_status] == "up" @@ -543,7 +527,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica2 replicaof $primary_host $primary_port set res [wait_for_log_messages 0 {"*Add rdb replica * tracking repl-backlog tail*"} $loglines 2000 1] set loglines [lindex $res 1] - incr $loglines + incr $loglines + resume_process $replica2_pid verify_replica_online $primary 0 700 wait_for_condition 50 1000 { [status $replica2 master_link_status] == "up" @@ -562,6 +547,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set primary [srv 0 client] set primary_host [srv 0 host] set primary_port [srv 0 port] + set primary_pid [srv 0 pid] $primary config set repl-diskless-sync yes $primary config set dual-channel-replication-enabled yes @@ -570,36 +556,43 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary config set repl-timeout 10 # generate small db populate 10 primary 10 - # Stop primary main process after fork for 2 seconds - $primary debug sleep-after-fork-seconds 2 - $primary debug delay-rdb-client-free-seconds 5 + # Pause primary main process after fork + $primary debug pause-after-fork 1 + # Give replica two second grace period before disconnection + $primary debug delay-rdb-client-free-seconds 2 start_server {} { set replica [srv 0 client] set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] + set replica_pid [srv 0 pid] set loglines [count_log_lines 0] set load_handle0 [start_write_load $primary_host $primary_port 20] $replica config set dual-channel-replication-enabled yes $replica config set loglevel debug - $replica config set repl-timeout 10 + $replica config set repl-timeout 10 test "Psync established after rdb load - within grace period" { + # Test Sequence: + # 1. Replica initiates synchronization via RDB channel. + # 2. Primary's main process is suspended. + # 3. Replica completes RDB loading and pauses before establishing PSYNC connection. + # 4. Primary resumes operation and detects closed RDB channel. + # 5. Replica resumes operation. + # Expected outcome: Primary maintains RDB channel until replica establishes PSYNC connection. $replica replicaof $primary_host $primary_port - set res [wait_for_log_messages 0 {"*Done loading RDB*"} $loglines 2000 1] - set loglines [lindex $res 1] - incr $loglines - # At this point rdb is loaded but psync hasn't been established yet. - # Force the replica to sleep for 3 seconds so the primary main process will wake up, while the replica is unresponsive. - set sleep_handle [start_bg_server_sleep $replica_host $replica_port 3] + wait_for_log_messages 0 {"*Done loading RDB*"} $loglines 2000 1 + pause_process $replica_pid + resume_process $primary_pid wait_for_condition 50 100 { [string match {*replicas_waiting_psync:1*} [$primary info replication]] } else { fail "Primary freed RDB client before psync was established" } + resume_process $replica_pid verify_replica_online $primary 0 500 wait_for_condition 50 100 { @@ -617,6 +610,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] + set replica_pid [srv 0 pid] set loglines [count_log_lines 0] set load_handle0 [start_write_load $primary_host $primary_port 20] @@ -626,14 +620,16 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica config set repl-timeout 10 test "Psync established after RDB load - beyond grace period" { + # Test Sequence: + # 1. Replica initiates synchronization via RDB channel. + # 2. Primary's main process is suspended. + # 3. Replica completes RDB loading and pauses before establishing PSYNC connection. + # 4. Primary resumes operation and detects closed RDB channel. + # Expected outcome: Primary drops the RDB channel after grace period is done. $replica replicaof $primary_host $primary_port - set res [wait_for_log_messages 0 {"*Done loading RDB*"} $loglines 2000 1] - set loglines [lindex $res 1] - incr $loglines - # At this point rdb is loaded but psync hasn't been established yet. - # Force the replica to sleep for 8 seconds so the primary main process will wake up, while the replica is unresponsive. - # We expect the grace time to be over before the replica wake up, so sync will fail. - set sleep_handle [start_bg_server_sleep $replica_host $replica_port 8] + wait_for_log_messages 0 {"*Done loading RDB*"} $loglines 2000 1 + pause_process $replica_pid + resume_process $primary_pid wait_for_condition 50 100 { [string match {*replicas_waiting_psync:1*} [$primary info replication]] } else { @@ -642,15 +638,12 @@ start_server {tags {"dual-channel-replication external:skip"}} { # Sync should fail once the replica ask for PSYNC using main channel set res [wait_for_log_messages -1 {"*Replica main channel failed to establish PSYNC within the grace period*"} 0 4000 1] - - # Should succeed on retry - verify_replica_online $primary 0 500 wait_for_condition 50 100 { [string match {*replicas_waiting_psync:0*} [$primary info replication]] } else { - fail "Primary did not free repl buf block after psync establishment" + fail "Primary did not free waiting psync replica after grace period" } - $replica replicaof no one + resume_process $replica_pid } stop_write_load $load_handle0 } @@ -661,6 +654,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { set primary_host [srv 0 host] set primary_port [srv 0 port] set loglines [count_log_lines 0] + set primary_pid [srv 0 pid] $primary config set repl-diskless-sync yes $primary config set dual-channel-replication-enabled yes @@ -668,13 +662,12 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary config set loglevel debug # generate small db populate 10 primary 10 - # Stop primary main process after fork for 1 seconds - $primary debug sleep-after-fork-seconds 2 start_server {} { set replica [srv 0 client] set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] + set replica_pid [srv 0 pid] set load_handle0 [start_write_load $primary_host $primary_port 20] set load_handle1 [start_write_load $primary_host $primary_port 20] @@ -685,31 +678,39 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica config set repl-timeout 10 test "Test dual-channel-replication primary gets cob overrun before established psync" { + # Pause primary main process after fork + $primary debug pause-after-fork 1 $replica replicaof $primary_host $primary_port wait_for_log_messages 0 {"*Done loading RDB*"} 0 2000 1 # At this point rdb is loaded but psync hasn't been established yet. - # Force the replica to sleep for 5 seconds so the primary main process will wake up while the - # replica is unresponsive. We expect the main process to fill the COB before the replica wakes. - set sleep_handle [start_bg_server_sleep $replica_host $replica_port 5] + # Pause the replica so the primary main process will wake up while the + # replica is unresponsive. We expect the main process to fill the COB and disconnect the replica. + pause_process $replica_pid + resume_process $primary_pid + $primary debug pause-after-fork 0 wait_for_log_messages -1 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 2000 1 wait_for_condition 50 100 { [string match {*replicas_waiting_psync:0*} [$primary info replication]] } else { fail "Primary did not free repl buf block after sync failure" } + resume_process $replica_pid set res [wait_for_log_messages -1 {"*Unable to partial resync with replica * for lack of backlog*"} $loglines 20000 1] set loglines [lindex $res 1] } - $replica replicaof no one - $replica debug sleep-after-fork-seconds 2 - + wait_for_condition 500 1000 { + [s -1 rdb_bgsave_in_progress] eq 0 + } else { + fail "Primary should abort sync" + } + + $replica debug pause-after-fork 1 $primary debug populate 1000 primary 100000 # Set primary with a slow rdb generation, so that we can easily intercept loading # 10ms per key, with 1000 keys is 10 seconds $primary config set rdb-key-save-delay 10000 - $primary debug sleep-after-fork-seconds 0 test "Test dual-channel-replication primary gets cob overrun during replica rdb load" { set cur_client_closed_count [s -1 client_output_buffer_limit_disconnections] @@ -725,6 +726,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } else { fail "Primary did not free repl buf block after sync failure" } + resume_process $replica_pid set res [wait_for_log_messages -1 {"*Unable to partial resync with replica * for lack of backlog*"} $loglines 20000 1] set loglines [lindex $res 0] } @@ -850,7 +852,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] - + set replica_pid [srv 0 pid] + set load_handle [start_write_load $primary_host $primary_port 20] $replica config set dual-channel-replication-enabled yes @@ -928,7 +931,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { test "Test dual-channel-replication primary reject set-rdb-client after client killed" { # Ensure replica main channel will not handshake before rdb client is killed - $replica debug sleep-after-fork-seconds 10 + $replica debug pause-after-fork 1 $replica replicaof $primary_host $primary_port # Wait for sync session to start wait_for_condition 500 1000 { @@ -944,13 +947,15 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary client kill id $replica_rdb_channel_id # Wait for primary to abort the sync wait_for_condition 10000000 10 { - [s -1 rdb_bgsave_in_progress] eq 0 + [s -1 rdb_bgsave_in_progress] eq 0 && + [string match {*replicas_waiting_psync:0*} [$primary info replication]] } else { fail "Primary should abort sync" } # Verify primary reject replconf set-rdb-client-id set res [catch {$primary replconf set-rdb-client-id $replica_rdb_channel_id} err] assert [string match *ERR* $err] + resume_process $replica_pid } stop_write_load $load_handle } @@ -1110,7 +1115,5 @@ start_server {tags {"dual-channel-replication external:skip"}} { fail "Replica offset didn't catch up with the primary after too long time" } } - - } -} \ No newline at end of file +} diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl index 2303d01273..18f8aa7eb5 100644 --- a/tests/integration/replication-buffer.tcl +++ b/tests/integration/replication-buffer.tcl @@ -27,6 +27,9 @@ start_server {} { # Make sure replica3 is synchronized with master $replica3 replicaof $master_host $master_port wait_for_sync $replica3 + if {$dualchannel == "yes"} { + wait_for_ofs_sync $master $replica3 + } # Generating RDB will take some 100 seconds $master config set rdb-key-save-delay 1000000