Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/unstable' into atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
lipzhu committed Jun 13, 2024
2 parents 0fa87fa + b546dd2 commit 8c18b84
Show file tree
Hide file tree
Showing 89 changed files with 3,956 additions and 3,749 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: make
# build with TLS module just for compilation coverage
run: make SANITIZER=address SERVER_CFLAGS='-Werror -DDEBUG_ASSERTIONS' BUILD_TLS=module
run: make SANITIZER=address SERVER_CFLAGS='-Werror' BUILD_TLS=module
- name: testprep
run: sudo apt-get install tcl8.6 tclx -y
- name: test
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/clang-format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
- name: Check for formatting changes
if: ${{ steps.clang-format.outputs.diff }}
run: |
echo "Code is not formatted correctly. Here is the diff:"
echo "ERROR: Code is not formatted correctly. Here is the diff:"
# Decode the Base64 diff to display it
echo "${{ steps.clang-format.outputs.diff }}" | base64 --decode
exit 1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/daily.yml
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ jobs:
repository: ${{ env.GITHUB_REPOSITORY }}
ref: ${{ env.GITHUB_HEAD_REF }}
- name: make
run: make all-with-unit-tests OPT=-O3 SANITIZER=address SERVER_CFLAGS='-DSERVER_TEST -Werror -DDEBUG_ASSERTIONS'
run: make all-with-unit-tests OPT=-O3 SANITIZER=address SERVER_CFLAGS='-DSERVER_TEST -Werror'
- name: testprep
run: |
sudo apt-get update
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ redis.code-workspace
.cache
.cscope*
.swp
nodes.conf
tests/cluster/tmp/*
4 changes: 4 additions & 0 deletions COPYING
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# License 1

BSD 3-Clause License

Copyright (c) 2024-present, Valkey contributors
All rights reserved.

Expand All @@ -13,6 +15,8 @@ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND

# License 2

BSD 3-Clause License

Copyright (c) 2006-2020, Salvatore Sanfilippo
All rights reserved.

Expand Down
5 changes: 5 additions & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ DEBUG=-g -ggdb
# Linux ARM32 needs -latomic at linking time
ifneq (,$(findstring armv,$(uname_M)))
FINAL_LIBS+=-latomic
else
# Linux POWER needs -latomic at linking time
ifneq (,$(findstring ppc,$(uname_M)))
FINAL_LIBS+=-latomic
endif
endif

ifeq ($(uname_S),SunOS)
Expand Down
40 changes: 15 additions & 25 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ void ACLFreeUserAndKillClients(user *u) {
* more defensive to set the default user and put
* it in non authenticated mode. */
c->user = DefaultUser;
c->authenticated = 0;
c->flags &= ~CLIENT_AUTHENTICATED;
/* We will write replies to this client later, so we can't
* close it directly even if async. */
if (c == server.current_client) {
Expand Down Expand Up @@ -1494,7 +1494,7 @@ void addAuthErrReply(client *c, robj *err) {
* The return value is AUTH_OK on success (valid username / password pair) & AUTH_ERR otherwise. */
int checkPasswordBasedAuth(client *c, robj *username, robj *password) {
if (ACLCheckUserCredentials(username, password) == C_OK) {
c->authenticated = 1;
c->flags |= CLIENT_AUTHENTICATED;
c->user = ACLGetUserByName(username->ptr, sdslen(username->ptr));
moduleNotifyUserChanged(c);
return AUTH_OK;
Expand Down Expand Up @@ -1587,12 +1587,10 @@ static int ACLSelectorCheckKey(aclSelector *selector, const char *key, int keyle
listRewind(selector->patterns, &li);

int key_flags = 0;
/* clang-format off */
if (keyspec_flags & CMD_KEY_ACCESS) key_flags |= ACL_READ_PERMISSION;
if (keyspec_flags & CMD_KEY_INSERT) key_flags |= ACL_WRITE_PERMISSION;
if (keyspec_flags & CMD_KEY_DELETE) key_flags |= ACL_WRITE_PERMISSION;
if (keyspec_flags & CMD_KEY_UPDATE) key_flags |= ACL_WRITE_PERMISSION;
/* clang-format on */

/* Test this key against every pattern. */
while ((ln = listNext(&li))) {
Expand All @@ -1618,12 +1616,10 @@ static int ACLSelectorHasUnrestrictedKeyAccess(aclSelector *selector, int flags)
listRewind(selector->patterns, &li);

int access_flags = 0;
/* clang-format off */
if (flags & CMD_KEY_ACCESS) access_flags |= ACL_READ_PERMISSION;
if (flags & CMD_KEY_INSERT) access_flags |= ACL_WRITE_PERMISSION;
if (flags & CMD_KEY_DELETE) access_flags |= ACL_WRITE_PERMISSION;
if (flags & CMD_KEY_UPDATE) access_flags |= ACL_WRITE_PERMISSION;
/* clang-format on */

/* Test this key against every pattern. */
while ((ln = listNext(&li))) {
Expand Down Expand Up @@ -2669,15 +2665,13 @@ void addACLLogEntry(client *c, int reason, int context, int argpos, sds username
if (object) {
le->object = object;
} else {
/* clang-format off */
switch(reason) {
switch (reason) {
case ACL_DENIED_CMD: le->object = sdsdup(c->cmd->fullname); break;
case ACL_DENIED_KEY: le->object = sdsdup(c->argv[argpos]->ptr); break;
case ACL_DENIED_CHANNEL: le->object = sdsdup(c->argv[argpos]->ptr); break;
case ACL_DENIED_AUTH: le->object = sdsdup(c->argv[0]->ptr); break;
default: le->object = sdsempty();
}
/* clang-format on */
}

/* if we have a real client from the network, use it (could be missing on module timers) */
Expand Down Expand Up @@ -3058,28 +3052,24 @@ void aclCommand(client *c) {

addReplyBulkCString(c, "reason");
char *reasonstr;
/* clang-format off */
switch(le->reason) {
case ACL_DENIED_CMD: reasonstr="command"; break;
case ACL_DENIED_KEY: reasonstr="key"; break;
case ACL_DENIED_CHANNEL: reasonstr="channel"; break;
case ACL_DENIED_AUTH: reasonstr="auth"; break;
default: reasonstr="unknown";
switch (le->reason) {
case ACL_DENIED_CMD: reasonstr = "command"; break;
case ACL_DENIED_KEY: reasonstr = "key"; break;
case ACL_DENIED_CHANNEL: reasonstr = "channel"; break;
case ACL_DENIED_AUTH: reasonstr = "auth"; break;
default: reasonstr = "unknown";
}
/* clang-format on */
addReplyBulkCString(c, reasonstr);

addReplyBulkCString(c, "context");
char *ctxstr;
/* clang-format off */
switch(le->context) {
case ACL_LOG_CTX_TOPLEVEL: ctxstr="toplevel"; break;
case ACL_LOG_CTX_MULTI: ctxstr="multi"; break;
case ACL_LOG_CTX_LUA: ctxstr="lua"; break;
case ACL_LOG_CTX_MODULE: ctxstr="module"; break;
default: ctxstr="unknown";
switch (le->context) {
case ACL_LOG_CTX_TOPLEVEL: ctxstr = "toplevel"; break;
case ACL_LOG_CTX_MULTI: ctxstr = "multi"; break;
case ACL_LOG_CTX_LUA: ctxstr = "lua"; break;
case ACL_LOG_CTX_MODULE: ctxstr = "module"; break;
default: ctxstr = "unknown";
}
/* clang-format on */
addReplyBulkCString(c, ctxstr);

addReplyBulkCString(c, "object");
Expand Down
65 changes: 31 additions & 34 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -904,12 +904,12 @@ int aofFsyncInProgress(void) {
/* Starts a background task that performs fsync() against the specified
* file descriptor (the one of the AOF file) in another thread. */
void aof_background_fsync(int fd) {
bioCreateFsyncJob(fd, server.master_repl_offset, 1);
bioCreateFsyncJob(fd, server.primary_repl_offset, 1);
}

/* Close the fd on the basis of aof_background_fsync. */
void aof_background_fsync_and_close(int fd) {
bioCreateCloseAofJob(fd, server.master_repl_offset, 1);
bioCreateCloseAofJob(fd, server.primary_repl_offset, 1);
}

/* Kills an AOFRW child process if exists */
Expand Down Expand Up @@ -1069,11 +1069,12 @@ void flushAppendOnlyFile(int force) {
} else {
/* All data is fsync'd already: Update fsynced_reploff_pending just in case.
* This is needed to avoid a WAITAOF hang in case a module used RM_Call with the NO_AOF flag,
* in which case master_repl_offset will increase but fsynced_reploff_pending won't be updated
* in which case primary_repl_offset will increase but fsynced_reploff_pending won't be updated
* (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on
* the higher offset (which contains data that was only propagated to replicas, and not to AOF) */
if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO)
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset,
memory_order_relaxed);
return;
}
}
Expand Down Expand Up @@ -1243,7 +1244,7 @@ void flushAppendOnlyFile(int force) {
latencyAddSampleIfNeeded("aof-fsync-always", latency);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
server.aof_last_fsync = server.mstime;
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset, memory_order_relaxed);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.mstime - server.aof_last_fsync >= 1000) {
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd);
Expand Down Expand Up @@ -1355,7 +1356,7 @@ struct client *createAOFClient(void) {
c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */

/*
* The AOF client should never be blocked (unlike master
* The AOF client should never be blocked (unlike primary
* replication connection).
* This is because blocking the AOF client might cause
* deadlock (because potentially no one will unblock it).
Expand All @@ -1365,9 +1366,9 @@ struct client *createAOFClient(void) {
*/
c->flags = CLIENT_DENY_BLOCKING;

/* We set the fake client as a slave waiting for the synchronization
/* We set the fake client as a replica waiting for the synchronization
* so that the server will not try to send replies to this client. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
return c;
}

Expand Down Expand Up @@ -1993,21 +1994,19 @@ int rioWriteStreamPendingEntry(rio *r,
RETRYCOUNT <count> JUSTID FORCE. */
streamID id;
streamDecodeID(rawid, &id);
/* clang-format off */
if (rioWriteBulkCount(r,'*',12) == 0) return 0;
if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
if (rioWriteBulkString(r,"TIME",4) == 0) return 0;
if (rioWriteBulkLongLong(r,nack->delivery_time) == 0) return 0;
if (rioWriteBulkString(r,"RETRYCOUNT",10) == 0) return 0;
if (rioWriteBulkLongLong(r,nack->delivery_count) == 0) return 0;
if (rioWriteBulkString(r,"JUSTID",6) == 0) return 0;
if (rioWriteBulkString(r,"FORCE",5) == 0) return 0;
/* clang-format on */
if (rioWriteBulkCount(r, '*', 12) == 0) return 0;
if (rioWriteBulkString(r, "XCLAIM", 6) == 0) return 0;
if (rioWriteBulkObject(r, key) == 0) return 0;
if (rioWriteBulkString(r, groupname, groupname_len) == 0) return 0;
if (rioWriteBulkString(r, consumer->name, sdslen(consumer->name)) == 0) return 0;
if (rioWriteBulkString(r, "0", 1) == 0) return 0;
if (rioWriteBulkStreamID(r, &id) == 0) return 0;
if (rioWriteBulkString(r, "TIME", 4) == 0) return 0;
if (rioWriteBulkLongLong(r, nack->delivery_time) == 0) return 0;
if (rioWriteBulkString(r, "RETRYCOUNT", 10) == 0) return 0;
if (rioWriteBulkLongLong(r, nack->delivery_count) == 0) return 0;
if (rioWriteBulkString(r, "JUSTID", 6) == 0) return 0;
if (rioWriteBulkString(r, "FORCE", 5) == 0) return 0;
return 1;
}

Expand All @@ -2020,14 +2019,12 @@ int rioWriteStreamEmptyConsumer(rio *r,
size_t groupname_len,
streamConsumer *consumer) {
/* XGROUP CREATECONSUMER <key> <group> <consumer> */
/* clang-format off */
if (rioWriteBulkCount(r,'*',5) == 0) return 0;
if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0;
if (rioWriteBulkString(r,"CREATECONSUMER",14) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
/* clang-format on */
if (rioWriteBulkCount(r, '*', 5) == 0) return 0;
if (rioWriteBulkString(r, "XGROUP", 6) == 0) return 0;
if (rioWriteBulkString(r, "CREATECONSUMER", 14) == 0) return 0;
if (rioWriteBulkObject(r, key) == 0) return 0;
if (rioWriteBulkString(r, groupname, groupname_len) == 0) return 0;
if (rioWriteBulkString(r, consumer->name, sdslen(consumer->name)) == 0) return 0;
return 1;
}

Expand Down Expand Up @@ -2320,7 +2317,7 @@ int rewriteAppendOnlyFile(char *filename) {

if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(SLAVE_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
errno = error;
goto werr;
}
Expand Down Expand Up @@ -2403,12 +2400,12 @@ int rewriteAppendOnlyFileBackground(void) {
* between updates to `fsynced_reploff_pending` of the worker thread, belonging
* to the previous AOF, and the new one. This concern is specific for a full
* sync scenario where we don't wanna risk the ACKed replication offset
* jumping backwards or forward when switching to a different master. */
* jumping backwards or forward when switching to a different primary. */
bioDrainWorker(BIO_AOF_FSYNC);

/* Set the initial repl_offset, which will be applied to fsynced_reploff
* when AOFRW finishes (after possibly being updated by a bio thread) */
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset, memory_order_relaxed);
server.fsynced_reploff = 0;
}

Expand Down
6 changes: 3 additions & 3 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void initClientBlockingState(client *c) {
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
/* Master client should never be blocked unless pause or module */
serverAssert(!(c->flags & CLIENT_MASTER && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
serverAssert(!(c->flags & CLIENT_PRIMARY && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));

c->flags |= CLIENT_BLOCKED;
c->bstate.btype = btype;
Expand Down Expand Up @@ -265,8 +265,8 @@ void replyToClientsBlockedOnShutdown(void) {

/* Mass-unblock clients because something changed in the instance that makes
* blocking no longer safe. For example clients blocked in list operations
* in an instance which turns from master to slave is unsafe, so this function
* is called when a master turns into a slave.
* in an instance which turns from master to replica is unsafe, so this function
* is called when a master turns into a replica.
*
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */
Expand Down
Loading

0 comments on commit 8c18b84

Please sign in to comment.