Skip to content

Commit

Permalink
Apply clang-format
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Xie <[email protected]>
  • Loading branch information
PingXie committed May 24, 2024
1 parent c5dde8a commit 3ebab38
Show file tree
Hide file tree
Showing 21 changed files with 721 additions and 863 deletions.
46 changes: 22 additions & 24 deletions src/modules/helloacl.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
static ValkeyModuleUser *global;
static uint64_t global_auth_client_id = 0;

/* HELLOACL.REVOKE
/* HELLOACL.REVOKE
* Synchronously revoke access from a user. */
int RevokeCommand_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
Expand All @@ -49,11 +49,11 @@ int RevokeCommand_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
ValkeyModule_DeauthenticateAndCloseClient(ctx, global_auth_client_id);
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
} else {
return ValkeyModule_ReplyWithError(ctx, "Global user currently not used");
return ValkeyModule_ReplyWithError(ctx, "Global user currently not used");
}
}

/* HELLOACL.RESET
/* HELLOACL.RESET
* Synchronously delete and re-create a module user. */
int ResetCommand_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
Expand All @@ -68,22 +68,22 @@ int ResetCommand_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}

/* Callback handler for user changes, use this to notify a module of
/* Callback handler for user changes, use this to notify a module of
* changes to users authenticated by the module */
void HelloACL_UserChanged(uint64_t client_id, void *privdata) {
VALKEYMODULE_NOT_USED(privdata);
VALKEYMODULE_NOT_USED(client_id);
global_auth_client_id = 0;
}

/* HELLOACL.AUTHGLOBAL
/* HELLOACL.AUTHGLOBAL
* Synchronously assigns a module user to the current context. */
int AuthGlobalCommand_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);

if (global_auth_client_id) {
return ValkeyModule_ReplyWithError(ctx, "Global user currently used");
return ValkeyModule_ReplyWithError(ctx, "Global user currently used");
}

ValkeyModule_AuthenticateClientWithUser(ctx, global, HelloACL_UserChanged, NULL, &global_auth_client_id);
Expand All @@ -102,9 +102,8 @@ int HelloACL_Reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
ValkeyModuleString *user_string = ValkeyModule_GetBlockedClientPrivateData(ctx);
const char *name = ValkeyModule_StringPtrLen(user_string, &length);

if (ValkeyModule_AuthenticateClientWithACLUser(ctx, name, length, NULL, NULL, NULL) ==
VALKEYMODULE_ERR) {
return ValkeyModule_ReplyWithError(ctx, "Invalid Username or password");
if (ValkeyModule_AuthenticateClientWithACLUser(ctx, name, length, NULL, NULL, NULL) == VALKEYMODULE_ERR) {
return ValkeyModule_ReplyWithError(ctx, "Invalid Username or password");
}
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
Expand All @@ -129,20 +128,21 @@ void *HelloACL_ThreadMain(void *args) {
ValkeyModuleString *user = targs[1];
ValkeyModule_Free(targs);

ValkeyModule_UnblockClient(bc,user);
ValkeyModule_UnblockClient(bc, user);
return NULL;
}

/* HELLOACL.AUTHASYNC
/* HELLOACL.AUTHASYNC
* Asynchronously assigns an ACL user to the current context. */
int AuthAsyncCommand_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
if (argc != 2) return ValkeyModule_WrongArity(ctx);

pthread_t tid;
ValkeyModuleBlockedClient *bc = ValkeyModule_BlockClient(ctx, HelloACL_Reply, HelloACL_Timeout, HelloACL_FreeData, TIMEOUT_TIME);

ValkeyModuleBlockedClient *bc =
ValkeyModule_BlockClient(ctx, HelloACL_Reply, HelloACL_Timeout, HelloACL_FreeData, TIMEOUT_TIME);

void **targs = ValkeyModule_Alloc(sizeof(void*)*2);

void **targs = ValkeyModule_Alloc(sizeof(void *) * 2);
targs[0] = bc;
targs[1] = ValkeyModule_CreateStringFromString(NULL, argv[1]);

Expand All @@ -160,23 +160,21 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);

