diff --git a/src/commands.def b/src/commands.def index 791b30d540..ecc77126af 100644 --- a/src/commands.def +++ b/src/commands.def @@ -1230,6 +1230,34 @@ struct COMMAND_ARG CLIENT_CAPA_Args[] = { #define CLIENT_ID_Keyspecs NULL #endif +/********** CLIENT IMPORT_SOURCE ********************/ + +#ifndef SKIP_CMD_HISTORY_TABLE +/* CLIENT IMPORT_SOURCE history */ +#define CLIENT_IMPORT_SOURCE_History NULL +#endif + +#ifndef SKIP_CMD_TIPS_TABLE +/* CLIENT IMPORT_SOURCE tips */ +#define CLIENT_IMPORT_SOURCE_Tips NULL +#endif + +#ifndef SKIP_CMD_KEY_SPECS_TABLE +/* CLIENT IMPORT_SOURCE key specs */ +#define CLIENT_IMPORT_SOURCE_Keyspecs NULL +#endif + +/* CLIENT IMPORT_SOURCE enabled argument table */ +struct COMMAND_ARG CLIENT_IMPORT_SOURCE_enabled_Subargs[] = { +{MAKE_ARG("on",ARG_TYPE_PURE_TOKEN,-1,"ON",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("off",ARG_TYPE_PURE_TOKEN,-1,"OFF",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +}; + +/* CLIENT IMPORT_SOURCE argument table */ +struct COMMAND_ARG CLIENT_IMPORT_SOURCE_Args[] = { +{MAKE_ARG("enabled",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLIENT_IMPORT_SOURCE_enabled_Subargs}, +}; + /********** CLIENT INFO ********************/ #ifndef SKIP_CMD_HISTORY_TABLE @@ -1630,6 +1658,7 @@ struct COMMAND_STRUCT CLIENT_Subcommands[] = { {MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)}, {MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)}, {MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)}, +{MAKE_CMD("import-source","Mark this client as an import source when server is in import mode.","O(1)","8.1.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, {MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)}, {MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args}, {MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args}, diff --git a/src/commands/client-import-source.json b/src/commands/client-import-source.json new file mode 100644 index 0000000000..113c07d70a --- /dev/null +++ b/src/commands/client-import-source.json @@ -0,0 +1,40 @@ +{ + "IMPORT-SOURCE": { + "summary": "Mark this client as an import source when server is in import mode.", + "complexity": "O(1)", + "group": "connection", + "since": "8.1.0", + "arity": 3, + "container": "CLIENT", + "function": "clientCommand", + "command_flags": [ + "NOSCRIPT", + "LOADING", + "STALE" + ], + "acl_categories": [ + "CONNECTION" + ], + "reply_schema": { + "const": "OK" + }, + "arguments": [ + { + "name": "enabled", + "type": "oneof", + "arguments": [ + { + "name": "on", + "type": "pure-token", + "token": "ON" + }, + { + "name": "off", + "type": "pure-token", + "token": "OFF" + } + ] + } + ] + } +} \ No newline at end of file diff --git a/src/config.c b/src/config.c index 15fec15276..c4009adefa 100644 --- a/src/config.c +++ b/src/config.c @@ -3139,6 +3139,7 @@ standardConfig static_configs[] = { createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL), createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), + createBoolConfig("import-mode", NULL, MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/db.c b/src/db.c index b59c7727b2..10d4a04091 100644 --- a/src/db.c +++ b/src/db.c @@ -385,7 +385,7 @@ robj *dbRandomKey(serverDb *db) { key = dictGetKey(de); keyobj = createStringObject(key, sdslen(key)); if (dbFindExpiresWithDictIndex(db, key, randomDictIndex)) { - if (allvolatile && server.primary_host && --maxtries == 0) { + if (allvolatile && (server.primary_host || server.import_mode) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically * expired in the repilca, so the function cannot stop because @@ -1821,6 +1821,25 @@ keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int di if (server.primary_host != NULL) { if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID; if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; + } else if (server.import_mode) { + /* If we are running in the import mode on a primary, instead of + * evicting the expired key from the database, we return ASAP: + * the key expiration is controlled by the import source that will + * send us synthesized DEL operations for expired keys. The + * exception is when write operations are performed on this server + * because it's a primary. + * + * Notice: other clients, apart from the import source, should not access + * the data imported by import source. + * + * Still we try to return the right information to the caller, + * that is, KEY_VALID if we think the key should still be valid, + * KEY_EXPIRED if we think the key is expired but don't want to delete it at this time. + * + * When receiving commands from the import source, keys are never considered + * expired. */ + if (server.current_client && (server.current_client->flag.import_source)) return KEY_VALID; + if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; } /* In some cases we're explicitly instructed to return an indication of a diff --git a/src/evict.c b/src/evict.c index 5e4b6220eb..5208328b32 100644 --- a/src/evict.c +++ b/src/evict.c @@ -546,8 +546,8 @@ int performEvictions(void) { goto update_metrics; } - if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) { - result = EVICT_FAIL; /* We need to free memory, but policy forbids. */ + if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || (iAmPrimary() && server.import_mode)) { + result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in import mode. */ goto update_metrics; } diff --git a/src/expire.c b/src/expire.c index 928bb58d86..c22df1ef86 100644 --- a/src/expire.c +++ b/src/expire.c @@ -520,8 +520,11 @@ int checkAlreadyExpired(long long when) { * of a replica instance. * * Instead we add the already expired key to the database with expire time - * (possibly in the past) and wait for an explicit DEL from the primary. */ - return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host); + * (possibly in the past) and wait for an explicit DEL from the primary. + * + * If the server is a primary and in the import mode, we also add the already + * expired key and wait for an explicit DEL from the import source. */ + return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.import_mode); } #define EXPIRE_NX (1 << 0) diff --git a/src/networking.c b/src/networking.c index 4791055b5a..9558780f39 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3585,6 +3585,10 @@ void clientCommand(client *c) { " Protect current client connection from eviction.", "NO-TOUCH (ON|OFF)", " Will not touch LRU/LFU stats when this mode is on.", + "IMPORT-SOURCE (ON|OFF)", + " Mark this connection as an import source if server.import_mode is true.", + " Sync tools can set their connections into 'import-source' state to visit", + " expired keys.", NULL}; addReplyHelp(c, help); } else if (!strcasecmp(c->argv[1]->ptr, "id") && c->argc == 2) { @@ -4058,6 +4062,22 @@ void clientCommand(client *c) { } } addReply(c, shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr, "import-source")) { + /* CLIENT IMPORT-SOURCE ON|OFF */ + if (!server.import_mode) { + addReplyError(c, "Server is not in import mode"); + return; + } + if (!strcasecmp(c->argv[2]->ptr, "on")) { + c->flag.import_source = 1; + addReply(c, shared.ok); + } else if (!strcasecmp(c->argv[2]->ptr, "off")) { + c->flag.import_source = 0; + addReply(c, shared.ok); + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } } else { addReplySubcommandSyntaxError(c); } diff --git a/src/server.c b/src/server.c index 12691df8ee..aebbb57a93 100644 --- a/src/server.c +++ b/src/server.c @@ -1131,10 +1131,10 @@ void databasesCron(void) { /* Expire keys by random sampling. Not required for replicas * as primary will synthesize DELs for us. */ if (server.active_expire_enabled) { - if (iAmPrimary()) { - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); - } else { + if (!iAmPrimary()) { expireReplicaKeys(); + } else if (!server.import_mode) { + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } } @@ -1727,7 +1727,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + if (server.active_expire_enabled && !server.import_mode && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); if (moduleCount()) { moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP, NULL); @@ -2133,6 +2133,7 @@ void initServerConfig(void) { server.extended_redis_compat = 0; server.pause_cron = 0; server.dict_resizing = 1; + server.import_mode = 0; server.latency_tracking_info_percentiles_len = 3; server.latency_tracking_info_percentiles = zmalloc(sizeof(double) * (server.latency_tracking_info_percentiles_len)); diff --git a/src/server.h b/src/server.h index 5ef04a9080..531ca8e7c8 100644 --- a/src/server.h +++ b/src/server.h @@ -1233,7 +1233,8 @@ typedef struct ClientFlags { * knows that it does not need the cache and required a full sync. With this * flag, we won't cache the primary in freeClient. */ uint64_t fake : 1; /* This is a fake client without a real connection. */ - uint64_t reserved : 5; /* Reserved for future use */ + uint64_t import_source : 1; /* This client is importing data to server and can visit expired key. */ + uint64_t reserved : 4; /* Reserved for future use */ } ClientFlags; typedef struct client { @@ -2089,6 +2090,8 @@ struct valkeyServer { char primary_replid[CONFIG_RUN_ID_SIZE + 1]; /* Primary PSYNC runid. */ long long primary_initial_offset; /* Primary PSYNC offset. */ int repl_replica_lazy_flush; /* Lazy FLUSHALL before loading DB? */ + /* Import Mode */ + int import_mode; /* If true, server is in import mode and forbid expiration and eviction. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */ diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index d85ce7ee68..fba425f62d 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -832,6 +832,80 @@ start_server {tags {"expire"}} { close_replication_stream $repl assert_equal [r debug set-active-expire 1] {OK} } {} {needs:debug} + + test {Import mode should forbid active expiration} { + r flushall + + r config set import-mode yes + assert_equal [r client import-source on] {OK} + + r set foo1 bar PX 1 + r set foo2 bar PX 1 + after 100 + + assert_equal [r dbsize] {2} + + assert_equal [r client import-source off] {OK} + r config set import-mode no + + # Verify all keys have expired + wait_for_condition 40 100 { + [r dbsize] eq 0 + } else { + fail "Keys did not actively expire." + } + } + + test {Import mode should forbid lazy expiration} { + r flushall + r debug set-active-expire 0 + + r config set import-mode yes + assert_equal [r client import-source on] {OK} + + r set foo1 1 PX 1 + after 10 + + r get foo1 + assert_equal [r dbsize] {1} + + assert_equal [r client import-source off] {OK} + r config set import-mode no + + r get foo1 + + assert_equal [r dbsize] {0} + + assert_equal [r debug set-active-expire 1] {OK} + } {} {needs:debug} + + test {RANDOMKEY can return expired key in import mode} { + r flushall + + r config set import-mode yes + assert_equal [r client import-source on] {OK} + + r set foo1 bar PX 1 + after 10 + + set client [valkey [srv "host"] [srv "port"] 0 $::tls] + if {!$::singledb} { + $client select 9 + } + assert_equal [$client ttl foo1] {-2} + + assert_equal [r randomkey] {foo1} + + assert_equal [r client import-source off] {OK} + r config set import-mode no + + # Verify all keys have expired + wait_for_condition 40 100 { + [r dbsize] eq 0 + } else { + fail "Keys did not actively expire." + } + } } start_cluster 1 0 {tags {"expire external:skip cluster"}} { diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index d4e62246f1..89e9699a3e 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -611,3 +611,21 @@ start_server {tags {"maxmemory" "external:skip"}} { assert {[r object freq foo] == 5} } } + +start_server {tags {"maxmemory" "external:skip"}} { + test {Import mode should forbid eviction} { + r set key val + r config set import-mode yes + assert_equal [r client import-source on] {OK} + r config set maxmemory-policy allkeys-lru + r config set maxmemory 1 + + assert_equal [r dbsize] {1} + assert_error {OOM command not allowed*} {r set key1 val1} + + assert_equal [r client import-source off] {OK} + r config set import-mode no + + assert_equal [r dbsize] {0} + } +} \ No newline at end of file diff --git a/valkey.conf b/valkey.conf index 7c7b9da43e..bf82b01874 100644 --- a/valkey.conf +++ b/valkey.conf @@ -818,6 +818,13 @@ replica-priority 100 # # replica-ignore-disk-write-errors no +# Make the primary forbid expiration and eviction. +# This is useful for sync tools, because expiration and eviction may cause the data corruption. +# Sync tools can mark their connections as importing source by CLIENT IMPORT-SOURCE. +# NOTICE: Clients should avoid writing the same key on the source server and the destination server. +# +# import-mode no + # ----------------------------------------------------------------------------- # By default, Sentinel includes all replicas in its reports. A replica # can be excluded from Sentinel's announcements. An unannounced replica