Skip to content

Commit

Permalink
Initial RESP3 and HELLO support
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Dec 6, 2024
1 parent 3789bce commit b4190d2
Show file tree
Hide file tree
Showing 10 changed files with 748 additions and 120 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ clean:

# Remove all generated files
.PHONY: distclean
distclean: clean
distclean:
rm -f Doxyfile.local $(LIB)/libredisx.so* $(LIB)/libredisx.a


# ----------------------------------------------------------------------------
# The nitty-gritty stuff below
# ----------------------------------------------------------------------------

SOURCES = $(SRC)/redisx.c $(SRC)/redisx-net.c $(SRC)/redisx-hooks.c $(SRC)/redisx-client.c \
SOURCES = $(SRC)/redisx.c $(SRC)/resp.c $(SRC)/redisx-net.c $(SRC)/redisx-hooks.c $(SRC)/redisx-client.c \
$(SRC)/redisx-tab.c $(SRC)/redisx-sub.c $(SRC)/redisx-script.c

# Generate a list of object (obj/*.o) files from the input sources
Expand Down
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,19 @@ whose contents are:
| `RESP_ARRAY` | `*` | number of `RESP *` pointers | `(RESP **)` |
| `RESP_INT` | `:` | integer return value | `(void)` |
| `RESP_SIMPLE_STRING` | `+` | string length | `(char *)` |
| `RESP_ERROR` | `-` | string length | `(char *)` |
| `RESP_ERROR` | `-` | total string length | `(char *)` |
| `RESP_BULK_STRING` | `$` | string length or -1 if `NULL` | `(char *)` |

| `RESP3_NULL` | `_` | 0 | `(void)` |
| `RESP3_BOOLEAN` | `#` | 1 if _true_, 0 if _false_ | `(void)` |
| `RESP3_DOUBLE` | `,` | _unused_ | `(double *)` |
| `RESP3_BIG_NUMBER` | `(` | string representation length | `(char *)` |
| `RESP3_BLOB_ERROR` | `!` | total string length | `(char *)` |
| `RESP3_VERBATIM_TEXT` | `=` | text length (incl. type) | `(char *)` |
| `RESP3_SET` | `~` | number of `RESP *` pointers | `(RESP *)` |
| `RESP3_MAP` | `%` | number of key / value pairs | `(RedisMapEntry *)` |
| `RESP3_ATTRIBUTE` | `|` | number of key / value pairs | `(RedisMapEntry *)` |
| `RESP3_PUSH` | `>` | number of `RESP *` pointers | `(RESP **)` |


