From cbe8433133bc7ee877136036f5d1f3868974fd56 Mon Sep 17 00:00:00 2001 From: Attila Kovacs Date: Mon, 9 Dec 2024 06:48:42 +0100 Subject: [PATCH] More error handling and HELLO tweaks --- include/redisx-priv.h | 7 ++- include/redisx.h | 14 ++++- src/redisx-client.c | 103 +++++++++++++--------------------- src/redisx-hooks.c | 16 ++++-- src/redisx-net.c | 30 +++++----- src/redisx-script.c | 17 ++++-- src/redisx-sub.c | 30 +++++----- src/redisx-tab.c | 67 +++++++++++----------- src/redisx.c | 127 ++++++++++++++++++++++++++++++++++-------- src/resp.c | 2 +- test/src/test-hello.c | 2 +- 11 files changed, 244 insertions(+), 171 deletions(-) diff --git a/include/redisx-priv.h b/include/redisx-priv.h index daf3e14..e8601a9 100644 --- a/include/redisx-priv.h +++ b/include/redisx-priv.h @@ -51,8 +51,6 @@ typedef struct { int socket; ///< Changing the socket should require both locks! int pendingRequests; ///< Number of request sent and not yet answered... RESP *attributes; ///< Attributes from the last packet received. - RedisPushProcessor pushConsumer; ///< User-defined function to consume RESP3 push messages. - void *pushArg; ///< User-defined argument to pass along with push messages. } ClientPrivate; @@ -64,6 +62,7 @@ typedef struct { char *password; ///< Redis password (if any) int protocol; ///< RESP version to use boolean hello; ///< whether to use HELLO (introduced in Redis 6.0.0 only) + RESP *helloData; ///< RESP data received from server during last connection. RedisClient *clients; @@ -85,6 +84,9 @@ typedef struct { pthread_mutex_t subscriberLock; MessageConsumer *subscriberList; + + RedisPushProcessor pushConsumer; ///< User-defined function to consume RESP3 push messages. + void *pushArg; ///< User-defined argument to pass along with push messages. } RedisPrivate; @@ -96,6 +98,7 @@ void rConfigUnlock(Redis *redis); int rConnectClient(Redis *redis, enum redisx_channel channel); void rCloseClient(RedisClient *cl); boolean rIsLowLatency(const ClientPrivate *cp); +int rCheckClient(const RedisClient *cl); // in resp.c ------------------------------> int redisxAppendRESP(RESP *resp, RESP *part); diff --git a/include/redisx.h b/include/redisx.h index b9f8018..369aa43 100644 --- a/include/redisx.h +++ b/include/redisx.h @@ -304,14 +304,23 @@ typedef void (*RedisPipelineProcessor)(RESP *response); *
  • If extensive processing or blocking calls are required to process the message, it is best to * simply place a copy of the RESP on a queue and then return quickly, and then process the message * asynchronously in a background thread.
  • + *
  • The client on which the push notification originated will be locked when this function is + * called, waiting for a response to an earlier query. Thus the implementation should not attempt to + * lock the client again or release the lock. It may send asynchronous requests on the client, e.g. via + * redisxSendRequestAsync(), but it should not try to read a response (given that the client is + * blocked for another read operation). If more flexible client access is needed, the implementation + * should make a copy of the RESP and place it on a queue for asynchronous processing by another thread. + *
  • * * + * @param cl The Redis client that sent the push. The client is locked for exlusive + * access when this function is called. * @param message The RESP3 message that was pushed by the client * @param ptr Additional data passed along. * * @sa redisxSetPushProcessor() */ -typedef void (*RedisPushProcessor)(RESP *message, void *ptr); +typedef void (*RedisPushProcessor)(RedisClient *cl, RESP *message, void *ptr); @@ -330,6 +339,7 @@ int redisxSetProtocol(Redis *redis, enum redisx_protocol protocol); enum redisx_protocol redisxGetProtocol(const Redis *redis); Redis *redisxInit(const char *server); +int redisxCheckValid(const Redis *redis); void redisxDestroy(Redis *redis); int redisxConnect(Redis *redis, boolean usePipeline); void redisxDisconnect(Redis *redis); @@ -366,7 +376,7 @@ void redisxDestroyEntries(RedisEntry *entries, int count); void redisxDestroyKeys(char **keys, int count); int redisxSetPipelineConsumer(Redis *redis, RedisPipelineProcessor f); -int redisxSetPushProcessor(RedisClient *cl, RedisPushProcessor func, void *arg); +int redisxSetPushProcessor(Redis *redis, RedisPushProcessor func, void *arg); int redisxPublish(Redis *redis, const char *channel, const char *message, int length); int redisxNotify(Redis *redis, const char *channel, const char *message); diff --git a/src/redisx-client.c b/src/redisx-client.c index e43c31a..f01b655 100644 --- a/src/redisx-client.c +++ b/src/redisx-client.c @@ -47,6 +47,24 @@ int debugTraffic = FALSE; ///< Whether to print excerpts of all traffic to/fr /// \endcond +/// \cond PROTECTED + +/** + * Checks that a redis instance is valid. + * + * @param cl The Redis client instance + * @return X_SUCCESS (0) if the client is valid, or X_NULL if the argument is NULL, + * or else X_NO_INIT if the redis instance is not initialized. + */ +int rCheckClient(const RedisClient *cl) { + static const char *fn = "rCheckRedis"; + if(!cl) return x_error(X_NULL, EINVAL, fn, "Redis client is NULL"); + if(!cl->priv) return x_error(X_NO_INIT, EAGAIN, fn, "Redis client is not initialized"); + return X_SUCCESS; +} + +/// \endcond + /** * Specific call for dealing with socket level transmit (send/rcv) errors. It prints a descriptive message to * sdterr, and calls the configured user transmit error handler routine, and either exists the program @@ -282,10 +300,7 @@ static int rSendBytesAsync(ClientPrivate *cp, const char *buf, int length, boole RedisClient *redisxGetClient(Redis *redis, enum redisx_channel channel) { RedisPrivate *p; - if(redis == NULL) { - x_error(0, EINVAL, "redisxGetClient", "redis is NULL"); - return NULL; - } + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null("redisxGetClient", NULL); p = (RedisPrivate *) redis->priv; if(channel < 0 || channel >= REDISX_CHANNELS) return NULL; @@ -311,9 +326,8 @@ RedisClient *redisxGetLockedConnectedClient(Redis *redis, enum redisx_channel ch static const char *fn = "redisxGetLockedConnectedClient"; RedisClient *cl = redisxGetClient(redis, channel); - if(!cl) return x_trace_null(fn, NULL); - if(redisxLockConnected(cl) != X_SUCCESS) return x_trace_null(fn, NULL); + return cl; } @@ -335,7 +349,8 @@ int redisxLockClient(RedisClient *cl) { ClientPrivate *cp; int status; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + cp = (ClientPrivate *) cl->priv; if(cp == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); @@ -394,7 +409,8 @@ int redisxUnlockClient(RedisClient *cl) { ClientPrivate *cp; int status; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + cp = (ClientPrivate *) cl->priv; if(cp == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); @@ -421,7 +437,8 @@ int redisxSkipReplyAsync(RedisClient *cl) { static const char *fn = "redisSkipReplyAsync"; static const char cmd[] = "*3\r\n$6\r\nCLIENT\r\n$5\r\nREPLY\r\n$4\r\nSKIP\r\n"; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + if(cl->priv == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); prop_error(fn, rSendBytesAsync((ClientPrivate *) cl->priv, cmd, sizeof(cmd) - 1, TRUE)); @@ -454,7 +471,8 @@ int redisxStartBlockAsync(RedisClient *cl) { static const char *fn = "redisxStartBlockAsync"; static const char cmd[] = "*1\r\n$5\r\nMULTI\r\n"; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + if(cl->priv == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); prop_error(fn, rSendBytesAsync((ClientPrivate *) cl->priv, cmd, sizeof(cmd) - 1, TRUE)); @@ -478,7 +496,8 @@ int redisxAbortBlockAsync(RedisClient *cl) { static const char *fn = "redisxAbortBlockAsync"; static const char cmd[] = "*1\r\n$7\r\nDISCARD\r\n"; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + if(cl->priv == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); prop_error(fn, rSendBytesAsync((ClientPrivate *) cl->priv, cmd, sizeof(cmd) - 1, TRUE)); @@ -507,10 +526,7 @@ RESP *redisxExecBlockAsync(RedisClient *cl) { int status; - if(cl == NULL) { - x_error(0, EINVAL, fn, "client is NULL"); - return NULL; - } + if(rCheckClient(cl) != X_SUCCESS) return x_trace_null(fn, NULL); if(cl->priv == NULL) { x_error(0, EINVAL, fn, "client is not initialized"); @@ -558,7 +574,8 @@ int redisxSendRequestAsync(RedisClient *cl, const char *command, const char *arg const char *args[] = { command, arg1, arg2, arg3 }; int n; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + if(command == NULL) return x_error(X_NAME_INVALID, EINVAL, fn, "command is NULL"); // Count the non-null arguments... @@ -592,7 +609,7 @@ int redisxSendArrayRequestAsync(RedisClient *cl, char *args[], int lengths[], in int i, L; ClientPrivate *cp; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); cp = (ClientPrivate *) cl->priv; if(cp == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); @@ -672,7 +689,7 @@ int redisxIgnoreReplyAsync(RedisClient *cl) { static const char *fn = "redisxIgnoreReplyAsync"; RESP *resp; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); resp = redisxReadReplyAsync(cl); if(resp == NULL) return x_trace(fn, NULL, REDIS_NULL); @@ -698,49 +715,12 @@ static int rTypeIsParametrized(char type) { } } -/** - * Sets a user-defined function to process push messages for a specific Redis client. The function's - * implementation must follow a simple set of rules: - * - * - * - * @param cl Redis client instance - * @param func Function to use for processing push messages from the client, or NULL to ignore - * push messages. - * @param arg (optional) User-defined pointer argument to pass along to the processing function. - * @return X_SUCCESS (0) if successful, or else X_NULL (errno set to EINVAL) if the client - * argument is NULL. - */ -int redisxSetPushProcessor(RedisClient *cl, RedisPushProcessor func, void *arg) { - static const char *fn = "redisxSetPushProcessor"; - - ClientPrivate *cp; - if(!cl) return x_error(X_NULL, EINVAL, fn, "input client is NULL"); - - prop_error(fn, redisxLockClient(cl)); - cp = cl->priv; - cp->pushConsumer = func; - cp->pushArg = arg; - redisxUnlockClient(cl); - - return X_SUCCESS; -} static void rPushMessageAsync(RedisClient *cl, RESP *resp) { int i; ClientPrivate *cp = (ClientPrivate *) cl->priv; + RedisPrivate *p = (RedisPrivate *) cp->redis->priv; RESP **array; if(resp->n < 0) return; @@ -756,7 +736,7 @@ static void rPushMessageAsync(RedisClient *cl, RESP *resp) { resp->value = array; - if(cp->pushConsumer) cp->pushConsumer(resp, cp->pushArg); + if(p->pushConsumer) p->pushConsumer(cl, resp, p->pushArg); redisxDestroyRESP(resp); } @@ -819,7 +799,7 @@ static void rSetAttributeAsync(ClientPrivate *cp, RESP *resp) { * */ int redisxClearAttributesAsync(RedisClient *cl) { - if(!cl) return x_error(X_NULL, EINVAL, "redisxClearAttributes", "client is NULL"); + prop_error("redisxClearAttributesAsync", rCheckClient(cl)); rSetAttributeAsync((ClientPrivate *) cl->priv, NULL); return X_SUCCESS; @@ -843,10 +823,7 @@ RESP *redisxReadReplyAsync(RedisClient *cl) { int size = 0; int status = X_SUCCESS; - if(cl == NULL) { - x_error(0, EINVAL, fn, "client is NULL"); - return NULL; - } + if(rCheckClient(cl) != X_SUCCESS) return x_trace_null(fn, NULL); cp = cl->priv; @@ -1081,8 +1058,6 @@ int redisxResetClient(RedisClient *cl) { int status; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); - prop_error(fn, redisxLockConnected(cl)); status = redisxSendRequestAsync(cl, "RESET", NULL, NULL, NULL); diff --git a/src/redisx-hooks.c b/src/redisx-hooks.c index fac2459..e3a19ff 100644 --- a/src/redisx-hooks.c +++ b/src/redisx-hooks.c @@ -37,7 +37,8 @@ int redisxAddConnectHook(Redis *redis, void (*setupCall)(Redis *)) { static const char *fn = "redisxAddConnectHook"; RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); + if(setupCall == NULL) return x_error(X_NULL, EINVAL, fn, "setupCall is NULL"); xvprintf("Redis-X> Adding a connect callback.\n"); @@ -76,7 +77,8 @@ int redisxRemoveConnectHook(Redis *redis, void (*setupCall)(Redis *)) { RedisPrivate *p; Hook *c, *last = NULL; - if(redis == NULL) x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); + if(setupCall == NULL) x_error(X_NULL, EINVAL, fn, "setupCall is NULL"); xvprintf("Redis-X> Removing a connect callback.\n"); @@ -112,7 +114,7 @@ void redisxClearConnectHooks(Redis *redis) { RedisPrivate *p; Hook *c; - if(redis == NULL) return; + if(redisxCheckValid(redis) != X_SUCCESS) return; xvprintf("Redis-X> Clearing all connect callbacks.\n"); @@ -147,7 +149,8 @@ int redisxAddDisconnectHook(Redis *redis, void (*cleanupCall)(Redis *)) { RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); + if(cleanupCall == NULL) return x_error(X_NULL, EINVAL, fn, "cleanupCall is NULL"); xvprintf("Redis-X> Adding a disconnect callback.\n"); @@ -186,7 +189,8 @@ int redisxRemoveDisconnectHook(Redis *redis, void (*cleanupCall)(Redis *)) { RedisPrivate *p; Hook *c, *last = NULL; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); + if(cleanupCall == NULL) return x_error(X_NULL, EINVAL, fn, "cleanupCall is NULL"); xvprintf("Redis-X> Removing a disconnect callback.\n"); @@ -222,7 +226,7 @@ void redisxClearDisconnectHooks(Redis *redis) { RedisPrivate *p; Hook *c; - if(redis == NULL) return; + if(redisxCheckValid(redis) != X_SUCCESS) return; xvprintf("Redis-X> Clearing all disconnect callbacks.\n"); diff --git a/src/redisx-net.c b/src/redisx-net.c index 06f65ff..1f440bb 100644 --- a/src/redisx-net.c +++ b/src/redisx-net.c @@ -313,9 +313,10 @@ static void rCloseClientAsync(RedisClient *cl) { * */ void rCloseClient(RedisClient *cl) { - redisxLockClient(cl); - rCloseClientAsync(cl); - redisxUnlockClient(cl); + if(redisxLockClient(cl) == X_SUCCESS) { + rCloseClientAsync(cl); + redisxUnlockClient(cl); + } return; } @@ -363,7 +364,7 @@ static int rReconnectAsync(Redis *redis, boolean usePipeline) { * */ void redisxDisconnect(Redis *redis) { - if(redis == NULL) return; + if(redisxCheckValid(redis) != X_SUCCESS) return; rConfigLock(redis); rDisconnectAsync(redis); @@ -387,7 +388,7 @@ int redisxReconnect(Redis *redis, boolean usePipeline) { int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); rConfigLock(redis); status = rReconnectAsync(redis, usePipeline); @@ -538,7 +539,11 @@ static int rHelloAsync(RedisClient *cl, char *clientID) { } } else xvprintf("! Redis-X: HELLO failed: %s\n", redisxErrorDescription(status)); - redisxDestroyRESP(reply); + + rConfigLock(cp->redis); + redisxDestroyRESP(p->helloData); + p->helloData = reply; + rConfigUnlock(cp->redis); return status; } @@ -693,7 +698,7 @@ Redis *redisxInit(const char *server) { redis->subscription = &p->clients[REDISX_SUBSCRIPTION_CHANNEL]; redis->id = xStringCopyOf(ipAddress); - for(i=REDISX_CHANNELS; --i >= 0; ) { + for(i = REDISX_CHANNELS; --i >= 0; ) { ClientPrivate *cp = (ClientPrivate *) p->clients[i].priv; cp->redis = redis; } @@ -768,7 +773,7 @@ void redisxSetTcpBuf(int size) { int redisxSetPort(Redis *redis, int port) { RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, "redisxSetPort", "redis is NULL"); + prop_error("redisxSetPort", redisxCheckValid(redis)); p = (RedisPrivate *) redis->priv; p->port = port; @@ -801,7 +806,7 @@ int redisxConnect(Redis *redis, boolean usePipeline) { static const char *fn = "redisxConnect"; int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); rConfigLock(redis); @@ -825,7 +830,7 @@ int redisxConnect(Redis *redis, boolean usePipeline) { boolean redisxIsConnected(Redis *redis) { const ClientPrivate *ip; - if(redis == NULL) return FALSE; + if(redisxCheckValid(redis) != X_SUCCESS) return FALSE; ip = (ClientPrivate *) redis->interactive->priv; return ip->isEnabled; @@ -854,10 +859,7 @@ void *RedisPipelineListener(void *pRedis) { xvprintf("Redis-X> Started processing pipelined responses...\n"); - if(redis == NULL) { - x_error(0, EINVAL, "RedisPipelineListener", "redis is NULL"); - return NULL; - } + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null("RedisPipelineListener", NULL); p = (RedisPrivate *) redis->priv; cl = redis->pipeline; diff --git a/src/redisx-script.c b/src/redisx-script.c index 220a046..74a776b 100644 --- a/src/redisx-script.c +++ b/src/redisx-script.c @@ -33,9 +33,10 @@ int redisxLoadScript(Redis *redis, const char *script, char **sha1) { static const char *fn = "redisxLoadScript"; RESP *reply; - int status; + int status = X_SUCCESS; + + prop_error(fn, redisxCheckValid(redis)); - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); if(script == NULL) return x_error(X_NULL, EINVAL, fn, "input script is NULL"); if(*script == '\0') return x_error(X_NULL, EINVAL, fn, "input script is empty"); @@ -81,8 +82,9 @@ int redisxRunScriptAsync(RedisClient *cl, const char *sha1, const char **keys, c int i = 0, k, nkeys = 0, nparams = 0, nargs; char sn[20], **args; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); - if(sha1 == NULL) return x_error(X_NULL, EINVAL, fn, "input script SHA1 sum is NULL"); + prop_error(fn, rCheckClient(cl)); + + if(sha1 == NULL) return x_error(X_NULL, EINVAL, fn, "input script SHA1 sum is NULL"); if(keys) while(keys[nkeys]) nkeys++; if(params) while(params[nparams]) nparams++; @@ -128,7 +130,12 @@ RESP *redisxRunScript(Redis *redis, const char *sha1, const char **keys, const c RESP *reply = NULL; - if(redis == NULL || sha1 == NULL) return NULL; + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null(fn, NULL); + + if(sha1 == NULL) { + x_error(0, EINVAL, fn, "sha1 parameter is NULL"); + return NULL; + } if(redisxLockConnected(redis->interactive) != X_SUCCESS) return x_trace_null(fn, NULL); diff --git a/src/redisx-sub.c b/src/redisx-sub.c index 3e7aebb..73d5188 100644 --- a/src/redisx-sub.c +++ b/src/redisx-sub.c @@ -89,7 +89,8 @@ int redisxPublishAsync(Redis *redis, const char *channel, const char *data, int char *args[3]; int L[3] = {0}; - if(redis == NULL) return x_error(X_NULL,EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); + if(channel == NULL) return x_error(X_NULL,EINVAL, fn, "channel parameter is NULL"); if(*channel == '\0') return x_error(X_NULL,EINVAL, fn, "channel parameter is empty"); @@ -133,8 +134,7 @@ int redisxPublish(Redis *redis, const char *channel, const char *data, int lengt int status = 0; - if(redis == NULL) return x_error(X_NULL,EINVAL, fn, "redis is NULL"); - + prop_error(fn, redisxCheckValid(redis)); prop_error(fn, redisxLockConnected(redis->interactive)); // Now send the message @@ -201,7 +201,7 @@ int redisxAddSubscriber(Redis *redis, const char *channelStem, RedisSubscriberCa MessageConsumer *c; RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); p = (RedisPrivate *) redis->priv; @@ -261,7 +261,8 @@ int redisxRemoveSubscribers(Redis *redis, RedisSubscriberCall f) { MessageConsumer *c, *last = NULL; int removed = 0; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); + if(!f) return x_error(X_NULL, EINVAL, fn, "subscriber function parameter is NULL"); p = (RedisPrivate *) redis->priv; @@ -306,7 +307,7 @@ int redisxClearSubscribers(Redis *redis) { MessageConsumer *c; int n = 0; - if(redis == NULL) return x_error(X_NULL, EINVAL, "redisxClearSubscrivers", "redis is NULL"); + prop_error("redisxClearSubscribers", redisxCheckValid(redis)); p = (RedisPrivate *) redis->priv; @@ -370,7 +371,8 @@ int redisxSubscribe(Redis *redis, const char *pattern) { const ClientPrivate *cp; int status = 0; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); + if(pattern == NULL) return x_error(X_NULL, EINVAL, fn, "pattern parameter is NULL"); // connect subscription client as needed. @@ -419,8 +421,7 @@ int redisxUnsubscribe(Redis *redis, const char *pattern) { int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - + prop_error(fn, redisxCheckValid(redis)); prop_error(fn, redisxLockConnected(redis->subscription)); if(pattern) { @@ -456,7 +457,7 @@ static int rEndSubscriptionAsync(Redis *redis) { RedisPrivate *p; int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); xvprintf("Redis-X> End all subscriptions, and quit listener.\n"); @@ -487,7 +488,7 @@ int redisxEndSubscription(Redis *redis) { int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); rConfigLock(redis); status = rEndSubscriptionAsync(redis); @@ -505,7 +506,7 @@ static void rNotifyConsumers(Redis *redis, char *pattern, char *channel, char *m xdprintf("NOTIFY: %s | %s\n", channel, msg); - if(!redis) return; + if(redisxCheckValid(redis) != X_SUCCESS) return; p = (RedisPrivate *) redis->priv; @@ -564,10 +565,7 @@ void *RedisSubscriptionListener(void *pRedis) { xvprintf("Redis-X> Started processing subsciptions...\n"); - if(redis == NULL) { - x_error(0, EINVAL, "RedisSubscriptionListener", "redis is NULL"); - return NULL; - } + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null("RedisSubscriptionListener", NULL); p = (RedisPrivate *) redis->priv; cl = redis->subscription; diff --git a/src/redisx-tab.c b/src/redisx-tab.c index 5fc5b72..a9d406a 100644 --- a/src/redisx-tab.c +++ b/src/redisx-tab.c @@ -46,13 +46,13 @@ RedisEntry *redisxGetTable(Redis *redis, const char *table, int *n) { RedisEntry *entries = NULL; RESP *reply; - if(n == NULL) { - x_error(0, EINVAL, fn, "parameter 'n' is NULL"); - return NULL; + if(redisxCheckValid(redis) != X_SUCCESS) { + if(n) *n = redisxCheckValid(redis); + return x_trace_null(fn, NULL); } - if(redis == NULL) { - *n = x_error(X_NULL, EINVAL, fn, "redis is NULL"); + if(n == NULL) { + x_error(0, EINVAL, fn, "parameter 'n' is NULL"); return NULL; } @@ -136,9 +136,9 @@ int redisxSetValue(Redis *redis, const char *table, const char *key, const char int status = X_SUCCESS; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - + prop_error(fn, redisxCheckValid(redis)); prop_error(fn, redisxLockConnected(redis->interactive)); + status = redisxSetValueAsync(redis->interactive, table, key, value, confirm); redisxUnlockClient(redis->interactive); @@ -215,12 +215,11 @@ RESP *redisxGetValue(Redis *redis, const char *table, const char *key, int *stat static const char *fn = "redisxGetValue"; RESP *reply; - int s; + int s = X_SUCCESS; - if(redis == NULL) { - x_error(X_NULL, EINVAL, fn, "redis is NULL"); + if(redisxCheckValid(redis) != X_SUCCESS) { if(status) *status = X_NULL; - return NULL; + return x_trace_null(fn, NULL); } if(table && !table[0]) { @@ -277,10 +276,9 @@ char *redisxGetStringValue(Redis *redis, const char *table, const char *key, int char *str = NULL; int status; - if(redis == NULL) { - x_error(0, EINVAL, fn, "redis is NULL"); + if(redisxCheckValid(redis) != X_SUCCESS) { if(len) *len = X_NULL; - return NULL; + return x_trace_null(fn, NULL); } reply = redisxGetValue(redis, table, key, len); @@ -379,7 +377,8 @@ int redisxMultiSet(Redis *redis, const char *table, const RedisEntry *entries, i static const char *fn = "redisxMultiSet"; int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); + if(table == NULL) return x_error(X_GROUP_INVALID, EINVAL, fn, "table parameter is NULL"); if(!table[0]) return x_error(X_GROUP_INVALID, EINVAL, fn, "table parameter is empty"); if(entries == NULL) return x_error(X_NULL, EINVAL, fn, "'entries' parameter is NULL"); @@ -422,13 +421,13 @@ char **redisxGetKeys(Redis *redis, const char *table, int *n) { RESP *reply; char **names = NULL; - if(n == NULL) { - x_error(X_NULL, EINVAL, fn, "parameter 'n' is NULL"); - return NULL; + if(redisxCheckValid(redis) != X_SUCCESS) { + if(n) *n = redisxCheckValid(redis); + return x_trace_null(fn, NULL); } - if(redis == NULL) { - *n = x_error(X_NULL, EINVAL, fn, "redis is NULL"); + if(n == NULL) { + x_error(X_NULL, EINVAL, fn, "parameter 'n' is NULL"); return NULL; } @@ -488,7 +487,7 @@ char **redisxGetKeys(Redis *redis, const char *table, int *n) { int redisxSetScanCount(Redis *redis, int count) { RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, "redisxSetScanCount", "redis is NULL"); + prop_error("redisxSetScanCount", redisxCheckValid(redis)); p = (RedisPrivate *) redis->priv; rConfigLock(redis); @@ -511,7 +510,7 @@ int redisxSetScanCount(Redis *redis, int count) { int redisxGetScanCount(Redis *redis) { const RedisPrivate *p; - if(redis == NULL) return x_error(-1, EINVAL, "redisxSetScanCount", "redis is NULL"); + prop_error("redisxGetScanCount", redisxCheckValid(redis)); p = (RedisPrivate *) redis->priv; return p->scanCount; @@ -558,20 +557,18 @@ char **redisxScanKeys(Redis *redis, const char *pattern, int *n, int *status) { char countArg[20]; int args = 0, i, j, capacity = SCAN_INITIAL_STORE_CAPACITY; - if(status == NULL) { - x_error(0, EINVAL, fn, "'status' parameter is NULL"); - return NULL; + if(redisxCheckValid(redis) != X_SUCCESS) { + if(status) *status = redisxCheckValid(redis); } if(n == NULL) { x_error(X_NULL, EINVAL, fn, "parameter 'n' is NULL"); - *status = X_NULL; + if(status) *status = X_NULL; return NULL; } - if(redis == NULL) { - x_error(X_NULL, EINVAL, fn, "redis is NULL"); - *status = X_NULL; + if(status == NULL) { + x_error(0, EINVAL, fn, "'status' parameter is NULL"); return NULL; } @@ -732,20 +729,18 @@ RedisEntry *redisxScanTable(Redis *redis, const char *table, const char *pattern char **pCursor; int args= 0, i, j, capacity = SCAN_INITIAL_STORE_CAPACITY; - if(status == NULL) { - x_error(0, EINVAL, fn, "'status' parameter is NULL"); - return NULL; + if(redisxCheckValid(redis) != X_SUCCESS) { + if(status) *status = redisxCheckValid(redis); } if(n == NULL) { x_error(X_NULL, EINVAL, fn, "parameter 'n' is NULL"); - *status = X_NULL; + if(status) *status = X_NULL; return NULL; } - if(redis == NULL) { - x_error(X_NULL, EINVAL, fn, "redis is NULL"); - *status = X_NULL; + if(status == NULL) { + x_error(0, EINVAL, fn, "'status' parameter is NULL"); return NULL; } diff --git a/src/redisx.c b/src/redisx.c index 05f9bf2..49be698 100644 --- a/src/redisx.c +++ b/src/redisx.c @@ -62,6 +62,21 @@ void rConfigUnlock(Redis *redis) { /// \endcond + +/** + * Checks that a redis instance is valid. + * + * @param redis The Redis instance + * @return X_SUCCESS (0) if the instance is valid, or X_NULL if the argument is NULL, + * or else X_NO_INIT if the redis instance is not initialized. + */ +int redisxCheckValid(const Redis *redis) { + static const char *fn = "rCheckRedis"; + if(!redis) return x_error(X_NULL, EINVAL, fn, "Redis instamce is NULL"); + if(!redis->priv) return x_error(X_NO_INIT, EAGAIN, fn, "Redis instance is not initialized"); + return X_SUCCESS; +} + /** * Enable or disable verbose reporting of all Redis operations (and possibly some details of them). * Reporting is done on the standard output (stdout). It may be useful when debugging programs @@ -115,7 +130,7 @@ int redisxSetUser(Redis *redis, const char *username) { RedisPrivate *p; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); if(redisxIsConnected(redis)) return x_error(X_ALREADY_OPEN, EALREADY, fn, "already connected"); p = (RedisPrivate *) redis->priv; @@ -143,7 +158,7 @@ int redisxSetPassword(Redis *redis, const char *passwd) { RedisPrivate *p; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); if(redisxIsConnected(redis)) return x_error(X_ALREADY_OPEN, EALREADY, fn, "already connected"); p = (RedisPrivate *) redis->priv; @@ -155,16 +170,15 @@ int redisxSetPassword(Redis *redis, const char *passwd) { /** - * Sets the RESP prorocol version to use with the given Redis server instance. You must set the protocol - * prior to connecting to Redis. The protocol is set with the HELLO command, which was introduced in - * Redis 6.0.0 only. For older Redis server instances, the protocol will default to RESP2. - * Calling this function will enable using HELLO to handshake with the server. + * Sets the RESP prorocol version to use for future client connections. The protocol is set with the + * HELLO command, which was introduced in Redis 6.0.0 only. For older Redis server instances, the + * protocol will default to RESP2. Calling this function will enable using HELLO to handshake with + * the server. * * @param redis The Redis server instance * @param protocol REDISX_RESP2 or REDISX_RESP3. * @return X_SUCCESS (0) if successful, or X_NULL if the redis argument in NULL, X_NO_INIT - * if the redis instance was not initialized, or X_ALREADY_OPEN if the server was - * already connected. + * if the redis instance was not initialized. * * @sa redisxGetProtocol() */ @@ -172,9 +186,8 @@ int redisxSetProtocol(Redis *redis, enum redisx_protocol protocol) { static const char *fn = "redisxSetProtocol"; RedisPrivate *p; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - if(!redis->priv) return x_error(X_NO_INIT, EINVAL, fn, "redis is not initialized"); - if(redisxIsConnected(redis)) return x_error(X_ALREADY_OPEN, EALREADY, fn, "already connected"); + + prop_error(fn, redisxCheckValid(redis)); p = (RedisPrivate *) redis->priv; p->hello = TRUE; @@ -198,8 +211,7 @@ enum redisx_protocol redisxGetProtocol(const Redis *redis) { static const char *fn = "redisxGetProtocol"; const RedisPrivate *p; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - if(!redis->priv) return x_error(X_NO_INIT, EINVAL, fn, "redis is not initialized"); + prop_error(fn, redisxCheckValid(redis)); p = (RedisPrivate *) redis->priv; return p->protocol; @@ -226,7 +238,7 @@ enum redisx_protocol redisxGetProtocol(const Redis *redis) { int redisxSetTransmitErrorHandler(Redis *redis, RedisErrorHandler f) { RedisPrivate *p; - if(!redis) return x_error(X_NULL, EINVAL, "redisxSetTransmitErrorHandler", "redis is NULL"); + prop_error("redisxSetTransmitErrorHandler", redisxCheckValid(redis)); rConfigLock(redis); p = (RedisPrivate *) redis->priv; @@ -252,7 +264,7 @@ int redisxGetTime(Redis *redis, struct timespec *t) { int status = X_SUCCESS; char *tail; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); if(!t) return x_error(X_NULL, EINVAL, fn, "output timespec is NULL"); memset(t, 0, sizeof(*t)); @@ -313,7 +325,7 @@ int redisxPing(Redis *redis, const char *message) { int status = X_SUCCESS; RESP *reply; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); reply = redisxRequest(redis, "PING", message, NULL, NULL, &status); @@ -396,7 +408,7 @@ int redisxSelectDB(Redis *redis, int idx) { enum redisx_channel c; int status = X_SUCCESS; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); p = (RedisPrivate *) redis->priv; if(p->dbIndex == idx) return X_SUCCESS; @@ -470,9 +482,16 @@ int redisxError(const char *func, int errorCode) { boolean redisxHasPipeline(Redis *redis) { const ClientPrivate *pp; - if(redis == NULL) return FALSE; + boolean isEnabled; + + if(redisxCheckValid(redis) != X_SUCCESS) return FALSE; + + prop_error("redisxHasPipeline", redisxLockClient(redis->pipeline)); pp = (ClientPrivate *) redis->pipeline->priv; - return pp->isEnabled; + isEnabled = pp->isEnabled; + redisxUnlockClient(redis->pipeline); + + return isEnabled; } /** @@ -499,7 +518,7 @@ boolean redisxHasPipeline(Redis *redis) { int redisxSetPipelineConsumer(Redis *redis, RedisPipelineProcessor f) { RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, "redisxSetPipelineConsumer", "redis is NULL"); + prop_error("redisxSetPipelineConsumer", redisxCheckValid(redis)); p = (RedisPrivate *) redis->priv; rConfigLock(redis); @@ -539,10 +558,10 @@ RESP *redisxRequest(Redis *redis, const char *command, const char *arg1, const c RESP *reply; const char *args[] = { command, arg1, arg2, arg3 }; - int n, s; + int n, s = X_SUCCESS; - if(redis == NULL) x_error(X_NULL, EINVAL, fn, "redis is NULL"); + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null(fn, NULL); if(command == NULL) n = 0; else if(arg1 == NULL) n = 1; @@ -594,8 +613,10 @@ RESP *redisxArrayRequest(Redis *redis, char *args[], int lengths[], int n, int * RedisClient *cl; - if(redis == NULL || args == NULL || n < 1 || status == NULL) { - x_error(0, EINVAL, fn, "invalid parameter: redis=%p, args=%p, n=%d, status=%p", redis, args, n, status); + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null(fn, NULL); + + if(args == NULL || n < 1 || status == NULL) { + x_error(0, EINVAL, fn, "invalid parameter: args=%p, n=%d, status=%p", args, n, status); if(status) *status = X_NULL; return NULL; } @@ -615,6 +636,64 @@ RESP *redisxArrayRequest(Redis *redis, char *args[], int lengths[], int n, int * } +/** + * Sets a user-defined function to process push messages for a specific Redis instance. The function's + * implementation must follow a simple set of rules: + * + * + * + * @param redis Redis instance + * @param func Function to use for processing push messages from the given Redis instance, or NULL + * to ignore push messages. + * @param arg (optional) User-defined pointer argument to pass along to the processing function. + * @return X_SUCCESS (0) if successful, or else X_NULL (errno set to EINVAL) if the client + * argument is NULL, or X_NO_INIT (errno set to EAGAIN) if redis is uninitialized. + */ +int redisxSetPushProcessor(Redis *redis, RedisPushProcessor func, void *arg) { + static const char *fn = "redisxSetPushProcessor"; + + RedisPrivate *p; + + prop_error(fn, redisxCheckValid(redis)); + + rConfigLock(redis); + p = redis->priv; + p->pushConsumer = func; + p->pushArg = arg; + rConfigUnlock(redis); + + return X_SUCCESS; +} + +RESP *redisxGetHelloData(Redis *redis) { + static const char *fn = "redisxGetHelloData"; + + const RedisPrivate *p; + RESP *data; + + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null(fn, NULL); + + rConfigLock(redis); + p = (RedisPrivate *) redis->priv; + data = redisxCopyOfRESP(p->helloData); + rConfigUnlock(redis); + + return data; +} + /** * Returns a string description for one of the RM error codes. * diff --git a/src/resp.c b/src/resp.c index 7f27924..427a89d 100644 --- a/src/resp.c +++ b/src/resp.c @@ -570,7 +570,7 @@ static XField *respArrayToXField(const char *name, const RESP **component, int n XField *array; - f = xCreate1DFieldArray(name, n); + f = xCreateHeterogeneous1DField(name, n); if(!f->value) return x_trace_null(fn, "field array"); diff --git a/test/src/test-hello.c b/test/src/test-hello.c index 1d5f47a..4eb9391 100644 --- a/test/src/test-hello.c +++ b/test/src/test-hello.c @@ -29,7 +29,7 @@ int main() { return 1; } - if(redisxGetProtocol(redis) != REDISX_RESP3) { + if(redisxGetProtocol(redisxGetClient(redis, REDISX_INTERACTIVE_CHANNEL)) != REDISX_RESP3) { fprintf(stderr, "ERROR! verify RESP3 protocol\n"); return 1; }