diff --git a/.settings/language.settings.xml b/.settings/language.settings.xml index b62474a..4049f1d 100644 --- a/.settings/language.settings.xml +++ b/.settings/language.settings.xml @@ -5,7 +5,7 @@ - + diff --git a/Makefile b/Makefile index 5ca19ea..2e05788 100644 --- a/Makefile +++ b/Makefile @@ -69,7 +69,7 @@ clean: # Remove all generated files .PHONY: distclean -distclean: clean +distclean: rm -f Doxyfile.local $(LIB)/libredisx.so* $(LIB)/libredisx.a @@ -77,7 +77,7 @@ distclean: clean # The nitty-gritty stuff below # ---------------------------------------------------------------------------- -SOURCES = $(SRC)/redisx.c $(SRC)/redisx-net.c $(SRC)/redisx-hooks.c $(SRC)/redisx-client.c \ +SOURCES = $(SRC)/redisx.c $(SRC)/resp.c $(SRC)/redisx-net.c $(SRC)/redisx-hooks.c $(SRC)/redisx-client.c \ $(SRC)/redisx-tab.c $(SRC)/redisx-sub.c $(SRC)/redisx-script.c # Generate a list of object (obj/*.o) files from the input sources diff --git a/README.md b/README.md index b045152..06a16f3 100644 --- a/README.md +++ b/README.md @@ -100,14 +100,16 @@ prior to invoking `make`. The following build variables can be configured: - `CPPFLAGS`: C preprocessor flags, such as externally defined compiler constants. - - `CFLAGS`: Flags to pass onto the C compiler (default: `-Os -Wall -std=c99`). Note, `-Iinclude` will be added + - `CFLAGS`: Flags to pass onto the C compiler (default: `-g -Os -Wall`). Note, `-Iinclude` will be added automatically. + - `CSTANDARD`: Optionally, specify the C standard to compile for, e.g. `c99` to compile for the C99 standard. If + defined then `-std=$(CSTANDARD)` is added to `CFLAGS` automatically. + + - `WEXTRA`: If set to 1, `-Wextra` is added to `CFLAGS` automatically. + - `LDFLAGS`: Extra linker flags (default is _not set_). Note, `-lm -lxchange` will be added automatically. - - `BUILD_MODE`: You can set it to `debug` to enable debugging features: it will initialize the global `xDebug` - variable to `TRUE` and add `-g` to `CFLAGS`. - - `CHECKEXTRA`: Extra options to pass to `cppcheck` for the `make check` target - `XCHANGE`: If the [Smithsonian/xchange](https://github.com/Smithsonian/xchange) library is not installed on your @@ -205,10 +207,25 @@ the default 6379), and the database authentication (if any): redisxSetPassword(redis, mySecretPasswordString); ``` -You might also tweak the send/receive buffer sizes to use for clients, if you find the socket defaults sub-optimal for -your application (note, that this setting is common to all `Redis` instances managed by the library): +You can also set the RESP protocol to use (provided your server is compatible with Redis 6 or later): + +```c + // (optional) Use RESP3 (provided the server supports it) + redisxSetProtocol(redis, REDISX_RESP3); +``` + +The above call will use the `HELLO` command (since Redis 6) upon connecting. If you do not set the protocol, `HELLO` +will not be used, and RESP2 will be assumed -- which is best for older servers. (Note, that you can always check the +actual protocol used after connecting, using `redisxGetProtocol()`). Note, that after connecting, you may retrieve +the set of server properties sent in response to `HELLO` using `redisxGetHelloData()`. + +You might also tweak the socket options used for clients, if you find the socket defaults sub-optimal for your +application (note, that this setting is common to all `Redis` instances managed by the library): ```c + // (optional) Set 1000 ms socket read/write timeout for future connections. + redisxSetSocketTimeout(redis, 1000); + // (optional) Set the TCP send/rcv buffer sizes to use if not default values. // This setting applies to all new connections after... redisxSetTcpBuf(65536); @@ -304,12 +321,14 @@ The same goes for disconnect hooks, using `redisxAddDisconnectHook()` instead. ## Simple Redis queries - [Interactive transactions](#interactive-transactions) - - [RESP data type](#resp-data-type) + - [Push notifications](#push-notifications) + - [RESP data type](#resp-data-type) + Redis queries are sent as strings, according the the specification of the Redis protocol. All responses sent back by -the server using the RESP protocol. Specifically, Redis uses version 2.0 of the RESP protocol (a.k.a. RESP2) by -default, with optional support for the newer RESP3 introduced in Redis version 6.0. The __RedisX__ library currently -processes the standard RESP2 replies only. RESP3 support to the library may be added in the future (stay tuned...) +the server using the RESP protocol. Specifically, Redis uses version 2 of the RESP protocol (a.k.a. RESP2) by +default, with optional support for the newer RESP3 introduced in Redis version 6.0. The __RedisX__ library provides +support for both RESP2 and RESP3. @@ -355,6 +374,46 @@ individual parameters are not 0-terminated strings. In interactive mode, each request is sent to the Redis server, and the response is collected before the call returns with that response (or `NULL` if there was an error). + +### Push notifications + +Redis 6 introduced out-of-band push notifications along with RESP3. It allows the server to send messages to any +connected client that are not in response to a query. For example, Redis 6 allows `CLIENT TRACKING` to use such push +notifications (e.g. `INVALIDATE foo`), to notify connected clients when a watched variable has been updated from +somewhere else. + +__RedisX__ allows you to specify a custom callback `RedisPushProcessor` function to handle such push notifications, +e.g.: + +```c + void my_push_processor(RESP *message, void *ptr) { + char *owner = (char *) ptr; // Additional argument we need, in this case a string. + printf("[%s] Got push message: type %c, n = %d.\n", owner, message->type, message->n); + } +``` + +Then you can activate the processing of push notifications with `redisxSetPushProcessor()`. You can specify the +optional additional data that you want to pass along to the push processor function -- just make sure that the data +has a sufficient scope / lifetime such that it is valid at all times while push messages are being processed. E.g. + +```c + static owner = "my process"; // The long life data we want to pass to my_push_processor... + + // Use my_push_processor and pass along the owner as a parameter + redisxSetPushProcessor(redis, my_push_processor, owner); +``` + +There are some things to look out for in your `RedisPushProcessor` implementation: + +- The call should not block (except perhaps for a quick mutex lock) and should return quickly. If blocking calls, or + extensive processing is required, you should place a copy of the PUSH notification onto a queue and let an + asynchronous thread take it from there. +- The call should not attempt to alter or destroy the push message. If needed it can copy parts or the whole. +- You should not attempt to lock or release clients from the call. If you need access to a client, it's best to put a + copy of the RESP notification onto a queue and let an asynchronous thread deal with it. +- You should + + ### RESP data type @@ -378,9 +437,19 @@ whose contents are: | `RESP_ARRAY` | `*` | number of `RESP *` pointers | `(RESP **)` | | `RESP_INT` | `:` | integer return value | `(void)` | | `RESP_SIMPLE_STRING` | `+` | string length | `(char *)` | - | `RESP_ERROR` | `-` | string length | `(char *)` | + | `RESP_ERROR` | `-` | total string length | `(char *)` | | `RESP_BULK_STRING` | `$` | string length or -1 if `NULL` | `(char *)` | - + | `RESP3_NULL` | `_` | 0 | `(void)` | + | `RESP3_BOOLEAN` | `#` | 1 if _true_, 0 if _false_ | `(void)` | + | `RESP3_DOUBLE` | `,` | _unused_ | `(double *)` | + | `RESP3_BIG_NUMBER` | `(` | string representation length | `(char *)` | + | `RESP3_BLOB_ERROR` | `!` | total string length | `(char *)` | + | `RESP3_VERBATIM_TEXT` | `=` | text length (incl. type) | `(char *)` | + | `RESP3_SET` | `~` | number of `RESP *` pointers | `(RESP *)` | + | `RESP3_MAP` | `%` | number of key / value pairs | `(RedisMapEntry *)` | + | `RESP3_ATTRIBUTE` | `\|` | number of key / value pairs | `(RedisMapEntry *)` | + | `RESP3_PUSH` | `>` | number of `RESP *` pointers | `(RESP **)` | + Each `RESP` has a type (e.g. `RESP_SIMPLE_STRING`), an integer value `n`, and a `value` pointer to further data. If the type is `RESP_INT`, then `n` represents the actual return value (and the `value` pointer is @@ -388,9 +457,16 @@ not used). For string type values `n` is the number of characters in the string while for `RESP_ARRAY` types the `value` is a pointer to an embedded `RESP` array and `n` is the number of elements in that. -You may check the integrity of a `RESP` using `redisxCheckRESP()`. Since `RESP` data is dynamically allocated, the -user is responsible for discarding them once they are no longer needed, e.g. by calling `redisxDestroyRESP()`. The -two steps may be combined to automatically discard invalid or unexpected `RESP` data in a single step by calling +To help with deciding what cast to use for a given `value` field of the RESP data structure, we provide the +convenience methods `redisxIsScalarType()` when cast is `(void)` or else `(double *)`), `redisxIsArrayType()` if the +cast is `(RESP **)`, `redisxIsStringTupe()` if the cast should be `(char *)`, and `redisxIsMapType()` if the cast +should be to `(RedisMapEntry *)`. + +You can check that two `RESP` data structures are equivalent with `redisxIsEqualRESP(RESP *a, RESP *b)`. + +You may also check the integrity of a `RESP` using `redisxCheckRESP()`. Since `RESP` data is dynamically allocated, +the user is responsible for discarding them once they are no longer needed, e.g. by calling `redisxDestroyRESP()`. +The two steps may be combined to automatically discard invalid or unexpected `RESP` data in a single step by calling `redisxCheckDestroyRESP()`. ```c @@ -429,10 +505,36 @@ Before destroying a RESP structure, the caller may want to dereference values wi stringValue = (char *) r->value; r->value = NULL; - redisxDestroyRESP(r); // The 'stringValue' is still a valid pointer after! + redisxDestroyRESP(r); // 'stringValue' is still a valid pointer after! } ``` +Note, that you can usually convert a RESP to an `XField`, and/or to JSON representation using the +`redisxRESP2XField()` and `redisxRESP2JSON()` functions, e.g.: + +```c + Redis redis = ... + + // Obtain a copy of the response received from HELLO upon connecting... + RESP *resp = redisxGetHelloData(redis); + + // Print the response from HELLO to the standard output in JSON format + char *json = redisxRESP2JSON("hello_response", resp); + if(json != NULL) { + printf("%s", json); + free(json); + } + + ... + + // Clean up + redisxDestroyRESP(resp); +```c + +All RESP can be represented in JSON format. This is trivial for map entries, which have strings as their keywords -- +which is the case for all RESP sent by Redis. And, it is also possible for map entries with non-string keys, albeit +via a more tedious (and less standard) JSON representation, stored under the `.non-string-keys` keyword. + ----------------------------------------------------------------------------- @@ -790,8 +892,10 @@ Stay tuned. ## Advanced queries and pipelining - [Asynchronous client processing](#asynchronous-client-processing) + - [Bundled Attributes](#attributes) - [Pipelined transactions](#pipelined-transactions) + ### Asynchronous client processing @@ -873,6 +977,37 @@ Of course you can build up arbitrarily complex set of queries and deal with a se what works best for your application. + +### Bundled Attributes + +As of Redis 6, the server might send ancillary data along with replies, if the RESP3 protocol is used. These are +collected together with the expected responses. However, these optional attributes are not returned to the user +automatically. Instead, the user may retrieve attributes directly after getting a response from +`redisxReadReplyAsync()` using `redisxGetAttributesAsync()`. And, attributes that were received previously can be +discarded with `redisxClearAttributesAsync()`. For example, + +```c + RedisClient *cl = ... // The client we use for our transactions + + if(redisxLockEnabled(cl) == X_SUCCESS) { + ... + + // Clear any prior attributes we may have previously received for the client... + redisxClearAttributesAsync(cl); + + // Read a response for a request we sent earlier... + RESP *reply = redisxReadReplyAsync(cl); + + // Retrieve the attributes (if any) that were sent with the response. + RESP *attributes = redisxGetAttributes(cl); + + ... + + redisxUnlockClient(cl); + } +``` + + ### Pipelined transactions @@ -1045,7 +1180,6 @@ Some obvious ways the library could evolve and grow in the not too distant futur - Automated regression testing and coverage tracking. - Keep track of subscription patterns, and automatically resubscribe to them on reconnecting. - - Support for the [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) standard and Redis `HELLO`. - Support for [Redis Sentinel](https://redis.io/docs/latest/develop/reference/sentinel-clients/) clients, for high-availability server configurations. - TLS support (perhaps...) diff --git a/build.mk b/build.mk index a6ace54..28a196d 100644 --- a/build.mk +++ b/build.mk @@ -46,7 +46,7 @@ clean: clean-local # Remove intermediate files (general) .PHONY: distclean -distclean: distclean-local +distclean: clean distclean-local # Static code analysis using 'cppcheck' .PHONY: analyze diff --git a/config.mk b/config.mk index d98cd61..18c9a96 100644 --- a/config.mk +++ b/config.mk @@ -27,19 +27,27 @@ CC ?= gcc CPPFLAGS += -I$(INC) # Base compiler options (if not defined externally...) -CFLAGS ?= -Os -Wall -std=c99 +CFLAGS ?= -g -Os -Wall + +# Compile for specific C standard +ifdef CSTANDARD + CFLAGS += -std=$(CSTANDARD) +endif # Extra warnings (not supported on all compilers) -#CFLAGS += -Wextra +ifeq ($(WEXTRA), 1) + CFLAGS += -Wextra +endif # Extra linker flags (if any) #LDFLAGS= # cppcheck options for 'check' target CHECKOPTS ?= --enable=performance,warning,portability,style --language=c \ - --error-exitcode=1 --std=c99 $(CHECKEXTRA) + --error-exitcode=1 --std=c99 -CHECKOPTS += --template='{file}({line}): {severity} ({id}): {message}' --inline-suppr +# Add-on ccpcheck options +CHECKOPTS += --inline-suppr $(CHECKEXTRA) # Exhaustive checking for newer cppcheck #CHECKOPTS += --check-level=exhaustive @@ -53,11 +61,6 @@ CHECKOPTS += --template='{file}({line}): {severity} ({id}): {message}' --inline- # Below are some generated constants based on the one that were set above # ============================================================================ -# Compiler and linker options etc. -ifeq ($(BUILD_MODE),debug) - CFLAGS += -g -DDEBUG -endif - # Link against math libs (for e.g. isnan()), and xchange dependency LDFLAGS += -lm -lxchange diff --git a/include/redisx-priv.h b/include/redisx-priv.h index 735f1f3..1c71977 100644 --- a/include/redisx-priv.h +++ b/include/redisx-priv.h @@ -50,6 +50,7 @@ typedef struct { int next; ///< Index of next unconsumed byte in buffer. int socket; ///< Changing the socket should require both locks! int pendingRequests; ///< Number of request sent and not yet answered... + RESP *attributes; ///< Attributes from the last packet received. } ClientPrivate; @@ -57,9 +58,14 @@ typedef struct { uint32_t addr; ///< The 32-bit inet address int port; ///< port number (usually 6379) int dbIndex; ///< the zero-based database index - char *username; ///< REdis user name (if any) + char *username; ///< Redis user name (if any) char *password; ///< Redis password (if any) + int timeoutMillis; ///< [ms] Socket read/write timeout + int protocol; ///< RESP version to use + boolean hello; ///< whether to use HELLO (introduced in Redis 6.0.0 only) + RESP *helloData; ///< RESP data received from server during last connection. + RedisClient *clients; pthread_mutex_t configLock; @@ -81,17 +87,23 @@ typedef struct { pthread_mutex_t subscriberLock; MessageConsumer *subscriberList; + RedisPushProcessor pushConsumer; ///< User-defined function to consume RESP3 push messages. + void *pushArg; ///< User-defined argument to pass along with push messages. } RedisPrivate; // in redisx-sub.c ------------------------> -void rConfigLock(Redis *redis); -void rConfigUnlock(Redis *redis); +int rConfigLock(Redis *redis); +int rConfigUnlock(Redis *redis); // in redisx-net.c ------------------------> int rConnectClient(Redis *redis, enum redisx_channel channel); void rCloseClient(RedisClient *cl); boolean rIsLowLatency(const ClientPrivate *cp); +int rCheckClient(const RedisClient *cl); + +// in resp.c ------------------------------> +int redisxAppendRESP(RESP *resp, RESP *part); /// \endcond diff --git a/include/redisx.h b/include/redisx.h index 5350703..0fd8a29 100644 --- a/include/redisx.h +++ b/include/redisx.h @@ -17,22 +17,22 @@ // Configuration constants -------------------------------------------------------> #ifndef REDISX_TCP_PORT /// Default TCP/IP port on which Redis server listens to clients. -# define REDISX_TCP_PORT 6379 +# define REDISX_TCP_PORT 6379 #endif #ifndef REDISX_TCP_BUF_SIZE /// (bytes) Default TCP buffer size (send/recv) for Redis clients. Values <= 0 will use system default. -# define REDISX_TCP_BUF_SIZE 0 +# define REDISX_TCP_BUF_SIZE 0 #endif #ifndef REDISX_CMDBUF_SIZE /// (bytes) Size of many internal arrays, and the max. send chunk size. At least ~16 bytes... -# define REDISX_CMDBUF_SIZE 8192 +# define REDISX_CMDBUF_SIZE 8192 #endif #ifndef REDISX_RCVBUF_SIZE /// (bytes) Redis receive buffer size (at most that much is read from the socket in a single call). -# define REDISX_RCVBUF_SIZE 8192 +# define REDISX_RCVBUF_SIZE 8192 #endif #ifndef REDISX_SET_LISTENER_PRIORITY @@ -46,6 +46,11 @@ # define REDISX_LISTENER_REL_PRIORITY (0.5) #endif +#ifndef REDISX_DEFAULT_TIMEOUT_MILLIS +/// [ms] Default socket read/write timeout for Redis clients +# define REDISX_DEFAULT_TIMEOUT_MILLIS 3000 +#endif + // Various exposed constants -----------------------------------------------------> /// API major version @@ -55,7 +60,7 @@ #define REDISX_MINOR_VERSION 9 /// Integer sub version of the release -#define REDISX_PATCHLEVEL 1 +#define REDISX_PATCHLEVEL 2 /// Additional release information in version, e.g. "-1", or "-rc1". #define REDISX_RELEASE_STRING "-devel" @@ -83,12 +88,33 @@ #define REDISX_VERSION_STRING str_2(REDISX_MAJOR_VERSION) "." str_2(REDISX_MINOR_VERSION) \ "." str_2(REDISX_PATCHLEVEL) REDISX_RELEASE_STRING -// These are the first character ID's for the standard RESP interface implemented by Redis. -#define RESP_ARRAY '*' ///< \hideinitializer RESP array type -#define RESP_INT ':' ///< \hideinitializer RESP integer type -#define RESP_SIMPLE_STRING '+' ///< \hideinitializer RESP simple string type -#define RESP_ERROR '-' ///< \hideinitializer RESP error message type -#define RESP_BULK_STRING '$' ///< \hideinitializer RESP bulk string type +/** + * Enumeration of RESP component types. These are the first character IDs for the standard RESP interface + * implemented by Redis. + * + */ +enum resp_type { + // RESP2 types: + RESP_ARRAY = '*', ///< \hideinitializer RESP array type + RESP_INT = ':', ///< \hideinitializer RESP integer type + RESP_SIMPLE_STRING = '+', ///< \hideinitializer RESP simple string type + RESP_ERROR = '-', ///< \hideinitializer RESP error message type + RESP_BULK_STRING = '$', ///< \hideinitializer RESP bulk string type + + // RESP3 types: + RESP3_NULL = '_', ///< \hideinitializer RESP3 null value + RESP3_DOUBLE = ',', ///< \hideinitializer RESP3 floating-point value + RESP3_BOOLEAN = '#', ///< \hideinitializer RESP3 boolean value + RESP3_BLOB_ERROR = '!', ///< \hideinitializer RESP3 blob error + RESP3_VERBATIM_STRING = '=', ///< \hideinitializer RESP3 verbatim string (with type) + RESP3_BIG_NUMBER = '(', ///< \hideinitializer RESP3 big integer / decimal + RESP3_MAP = '%', ///< \hideinitializer RESP3 dictionary of key / value + RESP3_SET = '~', ///< \hideinitializer RESP3 unordered set of elements + RESP3_ATTRIBUTE = '|', ///< \hideinitializer RESP3 dictionary of attributes (metadata) + RESP3_PUSH = '>', ///< \hideinitializer RESP3 dictionary of attributes (metadata) +}; + +#define RESP3_CONTINUED ';' ///< \hideinitializer RESP3 dictionary of attributes (metadata) #define REDIS_INVALID_CHANNEL (-101) ///< \hideinitializer There is no such channel in the Redis instance. #define REDIS_NULL (-102) ///< \hideinitializer Redis returned NULL @@ -112,20 +138,52 @@ enum redisx_channel { #define REDISX_CHANNELS (REDISX_SUBSCRIPTION_CHANNEL + 1) ///< \hideinitializer The number of channels a Redis instance has. +/** + * The RESP protocol to use for a Redis instance. Redis originally used RESP2, but later releases added + * support for RESP3. + * + */ +enum redisx_protocol { + REDISX_RESP2 = 2, ///< \hideinitializer RESP2 protocol + REDISX_RESP3 ///< \hideinitializer RESP3 protocol (since Redis version 6.0.0) +}; /** * \brief Structure that represents a Redis response (RESP format). * + * REFERENCES: + *
    + *
  1. https://github.com/redis/redis-specifications/tree/master/protocol
  2. + *