Each `RESP` has a type (e.g. `RESP_SIMPLE_STRING`), an integer value `n`, and a `value` pointer
to further data. If the type is `RESP_INT`, then `n` represents the actual return value (and the `value` pointer is
Expand Down
2 changes: 1 addition & 1 deletion build.mk
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ clean: clean-local

# Remove intermediate files (general)
.PHONY: distclean
distclean: distclean-local
distclean: clean distclean-local

# Static code analysis using 'cppcheck'
.PHONY: analyze
Expand Down
4 changes: 3 additions & 1 deletion include/redisx-priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ typedef struct {
uint32_t addr; ///< The 32-bit inet address
int port; ///< port number (usually 6379)
int dbIndex; ///< the zero-based database index
char *username; ///< REdis user name (if any)
char *username; ///< Redis user name (if any)
char *password; ///< Redis password (if any)
int protocol; ///< RESP version to use
boolean no_hello; ///< whether to skip trying HELLO (introduced in Redis 6.0.0 only)

RedisClient *clients;

Expand Down
52 changes: 52 additions & 0 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@
#define RESP_ERROR '-' ///< \hideinitializer RESP error message type
#define RESP_BULK_STRING '$' ///< \hideinitializer RESP bulk string type

// RESP3 types
#define RESP3_NULL '_' ///< \hideinitializer RESP3 null value
#define RESP3_DOUBLE ',' ///< \hideinitializer RESP3 floating-point value
#define RESP3_BOOLEAN '#' ///< \hideinitializer RESP3 boolean value
#define RESP3_BLOB_ERROR '!' ///< \hideinitializer RESP3 blob error
#define RESP3_VERBATIM_STRING '=' ///< \hideinitializer RESP3 verbatim string (with type)
#define RESP3_BIG_NUMBER '(' ///< \hideinitializer RESP3 big integer / decimal
#define RESP3_MAP '%' ///< \hideinitializer RESP3 dictionary of key / value
#define RESP3_SET '~' ///< \hideinitializer RESP3 unordered set of elements
#define RESP3_ATTRIBUTE '|' ///< \hideinitializer RESP3 dictionary of attributes (metadata)
#define RESP3_PUSH '>' ///< \hideinitializer RESP3 dictionary of attributes (metadata)
#define RESP3_SNIPPET ';' ///< \hideinitializer RESP3 dictionary of attributes (metadata)

#define REDIS_INVALID_CHANNEL (-101) ///< \hideinitializer There is no such channel in the Redis instance.
#define REDIS_NULL (-102) ///< \hideinitializer Redis returned NULL
#define REDIS_ERROR (-103) ///< \hideinitializer Redis returned an error
Expand All @@ -112,11 +125,24 @@ enum redisx_channel {

#define REDISX_CHANNELS (REDISX_SUBSCRIPTION_CHANNEL + 1) ///< \hideinitializer The number of channels a Redis instance has.

/**
* The RESP protocol to use for a Redis instance. Redis originally used RESP2, but later releases added
* support for RESP3.
*
*/
enum redisx_protocol {
REDISX_RESP2 = 2, ///< \hideinitializer RESP2 protocol
REDISX_RESP3 ///< \hideinitializer RESP3 protocol (since Redis version 6.0.0)
};

/**
* \brief Structure that represents a Redis response (RESP format).
*
* \sa redisxDestroyRESP()
* \sa redisxIsScalarType()
* \sa redisxIsStringType()
* \sa redisxIsArrayType()
* \sa redisxIsMapType()
*/
typedef struct RESP {
char type; ///< RESP type RESP_ARRAY, RESP_INT ...
Expand All @@ -126,6 +152,19 @@ typedef struct RESP {
///< (RESP**)...
} RESP;

/**
* Structure that represents a key/value mapping in RESP3.
*
* @sa redisxIsMapType()
* @sa RESP3_MAP
* @sa RESP3_ATTRIBUTE
*
*/
typedef struct {
RESP *key; ///< The keyword component
RESP *value; ///< The associated value component
} RedisMapEntry;


/**
* \brief A single key / value entry, or field, in the Redis database.
Expand Down Expand Up @@ -228,6 +267,9 @@ int redisxSetPort(Redis *redis, int port);
int redisxSetUser(Redis *redis, const char *username);
int redisxSetPassword(Redis *redis, const char *passwd);
int redisxSelectDB(Redis *redis, int idx);
int redisxNoHello(Redis *redis);
int redisxSetProtocol(Redis *redis, enum redisx_protocol protocol);
enum redisx_protocol redisxGetProtocol(const Redis *redis);

Redis *redisxInit(const char *server);
void redisxDestroy(Redis *redis);
Expand Down Expand Up @@ -286,6 +328,16 @@ int redisxGetTime(Redis *redis, struct timespec *t);
int redisxCheckRESP(const RESP *resp, char expectedType, int expectedSize);
int redisxCheckDestroyRESP(RESP *resp, char expectedType, int expectedSize);
void redisxDestroyRESP(RESP *resp);
boolean redisxIsScalarType(const RESP *r);
boolean redisxIsStringType(const RESP *r);
boolean redisxIsArrayType(const RESP *r);
boolean redisxIsMapType(const RESP *r);
boolean redisxEqualRESPs(const RESP *a, const RESP *b);
int redisxSplitText(RESP *resp, char **text);
int redisxAppendRESP(RESP *resp, RESP *part);

RedisMapEntry *redisxGetMapEntry(const RESP *map, const RESP *key);
RedisMapEntry *redisxGetKeywordEntry(const RESP *map, const char *key);

// Locks for async calls
int redisxLockClient(RedisClient *cl);
Expand Down
172 changes: 147 additions & 25 deletions src/redisx-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <ctype.h>
#if __Lynx__
# include <socket.h>
#else
Expand Down Expand Up @@ -668,6 +669,46 @@ int redisxIgnoreReplyAsync(RedisClient *cl) {
return X_SUCCESS;
}

static int rTypeIsParametrized(char type) {
switch(type) {
case RESP_INT:
case RESP_BULK_STRING:
case RESP_ARRAY:
case RESP3_SET:
case RESP3_PUSH:
case RESP3_MAP:
case RESP3_ATTRIBUTE:
case RESP3_BLOB_ERROR:
case RESP3_VERBATIM_STRING:
case RESP3_SNIPPET:
return TRUE;
default:
return FALSE;
}
}

static void rPushMessage(RedisClient *cl, RESP *resp) {
int i;
RESP **array;

if(resp->n < 0) return;

array = (RESP **) calloc(resp->n, sizeof(RESP *));
if(!array) fprintf(stderr, "WARNING! Redis-X : not enough memory for push message (%d elements). Skipping.\n", resp->n);

for(i = 0; i < resp->n; i++) {
RESP *r = redisxReadReplyAsync(cl);
if(array) array[i] = r;
else redisxDestroyRESP(r);
}

resp->value = array;

// TODO push to consumer.

redisxDestroyRESP(resp);
}

/**
* Reads a response from Redis and returns it.
*
Expand Down Expand Up @@ -702,61 +743,141 @@ RESP *redisxReadReplyAsync(RedisClient *cl) {
return NULL;
}

size = rReadToken(cp, buf, REDIS_SIMPLE_STRING_SIZE + 1);
if(size < 0) {
// Either read/recv had an error, or we got garbage...
if(cp->isEnabled) x_trace_null(fn, NULL);
cp->isEnabled = FALSE; // Disable this client so we don't attempt to read from it again...
return NULL;
}
for(;;) {
size = rReadToken(cp, buf, REDIS_SIMPLE_STRING_SIZE + 1);
if(size < 0) {
// Either read/recv had an error, or we got garbage...
if(cp->isEnabled) x_trace_null(fn, NULL);
cp->isEnabled = FALSE; // Disable this client so we don't attempt to read from it again...
return NULL;
}

resp = (RESP *) calloc(1, sizeof(RESP));
x_check_alloc(resp);
resp->type = buf[0];

// Get the integer / size value...
if(resp->type == RESP_ARRAY || resp->type == RESP_INT || resp->type == RESP_BULK_STRING) {
char *tail;
errno = 0;
resp->n = (int) strtol(&buf[1], &tail, 10);
if(errno) {
fprintf(stderr, "WARNING! Redis-X : unparseable dimension '%s'\n", &buf[1]);
status = X_PARSE_ERROR;
resp = (RESP *) calloc(1, sizeof(RESP));
x_check_alloc(resp);
resp->type = buf[0];

// Parametrized type.
if(rTypeIsParametrized(resp->type)) {

if(buf[1] == '?') {
// Streaming RESP in parts...
for(;;) {
RESP *r = redisxReadReplyAsync(cl);
if(r->type != RESP3_SNIPPET) {
int type = r->type;
redisxDestroyRESP(r);
fprintf(stderr, "WARNING! expected type '%c', got type '%c'.", resp->type, type);
return resp;
}

if(r->n == 0) {
if(resp->type == RESP3_PUSH) break;
return resp;
}

r->type = resp->type;
redisxAppendRESP(resp, r);
}
}
else {
// Get the integer / size value...
char *tail;
errno = 0;
resp->n = (int) strtol(&buf[1], &tail, 10);
if(errno) {
fprintf(stderr, "WARNING! Redis-X : unparseable dimension '%s'\n", &buf[1]);
status = X_PARSE_ERROR;
}
}
}

// Deal with push messages...
if(resp->type == RESP3_PUSH) rPushMessage(cl, resp);
else break;
}


// Now get the body of the response...
if(!status) switch(resp->type) {

case RESP3_NULL:
resp->n = 0;
break;

case RESP3_BOOLEAN: {
resp->n = 1;
switch(tolower(buf[1])) {
case 't': resp->n = TRUE; break;
case 'f': resp->n = FALSE; break;
default:
fprintf(stderr, "WARNING! Redis-X : invalid boolean value '%c'\n", buf[1]);
status = X_PARSE_ERROR;
}
break;
}

case RESP3_DOUBLE:
// TODO inf / -inf?
resp->value = (double *) calloc(1, sizeof(double));
x_check_alloc(resp->value);
if(sscanf(&buf[1], "%lf", (double *) resp->value) != 1) {
fprintf(stderr, "WARNING! Redis-X : invalid double value '%s'\n", &buf[1]);
status = X_PARSE_ERROR;
}
break;

case RESP3_SET:
case RESP3_PUSH:
case RESP_ARRAY: {
RESP **component;
int i;

if(resp->n <= 0) break;

resp->value = (RESP **) malloc(resp->n * sizeof(RESP *));
if(resp->value == NULL) {
component = (RESP **) malloc(resp->n * sizeof(RESP *));
if(component == NULL) {
status = x_error(X_FAILURE, errno, fn, "malloc() error (%d RESP)", resp->n);
// We should get the data from the input even if we have nowhere to store...
}

component = (RESP **) resp->value;

for(i=0; i<resp->n; i++) {
RESP* r = redisxReadReplyAsync(cl); // Always read RESP even if we don't have storage for it...
if(resp->value) component[i] = r;
RESP *r = redisxReadReplyAsync(cl); // Always read RESP even if we don't have storage for it...
if(component) component[i] = r;
else redisxDestroyRESP(r);
}

// Consistency check. Discard response if incomplete (because of read errors...)
if(resp->value) for(i = 0; i < resp->n; i++) if(component[i] == NULL) {
if(component) for(i = 0; i < resp->n; i++) if(component[i] == NULL || component[i]->type != RESP3_NULL) {
fprintf(stderr, "WARNING! Redis-X : incomplete array received (index %d of %d).\n", (i+1), resp->n);
if(!status) status = REDIS_INCOMPLETE_TRANSFER;
break;
}

resp->value = component;

break;
}

case RESP3_MAP:
case RESP3_ATTRIBUTE: {
RedisMapEntry *component;
int i;
if(resp->n <= 0) break;

component = (RedisMapEntry *) calloc(resp->n, sizeof(RedisMapEntry));
x_check_alloc(component);

for(i=0; i<resp->n; i++) {
RedisMapEntry *e = &component[i];
e->key = redisxReadReplyAsync(cl);
e->value = redisxReadReplyAsync(cl);
}
break;
}

case RESP3_BLOB_ERROR:
case RESP3_VERBATIM_STRING:
case RESP_BULK_STRING:
if(resp->n < 0) break; // no string token following!

Expand All @@ -781,6 +902,7 @@ RESP *redisxReadReplyAsync(RedisClient *cl) {

case RESP_SIMPLE_STRING:
case RESP_ERROR:
case RESP3_BIG_NUMBER:
resp->value = malloc(size);

if(resp->value == NULL) {
Expand Down
Loading

0 comments on commit b4190d2

Please sign in to comment.