Skip to content

Commit

Permalink
feat(hash): add the support of HSETEXPIRE command (HSET + EXPIRE) (#2750
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ltagliamonte-dd authored Jan 30, 2025
1 parent 88f1f3e commit f7c5688
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 5 deletions.
32 changes: 32 additions & 0 deletions src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "error_constants.h"
#include "scan_base.h"
#include "server/server.h"
#include "time_util.h"
#include "types/redis_hash.h"

namespace redis {
Expand Down Expand Up @@ -258,6 +259,36 @@ class CommandHMSet : public Commander {
std::vector<FieldValue> field_values_;
};

class CommandHSetExpire : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
ttl_ = GET_OR_RET(ParseInt<uint64_t>(args[2], 10));
if ((args.size() - 3) % 2 != 0) {
return {Status::RedisParseErr, "Invalid number of arguments: field-value pairs must be complete"};
}
for (size_t i = 3; i < args_.size(); i += 2) {
field_values_.emplace_back(args_[i], args_[i + 1]);
}
return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
uint64_t ret = 0;
redis::Hash hash_db(srv->storage, conn->GetNamespace());

auto s = hash_db.MSet(ctx, args_[1], field_values_, false, &ret, ttl_ * 1000 + util::GetTimeStampMS());
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::RESP_OK;
return Status::OK();
}

private:
std::vector<FieldValue> field_values_;
uint64_t ttl_ = 0;
};

