Skip to content

Commit

Permalink
replace atomicvar header file
Browse files Browse the repository at this point in the history
  • Loading branch information
teej4y committed May 14, 2024
1 parent 7614c66 commit dd2b27e
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 82 deletions.
16 changes: 7 additions & 9 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ void stopAppendOnly(void) {
server.aof_last_incr_size = 0;
server.aof_last_incr_fsync_offset = 0;
server.fsynced_reploff = -1;
atomicSet(server.fsynced_reploff_pending, 0);
atomic_store_explicit(&server.fsynced_reploff_pending, 0, memory_order_relaxed);
killAppendOnlyChild();
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
Expand Down Expand Up @@ -1000,12 +1000,11 @@ int startAppendOnly(void) {
}
server.aof_last_fsync = server.mstime;
/* If AOF fsync error in bio job, we just ignore it and log the event. */
int aof_bio_fsync_status;
atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status);
int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed);
if (aof_bio_fsync_status == C_ERR) {
serverLog(LL_WARNING,
"AOF reopen, just ignore the AOF fsync error in bio job");
atomicSet(server.aof_bio_fsync_status,C_OK);
atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed);
}

/* If AOF was in error state, we just ignore it and log the event. */
Expand Down Expand Up @@ -1093,7 +1092,7 @@ void flushAppendOnlyFile(int force) {
* (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on
* the higher offset (which contains data that was only propagated to replicas, and not to AOF) */
if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO)
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
return;
}
}
Expand Down Expand Up @@ -1261,7 +1260,7 @@ void flushAppendOnlyFile(int force) {
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
server.aof_last_fsync = server.mstime;
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.mstime - server.aof_last_fsync >= 1000) {
if (!sync_in_progress) {
Expand Down Expand Up @@ -2463,7 +2462,7 @@ int rewriteAppendOnlyFileBackground(void) {

/* Set the initial repl_offset, which will be applied to fsynced_reploff
* when AOFRW finishes (after possibly being updated by a bio thread) */
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
server.fsynced_reploff = 0;
}

Expand Down Expand Up @@ -2715,8 +2714,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
/* Update the fsynced replication offset that just now become valid.
* This could either be the one we took in startAppendOnly, or a
* newer one set by the bio thread. */
long long fsynced_reploff_pending;
atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending);
long long fsynced_reploff_pending = atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed);
server.fsynced_reploff = fsynced_reploff_pending;
}

Expand Down
12 changes: 6 additions & 6 deletions src/bio.c
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,17 @@ void *bioProcessBackgroundJobs(void *arg) {
if (valkey_fsync(job->fd_args.fd) == -1 &&
errno != EBADF && errno != EINVAL)
{
int last_status;
atomicGet(server.aof_bio_fsync_status,last_status);
atomicSet(server.aof_bio_fsync_status,C_ERR);
atomicSet(server.aof_bio_fsync_errno,errno);
int last_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed);

atomic_store_explicit(&server.aof_bio_fsync_status, C_ERR, memory_order_relaxed);
atomic_store_explicit(&server.aof_bio_fsync_errno, errno, memory_order_relaxed);
if (last_status == C_OK) {
serverLog(LL_WARNING,
"Fail to fsync the AOF file: %s",strerror(errno));
}
} else {
atomicSet(server.aof_bio_fsync_status,C_OK);
atomicSet(server.fsynced_reploff_pending, job->fd_args.offset);
atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, job->fd_args.offset, memory_order_relaxed);
}

