Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require C11 _Atomics #490

Merged
merged 8 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ void stopAppendOnly(void) {
server.aof_last_incr_size = 0;
server.aof_last_incr_fsync_offset = 0;
server.fsynced_reploff = -1;
atomicSet(server.fsynced_reploff_pending, 0);
atomic_store_explicit(&server.fsynced_reploff_pending, 0, memory_order_relaxed);
killAppendOnlyChild();
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
Expand Down Expand Up @@ -985,11 +985,11 @@ int startAppendOnly(void) {
}
server.aof_last_fsync = server.mstime;
/* If AOF fsync error in bio job, we just ignore it and log the event. */
int aof_bio_fsync_status;
atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status);
int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed);
if (aof_bio_fsync_status == C_ERR) {
serverLog(LL_WARNING, "AOF reopen, just ignore the AOF fsync error in bio job");
atomicSet(server.aof_bio_fsync_status, C_OK);
serverLog(LL_WARNING,
"AOF reopen, just ignore the AOF fsync error in bio job");
atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed);
}

/* If AOF was in error state, we just ignore it and log the event. */
Expand Down Expand Up @@ -1074,7 +1074,7 @@ void flushAppendOnlyFile(int force) {
* (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on
* the higher offset (which contains data that was only propagated to replicas, and not to AOF) */
if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO)
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
return;
}
}
Expand Down Expand Up @@ -1244,8 +1244,9 @@ void flushAppendOnlyFile(int force) {
latencyAddSampleIfNeeded("aof-fsync-always", latency);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
server.aof_last_fsync = server.mstime;
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.mstime - server.aof_last_fsync >= 1000) {
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.mstime - server.aof_last_fsync >= 1000) {
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
Expand Down Expand Up @@ -2409,7 +2410,7 @@ int rewriteAppendOnlyFileBackground(void) {

/* Set the initial repl_offset, which will be applied to fsynced_reploff
* when AOFRW finishes (after possibly being updated by a bio thread) */
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
server.fsynced_reploff = 0;
}

Expand Down Expand Up @@ -2647,8 +2648,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
/* Update the fsynced replication offset that just now become valid.
* This could either be the one we took in startAppendOnly, or a
* newer one set by the bio thread. */
long long fsynced_reploff_pending;
atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending);
long long fsynced_reploff_pending = atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed);
server.fsynced_reploff = fsynced_reploff_pending;
}

Expand Down
17 changes: 10 additions & 7 deletions src/bio.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

#include "server.h"
#include "bio.h"
#include <stdatomic.h>

