Skip to content

Commit

Permalink
Separate sentinel timeout and other tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Dec 11, 2024
1 parent e681a8e commit 6262833
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 46 deletions.
1 change: 1 addition & 0 deletions include/redisx-priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ typedef struct {
RedisServer *servers; ///< List of sentinel servers.
int nServers; ///< number of servers in list
char *serviceName; ///< Name of service (for Sentinel).
int timeoutMillis; ///< [ms] Connection timeout for sentinel nodes.
} RedisSentinel;


Expand Down
7 changes: 7 additions & 0 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
# define REDISX_DEFAULT_TIMEOUT_MILLIS 3000
#endif

#ifndef REDISX_DEFAULT_SENTINEL_TIMEOUT_MILLIS
/// [ms] Default socket read/write timeout for Redis clients
# define REDISX_DEFAULT_SENTINEL_TIMEOUT_MILLIS 100
#endif

// Various exposed constants ----------------------------------------------------->

/// API major version
Expand Down Expand Up @@ -343,6 +348,8 @@ boolean redisxIsVerbose();
void redisxDebugTraffic(boolean value);

void redisxSetTcpBuf(int size);
int redisxSetSocketTimeout(Redis *redis, int millis);
int redisxSetSentinelTimeout(Redis *redis, int millis);
int redisxSetTransmitErrorHandler(Redis *redis, RedisErrorHandler f);

