Skip to content

Commit

Permalink
Sentinel and cluster deadlock fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Jan 12, 2025
1 parent 1c30f1c commit 185dd83
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 45 deletions.
4 changes: 3 additions & 1 deletion include/redisx-priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ int rConfigLock(Redis *redis);
int rConfigUnlock(Redis *redis);

// in redisx-net.c ------------------------>
int rConnectClient(Redis *redis, enum redisx_channel channel);
int rConnectAsync(Redis *redis, boolean usePipeline);
int rConnectClientAsync(Redis *redis, enum redisx_channel channel);
void rCloseClient(RedisClient *cl);
void rCloseClientAsync(RedisClient *cl);
boolean rIsLowLatency(const ClientPrivate *cp);
int rCheckClient(const RedisClient *cl);
int rSetServerAsync(Redis *redis, const char *desc, const char *hostname, int port);
void rDisconnectAsync(Redis *redis);

// in redisx-hooks.c ---------------------->
Hook *rCopyHooks(const Hook *list, Redis *owner);
Expand Down
51 changes: 34 additions & 17 deletions src/redisx-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ static void rDiscardShardsAsync(RedisShard *shards, int n_shards) {
}

/**
* Returns the current cluster configuration obtained from the specified node
* Returns the current cluster configuration obtained from the specified node. The caller must have
* an exclusive lock on the Redis configuration mutex.
*
* @param redis The node to use for discovery. It need not be in a connected state.
* @param[out] n_shards Pointer to integer in which to return the number of shards discovered
Expand All @@ -156,20 +157,27 @@ static void rDiscardShardsAsync(RedisShard *shards, int n_shards) {
static RedisShard *rClusterDiscoverAsync(Redis *redis, int *n_shards) {
static const char *fn = "rClusterDiscoverAsync";

RESP *reply;
RESP *reply = NULL;
RedisShard *shards = NULL;
int isConnected;

isConnected = redisxIsConnected(redis);

if(!isConnected) {
*n_shards = redisxConnect(redis, FALSE);
*n_shards = rConnectClientAsync(redis, REDISX_INTERACTIVE_CHANNEL);
if(*n_shards) return x_trace_null(fn, NULL);
}


*n_shards = redisxLockConnected(redis->interactive);
if(*n_shards) return x_trace_null(fn, NULL);

xvprintf("Redis-X> Discovering cluster configuration...\n");

reply = redisxRequest(redis, "CLUSTER", "SLOTS", NULL, NULL, n_shards);
*n_shards = redisxSendRequestAsync(redis->interactive, "CLUSTER", "SLOTS", NULL, NULL);
if(*n_shards == X_SUCCESS) reply = redisxReadReplyAsync(redis->interactive, n_shards);
redisxUnlockClient(redis->interactive);

if(*n_shards) {
redisxDestroyRESP(reply);
return x_trace_null(fn, NULL);
Expand Down Expand Up @@ -206,20 +214,20 @@ static RedisShard *rClusterDiscoverAsync(Redis *redis, int *n_shards) {
}

for(m = 0; m < s->n_servers; s++) {
Redis *r;
RESP **node = (RESP **) desc[2]->value;
Redis *r = s->redis[m] = redisxInit((char *) node[0]->value);
RedisPrivate *p = (RedisPrivate *) r->priv;

r = s->redis[m] = redisxInit((char *) node[0]->value);
redisxSetPort(s->redis[m], node[1]->n);
rCopyConfig(&((RedisPrivate *) redis->priv)->config, r);
redisxSelectDB(r, 0); // Only DB 0 is allowed for clusters.
p->port = node[1]->n;
rCopyConfig(&p->config, r);
p->config.dbIndex = 0; // Only DB 0 is allowed for clusters.
}
}
}

redisxDestroyRESP(reply);

if(!isConnected) redisxDisconnect(redis);
if(!isConnected) rDisconnectAsync(redis);

if(*n_shards < 0) x_trace(fn, NULL, *n_shards);

Expand All @@ -229,7 +237,8 @@ static RedisShard *rClusterDiscoverAsync(Redis *redis, int *n_shards) {
/**
* Sets a new set of shards for a cluster. All servers in the shards will have the cluster registered
* as a parent, so they may all initiate reconfiguration if the hashes have `MOVED`. Normally this
* should be called after rClusterDiscoverAsync()
* should be called after rClusterDiscoverAsync(). The caller must have an exclusive lock on the Redis
* configuration mutex.
*
* @param cluster Pointer to a Redis cluster configuration
* @param shard New array of shards to set
Expand All @@ -250,11 +259,8 @@ static void rClusterSetShardsAsync(RedisCluster *cluster, RedisShard *shard, int
int m;
for(m = 0; m < s->n_servers; m++) {
Redis *r = s->redis[m];
RedisPrivate *np;
if(rConfigLock(r) != X_SUCCESS) continue;
np = (RedisPrivate *) r->priv;
RedisPrivate *np = (RedisPrivate *) r->priv;
np->cluster = cluster;
rConfigUnlock(r);
}
}

Expand Down Expand Up @@ -402,7 +408,13 @@ Redis *redisxClusterGetShard(RedisCluster *cluster, const char *key) {

for(m = 0; m < s->n_servers; m++) {
Redis *r = s->redis[m];
if(!redisxIsConnected(r)) if(redisxConnect(r, p->usePipeline) != X_SUCCESS) continue;

if(rConfigLock(r) != X_SUCCESS) continue;
if(!redisxIsConnected(r)) if(rConnectAsync(r, p->usePipeline) != X_SUCCESS) {
rConfigUnlock(r);
continue;
}
rConfigUnlock(r);
pthread_mutex_unlock(&p->mutex);
return r;
}
Expand Down Expand Up @@ -471,7 +483,12 @@ static Redis *rClusterGetShardByAddress(RedisCluster *cluster, const char *host,
const RedisPrivate *np = (RedisPrivate *) r->priv;

if(np->port == port && strcmp(np->hostname, host) == 0) {
if(!redisxIsConnected(r)) if(redisxConnect(r, p->usePipeline) != X_SUCCESS) continue;
if(rConfigLock(r) != X_SUCCESS) continue;
if(!redisxIsConnected(r)) if(rConnectAsync(r, p->usePipeline) != X_SUCCESS) {
rConfigUnlock(r);
continue;
}
rConfigUnlock(r);
pthread_mutex_unlock(&p->mutex);
return r;
}
Expand Down
30 changes: 17 additions & 13 deletions src/redisx-net.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

/// \cond PRIVATE
extern int rDiscoverSentinelAsync(Redis *redis);
extern int rConfirmMasterRole(Redis *redis);
extern int rConfirmMasterRoleAsync(Redis *redis);
extern void rDestroySentinel(RedisSentinel *sentinel);
/// \endcond

Expand Down Expand Up @@ -237,7 +237,7 @@ static int rRegisterServer(Redis *redis) {
}

/**
* Same as connectRedis() except without the exlusive locking mechanism...
* Same as rConnectClient() but called with the client's mutex already locked.
*
* \param redis Pointer to a Redis instance.
* \param usePipeline TRUE (non-zero) if a pipeline client should be connected also, or FALSE to create an interactive
Expand All @@ -248,10 +248,10 @@ static int rRegisterServer(Redis *redis) {
* X_NO_SERVICE if there was an error connecting to Redis,
* or else an error (&lt;0) returned by rConnectClientAsync().
*
* \sa rConnectClientAsync()
* \sa rConnectClient()
*
*/
static int rConnectAsync(Redis *redis, boolean usePipeline) {
int rConnectAsync(Redis *redis, boolean usePipeline) {
static const char *fn = "rConnectAsync";

int status = X_SUCCESS;
Expand All @@ -271,7 +271,7 @@ static int rConnectAsync(Redis *redis, boolean usePipeline) {
static int warnedInteractive;

xvprintf("Redis-X> Connect interactive client.\n");
status = rConnectClient(redis, REDISX_INTERACTIVE_CHANNEL);
status = rConnectClientAsync(redis, REDISX_INTERACTIVE_CHANNEL);

if(status) {
if(!warnedInteractive) {
Expand All @@ -284,15 +284,15 @@ static int rConnectAsync(Redis *redis, boolean usePipeline) {
}

if(p->sentinel) {
if(rConfirmMasterRole(redis) != X_SUCCESS) prop_error(fn, rReconnectAsync(redis, usePipeline));
if(rConfirmMasterRoleAsync(redis) != X_SUCCESS) prop_error(fn, rReconnectAsync(redis, usePipeline));
}

if(usePipeline) {
if(!pp->isEnabled) {
static int warnedPipeline;

xvprintf("Redis-X> Connect pipeline client.\n");
status = rConnectClient(redis, REDISX_PIPELINE_CHANNEL);
status = rConnectClientAsync(redis, REDISX_PIPELINE_CHANNEL);

if(status) {
if(!warnedPipeline) {
Expand Down Expand Up @@ -395,12 +395,13 @@ void rCloseClient(RedisClient *cl) {
/// \endcond

/**
* Same as disconnectRedis() except without the exlusive locking mechanism...
* Same as rCloseClient() except without the exlusive locking mechanism of the client's
* IO.
*
* \param redis Pointer to a Redis instance.
*
*/
static void rDisconnectAsync(Redis *redis) {
void rDisconnectAsync(Redis *redis) {
RedisPrivate *p = (RedisPrivate *) redis->priv;
Hook *f;

Expand Down Expand Up @@ -605,7 +606,8 @@ static int rHelloAsync(RedisClient *cl, char *clientID) {


/**
* Connects the specified Redis client to the Redis server.
* Connects the specified Redis client to the Redis server. It should be called with the the configuration
* mutex of the Redis instance locked.
*
* \param redis Pointer to a Redis instance.
* \param channel REDISX_INTERACTIVE_CHANNEL, REDISX_PIPELINE_CHANNEL, or REDISX_SUBSCRIPTION_CHANNEL
Expand All @@ -617,12 +619,14 @@ static int rHelloAsync(RedisClient *cl, char *clientID) {
* X_NAME_INVALID if the redis server address is invalid.
* X_ALREADY_OPEN if the client on that channels is already connected.
* X_NO_SERVICE if the socket or connection could not be opened.
*
* @sa rConfigLock()
*/
int rConnectClient(Redis *redis, enum redisx_channel channel) {
int rConnectClientAsync(Redis *redis, enum redisx_channel channel) {
static const char *fn = "rConnectClient";

#if WITH_TLS
extern int rConnectTLSClient(ClientPrivate *cp, const TLSConfig *tls);
extern int rConnectTLSClientAsync(ClientPrivate *cp, const TLSConfig *tls);
#endif

struct sockaddr_in serverAddress;
Expand Down Expand Up @@ -661,7 +665,7 @@ int rConnectClient(Redis *redis, enum redisx_channel channel) {
}

#if WITH_TLS
if(config->tls.enabled && rConnectTLSClient(cp, &config->tls) != X_SUCCESS) {
if(config->tls.enabled && rConnectTLSClientAsync(cp, &config->tls) != X_SUCCESS) {
close(sock);
return x_error(X_NO_INIT, errno, fn, "failed to connect (with TLS) to %s:%hu: %s", redis->id, port, strerror(errno));
}
Expand Down
39 changes: 27 additions & 12 deletions src/redisx-sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

/**
* Attemps to connect to a given Redis sentinel server. The Redis instance is assumed to be
* unconnected at the time of the call.
* unconnected at the time of the call, and the caller must have an exclusive lock on the
* Redis configuration mutex.
*
* @param redis A Redis server instance
* @param serverIndex the current array index of the server among the sentinels
* @return X_SUCCESS (0) if successful, or else an error code &lt;0.
*/
static int rTryConnectSentinel(Redis *redis, int serverIndex) {
static int rTryConnectSentinelAsync(Redis *redis, int serverIndex) {
static const char *fn = "rTryConnectSentinel";

RedisPrivate *p = (RedisPrivate *) redis->priv;
Expand All @@ -32,15 +33,12 @@ static int rTryConnectSentinel(Redis *redis, int serverIndex) {
char desc[80];
int status;

// Since we'll set a new address / port, we should ensure we aren't connected to something else...
redisxDisconnect(redis);

sprintf(desc, "sentinel server %d", serverIndex);
xvprintf("Redis-X> Connect to %s.\n", desc);

prop_error(fn, rSetServerAsync(redis, desc, server.host, server.port));

status = rConnectClient(redis, REDISX_INTERACTIVE_CHANNEL);
status = rConnectClientAsync(redis, REDISX_INTERACTIVE_CHANNEL);
if(status != X_SUCCESS) return status; // No error propagation. It's OK if server is down...

return X_SUCCESS;
Expand All @@ -50,6 +48,7 @@ static int rTryConnectSentinel(Redis *redis, int serverIndex) {

/**
* Moves a server from its current position in the Sentinel server list to the top.
* the caller must have an exclusive lock on the Redis configuration mutex.
*
* @param s The Redis Sentinel configuration
* @param idx The current zero-based index of the server in the list
Expand All @@ -74,6 +73,16 @@ static int rSetTopSentinelAsync(RedisSentinel *s, int idx) {
return X_SUCCESS;
}

/**
* Moves or adds the master server, specified by host name and port number, to the top of
* the list of known Sentinel servers, so next time we try it first when connecting.
* The caller must have an exclusive lock on the Redis configuration mutex.
*
* @param s Redis Sentinel configuration
* @param hostname Host name or IP address of the master node
* @param port Port number of master server on node
* @return X_SUCCESS (0) if successful, or else an error code &lt;0.
*/
static int rIncludeMasterAsync(RedisSentinel *s, char *hostname, int port) {
int i;
void *old = s->servers;
Expand Down Expand Up @@ -111,7 +120,7 @@ static int rIncludeMasterAsync(RedisSentinel *s, char *hostname, int port) {
/**
* Verifies that a given Redis instance is in the master role. Users should always communicate
* to the master, not to replicas. It assumes that we have a live interactive connection to the
* server already.
* server already, and the caller must have an exclusive lock on the Redis configuration mutex.
*
* Upon successful return the current master is added or moved to the top of the sentinel server
* list, so it is the first to try next time.
Expand All @@ -120,14 +129,17 @@ static int rIncludeMasterAsync(RedisSentinel *s, char *hostname, int port) {
* @return X_SUCCESS (0) if the server is confirmed to be the master, or else an error
* code &lt;0.
*/
int rConfirmMasterRole(Redis *redis) {
int rConfirmMasterRoleAsync(Redis *redis) {
static const char *fn = "rConfirmMasterRole";

RESP *reply, **component;
int status;

// Try ROLE command first (available since Redis 4)
reply = redisxRequest(redis, "ROLE", NULL, NULL, NULL, &status);
status = redisxSendRequestAsync(redis->interactive, "ROLE", NULL, NULL, NULL);
prop_error(fn, status);

reply = redisxReadReplyAsync(redis->interactive, &status);
prop_error(fn, status);

if(redisxCheckDestroyRESP(reply, RESP_ARRAY, 0) != X_SUCCESS) {
Expand Down Expand Up @@ -185,15 +197,18 @@ int rDiscoverSentinelAsync(Redis *redis) {

xvprintf("Redis-X> Looking for the Sentinel master...\n");

for(i = 0; i < s->nServers; i++) if(rTryConnectSentinel(redis, i) == X_SUCCESS) {
for(i = 0; i < s->nServers; i++) if(rTryConnectSentinelAsync(redis, i) == X_SUCCESS) {
RESP *reply;
int status;

// Check if this is master...
if(rConfirmMasterRole(redis)) goto success; // @suppress("Goto statement used")
if(rConfirmMasterRoleAsync(redis)) goto success; // @suppress("Goto statement used")

// Get the name of the master...
reply = redisxRequest(redis, "SENTINEL", "get-master-addr-by-name", s->serviceName, NULL, &status);
status = redisxSendRequestAsync(redis->interactive, "SENTINEL", "get-master-addr-by-name", s->serviceName, NULL);
if(status) continue;

reply = redisxReadReplyAsync(redis->interactive, &status);
if(status) continue;

if(redisxCheckDestroyRESP(reply, RESP_ARRAY, 2) == X_SUCCESS) {
Expand Down
2 changes: 1 addition & 1 deletion src/redisx-sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static int rConnectSubscriptionClientAsync(Redis *redis) {
}

xvprintf("Redis-X> Connect pub/sub client.\n");
status = rConnectClient(redis, REDISX_SUBSCRIPTION_CHANNEL);
status = rConnectClientAsync(redis, REDISX_SUBSCRIPTION_CHANNEL);
if(status) {
xvprintf("ERROR! Redis-X : subscription client connection failed: %s\n", redisxErrorDescription(status));
return x_trace(fn, NULL, X_NO_SERVICE);
Expand Down
4 changes: 3 additions & 1 deletion src/redisx-tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void rDestroyClientTLS(ClientPrivate *cp) {
* @param tls TLS configuration.
* @return X_SUCCESS (0) if successful, or else an error code &lt;0.
*/
int rConnectTLSClient(ClientPrivate *cp, const TLSConfig *tls) {
int rConnectTLSClientAsync(ClientPrivate *cp, const TLSConfig *tls) {
static const char *fn = "rConnectClientTLS";
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

Expand Down Expand Up @@ -327,6 +327,7 @@ int redisxSetTLS(Redis *redis, const char *ca_path, const char *ca_file) {
return X_SUCCESS;
#else
(void) redis;
(void) ca_path;
(void) ca_file;

return x_error(X_FAILURE, ENOSYS, fn, "RedisX was built without TLS support");
Expand Down Expand Up @@ -566,6 +567,7 @@ int redisxSetTLSSkipVerify(Redis *redis, boolean value) {
return X_SUCCESS;
#else
(void) redis;
(void) value;

return x_error(X_FAILURE, ENOSYS, fn, "RedisX was built without TLS support");
#endif
Expand Down

0 comments on commit 185dd83

Please sign in to comment.