static char *bio_worker_title[] = {
"bio_close_file",
Expand Down Expand Up @@ -256,17 +257,19 @@ void *bioProcessBackgroundJobs(void *arg) {
/* The fd may be closed by main thread and reused for another
* socket, pipe, or file. We just ignore these errno because
* aof fsync did not really fail. */
if (valkey_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) {
int last_status;
atomicGet(server.aof_bio_fsync_status, last_status);
atomicSet(server.aof_bio_fsync_status, C_ERR);
atomicSet(server.aof_bio_fsync_errno, errno);
if (valkey_fsync(job->fd_args.fd) == -1 &&
errno != EBADF && errno != EINVAL)
{
int last_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed);

atomic_store_explicit(&server.aof_bio_fsync_errno, errno, memory_order_relaxed);
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
atomic_store_explicit(&server.aof_bio_fsync_status, C_ERR, memory_order_release);
if (last_status == C_OK) {
serverLog(LL_WARNING, "Fail to fsync the AOF file: %s", strerror(errno));
}
} else {
atomicSet(server.aof_bio_fsync_status, C_OK);
atomicSet(server.fsynced_reploff_pending, job->fd_args.offset);
atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, job->fd_args.offset, memory_order_relaxed);
}

if (job->fd_args.need_reclaim_cache) {
Expand Down
1 change: 0 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

#include "server.h"
#include "cluster.h"
#include "atomicvar.h"
#include "latency.h"
#include "script.h"
#include "functions.h"
Expand Down
1 change: 0 additions & 1 deletion src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "script.h"
#include <math.h>

Expand Down
1 change: 0 additions & 1 deletion src/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "sds.h"
#include "dict.h"
#include "adlist.h"
#include "atomicvar.h"

#define LOAD_TIMEOUT_MS 500

Expand Down
73 changes: 37 additions & 36 deletions src/lazyfree.c
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "functions.h"
#include "cluster.h"

static serverAtomic size_t lazyfree_objects = 0;
static serverAtomic size_t lazyfreed_objects = 0;
#include <stdatomic.h>

static _Atomic size_t lazyfree_objects = 0;
static _Atomic size_t lazyfreed_objects = 0;

/* Release objects from the lazyfree thread. It's just decrRefCount()
* updating the count of objects to release. */
void lazyfreeFreeObject(void *args[]) {
robj *o = (robj *)args[0];
decrRefCount(o);
atomicDecr(lazyfree_objects, 1);
atomicIncr(lazyfreed_objects, 1);
atomic_fetch_sub_explicit(&lazyfree_objects,1,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,1,memory_order_relaxed);
}

/* Release a database from the lazyfree thread. The 'db' pointer is the
Expand All @@ -26,26 +27,26 @@ void lazyfreeFreeDatabase(void *args[]) {
size_t numkeys = kvstoreSize(da1);
kvstoreRelease(da1);
kvstoreRelease(da2);
atomicDecr(lazyfree_objects, numkeys);
atomicIncr(lazyfreed_objects, numkeys);
atomic_fetch_sub_explicit(&lazyfree_objects,numkeys,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,numkeys,memory_order_relaxed);
}

/* Release the key tracking table. */
void lazyFreeTrackingTable(void *args[]) {
rax *rt = args[0];
size_t len = rt->numele;
freeTrackingRadixTree(rt);
atomicDecr(lazyfree_objects, len);
atomicIncr(lazyfreed_objects, len);
atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed);
}

/* Release the error stats rax tree. */
void lazyFreeErrors(void *args[]) {
rax *errors = args[0];
size_t len = errors->numele;
raxFreeWithCallback(errors, zfree);
atomicDecr(lazyfree_objects, len);
atomicIncr(lazyfreed_objects, len);
atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed);
}

/* Release the lua_scripts dict. */
Expand All @@ -55,17 +56,17 @@ void lazyFreeLuaScripts(void *args[]) {
lua_State *lua = args[2];
long long len = dictSize(lua_scripts);
freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
atomicDecr(lazyfree_objects, len);
atomicIncr(lazyfreed_objects, len);
atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed);
}

/* Release the functions ctx. */
void lazyFreeFunctionsCtx(void *args[]) {
functionsLibCtx *functions_lib_ctx = args[0];
size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx);
functionsLibCtxFree(functions_lib_ctx);
atomicDecr(lazyfree_objects, len);
atomicIncr(lazyfreed_objects, len);
atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed);
}

/* Release replication backlog referencing memory. */
Expand All @@ -76,26 +77,24 @@ void lazyFreeReplicationBacklogRefMem(void *args[]) {
len += raxSize(index);
listRelease(blocks);
raxFree(index);
atomicDecr(lazyfree_objects, len);
atomicIncr(lazyfreed_objects, len);
atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed);
}

/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {
size_t aux;
atomicGet(lazyfree_objects, aux);
size_t aux = atomic_load_explicit(&lazyfree_objects,memory_order_relaxed);
return aux;
}

/* Return the number of objects that have been freed. */
size_t lazyfreeGetFreedObjectsCount(void) {
size_t aux;
atomicGet(lazyfreed_objects, aux);
size_t aux = atomic_load_explicit(&lazyfreed_objects,memory_order_relaxed);
return aux;
}

void lazyfreeResetStats(void) {
atomicSet(lazyfreed_objects, 0);
atomic_store_explicit(&lazyfreed_objects,0,memory_order_relaxed);
}

