From dd2b27ea4eeecf0b02d3b985fff7de3c49dc1e14 Mon Sep 17 00:00:00 2001 From: teej4y Date: Tue, 14 May 2024 02:33:05 +0100 Subject: [PATCH] replace atomicvar header file --- src/aof.c | 16 +++++----- src/bio.c | 12 ++++---- src/db.c | 1 - src/evict.c | 1 - src/functions.c | 1 - src/lazyfree.c | 1 - src/module.c | 3 +- src/networking.c | 26 ++++++++--------- src/server.c | 1 - src/server.h | 1 - src/threads_mngr.c | 18 +++++------- src/valkey-benchmark.c | 66 ++++++++++++++++++++---------------------- 12 files changed, 65 insertions(+), 82 deletions(-) diff --git a/src/aof.c b/src/aof.c index 2f2255deb2..7ce07084d7 100644 --- a/src/aof.c +++ b/src/aof.c @@ -965,7 +965,7 @@ void stopAppendOnly(void) { server.aof_last_incr_size = 0; server.aof_last_incr_fsync_offset = 0; server.fsynced_reploff = -1; - atomicSet(server.fsynced_reploff_pending, 0); + atomic_store_explicit(&server.fsynced_reploff_pending, 0, memory_order_relaxed); killAppendOnlyChild(); sdsfree(server.aof_buf); server.aof_buf = sdsempty(); @@ -1000,12 +1000,11 @@ int startAppendOnly(void) { } server.aof_last_fsync = server.mstime; /* If AOF fsync error in bio job, we just ignore it and log the event. */ - int aof_bio_fsync_status; - atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status); + int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); if (aof_bio_fsync_status == C_ERR) { serverLog(LL_WARNING, "AOF reopen, just ignore the AOF fsync error in bio job"); - atomicSet(server.aof_bio_fsync_status,C_OK); + atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed); } /* If AOF was in error state, we just ignore it and log the event. */ @@ -1093,7 +1092,7 @@ void flushAppendOnlyFile(int force) { * (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on * the higher offset (which contains data that was only propagated to replicas, and not to AOF) */ if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO) - atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); + atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed); return; } } @@ -1261,7 +1260,7 @@ void flushAppendOnlyFile(int force) { latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_last_incr_fsync_offset = server.aof_last_incr_size; server.aof_last_fsync = server.mstime; - atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); + atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed); } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.mstime - server.aof_last_fsync >= 1000) { if (!sync_in_progress) { @@ -2463,7 +2462,7 @@ int rewriteAppendOnlyFileBackground(void) { /* Set the initial repl_offset, which will be applied to fsynced_reploff * when AOFRW finishes (after possibly being updated by a bio thread) */ - atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); + atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed); server.fsynced_reploff = 0; } @@ -2715,8 +2714,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { /* Update the fsynced replication offset that just now become valid. * This could either be the one we took in startAppendOnly, or a * newer one set by the bio thread. */ - long long fsynced_reploff_pending; - atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending); + long long fsynced_reploff_pending = atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed); server.fsynced_reploff = fsynced_reploff_pending; } diff --git a/src/bio.c b/src/bio.c index e0da640258..46f6b893f6 100644 --- a/src/bio.c +++ b/src/bio.c @@ -263,17 +263,17 @@ void *bioProcessBackgroundJobs(void *arg) { if (valkey_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) { - int last_status; - atomicGet(server.aof_bio_fsync_status,last_status); - atomicSet(server.aof_bio_fsync_status,C_ERR); - atomicSet(server.aof_bio_fsync_errno,errno); + int last_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); + + atomic_store_explicit(&server.aof_bio_fsync_status, C_ERR, memory_order_relaxed); + atomic_store_explicit(&server.aof_bio_fsync_errno, errno, memory_order_relaxed); if (last_status == C_OK) { serverLog(LL_WARNING, "Fail to fsync the AOF file: %s",strerror(errno)); } } else { - atomicSet(server.aof_bio_fsync_status,C_OK); - atomicSet(server.fsynced_reploff_pending, job->fd_args.offset); + atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed); + atomic_store_explicit(&server.fsynced_reploff_pending, job->fd_args.offset, memory_order_relaxed); } if (job->fd_args.need_reclaim_cache) { diff --git a/src/db.c b/src/db.c index f5aebf896a..d3f3d0fadd 100644 --- a/src/db.c +++ b/src/db.c @@ -29,7 +29,6 @@ #include "server.h" #include "cluster.h" -#include "atomicvar.h" #include "latency.h" #include "script.h" #include "functions.h" diff --git a/src/evict.c b/src/evict.c index fcac92dfc8..36cf6f852f 100644 --- a/src/evict.c +++ b/src/evict.c @@ -32,7 +32,6 @@ #include "server.h" #include "bio.h" -#include "atomicvar.h" #include "script.h" #include diff --git a/src/functions.c b/src/functions.c index a99434b51b..dd8f76744e 100644 --- a/src/functions.c +++ b/src/functions.c @@ -31,7 +31,6 @@ #include "sds.h" #include "dict.h" #include "adlist.h" -#include "atomicvar.h" #define LOAD_TIMEOUT_MS 500 diff --git a/src/lazyfree.c b/src/lazyfree.c index 6dd65a9c31..a1abcb836f 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -1,6 +1,5 @@ #include "server.h" #include "bio.h" -#include "atomicvar.h" #include "functions.h" #include "cluster.h" diff --git a/src/module.c b/src/module.c index c847971182..e89bf6e9e3 100644 --- a/src/module.c +++ b/src/module.c @@ -2433,8 +2433,7 @@ void VM_Yield(ValkeyModuleCtx *ctx, int flags, const char *busy_reply) { * after the main thread enters acquiring GIL state in order to protect the event * loop (ae.c) and avoid potential race conditions. */ - int acquiring; - atomicGet(server.module_gil_acquiring, acquiring); + int acquiring = atomic_load_explicit(&server.module_gil_acquiring, memory_order_relaxed); if (!acquiring) { /* If the main thread has not yet entered the acquiring GIL state, * we attempt to wake it up and exit without waiting for it to diff --git a/src/networking.c b/src/networking.c index 5aa02e8315..dc36f7d9f1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -28,7 +28,6 @@ */ #include "server.h" -#include "atomicvar.h" #include "cluster.h" #include "script.h" #include "fpconv_dtoa.h" @@ -37,6 +36,7 @@ #include #include #include +#include static void setProtocolError(const char *errstr, client *c); static void pauseClientsByClient(mstime_t end, int isPauseClientAll); @@ -132,8 +132,7 @@ client *createClient(connection *conn) { } c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size); selectDb(c,0); - uint64_t client_id; - atomicGetIncr(server.next_client_id, client_id, 1); + uint64_t client_id = atomic_fetch_add_explicit(&server.next_client_id,1,memory_order_relaxed); c->id = client_id; #ifdef LOG_REQ_RES reqresReset(c, 0); @@ -1963,7 +1962,7 @@ int _writeToClient(client *c, ssize_t *nwritten) { * thread safe. */ int writeToClient(client *c, int handler_installed) { /* Update total number of writes on server */ - atomicIncr(server.stat_total_writes_processed, 1); + atomic_fetch_add_explicit(&server.stat_total_writes_processed,1, memory_order_relaxed); ssize_t nwritten = 0, totwritten = 0; @@ -1990,9 +1989,9 @@ int writeToClient(client *c, int handler_installed) { } if (getClientType(c) == CLIENT_TYPE_SLAVE) { - atomicIncr(server.stat_net_repl_output_bytes, totwritten); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, totwritten, memory_order_relaxed); } else { - atomicIncr(server.stat_net_output_bytes, totwritten); + atomic_fetch_add_explicit(&server.stat_net_output_bytes, totwritten, memory_order_relaxed); } c->net_output_bytes += totwritten; @@ -2649,7 +2648,7 @@ void readQueryFromClient(connection *conn) { if (postponeClientRead(c)) return; /* Update total number of reads on server */ - atomicIncr(server.stat_total_reads_processed, 1); + atomic_fetch_add_explicit(&server.stat_total_reads_processed,1,memory_order_relaxed); readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply @@ -2718,9 +2717,9 @@ void readQueryFromClient(connection *conn) { c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) { c->read_reploff += nread; - atomicIncr(server.stat_net_repl_input_bytes, nread); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread,memory_order_relaxed); } else { - atomicIncr(server.stat_net_input_bytes, nread); + atomic_fetch_add_explicit(&server.stat_net_input_bytes,nread,memory_order_relaxed); } c->net_input_bytes += nread; @@ -2738,7 +2737,7 @@ void readQueryFromClient(connection *conn) { sdsfree(ci); sdsfree(bytes); freeClientAsync(c); - atomicIncr(server.stat_client_qbuf_limit_disconnections, 1); + atomic_fetch_add_explicit(&server.stat_client_qbuf_limit_disconnections,1,memory_order_relaxed); goto done; } @@ -4210,7 +4209,7 @@ void processEventsWhileBlocked(void) { #endif typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) threads_pending { - serverAtomic unsigned long value; + _Atomic unsigned long value; } threads_pending; pthread_t io_threads[IO_THREADS_MAX_NUM]; @@ -4224,13 +4223,12 @@ int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_ list *io_threads_list[IO_THREADS_MAX_NUM]; static inline unsigned long getIOPendingCount(int i) { - unsigned long count = 0; - atomicGetWithSync(io_threads_pending[i].value, count); + unsigned long count = atomic_load(&io_threads_pending[i].value); return count; } static inline void setIOPendingCount(int i, unsigned long count) { - atomicSetWithSync(io_threads_pending[i].value, count); + atomic_store(&io_threads_pending[i].value, count); } void *IOThreadMain(void *myid) { diff --git a/src/server.c b/src/server.c index b968edac36..e933b19fc3 100644 --- a/src/server.c +++ b/src/server.c @@ -33,7 +33,6 @@ #include "slowlog.h" #include "bio.h" #include "latency.h" -#include "atomicvar.h" #include "mt19937-64.h" #include "functions.h" #include "hdr_histogram.h" diff --git a/src/server.h b/src/server.h index af6797ccfc..2e6eaf4cba 100644 --- a/src/server.h +++ b/src/server.h @@ -34,7 +34,6 @@ #include "config.h" #include "solarisfixes.h" #include "rio.h" -#include "atomicvar.h" #include "commands.h" #include diff --git a/src/threads_mngr.c b/src/threads_mngr.c index 2d71ffacd0..d09584c35b 100644 --- a/src/threads_mngr.c +++ b/src/threads_mngr.c @@ -32,12 +32,12 @@ #define UNUSED(V) ((void) V) #ifdef __linux__ -#include "atomicvar.h" #include "server.h" #include #include #include +#include #define IN_PROGRESS 1 static const clock_t RUN_ON_THREADS_TIMEOUT = 2; @@ -46,10 +46,10 @@ static const clock_t RUN_ON_THREADS_TIMEOUT = 2; static run_on_thread_cb g_callback = NULL; static volatile size_t g_tids_len = 0; -static serverAtomic size_t g_num_threads_done = 0; +static _Atomic size_t g_num_threads_done = 0; /* This flag is set while ThreadsManager_runOnThreads is running */ -static serverAtomic int g_in_progress = 0; +static _Atomic int g_in_progress = 0; /*============================ Internal prototypes ========================== */ @@ -112,9 +112,8 @@ int ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb c static int test_and_start(void) { - /* atomicFlagGetSet sets the variable to 1 and returns the previous value */ - int prev_state; - atomicFlagGetSet(g_in_progress, prev_state); + /* atomic_exchange_expicit sets the variable to 1 and returns the previous value */ + int prev_state = atomic_exchange_explicit(&g_in_progress,1,memory_order_relaxed); /* If prev_state is 1, g_in_progress was on. */ return prev_state; @@ -126,7 +125,7 @@ static void invoke_callback(int sig) { run_on_thread_cb callback = g_callback; if (callback) { callback(); - atomicIncr(g_num_threads_done, 1); + atomic_fetch_add_explicit(&g_num_threads_done,1,memory_order_relaxed); } else { serverLogFromHandler(LL_WARNING, "tid %ld: ThreadsManager g_callback is NULL", syscall(SYS_gettid)); } @@ -150,7 +149,7 @@ static void wait_threads(void) { /* Sleep a bit to yield to other threads. */ /* usleep isn't listed as signal safe, so we use select instead */ select(0, NULL, NULL, NULL, &tv); - atomicGet(g_num_threads_done, curr_done_count); + curr_done_count = atomic_load_explicit(&g_num_threads_done,memory_order_relaxed); clock_gettime(CLOCK_REALTIME, &curr_time); } while (curr_done_count < g_tids_len && curr_time.tv_sec <= timeout_time.tv_sec); @@ -167,8 +166,7 @@ static void ThreadsManager_cleanups(void) { g_num_threads_done = 0; /* Lastly, turn off g_in_progress */ - atomicSet(g_in_progress, 0); - + atomic_store_explicit(&g_in_progress,0,memory_order_relaxed); } #else diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index ae515a1579..5ad9db120b 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -41,6 +41,7 @@ #include #include #include +#include #include /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */ #include /* Use hiredis sds. */ @@ -54,7 +55,6 @@ #include "adlist.h" #include "dict.h" #include "zmalloc.h" -#include "atomicvar.h" #include "crc16_slottable.h" #include "hdr_histogram.h" #include "cli_common.h" @@ -85,11 +85,11 @@ static struct config { int tls; struct cliSSLconfig sslconfig; int numclients; - serverAtomic int liveclients; + _Atomic int liveclients; int requests; - serverAtomic int requests_issued; - serverAtomic int requests_finished; - serverAtomic int previous_requests_finished; + _Atomic int requests_issued; + _Atomic int requests_finished; + _Atomic int previous_requests_finished; int last_printed_bytes; long long previous_tick; int keysize; @@ -118,9 +118,9 @@ static struct config { struct serverConfig *redis_config; struct hdr_histogram* latency_histogram; struct hdr_histogram* current_sec_latency_histogram; - serverAtomic int is_fetching_slots; - serverAtomic int is_updating_slots; - serverAtomic int slots_last_update; + _Atomic int is_fetching_slots; + _Atomic int is_updating_slots; + _Atomic int slots_last_update; int enable_tracking; pthread_mutex_t liveclients_mutex; pthread_mutex_t is_updating_slots_mutex; @@ -356,8 +356,7 @@ static void freeClient(client c) { aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); aeDeleteFileEvent(el,c->context->fd,AE_READABLE); if (c->thread_id >= 0) { - int requests_finished = 0; - atomicGet(config.requests_finished, requests_finished); + int requests_finished = atomic_load_explicit(&config.requests_finished,memory_order_relaxed); if (requests_finished >= config.requests) { aeStop(el); } @@ -416,8 +415,7 @@ static void setClusterKeyHashTag(client c) { assert(c->thread_id >= 0); clusterNode *node = c->cluster_node; assert(node); - int is_updating_slots = 0; - atomicGet(config.is_updating_slots, is_updating_slots); + int is_updating_slots = atomic_load_explicit(&config.is_updating_slots,memory_order_relaxed); /* If updateClusterSlotsConfiguration is updating the slots array, * call updateClusterSlotsConfiguration is order to block the thread * since the mutex is locked. When the slots will be updated by the @@ -438,8 +436,7 @@ static void setClusterKeyHashTag(client c) { } static void clientDone(client c) { - int requests_finished = 0; - atomicGet(config.requests_finished, requests_finished); + int requests_finished = atomic_load_explicit(&config.requests_finished,memory_order_relaxed); if (requests_finished >= config.requests) { freeClient(c); if (!config.num_threads && config.el) aeStop(config.el); @@ -540,8 +537,7 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } continue; } - int requests_finished = 0; - atomicGetIncr(config.requests_finished, requests_finished, 1); + int requests_finished = atomic_fetch_add_explicit(&config.requests_finished,1,memory_order_relaxed); if (requests_finished < config.requests){ if (config.num_threads == 0) { hdr_record_value( @@ -580,8 +576,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Initialize request when nothing was written. */ if (c->written == 0) { /* Enforce upper bound to number of requests. */ - int requests_issued = 0; - atomicGetIncr(config.requests_issued, requests_issued, config.pipeline); + int requests_issued = atomic_fetch_add_explicit(&config.requests_issued,config.pipeline,memory_order_relaxed); if (requests_issued >= config.requests) { return; } @@ -589,7 +584,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Really initialize: randomize keys and set start time. */ if (config.randomkeys) randomizeClientKey(c); if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c); - atomicGet(config.slots_last_update, c->slots_last_update); + c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); c->start = ustime(); c->latency = -1; } @@ -825,8 +820,9 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c); listAddNodeTail(config.clients,c); - atomicIncr(config.liveclients, 1); - atomicGet(config.slots_last_update, c->slots_last_update); + atomic_fetch_add_explicit(&config.liveclients, 1, memory_order_relaxed); + + c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); return c; } @@ -1272,15 +1268,17 @@ static int fetchClusterSlotsConfiguration(client c) { UNUSED(c); int success = 1, is_fetching_slots = 0, last_update = 0; size_t i; - atomicGet(config.slots_last_update, last_update); + + last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); if (c->slots_last_update < last_update) { c->slots_last_update = last_update; return -1; } redisReply *reply = NULL; - atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1); + + is_fetching_slots = atomic_fetch_add_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); if (is_fetching_slots) return -1; //TODO: use other codes || errno ? - atomicSet(config.is_fetching_slots, 1); + atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); const char *errmsg = "Failed to update cluster slots configuration"; @@ -1354,14 +1352,15 @@ static int fetchClusterSlotsConfiguration(client c) { freeReplyObject(reply); redisFree(ctx); dictRelease(masters); - atomicSet(config.is_fetching_slots, 0); + atomic_store_explicit(&config.is_fetching_slots, 0, memory_order_relaxed); return success; } /* Atomically update the new slots configuration. */ static void updateClusterSlotsConfiguration(void) { pthread_mutex_lock(&config.is_updating_slots_mutex); - atomicSet(config.is_updating_slots, 1); + atomic_store_explicit(&config.is_updating_slots, 1, memory_order_relaxed); + int i; for (i = 0; i < config.cluster_node_count; i++) { clusterNode *node = config.cluster_nodes[i]; @@ -1374,8 +1373,8 @@ static void updateClusterSlotsConfiguration(void) { zfree(oldslots); } } - atomicSet(config.is_updating_slots, 0); - atomicIncr(config.slots_last_update, 1); + atomic_store_explicit(&config.is_updating_slots, 0, memory_order_relaxed); + atomic_fetch_add_explicit(&config.slots_last_update, 1, memory_order_relaxed); pthread_mutex_unlock(&config.is_updating_slots_mutex); } @@ -1662,13 +1661,10 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData UNUSED(eventLoop); UNUSED(id); benchmarkThread *thread = (benchmarkThread *)clientData; - int liveclients = 0; - int requests_finished = 0; - int previous_requests_finished = 0; + int liveclients = atomic_load_explicit(&config.liveclients, memory_order_relaxed); + int requests_finished = atomic_load_explicit(&config.requests_finished, memory_order_relaxed); + int previous_requests_finished = atomic_load_explicit(&config.previous_requests_finished, memory_order_relaxed); long long current_tick = mstime(); - atomicGet(config.liveclients, liveclients); - atomicGet(config.requests_finished, requests_finished); - atomicGet(config.previous_requests_finished, previous_requests_finished); if (liveclients == 0 && requests_finished != config.requests) { fprintf(stderr,"All clients disconnected... aborting.\n"); @@ -1693,7 +1689,7 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData const float instantaneous_dt = (float)(current_tick-config.previous_tick)/1000.0; const float instantaneous_rps = (float)(requests_finished-previous_requests_finished)/instantaneous_dt; config.previous_tick = current_tick; - atomicSet(config.previous_requests_finished,requests_finished); + atomic_store_explicit(&config.previous_requests_finished, requests_finished, memory_order_relaxed); printf("%*s\r", config.last_printed_bytes, " "); /* ensure there is a clean line */ int printed_bytes = printf("%s: rps=%.1f (overall: %.1f) avg_msec=%.3f (overall: %.3f)\r", config.title, instantaneous_rps, rps, hdr_mean(config.current_sec_latency_histogram)/1000.0f, hdr_mean(config.latency_histogram)/1000.0f); config.last_printed_bytes = printed_bytes;