Skip to content

Commit

Permalink
ASK redirection and automatic interactive redirections
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Jan 8, 2025
1 parent 5abd45e commit 39c6fde
Show file tree
Hide file tree
Showing 9 changed files with 497 additions and 94 deletions.
32 changes: 29 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1478,13 +1478,30 @@ You can start using the cluster right away. You can obtain a connected `Redis` i
// Run your query on using the given Redis key / keys.
RESP *reply = redisxRequest(shard, "GET", key, NULL, NULL, &status);
...
```

The interactive queries handle both `MOVED` and `ASK` redirections automatically. However, asynchronous queries do not
since they return before receiving a response. Thus, when using `redisxReadReplyAsync()` later to process replies, you
should check for redirections:

```c
RESP *reply = redisxReadReplyAsync(...);

if(redisxClusterMoved(reply)) {
// The key is now served by another shard.
// You might want to obtain the new shard and try again...
// You might want to obtain the new shard and repeat the failed
// transaction again (interactively or pipelined)...
...
}
if(redisxClusterIsMigrating(reply)) {
// The key's slot is currently migrating. You may try the redirected
// address indicated in the reply, with the ASKING command, e.g. via a
// redisxClusterAskMigrating() interactive transaction.
...
}
...

```
Finally, when you are done using the cluster, simply discard it:
Expand All @@ -1498,7 +1515,7 @@ Finally, when you are done using the cluster, simply discard it:
### Detecting cluster reconfiguration

In the above example we have shown one way you might check for errors that result from cluster being reconfigured
on-the-fly, using `redisxClusterMoved()` on the `RESP` reply obtained from the shard.
on-the-fly, using `redisxClusterMoved()` and/or `redisxClusterIsMigrating()` on the `RESP` reply obtained from the shard.

Equivalently, you might use `redisxCheckRESP()` or `redisxCheckDestroyRESP()` also for detecting a cluster
reconfiguration. Both of these will return a designated `REDIS_MOVED` error code if the keyword is now served on
Expand All @@ -1511,13 +1528,22 @@ another shard:
// The key is now served by another shard.
...
}
if(s == REDIS_MIGRATING) {
// The key is migrating and may be accessed from new location via an ASKING directive
...
}
if(s != X_SUCCESS) {
// The reply is no good for some other reason...
...
}
...
```
To help managing migration errors, we provide `redisxClusterGetRedirection()` to obtain the redirected Redis instance based
on the redirection `RESP`. Once the redirected server shard is identified you may either resubmit the same query as before
(e.h. with `redisxSendArrayRequest()`) if `MOVED`, or else repeat the query via an interactive `ASKING` directive using
`redisxClusterAskMigrating()`.
A `REDIS_MOVED` error code will be returned by higher-level functions also, which ingest the `RESP` replies from the
shard and return a digested error code. For example, `redisxGetStringValue()` will set the output `len` value to
`REDIS_MOVED` if the value could not be obtained because of a cluster reconfiguration.
Expand Down
6 changes: 6 additions & 0 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ enum resp_type {
#define REDIS_UNEXPECTED_RESP (-105) ///< \hideinitializer Got a Redis response of a different type than expected
#define REDIS_UNEXPECTED_ARRAY_SIZE (-106) ///< \hideinitializer Got a Redis response with different number of elements than expected.
#define REDIS_MOVED (-107) ///< \hideinitializer The requested key has moved to another cluster shard.
#define REDIS_MIGRATING (-108) ///< \hideinitializer The requested key is importing, and you may query with ASKED on the specified node.

/**
* RedisX channel IDs. RedisX uses up to three separate connections to the server: (1) an interactive client, in which
Expand Down Expand Up @@ -419,9 +420,12 @@ int redisxSetTLSSkipVerify(Redis *redis, boolean value);
RedisCluster *redisxClusterInit(Redis *node);
Redis *redisxClusterGetShard(RedisCluster *cluster, const char *key);
boolean redisxClusterMoved(const RESP *reply);
boolean redisxClusterIsMigrating(const RESP *reply);
int redisxClusterConnect(RedisCluster *cluster);
int redisxClusterDisconnect(RedisCluster *cluster);
void redisxClusterDestroy(RedisCluster *cluster);
Redis *redisxClusterGetRedirection(RedisCluster *cluster, const RESP *redirect, boolean refresh);
RESP *redisxClusterAskMigrating(Redis *redis, const char **args, const int *lengths, int n, int *status);

int redisxPing(Redis *redis, const char *message);
enum redisx_protocol redisxGetProtocol(Redis *redis);
Expand Down Expand Up @@ -507,6 +511,7 @@ int redisxUnlockClient(RedisClient *cl);
// Asynchronous access routines (use within redisxLockClient()/ redisxUnlockClient() blocks)...
int redisxSendRequestAsync(RedisClient *cl, const char *command, const char *arg1, const char *arg2, const char *arg3);
int redisxSendArrayRequestAsync(RedisClient *cl, const char **args, const int *length, int n);
int redisxClusterAskMigratingAsync(RedisClient *cl, const char **args, const int *lengths, int n);
int redisxSetValueAsync(RedisClient *cl, const char *table, const char *key, const char *value, boolean confirm);
int redisxMultiSetAsync(RedisClient *cl, const char *table, const RedisEntry *entries, int n, boolean confirm);
RESP *redisxReadReplyAsync(RedisClient *cl, int *pStatus);
Expand All @@ -516,6 +521,7 @@ int redisxIgnoreReplyAsync(RedisClient *cl);
int redisxSkipReplyAsync(RedisClient *cl);
int redisxPublishAsync(Redis *redis, const char *channel, const char *data, int length);


// Error generation with stderr message...
int redisxError(const char *func, int errorCode);
const char* redisxErrorDescription(int code);
Expand Down
1 change: 0 additions & 1 deletion src/redisx-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,6 @@ int redisxSendArrayRequestAsync(RedisClient *cl, const char **args, const int *l
// Send the number of string elements in the command...
L = sprintf(buf, "*%d\r\n", n);


xvprintf("Redis-X> request[%d]", n);
for(i = 0; i < n; i++) {
if(args[i]) xvprintf(" %s", args[i]);
Expand Down
246 changes: 243 additions & 3 deletions src/redisx-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,65 @@ Redis *redisxClusterGetShard(RedisCluster *cluster, const char *key) {
return NULL;
}

/// \cond PRIVATE
static Redis *rClusterGetShardByAddress(RedisCluster *cluster, const char *host, int port, boolean refresh) {
static const char *fn = "redisxClusterGetShard";

ClusterPrivate *p;
int i;

if(!cluster) {
x_error(X_NULL, EINVAL, fn, "cluster is NULL");
return NULL;
}

if(!host) {
x_error(X_NAME_INVALID, EINVAL, fn, "address is NULL");
return NULL;
}

if(!host[0]) {
x_error(X_NAME_INVALID, EINVAL, fn, "address is empty");
return NULL;
}

p = (ClusterPrivate *) cluster->priv;
if(!p) {
x_error(X_NO_INIT, ENXIO, fn, "cluster is not initialized");
return NULL;
}

pthread_mutex_lock(&p->mutex);

for(i = 0; i < p->n_shards; i++) {
const RedisShard *s = &p->shard[i];
int m;

for(m = 0; m < s->n_servers; m++) {
Redis *r = s->redis[m];
const RedisPrivate *np = (RedisPrivate *) r->priv;

if(np->port == port && strcmp(np->hostname, host) == 0) {
if(!redisxIsConnected(r)) if(redisxConnect(r, p->usePipeline) != X_SUCCESS) continue;
pthread_mutex_unlock(&p->mutex);
return r;
}
}
}

pthread_mutex_unlock(&p->mutex);

if(refresh) {
rClusterRefresh(cluster);
return rClusterGetShardByAddress(cluster, host, port, FALSE);
}

x_error(0, EAGAIN, fn, "not a known member of the cluster: %s:%d", host, port);
return NULL;
}

/// \endcond

/**
* Initializes a Redis cluster configuration using a known cluster node. The call will connect to
* the specified node (if not already connected), and will query the cluster configuration from it.
Expand Down Expand Up @@ -587,14 +646,15 @@ int redisxClusterDisconnect(RedisCluster *cluster) {

/**
* Checks if the reply is an error indicating that the cluster has been reconfigured and
* the request can no longer be fulfilled on the given shard. You might want to obtain
* the new shard using redisxClusterGetShard() again, and re-submit the request to the
* new shard.
* the request can no longer be fulfilled on the given shard (i.e., `MOVED` redirection).
* You might want to obtain the new shard using redisxClusterGetShard() again, and re-submit
* the request to the new shard.
*
* @param reply The response obtained from the Redis shard / server.
* @return TRUE (1) if the reply is an error indicating that the cluster has been
* reconfigured and the key has moved to another shard.
*
* @sa redisxClusterIsMigrating()
* @sa redisxClusterGetShard()
*/
boolean redisxClusterMoved(const RESP *reply) {
Expand All @@ -603,3 +663,183 @@ boolean redisxClusterMoved(const RESP *reply) {
if(reply->n < 5) return FALSE;
return (strncmp("MOVED", (char *) reply->value, 5) == 0);
}

/**
* Checks if the reply is an error indicating that the query is for a slot that is currently
* migrating to another shard (i.e., `ASK` redirection). You may need to use an `ASKING`
* directive, e.g. via redisxClusterAskMigrating() on the node specified in the message to
* access the key.
*
* @param reply The response obtained from the Redis shard / server.
* @return TRUE (1) if the reply is an error indicating that the cluster has been
* reconfigured and the key has moved to another shard.
*
* @sa redisxClusterMoved()
* @sa redisxClusterAskMigrating()
*/
boolean redisxClusterIsMigrating(const RESP *reply) {
if(!reply) return FALSE;
if(reply->type != RESP_ERROR) return FALSE;
if(reply->n < 3) return FALSE;
return (strncmp("ASK", (char *) reply->value, 3) == 0);
}

/**
* Parses a `-MOVED` or `-ASK` redirection response from a Redis cluster node, to obtain
* the shard from which the same keyword that caused the error can now be accessed.
*
* @param cluster Redis cluster configuration
* @param redirect the redirection response sent to a keyword query
* @param refresh whether it should refresh the cluster configuration and try again if the
* redirection target is not found in the current cluster configuration.
* @return the migrated server, from which the keyword should be queried now.
*
* @sa redisxClusterMoved()
* @sa redisxClusterIsMigrating()
* @sa redisxClusterAskMigrating()
*/
Redis *redisxClusterGetRedirection(RedisCluster *cluster, const RESP *redirect, boolean refresh) {
static const char *fn = "redisxClusterGetRedirection";

char *str, *tok;

if(!cluster) {
x_error(0, EINVAL, fn, "input cluster is NULL");
return NULL;
}

if(!redisxClusterMoved(redirect) && !redisxClusterIsMigrating(redirect)) {
return NULL;
}

str = xStringCopyOf((char *) redirect->value);
x_check_alloc(str);

strtok(str, " \t\r\n"); // MOVED or ASK
strtok(NULL, " \t\r\n"); // SLOT #
tok = strtok(NULL, ":"); // host:port
if(tok) {
const char *host = tok;
int port = strtol(strtok(NULL, " \t\r\n"), NULL, 10);
free(str);
return rClusterGetShardByAddress(cluster, host, port, refresh);
}

x_error(X_PARSE_ERROR, EBADMSG, fn, "Unparseable migration reply: %s", str);

free(str);
return NULL;
}

/**
* Makes a redirected transaction using the ASKING directive to the specific client. This should be
* in response to an -ASK redirection error to obtain a key that is in a slot that is currently
* migrating. The requested Redis command arguments are sent prefixed with the 'ASKING' directive,
* as per the Redis Cluster specification.
*
* @param redis Redirected Redis instance, e.g. from redisxClusterGetRedirect()
* @param args Original command arguments that were redirected
* @param lengths Original argument byte lengths redirected (or NULL to use strlen() automatically).
* @param n Original number of arguments.
* @param status Pointer to integer in which to return status: X_SUCCESS (0) if successful or
* else and error code &lt;0.
* @return The response to the `ASKING` query from the redirected server.
*
* @sa redisxClusterAskMigratingAsync()
* @sa redisxClusterIsMigrating()
* @sa redisxClusterGetRedirect()
* @sa redisxArrayRequest()
*/
RESP *redisxClusterAskMigrating(Redis *redis, const char **args, const int *lengths, int n, int *status) {
static const char *fn = "redisxClusterAskMigrating";

RedisClient *cl;
RESP *reply = NULL;
int s;

s = redisxCheckValid(redis);
if(s != X_SUCCESS) {
if(status) *status = s;
return x_trace_null(fn, NULL);
}

cl = redis->interactive;
s = redisxLockConnected(cl);
if(s) {
if(status) *status = s;
return x_trace_null(fn, NULL);
}

redisxClearAttributesAsync(cl);

s = redisxClusterAskMigratingAsync(cl, args, lengths, n);
if(s == X_SUCCESS) reply = redisxReadReplyAsync(cl, &s);
redisxUnlockClient(cl);

if(s != X_SUCCESS) {
if(status) *status = s;
x_trace_null(fn, NULL);
}

return reply;
}


/**
* Makes a redirected request using the ASKING directive to the specific client. This should be
* in response to an -ASK redirection error to obtain a key that is in a slot that is currently
* migrating. The requested Redis command arguments are sent prefixed with the 'ASKING' directive,
* as per the Redis Cluster specification.
*
* This function should be called with exclusive access to the client.
*
* @param cl Locked client on a redirected Redis instance, e.g. from redisxClusterGetRedirect()
* @param args Original command arguments that were redirected
* @param lengths Original argument byte lengths redirected (or NULL to use strlen() automatically).
* @param n Original number of arguments.
* @return X_SUCCESS (0) if successful or else and error code &lt;0.
*
* @sa redisxClusterAskMigrating()
* @sa redisxClusterIsMigrating()
* @sa redisxClusterGetRedirect()
* @sa redisxArrayRequest()
*/
int redisxClusterAskMigratingAsync(RedisClient *cl, const char **args, const int *lengths, int n) {
static const char *fn = "redisxClusterAskMigratingAsync";

const ClientPrivate *cp;
const char **askargs = NULL;
int *asklen = NULL;
int status = X_SUCCESS;

prop_error(fn, rCheckClient(cl));

cp = (ClientPrivate *) cl->priv;
if(!cp->isEnabled) return x_error(X_NO_SERVICE, ENOTCONN, fn, "client is not connected");

if(!args) return x_error(X_NULL, EINVAL, fn, "input args is NULL");

askargs = (const char **) malloc((n + 1) * sizeof(char *));
if(!askargs) return x_error(X_FAILURE, errno, fn, "alloc error (%d char *)", (n + 1));

if(lengths) {
asklen = (int *) malloc((n + 1) * sizeof(char *));
if(!asklen) {
free(askargs);
return x_error(X_FAILURE, errno, fn, "alloc error (%d int)", (n + 1));
}

asklen[0] = 0;
memcpy(&asklen[1], lengths, n * sizeof(int));
}

askargs[0] = xStringCopyOf("ASKING");
memcpy(&askargs[1], args, n * sizeof(char *));

status = redisxSendArrayRequestAsync(cl, askargs, asklen, n + 1);

if(asklen) free(asklen);
free(askargs);

return status;
}
Loading

0 comments on commit 39c6fde

Please sign in to comment.