Skip to content

Commit

Permalink
More error handling and HELLO tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Dec 9, 2024
1 parent 5a6113f commit bbb74a4
Show file tree
Hide file tree
Showing 12 changed files with 443 additions and 217 deletions.
89 changes: 85 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,16 @@ You can also set the RESP protocol to use (provided your server is compatible wi

The above call will use the `HELLO` command (since Redis 6) upon connecting. If you do not set the protocol, `HELLO`
will not be used, and RESP2 will be assumed -- which is best for older servers. (Note, that you can always check the
actual protocol used after connecting, using `redisxGetProtocol()`).
actual protocol used after connecting, using `redisxGetProtocol()`). Note, that after connecting, you may retrieve
the set of server properties sent in response to `HELLO` using `redisxGetHelloData()`.

You might also tweak the send/receive buffer sizes to use for clients, if you find the socket defaults sub-optimal for
your application (note, that this setting is common to all `Redis` instances managed by the library):
You might also tweak the socket options used for clients, if you find the socket defaults sub-optimal for your
application (note, that this setting is common to all `Redis` instances managed by the library):

```c
// (optional) Set 1000 ms socket read/write timeout for future connections.
redisxSetSocketTimeout(redis, 1000);

// (optional) Set the TCP send/rcv buffer sizes to use if not default values.
// This setting applies to all new connections after...
redisxSetTcpBuf(65536);
Expand Down Expand Up @@ -315,7 +319,9 @@ The same goes for disconnect hooks, using `redisxAddDisconnectHook()` instead.
## Simple Redis queries
- [Interactive transactions](#interactive-transactions)
- [RESP data type](#resp-data-type)
- [Push notifications](#push-notifications)
- [RESP data type](#resp-data-type)
Redis queries are sent as strings, according the the specification of the Redis protocol. All responses sent back by
the server using the RESP protocol. Specifically, Redis uses version 2 of the RESP protocol (a.k.a. RESP2) by
Expand Down Expand Up @@ -366,6 +372,46 @@ individual parameters are not 0-terminated strings.
In interactive mode, each request is sent to the Redis server, and the response is collected before the call returns
with that response (or `NULL` if there was an error).
<a name="push-notifications"></a>
### Push notifications
Redis 6 introduced out-of-band push notifications along with RESP3. It allows the server to send messages to any
connected client that are not in response to a query. For example, Redis 6 allows `CLIENT TRACKING` to use such push
notifications (e.g. `INVALIDATE foo`), to notify connected clients when a watched variable has been updated from
somewhere else.
__RedisX__ allows you to specify a custom callback `RedisPushProcessor` function to handle such push notifications,
e.g.:
```c
void my_push_processor(RESP *message, void *ptr) {
char *owner = (char *) ptr; // Additional argument we need, in this case a string.
printf("[%s] Got push message: type %c, n = %d.\n", owner, message->type, message->n);
}
```

Then you can activate the processing of push notifications with `redisxSetPushProcessor()`. You can specify the
optional additional data that you want to pass along to the push processor function -- just make sure that the data
has a sufficient scope / lifetime such that it is valid at all times while push messages are being processed. E.g.

```c
static owner = "my process"; // The long life data we want to pass to my_push_processor...

// Use my_push_processor and pass along the owner as a parameter
redisxSetPushProcessor(redis, my_push_processor, owner);
```
There are some things to look out for in your `RedisPushProcessor` implementation:
- The call should not block (except perhaps for a quick mutex lock) and should return quickly. If blocking calls, or
extensive processing is required, you should place a copy of the PUSH notification onto a queue and let an
asynchronous thread take it from there.
- The call should not attempt to alter or destroy the push message. If needed it can copy parts or the whole.
- You should not attempt to lock or release clients from the call. If you need access to a client, it's best to put a
copy of the RESP notification onto a queue and let an asynchronous thread deal with it.
- You should
<a name="resp-data-type"></a>
### RESP data type
Expand Down Expand Up @@ -462,6 +508,8 @@ Before destroying a RESP structure, the caller may want to dereference values wi
```




-----------------------------------------------------------------------------

<a name="accessing-key-value-data"></a>
Expand Down Expand Up @@ -818,8 +866,10 @@ Stay tuned.
## Advanced queries and pipelining

