Skip to content

Commit

Permalink
Initial cluster support
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Jan 4, 2025
1 parent fd212e0 commit c3a8086
Show file tree
Hide file tree
Showing 11 changed files with 742 additions and 115 deletions.
26 changes: 9 additions & 17 deletions .cproject
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
</extensions>
</storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0">
<configuration buildProperties="" description="" id="cdt.managedbuild.toolchain.gnu.base.2018240843" name="Default" parent="org.eclipse.cdt.build.core.emptycfg">
<configuration artifactName="${ProjName}" buildProperties="" description="" id="cdt.managedbuild.toolchain.gnu.base.2018240843" name="Default" parent="org.eclipse.cdt.build.core.emptycfg">
<folderInfo id="cdt.managedbuild.toolchain.gnu.base.2018240843.338203313" name="/" resourcePath="">
<toolChain id="cdt.managedbuild.toolchain.gnu.base.822348769" name="Linux GCC" superClass="cdt.managedbuild.toolchain.gnu.base">
<targetPlatform archList="all" binaryParser="org.eclipse.cdt.core.GNU_ELF" id="cdt.managedbuild.target.gnu.platform.base.2098629020" name="Debug Platform" osList="linux,hpux,aix,qnx" superClass="cdt.managedbuild.target.gnu.platform.base"/>
Expand All @@ -36,27 +36,15 @@
</tool>
</toolChain>
</folderInfo>
<folderInfo id="cdt.managedbuild.toolchain.gnu.base.2018240843.1756755366" name="/" resourcePath="examples">
<toolChain id="cdt.managedbuild.toolchain.gnu.base.1872719987" name="Linux GCC" superClass="cdt.managedbuild.toolchain.gnu.base" unusedChildren="">
<tool id="cdt.managedbuild.tool.gnu.archiver.base.142140314" name="GCC Archiver" superClass="cdt.managedbuild.tool.gnu.archiver.base.1422446010"/>
<tool id="cdt.managedbuild.tool.gnu.cpp.compiler.base.846657740" name="GCC C++ Compiler" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.base.1815812035"/>
<tool id="cdt.managedbuild.tool.gnu.c.compiler.base.1716784206" name="GCC C Compiler" superClass="cdt.managedbuild.tool.gnu.c.compiler.base.415530852">
<inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.2140904408" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
</tool>
<tool id="cdt.managedbuild.tool.gnu.c.linker.base.2010450577" name="GCC C Linker" superClass="cdt.managedbuild.tool.gnu.c.linker.base.31299320"/>
<tool id="cdt.managedbuild.tool.gnu.cpp.linker.base.754977188" name="GCC C++ Linker" superClass="cdt.managedbuild.tool.gnu.cpp.linker.base.1554120195"/>
<tool id="cdt.managedbuild.tool.gnu.assembler.base.559620659" name="GCC Assembler" superClass="cdt.managedbuild.tool.gnu.assembler.base.281878813">
<inputType id="cdt.managedbuild.tool.gnu.assembler.input.957053907" superClass="cdt.managedbuild.tool.gnu.assembler.input"/>
</tool>
</toolChain>
</folderInfo>
<sourceEntries>
<entry excluding="examples" flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name=""/>
<entry flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="examples"/>
</sourceEntries>
</configuration>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.core.externalSettings"/>
<storageModule moduleId="org.eclipse.cdt.core.externalSettings">
<externalSettings containerId="xchange;" factoryId="org.eclipse.cdt.core.cfg.export.settings.sipplier"/>
</storageModule>
</cconfiguration>
</storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0">
Expand All @@ -71,5 +59,9 @@
</scannerConfigBuildInfo>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.internal.ui.text.commentOwnerProjectMappings"/>
<storageModule moduleId="refreshScope"/>
<storageModule moduleId="refreshScope" versionNumber="2">
<configuration configurationName="Default">
<resource resourceType="PROJECT" workspacePath="/redisx"/>
</configuration>
</storageModule>
</cproject>
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ distclean:
# ----------------------------------------------------------------------------