+ * * \sa redisxDestroyRESP() + * \sa redisxIsScalarType() + * \sa redisxIsStringType() + * \sa redisxIsArrayType() + * \sa redisxIsMapType() */ typedef struct RESP { - char type; ///< RESP type RESP_ARRAY, RESP_INT ... - int n; ///< Either the integer value of a RESP_INT response, or the + enum resp_type type; ///< RESP type; RESP_ARRAY, RESP_INT ... + int n; ///< Either the integer value of a RESP_INT or a RESP3_BOOLEAN response, or the ///< dimension of the value field. void *value; ///< Pointer to text (char *) content or to an array of components - ///< (RESP**)... + ///< (RESP**) or (RedisMapEntry *), or else a pointer to a `double`, depending + ///< on `type`. } RESP; +/** + * Structure that represents a key/value mapping in RESP3. + * + * @sa redisxIsMapType() + * @sa RESP3_MAP + * @sa RESP3_ATTRIBUTE + * + */ +typedef struct { + RESP *key; ///< The keyword component + RESP *value; ///< The associated value component +} RedisMapEntry; + /** * \brief A single key / value entry, or field, in the Redis database. @@ -134,7 +192,7 @@ typedef struct RESP { typedef struct RedisEntry { char *key; ///< The Redis key or field name char *value; ///< The string value stored for that field. - int length; ///< Bytes in value. + int length; ///< Bytes in value. } RedisEntry; @@ -207,7 +265,6 @@ typedef struct Redis { */ typedef void (*RedisSubscriberCall)(const char *pattern, const char *channel, const char *msg, long length); - /** * User-specified callback function for handling RedisX errors. * @@ -217,6 +274,61 @@ typedef void (*RedisSubscriberCall)(const char *pattern, const char *channel, co */ typedef void (*RedisErrorHandler)(Redis *redis, enum redisx_channel channel, const char *op); + +/** + * A user-defined function for consuming responses from a Redis pipeline connection. The implementation + * should follow a set of simple rules: + * + *
    + *
  • the implementation should not destroy the RESP data. The RESP will be destroyed automatically + * after the call returns. However, the call may retain any data from the RESP itself, provided + * the data is de-referenced from the RESP before return.
  • + *
  • The implementation should not block (aside from maybe a quick mutex unlock) and return quickly, + * so as to not block the client for long periods
  • + *
  • If extensive processing or blocking calls are required to process the message, it is best to + * simply place a copy of the RESP on a queue and then return quickly, and then process the message + * asynchronously in a background thread.
  • + *
+ * + * @param response A response received from the pipeline client. + * + * @sa redisxSetPipelineConsumer() + */ +typedef void (*RedisPipelineProcessor)(RESP *response); + +/** + * A user-defined function for consuming push messages from a Redis client. The implementation + * should follow a set of simple rules: + * + *
    + *
  • the implementation should not destroy the RESP data. The RESP will be destroyed automatically + * after the call returns. However, the call may retain any data from the RESP itself, provided + * that such data is de-referenced from the RESP before the return.
  • + *
  • The implementation should not block (aside from maybe a quick mutex unlock) and return quickly, + * so as to not block the client for long periods
  • + *
  • If extensive processing or blocking calls are required to process the message, it is best to + * simply place a copy of the RESP on a queue and then return quickly, and then process the message + * asynchronously in a background thread.
  • + *
  • The client on which the push notification originated will be locked when this function is + * called, waiting for a response to an earlier query. Thus the implementation should not attempt to + * lock the client again or release the lock. It may send asynchronous requests on the client, e.g. via + * redisxSendRequestAsync(), but it should not try to read a response (given that the client is + * blocked for another read operation). If more flexible client access is needed, the implementation + * should make a copy of the RESP and place it on a queue for asynchronous processing by another thread. + *
  • + *
+ * + * @param cl The Redis client that sent the push. The client is locked for exlusive + * access when this function is called. + * @param message The RESP3 message that was pushed by the client + * @param ptr Additional data passed along. + * + * @sa redisxSetPushProcessor() + */ +typedef void (*RedisPushProcessor)(RedisClient *cl, RESP *message, void *ptr); + + + void redisxSetVerbose(boolean value); boolean redisxIsVerbose(); void redisxDebugTraffic(boolean value); @@ -228,8 +340,12 @@ int redisxSetPort(Redis *redis, int port); int redisxSetUser(Redis *redis, const char *username); int redisxSetPassword(Redis *redis, const char *passwd); int redisxSelectDB(Redis *redis, int idx); +int redisxSetProtocol(Redis *redis, enum redisx_protocol protocol); +enum redisx_protocol redisxGetProtocol(Redis *redis); +RESP *redisxGetHelloData(Redis *redis); Redis *redisxInit(const char *server); +int redisxCheckValid(const Redis *redis); void redisxDestroy(Redis *redis); int redisxConnect(Redis *redis, boolean usePipeline); void redisxDisconnect(Redis *redis); @@ -265,7 +381,8 @@ int redisxGetScanCount(Redis *redis); void redisxDestroyEntries(RedisEntry *entries, int count); void redisxDestroyKeys(char **keys, int count); -int redisxSetPipelineConsumer(Redis *redis, void (*f)(RESP *)); +int redisxSetPipelineConsumer(Redis *redis, RedisPipelineProcessor f); +int redisxSetPushProcessor(Redis *redis, RedisPushProcessor func, void *arg); int redisxPublish(Redis *redis, const char *channel, const char *message, int length); int redisxNotify(Redis *redis, const char *channel, const char *message); @@ -282,10 +399,25 @@ RESP *redisxExecBlockAsync(RedisClient *cl); int redisxLoadScript(Redis *redis, const char *script, char **sha1); int redisxGetTime(Redis *redis, struct timespec *t); +int redisxIsGlobPattern(const char *str); -int redisxCheckRESP(const RESP *resp, char expectedType, int expectedSize); -int redisxCheckDestroyRESP(RESP *resp, char expectedType, int expectedSize); +RESP *redisxCopyOfRESP(const RESP *resp); +int redisxCheckRESP(const RESP *resp, enum resp_type expectedType, int expectedSize); +int redisxCheckDestroyRESP(RESP *resp, enum resp_type, int expectedSize); void redisxDestroyRESP(RESP *resp); +boolean redisxIsScalarType(const RESP *r); +boolean redisxIsStringType(const RESP *r); +boolean redisxIsArrayType(const RESP *r); +boolean redisxIsMapType(const RESP *r); +boolean redisxHasComponents(const RESP *r); +boolean redisxIsEqualRESP(const RESP *a, const RESP *b); +int redisxSplitText(RESP *resp, char **text); +XField *redisxRESP2XField(const char *name, const RESP *resp); +char *redisxRESP2JSON(const char *name, const RESP *resp); +int redisxPrintRESP(const char *name, const RESP *resp); + +RedisMapEntry *redisxGetMapEntry(const RESP *map, const RESP *key); +RedisMapEntry *redisxGetKeywordEntry(const RESP *map, const char *key); // Locks for async calls int redisxLockClient(RedisClient *cl); @@ -298,10 +430,13 @@ int redisxSendArrayRequestAsync(RedisClient *cl, char *args[], int length[], int int redisxSetValueAsync(RedisClient *cl, const char *table, const char *key, const char *value, boolean confirm); int redisxMultiSetAsync(RedisClient *cl, const char *table, const RedisEntry *entries, int n, boolean confirm); RESP *redisxReadReplyAsync(RedisClient *cl); +int redisxClearAttributesAsync(RedisClient *cl); +const RESP *redisxGetAttributesAsync(const RedisClient *cl); int redisxIgnoreReplyAsync(RedisClient *cl); int redisxSkipReplyAsync(RedisClient *cl); int redisxPublishAsync(Redis *redis, const char *channel, const char *data, int length); + // Error generation with stderr message... int redisxError(const char *func, int errorCode); const char* redisxErrorDescription(int code); diff --git a/src/redisx-client.c b/src/redisx-client.c index 0e0ac95..f9f2ef5 100644 --- a/src/redisx-client.c +++ b/src/redisx-client.c @@ -12,6 +12,7 @@ #include #include #include +#include #if __Lynx__ # include #else @@ -20,11 +21,6 @@ #include "redisx-priv.h" -#ifndef REDIS_TIMEOUT_SECONDS -/// (seconds) Abort with an error if cannot send before this timeout (<=0 for not timeout) -#endif -# define REDIS_TIMEOUT_SECONDS 3 - #ifndef REDIS_SIMPLE_STRING_SIZE /// (bytes) Only store up to this many characters from Redis confirms and errors. # define REDIS_SIMPLE_STRING_SIZE 256 @@ -46,6 +42,24 @@ int debugTraffic = FALSE; ///< Whether to print excerpts of all traffic to/fr /// \endcond +/// \cond PROTECTED + +/** + * Checks that a redis instance is valid. + * + * @param cl The Redis client instance + * @return X_SUCCESS (0) if the client is valid, or X_NULL if the argument is NULL, + * or else X_NO_INIT if the redis instance is not initialized. + */ +int rCheckClient(const RedisClient *cl) { + static const char *fn = "rCheckRedis"; + if(!cl) return x_error(X_NULL, EINVAL, fn, "Redis client is NULL"); + if(!cl->priv) return x_error(X_NO_INIT, EAGAIN, fn, "Redis client is not initialized"); + return X_SUCCESS; +} + +/// \endcond + /** * Specific call for dealing with socket level transmit (send/rcv) errors. It prints a descriptive message to * sdterr, and calls the configured user transmit error handler routine, and either exists the program @@ -275,23 +289,26 @@ static int rSendBytesAsync(ClientPrivate *cp, const char *buf, int length, boole * \param redis Pointer to a Redis instance. * \param channel REDISX_INTERACTIVE_CHANNEL, REDISX_PIPELINE_CHANNEL, or REDISX_SUBSCRIPTION_CHANNEL * - * \return Pointer to the matching Redis client, or NULL if the channel argument is invalid. + * \return Pointer to the matching Redis client, or NULL if redis is null (EINVAL) or not initialized + * (EAGAIN) or if the channel argument is invalid (ECHRNG). * + * @sa redisxGetLockedConnectedClient() */ RedisClient *redisxGetClient(Redis *redis, enum redisx_channel channel) { + static const char *fn = "redisxGetClient"; + RedisPrivate *p; - if(redis == NULL) { - x_error(0, EINVAL, "redisxGetClient", "redis is NULL"); - return NULL; - } + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null(fn, NULL); p = (RedisPrivate *) redis->priv; - if(channel < 0 || channel >= REDISX_CHANNELS) return NULL; + if(channel < 0 || channel >= REDISX_CHANNELS) { + x_error(0, ECHRNG, fn, "channel %d is our of range", channel); + return NULL; + } return &p->clients[channel]; } - /** * Returns the redis client for a given connection type in a Redis instance, with the exclusive access lock * if the client is valid and is connected, or else NULL. It is effectively the combination of `redisxGetClient()` @@ -307,12 +324,8 @@ RedisClient *redisxGetClient(Redis *redis, enum redisx_channel channel) { * @sa redisxLockConnected() */ RedisClient *redisxGetLockedConnectedClient(Redis *redis, enum redisx_channel channel) { - static const char *fn = "redisxGetLockedConnectedClient"; - RedisClient *cl = redisxGetClient(redis, channel); - if(!cl) return x_trace_null(fn, NULL); - - if(redisxLockConnected(cl) != X_SUCCESS) return x_trace_null(fn, NULL); + if(redisxLockConnected(cl) != X_SUCCESS) return x_trace_null("redisxGetLockedConnectedClient", NULL); return cl; } @@ -334,7 +347,8 @@ int redisxLockClient(RedisClient *cl) { ClientPrivate *cp; int status; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + cp = (ClientPrivate *) cl->priv; if(cp == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); @@ -393,7 +407,8 @@ int redisxUnlockClient(RedisClient *cl) { ClientPrivate *cp; int status; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + cp = (ClientPrivate *) cl->priv; if(cp == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); @@ -420,7 +435,8 @@ int redisxSkipReplyAsync(RedisClient *cl) { static const char *fn = "redisSkipReplyAsync"; static const char cmd[] = "*3\r\n$6\r\nCLIENT\r\n$5\r\nREPLY\r\n$4\r\nSKIP\r\n"; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + if(cl->priv == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); prop_error(fn, rSendBytesAsync((ClientPrivate *) cl->priv, cmd, sizeof(cmd) - 1, TRUE)); @@ -453,7 +469,8 @@ int redisxStartBlockAsync(RedisClient *cl) { static const char *fn = "redisxStartBlockAsync"; static const char cmd[] = "*1\r\n$5\r\nMULTI\r\n"; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + if(cl->priv == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); prop_error(fn, rSendBytesAsync((ClientPrivate *) cl->priv, cmd, sizeof(cmd) - 1, TRUE)); @@ -477,7 +494,8 @@ int redisxAbortBlockAsync(RedisClient *cl) { static const char *fn = "redisxAbortBlockAsync"; static const char cmd[] = "*1\r\n$7\r\nDISCARD\r\n"; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + if(cl->priv == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); prop_error(fn, rSendBytesAsync((ClientPrivate *) cl->priv, cmd, sizeof(cmd) - 1, TRUE)); @@ -506,10 +524,7 @@ RESP *redisxExecBlockAsync(RedisClient *cl) { int status; - if(cl == NULL) { - x_error(0, EINVAL, fn, "client is NULL"); - return NULL; - } + if(rCheckClient(cl) != X_SUCCESS) return x_trace_null(fn, NULL); if(cl->priv == NULL) { x_error(0, EINVAL, fn, "client is not initialized"); @@ -557,7 +572,8 @@ int redisxSendRequestAsync(RedisClient *cl, const char *command, const char *arg const char *args[] = { command, arg1, arg2, arg3 }; int n; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); + if(command == NULL) return x_error(X_NAME_INVALID, EINVAL, fn, "command is NULL"); // Count the non-null arguments... @@ -591,7 +607,7 @@ int redisxSendArrayRequestAsync(RedisClient *cl, char *args[], int lengths[], in int i, L; ClientPrivate *cp; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); cp = (ClientPrivate *) cl->priv; if(cp == NULL) return x_error(X_NO_INIT, EINVAL, fn, "client is not initialized"); @@ -600,12 +616,23 @@ int redisxSendArrayRequestAsync(RedisClient *cl, char *args[], int lengths[], in // Send the number of string elements in the command... L = sprintf(buf, "*%d\r\n", n); + + xvprintf("Redis-X> request[%d]", n); + for(i = 0; i < n; i++) { + if(args[i]) xvprintf(" %s", args[i]); + if(i == 4) { + xvprintf("..."); + } + } + xvprintf("\n"); + for(i = 0; i < n; i++) { int l, L1; if(!args[i]) l = 0; // Check for potential NULL parameters... - else if(!lengths) l = (int) strlen(args[i]); - else l = lengths[i] > 0 ? lengths[i] : (int) strlen(args[i]); + else if(lengths) l = lengths[i] > 0 ? lengths[i] : (int) strlen(args[i]); + else l = (int) strlen(args[i]); + L += sprintf(buf + L, "$%d\r\n", l); @@ -660,7 +687,7 @@ int redisxIgnoreReplyAsync(RedisClient *cl) { static const char *fn = "redisxIgnoreReplyAsync"; RESP *resp; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); + prop_error(fn, rCheckClient(cl)); resp = redisxReadReplyAsync(cl); if(resp == NULL) return x_trace(fn, NULL, REDIS_NULL); @@ -668,6 +695,114 @@ int redisxIgnoreReplyAsync(RedisClient *cl) { return X_SUCCESS; } +static int rTypeIsParametrized(char type) { + switch(type) { + case RESP_INT: + case RESP_BULK_STRING: + case RESP_ARRAY: + case RESP3_SET: + case RESP3_PUSH: + case RESP3_MAP: + case RESP3_ATTRIBUTE: + case RESP3_BLOB_ERROR: + case RESP3_VERBATIM_STRING: + case RESP3_CONTINUED: + return TRUE; + default: + return FALSE; + } +} + + + +static void rPushMessageAsync(RedisClient *cl, RESP *resp) { + int i; + ClientPrivate *cp = (ClientPrivate *) cl->priv; + RedisPrivate *p = (RedisPrivate *) cp->redis->priv; + RESP **array; + + if(resp->n < 0) return; + + array = (RESP **) calloc(resp->n, sizeof(RESP *)); + if(!array) fprintf(stderr, "WARNING! Redis-X : not enough memory for push message (%d elements). Skipping.\n", resp->n); + + for(i = 0; i < resp->n; i++) { + RESP *r = redisxReadReplyAsync(cl); + if(array) array[i] = r; + else redisxDestroyRESP(r); + } + + resp->value = array; + + if(p->pushConsumer) p->pushConsumer(cl, resp, p->pushArg); + + redisxDestroyRESP(resp); +} + +/** + * Returns the attributes (if any) that were last sent along a response to the client. + * This function should be called only if the caller has an exclusive lock on the client's + * mutex. Also, there are a few rules the caller should follow: + * + *
    + *
  • The caller should not block the client for long and return quickly. If it has + * blocking calls, or requires extensive processing, it should make a copy of the + * RESP first, and release the lock immediately after.
  • + *
  • The caller must not attempt to call free() on the returned RESP
  • + *
+ * + * Normally the user would typically call this function right after a redisxReadReplyAsync() + * call, for which atributes are expected. The caller might also want to call + * redisxClearAttributeAsync() before attempting to read the response to ensure that + * the attributes returned are for the same reply from the server. + * + * @param cl The Redis client instance + * @return The attributes last received (possibly NULL). + * + * @sa redisxClearAttributesAsync() + * @sa redisxReadReplyAsync() + * @sa redisxLockClient() + * + */ +const RESP *redisxGetAttributesAsync(const RedisClient *cl) { + const ClientPrivate *cp; + if(!cl) { + x_error(0, EINVAL, "redisxGetAttributesAsync", "client is NULL"); + return NULL; + } + + cp = (ClientPrivate *) cl->priv; + return cp->attributes; +} + +static void rSetAttributeAsync(ClientPrivate *cp, RESP *resp) { + redisxDestroyRESP(cp->attributes); + cp->attributes = resp; +} + +/** + * Clears the attributes for the specified client. The caller should have an exclusive lock + * on the client's mutex prior to making this call. + * + * Typically a user migh call this function prior to calling redisxReadReplyAsync() on the + * same client, to ensure that any attributes that are available after the read will be the + * ones that were sent with the last response from the server. + * + * @param cl The Redis client instance + * @return X_SUCCESS (0) if successful, or else X_NULL if the client is NULL. + * + * @sa redisxGetAttributesAsync() + * @sa redisxReadReplyAsync() + * @sa redisxLockClient() + * + */ +int redisxClearAttributesAsync(RedisClient *cl) { + prop_error("redisxClearAttributesAsync", rCheckClient(cl)); + + rSetAttributeAsync((ClientPrivate *) cl->priv, NULL); + return X_SUCCESS; +} + /** * Reads a response from Redis and returns it. * @@ -686,10 +821,7 @@ RESP *redisxReadReplyAsync(RedisClient *cl) { int size = 0; int status = X_SUCCESS; - if(cl == NULL) { - x_error(0, EINVAL, fn, "client is NULL"); - return NULL; - } + if(rCheckClient(cl) != X_SUCCESS) return x_trace_null(fn, NULL); cp = cl->priv; @@ -702,62 +834,152 @@ RESP *redisxReadReplyAsync(RedisClient *cl) { return NULL; } - size = rReadToken(cp, buf, REDIS_SIMPLE_STRING_SIZE + 1); - if(size < 0) { - // Either read/recv had an error, or we got garbage... - if(cp->isEnabled) x_trace_null(fn, NULL); - cp->isEnabled = FALSE; // Disable this client so we don't attempt to read from it again... - return NULL; - } + for(;;) { + size = rReadToken(cp, buf, REDIS_SIMPLE_STRING_SIZE + 1); + if(size < 0) { + // Either read/recv had an error, or we got garbage... + if(cp->isEnabled) x_trace_null(fn, NULL); + cp->isEnabled = FALSE; // Disable this client so we don't attempt to read from it again... + return NULL; + } - resp = (RESP *) calloc(1, sizeof(RESP)); - x_check_alloc(resp); - resp->type = buf[0]; - - // Get the integer / size value... - if(resp->type == RESP_ARRAY || resp->type == RESP_INT || resp->type == RESP_BULK_STRING) { - char *tail; - errno = 0; - resp->n = (int) strtol(&buf[1], &tail, 10); - if(errno) { - fprintf(stderr, "WARNING! Redis-X : unparseable dimension '%s'\n", &buf[1]); - status = X_PARSE_ERROR; + resp = (RESP *) calloc(1, sizeof(RESP)); + x_check_alloc(resp); + resp->type = buf[0]; + + // Parametrized type. + if(rTypeIsParametrized(resp->type)) { + + if(buf[1] == '?') { + // Streaming RESP in parts... + for(;;) { + RESP *r = redisxReadReplyAsync(cl); + if(r->type != RESP3_CONTINUED) { + int type = r->type; + redisxDestroyRESP(r); + fprintf(stderr, "WARNIG! Redis-X: expected type '%c', got type '%c'.", resp->type, type); + return resp; + } + + if(r->n == 0) { + if(resp->type == RESP3_PUSH || resp->type == RESP3_ATTRIBUTE) break; + if(redisxHasComponents(resp)) break; + return resp; // We are done, return the result. + } + + r->type = resp->type; + redisxAppendRESP(resp, r); + } + } + else { + // Get the integer / size value... + char *tail; + errno = 0; + resp->n = (int) strtol(&buf[1], &tail, 10); + if(errno) { + fprintf(stderr, "WARNING! Redis-X : unparseable dimension '%s'\n", &buf[1]); + status = X_PARSE_ERROR; + } + } } + + // Deal with push messages and attributes... + if(resp->type == RESP3_PUSH) rPushMessageAsync(cl, resp); + else if(resp->type == RESP3_ATTRIBUTE) rSetAttributeAsync(cp, resp); + else break; } // Now get the body of the response... if(!status) switch(resp->type) { - case RESP_ARRAY: { + case RESP3_NULL: + resp->n = 0; + break; + + case RESP_INT: // Nothing left to do for INT type response. + break; + + case RESP3_BOOLEAN: { + switch(tolower(buf[1])) { + case 't': resp->n = TRUE; break; + case 'f': resp->n = FALSE; break; + default: + resp->n = -1; + fprintf(stderr, "WARNING! Redis-X : invalid boolean value '%c'\n", buf[1]); + status = X_PARSE_ERROR; + } + break; + } + + case RESP3_DOUBLE: { + double *dval = (double *) calloc(1, sizeof(double)); + x_check_alloc(dval); + + *dval = xParseDouble(&buf[1], NULL); + if(errno) { + fprintf(stderr, "WARNING! Redis-X : invalid double value '%s'\n", &buf[1]); + status = X_PARSE_ERROR; + } + resp->value = dval; + break; + } + + case RESP_ARRAY: + case RESP3_SET: + case RESP3_PUSH: { RESP **component; int i; if(resp->n <= 0) break; - resp->value = (RESP **) malloc(resp->n * sizeof(RESP *)); - if(resp->value == NULL) { + component = (RESP **) malloc(resp->n * sizeof(RESP *)); + if(component == NULL) { status = x_error(X_FAILURE, errno, fn, "malloc() error (%d RESP)", resp->n); // We should get the data from the input even if we have nowhere to store... } - component = (RESP **) resp->value; for(i=0; in; i++) { - RESP* r = redisxReadReplyAsync(cl); // Always read RESP even if we don't have storage for it... - if(resp->value) component[i] = r; + RESP *r = redisxReadReplyAsync(cl); // Always read RESP even if we don't have storage for it... + if(component) component[i] = r; + else redisxDestroyRESP(r); } // Consistency check. Discard response if incomplete (because of read errors...) - if(resp->value) for(i = 0; i < resp->n; i++) if(component[i] == NULL) { + if(component) for(i = 0; i < resp->n; i++) if(component[i] == NULL || component[i]->type == RESP3_NULL) { fprintf(stderr, "WARNING! Redis-X : incomplete array received (index %d of %d).\n", (i+1), resp->n); if(!status) status = REDIS_INCOMPLETE_TRANSFER; break; } + resp->value = component; + + break; + } + + case RESP3_MAP: + case RESP3_ATTRIBUTE: { + RedisMapEntry *dict; + int i; + + if(resp->n <= 0) break; + + dict = (RedisMapEntry *) calloc(resp->n, sizeof(RedisMapEntry)); + x_check_alloc(dict); + + for(i=0; in; i++) { + RedisMapEntry *e = &dict[i]; + e->key = redisxReadReplyAsync(cl); + e->value = redisxReadReplyAsync(cl); + } + resp->value = dict; + break; } case RESP_BULK_STRING: + case RESP3_BLOB_ERROR: + case RESP3_VERBATIM_STRING: if(resp->n < 0) break; // no string token following! resp->value = malloc(resp->n + 2); // \r\n @@ -781,6 +1003,7 @@ RESP *redisxReadReplyAsync(RedisClient *cl) { case RESP_SIMPLE_STRING: case RESP_ERROR: + case RESP3_BIG_NUMBER: resp->value = malloc(size); if(resp->value == NULL) { @@ -795,9 +1018,6 @@ RESP *redisxReadReplyAsync(RedisClient *cl) { break; - case RESP_INT: // Nothing left to do for INT type response. - break; - default: // FIXME workaround for Redis 4.x improper OK reply to QUIT if(!strcmp(buf, "OK")) { @@ -836,8 +1056,6 @@ int redisxResetClient(RedisClient *cl) { int status; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); - prop_error(fn, redisxLockConnected(cl)); status = redisxSendRequestAsync(cl, "RESET", NULL, NULL, NULL); diff --git a/src/redisx-hooks.c b/src/redisx-hooks.c index fac2459..7d7ab8f 100644 --- a/src/redisx-hooks.c +++ b/src/redisx-hooks.c @@ -37,14 +37,13 @@ int redisxAddConnectHook(Redis *redis, void (*setupCall)(Redis *)) { static const char *fn = "redisxAddConnectHook"; RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); if(setupCall == NULL) return x_error(X_NULL, EINVAL, fn, "setupCall is NULL"); xvprintf("Redis-X> Adding a connect callback.\n"); + prop_error(fn, rConfigLock(redis)); p = (RedisPrivate *) redis->priv; - rConfigLock(redis); if(p->firstConnectCall == NULL) p->firstConnectCall = createHook(redis, setupCall); else { // Check if the specified hook is already added... @@ -76,14 +75,13 @@ int redisxRemoveConnectHook(Redis *redis, void (*setupCall)(Redis *)) { RedisPrivate *p; Hook *c, *last = NULL; - if(redis == NULL) x_error(X_NULL, EINVAL, fn, "redis is NULL"); + if(setupCall == NULL) x_error(X_NULL, EINVAL, fn, "setupCall is NULL"); xvprintf("Redis-X> Removing a connect callback.\n"); + prop_error(fn, rConfigLock(redis)); p = (RedisPrivate *) redis->priv; - - rConfigLock(redis); c = p->firstConnectCall; while(c != NULL) { @@ -112,13 +110,10 @@ void redisxClearConnectHooks(Redis *redis) { RedisPrivate *p; Hook *c; - if(redis == NULL) return; - xvprintf("Redis-X> Clearing all connect callbacks.\n"); + if(rConfigLock(redis) != X_SUCCESS) return; p = (RedisPrivate *) redis->priv; - - rConfigLock(redis); c = p->firstConnectCall; while(c != NULL) { @@ -147,14 +142,13 @@ int redisxAddDisconnectHook(Redis *redis, void (*cleanupCall)(Redis *)) { RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); if(cleanupCall == NULL) return x_error(X_NULL, EINVAL, fn, "cleanupCall is NULL"); xvprintf("Redis-X> Adding a disconnect callback.\n"); + prop_error(fn, rConfigLock(redis)); p = (RedisPrivate *) redis->priv; - rConfigLock(redis); if(p->firstCleanupCall == NULL) p->firstCleanupCall = createHook(redis, cleanupCall); else { // Check if the specified hook is already added... @@ -186,14 +180,12 @@ int redisxRemoveDisconnectHook(Redis *redis, void (*cleanupCall)(Redis *)) { RedisPrivate *p; Hook *c, *last = NULL; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); if(cleanupCall == NULL) return x_error(X_NULL, EINVAL, fn, "cleanupCall is NULL"); xvprintf("Redis-X> Removing a disconnect callback.\n"); + prop_error(fn, rConfigLock(redis)); p = (RedisPrivate *) redis->priv; - - rConfigLock(redis); c = p->firstCleanupCall; while(c != NULL) { @@ -222,13 +214,10 @@ void redisxClearDisconnectHooks(Redis *redis) { RedisPrivate *p; Hook *c; - if(redis == NULL) return; - xvprintf("Redis-X> Clearing all disconnect callbacks.\n"); + if(rConfigLock(redis) != X_SUCCESS) return; p = (RedisPrivate *) redis->priv; - - rConfigLock(redis); c = p->firstCleanupCall; while(c != NULL) { diff --git a/src/redisx-net.c b/src/redisx-net.c index 4e276a4..2422a9a 100644 --- a/src/redisx-net.c +++ b/src/redisx-net.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #if __Lynx__ @@ -87,21 +88,21 @@ static int hostnameToIP(const char *hostName, char *ip) { /** * Configure the Redis client sockets for optimal performance... * - * \param socket The socket file descriptor. - * \param lowLatency TRUE (non-zero) if socket is to be configured for low latency, or else FALSE (0). + * \param socket The socket file descriptor. + * \param timeoutMillis [ms] Socket read/write timeout, or <0 to no set. + * \param lowLatency TRUE (non-zero) if socket is to be configured for low latency, or else FALSE (0). * */ -static void rConfigSocket(int socket, boolean lowLatency) { +static void rConfigSocket(int socket, int timeoutMillis, boolean lowLatency) { const boolean enable = TRUE; -#if REDIS_TIMEOUT_SECONDS > 0 - { + if(timeoutMillis > 0) { struct linger linger; struct timeval timeout; // Set a time limit for sending. - timeout.tv_sec = REDIS_TIMEOUT_SECONDS; - timeout.tv_usec = 0; + timeout.tv_sec = timeoutMillis / 1000; + timeout.tv_usec = 1000 * (timeoutMillis % 1000); if(setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, & timeout, sizeof(struct timeval))) xvprintf("WARNING! Redix-X: socket send timeout not set: %s", strerror(errno)); @@ -113,7 +114,6 @@ static void rConfigSocket(int socket, boolean lowLatency) { if(setsockopt(socket, SOL_SOCKET, SO_LINGER, & linger, sizeof(struct timeval))) xvprintf("WARNING! Redis-X: socket linger not set: %s", strerror(errno)); } -#endif #if __linux__ { @@ -127,9 +127,9 @@ static void rConfigSocket(int socket, boolean lowLatency) { #endif #if !(__Lynx__ && __powerpc__) - // Send packets immediately even if small... - if(lowLatency) if(setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, & enable, sizeof(int))) - xvprintf("WARNING! Redis-X: socket tcpnodelay not enabled: %s", strerror(errno)); + // Send packets immediately even if small... + if(lowLatency) if(setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, & enable, sizeof(int))) + xvprintf("WARNING! Redis-X: socket tcpnodelay not enabled: %s", strerror(errno)); #endif // Check connection to remote every once in a while to detect if it's down... @@ -313,9 +313,10 @@ static void rCloseClientAsync(RedisClient *cl) { * */ void rCloseClient(RedisClient *cl) { - redisxLockClient(cl); - rCloseClientAsync(cl); - redisxUnlockClient(cl); + if(redisxLockClient(cl) == X_SUCCESS) { + rCloseClientAsync(cl); + redisxUnlockClient(cl); + } return; } @@ -363,7 +364,7 @@ static int rReconnectAsync(Redis *redis, boolean usePipeline) { * */ void redisxDisconnect(Redis *redis) { - if(redis == NULL) return; + if(redisxCheckValid(redis) != X_SUCCESS) return; rConfigLock(redis); rDisconnectAsync(redis); @@ -387,12 +388,9 @@ int redisxReconnect(Redis *redis, boolean usePipeline) { int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - - rConfigLock(redis); + prop_error(fn, rConfigLock(redis)); status = rReconnectAsync(redis, usePipeline); rConfigUnlock(redis); - prop_error(fn, status); return X_SUCCESS; @@ -502,6 +500,50 @@ boolean rIsLowLatency(const ClientPrivate *cp) { return cp->idx != REDISX_PIPELINE_CHANNEL; } +static int rHelloAsync(RedisClient *cl, char *clientID) { + ClientPrivate *cp = (ClientPrivate *) cl->priv; + RedisPrivate *p = (RedisPrivate *) cp->redis->priv; + RESP *reply; + char proto[20]; + char *args[6]; + int status, k = 0; + + args[k++] = "HELLO"; + + // Try HELLO and see what we get back... + sprintf(proto, "%d", (int) p->protocol); + args[k++] = proto; + + if(p->password) { + args[k++] = "AUTH"; + args[k++] = p->username ? p->username : "default"; + args[k++] = p->password; + } + + args[k++] = "SETNAME"; + args[k++] = clientID; + + status = redisxSendArrayRequestAsync(cl, args, NULL, k); + if(status != X_SUCCESS) return status; + + reply = redisxReadReplyAsync(cl); + status = redisxCheckRESP(reply, RESP3_MAP, 0); + if(status == X_SUCCESS) { + RedisMapEntry *e = redisxGetKeywordEntry(reply, "proto"); + if(e && e->value->type == RESP_INT) { + p->protocol = e->value->n; + xvprintf("Confirmed protocol %d\n", p->protocol); + } + + redisxDestroyRESP(p->helloData); + p->helloData = reply; + } + else xvprintf("! Redis-X: HELLO failed: %s\n", redisxErrorDescription(status)); + + return status; +} + + /** * Connects the specified Redis client to the Redis server. * @@ -521,13 +563,13 @@ int rConnectClient(Redis *redis, enum redisx_channel channel) { struct sockaddr_in serverAddress; struct utsname u; - const RedisPrivate *p; + RedisPrivate *p; RedisClient *cl; ClientPrivate *cp; const char *channelID; char host[200], *id; - int status; + int status = X_SUCCESS; int sock; cl = redisxGetClient(redis, channel); @@ -543,7 +585,7 @@ int rConnectClient(Redis *redis, enum redisx_channel channel) { if((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) return x_error(X_NO_SERVICE, errno, fn, "client %d socket creation failed", channel); - rConfigSocket(sock, rIsLowLatency(cp)); + rConfigSocket(sock, p->timeoutMillis, rIsLowLatency(cp)); while(connect(sock, (struct sockaddr *) &serverAddress, sizeof(serverAddress)) != 0) { close(sock); @@ -552,20 +594,6 @@ int rConnectClient(Redis *redis, enum redisx_channel channel) { xvprintf("Redis-X> client %d assigned socket fd %d.\n", channel, sock); - redisxLockClient(cl); - - cp->socket = sock; - cp->isEnabled = TRUE; - - if(p->password) { - status = rAuthAsync(cl); - if(status) { - rCloseClientAsync(cl); - redisxUnlockClient(cl); - return status; - } - } - // Set the client name in Redis. uname(&u); strncpy(host, u.nodename, sizeof(host) - 1); @@ -577,11 +605,26 @@ int rConnectClient(Redis *redis, enum redisx_channel channel) { case REDISX_SUBSCRIPTION_CHANNEL: channelID = "subscription"; break; default: channelID = "unknown"; } - sprintf(id, "%s:pid-%d:%s", host, (int) getppid(), channelID); - status = redisxSkipReplyAsync(cl); - if(!status) status = redisxSendRequestAsync(cl, "CLIENT", "SETNAME", id, NULL); + redisxLockClient(cl); + + cp->socket = sock; + cp->isEnabled = TRUE; + + if(p->hello) status = rHelloAsync(cl, id); + + if(status != X_SUCCESS) { + status = X_SUCCESS; + p->hello = FALSE; + + // No HELLO, go the old way... + p->protocol = REDISX_RESP2; + if(p->password) status = rAuthAsync(cl); + + if(!status) status = redisxSkipReplyAsync(cl); + if(!status) status = redisxSendRequestAsync(cl, "CLIENT", "SETNAME", id, NULL); + } free(id); @@ -650,13 +693,15 @@ Redis *redisxInit(const char *server) { redis->subscription = &p->clients[REDISX_SUBSCRIPTION_CHANNEL]; redis->id = xStringCopyOf(ipAddress); - for(i=REDISX_CHANNELS; --i >= 0; ) { + for(i = REDISX_CHANNELS; --i >= 0; ) { ClientPrivate *cp = (ClientPrivate *) p->clients[i].priv; cp->redis = redis; } p->addr = inet_addr((char *) ipAddress); p->port = REDISX_TCP_PORT; + p->protocol = REDISX_RESP2; // Default + p->timeoutMillis = REDISX_DEFAULT_TIMEOUT_MILLIS; l = (ServerLink *) calloc(1, sizeof(ServerLink)); x_check_alloc(l); @@ -687,7 +732,7 @@ void redisxDestroy(Redis *redis) { if(redisxIsConnected(redis)) redisxDisconnect(redis); - for(i=REDISX_CHANNELS; --i >= 0; ) { + for(i = REDISX_CHANNELS; --i >= 0; ) { ClientPrivate *cp = (ClientPrivate *) p->clients[i].priv; pthread_mutex_destroy(&cp->readLock); pthread_mutex_destroy(&cp->writeLock); @@ -719,15 +764,40 @@ void redisxSetTcpBuf(int size) { * @param redis Pointer to a Redis instance. * @param port The TCP port number to use. * + * @return X_SUCCESS (0) if successful, or else X_NULL if the redis instance is NULL, + * or X_N_INIT if the redis instance is not initialized. + * * @sa redisxConnect(); */ int redisxSetPort(Redis *redis, int port) { RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, "redisxSetPort", "redis is NULL"); - + prop_error("redisxSetPort", rConfigLock(redis)); p = (RedisPrivate *) redis->priv; p->port = port; + rConfigUnlock(redis); + + return X_SUCCESS; +} + + +/** + * Sets a socket timeout for future client connections on a Redis instance. If not set (or set to zero + * or a negative value), then the timeout will not be configured for sockets, and the system default + * timeout values will apply. + * + * @param redis The Redis instance + * @param timeoutMillis [ms] The desired socket read/write timeout, or <0 for socket default. + * @return X_SUCCESS (0) if successful, or else X_NULL if the redis instance is NULL, + * or X_N_INIT if the redis instance is not initialized. + */ +int redisxSetSocketTimeout(Redis *redis, int timeoutMillis) { + RedisPrivate *p; + + prop_error("redisxSetPort", rConfigLock(redis)); + p = (RedisPrivate *) redis->priv; + p->timeoutMillis = timeoutMillis; + rConfigUnlock(redis); return X_SUCCESS; } @@ -757,14 +827,9 @@ int redisxConnect(Redis *redis, boolean usePipeline) { static const char *fn = "redisxConnect"; int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - - rConfigLock(redis); - + prop_error(fn, rConfigLock(redis)); status = rConnectAsync(redis, usePipeline); - rConfigUnlock(redis); - prop_error(fn, status); return X_SUCCESS; @@ -781,8 +846,7 @@ int redisxConnect(Redis *redis, boolean usePipeline) { boolean redisxIsConnected(Redis *redis) { const ClientPrivate *ip; - if(redis == NULL) return FALSE; - + if(redisxCheckValid(redis) != X_SUCCESS) return FALSE; ip = (ClientPrivate *) redis->interactive->priv; return ip->isEnabled; } @@ -810,10 +874,7 @@ void *RedisPipelineListener(void *pRedis) { xvprintf("Redis-X> Started processing pipelined responses...\n"); - if(redis == NULL) { - x_error(0, EINVAL, "RedisPipelineListener", "redis is NULL"); - return NULL; - } + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null("RedisPipelineListener", NULL); p = (RedisPrivate *) redis->priv; cl = redis->pipeline; diff --git a/src/redisx-script.c b/src/redisx-script.c index 220a046..a1b90a3 100644 --- a/src/redisx-script.c +++ b/src/redisx-script.c @@ -33,9 +33,8 @@ int redisxLoadScript(Redis *redis, const char *script, char **sha1) { static const char *fn = "redisxLoadScript"; RESP *reply; - int status; + int status = X_SUCCESS; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); if(script == NULL) return x_error(X_NULL, EINVAL, fn, "input script is NULL"); if(*script == '\0') return x_error(X_NULL, EINVAL, fn, "input script is empty"); @@ -81,8 +80,9 @@ int redisxRunScriptAsync(RedisClient *cl, const char *sha1, const char **keys, c int i = 0, k, nkeys = 0, nparams = 0, nargs; char sn[20], **args; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "client is NULL"); - if(sha1 == NULL) return x_error(X_NULL, EINVAL, fn, "input script SHA1 sum is NULL"); + prop_error(fn, rCheckClient(cl)); + + if(sha1 == NULL) return x_error(X_NULL, EINVAL, fn, "input script SHA1 sum is NULL"); if(keys) while(keys[nkeys]) nkeys++; if(params) while(params[nparams]) nparams++; @@ -128,7 +128,12 @@ RESP *redisxRunScript(Redis *redis, const char *sha1, const char **keys, const c RESP *reply = NULL; - if(redis == NULL || sha1 == NULL) return NULL; + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null(fn, NULL); + + if(sha1 == NULL) { + x_error(0, EINVAL, fn, "sha1 parameter is NULL"); + return NULL; + } if(redisxLockConnected(redis->interactive) != X_SUCCESS) return x_trace_null(fn, NULL); diff --git a/src/redisx-sub.c b/src/redisx-sub.c index 3e7aebb..98bdf98 100644 --- a/src/redisx-sub.c +++ b/src/redisx-sub.c @@ -23,9 +23,11 @@ static int rStartSubscriptionListenerAsync(Redis *redis); * \param redis Pointer to a Redis instance. * */ -static void rSubscriberLock(Redis *redis) { +static int rSubscriberLock(Redis *redis) { + prop_error("rSubscriberLock", redisxCheckValid(redis)); RedisPrivate *p = (RedisPrivate *) redis->priv; pthread_mutex_lock(&p->subscriberLock); + return X_SUCCESS; } /** @@ -34,9 +36,11 @@ static void rSubscriberLock(Redis *redis) { * \param redis Pointer to a Redis instance. * */ -static void rSubscriberUnlock(Redis *redis) { +static int rSubscriberUnlock(Redis *redis) { + prop_error("rSubscriberLock", redisxCheckValid(redis)); RedisPrivate *p = (RedisPrivate *) redis->priv; pthread_mutex_unlock(&p->subscriberLock); + return X_SUCCESS; } /** @@ -51,10 +55,16 @@ static void rSubscriberUnlock(Redis *redis) { static int rConnectSubscriptionClientAsync(Redis *redis) { static const char *fn = "rConnectSubscriptionClientAsync"; - int status; - const ClientPrivate *sp = (ClientPrivate *) redis->subscription->priv; + int status, isEnabled; + const ClientPrivate *sp; - if(sp->isEnabled) { + prop_error(fn, redisxCheckValid(redis)); + prop_error(fn, redisxLockClient(redis->subscription)); + sp = (ClientPrivate *) redis->subscription->priv; + isEnabled = sp->isEnabled; + redisxUnlockClient(redis->subscription); + + if(isEnabled) { x_warn(fn, "Redis-X : pub/sub client is already connected at %s.\n", redis->id); return X_SUCCESS; } @@ -89,7 +99,8 @@ int redisxPublishAsync(Redis *redis, const char *channel, const char *data, int char *args[3]; int L[3] = {0}; - if(redis == NULL) return x_error(X_NULL,EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxCheckValid(redis)); + if(channel == NULL) return x_error(X_NULL,EINVAL, fn, "channel parameter is NULL"); if(*channel == '\0') return x_error(X_NULL,EINVAL, fn, "channel parameter is empty"); @@ -133,8 +144,7 @@ int redisxPublish(Redis *redis, const char *channel, const char *data, int lengt int status = 0; - if(redis == NULL) return x_error(X_NULL,EINVAL, fn, "redis is NULL"); - + prop_error(fn, redisxCheckValid(redis)); prop_error(fn, redisxLockConnected(redis->interactive)); // Now send the message @@ -201,12 +211,9 @@ int redisxAddSubscriber(Redis *redis, const char *channelStem, RedisSubscriberCa MessageConsumer *c; RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - + prop_error(fn, rSubscriberLock(redis)); p = (RedisPrivate *) redis->priv; - rSubscriberLock(redis); - // Check if the subscriber is already listed with the same stem. If so, nothing to do... for(c = p->subscriberList; c != NULL; c = c->next) { if(f != c->func) continue; @@ -261,13 +268,11 @@ int redisxRemoveSubscribers(Redis *redis, RedisSubscriberCall f) { MessageConsumer *c, *last = NULL; int removed = 0; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); if(!f) return x_error(X_NULL, EINVAL, fn, "subscriber function parameter is NULL"); + prop_error(fn, rSubscriberLock(redis)); p = (RedisPrivate *) redis->priv; - rSubscriberLock(redis); - for(c = p->subscriberList; c != NULL; ) { MessageConsumer *next = c->next; @@ -306,11 +311,8 @@ int redisxClearSubscribers(Redis *redis) { MessageConsumer *c; int n = 0; - if(redis == NULL) return x_error(X_NULL, EINVAL, "redisxClearSubscrivers", "redis is NULL"); - + prop_error("redisxClearSubscribers", rSubscriberLock(redis)); p = (RedisPrivate *) redis->priv; - - rSubscriberLock(redis); c = p->subscriberList; p->subscriberList = NULL; rSubscriberUnlock(redis); @@ -327,22 +329,6 @@ int redisxClearSubscribers(Redis *redis) { return n; } -/** - * Checks if a given string is a glob-style pattern. - * - * \param str The string to check. - * - * \return TRUE if it is a glob pattern (e.g. has '*', '?' or '['), otherwise FALSE. - * - */ -static int rIsGlobPattern(const char *str) { - for(; *str; str++) switch(*str) { - case '*': - case '?': - case '[': return TRUE; - } - return FALSE; -} /** * Subscribe to a specific Redis channel. The call will also start the subscription listener @@ -370,11 +356,10 @@ int redisxSubscribe(Redis *redis, const char *pattern) { const ClientPrivate *cp; int status = 0; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); if(pattern == NULL) return x_error(X_NULL, EINVAL, fn, "pattern parameter is NULL"); // connect subscription client as needed. - rConfigLock(redis); + prop_error(fn, rConfigLock(redis)); cp = (ClientPrivate *) redis->subscription->priv; if(!cp->isEnabled) status = rConnectSubscriptionClientAsync(redis); if(!status) { @@ -383,15 +368,11 @@ int redisxSubscribe(Redis *redis, const char *pattern) { if(!p->isSubscriptionListenerEnabled) rStartSubscriptionListenerAsync(redis); } rConfigUnlock(redis); - - prop_error(fn, status); - - status = redisxLockConnected(redis->subscription); prop_error(fn, status); - status = redisxSendRequestAsync(redis->subscription, rIsGlobPattern(pattern) ? "PSUBSCRIBE" : "SUBSCRIBE", pattern, NULL, NULL); + prop_error(fn, redisxLockConnected(redis->subscription)); + status = redisxSendRequestAsync(redis->subscription, redisxIsGlobPattern(pattern) ? "PSUBSCRIBE" : "SUBSCRIBE", pattern, NULL, NULL); redisxUnlockClient(redis->subscription); - prop_error(fn, status); return X_SUCCESS; @@ -419,12 +400,11 @@ int redisxUnsubscribe(Redis *redis, const char *pattern) { int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - + prop_error(fn, redisxCheckValid(redis)); prop_error(fn, redisxLockConnected(redis->subscription)); if(pattern) { - status = redisxSendRequestAsync(redis->subscription, rIsGlobPattern(pattern) ? "PUNSUBSCRIBE" : "UNSUBSCRIBE", pattern, NULL, NULL); + status = redisxSendRequestAsync(redis->subscription, redisxIsGlobPattern(pattern) ? "PUNSUBSCRIBE" : "UNSUBSCRIBE", pattern, NULL, NULL); } else { status = redisxSendRequestAsync(redis->subscription, "UNSUBSCRIBE", NULL, NULL, NULL); @@ -432,8 +412,8 @@ int redisxUnsubscribe(Redis *redis, const char *pattern) { } redisxUnlockClient(redis->subscription); - prop_error(fn, status); + return X_SUCCESS; } @@ -456,18 +436,18 @@ static int rEndSubscriptionAsync(Redis *redis) { RedisPrivate *p; int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - xvprintf("Redis-X> End all subscriptions, and quit listener.\n"); - p = (RedisPrivate *) redis->priv; - status = redisxUnsubscribe(redis, NULL); + prop_error(fn, rConfigLock(redis)); + p = (RedisPrivate *) redis->priv; p->isSubscriptionListenerEnabled = FALSE; - rCloseClient(redis->subscription); + rConfigUnlock(redis); + rCloseClient(redis->subscription); prop_error(fn, status); + return X_SUCCESS; } @@ -487,13 +467,11 @@ int redisxEndSubscription(Redis *redis) { int status; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - - rConfigLock(redis); + prop_error(fn, rConfigLock(redis)); status = rEndSubscriptionAsync(redis); rConfigUnlock(redis); - prop_error(fn, status); + return X_SUCCESS; } @@ -505,12 +483,9 @@ static void rNotifyConsumers(Redis *redis, char *pattern, char *channel, char *m xdprintf("NOTIFY: %s | %s\n", channel, msg); - if(!redis) return; - + if(rSubscriberLock(redis) != X_SUCCESS) return; p = (RedisPrivate *) redis->priv; - rSubscriberLock(redis); - // Count how many matching subscribers there are... for(c = p->subscriberList ; c != NULL; c = c->next) { if(c->channelStem != NULL) if(strncmp(c->channelStem, channel, strlen(c->channelStem))) continue; @@ -564,10 +539,7 @@ void *RedisSubscriptionListener(void *pRedis) { xvprintf("Redis-X> Started processing subsciptions...\n"); - if(redis == NULL) { - x_error(0, EINVAL, "RedisSubscriptionListener", "redis is NULL"); - return NULL; - } + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null("RedisSubscriptionListener", NULL); p = (RedisPrivate *) redis->priv; cl = redis->subscription; @@ -647,10 +619,11 @@ void *RedisSubscriptionListener(void *pRedis) { } // <-- End of listener loop - rConfigLock(redis); - // If we are the current listener thread, then mark the listener as disabled. - if(pthread_equal(p->subscriptionListenerTID, pthread_self())) p->isSubscriptionListenerEnabled = FALSE; - rConfigUnlock(redis); + if(rConfigLock(redis) == X_SUCCESS) { + // If we are the current listener thread, then mark the listener as disabled. + if(pthread_equal(p->subscriptionListenerTID, pthread_self())) p->isSubscriptionListenerEnabled = FALSE; + rConfigUnlock(redis); + } xvprintf("Redis-X> Stopped processing subscriptions (%d processed)...\n", counter); diff --git a/src/redisx-tab.c b/src/redisx-tab.c index bcfc9de..31eeaf4 100644 --- a/src/redisx-tab.c +++ b/src/redisx-tab.c @@ -51,11 +51,6 @@ RedisEntry *redisxGetTable(Redis *redis, const char *table, int *n) { return NULL; } - if(redis == NULL) { - *n = x_error(X_NULL, EINVAL, fn, "redis is NULL"); - return NULL; - } - if(table == NULL) { *n = x_error(X_GROUP_INVALID, EINVAL, fn, "table parameter is NULL"); return NULL; @@ -73,14 +68,21 @@ RedisEntry *redisxGetTable(Redis *redis, const char *table, int *n) { return x_trace_null(fn, NULL); } - *n = redisxCheckDestroyRESP(reply, RESP_ARRAY, 0); + // Cast RESP2 array respone to RESP3 map also... + if(reply && reply->type == RESP_ARRAY) { + reply->type = RESP3_MAP; + reply->n /= 2; + } + + *n = redisxCheckDestroyRESP(reply, RESP3_MAP, 0); if(*n) { return x_trace_null(fn, NULL); } - *n = reply->n / 2; + *n = reply->n; if(*n > 0) { + RedisMapEntry *dict = (RedisMapEntry *) reply->value; entries = (RedisEntry *) calloc(*n, sizeof(RedisEntry)); if(entries == NULL) { @@ -90,21 +92,19 @@ RedisEntry *redisxGetTable(Redis *redis, const char *table, int *n) { int i; for(i=0; in; i+=2) { - RedisEntry *e = &entries[i>>1]; - RESP **component = (RESP **) reply->value; - - e->key = (char *) component[i]->value; - e->value = (char *) component[i+1]->value; - e->length = component[i+1]->n; - - // Dereference the values from the RESP - component[i]->value = NULL; - component[i+1]->value = NULL; + RedisEntry *e = &entries[i]; + RedisMapEntry *component = &dict[i]; + e->key = component->key->value; + e->value = component->value->value; + + // Dereference the key/value so we don't destroy them with the reply. + component->key->value = NULL; + component->value->value = NULL; } } } - // Free the Reply container, but not the strings inside, which are returned. + // Free the reply container, but not the strings inside, which are returned. redisxDestroyRESP(reply); return entries; } @@ -131,12 +131,10 @@ int redisxSetValue(Redis *redis, const char *table, const char *key, const char int status = X_SUCCESS; - if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - + prop_error(fn, redisxCheckValid(redis)); prop_error(fn, redisxLockConnected(redis->interactive)); status = redisxSetValueAsync(redis->interactive, table, key, value, confirm); - redisxUnlockClient(redis->interactive); prop_error(fn, status); @@ -203,8 +201,9 @@ int redisxSetValueAsync(RedisClient *cl, const char *table, const char *key, con * \param[out] status (optional) pointer to the return error status, which is either X_SUCCESS on success or else * the error code set by redisxArrayRequest(). It may be NULL if not required. * - * \return A freshly allocated RESP array containing the Redis response, or NULL if no valid - * response could be obtained. + * \return A freshly allocated RESP containing the Redis response, or NULL if no valid + * response could be obtained. Values are returned as RESP_BULK_STRING (count = 1), + * or else type RESP_ERROR or RESP_NULL if Redis responded with an error or null, respectively. * * \sa redisxGetStringValue() */ @@ -212,13 +211,7 @@ RESP *redisxGetValue(Redis *redis, const char *table, const char *key, int *stat static const char *fn = "redisxGetValue"; RESP *reply; - int s; - - if(redis == NULL) { - x_error(X_NULL, EINVAL, fn, "redis is NULL"); - if(status) *status = X_NULL; - return NULL; - } + int s = X_SUCCESS; if(table && !table[0]) { x_error(X_GROUP_INVALID, EINVAL, fn, "'table' parameter is empty"); @@ -268,20 +261,11 @@ RESP *redisxGetValue(Redis *redis, const char *table, const char *key, int *stat * \sa redisxGetValue() */ char *redisxGetStringValue(Redis *redis, const char *table, const char *key, int *len) { - static const char *fn = "redisxGetStringValue"; - RESP *reply; char *str = NULL; int status; - if(redis == NULL) { - x_error(0, EINVAL, fn, "redis is NULL"); - if(len) *len = X_NULL; - return NULL; - } - reply = redisxGetValue(redis, table, key, len); - status = redisxCheckRESP(reply, RESP_BULK_STRING, 0); if(status == X_SUCCESS) { @@ -293,7 +277,7 @@ char *redisxGetStringValue(Redis *redis, const char *table, const char *key, int redisxDestroyRESP(reply); - if(status) x_trace_null(fn, NULL); + if(status) x_trace_null("redisxGetStringValue", NULL); return str; } @@ -339,14 +323,14 @@ int redisxMultiSetAsync(RedisClient *cl, const char *table, const RedisEntry *en return x_trace(fn, NULL, X_FAILURE); } - req[0] = "HMSET"; + req[0] = "HMSET"; // TODO, as of Redis 4.0.0, just use HSET... req[1] = (char *) table; for(i=0; iinteractive)); - status = redisxMultiSetAsync(redis->interactive, table, entries, n, confirm); if(status == X_SUCCESS && confirm) { RESP *reply = redisxReadReplyAsync(redis->interactive); @@ -391,6 +375,7 @@ int redisxMultiSet(Redis *redis, const char *table, const RedisEntry *entries, i if(!status) if(strcmp(reply->value, "OK")) status = REDIS_ERROR; redisxDestroyRESP(reply); } + redisxUnlockClient(redis->interactive); prop_error(fn, status); @@ -424,17 +409,12 @@ char **redisxGetKeys(Redis *redis, const char *table, int *n) { return NULL; } - if(redis == NULL) { - *n = x_error(X_NULL, EINVAL, fn, "redis is NULL"); - return NULL; - } - if(table && !table[0]) { *n = x_error(X_NULL, EINVAL, fn, "'table' parameter is empty"); return NULL; } - reply = redisxRequest(redis, "HKEYS", table, NULL, NULL, n); + reply = redisxRequest(redis, table ? "HKEYS" : "KEYS", table ? table : "*", NULL, NULL, n); if(*n) { redisxDestroyRESP(reply); @@ -485,10 +465,8 @@ char **redisxGetKeys(Redis *redis, const char *table, int *n) { int redisxSetScanCount(Redis *redis, int count) { RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, "redisxSetScanCount", "redis is NULL"); - + prop_error("redisxSetScanCount", rConfigLock(redis)); p = (RedisPrivate *) redis->priv; - rConfigLock(redis); p->scanCount = count; rConfigUnlock(redis); @@ -499,7 +477,8 @@ int redisxSetScanCount(Redis *redis, int count) { * Returns the COUNT parameter currently set to be used with Redis SCAN-type commands * * @param redis Pointer to a Redis instance. - * @return The current COUNT to use for SCAN-type commands or <0 to use default. + * @return The current COUNT to use for SCAN-type commands, or <0 in case + * of an error. * * @sa redisxGetScanCount() * @sa redisxScanKeys() @@ -507,11 +486,14 @@ int redisxSetScanCount(Redis *redis, int count) { */ int redisxGetScanCount(Redis *redis) { const RedisPrivate *p; + int count; - if(redis == NULL) return x_error(-1, EINVAL, "redisxSetScanCount", "redis is NULL"); - + prop_error("redisxGetScanCount", rConfigLock(redis)); p = (RedisPrivate *) redis->priv; - return p->scanCount; + count = p->scanCount; + rConfigUnlock(redis); + + return count; } static int compare_strings(const void *a, const void *b) { @@ -555,20 +537,14 @@ char **redisxScanKeys(Redis *redis, const char *pattern, int *n, int *status) { char countArg[20]; int args = 0, i, j, capacity = SCAN_INITIAL_STORE_CAPACITY; - if(status == NULL) { - x_error(0, EINVAL, fn, "'status' parameter is NULL"); - return NULL; - } - if(n == NULL) { x_error(X_NULL, EINVAL, fn, "parameter 'n' is NULL"); - *status = X_NULL; + if(status) *status = X_NULL; return NULL; } - if(redis == NULL) { - x_error(X_NULL, EINVAL, fn, "redis is NULL"); - *status = X_NULL; + if(status == NULL) { + x_error(0, EINVAL, fn, "'status' parameter is NULL"); return NULL; } @@ -592,8 +568,6 @@ char **redisxScanKeys(Redis *redis, const char *pattern, int *n, int *status) { cmd[args++] = countArg; } - xdprintf("Redis-X> Calling SCAN (MATCH %s)\n", pattern); - do { int count; RESP **components; @@ -654,17 +628,16 @@ char **redisxScanKeys(Redis *redis, const char *pattern, int *n, int *status) { } while (strcmp(*pCursor, SCAN_INITIAL_CURSOR)); // Done when cursor is back to 0... - // Check for errors - if(*status) x_trace(fn, NULL, *status); - // Clean up. redisxDestroyRESP(reply); free(*pCursor); + // Check for errors + if(*status) x_trace(fn, NULL, *status); + if(!names) return NULL; // Sort alphabetically. - xdprintf("Redis-X> Sorting %d scanned table entries.\n", *n); qsort(names, *n, sizeof(char *), compare_strings); // Remove duplicates @@ -729,20 +702,14 @@ RedisEntry *redisxScanTable(Redis *redis, const char *table, const char *pattern char **pCursor; int args= 0, i, j, capacity = SCAN_INITIAL_STORE_CAPACITY; - if(status == NULL) { - x_error(0, EINVAL, fn, "'status' parameter is NULL"); - return NULL; - } - if(n == NULL) { x_error(X_NULL, EINVAL, fn, "parameter 'n' is NULL"); - *status = X_NULL; + if(status) *status = X_NULL; return NULL; } - if(redis == NULL) { - x_error(X_NULL, EINVAL, fn, "redis is NULL"); - *status = X_NULL; + if(status == NULL) { + x_error(0, EINVAL, fn, "'status' parameter is NULL"); return NULL; } @@ -779,8 +746,6 @@ RedisEntry *redisxScanTable(Redis *redis, const char *table, const char *pattern cmd[args++] = countArg; } - xdprintf("Redis-X> Calling HSCAN %s (MATCH %s)\n", table, pattern); - do { int count; RESP **components; @@ -853,17 +818,16 @@ RedisEntry *redisxScanTable(Redis *redis, const char *table, const char *pattern } while(strcmp(*pCursor, SCAN_INITIAL_CURSOR)); // Done when cursor is back to 0... - // Check for errors - if(*status) x_trace(fn, NULL, *status); - // Clean up. redisxDestroyRESP(reply); free(*pCursor); + // Check for errors + if(*status) x_trace(fn, NULL, *status); + if(!entries) return NULL; // Sort alphabetically. - xdprintf("Redis-X> Sorting %d scanned table entries.\n", *n); qsort(entries, *n, sizeof(RedisEntry), compare_entries); // Remove duplicates @@ -887,7 +851,16 @@ RedisEntry *redisxScanTable(Redis *redis, const char *table, const char *pattern } /** - * Destroy a RedisEntry array, such as returned e.g. by redisxScanTable() + * Destroy a RedisEntry array with dynamically allocate keys/values, such as returned e.g. by + * redisxScanTable(). + * + * IMPORTANT: + * + * You should not use this function to destroy RedisEntry[] arrays, which contain static + * string references (keys or values). If the table contains only static references you can simply + * call free() on the table. Otherwise, you will have to first free only the dynamically sized + * string fields within before calling free() on the table itself. + * * * @param entries Pointer to the entries array (or single entry data). It may be NULL, in which * case this call will return immediately. @@ -939,36 +912,46 @@ void redisxDestroyKeys(char **keys, int count) { int redisxDeleteEntries(Redis *redis, const char *pattern) { static const char *fn = "redisxDeleteEntries"; + char *root, *key; char **keys; - int i, n = 0, status; + int i, n = 0, found = 0, status; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); if(!pattern) return x_error(X_NULL, EINVAL, fn, "'pattern' is NULL"); if(!pattern[0]) return x_error(X_NULL, EINVAL, fn, "'pattern' is empty"); - keys = redisxScanKeys(redis, pattern, &n, &status); - if(status) return status; - if(!keys) return x_trace(fn, NULL, X_NULL); + // Separate the top-level component + root = xStringCopyOf(pattern); + xSplitID(root, &key); + + if(redisxIsGlobPattern(root)) { + keys = redisxScanKeys(redis, root, &n, &status); + if(status || !keys) { + free(root); + prop_error(fn, status); + return x_trace(fn, NULL, X_NULL); + } + } + else { + keys = (char **) calloc(1, sizeof(char *)); + x_check_alloc(keys); + keys[0] = xStringCopyOf(root); + n = 1; + } for(i = 0; i < n ; i++) { - char *root, *key; const char *table = keys[i]; RedisEntry *entries; int nEntries; // If the table itself matches, delete it wholesale... - if(fnmatch(pattern, table, 0) == 0) { + if(fnmatch(root, table, 0) == 0) { RESP *reply = redisxRequest(redis, "DEL", table, NULL, NULL, &status); - if(redisxCheckDestroyRESP(reply, RESP_INT, 1) == X_SUCCESS) n++; + if(redisxCheckDestroyRESP(reply, RESP_INT, 1) == X_SUCCESS) found++; continue; } - // Look for table:key style patterns - root = xStringCopyOf(pattern); - xSplitID(root, &key); - // Otherwise check the table entries... - entries = redisxScanTable(redis, table, root, &nEntries, &status); + entries = redisxScanTable(redis, table, key, &nEntries, &status); if(status == X_SUCCESS) { int k; for(k = 0; k < nEntries; k++) { @@ -976,9 +959,9 @@ int redisxDeleteEntries(Redis *redis, const char *pattern) { char *id = xGetAggregateID(table, e->key); if(id) { - if(fnmatch(pattern, id, 0) == 0) { + if(fnmatch(key, id, 0) == 0) { RESP *reply = redisxRequest(redis, "HDEL", table, e->key, NULL, &status); - if(redisxCheckDestroyRESP(reply, RESP_INT, 1) == X_SUCCESS) n++; + if(redisxCheckDestroyRESP(reply, RESP_INT, 1) == X_SUCCESS) found++; } free(id); } @@ -988,11 +971,14 @@ int redisxDeleteEntries(Redis *redis, const char *pattern) { } } - free(root); - if(entries) free(entries); } - return n; + + redisxDestroyKeys(keys, n); + free(root); + + + return found; } #endif diff --git a/src/redisx.c b/src/redisx.c index d362c13..f9f0525 100644 --- a/src/redisx.c +++ b/src/redisx.c @@ -36,6 +36,21 @@ extern int debugTraffic; ///< Whether to print excerpts of all traffi /// \endcond + +/** + * Checks that a redis instance is valid. + * + * @param redis The Redis instance + * @return X_SUCCESS (0) if the instance is valid, or X_NULL if the argument is NULL, + * or else X_NO_INIT if the redis instance is not initialized. + */ +int redisxCheckValid(const Redis *redis) { + static const char *fn = "rCheckRedis"; + if(!redis) return x_error(X_NULL, EINVAL, fn, "Redis instamce is NULL"); + if(!redis->priv) return x_error(X_NO_INIT, EAGAIN, fn, "Redis instance is not initialized"); + return X_SUCCESS; +} + /// \cond PROTECTED /** @@ -44,9 +59,11 @@ extern int debugTraffic; ///< Whether to print excerpts of all traffi * \param redis Pointer to a Redis instance. * */ -void rConfigLock(Redis *redis) { +int rConfigLock(Redis *redis) { + prop_error("rConfigLock", redisxCheckValid(redis)); RedisPrivate *p = (RedisPrivate *) redis->priv; pthread_mutex_lock(&p->configLock); + return X_SUCCESS; } /** @@ -55,9 +72,11 @@ void rConfigLock(Redis *redis) { * \param redis Pointer to a Redis instance. * */ -void rConfigUnlock(Redis *redis) { +int rConfigUnlock(Redis *redis) { + prop_error("rConfigUnlock", redisxCheckValid(redis)); RedisPrivate *p = (RedisPrivate *) redis->priv; pthread_mutex_unlock(&p->configLock); + return X_SUCCESS; } /// \endcond @@ -113,16 +132,18 @@ void redisxDebugTraffic(boolean value) { int redisxSetUser(Redis *redis, const char *username) { static const char *fn = "redisxSetUser"; - RedisPrivate *p; - - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - if(redisxIsConnected(redis)) return x_error(X_ALREADY_OPEN, EALREADY, fn, "already connected"); + int status = X_SUCCESS; - p = (RedisPrivate *) redis->priv; - if(p->username) free(p->username); - p->username = xStringCopyOf(username); + prop_error(fn, rConfigLock(redis)); + if(redisxIsConnected(redis)) status = x_error(X_ALREADY_OPEN, EALREADY, fn, "already connected"); + else { + RedisPrivate *p = (RedisPrivate *) redis->priv; + if(p->username) free(p->username); + p->username = xStringCopyOf(username); + } + rConfigUnlock(redis); - return X_SUCCESS; + return status; } /** @@ -141,18 +162,74 @@ int redisxSetUser(Redis *redis, const char *username) { int redisxSetPassword(Redis *redis, const char *passwd) { static const char *fn = "redisxSetPassword"; - RedisPrivate *p; + int status = X_SUCCESS; + + prop_error(fn, rConfigLock(redis)); + if(redisxIsConnected(redis)) status = x_error(X_ALREADY_OPEN, EALREADY, fn, "already connected"); + else { + RedisPrivate *p = (RedisPrivate *) redis->priv; + if(p->password) free(p->password); + p->password = xStringCopyOf(passwd); + } + rConfigUnlock(redis); + + return status; +} + + +/** + * Sets the RESP prorocol version to use for future client connections. The protocol is set with the + * HELLO command, which was introduced in Redis 6.0.0 only. For older Redis server instances, the + * protocol will default to RESP2. Calling this function will enable using HELLO to handshake with + * the server. + * + * @param redis The Redis server instance + * @param protocol REDISX_RESP2 or REDISX_RESP3. + * @return X_SUCCESS (0) if successful, or X_NULL if the redis argument in NULL, X_NO_INIT + * if the redis instance was not initialized. + * + * @sa redisxGetProtocol() + * @sa redisxGetHelloReply() + */ +int redisxSetProtocol(Redis *redis, enum redisx_protocol protocol) { + static const char *fn = "redisxSetProtocol"; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - if(redisxIsConnected(redis)) return x_error(X_ALREADY_OPEN, EALREADY, fn, "already connected"); + RedisPrivate *p; + prop_error(fn, rConfigLock(redis)); p = (RedisPrivate *) redis->priv; - if(p->password) free(p->password); - p->password = xStringCopyOf(passwd); + p->hello = TRUE; + p->protocol = protocol; + rConfigUnlock(redis); return X_SUCCESS; } +/** + * Returns the actual protocol used with the Redis server. If HELLO was used during connection it will + * be the protocol that was confirmed in the response of HELLO (and which hopefully matches the + * protocol requested). Otherwise, RedisX will default to RESP2. + * + * @param redis The Redis server instance + * @return REDISX_RESP2 or REDISX_RESP3, or else an error code, such as X_NULL if the + * argument is NULL, or X_NO_INIT if the Redis server instance was not initialized. + * + * @sa redisxSetProtocol() + */ +enum redisx_protocol redisxGetProtocol(Redis *redis) { + static const char *fn = "redisxGetProtocol"; + + const RedisPrivate *p; + int protocol; + + prop_error(fn, rConfigLock(redis)); + p = (RedisPrivate *) redis->priv; + protocol = p->protocol; + rConfigUnlock(redis); + + return protocol; +} + /** * Sets the user-specific error handler to call if a socket level trasmit error occurs. * It replaces any prior handlers set earlier. @@ -172,11 +249,11 @@ int redisxSetPassword(Redis *redis, const char *passwd) { * Redis instance is NULL. */ int redisxSetTransmitErrorHandler(Redis *redis, RedisErrorHandler f) { - RedisPrivate *p; + static const char *fn = "redisxSetTransmitErrorHandler"; - if(!redis) return x_error(X_NULL, EINVAL, "redisxSetTransmitErrorHandler", "redis is NULL"); + RedisPrivate *p; - rConfigLock(redis); + prop_error(fn, rConfigLock(redis)); p = (RedisPrivate *) redis->priv; p->transmitErrorFunc = f; rConfigUnlock(redis); @@ -189,7 +266,7 @@ int redisxSetTransmitErrorHandler(Redis *redis, RedisErrorHandler f) { * Returns the current time on the Redis server instance. * * @param redis Pointer to a Redis instance. - * @param[out] t Pointer to a timespec structure in which to return the server time. + * @param[out] t Pointer to a timespec structure in which to return the server time. * @return X_SUCCESS (0) if successful, or X_NULL if either argument is NULL, or X_PARSE_ERROR * if could not parse the response, or another error returned by redisxCheckRESP(). */ @@ -200,7 +277,6 @@ int redisxGetTime(Redis *redis, struct timespec *t) { int status = X_SUCCESS; char *tail; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); if(!t) return x_error(X_NULL, EINVAL, fn, "output timespec is NULL"); memset(t, 0, sizeof(*t)); @@ -261,8 +337,6 @@ int redisxPing(Redis *redis, const char *message) { int status = X_SUCCESS; RESP *reply; - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - reply = redisxRequest(redis, "PING", message, NULL, NULL, &status); if(!status) { @@ -297,9 +371,7 @@ static int redisxSelectDBAsync(RedisClient *cl, int idx, boolean confirm) { char sval[20]; - if(!confirm) { - prop_error(fn, redisxSkipReplyAsync(cl)); - } + if(!confirm) prop_error(fn, redisxSkipReplyAsync(cl)); sprintf(sval, "%d", idx); prop_error(fn, redisxSendRequestAsync(cl, "SELECT", sval, NULL, NULL)); @@ -340,16 +412,16 @@ static void rAffirmDB(Redis *redis) { int redisxSelectDB(Redis *redis, int idx) { static const char *fn = "redisxSelectDB"; - RedisPrivate *p; + const RedisPrivate *p; enum redisx_channel c; - int status = X_SUCCESS; - - if(!redis) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + int dbIdx, status = X_SUCCESS; + prop_error(fn, rConfigLock(redis)); p = (RedisPrivate *) redis->priv; - if(p->dbIndex == idx) return X_SUCCESS; + dbIdx = p->dbIndex; + rConfigUnlock(redis); - p->dbIndex = idx; + if(dbIdx == idx) return X_SUCCESS; if(idx) redisxAddConnectHook(redis, rAffirmDB); else redisxRemoveConnectHook(redis, rAffirmDB); @@ -382,74 +454,6 @@ int redisxSelectDB(Redis *redis, int idx) { return status; } -/** - * Frees up the resources used by a RESP structure that was dynamically allocated. - * The call will segfault if the same RESP is destroyed twice or if the argument - * is a static allocation. - * - * \param resp Pointer to the RESP structure to be destroyed, which may be NULL (no action taken). - */ -void redisxDestroyRESP(RESP *resp) { - if(resp == NULL) return; - if(resp->type == RESP_ARRAY) while(--resp->n >= 0) { - RESP **component = (RESP **) resp->value; - redisxDestroyRESP(component[resp->n]); - } - if(resp->value != NULL) free(resp->value); - free(resp); -} - - -/** - * Checks a Redis RESP for NULL values or unexpected values. - * - * \param resp Pointer to the RESP structure from Redis. - * \param expectedType The RESP type expected (e.g. RESP_ARRAY) or 0 if not checking type. - * \param expectedSize The expected size of the RESP (array or bytes) or <=0 to skip checking - * - * \return X_SUCCESS (0) if the RESP passes the tests, or - * X_NULL if the RESP is NULL (garbled response). - * REDIS_NULL if Redis returned (nil), - * REDIS_UNEXPECTED_TYPE if got a reply of a different type than expected - * REDIS_UNEXPECTED_ARRAY_SIZE if got a reply of different size than expected. - * - * or the error returned in resp->n. - * - */ -int redisxCheckRESP(const RESP *resp, char expectedType, int expectedSize) { - static const char *fn = "redisxCheckRESP"; - - if(resp == NULL) return x_error(X_NULL, EINVAL, fn, "RESP is NULL"); - if(resp->type != RESP_INT) { - if(resp->n < 0) return x_error(X_FAILURE, EBADMSG, fn, "RESP error code: %d", resp->n); - if(resp->value == NULL) if(resp->n) return x_error(REDIS_NULL, ENOMSG, fn, "RESP with NULL value, n=%d", resp->n); - } - if(expectedType) if(resp->type != expectedType) - return x_error(REDIS_UNEXPECTED_RESP, ENOMSG, fn, "unexpected RESP type: expected '%c', got '%c'", expectedType, resp->type); - if(expectedSize > 0) if(resp->n != expectedSize) - return x_error(REDIS_UNEXPECTED_RESP, ENOMSG, fn, "unexpected RESP size: expected %d, got %d", expectedSize, resp->n); - return X_SUCCESS; -} - -/** - * Like redisxCheckRESP(), but it also destroys the RESP in case of an error. - * - * \param resp Pointer to the RESP structure from Redis. - * \param expectedType The RESP type expected (e.g. RESP_ARRAY) or 0 if not checking type. - * \param expectedSize The expected size of the RESP (array or bytes) or <=0 to skip checking - * - * \return The return value of redisxCheckRESP(). - * - * \sa redisxCheckRESP() - * - */ -int redisxCheckDestroyRESP(RESP *resp, char expectedType, int expectedSize) { - int status = redisxCheckRESP(resp, expectedType, expectedSize); - if(status) redisxDestroyRESP(resp); - prop_error("redisxCheckDestroyRESP", status); - return status; -} - /** * Prints a descriptive error message to stderr, and returns the error code. @@ -484,36 +488,53 @@ int redisxError(const char *func, int errorCode) { * \return TRUE (1) if the pipeline client is enabled on the Redis intance, or FALSE (0) otherwise. */ boolean redisxHasPipeline(Redis *redis) { + static const char *fn = "redisxHasPipeline"; + const ClientPrivate *pp; - if(redis == NULL) return FALSE; + boolean isEnabled; + + prop_error(fn, redisxCheckValid(redis)); + prop_error(fn, redisxLockClient(redis->pipeline)); pp = (ClientPrivate *) redis->pipeline->priv; - return pp->isEnabled; + isEnabled = pp->isEnabled; + redisxUnlockClient(redis->pipeline); + + return isEnabled; } /** - * Sets the function processing valid pipeline responses. + * Sets the function processing valid pipeline responses. The implementation should follow a + * simple set of rules: + * + *
    + *
  • the implementation should not destroy the RESP data. The RESP will be destroyed automatically + * after the call returns. However, the call may retain any data from the RESP itself, provided + * the data is de-referenced from the RESP before return.
  • + *
  • The implementation should not block (aside from maybe a quick mutex unlock) and return quickly, + * so as to not block the client for long periods
  • + *
  • If extensive processing or blocking calls are required to process the message, it is best to + * simply place a copy of the RESP on a queue and then return quickly, and then process the message + * asynchronously in a background thread.
  • + *
* * \param redis Pointer to a Redis instance. - * \param f T he function that processes a single argument of type RESP pointer. + * \param f The function that processes a single argument of type RESP pointer. * * \return X_SUCCESS (0) if successful, or * X_NULL if the Redis instance is NULL. */ -int redisxSetPipelineConsumer(Redis *redis, void (*f)(RESP *)) { +int redisxSetPipelineConsumer(Redis *redis, RedisPipelineProcessor f) { RedisPrivate *p; - if(redis == NULL) return x_error(X_NULL, EINVAL, "redisxSetPipelineConsumer", "redis is NULL"); - + prop_error("redisxSetPipelineConsumer", rConfigLock(redis)); p = (RedisPrivate *) redis->priv; - rConfigLock(redis); p->pipelineConsumerFunc = f; rConfigUnlock(redis); return X_SUCCESS; } - /** * Returns the result of a Redis command with up to 3 regularly terminated string arguments. This is not the highest * throughput mode (that would be sending asynchronous pipeline request, and then asynchronously collecting the results @@ -540,14 +561,9 @@ int redisxSetPipelineConsumer(Redis *redis, void (*f)(RESP *)) { * @sa redisxReadReplyAsync() */ RESP *redisxRequest(Redis *redis, const char *command, const char *arg1, const char *arg2, const char *arg3, int *status) { - static const char *fn = "redisxRequest"; - RESP *reply; const char *args[] = { command, arg1, arg2, arg3 }; - int n, s; - - - if(redis == NULL) x_error(X_NULL, EINVAL, fn, "redis is NULL"); + int n, s = X_SUCCESS; if(command == NULL) n = 0; else if(arg1 == NULL) n = 1; @@ -558,7 +574,7 @@ RESP *redisxRequest(Redis *redis, const char *command, const char *arg1, const c reply = redisxArrayRequest(redis, (char **) args, NULL, n, &s); if(status) *status = s; - if(s) x_trace_null(fn, NULL); + if(s) x_trace_null("redisxRequest", NULL); return reply; } @@ -598,20 +614,21 @@ RESP *redisxArrayRequest(Redis *redis, char *args[], int lengths[], int n, int * RESP *reply = NULL; RedisClient *cl; + if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null(fn, NULL); - if(redis == NULL || args == NULL || n < 1 || status == NULL) { - x_error(0, EINVAL, fn, "invalid parameter: redis=%p, args=%p, n=%d, status=%p", redis, args, n, status); + if(args == NULL || n < 1 || status == NULL) { + x_error(0, EINVAL, fn, "invalid parameter: args=%p, n=%d, status=%p", args, n, status); if(status) *status = X_NULL; return NULL; } else *status = X_SUCCESS; - xvprintf("Redis-X> request %s... [%d].\n", args[0], n); - cl = redis->interactive; *status = redisxLockConnected(cl); if(*status) return x_trace_null(fn, NULL); + redisxClearAttributesAsync(cl); + *status = redisxSendArrayRequestAsync(cl, args, lengths, n); if(!(*status)) reply = redisxReadReplyAsync(cl); redisxUnlockClient(cl); @@ -622,6 +639,85 @@ RESP *redisxArrayRequest(Redis *redis, char *args[], int lengths[], int n, int * } +/** + * Sets a user-defined function to process push messages for a specific Redis instance. The function's + * implementation must follow a simple set of rules: + * + *
    + *
  • the implementation should not destroy the RESP data. The RESP will be destroyed automatically + * after the call returns. However, the call may retain any data from the RESP itself, provided + * the data is de-referenced from the RESP before return.
  • + *
  • The call will have exclusive access to the client. As such it should not try to obtain a + * lock or release the lock itself.
  • + *
  • The implementation should not block (aside from maybe a quick mutex unlock) and return quickly, + * so as to not block the client for long periods
  • + *
  • If extensive processing or blocking calls are required to process the message, it is best to + * simply place a copy of the RESP on a queue and then return quickly, and then process the message + * asynchronously in a background thread.
  • + *
  • The client on which the push is originated will be locked, thus the implementation should + * avoid getting explusive access to the client
  • + *
+ * + * @param redis Redis instance + * @param func Function to use for processing push messages from the given Redis instance, or NULL + * to ignore push messages. + * @param arg (optional) User-defined pointer argument to pass along to the processing function. + * @return X_SUCCESS (0) if successful, or else X_NULL (errno set to EINVAL) if the client + * argument is NULL, or X_NO_INIT (errno set to EAGAIN) if redis is uninitialized. + */ +int redisxSetPushProcessor(Redis *redis, RedisPushProcessor func, void *arg) { + static const char *fn = "redisxSetPushProcessor"; + + RedisPrivate *p; + + prop_error(fn, rConfigLock(redis)); + p = redis->priv; + p->pushConsumer = func; + p->pushArg = arg; + rConfigUnlock(redis); + + return X_SUCCESS; +} + +/** + * Returns a copy of the RESP map that the Redis server has sent us as a response to HELLO on the + * last client connection, or NULL if HELLO was not used or available. + * + * @param redis The redis instance + * @return A copy of the response sent by HELLO on the last client connection, or NULL. + * + * @sa redisxSetProtocol() + */ +RESP *redisxGetHelloData(Redis *redis) { + const RedisPrivate *p; + RESP *data; + + int status = rConfigLock(redis); + if(status) return x_trace_null("redisxGetHelloData", NULL); + p = (RedisPrivate *) redis->priv; + data = redisxCopyOfRESP(p->helloData); + rConfigUnlock(redis); + + return data; +} + +/** + * Checks if a given string is a glob-style pattern. + * + * \param str The string to check. + * + * \return TRUE if it is a glob pattern (e.g. has '*', '?' or '['), otherwise FALSE. + * + */ +int redisxIsGlobPattern(const char *str) { + for(; *str; str++) switch(*str) { + case '*': + case '?': + case '[': return TRUE; + } + return FALSE; +} + /** * Returns a string description for one of the RM error codes. * diff --git a/src/resp.c b/src/resp.c new file mode 100644 index 0000000..9780b07 --- /dev/null +++ b/src/resp.c @@ -0,0 +1,765 @@ +/** + * @file + * + * @date Created on Dec 6, 2024 + * @author Attila Kovacs + * + * A set of utilities for handling RESP responses from a Redis / Valkey server. + * + */ + +// We'll use gcc major version as a proxy for the glibc library to decide which feature macro to use. +// gcc 5.1 was released 2015-04-22... +#ifndef __GNUC__ +# define _DEFAULT_SOURCE ///< strcasecmp() feature macro starting glibc 2.20 (2014-09-08) +#elif __GNUC__ >= 5 || __clang__ +# define _DEFAULT_SOURCE ///< strcasecmp() feature macro starting glibc 2.20 (2014-09-08) +#else +# define _BSD_SOURCE ///< strcasecmp() feature macro for glibc <= 2.19 +#endif + + +#include +#include +#include +#include +#include + + +#include "redisx-priv.h" +#include + + +/** + * Frees up the resources used by a RESP structure that was dynamically allocated. + * The call will segfault if the same RESP is destroyed twice or if the argument + * is a static allocation. + * + * \param resp Pointer to the RESP structure to be destroyed, which may be NULL (no action taken). + */ +void redisxDestroyRESP(RESP *resp) { + if(resp == NULL) return; + + if(resp->value) switch(resp->type) { + case RESP_ARRAY: + case RESP3_SET: + case RESP3_PUSH: { + RESP **component = (RESP **) resp->value; + while(--resp->n >= 0) redisxDestroyRESP(component[resp->n]); + break; + } + case RESP3_MAP: + case RESP3_ATTRIBUTE: { + RedisMapEntry *component = (RedisMapEntry *) resp->value; + while(--resp->n >= 0) { + RedisMapEntry *e = &component[resp->n]; + redisxDestroyRESP(e->key); + redisxDestroyRESP(e->value); + } + break; + } + default: + ; + } + + if(resp->value != NULL) free(resp->value); + free(resp); +} + +/** + * Creates an independent deep copy of the RESP, which shares no references with the original. + * + * @param resp The original RESP data structure (it may be NULL). + * @return A copy of the original, with no shared references. + */ +RESP *redisxCopyOfRESP(const RESP *resp) { + RESP *copy; + + if(!resp) return NULL; + + copy = (RESP *) calloc(1, sizeof(RESP)); + x_check_alloc(copy); + + copy->type = resp->type; + copy->n = resp->n; + + if(resp->value == NULL) return copy; + + switch(resp->type) { + case RESP_ARRAY: + case RESP3_SET: + case RESP3_PUSH: { + RESP **from = (RESP **) resp->value; + RESP **to = (RESP **) calloc(resp->n, sizeof(RESP *)); + int i; + + x_check_alloc(to); + + for(i = 0; i < resp->n; i++) to[i] = redisxCopyOfRESP(from[i]); + copy->value = to; + break; + } + + case RESP3_MAP: + case RESP3_ATTRIBUTE: { + const RedisMapEntry *from = (RedisMapEntry *) resp->value; + RedisMapEntry *to = (RedisMapEntry *) calloc(resp->n, sizeof(RedisMapEntry)); + int i; + + x_check_alloc(to); + + for(i = 0; i < resp->n; i++) { + to[i].key = redisxCopyOfRESP(from[i].key); + to[i].value = redisxCopyOfRESP(from[i].value); + } + copy->value = to; + break; + } + + case RESP_SIMPLE_STRING: + case RESP_ERROR: + case RESP_BULK_STRING: + case RESP3_BLOB_ERROR: + case RESP3_VERBATIM_STRING: + case RESP3_BIG_NUMBER: { + char *str = (char *) malloc(resp->n + 1); + x_check_alloc(str); + memcpy(str, resp->value, resp->n); + str[resp->n] = '\0'; + copy->value = str; + break; + } + + case RESP3_DOUBLE: + copy->value = (double *) malloc(sizeof(double)); + x_check_alloc(copy->value); + memcpy(copy->value, resp->value, sizeof(double)); + break; + + default: + ; + } + + return copy; +} + +/** + * Checks a Redis RESP for NULL values or unexpected values. + * + * \param resp Pointer to the RESP structure from Redis. + * \param expectedType The RESP type expected (e.g. RESP_ARRAY) or 0 if not checking type. + * \param expectedSize The expected size of the RESP (array or bytes) or <=0 to skip checking + * + * \return X_SUCCESS (0) if the RESP passes the tests, or + * X_NULL if the RESP is NULL (garbled response). + * REDIS_NULL if Redis returned (nil), + * REDIS_UNEXPECTED_TYPE if got a reply of a different type than expected + * REDIS_UNEXPECTED_ARRAY_SIZE if got a reply of different size than expected. + * + * or the error returned in resp->n. + * + */ +int redisxCheckRESP(const RESP *resp, enum resp_type expectedType, int expectedSize) { + static const char *fn = "redisxCheckRESP"; + + if(resp == NULL) return x_error(X_NULL, EINVAL, fn, "RESP is NULL"); + if(resp->type == RESP3_BOOLEAN) { + if(resp->n != (expectedSize ? 1 : 0)) return x_error(X_FAILURE, EBADMSG, fn, "unexpected boolean value: expected %d, got %d", (expectedSize ? 1 : 0), resp->n); + } + if(resp->type != RESP_INT && resp->type != RESP3_NULL) { + if(resp->n < 0) return x_error(X_FAILURE, EBADMSG, fn, "RESP error code: %d", resp->n); + if(resp->value == NULL) if(resp->n) return x_error(REDIS_NULL, ENOMSG, fn, "RESP with NULL value, n=%d", resp->n); + } + if(expectedType) if(resp->type != expectedType) + return x_error(REDIS_UNEXPECTED_RESP, ENOMSG, fn, "unexpected RESP type: expected '%c', got '%c'", expectedType, resp->type); + if(expectedSize > 0) if(resp->n != expectedSize) + return x_error(REDIS_UNEXPECTED_RESP, ENOMSG, fn, "unexpected RESP size: expected %d, got %d", expectedSize, resp->n); + return X_SUCCESS; +} + +/** + * Like redisxCheckRESP(), but it also destroys the RESP in case of an error. + * + * \param resp Pointer to the RESP structure from Redis. + * \param expectedType The RESP type expected (e.g. RESP_ARRAY) or 0 if not checking type. + * \param expectedSize The expected size of the RESP (array or bytes) or <=0 to skip checking + * + * \return The return value of redisxCheckRESP(). + * + * \sa redisxCheckRESP() + * + */ +int redisxCheckDestroyRESP(RESP *resp, enum resp_type expectedType, int expectedSize) { + int status = redisxCheckRESP(resp, expectedType, expectedSize); + if(status) redisxDestroyRESP(resp); + prop_error("redisxCheckDestroyRESP", status); + return status; +} + + +/** + * Splits the string value of a RESP into two components, by terminating the first component with a null + * byte and optionally returning the remaining part and length in the output parameters. Only RESP_ERROR + * RESP_BLOB_ERROR and RESP_VERBATIM_STRING types can be split this way. All others will return + * REDIS_UNEXPECTED_RESP. + * + * @param resp The input RESP. + * @param[out] text (optional) pointer in which to return the start of the remnant text component. + * @return n the length of the remnant text (<=0), or else X_NULL if the input RESP was NULL, + * or REDIS_UNEXPEXCTED_RESP if the input RESP does not contain a two-component string + * value. + * + * @sa RESP_ERROR + * @sa RESP3_BLOB_ERROR + * @sa RESP3_VERBATIM_STRING + */ +int redisxSplitText(RESP *resp, char **text) { + static const char *fn = "redisxSplitText"; + char *str; + + if(!resp) return x_error(X_NULL, EINVAL, fn, "input RESP is NULL"); + + if(!resp->value) { + if(text) *text = NULL; + return 0; + } + + str = (char *) resp->value; + + switch(resp->type) { + case RESP3_VERBATIM_STRING: + if(resp->n < 4) + return x_error(X_PARSE_ERROR, ERANGE, fn, "value '%s' is too short (%d bytes) for verbatim string type", str, resp->n); + str[3] = '\0'; + if(text) *text = &str[4]; + return resp->n - 4; + + case RESP_ERROR: + case RESP3_BLOB_ERROR: { + const char *code = strtok(str, " \t\r\n"); + int offset = strlen(code) + 1; + + if(offset < resp->n) { + if(text) *text = &str[offset]; + return resp->n - offset - 1; + } + + if(text) *text = NULL; + return 0; + } + + default: + return x_error(REDIS_UNEXPECTED_RESP, EINVAL, fn, "RESP type '%c' does not have a two-component string value", resp->type); + } +} + +/** + * Checks if a RESP holds a scalar type value, such as an integer, a boolean or a double-precision value, or a null value. + * + * @param r Pointer to a RESP data structure + * @return TRUE (1) if the data holds a scalar-type value, or else FALSE (0). + * + * @sa redisxIsStringType() + * @sa redisxIsArrayType() + * @sa redisxIsMapType() + * @sa RESP_INT + * @sa RESP3_BOOLEAN + * @sa RESP3_DOUBLE + * @sa RESP3_NULL + * + */ +boolean redisxIsScalarType(const RESP *r) { + if(!r) return FALSE; + + switch(r->type) { + case RESP_INT: + case RESP3_BOOLEAN: + case RESP3_DOUBLE: + case RESP3_NULL: + return TRUE; + + default: + return FALSE; + } + +} + +/** + * Checks if a RESP holds a string type value, whose `value` can be cast to `(char *)` to use. + * + * @param r Pointer to a RESP data structure + * @return TRUE (1) if the data holds a string type value, or else FALSE (0). + * + * @sa redisxIsScalarType() + * @sa redisxIsArrayType() + * @sa redisxIsMapType() + * @sa RESP_SIMPLE_STRING + * @sa RESP_ERROR + * @sa RESP_BULK_STRING + * @sa RESP3_BLOB_ERROR + * @sa RESP3_VERBATIM_STRING + * + */ +boolean redisxIsStringType(const RESP *r) { + if(!r) return FALSE; + + switch(r->type) { + case RESP_SIMPLE_STRING: + case RESP_ERROR: + case RESP_BULK_STRING: + case RESP3_BLOB_ERROR: + case RESP3_VERBATIM_STRING: + case RESP3_BIG_NUMBER: + return TRUE; + + default: + return FALSE; + } +} + +/** + * Checks if a RESP holds an array of RESP pointers, and whose `value` can be cast to `(RESP **)` to use. + * + * @param r Pointer to a RESP data structure + * @return TRUE (1) if the data holds an array of `RESP *` pointers, or else FALSE (0). + * + * @sa redisxIsScalarType() + * @sa redisxIsStringType() + * @sa redisxIsMapType() + * @sa RESP_ARRAY + * @sa RESP3_SET + * @sa RESP3_PUSH + * + */ +boolean redisxIsArrayType(const RESP *r) { + if(!r) return FALSE; + + switch(r->type) { + case RESP_ARRAY: + case RESP3_SET: + case RESP3_PUSH: + return TRUE; + + default: + return FALSE; + } +} + +/** + * Checks if a RESP holds a dictionary, and whose `value` can be cast to `(RedisMapEntry *)` to use. + * + * @param r Pointer to a RESP data structure + * @return TRUE (1) if the data holds a dictionary (a RedisMapEntry array), or else FALSE (0). + * + * @sa redisxIsScalarType() + * @sa redisxIsStringType() + * @sa redisxIsMapType() + * @sa RESP3_MAP + * @sa RESP3_ATTRIBUTE + * + */ +boolean redisxIsMapType(const RESP *r) { + if(!r) return FALSE; + + switch(r->type) { + case RESP3_MAP: + case RESP3_ATTRIBUTE: + return TRUE; + + default: + return FALSE; + } +} + +/** + * Checks if a RESP has subcomponents, such as arrays or maps (dictionaries). + * + * @param r Pointer to a RESP data structure + * @return TRUE (1) if the data has sub-components, or else FALSE (0). + * + * @sa redisxIsArrayType() + * @sa redisxIsMapType() + * @sa RESP3_MAP + * @sa RESP3_ATTRIBUTE + * + */ +boolean redisxHasComponents(const RESP *r) { + if(!r) return FALSE; + + return r->n > 0 && (redisxIsArrayType(r) || redisxIsMapType(r)); +} + + +/** + * Appends a part to an existing RESP of the same type, before discarding the part. + * + * @param[in, out] resp The RESP to which the part is appended + * @param part The part, which is destroyed after the content is appended to the first RESP argument. + * @return X_SUCCESS (0) if successful, or else X_NULL if the first argument is NULL, or + * REDIS_UNEXPECTED_RESP if the types do not match, or X_FAILURE if there was an allocation + * error. + */ +int redisxAppendRESP(RESP *resp, RESP *part) { + static const char *fn = "redisxAppendRESP"; + char *old, *extend; + size_t eSize; + + if(!resp) + return x_error(X_NULL, EINVAL, fn, "NULL resp"); + if(!part || part->type == RESP3_NULL || part->n <= 0) + return 0; + if(resp->type != part->type) { + int err = x_error(REDIS_UNEXPECTED_RESP, EINVAL, fn, "Mismatched types: '%c' vs. '%c'", resp->type, part->type); + redisxDestroyRESP(part); + return err; + } + if(redisxIsScalarType(resp)) + return x_error(REDIS_UNEXPECTED_RESP, EINVAL, fn, "Cannot append to RESP type '%c'", resp->type); + + if(redisxIsArrayType(resp)) + eSize = sizeof(RESP *); + else if(redisxIsMapType(resp)) + eSize = sizeof(RedisMapEntry); + else + eSize = 1; + + old = resp->value; + extend = (char *) realloc(resp->value, (resp->n + part->n) * eSize); + if(!extend) { + free(old); + return x_error(X_FAILURE, errno, fn, "alloc RESP array (%d components)", resp->n + part->n); + } + + memcpy(extend + resp->n * eSize, part->value, part->n * eSize); + resp->n += part->n; + resp->value = extend; + free(part); + + return X_SUCCESS; +} + +/** + * Checks if two RESP are equal, that is they hold the same type of data, have the same 'n' value, + * and the values match byte-for-byte, or are both NULL. + * + * @param a Ponter to a RESP data structure. + * @param b Pointer to another RESP data structure. + * @return TRUE (1) if the two RESP structures match, or else FALSE (0). + */ +boolean redisxIsEqualRESP(const RESP *a, const RESP *b) { + if(a == b) return TRUE; + if(!a || !b) return FALSE; + + + if(a->type != b->type) return FALSE; + if(a->n != b->n) return FALSE; + if(a->value == NULL) return (b->value == NULL); + if(!b->value) return FALSE; + + return (memcmp(a->value, b->value, a->n) == 0); +} + +/** + * Retrieves a keyed entry from a map-type RESP data structure. + * + * @param map The map-type REST data structure containing a dictionary + * @param key The RESP key to match + * @return The matching map entry or NULL if the map contains no such entry. + * + * @sa RESP3_MAP + * @sa RESP3_ATTRIBUTE + * + * @sa redisxGetKeywordEntry() + */ +RedisMapEntry *redisxGetMapEntry(const RESP *map, const RESP *key) { + int i; + RedisMapEntry *entries; + + if(!key) return NULL; + if(!redisxIsMapType(map)) return NULL; + if(!map->value) return NULL; + + entries = (RedisMapEntry *) map->value; + + for(i = 0; i < map->n; i++) { + RedisMapEntry *e = &entries[i]; + + if(e->key->type != key->type) continue; + if(e->key->n != key->n) continue; + if(key->value == NULL) { + if(e->key->value == NULL) return e; + continue; + } + if(e->key->value == NULL) continue; + if(memcmp(e->key->value, key->value, key->n) == 0) return e; + } + + return NULL; +} + +/** + * Retrieves a entry, by its string keyword, from a map-type RESP data structure. + * + * @param map The map-type REST data structure containing a dictionary + * @param key The string keyword to match + * @return The matching map entry or NULL if the map contains no such entry. + * + * @sa RESP3_MAP + * @sa RESP3_ATTRIBUTE + * + * @sa redisxGetMapEntry() + */ +RedisMapEntry *redisxGetKeywordEntry(const RESP *map, const char *key) { + int i; + RedisMapEntry *entries; + + if(!key) return NULL; + if(!redisxIsMapType(map)) return NULL; + if(!map->value) return NULL; + + entries = (RedisMapEntry *) map->value; + + for(i = 0; i < map->n; i++) { + RedisMapEntry *e = &entries[i]; + + if(!redisxIsStringType(e->key)) continue; + if(strcmp(e->key->value, key) == 0) return e; + } + + return NULL; +} + +static XType resp2xType(enum resp_type type) { + switch(type) { + case RESP3_NULL: + return X_UNKNOWN; + case RESP3_BOOLEAN: + return X_BOOLEAN; + case RESP_INT: + return X_LONG; + case RESP3_DOUBLE: + return X_DOUBLE; + case RESP_SIMPLE_STRING: + case RESP_BULK_STRING: + case RESP_ERROR: + case RESP3_BLOB_ERROR: + case RESP3_VERBATIM_STRING: + case RESP3_BIG_NUMBER: + return X_STRING; + case RESP_ARRAY: + case RESP3_SET: + case RESP3_PUSH: + return X_FIELD; + case RESP3_MAP: + case RESP3_ATTRIBUTE: + return X_STRUCT; + } + + return X_UNKNOWN; +} + + +static XField *respArrayToXField(const char *name, const RESP **component, int n) { + static const char *fn = "respArrayToXField"; + + XField *f; + enum resp_type type = RESP3_NULL; + int i; + + if(n < 0) return NULL; + + for(i = 0; i < n; i++) { + if(i == 0) type = component[i]->type; + else if(component[i]->type != type) break; + } + + if(i < n) { + // -------------------------------------------------------- + // Heterogeneous array... + + XField *array; + + f = xCreateMixed1DField(name, n); + + if(!f->value) return x_trace_null(fn, "field array"); + + array = (XField *) f->value; + + for(i = 0; i < n; i++) { + XField *e = redisxRESP2XField(array[i].name, component[i]); + if(e) { + array[i] = *e; + free(e); + } + } + } + + else { + // -------------------------------------------------------- + // Homogeneous array... + + XType eType = resp2xType(type); + char *array; + size_t eSize; + + if(eType == X_UNKNOWN) return xCreateMixed1DField(name, 0); + + eSize = xElementSizeOf(eType); + array = (char *) calloc(1, n * eSize); + + f = xCreate1DField(name, eType, n, array); + f->flags = type; + if(!array) return x_trace_null(fn, "field array"); + + for(i = 0; i < n; i++) { + XField *e = redisxRESP2XField("", component[i]); + if(e) { + memcpy(&array[i * eSize], e, sizeof(XField)); + free(e); + } + } + } + + return f; +} + + +static XField *respMap2XField(const char *name, const RedisMapEntry *map, int n) { + XStructure *s = xCreateStruct(), *nonstring = NULL; + int nNonString = 0; + + while(--n >= 0) { + const RedisMapEntry *e = &map[n]; + + if(redisxIsStringType(e->key)) { + XField *fi = redisxRESP2XField((char *) e->key->value, e->value); + if(fi) { + fi->next = s->firstField; + s->firstField = fi; + } + } + else { + // Non string keyed entries will be added under a '.non-string-keys' sub-structure + // as indexed fields. + char idx[20]; + XStructure *sub = xCreateStruct(); + xSetField(sub, redisxRESP2XField("value", e->value)); + xSetField(sub, redisxRESP2XField("key", e->key)); + sprintf(idx, ".%d", ++nNonString); + if(!nonstring) + nonstring = xCreateStruct(); + xSetSubstruct(nonstring, xStringCopyOf(idx), sub); + } + } + + if(nonstring) xSetSubstruct(s, ".non-string-keys", nonstring); + + return xCreateScalarField(name, X_STRUCT, s); +} + + + +/** + * Converts a RESP to the xchange representation as an appropriate XField. + * + *
    + *
  • RESP3_NULL values are converted to NULL.
  • + *
  • Scalar values are converted to an XField with the equivalent type.
  • + *
  • Homogenerous arrays are converted to a field with a 1D array of corresponding xchange type.
  • + *
  • Heterogeneous arrays are converted to a field with a 1D array of X_FIELD type (containing an array of fields).
  • + *
  • Maps with string keywords are converted to an X_STRUCT.
  • + *
  • Maps with non-string keywords are added under a sub-structure named '.non-string-keys' as indexed structures + * with separate 'key' and 'value' fields. + *
+ * + * @param name The name to assign to the field + * @param resp The RESP data to convert + * @return An XField with the data from the RESP, or NULL if there was an error (errno will be + * set to indicate the type of error). + * + * @sa resp2json() + */ +XField *redisxRESP2XField(const char *name, const RESP *resp) { + static const char *fn = "resp2XField"; + + errno = 0; + + switch(resp->type) { + case RESP3_NULL: + return xCreateStringField(name, NULL); + + case RESP3_BOOLEAN: + return xCreateBooleanField(name, resp->n); + + case RESP_INT: + return xCreateIntField(name, resp->n); + + case RESP3_DOUBLE: + return xCreateDoubleField(name, *(double *) resp->value); + + case RESP_SIMPLE_STRING: + case RESP_BULK_STRING: + case RESP_ERROR: + case RESP3_BLOB_ERROR: + case RESP3_VERBATIM_STRING: + case RESP3_BIG_NUMBER: { + XField *f = xCreateStringField(name, xStringCopyOf((char *) resp->value)); + f->flags = resp->type; + return f; + } + + case RESP_ARRAY: + case RESP3_SET: + case RESP3_PUSH: { + XField *f = respArrayToXField(name, (const RESP **) resp->value, resp->n); + if(!f) return x_trace_null(fn, NULL); + f->flags = resp->type; + return f; + } + + case RESP3_MAP: + case RESP3_ATTRIBUTE: { + XField *f = respMap2XField(name, (const RedisMapEntry *) resp->value, resp->n); + if(!f) return x_trace_null(fn, NULL); + f->flags = resp->type; + return f; + } + + } + + return NULL; +} + +/** + * Converts a RESP to the xchange representation as an appropriate XField. + * + * @param name The name to assign to the field + * @param resp The RESP data to convert + * @return An XField with the data from the RESP, or NULL if there was an error (errno will be + * set to indicate the type of error). + * + * @sa resp2XField() + */ +char *redisxRESP2JSON(const char *name, const RESP *resp) { + return xjsonFieldToString(redisxRESP2XField(name, resp)); +} + +/** + * Prints a RESP in JSON format to the standard output with the specified name + * + * @param name The name/ID to assign to the RESP + * @param resp The RESP data to print + * @return 0 + */ +int redisxPrintRESP(const char *name, const RESP *resp) { + char *json = redisxRESP2JSON("hello_response", resp); + + if(json) { + printf("%s", json); + free(json); + } + else printf("\"%s\": null\n", name); + + return X_SUCCESS; +} + diff --git a/test/Makefile b/test/Makefile new file mode 100644 index 0000000..4be0979 --- /dev/null +++ b/test/Makefile @@ -0,0 +1,36 @@ +# Use the parent directories for libraries and headers. +LIB = ../lib +INC = ../include +BUILD_MODE = debug + +ifdef XCHANGE + XCHANGE := ../$(XCHANGE) +endif + +# Load the common Makefile definitions... +include ../config.mk + +.PHONY: all +all: tests run + +.PHONY: tests +tests: $(BIN)/test-hello $(BIN)/test-tab + +.PHONY: run +run: tests + $(BIN)/test-hello + $(BIN)/test-tab + +$(BIN)/test-%: $(OBJ)/test-%.o $(LIB)/libredisx.a + make $(BIN) + $(CC) -o $@ $^ $(LDFLAGS) -lredisx + +.PHONY: clean-test +clean-test: + rm -rf bin + +clean: clean-test + +# Finally, the standard generic rules and targets... +include ../build.mk + diff --git a/test/src/test-hello.c b/test/src/test-hello.c new file mode 100644 index 0000000..70f5593 --- /dev/null +++ b/test/src/test-hello.c @@ -0,0 +1,62 @@ +/** + * @file + * + * @date Created on Dec 8, 2024 + * @author Attila Kovacs + */ + +#include +#include +#include + +#include "redisx.h" +#include "xchange.h" + +int main() { + + Redis *redis = redisxInit("localhost"); + RedisEntry *e; + RESP *resp; + const char *json; + int n = -1; + + xSetDebug(TRUE); + //redisxSetVerbose(TRUE); + //redisxDebugTraffic(TRUE); + + redisxSetProtocol(redis, REDISX_RESP3); + + if(redisxConnect(redis, FALSE) < 0) { + perror("ERROR! connect"); + return 1; + } + + resp = redisxGetHelloData(redis); + json = redisxRESP2JSON("server_properties", resp); + printf("%s", json ? json : ""); + free(json); + redisxDestroyRESP(resp); + + + if(redisxGetProtocol(redis) != REDISX_RESP3) { + fprintf(stderr, "ERROR! verify RESP3 protocol\n"); + return 1; + } + + if(redisxSetValue(redis, "_test_", "_value_", "1", TRUE) < 0) { + perror("ERROR! set value"); + return 1; + } + + e = redisxGetTable(redis, "_test_", &n); + if(n <= 0) return 1; + + redisxDestroyEntries(e, n); + redisxDisconnect(redis); + redisxDestroy(redis); + + fprintf(stderr, "OK\n"); + + return 0; + +} diff --git a/test/src/test-tab.c b/test/src/test-tab.c new file mode 100644 index 0000000..ab9128b --- /dev/null +++ b/test/src/test-tab.c @@ -0,0 +1,189 @@ +/** + * @file + * + * @date Created on Dec 8, 2024 + * @author Attila Kovacs + */ + +#include +#include +#include + +#include "redisx.h" +#include "xchange.h" + +int main() { + + Redis *redis = redisxInit("localhost"); + RedisEntry *e; + RESP *resp; + int status; + char **keys, *value; + int n1 = -1, n2 = -1; + + xSetDebug(TRUE); + //redisxSetVerbose(TRUE); + //redisxDebugTraffic(TRUE); + + if(redisxConnect(redis, TRUE) < 0) { + perror("ERROR! connect"); + return 1; + } + + if(redisxSetValue(redis, "_test_", "_value_", "2", TRUE) < 0) { + perror("ERROR! set value"); + return 1; + } + + resp = redisxGetValue(redis, "_test_", "_value_", &status); + if(status) { + perror("ERROR! get value"); + return 1; + } + redisxPrintRESP("get value", resp); + redisxCheckDestroyRESP(resp, RESP_BULK_STRING, 1); + if(strcmp("2", (char *) resp->value) != 0) { + fprintf(stderr, "ERROR! mismatched value: got '%s', expected '%s'\n", (char *) resp->value, "2"); + return 1; + } + redisxDestroyRESP(resp); + + if(redisxSetValue(redis, "_test_", "_value_", "3", FALSE) < 0) { + perror("ERROR! set value (noconfirm)"); + return 1; + } + + resp = redisxGetValue(redis, "_test_", "_value_", &status); + if(status) { + perror("ERROR! get value (noconfirm)"); + return 1; + } + redisxCheckDestroyRESP(resp, RESP_BULK_STRING, 1); + if(strcmp("3", (char *) resp->value) != 0) { + fprintf(stderr, "ERROR! mismatched value: got '%s', expected '%s'\n", (char *) resp->value, "3"); + return 1; + } + redisxDestroyRESP(resp); + + e = redisxGetTable(redis, "_test_", &n1); + if(n1 <= 0) { + fprintf(stderr, "ERROR! get table: %d\n", n1); + return 1; + } + redisxDestroyEntries(e, n1); + + e = redisxScanTable(redis, "_test_", "*", &n2, &status); + if(n2 <= 0 || status) { + fprintf(stderr, "ERROR! scan table: n = %d, status = %d\n", n2, status); + return 1; + } + redisxDestroyEntries(e, n2); + + if(n1 != n2) { + fprintf(stderr, "ERROR! scan table mismatch: got %d, expected %d\n", n2, n1); + return 1; + } + + // table keys + keys = redisxGetKeys(redis, "_test_", &n1); + if(n1 <= 0 || keys == NULL) { + fprintf(stderr, "ERROR! get test table keys: %d\n", n1); + return 1; + } + redisxDestroyKeys(keys, n1); + + // Top-level keys + keys = redisxGetKeys(redis, NULL, &n1); + if(n1 <= 0 || keys == NULL) { + fprintf(stderr, "ERROR! get keys: %d\n", n1); + return 1; + } + redisxDestroyKeys(keys, n1); + + // Scanned top-level keys + keys = redisxScanKeys(redis, "*", &n2, &status); + if(n2 <= 0 || keys == NULL) { + fprintf(stderr, "ERROR! scan keys: n = %d, status = %d\n", n2, status); + return 1; + } + redisxDestroyKeys(keys, n2); + + if(n1 != n2) { + fprintf(stderr, "ERROR! scan keys mismatch: got %d, expected %d\n", n2, n1); + return 1; + } + + // TODO multiset + e = (RedisEntry *) calloc(2, sizeof(RedisEntry)); + e[0].key = "_value_"; + e[0].value = "4"; + e[1].key = "_extra_"; + e[1].value = "5"; + status = redisxMultiSet(redis, "_test_", e, 2, TRUE); + if(status) { + perror("ERROR! multiset"); + return 1; + } + + value = redisxGetStringValue(redis, "_test_", e[0].key, 0); + if(strcmp(e[0].value, value) != 0) { + fprintf(stderr, "ERROR! mismatched multi value 1: got '%s', expected '%s'\n", value, "4"); + return 1; + } + free(value); + + value = redisxGetStringValue(redis, "_test_", e[1].key, 0); + if(strcmp(e[1].value, value) != 0) { + fprintf(stderr, "ERROR! mismatched multi value 2: got '%s', expected '%s'\n", value, "4"); + return 1; + } + free(value); + + e[1].value = "6"; + status = redisxMultiSet(redis, "_test_", e, 2, FALSE); + if(status) { + perror("ERROR! multiset (noconfirm)"); + return 1; + } + + value = redisxGetStringValue(redis, "_test_", e[1].key, 0); + if(strcmp(e[1].value, value) != 0) { + fprintf(stderr, "ERROR! mismatched multi value 2 (noconfirm): got '%s', expected '%s'\n", value, "4"); + return 1; + } + free(value); + free(e); + + +#if __STDC_VERSION__ > 201112L + // The following is not available on prior to the POSIX.1-2008 standard + // We'll use the __STDC_VERSION__ constant as a proxy to see if fnmatch is available + + n1 = redisxDeleteEntries(redis, "_*:_extra_"); + if(n1 < 0) { + perror("ERROR! delete entries"); + return 1; + } + if(n1 !=1) { + fprintf(stderr, "ERROR! mismatched deleted entries: got %d, expected %d\n", n1, 1); + return 1; + } + + xSetDebug(FALSE); + value = redisxGetStringValue(redis, "_test_", "_extra_", 0); + if(value) { + fprintf(stderr, "ERROR! deleted entry exists\n"); + return 1; + } + xSetDebug(TRUE); + +#endif + + redisxDisconnect(redis); + redisxDestroy(redis); + + fprintf(stderr, "OK\n"); + + return 0; + +}