Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support expiring hash keys #1421

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/proc_hash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,10 @@ int proc_hdel(NetworkServer *net, Link *link, const Request &req, Response *resp
int proc_hclear(NetworkServer *net, Link *link, const Request &req, Response *resp){
CHECK_NUM_PARAMS(2);
SSDBServer *serv = (SSDBServer *)net->data;

const Bytes &name = req[1];
int64_t count = serv->ssdb->hclear(name);
serv->expiration->del_ttl(DataType::HASH, name);
resp->reply_int(0, count);

return 0;
Expand Down Expand Up @@ -297,7 +298,7 @@ int proc_hdecr(NetworkServer *net, Link *link, const Request &req, Response *res
int proc_hfix(NetworkServer *net, Link *link, const Request &req, Response *resp){
SSDBServer *serv = (SSDBServer *)net->data;
CHECK_NUM_PARAMS(2);

const Bytes &name = req[1];
int64_t ret = serv->ssdb->hfix(name);
resp->reply_int(ret, ret);
Expand Down
24 changes: 18 additions & 6 deletions src/proc_kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ int proc_setx(NetworkServer *net, Link *link, const Request &req, Response *resp
resp->push_back("error");
return 0;
}
ret = serv->expiration->set_ttl(req[1], req[3].Int());
ret = serv->expiration->set_ttl(DataType::KV, req[1], req[3].Int());
if(ret == -1){
resp->push_back("error");
}else{
Expand All @@ -79,7 +79,10 @@ int proc_ttl(NetworkServer *net, Link *link, const Request &req, Response *resp)
CHECK_NUM_PARAMS(2);
CHECK_KV_KEY_RANGE(1);

int64_t ttl = serv->expiration->get_ttl(req[1]);
int64_t ttl = serv->expiration->get_ttl(DataType::KV, req[1]);
if(ttl == -1){
ttl = serv->expiration->get_ttl(DataType::HASH, req[1]);
}
resp->push_back("ok");
resp->push_back(str(ttl));
return 0;
Expand All @@ -94,7 +97,16 @@ int proc_expire(NetworkServer *net, Link *link, const Request &req, Response *re
std::string val;
int ret = serv->ssdb->get(req[1], &val);
if(ret == 1){
ret = serv->expiration->set_ttl(req[1], req[2].Int());
ret = serv->expiration->set_ttl(DataType::KV, req[1], req[2].Int());
if(ret != -1){
resp->push_back("ok");
resp->push_back("1");
}else{
resp->push_back("error");
}
return 0;
}else if(serv->ssdb->hsize(req[1]) != 0){
ret = serv->expiration->set_ttl(DataType::HASH, req[1], req[2].Int());
if(ret != -1){
resp->push_back("ok");
resp->push_back("1");
Expand Down Expand Up @@ -163,7 +175,7 @@ int proc_multi_del(NetworkServer *net, Link *link, const Request &req, Response
}else{
for(Request::const_iterator it=req.begin()+1; it!=req.end(); it++){
const Bytes key = *it;
serv->expiration->del_ttl(key);
serv->expiration->del_ttl(DataType::KV, key);
}
resp->reply_int(0, ret);
}
Expand Down Expand Up @@ -196,8 +208,8 @@ int proc_del(NetworkServer *net, Link *link, const Request &req, Response *resp)
if(ret == -1){
resp->push_back("error");
}else{
serv->expiration->del_ttl(req[1]);
serv->expiration->del_ttl(DataType::KV, req[1]);

resp->push_back("ok");
resp->push_back("1");
}
Expand Down
40 changes: 24 additions & 16 deletions src/ssdb/ttl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,43 +46,45 @@ void ExpirationHandler::stop(){
}
}

int ExpirationHandler::set_ttl(const Bytes &key, int64_t ttl){
int ExpirationHandler::set_ttl(char data_type, const Bytes &key, int64_t ttl){
int64_t expired = time_ms() + ttl * 1000;
std::string expire_str = str(expired);
int ret = ssdb->zset(this->list_name, key, expire_str);
std::string ttlkey = encode_ttl_key(data_type, key);
int ret = ssdb->zset(this->list_name, ttlkey.data(), expire_str);
if(ret == -1){
return -1;
}
if(expired < first_timeout){
first_timeout = expired;
}
std::string s_key = key.String();
if(!fast_keys.empty() && expired <= fast_keys.max_score()){
fast_keys.add(s_key, expired);
fast_keys.add(ttlkey, expired);
if(fast_keys.size() > BATCH_SIZE){
fast_keys.pop_back();
}
}else{
fast_keys.del(s_key);
fast_keys.del(ttlkey);
//log_debug("don't put in fast_keys");
}

return 0;
}

int ExpirationHandler::del_ttl(const Bytes &key){
int ExpirationHandler::del_ttl(char data_type, const Bytes &key){
// 这样用是有 bug 的, 虽然 fast_keys 为空, 不代表整个 ttl 队列为空
// if(!this->fast_keys.empty()){
if(first_timeout != INT64_MAX){
fast_keys.del(key.String());
ssdb->zdel(this->list_name, key);
std::string ttlkey = encode_ttl_key(data_type, key);
fast_keys.del(ttlkey);
ssdb->zdel(this->list_name, ttlkey.data());
}
return 0;
}

int64_t ExpirationHandler::get_ttl(const Bytes &key){
int64_t ExpirationHandler::get_ttl(char data_type, const Bytes &key){
std::string score;
if(ssdb->zget(this->list_name, key, &score) == 1){
std::string ttlkey = encode_ttl_key(data_type, key);
if(ssdb->zget(this->list_name, ttlkey.data(), &score) == 1){
int64_t ex = str_to_int64(score);
return (ex - time_ms())/1000;
}
Expand Down Expand Up @@ -134,10 +136,16 @@ void ExpirationHandler::expire_loop(){
std::string key;
if(this->fast_keys.front(&key, &score)){
this->first_timeout = score;

if(score <= time_ms()){
log_debug("expired %s", key.c_str());
ssdb->del(key);
const char data_type = key.at(0);
std::string real_key = key.substr(1, std::string::npos);
if(data_type == DataType::KV){
ssdb->del(real_key);
}else if(data_type == DataType::HASH){
ssdb->hclear(real_key);
}
ssdb->zdel(this->list_name, key);
this->fast_keys.pop_front();
}
Expand All @@ -146,7 +154,7 @@ void ExpirationHandler::expire_loop(){

void* ExpirationHandler::thread_func(void *arg){
ExpirationHandler *handler = (ExpirationHandler *)arg;

while(!handler->thread_quit){
bool need_sleep = false;
{
Expand All @@ -160,10 +168,10 @@ void* ExpirationHandler::thread_func(void *arg){
usleep(10 * 1000);
continue;
}

handler->expire_loop();
}

log_debug("ExpirationHandler thread quit");
handler->thread_quit = false;
return (void *)NULL;
Expand Down
15 changes: 12 additions & 3 deletions src/ssdb/ttl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ found in the LICENSE file.
#include "../util/sorted_set.h"
#include <string>

inline static
std::string encode_ttl_key(char data_type, const Bytes &key){
std::string buf;
buf.reserve(key.String().length() + 1);
buf.append(1, data_type);
buf.append(key.String());
return buf;
}

class ExpirationHandler
{
public:
Expand All @@ -22,10 +31,10 @@ class ExpirationHandler
// "In Redis 2.6 or older the command returns -1 if the key does not exist
// or if the key exist but has no associated expire. Starting with Redis 2.8.."
// I stick to Redis 2.6
int64_t get_ttl(const Bytes &key);
int64_t get_ttl(char data_type, const Bytes &key);
// The caller must hold mutex before calling set/del functions
int del_ttl(const Bytes &key);
int set_ttl(const Bytes &key, int64_t ttl);
int del_ttl(char data_type, const Bytes &key);
int set_ttl(char data_type, const Bytes &key, int64_t ttl);

private:
SSDB *ssdb;
Expand Down