Skip to content

Commit

Permalink
More error handling and HELLO tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Dec 9, 2024
1 parent 5a6113f commit cbe8433
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 171 deletions.
7 changes: 5 additions & 2 deletions include/redisx-priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ typedef struct {
int socket; ///< Changing the socket should require both locks!
int pendingRequests; ///< Number of request sent and not yet answered...
RESP *attributes; ///< Attributes from the last packet received.
RedisPushProcessor pushConsumer; ///< User-defined function to consume RESP3 push messages.
void *pushArg; ///< User-defined argument to pass along with push messages.
} ClientPrivate;


Expand All @@ -64,6 +62,7 @@ typedef struct {
char *password; ///< Redis password (if any)
int protocol; ///< RESP version to use
boolean hello; ///< whether to use HELLO (introduced in Redis 6.0.0 only)
RESP *helloData; ///< RESP data received from server during last connection.

RedisClient *clients;

Expand All @@ -85,6 +84,9 @@ typedef struct {

pthread_mutex_t subscriberLock;
MessageConsumer *subscriberList;

RedisPushProcessor pushConsumer; ///< User-defined function to consume RESP3 push messages.
void *pushArg; ///< User-defined argument to pass along with push messages.
} RedisPrivate;


Expand All @@ -96,6 +98,7 @@ void rConfigUnlock(Redis *redis);
int rConnectClient(Redis *redis, enum redisx_channel channel);
void rCloseClient(RedisClient *cl);
boolean rIsLowLatency(const ClientPrivate *cp);
int rCheckClient(const RedisClient *cl);

// in resp.c ------------------------------>
int redisxAppendRESP(RESP *resp, RESP *part);
Expand Down
14 changes: 12 additions & 2 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,14 +304,23 @@ typedef void (*RedisPipelineProcessor)(RESP *response);
* <li>If extensive processing or blocking calls are required to process the message, it is best to
* simply place a copy of the RESP on a queue and then return quickly, and then process the message
* asynchronously in a background thread.</li>
* <li>The client on which the push notification originated will be locked when this function is
* called, waiting for a response to an earlier query. Thus the implementation should not attempt to
* lock the client again or release the lock. It may send asynchronous requests on the client, e.g. via
* redisxSendRequestAsync(), but it should not try to read a response (given that the client is
* blocked for another read operation). If more flexible client access is needed, the implementation
* should make a copy of the RESP and place it on a queue for asynchronous processing by another thread.
* </li>
* </ul>
*
* @param cl The Redis client that sent the push. The client is locked for exlusive
* access when this function is called.
* @param message The RESP3 message that was pushed by the client
* @param ptr Additional data passed along.
*
* @sa redisxSetPushProcessor()
*/
typedef void (*RedisPushProcessor)(RESP *message, void *ptr);
typedef void (*RedisPushProcessor)(RedisClient *cl, RESP *message, void *ptr);



Expand All @@ -330,6 +339,7 @@ int redisxSetProtocol(Redis *redis, enum redisx_protocol protocol);
enum redisx_protocol redisxGetProtocol(const Redis *redis);

Redis *redisxInit(const char *server);
int redisxCheckValid(const Redis *redis);
void redisxDestroy(Redis *redis);
int redisxConnect(Redis *redis, boolean usePipeline);
void redisxDisconnect(Redis *redis);
Expand Down Expand Up @@ -366,7 +376,7 @@ void redisxDestroyEntries(RedisEntry *entries, int count);
void redisxDestroyKeys(char **keys, int count);

int redisxSetPipelineConsumer(Redis *redis, RedisPipelineProcessor f);
int redisxSetPushProcessor(RedisClient *cl, RedisPushProcessor func, void *arg);
int redisxSetPushProcessor(Redis *redis, RedisPushProcessor func, void *arg);

int redisxPublish(Redis *redis, const char *channel, const char *message, int length);
int redisxNotify(Redis *redis, const char *channel, const char *message);
Expand Down
103 changes: 39 additions & 64 deletions src/redisx-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,24 @@ int debugTraffic = FALSE; ///< Whether to print excerpts of all traffic to/fr

/// \endcond

/// \cond PROTECTED

/**
* Checks that a redis instance is valid.
*
* @param cl The Redis client instance
* @return X_SUCCESS (0) if the client is valid, or X_NULL if the argument is NULL,
* or else X_NO_INIT if the redis instance is not initialized.
*/
int rCheckClient(const RedisClient *cl) {
static const char *fn = "rCheckRedis";
if(!cl) return x_error(X_NULL, EINVAL, fn, "Redis client is NULL");
if(!cl->priv) return x_error(X_NO_INIT, EAGAIN, fn, "Redis client is not initialized");
return X_SUCCESS;
}

/// \endcond

/**
* Specific call for dealing with socket level transmit (send/rcv) errors. It prints a descriptive message to
* sdterr, and calls the configured user transmit error handler routine, and either exists the program
Expand Down Expand Up @@ -282,10 +300,7 @@ static int rSendBytesAsync(ClientPrivate *cp, const char *buf, int length, boole
RedisClient *redisxGetClient(Redis *redis, enum redisx_channel channel) {
RedisPrivate *p;

if(redis == NULL) {
x_error(0, EINVAL, "redisxGetClient", "redis is NULL");
return NULL;
}
if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null("redisxGetClient", NULL);

p = (RedisPrivate *) redis->priv;
if(channel < 0 || channel >= REDISX_CHANNELS) return NULL;
Expand All @@ -311,9 +326,8 @@ RedisClient *redisxGetLockedConnectedClient(Redis *redis, enum redisx_channel ch
static const char *fn = "redisxGetLockedConnectedClient";

RedisClient *cl = redisxGetClient(redis, channel);
if(!cl) return x_trace_null(fn, NULL);

if(redisxLockConnected(cl) != X_SUCCESS) return x_trace_null(fn, NULL);

return cl;
}

Expand All @@ -335,7 +349,8 @@ int redisxLockClient(RedisClient *cl) {
ClientPrivate *cp;
int status;

if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL");
prop_error(fn, rCheckClient(cl));

cp = (ClientPrivate *) cl->priv;
if(cp == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized");

Expand Down Expand Up @@ -394,7 +409,8 @@ int redisxUnlockClient(RedisClient *cl) {
ClientPrivate *cp;
int status;

if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL");
prop_error(fn, rCheckClient(cl));

cp = (ClientPrivate *) cl->priv;
if(cp == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized");

Expand All @@ -421,7 +437,8 @@ int redisxSkipReplyAsync(RedisClient *cl) {
static const char *fn = "redisSkipReplyAsync";
static const char cmd[] = "*3\r\n$6\r\nCLIENT\r\n$5\r\nREPLY\r\n$4\r\nSKIP\r\n";

if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL");
prop_error(fn, rCheckClient(cl));

if(cl->priv == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized");

prop_error(fn, rSendBytesAsync((ClientPrivate *) cl->priv, cmd, sizeof(cmd) - 1, TRUE));
Expand Down Expand Up @@ -454,7 +471,8 @@ int redisxStartBlockAsync(RedisClient *cl) {
static const char *fn = "redisxStartBlockAsync";
static const char cmd[] = "*1\r\n$5\r\nMULTI\r\n";

if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL");
prop_error(fn, rCheckClient(cl));

if(cl->priv == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized");

prop_error(fn, rSendBytesAsync((ClientPrivate *) cl->priv, cmd, sizeof(cmd) - 1, TRUE));
Expand All @@ -478,7 +496,8 @@ int redisxAbortBlockAsync(RedisClient *cl) {
static const char *fn = "redisxAbortBlockAsync";
static const char cmd[] = "*1\r\n$7\r\nDISCARD\r\n";

if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL");
prop_error(fn, rCheckClient(cl));

if(cl->priv == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized");

prop_error(fn, rSendBytesAsync((ClientPrivate *) cl->priv, cmd, sizeof(cmd) - 1, TRUE));
Expand Down Expand Up @@ -507,10 +526,7 @@ RESP *redisxExecBlockAsync(RedisClient *cl) {

int status;

if(cl == NULL) {
x_error(0, EINVAL, fn, "client is NULL");
return NULL;
}
if(rCheckClient(cl) != X_SUCCESS) return x_trace_null(fn, NULL);

if(cl->priv == NULL) {
x_error(0, EINVAL, fn, "client is not initialized");
Expand Down Expand Up @@ -558,7 +574,8 @@ int redisxSendRequestAsync(RedisClient *cl, const char *command, const char *arg
const char *args[] = { command, arg1, arg2, arg3 };
int n;

if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL");
prop_error(fn, rCheckClient(cl));

if(command == NULL) return x_error(X_NAME_INVALID, EINVAL, fn, "command is NULL");

// Count the non-null arguments...
Expand Down Expand Up @@ -592,7 +609,7 @@ int redisxSendArrayRequestAsync(RedisClient *cl, char *args[], int lengths[], in
int i, L;
ClientPrivate *cp;

if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL");
prop_error(fn, rCheckClient(cl));

cp = (ClientPrivate *) cl->priv;
if(cp == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized");
Expand Down Expand Up @@ -672,7 +689,7 @@ int redisxIgnoreReplyAsync(RedisClient *cl) {
static const char *fn = "redisxIgnoreReplyAsync";
RESP *resp;

if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL");
prop_error(fn, rCheckClient(cl));

resp = redisxReadReplyAsync(cl);
if(resp == NULL) return x_trace(fn, NULL, REDIS_NULL);
Expand All @@ -698,49 +715,12 @@ static int rTypeIsParametrized(char type) {
}
}

/**
* Sets a user-defined function to process push messages for a specific Redis client. The function's
* implementation must follow a simple set of rules:
*
* <ul>
* <li>the implementation should not destroy the RESP data. The RESP will be destroyed automatically
* after the call returns. However, the call may retain any data from the RESP itself, provided
* the data is de-referenced from the RESP before return.<li>
* <li>The call will have exclusive access to the client. As such it should not try to obtain a
* lock or release the lock itself.</li>
* <li>The implementation should not block (aside from maybe a quick mutex unlock) and return quickly,
* so as to not block the client for long periods</li>
* <li>If extensive processing or blocking calls are required to process the message, it is best to
* simply place a copy of the RESP on a queue and then return quickly, and then process the message
* asynchronously in a background thread.</li>
* </ul>
*
* @param cl Redis client instance
* @param func Function to use for processing push messages from the client, or NULL to ignore
* push messages.
* @param arg (optional) User-defined pointer argument to pass along to the processing function.
* @return X_SUCCESS (0) if successful, or else X_NULL (errno set to EINVAL) if the client
* argument is NULL.
*/
int redisxSetPushProcessor(RedisClient *cl, RedisPushProcessor func, void *arg) {
static const char *fn = "redisxSetPushProcessor";

ClientPrivate *cp;

if(!cl) return x_error(X_NULL, EINVAL, fn, "input client is NULL");

prop_error(fn, redisxLockClient(cl));
cp = cl->priv;
cp->pushConsumer = func;
cp->pushArg = arg;
redisxUnlockClient(cl);

return X_SUCCESS;
}

static void rPushMessageAsync(RedisClient *cl, RESP *resp) {
int i;
ClientPrivate *cp = (ClientPrivate *) cl->priv;
RedisPrivate *p = (RedisPrivate *) cp->redis->priv;
RESP **array;

if(resp->n < 0) return;
Expand All @@ -756,7 +736,7 @@ static void rPushMessageAsync(RedisClient *cl, RESP *resp) {

resp->value = array;

if(cp->pushConsumer) cp->pushConsumer(resp, cp->pushArg);
if(p->pushConsumer) p->pushConsumer(cl, resp, p->pushArg);

redisxDestroyRESP(resp);
}
Expand Down Expand Up @@ -819,7 +799,7 @@ static void rSetAttributeAsync(ClientPrivate *cp, RESP *resp) {
*
*/
int redisxClearAttributesAsync(RedisClient *cl) {
if(!cl) return x_error(X_NULL, EINVAL, "redisxClearAttributes", "client is NULL");
prop_error("redisxClearAttributesAsync", rCheckClient(cl));

rSetAttributeAsync((ClientPrivate *) cl->priv, NULL);
return X_SUCCESS;
Expand All @@ -843,10 +823,7 @@ RESP *redisxReadReplyAsync(RedisClient *cl) {
int size = 0;
int status = X_SUCCESS;

if(cl == NULL) {
x_error(0, EINVAL, fn, "client is NULL");
return NULL;
}
if(rCheckClient(cl) != X_SUCCESS) return x_trace_null(fn, NULL);

cp = cl->priv;

Expand Down Expand Up @@ -1081,8 +1058,6 @@ int redisxResetClient(RedisClient *cl) {

int status;

if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL");

prop_error(fn, redisxLockConnected(cl));

status = redisxSendRequestAsync(cl, "RESET", NULL, NULL, NULL);
Expand Down
16 changes: 10 additions & 6 deletions src/redisx-hooks.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ int redisxAddConnectHook(Redis *redis, void (*setupCall)(Redis *)) {
static const char *fn = "redisxAddConnectHook";
RedisPrivate *p;

if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL");
prop_error(fn, redisxCheckValid(redis));

if(setupCall == NULL) return x_error(X_NULL, EINVAL, fn, "setupCall is NULL");

xvprintf("Redis-X> Adding a connect callback.\n");
Expand Down Expand Up @@ -76,7 +77,8 @@ int redisxRemoveConnectHook(Redis *redis, void (*setupCall)(Redis *)) {
RedisPrivate *p;
Hook *c, *last = NULL;

if(redis == NULL) x_error(X_NULL, EINVAL, fn, "redis is NULL");
prop_error(fn, redisxCheckValid(redis));

if(setupCall == NULL) x_error(X_NULL, EINVAL, fn, "setupCall is NULL");

xvprintf("Redis-X> Removing a connect callback.\n");
Expand Down Expand Up @@ -112,7 +114,7 @@ void redisxClearConnectHooks(Redis *redis) {
RedisPrivate *p;
Hook *c;

if(redis == NULL) return;
if(redisxCheckValid(redis) != X_SUCCESS) return;

xvprintf("Redis-X> Clearing all connect callbacks.\n");

Expand Down Expand Up @@ -147,7 +149,8 @@ int redisxAddDisconnectHook(Redis *redis, void (*cleanupCall)(Redis *)) {

RedisPrivate *p;

if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL");
prop_error(fn, redisxCheckValid(redis));

if(cleanupCall == NULL) return x_error(X_NULL, EINVAL, fn, "cleanupCall is NULL");

xvprintf("Redis-X> Adding a disconnect callback.\n");
Expand Down Expand Up @@ -186,7 +189,8 @@ int redisxRemoveDisconnectHook(Redis *redis, void (*cleanupCall)(Redis *)) {
RedisPrivate *p;
Hook *c, *last = NULL;

if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL");
prop_error(fn, redisxCheckValid(redis));

if(cleanupCall == NULL) return x_error(X_NULL, EINVAL, fn, "cleanupCall is NULL");

xvprintf("Redis-X> Removing a disconnect callback.\n");
Expand Down Expand Up @@ -222,7 +226,7 @@ void redisxClearDisconnectHooks(Redis *redis) {
RedisPrivate *p;
Hook *c;

if(redis == NULL) return;
if(redisxCheckValid(redis) != X_SUCCESS) return;

xvprintf("Redis-X> Clearing all disconnect callbacks.\n");

Expand Down
Loading

0 comments on commit cbe8433

Please sign in to comment.