Skip to content

Commit

Permalink
Import-mode: Avoid expiration and eviction during data syncing (valke…
Browse files Browse the repository at this point in the history
…y-io#1185)

New config: `import-mode (yes|no)`

New command: `CLIENT IMPORT-SOURCE (ON|OFF)`

The config, when set to `yes`, disables eviction and deletion of expired
keys, except for commands coming from a client which has marked itself
as an import-source, the data source when importing data from another
node, using the CLIENT IMPORT-SOURCE command.

When we sync data from the source Valkey to the destination Valkey using
some sync tools like
[redis-shake](https://github.com/tair-opensource/RedisShake), the
destination Valkey can perform expiration and eviction, which may cause
data corruption. This problem has been discussed in
redis/redis#9760 (reply in thread)
and Redis already have a solution. But in Valkey we haven't fixed it by
now.

E.g. we call `set key 1 ex 1` on the source server and transfer this
command to the destination server. Then we call `incr key` on the source
server before the key expired, we will have a key on the source server
with a value of 2. But when the command arrived at the destination
server, the key may be expired and has deleted. So we will have a key on
the destination server with a value of 1, which is inconsistent with the
source server.

In standalone mode, we can use writable replica to simplify the sync
process. However, in cluster mode, we still need a sync tool to help us
transfer the source data to the destination. The sync tool usually work
as a normal client and the destination works as a primary which keep
expiration and eviction.

In this PR, we add a new mode named 'import-mode'. In this mode, server
stop expiration and eviction just like a replica. Notice that this mode
exists only in sync state to avoid data inconsistency caused by
expiration and eviction. Import mode only takes effect on the primary.
Sync tools can mark their clients as an import source by `CLIENT
IMPORT-SOURCE`, which work like a client from primary and can visit
expired keys in `lookupkey`.

**Notice: during the migration, other clients, apart from the import
source, should not access the data imported by import source.**

---------

Signed-off-by: lvyanqi.lyq <[email protected]>
Signed-off-by: Yanqi Lv <[email protected]>
Co-authored-by: Madelyn Olson <[email protected]>
  • Loading branch information
lyq2333 and madolson authored Nov 19, 2024
1 parent ee386c9 commit 4986310
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 10 deletions.
29 changes: 29 additions & 0 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,34 @@ struct COMMAND_ARG CLIENT_CAPA_Args[] = {
#define CLIENT_ID_Keyspecs NULL
#endif

/********** CLIENT IMPORT_SOURCE ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLIENT IMPORT_SOURCE history */
#define CLIENT_IMPORT_SOURCE_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLIENT IMPORT_SOURCE tips */
#define CLIENT_IMPORT_SOURCE_Tips NULL
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLIENT IMPORT_SOURCE key specs */
#define CLIENT_IMPORT_SOURCE_Keyspecs NULL
#endif

/* CLIENT IMPORT_SOURCE enabled argument table */
struct COMMAND_ARG CLIENT_IMPORT_SOURCE_enabled_Subargs[] = {
{MAKE_ARG("on",ARG_TYPE_PURE_TOKEN,-1,"ON",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("off",ARG_TYPE_PURE_TOKEN,-1,"OFF",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLIENT IMPORT_SOURCE argument table */
struct COMMAND_ARG CLIENT_IMPORT_SOURCE_Args[] = {
{MAKE_ARG("enabled",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLIENT_IMPORT_SOURCE_enabled_Subargs},
};

/********** CLIENT INFO ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -1630,6 +1658,7 @@ struct COMMAND_STRUCT CLIENT_Subcommands[] = {
{MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)},
{MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)},
{MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)},
{MAKE_CMD("import-source","Mark this client as an import source when server is in import mode.","O(1)","8.1.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args},
{MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)},
{MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args},
{MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args},
Expand Down
40 changes: 40 additions & 0 deletions src/commands/client-import-source.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"IMPORT-SOURCE": {
"summary": "Mark this client as an import source when server is in import mode.",
"complexity": "O(1)",
"group": "connection",
"since": "8.1.0",
"arity": 3,
"container": "CLIENT",
"function": "clientCommand",
"command_flags": [
"NOSCRIPT",
"LOADING",
"STALE"
],
"acl_categories": [
"CONNECTION"
],
"reply_schema": {
"const": "OK"
},
"arguments": [
{
"name": "enabled",
"type": "oneof",
"arguments": [
{
"name": "on",
"type": "pure-token",
"token": "ON"
},
{
"name": "off",
"type": "pure-token",
"token": "OFF"
}
]
}
]
}
}
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3139,6 +3139,7 @@ standardConfig static_configs[] = {
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
21 changes: 20 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ robj *dbRandomKey(serverDb *db) {
key = dictGetKey(de);
keyobj = createStringObject(key, sdslen(key));
if (dbFindExpiresWithDictIndex(db, key, randomDictIndex)) {
if (allvolatile && server.primary_host && --maxtries == 0) {
if (allvolatile && (server.primary_host || server.import_mode) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
* expired in the repilca, so the function cannot stop because
Expand Down Expand Up @@ -1821,6 +1821,25 @@ keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int di
if (server.primary_host != NULL) {
if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
} else if (server.import_mode) {
/* If we are running in the import mode on a primary, instead of
* evicting the expired key from the database, we return ASAP:
* the key expiration is controlled by the import source that will
* send us synthesized DEL operations for expired keys. The
* exception is when write operations are performed on this server
* because it's a primary.
*
* Notice: other clients, apart from the import source, should not access
* the data imported by import source.
*
* Still we try to return the right information to the caller,
* that is, KEY_VALID if we think the key should still be valid,
* KEY_EXPIRED if we think the key is expired but don't want to delete it at this time.
*
* When receiving commands from the import source, keys are never considered
* expired. */
if (server.current_client && (server.current_client->flag.import_source)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
}

/* In some cases we're explicitly instructed to return an indication of a
Expand Down
4 changes: 2 additions & 2 deletions src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ int performEvictions(void) {
goto update_metrics;
}

if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) {
result = EVICT_FAIL; /* We need to free memory, but policy forbids. */
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || (iAmPrimary() && server.import_mode)) {
result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in import mode. */
goto update_metrics;
}

Expand Down
7 changes: 5 additions & 2 deletions src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,11 @@ int checkAlreadyExpired(long long when) {
* of a replica instance.
*
* Instead we add the already expired key to the database with expire time
* (possibly in the past) and wait for an explicit DEL from the primary. */
return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host);
* (possibly in the past) and wait for an explicit DEL from the primary.
*
* If the server is a primary and in the import mode, we also add the already
* expired key and wait for an explicit DEL from the import source. */
return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.import_mode);
}

#define EXPIRE_NX (1 << 0)
Expand Down
20 changes: 20 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -3585,6 +3585,10 @@ void clientCommand(client *c) {
" Protect current client connection from eviction.",
"NO-TOUCH (ON|OFF)",
" Will not touch LRU/LFU stats when this mode is on.",
"IMPORT-SOURCE (ON|OFF)",
" Mark this connection as an import source if server.import_mode is true.",
" Sync tools can set their connections into 'import-source' state to visit",
" expired keys.",
NULL};
addReplyHelp(c, help);
} else if (!strcasecmp(c->argv[1]->ptr, "id") && c->argc == 2) {
Expand Down Expand Up @@ -4058,6 +4062,22 @@ void clientCommand(client *c) {
}
}
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "import-source")) {
/* CLIENT IMPORT-SOURCE ON|OFF */
if (!server.import_mode) {
addReplyError(c, "Server is not in import mode");
return;
}
if (!strcasecmp(c->argv[2]->ptr, "on")) {
c->flag.import_source = 1;
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[2]->ptr, "off")) {
c->flag.import_source = 0;
addReply(c, shared.ok);
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
} else {
addReplySubcommandSyntaxError(c);
}
Expand Down
9 changes: 5 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1131,10 +1131,10 @@ void databasesCron(void) {
/* Expire keys by random sampling. Not required for replicas
* as primary will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmPrimary()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
if (!iAmPrimary()) {
expireReplicaKeys();
} else if (!server.import_mode) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
}
}

Expand Down Expand Up @@ -1727,7 +1727,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {

/* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */
if (server.active_expire_enabled && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
if (server.active_expire_enabled && !server.import_mode && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

if (moduleCount()) {
moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP, NULL);
Expand Down Expand Up @@ -2133,6 +2133,7 @@ void initServerConfig(void) {
server.extended_redis_compat = 0;
server.pause_cron = 0;
server.dict_resizing = 1;
server.import_mode = 0;

server.latency_tracking_info_percentiles_len = 3;
server.latency_tracking_info_percentiles = zmalloc(sizeof(double) * (server.latency_tracking_info_percentiles_len));
Expand Down
5 changes: 4 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,8 @@ typedef struct ClientFlags {
* knows that it does not need the cache and required a full sync. With this
* flag, we won't cache the primary in freeClient. */
uint64_t fake : 1; /* This is a fake client without a real connection. */
uint64_t reserved : 5; /* Reserved for future use */
uint64_t import_source : 1; /* This client is importing data to server and can visit expired key. */
uint64_t reserved : 4; /* Reserved for future use */
} ClientFlags;

typedef struct client {
Expand Down Expand Up @@ -2089,6 +2090,8 @@ struct valkeyServer {
char primary_replid[CONFIG_RUN_ID_SIZE + 1]; /* Primary PSYNC runid. */
long long primary_initial_offset; /* Primary PSYNC offset. */
int repl_replica_lazy_flush; /* Lazy FLUSHALL before loading DB? */
/* Import Mode */
int import_mode; /* If true, server is in import mode and forbid expiration and eviction. */
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */
int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */
Expand Down
74 changes: 74 additions & 0 deletions tests/unit/expire.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,80 @@ start_server {tags {"expire"}} {
close_replication_stream $repl
assert_equal [r debug set-active-expire 1] {OK}
} {} {needs:debug}

test {Import mode should forbid active expiration} {
r flushall

r config set import-mode yes
assert_equal [r client import-source on] {OK}

r set foo1 bar PX 1
r set foo2 bar PX 1
after 100

assert_equal [r dbsize] {2}

assert_equal [r client import-source off] {OK}
r config set import-mode no

# Verify all keys have expired
wait_for_condition 40 100 {
[r dbsize] eq 0
} else {
fail "Keys did not actively expire."
}
}

test {Import mode should forbid lazy expiration} {
r flushall
r debug set-active-expire 0

r config set import-mode yes
assert_equal [r client import-source on] {OK}

r set foo1 1 PX 1
after 10

r get foo1
assert_equal [r dbsize] {1}

assert_equal [r client import-source off] {OK}
r config set import-mode no

r get foo1

assert_equal [r dbsize] {0}

assert_equal [r debug set-active-expire 1] {OK}
} {} {needs:debug}

test {RANDOMKEY can return expired key in import mode} {
r flushall

r config set import-mode yes
assert_equal [r client import-source on] {OK}

r set foo1 bar PX 1
after 10

set client [valkey [srv "host"] [srv "port"] 0 $::tls]
if {!$::singledb} {
$client select 9
}
assert_equal [$client ttl foo1] {-2}

assert_equal [r randomkey] {foo1}

assert_equal [r client import-source off] {OK}
r config set import-mode no

# Verify all keys have expired
wait_for_condition 40 100 {
[r dbsize] eq 0
} else {
fail "Keys did not actively expire."
}
}
}

start_cluster 1 0 {tags {"expire external:skip cluster"}} {
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/maxmemory.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,21 @@ start_server {tags {"maxmemory" "external:skip"}} {
assert {[r object freq foo] == 5}
}
}

start_server {tags {"maxmemory" "external:skip"}} {
test {Import mode should forbid eviction} {
r set key val
r config set import-mode yes
assert_equal [r client import-source on] {OK}
r config set maxmemory-policy allkeys-lru
r config set maxmemory 1

assert_equal [r dbsize] {1}
assert_error {OOM command not allowed*} {r set key1 val1}

assert_equal [r client import-source off] {OK}
r config set import-mode no

assert_equal [r dbsize] {0}
}
}
7 changes: 7 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,13 @@ replica-priority 100
#
# replica-ignore-disk-write-errors no

# Make the primary forbid expiration and eviction.
# This is useful for sync tools, because expiration and eviction may cause the data corruption.
# Sync tools can mark their connections as importing source by CLIENT IMPORT-SOURCE.
# NOTICE: Clients should avoid writing the same key on the source server and the destination server.
#
# import-mode no

# -----------------------------------------------------------------------------
# By default, Sentinel includes all replicas in its reports. A replica
# can be excluded from Sentinel's announcements. An unannounced replica
Expand Down

0 comments on commit 4986310

Please sign in to comment.