Skip to content

Commit

Permalink
Merge branch 'unstable' into new-clstrbs-msg
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <[email protected]>
  • Loading branch information
roshkhatri committed Jul 11, 2024
2 parents 125ffb7 + 548b4e0 commit d6e5b31
Show file tree
Hide file tree
Showing 138 changed files with 4,359 additions and 2,294 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/daily.yml
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,10 @@ jobs:
run: sudo apt-get install tcl8.6 tclx
- name: test
if: true && !contains(github.event.inputs.skiptests, 'valkey')
run: ./runtest --config io-threads 4 --config io-threads-do-reads yes --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
run: ./runtest --config io-threads 2 --config events-per-io-thread 0 --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
- name: cluster tests
if: true && !contains(github.event.inputs.skiptests, 'cluster')
run: ./runtest-cluster --config io-threads 4 --config io-threads-do-reads yes ${{github.event.inputs.cluster_test_args}}
run: ./runtest-cluster --config io-threads 2 --config events-per-io-thread 0 ${{github.event.inputs.cluster_test_args}}

test-ubuntu-reclaim-cache:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion deps/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ ifeq ($(uname_S),SunOS)
LUA_CFLAGS= -D__C99FEATURES__=1
endif

LUA_CFLAGS+= -Wall -DLUA_ANSI -DENABLE_CJSON_GLOBAL -DREDIS_STATIC='' -DLUA_USE_MKSTEMP $(CFLAGS)
LUA_CFLAGS+= -Wall -DLUA_ANSI -DENABLE_CJSON_GLOBAL -DLUA_USE_MKSTEMP $(CFLAGS)
LUA_LDFLAGS+= $(LDFLAGS)
ifeq ($(LUA_DEBUG),yes)
LUA_CFLAGS+= -O0 -g -DLUA_USE_APICHECK
Expand Down
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram fpconv
NODEPS:=clean distclean

# Default settings
STD=-pedantic -DSERVER_STATIC=''
STD=-pedantic

# Use -Wno-c11-extensions on clang, either where explicitly used or on
# platforms we can assume it's being used.
Expand Down Expand Up @@ -401,7 +401,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
10 changes: 5 additions & 5 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,11 +506,11 @@ void ACLFreeUserAndKillClients(user *u) {
* more defensive to set the default user and put
* it in non authenticated mode. */
c->user = DefaultUser;
c->flags &= ~CLIENT_AUTHENTICATED;
c->flag.authenticated = 0;
/* We will write replies to this client later, so we can't
* close it directly even if async. */
if (c == server.current_client) {
c->flags |= CLIENT_CLOSE_AFTER_COMMAND;
c->flag.close_after_command = 1;
} else {
freeClientAsync(c);
}
Expand Down Expand Up @@ -1494,13 +1494,13 @@ void addAuthErrReply(client *c, robj *err) {
* The return value is AUTH_OK on success (valid username / password pair) & AUTH_ERR otherwise. */
int checkPasswordBasedAuth(client *c, robj *username, robj *password) {
if (ACLCheckUserCredentials(username, password) == C_OK) {
c->flags |= CLIENT_AUTHENTICATED;
c->flag.authenticated = 1;
c->user = ACLGetUserByName(username->ptr, sdslen(username->ptr));
moduleNotifyUserChanged(c);
return AUTH_OK;
} else {
addACLLogEntry(c, ACL_DENIED_AUTH, (c->flags & CLIENT_MULTI) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL, 0,
username->ptr, NULL);
addACLLogEntry(c, ACL_DENIED_AUTH, (c->flag.multi) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL, 0, username->ptr,
NULL);
return AUTH_ERR;
}
}
Expand Down
17 changes: 14 additions & 3 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
* is removed. */
if (mask & AE_WRITABLE) mask |= AE_BARRIER;

aeApiDelEvent(eventLoop, fd, mask);
/* Only remove attached events */
mask = mask & fe->mask;

fe->mask = fe->mask & (~mask);
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd */
Expand All @@ -193,6 +195,15 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}

/* Check whether there are events to be removed.
* Note: user may remove the AE_BARRIER without
* touching the actual events. */
if (mask & (AE_READABLE | AE_WRITABLE)) {
/* Must be invoked after the eventLoop mask is modified,
* which is required by evport and epoll */
aeApiDelEvent(eventLoop, fd, mask);
}
}

