Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new replicate module API to bypass command validation #1357

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 42 additions & 24 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ static void zsetKeyReset(ValkeyModuleKey *key);
static void moduleInitKeyTypeSpecific(ValkeyModuleKey *key);
void VM_FreeDict(ValkeyModuleCtx *ctx, ValkeyModuleDict *d);
void VM_FreeServerInfo(ValkeyModuleCtx *ctx, ValkeyModuleServerInfoData *data);
int moduleReplicate(ValkeyModuleCtx *ctx, ValkeyModuleFlag flag, const char *cmdname, const char *fmt, va_list ap);

/* Helpers for VM_SetCommandInfo. */
static int moduleValidateCommandInfo(const ValkeyModuleCommandInfo *info);
Expand Down Expand Up @@ -3531,34 +3532,22 @@ int VM_ReplyWithLongDouble(ValkeyModuleCtx *ctx, long double ld) {
* The command returns VALKEYMODULE_ERR if the format specifiers are invalid
* or the command name does not belong to a known command. */
int VM_Replicate(ValkeyModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
struct serverCommand *cmd;
robj **argv = NULL;
int argc = 0, flags = 0, j;
va_list ap;

cmd = lookupCommandByCString((char *)cmdname);
if (!cmd) return VALKEYMODULE_ERR;

/* Create the client and dispatch the command. */
va_start(ap, fmt);
argv = moduleCreateArgvFromUserFormat(cmdname, fmt, &argc, &flags, ap);
int result = moduleReplicate(ctx, VALKEYMODULE_FLAG_DEFAULT, cmdname, fmt, ap);
va_end(ap);
if (argv == NULL) return VALKEYMODULE_ERR;

/* Select the propagation target. Usually is AOF + replicas, however
* the caller can exclude one or the other using the "A" or "R"
* modifiers. */
int target = 0;
if (!(flags & VALKEYMODULE_ARGV_NO_AOF)) target |= PROPAGATE_AOF;
if (!(flags & VALKEYMODULE_ARGV_NO_REPLICAS)) target |= PROPAGATE_REPL;

alsoPropagate(ctx->client->db->id, argv, argc, target);
return result;
}

/* Release the argv. */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
zfree(argv);
server.dirty++;
return VALKEYMODULE_OK;
/* Same as ValkeyModule_Replicate, but can take ValkeyModuleFlag
* Can be either VALKEYMODULE_FLAG_DEFAULT, which means default behavior
* (same as calling ValkeyModule_Replicate) */
int VM_ReplicateWithFlag(ValkeyModuleCtx *ctx, ValkeyModuleFlag flag, const char *cmdname, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
int result = moduleReplicate(ctx, flag, cmdname, fmt, ap);
va_end(ap);
return result;
}

/* This function will replicate the command exactly as it was invoked
Expand Down Expand Up @@ -13523,6 +13512,34 @@ void moduleDefragGlobals(void) {
dictReleaseIterator(di);
}

/* Helper function for VM_Replicate and VM_ReplicateWithFlag to replicate the specified command
* and arguments to replicas and AOF, as effect of execution of the calling command implementation.
* Skip command validation if the ValkeyModuleFlag is set to VALKEYMODULE_FLAG_SKIP_VALIDATION. */
int moduleReplicate(ValkeyModuleCtx *ctx, ValkeyModuleFlag flag, const char *cmdname, const char *fmt, va_list ap) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls move this function to just above the VM_Replicate, reference the function zsetInitLexRange. It is a helper function too.

struct serverCommand *cmd;
robj **argv = NULL;
int argc = 0, flags = 0, j;
if (flag != VALKEYMODULE_FLAG_SKIP_VALIDATION) {
cmd = lookupCommandByCString((char *)cmdname);
if (!cmd) return VALKEYMODULE_ERR;
}
/* Create the client and dispatch the command. */
argv = moduleCreateArgvFromUserFormat(cmdname, fmt, &argc, &flags, ap);
if (argv == NULL) return VALKEYMODULE_ERR;
/* Select the propagation target. Usually is AOF + replicas, however
* the caller can exclude one or the other using the "A" or "R"
* modifiers. */
int target = 0;
if (!(flags & VALKEYMODULE_ARGV_NO_AOF)) target |= PROPAGATE_AOF;
if (!(flags & VALKEYMODULE_ARGV_NO_REPLICAS)) target |= PROPAGATE_REPL;
alsoPropagate(ctx->client->db->id, argv, argc, target);
/* Release the argv. */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
zfree(argv);
server.dirty++;
return VALKEYMODULE_OK;
}

