Skip to content

Commit

Permalink
Rdb channel for full sync
Browse files Browse the repository at this point in the history
In this PR we introduce the main benefit of rdb channel by continuously
steaming the COB in parallel to the RDB and thus keeping the primary
side COB small AND accelerating the overall sync process.
By streaming the replication data to the replica during the full sync,
we reduce
1. Memory load from the primary node.
2. CPU load from the primary main process (will be introduced in later
   PR).
This opens up possibilities to future improvements with better
TLS connection handling and removal of the the need to pipeline the RDB
from the child process to the main.

Squashed commit of the following:

commit 7d4756681655ae1c6e04681710740e422a6def4c
Merge: 97894e3 7253862
Author: naglera <[email protected]>
Date:   Wed May 22 08:15:26 2024 +0000

    Merge branch 'unstable' into rdb-channel

commit 97894e3
Author: Amit Nagler <[email protected]>
Date:   Sun May 19 15:21:58 2024 +0000

    fix psync stat error count

    rdb channel sync should also count psync failures.
    This commit fix tests "replication partial resync" to run with rdb
    channel enabled

commit a51b0a5
Author: Amit Nagler <[email protected]>
Date:   Sun May 19 11:06:04 2024 +0000

    fix test PSYNC2: Full resync after Master restart when too many key expired

commit 96c93f3
Author: Amit Nagler <[email protected]>
Date:   Sun May 19 09:17:45 2024 +0000

    Allow auth using rdb channel

commit 9916f80
Author: Amit Nagler <[email protected]>
Date:   Thu May 16 12:59:44 2024 +0000

    fix use after free error & adjust test's configuration

commit c343261
Author: Amit Nagler <[email protected]>
Date:   Thu May 16 12:19:33 2024 +0000

    spell check fix

commit e2decc4
Author: Amit Nagler <[email protected]>
Date:   Tue May 7 13:54:26 2024 +0000

    Fix replication tests to work with rdb channel

commit bc010e8
Author: Amit Nagler <[email protected]>
Date:   Thu May 9 10:40:47 2024 +0000

    fix initial offset issue at replica side

commit 1ecf5fb
Author: Amit Nagler <[email protected]>
Date:   Tue May 7 12:44:21 2024 +0000

    fix leak of repl_transfer_tmpfile, and use before allocation of end offset buffer

commit 47d482f
Author: Amit Nagler <[email protected]>
Date:   Tue May 7 10:05:13 2024 +0000

    fix crash after repl_transfer_s double free

commit b73dcc2
Author: Amit Nagler <[email protected]>
Date:   Sun Apr 21 15:43:09 2024 +0000

    replica connection free at fork process

commit eb28c91
Author: Amit Nagler <[email protected]>
Date:   Sun Apr 21 13:19:55 2024 +0000

    Allow abort sync connection from rdb con sync

commit 08885c5
Author: Amit Nagler <[email protected]>
Date:   Wed Apr 17 15:14:23 2024 +0000

    Revert "free repl_transfer_tmpfile after rdb sync abort"

    This reverts commit bfa469d.

commit b89fa41
Author: Amit Nagler <[email protected]>
Date:   Wed Apr 17 15:13:30 2024 +0000

    Fix cancel replicaiton handshake rdb-channel case

commit 2eaa109
Author: Amit Nagler <[email protected]>
Date:   Tue Apr 16 15:31:11 2024 +0000

    add new line at the end of rio.c

commit bfa469d
Author: Amit Nagler <[email protected]>
Date:   Tue Apr 16 15:28:26 2024 +0000

    free repl_transfer_tmpfile after rdb sync abort

commit e8462aa
Author: Amit Nagler <[email protected]>
Date:   Tue Apr 16 13:06:26 2024 +0000

    free conns in both direct an normal cases

commit 9ad4bd0
Author: Amit Nagler <[email protected]>
Date:   Wed Apr 10 11:03:43 2024 +0000

    stat_total_reads_processed->stat_net_repl_input_bytes

