Skip to content

Commit

Permalink
redisx-cli: background processing of subscriptions, push messages
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Jan 8, 2025
1 parent 4983991 commit a83d75b
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 20 deletions.
2 changes: 2 additions & 0 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ void redisxClearDisconnectHooks(Redis *redis);
RESP *redisxRequest(Redis *redis, const char *command, const char *arg1, const char *arg2, const char *arg3, int *status);
RESP *redisxArrayRequest(Redis *redis, const char **args, const int *length, int n, int *status);
RESP *redisxGetAttributes(Redis *redis);
int redisxGetAvailable(RedisClient *cl);
int redisxSetValue(Redis *redis, const char *table, const char *key, const char *value, boolean confirm);
RESP *redisxGetValue(Redis*redis, const char *table, const char *key, int *status);
char *redisxGetStringValue(Redis *redis, const char *table, const char *key, int *len);
Expand Down Expand Up @@ -516,6 +517,7 @@ int redisxSendArrayRequestAsync(RedisClient *cl, const char **args, const int *l
int redisxClusterAskMigratingAsync(RedisClient *cl, const char **args, const int *lengths, int n);
int redisxSetValueAsync(RedisClient *cl, const char *table, const char *key, const char *value, boolean confirm);
int redisxMultiSetAsync(RedisClient *cl, const char *table, const RedisEntry *entries, int n, boolean confirm);
int redisxGetAvailableAsync(RedisClient *cl);
RESP *redisxReadReplyAsync(RedisClient *cl, int *pStatus);
int redisxClearAttributesAsync(RedisClient *cl);
const RESP *redisxGetAttributesAsync(const RedisClient *cl);
Expand Down
91 changes: 71 additions & 20 deletions src/redisx-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <popt.h>
#include <time.h>
#include <math.h>
#include <pthread.h>
#include <readline/readline.h>
#include <readline/history.h>
#include <bsd/readpassphrase.h>
Expand All @@ -32,6 +33,8 @@
#define FORMAT_JSON 1
#define FORMAT_RAW 2

#define POLL_MILLIS 10

static char *host = "127.0.0.1";
static int port = 6379;
static int format = 0;
Expand All @@ -42,6 +45,9 @@ static int attrib = 0;
static Redis *redis;
static RedisCluster *cluster;

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;


static void printVersion(const char *name) {
printf("%s %s\n", name, REDISX_VERSION_STRING);
}
Expand Down Expand Up @@ -69,10 +75,65 @@ static void printRESP(const RESP *resp) {
}
}

static void printResult(const RESP *reply, const RESP *attr) {
if(format == FORMAT_JSON) {
// Bundle reply and attributes into one JSON document.
XStructure *s = xCreateStruct();
char *json;

if(attr) xSetField(s, redisxRESP2XField("ATTRIBUTES", attr));
if(reply) xSetField(s, redisxRESP2XField("REPLY", reply));

json = xjsonToString(s);
xDestroyStruct(s);

if(json) {
printf("%s", json);
free(json);
}
else printf("{}\n");
}
else {
if(reply) printRESP(reply);
if(attr) printRESP(attr);
}

}

static void *ListenerThread() {
while(TRUE) {
struct timespec nap;

nap.tv_sec = POLL_MILLIS / 1000;
nap.tv_nsec = 1000000 * (POLL_MILLIS % 1000);

pthread_mutex_lock(&mutex);

if(redis && redisxLockConnected(redis->interactive) == X_SUCCESS) {
while(redisxGetAvailableAsync(redis->interactive) > 0) {
int status = X_SUCCESS;
const RESP *resp = redisxReadReplyAsync(redis->interactive, &status);
if(status) break;
else if(resp && resp->type == RESP3_ATTRIBUTE) printResult(NULL, resp);
else printResult(resp, NULL);
}
redisxUnlockClient(redis->interactive);
}

pthread_mutex_unlock(&mutex);

nanosleep(&nap, NULL);
}

return NULL; /* NOT REACHED */
}

