From 883573dd6a1dbd6348067cfd21484d43776ccefe Mon Sep 17 00:00:00 2001 From: adetunjii Date: Sun, 12 May 2024 01:48:07 +0100 Subject: [PATCH 1/5] Introduce C11 _Atomics Signed-off-by: adetunjii --- src/lazyfree.c | 58 +++++++++++++++++++++++----------------------- src/module.c | 4 ++-- src/rdb.c | 3 ++- src/replication.c | 13 ++++++----- src/server.c | 59 +++++++++++++++++++++++------------------------ src/server.h | 27 +++++++++++----------- src/zmalloc.c | 10 ++++---- 7 files changed, 88 insertions(+), 86 deletions(-) diff --git a/src/lazyfree.c b/src/lazyfree.c index 22fa2dc863..6dd65a9c31 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -4,16 +4,18 @@ #include "functions.h" #include "cluster.h" -static serverAtomic size_t lazyfree_objects = 0; -static serverAtomic size_t lazyfreed_objects = 0; +#include + +static _Atomic size_t lazyfree_objects = 0; +static _Atomic size_t lazyfreed_objects = 0; /* Release objects from the lazyfree thread. It's just decrRefCount() * updating the count of objects to release. */ void lazyfreeFreeObject(void *args[]) { robj *o = (robj *) args[0]; decrRefCount(o); - atomicDecr(lazyfree_objects,1); - atomicIncr(lazyfreed_objects,1); + atomic_fetch_sub_explicit(&lazyfree_objects,1,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,1,memory_order_relaxed); } /* Release a database from the lazyfree thread. The 'db' pointer is the @@ -26,8 +28,8 @@ void lazyfreeFreeDatabase(void *args[]) { size_t numkeys = kvstoreSize(da1); kvstoreRelease(da1); kvstoreRelease(da2); - atomicDecr(lazyfree_objects,numkeys); - atomicIncr(lazyfreed_objects,numkeys); + atomic_fetch_sub_explicit(&lazyfree_objects,numkeys,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,numkeys,memory_order_relaxed); } /* Release the key tracking table. */ @@ -35,8 +37,8 @@ void lazyFreeTrackingTable(void *args[]) { rax *rt = args[0]; size_t len = rt->numele; freeTrackingRadixTree(rt); - atomicDecr(lazyfree_objects,len); - atomicIncr(lazyfreed_objects,len); + atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); } /* Release the error stats rax tree. */ @@ -44,8 +46,8 @@ void lazyFreeErrors(void *args[]) { rax *errors = args[0]; size_t len = errors->numele; raxFreeWithCallback(errors, zfree); - atomicDecr(lazyfree_objects,len); - atomicIncr(lazyfreed_objects,len); + atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); } /* Release the lua_scripts dict. */ @@ -55,8 +57,8 @@ void lazyFreeLuaScripts(void *args[]) { lua_State *lua = args[2]; long long len = dictSize(lua_scripts); freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua); - atomicDecr(lazyfree_objects,len); - atomicIncr(lazyfreed_objects,len); + atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); } /* Release the functions ctx. */ @@ -64,8 +66,8 @@ void lazyFreeFunctionsCtx(void *args[]) { functionsLibCtx *functions_lib_ctx = args[0]; size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx); functionsLibCtxFree(functions_lib_ctx); - atomicDecr(lazyfree_objects,len); - atomicIncr(lazyfreed_objects,len); + atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); } /* Release replication backlog referencing memory. 
*/ @@ -76,26 +78,24 @@ void lazyFreeReplicationBacklogRefMem(void *args[]) { len += raxSize(index); listRelease(blocks); raxFree(index); - atomicDecr(lazyfree_objects,len); - atomicIncr(lazyfreed_objects,len); + atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); } /* Return the number of currently pending objects to free. */ size_t lazyfreeGetPendingObjectsCount(void) { - size_t aux; - atomicGet(lazyfree_objects,aux); + size_t aux = atomic_load_explicit(&lazyfree_objects,memory_order_relaxed); return aux; } /* Return the number of objects that have been freed. */ size_t lazyfreeGetFreedObjectsCount(void) { - size_t aux; - atomicGet(lazyfreed_objects,aux); + size_t aux = atomic_load_explicit(&lazyfreed_objects,memory_order_relaxed); return aux; } void lazyfreeResetStats(void) { - atomicSet(lazyfreed_objects,0); + atomic_store_explicit(&lazyfreed_objects,0,memory_order_relaxed); } /* Return the amount of work needed in order to free an object. @@ -175,7 +175,7 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { * of parts of the server core may call incrRefCount() to protect * objects, and then call dbDelete(). */ if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) { - atomicIncr(lazyfree_objects,1); + atomic_fetch_add_explicit(&lazyfree_objects,1,memory_order_relaxed); bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj); } else { decrRefCount(obj); @@ -195,7 +195,7 @@ void emptyDbAsync(serverDb *db) { kvstore *oldkeys = db->keys, *oldexpires = db->expires; db->keys = kvstoreCreate(&dbDictType, slot_count_bits, flags); db->expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags); - atomicIncr(lazyfree_objects, kvstoreSize(oldkeys)); + atomic_fetch_add_explicit(&lazyfree_objects, kvstoreSize(oldkeys), memory_order_relaxed); bioCreateLazyFreeJob(lazyfreeFreeDatabase, 2, oldkeys, oldexpires); } @@ -204,7 +204,7 @@ void emptyDbAsync(serverDb *db) { void freeTrackingRadixTreeAsync(rax *tracking) { /* Because this rax has only keys and no values so we use numnodes. */ if (tracking->numnodes > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects,tracking->numele); + atomic_fetch_add_explicit(&lazyfree_objects,tracking->numele,memory_order_relaxed); bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking); } else { freeTrackingRadixTree(tracking); @@ -216,7 +216,7 @@ void freeTrackingRadixTreeAsync(rax *tracking) { void freeErrorsRadixTreeAsync(rax *errors) { /* Because this rax has only keys and no values so we use numnodes. */ if (errors->numnodes > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects,errors->numele); + atomic_fetch_add_explicit(&lazyfree_objects,errors->numele,memory_order_relaxed); bioCreateLazyFreeJob(lazyFreeErrors,1,errors); } else { raxFreeWithCallback(errors, zfree); @@ -227,7 +227,7 @@ void freeErrorsRadixTreeAsync(rax *errors) { * Close lua interpreter, if there are a lot of lua scripts, close it in async way. 
*/ void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua) { if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects,dictSize(lua_scripts)); + atomic_fetch_add_explicit(&lazyfree_objects,dictSize(lua_scripts),memory_order_relaxed); bioCreateLazyFreeJob(lazyFreeLuaScripts,3,lua_scripts,lua_scripts_lru_list,lua); } else { freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua); @@ -237,7 +237,7 @@ void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_Stat /* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */ void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) { if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx)); + atomic_fetch_add_explicit(&lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx),memory_order_relaxed); bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,functions_lib_ctx); } else { functionsLibCtxFree(functions_lib_ctx); @@ -248,8 +248,8 @@ void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) { void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) { if (listLength(blocks) > LAZYFREE_THRESHOLD || raxSize(index) > LAZYFREE_THRESHOLD) - { - atomicIncr(lazyfree_objects,listLength(blocks)+raxSize(index)); + { + atomic_fetch_add_explicit(&lazyfree_objects,listLength(blocks)+raxSize(index),memory_order_relaxed); bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index); } else { listRelease(blocks); diff --git a/src/module.c b/src/module.c index 01abd7adfe..c847971182 100644 --- a/src/module.c +++ b/src/module.c @@ -2434,7 +2434,7 @@ void VM_Yield(ValkeyModuleCtx *ctx, int flags, const char *busy_reply) { * loop (ae.c) and avoid potential race conditions. 
*/ int acquiring; - atomicGet(server.module_gil_acquring, acquiring); + atomicGet(server.module_gil_acquiring, acquiring); if (!acquiring) { /* If the main thread has not yet entered the acquiring GIL state, * we attempt to wake it up and exit without waiting for it to @@ -11994,7 +11994,7 @@ void moduleInitModulesSystem(void) { moduleUnblockedClients = listCreate(); server.loadmodule_queue = listCreate(); server.module_configs_queue = dictCreate(&sdsKeyValueHashDictType); - server.module_gil_acquring = 0; + server.module_gil_acquiring = 0; modules = dictCreate(&modulesDictType); moduleAuthCallbacks = listCreate(); diff --git a/src/rdb.c b/src/rdb.c index 0995907e62..145f372fb7 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -39,6 +39,7 @@ #include #include +#include #include #include #include @@ -2971,7 +2972,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { processModuleLoadingProgressEvent(0); } if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) { - atomicIncr(server.stat_net_repl_input_bytes, len); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,len,memory_order_relaxed); } } diff --git a/src/replication.c b/src/replication.c index f53fdc9160..397f41041c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -1424,7 +1425,7 @@ void sendBulkToSlave(connection *conn) { freeClient(slave); return; } - atomicIncr(server.stat_net_repl_output_bytes, nwritten); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); sdsrange(slave->replpreamble,nwritten,-1); if (sdslen(slave->replpreamble) == 0) { sdsfree(slave->replpreamble); @@ -1453,7 +1454,7 @@ void sendBulkToSlave(connection *conn) { return; } slave->repldboff += nwritten; - atomicIncr(server.stat_net_repl_output_bytes, nwritten); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); if (slave->repldboff == slave->repldbsize) { closeRepldbfd(slave); connSetWriteHandler(slave->conn,NULL); @@ -1499,7 +1500,7 @@ void rdbPipeWriteHandler(struct connection *conn) { return; } else { slave->repldboff += nwritten; - atomicIncr(server.stat_net_repl_output_bytes, nwritten); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); if (slave->repldboff < server.rdb_pipe_bufflen) { slave->repl_last_partial_write = server.unixtime; return; /* more data to write.. */ @@ -1579,7 +1580,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, /* Note: when use diskless replication, 'repldboff' is the offset * of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */ slave->repldboff = nwritten; - atomicIncr(server.stat_net_repl_output_bytes, nwritten); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); } /* If we were unable to write all the data to one of the replicas, * setup write handler (and disable pipe read handler, below) */ @@ -1897,7 +1898,7 @@ void readSyncBulkPayload(connection *conn) { } else { /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and * convert "\r\n" to '\0' so 1 byte is lost. 
*/ - atomicIncr(server.stat_net_repl_input_bytes, nread+1); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread+1,memory_order_relaxed); } if (buf[0] == '-') { @@ -1968,7 +1969,7 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(1); return; } - atomicIncr(server.stat_net_repl_input_bytes, nread); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread,memory_order_relaxed); /* When a mark is used, we want to detect EOF asap in order to avoid * writing the EOF mark into the file... */ diff --git a/src/server.c b/src/server.c index 6dc99eee3a..b968edac36 100644 --- a/src/server.c +++ b/src/server.c @@ -1111,7 +1111,7 @@ static inline void updateCachedTimeWithUs(int update_daylight_info, const long l server.ustime = ustime; server.mstime = server.ustime / 1000; time_t unixtime = server.mstime / 1000; - atomicSet(server.unixtime, unixtime); + atomic_store_explicit(&server.unixtime, unixtime, memory_order_relaxed); /* To get information about daylight saving time, we need to call * localtime_r and cache the result. However calling localtime_r in this @@ -1302,10 +1302,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { run_with_period(100) { long long stat_net_input_bytes, stat_net_output_bytes; long long stat_net_repl_input_bytes, stat_net_repl_output_bytes; - atomicGet(server.stat_net_input_bytes, stat_net_input_bytes); - atomicGet(server.stat_net_output_bytes, stat_net_output_bytes); - atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes); - atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes); + + stat_net_input_bytes = atomic_load_explicit(&server.stat_net_input_bytes,memory_order_relaxed); + stat_net_output_bytes = atomic_load_explicit(&server.stat_net_output_bytes,memory_order_relaxed); + stat_net_repl_input_bytes = atomic_load_explicit(&server.stat_net_repl_input_bytes,memory_order_relaxed); + stat_net_repl_output_bytes = atomic_load_explicit(&server.stat_net_repl_output_bytes,memory_order_relaxed); + monotime current_time = getMonotonicUs(); long long factor = 1000000; // us trackInstantaneousMetric(STATS_METRIC_COMMAND, server.stat_numcommands, current_time, factor); @@ -1742,8 +1744,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * If an initial rewrite is in progress then not all data is guaranteed to have actually been * persisted to disk yet, so we cannot update the field. We will wait for the rewrite to complete. 
*/ if (server.aof_state == AOF_ON && server.fsynced_reploff != -1) { - long long fsynced_reploff_pending; - atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending); + long long fsynced_reploff_pending = atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed); server.fsynced_reploff = fsynced_reploff_pending; /* If we have blocked [WAIT]AOF clients, and fsynced_reploff changed, we want to try to @@ -1814,10 +1815,9 @@ void afterSleep(struct aeEventLoop *eventLoop) { if (moduleCount()) { mstime_t latency; latencyStartMonitor(latency); - - atomicSet(server.module_gil_acquring, 1); + atomic_store_explicit(&server.module_gil_acquiring,1,memory_order_relaxed); moduleAcquireGIL(); - atomicSet(server.module_gil_acquring, 0); + atomic_store_explicit(&server.module_gil_acquiring,0,memory_order_relaxed); moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP, NULL); @@ -2084,7 +2084,7 @@ void initServerConfig(void) { server.aof_flush_sleep = 0; server.aof_last_fsync = time(NULL) * 1000; server.aof_cur_timestamp = 0; - atomicSet(server.aof_bio_fsync_status,C_OK); + atomic_store_explicit(&server.aof_bio_fsync_status,C_OK,memory_order_relaxed); server.aof_rewrite_time_last = -1; server.aof_rewrite_time_start = -1; server.aof_lastbgrewrite_status = C_OK; @@ -2568,10 +2568,10 @@ void resetServerStats(void) { server.stat_sync_partial_ok = 0; server.stat_sync_partial_err = 0; server.stat_io_reads_processed = 0; - atomicSet(server.stat_total_reads_processed, 0); + atomic_store_explicit(&server.stat_total_reads_processed,0,memory_order_relaxed); server.stat_io_writes_processed = 0; - atomicSet(server.stat_total_writes_processed, 0); - atomicSet(server.stat_client_qbuf_limit_disconnections, 0); + atomic_store_explicit(&server.stat_total_writes_processed,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_client_qbuf_limit_disconnections,0,memory_order_relaxed); server.stat_client_outbuf_limit_disconnections = 0; for (j = 0; j < STATS_METRIC_COUNT; j++) { server.inst_metric[j].idx = 0; @@ -2583,10 +2583,10 @@ void resetServerStats(void) { server.stat_aof_rewrites = 0; server.stat_rdb_saves = 0; server.stat_aofrw_consecutive_failures = 0; - atomicSet(server.stat_net_input_bytes, 0); - atomicSet(server.stat_net_output_bytes, 0); - atomicSet(server.stat_net_repl_input_bytes, 0); - atomicSet(server.stat_net_repl_output_bytes, 0); + atomic_store_explicit(&server.stat_net_input_bytes,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_net_output_bytes,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_net_repl_input_bytes,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_net_repl_output_bytes,0,memory_order_relaxed); server.stat_unexpected_error_replies = 0; server.stat_total_error_replies = 0; server.stat_dump_payload_sanitizations = 0; @@ -4589,10 +4589,9 @@ int writeCommandsDeniedByDiskError(void) { return DISK_ERROR_TYPE_AOF; } /* AOF fsync error. 
*/ - int aof_bio_fsync_status; - atomicGet(server.aof_bio_fsync_status,aof_bio_fsync_status); + int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status,memory_order_relaxed); if (aof_bio_fsync_status == C_ERR) { - atomicGet(server.aof_bio_fsync_errno,server.aof_last_write_errno); + server.aof_last_write_errno = atomic_load_explicit(&server.aof_bio_fsync_errno,memory_order_relaxed); return DISK_ERROR_TYPE_AOF; } } @@ -5789,8 +5788,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { } else if (server.stat_current_save_keys_total) { fork_perc = ((double)server.stat_current_save_keys_processed / server.stat_current_save_keys_total) * 100; } - int aof_bio_fsync_status; - atomicGet(server.aof_bio_fsync_status,aof_bio_fsync_status); + int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status,memory_order_relaxed); /* clang-format off */ info = sdscatprintf(info, "# Persistence\r\n" FMTARGS( @@ -5889,13 +5887,14 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { long long current_active_defrag_time = server.stat_last_active_defrag_time ? (long long) elapsedUs(server.stat_last_active_defrag_time): 0; long long stat_client_qbuf_limit_disconnections; - atomicGet(server.stat_total_reads_processed, stat_total_reads_processed); - atomicGet(server.stat_total_writes_processed, stat_total_writes_processed); - atomicGet(server.stat_net_input_bytes, stat_net_input_bytes); - atomicGet(server.stat_net_output_bytes, stat_net_output_bytes); - atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes); - atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes); - atomicGet(server.stat_client_qbuf_limit_disconnections, stat_client_qbuf_limit_disconnections); + + stat_total_reads_processed = atomic_load_explicit(&server.stat_total_reads_processed, memory_order_relaxed); + stat_total_writes_processed = atomic_load_explicit(&server.stat_total_writes_processed, memory_order_relaxed); + stat_net_input_bytes = atomic_load_explicit(&server.stat_net_input_bytes, memory_order_relaxed); + stat_net_output_bytes = atomic_load_explicit(&server.stat_net_output_bytes, memory_order_relaxed); + stat_net_repl_input_bytes = atomic_load_explicit(&server.stat_net_repl_input_bytes, memory_order_relaxed); + stat_net_repl_output_bytes = atomic_load_explicit(&server.stat_net_repl_output_bytes, memory_order_relaxed); + stat_client_qbuf_limit_disconnections = atomic_load_explicit(&server.stat_client_qbuf_limit_disconnections, memory_order_relaxed); if (sections++) info = sdscat(info,"\r\n"); /* clang-format off */ diff --git a/src/server.h b/src/server.h index 23bcda81d7..af6797ccfc 100644 --- a/src/server.h +++ b/src/server.h @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -1599,7 +1600,7 @@ struct valkeyServer { int module_pipe[2]; /* Pipe used to awake the event loop by module threads. */ pid_t child_pid; /* PID of current child */ int child_type; /* Type of current child */ - serverAtomic int module_gil_acquring; /* Indicates whether the GIL is being acquiring by the main thread. */ + _Atomic int module_gil_acquiring; /* Indicates whether the GIL is being acquiring by the main thread. 
*/ /* Networking */ int port; /* TCP listening port */ int tls_port; /* TLS listening port */ @@ -1638,7 +1639,7 @@ struct valkeyServer { pause_event client_pause_per_purpose[NUM_PAUSE_PURPOSES]; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ - serverAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */ + _Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. */ int protected_mode; /* Don't accept external connections. */ int io_threads_num; /* Number of IO threads to use. */ int io_threads_do_reads; /* Read and parse from IO threads? */ @@ -1694,10 +1695,10 @@ struct valkeyServer { long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ - serverAtomic long long stat_net_input_bytes; /* Bytes read from network. */ - serverAtomic long long stat_net_output_bytes; /* Bytes written to network. */ - serverAtomic long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */ - serverAtomic long long stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */ + _Atomic long long stat_net_input_bytes; /* Bytes read from network. */ + _Atomic long long stat_net_output_bytes; /* Bytes written to network. */ + _Atomic long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */ + _Atomic long long stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */ size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */ size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */ monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */ @@ -1714,9 +1715,9 @@ struct valkeyServer { long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */ long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */ - serverAtomic long long stat_total_reads_processed; /* Total number of read events processed */ - serverAtomic long long stat_total_writes_processed; /* Total number of write events processed */ - serverAtomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ + _Atomic long long stat_total_reads_processed; /* Total number of read events processed */ + _Atomic long long stat_total_writes_processed; /* Total number of write events processed */ + _Atomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ @@ -1806,8 +1807,8 @@ struct valkeyServer { int aof_last_write_errno; /* Valid if aof write/fsync status is ERR */ int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ int aof_use_rdb_preamble; /* Specify base AOF to use RDB encoding on AOF rewrites. */ - serverAtomic int aof_bio_fsync_status; /* Status of AOF fsync in bio job. 
*/ - serverAtomic int aof_bio_fsync_errno; /* Errno of AOF fsync in bio job. */ + _Atomic int aof_bio_fsync_status; /* Status of AOF fsync in bio job. */ + _Atomic int aof_bio_fsync_errno; /* Errno of AOF fsync in bio job. */ aofManifest *aof_manifest; /* Used to track AOFs. */ int aof_disable_auto_gc; /* If disable automatically deleting HISTORY type AOFs? default no. (for testings). */ @@ -1872,7 +1873,7 @@ struct valkeyServer { char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ long long master_repl_offset; /* My current replication offset */ long long second_replid_offset; /* Accept offsets up to this for replid2. */ - serverAtomic long long fsynced_reploff_pending;/* Largest replication offset to + _Atomic long long fsynced_reploff_pending;/* Largest replication offset to * potentially have been fsynced, applied to fsynced_reploff only when AOF state is AOF_ON (not during the initial rewrite) */ @@ -1980,7 +1981,7 @@ struct valkeyServer { int list_max_listpack_size; int list_compress_depth; /* time cache */ - serverAtomic time_t unixtime; /* Unix time sampled every cron cycle. */ + _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */ time_t timezone; /* Cached timezone. As set by tzset(). */ int daylight_active; /* Currently in daylight saving time. */ mstime_t mstime; /* 'unixtime' in milliseconds. */ diff --git a/src/zmalloc.c b/src/zmalloc.c index 550752240f..ffb3b6d16a 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -52,6 +52,7 @@ void zlibc_free(void *ptr) { #include #include "zmalloc.h" #include "atomicvar.h" +#include #define UNUSED(x) ((void)(x)) @@ -87,10 +88,10 @@ void zlibc_free(void *ptr) { #define dallocx(ptr,flags) je_dallocx(ptr,flags) #endif -#define update_zmalloc_stat_alloc(__n) atomicIncr(used_memory,(__n)) -#define update_zmalloc_stat_free(__n) atomicDecr(used_memory,(__n)) +#define update_zmalloc_stat_alloc(__n) atomic_fetch_add_explicit(&used_memory, (__n), memory_order_relaxed) +#define update_zmalloc_stat_free(__n) atomic_fetch_sub_explicit(&used_memory, (__n), memory_order_relaxed) -static serverAtomic size_t used_memory = 0; +static _Atomic size_t used_memory = 0; static void zmalloc_default_oom(size_t size) { fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n", @@ -409,8 +410,7 @@ char *zstrdup(const char *s) { } size_t zmalloc_used_memory(void) { - size_t um; - atomicGet(used_memory,um); + size_t um = atomic_load_explicit(&used_memory,memory_order_relaxed); return um; } From 32dc844eb2ea06eea96d29e7f381d5ce6476ab12 Mon Sep 17 00:00:00 2001 From: teej4y Date: Tue, 14 May 2024 02:33:05 +0100 Subject: [PATCH 2/5] replace atomicvar header file Signed-off-by: adetunjii --- src/aof.c | 16 +++++----- src/bio.c | 12 ++++---- src/db.c | 1 - src/evict.c | 1 - src/functions.c | 1 - src/lazyfree.c | 1 - src/module.c | 3 +- src/networking.c | 26 ++++++++--------- src/server.c | 1 - src/server.h | 1 - src/threads_mngr.c | 18 +++++------- src/valkey-benchmark.c | 66 ++++++++++++++++++++---------------------- 12 files changed, 65 insertions(+), 82 deletions(-) diff --git a/src/aof.c b/src/aof.c index 2f2255deb2..7ce07084d7 100644 --- a/src/aof.c +++ b/src/aof.c @@ -965,7 +965,7 @@ void stopAppendOnly(void) { server.aof_last_incr_size = 0; server.aof_last_incr_fsync_offset = 0; server.fsynced_reploff = -1; - atomicSet(server.fsynced_reploff_pending, 0); + atomic_store_explicit(&server.fsynced_reploff_pending, 0, memory_order_relaxed); killAppendOnlyChild(); sdsfree(server.aof_buf); server.aof_buf = sdsempty(); @@ 
-1000,12 +1000,11 @@ int startAppendOnly(void) { } server.aof_last_fsync = server.mstime; /* If AOF fsync error in bio job, we just ignore it and log the event. */ - int aof_bio_fsync_status; - atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status); + int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); if (aof_bio_fsync_status == C_ERR) { serverLog(LL_WARNING, "AOF reopen, just ignore the AOF fsync error in bio job"); - atomicSet(server.aof_bio_fsync_status,C_OK); + atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed); } /* If AOF was in error state, we just ignore it and log the event. */ @@ -1093,7 +1092,7 @@ void flushAppendOnlyFile(int force) { * (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on * the higher offset (which contains data that was only propagated to replicas, and not to AOF) */ if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO) - atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); + atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed); return; } } @@ -1261,7 +1260,7 @@ void flushAppendOnlyFile(int force) { latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_last_incr_fsync_offset = server.aof_last_incr_size; server.aof_last_fsync = server.mstime; - atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); + atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed); } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.mstime - server.aof_last_fsync >= 1000) { if (!sync_in_progress) { @@ -2463,7 +2462,7 @@ int rewriteAppendOnlyFileBackground(void) { /* Set the initial repl_offset, which will be applied to fsynced_reploff * when AOFRW finishes (after possibly being updated by a bio thread) */ - atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); + atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed); server.fsynced_reploff = 0; } @@ -2715,8 +2714,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { /* Update the fsynced replication offset that just now become valid. * This could either be the one we took in startAppendOnly, or a * newer one set by the bio thread. 
*/ - long long fsynced_reploff_pending; - atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending); + long long fsynced_reploff_pending = atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed); server.fsynced_reploff = fsynced_reploff_pending; } diff --git a/src/bio.c b/src/bio.c index e0da640258..46f6b893f6 100644 --- a/src/bio.c +++ b/src/bio.c @@ -263,17 +263,17 @@ void *bioProcessBackgroundJobs(void *arg) { if (valkey_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) { - int last_status; - atomicGet(server.aof_bio_fsync_status,last_status); - atomicSet(server.aof_bio_fsync_status,C_ERR); - atomicSet(server.aof_bio_fsync_errno,errno); + int last_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); + + atomic_store_explicit(&server.aof_bio_fsync_status, C_ERR, memory_order_relaxed); + atomic_store_explicit(&server.aof_bio_fsync_errno, errno, memory_order_relaxed); if (last_status == C_OK) { serverLog(LL_WARNING, "Fail to fsync the AOF file: %s",strerror(errno)); } } else { - atomicSet(server.aof_bio_fsync_status,C_OK); - atomicSet(server.fsynced_reploff_pending, job->fd_args.offset); + atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed); + atomic_store_explicit(&server.fsynced_reploff_pending, job->fd_args.offset, memory_order_relaxed); } if (job->fd_args.need_reclaim_cache) { diff --git a/src/db.c b/src/db.c index f5aebf896a..d3f3d0fadd 100644 --- a/src/db.c +++ b/src/db.c @@ -29,7 +29,6 @@ #include "server.h" #include "cluster.h" -#include "atomicvar.h" #include "latency.h" #include "script.h" #include "functions.h" diff --git a/src/evict.c b/src/evict.c index fcac92dfc8..36cf6f852f 100644 --- a/src/evict.c +++ b/src/evict.c @@ -32,7 +32,6 @@ #include "server.h" #include "bio.h" -#include "atomicvar.h" #include "script.h" #include diff --git a/src/functions.c b/src/functions.c index a99434b51b..dd8f76744e 100644 --- a/src/functions.c +++ b/src/functions.c @@ -31,7 +31,6 @@ #include "sds.h" #include "dict.h" #include "adlist.h" -#include "atomicvar.h" #define LOAD_TIMEOUT_MS 500 diff --git a/src/lazyfree.c b/src/lazyfree.c index 6dd65a9c31..a1abcb836f 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -1,6 +1,5 @@ #include "server.h" #include "bio.h" -#include "atomicvar.h" #include "functions.h" #include "cluster.h" diff --git a/src/module.c b/src/module.c index c847971182..e89bf6e9e3 100644 --- a/src/module.c +++ b/src/module.c @@ -2433,8 +2433,7 @@ void VM_Yield(ValkeyModuleCtx *ctx, int flags, const char *busy_reply) { * after the main thread enters acquiring GIL state in order to protect the event * loop (ae.c) and avoid potential race conditions. 
*/ - int acquiring; - atomicGet(server.module_gil_acquiring, acquiring); + int acquiring = atomic_load_explicit(&server.module_gil_acquiring, memory_order_relaxed); if (!acquiring) { /* If the main thread has not yet entered the acquiring GIL state, * we attempt to wake it up and exit without waiting for it to diff --git a/src/networking.c b/src/networking.c index 5aa02e8315..dc36f7d9f1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -28,7 +28,6 @@ */ #include "server.h" -#include "atomicvar.h" #include "cluster.h" #include "script.h" #include "fpconv_dtoa.h" @@ -37,6 +36,7 @@ #include #include #include +#include static void setProtocolError(const char *errstr, client *c); static void pauseClientsByClient(mstime_t end, int isPauseClientAll); @@ -132,8 +132,7 @@ client *createClient(connection *conn) { } c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size); selectDb(c,0); - uint64_t client_id; - atomicGetIncr(server.next_client_id, client_id, 1); + uint64_t client_id = atomic_fetch_add_explicit(&server.next_client_id,1,memory_order_relaxed); c->id = client_id; #ifdef LOG_REQ_RES reqresReset(c, 0); @@ -1963,7 +1962,7 @@ int _writeToClient(client *c, ssize_t *nwritten) { * thread safe. */ int writeToClient(client *c, int handler_installed) { /* Update total number of writes on server */ - atomicIncr(server.stat_total_writes_processed, 1); + atomic_fetch_add_explicit(&server.stat_total_writes_processed,1, memory_order_relaxed); ssize_t nwritten = 0, totwritten = 0; @@ -1990,9 +1989,9 @@ int writeToClient(client *c, int handler_installed) { } if (getClientType(c) == CLIENT_TYPE_SLAVE) { - atomicIncr(server.stat_net_repl_output_bytes, totwritten); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, totwritten, memory_order_relaxed); } else { - atomicIncr(server.stat_net_output_bytes, totwritten); + atomic_fetch_add_explicit(&server.stat_net_output_bytes, totwritten, memory_order_relaxed); } c->net_output_bytes += totwritten; @@ -2649,7 +2648,7 @@ void readQueryFromClient(connection *conn) { if (postponeClientRead(c)) return; /* Update total number of reads on server */ - atomicIncr(server.stat_total_reads_processed, 1); + atomic_fetch_add_explicit(&server.stat_total_reads_processed,1,memory_order_relaxed); readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply @@ -2718,9 +2717,9 @@ void readQueryFromClient(connection *conn) { c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) { c->read_reploff += nread; - atomicIncr(server.stat_net_repl_input_bytes, nread); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread,memory_order_relaxed); } else { - atomicIncr(server.stat_net_input_bytes, nread); + atomic_fetch_add_explicit(&server.stat_net_input_bytes,nread,memory_order_relaxed); } c->net_input_bytes += nread; @@ -2738,7 +2737,7 @@ void readQueryFromClient(connection *conn) { sdsfree(ci); sdsfree(bytes); freeClientAsync(c); - atomicIncr(server.stat_client_qbuf_limit_disconnections, 1); + atomic_fetch_add_explicit(&server.stat_client_qbuf_limit_disconnections,1,memory_order_relaxed); goto done; } @@ -4210,7 +4209,7 @@ void processEventsWhileBlocked(void) { #endif typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) threads_pending { - serverAtomic unsigned long value; + _Atomic unsigned long value; } threads_pending; pthread_t io_threads[IO_THREADS_MAX_NUM]; @@ -4224,13 +4223,12 @@ int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_ list 
*io_threads_list[IO_THREADS_MAX_NUM]; static inline unsigned long getIOPendingCount(int i) { - unsigned long count = 0; - atomicGetWithSync(io_threads_pending[i].value, count); + unsigned long count = atomic_load(&io_threads_pending[i].value); return count; } static inline void setIOPendingCount(int i, unsigned long count) { - atomicSetWithSync(io_threads_pending[i].value, count); + atomic_store(&io_threads_pending[i].value, count); } void *IOThreadMain(void *myid) { diff --git a/src/server.c b/src/server.c index b968edac36..e933b19fc3 100644 --- a/src/server.c +++ b/src/server.c @@ -33,7 +33,6 @@ #include "slowlog.h" #include "bio.h" #include "latency.h" -#include "atomicvar.h" #include "mt19937-64.h" #include "functions.h" #include "hdr_histogram.h" diff --git a/src/server.h b/src/server.h index af6797ccfc..2e6eaf4cba 100644 --- a/src/server.h +++ b/src/server.h @@ -34,7 +34,6 @@ #include "config.h" #include "solarisfixes.h" #include "rio.h" -#include "atomicvar.h" #include "commands.h" #include diff --git a/src/threads_mngr.c b/src/threads_mngr.c index 2d71ffacd0..d09584c35b 100644 --- a/src/threads_mngr.c +++ b/src/threads_mngr.c @@ -32,12 +32,12 @@ #define UNUSED(V) ((void) V) #ifdef __linux__ -#include "atomicvar.h" #include "server.h" #include #include #include +#include #define IN_PROGRESS 1 static const clock_t RUN_ON_THREADS_TIMEOUT = 2; @@ -46,10 +46,10 @@ static const clock_t RUN_ON_THREADS_TIMEOUT = 2; static run_on_thread_cb g_callback = NULL; static volatile size_t g_tids_len = 0; -static serverAtomic size_t g_num_threads_done = 0; +static _Atomic size_t g_num_threads_done = 0; /* This flag is set while ThreadsManager_runOnThreads is running */ -static serverAtomic int g_in_progress = 0; +static _Atomic int g_in_progress = 0; /*============================ Internal prototypes ========================== */ @@ -112,9 +112,8 @@ int ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb c static int test_and_start(void) { - /* atomicFlagGetSet sets the variable to 1 and returns the previous value */ - int prev_state; - atomicFlagGetSet(g_in_progress, prev_state); + /* atomic_exchange_expicit sets the variable to 1 and returns the previous value */ + int prev_state = atomic_exchange_explicit(&g_in_progress,1,memory_order_relaxed); /* If prev_state is 1, g_in_progress was on. */ return prev_state; @@ -126,7 +125,7 @@ static void invoke_callback(int sig) { run_on_thread_cb callback = g_callback; if (callback) { callback(); - atomicIncr(g_num_threads_done, 1); + atomic_fetch_add_explicit(&g_num_threads_done,1,memory_order_relaxed); } else { serverLogFromHandler(LL_WARNING, "tid %ld: ThreadsManager g_callback is NULL", syscall(SYS_gettid)); } @@ -150,7 +149,7 @@ static void wait_threads(void) { /* Sleep a bit to yield to other threads. 
*/ /* usleep isn't listed as signal safe, so we use select instead */ select(0, NULL, NULL, NULL, &tv); - atomicGet(g_num_threads_done, curr_done_count); + curr_done_count = atomic_load_explicit(&g_num_threads_done,memory_order_relaxed); clock_gettime(CLOCK_REALTIME, &curr_time); } while (curr_done_count < g_tids_len && curr_time.tv_sec <= timeout_time.tv_sec); @@ -167,8 +166,7 @@ static void ThreadsManager_cleanups(void) { g_num_threads_done = 0; /* Lastly, turn off g_in_progress */ - atomicSet(g_in_progress, 0); - + atomic_store_explicit(&g_in_progress,0,memory_order_relaxed); } #else diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index ae515a1579..5ad9db120b 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -41,6 +41,7 @@ #include #include #include +#include #include /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */ #include /* Use hiredis sds. */ @@ -54,7 +55,6 @@ #include "adlist.h" #include "dict.h" #include "zmalloc.h" -#include "atomicvar.h" #include "crc16_slottable.h" #include "hdr_histogram.h" #include "cli_common.h" @@ -85,11 +85,11 @@ static struct config { int tls; struct cliSSLconfig sslconfig; int numclients; - serverAtomic int liveclients; + _Atomic int liveclients; int requests; - serverAtomic int requests_issued; - serverAtomic int requests_finished; - serverAtomic int previous_requests_finished; + _Atomic int requests_issued; + _Atomic int requests_finished; + _Atomic int previous_requests_finished; int last_printed_bytes; long long previous_tick; int keysize; @@ -118,9 +118,9 @@ static struct config { struct serverConfig *redis_config; struct hdr_histogram* latency_histogram; struct hdr_histogram* current_sec_latency_histogram; - serverAtomic int is_fetching_slots; - serverAtomic int is_updating_slots; - serverAtomic int slots_last_update; + _Atomic int is_fetching_slots; + _Atomic int is_updating_slots; + _Atomic int slots_last_update; int enable_tracking; pthread_mutex_t liveclients_mutex; pthread_mutex_t is_updating_slots_mutex; @@ -356,8 +356,7 @@ static void freeClient(client c) { aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); aeDeleteFileEvent(el,c->context->fd,AE_READABLE); if (c->thread_id >= 0) { - int requests_finished = 0; - atomicGet(config.requests_finished, requests_finished); + int requests_finished = atomic_load_explicit(&config.requests_finished,memory_order_relaxed); if (requests_finished >= config.requests) { aeStop(el); } @@ -416,8 +415,7 @@ static void setClusterKeyHashTag(client c) { assert(c->thread_id >= 0); clusterNode *node = c->cluster_node; assert(node); - int is_updating_slots = 0; - atomicGet(config.is_updating_slots, is_updating_slots); + int is_updating_slots = atomic_load_explicit(&config.is_updating_slots,memory_order_relaxed); /* If updateClusterSlotsConfiguration is updating the slots array, * call updateClusterSlotsConfiguration is order to block the thread * since the mutex is locked. 
When the slots will be updated by the @@ -438,8 +436,7 @@ static void setClusterKeyHashTag(client c) { } static void clientDone(client c) { - int requests_finished = 0; - atomicGet(config.requests_finished, requests_finished); + int requests_finished = atomic_load_explicit(&config.requests_finished,memory_order_relaxed); if (requests_finished >= config.requests) { freeClient(c); if (!config.num_threads && config.el) aeStop(config.el); @@ -540,8 +537,7 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } continue; } - int requests_finished = 0; - atomicGetIncr(config.requests_finished, requests_finished, 1); + int requests_finished = atomic_fetch_add_explicit(&config.requests_finished,1,memory_order_relaxed); if (requests_finished < config.requests){ if (config.num_threads == 0) { hdr_record_value( @@ -580,8 +576,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Initialize request when nothing was written. */ if (c->written == 0) { /* Enforce upper bound to number of requests. */ - int requests_issued = 0; - atomicGetIncr(config.requests_issued, requests_issued, config.pipeline); + int requests_issued = atomic_fetch_add_explicit(&config.requests_issued,config.pipeline,memory_order_relaxed); if (requests_issued >= config.requests) { return; } @@ -589,7 +584,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Really initialize: randomize keys and set start time. */ if (config.randomkeys) randomizeClientKey(c); if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c); - atomicGet(config.slots_last_update, c->slots_last_update); + c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); c->start = ustime(); c->latency = -1; } @@ -825,8 +820,9 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c); listAddNodeTail(config.clients,c); - atomicIncr(config.liveclients, 1); - atomicGet(config.slots_last_update, c->slots_last_update); + atomic_fetch_add_explicit(&config.liveclients, 1, memory_order_relaxed); + + c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); return c; } @@ -1272,15 +1268,17 @@ static int fetchClusterSlotsConfiguration(client c) { UNUSED(c); int success = 1, is_fetching_slots = 0, last_update = 0; size_t i; - atomicGet(config.slots_last_update, last_update); + + last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); if (c->slots_last_update < last_update) { c->slots_last_update = last_update; return -1; } redisReply *reply = NULL; - atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1); + + is_fetching_slots = atomic_fetch_add_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); if (is_fetching_slots) return -1; //TODO: use other codes || errno ? - atomicSet(config.is_fetching_slots, 1); + atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); const char *errmsg = "Failed to update cluster slots configuration"; @@ -1354,14 +1352,15 @@ static int fetchClusterSlotsConfiguration(client c) { freeReplyObject(reply); redisFree(ctx); dictRelease(masters); - atomicSet(config.is_fetching_slots, 0); + atomic_store_explicit(&config.is_fetching_slots, 0, memory_order_relaxed); return success; } /* Atomically update the new slots configuration. 
*/ static void updateClusterSlotsConfiguration(void) { pthread_mutex_lock(&config.is_updating_slots_mutex); - atomicSet(config.is_updating_slots, 1); + atomic_store_explicit(&config.is_updating_slots, 1, memory_order_relaxed); + int i; for (i = 0; i < config.cluster_node_count; i++) { clusterNode *node = config.cluster_nodes[i]; @@ -1374,8 +1373,8 @@ static void updateClusterSlotsConfiguration(void) { zfree(oldslots); } } - atomicSet(config.is_updating_slots, 0); - atomicIncr(config.slots_last_update, 1); + atomic_store_explicit(&config.is_updating_slots, 0, memory_order_relaxed); + atomic_fetch_add_explicit(&config.slots_last_update, 1, memory_order_relaxed); pthread_mutex_unlock(&config.is_updating_slots_mutex); } @@ -1662,13 +1661,10 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData UNUSED(eventLoop); UNUSED(id); benchmarkThread *thread = (benchmarkThread *)clientData; - int liveclients = 0; - int requests_finished = 0; - int previous_requests_finished = 0; + int liveclients = atomic_load_explicit(&config.liveclients, memory_order_relaxed); + int requests_finished = atomic_load_explicit(&config.requests_finished, memory_order_relaxed); + int previous_requests_finished = atomic_load_explicit(&config.previous_requests_finished, memory_order_relaxed); long long current_tick = mstime(); - atomicGet(config.liveclients, liveclients); - atomicGet(config.requests_finished, requests_finished); - atomicGet(config.previous_requests_finished, previous_requests_finished); if (liveclients == 0 && requests_finished != config.requests) { fprintf(stderr,"All clients disconnected... aborting.\n"); @@ -1693,7 +1689,7 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData const float instantaneous_dt = (float)(current_tick-config.previous_tick)/1000.0; const float instantaneous_rps = (float)(requests_finished-previous_requests_finished)/instantaneous_dt; config.previous_tick = current_tick; - atomicSet(config.previous_requests_finished,requests_finished); + atomic_store_explicit(&config.previous_requests_finished, requests_finished, memory_order_relaxed); printf("%*s\r", config.last_printed_bytes, " "); /* ensure there is a clean line */ int printed_bytes = printf("%s: rps=%.1f (overall: %.1f) avg_msec=%.3f (overall: %.3f)\r", config.title, instantaneous_rps, rps, hdr_mean(config.current_sec_latency_histogram)/1000.0f, hdr_mean(config.latency_histogram)/1000.0f); config.last_printed_bytes = printed_bytes; From 08ce3ee614bbde5f1bc05829be7ec8dcd318d502 Mon Sep 17 00:00:00 2001 From: teej4y Date: Wed, 15 May 2024 01:14:02 +0100 Subject: [PATCH 3/5] drop atomic_var field from server info Signed-off-by: adetunjii --- src/bio.c | 3 ++- src/server.c | 7 ++++--- src/zmalloc.c | 1 - 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/bio.c b/src/bio.c index 46f6b893f6..50fc46dcfe 100644 --- a/src/bio.c +++ b/src/bio.c @@ -62,6 +62,7 @@ #include "server.h" #include "bio.h" +#include static char* bio_worker_title[] = { "bio_close_file", @@ -265,8 +266,8 @@ void *bioProcessBackgroundJobs(void *arg) { { int last_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); - atomic_store_explicit(&server.aof_bio_fsync_status, C_ERR, memory_order_relaxed); atomic_store_explicit(&server.aof_bio_fsync_errno, errno, memory_order_relaxed); + atomic_store_explicit(&server.aof_bio_fsync_status, C_ERR, memory_order_release); if (last_status == C_OK) { serverLog(LL_WARNING, "Fail to fsync the AOF file: %s",strerror(errno)); diff 
--git a/src/server.c b/src/server.c index e933b19fc3..7005c8ddbd 100644 --- a/src/server.c +++ b/src/server.c @@ -4588,9 +4588,11 @@ int writeCommandsDeniedByDiskError(void) { return DISK_ERROR_TYPE_AOF; } /* AOF fsync error. */ - int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status,memory_order_relaxed); + int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); + atomic_thread_fence(memory_order_acquire); + if (aof_bio_fsync_status == C_ERR) { - server.aof_last_write_errno = atomic_load_explicit(&server.aof_bio_fsync_errno,memory_order_relaxed); + server.aof_last_write_errno = atomic_load_explicit(&server.aof_bio_fsync_errno, memory_order_relaxed); return DISK_ERROR_TYPE_AOF; } } @@ -5628,7 +5630,6 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "arch_bits:%i\r\n", server.arch_bits, "monotonic_clock:%s\r\n", monotonicInfoString(), "multiplexing_api:%s\r\n", aeGetApiName(), - "atomicvar_api:%s\r\n", REDIS_ATOMIC_API, "gcc_version:%s\r\n", GNUC_VERSION_STR, "process_id:%I\r\n", (int64_t) getpid(), "process_supervised:%s\r\n", supervised, diff --git a/src/zmalloc.c b/src/zmalloc.c index ffb3b6d16a..27d6bdcb67 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -51,7 +51,6 @@ void zlibc_free(void *ptr) { #include #include "zmalloc.h" -#include "atomicvar.h" #include #define UNUSED(x) ((void)(x)) From 14ba1619a98c268abcfeca35ef30c444177f7a3d Mon Sep 17 00:00:00 2001 From: Samuel Adetunji Date: Thu, 16 May 2024 00:03:52 +0100 Subject: [PATCH 4/5] remove atomicvar header file Signed-off-by: Samuel Adetunji --- src/atomicvar.h | 195 --------------------------------------------- src/server.c | 4 +- src/threads_mngr.c | 2 +- 3 files changed, 2 insertions(+), 199 deletions(-) delete mode 100644 src/atomicvar.h diff --git a/src/atomicvar.h b/src/atomicvar.h deleted file mode 100644 index 17d1c15a6d..0000000000 --- a/src/atomicvar.h +++ /dev/null @@ -1,195 +0,0 @@ -/* This file implements atomic counters using c11 _Atomic, __atomic or __sync - * macros if available, otherwise we will throw an error when compile. - * - * The exported interface is composed of the following macros: - * - * atomicIncr(var,count) -- Increment the atomic counter - * atomicGetIncr(var,oldvalue_var,count) -- Get and increment the atomic counter - * atomicIncrGet(var,newvalue_var,count) -- Increment and get the atomic counter new value - * atomicDecr(var,count) -- Decrement the atomic counter - * atomicGet(var,dstvar) -- Fetch the atomic counter value - * atomicSet(var,value) -- Set the atomic counter value - * atomicGetWithSync(var,value) -- 'atomicGet' with inter-thread synchronization - * atomicSetWithSync(var,value) -- 'atomicSet' with inter-thread synchronization - * - * Atomic operations on flags. - * Flag type can be int, long, long long or their unsigned counterparts. - * The value of the flag can be 1 or 0. - * - * atomicFlagGetSet(var,oldvalue_var) -- Get and set the atomic counter value - * - * NOTE1: __atomic* and _Atomic implementations can be actually elaborated to support any value by changing the - * hardcoded new value passed to __atomic_exchange* from 1 to @param count - * i.e oldvalue_var = atomic_exchange_explicit(&var, count). - * However, in order to be compatible with the __sync functions family, we can use only 0 and 1. 
- * The only exchange alternative suggested by __sync is __sync_lock_test_and_set, - * But as described by the gnu manual for __sync_lock_test_and_set(): - * https://gcc.gnu.org/onlinedocs/gcc/_005f_005fsync-Builtins.html - * "A target may support reduced functionality here by which the only valid value to store is the immediate constant 1. The exact value - * actually stored in *ptr is implementation defined." - * Hence, we can't rely on it for a any value other than 1. - * We eventually chose to implement this method with __sync_val_compare_and_swap since it satisfies functionality needed for atomicFlagGetSet - * (if the flag was 0 -> set to 1, if it's already 1 -> do nothing, but the final result is that the flag is set), - * and also it has a full barrier (__sync_lock_test_and_set has acquire barrier). - * - * NOTE2: Unlike other atomic type, which aren't guaranteed to be lock free, c11 atomic_flag does. - * To check whether a type is lock free, atomic_is_lock_free() can be used. - * It can be considered to limit the flag type to atomic_flag to improve performance. - * - * Never use return value from the macros, instead use the AtomicGetIncr() - * if you need to get the current value and increment it atomically, like - * in the following example: - * - * long oldvalue; - * atomicGetIncr(myvar,oldvalue,1); - * doSomethingWith(oldvalue); - * - * ---------------------------------------------------------------------------- - * - * Copyright (c) 2015, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include -#include "config.h" - -#ifndef __ATOMIC_VAR_H -#define __ATOMIC_VAR_H - -/* Define serverAtomic for atomic variable. */ -#define serverAtomic - -/* To test the server with Helgrind (a Valgrind tool) it is useful to define - * the following macro, so that __sync macros are used: those can be detected - * by Helgrind (even if they are less efficient) so that no false positive - * is reported. 
*/ -// #define __ATOMIC_VAR_FORCE_SYNC_MACROS - -/* There will be many false positives if we test the server with Helgrind, since - * Helgrind can't understand we have imposed ordering on the program, so - * we use macros in helgrind.h to tell Helgrind inter-thread happens-before - * relationship explicitly for avoiding false positives. - * - * For more details, please see: valgrind/helgrind.h and - * https://www.valgrind.org/docs/manual/hg-manual.html#hg-manual.effective-use - * - * These macros take effect only when 'make helgrind', and you must first - * install Valgrind in the default path configuration. */ -#ifdef __ATOMIC_VAR_FORCE_SYNC_MACROS -#include -#else -#define ANNOTATE_HAPPENS_BEFORE(v) ((void) v) -#define ANNOTATE_HAPPENS_AFTER(v) ((void) v) -#endif - -#if !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && defined(__STDC_VERSION__) && \ - (__STDC_VERSION__ >= 201112L) && !defined(__STDC_NO_ATOMICS__) -/* Use '_Atomic' keyword if the compiler supports. */ -#undef serverAtomic -#define serverAtomic _Atomic -/* Implementation using _Atomic in C11. */ - -#include -#define atomicIncr(var,count) atomic_fetch_add_explicit(&var,(count),memory_order_relaxed) -#define atomicGetIncr(var,oldvalue_var,count) do { \ - oldvalue_var = atomic_fetch_add_explicit(&var,(count),memory_order_relaxed); \ -} while(0) -#define atomicIncrGet(var, newvalue_var, count) \ - newvalue_var = atomicIncr(var,count) + count -#define atomicDecr(var,count) atomic_fetch_sub_explicit(&var,(count),memory_order_relaxed) -#define atomicGet(var,dstvar) do { \ - dstvar = atomic_load_explicit(&var,memory_order_relaxed); \ -} while(0) -#define atomicSet(var,value) atomic_store_explicit(&var,value,memory_order_relaxed) -#define atomicGetWithSync(var,dstvar) do { \ - dstvar = atomic_load_explicit(&var,memory_order_seq_cst); \ -} while(0) -#define atomicSetWithSync(var,value) \ - atomic_store_explicit(&var,value,memory_order_seq_cst) -#define atomicFlagGetSet(var,oldvalue_var) \ - oldvalue_var = atomic_exchange_explicit(&var,1,memory_order_relaxed) -#define REDIS_ATOMIC_API "c11-builtin" - -#elif !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && \ - (!defined(__clang__) || !defined(__APPLE__) || __apple_build_version__ > 4210057) && \ - defined(__ATOMIC_RELAXED) && defined(__ATOMIC_SEQ_CST) -/* Implementation using __atomic macros. */ - -#define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED) -#define atomicIncrGet(var, newvalue_var, count) \ - newvalue_var = __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED) -#define atomicGetIncr(var,oldvalue_var,count) do { \ - oldvalue_var = __atomic_fetch_add(&var,(count),__ATOMIC_RELAXED); \ -} while(0) -#define atomicDecr(var,count) __atomic_sub_fetch(&var,(count),__ATOMIC_RELAXED) -#define atomicGet(var,dstvar) do { \ - dstvar = __atomic_load_n(&var,__ATOMIC_RELAXED); \ -} while(0) -#define atomicSet(var,value) __atomic_store_n(&var,value,__ATOMIC_RELAXED) -#define atomicGetWithSync(var,dstvar) do { \ - dstvar = __atomic_load_n(&var,__ATOMIC_SEQ_CST); \ -} while(0) -#define atomicSetWithSync(var,value) \ - __atomic_store_n(&var,value,__ATOMIC_SEQ_CST) -#define atomicFlagGetSet(var,oldvalue_var) \ - oldvalue_var = __atomic_exchange_n(&var,1,__ATOMIC_RELAXED) -#define REDIS_ATOMIC_API "atomic-builtin" - -#elif defined(HAVE_ATOMIC) -/* Implementation using __sync macros. 
*/ - -#define atomicIncr(var,count) __sync_add_and_fetch(&var,(count)) -#define atomicIncrGet(var, newvalue_var, count) \ - newvalue_var = __sync_add_and_fetch(&var,(count)) -#define atomicGetIncr(var,oldvalue_var,count) do { \ - oldvalue_var = __sync_fetch_and_add(&var,(count)); \ -} while(0) -#define atomicDecr(var,count) __sync_sub_and_fetch(&var,(count)) -#define atomicGet(var,dstvar) do { \ - dstvar = __sync_sub_and_fetch(&var,0); \ -} while(0) -#define atomicSet(var,value) do { \ - while(!__sync_bool_compare_and_swap(&var,var,value)); \ -} while(0) -/* Actually the builtin issues a full memory barrier by default. */ -#define atomicGetWithSync(var,dstvar) do { \ - dstvar = __sync_sub_and_fetch(&var,0,__sync_synchronize); \ - ANNOTATE_HAPPENS_AFTER(&var); \ -} while(0) -#define atomicSetWithSync(var,value) do { \ - ANNOTATE_HAPPENS_BEFORE(&var); \ - while(!__sync_bool_compare_and_swap(&var,var,value,__sync_synchronize)); \ -} while(0) -#define atomicFlagGetSet(var,oldvalue_var) \ - oldvalue_var = __sync_val_compare_and_swap(&var,0,1) -#define REDIS_ATOMIC_API "sync-builtin" - -#else -#error "Unable to determine atomic operations for your platform" - -#endif -#endif /* __ATOMIC_VAR_H */ diff --git a/src/server.c b/src/server.c index 6a159e6a9b..12920f2280 100644 --- a/src/server.c +++ b/src/server.c @@ -4588,9 +4588,7 @@ int writeCommandsDeniedByDiskError(void) { return DISK_ERROR_TYPE_AOF; } /* AOF fsync error. */ - int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); - atomic_thread_fence(memory_order_acquire); - + int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_acquire); if (aof_bio_fsync_status == C_ERR) { server.aof_last_write_errno = atomic_load_explicit(&server.aof_bio_fsync_errno, memory_order_relaxed); return DISK_ERROR_TYPE_AOF; diff --git a/src/threads_mngr.c b/src/threads_mngr.c index d09584c35b..e2b395bd62 100644 --- a/src/threads_mngr.c +++ b/src/threads_mngr.c @@ -112,7 +112,7 @@ int ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb c static int test_and_start(void) { - /* atomic_exchange_expicit sets the variable to 1 and returns the previous value */ + /* atomic_exchange_explicit sets the variable to 1 and returns the previous value */ int prev_state = atomic_exchange_explicit(&g_in_progress,1,memory_order_relaxed); /* If prev_state is 1, g_in_progress was on. 
*/ From 6a73d02df0f5f54a69bc3dfdc9252178da3d0dde Mon Sep 17 00:00:00 2001 From: Samuel Adetunji Date: Sun, 26 May 2024 15:37:00 +0100 Subject: [PATCH 5/5] fix merge conflicts Signed-off-by: Samuel Adetunji --- .github/workflows/ci.yml | 17 ++-- .github/workflows/daily.yml | 172 ++++++++++++++++++++++++++++-------- src/.clang-format | 32 +++++++ src/.clang-format-ignore | 4 + 4 files changed, 175 insertions(+), 50 deletions(-) create mode 100644 src/.clang-format diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 91900f2d35..40376f1628 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,10 +39,7 @@ jobs: # build with TLS module just for compilation coverage run: make SANITIZER=address SERVER_CFLAGS='-Werror -DDEBUG_ASSERTIONS' BUILD_TLS=module - name: testprep - # Work around ASAN issue, see https://github.com/google/sanitizers/issues/1716 - run: | - sudo apt-get install tcl8.6 tclx -y - sudo sysctl vm.mmap_rnd_bits=28 + run: sudo apt-get install tcl8.6 tclx -y - name: test run: ./runtest --verbose --tags -slow --dump-logs - name: module api test @@ -81,16 +78,14 @@ jobs: - name: make run: make SERVER_CFLAGS='-Werror' MALLOC=libc - build-centos7-jemalloc: + build-almalinux8-jemalloc: runs-on: ubuntu-latest - container: centos:7 + container: almalinux:8 steps: - # on centos7, actions/checkout@v4 does not work, so we use v3 - # ref. https://github.com/actions/checkout/issues/1487 - - uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - name: make run: | - yum -y install gcc make - make SERVER_CFLAGS='-Werror' + dnf -y install epel-release gcc make procps-ng which + make -j SERVER_CFLAGS='-Werror' diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index 7047a3247d..658e58b235 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -11,7 +11,7 @@ on: inputs: skipjobs: description: 'jobs to skip (delete the ones you wanna keep, do not leave empty)' - default: 'valgrind,sanitizer,tls,freebsd,macos,alpine,32bit,iothreads,ubuntu,centos,malloc,specific,fortify,reply-schema' + default: 'valgrind,sanitizer,tls,freebsd,macos,alpine,32bit,iothreads,ubuntu,rpm-distros,malloc,specific,fortify,reply-schema' skiptests: description: 'tests to skip (delete the ones you wanna keep, do not leave empty)' default: 'valkey,modules,sentinel,cluster,unittest' @@ -600,11 +600,9 @@ jobs: - name: make run: make all-with-unit-tests OPT=-O3 SANITIZER=address SERVER_CFLAGS='-DSERVER_TEST -Werror -DDEBUG_ASSERTIONS' - name: testprep - # Work around ASAN issue, see https://github.com/google/sanitizers/issues/1716 run: | sudo apt-get update sudo apt-get install tcl8.6 tclx -y - sudo sysctl vm.mmap_rnd_bits=28 - name: test if: true && !contains(github.event.inputs.skiptests, 'valkey') run: ./runtest --accurate --verbose --dump-logs ${{github.event.inputs.test_args}} @@ -674,13 +672,34 @@ jobs: if: true && !contains(github.event.inputs.skiptests, 'unittest') run: ./src/valkey-unit-tests --accurate - test-centos7-jemalloc: - runs-on: ubuntu-latest + test-rpm-distros-jemalloc: if: | - (github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey')) && - !contains(github.event.inputs.skipjobs, 'centos') - container: centos:7 + (github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey')) && + 
!contains(github.event.inputs.skipjobs, 'rpm-distros') + strategy: + fail-fast: false + matrix: + include: + - name: test-almalinux8-jemalloc + container: almalinux:8 + install_epel: true + - name: test-almalinux9-jemalloc + container: almalinux:8 + install_epel: true + - name: test-centosstream9-jemalloc + container: quay.io/centos/centos:stream9 + install_epel: true + - name: test-fedoralatest-jemalloc + container: fedora:latest + - name: test-fedorarawhide-jemalloc + container: fedora:rawhide + + name: ${{ matrix.name }} + runs-on: ubuntu-latest + + container: ${{ matrix.container }} timeout-minutes: 14400 + steps: - name: prep if: github.event_name == 'workflow_dispatch' @@ -691,18 +710,19 @@ jobs: echo "skiptests: ${{github.event.inputs.skiptests}}" echo "test_args: ${{github.event.inputs.test_args}}" echo "cluster_test_args: ${{github.event.inputs.cluster_test_args}}" - # On centos7 actions/checkout@v4 does not work, so we use v3 - # ref. https://github.com/actions/checkout/issues/1487 - - uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 with: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} + - name: Install EPEL + if: matrix.install_epel + run: dnf -y install epel-release - name: make run: | - yum -y install gcc make - make SERVER_CFLAGS='-Werror' + dnf -y install gcc make procps-ng which /usr/bin/kill + make -j SERVER_CFLAGS='-Werror' - name: testprep - run: yum -y install which tcl tclx + run: dnf -y install tcl tcltls - name: test if: true && !contains(github.event.inputs.skiptests, 'valkey') run: ./runtest --accurate --verbose --dump-logs ${{github.event.inputs.test_args}} @@ -716,13 +736,34 @@ jobs: if: true && !contains(github.event.inputs.skiptests, 'cluster') run: ./runtest-cluster ${{github.event.inputs.cluster_test_args}} - test-centos7-tls-module: - runs-on: ubuntu-latest + test-rpm-distros-tls-module: if: | - (github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey')) && - !contains(github.event.inputs.skipjobs, 'tls') - container: centos:7 + (github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey')) && + !contains(github.event.inputs.skipjobs, 'tls') + strategy: + fail-fast: false + matrix: + include: + - name: test-almalinux8-tls-module + container: almalinux:8 + install_epel: true + - name: test-almalinux9-tls-module + container: almalinux:8 + install_epel: true + - name: test-centosstream9-tls-module + container: quay.io/centos/centos:stream9 + install_epel: true + - name: test-fedoralatest-tls-module + container: fedora:latest + - name: test-fedorarawhide-tls-module + container: fedora:rawhide + + name: ${{ matrix.name }} + runs-on: ubuntu-latest + + container: ${{ matrix.container }} timeout-minutes: 14400 + steps: - name: prep if: github.event_name == 'workflow_dispatch' @@ -733,20 +774,20 @@ jobs: echo "skiptests: ${{github.event.inputs.skiptests}}" echo "test_args: ${{github.event.inputs.test_args}}" echo "cluster_test_args: ${{github.event.inputs.cluster_test_args}}" - # On centos7 actions/checkout@v4 does not work, so we use v3 - # ref. 
https://github.com/actions/checkout/issues/1487 - - uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 with: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} + - name: Install EPEL + if: matrix.install_epel + run: dnf -y install epel-release - name: make run: | - yum -y install centos-release-scl epel-release - yum -y install devtoolset-7 openssl-devel openssl - scl enable devtoolset-7 "make BUILD_TLS=module SERVER_CFLAGS='-Werror'" + dnf -y install make gcc openssl-devel openssl procps-ng which /usr/bin/kill + make -j BUILD_TLS=module SERVER_CFLAGS='-Werror' - name: testprep run: | - yum -y install tcl tcltls tclx + dnf -y install tcl tcltls ./utils/gen-test-certs.sh - name: test if: true && !contains(github.event.inputs.skiptests, 'valkey') @@ -765,13 +806,34 @@ jobs: run: | ./runtest-cluster --tls-module ${{github.event.inputs.cluster_test_args}} - test-centos7-tls-module-no-tls: - runs-on: ubuntu-latest + test-rpm-distros-tls-module-no-tls: if: | - (github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey')) && - !contains(github.event.inputs.skipjobs, 'tls') - container: centos:7 + (github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey')) && + !contains(github.event.inputs.skipjobs, 'tls') + strategy: + fail-fast: false + matrix: + include: + - name: test-almalinux8-tls-module-no-tls + container: almalinux:8 + install_epel: true + - name: test-almalinux9-tls-module-no-tls + container: almalinux:8 + install_epel: true + - name: test-centosstream9-tls-module-no-tls + container: quay.io/centos/centos:stream9 + install_epel: true + - name: test-fedoralatest-tls-module-no-tls + container: fedora:latest + - name: test-fedorarawhide-tls-module-no-tls + container: fedora:rawhide + + name: ${{ matrix.name }} + runs-on: ubuntu-latest + + container: ${{ matrix.container }} timeout-minutes: 14400 + steps: - name: prep if: github.event_name == 'workflow_dispatch' @@ -782,20 +844,20 @@ jobs: echo "skiptests: ${{github.event.inputs.skiptests}}" echo "test_args: ${{github.event.inputs.test_args}}" echo "cluster_test_args: ${{github.event.inputs.cluster_test_args}}" - # On centos7 actions/checkout@v4 does not work, so we use v3 - # ref. 
https://github.com/actions/checkout/issues/1487 - - uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0 + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 with: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} + - name: Install EPEL + if: matrix.install_epel + run: dnf -y install epel-release - name: make run: | - yum -y install centos-release-scl epel-release - yum -y install devtoolset-7 openssl-devel openssl - scl enable devtoolset-7 "make BUILD_TLS=module SERVER_CFLAGS='-Werror'" + dnf -y install make gcc openssl-devel openssl procps-ng which /usr/bin/kill + make -j BUILD_TLS=module SERVER_CFLAGS='-Werror' - name: testprep run: | - yum -y install tcl tcltls tclx + dnf -y install tcl tcltls ./utils/gen-test-certs.sh - name: test if: true && !contains(github.event.inputs.skiptests, 'valkey') @@ -898,7 +960,7 @@ jobs: build-macos: strategy: matrix: - os: [macos-11, macos-13] + os: [macos-12, macos-14] runs-on: ${{ matrix.os }} if: | (github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey')) && @@ -1073,3 +1135,35 @@ jobs: - name: validator run: ./utils/req-res-log-validator.py --verbose --fail-missing-reply-schemas ${{ (!contains(github.event.inputs.skiptests, 'valkey') && !contains(github.event.inputs.skiptests, 'module') && !contains(github.event.inputs.sentinel, 'valkey') && !contains(github.event.inputs.skiptests, 'cluster')) && github.event.inputs.test_args == '' && github.event.inputs.cluster_test_args == '' && '--fail-commands-not-all-hit' || '' }} + notify-about-job-results: + runs-on: ubuntu-latest + if: always() && github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey' + needs: [test-ubuntu-jemalloc, test-ubuntu-jemalloc-fortify, test-ubuntu-libc-malloc, test-ubuntu-no-malloc-usable-size, test-ubuntu-32bit, test-ubuntu-tls, test-ubuntu-tls-no-tls, test-ubuntu-io-threads, test-ubuntu-reclaim-cache, test-valgrind-test, test-valgrind-misc, test-valgrind-no-malloc-usable-size-test, test-valgrind-no-malloc-usable-size-misc, test-sanitizer-address, test-sanitizer-undefined, test-rpm-distros-jemalloc, test-rpm-distros-tls-module, test-rpm-distros-tls-module-no-tls, test-macos-latest, test-macos-latest-sentinel, test-macos-latest-cluster, build-macos, test-freebsd, test-alpine-jemalloc, test-alpine-libc-malloc, reply-schemas-validator] + steps: + - name: Collect job status + run: | + FAILED_JOBS=() + NEEDS_JSON='${{ toJSON(needs) }}' + JOBS=($(echo "$NEEDS_JSON" | jq 'keys' | tr -d '[] ,')) + for JOB in ${JOBS[@]}; do + JOB_RESULT=$(echo "$NEEDS_JSON" | jq ".[$JOB][\"result\"]" | tr -d '"') + if [ $JOB_RESULT = "failure" ]; then + FAILED_JOBS+=($JOB) + fi + done + + if [[ ${#FAILED_JOBS[@]} -ne 0 ]]; then + echo "FAILED_JOBS=${FAILED_JOBS[@]}" >> $GITHUB_ENV + echo "STATUS=failure" >> $GITHUB_ENV + else + echo "STATUS=success" >> $GITHUB_ENV + fi + - name: Notify about results + uses: ravsamhq/notify-slack-action@v2 + with: + status: ${{ env.STATUS }} + notify_when: "failure" + notification_title: "Daily test run <${{github.server_url}}/${{github.repository}}/actions/runs/${{github.run_id}}|Failure>" + message_format: ":fire: Tests failed: ${{ env.FAILED_JOBS }}" + env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_NOTIFICATIONS_WEBHOOK_URL }} diff --git a/src/.clang-format b/src/.clang-format new file mode 100644 index 0000000000..dceaa4b029 --- /dev/null +++ b/src/.clang-format @@ -0,0 +1,32 @@ +BasedOnStyle: LLVM 
+IndentWidth: 4 +TabWidth: 4 +UseTab: Never +ColumnLimit: 120 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakString: 100 +PenaltyExcessCharacter: 100 +MaxEmptyLinesToKeep: 2 +BreakBeforeBraces: Attach +AllowShortCaseLabelsOnASingleLine: true +AllowShortIfStatementsOnASingleLine: WithoutElse +AllowShortLoopsOnASingleLine: true +AllowShortFunctionsOnASingleLine: false +AlignConsecutiveAssignments: false +AlignConsecutiveDeclarations: false +AlignTrailingComments: true +PointerAlignment: Right +KeepEmptyLinesAtTheStartOfBlocks: false +SpaceBeforeParens: ControlStatements +SpacesInParentheses: false +SpacesInAngles: false +SpacesInCStyleCastParentheses: false +SpaceAfterCStyleCast: false +SpacesInSquareBrackets: false +ReflowComments: true +CommentPragmas: '^\\s*\\*' +SortIncludes: false +AllowAllParametersOfDeclarationOnNextLine: false +BinPackParameters: false +AlignAfterOpenBracket: Align diff --git a/src/.clang-format-ignore b/src/.clang-format-ignore index 4c3cac4228..b5519ddfc4 100644 --- a/src/.clang-format-ignore +++ b/src/.clang-format-ignore @@ -1,7 +1,11 @@ # Don't format files copied from other sources. lzf* crccombine.* +crc16.c +crc16_slottable.h +crc64.c crcspeed.* +fmtargs.h mt19937-64.* pqsort.* setcpuaffinity.c
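
The C11 branch of the removed atomicvar.h above already spells out the mapping that the rest of the series applies by hand: atomicIncr/atomicDecr become relaxed fetch_add/fetch_sub, atomicGet/atomicSet become relaxed loads and stores, the *WithSync variants use memory_order_seq_cst, and atomicFlagGetSet becomes atomic_exchange_explicit with 1, as in test_and_start() in threads_mngr.c. The standalone sketch below only illustrates that mapping; it is not code from the tree, and the variable names (pending, in_progress) are invented for the example.

/* Illustrative sketch of the atomicvar.h macro to C11 <stdatomic.h>
 * translation used throughout this series. Build with: cc -std=c11 sketch.c */
#include <stdatomic.h>
#include <stdio.h>

static _Atomic size_t pending = 0;  /* was: static serverAtomic size_t ... */
static _Atomic int in_progress = 0; /* was driven by atomicFlagGetSet() */

int main(void) {
    /* atomicIncr(var,1) / atomicDecr(var,1): relaxed fetch-add / fetch-sub. */
    atomic_fetch_add_explicit(&pending, 1, memory_order_relaxed);
    atomic_fetch_sub_explicit(&pending, 1, memory_order_relaxed);

    /* atomicGetIncr(var,old,1): the fetch-add returns the previous value. */
    size_t old = atomic_fetch_add_explicit(&pending, 1, memory_order_relaxed);

    /* atomicGet(var,dst) / atomicSet(var,val): relaxed load / store. */
    size_t now = atomic_load_explicit(&pending, memory_order_relaxed);
    atomic_store_explicit(&pending, 0, memory_order_relaxed);

    /* atomicSetWithSync(var,val): the same store with memory_order_seq_cst. */
    atomic_store_explicit(&pending, 0, memory_order_seq_cst);

    /* atomicFlagGetSet(var,old): exchange with 1 and keep the previous value,
     * exactly the test_and_start() pattern. */
    int prev = atomic_exchange_explicit(&in_progress, 1, memory_order_relaxed);

    printf("%zu %zu %d\n", old, now, prev);
    return 0;
}

One behavioural detail to keep in mind when converting call sites: atomic_fetch_add_explicit returns the previous value (the old atomicGetIncr semantics), while the removed atomicIncrGet macro produced the new value by adding the count on top, so callers that want the post-increment value must do that addition themselves.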
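
The writeCommandsDeniedByDiskError() hunk above also folds the earlier "relaxed load, then atomic_thread_fence(memory_order_acquire)" pattern into a single load with memory_order_acquire. The minimal sketch below shows the publication idiom that makes the two forms equivalent; it assumes the background job publishes the status with a release (or stronger) store, which is stated here as an assumption for illustration rather than as the actual bio.c code, and the field names are invented.

/* Illustrative only: invented fields standing in for an error status and the
 * errno value published by a background thread. */
#include <stdatomic.h>
#include <stdio.h>
#include <errno.h>

static _Atomic int fsync_status = 0; /* 0 = ok, -1 = error (C_ERR-like) */
static _Atomic int fsync_errno = 0;

/* Writer (background thread): store the errno first, relaxed, then publish
 * the status with a release store so both become visible together. */
static void publish_fsync_error(int err) {
    atomic_store_explicit(&fsync_errno, err, memory_order_relaxed);
    atomic_store_explicit(&fsync_status, -1, memory_order_release);
}

/* Reader (main thread): the acquire load synchronizes with the release store,
 * so the following relaxed errno load is ordered after the writer's stores.
 * This is why the separate acquire fence can be dropped. */
static int check_fsync_error(void) {
    if (atomic_load_explicit(&fsync_status, memory_order_acquire) == -1)
        return atomic_load_explicit(&fsync_errno, memory_order_relaxed);
    return 0;
}

int main(void) {
    publish_fsync_error(EIO); /* single-threaded demo of the call sequence */
    printf("observed errno: %d\n", check_fsync_error());
    return 0;
}

Either form gives the reader an acquire operation on the status flag, so once an error status is observed the relaxed read of the companion errno value cannot see a stale value.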