Skip to content

Commit

Permalink
Cherrypick bug fixes into 7.2 branch (#624)
Browse files Browse the repository at this point in the history
Integrate critical fixes for Valkey 7.2

Includes commits:

-
b3aaa0a
-
1f00c95
-
492021d
-
5fdaa53
-
763827c
-
da727ad

Fixes #561

---------

Signed-off-by: Jacob Murphy <[email protected]>
Co-authored-by: bentotten <[email protected]>
Co-authored-by: debing.sun <[email protected]>
Co-authored-by: Matthew Douglass <[email protected]>
Co-authored-by: Binbin <[email protected]>
Co-authored-by: Oran Agra <[email protected]>
Co-authored-by: LiiNen <[email protected]>
Co-authored-by: Yanqi Lv <[email protected]>
  • Loading branch information
8 people authored Jun 13, 2024
1 parent 8c2a76f commit 8b7db30
Show file tree
Hide file tree
Showing 22 changed files with 401 additions and 46 deletions.
4 changes: 3 additions & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ aofInfo *aofInfoDup(aofInfo *orig) {
return ai;
}

/* Format aofInfo as a string and it will be a line in the manifest. */
/* Format aofInfo as a string and it will be a line in the manifest.
*
* When update this format, make sure to update valkey-check-aof as well. */
sds aofInfoFormat(sds buf, aofInfo *ai) {
sds filename_repr = NULL;

Expand Down
8 changes: 6 additions & 2 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,12 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;

c->bstate.timeout = timeout;
if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) {
/* If the client is re-processing the command, we do not set the timeout
* because we need to retain the client's original timeout. */
c->bstate.timeout = timeout;
}

for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dictionary ignore it. */
if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) {
Expand All @@ -392,7 +397,6 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
listAddNodeTail(l,c);
dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l));


/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
* wants to be awakened if key is deleted (like XREADGROUP) */
if (unblock_on_nokey) {
Expand Down
9 changes: 6 additions & 3 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ void clusterInit(void) {
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;
server.cluster->state = CLUSTER_FAIL;
server.cluster->size = 1;
server.cluster->size = 0;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType);
server.cluster->shards = dictCreate(&clusterSdsToListType);
Expand Down Expand Up @@ -4781,10 +4781,13 @@ void clusterCron(void) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= CLUSTER_NODE_PFAIL;
update_state = 1;
if (nodeIsMaster(myself) && server.cluster->size == 1) {
markNodeAsFailingIfNeeded(node);
} else {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing", node->name);
}
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -7707,15 +7707,15 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx, ValkeyModuleC
bc->background_timer = 0;
bc->background_duration = 0;

c->bstate.timeout = 0;
mstime_t timeout = 0;
if (timeout_ms) {
mstime_t now = mstime();
if (timeout_ms > LLONG_MAX - now) {
if (timeout_ms > LLONG_MAX - now) {
c->bstate.module_blocked_handle = NULL;
addReplyError(c, "timeout is out of range"); /* 'timeout_ms+now' would overflow */
return bc;
}
c->bstate.timeout = timeout_ms + now;
timeout = timeout_ms + now;
}

if (islua || ismulti) {
Expand All @@ -7731,8 +7731,9 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx, ValkeyModuleC
addReplyError(c, "Clients undergoing module based authentication can only be blocked on auth");
} else {
if (keys) {
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,c->bstate.timeout,flags&VALKEYMODULE_BLOCK_UNBLOCK_DELETED);
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&VALKEYMODULE_BLOCK_UNBLOCK_DELETED);
} else {
c->bstate.timeout = timeout;
blockClient(c,BLOCKED_MODULE);
}
}
Expand Down
82 changes: 64 additions & 18 deletions src/quicklist.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name)
quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node);
void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm);

quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset, int after);
quicklistNode *_quicklistMergeNodes(quicklist *quicklist, quicklistNode *center);

/* Simple way to give quicklistEntry structs default values with one call. */
#define initEntry(e) \
do { \
Expand Down Expand Up @@ -378,6 +381,15 @@ REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
quicklistCompressNode(reverse);
}