if (ValkeyModule_Init(ctx,"helloacl",1,VALKEYMODULE_APIVER_1)
== VALKEYMODULE_ERR) return VALKEYMODULE_ERR;
if (ValkeyModule_Init(ctx, "helloacl", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR) return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx,"helloacl.reset",
ResetCommand_ValkeyCommand,"",0,0,0) == VALKEYMODULE_ERR)
if (ValkeyModule_CreateCommand(ctx, "helloacl.reset", ResetCommand_ValkeyCommand, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx,"helloacl.revoke",
RevokeCommand_ValkeyCommand,"",0,0,0) == VALKEYMODULE_ERR)
if (ValkeyModule_CreateCommand(ctx, "helloacl.revoke", RevokeCommand_ValkeyCommand, "", 0, 0, 0) ==
VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx,"helloacl.authglobal",
AuthGlobalCommand_ValkeyCommand,"no-auth",0,0,0) == VALKEYMODULE_ERR)
if (ValkeyModule_CreateCommand(ctx, "helloacl.authglobal", AuthGlobalCommand_ValkeyCommand, "no-auth", 0, 0, 0) ==
VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx,"helloacl.authasync",
AuthAsyncCommand_ValkeyCommand,"no-auth",0,0,0) == VALKEYMODULE_ERR)
if (ValkeyModule_CreateCommand(ctx, "helloacl.authasync", AuthAsyncCommand_ValkeyCommand, "no-auth", 0, 0, 0) ==
VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

global = ValkeyModule_CreateModuleUser("global");
Expand Down
69 changes: 31 additions & 38 deletions src/modules/helloblock.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ int HelloBlock_Reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc)
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
int *myint = ValkeyModule_GetBlockedClientPrivateData(ctx);
return ValkeyModule_ReplyWithLongLong(ctx,*myint);
return ValkeyModule_ReplyWithLongLong(ctx, *myint);
}

/* Timeout callback for blocking command HELLO.BLOCK */
int HelloBlock_Timeout(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
return ValkeyModule_ReplyWithSimpleString(ctx,"Request timedout");
return ValkeyModule_ReplyWithSimpleString(ctx, "Request timedout");
}

/* Private data freeing callback for HELLO.BLOCK command. */
Expand All @@ -69,7 +69,7 @@ void *HelloBlock_ThreadMain(void *arg) {
sleep(delay);
int *r = ValkeyModule_Alloc(sizeof(int));
*r = rand();
ValkeyModule_UnblockClient(bc,r);
ValkeyModule_UnblockClient(bc, r);
return NULL;
}