commit e56ca18
Author: Amit Nagler <[email protected]>
Date:   Tue Apr 9 16:17:30 2024 +0000

    add replconf sub command documentation

commit cefd9e2
Merge: 9454ec2 03650e9
Author: naglera <[email protected]>
Date:   Tue Apr 9 19:26:58 2024 +0300

    Merge branch 'unstable' into rdb-channel

commit 9454ec2
Author: naglera <[email protected]>
Date:   Tue Apr 9 15:09:00 2024 +0000

    fix comments

commit ad361c2
Author: Amit Nagler <[email protected]>
Date:   Tue Apr 9 09:15:34 2024 +0000

    Fix info format

commit 241b130
Author: Amit Nagler <[email protected]>
Date:   Tue Apr 9 08:56:47 2024 +0000

    Fix leak in rdbSaveToSlavesSockets

commit 74b0ca5
Author: Amit Nagler <[email protected]>
Date:   Tue Apr 9 08:31:56 2024 +0000

    Fix leak in slaveTryPartialResynchronization

commit 80b9c5f
Author: Amit Nagler <[email protected]>
Date:   Tue Apr 9 08:12:36 2024 +0000

    Fix fsync requirement bug

    Fix requirement bug causing Dumping an RDB - functions only: yes test failure

commit 841013d
Author: naglera <naglera>
Date:   Mon Apr 8 12:52:36 2024 +0000

    update top comment

commit 3722cb1
Author: naglera <[email protected]>
Date:   Mon Apr 8 11:50:32 2024 +0000

    Handle COB overrun during rdb-chan sync

commit 2adf3c2
Author: naglera <[email protected]>
Date:   Tue Mar 19 13:18:39 2024 +0000

    Avoid reading from replica's rdb client after it marked as closed asap

commit 8db90c7
Merge: 6b60dd2 6411629
Author: naglera <[email protected]>
Date:   Mon Apr 8 08:23:21 2024 +0000

    Merge branch 'unstable' into rdb-channel

commit 6b60dd2
Author: naglera <[email protected]>
Date:   Mon Mar 18 15:21:23 2024 +0000

    Remove lookupClientByIDGeneric

commit 7511f26
Author: Amit Nagler <[email protected]>
Date:   Thu Mar 14 13:56:54 2024 +0000

    debug command for wait_before_rdb_client_free

commit 3ad61d0
Author: naglera <[email protected]>
Date:   Thu Mar 14 11:45:59 2024 +0000

    Use radix tree for waiting replicas for psync

commit 8d9a057
Author: Amit Nagler <[email protected]>
Date:   Wed Mar 13 17:39:11 2024 +0000

    renaming and minor comments

commit fab702a
Author: Amit Nagler <[email protected]>
Date:   Wed Mar 13 17:38:34 2024 +0000

    use CLIENT_PROTECTED_RDB_CHANNEL flag instead of CLIENT_PROTECTED

commit 9b320d0
Author: naglera <[email protected]>
Date:   Wed Mar 13 16:07:10 2024 +0000

    Test edge cases of master connection peering

commit 0597fce
Author: Amit Nagler <[email protected]>
Date:   Wed Mar 13 09:39:25 2024 +0000

    Fix  CI workflow run comments

commit baa932c
Author: naglera <[email protected]>
Date:   Wed Mar 13 19:09:59 2024 +0200

    Update src/server.h

    Co-authored-by: debing.sun <[email protected]>

commit 073a86b
Author: naglera <[email protected]>
Date:   Tue Mar 12 16:10:14 2024 +0000

    Prevent freeing repl buffer blocks that are used by pre established psync connection

    This is necessery for the case in which the RDB is loaded before psync
    establshed. We do that by protecting the RDB client for short grace
    period (5sec) that will allow the replica main channel to finish
    handshake.

commit 75e68e1
Author: naglera <[email protected]>
Date:   Tue Mar 12 13:47:59 2024 +0000

    update replica state machine diagram

