Skip to content

Commit

Permalink
Support empty callback on function and free temp function in async way
Browse files Browse the repository at this point in the history
We have a replicationEmptyDbCallback, it is a callback used by emptyData
while flushing away old data. Previously, we did not add this callback
logic for function, in case of abuse, there may be a lot of functions,
and also to make the code consistent, we add the same callback logic
for function.

Changes around this commit:
1. Extend emptyData / functionsLibCtxClear to support passing callback
   when flushing functions.
2. Added disklessLoad function create and discard helper function, just
   like disklessLoadInitTempDb and disklessLoadDiscardTempDb), we wll
   always flush the temp function in a async way to avoid any block.
3. Cleanup around discardTempDb, remove the callback pointer since in
   async way we don't need the callback.
4. Remove functionsLibCtxClear call in readSyncBulkPayload, because we
   called emptyData in the previous lines, which also empty functions.

Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Nov 21, 2024
1 parent 6038eda commit 68ba9c7
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 22 deletions.
10 changes: 4 additions & 6 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ long long emptyData(int dbnum, int flags, void(callback)(dict *)) {

if (with_functions) {
serverAssert(dbnum == -1);
functionsLibCtxClearCurrent(async);
functionsLibCtxClearCurrent(async, callback);
}

/* Also fire the end event. Note that this event will fire almost
Expand Down Expand Up @@ -600,12 +600,10 @@ serverDb *initTempDb(void) {
return tempDb;
}

/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */
void discardTempDb(serverDb *tempDb, void(callback)(dict *)) {
int async = 1;

/* Discard tempDb, it's always async. */
void discardTempDb(serverDb *tempDb) {
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, async, callback);
emptyDbStructure(tempDb, -1, 1, NULL);
for (int i = 0; i < server.dbnum; i++) {
kvstoreRelease(tempDb[i].keys);
kvstoreRelease(tempDb[i].expires);
Expand Down
16 changes: 8 additions & 8 deletions src/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ static void engineLibraryDispose(void *obj) {
}

/* Clear all the functions from the given library ctx */
void functionsLibCtxClear(functionsLibCtx *lib_ctx) {
dictEmpty(lib_ctx->functions, NULL);
dictEmpty(lib_ctx->libraries, NULL);
void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *)) {
dictEmpty(lib_ctx->functions, callback);
dictEmpty(lib_ctx->libraries, callback);
dictIterator *iter = dictGetIterator(lib_ctx->engines_stats);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
Expand All @@ -175,13 +175,13 @@ void functionsLibCtxClear(functionsLibCtx *lib_ctx) {
lib_ctx->cache_memory = 0;
}

void functionsLibCtxClearCurrent(int async) {
void functionsLibCtxClearCurrent(int async, void(callback)(dict *)) {
if (async) {
functionsLibCtx *old_l_ctx = curr_functions_lib_ctx;
curr_functions_lib_ctx = functionsLibCtxCreate();
freeFunctionsAsync(old_l_ctx);
} else {
functionsLibCtxClear(curr_functions_lib_ctx);
functionsLibCtxClear(curr_functions_lib_ctx, callback);
}
}

Expand All @@ -196,7 +196,7 @@ static void functionsLibCtxFreeGeneric(functionsLibCtx *functions_lib_ctx, int a

/* Free the given functions ctx */
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) {
functionsLibCtxClear(functions_lib_ctx);
functionsLibCtxClear(functions_lib_ctx, NULL);
dictRelease(functions_lib_ctx->functions);
dictRelease(functions_lib_ctx->libraries);
dictRelease(functions_lib_ctx->engines_stats);
Expand Down Expand Up @@ -380,7 +380,7 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l
dictReleaseIterator(iter);
iter = NULL;

functionsLibCtxClear(functions_lib_ctx_src);
functionsLibCtxClear(functions_lib_ctx_src, NULL);
if (old_libraries_list) {
listRelease(old_libraries_list);
old_libraries_list = NULL;
Expand Down Expand Up @@ -820,7 +820,7 @@ void functionFlushCommand(client *c) {
return;
}

functionsLibCtxClearCurrent(async);
functionsLibCtxClearCurrent(async, NULL);

/* Indicate that the command changed the data so it will be replicated and
* counted as a data change (for persistence configuration) */
Expand Down
4 changes: 2 additions & 2 deletions src/functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ dict *functionsLibGet(void);
size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx);
functionsLibCtx *functionsLibCtxGetCurrent(void);
functionsLibCtx *functionsLibCtxCreate(void);
void functionsLibCtxClearCurrent(int async);
void functionsLibCtxClearCurrent(int async, void(callback)(dict *));
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx);
void functionsLibCtxClear(functionsLibCtx *lib_ctx);
void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *));
void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async);

int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err);
Expand Down
22 changes: 17 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1981,7 +1981,20 @@ serverDb *disklessLoadInitTempDb(void) {
/* Helper function for readSyncBulkPayload() to discard our tempDb
* when the loading succeeded or failed. */
void disklessLoadDiscardTempDb(serverDb *tempDb) {
discardTempDb(tempDb, replicationEmptyDbCallback);
discardTempDb(tempDb);
}

/* Helper function for to initialize temp function lib context.
* The temp ctx may be populated by functionsLibCtxSwapWithCurrent or
* freed by disklessLoadDiscardFunctionsLibCtx later. */
functionsLibCtx *disklessLoadFunctionsLibCtxCreate(void) {
return functionsLibCtxCreate();
}

/* Helper function to discard our temp function lib context
* when the loading succeeded or failed. */
void disklessLoadDiscardFunctionsLibCtx(functionsLibCtx *temp_functions_lib_ctx) {
freeFunctionsAsync(temp_functions_lib_ctx);
}

/* If we know we got an entirely different data set from our primary
Expand Down Expand Up @@ -2186,7 +2199,7 @@ void readSyncBulkPayload(connection *conn) {
if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Initialize empty tempDb dictionaries. */
diskless_load_tempDb = disklessLoadInitTempDb();
temp_functions_lib_ctx = functionsLibCtxCreate();
temp_functions_lib_ctx = disklessLoadFunctionsLibCtxCreate();

moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL);
}
Expand Down Expand Up @@ -2226,7 +2239,6 @@ void readSyncBulkPayload(connection *conn) {

dbarray = server.db;
functions_lib_ctx = functionsLibCtxGetCurrent();
functionsLibCtxClear(functions_lib_ctx);
}

rioInitWithConn(&rdb, conn, server.repl_transfer_size);
Expand Down Expand Up @@ -2264,7 +2276,7 @@ void readSyncBulkPayload(connection *conn) {
NULL);

disklessLoadDiscardTempDb(diskless_load_tempDb);
functionsLibCtxFree(temp_functions_lib_ctx);
disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx);
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
} else {
/* Remove the half-loaded data in case we started with an empty replica. */
Expand All @@ -2289,7 +2301,7 @@ void readSyncBulkPayload(connection *conn) {
swapMainDbWithTempDb(diskless_load_tempDb);

/* swap existing functions ctx with the temporary one */
functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx, 0);
functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx, 1);

moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED,
NULL);
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3568,7 +3568,7 @@ long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callbac
void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount(void);
serverDb *initTempDb(void);
void discardTempDb(serverDb *tempDb, void(callback)(dict *));
void discardTempDb(serverDb *tempDb);


int selectDb(client *c, int id);
Expand Down

0 comments on commit 68ba9c7

Please sign in to comment.