From 1491595c142f155398ee6719bc9b393da181a3ac Mon Sep 17 00:00:00 2001 From: Eoghan Murray Date: Thu, 19 Nov 2020 14:02:12 +0000 Subject: [PATCH] Properly use MGET and MSET against a cluster by detecting uses of hash slots (which determines that all keys go to the same cluster node) --- aredis/commands/strings.py | 55 +++++++++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 13 deletions(-) diff --git a/aredis/commands/strings.py b/aredis/commands/strings.py index ecce15e9..e7d7963b 100644 --- a/aredis/commands/strings.py +++ b/aredis/commands/strings.py @@ -6,6 +6,7 @@ dict_merge, bool_ok, string_keys_to_dict) +from collections import OrderedDict, defaultdict class BitField(object): @@ -329,15 +330,30 @@ async def mget(self, keys, *args): Returns a list of values ordered identically to ``keys`` Cluster impl: - Itterate all keys and send GET for each key. - This will go alot slower than a normal mget call in StrictRedis. - - Operation is no longer atomic. - """ - res = list() - for arg in list_or_args(keys, args): - res.append(await self.get(arg)) - return res + Find groups of keys with the same hash tag + https://redis.io/topics/cluster-tutorial#redis-cluster-data-sharding + and execute an MGET for each group + execute individual GETs for all other keys passed in + + Operation will be atomic only if keys belonging to a single + hash tag are passed in. + """ + res_mapping = OrderedDict() + hash_tag_slots = defaultdict(list) + for key in list_or_args(keys, args): + if key.count('{') == key.count('}') == 1 and \ + -1 < key.index('{') < key.index('}') - 1: + hash_tag = key.split('{')[1].split('}')[0] + hash_tag_slots[hash_tag].append(key) + res_mapping[key] = None # establish correct ordering + else: + # a loose key without a hash tag, can't use MGET + res_mapping[key] = await self.get(key) + for mget_keys in hash_tag_slots.values(): + mget_res = await self.execute_command('MGET', *mget_keys) + for key, res in zip(mget_keys, mget_res): + res_mapping[key] = res + return list(res_mapping.values()) async def mset(self, *args, **kwargs): """ @@ -345,18 +361,31 @@ async def mset(self, *args, **kwargs): dictionary argument or as kwargs. Cluster impl: - Itterate over all items and do SET on each (k,v) pair + Find groups of keys with the same hash tag + https://redis.io/topics/cluster-tutorial#redis-cluster-data-sharding + and execute an MSET for each group + execute individual SETs for all other key/value pairs - Operation is no longer atomic. + Operation will be atomic only if keys belonging to a single + hash tag are passed in. """ if args: if len(args) != 1 or not isinstance(args[0], dict): raise RedisError('MSET requires **kwargs or a single dict arg') kwargs.update(args[0]) + hash_tag_slots = defaultdict(list) for pair in iteritems(kwargs): - await self.set(pair[0], pair[1]) - + key, v = pair + if key.count('{') == key.count('}') == 1 and \ + -1 < key.index('{') < key.index('}') - 1: + hash_tag = key.split('{')[1].split('}')[0] + hash_tag_slots[hash_tag].extend(pair) + else: + # a loose key without a hash tag, can't use MSET + await self.set(key, v) + for mset_items in hash_tag_slots.values(): + await self.execute_command('MSET', *mset_items) return True async def msetnx(self, *args, **kwargs):