diff --git a/include/redisx-priv.h b/include/redisx-priv.h index 1c71977..3bedfc9 100644 --- a/include/redisx-priv.h +++ b/include/redisx-priv.h @@ -53,8 +53,15 @@ typedef struct { RESP *attributes; ///< Attributes from the last packet received. } ClientPrivate; +typedef struct { + RedisServer *servers; ///< List of sentinel servers. + int nServers; ///< number of servers in list + char *serviceName; ///< Name of service (for Sentinel). +} RedisSentinel; + typedef struct { + RedisSentinel *sentinel; ///< Sentinel (high-availability) server configuration. uint32_t addr; ///< The 32-bit inet address int port; ///< port number (usually 6379) int dbIndex; ///< the zero-based database index diff --git a/include/redisx.h b/include/redisx.h index 7488d4e..d9c8680 100644 --- a/include/redisx.h +++ b/include/redisx.h @@ -195,6 +195,15 @@ typedef struct RedisEntry { int length; ///< Bytes in value. } RedisEntry; +/** + * Redis server host and port specification. + * + * @sa redisxInitSentinel() + */ +typedef struct RedisServer { + char *host; ///< The hostname or IP address of the server + int port; ///< The port number or <=0 to use the default 6379 +} RedisServer; /** * \brief Structure that represents a single Redis client connection instance. @@ -345,6 +354,7 @@ enum redisx_protocol redisxGetProtocol(Redis *redis); RESP *redisxGetHelloData(Redis *redis); Redis *redisxInit(const char *server); +Redis *redisxInitSentinel(const char *serviceName, const RedisServer *serverList, int nServers); int redisxCheckValid(const Redis *redis); void redisxDestroy(Redis *redis); int redisxConnect(Redis *redis, boolean usePipeline); diff --git a/src/redisx-net.c b/src/redisx-net.c index 25d1fca..0b479ef 100644 --- a/src/redisx-net.c +++ b/src/redisx-net.c @@ -29,6 +29,8 @@ #include "redisx-priv.h" static int rStartPipelineListenerAsync(Redis *redis); +static void rDisconnectClientAsync(RedisClient *cl); +static int rReconnectAsync(Redis *redis, boolean usePipeline); /// \cond PRIVATE /// @@ -85,6 +87,28 @@ static int hostnameToIP(const char *hostName, char *ip) { return x_error(X_NULL, ENODEV, fn, "no valid address for host %s", hostName); } +static int rSetServerAsync(Redis *redis, const char *desc, const char *hostname, int port) { + static const char *fn = "rSetServer"; + + RedisPrivate *p = (RedisPrivate *) redis; + char ipAddress[IP_ADDRESS_LENGTH] = {'\0'}; + int status; + + if(!hostname) return x_error(X_NULL, EINVAL, fn, "%s address is NULL", desc); + if(!hostname[0]) return x_error(X_NAME_INVALID, EINVAL, fn, "%s name is empty", desc); + + status = hostnameToIP(hostname, ipAddress); + if(status) return x_trace(fn, desc, status); + + p->addr = inet_addr((char *) ipAddress); + p->port = port > 0 ? port : 0; + + if(redis->id) free(redis->id); + redis->id = xStringCopyOf(ipAddress); + + return X_SUCCESS; +} + /** * Configure the Redis client sockets for optimal performance... * @@ -178,6 +202,128 @@ static int rAuthAsync(RedisClient *cl) { return X_SUCCESS; } +static int rRegisterServer(Redis *redis) { + ServerLink *l = (ServerLink *) calloc(1, sizeof(ServerLink)); + x_check_alloc(l); + l->redis = redis; + + pthread_mutex_lock(&serverLock); + l->next = serverList; + serverList = l; + pthread_mutex_unlock(&serverLock); + + return X_SUCCESS; +} + +static int rTryConnectSentinel(Redis *redis, int serverIndex) { + static const char *fn = "rTryConnectSentinel"; + + RedisPrivate *p = (RedisPrivate *) redis->priv; + RedisSentinel *s = p->sentinel; + RedisServer server = s->servers[serverIndex]; // A copy, not a reference... + char desc[80]; + int status; + + sprintf(desc, "sentinel server %d", serverIndex); + prop_error(fn, rSetServerAsync(redis, desc, server.host, server.port)); + + status = rConnectClient(redis, REDISX_INTERACTIVE_CHANNEL); + if(status != X_SUCCESS) return status; // No error propagation. It's OK if server is down... + + // Move server to the top of the list, so next time we try this one first... + memmove(&s->servers[1], s->servers, serverIndex * sizeof(RedisServer)); + s->servers[0] = server; + + return X_SUCCESS; +} + + +static int rDiscoverSentinel(Redis *redis) { + static const char *fn = "rConnectSentinel"; + + const RedisPrivate *p = (RedisPrivate *) redis->priv; + const RedisSentinel *s = p->sentinel; + int i; + + + for(i = 0; i < s->nServers; i++) if(rTryConnectSentinel(redis, i) == X_SUCCESS) { + RESP *reply; + int status; + + // Get the name of the master... + reply = redisxRequest(redis, "SENTINEL", "get-master-addr-by-name", s->serviceName, NULL, &status); + if(status) continue; + + rDisconnectClientAsync(redis->interactive); + + if(redisxCheckDestroyRESP(reply, RESP_ARRAY, 2) == X_SUCCESS) { + RESP **component = (RESP **) reply->value; + int port = (int) strtol((char *) component[1]->value, NULL, 10); + + status = rSetServerAsync(redis, "sentinel master", (char *) component[0]->value, port); + redisxDestroyRESP(reply); + + prop_error(fn, status); + return X_SUCCESS; + } + } + + return x_error(X_NO_SERVICE, ENOTCONN, fn, "no Sentinel server available"); +} + +static int rConfirmMasterRole(Redis *redis) { + static const char *fn = "rConfirmMasterRole"; + + RESP *reply, **component; + int status; + + // Try ROLE command first (available since Redis 4) + reply = redisxRequest(redis, "ROLE", NULL, NULL, NULL, &status); + prop_error(fn, status); + + if(redisxCheckDestroyRESP(reply, RESP_ARRAY, 0) != X_SUCCESS) { + // Fallback to using INFO replication... + char *str; + + reply = redisxRequest(redis, "INFO", "replication", NULL, NULL, &status); + prop_error(fn, status); + prop_error(fn, redisxCheckDestroyRESP(reply, RESP_BULK_STRING, 0)); + + str = strtok((char *) reply->value, "\n"); + + while(str) { + const char *tok = strtok(str, ":"); + + if(strcmp("role", tok) == 0) { + const char *role = strtok(NULL, "\n"); + + status = strcmp("master", role) == 0; + redisxDestroyRESP(reply); + + if(status) return x_error(X_FAILURE, EAGAIN, fn, "Replica is not master"); + return X_SUCCESS; + } + + str = strtok(NULL, "\n"); + } + + return x_error(X_FAILURE, EBADE, fn, "Got empty array response"); + } + + component = (RESP **) reply->value; + + if(reply->n < 1) { + redisxDestroyRESP(reply); + return x_error(X_FAILURE, EBADE, fn, "Got empty array response"); + } + + status = strcmp("master", (char *) component[0]->value) == 0; + redisxDestroyRESP(reply); + + if(status) return x_error(X_FAILURE, EAGAIN, fn, "Replica is not master"); + return X_SUCCESS; +} + /** * Same as connectRedis() except without the exlusive locking mechanism... * @@ -207,6 +353,11 @@ static int rConnectAsync(Redis *redis, boolean usePipeline) { return X_ALREADY_OPEN; } + if(p->sentinel) { + prop_error(fn, rDiscoverSentinel(redis)); + // TODO update sentinel server list... + } + if(!ip->isEnabled) { static int warnedInteractive; @@ -223,6 +374,10 @@ static int rConnectAsync(Redis *redis, boolean usePipeline) { warnedInteractive = FALSE; } + if(p->sentinel) { + if(rConfirmMasterRole(redis) != X_SUCCESS) prop_error(fn, rReconnectAsync(redis, usePipeline)); + } + if(usePipeline) { if(!pp->isEnabled) { static int warnedPipeline; @@ -578,7 +733,7 @@ int rConnectClient(Redis *redis, enum redisx_channel channel) { cp = (ClientPrivate *) cl->priv; serverAddress.sin_family = AF_INET; - serverAddress.sin_port = htons(REDISX_TCP_PORT); + serverAddress.sin_port = htons(p->port > 0 ? p->port : REDISX_TCP_PORT); serverAddress.sin_addr.s_addr = p->addr; memset(serverAddress.sin_zero, '\0', sizeof(serverAddress.sin_zero)); @@ -649,6 +804,8 @@ int rConnectClient(Redis *redis, enum redisx_channel channel) { * \return X_SUCCESS or * X_FAILURE if the IP address is invalid. * X_NULL if the IP address is NULL. + * + * @sa redisxInitSentinel() */ Redis *redisxInit(const char *server) { static const char *fn = "redisxInit"; @@ -656,65 +813,135 @@ Redis *redisxInit(const char *server) { Redis *redis; RedisPrivate *p; - ServerLink *l; int i; - char ipAddress[IP_ADDRESS_LENGTH]; if(server == NULL) { x_error(0, EINVAL, fn, "server name is NULL"); return NULL; } - if(hostnameToIP(server, ipAddress) < 0) return x_trace_null(fn, NULL); - if(!isInitialized) { // Initialize the thread attributes once only to avoid segfaulting... atexit(rShutdownAsync); isInitialized = TRUE; } + // Allocate Redis, including private data... p = (RedisPrivate *) calloc(1, sizeof(RedisPrivate)); x_check_alloc(p); + redis = (Redis *) calloc(1, sizeof(Redis)); + x_check_alloc(redis); + + redis->priv = p; + + // Try set server... + i = rSetServerAsync(redis, "server", server, 0); + if(i) { + free(redis->priv); + free(redis); + return x_trace_null(fn, NULL); + } + + // Initialize mutexes pthread_mutex_init(&p->configLock, NULL); pthread_mutex_init(&p->subscriberLock, NULL); + + p->protocol = REDISX_RESP2; // Default + p->timeoutMillis = REDISX_DEFAULT_TIMEOUT_MILLIS; + + // Create clients... p->clients = (RedisClient *) calloc(3, sizeof(RedisClient)); x_check_alloc(p->clients); - // Initialize the store access mutexes for each client channel. - for(i = REDISX_CHANNELS; --i >= 0; ) rInitClient(&p->clients[i], i); - - redis = (Redis *) calloc(1, sizeof(Redis)); - x_check_alloc(redis); + // Initialize clients. + for(i = REDISX_CHANNELS; --i >= 0; ) { + ClientPrivate *cp = (ClientPrivate *) p->clients[i].priv; + rInitClient(&p->clients[i], i); + cp->redis = redis; + } - redis->priv = p; + // Alias clients redis->interactive = &p->clients[REDISX_INTERACTIVE_CHANNEL]; redis->pipeline = &p->clients[REDISX_PIPELINE_CHANNEL]; redis->subscription = &p->clients[REDISX_SUBSCRIPTION_CHANNEL]; - redis->id = xStringCopyOf(ipAddress); - for(i = REDISX_CHANNELS; --i >= 0; ) { - ClientPrivate *cp = (ClientPrivate *) p->clients[i].priv; - cp->redis = redis; + rRegisterServer(redis); + + return redis; +} + +/** + * Initializes a Redis client with a Sentinel configuration of alternate servers + * + * @param serviceName The service name as registered in the Sentinel server configuration. + * @param serverList An set of Sentinel servers to use to dynamically find the current master. A + * copy of the supplied name will be used, so the argument passed can be + * freely destroyed after the call. + * @param nServers The number of servers in the list + * @return X_SUCCESS (0) if successful, or else an error code <0. + * + * @sa redisxInit() + * @sa redisxConnect() + */ +Redis *redisxInitSentinel(const char *serviceName, const RedisServer *serverList, int nServers) { + static const char *fn = "redisxInitSentinel"; + + Redis *redis; + RedisPrivate *p; + RedisSentinel *s; + + if(!serverList) { + x_error(0, EINVAL, fn, "input serverList is NULL"); + return NULL; + } + if(nServers < 1) { + x_error(0, EINVAL, fn, "invalid nServers: %d", nServers); + return NULL; } - p->addr = inet_addr((char *) ipAddress); - p->port = p->port > 0 ? p->port : REDISX_TCP_PORT; - p->protocol = REDISX_RESP2; // Default - p->timeoutMillis = REDISX_DEFAULT_TIMEOUT_MILLIS; + if(serverList[0].host == NULL) { + x_error(0, EINVAL, fn, "first server address is NULL"); + return NULL; + } + if(!serverList[0].host[0]) { + x_error(0, EINVAL, fn, "first server address is empty"); + return NULL; + } - l = (ServerLink *) calloc(1, sizeof(ServerLink)); - x_check_alloc(l); - l->redis = redis; + redis = redisxInit(serverList[0].host); + if(!redis) return x_trace_null(fn, NULL); - pthread_mutex_lock(&serverLock); - l->next = serverList; - serverList = l; - pthread_mutex_unlock(&serverLock); + p = (RedisPrivate *) redis->priv; + s = (RedisSentinel *) calloc(1, sizeof(RedisSentinel)); + x_check_alloc(s); + + s->servers = (RedisServer *) calloc(nServers, sizeof(RedisServer)); + if(!s->servers) { + x_error(0, errno, fn, "alloc error (%d RedisServer)", nServers); + free(s); + return NULL; + } + memcpy(s->servers, serverList, nServers * sizeof(RedisServer)); + + s->nServers = nServers; + s->serviceName = xStringCopyOf(serviceName); + + p->sentinel = s; return redis; } +static void rDestroySentinel(RedisSentinel *sentinel) { + if(!sentinel) return; + + while(--sentinel->nServers >= 0) { + RedisServer *server = &sentinel->servers[sentinel->nServers]; + if(server->host) free(server->host); + } + if(sentinel->serviceName) free(sentinel->serviceName); +} + /** * Destroys a Redis intance, disconnecting any clients that may be connected, and freeing all resources * used by that Redis instance. @@ -736,14 +963,16 @@ void redisxDestroy(Redis *redis) { ClientPrivate *cp = (ClientPrivate *) p->clients[i].priv; if(!cp) continue; + redisxDestroyRESP(cp->attributes); + pthread_mutex_destroy(&cp->readLock); pthread_mutex_destroy(&cp->writeLock); pthread_mutex_destroy(&cp->pendingLock); - redisxDestroyRESP(cp->attributes); free(cp); } + rDestroySentinel(p->sentinel); redisxDestroyRESP(p->helloData); redisxClearConnectHooks(redis); redisxClearSubscribers(redis); diff --git a/src/redisx.c b/src/redisx.c index 97a827c..8705e33 100644 --- a/src/redisx.c +++ b/src/redisx.c @@ -574,7 +574,11 @@ RESP *redisxRequest(Redis *redis, const char *command, const char *arg1, const c reply = redisxArrayRequest(redis, (char **) args, NULL, n, &s); if(status) *status = s; - if(s) x_trace_null("redisxRequest", NULL); + + if(s) { + redisxDestroyRESP(reply); + return x_trace_null("redisxRequest", NULL); + } return reply; }