SOURCES = $(SRC)/redisx.c $(SRC)/resp.c $(SRC)/redisx-net.c $(SRC)/redisx-hooks.c $(SRC)/redisx-client.c \
$(SRC)/redisx-tab.c $(SRC)/redisx-sub.c $(SRC)/redisx-script.c
$(SRC)/redisx-tab.c $(SRC)/redisx-sub.c $(SRC)/redisx-script.c $(SRC)/redisx-cluster.c

# Generate a list of object (obj/*.o) files from the input sources
OBJECTS := $(subst $(SRC),$(OBJ),$(SOURCES))
Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,9 @@ with that response (or `NULL` if there was an error).
### Bundled Attributes

Redis 6 introduced the possibility of sending optional attributes along with responses, using the RESP3 protocol.
These attributes are not included in the responses sent to users, in accordance with RESP3 protocol. Rather, they are
made available to users on demand, when needed, after the response to a request is received. You may retrieve the
attributes to interactive requests _after_ the `redisxRequest()` or `redisxArrayRequest()` queries, using
`redisxGetAttributes()`, e.g.:
These attributes are not returned to users by default, in line with the RESP3 protocol specification. Rather, they
are available on demand, after the response to a request is received. You may retrieve the attributes to interactive
requests _after_ the `redisxRequest()` or `redisxArrayRequest()` queries, using `redisxGetAttributes()`, e.g.:

```c
...
Expand Down
8 changes: 8 additions & 0 deletions config.mk
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ BIN ?= bin
# Compiler: use gcc by default
CC ?= gcc

# Whether to use OpenMP
WITH_OPENMP = 1

# Add include/ directory
CPPFLAGS += -I$(INC)

Expand Down Expand Up @@ -64,6 +67,11 @@ CHECKOPTS += --inline-suppr $(CHECKEXTRA)
# Link against math libs (for e.g. isnan()), and xchange dependency
LDFLAGS += -lm -lxchange

ifeq ($(WITH_OPENMP),1)
CFLAGS += -fopenmp
LDFLAGS += -fopenmp
endif

# Search for libraries under LIB
ifneq ($(findstring $(LIB),$(LD_LIBRARY_PATH)),$LIB)
LDFLAGS += -L$(LIB)
Expand Down
50 changes: 35 additions & 15 deletions include/redisx-priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,26 +60,43 @@ typedef struct {
int timeoutMillis; ///< [ms] Connection timeout for sentinel nodes.
} RedisSentinel;


/**
* A set of configuration settings for a Redis server connection.
*
*/
typedef struct {
RedisSentinel *sentinel; ///< Sentinel (high-availability) server configuration.
uint32_t addr; ///< The 32-bit inet address
int port; ///< port number (usually 6379)
int dbIndex; ///< the zero-based database index
char *username; ///< Redis user name (if any)
char *password; ///< Redis password (if any)

int timeoutMillis; ///< [ms] Socket read/write timeout
int tcpBufSize; ///< [bytes] TCP read/write buffer sizes to use
int protocol; ///< RESP version to use
boolean hello; ///< whether to use HELLO (introduced in Redis 6.0.0 only)
RESP *helloData; ///< RESP data received from server during last connection.
RedisSocketConfigurator socketConf; ///< Additional user configuration of client sockets

RedisClient *clients;
Hook *firstCleanupCall; ///< Linked list of cleanup calls
Hook *firstConnectCall; ///< Linked list of connection calls

void (*pipelineConsumerFunc)(RESP *response); ///< Callback function to process pipelined responses
RedisErrorHandler transmitErrorFunc; ///< Callback function to process socket-level errors

RedisPushProcessor pushConsumer; ///< User-defined function to consume RESP3 push messages.
void *pushArg; ///< User-defined argument to pass along with push messages.
} RedisConfig;

typedef struct {
char *hostname; ///< Server host name or IP address
int port; ///< port number (usually 6379)

RedisSentinel *sentinel; ///< Sentinel (high-availability) server configuration.
uint32_t addr; ///< The 32-bit inet address

RedisConfig config;
pthread_mutex_t configLock;

RESP *helloData; ///< RESP data received from server during last connection.

RedisClient *clients;
int scanCount; ///< Count argument to use in SCAN commands, or <= 0 for default

pthread_t pipelineListenerTID;
Expand All @@ -88,19 +105,14 @@ typedef struct {
boolean isPipelineListenerEnabled;
boolean isSubscriptionListenerEnabled;

Hook *firstCleanupCall;
Hook *firstConnectCall;

void (*pipelineConsumerFunc)(RESP *response);
RedisErrorHandler transmitErrorFunc;

pthread_mutex_t subscriberLock;
MessageConsumer *subscriberList;

RedisPushProcessor pushConsumer; ///< User-defined function to consume RESP3 push messages.
void *pushArg; ///< User-defined argument to pass along with push messages.
} RedisPrivate;

// in redisx.c ---------------------------->
int rCopyConfig(const RedisConfig *src, Redis *dst);
void rDestroyConfig(RedisConfig *config);

// in redisx-sub.c ------------------------>
int rConfigLock(Redis *redis);
Expand All @@ -113,6 +125,14 @@ void rCloseClientAsync(RedisClient *cl);
boolean rIsLowLatency(const ClientPrivate *cp);
int rCheckClient(const RedisClient *cl);

// in redisx-hooks.c ---------------------->
Hook *rCopyHooks(const Hook *list, Redis *owner);
void rClearHooks(Hook *first);

// in redisx-cluster.c -------------------->
int rClusterRefresh(RedisCluster *cluster);
uint16_t rCalcHash(const char *key);

// in resp.c ------------------------------>
int redisxAppendRESP(RESP *resp, RESP *part);

Expand Down
20 changes: 20 additions & 0 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ typedef struct RedisEntry {
int length; ///< Bytes in value.
} RedisEntry;





/**
* Redis server host and port specification.
*
Expand All @@ -210,6 +214,16 @@ typedef struct RedisServer {
int port; ///< The port number or &lt;=0 to use the default 6379
} RedisServer;



/**
* A Redis cluster configuration
*
*/
typedef struct {
void *priv; ///< Private data not exposed to users.
} RedisCluster;

/**
* \brief Structure that represents a single Redis client connection instance.
*
Expand Down Expand Up @@ -393,6 +407,12 @@ int redisxConnect(Redis *redis, boolean usePipeline);
void redisxDisconnect(Redis *redis);
int redisxReconnect(Redis *redis, boolean usePipeline);

RedisCluster *redisxClusterInit(Redis *node);
Redis *redisxClusterGetShard(RedisCluster *cluster, const char *key);
int redisxClusterConnect(RedisCluster *cluster);
int redisxClusterDisconnect(RedisCluster *cluster);
void redisxClusterDestroy(RedisCluster *cluster);

int redisxPing(Redis *redis, const char *message);
enum redisx_protocol redisxGetProtocol(Redis *redis);
XLookupTable *redisxGetInfo(Redis *redis, const char *parameter);
Expand Down
6 changes: 3 additions & 3 deletions src/redisx-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ static int rTransmitErrorAsync(ClientPrivate *cp, const char *op) {
if(cp->isEnabled) {
RedisPrivate *p = (RedisPrivate *) cp->redis->priv;
// Let the handler disconnect, if it wants to....
if(p->transmitErrorFunc) {
p->transmitErrorFunc(cp->redis, cp->idx, op);
if(p->config.transmitErrorFunc) {
p->config.transmitErrorFunc(cp->redis, cp->idx, op);
if(cp->isEnabled) {
if(errno == EAGAIN || errno == EWOULDBLOCK) status = x_error(X_TIMEDOUT, errno, "rTransmitErrorAsync", "%s timed out on %s channel %d\n", op, cp->redis->id, cp->idx);
else status = x_error(X_NO_SERVICE, errno, "rTransmitErrorAsync", "%s failed on %s channel %d\n", op, cp->redis->id, cp->idx);
Expand Down Expand Up @@ -770,7 +770,7 @@ static void rPushMessageAsync(RedisClient *cl, RESP *resp) {
else redisxDestroyRESP(r);
}

if(p->pushConsumer) p->pushConsumer(cl, resp, p->pushArg);
if(p->config.pushConsumer) p->config.pushConsumer(cl, resp, p->config.pushArg);

redisxDestroyRESP(resp);
}
Expand Down
Loading

0 comments on commit c3a8086

Please sign in to comment.