if (job->fd_args.need_reclaim_cache) {
Expand Down
1 change: 0 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

#include "server.h"
#include "cluster.h"
#include "atomicvar.h"
#include "latency.h"
#include "script.h"
#include "functions.h"
Expand Down
1 change: 0 additions & 1 deletion src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "script.h"
#include <math.h>

Expand Down
1 change: 0 additions & 1 deletion src/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "sds.h"
#include "dict.h"
#include "adlist.h"
#include "atomicvar.h"

#define LOAD_TIMEOUT_MS 500

Expand Down
1 change: 0 additions & 1 deletion src/lazyfree.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "functions.h"
#include "cluster.h"

Expand Down
3 changes: 1 addition & 2 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -2433,8 +2433,7 @@ void VM_Yield(ValkeyModuleCtx *ctx, int flags, const char *busy_reply) {
* after the main thread enters acquiring GIL state in order to protect the event
* loop (ae.c) and avoid potential race conditions. */

int acquiring;
atomicGet(server.module_gil_acquiring, acquiring);
int acquiring = atomic_load_explicit(&server.module_gil_acquiring, memory_order_relaxed);
if (!acquiring) {
/* If the main thread has not yet entered the acquiring GIL state,
* we attempt to wake it up and exit without waiting for it to
Expand Down
26 changes: 12 additions & 14 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
*/

#include "server.h"
#include "atomicvar.h"
#include "cluster.h"
#include "script.h"
#include "fpconv_dtoa.h"
Expand All @@ -37,6 +36,7 @@
#include <sys/uio.h>
#include <math.h>
#include <ctype.h>
#include <stdatomic.h>

static void setProtocolError(const char *errstr, client *c);
static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
Expand Down Expand Up @@ -132,8 +132,7 @@ client *createClient(connection *conn) {
}
c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size);
selectDb(c,0);
uint64_t client_id;
atomicGetIncr(server.next_client_id, client_id, 1);
uint64_t client_id = atomic_fetch_add_explicit(&server.next_client_id,1,memory_order_relaxed);
c->id = client_id;
#ifdef LOG_REQ_RES
reqresReset(c, 0);
Expand Down Expand Up @@ -1963,7 +1962,7 @@ int _writeToClient(client *c, ssize_t *nwritten) {
* thread safe. */
int writeToClient(client *c, int handler_installed) {
/* Update total number of writes on server */
atomicIncr(server.stat_total_writes_processed, 1);
atomic_fetch_add_explicit(&server.stat_total_writes_processed,1, memory_order_relaxed);

ssize_t nwritten = 0, totwritten = 0;

Expand All @@ -1990,9 +1989,9 @@ int writeToClient(client *c, int handler_installed) {
}

if (getClientType(c) == CLIENT_TYPE_SLAVE) {
atomicIncr(server.stat_net_repl_output_bytes, totwritten);
atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, totwritten, memory_order_relaxed);
} else {
atomicIncr(server.stat_net_output_bytes, totwritten);
atomic_fetch_add_explicit(&server.stat_net_output_bytes, totwritten, memory_order_relaxed);
}
c->net_output_bytes += totwritten;

Expand Down Expand Up @@ -2649,7 +2648,7 @@ void readQueryFromClient(connection *conn) {
if (postponeClientRead(c)) return;

/* Update total number of reads on server */
atomicIncr(server.stat_total_reads_processed, 1);
atomic_fetch_add_explicit(&server.stat_total_reads_processed,1,memory_order_relaxed);

readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
Expand Down Expand Up @@ -2718,9 +2717,9 @@ void readQueryFromClient(connection *conn) {
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) {
c->read_reploff += nread;
atomicIncr(server.stat_net_repl_input_bytes, nread);
atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread,memory_order_relaxed);
} else {
atomicIncr(server.stat_net_input_bytes, nread);
atomic_fetch_add_explicit(&server.stat_net_input_bytes,nread,memory_order_relaxed);
}
c->net_input_bytes += nread;

Expand All @@ -2738,7 +2737,7 @@ void readQueryFromClient(connection *conn) {
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
atomicIncr(server.stat_client_qbuf_limit_disconnections, 1);
atomic_fetch_add_explicit(&server.stat_client_qbuf_limit_disconnections,1,memory_order_relaxed);
goto done;
}

Expand Down Expand Up @@ -4210,7 +4209,7 @@ void processEventsWhileBlocked(void) {
#endif

typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) threads_pending {
serverAtomic unsigned long value;
_Atomic unsigned long value;
} threads_pending;

pthread_t io_threads[IO_THREADS_MAX_NUM];
Expand All @@ -4224,13 +4223,12 @@ int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_
list *io_threads_list[IO_THREADS_MAX_NUM];

static inline unsigned long getIOPendingCount(int i) {
unsigned long count = 0;
atomicGetWithSync(io_threads_pending[i].value, count);
unsigned long count = atomic_load(&io_threads_pending[i].value);
return count;
}

static inline void setIOPendingCount(int i, unsigned long count) {
atomicSetWithSync(io_threads_pending[i].value, count);
atomic_store(&io_threads_pending[i].value, count);
}

void *IOThreadMain(void *myid) {
Expand Down
1 change: 0 additions & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "slowlog.h"
#include "bio.h"
#include "latency.h"
#include "atomicvar.h"
#include "mt19937-64.h"
#include "functions.h"
#include "hdr_histogram.h"
Expand Down
1 change: 0 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "config.h"
#include "solarisfixes.h"
#include "rio.h"
#include "atomicvar.h"
#include "commands.h"

#include <stdio.h>
Expand Down
18 changes: 8 additions & 10 deletions src/threads_mngr.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
#define UNUSED(V) ((void) V)

#ifdef __linux__
#include "atomicvar.h"
#include "server.h"

#include <signal.h>
#include <time.h>
#include <sys/syscall.h>
#include <stdatomic.h>

#define IN_PROGRESS 1
static const clock_t RUN_ON_THREADS_TIMEOUT = 2;
Expand All @@ -46,10 +46,10 @@ static const clock_t RUN_ON_THREADS_TIMEOUT = 2;

static run_on_thread_cb g_callback = NULL;
static volatile size_t g_tids_len = 0;
static serverAtomic size_t g_num_threads_done = 0;
static _Atomic size_t g_num_threads_done = 0;

/* This flag is set while ThreadsManager_runOnThreads is running */
static serverAtomic int g_in_progress = 0;
static _Atomic int g_in_progress = 0;

/*============================ Internal prototypes ========================== */

Expand Down Expand Up @@ -112,9 +112,8 @@ int ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb c


static int test_and_start(void) {
/* atomicFlagGetSet sets the variable to 1 and returns the previous value */
int prev_state;
atomicFlagGetSet(g_in_progress, prev_state);
/* atomic_exchange_expicit sets the variable to 1 and returns the previous value */
int prev_state = atomic_exchange_explicit(&g_in_progress,1,memory_order_relaxed);

/* If prev_state is 1, g_in_progress was on. */
return prev_state;
Expand All @@ -126,7 +125,7 @@ static void invoke_callback(int sig) {
run_on_thread_cb callback = g_callback;
if (callback) {
callback();
atomicIncr(g_num_threads_done, 1);
atomic_fetch_add_explicit(&g_num_threads_done,1,memory_order_relaxed);
} else {
serverLogFromHandler(LL_WARNING, "tid %ld: ThreadsManager g_callback is NULL", syscall(SYS_gettid));
}
Expand All @@ -150,7 +149,7 @@ static void wait_threads(void) {
/* Sleep a bit to yield to other threads. */
/* usleep isn't listed as signal safe, so we use select instead */
select(0, NULL, NULL, NULL, &tv);
atomicGet(g_num_threads_done, curr_done_count);
curr_done_count = atomic_load_explicit(&g_num_threads_done,memory_order_relaxed);
clock_gettime(CLOCK_REALTIME, &curr_time);
} while (curr_done_count < g_tids_len &&
curr_time.tv_sec <= timeout_time.tv_sec);
Expand All @@ -167,8 +166,7 @@ static void ThreadsManager_cleanups(void) {
g_num_threads_done = 0;

/* Lastly, turn off g_in_progress */
atomicSet(g_in_progress, 0);

atomic_store_explicit(&g_in_progress,0,memory_order_relaxed);
}
#else

Expand Down
Loading

0 comments on commit dd2b27e

Please sign in to comment.