static void process(const char **cmdargs, int nargs) {
int status = X_SUCCESS;
RESP *reply, *attr = NULL;

pthread_mutex_lock(&mutex);

if(cluster) {
const char *key = NULL;

Expand All @@ -85,33 +146,16 @@ static void process(const char **cmdargs, int nargs) {
redis = redisxClusterGetShard(cluster, key);
if(!redis) {
fprintf(stderr, "ERROR! No suitable cluster node found for transaction.");
pthread_mutex_unlock(&mutex);
return;
}
}

reply = redisxArrayRequest(redis, cmdargs, NULL, nargs, &status);
if(!status && attrib) attr = redisxGetAttributes(redis);
pthread_mutex_unlock(&mutex);

if(format == FORMAT_JSON) {
// Bundle reply and attributes into one JSON document.
XStructure *s = xCreateStruct();
char *json;

if(attr) xSetField(s, redisxRESP2XField("ATTRIBUTES", attr));
xSetField(s, redisxRESP2XField("REPLY", reply));
json = xjsonToString(s);
xDestroyStruct(s);

if(json) {
printf("%s", json);
free(json);
}
else printf("{}\n");
}
else {
if(!status) printRESP(reply);
if(attr) printRESP(attr);
}
printResult(reply, attr);

redisxDestroyRESP(attr);
redisxDestroyRESP(reply);
Expand All @@ -125,11 +169,17 @@ static void PushProcessor(RedisClient *cl, RESP *resp, void *ptr) {
}

static int interactive(Redis *redis) {
pthread_t listenerTID;
char *prompt = malloc(strlen(host) + 20);
sprintf(prompt, "%s:%d> ", host, port);

using_history();

if(pthread_create(&listenerTID, NULL, ListenerThread, NULL) < 0) {
perror("ERROR! launching listener thread");
exit(1);
}

for(;;) {
char *line = readline(prompt);
const char **args;
Expand Down Expand Up @@ -393,6 +443,7 @@ int main(int argc, const char *argv[]) {
}

if(!cluster) {
if(!redis) return x_error(X_NULL, EAGAIN, fn, "redis is NULL");
prop_error(fn, redisxConnect(redis, FALSE));

if(scan) {
Expand Down
48 changes: 48 additions & 0 deletions src/redisx-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <ctype.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#if __Lynx__
# include <socket.h>
#else
Expand Down Expand Up @@ -855,6 +856,52 @@ int redisxClearAttributesAsync(RedisClient *cl) {
return X_SUCCESS;
}

/**
* Returns the number of bytes of response available on the given Redis client connection. This version
* assumes the caller has exclusive access to the client.
*
* @param cl a locked and connected Redis client
* @return the number of bytes of response available on the client, or else an error code &lt;0.
*
* @sa redisxGetAvailable()
* @sa redisxLockConnected()
* @sa redisxReadReplyAsync()
*/
int redisxGetAvailableAsync(RedisClient *cl) {
static const char *fn = "redisxGetAvailable";

ClientPrivate *cp;
int n = 0;

prop_error(fn, rCheckClient(cl));

cp = cl->priv;
if(ioctl(cp->socket, FIONREAD, &n) < 0) return x_error(X_FAILURE, errno, fn, "ioctl() error: %s", strerror(errno));

return n;
}

/**
* Returns the number of bytes of response available on the given Redis client connection. This is the
* synchronized version, which will obtain a exclusive lock on the client before determining the result.
*
* @param cl a locked and connected Redis client
* @return the number of bytes of response available on the client, or else an error code &lt;0.
*
* @sa redisxGetAvailableAsync()
*/
int redisxGetAvailable(RedisClient *cl) {
static const char *fn = "redisxGetAvailable";

int n;

prop_error(fn, redisxLockConnected(cl));
n = redisxGetAvailable(cl);
redisxUnlockClient(cl);

prop_error(fn, n);
return n;
}

/**
* Reads a response from Redis and returns it.
Expand All @@ -868,6 +915,7 @@ int redisxClearAttributesAsync(RedisClient *cl) {
* i.e., other than a timeout, the client will be disabled.)
*
* @sa redisxSetReplyTimeout()
* @sa redisxGetAvailavleAsync()
*/
RESP *redisxReadReplyAsync(RedisClient *cl, int *pStatus) {
static const char *fn = "redisxReadReplyAsync";
Expand Down

0 comments on commit a83d75b

Please sign in to comment.