Skip to content

Commit

Permalink
Add ASAP abort flag to provisional primary for safer replication disc…
Browse files Browse the repository at this point in the history
…onnection handling

Introduces a dedicated flag in provisional primary struct to signal immediate
abort, preventing potential use-after-free scenarios during replication
disconnection in dual-channel load. This ensures proper termination of
rdbLoadRioWithLoadingCtx when replication is cancelled due to connection loss
on main connection.

Fixes #1152

Signed-off-by: naglera <[email protected]>
  • Loading branch information
naglera committed Oct 15, 2024
1 parent 36d438b commit acedb47
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 7 deletions.
7 changes: 6 additions & 1 deletion src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -2920,7 +2920,7 @@ void stopSaving(int success) {

/* Track loading progress in order to serve client's from time to time
and if needed calculate rdb checksum */
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
int rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.rdb_checksum) rioGenericUpdateChecksum(r, buf, len);
if (server.loading_process_events_interval_bytes &&
(r->processed_bytes + len) / server.loading_process_events_interval_bytes >
Expand All @@ -2933,6 +2933,11 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) {
server.stat_net_repl_input_bytes += len;
}
if (server.repl_provisional_primary.close_asap == 1) {
serverLog(LL_WARNING, "Primary main connection droped during RDB load callback");
return -1;
}
return 0;
}

/* Save the given functions_ctx to the rdb.
Expand Down
2 changes: 2 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2717,6 +2717,7 @@ static void fullSyncWithPrimary(connection *conn) {
server.repl_provisional_primary.reploff = reploffset;
server.repl_provisional_primary.read_reploff = reploffset;
server.repl_provisional_primary.dbid = dbid;
server.repl_provisional_primary.close_asap = 0;

/* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the
* main connection accordingly.*/
Expand Down Expand Up @@ -2799,6 +2800,7 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t
}
if (nread == 0) {
serverLog(LL_VERBOSE, "Provisional primary closed connection");
server.repl_provisional_primary.close_asap = 1;
cancelReplicationHandshake(1);
return C_ERR;
}
Expand Down
3 changes: 2 additions & 1 deletion src/rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,9 @@ void rioFreeFd(rio *r) {

/* This function can be installed both in memory and file streams when checksum
* computation is needed. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
int rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
r->cksum = crc64(r->cksum, buf, len);
return 1;
}

/* Set the file-based rio object to auto-fsync every 'bytes' file written.
Expand Down
10 changes: 6 additions & 4 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ struct _rio {
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
* and len fields pointing to the new block of data to add to the checksum
* computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
* computation.
* The method should return -1 to indicate that the rio operation should be
* terminated, or a non-negative value to continue processing. */
int (*update_cksum)(struct _rio *, const void *buf, size_t len);

/* The current checksum and flags (see RIO_FLAG_*) */
uint64_t cksum, flags;
Expand Down Expand Up @@ -140,7 +142,7 @@ static inline size_t rioRead(rio *r, void *buf, size_t len) {
r->flags |= RIO_FLAG_READ_ERROR;
return 0;
}
if (r->update_cksum) r->update_cksum(r, buf, bytes_to_read);
if (r->update_cksum && r->update_cksum(r, buf, bytes_to_read) < 0) return 0;
buf = (char *)buf + bytes_to_read;
len -= bytes_to_read;
r->processed_bytes += bytes_to_read;
Expand Down Expand Up @@ -188,7 +190,7 @@ size_t rioWriteBulkDouble(rio *r, double d);
struct serverObject;
int rioWriteBulkObject(rio *r, struct serverObject *obj);

void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
int rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
void rioSetAutoSync(rio *r, off_t bytes);
void rioSetReclaimCache(rio *r, int enabled);
uint8_t rioCheckType(rio *r);
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2038,6 +2038,7 @@ struct valkeyServer {
long long reploff;
long long read_reploff;
int dbid;
uint64_t close_asap : 1;
} repl_provisional_primary;
client *cached_primary; /* Cached primary to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
Expand Down
2 changes: 1 addition & 1 deletion src/valkey-check-rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include <sys/stat.h>

void createSharedObjects(void);
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len);
int rdbLoadProgressCallback(rio *r, const void *buf, size_t len);
int rdbCheckMode = 0;

struct {
Expand Down

0 comments on commit acedb47

Please sign in to comment.