commit 5c4c824
Author: naglera <[email protected]>
Date:   Tue Mar 12 13:43:57 2024 +0000

    Use identity to hash dict integers and store keys as plain text

commit 616455e
Author: naglera <[email protected]>
Date:   Tue Mar 12 13:22:21 2024 +0000

    rename REPLCONF identify => set-rdb-conn-id

commit 83a471d
Author: naglera <[email protected]>
Date:   Tue Mar 12 12:58:34 2024 +0000

    remove getLongLongFromObjectOrReply and fix identations

commit da22d0d
Author: naglera <[email protected]>
Date:   Mon Mar 11 17:02:04 2024 +0000

    Rename peer & unpeer => add & remove

commit 39364a6
Author: naglera <[email protected]>
Date:   Mon Mar 11 16:23:27 2024 +0000

    Rename pending_slaves to slaves_waiting_psync

commit 7073371
Author: naglera <[email protected]>
Date:   Mon Mar 11 15:51:24 2024 +0000

    Use cliet id for pending replicas dict

commit 8ccfe0b
Author: naglera <[email protected]>
Date:   Thu Mar 7 12:30:22 2024 +0000

    fix info field: replstate name

commit 79eae09
Author: naglera <[email protected]>
Date:   Mon Feb 26 14:22:17 2024 +0000

    Fix comments and add high level design top comment and diagram

commit 5e20d8e
Author: naglera <[email protected]>
Date:   Mon Feb 26 13:26:17 2024 +0000

    move pending_slaves dict update in case of cob overrun to freeClient

commit 98f0b9c
Author: naglera <[email protected]>
Date:   Sun Feb 25 19:46:58 2024 +0200

    Update src/replication.c

    update slaves_waitlng_psync documentation

    Co-authored-by: Oran Agra <[email protected]>

commit f668d29
Author: naglera <[email protected]>
Date:   Sun Feb 25 19:43:22 2024 +0200

    Update src/replication.c

    Co-authored-by: Oran Agra <[email protected]>

commit 9edeec3
Author: naglera <[email protected]>
Date:   Wed Feb 21 15:19:13 2024 +0000

    spellcheck fix

commit e6bc18a
Author: naglera <[email protected]>
Date:   Wed Feb 21 15:15:31 2024 +0000

    add documentation

commit 93d0e47
Author: naglera <[email protected]>
Date:   Wed Feb 21 13:56:35 2024 +0000

    remove unnecessary if from rollback mechanism

commit b4868fb
Author: naglera <[email protected]>
Date:   Wed Feb 21 09:56:40 2024 +0000

    fix comments

commit 6de2dc3
Author: naglera <[email protected]>
Date:   Mon Feb 19 14:01:58 2024 +0000

    add documentation

commit 24a34dd
Author: naglera <[email protected]>
Date:   Sun Feb 18 10:39:41 2024 +0000

    Test peering connection

commit 6bcbc62
Author: naglera <[email protected]>
Date:   Fri Feb 16 13:33:23 2024 +0000

    Rollback after COB overrun during rdb-channel sync handshake

commit 669e22b
Author: naglera <[email protected]>
Date:   Thu Feb 15 15:53:28 2024 +0000

    Fix rollback mechanism

commit 4130f8f
Author: naglera <[email protected]>
Date:   Wed Feb 14 11:05:34 2024 +0000

    Peer replica with repl backlog block upon fork

commit ee527c3
Author: naglera <[email protected]>
Date:   Thu Feb 15 08:16:23 2024 +0000

    rename SLAVE_STATE_ACTIVE_RDB_CHAN to SLAVE_STATE_BG_RDB_LOAD

    tmp rename

commit 8832ed7
Author: naglera <[email protected]>
Date:   Sun Feb 11 10:47:04 2024 +0000

    fix comment- end offset format parsing

commit 2028315
Author: naglera <[email protected]>
Date:   Thu Feb 8 07:59:08 2024 +0000

    Test fixes