/* This macro is used to compress a node.
*
* If the 'recompress' flag of the node is true, we compress it directly without
* checking whether it is within the range of compress depth.
* However, it's important to ensure that the 'recompress' flag of head and tail
* is always false, as we always assume that head and tail are not compressed.
*
* If the 'recompress' flag of the node is false, we check whether the node is
* within the range of compress depth before compressing it. */
#define quicklistCompress(_ql, _node) \
do { \
if ((_node)->recompress) \
Expand Down Expand Up @@ -529,19 +541,25 @@ REDIS_STATIC int _quicklistNodeAllowMerge(const quicklistNode *a,
(node)->sz = lpBytes((node)->entry); \
} while (0)

static quicklistNode* __quicklistCreatePlainNode(void *value, size_t sz) {
static quicklistNode* __quicklistCreateNode(int container, void *value, size_t sz) {
quicklistNode *new_node = quicklistCreateNode();
new_node->entry = zmalloc(sz);
new_node->container = QUICKLIST_NODE_CONTAINER_PLAIN;
memcpy(new_node->entry, value, sz);
new_node->container = container;
if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
new_node->entry = zmalloc(sz);
memcpy(new_node->entry, value, sz);
} else {
new_node->entry = lpPrepend(lpNew(0), value, sz);
}
new_node->sz = sz;
new_node->count++;
return new_node;
}

static void __quicklistInsertPlainNode(quicklist *quicklist, quicklistNode *old_node,
void *value, size_t sz, int after) {
__quicklistInsertNode(quicklist, old_node, __quicklistCreatePlainNode(value, sz), after);
void *value, size_t sz, int after)
{
quicklistNode *new_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz);
__quicklistInsertNode(quicklist, old_node, new_node, after);
quicklist->count++;
}

Expand Down Expand Up @@ -741,9 +759,13 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
void *data, size_t sz)
{
quicklist* quicklist = iter->quicklist;
quicklistNode *node = entry->node;
unsigned char *newentry;

if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz))) {
entry->node->entry = lpReplace(entry->node->entry, &entry->zi, data, sz);
if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz) &&
(newentry = lpReplace(entry->node->entry, &entry->zi, data, sz)) != NULL))
{
entry->node->entry = newentry;
quicklistNodeUpdateSz(entry->node);
/* quicklistNext() and quicklistGetIteratorEntryAtIdx() provide an uncompressed node */
quicklistCompress(quicklist, entry->node);
Expand All @@ -758,17 +780,37 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
quicklistInsertAfter(iter, entry, data, sz);
__quicklistDelNode(quicklist, entry->node);
}
} else {
entry->node->dont_compress = 1; /* Prevent compression in quicklistInsertAfter() */
quicklistInsertAfter(iter, entry, data, sz);
} else { /* The node is full or data is a large element */
quicklistNode *split_node = NULL, *new_node;
node->dont_compress = 1; /* Prevent compression in __quicklistInsertNode() */

/* If the entry is not at the tail, split the node at the entry's offset. */
if (entry->offset != node->count - 1 && entry->offset != -1)
split_node = _quicklistSplitNode(node, entry->offset, 1);

/* Create a new node and insert it after the original node.
* If the original node was split, insert the split node after the new node. */
new_node = __quicklistCreateNode(isLargeElement(sz) ?
QUICKLIST_NODE_CONTAINER_PLAIN : QUICKLIST_NODE_CONTAINER_PACKED, data, sz);
__quicklistInsertNode(quicklist, node, new_node, 1);
if (split_node) __quicklistInsertNode(quicklist, new_node, split_node, 1);
quicklist->count++;

/* Delete the replaced element. */
if (entry->node->count == 1) {
__quicklistDelNode(quicklist, entry->node);
} else {
unsigned char *p = lpSeek(entry->node->entry, -1);
quicklistDelIndex(quicklist, entry->node, &p);
entry->node->dont_compress = 0; /* Re-enable compression */
quicklistCompress(quicklist, entry->node);
quicklistCompress(quicklist, entry->node->next);
new_node = _quicklistMergeNodes(quicklist, new_node);
/* We can't know if the current node and its sibling nodes are correctly compressed,
* and we don't know if they are within the range of compress depth, so we need to
* use quicklistCompress() for compression, which checks if node is within compress
* depth before compressing. */
quicklistCompress(quicklist, new_node);
quicklistCompress(quicklist, new_node->prev);
if (new_node->next) quicklistCompress(quicklist, new_node->next);
}
}