/* Return the amount of work needed in order to free an object.
Expand Down Expand Up @@ -175,8 +174,8 @@ void freeObjAsync(robj *key, robj *obj, int dbid) {
* of parts of the server core may call incrRefCount() to protect
* objects, and then call dbDelete(). */
if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
atomicIncr(lazyfree_objects, 1);
bioCreateLazyFreeJob(lazyfreeFreeObject, 1, obj);
atomic_fetch_add_explicit(&lazyfree_objects,1,memory_order_relaxed);
bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj);
} else {
decrRefCount(obj);
}
Expand All @@ -195,7 +194,7 @@ void emptyDbAsync(serverDb *db) {
kvstore *oldkeys = db->keys, *oldexpires = db->expires;
db->keys = kvstoreCreate(&dbDictType, slot_count_bits, flags);
db->expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags);
atomicIncr(lazyfree_objects, kvstoreSize(oldkeys));
atomic_fetch_add_explicit(&lazyfree_objects, kvstoreSize(oldkeys), memory_order_relaxed);
bioCreateLazyFreeJob(lazyfreeFreeDatabase, 2, oldkeys, oldexpires);
}

Expand All @@ -204,8 +203,8 @@ void emptyDbAsync(serverDb *db) {
void freeTrackingRadixTreeAsync(rax *tracking) {
/* Because this rax has only keys and no values so we use numnodes. */
if (tracking->numnodes > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects, tracking->numele);
bioCreateLazyFreeJob(lazyFreeTrackingTable, 1, tracking);
atomic_fetch_add_explicit(&lazyfree_objects,tracking->numele,memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking);
} else {
freeTrackingRadixTree(tracking);
}
Expand All @@ -216,8 +215,8 @@ void freeTrackingRadixTreeAsync(rax *tracking) {
void freeErrorsRadixTreeAsync(rax *errors) {
/* Because this rax has only keys and no values so we use numnodes. */
if (errors->numnodes > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects, errors->numele);
bioCreateLazyFreeJob(lazyFreeErrors, 1, errors);
atomic_fetch_add_explicit(&lazyfree_objects,errors->numele,memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeErrors,1,errors);
} else {
raxFreeWithCallback(errors, zfree);
}
Expand All @@ -227,8 +226,8 @@ void freeErrorsRadixTreeAsync(rax *errors) {
* Close lua interpreter, if there are a lot of lua scripts, close it in async way. */
void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua) {
if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects, dictSize(lua_scripts));
bioCreateLazyFreeJob(lazyFreeLuaScripts, 3, lua_scripts, lua_scripts_lru_list, lua);
atomic_fetch_add_explicit(&lazyfree_objects,dictSize(lua_scripts),memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeLuaScripts,3,lua_scripts,lua_scripts_lru_list,lua);
} else {
freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
}
Expand All @@ -237,18 +236,20 @@ void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_Stat
/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) {
if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects, functionsLibCtxFunctionsLen(functions_lib_ctx));
bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 1, functions_lib_ctx);
atomic_fetch_add_explicit(&lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx),memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,functions_lib_ctx);
} else {
functionsLibCtxFree(functions_lib_ctx);
}
}

/* Free replication backlog referencing buffer blocks and rax index. */
void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
if (listLength(blocks) > LAZYFREE_THRESHOLD || raxSize(index) > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects, listLength(blocks) + raxSize(index));
bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem, 2, blocks, index);
if (listLength(blocks) > LAZYFREE_THRESHOLD ||
raxSize(index) > LAZYFREE_THRESHOLD)
{
atomic_fetch_add_explicit(&lazyfree_objects,listLength(blocks)+raxSize(index),memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index);
} else {
listRelease(blocks);
raxFree(index);
Expand Down
5 changes: 2 additions & 3 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -2413,8 +2413,7 @@ void VM_Yield(ValkeyModuleCtx *ctx, int flags, const char *busy_reply) {
* after the main thread enters acquiring GIL state in order to protect the event
* loop (ae.c) and avoid potential race conditions. */

int acquiring;
atomicGet(server.module_gil_acquring, acquiring);
int acquiring = atomic_load_explicit(&server.module_gil_acquiring, memory_order_relaxed);
if (!acquiring) {
/* If the main thread has not yet entered the acquiring GIL state,
* we attempt to wake it up and exit without waiting for it to
Expand Down Expand Up @@ -11823,7 +11822,7 @@ void moduleInitModulesSystem(void) {
moduleUnblockedClients = listCreate();
server.loadmodule_queue = listCreate();
server.module_configs_queue = dictCreate(&sdsKeyValueHashDictType);
server.module_gil_acquring = 0;
server.module_gil_acquiring = 0;
modules = dictCreate(&modulesDictType);
moduleAuthCallbacks = listCreate();

Expand Down
Loading
Loading