commit 19cddc4
Author: naglera <[email protected]>
Date:   Wed Feb 7 20:10:00 2024 +0000

    Split completeTaskRDBChannelSync into completeTaskRDBChannelSyncMainConn and completeTaskRDBChannelSyncRdbConn

commit d25c36e
Author: naglera <[email protected]>
Date:   Tue Feb 6 16:51:05 2024 +0000

    Simplfy replica state machine by spliting replica state

    server.repl_state will be used to represent the main conneciton state.
    server.repl_rdb_conn_state is introduced to represent the rdb-channel
    state.

    Main channel can reuse REPL_STATE_SEND_PSYNC,
    REPL_STATE_RECEIVE_PSYNC_REPLY, REPL_STATE_TRANSFER after rdb-channel
    initialization.

commit ed6d69a
Author: naglera <[email protected]>
Date:   Tue Feb 6 11:55:41 2024 +0000

    Merge rdbSaveToSlavesSocketsWithPipeline with rdbSaveToSlavesSocketsDirect

commit 1e1f74f
Author: naglera <[email protected]>
Date:   Tue Feb 6 09:40:30 2024 +0000

    Small comment fixes

commit 5fc0be9
Author: naglera <[email protected]>
Date:   Mon Feb 5 17:30:48 2024 +0000

    Hide repl-rdb-channel config, . Clarify fallback point. Better utilize master_supports_rdb_channel

commit e44bc69
Author: naglera <[email protected]>
Date:   Mon Feb 5 10:00:43 2024 +0000

    Fix typo

commit b4ead39
Author: naglera <[email protected]>
Date:   Mon Feb 5 10:51:07 2024 +0000

    Fix possible crash after sync failure

commit d50e456
Author: naglera <[email protected]>
Date:   Wed Jan 31 08:03:55 2024 +0000

    Space fixes and redis.conf update

commit acd12a8
Author: naglera <[email protected]>
Date:   Tue Jan 30 13:06:15 2024 +0000

    Test rdb-connection race condition

    Test the case in which the replica establish psync connection after the
    RDB was already loaded.
    Added debug command in order to froce main process to be slower then the
    bg child.

commit 1a6052f
Author: naglera <[email protected]>
Date:   Mon Jan 29 14:22:37 2024 +0000

    Disable child process pipeline

    Stream replication data directly from bg child process to replica

commit f71122c
Author: naglera <[email protected]>
Date:   Wed Jan 24 17:39:03 2024 +0000

    Refactor- Extract rdb-channel logic from slaveTryPartialReSync

    Extract setupMainConnForPsync from slaveTryPartialResynchronization to
    make slaveTryPartialResynchronization readable.

commit a67a5ff
Author: naglera <[email protected]>
Date:   Wed Jan 24 14:57:00 2024 +0000

    Add time threshold to replica buffer streaming progress callback

commit cedbf8a
Author: naglera <[email protected]>
Date:   Wed Jan 24 10:39:42 2024 +0000

    Stop reading from main connection after replica's local buffer limit reached

commit 03477b0
Author: naglera <[email protected]>
Date:   Wed Jan 24 08:54:24 2024 +0000

    switch primary -> master

commit 09b30b3
Author: naglera <[email protected]>
Date:   Wed Jan 24 08:47:52 2024 +0000

    unwrap processEndOffsetResponse

commit 4337734
Author: naglera <[email protected]>
Date:   Mon Jan 22 17:59:28 2024 +0000

    add new replication state REPL_RDB_CONN_SEND_CAPA

commit 7cd6252
Author: naglera <[email protected]>
Date:   Mon Jan 22 14:15:17 2024 +0000

    add documetation for process end offset response

commit 2e64230
Author: naglera <[email protected]>
Date:   Mon Jan 22 13:05:43 2024 +0000

    switch rdb-channel sync to normal sync only when we know that master does not support rdb-channel

commit d65381a
Author: naglera <[email protected]>
Date:   Mon Jan 22 10:53:28 2024 +0000

    Send Ack at the end of readSyncBulkPayload if it isn't rdb connection sync. AlSome method renamed