Expand Down Expand Up @@ -826,6 +868,8 @@ REDIS_STATIC quicklistNode *_quicklistListpackMerge(quicklist *quicklist,
}
keep->count = lpLength(keep->entry);
quicklistNodeUpdateSz(keep);
keep->recompress = 0; /* Prevent 'keep' from being recompressed if
* it becomes head or tail after merging. */

nokeep->count = 0;
__quicklistDelNode(quicklist, nokeep);
Expand All @@ -844,9 +888,10 @@ REDIS_STATIC quicklistNode *_quicklistListpackMerge(quicklist *quicklist,
* - (center->next, center->next->next)
* - (center->prev, center)
* - (center, center->next)
*
* Returns the new 'center' after merging.
*/
REDIS_STATIC void _quicklistMergeNodes(quicklist *quicklist,
quicklistNode *center) {
REDIS_STATIC quicklistNode *_quicklistMergeNodes(quicklist *quicklist, quicklistNode *center) {
int fill = quicklist->fill;
quicklistNode *prev, *prev_prev, *next, *next_next, *target;
prev = prev_prev = next = next_next = target = NULL;
Expand Down Expand Up @@ -886,8 +931,9 @@ REDIS_STATIC void _quicklistMergeNodes(quicklist *quicklist,

/* Use result of center merge (or original) to merge with next node. */
if (_quicklistNodeAllowMerge(target, target->next, fill)) {
_quicklistListpackMerge(quicklist, target, target->next);
target = _quicklistListpackMerge(quicklist, target, target->next);
}
return target;
}

/* Split 'node' into two parts, parameterized by 'offset' and 'after'.
Expand Down Expand Up @@ -1002,7 +1048,7 @@ REDIS_STATIC void _quicklistInsert(quicklistIter *iter, quicklistEntry *entry,
} else {
quicklistDecompressNodeForUse(node);
new_node = _quicklistSplitNode(node, entry->offset, after);
quicklistNode *entry_node = __quicklistCreatePlainNode(value, sz);
quicklistNode *entry_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz);
__quicklistInsertNode(quicklist, node, entry_node, after);
__quicklistInsertNode(quicklist, entry_node, new_node, after);
quicklist->count++;
Expand Down Expand Up @@ -3224,7 +3270,7 @@ int quicklistTest(int argc, char *argv[], int flags) {
memcpy(s, "helloworld", 10);
memcpy(s + sz - 10, "1234567890", 10);

quicklistNode *node = __quicklistCreatePlainNode(s, sz);
quicklistNode *node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, s, sz);

/* Just to avoid triggering the assertion in __quicklistCompressNode(),
* it disables the passing of quicklist head or tail node. */
Expand Down
5 changes: 1 addition & 4 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@
#define RDB_TYPE_MODULE_PRE_GA 6 /* Used in 4.0 release candidates */
#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without
the generating module being loaded. */
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */

/* Object types for encoded objects. */
#define RDB_TYPE_HASH_ZIPMAP 9
#define RDB_TYPE_LIST_ZIPLIST 10
#define RDB_TYPE_SET_INTSET 11
Expand All @@ -97,7 +94,7 @@
#define RDB_TYPE_STREAM_LISTPACKS_2 19
#define RDB_TYPE_SET_LISTPACK 20
#define RDB_TYPE_STREAM_LISTPACKS_3 21
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType(), and rdb_type_string[] */

/* Test if a type is an object type. */
#define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 21))
Expand Down
13 changes: 11 additions & 2 deletions src/script_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,17 @@ static robj **luaArgsToRedisArgv(lua_State *lua, int *argc, int *argv_len) {
/* We can't use lua_tolstring() for number -> string conversion
* since Lua uses a format specifier that loses precision. */
lua_Number num = lua_tonumber(lua,j+1);
obj_len = fpconv_dtoa((double)num, dbuf);
dbuf[obj_len] = '\0';
/* Integer printing function is much faster, check if we can safely use it.
* Since lua_Number is not explicitly an integer or a double, we need to make an effort
* to convert it as an integer when that's possible, since the string could later be used
* in a context that doesn't support scientific notation (e.g. 1e9 instead of 100000000). */
long long lvalue;
if (double2ll((double)num, &lvalue))
obj_len = ll2string(dbuf, sizeof(dbuf), lvalue);
else {
obj_len = fpconv_dtoa((double)num, dbuf);
dbuf[obj_len] = '\0';
}
obj_s = dbuf;
} else {
obj_s = (char*)lua_tolstring(lua,j+1,&obj_len);
Expand Down
10 changes: 9 additions & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3512,12 +3512,20 @@ void call(client *c, int flags) {
* re-processing and unblock the client.*/
c->flags |= CLIENT_EXECUTING_COMMAND;

/* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual
* processing of the command proc, the client is aware that it is being
* re-processed. */
if (reprocessing_command) c->flags |= CLIENT_REPROCESSING_COMMAND;

monotime monotonic_start = 0;
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
monotonic_start = getMonotonicUs();

c->cmd->proc(c);

/* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */
if (reprocessing_command) c->flags &= ~CLIENT_REPROCESSING_COMMAND;

exitExecutionUnit();

/* In case client is blocked after trying to execute the command,
Expand Down Expand Up @@ -3575,7 +3583,7 @@ void call(client *c, int flags) {

/* Send the command to clients in MONITOR mode if applicable,
* since some administrative commands are considered too dangerous to be shown.
* Other exceptions is a client which is unblocked and retring to process the command
* Other exceptions is a client which is unblocked and retrying to process the command
* or we are currently in the process of loading AOF. */
if (update_command_stats && !reprocessing_command &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) {
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
auth had been authenticated from the Module. */
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down
5 changes: 3 additions & 2 deletions src/t_zset.c
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,8 @@ unsigned long zsetLength(const robj *zobj) {
* and the value len hint indicates the approximate individual size of the added elements,
* they are used to determine the initial representation.
*
* If the hints are not known, and underestimation or 0 is suitable. */
* If the hints are not known, and underestimation or 0 is suitable.
* We should never pass a negative value because it will convert to a very large unsigned number. */
robj *zsetTypeCreate(size_t size_hint, size_t val_len_hint) {
if (size_hint <= server.zset_max_listpack_entries &&
val_len_hint <= server.zset_max_listpack_value)
Expand Down Expand Up @@ -3001,7 +3002,7 @@ static void zrangeResultFinalizeClient(zrange_result_handler *handler,
/* Result handler methods for storing the ZRANGESTORE to a zset. */
static void zrangeResultBeginStore(zrange_result_handler *handler, long length)
{
handler->dstobj = zsetTypeCreate(length, 0);
handler->dstobj = zsetTypeCreate(length >= 0 ? length : 0, 0);
}

static void zrangeResultEmitCBufferForStore(zrange_result_handler *handler,
Expand Down
10 changes: 9 additions & 1 deletion src/valkey-check-aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ int checkSingleAof(char *aof_filename, char *aof_filepath, int last_file, int fi
struct redis_stat sb;
if (redis_fstat(fileno(fp),&sb) == -1) {
printf("Cannot stat file: %s, aborting...\n", aof_filename);
fclose(fp);
exit(1);
}

Expand Down Expand Up @@ -343,6 +344,7 @@ int fileIsRDB(char *filepath) {
struct redis_stat sb;
if (redis_fstat(fileno(fp), &sb) == -1) {
printf("Cannot stat file: %s\n", filepath);
fclose(fp);
exit(1);
}

Expand Down Expand Up @@ -379,6 +381,7 @@ int fileIsManifest(char *filepath) {
struct redis_stat sb;
if (redis_fstat(fileno(fp), &sb) == -1) {
printf("Cannot stat file: %s\n", filepath);
fclose(fp);
exit(1);
}

Expand All @@ -395,15 +398,20 @@ int fileIsManifest(char *filepath) {
break;
} else {
printf("Cannot read file: %s\n", filepath);
fclose(fp);
exit(1);
}
}

/* Skip comments lines */
/* We will skip comments lines.
* At present, the manifest format is fixed, see aofInfoFormat.
* We will break directly as long as it encounters other items. */
if (buf[0] == '#') {
continue;
} else if (!memcmp(buf, "file", strlen("file"))) {
is_manifest = 1;
} else {
break;
}
}

Expand Down
Loading

0 comments on commit 8b7db30

Please sign in to comment.