void *aeGetFileClientData(aeEventLoop *eventLoop, int fd) {
Expand Down Expand Up @@ -392,7 +403,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
}

/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop);
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop, numevents);

for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
Expand Down Expand Up @@ -489,6 +500,6 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
eventLoop->beforesleep = beforesleep;
}

void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) {
eventLoop->aftersleep = aftersleep;
}
5 changes: 3 additions & 2 deletions src/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData,
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);

/* File event structure */
typedef struct aeFileEvent {
Expand Down Expand Up @@ -107,7 +108,7 @@ typedef struct aeEventLoop {
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
aeAfterSleepProc *aftersleep;
int flags;
} aeEventLoop;

Expand All @@ -130,7 +131,7 @@ int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
Expand Down
6 changes: 4 additions & 2 deletions src/ae_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
return 0;
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int mask = eventLoop->events[fd].mask & (~delmask);

/* We rely on the fact that our caller has already updated the mask in the eventLoop. */
mask = eventLoop->events[fd].mask;

ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
Expand Down
31 changes: 12 additions & 19 deletions src/ae_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,31 +101,24 @@ static void aeApiFree(aeEventLoop *eventLoop) {

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct kevent ke;
struct kevent evs[2];
int nch = 0;

if (mask & AE_READABLE) {
EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
}
if (mask & AE_WRITABLE) {
EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
}
return 0;
if (mask & AE_READABLE) EV_SET(evs + nch++, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (mask & AE_WRITABLE) EV_SET(evs + nch++, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);

return kevent(state->kqfd, evs, nch, NULL, 0, NULL);
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct kevent ke;
struct kevent evs[2];
int nch = 0;

if (mask & AE_READABLE) {
EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
}
if (mask & AE_WRITABLE) {
EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
}
if (mask & AE_READABLE) EV_SET(evs + nch++, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
if (mask & AE_WRITABLE) EV_SET(evs + nch++, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);

kevent(state->kqfd, evs, nch, NULL, 0, NULL);
}

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
Expand Down
9 changes: 5 additions & 4 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,8 @@ struct client *createAOFClient(void) {
* background processing there is a chance that the
* command execution order will be violated.
*/
c->flags = CLIENT_DENY_BLOCKING;
c->raw_flag = 0;
c->flag.deny_blocking = 1;

/* We set the fake client as a replica waiting for the synchronization
* so that the server will not try to send replies to this client. */
Expand Down Expand Up @@ -1536,7 +1537,7 @@ int loadSingleAppendOnlyFile(char *filename) {

/* Run the command in the context of a fake client */
fakeClient->cmd = fakeClient->lastcmd = cmd;
if (fakeClient->flags & CLIENT_MULTI && fakeClient->cmd->proc != execCommand) {
if (fakeClient->flag.multi && fakeClient->cmd->proc != execCommand) {
/* Note: we don't have to attempt calling evalGetCommandFlags,
* since this is AOF, the checks in processCommand are not made
* anyway.*/
Expand All @@ -1549,7 +1550,7 @@ int loadSingleAppendOnlyFile(char *filename) {
serverAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);

/* The fake client should never get blocked */
serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);
serverAssert(fakeClient->flag.blocked == 0);

/* Clean up. Command code may have changed argv/argc so we use the
* argv/argc of the client instead of the local variables. */
Expand All @@ -1562,7 +1563,7 @@ int loadSingleAppendOnlyFile(char *filename) {
* If the client is in the middle of a MULTI/EXEC, handle it as it was
* a short read, even if technically the protocol is correct: we want
* to remove the unprocessed tail and continue. */
if (fakeClient->flags & CLIENT_MULTI) {
if (fakeClient->flag.multi) {
serverLog(LL_WARNING, "Revert incomplete MULTI/EXEC transaction in AOF file %s", filename);
valid_up_to = valid_before_multi;
goto uxeof;
Expand Down
Loading

0 comments on commit d6e5b31

Please sign in to comment.