class CommandHKeys : public Commander {
public:
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -448,6 +479,7 @@ REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1
MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandHMSet>("hset", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandHSetExpire>("hsetexpire", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandHSetNX>("hsetnx", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandHDel>("hdel", -3, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandHStrlen>("hstrlen", 3, "read-only", 1, 1, 1),
Expand Down
10 changes: 7 additions & 3 deletions src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,18 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const
}

rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const std::vector<FieldValue> &field_values,
bool nx, uint64_t *added_cnt) {
bool nx, uint64_t *added_cnt, uint64_t expire) {
*added_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);

HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;

bool ttl_updated = false;
if (expire > 0 && metadata.expire != expire) {
metadata.expire = expire;
ttl_updated = true;
}
int added = 0;
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisHash);
Expand Down Expand Up @@ -279,7 +283,7 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st
if (!s.ok()) return s;
}

if (added > 0) {
if (added > 0 || ttl_updated) {
*added_cnt = added;
metadata.size += added;
std::string bytes;
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Hash : public SubKeyScanner {
rocksdb::Status IncrByFloat(engine::Context &ctx, const Slice &user_key, const Slice &field, double increment,
double *new_value);
rocksdb::Status MSet(engine::Context &ctx, const Slice &user_key, const std::vector<FieldValue> &field_values,
bool nx, uint64_t *added_cnt);
bool nx, uint64_t *added_cnt, uint64_t expire = 0);
rocksdb::Status RangeByLex(engine::Context &ctx, const Slice &user_key, const RangeLexSpec &spec,
std::vector<FieldValue> *field_values);
rocksdb::Status MGet(engine::Context &ctx, const Slice &user_key, const std::vector<Slice> &fields,
Expand Down
134 changes: 133 additions & 1 deletion tests/gocase/unit/type/hash/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import (
"testing"
"time"

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/apache/kvrocks/tests/gocase/util"
)

func getKeys(hash map[string]string) []string {
Expand Down Expand Up @@ -118,6 +120,136 @@ var testHash = func(t *testing.T, configs util.KvrocksServerConfigs) {
require.Equal(t, int64(1), rdb.HSet(ctx, "hmsetmulti", "key1", "val1", "key3", "val3").Val())
})

t.Run("HSETEXPIRE wrong number of args", func(t *testing.T) {
pattern := ".*wrong number.*"
ttlStr := "3600"
testKey := "hsetKey"
r := rdb.Do(ctx, "hsetexpire", testKey, ttlStr)
util.ErrorRegexp(t, r.Err(), pattern)
})

t.Run("HSETEXPIRE incomplete pairs", func(t *testing.T) {
pattern := ".*field-value pairs must be complete.*"
ttlStr := "3600"
testKey := "hsetKey"
r := rdb.Do(ctx, "hsetexpire", testKey, ttlStr, "key1", "val1", "key2")
util.ErrorRegexp(t, r.Err(), pattern)
})

t.Run("HSET/HSETEXPIRE/HSETEXPIRE/persist update expire time", func(t *testing.T) {
ttlStr := "3600"
testKey := "hsetKeyUpdateTime"
// create an hash without expiration
r := rdb.Do(ctx, "hset", testKey, "key1", "val1", "key2", "val2")
require.NoError(t, r.Err())
noExp := rdb.ExpireTime(ctx, testKey)
// make sure there is not exp set on the key
assert.Equal(t, -1*time.Nanosecond, noExp.Val())
// validate we inserted the key/vals
values := rdb.HGetAll(ctx, testKey)
assert.Equal(t, 2, len(values.Val()))

// update the hash and add expiration
r = rdb.Do(ctx, "hsetexpire", testKey, ttlStr, "key3", "val3")
require.NoError(t, r.Err())
assert.Equal(t, "OK", r.Val())
firstExp := rdb.ExpireTime(ctx, testKey)
firstExpireTime := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Add(firstExp.Val()).Unix()
// validate there is exp set on the key
assert.NotEqual(t, -1, firstExpireTime)
assert.Greater(t, firstExpireTime, time.Now().Unix())
// validate we updated the key/vals
values = rdb.HGetAll(ctx, testKey)
assert.Equal(t, 3, len(values.Val()))

// update the has and expiration
time.Sleep(1 * time.Second)
r = rdb.Do(ctx, "hsetexpire", testKey, ttlStr, "key4", "val4")
require.NoError(t, r.Err())
assert.Equal(t, "OK", r.Val())
// validate there is exp set on the key and it is new
secondExp := rdb.ExpireTime(ctx, testKey)
secondExpireTime := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Add(secondExp.Val()).Unix()
assert.NotEqual(t, -1, secondExpireTime)
assert.Greater(t, secondExpireTime, time.Now().Unix())
assert.Greater(t, secondExpireTime, firstExpireTime)
// validate we updated the key/vals
values = rdb.HGetAll(ctx, testKey)
assert.Equal(t, 4, len(values.Val()))

//remove expiration on the key and verify
r = rdb.Do(ctx, "persist", testKey)
require.NoError(t, r.Err())
persist := rdb.ExpireTime(ctx, testKey)
assert.Equal(t, -1*time.Nanosecond, persist.Val())
// validate we still have the correct number of key/vals
values = rdb.HGetAll(ctx, testKey)
assert.Equal(t, 4, len(values.Val()))
})

t.Run("HSETEXPIRE/HLEN/EXPIRETIME - Small hash creation", func(t *testing.T) {
ttlStr := "3600"
testKey := "hsetexsmallhash"
hsetExSmallHash := make(map[string]string)
for i := 0; i < 8; i++ {
key := "__avoid_collisions__" + util.RandString(0, 8, util.Alpha)
val := "__avoid_collisions__" + util.RandString(0, 8, util.Alpha)
if _, ok := hsetExSmallHash[key]; ok {
i--
}
rdb.Do(ctx, "hsetexpire", testKey, ttlStr, key, val)
hsetExSmallHash[key] = val
}
require.Equal(t, int64(8), rdb.HLen(ctx, testKey).Val())
val := rdb.ExpireTime(ctx, testKey).Val()
expireTime := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Add(val).Unix()
require.Greater(t, expireTime, time.Now().Unix())
})

t.Run("HSETEXPIRE/HLEN/EXPIRETIME - Big hash creation", func(t *testing.T) {
ttlStr := "3600"
testKey := "hsetexbighash"
hsetExBigHash := make(map[string]string)
for i := 0; i < 1024; i++ {
key := "__avoid_collisions__" + util.RandString(0, 8, util.Alpha)
val := "__avoid_collisions__" + util.RandString(0, 8, util.Alpha)
if _, ok := hsetExBigHash[key]; ok {
i--
}
rdb.Do(ctx, "hsetexpire", testKey, ttlStr, key, val)
hsetExBigHash[key] = val
}
require.Equal(t, int64(1024), rdb.HLen(ctx, testKey).Val())
val := rdb.ExpireTime(ctx, testKey).Val()
expireTime := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Add(val).Unix()
require.Greater(t, expireTime, time.Now().Unix())
})

t.Run("HSETEXPIRE/HLEN/EXPIRETIME - Multi field-value pairs creation", func(t *testing.T) {
ttlStr := "3600"
testKey := "hsetexbighashPair"
hsetExBigHash := make(map[string]string)
cmd := []string{"hsetexpire", testKey, ttlStr}
for i := 0; i < 10; i++ {
key := "__avoid_collisions__" + util.RandString(0, 8, util.Alpha)
val := "__avoid_collisions__" + util.RandString(0, 8, util.Alpha)
if _, ok := hsetExBigHash[key]; ok {
i--
}
cmd = append(cmd, key, val)
hsetExBigHash[key] = val
}
args := make([]interface{}, len(cmd))
for i, v := range cmd {
args[i] = v
}
rdb.Do(ctx, args...)
require.Equal(t, int64(10), rdb.HLen(ctx, testKey).Val())
val := rdb.ExpireTime(ctx, testKey).Val()
expireTime := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Add(val).Unix()
require.Greater(t, expireTime, time.Now().Unix())
})

t.Run("HGET against the small hash", func(t *testing.T) {
var err error
for key, val := range smallhash {
Expand Down

0 comments on commit f7c5688

Please sign in to comment.