commit ac6d324
Author: naglera <[email protected]>
Date:   Thu Jan 18 09:51:00 2024 +0000

    Make repl_provisional_master contain only the necessary client's fields

commit bab299e
Author: naglera <[email protected]>
Date:   Tue Jan 16 11:07:02 2024 +0000

    Mostly renaming and comment fixes

commit c8de526
Author: naglera <[email protected]>
Date:   Mon Jan 15 07:56:35 2024 +0000

    Use -FULLSYNCNEEDED instead of empty bulk

commit 400664d
Author: naglera <[email protected]>
Date:   Mon Jan 8 17:06:06 2024 +0000

    Fixed comments, rename methods and states

commit be1f66f
Author: naglera <[email protected]>
Date:   Thu Jan 4 18:24:28 2024 +0000

    Move sendCurentOffsetToReplica to replication.c

commit 03d35b6
Author: naglera <[email protected]>
Date:   Thu Jan 4 14:08:51 2024 +0000

    Remove sendReplicationOffsetToReplicas(), since we dont want to permit rdb-channel sync along with master side disk sync

commit b7c1688
Author: naglera <[email protected]>
Date:   Wed Jan 3 16:05:15 2024 +0000

    rename primary_can_sync_using_rdb_channel-> master_supports_rdb_channel

commit 21c797e
Author: naglera <[email protected]>
Date:   Thu Jan 4 10:35:51 2024 +0000

    Decrement pending_repl_data.len during replica buffer streaming.

    Stop using stat_repl_processed_bytes to follow streaming progress,
    instread keep record of the buffer's peak.

commit 33bfd16
Author: naglera <[email protected]>
Date:   Wed Jan 3 16:01:16 2024 +0000

    replica buffer will use the same mechanism as other client reply buffers

commit 07f24f5
Author: naglera <[email protected]>
Date:   Tue Jan 2 14:34:06 2024 +0000

    Remove mentions of second channel

commit f0252d8
Author: naglera <[email protected]>
Date:   Tue May 23 09:51:05 2023 +0000

    Fix indentation

commit 3efab3a
Author: naglera <[email protected]>
Date:   Tue May 23 06:49:18 2023 +0000

    Fix comments

    Removed adlist method and primary block size.
    Use sync write for sending rdb end offset.
    Removed replconf connected sub command. The replica will send ack
    instead.
    Fixed git diff issue.
    Refactored replicationAbortSyncTransfer + replicationAbortSyncTransfer.

commit 6ef6736
Author: naglera <[email protected]>
Date:   Thu May 4 16:41:54 2023 +0000

    void instead of empty params

commit d976a81
Author: naglera <[email protected]>
Date:   Thu May 4 16:03:36 2023 +0000

    Rename repl_data_buf to pending_repl_data

    Fix memory leak
    Added void param to isOngoingRdbChannelSync

commit 9d464e2
Merge: 7297a35 e49c2a5
Author: naglera <[email protected]>
Date:   Thu May 4 14:16:49 2023 +0300

    Merge branch 'unstable' into rdb-channel

commit 7297a35
Author: naglera <[email protected]>
Date:   Thu May 4 11:00:31 2023 +0000

    Use linked list for replication data

    1. The replica side mmap replication buffer has been replaced with a linked list
    replication buffer.
    2. The replica buffer limit now depends on the client-output-buffer-limit type replica.
    3. Buffering policy changed to not cancel sync when replication buffer is full. We
    instead continue to sync without reading from the replication data socket, so when the
    replica side reaches replication buffer limits, the primary replication buffer will
    take part in the replication data buffering.

commit b9bddab
Author: naglera <[email protected]>
Date:   Sun Apr 30 07:52:41 2023 +0000

    fix CI workflow run comments