/* Returns the name of the key currently being processed.
* There is no guarantee that the key name is always available, so this may return NULL.
*/
Expand Down Expand Up @@ -13635,6 +13652,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(StringPtrLen);
REGISTER_API(AutoMemory);
REGISTER_API(Replicate);
REGISTER_API(ReplicateWithFlag);
REGISTER_API(ReplicateVerbatim);
REGISTER_API(DeleteKey);
REGISTER_API(UnlinkKey);
Expand Down
8 changes: 8 additions & 0 deletions src/valkeymodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,11 @@ typedef enum {
VALKEYMODULE_ACL_LOG_CHANNEL /* Channel authorization failure */
} ValkeyModuleACLLogEntryReason;

typedef enum {
VALKEYMODULE_FLAG_DEFAULT = 0, /* Default behavior */
VALKEYMODULE_FLAG_SKIP_VALIDATION, /* Skip validation */
} ValkeyModuleFlag;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I get some suggestions if this naming is too general

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there benefit set the flag as enum, more flags later? Can we just use int skip (0 for non-skip, 1 for skip) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's always complicated to have types like this in the module API. I believe we support backward and forward compatiblity for modules. I mean, a module built with the valkeymodule.h from Valkey 10 can run on Valkey 9 and vice versa. A module can check if some function exists in the current server by checking if the function is NULL, like if (ValkeyModule_ReplicateWithFlag != NULL) ..., but to check which flags are valid in an enum, we don't have a way for modules to check that.

I think Wen's suggestion with an int is simpler, but an even simpler API is to just add ValkeyModule_ReplicateWithoutValidataion. WDYT?

Internally, we can use an enum or int, but that's doesn't have to be part of the API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ValkeyModule_ReplicateWithoutValidataion is also good idea


