From b451b43c564076acfe8de62eb001277876631a9a Mon Sep 17 00:00:00 2001 From: filipe oliveira Date: Mon, 21 Sep 2020 10:32:57 +0100 Subject: [PATCH] Added support for DUPLICATE_POLICY / ON_DUPLICATE keywords ( TS.INFO update accordingly ) (#66) * [add] Added support for DUPLICATE_POLICY / ON_DUPLICATE keywords ( TS.INFO update accordingly ) * [fix] Fixed CI build issues around pytest not being present --- .circleci/config.yml | 2 -- redistimeseries/client.py | 68 ++++++++++++++++++++++++++++++++++----- test_commands.py | 55 +++++++++++++++++++++++++++++-- 3 files changed, 112 insertions(+), 13 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5218f5d..0fa45c1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,8 +58,6 @@ jobs: . venv/bin/activate REDIS_PORT=6379 coverage run test_commands.py - - early_return_for_forked_pull_requests - - run: name: codecove command: | diff --git a/redistimeseries/client.py b/redistimeseries/client.py index 703221a..f01dba5 100644 --- a/redistimeseries/client.py +++ b/redistimeseries/client.py @@ -16,6 +16,7 @@ class TSInfo(object): # As of RedisTimeseries >= v1.4 max_samples_per_chunk is deprecated in favor of chunk_size max_samples_per_chunk = None chunk_size = None + duplicate_policy = None def __init__(self, args): @@ -34,6 +35,10 @@ def __init__(self, args): self.chunk_size = self.max_samples_per_chunk * 16 # backward compatible changes if 'chunkSize' in response: self.chunk_size = response['chunkSize'] + if 'duplicatePolicy' in response: + self.duplicate_policy = response['duplicatePolicy'] + if type(self.duplicate_policy) == bytes: + self.duplicate_policy = self.duplicate_policy.decode() def list_to_dict(aList): return {nativestr(aList[i][0]):nativestr(aList[i][1]) @@ -163,7 +168,15 @@ def appendChunkSize(params, chunk_size): if chunk_size is not None: params.extend(['CHUNK_SIZE', chunk_size]) - def create(self, key, retention_msecs=None, uncompressed=False, labels={}, chunk_size=None): + @staticmethod + def appendDuplicatePolicy(params, command, duplicate_policy): + if duplicate_policy is not None: + if command == 'TS.ADD': + params.extend(['ON_DUPLICATE', duplicate_policy]) + else: + params.extend(['DUPLICATE_POLICY', duplicate_policy]) + + def create(self, key, **kwargs): """ Create a new time-series. @@ -177,28 +190,45 @@ def create(self, key, retention_msecs=None, uncompressed=False, labels={}, chunk labels: Set of label-value pairs that represent metadata labels of the key. chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + duplicate_policy: since RedisTimeSeries v1.4 you can specify the duplicate sample policy ( Configure what to do on duplicate sample. ) + Can be one of: + - 'block': an error will occur for any out of order sample + - 'first': ignore the new value + - 'last': override with latest value + - 'min': only override if the value is lower than the existing value + - 'max': only override if the value is higher than the existing value + When this is not set, the server-wide default will be used. """ + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) + duplicate_policy = kwargs.get('duplicate_policy', None) params = [key] self.appendRetention(params, retention_msecs) self.appendUncompressed(params, uncompressed) self.appendChunkSize(params, chunk_size) + self.appendDuplicatePolicy(params, self.CREATE_CMD, duplicate_policy) self.appendLabels(params, labels) return self.redis.execute_command(self.CREATE_CMD, *params) - def alter(self, key, retention_msecs=None, labels={}): + def alter(self, key, **kwargs): """ Update the retention, labels of an existing key. The parameters are the same as TS.CREATE. """ + retention_msecs = kwargs.get('retention_msecs', None) + labels = kwargs.get('labels', {}) + duplicate_policy = kwargs.get('duplicate_policy', None) params = [key] self.appendRetention(params, retention_msecs) + self.appendDuplicatePolicy(params, self.ALTER_CMD, duplicate_policy) self.appendLabels(params, labels) return self.redis.execute_command(self.ALTER_CMD, *params) - def add(self, key, timestamp, value, retention_msecs=None, - uncompressed=False, labels={}, chunk_size=None): + def add(self, key, timestamp, value, **kwargs): """ Append (or create and append) a new sample to the series. @@ -214,11 +244,25 @@ def add(self, key, timestamp, value, retention_msecs=None, labels: Set of label-value pairs that represent metadata labels of the key. chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + duplicate_policy: since RedisTimeSeries v1.4 you can specify the duplicate sample policy ( Configure what to do on duplicate sample. ) + Can be one of: + - 'block': an error will occur for any out of order sample + - 'first': ignore the new value + - 'last': override with latest value + - 'min': only override if the value is lower than the existing value + - 'max': only override if the value is higher than the existing value + When this is not set, the server-wide default will be used. """ + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) + duplicate_policy = kwargs.get('duplicate_policy', None) params = [key, timestamp, value] self.appendRetention(params, retention_msecs) self.appendUncompressed(params, uncompressed) self.appendChunkSize(params, chunk_size) + self.appendDuplicatePolicy(params, self.ADD_CMD, duplicate_policy) self.appendLabels(params, labels) return self.redis.execute_command(self.ADD_CMD, *params) @@ -237,8 +281,7 @@ def madd(self, ktv_tuples): return self.redis.execute_command(self.MADD_CMD, *params) - def incrby(self, key, value, timestamp=None, retention_msecs=None, - uncompressed=False, labels={}, chunk_size=None): + def incrby(self, key, value, **kwargs): """ Increment (or create an time-series and increment) the latest sample's of a series. This command can be used as a counter or gauge that automatically gets history as a time series. @@ -256,6 +299,11 @@ def incrby(self, key, value, timestamp=None, retention_msecs=None, chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). """ + timestamp = kwargs.get('timestamp', None) + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) params = [key, value] self.appendTimestamp(params, timestamp) self.appendRetention(params, retention_msecs) @@ -265,8 +313,7 @@ def incrby(self, key, value, timestamp=None, retention_msecs=None, return self.redis.execute_command(self.INCRBY_CMD, *params) - def decrby(self, key, value, timestamp=None, retention_msecs=None, - uncompressed=False, labels={}, chunk_size=None): + def decrby(self, key, value, **kwargs): """ Decrement (or create an time-series and decrement) the latest sample's of a series. This command can be used as a counter or gauge that automatically gets history as a time series. @@ -284,6 +331,11 @@ def decrby(self, key, value, timestamp=None, retention_msecs=None, chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). """ + timestamp = kwargs.get('timestamp', None) + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) params = [key, value] self.appendTimestamp(params, timestamp) self.appendRetention(params, retention_msecs) diff --git a/test_commands.py b/test_commands.py index 8c7fb6b..8c4115c 100644 --- a/test_commands.py +++ b/test_commands.py @@ -40,21 +40,35 @@ def testCreate(self): info = rts.info("time-serie-1") self.assertEqual(128, info.chunk_size) + # Test for duplicate policy + for duplicate_policy in ["block","last","first","min","max"]: + ts_name = "time-serie-ooo-{0}".format(duplicate_policy) + self.assertTrue(rts.create(ts_name, duplicate_policy=duplicate_policy)) + info = rts.info(ts_name) + self.assertEqual(duplicate_policy, info.duplicate_policy) def testAlter(self): '''Test TS.ALTER calls''' - rts.create(1) + self.assertTrue(rts.create(1)) self.assertEqual(0, rts.info(1).retention_msecs) - rts.alter(1, retention_msecs=10) + self.assertTrue(rts.alter(1, retention_msecs=10)) self.assertEqual({}, rts.info(1).labels) self.assertEqual(10, rts.info(1).retention_msecs) - rts.alter(1, labels={'Time':'Series'}) + self.assertTrue(rts.alter(1, labels={'Time':'Series'})) self.assertEqual('Series', rts.info(1).labels['Time']) self.assertEqual(10, rts.info(1).retention_msecs) pipe = rts.pipeline() self.assertTrue(pipe.create(2)) + if version is None or version < 14000: + return + info = rts.info(1) + self.assertEqual(None, info.duplicate_policy) + self.assertTrue(rts.alter(1, duplicate_policy='min')) + info = rts.info(1) + self.assertEqual('min', info.duplicate_policy) + def testAdd(self): '''Test TS.ADD calls''' @@ -76,6 +90,34 @@ def testAdd(self): info = rts.info("time-serie-1") self.assertEqual(128, info.chunk_size) + # Test for duplicate policy BLOCK + self.assertEqual(1, rts.add("time-serie-add-ooo-block", 1, 5.0)) + try: + rts.add("time-serie-add-ooo-block", 1, 5.0, duplicate_policy='block') + except Exception as e: + self.assertEqual("TSDB: Error at upsert, update is not supported in BLOCK mode",e.__str__()) + + # Test for duplicate policy LAST + self.assertEqual(1, rts.add("time-serie-add-ooo-last", 1, 5.0)) + self.assertEqual(1, rts.add("time-serie-add-ooo-last", 1, 10.0, duplicate_policy='last')) + self.assertEqual(10.0, rts.get("time-serie-add-ooo-last")[1]) + + # Test for duplicate policy FIRST + self.assertEqual(1, rts.add("time-serie-add-ooo-first", 1, 5.0)) + self.assertEqual(1, rts.add("time-serie-add-ooo-first", 1, 10.0, duplicate_policy='first')) + self.assertEqual(5.0, rts.get("time-serie-add-ooo-first")[1]) + + # Test for duplicate policy MAX + self.assertEqual(1, rts.add("time-serie-add-ooo-max", 1, 5.0)) + self.assertEqual(1, rts.add("time-serie-add-ooo-max", 1, 10.0, duplicate_policy='max')) + self.assertEqual(10.0, rts.get("time-serie-add-ooo-max")[1]) + + # Test for duplicate policy MIN + self.assertEqual(1, rts.add("time-serie-add-ooo-min", 1, 5.0)) + self.assertEqual(1, rts.add("time-serie-add-ooo-min", 1, 10.0, duplicate_policy='min')) + self.assertEqual(5.0, rts.get("time-serie-add-ooo-min")[1]) + + def testMAdd(self): '''Test TS.MADD calls''' @@ -259,6 +301,13 @@ def testInfo(self): info = rts.info(1) self.assertEqual(5, info.retention_msecs) self.assertEqual(info.labels['currentLabel'], 'currentData') + if version is None or version < 14000: + return + self.assertEqual(None, info.duplicate_policy) + + rts.create('time-serie-2', duplicate_policy='min') + info = rts.info('time-serie-2') + self.assertEqual('min', info.duplicate_policy) def testQueryIndex(self): '''Test TS.QUERYINDEX calls'''