From 653d5f7fe3d44adfb2a2e10c9110a3efacd3f0da Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 25 Nov 2024 09:59:37 +0800 Subject: [PATCH] Support empty callback on function and free temp function in async way (#1334) We have a replicationEmptyDbCallback, it is a callback used by emptyData while flushing away old data. Previously, we did not add this callback logic for function, in case of abuse, there may be a lot of functions, and also to make the code consistent, we add the same callback logic for function. Changes around this commit: 1. Extend emptyData / functionsLibCtxClear to support passing callback when flushing functions. 2. Added disklessLoad function create and discard helper function, just like disklessLoadInitTempDb and disklessLoadDiscardTempDb), we wll always flush the temp function in a async way to avoid any block. 3. Cleanup around discardTempDb, remove the callback pointer since in async way we don't need the callback. 4. Remove functionsLibCtxClear call in readSyncBulkPayload, because we called emptyData in the previous lines, which also empty functions. We are doing this callback in replication is because during the flush, replica may block a while if the flush is doing in the sync way, to avoid the primary to detect the replica is timing out, replica will use this callback to notify the primary (we also do this callback when loading a RDB). And in the async way, we empty the data in the bio and there is no slw operation, so it will ignores the callback. Signed-off-by: Binbin --- src/db.c | 10 ++++------ src/functions.c | 16 ++++++++-------- src/functions.h | 4 ++-- src/replication.c | 22 +++++++++++++++++----- src/server.h | 2 +- 5 files changed, 32 insertions(+), 22 deletions(-) diff --git a/src/db.c b/src/db.c index 5a57863de8..d3ef19027d 100644 --- a/src/db.c +++ b/src/db.c @@ -574,7 +574,7 @@ long long emptyData(int dbnum, int flags, void(callback)(dict *)) { if (with_functions) { serverAssert(dbnum == -1); - functionsLibCtxClearCurrent(async); + functionsLibCtxClearCurrent(async, callback); } /* Also fire the end event. Note that this event will fire almost @@ -602,12 +602,10 @@ serverDb *initTempDb(void) { return tempDb; } -/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */ -void discardTempDb(serverDb *tempDb, void(callback)(dict *)) { - int async = 1; - +/* Discard tempDb, it's always async. */ +void discardTempDb(serverDb *tempDb) { /* Release temp DBs. */ - emptyDbStructure(tempDb, -1, async, callback); + emptyDbStructure(tempDb, -1, 1, NULL); for (int i = 0; i < server.dbnum; i++) { kvstoreRelease(tempDb[i].keys); kvstoreRelease(tempDb[i].expires); diff --git a/src/functions.c b/src/functions.c index 916d8fd622..b694e35252 100644 --- a/src/functions.c +++ b/src/functions.c @@ -161,9 +161,9 @@ static void engineLibraryDispose(void *obj) { } /* Clear all the functions from the given library ctx */ -void functionsLibCtxClear(functionsLibCtx *lib_ctx) { - dictEmpty(lib_ctx->functions, NULL); - dictEmpty(lib_ctx->libraries, NULL); +void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *)) { + dictEmpty(lib_ctx->functions, callback); + dictEmpty(lib_ctx->libraries, callback); dictIterator *iter = dictGetIterator(lib_ctx->engines_stats); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { @@ -175,13 +175,13 @@ void functionsLibCtxClear(functionsLibCtx *lib_ctx) { lib_ctx->cache_memory = 0; } -void functionsLibCtxClearCurrent(int async) { +void functionsLibCtxClearCurrent(int async, void(callback)(dict *)) { if (async) { functionsLibCtx *old_l_ctx = curr_functions_lib_ctx; curr_functions_lib_ctx = functionsLibCtxCreate(); freeFunctionsAsync(old_l_ctx); } else { - functionsLibCtxClear(curr_functions_lib_ctx); + functionsLibCtxClear(curr_functions_lib_ctx, callback); } } @@ -196,7 +196,7 @@ static void functionsLibCtxFreeGeneric(functionsLibCtx *functions_lib_ctx, int a /* Free the given functions ctx */ void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) { - functionsLibCtxClear(functions_lib_ctx); + functionsLibCtxClear(functions_lib_ctx, NULL); dictRelease(functions_lib_ctx->functions); dictRelease(functions_lib_ctx->libraries); dictRelease(functions_lib_ctx->engines_stats); @@ -380,7 +380,7 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l dictReleaseIterator(iter); iter = NULL; - functionsLibCtxClear(functions_lib_ctx_src); + functionsLibCtxClear(functions_lib_ctx_src, NULL); if (old_libraries_list) { listRelease(old_libraries_list); old_libraries_list = NULL; @@ -820,7 +820,7 @@ void functionFlushCommand(client *c) { return; } - functionsLibCtxClearCurrent(async); + functionsLibCtxClearCurrent(async, NULL); /* Indicate that the command changed the data so it will be replicated and * counted as a data change (for persistence configuration) */ diff --git a/src/functions.h b/src/functions.h index 429405bb2d..b199fbd06e 100644 --- a/src/functions.h +++ b/src/functions.h @@ -133,9 +133,9 @@ dict *functionsLibGet(void); size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx); functionsLibCtx *functionsLibCtxGetCurrent(void); functionsLibCtx *functionsLibCtxCreate(void); -void functionsLibCtxClearCurrent(int async); +void functionsLibCtxClearCurrent(int async, void(callback)(dict *)); void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx); -void functionsLibCtxClear(functionsLibCtx *lib_ctx); +void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *)); void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async); int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err); diff --git a/src/replication.c b/src/replication.c index 1654847bd6..dcf7ee3f8c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1981,7 +1981,20 @@ serverDb *disklessLoadInitTempDb(void) { /* Helper function for readSyncBulkPayload() to discard our tempDb * when the loading succeeded or failed. */ void disklessLoadDiscardTempDb(serverDb *tempDb) { - discardTempDb(tempDb, replicationEmptyDbCallback); + discardTempDb(tempDb); +} + +/* Helper function for to initialize temp function lib context. + * The temp ctx may be populated by functionsLibCtxSwapWithCurrent or + * freed by disklessLoadDiscardFunctionsLibCtx later. */ +functionsLibCtx *disklessLoadFunctionsLibCtxCreate(void) { + return functionsLibCtxCreate(); +} + +/* Helper function to discard our temp function lib context + * when the loading succeeded or failed. */ +void disklessLoadDiscardFunctionsLibCtx(functionsLibCtx *temp_functions_lib_ctx) { + freeFunctionsAsync(temp_functions_lib_ctx); } /* If we know we got an entirely different data set from our primary @@ -2186,7 +2199,7 @@ void readSyncBulkPayload(connection *conn) { if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { /* Initialize empty tempDb dictionaries. */ diskless_load_tempDb = disklessLoadInitTempDb(); - temp_functions_lib_ctx = functionsLibCtxCreate(); + temp_functions_lib_ctx = disklessLoadFunctionsLibCtxCreate(); moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL); } @@ -2226,7 +2239,6 @@ void readSyncBulkPayload(connection *conn) { dbarray = server.db; functions_lib_ctx = functionsLibCtxGetCurrent(); - functionsLibCtxClear(functions_lib_ctx); } rioInitWithConn(&rdb, conn, server.repl_transfer_size); @@ -2264,7 +2276,7 @@ void readSyncBulkPayload(connection *conn) { NULL); disklessLoadDiscardTempDb(diskless_load_tempDb); - functionsLibCtxFree(temp_functions_lib_ctx); + disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx); serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background"); } else { /* Remove the half-loaded data in case we started with an empty replica. */ @@ -2289,7 +2301,7 @@ void readSyncBulkPayload(connection *conn) { swapMainDbWithTempDb(diskless_load_tempDb); /* swap existing functions ctx with the temporary one */ - functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx, 0); + functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx, 1); moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED, NULL); diff --git a/src/server.h b/src/server.h index f4c7306009..09b67b2670 100644 --- a/src/server.h +++ b/src/server.h @@ -3572,7 +3572,7 @@ long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callbac void flushAllDataAndResetRDB(int flags); long long dbTotalServerKeyCount(void); serverDb *initTempDb(void); -void discardTempDb(serverDb *tempDb, void(callback)(dict *)); +void discardTempDb(serverDb *tempDb); int selectDb(client *c, int id);