Skip to content

Commit

Permalink
Initial implementation of Sentinel support
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Dec 11, 2024
1 parent 2a732e5 commit e0845ad
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 2 deletions.
7 changes: 7 additions & 0 deletions include/redisx-priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,15 @@ typedef struct {
RESP *attributes; ///< Attributes from the last packet received.
} ClientPrivate;

typedef struct {
RedisServer *servers; ///< List of sentinel servers.
int nServers; ///< number of servers in list
char *serviceName; ///< Name of service (for Sentinel).
} RedisSentinel;


typedef struct {
RedisSentinel *sentinel; ///< Sentinel (high-availability) server configuration.
uint32_t addr; ///< The 32-bit inet address
int port; ///< port number (usually 6379)
int dbIndex; ///< the zero-based database index
Expand Down
10 changes: 10 additions & 0 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ typedef struct RedisEntry {
int length; ///< Bytes in value.
} RedisEntry;

/**
* Redis server host and port specification.
*
* @sa redisxInitSentinel()
*/
typedef struct RedisServer {
char *host; ///< The hostname or IP address of the server
int port; ///< The port number or &lt;=0 to use the default 6379
} RedisServer;

/**
* \brief Structure that represents a single Redis client connection instance.
Expand Down Expand Up @@ -345,6 +354,7 @@ enum redisx_protocol redisxGetProtocol(Redis *redis);
RESP *redisxGetHelloData(Redis *redis);

Redis *redisxInit(const char *server);
Redis *redisxInitSentinel(const char *serviceName, const RedisServer *serverList, int nServers);
int redisxCheckValid(const Redis *redis);
void redisxDestroy(Redis *redis);
int redisxConnect(Redis *redis, boolean usePipeline);
Expand Down
224 changes: 222 additions & 2 deletions src/redisx-net.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "redisx-priv.h"

static int rStartPipelineListenerAsync(Redis *redis);
static void rDisconnectClientAsync(RedisClient *cl);
static int rReconnectAsync(Redis *redis, boolean usePipeline);

/// \cond PRIVATE
///
Expand Down Expand Up @@ -178,6 +180,132 @@ static int rAuthAsync(RedisClient *cl) {
return X_SUCCESS;
}

static int rTryConnectSentinel(Redis *redis, int serverIndex) {
static const char *fn = "rTryConnectSentinel";

RedisPrivate *p = (RedisPrivate *) redis->priv;
RedisSentinel *s = p->sentinel;
RedisServer server = s->servers[serverIndex]; // A copy, not a reference...
char ipAddress[IP_ADDRESS_LENGTH] = {'\0'};
int status;

if(!server.host) return x_error(X_NULL, EINVAL, fn, "sentinel server %d address is NULL", serverIndex);
if(!server.host[0]) return x_error(X_NAME_INVALID, EINVAL, fn, "sentinel server %d name is empty", serverIndex);

status = hostnameToIP(server.host, ipAddress);
if(status) return status;

p->addr = inet_addr((char *) ipAddress);
p->port = server.port;

if(redis->id) free(redis->id);
redis->id = xStringCopyOf(ipAddress);

status = rConnectClient(redis, REDISX_INTERACTIVE_CHANNEL);
if(status != X_SUCCESS) return status;

// Move server to the top of the list, so next time we try this one first...
memmove(&s->servers[1], s->servers, serverIndex * sizeof(RedisServer));
s->servers[0] = server;

return X_SUCCESS;
}


static int rDiscoverSentinel(Redis *redis) {
static const char *fn = "rConnectSentinel";

RedisPrivate *p = (RedisPrivate *) redis->priv;
const RedisSentinel *s = p->sentinel;
int i;


for(i = 0; i < s->nServers; i++) if(rTryConnectSentinel(redis, i) == X_SUCCESS) {
RESP *reply;
int status;

// Get the name of the master...
reply = redisxRequest(redis, "SENTINEL", "get-master-addr-by-name", s->serviceName, NULL, &status);
if(status) {
redisxDestroyRESP(reply);
continue;
}

rDisconnectClientAsync(redis->interactive);

if(redisxCheckDestroyRESP(reply, RESP_ARRAY, 2) == X_SUCCESS) {
char ipAddress[IP_ADDRESS_LENGTH] = {'\0'};
RESP **component = (RESP **) reply->value;

status = hostnameToIP((char *) component[0]->value, ipAddress);

if(status < 0) {
redisxDestroyRESP(reply);
return x_trace(fn, NULL, status);
}

p->addr = inet_addr((char *) ipAddress);
p->port = (int) strtol((char *) component[1]->value, NULL, 10);

rDisconnectClientAsync(redis->interactive);

return X_SUCCESS;
}
}

return x_error(X_NO_SERVICE, ENOTCONN, fn, "no Sentinel server available");
}

static int rConfirmMasterRole(Redis *redis) {
static const char *fn = "rConfirmMasterRole";

RESP *reply, **component;
int status;

reply = redisxRequest(redis, "ROLE", NULL, NULL, NULL, &status);

if(status) {
redisxDestroyRESP(reply);
return x_trace(fn, NULL, status);
}

if(redisxCheckDestroyRESP(reply, RESP_ARRAY, 0)) {
char *str;
reply = redisxRequest(redis, "INFO", "replication", NULL, NULL, &status);
prop_error(fn, redisxCheckDestroyRESP(reply, RESP_BULK_STRING, 0));

str = strtok((char *) reply->value, "\n");

while(str) {
const char *tok = strtok(str, ":");
if(strcmp("role", tok) == 0) {
const char *role = strtok(NULL, "\n");
status = strcmp("master", role) == 0;
redisxDestroyRESP(reply);

if(status) return x_error(X_FAILURE, EAGAIN, fn, "Replica is not master");
return X_SUCCESS;
}
str = strtok(NULL, "\n");
}

return x_error(X_FAILURE, EBADE, fn, "Got empty array response");
}

component = (RESP **) reply->value;

if(reply->n < 1) {
redisxDestroyRESP(reply);
return x_error(X_FAILURE, EBADE, fn, "Got empty array response");
}

status = strcmp("master", (char *) component[0]->value) == 0;
redisxDestroyRESP(reply);

if(status) return x_error(X_FAILURE, EAGAIN, fn, "Replica is not master");
return X_SUCCESS;
}

/**
* Same as connectRedis() except without the exlusive locking mechanism...
*
Expand Down Expand Up @@ -207,6 +335,11 @@ static int rConnectAsync(Redis *redis, boolean usePipeline) {
return X_ALREADY_OPEN;
}

if(p->sentinel) {
prop_error(fn, rDiscoverSentinel(redis));
// TODO update sentinel server list...
}

if(!ip->isEnabled) {
static int warnedInteractive;

Expand All @@ -223,6 +356,10 @@ static int rConnectAsync(Redis *redis, boolean usePipeline) {
warnedInteractive = FALSE;
}

if(p->sentinel) {
if(rConfirmMasterRole(redis) != X_SUCCESS) prop_error(fn, rReconnectAsync(redis, usePipeline));
}

if(usePipeline) {
if(!pp->isEnabled) {
static int warnedPipeline;
Expand Down Expand Up @@ -578,7 +715,7 @@ int rConnectClient(Redis *redis, enum redisx_channel channel) {
cp = (ClientPrivate *) cl->priv;

serverAddress.sin_family = AF_INET;
serverAddress.sin_port = htons(REDISX_TCP_PORT);
serverAddress.sin_port = htons(p->port > 0 ? p->port : REDISX_TCP_PORT);
serverAddress.sin_addr.s_addr = p->addr;
memset(serverAddress.sin_zero, '\0', sizeof(serverAddress.sin_zero));

Expand Down Expand Up @@ -649,6 +786,8 @@ int rConnectClient(Redis *redis, enum redisx_channel channel) {
* \return X_SUCCESS or
* X_FAILURE if the IP address is invalid.
* X_NULL if the IP address is NULL.
*
* @sa redisxInitSentinel()
*/
Redis *redisxInit(const char *server) {
static const char *fn = "redisxInit";
Expand Down Expand Up @@ -715,6 +854,77 @@ Redis *redisxInit(const char *server) {
return redis;
}

/**
* Initializes a Redis client with a Sentinel configuration of alternate servers
*
* @param serviceName The service name as registered in the Sentinel server configuration.
* @param serverList An set of Sentinel servers to use to dynamically find the current master. A
* copy of the supplied name will be used, so the argument passed can be
* freely destroyed after the call.
* @param nServers The number of servers in the list
* @return X_SUCCESS (0) if successful, or else an error code &lt;0.
*
* @sa redisxInit()
* @sa redisxConnect()
*/
Redis *redisxInitSentinel(const char *serviceName, const RedisServer *serverList, int nServers) {
static const char *fn = "redisxInitSentinel";

Redis *redis;
RedisPrivate *p;
RedisSentinel *s;

if(!serverList) {
x_error(0, EINVAL, fn, "input serverList is NULL");
return NULL;
}
if(nServers < 1) {
x_error(0, EINVAL, fn, "invalid nServers: %d", nServers);
return NULL;
}

if(serverList[0].host == NULL) {
x_error(0, EINVAL, fn, "first server address is NULL");
return NULL;
}
if(!serverList[0].host[0]) {
x_error(0, EINVAL, fn, "first server address is empty");
return NULL;
}

redis = redisxInit(serverList[0].host);
if(!redis) return x_trace_null(fn, NULL);

p = (RedisPrivate *) redis->priv;
s = (RedisSentinel *) calloc(1, sizeof(RedisSentinel));
x_check_alloc(s);

s->servers = (RedisServer *) calloc(nServers, sizeof(RedisServer));
if(!s->servers) {
x_error(0, errno, fn, "alloc error (%d RedisServer)", nServers);
free(s);
return NULL;
}
memcpy(s->servers, serverList, nServers * sizeof(RedisServer));

s->nServers = nServers;
s->serviceName = xStringCopyOf(serviceName);

p->sentinel = s;

return redis;
}

static void rDestroySentinel(RedisSentinel *sentinel) {
if(!sentinel) return;

while(--sentinel->nServers >= 0) {
RedisServer *server = &sentinel->servers[sentinel->nServers];
if(server->host) free(server->host);
}
if(sentinel->serviceName) free(sentinel->serviceName);
}

/**
* Destroys a Redis intance, disconnecting any clients that may be connected, and freeing all resources
* used by that Redis instance.
Expand All @@ -734,16 +944,26 @@ void redisxDestroy(Redis *redis) {

for(i = REDISX_CHANNELS; --i >= 0; ) {
ClientPrivate *cp = (ClientPrivate *) p->clients[i].priv;
if(!cp) continue;

redisxDestroyRESP(cp->attributes);

pthread_mutex_destroy(&cp->readLock);
pthread_mutex_destroy(&cp->writeLock);
pthread_mutex_destroy(&cp->pendingLock);
if(cp != NULL) free(cp);

free(cp);
}

redisxDestroyRESP(p->helloData);
rDestroySentinel(p->sentinel);
free(p->clients);
free(p);

rUnregisterServer(redis);

if(redis->id) free(redis->id);

free(redis);
}

Expand Down

0 comments on commit e0845ad

Please sign in to comment.