int redisxSetPort(Redis *redis, int port);
Expand Down
66 changes: 50 additions & 16 deletions src/redisx-net.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,11 @@ static int rTryConnectSentinel(Redis *redis, int serverIndex) {
static int rDiscoverSentinel(Redis *redis) {
static const char *fn = "rConnectSentinel";

const RedisPrivate *p = (RedisPrivate *) redis->priv;
RedisPrivate *p = (RedisPrivate *) redis->priv;
const RedisSentinel *s = p->sentinel;
int i;
int i, savedTimeout = p->timeoutMillis;

p->timeoutMillis = s->timeoutMillis > 0 ? s->timeoutMillis : REDISX_DEFAULT_SENTINEL_TIMEOUT_MILLIS;

for(i = 0; i < s->nServers; i++) if(rTryConnectSentinel(redis, i) == X_SUCCESS) {
RESP *reply;
Expand All @@ -261,13 +262,16 @@ static int rDiscoverSentinel(Redis *redis) {
int port = (int) strtol((char *) component[1]->value, NULL, 10);

status = rSetServerAsync(redis, "sentinel master", (char *) component[0]->value, port);

redisxDestroyRESP(reply);
p->timeoutMillis = savedTimeout;

prop_error(fn, status);
return X_SUCCESS;
}
}

p->timeoutMillis = savedTimeout;
return x_error(X_NO_SERVICE, ENOTCONN, fn, "no Sentinel server available");
}

Expand Down Expand Up @@ -872,15 +876,17 @@ Redis *redisxInit(const char *server) {
}

/**
* Initializes a Redis client with a Sentinel configuration of alternate servers
* Initializes a Redis client with a Sentinel configuration of alternate servers, and the default
* sentinel node connection timeout.
*
* @param serviceName The service name as registered in the Sentinel server configuration.
* @param serverList An set of Sentinel servers to use to dynamically find the current master. A
* copy of the supplied name will be used, so the argument passed can be
* freely destroyed after the call.
* @param nServers The number of servers in the list
* @return X_SUCCESS (0) if successful, or else an error code &lt;0.
* @param serviceName The service name as registered in the Sentinel server configuration.
* @param serverList An set of Sentinel servers to use to dynamically find the current master. A
* copy of the supplied name will be used, so the argument passed can be
* freely destroyed after the call.
* @param nServers The number of servers in the list
* @return X_SUCCESS (0) if successful, or else an error code &lt;0.
*
* @sa redisxSetSentinelTimeout()
* @sa redisxInit()
* @sa redisxConnect()
*/
Expand Down Expand Up @@ -926,12 +932,40 @@ Redis *redisxInitSentinel(const char *serviceName, const RedisServer *serverList

s->nServers = nServers;
s->serviceName = xStringCopyOf(serviceName);

s->timeoutMillis = REDISX_DEFAULT_SENTINEL_TIMEOUT_MILLIS;
p->sentinel = s;

return redis;
}

/**
* Changes the connection timeout for Sentinel server instances in the discovery phase. This is different
* from the timeout that is used for the master server, once it is discovered.
*
* @param redis The Redis instance, which was initialized for Sentinel via redisxInitSentinel().
* @param millis [ms] The new connection timeout or &lt;=0 to use the default value.
* @return X_SUCCESS (0) if successfully set sentinel connection timeout, or else X_NULL if the
* redis instance is NULL, or X_NO_INIT if the redis instance is not initialized for
* Sentinel.
*
* @sa redisxSetSocketTimeout()
* @sa redisxInitSentinel()
*/
int redisxSetSentinelTimeout(Redis *redis, int millis) {
static const char *fn = "redisxSetSentinelTimeout";

RedisPrivate *p;
int status = X_SUCCESS;

prop_error(fn, rConfigLock(redis));
p = (RedisPrivate *) redis->priv;
if(p->sentinel) p->sentinel->timeoutMillis = millis > 0 ? millis : REDISX_DEFAULT_SENTINEL_TIMEOUT_MILLIS;
else status = x_error(X_NO_INIT, EAGAIN, fn, "Redis was not initialized for Sentinel");
rConfigUnlock(redis);

return status;
}

static void rDestroySentinel(RedisSentinel *sentinel) {
if(!sentinel) return;

Expand Down Expand Up @@ -1029,17 +1063,17 @@ int redisxSetPort(Redis *redis, int port) {
* or a negative value), then the timeout will not be configured for sockets, and the system default
* timeout values will apply.
*
* @param redis The Redis instance
* @param timeoutMillis [ms] The desired socket read/write timeout, or &lt;0 for socket default.
* @return X_SUCCESS (0) if successful, or else X_NULL if the redis instance is NULL,
* or X_N_INIT if the redis instance is not initialized.
* @param redis The Redis instance
* @param millis [ms] The desired socket read/write timeout, or &lt;0 for socket default.
* @return X_SUCCESS (0) if successful, or else X_NULL if the redis instance is NULL,
* or X_NO_INIT if the redis instance is not initialized.
*/
int redisxSetSocketTimeout(Redis *redis, int timeoutMillis) {
int redisxSetSocketTimeout(Redis *redis, int millis) {
RedisPrivate *p;

prop_error("redisxSetPort", rConfigLock(redis));
p = (RedisPrivate *) redis->priv;
p->timeoutMillis = timeoutMillis;
p->timeoutMillis = millis;
rConfigUnlock(redis);

return X_SUCCESS;
Expand Down
6 changes: 1 addition & 5 deletions src/redisx-script.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ int redisxLoadScript(Redis *redis, const char *script, char **sha1) {
*sha1 = NULL;

reply = redisxRequest(redis, "SCRIPT", "LOAD", script, NULL, &status);

if(!status) {
redisxDestroyRESP(reply);
return x_trace(fn, NULL, status);
}
if(status) return x_trace(fn, NULL, status);

prop_error(fn, redisxCheckDestroyRESP(reply, RESP_BULK_STRING, 0));

Expand Down
12 changes: 2 additions & 10 deletions src/redisx-tab.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@ RedisEntry *redisxGetTable(Redis *redis, const char *table, int *n) {
}

reply = redisxRequest(redis, "HGETALL", table, NULL, NULL, n);

if(*n) {
redisxDestroyRESP(reply);
return x_trace_null(fn, NULL);
}
if(*n) return x_trace_null(fn, NULL);

// Cast RESP2 array respone to RESP3 map also...
if(reply && reply->type == RESP_ARRAY) {
Expand Down Expand Up @@ -415,11 +411,7 @@ char **redisxGetKeys(Redis *redis, const char *table, int *n) {
}

reply = redisxRequest(redis, table ? "HKEYS" : "KEYS", table ? table : "*", NULL, NULL, n);

if(*n) {
redisxDestroyRESP(reply);
return x_trace_null(fn, NULL);
}
if(*n) return x_trace_null(fn, NULL);

*n = redisxCheckDestroyRESP(reply, RESP_ARRAY, 0);
if(*n) return x_trace_null(fn, NULL);
Expand Down
23 changes: 8 additions & 15 deletions src/redisx.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,7 @@ int redisxGetTime(Redis *redis, struct timespec *t) {
memset(t, 0, sizeof(*t));

reply = redisxRequest(redis, "TIME", NULL, NULL, NULL, &status);
if(status) {
redisxDestroyRESP(reply);
return x_trace(fn, NULL, status);
}
if(status) return x_trace(fn, NULL, status);

status = redisxCheckDestroyRESP(reply, RESP_ARRAY, 2);
prop_error(fn, status);
Expand Down Expand Up @@ -338,18 +335,15 @@ int redisxPing(Redis *redis, const char *message) {
RESP *reply;

reply = redisxRequest(redis, "PING", message, NULL, NULL, &status);
prop_error(fn, status);

if(!status) {
if(!reply) status = x_error(X_NULL, errno, fn, "reply was NULL");
else {
status = redisxCheckRESP(reply, message ? RESP_BULK_STRING : RESP_SIMPLE_STRING, 0);
if(!status) if(strcmp(message ? message : "PONG", (char *)reply->value) != 0)
status = x_error(REDIS_UNEXPECTED_RESP, ENOMSG, fn, "expected 'PONG', got '%s'", (char *)reply->value);
}
}
if(!reply) return x_error(X_NULL, errno, fn, "reply was NULL");

status = redisxCheckRESP(reply, message ? RESP_BULK_STRING : RESP_SIMPLE_STRING, 0);
if(!status) if(strcmp(message ? message : "PONG", (char *)reply->value) != 0)
status = x_error(REDIS_UNEXPECTED_RESP, ENOMSG, fn, "expected 'PONG', got '%s'", (char *)reply->value);

redisxDestroyRESP(reply);
prop_error(fn, status);

return X_SUCCESS;
}
Expand Down Expand Up @@ -554,7 +548,7 @@ int redisxSetPipelineConsumer(Redis *redis, RedisPipelineProcessor f) {
* the error code set by redisxArrayRequest().
*
* \return A freshly allocated RESP array containing the Redis response, or NULL if no valid
* response could be obtained.
* response could be obtained or status is not X_SUCCESS.
*
* @sa redisxArrayRequest()
* @sa redisxSendRequestAsync()
Expand All @@ -572,7 +566,6 @@ RESP *redisxRequest(Redis *redis, const char *command, const char *arg1, const c
else n = 4;

reply = redisxArrayRequest(redis, (char **) args, NULL, n, &s);

if(status) *status = s;

if(s) {
Expand Down

0 comments on commit 6262833

Please sign in to comment.