- [Asynchronous client processing](#asynchronous-client-processing)
- [Bundled Attributes](#attributes)
- [Pipelined transactions](#pipelined-transactions)


<a name="asynchronous-client-processing"></a>
### Asynchronous client processing

Expand Down Expand Up @@ -901,6 +951,37 @@ Of course you can build up arbitrarily complex set of queries and deal with a se
what works best for your application.
<a name="attributes"></a>
### Bundled Attributes
As of Redis 6, the server might send ancillary data along with replies, if the RESP3 protocol is used. These are
collected together with the expected responses. However, these optional attributes are not returned to the user
automatically. Instead, the user may retrieve attributes directly after getting a response from
`redisxReadReplyAsync()` using `redisxGetAttributesAsync()`. And, attributes that were received previously can be
discarded with `redisxClearAttributesAsync()`. For example,
```c
RedisClient *cl = ... // The client we use for our transactions
if(redisxLockEnabled(cl) == X_SUCCESS) {
...
// Clear any prior attributes we may have previously received for the client...
redisxClearAttributesAsync(cl);
// Read a response for a request we sent earlier...
RESP *reply = redisxReadReplyAsync(cl);
// Retrieve the attributes (if any) that were sent with the response.
RESP *attributes = redisxGetAttributes(cl);
...
redisxUnlockClient(cl);
}
```


<a name="pipelined-transactions"></a>
### Pipelined transactions

Expand Down
9 changes: 7 additions & 2 deletions include/redisx-priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ typedef struct {
int socket; ///< Changing the socket should require both locks!
int pendingRequests; ///< Number of request sent and not yet answered...
RESP *attributes; ///< Attributes from the last packet received.
RedisPushProcessor pushConsumer; ///< User-defined function to consume RESP3 push messages.
void *pushArg; ///< User-defined argument to pass along with push messages.
} ClientPrivate;


Expand All @@ -62,8 +60,11 @@ typedef struct {
int dbIndex; ///< the zero-based database index
char *username; ///< Redis user name (if any)
char *password; ///< Redis password (if any)

int timeoutMillis; ///< [ms] Socket read/write timeout
int protocol; ///< RESP version to use
boolean hello; ///< whether to use HELLO (introduced in Redis 6.0.0 only)
RESP *helloData; ///< RESP data received from server during last connection.

RedisClient *clients;

Expand All @@ -85,6 +86,9 @@ typedef struct {

pthread_mutex_t subscriberLock;
MessageConsumer *subscriberList;

RedisPushProcessor pushConsumer; ///< User-defined function to consume RESP3 push messages.
void *pushArg; ///< User-defined argument to pass along with push messages.
} RedisPrivate;


Expand All @@ -96,6 +100,7 @@ void rConfigUnlock(Redis *redis);
int rConnectClient(Redis *redis, enum redisx_channel channel);
void rCloseClient(RedisClient *cl);
boolean rIsLowLatency(const ClientPrivate *cp);
int rCheckClient(const RedisClient *cl);

// in resp.c ------------------------------>
int redisxAppendRESP(RESP *resp, RESP *part);
Expand Down
21 changes: 19 additions & 2 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
# define REDISX_LISTENER_REL_PRIORITY (0.5)
#endif

#ifndef REDISX_DEFAULT_TIMEOUT_MILLIS
/// [ms] Default socket read/write timeout for Redis clients
# define REDISX_DEFAULT_TIMEOUT_MILLIS 3000
#endif

// Various exposed constants ----------------------------------------------------->

/// API major version
Expand Down Expand Up @@ -304,14 +309,23 @@ typedef void (*RedisPipelineProcessor)(RESP *response);
* <li>If extensive processing or blocking calls are required to process the message, it is best to
* simply place a copy of the RESP on a queue and then return quickly, and then process the message
* asynchronously in a background thread.</li>
* <li>The client on which the push notification originated will be locked when this function is
* called, waiting for a response to an earlier query. Thus the implementation should not attempt to
* lock the client again or release the lock. It may send asynchronous requests on the client, e.g. via
* redisxSendRequestAsync(), but it should not try to read a response (given that the client is
* blocked for another read operation). If more flexible client access is needed, the implementation
* should make a copy of the RESP and place it on a queue for asynchronous processing by another thread.
* </li>
* </ul>
*
* @param cl The Redis client that sent the push. The client is locked for exlusive
* access when this function is called.
* @param message The RESP3 message that was pushed by the client
* @param ptr Additional data passed along.
*
* @sa redisxSetPushProcessor()
*/
typedef void (*RedisPushProcessor)(RESP *message, void *ptr);
typedef void (*RedisPushProcessor)(RedisClient *cl, RESP *message, void *ptr);



Expand All @@ -328,8 +342,10 @@ int redisxSetPassword(Redis *redis, const char *passwd);
int redisxSelectDB(Redis *redis, int idx);
int redisxSetProtocol(Redis *redis, enum redisx_protocol protocol);
enum redisx_protocol redisxGetProtocol(const Redis *redis);
RESP *redisxGetHelloData(Redis *redis);

Redis *redisxInit(const char *server);
int redisxCheckValid(const Redis *redis);
void redisxDestroy(Redis *redis);
int redisxConnect(Redis *redis, boolean usePipeline);
void redisxDisconnect(Redis *redis);
Expand Down Expand Up @@ -366,7 +382,7 @@ void redisxDestroyEntries(RedisEntry *entries, int count);
void redisxDestroyKeys(char **keys, int count);

int redisxSetPipelineConsumer(Redis *redis, RedisPipelineProcessor f);
int redisxSetPushProcessor(RedisClient *cl, RedisPushProcessor func, void *arg);
int redisxSetPushProcessor(Redis *redis, RedisPushProcessor func, void *arg);

int redisxPublish(Redis *redis, const char *channel, const char *message, int length);
int redisxNotify(Redis *redis, const char *channel, const char *message);
Expand Down Expand Up @@ -396,6 +412,7 @@ boolean redisxHasComponents(const RESP *r);
boolean redisxIsEqualRESP(const RESP *a, const RESP *b);
int redisxSplitText(RESP *resp, char **text);
XField *resp2XField(const char *name, const RESP *resp);
char *resp2json(const char *name, const RESP *resp);

RedisMapEntry *redisxGetMapEntry(const RESP *map, const RESP *key);
RedisMapEntry *redisxGetKeywordEntry(const RESP *map, const char *key);
Expand Down
Loading

0 comments on commit bbb74a4

Please sign in to comment.