Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/unstable' into migrate_cluster…
Browse files Browse the repository at this point in the history
…test

Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed May 8, 2024
2 parents 8906faa + 4e944ce commit a9374f8
Show file tree
Hide file tree
Showing 37 changed files with 1,851 additions and 850 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/daily.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ jobs:
run: |
apt-get update && apt-get install -y make gcc-13
update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-13 100
make CC=gcc SERVER_CFLAGS='-Werror -DSERVER_TEST -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3'
make all-with-unit-tests CC=gcc OPT=-O3 SERVER_CFLAGS='-Werror -DSERVER_TEST -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3'
- name: testprep
run: apt-get install -y tcl8.6 tclx procps
- name: test
Expand All @@ -121,7 +121,7 @@ jobs:
run: ./src/valkey-server test all --accurate
- name: new unit tests
if: true && !contains(github.event.inputs.skiptests, 'unittest')
run: make valkey-unit-tests && ./src/valkey-unit-tests --accurate
run: ./src/valkey-unit-tests --accurate

test-ubuntu-libc-malloc:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -598,7 +598,7 @@ jobs:
repository: ${{ env.GITHUB_REPOSITORY }}
ref: ${{ env.GITHUB_HEAD_REF }}
- name: make
run: make all-with-unit-tests SANITIZER=address SERVER_CFLAGS='-DSERVER_TEST -Werror -DDEBUG_ASSERTIONS'
run: make all-with-unit-tests OPT=-O3 SANITIZER=address SERVER_CFLAGS='-DSERVER_TEST -Werror -DDEBUG_ASSERTIONS'
- name: testprep
# Work around ASAN issue, see https://github.com/google/sanitizers/issues/1716
run: |
Expand Down Expand Up @@ -650,7 +650,7 @@ jobs:
repository: ${{ env.GITHUB_REPOSITORY }}
ref: ${{ env.GITHUB_HEAD_REF }}
- name: make
run: make all-with-unit-tests SANITIZER=undefined SERVER_CFLAGS='-DSERVER_TEST -Werror' LUA_DEBUG=yes # we (ab)use this flow to also check Lua C API violations
run: make all-with-unit-tests OPT=-O3 SANITIZER=undefined SERVER_CFLAGS='-DSERVER_TEST -Werror' LUA_DEBUG=yes # we (ab)use this flow to also check Lua C API violations
- name: testprep
run: |
sudo apt-get update
Expand Down
2 changes: 1 addition & 1 deletion src/anet.c
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ int anetUnixServer(char *err, char *path, mode_t perm, int backlog)
}

int type = SOCK_STREAM;
int flags = ANET_SOCKET_CLOEXEC | ANET_SOCKET_NONBLOCK | ANET_SOCKET_REUSEADDR;
int flags = ANET_SOCKET_CLOEXEC | ANET_SOCKET_NONBLOCK;
if ((s = anetCreateSocket(err,AF_LOCAL,type,0,flags)) == ANET_ERR)
return ANET_ERR;

Expand Down
36 changes: 24 additions & 12 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_err
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
c->lastcmd->calls++;
c->commands_processed++;
server.stat_numcommands++;
if (had_errors)
c->lastcmd->failed_calls++;
Expand Down Expand Up @@ -186,7 +187,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF ||
c->bstate.btype == BLOCKED_WAIT_PREREPL) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
Expand All @@ -202,7 +204,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN &&
c->bstate.btype != BLOCKED_WAIT_PREREPL) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
Expand Down Expand Up @@ -240,6 +243,8 @@ void replyToBlockedClientTimedOut(client *c) {
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down Expand Up @@ -597,23 +602,30 @@ static void handleClientsBlockedOnKey(readyList *rl) {
}
}

/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
/* block a client for replica acknowledgement */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAIT);
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks, c);
blockClient(c, btype);
}

/* block a client due to pre-replication */
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0);
c->flags |= CLIENT_PENDING_COMMAND;
}

/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0);
}

/* block a client due to waitaof command */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAITAOF);
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal);
}

/* Postpone client from executing a command. For example the server might be busy
Expand Down
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);

/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
Expand Down
Loading

0 comments on commit a9374f8

Please sign in to comment.