commit cc74971
Author: naglera <[email protected]>
Date:   Thu Apr 27 08:31:44 2023 +0000

    Rdb channel for full sync

    In this PR we introduce the main benefit of rdb channel by continuously
    steaming the COB in parallel to the RDB and thus keeping the primary
    side COB small AND accelerating the overall sync process.
    By streaming the replication data to the replica during the full sync,
    we reduce
    1. Memory load from the primary node.
    2. CPU load from the primary main process (will be introduced in later
       PR).
    This opens up possibilities to future improvements with better
    TLS connection handling and removal of the the need to pipeline the RDB
    from the child process to the main.

commit 4a93f4d
Author: naglera <[email protected]>
Date:   Thu Apr 27 08:31:44 2023 +0000

    Rdb channel for full sync

    In this PR we introduce the main benefit of rdb channel by continuously
    steaming the COB in parallel to the RDB and thus keeping the primary
    side COB small AND accelerating the overall sync process.
    By streaming the replication data to the replica during the full sync,
    we reduce
    1. Memory load from the primary node.
    2. CPU load from the primary main process (will be introduced in later
       PR).
    This opens up possibilities to future improvements with better
    TLS connection handling and removal of the the need to pipeline the RDB
    from the child process to the main.
  • Loading branch information
naglera committed May 22, 2024
1 parent 7253862 commit 378dcf2
Show file tree
Hide file tree
Showing 18 changed files with 2,114 additions and 190 deletions.
2 changes: 2 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3088,6 +3088,7 @@ standardConfig static_configs[] = {
createBoolConfig("lazyfree-lazy-user-flush", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_flush , 0, NULL, NULL),
createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL),
createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 1, NULL, NULL),
createBoolConfig("repl-rdb-channel", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.rdb_channel_enabled, 0, NULL, NULL),
createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.aof_rewrite_incremental_fsync, 1, NULL, NULL),
createBoolConfig("no-appendfsync-on-rewrite", NULL, MODIFIABLE_CONFIG, server.aof_no_fsync_on_rewrite, 0, NULL, NULL),
createBoolConfig("cluster-require-full-coverage", NULL, MODIFIABLE_CONFIG, server.cluster_require_full_coverage, 1, NULL, NULL),
Expand Down Expand Up @@ -3252,6 +3253,7 @@ standardConfig static_configs[] = {

/* Other configs */
createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */
createTimeTConfig("loading-process-events-interval-ms", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, LONG_MAX, server.loading_process_events_interval_ms, 100, INTEGER_CONFIG, NULL, NULL), /* Default: 0.1 seconds */
createOffTConfig("auto-aof-rewrite-min-size", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.aof_rewrite_min_size, 64*1024*1024, MEMORY_CONFIG, NULL, NULL),
createOffTConfig("loading-process-events-interval-bytes", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 1024, INT_MAX, server.loading_process_events_interval_bytes, 1024*1024*2, INTEGER_CONFIG, NULL, NULL),

Expand Down
12 changes: 12 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,10 @@ void debugCommand(client *c) {
" In case RESET is provided the peak reset time will be restored to the default value",
"REPLYBUFFER RESIZING <0|1>",
" Enable or disable the reply buffer resize cron job",
"SLEEP-AFTER-FORK <micro seconds>",
" Stop the server's main process for <seconds> after forking.",
"WAIT-BEFORE-RDB-CLIENT-FREE <seconds>",
" Grace period in seconds for replica main channel to establish psync.",
"DICT-RESIZING <0|1>",
" Enable or disable the main dict and expire dict resizing.",
NULL
Expand Down Expand Up @@ -1022,6 +1026,14 @@ NULL
return;
}
addReply(c, shared.ok);
} else if(!strcasecmp(c->argv[1]->ptr,"SLEEP-AFTER-FORK") &&
c->argc == 3) {
server.debug_sleep_after_fork = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if(!strcasecmp(c->argv[1]->ptr,"WAIT-BEFORE-RDB-CLIENT-FREE") &&
c->argc == 3) {
server.wait_before_rdb_client_free = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "dict-resizing") && c->argc == 3) {
server.dict_resizing = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
Expand Down
49 changes: 44 additions & 5 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ client *createClient(connection *conn) {
c->slave_addr = NULL;
c->slave_capa = SLAVE_CAPA_NONE;
c->slave_req = SLAVE_REQ_NONE;
c->associated_rdb_client_id = 0;
c->rdb_client_disconnect_time = 0;
c->reply = listCreate();
c->deferred_reply_errors = NULL;
c->reply_bytes = 0;
Expand Down Expand Up @@ -237,6 +239,11 @@ void installClientWriteHandler(client *c) {
}
}

/* Determining whether a replica requires online data updates based on its state */
int isReplDataRequired(client *c) {
return c->replstate == SLAVE_STATE_ONLINE || c->replstate == SLAVE_STATE_BG_RDB_LOAD;
}

/* This function puts the client in the queue of clients that should write
* their output buffers to the socket. Note that it does not *yet* install
* the write handler, to start clients are put in a queue of clients that need
Expand All @@ -250,7 +257,7 @@ void putClientInPendingWriteQueue(client *c) {
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack)))
(isReplDataRequired(c) && !c->repl_start_cmd_stream_on_ack)))
{
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
Expand Down Expand Up @@ -1569,7 +1576,7 @@ void freeClient(client *c) {

/* If a client is protected, yet we need to free it right now, make sure
* to at least use asynchronous freeing. */
if (c->flags & CLIENT_PROTECTED) {
if ((c->flags & CLIENT_PROTECTED) || (c->flags & CLIENT_PROTECTED_RDB_CHANNEL)) {
freeClientAsync(c);
return;
}
Expand Down Expand Up @@ -1700,6 +1707,9 @@ void freeClient(client *c) {
moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICA_CHANGE,
VALKEYMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
NULL);
if (c->flags & CLIENT_REPL_RDB_CHANNEL) {
uint64_t id = htonu64(c->id);
raxRemove(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),NULL); }
}