Expand All @@ -82,8 +82,7 @@ void *HelloBlock_ThreadMain(void *arg) {
* amount of seconds with a while loop calling sleep(1), so that once we
* detect the client disconnection, we can terminate the thread ASAP. */
void HelloBlock_Disconnected(ValkeyModuleCtx *ctx, ValkeyModuleBlockedClient *bc) {
ValkeyModule_Log(ctx,"warning","Blocked client %p disconnected!",
(void*)bc);
ValkeyModule_Log(ctx, "warning", "Blocked client %p disconnected!", (void *)bc);

/* Here you should cleanup your state / threads, and if possible
* call ValkeyModule_UnblockClient(), or notify the thread that will
Expand All @@ -98,32 +97,33 @@ int HelloBlock_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, in
long long delay;
long long timeout;

if (ValkeyModule_StringToLongLong(argv[1],&delay) != VALKEYMODULE_OK) {
return ValkeyModule_ReplyWithError(ctx,"ERR invalid count");
if (ValkeyModule_StringToLongLong(argv[1], &delay) != VALKEYMODULE_OK) {
return ValkeyModule_ReplyWithError(ctx, "ERR invalid count");
}

if (ValkeyModule_StringToLongLong(argv[2],&timeout) != VALKEYMODULE_OK) {
return ValkeyModule_ReplyWithError(ctx,"ERR invalid count");
if (ValkeyModule_StringToLongLong(argv[2], &timeout) != VALKEYMODULE_OK) {
return ValkeyModule_ReplyWithError(ctx, "ERR invalid count");
}

pthread_t tid;
ValkeyModuleBlockedClient *bc = ValkeyModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
ValkeyModuleBlockedClient *bc =
ValkeyModule_BlockClient(ctx, HelloBlock_Reply, HelloBlock_Timeout, HelloBlock_FreeData, timeout);

/* Here we set a disconnection handler, however since this module will
* block in sleep() in a thread, there is not much we can do in the
* callback, so this is just to show you the API. */
ValkeyModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
ValkeyModule_SetDisconnectCallback(bc, HelloBlock_Disconnected);

/* Now that we setup a blocking client, we need to pass the control
* to the thread. However we need to pass arguments to the thread:
* the delay and a reference to the blocked client handle. */
void **targ = ValkeyModule_Alloc(sizeof(void*)*2);
void **targ = ValkeyModule_Alloc(sizeof(void *) * 2);
targ[0] = bc;
targ[1] = (void*)(unsigned long) delay;
targ[1] = (void *)(unsigned long)delay;

if (pthread_create(&tid,NULL,HelloBlock_ThreadMain,targ) != 0) {
if (pthread_create(&tid, NULL, HelloBlock_ThreadMain, targ) != 0) {
ValkeyModule_AbortBlock(bc);
return ValkeyModule_ReplyWithError(ctx,"-ERR Can't start thread");
return ValkeyModule_ReplyWithError(ctx, "-ERR Can't start thread");
}
return VALKEYMODULE_OK;
}
Expand All @@ -141,35 +141,31 @@ void *HelloKeys_ThreadMain(void *arg) {
long long cursor = 0;
size_t replylen = 0;

ValkeyModule_ReplyWithArray(ctx,VALKEYMODULE_POSTPONED_LEN);
ValkeyModule_ReplyWithArray(ctx, VALKEYMODULE_POSTPONED_LEN);
do {
ValkeyModule_ThreadSafeContextLock(ctx);
ValkeyModuleCallReply *reply = ValkeyModule_Call(ctx,
"SCAN","l",(long long)cursor);
ValkeyModuleCallReply *reply = ValkeyModule_Call(ctx, "SCAN", "l", (long long)cursor);
ValkeyModule_ThreadSafeContextUnlock(ctx);

ValkeyModuleCallReply *cr_cursor =
ValkeyModule_CallReplyArrayElement(reply,0);
ValkeyModuleCallReply *cr_keys =
ValkeyModule_CallReplyArrayElement(reply,1);
ValkeyModuleCallReply *cr_cursor = ValkeyModule_CallReplyArrayElement(reply, 0);
ValkeyModuleCallReply *cr_keys = ValkeyModule_CallReplyArrayElement(reply, 1);

ValkeyModuleString *s = ValkeyModule_CreateStringFromCallReply(cr_cursor);
ValkeyModule_StringToLongLong(s,&cursor);
ValkeyModule_FreeString(ctx,s);
ValkeyModule_StringToLongLong(s, &cursor);
ValkeyModule_FreeString(ctx, s);

size_t items = ValkeyModule_CallReplyLength(cr_keys);
for (size_t j = 0; j < items; j++) {
ValkeyModuleCallReply *ele =
ValkeyModule_CallReplyArrayElement(cr_keys,j);
ValkeyModule_ReplyWithCallReply(ctx,ele);
ValkeyModuleCallReply *ele = ValkeyModule_CallReplyArrayElement(cr_keys, j);
ValkeyModule_ReplyWithCallReply(ctx, ele);
replylen++;
}
ValkeyModule_FreeCallReply(reply);
} while (cursor != 0);
ValkeyModule_ReplySetArrayLength(ctx,replylen);
ValkeyModule_ReplySetArrayLength(ctx, replylen);

ValkeyModule_FreeThreadSafeContext(ctx);
ValkeyModule_UnblockClient(bc,NULL);
ValkeyModule_UnblockClient(bc, NULL);
return NULL;
}

Expand All @@ -186,14 +182,14 @@ int HelloKeys_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int
/* Note that when blocking the client we do not set any callback: no
* timeout is possible since we passed '0', nor we need a reply callback
* because we'll use the thread safe context to accumulate a reply. */
ValkeyModuleBlockedClient *bc = ValkeyModule_BlockClient(ctx,NULL,NULL,NULL,0);
ValkeyModuleBlockedClient *bc = ValkeyModule_BlockClient(ctx, NULL, NULL, NULL, 0);

/* Now that we setup a blocking client, we need to pass the control
* to the thread. However we need to pass arguments to the thread:
* the reference to the blocked client handle. */
if (pthread_create(&tid,NULL,HelloKeys_ThreadMain,bc) != 0) {
if (pthread_create(&tid, NULL, HelloKeys_ThreadMain, bc) != 0) {
ValkeyModule_AbortBlock(bc);
return ValkeyModule_ReplyWithError(ctx,"-ERR Can't start thread");
return ValkeyModule_ReplyWithError(ctx, "-ERR Can't start thread");
}
return VALKEYMODULE_OK;
}
Expand All @@ -204,14 +200,11 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);

if (ValkeyModule_Init(ctx,"helloblock",1,VALKEYMODULE_APIVER_1)
== VALKEYMODULE_ERR) return VALKEYMODULE_ERR;
if (ValkeyModule_Init(ctx, "helloblock", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR) return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx,"hello.block",
HelloBlock_ValkeyCommand,"",0,0,0) == VALKEYMODULE_ERR)
if (ValkeyModule_CreateCommand(ctx, "hello.block", HelloBlock_ValkeyCommand, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;
if (ValkeyModule_CreateCommand(ctx,"hello.keys",
HelloKeys_ValkeyCommand,"",0,0,0) == VALKEYMODULE_ERR)
if (ValkeyModule_CreateCommand(ctx, "hello.keys", HelloKeys_ValkeyCommand, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

return VALKEYMODULE_OK;
Expand Down
55 changes: 31 additions & 24 deletions src/modules/hellocluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int PingallCommand_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);

ValkeyModule_SendClusterMessage(ctx,NULL,MSGTYPE_PING,"Hey",3);
ValkeyModule_SendClusterMessage(ctx, NULL, MSGTYPE_PING, "Hey", 3);
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}

Expand All @@ -54,36 +54,44 @@ int ListCommand_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, i
VALKEYMODULE_NOT_USED(argc);

size_t numnodes;
char **ids = ValkeyModule_GetClusterNodesList(ctx,&numnodes);
char **ids = ValkeyModule_GetClusterNodesList(ctx, &numnodes);
if (ids == NULL) {
return ValkeyModule_ReplyWithError(ctx,"Cluster not enabled");
return ValkeyModule_ReplyWithError(ctx, "Cluster not enabled");
}

ValkeyModule_ReplyWithArray(ctx,numnodes);
ValkeyModule_ReplyWithArray(ctx, numnodes);
for (size_t j = 0; j < numnodes; j++) {
int port;
ValkeyModule_GetClusterNodeInfo(ctx,ids[j],NULL,NULL,&port,NULL);
ValkeyModule_ReplyWithArray(ctx,2);
ValkeyModule_ReplyWithStringBuffer(ctx,ids[j],VALKEYMODULE_NODE_ID_LEN);
ValkeyModule_ReplyWithLongLong(ctx,port);
ValkeyModule_GetClusterNodeInfo(ctx, ids[j], NULL, NULL, &port, NULL);
ValkeyModule_ReplyWithArray(ctx, 2);
ValkeyModule_ReplyWithStringBuffer(ctx, ids[j], VALKEYMODULE_NODE_ID_LEN);
ValkeyModule_ReplyWithLongLong(ctx, port);
}
ValkeyModule_FreeClusterNodesList(ids);
return VALKEYMODULE_OK;
}

/* Callback for message MSGTYPE_PING */
void PingReceiver(ValkeyModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len) {
ValkeyModule_Log(ctx,"notice","PING (type %d) RECEIVED from %.*s: '%.*s'",
type,VALKEYMODULE_NODE_ID_LEN,sender_id,(int)len, payload);
ValkeyModule_SendClusterMessage(ctx,NULL,MSGTYPE_PONG,"Ohi!",4);
void PingReceiver(ValkeyModuleCtx *ctx,
const char *sender_id,
uint8_t type,
const unsigned char *payload,
uint32_t len) {
ValkeyModule_Log(ctx, "notice", "PING (type %d) RECEIVED from %.*s: '%.*s'", type, VALKEYMODULE_NODE_ID_LEN,
sender_id, (int)len, payload);
ValkeyModule_SendClusterMessage(ctx, NULL, MSGTYPE_PONG, "Ohi!", 4);
ValkeyModuleCallReply *reply = ValkeyModule_Call(ctx, "INCR", "c", "pings_received");
ValkeyModule_FreeCallReply(reply);
}

/* Callback for message MSGTYPE_PONG. */
void PongReceiver(ValkeyModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len) {
ValkeyModule_Log(ctx,"notice","PONG (type %d) RECEIVED from %.*s: '%.*s'",
type,VALKEYMODULE_NODE_ID_LEN,sender_id,(int)len, payload);
void PongReceiver(ValkeyModuleCtx *ctx,
const char *sender_id,
uint8_t type,
const unsigned char *payload,
uint32_t len) {
ValkeyModule_Log(ctx, "notice", "PONG (type %d) RECEIVED from %.*s: '%.*s'", type, VALKEYMODULE_NODE_ID_LEN,
sender_id, (int)len, payload);
}

/* This function must be present on each module. It is used in order to
Expand All @@ -92,15 +100,14 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);

if (ValkeyModule_Init(ctx,"hellocluster",1,VALKEYMODULE_APIVER_1)
== VALKEYMODULE_ERR) return VALKEYMODULE_ERR;
if (ValkeyModule_Init(ctx, "hellocluster", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR) return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx,"hellocluster.pingall",
PingallCommand_ValkeyCommand,"readonly",0,0,0) == VALKEYMODULE_ERR)
if (ValkeyModule_CreateCommand(ctx, "hellocluster.pingall", PingallCommand_ValkeyCommand, "readonly", 0, 0, 0) ==
VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx,"hellocluster.list",
ListCommand_ValkeyCommand,"readonly",0,0,0) == VALKEYMODULE_ERR)
if (ValkeyModule_CreateCommand(ctx, "hellocluster.list", ListCommand_ValkeyCommand, "readonly", 0, 0, 0) ==
VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

/* Disable Cluster sharding and redirections. This way every node
Expand All @@ -109,10 +116,10 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
* variable. Normally you do that in order for the distributed system
* you create as a module to have total freedom in the keyspace
* manipulation. */
ValkeyModule_SetClusterFlags(ctx,VALKEYMODULE_CLUSTER_FLAG_NO_REDIRECTION);
ValkeyModule_SetClusterFlags(ctx, VALKEYMODULE_CLUSTER_FLAG_NO_REDIRECTION);

/* Register our handlers for different message types. */
ValkeyModule_RegisterClusterMessageReceiver(ctx,MSGTYPE_PING,PingReceiver);
ValkeyModule_RegisterClusterMessageReceiver(ctx,MSGTYPE_PONG,PongReceiver);
ValkeyModule_RegisterClusterMessageReceiver(ctx, MSGTYPE_PING, PingReceiver);
ValkeyModule_RegisterClusterMessageReceiver(ctx, MSGTYPE_PONG, PongReceiver);
return VALKEYMODULE_OK;
}
Loading

0 comments on commit 3ebab38

Please sign in to comment.