/* Incomplete structures needed by both the core and modules. */
typedef struct ValkeyModuleIO ValkeyModuleIO;
typedef struct ValkeyModuleDigest ValkeyModuleDigest;
Expand Down Expand Up @@ -1092,6 +1097,8 @@ VALKEYMODULE_API int (*ValkeyModule_StringToStreamID)(const ValkeyModuleString *
VALKEYMODULE_API void (*ValkeyModule_AutoMemory)(ValkeyModuleCtx *ctx) VALKEYMODULE_ATTR;
VALKEYMODULE_API int (*ValkeyModule_Replicate)(ValkeyModuleCtx *ctx, const char *cmdname, const char *fmt, ...)
VALKEYMODULE_ATTR;
VALKEYMODULE_API int (*ValkeyModule_ReplicateWithFlag)(ValkeyModuleCtx *ctx, ValkeyModuleFlag flag, const char *cmdname, const char *fmt, ...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ValkeyModuleFlag can be set as int, here we can pass int value;

VALKEYMODULE_ATTR;
VALKEYMODULE_API int (*ValkeyModule_ReplicateVerbatim)(ValkeyModuleCtx *ctx) VALKEYMODULE_ATTR;
VALKEYMODULE_API const char *(*ValkeyModule_CallReplyStringPtr)(ValkeyModuleCallReply *reply,
size_t *len)VALKEYMODULE_ATTR;
Expand Down Expand Up @@ -1750,6 +1757,7 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in
VALKEYMODULE_GET_API(StringPtrLen);
VALKEYMODULE_GET_API(AutoMemory);
VALKEYMODULE_GET_API(Replicate);
VALKEYMODULE_GET_API(ReplicateWithFlag);
VALKEYMODULE_GET_API(ReplicateVerbatim);
VALKEYMODULE_GET_API(DeleteKey);
VALKEYMODULE_GET_API(UnlinkKey);
Expand Down
25 changes: 22 additions & 3 deletions tests/modules/propagate.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ int propagateTestSimpleCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,

/* Replicate two commands to test MULTI/EXEC wrapping. */
ValkeyModule_Replicate(ctx,"INCR","c","counter-1");
ValkeyModule_Replicate(ctx,"INCR","c","counter-2");
ValkeyModule_ReplicateWithFlag(ctx, VALKEYMODULE_FLAG_SKIP_VALIDATION, "INCR",
"c", "counter-2");
ValkeyModule_ReplyWithSimpleString(ctx,"OK");
return VALKEYMODULE_OK;
}
Expand All @@ -266,15 +267,28 @@ int propagateTestMixedCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, i
ValkeyModule_FreeCallReply(reply);

ValkeyModule_Replicate(ctx,"INCR","c","counter-1");
ValkeyModule_Replicate(ctx,"INCR","c","counter-2");

ValkeyModule_ReplicateWithFlag(ctx, VALKEYMODULE_FLAG_SKIP_VALIDATION, "INCR",
"c", "counter-2");
reply = ValkeyModule_Call(ctx, "INCR", "c!", "after-call");
ValkeyModule_FreeCallReply(reply);

ValkeyModule_ReplyWithSimpleString(ctx,"OK");
return VALKEYMODULE_OK;
}

int propagateTestInvalidCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
/* Replicate two commands to test MULTI/EXEC wrapping. */
ValkeyModule_ReplicateWithFlag(ctx, VALKEYMODULE_FLAG_SKIP_VALIDATION, "INVALID",
"c", "counter-1");
ValkeyModule_ReplicateWithFlag(ctx, VALKEYMODULE_FLAG_SKIP_VALIDATION, "INVALID",
"c", "counter-2");
ValkeyModule_ReplyWithSimpleString(ctx, "OK");
return VALKEYMODULE_OK;
}

int propagateTestNestedCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc)
{
VALKEYMODULE_NOT_USED(argv);
Expand Down Expand Up @@ -380,6 +394,11 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
"write",1,1,1) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "propagate-test.invalid",
propagateTestInvalidCommand, "", 1, 1,
1) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx,"propagate-test.nested",
propagateTestNestedCommand,
"write",1,1,1) == VALKEYMODULE_ERR)
Expand Down
26 changes: 26 additions & 0 deletions tests/unit/moduleapi/propagate.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,32 @@ tags "modules" {
}
}

tags "modules" {
start_server [list overrides [list loadmodule "$testmodule"]] {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
start_server [list overrides [list loadmodule "$testmodule"]] {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
# Start the replication process...
$replica replicaof $master_host $master_port
wait_for_sync $replica
after 1000
test {module crash when propagating invalid command} {
$master propagate-test.invalid
catch {wait_for_sync $replica}

wait_for_log_messages -1 {"*=== * BUG REPORT START: Cut & paste starting from here ===*"} 0 10 1000
wait_for_log_messages -1 {"* This replica panicked sending an error to its primary after processing the command '<unknown>' *"} 0 10 1000

assert_equal 1 [count_log_message -1 "=== .* BUG REPORT START: Cut & paste starting from here ==="]
assert_equal 1 [count_log_message -1 "This replica panicked sending an error to its primary after processing the command '<unknown>'"]
}
}
}
}

tags "modules aof" {
foreach aofload_type {debug_cmd startup} {
Expand Down
Loading