/* Master/slave cleanup Case 2:
Expand Down Expand Up @@ -1802,6 +1812,18 @@ int freeClientsInAsyncFreeQueue(void) {
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);

if (c->flags & CLIENT_PROTECTED_RDB_CHANNEL) {
/* Check if we can remove RDB connection protection. */
if (!c->rdb_client_disconnect_time) {
c->rdb_client_disconnect_time = server.unixtime;
continue;
}
if (server.unixtime - c->rdb_client_disconnect_time > server.wait_before_rdb_client_free) {
serverLog(LL_NOTICE, "Replica main connection failed to establish PSYNC within the grace period (%ld seconds). Freeing RDB client %llu.", (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id);
c->flags &= ~CLIENT_PROTECTED_RDB_CHANNEL;
}
}

if (c->flags & CLIENT_PROTECTED) continue;

c->flags &= ~CLIENT_CLOSE_ASAP;
Expand All @@ -1822,6 +1844,15 @@ client *lookupClientByID(uint64_t id) {
return c;
}

/* Return a client by ID, or NULL if the client ID is not in the set
* of slaves waiting psync clients. */
client *lookupRdbClientByID(uint64_t id) {
id = htonu64(id);
void *c = NULL;
raxFind(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),&c);
return c;
}

/* This function should be called from _writeToClient when the reply list is not empty,
* it gathers the scattered buffers from reply list and sends them away with connWritev.
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned,
Expand Down Expand Up @@ -2644,6 +2675,9 @@ void readQueryFromClient(connection *conn) {
int nread, big_arg = 0;
size_t qblen, readlen;

/* If the replica RDB client is marked as closed ASAP, do not try to read from it */
if ((c->flags & CLIENT_CLOSE_ASAP) && (c->flags & CLIENT_PROTECTED_RDB_CHANNEL)) return;

