Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scoped RDB loading context and immediate abort flag #1173

Merged
merged 18 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -2980,6 +2980,11 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s
return res;
}

/* Cleanup function to restore the original loading_rio value. */
static void _restore_loading_rio(rio **old_rio_ptr) {
server.loading_rio = *old_rio_ptr;
}

/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
Expand All @@ -3003,6 +3008,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
char buf[1024];
int error;
long long empty_keys_skipped = 0;
RDB_SCOPED_LOADING_RIO(rdb);

rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
Expand Down
6 changes: 6 additions & 0 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@
#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */
#define RDB_LOAD_ERR_OTHER 2 /* Any other errors */


/* Macro to temporarily set server.loading_rio within a scope. */
#define RDB_SCOPED_LOADING_RIO(new_rio) \
__attribute__((cleanup(_restore_loading_rio))) rio *_old_rio __attribute__((unused)) = server.loading_rio; \
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
server.loading_rio = new_rio;

ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len);
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
Expand Down
14 changes: 6 additions & 8 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2824,18 +2824,16 @@ typedef struct replDataBufBlock {
* Reads replication data from primary into specified repl buffer block */
int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t read) {
int nread = connRead(conn, data_block->buf + data_block->used, read);
if (nread == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn));

if (nread <= 0) {
if (nread == 0 || connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING, "Provisional primary closed connection");
/* Signal ongoing RDB load to terminate gracefully */
if (server.loading_rio) rioCloseASAP(server.loading_rio);
cancelReplicationHandshake(1);
}
return C_ERR;
}
if (nread == 0) {
serverLog(LL_VERBOSE, "Provisional primary closed connection");
cancelReplicationHandshake(1);
return C_ERR;
}
data_block->used += nread;
server.stat_total_reads_processed++;
return read - nread;
Expand Down
16 changes: 13 additions & 3 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down Expand Up @@ -115,7 +116,7 @@ typedef struct _rio rio;
* if needed. */

static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
if (r->flags & RIO_FLAG_WRITE_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
while (len) {
size_t bytes_to_write =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
Expand All @@ -132,7 +133,7 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
}

static inline size_t rioRead(rio *r, void *buf, size_t len) {
if (r->flags & RIO_FLAG_READ_ERROR) return 0;
if (r->flags & RIO_FLAG_READ_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
while (len) {
size_t bytes_to_read =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
Expand All @@ -156,6 +157,10 @@ static inline int rioFlush(rio *r) {
return r->flush(r);
}

static inline void rioCloseASAP(rio *r) {
r->flags |= RIO_FLAG_CLOSE_ASAP;
}

/* This function allows to know if there was a read error in any past
* operation, since the rio stream was created or since the last call
* to rioClearError(). */
Expand All @@ -168,8 +173,13 @@ static inline int rioGetWriteError(rio *r) {
return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
}

/* Like rioGetReadError() but for async close errors. */
static inline int rioGetAsyncCloseError(rio *r) {
ranshid marked this conversation as resolved.
Show resolved Hide resolved
return (r->flags & RIO_FLAG_CLOSE_ASAP) != 0;
}

static inline void rioClearErrors(rio *r) {
r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR);
r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR | RIO_FLAG_CLOSE_ASAP);
}

void rioInitWithFile(rio *r, FILE *fp);
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2058,6 +2058,7 @@ struct valkeyServer {
int dbid;
} repl_provisional_primary;
client *cached_primary; /* Cached primary to be reused for PSYNC. */
rio *loading_rio; /* Pointer to the rio object currently used for loading data. */
ranshid marked this conversation as resolved.
Show resolved Hide resolved
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a replica */
int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */
Expand Down
68 changes: 67 additions & 1 deletion tests/integration/dual-channel-replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
}

$primary debug log "killing replica main connection"
set replica_main_conn_id [get_client_id_by_last_cmd $primary "sync"]
set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"]
assert {$replica_main_conn_id != ""}
set loglines [count_log_lines -1]
$primary client kill id $replica_main_conn_id
Expand All @@ -1197,3 +1197,69 @@ start_server {tags {"dual-channel-replication external:skip"}} {
stop_write_load $load_handle
}
}


start_server {tags {"dual-channel-replication external:skip"}} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
set loglines [count_log_lines 0]
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

$primary config set repl-diskless-sync yes
$primary config set dual-channel-replication-enabled yes
$primary config set loglevel debug
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
$primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry

# Generating RDB will cost 100 sec to generate
madolson marked this conversation as resolved.
Show resolved Hide resolved
$primary debug populate 1000000 primary 1
$primary config set rdb-key-save-delay -1000

start_server {} {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
set replica_log [srv 0 stdout]

$replica config set dual-channel-replication-enabled yes
$replica config set loglevel debug
$replica config set repl-timeout 10
$replica config set repl-diskless-load flush-before-load

test "Replica notice main-connection killed during rdb load callback" {; # https://github.com/valkey-io/valkey/issues/1152
set loglines [count_log_lines 0]
$replica replicaof $primary_host $primary_port
# Wait for sync session to start
wait_for_condition 500 1000 {
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
[string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
[s -1 rdb_bgsave_in_progress] eq 1
} else {
fail "replica didn't start sync session in time"
}
wait_for_log_messages 0 {"*Loading RDB produced by Valkey version*"} $loglines 1000 10
$primary set key val
set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"]
$primary debug log "killing replica main connection $replica_main_conn_id"
assert {$replica_main_conn_id != ""}
set loglines [count_log_lines 0]
$primary config set rdb-key-save-delay 0; # disable delay to allow next sync to succeed
$primary client kill id $replica_main_conn_id
# Wait for primary to abort the sync
wait_for_condition 50 1000 {
[string match {*replicas_waiting_psync:0*} [$primary info replication]]
} else {
fail "Primary did not free repl buf block after sync failure"
}
wait_for_log_messages 0 {"*Failed trying to load the PRIMARY synchronization DB from socket*"} $loglines 1000 10
# Replica should retry
wait_for_condition 500 1000 {
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
[string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
[s -1 rdb_bgsave_in_progress] eq 1
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
} else {
fail "replica didn't retry after connection close"
}
verify_replica_online $primary 0 500
}
}
}
Loading