Skip to content

Commit

Permalink
Wrapper for rdbLoadRioWithLoadingCtx for scoped RDB
Browse files Browse the repository at this point in the history
Signed-off-by: naglera <[email protected]>
  • Loading branch information
naglera committed Nov 17, 2024
1 parent 0af35e3 commit cd9dca0
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 19 deletions.
21 changes: 14 additions & 7 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ char *rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);

#ifdef __GNUC__
void rdbReportError(int corruption_error, int linenum, char *reason, ...) __attribute__((format(printf, 3, 4)));
Expand Down Expand Up @@ -2980,17 +2981,24 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s
return res;
}

/* Cleanup function to restore the original loading_rio value. */
static void _restore_loading_rio(rio **old_rio_ptr) {
server.loading_rio = *old_rio_ptr;
}

/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
functionsLibCtx *functions_lib_ctx = functionsLibCtxGetCurrent();
rdbLoadingCtx loading_ctx = {.dbarray = server.db, .functions_lib_ctx = functions_lib_ctx};
int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, &loading_ctx);
int retval = rdbLoadRioWithLoadingCtxScopedRdb(rdb, rdbflags, rsi, &loading_ctx);
return retval;
}

/* Wrapper for rdbLoadRioWithLoadingCtx that manages a scoped RDB context.
* This method wraps the rdbLoadRioWithLoadingCtx function, providing temporary
* RDB context management. It sets a new current loading RDB, calls the wrapped
* function, and then restores the previous loading RDB context. */
int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) {
rio *prev_rio = server.loading_rio;
server.loading_rio = rdb;
int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, rdb_loading_ctx);
server.loading_rio = prev_rio;
return retval;
}

Expand All @@ -3008,7 +3016,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
char buf[1024];
int error;
long long empty_keys_skipped = 0;
RDB_SCOPED_LOADING_RIO(rdb);

rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
Expand Down
8 changes: 1 addition & 7 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,6 @@
#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */
#define RDB_LOAD_ERR_OTHER 2 /* Any other errors */


/* Macro to temporarily set server.loading_rio within a scope. */
#define RDB_SCOPED_LOADING_RIO(new_rio) \
__attribute__((cleanup(_restore_loading_rio))) rio *_old_rio __attribute__((unused)) = server.loading_rio; \
server.loading_rio = new_rio;

ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len);
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
Expand Down Expand Up @@ -178,7 +172,7 @@ int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err);
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
ssize_t rdbSaveFunctions(rio *rdb);
Expand Down
2 changes: 1 addition & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2244,7 +2244,7 @@ void readSyncBulkPayload(connection *conn) {

int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
if (rdbLoadRioWithLoadingCtx(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
/* RDB loading failed. */
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB "
"from socket, check server logs.");
Expand Down
7 changes: 3 additions & 4 deletions tests/integration/dual-channel-replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1111,8 +1111,8 @@ start_server {tags {"dual-channel-replication external:skip"}} {
$primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry

# Generating RDB will cost 100 sec to generate
$primary debug populate 10000 primary 1
$primary config set rdb-key-save-delay 10000
$primary debug populate 100000 primary 1
$primary config set rdb-key-save-delay 1000

start_server {} {
set replica [srv 0 client]
Expand Down Expand Up @@ -1207,12 +1207,11 @@ start_server {tags {"dual-channel-replication external:skip"}} {

$primary config set repl-diskless-sync yes
$primary config set dual-channel-replication-enabled yes
$primary config set loglevel debug
$primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry

# Generating RDB will take 100 sec to generate
$primary debug populate 1000000 primary 1
$primary config set rdb-key-save-delay -1000
$primary config set rdb-key-save-delay 100

start_server {} {
set replica [srv 0 client]
Expand Down

0 comments on commit cd9dca0

Please sign in to comment.