/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
Expand Down Expand Up @@ -2705,6 +2739,9 @@ void readQueryFromClient(connection *conn) {
if (server.verbosity <= LL_VERBOSE) {
sds info = catClientInfoString(sdsempty(), c);
serverLog(LL_VERBOSE, "Client closed connection %s", info);
if (c->flags & CLIENT_PROTECTED_RDB_CHANNEL) {
serverLog(LL_VERBOSE, "Postpone RDB client (%llu) free for %d seconds", (unsigned long long)c->id, server.wait_before_rdb_client_free);
}
sdsfree(info);
}
freeClientAsync(c);
Expand Down Expand Up @@ -3975,10 +4012,11 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) {
/* Note that c->reply_bytes is irrelevant for replica clients
* (they use the global repl buffers). */
if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) ||
c->flags & CLIENT_CLOSE_ASAP) return 0;
(c->flags & CLIENT_CLOSE_ASAP && !(c->flags & CLIENT_PROTECTED_RDB_CHANNEL))) return 0;
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(),c);

/* Remove RDB connection protection on COB overrun */
c->flags &= ~CLIENT_PROTECTED_RDB_CHANNEL;
if (async) {
freeClientAsync(c);
serverLog(LL_WARNING,
Expand Down Expand Up @@ -4025,7 +4063,8 @@ void flushSlavesOutputBuffers(void) {
*
* 3. Obviously if the slave is not ONLINE.
*/
if (slave->replstate == SLAVE_STATE_ONLINE &&
if ((slave->replstate == SLAVE_STATE_ONLINE ||
slave->replstate == SLAVE_STATE_BG_RDB_LOAD) &&
!(slave->flags & CLIENT_CLOSE_ASAP) &&
can_receive_writes &&
!slave->repl_start_cmd_stream_on_ack &&
Expand Down
60 changes: 45 additions & 15 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3547,6 +3547,7 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
listIter li;
pid_t childpid;
int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;
int direct = (req & SLAVE_REQ_RDB_CHANNEL);

if (hasActiveChildProcess()) return C_ERR;

Expand Down Expand Up @@ -3574,17 +3575,35 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {

/* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
int connsnum = 0;
connection **conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
server.rdb_pipe_conns = NULL;
if (!direct) {
server.rdb_pipe_conns = conns;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
}
/* Filter replica connections pending full sync (ie. in WAIT_BGSAVE_START state). */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
/* Check slave has the exact requirements */
if (slave->slave_req != req)
continue;
server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
conns[connsnum++] = slave->conn;
if (direct) {
/* Put the socket in blocking mode to simplify RDB transfer. */
connBlock(slave->conn);
connSendTimeout(slave->conn, server.repl_timeout * 1000);
/* This replica uses diskless rdb channel sync, hence we need
* to inform it with the save end offset.*/
sendCurrentOffsetToReplica(slave);
/* Make sure repl traffic is appended to the replication backlog */
addSlaveToPsyncWaitingRax(slave);
} else {
server.rdb_pipe_numconns++;
}
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
}
}
Expand All @@ -3594,8 +3613,11 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
/* Child */
int retval, dummy;
rio rdb;

rioInitWithFd(&rdb,rdb_pipe_write);
if (direct) {
rioInitWithConnset(&rdb, conns, connsnum);
} else {
rioInitWithFd(&rdb,rdb_pipe_write);
}

/* Close the reading part, so that if the parent crashes, the child will
* get a write error and exit. */
Expand All @@ -3614,8 +3636,12 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}

rioFreeFd(&rdb);
if (direct) {
rioFreeConnset(&rdb);
} else {
rioFreeFd(&rdb);
}
zfree(conns);
/* wake up the reader, tell it we're done. */
close(rdb_pipe_write);
close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
Expand All @@ -3642,20 +3668,24 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
}
close(rdb_pipe_write);
close(server.rdb_pipe_read);
close(server.rdb_child_exit_pipe);
zfree(server.rdb_pipe_conns);
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
zfree(conns);
if (direct) {
closeChildInfoPipe();
} else {
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
}
} else {
serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",
(long) childpid);
serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld to %s",
(long) childpid, direct? "replica socket" : "pipe");
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
if (direct) zfree(conns);
}
close(safe_to_exit_pipe);
return (childpid == -1) ? C_ERR : C_OK;
Expand Down
Loading

0 comments on commit 378dcf2

Please sign in to comment.